Archon V4 - Massive Streamlit UI Overhaul for Admin Dashboard

This commit is contained in:
Cole Medin
2025-02-27 16:06:53 -06:00
parent 1b96168a30
commit 4e72bc77ce
32 changed files with 4531 additions and 292 deletions

.gitignore

@@ -5,4 +5,7 @@ venv
.langgraph_api
# Files
.env
.env.temp
.env.test
env_vars.json

.streamlit/config.toml

@@ -0,0 +1,6 @@
[client]
showErrorDetails = "none"
[theme]
primaryColor = "#FF69B4"
base = "dark"

README.md

@@ -6,8 +6,8 @@
<h3>🚀 **CURRENT VERSION** 🚀</h3>
**[ V3 - MCP Support ]**
*Using LangGraph + Pydantic AI with AI IDE integration*
**[ V4 - Massive Streamlit UI Overhaul ]**
*Comprehensive dashboard interface for managing Archon with Streamlit*
</div>
@@ -20,7 +20,7 @@ Through its iterative development, Archon showcases the power of planning, feedb
## Important Links
- The current version of Archon is V3 as mentioned above - see [V3 Documentation](iterations/v3-mcp-support/README.md) for details.
- The current version of Archon is V4 as mentioned above - see [V4 Documentation](iterations/v4-streamlit-ui-overhaul/README.md) for details.
- I **just** created the [Archon community](https://thinktank.ottomator.ai/c/archon/30) forum over in the oTTomator Think Tank! Please post any questions you have there!
@@ -34,6 +34,48 @@ Archon demonstrates three key principles in modern AI development:
2. **Domain Knowledge Integration**: Seamless embedding of frameworks like Pydantic AI and LangGraph within autonomous workflows
3. **Scalable Architecture**: Modular design supporting maintainability, cost optimization, and ethical AI practices
## Getting Started with V4 (current version)
Since V4 is the current version of Archon, all the code for V4 is in both the main directory and the `archon/iterations/v4-streamlit-ui-overhaul` directory.
### Prerequisites
- Python 3.11+
- Supabase account (for vector database)
- OpenAI/OpenRouter API key or Ollama for local LLMs
### Installation
1. Clone the repository:
```bash
git clone https://github.com/coleam00/archon.git
cd archon
```
2. Install dependencies:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
### Quick Start
1. Start the Streamlit UI:
```bash
streamlit run streamlit_ui.py
```
2. Follow the guided setup process in the Intro section of the Streamlit UI:
- **Environment**: Configure your API keys and model settings
- **Database**: Set up your Supabase vector database
- **Documentation**: Crawl and index the Pydantic AI documentation
- **Agent Service**: Start the agent service for generating agents
- **Chat**: Interact with Archon to create AI agents
- **MCP** (optional): Configure integration with AI IDEs
The Streamlit interface will guide you through each step with clear instructions and interactive elements.
There are quite a few steps in the setup, but it goes quickly!
## Project Evolution
### V1: Single-Agent Foundation
@@ -49,18 +91,27 @@ Archon demonstrates three key principles in modern AI development:
- Support for local LLMs via Ollama
- [Learn more about V2](iterations/v2-agentic-workflow/README.md)
### V3: Current - MCP Support
### V3: MCP Support
- Integration with AI IDEs like Windsurf and Cursor
- Automated file creation and dependency management
- FastAPI service for agent generation
- Improved project structure and organization
- [Learn more about V3](iterations/v3-mcp-support/README.md)
### V4: Current - Streamlit UI Overhaul
- Comprehensive Streamlit interface for managing all aspects of Archon
- Guided setup process with interactive tabs
- Environment variable management through the UI
- Database setup and documentation crawling simplified
- Agent service control and monitoring
- MCP configuration through the UI
- [Learn more about V4](iterations/v4-streamlit-ui-overhaul/README.md)
### Future Iterations
- V4: Self-Feedback Loop - Automated validation and error correction
- V5: Tool Library Integration - Pre-built external tool incorporation
- V6: Multi-Framework Support - Framework-agnostic agent generation
- V7: Autonomous Framework Learning - Self-updating framework adapters
- V5: Self-Feedback Loop - Automated validation and error correction
- V6: Tool Library Integration - Pre-built external tool incorporation
- V7: Multi-Framework Support - Framework-agnostic agent generation
- V8: Autonomous Framework Learning - Self-updating framework adapters
### Future Integrations
- Docker
@@ -68,128 +119,12 @@ Archon demonstrates three key principles in modern AI development:
- Other frameworks besides Pydantic AI
- Other vector databases besides Supabase
## Getting Started with V3 (current version)
Since V3 is the current version of Archon, all the code for V3 is in both the `archon` and `archon/iterations/v3-mcp-support` directories.
### Prerequisites
- Python 3.11+
- Supabase account and database
- OpenAI/OpenRouter API key or Ollama for local LLMs
- Streamlit (for web interface)
- Windsurf, Cursor, or another MCP-compatible AI IDE (optional)
### Installation
There are two ways to install Archon V3:
#### Option 1: Standard Installation (for Streamlit UI)
1. Clone the repository:
```bash
git clone https://github.com/coleam00/archon.git
cd archon
```
2. Install dependencies:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
#### Option 2: MCP Server Setup (for AI IDE integration)
1. Clone the repository as above
2. Run the MCP setup script:
```bash
python setup_mcp.py
```
For running the crawler and graph service later, activate the virtual environment too:
```bash
source venv/bin/activate # On Windows: venv\Scripts\activate
```
This will:
- Create a virtual environment if it doesn't exist
- Install dependencies from requirements.txt
- Generate an MCP configuration file
3. Configure your AI IDE:
- **In Windsurf**:
- Click on the hammer icon above the chat input
- Click on "Configure"
- Paste the JSON that `setup_mcp.py` gave you as the MCP config
- Click "Refresh" next to "Configure"
- **In Cursor**:
- Go to Cursor Settings > Features > MCP
- Click on "+ Add New MCP Server"
- Name: Archon
- Type: command (equivalent to stdio)
- Command: Paste the command that `setup_mcp.py` gave for Cursor
### Environment Setup
1. Configure environment:
- Rename `.env.example` to `.env`
- Edit `.env` with your settings:
```env
BASE_URL=https://api.openai.com/v1 for OpenAI, https://openrouter.ai/api/v1 for OpenRouter, or your Ollama URL
LLM_API_KEY=your_openai_or_openrouter_api_key
OPENAI_API_KEY=your_openai_api_key # Required for embeddings
SUPABASE_URL=your_supabase_url
SUPABASE_SERVICE_KEY=your_supabase_service_key
PRIMARY_MODEL=gpt-4o-mini # Main agent model
REASONER_MODEL=o3-mini # Planning model
```
### Quick Start
1. Set up the database:
- Execute `utils/site_pages.sql` in your Supabase SQL Editor
- This creates tables and enables vector similarity search
- See the Database Setup section for more details
2. Crawl documentation:
```bash
python archon/crawl_pydantic_ai_docs.py
```
3. Run Archon either as an MCP Server or with Streamlit:
### Using with AI IDEs (MCP Support)
1. After crawling the documentation, start the graph service:
```bash
python graph_service.py
```
Archon runs as a separate API endpoint for MCP rather than directly inside the MCP server, for two reasons: Archon can be updated without restarting the MCP server, and the MCP communication protocols seemed to interfere with LLM calls when they were made directly within the MCP server.
2. Restart the MCP server in your AI IDE
3. You can now ask your AI IDE to create agents with Archon
4. Be sure to specify when you want to use Archon - this isn't strictly necessary, but it helps a lot
### Using the Streamlit UI
For an interactive web interface:
```bash
streamlit run streamlit_ui.py
```
The interface will be available at `http://localhost:8501`.
## Architecture
### Core Files
- `mcp_server.py`: MCP server script for AI IDE integration
- `streamlit_ui.py`: Comprehensive web interface for managing all aspects of Archon
- `graph_service.py`: FastAPI service that handles the agentic workflow
- `setup_mcp.py`: MCP setup script
- `streamlit_ui.py`: Web interface with streaming support
- `mcp_server.py`: MCP server script for AI IDE integration
- `requirements.txt`: Project dependencies
### Archon Package
@@ -202,7 +137,7 @@ The interface will be available at `http://localhost:8501`
- `utils/`: Utility functions and database setup
- `utils.py`: Shared utility functions
- `site_pages.sql`: Database setup commands
- `site_pages_ollama.sql`: Database setup commands with vector dimensions updated for nomic-embed-text
- `env_vars.json`: Environment variables defined in the UI are stored here (included in .gitignore, file is created automatically)
### Database Setup
@@ -221,14 +156,7 @@ CREATE TABLE site_pages (
);
```
Execute the SQL commands in `utils/site_pages.sql` to:
1. Create the necessary tables
2. Enable vector similarity search
3. Set up Row Level Security policies
In Supabase, do this by going to the "SQL Editor" tab, pasting the SQL into the editor, and clicking "Run".
If using Ollama with the nomic-embed-text embedding model or another model with 768 dimensions, either update `site_pages.sql` so that the dimensions are 768 instead of 1536, or use `utils/ollama_site_pages.sql`.
The Streamlit UI provides an interface to set up this database structure automatically.
## Contributing
@@ -244,3 +172,4 @@ For version-specific details:
- [V1 Documentation](iterations/v1-single-agent/README.md)
- [V2 Documentation](iterations/v2-agentic-workflow/README.md)
- [V3 Documentation](iterations/v3-mcp-support/README.md)
- [V4 Documentation](iterations/v4-streamlit-ui-overhaul/README.md)


@@ -11,6 +11,7 @@ from supabase import Client
import logfire
import os
import sys
from utils.utils import get_env_var
# Import the message classes from Pydantic AI
from pydantic_ai.messages import (
@@ -28,16 +29,16 @@ load_dotenv()
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')
base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided')
base_url = get_env_var('BASE_URL') or 'https://api.openai.com/v1'
api_key = get_env_var('LLM_API_KEY') or 'no-llm-api-key-provided'
is_ollama = "localhost" in base_url.lower()
reasoner_llm_model = os.getenv('REASONER_MODEL', 'o3-mini')
reasoner_llm_model = get_env_var('REASONER_MODEL') or 'o3-mini'
reasoner = Agent(
OpenAIModel(reasoner_llm_model, base_url=base_url, api_key=api_key),
system_prompt='You are an expert at coding AI agents with Pydantic AI and defining the scope for doing so.',
)
primary_llm_model = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini')
primary_llm_model = get_env_var('PRIMARY_MODEL') or 'gpt-4o-mini'
router_agent = Agent(
OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key),
system_prompt='Your job is to route the user message either to the end of the conversation or to continue coding the AI agent.',
@@ -53,12 +54,15 @@ openai_client=None
if is_ollama:
openai_client = AsyncOpenAI(base_url=base_url,api_key=api_key)
else:
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
openai_client = AsyncOpenAI(api_key=get_env_var("OPENAI_API_KEY"))
supabase: Client = Client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
if get_env_var("SUPABASE_URL"):
supabase: Client = Client(
get_env_var("SUPABASE_URL"),
get_env_var("SUPABASE_SERVICE_KEY")
)
else:
supabase = None
# Define state schema
class AgentState(TypedDict):


@@ -1,14 +1,22 @@
import os
import sys
import json
import asyncio
import threading
import subprocess
import requests
import json
from typing import List, Dict, Any, Optional, Callable
from xml.etree import ElementTree
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timezone
from urllib.parse import urlparse
from dotenv import load_dotenv
import re
import html2text
# Add the parent directory to sys.path to allow importing from the parent directory
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.utils import get_env_var
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from openai import AsyncOpenAI
@@ -18,24 +26,31 @@ load_dotenv()
# Initialize OpenAI and Supabase clients
base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided')
base_url = get_env_var('BASE_URL') or 'https://api.openai.com/v1'
api_key = get_env_var('LLM_API_KEY') or 'no-llm-api-key-provided'
is_ollama = "localhost" in base_url.lower()
embedding_model = os.getenv('EMBEDDING_MODEL', 'text-embedding-3-small')
embedding_model = get_env_var('EMBEDDING_MODEL') or 'text-embedding-3-small'
openai_client=None
if is_ollama:
openai_client = AsyncOpenAI(base_url=base_url,api_key=api_key)
else:
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
openai_client = AsyncOpenAI(api_key=get_env_var("OPENAI_API_KEY"))
supabase: Client = create_client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
get_env_var("SUPABASE_URL"),
get_env_var("SUPABASE_SERVICE_KEY")
)
# Initialize HTML to Markdown converter
html_converter = html2text.HTML2Text()
html_converter.ignore_links = False
html_converter.ignore_images = False
html_converter.ignore_tables = False
html_converter.body_width = 0 # No wrapping
@dataclass
class ProcessedChunk:
url: str
@@ -46,6 +61,85 @@ class ProcessedChunk:
metadata: Dict[str, Any]
embedding: List[float]
class CrawlProgressTracker:
"""Class to track progress of the crawling process."""
def __init__(self,
progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None):
"""Initialize the progress tracker.
Args:
progress_callback: Function to call with progress updates
"""
self.progress_callback = progress_callback
self.urls_found = 0
self.urls_processed = 0
self.urls_succeeded = 0
self.urls_failed = 0
self.chunks_stored = 0
self.logs = []
self.is_running = False
self.start_time = None
self.end_time = None
def log(self, message: str):
"""Add a log message and update progress."""
timestamp = datetime.now().strftime("%H:%M:%S")
log_entry = f"[{timestamp}] {message}"
self.logs.append(log_entry)
print(message) # Also print to console
# Call the progress callback if provided
if self.progress_callback:
self.progress_callback(self.get_status())
def start(self):
"""Mark the crawling process as started."""
self.is_running = True
self.start_time = datetime.now()
self.log("Crawling process started")
# Call the progress callback if provided
if self.progress_callback:
self.progress_callback(self.get_status())
def complete(self):
"""Mark the crawling process as completed."""
self.is_running = False
self.end_time = datetime.now()
duration = self.end_time - self.start_time if self.start_time else None
duration_str = str(duration).split('.')[0] if duration else "unknown"
self.log(f"Crawling process completed in {duration_str}")
# Call the progress callback if provided
if self.progress_callback:
self.progress_callback(self.get_status())
def get_status(self) -> Dict[str, Any]:
"""Get the current status of the crawling process."""
return {
"is_running": self.is_running,
"urls_found": self.urls_found,
"urls_processed": self.urls_processed,
"urls_succeeded": self.urls_succeeded,
"urls_failed": self.urls_failed,
"chunks_stored": self.chunks_stored,
"progress_percentage": (self.urls_processed / self.urls_found * 100) if self.urls_found > 0 else 0,
"logs": self.logs,
"start_time": self.start_time,
"end_time": self.end_time
}
@property
def is_completed(self) -> bool:
"""Return True if the crawling process is completed."""
return not self.is_running and self.end_time is not None
@property
def is_successful(self) -> bool:
"""Return True if the crawling process completed successfully."""
return self.is_completed and self.urls_failed == 0 and self.urls_succeeded > 0
def chunk_text(text: str, chunk_size: int = 5000) -> List[str]:
"""Split text into chunks, respecting code blocks and paragraphs."""
chunks = []
@@ -101,7 +195,7 @@ async def get_title_and_summary(chunk: str, url: str) -> Dict[str, str]:
try:
response = await openai_client.chat.completions.create(
model=os.getenv("PRIMARY_MODEL", "gpt-4o-mini"),
model=get_env_var("PRIMARY_MODEL") or "gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"URL: {url}\n\nContent:\n{chunk[:1000]}..."} # Send first 1000 chars for context
@@ -171,11 +265,19 @@ async def insert_chunk(chunk: ProcessedChunk):
print(f"Error inserting chunk: {e}")
return None
async def process_and_store_document(url: str, markdown: str):
async def process_and_store_document(url: str, markdown: str, tracker: Optional[CrawlProgressTracker] = None):
"""Process a document and store its chunks in parallel."""
# Split into chunks
chunks = chunk_text(markdown)
if tracker:
tracker.log(f"Split document into {len(chunks)} chunks for {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Split document into {len(chunks)} chunks for {url}")
# Process chunks in parallel
tasks = [
process_chunk(chunk, i, url)
@@ -183,47 +285,119 @@ async def process_and_store_document(url: str, markdown: str):
]
processed_chunks = await asyncio.gather(*tasks)
if tracker:
tracker.log(f"Processed {len(processed_chunks)} chunks for {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Processed {len(processed_chunks)} chunks for {url}")
# Store chunks in parallel
insert_tasks = [
insert_chunk(chunk)
for chunk in processed_chunks
]
await asyncio.gather(*insert_tasks)
if tracker:
tracker.chunks_stored += len(processed_chunks)
tracker.log(f"Stored {len(processed_chunks)} chunks for {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Stored {len(processed_chunks)} chunks for {url}")
async def crawl_parallel(urls: List[str], max_concurrent: int = 5):
"""Crawl multiple URLs in parallel with a concurrency limit."""
browser_config = BrowserConfig(
headless=True,
verbose=False,
extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
)
crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
# Create the crawler instance
crawler = AsyncWebCrawler(config=browser_config)
await crawler.start()
def fetch_url_content(url: str) -> str:
"""Fetch content from a URL using requests and convert to markdown."""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
try:
# Create a semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrent)
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
async def process_url(url: str):
async with semaphore:
result = await crawler.arun(
url=url,
config=crawl_config,
session_id="session1"
)
if result.success:
print(f"Successfully crawled: {url}")
await process_and_store_document(url, result.markdown_v2.raw_markdown)
# Convert HTML to Markdown
markdown = html_converter.handle(response.text)
# Clean up the markdown
markdown = re.sub(r'\n{3,}', '\n\n', markdown) # Remove excessive newlines
return markdown
except Exception as e:
raise Exception(f"Error fetching {url}: {str(e)}")
async def crawl_parallel_with_requests(urls: List[str], tracker: Optional[CrawlProgressTracker] = None, max_concurrent: int = 5):
"""Crawl multiple URLs in parallel with a concurrency limit using direct HTTP requests."""
# Create a semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrent)
async def process_url(url: str):
async with semaphore:
if tracker:
tracker.log(f"Crawling: {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Crawling: {url}")
try:
# Use a thread pool to run the blocking HTTP request
loop = asyncio.get_running_loop()
if tracker:
tracker.log(f"Fetching content from: {url}")
else:
print(f"Failed: {url} - Error: {result.error_message}")
# Process all URLs in parallel with limited concurrency
await asyncio.gather(*[process_url(url) for url in urls])
finally:
await crawler.close()
print(f"Fetching content from: {url}")
markdown = await loop.run_in_executor(None, fetch_url_content, url)
if markdown:
if tracker:
tracker.urls_succeeded += 1
tracker.log(f"Successfully crawled: {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Successfully crawled: {url}")
await process_and_store_document(url, markdown, tracker)
else:
if tracker:
tracker.urls_failed += 1
tracker.log(f"Failed: {url} - No content retrieved")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Failed: {url} - No content retrieved")
except Exception as e:
if tracker:
tracker.urls_failed += 1
tracker.log(f"Error processing {url}: {str(e)}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Error processing {url}: {str(e)}")
finally:
if tracker:
tracker.urls_processed += 1
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
# Process all URLs in parallel with limited concurrency
if tracker:
tracker.log(f"Processing {len(urls)} URLs with concurrency {max_concurrent}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Processing {len(urls)} URLs with concurrency {max_concurrent}")
await asyncio.gather(*[process_url(url) for url in urls])
def get_pydantic_ai_docs_urls() -> List[str]:
"""Get URLs from Pydantic AI docs sitemap."""
@@ -254,18 +428,84 @@ async def clear_existing_records():
print(f"Error clearing existing records: {e}")
return None
async def main():
# Clear existing records first
await clear_existing_records()
# Get URLs from Pydantic AI docs
urls = get_pydantic_ai_docs_urls()
if not urls:
print("No URLs found to crawl")
return
print(f"Found {len(urls)} URLs to crawl")
await crawl_parallel(urls)
async def main_with_requests(tracker: Optional[CrawlProgressTracker] = None):
"""Main function using direct HTTP requests instead of browser automation."""
try:
# Start tracking if tracker is provided
if tracker:
tracker.start()
else:
print("Starting crawling process...")
# Clear existing records first
if tracker:
tracker.log("Clearing existing Pydantic AI docs records...")
else:
print("Clearing existing Pydantic AI docs records...")
await clear_existing_records()
if tracker:
tracker.log("Existing records cleared")
else:
print("Existing records cleared")
# Get URLs from Pydantic AI docs
if tracker:
tracker.log("Fetching URLs from Pydantic AI sitemap...")
else:
print("Fetching URLs from Pydantic AI sitemap...")
urls = get_pydantic_ai_docs_urls()
if not urls:
if tracker:
tracker.log("No URLs found to crawl")
tracker.complete()
else:
print("No URLs found to crawl")
return
if tracker:
tracker.urls_found = len(urls)
tracker.log(f"Found {len(urls)} URLs to crawl")
else:
print(f"Found {len(urls)} URLs to crawl")
# Crawl the URLs using direct HTTP requests
await crawl_parallel_with_requests(urls, tracker)
# Mark as complete if tracker is provided
if tracker:
tracker.complete()
else:
print("Crawling process completed")
except Exception as e:
if tracker:
tracker.log(f"Error in crawling process: {str(e)}")
tracker.complete()
else:
print(f"Error in crawling process: {str(e)}")
if __name__ == "__main__":
asyncio.run(main())
def start_crawl_with_requests(progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None) -> CrawlProgressTracker:
"""Start the crawling process using direct HTTP requests in a separate thread and return the tracker."""
tracker = CrawlProgressTracker(progress_callback)
def run_crawl():
try:
asyncio.run(main_with_requests(tracker))
except Exception as e:
print(f"Error in crawl thread: {e}")
tracker.log(f"Thread error: {str(e)}")
tracker.complete()
# Start the crawling process in a separate thread
thread = threading.Thread(target=run_crawl)
thread.daemon = True
thread.start()
return tracker
if __name__ == "__main__":
# Run the main function directly
print("Starting crawler...")
asyncio.run(main_with_requests())
print("Crawler finished.")


@@ -6,20 +6,26 @@ import logfire
import asyncio
import httpx
import os
import sys
import json
from typing import Dict, Any, List, Optional
from pydantic import BaseModel
from pydantic_ai import Agent, ModelRetry, RunContext
from pydantic_ai.models.openai import OpenAIModel
from openai import AsyncOpenAI
from supabase import Client
from typing import List
from utils.utils import get_env_var
# Add the parent directory to sys.path to allow importing from the parent directory
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
load_dotenv()
llm = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini')
base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided')
llm = get_env_var('PRIMARY_MODEL') or 'gpt-4o-mini'
base_url = get_env_var('BASE_URL') or 'https://api.openai.com/v1'
api_key = get_env_var('LLM_API_KEY') or 'no-llm-api-key-provided'
model = OpenAIModel(llm, base_url=base_url, api_key=api_key)
embedding_model = os.getenv('EMBEDDING_MODEL', 'text-embedding-3-small')
embedding_model = get_env_var('EMBEDDING_MODEL') or 'text-embedding-3-small'
logfire.configure(send_to_logfire='if-token-present')


@@ -4,7 +4,7 @@ from typing import Optional, Dict, Any
from archon.archon_graph import agentic_flow
from langgraph.types import Command
from utils.utils import write_to_log
app = FastAPI()
class InvokeRequest(BaseModel):
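The service exposes the agentic workflow over HTTP for the MCP server to call. As a hypothetical client sketch for context - the route name, port, and payload keys below are assumptions mirroring the `InvokeRequest` model, not values taken from the file:
```python
import requests

# Hypothetical call; check graph_service.py for the actual route, port, and fields.
resp = requests.post(
    "http://localhost:8100/invoke",
    json={
        "message": "Build me an AI agent that can search the web",
        "thread_id": "demo-thread",
    },
    timeout=300,
)
print(resp.json())
```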


@@ -0,0 +1,41 @@
# Base URL for the OpenAI instance (default is https://api.openai.com/v1)
# OpenAI: https://api.openai.com/v1
# Ollama (example): http://localhost:11434/v1
# OpenRouter: https://openrouter.ai/api/v1
BASE_URL=
# For OpenAI: https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key
# For OpenRouter: https://openrouter.ai/keys
# For Ollama, no need to set this unless you specifically configured an API key
LLM_API_KEY=
# Get your Open AI API Key by following these instructions -
# https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key
# Even if using OpenRouter, you still need to set this for the embedding model.
# No need to set this if using Ollama.
OPENAI_API_KEY=
# For the Supabase version (sample_supabase_agent.py), set your Supabase URL and Service Key.
# Get your SUPABASE_URL from the API section of your Supabase project settings -
# https://supabase.com/dashboard/project/<your project ID>/settings/api
SUPABASE_URL=
# Get your SUPABASE_SERVICE_KEY from the API section of your Supabase project settings -
# https://supabase.com/dashboard/project/<your project ID>/settings/api
# On this page it is called the service_role secret.
SUPABASE_SERVICE_KEY=
# The LLM you want to use for the reasoner (o3-mini, R1, QwQ, etc.).
# Example: o3-mini
# Example: deepseek-r1:7b-8k
REASONER_MODEL=
# The LLM you want to use for the primary agent/coder.
# Example: gpt-4o-mini
# Example: qwen2.5:14b-instruct-8k
PRIMARY_MODEL=
# Embedding model you want to use
# Example for Ollama: nomic-embed-text
# Example for OpenAI: text-embedding-3-small
EMBEDDING_MODEL=


@@ -0,0 +1,2 @@
# Auto detect text files and perform LF normalization
* text=auto


@@ -0,0 +1,11 @@
# Folders
workbench
__pycache__
venv
.langgraph_api
# Files
.env
.env.temp
.env.test
env_vars.json


@@ -0,0 +1,6 @@
[client]
showErrorDetails = "none"
[theme]
primaryColor = "#FF69B4"
base = "dark"


@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 oTTomator and Archon contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


@@ -0,0 +1,142 @@
# Archon V4 - Streamlit UI Overhaul
This is the fourth iteration of the Archon project, building upon V3 by adding a comprehensive Streamlit UI for managing all aspects of Archon. The system retains the core LangGraph workflow and MCP support from V3, but now provides a unified interface for environment configuration, database setup, documentation crawling, agent service management, and MCP integration.
What makes V4 special is its guided setup process that walks users through each step of configuring and running Archon. The Streamlit UI removes the need to configure environment variables, set up the database, and manage services by hand, making Archon much more accessible to users without extensive technical knowledge.
The core remains an intelligent documentation crawler and RAG (Retrieval-Augmented Generation) system built using Pydantic AI, LangGraph, and Supabase. The system crawls the Pydantic AI documentation, stores content in a vector database, and provides Pydantic AI agent code by retrieving and analyzing relevant documentation chunks.
This version continues to support both local LLMs with Ollama and cloud-based LLMs through OpenAI/OpenRouter.
## Features
- Comprehensive Streamlit UI with multiple tabs for different functions
- Guided setup process with interactive instructions
- Environment variable management through the UI
- Database setup and configuration simplified
- Documentation crawling with progress tracking
- Agent service control and monitoring
- MCP configuration through the UI
- Multi-agent workflow using LangGraph
- Specialized agents for reasoning, routing, and coding
- Pydantic AI documentation crawling and chunking
- Vector database storage with Supabase
- Semantic search using OpenAI embeddings (a sketch follows this list)
- RAG-based question answering
- Support for code block preservation
- MCP server support for AI IDE integration
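To make the semantic search step concrete, here is a hedged sketch: embed the query with the configured embedding model, then ask Supabase for the nearest chunks. The `match_site_pages` RPC name and its parameters are assumptions standing in for whatever similarity-search function `utils/site_pages.sql` creates:
```python
from openai import AsyncOpenAI
from supabase import create_client

openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
supabase = create_client("<your SUPABASE_URL>", "<your SUPABASE_SERVICE_KEY>")

async def search_docs(query: str, match_count: int = 5):
    response = await openai_client.embeddings.create(
        model="text-embedding-3-small", input=query)
    query_embedding = response.data[0].embedding
    # "match_site_pages" is a hypothetical name for the SQL similarity function
    result = supabase.rpc("match_site_pages", {
        "query_embedding": query_embedding,
        "match_count": match_count,
    }).execute()
    return [(row["title"], row["url"]) for row in result.data]
```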
## Prerequisites
- Python 3.11+
- Supabase account (for vector database)
- OpenAI/OpenRouter API key or Ollama for local LLMs
## Installation
1. Clone the repository:
```bash
git clone https://github.com/coleam00/archon.git
cd archon
```
2. Install dependencies:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
## Usage
Start the Streamlit UI:
```bash
streamlit run streamlit_ui.py
```
The interface will be available at `http://localhost:8501`.
### Streamlit UI Tabs
The Streamlit UI provides the following tabs:
1. **Intro**: Overview and guided setup process
2. **Environment**: Configure API keys and model settings
3. **Database**: Set up your Supabase vector database
4. **Documentation**: Crawl and index the Pydantic AI documentation
5. **Agent Service**: Start and monitor the agent service
6. **Chat**: Interact with Archon to create AI agents
7. **MCP**: Configure integration with AI IDEs
### Environment Configuration
The Environment tab allows you to set and manage all environment variables through the UI:
- Base URL for API endpoints
- API keys for LLM providers
- Supabase connection details
- Model selections for different agent roles
- Embedding model configuration
All settings are saved to an `env_vars.json` file, which is automatically loaded when Archon starts.
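The rest of the codebase reads settings through `utils.utils.get_env_var`, typically as `get_env_var('BASE_URL') or '<default>'`. A minimal sketch of how such a helper could work - assuming `env_vars.json` is a flat JSON object of name/value pairs stored next to `utils.py`; the actual implementation may differ:
```python
import json
import os

# Assumed location: the UI writes this file, and .gitignore excludes it.
ENV_VARS_PATH = os.path.join(os.path.dirname(__file__), "env_vars.json")

def get_env_var(name: str):
    """Prefer values saved from the UI, then fall back to the process environment."""
    if os.path.exists(ENV_VARS_PATH):
        with open(ENV_VARS_PATH, "r") as f:
            env_vars = json.load(f)
        if env_vars.get(name):
            return env_vars[name]
    return os.environ.get(name)
```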
### Database Setup
The Database tab simplifies the process of setting up your Supabase database:
- Select embedding dimensions based on your model
- View SQL commands for table creation
- Get instructions for executing SQL in Supabase
- Clear existing data if needed
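The dimension choice boils down to a small mapping. The values below come from the models mentioned in this README; the exact UI logic is an assumption:
```python
# 1536 for OpenAI's text-embedding-3-small, 768 for Ollama's nomic-embed-text.
EMBEDDING_DIMENSIONS = {
    "text-embedding-3-small": 1536,
    "nomic-embed-text": 768,
}

def vector_dimensions(model_name: str) -> int:
    return EMBEDDING_DIMENSIONS.get(model_name, 1536)
```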
### Documentation Management
The Documentation tab provides an interface for crawling and managing documentation:
- Start and monitor the crawling process with progress tracking
- View logs of the crawling process
- Clear existing documentation
- View database statistics
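Outside the UI, the same crawl can be started and monitored programmatically with `start_crawl_with_requests`, which runs the crawl in a background thread and returns its tracker. A minimal polling sketch:
```python
import time

from archon.crawl_pydantic_ai_docs import start_crawl_with_requests

tracker = start_crawl_with_requests()  # daemon thread; returns immediately
while not tracker.is_completed:
    status = tracker.get_status()
    print(f"{status['urls_processed']}/{status['urls_found']} URLs, "
          f"{status['chunks_stored']} chunks stored")
    time.sleep(5)
print("Success" if tracker.is_successful else "Finished with failures")
```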
### Agent Service Control
The Agent Service tab allows you to manage the agent service:
- Start, restart, and stop the service
- Monitor service output in real-time
- Clear output logs
- Auto-refresh for continuous monitoring
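Conceptually, this tab wraps `graph_service.py` in a managed child process. A rough sketch of the pattern - the actual Streamlit implementation may differ:
```python
import subprocess
import sys

# Launch the FastAPI agent service as a child process.
service = subprocess.Popen(
    [sys.executable, "graph_service.py"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
)
# Stream its output (e.g., into a UI log panel)...
for line in service.stdout:
    print(line, end="")
# ...and call service.terminate() to stop or restart it.
```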
### MCP Configuration
The MCP tab simplifies the process of configuring MCP for AI IDEs:
- Select your IDE (Windsurf, Cursor, or Cline)
- Generate configuration commands or JSON
- Copy configuration to clipboard
- Get step-by-step instructions for your specific IDE
## Project Structure
### Core Files
- `streamlit_ui.py`: Comprehensive web interface for managing all aspects of Archon
- `graph_service.py`: FastAPI service that handles the agentic workflow
- `mcp_server.py`: MCP server script for AI IDE integration
- `requirements.txt`: Project dependencies
### Archon Package
- `archon/`: Core agent and workflow implementation
- `archon_graph.py`: LangGraph workflow definition and agent coordination
- `pydantic_ai_coder.py`: Main coding agent with RAG capabilities
- `crawl_pydantic_ai_docs.py`: Documentation crawler and processor
### Utilities
- `utils/`: Utility functions and database setup
- `utils.py`: Shared utility functions
- `site_pages.sql`: Database setup commands
- `env_vars.json`: Environment variables defined in the UI
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.


@@ -0,0 +1,216 @@
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated, List, Any
from langgraph.config import get_stream_writer
from langgraph.types import interrupt
from dotenv import load_dotenv
from openai import AsyncOpenAI
from supabase import Client
import logfire
import os
import sys
from utils.utils import get_env_var
# Import the message classes from Pydantic AI
from pydantic_ai.messages import (
ModelMessage,
ModelMessagesTypeAdapter
)
# Add the parent directory to Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from archon.pydantic_ai_coder import pydantic_ai_coder, PydanticAIDeps, list_documentation_pages_helper
# Load environment variables
load_dotenv()
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')
base_url = get_env_var('BASE_URL') or 'https://api.openai.com/v1'
api_key = get_env_var('LLM_API_KEY') or 'no-llm-api-key-provided'
is_ollama = "localhost" in base_url.lower()
reasoner_llm_model = get_env_var('REASONER_MODEL') or 'o3-mini'
reasoner = Agent(
OpenAIModel(reasoner_llm_model, base_url=base_url, api_key=api_key),
system_prompt='You are an expert at coding AI agents with Pydantic AI and defining the scope for doing so.',
)
primary_llm_model = get_env_var('PRIMARY_MODEL') or 'gpt-4o-mini'
router_agent = Agent(
OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key),
system_prompt='Your job is to route the user message either to the end of the conversation or to continue coding the AI agent.',
)
end_conversation_agent = Agent(
OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key),
system_prompt='Your job is to end a conversation for creating an AI agent by giving instructions for how to execute the agent and then saying a nice goodbye to the user.',
)
openai_client=None
if is_ollama:
openai_client = AsyncOpenAI(base_url=base_url,api_key=api_key)
else:
openai_client = AsyncOpenAI(api_key=get_env_var("OPENAI_API_KEY"))
if get_env_var("SUPABASE_URL"):
supabase: Client = Client(
get_env_var("SUPABASE_URL"),
get_env_var("SUPABASE_SERVICE_KEY")
)
else:
supabase = None
# Define state schema
class AgentState(TypedDict):
latest_user_message: str
messages: Annotated[List[bytes], lambda x, y: x + y]  # reducer: concatenates each turn's serialized messages
scope: str
# Scope Definition Node with Reasoner LLM
async def define_scope_with_reasoner(state: AgentState):
# First, get the documentation pages so the reasoner can decide which ones are necessary
documentation_pages = await list_documentation_pages_helper(supabase)
documentation_pages_str = "\n".join(documentation_pages)
# Then, use the reasoner to define the scope
prompt = f"""
User AI Agent Request: {state['latest_user_message']}
Create detailed scope document for the AI agent including:
- Architecture diagram
- Core components
- External dependencies
- Testing strategy
Also based on these documentation pages available:
{documentation_pages_str}
Include a list of documentation pages that are relevant to creating this agent for the user in the scope document.
"""
result = await reasoner.run(prompt)
scope = result.data
# Get the directory one level up from the current file
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
scope_path = os.path.join(parent_dir, "workbench", "scope.md")
os.makedirs(os.path.join(parent_dir, "workbench"), exist_ok=True)
with open(scope_path, "w", encoding="utf-8") as f:
f.write(scope)
return {"scope": scope}
# Coding Node with Feedback Handling
async def coder_agent(state: AgentState, writer):
# Prepare dependencies
deps = PydanticAIDeps(
supabase=supabase,
openai_client=openai_client,
reasoner_output=state['scope']
)
# Get the message history into the format for Pydantic AI
message_history: list[ModelMessage] = []
for message_row in state['messages']:
message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row))
# Run the agent in a stream
if is_ollama:
writer = get_stream_writer()
result = await pydantic_ai_coder.run(state['latest_user_message'], deps=deps, message_history= message_history)
writer(result.data)
else:
async with pydantic_ai_coder.run_stream(
state['latest_user_message'],
deps=deps,
message_history= message_history
) as result:
# Stream partial text as it arrives
async for chunk in result.stream_text(delta=True):
writer(chunk)
# print(ModelMessagesTypeAdapter.validate_json(result.new_messages_json()))
return {"messages": [result.new_messages_json()]}
# Interrupt the graph to get the user's next message
def get_next_user_message(state: AgentState):
value = interrupt({})
# Set the user's latest message for the LLM to continue the conversation
return {
"latest_user_message": value
}
# Determine if the user is finished creating their AI agent or not
async def route_user_message(state: AgentState):
prompt = f"""
The user has sent a message:
{state['latest_user_message']}
If the user wants to end the conversation, respond with just the text "finish_conversation".
If the user wants to continue coding the AI agent, respond with just the text "coder_agent".
"""
result = await router_agent.run(prompt)
next_action = result.data
if next_action == "finish_conversation":
return "finish_conversation"
else:
return "coder_agent"
# End of conversation agent to give instructions for executing the agent
async def finish_conversation(state: AgentState, writer):
# Get the message history into the format for Pydantic AI
message_history: list[ModelMessage] = []
for message_row in state['messages']:
message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row))
# Run the agent in a stream
if is_ollama:
writer = get_stream_writer()
result = await end_conversation_agent.run(state['latest_user_message'], message_history= message_history)
writer(result.data)
else:
async with end_conversation_agent.run_stream(
state['latest_user_message'],
message_history= message_history
) as result:
# Stream partial text as it arrives
async for chunk in result.stream_text(delta=True):
writer(chunk)
return {"messages": [result.new_messages_json()]}
# Build workflow
builder = StateGraph(AgentState)
# Add nodes
builder.add_node("define_scope_with_reasoner", define_scope_with_reasoner)
builder.add_node("coder_agent", coder_agent)
builder.add_node("get_next_user_message", get_next_user_message)
builder.add_node("finish_conversation", finish_conversation)
# Set edges
builder.add_edge(START, "define_scope_with_reasoner")
builder.add_edge("define_scope_with_reasoner", "coder_agent")
builder.add_edge("coder_agent", "get_next_user_message")
builder.add_conditional_edges(
"get_next_user_message",
route_user_message,
{"coder_agent": "coder_agent", "finish_conversation": "finish_conversation"}
)
builder.add_edge("finish_conversation", END)
# Configure persistence
memory = MemorySaver()
agentic_flow = builder.compile(checkpointer=memory)
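`graph_service.py` drives this compiled graph over HTTP, but it can also be invoked directly. In the sketch below, the input keys follow `AgentState`, `stream_mode="custom"` surfaces whatever the nodes' `writer` emits, and `Command(resume=...)` answers the `interrupt()` in `get_next_user_message`; the thread ID and messages are illustrative:
```python
import asyncio

from langgraph.types import Command

from archon.archon_graph import agentic_flow

async def demo():
    config = {"configurable": {"thread_id": "demo-thread"}}  # any unique ID
    initial_state = {
        "latest_user_message": "Build me an agent that fetches weather data",
        "messages": [],
        "scope": "",
    }
    # Runs until get_next_user_message interrupts to wait for user input.
    async for chunk in agentic_flow.astream(initial_state, config, stream_mode="custom"):
        print(chunk, end="")
    # Resume the interrupted graph with the user's follow-up message.
    async for chunk in agentic_flow.astream(
            Command(resume="That looks great - we're done!"), config, stream_mode="custom"):
        print(chunk, end="")

asyncio.run(demo())
```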


@@ -0,0 +1,511 @@
import os
import sys
import asyncio
import threading
import subprocess
import requests
import json
from typing import List, Dict, Any, Optional, Callable
from xml.etree import ElementTree
from dataclasses import dataclass
from datetime import datetime, timezone
from urllib.parse import urlparse
from dotenv import load_dotenv
import re
import html2text
# Add the parent directory to sys.path to allow importing from the parent directory
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.utils import get_env_var
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from openai import AsyncOpenAI
from supabase import create_client, Client
load_dotenv()
# Initialize OpenAI and Supabase clients
base_url = get_env_var('BASE_URL') or 'https://api.openai.com/v1'
api_key = get_env_var('LLM_API_KEY') or 'no-llm-api-key-provided'
is_ollama = "localhost" in base_url.lower()
embedding_model = get_env_var('EMBEDDING_MODEL') or 'text-embedding-3-small'
openai_client=None
if is_ollama:
openai_client = AsyncOpenAI(base_url=base_url,api_key=api_key)
else:
openai_client = AsyncOpenAI(api_key=get_env_var("OPENAI_API_KEY"))
supabase: Client = create_client(
get_env_var("SUPABASE_URL"),
get_env_var("SUPABASE_SERVICE_KEY")
)
# Initialize HTML to Markdown converter
html_converter = html2text.HTML2Text()
html_converter.ignore_links = False
html_converter.ignore_images = False
html_converter.ignore_tables = False
html_converter.body_width = 0 # No wrapping
@dataclass
class ProcessedChunk:
url: str
chunk_number: int
title: str
summary: str
content: str
metadata: Dict[str, Any]
embedding: List[float]
class CrawlProgressTracker:
"""Class to track progress of the crawling process."""
def __init__(self,
progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None):
"""Initialize the progress tracker.
Args:
progress_callback: Function to call with progress updates
"""
self.progress_callback = progress_callback
self.urls_found = 0
self.urls_processed = 0
self.urls_succeeded = 0
self.urls_failed = 0
self.chunks_stored = 0
self.logs = []
self.is_running = False
self.start_time = None
self.end_time = None
def log(self, message: str):
"""Add a log message and update progress."""
timestamp = datetime.now().strftime("%H:%M:%S")
log_entry = f"[{timestamp}] {message}"
self.logs.append(log_entry)
print(message) # Also print to console
# Call the progress callback if provided
if self.progress_callback:
self.progress_callback(self.get_status())
def start(self):
"""Mark the crawling process as started."""
self.is_running = True
self.start_time = datetime.now()
self.log("Crawling process started")
# Call the progress callback if provided
if self.progress_callback:
self.progress_callback(self.get_status())
def complete(self):
"""Mark the crawling process as completed."""
self.is_running = False
self.end_time = datetime.now()
duration = self.end_time - self.start_time if self.start_time else None
duration_str = str(duration).split('.')[0] if duration else "unknown"
self.log(f"Crawling process completed in {duration_str}")
# Call the progress callback if provided
if self.progress_callback:
self.progress_callback(self.get_status())
def get_status(self) -> Dict[str, Any]:
"""Get the current status of the crawling process."""
return {
"is_running": self.is_running,
"urls_found": self.urls_found,
"urls_processed": self.urls_processed,
"urls_succeeded": self.urls_succeeded,
"urls_failed": self.urls_failed,
"chunks_stored": self.chunks_stored,
"progress_percentage": (self.urls_processed / self.urls_found * 100) if self.urls_found > 0 else 0,
"logs": self.logs,
"start_time": self.start_time,
"end_time": self.end_time
}
@property
def is_completed(self) -> bool:
"""Return True if the crawling process is completed."""
return not self.is_running and self.end_time is not None
@property
def is_successful(self) -> bool:
"""Return True if the crawling process completed successfully."""
return self.is_completed and self.urls_failed == 0 and self.urls_succeeded > 0
def chunk_text(text: str, chunk_size: int = 5000) -> List[str]:
"""Split text into chunks, respecting code blocks and paragraphs."""
chunks = []
start = 0
text_length = len(text)
while start < text_length:
# Calculate end position
end = start + chunk_size
# If we're at the end of the text, just take what's left
if end >= text_length:
chunks.append(text[start:].strip())
break
# Try to find a code block boundary first (```)
chunk = text[start:end]
code_block = chunk.rfind('```')
if code_block != -1 and code_block > chunk_size * 0.3:
end = start + code_block
# If no code block, try to break at a paragraph
elif '\n\n' in chunk:
# Find the last paragraph break
last_break = chunk.rfind('\n\n')
if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
end = start + last_break
# If no paragraph break, try to break at a sentence
elif '. ' in chunk:
# Find the last sentence break
last_period = chunk.rfind('. ')
if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
end = start + last_period + 1
# Extract chunk and clean it up
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
# Move start position for next chunk
start = max(start + 1, end)
return chunks
async def get_title_and_summary(chunk: str, url: str) -> Dict[str, str]:
"""Extract title and summary using GPT-4."""
system_prompt = """You are an AI that extracts titles and summaries from documentation chunks.
Return a JSON object with 'title' and 'summary' keys.
For the title: If this seems like the start of a document, extract its title. If it's a middle chunk, derive a descriptive title.
For the summary: Create a concise summary of the main points in this chunk.
Keep both title and summary concise but informative."""
try:
response = await openai_client.chat.completions.create(
model=get_env_var("PRIMARY_MODEL") or "gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"URL: {url}\n\nContent:\n{chunk[:1000]}..."} # Send first 1000 chars for context
],
response_format={ "type": "json_object" }
)
return json.loads(response.choices[0].message.content)
except Exception as e:
print(f"Error getting title and summary: {e}")
return {"title": "Error processing title", "summary": "Error processing summary"}
async def get_embedding(text: str) -> List[float]:
"""Get embedding vector from OpenAI."""
try:
response = await openai_client.embeddings.create(
model= embedding_model,
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting embedding: {e}")
return [0] * 1536 # Return zero vector on error
async def process_chunk(chunk: str, chunk_number: int, url: str) -> ProcessedChunk:
"""Process a single chunk of text."""
# Get title and summary
extracted = await get_title_and_summary(chunk, url)
# Get embedding
embedding = await get_embedding(chunk)
# Create metadata
metadata = {
"source": "pydantic_ai_docs",
"chunk_size": len(chunk),
"crawled_at": datetime.now(timezone.utc).isoformat(),
"url_path": urlparse(url).path
}
return ProcessedChunk(
url=url,
chunk_number=chunk_number,
title=extracted['title'],
summary=extracted['summary'],
content=chunk, # Store the original chunk content
metadata=metadata,
embedding=embedding
)
async def insert_chunk(chunk: ProcessedChunk):
"""Insert a processed chunk into Supabase."""
try:
data = {
"url": chunk.url,
"chunk_number": chunk.chunk_number,
"title": chunk.title,
"summary": chunk.summary,
"content": chunk.content,
"metadata": chunk.metadata,
"embedding": chunk.embedding
}
result = supabase.table("site_pages").insert(data).execute()
print(f"Inserted chunk {chunk.chunk_number} for {chunk.url}")
return result
except Exception as e:
print(f"Error inserting chunk: {e}")
return None
async def process_and_store_document(url: str, markdown: str, tracker: Optional[CrawlProgressTracker] = None):
"""Process a document and store its chunks in parallel."""
# Split into chunks
chunks = chunk_text(markdown)
if tracker:
tracker.log(f"Split document into {len(chunks)} chunks for {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Split document into {len(chunks)} chunks for {url}")
# Process chunks in parallel
tasks = [
process_chunk(chunk, i, url)
for i, chunk in enumerate(chunks)
]
processed_chunks = await asyncio.gather(*tasks)
if tracker:
tracker.log(f"Processed {len(processed_chunks)} chunks for {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Processed {len(processed_chunks)} chunks for {url}")
# Store chunks in parallel
insert_tasks = [
insert_chunk(chunk)
for chunk in processed_chunks
]
await asyncio.gather(*insert_tasks)
if tracker:
tracker.chunks_stored += len(processed_chunks)
tracker.log(f"Stored {len(processed_chunks)} chunks for {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Stored {len(processed_chunks)} chunks for {url}")
def fetch_url_content(url: str) -> str:
"""Fetch content from a URL using requests and convert to markdown."""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
# Convert HTML to Markdown
markdown = html_converter.handle(response.text)
# Clean up the markdown
markdown = re.sub(r'\n{3,}', '\n\n', markdown) # Remove excessive newlines
return markdown
except Exception as e:
raise Exception(f"Error fetching {url}: {str(e)}")
async def crawl_parallel_with_requests(urls: List[str], tracker: Optional[CrawlProgressTracker] = None, max_concurrent: int = 5):
"""Crawl multiple URLs in parallel with a concurrency limit using direct HTTP requests."""
# Create a semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrent)
async def process_url(url: str):
async with semaphore:
if tracker:
tracker.log(f"Crawling: {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Crawling: {url}")
try:
# Use a thread pool to run the blocking HTTP request
loop = asyncio.get_running_loop()
if tracker:
tracker.log(f"Fetching content from: {url}")
else:
print(f"Fetching content from: {url}")
markdown = await loop.run_in_executor(None, fetch_url_content, url)
if markdown:
if tracker:
tracker.urls_succeeded += 1
tracker.log(f"Successfully crawled: {url}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Successfully crawled: {url}")
await process_and_store_document(url, markdown, tracker)
else:
if tracker:
tracker.urls_failed += 1
tracker.log(f"Failed: {url} - No content retrieved")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Failed: {url} - No content retrieved")
except Exception as e:
if tracker:
tracker.urls_failed += 1
tracker.log(f"Error processing {url}: {str(e)}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Error processing {url}: {str(e)}")
finally:
if tracker:
tracker.urls_processed += 1
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
# Process all URLs in parallel with limited concurrency
if tracker:
tracker.log(f"Processing {len(urls)} URLs with concurrency {max_concurrent}")
# Ensure UI gets updated
if tracker.progress_callback:
tracker.progress_callback(tracker.get_status())
else:
print(f"Processing {len(urls)} URLs with concurrency {max_concurrent}")
await asyncio.gather(*[process_url(url) for url in urls])
def get_pydantic_ai_docs_urls() -> List[str]:
"""Get URLs from Pydantic AI docs sitemap."""
sitemap_url = "https://ai.pydantic.dev/sitemap.xml"
try:
response = requests.get(sitemap_url)
response.raise_for_status()
# Parse the XML
root = ElementTree.fromstring(response.content)
# Extract all URLs from the sitemap
namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
urls = [loc.text for loc in root.findall('.//ns:loc', namespace)]
return urls
except Exception as e:
print(f"Error fetching sitemap: {e}")
return []
async def clear_existing_records():
"""Clear all existing records with source='pydantic_ai_docs' from the site_pages table."""
try:
result = supabase.table("site_pages").delete().eq("metadata->>source", "pydantic_ai_docs").execute()
print("Cleared existing pydantic_ai_docs records from site_pages")
return result
except Exception as e:
print(f"Error clearing existing records: {e}")
return None
async def main_with_requests(tracker: Optional[CrawlProgressTracker] = None):
"""Main function using direct HTTP requests instead of browser automation."""
try:
# Start tracking if tracker is provided
if tracker:
tracker.start()
else:
print("Starting crawling process...")
# Clear existing records first
if tracker:
tracker.log("Clearing existing Pydantic AI docs records...")
else:
print("Clearing existing Pydantic AI docs records...")
await clear_existing_records()
if tracker:
tracker.log("Existing records cleared")
else:
print("Existing records cleared")
# Get URLs from Pydantic AI docs
if tracker:
tracker.log("Fetching URLs from Pydantic AI sitemap...")
else:
print("Fetching URLs from Pydantic AI sitemap...")
urls = get_pydantic_ai_docs_urls()
if not urls:
if tracker:
tracker.log("No URLs found to crawl")
tracker.complete()
else:
print("No URLs found to crawl")
return
if tracker:
tracker.urls_found = len(urls)
tracker.log(f"Found {len(urls)} URLs to crawl")
else:
print(f"Found {len(urls)} URLs to crawl")
# Crawl the URLs using direct HTTP requests
await crawl_parallel_with_requests(urls, tracker)
# Mark as complete if tracker is provided
if tracker:
tracker.complete()
else:
print("Crawling process completed")
except Exception as e:
if tracker:
tracker.log(f"Error in crawling process: {str(e)}")
tracker.complete()
else:
print(f"Error in crawling process: {str(e)}")
def start_crawl_with_requests(progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None) -> CrawlProgressTracker:
"""Start the crawling process using direct HTTP requests in a separate thread and return the tracker."""
tracker = CrawlProgressTracker(progress_callback)
def run_crawl():
try:
asyncio.run(main_with_requests(tracker))
except Exception as e:
print(f"Error in crawl thread: {e}")
tracker.log(f"Thread error: {str(e)}")
tracker.complete()
# Start the crawling process in a separate thread
thread = threading.Thread(target=run_crawl)
thread.daemon = True
thread.start()
return tracker
if __name__ == "__main__":
# Run the main function directly
print("Starting crawler...")
asyncio.run(main_with_requests())
print("Crawler finished.")


@@ -0,0 +1,7 @@
{
"dependencies": ["."],
"graphs": {
"agent": "./archon_graph.py:agentic_flow"
},
"env": "../.env"
}


@@ -0,0 +1,228 @@
from __future__ import annotations as _annotations
from dataclasses import dataclass
from dotenv import load_dotenv
import logfire
import asyncio
import httpx
import os
import sys
import json
from typing import Dict, Any, List, Optional
from pydantic import BaseModel
from pydantic_ai import Agent, ModelRetry, RunContext
from pydantic_ai.models.openai import OpenAIModel
from openai import AsyncOpenAI
from supabase import Client
# Add the parent directory to sys.path before importing from it
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.utils import get_env_var
load_dotenv()
llm = get_env_var('PRIMARY_MODEL') or 'gpt-4o-mini'
base_url = get_env_var('BASE_URL') or 'https://api.openai.com/v1'
api_key = get_env_var('LLM_API_KEY') or 'no-llm-api-key-provided'
model = OpenAIModel(llm, base_url=base_url, api_key=api_key)
embedding_model = get_env_var('EMBEDDING_MODEL') or 'text-embedding-3-small'
logfire.configure(send_to_logfire='if-token-present')
is_ollama = "localhost" in base_url.lower()
@dataclass
class PydanticAIDeps:
supabase: Client
openai_client: AsyncOpenAI
reasoner_output: str
system_prompt = """
~~ CONTEXT: ~~
You are an expert at Pydantic AI - a Python AI agent framework for which you have access to all the documentation,
including examples, an API reference, and other resources to help you build Pydantic AI agents.
~~ GOAL: ~~
Your only job is to help the user create an AI agent with Pydantic AI.
The user will describe the AI agent they want to build, or if they don't, guide them towards doing so.
You will take their requirements, and then search through the Pydantic AI documentation with the tools provided
to find all the necessary information to create the AI agent with correct code.
It's important for you to search through multiple Pydantic AI documentation pages to get all the information you need.
Almost never stick to just one page - use RAG and the other documentation tools multiple times when you are creating
an AI agent from scratch for the user.
~~ STRUCTURE: ~~
When you build an AI agent from scratch, split the agent into these files and give the code for each:
- `agent.py`: The main agent file, which is where the Pydantic AI agent is defined.
- `agent_tools.py`: A tools file for the agent, which is where all the tool functions are defined. Use this for more complex agents.
- `agent_prompts.py`: A prompts file for the agent, which includes all system prompts and other prompts used by the agent. Use this when there are many prompts or large ones.
- `.env.example`: An example `.env` file - specify each variable that the user will need to fill in and a quick comment above each one for how to do so.
- `requirements.txt`: Don't include any versions, just the top level package names needed for the agent.
~~ INSTRUCTIONS: ~~
- Don't ask the user before taking an action, just do it. Always make sure you look at the documentation with the provided tools before writing any code.
- When you first look at the documentation, always start with RAG.
Then also always check the list of available documentation pages and retrieve the content of page(s) if it'll help.
- Always let the user know when you didn't find the answer in the documentation or the right URL - be honest.
- Helpful tip: when starting a new AI agent build, it's a good idea to look at the 'weather agent' in the docs as an example.
- When starting a new AI agent build, always produce the full code for the AI agent - never tell the user to finish a tool/function.
- When refining an existing AI agent build in a conversation, just share the code changes necessary.
- Each time you respond to the user, ask them to let you know either if they need changes or the code looks good.
"""
pydantic_ai_coder = Agent(
model,
system_prompt=system_prompt,
deps_type=PydanticAIDeps,
retries=2
)
@pydantic_ai_coder.system_prompt
def add_reasoner_output(ctx: RunContext[PydanticAIDeps]) -> str:
return f"""
\n\nAdditional thoughts/instructions from the reasoner LLM.
This scope includes documentation pages for you to search as well:
{ctx.deps.reasoner_output}
"""
# Add this in to get some crazy tool calling:
# You must get ALL documentation pages listed in the scope.
async def get_embedding(text: str, openai_client: AsyncOpenAI) -> List[float]:
"""Get embedding vector from OpenAI."""
try:
response = await openai_client.embeddings.create(
model=embedding_model,
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting embedding: {e}")
        return [0] * 1536  # Return zero vector on error (1536 dims matches text-embedding-3-small)
@pydantic_ai_coder.tool
async def retrieve_relevant_documentation(ctx: RunContext[PydanticAIDeps], user_query: str) -> str:
"""
Retrieve relevant documentation chunks based on the query with RAG.
Args:
ctx: The context including the Supabase client and OpenAI client
user_query: The user's question or query
Returns:
A formatted string containing the top 5 most relevant documentation chunks
"""
try:
# Get the embedding for the query
query_embedding = await get_embedding(user_query, ctx.deps.openai_client)
# Query Supabase for relevant documents
result = ctx.deps.supabase.rpc(
'match_site_pages',
{
'query_embedding': query_embedding,
'match_count': 5,
'filter': {'source': 'pydantic_ai_docs'}
}
).execute()
if not result.data:
return "No relevant documentation found."
# Format the results
formatted_chunks = []
for doc in result.data:
chunk_text = f"""
# {doc['title']}
{doc['content']}
"""
formatted_chunks.append(chunk_text)
# Join all chunks with a separator
return "\n\n---\n\n".join(formatted_chunks)
except Exception as e:
print(f"Error retrieving documentation: {e}")
return f"Error retrieving documentation: {str(e)}"
async def list_documentation_pages_helper(supabase: Client) -> List[str]:
"""
Function to retrieve a list of all available Pydantic AI documentation pages.
This is called by the list_documentation_pages tool and also externally
to fetch documentation pages for the reasoner LLM.
Returns:
List[str]: List of unique URLs for all documentation pages
"""
try:
# Query Supabase for unique URLs where source is pydantic_ai_docs
result = supabase.from_('site_pages') \
.select('url') \
.eq('metadata->>source', 'pydantic_ai_docs') \
.execute()
if not result.data:
return []
# Extract unique URLs
urls = sorted(set(doc['url'] for doc in result.data))
return urls
except Exception as e:
print(f"Error retrieving documentation pages: {e}")
return []
@pydantic_ai_coder.tool
async def list_documentation_pages(ctx: RunContext[PydanticAIDeps]) -> List[str]:
"""
Retrieve a list of all available Pydantic AI documentation pages.
Returns:
List[str]: List of unique URLs for all documentation pages
"""
return await list_documentation_pages_helper(ctx.deps.supabase)
@pydantic_ai_coder.tool
async def get_page_content(ctx: RunContext[PydanticAIDeps], url: str) -> str:
"""
Retrieve the full content of a specific documentation page by combining all its chunks.
Args:
ctx: The context including the Supabase client
url: The URL of the page to retrieve
Returns:
str: The complete page content with all chunks combined in order
"""
try:
# Query Supabase for all chunks of this URL, ordered by chunk_number
result = ctx.deps.supabase.from_('site_pages') \
.select('title, content, chunk_number') \
.eq('url', url) \
.eq('metadata->>source', 'pydantic_ai_docs') \
.order('chunk_number') \
.execute()
if not result.data:
return f"No content found for URL: {url}"
# Format the page with its title and all chunks
page_title = result.data[0]['title'].split(' - ')[0] # Get the main title
formatted_content = [f"# {page_title}\n"]
# Add each chunk's content
for chunk in result.data:
formatted_content.append(chunk['content'])
# Join everything together
return "\n\n".join(formatted_content)
except Exception as e:
print(f"Error retrieving page content: {e}")
return f"Error retrieving page content: {str(e)}"

View File

@@ -0,0 +1,69 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any
from archon.archon_graph import agentic_flow
from langgraph.types import Command
from utils.utils import write_to_log
app = FastAPI()
class InvokeRequest(BaseModel):
message: str
thread_id: str
is_first_message: bool = False
config: Optional[Dict[str, Any]] = None
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "ok"}
@app.post("/invoke")
async def invoke_agent(request: InvokeRequest):
"""Process a message through the agentic flow and return the complete response.
    The agent streams its output, but this API endpoint waits for the full response
    before returning, so the operation is synchronous from MCP's perspective.
    A separate endpoint to fully stream the response from the API will be added later.
Args:
request: The InvokeRequest containing message and thread info
Returns:
dict: Contains the complete response from the agent
"""
try:
config = request.config or {
"configurable": {
"thread_id": request.thread_id
}
}
response = ""
if request.is_first_message:
write_to_log(f"Processing first message for thread {request.thread_id}")
async for msg in agentic_flow.astream(
{"latest_user_message": request.message},
config,
stream_mode="custom"
):
response += str(msg)
else:
write_to_log(f"Processing continuation for thread {request.thread_id}")
async for msg in agentic_flow.astream(
Command(resume=request.message),
config,
stream_mode="custom"
):
response += str(msg)
write_to_log(f"Final response for thread {request.thread_id}: {response}")
return {"response": response}
except Exception as e:
write_to_log(f"Error processing message for thread {request.thread_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8100)
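
With the service running on port 8100, a request sketch against `/invoke` (field names taken from `InvokeRequest` above; the generous timeout is an assumption since agent runs can be slow):

```python
import uuid
import requests

payload = {
    "message": "Build an agent that summarizes web pages",
    "thread_id": str(uuid.uuid4()),
    "is_first_message": True,
}
resp = requests.post("http://127.0.0.1:8100/invoke", json=payload, timeout=300)
resp.raise_for_status()
print(resp.json()["response"])
```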

View File

@@ -0,0 +1,10 @@
{
"mcpServers": {
"archon": {
"command": "C:\\Users\\colem\\oTTomator\\archon\\venv\\Scripts\\python.exe",
"args": [
"C:\\Users\\colem\\oTTomator\\archon\\mcp_server.py"
]
}
}
}

View File

@@ -0,0 +1,95 @@
import os
import sys
import asyncio
import threading
from mcp.server.fastmcp import FastMCP
import requests
from typing import Dict, List
import uuid
from utils.utils import write_to_log
from graph_service import app
import uvicorn
# Initialize FastMCP server
mcp = FastMCP("archon")
# Store active threads
active_threads: Dict[str, List[str]] = {}
# FastAPI service URL
GRAPH_SERVICE_URL = "http://127.0.0.1:8100"
@mcp.tool()
async def create_thread() -> str:
"""Create a new conversation thread for Archon.
Always call this tool before invoking Archon for the first time in a conversation.
(if you don't already have a thread ID)
Returns:
str: A unique thread ID for the conversation
"""
thread_id = str(uuid.uuid4())
active_threads[thread_id] = []
write_to_log(f"Created new thread: {thread_id}")
return thread_id
def _make_request(thread_id: str, user_input: str, config: dict) -> str:
"""Make synchronous request to graph service"""
response = requests.post(
f"{GRAPH_SERVICE_URL}/invoke",
json={
"message": user_input,
"thread_id": thread_id,
"is_first_message": not active_threads[thread_id],
"config": config
}
)
response.raise_for_status()
return response.json()
@mcp.tool()
async def run_agent(thread_id: str, user_input: str) -> str:
"""Run the Archon agent with user input.
Only use this tool after you have called create_thread in this conversation to get a unique thread ID.
If you already created a thread ID in this conversation, do not create another one. Reuse the same ID.
After you receive the code from Archon, always implement it into the codebase unless asked not to.
Args:
thread_id: The conversation thread ID
user_input: The user's message to process
Returns:
str: The agent's response which generally includes the code for the agent
"""
if thread_id not in active_threads:
write_to_log(f"Error: Thread not found - {thread_id}")
raise ValueError("Thread not found")
write_to_log(f"Processing message for thread {thread_id}: {user_input}")
config = {
"configurable": {
"thread_id": thread_id
}
}
try:
result = await asyncio.to_thread(_make_request, thread_id, user_input, config)
active_threads[thread_id].append(user_input)
return result['response']
    except Exception as e:
        write_to_log(f"Error running agent for thread {thread_id}: {str(e)}")
        raise
if __name__ == "__main__":
write_to_log("Starting MCP server")
# Run MCP server
mcp.run(transport='stdio')
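
For a quick in-process check of the tool flow (an MCP client such as Cursor or Windsurf would call these over stdio instead), a sketch assuming the graph service from `graph_service.py` is already listening on port 8100 and that FastMCP's decorator leaves the underlying functions directly callable:

```python
import asyncio

async def demo() -> None:
    # Mirrors the documented flow: create a thread once, then reuse its ID.
    thread_id = await create_thread()
    reply = await run_agent(thread_id, "Build a simple Pydantic AI agent")
    print(reply)

asyncio.run(demo())
```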

Binary file not shown (new image, 576 KiB).

Binary file not shown (new image, 327 KiB).

View File

@@ -0,0 +1,177 @@
aiofiles==24.1.0
aiohappyeyeballs==2.4.4
aiohttp==3.11.11
aiosignal==1.3.2
aiosqlite==0.20.0
altair==5.5.0
annotated-types==0.7.0
anthropic==0.42.0
anyio==4.8.0
attrs==24.3.0
beautifulsoup4==4.12.3
blinker==1.9.0
cachetools==5.5.0
certifi==2024.12.14
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
cohere==5.13.12
colorama==0.4.6
Crawl4AI==0.4.247
cryptography==43.0.3
Deprecated==1.2.15
deprecation==2.1.0
distro==1.9.0
dnspython==2.7.0
email_validator==2.2.0
eval_type_backport==0.2.2
executing==2.1.0
fake-http-header==0.3.5
fastapi==0.115.8
fastapi-cli==0.0.7
fastavro==1.10.0
filelock==3.16.1
frozenlist==1.5.0
fsspec==2024.12.0
gitdb==4.0.12
GitPython==3.1.44
google-auth==2.37.0
googleapis-common-protos==1.66.0
gotrue==2.11.1
greenlet==3.1.1
griffe==1.5.4
groq==0.15.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
html2text==2024.2.26
httpcore==1.0.7
httptools==0.6.4
httpx==0.27.2
httpx-sse==0.4.0
huggingface-hub==0.27.1
hyperframe==6.0.1
idna==3.10
importlib_metadata==8.5.0
iniconfig==2.0.0
itsdangerous==2.2.0
Jinja2==3.1.5
jiter==0.8.2
joblib==1.4.2
jsonpatch==1.33
jsonpath-python==1.0.6
jsonpointer==3.0.0
jsonschema==4.23.0
jsonschema-specifications==2024.10.1
jsonschema_rs==0.25.1
langchain-core==0.3.33
langgraph==0.2.69
langgraph-api==0.0.22
langgraph-checkpoint==2.0.10
langgraph-cli==0.1.71
langgraph-sdk==0.1.51
langsmith==0.3.6
litellm==1.57.8
logfire==3.1.0
logfire-api==3.1.0
lxml==5.3.0
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mcp==1.2.1
mdurl==0.1.2
mistralai==1.2.6
mockito==1.5.3
msgpack==1.1.0
multidict==6.1.0
mypy-extensions==1.0.0
narwhals==1.21.1
nltk==3.9.1
numpy==2.2.1
openai==1.59.6
opentelemetry-api==1.29.0
opentelemetry-exporter-otlp-proto-common==1.29.0
opentelemetry-exporter-otlp-proto-http==1.29.0
opentelemetry-instrumentation==0.50b0
opentelemetry-proto==1.29.0
opentelemetry-sdk==1.29.0
opentelemetry-semantic-conventions==0.50b0
orjson==3.10.15
packaging==24.2
pandas==2.2.3
pillow==10.4.0
playwright==1.49.1
pluggy==1.5.0
postgrest==0.19.1
propcache==0.2.1
protobuf==5.29.3
psutil==6.1.1
pyarrow==18.1.0
pyasn1==0.6.1
pyasn1_modules==0.4.1
pycparser==2.22
pydantic==2.10.5
pydantic-ai==0.0.22
pydantic-ai-slim==0.0.22
pydantic-extra-types==2.10.2
pydantic-graph==0.0.22
pydantic-settings==2.7.1
pydantic_core==2.27.2
pydeck==0.9.1
pyee==12.0.0
Pygments==2.19.1
PyJWT==2.10.1
pyOpenSSL==24.3.0
pytest==8.3.4
pytest-mockito==0.0.4
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-multipart==0.0.20
pytz==2024.2
PyYAML==6.0.2
rank-bm25==0.2.2
realtime==2.1.0
referencing==0.35.1
regex==2024.11.6
requests==2.32.3
requests-toolbelt==1.0.0
rich==13.9.4
rich-toolkit==0.13.2
rpds-py==0.22.3
rsa==4.9
shellingham==1.5.4
six==1.17.0
smmap==5.0.2
sniffio==1.3.1
snowballstemmer==2.2.0
soupsieve==2.6
sse-starlette==2.1.3
starlette==0.45.3
storage3==0.11.0
streamlit==1.41.1
StrEnum==0.4.15
structlog==24.4.0
supabase==2.11.0
supafunc==0.9.0
tenacity==9.0.0
tf-playwright-stealth==1.1.0
tiktoken==0.8.0
tokenizers==0.21.0
toml==0.10.2
tornado==6.4.2
tqdm==4.67.1
typer==0.15.1
types-requests==2.32.0.20241016
typing-inspect==0.9.0
typing_extensions==4.12.2
tzdata==2024.2
ujson==5.10.0
urllib3==2.3.0
uvicorn==0.34.0
watchdog==6.0.0
watchfiles==1.0.4
websockets==13.1
wrapt==1.17.1
xxhash==3.5.0
yarl==1.18.3
zipp==3.21.0
zstandard==0.23.0

File diff suppressed because it is too large.

View File

@@ -0,0 +1,72 @@
-- Enable the pgvector extension
create extension if not exists vector;
-- Create the documentation chunks table
create table site_pages (
id bigserial primary key,
url varchar not null,
chunk_number integer not null,
title varchar not null,
summary varchar not null,
content text not null, -- Added content column
metadata jsonb not null default '{}'::jsonb, -- Added metadata column
embedding vector(1536), -- OpenAI embeddings are 1536 dimensions
created_at timestamp with time zone default timezone('utc'::text, now()) not null,
-- Add a unique constraint to prevent duplicate chunks for the same URL
unique(url, chunk_number)
);
-- Create an index for better vector similarity search performance
create index on site_pages using ivfflat (embedding vector_cosine_ops);
-- Create an index on metadata for faster filtering
create index idx_site_pages_metadata on site_pages using gin (metadata);
-- Create a function to search for documentation chunks
create function match_site_pages (
query_embedding vector(1536),
match_count int default 10,
filter jsonb DEFAULT '{}'::jsonb
) returns table (
id bigint,
url varchar,
chunk_number integer,
title varchar,
summary varchar,
content text,
metadata jsonb,
similarity float
)
language plpgsql
as $$
#variable_conflict use_column
begin
return query
select
id,
url,
chunk_number,
title,
summary,
content,
metadata,
1 - (site_pages.embedding <=> query_embedding) as similarity
from site_pages
where metadata @> filter
order by site_pages.embedding <=> query_embedding
limit match_count;
end;
$$;
-- Everything above works on any PostgreSQL database with the pgvector extension. The commands below are Supabase-specific security settings
-- Enable RLS on the table
alter table site_pages enable row level security;
-- Create a policy that allows anyone to read
create policy "Allow public read access"
on site_pages
for select
to public
using (true);
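
A small smoke-test sketch confirming the table and RPC exist, via supabase-py; the env var names and the zero query vector are assumptions for illustration only:

```python
import os
from supabase import create_client

supabase = create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_SERVICE_KEY"])
result = supabase.rpc(
    "match_site_pages",
    {"query_embedding": [0.0] * 1536, "match_count": 3, "filter": {}},
).execute()
print(f"{len(result.data)} chunks returned")
```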

View File

@@ -0,0 +1,111 @@
import os
from datetime import datetime
from functools import wraps
import inspect
import json
from typing import Optional
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
def write_to_log(message: str):
"""Write a message to the logs.txt file in the workbench directory.
Args:
message: The message to log
"""
# Get the directory one level up from the current file
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
workbench_dir = os.path.join(parent_dir, "workbench")
log_path = os.path.join(workbench_dir, "logs.txt")
os.makedirs(workbench_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}\n"
with open(log_path, "a", encoding="utf-8") as f:
f.write(log_entry)
def get_env_var(var_name: str) -> Optional[str]:
"""Get an environment variable from the saved JSON file or from environment variables.
Args:
var_name: The name of the environment variable to retrieve
Returns:
The value of the environment variable or None if not found
"""
    # Path to the JSON file storing environment variables (kept alongside this module)
    current_dir = os.path.dirname(os.path.abspath(__file__))
    env_file_path = os.path.join(current_dir, "env_vars.json")
# First try to get from JSON file
if os.path.exists(env_file_path):
try:
with open(env_file_path, "r") as f:
env_vars = json.load(f)
if var_name in env_vars and env_vars[var_name]:
return env_vars[var_name]
except (json.JSONDecodeError, IOError) as e:
write_to_log(f"Error reading env_vars.json: {str(e)}")
# If not found in JSON, try to get from environment variables
return os.environ.get(var_name)
def save_env_var(var_name: str, value: str) -> bool:
"""Save an environment variable to the JSON file.
Args:
var_name: The name of the environment variable
value: The value to save
Returns:
True if successful, False otherwise
"""
# Path to the JSON file storing environment variables
current_dir = os.path.dirname(os.path.abspath(__file__))
env_file_path = os.path.join(current_dir, "env_vars.json")
# Load existing env vars or create empty dict
env_vars = {}
if os.path.exists(env_file_path):
try:
with open(env_file_path, "r") as f:
env_vars = json.load(f)
except (json.JSONDecodeError, IOError) as e:
write_to_log(f"Error reading env_vars.json: {str(e)}")
# Continue with empty dict if file is corrupted
# Update the variable
env_vars[var_name] = value
# Save back to file
try:
with open(env_file_path, "w") as f:
json.dump(env_vars, f, indent=2)
return True
except IOError as e:
write_to_log(f"Error writing to env_vars.json: {str(e)}")
return False
def log_node_execution(func):
"""Decorator to log the start and end of graph node execution.
Args:
func: The async function to wrap
"""
@wraps(func)
async def wrapper(*args, **kwargs):
func_name = func.__name__
write_to_log(f"Starting node: {func_name}")
try:
result = await func(*args, **kwargs)
write_to_log(f"Completed node: {func_name}")
return result
except Exception as e:
write_to_log(f"Error in node {func_name}: {str(e)}")
raise
return wrapper
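
A round-trip sketch of the helpers above: `save_env_var` writes to `env_vars.json` next to this module, and `get_env_var` prefers that file before falling back to `os.environ`:

```python
from utils.utils import get_env_var, save_env_var

save_env_var("PRIMARY_MODEL", "gpt-4o-mini")
assert get_env_var("PRIMARY_MODEL") == "gpt-4o-mini"
```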

BIN public/ArchonLightGrey.png (new file, 327 KiB; binary file not shown).

View File

@@ -44,6 +44,7 @@ groq==0.15.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
html2text==2024.2.26
httpcore==1.0.7
httptools==0.6.4
httpx==0.27.2

View File

@@ -1,69 +0,0 @@
import os
import json
import subprocess
import sys
import platform
def setup_venv():
# Get the absolute path to the current directory
base_path = os.path.abspath(os.path.dirname(__file__))
venv_path = os.path.join(base_path, 'venv')
venv_created = False
# Create virtual environment if it doesn't exist
if not os.path.exists(venv_path):
print("Creating virtual environment...")
subprocess.run([sys.executable, '-m', 'venv', venv_path], check=True)
print("Virtual environment created successfully!")
venv_created = True
else:
print("Virtual environment already exists.")
# Install requirements if we just created the venv
if venv_created:
print("\nInstalling requirements...")
# Determine the correct pip path based on the OS
if platform.system() == "Windows":
pip_path = os.path.join(venv_path, 'Scripts', 'pip.exe')
else: # macOS or Linux
pip_path = os.path.join(venv_path, 'bin', 'pip')
requirements_path = os.path.join(base_path, 'requirements.txt')
subprocess.run([pip_path, 'install', '-r', requirements_path], check=True)
print("Requirements installed successfully!")
def generate_mcp_config():
# Get the absolute path to the current directory
base_path = os.path.abspath(os.path.dirname(__file__))
# Determine the correct python path based on the OS
if platform.system() == "Windows":
python_path = os.path.join(base_path, 'venv', 'Scripts', 'python.exe')
else: # macOS or Linux
python_path = os.path.join(base_path, 'venv', 'bin', 'python')
server_script_path = os.path.join(base_path, 'mcp_server.py')
# Create the config dictionary
config = {
"mcpServers": {
"archon": {
"command": python_path,
"args": [server_script_path]
}
}
}
# Write the config to a file
config_path = os.path.join(base_path, 'mcp-config.json')
with open(config_path, 'w') as f:
json.dump(config, f, indent=2)
print(f"\nMCP configuration has been written to: {config_path}")
print(f"\nMCP configuration for Cursor:\n\n{python_path} {server_script_path}")
print("\nMCP configuration for Windsurf/Claude Desktop:")
print(json.dumps(config, indent=2))
if __name__ == '__main__':
setup_venv()
generate_mcp_config()

File diff suppressed because it is too large.

View File

@@ -2,6 +2,12 @@ import os
from datetime import datetime
from functools import wraps
import inspect
import json
from typing import Optional
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
def write_to_log(message: str):
"""Write a message to the logs.txt file in the workbench directory.
@@ -22,6 +28,69 @@ def write_to_log(message: str):
with open(log_path, "a", encoding="utf-8") as f:
f.write(log_entry)
def get_env_var(var_name: str) -> Optional[str]:
"""Get an environment variable from the saved JSON file or from environment variables.
Args:
var_name: The name of the environment variable to retrieve
Returns:
The value of the environment variable or None if not found
"""
    # Path to the JSON file storing environment variables (kept alongside this module)
    current_dir = os.path.dirname(os.path.abspath(__file__))
    env_file_path = os.path.join(current_dir, "env_vars.json")
# First try to get from JSON file
if os.path.exists(env_file_path):
try:
with open(env_file_path, "r") as f:
env_vars = json.load(f)
if var_name in env_vars and env_vars[var_name]:
return env_vars[var_name]
except (json.JSONDecodeError, IOError) as e:
write_to_log(f"Error reading env_vars.json: {str(e)}")
# If not found in JSON, try to get from environment variables
return os.environ.get(var_name)
def save_env_var(var_name: str, value: str) -> bool:
"""Save an environment variable to the JSON file.
Args:
var_name: The name of the environment variable
value: The value to save
Returns:
True if successful, False otherwise
"""
# Path to the JSON file storing environment variables
current_dir = os.path.dirname(os.path.abspath(__file__))
env_file_path = os.path.join(current_dir, "env_vars.json")
# Load existing env vars or create empty dict
env_vars = {}
if os.path.exists(env_file_path):
try:
with open(env_file_path, "r") as f:
env_vars = json.load(f)
except (json.JSONDecodeError, IOError) as e:
write_to_log(f"Error reading env_vars.json: {str(e)}")
# Continue with empty dict if file is corrupted
# Update the variable
env_vars[var_name] = value
# Save back to file
try:
with open(env_file_path, "w") as f:
json.dump(env_vars, f, indent=2)
return True
except IOError as e:
write_to_log(f"Error writing to env_vars.json: {str(e)}")
return False
def log_node_execution(func):
"""Decorator to log the start and end of graph node execution.