feat: add supabase persistence for agent work orders

Rasmus Widing
2025-10-24 20:37:57 +03:00
parent 71393520dc
commit bd6613014b
10 changed files with 1335 additions and 5 deletions

View File

@@ -165,6 +165,7 @@ services:
environment:
- ENABLE_AGENT_WORK_ORDERS=true
- SERVICE_DISCOVERY_MODE=docker_compose
- STATE_STORAGE_TYPE=supabase
- ARCHON_SERVER_URL=http://archon-server:${ARCHON_SERVER_PORT:-8181}
- ARCHON_MCP_URL=http://archon-mcp:${ARCHON_MCP_PORT:-8051}
- SUPABASE_URL=${SUPABASE_URL}

View File

@@ -0,0 +1,135 @@
# Agent Work Orders Database Migrations
This document describes the database migrations for the Agent Work Orders feature.
## Overview
Agent Work Orders is an optional microservice that executes agent-based workflows using Claude Code CLI. These migrations set up the required database tables for the feature.
## Prerequisites
- A Supabase project using the same credentials as the main Archon server
- `SUPABASE_URL` and `SUPABASE_SERVICE_KEY` environment variables configured
## Migrations
### 1. `agent_work_orders_repositories.sql`
**Purpose**: Configure GitHub repositories for agent work orders
**Creates**:
- `archon_configured_repositories` table for storing repository configurations
- Indexes for fast repository lookups
- RLS policies for access control
- Validation constraints for repository URLs
**When to run**: Before using the repository configuration feature
**Usage**:
```bash
# Open Supabase dashboard → SQL Editor
# Copy and paste the entire migration file
# Execute
```
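Once the migration is applied, the table can be read through the shared Supabase client added in this commit. A minimal sketch; the absolute import path is assumed from the repository layout, and `select("*")` is used to avoid assuming column names:
```python
from src.agent_work_orders.database.client import get_agent_work_orders_client

# Requires SUPABASE_URL and SUPABASE_SERVICE_KEY in the environment
client = get_agent_work_orders_client()

# List configured repositories; select * since the exact columns are not shown here
response = client.table("archon_configured_repositories").select("*").execute()
for row in response.data:
    print(row)
```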
### 2. `agent_work_orders_state.sql`
**Purpose**: Persistent state management for agent work orders
**Creates**:
- `archon_agent_work_orders` - Main work order state and metadata table
- `archon_agent_work_order_steps` - Step execution history with foreign key constraints
- Indexes for fast queries (status, repository_url, created_at)
- Database triggers for automatic timestamp management
- RLS policies for service and authenticated access
**Features**:
- ACID guarantees for concurrent work order execution
- Foreign key CASCADE delete (steps deleted when work order deleted)
- Hybrid schema (frequently queried columns + JSONB for flexible metadata; see the sketch below)
- Automatic `updated_at` timestamp management
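The hybrid schema is easiest to see in how a row is built for insertion. A condensed sketch of `SupabaseWorkOrderRepository.create()` from this commit, with illustrative values mostly borrowed from its docstrings:
```python
from src.agent_work_orders.database.client import get_agent_work_orders_client

client = get_agent_work_orders_client()

# Frequently queried fields are real columns; everything else lives in JSONB
row = {
    "agent_work_order_id": "wo-123",                    # TEXT primary key
    "repository_url": "https://github.com/test/repo",  # column, indexed
    "sandbox_identifier": "sandbox-123",                # column, NOT NULL
    "status": "pending",                                # column, CHECK-constrained
    "metadata": {                                       # JSONB, GIN-indexed
        "sandbox_type": "git_worktree",
        "github_issue_number": 42,                      # illustrative value
    },
}
client.table("archon_agent_work_orders").insert(row).execute()
```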
**When to run**: To enable Supabase-backed persistent storage for agent work orders
**Usage**:
```bash
# Open Supabase dashboard → SQL Editor
# Copy and paste the entire migration file
# Execute
```
**Verification**:
```sql
-- Check tables exist
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name LIKE 'archon_agent_work_order%';
-- Verify indexes
SELECT tablename, indexname FROM pg_indexes
WHERE tablename LIKE 'archon_agent_work_order%'
ORDER BY tablename, indexname;
```
## Configuration
After applying migrations, configure the agent work orders service:
```bash
# Set environment variable
export STATE_STORAGE_TYPE=supabase
# Restart the service
docker compose restart archon-agent-work-orders
# OR
make agent-work-orders
```
## Health Check
Verify the configuration:
```bash
curl http://localhost:8053/health | jq '{storage_type, database}'
```
Expected response:
```json
{
"storage_type": "supabase",
"database": {
"status": "healthy",
"tables_exist": true
}
}
```
## Storage Options
Agent Work Orders supports three storage backends:
1. **Memory** (`STATE_STORAGE_TYPE=memory`) - Default, no persistence
2. **File** (`STATE_STORAGE_TYPE=file`) - Legacy file-based storage
3. **Supabase** (`STATE_STORAGE_TYPE=supabase`) - **Recommended for production**
## Rollback
To remove the agent work orders state tables:
```sql
-- Drop tables (CASCADE will also drop indexes, triggers, and policies)
DROP TABLE IF EXISTS archon_agent_work_order_steps CASCADE;
DROP TABLE IF EXISTS archon_agent_work_orders CASCADE;
```
**Note**: The `update_updated_at_column()` function is shared with other Archon tables and should NOT be dropped.
## Documentation
For detailed setup instructions, see:
- `python/src/agent_work_orders/README.md` - Service configuration guide and migration instructions
## Migration History
- **agent_work_orders_repositories.sql** - Initial repository configuration support
- **agent_work_orders_state.sql** - Supabase persistence migration (replaces file-based storage)

View File

@@ -0,0 +1,356 @@
-- =====================================================
-- Agent Work Orders - State Management
-- =====================================================
-- This migration creates tables for agent work order state persistence
-- in PostgreSQL, replacing file-based JSON storage with ACID-compliant
-- database backend.
--
-- Features:
-- - Atomic state updates with ACID guarantees
-- - Row-level locking for concurrent access control
-- - Foreign key constraints for referential integrity
-- - Indexes for fast queries by status, repository, and timestamp
-- - JSONB metadata for flexible storage
-- - Automatic timestamp management via triggers
-- - Step execution history with ordering
--
-- Run this in your Supabase SQL Editor
-- =====================================================
-- =====================================================
-- SECTION 1: CREATE TABLES
-- =====================================================
-- Create archon_agent_work_orders table
CREATE TABLE IF NOT EXISTS archon_agent_work_orders (
-- Primary identification (TEXT not UUID since generated by id_generator.py)
agent_work_order_id TEXT PRIMARY KEY,
-- Core state fields (frequently queried as separate columns)
repository_url TEXT NOT NULL,
sandbox_identifier TEXT NOT NULL,
git_branch_name TEXT,
agent_session_id TEXT,
status TEXT NOT NULL CHECK (status IN ('pending', 'running', 'completed', 'failed')),
-- Flexible metadata (JSONB for infrequently queried fields)
-- Stores: sandbox_type, github_issue_number, current_phase, error_message, etc.
metadata JSONB DEFAULT '{}'::jsonb,
-- Timestamps (automatically managed)
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create archon_agent_work_order_steps table
-- Stores step execution history with foreign key to work orders
CREATE TABLE IF NOT EXISTS archon_agent_work_order_steps (
-- Primary identification
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- Foreign key to work order (CASCADE delete when work order deleted)
agent_work_order_id TEXT NOT NULL REFERENCES archon_agent_work_orders(agent_work_order_id) ON DELETE CASCADE,
-- Step execution details
step TEXT NOT NULL, -- WorkflowStep enum value (e.g., "create-branch", "planning")
agent_name TEXT NOT NULL, -- Name of agent that executed step
success BOOLEAN NOT NULL, -- Whether step succeeded
output TEXT, -- Step output (nullable)
error_message TEXT, -- Error message if failed (nullable)
duration_seconds FLOAT NOT NULL, -- Execution duration
session_id TEXT, -- Agent session ID (nullable)
executed_at TIMESTAMP WITH TIME ZONE NOT NULL, -- When step was executed
step_order INT NOT NULL -- Order within work order (0-indexed for sorting)
);
-- =====================================================
-- SECTION 2: CREATE INDEXES
-- =====================================================
-- Indexes on archon_agent_work_orders for common queries
-- Index on status for filtering by work order status
CREATE INDEX IF NOT EXISTS idx_agent_work_orders_status
ON archon_agent_work_orders(status);
-- Index on created_at for ordering by most recent
CREATE INDEX IF NOT EXISTS idx_agent_work_orders_created_at
ON archon_agent_work_orders(created_at DESC);
-- Index on repository_url for filtering by repository
CREATE INDEX IF NOT EXISTS idx_agent_work_orders_repository
ON archon_agent_work_orders(repository_url);
-- GIN index on metadata JSONB for flexible queries
CREATE INDEX IF NOT EXISTS idx_agent_work_orders_metadata
ON archon_agent_work_orders USING GIN(metadata);
-- Indexes on archon_agent_work_order_steps for step history queries
-- Index on agent_work_order_id for retrieving all steps for a work order
CREATE INDEX IF NOT EXISTS idx_agent_work_order_steps_work_order_id
ON archon_agent_work_order_steps(agent_work_order_id);
-- Index on executed_at for temporal queries
CREATE INDEX IF NOT EXISTS idx_agent_work_order_steps_executed_at
ON archon_agent_work_order_steps(executed_at);
-- =====================================================
-- SECTION 3: CREATE TRIGGER
-- =====================================================
-- Apply auto-update trigger for updated_at timestamp
-- Reuses existing update_updated_at_column() function from Archon migrations
CREATE TRIGGER update_agent_work_orders_updated_at
BEFORE UPDATE ON archon_agent_work_orders
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
-- =====================================================
-- SECTION 4: ROW LEVEL SECURITY
-- =====================================================
-- Enable Row Level Security on both tables
ALTER TABLE archon_agent_work_orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE archon_agent_work_order_steps ENABLE ROW LEVEL SECURITY;
-- Policy 1: Service role has full access (for API operations)
CREATE POLICY "Allow service role full access to archon_agent_work_orders"
ON archon_agent_work_orders
FOR ALL
USING (auth.role() = 'service_role');
CREATE POLICY "Allow service role full access to archon_agent_work_order_steps"
ON archon_agent_work_order_steps
FOR ALL
USING (auth.role() = 'service_role');
-- Policy 2: Authenticated users have full access (for frontend operations)
CREATE POLICY "Allow authenticated users to read and update archon_agent_work_orders"
ON archon_agent_work_orders
FOR ALL
TO authenticated
USING (true);
CREATE POLICY "Allow authenticated users to read and update archon_agent_work_order_steps"
ON archon_agent_work_order_steps
FOR ALL
TO authenticated
USING (true);
-- =====================================================
-- SECTION 5: TABLE COMMENTS
-- =====================================================
-- Comments on archon_agent_work_orders table
COMMENT ON TABLE archon_agent_work_orders IS
'Stores agent work order state and metadata with ACID guarantees for concurrent access';
COMMENT ON COLUMN archon_agent_work_orders.agent_work_order_id IS
'Unique work order identifier (TEXT format generated by id_generator.py)';
COMMENT ON COLUMN archon_agent_work_orders.repository_url IS
'GitHub repository URL for the work order';
COMMENT ON COLUMN archon_agent_work_orders.sandbox_identifier IS
'Unique identifier for sandbox environment (worktree directory name)';
COMMENT ON COLUMN archon_agent_work_orders.git_branch_name IS
'Git branch name created for work order (nullable if not yet created)';
COMMENT ON COLUMN archon_agent_work_orders.agent_session_id IS
'Agent session ID for tracking agent execution (nullable if not yet started)';
COMMENT ON COLUMN archon_agent_work_orders.status IS
'Current status: pending, running, completed, or failed';
COMMENT ON COLUMN archon_agent_work_orders.metadata IS
'JSONB metadata including sandbox_type, github_issue_number, current_phase, error_message, etc.';
COMMENT ON COLUMN archon_agent_work_orders.created_at IS
'Timestamp when work order was created';
COMMENT ON COLUMN archon_agent_work_orders.updated_at IS
'Timestamp when work order was last updated (auto-managed by trigger)';
-- Comments on archon_agent_work_order_steps table
COMMENT ON TABLE archon_agent_work_order_steps IS
'Stores step execution history for agent work orders with foreign key constraints';
COMMENT ON COLUMN archon_agent_work_order_steps.id IS
'Unique UUID identifier for step record';
COMMENT ON COLUMN archon_agent_work_order_steps.agent_work_order_id IS
'Foreign key to work order (CASCADE delete on work order deletion)';
COMMENT ON COLUMN archon_agent_work_order_steps.step IS
'WorkflowStep enum value (e.g., "create-branch", "planning", "execute")';
COMMENT ON COLUMN archon_agent_work_order_steps.agent_name IS
'Name of agent that executed the step';
COMMENT ON COLUMN archon_agent_work_order_steps.success IS
'Boolean indicating if step execution succeeded';
COMMENT ON COLUMN archon_agent_work_order_steps.output IS
'Step execution output (nullable)';
COMMENT ON COLUMN archon_agent_work_order_steps.error_message IS
'Error message if step failed (nullable)';
COMMENT ON COLUMN archon_agent_work_order_steps.duration_seconds IS
'Step execution duration in seconds';
COMMENT ON COLUMN archon_agent_work_order_steps.session_id IS
'Agent session ID for tracking (nullable)';
COMMENT ON COLUMN archon_agent_work_order_steps.executed_at IS
'Timestamp when step was executed';
COMMENT ON COLUMN archon_agent_work_order_steps.step_order IS
'Order of step within work order (0-indexed for sorting)';
-- =====================================================
-- SECTION 6: VERIFICATION
-- =====================================================
-- Verify archon_agent_work_orders table creation
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'archon_agent_work_orders'
) THEN
RAISE NOTICE '✓ Table archon_agent_work_orders created successfully';
ELSE
RAISE EXCEPTION '✗ Table archon_agent_work_orders was not created';
END IF;
END $$;
-- Verify archon_agent_work_order_steps table creation
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'archon_agent_work_order_steps'
) THEN
RAISE NOTICE '✓ Table archon_agent_work_order_steps created successfully';
ELSE
RAISE EXCEPTION '✗ Table archon_agent_work_order_steps was not created';
END IF;
END $$;
-- Verify indexes on archon_agent_work_orders
DO $$
BEGIN
IF (
SELECT COUNT(*) FROM pg_indexes
WHERE tablename = 'archon_agent_work_orders'
) >= 4 THEN
RAISE NOTICE '✓ Indexes on archon_agent_work_orders created successfully';
ELSE
RAISE WARNING '⚠ Expected at least 4 indexes on archon_agent_work_orders, found fewer';
END IF;
END $$;
-- Verify indexes on archon_agent_work_order_steps
DO $$
BEGIN
IF (
SELECT COUNT(*) FROM pg_indexes
WHERE tablename = 'archon_agent_work_order_steps'
) >= 2 THEN
RAISE NOTICE '✓ Indexes on archon_agent_work_order_steps created successfully';
ELSE
RAISE WARNING '⚠ Expected at least 2 indexes on archon_agent_work_order_steps, found fewer';
END IF;
END $$;
-- Verify trigger
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM pg_trigger
WHERE tgrelid = 'archon_agent_work_orders'::regclass
AND tgname = 'update_agent_work_orders_updated_at'
) THEN
RAISE NOTICE '✓ Trigger update_agent_work_orders_updated_at created successfully';
ELSE
RAISE EXCEPTION '✗ Trigger update_agent_work_orders_updated_at was not created';
END IF;
END $$;
-- Verify RLS policies on archon_agent_work_orders
DO $$
BEGIN
IF (
SELECT COUNT(*) FROM pg_policies
WHERE tablename = 'archon_agent_work_orders'
) >= 2 THEN
RAISE NOTICE '✓ RLS policies on archon_agent_work_orders created successfully';
ELSE
RAISE WARNING '⚠ Expected at least 2 RLS policies on archon_agent_work_orders, found fewer';
END IF;
END $$;
-- Verify RLS policies on archon_agent_work_order_steps
DO $$
BEGIN
IF (
SELECT COUNT(*) FROM pg_policies
WHERE tablename = 'archon_agent_work_order_steps'
) >= 2 THEN
RAISE NOTICE '✓ RLS policies on archon_agent_work_order_steps created successfully';
ELSE
RAISE WARNING '⚠ Expected at least 2 RLS policies on archon_agent_work_order_steps, found fewer';
END IF;
END $$;
-- Verify foreign key constraint
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE table_name = 'archon_agent_work_order_steps'
AND constraint_type = 'FOREIGN KEY'
) THEN
RAISE NOTICE '✓ Foreign key constraint on archon_agent_work_order_steps created successfully';
ELSE
RAISE EXCEPTION '✗ Foreign key constraint on archon_agent_work_order_steps was not created';
END IF;
END $$;
-- =====================================================
-- SECTION 7: ROLLBACK INSTRUCTIONS
-- =====================================================
/*
To rollback this migration, run the following commands:
-- Drop tables (CASCADE will also drop indexes, triggers, and policies)
DROP TABLE IF EXISTS archon_agent_work_order_steps CASCADE;
DROP TABLE IF EXISTS archon_agent_work_orders CASCADE;
-- Verify tables are dropped
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name LIKE 'archon_agent_work_order%';
-- Should return 0 rows
-- Note: The update_updated_at_column() function is shared and should NOT be dropped
*/
-- =====================================================
-- MIGRATION COMPLETE
-- =====================================================
-- The archon_agent_work_orders and archon_agent_work_order_steps tables
-- are now ready for use.
--
-- Next steps:
-- 1. Set STATE_STORAGE_TYPE=supabase in environment
-- 2. Restart Agent Work Orders service
-- 3. Verify health endpoint shows database status healthy
-- 4. Test work order creation via API
-- =====================================================

View File

@@ -97,8 +97,94 @@ docker compose up -d
| `GH_CLI_PATH` | `gh` | Path to GitHub CLI executable |
| `GH_TOKEN` | - | GitHub Personal Access Token for gh CLI authentication (required for PR creation) |
| `LOG_LEVEL` | `INFO` | Logging level |
| `STATE_STORAGE_TYPE` | `memory` | State storage (`memory` or `file`) - Use `file` for persistence |
| `STATE_STORAGE_TYPE` | `memory` | State storage (`memory`, `file`, or `supabase`) - Use `supabase` for production |
| `FILE_STATE_DIRECTORY` | `agent-work-orders-state` | Directory for file-based state (when `STATE_STORAGE_TYPE=file`) |
| `SUPABASE_URL` | - | Supabase project URL (required when `STATE_STORAGE_TYPE=supabase`) |
| `SUPABASE_SERVICE_KEY` | - | Supabase service key (required when `STATE_STORAGE_TYPE=supabase`) |
### State Storage Options
The service supports three state storage backends (see the selection sketch after the list):
**Memory Storage** (`STATE_STORAGE_TYPE=memory`):
- **Default**: Easiest for development/testing
- **Pros**: No setup required, fast
- **Cons**: State lost on service restart, no persistence
- **Use for**: Local development, unit tests
**File Storage** (`STATE_STORAGE_TYPE=file`):
- **Legacy**: File-based JSON persistence
- **Pros**: Simple, no external dependencies
- **Cons**: No ACID guarantees, race conditions possible, file corruption risk
- **Use for**: Single-instance deployments, backward compatibility
**Supabase Storage** (`STATE_STORAGE_TYPE=supabase`):
- **Recommended for production**: PostgreSQL-backed persistence via Supabase
- **Pros**: ACID guarantees, concurrent access support, foreign key constraints, indexes
- **Cons**: Requires Supabase configuration and credentials
- **Use for**: Production deployments, multi-instance setups
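The backend is chosen once at startup by the repository factory (see the factory diff later in this commit). Condensed, with constructor arguments assumed for the non-Supabase branches:
```python
# Condensed from the state_manager factory change in this commit
storage_type = config.STATE_STORAGE_TYPE.lower()
if storage_type == "supabase":
    repository = SupabaseWorkOrderRepository()  # PostgreSQL via Supabase
elif storage_type == "file":
    # Constructor argument assumed; the factory reads config.FILE_STATE_DIRECTORY
    repository = FileStateRepository(config.FILE_STATE_DIRECTORY)
else:
    repository = WorkOrderRepository()          # in-memory default
```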
### Supabase Configuration
Agent Work Orders can use Supabase for production-ready persistent state management.
#### Setup Steps
1. **Reuse existing Archon Supabase credentials** - No new database or credentials needed. The agent work orders service shares the same Supabase project as the main Archon server.
2. **Apply database migration**:
- Navigate to your Supabase project dashboard at https://app.supabase.com
- Open SQL Editor
- Copy and paste the migration from `migration/agent_work_orders_state.sql` (in the project root)
- Execute the migration
- See `migration/AGENT_WORK_ORDERS.md` for detailed instructions
3. **Set environment variable**:
```bash
export STATE_STORAGE_TYPE=supabase
```
4. **Verify configuration**:
```bash
# Start the service
make agent-work-orders
# Check health endpoint
curl http://localhost:8053/health | jq
```
Expected response:
```json
{
"status": "healthy",
"storage_type": "supabase",
"database": {
"status": "healthy",
"tables_exist": true
}
}
```
#### Database Tables
The Supabase backend uses two tables, created by the migration (query sketch below):
- **`archon_agent_work_orders`**: Main work order state and metadata
- **`archon_agent_work_order_steps`**: Step execution history with foreign key constraints
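Both tables can be inspected directly with the Supabase client; a sketch that mirrors the queries used by `SupabaseWorkOrderRepository.list()` and `get_step_history()` in this commit:
```python
from src.agent_work_orders.database.client import get_agent_work_orders_client

client = get_agent_work_orders_client()

# Most recent work orders (mirrors SupabaseWorkOrderRepository.list())
orders = (
    client.table("archon_agent_work_orders")
    .select("*")
    .order("created_at", desc=True)
    .limit(5)
    .execute()
)

# Step history for the newest order, in execution order (mirrors get_step_history())
if orders.data:
    steps = (
        client.table("archon_agent_work_order_steps")
        .select("*")
        .eq("agent_work_order_id", orders.data[0]["agent_work_order_id"])
        .order("step_order")
        .execute()
    )
    print(len(steps.data), "steps")
```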
#### Troubleshooting
**Error: "tables_exist": false**
- Migration not applied - see `database/migrations/README.md`
- Check Supabase dashboard SQL Editor for error messages
**Error: "SUPABASE_URL and SUPABASE_SERVICE_KEY must be set"**
- Environment variables not configured
- Ensure same credentials as main Archon server are set
**Service starts but work orders are not persisted**
- Check `STATE_STORAGE_TYPE` is set to `supabase` (case-insensitive)
- Verify health endpoint shows `"storage_type": "supabase"`
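The health check used by the `/health` endpoint can also be run directly when debugging (sketch; the absolute import path is assumed from the repository layout):
```python
import asyncio
from src.agent_work_orders.database.client import check_database_health

# Healthy:   {'status': 'healthy', 'tables_exist': True}
# Unhealthy: {'status': 'unhealthy', 'tables_exist': False, 'error': '...'}
print(asyncio.run(check_database_health()))
```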
### Service Discovery Modes

View File

@@ -0,0 +1,8 @@
"""Database client module for Agent Work Orders.
Provides Supabase client initialization and health checks for work order persistence.
"""
from .client import check_database_health, get_agent_work_orders_client
__all__ = ["get_agent_work_orders_client", "check_database_health"]

View File

@@ -0,0 +1,74 @@
"""Supabase client for Agent Work Orders.
Provides database connection management and health checks for work order state persistence.
Reuses same Supabase credentials as main Archon server (SUPABASE_URL, SUPABASE_SERVICE_KEY).
"""
import os
from typing import Any
from supabase import Client, create_client
from ..utils.structured_logger import get_logger
logger = get_logger(__name__)
def get_agent_work_orders_client() -> Client:
"""Get Supabase client for agent work orders.
Reuses same credentials as main Archon server (SUPABASE_URL, SUPABASE_SERVICE_KEY).
The service key provides full access and bypasses Row Level Security policies.
Returns:
Supabase client instance configured for work order operations
Raises:
ValueError: If SUPABASE_URL or SUPABASE_SERVICE_KEY environment variables are not set
Example:
>>> client = get_agent_work_orders_client()
>>> response = client.table("archon_agent_work_orders").select("*").execute()
"""
url = os.getenv("SUPABASE_URL")
key = os.getenv("SUPABASE_SERVICE_KEY")
if not url or not key:
raise ValueError(
"SUPABASE_URL and SUPABASE_SERVICE_KEY must be set in environment variables. "
"These should match the credentials used by the main Archon server."
)
return create_client(url, key)
async def check_database_health() -> dict[str, Any]:
"""Check if agent work orders tables exist and are accessible.
Verifies that both archon_agent_work_orders and archon_agent_work_order_steps
tables exist and can be queried. This is a lightweight check using limit(0)
to avoid fetching actual data.
Returns:
Dictionary with health check results:
- status: "healthy" or "unhealthy"
- tables_exist: True if both tables are accessible, False otherwise
- error: Error message if check failed (only present when unhealthy)
Example:
>>> health = await check_database_health()
>>> if health["status"] == "healthy":
... print("Database is ready")
"""
try:
client = get_agent_work_orders_client()
# Try to query both tables (limit 0 to avoid fetching data)
client.table("archon_agent_work_orders").select("agent_work_order_id").limit(0).execute()
client.table("archon_agent_work_order_steps").select("id").limit(0).execute()
logger.info("database_health_check_passed", tables=["archon_agent_work_orders", "archon_agent_work_order_steps"])
return {"status": "healthy", "tables_exist": True}
except Exception as e:
logger.error("database_health_check_failed", error=str(e), exc_info=True)
return {"status": "unhealthy", "tables_exist": False, "error": str(e)}

View File

@@ -16,6 +16,7 @@ from fastapi.middleware.cors import CORSMiddleware
from .api.routes import log_buffer, router
from .config import config
from .database.client import check_database_health
from .utils.structured_logger import (
configure_structured_logging_with_buffer,
get_logger,
@@ -196,6 +197,14 @@ async def health_check() -> dict[str, Any]:
"error": str(e),
}
# Check database health if using Supabase storage
if config.STATE_STORAGE_TYPE.lower() == "supabase":
db_health = await check_database_health()
health_status["storage_type"] = "supabase"
health_status["database"] = db_health
else:
health_status["storage_type"] = config.STATE_STORAGE_TYPE
# Check MCP server connectivity (if configured)
archon_mcp_url = os.getenv("ARCHON_MCP_URL")
if archon_mcp_url:

View File

@@ -1,26 +1,33 @@
"""Repository Factory
Creates appropriate repository instances based on configuration.
Supports both in-memory (for development/testing) and file-based (for production) storage.
Supports in-memory (dev/testing), file-based (legacy), and Supabase (production) storage.
"""
from ..config import config
from ..utils.structured_logger import get_logger
from .file_state_repository import FileStateRepository
from .supabase_repository import SupabaseWorkOrderRepository
from .work_order_repository import WorkOrderRepository
logger = get_logger(__name__)
def create_repository() -> WorkOrderRepository | FileStateRepository:
def create_repository() -> WorkOrderRepository | FileStateRepository | SupabaseWorkOrderRepository:
"""Create a work order repository based on configuration
Returns:
Repository instance (either in-memory or file-based)
Repository instance (in-memory, file-based, or Supabase)
Raises:
ValueError: If Supabase is configured but credentials are missing
"""
storage_type = config.STATE_STORAGE_TYPE.lower()
if storage_type == "file":
if storage_type == "supabase":
logger.info("repository_created", storage_type="supabase")
return SupabaseWorkOrderRepository()
elif storage_type == "file":
state_dir = config.FILE_STATE_DIRECTORY
logger.info(
"repository_created",

View File

@@ -0,0 +1,484 @@
"""Supabase-backed repository for agent work order state management.
Provides ACID-compliant persistent storage for work order state using PostgreSQL
via Supabase. Implements the same interface as in-memory and file-based repositories
for seamless switching between storage backends.
Architecture Note - async/await Pattern:
All repository methods are declared as `async def` for interface consistency
with other repository implementations, even though Supabase operations are sync.
This maintains a consistent async API contract across all repositories.
"""
from datetime import datetime
from typing import Any
from supabase import Client
from ..database.client import get_agent_work_orders_client
from ..models import (
AgentWorkOrderState,
AgentWorkOrderStatus,
StepExecutionResult,
StepHistory,
WorkflowStep,
)
from ..utils.structured_logger import get_logger
logger = get_logger(__name__)
class SupabaseWorkOrderRepository:
"""Supabase-backed repository for agent work orders.
Provides persistent storage with ACID guarantees, row-level locking,
and foreign key constraints for referential integrity.
Architecture:
- Work orders stored in archon_agent_work_orders table
- Step history stored in archon_agent_work_order_steps table with CASCADE delete
- Hybrid schema: Frequently queried fields as columns, flexible metadata as JSONB
- Auto-managed timestamps via database triggers
Thread Safety:
Uses Supabase client which is thread-safe for concurrent operations.
Database-level row locking prevents race conditions.
"""
def __init__(self) -> None:
"""Initialize Supabase repository with database client.
Raises:
ValueError: If Supabase credentials are not configured
"""
self.client: Client = get_agent_work_orders_client()
self.table_name: str = "archon_agent_work_orders"
self.steps_table_name: str = "archon_agent_work_order_steps"
self._logger = logger.bind(table=self.table_name)
self._logger.info("supabase_repository_initialized")
def _row_to_state_and_metadata(self, row: dict[str, Any]) -> tuple[AgentWorkOrderState, dict]:
"""Convert database row to (AgentWorkOrderState, metadata) tuple.
Args:
row: Raw database row with columns and JSONB metadata
Returns:
Tuple of (state, metadata) where state contains core fields
and metadata contains status, timestamps, and JSONB fields
Note:
Handles enum conversion from database string to AgentWorkOrderStatus
"""
# Extract core state fields
state = AgentWorkOrderState(
agent_work_order_id=row["agent_work_order_id"],
repository_url=row["repository_url"],
sandbox_identifier=row["sandbox_identifier"],
git_branch_name=row.get("git_branch_name"),
agent_session_id=row.get("agent_session_id"),
)
# Extract metadata
metadata = row.get("metadata", {}).copy()
metadata["status"] = AgentWorkOrderStatus(row["status"])
metadata["created_at"] = row["created_at"]
metadata["updated_at"] = row["updated_at"]
return (state, metadata)
async def create(self, work_order: AgentWorkOrderState, metadata: dict) -> None:
"""Create new work order in database.
Args:
work_order: Core work order state (5 fields)
metadata: Additional metadata including status, sandbox_type, etc.
Raises:
Exception: If database insert fails (e.g., duplicate ID, constraint violation)
Example:
>>> state = AgentWorkOrderState(
... agent_work_order_id="wo-123",
... repository_url="https://github.com/test/repo",
... sandbox_identifier="sandbox-123"
... )
>>> metadata = {"status": AgentWorkOrderStatus.PENDING, "sandbox_type": "git_worktree"}
>>> await repository.create(state, metadata)
"""
try:
# Prepare data for insertion
# Separate core state columns from JSONB metadata
data = {
"agent_work_order_id": work_order.agent_work_order_id,
"repository_url": work_order.repository_url,
"sandbox_identifier": work_order.sandbox_identifier,
"git_branch_name": work_order.git_branch_name,
"agent_session_id": work_order.agent_session_id,
"status": (
metadata["status"].value
if isinstance(metadata["status"], AgentWorkOrderStatus)
else metadata["status"]
),
# Store non-status/timestamp metadata in JSONB column
"metadata": {k: v for k, v in metadata.items() if k not in ["status", "created_at", "updated_at"]},
}
self.client.table(self.table_name).insert(data).execute()
self._logger.info(
"work_order_created",
agent_work_order_id=work_order.agent_work_order_id,
repository_url=work_order.repository_url,
)
except Exception as e:
self._logger.exception(
"create_work_order_failed",
agent_work_order_id=work_order.agent_work_order_id,
error=str(e),
)
raise
async def get(self, agent_work_order_id: str) -> tuple[AgentWorkOrderState, dict] | None:
"""Get work order by ID.
Args:
agent_work_order_id: Work order unique identifier
Returns:
Tuple of (state, metadata) or None if not found
Raises:
Exception: If database query fails
Example:
>>> result = await repository.get("wo-123")
>>> if result:
... state, metadata = result
... print(f"Status: {metadata['status']}")
"""
try:
response = self.client.table(self.table_name).select("*").eq("agent_work_order_id", agent_work_order_id).execute()
if not response.data:
self._logger.info("work_order_not_found", agent_work_order_id=agent_work_order_id)
return None
return self._row_to_state_and_metadata(response.data[0])
except Exception as e:
self._logger.exception(
"get_work_order_failed",
agent_work_order_id=agent_work_order_id,
error=str(e),
)
raise
async def list(self, status_filter: AgentWorkOrderStatus | None = None) -> list[tuple[AgentWorkOrderState, dict]]:
"""List all work orders with optional status filter.
Args:
status_filter: Optional status to filter by (e.g., PENDING, RUNNING)
Returns:
List of (state, metadata) tuples ordered by created_at DESC
Raises:
Exception: If database query fails
Example:
>>> # Get all running work orders
>>> running = await repository.list(status_filter=AgentWorkOrderStatus.RUNNING)
>>> for state, metadata in running:
... print(f"{state.agent_work_order_id}: {metadata['status']}")
"""
try:
query = self.client.table(self.table_name).select("*")
if status_filter:
query = query.eq("status", status_filter.value)
response = query.order("created_at", desc=True).execute()
results = [self._row_to_state_and_metadata(row) for row in response.data]
self._logger.info(
"work_orders_listed",
count=len(results),
status_filter=status_filter.value if status_filter else None,
)
return results
except Exception as e:
self._logger.exception(
"list_work_orders_failed",
status_filter=status_filter.value if status_filter else None,
error=str(e),
)
raise
async def update_status(
self,
agent_work_order_id: str,
status: AgentWorkOrderStatus,
**kwargs,
) -> None:
"""Update work order status and other metadata fields.
Args:
agent_work_order_id: Work order ID to update
status: New status value
**kwargs: Additional metadata fields to update (e.g., error_message, current_phase)
Raises:
Exception: If database update fails
Note:
If work order not found, logs warning but does not raise exception.
Updates are merged with existing metadata in JSONB column.
Example:
>>> await repository.update_status(
... "wo-123",
... AgentWorkOrderStatus.FAILED,
... error_message="Branch creation failed"
... )
"""
try:
# Prepare updates
updates: dict[str, Any] = {
"status": status.value,
"updated_at": datetime.now().isoformat(),
}
# Add any metadata updates to the JSONB column
if kwargs:
# Get current metadata, update it, then save
current = await self.get(agent_work_order_id)
if current:
_, metadata = current
metadata.update(kwargs)
# Extract non-status/timestamp metadata for JSONB column
jsonb_metadata = {k: v for k, v in metadata.items() if k not in ["status", "created_at", "updated_at"]}
updates["metadata"] = jsonb_metadata
response = (
self.client.table(self.table_name)
.update(updates)
.eq("agent_work_order_id", agent_work_order_id)
.execute()
)
if not response.data:
self._logger.warning(
"work_order_not_found_for_update",
agent_work_order_id=agent_work_order_id,
)
return
self._logger.info(
"work_order_status_updated",
agent_work_order_id=agent_work_order_id,
status=status.value,
)
except Exception as e:
self._logger.exception(
"update_work_order_status_failed",
agent_work_order_id=agent_work_order_id,
status=status.value,
error=str(e),
)
raise
async def update_git_branch(
self, agent_work_order_id: str, git_branch_name: str
) -> None:
"""Update git branch name in work order state.
Args:
agent_work_order_id: Work order ID to update
git_branch_name: New git branch name
Raises:
Exception: If database update fails
Example:
>>> await repository.update_git_branch("wo-123", "feature/new-feature")
"""
try:
self.client.table(self.table_name).update({
"git_branch_name": git_branch_name,
"updated_at": datetime.now().isoformat(),
}).eq("agent_work_order_id", agent_work_order_id).execute()
self._logger.info(
"work_order_git_branch_updated",
agent_work_order_id=agent_work_order_id,
git_branch_name=git_branch_name,
)
except Exception as e:
self._logger.exception(
"update_git_branch_failed",
agent_work_order_id=agent_work_order_id,
error=str(e),
)
raise
async def update_session_id(
self, agent_work_order_id: str, agent_session_id: str
) -> None:
"""Update agent session ID in work order state.
Args:
agent_work_order_id: Work order ID to update
agent_session_id: New agent session ID
Raises:
Exception: If database update fails
Example:
>>> await repository.update_session_id("wo-123", "session-abc-456")
"""
try:
self.client.table(self.table_name).update({
"agent_session_id": agent_session_id,
"updated_at": datetime.now().isoformat(),
}).eq("agent_work_order_id", agent_work_order_id).execute()
self._logger.info(
"work_order_session_id_updated",
agent_work_order_id=agent_work_order_id,
agent_session_id=agent_session_id,
)
except Exception as e:
self._logger.exception(
"update_session_id_failed",
agent_work_order_id=agent_work_order_id,
error=str(e),
)
raise
async def save_step_history(
self, agent_work_order_id: str, step_history: StepHistory
) -> None:
"""Save step execution history to database.
Uses delete + insert pattern for fresh save, replacing all existing steps.
Args:
agent_work_order_id: Work order ID
step_history: Complete step execution history
Raises:
Exception: If database operation fails
Note:
Foreign key constraint ensures cascade delete when work order is deleted.
Steps are inserted with step_order to maintain execution sequence.
Example:
>>> history = StepHistory(
... agent_work_order_id="wo-123",
... steps=[
... StepExecutionResult(
... step=WorkflowStep.CREATE_BRANCH,
... agent_name="test-agent",
... success=True,
... duration_seconds=1.5,
... timestamp=datetime.now()
... )
... ]
... )
>>> await repository.save_step_history("wo-123", history)
"""
try:
# Delete existing steps (fresh save pattern)
self.client.table(self.steps_table_name).delete().eq("agent_work_order_id", agent_work_order_id).execute()
# Insert all steps
if step_history.steps:
steps_data = []
for i, step in enumerate(step_history.steps):
steps_data.append({
"agent_work_order_id": agent_work_order_id,
"step": step.step.value,
"agent_name": step.agent_name,
"success": step.success,
"output": step.output,
"error_message": step.error_message,
"duration_seconds": step.duration_seconds,
"session_id": step.session_id,
"executed_at": step.timestamp.isoformat(),
"step_order": i,
})
self.client.table(self.steps_table_name).insert(steps_data).execute()
self._logger.info(
"step_history_saved",
agent_work_order_id=agent_work_order_id,
step_count=len(step_history.steps),
)
except Exception as e:
self._logger.exception(
"save_step_history_failed",
agent_work_order_id=agent_work_order_id,
error=str(e),
)
raise
async def get_step_history(self, agent_work_order_id: str) -> StepHistory | None:
"""Get step execution history from database.
Args:
agent_work_order_id: Work order ID
Returns:
StepHistory with ordered steps, or None if no steps found
Raises:
Exception: If database query fails
Example:
>>> history = await repository.get_step_history("wo-123")
>>> if history:
... for step in history.steps:
... print(f"{step.step}: {'' if step.success else ''}")
"""
try:
response = (
self.client.table(self.steps_table_name)
.select("*")
.eq("agent_work_order_id", agent_work_order_id)
.order("step_order")
.execute()
)
if not response.data:
self._logger.info(
"step_history_not_found",
agent_work_order_id=agent_work_order_id,
)
return None
# Convert rows to StepExecutionResult objects
steps = []
for row in response.data:
steps.append(StepExecutionResult(
step=WorkflowStep(row["step"]),
agent_name=row["agent_name"],
success=row["success"],
output=row.get("output"),
error_message=row.get("error_message"),
duration_seconds=row["duration_seconds"],
session_id=row.get("session_id"),
timestamp=row["executed_at"],
))
return StepHistory(agent_work_order_id=agent_work_order_id, steps=steps)
except Exception as e:
self._logger.exception(
"get_step_history_failed",
agent_work_order_id=agent_work_order_id,
error=str(e),
)
raise

View File

@@ -0,0 +1,170 @@
"""State Reconciliation Utilities
Utilities to detect and fix inconsistencies between database state and filesystem.
These tools help identify orphaned worktrees (exist on filesystem but not in database)
and dangling state (exist in database but worktree deleted).
"""
import shutil
from pathlib import Path
from typing import Any
from ..config import config
from ..models import AgentWorkOrderStatus
from ..state_manager.supabase_repository import SupabaseWorkOrderRepository
from ..utils.structured_logger import get_logger
logger = get_logger(__name__)
async def find_orphaned_worktrees(repository: SupabaseWorkOrderRepository) -> list[str]:
"""Find worktrees that exist on filesystem but not in database.
Orphaned worktrees can occur when:
- Database entries are deleted but worktree cleanup fails
- Service crashes during work order creation (worktree created but not saved to DB)
- Manual filesystem operations outside the service
Args:
repository: Supabase repository instance to query current state
Returns:
List of absolute paths to orphaned worktree directories
Example:
>>> repository = SupabaseWorkOrderRepository()
>>> orphans = await find_orphaned_worktrees(repository)
>>> print(f"Found {len(orphans)} orphaned worktrees")
"""
worktree_base = Path(config.WORKTREE_BASE_DIR)
if not worktree_base.exists():
logger.info("worktree_base_directory_not_found", path=str(worktree_base))
return []
# Get all worktree directories from filesystem
filesystem_worktrees = {d.name for d in worktree_base.iterdir() if d.is_dir()}
# Get all work orders from database
work_orders = await repository.list()
database_identifiers = {state.sandbox_identifier for state, _ in work_orders}
# Find orphans (in filesystem but not in database)
orphans = filesystem_worktrees - database_identifiers
logger.info(
"orphaned_worktrees_found",
count=len(orphans),
orphans=list(orphans)[:10], # Log first 10 to avoid spam
total_filesystem=len(filesystem_worktrees),
total_database=len(database_identifiers),
)
return [str(worktree_base / name) for name in orphans]
async def find_dangling_state(repository: SupabaseWorkOrderRepository) -> list[str]:
"""Find database entries with missing worktrees.
Dangling state can occur when:
- Worktree cleanup succeeds but database update fails
- Manual deletion of worktree directories
- Filesystem corruption or disk full errors
Args:
repository: Supabase repository instance to query current state
Returns:
List of work order IDs that have missing worktrees
Example:
>>> repository = SupabaseWorkOrderRepository()
>>> dangling = await find_dangling_state(repository)
>>> print(f"Found {len(dangling)} dangling state entries")
"""
worktree_base = Path(config.WORKTREE_BASE_DIR)
# Get all work orders from database
work_orders = await repository.list()
dangling = []
for state, _ in work_orders:
worktree_path = worktree_base / state.sandbox_identifier
if not worktree_path.exists():
dangling.append(state.agent_work_order_id)
logger.info(
"dangling_state_found",
count=len(dangling),
dangling=dangling[:10], # Log first 10 to avoid spam
total_work_orders=len(work_orders),
)
return dangling
async def reconcile_state(
repository: SupabaseWorkOrderRepository,
fix: bool = False
) -> dict[str, Any]:
"""Reconcile database state with filesystem.
Detects both orphaned worktrees and dangling state. If fix=True,
will clean up orphaned worktrees and mark dangling state as failed.
Args:
repository: Supabase repository instance
fix: If True, cleanup orphans and update dangling state. If False, dry-run only.
Returns:
Report dictionary with:
- orphaned_worktrees: List of orphaned worktree paths
- dangling_state: List of work order IDs with missing worktrees
- fix_applied: Whether fixes were applied
- actions_taken: List of action descriptions
Example:
>>> # Dry run to see what would be fixed
>>> report = await reconcile_state(repository, fix=False)
>>> print(f"Found {len(report['orphaned_worktrees'])} orphans")
>>>
>>> # Actually fix issues
>>> report = await reconcile_state(repository, fix=True)
>>> for action in report['actions_taken']:
... print(action)
"""
orphans = await find_orphaned_worktrees(repository)
dangling = await find_dangling_state(repository)
actions: list[str] = []
if fix:
# Clean up orphaned worktrees
for orphan_path in orphans:
try:
shutil.rmtree(orphan_path)
actions.append(f"Deleted orphaned worktree: {orphan_path}")
logger.info("orphaned_worktree_deleted", path=orphan_path)
except Exception as e:
actions.append(f"Failed to delete {orphan_path}: {e}")
logger.error("orphaned_worktree_delete_failed", path=orphan_path, error=str(e), exc_info=True)
# Update dangling state to mark as failed
for work_order_id in dangling:
try:
await repository.update_status(
work_order_id,
AgentWorkOrderStatus.FAILED,
error_message="Worktree missing - state/filesystem divergence detected during reconciliation"
)
actions.append(f"Marked work order {work_order_id} as failed (worktree missing)")
logger.info("dangling_state_updated", work_order_id=work_order_id)
except Exception as e:
actions.append(f"Failed to update {work_order_id}: {e}")
logger.error("dangling_state_update_failed", work_order_id=work_order_id, error=str(e), exc_info=True)
return {
"orphaned_worktrees": orphans,
"dangling_state": dangling,
"fix_applied": fix,
"actions_taken": actions,
}
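# Usage sketch (illustrative addition, not part of the committed module):
# run a dry-run reconciliation from a one-off script.
if __name__ == "__main__":
    import asyncio

    async def _main() -> None:
        repository = SupabaseWorkOrderRepository()
        report = await reconcile_state(repository, fix=False)  # dry run only
        print(f"Orphaned worktrees: {len(report['orphaned_worktrees'])}")
        print(f"Dangling state entries: {len(report['dangling_state'])}")

    asyncio.run(_main())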