Mirror of https://github.com/coleam00/Archon.git, synced 2026-01-01 04:09:08 -05:00
- Fixing the crawl errors for large crawled files like the NUXT docs.
- Removing the "Completed" steps in reporting.
- Cleanup of Sockets from PR 250 and 395 with Code Rabbit cleanup suggestions.
@@ -47,10 +47,10 @@ describe('API Configuration', () => {
     delete (import.meta.env as any).VITE_API_URL;
     delete (import.meta.env as any).ARCHON_SERVER_PORT;
 
-    const { getApiUrl } = await import('../../src/config/api');
-
-    expect(() => getApiUrl()).toThrow('ARCHON_SERVER_PORT environment variable is required');
-    expect(() => getApiUrl()).toThrow('Default value: 8181');
+    // The error will be thrown during module import because API_FULL_URL calls getApiUrl()
+    await expect(async () => {
+      await import('../../src/config/api');
+    }).rejects.toThrow('ARCHON_SERVER_PORT environment variable is required');
   });
 
   it('should use ARCHON_SERVER_PORT when set in development', async () => {
@@ -156,73 +156,4 @@ describe('API Configuration', () => {
   });
 });
 
-describe('MCP Client Service Configuration', () => {
-  let originalEnv: any;
-
-  beforeEach(() => {
-    originalEnv = { ...import.meta.env };
-    vi.resetModules();
-  });
-
-  afterEach(() => {
-    Object.keys(import.meta.env).forEach(key => {
-      delete (import.meta.env as any)[key];
-    });
-    Object.assign(import.meta.env, originalEnv);
-  });
-
-  it('should throw error when ARCHON_MCP_PORT is not set', async () => {
-    delete (import.meta.env as any).ARCHON_MCP_PORT;
-
-    const { MCPClientService } = await import('../../src/services/mcpClientService');
-    const service = new MCPClientService();
-
-    await expect(service.createArchonClient()).rejects.toThrow('ARCHON_MCP_PORT environment variable is required');
-    await expect(service.createArchonClient()).rejects.toThrow('Default value: 8051');
-  });
-
-  it('should use ARCHON_MCP_PORT when set', async () => {
-    (import.meta.env as any).ARCHON_MCP_PORT = '9051';
-    (import.meta.env as any).ARCHON_SERVER_PORT = '8181';
-
-    // Mock window.location
-    Object.defineProperty(window, 'location', {
-      value: {
-        protocol: 'http:',
-        hostname: 'localhost'
-      },
-      writable: true
-    });
-
-    // Mock the API call
-    global.fetch = vi.fn().mockResolvedValue({
-      ok: true,
-      json: async () => ({
-        id: 'test-id',
-        name: 'Archon',
-        transport_type: 'http',
-        connection_status: 'connected'
-      })
-    });
-
-    const { MCPClientService } = await import('../../src/services/mcpClientService');
-    const service = new MCPClientService();
-
-    try {
-      await service.createArchonClient();
-
-      // Verify the fetch was called with the correct URL
-      expect(global.fetch).toHaveBeenCalledWith(
-        expect.stringContaining('/api/mcp/clients'),
-        expect.objectContaining({
-          method: 'POST',
-          body: expect.stringContaining('9051')
-        })
-      );
-    } catch (error) {
-      // If it fails due to actual API call, that's okay for this test
-      // We're mainly testing that it constructs the URL correctly
-      expect(error).toBeDefined();
-    }
-  });
-});
+// MCP Client Service Configuration tests removed - service not currently in use
@@ -211,14 +211,21 @@ class CodeExtractionService:
         Returns:
             List of code blocks with metadata
         """
+        import asyncio
+        import time
+
         # Progress will be reported during the loop below
 
         all_code_blocks = []
         total_docs = len(crawl_results)
         completed_docs = 0
 
+        # PERFORMANCE: Track extraction time per document
+        MAX_EXTRACTION_TIME_PER_DOC = 5.0  # 5 seconds max per document
+
         for doc in crawl_results:
             try:
+                doc_start_time = time.time()
                 source_url = doc["url"]
                 html_content = doc.get("html", "")
                 md = doc.get("markdown", "")
@@ -228,9 +235,7 @@ class CodeExtractionService:
                     f"Document content check | url={source_url} | has_html={bool(html_content)} | has_markdown={bool(md)} | html_len={len(html_content) if html_content else 0} | md_len={len(md) if md else 0}"
                 )
 
-                # Get dynamic minimum length based on document context
-                # Extract some context from the document for analysis
-                doc_context = md[:1000] if md else html_content[:1000] if html_content else ""
+                # Dynamic minimum length is handled inside the extraction methods
 
                 # Check markdown first to see if it has code blocks
                 if md:
@@ -281,15 +286,32 @@ class CodeExtractionService:
 
                 # If not a text file or no code blocks found, try HTML extraction first
                 if len(code_blocks) == 0 and html_content and not is_text_file:
-                    safe_logfire_info(
-                        f"Trying HTML extraction first | url={source_url} | html_length={len(html_content)}"
-                    )
-                    html_code_blocks = await self._extract_html_code_blocks(html_content)
-                    if html_code_blocks:
-                        code_blocks = html_code_blocks
-                        safe_logfire_info(
-                            f"Found {len(code_blocks)} code blocks from HTML | url={source_url}"
-                        )
+                    # PERFORMANCE: Check if we've already spent too much time on this document
+                    elapsed_time = time.time() - doc_start_time
+                    if elapsed_time > MAX_EXTRACTION_TIME_PER_DOC:
+                        safe_logfire_info(
+                            f"⏱️ Skipping HTML extraction for {source_url} - already spent {elapsed_time:.1f}s"
+                        )
+                    else:
+                        safe_logfire_info(
+                            f"Trying HTML extraction first | url={source_url} | html_length={len(html_content)}"
+                        )
+                        # Create a timeout for HTML extraction
+                        remaining_time = MAX_EXTRACTION_TIME_PER_DOC - elapsed_time
+                        try:
+                            html_code_blocks = await asyncio.wait_for(
+                                self._extract_html_code_blocks(html_content, source_url),
+                                timeout=remaining_time
+                            )
+                            if html_code_blocks:
+                                code_blocks = html_code_blocks
+                                safe_logfire_info(
+                                    f"Found {len(code_blocks)} code blocks from HTML | url={source_url}"
+                                )
+                        except asyncio.TimeoutError:
+                            safe_logfire_info(
+                                f"⏱️ HTML extraction timed out after {remaining_time:.1f}s for {source_url}"
+                            )
 
                 # If still no code blocks, try markdown extraction as fallback
                 if len(code_blocks) == 0 and md and "```" in md:
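The per-document time budget above hinges on asyncio.wait_for cancelling the extraction coroutine when the remaining budget runs out. A minimal, self-contained sketch of the same pattern follows; the function names and the stand-in workload are illustrative, not taken from the Archon codebase.

import asyncio
import time

MAX_EXTRACTION_TIME_PER_DOC = 5.0  # per-document budget, matching the diff above

async def slow_extraction(html: str) -> list[str]:
    # Stand-in for an expensive HTML parsing step.
    await asyncio.sleep(10)
    return ["<code>...</code>"]

async def extract_with_budget(html: str, doc_start_time: float) -> list[str]:
    # Time spent on earlier steps is subtracted from the budget.
    remaining = MAX_EXTRACTION_TIME_PER_DOC - (time.time() - doc_start_time)
    if remaining <= 0:
        return []
    try:
        return await asyncio.wait_for(slow_extraction(html), timeout=remaining)
    except asyncio.TimeoutError:
        print(f"HTML extraction timed out after {remaining:.1f}s")
        return []

if __name__ == "__main__":
    print(asyncio.run(extract_with_budget("<html></html>", time.time())))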
@@ -319,6 +341,14 @@ class CodeExtractionService:
 
                 # Update progress only after completing document extraction
                 completed_docs += 1
+                extraction_time = time.time() - doc_start_time
+                if extraction_time > 2.0:  # Log slow extractions
+                    safe_logfire_info(
+                        f"⏱️ Document extraction took {extraction_time:.1f}s | url={source_url} | "
+                        f"html_size={len(html_content) if html_content else 0} | "
+                        f"blocks_found={len([b for b in all_code_blocks if b['source_url'] == source_url])}"
+                    )
+
                 if progress_callback and total_docs > 0:
                     # Calculate progress within the specified range
                     raw_progress = completed_docs / total_docs
@@ -340,13 +370,14 @@ class CodeExtractionService:
 
         return all_code_blocks
 
-    async def _extract_html_code_blocks(self, content: str) -> list[dict[str, Any]]:
+    async def _extract_html_code_blocks(self, content: str, source_url: str = "") -> list[dict[str, Any]]:
         """
         Extract code blocks from HTML patterns in content.
         This is a fallback when markdown conversion didn't preserve code blocks.
 
         Args:
             content: The content to search for HTML code patterns
+            source_url: The URL of the document being processed
             min_length: Minimum length for code blocks
 
         Returns:
@@ -356,6 +387,20 @@ class CodeExtractionService:
 
         # Add detailed logging
         safe_logfire_info(f"Processing HTML of length {len(content)} for code extraction")
 
+        # PERFORMANCE OPTIMIZATION: Skip extremely large HTML files or chunk them
+        MAX_HTML_SIZE = 1_000_000  # 1MB limit for single-pass processing (increased from 500KB)
+        if len(content) > MAX_HTML_SIZE:
+            safe_logfire_info(
+                f"⚠️ HTML content is very large ({len(content)} bytes). "
+                f"Limiting to first {MAX_HTML_SIZE} bytes to prevent timeout."
+            )
+            # For very large files, focus on the first portion where code examples are likely to be
+            content = content[:MAX_HTML_SIZE]
+            # Try to find a good cutoff point (end of a tag)
+            last_tag_end = content.rfind('>')
+            if last_tag_end > MAX_HTML_SIZE - 1000:
+                content = content[:last_tag_end + 1]
+
         # Check if we have actual content
         if len(content) < 1000:
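The size cap introduced above truncates oversized HTML and then backs up to the last complete tag so the regex patterns never see a string cut mid-tag. A standalone sketch of just that truncation step, under the same 1MB assumption (the helper name is illustrative):

MAX_HTML_SIZE = 1_000_000  # 1MB, matching the limit in the diff above

def truncate_html(content: str, max_size: int = MAX_HTML_SIZE) -> str:
    """Keep only the first max_size characters, cutting at the end of a tag when possible."""
    if len(content) <= max_size:
        return content
    truncated = content[:max_size]
    # Back up to the last '>' so we do not end mid-tag, but only if that
    # cutoff point is still close to the size limit.
    last_tag_end = truncated.rfind('>')
    if last_tag_end > max_size - 1000:
        truncated = truncated[:last_tag_end + 1]
    return truncated

# Example: a ~2MB document is reduced to roughly 1MB ending on a complete tag.
print(len(truncate_html("<p>" + "x" * 2_000_000 + "</p>")))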
@@ -507,9 +552,71 @@ class CodeExtractionService:
             ),
         ]
 
-        for pattern_tuple in patterns:
+        # PERFORMANCE: Early exit checks to avoid unnecessary regex processing
+        # Check more content (20KB instead of 5KB) and add URL-based exceptions
+        check_size = min(20000, len(content))  # Check first 20KB or entire content if smaller
+        has_code_indicators = any(indicator in content[:check_size] for indicator in
+                                  ['<pre', '<code', 'language-', 'hljs', 'prism', 'shiki', 'highlight'])
+
+        # Never skip certain documentation sites that we know have code
+        is_known_code_site = any(domain in source_url.lower() for domain in
+                                 ['milkdown', 'github.com', 'gitlab', 'docs.', 'dev.', 'api.'])
+
+        if not has_code_indicators and not is_known_code_site:
+            safe_logfire_info(f"No code indicators found in first {check_size} chars and not a known code site, skipping HTML extraction | url={source_url}")
+            return []
+
+        if is_known_code_site and not has_code_indicators:
+            safe_logfire_info(f"Known code site but no indicators in first {check_size} chars, continuing anyway | url={source_url}")
+
+        # PERFORMANCE: Limit number of patterns to check based on detected libraries
+        patterns_to_check = []
+        content_lower = content[:10000].lower()  # Check first 10KB for library detection
+
+        # Selectively add patterns based on what's detected
+        if 'milkdown' in content_lower:
+            patterns_to_check.extend([p for p in patterns if 'milkdown' in p[1]])
+        if 'monaco' in content_lower:
+            patterns_to_check.extend([p for p in patterns if 'monaco' in p[1]])
+        if 'codemirror' in content_lower or 'cm-' in content_lower:
+            patterns_to_check.extend([p for p in patterns if 'codemirror' in p[1]])
+        if 'prism' in content_lower:
+            patterns_to_check.extend([p for p in patterns if 'prism' in p[1]])
+        if 'hljs' in content_lower or 'highlight' in content_lower:
+            patterns_to_check.extend([p for p in patterns if 'hljs' in p[1] or 'highlight' in p[1]])
+        if 'shiki' in content_lower or 'astro' in content_lower:
+            patterns_to_check.extend([p for p in patterns if 'shiki' in p[1] or 'astro' in p[1]])
+
+        # Always include standard patterns as fallback (get ALL standard/generic patterns, not just last 5)
+        standard_patterns = [p for p in patterns if any(tag in p[1] for tag in ['standard', 'generic', 'prism', 'hljs'])]
+        patterns_to_check.extend(standard_patterns)
+
+        # Remove duplicates while preserving order
+        seen = set()
+        unique_patterns = []
+        for p in patterns_to_check:
+            if p[1] not in seen:
+                unique_patterns.append(p)
+                seen.add(p[1])
+        patterns_to_check = unique_patterns
+
+        # If we have very few patterns and it's a known code site, add more generic patterns
+        if len(patterns_to_check) < 5 and is_known_code_site:
+            safe_logfire_info(f"Known code site with few patterns ({len(patterns_to_check)}), adding more generic patterns")
+            patterns_to_check = patterns  # Use all patterns for known code sites
+
+        safe_logfire_info(f"Checking {len(patterns_to_check)} relevant patterns out of {len(patterns)} total")
+
+        for pattern_tuple in patterns_to_check:
             pattern_str, source_type = pattern_tuple
-            matches = list(re.finditer(pattern_str, content, re.DOTALL | re.IGNORECASE))
+
+            # PERFORMANCE: Use re.finditer with smaller chunks for very long content
+            # Only use DOTALL for patterns that really need it (multi-line blocks)
+            flags = re.IGNORECASE
+            if 'monaco' in source_type or 'codemirror' in source_type:
+                flags |= re.DOTALL  # These need DOTALL for multi-line matching
+
+            matches = list(re.finditer(pattern_str, content, flags))
 
             # Log pattern matches for Milkdown patterns and CodeMirror
             if matches and (
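The pattern pre-filtering above ends with an order-preserving de-duplication keyed on each tuple's source_type tag (p[1]). A generic sketch of that idiom with made-up pattern tuples; the regexes here are placeholders, not the ones used by the service:

# Hypothetical (pattern, source_type) tuples; only the tag matters for dedup.
patterns_to_check = [
    (r"<pre.*?</pre>", "standard_pre"),
    (r"<code.*?</code>", "generic_code"),
    (r"<pre.*?</pre>", "standard_pre"),   # duplicate tag, will be dropped
]

seen: set[str] = set()
unique_patterns = []
for pattern in patterns_to_check:
    if pattern[1] not in seen:
        unique_patterns.append(pattern)
        seen.add(pattern[1])

print([tag for _, tag in unique_patterns])  # ['standard_pre', 'generic_code']

The explicit seen-set keeps the first occurrence of each tag, which preserves the priority order built up by the library-detection checks earlier in the method.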
@@ -428,6 +428,9 @@ class CrawlingService:
         )
 
         # Complete - send both the progress update and completion event
+        # CRITICAL: This is the ONLY place that should send status="completed"!
+        # All crawl strategies (batch, recursive, etc.) should use "finished" or other words.
+        # The frontend disconnects when it sees status="completed", so this must be the final step.
         await update_mapped_progress(
             "completed",
             100,
@@ -73,7 +73,8 @@ class BatchCrawlStrategy:
         except Exception as e:
             # For non-critical errors (e.g., network issues), use defaults but log prominently
             logger.error(
-                f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
+                f"Failed to load crawl settings from database: {e}, using defaults",
+                exc_info=True
             )
             batch_size = 50
             if max_concurrent is None:
@@ -98,102 +99,93 @@ class BatchCrawlStrategy:
                 wait_for_images=False,  # Skip images for faster crawling
                 scan_full_page=True,  # Trigger lazy loading
                 exclude_all_images=False,
-                remove_overlay_elements=True,
-                process_iframes=True,
             )
         else:
-            # Configuration for regular batch crawling
+            # Regular sites use standard configuration
             crawl_config = CrawlerRunConfig(
                 cache_mode=CacheMode.BYPASS,
-                stream=True,  # Enable streaming
+                stream=True,
                 markdown_generator=self.markdown_generator,
                 wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
-                page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "45000")),
+                page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "30000")),
                 delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "0.5")),
-                scan_full_page=True,
+                wait_for_images=False,
+                scan_full_page=False,  # Don't scan full page for non-doc sites
+                exclude_all_images=False,
             )
 
+        # Transform URLs if needed
+        processed_urls = [transform_url_func(url) for url in urls]
+
+        # Create memory adaptive dispatcher
         dispatcher = MemoryAdaptiveDispatcher(
-            memory_threshold_percent=memory_threshold,
+            max_sessions=max_concurrent,
+            memory_threshold_mb=memory_threshold,
             check_interval=check_interval,
-            max_session_permit=max_concurrent,
         )
 
-        async def report_progress(percentage: int, message: str):
-            """Helper to report progress if callback is available"""
-            if progress_callback:
-                await progress_callback("crawling", percentage, message)
-
-        total_urls = len(urls)
-        await report_progress(start_progress, f"Starting to crawl {total_urls} URLs...")
-
-        # Use configured batch size
-        successful_results = []
-        processed = 0
-
-        # Transform all URLs at the beginning
-        url_mapping = {}  # Map transformed URLs back to original
-        transformed_urls = []
-        for url in urls:
-            transformed = transform_url_func(url)
-            transformed_urls.append(transformed)
-            url_mapping[transformed] = url
-
-        for i in range(0, total_urls, batch_size):
-            batch_urls = transformed_urls[i : i + batch_size]
-            batch_start = i
-            batch_end = min(i + batch_size, total_urls)
-
-            # Report batch start with smooth progress
-            progress_percentage = start_progress + int(
-                (i / total_urls) * (end_progress - start_progress)
-            )
-            await report_progress(
-                progress_percentage,
-                f"Processing batch {batch_start + 1}-{batch_end} of {total_urls} URLs...",
-            )
-
-            # Crawl this batch using arun_many with streaming
-            logger.info(
-                f"Starting parallel crawl of batch {batch_start + 1}-{batch_end} ({len(batch_urls)} URLs)"
-            )
-            batch_results = await self.crawler.arun_many(
-                urls=batch_urls, config=crawl_config, dispatcher=dispatcher
-            )
-
-            # Handle streaming results
-            j = 0
-            async for result in batch_results:
-                processed += 1
-                if result.success and result.markdown:
-                    # Map back to original URL
-                    original_url = url_mapping.get(result.url, result.url)
-                    successful_results.append({
-                        "url": original_url,
-                        "markdown": result.markdown,
-                        "html": result.html,  # Use raw HTML
-                    })
-                else:
-                    logger.warning(
-                        f"Failed to crawl {result.url}: {getattr(result, 'error_message', 'Unknown error')}"
-                    )
-
-                # Report individual URL progress with smooth increments
-                progress_percentage = start_progress + int(
-                    (processed / total_urls) * (end_progress - start_progress)
-                )
-                # Report more frequently for smoother progress
-                if (
-                    processed % 5 == 0 or processed == total_urls
-                ):  # Report every 5 URLs or at the end
-                    await report_progress(
-                        progress_percentage,
-                        f"Crawled {processed}/{total_urls} pages ({len(successful_results)} successful)",
-                    )
-                j += 1
-
-        await report_progress(
-            end_progress,
-            f"Batch crawling completed: {len(successful_results)}/{total_urls} pages successful",
-        )
-        return successful_results
+        # Crawl URLs in batches using arun_many
+        results = []
+        total_urls = len(processed_urls)
+
+        for batch_start in range(0, total_urls, batch_size):
+            batch_end = min(batch_start + batch_size, total_urls)
+            batch = processed_urls[batch_start:batch_end]
+
+            # Calculate progress for this batch
+            if progress_callback:
+                batch_progress = start_progress + ((batch_start / total_urls) * (end_progress - start_progress))
+                await progress_callback(
+                    "batch_crawling",
+                    int(batch_progress),
+                    f"Crawling batch {batch_start // batch_size + 1} ({batch_start + 1}-{batch_end}/{total_urls} URLs)"
+                )
+
+            # Run batch crawl
+            try:
+                batch_results = await self.crawler.arun_many(
+                    batch,
+                    config=crawl_config,
+                    dispatcher=dispatcher
+                )
+
+                # Process results
+                for result in batch_results:
+                    if result.success:
+                        results.append({
+                            "url": result.url,
+                            "markdown": result.markdown_v2.raw_markdown if result.markdown_v2 else "",
+                            "success": True,
+                            "metadata": result.extracted_content if hasattr(result, 'extracted_content') else {}
+                        })
+                    else:
+                        logger.warning(f"Failed to crawl {result.url}: {result.error_message}")
+                        results.append({
+                            "url": result.url,
+                            "markdown": "",
+                            "success": False,
+                            "error": result.error_message
+                        })
+
+            except Exception as e:
+                logger.error(f"Batch crawl error: {e}", exc_info=True)
+                # Add failed results for this batch
+                for url in batch:
+                    results.append({
+                        "url": url,
+                        "markdown": "",
+                        "success": False,
+                        "error": str(e)
+                    })
+
+            # Update progress after batch completion
+            # IMPORTANT: Use "finished" not "completed" - only the final orchestrator should send "completed"
+            if progress_callback:
+                batch_progress = start_progress + ((batch_end / total_urls) * (end_progress - start_progress))
+                await progress_callback(
+                    "batch_crawling",
+                    int(batch_progress),
+                    f"Finished batch {batch_start // batch_size + 1}"
+                )
+
+        return results
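Both the old and the new batch loops map per-batch completion into a caller-supplied [start_progress, end_progress] window before invoking the callback. A self-contained sketch of that mapping; the callback, URL list, and numbers are illustrative rather than copied from the strategy class:

import asyncio

async def print_progress(status: str, percentage: int, message: str) -> None:
    print(f"{status} {percentage:3d}% {message}")

def map_progress(done: int, total: int, start: int, end: int) -> int:
    # Scale completion within this stage into the overall progress window.
    return start + int((done / total) * (end - start))

async def crawl_batches(urls: list[str], batch_size: int = 2,
                        start_progress: int = 10, end_progress: int = 60) -> None:
    total = len(urls)
    for batch_start in range(0, total, batch_size):
        batch_end = min(batch_start + batch_size, total)
        # ... crawl urls[batch_start:batch_end] here ...
        await print_progress(
            "batch_crawling",
            map_progress(batch_end, total, start_progress, end_progress),
            f"Finished batch {batch_start // batch_size + 1} ({batch_end}/{total} URLs)",
        )

asyncio.run(crawl_batches(["u1", "u2", "u3", "u4", "u5"]))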
@@ -60,7 +60,7 @@ class RecursiveCrawlStrategy:
         if not self.crawler:
             logger.error("No crawler instance available for recursive crawling")
             if progress_callback:
-                await progress_callback("error", 0, "Crawler not available")
+                await progress_callback("error", 0, "Crawler not available", step_info={"currentStep": "error", "stepMessage": "Crawler not available"})
             return []
 
         # Load settings from database - fail fast on configuration errors
@@ -78,7 +78,8 @@ class RecursiveCrawlStrategy:
         except Exception as e:
             # For non-critical errors (e.g., network issues), use defaults but log prominently
             logger.error(
-                f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
+                f"Failed to load crawl settings from database: {e}, using defaults",
+                exc_info=True
            )
             batch_size = 50
             if max_concurrent is None:
@@ -126,11 +127,19 @@ class RecursiveCrawlStrategy:
         )
 
         async def report_progress(percentage: int, message: str, **kwargs):
-            """Helper to report progress if callback is available"""
+            """Helper to report progress if callback is available
+
+            IMPORTANT: Never use "complete" or "completed" in messages here!
+            This is just an intermediate step in the overall crawl process.
+            Only the final orchestrator should send "completed" status.
+            """
             if progress_callback:
                 # Add step information for multi-progress tracking
-                step_info = {"currentStep": message, "stepMessage": message, **kwargs}
-                await progress_callback("crawling", percentage, message, **step_info)
+                step_info = {
+                    "currentStep": message,
+                    "stepMessage": message
+                }
+                await progress_callback("crawling", percentage, message, step_info=step_info, **kwargs)
 
         visited = set()
 
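The updated helper now passes step information as an explicit step_info keyword rather than splatting it into the call. A minimal sketch of a callback shaped to accept it; the signature is assumed for illustration, not copied from the codebase:

import asyncio

async def progress_callback(status: str, percentage: int, message: str,
                            step_info: dict | None = None, **kwargs) -> None:
    # step_info carries UI-oriented fields such as currentStep/stepMessage.
    step = (step_info or {}).get("currentStep", "")
    print(f"[{status}] {percentage}% {message} (step: {step})")

async def report_progress(percentage: int, message: str, **kwargs) -> None:
    step_info = {"currentStep": message, "stepMessage": message}
    await progress_callback("crawling", percentage, message, step_info=step_info, **kwargs)

asyncio.run(report_progress(42, "Crawling depth 2"))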
@@ -183,7 +192,9 @@ class RecursiveCrawlStrategy:
             # Use arun_many for native parallel crawling with streaming
             logger.info(f"Starting parallel crawl of {len(batch_urls)} URLs with arun_many")
             batch_results = await self.crawler.arun_many(
-                urls=batch_urls, config=run_config, dispatcher=dispatcher
+                urls=batch_urls,
+                config=run_config,
+                dispatcher=dispatcher
             )
 
             # Handle streaming results from arun_many
@@ -239,14 +250,16 @@ class RecursiveCrawlStrategy:
 
             current_urls = next_level_urls
 
-            # Report completion of this depth
+            # Report completion of this depth - IMPORTANT: Use "finished" not "completed"!
             await report_progress(
                 depth_end,
-                f"Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth",
+                f"Depth {depth + 1} finished: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth",
             )
 
+        # IMPORTANT: Use "finished" not "complete" - only the final orchestrator should say "completed"
         await report_progress(
             end_progress,
-            f"Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels",
+            f"Recursive crawl finished: {len(results_all)} pages successfully crawled",
         )
 
         return results_all
@@ -87,30 +87,42 @@ class RateLimiter:
     async def acquire(self, estimated_tokens: int = 8000) -> bool:
         """Acquire permission to make API call with token awareness"""
         async with self._lock:
-            now = time.time()
-
-            # Clean old entries
-            self._clean_old_entries(now)
-
-            # Check if we can make the request
-            if not self._can_make_request(estimated_tokens):
-                wait_time = self._calculate_wait_time(estimated_tokens)
-                if wait_time > 0:
-                    logfire_logger.info(
-                        f"Rate limiting: waiting {wait_time:.1f}s",
-                        extra={
-                            "tokens": estimated_tokens,
-                            "current_usage": self._get_current_usage(),
-                        }
-                    )
-                    await asyncio.sleep(wait_time)
-                    return await self.acquire(estimated_tokens)
-                return False
-
-            # Record the request
-            self.request_times.append(now)
-            self.token_usage.append((now, estimated_tokens))
-            return True
+            while True:  # Use a loop instead of recursion
+                now = time.time()
+
+                # Clean old entries
+                self._clean_old_entries(now)
+
+                # Check if we can make the request
+                if self._can_make_request(estimated_tokens):
+                    # Record the request
+                    self.request_times.append(now)
+                    self.token_usage.append((now, estimated_tokens))
+                    return True
+
+                # Calculate wait time
+                wait_time = self._calculate_wait_time(estimated_tokens)
+                if wait_time <= 0:
+                    return False
+
+                logfire_logger.info(
+                    f"Rate limiting: waiting {wait_time:.1f}s",
+                    extra={
+                        "tokens": estimated_tokens,
+                        "current_usage": self._get_current_usage(),
+                    }
+                )
+
+                # Release the lock while sleeping to allow other operations
+                self._lock.release()
+                try:
+                    await asyncio.sleep(wait_time)
+                    logfire_logger.info(f"Rate limiting: resuming after {wait_time:.1f}s wait")
+                finally:
+                    # Re-acquire the lock before continuing
+                    await self._lock.acquire()
+
+                # Loop will continue and re-check conditions
 
     def _can_make_request(self, estimated_tokens: int) -> bool:
         """Check if request can be made within limits"""
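The rewritten acquire() replaces recursion with a loop and releases the lock while sleeping so other tasks are not blocked for the entire wait. A stripped-down, self-contained sketch of that pattern; the real class tracks request times and token usage, which is omitted here, and the class name is made up:

import asyncio
import time

class TinyRateLimiter:
    """Allows one request per `interval` seconds; illustrative only."""

    def __init__(self, interval: float = 1.0):
        self.interval = interval
        self.last_request = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        async with self._lock:
            while True:  # loop instead of recursion
                now = time.time()
                wait_time = self.interval - (now - self.last_request)
                if wait_time <= 0:
                    self.last_request = now
                    return True
                # Release the lock while sleeping so other coroutines can run,
                # then re-acquire before re-checking the condition.
                self._lock.release()
                try:
                    await asyncio.sleep(wait_time)
                finally:
                    await self._lock.acquire()

async def main():
    limiter = TinyRateLimiter(interval=0.5)
    for i in range(3):
        await limiter.acquire()
        print(f"request {i} at {time.time():.2f}")

asyncio.run(main())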
@@ -26,16 +26,6 @@ sio = socketio.AsyncServer(
     ping_interval=60,  # 1 minute - check connection every minute
 )
 
-# Global Socket.IO instance for use across modules
-_socketio_instance: socketio.AsyncServer | None = None
-
-
-def get_socketio_instance() -> socketio.AsyncServer:
-    """Get the global Socket.IO server instance."""
-    global _socketio_instance
-    if _socketio_instance is None:
-        _socketio_instance = sio
-    return _socketio_instance
-
 
 def create_socketio_app(app: FastAPI) -> socketio.ASGIApp:
     """
@@ -62,3 +52,24 @@ def create_socketio_app(app: FastAPI) -> socketio.ASGIApp:
     sio.app = app
 
     return socket_app
+
+# Default Socket.IO event handlers
+@sio.event
+async def connect(sid, environ):
+    """Handle new client connections."""
+    logger.info(f"Client connected: {sid}")
+    safe_logfire_info(f"Client connected: {sid}")
+
+
+@sio.event
+async def disconnect(sid):
+    """Handle client disconnections."""
+    logger.info(f"Client disconnected: {sid}")
+    safe_logfire_info(f"Client disconnected: {sid}")
+
+
+@sio.event
+async def message(sid, data):
+    """Handle incoming messages."""
+    logger.info(f"Received message from {sid}: {data}")
+    await sio.emit("response", {"data": "Message received!"}, to=sid)
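For context on how the new module-level handlers plug in: with python-socketio, functions decorated with @sio.event register on the AsyncServer instance, and the server is mounted next to FastAPI through socketio.ASGIApp. A minimal standalone sketch under those assumptions; the app wiring and handler bodies here are illustrative, not Archon's create_socketio_app:

import socketio
from fastapi import FastAPI

sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
app = FastAPI()

@sio.event
async def connect(sid, environ):
    print(f"Client connected: {sid}")

@sio.event
async def message(sid, data):
    # Echo an acknowledgement back to the sender only.
    await sio.emit("response", {"data": "Message received!"}, to=sid)

# Wrap the FastAPI app so Socket.IO traffic and HTTP routes share one ASGI app.
socket_app = socketio.ASGIApp(sio, other_asgi_app=app)

# Run with: uvicorn this_module:socket_app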