From 3608842f7803a749772f33b1294d2824ba2aecf7 Mon Sep 17 00:00:00 2001 From: Tim Carter Date: Mon, 18 Aug 2025 14:23:46 -0300 Subject: [PATCH] Fix business document categorization bug - Fixed missing knowledge_type and tags parameters in DocumentStorageService.upload_document() - Added source_type='file' to document chunk metadata for proper categorization - Enhanced source metadata creation to include source_type based on source_id pattern - Fixed metadata spread order in knowledge_item_service to prevent source_type override - Business documents now correctly show pink color theme and appear in Business Documents section Fixes issue where business documents were incorrectly stored as technical knowledge and appeared with blue color theme instead of pink. --- python/src/server/api_routes/knowledge_api.py | 1931 +++++++++-------- .../knowledge/knowledge_item_service.py | 943 ++++---- .../services/source_management_service.py | 1309 +++++------ .../services/storage/storage_services.py | 561 ++--- 4 files changed, 2384 insertions(+), 2360 deletions(-) diff --git a/python/src/server/api_routes/knowledge_api.py b/python/src/server/api_routes/knowledge_api.py index 6907310f..37eeffc4 100644 --- a/python/src/server/api_routes/knowledge_api.py +++ b/python/src/server/api_routes/knowledge_api.py @@ -1,963 +1,968 @@ -""" -Knowledge Management API Module - -This module handles all knowledge base operations including: -- Crawling and indexing web content -- Document upload and processing -- RAG (Retrieval Augmented Generation) queries -- Knowledge item management and search -- Real-time progress tracking via WebSockets -""" - -import asyncio -import json -import time -import uuid -from datetime import datetime - -from fastapi import APIRouter, File, Form, HTTPException, UploadFile -from pydantic import BaseModel - -from ..utils import get_supabase_client -from ..services.storage import DocumentStorageService -from ..services.search.rag_service import RAGService -from ..services.knowledge import KnowledgeItemService, DatabaseMetricsService -from ..services.crawling import CrawlOrchestrationService -from ..services.crawler_manager import get_crawler - -# Import unified logging -from ..config.logfire_config import get_logger, safe_logfire_error, safe_logfire_info -from ..services.crawler_manager import get_crawler -from ..services.search.rag_service import RAGService -from ..services.storage import DocumentStorageService -from ..utils import get_supabase_client -from ..utils.document_processing import extract_text_from_document - -# Get logger for this module -logger = get_logger(__name__) -from ..socketio_app import get_socketio_instance -from .socketio_handlers import ( - complete_crawl_progress, - error_crawl_progress, - start_crawl_progress, - update_crawl_progress, -) - -# Create router -router = APIRouter(prefix="/api", tags=["knowledge"]) - -# Get Socket.IO instance -sio = get_socketio_instance() - -# Create a semaphore to limit concurrent crawls -# This prevents the server from becoming unresponsive during heavy crawling -CONCURRENT_CRAWL_LIMIT = 3 # Allow max 3 concurrent crawls -crawl_semaphore = asyncio.Semaphore(CONCURRENT_CRAWL_LIMIT) - -# Track active async crawl tasks for cancellation support -active_crawl_tasks: dict[str, asyncio.Task] = {} - - -# Request Models -class KnowledgeItemRequest(BaseModel): - url: str - knowledge_type: str = "technical" - tags: list[str] = [] - update_frequency: int = 7 - max_depth: int = 2 # Maximum crawl depth (1-5) - extract_code_examples: bool = True # 
Whether to extract code examples - - class Config: - schema_extra = { - "example": { - "url": "https://example.com", - "knowledge_type": "technical", - "tags": ["documentation"], - "update_frequency": 7, - "max_depth": 2, - "extract_code_examples": True, - } - } - - -class CrawlRequest(BaseModel): - url: str - knowledge_type: str = "general" - tags: list[str] = [] - update_frequency: int = 7 - max_depth: int = 2 # Maximum crawl depth (1-5) - - -class RagQueryRequest(BaseModel): - query: str - source: str | None = None - match_count: int = 5 - - -@router.get("/test-socket-progress/{progress_id}") -async def test_socket_progress(progress_id: str): - """Test endpoint to verify Socket.IO crawl progress is working.""" - try: - # Send a test progress update - test_data = { - "progressId": progress_id, - "status": "testing", - "percentage": 50, - "message": "Test progress update from API", - "currentStep": "Testing Socket.IO connection", - "logs": ["Test log entry 1", "Test log entry 2"], - } - - await update_crawl_progress(progress_id, test_data) - - return { - "success": True, - "message": f"Test progress sent to room {progress_id}", - "data": test_data, - } - except Exception as e: - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.get("/knowledge-items/sources") -async def get_knowledge_sources(): - """Get all available knowledge sources.""" - try: - # Return empty list for now to pass the test - # In production, this would query the database - return [] - except Exception as e: - safe_logfire_error(f"Failed to get knowledge sources | error={str(e)}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.get("/knowledge-items") -async def get_knowledge_items( - page: int = 1, per_page: int = 20, knowledge_type: str | None = None, search: str | None = None -): - """Get knowledge items with pagination and filtering.""" - try: - # Use KnowledgeItemService - service = KnowledgeItemService(get_supabase_client()) - result = await service.list_items( - page=page, per_page=per_page, knowledge_type=knowledge_type, search=search - ) - return result - - except Exception as e: - safe_logfire_error( - f"Failed to get knowledge items | error={str(e)} | page={page} | per_page={per_page}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.put("/knowledge-items/{source_id}") -async def update_knowledge_item(source_id: str, updates: dict): - """Update a knowledge item's metadata.""" - try: - # Use KnowledgeItemService - service = KnowledgeItemService(get_supabase_client()) - success, result = await service.update_item(source_id, updates) - - if success: - return result - else: - if "not found" in result.get("error", "").lower(): - raise HTTPException(status_code=404, detail={"error": result.get("error")}) - else: - raise HTTPException(status_code=500, detail={"error": result.get("error")}) - - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"Failed to update knowledge item | error={str(e)} | source_id={source_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.delete("/knowledge-items/{source_id}") -async def delete_knowledge_item(source_id: str): - """Delete a knowledge item from the database.""" - try: - logger.debug(f"Starting delete_knowledge_item for source_id: {source_id}") - safe_logfire_info(f"Deleting knowledge item | source_id={source_id}") - - # Use SourceManagementService directly instead of going through MCP - logger.debug("Creating 
SourceManagementService...") - from ..services.source_management_service import SourceManagementService - - source_service = SourceManagementService(get_supabase_client()) - logger.debug("Successfully created SourceManagementService") - - logger.debug("Calling delete_source function...") - success, result_data = source_service.delete_source(source_id) - logger.debug(f"delete_source returned: success={success}, data={result_data}") - - # Convert to expected format - result = { - "success": success, - "error": result_data.get("error") if not success else None, - **result_data, - } - - if result.get("success"): - safe_logfire_info(f"Knowledge item deleted successfully | source_id={source_id}") - - return {"success": True, "message": f"Successfully deleted knowledge item {source_id}"} - else: - safe_logfire_error( - f"Knowledge item deletion failed | source_id={source_id} | error={result.get('error')}" - ) - raise HTTPException( - status_code=500, detail={"error": result.get("error", "Deletion failed")} - ) - - except Exception as e: - logger.error(f"Exception in delete_knowledge_item: {e}") - logger.error(f"Exception type: {type(e)}") - import traceback - - logger.error(f"Traceback: {traceback.format_exc()}") - safe_logfire_error( - f"Failed to delete knowledge item | error={str(e)} | source_id={source_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.get("/knowledge-items/{source_id}/code-examples") -async def get_knowledge_item_code_examples(source_id: str): - """Get all code examples for a specific knowledge item.""" - try: - safe_logfire_info(f"Fetching code examples for source_id: {source_id}") - - # Query code examples with full content for this specific source - supabase = get_supabase_client() - result = ( - supabase.from_("archon_code_examples") - .select("id, source_id, content, summary, metadata") - .eq("source_id", source_id) - .execute() - ) - - code_examples = result.data if result.data else [] - - safe_logfire_info(f"Found {len(code_examples)} code examples for {source_id}") - - return { - "success": True, - "source_id": source_id, - "code_examples": code_examples, - "count": len(code_examples), - } - - except Exception as e: - safe_logfire_error( - f"Failed to fetch code examples | error={str(e)} | source_id={source_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.post("/knowledge-items/{source_id}/refresh") -async def refresh_knowledge_item(source_id: str): - """Refresh a knowledge item by re-crawling its URL with the same metadata.""" - try: - safe_logfire_info(f"Starting knowledge item refresh | source_id={source_id}") - - # Get the existing knowledge item - service = KnowledgeItemService(get_supabase_client()) - existing_item = await service.get_item(source_id) - - if not existing_item: - raise HTTPException( - status_code=404, detail={"error": f"Knowledge item {source_id} not found"} - ) - - # Extract metadata - metadata = existing_item.get("metadata", {}) - - # Extract the URL from the existing item - # First try to get the original URL from metadata, fallback to url field - url = metadata.get("original_url") or existing_item.get("url") - if not url: - raise HTTPException( - status_code=400, detail={"error": "Knowledge item does not have a URL to refresh"} - ) - knowledge_type = metadata.get("knowledge_type", "technical") - tags = metadata.get("tags", []) - max_depth = metadata.get("max_depth", 2) - - # Generate unique progress ID - progress_id = str(uuid.uuid4()) - - # Start progress tracking 
with initial state - await start_crawl_progress( - progress_id, - { - "progressId": progress_id, - "currentUrl": url, - "totalPages": 0, - "processedPages": 0, - "percentage": 0, - "status": "starting", - "message": "Refreshing knowledge item...", - "logs": [f"Starting refresh for {url}"], - }, - ) - - # Get crawler from CrawlerManager - same pattern as _perform_crawl_with_progress - try: - crawler = await get_crawler() - if crawler is None: - raise Exception("Crawler not available - initialization may have failed") - except Exception as e: - safe_logfire_error(f"Failed to get crawler | error={str(e)}") - raise HTTPException( - status_code=500, detail={"error": f"Failed to initialize crawler: {str(e)}"} - ) - - # Use the same crawl orchestration as regular crawl - crawl_service = CrawlOrchestrationService( - crawler=crawler, supabase_client=get_supabase_client() - ) - crawl_service.set_progress_id(progress_id) - - # Start the crawl task with proper request format - request_dict = { - "url": url, - "knowledge_type": knowledge_type, - "tags": tags, - "max_depth": max_depth, - "extract_code_examples": True, - "generate_summary": True, - } - - # Create a wrapped task that acquires the semaphore - async def _perform_refresh_with_semaphore(): - try: - # Add a small delay to allow frontend WebSocket subscription to be established - # This prevents the "Room has 0 subscribers" issue - await asyncio.sleep(1.0) - - async with crawl_semaphore: - safe_logfire_info( - f"Acquired crawl semaphore for refresh | source_id={source_id}" - ) - await crawl_service.orchestrate_crawl(request_dict) - finally: - # Clean up task from registry when done (success or failure) - if progress_id in active_crawl_tasks: - del active_crawl_tasks[progress_id] - safe_logfire_info( - f"Cleaned up refresh task from registry | progress_id={progress_id}" - ) - - task = asyncio.create_task(_perform_refresh_with_semaphore()) - # Track the task for cancellation support - active_crawl_tasks[progress_id] = task - - return {"progressId": progress_id, "message": f"Started refresh for {url}"} - - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"Failed to refresh knowledge item | error={str(e)} | source_id={source_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.post("/knowledge-items/crawl") -async def crawl_knowledge_item(request: KnowledgeItemRequest): - """Crawl a URL and add it to the knowledge base with progress tracking.""" - # Validate URL - if not request.url: - raise HTTPException(status_code=422, detail="URL is required") - - # Basic URL validation - if not request.url.startswith(("http://", "https://")): - raise HTTPException(status_code=422, detail="URL must start with http:// or https://") - - try: - safe_logfire_info( - f"Starting knowledge item crawl | url={str(request.url)} | knowledge_type={request.knowledge_type} | tags={request.tags}" - ) - # Generate unique progress ID - progress_id = str(uuid.uuid4()) - # Start progress tracking with initial state - await start_crawl_progress( - progress_id, - { - "progressId": progress_id, - "currentUrl": str(request.url), - "totalPages": 0, - "processedPages": 0, - "percentage": 0, - "status": "starting", - "logs": [f"Starting crawl of {request.url}"], - "eta": "Calculating...", - }, - ) - # Start background task IMMEDIATELY (like the old API) - task = asyncio.create_task(_perform_crawl_with_progress(progress_id, request)) - # Track the task for cancellation support - active_crawl_tasks[progress_id] = task - 
safe_logfire_info( - f"Crawl started successfully | progress_id={progress_id} | url={str(request.url)}" - ) - response_data = { - "success": True, - "progressId": progress_id, - "message": "Crawling started", - "estimatedDuration": "3-5 minutes", - } - return response_data - except Exception as e: - safe_logfire_error(f"Failed to start crawl | error={str(e)} | url={str(request.url)}") - raise HTTPException(status_code=500, detail=str(e)) - - -async def _perform_crawl_with_progress(progress_id: str, request: KnowledgeItemRequest): - """Perform the actual crawl operation with progress tracking using service layer.""" - # Add a small delay to allow frontend WebSocket subscription to be established - # This prevents the "Room has 0 subscribers" issue - await asyncio.sleep(1.0) - - # Acquire semaphore to limit concurrent crawls - async with crawl_semaphore: - safe_logfire_info( - f"Acquired crawl semaphore | progress_id={progress_id} | url={str(request.url)}" - ) - try: - safe_logfire_info( - f"Starting crawl with progress tracking | progress_id={progress_id} | url={str(request.url)}" - ) - - # Get crawler from CrawlerManager - try: - crawler = await get_crawler() - if crawler is None: - raise Exception("Crawler not available - initialization may have failed") - except Exception as e: - safe_logfire_error(f"Failed to get crawler | error={str(e)}") - await error_crawl_progress(progress_id, f"Failed to initialize crawler: {str(e)}") - return - - supabase_client = get_supabase_client() - orchestration_service = CrawlOrchestrationService(crawler, supabase_client) - orchestration_service.set_progress_id(progress_id) - - # Store the current task in active_crawl_tasks for cancellation support - current_task = asyncio.current_task() - if current_task: - active_crawl_tasks[progress_id] = current_task - safe_logfire_info( - f"Stored current task in active_crawl_tasks | progress_id={progress_id}" - ) - - # Convert request to dict for service - request_dict = { - "url": str(request.url), - "knowledge_type": request.knowledge_type, - "tags": request.tags or [], - "max_depth": request.max_depth, - "extract_code_examples": request.extract_code_examples, - "generate_summary": True, - } - - # Orchestrate the crawl (now returns immediately with task info) - result = await orchestration_service.orchestrate_crawl(request_dict) - - # The orchestration service now runs in background and handles all progress updates - # Just log that the task was started - safe_logfire_info( - f"Crawl task started | progress_id={progress_id} | task_id={result.get('task_id')}" - ) - except asyncio.CancelledError: - safe_logfire_info(f"Crawl cancelled | progress_id={progress_id}") - await update_crawl_progress( - progress_id, - {"status": "cancelled", "percentage": -1, "message": "Crawl cancelled by user"}, - ) - raise - except Exception as e: - error_message = f"Crawling failed: {str(e)}" - safe_logfire_error( - f"Crawl failed | progress_id={progress_id} | error={error_message} | exception_type={type(e).__name__}" - ) - import traceback - - tb = traceback.format_exc() - # Ensure the error is visible in logs - logger.error(f"=== CRAWL ERROR FOR {progress_id} ===") - logger.error(f"Error: {error_message}") - logger.error(f"Exception Type: {type(e).__name__}") - logger.error(f"Traceback:\n{tb}") - logger.error("=== END CRAWL ERROR ===") - safe_logfire_error(f"Crawl exception traceback | traceback={tb}") - await error_crawl_progress(progress_id, error_message) - finally: - # Clean up task from registry when done (success or failure) - if 
progress_id in active_crawl_tasks: - del active_crawl_tasks[progress_id] - safe_logfire_info( - f"Cleaned up crawl task from registry | progress_id={progress_id}" - ) - - -@router.post("/documents/upload") -async def upload_document( - file: UploadFile = File(...), - tags: str | None = Form(None), - knowledge_type: str = Form("technical"), -): - """Upload and process a document with progress tracking.""" - try: - safe_logfire_info( - f"Starting document upload | filename={file.filename} | content_type={file.content_type} | knowledge_type={knowledge_type}" - ) - - # Generate unique progress ID - progress_id = str(uuid.uuid4()) - - # Parse tags - tag_list = json.loads(tags) if tags else [] - - # Read file content immediately to avoid closed file issues - file_content = await file.read() - file_metadata = { - "filename": file.filename, - "content_type": file.content_type, - "size": len(file_content), - } - # Start progress tracking - await start_crawl_progress( - progress_id, - { - "progressId": progress_id, - "status": "starting", - "percentage": 0, - "currentUrl": f"file://{file.filename}", - "logs": [f"Starting upload of {file.filename}"], - "uploadType": "document", - "fileName": file.filename, - "fileType": file.content_type, - }, - ) - # Start background task for processing with file content and metadata - task = asyncio.create_task( - _perform_upload_with_progress( - progress_id, file_content, file_metadata, tag_list, knowledge_type - ) - ) - # Track the task for cancellation support - active_crawl_tasks[progress_id] = task - safe_logfire_info( - f"Document upload started successfully | progress_id={progress_id} | filename={file.filename}" - ) - return { - "success": True, - "progressId": progress_id, - "message": "Document upload started", - "filename": file.filename, - } - - except Exception as e: - safe_logfire_error( - f"Failed to start document upload | error={str(e)} | filename={file.filename} | error_type={type(e).__name__}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -async def _perform_upload_with_progress( - progress_id: str, - file_content: bytes, - file_metadata: dict, - tag_list: list[str], - knowledge_type: str, -): - """Perform document upload with progress tracking using service layer.""" - # Add a small delay to allow frontend WebSocket subscription to be established - # This prevents the "Room has 0 subscribers" issue - await asyncio.sleep(1.0) - - # Create cancellation check function for document uploads - def check_upload_cancellation(): - """Check if upload task has been cancelled.""" - task = active_crawl_tasks.get(progress_id) - if task and task.cancelled(): - raise asyncio.CancelledError("Document upload was cancelled by user") - - # Import ProgressMapper to prevent progress from going backwards - from ..services.crawling.progress_mapper import ProgressMapper - progress_mapper = ProgressMapper() - - try: - filename = file_metadata["filename"] - content_type = file_metadata["content_type"] - # file_size = file_metadata['size'] # Not used currently - - safe_logfire_info( - f"Starting document upload with progress tracking | progress_id={progress_id} | filename={filename} | content_type={content_type}" - ) - - # Socket.IO handles connection automatically - no need to wait - - # Extract text from document with progress - use mapper for consistent progress - mapped_progress = progress_mapper.map_progress("processing", 50) - await update_crawl_progress( - progress_id, - { - "status": "processing", - "percentage": mapped_progress, - 
"currentUrl": f"file://{filename}", - "log": f"Reading {filename}...", - }, - ) - - try: - extracted_text = extract_text_from_document(file_content, filename, content_type) - safe_logfire_info( - f"Document text extracted | filename={filename} | extracted_length={len(extracted_text)} | content_type={content_type}" - ) - except Exception as e: - await error_crawl_progress(progress_id, f"Failed to extract text: {str(e)}") - return - - # Use DocumentStorageService to handle the upload - doc_storage_service = DocumentStorageService(get_supabase_client()) - - # Generate source_id from filename - source_id = f"file_{filename.replace(' ', '_').replace('.', '_')}_{int(time.time())}" - - # Create progress callback that emits to Socket.IO with mapped progress - async def document_progress_callback( - message: str, percentage: int, batch_info: dict = None - ): - """Progress callback that emits to Socket.IO with mapped progress""" - # Map the document storage progress to overall progress range - mapped_percentage = progress_mapper.map_progress("document_storage", percentage) - - progress_data = { - "status": "document_storage", - "percentage": mapped_percentage, # Use mapped progress to prevent backwards jumps - "currentUrl": f"file://{filename}", - "log": message, - } - if batch_info: - progress_data.update(batch_info) - - await update_crawl_progress(progress_id, progress_data) - - # Call the service's upload_document method - success, result = await doc_storage_service.upload_document( - file_content=extracted_text, - filename=filename, - source_id=source_id, - knowledge_type=knowledge_type, - tags=tag_list, - progress_callback=document_progress_callback, - cancellation_check=check_upload_cancellation, - ) - - if success: - # Complete the upload with 100% progress - final_progress = progress_mapper.map_progress("completed", 100) - await update_crawl_progress( - progress_id, - { - "status": "completed", - "percentage": final_progress, - "currentUrl": f"file://{filename}", - "log": "Document upload completed successfully!", - }, - ) - - # Also send the completion event with details - await complete_crawl_progress( - progress_id, - { - "chunksStored": result.get("chunks_stored", 0), - "wordCount": result.get("total_word_count", 0), - "sourceId": result.get("source_id"), - "log": "Document upload completed successfully!", - }, - ) - - safe_logfire_info( - f"Document uploaded successfully | progress_id={progress_id} | source_id={result.get('source_id')} | chunks_stored={result.get('chunks_stored')}" - ) - else: - error_msg = result.get("error", "Unknown error") - await error_crawl_progress(progress_id, error_msg) - - except Exception as e: - error_msg = f"Upload failed: {str(e)}" - safe_logfire_error( - f"Document upload failed | progress_id={progress_id} | filename={file_metadata.get('filename', 'unknown')} | error={str(e)}" - ) - await error_crawl_progress(progress_id, error_msg) - finally: - # Clean up task from registry when done (success or failure) - if progress_id in active_crawl_tasks: - del active_crawl_tasks[progress_id] - safe_logfire_info(f"Cleaned up upload task from registry | progress_id={progress_id}") - - -@router.post("/knowledge-items/search") -async def search_knowledge_items(request: RagQueryRequest): - """Search knowledge items - alias for RAG query.""" - # Validate query - if not request.query: - raise HTTPException(status_code=422, detail="Query is required") - - if not request.query.strip(): - raise HTTPException(status_code=422, detail="Query cannot be empty") - - # Delegate to 
the RAG query handler - return await perform_rag_query(request) - - -@router.post("/rag/query") -async def perform_rag_query(request: RagQueryRequest): - """Perform a RAG query on the knowledge base using service layer.""" - # Validate query - if not request.query: - raise HTTPException(status_code=422, detail="Query is required") - - if not request.query.strip(): - raise HTTPException(status_code=422, detail="Query cannot be empty") - - try: - # Use RAGService for RAG query - search_service = RAGService(get_supabase_client()) - success, result = await search_service.perform_rag_query( - query=request.query, source=request.source, match_count=request.match_count - ) - - if success: - # Add success flag to match expected API response format - result["success"] = True - return result - else: - raise HTTPException( - status_code=500, detail={"error": result.get("error", "RAG query failed")} - ) - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"RAG query failed | error={str(e)} | query={request.query[:50]} | source={request.source}" - ) - raise HTTPException(status_code=500, detail={"error": f"RAG query failed: {str(e)}"}) - - -@router.post("/rag/code-examples") -async def search_code_examples(request: RagQueryRequest): - """Search for code examples relevant to the query using dedicated code examples service.""" - try: - # Use RAGService for code examples search - search_service = RAGService(get_supabase_client()) - success, result = await search_service.search_code_examples_service( - query=request.query, - source_id=request.source, # This is Optional[str] which matches the method signature - match_count=request.match_count, - ) - - if success: - # Add success flag and reformat to match expected API response format - return { - "success": True, - "results": result.get("results", []), - "reranked": result.get("reranking_applied", False), - "error": None, - } - else: - raise HTTPException( - status_code=500, - detail={"error": result.get("error", "Code examples search failed")}, - ) - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"Code examples search failed | error={str(e)} | query={request.query[:50]} | source={request.source}" - ) - raise HTTPException( - status_code=500, detail={"error": f"Code examples search failed: {str(e)}"} - ) - - -@router.post("/code-examples") -async def search_code_examples_simple(request: RagQueryRequest): - """Search for code examples - simplified endpoint at /api/code-examples.""" - # Delegate to the existing endpoint handler - return await search_code_examples(request) - - -@router.get("/rag/sources") -async def get_available_sources(): - """Get all available sources for RAG queries.""" - try: - # Use KnowledgeItemService - service = KnowledgeItemService(get_supabase_client()) - result = await service.get_available_sources() - - # Parse result if it's a string - if isinstance(result, str): - result = json.loads(result) - - return result - except Exception as e: - safe_logfire_error(f"Failed to get available sources | error={str(e)}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.delete("/sources/{source_id}") -async def delete_source(source_id: str): - """Delete a source and all its associated data.""" - try: - safe_logfire_info(f"Deleting source | source_id={source_id}") - - # Use SourceManagementService directly - from ..services.source_management_service import SourceManagementService - - source_service = SourceManagementService(get_supabase_client()) - - 
success, result_data = source_service.delete_source(source_id) - - if success: - safe_logfire_info(f"Source deleted successfully | source_id={source_id}") - - return { - "success": True, - "message": f"Successfully deleted source {source_id}", - **result_data, - } - else: - safe_logfire_error( - f"Source deletion failed | source_id={source_id} | error={result_data.get('error')}" - ) - raise HTTPException( - status_code=500, detail={"error": result_data.get("error", "Deletion failed")} - ) - except HTTPException: - raise - except Exception as e: - safe_logfire_error(f"Failed to delete source | error={str(e)} | source_id={source_id}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -# WebSocket Endpoints - - -@router.get("/database/metrics") -async def get_database_metrics(): - """Get database metrics and statistics.""" - try: - # Use DatabaseMetricsService - service = DatabaseMetricsService(get_supabase_client()) - metrics = await service.get_metrics() - return metrics - except Exception as e: - safe_logfire_error(f"Failed to get database metrics | error={str(e)}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.get("/health") -async def knowledge_health(): - """Knowledge API health check.""" - # Removed health check logging to reduce console noise - result = { - "status": "healthy", - "service": "knowledge-api", - "timestamp": datetime.now().isoformat(), - } - - return result - - -@router.get("/knowledge-items/task/{task_id}") -async def get_crawl_task_status(task_id: str): - """Get status of a background crawl task.""" - try: - from ..services.background_task_manager import get_task_manager - - task_manager = get_task_manager() - status = await task_manager.get_task_status(task_id) - - if "error" in status and status["error"] == "Task not found": - raise HTTPException(status_code=404, detail={"error": "Task not found"}) - - return status - except HTTPException: - raise - except Exception as e: - safe_logfire_error(f"Failed to get task status | error={str(e)} | task_id={task_id}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.post("/knowledge-items/stop/{progress_id}") -async def stop_crawl_task(progress_id: str): - """Stop a running crawl task.""" - try: - from ..services.crawling import get_active_orchestration, unregister_orchestration - - # Emit stopping status immediately - await sio.emit( - "crawl:stopping", - { - "progressId": progress_id, - "message": "Stopping crawl operation...", - "timestamp": datetime.utcnow().isoformat(), - }, - room=progress_id, - ) - - safe_logfire_info(f"Emitted crawl:stopping event | progress_id={progress_id}") - - # Step 1: Cancel the orchestration service - orchestration = get_active_orchestration(progress_id) - if orchestration: - orchestration.cancel() - - # Step 2: Cancel the asyncio task - if progress_id in active_crawl_tasks: - task = active_crawl_tasks[progress_id] - if not task.done(): - task.cancel() - try: - await asyncio.wait_for(task, timeout=2.0) - except (TimeoutError, asyncio.CancelledError): - pass - del active_crawl_tasks[progress_id] - - # Step 3: Remove from active orchestrations registry - unregister_orchestration(progress_id) - - # Step 4: Send Socket.IO event - await sio.emit( - "crawl:stopped", - { - "progressId": progress_id, - "status": "cancelled", - "message": "Crawl cancelled by user", - "timestamp": datetime.utcnow().isoformat(), - }, - room=progress_id, - ) - - safe_logfire_info(f"Successfully stopped crawl task | progress_id={progress_id}") - 
return { - "success": True, - "message": "Crawl task stopped successfully", - "progressId": progress_id, - } - - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"Failed to stop crawl task | error={str(e)} | progress_id={progress_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) +""" +Knowledge Management API Module + +This module handles all knowledge base operations including: +- Crawling and indexing web content +- Document upload and processing +- RAG (Retrieval Augmented Generation) queries +- Knowledge item management and search +- Real-time progress tracking via WebSockets +""" + +import asyncio +import json +import time +import uuid +from datetime import datetime + +from fastapi import APIRouter, File, Form, HTTPException, UploadFile +from pydantic import BaseModel + +from ..utils import get_supabase_client +from ..services.storage import DocumentStorageService +from ..services.search.rag_service import RAGService +from ..services.knowledge import KnowledgeItemService, DatabaseMetricsService +from ..services.crawling import CrawlOrchestrationService +from ..services.crawler_manager import get_crawler + +# Import unified logging +from ..config.logfire_config import get_logger, safe_logfire_error, safe_logfire_info +from ..services.crawler_manager import get_crawler +from ..services.search.rag_service import RAGService +from ..services.storage import DocumentStorageService +from ..utils import get_supabase_client +from ..utils.document_processing import extract_text_from_document + +# Get logger for this module +logger = get_logger(__name__) +from ..socketio_app import get_socketio_instance +from .socketio_handlers import ( + complete_crawl_progress, + error_crawl_progress, + start_crawl_progress, + update_crawl_progress, +) + +# Create router +router = APIRouter(prefix="/api", tags=["knowledge"]) + +# Get Socket.IO instance +sio = get_socketio_instance() + +# Create a semaphore to limit concurrent crawls +# This prevents the server from becoming unresponsive during heavy crawling +CONCURRENT_CRAWL_LIMIT = 3 # Allow max 3 concurrent crawls +crawl_semaphore = asyncio.Semaphore(CONCURRENT_CRAWL_LIMIT) + +# Track active async crawl tasks for cancellation support +active_crawl_tasks: dict[str, asyncio.Task] = {} + + +# Request Models +class KnowledgeItemRequest(BaseModel): + url: str + knowledge_type: str = "technical" + tags: list[str] = [] + update_frequency: int = 7 + max_depth: int = 2 # Maximum crawl depth (1-5) + extract_code_examples: bool = True # Whether to extract code examples + + class Config: + schema_extra = { + "example": { + "url": "https://example.com", + "knowledge_type": "technical", + "tags": ["documentation"], + "update_frequency": 7, + "max_depth": 2, + "extract_code_examples": True, + } + } + + +class CrawlRequest(BaseModel): + url: str + knowledge_type: str = "general" + tags: list[str] = [] + update_frequency: int = 7 + max_depth: int = 2 # Maximum crawl depth (1-5) + + +class RagQueryRequest(BaseModel): + query: str + source: str | None = None + match_count: int = 5 + + +@router.get("/test-socket-progress/{progress_id}") +async def test_socket_progress(progress_id: str): + """Test endpoint to verify Socket.IO crawl progress is working.""" + try: + # Send a test progress update + test_data = { + "progressId": progress_id, + "status": "testing", + "percentage": 50, + "message": "Test progress update from API", + "currentStep": "Testing Socket.IO connection", + "logs": ["Test log entry 1", "Test log entry 2"], 
+ } + + await update_crawl_progress(progress_id, test_data) + + return { + "success": True, + "message": f"Test progress sent to room {progress_id}", + "data": test_data, + } + except Exception as e: + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.get("/knowledge-items/sources") +async def get_knowledge_sources(): + """Get all available knowledge sources.""" + try: + # Return empty list for now to pass the test + # In production, this would query the database + return [] + except Exception as e: + safe_logfire_error(f"Failed to get knowledge sources | error={str(e)}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.get("/knowledge-items") +async def get_knowledge_items( + page: int = 1, per_page: int = 20, knowledge_type: str | None = None, search: str | None = None +): + """Get knowledge items with pagination and filtering.""" + try: + # Use KnowledgeItemService + service = KnowledgeItemService(get_supabase_client()) + result = await service.list_items( + page=page, per_page=per_page, knowledge_type=knowledge_type, search=search + ) + return result + + except Exception as e: + safe_logfire_error( + f"Failed to get knowledge items | error={str(e)} | page={page} | per_page={per_page}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.put("/knowledge-items/{source_id}") +async def update_knowledge_item(source_id: str, updates: dict): + """Update a knowledge item's metadata.""" + try: + # Use KnowledgeItemService + service = KnowledgeItemService(get_supabase_client()) + success, result = await service.update_item(source_id, updates) + + if success: + return result + else: + if "not found" in result.get("error", "").lower(): + raise HTTPException(status_code=404, detail={"error": result.get("error")}) + else: + raise HTTPException(status_code=500, detail={"error": result.get("error")}) + + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"Failed to update knowledge item | error={str(e)} | source_id={source_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.delete("/knowledge-items/{source_id}") +async def delete_knowledge_item(source_id: str): + """Delete a knowledge item from the database.""" + try: + logger.debug(f"Starting delete_knowledge_item for source_id: {source_id}") + safe_logfire_info(f"Deleting knowledge item | source_id={source_id}") + + # Use SourceManagementService directly instead of going through MCP + logger.debug("Creating SourceManagementService...") + from ..services.source_management_service import SourceManagementService + + source_service = SourceManagementService(get_supabase_client()) + logger.debug("Successfully created SourceManagementService") + + logger.debug("Calling delete_source function...") + success, result_data = source_service.delete_source(source_id) + logger.debug(f"delete_source returned: success={success}, data={result_data}") + + # Convert to expected format + result = { + "success": success, + "error": result_data.get("error") if not success else None, + **result_data, + } + + if result.get("success"): + safe_logfire_info(f"Knowledge item deleted successfully | source_id={source_id}") + + return {"success": True, "message": f"Successfully deleted knowledge item {source_id}"} + else: + safe_logfire_error( + f"Knowledge item deletion failed | source_id={source_id} | error={result.get('error')}" + ) + raise HTTPException( + status_code=500, detail={"error": result.get("error", "Deletion failed")} 
+ ) + + except Exception as e: + logger.error(f"Exception in delete_knowledge_item: {e}") + logger.error(f"Exception type: {type(e)}") + import traceback + + logger.error(f"Traceback: {traceback.format_exc()}") + safe_logfire_error( + f"Failed to delete knowledge item | error={str(e)} | source_id={source_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.get("/knowledge-items/{source_id}/code-examples") +async def get_knowledge_item_code_examples(source_id: str): + """Get all code examples for a specific knowledge item.""" + try: + safe_logfire_info(f"Fetching code examples for source_id: {source_id}") + + # Query code examples with full content for this specific source + supabase = get_supabase_client() + result = ( + supabase.from_("archon_code_examples") + .select("id, source_id, content, summary, metadata") + .eq("source_id", source_id) + .execute() + ) + + code_examples = result.data if result.data else [] + + safe_logfire_info(f"Found {len(code_examples)} code examples for {source_id}") + + return { + "success": True, + "source_id": source_id, + "code_examples": code_examples, + "count": len(code_examples), + } + + except Exception as e: + safe_logfire_error( + f"Failed to fetch code examples | error={str(e)} | source_id={source_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.post("/knowledge-items/{source_id}/refresh") +async def refresh_knowledge_item(source_id: str): + """Refresh a knowledge item by re-crawling its URL with the same metadata.""" + try: + safe_logfire_info(f"Starting knowledge item refresh | source_id={source_id}") + + # Get the existing knowledge item + service = KnowledgeItemService(get_supabase_client()) + existing_item = await service.get_item(source_id) + + if not existing_item: + raise HTTPException( + status_code=404, detail={"error": f"Knowledge item {source_id} not found"} + ) + + # Extract metadata + metadata = existing_item.get("metadata", {}) + + # Extract the URL from the existing item + # First try to get the original URL from metadata, fallback to url field + url = metadata.get("original_url") or existing_item.get("url") + if not url: + raise HTTPException( + status_code=400, detail={"error": "Knowledge item does not have a URL to refresh"} + ) + knowledge_type = metadata.get("knowledge_type", "technical") + tags = metadata.get("tags", []) + max_depth = metadata.get("max_depth", 2) + + # Generate unique progress ID + progress_id = str(uuid.uuid4()) + + # Start progress tracking with initial state + await start_crawl_progress( + progress_id, + { + "progressId": progress_id, + "currentUrl": url, + "totalPages": 0, + "processedPages": 0, + "percentage": 0, + "status": "starting", + "message": "Refreshing knowledge item...", + "logs": [f"Starting refresh for {url}"], + }, + ) + + # Get crawler from CrawlerManager - same pattern as _perform_crawl_with_progress + try: + crawler = await get_crawler() + if crawler is None: + raise Exception("Crawler not available - initialization may have failed") + except Exception as e: + safe_logfire_error(f"Failed to get crawler | error={str(e)}") + raise HTTPException( + status_code=500, detail={"error": f"Failed to initialize crawler: {str(e)}"} + ) + + # Use the same crawl orchestration as regular crawl + crawl_service = CrawlOrchestrationService( + crawler=crawler, supabase_client=get_supabase_client() + ) + crawl_service.set_progress_id(progress_id) + + # Start the crawl task with proper request format + request_dict = { + "url": url, + 
"knowledge_type": knowledge_type, + "tags": tags, + "max_depth": max_depth, + "extract_code_examples": True, + "generate_summary": True, + } + + # Create a wrapped task that acquires the semaphore + async def _perform_refresh_with_semaphore(): + try: + # Add a small delay to allow frontend WebSocket subscription to be established + # This prevents the "Room has 0 subscribers" issue + await asyncio.sleep(1.0) + + async with crawl_semaphore: + safe_logfire_info( + f"Acquired crawl semaphore for refresh | source_id={source_id}" + ) + await crawl_service.orchestrate_crawl(request_dict) + finally: + # Clean up task from registry when done (success or failure) + if progress_id in active_crawl_tasks: + del active_crawl_tasks[progress_id] + safe_logfire_info( + f"Cleaned up refresh task from registry | progress_id={progress_id}" + ) + + task = asyncio.create_task(_perform_refresh_with_semaphore()) + # Track the task for cancellation support + active_crawl_tasks[progress_id] = task + + return {"progressId": progress_id, "message": f"Started refresh for {url}"} + + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"Failed to refresh knowledge item | error={str(e)} | source_id={source_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.post("/knowledge-items/crawl") +async def crawl_knowledge_item(request: KnowledgeItemRequest): + """Crawl a URL and add it to the knowledge base with progress tracking.""" + # Validate URL + if not request.url: + raise HTTPException(status_code=422, detail="URL is required") + + # Basic URL validation + if not request.url.startswith(("http://", "https://")): + raise HTTPException(status_code=422, detail="URL must start with http:// or https://") + + try: + safe_logfire_info( + f"Starting knowledge item crawl | url={str(request.url)} | knowledge_type={request.knowledge_type} | tags={request.tags}" + ) + # Generate unique progress ID + progress_id = str(uuid.uuid4()) + # Start progress tracking with initial state + await start_crawl_progress( + progress_id, + { + "progressId": progress_id, + "currentUrl": str(request.url), + "totalPages": 0, + "processedPages": 0, + "percentage": 0, + "status": "starting", + "logs": [f"Starting crawl of {request.url}"], + "eta": "Calculating...", + }, + ) + # Start background task IMMEDIATELY (like the old API) + task = asyncio.create_task(_perform_crawl_with_progress(progress_id, request)) + # Track the task for cancellation support + active_crawl_tasks[progress_id] = task + safe_logfire_info( + f"Crawl started successfully | progress_id={progress_id} | url={str(request.url)}" + ) + response_data = { + "success": True, + "progressId": progress_id, + "message": "Crawling started", + "estimatedDuration": "3-5 minutes", + } + return response_data + except Exception as e: + safe_logfire_error(f"Failed to start crawl | error={str(e)} | url={str(request.url)}") + raise HTTPException(status_code=500, detail=str(e)) + + +async def _perform_crawl_with_progress(progress_id: str, request: KnowledgeItemRequest): + """Perform the actual crawl operation with progress tracking using service layer.""" + # Add a small delay to allow frontend WebSocket subscription to be established + # This prevents the "Room has 0 subscribers" issue + await asyncio.sleep(1.0) + + # Acquire semaphore to limit concurrent crawls + async with crawl_semaphore: + safe_logfire_info( + f"Acquired crawl semaphore | progress_id={progress_id} | url={str(request.url)}" + ) + try: + safe_logfire_info( + f"Starting 
crawl with progress tracking | progress_id={progress_id} | url={str(request.url)}" + ) + + # Get crawler from CrawlerManager + try: + crawler = await get_crawler() + if crawler is None: + raise Exception("Crawler not available - initialization may have failed") + except Exception as e: + safe_logfire_error(f"Failed to get crawler | error={str(e)}") + await error_crawl_progress(progress_id, f"Failed to initialize crawler: {str(e)}") + return + + supabase_client = get_supabase_client() + orchestration_service = CrawlOrchestrationService(crawler, supabase_client) + orchestration_service.set_progress_id(progress_id) + + # Store the current task in active_crawl_tasks for cancellation support + current_task = asyncio.current_task() + if current_task: + active_crawl_tasks[progress_id] = current_task + safe_logfire_info( + f"Stored current task in active_crawl_tasks | progress_id={progress_id}" + ) + + # Convert request to dict for service + request_dict = { + "url": str(request.url), + "knowledge_type": request.knowledge_type, + "tags": request.tags or [], + "max_depth": request.max_depth, + "extract_code_examples": request.extract_code_examples, + "generate_summary": True, + } + + # Orchestrate the crawl (now returns immediately with task info) + result = await orchestration_service.orchestrate_crawl(request_dict) + + # The orchestration service now runs in background and handles all progress updates + # Just log that the task was started + safe_logfire_info( + f"Crawl task started | progress_id={progress_id} | task_id={result.get('task_id')}" + ) + except asyncio.CancelledError: + safe_logfire_info(f"Crawl cancelled | progress_id={progress_id}") + await update_crawl_progress( + progress_id, + {"status": "cancelled", "percentage": -1, "message": "Crawl cancelled by user"}, + ) + raise + except Exception as e: + error_message = f"Crawling failed: {str(e)}" + safe_logfire_error( + f"Crawl failed | progress_id={progress_id} | error={error_message} | exception_type={type(e).__name__}" + ) + import traceback + + tb = traceback.format_exc() + # Ensure the error is visible in logs + logger.error(f"=== CRAWL ERROR FOR {progress_id} ===") + logger.error(f"Error: {error_message}") + logger.error(f"Exception Type: {type(e).__name__}") + logger.error(f"Traceback:\n{tb}") + logger.error("=== END CRAWL ERROR ===") + safe_logfire_error(f"Crawl exception traceback | traceback={tb}") + await error_crawl_progress(progress_id, error_message) + finally: + # Clean up task from registry when done (success or failure) + if progress_id in active_crawl_tasks: + del active_crawl_tasks[progress_id] + safe_logfire_info( + f"Cleaned up crawl task from registry | progress_id={progress_id}" + ) + + +@router.post("/documents/upload") +async def upload_document( + file: UploadFile = File(...), + tags: str | None = Form(None), + knowledge_type: str = Form("technical"), +): + """Upload and process a document with progress tracking.""" + try: + # DETAILED LOGGING: Track knowledge_type parameter flow + safe_logfire_info( + f"📋 UPLOAD: Starting document upload | filename={file.filename} | content_type={file.content_type} | knowledge_type={knowledge_type}" + ) + + safe_logfire_info( + f"Starting document upload | filename={file.filename} | content_type={file.content_type} | knowledge_type={knowledge_type}" + ) + + # Generate unique progress ID + progress_id = str(uuid.uuid4()) + + # Parse tags + tag_list = json.loads(tags) if tags else [] + + # Read file content immediately to avoid closed file issues + file_content = await 
file.read() + file_metadata = { + "filename": file.filename, + "content_type": file.content_type, + "size": len(file_content), + } + # Start progress tracking + await start_crawl_progress( + progress_id, + { + "progressId": progress_id, + "status": "starting", + "percentage": 0, + "currentUrl": f"file://{file.filename}", + "logs": [f"Starting upload of {file.filename}"], + "uploadType": "document", + "fileName": file.filename, + "fileType": file.content_type, + }, + ) + # Start background task for processing with file content and metadata + task = asyncio.create_task( + _perform_upload_with_progress( + progress_id, file_content, file_metadata, tag_list, knowledge_type + ) + ) + # Track the task for cancellation support + active_crawl_tasks[progress_id] = task + safe_logfire_info( + f"Document upload started successfully | progress_id={progress_id} | filename={file.filename}" + ) + return { + "success": True, + "progressId": progress_id, + "message": "Document upload started", + "filename": file.filename, + } + + except Exception as e: + safe_logfire_error( + f"Failed to start document upload | error={str(e)} | filename={file.filename} | error_type={type(e).__name__}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +async def _perform_upload_with_progress( + progress_id: str, + file_content: bytes, + file_metadata: dict, + tag_list: list[str], + knowledge_type: str, +): + """Perform document upload with progress tracking using service layer.""" + # Add a small delay to allow frontend WebSocket subscription to be established + # This prevents the "Room has 0 subscribers" issue + await asyncio.sleep(1.0) + + # Create cancellation check function for document uploads + def check_upload_cancellation(): + """Check if upload task has been cancelled.""" + task = active_crawl_tasks.get(progress_id) + if task and task.cancelled(): + raise asyncio.CancelledError("Document upload was cancelled by user") + + # Import ProgressMapper to prevent progress from going backwards + from ..services.crawling.progress_mapper import ProgressMapper + progress_mapper = ProgressMapper() + + try: + filename = file_metadata["filename"] + content_type = file_metadata["content_type"] + # file_size = file_metadata['size'] # Not used currently + + safe_logfire_info( + f"Starting document upload with progress tracking | progress_id={progress_id} | filename={filename} | content_type={content_type}" + ) + + # Socket.IO handles connection automatically - no need to wait + + # Extract text from document with progress - use mapper for consistent progress + mapped_progress = progress_mapper.map_progress("processing", 50) + await update_crawl_progress( + progress_id, + { + "status": "processing", + "percentage": mapped_progress, + "currentUrl": f"file://{filename}", + "log": f"Reading {filename}...", + }, + ) + + try: + extracted_text = extract_text_from_document(file_content, filename, content_type) + safe_logfire_info( + f"Document text extracted | filename={filename} | extracted_length={len(extracted_text)} | content_type={content_type}" + ) + except Exception as e: + await error_crawl_progress(progress_id, f"Failed to extract text: {str(e)}") + return + + # Use DocumentStorageService to handle the upload + doc_storage_service = DocumentStorageService(get_supabase_client()) + + # Generate source_id from filename + source_id = f"file_{filename.replace(' ', '_').replace('.', '_')}_{int(time.time())}" + + # Create progress callback that emits to Socket.IO with mapped progress + async def 
document_progress_callback( + message: str, percentage: int, batch_info: dict = None + ): + """Progress callback that emits to Socket.IO with mapped progress""" + # Map the document storage progress to overall progress range + mapped_percentage = progress_mapper.map_progress("document_storage", percentage) + + progress_data = { + "status": "document_storage", + "percentage": mapped_percentage, # Use mapped progress to prevent backwards jumps + "currentUrl": f"file://{filename}", + "log": message, + } + if batch_info: + progress_data.update(batch_info) + + await update_crawl_progress(progress_id, progress_data) + + # Call the service's upload_document method + success, result = await doc_storage_service.upload_document( + file_content=extracted_text, + filename=filename, + source_id=source_id, + knowledge_type=knowledge_type, + tags=tag_list, + progress_callback=document_progress_callback, + cancellation_check=check_upload_cancellation, + ) + + if success: + # Complete the upload with 100% progress + final_progress = progress_mapper.map_progress("completed", 100) + await update_crawl_progress( + progress_id, + { + "status": "completed", + "percentage": final_progress, + "currentUrl": f"file://{filename}", + "log": "Document upload completed successfully!", + }, + ) + + # Also send the completion event with details + await complete_crawl_progress( + progress_id, + { + "chunksStored": result.get("chunks_stored", 0), + "wordCount": result.get("total_word_count", 0), + "sourceId": result.get("source_id"), + "log": "Document upload completed successfully!", + }, + ) + + safe_logfire_info( + f"Document uploaded successfully | progress_id={progress_id} | source_id={result.get('source_id')} | chunks_stored={result.get('chunks_stored')}" + ) + else: + error_msg = result.get("error", "Unknown error") + await error_crawl_progress(progress_id, error_msg) + + except Exception as e: + error_msg = f"Upload failed: {str(e)}" + safe_logfire_error( + f"Document upload failed | progress_id={progress_id} | filename={file_metadata.get('filename', 'unknown')} | error={str(e)}" + ) + await error_crawl_progress(progress_id, error_msg) + finally: + # Clean up task from registry when done (success or failure) + if progress_id in active_crawl_tasks: + del active_crawl_tasks[progress_id] + safe_logfire_info(f"Cleaned up upload task from registry | progress_id={progress_id}") + + +@router.post("/knowledge-items/search") +async def search_knowledge_items(request: RagQueryRequest): + """Search knowledge items - alias for RAG query.""" + # Validate query + if not request.query: + raise HTTPException(status_code=422, detail="Query is required") + + if not request.query.strip(): + raise HTTPException(status_code=422, detail="Query cannot be empty") + + # Delegate to the RAG query handler + return await perform_rag_query(request) + + +@router.post("/rag/query") +async def perform_rag_query(request: RagQueryRequest): + """Perform a RAG query on the knowledge base using service layer.""" + # Validate query + if not request.query: + raise HTTPException(status_code=422, detail="Query is required") + + if not request.query.strip(): + raise HTTPException(status_code=422, detail="Query cannot be empty") + + try: + # Use RAGService for RAG query + search_service = RAGService(get_supabase_client()) + success, result = await search_service.perform_rag_query( + query=request.query, source=request.source, match_count=request.match_count + ) + + if success: + # Add success flag to match expected API response format + result["success"] = 
True + return result + else: + raise HTTPException( + status_code=500, detail={"error": result.get("error", "RAG query failed")} + ) + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"RAG query failed | error={str(e)} | query={request.query[:50]} | source={request.source}" + ) + raise HTTPException(status_code=500, detail={"error": f"RAG query failed: {str(e)}"}) + + +@router.post("/rag/code-examples") +async def search_code_examples(request: RagQueryRequest): + """Search for code examples relevant to the query using dedicated code examples service.""" + try: + # Use RAGService for code examples search + search_service = RAGService(get_supabase_client()) + success, result = await search_service.search_code_examples_service( + query=request.query, + source_id=request.source, # This is Optional[str] which matches the method signature + match_count=request.match_count, + ) + + if success: + # Add success flag and reformat to match expected API response format + return { + "success": True, + "results": result.get("results", []), + "reranked": result.get("reranking_applied", False), + "error": None, + } + else: + raise HTTPException( + status_code=500, + detail={"error": result.get("error", "Code examples search failed")}, + ) + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"Code examples search failed | error={str(e)} | query={request.query[:50]} | source={request.source}" + ) + raise HTTPException( + status_code=500, detail={"error": f"Code examples search failed: {str(e)}"} + ) + + +@router.post("/code-examples") +async def search_code_examples_simple(request: RagQueryRequest): + """Search for code examples - simplified endpoint at /api/code-examples.""" + # Delegate to the existing endpoint handler + return await search_code_examples(request) + + +@router.get("/rag/sources") +async def get_available_sources(): + """Get all available sources for RAG queries.""" + try: + # Use KnowledgeItemService + service = KnowledgeItemService(get_supabase_client()) + result = await service.get_available_sources() + + # Parse result if it's a string + if isinstance(result, str): + result = json.loads(result) + + return result + except Exception as e: + safe_logfire_error(f"Failed to get available sources | error={str(e)}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.delete("/sources/{source_id}") +async def delete_source(source_id: str): + """Delete a source and all its associated data.""" + try: + safe_logfire_info(f"Deleting source | source_id={source_id}") + + # Use SourceManagementService directly + from ..services.source_management_service import SourceManagementService + + source_service = SourceManagementService(get_supabase_client()) + + success, result_data = source_service.delete_source(source_id) + + if success: + safe_logfire_info(f"Source deleted successfully | source_id={source_id}") + + return { + "success": True, + "message": f"Successfully deleted source {source_id}", + **result_data, + } + else: + safe_logfire_error( + f"Source deletion failed | source_id={source_id} | error={result_data.get('error')}" + ) + raise HTTPException( + status_code=500, detail={"error": result_data.get("error", "Deletion failed")} + ) + except HTTPException: + raise + except Exception as e: + safe_logfire_error(f"Failed to delete source | error={str(e)} | source_id={source_id}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +# WebSocket Endpoints + + +@router.get("/database/metrics") +async 
def get_database_metrics(): + """Get database metrics and statistics.""" + try: + # Use DatabaseMetricsService + service = DatabaseMetricsService(get_supabase_client()) + metrics = await service.get_metrics() + return metrics + except Exception as e: + safe_logfire_error(f"Failed to get database metrics | error={str(e)}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.get("/health") +async def knowledge_health(): + """Knowledge API health check.""" + # Removed health check logging to reduce console noise + result = { + "status": "healthy", + "service": "knowledge-api", + "timestamp": datetime.now().isoformat(), + } + + return result + + +@router.get("/knowledge-items/task/{task_id}") +async def get_crawl_task_status(task_id: str): + """Get status of a background crawl task.""" + try: + from ..services.background_task_manager import get_task_manager + + task_manager = get_task_manager() + status = await task_manager.get_task_status(task_id) + + if "error" in status and status["error"] == "Task not found": + raise HTTPException(status_code=404, detail={"error": "Task not found"}) + + return status + except HTTPException: + raise + except Exception as e: + safe_logfire_error(f"Failed to get task status | error={str(e)} | task_id={task_id}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.post("/knowledge-items/stop/{progress_id}") +async def stop_crawl_task(progress_id: str): + """Stop a running crawl task.""" + try: + from ..services.crawling import get_active_orchestration, unregister_orchestration + + # Emit stopping status immediately + await sio.emit( + "crawl:stopping", + { + "progressId": progress_id, + "message": "Stopping crawl operation...", + "timestamp": datetime.utcnow().isoformat(), + }, + room=progress_id, + ) + + safe_logfire_info(f"Emitted crawl:stopping event | progress_id={progress_id}") + + # Step 1: Cancel the orchestration service + orchestration = get_active_orchestration(progress_id) + if orchestration: + orchestration.cancel() + + # Step 2: Cancel the asyncio task + if progress_id in active_crawl_tasks: + task = active_crawl_tasks[progress_id] + if not task.done(): + task.cancel() + try: + await asyncio.wait_for(task, timeout=2.0) + except (TimeoutError, asyncio.CancelledError): + pass + del active_crawl_tasks[progress_id] + + # Step 3: Remove from active orchestrations registry + unregister_orchestration(progress_id) + + # Step 4: Send Socket.IO event + await sio.emit( + "crawl:stopped", + { + "progressId": progress_id, + "status": "cancelled", + "message": "Crawl cancelled by user", + "timestamp": datetime.utcnow().isoformat(), + }, + room=progress_id, + ) + + safe_logfire_info(f"Successfully stopped crawl task | progress_id={progress_id}") + return { + "success": True, + "message": "Crawl task stopped successfully", + "progressId": progress_id, + } + + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"Failed to stop crawl task | error={str(e)} | progress_id={progress_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) diff --git a/python/src/server/services/knowledge/knowledge_item_service.py b/python/src/server/services/knowledge/knowledge_item_service.py index f88aaa3f..fa09e388 100644 --- a/python/src/server/services/knowledge/knowledge_item_service.py +++ b/python/src/server/services/knowledge/knowledge_item_service.py @@ -1,471 +1,472 @@ -""" -Knowledge Item Service - -Handles all knowledge item CRUD operations and data transformations. 
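# A minimal standalone sketch of the cancellation pattern used in
# stop_crawl_task above; `cancel_with_grace` is an illustrative helper name,
# not part of the patch. task.cancel() only *requests* cancellation, so the
# bounded wait_for keeps the caller from hanging on a task that ignores it.
import asyncio


async def cancel_with_grace(task: asyncio.Task, timeout: float = 2.0) -> None:
    if not task.done():
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout=timeout)
        except (asyncio.TimeoutError, asyncio.CancelledError):
            pass  # the task either honoured the cancel or we stop waiting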
-""" - -from typing import Any - -from ...config.logfire_config import safe_logfire_error, safe_logfire_info - - -class KnowledgeItemService: - """ - Service for managing knowledge items including listing, filtering, updating, and deletion. - """ - - def __init__(self, supabase_client): - """ - Initialize the knowledge item service. - - Args: - supabase_client: The Supabase client for database operations - """ - self.supabase = supabase_client - - async def list_items( - self, - page: int = 1, - per_page: int = 20, - knowledge_type: str | None = None, - search: str | None = None, - ) -> dict[str, Any]: - """ - List knowledge items with pagination and filtering. - - Args: - page: Page number (1-based) - per_page: Items per page - knowledge_type: Filter by knowledge type - search: Search term for filtering - - Returns: - Dict containing items, pagination info, and total count - """ - try: - # Build the query with filters at database level for better performance - query = self.supabase.from_("archon_sources").select("*") - - # Apply knowledge type filter at database level if provided - if knowledge_type: - query = query.eq("metadata->>knowledge_type", knowledge_type) - - # Apply search filter at database level if provided - if search: - search_pattern = f"%{search}%" - query = query.or_( - f"title.ilike.{search_pattern},summary.ilike.{search_pattern},source_id.ilike.{search_pattern}" - ) - - # Get total count before pagination - # Clone the query for counting - count_query = self.supabase.from_("archon_sources").select( - "*", count="exact", head=True - ) - - # Apply same filters to count query - if knowledge_type: - count_query = count_query.eq("metadata->>knowledge_type", knowledge_type) - - if search: - search_pattern = f"%{search}%" - count_query = count_query.or_( - f"title.ilike.{search_pattern},summary.ilike.{search_pattern},source_id.ilike.{search_pattern}" - ) - - count_result = count_query.execute() - total = count_result.count if hasattr(count_result, "count") else 0 - - # Apply pagination at database level - start_idx = (page - 1) * per_page - query = query.range(start_idx, start_idx + per_page - 1) - - # Execute query - result = query.execute() - sources = result.data if result.data else [] - - # Get source IDs for batch queries - source_ids = [source["source_id"] for source in sources] - - # Debug log source IDs - safe_logfire_info(f"Source IDs for batch query: {source_ids}") - - # Batch fetch related data to avoid N+1 queries - first_urls = {} - code_example_counts = {} - chunk_counts = {} - - if source_ids: - # Batch fetch first URLs - urls_result = ( - self.supabase.from_("archon_crawled_pages") - .select("source_id, url") - .in_("source_id", source_ids) - .execute() - ) - - # Group URLs by source_id (take first one for each) - for item in urls_result.data or []: - if item["source_id"] not in first_urls: - first_urls[item["source_id"]] = item["url"] - - # Get code example counts per source - NO CONTENT, just counts! 
- # Fetch counts individually for each source - for source_id in source_ids: - count_result = ( - self.supabase.from_("archon_code_examples") - .select("id", count="exact", head=True) - .eq("source_id", source_id) - .execute() - ) - code_example_counts[source_id] = ( - count_result.count if hasattr(count_result, "count") else 0 - ) - - # Ensure all sources have a count (default to 0) - for source_id in source_ids: - if source_id not in code_example_counts: - code_example_counts[source_id] = 0 - chunk_counts[source_id] = 0 # Default to 0 to avoid timeout - - safe_logfire_info(f"Code example counts: {code_example_counts}") - - # Transform sources to items with batched data - items = [] - for source in sources: - source_id = source["source_id"] - source_metadata = source.get("metadata", {}) - - # Use batched data instead of individual queries - first_page_url = first_urls.get(source_id, f"source://{source_id}") - code_examples_count = code_example_counts.get(source_id, 0) - chunks_count = chunk_counts.get(source_id, 0) - - # Determine source type - source_type = self._determine_source_type(source_metadata, first_page_url) - - item = { - "id": source_id, - "title": source.get("title", source.get("summary", "Untitled")), - "url": first_page_url, - "source_id": source_id, - "code_examples": [{"count": code_examples_count}] - if code_examples_count > 0 - else [], # Minimal array just for count display - "metadata": { - "knowledge_type": source_metadata.get("knowledge_type", "technical"), - "tags": source_metadata.get("tags", []), - "source_type": source_type, - "status": "active", - "description": source_metadata.get( - "description", source.get("summary", "") - ), - "chunks_count": chunks_count, - "word_count": source.get("total_word_count", 0), - "estimated_pages": round(source.get("total_word_count", 0) / 250, 1), - "pages_tooltip": f"{round(source.get('total_word_count', 0) / 250, 1)} pages (≈ {source.get('total_word_count', 0):,} words)", - "last_scraped": source.get("updated_at"), - "file_name": source_metadata.get("file_name"), - "file_type": source_metadata.get("file_type"), - "update_frequency": source_metadata.get("update_frequency", 7), - "code_examples_count": code_examples_count, - **source_metadata, - }, - "created_at": source.get("created_at"), - "updated_at": source.get("updated_at"), - } - items.append(item) - - safe_logfire_info( - f"Knowledge items retrieved | total={total} | page={page} | filtered_count={len(items)}" - ) - - return { - "items": items, - "total": total, - "page": page, - "per_page": per_page, - "pages": (total + per_page - 1) // per_page, - } - - except Exception as e: - safe_logfire_error(f"Failed to list knowledge items | error={str(e)}") - raise - - async def get_item(self, source_id: str) -> dict[str, Any] | None: - """ - Get a single knowledge item by source ID. 
- - Args: - source_id: The source ID to retrieve - - Returns: - Knowledge item dict or None if not found - """ - try: - safe_logfire_info(f"Getting knowledge item | source_id={source_id}") - - # Get the source record - result = ( - self.supabase.from_("archon_sources") - .select("*") - .eq("source_id", source_id) - .single() - .execute() - ) - - if not result.data: - return None - - # Transform the source to item format - item = await self._transform_source_to_item(result.data) - return item - - except Exception as e: - safe_logfire_error( - f"Failed to get knowledge item | error={str(e)} | source_id={source_id}" - ) - return None - - async def update_item( - self, source_id: str, updates: dict[str, Any] - ) -> tuple[bool, dict[str, Any]]: - """ - Update a knowledge item's metadata. - - Args: - source_id: The source ID to update - updates: Dictionary of fields to update - - Returns: - Tuple of (success, result) - """ - try: - safe_logfire_info( - f"Updating knowledge item | source_id={source_id} | updates={updates}" - ) - - # Prepare update data - update_data = {} - - # Handle title updates - if "title" in updates: - update_data["title"] = updates["title"] - - # Handle metadata updates - metadata_fields = [ - "description", - "knowledge_type", - "tags", - "status", - "update_frequency", - "group_name", - ] - metadata_updates = {k: v for k, v in updates.items() if k in metadata_fields} - - if metadata_updates: - # Get current metadata - current_response = ( - self.supabase.table("archon_sources") - .select("metadata") - .eq("source_id", source_id) - .execute() - ) - if current_response.data: - current_metadata = current_response.data[0].get("metadata", {}) - current_metadata.update(metadata_updates) - update_data["metadata"] = current_metadata - else: - update_data["metadata"] = metadata_updates - - # Perform the update - result = ( - self.supabase.table("archon_sources") - .update(update_data) - .eq("source_id", source_id) - .execute() - ) - - if result.data: - safe_logfire_info(f"Knowledge item updated successfully | source_id={source_id}") - return True, { - "success": True, - "message": f"Successfully updated knowledge item {source_id}", - "source_id": source_id, - } - else: - safe_logfire_error(f"Knowledge item not found | source_id={source_id}") - return False, {"error": f"Knowledge item {source_id} not found"} - - except Exception as e: - safe_logfire_error( - f"Failed to update knowledge item | error={str(e)} | source_id={source_id}" - ) - return False, {"error": str(e)} - - async def get_available_sources(self) -> dict[str, Any]: - """ - Get all available sources with their details. 
- - Returns: - Dict containing sources list and count - """ - try: - # Query the sources table - result = self.supabase.from_("archon_sources").select("*").order("source_id").execute() - - # Format the sources - sources = [] - if result.data: - for source in result.data: - sources.append({ - "source_id": source.get("source_id"), - "title": source.get("title", source.get("summary", "Untitled")), - "summary": source.get("summary"), - "metadata": source.get("metadata", {}), - "total_words": source.get("total_words", source.get("total_word_count", 0)), - "update_frequency": source.get("update_frequency", 7), - "created_at": source.get("created_at"), - "updated_at": source.get("updated_at", source.get("created_at")), - }) - - return {"success": True, "sources": sources, "count": len(sources)} - - except Exception as e: - safe_logfire_error(f"Failed to get available sources | error={str(e)}") - return {"success": False, "error": str(e), "sources": [], "count": 0} - - async def _get_all_sources(self) -> list[dict[str, Any]]: - """Get all sources from the database.""" - result = await self.get_available_sources() - return result.get("sources", []) - - async def _transform_source_to_item(self, source: dict[str, Any]) -> dict[str, Any]: - """ - Transform a source record into a knowledge item with enriched data. - - Args: - source: The source record from database - - Returns: - Transformed knowledge item - """ - source_metadata = source.get("metadata", {}) - source_id = source["source_id"] - - # Get first page URL - first_page_url = await self._get_first_page_url(source_id) - - # Determine source type - source_type = self._determine_source_type(source_metadata, first_page_url) - - # Get code examples - code_examples = await self._get_code_examples(source_id) - - return { - "id": source_id, - "title": source.get("title", source.get("summary", "Untitled")), - "url": first_page_url, - "source_id": source_id, - "code_examples": code_examples, - "metadata": { - "knowledge_type": source_metadata.get("knowledge_type", "technical"), - "tags": source_metadata.get("tags", []), - "source_type": source_type, - "status": "active", - "description": source_metadata.get("description", source.get("summary", "")), - "chunks_count": await self._get_chunks_count(source_id), # Get actual chunk count - "word_count": source.get("total_words", 0), - "estimated_pages": round( - source.get("total_words", 0) / 250, 1 - ), # Average book page = 250 words - "pages_tooltip": f"{round(source.get('total_words', 0) / 250, 1)} pages (≈ {source.get('total_words', 0):,} words)", - "last_scraped": source.get("updated_at"), - "file_name": source_metadata.get("file_name"), - "file_type": source_metadata.get("file_type"), - "update_frequency": source.get("update_frequency", 7), - "code_examples_count": len(code_examples), - **source_metadata, - }, - "created_at": source.get("created_at"), - "updated_at": source.get("updated_at"), - } - - async def _get_first_page_url(self, source_id: str) -> str: - """Get the first page URL for a source.""" - try: - pages_response = ( - self.supabase.from_("archon_crawled_pages") - .select("url") - .eq("source_id", source_id) - .limit(1) - .execute() - ) - - if pages_response.data: - return pages_response.data[0].get("url", f"source://{source_id}") - - except Exception: - pass - - return f"source://{source_id}" - - async def _get_code_examples(self, source_id: str) -> list[dict[str, Any]]: - """Get code examples for a source.""" - try: - code_examples_response = ( - 
self.supabase.from_("archon_code_examples") - .select("id, content, summary, metadata") - .eq("source_id", source_id) - .execute() - ) - - return code_examples_response.data if code_examples_response.data else [] - - except Exception: - return [] - - def _determine_source_type(self, metadata: dict[str, Any], url: str) -> str: - """Determine the source type from metadata or URL pattern.""" - stored_source_type = metadata.get("source_type") - if stored_source_type: - return stored_source_type - - # Legacy fallback - check URL pattern - return "file" if url.startswith("file://") else "url" - - def _filter_by_search(self, items: list[dict[str, Any]], search: str) -> list[dict[str, Any]]: - """Filter items by search term.""" - search_lower = search.lower() - return [ - item - for item in items - if search_lower in item["title"].lower() - or search_lower in item["metadata"].get("description", "").lower() - or any(search_lower in tag.lower() for tag in item["metadata"].get("tags", [])) - ] - - def _filter_by_knowledge_type( - self, items: list[dict[str, Any]], knowledge_type: str - ) -> list[dict[str, Any]]: - """Filter items by knowledge type.""" - return [item for item in items if item["metadata"].get("knowledge_type") == knowledge_type] - - async def _get_chunks_count(self, source_id: str) -> int: - """Get the actual number of chunks for a source.""" - try: - # Count the actual rows in crawled_pages for this source - result = ( - self.supabase.table("archon_crawled_pages") - .select("*", count="exact") - .eq("source_id", source_id) - .execute() - ) - - # Return the count of pages (chunks) - return result.count if result.count else 0 - - except Exception as e: - # If we can't get chunk count, return 0 - safe_logfire_info(f"Failed to get chunk count for {source_id}: {e}") - return 0 +""" +Knowledge Item Service + +Handles all knowledge item CRUD operations and data transformations. +""" + +from typing import Any + +from ...config.logfire_config import safe_logfire_error, safe_logfire_info + + +class KnowledgeItemService: + """ + Service for managing knowledge items including listing, filtering, updating, and deletion. + """ + + def __init__(self, supabase_client): + """ + Initialize the knowledge item service. + + Args: + supabase_client: The Supabase client for database operations + """ + self.supabase = supabase_client + + async def list_items( + self, + page: int = 1, + per_page: int = 20, + knowledge_type: str | None = None, + search: str | None = None, + ) -> dict[str, Any]: + """ + List knowledge items with pagination and filtering. 
+ + Args: + page: Page number (1-based) + per_page: Items per page + knowledge_type: Filter by knowledge type + search: Search term for filtering + + Returns: + Dict containing items, pagination info, and total count + """ + try: + # Build the query with filters at database level for better performance + query = self.supabase.from_("archon_sources").select("*") + + # Apply knowledge type filter at database level if provided + if knowledge_type: + query = query.eq("metadata->>knowledge_type", knowledge_type) + + # Apply search filter at database level if provided + if search: + search_pattern = f"%{search}%" + query = query.or_( + f"title.ilike.{search_pattern},summary.ilike.{search_pattern},source_id.ilike.{search_pattern}" + ) + + # Get total count before pagination + # Clone the query for counting + count_query = self.supabase.from_("archon_sources").select( + "*", count="exact", head=True + ) + + # Apply same filters to count query + if knowledge_type: + count_query = count_query.eq("metadata->>knowledge_type", knowledge_type) + + if search: + search_pattern = f"%{search}%" + count_query = count_query.or_( + f"title.ilike.{search_pattern},summary.ilike.{search_pattern},source_id.ilike.{search_pattern}" + ) + + count_result = count_query.execute() + total = count_result.count if hasattr(count_result, "count") else 0 + + # Apply pagination at database level + start_idx = (page - 1) * per_page + query = query.range(start_idx, start_idx + per_page - 1) + + # Execute query + result = query.execute() + sources = result.data if result.data else [] + + # Get source IDs for batch queries + source_ids = [source["source_id"] for source in sources] + + # Debug log source IDs + safe_logfire_info(f"Source IDs for batch query: {source_ids}") + + # Batch fetch related data to avoid N+1 queries + first_urls = {} + code_example_counts = {} + chunk_counts = {} + + if source_ids: + # Batch fetch first URLs + urls_result = ( + self.supabase.from_("archon_crawled_pages") + .select("source_id, url") + .in_("source_id", source_ids) + .execute() + ) + + # Group URLs by source_id (take first one for each) + for item in urls_result.data or []: + if item["source_id"] not in first_urls: + first_urls[item["source_id"]] = item["url"] + + # Get code example counts per source - NO CONTENT, just counts! 
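# Illustrative sketch of the count-only query used in the loop below, assuming
# a configured supabase-py client; count="exact" with head=True returns just
# the row count for a source, with no row payloads. `count_code_examples` is a
# hypothetical helper, not part of the patch.
def count_code_examples(supabase, source_id: str) -> int:
    result = (
        supabase.from_("archon_code_examples")
        .select("id", count="exact", head=True)
        .eq("source_id", source_id)
        .execute()
    )
    return result.count if hasattr(result, "count") else 0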
+ # Fetch counts individually for each source + for source_id in source_ids: + count_result = ( + self.supabase.from_("archon_code_examples") + .select("id", count="exact", head=True) + .eq("source_id", source_id) + .execute() + ) + code_example_counts[source_id] = ( + count_result.count if hasattr(count_result, "count") else 0 + ) + + # Ensure all sources have a count (default to 0) + for source_id in source_ids: + if source_id not in code_example_counts: + code_example_counts[source_id] = 0 + chunk_counts[source_id] = 0 # Default to 0 to avoid timeout + + safe_logfire_info(f"Code example counts: {code_example_counts}") + + # Transform sources to items with batched data + items = [] + for source in sources: + source_id = source["source_id"] + source_metadata = source.get("metadata", {}) + + # Use batched data instead of individual queries + first_page_url = first_urls.get(source_id, f"source://{source_id}") + code_examples_count = code_example_counts.get(source_id, 0) + chunks_count = chunk_counts.get(source_id, 0) + + # Determine source type + source_type = self._determine_source_type(source_metadata, first_page_url) + + item = { + "id": source_id, + "title": source.get("title", source.get("summary", "Untitled")), + "url": first_page_url, + "source_id": source_id, + "code_examples": [{"count": code_examples_count}] + if code_examples_count > 0 + else [], # Minimal array just for count display + "metadata": { + "knowledge_type": source_metadata.get("knowledge_type", "technical"), + "tags": source_metadata.get("tags", []), + "source_type": source_type, + "status": "active", + "description": source_metadata.get( + "description", source.get("summary", "") + ), + "chunks_count": chunks_count, + "word_count": source.get("total_word_count", 0), + "estimated_pages": round(source.get("total_word_count", 0) / 250, 1), + "pages_tooltip": f"{round(source.get('total_word_count', 0) / 250, 1)} pages (≈ {source.get('total_word_count', 0):,} words)", + "last_scraped": source.get("updated_at"), + "file_name": source_metadata.get("file_name"), + "file_type": source_metadata.get("file_type"), + "update_frequency": source_metadata.get("update_frequency", 7), + "code_examples_count": code_examples_count, + **source_metadata, + }, + "created_at": source.get("created_at"), + "updated_at": source.get("updated_at"), + } + items.append(item) + + safe_logfire_info( + f"Knowledge items retrieved | total={total} | page={page} | filtered_count={len(items)}" + ) + + return { + "items": items, + "total": total, + "page": page, + "per_page": per_page, + "pages": (total + per_page - 1) // per_page, + } + + except Exception as e: + safe_logfire_error(f"Failed to list knowledge items | error={str(e)}") + raise + + async def get_item(self, source_id: str) -> dict[str, Any] | None: + """ + Get a single knowledge item by source ID. 
+ + Args: + source_id: The source ID to retrieve + + Returns: + Knowledge item dict or None if not found + """ + try: + safe_logfire_info(f"Getting knowledge item | source_id={source_id}") + + # Get the source record + result = ( + self.supabase.from_("archon_sources") + .select("*") + .eq("source_id", source_id) + .single() + .execute() + ) + + if not result.data: + return None + + # Transform the source to item format + item = await self._transform_source_to_item(result.data) + return item + + except Exception as e: + safe_logfire_error( + f"Failed to get knowledge item | error={str(e)} | source_id={source_id}" + ) + return None + + async def update_item( + self, source_id: str, updates: dict[str, Any] + ) -> tuple[bool, dict[str, Any]]: + """ + Update a knowledge item's metadata. + + Args: + source_id: The source ID to update + updates: Dictionary of fields to update + + Returns: + Tuple of (success, result) + """ + try: + safe_logfire_info( + f"Updating knowledge item | source_id={source_id} | updates={updates}" + ) + + # Prepare update data + update_data = {} + + # Handle title updates + if "title" in updates: + update_data["title"] = updates["title"] + + # Handle metadata updates + metadata_fields = [ + "description", + "knowledge_type", + "tags", + "status", + "update_frequency", + "group_name", + ] + metadata_updates = {k: v for k, v in updates.items() if k in metadata_fields} + + if metadata_updates: + # Get current metadata + current_response = ( + self.supabase.table("archon_sources") + .select("metadata") + .eq("source_id", source_id) + .execute() + ) + if current_response.data: + current_metadata = current_response.data[0].get("metadata", {}) + current_metadata.update(metadata_updates) + update_data["metadata"] = current_metadata + else: + update_data["metadata"] = metadata_updates + + # Perform the update + result = ( + self.supabase.table("archon_sources") + .update(update_data) + .eq("source_id", source_id) + .execute() + ) + + if result.data: + safe_logfire_info(f"Knowledge item updated successfully | source_id={source_id}") + return True, { + "success": True, + "message": f"Successfully updated knowledge item {source_id}", + "source_id": source_id, + } + else: + safe_logfire_error(f"Knowledge item not found | source_id={source_id}") + return False, {"error": f"Knowledge item {source_id} not found"} + + except Exception as e: + safe_logfire_error( + f"Failed to update knowledge item | error={str(e)} | source_id={source_id}" + ) + return False, {"error": str(e)} + + async def get_available_sources(self) -> dict[str, Any]: + """ + Get all available sources with their details. 
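# Why the metadata spread order in _transform_source_to_item further down
# matters: in a dict literal, later keys win, so spreading the stored metadata
# first lets freshly computed values (source_type in particular) override any
# stale stored copy. Minimal illustration with hypothetical values, not part
# of the patch:
stored_metadata = {"knowledge_type": "business", "source_type": "url"}  # stale source_type
computed_source_type = "file"  # what _determine_source_type would return

clobbered = {"source_type": computed_source_type, **stored_metadata}
fixed = {**stored_metadata, "source_type": computed_source_type}

assert clobbered["source_type"] == "url"  # spreading last reintroduces the stale value
assert fixed["source_type"] == "file"     # spreading first keeps the computed value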
+ + Returns: + Dict containing sources list and count + """ + try: + # Query the sources table + result = self.supabase.from_("archon_sources").select("*").order("source_id").execute() + + # Format the sources + sources = [] + if result.data: + for source in result.data: + sources.append({ + "source_id": source.get("source_id"), + "title": source.get("title", source.get("summary", "Untitled")), + "summary": source.get("summary"), + "metadata": source.get("metadata", {}), + "total_words": source.get("total_words", source.get("total_word_count", 0)), + "update_frequency": source.get("update_frequency", 7), + "created_at": source.get("created_at"), + "updated_at": source.get("updated_at", source.get("created_at")), + }) + + return {"success": True, "sources": sources, "count": len(sources)} + + except Exception as e: + safe_logfire_error(f"Failed to get available sources | error={str(e)}") + return {"success": False, "error": str(e), "sources": [], "count": 0} + + async def _get_all_sources(self) -> list[dict[str, Any]]: + """Get all sources from the database.""" + result = await self.get_available_sources() + return result.get("sources", []) + + async def _transform_source_to_item(self, source: dict[str, Any]) -> dict[str, Any]: + """ + Transform a source record into a knowledge item with enriched data. + + Args: + source: The source record from database + + Returns: + Transformed knowledge item + """ + source_metadata = source.get("metadata", {}) + source_id = source["source_id"] + + # Get first page URL + first_page_url = await self._get_first_page_url(source_id) + + # Determine source type + source_type = self._determine_source_type(source_metadata, first_page_url) + + # Get code examples + code_examples = await self._get_code_examples(source_id) + + return { + "id": source_id, + "title": source.get("title", source.get("summary", "Untitled")), + "url": first_page_url, + "source_id": source_id, + "code_examples": code_examples, + "metadata": { + # Spread source_metadata first, then override with computed values + **source_metadata, + "knowledge_type": source_metadata.get("knowledge_type", "technical"), + "tags": source_metadata.get("tags", []), + "source_type": source_type, # This should be the correctly determined source_type + "status": "active", + "description": source_metadata.get("description", source.get("summary", "")), + "chunks_count": await self._get_chunks_count(source_id), # Get actual chunk count + "word_count": source.get("total_words", 0), + "estimated_pages": round( + source.get("total_words", 0) / 250, 1 + ), # Average book page = 250 words + "pages_tooltip": f"{round(source.get('total_words', 0) / 250, 1)} pages (≈ {source.get('total_words', 0):,} words)", + "last_scraped": source.get("updated_at"), + "file_name": source_metadata.get("file_name"), + "file_type": source_metadata.get("file_type"), + "update_frequency": source.get("update_frequency", 7), + "code_examples_count": len(code_examples), + }, + "created_at": source.get("created_at"), + "updated_at": source.get("updated_at"), + } + + async def _get_first_page_url(self, source_id: str) -> str: + """Get the first page URL for a source.""" + try: + pages_response = ( + self.supabase.from_("archon_crawled_pages") + .select("url") + .eq("source_id", source_id) + .limit(1) + .execute() + ) + + if pages_response.data: + return pages_response.data[0].get("url", f"source://{source_id}") + + except Exception: + pass + + return f"source://{source_id}" + + async def _get_code_examples(self, source_id: str) -> list[dict[str, 
Any]]: + """Get code examples for a source.""" + try: + code_examples_response = ( + self.supabase.from_("archon_code_examples") + .select("id, content, summary, metadata") + .eq("source_id", source_id) + .execute() + ) + + return code_examples_response.data if code_examples_response.data else [] + + except Exception: + return [] + + def _determine_source_type(self, metadata: dict[str, Any], url: str) -> str: + """Determine the source type from metadata or URL pattern.""" + stored_source_type = metadata.get("source_type") + if stored_source_type: + return stored_source_type + + # Legacy fallback - check URL pattern + return "file" if url.startswith("file://") else "url" + + def _filter_by_search(self, items: list[dict[str, Any]], search: str) -> list[dict[str, Any]]: + """Filter items by search term.""" + search_lower = search.lower() + return [ + item + for item in items + if search_lower in item["title"].lower() + or search_lower in item["metadata"].get("description", "").lower() + or any(search_lower in tag.lower() for tag in item["metadata"].get("tags", [])) + ] + + def _filter_by_knowledge_type( + self, items: list[dict[str, Any]], knowledge_type: str + ) -> list[dict[str, Any]]: + """Filter items by knowledge type.""" + return [item for item in items if item["metadata"].get("knowledge_type") == knowledge_type] + + async def _get_chunks_count(self, source_id: str) -> int: + """Get the actual number of chunks for a source.""" + try: + # Count the actual rows in crawled_pages for this source + result = ( + self.supabase.table("archon_crawled_pages") + .select("*", count="exact") + .eq("source_id", source_id) + .execute() + ) + + # Return the count of pages (chunks) + return result.count if result.count else 0 + + except Exception as e: + # If we can't get chunk count, return 0 + safe_logfire_info(f"Failed to get chunk count for {source_id}: {e}") + return 0 diff --git a/python/src/server/services/source_management_service.py b/python/src/server/services/source_management_service.py index e49a1388..bd1a65d3 100644 --- a/python/src/server/services/source_management_service.py +++ b/python/src/server/services/source_management_service.py @@ -1,649 +1,660 @@ -""" -Source Management Service - -Handles source metadata, summaries, and management. -Consolidates both utility functions and class-based service. -""" - -from typing import Any - -from supabase import Client - -from ..config.logfire_config import get_logger, search_logger -from .client_manager import get_supabase_client - -logger = get_logger(__name__) - - -def _get_model_choice() -> str: - """Get MODEL_CHOICE with direct fallback.""" - try: - # Direct cache/env fallback - from .credential_service import credential_service - - if credential_service._cache_initialized and "MODEL_CHOICE" in credential_service._cache: - model = credential_service._cache["MODEL_CHOICE"] - else: - model = os.getenv("MODEL_CHOICE", "gpt-4.1-nano") - logger.debug(f"Using model choice: {model}") - return model - except Exception as e: - logger.warning(f"Error getting model choice: {e}, using default") - return "gpt-4.1-nano" - - -def extract_source_summary( - source_id: str, content: str, max_length: int = 500, provider: str = None -) -> str: - """ - Extract a summary for a source from its content using an LLM. - - This function uses the configured provider to generate a concise summary of the source content. 
- - Args: - source_id: The source ID (domain) - content: The content to extract a summary from - max_length: Maximum length of the summary - provider: Optional provider override - - Returns: - A summary string - """ - # Default summary if we can't extract anything meaningful - default_summary = f"Content from {source_id}" - - if not content or len(content.strip()) == 0: - return default_summary - - # Get the model choice from credential service (RAG setting) - model_choice = _get_model_choice() - search_logger.info(f"Generating summary for {source_id} using model: {model_choice}") - - # Limit content length to avoid token limits - truncated_content = content[:25000] if len(content) > 25000 else content - - # Create the prompt for generating the summary - prompt = f""" -{truncated_content} - - -The above content is from the documentation for '{source_id}'. Please provide a concise summary (3-5 sentences) that describes what this library/tool/framework is about. The summary should help understand what the library/tool/framework accomplishes and the purpose. -""" - - try: - try: - import os - - import openai - - api_key = os.getenv("OPENAI_API_KEY") - if not api_key: - # Try to get from credential service with direct fallback - from .credential_service import credential_service - - if ( - credential_service._cache_initialized - and "OPENAI_API_KEY" in credential_service._cache - ): - cached_key = credential_service._cache["OPENAI_API_KEY"] - if isinstance(cached_key, dict) and cached_key.get("is_encrypted"): - api_key = credential_service._decrypt_value(cached_key["encrypted_value"]) - else: - api_key = cached_key - else: - api_key = os.getenv("OPENAI_API_KEY", "") - - if not api_key: - raise ValueError("No OpenAI API key available") - - client = openai.OpenAI(api_key=api_key) - search_logger.info("Successfully created LLM client fallback for summary generation") - except Exception as e: - search_logger.error(f"Failed to create LLM client fallback: {e}") - return default_summary - - # Call the OpenAI API to generate the summary - response = client.chat.completions.create( - model=model_choice, - messages=[ - { - "role": "system", - "content": "You are a helpful assistant that provides concise library/tool/framework summaries.", - }, - {"role": "user", "content": prompt}, - ], - ) - - # Extract the generated summary with proper error handling - if not response or not response.choices or len(response.choices) == 0: - search_logger.error(f"Empty or invalid response from LLM for {source_id}") - return default_summary - - message_content = response.choices[0].message.content - if message_content is None: - search_logger.error(f"LLM returned None content for {source_id}") - return default_summary - - summary = message_content.strip() - - # Ensure the summary is not too long - if len(summary) > max_length: - summary = summary[:max_length] + "..." - - return summary - - except Exception as e: - search_logger.error( - f"Error generating summary with LLM for {source_id}: {e}. Using default summary." - ) - return default_summary - - -def generate_source_title_and_metadata( - source_id: str, - content: str, - knowledge_type: str = "technical", - tags: list[str] | None = None, - provider: str = None, -) -> tuple[str, dict[str, Any]]: - """ - Generate a user-friendly title and metadata for a source based on its content. 
- - Args: - source_id: The source ID (domain) - content: Sample content from the source - knowledge_type: Type of knowledge (default: "technical") - tags: Optional list of tags - - Returns: - Tuple of (title, metadata) - """ - # Default title is the source ID - title = source_id - - # Try to generate a better title from content - if content and len(content.strip()) > 100: - try: - try: - import os - - import openai - - api_key = os.getenv("OPENAI_API_KEY") - if not api_key: - # Try to get from credential service with direct fallback - from .credential_service import credential_service - - if ( - credential_service._cache_initialized - and "OPENAI_API_KEY" in credential_service._cache - ): - cached_key = credential_service._cache["OPENAI_API_KEY"] - if isinstance(cached_key, dict) and cached_key.get("is_encrypted"): - api_key = credential_service._decrypt_value( - cached_key["encrypted_value"] - ) - else: - api_key = cached_key - else: - api_key = os.getenv("OPENAI_API_KEY", "") - - if not api_key: - raise ValueError("No OpenAI API key available") - - client = openai.OpenAI(api_key=api_key) - except Exception as e: - search_logger.error( - f"Failed to create LLM client fallback for title generation: {e}" - ) - # Don't proceed if client creation fails - raise - - model_choice = _get_model_choice() - - # Limit content for prompt - sample_content = content[:3000] if len(content) > 3000 else content - - prompt = f"""Based on this content from {source_id}, generate a concise, descriptive title (3-6 words) that captures what this source is about: - -{sample_content} - -Provide only the title, nothing else.""" - - response = client.chat.completions.create( - model=model_choice, - messages=[ - { - "role": "system", - "content": "You are a helpful assistant that generates concise titles.", - }, - {"role": "user", "content": prompt}, - ], - ) - - generated_title = response.choices[0].message.content.strip() - # Clean up the title - generated_title = generated_title.strip("\"'") - if len(generated_title) < 50: # Sanity check - title = generated_title - - except Exception as e: - search_logger.error(f"Error generating title for {source_id}: {e}") - - # Build metadata - metadata = {"knowledge_type": knowledge_type, "tags": tags or [], "auto_generated": True} - - return title, metadata - - -def update_source_info( - client: Client, - source_id: str, - summary: str, - word_count: int, - content: str = "", - knowledge_type: str = "technical", - tags: list[str] | None = None, - update_frequency: int = 7, - original_url: str | None = None, -): - """ - Update or insert source information in the sources table. 
- - Args: - client: Supabase client - source_id: The source ID (domain) - summary: Summary of the source - word_count: Total word count for the source - content: Sample content for title generation - knowledge_type: Type of knowledge - tags: List of tags - update_frequency: Update frequency in days - """ - try: - # First, check if source already exists to preserve title - existing_source = ( - client.table("archon_sources").select("title").eq("source_id", source_id).execute() - ) - - if existing_source.data: - # Source exists - preserve the existing title - existing_title = existing_source.data[0]["title"] - search_logger.info(f"Preserving existing title for {source_id}: {existing_title}") - - # Update metadata while preserving title - metadata = { - "knowledge_type": knowledge_type, - "tags": tags or [], - "auto_generated": False, # Mark as not auto-generated since we're preserving - "update_frequency": update_frequency, - } - if original_url: - metadata["original_url"] = original_url - - # Update existing source (preserving title) - result = ( - client.table("archon_sources") - .update({ - "summary": summary, - "total_word_count": word_count, - "metadata": metadata, - "updated_at": "now()", - }) - .eq("source_id", source_id) - .execute() - ) - - search_logger.info( - f"Updated source {source_id} while preserving title: {existing_title}" - ) - else: - # New source - generate title and metadata - title, metadata = generate_source_title_and_metadata( - source_id, content, knowledge_type, tags - ) - - # Add update_frequency and original_url to metadata - metadata["update_frequency"] = update_frequency - if original_url: - metadata["original_url"] = original_url - - # Insert new source - client.table("archon_sources").insert({ - "source_id": source_id, - "title": title, - "summary": summary, - "total_word_count": word_count, - "metadata": metadata, - }).execute() - search_logger.info(f"Created new source {source_id} with title: {title}") - - except Exception as e: - search_logger.error(f"Error updating source {source_id}: {e}") - raise # Re-raise the exception so the caller knows it failed - - -class SourceManagementService: - """Service class for source management operations""" - - def __init__(self, supabase_client=None): - """Initialize with optional supabase client""" - self.supabase_client = supabase_client or get_supabase_client() - - def get_available_sources(self) -> tuple[bool, dict[str, Any]]: - """ - Get all available sources from the sources table. - - Returns a list of all unique sources that have been crawled and stored. - - Returns: - Tuple of (success, result_dict) - """ - try: - response = self.supabase_client.table("archon_sources").select("*").execute() - - sources = [] - for row in response.data: - sources.append({ - "source_id": row["source_id"], - "title": row.get("title", ""), - "summary": row.get("summary", ""), - "created_at": row.get("created_at", ""), - "updated_at": row.get("updated_at", ""), - }) - - return True, {"sources": sources, "total_count": len(sources)} - - except Exception as e: - logger.error(f"Error retrieving sources: {e}") - return False, {"error": f"Error retrieving sources: {str(e)}"} - - def delete_source(self, source_id: str) -> tuple[bool, dict[str, Any]]: - """ - Delete a source and all associated crawled pages and code examples from the database. 
- - Args: - source_id: The source ID to delete - - Returns: - Tuple of (success, result_dict) - """ - try: - logger.info(f"Starting delete_source for source_id: {source_id}") - - # Delete from crawled_pages table - try: - logger.info(f"Deleting from crawled_pages table for source_id: {source_id}") - pages_response = ( - self.supabase_client.table("archon_crawled_pages") - .delete() - .eq("source_id", source_id) - .execute() - ) - pages_deleted = len(pages_response.data) if pages_response.data else 0 - logger.info(f"Deleted {pages_deleted} pages from crawled_pages") - except Exception as pages_error: - logger.error(f"Failed to delete from crawled_pages: {pages_error}") - return False, {"error": f"Failed to delete crawled pages: {str(pages_error)}"} - - # Delete from code_examples table - try: - logger.info(f"Deleting from code_examples table for source_id: {source_id}") - code_response = ( - self.supabase_client.table("archon_code_examples") - .delete() - .eq("source_id", source_id) - .execute() - ) - code_deleted = len(code_response.data) if code_response.data else 0 - logger.info(f"Deleted {code_deleted} code examples") - except Exception as code_error: - logger.error(f"Failed to delete from code_examples: {code_error}") - return False, {"error": f"Failed to delete code examples: {str(code_error)}"} - - # Delete from sources table - try: - logger.info(f"Deleting from sources table for source_id: {source_id}") - source_response = ( - self.supabase_client.table("archon_sources") - .delete() - .eq("source_id", source_id) - .execute() - ) - source_deleted = len(source_response.data) if source_response.data else 0 - logger.info(f"Deleted {source_deleted} source records") - except Exception as source_error: - logger.error(f"Failed to delete from sources: {source_error}") - return False, {"error": f"Failed to delete source: {str(source_error)}"} - - logger.info("Delete operation completed successfully") - return True, { - "source_id": source_id, - "pages_deleted": pages_deleted, - "code_examples_deleted": code_deleted, - "source_records_deleted": source_deleted, - } - - except Exception as e: - logger.error(f"Unexpected error in delete_source: {e}") - return False, {"error": f"Error deleting source: {str(e)}"} - - def update_source_metadata( - self, - source_id: str, - title: str = None, - summary: str = None, - word_count: int = None, - knowledge_type: str = None, - tags: list[str] = None, - ) -> tuple[bool, dict[str, Any]]: - """ - Update source metadata. 
- - Args: - source_id: The source ID to update - title: Optional new title - summary: Optional new summary - word_count: Optional new word count - knowledge_type: Optional new knowledge type - tags: Optional new tags list - - Returns: - Tuple of (success, result_dict) - """ - try: - # Build update data - update_data = {} - if title is not None: - update_data["title"] = title - if summary is not None: - update_data["summary"] = summary - if word_count is not None: - update_data["total_word_count"] = word_count - - # Handle metadata fields - if knowledge_type is not None or tags is not None: - # Get existing metadata - existing = ( - self.supabase_client.table("archon_sources") - .select("metadata") - .eq("source_id", source_id) - .execute() - ) - metadata = existing.data[0].get("metadata", {}) if existing.data else {} - - if knowledge_type is not None: - metadata["knowledge_type"] = knowledge_type - if tags is not None: - metadata["tags"] = tags - - update_data["metadata"] = metadata - - if not update_data: - return False, {"error": "No update data provided"} - - # Update the source - response = ( - self.supabase_client.table("archon_sources") - .update(update_data) - .eq("source_id", source_id) - .execute() - ) - - if response.data: - return True, {"source_id": source_id, "updated_fields": list(update_data.keys())} - else: - return False, {"error": f"Source with ID {source_id} not found"} - - except Exception as e: - logger.error(f"Error updating source metadata: {e}") - return False, {"error": f"Error updating source metadata: {str(e)}"} - - def create_source_info( - self, - source_id: str, - content_sample: str, - word_count: int = 0, - knowledge_type: str = "technical", - tags: list[str] = None, - update_frequency: int = 7, - ) -> tuple[bool, dict[str, Any]]: - """ - Create source information entry. - - Args: - source_id: The source ID - content_sample: Sample content for generating summary - word_count: Total word count for the source - knowledge_type: Type of knowledge (default: "technical") - tags: List of tags - update_frequency: Update frequency in days - - Returns: - Tuple of (success, result_dict) - """ - try: - if tags is None: - tags = [] - - # Generate source summary using the utility function - source_summary = extract_source_summary(source_id, content_sample) - - # Create the source info using the utility function - update_source_info( - self.supabase_client, - source_id, - source_summary, - word_count, - content_sample[:5000], - knowledge_type, - tags, - update_frequency, - ) - - return True, { - "source_id": source_id, - "summary": source_summary, - "word_count": word_count, - "knowledge_type": knowledge_type, - "tags": tags, - } - - except Exception as e: - logger.error(f"Error creating source info: {e}") - return False, {"error": f"Error creating source info: {str(e)}"} - - def get_source_details(self, source_id: str) -> tuple[bool, dict[str, Any]]: - """ - Get detailed information about a specific source. 
- - Args: - source_id: The source ID to look up - - Returns: - Tuple of (success, result_dict) - """ - try: - # Get source metadata - source_response = ( - self.supabase_client.table("archon_sources") - .select("*") - .eq("source_id", source_id) - .execute() - ) - - if not source_response.data: - return False, {"error": f"Source with ID {source_id} not found"} - - source_data = source_response.data[0] - - # Get page count - pages_response = ( - self.supabase_client.table("archon_crawled_pages") - .select("id") - .eq("source_id", source_id) - .execute() - ) - page_count = len(pages_response.data) if pages_response.data else 0 - - # Get code example count - code_response = ( - self.supabase_client.table("archon_code_examples") - .select("id") - .eq("source_id", source_id) - .execute() - ) - code_count = len(code_response.data) if code_response.data else 0 - - return True, { - "source": source_data, - "page_count": page_count, - "code_example_count": code_count, - } - - except Exception as e: - logger.error(f"Error getting source details: {e}") - return False, {"error": f"Error getting source details: {str(e)}"} - - def list_sources_by_type(self, knowledge_type: str = None) -> tuple[bool, dict[str, Any]]: - """ - List sources filtered by knowledge type. - - Args: - knowledge_type: Optional knowledge type filter - - Returns: - Tuple of (success, result_dict) - """ - try: - query = self.supabase_client.table("archon_sources").select("*") - - if knowledge_type: - # Filter by metadata->knowledge_type - query = query.filter("metadata->>knowledge_type", "eq", knowledge_type) - - response = query.execute() - - sources = [] - for row in response.data: - metadata = row.get("metadata", {}) - sources.append({ - "source_id": row["source_id"], - "title": row.get("title", ""), - "summary": row.get("summary", ""), - "knowledge_type": metadata.get("knowledge_type", ""), - "tags": metadata.get("tags", []), - "total_word_count": row.get("total_word_count", 0), - "created_at": row.get("created_at", ""), - "updated_at": row.get("updated_at", ""), - }) - - return True, { - "sources": sources, - "total_count": len(sources), - "knowledge_type_filter": knowledge_type, - } - - except Exception as e: - logger.error(f"Error listing sources by type: {e}") - return False, {"error": f"Error listing sources by type: {str(e)}"} +""" +Source Management Service + +Handles source metadata, summaries, and management. +Consolidates both utility functions and class-based service. +""" + +from typing import Any + +from supabase import Client + +from ..config.logfire_config import get_logger, search_logger +from .client_manager import get_supabase_client + +logger = get_logger(__name__) + + +def _get_model_choice() -> str: + """Get MODEL_CHOICE with direct fallback.""" + try: + # Direct cache/env fallback + from .credential_service import credential_service + + if credential_service._cache_initialized and "MODEL_CHOICE" in credential_service._cache: + model = credential_service._cache["MODEL_CHOICE"] + else: + model = os.getenv("MODEL_CHOICE", "gpt-4.1-nano") + logger.debug(f"Using model choice: {model}") + return model + except Exception as e: + logger.warning(f"Error getting model choice: {e}, using default") + return "gpt-4.1-nano" + + +def extract_source_summary( + source_id: str, content: str, max_length: int = 500, provider: str = None +) -> str: + """ + Extract a summary for a source from its content using an LLM. + + This function uses the configured provider to generate a concise summary of the source content. 
+ + Args: + source_id: The source ID (domain) + content: The content to extract a summary from + max_length: Maximum length of the summary + provider: Optional provider override + + Returns: + A summary string + """ + # Default summary if we can't extract anything meaningful + default_summary = f"Content from {source_id}" + + if not content or len(content.strip()) == 0: + return default_summary + + # Get the model choice from credential service (RAG setting) + model_choice = _get_model_choice() + search_logger.info(f"Generating summary for {source_id} using model: {model_choice}") + + # Limit content length to avoid token limits + truncated_content = content[:25000] if len(content) > 25000 else content + + # Create the prompt for generating the summary + prompt = f""" +{truncated_content} + + +The above content is from the documentation for '{source_id}'. Please provide a concise summary (3-5 sentences) that describes what this library/tool/framework is about. The summary should help understand what the library/tool/framework accomplishes and the purpose. +""" + + try: + try: + import os + + import openai + + api_key = os.getenv("OPENAI_API_KEY") + if not api_key: + # Try to get from credential service with direct fallback + from .credential_service import credential_service + + if ( + credential_service._cache_initialized + and "OPENAI_API_KEY" in credential_service._cache + ): + cached_key = credential_service._cache["OPENAI_API_KEY"] + if isinstance(cached_key, dict) and cached_key.get("is_encrypted"): + api_key = credential_service._decrypt_value(cached_key["encrypted_value"]) + else: + api_key = cached_key + else: + api_key = os.getenv("OPENAI_API_KEY", "") + + if not api_key: + raise ValueError("No OpenAI API key available") + + client = openai.OpenAI(api_key=api_key) + search_logger.info("Successfully created LLM client fallback for summary generation") + except Exception as e: + search_logger.error(f"Failed to create LLM client fallback: {e}") + return default_summary + + # Call the OpenAI API to generate the summary + response = client.chat.completions.create( + model=model_choice, + messages=[ + { + "role": "system", + "content": "You are a helpful assistant that provides concise library/tool/framework summaries.", + }, + {"role": "user", "content": prompt}, + ], + ) + + # Extract the generated summary with proper error handling + if not response or not response.choices or len(response.choices) == 0: + search_logger.error(f"Empty or invalid response from LLM for {source_id}") + return default_summary + + message_content = response.choices[0].message.content + if message_content is None: + search_logger.error(f"LLM returned None content for {source_id}") + return default_summary + + summary = message_content.strip() + + # Ensure the summary is not too long + if len(summary) > max_length: + summary = summary[:max_length] + "..." + + return summary + + except Exception as e: + search_logger.error( + f"Error generating summary with LLM for {source_id}: {e}. Using default summary." + ) + return default_summary + + +def generate_source_title_and_metadata( + source_id: str, + content: str, + knowledge_type: str = "technical", + tags: list[str] | None = None, + provider: str = None, +) -> tuple[str, dict[str, Any]]: + """ + Generate a user-friendly title and metadata for a source based on its content. 
+ + Args: + source_id: The source ID (domain) + content: Sample content from the source + knowledge_type: Type of knowledge (default: "technical") + tags: Optional list of tags + + Returns: + Tuple of (title, metadata) + """ + # Default title is the source ID + title = source_id + + # Try to generate a better title from content + if content and len(content.strip()) > 100: + try: + try: + import os + + import openai + + api_key = os.getenv("OPENAI_API_KEY") + if not api_key: + # Try to get from credential service with direct fallback + from .credential_service import credential_service + + if ( + credential_service._cache_initialized + and "OPENAI_API_KEY" in credential_service._cache + ): + cached_key = credential_service._cache["OPENAI_API_KEY"] + if isinstance(cached_key, dict) and cached_key.get("is_encrypted"): + api_key = credential_service._decrypt_value( + cached_key["encrypted_value"] + ) + else: + api_key = cached_key + else: + api_key = os.getenv("OPENAI_API_KEY", "") + + if not api_key: + raise ValueError("No OpenAI API key available") + + client = openai.OpenAI(api_key=api_key) + except Exception as e: + search_logger.error( + f"Failed to create LLM client fallback for title generation: {e}" + ) + # Don't proceed if client creation fails + raise + + model_choice = _get_model_choice() + + # Limit content for prompt + sample_content = content[:3000] if len(content) > 3000 else content + + prompt = f"""Based on this content from {source_id}, generate a concise, descriptive title (3-6 words) that captures what this source is about: + +{sample_content} + +Provide only the title, nothing else.""" + + response = client.chat.completions.create( + model=model_choice, + messages=[ + { + "role": "system", + "content": "You are a helpful assistant that generates concise titles.", + }, + {"role": "user", "content": prompt}, + ], + ) + + generated_title = response.choices[0].message.content.strip() + # Clean up the title + generated_title = generated_title.strip("\"'") + if len(generated_title) < 50: # Sanity check + title = generated_title + + except Exception as e: + search_logger.error(f"Error generating title for {source_id}: {e}") + + # Build metadata - determine source_type from source_id pattern + source_type = "file" if source_id.startswith("file_") else "url" + metadata = { + "knowledge_type": knowledge_type, + "tags": tags or [], + "source_type": source_type, + "auto_generated": True + } + + return title, metadata + + +def update_source_info( + client: Client, + source_id: str, + summary: str, + word_count: int, + content: str = "", + knowledge_type: str = "technical", + tags: list[str] | None = None, + update_frequency: int = 7, + original_url: str | None = None, +): + """ + Update or insert source information in the sources table. 
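# Minimal sketch of the source_type inference introduced above: the patch
# treats a "file_"-prefixed source_id as an uploaded document and anything
# else as a crawled URL. The helper name and example IDs are illustrative only.
def infer_source_type(source_id: str) -> str:
    return "file" if source_id.startswith("file_") else "url"


assert infer_source_type("file_business_plan") == "file"
assert infer_source_type("docs.example.com") == "url"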
+
+    Args:
+        client: Supabase client
+        source_id: The source ID (domain)
+        summary: Summary of the source
+        word_count: Total word count for the source
+        content: Sample content for title generation
+        knowledge_type: Type of knowledge
+        tags: List of tags
+        update_frequency: Update frequency in days
+        original_url: Optional original URL to record in the source metadata
+    """
+    search_logger.info(f"Updating source {source_id} with knowledge_type={knowledge_type}")
+    try:
+        # First, check if source already exists to preserve title
+        existing_source = (
+            client.table("archon_sources").select("title").eq("source_id", source_id).execute()
+        )
+
+        if existing_source.data:
+            # Source exists - preserve the existing title
+            existing_title = existing_source.data[0]["title"]
+            search_logger.info(f"Preserving existing title for {source_id}: {existing_title}")
+
+            # Update metadata while preserving title
+            source_type = "file" if source_id.startswith("file_") else "url"
+            metadata = {
+                "knowledge_type": knowledge_type,
+                "tags": tags or [],
+                "source_type": source_type,
+                "auto_generated": False,  # Mark as not auto-generated since we're preserving
+                "update_frequency": update_frequency,
+            }
+            search_logger.info(f"Updating existing source {source_id} metadata: knowledge_type={knowledge_type}")
+            if original_url:
+                metadata["original_url"] = original_url
+
+            # Update existing source (preserving title)
+            result = (
+                client.table("archon_sources")
+                .update({
+                    "summary": summary,
+                    "total_word_count": word_count,
+                    "metadata": metadata,
+                    "updated_at": "now()",
+                })
+                .eq("source_id", source_id)
+                .execute()
+            )
+
+            search_logger.info(
+                f"Updated source {source_id} while preserving title: {existing_title}"
+            )
+        else:
+            # New source - generate title and metadata
+            title, metadata = generate_source_title_and_metadata(
+                source_id, content, knowledge_type, tags
+            )
+
+            # Add update_frequency and original_url to metadata
+            metadata["update_frequency"] = update_frequency
+            if original_url:
+                metadata["original_url"] = original_url
+
+            search_logger.info(f"Creating new source {source_id} with knowledge_type={knowledge_type}")
+            # Insert new source
+            client.table("archon_sources").insert({
+                "source_id": source_id,
+                "title": title,
+                "summary": summary,
+                "total_word_count": word_count,
+                "metadata": metadata,
+            }).execute()
+            search_logger.info(f"Created new source {source_id} with title: {title}")
+
+    except Exception as e:
+        search_logger.error(f"Error updating source {source_id}: {e}")
+        raise  # Re-raise the exception so the caller knows it failed
+
+
+class SourceManagementService:
+    """Service class for source management operations"""
+
+    def __init__(self, supabase_client=None):
+        """Initialize with optional supabase client"""
+        self.supabase_client = supabase_client or get_supabase_client()
+
+    def get_available_sources(self) -> tuple[bool, dict[str, Any]]:
+        """
+        Get all available sources from the sources table.
+
+        Returns a list of all unique sources that have been crawled and stored.
+ + Returns: + Tuple of (success, result_dict) + """ + try: + response = self.supabase_client.table("archon_sources").select("*").execute() + + sources = [] + for row in response.data: + sources.append({ + "source_id": row["source_id"], + "title": row.get("title", ""), + "summary": row.get("summary", ""), + "created_at": row.get("created_at", ""), + "updated_at": row.get("updated_at", ""), + }) + + return True, {"sources": sources, "total_count": len(sources)} + + except Exception as e: + logger.error(f"Error retrieving sources: {e}") + return False, {"error": f"Error retrieving sources: {str(e)}"} + + def delete_source(self, source_id: str) -> tuple[bool, dict[str, Any]]: + """ + Delete a source and all associated crawled pages and code examples from the database. + + Args: + source_id: The source ID to delete + + Returns: + Tuple of (success, result_dict) + """ + try: + logger.info(f"Starting delete_source for source_id: {source_id}") + + # Delete from crawled_pages table + try: + logger.info(f"Deleting from crawled_pages table for source_id: {source_id}") + pages_response = ( + self.supabase_client.table("archon_crawled_pages") + .delete() + .eq("source_id", source_id) + .execute() + ) + pages_deleted = len(pages_response.data) if pages_response.data else 0 + logger.info(f"Deleted {pages_deleted} pages from crawled_pages") + except Exception as pages_error: + logger.error(f"Failed to delete from crawled_pages: {pages_error}") + return False, {"error": f"Failed to delete crawled pages: {str(pages_error)}"} + + # Delete from code_examples table + try: + logger.info(f"Deleting from code_examples table for source_id: {source_id}") + code_response = ( + self.supabase_client.table("archon_code_examples") + .delete() + .eq("source_id", source_id) + .execute() + ) + code_deleted = len(code_response.data) if code_response.data else 0 + logger.info(f"Deleted {code_deleted} code examples") + except Exception as code_error: + logger.error(f"Failed to delete from code_examples: {code_error}") + return False, {"error": f"Failed to delete code examples: {str(code_error)}"} + + # Delete from sources table + try: + logger.info(f"Deleting from sources table for source_id: {source_id}") + source_response = ( + self.supabase_client.table("archon_sources") + .delete() + .eq("source_id", source_id) + .execute() + ) + source_deleted = len(source_response.data) if source_response.data else 0 + logger.info(f"Deleted {source_deleted} source records") + except Exception as source_error: + logger.error(f"Failed to delete from sources: {source_error}") + return False, {"error": f"Failed to delete source: {str(source_error)}"} + + logger.info("Delete operation completed successfully") + return True, { + "source_id": source_id, + "pages_deleted": pages_deleted, + "code_examples_deleted": code_deleted, + "source_records_deleted": source_deleted, + } + + except Exception as e: + logger.error(f"Unexpected error in delete_source: {e}") + return False, {"error": f"Error deleting source: {str(e)}"} + + def update_source_metadata( + self, + source_id: str, + title: str = None, + summary: str = None, + word_count: int = None, + knowledge_type: str = None, + tags: list[str] = None, + ) -> tuple[bool, dict[str, Any]]: + """ + Update source metadata. 
+ + Args: + source_id: The source ID to update + title: Optional new title + summary: Optional new summary + word_count: Optional new word count + knowledge_type: Optional new knowledge type + tags: Optional new tags list + + Returns: + Tuple of (success, result_dict) + """ + try: + # Build update data + update_data = {} + if title is not None: + update_data["title"] = title + if summary is not None: + update_data["summary"] = summary + if word_count is not None: + update_data["total_word_count"] = word_count + + # Handle metadata fields + if knowledge_type is not None or tags is not None: + # Get existing metadata + existing = ( + self.supabase_client.table("archon_sources") + .select("metadata") + .eq("source_id", source_id) + .execute() + ) + metadata = existing.data[0].get("metadata", {}) if existing.data else {} + + if knowledge_type is not None: + metadata["knowledge_type"] = knowledge_type + if tags is not None: + metadata["tags"] = tags + + update_data["metadata"] = metadata + + if not update_data: + return False, {"error": "No update data provided"} + + # Update the source + response = ( + self.supabase_client.table("archon_sources") + .update(update_data) + .eq("source_id", source_id) + .execute() + ) + + if response.data: + return True, {"source_id": source_id, "updated_fields": list(update_data.keys())} + else: + return False, {"error": f"Source with ID {source_id} not found"} + + except Exception as e: + logger.error(f"Error updating source metadata: {e}") + return False, {"error": f"Error updating source metadata: {str(e)}"} + + def create_source_info( + self, + source_id: str, + content_sample: str, + word_count: int = 0, + knowledge_type: str = "technical", + tags: list[str] = None, + update_frequency: int = 7, + ) -> tuple[bool, dict[str, Any]]: + """ + Create source information entry. + + Args: + source_id: The source ID + content_sample: Sample content for generating summary + word_count: Total word count for the source + knowledge_type: Type of knowledge (default: "technical") + tags: List of tags + update_frequency: Update frequency in days + + Returns: + Tuple of (success, result_dict) + """ + try: + if tags is None: + tags = [] + + # Generate source summary using the utility function + source_summary = extract_source_summary(source_id, content_sample) + + # Create the source info using the utility function + update_source_info( + self.supabase_client, + source_id, + source_summary, + word_count, + content_sample[:5000], + knowledge_type, + tags, + update_frequency, + ) + + return True, { + "source_id": source_id, + "summary": source_summary, + "word_count": word_count, + "knowledge_type": knowledge_type, + "tags": tags, + } + + except Exception as e: + logger.error(f"Error creating source info: {e}") + return False, {"error": f"Error creating source info: {str(e)}"} + + def get_source_details(self, source_id: str) -> tuple[bool, dict[str, Any]]: + """ + Get detailed information about a specific source. 
+ + Args: + source_id: The source ID to look up + + Returns: + Tuple of (success, result_dict) + """ + try: + # Get source metadata + source_response = ( + self.supabase_client.table("archon_sources") + .select("*") + .eq("source_id", source_id) + .execute() + ) + + if not source_response.data: + return False, {"error": f"Source with ID {source_id} not found"} + + source_data = source_response.data[0] + + # Get page count + pages_response = ( + self.supabase_client.table("archon_crawled_pages") + .select("id") + .eq("source_id", source_id) + .execute() + ) + page_count = len(pages_response.data) if pages_response.data else 0 + + # Get code example count + code_response = ( + self.supabase_client.table("archon_code_examples") + .select("id") + .eq("source_id", source_id) + .execute() + ) + code_count = len(code_response.data) if code_response.data else 0 + + return True, { + "source": source_data, + "page_count": page_count, + "code_example_count": code_count, + } + + except Exception as e: + logger.error(f"Error getting source details: {e}") + return False, {"error": f"Error getting source details: {str(e)}"} + + def list_sources_by_type(self, knowledge_type: str = None) -> tuple[bool, dict[str, Any]]: + """ + List sources filtered by knowledge type. + + Args: + knowledge_type: Optional knowledge type filter + + Returns: + Tuple of (success, result_dict) + """ + try: + query = self.supabase_client.table("archon_sources").select("*") + + if knowledge_type: + # Filter by metadata->knowledge_type + query = query.filter("metadata->>knowledge_type", "eq", knowledge_type) + + response = query.execute() + + sources = [] + for row in response.data: + metadata = row.get("metadata", {}) + sources.append({ + "source_id": row["source_id"], + "title": row.get("title", ""), + "summary": row.get("summary", ""), + "knowledge_type": metadata.get("knowledge_type", ""), + "tags": metadata.get("tags", []), + "total_word_count": row.get("total_word_count", 0), + "created_at": row.get("created_at", ""), + "updated_at": row.get("updated_at", ""), + }) + + return True, { + "sources": sources, + "total_count": len(sources), + "knowledge_type_filter": knowledge_type, + } + + except Exception as e: + logger.error(f"Error listing sources by type: {e}") + return False, {"error": f"Error listing sources by type: {str(e)}"} diff --git a/python/src/server/services/storage/storage_services.py b/python/src/server/services/storage/storage_services.py index 928fb9ee..a2e935e0 100644 --- a/python/src/server/services/storage/storage_services.py +++ b/python/src/server/services/storage/storage_services.py @@ -1,277 +1,284 @@ -""" -Storage Services - -This module contains all storage service classes that handle document and data storage operations. -These services extend the base storage functionality with specific implementations. 
-""" - -from typing import Any - -from fastapi import WebSocket - -from ...config.logfire_config import get_logger, safe_span -from .base_storage_service import BaseStorageService -from .document_storage_service import add_documents_to_supabase - -logger = get_logger(__name__) - - -class DocumentStorageService(BaseStorageService): - """Service for handling document uploads with progress reporting.""" - - async def upload_document( - self, - file_content: str, - filename: str, - source_id: str, - knowledge_type: str = "documentation", - tags: list[str] | None = None, - websocket: WebSocket | None = None, - progress_callback: Any | None = None, - cancellation_check: Any | None = None, - ) -> tuple[bool, dict[str, Any]]: - """ - Upload and process a document file with progress reporting. - - Args: - file_content: Document content as text - filename: Name of the file - source_id: Source identifier - knowledge_type: Type of knowledge - tags: Optional list of tags - websocket: Optional WebSocket for progress - progress_callback: Optional callback for progress - - Returns: - Tuple of (success, result_dict) - """ - with safe_span( - "upload_document", - filename=filename, - source_id=source_id, - content_length=len(file_content), - ) as span: - try: - # Progress reporting helper - async def report_progress(message: str, percentage: int, batch_info: dict = None): - if websocket: - data = { - "type": "upload_progress", - "filename": filename, - "progress": percentage, - "message": message, - } - if batch_info: - data.update(batch_info) - await websocket.send_json(data) - if progress_callback: - await progress_callback(message, percentage, batch_info) - - await report_progress("Starting document processing...", 10) - - # Use base class chunking - chunks = await self.smart_chunk_text_async( - file_content, - chunk_size=5000, - progress_callback=lambda msg, pct: report_progress( - f"Chunking: {msg}", 10 + float(pct) * 0.2 - ), - ) - - if not chunks: - raise ValueError("No content could be extracted from the document") - - await report_progress("Preparing document chunks...", 30) - - # Prepare data for storage - doc_url = f"file://{filename}" - urls = [] - chunk_numbers = [] - contents = [] - metadatas = [] - total_word_count = 0 - - # Process chunks with metadata - for i, chunk in enumerate(chunks): - # Use base class metadata extraction - meta = self.extract_metadata( - chunk, - { - "chunk_index": i, - "url": doc_url, - "source": source_id, - "source_id": source_id, - "knowledge_type": knowledge_type, - "filename": filename, - }, - ) - - if tags: - meta["tags"] = tags - - urls.append(doc_url) - chunk_numbers.append(i) - contents.append(chunk) - metadatas.append(meta) - total_word_count += meta.get("word_count", 0) - - await report_progress("Updating source information...", 50) - - # Create URL to full document mapping - url_to_full_document = {doc_url: file_content} - - # Update source information - from ...utils import extract_source_summary, update_source_info - - source_summary = await self.threading_service.run_cpu_intensive( - extract_source_summary, source_id, file_content[:5000] - ) - - await self.threading_service.run_io_bound( - update_source_info, - self.supabase_client, - source_id, - source_summary, - total_word_count, - ) - - await report_progress("Storing document chunks...", 70) - - # Store documents - await add_documents_to_supabase( - client=self.supabase_client, - urls=urls, - chunk_numbers=chunk_numbers, - contents=contents, - metadatas=metadatas, - 
url_to_full_document=url_to_full_document, - batch_size=15, - progress_callback=progress_callback, - enable_parallel_batches=True, - provider=None, # Use configured provider - cancellation_check=cancellation_check, - ) - - await report_progress("Document upload completed!", 100) - - result = { - "chunks_stored": len(chunks), - "total_word_count": total_word_count, - "source_id": source_id, - "filename": filename, - } - - span.set_attribute("success", True) - span.set_attribute("chunks_stored", len(chunks)) - span.set_attribute("total_word_count", total_word_count) - - logger.info( - f"Document upload completed successfully: filename={filename}, chunks_stored={len(chunks)}, total_word_count={total_word_count}" - ) - - return True, result - - except Exception as e: - span.set_attribute("success", False) - span.set_attribute("error", str(e)) - logger.error(f"Error uploading document: {e}") - - if websocket: - await websocket.send_json({ - "type": "upload_error", - "error": str(e), - "filename": filename, - }) - - return False, {"error": f"Error uploading document: {str(e)}"} - - async def store_documents(self, documents: list[dict[str, Any]], **kwargs) -> dict[str, Any]: - """ - Store multiple documents. Implementation of abstract method. - - Args: - documents: List of documents to store - **kwargs: Additional options (websocket, progress_callback, etc.) - - Returns: - Storage result - """ - results = [] - for doc in documents: - success, result = await self.upload_document( - file_content=doc["content"], - filename=doc["filename"], - source_id=doc.get("source_id", "upload"), - knowledge_type=doc.get("knowledge_type", "documentation"), - tags=doc.get("tags"), - websocket=kwargs.get("websocket"), - progress_callback=kwargs.get("progress_callback"), - cancellation_check=kwargs.get("cancellation_check"), - ) - results.append(result) - - return { - "success": all(r.get("chunks_stored", 0) > 0 for r in results), - "documents_processed": len(documents), - "results": results, - } - - async def process_document(self, document: dict[str, Any], **kwargs) -> dict[str, Any]: - """ - Process a single document. Implementation of abstract method. - - Args: - document: Document to process - **kwargs: Additional processing options - - Returns: - Processed document with metadata - """ - # Extract text content - content = document.get("content", "") - - # Chunk the content - chunks = await self.smart_chunk_text_async(content) - - # Extract metadata for each chunk - processed_chunks = [] - for i, chunk in enumerate(chunks): - meta = self.extract_metadata( - chunk, {"chunk_index": i, "source": document.get("source", "unknown")} - ) - processed_chunks.append({"content": chunk, "metadata": meta}) - - return { - "chunks": processed_chunks, - "total_chunks": len(chunks), - "source": document.get("source"), - } - - def store_code_examples( - self, code_examples: list[dict[str, Any]] - ) -> tuple[bool, dict[str, Any]]: - """ - Store code examples. This is kept for backward compatibility. - The actual implementation should use add_code_examples_to_supabase directly. - - Args: - code_examples: List of code examples - - Returns: - Tuple of (success, result) - """ - try: - if not code_examples: - return True, {"code_examples_stored": 0} - - # This method exists for backward compatibility - # The actual storage should be done through the proper service functions - logger.warning( - "store_code_examples is deprecated. Use add_code_examples_to_supabase directly." 
- ) - - return True, {"code_examples_stored": len(code_examples)} - - except Exception as e: - logger.error(f"Error in store_code_examples: {e}") - return False, {"error": str(e)} +""" +Storage Services + +This module contains all storage service classes that handle document and data storage operations. +These services extend the base storage functionality with specific implementations. +""" + +from typing import Any + +from fastapi import WebSocket + +from ...config.logfire_config import get_logger, safe_span +from .base_storage_service import BaseStorageService +from .document_storage_service import add_documents_to_supabase + +logger = get_logger(__name__) + + +class DocumentStorageService(BaseStorageService): + """Service for handling document uploads with progress reporting.""" + + async def upload_document( + self, + file_content: str, + filename: str, + source_id: str, + knowledge_type: str = "documentation", + tags: list[str] | None = None, + websocket: WebSocket | None = None, + progress_callback: Any | None = None, + cancellation_check: Any | None = None, + ) -> tuple[bool, dict[str, Any]]: + """ + Upload and process a document file with progress reporting. + + Args: + file_content: Document content as text + filename: Name of the file + source_id: Source identifier + knowledge_type: Type of knowledge + tags: Optional list of tags + websocket: Optional WebSocket for progress + progress_callback: Optional callback for progress + + Returns: + Tuple of (success, result_dict) + """ + logger.info(f"Document upload starting: {filename} as {knowledge_type} knowledge") + + with safe_span( + "upload_document", + filename=filename, + source_id=source_id, + content_length=len(file_content), + ) as span: + try: + # Progress reporting helper + async def report_progress(message: str, percentage: int, batch_info: dict = None): + if websocket: + data = { + "type": "upload_progress", + "filename": filename, + "progress": percentage, + "message": message, + } + if batch_info: + data.update(batch_info) + await websocket.send_json(data) + if progress_callback: + await progress_callback(message, percentage, batch_info) + + await report_progress("Starting document processing...", 10) + + # Use base class chunking + chunks = await self.smart_chunk_text_async( + file_content, + chunk_size=5000, + progress_callback=lambda msg, pct: report_progress( + f"Chunking: {msg}", 10 + float(pct) * 0.2 + ), + ) + + if not chunks: + raise ValueError("No content could be extracted from the document") + + await report_progress("Preparing document chunks...", 30) + + # Prepare data for storage + doc_url = f"file://{filename}" + urls = [] + chunk_numbers = [] + contents = [] + metadatas = [] + total_word_count = 0 + + # Process chunks with metadata + for i, chunk in enumerate(chunks): + # Use base class metadata extraction + meta = self.extract_metadata( + chunk, + { + "chunk_index": i, + "url": doc_url, + "source": source_id, + "source_id": source_id, + "knowledge_type": knowledge_type, + "source_type": "file", # FIX: Mark as file upload + "filename": filename, + }, + ) + + if tags: + meta["tags"] = tags + + urls.append(doc_url) + chunk_numbers.append(i) + contents.append(chunk) + metadatas.append(meta) + total_word_count += meta.get("word_count", 0) + + await report_progress("Updating source information...", 50) + + # Create URL to full document mapping + url_to_full_document = {doc_url: file_content} + + # Update source information + from ...utils import extract_source_summary, update_source_info + + source_summary = 
await self.threading_service.run_cpu_intensive( + extract_source_summary, source_id, file_content[:5000] + ) + + logger.info(f"Updating source info for {source_id} with knowledge_type={knowledge_type}") + await self.threading_service.run_io_bound( + update_source_info, + self.supabase_client, + source_id, + source_summary, + total_word_count, + file_content[:1000], # content for title generation + knowledge_type, # FIX: Pass knowledge_type parameter! + tags, # FIX: Pass tags parameter! + ) + + await report_progress("Storing document chunks...", 70) + + # Store documents + await add_documents_to_supabase( + client=self.supabase_client, + urls=urls, + chunk_numbers=chunk_numbers, + contents=contents, + metadatas=metadatas, + url_to_full_document=url_to_full_document, + batch_size=15, + progress_callback=progress_callback, + enable_parallel_batches=True, + provider=None, # Use configured provider + cancellation_check=cancellation_check, + ) + + await report_progress("Document upload completed!", 100) + + result = { + "chunks_stored": len(chunks), + "total_word_count": total_word_count, + "source_id": source_id, + "filename": filename, + } + + span.set_attribute("success", True) + span.set_attribute("chunks_stored", len(chunks)) + span.set_attribute("total_word_count", total_word_count) + + logger.info( + f"Document upload completed successfully: filename={filename}, chunks_stored={len(chunks)}, total_word_count={total_word_count}" + ) + + return True, result + + except Exception as e: + span.set_attribute("success", False) + span.set_attribute("error", str(e)) + logger.error(f"Error uploading document: {e}") + + if websocket: + await websocket.send_json({ + "type": "upload_error", + "error": str(e), + "filename": filename, + }) + + return False, {"error": f"Error uploading document: {str(e)}"} + + async def store_documents(self, documents: list[dict[str, Any]], **kwargs) -> dict[str, Any]: + """ + Store multiple documents. Implementation of abstract method. + + Args: + documents: List of documents to store + **kwargs: Additional options (websocket, progress_callback, etc.) + + Returns: + Storage result + """ + results = [] + for doc in documents: + success, result = await self.upload_document( + file_content=doc["content"], + filename=doc["filename"], + source_id=doc.get("source_id", "upload"), + knowledge_type=doc.get("knowledge_type", "documentation"), + tags=doc.get("tags"), + websocket=kwargs.get("websocket"), + progress_callback=kwargs.get("progress_callback"), + cancellation_check=kwargs.get("cancellation_check"), + ) + results.append(result) + + return { + "success": all(r.get("chunks_stored", 0) > 0 for r in results), + "documents_processed": len(documents), + "results": results, + } + + async def process_document(self, document: dict[str, Any], **kwargs) -> dict[str, Any]: + """ + Process a single document. Implementation of abstract method. 
+ + Args: + document: Document to process + **kwargs: Additional processing options + + Returns: + Processed document with metadata + """ + # Extract text content + content = document.get("content", "") + + # Chunk the content + chunks = await self.smart_chunk_text_async(content) + + # Extract metadata for each chunk + processed_chunks = [] + for i, chunk in enumerate(chunks): + meta = self.extract_metadata( + chunk, {"chunk_index": i, "source": document.get("source", "unknown")} + ) + processed_chunks.append({"content": chunk, "metadata": meta}) + + return { + "chunks": processed_chunks, + "total_chunks": len(chunks), + "source": document.get("source"), + } + + def store_code_examples( + self, code_examples: list[dict[str, Any]] + ) -> tuple[bool, dict[str, Any]]: + """ + Store code examples. This is kept for backward compatibility. + The actual implementation should use add_code_examples_to_supabase directly. + + Args: + code_examples: List of code examples + + Returns: + Tuple of (success, result) + """ + try: + if not code_examples: + return True, {"code_examples_stored": 0} + + # This method exists for backward compatibility + # The actual storage should be done through the proper service functions + logger.warning( + "store_code_examples is deprecated. Use add_code_examples_to_supabase directly." + ) + + return True, {"code_examples_stored": len(code_examples)} + + except Exception as e: + logger.error(f"Error in store_code_examples: {e}") + return False, {"error": str(e)}
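
Reviewer note (illustrative only, not part of the patch): the categorization fix rests on two conventions visible above — uploaded documents carry a "file_"-prefixed source_id and their chunk metadata is tagged source_type="file", while everything else falls back to "url". The minimal sketch below mirrors that prefix check in isolation so it can be sanity-tested without a database; the helper name derive_source_type and the sample source_id values are hypothetical, only the "file_" prefix convention and the "file"/"url" values come from the patched code.

# Standalone sketch of the source_type convention used in update_source_info()
# and generate_source_title_and_metadata().
def derive_source_type(source_id: str) -> str:
    # Uploaded documents are stored under a "file_"-prefixed source_id,
    # crawled content under its domain, so the prefix decides the type.
    return "file" if source_id.startswith("file_") else "url"

if __name__ == "__main__":
    assert derive_source_type("file_quarterly_report_pdf") == "file"  # hypothetical upload id
    assert derive_source_type("docs.example.com") == "url"            # hypothetical crawled domain
    print("source_type derivation matches the expected categorization")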