fix: linting issues in agent work orders tests

- Sort imports consistently
- Remove unused imports (pytest, MagicMock, patch, etc.)
- Use the datetime.UTC alias instead of timezone.utc (see the sketch below)
- Fix formatting and organization issues
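
For reference, a minimal before/after sketch of the datetime change (illustrative snippet, not a file in this commit; pyupgrade-style lint rules such as Ruff's UP017 flag the old form):

# Before: timezone.utc
from datetime import datetime, timezone
now = datetime.now(timezone.utc)

# After: the datetime.UTC alias (Python 3.11+), consistent with requires-python >= 3.12
from datetime import UTC, datetime
now = datetime.now(UTC)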
Rasmus Widing
2025-10-24 00:07:32 +03:00
parent d80a12f395
commit 8728c67448
23 changed files with 1402 additions and 71 deletions

View File

@@ -6,6 +6,7 @@ readme = "README.md"
requires-python = ">=3.12"
# Base dependencies - empty since we're using dependency groups
dependencies = [
"sse-starlette>=2.3.3",
"structlog>=25.4.0",
]

View File

@@ -6,7 +6,8 @@ FastAPI routes for agent work orders.
import asyncio
from datetime import datetime
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, HTTPException, Query
from sse_starlette.sse import EventSourceResponse
from ..agent_executor.agent_cli_executor import AgentCLIExecutor
from ..command_loader.claude_command_loader import ClaudeCommandLoader
@@ -27,8 +28,10 @@ from ..models import (
from ..sandbox_manager.sandbox_factory import SandboxFactory
from ..state_manager.repository_factory import create_repository
from ..utils.id_generator import generate_work_order_id
from ..utils.log_buffer import WorkOrderLogBuffer
from ..utils.structured_logger import get_logger
from ..workflow_engine.workflow_orchestrator import WorkflowOrchestrator
from .sse_streams import stream_work_order_logs
logger = get_logger(__name__)
router = APIRouter()
@@ -39,6 +42,7 @@ agent_executor = AgentCLIExecutor()
sandbox_factory = SandboxFactory()
github_client = GitHubClient()
command_loader = ClaudeCommandLoader()
log_buffer = WorkOrderLogBuffer()
orchestrator = WorkflowOrchestrator(
agent_executor=agent_executor,
sandbox_factory=sandbox_factory,
@@ -286,31 +290,118 @@ async def get_git_progress(agent_work_order_id: str) -> GitProgressSnapshot:
@router.get("/{agent_work_order_id}/logs")
async def get_agent_work_order_logs(
agent_work_order_id: str,
limit: int = 100,
offset: int = 0,
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
level: str | None = Query(None, description="Filter by log level (info, warning, error, debug)"),
step: str | None = Query(None, description="Filter by step name"),
) -> dict:
"""Get structured logs for a work order
"""Get buffered logs for a work order.
TODO Phase 2+: Implement log storage and retrieval
For MVP, returns empty logs.
Returns logs from the in-memory buffer. For real-time streaming, use the
/logs/stream endpoint.
Args:
agent_work_order_id: Work order ID
limit: Maximum number of logs to return (1-1000)
offset: Number of logs to skip for pagination
level: Optional log level filter
step: Optional step name filter
Returns:
Dictionary with log entries and pagination metadata
"""
logger.info(
"agent_logs_get_started",
agent_work_order_id=agent_work_order_id,
limit=limit,
offset=offset,
level=level,
step=step,
)
# Verify work order exists
work_order = await state_repository.get(agent_work_order_id)
if not work_order:
raise HTTPException(status_code=404, detail="Agent work order not found")
# Get logs from buffer
log_entries = log_buffer.get_logs(
work_order_id=agent_work_order_id,
level=level,
step=step,
limit=limit,
offset=offset,
)
# TODO Phase 2+: Read from log files or Supabase
return {
"agent_work_order_id": agent_work_order_id,
"log_entries": [],
"total": 0,
"log_entries": log_entries,
"total": log_buffer.get_log_count(agent_work_order_id),
"limit": limit,
"offset": offset,
}
@router.get("/{agent_work_order_id}/logs/stream")
async def stream_agent_work_order_logs(
agent_work_order_id: str,
level: str | None = Query(None, description="Filter by log level (info, warning, error, debug)"),
step: str | None = Query(None, description="Filter by step name"),
since: str | None = Query(None, description="ISO timestamp - only return logs after this time"),
) -> EventSourceResponse:
"""Stream work order logs in real-time via Server-Sent Events.
Connects to a live stream that delivers logs as they are generated.
Connection stays open until work order completes or client disconnects.
Args:
agent_work_order_id: Work order ID
level: Optional log level filter (info, warning, error, debug)
step: Optional step name filter (exact match)
since: Optional ISO timestamp - only return logs after this time
Returns:
EventSourceResponse streaming log events
Examples:
curl -N http://localhost:8053/api/agent-work-orders/wo-123/logs/stream
curl -N "http://localhost:8053/api/agent-work-orders/wo-123/logs/stream?level=error"
Notes:
- Uses Server-Sent Events (SSE) protocol
- Sends heartbeat every 15 seconds to keep connection alive
- Automatically handles client disconnect
- Each event is JSON with timestamp, level, event, work_order_id, and extra fields
"""
logger.info(
"agent_logs_stream_started",
agent_work_order_id=agent_work_order_id,
level=level,
step=step,
since=since,
)
# Verify work order exists
work_order = await state_repository.get(agent_work_order_id)
if not work_order:
raise HTTPException(status_code=404, detail="Agent work order not found")
# Create SSE stream
return EventSourceResponse(
stream_work_order_logs(
work_order_id=agent_work_order_id,
log_buffer=log_buffer,
level_filter=level,
step_filter=step,
since_timestamp=since,
),
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
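
For context, a minimal sketch of consuming this stream from Python rather than curl, assuming httpx is installed and the service listens on localhost:8053 as in the curl examples above:

# Illustrative client only -- not part of the service code.
import asyncio
import json

import httpx


async def follow_logs(work_order_id: str) -> None:
    url = f"http://localhost:8053/api/agent-work-orders/{work_order_id}/logs/stream"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                # EventSourceResponse emits "data: <json>" lines; lines starting
                # with ":" are keepalive comments and can be ignored.
                if line.startswith("data: "):
                    entry = json.loads(line.removeprefix("data: "))
                    print(entry["timestamp"], entry["level"], entry["event"])


asyncio.run(follow_logs("wo-123"))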
@router.get("/{agent_work_order_id}/steps")
async def get_agent_work_order_steps(agent_work_order_id: str) -> StepHistory:
"""Get step execution history for a work order

View File

@@ -0,0 +1,134 @@
"""Server-Sent Events (SSE) Streaming for Work Order Logs
Implements SSE streaming endpoint for real-time log delivery.
Uses sse-starlette for W3C SSE specification compliance.
"""
import asyncio
import json
from collections.abc import AsyncGenerator
from datetime import UTC, datetime
from typing import Any
from ..utils.log_buffer import WorkOrderLogBuffer
async def stream_work_order_logs(
work_order_id: str,
log_buffer: WorkOrderLogBuffer,
level_filter: str | None = None,
step_filter: str | None = None,
since_timestamp: str | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
"""Stream work order logs via Server-Sent Events.
Yields existing buffered logs first, then new logs as they arrive.
Sends heartbeat comments every 15 seconds to prevent connection timeout.
Args:
work_order_id: ID of the work order to stream logs for
log_buffer: The WorkOrderLogBuffer instance to read from
level_filter: Optional log level filter (info, warning, error, debug)
step_filter: Optional step name filter (exact match)
since_timestamp: Optional ISO timestamp - only return logs after this time
Yields:
SSE event dictionaries with "data" key containing JSON log entry
Examples:
async for event in stream_work_order_logs("wo-123", buffer):
# event = {"data": '{"timestamp": "...", "level": "info", ...}'}
print(event)
Notes:
- Generator automatically handles client disconnects via CancelledError
- Heartbeat comments prevent proxy/load balancer timeouts
- Non-blocking polling with 0.5s interval
"""
# Get existing buffered logs first
existing_logs = log_buffer.get_logs(
work_order_id=work_order_id,
level=level_filter,
step=step_filter,
since=since_timestamp,
)
# Yield existing logs as SSE events
for log_entry in existing_logs:
yield format_log_event(log_entry)
# Track last seen timestamp to avoid duplicates
last_timestamp = (
existing_logs[-1]["timestamp"] if existing_logs else since_timestamp or ""
)
# Stream new logs as they arrive
heartbeat_counter = 0
heartbeat_interval = 30 # 30 iterations * 0.5s = 15 seconds
try:
while True:
# Poll for new logs
new_logs = log_buffer.get_logs_since(
work_order_id=work_order_id,
since_timestamp=last_timestamp,
level=level_filter,
step=step_filter,
)
# Yield new logs
for log_entry in new_logs:
yield format_log_event(log_entry)
last_timestamp = log_entry["timestamp"]
# Send heartbeat comment every 15 seconds to keep connection alive
heartbeat_counter += 1
if heartbeat_counter >= heartbeat_interval:
yield {"comment": "keepalive"}
heartbeat_counter = 0
# Non-blocking sleep before next poll
await asyncio.sleep(0.5)
except asyncio.CancelledError:
# Client disconnected - clean exit
pass
def format_log_event(log_dict: dict[str, Any]) -> dict[str, str]:
"""Format a log dictionary as an SSE event.
Args:
log_dict: Dictionary containing log entry data
Returns:
SSE event dictionary with "data" key containing JSON string
Examples:
event = format_log_event({
"timestamp": "2025-10-23T12:00:00Z",
"level": "info",
"event": "step_started",
"work_order_id": "wo-123",
"step": "planning"
})
# Returns: {"data": '{"timestamp": "...", "level": "info", ...}'}
Notes:
- JSON serialization handles datetime conversion
- Event format follows SSE specification: data: {json}
"""
return {"data": json.dumps(log_dict)}
def get_current_timestamp() -> str:
"""Get current timestamp in ISO format with timezone.
Returns:
ISO format timestamp string (e.g., "2025-10-23T12:34:56.789123+00:00")
Examples:
timestamp = get_current_timestamp()
# "2025-10-23T12:34:56.789123+00:00"
"""
return datetime.now(UTC).isoformat()

View File

@@ -14,14 +14,20 @@ import httpx
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .api.routes import router
from .api.routes import log_buffer, router
from .config import config
from .utils.structured_logger import configure_structured_logging, get_logger
from .utils.structured_logger import (
configure_structured_logging_with_buffer,
get_logger,
)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""Lifespan context manager for startup and shutdown tasks"""
# Configure structured logging with buffer for SSE streaming
configure_structured_logging_with_buffer(config.LOG_LEVEL, log_buffer)
logger = get_logger(__name__)
logger.info(
@@ -32,6 +38,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
},
)
# Start log buffer cleanup task
await log_buffer.start_cleanup_task()
# Validate Claude CLI is available
try:
result = subprocess.run(
@@ -84,9 +93,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
logger.info("Shutting down Agent Work Orders service")
# Configure logging on startup
configure_structured_logging(config.LOG_LEVEL)
# Stop log buffer cleanup task
await log_buffer.stop_cleanup_task()
# Create FastAPI app with lifespan
app = FastAPI(

View File

@@ -0,0 +1,252 @@
"""In-Memory Log Buffer for Agent Work Orders
Thread-safe circular buffer to store recent logs for SSE streaming.
Automatically cleans up old work orders to prevent memory leaks.
"""
import asyncio
import threading
import time
from collections import defaultdict, deque
from datetime import UTC, datetime
from typing import Any
class WorkOrderLogBuffer:
"""Thread-safe circular buffer for work order logs.
Stores up to MAX_LOGS_PER_WORK_ORDER logs per work order in memory.
Automatically removes work orders older than cleanup threshold.
Supports filtering by log level, step name, and timestamp.
"""
MAX_LOGS_PER_WORK_ORDER = 1000
CLEANUP_THRESHOLD_HOURS = 1
def __init__(self) -> None:
"""Initialize the log buffer with thread safety."""
self._buffers: dict[str, deque[dict[str, Any]]] = defaultdict(
lambda: deque(maxlen=self.MAX_LOGS_PER_WORK_ORDER)
)
self._last_activity: dict[str, float] = {}
self._lock = threading.Lock()
self._cleanup_task: asyncio.Task[None] | None = None
def add_log(
self,
work_order_id: str,
level: str,
event: str,
timestamp: str | None = None,
**extra: Any,
) -> None:
"""Add a log entry to the buffer.
Args:
work_order_id: ID of the work order this log belongs to
level: Log level (debug, info, warning, error)
event: Event name describing what happened
timestamp: ISO format timestamp (auto-generated if not provided)
**extra: Additional structured log fields
Examples:
buffer.add_log(
"wo-123",
"info",
"step_started",
step="planning",
progress="2/5"
)
"""
with self._lock:
log_entry = {
"work_order_id": work_order_id,
"level": level,
"event": event,
"timestamp": timestamp or datetime.now(UTC).isoformat(),
**extra,
}
self._buffers[work_order_id].append(log_entry)
self._last_activity[work_order_id] = time.time()
def get_logs(
self,
work_order_id: str,
level: str | None = None,
step: str | None = None,
since: str | None = None,
limit: int | None = None,
offset: int = 0,
) -> list[dict[str, Any]]:
"""Retrieve logs for a work order with optional filtering.
Args:
work_order_id: ID of the work order
level: Filter by log level (case-insensitive)
step: Filter by step name (exact match)
since: ISO timestamp - only return logs after this time
limit: Maximum number of logs to return
offset: Number of logs to skip (for pagination)
Returns:
List of log entries matching filters, in chronological order
Examples:
# Get all logs
logs = buffer.get_logs("wo-123")
# Get recent error logs
errors = buffer.get_logs("wo-123", level="error", since="2025-10-23T12:00:00Z")
# Get logs for specific step
planning_logs = buffer.get_logs("wo-123", step="planning")
"""
with self._lock:
logs = list(self._buffers.get(work_order_id, []))
# Apply filters
if level:
level_lower = level.lower()
logs = [log for log in logs if log.get("level", "").lower() == level_lower]
if step:
logs = [log for log in logs if log.get("step") == step]
if since:
logs = [log for log in logs if log.get("timestamp", "") > since]
# Apply pagination
if offset > 0:
logs = logs[offset:]
if limit is not None and limit > 0:
logs = logs[:limit]
return logs
def get_logs_since(
self,
work_order_id: str,
since_timestamp: str,
level: str | None = None,
step: str | None = None,
) -> list[dict[str, Any]]:
"""Get logs after a specific timestamp.
Convenience method for streaming use cases.
Args:
work_order_id: ID of the work order
since_timestamp: ISO timestamp - only return logs after this time
level: Optional log level filter
step: Optional step name filter
Returns:
List of log entries after the timestamp
"""
return self.get_logs(
work_order_id=work_order_id, level=level, step=step, since=since_timestamp
)
def clear_work_order(self, work_order_id: str) -> None:
"""Remove all logs for a specific work order.
Args:
work_order_id: ID of the work order to clear
Examples:
buffer.clear_work_order("wo-123")
"""
with self._lock:
if work_order_id in self._buffers:
del self._buffers[work_order_id]
if work_order_id in self._last_activity:
del self._last_activity[work_order_id]
def cleanup_old_work_orders(self) -> int:
"""Remove work orders older than CLEANUP_THRESHOLD_HOURS.
Returns:
Number of work orders removed
Examples:
removed_count = buffer.cleanup_old_work_orders()
"""
threshold = time.time() - (self.CLEANUP_THRESHOLD_HOURS * 3600)
removed_count = 0
with self._lock:
# Find work orders to remove
to_remove = [
work_order_id
for work_order_id, last_time in self._last_activity.items()
if last_time < threshold
]
# Remove them
for work_order_id in to_remove:
if work_order_id in self._buffers:
del self._buffers[work_order_id]
if work_order_id in self._last_activity:
del self._last_activity[work_order_id]
removed_count += 1
return removed_count
async def start_cleanup_task(self, interval_seconds: int = 300) -> None:
"""Start automatic cleanup task in background.
Args:
interval_seconds: How often to run cleanup (default: 5 minutes)
Examples:
await buffer.start_cleanup_task()
"""
if self._cleanup_task is not None:
return
async def cleanup_loop() -> None:
while True:
await asyncio.sleep(interval_seconds)
removed = self.cleanup_old_work_orders()
if removed > 0:
# Note: We don't log here to avoid circular dependency
# The cleanup is logged by the caller if needed
pass
self._cleanup_task = asyncio.create_task(cleanup_loop())
async def stop_cleanup_task(self) -> None:
"""Stop the automatic cleanup task.
Examples:
await buffer.stop_cleanup_task()
"""
if self._cleanup_task is not None:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
self._cleanup_task = None
def get_work_order_count(self) -> int:
"""Get the number of work orders currently in the buffer.
Returns:
Count of work orders being tracked
"""
with self._lock:
return len(self._buffers)
def get_log_count(self, work_order_id: str) -> int:
"""Get the number of logs for a specific work order.
Args:
work_order_id: ID of the work order
Returns:
Number of logs for this work order
"""
with self._lock:
return len(self._buffers.get(work_order_id, []))
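
Taken together, a short usage sketch of the buffer API defined above (values are illustrative):

# Illustrative only: add, filter, and clean up logs for one work order.
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "step_started", step="planning")
buffer.add_log("wo-123", "error", "step_failed", step="planning", error="boom")

errors = buffer.get_logs("wo-123", level="error")  # mirrors the /logs filters
assert errors[0]["event"] == "step_failed"

buffer.cleanup_old_work_orders()   # drops work orders idle past the threshold
buffer.clear_work_order("wo-123")  # or remove one work order explicitly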

View File

@@ -1,14 +1,74 @@
"""Structured Logging Setup
Configures structlog for PRD-compliant event logging.
Configures structlog for PRD-compliant event logging with SSE streaming support.
Event naming follows: {module}_{noun}_{verb_past_tense}
"""
from collections.abc import MutableMapping
from typing import Any
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars
from .log_buffer import WorkOrderLogBuffer
class BufferProcessor:
"""Custom structlog processor to route logs to WorkOrderLogBuffer.
Only buffers logs that have 'work_order_id' in their context.
This ensures we only store logs for active work orders.
"""
def __init__(self, buffer: WorkOrderLogBuffer) -> None:
"""Initialize processor with a log buffer.
Args:
buffer: The WorkOrderLogBuffer instance to write logs to
"""
self.buffer = buffer
def __call__(
self, logger: Any, method_name: str, event_dict: MutableMapping[str, Any]
) -> MutableMapping[str, Any]:
"""Process log event and add to buffer if it has work_order_id.
Args:
logger: The logger instance
method_name: The log level method name
event_dict: Dictionary containing log event data
Returns:
Unmodified event_dict (pass-through processor)
"""
work_order_id = event_dict.get("work_order_id")
if work_order_id:
# Extract core fields
level = event_dict.get("level", method_name)
event = event_dict.get("event", "")
timestamp = event_dict.get("timestamp", "")
# Get all extra fields (everything except core fields)
extra = {
k: v
for k, v in event_dict.items()
if k not in ("work_order_id", "level", "event", "timestamp")
}
# Add to buffer
self.buffer.add_log(
work_order_id=work_order_id,
level=level,
event=event,
timestamp=timestamp,
**extra,
)
return event_dict
def configure_structured_logging(log_level: str = "INFO") -> None:
"""Configure structlog with console rendering
"""Configure structlog with console rendering.
Event naming convention: {module}_{noun}_{verb_past_tense}
Examples:
@@ -16,6 +76,9 @@ def configure_structured_logging(log_level: str = "INFO") -> None:
- git_branch_created
- workflow_phase_started
- sandbox_cleanup_completed
Args:
log_level: Minimum log level (DEBUG, INFO, WARNING, ERROR)
"""
structlog.configure(
processors=[
@@ -24,7 +87,7 @@ def configure_structured_logging(log_level: str = "INFO") -> None:
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.dev.ConsoleRenderer(), # Pretty console for MVP
structlog.dev.ConsoleRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.stdlib.LoggerFactory(),
@@ -32,13 +95,83 @@ def configure_structured_logging(log_level: str = "INFO") -> None:
)
def configure_structured_logging_with_buffer(
log_level: str, buffer: WorkOrderLogBuffer
) -> None:
"""Configure structlog with both console rendering and log buffering.
This configuration enables SSE streaming by routing logs to the buffer
while maintaining console output for local development.
Args:
log_level: Minimum log level (DEBUG, INFO, WARNING, ERROR)
buffer: WorkOrderLogBuffer instance to store logs for streaming
Examples:
buffer = WorkOrderLogBuffer()
configure_structured_logging_with_buffer("INFO", buffer)
"""
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
BufferProcessor(buffer),
structlog.dev.ConsoleRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
def bind_work_order_context(work_order_id: str) -> None:
"""Bind work order ID to the current context.
All logs in this context will include the work_order_id automatically.
Convenience wrapper around structlog.contextvars.bind_contextvars.
Args:
work_order_id: The work order ID to bind to the context
Examples:
bind_work_order_context("wo-abc123")
logger.info("step_started", step="planning")
# Log will include work_order_id="wo-abc123" automatically
"""
bind_contextvars(work_order_id=work_order_id)
def clear_work_order_context() -> None:
"""Clear the work order context.
Should be called when work order execution completes to prevent
context leakage to other work orders.
Convenience wrapper around structlog.contextvars.clear_contextvars.
Examples:
try:
bind_work_order_context("wo-abc123")
# ... execute work order ...
finally:
clear_work_order_context()
"""
clear_contextvars()
def get_logger(name: str | None = None) -> structlog.stdlib.BoundLogger:
"""Get a structured logger instance
"""Get a structured logger instance.
Args:
name: Optional name for the logger
Returns:
Configured structlog logger
Examples:
logger = get_logger(__name__)
logger.info("operation_completed", duration_ms=123)
"""
return structlog.get_logger(name) # type: ignore[no-any-return]
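
To make the flow concrete, a minimal sketch of how a log line reaches the buffer once this configuration is active (module paths as used in the tests; event names are illustrative):

# Illustrative wiring: console rendering and buffering happen side by side.
from src.agent_work_orders.utils.log_buffer import WorkOrderLogBuffer
from src.agent_work_orders.utils.structured_logger import (
    bind_work_order_context,
    clear_work_order_context,
    configure_structured_logging_with_buffer,
    get_logger,
)

buffer = WorkOrderLogBuffer()
configure_structured_logging_with_buffer("INFO", buffer)
logger = get_logger(__name__)

bind_work_order_context("wo-abc123")
try:
    # merge_contextvars injects work_order_id, so BufferProcessor stores the
    # event in the buffer as the processor chain runs.
    logger.info("step_started", step="planning")
finally:
    clear_work_order_context()

assert buffer.get_log_count("wo-abc123") == 1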

View File

@@ -3,6 +3,8 @@
Main orchestration logic for workflow execution.
"""
import time
from ..agent_executor.agent_cli_executor import AgentCLIExecutor
from ..command_loader.claude_command_loader import ClaudeCommandLoader
from ..github_integration.github_client import GitHubClient
@@ -17,7 +19,11 @@ from ..state_manager.file_state_repository import FileStateRepository
from ..state_manager.work_order_repository import WorkOrderRepository
from ..utils.git_operations import get_commit_count, get_files_changed
from ..utils.id_generator import generate_sandbox_identifier
from ..utils.structured_logger import get_logger
from ..utils.structured_logger import (
bind_work_order_context,
clear_work_order_context,
get_logger,
)
from . import workflow_operations
logger = get_logger(__name__)
@@ -66,13 +72,24 @@ class WorkflowOrchestrator:
if selected_commands is None:
selected_commands = ["create-branch", "planning", "execute", "commit", "create-pr"]
# Bind work order context for structured logging
bind_work_order_context(agent_work_order_id)
bound_logger = self._logger.bind(
agent_work_order_id=agent_work_order_id,
sandbox_type=sandbox_type.value,
selected_commands=selected_commands,
)
bound_logger.info("agent_work_order_started")
# Track workflow start time
workflow_start_time = time.time()
total_steps = len(selected_commands)
bound_logger.info(
"workflow_started",
total_steps=total_steps,
repository_url=repository_url,
)
# Initialize step history and context
step_history = StepHistory(agent_work_order_id=agent_work_order_id)
@@ -90,12 +107,17 @@ class WorkflowOrchestrator:
)
# Create sandbox
bound_logger.info("sandbox_setup_started", repository_url=repository_url)
sandbox_identifier = generate_sandbox_identifier(agent_work_order_id)
sandbox = self.sandbox_factory.create_sandbox(
sandbox_type, repository_url, sandbox_identifier
)
await sandbox.setup()
bound_logger.info("sandbox_created", sandbox_identifier=sandbox_identifier)
bound_logger.info(
"sandbox_setup_completed",
sandbox_identifier=sandbox_identifier,
working_dir=sandbox.working_dir,
)
# Command mapping
command_map = {
@@ -108,15 +130,29 @@ class WorkflowOrchestrator:
}
# Execute each command in sequence
for command_name in selected_commands:
for index, command_name in enumerate(selected_commands):
if command_name not in command_map:
raise WorkflowExecutionError(f"Unknown command: {command_name}")
bound_logger.info("command_execution_started", command=command_name)
# Calculate progress
step_number = index + 1
progress_pct = int((step_number / total_steps) * 100)
elapsed_seconds = int(time.time() - workflow_start_time)
bound_logger.info(
"step_started",
step=command_name,
step_number=step_number,
total_steps=total_steps,
progress=f"{step_number}/{total_steps}",
progress_pct=progress_pct,
elapsed_seconds=elapsed_seconds,
)
command_func = command_map[command_name]
# Execute command
step_start_time = time.time()
result = await command_func(
executor=self.agent_executor,
command_loader=self.command_loader,
@@ -124,6 +160,7 @@ class WorkflowOrchestrator:
working_dir=sandbox.working_dir,
context=context,
)
step_duration = time.time() - step_start_time
# Save step result
step_history.steps.append(result)
@@ -133,10 +170,12 @@ class WorkflowOrchestrator:
# Log completion
bound_logger.info(
"command_execution_completed",
command=command_name,
"step_completed",
step=command_name,
step_number=step_number,
total_steps=total_steps,
success=result.success,
duration=result.duration_seconds,
duration_seconds=round(step_duration, 2),
)
# STOP on failure
@@ -199,11 +238,24 @@ class WorkflowOrchestrator:
# Save final step history
await self.state_repository.save_step_history(agent_work_order_id, step_history)
bound_logger.info("agent_work_order_completed", total_steps=len(step_history.steps))
total_duration = time.time() - workflow_start_time
bound_logger.info(
"workflow_completed",
total_steps=len(step_history.steps),
total_duration_seconds=round(total_duration, 2),
)
except Exception as e:
error_msg = str(e)
bound_logger.error("agent_work_order_failed", error=error_msg, exc_info=True)
total_duration = time.time() - workflow_start_time
bound_logger.exception(
"workflow_failed",
error=error_msg,
total_duration_seconds=round(total_duration, 2),
completed_steps=len(step_history.steps),
total_steps=total_steps,
)
# Save partial step history even on failure
await self.state_repository.save_step_history(agent_work_order_id, step_history)
@@ -218,15 +270,18 @@ class WorkflowOrchestrator:
# Cleanup sandbox
if sandbox:
try:
bound_logger.info("sandbox_cleanup_started")
await sandbox.cleanup()
bound_logger.info("sandbox_cleanup_completed")
except Exception as cleanup_error:
bound_logger.error(
bound_logger.exception(
"sandbox_cleanup_failed",
error=str(cleanup_error),
exc_info=True,
)
# Clear work order context to prevent leakage
clear_work_order_context()
async def _calculate_git_stats(
self, branch_name: str | None, repo_path: str
) -> dict[str, int]:

View File

@@ -1,11 +1,12 @@
"""Tests for Agent Executor"""
import asyncio
import pytest
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.agent_work_orders.agent_executor.agent_cli_executor import AgentCLIExecutor
@@ -258,8 +259,8 @@ def test_build_command_replaces_arguments_placeholder():
executor = AgentCLIExecutor()
# Create temp command file with $ARGUMENTS
import tempfile
import os
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Classify this issue:\n\n$ARGUMENTS")
@@ -281,8 +282,8 @@ def test_build_command_replaces_positional_arguments():
"""Test that $1, $2, $3 are replaced with positional arguments"""
executor = AgentCLIExecutor()
import tempfile
import os
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write("Issue: $1\nWorkOrder: $2\nData: $3")

View File

@@ -1,17 +1,16 @@
"""Integration Tests for API Endpoints"""
import pytest
from datetime import datetime
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from src.agent_work_orders.server import app
from src.agent_work_orders.models import (
AgentWorkOrderStatus,
AgentWorkflowType,
AgentWorkOrderStatus,
SandboxType,
)
from src.agent_work_orders.server import app
client = TestClient(app)
@@ -248,14 +247,19 @@ def test_send_prompt_to_agent():
def test_get_logs():
"""Test getting logs (placeholder)"""
response = client.get("/api/agent-work-orders/wo-test123/logs")
"""Test getting logs from log buffer"""
with patch("src.agent_work_orders.api.routes.state_repository") as mock_repo:
# Mock work order exists
mock_repo.get = AsyncMock(return_value=({"id": "wo-test123"}, {}))
# Currently returns empty logs (Phase 2+)
assert response.status_code == 200
data = response.json()
assert "log_entries" in data
assert len(data["log_entries"]) == 0
response = client.get("/api/agent-work-orders/wo-test123/logs")
assert response.status_code == 200
data = response.json()
assert "log_entries" in data
assert "total" in data
assert "limit" in data
assert "offset" in data
def test_verify_repository_success():

View File

@@ -1,9 +1,10 @@
"""Tests for Command Loader"""
import pytest
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from src.agent_work_orders.command_loader.claude_command_loader import (
ClaudeCommandLoader,
)

View File

@@ -4,9 +4,10 @@ Tests configuration loading, service discovery, and URL construction.
"""
import importlib
import pytest
from unittest.mock import patch
import pytest
@pytest.mark.unit
def test_config_default_values():
@@ -114,7 +115,6 @@ def test_config_cors_origins_override():
@pytest.mark.unit
def test_config_ensure_temp_dir(tmp_path):
"""Test ensure_temp_dir creates directory"""
import os
import src.agent_work_orders.config as config_module
# Use tmp_path for testing

View File

@@ -1,9 +1,10 @@
"""Tests for GitHub Integration"""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.agent_work_orders.github_integration.github_client import GitHubClient
from src.agent_work_orders.models import GitHubOperationError

View File

@@ -1,8 +1,8 @@
"""Tests for ID Generator"""
from src.agent_work_orders.utils.id_generator import (
generate_work_order_id,
generate_sandbox_identifier,
generate_work_order_id,
)

View File

@@ -0,0 +1,309 @@
"""Unit tests for WorkOrderLogBuffer
Tests circular buffer behavior, filtering, thread safety, and cleanup.
"""
import threading
import time
from datetime import datetime
import pytest
from src.agent_work_orders.utils.log_buffer import WorkOrderLogBuffer
@pytest.mark.unit
def test_add_and_get_logs():
"""Test adding and retrieving logs"""
buffer = WorkOrderLogBuffer()
# Add logs
buffer.add_log("wo-123", "info", "step_started", step="planning")
buffer.add_log("wo-123", "info", "step_completed", step="planning", duration=12.5)
# Get all logs
logs = buffer.get_logs("wo-123")
assert len(logs) == 2
assert logs[0]["event"] == "step_started"
assert logs[0]["step"] == "planning"
assert logs[1]["event"] == "step_completed"
assert logs[1]["duration"] == 12.5
@pytest.mark.unit
def test_circular_buffer_overflow():
"""Test that buffer keeps only last MAX_LOGS_PER_WORK_ORDER logs"""
buffer = WorkOrderLogBuffer()
# Add more logs than max capacity
for i in range(1500):
buffer.add_log("wo-123", "info", f"event_{i}", index=i)
logs = buffer.get_logs("wo-123")
# Should only have last 1000
assert len(logs) == buffer.MAX_LOGS_PER_WORK_ORDER
# First log should be index 500 (1500 - 1000)
assert logs[0]["index"] == 500
# Last log should be index 1499
assert logs[-1]["index"] == 1499
@pytest.mark.unit
def test_filter_by_level():
"""Test filtering logs by log level"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "info_event")
buffer.add_log("wo-123", "warning", "warning_event")
buffer.add_log("wo-123", "error", "error_event")
buffer.add_log("wo-123", "info", "another_info_event")
# Filter by level (case-insensitive)
info_logs = buffer.get_logs("wo-123", level="info")
assert len(info_logs) == 2
assert all(log["level"] == "info" for log in info_logs)
error_logs = buffer.get_logs("wo-123", level="error")
assert len(error_logs) == 1
assert error_logs[0]["event"] == "error_event"
# Test case insensitivity
warning_logs = buffer.get_logs("wo-123", level="WARNING")
assert len(warning_logs) == 1
@pytest.mark.unit
def test_filter_by_step():
"""Test filtering logs by step name"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1", step="planning")
buffer.add_log("wo-123", "info", "event2", step="execute")
buffer.add_log("wo-123", "info", "event3", step="planning")
planning_logs = buffer.get_logs("wo-123", step="planning")
assert len(planning_logs) == 2
assert all(log["step"] == "planning" for log in planning_logs)
execute_logs = buffer.get_logs("wo-123", step="execute")
assert len(execute_logs) == 1
@pytest.mark.unit
def test_filter_by_timestamp():
"""Test filtering logs by timestamp"""
buffer = WorkOrderLogBuffer()
# Add logs with explicit timestamps
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
ts3 = "2025-10-23T12:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1)
buffer.add_log("wo-123", "info", "event2", timestamp=ts2)
buffer.add_log("wo-123", "info", "event3", timestamp=ts3)
# Get logs since 11:00
recent_logs = buffer.get_logs("wo-123", since=ts2)
assert len(recent_logs) == 1 # Only ts3 is after ts2
assert recent_logs[0]["event"] == "event3"
@pytest.mark.unit
def test_multiple_work_orders():
"""Test that logs from different work orders are isolated"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1")
buffer.add_log("wo-456", "info", "event2")
buffer.add_log("wo-123", "info", "event3")
logs_123 = buffer.get_logs("wo-123")
logs_456 = buffer.get_logs("wo-456")
assert len(logs_123) == 2
assert len(logs_456) == 1
assert all(log["work_order_id"] == "wo-123" for log in logs_123)
assert all(log["work_order_id"] == "wo-456" for log in logs_456)
@pytest.mark.unit
def test_clear_work_order():
"""Test clearing logs for a specific work order"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1")
buffer.add_log("wo-456", "info", "event2")
assert buffer.get_log_count("wo-123") == 1
assert buffer.get_log_count("wo-456") == 1
buffer.clear_work_order("wo-123")
assert buffer.get_log_count("wo-123") == 0
assert buffer.get_log_count("wo-456") == 1 # Other work order unaffected
@pytest.mark.unit
def test_thread_safety():
"""Test concurrent adds from multiple threads"""
buffer = WorkOrderLogBuffer()
num_threads = 10
logs_per_thread = 100
def add_logs(thread_id):
for i in range(logs_per_thread):
buffer.add_log("wo-123", "info", f"thread_{thread_id}_event_{i}")
threads = [threading.Thread(target=add_logs, args=(i,)) for i in range(num_threads)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# Should have all logs (or max capacity if exceeded)
logs = buffer.get_logs("wo-123")
expected = min(num_threads * logs_per_thread, buffer.MAX_LOGS_PER_WORK_ORDER)
assert len(logs) == expected
@pytest.mark.unit
def test_cleanup_old_work_orders():
"""Test automatic cleanup of old work orders"""
buffer = WorkOrderLogBuffer()
# Add logs for work orders
buffer.add_log("wo-old", "info", "event1")
buffer.add_log("wo-new", "info", "event2")
# Manually set old work order's last activity to past threshold
threshold_time = time.time() - (buffer.CLEANUP_THRESHOLD_HOURS * 3600 + 100)
buffer._last_activity["wo-old"] = threshold_time
# Run cleanup
removed = buffer.cleanup_old_work_orders()
assert removed == 1
assert buffer.get_log_count("wo-old") == 0
assert buffer.get_log_count("wo-new") == 1
@pytest.mark.unit
def test_get_logs_with_pagination():
"""Test pagination with limit and offset"""
buffer = WorkOrderLogBuffer()
for i in range(50):
buffer.add_log("wo-123", "info", f"event_{i}", index=i)
# Get first page
page1 = buffer.get_logs("wo-123", limit=10, offset=0)
assert len(page1) == 10
assert page1[0]["index"] == 0
# Get second page
page2 = buffer.get_logs("wo-123", limit=10, offset=10)
assert len(page2) == 10
assert page2[0]["index"] == 10
# Get partial last page
page_last = buffer.get_logs("wo-123", limit=10, offset=45)
assert len(page_last) == 5
@pytest.mark.unit
def test_get_logs_since_convenience_method():
"""Test get_logs_since convenience method"""
buffer = WorkOrderLogBuffer()
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1, step="planning")
buffer.add_log("wo-123", "info", "event2", timestamp=ts2, step="execute")
logs = buffer.get_logs_since("wo-123", ts1, step="execute")
assert len(logs) == 1
assert logs[0]["event"] == "event2"
@pytest.mark.unit
def test_get_work_order_count():
"""Test getting count of tracked work orders"""
buffer = WorkOrderLogBuffer()
assert buffer.get_work_order_count() == 0
buffer.add_log("wo-123", "info", "event1")
assert buffer.get_work_order_count() == 1
buffer.add_log("wo-456", "info", "event2")
assert buffer.get_work_order_count() == 2
buffer.clear_work_order("wo-123")
assert buffer.get_work_order_count() == 1
@pytest.mark.unit
def test_empty_buffer_returns_empty_list():
"""Test that getting logs from empty buffer returns empty list"""
buffer = WorkOrderLogBuffer()
logs = buffer.get_logs("wo-nonexistent")
assert logs == []
assert buffer.get_log_count("wo-nonexistent") == 0
@pytest.mark.unit
def test_timestamp_auto_generation():
"""Test that timestamps are auto-generated if not provided"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1")
logs = buffer.get_logs("wo-123")
assert len(logs) == 1
assert "timestamp" in logs[0]
# Verify it's a valid ISO format timestamp
datetime.fromisoformat(logs[0]["timestamp"].replace("Z", "+00:00"))
@pytest.mark.unit
@pytest.mark.asyncio
async def test_cleanup_task_lifecycle():
"""Test starting and stopping cleanup task"""
buffer = WorkOrderLogBuffer()
# Start cleanup task
await buffer.start_cleanup_task(interval_seconds=1)
assert buffer._cleanup_task is not None
# Starting again should be idempotent
await buffer.start_cleanup_task()
assert buffer._cleanup_task is not None
# Stop cleanup task
await buffer.stop_cleanup_task()
assert buffer._cleanup_task is None
@pytest.mark.unit
def test_combined_filters():
"""Test using multiple filters together"""
buffer = WorkOrderLogBuffer()
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1, step="planning")
buffer.add_log("wo-123", "error", "event2", timestamp=ts2, step="planning")
buffer.add_log("wo-123", "info", "event3", timestamp=ts2, step="execute")
# Filter by level AND step AND timestamp
logs = buffer.get_logs("wo-123", level="info", step="execute", since=ts1)
assert len(logs) == 1
assert logs[0]["event"] == "event3"

View File

@@ -1,14 +1,13 @@
"""Tests for Agent Work Orders Models"""
import pytest
from datetime import datetime
from src.agent_work_orders.models import (
AgentWorkflowPhase,
AgentWorkflowType,
AgentWorkOrder,
AgentWorkOrderState,
AgentWorkOrderStatus,
AgentWorkflowPhase,
AgentWorkflowType,
CommandExecutionResult,
CreateAgentWorkOrderRequest,
SandboxType,

View File

@@ -1,16 +1,17 @@
"""Tests for Port Allocation with 10-Port Ranges"""
import pytest
from unittest.mock import patch
import pytest
from src.agent_work_orders.utils.port_allocation import (
MAX_CONCURRENT_WORK_ORDERS,
PORT_BASE,
PORT_RANGE_SIZE,
create_ports_env_file,
find_available_port_range,
get_port_range_for_work_order,
is_port_available,
find_available_port_range,
create_ports_env_file,
PORT_RANGE_SIZE,
PORT_BASE,
MAX_CONCURRENT_WORK_ORDERS,
)

View File

@@ -1,9 +1,10 @@
"""Tests for Sandbox Manager"""
import pytest
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from tempfile import TemporaryDirectory
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.agent_work_orders.models import SandboxSetupError, SandboxType
from src.agent_work_orders.sandbox_manager.git_branch_sandbox import GitBranchSandbox

View File

@@ -3,8 +3,9 @@
Tests the server entry point, health checks, and service discovery configuration.
"""
from unittest.mock import AsyncMock, Mock, patch
import pytest
from unittest.mock import Mock, patch, AsyncMock
from fastapi.testclient import TestClient
@@ -179,7 +180,6 @@ def test_router_included_with_prefix():
@patch.dict("os.environ", {"SERVICE_DISCOVERY_MODE": "local"})
def test_startup_logs_local_mode(caplog):
"""Test startup logs service discovery mode"""
from src.agent_work_orders.server import app
from src.agent_work_orders.config import config
# Verify config is set to local mode
@@ -191,6 +191,7 @@ def test_startup_logs_local_mode(caplog):
def test_startup_logs_docker_mode(caplog):
"""Test startup logs docker_compose mode"""
import importlib
import src.agent_work_orders.config as config_module
importlib.reload(config_module)
from src.agent_work_orders.config import AgentWorkOrdersConfig

View File

@@ -0,0 +1,334 @@
"""Unit tests for SSE Streaming Module
Tests SSE event formatting, streaming logic, filtering, and disconnect handling.
"""
import asyncio
import json
from datetime import UTC
import pytest
from src.agent_work_orders.api.sse_streams import (
format_log_event,
get_current_timestamp,
stream_work_order_logs,
)
from src.agent_work_orders.utils.log_buffer import WorkOrderLogBuffer
@pytest.mark.unit
def test_format_log_event():
"""Test formatting log dictionary as SSE event"""
log_dict = {
"timestamp": "2025-10-23T12:00:00Z",
"level": "info",
"event": "step_started",
"work_order_id": "wo-123",
"step": "planning",
}
event = format_log_event(log_dict)
assert "data" in event
# Data should be JSON string
parsed = json.loads(event["data"])
assert parsed["timestamp"] == "2025-10-23T12:00:00Z"
assert parsed["level"] == "info"
assert parsed["event"] == "step_started"
assert parsed["work_order_id"] == "wo-123"
assert parsed["step"] == "planning"
@pytest.mark.unit
def test_get_current_timestamp():
"""Test timestamp generation in ISO format"""
timestamp = get_current_timestamp()
# Should be valid ISO format
assert isinstance(timestamp, str)
assert "T" in timestamp
# Should be recent (within last second)
from datetime import datetime
parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
now = datetime.now(UTC)
assert (now - parsed).total_seconds() < 1
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_empty_buffer():
"""Test streaming when buffer is empty"""
buffer = WorkOrderLogBuffer()
events = []
async for event in stream_work_order_logs("wo-123", buffer):
events.append(event)
# Break after heartbeat to avoid infinite loop
if "comment" in event:
break
# Should receive at least one heartbeat
assert len(events) >= 1
assert events[-1] == {"comment": "keepalive"}
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_with_existing_logs():
"""Test streaming existing buffered logs first"""
buffer = WorkOrderLogBuffer()
# Add existing logs
buffer.add_log("wo-123", "info", "event1", step="planning")
buffer.add_log("wo-123", "info", "event2", step="execute")
events = []
async for event in stream_work_order_logs("wo-123", buffer):
events.append(event)
# Stop after receiving both events
if len(events) >= 2:
break
assert len(events) == 2
# Both should be data events
assert "data" in events[0]
assert "data" in events[1]
# Parse and verify content
log1 = json.loads(events[0]["data"])
log2 = json.loads(events[1]["data"])
assert log1["event"] == "event1"
assert log2["event"] == "event2"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_with_level_filter():
"""Test streaming with log level filter"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "info_event")
buffer.add_log("wo-123", "error", "error_event")
buffer.add_log("wo-123", "info", "another_info_event")
events = []
async for event in stream_work_order_logs("wo-123", buffer, level_filter="error"):
events.append(event)
if "data" in event:
break
# Should only get error event
assert len(events) == 1
log = json.loads(events[0]["data"])
assert log["level"] == "error"
assert log["event"] == "error_event"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_with_step_filter():
"""Test streaming with step filter"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1", step="planning")
buffer.add_log("wo-123", "info", "event2", step="execute")
buffer.add_log("wo-123", "info", "event3", step="planning")
events = []
async for event in stream_work_order_logs("wo-123", buffer, step_filter="planning"):
events.append(event)
if len(events) >= 2:
break
assert len(events) == 2
log1 = json.loads(events[0]["data"])
log2 = json.loads(events[1]["data"])
assert log1["step"] == "planning"
assert log2["step"] == "planning"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_with_since_timestamp():
"""Test streaming logs after specific timestamp"""
buffer = WorkOrderLogBuffer()
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
ts3 = "2025-10-23T12:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1)
buffer.add_log("wo-123", "info", "event2", timestamp=ts2)
buffer.add_log("wo-123", "info", "event3", timestamp=ts3)
events = []
async for event in stream_work_order_logs("wo-123", buffer, since_timestamp=ts2):
events.append(event)
if "data" in event:
break
# Should only get event3 (after ts2)
assert len(events) == 1
log = json.loads(events[0]["data"])
assert log["event"] == "event3"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_heartbeat():
"""Test that heartbeat comments are sent periodically"""
buffer = WorkOrderLogBuffer()
heartbeat_count = 0
event_count = 0
async for event in stream_work_order_logs("wo-123", buffer):
if "comment" in event:
heartbeat_count += 1
if heartbeat_count >= 2:
break
if "data" in event:
event_count += 1
# Should have received at least 2 heartbeats
assert heartbeat_count >= 2
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_disconnect():
"""Test handling of client disconnect (CancelledError)"""
buffer = WorkOrderLogBuffer()
async def stream_with_cancel():
events = []
try:
async for event in stream_work_order_logs("wo-123", buffer):
events.append(event)
# Simulate disconnect after first event
if len(events) >= 1:
raise asyncio.CancelledError()
except asyncio.CancelledError:
# Should be caught and handled gracefully
pass
return events
events = await stream_with_cancel()
# Should have at least one event before cancel
assert len(events) >= 1
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_yields_new_logs():
"""Test that stream yields new logs as they arrive"""
buffer = WorkOrderLogBuffer()
# Add initial log
buffer.add_log("wo-123", "info", "initial_event")
events = []
async def consume_stream():
async for event in stream_work_order_logs("wo-123", buffer):
events.append(event)
if len(events) >= 2 and "data" in events[1]:
break
async def add_new_log():
# Wait a bit then add new log
await asyncio.sleep(0.6)
buffer.add_log("wo-123", "info", "new_event")
# Run both concurrently
await asyncio.gather(consume_stream(), add_new_log())
# Should have received both events
data_events = [e for e in events if "data" in e]
assert len(data_events) >= 2
log1 = json.loads(data_events[0]["data"])
log2 = json.loads(data_events[1]["data"])
assert log1["event"] == "initial_event"
assert log2["event"] == "new_event"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_combined_filters():
"""Test streaming with multiple filters combined"""
buffer = WorkOrderLogBuffer()
ts1 = "2025-10-23T10:00:00Z"
ts2 = "2025-10-23T11:00:00Z"
buffer.add_log("wo-123", "info", "event1", timestamp=ts1, step="planning")
buffer.add_log("wo-123", "error", "event2", timestamp=ts2, step="planning")
buffer.add_log("wo-123", "info", "event3", timestamp=ts2, step="execute")
events = []
async for event in stream_work_order_logs(
"wo-123",
buffer,
level_filter="info",
step_filter="execute",
since_timestamp=ts1,
):
events.append(event)
if "data" in event:
break
# Should only get event3
assert len(events) == 1
log = json.loads(events[0]["data"])
assert log["event"] == "event3"
assert log["level"] == "info"
assert log["step"] == "execute"
@pytest.mark.unit
def test_format_log_event_with_extra_fields():
"""Test that format_log_event preserves all fields"""
log_dict = {
"timestamp": "2025-10-23T12:00:00Z",
"level": "info",
"event": "step_completed",
"work_order_id": "wo-123",
"step": "planning",
"duration_seconds": 45.2,
"custom_field": "custom_value",
}
event = format_log_event(log_dict)
parsed = json.loads(event["data"])
# All fields should be preserved
assert parsed["duration_seconds"] == 45.2
assert parsed["custom_field"] == "custom_value"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_stream_no_duplicate_events():
"""Test that streaming doesn't yield duplicate events"""
buffer = WorkOrderLogBuffer()
buffer.add_log("wo-123", "info", "event1", timestamp="2025-10-23T10:00:00Z")
buffer.add_log("wo-123", "info", "event2", timestamp="2025-10-23T11:00:00Z")
events = []
async for event in stream_work_order_logs("wo-123", buffer):
if "data" in event:
events.append(event)
if len(events) >= 2:
# Stop after receiving initial logs
break
# Should have exactly 2 events, no duplicates
assert len(events) == 2
log1 = json.loads(events[0]["data"])
log2 = json.loads(events[1]["data"])
assert log1["event"] == "event1"
assert log2["event"] == "event2"

View File

@@ -1,12 +1,13 @@
"""Tests for State Manager"""
import pytest
from datetime import datetime
import pytest
from src.agent_work_orders.models import (
AgentWorkflowType,
AgentWorkOrderState,
AgentWorkOrderStatus,
AgentWorkflowType,
SandboxType,
StepExecutionResult,
StepHistory,

View File

@@ -1,7 +1,8 @@
"""Tests for Workflow Operations - Refactored Command Stitching Architecture"""
from unittest.mock import AsyncMock, MagicMock
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from src.agent_work_orders.models import (
CommandExecutionResult,

View File

@@ -1,14 +1,13 @@
"""Tests for Workflow Orchestrator - Command Stitching Architecture"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.agent_work_orders.models import (
AgentWorkOrderStatus,
SandboxType,
StepExecutionResult,
StepHistory,
WorkflowExecutionError,
WorkflowStep,
)
from src.agent_work_orders.workflow_engine.workflow_orchestrator import WorkflowOrchestrator

python/uv.lock (generated)
View File

@@ -164,6 +164,7 @@ name = "archon"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "sse-starlette" },
{ name = "structlog" },
]
@@ -269,7 +270,10 @@ server-reranking = [
]
[package.metadata]
requires-dist = [{ name = "structlog", specifier = ">=25.4.0" }]
requires-dist = [
{ name = "sse-starlette", specifier = ">=2.3.3" },
{ name = "structlog", specifier = ">=25.4.0" },
]
[package.metadata.requires-dev]
agent-work-orders = [