/**
 * WebSocket Alert Server
 *
 * Real-time alert broadcasting for call analysis events and anomalies.
 * Connects to CallAnalysisEngine instances and pushes alerts to subscribed
 * WebSocket clients.
 */
import { WebSocketServer, WebSocket } from 'ws';
import {
  CallAnalysisEngine,
  CallEvent,
  Anomaly,
  SentimentAnalysis,
  AnalysisResult,
} from '../../src/lib/inference/call-analysis-engine';
import { jwtVerify, SignJWT } from 'jose';
import type { JWTPayload } from 'jose';

/** Category of an alert pushed to subscribers. */
export type AlertType =
  | 'anomaly'
  | 'call_event'
  | 'quality_degraded'
  | 'sentiment_shift'
  | 'call_summary'
  | 'connection'
  | 'disconnection';

/** Severity attached to every alert. */
export type AlertSeverity = 'info' | 'low' | 'medium' | 'high' | 'critical';

/** Wire format of a single alert as sent to subscribers and kept in history. */
export interface AlertPayload {
  id: string;
  type: AlertType;
  severity: AlertSeverity;
  /** Creation time in epoch milliseconds (`Date.now()`). */
  timestamp: number;
  callId?: string;
  title: string;
  message: string;
  data: Record<string, unknown>;
  actionable: boolean;
}

/** Optional settings for {@link AlertServer}; unset fields fall back to defaults. */
export interface AlertServerConfig {
  port?: number;
  enableAuth?: boolean;
  jwtSecret?: string;
  allowedOrigins?: string[];
  /** Minimum gap, per subscriber and per `callId:type`, between two delivered alerts. */
  alertCooldownMs?: number;
  maxSubscribers?: number;
  enableCallCorrelation?: boolean;
}

/** Per-connection state tracked by the server. */
export interface SubscriberSession {
  ws: WebSocket;
  userId?: string;
  /** Calls this subscriber filters on; an empty set means "receive everything". */
  callIds: Set<string>;
  /** Last delivery time (epoch ms) keyed by `${callId}:${type}`. */
  lastAlertTime: Map<string, number>;
  subscribedAt: number;
}

const DEFAULT_CONFIG: Required<AlertServerConfig> = {
  port: 8088,
  enableAuth: true,
  jwtSecret: process.env.ALERT_SERVER_JWT_SECRET ?? '',
  allowedOrigins: ['http://localhost:3000'],
  alertCooldownMs: 5000,
  maxSubscribers: 100,
  enableCallCorrelation: true,
};

/** Extracts a printable message from an arbitrary thrown value. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * JWT verification helper.
 *
 * @param token - Compact JWT taken from the `Authorization: Bearer` header.
 * @param secret - Shared HS256 secret.
 * @returns The verified claims, or `null` when verification fails for any reason.
 */
async function verifyJWT(token: string, secret: string): Promise<JWTPayload | null> {
  try {
    const { payload } = await jwtVerify(token, new TextEncoder().encode(secret), {
      algorithms: ['HS256'],
    });
    return payload;
  } catch (error: unknown) {
    console.error('[AlertServer] JWT verification failed:', errorMessage(error));
    return null;
  }
}

/**
 * WebSocket server that fans analysis alerts out to subscribed clients.
 *
 * Clients connect with optional `sessionId`, `userId` and `callId` query
 * parameters and may later send `subscribe` / `unsubscribe` / `ping` JSON
 * messages. Alerts are rate-limited per subscriber via `alertCooldownMs`.
 */
export class AlertServer {
  private wss: WebSocketServer | null = null;
  private config: Required<AlertServerConfig>;
  private subscribers: Map<string, SubscriberSession> = new Map();
  private analysisEngines: Map<string, CallAnalysisEngine> = new Map();
  private alertHistory: AlertPayload[] = [];
  private maxAlertHistory: number = 500;
  private isRunning: boolean = false;

  /**
   * @param config - Overrides merged on top of the built-in defaults.
   */
  constructor(config: AlertServerConfig = {}) {
    this.config = { ...DEFAULT_CONFIG, ...config };
  }

  /**
   * Creates the WebSocket server and starts accepting connections.
   * Calling it again while a server instance exists is a no-op.
   */
  async start(): Promise<void> {
    if (this.wss) {
      // A second server would orphan the first one and leak its port.
      console.warn('[AlertServer] start() ignored: server already started');
      return;
    }

    if (this.config.enableAuth && !this.config.jwtSecret) {
      // Authentication is only enforced when a secret is present; make the
      // resulting open access visible instead of silent.
      console.warn('[AlertServer] enableAuth is set but jwtSecret is empty: connections are NOT authenticated');
    }

    this.wss = new WebSocketServer({
      port: this.config.port,
      maxPayload: 1024 * 1024,
    });

    this.wss.on('connection', (ws: WebSocket, req) => {
      // handleConnection is async; never let a rejection escape the listener.
      this.handleConnection(ws, req).catch((error: unknown) => {
        console.error(`[AlertServer] Connection handling failed: ${errorMessage(error)}`);
        ws.close(1011, 'Internal server error');
      });
    });

    this.wss.on('error', (error: Error) => {
      console.error(`[AlertServer] WebSocket error: ${error.message}`);
    });

    this.isRunning = true;
    console.log(`[AlertServer] Listening on port ${this.config.port}`);
  }

  /**
   * Validates a new connection (origin, JWT, capacity) and registers it as a
   * subscriber. Rejected connections are closed with a descriptive code.
   *
   * Messages a client sends before it receives the `connected` frame may be
   * dropped, because the message listener is attached only after validation.
   */
  private async handleConnection(ws: WebSocket, req: import('http').IncomingMessage): Promise<void> {
    // Only the query string matters, so parse against a fixed base rather
    // than the client-controlled Host header (which can make URL() throw).
    let url: URL;
    try {
      url = new URL(req.url ?? '', 'http://localhost');
    } catch {
      ws.close(1008, 'Invalid request URL');
      return;
    }

    const newSessionId = (): string => `sub-${Date.now()}-${Math.random().toString(36).slice(2)}`;
    let sessionId = url.searchParams.get('sessionId') || newSessionId();
    let userId = url.searchParams.get('userId') || undefined;
    const callId = url.searchParams.get('callId') || undefined;

    // Origin validation
    const origin = req.headers.origin;
    if (origin && !this.config.allowedOrigins.includes(origin)) {
      ws.close(1008, 'Origin not allowed');
      return;
    }

    // JWT Authentication (if enabled)
    if (this.config.enableAuth && this.config.jwtSecret) {
      const authHeader = req.headers.authorization;
      if (!authHeader || !authHeader.startsWith('Bearer ')) {
        ws.close(4001, 'Missing or invalid JWT token');
        return;
      }

      const token = authHeader.substring(7);
      // Must be awaited: an un-awaited Promise is always truthy and would
      // let every token through.
      const claims = await verifyJWT(token, this.config.jwtSecret);
      if (!claims) {
        ws.close(4002, 'Invalid or expired JWT token');
        return;
      }

      // The peer may have gone away while the token was being verified; its
      // 'close' event has then already fired, so do not register it.
      if (ws.readyState !== WebSocket.OPEN) {
        return;
      }

      // Extract user ID from token if present
      userId = claims.sub || userId;
    }

    if (this.subscribers.size >= this.config.maxSubscribers) {
      ws.close(1013, 'Too many subscribers');
      return;
    }

    // A client-chosen id must never displace an existing subscriber.
    while (this.subscribers.has(sessionId)) {
      sessionId = newSessionId();
    }

    const session: SubscriberSession = {
      ws,
      userId,
      callIds: callId ? new Set<string>([callId]) : new Set<string>(),
      lastAlertTime: new Map<string, number>(),
      subscribedAt: Date.now(),
    };
    this.subscribers.set(sessionId, session);

    ws.send(JSON.stringify({
      type: 'connected',
      payload: { sessionId, userId, timestamp: Date.now() },
    }));

    ws.on('message', (data: Buffer | ArrayBuffer) => {
      this.handleMessage(sessionId, data);
    });

    ws.on('close', () => {
      // Remove the entry only if it still belongs to this socket.
      if (this.subscribers.get(sessionId)?.ws === ws) {
        this.subscribers.delete(sessionId);
      }
      console.log(`[AlertServer] Subscriber disconnected: ${sessionId}`);
    });

    ws.on('error', (error: Error) => {
      console.error(`[AlertServer] Subscriber error (${sessionId}): ${error.message}`);
    });

    console.log(`[AlertServer] Subscriber connected: ${sessionId}${callId ? ` (call: ${callId})` : ''}`);
  }

  /**
   * Handles one client frame. Recognised JSON messages:
   * `{type:'subscribe', callId}`, `{type:'unsubscribe', callId}`, `{type:'ping'}`.
   * Anything else is ignored; unparsable frames are logged.
   */
  private handleMessage(sessionId: string, data: Buffer | ArrayBuffer): void {
    try {
      // ArrayBuffer#toString() yields "[object ArrayBuffer]", so decode it explicitly.
      const text = Buffer.isBuffer(data) ? data.toString('utf8') : Buffer.from(data).toString('utf8');
      const parsed: unknown = JSON.parse(text);

      const session = this.subscribers.get(sessionId);
      if (!session) return;
      if (typeof parsed !== 'object' || parsed === null) return;

      const { type, callId } = parsed as { type?: unknown; callId?: unknown };
      // Alerts are matched against string call ids, so ignore anything else.
      const targetCall = typeof callId === 'string' && callId.length > 0 ? callId : undefined;

      switch (type) {
        case 'subscribe':
          if (targetCall) {
            session.callIds.add(targetCall);
          }
          break;
        case 'unsubscribe':
          if (targetCall) {
            session.callIds.delete(targetCall);
          }
          break;
        case 'ping':
          session.ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
          break;
      }
    } catch (error: unknown) {
      console.error(`[AlertServer] Message parse error: ${errorMessage(error)}`);
    }
  }

  /**
   * Registers an analysis engine for a call and forwards its `result`,
   * `events` and `anomalies` emissions as alerts.
   *
   * @param callId - Call the engine is analysing.
   * @param engine - Engine instance to listen to.
   */
  bindAnalysisEngine(callId: string, engine: CallAnalysisEngine): void {
    this.analysisEngines.set(callId, engine);

    engine.on('result', (result: AnalysisResult) => {
      this.processAnalysisResult(callId, result);
    });

    engine.on('events', (events: CallEvent[]) => {
      events.forEach(event => {
        this.sendAlert({
          type: 'call_event',
          severity: event.severity as AlertSeverity,
          callId,
          title: this.formatEventType(event.type),
          message: this.formatEventMessage(event),
          data: { event, timestamp: event.timestamp },
          actionable: event.severity === 'high',
        });
      });
    });

    engine.on('anomalies', (anomalies: Anomaly[]) => {
      anomalies.forEach(anomaly => {
        this.sendAlert({
          type: 'anomaly',
          severity: anomaly.severity as AlertSeverity,
          callId,
          title: this.formatAnomalyType(anomaly.type),
          message: anomaly.description,
          data: {
            anomaly,
            confidence: anomaly.confidence,
            recommendation: anomaly.recommendation,
          },
          actionable: anomaly.severity === 'high' || anomaly.severity === 'critical',
        });
      });
    });

    console.log(`[AlertServer] Bound analysis engine for call: ${callId}`);
  }

  /**
   * Derives threshold alerts from an analysis result: MOS below 3.0 raises
   * `quality_degraded`; negative sentiment with confidence above 0.7 raises
   * `sentiment_shift`.
   */
  private processAnalysisResult(callId: string, result: AnalysisResult): void {
    if (result.callQuality.mosScore < 3.0) {
      this.sendAlert({
        type: 'quality_degraded',
        severity: result.callQuality.mosScore < 2.5 ? 'high' : 'medium',
        callId,
        title: 'Call Quality Degraded',
        message: `MOS score: ${result.callQuality.mosScore.toFixed(1)} (threshold: 3.0)`,
        data: result.callQuality as unknown as Record<string, unknown>,
        actionable: true,
      });
    }

    if (result.sentiment.sentiment === 'negative' && result.sentiment.confidence > 0.7) {
      this.sendAlert({
        type: 'sentiment_shift',
        severity: 'medium',
        callId,
        title: 'Negative Sentiment Detected',
        message: `Confidence: ${(result.sentiment.confidence * 100).toFixed(0)}%`,
        data: result.sentiment as unknown as Record<string, unknown>,
        actionable: false,
      });
    }
  }

  /**
   * Delivers an alert to every eligible subscriber.
   *
   * A subscriber is skipped when it received the same `callId:type` within
   * `alertCooldownMs`, or when it filters on calls and `options.callId` is
   * not among them. The alert is created once and appended to the history a
   * single time, and only if at least one subscriber was eligible.
   *
   * @param options - Alert content; `id` and `timestamp` are generated here.
   */
  sendAlert(options: {
    type: AlertType;
    severity: AlertSeverity;
    callId?: string;
    title: string;
    message: string;
    data: Record<string, unknown>;
    actionable: boolean;
  }): void {
    const cooldownKey = `${options.callId}:${options.type}`;
    const now = Date.now();

    let alert: AlertPayload | undefined;
    let frame = '';

    // Snapshot so the iteration is unaffected by sockets closing mid-broadcast.
    for (const session of Array.from(this.subscribers.values())) {
      const lastTime = session.lastAlertTime.get(cooldownKey) || 0;
      if (now - lastTime < this.config.alertCooldownMs) continue;

      if (options.callId && session.callIds.size > 0 && !session.callIds.has(options.callId)) continue;

      if (!alert) {
        // Built lazily on the first eligible subscriber so every recipient
        // sees the same id and the history gets exactly one entry.
        alert = {
          id: `alert-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
          type: options.type,
          severity: options.severity,
          timestamp: now,
          callId: options.callId,
          title: options.title,
          message: options.message,
          data: options.data,
          actionable: options.actionable,
        };
        frame = JSON.stringify(alert);

        this.alertHistory.push(alert);
        if (this.alertHistory.length > this.maxAlertHistory) {
          this.alertHistory.shift();
        }
      }

      if (session.ws.readyState === WebSocket.OPEN) {
        session.ws.send(frame);
      }

      session.lastAlertTime.set(cooldownKey, now);
    }
  }

  /**
   * Sends a `call_summary` alert for a call.
   *
   * @param callId - Call the summary belongs to.
   * @param summary - Human-readable summary text.
   */
  broadcastCallSummary(callId: string, summary: string): void {
    this.sendAlert({
      type: 'call_summary',
      severity: 'info',
      callId,
      title: 'Call Analysis Summary',
      message: summary,
      data: { summary },
      actionable: false,
    });
  }

  /**
   * Returns the most recent alerts, oldest first.
   *
   * @param limit - Maximum number of alerts; non-positive values yield `[]`.
   * @param callId - When given, only alerts for this call are considered.
   */
  getAlertHistory(limit: number = 50, callId?: string): AlertPayload[] {
    // slice(-0) is slice(0) and would return the whole history.
    if (!(limit > 0)) {
      return [];
    }

    let history = this.alertHistory;
    if (callId) {
      history = history.filter(a => a.callId === callId);
    }
    return history.slice(-limit);
  }

  /** Number of currently registered subscribers. */
  getSubscriberCount(): number {
    return this.subscribers.size;
  }

  /** Call ids that currently have a bound analysis engine. */
  getActiveCalls(): string[] {
    return Array.from(this.analysisEngines.keys());
  }

  /** Returns the engine bound to `callId`, if any. */
  getEngine(callId: string): CallAnalysisEngine | undefined {
    return this.analysisEngines.get(callId);
  }

  /**
   * Notifies subscribers, destroys all bound engines and closes the server.
   * Resolves once the underlying WebSocket server has closed.
   */
  async stop(): Promise<void> {
    this.isRunning = false;

    this.subscribers.forEach((session) => {
      if (session.ws.readyState === WebSocket.OPEN) {
        session.ws.send(JSON.stringify({
          type: 'server_shutdown',
          payload: { timestamp: Date.now() },
        }));
        session.ws.close(1001, 'Server shutting down');
      }
    });
    this.subscribers.clear();

    this.analysisEngines.forEach((engine, callId) => {
      try {
        engine.destroy();
      } catch (error: unknown) {
        // One failing engine must not keep the listening socket open.
        console.error(`[AlertServer] Failed to destroy engine for call ${callId}: ${errorMessage(error)}`);
      }
    });
    // Destroyed engines must no longer be reported as active calls.
    this.analysisEngines.clear();

    const server = this.wss;
    if (server) {
      await new Promise<void>((resolve) => {
        server.close(() => resolve());
      });
      this.wss = null;
    }

    console.log('[AlertServer] Stopped');
  }

  /** Maps an event type to its display label; unknown types pass through. */
  private formatEventType(type: string): string {
    const labels: Record<string, string> = {
      interrupt: 'Speaker Interrupt',
      overlap: 'Speech Overlap',
      long_pause: 'Long Pause',
      volume_spike: 'Volume Spike',
      silence: 'Silence Detected',
      speaker_change: 'Speaker Change',
    };
    return labels[type] || type;
  }

  /**
   * Builds the human-readable message for a call event. Only the template
   * for the event's own type is evaluated, so fields used by other types
   * (such as `metadata.level`) need not be present.
   */
  private formatEventMessage(event: CallEvent): string {
    const type: string = event.type;
    switch (type) {
      case 'interrupt':
        return `Interrupt detected (${event.duration}ms)`;
      case 'overlap':
        return `Speech overlap detected (${event.duration}ms)`;
      case 'long_pause':
        return `Pause duration: ${(event.duration / 1000).toFixed(1)}s`;
      case 'volume_spike': {
        const level: unknown = event.metadata?.level;
        return `Volume spike: ${typeof level === 'number' ? level.toFixed(2) : 'unknown'}`;
      }
      case 'silence':
        // NOTE(review): the other messages treat duration as milliseconds, yet
        // this one multiplies by 1000 and still prints "ms" — confirm the unit
        // the engine reports for silence events.
        return `Silence detected for ${(event.duration * 1000).toFixed(0)}ms`;
      case 'speaker_change':
        return 'Speaker change detected';
      default:
        return 'Event detected';
    }
  }

  /** Maps an anomaly type to its display label; unknown types pass through. */
  private formatAnomalyType(type: string): string {
    const labels: Record<string, string> = {
      background_noise: 'Background Noise',
      echo: 'Echo Detected',
      distortion: 'Audio Distortion',
      dropouts: 'Audio Dropout',
      excessive_silence: 'Excessive Silence',
      volume_inconsistency: 'Volume Inconsistency',
    };
    return labels[type] || type;
  }
}

export default AlertServer;