Merge pull request #306 from coleam00/feature/mcp-server-consolidation-simplification

Refactor MCP server: Modularize tools and add comprehensive tests
This commit is contained in:
Wirasm
2025-08-20 12:20:13 +03:00
committed by GitHub
35 changed files with 3305 additions and 1769 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,12 @@
"""
Document and version management tools for Archon MCP Server.
This module provides separate tools for document operations:
- create_document, list_documents, get_document, update_document, delete_document
- create_version, list_versions, get_version, restore_version
"""
from .document_tools import register_document_tools
from .version_tools import register_version_tools
__all__ = ["register_document_tools", "register_version_tools"]

View File

@@ -0,0 +1,323 @@
"""
Simple document management tools for Archon MCP Server.
Provides separate, focused tools for each document operation.
Supports various document types including specs, designs, notes, and PRPs.
"""
import json
import logging
from typing import Any, Optional, Dict, List
from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
from src.mcp_server.utils.error_handling import MCPErrorFormatter
from src.mcp_server.utils.timeout_config import get_default_timeout
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
def register_document_tools(mcp: FastMCP):
"""Register individual document management tools with the MCP server."""
@mcp.tool()
async def create_document(
ctx: Context,
project_id: str,
title: str,
document_type: str,
content: Optional[Dict[str, Any]] = None,
tags: Optional[List[str]] = None,
author: Optional[str] = None,
) -> str:
"""
Create a new document with automatic versioning.
Args:
project_id: Project UUID (required)
title: Document title (required)
document_type: Type of document. Common types:
- "spec": Technical specifications
- "design": Design documents
- "note": General notes
- "prp": Product requirement prompts
- "api": API documentation
- "guide": User guides
content: Document content as structured JSON (optional).
Can be any JSON structure that fits your needs.
tags: List of tags for categorization (e.g., ["backend", "auth"])
author: Document author name (optional)
Returns:
JSON with document details:
{
"success": true,
"document": {...},
"document_id": "doc-123",
"message": "Document created successfully"
}
Examples:
# Create API specification
create_document(
project_id="550e8400-e29b-41d4-a716-446655440000",
title="REST API Specification",
document_type="spec",
content={
"endpoints": [
{"path": "/users", "method": "GET", "description": "List users"},
{"path": "/users/{id}", "method": "GET", "description": "Get user"}
],
"authentication": "Bearer token",
"version": "1.0.0"
},
tags=["api", "backend"],
author="API Team"
)
# Create design document
create_document(
project_id="550e8400-e29b-41d4-a716-446655440000",
title="Authentication Flow Design",
document_type="design",
content={
"overview": "OAuth2 implementation design",
"components": ["AuthProvider", "TokenManager", "UserSession"],
"flow": {"step1": "Redirect to provider", "step2": "Exchange code"}
}
)
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
urljoin(api_url, f"/api/projects/{project_id}/docs"),
json={
"document_type": document_type,
"title": title,
"content": content or {},
"tags": tags,
"author": author,
},
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"document": result.get("document"),
"document_id": result.get("document", {}).get("id"),
"message": result.get("message", "Document created successfully"),
})
else:
return MCPErrorFormatter.from_http_error(response, "create document")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "create document", {"project_id": project_id, "title": title}
)
except Exception as e:
logger.error(f"Error creating document: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "create document")
@mcp.tool()
async def list_documents(ctx: Context, project_id: str) -> str:
"""
List all documents for a project.
Args:
project_id: Project UUID (required)
Returns:
JSON array of documents
Example:
list_documents(project_id="uuid")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(urljoin(api_url, f"/api/projects/{project_id}/docs"))
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"documents": result.get("documents", []),
"count": len(result.get("documents", [])),
})
else:
return MCPErrorFormatter.from_http_error(response, "list documents")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "list documents", {"project_id": project_id})
except Exception as e:
logger.error(f"Error listing documents: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "list documents")
@mcp.tool()
async def get_document(ctx: Context, project_id: str, doc_id: str) -> str:
"""
Get detailed information about a specific document.
Args:
project_id: Project UUID (required)
doc_id: Document UUID (required)
Returns:
JSON with complete document details
Example:
get_document(project_id="uuid", doc_id="doc-uuid")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
urljoin(api_url, f"/api/projects/{project_id}/docs/{doc_id}")
)
if response.status_code == 200:
document = response.json()
return json.dumps({"success": True, "document": document})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Document {doc_id} not found",
suggestion="Verify the document ID is correct and exists in this project",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "get document")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "get document", {"project_id": project_id, "doc_id": doc_id}
)
except Exception as e:
logger.error(f"Error getting document: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "get document")
@mcp.tool()
async def update_document(
ctx: Context,
project_id: str,
doc_id: str,
title: Optional[str] = None,
content: Optional[Dict[str, Any]] = None,
tags: Optional[List[str]] = None,
author: Optional[str] = None,
) -> str:
"""
Update a document's properties.
Args:
project_id: Project UUID (required)
doc_id: Document UUID (required)
title: New document title (optional)
content: New document content (optional)
tags: New tags list (optional)
author: New author (optional)
Returns:
JSON with updated document details
Example:
update_document(project_id="uuid", doc_id="doc-uuid", title="New Title",
content={"updated": "content"})
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
# Build update fields
update_fields: Dict[str, Any] = {}
if title is not None:
update_fields["title"] = title
if content is not None:
update_fields["content"] = content
if tags is not None:
update_fields["tags"] = tags
if author is not None:
update_fields["author"] = author
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.put(
urljoin(api_url, f"/api/projects/{project_id}/docs/{doc_id}"),
json=update_fields,
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"document": result.get("document"),
"message": result.get("message", "Document updated successfully"),
})
else:
return MCPErrorFormatter.from_http_error(response, "update document")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "update document", {"project_id": project_id, "doc_id": doc_id}
)
except Exception as e:
logger.error(f"Error updating document: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "update document")
@mcp.tool()
async def delete_document(ctx: Context, project_id: str, doc_id: str) -> str:
"""
Delete a document.
Args:
project_id: Project UUID (required)
doc_id: Document UUID (required)
Returns:
JSON confirmation of deletion
Example:
delete_document(project_id="uuid", doc_id="doc-uuid")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.delete(
urljoin(api_url, f"/api/projects/{project_id}/docs/{doc_id}")
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"message": result.get("message", f"Document {doc_id} deleted successfully"),
})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Document {doc_id} not found",
suggestion="Verify the document ID is correct and exists in this project",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "delete document")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "delete document", {"project_id": project_id, "doc_id": doc_id}
)
except Exception as e:
logger.error(f"Error deleting document: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "delete document")

View File

@@ -0,0 +1,346 @@
"""
Simple version management tools for Archon MCP Server.
Provides separate, focused tools for version control operations.
Supports versioning of documents, features, and other project data.
"""
import json
import logging
from typing import Any, Optional
from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
from src.mcp_server.utils.error_handling import MCPErrorFormatter
from src.mcp_server.utils.timeout_config import get_default_timeout
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
def register_version_tools(mcp: FastMCP):
"""Register individual version management tools with the MCP server."""
@mcp.tool()
async def create_version(
ctx: Context,
project_id: str,
field_name: str,
content: Any,
change_summary: Optional[str] = None,
document_id: Optional[str] = None,
created_by: str = "system",
) -> str:
"""
Create a new version snapshot of project data.
Creates an immutable snapshot that can be restored later. The content format
depends on which field_name you're versioning.
Args:
project_id: Project UUID (e.g., "550e8400-e29b-41d4-a716-446655440000")
field_name: Which field to version - must be one of:
- "docs": For document arrays
- "features": For feature status objects
- "data": For general data objects
- "prd": For product requirement documents
content: Complete content to snapshot. Format depends on field_name:
For "docs" - pass array of document objects:
[{"id": "doc-123", "title": "API Guide", "content": {...}}]
For "features" - pass dictionary of features:
{"auth": {"status": "done"}, "api": {"status": "in_progress"}}
For "data" - pass any JSON object:
{"config": {"theme": "dark"}, "settings": {...}}
For "prd" - pass PRD object:
{"vision": "...", "features": [...], "metrics": [...]}
change_summary: Description of what changed (e.g., "Added OAuth docs")
document_id: Optional - for versioning specific doc in docs array
created_by: Who created this version (default: "system")
Returns:
JSON with version details:
{
"success": true,
"version": {"version_number": 3, "field_name": "docs"},
"message": "Version created successfully"
}
Examples:
# Version documents
create_version(
project_id="550e8400-e29b-41d4-a716-446655440000",
field_name="docs",
content=[{"id": "doc-1", "title": "Guide", "content": {"text": "..."}}],
change_summary="Updated user guide"
)
# Version features
create_version(
project_id="550e8400-e29b-41d4-a716-446655440000",
field_name="features",
content={"auth": {"status": "done"}, "api": {"status": "todo"}},
change_summary="Completed authentication"
)
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
urljoin(api_url, f"/api/projects/{project_id}/versions"),
json={
"field_name": field_name,
"content": content,
"change_summary": change_summary,
"change_type": "manual",
"document_id": document_id,
"created_by": created_by,
},
)
if response.status_code == 200:
result = response.json()
version_num = result.get("version", {}).get("version_number")
return json.dumps({
"success": True,
"version": result.get("version"),
"version_number": version_num,
"message": f"Version {version_num} created successfully for {field_name} field",
})
elif response.status_code == 400:
error_text = response.text.lower()
if "invalid field_name" in error_text:
return MCPErrorFormatter.format_error(
error_type="validation_error",
message=f"Invalid field_name '{field_name}'. Must be one of: docs, features, data, or prd",
suggestion="Use one of the valid field names: docs, features, data, or prd",
http_status=400,
)
elif "content" in error_text and "required" in error_text:
return MCPErrorFormatter.format_error(
error_type="validation_error",
message="Content is required and cannot be empty. Provide the complete data to version.",
suggestion="Provide the complete data to version",
http_status=400,
)
elif "format" in error_text or "type" in error_text:
if field_name == "docs":
return MCPErrorFormatter.format_error(
error_type="validation_error",
message=f"For field_name='docs', content must be an array. Example: [{{'id': 'doc1', 'title': 'Guide', 'content': {{...}}}}]",
suggestion="Ensure content is an array of document objects",
http_status=400,
)
else:
return MCPErrorFormatter.format_error(
error_type="validation_error",
message=f"For field_name='{field_name}', content must be a dictionary/object. Example: {{'key': 'value'}}",
suggestion="Ensure content is a dictionary/object",
http_status=400,
)
return MCPErrorFormatter.format_error(
error_type="validation_error",
message=f"Invalid request: {response.text}",
suggestion="Check that all required fields are provided and valid",
http_status=400,
)
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Project {project_id} not found",
suggestion="Please check the project ID is correct",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "create version")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "create version", {"project_id": project_id, "field_name": field_name}
)
except Exception as e:
logger.error(f"Error creating version: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "create version")
@mcp.tool()
async def list_versions(ctx: Context, project_id: str, field_name: Optional[str] = None) -> str:
"""
List version history for a project.
Args:
project_id: Project UUID (required)
field_name: Filter by field name - "docs", "features", "data", "prd" (optional)
Returns:
JSON array of versions with metadata
Example:
list_versions(project_id="uuid", field_name="docs")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
params = {}
if field_name:
params["field_name"] = field_name
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
urljoin(api_url, f"/api/projects/{project_id}/versions"), params=params
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"versions": result.get("versions", []),
"count": len(result.get("versions", [])),
})
else:
return MCPErrorFormatter.from_http_error(response, "list versions")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "list versions", {"project_id": project_id, "field_name": field_name}
)
except Exception as e:
logger.error(f"Error listing versions: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "list versions")
@mcp.tool()
async def get_version(
ctx: Context, project_id: str, field_name: str, version_number: int
) -> str:
"""
Get detailed information about a specific version.
Args:
project_id: Project UUID (required)
field_name: Field name - "docs", "features", "data", "prd" (required)
version_number: Version number to retrieve (required)
Returns:
JSON with complete version details and content
Example:
get_version(project_id="uuid", field_name="docs", version_number=3)
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
urljoin(
api_url,
f"/api/projects/{project_id}/versions/{field_name}/{version_number}",
)
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"version": result.get("version"),
"content": result.get("content"),
})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Version {version_number} not found for field {field_name}",
suggestion="Check that the version number and field name are correct",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "get version")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e,
"get version",
{
"project_id": project_id,
"field_name": field_name,
"version_number": version_number,
},
)
except Exception as e:
logger.error(f"Error getting version: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "get version")
@mcp.tool()
async def restore_version(
ctx: Context,
project_id: str,
field_name: str,
version_number: int,
restored_by: str = "system",
) -> str:
"""
Restore a previous version.
Args:
project_id: Project UUID (required)
field_name: Field name - "docs", "features", "data", "prd" (required)
version_number: Version number to restore (required)
restored_by: Identifier of who is restoring (optional, defaults to "system")
Returns:
JSON confirmation of restoration
Example:
restore_version(project_id="uuid", field_name="docs", version_number=2)
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
urljoin(
api_url,
f"/api/projects/{project_id}/versions/{field_name}/{version_number}/restore",
),
json={"restored_by": restored_by},
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"message": result.get(
"message", f"Version {version_number} restored successfully"
),
})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Version {version_number} not found for field {field_name}",
suggestion="Check that the version number exists for this field",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "restore version")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e,
"restore version",
{
"project_id": project_id,
"field_name": field_name,
"version_number": version_number,
},
)
except Exception as e:
logger.error(f"Error restoring version: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "restore version")

View File

@@ -0,0 +1,105 @@
"""
Simple feature management tools for Archon MCP Server.
Provides tools to retrieve and manage project features.
"""
import json
import logging
from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
from src.mcp_server.utils.error_handling import MCPErrorFormatter
from src.mcp_server.utils.timeout_config import get_default_timeout
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
def register_feature_tools(mcp: FastMCP):
"""Register feature management tools with the MCP server."""
@mcp.tool()
async def get_project_features(ctx: Context, project_id: str) -> str:
"""
Get features from a project's features field.
Features track functional components and capabilities of a project.
Features are typically populated through project updates or task completion.
Args:
project_id: Project UUID (required)
Returns:
JSON with list of project features:
{
"success": true,
"features": [
{"name": "authentication", "status": "completed", "components": ["oauth", "jwt"]},
{"name": "api", "status": "in_progress", "endpoints": 12},
{"name": "database", "status": "planned"}
],
"count": 3
}
Note: Returns empty array if no features are defined yet.
Examples:
get_project_features(project_id="550e8400-e29b-41d4-a716-446655440000")
Feature Structure Examples:
Features can have various structures depending on your needs:
1. Simple status tracking:
{"name": "feature_name", "status": "todo|in_progress|done"}
2. Component tracking:
{"name": "auth", "status": "done", "components": ["oauth", "jwt", "sessions"]}
3. Progress tracking:
{"name": "api", "status": "in_progress", "endpoints_done": 12, "endpoints_total": 20}
4. Metadata rich:
{"name": "payments", "provider": "stripe", "version": "2.0", "enabled": true}
How Features Are Populated:
- Features are typically added via update_project() with features field
- Can be automatically populated by AI during project creation
- May be updated when tasks are completed
- Can track any project capabilities or components you need
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
urljoin(api_url, f"/api/projects/{project_id}/features")
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"features": result.get("features", []),
"count": len(result.get("features", [])),
})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Project {project_id} not found",
suggestion="Verify the project ID is correct",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "get project features")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "get project features", {"project_id": project_id}
)
except Exception as e:
logger.error(f"Error getting project features: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "get project features")

View File

@@ -0,0 +1,13 @@
"""
Project management tools for Archon MCP Server.
This module provides separate tools for each project operation:
- create_project: Create a new project
- list_projects: List all projects
- get_project: Get project details
- delete_project: Delete a project
"""
from .project_tools import register_project_tools
__all__ = ["register_project_tools"]

View File

@@ -0,0 +1,348 @@
"""
Simple project management tools for Archon MCP Server.
Provides separate, focused tools for each project operation.
No complex PRP examples - just straightforward project management.
"""
import asyncio
import json
import logging
from typing import Any, Optional
from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
from src.mcp_server.utils.error_handling import MCPErrorFormatter
from src.mcp_server.utils.timeout_config import (
get_default_timeout,
get_max_polling_attempts,
get_polling_interval,
get_polling_timeout,
)
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
def register_project_tools(mcp: FastMCP):
"""Register individual project management tools with the MCP server."""
@mcp.tool()
async def create_project(
ctx: Context,
title: str,
description: str = "",
github_repo: Optional[str] = None,
) -> str:
"""
Create a new project with automatic AI assistance.
The project creation starts a background process that generates PRP documentation
and initial tasks based on the title and description.
Args:
title: Project title - should be descriptive (required)
description: Project description explaining goals and scope
github_repo: GitHub repository URL (e.g., "https://github.com/org/repo")
Returns:
JSON with project details:
{
"success": true,
"project": {...},
"project_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "Project created successfully"
}
Examples:
# Simple project
create_project(
title="Task Management API",
description="RESTful API for managing tasks and projects"
)
# Project with GitHub integration
create_project(
title="OAuth2 Authentication System",
description="Implement secure OAuth2 authentication with multiple providers",
github_repo="https://github.com/myorg/auth-service"
)
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
urljoin(api_url, "/api/projects"),
json={"title": title, "description": description, "github_repo": github_repo},
)
if response.status_code == 200:
result = response.json()
# Handle async project creation
if "progress_id" in result:
# Poll for completion with proper error handling and backoff
max_attempts = get_max_polling_attempts()
polling_timeout = get_polling_timeout()
for attempt in range(max_attempts):
try:
# Exponential backoff
sleep_interval = get_polling_interval(attempt)
await asyncio.sleep(sleep_interval)
# Create new client with polling timeout
async with httpx.AsyncClient(
timeout=polling_timeout
) as poll_client:
list_response = await poll_client.get(
urljoin(api_url, "/api/projects")
)
list_response.raise_for_status() # Raise on HTTP errors
projects = list_response.json()
# Find project with matching title created recently
for proj in projects:
if proj.get("title") == title:
return json.dumps({
"success": True,
"project": proj,
"project_id": proj["id"],
"message": f"Project created successfully with ID: {proj['id']}",
})
except httpx.RequestError as poll_error:
logger.warning(
f"Polling attempt {attempt + 1}/{max_attempts} failed: {poll_error}"
)
if attempt == max_attempts - 1: # Last attempt
return MCPErrorFormatter.format_error(
error_type="polling_timeout",
message=f"Project creation polling failed after {max_attempts} attempts",
details={
"progress_id": result["progress_id"],
"title": title,
"last_error": str(poll_error),
},
suggestion="The project may still be creating. Use list_projects to check status",
)
except Exception as poll_error:
logger.warning(
f"Unexpected error during polling attempt {attempt + 1}: {poll_error}"
)
# If we couldn't find it after polling
return json.dumps({
"success": True,
"progress_id": result["progress_id"],
"message": f"Project creation in progress after {max_attempts} checks. Use list_projects to find it once complete.",
})
else:
# Direct response (shouldn't happen with current API)
return json.dumps({"success": True, "project": result})
else:
return MCPErrorFormatter.from_http_error(response, "create project")
except httpx.ConnectError as e:
return MCPErrorFormatter.from_exception(
e, "create project", {"title": title, "api_url": api_url}
)
except httpx.TimeoutException as e:
return MCPErrorFormatter.from_exception(
e, "create project", {"title": title, "timeout": str(timeout)}
)
except Exception as e:
logger.error(f"Error creating project: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "create project", {"title": title})
@mcp.tool()
async def list_projects(ctx: Context) -> str:
"""
List all projects.
Returns:
JSON array of all projects with their basic information
Example:
list_projects()
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(urljoin(api_url, "/api/projects"))
if response.status_code == 200:
projects = response.json()
return json.dumps({
"success": True,
"projects": projects,
"count": len(projects),
})
else:
return MCPErrorFormatter.from_http_error(response, "list projects")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "list projects", {"api_url": api_url})
except Exception as e:
logger.error(f"Error listing projects: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "list projects")
@mcp.tool()
async def get_project(ctx: Context, project_id: str) -> str:
"""
Get detailed information about a specific project.
Args:
project_id: UUID of the project
Returns:
JSON with complete project details
Example:
get_project(project_id="550e8400-e29b-41d4-a716-446655440000")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(urljoin(api_url, f"/api/projects/{project_id}"))
if response.status_code == 200:
project = response.json()
return json.dumps({"success": True, "project": project})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Project {project_id} not found",
suggestion="Verify the project ID is correct",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "get project")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "get project", {"project_id": project_id})
except Exception as e:
logger.error(f"Error getting project: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "get project")
@mcp.tool()
async def delete_project(ctx: Context, project_id: str) -> str:
"""
Delete a project.
Args:
project_id: UUID of the project to delete
Returns:
JSON confirmation of deletion
Example:
delete_project(project_id="550e8400-e29b-41d4-a716-446655440000")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.delete(urljoin(api_url, f"/api/projects/{project_id}"))
if response.status_code == 200:
return json.dumps({
"success": True,
"message": f"Project {project_id} deleted successfully",
})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Project {project_id} not found",
suggestion="Verify the project ID is correct",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "delete project")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "delete project", {"project_id": project_id})
except Exception as e:
logger.error(f"Error deleting project: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "delete project")
@mcp.tool()
async def update_project(
ctx: Context,
project_id: str,
title: Optional[str] = None,
description: Optional[str] = None,
github_repo: Optional[str] = None,
) -> str:
"""
Update a project's basic information.
Args:
project_id: UUID of the project to update
title: New title (optional)
description: New description (optional)
github_repo: New GitHub repository URL (optional)
Returns:
JSON with updated project details
Example:
update_project(project_id="550e8400-e29b-41d4-a716-446655440000",
title="Updated Project Title")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
# Build update payload with only provided fields
update_data = {}
if title is not None:
update_data["title"] = title
if description is not None:
update_data["description"] = description
if github_repo is not None:
update_data["github_repo"] = github_repo
if not update_data:
return MCPErrorFormatter.format_error(
error_type="validation_error",
message="No fields to update",
suggestion="Provide at least one field to update (title, description, or github_repo)",
)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.put(
urljoin(api_url, f"/api/projects/{project_id}"), json=update_data
)
if response.status_code == 200:
project = response.json()
return json.dumps({
"success": True,
"project": project,
"message": "Project updated successfully",
})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Project {project_id} not found",
suggestion="Verify the project ID is correct",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "update project")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "update project", {"project_id": project_id})
except Exception as e:
logger.error(f"Error updating project: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "update project")

View File

@@ -0,0 +1,14 @@
"""
Task management tools for Archon MCP Server.
This module provides separate tools for each task operation:
- create_task: Create a new task
- list_tasks: List tasks with filtering
- get_task: Get task details
- update_task: Update task properties
- delete_task: Delete a task
"""
from .task_tools import register_task_tools
__all__ = ["register_task_tools"]

View File

@@ -0,0 +1,425 @@
"""
Simple task management tools for Archon MCP Server.
Provides separate, focused tools for each task operation.
Mirrors the functionality of the original manage_task tool but with individual tools.
"""
import json
import logging
from typing import Any, Dict, List, Optional, TypedDict
from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
from src.mcp_server.utils.error_handling import MCPErrorFormatter
from src.mcp_server.utils.timeout_config import get_default_timeout
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
class TaskUpdateFields(TypedDict, total=False):
"""Valid fields that can be updated on a task."""
title: str
description: str
status: str # "todo" | "doing" | "review" | "done"
assignee: str # "User" | "Archon" | "AI IDE Agent" | "prp-executor" | "prp-validator"
task_order: int # 0-100, higher = more priority
feature: Optional[str]
sources: Optional[List[Dict[str, str]]]
code_examples: Optional[List[Dict[str, str]]]
def register_task_tools(mcp: FastMCP):
"""Register individual task management tools with the MCP server."""
@mcp.tool()
async def create_task(
ctx: Context,
project_id: str,
title: str,
description: str = "",
assignee: str = "User",
task_order: int = 0,
feature: Optional[str] = None,
sources: Optional[List[Dict[str, str]]] = None,
code_examples: Optional[List[Dict[str, str]]] = None,
) -> str:
"""
Create a new task in a project.
Args:
project_id: Project UUID (required)
title: Task title - should be specific and actionable (required)
description: Detailed task description with acceptance criteria
assignee: Who will work on this task. Options:
- "User": For manual tasks
- "Archon": For AI-driven tasks
- "AI IDE Agent": For code implementation
- "prp-executor": For PRP coordination
- "prp-validator": For testing/validation
task_order: Priority within status (0-100, higher = more priority)
feature: Feature label for grouping related tasks (e.g., "authentication")
sources: List of source references. Each source should have:
- "url": Link to documentation or file path
- "type": Type of source (e.g., "documentation", "api_spec")
- "relevance": Why this source is relevant
code_examples: List of code examples. Each example should have:
- "file": Path to the file
- "function": Function or class name
- "purpose": Why this example is relevant
Returns:
JSON with task details including task_id:
{
"success": true,
"task": {...},
"task_id": "task-123",
"message": "Task created successfully"
}
Examples:
# Simple task
create_task(
project_id="550e8400-e29b-41d4-a716-446655440000",
title="Add user authentication",
description="Implement JWT-based authentication with refresh tokens"
)
# Task with sources and examples
create_task(
project_id="550e8400-e29b-41d4-a716-446655440000",
title="Implement OAuth2 Google provider",
description="Add Google OAuth2 with PKCE security",
assignee="AI IDE Agent",
task_order=10,
feature="authentication",
sources=[
{
"url": "https://developers.google.com/identity/protocols/oauth2",
"type": "documentation",
"relevance": "Official OAuth2 implementation guide"
},
{
"url": "docs/auth/README.md",
"type": "internal_docs",
"relevance": "Current auth architecture"
}
],
code_examples=[
{
"file": "src/auth/base.py",
"function": "BaseAuthProvider",
"purpose": "Base class to extend"
},
{
"file": "tests/auth/test_oauth.py",
"function": "test_oauth_flow",
"purpose": "Test pattern to follow"
}
]
)
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
urljoin(api_url, "/api/tasks"),
json={
"project_id": project_id,
"title": title,
"description": description,
"assignee": assignee,
"task_order": task_order,
"feature": feature,
"sources": sources or [],
"code_examples": code_examples or [],
},
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"task": result.get("task"),
"task_id": result.get("task", {}).get("id"),
"message": result.get("message", "Task created successfully"),
})
else:
return MCPErrorFormatter.from_http_error(response, "create task")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "create task", {"project_id": project_id, "title": title}
)
except Exception as e:
logger.error(f"Error creating task: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "create task")
@mcp.tool()
async def list_tasks(
ctx: Context,
filter_by: Optional[str] = None,
filter_value: Optional[str] = None,
project_id: Optional[str] = None,
include_closed: bool = False,
page: int = 1,
per_page: int = 50,
) -> str:
"""
List tasks with filtering options.
Args:
filter_by: "status" | "project" | "assignee" (optional)
filter_value: Filter value (e.g., "todo", "doing", "review", "done")
project_id: Project UUID (optional, for additional filtering)
include_closed: Include done tasks in results
page: Page number for pagination
per_page: Items per page
Returns:
JSON array of tasks with pagination info
Examples:
list_tasks() # All tasks
list_tasks(filter_by="status", filter_value="todo") # Only todo tasks
list_tasks(filter_by="project", filter_value="project-uuid") # Tasks for specific project
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
# Build URL and parameters based on filter type
params: Dict[str, Any] = {
"page": page,
"per_page": per_page,
"exclude_large_fields": True, # Always exclude large fields in MCP responses
}
if filter_by == "project" and filter_value:
# Use project-specific endpoint for project filtering
url = urljoin(api_url, f"/api/projects/{filter_value}/tasks")
params["include_archived"] = False # For backward compatibility
elif filter_by == "status" and filter_value:
# Use generic tasks endpoint for status filtering
url = urljoin(api_url, "/api/tasks")
params["status"] = filter_value
params["include_closed"] = include_closed
if project_id:
params["project_id"] = project_id
else:
# Default to generic tasks endpoint
url = urljoin(api_url, "/api/tasks")
params["include_closed"] = include_closed
if project_id:
params["project_id"] = project_id
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(url, params=params)
response.raise_for_status()
result = response.json()
# Normalize response format - handle both array and object responses
if isinstance(result, list):
# Direct array response
tasks = result
total_count = len(result)
elif isinstance(result, dict):
# Object response - check for standard fields
if "tasks" in result:
tasks = result["tasks"]
total_count = result.get("total_count", len(tasks))
elif "data" in result:
# Alternative format with 'data' field
tasks = result["data"]
total_count = result.get("total", len(tasks))
else:
# Unknown object format
return MCPErrorFormatter.format_error(
error_type="invalid_response",
message="Unexpected response format from API",
details={"response_keys": list(result.keys())},
suggestion="The API response format may have changed. Please check for updates.",
)
else:
# Completely unexpected format
return MCPErrorFormatter.format_error(
error_type="invalid_response",
message="Invalid response type from API",
details={"response_type": type(result).__name__},
suggestion="Expected list or object, got different type.",
)
return json.dumps({
"success": True,
"tasks": tasks,
"total_count": total_count,
"count": len(tasks),
})
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "list tasks", {"filter_by": filter_by, "filter_value": filter_value}
)
except Exception as e:
logger.error(f"Error listing tasks: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "list tasks")
@mcp.tool()
async def get_task(ctx: Context, task_id: str) -> str:
"""
Get detailed information about a specific task.
Args:
task_id: UUID of the task
Returns:
JSON with complete task details
Example:
get_task(task_id="task-uuid")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(urljoin(api_url, f"/api/tasks/{task_id}"))
if response.status_code == 200:
task = response.json()
return json.dumps({"success": True, "task": task})
elif response.status_code == 404:
return MCPErrorFormatter.format_error(
error_type="not_found",
message=f"Task {task_id} not found",
suggestion="Verify the task ID is correct",
http_status=404,
)
else:
return MCPErrorFormatter.from_http_error(response, "get task")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "get task", {"task_id": task_id})
except Exception as e:
logger.error(f"Error getting task: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "get task")
@mcp.tool()
async def update_task(
ctx: Context,
task_id: str,
update_fields: TaskUpdateFields,
) -> str:
"""
Update a task's properties.
Args:
task_id: UUID of the task to update
update_fields: Dict of fields to update (e.g., {"status": "doing", "assignee": "AI IDE Agent"})
Returns:
JSON with updated task details
Examples:
update_task(task_id="uuid", update_fields={"status": "doing"})
update_task(task_id="uuid", update_fields={"title": "New Title", "description": "Updated description"})
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.put(
urljoin(api_url, f"/api/tasks/{task_id}"), json=update_fields
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"task": result.get("task"),
"message": result.get("message", "Task updated successfully"),
})
else:
return MCPErrorFormatter.from_http_error(response, "update task")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(
e, "update task", {"task_id": task_id, "update_fields": list(update_fields.keys())}
)
except Exception as e:
logger.error(f"Error updating task: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "update task")
@mcp.tool()
async def delete_task(ctx: Context, task_id: str) -> str:
"""
Delete/archive a task.
This removes the task from active lists but preserves it in the database
for audit purposes (soft delete).
Args:
task_id: UUID of the task to delete/archive
Returns:
JSON confirmation of deletion:
{
"success": true,
"message": "Task deleted successfully",
"subtasks_archived": 0
}
Example:
delete_task(task_id="task-123e4567-e89b-12d3-a456-426614174000")
"""
try:
api_url = get_api_url()
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.delete(urljoin(api_url, f"/api/tasks/{task_id}"))
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"message": result.get("message", f"Task {task_id} deleted successfully"),
"subtasks_archived": result.get("subtasks_archived", 0),
})
elif response.status_code == 404:
return json.dumps({
"success": False,
"error": f"Task {task_id} not found. Use list_tasks to find valid task IDs.",
})
elif response.status_code == 400:
# More specific error for bad requests
error_text = response.text
if "already archived" in error_text.lower():
return MCPErrorFormatter.format_error(
error_type="already_archived",
message=f"Task {task_id} is already archived",
suggestion="No further action needed - task is already archived",
http_status=400,
)
return MCPErrorFormatter.format_error(
error_type="validation_error",
message=f"Cannot delete task: {error_text}",
suggestion="Check if the task meets deletion requirements",
http_status=400,
)
else:
return MCPErrorFormatter.from_http_error(response, "delete task")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "delete task", {"task_id": task_id})
except Exception as e:
logger.error(f"Error deleting task: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "delete task")

View File

@@ -187,6 +187,64 @@ async def lifespan(server: FastMCP) -> AsyncIterator[ArchonContext]:
logger.info("✅ MCP server shutdown complete")
# Define MCP instructions for Claude Code and other clients
MCP_INSTRUCTIONS = """
# Archon MCP Server Instructions
## 🚨 CRITICAL RULES (ALWAYS FOLLOW)
1. **Task Management**: ALWAYS use Archon MCP tools for task management,
You can combine them with your TODO tools but always make sure that the first todo is to update archon
and the last todo is to update archon.
Example: Use TodoWrite to create a set of new todos
[]Create the task in archon
[]Research deeply using archon rag
[]Research on the web using websearch tools
[]Deeply look into the codebase patterns and integration points
[]Update Archon tasks with the findings
[]Create implementation tasks in Archon
This is to ensure efficient task management and collaboration.
Making sure all critical details are in Archon.
You can think of it as Archon is where you manage the task that needs to be shared with the team
And your todo is your internal subtasks/todos that does not need to be shared with the team.
2. **Research First**: Before implementing, use perform_rag_query and search_code_examples
3. **Task-Driven Development**: Never code without checking current tasks first
## 📋 Core Workflow
For every coding task, follow this cycle:
1. Check current task: manage_task(action="get", task_id="...")
2. Research: perform_rag_query() + search_code_examples()
3. Update to doing: manage_task(action="update", update_fields={"status": "doing"})
4. Implement based on research findings
5. Mark for review: manage_task(action="update", update_fields={"status": "review"})
6. Get next task: manage_task(action="list", filter_by="status", filter_value="todo")
## 🏗️ Project Initialization
- New project: manage_project(action="create", title="...", prd={...})
- Existing project: manage_task(action="list", filter_by="project", filter_value="...")
- Always create atomic tasks (1-4 hours of work each)
## 🔍 Research Patterns
- Architecture: perform_rag_query(query="[tech] patterns", match_count=5)
- Implementation: search_code_examples(query="[feature] example", match_count=3)
- Keep match_count around (5) for focused results
- Combine RAG with websearch tools for better results
## 📊 Task Status Flow
todo doing review done
- Only one task in 'doing' at a time
- Use 'review' for completed work awaiting validation
- Archive obsolete tasks
## 💾 Version Control
- All documents auto-versioned on update
- Use manage_versions to view history or restore
- Deletions preserve version history
"""
# Initialize the main FastMCP server with fixed configuration
try:
logger.info("🏗️ MCP SERVER INITIALIZATION:")
@@ -196,6 +254,7 @@ try:
mcp = FastMCP(
"archon-mcp-server",
description="MCP server for Archon - uses HTTP calls to other services",
instructions=MCP_INSTRUCTIONS,
lifespan=lifespan,
host=server_host,
port=server_port,
@@ -212,10 +271,10 @@ except Exception as e:
@mcp.tool()
async def health_check(ctx: Context) -> str:
"""
Perform a health check on the MCP server and its dependencies.
Check health status of MCP server and dependencies.
Returns:
JSON string with current health status
JSON with health status, uptime, and service availability
"""
try:
# Try to get the lifespan context
@@ -261,10 +320,10 @@ async def health_check(ctx: Context) -> str:
@mcp.tool()
async def session_info(ctx: Context) -> str:
"""
Get information about the current session and all active sessions.
Get current and active session information.
Returns:
JSON string with session information
JSON with active sessions count and server uptime
"""
try:
session_manager = get_session_manager()
@@ -304,7 +363,7 @@ def register_modules():
# Import and register RAG module (HTTP-based version)
try:
from src.mcp.modules.rag_module import register_rag_tools
from src.mcp_server.modules.rag_module import register_rag_tools
register_rag_tools(mcp)
modules_registered += 1
@@ -315,22 +374,96 @@ def register_modules():
logger.error(f"✗ Error registering RAG module: {e}")
logger.error(traceback.format_exc())
# Import and register Project module - only if Projects are enabled
projects_enabled = os.getenv("PROJECTS_ENABLED", "true").lower() == "true"
if projects_enabled:
try:
from src.mcp.modules.project_module import register_project_tools
# Import and register all feature tools - separated and focused
register_project_tools(mcp)
modules_registered += 1
logger.info("✓ Project module registered (HTTP-based)")
except ImportError as e:
logger.warning(f"⚠ Project module not available: {e}")
except Exception as e:
logger.error(f"✗ Error registering Project module: {e}")
logger.error(traceback.format_exc())
else:
logger.info("⚠ Project module skipped - Projects are disabled")
# Project Management Tools
try:
from src.mcp_server.features.projects import register_project_tools
register_project_tools(mcp)
modules_registered += 1
logger.info("✓ Project tools registered")
except ImportError as e:
# Module not found - this is acceptable in modular architecture
logger.warning(f"⚠ Project tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
# Code errors that should not be ignored
logger.error(f"✗ Code error in project tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise # Re-raise to prevent running with broken code
except Exception as e:
# Unexpected errors during registration
logger.error(f"✗ Failed to register project tools: {e}")
logger.error(traceback.format_exc())
# Don't raise - allow other modules to register
# Task Management Tools
try:
from src.mcp_server.features.tasks import register_task_tools
register_task_tools(mcp)
modules_registered += 1
logger.info("✓ Task tools registered")
except ImportError as e:
logger.warning(f"⚠ Task tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
logger.error(f"✗ Code error in task tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"✗ Failed to register task tools: {e}")
logger.error(traceback.format_exc())
# Document Management Tools
try:
from src.mcp_server.features.documents import register_document_tools
register_document_tools(mcp)
modules_registered += 1
logger.info("✓ Document tools registered")
except ImportError as e:
logger.warning(f"⚠ Document tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
logger.error(f"✗ Code error in document tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"✗ Failed to register document tools: {e}")
logger.error(traceback.format_exc())
# Version Management Tools
try:
from src.mcp_server.features.documents import register_version_tools
register_version_tools(mcp)
modules_registered += 1
logger.info("✓ Version tools registered")
except ImportError as e:
logger.warning(f"⚠ Version tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
logger.error(f"✗ Code error in version tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"✗ Failed to register version tools: {e}")
logger.error(traceback.format_exc())
# Feature Management Tools
try:
from src.mcp_server.features.feature_tools import register_feature_tools
register_feature_tools(mcp)
modules_registered += 1
logger.info("✓ Feature tools registered")
except ImportError as e:
logger.warning(f"⚠ Feature tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
logger.error(f"✗ Code error in feature tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"✗ Failed to register feature tools: {e}")
logger.error(traceback.format_exc())
logger.info(f"📦 Total modules registered: {modules_registered}")

View File

@@ -44,10 +44,12 @@ def register_rag_tools(mcp: FastMCP):
"""
Get list of available sources in the knowledge base.
This tool uses HTTP call to the API service.
Returns:
JSON string with list of sources
JSON string with structure:
- success: bool - Operation success status
- sources: list[dict] - Array of source objects
- count: int - Number of sources
- error: str - Error description if success=false
"""
try:
api_url = get_api_url()
@@ -76,22 +78,23 @@ def register_rag_tools(mcp: FastMCP):
@mcp.tool()
async def perform_rag_query(
ctx: Context, query: str, source: str = None, match_count: int = 5
ctx: Context, query: str, source_domain: str = None, match_count: int = 5
) -> str:
"""
Perform a RAG (Retrieval Augmented Generation) query on stored content.
This tool searches the vector database for content relevant to the query and returns
the matching documents. Optionally filter by source domain.
Get the source by using the get_available_sources tool before calling this search!
Search knowledge base for relevant content using RAG.
Args:
query: The search query
source: Optional source domain to filter results (e.g., 'example.com')
match_count: Maximum number of results to return (default: 5)
query: Search query
source_domain: Optional domain filter (e.g., 'docs.anthropic.com').
Note: This is a domain name, not the source_id from get_available_sources.
match_count: Max results (default: 5)
Returns:
JSON string with search results
JSON string with structure:
- success: bool - Operation success status
- results: list[dict] - Array of matching documents with content and metadata
- reranked: bool - Whether results were reranked
- error: str|null - Error description if success=false
"""
try:
api_url = get_api_url()
@@ -99,8 +102,8 @@ def register_rag_tools(mcp: FastMCP):
async with httpx.AsyncClient(timeout=timeout) as client:
request_data = {"query": query, "match_count": match_count}
if source:
request_data["source"] = source
if source_domain:
request_data["source"] = source_domain
response = await client.post(urljoin(api_url, "/api/rag/query"), json=request_data)
@@ -132,24 +135,23 @@ def register_rag_tools(mcp: FastMCP):
@mcp.tool()
async def search_code_examples(
ctx: Context, query: str, source_id: str = None, match_count: int = 5
ctx: Context, query: str, source_domain: str = None, match_count: int = 5
) -> str:
"""
Search for code examples relevant to the query.
This tool searches the vector database for code examples relevant to the query and returns
the matching examples with their summaries. Optionally filter by source_id.
Get the source_id by using the get_available_sources tool before calling this search!
Use the get_available_sources tool first to see what sources are available for filtering.
Search for relevant code examples in the knowledge base.
Args:
query: The search query
source_id: Optional source ID to filter results (e.g., 'example.com')
match_count: Maximum number of results to return (default: 5)
query: Search query
source_domain: Optional domain filter (e.g., 'docs.anthropic.com').
Note: This is a domain name, not the source_id from get_available_sources.
match_count: Max results (default: 5)
Returns:
JSON string with search results
JSON string with structure:
- success: bool - Operation success status
- results: list[dict] - Array of code examples with content and summaries
- reranked: bool - Whether results were reranked
- error: str|null - Error description if success=false
"""
try:
api_url = get_api_url()
@@ -157,8 +159,8 @@ def register_rag_tools(mcp: FastMCP):
async with httpx.AsyncClient(timeout=timeout) as client:
request_data = {"query": query, "match_count": match_count}
if source_id:
request_data["source"] = source_id
if source_domain:
request_data["source"] = source_domain
# Call the dedicated code examples endpoint
response = await client.post(

View File

@@ -0,0 +1,21 @@
"""
Utility modules for MCP Server.
"""
from .error_handling import MCPErrorFormatter
from .http_client import get_http_client
from .timeout_config import (
get_default_timeout,
get_max_polling_attempts,
get_polling_interval,
get_polling_timeout,
)
__all__ = [
"MCPErrorFormatter",
"get_http_client",
"get_default_timeout",
"get_polling_timeout",
"get_max_polling_attempts",
"get_polling_interval",
]

View File

@@ -0,0 +1,166 @@
"""
Centralized error handling utilities for MCP Server.
Provides consistent error formatting and helpful context for clients.
"""
import json
import logging
from typing import Any, Dict, Optional
import httpx
logger = logging.getLogger(__name__)
class MCPErrorFormatter:
"""Formats errors consistently for MCP clients."""
@staticmethod
def format_error(
error_type: str,
message: str,
details: Optional[Dict[str, Any]] = None,
suggestion: Optional[str] = None,
http_status: Optional[int] = None,
) -> str:
"""
Format an error response with consistent structure.
Args:
error_type: Category of error (e.g., "connection_error", "validation_error")
message: User-friendly error message
details: Additional context about the error
suggestion: Actionable suggestion for resolving the error
http_status: HTTP status code if applicable
Returns:
JSON string with structured error information
"""
error_response: Dict[str, Any] = {
"success": False,
"error": {
"type": error_type,
"message": message,
},
}
if details:
error_response["error"]["details"] = details
if suggestion:
error_response["error"]["suggestion"] = suggestion
if http_status:
error_response["error"]["http_status"] = http_status
return json.dumps(error_response)
@staticmethod
def from_http_error(response: httpx.Response, operation: str) -> str:
"""
Format error from HTTP response.
Args:
response: The HTTP response object
operation: Description of what operation was being performed
Returns:
Formatted error JSON string
"""
# Try to extract error from response body
try:
body = response.json()
if isinstance(body, dict):
# Look for common error fields
error_message = (
body.get("detail", {}).get("error")
or body.get("error")
or body.get("message")
or body.get("detail")
)
if error_message:
return MCPErrorFormatter.format_error(
error_type="api_error",
message=f"Failed to {operation}: {error_message}",
details={"response_body": body},
http_status=response.status_code,
suggestion=_get_suggestion_for_status(response.status_code),
)
except Exception:
pass # Fall through to generic error
# Generic error based on status code
return MCPErrorFormatter.format_error(
error_type="http_error",
message=f"Failed to {operation}: HTTP {response.status_code}",
details={"response_text": response.text[:500]}, # Limit response text
http_status=response.status_code,
suggestion=_get_suggestion_for_status(response.status_code),
)
@staticmethod
def from_exception(exception: Exception, operation: str, context: Optional[Dict[str, Any]] = None) -> str:
"""
Format error from exception.
Args:
exception: The exception that occurred
operation: Description of what operation was being performed
context: Additional context about when the error occurred
Returns:
Formatted error JSON string
"""
error_type = "unknown_error"
suggestion = None
# Categorize common exceptions
if isinstance(exception, httpx.ConnectTimeout):
error_type = "connection_timeout"
suggestion = "Check if the Archon server is running and accessible at the configured URL"
elif isinstance(exception, httpx.ReadTimeout):
error_type = "read_timeout"
suggestion = "The operation is taking longer than expected. Try again or check server logs"
elif isinstance(exception, httpx.ConnectError):
error_type = "connection_error"
suggestion = "Ensure the Archon server is running on the correct port"
elif isinstance(exception, httpx.RequestError):
error_type = "request_error"
suggestion = "Check network connectivity and server configuration"
elif isinstance(exception, ValueError):
error_type = "validation_error"
suggestion = "Check that all input parameters are valid"
elif isinstance(exception, KeyError):
error_type = "missing_data"
suggestion = "The response format may have changed. Check for API updates"
details: Dict[str, Any] = {"exception_type": type(exception).__name__, "exception_message": str(exception)}
if context:
details["context"] = context
return MCPErrorFormatter.format_error(
error_type=error_type,
message=f"Failed to {operation}: {str(exception)}",
details=details,
suggestion=suggestion,
)
def _get_suggestion_for_status(status_code: int) -> Optional[str]:
"""Get helpful suggestion based on HTTP status code."""
suggestions = {
400: "Check that all required parameters are provided and valid",
401: "Authentication may be required. Check API credentials",
403: "You may not have permission for this operation",
404: "The requested resource was not found. Verify the ID is correct",
409: "There's a conflict with the current state. The resource may already exist",
422: "The request format is correct but the data is invalid",
429: "Too many requests. Please wait before retrying",
500: "Server error. Check server logs for details",
502: "The backend service may be down. Check if all services are running",
503: "Service temporarily unavailable. Try again later",
504: "The operation timed out. The server may be overloaded",
}
return suggestions.get(status_code)

View File

@@ -0,0 +1,38 @@
"""
HTTP client utilities for MCP Server.
Provides consistent HTTP client configuration.
"""
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
import httpx
from .timeout_config import get_default_timeout, get_polling_timeout
@asynccontextmanager
async def get_http_client(
timeout: Optional[httpx.Timeout] = None, for_polling: bool = False
) -> AsyncIterator[httpx.AsyncClient]:
"""
Create an HTTP client with consistent configuration.
Args:
timeout: Optional custom timeout. If not provided, uses defaults.
for_polling: If True, uses polling-specific timeout configuration.
Yields:
Configured httpx.AsyncClient
Example:
async with get_http_client() as client:
response = await client.get(url)
"""
if timeout is None:
timeout = get_polling_timeout() if for_polling else get_default_timeout()
# Future: Could add retry logic, custom headers, etc. here
async with httpx.AsyncClient(timeout=timeout) as client:
yield client

View File

@@ -0,0 +1,80 @@
"""
Centralized timeout configuration for MCP Server.
Provides consistent timeout values across all tools.
"""
import os
from typing import Optional
import httpx
def get_default_timeout() -> httpx.Timeout:
"""
Get default timeout configuration from environment or defaults.
Environment variables:
- MCP_REQUEST_TIMEOUT: Total request timeout in seconds (default: 30)
- MCP_CONNECT_TIMEOUT: Connection timeout in seconds (default: 5)
- MCP_READ_TIMEOUT: Read timeout in seconds (default: 20)
- MCP_WRITE_TIMEOUT: Write timeout in seconds (default: 10)
Returns:
Configured httpx.Timeout object
"""
return httpx.Timeout(
timeout=float(os.getenv("MCP_REQUEST_TIMEOUT", "30.0")),
connect=float(os.getenv("MCP_CONNECT_TIMEOUT", "5.0")),
read=float(os.getenv("MCP_READ_TIMEOUT", "20.0")),
write=float(os.getenv("MCP_WRITE_TIMEOUT", "10.0")),
)
def get_polling_timeout() -> httpx.Timeout:
"""
Get timeout configuration for polling operations.
Polling operations may need longer timeouts.
Returns:
Configured httpx.Timeout object for polling
"""
return httpx.Timeout(
timeout=float(os.getenv("MCP_POLLING_TIMEOUT", "60.0")),
connect=float(os.getenv("MCP_CONNECT_TIMEOUT", "5.0")),
read=float(os.getenv("MCP_POLLING_READ_TIMEOUT", "30.0")),
write=float(os.getenv("MCP_WRITE_TIMEOUT", "10.0")),
)
def get_max_polling_attempts() -> int:
"""
Get maximum number of polling attempts.
Returns:
Maximum polling attempts (default: 30)
"""
try:
return int(os.getenv("MCP_MAX_POLLING_ATTEMPTS", "30"))
except ValueError:
# Fall back to default if env var is not a valid integer
return 30
def get_polling_interval(attempt: int) -> float:
"""
Get polling interval with exponential backoff.
Args:
attempt: Current attempt number (0-based)
Returns:
Sleep interval in seconds
"""
base_interval = float(os.getenv("MCP_POLLING_BASE_INTERVAL", "1.0"))
max_interval = float(os.getenv("MCP_POLLING_MAX_INTERVAL", "5.0"))
# Exponential backoff: 1s, 2s, 4s, 5s, 5s, ...
interval = min(base_interval * (2**attempt), max_interval)
return float(interval)