sauce aow

This commit is contained in:
Rasmus Widing
2025-10-08 21:39:04 +03:00
parent 3f0815b686
commit 9a60d6ae89
84 changed files with 17939 additions and 2 deletions

View File

@@ -0,0 +1,11 @@
"""Pytest configuration for agent_work_orders tests"""
import pytest
@pytest.fixture(autouse=True)
def reset_structlog():
"""Reset structlog configuration for each test"""
import structlog
structlog.reset_defaults()

View File

@@ -0,0 +1,7 @@
[pytest]
testpaths = .
python_files = test_*.py
python_classes = Test*
python_functions = test_*
pythonpath = ../..
asyncio_mode = auto

View File

@@ -0,0 +1,303 @@
"""Tests for Agent Executor"""
import asyncio
import pytest
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from src.agent_work_orders.agent_executor.agent_cli_executor import AgentCLIExecutor
def test_build_command():
"""Test building Claude CLI command with all flags"""
executor = AgentCLIExecutor(cli_path="claude")
# Create a temporary command file with placeholders
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Test command content with args: $1 and $2")
command_file_path = f.name
try:
command, prompt_text = executor.build_command(
command_file_path=command_file_path,
args=["issue-42", "wo-test123"],
model="sonnet",
)
# Verify command includes required flags
assert "claude" in command
assert "--print" in command
assert "--output-format" in command
assert "stream-json" in command
assert "--verbose" in command # Required for stream-json with --print
assert "--model" in command # Model specification
assert "sonnet" in command # Model value
assert "--dangerously-skip-permissions" in command # Automation
# Note: --max-turns is optional (None by default = unlimited)
# Verify prompt text includes command content and placeholder replacements
assert "Test command content" in prompt_text
assert "issue-42" in prompt_text
assert "wo-test123" in prompt_text
assert "$1" not in prompt_text # Placeholders should be replaced
assert "$2" not in prompt_text
finally:
Path(command_file_path).unlink()
def test_build_command_no_args():
"""Test building command without arguments"""
executor = AgentCLIExecutor()
# Create a temporary command file
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Command without args")
command_file_path = f.name
try:
command, prompt_text = executor.build_command(
command_file_path=command_file_path,
model="opus",
)
assert "claude" in command
assert "--verbose" in command
assert "--model" in command
assert "opus" in command
assert "Command without args" in prompt_text
# Note: --max-turns is optional (None by default = unlimited)
finally:
Path(command_file_path).unlink()
def test_build_command_with_custom_max_turns():
"""Test building command with custom max-turns configuration"""
with patch("src.agent_work_orders.agent_executor.agent_cli_executor.config") as mock_config:
mock_config.CLAUDE_CLI_PATH = "claude"
mock_config.CLAUDE_CLI_VERBOSE = True
mock_config.CLAUDE_CLI_MAX_TURNS = 50
mock_config.CLAUDE_CLI_SKIP_PERMISSIONS = True
executor = AgentCLIExecutor()
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Test content")
command_file_path = f.name
try:
command, _ = executor.build_command(
command_file_path=command_file_path,
model="sonnet",
)
assert "--max-turns 50" in command
finally:
Path(command_file_path).unlink()
def test_build_command_missing_file():
"""Test building command with non-existent file"""
executor = AgentCLIExecutor()
with pytest.raises(ValueError, match="Failed to read command file"):
executor.build_command(
command_file_path="/nonexistent/path/to/command.md",
model="sonnet",
)
@pytest.mark.asyncio
async def test_execute_async_success():
"""Test successful command execution with prompt via stdin"""
executor = AgentCLIExecutor()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(
return_value=(
b'{"session_id": "session-123", "type": "init"}\n{"type": "result"}',
b"",
)
)
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await executor.execute_async(
command="claude --print --output-format stream-json --verbose --max-turns 20 --dangerously-skip-permissions",
working_directory="/tmp",
timeout_seconds=30,
prompt_text="Test prompt content",
)
assert result.success is True
assert result.exit_code == 0
assert result.session_id == "session-123"
assert result.stdout is not None
@pytest.mark.asyncio
async def test_execute_async_failure():
"""Test failed command execution"""
executor = AgentCLIExecutor()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(
return_value=(b"", b"Error: Command failed")
)
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await executor.execute_async(
command="claude --print --output-format stream-json --verbose",
working_directory="/tmp",
prompt_text="Test prompt",
)
assert result.success is False
assert result.exit_code == 1
assert result.error_message is not None
@pytest.mark.asyncio
async def test_execute_async_timeout():
"""Test command execution timeout"""
executor = AgentCLIExecutor()
# Mock subprocess that times out
mock_process = MagicMock()
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
async def mock_communicate(input=None):
await asyncio.sleep(10) # Longer than timeout
return (b"", b"")
mock_process.communicate = mock_communicate
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await executor.execute_async(
command="claude --print --output-format stream-json --verbose",
working_directory="/tmp",
timeout_seconds=0.1, # Very short timeout
prompt_text="Test prompt",
)
assert result.success is False
assert result.exit_code == -1
assert "timed out" in result.error_message.lower()
def test_extract_session_id():
"""Test extracting session ID from JSONL output"""
executor = AgentCLIExecutor()
jsonl_output = """
{"type": "init", "session_id": "session-abc123"}
{"type": "message", "content": "Hello"}
{"type": "result"}
"""
session_id = executor._extract_session_id(jsonl_output)
assert session_id == "session-abc123"
def test_extract_session_id_not_found():
"""Test extracting session ID when not present"""
executor = AgentCLIExecutor()
jsonl_output = """
{"type": "message", "content": "Hello"}
{"type": "result"}
"""
session_id = executor._extract_session_id(jsonl_output)
assert session_id is None
def test_extract_session_id_invalid_json():
"""Test extracting session ID with invalid JSON"""
executor = AgentCLIExecutor()
jsonl_output = "Not valid JSON"
session_id = executor._extract_session_id(jsonl_output)
assert session_id is None
@pytest.mark.asyncio
async def test_execute_async_extracts_result_text():
"""Test that result text is extracted from JSONL output"""
executor = AgentCLIExecutor()
# Mock subprocess that returns JSONL with result
jsonl_output = '{"type":"session_started","session_id":"test-123"}\n{"type":"result","result":"/feature","is_error":false}'
with patch("asyncio.create_subprocess_shell") as mock_subprocess:
mock_process = AsyncMock()
mock_process.communicate = AsyncMock(return_value=(jsonl_output.encode(), b""))
mock_process.returncode = 0
mock_subprocess.return_value = mock_process
result = await executor.execute_async(
"claude --print",
"/tmp/test",
prompt_text="test prompt",
work_order_id="wo-test",
)
assert result.success is True
assert result.result_text == "/feature"
assert result.session_id == "test-123"
assert '{"type":"result"' in result.stdout
def test_build_command_replaces_arguments_placeholder():
"""Test that $ARGUMENTS placeholder is replaced with actual arguments"""
executor = AgentCLIExecutor()
# Create temp command file with $ARGUMENTS
import tempfile
import os
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Classify this issue:\n\n$ARGUMENTS")
temp_file = f.name
try:
command, prompt = executor.build_command(
temp_file, args=['{"title": "Add feature", "body": "description"}']
)
assert "$ARGUMENTS" not in prompt
assert '{"title": "Add feature"' in prompt
assert "Classify this issue:" in prompt
finally:
os.unlink(temp_file)
def test_build_command_replaces_positional_arguments():
"""Test that $1, $2, $3 are replaced with positional arguments"""
executor = AgentCLIExecutor()
import tempfile
import os
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Issue: $1\nWorkOrder: $2\nData: $3")
temp_file = f.name
try:
command, prompt = executor.build_command(
temp_file, args=["42", "wo-test", '{"title":"Test"}']
)
assert "$1" not in prompt
assert "$2" not in prompt
assert "$3" not in prompt
assert "Issue: 42" in prompt
assert "WorkOrder: wo-test" in prompt
assert 'Data: {"title":"Test"}' in prompt
finally:
os.unlink(temp_file)

View File

@@ -0,0 +1,370 @@
"""Integration Tests for API Endpoints"""
import pytest
from datetime import datetime
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, MagicMock, patch
from src.agent_work_orders.main import app
from src.agent_work_orders.models import (
AgentWorkOrderStatus,
AgentWorkflowType,
SandboxType,
)
client = TestClient(app)
def test_health_endpoint():
"""Test health check endpoint"""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
assert data["service"] == "agent-work-orders"
def test_create_agent_work_order():
"""Test creating an agent work order"""
with patch("src.agent_work_orders.api.routes.orchestrator") as mock_orchestrator:
mock_orchestrator.execute_workflow = AsyncMock()
request_data = {
"repository_url": "https://github.com/owner/repo",
"sandbox_type": "git_branch",
"workflow_type": "agent_workflow_plan",
"user_request": "Add user authentication feature",
"github_issue_number": "42",
}
response = client.post("/agent-work-orders", json=request_data)
assert response.status_code == 201
data = response.json()
assert "agent_work_order_id" in data
assert data["status"] == "pending"
assert data["agent_work_order_id"].startswith("wo-")
def test_create_agent_work_order_without_issue():
"""Test creating work order without issue number"""
with patch("src.agent_work_orders.api.routes.orchestrator") as mock_orchestrator:
mock_orchestrator.execute_workflow = AsyncMock()
request_data = {
"repository_url": "https://github.com/owner/repo",
"sandbox_type": "git_branch",
"workflow_type": "agent_workflow_plan",
"user_request": "Fix the login bug where users can't sign in",
}
response = client.post("/agent-work-orders", json=request_data)
assert response.status_code == 201
data = response.json()
assert "agent_work_order_id" in data
def test_create_agent_work_order_invalid_data():
"""Test creating work order with invalid data"""
request_data = {
"repository_url": "https://github.com/owner/repo",
# Missing required fields
}
response = client.post("/agent-work-orders", json=request_data)
assert response.status_code == 422 # Validation error
def test_list_agent_work_orders_empty():
"""Test listing work orders when none exist"""
# Reset state repository
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.list = AsyncMock(return_value=[])
response = client.get("/agent-work-orders")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) == 0
def test_list_agent_work_orders_with_data():
"""Test listing work orders with data"""
from src.agent_work_orders.models import AgentWorkOrderState
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"github_issue_number": "42",
"status": AgentWorkOrderStatus.RUNNING,
"current_phase": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.list = AsyncMock(return_value=[(state, metadata)])
response = client.get("/agent-work-orders")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["agent_work_order_id"] == "wo-test123"
assert data[0]["status"] == "running"
def test_list_agent_work_orders_with_status_filter():
"""Test listing work orders with status filter"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.list = AsyncMock(return_value=[])
response = client.get("/agent-work-orders?status=running")
assert response.status_code == 200
mock_repo.list.assert_called_once()
def test_get_agent_work_order():
"""Test getting a specific work order"""
from src.agent_work_orders.models import AgentWorkOrderState
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"github_issue_number": "42",
"status": AgentWorkOrderStatus.COMPLETED,
"current_phase": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"github_pull_request_url": "https://github.com/owner/repo/pull/42",
"git_commit_count": 5,
"git_files_changed": 10,
"error_message": None,
}
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=(state, metadata))
response = client.get("/agent-work-orders/wo-test123")
assert response.status_code == 200
data = response.json()
assert data["agent_work_order_id"] == "wo-test123"
assert data["status"] == "completed"
assert data["git_branch_name"] == "feat-wo-test123"
assert data["github_pull_request_url"] == "https://github.com/owner/repo/pull/42"
def test_get_agent_work_order_not_found():
"""Test getting a non-existent work order"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=None)
response = client.get("/agent-work-orders/wo-nonexistent")
assert response.status_code == 404
def test_get_git_progress():
"""Test getting git progress"""
from src.agent_work_orders.models import AgentWorkOrderState
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.RUNNING,
"current_phase": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"git_commit_count": 3,
"git_files_changed": 7,
}
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=(state, metadata))
response = client.get("/agent-work-orders/wo-test123/git-progress")
assert response.status_code == 200
data = response.json()
assert data["agent_work_order_id"] == "wo-test123"
assert data["git_commit_count"] == 3
assert data["git_files_changed"] == 7
assert data["git_branch_name"] == "feat-wo-test123"
def test_get_git_progress_not_found():
"""Test getting git progress for non-existent work order"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get = AsyncMock(return_value=None)
response = client.get("/agent-work-orders/wo-nonexistent/git-progress")
assert response.status_code == 404
def test_send_prompt_to_agent():
"""Test sending prompt to agent (placeholder)"""
request_data = {
"agent_work_order_id": "wo-test123",
"prompt_text": "Continue with the next step",
}
response = client.post("/agent-work-orders/wo-test123/prompt", json=request_data)
# Currently returns success but doesn't actually send (Phase 2+)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
def test_get_logs():
"""Test getting logs (placeholder)"""
response = client.get("/agent-work-orders/wo-test123/logs")
# Currently returns empty logs (Phase 2+)
assert response.status_code == 200
data = response.json()
assert "log_entries" in data
assert len(data["log_entries"]) == 0
def test_verify_repository_success():
"""Test repository verification success"""
from src.agent_work_orders.models import GitHubRepository
mock_repo_info = GitHubRepository(
name="repo",
owner="owner",
default_branch="main",
url="https://github.com/owner/repo",
)
with patch("src.agent_work_orders.api.routes.github_client") as mock_client:
mock_client.verify_repository_access = AsyncMock(return_value=True)
mock_client.get_repository_info = AsyncMock(return_value=mock_repo_info)
request_data = {"repository_url": "https://github.com/owner/repo"}
response = client.post("/github/verify-repository", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["is_accessible"] is True
assert data["repository_name"] == "repo"
assert data["repository_owner"] == "owner"
assert data["default_branch"] == "main"
def test_verify_repository_failure():
"""Test repository verification failure"""
with patch("src.agent_work_orders.api.routes.github_client") as mock_client:
mock_client.verify_repository_access = AsyncMock(return_value=False)
request_data = {"repository_url": "https://github.com/owner/nonexistent"}
response = client.post("/github/verify-repository", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["is_accessible"] is False
assert data["error_message"] is not None
def test_get_agent_work_order_steps():
"""Test getting step history for a work order"""
from src.agent_work_orders.models import StepExecutionResult, StepHistory, WorkflowStep
# Create step history
step_history = StepHistory(
agent_work_order_id="wo-test123",
steps=[
StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.0,
),
StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=True,
output="Plan created",
duration_seconds=5.0,
),
],
)
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get_step_history = AsyncMock(return_value=step_history)
response = client.get("/agent-work-orders/wo-test123/steps")
assert response.status_code == 200
data = response.json()
assert data["agent_work_order_id"] == "wo-test123"
assert len(data["steps"]) == 2
assert data["steps"][0]["step"] == "classify"
assert data["steps"][0]["agent_name"] == "classifier"
assert data["steps"][0]["success"] is True
assert data["steps"][1]["step"] == "plan"
assert data["steps"][1]["agent_name"] == "planner"
def test_get_agent_work_order_steps_not_found():
"""Test getting step history for non-existent work order"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get_step_history = AsyncMock(return_value=None)
response = client.get("/agent-work-orders/wo-nonexistent/steps")
assert response.status_code == 404
data = response.json()
assert "not found" in data["detail"].lower()
def test_get_agent_work_order_steps_empty():
"""Test getting empty step history"""
from src.agent_work_orders.models import StepHistory
step_history = StepHistory(agent_work_order_id="wo-test123", steps=[])
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
mock_repo.get_step_history = AsyncMock(return_value=step_history)
response = client.get("/agent-work-orders/wo-test123/steps")
assert response.status_code == 200
data = response.json()
assert data["agent_work_order_id"] == "wo-test123"
assert len(data["steps"]) == 0

View File

@@ -0,0 +1,83 @@
"""Tests for Command Loader"""
import pytest
from pathlib import Path
from tempfile import TemporaryDirectory
from src.agent_work_orders.command_loader.claude_command_loader import (
ClaudeCommandLoader,
)
from src.agent_work_orders.models import CommandNotFoundError
def test_load_command_success():
"""Test loading an existing command file"""
with TemporaryDirectory() as tmpdir:
# Create a test command file
commands_dir = Path(tmpdir) / "commands"
commands_dir.mkdir()
command_file = commands_dir / "agent_workflow_plan.md"
command_file.write_text("# Test Command\n\nThis is a test command.")
loader = ClaudeCommandLoader(str(commands_dir))
command_path = loader.load_command("agent_workflow_plan")
assert command_path == str(command_file)
assert Path(command_path).exists()
def test_load_command_not_found():
"""Test loading a non-existent command file"""
with TemporaryDirectory() as tmpdir:
commands_dir = Path(tmpdir) / "commands"
commands_dir.mkdir()
loader = ClaudeCommandLoader(str(commands_dir))
with pytest.raises(CommandNotFoundError) as exc_info:
loader.load_command("nonexistent_command")
assert "Command file not found" in str(exc_info.value)
def test_list_available_commands():
"""Test listing all available commands"""
with TemporaryDirectory() as tmpdir:
commands_dir = Path(tmpdir) / "commands"
commands_dir.mkdir()
# Create multiple command files
(commands_dir / "agent_workflow_plan.md").write_text("Command 1")
(commands_dir / "agent_workflow_build.md").write_text("Command 2")
(commands_dir / "agent_workflow_test.md").write_text("Command 3")
loader = ClaudeCommandLoader(str(commands_dir))
commands = loader.list_available_commands()
assert len(commands) == 3
assert "agent_workflow_plan" in commands
assert "agent_workflow_build" in commands
assert "agent_workflow_test" in commands
def test_list_available_commands_empty_directory():
"""Test listing commands when directory is empty"""
with TemporaryDirectory() as tmpdir:
commands_dir = Path(tmpdir) / "commands"
commands_dir.mkdir()
loader = ClaudeCommandLoader(str(commands_dir))
commands = loader.list_available_commands()
assert len(commands) == 0
def test_list_available_commands_nonexistent_directory():
"""Test listing commands when directory doesn't exist"""
with TemporaryDirectory() as tmpdir:
nonexistent_dir = Path(tmpdir) / "nonexistent"
loader = ClaudeCommandLoader(str(nonexistent_dir))
commands = loader.list_available_commands()
assert len(commands) == 0

View File

@@ -0,0 +1,202 @@
"""Tests for GitHub Integration"""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from src.agent_work_orders.github_integration.github_client import GitHubClient
from src.agent_work_orders.models import GitHubOperationError
@pytest.mark.asyncio
async def test_verify_repository_access_success():
"""Test successful repository verification"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(return_value=(b"Repository info", b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await client.verify_repository_access("https://github.com/owner/repo")
assert result is True
@pytest.mark.asyncio
async def test_verify_repository_access_failure():
"""Test failed repository verification"""
client = GitHubClient()
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: Not found"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await client.verify_repository_access("https://github.com/owner/nonexistent")
assert result is False
@pytest.mark.asyncio
async def test_get_repository_info_success():
"""Test getting repository information"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_output = b'{"name": "repo", "owner": {"login": "owner"}, "defaultBranchRef": {"name": "main"}}'
mock_process.communicate = AsyncMock(return_value=(mock_output, b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
repo_info = await client.get_repository_info("https://github.com/owner/repo")
assert repo_info.name == "repo"
assert repo_info.owner == "owner"
assert repo_info.default_branch == "main"
assert repo_info.url == "https://github.com/owner/repo"
@pytest.mark.asyncio
async def test_get_repository_info_failure():
"""Test failed repository info retrieval"""
client = GitHubClient()
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: Not found"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(GitHubOperationError):
await client.get_repository_info("https://github.com/owner/nonexistent")
@pytest.mark.asyncio
async def test_create_pull_request_success():
"""Test successful PR creation"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(
return_value=(b"https://github.com/owner/repo/pull/42", b"")
)
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
pr = await client.create_pull_request(
repository_url="https://github.com/owner/repo",
head_branch="feat-wo-test123",
base_branch="main",
title="Test PR",
body="PR body",
)
assert pr.pull_request_url == "https://github.com/owner/repo/pull/42"
assert pr.pull_request_number == 42
assert pr.title == "Test PR"
assert pr.head_branch == "feat-wo-test123"
assert pr.base_branch == "main"
@pytest.mark.asyncio
async def test_create_pull_request_failure():
"""Test failed PR creation"""
client = GitHubClient()
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: PR creation failed"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(GitHubOperationError):
await client.create_pull_request(
repository_url="https://github.com/owner/repo",
head_branch="feat-wo-test123",
base_branch="main",
title="Test PR",
body="PR body",
)
def test_parse_repository_url_https():
"""Test parsing HTTPS repository URL"""
client = GitHubClient()
owner, repo = client._parse_repository_url("https://github.com/owner/repo")
assert owner == "owner"
assert repo == "repo"
def test_parse_repository_url_https_with_git():
"""Test parsing HTTPS repository URL with .git"""
client = GitHubClient()
owner, repo = client._parse_repository_url("https://github.com/owner/repo.git")
assert owner == "owner"
assert repo == "repo"
def test_parse_repository_url_short_format():
"""Test parsing short format repository URL"""
client = GitHubClient()
owner, repo = client._parse_repository_url("owner/repo")
assert owner == "owner"
assert repo == "repo"
def test_parse_repository_url_invalid():
"""Test parsing invalid repository URL"""
client = GitHubClient()
with pytest.raises(ValueError):
client._parse_repository_url("invalid-url")
with pytest.raises(ValueError):
client._parse_repository_url("owner/repo/extra")
@pytest.mark.asyncio
async def test_get_issue_success():
"""Test successful GitHub issue fetch"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
issue_json = json.dumps({
"number": 42,
"title": "Add login feature",
"body": "Users need to log in with email and password",
"state": "open",
"url": "https://github.com/owner/repo/issues/42"
})
mock_process.communicate = AsyncMock(return_value=(issue_json.encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
issue_data = await client.get_issue("https://github.com/owner/repo", "42")
assert issue_data["number"] == 42
assert issue_data["title"] == "Add login feature"
assert issue_data["state"] == "open"
@pytest.mark.asyncio
async def test_get_issue_failure():
"""Test failed GitHub issue fetch"""
client = GitHubClient()
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Issue not found"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(GitHubOperationError, match="Failed to fetch issue"):
await client.get_issue("https://github.com/owner/repo", "999")

View File

@@ -0,0 +1,32 @@
"""Tests for ID Generator"""
from src.agent_work_orders.utils.id_generator import (
generate_work_order_id,
generate_sandbox_identifier,
)
def test_generate_work_order_id_format():
"""Test work order ID format"""
work_order_id = generate_work_order_id()
assert work_order_id.startswith("wo-")
assert len(work_order_id) == 11 # "wo-" + 8 hex chars
# Verify it's hex
hex_part = work_order_id[3:]
assert all(c in "0123456789abcdef" for c in hex_part)
def test_generate_work_order_id_uniqueness():
"""Test that generated IDs are unique"""
ids = [generate_work_order_id() for _ in range(100)]
assert len(ids) == len(set(ids)) # All unique
def test_generate_sandbox_identifier():
"""Test sandbox identifier generation"""
work_order_id = "wo-test123"
sandbox_id = generate_sandbox_identifier(work_order_id)
assert sandbox_id == "sandbox-wo-test123"
assert sandbox_id.startswith("sandbox-")

View File

@@ -0,0 +1,300 @@
"""Tests for Agent Work Orders Models"""
import pytest
from datetime import datetime
from src.agent_work_orders.models import (
AgentWorkOrder,
AgentWorkOrderState,
AgentWorkOrderStatus,
AgentWorkflowPhase,
AgentWorkflowType,
CommandExecutionResult,
CreateAgentWorkOrderRequest,
SandboxType,
StepExecutionResult,
StepHistory,
WorkflowStep,
)
def test_agent_work_order_status_enum():
"""Test AgentWorkOrderStatus enum values"""
assert AgentWorkOrderStatus.PENDING.value == "pending"
assert AgentWorkOrderStatus.RUNNING.value == "running"
assert AgentWorkOrderStatus.COMPLETED.value == "completed"
assert AgentWorkOrderStatus.FAILED.value == "failed"
def test_agent_workflow_type_enum():
"""Test AgentWorkflowType enum values"""
assert AgentWorkflowType.PLAN.value == "agent_workflow_plan"
def test_sandbox_type_enum():
"""Test SandboxType enum values"""
assert SandboxType.GIT_BRANCH.value == "git_branch"
assert SandboxType.GIT_WORKTREE.value == "git_worktree"
assert SandboxType.E2B.value == "e2b"
assert SandboxType.DAGGER.value == "dagger"
def test_agent_workflow_phase_enum():
"""Test AgentWorkflowPhase enum values"""
assert AgentWorkflowPhase.PLANNING.value == "planning"
assert AgentWorkflowPhase.COMPLETED.value == "completed"
def test_agent_work_order_state_creation():
"""Test creating AgentWorkOrderState"""
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
assert state.agent_work_order_id == "wo-test123"
assert state.repository_url == "https://github.com/owner/repo"
assert state.sandbox_identifier == "sandbox-wo-test123"
assert state.git_branch_name is None
assert state.agent_session_id is None
def test_agent_work_order_creation():
"""Test creating complete AgentWorkOrder"""
now = datetime.now()
work_order = AgentWorkOrder(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name="feat-wo-test123",
agent_session_id="session-123",
workflow_type=AgentWorkflowType.PLAN,
sandbox_type=SandboxType.GIT_BRANCH,
github_issue_number="42",
status=AgentWorkOrderStatus.RUNNING,
current_phase=AgentWorkflowPhase.PLANNING,
created_at=now,
updated_at=now,
github_pull_request_url=None,
git_commit_count=0,
git_files_changed=0,
error_message=None,
)
assert work_order.agent_work_order_id == "wo-test123"
assert work_order.workflow_type == AgentWorkflowType.PLAN
assert work_order.status == AgentWorkOrderStatus.RUNNING
assert work_order.current_phase == AgentWorkflowPhase.PLANNING
def test_create_agent_work_order_request():
"""Test CreateAgentWorkOrderRequest validation"""
request = CreateAgentWorkOrderRequest(
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
workflow_type=AgentWorkflowType.PLAN,
user_request="Add user authentication feature",
github_issue_number="42",
)
assert request.repository_url == "https://github.com/owner/repo"
assert request.sandbox_type == SandboxType.GIT_BRANCH
assert request.workflow_type == AgentWorkflowType.PLAN
assert request.user_request == "Add user authentication feature"
assert request.github_issue_number == "42"
def test_create_agent_work_order_request_optional_fields():
"""Test CreateAgentWorkOrderRequest with optional fields"""
request = CreateAgentWorkOrderRequest(
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
workflow_type=AgentWorkflowType.PLAN,
user_request="Fix the login bug",
)
assert request.user_request == "Fix the login bug"
assert request.github_issue_number is None
def test_create_agent_work_order_request_with_user_request():
"""Test CreateAgentWorkOrderRequest with user_request field"""
request = CreateAgentWorkOrderRequest(
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
workflow_type=AgentWorkflowType.PLAN,
user_request="Add user authentication with JWT tokens",
)
assert request.user_request == "Add user authentication with JWT tokens"
assert request.repository_url == "https://github.com/owner/repo"
assert request.github_issue_number is None
def test_create_agent_work_order_request_with_github_issue():
"""Test CreateAgentWorkOrderRequest with both user_request and issue number"""
request = CreateAgentWorkOrderRequest(
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
workflow_type=AgentWorkflowType.PLAN,
user_request="Implement the feature described in issue #42",
github_issue_number="42",
)
assert request.user_request == "Implement the feature described in issue #42"
assert request.github_issue_number == "42"
def test_workflow_step_enum():
"""Test WorkflowStep enum values"""
assert WorkflowStep.CLASSIFY.value == "classify"
assert WorkflowStep.PLAN.value == "plan"
assert WorkflowStep.FIND_PLAN.value == "find_plan"
assert WorkflowStep.IMPLEMENT.value == "implement"
assert WorkflowStep.GENERATE_BRANCH.value == "generate_branch"
assert WorkflowStep.COMMIT.value == "commit"
assert WorkflowStep.REVIEW.value == "review"
assert WorkflowStep.TEST.value == "test"
assert WorkflowStep.CREATE_PR.value == "create_pr"
def test_step_execution_result_success():
"""Test creating successful StepExecutionResult"""
result = StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.5,
session_id="session-123",
)
assert result.step == WorkflowStep.CLASSIFY
assert result.agent_name == "classifier"
assert result.success is True
assert result.output == "/feature"
assert result.error_message is None
assert result.duration_seconds == 1.5
assert result.session_id == "session-123"
assert isinstance(result.timestamp, datetime)
def test_step_execution_result_failure():
"""Test creating failed StepExecutionResult"""
result = StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=False,
error_message="Planning failed: timeout",
duration_seconds=30.0,
)
assert result.step == WorkflowStep.PLAN
assert result.agent_name == "planner"
assert result.success is False
assert result.output is None
assert result.error_message == "Planning failed: timeout"
assert result.duration_seconds == 30.0
assert result.session_id is None
def test_step_history_creation():
"""Test creating StepHistory"""
history = StepHistory(agent_work_order_id="wo-test123", steps=[])
assert history.agent_work_order_id == "wo-test123"
assert len(history.steps) == 0
def test_step_history_with_steps():
"""Test StepHistory with multiple steps"""
step1 = StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.0,
)
step2 = StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=True,
output="Plan created",
duration_seconds=5.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[step1, step2])
assert history.agent_work_order_id == "wo-test123"
assert len(history.steps) == 2
assert history.steps[0].step == WorkflowStep.CLASSIFY
assert history.steps[1].step == WorkflowStep.PLAN
def test_step_history_get_current_step_initial():
"""Test get_current_step returns CLASSIFY when no steps"""
history = StepHistory(agent_work_order_id="wo-test123", steps=[])
assert history.get_current_step() == WorkflowStep.CLASSIFY
def test_step_history_get_current_step_retry_failed():
"""Test get_current_step returns same step when failed"""
failed_step = StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=False,
error_message="Planning failed",
duration_seconds=5.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[failed_step])
assert history.get_current_step() == WorkflowStep.PLAN
def test_step_history_get_current_step_next():
"""Test get_current_step returns next step after success"""
classify_step = StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[classify_step])
assert history.get_current_step() == WorkflowStep.PLAN
def test_command_execution_result_with_result_text():
"""Test CommandExecutionResult includes result_text field"""
result = CommandExecutionResult(
success=True,
stdout='{"type":"result","result":"/feature"}',
result_text="/feature",
stderr=None,
exit_code=0,
session_id="session-123",
)
assert result.result_text == "/feature"
assert result.stdout == '{"type":"result","result":"/feature"}'
assert result.success is True
def test_command_execution_result_without_result_text():
"""Test CommandExecutionResult works without result_text (backward compatibility)"""
result = CommandExecutionResult(
success=True,
stdout="raw output",
stderr=None,
exit_code=0,
)
assert result.result_text is None
assert result.stdout == "raw output"

View File

@@ -0,0 +1,205 @@
"""Tests for Sandbox Manager"""
import pytest
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from tempfile import TemporaryDirectory
from src.agent_work_orders.models import SandboxSetupError, SandboxType
from src.agent_work_orders.sandbox_manager.git_branch_sandbox import GitBranchSandbox
from src.agent_work_orders.sandbox_manager.sandbox_factory import SandboxFactory
@pytest.mark.asyncio
async def test_git_branch_sandbox_setup_success():
"""Test successful sandbox setup"""
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(return_value=(b"Cloning...", b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await sandbox.setup()
assert Path(sandbox.working_dir).name == "sandbox-test"
@pytest.mark.asyncio
async def test_git_branch_sandbox_setup_failure():
"""Test failed sandbox setup"""
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: Repository not found"))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(SandboxSetupError) as exc_info:
await sandbox.setup()
assert "Failed to clone repository" in str(exc_info.value)
@pytest.mark.asyncio
async def test_git_branch_sandbox_execute_command_success():
"""Test successful command execution in sandbox"""
with TemporaryDirectory() as tmpdir:
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = tmpdir
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate = AsyncMock(return_value=(b"Command output", b""))
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await sandbox.execute_command("echo 'test'", timeout=10)
assert result.success is True
assert result.exit_code == 0
assert result.stdout == "Command output"
@pytest.mark.asyncio
async def test_git_branch_sandbox_execute_command_failure():
"""Test failed command execution in sandbox"""
with TemporaryDirectory() as tmpdir:
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = tmpdir
# Mock subprocess failure
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Command failed"))
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await sandbox.execute_command("false", timeout=10)
assert result.success is False
assert result.exit_code == 1
assert result.error_message is not None
@pytest.mark.asyncio
async def test_git_branch_sandbox_execute_command_timeout():
"""Test command execution timeout in sandbox"""
import asyncio
with TemporaryDirectory() as tmpdir:
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = tmpdir
# Mock subprocess that times out
mock_process = MagicMock()
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
async def mock_communicate():
await asyncio.sleep(10)
return (b"", b"")
mock_process.communicate = mock_communicate
with patch("asyncio.create_subprocess_shell", return_value=mock_process):
result = await sandbox.execute_command("sleep 100", timeout=0.1)
assert result.success is False
assert result.exit_code == -1
assert "timed out" in result.error_message.lower()
@pytest.mark.asyncio
async def test_git_branch_sandbox_get_git_branch_name():
"""Test getting current git branch name"""
with TemporaryDirectory() as tmpdir:
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = tmpdir
with patch(
"src.agent_work_orders.sandbox_manager.git_branch_sandbox.get_current_branch",
new=AsyncMock(return_value="feat-wo-test123"),
):
branch = await sandbox.get_git_branch_name()
assert branch == "feat-wo-test123"
@pytest.mark.asyncio
async def test_git_branch_sandbox_cleanup():
"""Test sandbox cleanup"""
with TemporaryDirectory() as tmpdir:
test_dir = Path(tmpdir) / "sandbox-test"
test_dir.mkdir()
(test_dir / "test.txt").write_text("test")
sandbox = GitBranchSandbox(
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
sandbox.working_dir = str(test_dir)
await sandbox.cleanup()
assert not test_dir.exists()
def test_sandbox_factory_git_branch():
"""Test creating git branch sandbox via factory"""
factory = SandboxFactory()
sandbox = factory.create_sandbox(
sandbox_type=SandboxType.GIT_BRANCH,
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
assert isinstance(sandbox, GitBranchSandbox)
assert sandbox.repository_url == "https://github.com/owner/repo"
assert sandbox.sandbox_identifier == "sandbox-test"
def test_sandbox_factory_not_implemented():
"""Test creating unsupported sandbox types"""
factory = SandboxFactory()
with pytest.raises(NotImplementedError):
factory.create_sandbox(
sandbox_type=SandboxType.GIT_WORKTREE,
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
with pytest.raises(NotImplementedError):
factory.create_sandbox(
sandbox_type=SandboxType.E2B,
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)
with pytest.raises(NotImplementedError):
factory.create_sandbox(
sandbox_type=SandboxType.DAGGER,
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-test",
)

View File

@@ -0,0 +1,314 @@
"""Tests for State Manager"""
import pytest
from datetime import datetime
from src.agent_work_orders.models import (
AgentWorkOrderState,
AgentWorkOrderStatus,
AgentWorkflowType,
SandboxType,
StepExecutionResult,
StepHistory,
WorkflowStep,
)
from src.agent_work_orders.state_manager.work_order_repository import (
WorkOrderRepository,
)
@pytest.mark.asyncio
async def test_create_work_order():
"""Test creating a work order"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
result = await repo.get("wo-test123")
assert result is not None
retrieved_state, retrieved_metadata = result
assert retrieved_state.agent_work_order_id == "wo-test123"
assert retrieved_metadata["status"] == AgentWorkOrderStatus.PENDING
@pytest.mark.asyncio
async def test_get_nonexistent_work_order():
"""Test getting a work order that doesn't exist"""
repo = WorkOrderRepository()
result = await repo.get("wo-nonexistent")
assert result is None
@pytest.mark.asyncio
async def test_list_work_orders():
"""Test listing all work orders"""
repo = WorkOrderRepository()
# Create multiple work orders
for i in range(3):
state = AgentWorkOrderState(
agent_work_order_id=f"wo-test{i}",
repository_url="https://github.com/owner/repo",
sandbox_identifier=f"sandbox-wo-test{i}",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
results = await repo.list()
assert len(results) == 3
@pytest.mark.asyncio
async def test_list_work_orders_with_status_filter():
"""Test listing work orders filtered by status"""
repo = WorkOrderRepository()
# Create work orders with different statuses
for i, status in enumerate([AgentWorkOrderStatus.PENDING, AgentWorkOrderStatus.RUNNING, AgentWorkOrderStatus.COMPLETED]):
state = AgentWorkOrderState(
agent_work_order_id=f"wo-test{i}",
repository_url="https://github.com/owner/repo",
sandbox_identifier=f"sandbox-wo-test{i}",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": status,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Filter by RUNNING
results = await repo.list(status_filter=AgentWorkOrderStatus.RUNNING)
assert len(results) == 1
assert results[0][1]["status"] == AgentWorkOrderStatus.RUNNING
@pytest.mark.asyncio
async def test_update_status():
"""Test updating work order status"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Update status
await repo.update_status("wo-test123", AgentWorkOrderStatus.RUNNING)
result = await repo.get("wo-test123")
assert result is not None
_, updated_metadata = result
assert updated_metadata["status"] == AgentWorkOrderStatus.RUNNING
@pytest.mark.asyncio
async def test_update_status_with_additional_fields():
"""Test updating status with additional fields"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Update with additional fields
await repo.update_status(
"wo-test123",
AgentWorkOrderStatus.COMPLETED,
github_pull_request_url="https://github.com/owner/repo/pull/1",
)
result = await repo.get("wo-test123")
assert result is not None
_, updated_metadata = result
assert updated_metadata["status"] == AgentWorkOrderStatus.COMPLETED
assert updated_metadata["github_pull_request_url"] == "https://github.com/owner/repo/pull/1"
@pytest.mark.asyncio
async def test_update_git_branch():
"""Test updating git branch name"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Update git branch
await repo.update_git_branch("wo-test123", "feat-wo-test123")
result = await repo.get("wo-test123")
assert result is not None
updated_state, _ = result
assert updated_state.git_branch_name == "feat-wo-test123"
@pytest.mark.asyncio
async def test_update_session_id():
"""Test updating agent session ID"""
repo = WorkOrderRepository()
state = AgentWorkOrderState(
agent_work_order_id="wo-test123",
repository_url="https://github.com/owner/repo",
sandbox_identifier="sandbox-wo-test123",
git_branch_name=None,
agent_session_id=None,
)
metadata = {
"workflow_type": AgentWorkflowType.PLAN,
"sandbox_type": SandboxType.GIT_BRANCH,
"status": AgentWorkOrderStatus.PENDING,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
await repo.create(state, metadata)
# Update session ID
await repo.update_session_id("wo-test123", "session-abc123")
result = await repo.get("wo-test123")
assert result is not None
updated_state, _ = result
assert updated_state.agent_session_id == "session-abc123"
@pytest.mark.asyncio
async def test_save_and_get_step_history():
"""Test saving and retrieving step history"""
repo = WorkOrderRepository()
step1 = StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.0,
)
step2 = StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=True,
output="Plan created",
duration_seconds=5.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[step1, step2])
await repo.save_step_history("wo-test123", history)
retrieved = await repo.get_step_history("wo-test123")
assert retrieved is not None
assert retrieved.agent_work_order_id == "wo-test123"
assert len(retrieved.steps) == 2
assert retrieved.steps[0].step == WorkflowStep.CLASSIFY
assert retrieved.steps[1].step == WorkflowStep.PLAN
@pytest.mark.asyncio
async def test_get_nonexistent_step_history():
"""Test getting step history that doesn't exist"""
repo = WorkOrderRepository()
retrieved = await repo.get_step_history("wo-nonexistent")
assert retrieved is None
@pytest.mark.asyncio
async def test_update_step_history():
"""Test updating step history with new steps"""
repo = WorkOrderRepository()
# Initial history
step1 = StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.0,
)
history = StepHistory(agent_work_order_id="wo-test123", steps=[step1])
await repo.save_step_history("wo-test123", history)
# Add more steps
step2 = StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=True,
output="Plan created",
duration_seconds=5.0,
)
history.steps.append(step2)
await repo.save_step_history("wo-test123", history)
# Verify updated history
retrieved = await repo.get_step_history("wo-test123")
assert retrieved is not None
assert len(retrieved.steps) == 2

View File

@@ -0,0 +1,614 @@
"""Tests for Workflow Engine"""
import pytest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import AsyncMock, MagicMock, patch
from src.agent_work_orders.models import (
AgentWorkOrderStatus,
AgentWorkflowPhase,
AgentWorkflowType,
SandboxType,
WorkflowExecutionError,
)
from src.agent_work_orders.workflow_engine.workflow_phase_tracker import (
WorkflowPhaseTracker,
)
from src.agent_work_orders.workflow_engine.workflow_orchestrator import (
WorkflowOrchestrator,
)
@pytest.mark.asyncio
async def test_phase_tracker_planning_phase():
"""Test detecting planning phase"""
tracker = WorkflowPhaseTracker()
with TemporaryDirectory() as tmpdir:
with patch(
"src.agent_work_orders.utils.git_operations.get_commit_count",
return_value=0,
):
with patch(
"src.agent_work_orders.utils.git_operations.has_planning_commits",
return_value=False,
):
phase = await tracker.get_current_phase("feat-wo-test", tmpdir)
assert phase == AgentWorkflowPhase.PLANNING
@pytest.mark.asyncio
async def test_phase_tracker_completed_phase():
"""Test detecting completed phase"""
tracker = WorkflowPhaseTracker()
with TemporaryDirectory() as tmpdir:
with patch(
"src.agent_work_orders.utils.git_operations.get_commit_count",
return_value=3,
):
with patch(
"src.agent_work_orders.utils.git_operations.has_planning_commits",
return_value=True,
):
phase = await tracker.get_current_phase("feat-wo-test", tmpdir)
assert phase == AgentWorkflowPhase.COMPLETED
@pytest.mark.asyncio
async def test_phase_tracker_git_progress_snapshot():
"""Test creating git progress snapshot"""
tracker = WorkflowPhaseTracker()
with TemporaryDirectory() as tmpdir:
with patch(
"src.agent_work_orders.utils.git_operations.get_commit_count",
return_value=5,
):
with patch(
"src.agent_work_orders.utils.git_operations.get_files_changed",
return_value=10,
):
with patch(
"src.agent_work_orders.utils.git_operations.get_latest_commit_message",
return_value="plan: Create implementation plan",
):
with patch(
"src.agent_work_orders.utils.git_operations.has_planning_commits",
return_value=True,
):
snapshot = await tracker.get_git_progress_snapshot(
"wo-test123", "feat-wo-test", tmpdir
)
assert snapshot.agent_work_order_id == "wo-test123"
assert snapshot.current_phase == AgentWorkflowPhase.COMPLETED
assert snapshot.git_commit_count == 5
assert snapshot.git_files_changed == 10
assert snapshot.latest_commit_message == "plan: Create implementation plan"
@pytest.mark.asyncio
async def test_workflow_orchestrator_success():
"""Test successful workflow execution with atomic operations"""
from src.agent_work_orders.models import StepExecutionResult, WorkflowStep
# Create mocks for dependencies
mock_agent_executor = MagicMock()
mock_sandbox_factory = MagicMock()
mock_sandbox = MagicMock()
mock_sandbox.setup = AsyncMock()
mock_sandbox.cleanup = AsyncMock()
mock_sandbox.working_dir = "/tmp/sandbox"
mock_sandbox_factory.create_sandbox = MagicMock(return_value=mock_sandbox)
mock_github_client = MagicMock()
mock_phase_tracker = MagicMock()
mock_command_loader = MagicMock()
mock_state_repository = MagicMock()
mock_state_repository.update_status = AsyncMock()
mock_state_repository.update_git_branch = AsyncMock()
mock_state_repository.save_step_history = AsyncMock()
# Mock workflow operations to return successful results
with patch("src.agent_work_orders.workflow_engine.workflow_orchestrator.workflow_operations") as mock_ops:
mock_ops.classify_issue = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.0,
)
)
mock_ops.build_plan = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=True,
output="Plan created",
duration_seconds=5.0,
)
)
mock_ops.find_plan_file = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.FIND_PLAN,
agent_name="plan_finder",
success=True,
output="specs/issue-42-wo-test123-planner-feature.md",
duration_seconds=1.0,
)
)
mock_ops.generate_branch = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.GENERATE_BRANCH,
agent_name="branch_generator",
success=True,
output="feat-issue-42-wo-test123",
duration_seconds=2.0,
)
)
mock_ops.implement_plan = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.IMPLEMENT,
agent_name="implementor",
success=True,
output="Implementation completed",
duration_seconds=10.0,
)
)
mock_ops.create_commit = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.COMMIT,
agent_name="committer",
success=True,
output="implementor: feat: add feature",
duration_seconds=1.0,
)
)
mock_ops.create_pull_request = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.CREATE_PR,
agent_name="pr_creator",
success=True,
output="https://github.com/owner/repo/pull/42",
duration_seconds=2.0,
)
)
orchestrator = WorkflowOrchestrator(
agent_executor=mock_agent_executor,
sandbox_factory=mock_sandbox_factory,
github_client=mock_github_client,
phase_tracker=mock_phase_tracker,
command_loader=mock_command_loader,
state_repository=mock_state_repository,
)
# Execute workflow
await orchestrator.execute_workflow(
agent_work_order_id="wo-test123",
workflow_type=AgentWorkflowType.PLAN,
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Add new user authentication feature",
github_issue_number="42",
github_issue_json='{"title": "Add feature"}',
)
# Verify all workflow operations were called
mock_ops.classify_issue.assert_called_once()
mock_ops.build_plan.assert_called_once()
mock_ops.find_plan_file.assert_called_once()
mock_ops.generate_branch.assert_called_once()
mock_ops.implement_plan.assert_called_once()
mock_ops.create_commit.assert_called_once()
mock_ops.create_pull_request.assert_called_once()
# Verify sandbox operations
mock_sandbox_factory.create_sandbox.assert_called_once()
mock_sandbox.setup.assert_called_once()
mock_sandbox.cleanup.assert_called_once()
# Verify state updates
assert mock_state_repository.update_status.call_count >= 2
mock_state_repository.update_git_branch.assert_called_once_with(
"wo-test123", "feat-issue-42-wo-test123"
)
# Verify step history was saved incrementally (7 steps + 1 final save = 8 total)
assert mock_state_repository.save_step_history.call_count == 8
@pytest.mark.asyncio
async def test_workflow_orchestrator_agent_failure():
"""Test workflow execution with step failure"""
from src.agent_work_orders.models import StepExecutionResult, WorkflowStep
# Create mocks for dependencies
mock_agent_executor = MagicMock()
mock_sandbox_factory = MagicMock()
mock_sandbox = MagicMock()
mock_sandbox.setup = AsyncMock()
mock_sandbox.cleanup = AsyncMock()
mock_sandbox.working_dir = "/tmp/sandbox"
mock_sandbox_factory.create_sandbox = MagicMock(return_value=mock_sandbox)
mock_github_client = MagicMock()
mock_phase_tracker = MagicMock()
mock_command_loader = MagicMock()
mock_state_repository = MagicMock()
mock_state_repository.update_status = AsyncMock()
mock_state_repository.save_step_history = AsyncMock()
# Mock workflow operations - classification fails
with patch("src.agent_work_orders.workflow_engine.workflow_orchestrator.workflow_operations") as mock_ops:
mock_ops.classify_issue = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=False,
error_message="Classification failed",
duration_seconds=1.0,
)
)
orchestrator = WorkflowOrchestrator(
agent_executor=mock_agent_executor,
sandbox_factory=mock_sandbox_factory,
github_client=mock_github_client,
phase_tracker=mock_phase_tracker,
command_loader=mock_command_loader,
state_repository=mock_state_repository,
)
# Execute workflow
await orchestrator.execute_workflow(
agent_work_order_id="wo-test123",
workflow_type=AgentWorkflowType.PLAN,
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Fix the critical bug in login system",
github_issue_json='{"title": "Test"}',
)
# Verify classification was attempted
mock_ops.classify_issue.assert_called_once()
# Verify cleanup happened
mock_sandbox.cleanup.assert_called_once()
# Verify step history was saved even on failure (incremental + error handler = 2 times)
assert mock_state_repository.save_step_history.call_count == 2
# Check that status was updated to FAILED
calls = [call for call in mock_state_repository.update_status.call_args_list]
assert any(
call[0][1] == AgentWorkOrderStatus.FAILED or call.kwargs.get("status") == AgentWorkOrderStatus.FAILED
for call in calls
)
@pytest.mark.asyncio
async def test_workflow_orchestrator_pr_creation_failure():
"""Test workflow execution with PR creation failure"""
from src.agent_work_orders.models import StepExecutionResult, WorkflowStep
# Create mocks for dependencies
mock_agent_executor = MagicMock()
mock_sandbox_factory = MagicMock()
mock_sandbox = MagicMock()
mock_sandbox.setup = AsyncMock()
mock_sandbox.cleanup = AsyncMock()
mock_sandbox.working_dir = "/tmp/sandbox"
mock_sandbox_factory.create_sandbox = MagicMock(return_value=mock_sandbox)
mock_github_client = MagicMock()
mock_phase_tracker = MagicMock()
mock_command_loader = MagicMock()
mock_state_repository = MagicMock()
mock_state_repository.update_status = AsyncMock()
mock_state_repository.update_git_branch = AsyncMock()
mock_state_repository.save_step_history = AsyncMock()
# Mock workflow operations - all succeed except PR creation
with patch("src.agent_work_orders.workflow_engine.workflow_orchestrator.workflow_operations") as mock_ops:
mock_ops.classify_issue = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.0,
)
)
mock_ops.build_plan = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=True,
output="Plan created",
duration_seconds=5.0,
)
)
mock_ops.find_plan_file = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.FIND_PLAN,
agent_name="plan_finder",
success=True,
output="specs/plan.md",
duration_seconds=1.0,
)
)
mock_ops.generate_branch = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.GENERATE_BRANCH,
agent_name="branch_generator",
success=True,
output="feat-issue-42",
duration_seconds=2.0,
)
)
mock_ops.implement_plan = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.IMPLEMENT,
agent_name="implementor",
success=True,
output="Implementation completed",
duration_seconds=10.0,
)
)
mock_ops.create_commit = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.COMMIT,
agent_name="committer",
success=True,
output="implementor: feat: add feature",
duration_seconds=1.0,
)
)
# PR creation fails
mock_ops.create_pull_request = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.CREATE_PR,
agent_name="pr_creator",
success=False,
error_message="GitHub API error",
duration_seconds=2.0,
)
)
orchestrator = WorkflowOrchestrator(
agent_executor=mock_agent_executor,
sandbox_factory=mock_sandbox_factory,
github_client=mock_github_client,
phase_tracker=mock_phase_tracker,
command_loader=mock_command_loader,
state_repository=mock_state_repository,
)
# Execute workflow
await orchestrator.execute_workflow(
agent_work_order_id="wo-test123",
workflow_type=AgentWorkflowType.PLAN,
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Implement feature from issue 42",
github_issue_number="42",
github_issue_json='{"title": "Add feature"}',
)
# Verify PR creation was attempted
mock_ops.create_pull_request.assert_called_once()
# Verify workflow still marked as completed (PR failure is not critical)
calls = [call for call in mock_state_repository.update_status.call_args_list]
assert any(
call[0][1] == AgentWorkOrderStatus.COMPLETED or call.kwargs.get("status") == AgentWorkOrderStatus.COMPLETED
for call in calls
)
# Verify step history was saved incrementally (7 steps + 1 final save = 8 total)
assert mock_state_repository.save_step_history.call_count == 8
@pytest.mark.asyncio
async def test_orchestrator_saves_step_history_incrementally():
"""Test that step history is saved after each step, not just at the end"""
from src.agent_work_orders.models import (
CommandExecutionResult,
StepExecutionResult,
WorkflowStep,
)
from src.agent_work_orders.workflow_engine.agent_names import CLASSIFIER
# Create mocks
mock_executor = MagicMock()
mock_sandbox_factory = MagicMock()
mock_github_client = MagicMock()
mock_phase_tracker = MagicMock()
mock_command_loader = MagicMock()
mock_state_repository = MagicMock()
# Track save_step_history calls
save_calls = []
async def track_save(wo_id, history):
save_calls.append(len(history.steps))
mock_state_repository.save_step_history = AsyncMock(side_effect=track_save)
mock_state_repository.update_status = AsyncMock()
mock_state_repository.update_git_branch = AsyncMock()
# Mock sandbox
mock_sandbox = MagicMock()
mock_sandbox.working_dir = "/tmp/test"
mock_sandbox.setup = AsyncMock()
mock_sandbox.cleanup = AsyncMock()
mock_sandbox_factory.create_sandbox = MagicMock(return_value=mock_sandbox)
# Mock GitHub client
mock_github_client.get_issue = AsyncMock(return_value={
"title": "Test Issue",
"body": "Test body"
})
# Create orchestrator
orchestrator = WorkflowOrchestrator(
agent_executor=mock_executor,
sandbox_factory=mock_sandbox_factory,
github_client=mock_github_client,
phase_tracker=mock_phase_tracker,
command_loader=mock_command_loader,
state_repository=mock_state_repository,
)
# Mock workflow operations to return success for all steps
with patch("src.agent_work_orders.workflow_engine.workflow_orchestrator.workflow_operations") as mock_ops:
# Mock successful results for each step
mock_ops.classify_issue = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name=CLASSIFIER,
success=True,
output="/feature",
duration_seconds=1.0,
)
)
mock_ops.build_plan = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=True,
output="Plan created",
duration_seconds=2.0,
)
)
mock_ops.find_plan_file = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.FIND_PLAN,
agent_name="plan_finder",
success=True,
output="specs/plan.md",
duration_seconds=0.5,
)
)
mock_ops.generate_branch = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.GENERATE_BRANCH,
agent_name="branch_generator",
success=True,
output="feat-issue-1-wo-test",
duration_seconds=1.0,
)
)
mock_ops.implement_plan = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.IMPLEMENT,
agent_name="implementor",
success=True,
output="Implementation complete",
duration_seconds=5.0,
)
)
mock_ops.create_commit = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.COMMIT,
agent_name="committer",
success=True,
output="Commit created",
duration_seconds=1.0,
)
)
mock_ops.create_pull_request = AsyncMock(
return_value=StepExecutionResult(
step=WorkflowStep.CREATE_PR,
agent_name="pr_creator",
success=True,
output="https://github.com/owner/repo/pull/1",
duration_seconds=1.0,
)
)
# Execute workflow
await orchestrator.execute_workflow(
agent_work_order_id="wo-test",
workflow_type=AgentWorkflowType.PLAN,
repository_url="https://github.com/owner/repo",
sandbox_type=SandboxType.GIT_BRANCH,
user_request="Test feature request",
)
# Verify save_step_history was called after EACH step (7 times) + final save (8 total)
# OR at minimum, verify it was called MORE than just once at the end
assert len(save_calls) >= 7, f"Expected at least 7 incremental saves, got {len(save_calls)}"
# Verify the progression: 1 step, 2 steps, 3 steps, etc.
assert save_calls[0] == 1, "First save should have 1 step"
assert save_calls[1] == 2, "Second save should have 2 steps"
assert save_calls[2] == 3, "Third save should have 3 steps"
assert save_calls[3] == 4, "Fourth save should have 4 steps"
assert save_calls[4] == 5, "Fifth save should have 5 steps"
assert save_calls[5] == 6, "Sixth save should have 6 steps"
assert save_calls[6] == 7, "Seventh save should have 7 steps"
@pytest.mark.asyncio
async def test_step_history_visible_during_execution():
"""Test that step history can be retrieved during workflow execution"""
from src.agent_work_orders.models import StepHistory
# Create real state repository (in-memory)
from src.agent_work_orders.state_manager.work_order_repository import WorkOrderRepository
state_repo = WorkOrderRepository()
# Create empty step history
step_history = StepHistory(agent_work_order_id="wo-test")
# Simulate incremental saves during workflow
from src.agent_work_orders.models import StepExecutionResult, WorkflowStep
# Step 1: Classify
step_history.steps.append(StepExecutionResult(
step=WorkflowStep.CLASSIFY,
agent_name="classifier",
success=True,
output="/feature",
duration_seconds=1.0,
))
await state_repo.save_step_history("wo-test", step_history)
# Retrieve and verify
retrieved = await state_repo.get_step_history("wo-test")
assert retrieved is not None
assert len(retrieved.steps) == 1
assert retrieved.steps[0].step == WorkflowStep.CLASSIFY
# Step 2: Plan
step_history.steps.append(StepExecutionResult(
step=WorkflowStep.PLAN,
agent_name="planner",
success=True,
output="Plan created",
duration_seconds=2.0,
))
await state_repo.save_step_history("wo-test", step_history)
# Retrieve and verify progression
retrieved = await state_repo.get_step_history("wo-test")
assert len(retrieved.steps) == 2
assert retrieved.steps[1].step == WorkflowStep.PLAN
# Verify both steps are present
assert retrieved.steps[0].step == WorkflowStep.CLASSIFY
assert retrieved.steps[1].step == WorkflowStep.PLAN

View File

@@ -0,0 +1,406 @@
"""Tests for Workflow Operations"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from src.agent_work_orders.models import (
CommandExecutionResult,
WorkflowStep,
)
from src.agent_work_orders.workflow_engine import workflow_operations
from src.agent_work_orders.workflow_engine.agent_names import (
BRANCH_GENERATOR,
CLASSIFIER,
COMMITTER,
IMPLEMENTOR,
PLAN_FINDER,
PLANNER,
PR_CREATOR,
)
@pytest.mark.asyncio
async def test_classify_issue_success():
"""Test successful issue classification"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="/feature",
result_text="/feature",
stderr=None,
exit_code=0,
session_id="session-123",
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/classifier.md")
result = await workflow_operations.classify_issue(
mock_executor,
mock_loader,
'{"title": "Add feature"}',
"wo-test",
"/tmp/working",
)
assert result.step == WorkflowStep.CLASSIFY
assert result.agent_name == CLASSIFIER
assert result.success is True
assert result.output == "/feature"
assert result.session_id == "session-123"
mock_loader.load_command.assert_called_once_with("classifier")
@pytest.mark.asyncio
async def test_classify_issue_failure():
"""Test failed issue classification"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=False,
stdout=None,
stderr="Error",
exit_code=1,
error_message="Classification failed",
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/classifier.md")
result = await workflow_operations.classify_issue(
mock_executor,
mock_loader,
'{"title": "Add feature"}',
"wo-test",
"/tmp/working",
)
assert result.step == WorkflowStep.CLASSIFY
assert result.agent_name == CLASSIFIER
assert result.success is False
assert result.error_message == "Classification failed"
@pytest.mark.asyncio
async def test_build_plan_feature_success():
"""Test successful feature plan creation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="Plan created successfully",
result_text="Plan created successfully",
stderr=None,
exit_code=0,
session_id="session-123",
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/planner_feature.md")
result = await workflow_operations.build_plan(
mock_executor,
mock_loader,
"/feature",
"42",
"wo-test",
'{"title": "Add feature"}',
"/tmp/working",
)
assert result.step == WorkflowStep.PLAN
assert result.agent_name == PLANNER
assert result.success is True
assert result.output == "Plan created successfully"
mock_loader.load_command.assert_called_once_with("planner_feature")
@pytest.mark.asyncio
async def test_build_plan_bug_success():
"""Test successful bug plan creation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="Bug plan created",
result_text="Bug plan created",
stderr=None,
exit_code=0,
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/planner_bug.md")
result = await workflow_operations.build_plan(
mock_executor,
mock_loader,
"/bug",
"42",
"wo-test",
'{"title": "Fix bug"}',
"/tmp/working",
)
assert result.success is True
mock_loader.load_command.assert_called_once_with("planner_bug")
@pytest.mark.asyncio
async def test_build_plan_invalid_class():
"""Test plan creation with invalid issue class"""
mock_executor = MagicMock()
mock_loader = MagicMock()
result = await workflow_operations.build_plan(
mock_executor,
mock_loader,
"/invalid",
"42",
"wo-test",
'{"title": "Test"}',
"/tmp/working",
)
assert result.step == WorkflowStep.PLAN
assert result.success is False
assert "Unknown issue class" in result.error_message
@pytest.mark.asyncio
async def test_find_plan_file_success():
"""Test successful plan file finding"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="specs/issue-42-wo-test-planner-feature.md",
result_text="specs/issue-42-wo-test-planner-feature.md",
stderr=None,
exit_code=0,
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/plan_finder.md")
result = await workflow_operations.find_plan_file(
mock_executor,
mock_loader,
"42",
"wo-test",
"Previous output",
"/tmp/working",
)
assert result.step == WorkflowStep.FIND_PLAN
assert result.agent_name == PLAN_FINDER
assert result.success is True
assert result.output == "specs/issue-42-wo-test-planner-feature.md"
@pytest.mark.asyncio
async def test_find_plan_file_not_found():
"""Test plan file not found"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="0",
result_text="0",
stderr=None,
exit_code=0,
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/plan_finder.md")
result = await workflow_operations.find_plan_file(
mock_executor,
mock_loader,
"42",
"wo-test",
"Previous output",
"/tmp/working",
)
assert result.success is False
assert result.error_message == "Plan file not found"
@pytest.mark.asyncio
async def test_implement_plan_success():
"""Test successful plan implementation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="Implementation completed",
result_text="Implementation completed",
stderr=None,
exit_code=0,
session_id="session-123",
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/implementor.md")
result = await workflow_operations.implement_plan(
mock_executor,
mock_loader,
"specs/plan.md",
"wo-test",
"/tmp/working",
)
assert result.step == WorkflowStep.IMPLEMENT
assert result.agent_name == IMPLEMENTOR
assert result.success is True
assert result.output == "Implementation completed"
@pytest.mark.asyncio
async def test_generate_branch_success():
"""Test successful branch generation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="feat-issue-42-wo-test-add-feature",
result_text="feat-issue-42-wo-test-add-feature",
stderr=None,
exit_code=0,
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/branch_generator.md")
result = await workflow_operations.generate_branch(
mock_executor,
mock_loader,
"/feature",
"42",
"wo-test",
'{"title": "Add feature"}',
"/tmp/working",
)
assert result.step == WorkflowStep.GENERATE_BRANCH
assert result.agent_name == BRANCH_GENERATOR
assert result.success is True
assert result.output == "feat-issue-42-wo-test-add-feature"
@pytest.mark.asyncio
async def test_create_commit_success():
"""Test successful commit creation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="implementor: feat: add user authentication",
result_text="implementor: feat: add user authentication",
stderr=None,
exit_code=0,
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/committer.md")
result = await workflow_operations.create_commit(
mock_executor,
mock_loader,
"implementor",
"/feature",
'{"title": "Add auth"}',
"wo-test",
"/tmp/working",
)
assert result.step == WorkflowStep.COMMIT
assert result.agent_name == COMMITTER
assert result.success is True
assert result.output == "implementor: feat: add user authentication"
@pytest.mark.asyncio
async def test_create_pull_request_success():
"""Test successful PR creation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=True,
stdout="https://github.com/owner/repo/pull/123",
result_text="https://github.com/owner/repo/pull/123",
stderr=None,
exit_code=0,
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/pr_creator.md")
result = await workflow_operations.create_pull_request(
mock_executor,
mock_loader,
"feat-issue-42",
'{"title": "Add feature"}',
"specs/plan.md",
"wo-test",
"/tmp/working",
)
assert result.step == WorkflowStep.CREATE_PR
assert result.agent_name == PR_CREATOR
assert result.success is True
assert result.output == "https://github.com/owner/repo/pull/123"
@pytest.mark.asyncio
async def test_create_pull_request_failure():
"""Test failed PR creation"""
mock_executor = MagicMock()
mock_executor.build_command = MagicMock(return_value=("cli command", "prompt"))
mock_executor.execute_async = AsyncMock(
return_value=CommandExecutionResult(
success=False,
stdout=None,
stderr="PR creation failed",
exit_code=1,
error_message="GitHub API error",
)
)
mock_loader = MagicMock()
mock_loader.load_command = MagicMock(return_value="/path/to/pr_creator.md")
result = await workflow_operations.create_pull_request(
mock_executor,
mock_loader,
"feat-issue-42",
'{"title": "Add feature"}',
"specs/plan.md",
"wo-test",
"/tmp/working",
)
assert result.success is False
assert result.error_message == "GitHub API error"