diff --git a/src/controllers/clusterController.ts b/src/controllers/clusterController.ts
new file mode 100644
index 0000000..58abc4e
--- /dev/null
+++ b/src/controllers/clusterController.ts
@@ -0,0 +1,240 @@
+/**
+ * Cluster Controller
+ *
+ * Handles cluster-related API endpoints:
+ * - Node registration
+ * - Heartbeat updates
+ * - Cluster status queries
+ * - Session affinity management
+ */
+
+import { Request, Response } from 'express';
+import {
+  getClusterMode,
+  isClusterEnabled,
+  getCurrentNodeId,
+  registerNode,
+  updateNodeHeartbeat,
+  getActiveNodes,
+  getAllNodes,
+  getServerReplicas,
+  getSessionAffinity,
+  getClusterStats,
+} from '../services/clusterService.js';
+import { ClusterNode } from '../types/index.js';
+
+/** Send the `{ success: false, message }` error envelope used by every endpoint here. */
+const sendError = (res: Response, status: number, message: string): void => {
+  res.status(status).json({ success: false, message });
+};
+
+/**
+ * Guard for coordinator-only endpoints.
+ *
+ * @returns true when this instance is the coordinator; otherwise a 403 has
+ *   already been written and the caller must return immediately.
+ */
+const ensureCoordinator = (res: Response): boolean => {
+  if (getClusterMode() === 'coordinator') {
+    return true;
+  }
+  sendError(res, 403, 'This endpoint is only available on coordinator nodes');
+  return false;
+};
+
+/**
+ * Run `action`; on any thrown error log it under `logLabel` and answer 500
+ * with `failureMessage`.
+ */
+const withErrorHandling = (
+  res: Response,
+  logLabel: string,
+  failureMessage: string,
+  action: () => void,
+): void => {
+  try {
+    action();
+  } catch (error) {
+    console.error(logLabel, error);
+    sendError(res, 500, failureMessage);
+  }
+};
+
+const isNonEmptyString = (value: unknown): value is string =>
+  typeof value === 'string' && value.length > 0;
+
+const isStringArray = (value: unknown): value is string[] =>
+  Array.isArray(value) && value.every((item) => typeof item === 'string');
+
+/**
+ * Get cluster status
+ * GET /api/cluster/status
+ */
+export const getClusterStatus = (_req: Request, res: Response): void => {
+  withErrorHandling(res, 'Error getting cluster status:', 'Failed to get cluster status', () => {
+    res.json({
+      success: true,
+      data: {
+        enabled: isClusterEnabled(),
+        mode: getClusterMode(),
+        nodeId: getCurrentNodeId(),
+        stats: getClusterStats(),
+      },
+    });
+  });
+};
+
+/**
+ * Register a node (coordinator only)
+ * POST /api/cluster/register
+ *
+ * Responds 400 when id/name/url are missing or when `servers` is present but
+ * is not an array of strings. A missing `servers` is treated as an empty list
+ * so the service never iterates `undefined`.
+ */
+export const registerNodeEndpoint = (req: Request, res: Response): void => {
+  withErrorHandling(res, 'Error registering node:', 'Failed to register node', () => {
+    if (!ensureCoordinator(res)) {
+      return;
+    }
+
+    // req.body is undefined when no body parser ran; treat that as an empty payload.
+    const payload = req.body ?? {};
+
+    if (
+      !isNonEmptyString(payload.id) ||
+      !isNonEmptyString(payload.name) ||
+      !isNonEmptyString(payload.url)
+    ) {
+      sendError(res, 400, 'Missing required fields: id, name, url');
+      return;
+    }
+
+    if (payload.servers != null && !isStringArray(payload.servers)) {
+      sendError(res, 400, 'Invalid field: servers must be an array of strings');
+      return;
+    }
+
+    const nodeInfo: ClusterNode = { ...payload, servers: payload.servers ?? [] };
+    registerNode(nodeInfo);
+
+    res.json({
+      success: true,
+      message: 'Node registered successfully',
+    });
+  });
+};
+
+/**
+ * Update node heartbeat (coordinator only)
+ * POST /api/cluster/heartbeat
+ *
+ * `servers` is optional (defaults to an empty list) but, when present, must be
+ * an array of strings; anything else is rejected with 400 instead of being
+ * forwarded to the service.
+ */
+export const updateHeartbeat = (req: Request, res: Response): void => {
+  withErrorHandling(res, 'Error updating heartbeat:', 'Failed to update heartbeat', () => {
+    if (!ensureCoordinator(res)) {
+      return;
+    }
+
+    const { id, servers } = req.body ?? {};
+
+    if (!isNonEmptyString(id)) {
+      sendError(res, 400, 'Missing required field: id');
+      return;
+    }
+
+    if (servers != null && !isStringArray(servers)) {
+      sendError(res, 400, 'Invalid field: servers must be an array of strings');
+      return;
+    }
+
+    updateNodeHeartbeat(id, servers ?? []);
+
+    res.json({
+      success: true,
+      message: 'Heartbeat updated successfully',
+    });
+  });
+};
+
+/**
+ * Get all nodes (coordinator only)
+ * GET /api/cluster/nodes
+ *
+ * With `?active=true` only nodes whose status is 'active' are returned.
+ */
+export const getNodes = (req: Request, res: Response): void => {
+  withErrorHandling(res, 'Error getting nodes:', 'Failed to get nodes', () => {
+    if (!ensureCoordinator(res)) {
+      return;
+    }
+
+    const activeOnly = req.query.active === 'true';
+
+    res.json({
+      success: true,
+      data: activeOnly ? getActiveNodes() : getAllNodes(),
+    });
+  });
+};
+
+/**
+ * Get server replicas (coordinator only)
+ * GET /api/cluster/servers/:serverId/replicas
+ */
+export const getReplicasForServer = (req: Request, res: Response): void => {
+  withErrorHandling(
+    res,
+    'Error getting server replicas:',
+    'Failed to get server replicas',
+    () => {
+      if (!ensureCoordinator(res)) {
+        return;
+      }
+
+      res.json({
+        success: true,
+        data: getServerReplicas(req.params.serverId),
+      });
+    },
+  );
+};
+
+/**
+ * Get session affinity information (coordinator only)
+ * GET /api/cluster/sessions/:sessionId
+ *
+ * Responds 404 when the coordinator holds no affinity for the session.
+ */
+export const getSessionAffinityInfo = (req: Request, res: Response): void => {
+  withErrorHandling(
+    res,
+    'Error getting session affinity:',
+    'Failed to get session affinity',
+    () => {
+      if (!ensureCoordinator(res)) {
+        return;
+      }
+
+      const affinity = getSessionAffinity(req.params.sessionId);
+
+      if (!affinity) {
+        sendError(res, 404, 'Session affinity not found');
+        return;
+      }
+
+      res.json({
+        success: true,
+        data: affinity,
+      });
+    },
+  );
+};
diff --git a/src/middlewares/clusterRouting.ts b/src/middlewares/clusterRouting.ts
new file mode 100644
index 0000000..15f7e01
--- /dev/null
+++ b/src/middlewares/clusterRouting.ts
@@ -0,0 +1,176 @@
+/**
+ * Cluster Routing Middleware
+ *
+ * Handles routing of MCP requests in cluster mode:
+ * - Determines target node based on session affinity
+ * - Proxies requests to appropriate nodes
+ * - Maintains sticky sessions
+ */
+
+import { Request, Response, NextFunction } from 'express';
+import axios from 'axios';
+import {
+  isClusterEnabled,
+  getClusterMode,
+  getNodeForSession,
+  getCurrentNodeId,
+} from '../services/clusterService.js';
+
+/**
+ * Hop-by-hop headers: meaningful only for a single transport connection, so a
+ * proxy must not forward them in either direction.
+ */
+const HOP_BY_HOP_HEADERS = new Set([
+  'connection',
+  'keep-alive',
+  'proxy-authenticate',
+  'proxy-authorization',
+  'te',
+  'trailer',
+  'transfer-encoding',
+  'upgrade',
+]);
+
+/** Return the first non-empty string in `value` (a header or query value), if any. */
+const firstString = (value: unknown): string | undefined => {
+  const candidate = Array.isArray(value) ? value[0] : value;
+  return typeof candidate === 'string' && candidate.length > 0 ? candidate : undefined;
+};
+
+/**
+ * Derive a fallback session ID from the request.
+ *
+ * The ID is the base64 encoding of "<ip>-<user agent>", so the same client
+ * maps to the same ID on every request. Clients sharing an IP and user agent
+ * share an ID.
+ */
+const generateSessionId = (req: Request): string => {
+  const seed = `${req.ip}-${req.get('user-agent') || 'unknown'}`;
+  return Buffer.from(seed).toString('base64');
+};
+
+/**
+ * Proxy request to another node and stream its response back to the client.
+ *
+ * @param req - incoming request; `req.body` is forwarded as the request data
+ * @param res - response that receives the upstream status, headers and body
+ * @param targetUrl - base URL of the node that should handle the request
+ */
+const proxyRequest = async (
+  req: Request,
+  res: Response,
+  targetUrl: string,
+): Promise<void> => {
+  try {
+    // Build target URL
+    const url = new URL(req.originalUrl || req.url, targetUrl);
+
+    // Prepare headers. `content-length` is dropped because axios re-serializes
+    // req.body, so the original length may no longer match the forwarded body.
+    const headers: Record<string, string> = {};
+    for (const [key, value] of Object.entries(req.headers)) {
+      const name = key.toLowerCase();
+      if (!value || name === 'host' || name === 'content-length' || HOP_BY_HOP_HEADERS.has(name)) {
+        continue;
+      }
+      headers[key] = Array.isArray(value) ? value.join(', ') : value;
+    }
+
+    // Forward request to target node
+    const upstream = await axios({
+      method: req.method,
+      url: url.toString(),
+      headers,
+      data: req.body,
+      responseType: 'stream',
+      timeout: 30000,
+      // Pass the body through untouched so the upstream content-encoding and
+      // content-length headers stay valid for the bytes we relay.
+      decompress: false,
+      validateStatus: () => true, // Don't throw on any status
+    });
+
+    // Forward status code and response headers
+    res.status(upstream.status);
+    for (const [key, value] of Object.entries(upstream.headers)) {
+      if (value == null || HOP_BY_HOP_HEADERS.has(key.toLowerCase())) {
+        continue;
+      }
+      res.setHeader(key, value as string | string[]);
+    }
+
+    // A failure mid-stream cannot be turned into a 502 any more (headers are
+    // already sent), so tear the client connection down instead of hanging.
+    upstream.data.on('error', (streamError: Error) => {
+      console.error('Error proxying request:', streamError);
+      res.destroy(streamError);
+    });
+
+    // Stop reading from the target node once the client has gone away.
+    res.on('close', () => {
+      upstream.data.destroy();
+    });
+
+    upstream.data.pipe(res);
+  } catch (error) {
+    console.error('Error proxying request:', error);
+    if (res.headersSent) {
+      res.end();
+      return;
+    }
+    res.status(502).json({
+      success: false,
+      message: 'Failed to proxy request to target node',
+    });
+  }
+};
+
+/**
+ * Route request to appropriate node based on session affinity.
+ *
+ * Responds 503 when no node is available, calls `next()` when the selected
+ * node is this instance, and proxies the request otherwise.
+ */
+const routeToNode = async (
+  req: Request,
+  res: Response,
+  next: NextFunction,
+): Promise<void> => {
+  try {
+    // Session ID: header first, then query string, then a derived fallback
+    const sessionId =
+      firstString(req.headers['mcp-session-id']) ??
+      firstString(req.query.sessionId) ??
+      generateSessionId(req);
+
+    // NOTE(review): the route's group name is passed where getNodeForSession
+    // takes a `serverId` - confirm that groups and server IDs share a namespace.
+    const group = req.params.group;
+    const targetNode = getNodeForSession(sessionId, group, req.headers);
+
+    if (!targetNode) {
+      // No available nodes, return error
+      res.status(503).json({
+        success: false,
+        message: 'No available nodes to handle request',
+      });
+      return;
+    }
+
+    // Handle locally when the target is this instance
+    const currentNodeId = getCurrentNodeId();
+    if (currentNodeId && targetNode.id === currentNodeId) {
+      next();
+      return;
+    }
+
+    // Proxy request to target node
+    await proxyRequest(req, res, targetNode.url);
+  } catch (error) {
+    console.error('Error in cluster routing:', error);
+    next(error);
+  }
+};
+
+/**
+ * Shared implementation of both routing middlewares: only a coordinator in an
+ * enabled cluster routes to a node; standalone instances and regular nodes
+ * handle the request themselves.
+ */
+const routeWhenCoordinator = async (
+  req: Request,
+  res: Response,
+  next: NextFunction,
+): Promise<void> => {
+  if (!isClusterEnabled() || getClusterMode() !== 'coordinator') {
+    next();
+    return;
+  }
+
+  await routeToNode(req, res, next);
+};
+
+/**
+ * Cluster routing middleware for SSE connections
+ */
+export const clusterSseRouting = (
+  req: Request,
+  res: Response,
+  next: NextFunction,
+): Promise<void> => routeWhenCoordinator(req, res, next);
+
+/**
+ * Cluster routing middleware for MCP HTTP requests
+ */
+export const clusterMcpRouting = (
+  req: Request,
+  res: Response,
+  next: NextFunction,
+): Promise<void> => routeWhenCoordinator(req, res, next);
diff --git a/src/routes/index.ts b/src/routes/index.ts
index 2bfc3c0..ff3000f 100644
--- a/src/routes/index.ts
+++ b/src/routes/index.ts
@@ -80,6 +80,14 @@ import {
   getGroupOpenAPISpec,
 } from '../controllers/openApiController.js';
 import { handleOAuthCallback } from '../controllers/oauthCallbackController.js';
+import {
+  getClusterStatus,
+  registerNodeEndpoint,
+  updateHeartbeat,
+  getNodes,
+  getReplicasForServer,
+  getSessionAffinityInfo,
+} from '../controllers/clusterController.js';
 import { auth } from '../middlewares/auth.js';
 
 const router = express.Router();
@@ -167,6 +175,14 @@ export const initRoutes = (app: express.Application): void => {
   router.delete('/logs', clearLogs);
   router.get('/logs/stream', streamLogs);
 
+  // Cluster management routes
+  router.get('/cluster/status', getClusterStatus);
+  router.post('/cluster/register', registerNodeEndpoint);
+  router.post('/cluster/heartbeat', updateHeartbeat);
+  router.get('/cluster/nodes', getNodes);
+  router.get('/cluster/servers/:serverId/replicas', getReplicasForServer);
+  router.get('/cluster/sessions/:sessionId', getSessionAffinityInfo);
+
   // MCP settings export route
   router.get('/mcp-settings/export',
getMcpSettingsJson); diff --git a/src/server.ts b/src/server.ts index 7968d52..6a5b655 100644 --- a/src/server.ts +++ b/src/server.ts @@ -15,9 +15,11 @@ import { } from './services/sseService.js'; import { initializeDefaultUser } from './models/User.js'; import { sseUserContextMiddleware } from './middlewares/userContext.js'; +import { clusterSseRouting, clusterMcpRouting } from './middlewares/clusterRouting.js'; import { findPackageRoot } from './utils/path.js'; import { getCurrentModuleDir } from './utils/moduleDir.js'; import { initOAuthProvider, getOAuthRouter } from './services/oauthService.js'; +import { initClusterService, shutdownClusterService } from './services/clusterService.js'; /** * Get the directory of the current module @@ -73,53 +75,74 @@ export class AppServer { initRoutes(this.app); console.log('Server initialized successfully'); + // Initialize cluster service + await initClusterService(); + initUpstreamServers() .then(() => { console.log('MCP server initialized successfully'); - // Original routes (global and group-based) - this.app.get(`${this.basePath}/sse/:group(.*)?`, sseUserContextMiddleware, (req, res) => - handleSseConnection(req, res), + // Original routes (global and group-based) with cluster routing + this.app.get( + `${this.basePath}/sse/:group(.*)?`, + sseUserContextMiddleware, + clusterSseRouting, + (req, res) => handleSseConnection(req, res), + ); + this.app.post( + `${this.basePath}/messages`, + sseUserContextMiddleware, + clusterSseRouting, + handleSseMessage, ); - this.app.post(`${this.basePath}/messages`, sseUserContextMiddleware, handleSseMessage); this.app.post( `${this.basePath}/mcp/:group(.*)?`, sseUserContextMiddleware, + clusterMcpRouting, handleMcpPostRequest, ); this.app.get( `${this.basePath}/mcp/:group(.*)?`, sseUserContextMiddleware, + clusterMcpRouting, handleMcpOtherRequest, ); this.app.delete( `${this.basePath}/mcp/:group(.*)?`, sseUserContextMiddleware, + clusterMcpRouting, handleMcpOtherRequest, ); - // 
User-scoped routes with user context middleware - this.app.get(`${this.basePath}/:user/sse/:group(.*)?`, sseUserContextMiddleware, (req, res) => - handleSseConnection(req, res), + // User-scoped routes with user context middleware and cluster routing + this.app.get( + `${this.basePath}/:user/sse/:group(.*)?`, + sseUserContextMiddleware, + clusterSseRouting, + (req, res) => handleSseConnection(req, res), ); this.app.post( `${this.basePath}/:user/messages`, sseUserContextMiddleware, + clusterSseRouting, handleSseMessage, ); this.app.post( `${this.basePath}/:user/mcp/:group(.*)?`, sseUserContextMiddleware, + clusterMcpRouting, handleMcpPostRequest, ); this.app.get( `${this.basePath}/:user/mcp/:group(.*)?`, sseUserContextMiddleware, + clusterMcpRouting, handleMcpOtherRequest, ); this.app.delete( `${this.basePath}/:user/mcp/:group(.*)?`, sseUserContextMiddleware, + clusterMcpRouting, handleMcpOtherRequest, ); }) @@ -191,6 +214,11 @@ export class AppServer { return this.app; } + shutdown(): void { + console.log('Shutting down cluster service...'); + shutdownClusterService(); + } + // Helper method to find frontend dist path in different environments private findFrontendDistPath(): string | null { // Debug flag for detailed logging diff --git a/src/services/clusterService.ts b/src/services/clusterService.ts new file mode 100644 index 0000000..37c64af --- /dev/null +++ b/src/services/clusterService.ts @@ -0,0 +1,538 @@ +/** + * Cluster Service + * + * Manages cluster functionality including: + * - Node registration and discovery + * - Health checking and heartbeats + * - Session affinity (sticky sessions) + * - Load balancing across replicas + */ + +import { randomUUID } from 'crypto'; +import os from 'os'; +import crypto from 'crypto'; +import axios from 'axios'; +import { + ClusterNode, + ClusterConfig, + ServerReplica, + SessionAffinity, +} from '../types/index.js'; +import { loadSettings } from '../config/index.js'; + +// In-memory storage for cluster state +const 
nodes: Map<string, ClusterNode> = new Map();
+const sessionAffinities: Map<string, SessionAffinity> = new Map();
+const serverReplicas: Map<string, ServerReplica[]> = new Map();
+let currentNodeId: string | null = null;
+let heartbeatIntervalId: NodeJS.Timeout | null = null;
+let cleanupIntervalId: NodeJS.Timeout | null = null;
+
+/**
+ * Get cluster configuration from settings
+ *
+ * @returns `systemConfig.cluster` from the loaded settings, or null when absent
+ */
+export const getClusterConfig = (): ClusterConfig | null => {
+  const settings = loadSettings();
+  return settings.systemConfig?.cluster || null;
+};
+
+/**
+ * Check if cluster mode is enabled
+ */
+export const isClusterEnabled = (): boolean => {
+  const config = getClusterConfig();
+  return config?.enabled === true;
+};
+
+/**
+ * Get the current node's operating mode
+ *
+ * @returns 'standalone' whenever the cluster is disabled or no mode is set
+ */
+export const getClusterMode = (): 'standalone' | 'node' | 'coordinator' => {
+  const config = getClusterConfig();
+  if (!config?.enabled) {
+    return 'standalone';
+  }
+  return config.mode || 'standalone';
+};
+
+/**
+ * Get the current node ID
+ *
+ * @returns null until the service has been initialized as node or coordinator
+ */
+export const getCurrentNodeId = (): string | null => {
+  return currentNodeId;
+};
+
+/**
+ * Initialize cluster service based on configuration
+ *
+ * @throws Error when mode is 'node' but no `node` configuration is present
+ */
+export const initClusterService = async (): Promise<void> => {
+  const config = getClusterConfig();
+
+  if (!config?.enabled) {
+    console.log('Cluster mode is disabled');
+    return;
+  }
+
+  console.log(`Initializing cluster service in ${config.mode} mode`);
+
+  switch (config.mode) {
+    case 'node':
+      await initAsNode(config);
+      break;
+    case 'coordinator':
+      await initAsCoordinator(config);
+      break;
+    case 'standalone':
+    default:
+      console.log('Running in standalone mode');
+      break;
+  }
+};
+
+/**
+ * Initialize this instance as a cluster node: pick a node ID, optionally
+ * register with the coordinator, then start the periodic heartbeat.
+ *
+ * @throws Error when `config.node` is missing
+ */
+const initAsNode = async (config: ClusterConfig): Promise<void> => {
+  if (!config.node) {
+    throw new Error('Node configuration is required for cluster node mode');
+  }
+
+  // Generate or use provided node ID
+  currentNodeId = config.node.id || randomUUID();
+
+  const nodeName = config.node.name || os.hostname();
+  // A missing or non-numeric PORT falls back to 3000 instead of becoming NaN.
+  const port = Number(process.env.PORT) || 3000;
+
+  console.log(`Initializing as cluster node: ${nodeName} (${currentNodeId})`);
+
+  // Register with coordinator if enabled
+  if (config.node.registerOnStartup !== false) {
+    await registerWithCoordinator(config, nodeName, port);
+  }
+
+  // Start heartbeat to coordinator, replacing any timer left by an earlier init
+  const heartbeatInterval = config.node.heartbeatInterval || 5000;
+  if (heartbeatIntervalId) {
+    clearInterval(heartbeatIntervalId);
+  }
+  heartbeatIntervalId = setInterval(() => {
+    // Catch here so a failing heartbeat can never become an unhandled rejection.
+    sendHeartbeat(config, nodeName, port).catch((error: unknown) => {
+      console.warn('Failed to send heartbeat to coordinator:', error);
+    });
+  }, heartbeatInterval);
+
+  // Registration may have failed or been skipped (it logs its own outcome), so
+  // this message only states what is known to be true at this point.
+  console.log(
+    `Heartbeat to coordinator at ${config.node.coordinatorUrl} started (every ${heartbeatInterval}ms)`,
+  );
+};
+
+/**
+ * Initialize this instance as the coordinator and start the periodic cleanup
+ * of inactive nodes and expired session affinities.
+ */
+const initAsCoordinator = async (config: ClusterConfig): Promise<void> => {
+  currentNodeId = 'coordinator';
+
+  console.log('Initializing as cluster coordinator');
+
+  // Start cleanup interval, replacing any timer left by an earlier init
+  const cleanupInterval = config.coordinator?.cleanupInterval || 30000;
+  if (cleanupIntervalId) {
+    clearInterval(cleanupIntervalId);
+  }
+  cleanupIntervalId = setInterval(() => {
+    cleanupInactiveNodes(config);
+  }, cleanupInterval);
+
+  console.log('Cluster coordinator initialized');
+};
+
+/**
+ * Register this node with the coordinator
+ *
+ * Best effort: does nothing without a coordinator URL, and a failed request is
+ * logged rather than thrown.
+ */
+const registerWithCoordinator = async (
+  config: ClusterConfig,
+  nodeName: string,
+  port: number,
+): Promise<void> => {
+  if (!config.node?.coordinatorUrl) {
+    return;
+  }
+
+  const hostname = os.hostname();
+
+  // Advertise every MCP server configured locally
+  const settings = loadSettings();
+  const servers = Object.keys(settings.mcpServers || {});
+
+  const nodeInfo: ClusterNode = {
+    id: currentNodeId!,
+    name: nodeName,
+    host: hostname,
+    port,
+    url: `http://${hostname}:${port}`,
+    status: 'active',
+    lastHeartbeat: Date.now(),
+    servers,
+  };
+
+  try {
+    await axios.post(`${config.node.coordinatorUrl}/api/cluster/register`, nodeInfo, {
+      timeout: 5000,
+    });
+    console.log('Successfully registered with coordinator');
+  } catch (error) {
+    console.error('Failed to register with coordinator:', error);
+  }
+};
+
+/**
+ * Send heartbeat to coordinator
+ */
+const sendHeartbeat = async (
+  config: ClusterConfig,
+  nodeName: string,
+  port: number,
+): Promise<void> => {
+  if (!config.node?.coordinatorUrl || !currentNodeId) {
+    return;
+  }
+
+  try {
+    // Settings are read inside the try so a failing read is logged like any
+    // other heartbeat failure instead of escaping the timer callback.
+    const settings = loadSettings();
+
+    await axios.post(
+      `${config.node.coordinatorUrl}/api/cluster/heartbeat`,
+      {
+        id: currentNodeId,
+        name: nodeName,
+        host: os.hostname(),
+        port,
+        servers: Object.keys(settings.mcpServers || {}),
+        timestamp: Date.now(),
+      },
+      { timeout: 5000 },
+    );
+  } catch (error) {
+    console.warn('Failed to send heartbeat to coordinator:', error);
+  }
+};
+
+/**
+ * Remove the replica of `serverId` hosted on `nodeId`; the server entry itself
+ * is deleted once no replicas remain.
+ */
+const dropReplica = (serverId: string, nodeId: string): void => {
+  const remaining = (serverReplicas.get(serverId) || []).filter((r) => r.nodeId !== nodeId);
+  if (remaining.length === 0) {
+    serverReplicas.delete(serverId);
+  } else {
+    serverReplicas.set(serverId, remaining);
+  }
+};
+
+/**
+ * Make the replica table match the set of servers a node currently hosts:
+ * replicas for servers no longer listed are removed, listed servers get an
+ * active replica entry (created or refreshed).
+ */
+const syncNodeReplicas = (nodeId: string, nodeUrl: string, servers: readonly string[]): void => {
+  const hosted = new Set(servers);
+
+  for (const [serverId, replicas] of serverReplicas.entries()) {
+    if (!hosted.has(serverId) && replicas.some((r) => r.nodeId === nodeId)) {
+      dropReplica(serverId, nodeId);
+    }
+  }
+
+  for (const serverId of hosted) {
+    const replicas = serverReplicas.get(serverId) || [];
+    const replica: ServerReplica = {
+      serverId,
+      nodeId,
+      nodeUrl,
+      status: 'active',
+      weight: 1,
+    };
+
+    const existingIndex = replicas.findIndex((r) => r.nodeId === nodeId);
+    if (existingIndex >= 0) {
+      replicas[existingIndex] = replica;
+    } else {
+      replicas.push(replica);
+    }
+
+    serverReplicas.set(serverId, replicas);
+  }
+};
+
+/**
+ * Cleanup inactive nodes (coordinator only)
+ *
+ * Marks nodes whose last heartbeat is older than `coordinator.nodeTimeout`
+ * (default 15000 ms) as unhealthy and drops their replicas, then removes
+ * session affinities whose `expiresAt` has passed.
+ */
+const cleanupInactiveNodes = (config: ClusterConfig): void => {
+  const timeout = config.coordinator?.nodeTimeout || 15000;
+  const now = Date.now();
+
+  for (const [nodeId, node] of nodes.entries()) {
+    // Nodes already marked unhealthy were handled by an earlier sweep.
+    if (node.status === 'unhealthy' || now - node.lastHeartbeat <= timeout) {
+      continue;
+    }
+
+    console.log(`Marking node ${nodeId} as unhealthy (last heartbeat: ${new Date(node.lastHeartbeat).toISOString()})`);
+    node.status = 'unhealthy';
+
+    // Remove server replicas for this node
+    syncNodeReplicas(nodeId, node.url, []);
+  }
+
+  // Clean up expired session affinities
+  for (const [sessionId, affinity] of sessionAffinities.entries()) {
+    if (now > affinity.expiresAt) {
+      sessionAffinities.delete(sessionId);
+      console.log(`Removed expired session affinity: ${sessionId}`);
+    }
+  }
+};
+
+/**
+ * Register a node (coordinator endpoint)
+ *
+ * Stores the node as active with a fresh heartbeat timestamp and rebuilds its
+ * replica entries. A missing or non-array `servers` is treated as an empty
+ * list, and replicas left over from an earlier registration of the same node
+ * are removed.
+ */
+export const registerNode = (nodeInfo: ClusterNode): void => {
+  const servers = Array.isArray(nodeInfo.servers) ? nodeInfo.servers : [];
+
+  nodes.set(nodeInfo.id, {
+    ...nodeInfo,
+    servers,
+    status: 'active',
+    lastHeartbeat: Date.now(),
+  });
+
+  // Update server replicas
+  syncNodeReplicas(nodeInfo.id, nodeInfo.url, servers);
+
+  console.log(`Node registered: ${nodeInfo.name} (${nodeInfo.id}) with ${servers.length} servers`);
+};
+
+/**
+ * Update node heartbeat (coordinator endpoint)
+ *
+ * Heartbeats from nodes that never registered are logged and ignored.
+ * Otherwise the node becomes active again and its replica entries are synced
+ * to `servers`.
+ */
+export const updateNodeHeartbeat = (nodeId: string, servers: string[]): void => {
+  const node = nodes.get(nodeId);
+  if (!node) {
+    console.warn(`Received heartbeat from unknown node: ${nodeId}`);
+    return;
+  }
+
+  const hostedServers = Array.isArray(servers) ? servers : [];
+
+  node.lastHeartbeat = Date.now();
+  node.status = 'active';
+  node.servers = hostedServers;
+
+  // Add replicas for new servers, remove those no longer on this node
+  syncNodeReplicas(nodeId, node.url, hostedServers);
+};
+
+/**
+ * Get all active nodes (coordinator)
+
*/
+export const getActiveNodes = (): ClusterNode[] => {
+  return Array.from(nodes.values()).filter((n) => n.status === 'active');
+};
+
+/**
+ * Get all nodes including unhealthy ones (coordinator)
+ */
+export const getAllNodes = (): ClusterNode[] => {
+  return Array.from(nodes.values());
+};
+
+/**
+ * Get replicas for a specific server
+ *
+ * @returns the registered replicas, or an empty array for an unknown server
+ */
+export const getServerReplicas = (serverId: string): ServerReplica[] => {
+  return serverReplicas.get(serverId) || [];
+};
+
+/**
+ * Get node for a session using sticky session strategy
+ *
+ * An existing, unexpired affinity to an active node wins. Otherwise a node is
+ * chosen by the configured strategy and a new affinity is recorded.
+ *
+ * @returns the selected node, or null when the cluster or sticky sessions are
+ *   disabled or no node is available
+ */
+export const getNodeForSession = (
+  sessionId: string,
+  serverId?: string,
+  headers?: Record<string, string | string[] | undefined>,
+): ClusterNode | null => {
+  const config = getClusterConfig();
+
+  if (!config?.enabled || !config.stickySession?.enabled) {
+    return null;
+  }
+
+  const now = Date.now();
+
+  // Check if session already has affinity
+  const existingAffinity = sessionAffinities.get(sessionId);
+  if (existingAffinity) {
+    const node = nodes.get(existingAffinity.nodeId);
+    // Expiry is checked here as well because the periodic cleanup only runs
+    // every `cleanupInterval`, leaving expired entries in the table meanwhile.
+    if (node && node.status === 'active' && now <= existingAffinity.expiresAt) {
+      existingAffinity.lastAccessed = now;
+      return node;
+    }
+    // Node is no longer active or the affinity expired, remove it
+    sessionAffinities.delete(sessionId);
+  }
+
+  // Determine which node to use based on strategy
+  const strategy = config.stickySession.strategy || 'consistent-hash';
+  let targetNode: ClusterNode | null = null;
+
+  switch (strategy) {
+    case 'consistent-hash':
+      targetNode = getNodeByConsistentHash(sessionId, serverId);
+      break;
+    case 'cookie':
+      targetNode = getNodeByCookie(headers, serverId);
+      break;
+    case 'header':
+      targetNode = getNodeByHeader(headers, serverId);
+      break;
+  }
+
+  if (targetNode) {
+    // Create session affinity
+    const timeout = config.coordinator?.stickySessionTimeout || 3600000;
+    const affinity: SessionAffinity = {
+      sessionId,
+      nodeId: targetNode.id,
+      serverId,
+      createdAt: now,
+      lastAccessed: now,
+      expiresAt: now + timeout,
+    };
+    sessionAffinities.set(sessionId, affinity);
+  }
+
+  return targetNode;
+};
+
+/**
+ * Get node by hashing the session ID
+ *
+ * The MD5 of the session ID is reduced modulo the number of candidate nodes,
+ * so the same session maps to the same node only while the candidate list is
+ * unchanged.
+ */
+const getNodeByConsistentHash = (sessionId: string, serverId?: string): ClusterNode | null => {
+  let availableNodes = getActiveNodes();
+
+  // Filter nodes that have the server if serverId is specified
+  if (serverId) {
+    const replicas = getServerReplicas(serverId);
+    const nodeIds = new Set(replicas.filter((r) => r.status === 'active').map((r) => r.nodeId));
+    availableNodes = availableNodes.filter((n) => nodeIds.has(n.id));
+  }
+
+  if (availableNodes.length === 0) {
+    return null;
+  }
+
+  // Hash session ID and mod by node count
+  const hash = crypto.createHash('md5').update(sessionId).digest('hex');
+  const hashNum = parseInt(hash.substring(0, 8), 16);
+  const index = hashNum % availableNodes.length;
+
+  return availableNodes[index];
+};
+
+/**
+ * Get node from cookie
+ *
+ * Reads the node ID from the sticky-session cookie (default name
+ * 'MCPHUB_NODE'). Falls back to hashing a random ID when the cookie is absent
+ * or names a node that is not active.
+ */
+const getNodeByCookie = (
+  headers?: Record<string, string | string[] | undefined>,
+  serverId?: string,
+): ClusterNode | null => {
+  const cookieHeader = headers?.cookie;
+  const rawCookies = Array.isArray(cookieHeader) ? cookieHeader.join('; ') : cookieHeader;
+
+  if (rawCookies) {
+    const config = getClusterConfig();
+    const cookieName = config?.stickySession?.cookieName || 'MCPHUB_NODE';
+
+    // Compare cookie names exactly instead of building a regex from the
+    // configured name: no metacharacter surprises and no suffix matches such
+    // as 'XMCPHUB_NODE'.
+    for (const pair of rawCookies.split(';')) {
+      const separator = pair.indexOf('=');
+      if (separator < 0 || pair.slice(0, separator).trim() !== cookieName) {
+        continue;
+      }
+
+      const node = nodes.get(pair.slice(separator + 1).trim());
+      if (node && node.status === 'active') {
+        return node;
+      }
+      break; // only the first cookie with this name is considered
+    }
+  }
+
+  return getNodeByConsistentHash(randomUUID(), serverId);
+};
+
+/**
+ * Get node from header
+ *
+ * Reads the node ID from the sticky-session header (default 'X-MCPHub-Node',
+ * looked up in lower case). Falls back to hashing a random ID when the header
+ * is absent or names a node that is not active.
+ */
+const getNodeByHeader = (
+  headers?: Record<string, string | string[] | undefined>,
+  serverId?: string,
+): ClusterNode | null => {
+  const config = getClusterConfig();
+  const headerName = (config?.stickySession?.headerName || 'X-MCPHub-Node').toLowerCase();
+
+  const headerValue = headers?.[headerName];
+  const nodeId = Array.isArray(headerValue) ? headerValue[0] : headerValue;
+
+  if (nodeId) {
+    const node = nodes.get(nodeId);
+    if (node && node.status === 'active') {
+      return node;
+    }
+  }
+
+  return getNodeByConsistentHash(randomUUID(), serverId);
+};
+
+/**
+ * Get session affinity info for a session
+ *
+ * @returns the stored affinity, or null when none is stored
+ */
+export const getSessionAffinity = (sessionId: string): SessionAffinity | null => {
+  return sessionAffinities.get(sessionId) || null;
+};
+
+/**
+ * Remove session affinity
+ */
+export const removeSessionAffinity = (sessionId: string): void => {
+  sessionAffinities.delete(sessionId);
+};
+
+/**
+ * Shutdown cluster service
+ *
+ * Stops the heartbeat and cleanup timers. Registered nodes, replicas and
+ * session affinities held in memory are left untouched.
+ */
+export const shutdownClusterService = (): void => {
+  if (heartbeatIntervalId) {
+    clearInterval(heartbeatIntervalId);
+    heartbeatIntervalId = null;
+  }
+
+  if (cleanupIntervalId) {
+    clearInterval(cleanupIntervalId);
+    cleanupIntervalId = null;
+  }
+
+  console.log('Cluster service shut down');
+};
+
+/**
+ * Get cluster statistics
+ *
+ * @returns counts of known nodes, active nodes, servers with at least one
+ *   replica, and stored session affinities
+ */
+export const getClusterStats = () => {
+  return {
+    nodes: nodes.size,
+    activeNodes: getActiveNodes().length,
+    servers: serverReplicas.size,
+    sessions: sessionAffinities.size,
+  };
+};
diff --git a/src/types/index.ts b/src/types/index.ts
index 44baf23..040242d 100644
--- a/src/types/index.ts
+++ b/src/types/index.ts
@@ -171,6 +171,7 @@ export interface SystemConfig {
   };
   nameSeparator?: string; // Separator used between server name and tool/prompt name (default: '-')
   oauth?: OAuthProviderConfig; // OAuth provider configuration for upstream MCP servers
+  cluster?: ClusterConfig; // Cluster configuration for distributed deployment
 }
 
 export interface UserConfig {
@@ -356,3 +357,63 @@ export interface AddServerRequest {
   name: string; // Name of the server to add
   config: ServerConfig; // Configuration details for the server
 }
+
+// Cluster-related types
+
+// Cluster node information
+export interface ClusterNode {
+  id: string; // Unique identifier for the node (e.g., UUID)
+  name: string; // Human-readable name of the node
+  host: string; // Hostname or IP address
+ port: number; // Port number the node is running on + url: string; // Full URL to access the node (e.g., 'http://node1:3000') + status: 'active' | 'inactive' | 'unhealthy'; // Current status of the node + lastHeartbeat: number; // Timestamp of last heartbeat + servers: string[]; // List of MCP server names hosted on this node + metadata?: Record; // Additional metadata about the node +} + +// Cluster configuration +export interface ClusterConfig { + enabled: boolean; // Whether cluster mode is enabled + mode: 'standalone' | 'node' | 'coordinator'; // Cluster operating mode + node?: { + // Configuration when running as a cluster node + id?: string; // Node ID (generated if not provided) + name?: string; // Node name (defaults to hostname) + coordinatorUrl: string; // URL of the coordinator node + heartbeatInterval?: number; // Heartbeat interval in milliseconds (default: 5000) + registerOnStartup?: boolean; // Whether to register with coordinator on startup (default: true) + }; + coordinator?: { + // Configuration when running as coordinator + nodeTimeout?: number; // Time in ms before marking a node as unhealthy (default: 15000) + cleanupInterval?: number; // Interval for cleaning up inactive nodes (default: 30000) + stickySessionTimeout?: number; // Sticky session timeout in milliseconds (default: 3600000, 1 hour) + }; + stickySession?: { + enabled: boolean; // Whether sticky sessions are enabled (default: true for cluster mode) + strategy: 'consistent-hash' | 'cookie' | 'header'; // Strategy for session affinity (default: consistent-hash) + cookieName?: string; // Cookie name for cookie-based sticky sessions (default: 'MCPHUB_NODE') + headerName?: string; // Header name for header-based sticky sessions (default: 'X-MCPHub-Node') + }; +} + +// Cluster server replica configuration +export interface ServerReplica { + serverId: string; // MCP server identifier + nodeId: string; // Node hosting this replica + nodeUrl: string; // URL to access this replica + status: 
'active' | 'inactive'; // Status of this replica + weight?: number; // Load balancing weight (default: 1) +} + +// Session affinity information +export interface SessionAffinity { + sessionId: string; // Session identifier + nodeId: string; // Node ID for this session + serverId?: string; // Optional: specific server this session is bound to + createdAt: number; // Timestamp when session was created + lastAccessed: number; // Timestamp of last access + expiresAt: number; // Timestamp when session expires +} diff --git a/tests/services/clusterService.test.ts b/tests/services/clusterService.test.ts new file mode 100644 index 0000000..65a764f --- /dev/null +++ b/tests/services/clusterService.test.ts @@ -0,0 +1,335 @@ +/** + * Cluster Service Tests + */ + +import { + isClusterEnabled, + getClusterMode, + getCurrentNodeId, + registerNode, + updateNodeHeartbeat, + getActiveNodes, + getAllNodes, + getServerReplicas, + getNodeForSession, + getSessionAffinity, + removeSessionAffinity, + getClusterStats, + shutdownClusterService, +} from '../../src/services/clusterService'; +import { ClusterNode } from '../../src/types/index'; + +// Mock the config module +jest.mock('../../src/config/index.js', () => ({ + loadSettings: jest.fn(), +})); + +const { loadSettings } = require('../../src/config/index.js'); + +describe('Cluster Service', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + afterEach(() => { + // Clean up cluster service to reset state + shutdownClusterService(); + }); + + describe('Configuration', () => { + it('should return false when cluster is not enabled', () => { + loadSettings.mockReturnValue({ + mcpServers: {}, + }); + + expect(isClusterEnabled()).toBe(false); + }); + + it('should return true when cluster is enabled', () => { + loadSettings.mockReturnValue({ + mcpServers: {}, + systemConfig: { + cluster: { + enabled: true, + mode: 'coordinator', + }, + }, + }); + + expect(isClusterEnabled()).toBe(true); + }); + + it('should return standalone mode 
when cluster is not configured', () => {
+      loadSettings.mockReturnValue({
+        mcpServers: {},
+      });
+
+      expect(getClusterMode()).toBe('standalone');
+    });
+
+    it('should return configured mode when cluster is enabled', () => {
+      loadSettings.mockReturnValue({
+        mcpServers: {},
+        systemConfig: {
+          cluster: {
+            enabled: true,
+            mode: 'coordinator',
+          },
+        },
+      });
+
+      expect(getClusterMode()).toBe('coordinator');
+    });
+  });
+
+  describe('Node Management', () => {
+    beforeEach(() => {
+      loadSettings.mockReturnValue({
+        mcpServers: {},
+        systemConfig: {
+          cluster: {
+            enabled: true,
+            mode: 'coordinator',
+          },
+        },
+      });
+    });
+
+    it('should register a new node', () => {
+      const node: ClusterNode = {
+        id: 'node-test-1',
+        name: 'Test Node 1',
+        host: 'localhost',
+        port: 3001,
+        url: 'http://localhost:3001',
+        status: 'active',
+        lastHeartbeat: Date.now(),
+        servers: ['server1', 'server2'],
+      };
+
+      registerNode(node);
+      const nodes = getAllNodes();
+
+      // Find our node (there might be others from previous tests)
+      const registeredNode = nodes.find(n => n.id === 'node-test-1');
+      expect(registeredNode).toBeTruthy();
+      expect(registeredNode?.name).toBe('Test Node 1');
+      expect(registeredNode?.servers).toEqual(['server1', 'server2']);
+    });
+
+    it('should update node heartbeat', async () => {
+      const node: ClusterNode = {
+        id: 'node-test-2',
+        name: 'Test Node 2',
+        host: 'localhost',
+        port: 3001,
+        url: 'http://localhost:3001',
+        status: 'active',
+        lastHeartbeat: Date.now() - 10000,
+        servers: ['server1'],
+      };
+
+      registerNode(node);
+      const beforeHeartbeat = getAllNodes().find(n => n.id === 'node-test-2')?.lastHeartbeat || 0;
+
+      // Wait a bit to ensure the timestamp changes; the delay is awaited so the
+      // assertions below run before the test (and the afterEach cleanup) completes
+      await new Promise(resolve => setTimeout(resolve, 10));
+      updateNodeHeartbeat('node-test-2', ['server1', 'server2']);
+      const updatedNode = getAllNodes().find(n => n.id === 'node-test-2');
+      const afterHeartbeat = updatedNode?.lastHeartbeat || 0;
+
+      expect(afterHeartbeat).toBeGreaterThan(beforeHeartbeat);
+      expect(updatedNode?.servers).toEqual(['server1', 'server2']); // heartbeat also replaces the server list
+    });
+
+    it('should get active nodes only', () => {
+      const node1: ClusterNode = {
+        id: 'node-active-1',
+        name: 'Active Node',
+        host: 'localhost',
+        port: 3001,
+        url: 'http://localhost:3001',
+        status: 'active',
+        lastHeartbeat: Date.now(),
+        servers: ['server1'],
+      };
+
+      registerNode(node1);
+
+      const activeNodes = getActiveNodes();
+      const activeNode = activeNodes.find(n => n.id === 'node-active-1');
+      expect(activeNode).toBeTruthy();
+      expect(activeNode?.status).toBe('active');
+    });
+  });
+
+  describe('Server Replicas', () => {
+    beforeEach(() => {
+      loadSettings.mockReturnValue({
+        mcpServers: {},
+        systemConfig: {
+          cluster: {
+            enabled: true,
+            mode: 'coordinator',
+          },
+        },
+      });
+    });
+
+    it('should track server replicas across nodes', () => {
+      const node1: ClusterNode = {
+        id: 'node-replica-1',
+        name: 'Node 1',
+        host: 'localhost',
+        port: 3001,
+        url: 'http://localhost:3001',
+        status: 'active',
+        lastHeartbeat: Date.now(),
+        servers: ['test-server-1', 'test-server-2'],
+      };
+
+      const node2: ClusterNode = {
+        id: 'node-replica-2',
+        name: 'Node 2',
+        host: 'localhost',
+        port: 3002,
+        url: 'http://localhost:3002',
+        status: 'active',
+        lastHeartbeat: Date.now(),
+        servers: ['test-server-1', 'test-server-3'],
+      };
+
+      registerNode(node1);
+      registerNode(node2);
+
+      const server1Replicas = getServerReplicas('test-server-1'); // hosted on both nodes above
+      expect(server1Replicas.length).toBeGreaterThanOrEqual(2);
+      expect(server1Replicas.map(r => r.nodeId)).toContain('node-replica-1');
+      expect(server1Replicas.map(r => r.nodeId)).toContain('node-replica-2');
+    });
+  });
+
+  describe('Session Affinity', () => {
+    beforeEach(() => {
+      loadSettings.mockReturnValue({
+        mcpServers: {},
+        systemConfig: {
+          cluster: {
+            enabled: true,
+            mode: 'coordinator',
+            stickySession: {
+              enabled: true,
+              strategy: 'consistent-hash',
+            },
+          },
+        },
+      });
+    });
+
+    it('should maintain session affinity with consistent hash',
() => {
+      const node1: ClusterNode = {
+        id: 'node-affinity-1',
+        name: 'Node 1',
+        host: 'localhost',
+        port: 3001,
+        url: 'http://localhost:3001',
+        status: 'active',
+        lastHeartbeat: Date.now(),
+        servers: ['server1'],
+      };
+
+      registerNode(node1);
+
+      const sessionId = 'test-session-consistent-hash';
+      const firstNode = getNodeForSession(sessionId);
+      const secondNode = getNodeForSession(sessionId); // second lookup with the same session id
+
+      expect(firstNode).toBeTruthy();
+      expect(secondNode).toBeTruthy();
+      expect(firstNode?.id).toBe(secondNode?.id); // both lookups must resolve to the same node
+    });
+
+    it('should create and retrieve session affinity', () => {
+      const node1: ClusterNode = {
+        id: 'node-affinity-2',
+        name: 'Node 1',
+        host: 'localhost',
+        port: 3001,
+        url: 'http://localhost:3001',
+        status: 'active',
+        lastHeartbeat: Date.now(),
+        servers: ['server1'],
+      };
+
+      registerNode(node1);
+
+      const sessionId = 'test-session-retrieve';
+      const selectedNode = getNodeForSession(sessionId); // the test expects this lookup to create the affinity read back below
+
+      const affinity = getSessionAffinity(sessionId);
+      expect(affinity).toBeTruthy();
+      expect(affinity?.sessionId).toBe(sessionId);
+      expect(affinity?.nodeId).toBe(selectedNode?.id);
+    });
+
+    it('should remove session affinity', () => {
+      const node1: ClusterNode = {
+        id: 'node-affinity-3',
+        name: 'Node 1',
+        host: 'localhost',
+        port: 3001,
+        url: 'http://localhost:3001',
+        status: 'active',
+        lastHeartbeat: Date.now(),
+        servers: ['server1'],
+      };
+
+      registerNode(node1);
+
+      const sessionId = 'test-session-remove';
+      getNodeForSession(sessionId); // return value unused: called so that an affinity entry exists to remove
+
+      let affinity = getSessionAffinity(sessionId);
+      expect(affinity).toBeTruthy();
+
+      removeSessionAffinity(sessionId);
+      affinity = getSessionAffinity(sessionId);
+      expect(affinity).toBeNull(); // expects null (not undefined) once the affinity is removed
+    });
+  });
+
+  describe('Cluster Statistics', () => {
+    beforeEach(() => {
+      loadSettings.mockReturnValue({
+        mcpServers: {},
+        systemConfig: {
+          cluster: {
+            enabled: true,
+            mode: 'coordinator',
+          },
+        },
+      });
+    });
+
+    it('should return cluster statistics', () => {
+      const node1: ClusterNode = {
+        id:
'node-stats-1',
+        name: 'Node 1',
+        host: 'localhost',
+        port: 3001,
+        url: 'http://localhost:3001',
+        status: 'active',
+        lastHeartbeat: Date.now(),
+        servers: ['unique-server-1', 'unique-server-2'],
+      };
+
+      registerNode(node1);
+
+      const stats = getClusterStats();
+      expect(stats.nodes).toBeGreaterThanOrEqual(1); // lower bound: at least the node registered above
+      expect(stats.activeNodes).toBeGreaterThanOrEqual(1);
+      expect(stats.servers).toBeGreaterThanOrEqual(2); // at least the two servers listed on node1
+    });
+  });
+});