mirror of
https://github.com/samanhappy/mcphub.git
synced 2025-12-24 02:39:19 -05:00
fix: resolve race conditions in initializeClientsFromSettings (#201)
This commit is contained in:
@@ -101,6 +101,187 @@ export const syncToolEmbedding = async (serverName: string, toolName: string) =>
|
||||
// Store all server information
|
||||
let serverInfos: ServerInfo[] = [];
|
||||
|
||||
// Helper function to create transport based on server configuration
|
||||
const createTransportFromConfig = (name: string, conf: ServerConfig): any => {
|
||||
let transport;
|
||||
|
||||
if (conf.type === 'streamable-http') {
|
||||
const options: any = {};
|
||||
if (conf.headers && Object.keys(conf.headers).length > 0) {
|
||||
options.requestInit = {
|
||||
headers: conf.headers,
|
||||
};
|
||||
}
|
||||
transport = new StreamableHTTPClientTransport(new URL(conf.url || ''), options);
|
||||
} else if (conf.url) {
|
||||
// SSE transport
|
||||
const options: any = {};
|
||||
if (conf.headers && Object.keys(conf.headers).length > 0) {
|
||||
options.eventSourceInit = {
|
||||
headers: conf.headers,
|
||||
};
|
||||
options.requestInit = {
|
||||
headers: conf.headers,
|
||||
};
|
||||
}
|
||||
transport = new SSEClientTransport(new URL(conf.url), options);
|
||||
} else if (conf.command && conf.args) {
|
||||
// Stdio transport
|
||||
const env: Record<string, string> = {
|
||||
...(process.env as Record<string, string>),
|
||||
...replaceEnvVars(conf.env || {}),
|
||||
};
|
||||
env['PATH'] = expandEnvVars(process.env.PATH as string) || '';
|
||||
|
||||
const settings = loadSettings();
|
||||
// Add UV_DEFAULT_INDEX and npm_config_registry if needed
|
||||
if (
|
||||
settings.systemConfig?.install?.pythonIndexUrl &&
|
||||
(conf.command === 'uvx' || conf.command === 'uv' || conf.command === 'python')
|
||||
) {
|
||||
env['UV_DEFAULT_INDEX'] = settings.systemConfig.install.pythonIndexUrl;
|
||||
}
|
||||
|
||||
if (
|
||||
settings.systemConfig?.install?.npmRegistry &&
|
||||
(conf.command === 'npm' ||
|
||||
conf.command === 'npx' ||
|
||||
conf.command === 'pnpm' ||
|
||||
conf.command === 'yarn' ||
|
||||
conf.command === 'node')
|
||||
) {
|
||||
env['npm_config_registry'] = settings.systemConfig.install.npmRegistry;
|
||||
}
|
||||
|
||||
transport = new StdioClientTransport({
|
||||
command: conf.command,
|
||||
args: conf.args,
|
||||
env: env,
|
||||
stderr: 'pipe',
|
||||
});
|
||||
transport.stderr?.on('data', (data) => {
|
||||
console.log(`[${name}] [child] ${data}`);
|
||||
});
|
||||
} else {
|
||||
throw new Error(`Unable to create transport for server: ${name}`);
|
||||
}
|
||||
|
||||
return transport;
|
||||
};
|
||||
|
||||
// Helper function to handle client.callTool with reconnection logic
|
||||
const callToolWithReconnect = async (
|
||||
serverInfo: ServerInfo,
|
||||
toolParams: any,
|
||||
options?: any,
|
||||
maxRetries: number = 1,
|
||||
): Promise<any> => {
|
||||
if (!serverInfo.client) {
|
||||
throw new Error(`Client not found for server: ${serverInfo.name}`);
|
||||
}
|
||||
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
const result = await serverInfo.client.callTool(toolParams, undefined, options || {});
|
||||
return result;
|
||||
} catch (error: any) {
|
||||
// Check if error message starts with "Error POSTing to endpoint (HTTP 40"
|
||||
const isHttp40xError = error?.message?.startsWith?.('Error POSTing to endpoint (HTTP 40');
|
||||
// Only retry for StreamableHTTPClientTransport
|
||||
const isStreamableHttp = serverInfo.transport instanceof StreamableHTTPClientTransport;
|
||||
|
||||
if (isHttp40xError && attempt < maxRetries && serverInfo.transport && isStreamableHttp) {
|
||||
console.warn(
|
||||
`HTTP 40x error detected for StreamableHTTP server ${serverInfo.name}, attempting reconnection (attempt ${attempt + 1}/${maxRetries + 1})`,
|
||||
);
|
||||
|
||||
try {
|
||||
// Close existing connection
|
||||
if (serverInfo.keepAliveIntervalId) {
|
||||
clearInterval(serverInfo.keepAliveIntervalId);
|
||||
serverInfo.keepAliveIntervalId = undefined;
|
||||
}
|
||||
|
||||
serverInfo.client.close();
|
||||
serverInfo.transport.close();
|
||||
|
||||
// Get server configuration to recreate transport
|
||||
const settings = loadSettings();
|
||||
const conf = settings.mcpServers[serverInfo.name];
|
||||
if (!conf) {
|
||||
throw new Error(`Server configuration not found for: ${serverInfo.name}`);
|
||||
}
|
||||
|
||||
// Recreate transport using helper function
|
||||
const newTransport = createTransportFromConfig(serverInfo.name, conf);
|
||||
|
||||
// Create new client
|
||||
const client = new Client(
|
||||
{
|
||||
name: `mcp-client-${serverInfo.name}`,
|
||||
version: '1.0.0',
|
||||
},
|
||||
{
|
||||
capabilities: {
|
||||
prompts: {},
|
||||
resources: {},
|
||||
tools: {},
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
// Reconnect with new transport
|
||||
await client.connect(newTransport, serverInfo.options || {});
|
||||
|
||||
// Update server info with new client and transport
|
||||
serverInfo.client = client;
|
||||
serverInfo.transport = newTransport;
|
||||
serverInfo.status = 'connected';
|
||||
|
||||
// Reload tools list after reconnection
|
||||
try {
|
||||
const tools = await client.listTools({}, serverInfo.options || {});
|
||||
serverInfo.tools = tools.tools.map((tool) => ({
|
||||
name: `${serverInfo.name}-${tool.name}`,
|
||||
description: tool.description || '',
|
||||
inputSchema: tool.inputSchema || {},
|
||||
}));
|
||||
|
||||
// Save tools as vector embeddings for search
|
||||
saveToolsAsVectorEmbeddings(serverInfo.name, serverInfo.tools);
|
||||
} catch (listToolsError) {
|
||||
console.warn(
|
||||
`Failed to reload tools after reconnection for server ${serverInfo.name}:`,
|
||||
listToolsError,
|
||||
);
|
||||
// Continue anyway, as the connection might still work for the current tool
|
||||
}
|
||||
|
||||
console.log(`Successfully reconnected to server: ${serverInfo.name}`);
|
||||
|
||||
// Continue to next attempt
|
||||
continue;
|
||||
} catch (reconnectError) {
|
||||
console.error(`Failed to reconnect to server ${serverInfo.name}:`, reconnectError);
|
||||
serverInfo.status = 'disconnected';
|
||||
serverInfo.error = `Failed to reconnect: ${reconnectError}`;
|
||||
|
||||
// If this was the last attempt, throw the original error
|
||||
if (attempt === maxRetries) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Not an HTTP 40x error or no more retries, throw the original error
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This should not be reached, but just in case
|
||||
throw new Error('Unexpected error in callToolWithReconnect');
|
||||
};
|
||||
|
||||
// Initialize MCP server clients
|
||||
export const initializeClientsFromSettings = async (isInit: boolean): Promise<ServerInfo[]> => {
|
||||
const settings = loadSettings();
|
||||
@@ -154,21 +335,21 @@ export const initializeClientsFromSettings = async (isInit: boolean): Promise<Se
|
||||
continue;
|
||||
}
|
||||
|
||||
// Create server info first and keep reference to it
|
||||
const serverInfo: ServerInfo = {
|
||||
name,
|
||||
status: 'connecting',
|
||||
error: null,
|
||||
tools: [],
|
||||
createTime: Date.now(),
|
||||
enabled: conf.enabled === undefined ? true : conf.enabled,
|
||||
};
|
||||
serverInfos.push(serverInfo);
|
||||
|
||||
try {
|
||||
// Create OpenAPI client instance
|
||||
openApiClient = new OpenAPIClient(conf);
|
||||
|
||||
// Add server with connecting status first
|
||||
const serverInfo: ServerInfo = {
|
||||
name,
|
||||
status: 'connecting',
|
||||
error: null,
|
||||
tools: [],
|
||||
createTime: Date.now(),
|
||||
enabled: conf.enabled === undefined ? true : conf.enabled,
|
||||
};
|
||||
serverInfos.push(serverInfo);
|
||||
|
||||
console.log(`Initializing OpenAPI server: ${name}...`);
|
||||
|
||||
// Perform async initialization
|
||||
@@ -197,91 +378,13 @@ export const initializeClientsFromSettings = async (isInit: boolean): Promise<Se
|
||||
} catch (error) {
|
||||
console.error(`Failed to initialize OpenAPI server ${name}:`, error);
|
||||
|
||||
// Find and update the server info if it was already added
|
||||
const existingServerIndex = serverInfos.findIndex((s) => s.name === name);
|
||||
if (existingServerIndex !== -1) {
|
||||
serverInfos[existingServerIndex].status = 'disconnected';
|
||||
serverInfos[existingServerIndex].error = `Failed to initialize OpenAPI server: ${error}`;
|
||||
} else {
|
||||
// Add new server info with error status
|
||||
serverInfos.push({
|
||||
name,
|
||||
status: 'disconnected',
|
||||
error: `Failed to initialize OpenAPI server: ${error}`,
|
||||
tools: [],
|
||||
createTime: Date.now(),
|
||||
});
|
||||
}
|
||||
// Update the already pushed server info with error status
|
||||
serverInfo.status = 'disconnected';
|
||||
serverInfo.error = `Failed to initialize OpenAPI server: ${error}`;
|
||||
continue;
|
||||
}
|
||||
} else if (conf.type === 'streamable-http') {
|
||||
const options: any = {};
|
||||
if (conf.headers && Object.keys(conf.headers).length > 0) {
|
||||
options.requestInit = {
|
||||
headers: conf.headers,
|
||||
};
|
||||
}
|
||||
transport = new StreamableHTTPClientTransport(new URL(conf.url || ''), options);
|
||||
} else if (conf.url) {
|
||||
// Default to SSE only when 'conf.type' is not specified and 'conf.url' is available
|
||||
const options: any = {};
|
||||
if (conf.headers && Object.keys(conf.headers).length > 0) {
|
||||
options.eventSourceInit = {
|
||||
headers: conf.headers,
|
||||
};
|
||||
options.requestInit = {
|
||||
headers: conf.headers,
|
||||
};
|
||||
}
|
||||
transport = new SSEClientTransport(new URL(conf.url), options);
|
||||
} else if (conf.command && conf.args) {
|
||||
// If type is stdio or if command and args are provided without type
|
||||
const env: Record<string, string> = {
|
||||
...(process.env as Record<string, string>), // Inherit all environment variables from parent process
|
||||
...replaceEnvVars(conf.env || {}), // Override with configured env vars
|
||||
};
|
||||
env['PATH'] = expandEnvVars(process.env.PATH as string) || '';
|
||||
|
||||
// Add UV_DEFAULT_INDEX from settings if available (for Python packages)
|
||||
const settings = loadSettings(); // Add UV_DEFAULT_INDEX from settings if available (for Python packages)
|
||||
if (
|
||||
settings.systemConfig?.install?.pythonIndexUrl &&
|
||||
(conf.command === 'uvx' || conf.command === 'uv' || conf.command === 'python')
|
||||
) {
|
||||
env['UV_DEFAULT_INDEX'] = settings.systemConfig.install.pythonIndexUrl;
|
||||
}
|
||||
|
||||
// Add npm_config_registry from settings if available (for NPM packages)
|
||||
if (
|
||||
settings.systemConfig?.install?.npmRegistry &&
|
||||
(conf.command === 'npm' ||
|
||||
conf.command === 'npx' ||
|
||||
conf.command === 'pnpm' ||
|
||||
conf.command === 'yarn' ||
|
||||
conf.command === 'node')
|
||||
) {
|
||||
env['npm_config_registry'] = settings.systemConfig.install.npmRegistry;
|
||||
}
|
||||
|
||||
transport = new StdioClientTransport({
|
||||
command: conf.command,
|
||||
args: conf.args,
|
||||
env: env,
|
||||
stderr: 'pipe',
|
||||
});
|
||||
transport.stderr?.on('data', (data) => {
|
||||
console.log(`[${name}] [child] ${data}`);
|
||||
});
|
||||
} else {
|
||||
console.warn(`Skipping server '${name}': missing required configuration`);
|
||||
serverInfos.push({
|
||||
name,
|
||||
status: 'disconnected',
|
||||
error: 'Missing required configuration',
|
||||
tools: [],
|
||||
createTime: Date.now(),
|
||||
});
|
||||
continue;
|
||||
transport = createTransportFromConfig(name, conf);
|
||||
}
|
||||
|
||||
const client = new Client(
|
||||
@@ -312,6 +415,19 @@ export const initializeClientsFromSettings = async (isInit: boolean): Promise<Se
|
||||
maxTotalTimeout: serverRequestOptions.maxTotalTimeout,
|
||||
};
|
||||
|
||||
// Create server info first and keep reference to it
|
||||
const serverInfo: ServerInfo = {
|
||||
name,
|
||||
status: 'connecting',
|
||||
error: null,
|
||||
tools: [],
|
||||
client,
|
||||
transport,
|
||||
options: requestOptions,
|
||||
createTime: Date.now(),
|
||||
};
|
||||
serverInfos.push(serverInfo);
|
||||
|
||||
client
|
||||
.connect(transport, initRequestOptions || requestOptions)
|
||||
.then(() => {
|
||||
@@ -320,11 +436,6 @@ export const initializeClientsFromSettings = async (isInit: boolean): Promise<Se
|
||||
.listTools({}, initRequestOptions || requestOptions)
|
||||
.then((tools) => {
|
||||
console.log(`Successfully listed ${tools.tools.length} tools for server: ${name}`);
|
||||
const serverInfo = getServerByName(name);
|
||||
if (!serverInfo) {
|
||||
console.warn(`Server info not found for server: ${name}`);
|
||||
return;
|
||||
}
|
||||
|
||||
serverInfo.tools = tools.tools.map((tool) => ({
|
||||
name: `${name}-${tool.name}`,
|
||||
@@ -344,33 +455,17 @@ export const initializeClientsFromSettings = async (isInit: boolean): Promise<Se
|
||||
console.error(
|
||||
`Failed to list tools for server ${name} by error: ${error} with stack: ${error.stack}`,
|
||||
);
|
||||
const serverInfo = getServerByName(name);
|
||||
if (serverInfo) {
|
||||
serverInfo.status = 'disconnected';
|
||||
serverInfo.error = `Failed to list tools: ${error.stack} `;
|
||||
}
|
||||
serverInfo.status = 'disconnected';
|
||||
serverInfo.error = `Failed to list tools: ${error.stack} `;
|
||||
});
|
||||
})
|
||||
.catch((error) => {
|
||||
console.error(
|
||||
`Failed to connect client for server ${name} by error: ${error} with stack: ${error.stack}`,
|
||||
);
|
||||
const serverInfo = getServerByName(name);
|
||||
if (serverInfo) {
|
||||
serverInfo.status = 'disconnected';
|
||||
serverInfo.error = `Failed to connect: ${error.stack} `;
|
||||
}
|
||||
serverInfo.status = 'disconnected';
|
||||
serverInfo.error = `Failed to connect: ${error.stack} `;
|
||||
});
|
||||
serverInfos.push({
|
||||
name,
|
||||
status: 'connecting',
|
||||
error: null,
|
||||
tools: [],
|
||||
client,
|
||||
transport,
|
||||
options: requestOptions,
|
||||
createTime: Date.now(),
|
||||
});
|
||||
console.log(`Initialized client for server: ${name}`);
|
||||
}
|
||||
|
||||
@@ -902,12 +997,12 @@ export const handleCallToolRequest = async (request: any, extra: any) => {
|
||||
toolName = toolName.startsWith(`${targetServerInfo.name}-`)
|
||||
? toolName.replace(`${targetServerInfo.name}-`, '')
|
||||
: toolName;
|
||||
const result = await client.callTool(
|
||||
const result = await callToolWithReconnect(
|
||||
targetServerInfo,
|
||||
{
|
||||
name: toolName,
|
||||
arguments: finalArgs,
|
||||
},
|
||||
undefined,
|
||||
targetServerInfo.options || {},
|
||||
);
|
||||
|
||||
@@ -957,7 +1052,11 @@ export const handleCallToolRequest = async (request: any, extra: any) => {
|
||||
request.params.name = request.params.name.startsWith(`${serverInfo.name}-`)
|
||||
? request.params.name.replace(`${serverInfo.name}-`, '')
|
||||
: request.params.name;
|
||||
const result = await client.callTool(request.params, undefined, serverInfo.options || {});
|
||||
const result = await callToolWithReconnect(
|
||||
serverInfo,
|
||||
request.params,
|
||||
serverInfo.options || {},
|
||||
);
|
||||
console.log(`Tool call result: ${JSON.stringify(result)}`);
|
||||
return result;
|
||||
} catch (error) {
|
||||
|
||||
Reference in New Issue
Block a user