diff --git a/CLAUDE.md b/CLAUDE.md index 77673db7..10441ac2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -268,6 +268,8 @@ When connected to Claude/Cursor/Windsurf, the following tools are available: - `archon:rag_search_knowledge_base` - Search knowledge base for relevant content - `archon:rag_search_code_examples` - Find code snippets in the knowledge base - `archon:rag_get_available_sources` - List available knowledge sources +- `archon:rag_list_pages_for_source` - List all pages for a given source (browse documentation structure) +- `archon:rag_read_full_page` - Retrieve full page content by page_id or URL ### Project Management diff --git a/migration/0.1.0/010_add_page_metadata_table.sql b/migration/0.1.0/010_add_page_metadata_table.sql new file mode 100644 index 00000000..bb3abdec --- /dev/null +++ b/migration/0.1.0/010_add_page_metadata_table.sql @@ -0,0 +1,74 @@ +-- ===================================================== +-- Add archon_page_metadata table for page-based RAG retrieval +-- ===================================================== +-- This migration adds support for storing complete documentation pages +-- alongside chunks for improved agent context retrieval. +-- +-- Features: +-- - Full page content storage with metadata +-- - Support for llms-full.txt section-based pages +-- - Foreign key relationship from chunks to pages +-- ===================================================== + +-- Create archon_page_metadata table +CREATE TABLE IF NOT EXISTS archon_page_metadata ( + -- Primary identification + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + source_id TEXT NOT NULL, + url TEXT NOT NULL, + + -- Content + full_content TEXT NOT NULL, + + -- Section metadata (for llms-full.txt H1 sections) + section_title TEXT, + section_order INT DEFAULT 0, + + -- Statistics + word_count INT NOT NULL, + char_count INT NOT NULL, + chunk_count INT NOT NULL DEFAULT 0, + + -- Timestamps + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + + -- Flexible metadata storage + metadata JSONB DEFAULT '{}'::jsonb, + + -- Constraints + CONSTRAINT archon_page_metadata_url_unique UNIQUE(url), + CONSTRAINT archon_page_metadata_source_fk FOREIGN KEY (source_id) + REFERENCES archon_sources(source_id) ON DELETE CASCADE +); + +-- Add page_id foreign key to archon_crawled_pages +-- This links chunks back to their parent page +-- NULLABLE because existing chunks won't have a page_id yet +ALTER TABLE archon_crawled_pages +ADD COLUMN IF NOT EXISTS page_id UUID REFERENCES archon_page_metadata(id) ON DELETE SET NULL; + +-- Create indexes for query performance +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_source_id ON archon_page_metadata(source_id); +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_url ON archon_page_metadata(url); +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_section ON archon_page_metadata(source_id, section_title, section_order); +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_created_at ON archon_page_metadata(created_at); +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_metadata ON archon_page_metadata USING GIN(metadata); +CREATE INDEX IF NOT EXISTS idx_archon_crawled_pages_page_id ON archon_crawled_pages(page_id); + +-- Add comments to document the table structure +COMMENT ON TABLE archon_page_metadata IS 'Stores complete documentation pages for agent retrieval'; +COMMENT ON COLUMN archon_page_metadata.source_id IS 'References the source this page belongs to'; +COMMENT ON COLUMN archon_page_metadata.url IS 'Unique URL of the page 
(synthetic for llms-full.txt sections with #anchor)'; +COMMENT ON COLUMN archon_page_metadata.full_content IS 'Complete markdown/text content of the page'; +COMMENT ON COLUMN archon_page_metadata.section_title IS 'H1 section title for llms-full.txt pages'; +COMMENT ON COLUMN archon_page_metadata.section_order IS 'Order of section in llms-full.txt file (0-based)'; +COMMENT ON COLUMN archon_page_metadata.word_count IS 'Number of words in full_content'; +COMMENT ON COLUMN archon_page_metadata.char_count IS 'Number of characters in full_content'; +COMMENT ON COLUMN archon_page_metadata.chunk_count IS 'Number of chunks created from this page'; +COMMENT ON COLUMN archon_page_metadata.metadata IS 'Flexible JSON metadata (page_type, knowledge_type, tags, etc)'; +COMMENT ON COLUMN archon_crawled_pages.page_id IS 'Foreign key linking chunk to parent page'; + +-- ===================================================== +-- MIGRATION COMPLETE +-- ===================================================== diff --git a/python/src/mcp_server/features/rag/rag_tools.py b/python/src/mcp_server/features/rag/rag_tools.py index e171865c..85cadc29 100644 --- a/python/src/mcp_server/features/rag/rag_tools.py +++ b/python/src/mcp_server/features/rag/rag_tools.py @@ -77,7 +77,11 @@ def register_rag_tools(mcp: FastMCP): @mcp.tool() async def rag_search_knowledge_base( - ctx: Context, query: str, source_id: str | None = None, match_count: int = 5 + ctx: Context, + query: str, + source_id: str | None = None, + match_count: int = 5, + return_mode: str = "pages" ) -> str: """ Search knowledge base for relevant content using RAG. @@ -90,20 +94,31 @@ def register_rag_tools(mcp: FastMCP): This is the 'id' field from available sources, NOT a URL or domain name. Example: "src_1234abcd" not "docs.anthropic.com" match_count: Max results (default: 5) + return_mode: "pages" (default, full pages with metadata) or "chunks" (raw text chunks) Returns: JSON string with structure: - success: bool - Operation success status - - results: list[dict] - Array of matching documents with content and metadata + - results: list[dict] - Array of pages/chunks with content and metadata + Pages include: page_id, url, title, preview, word_count, chunk_matches + Chunks include: content, metadata, similarity + - return_mode: str - Mode used ("pages" or "chunks") - reranked: bool - Whether results were reranked - error: str|null - Error description if success=false + + Note: Use "pages" mode for better context (recommended), or "chunks" for raw granular results. + After getting pages, use rag_read_full_page() to retrieve complete page content. 
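+
+        Example "pages"-mode result item (illustrative values only; actual field
+        values depend on the indexed source):
+            {
+              "page_id": "550e8400-e29b-41d4-a716-446655440000",
+              "url": "https://docs.example.com/getting-started",
+              "title": "Getting Started",
+              "preview": "Short excerpt of the page content...",
+              "word_count": 1240,
+              "chunk_matches": 3
+            }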
""" try: api_url = get_api_url() timeout = httpx.Timeout(30.0, connect=5.0) async with httpx.AsyncClient(timeout=timeout) as client: - request_data = {"query": query, "match_count": match_count} + request_data = { + "query": query, + "match_count": match_count, + "return_mode": return_mode + } if source_id: request_data["source"] = source_id @@ -115,6 +130,7 @@ def register_rag_tools(mcp: FastMCP): { "success": True, "results": result.get("results", []), + "return_mode": result.get("return_mode", return_mode), "reranked": result.get("reranked", False), "error": None, }, @@ -198,5 +214,148 @@ def register_rag_tools(mcp: FastMCP): logger.error(f"Error searching code examples: {e}") return json.dumps({"success": False, "results": [], "error": str(e)}, indent=2) + @mcp.tool() + async def rag_list_pages_for_source( + ctx: Context, source_id: str, section: str | None = None + ) -> str: + """ + List all pages for a given knowledge source. + + Use this after rag_get_available_sources() to see all pages in a source. + Useful for browsing documentation structure or finding specific pages. + + Args: + source_id: Source ID from rag_get_available_sources() (e.g., "src_1234abcd") + section: Optional filter for llms-full.txt section title (e.g., "# Core Concepts") + + Returns: + JSON string with structure: + - success: bool - Operation success status + - pages: list[dict] - Array of page objects with id, url, section_title, word_count + - total: int - Total number of pages + - source_id: str - The source ID that was queried + - error: str|null - Error description if success=false + + Example workflow: + 1. Call rag_get_available_sources() to get source_id + 2. Call rag_list_pages_for_source(source_id) to see all pages + 3. Call rag_read_full_page(page_id) to read specific pages + """ + try: + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + params = {"source_id": source_id} + if section: + params["section"] = section + + response = await client.get( + urljoin(api_url, "/api/pages"), + params=params + ) + + if response.status_code == 200: + result = response.json() + return json.dumps( + { + "success": True, + "pages": result.get("pages", []), + "total": result.get("total", 0), + "source_id": result.get("source_id", source_id), + "error": None, + }, + indent=2, + ) + else: + error_detail = response.text + return json.dumps( + { + "success": False, + "pages": [], + "total": 0, + "source_id": source_id, + "error": f"HTTP {response.status_code}: {error_detail}", + }, + indent=2, + ) + + except Exception as e: + logger.error(f"Error listing pages for source {source_id}: {e}") + return json.dumps( + { + "success": False, + "pages": [], + "total": 0, + "source_id": source_id, + "error": str(e) + }, + indent=2 + ) + + @mcp.tool() + async def rag_read_full_page( + ctx: Context, page_id: str | None = None, url: str | None = None + ) -> str: + """ + Retrieve full page content from knowledge base. + Use this to get complete page content after RAG search. + + Args: + page_id: Page UUID from search results (e.g., "550e8400-e29b-41d4-a716-446655440000") + url: Page URL (e.g., "https://docs.example.com/getting-started") + + Note: Provide EITHER page_id OR url, not both. 
+ + Returns: + JSON string with structure: + - success: bool + - page: dict with full_content, title, url, metadata + - error: str|null + """ + try: + if not page_id and not url: + return json.dumps( + {"success": False, "error": "Must provide either page_id or url"}, + indent=2 + ) + + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + if page_id: + response = await client.get(urljoin(api_url, f"/api/pages/{page_id}")) + else: + response = await client.get( + urljoin(api_url, "/api/pages/by-url"), + params={"url": url} + ) + + if response.status_code == 200: + page_data = response.json() + return json.dumps( + { + "success": True, + "page": page_data, + "error": None, + }, + indent=2, + ) + else: + error_detail = response.text + return json.dumps( + { + "success": False, + "page": None, + "error": f"HTTP {response.status_code}: {error_detail}", + }, + indent=2, + ) + + except Exception as e: + logger.error(f"Error reading page: {e}") + return json.dumps({"success": False, "page": None, "error": str(e)}, indent=2) + # Log successful registration logger.info("✓ RAG tools registered (HTTP-based version)") diff --git a/python/src/server/api_routes/knowledge_api.py b/python/src/server/api_routes/knowledge_api.py index 47a3d9db..052f7521 100644 --- a/python/src/server/api_routes/knowledge_api.py +++ b/python/src/server/api_routes/knowledge_api.py @@ -84,39 +84,39 @@ async def _validate_provider_api_key(provider: str = None) -> None: safe_provider = provider[:20] # Limit length logger.info(f"🔑 Testing {safe_provider.title()} API key with minimal embedding request...") - try: - # Test API key with minimal embedding request using provider-scoped configuration - from ..services.embeddings.embedding_service import create_embedding - - test_result = await create_embedding(text="test", provider=provider) - - if not test_result: - logger.error( - f"❌ {provider.title()} API key validation failed - no embedding returned" - ) - raise HTTPException( - status_code=401, - detail={ - "error": f"Invalid {provider.title()} API key", - "message": f"Please verify your {provider.title()} API key in Settings.", - "error_type": "authentication_failed", - "provider": provider, - }, - ) - except Exception as e: - logger.error( - f"❌ {provider.title()} API key validation failed: {e}", - exc_info=True, - ) - raise HTTPException( - status_code=401, - detail={ - "error": f"Invalid {provider.title()} API key", - "message": f"Please verify your {provider.title()} API key in Settings. 
Error: {str(e)[:100]}", - "error_type": "authentication_failed", - "provider": provider, - }, - ) + try: + # Test API key with minimal embedding request using provider-scoped configuration + from ..services.embeddings.embedding_service import create_embedding + + test_result = await create_embedding(text="test", provider=provider) + + if not test_result: + logger.error( + f"❌ {provider.title()} API key validation failed - no embedding returned" + ) + raise HTTPException( + status_code=401, + detail={ + "error": f"Invalid {provider.title()} API key", + "message": f"Please verify your {provider.title()} API key in Settings.", + "error_type": "authentication_failed", + "provider": provider, + }, + ) + except Exception as e: + logger.error( + f"❌ {provider.title()} API key validation failed: {e}", + exc_info=True, + ) + raise HTTPException( + status_code=401, + detail={ + "error": f"Invalid {provider.title()} API key", + "message": f"Please verify your {provider.title()} API key in Settings. Error: {str(e)[:100]}", + "error_type": "authentication_failed", + "provider": provider, + }, + ) logger.info(f"✅ {provider.title()} API key validation successful") @@ -177,6 +177,7 @@ class RagQueryRequest(BaseModel): query: str source: str | None = None match_count: int = 5 + return_mode: str = "chunks" # "chunks" or "pages" @router.get("/crawl-progress/{progress_id}") @@ -1116,10 +1117,13 @@ async def perform_rag_query(request: RagQueryRequest): raise HTTPException(status_code=422, detail="Query cannot be empty") try: - # Use RAGService for RAG query + # Use RAGService for unified RAG query with return_mode support search_service = RAGService(get_supabase_client()) success, result = await search_service.perform_rag_query( - query=request.query, source=request.source, match_count=request.match_count + query=request.query, + source=request.source, + match_count=request.match_count, + return_mode=request.return_mode ) if success: @@ -1288,7 +1292,7 @@ async def stop_crawl_task(progress_id: str): found = False # Step 1: Cancel the orchestration service - orchestration = await get_active_orchestration(progress_id) + orchestration = await get_active_orchestration(progress_id) if orchestration: orchestration.cancel() found = True @@ -1306,7 +1310,7 @@ async def stop_crawl_task(progress_id: str): found = True # Step 3: Remove from active orchestrations registry - await unregister_orchestration(progress_id) + await unregister_orchestration(progress_id) # Step 4: Update progress tracker to reflect cancellation (only if we found and cancelled something) if found: diff --git a/python/src/server/api_routes/pages_api.py b/python/src/server/api_routes/pages_api.py new file mode 100644 index 00000000..b6a20233 --- /dev/null +++ b/python/src/server/api_routes/pages_api.py @@ -0,0 +1,199 @@ +""" +Pages API Module + +This module handles page retrieval operations for RAG: +- List pages for a source +- Get page by ID +- Get page by URL +""" + +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel + +from ..config.logfire_config import get_logger, safe_logfire_error +from ..utils import get_supabase_client + +# Get logger for this module +logger = get_logger(__name__) + +# Create router +router = APIRouter(prefix="/api", tags=["pages"]) + +# Maximum character count for returning full page content +MAX_PAGE_CHARS = 20_000 + + +class PageSummary(BaseModel): + """Summary model for page listings (no content)""" + + id: str + url: str + section_title: str | None = None + section_order: int = 0 + 
word_count: int + char_count: int + chunk_count: int + + +class PageResponse(BaseModel): + """Response model for a single page (with content)""" + + id: str + source_id: str + url: str + full_content: str + section_title: str | None = None + section_order: int = 0 + word_count: int + char_count: int + chunk_count: int + metadata: dict + created_at: str + updated_at: str + + +class PageListResponse(BaseModel): + """Response model for page listing""" + + pages: list[PageSummary] + total: int + source_id: str + + +def _handle_large_page_content(page_data: dict) -> dict: + """ + Replace full_content with a helpful message if page is too large for LLM context. + + Args: + page_data: Page data from database + + Returns: + Page data with full_content potentially replaced + """ + char_count = page_data.get("char_count", 0) + + if char_count > MAX_PAGE_CHARS: + page_data["full_content"] = ( + f"[Page too large for context - {char_count:,} characters]\n\n" + f"This page exceeds the {MAX_PAGE_CHARS:,} character limit for retrieval.\n\n" + f"To access content from this page, use a RAG search with return_mode='chunks' instead of 'pages'.\n" + f"This will retrieve specific relevant sections rather than the entire page.\n\n" + f"Page details:\n" + f"- URL: {page_data.get('url', 'N/A')}\n" + f"- Section: {page_data.get('section_title', 'N/A')}\n" + f"- Word count: {page_data.get('word_count', 0):,}\n" + f"- Character count: {char_count:,}\n" + f"- Available chunks: {page_data.get('chunk_count', 0)}" + ) + + return page_data + + +@router.get("/pages") +async def list_pages( + source_id: str = Query(..., description="Source ID to filter pages"), + section: str | None = Query(None, description="Filter by section title (for llms-full.txt)"), +): + """ + List all pages for a given source. + + Args: + source_id: The source ID to filter pages + section: Optional H1 section title for llms-full.txt sources + + Returns: + PageListResponse with list of pages and metadata + """ + try: + client = get_supabase_client() + + # Build query - select only summary fields (no full_content) + query = client.table("archon_page_metadata").select( + "id, url, section_title, section_order, word_count, char_count, chunk_count" + ).eq("source_id", source_id) + + # Add section filter if provided + if section: + query = query.eq("section_title", section) + + # Order by section_order and created_at + query = query.order("section_order").order("created_at") + + # Execute query + result = query.execute() + + # Use PageSummary (no content handling needed) + pages = [PageSummary(**page) for page in result.data] + + return PageListResponse(pages=pages, total=len(pages), source_id=source_id) + + except Exception as e: + logger.error(f"Error listing pages for source {source_id}: {e}", exc_info=True) + safe_logfire_error(f"Failed to list pages | source_id={source_id} | error={str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to list pages: {str(e)}") from e + + +@router.get("/pages/by-url") +async def get_page_by_url(url: str = Query(..., description="The URL of the page to retrieve")): + """ + Get a single page by its URL. + + This is useful for retrieving pages from RAG search results which return URLs. 
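+
+    Example request (illustrative URL; the router mounts this endpoint under the /api prefix):
+        GET /api/pages/by-url?url=https://docs.example.com/getting-started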
+ + Args: + url: The complete URL of the page (including anchors for llms-full.txt sections) + + Returns: + PageResponse with complete page data + """ + try: + client = get_supabase_client() + + # Query by URL + result = client.table("archon_page_metadata").select("*").eq("url", url).single().execute() + + if not result.data: + raise HTTPException(status_code=404, detail=f"Page not found for URL: {url}") + + # Handle large pages + page_data = _handle_large_page_content(result.data.copy()) + return PageResponse(**page_data) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting page by URL {url}: {e}", exc_info=True) + safe_logfire_error(f"Failed to get page by URL | url={url} | error={str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get page: {str(e)}") from e + + +@router.get("/pages/{page_id}") +async def get_page_by_id(page_id: str): + """ + Get a single page by its ID. + + Args: + page_id: The UUID of the page + + Returns: + PageResponse with complete page data + """ + try: + client = get_supabase_client() + + # Query by ID + result = client.table("archon_page_metadata").select("*").eq("id", page_id).single().execute() + + if not result.data: + raise HTTPException(status_code=404, detail=f"Page not found: {page_id}") + + # Handle large pages + page_data = _handle_large_page_content(result.data.copy()) + return PageResponse(**page_data) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting page {page_id}: {e}", exc_info=True) + safe_logfire_error(f"Failed to get page | page_id={page_id} | error={str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get page: {str(e)}") from e diff --git a/python/src/server/main.py b/python/src/server/main.py index 19456e06..bd23dfa1 100644 --- a/python/src/server/main.py +++ b/python/src/server/main.py @@ -25,6 +25,7 @@ from .api_routes.knowledge_api import router as knowledge_router from .api_routes.mcp_api import router as mcp_router from .api_routes.migration_api import router as migration_router from .api_routes.ollama_api import router as ollama_router +from .api_routes.pages_api import router as pages_router from .api_routes.progress_api import router as progress_router from .api_routes.projects_api import router as projects_router from .api_routes.providers_api import router as providers_router @@ -183,6 +184,7 @@ app.include_router(settings_router) app.include_router(mcp_router) # app.include_router(mcp_client_router) # Removed - not part of new architecture app.include_router(knowledge_router) +app.include_router(pages_router) app.include_router(ollama_router) app.include_router(projects_router) app.include_router(progress_router) diff --git a/python/src/server/services/crawling/crawling_service.py b/python/src/server/services/crawling/crawling_service.py index 82a98c0c..7a68030e 100644 --- a/python/src/server/services/crawling/crawling_service.py +++ b/python/src/server/services/crawling/crawling_service.py @@ -19,6 +19,7 @@ from ..credential_service import credential_service # Import strategies # Import operations from .document_storage_operations import DocumentStorageOperations +from .page_storage_operations import PageStorageOperations from .helpers.site_config import SiteConfig # Import helpers @@ -98,6 +99,7 @@ class CrawlingService: # Initialize operations self.doc_storage_ops = DocumentStorageOperations(self.supabase_client) + self.page_storage_ops = PageStorageOperations(self.supabase_client) # Track progress state across all stages 
to prevent UI resets self.progress_state = {"progressId": self.progress_id} if self.progress_id else {} @@ -431,6 +433,7 @@ class CrawlingService: self._check_cancellation, source_url=url, source_display_name=source_display_name, + url_to_page_id=None, # Will be populated after page storage ) # Update progress tracker with source_id now that it's created diff --git a/python/src/server/services/crawling/document_storage_operations.py b/python/src/server/services/crawling/document_storage_operations.py index 8bfa4560..669a9f65 100644 --- a/python/src/server/services/crawling/document_storage_operations.py +++ b/python/src/server/services/crawling/document_storage_operations.py @@ -44,6 +44,7 @@ class DocumentStorageOperations: cancellation_check: Callable | None = None, source_url: str | None = None, source_display_name: str | None = None, + url_to_page_id: dict[str, str] | None = None, ) -> dict[str, Any]: """ Process crawled documents and store them in the database. @@ -128,7 +129,7 @@ class DocumentStorageOperations: all_chunk_numbers.append(i) all_contents.append(chunk) - # Create metadata for each chunk + # Create metadata for each chunk (page_id will be set later) word_count = len(chunk.split()) metadata = { "url": doc_url, @@ -136,6 +137,7 @@ class DocumentStorageOperations: "description": doc.get("description", ""), "source_id": source_id, "knowledge_type": request.get("knowledge_type", "documentation"), + "page_id": None, # Will be set after pages are stored "crawl_type": crawl_type, "word_count": word_count, "char_count": len(chunk), @@ -155,13 +157,101 @@ class DocumentStorageOperations: if doc_index > 0 and doc_index % 5 == 0: await asyncio.sleep(0) - # Create/update source record FIRST before storing documents + # Create/update source record FIRST (required for FK constraints on pages and chunks) if all_contents and all_metadatas: await self._create_source_records( all_metadatas, all_contents, source_word_counts, request, source_url, source_display_name ) + # Store pages AFTER source is created but BEFORE chunks (FK constraint requirement) + from .page_storage_operations import PageStorageOperations + page_storage_ops = PageStorageOperations(self.supabase_client) + + # Check if this is an llms-full.txt file + is_llms_full = crawl_type == "llms-txt" or ( + len(url_to_full_document) == 1 and + next(iter(url_to_full_document.keys())).endswith("llms-full.txt") + ) + + if is_llms_full and url_to_full_document: + # Handle llms-full.txt with section-based pages + base_url = next(iter(url_to_full_document.keys())) + content = url_to_full_document[base_url] + + # Store section pages + url_to_page_id = await page_storage_ops.store_llms_full_sections( + base_url, + content, + original_source_id, + request, + crawl_type="llms_full", + ) + + # Parse sections and re-chunk each section + from .helpers.llms_full_parser import parse_llms_full_sections + sections = parse_llms_full_sections(content, base_url) + + # Clear existing chunks and re-create from sections + all_urls.clear() + all_chunk_numbers.clear() + all_contents.clear() + all_metadatas.clear() + url_to_full_document.clear() + + # Chunk each section separately + for section in sections: + # Update url_to_full_document with section content + url_to_full_document[section.url] = section.content + section_chunks = await storage_service.smart_chunk_text_async( + section.content, chunk_size=5000 + ) + + for i, chunk in enumerate(section_chunks): + all_urls.append(section.url) + all_chunk_numbers.append(i) + all_contents.append(chunk) + + 
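+                    # Build chunk metadata for this section chunk; page_id links the
+                    # chunk back to the section page stored above.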
word_count = len(chunk.split()) + metadata = { + "url": section.url, + "title": section.section_title, + "description": "", + "source_id": original_source_id, + "knowledge_type": request.get("knowledge_type", "documentation"), + "page_id": url_to_page_id.get(section.url), + "crawl_type": "llms_full", + "word_count": word_count, + "char_count": len(chunk), + "chunk_index": i, + "tags": request.get("tags", []), + } + all_metadatas.append(metadata) + else: + # Handle regular pages + reconstructed_crawl_results = [] + for url, markdown in url_to_full_document.items(): + reconstructed_crawl_results.append({ + "url": url, + "markdown": markdown, + }) + + if reconstructed_crawl_results: + url_to_page_id = await page_storage_ops.store_pages( + reconstructed_crawl_results, + original_source_id, + request, + crawl_type, + ) + else: + url_to_page_id = {} + + # Update all chunk metadata with correct page_id + for metadata in all_metadatas: + chunk_url = metadata.get("url") + if chunk_url and chunk_url in url_to_page_id: + metadata["page_id"] = url_to_page_id[chunk_url] + safe_logfire_info(f"url_to_full_document keys: {list(url_to_full_document.keys())[:5]}") # Log chunking results @@ -183,6 +273,7 @@ class DocumentStorageOperations: enable_parallel_batches=True, # Enable parallel processing provider=None, # Use configured provider cancellation_check=cancellation_check, # Pass cancellation check + url_to_page_id=url_to_page_id, # Link chunks to pages ) # Calculate chunk counts diff --git a/python/src/server/services/crawling/helpers/llms_full_parser.py b/python/src/server/services/crawling/helpers/llms_full_parser.py new file mode 100644 index 00000000..8af3c371 --- /dev/null +++ b/python/src/server/services/crawling/helpers/llms_full_parser.py @@ -0,0 +1,262 @@ +""" +LLMs-full.txt Section Parser + +Parses llms-full.txt files by splitting on H1 headers (# ) to create separate +"pages" for each section. Each section gets a synthetic URL with a slug anchor. +""" + +import re + +from pydantic import BaseModel + + +class LLMsFullSection(BaseModel): + """Parsed section from llms-full.txt file""" + + section_title: str # Raw H1 text: "# Core Concepts" + section_order: int # Position in document: 0, 1, 2, ... + content: str # Section content (including H1 header) + url: str # Synthetic URL: base.txt#core-concepts + word_count: int + + +def create_section_slug(h1_heading: str) -> str: + """ + Generate URL slug from H1 heading. + + Args: + h1_heading: H1 text like "# Core Concepts" or "# Getting Started" + + Returns: + Slug like "core-concepts" or "getting-started" + + Examples: + "# Core Concepts" -> "core-concepts" + "# API Reference" -> "api-reference" + "# Getting Started!" -> "getting-started" + """ + # Remove "# " prefix if present + slug_text = h1_heading.replace("# ", "").strip() + + # Convert to lowercase + slug = slug_text.lower() + + # Replace spaces with hyphens + slug = slug.replace(" ", "-") + + # Remove special characters (keep only alphanumeric and hyphens) + slug = re.sub(r"[^a-z0-9-]", "", slug) + + # Remove consecutive hyphens + slug = re.sub(r"-+", "-", slug) + + # Remove leading/trailing hyphens + slug = slug.strip("-") + + return slug + + +def create_section_url(base_url: str, h1_heading: str, section_order: int) -> str: + """ + Generate synthetic URL with slug anchor for a section. 
+ + Args: + base_url: Base URL like "https://example.com/llms-full.txt" + h1_heading: H1 text like "# Core Concepts" + section_order: Section position (0-based) + + Returns: + Synthetic URL like "https://example.com/llms-full.txt#section-0-core-concepts" + """ + slug = create_section_slug(h1_heading) + return f"{base_url}#section-{section_order}-{slug}" + + +def parse_llms_full_sections(content: str, base_url: str) -> list[LLMsFullSection]: + """ + Split llms-full.txt content by H1 headers to create separate sections. + + Each H1 (lines starting with "# " but not "##") marks a new section. + Sections are given synthetic URLs with slug anchors. + + Args: + content: Full text content of llms-full.txt file + base_url: Base URL of the file (e.g., "https://example.com/llms-full.txt") + + Returns: + List of LLMsFullSection objects, one per H1 section + + Edge cases: + - No H1 headers: Returns single section with entire content + - Multiple consecutive H1s: Creates separate sections correctly + - Empty sections: Skipped (not included in results) + + Example: + Input content: + ''' + # Core Concepts + Claude is an AI assistant... + + # Getting Started + To get started... + ''' + + Returns: + [ + LLMsFullSection( + section_title="# Core Concepts", + section_order=0, + content="# Core Concepts\\nClaude is...", + url="https://example.com/llms-full.txt#core-concepts", + word_count=5 + ), + LLMsFullSection( + section_title="# Getting Started", + section_order=1, + content="# Getting Started\\nTo get started...", + url="https://example.com/llms-full.txt#getting-started", + word_count=4 + ) + ] + """ + lines = content.split("\n") + + # Pre-scan: mark which lines are inside code blocks + inside_code_block = set() + in_block = False + for i, line in enumerate(lines): + if line.strip().startswith("```"): + in_block = not in_block + if in_block: + inside_code_block.add(i) + + # Parse sections, ignoring H1 headers inside code blocks + sections: list[LLMsFullSection] = [] + current_h1: str | None = None + current_content: list[str] = [] + section_order = 0 + + for i, line in enumerate(lines): + # Detect H1 (starts with "# " but not "##") - but ONLY if not in code block + is_h1 = line.startswith("# ") and not line.startswith("## ") + if is_h1 and i not in inside_code_block: + # Save previous section if it exists + if current_h1 is not None: + section_text = "\n".join(current_content) + # Skip empty sections (only whitespace) + if section_text.strip(): + section_url = create_section_url(base_url, current_h1, section_order) + word_count = len(section_text.split()) + + sections.append( + LLMsFullSection( + section_title=current_h1, + section_order=section_order, + content=section_text, + url=section_url, + word_count=word_count, + ) + ) + section_order += 1 + + # Start new section + current_h1 = line + current_content = [line] + else: + # Only accumulate if we've seen an H1 + if current_h1 is not None: + current_content.append(line) + + # Save last section + if current_h1 is not None: + section_text = "\n".join(current_content) + if section_text.strip(): + section_url = create_section_url(base_url, current_h1, section_order) + word_count = len(section_text.split()) + sections.append( + LLMsFullSection( + section_title=current_h1, + section_order=section_order, + content=section_text, + url=section_url, + word_count=word_count, + ) + ) + + # Edge case: No H1 headers found, treat entire file as single page + if not sections and content.strip(): + sections.append( + LLMsFullSection( + section_title="Full Document", + 
section_order=0, + content=content, + url=base_url, # No anchor for single-page + word_count=len(content.split()), + ) + ) + + # Fix sections that were split inside code blocks - merge them with next section + if sections: + fixed_sections: list[LLMsFullSection] = [] + i = 0 + while i < len(sections): + current = sections[i] + + # Count ``` at start of lines only (proper code fences) + code_fence_count = sum( + 1 for line in current.content.split('\n') + if line.strip().startswith('```') + ) + + # If odd number, we're inside an unclosed code block - merge with next + while code_fence_count % 2 == 1 and i + 1 < len(sections): + next_section = sections[i + 1] + # Combine content + combined_content = current.content + "\n\n" + next_section.content + # Update current with combined content + current = LLMsFullSection( + section_title=current.section_title, + section_order=current.section_order, + content=combined_content, + url=current.url, + word_count=len(combined_content.split()), + ) + # Move to next section and recount ``` at start of lines + i += 1 + code_fence_count = sum( + 1 for line in current.content.split('\n') + if line.strip().startswith('```') + ) + + fixed_sections.append(current) + i += 1 + + sections = fixed_sections + + # Combine consecutive small sections (<200 chars) together + if sections: + combined_sections: list[LLMsFullSection] = [] + i = 0 + while i < len(sections): + current = sections[i] + combined_content = current.content + + # Keep combining while current is small and there are more sections + while len(combined_content) < 200 and i + 1 < len(sections): + i += 1 + combined_content = combined_content + "\n\n" + sections[i].content + + # Create combined section with first section's metadata + combined = LLMsFullSection( + section_title=current.section_title, + section_order=current.section_order, + content=combined_content, + url=current.url, + word_count=len(combined_content.split()), + ) + combined_sections.append(combined) + i += 1 + + sections = combined_sections + + return sections diff --git a/python/src/server/services/crawling/page_storage_operations.py b/python/src/server/services/crawling/page_storage_operations.py new file mode 100644 index 00000000..a0e8a331 --- /dev/null +++ b/python/src/server/services/crawling/page_storage_operations.py @@ -0,0 +1,248 @@ +""" +Page Storage Operations + +Handles the storage of complete documentation pages in the archon_page_metadata table. +Pages are stored BEFORE chunking to maintain full context for agent retrieval. +""" + +from typing import Any + +from postgrest.exceptions import APIError + +from ...config.logfire_config import get_logger, safe_logfire_error, safe_logfire_info +from .helpers.llms_full_parser import parse_llms_full_sections + +logger = get_logger(__name__) + + +class PageStorageOperations: + """ + Handles page storage operations for crawled content. + + Pages are stored in the archon_page_metadata table with full content and metadata. + This enables agents to retrieve complete documentation pages instead of just chunks. + """ + + def __init__(self, supabase_client): + """ + Initialize page storage operations. + + Args: + supabase_client: The Supabase client for database operations + """ + self.supabase_client = supabase_client + + async def store_pages( + self, + crawl_results: list[dict], + source_id: str, + request: dict[str, Any], + crawl_type: str, + ) -> dict[str, str]: + """ + Store pages in archon_page_metadata table from regular crawl results. 
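+
+        Example (illustrative call; assumes crawl_results came from a sitemap crawl):
+            url_to_page_id = await page_storage_ops.store_pages(
+                crawl_results, source_id, request, crawl_type="sitemap"
+            )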
+ + Args: + crawl_results: List of crawled documents with url, markdown, title, etc. + source_id: The source ID these pages belong to + request: The original crawl request with knowledge_type, tags, etc. + crawl_type: Type of crawl performed (sitemap, url, link_collection, etc.) + + Returns: + {url: page_id} mapping for FK references in chunks + """ + safe_logfire_info( + f"store_pages called | source_id={source_id} | crawl_type={crawl_type} | num_results={len(crawl_results)}" + ) + + url_to_page_id: dict[str, str] = {} + pages_to_insert: list[dict[str, Any]] = [] + + for doc in crawl_results: + url = doc.get("url", "").strip() + markdown = doc.get("markdown", "").strip() + + # Skip documents with empty content or missing URLs + if not url or not markdown: + continue + + # Prepare page record + word_count = len(markdown.split()) + char_count = len(markdown) + + page_record = { + "source_id": source_id, + "url": url, + "full_content": markdown, + "section_title": None, # Regular page, not a section + "section_order": 0, + "word_count": word_count, + "char_count": char_count, + "chunk_count": 0, # Will be updated after chunking + "metadata": { + "knowledge_type": request.get("knowledge_type", "documentation"), + "crawl_type": crawl_type, + "page_type": "documentation", + "tags": request.get("tags", []), + }, + } + pages_to_insert.append(page_record) + + # Batch upsert pages + if pages_to_insert: + try: + safe_logfire_info( + f"Upserting {len(pages_to_insert)} pages into archon_page_metadata table" + ) + result = ( + self.supabase_client.table("archon_page_metadata") + .upsert(pages_to_insert, on_conflict="url") + .execute() + ) + + # Build url → page_id mapping + for page in result.data: + url_to_page_id[page["url"]] = page["id"] + + safe_logfire_info( + f"Successfully stored {len(url_to_page_id)}/{len(pages_to_insert)} pages in archon_page_metadata" + ) + + except APIError as e: + safe_logfire_error( + f"Database error upserting pages | source_id={source_id} | attempted={len(pages_to_insert)} | error={str(e)}" + ) + logger.error(f"Failed to upsert pages for source {source_id}: {e}", exc_info=True) + # Don't raise - allow chunking to continue even if page storage fails + + except Exception as e: + safe_logfire_error( + f"Unexpected error upserting pages | source_id={source_id} | attempted={len(pages_to_insert)} | error={str(e)}" + ) + logger.error(f"Unexpected error upserting pages for source {source_id}: {e}", exc_info=True) + # Don't raise - allow chunking to continue + + return url_to_page_id + + async def store_llms_full_sections( + self, + base_url: str, + content: str, + source_id: str, + request: dict[str, Any], + crawl_type: str = "llms_full", + ) -> dict[str, str]: + """ + Store llms-full.txt sections as separate pages. + + Each H1 section gets its own page record with a synthetic URL. 
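+        Synthetic URLs follow the pattern produced by create_section_url, e.g.
+        "https://example.com/llms-full.txt#section-0-core-concepts".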
+ + Args: + base_url: Base URL of the llms-full.txt file + content: Full text content of the file + source_id: The source ID these sections belong to + request: The original crawl request + crawl_type: Type of crawl (defaults to "llms_full") + + Returns: + {url: page_id} mapping for FK references in chunks + """ + url_to_page_id: dict[str, str] = {} + + # Parse sections from content + sections = parse_llms_full_sections(content, base_url) + + if not sections: + logger.warning(f"No sections found in llms-full.txt file: {base_url}") + return url_to_page_id + + safe_logfire_info( + f"Parsed {len(sections)} sections from llms-full.txt file: {base_url}" + ) + + # Prepare page records for each section + pages_to_insert: list[dict[str, Any]] = [] + + for section in sections: + page_record = { + "source_id": source_id, + "url": section.url, + "full_content": section.content, + "section_title": section.section_title, + "section_order": section.section_order, + "word_count": section.word_count, + "char_count": len(section.content), + "chunk_count": 0, # Will be updated after chunking + "metadata": { + "knowledge_type": request.get("knowledge_type", "documentation"), + "crawl_type": crawl_type, + "page_type": "llms_full_section", + "tags": request.get("tags", []), + "section_metadata": { + "section_title": section.section_title, + "section_order": section.section_order, + "base_url": base_url, + }, + }, + } + pages_to_insert.append(page_record) + + # Batch upsert pages + if pages_to_insert: + try: + safe_logfire_info( + f"Upserting {len(pages_to_insert)} section pages into archon_page_metadata" + ) + result = ( + self.supabase_client.table("archon_page_metadata") + .upsert(pages_to_insert, on_conflict="url") + .execute() + ) + + # Build url → page_id mapping + for page in result.data: + url_to_page_id[page["url"]] = page["id"] + + safe_logfire_info( + f"Successfully stored {len(url_to_page_id)}/{len(pages_to_insert)} section pages" + ) + + except APIError as e: + safe_logfire_error( + f"Database error upserting sections | base_url={base_url} | attempted={len(pages_to_insert)} | error={str(e)}" + ) + logger.error(f"Failed to upsert sections for {base_url}: {e}", exc_info=True) + # Don't raise - allow process to continue + + except Exception as e: + safe_logfire_error( + f"Unexpected error upserting sections | base_url={base_url} | attempted={len(pages_to_insert)} | error={str(e)}" + ) + logger.error(f"Unexpected error upserting sections for {base_url}: {e}", exc_info=True) + # Don't raise - allow process to continue + + return url_to_page_id + + async def update_page_chunk_count(self, page_id: str, chunk_count: int) -> None: + """ + Update the chunk_count field for a page after chunking is complete. 
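+
+        Example (illustrative): await page_storage_ops.update_page_chunk_count(page_id, len(chunks))
+        Failures are logged as warnings rather than raised, so callers can treat this as best-effort.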
+ + Args: + page_id: The UUID of the page to update + chunk_count: Number of chunks created from this page + """ + try: + self.supabase_client.table("archon_page_metadata").update( + {"chunk_count": chunk_count} + ).eq("id", page_id).execute() + + safe_logfire_info(f"Updated chunk_count={chunk_count} for page_id={page_id}") + + except APIError as e: + logger.warning( + f"Database error updating chunk_count for page {page_id}: {e}", exc_info=True + ) + except Exception as e: + logger.warning( + f"Unexpected error updating chunk_count for page {page_id}: {e}", exc_info=True + ) diff --git a/python/src/server/services/crawling/strategies/batch.py b/python/src/server/services/crawling/strategies/batch.py index 1457fdca..6a318879 100644 --- a/python/src/server/services/crawling/strategies/batch.py +++ b/python/src/server/services/crawling/strategies/batch.py @@ -234,10 +234,24 @@ class BatchCrawlStrategy: if result.success and result.markdown and result.markdown.fit_markdown: # Map back to original URL original_url = url_mapping.get(result.url, result.url) + + # Extract title from HTML