chore: Remove unused imports and fix exception chaining

- Remove unused asyncio imports from batch.py and recursive.py
- Add proper exception chaining with 'from e' to preserve stack traces

Author: Rasmus Widing
Date: 2025-08-20 22:03:45 +03:00
Committed by: Wirasm
Parent: 8792a1b0dd
Commit: 573e5c18c5
2 changed files with 100 additions and 71 deletions
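
For readers unfamiliar with exception chaining, here is a minimal standalone sketch of what 'from e' buys (illustrative names only, not code from this repository): the caught exception is attached to the new one as __cause__, so the original traceback is reported alongside the re-raised error instead of being lost.

# Hypothetical illustration of "raise ... from e"; not part of this commit.
def load_config(raw: dict) -> int:
    try:
        return int(raw["batch_size"])
    except (ValueError, KeyError, TypeError) as e:
        # "from e" stores the original exception on __cause__, so tracebacks
        # show both the root failure and this wrapper.
        raise ValueError(f"Failed to load crawler configuration: {e}") from e

try:
    load_config({})
except ValueError as err:
    print(type(err.__cause__).__name__)  # -> KeyError, the original error survives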

File: batch.py

@@ -4,7 +4,6 @@ Batch Crawling Strategy
Handles batch crawling of multiple URLs in parallel.
"""
import asyncio
from typing import List, Dict, Any, Optional, Callable
from crawl4ai import CrawlerRunConfig, CacheMode, MemoryAdaptiveDispatcher
@@ -70,10 +69,12 @@ class BatchCrawlStrategy:
except (ValueError, KeyError, TypeError) as e:
# Critical configuration errors should fail fast in alpha
logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
raise ValueError(f"Failed to load crawler configuration: {e}")
raise ValueError(f"Failed to load crawler configuration: {e}") from e
except Exception as e:
# For non-critical errors (e.g., network issues), use defaults but log prominently
logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True)
logger.error(
f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
)
batch_size = 50
if max_concurrent is None:
max_concurrent = 10 # Safe default to prevent memory issues
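
Taken together, the hunk above follows a two-tier error policy: malformed settings fail fast with the original error chained, while transient failures fall back to safe defaults. A minimal sketch of that pattern, with assumed names (credential_service, CRAWL_BATCH_SIZE) rather than verbatim batch.py code:

# Sketch of the settings-loading pattern in this hunk; the service, logger,
# and setting key are assumptions for illustration only.
import logging

logger = logging.getLogger(__name__)

async def load_batch_settings(credential_service, max_concurrent=None):
    try:
        settings = await credential_service.get_credentials_by_category("rag_strategy")
        batch_size = int(settings.get("CRAWL_BATCH_SIZE", "50"))  # key name assumed
    except (ValueError, KeyError, TypeError) as e:
        # Malformed configuration: fail fast, keeping the original error chained.
        logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
        raise ValueError(f"Failed to load crawler configuration: {e}") from e
    except Exception as e:
        # Transient failures (e.g. database unreachable): log loudly, use defaults.
        logger.error(
            f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
        )
        batch_size = 50
        if max_concurrent is None:
            max_concurrent = 10  # conservative default to limit memory use
    return batch_size, max_concurrent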

File: recursive.py

@@ -3,7 +3,7 @@ Recursive Crawling Strategy
Handles recursive crawling of websites by following internal links.
"""
import asyncio
from typing import List, Dict, Any, Optional, Callable
from urllib.parse import urldefrag
@@ -17,11 +17,11 @@ logger = get_logger(__name__)
class RecursiveCrawlStrategy:
"""Strategy for recursive crawling of websites."""
def __init__(self, crawler, markdown_generator):
"""
Initialize recursive crawl strategy.
Args:
crawler (AsyncWebCrawler): The Crawl4AI crawler instance for web crawling operations
markdown_generator (DefaultMarkdownGenerator): The markdown generator instance for converting HTML to markdown
@@ -29,7 +29,7 @@ class RecursiveCrawlStrategy:
self.crawler = crawler
self.markdown_generator = markdown_generator
self.url_handler = URLHandler()
async def crawl_recursive_with_progress(
self,
start_urls: List[str],
@@ -39,11 +39,11 @@ class RecursiveCrawlStrategy:
max_concurrent: int = None,
progress_callback: Optional[Callable] = None,
start_progress: int = 10,
end_progress: int = 60
end_progress: int = 60,
) -> List[Dict[str, Any]]:
"""
Recursively crawl internal links from start URLs up to a maximum depth with progress reporting.
Args:
start_urls: List of starting URLs
transform_url_func: Function to transform URLs (e.g., GitHub URLs)
@@ -53,16 +53,16 @@ class RecursiveCrawlStrategy:
progress_callback: Optional callback for progress updates
start_progress: Starting progress percentage
end_progress: Ending progress percentage
Returns:
List of crawl results
"""
if not self.crawler:
logger.error("No crawler instance available for recursive crawling")
if progress_callback:
await progress_callback('error', 0, 'Crawler not available')
await progress_callback("error", 0, "Crawler not available")
return []
# Load settings from database - fail fast on configuration errors
try:
settings = await credential_service.get_credentials_by_category("rag_strategy")
@@ -74,22 +74,26 @@ class RecursiveCrawlStrategy:
except (ValueError, KeyError, TypeError) as e:
# Critical configuration errors should fail fast in alpha
logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
raise ValueError(f"Failed to load crawler configuration: {e}")
raise ValueError(f"Failed to load crawler configuration: {e}") from e
except Exception as e:
# For non-critical errors (e.g., network issues), use defaults but log prominently
logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True)
logger.error(
f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
)
batch_size = 50
if max_concurrent is None:
max_concurrent = 10 # Safe default to prevent memory issues
memory_threshold = 80.0
check_interval = 0.5
settings = {} # Empty dict for defaults
# Check if start URLs include documentation sites
has_doc_sites = any(is_documentation_site_func(url) for url in start_urls)
if has_doc_sites:
logger.info("Detected documentation sites for recursive crawl, using enhanced configuration")
logger.info(
"Detected documentation sites for recursive crawl, using enhanced configuration"
)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
stream=True, # Enable streaming for faster parallel processing
@@ -101,7 +105,7 @@ class RecursiveCrawlStrategy:
scan_full_page=True, # Trigger lazy loading
exclude_all_images=False,
remove_overlay_elements=True,
process_iframes=True
process_iframes=True,
)
else:
# Configuration for regular recursive crawling
@@ -112,65 +116,76 @@ class RecursiveCrawlStrategy:
wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "45000")),
delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "0.5")),
scan_full_page=True
scan_full_page=True,
)
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=memory_threshold,
check_interval=check_interval,
max_session_permit=max_concurrent
max_session_permit=max_concurrent,
)
async def report_progress(percentage: int, message: str, **kwargs):
"""Helper to report progress if callback is available"""
if progress_callback:
# Add step information for multi-progress tracking
step_info = {
'currentStep': message,
'stepMessage': message,
**kwargs
}
await progress_callback('crawling', percentage, message, **step_info)
step_info = {"currentStep": message, "stepMessage": message, **kwargs}
await progress_callback("crawling", percentage, message, **step_info)
visited = set()
def normalize_url(url):
return urldefrag(url)[0]
current_urls = set([normalize_url(u) for u in start_urls])
results_all = []
total_processed = 0
for depth in range(max_depth):
urls_to_crawl = [normalize_url(url) for url in current_urls if normalize_url(url) not in visited]
urls_to_crawl = [
normalize_url(url) for url in current_urls if normalize_url(url) not in visited
]
if not urls_to_crawl:
break
# Calculate progress for this depth level
depth_start = start_progress + int((depth / max_depth) * (end_progress - start_progress) * 0.8)
depth_end = start_progress + int(((depth + 1) / max_depth) * (end_progress - start_progress) * 0.8)
await report_progress(depth_start, f'Crawling depth {depth + 1}/{max_depth}: {len(urls_to_crawl)} URLs to process')
depth_start = start_progress + int(
(depth / max_depth) * (end_progress - start_progress) * 0.8
)
depth_end = start_progress + int(
((depth + 1) / max_depth) * (end_progress - start_progress) * 0.8
)
await report_progress(
depth_start,
f"Crawling depth {depth + 1}/{max_depth}: {len(urls_to_crawl)} URLs to process",
)
# Use configured batch size for recursive crawling
next_level_urls = set()
depth_successful = 0
for batch_idx in range(0, len(urls_to_crawl), batch_size):
batch_urls = urls_to_crawl[batch_idx:batch_idx + batch_size]
batch_urls = urls_to_crawl[batch_idx : batch_idx + batch_size]
batch_end_idx = min(batch_idx + batch_size, len(urls_to_crawl))
# Calculate progress for this batch within the depth
batch_progress = depth_start + int((batch_idx / len(urls_to_crawl)) * (depth_end - depth_start))
await report_progress(batch_progress,
f'Depth {depth + 1}: crawling URLs {batch_idx + 1}-{batch_end_idx} of {len(urls_to_crawl)}',
totalPages=total_processed + batch_idx,
processedPages=len(results_all))
batch_progress = depth_start + int(
(batch_idx / len(urls_to_crawl)) * (depth_end - depth_start)
)
await report_progress(
batch_progress,
f"Depth {depth + 1}: crawling URLs {batch_idx + 1}-{batch_end_idx} of {len(urls_to_crawl)}",
totalPages=total_processed + batch_idx,
processedPages=len(results_all),
)
# Use arun_many for native parallel crawling with streaming
logger.info(f"Starting parallel crawl of {len(batch_urls)} URLs with arun_many")
batch_results = await self.crawler.arun_many(urls=batch_urls, config=run_config, dispatcher=dispatcher)
batch_results = await self.crawler.arun_many(
urls=batch_urls, config=run_config, dispatcher=dispatcher
)
# Handle streaming results from arun_many
i = 0
async for result in batch_results:
@@ -180,45 +195,58 @@ class RecursiveCrawlStrategy:
if transform_url_func(orig_url) == result.url:
original_url = orig_url
break
norm_url = normalize_url(original_url)
visited.add(norm_url)
total_processed += 1
if result.success and result.markdown:
results_all.append({
'url': original_url,
'markdown': result.markdown,
'html': result.html # Always use raw HTML for code extraction
"url": original_url,
"markdown": result.markdown,
"html": result.html, # Always use raw HTML for code extraction
})
depth_successful += 1
# Find internal links for next depth
for link in result.links.get("internal", []):
next_url = normalize_url(link["href"])
# Skip binary files and already visited URLs
if next_url not in visited and not self.url_handler.is_binary_file(next_url):
if next_url not in visited and not self.url_handler.is_binary_file(
next_url
):
next_level_urls.add(next_url)
elif self.url_handler.is_binary_file(next_url):
logger.debug(f"Skipping binary file from crawl queue: {next_url}")
else:
logger.warning(f"Failed to crawl {original_url}: {getattr(result, 'error_message', 'Unknown error')}")
logger.warning(
f"Failed to crawl {original_url}: {getattr(result, 'error_message', 'Unknown error')}"
)
# Report progress every few URLs
current_idx = batch_idx + i + 1
if current_idx % 5 == 0 or current_idx == len(urls_to_crawl):
current_progress = depth_start + int((current_idx / len(urls_to_crawl)) * (depth_end - depth_start))
await report_progress(current_progress,
f'Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)',
totalPages=total_processed,
processedPages=len(results_all))
current_progress = depth_start + int(
(current_idx / len(urls_to_crawl)) * (depth_end - depth_start)
)
await report_progress(
current_progress,
f"Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)",
totalPages=total_processed,
processedPages=len(results_all),
)
i += 1
current_urls = next_level_urls
# Report completion of this depth
await report_progress(depth_end,
f'Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth')
await report_progress(end_progress, f'Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels')
return results_all
await report_progress(
depth_end,
f"Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth",
)
await report_progress(
end_progress,
f"Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels",
)
return results_all
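
As a closing note on the progress math in this file, a small worked example (start_progress=10 and end_progress=60 are the signature defaults; max_depth=3 is an assumed value) shows why each depth advances through only 80% of the allotted range, leaving headroom for the final completion report at end_progress:

# Worked example of the per-depth progress bands computed in the diff above.
start_progress, end_progress, max_depth = 10, 60, 3   # max_depth=3 is assumed
span = (end_progress - start_progress) * 0.8           # 80% of the range, per the code
for depth in range(max_depth):
    depth_start = start_progress + int((depth / max_depth) * span)
    depth_end = start_progress + int(((depth + 1) / max_depth) * span)
    print(f"depth {depth + 1}: {depth_start}% -> {depth_end}%")
# depth 1: 10% -> 23%
# depth 2: 23% -> 36%
# depth 3: 36% -> 50%; the final report then jumps to end_progress (60%).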