diff --git a/python/src/server/services/crawling/strategies/batch.py b/python/src/server/services/crawling/strategies/batch.py index 17932ac2..bf2cf571 100644 --- a/python/src/server/services/crawling/strategies/batch.py +++ b/python/src/server/services/crawling/strategies/batch.py @@ -4,7 +4,6 @@ Batch Crawling Strategy Handles batch crawling of multiple URLs in parallel. """ -import asyncio from typing import List, Dict, Any, Optional, Callable from crawl4ai import CrawlerRunConfig, CacheMode, MemoryAdaptiveDispatcher @@ -70,10 +69,12 @@ class BatchCrawlStrategy: except (ValueError, KeyError, TypeError) as e: # Critical configuration errors should fail fast in alpha logger.error(f"Invalid crawl settings format: {e}", exc_info=True) - raise ValueError(f"Failed to load crawler configuration: {e}") + raise ValueError(f"Failed to load crawler configuration: {e}") from e except Exception as e: # For non-critical errors (e.g., network issues), use defaults but log prominently - logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True) + logger.error( + f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True + ) batch_size = 50 if max_concurrent is None: max_concurrent = 10 # Safe default to prevent memory issues diff --git a/python/src/server/services/crawling/strategies/recursive.py b/python/src/server/services/crawling/strategies/recursive.py index d258143f..2ce252f3 100644 --- a/python/src/server/services/crawling/strategies/recursive.py +++ b/python/src/server/services/crawling/strategies/recursive.py @@ -3,7 +3,7 @@ Recursive Crawling Strategy Handles recursive crawling of websites by following internal links. """ -import asyncio + from typing import List, Dict, Any, Optional, Callable from urllib.parse import urldefrag @@ -17,11 +17,11 @@ logger = get_logger(__name__) class RecursiveCrawlStrategy: """Strategy for recursive crawling of websites.""" - + def __init__(self, crawler, markdown_generator): """ Initialize recursive crawl strategy. - + Args: crawler (AsyncWebCrawler): The Crawl4AI crawler instance for web crawling operations markdown_generator (DefaultMarkdownGenerator): The markdown generator instance for converting HTML to markdown @@ -29,7 +29,7 @@ class RecursiveCrawlStrategy: self.crawler = crawler self.markdown_generator = markdown_generator self.url_handler = URLHandler() - + async def crawl_recursive_with_progress( self, start_urls: List[str], @@ -39,11 +39,11 @@ class RecursiveCrawlStrategy: max_concurrent: int = None, progress_callback: Optional[Callable] = None, start_progress: int = 10, - end_progress: int = 60 + end_progress: int = 60, ) -> List[Dict[str, Any]]: """ Recursively crawl internal links from start URLs up to a maximum depth with progress reporting. - + Args: start_urls: List of starting URLs transform_url_func: Function to transform URLs (e.g., GitHub URLs) @@ -53,16 +53,16 @@ class RecursiveCrawlStrategy: progress_callback: Optional callback for progress updates start_progress: Starting progress percentage end_progress: Ending progress percentage - + Returns: List of crawl results """ if not self.crawler: logger.error("No crawler instance available for recursive crawling") if progress_callback: - await progress_callback('error', 0, 'Crawler not available') + await progress_callback("error", 0, "Crawler not available") return [] - + # Load settings from database - fail fast on configuration errors try: settings = await credential_service.get_credentials_by_category("rag_strategy") @@ -74,22 +74,26 @@ class RecursiveCrawlStrategy: except (ValueError, KeyError, TypeError) as e: # Critical configuration errors should fail fast in alpha logger.error(f"Invalid crawl settings format: {e}", exc_info=True) - raise ValueError(f"Failed to load crawler configuration: {e}") + raise ValueError(f"Failed to load crawler configuration: {e}") from e except Exception as e: # For non-critical errors (e.g., network issues), use defaults but log prominently - logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True) + logger.error( + f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True + ) batch_size = 50 if max_concurrent is None: max_concurrent = 10 # Safe default to prevent memory issues memory_threshold = 80.0 check_interval = 0.5 settings = {} # Empty dict for defaults - + # Check if start URLs include documentation sites has_doc_sites = any(is_documentation_site_func(url) for url in start_urls) - + if has_doc_sites: - logger.info("Detected documentation sites for recursive crawl, using enhanced configuration") + logger.info( + "Detected documentation sites for recursive crawl, using enhanced configuration" + ) run_config = CrawlerRunConfig( cache_mode=CacheMode.BYPASS, stream=True, # Enable streaming for faster parallel processing @@ -101,7 +105,7 @@ class RecursiveCrawlStrategy: scan_full_page=True, # Trigger lazy loading exclude_all_images=False, remove_overlay_elements=True, - process_iframes=True + process_iframes=True, ) else: # Configuration for regular recursive crawling @@ -112,65 +116,76 @@ class RecursiveCrawlStrategy: wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"), page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "45000")), delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "0.5")), - scan_full_page=True + scan_full_page=True, ) - + dispatcher = MemoryAdaptiveDispatcher( memory_threshold_percent=memory_threshold, check_interval=check_interval, - max_session_permit=max_concurrent + max_session_permit=max_concurrent, ) - + async def report_progress(percentage: int, message: str, **kwargs): """Helper to report progress if callback is available""" if progress_callback: # Add step information for multi-progress tracking - step_info = { - 'currentStep': message, - 'stepMessage': message, - **kwargs - } - await progress_callback('crawling', percentage, message, **step_info) - + step_info = {"currentStep": message, "stepMessage": message, **kwargs} + await progress_callback("crawling", percentage, message, **step_info) + visited = set() - + def normalize_url(url): return urldefrag(url)[0] - + current_urls = set([normalize_url(u) for u in start_urls]) results_all = [] total_processed = 0 - + for depth in range(max_depth): - urls_to_crawl = [normalize_url(url) for url in current_urls if normalize_url(url) not in visited] + urls_to_crawl = [ + normalize_url(url) for url in current_urls if normalize_url(url) not in visited + ] if not urls_to_crawl: break - + # Calculate progress for this depth level - depth_start = start_progress + int((depth / max_depth) * (end_progress - start_progress) * 0.8) - depth_end = start_progress + int(((depth + 1) / max_depth) * (end_progress - start_progress) * 0.8) - - await report_progress(depth_start, f'Crawling depth {depth + 1}/{max_depth}: {len(urls_to_crawl)} URLs to process') - + depth_start = start_progress + int( + (depth / max_depth) * (end_progress - start_progress) * 0.8 + ) + depth_end = start_progress + int( + ((depth + 1) / max_depth) * (end_progress - start_progress) * 0.8 + ) + + await report_progress( + depth_start, + f"Crawling depth {depth + 1}/{max_depth}: {len(urls_to_crawl)} URLs to process", + ) + # Use configured batch size for recursive crawling next_level_urls = set() depth_successful = 0 - + for batch_idx in range(0, len(urls_to_crawl), batch_size): - batch_urls = urls_to_crawl[batch_idx:batch_idx + batch_size] + batch_urls = urls_to_crawl[batch_idx : batch_idx + batch_size] batch_end_idx = min(batch_idx + batch_size, len(urls_to_crawl)) - + # Calculate progress for this batch within the depth - batch_progress = depth_start + int((batch_idx / len(urls_to_crawl)) * (depth_end - depth_start)) - await report_progress(batch_progress, - f'Depth {depth + 1}: crawling URLs {batch_idx + 1}-{batch_end_idx} of {len(urls_to_crawl)}', - totalPages=total_processed + batch_idx, - processedPages=len(results_all)) - + batch_progress = depth_start + int( + (batch_idx / len(urls_to_crawl)) * (depth_end - depth_start) + ) + await report_progress( + batch_progress, + f"Depth {depth + 1}: crawling URLs {batch_idx + 1}-{batch_end_idx} of {len(urls_to_crawl)}", + totalPages=total_processed + batch_idx, + processedPages=len(results_all), + ) + # Use arun_many for native parallel crawling with streaming logger.info(f"Starting parallel crawl of {len(batch_urls)} URLs with arun_many") - batch_results = await self.crawler.arun_many(urls=batch_urls, config=run_config, dispatcher=dispatcher) - + batch_results = await self.crawler.arun_many( + urls=batch_urls, config=run_config, dispatcher=dispatcher + ) + # Handle streaming results from arun_many i = 0 async for result in batch_results: @@ -180,45 +195,58 @@ class RecursiveCrawlStrategy: if transform_url_func(orig_url) == result.url: original_url = orig_url break - + norm_url = normalize_url(original_url) visited.add(norm_url) total_processed += 1 - + if result.success and result.markdown: results_all.append({ - 'url': original_url, - 'markdown': result.markdown, - 'html': result.html # Always use raw HTML for code extraction + "url": original_url, + "markdown": result.markdown, + "html": result.html, # Always use raw HTML for code extraction }) depth_successful += 1 - + # Find internal links for next depth for link in result.links.get("internal", []): next_url = normalize_url(link["href"]) # Skip binary files and already visited URLs - if next_url not in visited and not self.url_handler.is_binary_file(next_url): + if next_url not in visited and not self.url_handler.is_binary_file( + next_url + ): next_level_urls.add(next_url) elif self.url_handler.is_binary_file(next_url): logger.debug(f"Skipping binary file from crawl queue: {next_url}") else: - logger.warning(f"Failed to crawl {original_url}: {getattr(result, 'error_message', 'Unknown error')}") - + logger.warning( + f"Failed to crawl {original_url}: {getattr(result, 'error_message', 'Unknown error')}" + ) + # Report progress every few URLs current_idx = batch_idx + i + 1 if current_idx % 5 == 0 or current_idx == len(urls_to_crawl): - current_progress = depth_start + int((current_idx / len(urls_to_crawl)) * (depth_end - depth_start)) - await report_progress(current_progress, - f'Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)', - totalPages=total_processed, - processedPages=len(results_all)) + current_progress = depth_start + int( + (current_idx / len(urls_to_crawl)) * (depth_end - depth_start) + ) + await report_progress( + current_progress, + f"Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)", + totalPages=total_processed, + processedPages=len(results_all), + ) i += 1 - + current_urls = next_level_urls - + # Report completion of this depth - await report_progress(depth_end, - f'Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth') - - await report_progress(end_progress, f'Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels') - return results_all \ No newline at end of file + await report_progress( + depth_end, + f"Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth", + ) + + await report_progress( + end_progress, + f"Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels", + ) + return results_all