From f786a8026bcec67503936f071a7fc382e053c2ae Mon Sep 17 00:00:00 2001 From: Rasmus Widing Date: Mon, 18 Aug 2025 20:42:04 +0300 Subject: [PATCH] Add task management tools with smart routing Extract task functionality into focused tools: - create_task: Create tasks with sources and code examples - list_tasks: List tasks with project/status filtering - get_task: Retrieve task details - update_task: Modify task properties - delete_task: Archive tasks (soft delete) Preserves intelligent endpoint routing: - Project-specific: /api/projects/{id}/tasks - Status filtering: /api/tasks?status=X - Assignee filtering: /api/tasks?assignee=X --- .../src/mcp_server/features/tasks/__init__.py | 14 + .../mcp_server/features/tasks/task_tools.py | 375 ++++++++++++++++++ 2 files changed, 389 insertions(+) create mode 100644 python/src/mcp_server/features/tasks/__init__.py create mode 100644 python/src/mcp_server/features/tasks/task_tools.py diff --git a/python/src/mcp_server/features/tasks/__init__.py b/python/src/mcp_server/features/tasks/__init__.py new file mode 100644 index 00000000..f5f659c4 --- /dev/null +++ b/python/src/mcp_server/features/tasks/__init__.py @@ -0,0 +1,14 @@ +""" +Task management tools for Archon MCP Server. + +This module provides separate tools for each task operation: +- create_task: Create a new task +- list_tasks: List tasks with filtering +- get_task: Get task details +- update_task: Update task properties +- delete_task: Delete a task +""" + +from .task_tools import register_task_tools + +__all__ = ["register_task_tools"] diff --git a/python/src/mcp_server/features/tasks/task_tools.py b/python/src/mcp_server/features/tasks/task_tools.py new file mode 100644 index 00000000..b81b313d --- /dev/null +++ b/python/src/mcp_server/features/tasks/task_tools.py @@ -0,0 +1,375 @@ +""" +Simple task management tools for Archon MCP Server. + +Provides separate, focused tools for each task operation. +Mirrors the functionality of the original manage_task tool but with individual tools. +""" + +import json +import logging +from typing import Any, Optional, List, Dict +from urllib.parse import urljoin + +import httpx +from mcp.server.fastmcp import Context, FastMCP + +from src.server.config.service_discovery import get_api_url + +logger = logging.getLogger(__name__) + + +def register_task_tools(mcp: FastMCP): + """Register individual task management tools with the MCP server.""" + + @mcp.tool() + async def create_task( + ctx: Context, + project_id: str, + title: str, + description: str = "", + assignee: str = "User", + task_order: int = 0, + feature: Optional[str] = None, + sources: Optional[List[Dict[str, str]]] = None, + code_examples: Optional[List[Dict[str, str]]] = None, + ) -> str: + """ + Create a new task in a project. + + Args: + project_id: Project UUID (required) + title: Task title - should be specific and actionable (required) + description: Detailed task description with acceptance criteria + assignee: Who will work on this task. Options: + - "User": For manual tasks + - "Archon": For AI-driven tasks + - "AI IDE Agent": For code implementation + - "prp-executor": For PRP coordination + - "prp-validator": For testing/validation + task_order: Priority within status (0-100, higher = more priority) + feature: Feature label for grouping related tasks (e.g., "authentication") + sources: List of source references. Each source should have: + - "url": Link to documentation or file path + - "type": Type of source (e.g., "documentation", "api_spec") + - "relevance": Why this source is relevant + code_examples: List of code examples. Each example should have: + - "file": Path to the file + - "function": Function or class name + - "purpose": Why this example is relevant + + Returns: + JSON with task details including task_id: + { + "success": true, + "task": {...}, + "task_id": "task-123", + "message": "Task created successfully" + } + + Examples: + # Simple task + create_task( + project_id="550e8400-e29b-41d4-a716-446655440000", + title="Add user authentication", + description="Implement JWT-based authentication with refresh tokens" + ) + + # Task with sources and examples + create_task( + project_id="550e8400-e29b-41d4-a716-446655440000", + title="Implement OAuth2 Google provider", + description="Add Google OAuth2 with PKCE security", + assignee="AI IDE Agent", + task_order=10, + feature="authentication", + sources=[ + { + "url": "https://developers.google.com/identity/protocols/oauth2", + "type": "documentation", + "relevance": "Official OAuth2 implementation guide" + }, + { + "url": "docs/auth/README.md", + "type": "internal_docs", + "relevance": "Current auth architecture" + } + ], + code_examples=[ + { + "file": "src/auth/base.py", + "function": "BaseAuthProvider", + "purpose": "Base class to extend" + }, + { + "file": "tests/auth/test_oauth.py", + "function": "test_oauth_flow", + "purpose": "Test pattern to follow" + } + ] + ) + """ + try: + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + urljoin(api_url, "/api/tasks"), + json={ + "project_id": project_id, + "title": title, + "description": description, + "assignee": assignee, + "task_order": task_order, + "feature": feature, + "sources": sources or [], + "code_examples": code_examples or [], + }, + ) + + if response.status_code == 200: + result = response.json() + return json.dumps({ + "success": True, + "task": result.get("task"), + "task_id": result.get("task", {}).get("id"), + "message": result.get("message", "Task created successfully"), + }) + else: + error_detail = response.text + return json.dumps({"success": False, "error": error_detail}) + + except Exception as e: + logger.error(f"Error creating task: {e}") + return json.dumps({"success": False, "error": str(e)}) + + @mcp.tool() + async def list_tasks( + ctx: Context, + filter_by: Optional[str] = None, + filter_value: Optional[str] = None, + project_id: Optional[str] = None, + include_closed: bool = False, + page: int = 1, + per_page: int = 50, + ) -> str: + """ + List tasks with filtering options. + + Args: + filter_by: "status" | "project" | "assignee" (optional) + filter_value: Filter value (e.g., "todo", "doing", "review", "done") + project_id: Project UUID (optional, for additional filtering) + include_closed: Include done tasks in results + page: Page number for pagination + per_page: Items per page + + Returns: + JSON array of tasks with pagination info + + Examples: + list_tasks() # All tasks + list_tasks(filter_by="status", filter_value="todo") # Only todo tasks + list_tasks(filter_by="project", filter_value="project-uuid") # Tasks for specific project + """ + try: + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + # Build URL and parameters based on filter type + params = { + "page": page, + "per_page": per_page, + "exclude_large_fields": True, # Always exclude large fields in MCP responses + } + + if filter_by == "project" and filter_value: + # Use project-specific endpoint for project filtering + url = urljoin(api_url, f"/api/projects/{filter_value}/tasks") + params["include_archived"] = False # For backward compatibility + elif filter_by == "status" and filter_value: + # Use generic tasks endpoint for status filtering + url = urljoin(api_url, "/api/tasks") + params["status"] = filter_value + params["include_closed"] = include_closed + if project_id: + params["project_id"] = project_id + else: + # Default to generic tasks endpoint + url = urljoin(api_url, "/api/tasks") + params["include_closed"] = include_closed + if project_id: + params["project_id"] = project_id + + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.get(url, params=params) + response.raise_for_status() + + result = response.json() + + # Handle both direct array and paginated response formats + if isinstance(result, list): + tasks = result + pagination_info = None + else: + if "tasks" in result: + tasks = result.get("tasks", []) + pagination_info = result.get("pagination", {}) + else: + tasks = result if isinstance(result, list) else [] + pagination_info = None + + return json.dumps({ + "success": True, + "tasks": tasks, + "pagination": pagination_info, + "total_count": len(tasks) + if pagination_info is None + else pagination_info.get("total", len(tasks)), + "count": len(tasks), + }) + + except Exception as e: + logger.error(f"Error listing tasks: {e}") + return json.dumps({"success": False, "error": str(e)}) + + @mcp.tool() + async def get_task(ctx: Context, task_id: str) -> str: + """ + Get detailed information about a specific task. + + Args: + task_id: UUID of the task + + Returns: + JSON with complete task details + + Example: + get_task(task_id="task-uuid") + """ + try: + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.get(urljoin(api_url, f"/api/tasks/{task_id}")) + + if response.status_code == 200: + task = response.json() + return json.dumps({"success": True, "task": task}) + elif response.status_code == 404: + return json.dumps({"success": False, "error": f"Task {task_id} not found"}) + else: + return json.dumps({"success": False, "error": "Failed to get task"}) + + except Exception as e: + logger.error(f"Error getting task: {e}") + return json.dumps({"success": False, "error": str(e)}) + + @mcp.tool() + async def update_task( + ctx: Context, + task_id: str, + update_fields: Dict[str, Any], + ) -> str: + """ + Update a task's properties. + + Args: + task_id: UUID of the task to update + update_fields: Dict of fields to update (e.g., {"status": "doing", "assignee": "AI IDE Agent"}) + + Returns: + JSON with updated task details + + Examples: + update_task(task_id="uuid", update_fields={"status": "doing"}) + update_task(task_id="uuid", update_fields={"title": "New Title", "description": "Updated description"}) + """ + try: + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.put( + urljoin(api_url, f"/api/tasks/{task_id}"), json=update_fields + ) + + if response.status_code == 200: + result = response.json() + return json.dumps({ + "success": True, + "task": result.get("task"), + "message": result.get("message", "Task updated successfully"), + }) + else: + error_detail = response.text + return json.dumps({"success": False, "error": error_detail}) + + except Exception as e: + logger.error(f"Error updating task: {e}") + return json.dumps({"success": False, "error": str(e)}) + + @mcp.tool() + async def delete_task(ctx: Context, task_id: str) -> str: + """ + Delete/archive a task. + + This removes the task from active lists but preserves it in the database + for audit purposes (soft delete). + + Args: + task_id: UUID of the task to delete/archive + + Returns: + JSON confirmation of deletion: + { + "success": true, + "message": "Task deleted successfully", + "subtasks_archived": 0 + } + + Example: + delete_task(task_id="task-123e4567-e89b-12d3-a456-426614174000") + """ + try: + api_url = get_api_url() + timeout = httpx.Timeout(30.0, connect=5.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.delete(urljoin(api_url, f"/api/tasks/{task_id}")) + + if response.status_code == 200: + result = response.json() + return json.dumps({ + "success": True, + "message": result.get("message", f"Task {task_id} deleted successfully"), + "subtasks_archived": result.get("subtasks_archived", 0), + }) + elif response.status_code == 404: + return json.dumps({ + "success": False, + "error": f"Task {task_id} not found. Use list_tasks to find valid task IDs." + }) + elif response.status_code == 400: + # More specific error for bad requests + error_text = response.text + if "already archived" in error_text.lower(): + return json.dumps({ + "success": False, + "error": f"Task {task_id} is already archived. No further action needed." + }) + return json.dumps({ + "success": False, + "error": f"Cannot delete task: {error_text}" + }) + else: + return json.dumps({ + "success": False, + "error": f"Failed to delete task (HTTP {response.status_code}): {response.text}" + }) + + except Exception as e: + logger.error(f"Error deleting task: {e}") + return json.dumps({"success": False, "error": str(e)}) +