diff --git a/docker-compose.yml b/docker-compose.yml index ca5b44b8..68fdffb7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -165,6 +165,7 @@ services: environment: - ENABLE_AGENT_WORK_ORDERS=true - SERVICE_DISCOVERY_MODE=docker_compose + - STATE_STORAGE_TYPE=supabase - ARCHON_SERVER_URL=http://archon-server:${ARCHON_SERVER_PORT:-8181} - ARCHON_MCP_URL=http://archon-mcp:${ARCHON_MCP_PORT:-8051} - SUPABASE_URL=${SUPABASE_URL} diff --git a/migration/AGENT_WORK_ORDERS.md b/migration/AGENT_WORK_ORDERS.md new file mode 100644 index 00000000..4ec6705b --- /dev/null +++ b/migration/AGENT_WORK_ORDERS.md @@ -0,0 +1,135 @@ +# Agent Work Orders Database Migrations + +This document describes the database migrations for the Agent Work Orders feature. + +## Overview + +Agent Work Orders is an optional microservice that executes agent-based workflows using Claude Code CLI. These migrations set up the required database tables for the feature. + +## Prerequisites + +- Supabase project with the same credentials as main Archon server +- `SUPABASE_URL` and `SUPABASE_SERVICE_KEY` environment variables configured + +## Migrations + +### 1. `agent_work_orders_repositories.sql` + +**Purpose**: Configure GitHub repositories for agent work orders + +**Creates**: +- `archon_configured_repositories` table for storing repository configurations +- Indexes for fast repository lookups +- RLS policies for access control +- Validation constraints for repository URLs + +**When to run**: Before using the repository configuration feature + +**Usage**: +```bash +# Open Supabase dashboard → SQL Editor +# Copy and paste the entire migration file +# Execute +``` + +### 2. 
`agent_work_orders_state.sql` + +**Purpose**: Persistent state management for agent work orders + +**Creates**: +- `archon_agent_work_orders` - Main work order state and metadata table +- `archon_agent_work_order_steps` - Step execution history with foreign key constraints +- Indexes for fast queries (status, repository_url, created_at) +- Database triggers for automatic timestamp management +- RLS policies for service and authenticated access + +**Features**: +- ACID guarantees for concurrent work order execution +- Foreign key CASCADE delete (steps deleted when work order deleted) +- Hybrid schema (frequently queried columns + JSONB for flexible metadata) +- Automatic `updated_at` timestamp management + +**When to run**: To enable Supabase-backed persistent storage for agent work orders + +**Usage**: +```bash +# Open Supabase dashboard → SQL Editor +# Copy and paste the entire migration file +# Execute +``` + +**Verification**: +```sql +-- Check tables exist +SELECT table_name FROM information_schema.tables +WHERE table_schema = 'public' +AND table_name LIKE 'archon_agent_work_order%'; + +-- Verify indexes +SELECT tablename, indexname FROM pg_indexes +WHERE tablename LIKE 'archon_agent_work_order%' +ORDER BY tablename, indexname; +``` + +## Configuration + +After applying migrations, configure the agent work orders service: + +```bash +# Set environment variable +export STATE_STORAGE_TYPE=supabase + +# Restart the service +docker compose restart archon-agent-work-orders +# OR +make agent-work-orders +``` + +## Health Check + +Verify the configuration: + +```bash +curl http://localhost:8053/health | jq '{storage_type, database}' +``` + +Expected response: +```json +{ + "storage_type": "supabase", + "database": { + "status": "healthy", + "tables_exist": true + } +} +``` + +## Storage Options + +Agent Work Orders supports three storage backends: + +1. **Memory** (`STATE_STORAGE_TYPE=memory`) - Default, no persistence +2. 
**File** (`STATE_STORAGE_TYPE=file`) - Legacy file-based storage +3. **Supabase** (`STATE_STORAGE_TYPE=supabase`) - **Recommended for production** + +## Rollback + +To remove the agent work orders state tables: + +```sql +-- Drop tables (CASCADE will also drop indexes, triggers, and policies) +DROP TABLE IF EXISTS archon_agent_work_order_steps CASCADE; +DROP TABLE IF EXISTS archon_agent_work_orders CASCADE; +``` + +**Note**: The `update_updated_at_column()` function is shared with other Archon tables and should NOT be dropped. + +## Documentation + +For detailed setup instructions, see: +- `python/src/agent_work_orders/README.md` - Service configuration guide and migration instructions + +## Migration History + +- **agent_work_orders_repositories.sql** - Initial repository configuration support +- **agent_work_orders_state.sql** - Supabase persistence migration (replaces file-based storage) diff --git a/migration/agent_work_orders_state.sql b/migration/agent_work_orders_state.sql new file mode 100644 index 00000000..f0f8738c --- /dev/null +++ b/migration/agent_work_orders_state.sql @@ -0,0 +1,356 @@ +-- ===================================================== +-- Agent Work Orders - State Management +-- ===================================================== +-- This migration creates tables for agent work order state persistence +-- in PostgreSQL, replacing file-based JSON storage with ACID-compliant +-- database backend. 
+-- +-- Features: +-- - Atomic state updates with ACID guarantees +-- - Row-level locking for concurrent access control +-- - Foreign key constraints for referential integrity +-- - Indexes for fast queries by status, repository, and timestamp +-- - JSONB metadata for flexible storage +-- - Automatic timestamp management via triggers +-- - Step execution history with ordering +-- +-- Run this in your Supabase SQL Editor +-- ===================================================== + +-- ===================================================== +-- SECTION 1: CREATE TABLES +-- ===================================================== + +-- Create archon_agent_work_orders table +CREATE TABLE IF NOT EXISTS archon_agent_work_orders ( + -- Primary identification (TEXT not UUID since generated by id_generator.py) + agent_work_order_id TEXT PRIMARY KEY, + + -- Core state fields (frequently queried as separate columns) + repository_url TEXT NOT NULL, + sandbox_identifier TEXT NOT NULL, + git_branch_name TEXT, + agent_session_id TEXT, + status TEXT NOT NULL CHECK (status IN ('pending', 'running', 'completed', 'failed')), + + -- Flexible metadata (JSONB for infrequently queried fields) + -- Stores: sandbox_type, github_issue_number, current_phase, error_message, etc. 
+ metadata JSONB DEFAULT '{}'::jsonb, + + -- Timestamps (automatically managed) + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +-- Create archon_agent_work_order_steps table +-- Stores step execution history with foreign key to work orders +CREATE TABLE IF NOT EXISTS archon_agent_work_order_steps ( + -- Primary identification + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + -- Foreign key to work order (CASCADE delete when work order deleted) + agent_work_order_id TEXT NOT NULL REFERENCES archon_agent_work_orders(agent_work_order_id) ON DELETE CASCADE, + + -- Step execution details + step TEXT NOT NULL, -- WorkflowStep enum value (e.g., "create-branch", "planning") + agent_name TEXT NOT NULL, -- Name of agent that executed step + success BOOLEAN NOT NULL, -- Whether step succeeded + output TEXT, -- Step output (nullable) + error_message TEXT, -- Error message if failed (nullable) + duration_seconds FLOAT NOT NULL, -- Execution duration + session_id TEXT, -- Agent session ID (nullable) + executed_at TIMESTAMP WITH TIME ZONE NOT NULL, -- When step was executed + step_order INT NOT NULL -- Order within work order (0-indexed for sorting) +); + +-- ===================================================== +-- SECTION 2: CREATE INDEXES +-- ===================================================== + +-- Indexes on archon_agent_work_orders for common queries + +-- Index on status for filtering by work order status +CREATE INDEX IF NOT EXISTS idx_agent_work_orders_status + ON archon_agent_work_orders(status); + +-- Index on created_at for ordering by most recent +CREATE INDEX IF NOT EXISTS idx_agent_work_orders_created_at + ON archon_agent_work_orders(created_at DESC); + +-- Index on repository_url for filtering by repository +CREATE INDEX IF NOT EXISTS idx_agent_work_orders_repository + ON archon_agent_work_orders(repository_url); + +-- GIN index on metadata JSONB for flexible queries +CREATE INDEX IF NOT EXISTS 
idx_agent_work_orders_metadata + ON archon_agent_work_orders USING GIN(metadata); + +-- Indexes on archon_agent_work_order_steps for step history queries + +-- Index on agent_work_order_id for retrieving all steps for a work order +CREATE INDEX IF NOT EXISTS idx_agent_work_order_steps_work_order_id + ON archon_agent_work_order_steps(agent_work_order_id); + +-- Index on executed_at for temporal queries +CREATE INDEX IF NOT EXISTS idx_agent_work_order_steps_executed_at + ON archon_agent_work_order_steps(executed_at); + +-- ===================================================== +-- SECTION 3: CREATE TRIGGER +-- ===================================================== + +-- Apply auto-update trigger for updated_at timestamp +-- Reuses existing update_updated_at_column() function from Archon migrations +CREATE TRIGGER update_agent_work_orders_updated_at + BEFORE UPDATE ON archon_agent_work_orders + FOR EACH ROW + EXECUTE FUNCTION update_updated_at_column(); + +-- ===================================================== +-- SECTION 4: ROW LEVEL SECURITY +-- ===================================================== + +-- Enable Row Level Security on both tables +ALTER TABLE archon_agent_work_orders ENABLE ROW LEVEL SECURITY; +ALTER TABLE archon_agent_work_order_steps ENABLE ROW LEVEL SECURITY; + +-- Policy 1: Service role has full access (for API operations) +CREATE POLICY "Allow service role full access to archon_agent_work_orders" + ON archon_agent_work_orders + FOR ALL + USING (auth.role() = 'service_role'); + +CREATE POLICY "Allow service role full access to archon_agent_work_order_steps" + ON archon_agent_work_order_steps + FOR ALL + USING (auth.role() = 'service_role'); + +-- Policy 2: Authenticated users can read and update (for frontend operations) +CREATE POLICY "Allow authenticated users to read and update archon_agent_work_orders" + ON archon_agent_work_orders + FOR ALL + TO authenticated + USING (true); + +CREATE POLICY "Allow authenticated users to read and update 
archon_agent_work_order_steps" + ON archon_agent_work_order_steps + FOR ALL + TO authenticated + USING (true); + +-- ===================================================== +-- SECTION 5: TABLE COMMENTS +-- ===================================================== + +-- Comments on archon_agent_work_orders table +COMMENT ON TABLE archon_agent_work_orders IS + 'Stores agent work order state and metadata with ACID guarantees for concurrent access'; + +COMMENT ON COLUMN archon_agent_work_orders.agent_work_order_id IS + 'Unique work order identifier (TEXT format generated by id_generator.py)'; + +COMMENT ON COLUMN archon_agent_work_orders.repository_url IS + 'GitHub repository URL for the work order'; + +COMMENT ON COLUMN archon_agent_work_orders.sandbox_identifier IS + 'Unique identifier for sandbox environment (worktree directory name)'; + +COMMENT ON COLUMN archon_agent_work_orders.git_branch_name IS + 'Git branch name created for work order (nullable if not yet created)'; + +COMMENT ON COLUMN archon_agent_work_orders.agent_session_id IS + 'Agent session ID for tracking agent execution (nullable if not yet started)'; + +COMMENT ON COLUMN archon_agent_work_orders.status IS + 'Current status: pending, running, completed, or failed'; + +COMMENT ON COLUMN archon_agent_work_orders.metadata IS + 'JSONB metadata including sandbox_type, github_issue_number, current_phase, error_message, etc.'; + +COMMENT ON COLUMN archon_agent_work_orders.created_at IS + 'Timestamp when work order was created'; + +COMMENT ON COLUMN archon_agent_work_orders.updated_at IS + 'Timestamp when work order was last updated (auto-managed by trigger)'; + +-- Comments on archon_agent_work_order_steps table +COMMENT ON TABLE archon_agent_work_order_steps IS + 'Stores step execution history for agent work orders with foreign key constraints'; + +COMMENT ON COLUMN archon_agent_work_order_steps.id IS + 'Unique UUID identifier for step record'; + +COMMENT ON COLUMN 
archon_agent_work_order_steps.agent_work_order_id IS + 'Foreign key to work order (CASCADE delete on work order deletion)'; + +COMMENT ON COLUMN archon_agent_work_order_steps.step IS + 'WorkflowStep enum value (e.g., "create-branch", "planning", "execute")'; + +COMMENT ON COLUMN archon_agent_work_order_steps.agent_name IS + 'Name of agent that executed the step'; + +COMMENT ON COLUMN archon_agent_work_order_steps.success IS + 'Boolean indicating if step execution succeeded'; + +COMMENT ON COLUMN archon_agent_work_order_steps.output IS + 'Step execution output (nullable)'; + +COMMENT ON COLUMN archon_agent_work_order_steps.error_message IS + 'Error message if step failed (nullable)'; + +COMMENT ON COLUMN archon_agent_work_order_steps.duration_seconds IS + 'Step execution duration in seconds'; + +COMMENT ON COLUMN archon_agent_work_order_steps.session_id IS + 'Agent session ID for tracking (nullable)'; + +COMMENT ON COLUMN archon_agent_work_order_steps.executed_at IS + 'Timestamp when step was executed'; + +COMMENT ON COLUMN archon_agent_work_order_steps.step_order IS + 'Order of step within work order (0-indexed for sorting)'; + +-- ===================================================== +-- SECTION 6: VERIFICATION +-- ===================================================== + +-- Verify archon_agent_work_orders table creation +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'archon_agent_work_orders' + ) THEN + RAISE NOTICE '✓ Table archon_agent_work_orders created successfully'; + ELSE + RAISE EXCEPTION '✗ Table archon_agent_work_orders was not created'; + END IF; +END $$; + +-- Verify archon_agent_work_order_steps table creation +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'archon_agent_work_order_steps' + ) THEN + RAISE NOTICE '✓ Table archon_agent_work_order_steps created successfully'; + ELSE + RAISE EXCEPTION '✗ Table 
archon_agent_work_order_steps was not created'; + END IF; +END $$; + +-- Verify indexes on archon_agent_work_orders +DO $$ +BEGIN + IF ( + SELECT COUNT(*) FROM pg_indexes + WHERE tablename = 'archon_agent_work_orders' + ) >= 4 THEN + RAISE NOTICE '✓ Indexes on archon_agent_work_orders created successfully'; + ELSE + RAISE WARNING '⚠ Expected at least 4 indexes on archon_agent_work_orders, found fewer'; + END IF; +END $$; + +-- Verify indexes on archon_agent_work_order_steps +DO $$ +BEGIN + IF ( + SELECT COUNT(*) FROM pg_indexes + WHERE tablename = 'archon_agent_work_order_steps' + ) >= 2 THEN + RAISE NOTICE '✓ Indexes on archon_agent_work_order_steps created successfully'; + ELSE + RAISE WARNING '⚠ Expected at least 2 indexes on archon_agent_work_order_steps, found fewer'; + END IF; +END $$; + +-- Verify trigger +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM pg_trigger + WHERE tgrelid = 'archon_agent_work_orders'::regclass + AND tgname = 'update_agent_work_orders_updated_at' + ) THEN + RAISE NOTICE '✓ Trigger update_agent_work_orders_updated_at created successfully'; + ELSE + RAISE EXCEPTION '✗ Trigger update_agent_work_orders_updated_at was not created'; + END IF; +END $$; + +-- Verify RLS policies on archon_agent_work_orders +DO $$ +BEGIN + IF ( + SELECT COUNT(*) FROM pg_policies + WHERE tablename = 'archon_agent_work_orders' + ) >= 2 THEN + RAISE NOTICE '✓ RLS policies on archon_agent_work_orders created successfully'; + ELSE + RAISE WARNING '⚠ Expected at least 2 RLS policies on archon_agent_work_orders, found fewer'; + END IF; +END $$; + +-- Verify RLS policies on archon_agent_work_order_steps +DO $$ +BEGIN + IF ( + SELECT COUNT(*) FROM pg_policies + WHERE tablename = 'archon_agent_work_order_steps' + ) >= 2 THEN + RAISE NOTICE '✓ RLS policies on archon_agent_work_order_steps created successfully'; + ELSE + RAISE WARNING '⚠ Expected at least 2 RLS policies on archon_agent_work_order_steps, found fewer'; + END IF; +END $$; + +-- Verify foreign key constraint +DO 
$$ +BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE table_name = 'archon_agent_work_order_steps' + AND constraint_type = 'FOREIGN KEY' + ) THEN + RAISE NOTICE '✓ Foreign key constraint on archon_agent_work_order_steps created successfully'; + ELSE + RAISE EXCEPTION '✗ Foreign key constraint on archon_agent_work_order_steps was not created'; + END IF; +END $$; + +-- ===================================================== +-- SECTION 7: ROLLBACK INSTRUCTIONS +-- ===================================================== + +/* +To rollback this migration, run the following commands: + +-- Drop tables (CASCADE will also drop indexes, triggers, and policies) +DROP TABLE IF EXISTS archon_agent_work_order_steps CASCADE; +DROP TABLE IF EXISTS archon_agent_work_orders CASCADE; + +-- Verify tables are dropped +SELECT table_name FROM information_schema.tables +WHERE table_schema = 'public' +AND table_name LIKE 'archon_agent_work_order%'; +-- Should return 0 rows + +-- Note: The update_updated_at_column() function is shared and should NOT be dropped +*/ + +-- ===================================================== +-- MIGRATION COMPLETE +-- ===================================================== +-- The archon_agent_work_orders and archon_agent_work_order_steps tables +-- are now ready for use. +-- +-- Next steps: +-- 1. Set STATE_STORAGE_TYPE=supabase in environment +-- 2. Restart Agent Work Orders service +-- 3. Verify health endpoint shows database status healthy +-- 4. 
Test work order creation via API +-- ===================================================== diff --git a/python/src/agent_work_orders/README.md b/python/src/agent_work_orders/README.md index da3f14a3..a28a2cfc 100644 --- a/python/src/agent_work_orders/README.md +++ b/python/src/agent_work_orders/README.md @@ -97,8 +97,94 @@ docker compose up -d | `GH_CLI_PATH` | `gh` | Path to GitHub CLI executable | | `GH_TOKEN` | - | GitHub Personal Access Token for gh CLI authentication (required for PR creation) | | `LOG_LEVEL` | `INFO` | Logging level | -| `STATE_STORAGE_TYPE` | `memory` | State storage (`memory` or `file`) - Use `file` for persistence | +| `STATE_STORAGE_TYPE` | `memory` | State storage (`memory`, `file`, or `supabase`) - Use `supabase` for production | | `FILE_STATE_DIRECTORY` | `agent-work-orders-state` | Directory for file-based state (when `STATE_STORAGE_TYPE=file`) | +| `SUPABASE_URL` | - | Supabase project URL (required when `STATE_STORAGE_TYPE=supabase`) | +| `SUPABASE_SERVICE_KEY` | - | Supabase service key (required when `STATE_STORAGE_TYPE=supabase`) | + +### State Storage Options + +The service supports three state storage backends: + +**Memory Storage** (`STATE_STORAGE_TYPE=memory`): +- **Default**: Easiest for development/testing +- **Pros**: No setup required, fast +- **Cons**: State lost on service restart, no persistence +- **Use for**: Local development, unit tests + +**File Storage** (`STATE_STORAGE_TYPE=file`): +- **Legacy**: File-based JSON persistence +- **Pros**: Simple, no external dependencies +- **Cons**: No ACID guarantees, race conditions possible, file corruption risk +- **Use for**: Single-instance deployments, backward compatibility + +**Supabase Storage** (`STATE_STORAGE_TYPE=supabase`): +- **Recommended for production**: PostgreSQL-backed persistence via Supabase +- **Pros**: ACID guarantees, concurrent access support, foreign key constraints, indexes +- **Cons**: Requires Supabase configuration and credentials +- **Use for**: 
Production deployments, multi-instance setups + +### Supabase Configuration + +Agent Work Orders can use Supabase for production-ready persistent state management. + +#### Setup Steps + +1. **Reuse existing Archon Supabase credentials** - No new database or credentials needed. The agent work orders service shares the same Supabase project as the main Archon server. + +2. **Apply database migration**: + - Navigate to your Supabase project dashboard at https://app.supabase.com + - Open SQL Editor + - Copy and paste the migration from `migration/agent_work_orders_state.sql` (in the project root) + - Execute the migration + - See `migration/AGENT_WORK_ORDERS.md` for detailed instructions + +3. **Set environment variable**: + ```bash + export STATE_STORAGE_TYPE=supabase + ``` + +4. **Verify configuration**: + ```bash + # Start the service + make agent-work-orders + + # Check health endpoint + curl http://localhost:8053/health | jq + ``` + + Expected response: + ```json + { + "status": "healthy", + "storage_type": "supabase", + "database": { + "status": "healthy", + "tables_exist": true + } + } + ``` + +#### Database Tables + +When using Supabase storage, two tables are created: + +- **`archon_agent_work_orders`**: Main work order state and metadata +- **`archon_agent_work_order_steps`**: Step execution history with foreign key constraints + +#### Troubleshooting + +**Error: "tables_exist": false** +- Migration not applied - see `migration/AGENT_WORK_ORDERS.md` +- Check Supabase dashboard SQL Editor for error messages + +**Error: "SUPABASE_URL and SUPABASE_SERVICE_KEY must be set"** +- Environment variables not configured +- Ensure same credentials as main Archon server are set + +**Service starts but work orders not persisted** +- Check `STATE_STORAGE_TYPE` is set to `supabase` (case-insensitive) +- Verify health endpoint shows `"storage_type": "supabase"` ### Service Discovery Modes diff --git a/python/src/agent_work_orders/database/__init__.py 
b/python/src/agent_work_orders/database/__init__.py new file mode 100644 index 00000000..72ab8884 --- /dev/null +++ b/python/src/agent_work_orders/database/__init__.py @@ -0,0 +1,8 @@ +"""Database client module for Agent Work Orders. + +Provides Supabase client initialization and health checks for work order persistence. +""" + +from .client import check_database_health, get_agent_work_orders_client + +__all__ = ["get_agent_work_orders_client", "check_database_health"] diff --git a/python/src/agent_work_orders/database/client.py b/python/src/agent_work_orders/database/client.py new file mode 100644 index 00000000..a8aa5a32 --- /dev/null +++ b/python/src/agent_work_orders/database/client.py @@ -0,0 +1,74 @@ +"""Supabase client for Agent Work Orders. + +Provides database connection management and health checks for work order state persistence. +Reuses same Supabase credentials as main Archon server (SUPABASE_URL, SUPABASE_SERVICE_KEY). +""" + +import os +from typing import Any + +from supabase import Client, create_client + +from ..utils.structured_logger import get_logger + +logger = get_logger(__name__) + + +def get_agent_work_orders_client() -> Client: + """Get Supabase client for agent work orders. + + Reuses same credentials as main Archon server (SUPABASE_URL, SUPABASE_SERVICE_KEY). + The service key provides full access and bypasses Row Level Security policies. + + Returns: + Supabase client instance configured for work order operations + + Raises: + ValueError: If SUPABASE_URL or SUPABASE_SERVICE_KEY environment variables are not set + + Example: + >>> client = get_agent_work_orders_client() + >>> response = client.table("archon_agent_work_orders").select("*").execute() + """ + url = os.getenv("SUPABASE_URL") + key = os.getenv("SUPABASE_SERVICE_KEY") + + if not url or not key: + raise ValueError( + "SUPABASE_URL and SUPABASE_SERVICE_KEY must be set in environment variables. " + "These should match the credentials used by the main Archon server." 
+ ) + + return create_client(url, key) + + +async def check_database_health() -> dict[str, Any]: + """Check if agent work orders tables exist and are accessible. + + Verifies that both archon_agent_work_orders and archon_agent_work_order_steps + tables exist and can be queried. This is a lightweight check using limit(0) + to avoid fetching actual data. + + Returns: + Dictionary with health check results: + - status: "healthy" or "unhealthy" + - tables_exist: True if both tables are accessible, False otherwise + - error: Error message if check failed (only present when unhealthy) + + Example: + >>> health = await check_database_health() + >>> if health["status"] == "healthy": + ... print("Database is ready") + """ + try: + client = get_agent_work_orders_client() + + # Try to query both tables (limit 0 to avoid fetching data) + client.table("archon_agent_work_orders").select("agent_work_order_id").limit(0).execute() + client.table("archon_agent_work_order_steps").select("id").limit(0).execute() + + logger.info("database_health_check_passed", tables=["archon_agent_work_orders", "archon_agent_work_order_steps"]) + return {"status": "healthy", "tables_exist": True} + except Exception as e: + logger.error("database_health_check_failed", error=str(e), exc_info=True) + return {"status": "unhealthy", "tables_exist": False, "error": str(e)} diff --git a/python/src/agent_work_orders/server.py b/python/src/agent_work_orders/server.py index fba5be2c..d7aee851 100644 --- a/python/src/agent_work_orders/server.py +++ b/python/src/agent_work_orders/server.py @@ -16,6 +16,7 @@ from fastapi.middleware.cors import CORSMiddleware from .api.routes import log_buffer, router from .config import config +from .database.client import check_database_health from .utils.structured_logger import ( configure_structured_logging_with_buffer, get_logger, @@ -196,6 +197,14 @@ async def health_check() -> dict[str, Any]: "error": str(e), } + # Check database health if using Supabase storage + if 
config.STATE_STORAGE_TYPE.lower() == "supabase": + db_health = await check_database_health() + health_status["storage_type"] = "supabase" + health_status["database"] = db_health + else: + health_status["storage_type"] = config.STATE_STORAGE_TYPE + # Check MCP server connectivity (if configured) archon_mcp_url = os.getenv("ARCHON_MCP_URL") if archon_mcp_url: diff --git a/python/src/agent_work_orders/state_manager/repository_factory.py b/python/src/agent_work_orders/state_manager/repository_factory.py index 233059be..aa5bb045 100644 --- a/python/src/agent_work_orders/state_manager/repository_factory.py +++ b/python/src/agent_work_orders/state_manager/repository_factory.py @@ -1,26 +1,33 @@ """Repository Factory Creates appropriate repository instances based on configuration. -Supports both in-memory (for development/testing) and file-based (for production) storage. +Supports in-memory (dev/testing), file-based (legacy), and Supabase (production) storage. """ from ..config import config from ..utils.structured_logger import get_logger from .file_state_repository import FileStateRepository +from .supabase_repository import SupabaseWorkOrderRepository from .work_order_repository import WorkOrderRepository logger = get_logger(__name__) -def create_repository() -> WorkOrderRepository | FileStateRepository: +def create_repository() -> WorkOrderRepository | FileStateRepository | SupabaseWorkOrderRepository: """Create a work order repository based on configuration Returns: - Repository instance (either in-memory or file-based) + Repository instance (in-memory, file-based, or Supabase) + + Raises: + ValueError: If Supabase is configured but credentials are missing """ storage_type = config.STATE_STORAGE_TYPE.lower() - if storage_type == "file": + if storage_type == "supabase": + logger.info("repository_created", storage_type="supabase") + return SupabaseWorkOrderRepository() + elif storage_type == "file": state_dir = config.FILE_STATE_DIRECTORY logger.info( 
"repository_created", diff --git a/python/src/agent_work_orders/state_manager/supabase_repository.py b/python/src/agent_work_orders/state_manager/supabase_repository.py new file mode 100644 index 00000000..36fde235 --- /dev/null +++ b/python/src/agent_work_orders/state_manager/supabase_repository.py @@ -0,0 +1,484 @@ +"""Supabase-backed repository for agent work order state management. + +Provides ACID-compliant persistent storage for work order state using PostgreSQL +via Supabase. Implements the same interface as in-memory and file-based repositories +for seamless switching between storage backends. + +Architecture Note - async/await Pattern: + All repository methods are declared as `async def` for interface consistency + with other repository implementations, even though Supabase operations are sync. + This maintains a consistent async API contract across all repositories. +""" + +from datetime import datetime +from typing import Any + +from supabase import Client + +from ..database.client import get_agent_work_orders_client +from ..models import ( + AgentWorkOrderState, + AgentWorkOrderStatus, + StepExecutionResult, + StepHistory, + WorkflowStep, +) +from ..utils.structured_logger import get_logger + +logger = get_logger(__name__) + + +class SupabaseWorkOrderRepository: + """Supabase-backed repository for agent work orders. + + Provides persistent storage with ACID guarantees, row-level locking, + and foreign key constraints for referential integrity. + + Architecture: + - Work orders stored in archon_agent_work_orders table + - Step history stored in archon_agent_work_order_steps table with CASCADE delete + - Hybrid schema: Frequently queried fields as columns, flexible metadata as JSONB + - Auto-managed timestamps via database triggers + + Thread Safety: + Uses Supabase client which is thread-safe for concurrent operations. + Database-level row locking prevents race conditions. 
+ """ + + def __init__(self) -> None: + """Initialize Supabase repository with database client. + + Raises: + ValueError: If Supabase credentials are not configured + """ + self.client: Client = get_agent_work_orders_client() + self.table_name: str = "archon_agent_work_orders" + self.steps_table_name: str = "archon_agent_work_order_steps" + self._logger = logger.bind(table=self.table_name) + self._logger.info("supabase_repository_initialized") + + def _row_to_state_and_metadata(self, row: dict[str, Any]) -> tuple[AgentWorkOrderState, dict]: + """Convert database row to (AgentWorkOrderState, metadata) tuple. + + Args: + row: Raw database row with columns and JSONB metadata + + Returns: + Tuple of (state, metadata) where state contains core fields + and metadata contains status, timestamps, and JSONB fields + + Note: + Handles enum conversion from database string to AgentWorkOrderStatus + """ + # Extract core state fields + state = AgentWorkOrderState( + agent_work_order_id=row["agent_work_order_id"], + repository_url=row["repository_url"], + sandbox_identifier=row["sandbox_identifier"], + git_branch_name=row.get("git_branch_name"), + agent_session_id=row.get("agent_session_id"), + ) + + # Extract metadata + metadata = row.get("metadata", {}).copy() + metadata["status"] = AgentWorkOrderStatus(row["status"]) + metadata["created_at"] = row["created_at"] + metadata["updated_at"] = row["updated_at"] + + return (state, metadata) + + async def create(self, work_order: AgentWorkOrderState, metadata: dict) -> None: + """Create new work order in database. + + Args: + work_order: Core work order state (5 fields) + metadata: Additional metadata including status, sandbox_type, etc. + + Raises: + Exception: If database insert fails (e.g., duplicate ID, constraint violation) + + Example: + >>> state = AgentWorkOrderState( + ... agent_work_order_id="wo-123", + ... repository_url="https://github.com/test/repo", + ... sandbox_identifier="sandbox-123" + ... 
) + >>> metadata = {"status": AgentWorkOrderStatus.PENDING, "sandbox_type": "git_worktree"} + >>> await repository.create(state, metadata) + """ + try: + # Prepare data for insertion + # Separate core state columns from JSONB metadata + data = { + "agent_work_order_id": work_order.agent_work_order_id, + "repository_url": work_order.repository_url, + "sandbox_identifier": work_order.sandbox_identifier, + "git_branch_name": work_order.git_branch_name, + "agent_session_id": work_order.agent_session_id, + "status": ( + metadata["status"].value + if isinstance(metadata["status"], AgentWorkOrderStatus) + else metadata["status"] + ), + # Store non-status/timestamp metadata in JSONB column + "metadata": {k: v for k, v in metadata.items() if k not in ["status", "created_at", "updated_at"]}, + } + + self.client.table(self.table_name).insert(data).execute() + + self._logger.info( + "work_order_created", + agent_work_order_id=work_order.agent_work_order_id, + repository_url=work_order.repository_url, + ) + except Exception as e: + self._logger.exception( + "create_work_order_failed", + agent_work_order_id=work_order.agent_work_order_id, + error=str(e), + ) + raise + + async def get(self, agent_work_order_id: str) -> tuple[AgentWorkOrderState, dict] | None: + """Get work order by ID. + + Args: + agent_work_order_id: Work order unique identifier + + Returns: + Tuple of (state, metadata) or None if not found + + Raises: + Exception: If database query fails + + Example: + >>> result = await repository.get("wo-123") + >>> if result: + ... state, metadata = result + ... 
print(f"Status: {metadata['status']}") + """ + try: + response = self.client.table(self.table_name).select("*").eq("agent_work_order_id", agent_work_order_id).execute() + + if not response.data: + self._logger.info("work_order_not_found", agent_work_order_id=agent_work_order_id) + return None + + return self._row_to_state_and_metadata(response.data[0]) + except Exception as e: + self._logger.exception( + "get_work_order_failed", + agent_work_order_id=agent_work_order_id, + error=str(e), + ) + raise + + async def list(self, status_filter: AgentWorkOrderStatus | None = None) -> list[tuple[AgentWorkOrderState, dict]]: + """List all work orders with optional status filter. + + Args: + status_filter: Optional status to filter by (e.g., PENDING, RUNNING) + + Returns: + List of (state, metadata) tuples ordered by created_at DESC + + Raises: + Exception: If database query fails + + Example: + >>> # Get all running work orders + >>> running = await repository.list(status_filter=AgentWorkOrderStatus.RUNNING) + >>> for state, metadata in running: + ... print(f"{state.agent_work_order_id}: {metadata['status']}") + """ + try: + query = self.client.table(self.table_name).select("*") + + if status_filter: + query = query.eq("status", status_filter.value) + + response = query.order("created_at", desc=True).execute() + + results = [self._row_to_state_and_metadata(row) for row in response.data] + + self._logger.info( + "work_orders_listed", + count=len(results), + status_filter=status_filter.value if status_filter else None, + ) + + return results + except Exception as e: + self._logger.exception( + "list_work_orders_failed", + status_filter=status_filter.value if status_filter else None, + error=str(e), + ) + raise + + async def update_status( + self, + agent_work_order_id: str, + status: AgentWorkOrderStatus, + **kwargs, + ) -> None: + """Update work order status and other metadata fields. 
+ + Args: + agent_work_order_id: Work order ID to update + status: New status value + **kwargs: Additional metadata fields to update (e.g., error_message, current_phase) + + Raises: + Exception: If database update fails + + Note: + If work order not found, logs warning but does not raise exception. + Updates are merged with existing metadata in JSONB column. + + Example: + >>> await repository.update_status( + ... "wo-123", + ... AgentWorkOrderStatus.FAILED, + ... error_message="Branch creation failed" + ... ) + """ + try: + # Prepare updates + updates: dict[str, Any] = { + "status": status.value, + "updated_at": datetime.now().isoformat(), + } + + # Add any metadata updates to the JSONB column + if kwargs: + # Get current metadata, update it, then save + current = await self.get(agent_work_order_id) + if current: + _, metadata = current + metadata.update(kwargs) + # Extract non-status/timestamp metadata for JSONB column + jsonb_metadata = {k: v for k, v in metadata.items() if k not in ["status", "created_at", "updated_at"]} + updates["metadata"] = jsonb_metadata + + response = ( + self.client.table(self.table_name) + .update(updates) + .eq("agent_work_order_id", agent_work_order_id) + .execute() + ) + + if not response.data: + self._logger.warning( + "work_order_not_found_for_update", + agent_work_order_id=agent_work_order_id, + ) + return + + self._logger.info( + "work_order_status_updated", + agent_work_order_id=agent_work_order_id, + status=status.value, + ) + except Exception as e: + self._logger.exception( + "update_work_order_status_failed", + agent_work_order_id=agent_work_order_id, + status=status.value, + error=str(e), + ) + raise + + async def update_git_branch( + self, agent_work_order_id: str, git_branch_name: str + ) -> None: + """Update git branch name in work order state. 
+ + Args: + agent_work_order_id: Work order ID to update + git_branch_name: New git branch name + + Raises: + Exception: If database update fails + + Example: + >>> await repository.update_git_branch("wo-123", "feature/new-feature") + """ + try: + self.client.table(self.table_name).update({ + "git_branch_name": git_branch_name, + "updated_at": datetime.now().isoformat(), + }).eq("agent_work_order_id", agent_work_order_id).execute() + + self._logger.info( + "work_order_git_branch_updated", + agent_work_order_id=agent_work_order_id, + git_branch_name=git_branch_name, + ) + except Exception as e: + self._logger.exception( + "update_git_branch_failed", + agent_work_order_id=agent_work_order_id, + error=str(e), + ) + raise + + async def update_session_id( + self, agent_work_order_id: str, agent_session_id: str + ) -> None: + """Update agent session ID in work order state. + + Args: + agent_work_order_id: Work order ID to update + agent_session_id: New agent session ID + + Raises: + Exception: If database update fails + + Example: + >>> await repository.update_session_id("wo-123", "session-abc-456") + """ + try: + self.client.table(self.table_name).update({ + "agent_session_id": agent_session_id, + "updated_at": datetime.now().isoformat(), + }).eq("agent_work_order_id", agent_work_order_id).execute() + + self._logger.info( + "work_order_session_id_updated", + agent_work_order_id=agent_work_order_id, + agent_session_id=agent_session_id, + ) + except Exception as e: + self._logger.exception( + "update_session_id_failed", + agent_work_order_id=agent_work_order_id, + error=str(e), + ) + raise + + async def save_step_history( + self, agent_work_order_id: str, step_history: StepHistory + ) -> None: + """Save step execution history to database. + + Uses delete + insert pattern for fresh save, replacing all existing steps. 
+ + Args: + agent_work_order_id: Work order ID + step_history: Complete step execution history + + Raises: + Exception: If database operation fails + + Note: + Foreign key constraint ensures cascade delete when work order is deleted. + Steps are inserted with step_order to maintain execution sequence. + + Example: + >>> history = StepHistory( + ... agent_work_order_id="wo-123", + ... steps=[ + ... StepExecutionResult( + ... step=WorkflowStep.CREATE_BRANCH, + ... agent_name="test-agent", + ... success=True, + ... duration_seconds=1.5, + ... timestamp=datetime.now() + ... ) + ... ] + ... ) + >>> await repository.save_step_history("wo-123", history) + """ + try: + # Delete existing steps (fresh save pattern) + self.client.table(self.steps_table_name).delete().eq("agent_work_order_id", agent_work_order_id).execute() + + # Insert all steps + if step_history.steps: + steps_data = [] + for i, step in enumerate(step_history.steps): + steps_data.append({ + "agent_work_order_id": agent_work_order_id, + "step": step.step.value, + "agent_name": step.agent_name, + "success": step.success, + "output": step.output, + "error_message": step.error_message, + "duration_seconds": step.duration_seconds, + "session_id": step.session_id, + "executed_at": step.timestamp.isoformat(), + "step_order": i, + }) + + self.client.table(self.steps_table_name).insert(steps_data).execute() + + self._logger.info( + "step_history_saved", + agent_work_order_id=agent_work_order_id, + step_count=len(step_history.steps), + ) + except Exception as e: + self._logger.exception( + "save_step_history_failed", + agent_work_order_id=agent_work_order_id, + error=str(e), + ) + raise + + async def get_step_history(self, agent_work_order_id: str) -> StepHistory | None: + """Get step execution history from database. 
+ + Args: + agent_work_order_id: Work order ID + + Returns: + StepHistory with ordered steps, or None if no steps found + + Raises: + Exception: If database query fails + + Example: + >>> history = await repository.get_step_history("wo-123") + >>> if history: + ... for step in history.steps: + ... print(f"{step.step}: {'✓' if step.success else '✗'}") + """ + try: + response = ( + self.client.table(self.steps_table_name) + .select("*") + .eq("agent_work_order_id", agent_work_order_id) + .order("step_order") + .execute() + ) + + if not response.data: + self._logger.info( + "step_history_not_found", + agent_work_order_id=agent_work_order_id, + ) + return None + + # Convert rows to StepExecutionResult objects + steps = [] + for row in response.data: + steps.append(StepExecutionResult( + step=WorkflowStep(row["step"]), + agent_name=row["agent_name"], + success=row["success"], + output=row.get("output"), + error_message=row.get("error_message"), + duration_seconds=row["duration_seconds"], + session_id=row.get("session_id"), + timestamp=row["executed_at"], + )) + + return StepHistory(agent_work_order_id=agent_work_order_id, steps=steps) + except Exception as e: + self._logger.exception( + "get_step_history_failed", + agent_work_order_id=agent_work_order_id, + error=str(e), + ) + raise diff --git a/python/src/agent_work_orders/utils/state_reconciliation.py b/python/src/agent_work_orders/utils/state_reconciliation.py new file mode 100644 index 00000000..f8d7f7ff --- /dev/null +++ b/python/src/agent_work_orders/utils/state_reconciliation.py @@ -0,0 +1,170 @@ +"""State Reconciliation Utilities + +Utilities to detect and fix inconsistencies between database state and filesystem. +These tools help identify orphaned worktrees (exist on filesystem but not in database) +and dangling state (exist in database but worktree deleted). 
+""" + +import shutil +from pathlib import Path +from typing import Any + +from ..config import config +from ..models import AgentWorkOrderStatus +from ..state_manager.supabase_repository import SupabaseWorkOrderRepository +from ..utils.structured_logger import get_logger + +logger = get_logger(__name__) + + +async def find_orphaned_worktrees(repository: SupabaseWorkOrderRepository) -> list[str]: + """Find worktrees that exist on filesystem but not in database. + + Orphaned worktrees can occur when: + - Database entries are deleted but worktree cleanup fails + - Service crashes during work order creation (worktree created but not saved to DB) + - Manual filesystem operations outside the service + + Args: + repository: Supabase repository instance to query current state + + Returns: + List of absolute paths to orphaned worktree directories + + Example: + >>> repository = SupabaseWorkOrderRepository() + >>> orphans = await find_orphaned_worktrees(repository) + >>> print(f"Found {len(orphans)} orphaned worktrees") + """ + worktree_base = Path(config.WORKTREE_BASE_DIR) + if not worktree_base.exists(): + logger.info("worktree_base_directory_not_found", path=str(worktree_base)) + return [] + + # Get all worktree directories from filesystem + filesystem_worktrees = {d.name for d in worktree_base.iterdir() if d.is_dir()} + + # Get all work orders from database + work_orders = await repository.list() + database_identifiers = {state.sandbox_identifier for state, _ in work_orders} + + # Find orphans (in filesystem but not in database) + orphans = filesystem_worktrees - database_identifiers + + logger.info( + "orphaned_worktrees_found", + count=len(orphans), + orphans=list(orphans)[:10], # Log first 10 to avoid spam + total_filesystem=len(filesystem_worktrees), + total_database=len(database_identifiers), + ) + + return [str(worktree_base / name) for name in orphans] + + +async def find_dangling_state(repository: SupabaseWorkOrderRepository) -> list[str]: + """Find database 
entries with missing worktrees. + + Dangling state can occur when: + - Worktree cleanup succeeds but database update fails + - Manual deletion of worktree directories + - Filesystem corruption or disk full errors + + Args: + repository: Supabase repository instance to query current state + + Returns: + List of work order IDs that have missing worktrees + + Example: + >>> repository = SupabaseWorkOrderRepository() + >>> dangling = await find_dangling_state(repository) + >>> print(f"Found {len(dangling)} dangling state entries") + """ + worktree_base = Path(config.WORKTREE_BASE_DIR) + + # Get all work orders from database + work_orders = await repository.list() + + dangling = [] + for state, _ in work_orders: + worktree_path = worktree_base / state.sandbox_identifier + if not worktree_path.exists(): + dangling.append(state.agent_work_order_id) + + logger.info( + "dangling_state_found", + count=len(dangling), + dangling=dangling[:10], # Log first 10 to avoid spam + total_work_orders=len(work_orders), + ) + + return dangling + + +async def reconcile_state( + repository: SupabaseWorkOrderRepository, + fix: bool = False +) -> dict[str, Any]: + """Reconcile database state with filesystem. + + Detects both orphaned worktrees and dangling state. If fix=True, + will clean up orphaned worktrees and mark dangling state as failed. + + Args: + repository: Supabase repository instance + fix: If True, cleanup orphans and update dangling state. If False, dry-run only. 
+ + Returns: + Report dictionary with: + - orphaned_worktrees: List of orphaned worktree paths + - dangling_state: List of work order IDs with missing worktrees + - fix_applied: Whether fixes were applied + - actions_taken: List of action descriptions + + Example: + >>> # Dry run to see what would be fixed + >>> report = await reconcile_state(repository, fix=False) + >>> print(f"Found {len(report['orphaned_worktrees'])} orphans") + >>> + >>> # Actually fix issues + >>> report = await reconcile_state(repository, fix=True) + >>> for action in report['actions_taken']: + ... print(action) + """ + orphans = await find_orphaned_worktrees(repository) + dangling = await find_dangling_state(repository) + + actions: list[str] = [] + + if fix: + # Clean up orphaned worktrees + for orphan_path in orphans: + try: + shutil.rmtree(orphan_path) + actions.append(f"Deleted orphaned worktree: {orphan_path}") + logger.info("orphaned_worktree_deleted", path=orphan_path) + except Exception as e: + actions.append(f"Failed to delete {orphan_path}: {e}") + logger.error("orphaned_worktree_delete_failed", path=orphan_path, error=str(e), exc_info=True) + + # Update dangling state to mark as failed + for work_order_id in dangling: + try: + await repository.update_status( + work_order_id, + AgentWorkOrderStatus.FAILED, + error_message="Worktree missing - state/filesystem divergence detected during reconciliation" + ) + actions.append(f"Marked work order {work_order_id} as failed (worktree missing)") + logger.info("dangling_state_updated", work_order_id=work_order_id) + except Exception as e: + actions.append(f"Failed to update {work_order_id}: {e}") + logger.error("dangling_state_update_failed", work_order_id=work_order_id, error=str(e), exc_info=True) + + return { + "orphaned_worktrees": orphans, + "dangling_state": dangling, + "fix_applied": fix, + "actions_taken": actions, + }