Files
archon/python/src/mcp_server/modules/rag_module.py
Rasmus Widing 1f03b40af1 Refactor MCP server structure and add separate project tools
- Rename src/mcp to src/mcp_server for clarity
- Update all internal imports to use new path
- Create features/projects directory for modular tool organization
- Add separate, simple project tools (create, list, get, delete, update)
- Keep consolidated tools for backward compatibility (via env var)
- Add USE_SEPARATE_PROJECT_TOOLS env var to toggle between approaches

The new separate tools:
- Solve the async project creation context loss issue
- Provide clearer, single-purpose interfaces
- Remove complex PRP examples for simplicity
- Handle project creation polling automatically
2025-08-18 15:55:00 +03:00

198 lines
7.2 KiB
Python

"""
RAG Module for Archon MCP Server (HTTP-based version)
This module provides tools for:
- RAG query and search
- Source management
- Code example extraction and search
This version uses HTTP calls to the server service instead of importing
service modules directly, enabling true microservices architecture.
"""
import json
import logging
import os
from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
# Import service discovery for HTTP communication
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
def get_setting(key: str, default: str = "false") -> str:
"""Get a setting from environment variable."""
return os.getenv(key, default)
def get_bool_setting(key: str, default: bool = False) -> bool:
"""Get a boolean setting from environment variable."""
value = get_setting(key, "false" if not default else "true")
return value.lower() in ("true", "1", "yes", "on")
def register_rag_tools(mcp: FastMCP):
"""Register all RAG tools with the MCP server."""
@mcp.tool()
async def get_available_sources(ctx: Context) -> str:
"""
Get list of available sources in the knowledge base.
Returns:
JSON string with structure:
- success: bool - Operation success status
- sources: list[dict] - Array of source objects
- count: int - Number of sources
- error: str - Error description if success=false
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(urljoin(api_url, "/api/rag/sources"))
if response.status_code == 200:
result = response.json()
sources = result.get("sources", [])
return json.dumps(
{"success": True, "sources": sources, "count": len(sources)}, indent=2
)
else:
error_detail = response.text
return json.dumps(
{"success": False, "error": f"HTTP {response.status_code}: {error_detail}"},
indent=2,
)
except Exception as e:
logger.error(f"Error getting sources: {e}")
return json.dumps({"success": False, "error": str(e)}, indent=2)
@mcp.tool()
async def perform_rag_query(
ctx: Context, query: str, source_domain: str = None, match_count: int = 5
) -> str:
"""
Search knowledge base for relevant content using RAG.
Args:
query: Search query
source_domain: Optional domain filter (e.g., 'docs.anthropic.com').
Note: This is a domain name, not the source_id from get_available_sources.
match_count: Max results (default: 5)
Returns:
JSON string with structure:
- success: bool - Operation success status
- results: list[dict] - Array of matching documents with content and metadata
- reranked: bool - Whether results were reranked
- error: str|null - Error description if success=false
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
request_data = {"query": query, "match_count": match_count}
if source_domain:
request_data["source"] = source_domain
response = await client.post(urljoin(api_url, "/api/rag/query"), json=request_data)
if response.status_code == 200:
result = response.json()
return json.dumps(
{
"success": True,
"results": result.get("results", []),
"reranked": result.get("reranked", False),
"error": None,
},
indent=2,
)
else:
error_detail = response.text
return json.dumps(
{
"success": False,
"results": [],
"error": f"HTTP {response.status_code}: {error_detail}",
},
indent=2,
)
except Exception as e:
logger.error(f"Error performing RAG query: {e}")
return json.dumps({"success": False, "results": [], "error": str(e)}, indent=2)
@mcp.tool()
async def search_code_examples(
ctx: Context, query: str, source_domain: str = None, match_count: int = 5
) -> str:
"""
Search for relevant code examples in the knowledge base.
Args:
query: Search query
source_domain: Optional domain filter (e.g., 'docs.anthropic.com').
Note: This is a domain name, not the source_id from get_available_sources.
match_count: Max results (default: 5)
Returns:
JSON string with structure:
- success: bool - Operation success status
- results: list[dict] - Array of code examples with content and summaries
- reranked: bool - Whether results were reranked
- error: str|null - Error description if success=false
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
request_data = {"query": query, "match_count": match_count}
if source_domain:
request_data["source"] = source_domain
# Call the dedicated code examples endpoint
response = await client.post(
urljoin(api_url, "/api/rag/code-examples"), json=request_data
)
if response.status_code == 200:
result = response.json()
return json.dumps(
{
"success": True,
"results": result.get("results", []),
"reranked": result.get("reranked", False),
"error": None,
},
indent=2,
)
else:
error_detail = response.text
return json.dumps(
{
"success": False,
"results": [],
"error": f"HTTP {response.status_code}: {error_detail}",
},
indent=2,
)
except Exception as e:
logger.error(f"Error searching code examples: {e}")
return json.dumps({"success": False, "results": [], "error": str(e)}, indent=2)
# Log successful registration
logger.info("✓ RAG tools registered (HTTP-based version)")