mirror of
https://github.com/samanhappy/mcphub.git
synced 2025-12-24 02:39:19 -05:00
feat: Enhance Keep-Alive configuration handling (#455)
This commit is contained in:
@@ -95,6 +95,11 @@ const ServerForm = ({
|
||||
undefined,
|
||||
},
|
||||
oauth: getInitialOAuthConfig(initialData),
|
||||
// KeepAlive configuration initialization
|
||||
keepAlive: {
|
||||
enabled: initialData?.config?.enableKeepAlive || false,
|
||||
interval: initialData?.config?.keepAliveInterval || 60000,
|
||||
},
|
||||
// OpenAPI configuration initialization
|
||||
openapi:
|
||||
initialData && initialData.config && initialData.config.openapi
|
||||
@@ -151,6 +156,7 @@ const ServerForm = ({
|
||||
|
||||
const [isRequestOptionsExpanded, setIsRequestOptionsExpanded] = useState<boolean>(false);
|
||||
const [isOAuthSectionExpanded, setIsOAuthSectionExpanded] = useState<boolean>(false);
|
||||
const [isKeepAliveSectionExpanded, setIsKeepAliveSectionExpanded] = useState<boolean>(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const isEdit = !!initialData;
|
||||
|
||||
@@ -377,6 +383,15 @@ const ServerForm = ({
|
||||
env: Object.keys(env).length > 0 ? env : undefined,
|
||||
}),
|
||||
...(Object.keys(options).length > 0 ? { options } : {}),
|
||||
// KeepAlive configuration (only for SSE/streamable-http types)
|
||||
...(serverType === 'sse' || serverType === 'streamable-http'
|
||||
? {
|
||||
enableKeepAlive: formData.keepAlive?.enabled || false,
|
||||
...(formData.keepAlive?.enabled
|
||||
? { keepAliveInterval: formData.keepAlive.interval || 60000 }
|
||||
: {}),
|
||||
}
|
||||
: {}),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -1255,6 +1270,86 @@ const ServerForm = ({
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* KeepAlive Configuration - only for SSE/Streamable HTTP */}
|
||||
{(serverType === 'sse' || serverType === 'streamable-http') && (
|
||||
<div className="mb-4">
|
||||
<div
|
||||
className="flex items-center justify-between cursor-pointer bg-gray-50 hover:bg-gray-100 p-3 rounded border border-gray-200"
|
||||
onClick={() => setIsKeepAliveSectionExpanded(!isKeepAliveSectionExpanded)}
|
||||
>
|
||||
<label className="text-gray-700 text-sm font-bold">
|
||||
{t('server.keepAlive', 'Keep-Alive')}
|
||||
</label>
|
||||
<span className="text-gray-500 text-sm">
|
||||
{isKeepAliveSectionExpanded ? '▼' : '▶'}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
{isKeepAliveSectionExpanded && (
|
||||
<div className="border border-gray-200 rounded-b p-4 bg-gray-50 border-t-0">
|
||||
<div className="flex items-center mb-3">
|
||||
<input
|
||||
type="checkbox"
|
||||
id="enableKeepAlive"
|
||||
checked={formData.keepAlive?.enabled || false}
|
||||
onChange={(e) =>
|
||||
setFormData((prev) => ({
|
||||
...prev,
|
||||
keepAlive: {
|
||||
...prev.keepAlive,
|
||||
enabled: e.target.checked,
|
||||
},
|
||||
}))
|
||||
}
|
||||
className="mr-2"
|
||||
/>
|
||||
<label htmlFor="enableKeepAlive" className="text-gray-600 text-sm">
|
||||
{t('server.enableKeepAlive', 'Enable Keep-Alive')}
|
||||
</label>
|
||||
</div>
|
||||
<p className="text-xs text-gray-500 mb-3">
|
||||
{t(
|
||||
'server.keepAliveDescription',
|
||||
'Send periodic ping requests to maintain the connection. Useful for long-running connections that may timeout.',
|
||||
)}
|
||||
</p>
|
||||
<div>
|
||||
<label
|
||||
className="block text-gray-600 text-sm font-medium mb-1"
|
||||
htmlFor="keepAliveInterval"
|
||||
>
|
||||
{t('server.keepAliveInterval', 'Interval (ms)')}
|
||||
</label>
|
||||
<input
|
||||
type="number"
|
||||
id="keepAliveInterval"
|
||||
value={formData.keepAlive?.interval || 60000}
|
||||
onChange={(e) =>
|
||||
setFormData((prev) => ({
|
||||
...prev,
|
||||
keepAlive: {
|
||||
...prev.keepAlive,
|
||||
interval: parseInt(e.target.value) || 60000,
|
||||
},
|
||||
}))
|
||||
}
|
||||
className="shadow appearance-none border rounded w-full py-2 px-3 text-gray-700 leading-tight focus:outline-none focus:shadow-outline form-input"
|
||||
placeholder="60000"
|
||||
min="5000"
|
||||
max="300000"
|
||||
/>
|
||||
<p className="text-xs text-gray-500 mt-1">
|
||||
{t(
|
||||
'server.keepAliveIntervalDescription',
|
||||
'Time between keep-alive pings in milliseconds (default: 60000ms = 1 minute)',
|
||||
)}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
|
||||
<div className="flex justify-end mt-6">
|
||||
<button
|
||||
type="button"
|
||||
|
||||
@@ -114,6 +114,8 @@ export interface ServerConfig {
|
||||
env?: Record<string, string>;
|
||||
headers?: Record<string, string>;
|
||||
enabled?: boolean;
|
||||
enableKeepAlive?: boolean; // Enable keep-alive for this server (requires global enable as well)
|
||||
keepAliveInterval?: number; // Keep-alive ping interval in milliseconds (default: 60000ms)
|
||||
tools?: Record<string, { enabled: boolean; description?: string }>; // Tool-specific configurations with enable/disable state and custom descriptions
|
||||
prompts?: Record<string, { enabled: boolean; description?: string }>; // Prompt-specific configurations with enable/disable state and custom descriptions
|
||||
options?: {
|
||||
@@ -250,6 +252,10 @@ export interface ServerFormData {
|
||||
resetTimeoutOnProgress?: boolean;
|
||||
maxTotalTimeout?: number;
|
||||
};
|
||||
keepAlive?: {
|
||||
enabled?: boolean;
|
||||
interval?: number;
|
||||
};
|
||||
oauth?: {
|
||||
clientId?: string;
|
||||
clientSecret?: string;
|
||||
|
||||
@@ -123,6 +123,11 @@
|
||||
"maxTotalTimeoutDescription": "Maximum total timeout for requests sent to the MCP server (ms) (Use with progress notifications)",
|
||||
"resetTimeoutOnProgress": "Reset Timeout on Progress",
|
||||
"resetTimeoutOnProgressDescription": "Reset timeout on progress notifications",
|
||||
"keepAlive": "Keep-Alive Configuration",
|
||||
"enableKeepAlive": "Enable Keep-Alive",
|
||||
"keepAliveDescription": "Send periodic ping requests to maintain the connection. Useful for long-running connections that may timeout.",
|
||||
"keepAliveInterval": "Interval (ms)",
|
||||
"keepAliveIntervalDescription": "Time between keep-alive pings in milliseconds (default: 60000ms = 1 minute)",
|
||||
"remove": "Remove",
|
||||
"toggleError": "Failed to toggle server {{serverName}}",
|
||||
"alreadyExists": "Server {{serverName}} already exists",
|
||||
@@ -787,4 +792,4 @@
|
||||
"internalErrorMessage": "An unexpected error occurred while processing the OAuth callback.",
|
||||
"closeWindow": "Close Window"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,6 +123,11 @@
|
||||
"maxTotalTimeoutDescription": "Délai d'attente total maximum pour les requêtes envoyées au serveur MCP (ms) (à utiliser avec les notifications de progression)",
|
||||
"resetTimeoutOnProgress": "Réinitialiser le délai d'attente en cas de progression",
|
||||
"resetTimeoutOnProgressDescription": "Réinitialiser le délai d'attente lors des notifications de progression",
|
||||
"keepAlive": "Configuration du maintien de connexion",
|
||||
"enableKeepAlive": "Activer le maintien de connexion",
|
||||
"keepAliveDescription": "Envoyer des requêtes ping périodiques pour maintenir la connexion. Utile pour les connexions de longue durée qui peuvent expirer.",
|
||||
"keepAliveInterval": "Intervalle (ms)",
|
||||
"keepAliveIntervalDescription": "Temps entre les pings de maintien de connexion en millisecondes (par défaut : 60000ms = 1 minute)",
|
||||
"remove": "Retirer",
|
||||
"toggleError": "Échec du basculement du serveur {{serverName}}",
|
||||
"alreadyExists": "Le serveur {{serverName}} existe déjà",
|
||||
@@ -787,4 +792,4 @@
|
||||
"internalErrorMessage": "Une erreur inattendue s'est produite lors du traitement du callback OAuth.",
|
||||
"closeWindow": "Fermer la fenêtre"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,6 +123,11 @@
|
||||
"maxTotalTimeoutDescription": "MCP sunucusuna gönderilen istekler için maksimum toplam zaman aşımı (ms) (İlerleme bildirimleriyle kullanın)",
|
||||
"resetTimeoutOnProgress": "İlerlemede Zaman Aşımını Sıfırla",
|
||||
"resetTimeoutOnProgressDescription": "İlerleme bildirimlerinde zaman aşımını sıfırla",
|
||||
"keepAlive": "Bağlantı Canlı Tutma Yapılandırması",
|
||||
"enableKeepAlive": "Bağlantı Canlı Tutmayı Etkinleştir",
|
||||
"keepAliveDescription": "Bağlantıyı korumak için periyodik ping istekleri gönderin. Zaman aşımına uğrayabilecek uzun süreli bağlantılar için yararlıdır.",
|
||||
"keepAliveInterval": "Aralık (ms)",
|
||||
"keepAliveIntervalDescription": "Canlı tutma pingleri arasındaki süre milisaniye cinsinden (varsayılan: 60000ms = 1 dakika)",
|
||||
"remove": "Kaldır",
|
||||
"toggleError": "{{serverName}} sunucusu açılamadı/kapatılamadı",
|
||||
"alreadyExists": "{{serverName}} sunucusu zaten mevcut",
|
||||
@@ -787,4 +792,4 @@
|
||||
"internalErrorMessage": "OAuth geri araması işlenirken beklenmeyen bir hata oluştu.",
|
||||
"closeWindow": "Pencereyi Kapat"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,6 +123,11 @@
|
||||
"maxTotalTimeoutDescription": "无论是否有进度通知的最大总超时时间(毫秒)",
|
||||
"resetTimeoutOnProgress": "收到进度通知时重置超时",
|
||||
"resetTimeoutOnProgressDescription": "适用于发送周期性进度更新的长时间运行操作",
|
||||
"keepAlive": "保活配置",
|
||||
"enableKeepAlive": "启用保活",
|
||||
"keepAliveDescription": "定期发送 ping 请求以维持连接。适用于可能超时的长期连接。",
|
||||
"keepAliveInterval": "间隔时间(毫秒)",
|
||||
"keepAliveIntervalDescription": "保活 ping 的时间间隔(默认:60000毫秒 = 1分钟)",
|
||||
"remove": "移除",
|
||||
"toggleError": "切换服务器 {{serverName}} 状态失败",
|
||||
"alreadyExists": "服务器 {{serverName}} 已经存在",
|
||||
@@ -789,4 +794,4 @@
|
||||
"internalErrorMessage": "处理 OAuth 回调时发生意外错误。",
|
||||
"closeWindow": "关闭窗口"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -32,6 +32,7 @@ export class ServerDaoDbImpl implements ServerDao {
|
||||
headers: entity.headers,
|
||||
enabled: entity.enabled !== undefined ? entity.enabled : true,
|
||||
owner: entity.owner,
|
||||
enableKeepAlive: entity.enableKeepAlive,
|
||||
keepAliveInterval: entity.keepAliveInterval,
|
||||
tools: entity.tools,
|
||||
prompts: entity.prompts,
|
||||
@@ -41,7 +42,10 @@ export class ServerDaoDbImpl implements ServerDao {
|
||||
return this.mapToServerConfig(server);
|
||||
}
|
||||
|
||||
async update(name: string, entity: Partial<ServerConfigWithName>): Promise<ServerConfigWithName | null> {
|
||||
async update(
|
||||
name: string,
|
||||
entity: Partial<ServerConfigWithName>,
|
||||
): Promise<ServerConfigWithName | null> {
|
||||
const server = await this.repository.update(name, {
|
||||
type: entity.type,
|
||||
url: entity.url,
|
||||
@@ -51,6 +55,7 @@ export class ServerDaoDbImpl implements ServerDao {
|
||||
headers: entity.headers,
|
||||
enabled: entity.enabled,
|
||||
owner: entity.owner,
|
||||
enableKeepAlive: entity.enableKeepAlive,
|
||||
keepAliveInterval: entity.keepAliveInterval,
|
||||
tools: entity.tools,
|
||||
prompts: entity.prompts,
|
||||
@@ -118,6 +123,7 @@ export class ServerDaoDbImpl implements ServerDao {
|
||||
headers?: Record<string, string>;
|
||||
enabled: boolean;
|
||||
owner?: string;
|
||||
enableKeepAlive?: boolean;
|
||||
keepAliveInterval?: number;
|
||||
tools?: Record<string, { enabled: boolean; description?: string }>;
|
||||
prompts?: Record<string, { enabled: boolean; description?: string }>;
|
||||
@@ -134,6 +140,7 @@ export class ServerDaoDbImpl implements ServerDao {
|
||||
headers: server.headers,
|
||||
enabled: server.enabled,
|
||||
owner: server.owner,
|
||||
enableKeepAlive: server.enableKeepAlive,
|
||||
keepAliveInterval: server.keepAliveInterval,
|
||||
tools: server.tools,
|
||||
prompts: server.prompts,
|
||||
|
||||
@@ -41,6 +41,9 @@ export class Server {
|
||||
@Column({ type: 'varchar', length: 255, nullable: true })
|
||||
owner?: string;
|
||||
|
||||
@Column({ type: 'boolean', default: false })
|
||||
enableKeepAlive?: boolean;
|
||||
|
||||
@Column({ type: 'int', nullable: true })
|
||||
keepAliveInterval?: number;
|
||||
|
||||
|
||||
73
src/services/keepAliveService.ts
Normal file
73
src/services/keepAliveService.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
|
||||
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
|
||||
import { ServerInfo, ServerConfig } from '../types/index.js';
|
||||
|
||||
export interface KeepAliveOptions {
|
||||
enabled?: boolean;
|
||||
intervalMs?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up keep-alive ping for MCP client connections (SSE or Streamable HTTP).
|
||||
* Keepalive is controlled per-server via `serverConfig.enableKeepAlive` (default off).
|
||||
*/
|
||||
export const setupClientKeepAlive = async (
|
||||
serverInfo: ServerInfo,
|
||||
serverConfig: ServerConfig,
|
||||
): Promise<void> => {
|
||||
// Only set up keep-alive for SSE or Streamable HTTP client transports
|
||||
const isSSE = serverInfo.transport instanceof SSEClientTransport;
|
||||
const isStreamableHttp = serverInfo.transport instanceof StreamableHTTPClientTransport;
|
||||
if (!isSSE && !isStreamableHttp) {
|
||||
return;
|
||||
}
|
||||
|
||||
const enabled = serverConfig.enableKeepAlive === true;
|
||||
if (!enabled) {
|
||||
// Ensure any previous timer is cleared
|
||||
if (serverInfo.keepAliveIntervalId) {
|
||||
clearInterval(serverInfo.keepAliveIntervalId as NodeJS.Timeout);
|
||||
serverInfo.keepAliveIntervalId = undefined;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Clear any existing interval first
|
||||
if (serverInfo.keepAliveIntervalId) {
|
||||
clearInterval(serverInfo.keepAliveIntervalId as NodeJS.Timeout);
|
||||
}
|
||||
|
||||
// Default interval: 60 seconds
|
||||
const interval = serverConfig.keepAliveInterval || 60000;
|
||||
|
||||
serverInfo.keepAliveIntervalId = setInterval(async () => {
|
||||
try {
|
||||
if (serverInfo.client && serverInfo.status === 'connected') {
|
||||
// Use client.ping() if available, otherwise fallback to listTools
|
||||
if (typeof (serverInfo.client as any).ping === 'function') {
|
||||
await (serverInfo.client as any).ping();
|
||||
console.log(`Keep-alive ping successful for server: ${serverInfo.name}`);
|
||||
} else {
|
||||
await serverInfo.client.listTools({ timeout: 5000 }).catch(() => void 0);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn(`Keep-alive ping failed for server ${serverInfo.name}:`, error);
|
||||
}
|
||||
}, interval);
|
||||
|
||||
console.log(
|
||||
`Keep-alive enabled for server ${serverInfo.name} at ${Math.round(interval / 1000)}s interval`,
|
||||
);
|
||||
};
|
||||
|
||||
/**
|
||||
* Clear keep-alive timer for a server.
|
||||
*/
|
||||
export const clearClientKeepAlive = (serverInfo: ServerInfo): void => {
|
||||
if (serverInfo.keepAliveIntervalId) {
|
||||
clearInterval(serverInfo.keepAliveIntervalId as NodeJS.Timeout);
|
||||
serverInfo.keepAliveIntervalId = undefined;
|
||||
console.log(`Cleared keep-alive interval for server: ${serverInfo.name}`);
|
||||
}
|
||||
};
|
||||
@@ -29,37 +29,7 @@ import { createOAuthProvider } from './mcpOAuthProvider.js';
|
||||
|
||||
const servers: { [sessionId: string]: Server } = {};
|
||||
|
||||
// Helper function to set up keep-alive ping for SSE connections
|
||||
const setupKeepAlive = (serverInfo: ServerInfo, serverConfig: ServerConfig): void => {
|
||||
// Only set up keep-alive for SSE connections
|
||||
if (!(serverInfo.transport instanceof SSEClientTransport)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Clear any existing interval first
|
||||
if (serverInfo.keepAliveIntervalId) {
|
||||
clearInterval(serverInfo.keepAliveIntervalId);
|
||||
}
|
||||
|
||||
// Use configured interval or default to 60 seconds for SSE
|
||||
const interval = serverConfig.keepAliveInterval || 60000;
|
||||
|
||||
serverInfo.keepAliveIntervalId = setInterval(async () => {
|
||||
try {
|
||||
if (serverInfo.client && serverInfo.status === 'connected') {
|
||||
await serverInfo.client.ping();
|
||||
console.log(`Keep-alive ping successful for server: ${serverInfo.name}`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn(`Keep-alive ping failed for server ${serverInfo.name}:`, error);
|
||||
// TODO Consider handling reconnection logic here if needed
|
||||
}
|
||||
}, interval);
|
||||
|
||||
console.log(
|
||||
`Keep-alive ping set up for server ${serverInfo.name} with interval ${interval / 1000} seconds`,
|
||||
);
|
||||
};
|
||||
import { setupClientKeepAlive } from './keepAliveService.js';
|
||||
|
||||
export const initUpstreamServers = async (): Promise<void> => {
|
||||
// Initialize OAuth clients for servers with dynamic registration
|
||||
@@ -596,9 +566,10 @@ export const initializeClientsFromSettings = async (
|
||||
if (!dataError) {
|
||||
serverInfo.status = 'connected';
|
||||
serverInfo.error = null;
|
||||
|
||||
// Set up keep-alive ping for SSE connections
|
||||
setupKeepAlive(serverInfo, expandedConf);
|
||||
// Set up keep-alive ping for SSE connections via shared service
|
||||
setupClientKeepAlive(serverInfo, expandedConf).catch((e) =>
|
||||
console.warn(`Keepalive setup failed for ${name}:`, e),
|
||||
);
|
||||
} else {
|
||||
serverInfo.status = 'disconnected';
|
||||
serverInfo.error = `Failed to list data: ${dataError} `;
|
||||
@@ -812,10 +783,9 @@ export const addOrUpdateServer = async (
|
||||
return { success: false, message: 'Server name already exists' };
|
||||
}
|
||||
|
||||
// If overriding and this is a DXT server (stdio type with file paths),
|
||||
// we might want to clean up old files in the future
|
||||
if (exists && config.type === 'stdio') {
|
||||
// Close existing server connections
|
||||
// If overriding an existing server, close connections and clear keep-alive timers
|
||||
if (exists) {
|
||||
// Close existing server connections (clears keep-alive intervals as well)
|
||||
closeServer(name);
|
||||
// Remove from server infos
|
||||
serverInfos = serverInfos.filter((serverInfo) => serverInfo.name !== name);
|
||||
|
||||
@@ -215,25 +215,7 @@ export const handleSseConnection = async (req: Request, res: Response): Promise<
|
||||
const transport = new SSEServerTransport(messagesPath, res);
|
||||
transports[transport.sessionId] = { transport, group: group };
|
||||
|
||||
// Send keepalive ping every 30 seconds to prevent client from closing connection
|
||||
const keepAlive = setInterval(() => {
|
||||
try {
|
||||
// Send a ping notification to keep the connection alive
|
||||
transport.send({ jsonrpc: '2.0', method: 'ping' });
|
||||
console.log(`Sent keepalive ping for SSE session: ${transport.sessionId}`);
|
||||
} catch (e) {
|
||||
// If sending a ping fails, the connection is likely broken.
|
||||
// Log the error and clear the interval to prevent further attempts.
|
||||
console.warn(
|
||||
`Failed to send keepalive ping for SSE session ${transport.sessionId}, cleaning up interval:`,
|
||||
e,
|
||||
);
|
||||
clearInterval(keepAlive);
|
||||
}
|
||||
}, 30000); // Send ping every 30 seconds
|
||||
|
||||
res.on('close', () => {
|
||||
clearInterval(keepAlive);
|
||||
delete transports[transport.sessionId];
|
||||
deleteMcpServer(transport.sessionId);
|
||||
console.log(`SSE connection closed: ${transport.sessionId}`);
|
||||
@@ -329,26 +311,8 @@ async function createSessionWithId(
|
||||
},
|
||||
});
|
||||
|
||||
// Send keepalive ping every 30 seconds to prevent client from closing connection
|
||||
const keepAlive = setInterval(() => {
|
||||
try {
|
||||
// Send a ping notification to keep the connection alive
|
||||
transport.send({ jsonrpc: '2.0', method: 'ping' });
|
||||
console.log(`Sent keepalive ping for StreamableHTTP session: ${sessionId}`);
|
||||
} catch (e) {
|
||||
// If sending a ping fails, the connection is likely broken.
|
||||
// Log the error and clear the interval to prevent further attempts.
|
||||
console.warn(
|
||||
`Failed to send keepalive ping for StreamableHTTP session ${sessionId}, cleaning up interval:`,
|
||||
e,
|
||||
);
|
||||
clearInterval(keepAlive);
|
||||
}
|
||||
}, 30000); // Send ping every 30 seconds
|
||||
|
||||
transport.onclose = () => {
|
||||
console.log(`[SESSION REBUILD] Transport closed: ${sessionId}`);
|
||||
clearInterval(keepAlive);
|
||||
delete transports[sessionId];
|
||||
deleteMcpServer(sessionId);
|
||||
};
|
||||
@@ -397,26 +361,8 @@ async function createNewSession(
|
||||
},
|
||||
});
|
||||
|
||||
// Send keepalive ping every 30 seconds to prevent client from closing connection
|
||||
const keepAlive = setInterval(() => {
|
||||
try {
|
||||
// Send a ping notification to keep the connection alive
|
||||
transport.send({ jsonrpc: '2.0', method: 'ping' });
|
||||
console.log(`Sent keepalive ping for StreamableHTTP session: ${newSessionId}`);
|
||||
} catch (e) {
|
||||
// If sending a ping fails, the connection is likely broken.
|
||||
// Log the error and clear the interval to prevent further attempts.
|
||||
console.warn(
|
||||
`Failed to send keepalive ping for StreamableHTTP session ${newSessionId}, cleaning up interval:`,
|
||||
e,
|
||||
);
|
||||
clearInterval(keepAlive);
|
||||
}
|
||||
}, 30000); // Send ping every 30 seconds
|
||||
|
||||
transport.onclose = () => {
|
||||
console.log(`[SESSION NEW] Transport closed: ${newSessionId}`);
|
||||
clearInterval(keepAlive);
|
||||
delete transports[newSessionId];
|
||||
deleteMcpServer(newSessionId);
|
||||
};
|
||||
|
||||
@@ -266,6 +266,7 @@ export interface ServerConfig {
|
||||
headers?: Record<string, string>; // HTTP headers for SSE/streamable-http/openapi servers
|
||||
enabled?: boolean; // Flag to enable/disable the server
|
||||
owner?: string; // Owner of the server, defaults to 'admin' user
|
||||
enableKeepAlive?: boolean; // Enable keep-alive for this server (requires global enable as well)
|
||||
keepAliveInterval?: number; // Keep-alive ping interval in milliseconds (default: 60000ms for SSE servers)
|
||||
tools?: Record<string, { enabled: boolean; description?: string }>; // Tool-specific configurations with enable/disable state and custom descriptions
|
||||
prompts?: Record<string, { enabled: boolean; description?: string }>; // Prompt-specific configurations with enable/disable state and custom descriptions
|
||||
|
||||
@@ -65,6 +65,7 @@ export async function migrateToDatabase(): Promise<boolean> {
|
||||
headers: config.headers,
|
||||
enabled: config.enabled !== undefined ? config.enabled : true,
|
||||
owner: config.owner,
|
||||
enableKeepAlive: config.enableKeepAlive,
|
||||
keepAliveInterval: config.keepAliveInterval,
|
||||
tools: config.tools,
|
||||
prompts: config.prompts,
|
||||
|
||||
@@ -171,100 +171,37 @@ describe('Keepalive Functionality', () => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('SSE Connection Keepalive', () => {
|
||||
it('should create a keepalive interval when establishing SSE connection', async () => {
|
||||
describe('SSE Connection (No Server-Side Keepalive)', () => {
|
||||
// Server-side keepalive was removed - keepalive is now only for upstream MCP server connections (client-side)
|
||||
// These tests verify that SSE connections work without server-side keepalive
|
||||
|
||||
it('should establish SSE connection without keepalive interval', async () => {
|
||||
await handleSseConnection(mockReq as Request, mockRes as Response);
|
||||
|
||||
// Verify setInterval was called with 30000ms (30 seconds)
|
||||
expect(global.setInterval).toHaveBeenCalledWith(expect.any(Function), 30000);
|
||||
// Verify no keepalive interval was created for server-side SSE
|
||||
expect(global.setInterval).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should send ping messages via transport', async () => {
|
||||
jest.useFakeTimers();
|
||||
|
||||
it('should register close event handler for cleanup', async () => {
|
||||
await handleSseConnection(mockReq as Request, mockRes as Response);
|
||||
|
||||
// Fast-forward time by 30 seconds
|
||||
jest.advanceTimersByTime(30000);
|
||||
|
||||
// Verify ping was sent using mockTransportInstance
|
||||
expect(mockTransportInstance.send).toHaveBeenCalledWith({
|
||||
jsonrpc: '2.0',
|
||||
method: 'ping',
|
||||
});
|
||||
|
||||
jest.useRealTimers();
|
||||
// Verify close event handler was registered
|
||||
expect(mockRes.on).toHaveBeenCalledWith('close', expect.any(Function));
|
||||
});
|
||||
|
||||
it('should send multiple pings at 30-second intervals', async () => {
|
||||
jest.useFakeTimers();
|
||||
|
||||
it('should clean up transport on connection close', async () => {
|
||||
await handleSseConnection(mockReq as Request, mockRes as Response);
|
||||
|
||||
// Fast-forward time by 90 seconds (3 intervals)
|
||||
jest.advanceTimersByTime(90000);
|
||||
|
||||
// Verify ping was sent 3 times using mockTransportInstance
|
||||
expect(mockTransportInstance.send).toHaveBeenCalledTimes(3);
|
||||
expect(mockTransportInstance.send).toHaveBeenCalledWith({
|
||||
jsonrpc: '2.0',
|
||||
method: 'ping',
|
||||
});
|
||||
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('should clear keepalive interval when connection closes', async () => {
|
||||
await handleSseConnection(mockReq as Request, mockRes as Response);
|
||||
|
||||
// Verify interval was created
|
||||
expect(global.setInterval).toHaveBeenCalled();
|
||||
const intervalsBefore = intervals.length;
|
||||
expect(intervalsBefore).toBeGreaterThan(0);
|
||||
// Verify transport was registered
|
||||
expect(transports['test-session-id']).toBeDefined();
|
||||
|
||||
// Simulate connection close
|
||||
if (eventListeners['close']) {
|
||||
eventListeners['close']();
|
||||
}
|
||||
|
||||
// Verify clearInterval was called
|
||||
expect(global.clearInterval).toHaveBeenCalled();
|
||||
expect(intervals.length).toBeLessThan(intervalsBefore);
|
||||
});
|
||||
|
||||
it('should handle ping send errors gracefully', async () => {
|
||||
jest.useFakeTimers();
|
||||
|
||||
await handleSseConnection(mockReq as Request, mockRes as Response);
|
||||
|
||||
const consoleWarnSpy = jest.spyOn(console, 'warn').mockImplementation();
|
||||
|
||||
// Make transport.send throw an error on the first call
|
||||
let callCount = 0;
|
||||
mockTransportInstance.send.mockImplementation(() => {
|
||||
callCount++;
|
||||
throw new Error('Connection broken');
|
||||
});
|
||||
|
||||
// Fast-forward time by 30 seconds (first ping)
|
||||
jest.advanceTimersByTime(30000);
|
||||
|
||||
// Verify error was logged for the first ping
|
||||
expect(consoleWarnSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('Failed to send keepalive ping'),
|
||||
expect.any(Error),
|
||||
);
|
||||
|
||||
const firstCallCount = callCount;
|
||||
|
||||
// Fast-forward time by another 30 seconds
|
||||
jest.advanceTimersByTime(30000);
|
||||
|
||||
// Verify no additional attempts were made after the error (interval was cleared)
|
||||
expect(callCount).toBe(firstCallCount);
|
||||
|
||||
consoleWarnSpy.mockRestore();
|
||||
jest.useRealTimers();
|
||||
// Verify transport was removed
|
||||
expect(transports['test-session-id']).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should not send pings after connection is closed', async () => {
|
||||
|
||||
Reference in New Issue
Block a user