fix: Bad Request: No valid session ID provided (#405) (#427)

This commit is contained in:
cheestard
2025-11-19 18:17:37 +08:00
committed by GitHub
parent 07adeab036
commit 1869f283ba
12 changed files with 459 additions and 38 deletions

View File

@@ -10,7 +10,10 @@ import config from '../config/index.js';
import { UserContextService } from './userContextService.js';
import { RequestContextService } from './requestContextService.js';
const transports: { [sessionId: string]: { transport: Transport; group: string } } = {};
export const transports: { [sessionId: string]: { transport: Transport; group: string; needsInitialization?: boolean } } = {};
// Session creation locks to prevent concurrent session creation conflicts
const sessionCreationLocks: { [sessionId: string]: Promise<StreamableHTTPServerTransport> } = {};
export const getGroup = (sessionId: string): string => {
return transports[sessionId]?.group || '';
@@ -144,6 +147,76 @@ export const handleSseMessage = async (req: Request, res: Response): Promise<voi
}
};
// Helper function to create a session with a specific sessionId
async function createSessionWithId(sessionId: string, group: string, username?: string): Promise<StreamableHTTPServerTransport> {
console.log(`[SESSION REBUILD] Starting session rebuild for ID: ${sessionId}${username ? ` for user: ${username}` : ''}`);
// Create a new server instance to ensure clean state
const server = getMcpServer(sessionId, group);
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => sessionId, // Use the specified sessionId
onsessioninitialized: (initializedSessionId) => {
console.log(`[SESSION REBUILD] onsessioninitialized triggered for ID: ${initializedSessionId}`); // New log
if (initializedSessionId === sessionId) {
transports[sessionId] = { transport, group };
console.log(`[SESSION REBUILD] Session ${sessionId} initialized successfully${username ? ` for user: ${username}` : ''}`);
} else {
console.warn(`[SESSION REBUILD] Session ID mismatch: expected ${sessionId}, got ${initializedSessionId}`);
}
},
});
transport.onclose = () => {
console.log(`[SESSION REBUILD] Transport closed: ${sessionId}`);
delete transports[sessionId];
deleteMcpServer(sessionId);
};
// Connect to MCP server
await server.connect(transport);
// Wait for the server to fully initialize
await new Promise(resolve => setTimeout(resolve, 500));
// Ensure the transport is properly initialized
if (!transports[sessionId]) {
console.warn(`[SESSION REBUILD] Transport not found in transports after initialization, forcing registration`);
transports[sessionId] = { transport, group, needsInitialization: true };
} else {
// Mark the session as needing initialization
transports[sessionId].needsInitialization = true;
}
console.log(`[SESSION REBUILD] Session ${sessionId} created but not yet initialized. It will be initialized on first use.`);
console.log(`[SESSION REBUILD] Successfully rebuilt session ${sessionId} in group: ${group}`);
return transport;
}
// Helper function to create a completely new session
async function createNewSession(group: string, username?: string): Promise<StreamableHTTPServerTransport> {
const newSessionId = randomUUID();
console.log(`[SESSION NEW] Creating new session with ID: ${newSessionId}${username ? ` for user: ${username}` : ''}`);
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => newSessionId,
onsessioninitialized: (sessionId) => {
transports[sessionId] = { transport, group };
console.log(`[SESSION NEW] New session ${sessionId} initialized successfully${username ? ` for user: ${username}` : ''}`);
},
});
transport.onclose = () => {
console.log(`[SESSION NEW] Transport closed: ${newSessionId}`);
delete transports[newSessionId];
deleteMcpServer(newSessionId);
};
await getMcpServer(newSessionId, group).connect(transport);
console.log(`[SESSION NEW] Successfully created new session ${newSessionId} in group: ${group}`);
return transport;
}
export const handleMcpPostRequest = async (req: Request, res: Response): Promise<void> => {
// User context is now set by sseUserContextMiddleware
const userContextService = UserContextService.getInstance();
@@ -175,31 +248,63 @@ export const handleMcpPostRequest = async (req: Request, res: Response): Promise
}
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
console.log(`Reusing existing transport for sessionId: ${sessionId}`);
transport = transports[sessionId].transport as StreamableHTTPServerTransport;
} else if (!sessionId && isInitializeRequest(req.body)) {
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
transports[sessionId] = { transport, group };
},
});
transport.onclose = () => {
console.log(`Transport closed: ${transport.sessionId}`);
if (transport.sessionId) {
delete transports[transport.sessionId];
deleteMcpServer(transport.sessionId);
console.log(`MCP connection closed: ${transport.sessionId}`);
let transportInfo: typeof transports[string] | undefined;
if (sessionId) {
transportInfo = transports[sessionId];
}
if (sessionId && transportInfo) {
// Case 1: Session exists and is valid, reuse it
console.log(`[SESSION REUSE] Reusing existing session: ${sessionId}${username ? ` for user: ${username}` : ''}`);
transport = transportInfo.transport as StreamableHTTPServerTransport;
} else if (sessionId) {
// Case 2: SessionId exists but transport is missing (server restart), check if session rebuild is enabled
const settings = loadSettings();
const enableSessionRebuild = settings.systemConfig?.enableSessionRebuild || false;
if (enableSessionRebuild) {
console.log(`[SESSION AUTO-REBUILD] Session ${sessionId} not found, initiating transparent rebuild${username ? ` for user: ${username}` : ''}`);
// Prevent concurrent session creation
if (sessionCreationLocks[sessionId] !== undefined) {
console.log(`[SESSION AUTO-REBUILD] Session creation in progress for ${sessionId}, waiting...`);
transport = await sessionCreationLocks[sessionId];
} else {
sessionCreationLocks[sessionId] = createSessionWithId(sessionId, group, username);
try {
transport = await sessionCreationLocks[sessionId];
console.log(`[SESSION AUTO-REBUILD] Successfully transparently rebuilt session: ${sessionId}`);
} catch (error) {
console.error(`[SESSION AUTO-REBUILD] Failed to rebuild session ${sessionId}:`, error);
throw error;
} finally {
delete sessionCreationLocks[sessionId];
}
}
};
console.log(
`MCP connection established: ${transport.sessionId}${username ? ` for user: ${username}` : ''}`,
);
await getMcpServer(transport.sessionId, group).connect(transport);
// Get the updated transport info after rebuild
if (sessionId) {
transportInfo = transports[sessionId];
}
} else {
// Session rebuild is disabled, return error
console.warn(`[SESSION ERROR] Session ${sessionId} not found and session rebuild is disabled${username ? ` for user: ${username}` : ''}`);
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: null,
});
return;
}
} else if (isInitializeRequest(req.body)) {
// Case 3: No sessionId and this is an initialize request, create new session
console.log(`[SESSION CREATE] No session ID provided for initialize request, creating new session${username ? ` for user: ${username}` : ''}`);
transport = await createNewSession(group, username);
} else {
// Case 4: No sessionId and not an initialize request, return error
console.warn(`[SESSION ERROR] No session ID provided for non-initialize request (method: ${req.body?.method})${username ? ` for user: ${username}` : ''}`);
res.status(400).json({
jsonrpc: '2.0',
error: {
@@ -217,8 +322,118 @@ export const handleMcpPostRequest = async (req: Request, res: Response): Promise
const requestContextService = RequestContextService.getInstance();
requestContextService.setRequestContext(req);
// Check if the session needs initialization (for rebuilt sessions)
if (transportInfo && transportInfo.needsInitialization) {
console.log(`[MCP] Session ${sessionId} needs initialization, performing proactive initialization`);
try {
// Create a mock response object that doesn't actually send headers
const mockRes = {
writeHead: () => {},
end: () => {},
json: () => {},
status: () => mockRes,
send: () => {},
headersSent: false
} as any;
// First, send the initialize request
const initializeRequest = {
method: 'initialize',
params: {
protocolVersion: '2025-03-26',
capabilities: {},
clientInfo: {
name: 'MCPHub-Client',
version: '1.0.0'
}
},
jsonrpc: '2.0',
id: `init-${sessionId}-${Date.now()}`
};
console.log(`[MCP] Sending initialize request for session ${sessionId}`);
// Use mock response to avoid sending actual HTTP response
await transport.handleRequest(req, mockRes, initializeRequest);
// Then send the initialized notification
const initializedNotification = {
method: 'notifications/initialized',
jsonrpc: '2.0'
};
console.log(`[MCP] Sending initialized notification for session ${sessionId}`);
await transport.handleRequest(req, mockRes, initializedNotification);
// Mark the session as initialized
transportInfo.needsInitialization = false;
console.log(`[MCP] Session ${sessionId} successfully initialized`);
} catch (initError) {
console.error(`[MCP] Failed to initialize session ${sessionId}:`, initError);
console.error(`[MCP] Initialization error details:`, initError);
// Don't return here, continue with the original request
}
}
try {
await transport.handleRequest(req, res, req.body);
} catch (error: any) {
// Check if this is a "Server not initialized" error for a newly rebuilt session
if (sessionId && error.message && error.message.includes('Server not initialized')) {
console.log(`[SESSION AUTO-REBUILD] Server not initialized for ${sessionId}. Attempting to initialize with the current request.`);
// Check if the current request is an 'initialize' request
if (isInitializeRequest(req.body)) {
// If it is, we can just retry it. The transport should now be in the transports map.
console.log(`[SESSION AUTO-REBUILD] Retrying initialize request for ${sessionId}.`);
await transport.handleRequest(req, res, req.body);
} else {
// If not, we need to send an initialize request first.
// We construct a mock initialize request, but use the REAL req/res objects.
const initializeRequest = {
method: 'initialize',
params: {
protocolVersion: '2025-03-26',
capabilities: {},
clientInfo: {
name: 'MCPHub-Client',
version: '1.0.0'
}
},
jsonrpc: '2.0',
id: `init-${sessionId}-${Date.now()}`
};
console.log(`[SESSION AUTO-REBUILD] Sending initialize request for ${sessionId} before handling the actual request.`);
try {
// Temporarily replace the body to send the initialize request
const originalBody = req.body;
req.body = initializeRequest;
await transport.handleRequest(req, res, req.body);
// Now, send the notifications/initialized
const initializedNotification = {
method: 'notifications/initialized',
jsonrpc: '2.0'
};
req.body = initializedNotification;
await transport.handleRequest(req, res, req.body);
// Restore the original body and retry the original request
req.body = originalBody;
console.log(`[SESSION AUTO-REBUILD] Initialization complete for ${sessionId}. Retrying original request.`);
await transport.handleRequest(req, res, req.body);
} catch (initError) {
console.error(`[SESSION AUTO-REBUILD] Failed to initialize session ${sessionId} on-the-fly:`, initError);
// Re-throw the original error if initialization fails
throw error;
}
}
} else {
// If it's a different error, just re-throw it
throw error;
}
} finally {
// Clean up request context after handling
requestContextService.clearRequestContext();
@@ -240,12 +455,51 @@ export const handleMcpOtherRequest = async (req: Request, res: Response) => {
}
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
if (!sessionId) {
res.status(400).send('Invalid or missing session ID');
return;
}
const { transport } = transports[sessionId];
let transportEntry = transports[sessionId];
// If session doesn't exist, attempt transparent rebuild if enabled
if (!transportEntry) {
const settings = loadSettings();
const enableSessionRebuild = settings.systemConfig?.enableSessionRebuild || false;
if (enableSessionRebuild) {
console.log(`[SESSION AUTO-REBUILD] Session ${sessionId} not found in handleMcpOtherRequest, initiating transparent rebuild`);
try {
// Check if user context exists
if (!currentUser) {
res.status(401).send('User context not found');
return;
}
// Create session with same ID using existing function
const group = req.params.group;
const rebuiltSession = await createSessionWithId(sessionId, group, currentUser.username);
if (rebuiltSession) {
console.log(`[SESSION AUTO-REBUILD] Successfully transparently rebuilt session: ${sessionId}`);
transportEntry = transports[sessionId];
}
} catch (error) {
console.error(`[SESSION AUTO-REBUILD] Failed to rebuild session ${sessionId}:`, error);
}
} else {
console.warn(`[SESSION ERROR] Session ${sessionId} not found and session rebuild is disabled in handleMcpOtherRequest`);
res.status(400).send('Invalid or missing session ID');
return;
}
}
if (!transportEntry) {
res.status(400).send('Invalid or missing session ID');
return;
}
const { transport } = transportEntry;
await (transport as StreamableHTTPServerTransport).handleRequest(req, res);
};