Merge remote-tracking branch 'origin/feat/agent_work_orders' into ui/agent-work-order

sean-eskerium
2025-10-25 14:32:33 -04:00
136 changed files with 17959 additions and 145 deletions

View File

@@ -0,0 +1,11 @@
"""Pytest configuration for agent_work_orders tests"""
import pytest
@pytest.fixture(autouse=True)
def reset_structlog():
"""Reset structlog configuration for each test"""
import structlog
structlog.reset_defaults()

View File

@@ -0,0 +1,7 @@
[pytest]
testpaths = .
python_files = test_*.py
python_classes = Test*
python_functions = test_*
pythonpath = ../..
asyncio_mode = auto

View File

@@ -0,0 +1,304 @@
"""Tests for Agent Executor"""
import asyncio
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.agent_work_orders.agent_executor.agent_cli_executor import AgentCLIExecutor
def test_build_command():
"""Test building Claude CLI command with all flags"""
executor = AgentCLIExecutor(cli_path="claude")
# Create a temporary command file with placeholders
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Test command content with args: $1 and $2")
command_file_path = f.name
try:
command, prompt_text = executor.build_command(
command_file_path=command_file_path,
args=["issue-42", "wo-test123"],
model="sonnet",
)
# Verify command includes required flags
assert "claude" in command
assert "--print" in command
assert "--output-format" in command
assert "stream-json" in command
assert "--verbose" in command # Required for stream-json with --print
assert "--model" in command # Model specification
assert "sonnet" in command # Model value
assert "--dangerously-skip-permissions" in command # Automation
# Note: --max-turns is optional (None by default = unlimited)
# Verify prompt text includes command content and placeholder replacements
assert "Test command content" in prompt_text
assert "issue-42" in prompt_text
assert "wo-test123" in prompt_text
assert "$1" not in prompt_text # Placeholders should be replaced
assert "$2" not in prompt_text
finally:
Path(command_file_path).unlink()
def test_build_command_no_args():
"""Test building command without arguments"""
executor = AgentCLIExecutor()
# Create a temporary command file
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Command without args")
command_file_path = f.name
try:
command, prompt_text = executor.build_command(
command_file_path=command_file_path,
model="opus",
)
assert "claude" in command
assert "--verbose" in command
assert "--model" in command
assert "opus" in command
assert "Command without args" in prompt_text
# Note: --max-turns is optional (None by default = unlimited)
finally:
Path(command_file_path).unlink()
def test_build_command_with_custom_max_turns():
"""Test building command with custom max-turns configuration"""
with patch("src.agent_work_orders.agent_executor.agent_cli_executor.config") as mock_config:
mock_config.CLAUDE_CLI_PATH = "claude"
mock_config.CLAUDE_CLI_VERBOSE = True
mock_config.CLAUDE_CLI_MAX_TURNS = 50
mock_config.CLAUDE_CLI_SKIP_PERMISSIONS = True
executor = AgentCLIExecutor()
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Test content")
command_file_path = f.name
try:
command, _ = executor.build_command(
command_file_path=command_file_path,
model="sonnet",
)
assert "--max-turns 50" in command
finally:
Path(command_file_path).unlink()
def test_build_command_missing_file():
"""Test building command with non-existent file"""
executor = AgentCLIExecutor()
with pytest.raises(ValueError, match="Failed to read command file"):
executor.build_command(
command_file_path="/nonexistent/path/to/command.md",
model="sonnet",
)
@pytest.mark.asyncio
async def test_execute_async_success():
"""Test successful command execution with prompt via stdin"""
executor = AgentCLIExecutor()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(
return_value=(
b'{"session_id": "session-123", "type": "init"}\n{"type": "result"}',
b"",
)
)
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await executor.execute_async(
command="claude --print --output-format stream-json --verbose --max-turns 20 --dangerously-skip-permissions",
working_directory="/tmp",
timeout_seconds=30,
prompt_text="Test prompt content",
)
assert result.success is True
assert result.exit_code == 0
assert result.session_id == "session-123"
assert result.stdout is not None
@pytest.mark.asyncio
async def test_execute_async_failure():
"""Test failed command execution"""
executor = AgentCLIExecutor()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(
return_value=(b"", b"Error: Command failed")
)
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await executor.execute_async(
command="claude --print --output-format stream-json --verbose",
working_directory="/tmp",
prompt_text="Test prompt",
)
assert result.success is False
assert result.exit_code == 1
assert result.error_message is not None
@pytest.mark.asyncio
async def test_execute_async_timeout():
"""Test command execution timeout"""
executor = AgentCLIExecutor()
# Mock subprocess that times out
mock_process = MagicMock()
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
async def mock_communicate(input=None):
await asyncio.sleep(10) # Longer than timeout
return (b"", b"")
mock_process.communicate = mock_communicate
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await executor.execute_async(
command="claude --print --output-format stream-json --verbose",
working_directory="/tmp",
timeout_seconds=0.1, # Very short timeout
prompt_text="Test prompt",
)
assert result.success is False
assert result.exit_code == -1
assert "timed out" in result.error_message.lower()
def test_extract_session_id():
"""Test extracting session ID from JSONL output"""
executor = AgentCLIExecutor()
jsonl_output = """
{"type": "init", "session_id": "session-abc123"}
{"type": "message", "content": "Hello"}
{"type": "result"}
"""
session_id = executor._extract_session_id(jsonl_output)
assert session_id == "session-abc123"
def test_extract_session_id_not_found():
"""Test extracting session ID when not present"""
executor = AgentCLIExecutor()
jsonl_output = """
{"type": "message", "content": "Hello"}
{"type": "result"}
"""
session_id = executor._extract_session_id(jsonl_output)
assert session_id is None
def test_extract_session_id_invalid_json():
"""Test extracting session ID with invalid JSON"""
executor = AgentCLIExecutor()
jsonl_output = "Not valid JSON"
session_id = executor._extract_session_id(jsonl_output)
assert session_id is None
@pytest.mark.asyncio
async def test_execute_async_extracts_result_text():
"""Test that result text is extracted from JSONL output"""
executor = AgentCLIExecutor()
# Mock subprocess that returns JSONL with result
jsonl_output = '{"type":"session_started","session_id":"test-123"}\n{"type":"result","result":"/feature","is_error":false}'
with patch("asyncio.create_subprocess_shell") as mock_subprocess:
mock_process = AsyncMock()
mock_process.communicate = AsyncMock(return_value=(jsonl_output.encode(), b""))
mock_process.returncode = 0
mock_subprocess.return_value = mock_process
result = await executor.execute_async(
"claude --print",
"/tmp/test",
prompt_text="test prompt",
work_order_id="wo-test",
)
assert result.success is True
assert result.result_text == "/feature"
assert result.session_id == "test-123"
assert '{"type":"result"' in result.stdout
def test_build_command_replaces_arguments_placeholder():
"""Test that $ARGUMENTS placeholder is replaced with actual arguments"""
executor = AgentCLIExecutor()
# Create temp command file with $ARGUMENTS
import os
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Classify this issue:\n\n$ARGUMENTS")
temp_file = f.name
try:
command, prompt = executor.build_command(
temp_file, args=['{"title": "Add feature", "body": "description"}']
)
assert "$ARGUMENTS" not in prompt
assert '{"title": "Add feature"' in prompt
assert "Classify this issue:" in prompt
finally:
os.unlink(temp_file)
def test_build_command_replaces_positional_arguments():
"""Test that $1, $2, $3 are replaced with positional arguments"""
executor = AgentCLIExecutor()
import os
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Issue: $1\nWorkOrder: $2\nData: $3")
temp_file = f.name
try:
command, prompt = executor.build_command(
temp_file, args=["42", "wo-test", '{"title":"Test"}']
)
assert "$1" not in prompt
assert "$2" not in prompt
assert "$3" not in prompt
assert "Issue: 42" in prompt
assert "WorkOrder: wo-test" in prompt
assert 'Data: {"title":"Test"}' in prompt
finally:
os.unlink(temp_file)
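# --- Illustrative sketch (not part of this commit) ---------------------------
# The tests above pin down two AgentCLIExecutor behaviours without showing the
# implementation: pulling a session_id out of stream-json (JSONL) output, and
# substituting $ARGUMENTS / $1..$N placeholders into the prompt. A minimal
# standalone sketch of that logic is below; the function names here are
# assumed for illustration (the real code uses _extract_session_id and does the
# substitution inside build_command) and the actual module may differ.
import json


def extract_session_id(jsonl_output: str) -> str | None:
    """Return the first session_id found in JSONL output, or None."""
    for line in jsonl_output.strip().splitlines():
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            # Tolerate non-JSON lines, matching test_extract_session_id_invalid_json
            continue
        if "session_id" in event:
            return event["session_id"]
    return None


def substitute_placeholders(template: str, args: list[str]) -> str:
    """Replace $ARGUMENTS with all args and $1..$N with positional args."""
    prompt = template.replace("$ARGUMENTS", " ".join(args))
    for index, value in enumerate(args, start=1):
        prompt = prompt.replace(f"${index}", value)
    return prompt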

View File

@@ -0,0 +1,419 @@
"""Integration Tests for API Endpoints"""
from datetime import datetime
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from src.agent_work_orders.models import (
AgentWorkflowType,
AgentWorkOrderStatus,
SandboxType,
)
from src.agent_work_orders.server import app
client = TestClient(app)
def test_health_endpoint():
"""Test health check endpoint - should be healthy when feature is disabled"""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
# When feature is disabled (default), health check returns healthy
# When feature is enabled but dependencies missing, returns degraded
# We accept both as valid test outcomes
assert data["status"] in ["healthy", "degraded"]
assert data["service"] == "agent-work-orders"
assert "enabled" in data
# If disabled, should have explanatory message
if not data.get("enabled"):
assert "message" in data
def test_create_agent_work_order():
"""Test creating an agent work order"""
with patch("src.agent_work_orders.api.routes.orchestrator") as mock_orchestrator:
mock_orchestrator.execute_workflow = AsyncMock()
request_data = {
"repository_url": "https://github.com/owner/repo",
"sandbox_type": "git_branch",
"workflow_type": "agent_workflow_plan",
"user_request": "Add user authentication feature",
"github_issue_number": "42",
}
response = client.post("/api/agent-work-orders/", json=request_data)
assert response.status_code == 201
data = response.json()
assert "agent_work_order_id" in data
assert data["status"] == "pending"
assert data["agent_work_order_id"].startswith("wo-")
def test_create_agent_work_order_without_issue():
"""Test creating work order without issue number"""
with patch("src.agent_work_orders.api.routes.orchestrator") as mock_orchestrator:
mock_orchestrator.execute_workflow = AsyncMock()
request_data = {
"repository_url": "https://github.com/owner/repo",
"sandbox_type": "git_branch",
"workflow_type": "agent_workflow_plan",
"user_request": "Fix the login bug where users can't sign in",
}
response = client.post("/api/agent-work-orders/", json=request_data)
assert response.status_code == 201
data = response.json()
assert "agent_work_order_id" in data
def test_create_agent_work_order_invalid_data():
"""Test creating work order with invalid data"""
request_data = {
"repository_url": "https://github.com/owner/repo",
# Missing required fields
}
response = client.post("/api/agent-work-orders/", json=request_data)
assert response.status_code == 422 # Validation error
def test_list_agent_work_orders_empty():
"""Test listing work orders when none exist"""
# Reset state repository
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.list = AsyncMock(return_value=[])
response = client.get("/api/agent-work-orders/")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) == 0
def test_list_agent_work_orders_with_data():
"""Test listing work orders with data"""
from src.agent_work_orders.models import AgentWorkOrderState
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"github_issue_number": "42",
"status": AgentWorkOrderStatus.RUNNING,
"current_phase": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.list = AsyncMock(return_value=[(state, metadata)])
response = client.get("/api/agent-work-orders/")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["agent_work_order_id"] == "wo-test123"
assert data[0]["status"] == "running"
def test_list_agent_work_orders_with_status_filter():
"""Test listing work orders with status filter"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.list = AsyncMock(return_value=[])
response = client.get("/api/agent-work-orders/?status=running")
assert response.status_code == 200
mock_repo.list.assert_called_once()
def test_get_agent_work_order():
"""Test getting a specific work order"""
from src.agent_work_orders.models import AgentWorkOrderState
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"github_issue_number": "42",
"status": AgentWorkOrderStatus.COMPLETED,
"current_phase": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"github_pull_request_url": "https://github.com/owner/repo/pull/42",
"git_commit_count": 5,
"git_files_changed": 10,
"error_message": None,
}
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=(state, metadata))
response = client.get("/api/agent-work-orders/wo-test123")
assert response.status_code == 200
data = response.json()
assert data["agent_work_order_id"] == "wo-test123"
assert data["status"] == "completed"
assert data["git_branch_name"] == "feat-wo-test123"
assert data["github_pull_request_url"] == "https://github.com/owner/repo/pull/42"
def test_get_agent_work_order_not_found():
"""Test getting a non-existent work order"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=None)
response = client.get("/api/agent-work-orders/wo-nonexistent")
assert response.status_code == 404
def test_get_git_progress():
"""Test getting git progress"""
from src.agent_work_orders.models import AgentWorkOrderState
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.RUNNING,
"current_phase": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"git_commit_count": 3,
"git_files_changed": 7,
}
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=(state, metadata))
response = client.get("/api/agent-work-orders/wo-test123/git-progress")
assert response.status_code == 200
data = response.json()
assert data["agent_work_order_id"] == "wo-test123"
assert data["git_commit_count"] == 3
assert data["git_files_changed"] == 7
assert data["git_branch_name"] == "feat-wo-test123"
def test_get_git_progress_not_found():
"""Test getting git progress for non-existent work order"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=None)
response = client.get("/api/agent-work-orders/wo-nonexistent/git-progress")
assert response.status_code == 404
def test_send_prompt_to_agent():
"""Test sending prompt to agent (placeholder)"""
request_data = {
"agent_work_order_id": "wo-test123",
"prompt_text": "Continue with the next step",
}
response = client.post("/api/agent-work-orders/wo-test123/prompt", json=request_data)
# Currently returns success but doesn't actually send (Phase 2+)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
def test_get_logs():
"""Test getting logs from log buffer"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
# Mock work order exists
mock_repo.get = AsyncMock(return_value=({"id": "wo-test123"}, {}))
response = client.get("/api/agent-work-orders/wo-test123/logs")
assert response.status_code == 200
data = response.json()
assert "log_entries" in data
assert "total" in data
assert "limit" in data
assert "offset" in data
def test_verify_repository_success():
"""Test repository verification success"""
from src.agent_work_orders.models import GitHubRepository
mock_repo_info = GitHubRepository(
name="repo",
owner="owner",
default_branch="main",
url="https://github.com/owner/repo",
)
with patch("src.agent_work_orders.api.routes.github_client") as mock_client:
mock_client.verify_repository_access = AsyncMock(return_value=True)
mock_client.get_repository_info = AsyncMock(return_value=mock_repo_info)
request_data = {"repository_url": "https://github.com/owner/repo"}
response = client.post("/api/agent-work-orders/github/verify-repository", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["is_accessible"] is True
assert data["repository_name"] == "repo"
assert data["repository_owner"] == "owner"
assert data["default_branch"] == "main"
def test_verify_repository_failure():
"""Test repository verification failure"""
with patch("src.agent_work_orders.api.routes.github_client") as mock_client:
mock_client.verify_repository_access = AsyncMock(return_value=False)
request_data = {"repository_url": "https://github.com/owner/nonexistent"}
response = client.post("/api/agent-work-orders/github/verify-repository", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["is_accessible"] is False
assert data["error_message"] is not None
def test_get_agent_work_order_steps():
"""Test getting step history for a work order"""
from src.agent_work_orders.models import AgentWorkOrderState, StepExecutionResult, StepHistory, WorkflowStep
# Create step history
step_history = StepHistory(
agent_work_order_id="wo-test123",
steps=[
StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/test-feature",
duration_seconds=1.0,
),
StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=True,
output="Plan created",
duration_seconds=5.0,
),
],
)
# Mock state for get() call
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
)
metadata = {
"sandbox_type": SandboxType.GIT_BRANCH,
"github_issue_number": None,
"status": AgentWorkOrderStatus.RUNNING,
"current_phase": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=(state, metadata))
mock_repo.get_step_history = AsyncMock(return_value=step_history)
response = client.get("/api/agent-work-orders/wo-test123/steps")
assert response.status_code == 200
data = response.json()
assert data["agent_work_order_id"] == "wo-test123"
assert len(data["steps"]) == 2
assert data["steps"][0]["step"] == "create-branch"
assert data["steps"][0]["agent_name"] == "BranchCreator"
assert data["steps"][0]["success"] is True
assert data["steps"][1]["step"] == "planning"
assert data["steps"][1]["agent_name"] == "Planner"
def test_get_agent_work_order_steps_not_found():
"""Test getting step history for non-existent work order"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=None)
mock_repo.get_step_history = AsyncMock(return_value=None)
response = client.get("/api/agent-work-orders/wo-nonexistent/steps")
assert response.status_code == 404
data = response.json()
assert "not found" in data["detail"].lower()
def test_get_agent_work_order_steps_empty():
"""Test getting empty step history"""
from src.agent_work_orders.models import AgentWorkOrderState, StepHistory
step_history = StepHistory(agent_work_order_id="wo-test123", steps=[])
# Mock state for get() call
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"sandbox_type": SandboxType.GIT_BRANCH,
"github_issue_number": None,
"status": AgentWorkOrderStatus.PENDING,
"current_phase": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=(state, metadata))
mock_repo.get_step_history = AsyncMock(return_value=step_history)
response = client.get("/api/agent-work-orders/wo-test123/steps")
assert response.status_code == 200
data = response.json()
assert data["agent_work_order_id"] == "wo-test123"
assert len(data["steps"]) == 0

View File

@@ -0,0 +1,84 @@
"""Tests for Command Loader"""
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from src.agent_work_orders.command_loader.claude_command_loader import (
ClaudeCommandLoader,
)
from src.agent_work_orders.models import CommandNotFoundError
def test_load_command_success():
"""Test loading an existing command file"""
with TemporaryDirectory() as tmpdir:
# Create a test command file
commands_dir = Path(tmpdir) / "commands"
commands_dir.mkdir()
command_file = commands_dir / "agent_workflow_plan.md"
command_file.write_text("# Test Command\n\nThis is a test command.")
loader = ClaudeCommandLoader(str(commands_dir))
command_path = loader.load_command("agent_workflow_plan")
assert command_path == str(command_file)
assert Path(command_path).exists()
def test_load_command_not_found():
"""Test loading a non-existent command file"""
with TemporaryDirectory() as tmpdir:
commands_dir = Path(tmpdir) / "commands"
commands_dir.mkdir()
loader = ClaudeCommandLoader(str(commands_dir))
with pytest.raises(CommandNotFoundError) as exc_info:
loader.load_command("nonexistent_command")
assert "Command file not found" in str(exc_info.value)
def test_list_available_commands():
"""Test listing all available commands"""
with TemporaryDirectory() as tmpdir:
commands_dir = Path(tmpdir) / "commands"
commands_dir.mkdir()
# Create multiple command files
(commands_dir / "agent_workflow_plan.md").write_text("Command 1")
(commands_dir / "agent_workflow_build.md").write_text("Command 2")
(commands_dir / "agent_workflow_test.md").write_text("Command 3")
loader = ClaudeCommandLoader(str(commands_dir))
commands = loader.list_available_commands()
assert len(commands) == 3
assert "agent_workflow_plan" in commands
assert "agent_workflow_build" in commands
assert "agent_workflow_test" in commands
def test_list_available_commands_empty_directory():
"""Test listing commands when directory is empty"""
with TemporaryDirectory() as tmpdir:
commands_dir = Path(tmpdir) / "commands"
commands_dir.mkdir()
loader = ClaudeCommandLoader(str(commands_dir))
commands = loader.list_available_commands()
assert len(commands) == 0
def test_list_available_commands_nonexistent_directory():
"""Test listing commands when directory doesn't exist"""
with TemporaryDirectory() as tmpdir:
nonexistent_dir = Path(tmpdir) / "nonexistent"
loader = ClaudeCommandLoader(str(nonexistent_dir))
commands = loader.list_available_commands()
assert len(commands) == 0
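# --- Illustrative sketch (not part of this commit) ---------------------------
# Based on the assertions above, a minimal ClaudeCommandLoader could look like
# the sketch below: commands are <name>.md files in a directory, load_command
# returns the path or raises CommandNotFoundError, and list_available_commands
# tolerates an empty or missing directory. Class and exception definitions here
# are standalone stand-ins, not the real models/loader modules.
from pathlib import Path


class CommandNotFoundErrorSketch(Exception):
    pass


class CommandLoaderSketch:
    def __init__(self, commands_directory: str):
        self._directory = Path(commands_directory)

    def load_command(self, command_name: str) -> str:
        command_path = self._directory / f"{command_name}.md"
        if not command_path.exists():
            raise CommandNotFoundErrorSketch(f"Command file not found: {command_path}")
        return str(command_path)

    def list_available_commands(self) -> list[str]:
        if not self._directory.exists():
            return []
        return [path.stem for path in self._directory.glob("*.md")]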

View File

@@ -0,0 +1,187 @@
"""Tests for agent work orders configuration
Tests configuration loading, service discovery, and URL construction.
"""
import importlib
from unittest.mock import patch
import pytest
@pytest.mark.unit
def test_config_default_values():
"""Test configuration default values"""
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.CLAUDE_CLI_PATH == "claude"
assert config.GH_CLI_PATH == "gh"
assert config.EXECUTION_TIMEOUT == 3600
assert config.LOG_LEVEL == "INFO"
assert config.SERVICE_DISCOVERY_MODE == "local"
@pytest.mark.unit
@patch.dict("os.environ", {"SERVICE_DISCOVERY_MODE": "local"})
def test_config_local_service_discovery():
"""Test local service discovery mode"""
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.SERVICE_DISCOVERY_MODE == "local"
assert config.get_archon_server_url() == "http://localhost:8181"
assert config.get_archon_mcp_url() == "http://localhost:8051"
@pytest.mark.unit
@patch.dict("os.environ", {"SERVICE_DISCOVERY_MODE": "docker_compose"})
def test_config_docker_service_discovery():
"""Test docker_compose service discovery mode"""
import src.agent_work_orders.config as config_module
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.SERVICE_DISCOVERY_MODE == "docker_compose"
assert config.get_archon_server_url() == "http://archon-server:8181"
assert config.get_archon_mcp_url() == "http://archon-mcp:8051"
@pytest.mark.unit
@patch.dict("os.environ", {"ARCHON_SERVER_URL": "http://custom-server:9999"})
def test_config_explicit_server_url_override():
"""Test explicit ARCHON_SERVER_URL overrides service discovery"""
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.get_archon_server_url() == "http://custom-server:9999"
@pytest.mark.unit
@patch.dict("os.environ", {"ARCHON_MCP_URL": "http://custom-mcp:7777"})
def test_config_explicit_mcp_url_override():
"""Test explicit ARCHON_MCP_URL overrides service discovery"""
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.get_archon_mcp_url() == "http://custom-mcp:7777"
@pytest.mark.unit
@patch.dict("os.environ", {"CLAUDE_CLI_PATH": "/custom/path/to/claude"})
def test_config_claude_cli_path_override():
"""Test CLAUDE_CLI_PATH can be overridden"""
import src.agent_work_orders.config as config_module
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.CLAUDE_CLI_PATH == "/custom/path/to/claude"
@pytest.mark.unit
@patch.dict("os.environ", {"LOG_LEVEL": "DEBUG"})
def test_config_log_level_override():
"""Test LOG_LEVEL can be overridden"""
import src.agent_work_orders.config as config_module
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.LOG_LEVEL == "DEBUG"
@pytest.mark.unit
@patch.dict("os.environ", {"CORS_ORIGINS": "http://example.com,http://test.com"})
def test_config_cors_origins_override():
"""Test CORS_ORIGINS can be overridden"""
import src.agent_work_orders.config as config_module
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.CORS_ORIGINS == "http://example.com,http://test.com"
@pytest.mark.unit
def test_config_ensure_temp_dir(tmp_path):
"""Test ensure_temp_dir creates directory"""
import src.agent_work_orders.config as config_module
# Use tmp_path for testing
test_temp_dir = str(tmp_path / "test-agent-work-orders")
with patch.dict("os.environ", {"AGENT_WORK_ORDER_TEMP_DIR": test_temp_dir}):
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
temp_dir = config.ensure_temp_dir()
assert temp_dir.exists()
assert temp_dir.is_dir()
assert str(temp_dir) == test_temp_dir
@pytest.mark.unit
@patch.dict(
"os.environ",
{
"SERVICE_DISCOVERY_MODE": "docker_compose",
"ARCHON_SERVER_URL": "http://explicit-server:8888",
},
)
def test_config_explicit_url_overrides_discovery_mode():
"""Test explicit URL takes precedence over service discovery mode"""
import src.agent_work_orders.config as config_module
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
# Even in docker_compose mode, explicit URL should win
assert config.SERVICE_DISCOVERY_MODE == "docker_compose"
assert config.get_archon_server_url() == "http://explicit-server:8888"
@pytest.mark.unit
def test_config_state_storage_type():
"""Test STATE_STORAGE_TYPE configuration"""
import os
# Temporarily set the environment variable
old_value = os.environ.get("STATE_STORAGE_TYPE")
os.environ["STATE_STORAGE_TYPE"] = "file"
try:
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.STATE_STORAGE_TYPE == "file"
finally:
# Restore old value
if old_value is None:
os.environ.pop("STATE_STORAGE_TYPE", None)
else:
os.environ["STATE_STORAGE_TYPE"] = old_value
@pytest.mark.unit
@patch.dict("os.environ", {"FILE_STATE_DIRECTORY": "/custom/state/dir"})
def test_config_file_state_directory():
"""Test FILE_STATE_DIRECTORY configuration"""
import src.agent_work_orders.config as config_module
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig
config = AgentWorkOrdersConfig()
assert config.FILE_STATE_DIRECTORY == "/custom/state/dir"

View File

@@ -0,0 +1,203 @@
"""Tests for GitHub Integration"""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.agent_work_orders.github_integration.github_client import GitHubClient
from src.agent_work_orders.models import GitHubOperationError
@pytest.mark.asyncio
async def test_verify_repository_access_success():
"""Test successful repository verification"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(return_value=(b"Repository info", b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await client.verify_repository_access("https://github.com/owner/repo")
assert result is True
@pytest.mark.asyncio
async def test_verify_repository_access_failure():
"""Test failed repository verification"""
client = GitHubClient()
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: Not found"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await client.verify_repository_access("https://github.com/owner/nonexistent")
assert result is False
@pytest.mark.asyncio
async def test_get_repository_info_success():
"""Test getting repository information"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_output = b'{"name": "repo", "owner": {"login": "owner"}, "defaultBranchRef": {"name": "main"}}'
mock_process.communicate = AsyncMock(return_value=(mock_output, b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
repo_info = await client.get_repository_info("https://github.com/owner/repo")
assert repo_info.name == "repo"
assert repo_info.owner == "owner"
assert repo_info.default_branch == "main"
assert repo_info.url == "https://github.com/owner/repo"
@pytest.mark.asyncio
async def test_get_repository_info_failure():
"""Test failed repository info retrieval"""
client = GitHubClient()
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: Not found"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(GitHubOperationError):
await client.get_repository_info("https://github.com/owner/nonexistent")
@pytest.mark.asyncio
async def test_create_pull_request_success():
"""Test successful PR creation"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(
return_value=(b"https://github.com/owner/repo/pull/42", b"")
)
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
pr = await client.create_pull_request(
repository_url="https://github.com/owner/repo",
head_branch="feat-wo-test123",
base_branch="main",
title="Test PR",
body="PR body",
)
assert pr.pull_request_url == "https://github.com/owner/repo/pull/42"
assert pr.pull_request_number == 42
assert pr.title == "Test PR"
assert pr.head_branch == "feat-wo-test123"
assert pr.base_branch == "main"
@pytest.mark.asyncio
async def test_create_pull_request_failure():
"""Test failed PR creation"""
client = GitHubClient()
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: PR creation failed"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(GitHubOperationError):
await client.create_pull_request(
repository_url="https://github.com/owner/repo",
head_branch="feat-wo-test123",
base_branch="main",
title="Test PR",
body="PR body",
)
def test_parse_repository_url_https():
"""Test parsing HTTPS repository URL"""
client = GitHubClient()
owner, repo = client._parse_repository_url("https://github.com/owner/repo")
assert owner == "owner"
assert repo == "repo"
def test_parse_repository_url_https_with_git():
"""Test parsing HTTPS repository URL with .git"""
client = GitHubClient()
owner, repo = client._parse_repository_url("https://github.com/owner/repo.git")
assert owner == "owner"
assert repo == "repo"
def test_parse_repository_url_short_format():
"""Test parsing short format repository URL"""
client = GitHubClient()
owner, repo = client._parse_repository_url("owner/repo")
assert owner == "owner"
assert repo == "repo"
def test_parse_repository_url_invalid():
"""Test parsing invalid repository URL"""
client = GitHubClient()
with pytest.raises(ValueError):
client._parse_repository_url("invalid-url")
with pytest.raises(ValueError):
client._parse_repository_url("owner/repo/extra")
@pytest.mark.asyncio
async def test_get_issue_success():
"""Test successful GitHub issue fetch"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
issue_json = json.dumps({
"number": 42,
"title": "Add login feature",
"body": "Users need to log in with email and password",
"state": "open",
"url": "https://github.com/owner/repo/issues/42"
})
mock_process.communicate = AsyncMock(return_value=(issue_json.encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
issue_data = await client.get_issue("https://github.com/owner/repo", "42")
assert issue_data["number"] == 42
assert issue_data["title"] == "Add login feature"
assert issue_data["state"] == "open"
@pytest.mark.asyncio
async def test_get_issue_failure():
"""Test failed GitHub issue fetch"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Issue not found"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(GitHubOperationError, match="Failed to fetch issue"):
await client.get_issue("https://github.com/owner/repo", "999")

View File

@@ -0,0 +1,32 @@
"""Tests for ID Generator"""
from src.agent_work_orders.utils.id_generator import (
generate_sandbox_identifier,
generate_work_order_id,
)
def test_generate_work_order_id_format():
"""Test work order ID format"""
work_order_id = generate_work_order_id()
assert work_order_id.startswith("wo-")
assert len(work_order_id) == 11 # "wo-" + 8 hex chars
# Verify it's hex
hex_part = work_order_id[3:]
assert all(c in "0123456789abcdef" for c in hex_part)
def test_generate_work_order_id_uniqueness():
"""Test that generated IDs are unique"""
ids = [generate_work_order_id() for _ in range(100)]
assert len(ids) == len(set(ids)) # All unique
def test_generate_sandbox_identifier():
"""Test sandbox identifier generation"""
work_order_id = "wo-test123"
sandbox_id = generate_sandbox_identifier(work_order_id)
assert sandbox_id == "sandbox-wo-test123"
assert sandbox_id.startswith("sandbox-")

View File

@@ -0,0 +1,309 @@
"""Unit tests for WorkOrderLogBuffer
Tests circular buffer behavior, filtering, thread safety, and cleanup.
"""
import threading
import time
from datetime import datetime
import pytest
from src.agent_work_orders.utils.log_buffer import WorkOrderLogBuffer
@pytest.mark.unit
def test_add_and_get_logs():
"""Test adding and retrieving logs"""
buffer = WorkOrderLogBuffer()
# Add logs
buffer.add_log("wo-123", "info", "step_started", step="planning")
buffer.add_log("wo-123", "info", "step_completed", step="planning", duration=12.5)
# Get all logs
logs = buffer.get_logs("wo-123")
assert len(logs) == 2
assert logs[0]["event"] == "step_started"
assert logs[0]["step"] == "planning"
assert logs[1]["event"] == "step_completed"
assert logs[1]["duration"] == 12.5
@pytest.mark.unit
def test_circular_buffer_overflow():
"""Test that buffer keeps only last MAX_LOGS_PER_WORK_ORDER logs"""
buffer = WorkOrderLogBuffer()
# Add more logs than max capacity
for i in range(1500):
buffer.add_log("wo-123", "info", f"event_{i}", index=i)
logs = buffer.get_logs("wo-123")
# Should only have last 1000
assert len(logs) == buffer.MAX_LOGS_PER_WORK_ORDER
# First log should be index 500 (1500 - 1000)
assert logs[0]["index"] == 500
# Last log should be index 1499
assert logs[-1]["index"] == 1499
@pytest.mark.unit
def test_filter_by_level():
"""Test filtering logs by log level"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "info_event")
buffer.add_log("wo-123", "warning", "warning_event")
buffer.add_log("wo-123", "error", "error_event")
buffer.add_log("wo-123", "info", "another_info_event")
# Filter by level (case-insensitive)
info_logs = buffer.get_logs("wo-123", level="info")
assert len(info_logs) == 2
assert all(log["level"] == "info" for log in info_logs)
error_logs = buffer.get_logs("wo-123", level="error")
assert len(error_logs) == 1
assert error_logs[0]["event"] == "error_event"
# Test case insensitivity
warning_logs = buffer.get_logs("wo-123", level="WARNING")
assert len(warning_logs) == 1
@pytest.mark.unit
def test_filter_by_step():
"""Test filtering logs by step name"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1", step="planning")
buffer.add_log("wo-123", "info", "event2", step="execute")
buffer.add_log("wo-123", "info", "event3", step="planning")
planning_logs = buffer.get_logs("wo-123", step="planning")
assert len(planning_logs) == 2
assert all(log["step"] == "planning" for log in planning_logs)
execute_logs = buffer.get_logs("wo-123", step="execute")
assert len(execute_logs) == 1
@pytest.mark.unit
def test_filter_by_timestamp():
"""Test filtering logs by timestamp"""
buffer = WorkOrderLogBuffer()
# Add logs with explicit timestamps
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
ts3 = "2025-10-23T12:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1)
buffer.add_log("wo-123", "info", "event2", timestamp=ts2)
buffer.add_log("wo-123", "info", "event3", timestamp=ts3)
# Get logs since 11:00
recent_logs = buffer.get_logs("wo-123", since=ts2)
assert len(recent_logs) == 1 # Only ts3 is after ts2
assert recent_logs[0]["event"] == "event3"
@pytest.mark.unit
def test_multiple_work_orders():
"""Test that logs from different work orders are isolated"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1")
buffer.add_log("wo-456", "info", "event2")
buffer.add_log("wo-123", "info", "event3")
logs_123 = buffer.get_logs("wo-123")
logs_456 = buffer.get_logs("wo-456")
assert len(logs_123) == 2
assert len(logs_456) == 1
assert all(log["work_order_id"] == "wo-123" for log in logs_123)
assert all(log["work_order_id"] == "wo-456" for log in logs_456)
@pytest.mark.unit
def test_clear_work_order():
"""Test clearing logs for a specific work order"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1")
buffer.add_log("wo-456", "info", "event2")
assert buffer.get_log_count("wo-123") == 1
assert buffer.get_log_count("wo-456") == 1
buffer.clear_work_order("wo-123")
assert buffer.get_log_count("wo-123") == 0
assert buffer.get_log_count("wo-456") == 1 # Other work order unaffected
@pytest.mark.unit
def test_thread_safety():
"""Test concurrent adds from multiple threads"""
buffer = WorkOrderLogBuffer()
num_threads = 10
logs_per_thread = 100
def add_logs(thread_id):
for i in range(logs_per_thread):
buffer.add_log("wo-123", "info", f"thread_{thread_id}_event_{i}")
threads = [threading.Thread(target=add_logs, args=(i,)) for i in range(num_threads)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# Should have all logs (or max capacity if exceeded)
logs = buffer.get_logs("wo-123")
expected = min(num_threads * logs_per_thread, buffer.MAX_LOGS_PER_WORK_ORDER)
assert len(logs) == expected
@pytest.mark.unit
def test_cleanup_old_work_orders():
"""Test automatic cleanup of old work orders"""
buffer = WorkOrderLogBuffer()
# Add logs for work orders
buffer.add_log("wo-old", "info", "event1")
buffer.add_log("wo-new", "info", "event2")
# Manually set old work order's last activity to past threshold
threshold_time = time.time() - (buffer.CLEANUP_THRESHOLD_HOURS * 3600 + 100)
buffer._last_activity["wo-old"] = threshold_time
# Run cleanup
removed = buffer.cleanup_old_work_orders()
assert removed == 1
assert buffer.get_log_count("wo-old") == 0
assert buffer.get_log_count("wo-new") == 1
@pytest.mark.unit
def test_get_logs_with_pagination():
"""Test pagination with limit and offset"""
buffer = WorkOrderLogBuffer()
for i in range(50):
buffer.add_log("wo-123", "info", f"event_{i}", index=i)
# Get first page
page1 = buffer.get_logs("wo-123", limit=10, offset=0)
assert len(page1) == 10
assert page1[0]["index"] == 0
# Get second page
page2 = buffer.get_logs("wo-123", limit=10, offset=10)
assert len(page2) == 10
assert page2[0]["index"] == 10
# Get partial last page
page_last = buffer.get_logs("wo-123", limit=10, offset=45)
assert len(page_last) == 5
@pytest.mark.unit
def test_get_logs_since_convenience_method():
"""Test get_logs_since convenience method"""
buffer = WorkOrderLogBuffer()
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1, step="planning")
buffer.add_log("wo-123", "info", "event2", timestamp=ts2, step="execute")
logs = buffer.get_logs_since("wo-123", ts1, step="execute")
assert len(logs) == 1
assert logs[0]["event"] == "event2"
@pytest.mark.unit
def test_get_work_order_count():
"""Test getting count of tracked work orders"""
buffer = WorkOrderLogBuffer()
assert buffer.get_work_order_count() == 0
buffer.add_log("wo-123", "info", "event1")
assert buffer.get_work_order_count() == 1
buffer.add_log("wo-456", "info", "event2")
assert buffer.get_work_order_count() == 2
buffer.clear_work_order("wo-123")
assert buffer.get_work_order_count() == 1
@pytest.mark.unit
def test_empty_buffer_returns_empty_list():
"""Test that getting logs from empty buffer returns empty list"""
buffer = WorkOrderLogBuffer()
logs = buffer.get_logs("wo-nonexistent")
assert logs == []
assert buffer.get_log_count("wo-nonexistent") == 0
@pytest.mark.unit
def test_timestamp_auto_generation():
"""Test that timestamps are auto-generated if not provided"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1")
logs = buffer.get_logs("wo-123")
assert len(logs) == 1
assert "timestamp" in logs[0]
# Verify it's a valid ISO format timestamp
datetime.fromisoformat(logs[0]["timestamp"].replace("Z", "+00:00"))
@pytest.mark.unit
@pytest.mark.asyncio
async def test_cleanup_task_lifecycle():
"""Test starting and stopping cleanup task"""
buffer = WorkOrderLogBuffer()
# Start cleanup task
await buffer.start_cleanup_task(interval_seconds=1)
assert buffer._cleanup_task is not None
# Starting again should be idempotent
await buffer.start_cleanup_task()
assert buffer._cleanup_task is not None
# Stop cleanup task
await buffer.stop_cleanup_task()
assert buffer._cleanup_task is None
@pytest.mark.unit
def test_combined_filters():
"""Test using multiple filters together"""
buffer = WorkOrderLogBuffer()
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1, step="planning")
buffer.add_log("wo-123", "error", "event2", timestamp=ts2, step="planning")
buffer.add_log("wo-123", "info", "event3", timestamp=ts2, step="execute")
# Filter by level AND step AND timestamp
logs = buffer.get_logs("wo-123", level="info", step="execute", since=ts1)
assert len(logs) == 1
assert logs[0]["event"] == "event3"

View File

@@ -0,0 +1,294 @@
"""Tests for Agent Work Orders Models"""
from datetime import datetime
from src.agent_work_orders.models import (
AgentWorkflowPhase,
AgentWorkflowType,
AgentWorkOrder,
AgentWorkOrderState,
AgentWorkOrderStatus,
CommandExecutionResult,
CreateAgentWorkOrderRequest,
SandboxType,
StepExecutionResult,
StepHistory,
WorkflowStep,
)
def test_agent_work_order_status_enum():
"""Test AgentWorkOrderStatus enum values"""
assert AgentWorkOrderStatus.PENDING.value == "pending"
assert AgentWorkOrderStatus.RUNNING.value == "running"
assert AgentWorkOrderStatus.COMPLETED.value == "completed"
assert AgentWorkOrderStatus.FAILED.value == "failed"
def test_agent_workflow_type_enum():
"""Test AgentWorkflowType enum values"""
assert AgentWorkflowType.PLAN.value == "agent_workflow_plan"
def test_sandbox_type_enum():
"""Test SandboxType enum values"""
assert SandboxType.GIT_BRANCH.value == "git_branch"
assert SandboxType.GIT_WORKTREE.value == "git_worktree"
assert SandboxType.E2B.value == "e2b"
assert SandboxType.DAGGER.value == "dagger"
def test_agent_workflow_phase_enum():
"""Test AgentWorkflowPhase enum values"""
assert AgentWorkflowPhase.PLANNING.value == "planning"
assert AgentWorkflowPhase.COMPLETED.value == "completed"
def test_agent_work_order_state_creation():
"""Test creating AgentWorkOrderState"""
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
assert state.agent_work_order_id == "wo-test123"
assert state.repository_url == "https://github.com/owner/repo"
assert state.sandbox_identifier == "sandbox-wo-test123"
assert state.git_branch_name is None
assert state.agent_session_id is None
def test_agent_work_order_creation():
"""Test creating complete AgentWorkOrder"""
now = datetime.now()
work_order = AgentWorkOrder(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
sandbox_type=SandboxType.GIT_BRANCH,
github_issue_number="42",
status=AgentWorkOrderStatus.RUNNING,
current_phase=AgentWorkflowPhase.PLANNING,
created_at=now,
updated_at=now,
github_pull_request_url=None,
git_commit_count=0,
git_files_changed=0,
error_message=None,
)
assert work_order.agent_work_order_id == "wo-test123"
assert work_order.sandbox_type == SandboxType.GIT_BRANCH
assert work_order.status == AgentWorkOrderStatus.RUNNING
assert work_order.current_phase == AgentWorkflowPhase.PLANNING
def test_create_agent_work_order_request():
"""Test CreateAgentWorkOrderRequest validation"""
request = CreateAgentWorkOrderRequest(
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Add user authentication feature",
github_issue_number="42",
)
assert request.repository_url == "https://github.com/owner/repo"
assert request.sandbox_type == SandboxType.GIT_BRANCH
assert request.user_request == "Add user authentication feature"
assert request.github_issue_number == "42"
assert request.selected_commands == ["create-branch", "planning", "execute", "commit", "create-pr"]
def test_create_agent_work_order_request_optional_fields():
"""Test CreateAgentWorkOrderRequest with optional fields"""
request = CreateAgentWorkOrderRequest(
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Fix the login bug",
)
assert request.user_request == "Fix the login bug"
assert request.github_issue_number is None
assert request.selected_commands == ["create-branch", "planning", "execute", "commit", "create-pr"]
def test_create_agent_work_order_request_with_user_request():
"""Test CreateAgentWorkOrderRequest with user_request field"""
request = CreateAgentWorkOrderRequest(
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Add user authentication with JWT tokens",
)
assert request.user_request == "Add user authentication with JWT tokens"
assert request.repository_url == "https://github.com/owner/repo"
assert request.github_issue_number is None
assert request.selected_commands == ["create-branch", "planning", "execute", "commit", "create-pr"]
def test_create_agent_work_order_request_with_github_issue():
"""Test CreateAgentWorkOrderRequest with both user_request and issue number"""
request = CreateAgentWorkOrderRequest(
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Implement the feature described in issue #42",
github_issue_number="42",
)
assert request.user_request == "Implement the feature described in issue #42"
assert request.github_issue_number == "42"
assert request.selected_commands == ["create-branch", "planning", "execute", "commit", "create-pr"]
def test_workflow_step_enum():
"""Test WorkflowStep enum values"""
assert WorkflowStep.CREATE_BRANCH.value == "create-branch"
assert WorkflowStep.PLANNING.value == "planning"
assert WorkflowStep.EXECUTE.value == "execute"
assert WorkflowStep.COMMIT.value == "commit"
assert WorkflowStep.CREATE_PR.value == "create-pr"
assert WorkflowStep.REVIEW.value == "prp-review"
def test_step_execution_result_success():
"""Test creating successful StepExecutionResult"""
result = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/add-feature",
duration_seconds=1.5,
session_id="session-123",
)
assert result.step == WorkflowStep.CREATE_BRANCH
assert result.agent_name == "BranchCreator"
assert result.success is True
assert result.output == "feat/add-feature"
assert result.error_message is None
assert result.duration_seconds == 1.5
assert result.session_id == "session-123"
assert isinstance(result.timestamp, datetime)
def test_step_execution_result_failure():
"""Test creating failed StepExecutionResult"""
result = StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=False,
error_message="Planning failed: timeout",
duration_seconds=30.0,
)
assert result.step == WorkflowStep.PLANNING
assert result.agent_name == "Planner"
assert result.success is False
assert result.output is None
assert result.error_message == "Planning failed: timeout"
assert result.duration_seconds == 30.0
assert result.session_id is None
def test_step_history_creation():
"""Test creating StepHistory"""
history = StepHistory(agent_work_order_id="wo-test123", steps=[])
assert history.agent_work_order_id == "wo-test123"
assert len(history.steps) == 0
def test_step_history_with_steps():
"""Test StepHistory with multiple steps"""
step1 = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/add-feature",
duration_seconds=1.0,
)
step2 = StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=True,
output="PRPs/features/add-feature.md",
duration_seconds=5.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[step1, step2])
assert history.agent_work_order_id == "wo-test123"
assert len(history.steps) == 2
assert history.steps[0].step == WorkflowStep.CREATE_BRANCH
assert history.steps[1].step == WorkflowStep.PLANNING
def test_step_history_get_current_step_initial():
"""Test get_current_step returns CREATE_BRANCH when no steps"""
history = StepHistory(agent_work_order_id="wo-test123", steps=[])
assert history.get_current_step() == WorkflowStep.CREATE_BRANCH
def test_step_history_get_current_step_retry_failed():
"""Test get_current_step returns same step when failed"""
failed_step = StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=False,
error_message="Planning failed",
duration_seconds=5.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[failed_step])
assert history.get_current_step() == WorkflowStep.PLANNING
def test_step_history_get_current_step_next():
"""Test get_current_step returns next step after success"""
branch_step = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/add-feature",
duration_seconds=1.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[branch_step])
assert history.get_current_step() == WorkflowStep.PLANNING
def test_command_execution_result_with_result_text():
"""Test CommandExecutionResult includes result_text field"""
result = CommandExecutionResult(
success=True,
stdout='{"type":"result","result":"/feature"}',
result_text="/feature",
stderr=None,
exit_code=0,
session_id="session-123",
)
assert result.result_text == "/feature"
assert result.stdout == '{"type":"result","result":"/feature"}'
assert result.success is True
def test_command_execution_result_without_result_text():
"""Test CommandExecutionResult works without result_text (backward compatibility)"""
result = CommandExecutionResult(
success=True,
stdout="raw output",
stderr=None,
exit_code=0,
)
assert result.result_text is None
assert result.stdout == "raw output"

View File

@@ -0,0 +1,295 @@
"""Tests for Port Allocation with 10-Port Ranges"""
from unittest.mock import patch
import pytest
from src.agent_work_orders.utils.port_allocation import (
MAX_CONCURRENT_WORK_ORDERS,
PORT_BASE,
PORT_RANGE_SIZE,
create_ports_env_file,
find_available_port_range,
get_port_range_for_work_order,
is_port_available,
)
@pytest.mark.unit
def test_get_port_range_for_work_order_deterministic():
"""Test that same work order ID always gets same port range"""
work_order_id = "wo-abc123"
start1, end1 = get_port_range_for_work_order(work_order_id)
start2, end2 = get_port_range_for_work_order(work_order_id)
assert start1 == start2
assert end1 == end2
assert end1 - start1 + 1 == PORT_RANGE_SIZE # 10 ports
assert PORT_BASE <= start1 < PORT_BASE + (MAX_CONCURRENT_WORK_ORDERS * PORT_RANGE_SIZE)
@pytest.mark.unit
def test_get_port_range_for_work_order_size():
"""Test that port range is exactly 10 ports"""
work_order_id = "wo-test123"
start, end = get_port_range_for_work_order(work_order_id)
assert end - start + 1 == 10
@pytest.mark.unit
def test_get_port_range_for_work_order_uses_different_slots():
"""Test that the hash function can produce different slot assignments"""
# Create very different IDs that should hash to different values
ids = ["wo-aaaaaaaa", "wo-zzzzz999", "wo-12345678", "wo-abcdefgh", "wo-99999999"]
ranges = [get_port_range_for_work_order(wid) for wid in ids]
# Check all ranges are valid
for start, end in ranges:
assert end - start + 1 == 10
assert PORT_BASE <= start < PORT_BASE + (MAX_CONCURRENT_WORK_ORDERS * PORT_RANGE_SIZE)
# It's theoretically possible all hash to same slot, but unlikely with very different IDs
# The important thing is the function works, not that it always distributes perfectly
assert len(ranges) == 5 # We got 5 results
@pytest.mark.unit
def test_get_port_range_for_work_order_fallback_hash():
"""Test fallback to hash when base36 conversion fails"""
# Non-alphanumeric work order ID
work_order_id = "--------"
start, end = get_port_range_for_work_order(work_order_id)
# Should still work via hash fallback
assert end - start + 1 == 10
assert PORT_BASE <= start < PORT_BASE + (MAX_CONCURRENT_WORK_ORDERS * PORT_RANGE_SIZE)
@pytest.mark.unit
def test_is_port_available_mock_available():
"""Test port availability check when port is available"""
with patch("socket.socket") as mock_socket:
mock_socket_instance = mock_socket.return_value.__enter__.return_value
mock_socket_instance.bind.return_value = None # Successful bind
result = is_port_available(9000)
assert result is True
mock_socket_instance.bind.assert_called_once_with(('localhost', 9000))
@pytest.mark.unit
def test_is_port_available_mock_unavailable():
"""Test port availability check when port is unavailable"""
with patch("socket.socket") as mock_socket:
mock_socket_instance = mock_socket.return_value.__enter__.return_value
mock_socket_instance.bind.side_effect = OSError("Port in use")
result = is_port_available(9000)
assert result is False
@pytest.mark.unit
def test_find_available_port_range_all_available():
"""Test finding port range when all ports are available"""
work_order_id = "wo-test123"
# Mock all ports as available
with patch(
"src.agent_work_orders.utils.port_allocation.is_port_available",
return_value=True,
):
start, end, available = find_available_port_range(work_order_id)
# Should get the deterministic range
expected_start, expected_end = get_port_range_for_work_order(work_order_id)
assert start == expected_start
assert end == expected_end
assert len(available) == 10 # All 10 ports available
@pytest.mark.unit
def test_find_available_port_range_some_unavailable():
"""Test finding port range when some ports are unavailable"""
work_order_id = "wo-test123"
expected_start, expected_end = get_port_range_for_work_order(work_order_id)
# Mock: first, third, and fifth ports unavailable, rest available
def mock_availability(port):
offset = port - expected_start
return offset not in [0, 2, 4] # 7 out of 10 available
with patch(
"src.agent_work_orders.utils.port_allocation.is_port_available",
side_effect=mock_availability,
):
start, end, available = find_available_port_range(work_order_id)
# Should still use this range (>= 5 ports available)
assert start == expected_start
assert end == expected_end
assert len(available) == 7 # 7 ports available
@pytest.mark.unit
def test_find_available_port_range_fallback_to_next_slot():
"""Test fallback to next slot when first slot has too few ports"""
work_order_id = "wo-test123"
expected_start, expected_end = get_port_range_for_work_order(work_order_id)
# Mock: first slot has only 3 ports available (< 5 needed); all other slots are fully available
def mock_availability(port):
if expected_start <= port <= expected_end:
# First slot: only 3 available
offset = port - expected_start
return offset < 3
else:
# Other slots: all available
return True
with patch(
"src.agent_work_orders.utils.port_allocation.is_port_available",
side_effect=mock_availability,
):
start, end, available = find_available_port_range(work_order_id)
# Should use a different slot
assert (start, end) != (expected_start, expected_end)
assert len(available) >= 5 # At least half available
@pytest.mark.unit
def test_find_available_port_range_exhausted():
"""Test that RuntimeError is raised when all port ranges are exhausted"""
work_order_id = "wo-test123"
# Mock all ports as unavailable
with patch(
"src.agent_work_orders.utils.port_allocation.is_port_available",
return_value=False,
):
with pytest.raises(RuntimeError) as exc_info:
find_available_port_range(work_order_id)
assert "No suitable port range found" in str(exc_info.value)
@pytest.mark.unit
def test_create_ports_env_file(tmp_path):
"""Test creating .ports.env file with port range"""
worktree_path = str(tmp_path)
start_port = 9000
end_port = 9009
available_ports = list(range(9000, 9010)) # All 10 ports
create_ports_env_file(worktree_path, start_port, end_port, available_ports)
ports_env_path = tmp_path / ".ports.env"
assert ports_env_path.exists()
content = ports_env_path.read_text()
# Check range information
assert "PORT_RANGE_START=9000" in content
assert "PORT_RANGE_END=9009" in content
assert "PORT_RANGE_SIZE=10" in content
# Check individual ports
assert "PORT_0=9000" in content
assert "PORT_1=9001" in content
assert "PORT_9=9009" in content
# Check backward compatible aliases
assert "BACKEND_PORT=9000" in content
assert "FRONTEND_PORT=9001" in content
assert "VITE_BACKEND_URL=http://localhost:9000" in content
@pytest.mark.unit
def test_create_ports_env_file_partial_availability(tmp_path):
"""Test creating .ports.env with some ports unavailable"""
worktree_path = str(tmp_path)
start_port = 9000
end_port = 9009
# Only some ports available
available_ports = [9000, 9001, 9003, 9004, 9006, 9008, 9009] # 7 ports
create_ports_env_file(worktree_path, start_port, end_port, available_ports)
ports_env_path = tmp_path / ".ports.env"
content = ports_env_path.read_text()
# Range should still show full range
assert "PORT_RANGE_START=9000" in content
assert "PORT_RANGE_END=9009" in content
# But only available ports should be numbered
assert "PORT_0=9000" in content
assert "PORT_1=9001" in content
assert "PORT_2=9003" in content # Third available port is 9003
assert "PORT_6=9009" in content # Seventh available port is 9009
# Backward compatible aliases should use first two available
assert "BACKEND_PORT=9000" in content
assert "FRONTEND_PORT=9001" in content
@pytest.mark.unit
def test_create_ports_env_file_overwrites(tmp_path):
"""Test that creating .ports.env file overwrites existing file"""
worktree_path = str(tmp_path)
ports_env_path = tmp_path / ".ports.env"
# Create existing file with old content
ports_env_path.write_text("OLD_CONTENT=true\n")
# Create new file
create_ports_env_file(
worktree_path, 9000, 9009, list(range(9000, 9010))
)
content = ports_env_path.read_text()
assert "OLD_CONTENT" not in content
assert "PORT_RANGE_START=9000" in content
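# --------------------------------------------------------------------------
# Illustrative sketch (editor's note, not part of the module under test):
# the three .ports.env tests above pin down the file format -- range
# metadata, one PORT_<n> entry per available port, and backward-compatible
# BACKEND/FRONTEND aliases. A minimal writer producing that format might
# look like this (illustrative only; the real create_ports_env_file may add
# comments or extra variables):
from pathlib import Path


def _example_write_ports_env(worktree_path: str, start: int, end: int, available_ports: list[int]) -> None:
    lines = [
        f"PORT_RANGE_START={start}",
        f"PORT_RANGE_END={end}",
        f"PORT_RANGE_SIZE={end - start + 1}",
    ]
    # Number only the ports that are actually available
    lines += [f"PORT_{i}={port}" for i, port in enumerate(available_ports)]
    # Backward-compatible aliases use the first two available ports
    lines += [
        f"BACKEND_PORT={available_ports[0]}",
        f"FRONTEND_PORT={available_ports[1]}",
        f"VITE_BACKEND_URL=http://localhost:{available_ports[0]}",
    ]
    (Path(worktree_path) / ".ports.env").write_text("\n".join(lines) + "\n")
# --------------------------------------------------------------------------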
@pytest.mark.unit
def test_port_ranges_do_not_overlap():
"""Test that consecutive work order slots have non-overlapping port ranges"""
# Create work order IDs that will map to different slots
ids = [f"wo-{i:08x}" for i in range(5)] # Create 5 different IDs
ranges = [get_port_range_for_work_order(wid) for wid in ids]
# Check that ranges don't overlap
for i, (start1, end1) in enumerate(ranges):
for j, (start2, end2) in enumerate(ranges):
if i != j:
# Ranges should not overlap
overlaps = not (end1 < start2 or end2 < start1)
# If they overlap, they must be the same range (hash collision)
if overlaps:
assert start1 == start2 and end1 == end2
@pytest.mark.unit
def test_max_concurrent_work_orders():
"""Test that we support MAX_CONCURRENT_WORK_ORDERS distinct ranges"""
# Generate MAX_CONCURRENT_WORK_ORDERS + 1 IDs
ids = [f"wo-{i:08x}" for i in range(MAX_CONCURRENT_WORK_ORDERS + 1)]
ranges = [get_port_range_for_work_order(wid) for wid in ids]
unique_ranges = set(ranges)
# Should have at most MAX_CONCURRENT_WORK_ORDERS unique ranges
assert len(unique_ranges) <= MAX_CONCURRENT_WORK_ORDERS
# And they should all fit within the allocated port space
for start, end in unique_ranges:
assert PORT_BASE <= start < PORT_BASE + (MAX_CONCURRENT_WORK_ORDERS * PORT_RANGE_SIZE)
assert PORT_BASE < end <= PORT_BASE + (MAX_CONCURRENT_WORK_ORDERS * PORT_RANGE_SIZE)
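# --------------------------------------------------------------------------
# Illustrative sketch (editor's note, not part of the module under test):
# the range tests above constrain get_port_range_for_work_order to be
# deterministic, slot-based, 10 ports wide, and to fall back to a hash for
# IDs that are not base36-parseable. A minimal function with that behavior
# is sketched below; the constant values and the base36 detail are
# assumptions -- only PORT_RANGE_SIZE == 10 is asserted directly.
_EXAMPLE_PORT_BASE = 9000            # assumed default, mirrors PORT_BASE
_EXAMPLE_MAX_WORK_ORDERS = 20        # assumed, mirrors MAX_CONCURRENT_WORK_ORDERS
_EXAMPLE_RANGE_SIZE = 10             # asserted by the tests


def _example_port_range(work_order_id: str) -> tuple[int, int]:
    """Map a work order ID to a deterministic 10-port slot (illustrative)."""
    suffix = work_order_id.removeprefix("wo-")
    try:
        value = int(suffix, 36)                # base36 reading of the ID suffix
    except ValueError:
        value = abs(hash(work_order_id))       # fallback; stable within a process
    slot = value % _EXAMPLE_MAX_WORK_ORDERS
    start = _EXAMPLE_PORT_BASE + slot * _EXAMPLE_RANGE_SIZE
    return start, start + _EXAMPLE_RANGE_SIZE - 1
# --------------------------------------------------------------------------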

View File

@@ -0,0 +1,454 @@
"""Unit Tests for RepositoryConfigRepository
Tests all CRUD operations for configured repositories.
"""
import pytest
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch
from src.agent_work_orders.models import ConfiguredRepository, SandboxType, WorkflowStep
from src.agent_work_orders.state_manager.repository_config_repository import RepositoryConfigRepository
@pytest.fixture
def mock_supabase_client():
"""Mock Supabase client with chainable methods"""
mock = MagicMock()
# Set up method chaining: table().select().order().execute()
mock.table.return_value = mock
mock.select.return_value = mock
mock.order.return_value = mock
mock.insert.return_value = mock
mock.update.return_value = mock
mock.delete.return_value = mock
mock.eq.return_value = mock
# Execute returns response with data attribute
mock.execute.return_value = MagicMock(data=[])
return mock
@pytest.fixture
def repository_instance(mock_supabase_client):
"""Create RepositoryConfigRepository instance with mocked client"""
with patch('src.agent_work_orders.state_manager.repository_config_repository.get_supabase_client', return_value=mock_supabase_client):
return RepositoryConfigRepository()
@pytest.mark.unit
@pytest.mark.asyncio
async def test_list_repositories_returns_all_repositories(repository_instance, mock_supabase_client):
"""Test listing all repositories"""
# Mock response data
mock_data = [
{
"id": "repo-1",
"repository_url": "https://github.com/test/repo1",
"display_name": "test/repo1",
"owner": "test",
"default_branch": "main",
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "git_worktree",
"default_commands": ["create-branch", "planning", "execute"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}
]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
# Call method
repositories = await repository_instance.list_repositories()
# Assertions
assert len(repositories) == 1
assert isinstance(repositories[0], ConfiguredRepository)
assert repositories[0].id == "repo-1"
assert repositories[0].repository_url == "https://github.com/test/repo1"
# Verify Supabase client methods called correctly
mock_supabase_client.table.assert_called_once_with("archon_configured_repositories")
mock_supabase_client.select.assert_called_once()
@pytest.mark.unit
@pytest.mark.asyncio
async def test_list_repositories_with_empty_result(repository_instance, mock_supabase_client):
"""Test listing repositories when database is empty"""
mock_supabase_client.execute.return_value = MagicMock(data=[])
repositories = await repository_instance.list_repositories()
assert repositories == []
assert isinstance(repositories, list)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_get_repository_success(repository_instance, mock_supabase_client):
"""Test getting a single repository by ID"""
mock_data = [{
"id": "repo-1",
"repository_url": "https://github.com/test/repo1",
"display_name": "test/repo1",
"owner": "test",
"default_branch": "main",
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "git_worktree",
"default_commands": ["create-branch", "planning"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
repository = await repository_instance.get_repository("repo-1")
assert repository is not None
assert isinstance(repository, ConfiguredRepository)
assert repository.id == "repo-1"
mock_supabase_client.eq.assert_called_with("id", "repo-1")
@pytest.mark.unit
@pytest.mark.asyncio
async def test_get_repository_not_found(repository_instance, mock_supabase_client):
"""Test getting a repository that doesn't exist"""
mock_supabase_client.execute.return_value = MagicMock(data=[])
repository = await repository_instance.get_repository("nonexistent-id")
assert repository is None
@pytest.mark.unit
@pytest.mark.asyncio
async def test_create_repository_success(repository_instance, mock_supabase_client):
"""Test creating a new repository"""
mock_data = [{
"id": "new-repo-id",
"repository_url": "https://github.com/test/newrepo",
"display_name": "test/newrepo",
"owner": "test",
"default_branch": "main",
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "git_worktree",
"default_commands": ["create-branch", "planning", "execute", "commit", "create-pr"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
repository = await repository_instance.create_repository(
repository_url="https://github.com/test/newrepo",
display_name="test/newrepo",
owner="test",
default_branch="main",
is_verified=True,
)
assert repository is not None
assert repository.id == "new-repo-id"
assert repository.repository_url == "https://github.com/test/newrepo"
assert repository.is_verified is True
mock_supabase_client.insert.assert_called_once()
@pytest.mark.unit
@pytest.mark.asyncio
async def test_create_repository_with_verification(repository_instance, mock_supabase_client):
"""Test creating a repository with is_verified=True sets last_verified_at"""
mock_data = [{
"id": "verified-repo",
"repository_url": "https://github.com/test/verified",
"display_name": None,
"owner": None,
"default_branch": None,
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "git_worktree",
"default_commands": ["create-branch", "planning", "execute", "commit", "create-pr"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
repository = await repository_instance.create_repository(
repository_url="https://github.com/test/verified",
is_verified=True,
)
assert repository.is_verified is True
assert repository.last_verified_at is not None
@pytest.mark.unit
@pytest.mark.asyncio
async def test_update_repository_success(repository_instance, mock_supabase_client):
"""Test updating a repository"""
mock_data = [{
"id": "repo-1",
"repository_url": "https://github.com/test/repo1",
"display_name": "test/repo1",
"owner": "test",
"default_branch": "main",
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "git_branch", # Updated value (valid enum)
"default_commands": ["create-branch", "execute"], # Updated value
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
repository = await repository_instance.update_repository(
"repo-1",
default_sandbox_type=SandboxType.GIT_BRANCH,
default_commands=[WorkflowStep.CREATE_BRANCH, WorkflowStep.EXECUTE],
)
assert repository is not None
assert repository.id == "repo-1"
mock_supabase_client.update.assert_called_once()
mock_supabase_client.eq.assert_called_with("id", "repo-1")
@pytest.mark.unit
@pytest.mark.asyncio
async def test_update_repository_not_found(repository_instance, mock_supabase_client):
"""Test updating a repository that doesn't exist"""
mock_supabase_client.execute.return_value = MagicMock(data=[])
repository = await repository_instance.update_repository(
"nonexistent-id",
default_sandbox_type=SandboxType.GIT_WORKTREE,
)
assert repository is None
@pytest.mark.unit
@pytest.mark.asyncio
async def test_delete_repository_success(repository_instance, mock_supabase_client):
"""Test deleting a repository"""
mock_data = [{"id": "repo-1"}] # Supabase returns deleted row
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
deleted = await repository_instance.delete_repository("repo-1")
assert deleted is True
mock_supabase_client.delete.assert_called_once()
mock_supabase_client.eq.assert_called_with("id", "repo-1")
@pytest.mark.unit
@pytest.mark.asyncio
async def test_delete_repository_not_found(repository_instance, mock_supabase_client):
"""Test deleting a repository that doesn't exist"""
mock_supabase_client.execute.return_value = MagicMock(data=[])
deleted = await repository_instance.delete_repository("nonexistent-id")
assert deleted is False
# =====================================================
# Additional Error Handling Tests
# =====================================================
@pytest.mark.unit
@pytest.mark.asyncio
async def test_row_to_model_with_invalid_workflow_step(repository_instance):
"""Test _row_to_model raises ValueError for invalid workflow step"""
invalid_row = {
"id": "test-id",
"repository_url": "https://github.com/test/repo",
"display_name": "test/repo",
"owner": "test",
"default_branch": "main",
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "git_worktree",
"default_commands": ["invalid-command", "planning"], # Invalid command
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}
with pytest.raises(ValueError) as exc_info:
repository_instance._row_to_model(invalid_row)
assert "invalid workflow steps" in str(exc_info.value).lower()
assert "test-id" in str(exc_info.value)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_row_to_model_with_invalid_sandbox_type(repository_instance):
"""Test _row_to_model raises ValueError for invalid sandbox type"""
invalid_row = {
"id": "test-id",
"repository_url": "https://github.com/test/repo",
"display_name": "test/repo",
"owner": "test",
"default_branch": "main",
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "invalid_type", # Invalid type
"default_commands": ["create-branch", "planning"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}
with pytest.raises(ValueError) as exc_info:
repository_instance._row_to_model(invalid_row)
assert "invalid sandbox type" in str(exc_info.value).lower()
assert "test-id" in str(exc_info.value)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_create_repository_with_all_fields(repository_instance, mock_supabase_client):
"""Test creating a repository with all optional fields populated"""
mock_data = [{
"id": "full-repo-id",
"repository_url": "https://github.com/test/fullrepo",
"display_name": "test/fullrepo",
"owner": "test",
"default_branch": "develop",
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "git_worktree",
"default_commands": ["create-branch", "planning", "execute", "commit", "create-pr"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
repository = await repository_instance.create_repository(
repository_url="https://github.com/test/fullrepo",
display_name="test/fullrepo",
owner="test",
default_branch="develop",
is_verified=True,
)
assert repository.id == "full-repo-id"
assert repository.display_name == "test/fullrepo"
assert repository.owner == "test"
assert repository.default_branch == "develop"
assert repository.is_verified is True
assert repository.last_verified_at is not None
@pytest.mark.unit
@pytest.mark.asyncio
async def test_update_repository_with_multiple_fields(repository_instance, mock_supabase_client):
"""Test updating repository with multiple fields at once"""
mock_data = [{
"id": "repo-1",
"repository_url": "https://github.com/test/repo1",
"display_name": "updated-name",
"owner": "updated-owner",
"default_branch": "updated-branch",
"is_verified": True,
"last_verified_at": datetime.now().isoformat(),
"default_sandbox_type": "git_worktree",
"default_commands": ["create-branch"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
repository = await repository_instance.update_repository(
"repo-1",
display_name="updated-name",
owner="updated-owner",
default_branch="updated-branch",
is_verified=True,
)
assert repository is not None
assert repository.display_name == "updated-name"
assert repository.owner == "updated-owner"
assert repository.default_branch == "updated-branch"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_list_repositories_with_multiple_items(repository_instance, mock_supabase_client):
"""Test listing multiple repositories"""
mock_data = [
{
"id": f"repo-{i}",
"repository_url": f"https://github.com/test/repo{i}",
"display_name": f"test/repo{i}",
"owner": "test",
"default_branch": "main",
"is_verified": i % 2 == 0, # Alternate verified status
"last_verified_at": datetime.now().isoformat() if i % 2 == 0 else None,
"default_sandbox_type": "git_worktree",
"default_commands": ["create-branch", "planning", "execute"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}
for i in range(5)
]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
repositories = await repository_instance.list_repositories()
assert len(repositories) == 5
assert all(isinstance(repo, ConfiguredRepository) for repo in repositories)
# Check verification status alternates
assert repositories[0].is_verified is True
assert repositories[1].is_verified is False
@pytest.mark.unit
@pytest.mark.asyncio
async def test_create_repository_database_error(repository_instance, mock_supabase_client):
"""Test create_repository handles database errors properly"""
mock_supabase_client.execute.side_effect = Exception("Database connection failed")
with pytest.raises(Exception) as exc_info:
await repository_instance.create_repository(
repository_url="https://github.com/test/repo",
is_verified=False,
)
assert "Database connection failed" in str(exc_info.value)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_get_repository_with_minimal_data(repository_instance, mock_supabase_client):
"""Test getting repository with minimal fields (all optionals null)"""
mock_data = [{
"id": "minimal-repo",
"repository_url": "https://github.com/test/minimal",
"display_name": None,
"owner": None,
"default_branch": None,
"is_verified": False,
"last_verified_at": None,
"default_sandbox_type": "git_worktree",
"default_commands": ["create-branch"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
}]
mock_supabase_client.execute.return_value = MagicMock(data=mock_data)
repository = await repository_instance.get_repository("minimal-repo")
assert repository is not None
assert repository.display_name is None
assert repository.owner is None
assert repository.default_branch is None
assert repository.is_verified is False
assert repository.last_verified_at is None
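# --------------------------------------------------------------------------
# Illustrative sketch (editor's note, not part of the module under test):
# the two _row_to_model error tests above assume enum validation that names
# the offending row ID in the error message. A hedged sketch of that
# validation, reusing the SandboxType and WorkflowStep enums imported at the
# top of this file; the real _row_to_model also builds the full
# ConfiguredRepository model from the remaining columns.
def _example_validate_row_enums(row: dict) -> tuple[SandboxType, list[WorkflowStep]]:
    try:
        sandbox_type = SandboxType(row["default_sandbox_type"])
    except ValueError as e:
        raise ValueError(
            f"Repository {row['id']} has invalid sandbox type: {row['default_sandbox_type']}"
        ) from e
    try:
        commands = [WorkflowStep(step) for step in row["default_commands"]]
    except ValueError as e:
        raise ValueError(
            f"Repository {row['id']} has invalid workflow steps: {row['default_commands']}"
        ) from e
    return sandbox_type, commands
# --------------------------------------------------------------------------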

View File

@@ -0,0 +1,199 @@
"""Tests for Sandbox Manager"""
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.agent_work_orders.models import SandboxSetupError, SandboxType
from src.agent_work_orders.sandbox_manager.git_branch_sandbox import GitBranchSandbox
from src.agent_work_orders.sandbox_manager.sandbox_factory import SandboxFactory
@pytest.mark.asyncio
async def test_git_branch_sandbox_setup_success():
"""Test successful sandbox setup"""
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(return_value=(b"Cloning...", b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await sandbox.setup()
assert Path(sandbox.working_dir).name == "sandbox-test"
@pytest.mark.asyncio
async def test_git_branch_sandbox_setup_failure():
"""Test failed sandbox setup"""
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: Repository not found"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(SandboxSetupError) as exc_info:
await sandbox.setup()
assert "Failed to clone repository" in str(exc_info.value)
@pytest.mark.asyncio
async def test_git_branch_sandbox_execute_command_success():
"""Test successful command execution in sandbox"""
with TemporaryDirectory() as tmpdir:
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = tmpdir
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(return_value=(b"Command output", b""))
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await sandbox.execute_command("echo 'test'", timeout=10)
assert result.success is True
assert result.exit_code == 0
assert result.stdout == "Command output"
@pytest.mark.asyncio
async def test_git_branch_sandbox_execute_command_failure():
"""Test failed command execution in sandbox"""
with TemporaryDirectory() as tmpdir:
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = tmpdir
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Command failed"))
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await sandbox.execute_command("false", timeout=10)
assert result.success is False
assert result.exit_code == 1
assert result.error_message is not None
@pytest.mark.asyncio
async def test_git_branch_sandbox_execute_command_timeout():
"""Test command execution timeout in sandbox"""
import asyncio
with TemporaryDirectory() as tmpdir:
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = tmpdir
# Mock subprocess that times out
mock_process = MagicMock()
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
async def mock_communicate():
await asyncio.sleep(10)
return (b"", b"")
mock_process.communicate = mock_communicate
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await sandbox.execute_command("sleep 100", timeout=0.1)
assert result.success is False
assert result.exit_code == -1
assert "timed out" in result.error_message.lower()
@pytest.mark.asyncio
async def test_git_branch_sandbox_get_git_branch_name():
"""Test getting current git branch name"""
with TemporaryDirectory() as tmpdir:
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = tmpdir
with patch(
"src.agent_work_orders.sandbox_manager.git_branch_sandbox.get_current_branch",
new=AsyncMock(return_value="feat-wo-test123"),
):
branch = await sandbox.get_git_branch_name()
assert branch == "feat-wo-test123"
@pytest.mark.asyncio
async def test_git_branch_sandbox_cleanup():
"""Test sandbox cleanup"""
with TemporaryDirectory() as tmpdir:
test_dir = Path(tmpdir) / "sandbox-test"
test_dir.mkdir()
(test_dir / "test.txt").write_text("test")
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = str(test_dir)
await sandbox.cleanup()
assert not test_dir.exists()
def test_sandbox_factory_git_branch():
"""Test creating git branch sandbox via factory"""
factory = SandboxFactory()
sandbox = factory.create_sandbox(
sandbox_type=SandboxType.GIT_BRANCH,
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
assert isinstance(sandbox, GitBranchSandbox)
assert sandbox.repository_url == "https://github.com/owner/repo"
assert sandbox.sandbox_identifier == "sandbox-test"
def test_sandbox_factory_not_implemented():
"""Test creating unsupported sandbox types"""
factory = SandboxFactory()
with pytest.raises(NotImplementedError):
factory.create_sandbox(
sandbox_type=SandboxType.E2B,
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
with pytest.raises(NotImplementedError):
factory.create_sandbox(
sandbox_type=SandboxType.DAGGER,
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
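# --------------------------------------------------------------------------
# Illustrative sketch (editor's note, not part of the module under test):
# the timeout test above relies on execute_command cancelling the child
# process when communicate() does not finish in time. The usual asyncio
# pattern for that is sketched below; _ExampleResult and the function name
# are hypothetical stand-ins, not the sandbox's real types.
import asyncio
from dataclasses import dataclass


@dataclass
class _ExampleResult:
    success: bool
    exit_code: int
    error_message: str | None = None
    stdout: str = ""


async def _example_run_with_timeout(command: str, cwd: str, timeout: float) -> _ExampleResult:
    process = await asyncio.create_subprocess_shell(
        command,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # Kill the hung process and report a sentinel exit code
        process.kill()
        await process.wait()
        return _ExampleResult(False, -1, f"Command timed out after {timeout} seconds")
    succeeded = process.returncode == 0
    return _ExampleResult(
        succeeded,
        process.returncode,
        None if succeeded else stderr.decode(),
        stdout.decode(),
    )
# --------------------------------------------------------------------------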

View File

@@ -0,0 +1,203 @@
"""Tests for standalone agent work orders server
Tests the server entry point, health checks, and service discovery configuration.
"""
from unittest.mock import AsyncMock, Mock, patch
import pytest
from fastapi.testclient import TestClient
@pytest.mark.unit
def test_server_health_endpoint():
"""Test health check endpoint returns correct structure"""
from src.agent_work_orders.server import app
client = TestClient(app)
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["service"] == "agent-work-orders"
assert data["version"] == "0.1.0"
assert "status" in data
assert "dependencies" in data
@pytest.mark.unit
def test_server_root_endpoint():
"""Test root endpoint returns service information"""
from src.agent_work_orders.server import app
client = TestClient(app)
response = client.get("/")
assert response.status_code == 200
data = response.json()
assert data["service"] == "agent-work-orders"
assert data["version"] == "0.1.0"
assert "docs" in data
assert "health" in data
assert "api" in data
@pytest.mark.unit
@patch("src.agent_work_orders.server.subprocess.run")
def test_health_check_claude_cli_available(mock_run):
"""Test health check detects Claude CLI availability"""
from src.agent_work_orders.server import app
# Mock successful Claude CLI execution
mock_run.return_value = Mock(returncode=0, stdout="2.0.21\n", stderr="")
client = TestClient(app)
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["dependencies"]["claude_cli"]["available"] is True
assert "version" in data["dependencies"]["claude_cli"]
@pytest.mark.unit
@patch("src.agent_work_orders.server.subprocess.run")
def test_health_check_claude_cli_unavailable(mock_run):
"""Test health check handles missing Claude CLI"""
from src.agent_work_orders.server import app
# Mock Claude CLI not found
mock_run.side_effect = FileNotFoundError("claude not found")
client = TestClient(app)
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["dependencies"]["claude_cli"]["available"] is False
assert "error" in data["dependencies"]["claude_cli"]
@pytest.mark.unit
@patch("src.agent_work_orders.server.shutil.which")
def test_health_check_git_availability(mock_which):
"""Test health check detects git availability"""
from src.agent_work_orders.server import app
# Mock git available
mock_which.return_value = "/usr/bin/git"
client = TestClient(app)
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["dependencies"]["git"]["available"] is True
@pytest.mark.unit
@patch("src.agent_work_orders.server.httpx.AsyncClient")
@patch.dict("os.environ", {"ARCHON_SERVER_URL": "http://localhost:8181"})
async def test_health_check_server_connectivity(mock_client_class):
"""Test health check validates server connectivity"""
from src.agent_work_orders.server import health_check
# Mock successful server response
mock_response = Mock(status_code=200)
mock_client = AsyncMock()
mock_client.get.return_value = mock_response
mock_client_class.return_value.__aenter__.return_value = mock_client
result = await health_check()
assert result["dependencies"]["archon_server"]["available"] is True
assert result["dependencies"]["archon_server"]["url"] == "http://localhost:8181"
@pytest.mark.unit
@patch("src.agent_work_orders.server.httpx.AsyncClient")
@patch.dict("os.environ", {"ARCHON_MCP_URL": "http://localhost:8051"})
async def test_health_check_mcp_connectivity(mock_client_class):
"""Test health check validates MCP connectivity"""
from src.agent_work_orders.server import health_check
# Mock successful MCP response
mock_response = Mock(status_code=200)
mock_client = AsyncMock()
mock_client.get.return_value = mock_response
mock_client_class.return_value.__aenter__.return_value = mock_client
result = await health_check()
assert result["dependencies"]["archon_mcp"]["available"] is True
assert result["dependencies"]["archon_mcp"]["url"] == "http://localhost:8051"
@pytest.mark.unit
@patch("src.agent_work_orders.server.httpx.AsyncClient")
@patch.dict("os.environ", {"ARCHON_SERVER_URL": "http://localhost:8181"})
async def test_health_check_server_unavailable(mock_client_class):
"""Test health check handles unavailable server"""
from src.agent_work_orders.server import health_check
# Mock connection error
mock_client = AsyncMock()
mock_client.get.side_effect = Exception("Connection refused")
mock_client_class.return_value.__aenter__.return_value = mock_client
result = await health_check()
assert result["dependencies"]["archon_server"]["available"] is False
assert "error" in result["dependencies"]["archon_server"]
@pytest.mark.unit
def test_cors_middleware_configured():
"""Test CORS middleware is properly configured"""
from src.agent_work_orders.server import app
# Check CORS middleware is in middleware stack
middleware_classes = [m.cls.__name__ for m in app.user_middleware]
assert "CORSMiddleware" in middleware_classes
@pytest.mark.unit
def test_router_included_with_prefix():
"""Test API routes are included with correct prefix"""
from src.agent_work_orders.server import app
# Check routes are mounted with /api/agent-work-orders prefix
routes = [route.path for route in app.routes]
assert any("/api/agent-work-orders" in route for route in routes)
@pytest.mark.unit
@patch.dict("os.environ", {"SERVICE_DISCOVERY_MODE": "local"})
def test_startup_logs_local_mode(caplog):
"""Test startup logs service discovery mode"""
from src.agent_work_orders.config import config
# Verify config is set to local mode
assert config.SERVICE_DISCOVERY_MODE == "local"
@pytest.mark.unit
@patch.dict("os.environ", {"SERVICE_DISCOVERY_MODE": "docker_compose"})
def test_startup_logs_docker_mode(caplog):
"""Test startup logs docker_compose mode"""
import importlib
import src.agent_work_orders.config as config_module
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig
# Create fresh config instance with env var
config = AgentWorkOrdersConfig()
# Verify config is set to docker_compose mode
assert config.SERVICE_DISCOVERY_MODE == "docker_compose"
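# --------------------------------------------------------------------------
# Illustrative sketch (editor's note, not part of the module under test):
# the CLI-availability tests above only require that the health check calls
# subprocess.run and reports an {"available": ..., "version"/"error": ...}
# entry under dependencies. One hedged way to structure that probe -- the
# "--version" flag and the helper name are assumptions, not asserted by the
# tests:
import subprocess


def _example_probe_claude_cli(cli_path: str = "claude") -> dict:
    try:
        completed = subprocess.run(
            [cli_path, "--version"], capture_output=True, text=True, timeout=5
        )
        if completed.returncode == 0:
            return {"available": True, "version": completed.stdout.strip()}
        return {"available": False, "error": completed.stderr.strip()}
    except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
        return {"available": False, "error": str(exc)}
# --------------------------------------------------------------------------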

View File

@@ -0,0 +1,334 @@
"""Unit tests for SSE Streaming Module
Tests SSE event formatting, streaming logic, filtering, and disconnect handling.
"""
import asyncio
import json
from datetime import UTC
import pytest
from src.agent_work_orders.api.sse_streams import (
format_log_event,
get_current_timestamp,
stream_work_order_logs,
)
from src.agent_work_orders.utils.log_buffer import WorkOrderLogBuffer
@pytest.mark.unit
def test_format_log_event():
"""Test formatting log dictionary as SSE event"""
log_dict = {
"timestamp": "2025-10-23T12:00:00Z",
"level": "info",
"event": "step_started",
"work_order_id": "wo-123",
"step": "planning",
}
event = format_log_event(log_dict)
assert "data" in event
# Data should be JSON string
parsed = json.loads(event["data"])
assert parsed["timestamp"] == "2025-10-23T12:00:00Z"
assert parsed["level"] == "info"
assert parsed["event"] == "step_started"
assert parsed["work_order_id"] == "wo-123"
assert parsed["step"] == "planning"
@pytest.mark.unit
def test_get_current_timestamp():
"""Test timestamp generation in ISO format"""
timestamp = get_current_timestamp()
# Should be valid ISO format
assert isinstance(timestamp, str)
assert "T" in timestamp
# Should be recent (within last second)
from datetime import datetime
parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
now = datetime.now(UTC)
assert (now - parsed).total_seconds() < 1
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_empty_buffer():
"""Test streaming when buffer is empty"""
buffer = WorkOrderLogBuffer()
events = []
async for event in stream_work_order_logs("wo-123", buffer):
events.append(event)
# Break after heartbeat to avoid infinite loop
if "comment" in event:
break
# Should receive at least one heartbeat
assert len(events) >= 1
assert events[-1] == {"comment": "keepalive"}
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_with_existing_logs():
"""Test streaming existing buffered logs first"""
buffer = WorkOrderLogBuffer()
# Add existing logs
buffer.add_log("wo-123", "info", "event1", step="planning")
buffer.add_log("wo-123", "info", "event2", step="execute")
events = []
async for event in stream_work_order_logs("wo-123", buffer):
events.append(event)
# Stop after receiving both events
if len(events) >= 2:
break
assert len(events) == 2
# Both should be data events
assert "data" in events[0]
assert "data" in events[1]
# Parse and verify content
log1 = json.loads(events[0]["data"])
log2 = json.loads(events[1]["data"])
assert log1["event"] == "event1"
assert log2["event"] == "event2"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_with_level_filter():
"""Test streaming with log level filter"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "info_event")
buffer.add_log("wo-123", "error", "error_event")
buffer.add_log("wo-123", "info", "another_info_event")
events = []
async for event in stream_work_order_logs("wo-123", buffer, level_filter="error"):
events.append(event)
if "data" in event:
break
# Should only get error event
assert len(events) == 1
log = json.loads(events[0]["data"])
assert log["level"] == "error"
assert log["event"] == "error_event"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_with_step_filter():
"""Test streaming with step filter"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1", step="planning")
buffer.add_log("wo-123", "info", "event2", step="execute")
buffer.add_log("wo-123", "info", "event3", step="planning")
events = []
async for event in stream_work_order_logs("wo-123", buffer, step_filter="planning"):
events.append(event)
if len(events) >= 2:
break
assert len(events) == 2
log1 = json.loads(events[0]["data"])
log2 = json.loads(events[1]["data"])
assert log1["step"] == "planning"
assert log2["step"] == "planning"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_with_since_timestamp():
"""Test streaming logs after specific timestamp"""
buffer = WorkOrderLogBuffer()
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
ts3 = "2025-10-23T12:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1)
buffer.add_log("wo-123", "info", "event2", timestamp=ts2)
buffer.add_log("wo-123", "info", "event3", timestamp=ts3)
events = []
async for event in stream_work_order_logs("wo-123", buffer, since_timestamp=ts2):
events.append(event)
if "data" in event:
break
# Should only get event3 (after ts2)
assert len(events) == 1
log = json.loads(events[0]["data"])
assert log["event"] == "event3"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_heartbeat():
"""Test that heartbeat comments are sent periodically"""
buffer = WorkOrderLogBuffer()
heartbeat_count = 0
event_count = 0
async for event in stream_work_order_logs("wo-123", buffer):
if "comment" in event:
heartbeat_count += 1
if heartbeat_count >= 2:
break
if "data" in event:
event_count += 1
# Should have received at least 2 heartbeats
assert heartbeat_count >= 2
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_disconnect():
"""Test handling of client disconnect (CancelledError)"""
buffer = WorkOrderLogBuffer()
async def stream_with_cancel():
events = []
try:
async for event in stream_work_order_logs("wo-123", buffer):
events.append(event)
# Simulate disconnect after first event
if len(events) >= 1:
raise asyncio.CancelledError()
except asyncio.CancelledError:
# Should be caught and handled gracefully
pass
return events
events = await stream_with_cancel()
# Should have at least one event before cancel
assert len(events) >= 1
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_yields_new_logs():
"""Test that stream yields new logs as they arrive"""
buffer = WorkOrderLogBuffer()
# Add initial log
buffer.add_log("wo-123", "info", "initial_event")
events = []
async def consume_stream():
async for event in stream_work_order_logs("wo-123", buffer):
events.append(event)
if len(events) >= 2 and "data" in events[1]:
break
async def add_new_log():
# Wait a bit then add new log
await asyncio.sleep(0.6)
buffer.add_log("wo-123", "info", "new_event")
# Run both concurrently
await asyncio.gather(consume_stream(), add_new_log())
# Should have received both events
data_events = [e for e in events if "data" in e]
assert len(data_events) >= 2
log1 = json.loads(data_events[0]["data"])
log2 = json.loads(data_events[1]["data"])
assert log1["event"] == "initial_event"
assert log2["event"] == "new_event"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_combined_filters():
"""Test streaming with multiple filters combined"""
buffer = WorkOrderLogBuffer()
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1, step="planning")
buffer.add_log("wo-123", "error", "event2", timestamp=ts2, step="planning")
buffer.add_log("wo-123", "info", "event3", timestamp=ts2, step="execute")
events = []
async for event in stream_work_order_logs(
"wo-123",
buffer,
level_filter="info",
step_filter="execute",
since_timestamp=ts1,
):
events.append(event)
if "data" in event:
break
# Should only get event3
assert len(events) == 1
log = json.loads(events[0]["data"])
assert log["event"] == "event3"
assert log["level"] == "info"
assert log["step"] == "execute"
@pytest.mark.unit
def test_format_log_event_with_extra_fields():
"""Test that format_log_event preserves all fields"""
log_dict = {
"timestamp": "2025-10-23T12:00:00Z",
"level": "info",
"event": "step_completed",
"work_order_id": "wo-123",
"step": "planning",
"duration_seconds": 45.2,
"custom_field": "custom_value",
}
event = format_log_event(log_dict)
parsed = json.loads(event["data"])
# All fields should be preserved
assert parsed["duration_seconds"] == 45.2
assert parsed["custom_field"] == "custom_value"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_no_duplicate_events():
"""Test that streaming doesn't yield duplicate events"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1", timestamp="2025-10-23T10:00:00Z")
buffer.add_log("wo-123", "info", "event2", timestamp="2025-10-23T11:00:00Z")
events = []
async for event in stream_work_order_logs("wo-123", buffer):
if "data" in event:
events.append(event)
if len(events) >= 2:
# Stop after receiving initial logs
break
# Should have exactly 2 events, no duplicates
assert len(events) == 2
log1 = json.loads(events[0]["data"])
log2 = json.loads(events[1]["data"])
assert log1["event"] == "event1"
assert log2["event"] == "event2"
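# --------------------------------------------------------------------------
# Illustrative sketch (editor's note, not part of the module under test):
# the filter tests above fix the filtering semantics -- exact level and step
# matches, plus a strictly-greater comparison against since_timestamp on the
# ISO-8601 strings. A predicate with those semantics (the helper name is a
# stand-in; the real module may apply the filters inside the generator):
def _example_passes_filters(
    log: dict,
    level_filter: str | None = None,
    step_filter: str | None = None,
    since_timestamp: str | None = None,
) -> bool:
    if level_filter is not None and log.get("level") != level_filter:
        return False
    if step_filter is not None and log.get("step") != step_filter:
        return False
    # ISO-8601 strings in the same format compare chronologically as strings
    if since_timestamp is not None and not log.get("timestamp", "") > since_timestamp:
        return False
    return True
# --------------------------------------------------------------------------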

View File

@@ -0,0 +1,315 @@
"""Tests for State Manager"""
from datetime import datetime
import pytest
from src.agent_work_orders.models import (
AgentWorkflowType,
AgentWorkOrderState,
AgentWorkOrderStatus,
SandboxType,
StepExecutionResult,
StepHistory,
WorkflowStep,
)
from src.agent_work_orders.state_manager.work_order_repository import (
WorkOrderRepository,
)
@pytest.mark.asyncio
async def test_create_work_order():
"""Test creating a work order"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
result = await repo.get("wo-test123")
assert result is not None
retrieved_state, retrieved_metadata = result
assert retrieved_state.agent_work_order_id == "wo-test123"
assert retrieved_metadata["status"] == AgentWorkOrderStatus.PENDING
@pytest.mark.asyncio
async def test_get_nonexistent_work_order():
"""Test getting a work order that doesn't exist"""
repo = WorkOrderRepository()
result = await repo.get("wo-nonexistent")
assert result is None
@pytest.mark.asyncio
async def test_list_work_orders():
"""Test listing all work orders"""
repo = WorkOrderRepository()
# Create multiple work orders
for i in range(3):
state = AgentWorkOrderState(
agent_work_order_id=f"wo-test{i}",
repository_url="https://github.com/owner/repo",
sandbox_identifier=f"sandbox-wo-test{i}",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
results = await repo.list()
assert len(results) == 3
@pytest.mark.asyncio
async def test_list_work_orders_with_status_filter():
"""Test listing work orders filtered by status"""
repo = WorkOrderRepository()
# Create work orders with different statuses
for i, status in enumerate([AgentWorkOrderStatus.PENDING, AgentWorkOrderStatus.RUNNING, AgentWorkOrderStatus.COMPLETED]):
state = AgentWorkOrderState(
agent_work_order_id=f"wo-test{i}",
repository_url="https://github.com/owner/repo",
sandbox_identifier=f"sandbox-wo-test{i}",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": status,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Filter by RUNNING
results = await repo.list(status_filter=AgentWorkOrderStatus.RUNNING)
assert len(results) == 1
assert results[0][1]["status"] == AgentWorkOrderStatus.RUNNING
@pytest.mark.asyncio
async def test_update_status():
"""Test updating work order status"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Update status
await repo.update_status("wo-test123", AgentWorkOrderStatus.RUNNING)
result = await repo.get("wo-test123")
assert result is not None
_, updated_metadata = result
assert updated_metadata["status"] == AgentWorkOrderStatus.RUNNING
@pytest.mark.asyncio
async def test_update_status_with_additional_fields():
"""Test updating status with additional fields"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Update with additional fields
await repo.update_status(
"wo-test123",
AgentWorkOrderStatus.COMPLETED,
github_pull_request_url="https://github.com/owner/repo/pull/1",
)
result = await repo.get("wo-test123")
assert result is not None
_, updated_metadata = result
assert updated_metadata["status"] == AgentWorkOrderStatus.COMPLETED
assert updated_metadata["github_pull_request_url"] == "https://github.com/owner/repo/pull/1"
@pytest.mark.asyncio
async def test_update_git_branch():
"""Test updating git branch name"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Update git branch
await repo.update_git_branch("wo-test123", "feat-wo-test123")
result = await repo.get("wo-test123")
assert result is not None
updated_state, _ = result
assert updated_state.git_branch_name == "feat-wo-test123"
@pytest.mark.asyncio
async def test_update_session_id():
"""Test updating agent session ID"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Update session ID
await repo.update_session_id("wo-test123", "session-abc123")
result = await repo.get("wo-test123")
assert result is not None
updated_state, _ = result
assert updated_state.agent_session_id == "session-abc123"
@pytest.mark.asyncio
async def test_save_and_get_step_history():
"""Test saving and retrieving step history"""
repo = WorkOrderRepository()
step1 = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/test-feature",
duration_seconds=1.0,
)
step2 = StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=True,
output="Plan created",
duration_seconds=5.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[step1, step2])
await repo.save_step_history("wo-test123", history)
retrieved = await repo.get_step_history("wo-test123")
assert retrieved is not None
assert retrieved.agent_work_order_id == "wo-test123"
assert len(retrieved.steps) == 2
assert retrieved.steps[0].step == WorkflowStep.CREATE_BRANCH
assert retrieved.steps[1].step == WorkflowStep.PLANNING
@pytest.mark.asyncio
async def test_get_nonexistent_step_history():
"""Test getting step history that doesn't exist"""
repo = WorkOrderRepository()
retrieved = await repo.get_step_history("wo-nonexistent")
assert retrieved is None
@pytest.mark.asyncio
async def test_update_step_history():
"""Test updating step history with new steps"""
repo = WorkOrderRepository()
# Initial history
step1 = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/test-feature",
duration_seconds=1.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[step1])
await repo.save_step_history("wo-test123", history)
# Add more steps
step2 = StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=True,
output="Plan created",
duration_seconds=5.0,
)
history.steps.append(step2)
await repo.save_step_history("wo-test123", history)
# Verify updated history
retrieved = await repo.get_step_history("wo-test123")
assert retrieved is not None
assert len(retrieved.steps) == 2
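# --------------------------------------------------------------------------
# Illustrative sketch (editor's note, not part of the module under test):
# these tests treat WorkOrderRepository as per-instance in-memory storage
# keyed by work order ID, returning (state, metadata) tuples and filtering
# list() by status. A minimal dict-backed shape compatible with the tests
# above (the real repository may add locking, step-history storage, or
# persistence):
class _ExampleWorkOrderRepository:
    def __init__(self) -> None:
        self._items: dict[str, tuple[AgentWorkOrderState, dict]] = {}

    async def create(self, state: AgentWorkOrderState, metadata: dict) -> None:
        self._items[state.agent_work_order_id] = (state, dict(metadata))

    async def get(self, work_order_id: str) -> tuple[AgentWorkOrderState, dict] | None:
        return self._items.get(work_order_id)

    async def list(self, status_filter: AgentWorkOrderStatus | None = None) -> list:
        rows = list(self._items.values())
        if status_filter is not None:
            rows = [(s, m) for s, m in rows if m.get("status") == status_filter]
        return rows

    async def update_status(self, work_order_id: str, status: AgentWorkOrderStatus, **fields) -> None:
        _, metadata = self._items[work_order_id]
        metadata["status"] = status
        metadata.update(fields)
# --------------------------------------------------------------------------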

View File

@@ -0,0 +1,394 @@
"""Tests for Workflow Operations - Refactored Command Stitching Architecture"""
from unittest.mock import AsyncMock, MagicMock
import pytest
from src.agent_work_orders.models import (
CommandExecutionResult,
WorkflowStep,
)
from src.agent_work_orders.workflow_engine import workflow_operations
from src.agent_work_orders.workflow_engine.agent_names import (
BRANCH_CREATOR,
COMMITTER,
IMPLEMENTOR,
PLANNER,
PR_CREATOR,
REVIEWER,
)
@pytest.mark.asyncio
async def test_run_create_branch_step_success():
"""Test successful branch creation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
result_text="feat/add-feature",
stdout="feat/add-feature",
exit_code=0,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock(file_path="create-branch.md"))
context = {"user_request": "Add new feature"}
result = await workflow_operations.run_create_branch_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is True
assert result.step == WorkflowStep.CREATE_BRANCH
assert result.agent_name == BRANCH_CREATOR
assert result.output == "feat/add-feature"
mock_command_loader.load_command.assert_called_once_with("create-branch")
mock_executor.build_command.assert_called_once()
@pytest.mark.asyncio
async def test_run_create_branch_step_failure():
"""Test branch creation failure"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=False,
error_message="Branch creation failed",
exit_code=1,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock())
context = {"user_request": "Add new feature"}
result = await workflow_operations.run_create_branch_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is False
assert result.error_message == "Branch creation failed"
assert result.step == WorkflowStep.CREATE_BRANCH
@pytest.mark.asyncio
async def test_run_planning_step_success():
"""Test successful planning step"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
result_text="PRPs/features/add-feature.md",
exit_code=0,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock())
context = {
"user_request": "Add authentication",
"github_issue_number": "123"
}
result = await workflow_operations.run_planning_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is True
assert result.step == WorkflowStep.PLANNING
assert result.agent_name == PLANNER
assert result.output == "PRPs/features/add-feature.md"
mock_command_loader.load_command.assert_called_once_with("planning")
@pytest.mark.asyncio
async def test_run_planning_step_with_none_issue_number():
"""Test planning step handles None issue number"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
result_text="PRPs/features/add-feature.md",
exit_code=0,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock())
context = {
"user_request": "Add authentication",
"github_issue_number": None # None should be converted to ""
}
result = await workflow_operations.run_planning_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is True
# Verify build_command received the issue-number arg coerced to an empty string, not passed through as None
args_used = mock_executor.build_command.call_args[1]["args"]
assert args_used[1] == "" # github_issue_number should be empty string
@pytest.mark.asyncio
async def test_run_execute_step_success():
"""Test successful execute step"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
result_text="Implementation completed",
exit_code=0,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock())
context = {"planning": "PRPs/features/add-feature.md"}
result = await workflow_operations.run_execute_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is True
assert result.step == WorkflowStep.EXECUTE
assert result.agent_name == IMPLEMENTOR
assert "completed" in result.output.lower()
mock_command_loader.load_command.assert_called_once_with("execute")
@pytest.mark.asyncio
async def test_run_execute_step_missing_plan_file():
"""Test execute step fails when plan file missing from context"""
mock_executor = MagicMock()
mock_command_loader = MagicMock()
context = {} # No plan file
result = await workflow_operations.run_execute_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is False
assert "No plan file" in result.error_message
@pytest.mark.asyncio
async def test_run_commit_step_success():
"""Test successful commit step"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
result_text="Commit: abc123\nBranch: feat/add-feature\nPushed: Yes",
exit_code=0,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock())
context = {}
result = await workflow_operations.run_commit_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is True
assert result.step == WorkflowStep.COMMIT
assert result.agent_name == COMMITTER
mock_command_loader.load_command.assert_called_once_with("commit")
@pytest.mark.asyncio
async def test_run_create_pr_step_success():
"""Test successful PR creation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
result_text="https://github.com/owner/repo/pull/123",
exit_code=0,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock())
context = {
"create-branch": "feat/add-feature",
"planning": "PRPs/features/add-feature.md"
}
result = await workflow_operations.run_create_pr_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is True
assert result.step == WorkflowStep.CREATE_PR
assert result.agent_name == PR_CREATOR
assert "github.com" in result.output
mock_command_loader.load_command.assert_called_once_with("create-pr")
@pytest.mark.asyncio
async def test_run_create_pr_step_missing_branch():
"""Test PR creation fails when branch name missing"""
mock_executor = MagicMock()
mock_command_loader = MagicMock()
context = {"planning": "PRPs/features/add-feature.md"} # No branch name
result = await workflow_operations.run_create_pr_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is False
assert "No branch name" in result.error_message
@pytest.mark.asyncio
async def test_run_review_step_success():
"""Test successful review step"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
result_text='{"blockers": [], "tech_debt": []}',
exit_code=0,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock())
context = {"planning": "PRPs/features/add-feature.md"}
result = await workflow_operations.run_review_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is True
assert result.step == WorkflowStep.REVIEW
assert result.agent_name == REVIEWER
mock_command_loader.load_command.assert_called_once_with("prp-review")
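# Unlike the other steps, the review step loads the "prp-review" command rather
# than a command named after the step itself.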
@pytest.mark.asyncio
async def test_run_review_step_missing_plan():
"""Test review step fails when plan file missing"""
mock_executor = MagicMock()
mock_command_loader = MagicMock()
context = {} # No plan file
result = await workflow_operations.run_review_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert result.success is False
assert "No plan file" in result.error_message
@pytest.mark.asyncio
async def test_context_passing_between_steps():
"""Test that context is properly used across steps"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
result_text="output",
exit_code=0,
)
)
mock_command_loader = MagicMock()
mock_command_loader.load_command = MagicMock(return_value=MagicMock())
# Test context flow: create-branch -> planning
context = {"user_request": "Test feature"}
# Step 1: Create branch
branch_result = await workflow_operations.run_create_branch_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
# Simulate orchestrator storing output
context["create-branch"] = "feat/test-feature"
# Step 2: Planning should have access to branch name via context
planning_result = await workflow_operations.run_planning_step(
executor=mock_executor,
command_loader=mock_command_loader,
work_order_id="wo-test",
working_dir="/tmp/test",
context=context,
)
assert branch_result.success is True
assert planning_result.success is True
assert "create-branch" in context

View File

@@ -0,0 +1,379 @@
"""Tests for Workflow Orchestrator - Command Stitching Architecture"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.agent_work_orders.models import (
AgentWorkOrderStatus,
SandboxType,
StepExecutionResult,
WorkflowStep,
)
from src.agent_work_orders.workflow_engine.workflow_orchestrator import WorkflowOrchestrator
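# As exercised by these tests, the orchestrator resolves each selected command
# name to a run_<command>_step function in workflow_operations, threads a shared
# context dict between steps, persists status and step history via the state
# repository, and creates/cleans up a sandbox around the run.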
@pytest.fixture
def mock_dependencies():
"""Create mocked dependencies for orchestrator"""
mock_executor = MagicMock()
mock_sandbox_factory = MagicMock()
mock_github_client = MagicMock()
mock_command_loader = MagicMock()
mock_state_repository = MagicMock()
# Mock sandbox
mock_sandbox = MagicMock()
mock_sandbox.working_dir = "/tmp/test-sandbox"
mock_sandbox.setup = AsyncMock()
mock_sandbox.cleanup = AsyncMock()
mock_sandbox_factory.create_sandbox.return_value = mock_sandbox
# Mock state repository
mock_state_repository.update_status = AsyncMock()
mock_state_repository.save_step_history = AsyncMock()
mock_state_repository.update_git_branch = AsyncMock()
orchestrator = WorkflowOrchestrator(
agent_executor=mock_executor,
sandbox_factory=mock_sandbox_factory,
github_client=mock_github_client,
command_loader=mock_command_loader,
state_repository=mock_state_repository,
)
return orchestrator, {
"executor": mock_executor,
"sandbox_factory": mock_sandbox_factory,
"github_client": mock_github_client,
"command_loader": mock_command_loader,
"state_repository": mock_state_repository,
"sandbox": mock_sandbox,
}
@pytest.mark.asyncio
async def test_execute_workflow_default_commands(mock_dependencies):
"""Test workflow with default command selection"""
orchestrator, mocks = mock_dependencies
# Mock all command steps to succeed
with patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_branch_step") as mock_branch, \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_planning_step") as mock_plan, \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_execute_step") as mock_execute, \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_commit_step") as mock_commit, \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_pr_step") as mock_pr:
# Set up mock returns
mock_branch.return_value = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/test-feature",
duration_seconds=1.0,
)
mock_plan.return_value = StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=True,
output="PRPs/features/test.md",
duration_seconds=5.0,
)
mock_execute.return_value = StepExecutionResult(
step=WorkflowStep.EXECUTE,
agent_name="Implementor",
success=True,
output="Implementation completed",
duration_seconds=30.0,
)
mock_commit.return_value = StepExecutionResult(
step=WorkflowStep.COMMIT,
agent_name="Committer",
success=True,
output="Commit: abc123",
duration_seconds=2.0,
)
mock_pr.return_value = StepExecutionResult(
step=WorkflowStep.CREATE_PR,
agent_name="PrCreator",
success=True,
output="https://github.com/owner/repo/pull/1",
duration_seconds=3.0,
)
# Execute workflow with default commands (None = default)
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature",
selected_commands=None, # Should use default
)
# Verify all 5 default commands were executed
assert mock_branch.called
assert mock_plan.called
assert mock_execute.called
assert mock_commit.called
assert mock_pr.called
# Verify status updates
assert mocks["state_repository"].update_status.call_count >= 2
@pytest.mark.asyncio
async def test_execute_workflow_custom_commands(mock_dependencies):
"""Test workflow with custom command selection"""
orchestrator, mocks = mock_dependencies
with patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_branch_step") as mock_branch, \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_planning_step") as mock_plan:
mock_branch.return_value = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/test",
duration_seconds=1.0,
)
mock_plan.return_value = StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=True,
output="PRPs/features/test.md",
duration_seconds=5.0,
)
# Execute with only 2 commands
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature",
selected_commands=["create-branch", "planning"],
)
# Verify both selected commands were executed (see the note below on a stricter check)
assert mock_branch.called
assert mock_plan.called
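# Only the two selected steps are patched here; a stricter variant could also
# patch run_execute_step and assert it was never called, e.g.:
#     assert not mock_execute.called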
@pytest.mark.asyncio
async def test_execute_workflow_stop_on_failure(mock_dependencies):
"""Test workflow stops on first failure"""
orchestrator, mocks = mock_dependencies
with patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_branch_step") as mock_branch, \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_planning_step") as mock_plan, \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_execute_step") as mock_execute:
# First command succeeds
mock_branch.return_value = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/test",
duration_seconds=1.0,
)
# Second command fails
mock_plan.return_value = StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=False,
error_message="Planning failed: timeout",
duration_seconds=5.0,
)
# Execute workflow - should stop at planning and save error to state
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature",
selected_commands=["create-branch", "planning", "execute"],
)
# Verify the first two commands executed but the third did not
assert mock_branch.called
assert mock_plan.called
assert not mock_execute.called
# Verify failure status was set
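# update_status is assumed to receive the status as its second positional
# argument, i.e. update_status(work_order_id, status, ...), so call[0][1] is the
# recorded status for each mock call.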
calls = [call for call in mocks["state_repository"].update_status.call_args_list
if call[0][1] == AgentWorkOrderStatus.FAILED]
assert len(calls) > 0
@pytest.mark.asyncio
async def test_execute_workflow_context_passing(mock_dependencies):
"""Test context is passed correctly between commands"""
orchestrator, mocks = mock_dependencies
captured_contexts = []
async def capture_branch_context(executor, command_loader, work_order_id, working_dir, context):
captured_contexts.append(("branch", dict(context)))
return StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/test",
duration_seconds=1.0,
)
async def capture_plan_context(executor, command_loader, work_order_id, working_dir, context):
captured_contexts.append(("planning", dict(context)))
return StepExecutionResult(
step=WorkflowStep.PLANNING,
agent_name="Planner",
success=True,
output="PRPs/features/test.md",
duration_seconds=5.0,
)
with patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_branch_step", side_effect=capture_branch_context), \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_planning_step", side_effect=capture_plan_context):
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature",
selected_commands=["create-branch", "planning"],
)
# Verify context was passed correctly
assert len(captured_contexts) == 2
# First command should have initial context
branch_context = captured_contexts[0][1]
assert "user_request" in branch_context
assert branch_context["user_request"] == "Test feature"
# Second command should have previous command's output
planning_context = captured_contexts[1][1]
assert "user_request" in planning_context
assert "create-branch" in planning_context
assert planning_context["create-branch"] == "feat/test"
@pytest.mark.asyncio
async def test_execute_workflow_updates_git_branch(mock_dependencies):
"""Test that git branch name is updated after create-branch"""
orchestrator, mocks = mock_dependencies
with patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_branch_step") as mock_branch:
mock_branch.return_value = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/awesome-feature",
duration_seconds=1.0,
)
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature",
selected_commands=["create-branch"],
)
# Verify git branch was updated
mocks["state_repository"].update_git_branch.assert_called_once_with(
"wo-test", "feat/awesome-feature"
)
@pytest.mark.asyncio
async def test_execute_workflow_updates_pr_url(mock_dependencies):
"""Test that PR URL is saved after create-pr"""
orchestrator, mocks = mock_dependencies
with patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_branch_step") as mock_branch, \
patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_pr_step") as mock_pr:
mock_branch.return_value = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=True,
output="feat/test",
duration_seconds=1.0,
)
mock_pr.return_value = StepExecutionResult(
step=WorkflowStep.CREATE_PR,
agent_name="PrCreator",
success=True,
output="https://github.com/owner/repo/pull/42",
duration_seconds=3.0,
)
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature",
selected_commands=["create-branch", "create-pr"],
)
# Verify PR URL was saved with COMPLETED status
status_calls = [call for call in mocks["state_repository"].update_status.call_args_list
if call[0][1] == AgentWorkOrderStatus.COMPLETED]
assert any("github_pull_request_url" in str(call) for call in status_calls)
@pytest.mark.asyncio
async def test_execute_workflow_unknown_command(mock_dependencies):
"""Test that unknown commands save error to state"""
orchestrator, mocks = mock_dependencies
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature",
selected_commands=["invalid-command"],
)
# Verify error was saved to state
status_calls = [call for call in mocks["state_repository"].update_status.call_args_list
if call[0][1] == AgentWorkOrderStatus.FAILED]
assert len(status_calls) > 0
# Check that error message contains "Unknown command"
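# error_message is assumed to be passed to update_status as a keyword argument,
# so it is read from call.kwargs rather than positionally.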
error_messages = [call.kwargs.get("error_message", "") for call in status_calls]
assert any("Unknown command" in msg for msg in error_messages)
@pytest.mark.asyncio
async def test_execute_workflow_sandbox_cleanup(mock_dependencies):
"""Test that sandbox is cleaned up even on failure"""
orchestrator, mocks = mock_dependencies
with patch("src.agent_work_orders.workflow_engine.workflow_operations.run_create_branch_step") as mock_branch:
mock_branch.return_value = StepExecutionResult(
step=WorkflowStep.CREATE_BRANCH,
agent_name="BranchCreator",
success=False,
error_message="Failed",
duration_seconds=1.0,
)
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature",
selected_commands=["create-branch"],
)
# Verify sandbox cleanup was called even on failure
assert mocks["sandbox"].cleanup.called