From bc97f5dbf5209eeb1b2b4afdb483be93665fd7b3 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Sat, 4 Oct 2025 13:46:19 +0000 Subject: [PATCH] feat: integrate Docling for advanced document processing in RAG pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Docling dependency to pyproject.toml for multi-format document support - Create docling_processing.py with DocumentConverter and HybridChunker - Implement intelligent semantic-aware chunking for better RAG performance - Add enhanced document upload method with Docling integration - Update knowledge API to use enhanced processing with legacy fallback - Support PDF, DOCX, PPTX, XLSX, HTML formats with structure preservation - Include enhanced metadata tracking for processing methods - Maintain backward compatibility with existing document processors Resolves #756 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Cole Medin --- python/pyproject.toml | 2 + python/src/server/api_routes/knowledge_api.py | 170 ++++--- .../services/storage/storage_services.py | 211 +++++++++ python/src/server/utils/docling_processing.py | 419 ++++++++++++++++++ .../src/server/utils/document_processing.py | 156 +++++++ 5 files changed, 893 insertions(+), 65 deletions(-) create mode 100644 python/src/server/utils/docling_processing.py diff --git a/python/pyproject.toml b/python/pyproject.toml index c4ce74c6..91606918 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -50,6 +50,7 @@ server = [ "pdfplumber>=0.11.6", "python-docx>=1.1.2", "markdown>=3.8", + "docling>=1.0.0", # Security and utilities "python-jose[cryptography]>=3.3.0", "cryptography>=41.0.0", @@ -112,6 +113,7 @@ all = [ "pdfplumber>=0.11.6", "python-docx>=1.1.2", "markdown>=3.8", + "docling>=1.0.0", "python-jose[cryptography]>=3.3.0", "cryptography>=41.0.0", "slowapi>=0.1.9", diff --git a/python/src/server/api_routes/knowledge_api.py b/python/src/server/api_routes/knowledge_api.py index 1f26dace..f0c213b4 100644 --- a/python/src/server/api_routes/knowledge_api.py +++ b/python/src/server/api_routes/knowledge_api.py @@ -30,7 +30,7 @@ from ..services.knowledge import DatabaseMetricsService, KnowledgeItemService, K from ..services.search.rag_service import RAGService from ..services.storage import DocumentStorageService from ..utils import get_supabase_client -from ..utils.document_processing import extract_text_from_document +from ..utils.document_processing import extract_text_from_document, extract_and_chunk_for_rag # Get logger for this module logger = get_logger(__name__) @@ -84,39 +84,39 @@ async def _validate_provider_api_key(provider: str = None) -> None: safe_provider = provider[:20] # Limit length logger.info(f"🔑 Testing {safe_provider.title()} API key with minimal embedding request...") - try: - # Test API key with minimal embedding request using provider-scoped configuration - from ..services.embeddings.embedding_service import create_embedding - - test_result = await create_embedding(text="test", provider=provider) - - if not test_result: - logger.error( - f"❌ {provider.title()} API key validation failed - no embedding returned" - ) - raise HTTPException( - status_code=401, - detail={ - "error": f"Invalid {provider.title()} API key", - "message": f"Please verify your {provider.title()} API key in Settings.", - "error_type": "authentication_failed", - "provider": provider, - }, - ) - except Exception as e: - logger.error( - f"❌ {provider.title()} API key 
validation failed: {e}", - exc_info=True, - ) - raise HTTPException( - status_code=401, - detail={ - "error": f"Invalid {provider.title()} API key", - "message": f"Please verify your {provider.title()} API key in Settings. Error: {str(e)[:100]}", - "error_type": "authentication_failed", - "provider": provider, - }, - ) + try: + # Test API key with minimal embedding request using provider-scoped configuration + from ..services.embeddings.embedding_service import create_embedding + + test_result = await create_embedding(text="test", provider=provider) + + if not test_result: + logger.error( + f"❌ {provider.title()} API key validation failed - no embedding returned" + ) + raise HTTPException( + status_code=401, + detail={ + "error": f"Invalid {provider.title()} API key", + "message": f"Please verify your {provider.title()} API key in Settings.", + "error_type": "authentication_failed", + "provider": provider, + }, + ) + except Exception as e: + logger.error( + f"❌ {provider.title()} API key validation failed: {e}", + exc_info=True, + ) + raise HTTPException( + status_code=401, + detail={ + "error": f"Invalid {provider.title()} API key", + "message": f"Please verify your {provider.title()} API key in Settings. Error: {str(e)[:100]}", + "error_type": "authentication_failed", + "provider": provider, + }, + ) logger.info(f"✅ {provider.title()} API key validation successful") @@ -1010,23 +1010,7 @@ async def _perform_upload_with_progress( log=f"Extracting text from {filename}" ) - try: - extracted_text = extract_text_from_document(file_content, filename, content_type) - safe_logfire_info( - f"Document text extracted | filename={filename} | extracted_length={len(extracted_text)} | content_type={content_type}" - ) - except ValueError as ex: - # ValueError indicates unsupported format or empty file - user error - logger.warning(f"Document validation failed: {filename} - {str(ex)}") - await tracker.error(str(ex)) - return - except Exception as ex: - # Other exceptions are system errors - log with full traceback - logger.error(f"Failed to extract text from document: {filename}", exc_info=True) - await tracker.error(f"Failed to extract text from document: {str(ex)}") - return - - # Use DocumentStorageService to handle the upload + # Use DocumentStorageService to handle the upload with enhanced processing doc_storage_service = DocumentStorageService(get_supabase_client()) # Generate source_id from filename with UUID to prevent collisions @@ -1049,29 +1033,85 @@ async def _perform_upload_with_progress( **(batch_info or {}) ) - - # Call the service's upload_document method - success, result = await doc_storage_service.upload_document( - file_content=extracted_text, - filename=filename, - source_id=source_id, - knowledge_type=knowledge_type, - tags=tag_list, - extract_code_examples=extract_code_examples, - progress_callback=document_progress_callback, - cancellation_check=check_upload_cancellation, - ) + # Try enhanced document processing with Docling first + try: + success, result = await doc_storage_service.upload_document_with_enhanced_chunking( + file_content=file_content, # Pass raw bytes to Docling processor + filename=filename, + content_type=content_type, + source_id=source_id, + knowledge_type=knowledge_type, + tags=tag_list, + extract_code_examples=extract_code_examples, + progress_callback=document_progress_callback, + cancellation_check=check_upload_cancellation, + ) + + # Log the processing method used + processing_method = result.get("processing_method", "unknown") + safe_logfire_info( + 
f"Enhanced document processing completed | filename={filename} | method={processing_method} | chunks_stored={result.get('chunks_stored', 0)}" + ) + + except Exception as enhanced_error: + # If enhanced processing fails, fall back to legacy processing + logger.warning(f"Enhanced processing failed for {filename}: {enhanced_error}. Falling back to legacy processing.") + + try: + # Extract text using legacy method + extracted_text = extract_text_from_document(file_content, filename, content_type) + safe_logfire_info( + f"Legacy document text extracted | filename={filename} | extracted_length={len(extracted_text)} | content_type={content_type}" + ) + + # Use legacy upload method + success, result = await doc_storage_service.upload_document( + file_content=extracted_text, + filename=filename, + source_id=source_id, + knowledge_type=knowledge_type, + tags=tag_list, + extract_code_examples=extract_code_examples, + progress_callback=document_progress_callback, + cancellation_check=check_upload_cancellation, + ) + + # Add processing method to result for tracking + result["processing_method"] = "legacy_fallback" + result["fallback_reason"] = str(enhanced_error) + + except ValueError as ex: + # ValueError indicates unsupported format or empty file - user error + logger.warning(f"Document validation failed: {filename} - {str(ex)}") + await tracker.error(str(ex)) + return + except Exception as ex: + # Other exceptions are system errors - log with full traceback + logger.error(f"Failed to extract text from document: {filename}", exc_info=True) + await tracker.error(f"Failed to extract text from document: {str(ex)}") + return if success: # Complete the upload with 100% progress + processing_method = result.get("processing_method", "unknown") + extraction_method = result.get("extraction_method", "unknown") + chunking_method = result.get("chunking_method", "unknown") + + completion_log = f"Document uploaded successfully using {processing_method} processing!" 
+ if processing_method == "docling_enhanced": + completion_log += f" (extraction: {extraction_method}, chunking: {chunking_method})" + await tracker.complete({ - "log": "Document uploaded successfully!", + "log": completion_log, "chunks_stored": result.get("chunks_stored"), "code_examples_stored": result.get("code_examples_stored", 0), "sourceId": result.get("source_id"), + "processing_method": processing_method, + "extraction_method": extraction_method, + "chunking_method": chunking_method, }) safe_logfire_info( - f"Document uploaded successfully | progress_id={progress_id} | source_id={result.get('source_id')} | chunks_stored={result.get('chunks_stored')} | code_examples_stored={result.get('code_examples_stored', 0)}" + f"Document uploaded successfully | progress_id={progress_id} | source_id={result.get('source_id')} | chunks_stored={result.get('chunks_stored')} | code_examples_stored={result.get('code_examples_stored', 0)} | processing_method={processing_method}" ) else: error_msg = result.get("error", "Unknown error") diff --git a/python/src/server/services/storage/storage_services.py b/python/src/server/services/storage/storage_services.py index d3daecdb..687a6c6d 100644 --- a/python/src/server/services/storage/storage_services.py +++ b/python/src/server/services/storage/storage_services.py @@ -322,3 +322,214 @@ class DocumentStorageService(BaseStorageService): except Exception as e: logger.error(f"Error in store_code_examples: {e}") return False, {"error": str(e)} + + async def upload_document_with_enhanced_chunking( + self, + file_content: bytes, + filename: str, + content_type: str, + source_id: str, + knowledge_type: str = "documentation", + tags: list[str] | None = None, + extract_code_examples: bool = True, + progress_callback: Any | None = None, + cancellation_check: Any | None = None, + max_tokens_per_chunk: int = 512, + ) -> tuple[bool, dict[str, Any]]: + """ + Upload and process a document using enhanced Docling chunking. + + This method uses Docling's intelligent chunking when available, + falling back to legacy processing for unsupported formats. + + Args: + file_content: Raw document bytes + filename: Name of the file + content_type: MIME type of the file + source_id: Source identifier + knowledge_type: Type of knowledge + tags: Optional list of tags + extract_code_examples: Whether to extract code examples + progress_callback: Optional callback for progress + cancellation_check: Optional function to check for cancellation + max_tokens_per_chunk: Maximum tokens per chunk for embeddings + + Returns: + Tuple of (success, result_dict) + """ + from ...utils.document_processing import extract_and_chunk_for_rag + + logger.info(f"Enhanced document upload starting: {filename} as {knowledge_type} knowledge") + + with safe_span( + "upload_document_enhanced", + filename=filename, + source_id=source_id, + content_length=len(file_content), + use_docling=True, + ) as span: + try: + # Progress reporting helper + async def report_progress(message: str, percentage: int, batch_info: dict = None): + if progress_callback: + await progress_callback(message, percentage, batch_info) + + await report_progress("Starting enhanced document processing...", 10) + + # Use enhanced extraction and chunking with Docling + full_text, docling_chunks, doc_metadata = extract_and_chunk_for_rag( + file_content, filename, content_type, max_tokens_per_chunk + ) + + if not docling_chunks: + raise ValueError(f"No content could be extracted from {filename}. 
The file may be empty, corrupted, or in an unsupported format.") + + logger.info( + f"Enhanced processing completed for {filename}: " + f"{len(docling_chunks)} chunks created with {doc_metadata.get('extraction_method', 'unknown')} method" + ) + + await report_progress("Preparing enhanced document chunks...", 30) + + # Prepare data for storage using Docling chunks + doc_url = f"file://{filename}" + urls = [] + chunk_numbers = [] + contents = [] + metadatas = [] + total_word_count = 0 + + # Process Docling chunks with enhanced metadata + for i, chunk in enumerate(docling_chunks): + chunk_text = chunk["text"] + chunk_metadata = chunk.get("metadata", {}) + + # Combine base metadata with Docling metadata + enhanced_meta = { + "chunk_index": i, + "url": doc_url, + "source": source_id, + "source_id": source_id, + "knowledge_type": knowledge_type, + "source_type": "file", + "filename": filename, + # Add Docling-specific metadata + "docling_processed": doc_metadata.get("docling_processed", False), + "chunking_method": chunk_metadata.get("chunking_method", "unknown"), + "chunk_type": chunk.get("chunk_type", "unknown"), + "estimated_tokens": chunk.get("token_count", 0), + "extraction_method": doc_metadata.get("extraction_method", "legacy"), + } + + # Add document-level metadata to first chunk + if i == 0: + enhanced_meta.update({ + "document_metadata": doc_metadata, + "total_chunks": len(docling_chunks), + }) + + # Add tags if provided + if tags: + enhanced_meta["tags"] = tags + + urls.append(doc_url) + chunk_numbers.append(i) + contents.append(chunk_text) + metadatas.append(enhanced_meta) + total_word_count += len(chunk_text.split()) + + await report_progress(f"Processing {len(docling_chunks)} enhanced chunks...", 40) + + # Store documents using existing document storage + url_to_full_document = {doc_url: full_text} + storage_result = await add_documents_to_supabase( + self.supabase_client, + urls, + chunk_numbers, + contents, + metadatas, + url_to_full_document, + progress_callback=lambda stage, progress, message, **kwargs: report_progress( + f"Storing: {message}", 40 + (progress * 0.5) + ), + cancellation_check=cancellation_check, + ) + + chunks_stored = storage_result.get("chunks_stored", 0) + + await report_progress("Finalizing enhanced document upload...", 90) + + # Extract code examples if requested + code_examples_count = 0 + if extract_code_examples and len(docling_chunks) > 0: + try: + await report_progress("Extracting code examples...", 95) + + logger.info(f"🔍 DEBUG: Starting code extraction for {filename} (enhanced) | extract_code_examples={extract_code_examples}") + + # Import code extraction service + from ..crawling.code_extraction_service import CodeExtractionService + + code_service = CodeExtractionService(self.supabase_client) + + # Create crawl_results format with enhanced metadata + crawl_results = [{ + "url": doc_url, + "markdown": full_text, # Use full extracted text + "html": "", # Empty to prevent HTML extraction path + "content_type": content_type, + "docling_processed": doc_metadata.get("docling_processed", False), + "extraction_method": doc_metadata.get("extraction_method", "legacy"), + }] + + logger.info(f"🔍 DEBUG: Created enhanced crawl_results with url={doc_url}, content_length={len(full_text)}") + + # Create progress callback for code extraction + async def code_progress_callback(data: dict): + if progress_callback: + raw_progress = data.get("progress", data.get("percentage", 0)) + mapped_progress = 95 + (raw_progress / 100.0) * 5 # 95% to 100% + message = 
data.get("log", "Extracting code examples...") + await progress_callback(message, int(mapped_progress)) + + code_examples_count = await code_service.extract_and_store_code_examples( + crawl_results=crawl_results, + url_to_full_document=url_to_full_document, + source_id=source_id, + progress_callback=code_progress_callback, + cancellation_check=cancellation_check, + ) + + logger.info(f"🔍 DEBUG: Enhanced code extraction completed: {code_examples_count} code examples found for {filename}") + + except Exception as e: + logger.error(f"Code extraction failed for {filename}: {e}", exc_info=True) + code_examples_count = 0 + + await report_progress("Enhanced document upload completed!", 100) + + result_dict = { + "source_id": source_id, + "filename": filename, + "chunks_stored": chunks_stored, + "code_examples_stored": code_examples_count, + "total_word_count": total_word_count, + "processing_method": "docling_enhanced" if doc_metadata.get("docling_processed") else "legacy_fallback", + "extraction_method": doc_metadata.get("extraction_method", "legacy"), + "chunking_method": doc_metadata.get("chunking_method", "unknown"), + "document_metadata": doc_metadata, + } + + span.set_attribute("success", True) + span.set_attribute("chunks_stored", chunks_stored) + span.set_attribute("code_examples_stored", code_examples_count) + span.set_attribute("processing_method", result_dict["processing_method"]) + + logger.info(f"Enhanced document upload completed successfully: {filename}") + return True, result_dict + + except Exception as e: + logger.error(f"Enhanced document upload failed: {filename}", exc_info=True) + span.set_attribute("success", False) + span.set_attribute("error", str(e)) + return False, {"error": str(e), "filename": filename} diff --git a/python/src/server/utils/docling_processing.py b/python/src/server/utils/docling_processing.py new file mode 100644 index 00000000..cd069b05 --- /dev/null +++ b/python/src/server/utils/docling_processing.py @@ -0,0 +1,419 @@ +""" +Docling Document Processing Utilities + +This module provides advanced document processing capabilities using Docling +for multi-format support, intelligent chunking, and structure preservation. +""" + +import io +import tempfile +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from ..config.logfire_config import get_logger, logfire + +logger = get_logger(__name__) + +# Import Docling with availability check +try: + from docling import DocumentConverter + from docling.chunking import HybridChunker + from docling.datamodel.base_models import InputFormat + from docling.datamodel.document import ConversionResult + + DOCLING_AVAILABLE = True +except ImportError as e: + logger.warning(f"Docling not available: {e}") + DOCLING_AVAILABLE = False + + +class DoclingProcessor: + """ + Advanced document processor using Docling for multi-format support + and intelligent chunking optimized for RAG operations. + """ + + def __init__(self): + """Initialize the Docling processor.""" + if not DOCLING_AVAILABLE: + raise ImportError( + "Docling is not available. Please install docling>=1.0.0" + ) + + self.converter = DocumentConverter() + self.chunker = HybridChunker() + + def get_supported_formats(self) -> List[str]: + """ + Get list of file formats supported by Docling. 
+ + Returns: + List of supported file extensions + """ + # Based on Docling documentation + return [ + ".pdf", ".docx", ".pptx", ".xlsx", ".html", ".htm", + ".mp3", ".wav", ".m4a", ".flac" # Audio formats (if ASR is configured) + ] + + def is_supported_format(self, filename: str, content_type: str = None) -> bool: + """ + Check if a file format is supported by Docling. + + Args: + filename: Name of the file + content_type: MIME type of the file (optional) + + Returns: + True if format is supported + """ + if not DOCLING_AVAILABLE: + return False + + file_ext = Path(filename).suffix.lower() + return file_ext in self.get_supported_formats() + + def detect_input_format(self, filename: str, content_type: str = None) -> Optional[InputFormat]: + """ + Detect the input format for Docling processing. + + Args: + filename: Name of the file + content_type: MIME type of the file + + Returns: + InputFormat enum value or None if unsupported + """ + file_ext = Path(filename).suffix.lower() + + format_mapping = { + ".pdf": InputFormat.PDF, + ".docx": InputFormat.DOCX, + ".pptx": InputFormat.PPTX, + ".xlsx": InputFormat.XLSX, + ".html": InputFormat.HTML, + ".htm": InputFormat.HTML, + } + + return format_mapping.get(file_ext) + + def extract_text_and_structure( + self, + file_content: bytes, + filename: str, + content_type: str = None + ) -> Tuple[str, Dict[str, Any]]: + """ + Extract text and structural information from document using Docling. + + Args: + file_content: Raw file bytes + filename: Name of the file + content_type: MIME type of the file + + Returns: + Tuple of (extracted_markdown_text, metadata_dict) + + Raises: + ValueError: If the file format is not supported + Exception: If extraction fails + """ + if not DOCLING_AVAILABLE: + raise Exception("Docling is not available") + + if not self.is_supported_format(filename, content_type): + raise ValueError(f"Unsupported file format for Docling: {filename}") + + try: + # Create temporary file for Docling processing + with tempfile.NamedTemporaryFile(suffix=Path(filename).suffix, delete=False) as temp_file: + temp_file.write(file_content) + temp_path = Path(temp_file.name) + + try: + # Convert document using Docling + logfire.info( + "Starting Docling document conversion", + filename=filename, + file_size=len(file_content) + ) + + result: ConversionResult = self.converter.convert(temp_path) + + # Export to Markdown for RAG-optimized text + markdown_text = result.document.export_to_markdown() + + # Extract metadata + metadata = { + "docling_processed": True, + "original_filename": filename, + "content_type": content_type, + "extraction_method": "docling", + "document_structure": { + "has_tables": bool(result.document.tables), + "has_figures": bool(result.document.figures), + "page_count": len(result.document.pages) if result.document.pages else None, + } + } + + # Add table information if present + if result.document.tables: + metadata["table_count"] = len(result.document.tables) + + # Add figure information if present + if result.document.figures: + metadata["figure_count"] = len(result.document.figures) + + logfire.info( + "Docling document conversion completed", + filename=filename, + text_length=len(markdown_text), + metadata=metadata + ) + + return markdown_text, metadata + + finally: + # Clean up temporary file + try: + temp_path.unlink() + except Exception as cleanup_error: + logger.warning(f"Failed to cleanup temp file {temp_path}: {cleanup_error}") + + except Exception as e: + logfire.error( + "Docling document extraction failed", + 
filename=filename, + error=str(e), + exc_info=True + ) + raise Exception(f"Failed to extract text using Docling from {filename}") from e + + def create_intelligent_chunks( + self, + markdown_text: str, + metadata: Dict[str, Any] = None, + max_tokens: int = 512 + ) -> List[Dict[str, Any]]: + """ + Create intelligent chunks using Docling's HybridChunker. + + Args: + markdown_text: The markdown text to chunk + metadata: Document metadata to include in chunks + max_tokens: Maximum tokens per chunk (default: 512 for embeddings) + + Returns: + List of chunk dictionaries with text and metadata + """ + if not DOCLING_AVAILABLE: + raise Exception("Docling is not available") + + try: + # Use Docling's HybridChunker for semantic chunking + chunks = self.chunker.chunk(markdown_text, max_tokens=max_tokens) + + chunk_list = [] + for i, chunk in enumerate(chunks): + chunk_data = { + "text": chunk.text, + "chunk_index": i, + "chunk_type": "hybrid_semantic", + "token_count": len(chunk.text.split()), # Rough token estimation + "metadata": { + **(metadata or {}), + "chunking_method": "docling_hybrid", + "chunk_boundaries": "semantic_aware" + } + } + chunk_list.append(chunk_data) + + logfire.info( + "Docling intelligent chunking completed", + original_length=len(markdown_text), + chunks_created=len(chunk_list), + max_tokens=max_tokens + ) + + return chunk_list + + except Exception as e: + logfire.error( + "Docling chunking failed", + error=str(e), + text_length=len(markdown_text), + exc_info=True + ) + # Fallback to simple text chunking + logger.warning("Falling back to simple chunking due to Docling error") + return self._fallback_simple_chunks(markdown_text, metadata, max_tokens) + + def _fallback_simple_chunks( + self, + text: str, + metadata: Dict[str, Any] = None, + max_tokens: int = 512 + ) -> List[Dict[str, Any]]: + """ + Fallback to simple text chunking if Docling chunking fails. + + Args: + text: Text to chunk + metadata: Metadata to include + max_tokens: Maximum tokens per chunk + + Returns: + List of simple chunks + """ + # Simple word-based chunking as fallback + words = text.split() + chunk_size = max_tokens * 3 # Rough words-to-tokens ratio + + chunks = [] + for i in range(0, len(words), chunk_size): + chunk_words = words[i:i + chunk_size] + chunk_text = " ".join(chunk_words) + + chunk_data = { + "text": chunk_text, + "chunk_index": i // chunk_size, + "chunk_type": "simple_fallback", + "token_count": len(chunk_words), + "metadata": { + **(metadata or {}), + "chunking_method": "simple_fallback", + "chunk_boundaries": "word_based" + } + } + chunks.append(chunk_data) + + return chunks + + def process_document_for_rag( + self, + file_content: bytes, + filename: str, + content_type: str = None, + max_tokens_per_chunk: int = 512 + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + """ + Complete document processing pipeline for RAG operations. 
+ + Args: + file_content: Raw file bytes + filename: Name of the file + content_type: MIME type of the file + max_tokens_per_chunk: Maximum tokens per chunk for embeddings + + Returns: + Tuple of (chunk_list, document_metadata) + """ + try: + # Extract text and structure + markdown_text, doc_metadata = self.extract_text_and_structure( + file_content, filename, content_type + ) + + # Create intelligent chunks + chunks = self.create_intelligent_chunks( + markdown_text, doc_metadata, max_tokens_per_chunk + ) + + # Update document metadata + doc_metadata.update({ + "total_chunks": len(chunks), + "processing_pipeline": "docling_rag_optimized", + "chunk_token_limit": max_tokens_per_chunk + }) + + logfire.info( + "Docling RAG processing completed", + filename=filename, + total_chunks=len(chunks), + total_text_length=len(markdown_text) + ) + + return chunks, doc_metadata + + except Exception as e: + logfire.error( + "Docling RAG processing failed", + filename=filename, + error=str(e), + exc_info=True + ) + raise + + +# Global processor instance +_docling_processor: Optional[DoclingProcessor] = None + + +def get_docling_processor() -> DoclingProcessor: + """ + Get a singleton instance of the Docling processor. + + Returns: + DoclingProcessor instance + + Raises: + ImportError: If Docling is not available + """ + global _docling_processor + + if _docling_processor is None: + _docling_processor = DoclingProcessor() + + return _docling_processor + + +def is_docling_available() -> bool: + """ + Check if Docling is available for use. + + Returns: + True if Docling can be imported and used + """ + return DOCLING_AVAILABLE + + +def process_document_with_docling( + file_content: bytes, + filename: str, + content_type: str = None +) -> Tuple[str, Dict[str, Any]]: + """ + Convenience function to process a document with Docling. + + Args: + file_content: Raw file bytes + filename: Name of the file + content_type: MIME type of the file + + Returns: + Tuple of (extracted_text, metadata) + """ + processor = get_docling_processor() + return processor.extract_text_and_structure(file_content, filename, content_type) + + +def create_rag_chunks_with_docling( + file_content: bytes, + filename: str, + content_type: str = None, + max_tokens: int = 512 +) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + """ + Convenience function to create RAG-optimized chunks with Docling. 
+ + Args: + file_content: Raw file bytes + filename: Name of the file + content_type: MIME type of the file + max_tokens: Maximum tokens per chunk + + Returns: + Tuple of (chunk_list, document_metadata) + """ + processor = get_docling_processor() + return processor.process_document_for_rag(file_content, filename, content_type, max_tokens) \ No newline at end of file diff --git a/python/src/server/utils/document_processing.py b/python/src/server/utils/document_processing.py index 03e35a15..0dc1fbc6 100644 --- a/python/src/server/utils/document_processing.py +++ b/python/src/server/utils/document_processing.py @@ -33,6 +33,18 @@ except ImportError: from ..config.logfire_config import get_logger, logfire +# Import Docling processing utilities +try: + from .docling_processing import ( + create_rag_chunks_with_docling, + is_docling_available, + process_document_with_docling, + ) + + DOCLING_INTEGRATION_AVAILABLE = True +except ImportError: + DOCLING_INTEGRATION_AVAILABLE = False + logger = get_logger(__name__) @@ -158,6 +170,8 @@ def _clean_html_to_text(html_content: str) -> str: def extract_text_from_document(file_content: bytes, filename: str, content_type: str) -> str: """ Extract text from various document formats. + + Uses Docling for advanced processing when available, with fallback to legacy processors. Args: file_content: Raw file bytes @@ -172,6 +186,23 @@ def extract_text_from_document(file_content: bytes, filename: str, content_type: Exception: If extraction fails """ try: + # Try Docling first if available and format is supported + if DOCLING_INTEGRATION_AVAILABLE and is_docling_available(): + try: + text, metadata = process_document_with_docling(file_content, filename, content_type) + if text and text.strip(): + logger.info(f"Successfully processed {filename} with Docling") + return text + else: + logger.warning(f"Docling returned empty text for {filename}, falling back to legacy processors") + except ValueError as docling_error: + # Docling doesn't support this format, continue to legacy processors + logger.debug(f"Docling doesn't support {filename}: {docling_error}") + except Exception as docling_error: + # Docling failed, log warning and continue to legacy processors + logger.warning(f"Docling processing failed for {filename}: {docling_error}. Falling back to legacy processors.") + + # Legacy document processing (existing logic) # PDF files if content_type == "application/pdf" or filename.lower().endswith(".pdf"): return extract_text_from_pdf(file_content) @@ -342,3 +373,128 @@ def extract_text_from_docx(file_content: bytes) -> str: except Exception as e: raise Exception("Failed to extract text from Word document") from e + + +def extract_and_chunk_for_rag( + file_content: bytes, + filename: str, + content_type: str, + max_tokens_per_chunk: int = 512 +) -> tuple[str, list[dict], dict]: + """ + Extract text and create intelligent chunks optimized for RAG operations. + + Uses Docling's HybridChunker for semantic-aware chunking when available, + with fallback to basic text extraction and simple chunking. 
+ + Args: + file_content: Raw file bytes + filename: Name of the file + content_type: MIME type of the file + max_tokens_per_chunk: Maximum tokens per chunk for embeddings + + Returns: + Tuple of (full_text, chunk_list, metadata) + - full_text: Complete extracted text + - chunk_list: List of chunk dictionaries with text and metadata + - metadata: Document-level metadata + + Raises: + ValueError: If the file format is not supported + Exception: If extraction fails + """ + try: + # Try Docling's complete RAG processing pipeline first + if DOCLING_INTEGRATION_AVAILABLE and is_docling_available(): + try: + chunks, doc_metadata = create_rag_chunks_with_docling( + file_content, filename, content_type, max_tokens_per_chunk + ) + + # Reconstruct full text from chunks for backward compatibility + full_text = "\n\n".join(chunk["text"] for chunk in chunks) + + logger.info( + f"Successfully processed {filename} with Docling RAG pipeline: " + f"{len(chunks)} chunks created" + ) + + return full_text, chunks, doc_metadata + + except ValueError as docling_error: + # Docling doesn't support this format, continue to legacy processing + logger.debug(f"Docling doesn't support {filename}: {docling_error}") + except Exception as docling_error: + # Docling failed, log warning and continue to legacy processing + logger.warning( + f"Docling RAG processing failed for {filename}: {docling_error}. " + f"Falling back to legacy processing with simple chunking." + ) + + # Fallback to legacy extraction + simple chunking + full_text = extract_text_from_document(file_content, filename, content_type) + + # Create simple chunks as fallback + chunks = _create_simple_chunks(full_text, max_tokens_per_chunk) + + # Basic metadata + metadata = { + "docling_processed": False, + "original_filename": filename, + "content_type": content_type, + "extraction_method": "legacy", + "chunking_method": "simple_token_based", + "total_chunks": len(chunks), + "chunk_token_limit": max_tokens_per_chunk + } + + logger.info( + f"Processed {filename} with legacy methods: {len(chunks)} chunks created" + ) + + return full_text, chunks, metadata + + except Exception as e: + logfire.error( + "Document RAG processing failed", + filename=filename, + content_type=content_type, + error=str(e), + ) + raise Exception(f"Failed to process {filename} for RAG") from e + + +def _create_simple_chunks(text: str, max_tokens_per_chunk: int = 512) -> list[dict]: + """ + Create simple word-based chunks as fallback when Docling is not available. + + Args: + text: Text to chunk + max_tokens_per_chunk: Maximum tokens per chunk + + Returns: + List of chunk dictionaries + """ + words = text.split() + # Rough estimation: 1 token ≈ 0.75 words for English text + words_per_chunk = int(max_tokens_per_chunk * 0.75) + + chunks = [] + for i in range(0, len(words), words_per_chunk): + chunk_words = words[i:i + words_per_chunk] + chunk_text = " ".join(chunk_words) + + chunk_data = { + "text": chunk_text, + "chunk_index": i // words_per_chunk, + "chunk_type": "simple_word_based", + "token_count": len(chunk_words), # Rough estimate + "metadata": { + "chunking_method": "simple_word_based", + "chunk_boundaries": "word_based", + "fallback_chunking": True + } + } + chunks.append(chunk_data) + + return chunks
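For reviewers, a minimal usage sketch of the new extract_and_chunk_for_rag helper added in document_processing.py. This is illustrative only: it assumes the package resolves as src.server.* (consistent with the relative imports in the diff), and example.pdf is a hypothetical input file.

    from src.server.utils.document_processing import extract_and_chunk_for_rag

    # Hypothetical input; any format handled by Docling or the legacy extractors works.
    with open("example.pdf", "rb") as f:
        file_bytes = f.read()

    # Returns the full extracted text, the chunk list (text plus per-chunk metadata),
    # and document-level metadata describing which pipeline handled the file.
    full_text, chunks, doc_metadata = extract_and_chunk_for_rag(
        file_bytes,
        filename="example.pdf",
        content_type="application/pdf",
        max_tokens_per_chunk=512,
    )

    print(doc_metadata.get("extraction_method"))           # "docling" or "legacy"
    print(doc_metadata.get("chunking_method", "unknown"))  # e.g. "simple_token_based" on fallback
    print(f"{len(chunks)} chunks extracted")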
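And a sketch of driving the new DocumentStorageService.upload_document_with_enhanced_chunking entry point directly, outside the knowledge API route, under the same package-path assumption. The input file, source_id, and tags are placeholders, and a configured Supabase client is expected to be reachable via get_supabase_client().

    import asyncio

    from src.server.services.storage import DocumentStorageService
    from src.server.utils import get_supabase_client


    async def main() -> None:
        service = DocumentStorageService(get_supabase_client())

        with open("example.docx", "rb") as f:  # placeholder document
            file_bytes = f.read()

        # Raw bytes go in; Docling handles extraction and chunking, with the
        # legacy extractor plus simple chunking used as the fallback path.
        success, result = await service.upload_document_with_enhanced_chunking(
            file_content=file_bytes,
            filename="example.docx",
            content_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            source_id="file_example_docx_1234abcd",  # placeholder id
            knowledge_type="documentation",
            tags=["docs"],
            extract_code_examples=False,
        )

        if success:
            # "docling_enhanced" when Docling processed the file, "legacy_fallback" otherwise.
            print(result["processing_method"], result["chunks_stored"])
        else:
            print("Upload failed:", result.get("error"))


    asyncio.run(main())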