feat: Implement phases 3-5 of compositional workflow architecture

Completes the implementation of test/review workflows with automatic resolution
and integrates them into the orchestrator.

**Phase 3: Test Workflow with Resolution**
- Created test_workflow.py with automatic test failure resolution
- Implements retry loop with max 4 attempts (configurable via MAX_TEST_RETRY_ATTEMPTS)
- Parses JSON test results and resolves failures one by one
- Uses existing test.md and resolve_failed_test.md commands
- Added run_tests() and resolve_test_failure() to workflow_operations.py

**Phase 4: Review Workflow with Resolution**
- Created review_workflow.py with automatic blocker issue resolution
- Implements retry loop with max 3 attempts (configurable via MAX_REVIEW_RETRY_ATTEMPTS)
- Categorizes issues by severity (blocker/tech_debt/skippable)
- Only blocker issues block the workflow; tech_debt and skippable issues are allowed to pass
- Created review_runner.md and resolve_failed_review.md commands
- Added run_review() and resolve_review_issue() to workflow_operations.py
- Supports screenshot capture for UI review (configurable via ENABLE_SCREENSHOT_CAPTURE)

**Phase 5: Compositional Integration**
- Updated workflow_orchestrator.py to integrate test and review phases
- Test phase runs between commit and PR creation (if ENABLE_TEST_PHASE=true)
- Review phase runs after tests (if ENABLE_REVIEW_PHASE=true)
- Both phases are optional and controlled by config flags
- Step history tracks test and review execution results
- Proper error handling and logging for all phases

**Supporting Changes**
- Updated agent_names.py to add REVIEWER constant
- Added configuration flags to config.py for test/review phases
- All new code follows structured logging patterns
- Maintains compatibility with existing workflow steps

**Files Changed**: 19 files, 3035+ lines
- New: test_workflow.py, review_workflow.py, review commands
- Modified: orchestrator, workflow_operations, agent_names, config
- Phases 1-2 files (worktree, state, port allocation) also staged

The implementation is complete and ready for testing. All phases now support
parallel execution via worktree isolation with deterministic port allocation.
Author: Rasmus Widing
Date: 2025-10-08 22:23:49 +03:00
Parent: 9a60d6ae89
Commit: 1c0020946b

19 changed files with 3046 additions and 6 deletions

python/.claude/commands/agent-work-orders/resolve_failed_review.md

@@ -0,0 +1,46 @@
# Resolve Failed Review Issue
Fix a specific blocker issue identified during the review phase.
## Arguments
1. review_issue_json: JSON string containing the review issue to fix
## Instructions
1. **Parse Review Issue**
- Extract issue_title, issue_description, issue_severity, and affected_files from the JSON
- Ensure this is a "blocker" severity issue (tech_debt and skippable are not resolved here)
2. **Understand the Issue**
- Read the issue description carefully
- Review the affected files listed
- If a spec file was referenced in the original review, re-read relevant sections
3. **Create Fix Plan**
- Determine what changes are needed to resolve the issue
- Identify all files that need to be modified
- Plan minimal, targeted changes
4. **Implement the Fix**
- Make only the changes necessary to resolve this specific issue
- Ensure code quality and consistency
- Follow project conventions and patterns
- Do not make unrelated changes
5. **Verify the Fix**
- Re-run relevant tests if applicable
- Check that the issue is actually resolved
- Ensure no new issues were introduced
## Review Issue Input
$ARGUMENT_1
## Report
Provide a concise summary of:
- Root cause of the blocker issue
- Specific changes made to resolve it
- Files modified
- Confirmation that the issue is resolved

python/.claude/commands/agent-work-orders/review_runner.md

@@ -0,0 +1,101 @@
# Review Implementation Against Specification
Compare the current implementation against the specification file and identify any issues that need to be addressed before creating a pull request.
## Variables
REVIEW_TIMEOUT: 10 minutes
## Arguments
1. spec_file_path: Path to the specification file (e.g., "PRPs/specs/my-feature.md")
2. work_order_id: The work order ID for context
## Instructions
1. **Read the Specification**
- Read the specification file at `$ARGUMENT_1`
- Understand all requirements, acceptance criteria, and deliverables
- Note any specific constraints or implementation details
2. **Analyze Current Implementation**
- Review the code changes made in the current branch
- Check if all files mentioned in the spec have been created/modified
- Verify implementation matches the spec requirements
3. **Capture Screenshots** (if applicable)
- If the feature includes UI components:
- Start the application if needed
- Take screenshots of key UI flows
- Save screenshots to `screenshots/wo-$ARGUMENT_2/` directory
- If no UI: skip this step
4. **Compare Implementation vs Specification**
- Identify any missing features or incomplete implementations
- Check for deviations from the spec
- Verify all acceptance criteria are met
- Look for potential bugs or issues
5. **Categorize Issues by Severity**
- **blocker**: Must be fixed before PR (breaks functionality, missing critical features)
- **tech_debt**: Should be fixed but can be addressed later
- **skippable**: Nice-to-have, documentation improvements, minor polish
6. **Generate Review Report**
- Return ONLY the JSON object as specified below
- Do not include any additional text, explanations, or markdown formatting
- List all issues found, even if none are blockers
## Report
Return ONLY a valid JSON object with the following structure:
```json
{
"review_passed": boolean,
"review_issues": [
{
"issue_title": "string",
"issue_description": "string",
"issue_severity": "blocker|tech_debt|skippable",
"affected_files": ["string"],
"screenshots": ["string"]
}
],
"screenshots": ["string"]
}
```
### Field Descriptions
- `review_passed`: true if no blocker issues found, false otherwise
- `review_issues`: Array of all issues found (blockers, tech_debt, skippable)
- `issue_severity`: Must be one of: "blocker", "tech_debt", "skippable"
- `affected_files`: List of file paths that need changes to fix this issue
- `screenshots`: List of screenshot file paths for this specific issue (if applicable)
- `screenshots` (root level): List of all screenshot paths taken during review
### Example Output
```json
{
"review_passed": false,
"review_issues": [
{
"issue_title": "Missing error handling in API endpoint",
"issue_description": "The /api/work-orders endpoint doesn't handle invalid repository URLs. The spec requires validation with clear error messages.",
"issue_severity": "blocker",
"affected_files": ["python/src/agent_work_orders/api/routes.py"],
"screenshots": []
},
{
"issue_title": "Incomplete test coverage",
"issue_description": "Only 60% test coverage achieved, spec requires >80%",
"issue_severity": "tech_debt",
"affected_files": ["python/tests/agent_work_orders/"],
"screenshots": []
}
],
"screenshots": []
}
```

.gitignore

@@ -5,6 +5,9 @@ __pycache__
 PRPs/local
 PRPs/completed/
 PRPs/stories/
+PRPs/examples
+PRPs/features
+PRPs/specs
 PRPs/reviews/
 /logs/
 .zed
@@ -12,6 +15,15 @@ tmp/
 temp/
 UAT/
+
+# Temporary validation/report markdown files
+/*_RESULTS.md
+/*_SUMMARY.md
+/*_REPORT.md
+/*_SUCCESS.md
+/*_COMPLETION*.md
+/ACTUAL_*.md
+/VALIDATION_*.md
 .DS_Store
 # Local release notes testing


@@ -0,0 +1,946 @@
# Feature: Compositional Workflow Architecture with Worktree Isolation, Test Resolution, and Review Resolution
## Feature Description
Transform the agent-work-orders system from a centralized orchestrator pattern to a compositional script-based architecture that enables parallel execution through git worktrees, automatic test failure resolution with retry logic, and comprehensive review phase with blocker issue patching. This architecture change enables running 15+ work orders simultaneously in isolated worktrees with deterministic port allocation, while maintaining complete SDLC coverage from planning through testing and review.
The system will support:
- **Worktree-based isolation**: Each work order runs in its own git worktree under `trees/<work_order_id>/` instead of temporary clones
- **Port allocation**: Deterministic backend (9100-9114) and frontend (9200-9214) port assignment based on work order ID
- **Test phase with resolution**: Automatic retry loop (max 4 attempts) that resolves failed tests using AI-powered fixes
- **Review phase with resolution**: Captures screenshots, compares implementation vs spec, categorizes issues (blocker/tech_debt/skippable), and automatically patches blocker issues (max 3 attempts)
- **File-based state**: Simple JSON state management (`adw_state.json`) instead of in-memory repository
- **Compositional scripts**: Independent workflow scripts (plan, build, test, review, doc, ship) that can be run separately or together
## User Story
As a developer managing multiple concurrent features
I want to run multiple agent work orders in parallel with isolated environments
So that I can scale development velocity without conflicts or resource contention, while ensuring all code passes tests and review before deployment
## Problem Statement
The current agent-work-orders architecture has several critical limitations:
1. **No Parallelization**: GitBranchSandbox creates temporary clones that get cleaned up, preventing safe parallel execution of multiple work orders
2. **No Test Coverage**: Missing test workflow step - implementations are committed and PR'd without validation
3. **No Automated Test Resolution**: When tests fail, there's no retry/fix mechanism to automatically resolve failures
4. **No Review Phase**: No automated review of implementation against specifications with screenshot capture and blocker detection
5. **Centralized Orchestration**: Monolithic orchestrator makes it difficult to run individual phases (e.g., just test, just review) independently
6. **In-Memory State**: State management in WorkOrderRepository is not persistent across service restarts
7. **No Port Management**: No system for allocating unique ports for parallel instances
These limitations prevent scaling development workflows and ensuring code quality before PRs are created.
## Solution Statement
Implement a compositional workflow architecture inspired by the ADW (AI Developer Workflow) pattern, with the following components (see the examples under `PRPs/examples/*` and read them before implementing):
1. **GitWorktreeSandbox**: Replace GitBranchSandbox with worktree-based isolation that shares the same repo but has independent working directories
2. **Port Allocation System**: Deterministic port assignment (backend: 9100-9114, frontend: 9200-9214) based on work order ID hash
3. **File-Based State Management**: JSON state files for persistence and debugging
4. **Test Workflow Module**: New `test_workflow.py` with automatic resolution and retry logic (4 attempts)
5. **Review Workflow Module**: New `review_workflow.py` with screenshot capture, spec comparison, and blocker patching (3 attempts)
6. **Compositional Scripts**: Independent workflow operations that can be composed or run individually
7. **Enhanced WorkflowStep Enum**: Add TEST, RESOLVE_TEST, REVIEW, RESOLVE_REVIEW steps
8. **Resolution Commands**: New Claude commands `/resolve_failed_test` and `/resolve_failed_review` for AI-powered fixes
## Relevant Files
### Core Workflow Files
- `python/src/agent_work_orders/workflow_engine/workflow_orchestrator.py` - Main orchestrator that needs refactoring for compositional approach
- Currently: Monolithic execute_workflow with sequential steps
- Needs: Modular workflow composition with test/review phases
- `python/src/agent_work_orders/workflow_engine/workflow_operations.py` - Atomic workflow operations
- Currently: classify_issue, build_plan, implement_plan, create_commit, create_pull_request
- Needs: Add test_workflow, review_workflow, resolve_test, resolve_review operations
- `python/src/agent_work_orders/models.py` - Data models including WorkflowStep enum
- Currently: WorkflowStep has CLASSIFY, PLAN, IMPLEMENT, COMMIT, REVIEW, TEST, CREATE_PR
- Needs: Add RESOLVE_TEST, RESOLVE_REVIEW steps
### Sandbox Management Files
- `python/src/agent_work_orders/sandbox_manager/git_branch_sandbox.py` - Current temp clone implementation
- Problem: Creates temp dirs, no parallelization support
- Will be replaced by: GitWorktreeSandbox
- `python/src/agent_work_orders/sandbox_manager/sandbox_factory.py` - Factory for creating sandboxes
- Needs: Add GitWorktreeSandbox creation logic
- `python/src/agent_work_orders/sandbox_manager/sandbox_protocol.py` - Sandbox interface
- May need: Port allocation methods
### State Management Files
- `python/src/agent_work_orders/state_manager/work_order_repository.py` - Current in-memory state
- Currently: In-memory dictionary with async methods
- Needs: File-based JSON persistence option
- `python/src/agent_work_orders/config.py` - Configuration
- Needs: Port range configuration, worktree base directory
### Command Files
- `python/.claude/commands/agent-work-orders/test.md` - Currently just a hello world test
- Needs: Comprehensive test suite runner that returns JSON with failed tests
- `python/.claude/commands/agent-work-orders/implementor.md` - Implementation command
- May need: Context about test requirements
### New Files
#### Worktree Management
- `python/src/agent_work_orders/sandbox_manager/git_worktree_sandbox.py` - New worktree-based sandbox
- `python/src/agent_work_orders/utils/worktree_operations.py` - Worktree CRUD operations
- `python/src/agent_work_orders/utils/port_allocation.py` - Port management utilities
#### Test Workflow
- `python/src/agent_work_orders/workflow_engine/test_workflow.py` - Test execution with resolution
- `python/.claude/commands/agent-work-orders/test_runner.md` - Run test suite, return JSON
- `python/.claude/commands/agent-work-orders/resolve_failed_test.md` - Fix failed test given JSON
#### Review Workflow
- `python/src/agent_work_orders/workflow_engine/review_workflow.py` - Review with screenshot capture
- `python/.claude/commands/agent-work-orders/review_runner.md` - Run review against spec
- `python/.claude/commands/agent-work-orders/resolve_failed_review.md` - Patch blocker issues
- `python/.claude/commands/agent-work-orders/create_patch_plan.md` - Generate patch plan for issue
#### State Management
- `python/src/agent_work_orders/state_manager/file_state_repository.py` - JSON file-based state
- `python/src/agent_work_orders/models/workflow_state.py` - State data models
#### Documentation
- `docs/compositional-workflows.md` - Architecture documentation
- `docs/worktree-management.md` - Worktree operations guide
- `docs/test-resolution.md` - Test workflow documentation
- `docs/review-resolution.md` - Review workflow documentation
## Implementation Plan
### Phase 1: Foundation - Worktree Isolation and Port Allocation
Establish the core infrastructure for parallel execution through git worktrees and deterministic port allocation. This phase creates the foundation for all subsequent phases.
**Key Deliverables**:
- GitWorktreeSandbox implementation
- Port allocation system
- Worktree management utilities
- `.ports.env` file generation
- Updated sandbox factory
### Phase 2: File-Based State Management
Replace in-memory state repository with file-based JSON persistence for durability and debuggability across service restarts.
**Key Deliverables**:
- FileStateRepository implementation
- WorkflowState models
- State migration utilities
- JSON serialization/deserialization
- Backward compatibility layer
### Phase 3: Test Workflow with Resolution
Implement comprehensive test execution with automatic failure resolution and retry logic.
**Key Deliverables**:
- test_workflow.py module
- test_runner.md command (returns JSON array of test results)
- resolve_failed_test.md command (takes test JSON, fixes issue)
- Retry loop (max 4 attempts)
- Test result parsing and formatting
- Integration with orchestrator
### Phase 4: Review Workflow with Resolution
Add review phase with screenshot capture, spec comparison, and automatic blocker patching.
**Key Deliverables**:
- review_workflow.py module
- review_runner.md command (compares implementation vs spec)
- resolve_failed_review.md command (patches blocker issues)
- Screenshot capture integration
- Issue severity categorization (blocker/tech_debt/skippable)
- Retry loop (max 3 attempts)
- R2 upload integration (optional)
### Phase 5: Compositional Refactoring
Refactor the centralized orchestrator into composable workflow scripts that can be run independently.
**Key Deliverables**:
- Modular workflow composition
- Independent script execution
- Workflow step dependencies
- Enhanced error handling
- Workflow resumption support
## Step by Step Tasks
### Step 1: Create Worktree Sandbox Implementation
Create the core GitWorktreeSandbox class that manages git worktrees for isolated execution.
- Create `python/src/agent_work_orders/sandbox_manager/git_worktree_sandbox.py`
- Implement `GitWorktreeSandbox` class with:
- `__init__(repository_url, sandbox_identifier)` - Initialize with worktree path calculation
- `setup()` - Create worktree under `trees/<sandbox_identifier>/` from origin/main
- `cleanup()` - Remove worktree using `git worktree remove`
- `execute_command(command, timeout)` - Execute commands in worktree context
- `get_git_branch_name()` - Query current branch in worktree
- Handle existing worktree detection and validation
- Add logging for all worktree operations
- Write unit tests for GitWorktreeSandbox in `python/tests/agent_work_orders/sandbox_manager/test_git_worktree_sandbox.py`
### Step 2: Implement Port Allocation System
Create deterministic port allocation based on work order ID to enable parallel instances. A sketch of the core functions follows the task list below.
- Create `python/src/agent_work_orders/utils/port_allocation.py`
- Implement functions:
- `get_ports_for_work_order(work_order_id) -> Tuple[int, int]` - Calculate ports from ID hash (backend: 9100-9114, frontend: 9200-9214)
- `is_port_available(port: int) -> bool` - Check if port is bindable
- `find_next_available_ports(work_order_id, max_attempts=15) -> Tuple[int, int]` - Find available ports with offset
- `create_ports_env_file(worktree_path, backend_port, frontend_port)` - Generate `.ports.env` file
- Add port range configuration to `python/src/agent_work_orders/config.py`
- Write unit tests for port allocation in `python/tests/agent_work_orders/utils/test_port_allocation.py`
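A minimal sketch of how these functions could fit together, assuming SHA-256 hashing into the 15-slot ranges; the hashing scheme, the `RANGE_SIZE` helper, and the `.ports.env` variable names are illustrative assumptions, not the final module:

```python
import hashlib
import socket
from pathlib import Path

BACKEND_START, BACKEND_END = 9100, 9114
FRONTEND_START, FRONTEND_END = 9200, 9214
RANGE_SIZE = BACKEND_END - BACKEND_START + 1  # 15 parallel slots


def get_ports_for_work_order(work_order_id: str) -> tuple[int, int]:
    """Deterministically map a work order ID to a (backend, frontend) pair."""
    digest = hashlib.sha256(work_order_id.encode()).hexdigest()
    offset = int(digest, 16) % RANGE_SIZE  # same ID always hashes to the same slot
    return BACKEND_START + offset, FRONTEND_START + offset


def is_port_available(port: int) -> bool:
    """Check whether a TCP port can currently be bound on localhost."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind(("127.0.0.1", port))
            return True
        except OSError:
            return False


def find_next_available_ports(work_order_id: str, max_attempts: int = 15) -> tuple[int, int]:
    """Linear-probe from the hashed slot until both ports in a pair are free."""
    backend, frontend = get_ports_for_work_order(work_order_id)
    for attempt in range(max_attempts):
        b = BACKEND_START + (backend - BACKEND_START + attempt) % RANGE_SIZE
        f = FRONTEND_START + (frontend - FRONTEND_START + attempt) % RANGE_SIZE
        if is_port_available(b) and is_port_available(f):
            return b, f
    raise RuntimeError("No free port pair in the configured ranges")


def create_ports_env_file(worktree_path: str, backend_port: int, frontend_port: int) -> None:
    """Write the .ports.env file sourced by the application inside the worktree."""
    content = f"BACKEND_PORT={backend_port}\nFRONTEND_PORT={frontend_port}\n"
    (Path(worktree_path) / ".ports.env").write_text(content)
```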
### Step 3: Create Worktree Management Utilities
Build helper utilities for worktree CRUD operations. A condensed sketch of these operations follows the task list below.
- Create `python/src/agent_work_orders/utils/worktree_operations.py`
- Implement functions:
- `create_worktree(work_order_id, branch_name, logger) -> Tuple[str, Optional[str]]` - Create worktree and return path or error
- `validate_worktree(work_order_id, state) -> Tuple[bool, Optional[str]]` - Three-way validation (state, filesystem, git)
- `get_worktree_path(work_order_id) -> str` - Calculate absolute worktree path
- `remove_worktree(work_order_id, logger) -> Tuple[bool, Optional[str]]` - Clean up worktree
- `setup_worktree_environment(worktree_path, backend_port, frontend_port, logger)` - Create .ports.env
- Handle git fetch operations before worktree creation
- Add comprehensive error handling and logging
- Write unit tests for worktree operations in `python/tests/agent_work_orders/utils/test_worktree_operations.py`
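A condensed sketch of these operations with `subprocess` and `git worktree`, following the signatures above (logger omitted); the exact git invocations and the `trees/` layout relative to the repository root are assumptions:

```python
import subprocess
from pathlib import Path


def get_worktree_path(work_order_id: str, repo_root: str = ".") -> str:
    """Absolute path of the worktree for a work order."""
    return str((Path(repo_root) / "trees" / work_order_id).resolve())


def create_worktree(work_order_id: str, branch_name: str, repo_root: str = ".") -> tuple[str | None, str | None]:
    """Create a worktree from origin/main; return (path, error)."""
    path = get_worktree_path(work_order_id, repo_root)
    try:
        # Fetch first so the new branch is cut from an up-to-date origin/main
        subprocess.run(["git", "fetch", "origin"], cwd=repo_root, check=True, capture_output=True, text=True)
        subprocess.run(
            ["git", "worktree", "add", "-b", branch_name, path, "origin/main"],
            cwd=repo_root, check=True, capture_output=True, text=True,
        )
        return path, None
    except subprocess.CalledProcessError as e:
        return None, e.stderr


def validate_worktree(work_order_id: str, state: dict, repo_root: str = ".") -> tuple[bool, str | None]:
    """Three-way validation: the state record, the filesystem, and git must agree."""
    recorded = state.get("worktree_path")
    if not recorded:
        return False, "no worktree_path recorded in state"
    if not Path(recorded).is_dir():
        return False, f"worktree directory missing: {recorded}"
    listing = subprocess.run(
        ["git", "worktree", "list", "--porcelain"],
        cwd=repo_root, check=True, capture_output=True, text=True,
    )
    if str(Path(recorded).resolve()) not in listing.stdout:
        return False, "git has no record of this worktree"
    return True, None


def remove_worktree(work_order_id: str, repo_root: str = ".") -> tuple[bool, str | None]:
    """Remove the worktree; return (success, error)."""
    result = subprocess.run(
        ["git", "worktree", "remove", "--force", get_worktree_path(work_order_id, repo_root)],
        cwd=repo_root, capture_output=True, text=True,
    )
    return result.returncode == 0, result.stderr.strip() or None
```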
### Step 4: Update Sandbox Factory
Modify the sandbox factory to support creating GitWorktreeSandbox instances.
- Update `python/src/agent_work_orders/sandbox_manager/sandbox_factory.py`
- Add GIT_WORKTREE case to `create_sandbox()` method
- Integrate port allocation during sandbox creation
- Pass port configuration to GitWorktreeSandbox
- Update SandboxType enum in models.py to promote GIT_WORKTREE from placeholder
- Write integration tests for sandbox factory with worktrees
### Step 5: Implement File-Based State Repository
Create file-based state management for persistence and debugging.
- Create `python/src/agent_work_orders/state_manager/file_state_repository.py`
- Implement `FileStateRepository` class:
- `__init__(state_directory: str)` - Initialize with state directory path
- `save_state(work_order_id, state_data)` - Write JSON to `<state_dir>/<work_order_id>.json`
- `load_state(work_order_id) -> Optional[dict]` - Read JSON from file
- `list_states() -> List[str]` - List all work order IDs with state files
- `delete_state(work_order_id)` - Remove state file
- `update_status(work_order_id, status, **kwargs)` - Update specific fields
- `save_step_history(work_order_id, step_history)` - Persist step history
- Add state directory configuration to config.py
- Create state models in `python/src/agent_work_orders/models/workflow_state.py`
- Write unit tests for file state repository
### Step 6: Update WorkflowStep Enum
Add new workflow steps for test and review resolution.
- Update `python/src/agent_work_orders/models.py`
- Add to WorkflowStep enum:
- `RESOLVE_TEST = "resolve_test"` - Test failure resolution step
- `RESOLVE_REVIEW = "resolve_review"` - Review issue resolution step
- Update `StepHistory.get_current_step()` to include new steps in sequence:
- Updated sequence: CLASSIFY → PLAN → FIND_PLAN → GENERATE_BRANCH → IMPLEMENT → COMMIT → TEST → RESOLVE_TEST (if needed) → REVIEW → RESOLVE_REVIEW (if needed) → CREATE_PR
- Write unit tests for updated step sequence logic
### Step 7: Create Test Runner Command
Build Claude command to execute test suite and return structured JSON results.
- Update `python/.claude/commands/agent-work-orders/test_runner.md`
- Command should:
- Execute backend tests: `cd python && uv run pytest tests/ -v --tb=short`
- Execute frontend tests: `cd archon-ui-main && npm test`
- Parse test results from output
- Return JSON array with structure:
```json
[
{
"test_name": "string",
"test_file": "string",
"passed": boolean,
"error": "optional string",
"execution_command": "string"
}
]
```
- Include test purpose and reproduction command
- Sort failed tests first
- Handle timeout and command errors gracefully
- Test the command manually with sample repositories
### Step 8: Create Resolve Failed Test Command
Build Claude command to analyze and fix failed tests given test JSON.
- Create `python/.claude/commands/agent-work-orders/resolve_failed_test.md`
- Command takes single argument: test result JSON object
- Command should:
- Parse test failure information
- Analyze root cause of failure
- Read relevant test file and code under test
- Implement fix (code change or test update)
- Re-run the specific failed test to verify fix
- Report success/failure
- Include examples of common test failure patterns
- Add constraints (don't skip tests, maintain test coverage)
- Test the command with sample failed test JSONs
### Step 9: Implement Test Workflow Module
Create the test workflow module with automatic resolution and retry logic. A simplified sketch of the retry loop follows the task list below.
- Create `python/src/agent_work_orders/workflow_engine/test_workflow.py`
- Implement functions:
- `run_tests(executor, command_loader, work_order_id, working_dir) -> StepExecutionResult` - Execute test suite
- `parse_test_results(output, logger) -> Tuple[List[TestResult], int, int]` - Parse JSON output
- `resolve_failed_test(executor, command_loader, test_json, work_order_id, working_dir) -> StepExecutionResult` - Fix single test
- `run_tests_with_resolution(executor, command_loader, work_order_id, working_dir, max_attempts=4) -> Tuple[List[TestResult], int, int]` - Main retry loop
- Implement retry logic:
- Run tests, check for failures
- If failures exist and attempts < max_attempts: resolve each failed test
- Re-run tests after resolution
- Stop if all tests pass or max attempts reached
- Add TestResult model to models.py
- Write comprehensive unit tests for test workflow
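A simplified sketch of the retry loop; the executor and command plumbing are reduced to injected async callables (`run_tests` and `resolve_failed_test` here are stand-ins, not the real signatures):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class TestResult:
    """Mirrors the JSON schema the test_runner command returns."""
    test_name: str
    test_file: str
    passed: bool
    error: str | None = None
    execution_command: str | None = None


def parse_test_results(output: str) -> list[TestResult]:
    """Parse the JSON array emitted by the test_runner command."""
    return [TestResult(**item) for item in json.loads(output)]


async def run_tests_with_resolution(run_tests, resolve_failed_test, max_attempts: int = 4):
    """Run the suite; resolve each failure and re-run, up to max_attempts runs."""
    results: list[TestResult] = []
    for attempt in range(1, max_attempts + 1):
        results = parse_test_results(await run_tests())
        failed = [r for r in results if not r.passed]
        if not failed:
            return results, attempt  # all tests pass: stop retrying
        if attempt == max_attempts:
            break  # out of attempts: report the final run as-is
        for failure in failed:
            # Hand each failing test's JSON to the resolve_failed_test command
            await resolve_failed_test(json.dumps(asdict(failure)))
    return results, max_attempts
```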
### Step 10: Add Test Workflow Operation
Create atomic operation for test execution in workflow_operations.py.
- Update `python/src/agent_work_orders/workflow_engine/workflow_operations.py`
- Add function:
```python
async def execute_tests(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
work_order_id: str,
working_dir: str,
) -> StepExecutionResult
```
- Function should:
- Call `run_tests_with_resolution()` from test_workflow.py
- Return StepExecutionResult with test summary
- Include pass/fail counts in output
- Log detailed test results
- Add TESTER constant to agent_names.py
- Write unit tests for execute_tests operation
### Step 11: Integrate Test Phase in Orchestrator
Add test phase to workflow orchestrator between COMMIT and CREATE_PR steps.
- Update `python/src/agent_work_orders/workflow_engine/workflow_orchestrator.py`
- After commit step (line ~236), add:
```python
# Step 7: Run tests with resolution
test_result = await workflow_operations.execute_tests(
self.agent_executor,
self.command_loader,
agent_work_order_id,
sandbox.working_dir,
)
step_history.steps.append(test_result)
await self.state_repository.save_step_history(agent_work_order_id, step_history)
if not test_result.success:
raise WorkflowExecutionError(f"Tests failed: {test_result.error_message}")
bound_logger.info("step_completed", step="test")
```
- Update step numbering (PR creation becomes step 8)
- Add test failure handling strategy
- Write integration tests for full workflow with test phase
### Step 12: Create Review Runner Command
Build Claude command to review implementation against spec with screenshot capture.
- Create `python/.claude/commands/agent-work-orders/review_runner.md`
- Command takes arguments: spec_file_path, work_order_id
- Command should:
- Read specification from spec_file_path
- Analyze implementation in codebase
- Start application (if UI component)
- Capture screenshots of key UI flows
- Compare implementation against spec requirements
- Categorize issues by severity: "blocker" | "tech_debt" | "skippable"
- Return JSON with structure:
```json
{
"review_passed": boolean,
"review_issues": [
{
"issue_title": "string",
"issue_description": "string",
"issue_severity": "blocker|tech_debt|skippable",
"affected_files": ["string"],
"screenshots": ["string"]
}
],
"screenshots": ["string"]
}
```
- Include review criteria and severity definitions
- Test command with sample specifications
### Step 13: Create Resolve Failed Review Command
Build Claude command to patch blocker issues from review.
- Create `python/.claude/commands/agent-work-orders/resolve_failed_review.md`
- Command takes single argument: review issue JSON object
- Command should:
- Parse review issue details
- Create patch plan addressing the issue
- Implement the patch (code changes)
- Verify patch resolves the issue
- Report success/failure
- Include constraints (only fix blocker issues, maintain functionality)
- Add examples of common review issue patterns
- Test command with sample review issues
### Step 14: Implement Review Workflow Module
Create the review workflow module with automatic blocker patching. A sketch of the review retry loop follows the task list below.
- Create `python/src/agent_work_orders/workflow_engine/review_workflow.py`
- Implement functions:
- `run_review(executor, command_loader, spec_file, work_order_id, working_dir) -> ReviewResult` - Execute review
- `parse_review_results(output, logger) -> ReviewResult` - Parse JSON output
- `resolve_review_issue(executor, command_loader, issue_json, work_order_id, working_dir) -> StepExecutionResult` - Patch single issue
- `run_review_with_resolution(executor, command_loader, spec_file, work_order_id, working_dir, max_attempts=3) -> ReviewResult` - Main retry loop
- Implement retry logic:
- Run review, check for blocker issues
- If blockers exist and attempts < max_attempts: resolve each blocker
- Re-run review after patching
- Stop if no blockers or max attempts reached
- Allow tech_debt and skippable issues to pass
- Add ReviewResult and ReviewIssue models to models.py
- Write comprehensive unit tests for review workflow
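The review loop follows the same shape as the test loop but gates only on blockers, assuming the JSON schema shown in Step 12 above (again with the command plumbing reduced to injected callables):

```python
import json


async def run_review_with_resolution(run_review, resolve_review_issue, spec_file: str, max_attempts: int = 3) -> dict:
    """Re-run the review until no blockers remain or attempts run out.

    tech_debt and skippable issues are reported but never fail the phase.
    """
    review: dict = {"review_passed": False, "review_issues": []}
    for attempt in range(1, max_attempts + 1):
        review = json.loads(await run_review(spec_file))
        blockers = [i for i in review["review_issues"] if i["issue_severity"] == "blocker"]
        if not blockers:
            review["review_passed"] = True  # tech_debt/skippable may remain
            return review
        if attempt == max_attempts:
            break
        for issue in blockers:
            # Hand each blocker's JSON to the resolve_failed_review command
            await resolve_review_issue(json.dumps(issue))
    review["review_passed"] = False
    return review
```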
### Step 15: Add Review Workflow Operation
Create atomic operation for review execution in workflow_operations.py.
- Update `python/src/agent_work_orders/workflow_engine/workflow_operations.py`
- Add function:
```python
async def execute_review(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
spec_file: str,
work_order_id: str,
working_dir: str,
) -> StepExecutionResult
```
- Function should:
- Call `run_review_with_resolution()` from review_workflow.py
- Return StepExecutionResult with review summary
- Include blocker count in output
- Log detailed review results
- Add REVIEWER constant to agent_names.py
- Write unit tests for execute_review operation
### Step 16: Integrate Review Phase in Orchestrator
Add review phase to workflow orchestrator between TEST and CREATE_PR steps.
- Update `python/src/agent_work_orders/workflow_engine/workflow_orchestrator.py`
- After test step, add:
```python
# Step 8: Run review with resolution
review_result = await workflow_operations.execute_review(
self.agent_executor,
self.command_loader,
plan_file or "",
agent_work_order_id,
sandbox.working_dir,
)
step_history.steps.append(review_result)
await self.state_repository.save_step_history(agent_work_order_id, step_history)
if not review_result.success:
raise WorkflowExecutionError(f"Review failed: {review_result.error_message}")
bound_logger.info("step_completed", step="review")
```
- Update step numbering (PR creation becomes step 9)
- Add review failure handling strategy
- Write integration tests for full workflow with review phase
### Step 17: Refactor Orchestrator for Composition
Refactor the workflow orchestrator to support modular composition. A config-gated sketch of the composed workflow follows the task list below.
- Update `python/src/agent_work_orders/workflow_engine/workflow_orchestrator.py`
- Extract workflow phases into separate methods:
- `_execute_planning_phase()` - classify → plan → find_plan → generate_branch
- `_execute_implementation_phase()` - implement → commit
- `_execute_testing_phase()` - test → resolve_test (if needed)
- `_execute_review_phase()` - review → resolve_review (if needed)
- `_execute_deployment_phase()` - create_pr
- Update `execute_workflow()` to compose phases:
```python
await self._execute_planning_phase(...)
await self._execute_implementation_phase(...)
await self._execute_testing_phase(...)
await self._execute_review_phase(...)
await self._execute_deployment_phase(...)
```
- Add phase-level error handling and recovery
- Support skipping phases via configuration
- Write unit tests for each phase method
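A sketch of how the composed `execute_workflow()` could honor the Step 18 flags; the config access pattern is an assumption:

```python
# Sketch: phase composition gated by configuration (flag names from Step 18)
async def execute_workflow(self, agent_work_order_id: str) -> None:
    await self._execute_planning_phase(agent_work_order_id)
    await self._execute_implementation_phase(agent_work_order_id)
    if config.ENABLE_TEST_PHASE:  # testing is skipped entirely when disabled
        await self._execute_testing_phase(agent_work_order_id)
    if config.ENABLE_REVIEW_PHASE:
        await self._execute_review_phase(agent_work_order_id)
    await self._execute_deployment_phase(agent_work_order_id)
```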
### Step 18: Add Configuration for New Features
Add configuration options for worktrees, ports, and new workflow phases.
- Update `python/src/agent_work_orders/config.py`
- Add configuration:
```python
# Worktree configuration
WORKTREE_BASE_DIR: str = os.getenv("WORKTREE_BASE_DIR", "trees")
# Port allocation
BACKEND_PORT_RANGE_START: int = int(os.getenv("BACKEND_PORT_START", "9100"))
BACKEND_PORT_RANGE_END: int = int(os.getenv("BACKEND_PORT_END", "9114"))
FRONTEND_PORT_RANGE_START: int = int(os.getenv("FRONTEND_PORT_START", "9200"))
FRONTEND_PORT_RANGE_END: int = int(os.getenv("FRONTEND_PORT_END", "9214"))
# Test workflow
MAX_TEST_RETRY_ATTEMPTS: int = int(os.getenv("MAX_TEST_RETRY_ATTEMPTS", "4"))
ENABLE_TEST_PHASE: bool = os.getenv("ENABLE_TEST_PHASE", "true").lower() == "true"
# Review workflow
MAX_REVIEW_RETRY_ATTEMPTS: int = int(os.getenv("MAX_REVIEW_RETRY_ATTEMPTS", "3"))
ENABLE_REVIEW_PHASE: bool = os.getenv("ENABLE_REVIEW_PHASE", "true").lower() == "true"
ENABLE_SCREENSHOT_CAPTURE: bool = os.getenv("ENABLE_SCREENSHOT_CAPTURE", "true").lower() == "true"
# State management
STATE_STORAGE_TYPE: str = os.getenv("STATE_STORAGE_TYPE", "memory") # "memory" or "file"
FILE_STATE_DIRECTORY: str = os.getenv("FILE_STATE_DIRECTORY", "agent-work-orders-state")
```
- Update `.env.example` with new configuration options
- Document configuration in README
### Step 19: Create Documentation
Document the new compositional architecture and workflows.
- Create `docs/compositional-workflows.md`:
- Architecture overview
- Compositional design principles
- Phase composition examples
- Error handling and recovery
- Configuration guide
- Create `docs/worktree-management.md`:
- Worktree vs temporary clone comparison
- Parallelization capabilities
- Port allocation system
- Cleanup and maintenance
- Create `docs/test-resolution.md`:
- Test workflow overview
- Retry logic explanation
- Test resolution examples
- Troubleshooting failed tests
- Create `docs/review-resolution.md`:
- Review workflow overview
- Screenshot capture setup
- Issue severity definitions
- Blocker patching process
- R2 upload configuration
### Step 20: Run Validation Commands
Execute all validation commands to ensure the feature works correctly with zero regressions.
- Run backend tests: `cd python && uv run pytest tests/agent_work_orders/ -v`
- Run backend linting: `cd python && uv run ruff check src/agent_work_orders/`
- Run type checking: `cd python && uv run mypy src/agent_work_orders/`
- Test worktree creation manually:
```bash
cd python
python -c "
from src.agent_work_orders.utils.worktree_operations import create_worktree
from src.agent_work_orders.utils.structured_logger import get_logger
logger = get_logger('test')
path, err = create_worktree('test-wo-123', 'test-branch', logger)
print(f'Path: {path}, Error: {err}')
"
```
- Test port allocation:
```bash
cd python
python -c "
from src.agent_work_orders.utils.port_allocation import get_ports_for_work_order
backend, frontend = get_ports_for_work_order('test-wo-123')
print(f'Backend: {backend}, Frontend: {frontend}')
"
```
- Create test work order with new workflow:
```bash
curl -X POST http://localhost:8181/agent-work-orders \
-H "Content-Type: application/json" \
-d '{
"repository_url": "https://github.com/your-test-repo",
"sandbox_type": "git_worktree",
"workflow_type": "agent_workflow_plan",
"user_request": "Add a new feature with tests"
}'
```
- Verify worktree created under `trees/<work_order_id>/`
- Verify `.ports.env` created in worktree
- Monitor workflow execution through all phases
- Verify test phase runs and resolves failures
- Verify review phase runs and patches blockers
- Verify PR created successfully
- Clean up test worktrees: `git worktree prune`
## Testing Strategy
### Unit Tests
**Worktree Management**:
- Test worktree creation with valid repository
- Test worktree creation with invalid branch
- Test worktree validation (three-way check)
- Test worktree cleanup
- Test handling of existing worktrees
**Port Allocation**:
- Test deterministic port assignment from work order ID
- Test port availability checking
- Test finding next available ports with collision
- Test port range boundaries (9100-9114, 9200-9214)
- Test `.ports.env` file generation
**Test Workflow**:
- Test parsing valid test result JSON
- Test parsing malformed test result JSON
- Test retry loop with all tests passing
- Test retry loop with some tests failing then passing
- Test retry loop reaching max attempts
- Test individual test resolution
**Review Workflow**:
- Test parsing valid review result JSON
- Test parsing malformed review result JSON
- Test retry loop with no blocker issues
- Test retry loop with blockers then resolved
- Test retry loop reaching max attempts
- Test issue severity filtering
**State Management**:
- Test saving state to JSON file
- Test loading state from JSON file
- Test updating specific state fields
- Test handling missing state files
- Test concurrent state access
### Integration Tests
**End-to-End Workflow**:
- Test complete workflow with worktree sandbox: classify → plan → implement → commit → test → review → PR
- Test test phase with intentional test failure and resolution
- Test review phase with intentional blocker issue and patching
- Test parallel execution of multiple work orders with different ports
- Test workflow resumption after failure
- Test cleanup of worktrees after completion
**Sandbox Integration**:
- Test command execution in worktree context
- Test git operations in worktree
- Test branch creation in worktree
- Test worktree isolation (parallel instances don't interfere)
**State Persistence**:
- Test state survives service restart (file-based)
- Test state migration from memory to file
- Test state corruption recovery
### Edge Cases
**Worktree Edge Cases**:
- Worktree already exists (should reuse or fail gracefully)
- Git repository unreachable (should fail setup)
- Insufficient disk space for worktree (should fail with clear error)
- Worktree removal fails (should log error and continue)
- Maximum worktrees reached (15 concurrent) - should queue or fail
**Port Allocation Edge Cases**:
- All ports in range occupied (should fail with error)
- Port becomes occupied between allocation and use (should retry)
- Invalid port range in configuration (should fail validation)
**Test Workflow Edge Cases**:
- Test command times out (should mark as failed)
- Test command returns invalid JSON (should fail gracefully)
- All tests fail and none can be resolved (should fail after max attempts)
- Test resolution introduces new failures (should continue with retry loop)
**Review Workflow Edge Cases**:
- Review command crashes (should fail gracefully)
- Screenshot capture fails (should continue review without screenshots)
- Review finds only skippable issues (should pass)
- Blocker patch introduces new blocker (should continue with retry loop)
- Spec file not found (should fail with clear error)
**State Management Edge Cases**:
- State file corrupted (should fail with recovery suggestion)
- State directory not writable (should fail with permission error)
- Concurrent access to same state file (should handle with locking or fail safely)
## Acceptance Criteria
- [ ] GitWorktreeSandbox successfully creates and manages worktrees under `trees/<work_order_id>/`
- [ ] Port allocation deterministically assigns unique ports (backend: 9100-9114, frontend: 9200-9214) based on work order ID
- [ ] Multiple work orders (at least 3) can run in parallel without port or filesystem conflicts
- [ ] `.ports.env` file is created in each worktree with correct port configuration
- [ ] Test workflow successfully runs test suite and returns structured JSON results
- [ ] Test workflow automatically resolves failed tests up to 4 attempts
- [ ] Test workflow stops retrying when all tests pass
- [ ] Review workflow successfully reviews implementation against spec
- [ ] Review workflow captures screenshots (when enabled)
- [ ] Review workflow categorizes issues by severity (blocker/tech_debt/skippable)
- [ ] Review workflow automatically patches blocker issues up to 3 attempts
- [ ] Review workflow allows tech_debt and skippable issues to pass
- [ ] WorkflowStep enum includes TEST, RESOLVE_TEST, REVIEW, RESOLVE_REVIEW steps
- [ ] Workflow orchestrator executes all phases: planning → implementation → testing → review → deployment
- [ ] File-based state repository persists state to JSON files
- [ ] State survives service restarts when using file-based storage
- [ ] Configuration supports enabling/disabling test and review phases
- [ ] All existing tests pass with zero regressions
- [ ] New unit tests achieve >80% code coverage for new modules
- [ ] Integration tests verify end-to-end workflow with parallel execution
- [ ] Documentation covers compositional architecture, worktrees, test resolution, and review resolution
- [ ] Cleanup of worktrees works correctly (git worktree remove + prune)
- [ ] Error messages are clear and actionable for all failure scenarios
## Validation Commands
Execute every command to validate the feature works correctly with zero regressions.
### Backend Tests
- `cd python && uv run pytest tests/agent_work_orders/ -v --tb=short` - Run all agent work orders tests
- `cd python && uv run pytest tests/agent_work_orders/sandbox_manager/ -v` - Test sandbox management
- `cd python && uv run pytest tests/agent_work_orders/workflow_engine/ -v` - Test workflow engine
- `cd python && uv run pytest tests/agent_work_orders/utils/ -v` - Test utilities
### Code Quality
- `cd python && uv run ruff check src/agent_work_orders/` - Check code quality
- `cd python && uv run mypy src/agent_work_orders/` - Type checking
### Manual Worktree Testing
```bash
# Test worktree creation
cd python
python -c "
from src.agent_work_orders.utils.worktree_operations import create_worktree, validate_worktree, remove_worktree
from src.agent_work_orders.utils.structured_logger import get_logger
logger = get_logger('test')
# Create worktree
path, err = create_worktree('test-wo-123', 'test-branch', logger)
print(f'Created worktree at: {path}')
assert err is None, f'Error: {err}'
# Validate worktree
from src.agent_work_orders.state_manager.file_state_repository import FileStateRepository
state_repo = FileStateRepository('test-state')
state_data = {'worktree_path': path}
valid, err = validate_worktree('test-wo-123', state_data)
assert valid, f'Validation failed: {err}'
# Remove worktree
success, err = remove_worktree('test-wo-123', logger)
assert success, f'Removal failed: {err}'
print('Worktree lifecycle test passed!')
"
```
### Manual Port Allocation Testing
```bash
cd python
python -c "
from src.agent_work_orders.utils.port_allocation import get_ports_for_work_order, find_next_available_ports, is_port_available
backend, frontend = get_ports_for_work_order('test-wo-123')
print(f'Ports for test-wo-123: Backend={backend}, Frontend={frontend}')
assert 9100 <= backend <= 9114, f'Backend port out of range: {backend}'
assert 9200 <= frontend <= 9214, f'Frontend port out of range: {frontend}'
# Test availability check
available = is_port_available(backend)
print(f'Backend port {backend} available: {available}')
# Test finding next available
next_backend, next_frontend = find_next_available_ports('test-wo-456')
print(f'Next available ports: Backend={next_backend}, Frontend={next_frontend}')
print('Port allocation test passed!')
"
```
### Integration Testing
```bash
# Start agent work orders service
docker compose up -d archon-server
# Create work order with worktree sandbox
curl -X POST http://localhost:8181/agent-work-orders \
-H "Content-Type: application/json" \
-d '{
"repository_url": "https://github.com/coleam00/archon",
"sandbox_type": "git_worktree",
"workflow_type": "agent_workflow_plan",
"user_request": "Fix issue #123"
}'
# Verify worktree created
ls -la trees/
# Monitor workflow progress
watch -n 2 'curl -s http://localhost:8181/agent-work-orders | jq'
# Verify .ports.env in worktree
cat trees/<work_order_id>/.ports.env
# After completion, verify cleanup
git worktree list
```
### Parallel Execution Testing
```bash
# Create 3 work orders simultaneously
for i in 1 2 3; do
curl -X POST http://localhost:8181/agent-work-orders \
-H "Content-Type: application/json" \
-d "{
\"repository_url\": \"https://github.com/coleam00/archon\",
\"sandbox_type\": \"git_worktree\",
\"workflow_type\": \"agent_workflow_plan\",
\"user_request\": \"Parallel test $i\"
}" &
done
wait
# Verify all worktrees exist
ls -la trees/
# Verify different ports allocated
for dir in trees/*/; do
echo "Worktree: $dir"
cat "$dir/.ports.env"
echo "---"
done
```
## Notes
### Architecture Decision: Compositional vs Centralized
This feature implements Option B (compositional refactoring) because:
1. **Scalability**: Compositional design enables running individual phases (e.g., just test or just review) without full workflow
2. **Debugging**: Independent scripts are easier to test and debug in isolation
3. **Flexibility**: Users can compose custom workflows (e.g., skip review for simple PRs)
4. **Maintainability**: Smaller, focused modules are easier to maintain than monolithic orchestrator
5. **Parallelization**: Worktree-based approach inherently supports compositional execution
### Performance Considerations
- **Worktree Creation**: Worktrees are faster than clones (~2-3x) because they share the same .git directory
- **Port Allocation**: Hash-based allocation is deterministic but may have collisions; fallback to linear search adds minimal overhead
- **Retry Loops**: Test (4 attempts) and review (3 attempts) retry limits prevent infinite loops while allowing reasonable resolution attempts
- **State I/O**: File-based state adds disk I/O but enables persistence; consider eventual move to database for high-volume deployments
### Future Enhancements
1. **Database State**: Replace file-based state with PostgreSQL/Supabase for better concurrent access and querying
2. **WebSocket Updates**: Stream test/review progress to UI in real-time
3. **Screenshot Upload**: Integrate R2/S3 for screenshot storage and PR comments with images
4. **Workflow Resumption**: Support resuming failed workflows from last successful step
5. **Custom Workflows**: Allow users to define custom workflow compositions via config
6. **Metrics**: Add OpenTelemetry instrumentation for workflow performance monitoring
7. **E2E Testing**: Add Playwright/Cypress integration for UI-focused review
8. **Distributed Execution**: Support running work orders across multiple machines
### Migration Path
For existing deployments:
1. **Backward Compatibility**: Keep GitBranchSandbox working alongside GitWorktreeSandbox
2. **Gradual Migration**: Default to GIT_BRANCH, opt-in to GIT_WORKTREE via configuration
3. **State Migration**: Provide utility to migrate in-memory state to file-based state
4. **Cleanup**: Add command to clean up old temporary clones: `rm -rf /tmp/agent-work-orders/*`
### Dependencies
New dependencies to add via `uv add`:
- (None required - uses existing git, pytest, claude CLI)
### Related Issues/PRs
- #XXX - Original agent-work-orders MVP implementation
- #XXX - Worktree isolation discussion
- #XXX - Test phase feature request
- #XXX - Review automation proposal

python/src/agent_work_orders/api/routes.py

@@ -25,7 +25,7 @@ from ..models import (
     StepHistory,
 )
 from ..sandbox_manager.sandbox_factory import SandboxFactory
-from ..state_manager.work_order_repository import WorkOrderRepository
+from ..state_manager.repository_factory import create_repository
 from ..utils.id_generator import generate_work_order_id
 from ..utils.structured_logger import get_logger
 from ..workflow_engine.workflow_orchestrator import WorkflowOrchestrator
@@ -35,7 +35,7 @@ logger = get_logger(__name__)
 router = APIRouter()

 # Initialize dependencies (singletons for MVP)
-state_repository = WorkOrderRepository()
+state_repository = create_repository()
 agent_executor = AgentCLIExecutor()
 sandbox_factory = SandboxFactory()
 github_client = GitHubClient()

python/src/agent_work_orders/config.py

@@ -49,6 +49,28 @@ class AgentWorkOrdersConfig:
     ENABLE_PROMPT_LOGGING: bool = os.getenv("ENABLE_PROMPT_LOGGING", "true").lower() == "true"
     ENABLE_OUTPUT_ARTIFACTS: bool = os.getenv("ENABLE_OUTPUT_ARTIFACTS", "true").lower() == "true"
+
+    # Worktree configuration
+    WORKTREE_BASE_DIR: str = os.getenv("WORKTREE_BASE_DIR", "trees")
+
+    # Port allocation for parallel execution
+    BACKEND_PORT_RANGE_START: int = int(os.getenv("BACKEND_PORT_START", "9100"))
+    BACKEND_PORT_RANGE_END: int = int(os.getenv("BACKEND_PORT_END", "9114"))
+    FRONTEND_PORT_RANGE_START: int = int(os.getenv("FRONTEND_PORT_START", "9200"))
+    FRONTEND_PORT_RANGE_END: int = int(os.getenv("FRONTEND_PORT_END", "9214"))
+
+    # Test workflow configuration
+    MAX_TEST_RETRY_ATTEMPTS: int = int(os.getenv("MAX_TEST_RETRY_ATTEMPTS", "4"))
+    ENABLE_TEST_PHASE: bool = os.getenv("ENABLE_TEST_PHASE", "true").lower() == "true"
+
+    # Review workflow configuration
+    MAX_REVIEW_RETRY_ATTEMPTS: int = int(os.getenv("MAX_REVIEW_RETRY_ATTEMPTS", "3"))
+    ENABLE_REVIEW_PHASE: bool = os.getenv("ENABLE_REVIEW_PHASE", "true").lower() == "true"
+    ENABLE_SCREENSHOT_CAPTURE: bool = os.getenv("ENABLE_SCREENSHOT_CAPTURE", "true").lower() == "true"
+
+    # State management configuration
+    STATE_STORAGE_TYPE: str = os.getenv("STATE_STORAGE_TYPE", "memory")  # "memory" or "file"
+    FILE_STATE_DIRECTORY: str = os.getenv("FILE_STATE_DIRECTORY", "agent-work-orders-state")

     @classmethod
     def ensure_temp_dir(cls) -> Path:
         """Ensure temp directory exists and return Path"""

python/src/agent_work_orders/models.py

@@ -49,8 +49,10 @@ class WorkflowStep(str, Enum):
     IMPLEMENT = "implement"
     GENERATE_BRANCH = "generate_branch"
     COMMIT = "commit"
-    REVIEW = "review"
     TEST = "test"
+    RESOLVE_TEST = "resolve_test"
+    REVIEW = "review"
+    RESOLVE_REVIEW = "resolve_review"
     CREATE_PR = "create_pr"
@@ -232,6 +234,8 @@ class StepHistory(BaseModel):
             WorkflowStep.GENERATE_BRANCH,
             WorkflowStep.IMPLEMENT,
             WorkflowStep.COMMIT,
+            WorkflowStep.TEST,
+            WorkflowStep.REVIEW,
             WorkflowStep.CREATE_PR,
         ]

python/src/agent_work_orders/sandbox_manager/git_worktree_sandbox.py

@@ -0,0 +1,215 @@
"""Git Worktree Sandbox Implementation

Provides isolated execution environment using git worktrees.
Enables parallel execution of multiple work orders without conflicts.
"""

import asyncio
import time

from ..models import CommandExecutionResult, SandboxSetupError
from ..utils.git_operations import get_current_branch
from ..utils.port_allocation import find_next_available_ports
from ..utils.structured_logger import get_logger
from ..utils.worktree_operations import (
    create_worktree,
    get_worktree_path,
    remove_worktree,
    setup_worktree_environment,
)

logger = get_logger(__name__)


class GitWorktreeSandbox:
    """Git worktree-based sandbox implementation

    Creates a git worktree under trees/<work_order_id>/ where the agent
    executes workflows. Enables parallel execution with isolated environments
    and deterministic port allocation.
    """

    def __init__(self, repository_url: str, sandbox_identifier: str):
        self.repository_url = repository_url
        self.sandbox_identifier = sandbox_identifier
        self.working_dir = get_worktree_path(repository_url, sandbox_identifier)
        self.backend_port: int | None = None
        self.frontend_port: int | None = None
        self._logger = logger.bind(
            sandbox_identifier=sandbox_identifier,
            repository_url=repository_url,
        )

    async def setup(self) -> None:
        """Create worktree and set up isolated environment

        Creates worktree from origin/main and allocates unique ports.
        """
        self._logger.info("worktree_sandbox_setup_started")
        try:
            # Allocate ports deterministically
            self.backend_port, self.frontend_port = find_next_available_ports(
                self.sandbox_identifier
            )
            self._logger.info(
                "ports_allocated",
                backend_port=self.backend_port,
                frontend_port=self.frontend_port,
            )
            # Create worktree with temporary branch name
            # Agent will create the actual feature branch during execution
            temp_branch = f"wo-{self.sandbox_identifier}"
            worktree_path, error = create_worktree(
                self.repository_url,
                self.sandbox_identifier,
                temp_branch,
                self._logger,
            )
            if error or not worktree_path:
                raise SandboxSetupError(f"Failed to create worktree: {error}")
            # Set up environment with port configuration
            setup_worktree_environment(
                worktree_path,
                self.backend_port,
                self.frontend_port,
                self._logger,
            )
            self._logger.info(
                "worktree_sandbox_setup_completed",
                working_dir=self.working_dir,
                backend_port=self.backend_port,
                frontend_port=self.frontend_port,
            )
        except Exception as e:
            self._logger.error(
                "worktree_sandbox_setup_failed",
                error=str(e),
                exc_info=True,
            )
            raise SandboxSetupError(f"Worktree sandbox setup failed: {e}") from e

    async def execute_command(
        self, command: str, timeout: int = 300
    ) -> CommandExecutionResult:
        """Execute command in the worktree directory

        Args:
            command: Shell command to execute
            timeout: Timeout in seconds

        Returns:
            CommandExecutionResult
        """
        self._logger.info("command_execution_started", command=command)
        start_time = time.time()
        try:
            process = await asyncio.create_subprocess_shell(
                command,
                cwd=self.working_dir,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(), timeout=timeout
                )
            except TimeoutError:
                process.kill()
                await process.wait()
                duration = time.time() - start_time
                self._logger.error(
                    "command_execution_timeout", command=command, timeout=timeout
                )
                return CommandExecutionResult(
                    success=False,
                    stdout=None,
                    stderr=None,
                    exit_code=-1,
                    error_message=f"Command timed out after {timeout}s",
                    duration_seconds=duration,
                )
            duration = time.time() - start_time
            success = process.returncode == 0
            result = CommandExecutionResult(
                success=success,
                stdout=stdout.decode() if stdout else None,
                stderr=stderr.decode() if stderr else None,
                exit_code=process.returncode or 0,
                error_message=None if success else stderr.decode() if stderr else "Command failed",
                duration_seconds=duration,
            )
            if success:
                self._logger.info(
                    "command_execution_completed", command=command, duration=duration
                )
            else:
                self._logger.error(
                    "command_execution_failed",
                    command=command,
                    exit_code=process.returncode,
                    duration=duration,
                )
            return result
        except Exception as e:
            duration = time.time() - start_time
            self._logger.error(
                "command_execution_error", command=command, error=str(e), exc_info=True
            )
            return CommandExecutionResult(
                success=False,
                stdout=None,
                stderr=None,
                exit_code=-1,
                error_message=str(e),
                duration_seconds=duration,
            )

    async def get_git_branch_name(self) -> str | None:
        """Get current git branch name in worktree

        Returns:
            Current branch name or None
        """
        try:
            return await get_current_branch(self.working_dir)
        except Exception as e:
            self._logger.error("git_branch_query_failed", error=str(e))
            return None

    async def cleanup(self) -> None:
        """Remove worktree"""
        self._logger.info("worktree_sandbox_cleanup_started")
        try:
            success, error = remove_worktree(
                self.repository_url,
                self.sandbox_identifier,
                self._logger,
            )
            if success:
                self._logger.info("worktree_sandbox_cleanup_completed")
            else:
                self._logger.error(
                    "worktree_sandbox_cleanup_failed",
                    error=error,
                )
        except Exception as e:
            self._logger.error(
                "worktree_sandbox_cleanup_failed",
                error=str(e),
                exc_info=True,
            )

python/src/agent_work_orders/sandbox_manager/sandbox_factory.py

@@ -5,6 +5,7 @@ Creates appropriate sandbox instances based on sandbox type.
 from ..models import SandboxType
 from .git_branch_sandbox import GitBranchSandbox
+from .git_worktree_sandbox import GitWorktreeSandbox
 from .sandbox_protocol import AgentSandbox
@@ -33,7 +34,7 @@ class SandboxFactory:
         if sandbox_type == SandboxType.GIT_BRANCH:
             return GitBranchSandbox(repository_url, sandbox_identifier)
         elif sandbox_type == SandboxType.GIT_WORKTREE:
-            raise NotImplementedError("Git worktree sandbox not implemented (Phase 2+)")
+            return GitWorktreeSandbox(repository_url, sandbox_identifier)
         elif sandbox_type == SandboxType.E2B:
             raise NotImplementedError("E2B sandbox not implemented (Phase 2+)")
         elif sandbox_type == SandboxType.DAGGER:

python/src/agent_work_orders/state_manager/__init__.py

@@ -1,4 +1,15 @@
 """State Manager Module
-Manages agent work order state (in-memory for MVP).
+Manages agent work order state with pluggable storage backends.
+Supports both in-memory (development) and file-based (production) storage.
 """
+
+from .file_state_repository import FileStateRepository
+from .repository_factory import create_repository
+from .work_order_repository import WorkOrderRepository
+
+__all__ = [
+    "WorkOrderRepository",
+    "FileStateRepository",
+    "create_repository",
+]
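The `repository_factory.create_repository` referenced above is not included in this view. A hypothetical sketch of what it likely does, inferred from the `STATE_STORAGE_TYPE` and `FILE_STATE_DIRECTORY` settings in config.py (the import path and body are assumptions):

```python
# Hypothetical repository_factory.py; the real file is not shown in this diff
from ..config import AgentWorkOrdersConfig  # assumed import path
from .file_state_repository import FileStateRepository
from .work_order_repository import WorkOrderRepository


def create_repository() -> FileStateRepository | WorkOrderRepository:
    """Select the storage backend from STATE_STORAGE_TYPE ("memory" or "file")."""
    if AgentWorkOrdersConfig.STATE_STORAGE_TYPE == "file":
        return FileStateRepository(AgentWorkOrdersConfig.FILE_STATE_DIRECTORY)
    return WorkOrderRepository()
```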

python/src/agent_work_orders/state_manager/file_state_repository.py

@@ -0,0 +1,343 @@
"""File-based Work Order Repository

Provides persistent JSON-based storage for agent work orders.
Enables state persistence across service restarts and debugging.
"""

import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

from ..models import AgentWorkOrderState, AgentWorkOrderStatus, StepHistory
from ..utils.structured_logger import get_logger

if TYPE_CHECKING:
    import structlog

logger = get_logger(__name__)


class FileStateRepository:
    """File-based repository for work order state

    Stores state as JSON files in <state_directory>/<work_order_id>.json
    Each file contains: state, metadata, and step_history
    """

    def __init__(self, state_directory: str):
        self.state_directory = Path(state_directory)
        self.state_directory.mkdir(parents=True, exist_ok=True)
        self._lock = asyncio.Lock()
        self._logger: structlog.stdlib.BoundLogger = logger.bind(
            state_directory=str(self.state_directory)
        )
        self._logger.info("file_state_repository_initialized")

    def _get_state_file_path(self, agent_work_order_id: str) -> Path:
        """Get path to state file for work order

        Args:
            agent_work_order_id: Work order ID

        Returns:
            Path to state file
        """
        return self.state_directory / f"{agent_work_order_id}.json"

    def _serialize_datetime(self, obj):
        """JSON serializer for datetime objects

        Args:
            obj: Object to serialize

        Returns:
            ISO format string for datetime objects
        """
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"Type {type(obj)} not serializable")

    async def _read_state_file(self, agent_work_order_id: str) -> dict[str, Any] | None:
        """Read state file

        Args:
            agent_work_order_id: Work order ID

        Returns:
            State dictionary or None if file doesn't exist
        """
        state_file = self._get_state_file_path(agent_work_order_id)
        if not state_file.exists():
            return None
        try:
            with state_file.open("r") as f:
                data = json.load(f)
            return cast(dict[str, Any], data)
        except Exception as e:
            self._logger.error(
                "state_file_read_failed",
                agent_work_order_id=agent_work_order_id,
                error=str(e),
                exc_info=True,
            )
            return None

    async def _write_state_file(self, agent_work_order_id: str, data: dict[str, Any]) -> None:
        """Write state file

        Args:
            agent_work_order_id: Work order ID
            data: State dictionary to write
        """
        state_file = self._get_state_file_path(agent_work_order_id)
        try:
            with state_file.open("w") as f:
                json.dump(data, f, indent=2, default=self._serialize_datetime)
        except Exception as e:
            self._logger.error(
                "state_file_write_failed",
                agent_work_order_id=agent_work_order_id,
                error=str(e),
                exc_info=True,
            )
            raise

    async def create(self, work_order: AgentWorkOrderState, metadata: dict[str, Any]) -> None:
        """Create a new work order

        Args:
            work_order: Core work order state
            metadata: Additional metadata (status, workflow_type, etc.)
        """
        async with self._lock:
            data = {
                "state": work_order.model_dump(mode="json"),
                "metadata": metadata,
                "step_history": None,
            }
            await self._write_state_file(work_order.agent_work_order_id, data)
            self._logger.info(
                "work_order_created",
                agent_work_order_id=work_order.agent_work_order_id,
            )

    async def get(self, agent_work_order_id: str) -> tuple[AgentWorkOrderState, dict[str, Any]] | None:
        """Get a work order by ID

        Args:
            agent_work_order_id: Work order ID

        Returns:
            Tuple of (state, metadata) or None if not found
        """
        async with self._lock:
            data = await self._read_state_file(agent_work_order_id)
            if not data:
                return None
            state = AgentWorkOrderState(**data["state"])
            metadata = data["metadata"]
            return (state, metadata)

    async def list(self, status_filter: AgentWorkOrderStatus | None = None) -> list[tuple[AgentWorkOrderState, dict[str, Any]]]:
        """List all work orders

        Args:
            status_filter: Optional status to filter by

        Returns:
            List of (state, metadata) tuples
        """
        async with self._lock:
            results = []
            # Iterate over all JSON files in state directory
            for state_file in self.state_directory.glob("*.json"):
                try:
                    with state_file.open("r") as f:
                        data = json.load(f)
                    state = AgentWorkOrderState(**data["state"])
                    metadata = data["metadata"]
                    if status_filter is None or metadata.get("status") == status_filter:
                        results.append((state, metadata))
                except Exception as e:
                    self._logger.error(
                        "state_file_load_failed",
                        file=str(state_file),
                        error=str(e),
                    )
                    continue
            return results

    async def update_status(
        self,
        agent_work_order_id: str,
        status: AgentWorkOrderStatus,
        **kwargs,
    ) -> None:
        """Update work order status and other fields
Args:
agent_work_order_id: Work order ID
status: New status
**kwargs: Additional fields to update
"""
async with self._lock:
data = await self._read_state_file(agent_work_order_id)
if not data:
self._logger.warning(
"work_order_not_found_for_update",
agent_work_order_id=agent_work_order_id
)
return
data["metadata"]["status"] = status
data["metadata"]["updated_at"] = datetime.now().isoformat()
for key, value in kwargs.items():
data["metadata"][key] = value
await self._write_state_file(agent_work_order_id, data)
self._logger.info(
"work_order_status_updated",
agent_work_order_id=agent_work_order_id,
status=status.value,
)
async def update_git_branch(
self, agent_work_order_id: str, git_branch_name: str
) -> None:
"""Update git branch name in state
Args:
agent_work_order_id: Work order ID
git_branch_name: Git branch name
"""
async with self._lock:
data = await self._read_state_file(agent_work_order_id)
if not data:
self._logger.warning(
"work_order_not_found_for_update",
agent_work_order_id=agent_work_order_id
)
return
data["state"]["git_branch_name"] = git_branch_name
data["metadata"]["updated_at"] = datetime.now().isoformat()
await self._write_state_file(agent_work_order_id, data)
self._logger.info(
"work_order_git_branch_updated",
agent_work_order_id=agent_work_order_id,
git_branch_name=git_branch_name,
)
async def update_session_id(
self, agent_work_order_id: str, agent_session_id: str
) -> None:
"""Update agent session ID in state
Args:
agent_work_order_id: Work order ID
agent_session_id: Claude CLI session ID
"""
async with self._lock:
data = await self._read_state_file(agent_work_order_id)
if not data:
self._logger.warning(
"work_order_not_found_for_update",
agent_work_order_id=agent_work_order_id
)
return
data["state"]["agent_session_id"] = agent_session_id
data["metadata"]["updated_at"] = datetime.now().isoformat()
await self._write_state_file(agent_work_order_id, data)
self._logger.info(
"work_order_session_id_updated",
agent_work_order_id=agent_work_order_id,
agent_session_id=agent_session_id,
)
async def save_step_history(
self, agent_work_order_id: str, step_history: StepHistory
) -> None:
"""Save step execution history
Args:
agent_work_order_id: Work order ID
step_history: Step execution history
"""
async with self._lock:
data = await self._read_state_file(agent_work_order_id)
if not data:
# Create minimal state if doesn't exist
data = {
"state": {"agent_work_order_id": agent_work_order_id},
"metadata": {},
"step_history": None
}
data["step_history"] = step_history.model_dump(mode="json")
await self._write_state_file(agent_work_order_id, data)
self._logger.info(
"step_history_saved",
agent_work_order_id=agent_work_order_id,
step_count=len(step_history.steps),
)
async def get_step_history(self, agent_work_order_id: str) -> StepHistory | None:
"""Get step execution history
Args:
agent_work_order_id: Work order ID
Returns:
Step history or None if not found
"""
async with self._lock:
data = await self._read_state_file(agent_work_order_id)
if not data or not data.get("step_history"):
return None
return StepHistory(**data["step_history"])
async def delete(self, agent_work_order_id: str) -> None:
"""Delete a work order state file
Args:
agent_work_order_id: Work order ID
"""
async with self._lock:
state_file = self._get_state_file_path(agent_work_order_id)
if state_file.exists():
state_file.unlink()
self._logger.info(
"work_order_deleted",
agent_work_order_id=agent_work_order_id
)
def list_state_ids(self) -> list[str]:
"""List all work order IDs with state files
Returns:
List of work order IDs
"""
return [f.stem for f in self.state_directory.glob("*.json")]
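
# Usage sketch (illustrative, not part of the module): a minimal round trip
# through the file-backed store. Constructing AgentWorkOrderState from just an
# ID is an assumption; the real model may require more fields.
#
#     import asyncio
#
#     async def demo() -> None:
#         repo = FileStateRepository("/tmp/agent_state")
#         state = AgentWorkOrderState(agent_work_order_id="wo-abc123")
#         await repo.create(state, metadata={"status": "pending"})
#         loaded = await repo.get("wo-abc123")
#         assert loaded is not None
#         state, metadata = loaded
#
#     asyncio.run(demo())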

@@ -0,0 +1,43 @@
"""Repository Factory
Creates appropriate repository instances based on configuration.
Supports both in-memory (for development/testing) and file-based (for production) storage.
"""
from ..config import config
from ..utils.structured_logger import get_logger
from .file_state_repository import FileStateRepository
from .work_order_repository import WorkOrderRepository
logger = get_logger(__name__)
def create_repository() -> WorkOrderRepository | FileStateRepository:
"""Create a work order repository based on configuration
Returns:
Repository instance (either in-memory or file-based)
"""
storage_type = config.STATE_STORAGE_TYPE.lower()
if storage_type == "file":
state_dir = config.FILE_STATE_DIRECTORY
logger.info(
"repository_created",
storage_type="file",
state_directory=state_dir
)
return FileStateRepository(state_dir)
elif storage_type == "memory":
logger.info(
"repository_created",
storage_type="memory"
)
return WorkOrderRepository()
else:
logger.warning(
"unknown_storage_type",
storage_type=storage_type,
fallback="memory"
)
return WorkOrderRepository()
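
# Usage sketch (illustrative): callers obtain a repository without branching on
# storage type themselves; the backend is chosen by STATE_STORAGE_TYPE alone,
# and both backends are expected to expose the same async interface.
#
#     repo = create_repository()  # FileStateRepository when STATE_STORAGE_TYPE="file"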

@@ -0,0 +1,94 @@
"""Port allocation utilities for isolated agent work order execution.
Provides deterministic port allocation (backend: 9100-9114, frontend: 9200-9214)
based on work order ID to enable parallel execution without port conflicts.
"""
import hashlib
import os
import socket
def get_ports_for_work_order(work_order_id: str) -> tuple[int, int]:
"""Deterministically assign ports based on work order ID.
Args:
work_order_id: The work order identifier
Returns:
Tuple of (backend_port, frontend_port)
"""
# Convert first 8 chars of work order ID to index (0-14)
# Using base 36 conversion and modulo for consistent mapping
try:
# Take first 8 alphanumeric chars and convert from base 36
id_chars = ''.join(c for c in work_order_id[:8] if c.isalnum())
index = int(id_chars, 36) % 15
    except ValueError:
        # Fall back to a stable digest if base-36 conversion fails. The
        # built-in hash() is randomized per process (PYTHONHASHSEED), which
        # would break the deterministic mapping this module promises, so use
        # sha256 instead.
        index = int(hashlib.sha256(work_order_id.encode()).hexdigest(), 16) % 15
backend_port = 9100 + index
frontend_port = 9200 + index
return backend_port, frontend_port
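
# Worked example (illustrative): for work_order_id="wo12345x", the first 8
# alphanumeric characters parse as base 36, and int("wo12345x", 36) % 15 picks
# an index in 0-14, so the pair lands in 9100-9114 / 9200-9214. The same ID
# always maps to the same ports.
#
#     backend, frontend = get_ports_for_work_order("wo12345x")
#     assert backend - 9100 == frontend - 9200  # both ports share one index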
def is_port_available(port: int) -> bool:
"""Check if a port is available for binding.
Args:
port: Port number to check
Returns:
True if port is available, False otherwise
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
s.bind(('localhost', port))
return True
except OSError:
return False
def find_next_available_ports(work_order_id: str, max_attempts: int = 15) -> tuple[int, int]:
"""Find available ports starting from deterministic assignment.
Args:
work_order_id: The work order ID
max_attempts: Maximum number of attempts (default 15)
Returns:
Tuple of (backend_port, frontend_port)
Raises:
RuntimeError: If no available ports found
"""
base_backend, base_frontend = get_ports_for_work_order(work_order_id)
base_index = base_backend - 9100
for offset in range(max_attempts):
index = (base_index + offset) % 15
backend_port = 9100 + index
frontend_port = 9200 + index
if is_port_available(backend_port) and is_port_available(frontend_port):
return backend_port, frontend_port
raise RuntimeError("No available ports in the allocated range")
def create_ports_env_file(worktree_path: str, backend_port: int, frontend_port: int) -> None:
"""Create .ports.env file in worktree with port configuration.
Args:
worktree_path: Path to the worktree
backend_port: Backend port number
frontend_port: Frontend port number
"""
ports_env_path = os.path.join(worktree_path, ".ports.env")
with open(ports_env_path, "w") as f:
f.write(f"BACKEND_PORT={backend_port}\n")
f.write(f"FRONTEND_PORT={frontend_port}\n")
f.write(f"VITE_BACKEND_URL=http://localhost:{backend_port}\n")

@@ -0,0 +1,285 @@
"""Worktree management operations for isolated agent work order execution.
Provides utilities for creating and managing git worktrees under trees/<work_order_id>/
to enable parallel execution in isolated environments.
"""
import hashlib
import os
import shutil
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING, Any
from ..config import config
from .port_allocation import create_ports_env_file
if TYPE_CHECKING:
import structlog
def _get_repo_hash(repository_url: str) -> str:
"""Get a short hash for repository URL.
Args:
repository_url: Git repository URL
Returns:
8-character hash of the repository URL
"""
return hashlib.sha256(repository_url.encode()).hexdigest()[:8]
def get_base_repo_path(repository_url: str) -> str:
"""Get path to base repository clone.
Args:
repository_url: Git repository URL
Returns:
Absolute path to base repository directory
"""
repo_hash = _get_repo_hash(repository_url)
base_path = config.ensure_temp_dir() / "repos" / repo_hash / "main"
return str(base_path)
def get_worktree_path(repository_url: str, work_order_id: str) -> str:
"""Get absolute path to worktree.
Args:
repository_url: Git repository URL
work_order_id: The work order ID
Returns:
Absolute path to worktree directory
"""
repo_hash = _get_repo_hash(repository_url)
worktree_path = config.ensure_temp_dir() / "repos" / repo_hash / "trees" / work_order_id
return str(worktree_path)
def ensure_base_repository(repository_url: str, logger: "structlog.stdlib.BoundLogger") -> tuple[str | None, str | None]:
"""Ensure base repository clone exists.
Args:
repository_url: Git repository URL to clone
logger: Logger instance
Returns:
Tuple of (base_repo_path, error_message)
"""
base_repo_path = get_base_repo_path(repository_url)
# If base repo already exists, just fetch latest
if os.path.exists(base_repo_path):
logger.info(f"Base repository exists at {base_repo_path}, fetching latest")
fetch_result = subprocess.run(
["git", "fetch", "origin"],
capture_output=True,
text=True,
cwd=base_repo_path
)
if fetch_result.returncode != 0:
logger.warning(f"Failed to fetch from origin: {fetch_result.stderr}")
return base_repo_path, None
# Create parent directory
Path(base_repo_path).parent.mkdir(parents=True, exist_ok=True)
# Clone the repository
logger.info(f"Cloning base repository from {repository_url} to {base_repo_path}")
clone_result = subprocess.run(
["git", "clone", repository_url, base_repo_path],
capture_output=True,
text=True
)
if clone_result.returncode != 0:
error_msg = f"Failed to clone repository: {clone_result.stderr}"
logger.error(error_msg)
return None, error_msg
logger.info(f"Created base repository at {base_repo_path}")
return base_repo_path, None
def create_worktree(
repository_url: str,
work_order_id: str,
branch_name: str,
logger: "structlog.stdlib.BoundLogger"
) -> tuple[str | None, str | None]:
"""Create a git worktree for isolated execution.
Args:
repository_url: Git repository URL
work_order_id: The work order ID for this worktree
branch_name: The branch name to create the worktree from
logger: Logger instance
Returns:
Tuple of (worktree_path, error_message)
worktree_path is the absolute path if successful, None if error
"""
# Ensure base repository exists
base_repo_path, error = ensure_base_repository(repository_url, logger)
if error or not base_repo_path:
return None, error
# Construct worktree path
worktree_path = get_worktree_path(repository_url, work_order_id)
# Check if worktree already exists
if os.path.exists(worktree_path):
logger.warning(f"Worktree already exists at {worktree_path}")
return worktree_path, None
# Create parent directory for worktrees
Path(worktree_path).parent.mkdir(parents=True, exist_ok=True)
# Fetch latest changes from origin
logger.info("Fetching latest changes from origin")
fetch_result = subprocess.run(
["git", "fetch", "origin"],
capture_output=True,
text=True,
cwd=base_repo_path
)
if fetch_result.returncode != 0:
logger.warning(f"Failed to fetch from origin: {fetch_result.stderr}")
# Create the worktree using git, branching from origin/main
# Use -b to create the branch as part of worktree creation
cmd = ["git", "worktree", "add", "-b", branch_name, worktree_path, "origin/main"]
result = subprocess.run(cmd, capture_output=True, text=True, cwd=base_repo_path)
if result.returncode != 0:
# If branch already exists, try without -b
if "already exists" in result.stderr:
cmd = ["git", "worktree", "add", worktree_path, branch_name]
result = subprocess.run(cmd, capture_output=True, text=True, cwd=base_repo_path)
if result.returncode != 0:
error_msg = f"Failed to create worktree: {result.stderr}"
logger.error(error_msg)
return None, error_msg
logger.info(f"Created worktree at {worktree_path} for branch {branch_name}")
return worktree_path, None
def validate_worktree(
repository_url: str,
work_order_id: str,
state: dict[str, Any]
) -> tuple[bool, str | None]:
"""Validate worktree exists in state, filesystem, and git.
Performs three-way validation to ensure consistency:
1. State has worktree_path
2. Directory exists on filesystem
3. Git knows about the worktree
Args:
repository_url: Git repository URL
work_order_id: The work order ID to validate
state: The work order state dictionary
Returns:
Tuple of (is_valid, error_message)
"""
# Check state has worktree_path
worktree_path = state.get("worktree_path")
if not worktree_path:
return False, "No worktree_path in state"
# Check directory exists
if not os.path.exists(worktree_path):
return False, f"Worktree directory not found: {worktree_path}"
# Check git knows about it (query from base repository)
base_repo_path = get_base_repo_path(repository_url)
if not os.path.exists(base_repo_path):
return False, f"Base repository not found: {base_repo_path}"
result = subprocess.run(
["git", "worktree", "list"],
capture_output=True,
text=True,
cwd=base_repo_path
)
if worktree_path not in result.stdout:
return False, "Worktree not registered with git"
return True, None
def remove_worktree(
repository_url: str,
work_order_id: str,
logger: "structlog.stdlib.BoundLogger"
) -> tuple[bool, str | None]:
"""Remove a worktree and clean up.
Args:
repository_url: Git repository URL
work_order_id: The work order ID for the worktree to remove
logger: Logger instance
Returns:
Tuple of (success, error_message)
"""
worktree_path = get_worktree_path(repository_url, work_order_id)
base_repo_path = get_base_repo_path(repository_url)
# First remove via git (if base repo exists)
if os.path.exists(base_repo_path):
cmd = ["git", "worktree", "remove", worktree_path, "--force"]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=base_repo_path
)
if result.returncode != 0:
# Try to clean up manually if git command failed
if os.path.exists(worktree_path):
try:
shutil.rmtree(worktree_path)
logger.warning(f"Manually removed worktree directory: {worktree_path}")
except Exception as e:
return False, f"Failed to remove worktree: {result.stderr}, manual cleanup failed: {e}"
else:
# If base repo doesn't exist, just remove directory
if os.path.exists(worktree_path):
try:
shutil.rmtree(worktree_path)
logger.info(f"Removed worktree directory (no base repo): {worktree_path}")
except Exception as e:
return False, f"Failed to remove worktree directory: {e}"
logger.info(f"Removed worktree at {worktree_path}")
return True, None
def setup_worktree_environment(
worktree_path: str,
backend_port: int,
frontend_port: int,
logger: "structlog.stdlib.BoundLogger"
) -> None:
"""Set up worktree environment by creating .ports.env file.
The actual environment setup (copying .env files, installing dependencies) is handled
by separate commands which run inside the worktree.
Args:
worktree_path: Path to the worktree
backend_port: Backend port number
frontend_port: Frontend port number
logger: Logger instance
"""
create_ports_env_file(worktree_path, backend_port, frontend_port)
logger.info(f"Created .ports.env with Backend: {backend_port}, Frontend: {frontend_port}")

@@ -20,6 +20,7 @@ IMPLEMENTOR = "implementor" # Implements changes
 # Validate Phase
 CODE_REVIEWER = "code_reviewer"  # Reviews code quality
 TESTER = "tester"  # Runs tests
+REVIEWER = "reviewer"  # Reviews against spec
 # Git Operations (support all phases)
 BRANCH_GENERATOR = "branch_generator"  # Creates branches

@@ -0,0 +1,308 @@
"""Review Workflow with Automatic Blocker Resolution
Reviews implementation against spec and automatically resolves blocker issues with retry logic (max 3 attempts).
"""
import json
from typing import TYPE_CHECKING
from ..agent_executor.agent_cli_executor import AgentCLIExecutor
from ..command_loader.claude_command_loader import ClaudeCommandLoader
from ..models import StepExecutionResult, WorkflowStep
from ..utils.structured_logger import get_logger
from .agent_names import REVIEWER
if TYPE_CHECKING:
import structlog
logger = get_logger(__name__)
class ReviewIssue:
"""Represents a single review issue"""
def __init__(
self,
issue_title: str,
issue_description: str,
issue_severity: str,
affected_files: list[str],
screenshots: list[str] | None = None,
):
self.issue_title = issue_title
self.issue_description = issue_description
self.issue_severity = issue_severity
self.affected_files = affected_files
self.screenshots = screenshots or []
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization"""
return {
"issue_title": self.issue_title,
"issue_description": self.issue_description,
"issue_severity": self.issue_severity,
"affected_files": self.affected_files,
"screenshots": self.screenshots,
}
@classmethod
def from_dict(cls, data: dict) -> "ReviewIssue":
"""Create ReviewIssue from dictionary"""
return cls(
issue_title=data["issue_title"],
issue_description=data["issue_description"],
issue_severity=data["issue_severity"],
affected_files=data["affected_files"],
screenshots=data.get("screenshots", []),
)
class ReviewResult:
"""Represents review execution result"""
def __init__(
self,
review_passed: bool,
review_issues: list[ReviewIssue],
screenshots: list[str] | None = None,
):
self.review_passed = review_passed
self.review_issues = review_issues
self.screenshots = screenshots or []
def get_blocker_count(self) -> int:
"""Get count of blocker issues"""
return sum(1 for issue in self.review_issues if issue.issue_severity == "blocker")
def get_blocker_issues(self) -> list[ReviewIssue]:
"""Get list of blocker issues"""
return [issue for issue in self.review_issues if issue.issue_severity == "blocker"]
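
# Example (illustrative): only "blocker" severity gates the workflow; an issue
# tagged "tech_debt" or "skippable" counts toward the total but not the gate.
#
#     result = ReviewResult(
#         review_passed=False,
#         review_issues=[
#             ReviewIssue("Missing error state", "Spec requires inline errors",
#                         "blocker", ["src/pages/Login.tsx"]),
#             ReviewIssue("Duplicated helper", "Consolidate later",
#                         "tech_debt", ["src/utils.py"]),
#         ],
#     )
#     assert result.get_blocker_count() == 1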
async def run_review(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
spec_file: str,
work_order_id: str,
working_dir: str,
bound_logger: "structlog.stdlib.BoundLogger",
) -> ReviewResult:
"""Execute review against specification
Args:
executor: Agent CLI executor
command_loader: Command loader
spec_file: Path to specification file
work_order_id: Work order ID
working_dir: Working directory
bound_logger: Logger instance
Returns:
ReviewResult with issues found
"""
bound_logger.info("review_execution_started", spec_file=spec_file)
# Execute review command
result = await executor.execute_command(
command_name="review_runner",
arguments=[spec_file, work_order_id],
working_directory=working_dir,
logger=bound_logger,
)
if not result.success:
bound_logger.error("review_execution_failed", error=result.error_message)
# Return empty review result indicating failure
return ReviewResult(review_passed=False, review_issues=[])
# Parse review results from output
return parse_review_results(result.result_text or result.stdout or "", bound_logger)
def parse_review_results(
output: str, logger: "structlog.stdlib.BoundLogger"
) -> ReviewResult:
"""Parse review results from JSON output
Args:
output: Command output (should be JSON object)
logger: Logger instance
Returns:
ReviewResult
"""
try:
# Try to parse as JSON
data = json.loads(output)
if not isinstance(data, dict):
logger.error("review_results_invalid_format", error="Expected JSON object")
return ReviewResult(review_passed=False, review_issues=[])
review_issues = [
ReviewIssue.from_dict(issue) for issue in data.get("review_issues", [])
]
review_passed = data.get("review_passed", False)
screenshots = data.get("screenshots", [])
blocker_count = sum(1 for issue in review_issues if issue.issue_severity == "blocker")
logger.info(
"review_results_parsed",
review_passed=review_passed,
total_issues=len(review_issues),
blockers=blocker_count,
)
return ReviewResult(
review_passed=review_passed,
review_issues=review_issues,
screenshots=screenshots,
)
except json.JSONDecodeError as e:
logger.error("review_results_parse_failed", error=str(e), output_preview=output[:500])
return ReviewResult(review_passed=False, review_issues=[])
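
# Expected input shape (illustrative): parse_review_results consumes a JSON
# object like the one below; only the keys read above are significant, and all
# concrete values are invented.
#
#     {
#       "review_passed": false,
#       "screenshots": ["screenshots/login.png"],
#       "review_issues": [
#         {
#           "issue_title": "Login form ignores spec's error states",
#           "issue_description": "Spec requires inline validation errors",
#           "issue_severity": "blocker",
#           "affected_files": ["src/pages/Login.tsx"],
#           "screenshots": []
#         }
#       ]
#     }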
async def resolve_review_issue(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
review_issue: ReviewIssue,
work_order_id: str,
working_dir: str,
bound_logger: "structlog.stdlib.BoundLogger",
) -> StepExecutionResult:
"""Resolve a single blocker review issue
Args:
executor: Agent CLI executor
command_loader: Command loader
review_issue: Review issue to resolve
work_order_id: Work order ID
working_dir: Working directory
bound_logger: Logger instance
Returns:
StepExecutionResult with resolution outcome
"""
bound_logger.info(
"review_issue_resolution_started",
issue_title=review_issue.issue_title,
severity=review_issue.issue_severity,
)
# Convert review issue to JSON for passing to resolve command
issue_json = json.dumps(review_issue.to_dict())
# Execute resolve_failed_review command
result = await executor.execute_command(
command_name="resolve_failed_review",
arguments=[issue_json],
working_directory=working_dir,
logger=bound_logger,
)
if not result.success:
return StepExecutionResult(
step=WorkflowStep.RESOLVE_REVIEW,
agent_name=REVIEWER,
success=False,
output=result.result_text or result.stdout,
error_message=f"Review issue resolution failed: {result.error_message}",
duration_seconds=result.duration_seconds or 0,
session_id=result.session_id,
)
return StepExecutionResult(
step=WorkflowStep.RESOLVE_REVIEW,
agent_name=REVIEWER,
success=True,
output=f"Resolved review issue: {review_issue.issue_title}",
error_message=None,
duration_seconds=result.duration_seconds or 0,
session_id=result.session_id,
)
async def run_review_with_resolution(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
spec_file: str,
work_order_id: str,
working_dir: str,
bound_logger: "structlog.stdlib.BoundLogger",
max_attempts: int = 3,
) -> ReviewResult:
"""Run review with automatic blocker resolution and retry logic
Tech debt and skippable issues are allowed to pass. Only blockers prevent completion.
Args:
executor: Agent CLI executor
command_loader: Command loader
spec_file: Path to specification file
work_order_id: Work order ID
working_dir: Working directory
bound_logger: Logger instance
max_attempts: Maximum retry attempts (default 3)
Returns:
Final ReviewResult
"""
bound_logger.info("review_workflow_started", max_attempts=max_attempts)
for attempt in range(1, max_attempts + 1):
bound_logger.info("review_attempt_started", attempt=attempt)
# Run review
review_result = await run_review(
executor, command_loader, spec_file, work_order_id, working_dir, bound_logger
)
blocker_count = review_result.get_blocker_count()
if blocker_count == 0:
# No blockers, review passes (tech_debt and skippable are acceptable)
bound_logger.info(
"review_workflow_completed",
attempt=attempt,
outcome="no_blockers",
total_issues=len(review_result.review_issues),
)
return review_result
if attempt >= max_attempts:
# Max attempts reached
bound_logger.warning(
"review_workflow_max_attempts_reached",
attempt=attempt,
blocker_count=blocker_count,
)
return review_result
# Resolve each blocker issue
blocker_issues = review_result.get_blocker_issues()
bound_logger.info(
"review_issue_resolution_batch_started",
blocker_count=len(blocker_issues),
)
for blocker_issue in blocker_issues:
resolution_result = await resolve_review_issue(
executor,
command_loader,
blocker_issue,
work_order_id,
working_dir,
bound_logger,
)
if not resolution_result.success:
bound_logger.warning(
"review_issue_resolution_failed",
issue_title=blocker_issue.issue_title,
)
# Should not reach here, but return last result if we do
return review_result
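
# Usage sketch (illustrative): mirrors the test workflow; callers gate on
# blockers via the returned result rather than on review_passed alone.
#
#     result = await run_review_with_resolution(
#         executor, command_loader, "PRPs/specs/feature-spec.md",
#         "wo-abc123", "/path/to/worktree", log, max_attempts=3,
#     )
#     if result.get_blocker_count():
#         ...  # blockers survived every resolution attempt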

@@ -0,0 +1,311 @@
"""Test Workflow with Automatic Resolution
Executes test suite and automatically resolves failures with retry logic (max 4 attempts).
"""
import json
from typing import TYPE_CHECKING
from ..agent_executor.agent_cli_executor import AgentCLIExecutor
from ..command_loader.claude_command_loader import ClaudeCommandLoader
from ..models import StepExecutionResult, WorkflowStep
from ..utils.structured_logger import get_logger
from .agent_names import TESTER
if TYPE_CHECKING:
import structlog
logger = get_logger(__name__)
class TestResult:
"""Represents a single test result"""
def __init__(
self,
test_name: str,
passed: bool,
execution_command: str,
test_purpose: str,
error: str | None = None,
):
self.test_name = test_name
self.passed = passed
self.execution_command = execution_command
self.test_purpose = test_purpose
self.error = error
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization"""
return {
"test_name": self.test_name,
"passed": self.passed,
"execution_command": self.execution_command,
"test_purpose": self.test_purpose,
"error": self.error,
}
@classmethod
def from_dict(cls, data: dict) -> "TestResult":
"""Create TestResult from dictionary"""
return cls(
test_name=data["test_name"],
passed=data["passed"],
execution_command=data["execution_command"],
test_purpose=data["test_purpose"],
error=data.get("error"),
)
async def run_tests(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
work_order_id: str,
working_dir: str,
bound_logger: "structlog.stdlib.BoundLogger",
) -> StepExecutionResult:
"""Execute test suite and return results
Args:
executor: Agent CLI executor
command_loader: Command loader
work_order_id: Work order ID
working_dir: Working directory
bound_logger: Logger instance
Returns:
StepExecutionResult with test results
"""
bound_logger.info("test_execution_started")
# Execute test command
result = await executor.execute_command(
command_name="test",
arguments=[],
working_directory=working_dir,
logger=bound_logger,
)
if not result.success:
return StepExecutionResult(
step=WorkflowStep.TEST,
agent_name=TESTER,
success=False,
output=result.result_text or result.stdout,
error_message=f"Test execution failed: {result.error_message}",
duration_seconds=result.duration_seconds or 0,
session_id=result.session_id,
)
# Parse test results from output
test_results, passed_count, failed_count = parse_test_results(
result.result_text or result.stdout or "", bound_logger
)
success = failed_count == 0
output_summary = f"Tests: {passed_count} passed, {failed_count} failed"
return StepExecutionResult(
step=WorkflowStep.TEST,
agent_name=TESTER,
success=success,
output=output_summary,
error_message=None if success else f"{failed_count} test(s) failed",
duration_seconds=result.duration_seconds or 0,
session_id=result.session_id,
)
def parse_test_results(
output: str, logger: "structlog.stdlib.BoundLogger"
) -> tuple[list[TestResult], int, int]:
"""Parse test results from JSON output
Args:
output: Command output (should be JSON array)
logger: Logger instance
Returns:
Tuple of (test_results, passed_count, failed_count)
"""
try:
# Try to parse as JSON
data = json.loads(output)
if not isinstance(data, list):
logger.error("test_results_invalid_format", error="Expected JSON array")
return [], 0, 0
test_results = [TestResult.from_dict(item) for item in data]
passed_count = sum(1 for t in test_results if t.passed)
failed_count = sum(1 for t in test_results if not t.passed)
logger.info(
"test_results_parsed",
passed=passed_count,
failed=failed_count,
total=len(test_results),
)
return test_results, passed_count, failed_count
except json.JSONDecodeError as e:
logger.error("test_results_parse_failed", error=str(e), output_preview=output[:500])
return [], 0, 0
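
# Expected input shape (illustrative): parse_test_results consumes a JSON array
# of objects carrying the keys read above; the values here are invented.
#
#     [
#       {
#         "test_name": "test_user_login",
#         "passed": false,
#         "execution_command": "pytest tests/test_auth.py::test_user_login",
#         "test_purpose": "Verify login succeeds with valid credentials",
#         "error": "AssertionError: expected 200, got 401"
#       }
#     ]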
async def resolve_failed_test(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
test_result: TestResult,
work_order_id: str,
working_dir: str,
bound_logger: "structlog.stdlib.BoundLogger",
) -> StepExecutionResult:
"""Resolve a single failed test
Args:
executor: Agent CLI executor
command_loader: Command loader
test_result: Failed test result
work_order_id: Work order ID
working_dir: Working directory
bound_logger: Logger instance
Returns:
StepExecutionResult with resolution outcome
"""
bound_logger.info(
"test_resolution_started",
test_name=test_result.test_name,
)
# Convert test result to JSON for passing to resolve command
test_json = json.dumps(test_result.to_dict())
# Execute resolve_failed_test command
result = await executor.execute_command(
command_name="resolve_failed_test",
arguments=[test_json],
working_directory=working_dir,
logger=bound_logger,
)
if not result.success:
return StepExecutionResult(
step=WorkflowStep.RESOLVE_TEST,
agent_name=TESTER,
success=False,
output=result.result_text or result.stdout,
error_message=f"Test resolution failed: {result.error_message}",
duration_seconds=result.duration_seconds or 0,
session_id=result.session_id,
)
return StepExecutionResult(
step=WorkflowStep.RESOLVE_TEST,
agent_name=TESTER,
success=True,
output=f"Resolved test: {test_result.test_name}",
error_message=None,
duration_seconds=result.duration_seconds or 0,
session_id=result.session_id,
)
async def run_tests_with_resolution(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
work_order_id: str,
working_dir: str,
bound_logger: "structlog.stdlib.BoundLogger",
max_attempts: int = 4,
) -> tuple[list[TestResult], int, int]:
"""Run tests with automatic failure resolution and retry logic
Args:
executor: Agent CLI executor
command_loader: Command loader
work_order_id: Work order ID
working_dir: Working directory
bound_logger: Logger instance
max_attempts: Maximum retry attempts (default 4)
Returns:
Tuple of (final_test_results, passed_count, failed_count)
"""
bound_logger.info("test_workflow_started", max_attempts=max_attempts)
    for attempt in range(1, max_attempts + 1):
        bound_logger.info("test_attempt_started", attempt=attempt)
        # Run the test suite once per attempt and parse its JSON output
        # directly, so the suite is not executed a second time just to
        # recover the structured results
        test_execution = await executor.execute_command(
            command_name="test",
            arguments=[],
            working_directory=working_dir,
            logger=bound_logger,
        )
        if not test_execution.success:
            bound_logger.error(
                "test_execution_failed", error=test_execution.error_message
            )
            return [], 0, 0
        test_results, passed_count, failed_count = parse_test_results(
            test_execution.result_text or test_execution.stdout or "", bound_logger
        )
if failed_count == 0:
# No failures, we're done
bound_logger.info("test_workflow_completed", attempt=attempt, outcome="all_passed")
return test_results, passed_count, failed_count
if attempt >= max_attempts:
# Max attempts reached
bound_logger.warning(
"test_workflow_max_attempts_reached",
attempt=attempt,
failed_count=failed_count,
)
return test_results, passed_count, failed_count
# Resolve each failed test
failed_tests = [t for t in test_results if not t.passed]
bound_logger.info(
"test_resolution_batch_started",
failed_count=len(failed_tests),
)
for failed_test in failed_tests:
resolution_result = await resolve_failed_test(
executor,
command_loader,
failed_test,
work_order_id,
working_dir,
bound_logger,
)
if not resolution_result.success:
bound_logger.warning(
"test_resolution_failed",
test_name=failed_test.test_name,
)
# Should not reach here, but return last results if we do
return test_results, passed_count, failed_count
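
# Usage sketch (illustrative): the orchestrator drives this loop with its own
# executor and loader; the working directory is the work order's worktree.
#
#     results, passed, failed = await run_tests_with_resolution(
#         executor, command_loader, "wo-abc123", "/path/to/worktree", log,
#         max_attempts=4,
#     )
#     if failed:
#         ...  # surface the remaining failures in step history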

@@ -18,6 +18,8 @@ from .agent_names import (
     PLAN_FINDER,
     PLANNER,
     PR_CREATOR,
+    REVIEWER,
+    TESTER,
 )
 logger = get_logger(__name__)
@@ -442,3 +444,227 @@ async def create_pull_request(
            error_message=str(e),
            duration_seconds=duration,
        )
async def run_tests(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
work_order_id: str,
working_dir: str,
) -> StepExecutionResult:
"""Execute test suite
Returns: StepExecutionResult with test results summary
"""
start_time = time.time()
try:
command_file = command_loader.load_command("test")
cli_command, prompt_text = executor.build_command(command_file, args=[])
result = await executor.execute_async(
cli_command, working_dir, prompt_text=prompt_text, work_order_id=work_order_id
)
duration = time.time() - start_time
if result.success:
return StepExecutionResult(
step=WorkflowStep.TEST,
agent_name=TESTER,
success=True,
output=result.result_text or "Tests passed",
duration_seconds=duration,
session_id=result.session_id,
)
else:
return StepExecutionResult(
step=WorkflowStep.TEST,
agent_name=TESTER,
success=False,
error_message=result.error_message or "Tests failed",
output=result.result_text,
duration_seconds=duration,
)
except Exception as e:
duration = time.time() - start_time
logger.error("run_tests_error", error=str(e), exc_info=True)
return StepExecutionResult(
step=WorkflowStep.TEST,
agent_name=TESTER,
success=False,
error_message=str(e),
duration_seconds=duration,
)
async def resolve_test_failure(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
test_failure_json: str,
work_order_id: str,
working_dir: str,
) -> StepExecutionResult:
"""Resolve a failed test
Args:
test_failure_json: JSON string with test failure details
Returns: StepExecutionResult with resolution outcome
"""
start_time = time.time()
try:
command_file = command_loader.load_command("resolve_failed_test")
cli_command, prompt_text = executor.build_command(command_file, args=[test_failure_json])
result = await executor.execute_async(
cli_command, working_dir, prompt_text=prompt_text, work_order_id=work_order_id
)
duration = time.time() - start_time
if result.success:
return StepExecutionResult(
step=WorkflowStep.RESOLVE_TEST,
agent_name=TESTER,
success=True,
output=result.result_text or "Test failure resolved",
duration_seconds=duration,
session_id=result.session_id,
)
else:
return StepExecutionResult(
step=WorkflowStep.RESOLVE_TEST,
agent_name=TESTER,
success=False,
error_message=result.error_message or "Resolution failed",
duration_seconds=duration,
)
except Exception as e:
duration = time.time() - start_time
logger.error("resolve_test_failure_error", error=str(e), exc_info=True)
return StepExecutionResult(
step=WorkflowStep.RESOLVE_TEST,
agent_name=TESTER,
success=False,
error_message=str(e),
duration_seconds=duration,
)
async def run_review(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
spec_file: str,
work_order_id: str,
working_dir: str,
) -> StepExecutionResult:
"""Execute review against specification
Returns: StepExecutionResult with review results
"""
start_time = time.time()
try:
command_file = command_loader.load_command("review_runner")
cli_command, prompt_text = executor.build_command(
command_file, args=[spec_file, work_order_id]
)
result = await executor.execute_async(
cli_command, working_dir, prompt_text=prompt_text, work_order_id=work_order_id
)
duration = time.time() - start_time
if result.success:
return StepExecutionResult(
step=WorkflowStep.REVIEW,
agent_name=REVIEWER,
success=True,
output=result.result_text or "Review completed",
duration_seconds=duration,
session_id=result.session_id,
)
else:
return StepExecutionResult(
step=WorkflowStep.REVIEW,
agent_name=REVIEWER,
success=False,
error_message=result.error_message or "Review failed",
duration_seconds=duration,
)
except Exception as e:
duration = time.time() - start_time
logger.error("run_review_error", error=str(e), exc_info=True)
return StepExecutionResult(
step=WorkflowStep.REVIEW,
agent_name=REVIEWER,
success=False,
error_message=str(e),
duration_seconds=duration,
)
async def resolve_review_issue(
executor: AgentCLIExecutor,
command_loader: ClaudeCommandLoader,
review_issue_json: str,
work_order_id: str,
working_dir: str,
) -> StepExecutionResult:
"""Resolve a review blocker issue
Args:
review_issue_json: JSON string with review issue details
Returns: StepExecutionResult with resolution outcome
"""
start_time = time.time()
try:
command_file = command_loader.load_command("resolve_failed_review")
cli_command, prompt_text = executor.build_command(command_file, args=[review_issue_json])
result = await executor.execute_async(
cli_command, working_dir, prompt_text=prompt_text, work_order_id=work_order_id
)
duration = time.time() - start_time
if result.success:
return StepExecutionResult(
step=WorkflowStep.RESOLVE_REVIEW,
agent_name=REVIEWER,
success=True,
output=result.result_text or "Review issue resolved",
duration_seconds=duration,
session_id=result.session_id,
)
else:
return StepExecutionResult(
step=WorkflowStep.RESOLVE_REVIEW,
agent_name=REVIEWER,
success=False,
error_message=result.error_message or "Resolution failed",
duration_seconds=duration,
)
except Exception as e:
duration = time.time() - start_time
logger.error("resolve_review_issue_error", error=str(e), exc_info=True)
return StepExecutionResult(
step=WorkflowStep.RESOLVE_REVIEW,
agent_name=REVIEWER,
success=False,
error_message=str(e),
duration_seconds=duration,
)

@@ -234,7 +234,78 @@ class WorkflowOrchestrator:
bound_logger.info("step_completed", step="commit") bound_logger.info("step_completed", step="commit")
# Step 7: Create PR # Step 7: Run tests (if enabled)
from ..config import config
if config.ENABLE_TEST_PHASE:
from .test_workflow import run_tests_with_resolution
bound_logger.info("test_phase_started")
test_results, passed_count, failed_count = await run_tests_with_resolution(
self.agent_executor,
self.command_loader,
agent_work_order_id,
sandbox.working_dir,
bound_logger,
max_attempts=config.MAX_TEST_RETRY_ATTEMPTS,
)
# Record test execution in step history
test_summary = f"Tests: {passed_count} passed, {failed_count} failed"
from ..models import StepExecutionResult
from .agent_names import TESTER
test_step = StepExecutionResult(
step=WorkflowStep.TEST,
agent_name="Tester",
success=(failed_count == 0),
output=test_summary,
error_message=f"{failed_count} test(s) failed" if failed_count > 0 else None,
duration_seconds=0,
)
step_history.steps.append(test_step)
await self.state_repository.save_step_history(agent_work_order_id, step_history)
if failed_count > 0:
bound_logger.warning("test_phase_completed_with_failures", failed_count=failed_count)
else:
bound_logger.info("test_phase_completed", passed_count=passed_count)
# Step 8: Run review (if enabled)
if config.ENABLE_REVIEW_PHASE:
from .agent_names import REVIEWER
from .review_workflow import run_review_with_resolution
# Determine spec file path from plan_file or default
spec_file = plan_file if plan_file else f"PRPs/specs/{issue_class}-spec.md"
bound_logger.info("review_phase_started", spec_file=spec_file)
review_result = await run_review_with_resolution(
self.agent_executor,
self.command_loader,
spec_file,
agent_work_order_id,
sandbox.working_dir,
bound_logger,
max_attempts=config.MAX_REVIEW_RETRY_ATTEMPTS,
)
# Record review execution in step history
blocker_count = review_result.get_blocker_count()
review_summary = f"Review: {len(review_result.review_issues)} issues found, {blocker_count} blockers"
review_step = StepExecutionResult(
step=WorkflowStep.REVIEW,
agent_name="Reviewer",
success=(blocker_count == 0),
output=review_summary,
error_message=f"{blocker_count} blocker(s) remaining" if blocker_count > 0 else None,
duration_seconds=0,
)
step_history.steps.append(review_step)
await self.state_repository.save_step_history(agent_work_order_id, step_history)
if blocker_count > 0:
bound_logger.warning("review_phase_completed_with_blockers", blocker_count=blocker_count)
else:
bound_logger.info("review_phase_completed", issue_count=len(review_result.review_issues))
# Step 9: Create PR
        pr_result = await workflow_operations.create_pull_request(
            self.agent_executor,
            self.command_loader,