From bfd0a84f64d50cdd524e480aee2558119fe3da69 Mon Sep 17 00:00:00 2001 From: Cole Medin Date: Thu, 9 Oct 2025 20:39:27 -0400 Subject: [PATCH] RAG Enhancements (Page Level Retrieval) (#767) * Initial commit for RAG by document * Phase 2 * Adding migrations * Fixing page IDs for chunk metadata * Fixing unit tests, adding tool to list pages for source * Fixing page storage upsert issues * Max file length for retrieval * Fixing title issue * Fixing tests --- CLAUDE.md | 2 + .../0.1.0/010_add_page_metadata_table.sql | 74 +++++ .../src/mcp_server/features/rag/rag_tools.py | 165 ++++++++++- python/src/server/api_routes/knowledge_api.py | 78 +++--- python/src/server/api_routes/pages_api.py | 199 +++++++++++++ python/src/server/main.py | 2 + .../services/crawling/crawling_service.py | 3 + .../crawling/document_storage_operations.py | 95 ++++++- .../crawling/helpers/llms_full_parser.py | 262 ++++++++++++++++++ .../crawling/page_storage_operations.py | 248 +++++++++++++++++ .../services/crawling/strategies/batch.py | 14 + .../crawling/strategies/single_page.py | 14 +- .../src/server/services/search/rag_service.py | 101 ++++++- .../services/source_management_service.py | 18 +- .../services/storage/base_storage_service.py | 17 ++ .../storage/document_storage_service.py | 5 + .../server/services/test_llms_full_parser.py | 195 +++++++++++++ python/tests/test_source_race_condition.py | 10 +- 18 files changed, 1437 insertions(+), 65 deletions(-) create mode 100644 migration/0.1.0/010_add_page_metadata_table.sql create mode 100644 python/src/server/api_routes/pages_api.py create mode 100644 python/src/server/services/crawling/helpers/llms_full_parser.py create mode 100644 python/src/server/services/crawling/page_storage_operations.py create mode 100644 python/tests/server/services/test_llms_full_parser.py diff --git a/CLAUDE.md b/CLAUDE.md index 77673db7..10441ac2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -268,6 +268,8 @@ When connected to Claude/Cursor/Windsurf, the following tools are available: - `archon:rag_search_knowledge_base` - Search knowledge base for relevant content - `archon:rag_search_code_examples` - Find code snippets in the knowledge base - `archon:rag_get_available_sources` - List available knowledge sources +- `archon:rag_list_pages_for_source` - List all pages for a given source (browse documentation structure) +- `archon:rag_read_full_page` - Retrieve full page content by page_id or URL ### Project Management diff --git a/migration/0.1.0/010_add_page_metadata_table.sql b/migration/0.1.0/010_add_page_metadata_table.sql new file mode 100644 index 00000000..bb3abdec --- /dev/null +++ b/migration/0.1.0/010_add_page_metadata_table.sql @@ -0,0 +1,74 @@ +-- ===================================================== +-- Add archon_page_metadata table for page-based RAG retrieval +-- ===================================================== +-- This migration adds support for storing complete documentation pages +-- alongside chunks for improved agent context retrieval. 
+-- +-- Features: +-- - Full page content storage with metadata +-- - Support for llms-full.txt section-based pages +-- - Foreign key relationship from chunks to pages +-- ===================================================== + +-- Create archon_page_metadata table +CREATE TABLE IF NOT EXISTS archon_page_metadata ( + -- Primary identification + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + source_id TEXT NOT NULL, + url TEXT NOT NULL, + + -- Content + full_content TEXT NOT NULL, + + -- Section metadata (for llms-full.txt H1 sections) + section_title TEXT, + section_order INT DEFAULT 0, + + -- Statistics + word_count INT NOT NULL, + char_count INT NOT NULL, + chunk_count INT NOT NULL DEFAULT 0, + + -- Timestamps + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + + -- Flexible metadata storage + metadata JSONB DEFAULT '{}'::jsonb, + + -- Constraints + CONSTRAINT archon_page_metadata_url_unique UNIQUE(url), + CONSTRAINT archon_page_metadata_source_fk FOREIGN KEY (source_id) + REFERENCES archon_sources(source_id) ON DELETE CASCADE +); + +-- Add page_id foreign key to archon_crawled_pages +-- This links chunks back to their parent page +-- NULLABLE because existing chunks won't have a page_id yet +ALTER TABLE archon_crawled_pages +ADD COLUMN IF NOT EXISTS page_id UUID REFERENCES archon_page_metadata(id) ON DELETE SET NULL; + +-- Create indexes for query performance +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_source_id ON archon_page_metadata(source_id); +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_url ON archon_page_metadata(url); +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_section ON archon_page_metadata(source_id, section_title, section_order); +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_created_at ON archon_page_metadata(created_at); +CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_metadata ON archon_page_metadata USING GIN(metadata); +CREATE INDEX IF NOT EXISTS idx_archon_crawled_pages_page_id ON archon_crawled_pages(page_id); + +-- Add comments to document the table structure +COMMENT ON TABLE archon_page_metadata IS 'Stores complete documentation pages for agent retrieval'; +COMMENT ON COLUMN archon_page_metadata.source_id IS 'References the source this page belongs to'; +COMMENT ON COLUMN archon_page_metadata.url IS 'Unique URL of the page (synthetic for llms-full.txt sections with #anchor)'; +COMMENT ON COLUMN archon_page_metadata.full_content IS 'Complete markdown/text content of the page'; +COMMENT ON COLUMN archon_page_metadata.section_title IS 'H1 section title for llms-full.txt pages'; +COMMENT ON COLUMN archon_page_metadata.section_order IS 'Order of section in llms-full.txt file (0-based)'; +COMMENT ON COLUMN archon_page_metadata.word_count IS 'Number of words in full_content'; +COMMENT ON COLUMN archon_page_metadata.char_count IS 'Number of characters in full_content'; +COMMENT ON COLUMN archon_page_metadata.chunk_count IS 'Number of chunks created from this page'; +COMMENT ON COLUMN archon_page_metadata.metadata IS 'Flexible JSON metadata (page_type, knowledge_type, tags, etc)'; +COMMENT ON COLUMN archon_crawled_pages.page_id IS 'Foreign key linking chunk to parent page'; + +-- ===================================================== +-- MIGRATION COMPLETE +-- ===================================================== diff --git a/python/src/mcp_server/features/rag/rag_tools.py b/python/src/mcp_server/features/rag/rag_tools.py index e171865c..85cadc29 100644 --- a/python/src/mcp_server/features/rag/rag_tools.py 
+++ b/python/src/mcp_server/features/rag/rag_tools.py @@ -77,7 +77,11 @@ def register_rag_tools(mcp: FastMCP): @mcp.tool() async def rag_search_knowledge_base( - ctx: Context, query: str, source_id: str | None = None, match_count: int = 5 + ctx: Context, + query: str, + source_id: str | None = None, + match_count: int = 5, + return_mode: str = "pages" ) -> str: """ Search knowledge base for relevant content using RAG. @@ -90,20 +94,31 @@ def register_rag_tools(mcp: FastMCP): This is the 'id' field from available sources, NOT a URL or domain name. Example: "src_1234abcd" not "docs.anthropic.com" match_count: Max results (default: 5) + return_mode: "pages" (default, full pages with metadata) or "chunks" (raw text chunks) Returns: JSON string with structure: - success: bool - Operation success status - - results: list[dict] - Array of matching documents with content and metadata + - results: list[dict] - Array of pages/chunks with content and metadata + Pages include: page_id, url, title, preview, word_count, chunk_matches + Chunks include: content, metadata, similarity + - return_mode: str - Mode used ("pages" or "chunks") - reranked: bool - Whether results were reranked - error: str|null - Error description if success=false + + Note: Use "pages" mode for better context (recommended), or "chunks" for raw granular results. + After getting pages, use rag_read_full_page() to retrieve complete page content. """ try: api_url = get_api_url() timeout = httpx.Timeout(30.0, connect=5.0) async with httpx.AsyncClient(timeout=timeout) as client: - request_data = {"query": query, "match_count": match_count} + request_data = { + "query": query, + "match_count": match_count, + "return_mode": return_mode + } if source_id: request_data["source"] = source_id @@ -115,6 +130,7 @@ def register_rag_tools(mcp: FastMCP): { "success": True, "results": result.get("results", []), + "return_mode": result.get("return_mode", return_mode), "reranked": result.get("reranked", False), "error": None, }, @@ -198,5 +214,148 @@ def register_rag_tools(mcp: FastMCP): logger.error(f"Error searching code examples: {e}") return json.dumps({"success": False, "results": [], "error": str(e)}, indent=2) + @mcp.tool() + async def rag_list_pages_for_source( + ctx: Context, source_id: str, section: str | None = None + ) -> str: + """ + List all pages for a given knowledge source. + + Use this after rag_get_available_sources() to see all pages in a source. + Useful for browsing documentation structure or finding specific pages. + + Args: + source_id: Source ID from rag_get_available_sources() (e.g., "src_1234abcd") + section: Optional filter for llms-full.txt section title (e.g., "# Core Concepts") + + Returns: + JSON string with structure: + - success: bool - Operation success status + - pages: list[dict] - Array of page objects with id, url, section_title, word_count + - total: int - Total number of pages + - source_id: str - The source ID that was queried + - error: str|null - Error description if success=false + + Example workflow: + 1. Call rag_get_available_sources() to get source_id + 2. Call rag_list_pages_for_source(source_id) to see all pages + 3. 
Call rag_read_full_page(page_id) to read specific pages + """ + try: + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + params = {"source_id": source_id} + if section: + params["section"] = section + + response = await client.get( + urljoin(api_url, "/api/pages"), + params=params + ) + + if response.status_code == 200: + result = response.json() + return json.dumps( + { + "success": True, + "pages": result.get("pages", []), + "total": result.get("total", 0), + "source_id": result.get("source_id", source_id), + "error": None, + }, + indent=2, + ) + else: + error_detail = response.text + return json.dumps( + { + "success": False, + "pages": [], + "total": 0, + "source_id": source_id, + "error": f"HTTP {response.status_code}: {error_detail}", + }, + indent=2, + ) + + except Exception as e: + logger.error(f"Error listing pages for source {source_id}: {e}") + return json.dumps( + { + "success": False, + "pages": [], + "total": 0, + "source_id": source_id, + "error": str(e) + }, + indent=2 + ) + + @mcp.tool() + async def rag_read_full_page( + ctx: Context, page_id: str | None = None, url: str | None = None + ) -> str: + """ + Retrieve full page content from knowledge base. + Use this to get complete page content after RAG search. + + Args: + page_id: Page UUID from search results (e.g., "550e8400-e29b-41d4-a716-446655440000") + url: Page URL (e.g., "https://docs.example.com/getting-started") + + Note: Provide EITHER page_id OR url, not both. + + Returns: + JSON string with structure: + - success: bool + - page: dict with full_content, title, url, metadata + - error: str|null + """ + try: + if not page_id and not url: + return json.dumps( + {"success": False, "error": "Must provide either page_id or url"}, + indent=2 + ) + + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + if page_id: + response = await client.get(urljoin(api_url, f"/api/pages/{page_id}")) + else: + response = await client.get( + urljoin(api_url, "/api/pages/by-url"), + params={"url": url} + ) + + if response.status_code == 200: + page_data = response.json() + return json.dumps( + { + "success": True, + "page": page_data, + "error": None, + }, + indent=2, + ) + else: + error_detail = response.text + return json.dumps( + { + "success": False, + "page": None, + "error": f"HTTP {response.status_code}: {error_detail}", + }, + indent=2, + ) + + except Exception as e: + logger.error(f"Error reading page: {e}") + return json.dumps({"success": False, "page": None, "error": str(e)}, indent=2) + # Log successful registration logger.info("✓ RAG tools registered (HTTP-based version)") diff --git a/python/src/server/api_routes/knowledge_api.py b/python/src/server/api_routes/knowledge_api.py index 47a3d9db..052f7521 100644 --- a/python/src/server/api_routes/knowledge_api.py +++ b/python/src/server/api_routes/knowledge_api.py @@ -84,39 +84,39 @@ async def _validate_provider_api_key(provider: str = None) -> None: safe_provider = provider[:20] # Limit length logger.info(f"🔑 Testing {safe_provider.title()} API key with minimal embedding request...") - try: - # Test API key with minimal embedding request using provider-scoped configuration - from ..services.embeddings.embedding_service import create_embedding - - test_result = await create_embedding(text="test", provider=provider) - - if not test_result: - logger.error( - f"❌ {provider.title()} API key validation failed - no embedding 
returned" - ) - raise HTTPException( - status_code=401, - detail={ - "error": f"Invalid {provider.title()} API key", - "message": f"Please verify your {provider.title()} API key in Settings.", - "error_type": "authentication_failed", - "provider": provider, - }, - ) - except Exception as e: - logger.error( - f"❌ {provider.title()} API key validation failed: {e}", - exc_info=True, - ) - raise HTTPException( - status_code=401, - detail={ - "error": f"Invalid {provider.title()} API key", - "message": f"Please verify your {provider.title()} API key in Settings. Error: {str(e)[:100]}", - "error_type": "authentication_failed", - "provider": provider, - }, - ) + try: + # Test API key with minimal embedding request using provider-scoped configuration + from ..services.embeddings.embedding_service import create_embedding + + test_result = await create_embedding(text="test", provider=provider) + + if not test_result: + logger.error( + f"❌ {provider.title()} API key validation failed - no embedding returned" + ) + raise HTTPException( + status_code=401, + detail={ + "error": f"Invalid {provider.title()} API key", + "message": f"Please verify your {provider.title()} API key in Settings.", + "error_type": "authentication_failed", + "provider": provider, + }, + ) + except Exception as e: + logger.error( + f"❌ {provider.title()} API key validation failed: {e}", + exc_info=True, + ) + raise HTTPException( + status_code=401, + detail={ + "error": f"Invalid {provider.title()} API key", + "message": f"Please verify your {provider.title()} API key in Settings. Error: {str(e)[:100]}", + "error_type": "authentication_failed", + "provider": provider, + }, + ) logger.info(f"✅ {provider.title()} API key validation successful") @@ -177,6 +177,7 @@ class RagQueryRequest(BaseModel): query: str source: str | None = None match_count: int = 5 + return_mode: str = "chunks" # "chunks" or "pages" @router.get("/crawl-progress/{progress_id}") @@ -1116,10 +1117,13 @@ async def perform_rag_query(request: RagQueryRequest): raise HTTPException(status_code=422, detail="Query cannot be empty") try: - # Use RAGService for RAG query + # Use RAGService for unified RAG query with return_mode support search_service = RAGService(get_supabase_client()) success, result = await search_service.perform_rag_query( - query=request.query, source=request.source, match_count=request.match_count + query=request.query, + source=request.source, + match_count=request.match_count, + return_mode=request.return_mode ) if success: @@ -1288,7 +1292,7 @@ async def stop_crawl_task(progress_id: str): found = False # Step 1: Cancel the orchestration service - orchestration = await get_active_orchestration(progress_id) + orchestration = await get_active_orchestration(progress_id) if orchestration: orchestration.cancel() found = True @@ -1306,7 +1310,7 @@ async def stop_crawl_task(progress_id: str): found = True # Step 3: Remove from active orchestrations registry - await unregister_orchestration(progress_id) + await unregister_orchestration(progress_id) # Step 4: Update progress tracker to reflect cancellation (only if we found and cancelled something) if found: diff --git a/python/src/server/api_routes/pages_api.py b/python/src/server/api_routes/pages_api.py new file mode 100644 index 00000000..b6a20233 --- /dev/null +++ b/python/src/server/api_routes/pages_api.py @@ -0,0 +1,199 @@ +""" +Pages API Module + +This module handles page retrieval operations for RAG: +- List pages for a source +- Get page by ID +- Get page by URL +""" + +from fastapi import 
APIRouter, HTTPException, Query +from pydantic import BaseModel + +from ..config.logfire_config import get_logger, safe_logfire_error +from ..utils import get_supabase_client + +# Get logger for this module +logger = get_logger(__name__) + +# Create router +router = APIRouter(prefix="/api", tags=["pages"]) + +# Maximum character count for returning full page content +MAX_PAGE_CHARS = 20_000 + + +class PageSummary(BaseModel): + """Summary model for page listings (no content)""" + + id: str + url: str + section_title: str | None = None + section_order: int = 0 + word_count: int + char_count: int + chunk_count: int + + +class PageResponse(BaseModel): + """Response model for a single page (with content)""" + + id: str + source_id: str + url: str + full_content: str + section_title: str | None = None + section_order: int = 0 + word_count: int + char_count: int + chunk_count: int + metadata: dict + created_at: str + updated_at: str + + +class PageListResponse(BaseModel): + """Response model for page listing""" + + pages: list[PageSummary] + total: int + source_id: str + + +def _handle_large_page_content(page_data: dict) -> dict: + """ + Replace full_content with a helpful message if page is too large for LLM context. + + Args: + page_data: Page data from database + + Returns: + Page data with full_content potentially replaced + """ + char_count = page_data.get("char_count", 0) + + if char_count > MAX_PAGE_CHARS: + page_data["full_content"] = ( + f"[Page too large for context - {char_count:,} characters]\n\n" + f"This page exceeds the {MAX_PAGE_CHARS:,} character limit for retrieval.\n\n" + f"To access content from this page, use a RAG search with return_mode='chunks' instead of 'pages'.\n" + f"This will retrieve specific relevant sections rather than the entire page.\n\n" + f"Page details:\n" + f"- URL: {page_data.get('url', 'N/A')}\n" + f"- Section: {page_data.get('section_title', 'N/A')}\n" + f"- Word count: {page_data.get('word_count', 0):,}\n" + f"- Character count: {char_count:,}\n" + f"- Available chunks: {page_data.get('chunk_count', 0)}" + ) + + return page_data + + +@router.get("/pages") +async def list_pages( + source_id: str = Query(..., description="Source ID to filter pages"), + section: str | None = Query(None, description="Filter by section title (for llms-full.txt)"), +): + """ + List all pages for a given source. 
+ + Args: + source_id: The source ID to filter pages + section: Optional H1 section title for llms-full.txt sources + + Returns: + PageListResponse with list of pages and metadata + """ + try: + client = get_supabase_client() + + # Build query - select only summary fields (no full_content) + query = client.table("archon_page_metadata").select( + "id, url, section_title, section_order, word_count, char_count, chunk_count" + ).eq("source_id", source_id) + + # Add section filter if provided + if section: + query = query.eq("section_title", section) + + # Order by section_order and created_at + query = query.order("section_order").order("created_at") + + # Execute query + result = query.execute() + + # Use PageSummary (no content handling needed) + pages = [PageSummary(**page) for page in result.data] + + return PageListResponse(pages=pages, total=len(pages), source_id=source_id) + + except Exception as e: + logger.error(f"Error listing pages for source {source_id}: {e}", exc_info=True) + safe_logfire_error(f"Failed to list pages | source_id={source_id} | error={str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to list pages: {str(e)}") from e + + +@router.get("/pages/by-url") +async def get_page_by_url(url: str = Query(..., description="The URL of the page to retrieve")): + """ + Get a single page by its URL. + + This is useful for retrieving pages from RAG search results which return URLs. + + Args: + url: The complete URL of the page (including anchors for llms-full.txt sections) + + Returns: + PageResponse with complete page data + """ + try: + client = get_supabase_client() + + # Query by URL + result = client.table("archon_page_metadata").select("*").eq("url", url).single().execute() + + if not result.data: + raise HTTPException(status_code=404, detail=f"Page not found for URL: {url}") + + # Handle large pages + page_data = _handle_large_page_content(result.data.copy()) + return PageResponse(**page_data) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting page by URL {url}: {e}", exc_info=True) + safe_logfire_error(f"Failed to get page by URL | url={url} | error={str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get page: {str(e)}") from e + + +@router.get("/pages/{page_id}") +async def get_page_by_id(page_id: str): + """ + Get a single page by its ID. 
+ + Args: + page_id: The UUID of the page + + Returns: + PageResponse with complete page data + """ + try: + client = get_supabase_client() + + # Query by ID + result = client.table("archon_page_metadata").select("*").eq("id", page_id).single().execute() + + if not result.data: + raise HTTPException(status_code=404, detail=f"Page not found: {page_id}") + + # Handle large pages + page_data = _handle_large_page_content(result.data.copy()) + return PageResponse(**page_data) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting page {page_id}: {e}", exc_info=True) + safe_logfire_error(f"Failed to get page | page_id={page_id} | error={str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get page: {str(e)}") from e diff --git a/python/src/server/main.py b/python/src/server/main.py index 19456e06..bd23dfa1 100644 --- a/python/src/server/main.py +++ b/python/src/server/main.py @@ -25,6 +25,7 @@ from .api_routes.knowledge_api import router as knowledge_router from .api_routes.mcp_api import router as mcp_router from .api_routes.migration_api import router as migration_router from .api_routes.ollama_api import router as ollama_router +from .api_routes.pages_api import router as pages_router from .api_routes.progress_api import router as progress_router from .api_routes.projects_api import router as projects_router from .api_routes.providers_api import router as providers_router @@ -183,6 +184,7 @@ app.include_router(settings_router) app.include_router(mcp_router) # app.include_router(mcp_client_router) # Removed - not part of new architecture app.include_router(knowledge_router) +app.include_router(pages_router) app.include_router(ollama_router) app.include_router(projects_router) app.include_router(progress_router) diff --git a/python/src/server/services/crawling/crawling_service.py b/python/src/server/services/crawling/crawling_service.py index 82a98c0c..7a68030e 100644 --- a/python/src/server/services/crawling/crawling_service.py +++ b/python/src/server/services/crawling/crawling_service.py @@ -19,6 +19,7 @@ from ..credential_service import credential_service # Import strategies # Import operations from .document_storage_operations import DocumentStorageOperations +from .page_storage_operations import PageStorageOperations from .helpers.site_config import SiteConfig # Import helpers @@ -98,6 +99,7 @@ class CrawlingService: # Initialize operations self.doc_storage_ops = DocumentStorageOperations(self.supabase_client) + self.page_storage_ops = PageStorageOperations(self.supabase_client) # Track progress state across all stages to prevent UI resets self.progress_state = {"progressId": self.progress_id} if self.progress_id else {} @@ -431,6 +433,7 @@ class CrawlingService: self._check_cancellation, source_url=url, source_display_name=source_display_name, + url_to_page_id=None, # Will be populated after page storage ) # Update progress tracker with source_id now that it's created diff --git a/python/src/server/services/crawling/document_storage_operations.py b/python/src/server/services/crawling/document_storage_operations.py index 8bfa4560..669a9f65 100644 --- a/python/src/server/services/crawling/document_storage_operations.py +++ b/python/src/server/services/crawling/document_storage_operations.py @@ -44,6 +44,7 @@ class DocumentStorageOperations: cancellation_check: Callable | None = None, source_url: str | None = None, source_display_name: str | None = None, + url_to_page_id: dict[str, str] | None = None, ) -> dict[str, Any]: """ Process crawled 
documents and store them in the database. @@ -128,7 +129,7 @@ class DocumentStorageOperations: all_chunk_numbers.append(i) all_contents.append(chunk) - # Create metadata for each chunk + # Create metadata for each chunk (page_id will be set later) word_count = len(chunk.split()) metadata = { "url": doc_url, @@ -136,6 +137,7 @@ class DocumentStorageOperations: "description": doc.get("description", ""), "source_id": source_id, "knowledge_type": request.get("knowledge_type", "documentation"), + "page_id": None, # Will be set after pages are stored "crawl_type": crawl_type, "word_count": word_count, "char_count": len(chunk), @@ -155,13 +157,101 @@ class DocumentStorageOperations: if doc_index > 0 and doc_index % 5 == 0: await asyncio.sleep(0) - # Create/update source record FIRST before storing documents + # Create/update source record FIRST (required for FK constraints on pages and chunks) if all_contents and all_metadatas: await self._create_source_records( all_metadatas, all_contents, source_word_counts, request, source_url, source_display_name ) + # Store pages AFTER source is created but BEFORE chunks (FK constraint requirement) + from .page_storage_operations import PageStorageOperations + page_storage_ops = PageStorageOperations(self.supabase_client) + + # Check if this is an llms-full.txt file + is_llms_full = crawl_type == "llms-txt" or ( + len(url_to_full_document) == 1 and + next(iter(url_to_full_document.keys())).endswith("llms-full.txt") + ) + + if is_llms_full and url_to_full_document: + # Handle llms-full.txt with section-based pages + base_url = next(iter(url_to_full_document.keys())) + content = url_to_full_document[base_url] + + # Store section pages + url_to_page_id = await page_storage_ops.store_llms_full_sections( + base_url, + content, + original_source_id, + request, + crawl_type="llms_full", + ) + + # Parse sections and re-chunk each section + from .helpers.llms_full_parser import parse_llms_full_sections + sections = parse_llms_full_sections(content, base_url) + + # Clear existing chunks and re-create from sections + all_urls.clear() + all_chunk_numbers.clear() + all_contents.clear() + all_metadatas.clear() + url_to_full_document.clear() + + # Chunk each section separately + for section in sections: + # Update url_to_full_document with section content + url_to_full_document[section.url] = section.content + section_chunks = await storage_service.smart_chunk_text_async( + section.content, chunk_size=5000 + ) + + for i, chunk in enumerate(section_chunks): + all_urls.append(section.url) + all_chunk_numbers.append(i) + all_contents.append(chunk) + + word_count = len(chunk.split()) + metadata = { + "url": section.url, + "title": section.section_title, + "description": "", + "source_id": original_source_id, + "knowledge_type": request.get("knowledge_type", "documentation"), + "page_id": url_to_page_id.get(section.url), + "crawl_type": "llms_full", + "word_count": word_count, + "char_count": len(chunk), + "chunk_index": i, + "tags": request.get("tags", []), + } + all_metadatas.append(metadata) + else: + # Handle regular pages + reconstructed_crawl_results = [] + for url, markdown in url_to_full_document.items(): + reconstructed_crawl_results.append({ + "url": url, + "markdown": markdown, + }) + + if reconstructed_crawl_results: + url_to_page_id = await page_storage_ops.store_pages( + reconstructed_crawl_results, + original_source_id, + request, + crawl_type, + ) + else: + url_to_page_id = {} + + # Update all chunk metadata with correct page_id + for metadata in all_metadatas: 
+ chunk_url = metadata.get("url") + if chunk_url and chunk_url in url_to_page_id: + metadata["page_id"] = url_to_page_id[chunk_url] + safe_logfire_info(f"url_to_full_document keys: {list(url_to_full_document.keys())[:5]}") # Log chunking results @@ -183,6 +273,7 @@ class DocumentStorageOperations: enable_parallel_batches=True, # Enable parallel processing provider=None, # Use configured provider cancellation_check=cancellation_check, # Pass cancellation check + url_to_page_id=url_to_page_id, # Link chunks to pages ) # Calculate chunk counts diff --git a/python/src/server/services/crawling/helpers/llms_full_parser.py b/python/src/server/services/crawling/helpers/llms_full_parser.py new file mode 100644 index 00000000..8af3c371 --- /dev/null +++ b/python/src/server/services/crawling/helpers/llms_full_parser.py @@ -0,0 +1,262 @@ +""" +LLMs-full.txt Section Parser + +Parses llms-full.txt files by splitting on H1 headers (# ) to create separate +"pages" for each section. Each section gets a synthetic URL with a slug anchor. +""" + +import re + +from pydantic import BaseModel + + +class LLMsFullSection(BaseModel): + """Parsed section from llms-full.txt file""" + + section_title: str # Raw H1 text: "# Core Concepts" + section_order: int # Position in document: 0, 1, 2, ... + content: str # Section content (including H1 header) + url: str # Synthetic URL: base.txt#core-concepts + word_count: int + + +def create_section_slug(h1_heading: str) -> str: + """ + Generate URL slug from H1 heading. + + Args: + h1_heading: H1 text like "# Core Concepts" or "# Getting Started" + + Returns: + Slug like "core-concepts" or "getting-started" + + Examples: + "# Core Concepts" -> "core-concepts" + "# API Reference" -> "api-reference" + "# Getting Started!" -> "getting-started" + """ + # Remove "# " prefix if present + slug_text = h1_heading.replace("# ", "").strip() + + # Convert to lowercase + slug = slug_text.lower() + + # Replace spaces with hyphens + slug = slug.replace(" ", "-") + + # Remove special characters (keep only alphanumeric and hyphens) + slug = re.sub(r"[^a-z0-9-]", "", slug) + + # Remove consecutive hyphens + slug = re.sub(r"-+", "-", slug) + + # Remove leading/trailing hyphens + slug = slug.strip("-") + + return slug + + +def create_section_url(base_url: str, h1_heading: str, section_order: int) -> str: + """ + Generate synthetic URL with slug anchor for a section. + + Args: + base_url: Base URL like "https://example.com/llms-full.txt" + h1_heading: H1 text like "# Core Concepts" + section_order: Section position (0-based) + + Returns: + Synthetic URL like "https://example.com/llms-full.txt#section-0-core-concepts" + """ + slug = create_section_slug(h1_heading) + return f"{base_url}#section-{section_order}-{slug}" + + +def parse_llms_full_sections(content: str, base_url: str) -> list[LLMsFullSection]: + """ + Split llms-full.txt content by H1 headers to create separate sections. + + Each H1 (lines starting with "# " but not "##") marks a new section. + Sections are given synthetic URLs with slug anchors. + + Args: + content: Full text content of llms-full.txt file + base_url: Base URL of the file (e.g., "https://example.com/llms-full.txt") + + Returns: + List of LLMsFullSection objects, one per H1 section + + Edge cases: + - No H1 headers: Returns single section with entire content + - Multiple consecutive H1s: Creates separate sections correctly + - Empty sections: Skipped (not included in results) + + Example: + Input content: + ''' + # Core Concepts + Claude is an AI assistant... 
+ + # Getting Started + To get started... + ''' + + Returns: + [ + LLMsFullSection( + section_title="# Core Concepts", + section_order=0, + content="# Core Concepts\\nClaude is...", + url="https://example.com/llms-full.txt#core-concepts", + word_count=5 + ), + LLMsFullSection( + section_title="# Getting Started", + section_order=1, + content="# Getting Started\\nTo get started...", + url="https://example.com/llms-full.txt#getting-started", + word_count=4 + ) + ] + """ + lines = content.split("\n") + + # Pre-scan: mark which lines are inside code blocks + inside_code_block = set() + in_block = False + for i, line in enumerate(lines): + if line.strip().startswith("```"): + in_block = not in_block + if in_block: + inside_code_block.add(i) + + # Parse sections, ignoring H1 headers inside code blocks + sections: list[LLMsFullSection] = [] + current_h1: str | None = None + current_content: list[str] = [] + section_order = 0 + + for i, line in enumerate(lines): + # Detect H1 (starts with "# " but not "##") - but ONLY if not in code block + is_h1 = line.startswith("# ") and not line.startswith("## ") + if is_h1 and i not in inside_code_block: + # Save previous section if it exists + if current_h1 is not None: + section_text = "\n".join(current_content) + # Skip empty sections (only whitespace) + if section_text.strip(): + section_url = create_section_url(base_url, current_h1, section_order) + word_count = len(section_text.split()) + + sections.append( + LLMsFullSection( + section_title=current_h1, + section_order=section_order, + content=section_text, + url=section_url, + word_count=word_count, + ) + ) + section_order += 1 + + # Start new section + current_h1 = line + current_content = [line] + else: + # Only accumulate if we've seen an H1 + if current_h1 is not None: + current_content.append(line) + + # Save last section + if current_h1 is not None: + section_text = "\n".join(current_content) + if section_text.strip(): + section_url = create_section_url(base_url, current_h1, section_order) + word_count = len(section_text.split()) + sections.append( + LLMsFullSection( + section_title=current_h1, + section_order=section_order, + content=section_text, + url=section_url, + word_count=word_count, + ) + ) + + # Edge case: No H1 headers found, treat entire file as single page + if not sections and content.strip(): + sections.append( + LLMsFullSection( + section_title="Full Document", + section_order=0, + content=content, + url=base_url, # No anchor for single-page + word_count=len(content.split()), + ) + ) + + # Fix sections that were split inside code blocks - merge them with next section + if sections: + fixed_sections: list[LLMsFullSection] = [] + i = 0 + while i < len(sections): + current = sections[i] + + # Count ``` at start of lines only (proper code fences) + code_fence_count = sum( + 1 for line in current.content.split('\n') + if line.strip().startswith('```') + ) + + # If odd number, we're inside an unclosed code block - merge with next + while code_fence_count % 2 == 1 and i + 1 < len(sections): + next_section = sections[i + 1] + # Combine content + combined_content = current.content + "\n\n" + next_section.content + # Update current with combined content + current = LLMsFullSection( + section_title=current.section_title, + section_order=current.section_order, + content=combined_content, + url=current.url, + word_count=len(combined_content.split()), + ) + # Move to next section and recount ``` at start of lines + i += 1 + code_fence_count = sum( + 1 for line in current.content.split('\n') + 
if line.strip().startswith('```') + ) + + fixed_sections.append(current) + i += 1 + + sections = fixed_sections + + # Combine consecutive small sections (<200 chars) together + if sections: + combined_sections: list[LLMsFullSection] = [] + i = 0 + while i < len(sections): + current = sections[i] + combined_content = current.content + + # Keep combining while current is small and there are more sections + while len(combined_content) < 200 and i + 1 < len(sections): + i += 1 + combined_content = combined_content + "\n\n" + sections[i].content + + # Create combined section with first section's metadata + combined = LLMsFullSection( + section_title=current.section_title, + section_order=current.section_order, + content=combined_content, + url=current.url, + word_count=len(combined_content.split()), + ) + combined_sections.append(combined) + i += 1 + + sections = combined_sections + + return sections diff --git a/python/src/server/services/crawling/page_storage_operations.py b/python/src/server/services/crawling/page_storage_operations.py new file mode 100644 index 00000000..a0e8a331 --- /dev/null +++ b/python/src/server/services/crawling/page_storage_operations.py @@ -0,0 +1,248 @@ +""" +Page Storage Operations + +Handles the storage of complete documentation pages in the archon_page_metadata table. +Pages are stored BEFORE chunking to maintain full context for agent retrieval. +""" + +from typing import Any + +from postgrest.exceptions import APIError + +from ...config.logfire_config import get_logger, safe_logfire_error, safe_logfire_info +from .helpers.llms_full_parser import parse_llms_full_sections + +logger = get_logger(__name__) + + +class PageStorageOperations: + """ + Handles page storage operations for crawled content. + + Pages are stored in the archon_page_metadata table with full content and metadata. + This enables agents to retrieve complete documentation pages instead of just chunks. + """ + + def __init__(self, supabase_client): + """ + Initialize page storage operations. + + Args: + supabase_client: The Supabase client for database operations + """ + self.supabase_client = supabase_client + + async def store_pages( + self, + crawl_results: list[dict], + source_id: str, + request: dict[str, Any], + crawl_type: str, + ) -> dict[str, str]: + """ + Store pages in archon_page_metadata table from regular crawl results. + + Args: + crawl_results: List of crawled documents with url, markdown, title, etc. + source_id: The source ID these pages belong to + request: The original crawl request with knowledge_type, tags, etc. + crawl_type: Type of crawl performed (sitemap, url, link_collection, etc.) 
+ + Returns: + {url: page_id} mapping for FK references in chunks + """ + safe_logfire_info( + f"store_pages called | source_id={source_id} | crawl_type={crawl_type} | num_results={len(crawl_results)}" + ) + + url_to_page_id: dict[str, str] = {} + pages_to_insert: list[dict[str, Any]] = [] + + for doc in crawl_results: + url = doc.get("url", "").strip() + markdown = doc.get("markdown", "").strip() + + # Skip documents with empty content or missing URLs + if not url or not markdown: + continue + + # Prepare page record + word_count = len(markdown.split()) + char_count = len(markdown) + + page_record = { + "source_id": source_id, + "url": url, + "full_content": markdown, + "section_title": None, # Regular page, not a section + "section_order": 0, + "word_count": word_count, + "char_count": char_count, + "chunk_count": 0, # Will be updated after chunking + "metadata": { + "knowledge_type": request.get("knowledge_type", "documentation"), + "crawl_type": crawl_type, + "page_type": "documentation", + "tags": request.get("tags", []), + }, + } + pages_to_insert.append(page_record) + + # Batch upsert pages + if pages_to_insert: + try: + safe_logfire_info( + f"Upserting {len(pages_to_insert)} pages into archon_page_metadata table" + ) + result = ( + self.supabase_client.table("archon_page_metadata") + .upsert(pages_to_insert, on_conflict="url") + .execute() + ) + + # Build url → page_id mapping + for page in result.data: + url_to_page_id[page["url"]] = page["id"] + + safe_logfire_info( + f"Successfully stored {len(url_to_page_id)}/{len(pages_to_insert)} pages in archon_page_metadata" + ) + + except APIError as e: + safe_logfire_error( + f"Database error upserting pages | source_id={source_id} | attempted={len(pages_to_insert)} | error={str(e)}" + ) + logger.error(f"Failed to upsert pages for source {source_id}: {e}", exc_info=True) + # Don't raise - allow chunking to continue even if page storage fails + + except Exception as e: + safe_logfire_error( + f"Unexpected error upserting pages | source_id={source_id} | attempted={len(pages_to_insert)} | error={str(e)}" + ) + logger.error(f"Unexpected error upserting pages for source {source_id}: {e}", exc_info=True) + # Don't raise - allow chunking to continue + + return url_to_page_id + + async def store_llms_full_sections( + self, + base_url: str, + content: str, + source_id: str, + request: dict[str, Any], + crawl_type: str = "llms_full", + ) -> dict[str, str]: + """ + Store llms-full.txt sections as separate pages. + + Each H1 section gets its own page record with a synthetic URL. 
+ + Args: + base_url: Base URL of the llms-full.txt file + content: Full text content of the file + source_id: The source ID these sections belong to + request: The original crawl request + crawl_type: Type of crawl (defaults to "llms_full") + + Returns: + {url: page_id} mapping for FK references in chunks + """ + url_to_page_id: dict[str, str] = {} + + # Parse sections from content + sections = parse_llms_full_sections(content, base_url) + + if not sections: + logger.warning(f"No sections found in llms-full.txt file: {base_url}") + return url_to_page_id + + safe_logfire_info( + f"Parsed {len(sections)} sections from llms-full.txt file: {base_url}" + ) + + # Prepare page records for each section + pages_to_insert: list[dict[str, Any]] = [] + + for section in sections: + page_record = { + "source_id": source_id, + "url": section.url, + "full_content": section.content, + "section_title": section.section_title, + "section_order": section.section_order, + "word_count": section.word_count, + "char_count": len(section.content), + "chunk_count": 0, # Will be updated after chunking + "metadata": { + "knowledge_type": request.get("knowledge_type", "documentation"), + "crawl_type": crawl_type, + "page_type": "llms_full_section", + "tags": request.get("tags", []), + "section_metadata": { + "section_title": section.section_title, + "section_order": section.section_order, + "base_url": base_url, + }, + }, + } + pages_to_insert.append(page_record) + + # Batch upsert pages + if pages_to_insert: + try: + safe_logfire_info( + f"Upserting {len(pages_to_insert)} section pages into archon_page_metadata" + ) + result = ( + self.supabase_client.table("archon_page_metadata") + .upsert(pages_to_insert, on_conflict="url") + .execute() + ) + + # Build url → page_id mapping + for page in result.data: + url_to_page_id[page["url"]] = page["id"] + + safe_logfire_info( + f"Successfully stored {len(url_to_page_id)}/{len(pages_to_insert)} section pages" + ) + + except APIError as e: + safe_logfire_error( + f"Database error upserting sections | base_url={base_url} | attempted={len(pages_to_insert)} | error={str(e)}" + ) + logger.error(f"Failed to upsert sections for {base_url}: {e}", exc_info=True) + # Don't raise - allow process to continue + + except Exception as e: + safe_logfire_error( + f"Unexpected error upserting sections | base_url={base_url} | attempted={len(pages_to_insert)} | error={str(e)}" + ) + logger.error(f"Unexpected error upserting sections for {base_url}: {e}", exc_info=True) + # Don't raise - allow process to continue + + return url_to_page_id + + async def update_page_chunk_count(self, page_id: str, chunk_count: int) -> None: + """ + Update the chunk_count field for a page after chunking is complete. 
+ + Args: + page_id: The UUID of the page to update + chunk_count: Number of chunks created from this page + """ + try: + self.supabase_client.table("archon_page_metadata").update( + {"chunk_count": chunk_count} + ).eq("id", page_id).execute() + + safe_logfire_info(f"Updated chunk_count={chunk_count} for page_id={page_id}") + + except APIError as e: + logger.warning( + f"Database error updating chunk_count for page {page_id}: {e}", exc_info=True + ) + except Exception as e: + logger.warning( + f"Unexpected error updating chunk_count for page {page_id}: {e}", exc_info=True + ) diff --git a/python/src/server/services/crawling/strategies/batch.py b/python/src/server/services/crawling/strategies/batch.py index 1457fdca..6a318879 100644 --- a/python/src/server/services/crawling/strategies/batch.py +++ b/python/src/server/services/crawling/strategies/batch.py @@ -234,10 +234,24 @@ class BatchCrawlStrategy: if result.success and result.markdown and result.markdown.fit_markdown: # Map back to original URL original_url = url_mapping.get(result.url, result.url) + + # Extract title from HTML tag + title = "Untitled" + if result.html: + import re + title_match = re.search(r'<title[^>]*>(.*?)', result.html, re.IGNORECASE | re.DOTALL) + if title_match: + extracted_title = title_match.group(1).strip() + # Clean up HTML entities + extracted_title = extracted_title.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"') + if extracted_title: + title = extracted_title + successful_results.append({ "url": original_url, "markdown": result.markdown.fit_markdown, "html": result.html, # Use raw HTML + "title": title, }) else: logger.warning( diff --git a/python/src/server/services/crawling/strategies/single_page.py b/python/src/server/services/crawling/strategies/single_page.py index 6a2cc1cc..58610d01 100644 --- a/python/src/server/services/crawling/strategies/single_page.py +++ b/python/src/server/services/crawling/strategies/single_page.py @@ -179,12 +179,24 @@ class SinglePageCrawlStrategy: if 'getting-started' in url: logger.info(f"Markdown sample for getting-started: {markdown_sample}") + # Extract title from HTML tag + title = "Untitled" + if result.html: + import re + title_match = re.search(r'<title[^>]*>(.*?)', result.html, re.IGNORECASE | re.DOTALL) + if title_match: + extracted_title = title_match.group(1).strip() + # Clean up HTML entities + extracted_title = extracted_title.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"') + if extracted_title: + title = extracted_title + return { "success": True, "url": original_url, # Use original URL for tracking "markdown": result.markdown, "html": result.html, # Use raw HTML instead of cleaned_html for code extraction - "title": result.title or "Untitled", + "title": title, "links": result.links, "content_length": len(result.markdown) } diff --git a/python/src/server/services/search/rag_service.py b/python/src/server/services/search/rag_service.py index cf89cffe..825f2df0 100644 --- a/python/src/server/services/search/rag_service.py +++ b/python/src/server/services/search/rag_service.py @@ -172,21 +172,91 @@ class RAGService: use_enhancement=True, ) + async def _group_chunks_by_pages( + self, chunk_results: list[dict[str, Any]], match_count: int + ) -> list[dict[str, Any]]: + """Group chunk results by page_id (if available) or URL and fetch page metadata.""" + page_groups: dict[str, dict[str, Any]] = {} + + for result in chunk_results: + metadata = result.get("metadata", {}) + page_id = metadata.get("page_id") + url = 
metadata.get("url") + + # Use page_id as key if available, otherwise URL + group_key = page_id if page_id else url + if not group_key: + continue + + if group_key not in page_groups: + page_groups[group_key] = { + "page_id": page_id, + "url": url, + "chunk_matches": 0, + "total_similarity": 0.0, + "best_chunk_content": result.get("content", ""), + "source_id": metadata.get("source_id"), + } + + page_groups[group_key]["chunk_matches"] += 1 + page_groups[group_key]["total_similarity"] += result.get("similarity_score", 0.0) + + page_results = [] + for group_key, data in page_groups.items(): + avg_similarity = data["total_similarity"] / data["chunk_matches"] + match_boost = min(0.2, data["chunk_matches"] * 0.02) + aggregate_score = avg_similarity * (1 + match_boost) + + # Query page by page_id if available, otherwise by URL + if data["page_id"]: + page_info = ( + self.supabase_client.table("archon_page_metadata") + .select("id, url, section_title, word_count") + .eq("id", data["page_id"]) + .maybe_single() + .execute() + ) + else: + # Regular pages - exact URL match + page_info = ( + self.supabase_client.table("archon_page_metadata") + .select("id, url, section_title, word_count") + .eq("url", data["url"]) + .maybe_single() + .execute() + ) + + if page_info and page_info.data is not None: + page_results.append({ + "page_id": page_info.data["id"], + "url": page_info.data["url"], + "section_title": page_info.data.get("section_title"), + "word_count": page_info.data.get("word_count", 0), + "chunk_matches": data["chunk_matches"], + "aggregate_similarity": aggregate_score, + "average_similarity": avg_similarity, + "source_id": data["source_id"], + }) + + page_results.sort(key=lambda x: x["aggregate_similarity"], reverse=True) + return page_results[:match_count] + async def perform_rag_query( - self, query: str, source: str = None, match_count: int = 5 + self, query: str, source: str = None, match_count: int = 5, return_mode: str = "chunks" ) -> tuple[bool, dict[str, Any]]: """ - Perform a comprehensive RAG query that combines all enabled strategies. + Unified RAG query with all strategies. Pipeline: - 1. Start with vector search - 2. Apply hybrid search if enabled - 3. Apply reranking if enabled + 1. Vector/Hybrid Search (based on settings) + 2. Reranking (if enabled) + 3. 
Page Grouping (if return_mode="pages") Args: query: The search query source: Optional source domain to filter results match_count: Maximum number of results to return + return_mode: "chunks" (default) or "pages" Returns: Tuple of (success, result_dict) @@ -256,6 +326,23 @@ class RAGService: if len(formatted_results) > match_count: formatted_results = formatted_results[:match_count] + # Step 4: Group by pages if return_mode="pages" AND pages exist + actual_return_mode = return_mode + if return_mode == "pages": + # Check if any chunks have page_id set + has_page_ids = any( + result.get("metadata", {}).get("page_id") is not None + for result in formatted_results + ) + + if has_page_ids: + # Group by pages when page_ids exist + formatted_results = await self._group_chunks_by_pages(formatted_results, match_count) + else: + # Fall back to chunks when no page_ids (pre-migration data) + actual_return_mode = "chunks" + logger.info("No page_ids found in results, returning chunks instead of pages") + # Build response response_data = { "results": formatted_results, @@ -266,13 +353,15 @@ class RAGService: "execution_path": "rag_service_pipeline", "search_mode": "hybrid" if use_hybrid_search else "vector", "reranking_applied": reranking_applied, + "return_mode": actual_return_mode, } span.set_attribute("final_results_count", len(formatted_results)) span.set_attribute("reranking_applied", reranking_applied) + span.set_attribute("return_mode", return_mode) span.set_attribute("success", True) - logger.info(f"RAG query completed - {len(formatted_results)} results found") + logger.info(f"RAG query completed - {len(formatted_results)} {return_mode} found") return True, response_data except Exception as e: diff --git a/python/src/server/services/source_management_service.py b/python/src/server/services/source_management_service.py index 7152f830..cc06bd0a 100644 --- a/python/src/server/services/source_management_service.py +++ b/python/src/server/services/source_management_service.py @@ -273,26 +273,22 @@ async def update_source_info( if original_url: metadata["original_url"] = original_url - # Update existing source (preserving title) - update_data = { + # Use upsert to handle race conditions + upsert_data = { + "source_id": source_id, + "title": existing_title, "summary": summary, "total_word_count": word_count, "metadata": metadata, - "updated_at": "now()", } # Add new fields if provided if source_url: - update_data["source_url"] = source_url + upsert_data["source_url"] = source_url if source_display_name: - update_data["source_display_name"] = source_display_name + upsert_data["source_display_name"] = source_display_name - result = ( - client.table("archon_sources") - .update(update_data) - .eq("source_id", source_id) - .execute() - ) + client.table("archon_sources").upsert(upsert_data).execute() search_logger.info( f"Updated source {source_id} while preserving title: {existing_title}" diff --git a/python/src/server/services/storage/base_storage_service.py b/python/src/server/services/storage/base_storage_service.py index 66332f4f..0a66e849 100644 --- a/python/src/server/services/storage/base_storage_service.py +++ b/python/src/server/services/storage/base_storage_service.py @@ -100,6 +100,23 @@ class BaseStorageService(ABC): # Move start position for next chunk start = end + # Combine consecutive small chunks (<200 chars) together + if chunks: + combined_chunks: list[str] = [] + i = 0 + while i < len(chunks): + current = chunks[i] + + # Keep combining while current is small and there are more chunks + 
while len(current) < 200 and i + 1 < len(chunks): + i += 1 + current = current + "\n\n" + chunks[i] + + combined_chunks.append(current) + i += 1 + + chunks = combined_chunks + return chunks async def smart_chunk_text_async( diff --git a/python/src/server/services/storage/document_storage_service.py b/python/src/server/services/storage/document_storage_service.py index 4cf02dc4..89841758 100644 --- a/python/src/server/services/storage/document_storage_service.py +++ b/python/src/server/services/storage/document_storage_service.py @@ -25,6 +25,7 @@ async def add_documents_to_supabase( enable_parallel_batches: bool = True, provider: str | None = None, cancellation_check: Any | None = None, + url_to_page_id: dict[str, str] | None = None, ) -> dict[str, int]: """ Add documents to Supabase with threading optimizations. @@ -399,6 +400,9 @@ async def add_documents_to_supabase( search_logger.warning(f"Unsupported embedding dimension {embedding_dim}, using embedding_1536") embedding_column = "embedding_1536" + # Get page_id for this URL if available + page_id = url_to_page_id.get(batch_urls[j]) if url_to_page_id else None + data = { "url": batch_urls[j], "chunk_number": batch_chunk_numbers[j], @@ -409,6 +413,7 @@ async def add_documents_to_supabase( "llm_chat_model": llm_chat_model, # Add LLM model tracking "embedding_model": embedding_model_name, # Add embedding model tracking "embedding_dimension": embedding_dim, # Add dimension tracking + "page_id": page_id, # Link chunk to page } batch_data.append(data) diff --git a/python/tests/server/services/test_llms_full_parser.py b/python/tests/server/services/test_llms_full_parser.py new file mode 100644 index 00000000..ff87d3f2 --- /dev/null +++ b/python/tests/server/services/test_llms_full_parser.py @@ -0,0 +1,195 @@ +""" +Tests for LLMs-full.txt Section Parser +""" + +import pytest + +from src.server.services.crawling.helpers.llms_full_parser import ( + create_section_slug, + create_section_url, + parse_llms_full_sections, +) + + +def test_create_section_slug(): + """Test slug generation from H1 headings""" + assert create_section_slug("# Core Concepts") == "core-concepts" + assert create_section_slug("# Getting Started!") == "getting-started" + assert create_section_slug("# API Reference (v2)") == "api-reference-v2" + assert create_section_slug("# Hello World") == "hello-world" + assert create_section_slug("# Spaces ") == "spaces" + + +def test_create_section_url(): + """Test synthetic URL generation with slug anchor""" + base_url = "https://example.com/llms-full.txt" + url = create_section_url(base_url, "# Core Concepts", 0) + assert url == "https://example.com/llms-full.txt#section-0-core-concepts" + + url = create_section_url(base_url, "# Getting Started", 1) + assert url == "https://example.com/llms-full.txt#section-1-getting-started" + + +def test_parse_single_section(): + """Test parsing a single H1 section""" + content = """# Core Concepts +Claude is an AI assistant built by Anthropic. +It can help with various tasks. 
+""" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + assert len(sections) == 1 + assert sections[0].section_title == "# Core Concepts" + assert sections[0].section_order == 0 + assert sections[0].url == "https://example.com/llms-full.txt#section-0-core-concepts" + assert "Claude is an AI assistant" in sections[0].content + assert sections[0].word_count > 0 + + +def test_parse_multiple_sections(): + """Test parsing multiple H1 sections""" + content = """# Core Concepts +Claude is an AI assistant built by Anthropic that can help with various tasks. +It uses advanced language models to understand and respond to queries. +This section provides an overview of the core concepts and capabilities. + +# Getting Started +To get started with Claude, you'll need to create an account and obtain API credentials. +Follow the setup instructions and configure your development environment properly. +This will enable you to make your first API calls and start building applications. + +# API Reference +The API uses REST principles and supports standard HTTP methods like GET, POST, PUT, and DELETE. +Authentication is handled through API keys that should be kept secure at all times. +Comprehensive documentation is available for all endpoints and response formats. +""" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + assert len(sections) == 3 + assert sections[0].section_title == "# Core Concepts" + assert sections[1].section_title == "# Getting Started" + assert sections[2].section_title == "# API Reference" + + assert sections[0].section_order == 0 + assert sections[1].section_order == 1 + assert sections[2].section_order == 2 + + assert sections[0].url == "https://example.com/llms-full.txt#section-0-core-concepts" + assert sections[1].url == "https://example.com/llms-full.txt#section-1-getting-started" + assert sections[2].url == "https://example.com/llms-full.txt#section-2-api-reference" + + +def test_no_h1_headers(): + """Test handling content with no H1 headers""" + content = """This is some documentation. +It has no H1 headers. +Just regular content. +""" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + assert len(sections) == 1 + assert sections[0].section_title == "Full Document" + assert sections[0].url == "https://example.com/llms-full.txt" + assert "This is some documentation" in sections[0].content + + +def test_h2_not_treated_as_section(): + """Test that H2 headers (##) are not treated as section boundaries""" + content = """# Main Section +This is the main section. + +## Subsection +This is a subsection. + +## Another Subsection +This is another subsection. +""" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + assert len(sections) == 1 + assert sections[0].section_title == "# Main Section" + assert "## Subsection" in sections[0].content + assert "## Another Subsection" in sections[0].content + + +def test_empty_sections_skipped(): + """Test that empty sections are skipped""" + content = """# Section 1 +This is the first section with enough content to prevent automatic combination. +It contains multiple sentences and provides substantial information for testing purposes. +The section has several lines to ensure it exceeds the minimum character threshold. + +# + +# Section 2 +This is the second section with enough content to prevent automatic combination. 
+It also contains multiple sentences and provides substantial information for testing. +The section has several lines to ensure it exceeds the minimum character threshold. +""" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + # Should only have 2 sections (empty one skipped) + assert len(sections) == 2 + assert sections[0].section_title == "# Section 1" + assert sections[1].section_title == "# Section 2" + + +def test_consecutive_h1_headers(): + """Test handling multiple consecutive H1 headers""" + content = """# Section 1 +The first section contains enough content to prevent automatic combination with subsequent sections. +It has multiple sentences and provides substantial information for proper testing functionality. +This ensures that the section exceeds the minimum character threshold requirement. +# Section 2 +This section also has enough content to prevent automatic combination with the previous section. +It contains multiple sentences and provides substantial information for proper testing. +The content here ensures that the section exceeds the minimum character threshold. +""" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + # Both sections should be parsed + assert len(sections) == 2 + assert sections[0].section_title == "# Section 1" + assert sections[1].section_title == "# Section 2" + assert "The first section contains enough content" in sections[0].content + assert "This section also has enough content" in sections[1].content + + +def test_word_count_calculation(): + """Test word count calculation for sections""" + content = """# Test Section +This is a test section with exactly ten words here. +""" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + assert len(sections) == 1 + # Word count includes the H1 heading + assert sections[0].word_count > 10 + + +def test_empty_content(): + """Test handling empty content""" + content = "" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + assert len(sections) == 0 + + +def test_whitespace_only_content(): + """Test handling whitespace-only content""" + content = """ + + +""" + base_url = "https://example.com/llms-full.txt" + sections = parse_llms_full_sections(content, base_url) + + assert len(sections) == 0 diff --git a/python/tests/test_source_race_condition.py b/python/tests/test_source_race_condition.py index 5864db28..a6ff4116 100644 --- a/python/tests/test_source_race_condition.py +++ b/python/tests/test_source_race_condition.py @@ -119,8 +119,8 @@ class TestSourceRaceCondition: assert "upsert" in methods_called, "Should use upsert for new sources" assert "insert" not in methods_called, "Should not use insert to avoid race conditions" - def test_existing_source_uses_update(self): - """Test that existing sources still use UPDATE (not affected by change).""" + def test_existing_source_uses_upsert(self): + """Test that existing sources use UPSERT to handle race conditions.""" mock_client = Mock() methods_called = [] @@ -158,9 +158,9 @@ class TestSourceRaceCondition: )) loop.close() - # Should use update for existing sources - assert "update" in methods_called, "Should use update for existing sources" - assert "upsert" not in methods_called, "Should not use upsert for existing sources" + # Should use upsert for existing sources to handle race conditions + assert "upsert" in methods_called, "Should 
use upsert for existing sources" + assert "update" not in methods_called, "Should not use update (upsert handles race conditions)" @pytest.mark.asyncio async def test_async_concurrent_creation(self):