import { WebSocketServer, WebSocket, Data } from 'ws';
import { createHmac, randomBytes, timingSafeEqual } from 'crypto';
import { IncomingMessage } from 'http';
import { EventEmitter } from 'events';

/**
 * WebSocket Alert Server for Real-Time Call Analysis
 *
 * Subscribes to CallAnalysisEngine events and broadcasts alerts
 * to authenticated WebSocket clients.
 *
 * Security hardening (FRE-4497):
 * - JWT authentication required (enableAuth defaults to true)
 * - jwtSecret loaded from env (random per-process default when unset)
 * - Origin allowlist validation (exact match; an empty allowlist disables the check)
 * - Per-subscriber callId filtering (empty set = no alerts by default)
 * - crypto.randomBytes for sessionId
 * - Bounded alert history with TTL-based eviction
 * - Alert cooldown per session to prevent flooding
 * - Graceful shutdown with timeout
 */

// ── Types ────────────────────────────────────────────────────────────────────

/** Severity levels attached to every alert. */
export type AlertSeverity = 'low' | 'medium' | 'high' | 'critical';

/** Runtime configuration of an {@link AlertServer}. */
export interface AlertServerConfig {
  port: number;
  host: string;
  /** Origins allowed to connect (exact match). Empty list = origin check disabled. */
  allowedOrigins: string[];
  enableAuth: boolean;
  jwtSecret: string;
  /** Maximum number of alerts kept in the in-memory history. */
  maxAlertHistory: number;
  /** Alerts older than this are dropped by the periodic cleanup. */
  alertHistoryTtlMs: number;
  /** Minimum delay between two alerts with the same callId/type for one session. */
  cooldownMs: number;
  maxSubscribers: number;
  maxCallIdsPerSubscriber: number;
  shutdownTimeoutMs: number;
}

/** An alert as stored in history and sent to subscribers. */
export interface AlertEntry {
  id: string;
  timestamp: number;
  callId: string;
  type: string;
  severity: AlertSeverity;
  data: Record<string, unknown>;
}

/** Per-connection subscriber state. */
export interface SubscriberSession {
  sessionId: string;
  userId: string;
  ws: WebSocket;
  /** Call IDs this subscriber receives alerts for. */
  callIds: Set<string>;
  /** Last send time per `${callId}:${type}` key, used for the cooldown. */
  lastAlertTime: Map<string, number>;
  connectedAt: number;
}

/** Input accepted when raising an alert. */
export interface AlertOptions {
  callId: string;
  type: string;
  severity: AlertSeverity;
  data?: Record<string, unknown>;
}

// ── Constants ────────────────────────────────────────────────────────────────

const DEFAULT_PORT = 8088;
const MAX_CALL_ID_LENGTH = 64;
const DEFAULT_HISTORY_LIMIT = 50;
const MAX_HISTORY_LIMIT = 100;
const MIN_SHUTDOWN_WAIT_MS = 100;

/** Reads the listening port from the environment, falling back on invalid input. */
function readPortFromEnv(): number {
  const port = Number.parseInt(process.env.ALERT_SERVER_PORT || String(DEFAULT_PORT), 10);
  return Number.isNaN(port) ? DEFAULT_PORT : port;
}

const DEFAULT_CONFIG: AlertServerConfig = {
  port: readPortFromEnv(),
  host: process.env.ALERT_SERVER_HOST || '0.0.0.0',
  allowedOrigins: (process.env.ALLOWED_ORIGINS || '')
    .split(',')
    .map((origin) => origin.trim())
    .filter(Boolean),
  enableAuth: process.env.ALERT_AUTH_DISABLED !== 'true',
  // A random secret means no externally issued token verifies until JWT_SECRET is set.
  jwtSecret: process.env.JWT_SECRET || randomBytes(32).toString('hex'),
  maxAlertHistory: 500,
  alertHistoryTtlMs: 3600_000,
  cooldownMs: 2000,
  maxSubscribers: 100,
  maxCallIdsPerSubscriber: 50,
  shutdownTimeoutMs: 5000,
};

// ── JWT Helper (shared with signaling server) ────────────────────────────────

/**
 * Extracts the `token` query parameter from a request URL.
 *
 * @returns The URL-decoded token, or `null` when the parameter is absent or
 * contains a malformed percent-escape.
 */
function extractJwtFromQuery(url: string): string | null {
  const raw = url.match(/[?&]token=([^&]+)/)?.[1];
  if (raw === undefined) return null;
  try {
    return decodeURIComponent(raw);
  } catch {
    // decodeURIComponent throws URIError on malformed escapes; this runs during
    // the upgrade handshake, so it must never throw on client-supplied input.
    return null;
  }
}

/** Extracts a bearer token from the `Authorization` header, or `null` if there is none. */
function extractJwtFromHeader(req: IncomingMessage): string | null {
  const auth = req.headers['authorization'];
  return auth?.startsWith('Bearer ') ? auth.slice(7) : null;
}

/** Decodes one base64url JWT segment into a JSON object; `null` when it is not an object. */
function decodeJwtSegment(segment: string): Record<string, unknown> | null {
  const value: unknown = JSON.parse(Buffer.from(segment, 'base64url').toString());
  return value !== null && typeof value === 'object' && !Array.isArray(value)
    ? (value as Record<string, unknown>)
    : null;
}

/**
 * Verifies an HS256-signed JWT.
 *
 * @param token  Compact JWT (`header.payload.signature`).
 * @param secret Shared HMAC secret.
 * @returns `{ sub, exp }` for a valid token (`exp` is 0 when the token carries
 * no expiry), or `null` when the token is malformed, uses another algorithm,
 * has a bad signature, lacks a string `sub`, or is expired.
 */
function verifyJwt(token: string, secret: string): { sub: string; exp: number } | null {
  try {
    const [headerB64, payloadB64, signatureB64, ...extra] = token.split('.');
    if (
      headerB64 === undefined ||
      payloadB64 === undefined ||
      signatureB64 === undefined ||
      extra.length > 0
    ) {
      return null;
    }

    const header = decodeJwtSegment(headerB64);
    if (header?.alg !== 'HS256') return null;

    // Check the signature before trusting any payload field. The encoded
    // strings are compared in constant time to avoid a timing side channel.
    const expected = Buffer.from(
      createHmac('sha256', secret).update(`${headerB64}.${payloadB64}`).digest('base64url'),
    );
    const provided = Buffer.from(signatureB64);
    if (provided.length !== expected.length || !timingSafeEqual(provided, expected)) {
      return null;
    }

    const payload = decodeJwtSegment(payloadB64);
    if (!payload) return null;

    const { sub, exp } = payload;
    if (typeof sub !== 'string' || sub.length === 0) return null;

    if (exp !== undefined && exp !== null) {
      // A present `exp` must be numeric; a non-numeric one must not skip the check.
      if (typeof exp !== 'number' || !Number.isFinite(exp)) return null;
      if (Date.now() / 1000 > exp) return null;
      return { sub, exp };
    }
    return { sub, exp: 0 };
  } catch {
    // Invalid base64 or JSON in a segment: treat as an invalid token.
    return null;
  }
}

// ── Alert Server ─────────────────────────────────────────────────────────────

/**
 * WebSocket server that pushes call-analysis alerts to subscribers.
 *
 * Emits `subscriber:connected`, `subscriber:disconnected` and `alert:emitted`.
 */
export class AlertServer extends EventEmitter {
  private wss: WebSocketServer;
  private sessions: Map<string, SubscriberSession> = new Map();
  private alertHistory: AlertEntry[] = [];
  private config: AlertServerConfig;
  private engine?: EventEmitter;
  private detachEngine?: () => void;
  private cleanupTimer?: NodeJS.Timeout;
  private shuttingDown = false;
  private shutdownFinished = false;
  private stopPromise?: Promise<void>;

  /**
   * Starts listening immediately on the configured host/port.
   *
   * @param config Overrides applied on top of the environment-derived defaults.
   */
  constructor(config: Partial<AlertServerConfig> = {}) {
    super();
    this.config = { ...DEFAULT_CONFIG, ...config };

    this.wss = new WebSocketServer({
      port: this.config.port,
      host: this.config.host,
      maxPayload: 65536,
      verifyClient: this.verifyClient.bind(this),
    });

    this.wss.on('connection', this.handleConnection.bind(this));
    console.log(`[AlertServer] Listening on ${this.config.host}:${this.config.port}`);

    // Periodic cleanup: expired history entries and elapsed cooldown markers.
    this.cleanupTimer = setInterval(() => {
      this.evictStaleAlerts();
      this.pruneCooldowns();
    }, 60_000);
  }

  /**
   * Verify incoming WebSocket connection: origin allowlist, JWT, capacity.
   */
  private verifyClient(
    info: { req: IncomingMessage; origin: string },
    cb: (result: boolean, status?: number, reason?: string) => void,
  ): void {
    if (this.shuttingDown) {
      cb(false, 503, 'Server shutting down');
      return;
    }

    // Origin validation. Exact match only: a prefix match would accept
    // "https://app.example.com.evil.io" for "https://app.example.com".
    if (this.config.allowedOrigins.length > 0) {
      const origin = info.origin || info.req.headers['origin'] || '';
      if (!this.config.allowedOrigins.includes(origin)) {
        cb(false, 403, `Origin "${origin}" not allowed`);
        return;
      }
    }

    // JWT authentication
    if (this.config.enableAuth) {
      const token = extractJwtFromQuery(info.req.url || '') || extractJwtFromHeader(info.req);
      if (!token) {
        cb(false, 401, 'Missing JWT token');
        return;
      }
      if (!verifyJwt(token, this.config.jwtSecret)) {
        cb(false, 401, 'Invalid or expired JWT');
        return;
      }
    }

    // Max subscriber check
    if (this.sessions.size >= this.config.maxSubscribers) {
      cb(false, 503, 'Max subscribers reached');
      return;
    }

    cb(true);
  }

  /**
   * Handle new WebSocket connection: create the session, send the handshake
   * and wire up the message/close/error handlers.
   */
  private handleConnection(ws: WebSocket, req: IncomingMessage): void {
    const token = extractJwtFromQuery(req.url || '') || extractJwtFromHeader(req);
    const payload = token ? verifyJwt(token, this.config.jwtSecret) : null;
    const userId = payload?.sub || 'anonymous';

    // crypto.randomBytes for sessionId (not Date.now() + Math.random())
    const sessionId = `sess_${randomBytes(12).toString('hex')}`;

    const session: SubscriberSession = {
      sessionId,
      userId,
      ws,
      callIds: new Set<string>(),
      lastAlertTime: new Map<string, number>(),
      connectedAt: Date.now(),
    };
    this.sessions.set(sessionId, session);

    this.sendJson(ws, {
      type: 'handshake',
      payload: { sessionId, message: 'Connected to alert server' },
    });

    ws.on('message', this.handleMessage(session));
    ws.on('close', () => this.handleDisconnect(session));
    ws.on('error', (err) => {
      console.error(`[AlertServer] Session ${sessionId} error:`, err.message);
      this.handleDisconnect(session);
    });

    this.emit('subscriber:connected', { sessionId, userId });
  }

  /** Serializes and sends a message, skipping sockets that are no longer open. */
  private sendJson(ws: WebSocket, message: { type: string; payload: unknown }): void {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
    }
  }

  /**
   * Handle incoming message from subscriber.
   *
   * Client input is untrusted: every field is validated before use so that a
   * malformed message produces an `error` reply instead of an exception.
   */
  private handleMessage(session: SubscriberSession) {
    return (data: Data): void => {
      let parsed: unknown;
      try {
        parsed = JSON.parse(data.toString());
      } catch {
        this.sendJson(session.ws, { type: 'error', payload: { message: 'Invalid JSON' } });
        return;
      }

      // JSON.parse also yields null, numbers, strings and arrays.
      if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
        this.sendJson(session.ws, {
          type: 'error',
          payload: { message: 'Message must be a JSON object' },
        });
        return;
      }
      const message = parsed as Record<string, unknown>;

      switch (message.type) {
        case 'subscribe':
          this.handleSubscribe(session, message);
          break;
        case 'unsubscribe':
          this.handleUnsubscribe(session, message);
          break;
        case 'getHistory':
          this.handleGetHistory(session, message);
          break;
        case 'ping':
          this.sendJson(session.ws, { type: 'pong', payload: { timestamp: Date.now() } });
          break;
        default:
          this.sendJson(session.ws, {
            type: 'error',
            payload: { message: `Unknown message type: ${String(message.type)}` },
          });
      }
    };
  }

  /** Adds the requested call IDs to the session, up to `maxCallIdsPerSubscriber`. */
  private handleSubscribe(session: SubscriberSession, message: Record<string, unknown>): void {
    const requested: unknown[] = Array.isArray(message.callIds) ? message.callIds : [];
    for (const cid of requested) {
      // Existing subscriptions take priority; stop once the cap is reached.
      if (session.callIds.size >= this.config.maxCallIdsPerSubscriber) break;
      if (typeof cid === 'string' && cid.length <= MAX_CALL_ID_LENGTH) {
        session.callIds.add(cid);
      }
    }
    this.sendJson(session.ws, {
      type: 'subscribed',
      payload: { callIds: Array.from(session.callIds) },
    });
  }

  /** Removes the listed call IDs, or all of them when `callIds` is omitted. */
  private handleUnsubscribe(session: SubscriberSession, message: Record<string, unknown>): void {
    const requested = message.callIds;
    if (requested === undefined || requested === null) {
      session.callIds.clear();
    } else if (Array.isArray(requested)) {
      for (const cid of requested) {
        if (typeof cid === 'string') session.callIds.delete(cid);
      }
    } else {
      this.sendJson(session.ws, {
        type: 'error',
        payload: { message: 'callIds must be an array' },
      });
      return;
    }
    this.sendJson(session.ws, {
      type: 'unsubscribed',
      payload: { callIds: Array.from(session.callIds) },
    });
  }

  /**
   * Sends recent alerts, limited to the calls the session is subscribed to,
   * so history cannot be used to bypass the per-subscriber filter.
   */
  private handleGetHistory(session: SubscriberSession, message: Record<string, unknown>): void {
    const requestedLimit = Number.parseInt(String(message.limit), 10);
    // Clamp to [1, MAX]: a negative value would turn slice(-limit) into slice(+n).
    const limit =
      Number.isFinite(requestedLimit) && requestedLimit > 0
        ? Math.min(requestedLimit, MAX_HISTORY_LIMIT)
        : DEFAULT_HISTORY_LIMIT;
    const callId =
      typeof message.callId === 'string' && message.callId.length > 0
        ? message.callId
        : undefined;

    const visible = this.alertHistory.filter(
      (alert) =>
        session.callIds.has(alert.callId) && (callId === undefined || alert.callId === callId),
    );
    this.sendJson(session.ws, {
      type: 'history',
      payload: { alerts: visible.slice(-limit) },
    });
  }

  /**
   * Handle subscriber disconnect. Idempotent: both `error` and `close` may
   * fire for the same socket, but the event is emitted only once.
   */
  private handleDisconnect(session: SubscriberSession): void {
    if (!this.sessions.delete(session.sessionId)) return;
    this.emit('subscriber:disconnected', { sessionId: session.sessionId });
  }

  /**
   * Connect to CallAnalysisEngine events (`result`, `events`, `anomalies`).
   *
   * Calling this again replaces the previous engine; its listeners are
   * removed first so alerts are never raised twice.
   */
  connectEngine(engine: EventEmitter): void {
    this.detachEngine?.();
    this.engine = engine;

    const onResult = (result: {
      callId: string;
      callQuality?: Record<string, unknown>;
      sentiment?: string;
    }): void => {
      if (result.callQuality) {
        this.emitAlert({
          callId: result.callId,
          type: 'call_quality',
          severity: this.getSeverityFromQuality(result.callQuality),
          data: result.callQuality,
        });
      }
    };

    const onEvents = (events: {
      callId: string;
      events: Array<{ type: string; timestamp: number }>;
    }): void => {
      for (const event of events.events) {
        this.emitAlert({
          callId: events.callId,
          type: `call_event:${event.type}`,
          severity: 'medium',
          data: { eventType: event.type, timestamp: event.timestamp },
        });
      }
    };

    const onAnomalies = (anomalies: {
      callId: string;
      anomalies: Array<{ type: string; confidence: number }>;
    }): void => {
      for (const anomaly of anomalies.anomalies) {
        this.emitAlert({
          callId: anomalies.callId,
          type: `anomaly:${anomaly.type}`,
          severity: anomaly.confidence > 0.8 ? 'high' : 'medium',
          data: { anomalyType: anomaly.type, confidence: anomaly.confidence },
        });
      }
    };

    engine.on('result', onResult);
    engine.on('events', onEvents);
    engine.on('anomalies', onAnomalies);

    this.detachEngine = () => {
      engine.off('result', onResult);
      engine.off('events', onEvents);
      engine.off('anomalies', onAnomalies);
      this.engine = undefined;
      this.detachEngine = undefined;
    };

    console.log('[AlertServer] Connected to analysis engine');
  }

  /**
   * Emit an alert to matching subscribers: record it in the bounded history,
   * send it to every open session subscribed to the call (subject to the
   * per-session cooldown), then emit `alert:emitted`.
   */
  private emitAlert(options: AlertOptions): void {
    const now = Date.now();
    const alert: AlertEntry = {
      id: `alert_${randomBytes(8).toString('hex')}`,
      timestamp: now,
      callId: options.callId,
      type: options.type,
      severity: options.severity,
      data: options.data || {},
    };

    // Store in bounded history (also correct when maxAlertHistory is 0).
    this.alertHistory.push(alert);
    const overflow = this.alertHistory.length - this.config.maxAlertHistory;
    if (overflow > 0) {
      this.alertHistory.splice(0, overflow);
    }

    const payload = JSON.stringify({ type: 'alert', payload: alert });
    const cooldownKey = `${options.callId}:${options.type}`;

    for (const session of this.sessions.values()) {
      // Only explicitly subscribed calls are delivered; an empty set receives nothing.
      if (!session.callIds.has(options.callId)) continue;

      // Cooldown check
      const lastTime = session.lastAlertTime.get(cooldownKey) ?? 0;
      if (now - lastTime < this.config.cooldownMs) continue;

      if (session.ws.readyState === WebSocket.OPEN) {
        session.ws.send(payload);
        session.lastAlertTime.set(cooldownKey, now);
      }
    }

    this.emit('alert:emitted', alert);
  }

  /**
   * Determine severity from call quality metrics. Only a numeric `mosScore`
   * is considered; anything else yields `'low'`.
   */
  private getSeverityFromQuality(quality: Record<string, unknown>): AlertSeverity {
    const mos = quality.mosScore;
    if (typeof mos === 'number') {
      if (mos < 2.5) return 'critical';
      if (mos < 3.5) return 'high';
      if (mos < 4.0) return 'medium';
    }
    return 'low';
  }

  /**
   * Evict stale alerts from history based on TTL
   */
  private evictStaleAlerts(): void {
    const cutoff = Date.now() - this.config.alertHistoryTtlMs;
    const before = this.alertHistory.length;
    this.alertHistory = this.alertHistory.filter((alert) => alert.timestamp > cutoff);
    const evicted = before - this.alertHistory.length;
    if (evicted > 0) {
      console.log(`[AlertServer] Evicted ${evicted} stale alerts`);
    }
  }

  /**
   * Drops cooldown markers whose cooldown has elapsed. They no longer affect
   * delivery, and keeping them would grow each session's map without bound.
   */
  private pruneCooldowns(): void {
    const now = Date.now();
    for (const session of this.sessions.values()) {
      for (const [key, sentAt] of session.lastAlertTime) {
        if (now - sentAt >= this.config.cooldownMs) {
          session.lastAlertTime.delete(key);
        }
      }
    }
  }

  /**
   * Get alert history (for API endpoint)
   *
   * @param limit  Maximum number of most recent alerts to return; values that
   * are not positive yield an empty list.
   * @param callId When given, only alerts of that call are returned.
   */
  getAlertHistory(limit = 50, callId?: string): AlertEntry[] {
    const count = Math.floor(limit);
    // slice(-0) would return the whole array, so handle non-positive limits here.
    if (!(count > 0)) return [];
    const alerts = callId
      ? this.alertHistory.filter((alert) => alert.callId === callId)
      : this.alertHistory;
    return alerts.slice(-count);
  }

  /**
   * Get subscriber stats
   */
  getStats() {
    return {
      activeSubscribers: this.sessions.size,
      alertHistorySize: this.alertHistory.length,
    };
  }

  /**
   * Graceful shutdown with timeout.
   *
   * Notifies subscribers, waits for them to disconnect, and closes the
   * remaining connections with code 1001 once the timeout elapses. New
   * connections are rejected while shutting down. Repeated calls return the
   * same promise.
   *
   * @param timeoutMs Maximum wait in milliseconds (at least 100 ms is used);
   * defaults to `shutdownTimeoutMs` from the configuration.
   */
  async stop(timeoutMs?: number): Promise<void> {
    if (this.stopPromise) return this.stopPromise;
    this.shuttingDown = true;

    const requested = timeoutMs ?? this.config.shutdownTimeoutMs;
    const waitMs = Math.max(
      MIN_SHUTDOWN_WAIT_MS,
      Number.isFinite(requested) ? requested : this.config.shutdownTimeoutMs,
    );

    this.stopPromise = new Promise<void>((resolve) => {
      const open = Array.from(this.sessions.values());
      if (open.length === 0) {
        this.finishShutdown();
        resolve();
        return;
      }

      let pending = open.length;
      let timer: NodeJS.Timeout | undefined;
      let finished = false;
      // Reached from both the timeout and the last 'close' event; run once.
      const finish = (): void => {
        if (finished) return;
        finished = true;
        if (timer) clearTimeout(timer);
        this.finishShutdown();
        resolve();
      };

      // Notify all subscribers
      const shutdownMsg = JSON.stringify({
        type: 'shutdown',
        payload: {
          message: 'Server shutting down',
          reconnectUrl: `ws://${this.config.host}:${this.config.port}`,
        },
      });

      for (const session of open) {
        session.ws.once('close', () => {
          pending -= 1;
          if (pending <= 0) finish();
        });
        if (session.ws.readyState === WebSocket.OPEN) {
          session.ws.send(shutdownMsg);
        }
      }

      // Close whatever is still connected once the timeout elapses.
      timer = setTimeout(() => {
        for (const session of this.sessions.values()) {
          session.ws.close(1001, 'Server shutting down');
        }
        finish();
      }, waitMs);
    });

    return this.stopPromise;
  }

  /** Releases the timer, engine listeners, server socket and sessions. Runs once. */
  private finishShutdown(): void {
    if (this.shutdownFinished) return;
    this.shutdownFinished = true;
    if (this.cleanupTimer) clearInterval(this.cleanupTimer);
    this.detachEngine?.();
    this.wss.close();
    this.sessions.clear();
    console.log('[AlertServer] Shutdown complete');
  }
}

/** Creates and starts an {@link AlertServer} with the given configuration overrides. */
export function createAlertServer(config?: Partial<AlertServerConfig>): AlertServer {
  return new AlertServer(config);
}