Merge branch 'main' of https://github.com/coleam00/Archon into feature/ui-style-guide

sean-eskerium committed 2025-10-09 20:42:08 -04:00
18 changed files with 1437 additions and 65 deletions

View File

@@ -272,6 +272,8 @@ When connected to Claude/Cursor/Windsurf, the following tools are available:
- `archon:rag_search_knowledge_base` - Search knowledge base for relevant content
- `archon:rag_search_code_examples` - Find code snippets in the knowledge base
- `archon:rag_get_available_sources` - List available knowledge sources
- `archon:rag_list_pages_for_source` - List all pages for a given source (browse documentation structure)
- `archon:rag_read_full_page` - Retrieve full page content by page_id or URL
### Project Management

View File

@@ -0,0 +1,74 @@
-- =====================================================
-- Add archon_page_metadata table for page-based RAG retrieval
-- =====================================================
-- This migration adds support for storing complete documentation pages
-- alongside chunks for improved agent context retrieval.
--
-- Features:
-- - Full page content storage with metadata
-- - Support for llms-full.txt section-based pages
-- - Foreign key relationship from chunks to pages
-- =====================================================
-- Create archon_page_metadata table
CREATE TABLE IF NOT EXISTS archon_page_metadata (
-- Primary identification
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id TEXT NOT NULL,
url TEXT NOT NULL,
-- Content
full_content TEXT NOT NULL,
-- Section metadata (for llms-full.txt H1 sections)
section_title TEXT,
section_order INT DEFAULT 0,
-- Statistics
word_count INT NOT NULL,
char_count INT NOT NULL,
chunk_count INT NOT NULL DEFAULT 0,
-- Timestamps
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
-- Flexible metadata storage
metadata JSONB DEFAULT '{}'::jsonb,
-- Constraints
CONSTRAINT archon_page_metadata_url_unique UNIQUE(url),
CONSTRAINT archon_page_metadata_source_fk FOREIGN KEY (source_id)
REFERENCES archon_sources(source_id) ON DELETE CASCADE
);
-- Add page_id foreign key to archon_crawled_pages
-- This links chunks back to their parent page
-- NULLABLE because existing chunks won't have a page_id yet
ALTER TABLE archon_crawled_pages
ADD COLUMN IF NOT EXISTS page_id UUID REFERENCES archon_page_metadata(id) ON DELETE SET NULL;
-- Create indexes for query performance
CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_source_id ON archon_page_metadata(source_id);
CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_url ON archon_page_metadata(url);
CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_section ON archon_page_metadata(source_id, section_title, section_order);
CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_created_at ON archon_page_metadata(created_at);
CREATE INDEX IF NOT EXISTS idx_archon_page_metadata_metadata ON archon_page_metadata USING GIN(metadata);
CREATE INDEX IF NOT EXISTS idx_archon_crawled_pages_page_id ON archon_crawled_pages(page_id);
-- Add comments to document the table structure
COMMENT ON TABLE archon_page_metadata IS 'Stores complete documentation pages for agent retrieval';
COMMENT ON COLUMN archon_page_metadata.source_id IS 'References the source this page belongs to';
COMMENT ON COLUMN archon_page_metadata.url IS 'Unique URL of the page (synthetic for llms-full.txt sections with #anchor)';
COMMENT ON COLUMN archon_page_metadata.full_content IS 'Complete markdown/text content of the page';
COMMENT ON COLUMN archon_page_metadata.section_title IS 'H1 section title for llms-full.txt pages';
COMMENT ON COLUMN archon_page_metadata.section_order IS 'Order of section in llms-full.txt file (0-based)';
COMMENT ON COLUMN archon_page_metadata.word_count IS 'Number of words in full_content';
COMMENT ON COLUMN archon_page_metadata.char_count IS 'Number of characters in full_content';
COMMENT ON COLUMN archon_page_metadata.chunk_count IS 'Number of chunks created from this page';
COMMENT ON COLUMN archon_page_metadata.metadata IS 'Flexible JSON metadata (page_type, knowledge_type, tags, etc)';
COMMENT ON COLUMN archon_crawled_pages.page_id IS 'Foreign key linking chunk to parent page';
-- =====================================================
-- MIGRATION COMPLETE
-- =====================================================
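A minimal sketch of how application code can follow the new page_id link from a chunk to its parent page, assuming the supabase-py client used elsewhere in this repo; the project URL and key below are placeholders:

# Minimal sketch: follow a chunk's page_id to its parent page (supabase-py, as used in this repo).
from supabase import create_client

client = create_client("https://<project>.supabase.co", "<service-role-key>")  # placeholder credentials

chunk = (
    client.table("archon_crawled_pages")
    .select("id, url, page_id")
    .limit(1)
    .execute()
).data[0]

if chunk.get("page_id"):  # NULL for chunks stored before this migration
    page = (
        client.table("archon_page_metadata")
        .select("url, section_title, word_count, chunk_count")
        .eq("id", chunk["page_id"])
        .single()
        .execute()
    ).data
    print(page["url"], page["word_count"])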

View File

@@ -77,7 +77,11 @@ def register_rag_tools(mcp: FastMCP):
@mcp.tool()
async def rag_search_knowledge_base(
ctx: Context, query: str, source_id: str | None = None, match_count: int = 5
ctx: Context,
query: str,
source_id: str | None = None,
match_count: int = 5,
return_mode: str = "pages"
) -> str:
"""
Search knowledge base for relevant content using RAG.
@@ -90,20 +94,31 @@ def register_rag_tools(mcp: FastMCP):
This is the 'id' field from available sources, NOT a URL or domain name.
Example: "src_1234abcd" not "docs.anthropic.com"
match_count: Max results (default: 5)
return_mode: "pages" (default, full pages with metadata) or "chunks" (raw text chunks)
Returns:
JSON string with structure:
- success: bool - Operation success status
- results: list[dict] - Array of matching documents with content and metadata
- results: list[dict] - Array of pages/chunks with content and metadata
Pages include: page_id, url, section_title, word_count, chunk_matches, aggregate_similarity
Chunks include: content, metadata, similarity
- return_mode: str - Mode used ("pages" or "chunks")
- reranked: bool - Whether results were reranked
- error: str|null - Error description if success=false
Note: Use "pages" mode for better context (recommended), or "chunks" for raw granular results.
After getting pages, use rag_read_full_page() to retrieve complete page content.
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
request_data = {"query": query, "match_count": match_count}
request_data = {
"query": query,
"match_count": match_count,
"return_mode": return_mode
}
if source_id:
request_data["source"] = source_id
@@ -115,6 +130,7 @@ def register_rag_tools(mcp: FastMCP):
{
"success": True,
"results": result.get("results", []),
"return_mode": result.get("return_mode", return_mode),
"reranked": result.get("reranked", False),
"error": None,
},
@@ -198,5 +214,148 @@ def register_rag_tools(mcp: FastMCP):
logger.error(f"Error searching code examples: {e}")
return json.dumps({"success": False, "results": [], "error": str(e)}, indent=2)
@mcp.tool()
async def rag_list_pages_for_source(
ctx: Context, source_id: str, section: str | None = None
) -> str:
"""
List all pages for a given knowledge source.
Use this after rag_get_available_sources() to see all pages in a source.
Useful for browsing documentation structure or finding specific pages.
Args:
source_id: Source ID from rag_get_available_sources() (e.g., "src_1234abcd")
section: Optional filter for llms-full.txt section title (e.g., "# Core Concepts")
Returns:
JSON string with structure:
- success: bool - Operation success status
- pages: list[dict] - Array of page objects with id, url, section_title, word_count
- total: int - Total number of pages
- source_id: str - The source ID that was queried
- error: str|null - Error description if success=false
Example workflow:
1. Call rag_get_available_sources() to get source_id
2. Call rag_list_pages_for_source(source_id) to see all pages
3. Call rag_read_full_page(page_id) to read specific pages
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
params = {"source_id": source_id}
if section:
params["section"] = section
response = await client.get(
urljoin(api_url, "/api/pages"),
params=params
)
if response.status_code == 200:
result = response.json()
return json.dumps(
{
"success": True,
"pages": result.get("pages", []),
"total": result.get("total", 0),
"source_id": result.get("source_id", source_id),
"error": None,
},
indent=2,
)
else:
error_detail = response.text
return json.dumps(
{
"success": False,
"pages": [],
"total": 0,
"source_id": source_id,
"error": f"HTTP {response.status_code}: {error_detail}",
},
indent=2,
)
except Exception as e:
logger.error(f"Error listing pages for source {source_id}: {e}")
return json.dumps(
{
"success": False,
"pages": [],
"total": 0,
"source_id": source_id,
"error": str(e)
},
indent=2
)
@mcp.tool()
async def rag_read_full_page(
ctx: Context, page_id: str | None = None, url: str | None = None
) -> str:
"""
Retrieve full page content from knowledge base.
Use this to get complete page content after RAG search.
Args:
page_id: Page UUID from search results (e.g., "550e8400-e29b-41d4-a716-446655440000")
url: Page URL (e.g., "https://docs.example.com/getting-started")
Note: Provide EITHER page_id OR url, not both.
Returns:
JSON string with structure:
- success: bool
- page: dict with full_content, title, url, metadata
- error: str|null
"""
try:
if not page_id and not url:
return json.dumps(
{"success": False, "error": "Must provide either page_id or url"},
indent=2
)
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
if page_id:
response = await client.get(urljoin(api_url, f"/api/pages/{page_id}"))
else:
response = await client.get(
urljoin(api_url, "/api/pages/by-url"),
params={"url": url}
)
if response.status_code == 200:
page_data = response.json()
return json.dumps(
{
"success": True,
"page": page_data,
"error": None,
},
indent=2,
)
else:
error_detail = response.text
return json.dumps(
{
"success": False,
"page": None,
"error": f"HTTP {response.status_code}: {error_detail}",
},
indent=2,
)
except Exception as e:
logger.error(f"Error reading page: {e}")
return json.dumps({"success": False, "page": None, "error": str(e)}, indent=2)
# Log successful registration
logger.info("✓ RAG tools registered (HTTP-based version)")

View File

@@ -84,39 +84,39 @@ async def _validate_provider_api_key(provider: str = None) -> None:
safe_provider = provider[:20] # Limit length
logger.info(f"🔑 Testing {safe_provider.title()} API key with minimal embedding request...")
try:
# Test API key with minimal embedding request using provider-scoped configuration
from ..services.embeddings.embedding_service import create_embedding
test_result = await create_embedding(text="test", provider=provider)
if not test_result:
logger.error(
f"{provider.title()} API key validation failed - no embedding returned"
)
raise HTTPException(
status_code=401,
detail={
"error": f"Invalid {provider.title()} API key",
"message": f"Please verify your {provider.title()} API key in Settings.",
"error_type": "authentication_failed",
"provider": provider,
},
)
except Exception as e:
logger.error(
f"{provider.title()} API key validation failed: {e}",
exc_info=True,
)
raise HTTPException(
status_code=401,
detail={
"error": f"Invalid {provider.title()} API key",
"message": f"Please verify your {provider.title()} API key in Settings. Error: {str(e)[:100]}",
"error_type": "authentication_failed",
"provider": provider,
},
)
try:
# Test API key with minimal embedding request using provider-scoped configuration
from ..services.embeddings.embedding_service import create_embedding
test_result = await create_embedding(text="test", provider=provider)
if not test_result:
logger.error(
f"{provider.title()} API key validation failed - no embedding returned"
)
raise HTTPException(
status_code=401,
detail={
"error": f"Invalid {provider.title()} API key",
"message": f"Please verify your {provider.title()} API key in Settings.",
"error_type": "authentication_failed",
"provider": provider,
},
)
except Exception as e:
logger.error(
f"{provider.title()} API key validation failed: {e}",
exc_info=True,
)
raise HTTPException(
status_code=401,
detail={
"error": f"Invalid {provider.title()} API key",
"message": f"Please verify your {provider.title()} API key in Settings. Error: {str(e)[:100]}",
"error_type": "authentication_failed",
"provider": provider,
},
)
logger.info(f"{provider.title()} API key validation successful")
@@ -177,6 +177,7 @@ class RagQueryRequest(BaseModel):
query: str
source: str | None = None
match_count: int = 5
return_mode: str = "chunks" # "chunks" or "pages"
@router.get("/crawl-progress/{progress_id}")
@@ -1116,10 +1117,13 @@ async def perform_rag_query(request: RagQueryRequest):
raise HTTPException(status_code=422, detail="Query cannot be empty")
try:
# Use RAGService for RAG query
# Use RAGService for unified RAG query with return_mode support
search_service = RAGService(get_supabase_client())
success, result = await search_service.perform_rag_query(
query=request.query, source=request.source, match_count=request.match_count
query=request.query,
source=request.source,
match_count=request.match_count,
return_mode=request.return_mode
)
if success:
@@ -1288,7 +1292,7 @@ async def stop_crawl_task(progress_id: str):
found = False
# Step 1: Cancel the orchestration service
orchestration = await get_active_orchestration(progress_id)
orchestration = await get_active_orchestration(progress_id)
if orchestration:
orchestration.cancel()
found = True
@@ -1306,7 +1310,7 @@ async def stop_crawl_task(progress_id: str):
found = True
# Step 3: Remove from active orchestrations registry
await unregister_orchestration(progress_id)
await unregister_orchestration(progress_id)
# Step 4: Update progress tracker to reflect cancellation (only if we found and cancelled something)
if found:
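For clarity, an illustrative request body for the extended RagQueryRequest above; only return_mode is new, and its values mirror the comment on the model:

# Illustrative request body for the extended RagQueryRequest (values are examples).
payload = {
    "query": "how do I configure crawling?",
    "source": "src_1234abcd",   # optional source filter
    "match_count": 5,
    "return_mode": "pages",     # "chunks" (API default) or "pages"
}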

View File

@@ -0,0 +1,199 @@
"""
Pages API Module
This module handles page retrieval operations for RAG:
- List pages for a source
- Get page by ID
- Get page by URL
"""
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from ..config.logfire_config import get_logger, safe_logfire_error
from ..utils import get_supabase_client
# Get logger for this module
logger = get_logger(__name__)
# Create router
router = APIRouter(prefix="/api", tags=["pages"])
# Maximum character count for returning full page content
MAX_PAGE_CHARS = 20_000
class PageSummary(BaseModel):
"""Summary model for page listings (no content)"""
id: str
url: str
section_title: str | None = None
section_order: int = 0
word_count: int
char_count: int
chunk_count: int
class PageResponse(BaseModel):
"""Response model for a single page (with content)"""
id: str
source_id: str
url: str
full_content: str
section_title: str | None = None
section_order: int = 0
word_count: int
char_count: int
chunk_count: int
metadata: dict
created_at: str
updated_at: str
class PageListResponse(BaseModel):
"""Response model for page listing"""
pages: list[PageSummary]
total: int
source_id: str
def _handle_large_page_content(page_data: dict) -> dict:
"""
Replace full_content with a helpful message if page is too large for LLM context.
Args:
page_data: Page data from database
Returns:
Page data with full_content potentially replaced
"""
char_count = page_data.get("char_count", 0)
if char_count > MAX_PAGE_CHARS:
page_data["full_content"] = (
f"[Page too large for context - {char_count:,} characters]\n\n"
f"This page exceeds the {MAX_PAGE_CHARS:,} character limit for retrieval.\n\n"
f"To access content from this page, use a RAG search with return_mode='chunks' instead of 'pages'.\n"
f"This will retrieve specific relevant sections rather than the entire page.\n\n"
f"Page details:\n"
f"- URL: {page_data.get('url', 'N/A')}\n"
f"- Section: {page_data.get('section_title', 'N/A')}\n"
f"- Word count: {page_data.get('word_count', 0):,}\n"
f"- Character count: {char_count:,}\n"
f"- Available chunks: {page_data.get('chunk_count', 0)}"
)
return page_data
@router.get("/pages")
async def list_pages(
source_id: str = Query(..., description="Source ID to filter pages"),
section: str | None = Query(None, description="Filter by section title (for llms-full.txt)"),
):
"""
List all pages for a given source.
Args:
source_id: The source ID to filter pages
section: Optional H1 section title for llms-full.txt sources
Returns:
PageListResponse with list of pages and metadata
"""
try:
client = get_supabase_client()
# Build query - select only summary fields (no full_content)
query = client.table("archon_page_metadata").select(
"id, url, section_title, section_order, word_count, char_count, chunk_count"
).eq("source_id", source_id)
# Add section filter if provided
if section:
query = query.eq("section_title", section)
# Order by section_order and created_at
query = query.order("section_order").order("created_at")
# Execute query
result = query.execute()
# Use PageSummary (no content handling needed)
pages = [PageSummary(**page) for page in result.data]
return PageListResponse(pages=pages, total=len(pages), source_id=source_id)
except Exception as e:
logger.error(f"Error listing pages for source {source_id}: {e}", exc_info=True)
safe_logfire_error(f"Failed to list pages | source_id={source_id} | error={str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to list pages: {str(e)}") from e
@router.get("/pages/by-url")
async def get_page_by_url(url: str = Query(..., description="The URL of the page to retrieve")):
"""
Get a single page by its URL.
This is useful for retrieving pages from RAG search results which return URLs.
Args:
url: The complete URL of the page (including anchors for llms-full.txt sections)
Returns:
PageResponse with complete page data
"""
try:
client = get_supabase_client()
# Query by URL
result = client.table("archon_page_metadata").select("*").eq("url", url).single().execute()
if not result.data:
raise HTTPException(status_code=404, detail=f"Page not found for URL: {url}")
# Handle large pages
page_data = _handle_large_page_content(result.data.copy())
return PageResponse(**page_data)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting page by URL {url}: {e}", exc_info=True)
safe_logfire_error(f"Failed to get page by URL | url={url} | error={str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get page: {str(e)}") from e
@router.get("/pages/{page_id}")
async def get_page_by_id(page_id: str):
"""
Get a single page by its ID.
Args:
page_id: The UUID of the page
Returns:
PageResponse with complete page data
"""
try:
client = get_supabase_client()
# Query by ID
result = client.table("archon_page_metadata").select("*").eq("id", page_id).single().execute()
if not result.data:
raise HTTPException(status_code=404, detail=f"Page not found: {page_id}")
# Handle large pages
page_data = _handle_large_page_content(result.data.copy())
return PageResponse(**page_data)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting page {page_id}: {e}", exc_info=True)
safe_logfire_error(f"Failed to get page | page_id={page_id} | error={str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get page: {str(e)}") from e
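A hedged sketch of fetching a synthetic llms-full.txt section page through the by-url route; the host/port and URL are illustrative, and pages over MAX_PAGE_CHARS come back with the placeholder notice from _handle_large_page_content instead of their full content:

# Hedged sketch: fetch a synthetic section page via /api/pages/by-url (URL and host are illustrative).
import httpx

resp = httpx.get(
    "http://localhost:8181/api/pages/by-url",  # assumption: local API host/port
    params={"url": "https://example.com/llms-full.txt#section-0-core-concepts"},
    timeout=30.0,
)
page = resp.json()
if page["char_count"] > 20_000:  # MAX_PAGE_CHARS: full_content holds the fallback notice
    print(page["full_content"])
else:
    print(page["section_title"], page["word_count"])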

View File

@@ -25,6 +25,7 @@ from .api_routes.knowledge_api import router as knowledge_router
from .api_routes.mcp_api import router as mcp_router
from .api_routes.migration_api import router as migration_router
from .api_routes.ollama_api import router as ollama_router
from .api_routes.pages_api import router as pages_router
from .api_routes.progress_api import router as progress_router
from .api_routes.projects_api import router as projects_router
from .api_routes.providers_api import router as providers_router
@@ -183,6 +184,7 @@ app.include_router(settings_router)
app.include_router(mcp_router)
# app.include_router(mcp_client_router) # Removed - not part of new architecture
app.include_router(knowledge_router)
app.include_router(pages_router)
app.include_router(ollama_router)
app.include_router(projects_router)
app.include_router(progress_router)

View File

@@ -19,6 +19,7 @@ from ..credential_service import credential_service
# Import strategies
# Import operations
from .document_storage_operations import DocumentStorageOperations
from .page_storage_operations import PageStorageOperations
from .helpers.site_config import SiteConfig
# Import helpers
@@ -98,6 +99,7 @@ class CrawlingService:
# Initialize operations
self.doc_storage_ops = DocumentStorageOperations(self.supabase_client)
self.page_storage_ops = PageStorageOperations(self.supabase_client)
# Track progress state across all stages to prevent UI resets
self.progress_state = {"progressId": self.progress_id} if self.progress_id else {}
@@ -431,6 +433,7 @@ class CrawlingService:
self._check_cancellation,
source_url=url,
source_display_name=source_display_name,
url_to_page_id=None, # Will be populated after page storage
)
# Update progress tracker with source_id now that it's created

View File

@@ -44,6 +44,7 @@ class DocumentStorageOperations:
cancellation_check: Callable | None = None,
source_url: str | None = None,
source_display_name: str | None = None,
url_to_page_id: dict[str, str] | None = None,
) -> dict[str, Any]:
"""
Process crawled documents and store them in the database.
@@ -128,7 +129,7 @@ class DocumentStorageOperations:
all_chunk_numbers.append(i)
all_contents.append(chunk)
# Create metadata for each chunk
# Create metadata for each chunk (page_id will be set later)
word_count = len(chunk.split())
metadata = {
"url": doc_url,
@@ -136,6 +137,7 @@ class DocumentStorageOperations:
"description": doc.get("description", ""),
"source_id": source_id,
"knowledge_type": request.get("knowledge_type", "documentation"),
"page_id": None, # Will be set after pages are stored
"crawl_type": crawl_type,
"word_count": word_count,
"char_count": len(chunk),
@@ -155,13 +157,101 @@ class DocumentStorageOperations:
if doc_index > 0 and doc_index % 5 == 0:
await asyncio.sleep(0)
# Create/update source record FIRST before storing documents
# Create/update source record FIRST (required for FK constraints on pages and chunks)
if all_contents and all_metadatas:
await self._create_source_records(
all_metadatas, all_contents, source_word_counts, request,
source_url, source_display_name
)
# Store pages AFTER source is created but BEFORE chunks (FK constraint requirement)
from .page_storage_operations import PageStorageOperations
page_storage_ops = PageStorageOperations(self.supabase_client)
# Check if this is an llms-full.txt file
is_llms_full = crawl_type == "llms-txt" or (
len(url_to_full_document) == 1 and
next(iter(url_to_full_document.keys())).endswith("llms-full.txt")
)
if is_llms_full and url_to_full_document:
# Handle llms-full.txt with section-based pages
base_url = next(iter(url_to_full_document.keys()))
content = url_to_full_document[base_url]
# Store section pages
url_to_page_id = await page_storage_ops.store_llms_full_sections(
base_url,
content,
original_source_id,
request,
crawl_type="llms_full",
)
# Parse sections and re-chunk each section
from .helpers.llms_full_parser import parse_llms_full_sections
sections = parse_llms_full_sections(content, base_url)
# Clear existing chunks and re-create from sections
all_urls.clear()
all_chunk_numbers.clear()
all_contents.clear()
all_metadatas.clear()
url_to_full_document.clear()
# Chunk each section separately
for section in sections:
# Update url_to_full_document with section content
url_to_full_document[section.url] = section.content
section_chunks = await storage_service.smart_chunk_text_async(
section.content, chunk_size=5000
)
for i, chunk in enumerate(section_chunks):
all_urls.append(section.url)
all_chunk_numbers.append(i)
all_contents.append(chunk)
word_count = len(chunk.split())
metadata = {
"url": section.url,
"title": section.section_title,
"description": "",
"source_id": original_source_id,
"knowledge_type": request.get("knowledge_type", "documentation"),
"page_id": url_to_page_id.get(section.url),
"crawl_type": "llms_full",
"word_count": word_count,
"char_count": len(chunk),
"chunk_index": i,
"tags": request.get("tags", []),
}
all_metadatas.append(metadata)
else:
# Handle regular pages
reconstructed_crawl_results = []
for url, markdown in url_to_full_document.items():
reconstructed_crawl_results.append({
"url": url,
"markdown": markdown,
})
if reconstructed_crawl_results:
url_to_page_id = await page_storage_ops.store_pages(
reconstructed_crawl_results,
original_source_id,
request,
crawl_type,
)
else:
url_to_page_id = {}
# Update all chunk metadata with correct page_id
for metadata in all_metadatas:
chunk_url = metadata.get("url")
if chunk_url and chunk_url in url_to_page_id:
metadata["page_id"] = url_to_page_id[chunk_url]
safe_logfire_info(f"url_to_full_document keys: {list(url_to_full_document.keys())[:5]}")
# Log chunking results
@@ -183,6 +273,7 @@ class DocumentStorageOperations:
enable_parallel_batches=True, # Enable parallel processing
provider=None, # Use configured provider
cancellation_check=cancellation_check, # Pass cancellation check
url_to_page_id=url_to_page_id, # Link chunks to pages
)
# Calculate chunk counts
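To make the backfill step concrete, a minimal self-contained sketch of the url_to_page_id map being written into chunk metadata, mirroring the loop above; the URLs and UUID are illustrative:

# Self-contained sketch of the page_id backfill loop above (URLs and UUID are illustrative).
url_to_page_id = {
    "https://docs.example.com/getting-started": "550e8400-e29b-41d4-a716-446655440000",
}
all_metadatas = [
    {"url": "https://docs.example.com/getting-started", "chunk_index": 0, "page_id": None},
    {"url": "https://docs.example.com/changelog", "chunk_index": 0, "page_id": None},
]
for metadata in all_metadatas:
    chunk_url = metadata.get("url")
    if chunk_url and chunk_url in url_to_page_id:
        metadata["page_id"] = url_to_page_id[chunk_url]
# First chunk now carries the page's UUID; the second keeps None (no page row stored for its URL).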

View File

@@ -0,0 +1,262 @@
"""
LLMs-full.txt Section Parser
Parses llms-full.txt files by splitting on H1 headers (# ) to create separate
"pages" for each section. Each section gets a synthetic URL with a slug anchor.
"""
import re
from pydantic import BaseModel
class LLMsFullSection(BaseModel):
"""Parsed section from llms-full.txt file"""
section_title: str # Raw H1 text: "# Core Concepts"
section_order: int # Position in document: 0, 1, 2, ...
content: str # Section content (including H1 header)
url: str # Synthetic URL: base.txt#core-concepts
word_count: int
def create_section_slug(h1_heading: str) -> str:
"""
Generate URL slug from H1 heading.
Args:
h1_heading: H1 text like "# Core Concepts" or "# Getting Started"
Returns:
Slug like "core-concepts" or "getting-started"
Examples:
"# Core Concepts" -> "core-concepts"
"# API Reference" -> "api-reference"
"# Getting Started!" -> "getting-started"
"""
# Remove "# " prefix if present
slug_text = h1_heading.replace("# ", "").strip()
# Convert to lowercase
slug = slug_text.lower()
# Replace spaces with hyphens
slug = slug.replace(" ", "-")
# Remove special characters (keep only alphanumeric and hyphens)
slug = re.sub(r"[^a-z0-9-]", "", slug)
# Remove consecutive hyphens
slug = re.sub(r"-+", "-", slug)
# Remove leading/trailing hyphens
slug = slug.strip("-")
return slug
def create_section_url(base_url: str, h1_heading: str, section_order: int) -> str:
"""
Generate synthetic URL with slug anchor for a section.
Args:
base_url: Base URL like "https://example.com/llms-full.txt"
h1_heading: H1 text like "# Core Concepts"
section_order: Section position (0-based)
Returns:
Synthetic URL like "https://example.com/llms-full.txt#section-0-core-concepts"
"""
slug = create_section_slug(h1_heading)
return f"{base_url}#section-{section_order}-{slug}"
def parse_llms_full_sections(content: str, base_url: str) -> list[LLMsFullSection]:
"""
Split llms-full.txt content by H1 headers to create separate sections.
Each H1 (lines starting with "# " but not "##") marks a new section.
Sections are given synthetic URLs with slug anchors.
Args:
content: Full text content of llms-full.txt file
base_url: Base URL of the file (e.g., "https://example.com/llms-full.txt")
Returns:
List of LLMsFullSection objects, one per H1 section
Edge cases:
- No H1 headers: Returns single section with entire content
- Multiple consecutive H1s: Creates separate sections correctly
- Empty sections: Skipped (not included in results)
Example:
Input content:
'''
# Core Concepts
Claude is an AI assistant...
# Getting Started
To get started...
'''
Returns:
[
LLMsFullSection(
section_title="# Core Concepts",
section_order=0,
content="# Core Concepts\\nClaude is...",
url="https://example.com/llms-full.txt#core-concepts",
word_count=5
),
LLMsFullSection(
section_title="# Getting Started",
section_order=1,
content="# Getting Started\\nTo get started...",
url="https://example.com/llms-full.txt#getting-started",
word_count=4
)
]
"""
lines = content.split("\n")
# Pre-scan: mark which lines are inside code blocks
inside_code_block = set()
in_block = False
for i, line in enumerate(lines):
if line.strip().startswith("```"):
in_block = not in_block
if in_block:
inside_code_block.add(i)
# Parse sections, ignoring H1 headers inside code blocks
sections: list[LLMsFullSection] = []
current_h1: str | None = None
current_content: list[str] = []
section_order = 0
for i, line in enumerate(lines):
# Detect H1 (starts with "# " but not "##") - but ONLY if not in code block
is_h1 = line.startswith("# ") and not line.startswith("## ")
if is_h1 and i not in inside_code_block:
# Save previous section if it exists
if current_h1 is not None:
section_text = "\n".join(current_content)
# Skip empty sections (only whitespace)
if section_text.strip():
section_url = create_section_url(base_url, current_h1, section_order)
word_count = len(section_text.split())
sections.append(
LLMsFullSection(
section_title=current_h1,
section_order=section_order,
content=section_text,
url=section_url,
word_count=word_count,
)
)
section_order += 1
# Start new section
current_h1 = line
current_content = [line]
else:
# Only accumulate if we've seen an H1
if current_h1 is not None:
current_content.append(line)
# Save last section
if current_h1 is not None:
section_text = "\n".join(current_content)
if section_text.strip():
section_url = create_section_url(base_url, current_h1, section_order)
word_count = len(section_text.split())
sections.append(
LLMsFullSection(
section_title=current_h1,
section_order=section_order,
content=section_text,
url=section_url,
word_count=word_count,
)
)
# Edge case: No H1 headers found, treat entire file as single page
if not sections and content.strip():
sections.append(
LLMsFullSection(
section_title="Full Document",
section_order=0,
content=content,
url=base_url, # No anchor for single-page
word_count=len(content.split()),
)
)
# Fix sections that were split inside code blocks - merge them with next section
if sections:
fixed_sections: list[LLMsFullSection] = []
i = 0
while i < len(sections):
current = sections[i]
# Count ``` at start of lines only (proper code fences)
code_fence_count = sum(
1 for line in current.content.split('\n')
if line.strip().startswith('```')
)
# If odd number, we're inside an unclosed code block - merge with next
while code_fence_count % 2 == 1 and i + 1 < len(sections):
next_section = sections[i + 1]
# Combine content
combined_content = current.content + "\n\n" + next_section.content
# Update current with combined content
current = LLMsFullSection(
section_title=current.section_title,
section_order=current.section_order,
content=combined_content,
url=current.url,
word_count=len(combined_content.split()),
)
# Move to next section and recount ``` at start of lines
i += 1
code_fence_count = sum(
1 for line in current.content.split('\n')
if line.strip().startswith('```')
)
fixed_sections.append(current)
i += 1
sections = fixed_sections
# Combine consecutive small sections (<200 chars) together
if sections:
combined_sections: list[LLMsFullSection] = []
i = 0
while i < len(sections):
current = sections[i]
combined_content = current.content
# Keep combining while current is small and there are more sections
while len(combined_content) < 200 and i + 1 < len(sections):
i += 1
combined_content = combined_content + "\n\n" + sections[i].content
# Create combined section with first section's metadata
combined = LLMsFullSection(
section_title=current.section_title,
section_order=current.section_order,
content=combined_content,
url=current.url,
word_count=len(combined_content.split()),
)
combined_sections.append(combined)
i += 1
sections = combined_sections
return sections
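A small worked example of the parser, using the module path from the tests later in this diff; the input document is illustrative and each section is padded past the 200-character merge threshold:

# Worked example of parse_llms_full_sections (module path as used in the tests below).
from src.server.services.crawling.helpers.llms_full_parser import parse_llms_full_sections

doc = (
    "# Core Concepts\n"
    "Claude is an AI assistant built by Anthropic that can help with a wide range of tasks,\n"
    "from answering questions to drafting and reviewing code across many programming languages,\n"
    "and this third line only exists to push the section safely past the 200 character threshold.\n"
    "# Getting Started\n"
    "Create an account, obtain API credentials, and configure your development environment,\n"
    "then make a first request against the endpoints described in the API reference section;\n"
    "this section is likewise padded so that it is not merged into its neighbouring section.\n"
)
sections = parse_llms_full_sections(doc, "https://example.com/llms-full.txt")
for s in sections:
    print(s.section_order, s.url, s.word_count)
# 0 https://example.com/llms-full.txt#section-0-core-concepts ...
# 1 https://example.com/llms-full.txt#section-1-getting-started ...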

View File

@@ -0,0 +1,248 @@
"""
Page Storage Operations
Handles the storage of complete documentation pages in the archon_page_metadata table.
Pages are stored BEFORE chunking to maintain full context for agent retrieval.
"""
from typing import Any
from postgrest.exceptions import APIError
from ...config.logfire_config import get_logger, safe_logfire_error, safe_logfire_info
from .helpers.llms_full_parser import parse_llms_full_sections
logger = get_logger(__name__)
class PageStorageOperations:
"""
Handles page storage operations for crawled content.
Pages are stored in the archon_page_metadata table with full content and metadata.
This enables agents to retrieve complete documentation pages instead of just chunks.
"""
def __init__(self, supabase_client):
"""
Initialize page storage operations.
Args:
supabase_client: The Supabase client for database operations
"""
self.supabase_client = supabase_client
async def store_pages(
self,
crawl_results: list[dict],
source_id: str,
request: dict[str, Any],
crawl_type: str,
) -> dict[str, str]:
"""
Store pages in archon_page_metadata table from regular crawl results.
Args:
crawl_results: List of crawled documents with url, markdown, title, etc.
source_id: The source ID these pages belong to
request: The original crawl request with knowledge_type, tags, etc.
crawl_type: Type of crawl performed (sitemap, url, link_collection, etc.)
Returns:
{url: page_id} mapping for FK references in chunks
"""
safe_logfire_info(
f"store_pages called | source_id={source_id} | crawl_type={crawl_type} | num_results={len(crawl_results)}"
)
url_to_page_id: dict[str, str] = {}
pages_to_insert: list[dict[str, Any]] = []
for doc in crawl_results:
url = doc.get("url", "").strip()
markdown = doc.get("markdown", "").strip()
# Skip documents with empty content or missing URLs
if not url or not markdown:
continue
# Prepare page record
word_count = len(markdown.split())
char_count = len(markdown)
page_record = {
"source_id": source_id,
"url": url,
"full_content": markdown,
"section_title": None, # Regular page, not a section
"section_order": 0,
"word_count": word_count,
"char_count": char_count,
"chunk_count": 0, # Will be updated after chunking
"metadata": {
"knowledge_type": request.get("knowledge_type", "documentation"),
"crawl_type": crawl_type,
"page_type": "documentation",
"tags": request.get("tags", []),
},
}
pages_to_insert.append(page_record)
# Batch upsert pages
if pages_to_insert:
try:
safe_logfire_info(
f"Upserting {len(pages_to_insert)} pages into archon_page_metadata table"
)
result = (
self.supabase_client.table("archon_page_metadata")
.upsert(pages_to_insert, on_conflict="url")
.execute()
)
# Build url → page_id mapping
for page in result.data:
url_to_page_id[page["url"]] = page["id"]
safe_logfire_info(
f"Successfully stored {len(url_to_page_id)}/{len(pages_to_insert)} pages in archon_page_metadata"
)
except APIError as e:
safe_logfire_error(
f"Database error upserting pages | source_id={source_id} | attempted={len(pages_to_insert)} | error={str(e)}"
)
logger.error(f"Failed to upsert pages for source {source_id}: {e}", exc_info=True)
# Don't raise - allow chunking to continue even if page storage fails
except Exception as e:
safe_logfire_error(
f"Unexpected error upserting pages | source_id={source_id} | attempted={len(pages_to_insert)} | error={str(e)}"
)
logger.error(f"Unexpected error upserting pages for source {source_id}: {e}", exc_info=True)
# Don't raise - allow chunking to continue
return url_to_page_id
async def store_llms_full_sections(
self,
base_url: str,
content: str,
source_id: str,
request: dict[str, Any],
crawl_type: str = "llms_full",
) -> dict[str, str]:
"""
Store llms-full.txt sections as separate pages.
Each H1 section gets its own page record with a synthetic URL.
Args:
base_url: Base URL of the llms-full.txt file
content: Full text content of the file
source_id: The source ID these sections belong to
request: The original crawl request
crawl_type: Type of crawl (defaults to "llms_full")
Returns:
{url: page_id} mapping for FK references in chunks
"""
url_to_page_id: dict[str, str] = {}
# Parse sections from content
sections = parse_llms_full_sections(content, base_url)
if not sections:
logger.warning(f"No sections found in llms-full.txt file: {base_url}")
return url_to_page_id
safe_logfire_info(
f"Parsed {len(sections)} sections from llms-full.txt file: {base_url}"
)
# Prepare page records for each section
pages_to_insert: list[dict[str, Any]] = []
for section in sections:
page_record = {
"source_id": source_id,
"url": section.url,
"full_content": section.content,
"section_title": section.section_title,
"section_order": section.section_order,
"word_count": section.word_count,
"char_count": len(section.content),
"chunk_count": 0, # Will be updated after chunking
"metadata": {
"knowledge_type": request.get("knowledge_type", "documentation"),
"crawl_type": crawl_type,
"page_type": "llms_full_section",
"tags": request.get("tags", []),
"section_metadata": {
"section_title": section.section_title,
"section_order": section.section_order,
"base_url": base_url,
},
},
}
pages_to_insert.append(page_record)
# Batch upsert pages
if pages_to_insert:
try:
safe_logfire_info(
f"Upserting {len(pages_to_insert)} section pages into archon_page_metadata"
)
result = (
self.supabase_client.table("archon_page_metadata")
.upsert(pages_to_insert, on_conflict="url")
.execute()
)
# Build url → page_id mapping
for page in result.data:
url_to_page_id[page["url"]] = page["id"]
safe_logfire_info(
f"Successfully stored {len(url_to_page_id)}/{len(pages_to_insert)} section pages"
)
except APIError as e:
safe_logfire_error(
f"Database error upserting sections | base_url={base_url} | attempted={len(pages_to_insert)} | error={str(e)}"
)
logger.error(f"Failed to upsert sections for {base_url}: {e}", exc_info=True)
# Don't raise - allow process to continue
except Exception as e:
safe_logfire_error(
f"Unexpected error upserting sections | base_url={base_url} | attempted={len(pages_to_insert)} | error={str(e)}"
)
logger.error(f"Unexpected error upserting sections for {base_url}: {e}", exc_info=True)
# Don't raise - allow process to continue
return url_to_page_id
async def update_page_chunk_count(self, page_id: str, chunk_count: int) -> None:
"""
Update the chunk_count field for a page after chunking is complete.
Args:
page_id: The UUID of the page to update
chunk_count: Number of chunks created from this page
"""
try:
self.supabase_client.table("archon_page_metadata").update(
{"chunk_count": chunk_count}
).eq("id", page_id).execute()
safe_logfire_info(f"Updated chunk_count={chunk_count} for page_id={page_id}")
except APIError as e:
logger.warning(
f"Database error updating chunk_count for page {page_id}: {e}", exc_info=True
)
except Exception as e:
logger.warning(
f"Unexpected error updating chunk_count for page {page_id}: {e}", exc_info=True
)
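A hedged usage sketch of this class; PageStorageOperations is the class defined above (its import path is not shown in this view), the Supabase credentials are placeholders, and the chunk count is illustrative:

# Hedged usage sketch of PageStorageOperations (defined above; import path not shown in this view).
import asyncio

from supabase import create_client

async def main() -> None:
    client = create_client("https://<project>.supabase.co", "<service-role-key>")  # placeholders
    ops = PageStorageOperations(client)
    crawl_results = [{
        "url": "https://docs.example.com/getting-started",
        "markdown": "# Getting Started\nInstall the package and start the server.",
    }]
    url_to_page_id = await ops.store_pages(
        crawl_results,
        source_id="src_1234abcd",
        request={"knowledge_type": "documentation", "tags": []},
        crawl_type="url",
    )
    # After chunking, record how many chunks each page produced.
    for page_id in url_to_page_id.values():
        await ops.update_page_chunk_count(page_id, chunk_count=3)  # illustrative count

asyncio.run(main())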

View File

@@ -234,10 +234,24 @@ class BatchCrawlStrategy:
if result.success and result.markdown and result.markdown.fit_markdown:
# Map back to original URL
original_url = url_mapping.get(result.url, result.url)
# Extract title from HTML <title> tag
title = "Untitled"
if result.html:
import re
title_match = re.search(r'<title[^>]*>(.*?)</title>', result.html, re.IGNORECASE | re.DOTALL)
if title_match:
extracted_title = title_match.group(1).strip()
# Clean up HTML entities
extracted_title = extracted_title.replace('&amp;', '&').replace('&lt;', '<').replace('&gt;', '>').replace('&quot;', '"')
if extracted_title:
title = extracted_title
successful_results.append({
"url": original_url,
"markdown": result.markdown.fit_markdown,
"html": result.html, # Use raw HTML
"title": title,
})
else:
logger.warning(

View File

@@ -179,12 +179,24 @@ class SinglePageCrawlStrategy:
if 'getting-started' in url:
logger.info(f"Markdown sample for getting-started: {markdown_sample}")
# Extract title from HTML <title> tag
title = "Untitled"
if result.html:
import re
title_match = re.search(r'<title[^>]*>(.*?)</title>', result.html, re.IGNORECASE | re.DOTALL)
if title_match:
extracted_title = title_match.group(1).strip()
# Clean up HTML entities
extracted_title = extracted_title.replace('&amp;', '&').replace('&lt;', '<').replace('&gt;', '>').replace('&quot;', '"')
if extracted_title:
title = extracted_title
return {
"success": True,
"url": original_url, # Use original URL for tracking
"markdown": result.markdown,
"html": result.html, # Use raw HTML instead of cleaned_html for code extraction
"title": result.title or "Untitled",
"title": title,
"links": result.links,
"content_length": len(result.markdown)
}
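The same <title> extraction appears in both crawl strategies in this commit; purely as a hedged illustration, the inline logic is equivalent to a small standalone helper like this (extract_title is a hypothetical name, not part of the diff):

# Hedged illustration only: the inline <title> extraction above, as a standalone helper.
# extract_title is a hypothetical name; this commit keeps the logic inline in each strategy.
import re

def extract_title(html: str | None, default: str = "Untitled") -> str:
    if not html:
        return default
    match = re.search(r"<title[^>]*>(.*?)</title>", html, re.IGNORECASE | re.DOTALL)
    if not match:
        return default
    title = match.group(1).strip()
    # Unescape the same handful of entities handled inline above.
    for entity, char in (("&amp;", "&"), ("&lt;", "<"), ("&gt;", ">"), ("&quot;", '"')):
        title = title.replace(entity, char)
    return title or default

print(extract_title("<html><title>Getting Started &amp; Setup</title></html>"))
# -> Getting Started & Setup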

View File

@@ -172,21 +172,91 @@ class RAGService:
use_enhancement=True,
)
async def _group_chunks_by_pages(
self, chunk_results: list[dict[str, Any]], match_count: int
) -> list[dict[str, Any]]:
"""Group chunk results by page_id (if available) or URL and fetch page metadata."""
page_groups: dict[str, dict[str, Any]] = {}
for result in chunk_results:
metadata = result.get("metadata", {})
page_id = metadata.get("page_id")
url = metadata.get("url")
# Use page_id as key if available, otherwise URL
group_key = page_id if page_id else url
if not group_key:
continue
if group_key not in page_groups:
page_groups[group_key] = {
"page_id": page_id,
"url": url,
"chunk_matches": 0,
"total_similarity": 0.0,
"best_chunk_content": result.get("content", ""),
"source_id": metadata.get("source_id"),
}
page_groups[group_key]["chunk_matches"] += 1
page_groups[group_key]["total_similarity"] += result.get("similarity_score", 0.0)
page_results = []
for group_key, data in page_groups.items():
avg_similarity = data["total_similarity"] / data["chunk_matches"]
match_boost = min(0.2, data["chunk_matches"] * 0.02)
aggregate_score = avg_similarity * (1 + match_boost)
# Query page by page_id if available, otherwise by URL
if data["page_id"]:
page_info = (
self.supabase_client.table("archon_page_metadata")
.select("id, url, section_title, word_count")
.eq("id", data["page_id"])
.maybe_single()
.execute()
)
else:
# Regular pages - exact URL match
page_info = (
self.supabase_client.table("archon_page_metadata")
.select("id, url, section_title, word_count")
.eq("url", data["url"])
.maybe_single()
.execute()
)
if page_info and page_info.data is not None:
page_results.append({
"page_id": page_info.data["id"],
"url": page_info.data["url"],
"section_title": page_info.data.get("section_title"),
"word_count": page_info.data.get("word_count", 0),
"chunk_matches": data["chunk_matches"],
"aggregate_similarity": aggregate_score,
"average_similarity": avg_similarity,
"source_id": data["source_id"],
})
page_results.sort(key=lambda x: x["aggregate_similarity"], reverse=True)
return page_results[:match_count]
async def perform_rag_query(
self, query: str, source: str = None, match_count: int = 5
self, query: str, source: str = None, match_count: int = 5, return_mode: str = "chunks"
) -> tuple[bool, dict[str, Any]]:
"""
Perform a comprehensive RAG query that combines all enabled strategies.
Unified RAG query with all strategies.
Pipeline:
1. Start with vector search
2. Apply hybrid search if enabled
3. Apply reranking if enabled
1. Vector/Hybrid Search (based on settings)
2. Reranking (if enabled)
3. Page Grouping (if return_mode="pages")
Args:
query: The search query
source: Optional source domain to filter results
match_count: Maximum number of results to return
return_mode: "chunks" (default) or "pages"
Returns:
Tuple of (success, result_dict)
@@ -256,6 +326,23 @@ class RAGService:
if len(formatted_results) > match_count:
formatted_results = formatted_results[:match_count]
# Step 4: Group by pages if return_mode="pages" AND pages exist
actual_return_mode = return_mode
if return_mode == "pages":
# Check if any chunks have page_id set
has_page_ids = any(
result.get("metadata", {}).get("page_id") is not None
for result in formatted_results
)
if has_page_ids:
# Group by pages when page_ids exist
formatted_results = await self._group_chunks_by_pages(formatted_results, match_count)
else:
# Fall back to chunks when no page_ids (pre-migration data)
actual_return_mode = "chunks"
logger.info("No page_ids found in results, returning chunks instead of pages")
# Build response
response_data = {
"results": formatted_results,
@@ -266,13 +353,15 @@ class RAGService:
"execution_path": "rag_service_pipeline",
"search_mode": "hybrid" if use_hybrid_search else "vector",
"reranking_applied": reranking_applied,
"return_mode": actual_return_mode,
}
span.set_attribute("final_results_count", len(formatted_results))
span.set_attribute("reranking_applied", reranking_applied)
span.set_attribute("return_mode", return_mode)
span.set_attribute("success", True)
logger.info(f"RAG query completed - {len(formatted_results)} results found")
logger.info(f"RAG query completed - {len(formatted_results)} {return_mode} found")
return True, response_data
except Exception as e:
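To make the page scoring in _group_chunks_by_pages concrete: a page matched by three chunks whose similarities average 0.80 gets a boost of min(0.2, 3 * 0.02) = 0.06 and an aggregate score of 0.80 * 1.06 = 0.848. A standalone check with illustrative similarities:

# Standalone check of the aggregate scoring used in _group_chunks_by_pages above.
similarities = [0.75, 0.80, 0.85]                        # three matching chunks (illustrative)
avg_similarity = sum(similarities) / len(similarities)   # 0.80
match_boost = min(0.2, len(similarities) * 0.02)         # 0.06
aggregate_score = avg_similarity * (1 + match_boost)     # 0.848
print(round(aggregate_score, 3))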

View File

@@ -273,26 +273,22 @@ async def update_source_info(
if original_url:
metadata["original_url"] = original_url
# Update existing source (preserving title)
update_data = {
# Use upsert to handle race conditions
upsert_data = {
"source_id": source_id,
"title": existing_title,
"summary": summary,
"total_word_count": word_count,
"metadata": metadata,
"updated_at": "now()",
}
# Add new fields if provided
if source_url:
update_data["source_url"] = source_url
upsert_data["source_url"] = source_url
if source_display_name:
update_data["source_display_name"] = source_display_name
upsert_data["source_display_name"] = source_display_name
result = (
client.table("archon_sources")
.update(update_data)
.eq("source_id", source_id)
.execute()
)
client.table("archon_sources").upsert(upsert_data).execute()
search_logger.info(
f"Updated source {source_id} while preserving title: {existing_title}"

View File

@@ -100,6 +100,23 @@ class BaseStorageService(ABC):
# Move start position for next chunk
start = end
# Combine consecutive small chunks (<200 chars) together
if chunks:
combined_chunks: list[str] = []
i = 0
while i < len(chunks):
current = chunks[i]
# Keep combining while current is small and there are more chunks
while len(current) < 200 and i + 1 < len(chunks):
i += 1
current = current + "\n\n" + chunks[i]
combined_chunks.append(current)
i += 1
chunks = combined_chunks
return chunks
async def smart_chunk_text_async(
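A quick, self-contained illustration of the small-chunk merge added above, with illustrative chunk contents:

# Quick illustration of the "merge chunks under 200 chars forward" behaviour added above.
chunks = ["short heading", "another short fragment", "x" * 250, "tail"]
combined_chunks: list[str] = []
i = 0
while i < len(chunks):
    current = chunks[i]
    while len(current) < 200 and i + 1 < len(chunks):
        i += 1
        current = current + "\n\n" + chunks[i]
    combined_chunks.append(current)
    i += 1
print(len(combined_chunks))  # 2: the two short strings merge into the long one; "tail" stays as-is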

View File

@@ -25,6 +25,7 @@ async def add_documents_to_supabase(
enable_parallel_batches: bool = True,
provider: str | None = None,
cancellation_check: Any | None = None,
url_to_page_id: dict[str, str] | None = None,
) -> dict[str, int]:
"""
Add documents to Supabase with threading optimizations.
@@ -399,6 +400,9 @@ async def add_documents_to_supabase(
search_logger.warning(f"Unsupported embedding dimension {embedding_dim}, using embedding_1536")
embedding_column = "embedding_1536"
# Get page_id for this URL if available
page_id = url_to_page_id.get(batch_urls[j]) if url_to_page_id else None
data = {
"url": batch_urls[j],
"chunk_number": batch_chunk_numbers[j],
@@ -409,6 +413,7 @@ async def add_documents_to_supabase(
"llm_chat_model": llm_chat_model, # Add LLM model tracking
"embedding_model": embedding_model_name, # Add embedding model tracking
"embedding_dimension": embedding_dim, # Add dimension tracking
"page_id": page_id, # Link chunk to page
}
batch_data.append(data)

View File

@@ -0,0 +1,195 @@
"""
Tests for LLMs-full.txt Section Parser
"""
import pytest
from src.server.services.crawling.helpers.llms_full_parser import (
create_section_slug,
create_section_url,
parse_llms_full_sections,
)
def test_create_section_slug():
"""Test slug generation from H1 headings"""
assert create_section_slug("# Core Concepts") == "core-concepts"
assert create_section_slug("# Getting Started!") == "getting-started"
assert create_section_slug("# API Reference (v2)") == "api-reference-v2"
assert create_section_slug("# Hello World") == "hello-world"
assert create_section_slug("# Spaces ") == "spaces"
def test_create_section_url():
"""Test synthetic URL generation with slug anchor"""
base_url = "https://example.com/llms-full.txt"
url = create_section_url(base_url, "# Core Concepts", 0)
assert url == "https://example.com/llms-full.txt#section-0-core-concepts"
url = create_section_url(base_url, "# Getting Started", 1)
assert url == "https://example.com/llms-full.txt#section-1-getting-started"
def test_parse_single_section():
"""Test parsing a single H1 section"""
content = """# Core Concepts
Claude is an AI assistant built by Anthropic.
It can help with various tasks.
"""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
assert len(sections) == 1
assert sections[0].section_title == "# Core Concepts"
assert sections[0].section_order == 0
assert sections[0].url == "https://example.com/llms-full.txt#section-0-core-concepts"
assert "Claude is an AI assistant" in sections[0].content
assert sections[0].word_count > 0
def test_parse_multiple_sections():
"""Test parsing multiple H1 sections"""
content = """# Core Concepts
Claude is an AI assistant built by Anthropic that can help with various tasks.
It uses advanced language models to understand and respond to queries.
This section provides an overview of the core concepts and capabilities.
# Getting Started
To get started with Claude, you'll need to create an account and obtain API credentials.
Follow the setup instructions and configure your development environment properly.
This will enable you to make your first API calls and start building applications.
# API Reference
The API uses REST principles and supports standard HTTP methods like GET, POST, PUT, and DELETE.
Authentication is handled through API keys that should be kept secure at all times.
Comprehensive documentation is available for all endpoints and response formats.
"""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
assert len(sections) == 3
assert sections[0].section_title == "# Core Concepts"
assert sections[1].section_title == "# Getting Started"
assert sections[2].section_title == "# API Reference"
assert sections[0].section_order == 0
assert sections[1].section_order == 1
assert sections[2].section_order == 2
assert sections[0].url == "https://example.com/llms-full.txt#section-0-core-concepts"
assert sections[1].url == "https://example.com/llms-full.txt#section-1-getting-started"
assert sections[2].url == "https://example.com/llms-full.txt#section-2-api-reference"
def test_no_h1_headers():
"""Test handling content with no H1 headers"""
content = """This is some documentation.
It has no H1 headers.
Just regular content.
"""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
assert len(sections) == 1
assert sections[0].section_title == "Full Document"
assert sections[0].url == "https://example.com/llms-full.txt"
assert "This is some documentation" in sections[0].content
def test_h2_not_treated_as_section():
"""Test that H2 headers (##) are not treated as section boundaries"""
content = """# Main Section
This is the main section.
## Subsection
This is a subsection.
## Another Subsection
This is another subsection.
"""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
assert len(sections) == 1
assert sections[0].section_title == "# Main Section"
assert "## Subsection" in sections[0].content
assert "## Another Subsection" in sections[0].content
def test_empty_sections_skipped():
"""Test that empty sections are skipped"""
content = """# Section 1
This is the first section with enough content to prevent automatic combination.
It contains multiple sentences and provides substantial information for testing purposes.
The section has several lines to ensure it exceeds the minimum character threshold.
#
# Section 2
This is the second section with enough content to prevent automatic combination.
It also contains multiple sentences and provides substantial information for testing.
The section has several lines to ensure it exceeds the minimum character threshold.
"""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
# Should only have 2 sections (empty one skipped)
assert len(sections) == 2
assert sections[0].section_title == "# Section 1"
assert sections[1].section_title == "# Section 2"
def test_consecutive_h1_headers():
"""Test handling multiple consecutive H1 headers"""
content = """# Section 1
The first section contains enough content to prevent automatic combination with subsequent sections.
It has multiple sentences and provides substantial information for proper testing functionality.
This ensures that the section exceeds the minimum character threshold requirement.
# Section 2
This section also has enough content to prevent automatic combination with the previous section.
It contains multiple sentences and provides substantial information for proper testing.
The content here ensures that the section exceeds the minimum character threshold.
"""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
# Both sections should be parsed
assert len(sections) == 2
assert sections[0].section_title == "# Section 1"
assert sections[1].section_title == "# Section 2"
assert "The first section contains enough content" in sections[0].content
assert "This section also has enough content" in sections[1].content
def test_word_count_calculation():
"""Test word count calculation for sections"""
content = """# Test Section
This is a test section with exactly ten words here.
"""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
assert len(sections) == 1
# Word count includes the H1 heading
assert sections[0].word_count > 10
def test_empty_content():
"""Test handling empty content"""
content = ""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
assert len(sections) == 0
def test_whitespace_only_content():
"""Test handling whitespace-only content"""
content = """
"""
base_url = "https://example.com/llms-full.txt"
sections = parse_llms_full_sections(content, base_url)
assert len(sections) == 0

View File

@@ -119,8 +119,8 @@ class TestSourceRaceCondition:
assert "upsert" in methods_called, "Should use upsert for new sources"
assert "insert" not in methods_called, "Should not use insert to avoid race conditions"
def test_existing_source_uses_update(self):
"""Test that existing sources still use UPDATE (not affected by change)."""
def test_existing_source_uses_upsert(self):
"""Test that existing sources use UPSERT to handle race conditions."""
mock_client = Mock()
methods_called = []
@@ -158,9 +158,9 @@ class TestSourceRaceCondition:
))
loop.close()
# Should use update for existing sources
assert "update" in methods_called, "Should use update for existing sources"
assert "upsert" not in methods_called, "Should not use upsert for existing sources"
# Should use upsert for existing sources to handle race conditions
assert "upsert" in methods_called, "Should use upsert for existing sources"
assert "update" not in methods_called, "Should not use update (upsert handles race conditions)"
@pytest.mark.asyncio
async def test_async_concurrent_creation(self):