From 13796abbe8d9a20d22f772ac135ed37e54463fc4 Mon Sep 17 00:00:00 2001
From: leex279
Date: Sun, 19 Oct 2025 15:31:08 +0200
Subject: [PATCH] feat: Improve discovery system with SSRF protection and
 optimize file detection
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Backend Improvements

### Discovery Service
- Fix SSRF protection: Use requests.Session() for max_redirects parameter
- Add comprehensive IP validation (_is_safe_ip, _resolve_and_validate_hostname)
- Add hostname DNS resolution validation before requests
- Fix llms.txt link following to crawl ALL same-domain pages (not just llms.txt files)
- Remove unused file variants: llms.md, llms.markdown, sitemap_index.xml, sitemap-index.xml
- Optimize DISCOVERY_PRIORITY based on real-world usage research
- Update priority: llms.txt > llms-full.txt > sitemap.xml > robots.txt

### URL Handler
- Fix .well-known path to be case-sensitive per RFC 8615
- Remove llms.md, llms.markdown, llms.mdx from variant detection
- Simplify link collection patterns to only .txt files (most common)
- Update llms_variants list to only include spec-compliant files

### Crawling Service
- Add tldextract for proper root domain extraction (handles .co.uk, .com.au, etc.)
- Replace naive domain extraction with robust get_root_domain() function
- Add tldextract>=5.0.0 to dependencies

## Frontend Improvements

### Type Safety
- Extend ActiveOperation type with discovery fields (discovered_file, discovered_file_type, linked_files)
- Remove all type casting (operation as any) from CrawlingProgress component
- Add proper TypeScript types for discovery information

### Security
- Create URL validation utility (urlValidation.ts)
- Only render clickable links for validated HTTP/HTTPS URLs
- Reject unsafe protocols (javascript:, data:, vbscript:, file:)
- Display invalid URLs as plain text instead of links

## Testing
- Update test mocks to include history and url attributes for redirect checking
- Fix .well-known case sensitivity tests (must be lowercase per RFC 8615)
- Update discovery priority tests to match new order
- Remove tests for deprecated file variants

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 .../progress/components/CrawlingProgress.tsx  |   61 +-
 .../src/features/progress/types/progress.ts   |    4 +
 .../features/progress/utils/urlValidation.ts  |   50 +
 python/pyproject.toml                         |    1 +
 .../services/crawling/crawling_service.py     |   77 +-
 .../services/crawling/discovery_service.py    | 1028 ++++++++---------
 .../services/crawling/helpers/url_handler.py  |   16 +-
 python/tests/test_discovery_service.py        |   76 +-
 python/tests/test_url_handler.py              |   25 +-
 python/uv.lock                                |   29 +
 10 files changed, 714 insertions(+), 653 deletions(-)
 create mode 100644 archon-ui-main/src/features/progress/utils/urlValidation.ts

diff --git a/archon-ui-main/src/features/progress/components/CrawlingProgress.tsx b/archon-ui-main/src/features/progress/components/CrawlingProgress.tsx
index 7e5f6308..8d274355 100644
--- a/archon-ui-main/src/features/progress/components/CrawlingProgress.tsx
+++ b/archon-ui-main/src/features/progress/components/CrawlingProgress.tsx
@@ -12,6 +12,7 @@ import { Button } from "../../ui/primitives";
 import { cn } from "../../ui/primitives/styles";
 import { useCrawlProgressPolling } from "../hooks";
 import type { ActiveOperation } from "../types/progress";
+import { isValidHttpUrl } from "../utils/urlValidation";

 interface CrawlingProgressProps {
   onSwitchToBrowse: () => void;
@@ -247,45 +248,57 @@ export
const CrawlingProgress: React.FC = ({ onSwitchToBr {/* Discovery Information */} - {(operation as any).discovered_file && ( + {operation.discovered_file && (
Discovery Result - {(operation as any).discovered_file_type && ( + {operation.discovered_file_type && ( - {(operation as any).discovered_file_type} + {operation.discovered_file_type} )}
- - {(operation as any).discovered_file} - + {isValidHttpUrl(operation.discovered_file) ? ( + + {operation.discovered_file} + + ) : ( + + {operation.discovered_file} + + )}
)} {/* Linked Files */} - {(operation as any).linked_files && (operation as any).linked_files.length > 0 && ( + {operation.linked_files && operation.linked_files.length > 0 && (
- Following {(operation as any).linked_files.length} Linked File - {(operation as any).linked_files.length > 1 ? "s" : ""} + Following {operation.linked_files.length} Linked File + {operation.linked_files.length > 1 ? "s" : ""}
- {(operation as any).linked_files.map((file: string, idx: number) => ( - - • {file} - + {operation.linked_files.map((file: string, idx: number) => ( + isValidHttpUrl(file) ? ( + + • {file} + + ) : ( + + • {file} + + ) ))}
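The hunks above all follow one pattern: every discovered-file or linked-file URL is passed through isValidHttpUrl, and only validated http/https URLs are rendered as anchors, while anything else falls back to plain text. A minimal TypeScript sketch of that pattern, assuming a hypothetical DiscoveredFileLink helper (the component name, props, and markup below are illustrative and not part of the patch):

```tsx
import type { FC } from "react";

import { isValidHttpUrl } from "../utils/urlValidation";

interface DiscoveredFileLinkProps {
  url: string;
}

// Illustrative only: render a clickable link for validated http/https URLs,
// and plain text for anything that fails validation (javascript:, data:, file:, ...).
const DiscoveredFileLink: FC<DiscoveredFileLinkProps> = ({ url }) => {
  if (isValidHttpUrl(url)) {
    return (
      <a href={url} target="_blank" rel="noopener noreferrer">
        {url}
      </a>
    );
  }
  return <span>{url}</span>;
};

export default DiscoveredFileLink;
```

In the patch itself the same ternary is applied inline to operation.discovered_file and to each entry of operation.linked_files rather than being factored into a separate component.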
diff --git a/archon-ui-main/src/features/progress/types/progress.ts b/archon-ui-main/src/features/progress/types/progress.ts index 74cbc5b8..c57426b9 100644 --- a/archon-ui-main/src/features/progress/types/progress.ts +++ b/archon-ui-main/src/features/progress/types/progress.ts @@ -114,6 +114,10 @@ export interface ActiveOperation { code_examples_found?: number; current_operation?: string; }; + // Discovery information + discovered_file?: string; + discovered_file_type?: string; + linked_files?: string[]; } export interface ActiveOperationsResponse { diff --git a/archon-ui-main/src/features/progress/utils/urlValidation.ts b/archon-ui-main/src/features/progress/utils/urlValidation.ts new file mode 100644 index 00000000..6a8d8564 --- /dev/null +++ b/archon-ui-main/src/features/progress/utils/urlValidation.ts @@ -0,0 +1,50 @@ +/** + * Client-side URL validation utility for discovered files. + * Ensures only safe HTTP/HTTPS URLs are rendered as clickable links. + */ + +const SAFE_PROTOCOLS = ["http:", "https:"]; +const UNSAFE_PROTOCOLS = ["javascript:", "data:", "vbscript:", "file:"]; + +/** + * Validates that a URL is safe to render as a clickable link. + * Only allows http: and https: protocols. + * + * @param url - URL string to validate + * @returns true if URL is safe (http/https), false otherwise + */ +export function isValidHttpUrl(url: string | undefined | null): boolean { + if (!url || typeof url !== "string") { + return false; + } + + // Trim whitespace + const trimmed = url.trim(); + if (!trimmed) { + return false; + } + + try { + const parsed = new URL(trimmed); + + // Only allow http and https protocols + if (!SAFE_PROTOCOLS.includes(parsed.protocol)) { + return false; + } + + // Explicitly reject known unsafe protocols + if (UNSAFE_PROTOCOLS.some((unsafe) => trimmed.toLowerCase().startsWith(unsafe))) { + return false; + } + + // Basic hostname validation (must have at least one dot or be localhost) + if (!parsed.hostname.includes(".") && parsed.hostname !== "localhost") { + return false; + } + + return true; + } catch { + // URL parsing failed - not a valid URL + return false; + } +} diff --git a/python/pyproject.toml b/python/pyproject.toml index 2c036d34..ff4cf4ee 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -59,6 +59,7 @@ server = [ "pydantic>=2.0.0", "python-dotenv>=1.0.0", "docker>=6.1.0", + "tldextract>=5.0.0", # Logging "logfire>=0.30.0", # Testing (needed for UI-triggered tests) diff --git a/python/src/server/services/crawling/crawling_service.py b/python/src/server/services/crawling/crawling_service.py index c11a6312..01122704 100644 --- a/python/src/server/services/crawling/crawling_service.py +++ b/python/src/server/services/crawling/crawling_service.py @@ -11,6 +11,8 @@ import uuid from collections.abc import Awaitable, Callable from typing import Any, Optional +import tldextract + from ...config.logfire_config import get_logger, safe_logfire_error, safe_logfire_info from ...utils import get_supabase_client from ...utils.progress.progress_tracker import ProgressTracker @@ -38,6 +40,34 @@ _active_orchestrations: dict[str, "CrawlingService"] = {} _orchestration_lock: asyncio.Lock | None = None +def get_root_domain(host: str) -> str: + """ + Extract the root domain from a hostname using tldextract. + Handles multi-part public suffixes correctly (e.g., .co.uk, .com.au). 
+ + Args: + host: Hostname to extract root domain from + + Returns: + Root domain (domain + suffix) or original host if extraction fails + + Examples: + - "docs.example.com" -> "example.com" + - "api.example.co.uk" -> "example.co.uk" + - "localhost" -> "localhost" + """ + try: + extracted = tldextract.extract(host) + # Return domain.suffix if both are present + if extracted.domain and extracted.suffix: + return f"{extracted.domain}.{extracted.suffix}" + # Fallback to original host if extraction yields no domain or suffix + return host + except Exception: + # If extraction fails, return original host + return host + + def _ensure_orchestration_lock() -> asyncio.Lock: global _orchestration_lock if _orchestration_lock is None: @@ -771,14 +801,7 @@ class CrawlingService: if url_host == base_host: return True - # Check if url_host is a subdomain of base_host - # Extract root domain (last 2 parts for .com, .org, etc.) - def get_root_domain(host: str) -> str: - parts = host.split('.') - if len(parts) >= 2: - return '.'.join(parts[-2:]) - return host - + # Check if url_host is a subdomain of base_host using tldextract url_root = get_root_domain(url_host) base_root = get_root_domain(base_host) @@ -865,51 +888,49 @@ class CrawlingService: is_llms_file = self.url_handler.is_llms_variant(url) if is_llms_file: - logger.info(f"Discovery llms.txt mode: checking for linked llms.txt files at {url}") + logger.info(f"Discovery llms.txt mode: following ALL same-domain links from {url}") # Extract all links from the file extracted_links_with_text = self.url_handler.extract_markdown_links_with_text(content, url) - # Filter for llms.txt files only on same domain - llms_links = [] + # Filter for same-domain links (all types, not just llms.txt) + same_domain_links = [] if extracted_links_with_text: original_domain = request.get("original_domain") if original_domain: for link, text in extracted_links_with_text: - # Check if link is to another llms.txt file - if self.url_handler.is_llms_variant(link): - # Check same domain/subdomain - if self._is_same_domain_or_subdomain(link, original_domain): - llms_links.append((link, text)) - logger.info(f"Found linked llms.txt: {link}") + # Check same domain/subdomain for ALL links + if self._is_same_domain_or_subdomain(link, original_domain): + same_domain_links.append((link, text)) + logger.debug(f"Found same-domain link: {link}") - if llms_links: + if same_domain_links: # Build mapping and extract just URLs - url_to_link_text = dict(llms_links) - extracted_llms_urls = [link for link, _ in llms_links] + url_to_link_text = dict(same_domain_links) + extracted_urls = [link for link, _ in same_domain_links] - logger.info(f"Following {len(extracted_llms_urls)} linked llms.txt files") + logger.info(f"Following {len(extracted_urls)} same-domain links from llms.txt") # Notify user about linked files being crawled await update_crawl_progress( 60, # 60% of crawling stage - f"Found {len(extracted_llms_urls)} linked llms.txt files, crawling them now...", + f"Found {len(extracted_urls)} links in llms.txt, crawling them now...", crawl_type="llms_txt_linked_files", - linked_files=extracted_llms_urls + linked_files=extracted_urls ) - # Crawl linked llms.txt files (no recursion, just one level) + # Crawl all same-domain links from llms.txt (no recursion, just one level) batch_results = await self.crawl_batch_with_progress( - extracted_llms_urls, + extracted_urls, max_concurrent=request.get('max_concurrent'), progress_callback=await self._create_crawl_progress_callback("crawling"), 
link_text_fallbacks=url_to_link_text, ) - # Combine original llms.txt with linked files + # Combine original llms.txt with linked pages crawl_results.extend(batch_results) - crawl_type = "llms_txt_with_linked_files" - logger.info(f"llms.txt crawling completed: {len(crawl_results)} total files (1 main + {len(batch_results)} linked)") + crawl_type = "llms_txt_with_linked_pages" + logger.info(f"llms.txt crawling completed: {len(crawl_results)} total pages (1 llms.txt + {len(batch_results)} linked pages)") return crawl_results, crawl_type # For non-llms.txt discovery targets (sitemaps, robots.txt), keep single-file mode diff --git a/python/src/server/services/crawling/discovery_service.py b/python/src/server/services/crawling/discovery_service.py index 203b67df..103a2772 100644 --- a/python/src/server/services/crawling/discovery_service.py +++ b/python/src/server/services/crawling/discovery_service.py @@ -5,7 +5,10 @@ Handles automatic discovery and parsing of llms.txt, sitemap.xml, and related fi to enhance crawling capabilities with priority-based discovery methods. """ -from urllib.parse import urljoin +import ipaddress +import socket +from html.parser import HTMLParser +from urllib.parse import urljoin, urlparse import requests @@ -14,6 +17,36 @@ from ...config.logfire_config import get_logger logger = get_logger(__name__) +class SitemapHTMLParser(HTMLParser): + """HTML parser for extracting sitemap references from link and meta tags.""" + + def __init__(self): + super().__init__() + self.sitemaps = [] + + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]): + """Handle start tags to find sitemap references.""" + attrs_dict = {k.lower(): v for k, v in attrs if v is not None} + + # Check + if tag == 'link': + rel = attrs_dict.get('rel', '').lower() + # Handle multi-valued rel attributes (space-separated) + rel_values = rel.split() if rel else [] + if 'sitemap' in rel_values: + href = attrs_dict.get('href') + if href: + self.sitemaps.append(('link', href)) + + # Check + elif tag == 'meta': + name = attrs_dict.get('name', '').lower() + if name == 'sitemap': + content = attrs_dict.get('content') + if content: + self.sitemaps.append(('meta', content)) + + class DiscoveryService: """Service for discovering related files automatically during crawls.""" @@ -21,54 +54,460 @@ class DiscoveryService: MAX_RESPONSE_SIZE = 10 * 1024 * 1024 # 10 MB # Global priority order - select ONE best file from all categories - # All these files contain similar AI/crawling guidance content + # Based on actual usage research - only includes files commonly found in the wild DISCOVERY_PRIORITY = [ # LLMs files (highest priority - most comprehensive AI guidance) - "llms-full.txt", - "llms.txt", - "llms.md", - "llms.mdx", - "llms.markdown", - + "llms.txt", # Standard llms.txt spec - widely adopted + "llms-full.txt", # Part of llms.txt spec - comprehensive content # Sitemap files (structural crawling guidance) - "sitemap_index.xml", - "sitemap-index.xml", - "sitemap.xml", - + "sitemap.xml", # Universal standard for site structure # Robots file (basic crawling rules) - "robots.txt", - - # Well-known variants (alternative locations) + "robots.txt", # Universal standard for crawl directives + # Well-known variants (alternative locations per RFC 8615) ".well-known/ai.txt", ".well-known/llms.txt", ".well-known/sitemap.xml" ] - # Categorized discovery targets for helper methods - # Maintains the same order and values as DISCOVERY_PRIORITY - DISCOVERY_TARGETS = { - "llms_files": [ - "llms-full.txt", - 
"llms.txt", - "llms.md", - "llms.mdx", - "llms.markdown", - ], - "sitemap_files": [ - "sitemap_index.xml", - "sitemap-index.xml", - "sitemap.xml", - ], - "robots_files": [ - "robots.txt", - ], - "well_known_files": [ - ".well-known/ai.txt", - ".well-known/llms.txt", - ".well-known/sitemap.xml", - ], + # Known file extensions for path detection + FILE_EXTENSIONS = { + '.html', '.htm', '.xml', '.json', '.txt', '.md', '.csv', + '.rss', '.yaml', '.yml', '.pdf', '.zip' } + def discover_files(self, base_url: str) -> str | None: + """ + Main discovery orchestrator - selects ONE best file across all categories. + All files contain similar AI/crawling guidance, so we only need the best one. + + Args: + base_url: Base URL to discover files for + + Returns: + Single best URL found, or None if no files discovered + """ + try: + logger.info(f"Starting single-file discovery for {base_url}") + + # Extract directory path from base URL + base_dir = self._extract_directory(base_url) + + # Try each file in priority order + for filename in self.DISCOVERY_PRIORITY: + discovered_url = self._try_locations(base_url, base_dir, filename) + if discovered_url: + logger.info(f"Discovery found best file: {discovered_url}") + return discovered_url + + # Fallback: Check HTML meta tags for sitemap references + html_sitemaps = self._parse_html_meta_tags(base_url) + if html_sitemaps: + best_file = html_sitemaps[0] + logger.info(f"Discovery found best file from HTML meta tags: {best_file}") + return best_file + + logger.info(f"Discovery completed for {base_url}: no files found") + return None + + except Exception: + logger.exception(f"Unexpected error during discovery for {base_url}") + return None + + def _extract_directory(self, base_url: str) -> str: + """ + Extract directory path from URL, handling both file URLs and directory URLs. + + Args: + base_url: URL to extract directory from + + Returns: + Directory path (without trailing slash) + """ + parsed = urlparse(base_url) + base_path = parsed.path.rstrip('/') + + # Check if last segment is a file (has known extension) + last_segment = base_path.split('/')[-1] if base_path else '' + has_file_extension = any(last_segment.lower().endswith(ext) for ext in self.FILE_EXTENSIONS) + + if has_file_extension: + # Remove filename to get directory + return '/'.join(base_path.split('/')[:-1]) + else: + # Last segment is a directory + return base_path + + def _try_locations(self, base_url: str, base_dir: str, filename: str) -> str | None: + """ + Try different locations for a given filename in priority order. + + Priority: + 1. Same directory as base_url (if not root) + 2. Root level + 3. 
Common subdirectories (based on file type)
+
+        Args:
+            base_url: Original base URL
+            base_dir: Extracted directory path
+            filename: Filename to search for
+
+        Returns:
+            URL if file found, None otherwise
+        """
+        parsed = urlparse(base_url)
+
+        # Priority 1: Check same directory (if not root)
+        if base_dir and base_dir != '/':
+            same_dir_url = f"{parsed.scheme}://{parsed.netloc}{base_dir}/{filename}"
+            if self._check_url_exists(same_dir_url):
+                return same_dir_url
+
+        # Priority 2: Check root level
+        root_url = urljoin(base_url, filename)
+        if self._check_url_exists(root_url):
+            return root_url
+
+        # Priority 3: Check common subdirectories
+        subdirs = self._get_subdirs_for_file(base_dir, filename)
+        for subdir in subdirs:
+            subdir_url = urljoin(base_url, f"{subdir}/{filename}")
+            if self._check_url_exists(subdir_url):
+                return subdir_url
+
+        return None
+
+    def _get_subdirs_for_file(self, base_dir: str, filename: str) -> list[str]:
+        """
+        Get relevant subdirectories to check based on file type.
+
+        Args:
+            base_dir: Base directory path
+            filename: Filename being searched for
+
+        Returns:
+            List of subdirectory names to check
+        """
+        subdirs = []
+
+        # Include base directory name if available
+        if base_dir and base_dir != '/':
+            base_dir_name = base_dir.split('/')[-1]
+            if base_dir_name:
+                subdirs.append(base_dir_name)
+
+        # Add type-specific subdirectories
+        if filename.startswith('llms') or filename.endswith('.txt') or filename.endswith('.md'):
+            # LLMs files commonly in these locations
+            subdirs.extend(["docs", "static", "public", "assets", "doc", "api"])
+        elif filename.endswith('.xml') and not filename.startswith('.well-known'):
+            # Sitemap files commonly in these locations
+            subdirs.extend(["docs", "sitemaps", "sitemap", "xml", "feed"])
+
+        return subdirs
+
+    def _is_safe_ip(self, ip_str: str) -> bool:
+        """
+        Check if an IP address is safe (not private, loopback, link-local, or cloud metadata).
+
+        Args:
+            ip_str: IP address string to check
+
+        Returns:
+            True if IP is safe for outbound requests, False otherwise
+        """
+        try:
+            ip = ipaddress.ip_address(ip_str)
+
+            # Block private networks
+            if ip.is_private:
+                logger.warning(f"Blocked private IP address: {ip_str}")
+                return False
+
+            # Block loopback (127.0.0.0/8, ::1)
+            if ip.is_loopback:
+                logger.warning(f"Blocked loopback IP address: {ip_str}")
+                return False
+
+            # Block link-local (169.254.0.0/16, fe80::/10)
+            if ip.is_link_local:
+                logger.warning(f"Blocked link-local IP address: {ip_str}")
+                return False
+
+            # Block multicast
+            if ip.is_multicast:
+                logger.warning(f"Blocked multicast IP address: {ip_str}")
+                return False
+
+            # Block reserved ranges
+            if ip.is_reserved:
+                logger.warning(f"Blocked reserved IP address: {ip_str}")
+                return False
+
+            # Additional explicit check for cloud metadata services
+            # (AWS and GCP both expose metadata at 169.254.169.254)
+            if str(ip) == "169.254.169.254":
+                logger.warning(f"Blocked cloud metadata service IP: {ip_str}")
+                return False
+
+            return True
+
+        except ValueError:
+            logger.warning(f"Invalid IP address format: {ip_str}")
+            return False
+
+    def _resolve_and_validate_hostname(self, hostname: str) -> bool:
+        """
+        Resolve hostname to IP and validate it's safe.
+ + Args: + hostname: Hostname to resolve and validate + + Returns: + True if hostname resolves to safe IPs only, False otherwise + """ + try: + # Resolve hostname to IP addresses + addr_info = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM) + + # Check all resolved IPs + for info in addr_info: + ip_str = info[4][0] + if not self._is_safe_ip(ip_str): + logger.warning(f"Hostname {hostname} resolves to unsafe IP {ip_str}") + return False + + return True + + except socket.gaierror as e: + logger.warning(f"DNS resolution failed for {hostname}: {e}") + return False + except Exception as e: + logger.warning(f"Error resolving hostname {hostname}: {e}") + return False + + def _check_url_exists(self, url: str) -> bool: + """ + Check if a URL exists and returns a successful response. + Includes SSRF protection by validating hostnames and blocking private IPs. + + Args: + url: URL to check + + Returns: + True if URL returns 200, False otherwise + """ + try: + # Parse URL to extract hostname + parsed = urlparse(url) + if not parsed.scheme or not parsed.netloc: + logger.warning(f"Invalid URL format: {url}") + return False + + # Only allow HTTP/HTTPS + if parsed.scheme not in ('http', 'https'): + logger.warning(f"Blocked non-HTTP(S) scheme: {parsed.scheme}") + return False + + # Validate initial hostname + hostname = parsed.netloc.split(':')[0] # Remove port if present + if not self._resolve_and_validate_hostname(hostname): + logger.warning(f"URL check blocked due to unsafe hostname: {url}") + return False + + # Set safe User-Agent header + headers = { + 'User-Agent': 'Archon-Discovery/1.0 (SSRF-Protected)' + } + + # Create a session with limited redirects + session = requests.Session() + session.max_redirects = 3 + + # Make request with redirect validation + resp = session.get( + url, + timeout=5, + allow_redirects=True, + verify=True, + headers=headers + ) + + try: + # Check if there were redirects (history attribute exists on real responses) + if hasattr(resp, 'history') and resp.history: + logger.debug(f"URL {url} had {len(resp.history)} redirect(s)") + + # Validate final destination + final_url = resp.url + final_parsed = urlparse(final_url) + + # Only allow HTTP/HTTPS for final destination + if final_parsed.scheme not in ('http', 'https'): + logger.warning(f"Blocked redirect to non-HTTP(S) scheme: {final_parsed.scheme}") + return False + + # Validate final hostname + final_hostname = final_parsed.netloc.split(':')[0] + if not self._resolve_and_validate_hostname(final_hostname): + logger.warning(f"Redirect target blocked due to unsafe hostname: {final_url}") + return False + + # Check response status + success = resp.status_code == 200 + logger.debug(f"URL check: {url} -> {resp.status_code} ({'exists' if success else 'not found'})") + return success + + finally: + if hasattr(resp, 'close'): + resp.close() + + except requests.exceptions.TooManyRedirects: + logger.warning(f"Too many redirects for URL: {url}") + return False + except requests.exceptions.Timeout: + logger.debug(f"Timeout checking URL: {url}") + return False + except requests.exceptions.RequestException as e: + logger.debug(f"Request error checking URL {url}: {e}") + return False + except Exception as e: + logger.warning(f"Unexpected error checking URL {url}: {e}", exc_info=True) + return False + + def _parse_robots_txt(self, base_url: str) -> list[str]: + """ + Extract sitemap URLs from robots.txt. 
+ + Args: + base_url: Base URL to check robots.txt for + + Returns: + List of sitemap URLs found in robots.txt + """ + sitemaps: list[str] = [] + + try: + robots_url = urljoin(base_url, "robots.txt") + logger.info(f"Checking robots.txt at {robots_url}") + + # Set safe User-Agent header + headers = { + 'User-Agent': 'Archon-Discovery/1.0 (SSRF-Protected)' + } + + resp = requests.get(robots_url, timeout=30, stream=True, verify=True, headers=headers) + + try: + if resp.status_code != 200: + logger.info(f"No robots.txt found: HTTP {resp.status_code}") + return sitemaps + + # Read response with size limit + content = self._read_response_with_limit(resp, robots_url) + + # Parse robots.txt content for sitemap directives + for raw_line in content.splitlines(): + line = raw_line.strip() + if line.lower().startswith("sitemap:"): + sitemap_value = line.split(":", 1)[1].strip() + if sitemap_value: + # Allow absolute and relative sitemap values + if sitemap_value.lower().startswith(("http://", "https://")): + sitemap_url = sitemap_value + else: + # Resolve relative path against base_url + sitemap_url = urljoin(base_url, sitemap_value) + + # Validate scheme is HTTP/HTTPS only + parsed = urlparse(sitemap_url) + if parsed.scheme not in ("http", "https"): + logger.warning(f"Skipping non-HTTP(S) sitemap in robots.txt: {sitemap_url}") + continue + + sitemaps.append(sitemap_url) + logger.info(f"Found sitemap in robots.txt: {sitemap_url}") + + finally: + resp.close() + + except requests.exceptions.RequestException: + logger.exception(f"Network error fetching robots.txt from {base_url}") + except ValueError as e: + logger.warning(f"robots.txt too large at {base_url}: {e}") + except Exception: + logger.exception(f"Unexpected error parsing robots.txt from {base_url}") + + return sitemaps + + def _parse_html_meta_tags(self, base_url: str) -> list[str]: + """ + Extract sitemap references from HTML meta tags using proper HTML parsing. 
+ + Args: + base_url: Base URL to check HTML for meta tags + + Returns: + List of sitemap URLs found in HTML meta tags + """ + sitemaps: list[str] = [] + + try: + logger.info(f"Checking HTML meta tags for sitemaps at {base_url}") + + # Set safe User-Agent header + headers = { + 'User-Agent': 'Archon-Discovery/1.0 (SSRF-Protected)' + } + + resp = requests.get(base_url, timeout=30, stream=True, verify=True, headers=headers) + + try: + if resp.status_code != 200: + logger.debug(f"Could not fetch HTML for meta tag parsing: HTTP {resp.status_code}") + return sitemaps + + # Read response with size limit + content = self._read_response_with_limit(resp, base_url) + + # Parse HTML using proper HTML parser + parser = SitemapHTMLParser() + try: + parser.feed(content) + except Exception as e: + logger.warning(f"HTML parsing error for {base_url}: {e}") + return sitemaps + + # Process found sitemaps + for tag_type, url in parser.sitemaps: + # Resolve relative URLs + sitemap_url = urljoin(base_url, url.strip()) + + # Validate scheme is HTTP/HTTPS + parsed = urlparse(sitemap_url) + if parsed.scheme not in ("http", "https"): + logger.debug(f"Skipping non-HTTP(S) sitemap URL: {sitemap_url}") + continue + + sitemaps.append(sitemap_url) + logger.info(f"Found sitemap in HTML {tag_type} tag: {sitemap_url}") + + finally: + resp.close() + + except requests.exceptions.RequestException: + logger.exception(f"Network error fetching HTML from {base_url}") + except ValueError as e: + logger.warning(f"HTML response too large at {base_url}: {e}") + except Exception: + logger.exception(f"Unexpected error parsing HTML meta tags from {base_url}") + + return sitemaps + def _read_response_with_limit(self, response: requests.Response, url: str, max_size: int | None = None) -> str: """ Read response content with size limit to prevent memory exhaustion. @@ -107,7 +546,6 @@ class DiscoveryService: # Decode the complete response content_bytes = b''.join(chunks) - # Try to decode with the response encoding or fall back to utf-8 encoding = response.encoding or 'utf-8' try: return content_bytes.decode(encoding) @@ -116,517 +554,5 @@ class DiscoveryService: return content_bytes.decode('utf-8', errors='replace') except Exception: - # Ensure response is closed on any error response.close() raise - - def discover_files(self, base_url: str) -> str | None: - """ - Main discovery orchestrator - selects ONE best file across all categories. - All files contain similar AI/crawling guidance, so we only need the best one. 
- - Args: - base_url: Base URL to discover files for - - Returns: - Single best URL found, or None if no files discovered - """ - try: - logger.info(f"Starting single-file discovery for {base_url}") - - # Check files in global priority order - # IMPORTANT: Check root-level llms files BEFORE same-directory sitemaps - # This ensures llms.txt at root is preferred over /docs/sitemap.xml - from urllib.parse import urlparse - - # Get the directory path of the base URL - parsed = urlparse(base_url) - base_path = parsed.path.rstrip('/') - - # Known file extensions - only treat as file if last segment has one of these - FILE_EXTENSIONS = { - '.html', '.htm', '.xml', '.json', '.txt', '.md', '.csv', - '.rss', '.yaml', '.yml', '.pdf', '.zip' - } - - # Extract directory (remove filename if present) - last_segment = base_path.split('/')[-1] if base_path else '' - # Check if the last segment ends with a known file extension - has_file_extension = any(last_segment.lower().endswith(ext) for ext in FILE_EXTENSIONS) - - if has_file_extension: - # Last segment is a file, strip it to get directory - base_dir = '/'.join(base_path.split('/')[:-1]) - else: - # Last segment is a directory (e.g., /docs.v2) - base_dir = base_path - - # Phase 1: Check llms files at ALL priority levels before checking sitemaps - for filename in self.DISCOVERY_PRIORITY: - if not filename.startswith('llms') and not filename.startswith('.well-known/llms') and not filename.startswith('.well-known/ai'): - continue # Skip non-llms files in this phase - - # Priority 1a: Check same directory for llms files - if base_dir and base_dir != '/': - same_dir_url = f"{parsed.scheme}://{parsed.netloc}{base_dir}/{filename}" - if self._check_url_exists(same_dir_url): - logger.info(f"Discovery found best file in same directory: {same_dir_url}") - return same_dir_url - - # Priority 1b: Check root-level for llms files - file_url = urljoin(base_url, filename) - if self._check_url_exists(file_url): - logger.info(f"Discovery found best file at root: {file_url}") - return file_url - - # Priority 1c: Check subdirectories for llms files - subdirs = [] - if base_dir and base_dir != '/': - base_dir_name = base_dir.split('/')[-1] - if base_dir_name: - subdirs.append(base_dir_name) - subdirs.extend(["docs", "static", "public", "assets", "doc", "api"]) - - for subdir in subdirs: - subdir_url = urljoin(base_url, f"{subdir}/{filename}") - if self._check_url_exists(subdir_url): - logger.info(f"Discovery found best file in subdirectory: {subdir_url}") - return subdir_url - - # Phase 2: Check sitemaps and robots.txt (only if no llms files found) - for filename in self.DISCOVERY_PRIORITY: - if filename.startswith('llms') or filename.startswith('.well-known/llms') or filename.startswith('.well-known/ai'): - continue # Skip llms files, already checked - - # Priority 2a: Check same directory - if base_dir and base_dir != '/': - same_dir_url = f"{parsed.scheme}://{parsed.netloc}{base_dir}/{filename}" - if self._check_url_exists(same_dir_url): - logger.info(f"Discovery found best file in same directory: {same_dir_url}") - return same_dir_url - - # Priority 2b: Check root-level - file_url = urljoin(base_url, filename) - if self._check_url_exists(file_url): - logger.info(f"Discovery found best file at root: {file_url}") - return file_url - - # Priority 2c: For sitemap files, check common subdirectories - if filename.endswith('.xml') and not filename.startswith('.well-known'): - subdirs = [] - if base_dir and base_dir != '/': - base_dir_name = base_dir.split('/')[-1] - if 
base_dir_name: - subdirs.append(base_dir_name) - subdirs.extend(["docs", "sitemaps", "sitemap", "xml", "feed"]) - - for subdir in subdirs: - subdir_url = urljoin(base_url, f"{subdir}/{filename}") - if self._check_url_exists(subdir_url): - logger.info(f"Discovery found best file in subdirectory: {subdir_url}") - return subdir_url - - # Check HTML meta tags for sitemap references as final fallback - html_sitemaps = self._parse_html_meta_tags(base_url) - if html_sitemaps: - best_file = html_sitemaps[0] - logger.info(f"Discovery found best file from HTML meta tags: {best_file}") - return best_file - - logger.info(f"Discovery completed for {base_url}: no files found") - return None - - except Exception: - logger.exception(f"Unexpected error during discovery for {base_url}") - return None - - def _discover_best_sitemap(self, base_url: str) -> str | None: - """ - Discover the best available sitemap using priority-based selection. - - Priority order: - 1. Sitemaps from robots.txt (highest priority - explicitly declared) - 2. Standard locations (sitemap_index.xml > sitemap-index.xml > sitemap.xml) - 3. Common subdirectory variations - 4. HTML meta tag references - 5. .well-known directory - """ - try: - # Priority 1: Check robots.txt for sitemap declarations - robots_sitemaps = self._parse_robots_txt(base_url) - if robots_sitemaps: - return robots_sitemaps[0] # Use first sitemap from robots.txt - - # Priority 2: Check standard locations in priority order - for filename in self.DISCOVERY_TARGETS["sitemap_files"]: - sitemap_url = urljoin(base_url, filename) - if self._check_url_exists(sitemap_url): - return sitemap_url - - # Priority 3: Check common subdirectory variations - subdirs = ["sitemaps", "sitemap", "xml", "feed"] - for subdir in subdirs: - for filename in self.DISCOVERY_TARGETS["sitemap_files"]: - sitemap_url = urljoin(base_url, f"{subdir}/{filename}") - if self._check_url_exists(sitemap_url): - return sitemap_url - - # Priority 4: Check HTML meta tag references - html_sitemaps = self._parse_html_meta_tags(base_url) - if html_sitemaps: - return html_sitemaps[0] # Use first sitemap from HTML - - # Priority 5: Check .well-known directory - well_known_sitemap = urljoin(base_url, ".well-known/sitemap.xml") - if self._check_url_exists(well_known_sitemap): - return well_known_sitemap - - except Exception: - logger.exception(f"Error discovering best sitemap for {base_url}") - - return None - - def _discover_best_llms_file(self, base_url: str) -> str | None: - """ - Discover the best available llms file using priority-based selection. - - Priority order: - 1. Standard locations (llms-full.txt > llms.txt > llms.md > llms.mdx > llms.markdown) - 2. Common subdirectory variations (static, public, docs, assets) - 3. 
.well-known directory variants - """ - try: - # Priority 1: Check standard root locations in priority order - for filename in self.DISCOVERY_TARGETS["llms_files"]: - llms_url = urljoin(base_url, filename) - if self._check_url_exists(llms_url): - return llms_url - - # Priority 2: Check common subdirectory variations - subdirs = ["static", "public", "docs", "assets", "doc", "api"] - for subdir in subdirs: - for filename in self.DISCOVERY_TARGETS["llms_files"]: - llms_url = urljoin(base_url, f"{subdir}/{filename}") - if self._check_url_exists(llms_url): - return llms_url - - # Priority 3: Check .well-known directory variants - for well_known_file in [".well-known/ai.txt", ".well-known/llms.txt"]: - well_known_url = urljoin(base_url, well_known_file) - if self._check_url_exists(well_known_url): - return well_known_url - - except Exception: - logger.exception(f"Error discovering best llms file for {base_url}") - - return None - - def _discover_robots_file(self, base_url: str) -> str | None: - """ - Discover robots.txt file (always single file at root). - """ - try: - robots_url = urljoin(base_url, "robots.txt") - if self._check_url_exists(robots_url): - return robots_url - except Exception: - logger.exception(f"Error discovering robots file for {base_url}") - - return None - - def _check_url_exists(self, url: str) -> bool: - """ - Check if a URL exists and returns a successful response. - """ - try: - resp = requests.get(url, timeout=5, allow_redirects=True, verify=True) - success = resp.status_code == 200 - logger.debug(f"URL check: {url} -> {resp.status_code} ({'exists' if success else 'not found'})") - resp.close() - return success - except Exception as e: - logger.debug(f"URL check failed: {url} -> {e}") - return False - - def _parse_robots_txt(self, base_url: str) -> list[str]: - """ - Extract sitemap URLs from robots.txt. 
- - Args: - base_url: Base URL to check robots.txt for - - Returns: - List of sitemap URLs found in robots.txt - """ - sitemaps: list[str] = [] - - try: - # Use robots.txt relative to the given URL, not always root - robots_url = urljoin(base_url, "robots.txt") - logger.info(f"Checking robots.txt at {robots_url}") - - resp = requests.get(robots_url, timeout=30, stream=True, verify=True) - - try: - if resp.status_code != 200: - logger.info(f"No robots.txt found: HTTP {resp.status_code}") - return sitemaps - - # Read response with size limit - content = self._read_response_with_limit(resp, robots_url) - - # Parse robots.txt content for sitemap directives - for raw_line in content.splitlines(): - line = raw_line.strip() - if line.lower().startswith("sitemap:"): - sitemap_value = line.split(":", 1)[1].strip() - if sitemap_value: - # Allow absolute and relative sitemap values - if sitemap_value.lower().startswith(("http://", "https://")): - sitemap_url = sitemap_value - else: - # Resolve relative path against base_url - sitemap_url = urljoin(base_url, sitemap_value) - sitemaps.append(sitemap_url) - logger.info(f"Found sitemap in robots.txt: {sitemap_url}") - - finally: - # Ensure response is always closed - resp.close() - - except requests.exceptions.RequestException: - logger.exception(f"Network error fetching robots.txt from {base_url}") - except ValueError as e: - # Size limit exceeded - logger.warning(f"robots.txt too large at {base_url}: {e}") - except Exception: - logger.exception(f"Unexpected error parsing robots.txt from {base_url}") - - return sitemaps - - def _check_standard_patterns(self, base_url: str) -> dict[str, list[str]]: - """ - Check common file locations for discovery targets. - - Args: - base_url: Base URL to check standard locations for - - Returns: - Dictionary with file types and discovered URLs - """ - discovered: dict[str, list[str]] = { - "sitemaps": [], - "llms_files": [], - "robots_files": [] - } - - try: - # Check all discovery targets at standard locations - all_targets = [] - for target_type, files in self.DISCOVERY_TARGETS.items(): - if target_type != "well_known_files": # Skip well-known, handled separately - for filename in files: - all_targets.append((target_type, filename)) - - for target_type, filename in all_targets: - try: - file_url = urljoin(base_url, filename) - resp = requests.get(file_url, timeout=30, allow_redirects=True, stream=True, verify=True) - - try: - if resp.status_code == 200: - # Map target type to discovery category - if target_type == "sitemap_files": - discovered["sitemaps"].append(file_url) - elif target_type == "llms_files": - discovered["llms_files"].append(file_url) - elif target_type == "robots_files": - discovered["robots_files"].append(file_url) - - logger.info(f"Found {target_type} file: {file_url}") - - finally: - resp.close() - - except requests.exceptions.RequestException: - logger.debug(f"File not found or network error: {filename}") - except Exception: - logger.exception(f"Unexpected error checking {filename}") - - except Exception: - logger.exception(f"Unexpected error in standard pattern checking for {base_url}") - - return discovered - - def _parse_html_meta_tags(self, base_url: str) -> list[str]: - """ - Extract sitemap references from HTML meta tags. 
- - Args: - base_url: Base URL to check HTML for meta tags - - Returns: - List of sitemap URLs found in HTML meta tags - """ - sitemaps: list[str] = [] - - try: - logger.info(f"Checking HTML meta tags for sitemaps at {base_url}") - resp = requests.get(base_url, timeout=30, stream=True, verify=True) - - try: - if resp.status_code != 200: - logger.debug(f"Could not fetch HTML for meta tag parsing: HTTP {resp.status_code}") - return sitemaps - - # Read response with size limit - content = self._read_response_with_limit(resp, base_url) - - # Look for sitemap meta tags or link elements - import re - from urllib.parse import urlparse - - # Check for (case-insensitive) - sitemap_link_pattern = re.compile( - r']*rel=["\']sitemap["\'][^>]*href=["\']([^"\']+)["\']', - re.IGNORECASE - ) - matches = sitemap_link_pattern.findall(content) - - for match in matches: - sitemap_url = urljoin(base_url, match) - if urlparse(sitemap_url).scheme in ("http", "https"): - sitemaps.append(sitemap_url) - logger.info(f"Found sitemap in HTML link tag: {sitemap_url}") - - # Check for (case-insensitive) - sitemap_meta_pattern = re.compile( - r']*name=["\']sitemap["\'][^>]*content=["\']([^"\']+)["\']', - re.IGNORECASE - ) - matches = sitemap_meta_pattern.findall(content) - - for match in matches: - sitemap_url = urljoin(base_url, match) - if urlparse(sitemap_url).scheme in ("http", "https"): - sitemaps.append(sitemap_url) - logger.info(f"Found sitemap in HTML meta tag: {sitemap_url}") - - finally: - resp.close() - - except requests.exceptions.RequestException: - logger.exception(f"Network error fetching HTML from {base_url}") - except ValueError as e: - # Size limit exceeded - logger.warning(f"HTML response too large at {base_url}: {e}") - except Exception: - logger.exception(f"Unexpected error parsing HTML meta tags from {base_url}") - - return sitemaps - - def _check_well_known_directory(self, base_url: str) -> list[str]: - """ - Check .well-known/* files for discovery targets. - - Args: - base_url: Base URL to check .well-known directory for - - Returns: - List of URLs found in .well-known directory - """ - well_known_files: list[str] = [] - - try: - for filename in self.DISCOVERY_TARGETS["well_known_files"]: - try: - file_url = urljoin(base_url, filename) - resp = requests.get(file_url, timeout=30, allow_redirects=True, stream=True, verify=True) - - try: - if resp.status_code == 200: - well_known_files.append(file_url) - logger.info(f"Found .well-known file: {file_url}") - - finally: - resp.close() - - except requests.exceptions.RequestException: - logger.debug(f"Well-known file not found or network error: {filename}") - except Exception: - logger.exception(f"Unexpected error checking well-known file: {filename}") - - except Exception: - logger.exception(f"Unexpected error checking .well-known directory for {base_url}") - - return well_known_files - - def _try_common_variations(self, base_url: str) -> dict[str, list[str]]: - """ - Try pattern variations for discovery targets. 
- - Args: - base_url: Base URL to try variations for - - Returns: - Dictionary with file types and discovered variation URLs - """ - discovered: dict[str, list[str]] = { - "sitemaps": [], - "llms_files": [] - } - - try: - # Common subdirectories to check - subdirs = ["public", "static", "assets", "docs", "doc", "api"] - - # Try llms.txt variants in subdirectories - for subdir in subdirs: - for llms_file in self.DISCOVERY_TARGETS["llms_files"]: - try: - file_url = urljoin(base_url, f"{subdir}/{llms_file}") - resp = requests.get(file_url, timeout=30, allow_redirects=True, stream=True, verify=True) - - try: - if resp.status_code == 200: - discovered["llms_files"].append(file_url) - logger.info(f"Found llms file variant: {file_url}") - - finally: - resp.close() - - except requests.exceptions.RequestException: - logger.debug(f"Variant not found: {subdir}/{llms_file}") - except Exception: - logger.exception(f"Error checking variant: {subdir}/{llms_file}") - - # Try sitemap variants with different paths - sitemap_paths = [ - "sitemaps/sitemap.xml", - "sitemap/sitemap.xml", - "xml/sitemap.xml", - "feed/sitemap.xml" - ] - - for sitemap_path in sitemap_paths: - try: - file_url = urljoin(base_url, sitemap_path) - resp = requests.get(file_url, timeout=30, allow_redirects=True, stream=True, verify=True) - - try: - if resp.status_code == 200: - discovered["sitemaps"].append(file_url) - logger.info(f"Found sitemap variant: {file_url}") - - finally: - resp.close() - - except requests.exceptions.RequestException: - logger.debug(f"Sitemap variant not found: {sitemap_path}") - except Exception: - logger.exception(f"Error checking sitemap variant: {sitemap_path}") - - except Exception: - logger.exception(f"Unexpected error trying common variations for {base_url}") - - return discovered diff --git a/python/src/server/services/crawling/helpers/url_handler.py b/python/src/server/services/crawling/helpers/url_handler.py index ac8513fe..f243c2ab 100644 --- a/python/src/server/services/crawling/helpers/url_handler.py +++ b/python/src/server/services/crawling/helpers/url_handler.py @@ -405,13 +405,10 @@ class URLHandler: # Check for specific link collection filenames # Note: "full-*" or "*-full" patterns are NOT link collections - they contain complete content, not just links + # Only includes commonly used formats found in the wild link_collection_patterns = [ # .txt variants - files that typically contain lists of links 'llms.txt', 'links.txt', 'resources.txt', 'references.txt', - # .md/.mdx/.markdown variants - 'llms.md', 'links.md', 'resources.md', 'references.md', - 'llms.mdx', 'links.mdx', 'resources.mdx', 'references.mdx', - 'llms.markdown', 'links.markdown', 'resources.markdown', 'references.markdown', ] # Direct filename match @@ -421,7 +418,7 @@ class URLHandler: # Pattern-based detection for variations, but exclude "full" variants # Only match files that are likely link collections, not complete content files - if filename.endswith(('.txt', '.md', '.mdx', '.markdown')): + if filename.endswith('.txt'): # Exclude files with "full" as standalone token (avoid false positives like "helpful.md") import re if not re.search(r'(^|[._-])full([._-]|$)', filename): @@ -650,8 +647,8 @@ class URLHandler: path = parsed.path.lower() filename = path.split('/')[-1] if '/' in path else path - # Check for exact llms file variants (llms.txt, llms.md, etc.) 
- llms_variants = ['llms-full.txt', 'llms.txt', 'llms.md', 'llms.mdx', 'llms.markdown'] + # Check for exact llms file variants (only standard spec files) + llms_variants = ['llms.txt', 'llms-full.txt'] if filename in llms_variants: return True @@ -668,6 +665,7 @@ class URLHandler: def is_well_known_file(url: str) -> bool: """ Check if a URL is a .well-known/* file with error handling. + Per RFC 8615, the path is case-sensitive and must be lowercase. Args: url: URL to check @@ -677,8 +675,8 @@ class URLHandler: """ try: parsed = urlparse(url) - # Normalize to lowercase and ignore query/fragment - path = parsed.path.lower() + # RFC 8615: path segments are case-sensitive, must be lowercase + path = parsed.path # Only detect .well-known files at root level return path.startswith('/.well-known/') and path.count('/.well-known/') == 1 except Exception as e: diff --git a/python/tests/test_discovery_service.py b/python/tests/test_discovery_service.py index 9531946d..e7de6170 100644 --- a/python/tests/test_discovery_service.py +++ b/python/tests/test_discovery_service.py @@ -1,15 +1,26 @@ """Unit tests for DiscoveryService class.""" +import socket from unittest.mock import Mock, patch from src.server.services.crawling.discovery_service import DiscoveryService +def create_mock_dns_response(): + """Create mock DNS response for safe public IPs.""" + # Return a safe public IP for testing + return [ + (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 0)) # example.com's actual IP + ] + + def create_mock_response(status_code: int, text: str = "") -> Mock: """Create a mock response object that supports streaming API.""" response = Mock() response.status_code = status_code response.text = text response.encoding = 'utf-8' + response.history = [] # Empty list for no redirects + response.url = "" # Mock URL for redirect checks # Mock iter_content to yield text in chunks as bytes text_bytes = text.encode('utf-8') @@ -28,8 +39,9 @@ def create_mock_response(status_code: int, text: str = "") -> Mock: class TestDiscoveryService: """Test suite for DiscoveryService class.""" + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_discover_files_basic(self, mock_get): + def test_discover_files_basic(self, mock_get, mock_dns): """Test main discovery method returns single best file.""" service = DiscoveryService() base_url = "https://example.com" @@ -56,8 +68,9 @@ class TestDiscoveryService: assert isinstance(result, str) assert result == 'https://example.com/llms.txt' + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_discover_files_no_files_found(self, mock_get): + def test_discover_files_no_files_found(self, mock_get, mock_dns): """Test discovery when no files are found.""" service = DiscoveryService() base_url = "https://example.com" @@ -70,8 +83,9 @@ class TestDiscoveryService: # Should return None when no files found assert result is None + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_discover_files_priority_order(self, mock_get): + def test_discover_files_priority_order(self, mock_get, mock_dns): """Test that discovery follows the correct priority order.""" service = DiscoveryService() base_url = "https://example.com" @@ -95,8 +109,9 @@ class TestDiscoveryService: # Should return llms.txt since it has higher priority than sitemap.xml assert result == 'https://example.com/llms.txt' + @patch('socket.getaddrinfo', 
return_value=create_mock_dns_response()) @patch('requests.get') - def test_discover_files_robots_sitemap_priority(self, mock_get): + def test_discover_files_robots_sitemap_priority(self, mock_get, mock_dns): """Test that llms files have priority over robots.txt sitemap declarations.""" service = DiscoveryService() base_url = "https://example.com" @@ -121,8 +136,9 @@ class TestDiscoveryService: # even when sitemaps are declared in robots.txt assert result == 'https://example.com/llms-full.txt' + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_discover_files_subdirectory_fallback(self, mock_get): + def test_discover_files_subdirectory_fallback(self, mock_get, mock_dns): """Test discovery falls back to subdirectories for llms files.""" service = DiscoveryService() base_url = "https://example.com" @@ -146,8 +162,9 @@ class TestDiscoveryService: # Should find the file in static subdirectory assert result == 'https://example.com/static/llms.txt' + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_check_url_exists(self, mock_get): + def test_check_url_exists(self, mock_get, mock_dns): """Test URL existence checking.""" service = DiscoveryService() @@ -163,8 +180,9 @@ class TestDiscoveryService: mock_get.side_effect = Exception("Network error") assert service._check_url_exists("https://example.com/error") is False + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_parse_robots_txt_with_sitemap(self, mock_get): + def test_parse_robots_txt_with_sitemap(self, mock_get, mock_dns): """Test robots.txt parsing with sitemap directives.""" service = DiscoveryService() @@ -180,10 +198,11 @@ Sitemap: https://example.com/sitemap-news.xml""" assert len(result) == 2 assert "https://example.com/sitemap.xml" in result assert "https://example.com/sitemap-news.xml" in result - mock_get.assert_called_once_with("https://example.com/robots.txt", timeout=30, stream=True, verify=True) + mock_get.assert_called_once_with("https://example.com/robots.txt", timeout=30, stream=True, verify=True, headers={'User-Agent': 'Archon-Discovery/1.0 (SSRF-Protected)'}) + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_parse_robots_txt_no_sitemap(self, mock_get): + def test_parse_robots_txt_no_sitemap(self, mock_get, mock_dns): """Test robots.txt parsing without sitemap directives.""" service = DiscoveryService() @@ -196,10 +215,11 @@ Allow: /public/""" result = service._parse_robots_txt("https://example.com") assert len(result) == 0 - mock_get.assert_called_once_with("https://example.com/robots.txt", timeout=30, stream=True, verify=True) + mock_get.assert_called_once_with("https://example.com/robots.txt", timeout=30, stream=True, verify=True, headers={'User-Agent': 'Archon-Discovery/1.0 (SSRF-Protected)'}) + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_parse_html_meta_tags(self, mock_get): + def test_parse_html_meta_tags(self, mock_get, mock_dns): """Test HTML meta tag parsing for sitemaps.""" service = DiscoveryService() @@ -220,10 +240,11 @@ Allow: /public/""" # Should find sitemaps from both link and meta tags assert len(result) >= 1 assert any('sitemap' in url.lower() for url in result) - mock_get.assert_called_once_with("https://example.com", timeout=30, stream=True, verify=True) + mock_get.assert_called_once_with("https://example.com", timeout=30, 
stream=True, verify=True, headers={'User-Agent': 'Archon-Discovery/1.0 (SSRF-Protected)'}) + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_discovery_priority_behavior(self, mock_get): + def test_discovery_priority_behavior(self, mock_get, mock_dns): """Test that discovery returns highest-priority file when multiple files exist.""" service = DiscoveryService() base_url = "https://example.com" @@ -231,48 +252,48 @@ Allow: /public/""" # Mock robots.txt response (no sitemaps declared) robots_response = create_mock_response(200, "User-agent: *\nDisallow: /admin/") - # Scenario 1: All files exist - should return llms-full.txt (highest priority) + # Scenario 1: All files exist - should return llms.txt (highest priority) def mock_all_exist(url, **kwargs): if url.endswith('robots.txt'): return robots_response - elif any(file in url for file in ['llms-full.txt', 'llms.txt', 'llms.md', 'sitemap.xml', 'sitemap_index.xml']): + elif any(file in url for file in ['llms.txt', 'llms-full.txt', 'sitemap.xml']): return create_mock_response(200) else: return create_mock_response(404) mock_get.side_effect = mock_all_exist result = service.discover_files(base_url) - assert result == 'https://example.com/llms-full.txt', "Should return llms-full.txt when all files exist" + assert result == 'https://example.com/llms.txt', "Should return llms.txt when all files exist (highest priority)" - # Scenario 2: llms-full.txt missing, others exist - should return llms.txt - def mock_without_full(url, **kwargs): + # Scenario 2: llms.txt missing, others exist - should return llms-full.txt + def mock_without_txt(url, **kwargs): if url.endswith('robots.txt'): return robots_response - elif url.endswith('llms-full.txt'): + elif url.endswith('llms.txt'): return create_mock_response(404) - elif any(file in url for file in ['llms.txt', 'llms.md', 'sitemap.xml']): + elif any(file in url for file in ['llms-full.txt', 'sitemap.xml']): return create_mock_response(200) else: return create_mock_response(404) - mock_get.side_effect = mock_without_full + mock_get.side_effect = mock_without_txt result = service.discover_files(base_url) - assert result == 'https://example.com/llms.txt', "Should return llms.txt when llms-full.txt is missing" + assert result == 'https://example.com/llms-full.txt', "Should return llms-full.txt when llms.txt is missing" - # Scenario 3: Only sitemap files exist - should return sitemap_index.xml over sitemap.xml + # Scenario 3: Only sitemap files exist - should return sitemap.xml def mock_only_sitemaps(url, **kwargs): if url.endswith('robots.txt'): return robots_response - elif any(file in url for file in ['llms-full.txt', 'llms.txt', 'llms.md']): + elif any(file in url for file in ['llms.txt', 'llms-full.txt']): return create_mock_response(404) - elif any(file in url for file in ['sitemap_index.xml', 'sitemap.xml']): + elif url.endswith('sitemap.xml'): return create_mock_response(200) else: return create_mock_response(404) mock_get.side_effect = mock_only_sitemaps result = service.discover_files(base_url) - assert result == 'https://example.com/sitemap_index.xml', "Should return sitemap_index.xml when llms files are missing" + assert result == 'https://example.com/sitemap.xml', "Should return sitemap.xml when llms files are missing" # Scenario 4: llms files have priority over sitemap files def mock_llms_and_sitemap(url, **kwargs): @@ -287,8 +308,9 @@ Allow: /public/""" result = service.discover_files(base_url) assert result == 'https://example.com/llms.txt', 
"Should prefer llms.txt over sitemap.xml" + @patch('socket.getaddrinfo', return_value=create_mock_dns_response()) @patch('requests.get') - def test_network_error_handling(self, mock_get): + def test_network_error_handling(self, mock_get, mock_dns): """Test error scenarios with network failures.""" service = DiscoveryService() diff --git a/python/tests/test_url_handler.py b/python/tests/test_url_handler.py index e466239f..e268bd50 100644 --- a/python/tests/test_url_handler.py +++ b/python/tests/test_url_handler.py @@ -155,24 +155,21 @@ class TestURLHandler: """Test llms file variant detection.""" handler = URLHandler() - # All llms variants - assert handler.is_llms_variant("https://example.com/llms-full.txt") is True + # Standard llms.txt spec variants (only txt files) assert handler.is_llms_variant("https://example.com/llms.txt") is True - assert handler.is_llms_variant("https://example.com/llms.md") is True - assert handler.is_llms_variant("https://example.com/llms.mdx") is True - assert handler.is_llms_variant("https://example.com/llms.markdown") is True - + assert handler.is_llms_variant("https://example.com/llms-full.txt") is True + # Case sensitivity assert handler.is_llms_variant("https://example.com/LLMS.TXT") is True - assert handler.is_llms_variant("https://example.com/Llms.Md") is True - + assert handler.is_llms_variant("https://example.com/LLMS-FULL.TXT") is True + # With paths (should still detect) assert handler.is_llms_variant("https://example.com/docs/llms.txt") is True - assert handler.is_llms_variant("https://example.com/public/llms.md") is True - + assert handler.is_llms_variant("https://example.com/public/llms-full.txt") is True + # With query parameters assert handler.is_llms_variant("https://example.com/llms.txt?version=1") is True - assert handler.is_llms_variant("https://example.com/llms.md#section") is True + assert handler.is_llms_variant("https://example.com/llms-full.txt#section") is True # Not llms files assert handler.is_llms_variant("https://example.com/llms") is False @@ -193,9 +190,9 @@ class TestURLHandler: assert handler.is_well_known_file("https://example.com/.well-known/security.txt") is True assert handler.is_well_known_file("https://example.com/.well-known/change-password") is True - # Case sensitivity (path should be case sensitive) - assert handler.is_well_known_file("https://example.com/.WELL-KNOWN/ai.txt") is True - assert handler.is_well_known_file("https://example.com/.Well-Known/ai.txt") is True + # Case sensitivity - RFC 8615 requires lowercase .well-known + assert handler.is_well_known_file("https://example.com/.WELL-KNOWN/ai.txt") is False + assert handler.is_well_known_file("https://example.com/.Well-Known/ai.txt") is False # With query parameters assert handler.is_well_known_file("https://example.com/.well-known/ai.txt?v=1") is True diff --git a/python/uv.lock b/python/uv.lock index 274564d2..f8f82b01 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -247,6 +247,7 @@ server = [ { name = "python-multipart" }, { name = "slowapi" }, { name = "supabase" }, + { name = "tldextract" }, { name = "uvicorn" }, { name = "watchfiles" }, ] @@ -342,6 +343,7 @@ server = [ { name = "python-multipart", specifier = ">=0.0.20" }, { name = "slowapi", specifier = ">=0.1.9" }, { name = "supabase", specifier = "==2.15.1" }, + { name = "tldextract", specifier = ">=5.0.0" }, { name = "uvicorn", specifier = ">=0.24.0" }, { name = "watchfiles", specifier = ">=0.18" }, ] @@ -2601,6 +2603,18 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/f9/9b/335f9764261e915ed497fcdeb11df5dfd6f7bf257d4a6a2a686d80da4d54/requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6", size = 64928 }, ] +[[package]] +name = "requests-file" +version = "3.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/fe/5e/2aca791207e542a16a8cc91fd0e19f5c26f4dff030ee3062deb5606f84ae/requests_file-3.0.0.tar.gz", hash = "sha256:68789589cfde7098e8933fe3e69bbd864f7f0c22f118937b424d94d0e1b7760f", size = 6897 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e4/85/689c218feb21a66919bd667969d4ed60a64db67f6ea5ceb00c9795ae19b0/requests_file-3.0.0-py2.py3-none-any.whl", hash = "sha256:aca222ec94a19310be2a0ed6bdcdebb09058b0f6c3e984af56361c8fca59653c", size = 4486 }, +] + [[package]] name = "rich" version = "14.0.0" @@ -3086,6 +3100,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/de/a8/8f499c179ec900783ffe133e9aab10044481679bb9aad78436d239eee716/tiktoken-0.9.0-cp313-cp313-win_amd64.whl", hash = "sha256:5ea0edb6f83dc56d794723286215918c1cde03712cbbafa0348b33448faf5b95", size = 894669 }, ] +[[package]] +name = "tldextract" +version = "5.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "filelock" }, + { name = "idna" }, + { name = "requests" }, + { name = "requests-file" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/97/78/182641ea38e3cfd56e9c7b3c0d48a53d432eea755003aa544af96403d4ac/tldextract-5.3.0.tar.gz", hash = "sha256:b3d2b70a1594a0ecfa6967d57251527d58e00bb5a91a74387baa0d87a0678609", size = 128502 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/67/7c/ea488ef48f2f544566947ced88541bc45fae9e0e422b2edbf165ee07da99/tldextract-5.3.0-py3-none-any.whl", hash = "sha256:f70f31d10b55c83993f55e91ecb7c5d84532a8972f22ec578ecfbe5ea2292db2", size = 107384 }, +] + [[package]] name = "tokenizers" version = "0.21.1"