import os from 'os'; import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { CallToolRequestSchema, ListToolsRequestSchema, ListPromptsRequestSchema, GetPromptRequestSchema, ServerCapabilities, } from '@modelcontextprotocol/sdk/types.js'; import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'; import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'; import { StreamableHTTPClientTransport, StreamableHTTPClientTransportOptions, } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; import { createFetchWithProxy, getProxyConfigFromEnv } from './proxy.js'; import { ServerInfo, ServerConfig, Tool } from '../types/index.js'; import { expandEnvVars, replaceEnvVars, getNameSeparator } from '../config/index.js'; import config from '../config/index.js'; import { getGroup } from './sseService.js'; import { getServersInGroup, getServerConfigInGroup } from './groupService.js'; import { saveToolsAsVectorEmbeddings, searchToolsByVector } from './vectorSearchService.js'; import { OpenAPIClient } from '../clients/openapi.js'; import { RequestContextService } from './requestContextService.js'; import { getDataService } from './services.js'; import { getServerDao, getSystemConfigDao, ServerConfigWithName } from '../dao/index.js'; import { initializeAllOAuthClients } from './oauthService.js'; import { createOAuthProvider } from './mcpOAuthProvider.js'; const servers: { [sessionId: string]: Server } = {}; import { setupClientKeepAlive } from './keepAliveService.js'; export const initUpstreamServers = async (): Promise => { // Initialize OAuth clients for servers with dynamic registration await initializeAllOAuthClients(); // Register all tools from upstream servers await registerAllTools(true); }; export const getMcpServer = (sessionId?: string, group?: string): Server => { if (!sessionId) { return createMcpServer(config.mcpHubName, config.mcpHubVersion, group); } if (!servers[sessionId]) { const serverGroup = group || getGroup(sessionId); const server = createMcpServer(config.mcpHubName, config.mcpHubVersion, serverGroup); servers[sessionId] = server; } else { console.log(`MCP server already exists for sessionId: ${sessionId}`); } return servers[sessionId]; }; export const deleteMcpServer = (sessionId: string): void => { delete servers[sessionId]; }; export const notifyToolChanged = async (name?: string) => { await registerAllTools(false, name); Object.values(servers).forEach((server) => { server .sendToolListChanged() .catch((error) => { console.warn('Failed to send tool list changed notification:', error.message); }) .then(() => { console.log('Tool list changed notification sent successfully'); }); }); }; export const syncToolEmbedding = async (serverName: string, toolName: string) => { const serverInfo = getServerByName(serverName); if (!serverInfo) { console.warn(`Server not found: ${serverName}`); return; } const tool = serverInfo.tools.find((t) => t.name === toolName); if (!tool) { console.warn(`Tool not found: ${toolName} on server: ${serverName}`); return; } // Save tool as vector embedding for search saveToolsAsVectorEmbeddings(serverName, [tool]); }; // Helper function to clean $schema field from inputSchema const cleanInputSchema = (schema: any): any => { if (!schema || typeof schema !== 'object') { return schema; } const cleanedSchema = { ...schema }; delete cleanedSchema.$schema; return cleanedSchema; }; // Store all server information let serverInfos: ServerInfo[] = []; // Returns true if all enabled servers are connected export const connected = (): boolean => { return serverInfos .filter((serverInfo) => serverInfo.enabled !== false) .every((serverInfo) => serverInfo.status === 'connected'); }; // Global cleanup function to close all connections export const cleanupAllServers = (): void => { for (const serverInfo of serverInfos) { try { if (serverInfo.client) { serverInfo.client.close(); } if (serverInfo.transport) { serverInfo.transport.close(); } } catch (error) { console.warn(`Error closing server ${serverInfo.name}:`, error); } } serverInfos = []; // Clear session servers as well Object.keys(servers).forEach((sessionId) => { delete servers[sessionId]; }); }; // Helper function to create transport based on server configuration export const createTransportFromConfig = async (name: string, conf: ServerConfig): Promise => { let transport; const env: Record = { ...(process.env as Record), ...replaceEnvVars(conf.env || {}), }; if (conf.type === 'streamable-http') { const options: StreamableHTTPClientTransportOptions = {}; const headers = conf.headers ? replaceEnvVars(conf.headers) : {}; if (Object.keys(headers).length > 0) { options.requestInit = { headers, }; } // Create OAuth provider if configured - SDK will handle authentication automatically const authProvider = await createOAuthProvider(name, conf); if (authProvider) { options.authProvider = authProvider; console.log(`OAuth provider configured for server: ${name}`); } options.fetch = createFetchWithProxy(getProxyConfigFromEnv(env)); transport = new StreamableHTTPClientTransport(new URL(conf.url || ''), options); } else if (conf.url) { // SSE transport const options: any = {}; const headers = conf.headers ? replaceEnvVars(conf.headers) : {}; if (Object.keys(headers).length > 0) { options.eventSourceInit = { headers, }; options.requestInit = { headers, }; } // Create OAuth provider if configured - SDK will handle authentication automatically const authProvider = await createOAuthProvider(name, conf); if (authProvider) { options.authProvider = authProvider; console.log(`OAuth provider configured for server: ${name}`); } options.fetch = createFetchWithProxy(getProxyConfigFromEnv(env)); transport = new SSEClientTransport(new URL(conf.url), options); } else if (conf.command && conf.args) { // Stdio transport env['PATH'] = expandEnvVars(process.env.PATH as string) || ''; const systemConfigDao = getSystemConfigDao(); const systemConfig = await systemConfigDao.get(); // Add UV_DEFAULT_INDEX and npm_config_registry if needed if ( systemConfig?.install?.pythonIndexUrl && (conf.command === 'uvx' || conf.command === 'uv' || conf.command === 'python') ) { env['UV_DEFAULT_INDEX'] = systemConfig.install.pythonIndexUrl; } if ( systemConfig?.install?.npmRegistry && (conf.command === 'npm' || conf.command === 'npx' || conf.command === 'pnpm' || conf.command === 'yarn' || conf.command === 'node') ) { env['npm_config_registry'] = systemConfig.install.npmRegistry; } // Expand environment variables in command transport = new StdioClientTransport({ cwd: os.homedir(), command: conf.command, args: replaceEnvVars(conf.args) as string[], env: env, stderr: 'pipe', }); transport.stderr?.on('data', (data) => { console.log(`[${name}] [child] ${data}`); }); } else { throw new Error(`Unable to create transport for server: ${name}`); } return transport; }; // Helper function to handle client.callTool with reconnection logic const callToolWithReconnect = async ( serverInfo: ServerInfo, toolParams: any, options?: any, maxRetries: number = 1, ): Promise => { if (!serverInfo.client) { throw new Error(`Client not found for server: ${serverInfo.name}`); } for (let attempt = 0; attempt <= maxRetries; attempt++) { try { const result = await serverInfo.client.callTool(toolParams, undefined, options || {}); // Check auth error checkAuthError(result); return result; } catch (error: any) { // Check if error message starts with "Error POSTing to endpoint (HTTP 40" const isHttp40xError = error?.message?.startsWith?.('Error POSTing to endpoint (HTTP 40'); // Only retry for StreamableHTTPClientTransport const isStreamableHttp = serverInfo.transport instanceof StreamableHTTPClientTransport; const isSSE = serverInfo.transport instanceof SSEClientTransport; if ( attempt < maxRetries && serverInfo.transport && ((isStreamableHttp && isHttp40xError) || isSSE) ) { console.warn( `${isHttp40xError ? 'HTTP 40x error' : 'error'} detected for ${isStreamableHttp ? 'StreamableHTTP' : 'SSE'} server ${serverInfo.name}, attempting reconnection (attempt ${attempt + 1}/${maxRetries + 1})`, ); try { // Close existing connection if (serverInfo.keepAliveIntervalId) { clearInterval(serverInfo.keepAliveIntervalId); serverInfo.keepAliveIntervalId = undefined; } serverInfo.client.close(); serverInfo.transport.close(); const server = await getServerDao().findById(serverInfo.name); if (!server) { throw new Error(`Server configuration not found for: ${serverInfo.name}`); } // Recreate transport using helper function const newTransport = await createTransportFromConfig(serverInfo.name, server); // Create new client const client = new Client( { name: `mcp-client-${serverInfo.name}`, version: '1.0.0', }, { capabilities: {}, }, ); // Reconnect with new transport await client.connect(newTransport, serverInfo.options || {}); // Update server info with new client and transport serverInfo.client = client; serverInfo.transport = newTransport; serverInfo.status = 'connected'; // Reload tools list after reconnection try { const tools = await client.listTools({}, serverInfo.options || {}); serverInfo.tools = tools.tools.map((tool) => ({ name: `${serverInfo.name}${getNameSeparator()}${tool.name}`, description: tool.description || '', inputSchema: cleanInputSchema(tool.inputSchema || {}), })); // Save tools as vector embeddings for search saveToolsAsVectorEmbeddings(serverInfo.name, serverInfo.tools); } catch (listToolsError) { console.warn( `Failed to reload tools after reconnection for server ${serverInfo.name}:`, listToolsError, ); // Continue anyway, as the connection might still work for the current tool } console.log(`Successfully reconnected to server: ${serverInfo.name}`); // Continue to next attempt continue; } catch (reconnectError) { console.error(`Failed to reconnect to server ${serverInfo.name}:`, reconnectError); serverInfo.status = 'disconnected'; serverInfo.error = `Failed to reconnect: ${reconnectError}`; // If this was the last attempt, throw the original error if (attempt === maxRetries) { throw error; } } } else { // Not an HTTP 40x error or no more retries, throw the original error throw error; } } } // This should not be reached, but just in case throw new Error('Unexpected error in callToolWithReconnect'); }; // Initialize MCP server clients export const initializeClientsFromSettings = async ( isInit: boolean, serverName?: string, ): Promise => { const allServers: ServerConfigWithName[] = await getServerDao().findAll(); const existingServerInfos = serverInfos; const nextServerInfos: ServerInfo[] = []; try { for (const conf of allServers) { const { name } = conf; // Expand environment variables in all configuration values const expandedConf = replaceEnvVars(conf as any) as ServerConfigWithName; // Skip disabled servers if (expandedConf.enabled === false) { console.log(`Skipping disabled server: ${name}`); nextServerInfos.push({ name, owner: expandedConf.owner, status: 'disconnected', error: null, tools: [], prompts: [], createTime: Date.now(), enabled: false, }); continue; } // Check if server is already connected const existingServer = existingServerInfos.find( (s) => s.name === name && s.status === 'connected', ); if (existingServer && (!serverName || serverName !== name)) { nextServerInfos.push({ ...existingServer, enabled: expandedConf.enabled === undefined ? true : expandedConf.enabled, }); console.log(`Server '${name}' is already connected.`); continue; } let transport; let openApiClient; if (expandedConf.type === 'openapi') { // Handle OpenAPI type servers if (!expandedConf.openapi?.url && !expandedConf.openapi?.schema) { console.warn( `Skipping OpenAPI server '${name}': missing OpenAPI specification URL or schema`, ); nextServerInfos.push({ name, owner: expandedConf.owner, status: 'disconnected', error: 'Missing OpenAPI specification URL or schema', tools: [], prompts: [], createTime: Date.now(), }); continue; } // Create server info first and keep reference to it const serverInfo: ServerInfo = { name, owner: expandedConf.owner, status: 'connecting', error: null, tools: [], prompts: [], createTime: Date.now(), enabled: expandedConf.enabled === undefined ? true : expandedConf.enabled, config: expandedConf, // Store reference to expanded config for OpenAPI passthrough headers }; nextServerInfos.push(serverInfo); try { // Create OpenAPI client instance openApiClient = new OpenAPIClient(expandedConf); console.log(`Initializing OpenAPI server: ${name}...`); // Perform async initialization await openApiClient.initialize(); // Convert OpenAPI tools to MCP tool format const openApiTools = openApiClient.getTools(); const mcpTools: Tool[] = openApiTools.map((tool) => ({ name: `${name}${getNameSeparator()}${tool.name}`, description: tool.description, inputSchema: cleanInputSchema(tool.inputSchema), })); // Update server info with successful initialization serverInfo.status = 'connected'; serverInfo.tools = mcpTools; serverInfo.openApiClient = openApiClient; console.log( `Successfully initialized OpenAPI server: ${name} with ${mcpTools.length} tools`, ); // Save tools as vector embeddings for search saveToolsAsVectorEmbeddings(name, mcpTools); continue; } catch (error) { console.error(`Failed to initialize OpenAPI server ${name}:`, error); // Update the already pushed server info with error status serverInfo.status = 'disconnected'; serverInfo.error = `Failed to initialize OpenAPI server: ${error}`; continue; } } else { transport = await createTransportFromConfig(name, expandedConf); } const client = new Client( { name: `mcp-client-${name}`, version: '1.0.0', }, { capabilities: {}, }, ); const initRequestOptions = isInit ? { timeout: Number(config.initTimeout) || 60000, } : undefined; // Get request options from server configuration, with fallbacks const serverRequestOptions = expandedConf.options || {}; const requestOptions = { timeout: serverRequestOptions.timeout || 60000, resetTimeoutOnProgress: serverRequestOptions.resetTimeoutOnProgress || false, maxTotalTimeout: serverRequestOptions.maxTotalTimeout, }; // Create server info first and keep reference to it const serverInfo: ServerInfo = { name, owner: expandedConf.owner, status: 'connecting', error: null, tools: [], prompts: [], client, transport, options: requestOptions, createTime: Date.now(), config: expandedConf, // Store reference to expanded config }; const pendingAuth = expandedConf.oauth?.pendingAuthorization; if (pendingAuth) { serverInfo.status = 'oauth_required'; serverInfo.error = null; serverInfo.oauth = { authorizationUrl: pendingAuth.authorizationUrl, state: pendingAuth.state, codeVerifier: pendingAuth.codeVerifier, }; } nextServerInfos.push(serverInfo); client .connect(transport, initRequestOptions || requestOptions) .then(() => { console.log(`Successfully connected client for server: ${name}`); const capabilities: ServerCapabilities | undefined = client.getServerCapabilities(); console.log(`Server capabilities: ${JSON.stringify(capabilities)}`); let dataError: Error | null = null; if (capabilities?.tools) { client .listTools({}, initRequestOptions || requestOptions) .then((tools) => { console.log(`Successfully listed ${tools.tools.length} tools for server: ${name}`); serverInfo.tools = tools.tools.map((tool) => ({ name: `${name}${getNameSeparator()}${tool.name}`, description: tool.description || '', inputSchema: cleanInputSchema(tool.inputSchema || {}), })); // Save tools as vector embeddings for search saveToolsAsVectorEmbeddings(name, serverInfo.tools); }) .catch((error) => { console.error( `Failed to list tools for server ${name} by error: ${error} with stack: ${error.stack}`, ); dataError = error; }); } if (capabilities?.prompts) { client .listPrompts({}, initRequestOptions || requestOptions) .then((prompts) => { console.log( `Successfully listed ${prompts.prompts.length} prompts for server: ${name}`, ); serverInfo.prompts = prompts.prompts.map((prompt) => ({ name: `${name}${getNameSeparator()}${prompt.name}`, title: prompt.title, description: prompt.description, arguments: prompt.arguments, })); }) .catch((error) => { console.error( `Failed to list prompts for server ${name} by error: ${error} with stack: ${error.stack}`, ); dataError = error; }); } if (!dataError) { serverInfo.status = 'connected'; serverInfo.error = null; // Set up keep-alive ping for SSE connections via shared service setupClientKeepAlive(serverInfo, expandedConf).catch((e) => console.warn(`Keepalive setup failed for ${name}:`, e), ); } else { serverInfo.status = 'disconnected'; serverInfo.error = `Failed to list data: ${dataError} `; } }) .catch(async (error) => { // Check if this is an OAuth authorization error const isOAuthError = error?.message?.includes('OAuth authorization required') || error?.message?.includes('Authorization required'); if (isOAuthError) { // OAuth provider should have already set the status to 'oauth_required' // and stored the authorization URL in serverInfo.oauth console.log( `OAuth authorization required for server ${name}. Status should be set to 'oauth_required'.`, ); // Make sure status is set correctly if (serverInfo.status !== 'oauth_required') { serverInfo.status = 'oauth_required'; } serverInfo.error = null; } else { console.error( `Failed to connect client for server ${name} by error: ${error} with stack: ${error.stack}`, ); // Other connection errors serverInfo.status = 'disconnected'; serverInfo.error = `Failed to connect: ${error.stack} `; } }); console.log(`Initialized client for server: ${name}`); } } catch (error) { // Restore previous state if initialization fails to avoid exposing an empty server list serverInfos = existingServerInfos; throw error; } serverInfos = nextServerInfos; return serverInfos; }; // Register all MCP tools export const registerAllTools = async (isInit: boolean, serverName?: string): Promise => { await initializeClientsFromSettings(isInit, serverName); }; // Get all server information export const getServersInfo = async (): Promise[]> => { const allServers: ServerConfigWithName[] = await getServerDao().findAll(); const dataService = getDataService(); // Ensure that servers recently added via DAO but not yet initialized in serverInfos // are still visible in the servers list. This avoids a race condition where // a POST /api/servers immediately followed by GET /api/servers would not // return the newly created server until background initialization completes. const combinedServerInfos: ServerInfo[] = [...serverInfos]; const existingNames = new Set(combinedServerInfos.map((s) => s.name)); for (const server of allServers) { if (!existingNames.has(server.name)) { const isEnabled = server.enabled === undefined ? true : server.enabled; combinedServerInfos.push({ name: server.name, owner: server.owner, // Newly created servers that are enabled should appear as "connecting" // until the MCP client initialization completes. Disabled servers remain // in the "disconnected" state. status: isEnabled ? 'connecting' : 'disconnected', error: null, tools: [], prompts: [], createTime: Date.now(), enabled: isEnabled, }); } } const filterServerInfos: ServerInfo[] = dataService.filterData ? dataService.filterData(combinedServerInfos) : combinedServerInfos; const infos = filterServerInfos.map( ({ name, status, tools, prompts, createTime, error, oauth }) => { const serverConfig = allServers.find((server) => server.name === name); const enabled = serverConfig ? serverConfig.enabled !== false : true; // Add enabled status and custom description to each tool const toolsWithEnabled = tools.map((tool) => { const toolConfig = serverConfig?.tools?.[tool.name]; return { ...tool, description: toolConfig?.description || tool.description, // Use custom description if available enabled: toolConfig?.enabled !== false, // Default to true if not explicitly disabled }; }); const promptsWithEnabled = prompts.map((prompt) => { const promptConfig = serverConfig?.prompts?.[prompt.name]; return { ...prompt, description: promptConfig?.description || prompt.description, // Use custom description if available enabled: promptConfig?.enabled !== false, // Default to true if not explicitly disabled }; }); return { name, status, error, tools: toolsWithEnabled, prompts: promptsWithEnabled, createTime, enabled, oauth: oauth ? { authorizationUrl: oauth.authorizationUrl, state: oauth.state, // Don't expose codeVerifier to frontend for security } : undefined, }; }, ); infos.sort((a, b) => { if (a.enabled === b.enabled) return 0; return a.enabled ? -1 : 1; }); return infos; }; // Get server by name export const getServerByName = (name: string): ServerInfo | undefined => { return serverInfos.find((serverInfo) => serverInfo.name === name); }; // Get server by OAuth state parameter export const getServerByOAuthState = (state: string): ServerInfo | undefined => { return serverInfos.find((serverInfo) => serverInfo.oauth?.state === state); }; /** * Reconnect a server after OAuth authorization or configuration change * This will close the existing connection and reinitialize the server */ export const reconnectServer = async (serverName: string): Promise => { console.log(`Reconnecting server: ${serverName}`); const serverInfo = getServerByName(serverName); if (!serverInfo) { throw new Error(`Server not found: ${serverName}`); } // Close existing connection if any if (serverInfo.client) { try { serverInfo.client.close(); } catch (error) { console.warn(`Error closing client for server ${serverName}:`, error); } } if (serverInfo.transport) { try { serverInfo.transport.close(); } catch (error) { console.warn(`Error closing transport for server ${serverName}:`, error); } } if (serverInfo.keepAliveIntervalId) { clearInterval(serverInfo.keepAliveIntervalId); serverInfo.keepAliveIntervalId = undefined; } // Reinitialize the server await initializeClientsFromSettings(false, serverName); console.log(`Successfully reconnected server: ${serverName}`); }; // Filter tools by server configuration const filterToolsByConfig = async (serverName: string, tools: Tool[]): Promise => { const serverConfig = await getServerDao().findById(serverName); if (!serverConfig || !serverConfig.tools) { // If no tool configuration exists, all tools are enabled by default return tools; } return tools.filter((tool) => { const toolConfig = serverConfig.tools?.[tool.name]; // If tool is not in config, it's enabled by default return toolConfig?.enabled !== false; }); }; // Get server by tool name const getServerByTool = (toolName: string): ServerInfo | undefined => { return serverInfos.find((serverInfo) => serverInfo.tools.some((tool) => tool.name === toolName)); }; // Add new server export const addServer = async ( name: string, config: ServerConfig, ): Promise<{ success: boolean; message?: string }> => { const server: ServerConfigWithName = { name, ...config }; const result = await getServerDao().create(server); if (result) { return { success: true, message: 'Server added successfully' }; } else { return { success: false, message: 'Failed to add server' }; } }; // Remove server export const removeServer = async ( name: string, ): Promise<{ success: boolean; message?: string }> => { const result = await getServerDao().delete(name); if (!result) { return { success: false, message: 'Failed to remove server' }; } serverInfos = serverInfos.filter((serverInfo) => serverInfo.name !== name); return { success: true, message: 'Server removed successfully' }; }; // Add or update server (supports overriding existing servers for DXT) export const addOrUpdateServer = async ( name: string, config: ServerConfig, allowOverride: boolean = false, ): Promise<{ success: boolean; message?: string }> => { try { const exists = await getServerDao().exists(name); if (exists && !allowOverride) { return { success: false, message: 'Server name already exists' }; } // If overriding an existing server, close connections and clear keep-alive timers if (exists) { // Close existing server connections (clears keep-alive intervals as well) closeServer(name); // Remove from server infos serverInfos = serverInfos.filter((serverInfo) => serverInfo.name !== name); } if (exists) { await getServerDao().update(name, config); } else { await getServerDao().create({ name, ...config }); } const action = exists ? 'updated' : 'added'; return { success: true, message: `Server ${action} successfully` }; } catch (error) { console.error(`Failed to add/update server: ${name}`, error); return { success: false, message: 'Failed to add/update server' }; } }; // Check for authentication error in tool call result function checkAuthError(result: any) { if (Array.isArray(result.content) && result.content.length > 0) { const text = result.content[0]?.text; if (typeof text === 'string') { let errorContent; try { errorContent = JSON.parse(text); } catch (e) { // Ignore JSON parse errors and continue return; } if (errorContent.code === 401) { throw new Error('Error POSTing to endpoint (HTTP 401 Unauthorized)'); } } } } // Close server client and transport function closeServer(name: string) { const serverInfo = serverInfos.find((serverInfo) => serverInfo.name === name); if (serverInfo && serverInfo.client && serverInfo.transport) { // Clear keep-alive interval if exists if (serverInfo.keepAliveIntervalId) { clearInterval(serverInfo.keepAliveIntervalId); serverInfo.keepAliveIntervalId = undefined; console.log(`Cleared keep-alive interval for server: ${serverInfo.name}`); } serverInfo.client.close(); serverInfo.transport.close(); console.log(`Closed client and transport for server: ${serverInfo.name}`); // TODO kill process } } // Toggle server enabled status export const toggleServerStatus = async ( name: string, enabled: boolean, ): Promise<{ success: boolean; message?: string }> => { try { await getServerDao().setEnabled(name, enabled); // If disabling, disconnect the server and remove from active servers if (!enabled) { closeServer(name); // Update the server info to show as disconnected and disabled const index = serverInfos.findIndex((s) => s.name === name); if (index !== -1) { serverInfos[index] = { ...serverInfos[index], status: 'disconnected', enabled: false, }; } } return { success: true, message: `Server ${enabled ? 'enabled' : 'disabled'} successfully` }; } catch (error) { console.error(`Failed to toggle server status: ${name}`, error); return { success: false, message: 'Failed to toggle server status' }; } }; export const handleListToolsRequest = async (_: any, extra: any) => { const sessionId = extra.sessionId || ''; const group = getGroup(sessionId); console.log(`Handling ListToolsRequest for group: ${group}`); // Special handling for $smart group to return special tools // Support both $smart and $smart/{group} patterns if (group === '$smart' || group?.startsWith('$smart/')) { // Extract target group if pattern is $smart/{group} const targetGroup = group?.startsWith('$smart/') ? group.substring(7) : undefined; // Get info about available servers, filtered by target group if specified let availableServers = serverInfos.filter( (server) => server.status === 'connected' && server.enabled !== false, ); // If a target group is specified, filter servers to only those in the group if (targetGroup) { const serversInGroup = await getServersInGroup(targetGroup); if (serversInGroup && serversInGroup.length > 0) { availableServers = availableServers.filter((server) => serversInGroup.includes(server.name), ); } } // Create simple server information with only server names const serversList = availableServers .map((server) => { return `${server.name}`; }) .join(', '); const scopeDescription = targetGroup ? `servers in the "${targetGroup}" group` : 'all available servers'; return { tools: [ { name: 'search_tools', description: `STEP 1 of 2: Use this tool FIRST to discover and search for relevant tools across ${scopeDescription}. This tool and call_tool work together as a two-step process: 1) search_tools to find what you need, 2) call_tool to execute it. For optimal results, use specific queries matching your exact needs. Call this tool multiple times with different queries for different parts of complex tasks. Example queries: "image generation tools", "code review tools", "data analysis", "translation capabilities", etc. Results are sorted by relevance using vector similarity. After finding relevant tools, you MUST use the call_tool to actually execute them. The search_tools only finds tools - it doesn't execute them. Available servers: ${serversList}`, inputSchema: { type: 'object', properties: { query: { type: 'string', description: 'The search query to find relevant tools. Be specific and descriptive about the task you want to accomplish.', }, limit: { type: 'integer', description: 'Maximum number of results to return. Use higher values (20-30) for broad searches and lower values (5-10) for specific searches.', default: 10, }, }, required: ['query'], }, }, { name: 'call_tool', description: "STEP 2 of 2: Use this tool AFTER search_tools to actually execute/invoke any tool you found. This is the execution step - search_tools finds tools, call_tool runs them.\n\nWorkflow: search_tools → examine results → call_tool with the chosen tool name and required arguments.\n\nIMPORTANT: Always check the tool's inputSchema from search_tools results before invoking to ensure you provide the correct arguments. The search results will show you exactly what parameters each tool expects.", inputSchema: { type: 'object', properties: { toolName: { type: 'string', description: 'The exact name of the tool to invoke (from search_tools results)', }, arguments: { type: 'object', description: 'The arguments to pass to the tool based on its inputSchema (optional if tool requires no arguments)', }, }, required: ['toolName'], }, }, ], }; } // Need to filter servers based on group asynchronously const filteredServerInfos = []; for (const serverInfo of getDataService().filterData(serverInfos)) { if (serverInfo.enabled === false) continue; if (!group) { filteredServerInfos.push(serverInfo); continue; } const serversInGroup = await getServersInGroup(group); if (!serversInGroup || serversInGroup.length === 0) { if (serverInfo.name === group) filteredServerInfos.push(serverInfo); continue; } if (serversInGroup.includes(serverInfo.name)) { filteredServerInfos.push(serverInfo); } } const allTools = []; for (const serverInfo of filteredServerInfos) { if (serverInfo.tools && serverInfo.tools.length > 0) { // Filter tools based on server configuration let tools = await filterToolsByConfig(serverInfo.name, serverInfo.tools); // If this is a group request, apply group-level tool filtering tools = await filterToolsByGroup(group, serverInfo.name, tools); // Apply custom descriptions from server configuration const serverConfig = await getServerDao().findById(serverInfo.name); const toolsWithCustomDescriptions = tools.map((tool) => { const toolConfig = serverConfig?.tools?.[tool.name]; return { ...tool, description: toolConfig?.description || tool.description, // Use custom description if available }; }); allTools.push(...toolsWithCustomDescriptions); } } return { tools: allTools, }; }; export const handleCallToolRequest = async (request: any, extra: any) => { console.log(`Handling CallToolRequest for tool: ${JSON.stringify(request.params)}`); try { // Special handling for agent group tools if (request.params.name === 'search_tools') { const { query, limit = 10 } = request.params.arguments || {}; if (!query || typeof query !== 'string') { throw new Error('Query parameter is required and must be a string'); } const limitNum = Math.min(Math.max(parseInt(String(limit)) || 10, 1), 100); // Dynamically adjust threshold based on query characteristics let thresholdNum = 0.3; // Default threshold // For more general queries, use a lower threshold to get more diverse results if (query.length < 10 || query.split(' ').length <= 2) { thresholdNum = 0.2; } // For very specific queries, use a higher threshold for more precise results if (query.length > 30 || query.includes('specific') || query.includes('exact')) { thresholdNum = 0.4; } console.log(`Using similarity threshold: ${thresholdNum} for query: "${query}"`); // Determine server filtering based on group const sessionId = extra.sessionId || ''; let group = getGroup(sessionId); let servers: string[] | undefined = undefined; // No server filtering by default // If group is in format $smart/{group}, filter servers to that group if (group?.startsWith('$smart/')) { const targetGroup = group.substring(7); if (targetGroup) { group = targetGroup; } const serversInGroup = await getServersInGroup(targetGroup); if (serversInGroup !== undefined && serversInGroup !== null) { servers = serversInGroup; if (servers && servers.length > 0) { console.log( `Filtering search to servers in group "${targetGroup}": ${servers.join(', ')}`, ); } else { console.log(`Group "${targetGroup}" has no servers, search will return no results`); } } } const searchResults = await searchToolsByVector(query, limitNum, thresholdNum, servers); console.log(`Search results: ${JSON.stringify(searchResults)}`); // Find actual tool information from serverInfos by serverName and toolName // First resolve all tool promises const resolvedTools = await Promise.all( searchResults.map(async (result) => { // Find the server in serverInfos const server = serverInfos.find( (serverInfo) => serverInfo.name === result.serverName && serverInfo.status === 'connected' && serverInfo.enabled !== false, ); if (server && server.tools && server.tools.length > 0) { // Find the tool in server.tools const actualTool = server.tools.find((tool) => tool.name === result.toolName); if (actualTool) { // Check if the tool is enabled in configuration const tools = await filterToolsByConfig(server.name, [actualTool]); if (tools.length > 0) { // Apply custom description from configuration const serverConfig = await getServerDao().findById(server.name); const toolConfig = serverConfig?.tools?.[actualTool.name]; // Return the actual tool info from serverInfos with custom description return { ...actualTool, description: toolConfig?.description || actualTool.description, serverName: result.serverName, // Add serverName for filtering }; } } } // Fallback to search result if server or tool not found or disabled return { name: result.toolName, description: result.description || '', inputSchema: cleanInputSchema(result.inputSchema || {}), serverName: result.serverName, // Add serverName for filtering }; }), ); // Now filter the resolved tools const filterResults = await Promise.all( resolvedTools.map(async (tool) => { if (tool.name) { const serverName = tool.serverName; if (serverName) { let tools = await filterToolsByConfig(serverName, [tool as Tool]); if (tools.length === 0) { return false; } tools = await filterToolsByGroup(group, serverName, tools); return tools.length > 0; } } return true; }), ); const tools = resolvedTools.filter((_, i) => filterResults[i]); // Add usage guidance to the response const response = { tools, metadata: { query: query, threshold: thresholdNum, totalResults: tools.length, guideline: tools.length > 0 ? "Found relevant tools. If these tools don't match exactly what you need, try another search with more specific keywords." : 'No tools found. Try broadening your search or using different keywords.', nextSteps: tools.length > 0 ? 'To use a tool, call call_tool with the toolName and required arguments.' : 'Consider searching for related capabilities or more general terms.', }, }; // Return in the same format as handleListToolsRequest return { content: [ { type: 'text', text: JSON.stringify(response), }, ], }; } // Special handling for call_tool if (request.params.name === 'call_tool') { let { toolName } = request.params.arguments || {}; if (!toolName) { throw new Error('toolName parameter is required'); } const { arguments: toolArgs = {} } = request.params.arguments || {}; let targetServerInfo: ServerInfo | undefined; if (extra && extra.server) { targetServerInfo = getServerByName(extra.server); } else { // Find the first server that has this tool targetServerInfo = serverInfos.find( (serverInfo) => serverInfo.status === 'connected' && serverInfo.enabled !== false && serverInfo.tools.some((tool) => tool.name === toolName), ); } if (!targetServerInfo) { throw new Error(`No available servers found with tool: ${toolName}`); } // Check if the tool exists on the server const toolExists = targetServerInfo.tools.some((tool) => tool.name === toolName); if (!toolExists) { throw new Error(`Tool '${toolName}' not found on server '${targetServerInfo.name}'`); } // Handle OpenAPI servers differently if (targetServerInfo.openApiClient) { // For OpenAPI servers, use the OpenAPI client const openApiClient = targetServerInfo.openApiClient; // Use toolArgs if it has properties, otherwise fallback to request.params.arguments const finalArgs = toolArgs && Object.keys(toolArgs).length > 0 ? toolArgs : request.params.arguments || {}; console.log( `Invoking OpenAPI tool '${toolName}' on server '${targetServerInfo.name}' with arguments: ${JSON.stringify(finalArgs)}`, ); // Remove server prefix from tool name if present const separator = getNameSeparator(); const prefix = `${targetServerInfo.name}${separator}`; const cleanToolName = toolName.startsWith(prefix) ? toolName.substring(prefix.length) : toolName; // Extract passthrough headers from extra or request context let passthroughHeaders: Record | undefined; let requestHeaders: Record | null = null; // Try to get headers from extra parameter first (if available) if (extra?.headers) { requestHeaders = extra.headers; } else { // Fallback to request context service const requestContextService = RequestContextService.getInstance(); requestHeaders = requestContextService.getHeaders(); } if (requestHeaders && targetServerInfo.config?.openapi?.passthroughHeaders) { passthroughHeaders = {}; for (const headerName of targetServerInfo.config.openapi.passthroughHeaders) { // Handle different header name cases (Express normalizes headers to lowercase) const headerValue = requestHeaders[headerName] || requestHeaders[headerName.toLowerCase()]; if (headerValue) { passthroughHeaders[headerName] = Array.isArray(headerValue) ? headerValue[0] : String(headerValue); } } } const result = await openApiClient.callTool(cleanToolName, finalArgs, passthroughHeaders); console.log(`OpenAPI tool invocation result: ${JSON.stringify(result)}`); return { content: [ { type: 'text', text: JSON.stringify(result), }, ], }; } // Call the tool on the target server (MCP servers) const client = targetServerInfo.client; if (!client) { throw new Error(`Client not found for server: ${targetServerInfo.name}`); } // Use toolArgs if it has properties, otherwise fallback to request.params.arguments const finalArgs = toolArgs && Object.keys(toolArgs).length > 0 ? toolArgs : request.params.arguments || {}; console.log( `Invoking tool '${toolName}' on server '${targetServerInfo.name}' with arguments: ${JSON.stringify(finalArgs)}`, ); const separator = getNameSeparator(); const prefix = `${targetServerInfo.name}${separator}`; toolName = toolName.startsWith(prefix) ? toolName.substring(prefix.length) : toolName; const result = await callToolWithReconnect( targetServerInfo, { name: toolName, arguments: finalArgs, }, targetServerInfo.options || {}, ); console.log(`Tool invocation result: ${JSON.stringify(result)}`); return result; } // Regular tool handling const serverInfo = getServerByTool(request.params.name); if (!serverInfo) { throw new Error(`Server not found: ${request.params.name}`); } // Handle OpenAPI servers differently if (serverInfo.openApiClient) { // For OpenAPI servers, use the OpenAPI client const openApiClient = serverInfo.openApiClient; // Remove server prefix from tool name if present const separator = getNameSeparator(); const prefix = `${serverInfo.name}${separator}`; const cleanToolName = request.params.name.startsWith(prefix) ? request.params.name.substring(prefix.length) : request.params.name; console.log( `Invoking OpenAPI tool '${cleanToolName}' on server '${serverInfo.name}' with arguments: ${JSON.stringify(request.params.arguments)}`, ); // Extract passthrough headers from extra or request context let passthroughHeaders: Record | undefined; let requestHeaders: Record | null = null; // Try to get headers from extra parameter first (if available) if (extra?.headers) { requestHeaders = extra.headers; } else { // Fallback to request context service const requestContextService = RequestContextService.getInstance(); requestHeaders = requestContextService.getHeaders(); } if (requestHeaders && serverInfo.config?.openapi?.passthroughHeaders) { passthroughHeaders = {}; for (const headerName of serverInfo.config.openapi.passthroughHeaders) { // Handle different header name cases (Express normalizes headers to lowercase) const headerValue = requestHeaders[headerName] || requestHeaders[headerName.toLowerCase()]; if (headerValue) { passthroughHeaders[headerName] = Array.isArray(headerValue) ? headerValue[0] : String(headerValue); } } } const result = await openApiClient.callTool( cleanToolName, request.params.arguments || {}, passthroughHeaders, ); console.log(`OpenAPI tool invocation result: ${JSON.stringify(result)}`); return { content: [ { type: 'text', text: JSON.stringify(result), }, ], }; } // Handle MCP servers const client = serverInfo.client; if (!client) { throw new Error(`Client not found for server: ${serverInfo.name}`); } const separator = getNameSeparator(); const prefix = `${serverInfo.name}${separator}`; request.params.name = request.params.name.startsWith(prefix) ? request.params.name.substring(prefix.length) : request.params.name; const result = await callToolWithReconnect( serverInfo, request.params, serverInfo.options || {}, ); console.log(`Tool call result: ${JSON.stringify(result)}`); return result; } catch (error) { console.error(`Error handling CallToolRequest: ${error}`); return { content: [ { type: 'text', text: `Error: ${error}`, }, ], isError: true, }; } }; export const handleGetPromptRequest = async (request: any, extra: any) => { try { const { name, arguments: promptArgs } = request.params; let server: ServerInfo | undefined; if (extra && extra.server) { server = getServerByName(extra.server); } else { // Find the first server that has this tool server = serverInfos.find( (serverInfo) => serverInfo.status === 'connected' && serverInfo.enabled !== false && serverInfo.prompts.find((prompt) => prompt.name === name), ); } if (!server) { throw new Error(`Server not found: ${name}`); } // Remove server prefix from prompt name if present const separator = getNameSeparator(); const prefix = `${server.name}${separator}`; const cleanPromptName = name.startsWith(prefix) ? name.substring(prefix.length) : name; const promptParams = { name: cleanPromptName || '', arguments: promptArgs, }; // Log the final promptParams console.log(`Calling getPrompt with params: ${JSON.stringify(promptParams)}`); const prompt = await server.client?.getPrompt(promptParams); console.log(`Received prompt: ${JSON.stringify(prompt)}`); if (!prompt) { throw new Error(`Prompt not found: ${cleanPromptName}`); } return prompt; } catch (error) { console.error(`Error handling GetPromptRequest: ${error}`); return { content: [ { type: 'text', text: `Error: ${error}`, }, ], isError: true, }; } }; export const handleListPromptsRequest = async (_: any, extra: any) => { const sessionId = extra.sessionId || ''; const group = getGroup(sessionId); console.log(`Handling ListPromptsRequest for group: ${group}`); // Need to filter servers based on group asynchronously const filteredServerInfos = []; for (const serverInfo of getDataService().filterData(serverInfos)) { if (serverInfo.enabled === false) continue; if (!group) { filteredServerInfos.push(serverInfo); continue; } const serversInGroup = await getServersInGroup(group); if (!serversInGroup || serversInGroup.length === 0) { if (serverInfo.name === group) filteredServerInfos.push(serverInfo); continue; } if (serversInGroup.includes(serverInfo.name)) { filteredServerInfos.push(serverInfo); } } const allPrompts: any[] = []; for (const serverInfo of filteredServerInfos) { if (serverInfo.prompts && serverInfo.prompts.length > 0) { // Filter prompts based on server configuration const serverConfig = await getServerDao().findById(serverInfo.name); let enabledPrompts = serverInfo.prompts; if (serverConfig && serverConfig.prompts) { enabledPrompts = serverInfo.prompts.filter((prompt: any) => { const promptConfig = serverConfig.prompts?.[prompt.name]; // If prompt is not in config, it's enabled by default return promptConfig?.enabled !== false; }); } // If this is a group request, apply group-level prompt filtering if (group) { const serverConfigInGroup = await getServerConfigInGroup(group, serverInfo.name); if ( serverConfigInGroup && serverConfigInGroup.tools !== 'all' && Array.isArray(serverConfigInGroup.tools) ) { // Note: Group config uses 'tools' field but we're filtering prompts here // This might be a design decision to control access at the server level } } // Apply custom descriptions from server configuration const promptsWithCustomDescriptions = enabledPrompts.map((prompt: any) => { const promptConfig = serverConfig?.prompts?.[prompt.name]; return { ...prompt, description: promptConfig?.description || prompt.description, // Use custom description if available }; }); allPrompts.push(...promptsWithCustomDescriptions); } } return { prompts: allPrompts, }; }; // Create McpServer instance export const createMcpServer = (name: string, version: string, group?: string): Server => { // Determine server name based on routing type let serverName = name; if (group) { // For createMcpServer we use sync approach since it's called synchronously // The actual group validation happens at request time serverName = `${name}_${group}_group`; } // If no group, use default name (global routing) const server = new Server( { name: serverName, version }, { capabilities: { tools: {}, prompts: {}, resources: {} } }, ); server.setRequestHandler(ListToolsRequestSchema, handleListToolsRequest); server.setRequestHandler(CallToolRequestSchema, handleCallToolRequest); server.setRequestHandler(GetPromptRequestSchema, handleGetPromptRequest); server.setRequestHandler(ListPromptsRequestSchema, handleListPromptsRequest); return server; }; // Filter tools based on group configuration async function filterToolsByGroup(group: string | undefined, serverName: string, tools: Tool[]) { if (group) { const serverConfig = await getServerConfigInGroup(group, serverName); if (serverConfig && serverConfig.tools !== 'all' && Array.isArray(serverConfig.tools)) { // Filter tools based on group configuration const allowedToolNames = serverConfig.tools.map( (toolName: string) => `${serverName}${getNameSeparator()}${toolName}`, ); tools = tools.filter((tool) => allowedToolNames.includes(tool.name)); } } return tools; }