---
sidebar_position: 25
sidebar_label: Background Tasks
---
# Background Tasks Architecture
## Overview
This document describes the architecture for handling long-running operations in Archon without blocking the FastAPI/Socket.IO event loop. The key insight is that browser-based operations (crawling) must remain in the main event loop, while only CPU-intensive operations should be offloaded to threads.
## Architecture Principles
1. **Keep async I/O operations in the main event loop** - Browser automation, database operations, and network requests must stay async
2. **Only offload CPU-intensive work to threads** - Text processing, chunking, and synchronous API calls can run in ThreadPoolExecutor
3. **Use asyncio.create_task for background async work** - Don't block the event loop, but keep async operations async
4. **Maintain single event loop** - Never create new event loops in threads
## Architecture Diagram
```mermaid
graph TB
    subgraph "Main Event Loop"
        API[FastAPI Endpoint]
        SIO[Socket.IO Handler]
        BGTask[Background Async Task]
        Crawler[AsyncWebCrawler]
        DB[Database Operations]
        Progress[Progress Updates]
    end

    subgraph "ThreadPoolExecutor"
        Chunk[Text Chunking]
        Embed[Embedding Generation]
        Summary[Summary Extraction]
        CodeExt[Code Extraction]
    end

    API -->|asyncio.create_task| BGTask
    BGTask -->|await| Crawler
    BGTask -->|await| DB
    BGTask -->|run_in_executor| Chunk
    BGTask -->|run_in_executor| Embed
    BGTask -->|run_in_executor| Summary
    BGTask -->|run_in_executor| CodeExt
    BGTask -->|emit| Progress
    Progress -->|websocket| SIO

    classDef async fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef sync fill:#fff3e0,stroke:#e65100,stroke-width:2px
    class API,SIO,BGTask,Crawler,DB,Progress async
    class Chunk,Embed,Summary,CodeExt sync
```
## Core Components
### CrawlOrchestrationService
The orchestration service manages the entire crawl workflow while keeping the main event loop responsive:
```python
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict

logger = logging.getLogger(__name__)


class CrawlOrchestrationService:
    def __init__(self, crawler, supabase_client, progress_id=None):
        self.crawler = crawler
        self.supabase_client = supabase_client
        self.progress_id = progress_id
        self.active_tasks = {}
        # Thread pool for CPU-intensive operations only
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def orchestrate_crawl(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Start the crawl operation as a background task."""
        url = str(request.get('url', ''))

        # Create the background task in the SAME event loop
        task = asyncio.create_task(
            self._async_orchestrate_crawl(request)
        )

        # Store the task reference so it can be inspected or cancelled later
        self.active_tasks[self.progress_id] = task

        # Return immediately
        return {
            "task_id": self.progress_id,
            "status": "started",
            "message": f"Crawl operation started for {url}"
        }

    async def _async_orchestrate_crawl(self, request: Dict[str, Any]):
        """Background async task - runs in the main event loop."""
        try:
            url = request.get('url', '')

            # Emit initial progress
            await self._emit_progress({
                'status': 'analyzing',
                'percentage': 0,
                'currentUrl': url,
                'log': f'Analyzing URL type for {url}'
            })

            # Step 1: Crawl URLs (MUST stay async in the main loop)
            crawl_results = await self._crawl_urls_async(url, request)

            # Step 2: Process documents (CPU-intensive, can go to a thread)
            loop = asyncio.get_running_loop()
            doc_results = await loop.run_in_executor(
                self.executor,
                self._process_documents_sync,  # sync version, see below
                crawl_results, request
            )

            # Step 3: Store in the database (MUST stay async in the main loop)
            await self._store_documents_async(doc_results)

            # Step 4: Generate embeddings (synchronous client, can go to a thread)
            await loop.run_in_executor(
                self.executor,
                self._generate_embeddings_sync,
                doc_results
            )

            # Step 5: Extract code examples (CPU-intensive, can go to a thread)
            code_count = await loop.run_in_executor(
                self.executor,
                self._extract_code_sync,
                crawl_results
            )

            # Complete
            await self._emit_progress({
                'status': 'complete',
                'percentage': 100,
                'log': 'Crawl operation completed successfully'
            })
        except Exception as e:
            logger.error(f"Crawl orchestration error: {e}")
            await self._emit_progress({
                'status': 'error',
                'percentage': -1,
                'error': str(e)
            })

    async def _emit_progress(self, update: Dict[str, Any]):
        """Emit progress via Socket.IO."""
        if self.progress_id:
            await update_crawl_progress(self.progress_id, update)
```
### Sync Functions for Thread Execution
Only CPU-bound work and synchronous API calls should have sync versions for thread execution:
```python
# Methods of CrawlOrchestrationService (continued)

def _process_documents_sync(self, crawl_results, request):
    """Sync version for thread execution - CPU-intensive text processing."""
    all_chunks = []
    for doc in crawl_results:
        # Text chunking is CPU-intensive
        chunks = self.chunk_text(doc['markdown'])
        all_chunks.extend(chunks)
    return {
        'chunks': all_chunks,
        'chunk_count': len(all_chunks)
    }

def _generate_embeddings_sync(self, doc_results):
    """Sync version - uses the synchronous OpenAI client."""
    client = openai.Client()  # sync client is safe inside the thread pool
    embeddings = []
    for chunk in doc_results['chunks']:
        # Blocking API call - runs in the executor, not the event loop
        response = client.embeddings.create(
            input=chunk,
            model="text-embedding-3-small"
        )
        embeddings.append(response.data[0].embedding)
    return embeddings

def _extract_code_sync(self, crawl_results):
    """Sync version - CPU-intensive regex and parsing."""
    code_examples = []
    for doc in crawl_results:
        # Extract code blocks with regex
        code_blocks = self.extract_code_blocks(doc['markdown'])
        code_examples.extend(code_blocks)
    return len(code_examples)
```
### Socket.IO Integration
Socket.IO handlers remain in the main event loop:
```python
# socketio_handlers.py
import logging
import socketio

logger = logging.getLogger(__name__)
sio = socketio.AsyncServer(async_mode='asgi')  # module-level server instance (configuration elided)

async def update_crawl_progress(progress_id: str, data: dict):
    """Emit progress updates to connected clients."""
    # Check whether the room has subscribers
    room_sids = []
    if hasattr(sio.manager, 'rooms'):
        namespace_rooms = sio.manager.rooms.get('/', {})
        room_sids = list(namespace_rooms.get(progress_id, []))
    if not room_sids:
        logger.warning(f"No subscribers in room {progress_id}")
        return

    # Emit progress
    data['progressId'] = progress_id
    await sio.emit('crawl_progress', data, room=progress_id)

@sio.event
async def subscribe_to_progress(sid, data):
    """Client subscribes to progress updates."""
    progress_id = data.get('progressId')
    if progress_id:
        # enter_room is a coroutine on AsyncServer
        await sio.enter_room(sid, progress_id)

        # Send the current status if a task is already running
        orchestrator = get_orchestrator_for_progress(progress_id)
        if orchestrator and progress_id in orchestrator.active_tasks:
            await sio.emit('crawl_progress', {
                'progressId': progress_id,
                'status': 'running',
                'message': 'Reconnected to running task'
            }, to=sid)
```
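On the consuming side, any Socket.IO client can join the progress room. The following is a minimal sketch using python-socketio's `AsyncClient`; the server URL and port are assumptions, and the progress ID is a placeholder.
```python
import asyncio
import socketio

client = socketio.AsyncClient()

@client.on("crawl_progress")
async def on_progress(data: dict) -> None:
    # Mirrors the payload emitted by update_crawl_progress above
    print(data.get("status"), data.get("percentage"), data.get("log"))

async def watch_progress(progress_id: str) -> None:
    await client.connect("http://localhost:8181")  # server URL is an assumption
    await client.emit("subscribe_to_progress", {"progressId": progress_id})
    await client.wait()  # keep receiving events until disconnected

# asyncio.run(watch_progress("your-progress-id"))
```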
### API Endpoint Pattern
FastAPI endpoints start background tasks and return immediately:
```python
# knowledge_api.py
import uuid

from fastapi import APIRouter, HTTPException

router = APIRouter()

@router.post("/knowledge/add")
async def add_knowledge_item(request: KnowledgeAddRequest):
    """Start a crawl operation - returns immediately."""
    # Generate a progress ID
    progress_id = str(uuid.uuid4())

    # Create the orchestrator
    orchestrator = CrawlOrchestrationService(
        crawler=await crawler_manager.get_crawler(),
        supabase_client=supabase_client,
        progress_id=progress_id
    )

    # Start the background task
    result = await orchestrator.orchestrate_crawl(request.dict())

    # Return task info immediately
    return {
        "success": True,
        "task_id": result["task_id"],
        "progress_id": progress_id,
        "message": "Crawl started in background"
    }

@router.get("/knowledge/status/{task_id}")
async def get_task_status(task_id: str):
    """Check the status of a background task."""
    orchestrator = get_orchestrator_for_task(task_id)
    if not orchestrator:
        raise HTTPException(404, "Task not found")

    task = orchestrator.active_tasks.get(task_id)
    if not task:
        raise HTTPException(404, "Task not found")

    return {
        "task_id": task_id,
        "done": task.done(),
        "cancelled": task.cancelled()
    }
```
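A caller can drive this pattern over plain HTTP: start the crawl, then poll the status endpoint until the background task reports completion. The sketch below uses `httpx`; the base URL, route prefix, and request body shape (the `KnowledgeAddRequest` model is not shown above) are assumptions.
```python
import asyncio
import httpx

async def start_and_poll(url: str) -> None:
    # Base URL and prefix are assumptions; adjust to your deployment.
    async with httpx.AsyncClient(base_url="http://localhost:8181/api") as client:
        # The endpoint returns immediately with the background task's ID.
        resp = await client.post("/knowledge/add", json={"url": url})
        task_id = resp.json()["task_id"]

        # Poll the status endpoint until the background task finishes.
        while True:
            status = (await client.get(f"/knowledge/status/{task_id}")).json()
            if status["done"]:
                break
            await asyncio.sleep(2)

asyncio.run(start_and_poll("https://example.com/docs"))
```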
## Key Patterns
### What Stays Async (Main Loop)
- Browser automation (crawling)
- Database operations
- Network requests
- Socket.IO communications
- Task coordination
### What Goes to Threads
- Text chunking
- Markdown parsing
- Code extraction
- Embedding preparation
- CPU-intensive calculations
### Progress Updates
- Use `asyncio.Queue` when progress is produced by async tasks
- Use a thread-safe `queue.Queue` when progress comes from worker threads (see the sketch below)
- Always emit Socket.IO updates from the main event loop
- Include detailed status information (status, percentage, current URL, log message)
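The sketch below shows one way to bridge a worker thread back to the main loop: the thread reports progress only through a thread-safe `queue.Queue`, and an async task drains the queue so that every emit happens on the event loop. The `emit` parameter is a hypothetical async callable; in the orchestration service above it would be `self._emit_progress`.
```python
import asyncio
import queue

def cpu_heavy_work(progress_q: queue.Queue) -> None:
    """Runs in a worker thread; reports progress only via the thread-safe queue."""
    for i in range(5):
        # ... CPU-intensive chunking/parsing would happen here ...
        progress_q.put({"status": "processing", "percentage": (i + 1) * 20})

async def run_with_progress(emit) -> None:
    """Runs in the main event loop; `emit` is an async callable (hypothetical)."""
    loop = asyncio.get_running_loop()
    progress_q: queue.Queue = queue.Queue()
    work = loop.run_in_executor(None, cpu_heavy_work, progress_q)
    while not work.done() or not progress_q.empty():
        try:
            update = progress_q.get_nowait()
        except queue.Empty:
            await asyncio.sleep(0.1)  # yield so the loop stays responsive
            continue
        await emit(update)            # Socket.IO emit stays in the main loop
    await work
```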
## Common Pitfalls to Avoid
1. **Don't create new event loops in threads** - database connections bound to the main loop won't work there (see the sketch after this list)
2. **Don't run browser automation in threads** - It needs the main event loop
3. **Don't block the main loop** - Use asyncio.create_task for background work
4. **Don't mix async and sync incorrectly** - Keep clear boundaries
5. **Don't forget progress updates** - Users need feedback
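To make pitfall 1 concrete, the following is a minimal sketch contrasting the anti-pattern with the correct split; `chunk_text_sync` and `store_chunks_async` are illustrative stand-ins, not functions from the Archon codebase.
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def chunk_text_sync(markdown: str) -> list[str]:
    # stand-in for CPU-bound chunking
    return markdown.split("\n\n")

async def store_chunks_async(chunks: list[str]) -> None:
    # stand-in for an async database write
    await asyncio.sleep(0)

# Anti-pattern: creating a second event loop inside a worker thread.
# Clients and connections bound to the main loop cannot be used here.
def wrong_worker(markdown: str) -> None:
    loop = asyncio.new_event_loop()  # do NOT do this
    loop.run_until_complete(store_chunks_async(chunk_text_sync(markdown)))

# Correct: offload only the synchronous CPU-bound step; the async write
# stays on the single main event loop.
async def right_pattern(markdown: str) -> None:
    loop = asyncio.get_running_loop()
    chunks = await loop.run_in_executor(executor, chunk_text_sync, markdown)
    await store_chunks_async(chunks)
```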
## Testing Guidelines
1. Test with long-running crawls (100+ pages)
2. Verify Socket.IO doesn't disconnect
3. Check database operations work correctly
4. Monitor memory usage in threads
5. Test task cancellation (see the sketch below)
6. Verify progress accuracy
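As an example of item 5, a cancellation test could look like the sketch below. The import path, the stand-in constructor arguments, and the use of `pytest-asyncio` are assumptions.
```python
import asyncio
import pytest

from crawl_orchestration import CrawlOrchestrationService  # import path is an assumption

@pytest.mark.asyncio
async def test_crawl_task_can_be_cancelled():
    # Any objects work as stand-ins here: __init__ only stores them, and the
    # task is cancelled before its body gets a chance to run.
    orchestrator = CrawlOrchestrationService(
        crawler=object(), supabase_client=object(), progress_id="test-progress"
    )
    await orchestrator.orchestrate_crawl({"url": "https://example.com"})
    task = orchestrator.active_tasks["test-progress"]

    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task
    assert task.cancelled()
```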