mirror of
https://github.com/coleam00/Archon.git
synced 2025-12-29 13:10:08 -05:00
Code review updates and moving the prp-review step to before the Commit.
This commit is contained in:
@@ -295,7 +295,7 @@ claude --version
|
||||
Check health endpoint to see dependency status:
|
||||
|
||||
```bash
|
||||
curl http://localhost:8052/health
|
||||
curl http://localhost:8053/health
|
||||
```
|
||||
|
||||
This shows:
|
||||
|
||||
@@ -5,7 +5,7 @@ FastAPI routes for agent work orders.
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Query
|
||||
from sse_starlette.sse import EventSourceResponse
|
||||
@@ -41,6 +41,93 @@ from .sse_streams import stream_work_order_logs
|
||||
logger = get_logger(__name__)
|
||||
router = APIRouter()
|
||||
|
||||
# Registry to track background workflow tasks by work order ID
|
||||
# Enables monitoring, exception tracking, and cleanup
|
||||
_workflow_tasks: dict[str, asyncio.Task] = {}
|
||||
|
||||
|
||||
def _create_task_done_callback(agent_work_order_id: str) -> Callable[[asyncio.Task], None]:
|
||||
"""Create a done callback for workflow tasks
|
||||
|
||||
Logs exceptions, updates work order status, and removes task from registry.
|
||||
Note: This callback is synchronous but schedules async operations for status updates.
|
||||
|
||||
Args:
|
||||
agent_work_order_id: Work order ID to track
|
||||
"""
|
||||
def on_task_done(task: asyncio.Task) -> None:
|
||||
"""Callback invoked when workflow task completes
|
||||
|
||||
Inspects task.exception() to determine if workflow succeeded or failed,
|
||||
logs appropriately, and updates work order status.
|
||||
"""
|
||||
try:
|
||||
# Check if task raised an exception
|
||||
exception = task.exception()
|
||||
|
||||
if exception is None:
|
||||
# Task completed successfully
|
||||
logger.info(
|
||||
"workflow_task_completed",
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
status="completed",
|
||||
)
|
||||
# Note: Orchestrator handles updating status to COMPLETED
|
||||
# so we don't need to update it here
|
||||
else:
|
||||
# Task failed with an exception
|
||||
# Log full exception details with context
|
||||
logger.exception(
|
||||
"workflow_task_failed",
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
status="failed",
|
||||
exception_type=type(exception).__name__,
|
||||
exception_message=str(exception),
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# Schedule async operation to update work order status if needed
|
||||
# (execute_workflow_with_error_handling may have already done this)
|
||||
async def update_status_if_needed() -> None:
|
||||
try:
|
||||
result = await state_repository.get(agent_work_order_id)
|
||||
if result:
|
||||
_, metadata = result
|
||||
current_status = metadata.get("status")
|
||||
if current_status != AgentWorkOrderStatus.FAILED:
|
||||
error_msg = f"Workflow task failed: {str(exception)}"
|
||||
await state_repository.update_status(
|
||||
agent_work_order_id,
|
||||
AgentWorkOrderStatus.FAILED,
|
||||
error_message=error_msg,
|
||||
)
|
||||
logger.info(
|
||||
"workflow_status_updated_to_failed",
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
)
|
||||
except Exception as update_error:
|
||||
# Log but don't raise - task is already failed
|
||||
logger.error(
|
||||
"workflow_status_update_failed_in_callback",
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
update_error=str(update_error),
|
||||
original_exception=str(exception),
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# Schedule the async status update
|
||||
asyncio.create_task(update_status_if_needed())
|
||||
finally:
|
||||
# Always remove task from registry when done (success or failure)
|
||||
_workflow_tasks.pop(agent_work_order_id, None)
|
||||
logger.debug(
|
||||
"workflow_task_removed_from_registry",
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
)
|
||||
|
||||
return on_task_done
|
||||
|
||||
|
||||
# Initialize dependencies (singletons for MVP)
|
||||
state_repository = create_repository()
|
||||
repository_config_repo = RepositoryConfigRepository()
|
||||
@@ -103,9 +190,15 @@ async def create_agent_work_order(
|
||||
# Save to repository
|
||||
await state_repository.create(state, metadata)
|
||||
|
||||
# Start workflow in background
|
||||
asyncio.create_task(
|
||||
orchestrator.execute_workflow(
|
||||
# Wrapper function to handle exceptions from workflow execution
|
||||
async def execute_workflow_with_error_handling() -> None:
|
||||
"""Execute workflow and handle any unhandled exceptions
|
||||
|
||||
Broad exception handler ensures all exceptions are caught and logged,
|
||||
with full context for debugging. Status is updated to FAILED on errors.
|
||||
"""
|
||||
try:
|
||||
await orchestrator.execute_workflow(
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
repository_url=request.repository_url,
|
||||
sandbox_type=request.sandbox_type,
|
||||
@@ -113,6 +206,47 @@ async def create_agent_work_order(
|
||||
selected_commands=request.selected_commands,
|
||||
github_issue_number=request.github_issue_number,
|
||||
)
|
||||
except Exception as e:
|
||||
# Catch any exceptions that weren't handled by the orchestrator
|
||||
# (e.g., exceptions during initialization, argument validation, etc.)
|
||||
error_msg = str(e)
|
||||
logger.exception(
|
||||
"workflow_execution_unhandled_exception",
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
error=error_msg,
|
||||
exception_type=type(e).__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
try:
|
||||
# Update work order status to FAILED
|
||||
await state_repository.update_status(
|
||||
agent_work_order_id,
|
||||
AgentWorkOrderStatus.FAILED,
|
||||
error_message=f"Workflow execution failed before orchestrator could handle it: {error_msg}",
|
||||
)
|
||||
except Exception as update_error:
|
||||
# Log but don't raise - we've already caught the original error
|
||||
logger.error(
|
||||
"workflow_status_update_failed_after_exception",
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
update_error=str(update_error),
|
||||
original_error=error_msg,
|
||||
exc_info=True,
|
||||
)
|
||||
# Re-raise to ensure task.exception() returns the exception
|
||||
raise
|
||||
|
||||
# Create and track background workflow task
|
||||
task = asyncio.create_task(execute_workflow_with_error_handling())
|
||||
_workflow_tasks[agent_work_order_id] = task
|
||||
|
||||
# Attach done callback to log exceptions and update status
|
||||
task.add_done_callback(_create_task_done_callback(agent_work_order_id))
|
||||
|
||||
logger.debug(
|
||||
"workflow_task_created_and_tracked",
|
||||
agent_work_order_id=agent_work_order_id,
|
||||
task_count=len(_workflow_tasks),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
All models follow exact naming from the PRD specification.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
@@ -284,7 +284,7 @@ class StepExecutionResult(BaseModel):
|
||||
error_message: str | None = None
|
||||
duration_seconds: float
|
||||
session_id: str | None = None
|
||||
timestamp: datetime = Field(default_factory=datetime.now)
|
||||
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
|
||||
|
||||
class StepHistory(BaseModel):
|
||||
|
||||
@@ -112,14 +112,27 @@ class GitBranchSandbox:
|
||||
duration_seconds=duration,
|
||||
)
|
||||
|
||||
# Explicit check for None returncode (should never happen after communicate())
|
||||
if process.returncode is None:
|
||||
self._logger.error(
|
||||
"command_execution_unexpected_state",
|
||||
command=command,
|
||||
error="process.returncode is None after communicate() - this indicates a serious bug",
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Process returncode is None after communicate() for command: {command}. "
|
||||
"This should never happen and indicates a serious issue."
|
||||
)
|
||||
|
||||
duration = time.time() - start_time
|
||||
success = process.returncode == 0
|
||||
exit_code = process.returncode
|
||||
success = exit_code == 0
|
||||
|
||||
result = CommandExecutionResult(
|
||||
success=success,
|
||||
stdout=stdout.decode() if stdout else None,
|
||||
stderr=stderr.decode() if stderr else None,
|
||||
exit_code=process.returncode or 0,
|
||||
exit_code=exit_code,
|
||||
error_message=None if success else stderr.decode() if stderr else "Command failed",
|
||||
duration_seconds=duration,
|
||||
)
|
||||
@@ -132,7 +145,7 @@ class GitBranchSandbox:
|
||||
self._logger.error(
|
||||
"command_execution_failed",
|
||||
command=command,
|
||||
exit_code=process.returncode,
|
||||
exit_code=exit_code,
|
||||
duration=duration,
|
||||
)
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ Enables parallel execution of multiple work orders without conflicts.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from ..models import CommandExecutionResult, SandboxSetupError
|
||||
@@ -13,6 +15,7 @@ from ..utils.port_allocation import find_available_port_range
|
||||
from ..utils.structured_logger import get_logger
|
||||
from ..utils.worktree_operations import (
|
||||
create_worktree,
|
||||
get_base_repo_path,
|
||||
get_worktree_path,
|
||||
remove_worktree,
|
||||
setup_worktree_environment,
|
||||
@@ -36,6 +39,7 @@ class GitWorktreeSandbox:
|
||||
self.port_range_start: int | None = None
|
||||
self.port_range_end: int | None = None
|
||||
self.available_ports: list[int] = []
|
||||
self.temp_branch: str | None = None # Track temporary branch for cleanup
|
||||
self._logger = logger.bind(
|
||||
sandbox_identifier=sandbox_identifier,
|
||||
repository_url=repository_url,
|
||||
@@ -63,12 +67,13 @@ class GitWorktreeSandbox:
|
||||
|
||||
# Create worktree with temporary branch name
|
||||
# Agent will create the actual feature branch during execution
|
||||
temp_branch = f"wo-{self.sandbox_identifier}"
|
||||
# The temporary branch will be cleaned up in cleanup() method
|
||||
self.temp_branch = f"wo-{self.sandbox_identifier}"
|
||||
|
||||
worktree_path, error = create_worktree(
|
||||
self.repository_url,
|
||||
self.sandbox_identifier,
|
||||
temp_branch,
|
||||
self.temp_branch,
|
||||
self._logger
|
||||
)
|
||||
|
||||
@@ -143,13 +148,15 @@ class GitWorktreeSandbox:
|
||||
)
|
||||
|
||||
duration = time.time() - start_time
|
||||
success = process.returncode == 0
|
||||
# Use actual returncode when available, or -1 as sentinel for None
|
||||
exit_code = process.returncode if process.returncode is not None else -1
|
||||
success = exit_code == 0
|
||||
|
||||
result = CommandExecutionResult(
|
||||
success=success,
|
||||
stdout=stdout.decode() if stdout else None,
|
||||
stderr=stderr.decode() if stderr else None,
|
||||
exit_code=process.returncode or 0,
|
||||
exit_code=exit_code,
|
||||
error_message=None if success else stderr.decode() if stderr else "Command failed",
|
||||
duration_seconds=duration,
|
||||
)
|
||||
@@ -162,7 +169,7 @@ class GitWorktreeSandbox:
|
||||
self._logger.error(
|
||||
"command_execution_failed",
|
||||
command=command,
|
||||
exit_code=process.returncode,
|
||||
exit_code=exit_code,
|
||||
duration=duration,
|
||||
)
|
||||
|
||||
@@ -195,25 +202,101 @@ class GitWorktreeSandbox:
|
||||
return None
|
||||
|
||||
async def cleanup(self) -> None:
|
||||
"""Remove worktree"""
|
||||
"""Remove worktree and temporary branch
|
||||
|
||||
Removes the worktree directory and the temporary branch that was created
|
||||
during setup. This ensures cleanup even if the agent failed before creating
|
||||
the actual feature branch.
|
||||
"""
|
||||
self._logger.info("worktree_sandbox_cleanup_started")
|
||||
|
||||
try:
|
||||
success, error = remove_worktree(
|
||||
# Remove the worktree first
|
||||
worktree_success, error = remove_worktree(
|
||||
self.repository_url,
|
||||
self.sandbox_identifier,
|
||||
self._logger
|
||||
)
|
||||
if success:
|
||||
self._logger.info("worktree_sandbox_cleanup_completed")
|
||||
else:
|
||||
|
||||
if not worktree_success:
|
||||
self._logger.error(
|
||||
"worktree_sandbox_cleanup_failed",
|
||||
error=error
|
||||
)
|
||||
|
||||
# Delete the temporary branch if it was created
|
||||
# Always try to delete branch even if worktree removal failed,
|
||||
# as the branch may still exist and need cleanup
|
||||
if self.temp_branch:
|
||||
await self._delete_temp_branch()
|
||||
|
||||
# Only log success if worktree removal succeeded
|
||||
if worktree_success:
|
||||
self._logger.info("worktree_sandbox_cleanup_completed")
|
||||
except Exception as e:
|
||||
self._logger.error(
|
||||
"worktree_sandbox_cleanup_failed",
|
||||
error=str(e),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
async def _delete_temp_branch(self) -> None:
|
||||
"""Delete the temporary branch from the base repository
|
||||
|
||||
Attempts to delete the temporary branch created during setup.
|
||||
Fails gracefully if the branch doesn't exist or was already deleted.
|
||||
"""
|
||||
if not self.temp_branch:
|
||||
return
|
||||
|
||||
base_repo_path = get_base_repo_path(self.repository_url)
|
||||
|
||||
try:
|
||||
# Check if base repo exists
|
||||
if not os.path.exists(base_repo_path):
|
||||
self._logger.warning(
|
||||
"temp_branch_cleanup_skipped",
|
||||
reason="Base repository does not exist",
|
||||
temp_branch=self.temp_branch
|
||||
)
|
||||
return
|
||||
|
||||
# Delete the branch (local only - don't force push to remote)
|
||||
# Use -D to force delete even if not merged
|
||||
cmd = ["git", "branch", "-D", self.temp_branch]
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
cwd=base_repo_path,
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
self._logger.info(
|
||||
"temp_branch_deleted",
|
||||
temp_branch=self.temp_branch
|
||||
)
|
||||
else:
|
||||
# Branch might not exist (already deleted or wasn't created)
|
||||
if "not found" in result.stderr.lower() or "no such branch" in result.stderr.lower():
|
||||
self._logger.debug(
|
||||
"temp_branch_not_found",
|
||||
temp_branch=self.temp_branch,
|
||||
message="Branch may have been already deleted or never created"
|
||||
)
|
||||
else:
|
||||
# Other error (e.g., branch is checked out)
|
||||
self._logger.warning(
|
||||
"temp_branch_deletion_failed",
|
||||
temp_branch=self.temp_branch,
|
||||
error=result.stderr,
|
||||
message="Branch may need manual cleanup"
|
||||
)
|
||||
except Exception as e:
|
||||
self._logger.warning(
|
||||
"temp_branch_deletion_error",
|
||||
temp_branch=self.temp_branch,
|
||||
error=str(e),
|
||||
exc_info=True,
|
||||
message="Failed to delete temporary branch - may need manual cleanup"
|
||||
)
|
||||
|
||||
@@ -6,7 +6,7 @@ Enables state persistence across service restarts and debugging.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
@@ -203,7 +203,7 @@ class FileStateRepository:
|
||||
return
|
||||
|
||||
data["metadata"]["status"] = status
|
||||
data["metadata"]["updated_at"] = datetime.now().isoformat()
|
||||
data["metadata"]["updated_at"] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
for key, value in kwargs.items():
|
||||
data["metadata"][key] = value
|
||||
@@ -235,7 +235,7 @@ class FileStateRepository:
|
||||
return
|
||||
|
||||
data["state"]["git_branch_name"] = git_branch_name
|
||||
data["metadata"]["updated_at"] = datetime.now().isoformat()
|
||||
data["metadata"]["updated_at"] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
await self._write_state_file(agent_work_order_id, data)
|
||||
|
||||
@@ -264,7 +264,7 @@ class FileStateRepository:
|
||||
return
|
||||
|
||||
data["state"]["agent_session_id"] = agent_session_id
|
||||
data["metadata"]["updated_at"] = datetime.now().isoformat()
|
||||
data["metadata"]["updated_at"] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
await self._write_state_file(agent_work_order_id, data)
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ Stores repository metadata, verification status, and per-repository preferences.
|
||||
"""
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from supabase import Client, create_client
|
||||
@@ -63,17 +63,20 @@ class RepositoryConfigRepository:
|
||||
self._logger = logger.bind(table=self.table_name)
|
||||
self._logger.info("repository_config_repository_initialized")
|
||||
|
||||
def _row_to_model(self, row: dict[str, Any]) -> ConfiguredRepository:
|
||||
def _row_to_model(self, row: dict[str, Any]) -> ConfiguredRepository | None:
|
||||
"""Convert database row to ConfiguredRepository model
|
||||
|
||||
Args:
|
||||
row: Database row dictionary
|
||||
|
||||
Returns:
|
||||
ConfiguredRepository model instance
|
||||
ConfiguredRepository model instance, or None if row contains invalid enum values
|
||||
that cannot be converted (allows callers to skip invalid rows)
|
||||
|
||||
Raises:
|
||||
ValueError: If row contains invalid enum values that cannot be converted
|
||||
Note:
|
||||
Invalid enum values are logged but do not raise exceptions, allowing operations
|
||||
to continue with valid data. This prevents the entire table from becoming unreadable
|
||||
due to schema mismatches or corrupted data.
|
||||
"""
|
||||
repository_id = row.get("id", "unknown")
|
||||
|
||||
@@ -87,11 +90,10 @@ class RepositoryConfigRepository:
|
||||
repository_id=repository_id,
|
||||
invalid_commands=default_commands_raw,
|
||||
error=str(e),
|
||||
exc_info=True
|
||||
exc_info=True,
|
||||
action="Skipping invalid row - consider running data migration to fix enum values"
|
||||
)
|
||||
raise ValueError(
|
||||
f"Database contains invalid workflow steps for repository {repository_id}: {default_commands_raw}"
|
||||
) from e
|
||||
return None
|
||||
|
||||
# Convert default_sandbox_type from string to SandboxType enum
|
||||
sandbox_type_raw = row.get("default_sandbox_type", "git_worktree")
|
||||
@@ -103,11 +105,10 @@ class RepositoryConfigRepository:
|
||||
repository_id=repository_id,
|
||||
invalid_type=sandbox_type_raw,
|
||||
error=str(e),
|
||||
exc_info=True
|
||||
exc_info=True,
|
||||
action="Skipping invalid row - consider running data migration to fix enum values"
|
||||
)
|
||||
raise ValueError(
|
||||
f"Database contains invalid sandbox type for repository {repository_id}: {sandbox_type_raw}"
|
||||
) from e
|
||||
return None
|
||||
|
||||
return ConfiguredRepository(
|
||||
id=row["id"],
|
||||
@@ -127,7 +128,8 @@ class RepositoryConfigRepository:
|
||||
"""List all configured repositories
|
||||
|
||||
Returns:
|
||||
List of ConfiguredRepository models ordered by created_at DESC
|
||||
List of ConfiguredRepository models ordered by created_at DESC.
|
||||
Invalid rows (with bad enum values) are skipped and logged.
|
||||
|
||||
Raises:
|
||||
Exception: If database query fails
|
||||
@@ -135,7 +137,22 @@ class RepositoryConfigRepository:
|
||||
try:
|
||||
response = self.client.table(self.table_name).select("*").order("created_at", desc=True).execute()
|
||||
|
||||
repositories = [self._row_to_model(row) for row in response.data]
|
||||
repositories = []
|
||||
skipped_count = 0
|
||||
for row in response.data:
|
||||
repository = self._row_to_model(row)
|
||||
if repository is not None:
|
||||
repositories.append(repository)
|
||||
else:
|
||||
skipped_count += 1
|
||||
|
||||
if skipped_count > 0:
|
||||
self._logger.warning(
|
||||
"repositories_skipped_due_to_invalid_data",
|
||||
skipped_count=skipped_count,
|
||||
total_rows=len(response.data),
|
||||
valid_count=len(repositories)
|
||||
)
|
||||
|
||||
self._logger.info(
|
||||
"repositories_listed",
|
||||
@@ -158,7 +175,7 @@ class RepositoryConfigRepository:
|
||||
repository_id: UUID of the repository
|
||||
|
||||
Returns:
|
||||
ConfiguredRepository model or None if not found
|
||||
ConfiguredRepository model or None if not found or if data is invalid
|
||||
|
||||
Raises:
|
||||
Exception: If database query fails
|
||||
@@ -175,6 +192,15 @@ class RepositoryConfigRepository:
|
||||
|
||||
repository = self._row_to_model(response.data[0])
|
||||
|
||||
if repository is None:
|
||||
# Invalid enum values in database - treat as not found
|
||||
self._logger.warning(
|
||||
"repository_has_invalid_data",
|
||||
repository_id=repository_id,
|
||||
message="Repository exists but contains invalid enum values - consider data migration"
|
||||
)
|
||||
return None
|
||||
|
||||
self._logger.info(
|
||||
"repository_retrieved",
|
||||
repository_id=repository_id,
|
||||
@@ -226,11 +252,21 @@ class RepositoryConfigRepository:
|
||||
|
||||
# Set last_verified_at if verified
|
||||
if is_verified:
|
||||
data["last_verified_at"] = datetime.now().isoformat()
|
||||
data["last_verified_at"] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
response = self.client.table(self.table_name).insert(data).execute()
|
||||
|
||||
repository = self._row_to_model(response.data[0])
|
||||
if repository is None:
|
||||
# This should not happen for newly created repositories with valid data
|
||||
# but handle defensively
|
||||
error_msg = "Failed to convert newly created repository to model - data corruption detected"
|
||||
self._logger.error(
|
||||
"repository_creation_model_conversion_failed",
|
||||
repository_url=repository_url,
|
||||
error=error_msg
|
||||
)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
self._logger.info(
|
||||
"repository_created",
|
||||
@@ -272,13 +308,13 @@ class RepositoryConfigRepository:
|
||||
for key, value in updates.items():
|
||||
if isinstance(value, SandboxType):
|
||||
prepared_updates[key] = value.value
|
||||
elif isinstance(value, list) and value and isinstance(value[0], WorkflowStep):
|
||||
elif isinstance(value, list) and value and all(isinstance(item, WorkflowStep) for item in value):
|
||||
prepared_updates[key] = [step.value for step in value]
|
||||
else:
|
||||
prepared_updates[key] = value
|
||||
|
||||
# Always update updated_at timestamp
|
||||
prepared_updates["updated_at"] = datetime.now().isoformat()
|
||||
prepared_updates["updated_at"] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
response = (
|
||||
self.client.table(self.table_name)
|
||||
@@ -295,6 +331,18 @@ class RepositoryConfigRepository:
|
||||
return None
|
||||
|
||||
repository = self._row_to_model(response.data[0])
|
||||
if repository is None:
|
||||
# Repository exists but has invalid enum values - cannot update
|
||||
error_msg = (
|
||||
f"Repository {repository_id} exists but contains invalid enum values. "
|
||||
"Cannot update - consider fixing data first via migration."
|
||||
)
|
||||
self._logger.error(
|
||||
"repository_update_failed_invalid_data",
|
||||
repository_id=repository_id,
|
||||
error=error_msg
|
||||
)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
self._logger.info(
|
||||
"repository_updated",
|
||||
|
||||
@@ -4,6 +4,8 @@ Creates appropriate repository instances based on configuration.
|
||||
Supports in-memory (dev/testing), file-based (legacy), and Supabase (production) storage.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from ..config import config
|
||||
from ..utils.structured_logger import get_logger
|
||||
from .file_state_repository import FileStateRepository
|
||||
@@ -12,6 +14,9 @@ from .work_order_repository import WorkOrderRepository
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Supported storage types
|
||||
SUPPORTED_STORAGE_TYPES = ["memory", "file", "supabase"]
|
||||
|
||||
|
||||
def create_repository() -> WorkOrderRepository | FileStateRepository | SupabaseWorkOrderRepository:
|
||||
"""Create a work order repository based on configuration
|
||||
@@ -20,11 +25,28 @@ def create_repository() -> WorkOrderRepository | FileStateRepository | SupabaseW
|
||||
Repository instance (in-memory, file-based, or Supabase)
|
||||
|
||||
Raises:
|
||||
ValueError: If Supabase is configured but credentials are missing
|
||||
ValueError: If Supabase is configured but credentials are missing, or if storage_type is invalid
|
||||
"""
|
||||
storage_type = config.STATE_STORAGE_TYPE.lower()
|
||||
|
||||
if storage_type == "supabase":
|
||||
# Validate Supabase credentials before creating repository
|
||||
supabase_url = os.getenv("SUPABASE_URL")
|
||||
supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
|
||||
|
||||
if not supabase_url or not supabase_key:
|
||||
error_msg = (
|
||||
"Supabase storage is configured (STATE_STORAGE_TYPE=supabase) but required "
|
||||
"credentials are missing. Set SUPABASE_URL and SUPABASE_SERVICE_KEY environment variables."
|
||||
)
|
||||
logger.error(
|
||||
"supabase_credentials_missing",
|
||||
storage_type="supabase",
|
||||
missing_url=not bool(supabase_url),
|
||||
missing_key=not bool(supabase_key),
|
||||
)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
logger.info("repository_created", storage_type="supabase")
|
||||
return SupabaseWorkOrderRepository()
|
||||
elif storage_type == "file":
|
||||
@@ -42,9 +64,13 @@ def create_repository() -> WorkOrderRepository | FileStateRepository | SupabaseW
|
||||
)
|
||||
return WorkOrderRepository()
|
||||
else:
|
||||
logger.warning(
|
||||
"unknown_storage_type",
|
||||
storage_type=storage_type,
|
||||
fallback="memory"
|
||||
error_msg = (
|
||||
f"Invalid storage type '{storage_type}'. "
|
||||
f"Supported types are: {', '.join(SUPPORTED_STORAGE_TYPES)}"
|
||||
)
|
||||
return WorkOrderRepository()
|
||||
logger.error(
|
||||
"invalid_storage_type",
|
||||
storage_type=storage_type,
|
||||
supported_types=SUPPORTED_STORAGE_TYPES,
|
||||
)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
@@ -10,7 +10,7 @@ Architecture Note - async/await Pattern:
|
||||
This maintains a consistent async API contract across all repositories.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from supabase import Client
|
||||
@@ -247,7 +247,7 @@ class SupabaseWorkOrderRepository:
|
||||
# Prepare updates
|
||||
updates: dict[str, Any] = {
|
||||
"status": status.value,
|
||||
"updated_at": datetime.now().isoformat(),
|
||||
"updated_at": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
# Add any metadata updates to the JSONB column
|
||||
@@ -307,7 +307,7 @@ class SupabaseWorkOrderRepository:
|
||||
try:
|
||||
self.client.table(self.table_name).update({
|
||||
"git_branch_name": git_branch_name,
|
||||
"updated_at": datetime.now().isoformat(),
|
||||
"updated_at": datetime.now(timezone.utc).isoformat(),
|
||||
}).eq("agent_work_order_id", agent_work_order_id).execute()
|
||||
|
||||
self._logger.info(
|
||||
@@ -341,7 +341,7 @@ class SupabaseWorkOrderRepository:
|
||||
try:
|
||||
self.client.table(self.table_name).update({
|
||||
"agent_session_id": agent_session_id,
|
||||
"updated_at": datetime.now().isoformat(),
|
||||
"updated_at": datetime.now(timezone.utc).isoformat(),
|
||||
}).eq("agent_work_order_id", agent_work_order_id).execute()
|
||||
|
||||
self._logger.info(
|
||||
@@ -384,7 +384,7 @@ class SupabaseWorkOrderRepository:
|
||||
... agent_name="test-agent",
|
||||
... success=True,
|
||||
... duration_seconds=1.5,
|
||||
... timestamp=datetime.now()
|
||||
... timestamp=datetime.now(timezone.utc)
|
||||
... )
|
||||
... ]
|
||||
... )
|
||||
|
||||
@@ -5,6 +5,7 @@ These tools help identify orphaned worktrees (exist on filesystem but not in dat
|
||||
and dangling state (exist in database but worktree deleted).
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@@ -139,11 +140,47 @@ async def reconcile_state(
|
||||
|
||||
if fix:
|
||||
# Clean up orphaned worktrees
|
||||
worktree_base = Path(config.WORKTREE_BASE_DIR)
|
||||
base_dir_resolved = os.path.abspath(os.path.normpath(str(worktree_base)))
|
||||
|
||||
for orphan_path in orphans:
|
||||
try:
|
||||
# Safety check: verify orphan_path is inside worktree base directory
|
||||
orphan_path_resolved = os.path.abspath(os.path.normpath(orphan_path))
|
||||
|
||||
# Verify path is within base directory and not the base directory itself
|
||||
try:
|
||||
common_path = os.path.commonpath([base_dir_resolved, orphan_path_resolved])
|
||||
is_inside_base = common_path == base_dir_resolved
|
||||
is_not_base = orphan_path_resolved != base_dir_resolved
|
||||
# Check if path is a root directory (Unix / or Windows drive root like C:\)
|
||||
path_obj = Path(orphan_path_resolved)
|
||||
is_not_root = not (
|
||||
orphan_path_resolved in ("/", "\\") or
|
||||
(os.name == "nt" and len(path_obj.parts) == 2 and path_obj.parts[1] == "")
|
||||
)
|
||||
except ValueError:
|
||||
# commonpath raises ValueError if paths are on different drives (Windows)
|
||||
is_inside_base = False
|
||||
is_not_base = True
|
||||
is_not_root = True
|
||||
|
||||
if is_inside_base and is_not_base and is_not_root:
|
||||
shutil.rmtree(orphan_path)
|
||||
actions.append(f"Deleted orphaned worktree: {orphan_path}")
|
||||
logger.info("orphaned_worktree_deleted", path=orphan_path)
|
||||
else:
|
||||
# Safety check failed - do not delete
|
||||
actions.append(f"Skipped deletion of {orphan_path} (safety check failed: outside worktree base or invalid path)")
|
||||
logger.error(
|
||||
"orphaned_worktree_deletion_skipped_safety_check_failed",
|
||||
path=orphan_path,
|
||||
path_resolved=orphan_path_resolved,
|
||||
base_dir=base_dir_resolved,
|
||||
is_inside_base=is_inside_base,
|
||||
is_not_base=is_not_base,
|
||||
is_not_root=is_not_root,
|
||||
)
|
||||
except Exception as e:
|
||||
actions.append(f"Failed to delete {orphan_path}: {e}")
|
||||
logger.error("orphaned_worktree_delete_failed", path=orphan_path, error=str(e), exc_info=True)
|
||||
|
||||
@@ -70,7 +70,7 @@ class WorkflowOrchestrator:
|
||||
"""
|
||||
# Default commands if not provided
|
||||
if selected_commands is None:
|
||||
selected_commands = ["create-branch", "planning", "execute", "commit", "create-pr"]
|
||||
selected_commands = ["create-branch", "planning", "execute", "prp-review", "commit", "create-pr"]
|
||||
|
||||
# Bind work order context for structured logging
|
||||
bind_work_order_context(agent_work_order_id)
|
||||
@@ -198,43 +198,30 @@ class WorkflowOrchestrator:
|
||||
agent_work_order_id, result.output or ""
|
||||
)
|
||||
elif command_name == "create-pr":
|
||||
# Calculate git stats before marking as completed
|
||||
# Branch name is stored in context from create-branch step
|
||||
branch_name = context.get("create-branch")
|
||||
git_stats = await self._calculate_git_stats(
|
||||
branch_name,
|
||||
sandbox.working_dir
|
||||
)
|
||||
# Store PR URL for final metadata update
|
||||
context["github_pull_request_url"] = result.output
|
||||
|
||||
await self.state_repository.update_status(
|
||||
agent_work_order_id,
|
||||
AgentWorkOrderStatus.COMPLETED,
|
||||
github_pull_request_url=result.output,
|
||||
git_commit_count=git_stats["commit_count"],
|
||||
git_files_changed=git_stats["files_changed"],
|
||||
)
|
||||
# Save final step history
|
||||
await self.state_repository.save_step_history(agent_work_order_id, step_history)
|
||||
bound_logger.info(
|
||||
"agent_work_order_completed",
|
||||
total_steps=len(step_history.steps),
|
||||
git_commit_count=git_stats["commit_count"],
|
||||
git_files_changed=git_stats["files_changed"],
|
||||
)
|
||||
return # Exit early if PR created
|
||||
|
||||
# Calculate git stats for workflows that complete without PR
|
||||
# Calculate git stats and mark as completed
|
||||
branch_name = context.get("create-branch")
|
||||
completion_metadata = {}
|
||||
|
||||
if branch_name:
|
||||
git_stats = await self._calculate_git_stats(
|
||||
branch_name, sandbox.working_dir
|
||||
)
|
||||
await self.state_repository.update_status(
|
||||
agent_work_order_id,
|
||||
AgentWorkOrderStatus.COMPLETED,
|
||||
git_commit_count=git_stats["commit_count"],
|
||||
git_files_changed=git_stats["files_changed"],
|
||||
)
|
||||
completion_metadata["git_commit_count"] = git_stats["commit_count"]
|
||||
completion_metadata["git_files_changed"] = git_stats["files_changed"]
|
||||
|
||||
# Include PR URL if create-pr step was executed
|
||||
pr_url = context.get("github_pull_request_url")
|
||||
if pr_url:
|
||||
completion_metadata["github_pull_request_url"] = pr_url
|
||||
|
||||
await self.state_repository.update_status(
|
||||
agent_work_order_id,
|
||||
AgentWorkOrderStatus.COMPLETED,
|
||||
**completion_metadata
|
||||
)
|
||||
|
||||
# Save final step history
|
||||
await self.state_repository.save_step_history(agent_work_order_id, step_history)
|
||||
|
||||
Reference in New Issue
Block a user