mirror of
https://github.com/coleam00/Archon.git
synced 2026-01-02 20:59:13 -05:00
The New Archon (Beta) - The Operating System for AI Coding Assistants!
This commit is contained in:
@@ -0,0 +1,173 @@
|
||||
from __future__ import annotations as _annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, List, Dict
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from dotenv import load_dotenv
|
||||
import shutil
|
||||
import time
|
||||
import re
|
||||
import json
|
||||
|
||||
import httpx
|
||||
import logfire
|
||||
from pydantic_ai import Agent, ModelRetry, RunContext
|
||||
from pydantic_ai.providers.openai import OpenAIProvider
|
||||
from pydantic_ai.models.openai import OpenAIModel
|
||||
from devtools import debug
|
||||
|
||||
load_dotenv()
|
||||
|
||||
llm = os.getenv('LLM_MODEL', 'deepseek/deepseek-chat')
|
||||
model = OpenAIModel(
|
||||
llm,
|
||||
provider=OpenAIProvider(base_url="https://openrouter.ai/api/v1", api_key=os.getenv('OPEN_ROUTER_API_KEY'))
|
||||
) if os.getenv('OPEN_ROUTER_API_KEY', None) else OpenAIModel(llm)
|
||||
|
||||
logfire.configure(send_to_logfire='if-token-present')
|
||||
|
||||
@dataclass
|
||||
class GitHubDeps:
|
||||
client: httpx.AsyncClient
|
||||
github_token: str | None = None
|
||||
|
||||
system_prompt = """
|
||||
You are a coding expert with access to GitHub to help the user manage their repository and get information from it.
|
||||
|
||||
Your only job is to assist with this and you don't answer other questions besides describing what you are able to do.
|
||||
|
||||
Don't ask the user before taking an action, just do it. Always make sure you look at the repository with the provided tools before answering the user's question unless you have already.
|
||||
|
||||
When answering a question about the repo, always start your answer with the full repo URL in brackets and then give your answer on a newline. Like:
|
||||
|
||||
[Using https://github.com/[repo URL from the user]]
|
||||
|
||||
Your answer here...
|
||||
"""
|
||||
|
||||
github_agent = Agent(
|
||||
model,
|
||||
system_prompt=system_prompt,
|
||||
deps_type=GitHubDeps,
|
||||
retries=2
|
||||
)
|
||||
|
||||
@github_agent.tool
|
||||
async def get_repo_info(ctx: RunContext[GitHubDeps], github_url: str) -> str:
|
||||
"""Get repository information including size and description using GitHub API.
|
||||
|
||||
Args:
|
||||
ctx: The context.
|
||||
github_url: The GitHub repository URL.
|
||||
|
||||
Returns:
|
||||
str: Repository information as a formatted string.
|
||||
"""
|
||||
match = re.search(r'github\.com[:/]([^/]+)/([^/]+?)(?:\.git)?$', github_url)
|
||||
if not match:
|
||||
return "Invalid GitHub URL format"
|
||||
|
||||
owner, repo = match.groups()
|
||||
headers = {'Authorization': f'token {ctx.deps.github_token}'} if ctx.deps.github_token else {}
|
||||
|
||||
response = await ctx.deps.client.get(
|
||||
f'https://api.github.com/repos/{owner}/{repo}',
|
||||
headers=headers
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
return f"Failed to get repository info: {response.text}"
|
||||
|
||||
data = response.json()
|
||||
size_mb = data['size'] / 1024
|
||||
|
||||
return (
|
||||
f"Repository: {data['full_name']}\n"
|
||||
f"Description: {data['description']}\n"
|
||||
f"Size: {size_mb:.1f}MB\n"
|
||||
f"Stars: {data['stargazers_count']}\n"
|
||||
f"Language: {data['language']}\n"
|
||||
f"Created: {data['created_at']}\n"
|
||||
f"Last Updated: {data['updated_at']}"
|
||||
)
|
||||
|
||||
@github_agent.tool
|
||||
async def get_repo_structure(ctx: RunContext[GitHubDeps], github_url: str) -> str:
|
||||
"""Get the directory structure of a GitHub repository.
|
||||
|
||||
Args:
|
||||
ctx: The context.
|
||||
github_url: The GitHub repository URL.
|
||||
|
||||
Returns:
|
||||
str: Directory structure as a formatted string.
|
||||
"""
|
||||
match = re.search(r'github\.com[:/]([^/]+)/([^/]+?)(?:\.git)?$', github_url)
|
||||
if not match:
|
||||
return "Invalid GitHub URL format"
|
||||
|
||||
owner, repo = match.groups()
|
||||
headers = {'Authorization': f'token {ctx.deps.github_token}'} if ctx.deps.github_token else {}
|
||||
|
||||
response = await ctx.deps.client.get(
|
||||
f'https://api.github.com/repos/{owner}/{repo}/git/trees/main?recursive=1',
|
||||
headers=headers
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
# Try with master branch if main fails
|
||||
response = await ctx.deps.client.get(
|
||||
f'https://api.github.com/repos/{owner}/{repo}/git/trees/master?recursive=1',
|
||||
headers=headers
|
||||
)
|
||||
if response.status_code != 200:
|
||||
return f"Failed to get repository structure: {response.text}"
|
||||
|
||||
data = response.json()
|
||||
tree = data['tree']
|
||||
|
||||
# Build directory structure
|
||||
structure = []
|
||||
for item in tree:
|
||||
if not any(excluded in item['path'] for excluded in ['.git/', 'node_modules/', '__pycache__/']):
|
||||
structure.append(f"{'📁 ' if item['type'] == 'tree' else '📄 '}{item['path']}")
|
||||
|
||||
return "\n".join(structure)
|
||||
|
||||
@github_agent.tool
|
||||
async def get_file_content(ctx: RunContext[GitHubDeps], github_url: str, file_path: str) -> str:
|
||||
"""Get the content of a specific file from the GitHub repository.
|
||||
|
||||
Args:
|
||||
ctx: The context.
|
||||
github_url: The GitHub repository URL.
|
||||
file_path: Path to the file within the repository.
|
||||
|
||||
Returns:
|
||||
str: File content as a string.
|
||||
"""
|
||||
match = re.search(r'github\.com[:/]([^/]+)/([^/]+?)(?:\.git)?$', github_url)
|
||||
if not match:
|
||||
return "Invalid GitHub URL format"
|
||||
|
||||
owner, repo = match.groups()
|
||||
headers = {'Authorization': f'token {ctx.deps.github_token}'} if ctx.deps.github_token else {}
|
||||
|
||||
response = await ctx.deps.client.get(
|
||||
f'https://raw.githubusercontent.com/{owner}/{repo}/main/{file_path}',
|
||||
headers=headers
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
# Try with master branch if main fails
|
||||
response = await ctx.deps.client.get(
|
||||
f'https://raw.githubusercontent.com/{owner}/{repo}/master/{file_path}',
|
||||
headers=headers
|
||||
)
|
||||
if response.status_code != 200:
|
||||
return f"Failed to get file content: {response.text}"
|
||||
|
||||
return response.text
|
||||
@@ -0,0 +1,33 @@
|
||||
from pydantic_ai.providers.openai import OpenAIProvider
|
||||
from pydantic_ai.models.openai import OpenAIModel
|
||||
from pydantic_ai.mcp import MCPServerStdio
|
||||
from pydantic_ai import Agent
|
||||
from dotenv import load_dotenv
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
load_dotenv()
|
||||
|
||||
def get_model():
|
||||
llm = os.getenv('MODEL_CHOICE', 'gpt-4o-mini')
|
||||
base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
|
||||
api_key = os.getenv('LLM_API_KEY', 'no-api-key-provided')
|
||||
|
||||
return OpenAIModel(llm, provider=OpenAIProvider(base_url=base_url, api_key=api_key))
|
||||
|
||||
server = MCPServerStdio(
|
||||
'npx',
|
||||
['-y', '@modelcontextprotocol/server-brave-search', 'stdio'],
|
||||
env={"BRAVE_API_KEY": os.getenv("BRAVE_API_KEY")}
|
||||
)
|
||||
agent = Agent(get_model(), mcp_servers=[server])
|
||||
|
||||
|
||||
async def main():
|
||||
async with agent.run_mcp_servers():
|
||||
result = await agent.run('What is new with Gemini 2.5 Pro?')
|
||||
print(result.data)
|
||||
user_input = input("Press enter to quit...")
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
@@ -0,0 +1,110 @@
|
||||
from __future__ import annotations as _annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import logfire
|
||||
from devtools import debug
|
||||
from httpx import AsyncClient
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from openai import AsyncOpenAI
|
||||
from pydantic_ai.models.openai import OpenAIModel
|
||||
from pydantic_ai import Agent, ModelRetry, RunContext
|
||||
|
||||
load_dotenv()
|
||||
llm = os.getenv('LLM_MODEL', 'gpt-4o')
|
||||
|
||||
client = AsyncOpenAI(
|
||||
base_url = 'http://localhost:11434/v1',
|
||||
api_key='ollama'
|
||||
)
|
||||
|
||||
model = OpenAIModel(llm) if llm.lower().startswith("gpt") else OpenAIModel(llm, openai_client=client)
|
||||
|
||||
# 'if-token-present' means nothing will be sent (and the example will work) if you don't have logfire configured
|
||||
logfire.configure(send_to_logfire='if-token-present')
|
||||
|
||||
|
||||
@dataclass
|
||||
class Deps:
|
||||
client: AsyncClient
|
||||
brave_api_key: str | None
|
||||
|
||||
|
||||
web_search_agent = Agent(
|
||||
model,
|
||||
system_prompt=f'You are an expert at researching the web to answer user questions. The current date is: {datetime.now().strftime("%Y-%m-%d")}',
|
||||
deps_type=Deps,
|
||||
retries=2
|
||||
)
|
||||
|
||||
|
||||
@web_search_agent.tool
|
||||
async def search_web(
|
||||
ctx: RunContext[Deps], web_query: str
|
||||
) -> str:
|
||||
"""Search the web given a query defined to answer the user's question.
|
||||
|
||||
Args:
|
||||
ctx: The context.
|
||||
web_query: The query for the web search.
|
||||
|
||||
Returns:
|
||||
str: The search results as a formatted string.
|
||||
"""
|
||||
if ctx.deps.brave_api_key is None:
|
||||
return "This is a test web search result. Please provide a Brave API key to get real search results."
|
||||
|
||||
headers = {
|
||||
'X-Subscription-Token': ctx.deps.brave_api_key,
|
||||
'Accept': 'application/json',
|
||||
}
|
||||
|
||||
with logfire.span('calling Brave search API', query=web_query) as span:
|
||||
r = await ctx.deps.client.get(
|
||||
'https://api.search.brave.com/res/v1/web/search',
|
||||
params={
|
||||
'q': web_query,
|
||||
'count': 5,
|
||||
'text_decorations': True,
|
||||
'search_lang': 'en'
|
||||
},
|
||||
headers=headers
|
||||
)
|
||||
r.raise_for_status()
|
||||
data = r.json()
|
||||
span.set_attribute('response', data)
|
||||
|
||||
results = []
|
||||
|
||||
# Add web results in a nice formatted way
|
||||
web_results = data.get('web', {}).get('results', [])
|
||||
for item in web_results[:3]:
|
||||
title = item.get('title', '')
|
||||
description = item.get('description', '')
|
||||
url = item.get('url', '')
|
||||
if title and description:
|
||||
results.append(f"Title: {title}\nSummary: {description}\nSource: {url}\n")
|
||||
|
||||
return "\n".join(results) if results else "No results found for the query."
|
||||
|
||||
|
||||
async def main():
|
||||
async with AsyncClient() as client:
|
||||
brave_api_key = os.getenv('BRAVE_API_KEY', None)
|
||||
deps = Deps(client=client, brave_api_key=brave_api_key)
|
||||
|
||||
result = await web_search_agent.run(
|
||||
'Give me some articles talking about the new release of React 19.', deps=deps
|
||||
)
|
||||
|
||||
debug(result)
|
||||
print('Response:', result.data)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user