feat: support Streamable HTTP transport for downstream (#32)

This commit is contained in:
samanhappy
2025-04-27 13:55:25 +08:00
committed by GitHub
parent c9ec3b77ce
commit 7f33615161
4 changed files with 80 additions and 9 deletions

View File

@@ -1,9 +1,13 @@
import { Request, Response } from 'express';
import { randomUUID } from 'node:crypto';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import { getMcpServer } from './mcpService.js';
import { loadSettings } from '../config/index.js';
const transports: { [sessionId: string]: { transport: SSEServerTransport; group: string } } = {};
const transports: { [sessionId: string]: { transport: Transport; group: string } } = {};
export const getGroup = (sessionId: string): string => {
return transports[sessionId]?.group || '';
@@ -44,13 +48,72 @@ export const handleSseMessage = async (req: Request, res: Response): Promise<voi
req.query.group = group;
console.log(`Received message for sessionId: ${sessionId} in group: ${group}`);
if (transport) {
await transport.handlePostMessage(req, res);
await (transport as SSEServerTransport).handlePostMessage(req, res);
} else {
console.error(`No transport found for sessionId: ${sessionId}`);
res.status(400).send('No transport found for sessionId');
}
};
export const handleMcpPostRequest = async (req: Request, res: Response): Promise<void> => {
console.log('Handling MCP post request');
const sessionId = req.headers['mcp-session-id'] as string | undefined;
const group = req.params.group;
const settings = loadSettings();
const routingConfig = settings.systemConfig?.routing || {
enableGlobalRoute: true,
enableGroupNameRoute: true,
};
if (!group && !routingConfig.enableGlobalRoute) {
res.status(403).send('Global routes are disabled. Please specify a group ID.');
return;
}
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
transport = transports[sessionId].transport as StreamableHTTPServerTransport;
} else if (!sessionId && isInitializeRequest(req.body)) {
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
transports[sessionId] = { transport, group };
},
});
transport.onclose = () => {
if (transport.sessionId) {
delete transports[transport.sessionId];
}
};
await getMcpServer().connect(transport);
} else {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: null,
});
return;
}
await transport.handleRequest(req, res, req.body);
};
export const handleMcpOtherRequest = async (req: Request, res: Response) => {
console.log('Handling MCP other request');
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.status(400).send('Invalid or missing session ID');
return;
}
const { transport } = transports[sessionId];
await (transport as StreamableHTTPServerTransport).handleRequest(req, res);
};
export const getConnectionCount = (): number => {
return Object.keys(transports).length;
};