diff --git a/python/src/server/services/crawling/strategies/batch.py b/python/src/server/services/crawling/strategies/batch.py
index d97b0bc4..72fc2fae 100644
--- a/python/src/server/services/crawling/strategies/batch.py
+++ b/python/src/server/services/crawling/strategies/batch.py
@@ -4,7 +4,6 @@ Batch Crawling Strategy
 Handles batch crawling of multiple URLs in parallel.
 """
-import asyncio
 from typing import List, Dict, Any, Optional, Callable
 
 from crawl4ai import CrawlerRunConfig, CacheMode, MemoryAdaptiveDispatcher
 
@@ -70,10 +69,12 @@ class BatchCrawlStrategy:
         except (ValueError, KeyError, TypeError) as e:
             # Critical configuration errors should fail fast in alpha
             logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
-            raise ValueError(f"Failed to load crawler configuration: {e}")
+            raise ValueError(f"Failed to load crawler configuration: {e}") from e
         except Exception as e:
             # For non-critical errors (e.g., network issues), use defaults but log prominently
-            logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True)
+            logger.error(
+                f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
+            )
             batch_size = 50
             if max_concurrent is None:
                 max_concurrent = 10  # Safe default to prevent memory issues
@@ -91,7 +92,6 @@ class BatchCrawlStrategy:
             cache_mode=CacheMode.BYPASS,
             stream=True,  # Enable streaming for faster parallel processing
             markdown_generator=self.markdown_generator,
-            wait_for="body",  # Simple selector for batch
             wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
             page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "30000")),
             delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "1.0")),
@@ -196,4 +196,4 @@ class BatchCrawlStrategy:
             end_progress,
             f"Batch crawling completed: {len(successful_results)}/{total_urls} pages successful",
         )
-        return successful_results
+        return successful_results
\ No newline at end of file
diff --git a/python/src/server/services/crawling/strategies/recursive.py b/python/src/server/services/crawling/strategies/recursive.py
index d6eb5b38..979c7fac 100644
--- a/python/src/server/services/crawling/strategies/recursive.py
+++ b/python/src/server/services/crawling/strategies/recursive.py
@@ -3,7 +3,7 @@ Recursive Crawling Strategy
 
 Handles recursive crawling of websites by following internal links.
 """
-import asyncio
+
 from typing import List, Dict, Any, Optional, Callable
 from urllib.parse import urldefrag
 
@@ -17,11 +17,11 @@ logger = get_logger(__name__)
 
 class RecursiveCrawlStrategy:
     """Strategy for recursive crawling of websites."""
-    
+
     def __init__(self, crawler, markdown_generator):
         """
         Initialize recursive crawl strategy.
-        
+
         Args:
             crawler (AsyncWebCrawler): The Crawl4AI crawler instance for web crawling operations
             markdown_generator (DefaultMarkdownGenerator): The markdown generator instance for converting HTML to markdown
@@ -29,7 +29,7 @@ class RecursiveCrawlStrategy:
         self.crawler = crawler
         self.markdown_generator = markdown_generator
         self.url_handler = URLHandler()
-    
+
     async def crawl_recursive_with_progress(
         self,
         start_urls: List[str],
@@ -39,11 +39,11 @@ class RecursiveCrawlStrategy:
         max_concurrent: int = None,
         progress_callback: Optional[Callable] = None,
         start_progress: int = 10,
-        end_progress: int = 60
+        end_progress: int = 60,
     ) -> List[Dict[str, Any]]:
         """
         Recursively crawl internal links from start URLs up to a maximum depth with progress reporting.
-        
+
         Args:
             start_urls: List of starting URLs
             transform_url_func: Function to transform URLs (e.g., GitHub URLs)
@@ -53,16 +53,16 @@ class RecursiveCrawlStrategy:
             progress_callback: Optional callback for progress updates
             start_progress: Starting progress percentage
             end_progress: Ending progress percentage
-        
+
         Returns:
             List of crawl results
         """
         if not self.crawler:
             logger.error("No crawler instance available for recursive crawling")
             if progress_callback:
-                await progress_callback('error', 0, 'Crawler not available')
+                await progress_callback("error", 0, "Crawler not available")
             return []
-        
+
         # Load settings from database - fail fast on configuration errors
         try:
             settings = await credential_service.get_credentials_by_category("rag_strategy")
@@ -74,27 +74,30 @@ class RecursiveCrawlStrategy:
         except (ValueError, KeyError, TypeError) as e:
             # Critical configuration errors should fail fast in alpha
             logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
-            raise ValueError(f"Failed to load crawler configuration: {e}")
+            raise ValueError(f"Failed to load crawler configuration: {e}") from e
         except Exception as e:
             # For non-critical errors (e.g., network issues), use defaults but log prominently
-            logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True)
+            logger.error(
+                f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
+            )
             batch_size = 50
             if max_concurrent is None:
                 max_concurrent = 10  # Safe default to prevent memory issues
             memory_threshold = 80.0
             check_interval = 0.5
             settings = {}  # Empty dict for defaults
-        
+
         # Check if start URLs include documentation sites
         has_doc_sites = any(is_documentation_site_func(url) for url in start_urls)
-        
+
         if has_doc_sites:
-            logger.info("Detected documentation sites for recursive crawl, using enhanced configuration")
+            logger.info(
+                "Detected documentation sites for recursive crawl, using enhanced configuration"
+            )
             run_config = CrawlerRunConfig(
                 cache_mode=CacheMode.BYPASS,
                 stream=True,  # Enable streaming for faster parallel processing
                 markdown_generator=self.markdown_generator,
-                wait_for='body',
                 wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
                 page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "30000")),
                 delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "1.0")),
@@ -102,7 +105,7 @@ class RecursiveCrawlStrategy:
                 scan_full_page=True,  # Trigger lazy loading
                 exclude_all_images=False,
                 remove_overlay_elements=True,
-                process_iframes=True
+                process_iframes=True,
             )
         else:
             # Configuration for regular recursive crawling
@@ -113,65 +116,76 @@ class RecursiveCrawlStrategy:
                 wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
                 page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "45000")),
                 delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "0.5")),
-                scan_full_page=True
+                scan_full_page=True,
             )
-        
+
         dispatcher = MemoryAdaptiveDispatcher(
             memory_threshold_percent=memory_threshold,
             check_interval=check_interval,
-            max_session_permit=max_concurrent
+            max_session_permit=max_concurrent,
         )
-        
+
         async def report_progress(percentage: int, message: str, **kwargs):
             """Helper to report progress if callback is available"""
             if progress_callback:
                 # Add step information for multi-progress tracking
-                step_info = {
-                    'currentStep': message,
-                    'stepMessage': message,
-                    **kwargs
-                }
-                await progress_callback('crawling', percentage, message, **step_info)
-        
+                step_info = {"currentStep": message, "stepMessage": message, **kwargs}
+                await progress_callback("crawling", percentage, message, **step_info)
+
         visited = set()
-        
+
         def normalize_url(url):
             return urldefrag(url)[0]
-        
+
         current_urls = set([normalize_url(u) for u in start_urls])
         results_all = []
         total_processed = 0
-        
+
         for depth in range(max_depth):
-            urls_to_crawl = [normalize_url(url) for url in current_urls if normalize_url(url) not in visited]
+            urls_to_crawl = [
+                normalize_url(url) for url in current_urls if normalize_url(url) not in visited
+            ]
             if not urls_to_crawl:
                 break
-            
+
             # Calculate progress for this depth level
-            depth_start = start_progress + int((depth / max_depth) * (end_progress - start_progress) * 0.8)
-            depth_end = start_progress + int(((depth + 1) / max_depth) * (end_progress - start_progress) * 0.8)
-            
-            await report_progress(depth_start, f'Crawling depth {depth + 1}/{max_depth}: {len(urls_to_crawl)} URLs to process')
-            
+            depth_start = start_progress + int(
+                (depth / max_depth) * (end_progress - start_progress) * 0.8
+            )
+            depth_end = start_progress + int(
+                ((depth + 1) / max_depth) * (end_progress - start_progress) * 0.8
+            )
+
+            await report_progress(
+                depth_start,
+                f"Crawling depth {depth + 1}/{max_depth}: {len(urls_to_crawl)} URLs to process",
+            )
+
             # Use configured batch size for recursive crawling
             next_level_urls = set()
             depth_successful = 0
-            
+
             for batch_idx in range(0, len(urls_to_crawl), batch_size):
-                batch_urls = urls_to_crawl[batch_idx:batch_idx + batch_size]
+                batch_urls = urls_to_crawl[batch_idx : batch_idx + batch_size]
                 batch_end_idx = min(batch_idx + batch_size, len(urls_to_crawl))
-                
+
                 # Calculate progress for this batch within the depth
-                batch_progress = depth_start + int((batch_idx / len(urls_to_crawl)) * (depth_end - depth_start))
-                await report_progress(batch_progress,
-                    f'Depth {depth + 1}: crawling URLs {batch_idx + 1}-{batch_end_idx} of {len(urls_to_crawl)}',
-                    totalPages=total_processed + batch_idx,
-                    processedPages=len(results_all))
-                
+                batch_progress = depth_start + int(
+                    (batch_idx / len(urls_to_crawl)) * (depth_end - depth_start)
+                )
+                await report_progress(
+                    batch_progress,
+                    f"Depth {depth + 1}: crawling URLs {batch_idx + 1}-{batch_end_idx} of {len(urls_to_crawl)}",
+                    totalPages=total_processed + batch_idx,
+                    processedPages=len(results_all),
+                )
+
                 # Use arun_many for native parallel crawling with streaming
                 logger.info(f"Starting parallel crawl of {len(batch_urls)} URLs with arun_many")
-                batch_results = await self.crawler.arun_many(urls=batch_urls, config=run_config, dispatcher=dispatcher)
-                
+                batch_results = await self.crawler.arun_many(
+                    urls=batch_urls, config=run_config, dispatcher=dispatcher
+                )
+
                 # Handle streaming results from arun_many
                 i = 0
                 async for result in batch_results:
@@ -181,45 +195,58 @@ class RecursiveCrawlStrategy:
                         if transform_url_func(orig_url) == result.url:
                             original_url = orig_url
                             break
-                    
+
                     norm_url = normalize_url(original_url)
                     visited.add(norm_url)
                     total_processed += 1
-                    
+
                     if result.success and result.markdown:
                         results_all.append({
-                            'url': original_url,
-                            'markdown': result.markdown,
-                            'html': result.html  # Always use raw HTML for code extraction
+                            "url": original_url,
+                            "markdown": result.markdown,
+                            "html": result.html,  # Always use raw HTML for code extraction
                         })
                         depth_successful += 1
-                        
+
                         # Find internal links for next depth
                         for link in result.links.get("internal", []):
                             next_url = normalize_url(link["href"])
                             # Skip binary files and already visited URLs
-                            if next_url not in visited and not self.url_handler.is_binary_file(next_url):
+                            if next_url not in visited and not self.url_handler.is_binary_file(
+                                next_url
+                            ):
                                 next_level_urls.add(next_url)
                             elif self.url_handler.is_binary_file(next_url):
                                 logger.debug(f"Skipping binary file from crawl queue: {next_url}")
                     else:
-                        logger.warning(f"Failed to crawl {original_url}: {getattr(result, 'error_message', 'Unknown error')}")
-                    
+                        logger.warning(
+                            f"Failed to crawl {original_url}: {getattr(result, 'error_message', 'Unknown error')}"
+                        )
+
                     # Report progress every few URLs
                     current_idx = batch_idx + i + 1
                     if current_idx % 5 == 0 or current_idx == len(urls_to_crawl):
-                        current_progress = depth_start + int((current_idx / len(urls_to_crawl)) * (depth_end - depth_start))
-                        await report_progress(current_progress,
-                            f'Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)',
-                            totalPages=total_processed,
-                            processedPages=len(results_all))
+                        current_progress = depth_start + int(
+                            (current_idx / len(urls_to_crawl)) * (depth_end - depth_start)
+                        )
+                        await report_progress(
+                            current_progress,
+                            f"Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)",
+                            totalPages=total_processed,
+                            processedPages=len(results_all),
+                        )
                     i += 1
-            
+
             current_urls = next_level_urls
-            
+
             # Report completion of this depth
-            await report_progress(depth_end,
-                f'Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth')
-        
-        await report_progress(end_progress, f'Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels')
+            await report_progress(
+                depth_end,
+                f"Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth",
+            )
+
+        await report_progress(
+            end_progress,
+            f"Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels",
+        )
         return results_all
\ No newline at end of file
diff --git a/python/src/server/services/threading_service.py b/python/src/server/services/threading_service.py
index a5ad6cbd..12d69b63 100644
--- a/python/src/server/services/threading_service.py
+++ b/python/src/server/services/threading_service.py
@@ -93,18 +93,19 @@ class RateLimiter:
             self._clean_old_entries(now)
 
             # Check if we can make the request
-            while not self._can_make_request(estimated_tokens):
+            if not self._can_make_request(estimated_tokens):
                 wait_time = self._calculate_wait_time(estimated_tokens)
                 if wait_time > 0:
                     logfire_logger.info(
-                        f"Rate limiting: waiting {wait_time:.1f}s (tokens={estimated_tokens}, current_usage={self._get_current_usage()})"
+                        f"Rate limiting: waiting {wait_time:.1f}s",
+                        extra={
+                            "tokens": estimated_tokens,
+                            "current_usage": self._get_current_usage(),
+                        }
                     )
                     await asyncio.sleep(wait_time)
-                    # Clean old entries after waiting
-                    now = time.time()
-                    self._clean_old_entries(now)
-                else:
-                    return False
+                    return await self.acquire(estimated_tokens)
+                return False
 
             # Record the request
             self.request_times.append(now)
@@ -199,13 +200,21 @@ class MemoryAdaptiveDispatcher:
             # Reduce workers when memory is high
             workers = max(1, base // 2)
             logfire_logger.warning(
-                f"High memory usage detected, reducing workers (memory_percent={metrics.memory_percent}, workers={workers})"
+                "High memory usage detected, reducing workers",
+                extra={
+                    "memory_percent": metrics.memory_percent,
+                    "workers": workers,
+                }
            )
         elif metrics.cpu_percent > self.config.cpu_threshold * 100:
             # Reduce workers when CPU is high
             workers = max(1, base // 2)
             logfire_logger.warning(
-                f"High CPU usage detected, reducing workers (cpu_percent={metrics.cpu_percent}, workers={workers})"
+                "High CPU usage detected, reducing workers",
+                extra={
+                    "cpu_percent": metrics.cpu_percent,
+                    "workers": workers,
+                }
             )
         elif metrics.memory_percent < 50 and metrics.cpu_percent < 50:
             # Increase workers when resources are available
@@ -235,7 +244,14 @@ class MemoryAdaptiveDispatcher:
         semaphore = asyncio.Semaphore(optimal_workers)
 
         logfire_logger.info(
-            f"Starting adaptive processing (items_count={len(items)}, workers={optimal_workers}, mode={mode}, memory_percent={self.last_metrics.memory_percent}, cpu_percent={self.last_metrics.cpu_percent})"
+            "Starting adaptive processing",
+            extra={
+                "items_count": len(items),
+                "workers": optimal_workers,
+                "mode": mode,
+                "memory_percent": self.last_metrics.memory_percent,
+                "cpu_percent": self.last_metrics.cpu_percent,
+            }
         )
 
         # Track active workers
@@ -310,7 +326,8 @@ class MemoryAdaptiveDispatcher:
                         del active_workers[worker_id]
 
                 logfire_logger.error(
-                    f"Processing failed for item {index} (error={str(e)}, item_index={index})"
+                    f"Processing failed for item {index}",
+                    extra={"error": str(e), "item_index": index}
                 )
                 return None
 
@@ -325,7 +342,13 @@ class MemoryAdaptiveDispatcher:
         success_rate = len(successful_results) / len(items) * 100
 
         logfire_logger.info(
-            f"Adaptive processing completed (total_items={len(items)}, successful={len(successful_results)}, success_rate={success_rate:.1f}%, workers_used={optimal_workers})"
+            "Adaptive processing completed",
+            extra={
+                "total_items": len(items),
+                "successful": len(successful_results),
+                "success_rate": f"{success_rate:.1f}%",
+                "workers_used": optimal_workers,
+            }
         )
 
         return successful_results
@@ -343,7 +366,8 @@ class WebSocketSafeProcessor:
         await websocket.accept()
         self.active_connections.append(websocket)
         logfire_logger.info(
-            f"WebSocket client connected (total_connections={len(self.active_connections)})"
+            "WebSocket client connected",
+            extra={"total_connections": len(self.active_connections)}
         )
 
     def disconnect(self, websocket: WebSocket):
@@ -351,7 +375,8 @@ class WebSocketSafeProcessor:
         if websocket in self.active_connections:
             self.active_connections.remove(websocket)
             logfire_logger.info(
-                f"WebSocket client disconnected (remaining_connections={len(self.active_connections)})"
+                "WebSocket client disconnected",
+                extra={"remaining_connections": len(self.active_connections)}
             )
 
     async def broadcast_progress(self, message: dict[str, Any]):
@@ -462,7 +487,7 @@ class ThreadingService:
         self._running = True
         self._health_check_task = asyncio.create_task(self._health_check_loop())
 
-        logfire_logger.info(f"Threading service started (config={self.config.__dict__})")
+        logfire_logger.info("Threading service started", extra={"config": self.config.__dict__})
 
     async def stop(self):
         """Stop the threading service"""
@@ -498,7 +523,8 @@ class ThreadingService:
         finally:
             duration = time.time() - start_time
             logfire_logger.debug(
-                f"Rate limited operation completed (duration={duration}, tokens={estimated_tokens})"
+                "Rate limited operation completed",
+                extra={"duration": duration, "tokens": estimated_tokens},
             )
 
     async def run_cpu_intensive(self, func: Callable, *args, **kwargs) -> Any:
@@ -550,30 +576,44 @@ class ThreadingService:
 
                 # Log system metrics
                 logfire_logger.info(
-                    f"System health check (memory_percent={metrics.memory_percent}, cpu_percent={metrics.cpu_percent}, available_memory_gb={metrics.available_memory_gb}, active_threads={metrics.active_threads}, active_websockets={len(self.websocket_processor.active_connections)})"
+                    "System health check",
+                    extra={
+                        "memory_percent": metrics.memory_percent,
+                        "cpu_percent": metrics.cpu_percent,
+                        "available_memory_gb": metrics.available_memory_gb,
+                        "active_threads": metrics.active_threads,
+                        "active_websockets": len(self.websocket_processor.active_connections),
+                    }
                 )
 
                 # Alert on critical thresholds
                 if metrics.memory_percent > 90:
                     logfire_logger.warning(
-                        f"Critical memory usage (memory_percent={metrics.memory_percent})"
+                        "Critical memory usage",
+                        extra={"memory_percent": metrics.memory_percent}
                     )
                     # Force garbage collection
                     gc.collect()
 
                 if metrics.cpu_percent > 95:
-                    logfire_logger.warning(f"Critical CPU usage (cpu_percent={metrics.cpu_percent})")
+                    logfire_logger.warning(
+                        "Critical CPU usage", extra={"cpu_percent": metrics.cpu_percent}
+                    )
 
                 # Check for memory leaks (too many threads)
                 if metrics.active_threads > self.config.max_workers * 3:
                     logfire_logger.warning(
-                        f"High thread count detected (active_threads={metrics.active_threads}, max_expected={self.config.max_workers * 3})"
+                        "High thread count detected",
+                        extra={
+                            "active_threads": metrics.active_threads,
+                            "max_expected": self.config.max_workers * 3,
+                        }
                     )
 
                 await asyncio.sleep(self.config.health_check_interval)
 
             except Exception as e:
-                logfire_logger.error(f"Health check failed (error={str(e)})")
+                logfire_logger.error("Health check failed", extra={"error": str(e)})
                 await asyncio.sleep(self.config.health_check_interval)
 
@@ -601,4 +641,4 @@ async def stop_threading_service():
     global _threading_service
     if _threading_service:
         await _threading_service.stop()
-        _threading_service = None
+        _threading_service = None
\ No newline at end of file