feat: integrate Docling for advanced document processing in RAG pipeline

- Add Docling dependency to pyproject.toml for multi-format document support
- Create docling_processing.py with DocumentConverter and HybridChunker
- Implement intelligent semantic-aware chunking for better RAG performance (see the sketch after this list)
- Add enhanced document upload method with Docling integration
- Update knowledge API to use enhanced processing with legacy fallback
- Support PDF, DOCX, PPTX, XLSX, HTML formats with structure preservation
- Include enhanced metadata tracking for processing methods
- Maintain backward compatibility with existing document processors
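
For context, here is a minimal sketch of the conversion-plus-chunking flow these bullets describe, assembled from Docling's public documentation; the import paths, `max_tokens` parameter, and sample filename come from those docs, not from this repository:

```python
# Sketch based on Docling's documented API; not code from this commit.
from docling.document_converter import DocumentConverter
from docling.chunking import HybridChunker

converter = DocumentConverter()
result = converter.convert("example.pdf")                # any supported format
markdown_text = result.document.export_to_markdown()     # structure-preserving export
print(f"{len(markdown_text)} chars of markdown")

chunker = HybridChunker(max_tokens=512)                  # token budget per chunk
for chunk in chunker.chunk(dl_doc=result.document):      # semantic-aware chunks
    print(len(chunk.text), chunk.text[:80])
```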

Resolves #756

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Cole Medin <coleam00@users.noreply.github.com>
Author: claude[bot]
Date:   2025-10-04 13:46:19 +00:00
parent 63a92cf7d7
commit bc97f5dbf5
5 changed files with 893 additions and 65 deletions

View File

@@ -50,6 +50,7 @@ server = [
"pdfplumber>=0.11.6",
"python-docx>=1.1.2",
"markdown>=3.8",
"docling>=1.0.0",
# Security and utilities
"python-jose[cryptography]>=3.3.0",
"cryptography>=41.0.0",
@@ -112,6 +113,7 @@ all = [
"pdfplumber>=0.11.6",
"python-docx>=1.1.2",
"markdown>=3.8",
"docling>=1.0.0",
"python-jose[cryptography]>=3.3.0",
"cryptography>=41.0.0",
"slowapi>=0.1.9",

View File

@@ -30,7 +30,7 @@ from ..services.knowledge import DatabaseMetricsService, KnowledgeItemService, K
from ..services.search.rag_service import RAGService
from ..services.storage import DocumentStorageService
from ..utils import get_supabase_client
from ..utils.document_processing import extract_text_from_document
from ..utils.document_processing import extract_text_from_document, extract_and_chunk_for_rag
# Get logger for this module
logger = get_logger(__name__)
@@ -84,39 +84,39 @@ async def _validate_provider_api_key(provider: str = None) -> None:
safe_provider = provider[:20] # Limit length
logger.info(f"🔑 Testing {safe_provider.title()} API key with minimal embedding request...")
try:
# Test API key with minimal embedding request using provider-scoped configuration
from ..services.embeddings.embedding_service import create_embedding
test_result = await create_embedding(text="test", provider=provider)
if not test_result:
logger.error(
f"{provider.title()} API key validation failed - no embedding returned"
)
raise HTTPException(
status_code=401,
detail={
"error": f"Invalid {provider.title()} API key",
"message": f"Please verify your {provider.title()} API key in Settings.",
"error_type": "authentication_failed",
"provider": provider,
},
)
except Exception as e:
logger.error(
f"{provider.title()} API key validation failed: {e}",
exc_info=True,
)
raise HTTPException(
status_code=401,
detail={
"error": f"Invalid {provider.title()} API key",
"message": f"Please verify your {provider.title()} API key in Settings. Error: {str(e)[:100]}",
"error_type": "authentication_failed",
"provider": provider,
},
)
logger.info(f"{provider.title()} API key validation successful")
@@ -1010,23 +1010,7 @@ async def _perform_upload_with_progress(
log=f"Extracting text from {filename}"
)
try:
extracted_text = extract_text_from_document(file_content, filename, content_type)
safe_logfire_info(
f"Document text extracted | filename={filename} | extracted_length={len(extracted_text)} | content_type={content_type}"
)
except ValueError as ex:
# ValueError indicates unsupported format or empty file - user error
logger.warning(f"Document validation failed: {filename} - {str(ex)}")
await tracker.error(str(ex))
return
except Exception as ex:
# Other exceptions are system errors - log with full traceback
logger.error(f"Failed to extract text from document: {filename}", exc_info=True)
await tracker.error(f"Failed to extract text from document: {str(ex)}")
return
# Use DocumentStorageService to handle the upload
# Use DocumentStorageService to handle the upload with enhanced processing
doc_storage_service = DocumentStorageService(get_supabase_client())
# Generate source_id from filename with UUID to prevent collisions
@@ -1049,29 +1033,85 @@ async def _perform_upload_with_progress(
**(batch_info or {})
)
# Call the service's upload_document method
success, result = await doc_storage_service.upload_document(
file_content=extracted_text,
filename=filename,
source_id=source_id,
knowledge_type=knowledge_type,
tags=tag_list,
extract_code_examples=extract_code_examples,
progress_callback=document_progress_callback,
cancellation_check=check_upload_cancellation,
)
# Try enhanced document processing with Docling first
try:
success, result = await doc_storage_service.upload_document_with_enhanced_chunking(
file_content=file_content, # Pass raw bytes to Docling processor
filename=filename,
content_type=content_type,
source_id=source_id,
knowledge_type=knowledge_type,
tags=tag_list,
extract_code_examples=extract_code_examples,
progress_callback=document_progress_callback,
cancellation_check=check_upload_cancellation,
)
# Log the processing method used
processing_method = result.get("processing_method", "unknown")
safe_logfire_info(
f"Enhanced document processing completed | filename={filename} | method={processing_method} | chunks_stored={result.get('chunks_stored', 0)}"
)
except Exception as enhanced_error:
# If enhanced processing fails, fall back to legacy processing
logger.warning(f"Enhanced processing failed for {filename}: {enhanced_error}. Falling back to legacy processing.")
try:
# Extract text using legacy method
extracted_text = extract_text_from_document(file_content, filename, content_type)
safe_logfire_info(
f"Legacy document text extracted | filename={filename} | extracted_length={len(extracted_text)} | content_type={content_type}"
)
# Use legacy upload method
success, result = await doc_storage_service.upload_document(
file_content=extracted_text,
filename=filename,
source_id=source_id,
knowledge_type=knowledge_type,
tags=tag_list,
extract_code_examples=extract_code_examples,
progress_callback=document_progress_callback,
cancellation_check=check_upload_cancellation,
)
# Add processing method to result for tracking
result["processing_method"] = "legacy_fallback"
result["fallback_reason"] = str(enhanced_error)
except ValueError as ex:
# ValueError indicates unsupported format or empty file - user error
logger.warning(f"Document validation failed: {filename} - {str(ex)}")
await tracker.error(str(ex))
return
except Exception as ex:
# Other exceptions are system errors - log with full traceback
logger.error(f"Failed to extract text from document: {filename}", exc_info=True)
await tracker.error(f"Failed to extract text from document: {str(ex)}")
return
if success:
# Complete the upload with 100% progress
processing_method = result.get("processing_method", "unknown")
extraction_method = result.get("extraction_method", "unknown")
chunking_method = result.get("chunking_method", "unknown")
completion_log = f"Document uploaded successfully using {processing_method} processing!"
if processing_method == "docling_enhanced":
completion_log += f" (extraction: {extraction_method}, chunking: {chunking_method})"
await tracker.complete({
"log": "Document uploaded successfully!",
"log": completion_log,
"chunks_stored": result.get("chunks_stored"),
"code_examples_stored": result.get("code_examples_stored", 0),
"sourceId": result.get("source_id"),
"processing_method": processing_method,
"extraction_method": extraction_method,
"chunking_method": chunking_method,
})
safe_logfire_info(
f"Document uploaded successfully | progress_id={progress_id} | source_id={result.get('source_id')} | chunks_stored={result.get('chunks_stored')} | code_examples_stored={result.get('code_examples_stored', 0)}"
f"Document uploaded successfully | progress_id={progress_id} | source_id={result.get('source_id')} | chunks_stored={result.get('chunks_stored')} | code_examples_stored={result.get('code_examples_stored', 0)} | processing_method={processing_method}"
)
else:
error_msg = result.get("error", "Unknown error")

View File

@@ -322,3 +322,214 @@ class DocumentStorageService(BaseStorageService):
except Exception as e:
logger.error(f"Error in store_code_examples: {e}")
return False, {"error": str(e)}
async def upload_document_with_enhanced_chunking(
self,
file_content: bytes,
filename: str,
content_type: str,
source_id: str,
knowledge_type: str = "documentation",
tags: list[str] | None = None,
extract_code_examples: bool = True,
progress_callback: Any | None = None,
cancellation_check: Any | None = None,
max_tokens_per_chunk: int = 512,
) -> tuple[bool, dict[str, Any]]:
"""
Upload and process a document using enhanced Docling chunking.
This method uses Docling's intelligent chunking when available,
falling back to legacy processing for unsupported formats.
Args:
file_content: Raw document bytes
filename: Name of the file
content_type: MIME type of the file
source_id: Source identifier
knowledge_type: Type of knowledge
tags: Optional list of tags
extract_code_examples: Whether to extract code examples
progress_callback: Optional callback for progress
cancellation_check: Optional function to check for cancellation
max_tokens_per_chunk: Maximum tokens per chunk for embeddings
Returns:
Tuple of (success, result_dict)
"""
from ...utils.document_processing import extract_and_chunk_for_rag
logger.info(f"Enhanced document upload starting: {filename} as {knowledge_type} knowledge")
with safe_span(
"upload_document_enhanced",
filename=filename,
source_id=source_id,
content_length=len(file_content),
use_docling=True,
) as span:
try:
# Progress reporting helper
async def report_progress(message: str, percentage: int, batch_info: dict = None):
if progress_callback:
await progress_callback(message, percentage, batch_info)
await report_progress("Starting enhanced document processing...", 10)
# Use enhanced extraction and chunking with Docling
full_text, docling_chunks, doc_metadata = extract_and_chunk_for_rag(
file_content, filename, content_type, max_tokens_per_chunk
)
if not docling_chunks:
raise ValueError(f"No content could be extracted from {filename}. The file may be empty, corrupted, or in an unsupported format.")
logger.info(
f"Enhanced processing completed for {filename}: "
f"{len(docling_chunks)} chunks created with {doc_metadata.get('extraction_method', 'unknown')} method"
)
await report_progress("Preparing enhanced document chunks...", 30)
# Prepare data for storage using Docling chunks
doc_url = f"file://{filename}"
urls = []
chunk_numbers = []
contents = []
metadatas = []
total_word_count = 0
# Process Docling chunks with enhanced metadata
for i, chunk in enumerate(docling_chunks):
chunk_text = chunk["text"]
chunk_metadata = chunk.get("metadata", {})
# Combine base metadata with Docling metadata
enhanced_meta = {
"chunk_index": i,
"url": doc_url,
"source": source_id,
"source_id": source_id,
"knowledge_type": knowledge_type,
"source_type": "file",
"filename": filename,
# Add Docling-specific metadata
"docling_processed": doc_metadata.get("docling_processed", False),
"chunking_method": chunk_metadata.get("chunking_method", "unknown"),
"chunk_type": chunk.get("chunk_type", "unknown"),
"estimated_tokens": chunk.get("token_count", 0),
"extraction_method": doc_metadata.get("extraction_method", "legacy"),
}
# Add document-level metadata to first chunk
if i == 0:
enhanced_meta.update({
"document_metadata": doc_metadata,
"total_chunks": len(docling_chunks),
})
# Add tags if provided
if tags:
enhanced_meta["tags"] = tags
urls.append(doc_url)
chunk_numbers.append(i)
contents.append(chunk_text)
metadatas.append(enhanced_meta)
total_word_count += len(chunk_text.split())
await report_progress(f"Processing {len(docling_chunks)} enhanced chunks...", 40)
# Store documents using existing document storage
url_to_full_document = {doc_url: full_text}
storage_result = await add_documents_to_supabase(
self.supabase_client,
urls,
chunk_numbers,
contents,
metadatas,
url_to_full_document,
progress_callback=lambda stage, progress, message, **kwargs: report_progress(
f"Storing: {message}", 40 + (progress * 0.5)
),
cancellation_check=cancellation_check,
)
chunks_stored = storage_result.get("chunks_stored", 0)
await report_progress("Finalizing enhanced document upload...", 90)
# Extract code examples if requested
code_examples_count = 0
if extract_code_examples and len(docling_chunks) > 0:
try:
await report_progress("Extracting code examples...", 95)
logger.info(f"🔍 DEBUG: Starting code extraction for {filename} (enhanced) | extract_code_examples={extract_code_examples}")
# Import code extraction service
from ..crawling.code_extraction_service import CodeExtractionService
code_service = CodeExtractionService(self.supabase_client)
# Create crawl_results format with enhanced metadata
crawl_results = [{
"url": doc_url,
"markdown": full_text, # Use full extracted text
"html": "", # Empty to prevent HTML extraction path
"content_type": content_type,
"docling_processed": doc_metadata.get("docling_processed", False),
"extraction_method": doc_metadata.get("extraction_method", "legacy"),
}]
logger.info(f"🔍 DEBUG: Created enhanced crawl_results with url={doc_url}, content_length={len(full_text)}")
# Create progress callback for code extraction
async def code_progress_callback(data: dict):
if progress_callback:
raw_progress = data.get("progress", data.get("percentage", 0))
mapped_progress = 95 + (raw_progress / 100.0) * 5 # 95% to 100%
message = data.get("log", "Extracting code examples...")
await progress_callback(message, int(mapped_progress))
code_examples_count = await code_service.extract_and_store_code_examples(
crawl_results=crawl_results,
url_to_full_document=url_to_full_document,
source_id=source_id,
progress_callback=code_progress_callback,
cancellation_check=cancellation_check,
)
logger.info(f"🔍 DEBUG: Enhanced code extraction completed: {code_examples_count} code examples found for {filename}")
except Exception as e:
logger.error(f"Code extraction failed for {filename}: {e}", exc_info=True)
code_examples_count = 0
await report_progress("Enhanced document upload completed!", 100)
result_dict = {
"source_id": source_id,
"filename": filename,
"chunks_stored": chunks_stored,
"code_examples_stored": code_examples_count,
"total_word_count": total_word_count,
"processing_method": "docling_enhanced" if doc_metadata.get("docling_processed") else "legacy_fallback",
"extraction_method": doc_metadata.get("extraction_method", "legacy"),
"chunking_method": doc_metadata.get("chunking_method", "unknown"),
"document_metadata": doc_metadata,
}
span.set_attribute("success", True)
span.set_attribute("chunks_stored", chunks_stored)
span.set_attribute("code_examples_stored", code_examples_count)
span.set_attribute("processing_method", result_dict["processing_method"])
logger.info(f"Enhanced document upload completed successfully: {filename}")
return True, result_dict
except Exception as e:
logger.error(f"Enhanced document upload failed: {filename}", exc_info=True)
span.set_attribute("success", False)
span.set_attribute("error", str(e))
return False, {"error": str(e), "filename": filename}

View File

@@ -0,0 +1,419 @@
"""
Docling Document Processing Utilities
This module provides advanced document processing capabilities using Docling
for multi-format support, intelligent chunking, and structure preservation.
"""
import io
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from ..config.logfire_config import get_logger, logfire
logger = get_logger(__name__)
# Import Docling with availability check
try:
from docling.document_converter import DocumentConverter
from docling.chunking import HybridChunker
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import ConversionResult
DOCLING_AVAILABLE = True
except ImportError as e:
logger.warning(f"Docling not available: {e}")
DOCLING_AVAILABLE = False
class DoclingProcessor:
"""
Advanced document processor using Docling for multi-format support
and intelligent chunking optimized for RAG operations.
"""
def __init__(self):
"""Initialize the Docling processor."""
if not DOCLING_AVAILABLE:
raise ImportError(
"Docling is not available. Please install docling>=1.0.0"
)
self.converter = DocumentConverter()
self.chunker = HybridChunker()
def get_supported_formats(self) -> List[str]:
"""
Get list of file formats supported by Docling.
Returns:
List of supported file extensions
"""
# Based on Docling documentation
return [
".pdf", ".docx", ".pptx", ".xlsx", ".html", ".htm",
".mp3", ".wav", ".m4a", ".flac" # Audio formats (if ASR is configured)
]
def is_supported_format(self, filename: str, content_type: str = None) -> bool:
"""
Check if a file format is supported by Docling.
Args:
filename: Name of the file
content_type: MIME type of the file (optional)
Returns:
True if format is supported
"""
if not DOCLING_AVAILABLE:
return False
file_ext = Path(filename).suffix.lower()
return file_ext in self.get_supported_formats()
def detect_input_format(self, filename: str, content_type: str = None) -> Optional[InputFormat]:
"""
Detect the input format for Docling processing.
Args:
filename: Name of the file
content_type: MIME type of the file
Returns:
InputFormat enum value or None if unsupported
"""
file_ext = Path(filename).suffix.lower()
format_mapping = {
".pdf": InputFormat.PDF,
".docx": InputFormat.DOCX,
".pptx": InputFormat.PPTX,
".xlsx": InputFormat.XLSX,
".html": InputFormat.HTML,
".htm": InputFormat.HTML,
}
return format_mapping.get(file_ext)
def extract_text_and_structure(
self,
file_content: bytes,
filename: str,
content_type: str = None
) -> Tuple[str, Dict[str, Any]]:
"""
Extract text and structural information from document using Docling.
Args:
file_content: Raw file bytes
filename: Name of the file
content_type: MIME type of the file
Returns:
Tuple of (extracted_markdown_text, metadata_dict)
Raises:
ValueError: If the file format is not supported
Exception: If extraction fails
"""
if not DOCLING_AVAILABLE:
raise Exception("Docling is not available")
if not self.is_supported_format(filename, content_type):
raise ValueError(f"Unsupported file format for Docling: {filename}")
try:
# Create temporary file for Docling processing
with tempfile.NamedTemporaryFile(suffix=Path(filename).suffix, delete=False) as temp_file:
temp_file.write(file_content)
temp_path = Path(temp_file.name)
try:
# Convert document using Docling
logfire.info(
"Starting Docling document conversion",
filename=filename,
file_size=len(file_content)
)
result: ConversionResult = self.converter.convert(temp_path)
# Export to Markdown for RAG-optimized text
markdown_text = result.document.export_to_markdown()
# Extract metadata
metadata = {
"docling_processed": True,
"original_filename": filename,
"content_type": content_type,
"extraction_method": "docling",
"document_structure": {
"has_tables": bool(result.document.tables),
"has_figures": bool(result.document.figures),
"page_count": len(result.document.pages) if result.document.pages else None,
}
}
# Add table information if present
if result.document.tables:
metadata["table_count"] = len(result.document.tables)
# Add figure information if present
if result.document.pictures:
metadata["figure_count"] = len(result.document.pictures)
logfire.info(
"Docling document conversion completed",
filename=filename,
text_length=len(markdown_text),
metadata=metadata
)
return markdown_text, metadata
finally:
# Clean up temporary file
try:
temp_path.unlink()
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup temp file {temp_path}: {cleanup_error}")
except Exception as e:
logfire.error(
"Docling document extraction failed",
filename=filename,
error=str(e),
exc_info=True
)
raise Exception(f"Failed to extract text using Docling from {filename}") from e
def create_intelligent_chunks(
self,
markdown_text: str,
metadata: Dict[str, Any] = None,
max_tokens: int = 512
) -> List[Dict[str, Any]]:
"""
Create intelligent chunks using Docling's HybridChunker.
Args:
markdown_text: The markdown text to chunk
metadata: Document metadata to include in chunks
max_tokens: Maximum tokens per chunk (default: 512 for embeddings)
Returns:
List of chunk dictionaries with text and metadata
"""
if not DOCLING_AVAILABLE:
raise Exception("Docling is not available")
try:
# Use Docling's HybridChunker for semantic chunking
chunks = self.chunker.chunk(markdown_text, max_tokens=max_tokens)
chunk_list = []
for i, chunk in enumerate(chunks):
chunk_data = {
"text": chunk.text,
"chunk_index": i,
"chunk_type": "hybrid_semantic",
"token_count": len(chunk.text.split()), # Rough token estimation
"metadata": {
**(metadata or {}),
"chunking_method": "docling_hybrid",
"chunk_boundaries": "semantic_aware"
}
}
chunk_list.append(chunk_data)
logfire.info(
"Docling intelligent chunking completed",
original_length=len(markdown_text),
chunks_created=len(chunk_list),
max_tokens=max_tokens
)
return chunk_list
except Exception as e:
logfire.error(
"Docling chunking failed",
error=str(e),
text_length=len(markdown_text),
exc_info=True
)
# Fallback to simple text chunking
logger.warning("Falling back to simple chunking due to Docling error")
return self._fallback_simple_chunks(markdown_text, metadata, max_tokens)
def _fallback_simple_chunks(
self,
text: str,
metadata: Dict[str, Any] = None,
max_tokens: int = 512
) -> List[Dict[str, Any]]:
"""
Fallback to simple text chunking if Docling chunking fails.
Args:
text: Text to chunk
metadata: Metadata to include
max_tokens: Maximum tokens per chunk
Returns:
List of simple chunks
"""
# Simple word-based chunking as fallback
words = text.split()
chunk_size = int(max_tokens * 0.75)  # Rough tokens-to-words ratio: about 0.75 words per token
chunks = []
for i in range(0, len(words), chunk_size):
chunk_words = words[i:i + chunk_size]
chunk_text = " ".join(chunk_words)
chunk_data = {
"text": chunk_text,
"chunk_index": i // chunk_size,
"chunk_type": "simple_fallback",
"token_count": len(chunk_words),
"metadata": {
**(metadata or {}),
"chunking_method": "simple_fallback",
"chunk_boundaries": "word_based"
}
}
chunks.append(chunk_data)
return chunks
def process_document_for_rag(
self,
file_content: bytes,
filename: str,
content_type: str = None,
max_tokens_per_chunk: int = 512
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
"""
Complete document processing pipeline for RAG operations.
Args:
file_content: Raw file bytes
filename: Name of the file
content_type: MIME type of the file
max_tokens_per_chunk: Maximum tokens per chunk for embeddings
Returns:
Tuple of (chunk_list, document_metadata)
"""
try:
# Extract text and structure
markdown_text, doc_metadata = self.extract_text_and_structure(
file_content, filename, content_type
)
# Create intelligent chunks
chunks = self.create_intelligent_chunks(
markdown_text, doc_metadata, max_tokens_per_chunk
)
# Update document metadata
doc_metadata.update({
"total_chunks": len(chunks),
"processing_pipeline": "docling_rag_optimized",
"chunk_token_limit": max_tokens_per_chunk
})
logfire.info(
"Docling RAG processing completed",
filename=filename,
total_chunks=len(chunks),
total_text_length=len(markdown_text)
)
return chunks, doc_metadata
except Exception as e:
logfire.error(
"Docling RAG processing failed",
filename=filename,
error=str(e),
exc_info=True
)
raise
# Global processor instance
_docling_processor: Optional[DoclingProcessor] = None
def get_docling_processor() -> DoclingProcessor:
"""
Get a singleton instance of the Docling processor.
Returns:
DoclingProcessor instance
Raises:
ImportError: If Docling is not available
"""
global _docling_processor
if _docling_processor is None:
_docling_processor = DoclingProcessor()
return _docling_processor
def is_docling_available() -> bool:
"""
Check if Docling is available for use.
Returns:
True if Docling can be imported and used
"""
return DOCLING_AVAILABLE
def process_document_with_docling(
file_content: bytes,
filename: str,
content_type: str = None
) -> Tuple[str, Dict[str, Any]]:
"""
Convenience function to process a document with Docling.
Args:
file_content: Raw file bytes
filename: Name of the file
content_type: MIME type of the file
Returns:
Tuple of (extracted_text, metadata)
"""
processor = get_docling_processor()
return processor.extract_text_and_structure(file_content, filename, content_type)
def create_rag_chunks_with_docling(
file_content: bytes,
filename: str,
content_type: str = None,
max_tokens: int = 512
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
"""
Convenience function to create RAG-optimized chunks with Docling.
Args:
file_content: Raw file bytes
filename: Name of the file
content_type: MIME type of the file
max_tokens: Maximum tokens per chunk
Returns:
Tuple of (chunk_list, document_metadata)
"""
processor = get_docling_processor()
return processor.process_document_for_rag(file_content, filename, content_type, max_tokens)
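
A short usage sketch for the module's public helpers; the import path is an assumption based on where the utilities appear to live, and only the function names and return shapes come from the code above:

```python
# Hypothetical caller; the import path is assumed, the helper names are from this module.
from pathlib import Path

from server.utils.docling_processing import (  # assumed package path
    create_rag_chunks_with_docling,
    is_docling_available,
)

file_path = Path("handbook.docx")
data = file_path.read_bytes()

if is_docling_available():
    chunks, doc_meta = create_rag_chunks_with_docling(
        data, file_path.name, content_type=None, max_tokens=512
    )
    print(doc_meta["extraction_method"], doc_meta.get("total_chunks"))
    print(chunks[0]["text"][:120])
else:
    print("Docling not installed; fall back to the legacy extractors")
```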

View File

@@ -33,6 +33,18 @@ except ImportError:
from ..config.logfire_config import get_logger, logfire
# Import Docling processing utilities
try:
from .docling_processing import (
create_rag_chunks_with_docling,
is_docling_available,
process_document_with_docling,
)
DOCLING_INTEGRATION_AVAILABLE = True
except ImportError:
DOCLING_INTEGRATION_AVAILABLE = False
logger = get_logger(__name__)
@@ -158,6 +170,8 @@ def _clean_html_to_text(html_content: str) -> str:
def extract_text_from_document(file_content: bytes, filename: str, content_type: str) -> str:
"""
Extract text from various document formats.
Uses Docling for advanced processing when available, with fallback to legacy processors.
Args:
file_content: Raw file bytes
@@ -172,6 +186,23 @@ def extract_text_from_document(file_content: bytes, filename: str, content_type:
Exception: If extraction fails
"""
try:
# Try Docling first if available and format is supported
if DOCLING_INTEGRATION_AVAILABLE and is_docling_available():
try:
text, metadata = process_document_with_docling(file_content, filename, content_type)
if text and text.strip():
logger.info(f"Successfully processed {filename} with Docling")
return text
else:
logger.warning(f"Docling returned empty text for {filename}, falling back to legacy processors")
except ValueError as docling_error:
# Docling doesn't support this format, continue to legacy processors
logger.debug(f"Docling doesn't support {filename}: {docling_error}")
except Exception as docling_error:
# Docling failed, log warning and continue to legacy processors
logger.warning(f"Docling processing failed for {filename}: {docling_error}. Falling back to legacy processors.")
# Legacy document processing (existing logic)
# PDF files
if content_type == "application/pdf" or filename.lower().endswith(".pdf"):
return extract_text_from_pdf(file_content)
@@ -342,3 +373,128 @@ def extract_text_from_docx(file_content: bytes) -> str:
except Exception as e:
raise Exception("Failed to extract text from Word document") from e
def extract_and_chunk_for_rag(
file_content: bytes,
filename: str,
content_type: str,
max_tokens_per_chunk: int = 512
) -> tuple[str, list[dict], dict]:
"""
Extract text and create intelligent chunks optimized for RAG operations.
Uses Docling's HybridChunker for semantic-aware chunking when available,
with fallback to basic text extraction and simple chunking.
Args:
file_content: Raw file bytes
filename: Name of the file
content_type: MIME type of the file
max_tokens_per_chunk: Maximum tokens per chunk for embeddings
Returns:
Tuple of (full_text, chunk_list, metadata)
- full_text: Complete extracted text
- chunk_list: List of chunk dictionaries with text and metadata
- metadata: Document-level metadata
Raises:
ValueError: If the file format is not supported
Exception: If extraction fails
"""
try:
# Try Docling's complete RAG processing pipeline first
if DOCLING_INTEGRATION_AVAILABLE and is_docling_available():
try:
chunks, doc_metadata = create_rag_chunks_with_docling(
file_content, filename, content_type, max_tokens_per_chunk
)
# Reconstruct full text from chunks for backward compatibility
full_text = "\n\n".join(chunk["text"] for chunk in chunks)
logger.info(
f"Successfully processed {filename} with Docling RAG pipeline: "
f"{len(chunks)} chunks created"
)
return full_text, chunks, doc_metadata
except ValueError as docling_error:
# Docling doesn't support this format, continue to legacy processing
logger.debug(f"Docling doesn't support {filename}: {docling_error}")
except Exception as docling_error:
# Docling failed, log warning and continue to legacy processing
logger.warning(
f"Docling RAG processing failed for {filename}: {docling_error}. "
f"Falling back to legacy processing with simple chunking."
)
# Fallback to legacy extraction + simple chunking
full_text = extract_text_from_document(file_content, filename, content_type)
# Create simple chunks as fallback
chunks = _create_simple_chunks(full_text, max_tokens_per_chunk)
# Basic metadata
metadata = {
"docling_processed": False,
"original_filename": filename,
"content_type": content_type,
"extraction_method": "legacy",
"chunking_method": "simple_token_based",
"total_chunks": len(chunks),
"chunk_token_limit": max_tokens_per_chunk
}
logger.info(
f"Processed {filename} with legacy methods: {len(chunks)} chunks created"
)
return full_text, chunks, metadata
except Exception as e:
logfire.error(
"Document RAG processing failed",
filename=filename,
content_type=content_type,
error=str(e),
)
raise Exception(f"Failed to process {filename} for RAG") from e
def _create_simple_chunks(text: str, max_tokens_per_chunk: int = 512) -> list[dict]:
"""
Create simple word-based chunks as fallback when Docling is not available.
Args:
text: Text to chunk
max_tokens_per_chunk: Maximum tokens per chunk
Returns:
List of chunk dictionaries
"""
words = text.split()
# Rough estimation: 1 token ≈ 0.75 words for English text
words_per_chunk = int(max_tokens_per_chunk * 0.75)
chunks = []
for i in range(0, len(words), words_per_chunk):
chunk_words = words[i:i + words_per_chunk]
chunk_text = " ".join(chunk_words)
chunk_data = {
"text": chunk_text,
"chunk_index": i // words_per_chunk,
"chunk_type": "simple_word_based",
"token_count": len(chunk_words), # Rough estimate
"metadata": {
"chunking_method": "simple_word_based",
"chunk_boundaries": "word_based",
"fallback_chunking": True
}
}
chunks.append(chunk_data)
return chunks