Enhanced the hybrid search strategy with tsvector keyword matching (#539)

This commit is contained in:
Cole Medin
2025-09-10 05:23:49 -06:00
committed by GitHub
parent 012d2c58ed
commit 926b6f5a7b
7 changed files with 568 additions and 341 deletions

View File

@@ -1,14 +1,14 @@
"""
Hybrid Search Strategy
Implements hybrid search combining vector similarity search with keyword search
for improved recall and precision in document and code example retrieval.
Implements hybrid search combining vector similarity search with full-text search
using PostgreSQL's tsvector for improved recall and precision in document and
code example retrieval.
Strategy combines:
1. Vector/semantic search for conceptual matches
2. Keyword search for exact term matches
3. Score boosting for results appearing in both searches
4. Intelligent result merging with preference ordering
2. Full-text search using tsvector for efficient keyword matching
3. Returns union of both result sets for maximum coverage
"""
from typing import Any
@@ -17,129 +17,17 @@ from supabase import Client
from ...config.logfire_config import get_logger, safe_span
from ..embeddings.embedding_service import create_embedding
from .keyword_extractor import build_search_terms, extract_keywords
logger = get_logger(__name__)
class HybridSearchStrategy:
"""Strategy class implementing hybrid search combining vector and keyword search"""
"""Strategy class implementing hybrid search combining vector and full-text search"""
def __init__(self, supabase_client: Client, base_strategy):
    """
    Keep references to the collaborators the search methods rely on.

    Args:
        supabase_client: Supabase client used for table queries and RPC calls
        base_strategy: Base search strategy stored for use by this class
    """
    self.base_strategy = base_strategy
    self.supabase_client = supabase_client
async def keyword_search(
    self,
    query: str,
    match_count: int,
    table_name: str = "documents",
    filter_metadata: dict | None = None,
    select_fields: str | None = None,
) -> list[dict[str, Any]]:
    """
    Perform keyword search using keywords extracted from the query.

    Keywords are extracted from the query, expanded into search terms, and each
    of the first six terms is run as a case-insensitive substring match. Rows are
    de-duplicated by ``id`` and ranked by how many of the extracted keywords
    occur in their text.

    Args:
        query: The search query text
        match_count: Number of results to return
        table_name: The table to search (documents, archon_crawled_pages, or archon_code_examples)
        filter_metadata: Optional metadata filters; a ``source`` or ``source_id``
            entry restricts results to rows whose ``source_id`` column matches
        select_fields: Optional specific fields to select (default: all)

    Returns:
        List of matching documents ranked by keyword relevance. Each row gains
        ``keyword_match_score`` and ``matched_keyword`` entries. An empty list is
        returned if any step raises.
    """
    try:
        # Extract keywords from the query
        keywords = extract_keywords(query, min_length=2, max_keywords=8)

        if not keywords:
            # Fallback to original query if no keywords extracted
            keywords = [query]

        logger.debug(f"Extracted keywords from '{query}': {keywords}")

        # Build search terms including variations. Only the first six are
        # queried because every term costs one round-trip.
        search_terms = build_search_terms(keywords)[:6]

        # Everything below is identical for every term, so compute it once.
        fields = select_fields if select_fields else "*"
        is_code_table = table_name == "archon_code_examples"
        lowered_keywords = [kw.lower() for kw in keywords]

        # Decide which filter entry (if any) constrains the source_id column.
        # The first two branches keep the historical precedence; the last one
        # honours a bare "source" entry for tables such as archon_crawled_pages,
        # which previously had the filter silently dropped.
        source_filter_key = None
        if filter_metadata:
            if "source" in filter_metadata and table_name in ["documents", "crawled_pages"]:
                source_filter_key = "source"
            elif "source_id" in filter_metadata:
                source_filter_key = "source_id"
            elif "source" in filter_metadata:
                source_filter_key = "source"

        all_results = []
        seen_ids = set()

        # Search for each keyword individually to get better coverage
        for keyword in search_terms:
            query_builder = self.supabase_client.from_(table_name).select(fields)

            # Add keyword search condition with wildcards
            # NOTE(review): the keyword is interpolated as-is; LIKE wildcards or
            # filter-syntax characters (",", "(", ")") inside it are not escaped
            # - confirm whether extract_keywords can ever emit them.
            search_pattern = f"%{keyword}%"

            if is_code_table:
                # Search both content and summary for code examples
                query_builder = query_builder.or_(
                    f"content.ilike.{search_pattern},summary.ilike.{search_pattern}"
                )
            else:
                query_builder = query_builder.ilike("content", search_pattern)

            if source_filter_key is not None:
                query_builder = query_builder.eq(
                    "source_id", filter_metadata[source_filter_key]
                )

            # Over-fetch so ranking has more candidates than the final count.
            # NOTE(review): execute() is not awaited here; presumably a
            # synchronous client call inside this coroutine - confirm.
            response = query_builder.limit(match_count * 2).execute()

            if not response.data:
                continue

            for result in response.data:
                result_id = result.get("id")
                if not result_id or result_id in seen_ids:
                    continue

                # A NULL column comes back as None, so coalesce before lowering;
                # otherwise one such row would abort the whole search.
                content = (result.get("content") or "").lower()
                summary = (result.get("summary") or "").lower() if is_code_table else ""
                combined_text = f"{content} {summary}"

                # Count how many of the extracted keywords occur in this row
                match_score = sum(1 for kw in lowered_keywords if kw in combined_text)

                result["keyword_match_score"] = match_score
                result["matched_keyword"] = keyword

                all_results.append(result)
                seen_ids.add(result_id)

        # Sort results by keyword match score (descending)
        all_results.sort(key=lambda x: x.get("keyword_match_score", 0), reverse=True)

        # Return top N results
        final_results = all_results[:match_count]

        logger.debug(
            f"Keyword search found {len(final_results)} results from {len(all_results)} total matches"
        )

        return final_results

    except Exception as e:
        # Best-effort: keyword search failures must not break the hybrid search
        logger.error(f"Keyword search failed: {e}")
        return []
async def search_documents_hybrid(
self,
query: str,
@@ -148,7 +36,8 @@ class HybridSearchStrategy:
filter_metadata: dict | None = None,
) -> list[dict[str, Any]]:
"""
Perform hybrid search on archon_crawled_pages table combining vector and keyword search.
Perform hybrid search on archon_crawled_pages table using the PostgreSQL
hybrid search function that combines vector and full-text search.
Args:
query: Original search query text
@@ -157,41 +46,59 @@ class HybridSearchStrategy:
filter_metadata: Optional metadata filter dict
Returns:
List of matching documents with boosted scores for dual matches
List of matching documents from both vector and text search
"""
with safe_span("hybrid_search_documents") as span:
try:
# 1. Get vector search results using base strategy
vector_results = await self.base_strategy.vector_search(
query_embedding=query_embedding,
match_count=match_count * 2, # Get more for filtering
filter_metadata=filter_metadata,
table_rpc="match_archon_crawled_pages",
)
# Prepare filter and source parameters
filter_json = filter_metadata or {}
source_filter = filter_json.pop("source", None) if "source" in filter_json else None
# 2. Get keyword search results
keyword_results = await self.keyword_search(
query=query,
match_count=match_count * 2,
table_name="archon_crawled_pages",
filter_metadata=filter_metadata,
select_fields="id, url, chunk_number, content, metadata, source_id",
)
# Call the hybrid search PostgreSQL function
response = self.supabase_client.rpc(
"hybrid_search_archon_crawled_pages",
{
"query_embedding": query_embedding,
"query_text": query,
"match_count": match_count,
"filter": filter_json,
"source_filter": source_filter,
},
).execute()
# 3. Combine and merge results intelligently
combined_results = self._merge_search_results(
vector_results, keyword_results, match_count
)
if not response.data:
logger.debug("No results from hybrid search")
return []
span.set_attribute("vector_results_count", len(vector_results))
span.set_attribute("keyword_results_count", len(keyword_results))
span.set_attribute("final_results_count", len(combined_results))
# Format results to match expected structure
results = []
for row in response.data:
result = {
"id": row["id"],
"url": row["url"],
"chunk_number": row["chunk_number"],
"content": row["content"],
"metadata": row["metadata"],
"source_id": row["source_id"],
"similarity": row["similarity"],
"match_type": row["match_type"],
}
results.append(result)
span.set_attribute("results_count", len(results))
# Log match type distribution for debugging
match_types = {}
for r in results:
mt = r.get("match_type", "unknown")
match_types[mt] = match_types.get(mt, 0) + 1
logger.debug(
f"Hybrid document search: {len(vector_results)} vector + {len(keyword_results)} keyword → {len(combined_results)} final"
f"Hybrid search returned {len(results)} results. "
f"Match types: {match_types}"
)
return combined_results
return results
except Exception as e:
logger.error(f"Hybrid document search failed: {e}")
@@ -206,7 +113,8 @@ class HybridSearchStrategy:
source_id: str | None = None,
) -> list[dict[str, Any]]:
"""
Perform hybrid search on archon_code_examples table combining vector and keyword search.
Perform hybrid search on archon_code_examples table using the PostgreSQL
hybrid search function that combines vector and full-text search.
Args:
query: Search query text
@@ -215,147 +123,72 @@ class HybridSearchStrategy:
source_id: Optional source ID to filter results
Returns:
List of matching code examples with boosted scores for dual matches
List of matching code examples from both vector and text search
"""
with safe_span("hybrid_search_code_examples") as span:
try:
# Create query embedding (no enhancement needed)
# Create query embedding
query_embedding = await create_embedding(query)
if not query_embedding:
logger.error("Failed to create embedding for code example query")
return []
# 1. Get vector search results using base strategy
combined_filter = filter_metadata or {}
if source_id:
combined_filter["source"] = source_id
# Prepare filter and source parameters
filter_json = filter_metadata or {}
# Use source_id parameter if provided, otherwise check filter_metadata
final_source_filter = source_id
if not final_source_filter and "source" in filter_json:
final_source_filter = filter_json.pop("source")
vector_results = await self.base_strategy.vector_search(
query_embedding=query_embedding,
match_count=match_count * 2,
filter_metadata=combined_filter,
table_rpc="match_archon_code_examples",
)
# Call the hybrid search PostgreSQL function
response = self.supabase_client.rpc(
"hybrid_search_archon_code_examples",
{
"query_embedding": query_embedding,
"query_text": query,
"match_count": match_count,
"filter": filter_json,
"source_filter": final_source_filter,
},
).execute()
# 2. Get keyword search results
keyword_filter = filter_metadata or {}
if source_id:
keyword_filter["source_id"] = source_id
if not response.data:
logger.debug("No results from hybrid code search")
return []
keyword_results = await self.keyword_search(
query=query,
match_count=match_count * 2,
table_name="archon_code_examples",
filter_metadata=keyword_filter,
select_fields="id, url, chunk_number, content, summary, metadata, source_id",
)
# Format results to match expected structure
results = []
for row in response.data:
result = {
"id": row["id"],
"url": row["url"],
"chunk_number": row["chunk_number"],
"content": row["content"],
"summary": row["summary"],
"metadata": row["metadata"],
"source_id": row["source_id"],
"similarity": row["similarity"],
"match_type": row["match_type"],
}
results.append(result)
# 3. Combine and merge results intelligently
combined_results = self._merge_search_results(
vector_results, keyword_results, match_count
)
span.set_attribute("results_count", len(results))
span.set_attribute("vector_results_count", len(vector_results))
span.set_attribute("keyword_results_count", len(keyword_results))
span.set_attribute("final_results_count", len(combined_results))
# Log match type distribution for debugging
match_types = {}
for r in results:
mt = r.get("match_type", "unknown")
match_types[mt] = match_types.get(mt, 0) + 1
logger.debug(
f"Hybrid code search: {len(vector_results)} vector + {len(keyword_results)} keyword → {len(combined_results)} final"
f"Hybrid code search returned {len(results)} results. "
f"Match types: {match_types}"
)
return combined_results
return results
except Exception as e:
logger.error(f"Hybrid code example search failed: {e}")
span.set_attribute("error", str(e))
return []
def _merge_search_results(
    self,
    vector_results: list[dict[str, Any]],
    keyword_results: list[dict[str, Any]],
    match_count: int,
) -> list[dict[str, Any]]:
    """
    Merge vector and keyword search results with preference ordering.

    Priority order:
    1. Results appearing in BOTH searches (highest relevance) - get score boost
    2. Vector-only results (semantic matches)
    3. Keyword-only results (exact term matches)

    The input lists and their dicts are left untouched; every returned entry
    is a new dict.

    Args:
        vector_results: Results from vector/semantic search
        keyword_results: Results from keyword search
        match_count: Maximum number of final results to return

    Returns:
        Merged and prioritized list of results, each tagged with a
        ``match_type`` of "hybrid", "vector" or "keyword"

    Raises:
        KeyError: If a keyword-only result lacks one of the standard fields
            (url, chunk_number, content, metadata, source_id)
    """
    # A negative count would otherwise slice from the end of the list
    limit = max(match_count, 0)

    seen_ids: set[str] = set()
    combined_results: list[dict[str, Any]] = []

    # Create lookup for vector results by ID for efficient matching
    vector_lookup = {r.get("id"): r for r in vector_results if r.get("id")}

    # Phase 1: Add items that appear in BOTH searches (boost their scores)
    for keyword_result in keyword_results:
        result_id = keyword_result.get("id")
        if not result_id or result_id in seen_ids or result_id not in vector_lookup:
            continue

        # Copy before boosting so the caller's dict is not rewritten; mutating
        # it in place would compound the boost on a repeated merge.
        hybrid_result = dict(vector_lookup[result_id])
        # Boost similarity score for dual matches (cap at 1.0); treat a
        # missing or None similarity as 0
        hybrid_result["similarity"] = min(1.0, (hybrid_result.get("similarity") or 0) * 1.2)
        hybrid_result["match_type"] = "hybrid"

        combined_results.append(hybrid_result)
        seen_ids.add(result_id)

    # Phase 2: Add remaining vector results (semantic matches without exact keywords)
    for vector_result in vector_results:
        if len(combined_results) >= limit:
            break
        result_id = vector_result.get("id")
        if not result_id or result_id in seen_ids:
            continue

        tagged_result = dict(vector_result)
        tagged_result["match_type"] = "vector"
        combined_results.append(tagged_result)
        seen_ids.add(result_id)

    # Phase 3: Add pure keyword matches if we need more results
    for keyword_result in keyword_results:
        if len(combined_results) >= limit:
            break
        result_id = keyword_result.get("id")
        if not result_id or result_id in seen_ids:
            continue

        # Convert keyword result to match vector result format
        keyword_score = keyword_result.get("keyword_match_score", 1)

        # Scale keyword score to similarity range (0.3 to 0.7 based on matches)
        scaled_similarity = min(0.7, 0.3 + (keyword_score * 0.1))

        standardized_result = {
            "id": keyword_result["id"],
            "url": keyword_result["url"],
            "chunk_number": keyword_result["chunk_number"],
            "content": keyword_result["content"],
            "metadata": keyword_result["metadata"],
            "source_id": keyword_result["source_id"],
            "similarity": scaled_similarity,
            "match_type": "keyword",
            "keyword_match_score": keyword_score,
        }

        # Include summary if present (for code examples)
        if "summary" in keyword_result:
            standardized_result["summary"] = keyword_result["summary"]

        combined_results.append(standardized_result)
        seen_ids.add(result_id)

    # Phase 1 is not capped while collecting, so trim here
    final_results = combined_results[:limit]

    logger.debug(
        f"Merge stats - Hybrid: {sum(1 for r in final_results if r.get('match_type') == 'hybrid')}, "
        f"Vector: {sum(1 for r in final_results if r.get('match_type') == 'vector')}, "
        f"Keyword: {sum(1 for r in final_results if r.get('match_type') == 'keyword')}"
    )

    return final_results
return []

View File

@@ -204,10 +204,19 @@ class RAGService:
use_hybrid_search = self.get_bool_setting("USE_HYBRID_SEARCH", False)
use_reranking = self.get_bool_setting("USE_RERANKING", False)
# If reranking is enabled, fetch more candidates for the reranker to evaluate
# This allows the reranker to see a broader set of results
search_match_count = match_count
if use_reranking and self.reranking_strategy:
# Fetch 5x the requested amount when reranking is enabled
# The reranker will select the best from this larger pool
search_match_count = match_count * 5
logger.debug(f"Reranking enabled - fetching {search_match_count} candidates for {match_count} final results")
# Step 1 & 2: Get results (with hybrid search if enabled)
results = await self.search_documents(
query=query,
match_count=match_count,
match_count=search_match_count,
filter_metadata=filter_metadata,
use_hybrid_search=use_hybrid_search,
)
@@ -234,14 +243,18 @@ class RAGService:
reranking_applied = False
if self.reranking_strategy and formatted_results:
try:
# Pass top_k to limit results to the originally requested count
formatted_results = await self.reranking_strategy.rerank_results(
query, formatted_results, content_key="content"
query, formatted_results, content_key="content", top_k=match_count
)
reranking_applied = True
logger.debug(f"Reranking applied to {len(formatted_results)} results")
logger.debug(f"Reranking applied: {search_match_count} candidates -> {len(formatted_results)} final results")
except Exception as e:
logger.warning(f"Reranking failed: {e}")
reranking_applied = False
# If reranking fails but we fetched extra results, trim to requested count
if len(formatted_results) > match_count:
formatted_results = formatted_results[:match_count]
# Build response
response_data = {
@@ -313,6 +326,12 @@ class RAGService:
use_hybrid_search = self.get_bool_setting("USE_HYBRID_SEARCH", False)
use_reranking = self.get_bool_setting("USE_RERANKING", False)
# If reranking is enabled, fetch more candidates
search_match_count = match_count
if use_reranking and self.reranking_strategy:
search_match_count = match_count * 5
logger.debug(f"Reranking enabled for code search - fetching {search_match_count} candidates")
# Prepare filter
filter_metadata = {"source": source_id} if source_id and source_id.strip() else None
@@ -320,7 +339,7 @@ class RAGService:
# Use hybrid search for code examples
results = await self.hybrid_strategy.search_code_examples_hybrid(
query=query,
match_count=match_count,
match_count=search_match_count,
filter_metadata=filter_metadata,
source_id=source_id,
)
@@ -328,7 +347,7 @@ class RAGService:
# Use standard agentic search
results = await self.agentic_strategy.search_code_examples(
query=query,
match_count=match_count,
match_count=search_match_count,
filter_metadata=filter_metadata,
source_id=source_id,
)
@@ -337,10 +356,14 @@ class RAGService:
if self.reranking_strategy and results:
try:
results = await self.reranking_strategy.rerank_results(
query, results, content_key="content"
query, results, content_key="content", top_k=match_count
)
logger.debug(f"Code reranking applied: {search_match_count} candidates -> {len(results)} final results")
except Exception as e:
logger.warning(f"Code reranking failed: {e}")
# If reranking fails but we fetched extra results, trim to requested count
if len(results) > match_count:
results = results[:match_count]
# Format results
formatted_results = []

View File

@@ -162,38 +162,6 @@ class TestHybridSearchCore:
"""Test hybrid strategy initializes"""
assert hybrid_strategy is not None
assert hasattr(hybrid_strategy, "search_documents_hybrid")
assert hasattr(hybrid_strategy, "_merge_search_results")
def test_merge_results_functionality(self, hybrid_strategy):
    """Merging one vector hit with one keyword hit returns a list capped at match_count."""
    semantic_hit = {
        "id": "1",
        "content": "Vector result",
        "similarity": 0.9,
        "url": "test1.com",
        "chunk_number": 1,
        "metadata": {},
        "source_id": "src1",
    }
    lexical_hit = {
        "id": "2",
        "content": "Keyword result",
        "url": "test2.com",
        "chunk_number": 1,
        "metadata": {},
        "source_id": "src2",
    }

    merged = hybrid_strategy._merge_search_results(
        [semantic_hit], [lexical_hit], match_count=5
    )

    # Only the container type and the size cap are checked here
    assert isinstance(merged, list)
    assert len(merged) <= 5
class TestRerankingCore:

View File

@@ -168,42 +168,6 @@ class TestHybridSearchStrategy:
assert hasattr(hybrid_strategy, "search_documents_hybrid")
assert hasattr(hybrid_strategy, "search_code_examples_hybrid")
def test_merge_search_results(self, hybrid_strategy):
    """Test that merging keeps results from both the vector and keyword searches."""
    vector_results = [
        {
            "id": "1",
            "content": "Vector result 1",
            "score": 0.9,
            "url": "url1",
            "chunk_number": 1,
            "metadata": {},
            "source_id": "source1",
            "similarity": 0.9,
        }
    ]
    keyword_results = [
        {
            "id": "2",
            "content": "Keyword result 1",
            "score": 0.8,
            "url": "url2",
            "chunk_number": 1,
            "metadata": {},
            "source_id": "source2",
        }
    ]

    merged = hybrid_strategy._merge_search_results(
        vector_results, keyword_results, match_count=5
    )

    assert isinstance(merged, list)
    assert len(merged) <= 5

    # The two inputs have distinct ids and fit within match_count, so both
    # must survive the merge. Assert each one unconditionally: an
    # `if merged:` guard with an `or` would also pass on an empty or
    # one-sided result.
    merged_contents = [str(r.get("content")) for r in merged]
    assert any("Vector result" in text for text in merged_contents)
    assert any("Keyword result" in text for text in merged_contents)
class TestRerankingStrategy:
"""Test reranking strategy implementation"""