Add cluster functionality with node registration, sticky sessions, and routing

Co-authored-by: samanhappy <2755122+samanhappy@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-10-26 14:09:38 +00:00
committed by samanhappy
parent fbff212005
commit f4bac3adc0
7 changed files with 1401 additions and 7 deletions

View File

@@ -0,0 +1,240 @@
/**
* Cluster Controller
*
* Handles cluster-related API endpoints:
* - Node registration
* - Heartbeat updates
* - Cluster status queries
* - Session affinity management
*/
import { Request, Response } from 'express';
import {
getClusterMode,
isClusterEnabled,
getCurrentNodeId,
registerNode,
updateNodeHeartbeat,
getActiveNodes,
getAllNodes,
getServerReplicas,
getSessionAffinity,
getClusterStats,
} from '../services/clusterService.js';
import { ClusterNode } from '../types/index.js';
/**
* Get cluster status
* GET /api/cluster/status
*/
export const getClusterStatus = (_req: Request, res: Response): void => {
try {
const enabled = isClusterEnabled();
const mode = getClusterMode();
const nodeId = getCurrentNodeId();
const stats = getClusterStats();
res.json({
success: true,
data: {
enabled,
mode,
nodeId,
stats,
},
});
} catch (error) {
console.error('Error getting cluster status:', error);
res.status(500).json({
success: false,
message: 'Failed to get cluster status',
});
}
};
/**
* Register a node (coordinator only)
* POST /api/cluster/register
*/
export const registerNodeEndpoint = (req: Request, res: Response): void => {
try {
const mode = getClusterMode();
if (mode !== 'coordinator') {
res.status(403).json({
success: false,
message: 'This endpoint is only available on coordinator nodes',
});
return;
}
const nodeInfo: ClusterNode = req.body;
// Validate required fields
if (!nodeInfo.id || !nodeInfo.name || !nodeInfo.url) {
res.status(400).json({
success: false,
message: 'Missing required fields: id, name, url',
});
return;
}
registerNode(nodeInfo);
res.json({
success: true,
message: 'Node registered successfully',
});
} catch (error) {
console.error('Error registering node:', error);
res.status(500).json({
success: false,
message: 'Failed to register node',
});
}
};
/**
* Update node heartbeat (coordinator only)
* POST /api/cluster/heartbeat
*/
export const updateHeartbeat = (req: Request, res: Response): void => {
try {
const mode = getClusterMode();
if (mode !== 'coordinator') {
res.status(403).json({
success: false,
message: 'This endpoint is only available on coordinator nodes',
});
return;
}
const { id, servers } = req.body;
if (!id) {
res.status(400).json({
success: false,
message: 'Missing required field: id',
});
return;
}
updateNodeHeartbeat(id, servers || []);
res.json({
success: true,
message: 'Heartbeat updated successfully',
});
} catch (error) {
console.error('Error updating heartbeat:', error);
res.status(500).json({
success: false,
message: 'Failed to update heartbeat',
});
}
};
/**
* Get all nodes (coordinator only)
* GET /api/cluster/nodes
*/
export const getNodes = (req: Request, res: Response): void => {
try {
const mode = getClusterMode();
if (mode !== 'coordinator') {
res.status(403).json({
success: false,
message: 'This endpoint is only available on coordinator nodes',
});
return;
}
const activeOnly = req.query.active === 'true';
const nodes = activeOnly ? getActiveNodes() : getAllNodes();
res.json({
success: true,
data: nodes,
});
} catch (error) {
console.error('Error getting nodes:', error);
res.status(500).json({
success: false,
message: 'Failed to get nodes',
});
}
};
/**
* Get server replicas (coordinator only)
* GET /api/cluster/servers/:serverId/replicas
*/
export const getReplicasForServer = (req: Request, res: Response): void => {
try {
const mode = getClusterMode();
if (mode !== 'coordinator') {
res.status(403).json({
success: false,
message: 'This endpoint is only available on coordinator nodes',
});
return;
}
const { serverId } = req.params;
const replicas = getServerReplicas(serverId);
res.json({
success: true,
data: replicas,
});
} catch (error) {
console.error('Error getting server replicas:', error);
res.status(500).json({
success: false,
message: 'Failed to get server replicas',
});
}
};
/**
* Get session affinity information (coordinator only)
* GET /api/cluster/sessions/:sessionId
*/
export const getSessionAffinityInfo = (req: Request, res: Response): void => {
try {
const mode = getClusterMode();
if (mode !== 'coordinator') {
res.status(403).json({
success: false,
message: 'This endpoint is only available on coordinator nodes',
});
return;
}
const { sessionId } = req.params;
const affinity = getSessionAffinity(sessionId);
if (!affinity) {
res.status(404).json({
success: false,
message: 'Session affinity not found',
});
return;
}
res.json({
success: true,
data: affinity,
});
} catch (error) {
console.error('Error getting session affinity:', error);
res.status(500).json({
success: false,
message: 'Failed to get session affinity',
});
}
};

View File

@@ -0,0 +1,176 @@
/**
* Cluster Routing Middleware
*
* Handles routing of MCP requests in cluster mode:
* - Determines target node based on session affinity
* - Proxies requests to appropriate nodes
* - Maintains sticky sessions
*/
import { Request, Response, NextFunction } from 'express';
import axios from 'axios';
import {
isClusterEnabled,
getClusterMode,
getNodeForSession,
getCurrentNodeId,
} from '../services/clusterService.js';
/**
* Cluster routing middleware for SSE connections
*/
export const clusterSseRouting = async (
req: Request,
res: Response,
next: NextFunction,
): Promise<void> => {
// If cluster is not enabled or we're in standalone mode, proceed normally
if (!isClusterEnabled() || getClusterMode() === 'standalone') {
next();
return;
}
// Coordinator should handle all requests normally
if (getClusterMode() === 'coordinator') {
// For coordinator, we need to route to appropriate node
await routeToNode(req, res, next);
return;
}
// For regular nodes, proceed normally (they handle their own servers)
next();
};
/**
* Cluster routing middleware for MCP HTTP requests
*/
export const clusterMcpRouting = async (
req: Request,
res: Response,
next: NextFunction,
): Promise<void> => {
// If cluster is not enabled or we're in standalone mode, proceed normally
if (!isClusterEnabled() || getClusterMode() === 'standalone') {
next();
return;
}
// Coordinator should route requests to appropriate nodes
if (getClusterMode() === 'coordinator') {
await routeToNode(req, res, next);
return;
}
// For regular nodes, proceed normally
next();
};
/**
* Route request to appropriate node based on session affinity
*/
const routeToNode = async (
req: Request,
res: Response,
next: NextFunction,
): Promise<void> => {
try {
// Extract session ID from headers or generate new one
const sessionId =
(req.headers['mcp-session-id'] as string) ||
(req.query.sessionId as string) ||
generateSessionId(req);
// Determine target node
const group = req.params.group;
const targetNode = getNodeForSession(sessionId, group, req.headers);
if (!targetNode) {
// No available nodes, return error
res.status(503).json({
success: false,
message: 'No available nodes to handle request',
});
return;
}
// Check if this is the current node
const currentNodeId = getCurrentNodeId();
if (currentNodeId && targetNode.id === currentNodeId) {
// Handle locally
next();
return;
}
// Proxy request to target node
await proxyRequest(req, res, targetNode.url);
} catch (error) {
console.error('Error in cluster routing:', error);
next(error);
}
};
/**
* Generate session ID from request
*/
const generateSessionId = (req: Request): string => {
// Use IP address and user agent as seed for consistent hashing
const seed = `${req.ip}-${req.get('user-agent') || 'unknown'}`;
return Buffer.from(seed).toString('base64');
};
/**
* Proxy request to another node
*/
const proxyRequest = async (
req: Request,
res: Response,
targetUrl: string,
): Promise<void> => {
try {
// Build target URL
const url = new URL(req.originalUrl || req.url, targetUrl);
// Prepare headers (excluding host and connection headers)
const headers: Record<string, string> = {};
for (const [key, value] of Object.entries(req.headers)) {
if (
key.toLowerCase() !== 'host' &&
key.toLowerCase() !== 'connection' &&
value
) {
headers[key] = Array.isArray(value) ? value[0] : value;
}
}
// Forward request to target node
const response = await axios({
method: req.method,
url: url.toString(),
headers,
data: req.body,
responseType: 'stream',
timeout: 30000,
validateStatus: () => true, // Don't throw on any status
});
// Forward response headers
for (const [key, value] of Object.entries(response.headers)) {
if (
key.toLowerCase() !== 'connection' &&
key.toLowerCase() !== 'transfer-encoding'
) {
res.setHeader(key, value as string);
}
}
// Forward status code and stream response
res.status(response.status);
response.data.pipe(res);
} catch (error) {
console.error('Error proxying request:', error);
res.status(502).json({
success: false,
message: 'Failed to proxy request to target node',
});
}
};

View File

@@ -80,6 +80,14 @@ import {
getGroupOpenAPISpec,
} from '../controllers/openApiController.js';
import { handleOAuthCallback } from '../controllers/oauthCallbackController.js';
import {
getClusterStatus,
registerNodeEndpoint,
updateHeartbeat,
getNodes,
getReplicasForServer,
getSessionAffinityInfo,
} from '../controllers/clusterController.js';
import { auth } from '../middlewares/auth.js';
const router = express.Router();
@@ -167,6 +175,14 @@ export const initRoutes = (app: express.Application): void => {
router.delete('/logs', clearLogs);
router.get('/logs/stream', streamLogs);
// Cluster management routes
router.get('/cluster/status', getClusterStatus);
router.post('/cluster/register', registerNodeEndpoint);
router.post('/cluster/heartbeat', updateHeartbeat);
router.get('/cluster/nodes', getNodes);
router.get('/cluster/servers/:serverId/replicas', getReplicasForServer);
router.get('/cluster/sessions/:sessionId', getSessionAffinityInfo);
// MCP settings export route
router.get('/mcp-settings/export', getMcpSettingsJson);

View File

@@ -15,9 +15,11 @@ import {
} from './services/sseService.js';
import { initializeDefaultUser } from './models/User.js';
import { sseUserContextMiddleware } from './middlewares/userContext.js';
import { clusterSseRouting, clusterMcpRouting } from './middlewares/clusterRouting.js';
import { findPackageRoot } from './utils/path.js';
import { getCurrentModuleDir } from './utils/moduleDir.js';
import { initOAuthProvider, getOAuthRouter } from './services/oauthService.js';
import { initClusterService, shutdownClusterService } from './services/clusterService.js';
/**
* Get the directory of the current module
@@ -73,53 +75,74 @@ export class AppServer {
initRoutes(this.app);
console.log('Server initialized successfully');
// Initialize cluster service
await initClusterService();
initUpstreamServers()
.then(() => {
console.log('MCP server initialized successfully');
// Original routes (global and group-based)
this.app.get(`${this.basePath}/sse/:group(.*)?`, sseUserContextMiddleware, (req, res) =>
handleSseConnection(req, res),
// Original routes (global and group-based) with cluster routing
this.app.get(
`${this.basePath}/sse/:group(.*)?`,
sseUserContextMiddleware,
clusterSseRouting,
(req, res) => handleSseConnection(req, res),
);
this.app.post(
`${this.basePath}/messages`,
sseUserContextMiddleware,
clusterSseRouting,
handleSseMessage,
);
this.app.post(`${this.basePath}/messages`, sseUserContextMiddleware, handleSseMessage);
this.app.post(
`${this.basePath}/mcp/:group(.*)?`,
sseUserContextMiddleware,
clusterMcpRouting,
handleMcpPostRequest,
);
this.app.get(
`${this.basePath}/mcp/:group(.*)?`,
sseUserContextMiddleware,
clusterMcpRouting,
handleMcpOtherRequest,
);
this.app.delete(
`${this.basePath}/mcp/:group(.*)?`,
sseUserContextMiddleware,
clusterMcpRouting,
handleMcpOtherRequest,
);
// User-scoped routes with user context middleware
this.app.get(`${this.basePath}/:user/sse/:group(.*)?`, sseUserContextMiddleware, (req, res) =>
handleSseConnection(req, res),
// User-scoped routes with user context middleware and cluster routing
this.app.get(
`${this.basePath}/:user/sse/:group(.*)?`,
sseUserContextMiddleware,
clusterSseRouting,
(req, res) => handleSseConnection(req, res),
);
this.app.post(
`${this.basePath}/:user/messages`,
sseUserContextMiddleware,
clusterSseRouting,
handleSseMessage,
);
this.app.post(
`${this.basePath}/:user/mcp/:group(.*)?`,
sseUserContextMiddleware,
clusterMcpRouting,
handleMcpPostRequest,
);
this.app.get(
`${this.basePath}/:user/mcp/:group(.*)?`,
sseUserContextMiddleware,
clusterMcpRouting,
handleMcpOtherRequest,
);
this.app.delete(
`${this.basePath}/:user/mcp/:group(.*)?`,
sseUserContextMiddleware,
clusterMcpRouting,
handleMcpOtherRequest,
);
})
@@ -191,6 +214,11 @@ export class AppServer {
return this.app;
}
shutdown(): void {
console.log('Shutting down cluster service...');
shutdownClusterService();
}
// Helper method to find frontend dist path in different environments
private findFrontendDistPath(): string | null {
// Debug flag for detailed logging

View File

@@ -0,0 +1,538 @@
/**
* Cluster Service
*
* Manages cluster functionality including:
* - Node registration and discovery
* - Health checking and heartbeats
* - Session affinity (sticky sessions)
* - Load balancing across replicas
*/
import { randomUUID } from 'crypto';
import os from 'os';
import crypto from 'crypto';
import axios from 'axios';
import {
ClusterNode,
ClusterConfig,
ServerReplica,
SessionAffinity,
} from '../types/index.js';
import { loadSettings } from '../config/index.js';
// In-memory storage for cluster state
const nodes: Map<string, ClusterNode> = new Map();
const sessionAffinities: Map<string, SessionAffinity> = new Map();
const serverReplicas: Map<string, ServerReplica[]> = new Map();
let currentNodeId: string | null = null;
let heartbeatIntervalId: NodeJS.Timeout | null = null;
let cleanupIntervalId: NodeJS.Timeout | null = null;
/**
* Get cluster configuration from settings
*/
export const getClusterConfig = (): ClusterConfig | null => {
const settings = loadSettings();
return settings.systemConfig?.cluster || null;
};
/**
* Check if cluster mode is enabled
*/
export const isClusterEnabled = (): boolean => {
const config = getClusterConfig();
return config?.enabled === true;
};
/**
* Get the current node's operating mode
*/
export const getClusterMode = (): 'standalone' | 'node' | 'coordinator' => {
const config = getClusterConfig();
if (!config?.enabled) {
return 'standalone';
}
return config.mode || 'standalone';
};
/**
* Get the current node ID
*/
export const getCurrentNodeId = (): string | null => {
return currentNodeId;
};
/**
* Initialize cluster service based on configuration
*/
export const initClusterService = async (): Promise<void> => {
const config = getClusterConfig();
if (!config?.enabled) {
console.log('Cluster mode is disabled');
return;
}
console.log(`Initializing cluster service in ${config.mode} mode`);
switch (config.mode) {
case 'node':
await initAsNode(config);
break;
case 'coordinator':
await initAsCoordinator(config);
break;
case 'standalone':
default:
console.log('Running in standalone mode');
break;
}
};
/**
* Initialize this instance as a cluster node
*/
const initAsNode = async (config: ClusterConfig): Promise<void> => {
if (!config.node) {
throw new Error('Node configuration is required for cluster node mode');
}
// Generate or use provided node ID
currentNodeId = config.node.id || randomUUID();
const nodeName = config.node.name || os.hostname();
const port = process.env.PORT || 3000;
console.log(`Initializing as cluster node: ${nodeName} (${currentNodeId})`);
// Register with coordinator if enabled
if (config.node.registerOnStartup !== false) {
await registerWithCoordinator(config, nodeName, Number(port));
}
// Start heartbeat to coordinator
const heartbeatInterval = config.node.heartbeatInterval || 5000;
heartbeatIntervalId = setInterval(async () => {
await sendHeartbeat(config, nodeName, Number(port));
}, heartbeatInterval);
console.log(`Node registered with coordinator at ${config.node.coordinatorUrl}`);
};
/**
* Initialize this instance as the coordinator
*/
const initAsCoordinator = async (config: ClusterConfig): Promise<void> => {
currentNodeId = 'coordinator';
console.log('Initializing as cluster coordinator');
// Start cleanup interval for inactive nodes
const cleanupInterval = config.coordinator?.cleanupInterval || 30000;
cleanupIntervalId = setInterval(() => {
cleanupInactiveNodes(config);
}, cleanupInterval);
console.log('Cluster coordinator initialized');
};
/**
* Register this node with the coordinator
*/
const registerWithCoordinator = async (
config: ClusterConfig,
nodeName: string,
port: number,
): Promise<void> => {
if (!config.node?.coordinatorUrl) {
return;
}
const hostname = os.hostname();
const nodeUrl = `http://${hostname}:${port}`;
// Get list of local MCP servers
const settings = loadSettings();
const servers = Object.keys(settings.mcpServers || {});
const nodeInfo: ClusterNode = {
id: currentNodeId!,
name: nodeName,
host: hostname,
port,
url: nodeUrl,
status: 'active',
lastHeartbeat: Date.now(),
servers,
};
try {
await axios.post(
`${config.node.coordinatorUrl}/api/cluster/register`,
nodeInfo,
{ timeout: 5000 }
);
console.log('Successfully registered with coordinator');
} catch (error) {
console.error('Failed to register with coordinator:', error);
}
};
/**
* Send heartbeat to coordinator
*/
const sendHeartbeat = async (
config: ClusterConfig,
nodeName: string,
port: number,
): Promise<void> => {
if (!config.node?.coordinatorUrl || !currentNodeId) {
return;
}
const hostname = os.hostname();
const settings = loadSettings();
const servers = Object.keys(settings.mcpServers || {});
try {
await axios.post(
`${config.node.coordinatorUrl}/api/cluster/heartbeat`,
{
id: currentNodeId,
name: nodeName,
host: hostname,
port,
servers,
timestamp: Date.now(),
},
{ timeout: 5000 }
);
} catch (error) {
console.warn('Failed to send heartbeat to coordinator:', error);
}
};
/**
* Cleanup inactive nodes (coordinator only)
*/
const cleanupInactiveNodes = (config: ClusterConfig): void => {
const timeout = config.coordinator?.nodeTimeout || 15000;
const now = Date.now();
for (const [nodeId, node] of nodes.entries()) {
if (now - node.lastHeartbeat > timeout) {
console.log(`Marking node ${nodeId} as unhealthy (last heartbeat: ${new Date(node.lastHeartbeat).toISOString()})`);
node.status = 'unhealthy';
// Remove server replicas for this node
for (const [serverId, replicas] of serverReplicas.entries()) {
const updatedReplicas = replicas.filter(r => r.nodeId !== nodeId);
if (updatedReplicas.length === 0) {
serverReplicas.delete(serverId);
} else {
serverReplicas.set(serverId, updatedReplicas);
}
}
}
}
// Clean up expired session affinities
const _sessionTimeout = config.coordinator?.stickySessionTimeout || 3600000; // 1 hour
for (const [sessionId, affinity] of sessionAffinities.entries()) {
if (now > affinity.expiresAt) {
sessionAffinities.delete(sessionId);
console.log(`Removed expired session affinity: ${sessionId}`);
}
}
};
/**
* Register a node (coordinator endpoint)
*/
export const registerNode = (nodeInfo: ClusterNode): void => {
nodes.set(nodeInfo.id, {
...nodeInfo,
status: 'active',
lastHeartbeat: Date.now(),
});
// Update server replicas
for (const serverId of nodeInfo.servers) {
const replicas = serverReplicas.get(serverId) || [];
// Check if replica already exists
const existingIndex = replicas.findIndex(r => r.nodeId === nodeInfo.id);
const replica: ServerReplica = {
serverId,
nodeId: nodeInfo.id,
nodeUrl: nodeInfo.url,
status: 'active',
weight: 1,
};
if (existingIndex >= 0) {
replicas[existingIndex] = replica;
} else {
replicas.push(replica);
}
serverReplicas.set(serverId, replicas);
}
console.log(`Node registered: ${nodeInfo.name} (${nodeInfo.id}) with ${nodeInfo.servers.length} servers`);
};
/**
* Update node heartbeat (coordinator endpoint)
*/
export const updateNodeHeartbeat = (nodeId: string, servers: string[]): void => {
const node = nodes.get(nodeId);
if (!node) {
console.warn(`Received heartbeat from unknown node: ${nodeId}`);
return;
}
node.lastHeartbeat = Date.now();
node.status = 'active';
node.servers = servers;
// Update server replicas
const currentReplicas = new Set<string>();
for (const [serverId, replicas] of serverReplicas.entries()) {
for (const replica of replicas) {
if (replica.nodeId === nodeId) {
currentReplicas.add(serverId);
}
}
}
// Add new servers
for (const serverId of servers) {
if (!currentReplicas.has(serverId)) {
const replicas = serverReplicas.get(serverId) || [];
replicas.push({
serverId,
nodeId,
nodeUrl: node.url,
status: 'active',
weight: 1,
});
serverReplicas.set(serverId, replicas);
}
}
// Remove servers that are no longer on this node
for (const serverId of currentReplicas) {
if (!servers.includes(serverId)) {
const replicas = serverReplicas.get(serverId) || [];
const updatedReplicas = replicas.filter(r => r.nodeId !== nodeId);
if (updatedReplicas.length === 0) {
serverReplicas.delete(serverId);
} else {
serverReplicas.set(serverId, updatedReplicas);
}
}
}
};
/**
* Get all active nodes (coordinator)
*/
export const getActiveNodes = (): ClusterNode[] => {
return Array.from(nodes.values()).filter(n => n.status === 'active');
};
/**
* Get all nodes including unhealthy ones (coordinator)
*/
export const getAllNodes = (): ClusterNode[] => {
return Array.from(nodes.values());
};
/**
* Get replicas for a specific server
*/
export const getServerReplicas = (serverId: string): ServerReplica[] => {
return serverReplicas.get(serverId) || [];
};
/**
* Get node for a session using sticky session strategy
*/
export const getNodeForSession = (
sessionId: string,
serverId?: string,
headers?: Record<string, string | string[] | undefined>
): ClusterNode | null => {
const config = getClusterConfig();
if (!config?.enabled || !config.stickySession?.enabled) {
return null;
}
// Check if session already has affinity
const existingAffinity = sessionAffinities.get(sessionId);
if (existingAffinity) {
const node = nodes.get(existingAffinity.nodeId);
if (node && node.status === 'active') {
// Update last accessed time
existingAffinity.lastAccessed = Date.now();
return node;
} else {
// Node is no longer active, remove affinity
sessionAffinities.delete(sessionId);
}
}
// Determine which node to use based on strategy
const strategy = config.stickySession.strategy || 'consistent-hash';
let targetNode: ClusterNode | null = null;
switch (strategy) {
case 'consistent-hash':
targetNode = getNodeByConsistentHash(sessionId, serverId);
break;
case 'cookie':
targetNode = getNodeByCookie(headers, serverId);
break;
case 'header':
targetNode = getNodeByHeader(headers, serverId);
break;
}
if (targetNode) {
// Create session affinity
const timeout = config.coordinator?.stickySessionTimeout || 3600000;
const affinity: SessionAffinity = {
sessionId,
nodeId: targetNode.id,
serverId,
createdAt: Date.now(),
lastAccessed: Date.now(),
expiresAt: Date.now() + timeout,
};
sessionAffinities.set(sessionId, affinity);
}
return targetNode;
};
/**
* Get node using consistent hashing
*/
const getNodeByConsistentHash = (sessionId: string, serverId?: string): ClusterNode | null => {
let availableNodes = getActiveNodes();
// Filter nodes that have the server if serverId is specified
if (serverId) {
const replicas = getServerReplicas(serverId);
const nodeIds = new Set(replicas.filter(r => r.status === 'active').map(r => r.nodeId));
availableNodes = availableNodes.filter(n => nodeIds.has(n.id));
}
if (availableNodes.length === 0) {
return null;
}
// Simple consistent hash: hash session ID and mod by node count
const hash = crypto.createHash('md5').update(sessionId).digest('hex');
const hashNum = parseInt(hash.substring(0, 8), 16);
const index = hashNum % availableNodes.length;
return availableNodes[index];
};
/**
* Get node from cookie
*/
const getNodeByCookie = (
headers?: Record<string, string | string[] | undefined>,
serverId?: string
): ClusterNode | null => {
if (!headers?.cookie) {
return getNodeByConsistentHash(randomUUID(), serverId);
}
const config = getClusterConfig();
const cookieName = config?.stickySession?.cookieName || 'MCPHUB_NODE';
const cookies = (Array.isArray(headers.cookie) ? headers.cookie[0] : headers.cookie) || '';
const cookieMatch = cookies.match(new RegExp(`${cookieName}=([^;]+)`));
if (cookieMatch) {
const nodeId = cookieMatch[1];
const node = nodes.get(nodeId);
if (node && node.status === 'active') {
return node;
}
}
return getNodeByConsistentHash(randomUUID(), serverId);
};
/**
* Get node from header
*/
const getNodeByHeader = (
headers?: Record<string, string | string[] | undefined>,
serverId?: string
): ClusterNode | null => {
const config = getClusterConfig();
const headerName = (config?.stickySession?.headerName || 'X-MCPHub-Node').toLowerCase();
if (headers) {
const nodeId = headers[headerName];
if (nodeId) {
const nodeIdStr = Array.isArray(nodeId) ? nodeId[0] : nodeId;
const node = nodes.get(nodeIdStr);
if (node && node.status === 'active') {
return node;
}
}
}
return getNodeByConsistentHash(randomUUID(), serverId);
};
/**
* Get session affinity info for a session
*/
export const getSessionAffinity = (sessionId: string): SessionAffinity | null => {
return sessionAffinities.get(sessionId) || null;
};
/**
* Remove session affinity
*/
export const removeSessionAffinity = (sessionId: string): void => {
sessionAffinities.delete(sessionId);
};
/**
* Shutdown cluster service
*/
export const shutdownClusterService = (): void => {
if (heartbeatIntervalId) {
clearInterval(heartbeatIntervalId);
heartbeatIntervalId = null;
}
if (cleanupIntervalId) {
clearInterval(cleanupIntervalId);
cleanupIntervalId = null;
}
console.log('Cluster service shut down');
};
/**
* Get cluster statistics
*/
export const getClusterStats = () => {
return {
nodes: nodes.size,
activeNodes: getActiveNodes().length,
servers: serverReplicas.size,
sessions: sessionAffinities.size,
};
};

View File

@@ -171,6 +171,7 @@ export interface SystemConfig {
};
nameSeparator?: string; // Separator used between server name and tool/prompt name (default: '-')
oauth?: OAuthProviderConfig; // OAuth provider configuration for upstream MCP servers
cluster?: ClusterConfig; // Cluster configuration for distributed deployment
}
export interface UserConfig {
@@ -356,3 +357,63 @@ export interface AddServerRequest {
name: string; // Name of the server to add
config: ServerConfig; // Configuration details for the server
}
// Cluster-related types
// Cluster node information
export interface ClusterNode {
id: string; // Unique identifier for the node (e.g., UUID)
name: string; // Human-readable name of the node
host: string; // Hostname or IP address
port: number; // Port number the node is running on
url: string; // Full URL to access the node (e.g., 'http://node1:3000')
status: 'active' | 'inactive' | 'unhealthy'; // Current status of the node
lastHeartbeat: number; // Timestamp of last heartbeat
servers: string[]; // List of MCP server names hosted on this node
metadata?: Record<string, any>; // Additional metadata about the node
}
// Cluster configuration
export interface ClusterConfig {
enabled: boolean; // Whether cluster mode is enabled
mode: 'standalone' | 'node' | 'coordinator'; // Cluster operating mode
node?: {
// Configuration when running as a cluster node
id?: string; // Node ID (generated if not provided)
name?: string; // Node name (defaults to hostname)
coordinatorUrl: string; // URL of the coordinator node
heartbeatInterval?: number; // Heartbeat interval in milliseconds (default: 5000)
registerOnStartup?: boolean; // Whether to register with coordinator on startup (default: true)
};
coordinator?: {
// Configuration when running as coordinator
nodeTimeout?: number; // Time in ms before marking a node as unhealthy (default: 15000)
cleanupInterval?: number; // Interval for cleaning up inactive nodes (default: 30000)
stickySessionTimeout?: number; // Sticky session timeout in milliseconds (default: 3600000, 1 hour)
};
stickySession?: {
enabled: boolean; // Whether sticky sessions are enabled (default: true for cluster mode)
strategy: 'consistent-hash' | 'cookie' | 'header'; // Strategy for session affinity (default: consistent-hash)
cookieName?: string; // Cookie name for cookie-based sticky sessions (default: 'MCPHUB_NODE')
headerName?: string; // Header name for header-based sticky sessions (default: 'X-MCPHub-Node')
};
}
// Cluster server replica configuration
export interface ServerReplica {
serverId: string; // MCP server identifier
nodeId: string; // Node hosting this replica
nodeUrl: string; // URL to access this replica
status: 'active' | 'inactive'; // Status of this replica
weight?: number; // Load balancing weight (default: 1)
}
// Session affinity information
export interface SessionAffinity {
sessionId: string; // Session identifier
nodeId: string; // Node ID for this session
serverId?: string; // Optional: specific server this session is bound to
createdAt: number; // Timestamp when session was created
lastAccessed: number; // Timestamp of last access
expiresAt: number; // Timestamp when session expires
}

View File

@@ -0,0 +1,335 @@
/**
* Cluster Service Tests
*/
import {
isClusterEnabled,
getClusterMode,
getCurrentNodeId,
registerNode,
updateNodeHeartbeat,
getActiveNodes,
getAllNodes,
getServerReplicas,
getNodeForSession,
getSessionAffinity,
removeSessionAffinity,
getClusterStats,
shutdownClusterService,
} from '../../src/services/clusterService';
import { ClusterNode } from '../../src/types/index';
// Mock the config module
jest.mock('../../src/config/index.js', () => ({
loadSettings: jest.fn(),
}));
const { loadSettings } = require('../../src/config/index.js');
describe('Cluster Service', () => {
beforeEach(() => {
jest.clearAllMocks();
});
afterEach(() => {
// Clean up cluster service to reset state
shutdownClusterService();
});
describe('Configuration', () => {
it('should return false when cluster is not enabled', () => {
loadSettings.mockReturnValue({
mcpServers: {},
});
expect(isClusterEnabled()).toBe(false);
});
it('should return true when cluster is enabled', () => {
loadSettings.mockReturnValue({
mcpServers: {},
systemConfig: {
cluster: {
enabled: true,
mode: 'coordinator',
},
},
});
expect(isClusterEnabled()).toBe(true);
});
it('should return standalone mode when cluster is not configured', () => {
loadSettings.mockReturnValue({
mcpServers: {},
});
expect(getClusterMode()).toBe('standalone');
});
it('should return configured mode when cluster is enabled', () => {
loadSettings.mockReturnValue({
mcpServers: {},
systemConfig: {
cluster: {
enabled: true,
mode: 'coordinator',
},
},
});
expect(getClusterMode()).toBe('coordinator');
});
});
describe('Node Management', () => {
beforeEach(() => {
loadSettings.mockReturnValue({
mcpServers: {},
systemConfig: {
cluster: {
enabled: true,
mode: 'coordinator',
},
},
});
});
it('should register a new node', () => {
const node: ClusterNode = {
id: 'node-test-1',
name: 'Test Node 1',
host: 'localhost',
port: 3001,
url: 'http://localhost:3001',
status: 'active',
lastHeartbeat: Date.now(),
servers: ['server1', 'server2'],
};
registerNode(node);
const nodes = getAllNodes();
// Find our node (there might be others from previous tests)
const registeredNode = nodes.find(n => n.id === 'node-test-1');
expect(registeredNode).toBeTruthy();
expect(registeredNode?.name).toBe('Test Node 1');
expect(registeredNode?.servers).toEqual(['server1', 'server2']);
});
it('should update node heartbeat', () => {
const node: ClusterNode = {
id: 'node-test-2',
name: 'Test Node 2',
host: 'localhost',
port: 3001,
url: 'http://localhost:3001',
status: 'active',
lastHeartbeat: Date.now() - 10000,
servers: ['server1'],
};
registerNode(node);
const beforeHeartbeat = getAllNodes().find(n => n.id === 'node-test-2')?.lastHeartbeat || 0;
// Wait a bit to ensure timestamp changes
setTimeout(() => {
updateNodeHeartbeat('node-test-2', ['server1', 'server2']);
const updatedNode = getAllNodes().find(n => n.id === 'node-test-2');
const afterHeartbeat = updatedNode?.lastHeartbeat || 0;
expect(afterHeartbeat).toBeGreaterThan(beforeHeartbeat);
expect(updatedNode?.servers).toEqual(['server1', 'server2']);
}, 10);
});
it('should get active nodes only', () => {
const node1: ClusterNode = {
id: 'node-active-1',
name: 'Active Node',
host: 'localhost',
port: 3001,
url: 'http://localhost:3001',
status: 'active',
lastHeartbeat: Date.now(),
servers: ['server1'],
};
registerNode(node1);
const activeNodes = getActiveNodes();
const activeNode = activeNodes.find(n => n.id === 'node-active-1');
expect(activeNode).toBeTruthy();
expect(activeNode?.status).toBe('active');
});
});
describe('Server Replicas', () => {
beforeEach(() => {
loadSettings.mockReturnValue({
mcpServers: {},
systemConfig: {
cluster: {
enabled: true,
mode: 'coordinator',
},
},
});
});
it('should track server replicas across nodes', () => {
const node1: ClusterNode = {
id: 'node-replica-1',
name: 'Node 1',
host: 'localhost',
port: 3001,
url: 'http://localhost:3001',
status: 'active',
lastHeartbeat: Date.now(),
servers: ['test-server-1', 'test-server-2'],
};
const node2: ClusterNode = {
id: 'node-replica-2',
name: 'Node 2',
host: 'localhost',
port: 3002,
url: 'http://localhost:3002',
status: 'active',
lastHeartbeat: Date.now(),
servers: ['test-server-1', 'test-server-3'],
};
registerNode(node1);
registerNode(node2);
const server1Replicas = getServerReplicas('test-server-1');
expect(server1Replicas.length).toBeGreaterThanOrEqual(2);
expect(server1Replicas.map(r => r.nodeId)).toContain('node-replica-1');
expect(server1Replicas.map(r => r.nodeId)).toContain('node-replica-2');
});
});
describe('Session Affinity', () => {
beforeEach(() => {
loadSettings.mockReturnValue({
mcpServers: {},
systemConfig: {
cluster: {
enabled: true,
mode: 'coordinator',
stickySession: {
enabled: true,
strategy: 'consistent-hash',
},
},
},
});
});
it('should maintain session affinity with consistent hash', () => {
const node1: ClusterNode = {
id: 'node-affinity-1',
name: 'Node 1',
host: 'localhost',
port: 3001,
url: 'http://localhost:3001',
status: 'active',
lastHeartbeat: Date.now(),
servers: ['server1'],
};
registerNode(node1);
const sessionId = 'test-session-consistent-hash';
const firstNode = getNodeForSession(sessionId);
const secondNode = getNodeForSession(sessionId);
expect(firstNode).toBeTruthy();
expect(secondNode).toBeTruthy();
expect(firstNode?.id).toBe(secondNode?.id);
});
it('should create and retrieve session affinity', () => {
const node1: ClusterNode = {
id: 'node-affinity-2',
name: 'Node 1',
host: 'localhost',
port: 3001,
url: 'http://localhost:3001',
status: 'active',
lastHeartbeat: Date.now(),
servers: ['server1'],
};
registerNode(node1);
const sessionId = 'test-session-retrieve';
const selectedNode = getNodeForSession(sessionId);
const affinity = getSessionAffinity(sessionId);
expect(affinity).toBeTruthy();
expect(affinity?.sessionId).toBe(sessionId);
expect(affinity?.nodeId).toBe(selectedNode?.id);
});
it('should remove session affinity', () => {
const node1: ClusterNode = {
id: 'node-affinity-3',
name: 'Node 1',
host: 'localhost',
port: 3001,
url: 'http://localhost:3001',
status: 'active',
lastHeartbeat: Date.now(),
servers: ['server1'],
};
registerNode(node1);
const sessionId = 'test-session-remove';
getNodeForSession(sessionId);
let affinity = getSessionAffinity(sessionId);
expect(affinity).toBeTruthy();
removeSessionAffinity(sessionId);
affinity = getSessionAffinity(sessionId);
expect(affinity).toBeNull();
});
});
describe('Cluster Statistics', () => {
beforeEach(() => {
loadSettings.mockReturnValue({
mcpServers: {},
systemConfig: {
cluster: {
enabled: true,
mode: 'coordinator',
},
},
});
});
it('should return cluster statistics', () => {
const node1: ClusterNode = {
id: 'node-stats-1',
name: 'Node 1',
host: 'localhost',
port: 3001,
url: 'http://localhost:3001',
status: 'active',
lastHeartbeat: Date.now(),
servers: ['unique-server-1', 'unique-server-2'],
};
registerNode(node1);
const stats = getClusterStats();
expect(stats.nodes).toBeGreaterThanOrEqual(1);
expect(stats.activeNodes).toBeGreaterThanOrEqual(1);
expect(stats.servers).toBeGreaterThanOrEqual(2);
});
});
});