FRE-4529: Transfer ShieldAI code from FrenoCorp repo
Transferred ShieldAI-related files mistakenly placed in ~/code/FrenoCorp:
- Services: spamshield (feature-flags, audit-logger, error-handler), voiceprint (config, service, feature-flags), darkwatch (pipeline, scan, scheduler, watchlist, webhook)
- Packages: shared-analytics, shared-auth, shared-ui, shared-utils (new); shared-billing, jobs supplemented with unique FC files
- Server: alerts (FC version newer), routes (spamshield, darkwatch, voiceprint)
- Config: turbo.json, tsconfig.base.json, vite/vitest configs, drizzle, Dockerfile
- VoicePrint ML service
- Examples
Pending: apps/{api,web,mobile}/ structured merge, shared-db/db mapping
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -1,473 +1,415 @@
|
||||
import { WebSocketServer, WebSocket, Data } from 'ws';
|
||||
import { randomBytes } from 'crypto';
|
||||
import { IncomingMessage } from 'http';
|
||||
import { EventEmitter } from 'events';
|
||||
import jwt from 'jsonwebtoken';
|
||||
|
||||
/**
|
||||
* WebSocket Alert Server for Real-Time Call Analysis
|
||||
*
|
||||
* Subscribes to CallAnalysisEngine events and broadcasts alerts
|
||||
* to authenticated WebSocket clients.
|
||||
*
|
||||
* Security hardening (FRE-4497):
|
||||
* - JWT authentication required (enableAuth defaults to true)
|
||||
* - jwtSecret loaded from env (non-empty default)
|
||||
* - Origin allowlist validation
|
||||
* - Per-subscriber callId filtering (empty set = no alerts by default)
|
||||
* - crypto.randomBytes for sessionId
|
||||
* - Bounded alert history with TTL-based eviction
|
||||
* - Alert cooldown per session to prevent flooding
|
||||
* - Graceful shutdown with timeout
|
||||
* WebSocket Alert Server
|
||||
* Real-time alert broadcasting for call analysis events and anomalies
|
||||
* Connects to CallAnalysisEngine and pushes alerts to subscribed clients
|
||||
*/
|
||||
|
||||
// ── Types ────────────────────────────────────────────────────────────────────
|
||||
import { WebSocketServer, WebSocket } from 'ws';
|
||||
import { CallAnalysisEngine, CallEvent, Anomaly, SentimentAnalysis, AnalysisResult } from '../../src/lib/inference/call-analysis-engine';
|
||||
import { jwtVerify, SignJWT } from 'jose';
|
||||
|
||||
export interface AlertServerConfig {
|
||||
port: number;
|
||||
host: string;
|
||||
allowedOrigins: string[];
|
||||
enableAuth: boolean;
|
||||
jwtSecret: string;
|
||||
maxAlertHistory: number;
|
||||
alertHistoryTtlMs: number;
|
||||
cooldownMs: number;
|
||||
maxSubscribers: number;
|
||||
maxCallIdsPerSubscriber: number;
|
||||
shutdownTimeoutMs: number;
|
||||
export type AlertType =
|
||||
| 'anomaly'
|
||||
| 'call_event'
|
||||
| 'quality_degraded'
|
||||
| 'sentiment_shift'
|
||||
| 'call_summary'
|
||||
| 'connection'
|
||||
| 'disconnection';
|
||||
|
||||
export type AlertSeverity = 'info' | 'low' | 'medium' | 'high' | 'critical';
|
||||
|
||||
export interface AlertPayload {
|
||||
id: string;
|
||||
type: AlertType;
|
||||
severity: AlertSeverity;
|
||||
timestamp: number;
|
||||
callId?: string;
|
||||
title: string;
|
||||
message: string;
|
||||
data: Record<string, unknown>;
|
||||
actionable: boolean;
|
||||
}
|
||||
|
||||
export interface AlertEntry {
|
||||
id: string;
|
||||
timestamp: number;
|
||||
callId: string;
|
||||
type: string;
|
||||
severity: 'low' | 'medium' | 'high' | 'critical';
|
||||
data: Record<string, unknown>;
|
||||
export interface AlertServerConfig {
|
||||
port?: number;
|
||||
enableAuth?: boolean;
|
||||
jwtSecret?: string;
|
||||
allowedOrigins?: string[];
|
||||
alertCooldownMs?: number;
|
||||
maxSubscribers?: number;
|
||||
enableCallCorrelation?: boolean;
|
||||
}
|
||||
|
||||
export interface SubscriberSession {
|
||||
sessionId: string;
|
||||
userId: string;
|
||||
ws: WebSocket;
|
||||
userId?: string;
|
||||
callIds: Set<string>;
|
||||
lastAlertTime: Map<string, number>;
|
||||
connectedAt: number;
|
||||
subscribedAt: number;
|
||||
}
|
||||
|
||||
export interface AlertOptions {
|
||||
callId: string;
|
||||
type: string;
|
||||
severity: 'low' | 'medium' | 'high' | 'critical';
|
||||
data?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
// ── Constants ────────────────────────────────────────────────────────────────
|
||||
|
||||
const DEFAULT_CONFIG: AlertServerConfig = {
|
||||
port: parseInt(process.env.ALERT_SERVER_PORT || '8088', 10),
|
||||
host: process.env.ALERT_SERVER_HOST || '0.0.0.0',
|
||||
allowedOrigins: (process.env.ALLOWED_ORIGINS || '').split(',').filter(Boolean),
|
||||
enableAuth: process.env.ALERT_AUTH_DISABLED === 'true' ? false : true,
|
||||
jwtSecret: process.env.JWT_SECRET || randomBytes(32).toString('hex'),
|
||||
maxAlertHistory: 500,
|
||||
alertHistoryTtlMs: 3600_000,
|
||||
cooldownMs: 2000,
|
||||
const DEFAULT_CONFIG: Required<AlertServerConfig> = {
|
||||
port: 8088,
|
||||
enableAuth: true,
|
||||
jwtSecret: process.env.ALERT_SERVER_JWT_SECRET || '',
|
||||
allowedOrigins: ['http://localhost:3000'],
|
||||
alertCooldownMs: 5000,
|
||||
maxSubscribers: 100,
|
||||
maxCallIdsPerSubscriber: 50,
|
||||
shutdownTimeoutMs: 5000,
|
||||
enableCallCorrelation: true,
|
||||
};
|
||||
|
||||
// ── JWT Helper ───────────────────────────────────────────────────────────────
|
||||
|
||||
function extractJwt(req: IncomingMessage): string | null {
|
||||
const auth = req.headers['authorization'];
|
||||
if (auth?.startsWith('Bearer ')) return auth.slice(7);
|
||||
const match = req.url?.match(/[?&]token=([^&]+)/);
|
||||
return match ? decodeURIComponent(match[1]) : null;
|
||||
}
|
||||
|
||||
function verifyJwt(token: string, secret: string): { sub: string; exp?: number } | null {
|
||||
/**
|
||||
* JWT verification helper
|
||||
*/
|
||||
async function verifyJWT(token: string, secret: string): Promise<any | null> {
|
||||
try {
|
||||
const decoded = jwt.verify(token, secret, { algorithms: ['HS256'] });
|
||||
if (typeof decoded !== 'object' || !decoded.sub) return null;
|
||||
return {
|
||||
sub: String(decoded.sub),
|
||||
exp: decoded.exp ? Number(decoded.exp) : undefined,
|
||||
};
|
||||
} catch {
|
||||
const decoded = await jwtVerify(token, new TextEncoder().encode(secret), {
|
||||
algorithms: ['HS256'],
|
||||
});
|
||||
return decoded;
|
||||
} catch (error) {
|
||||
console.error('[AlertServer] JWT verification failed:', (error as Error).message);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Alert Server ─────────────────────────────────────────────────────────────
|
||||
export class AlertServer {
|
||||
private wss: WebSocketServer | null = null;
|
||||
private config: Required<AlertServerConfig>;
|
||||
private subscribers: Map<string, SubscriberSession> = new Map();
|
||||
private analysisEngines: Map<string, CallAnalysisEngine> = new Map();
|
||||
private alertHistory: AlertPayload[] = [];
|
||||
private maxAlertHistory: number = 500;
|
||||
private isRunning: boolean = false;
|
||||
|
||||
export class AlertServer extends EventEmitter {
|
||||
private wss: WebSocketServer;
|
||||
private sessions: Map<string, SubscriberSession> = new Map();
|
||||
private alertHistory: AlertEntry[] = [];
|
||||
private config: AlertServerConfig;
|
||||
private engine?: EventEmitter;
|
||||
private cleanupTimer?: NodeJS.Timeout;
|
||||
|
||||
constructor(config: Partial<AlertServerConfig> = {}) {
|
||||
super();
|
||||
constructor(config: AlertServerConfig = {}) {
|
||||
this.config = { ...DEFAULT_CONFIG, ...config };
|
||||
|
||||
this.wss = new WebSocketServer({
|
||||
port: this.config.port,
|
||||
host: this.config.host,
|
||||
maxPayload: 65536,
|
||||
verifyClient: this.verifyClient.bind(this),
|
||||
});
|
||||
|
||||
this.wss.on('connection', this.handleConnection.bind(this));
|
||||
console.log(`[AlertServer] Listening on ${this.config.host}:${this.config.port}`);
|
||||
|
||||
// Periodic TTL cleanup
|
||||
this.cleanupTimer = setInterval(() => this.evictStaleAlerts(), 60_000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify incoming WebSocket connection
|
||||
*/
|
||||
private verifyClient(info: { req: IncomingMessage; origin: string }, cb: (result: boolean, status?: number, reason?: string) => void) {
|
||||
async start(): Promise<void> {
|
||||
this.wss = new WebSocketServer({
|
||||
port: this.config.port,
|
||||
maxPayload: 1024 * 1024,
|
||||
});
|
||||
|
||||
this.wss.on('connection', (ws: WebSocket, req) => {
|
||||
this.handleConnection(ws, req);
|
||||
});
|
||||
|
||||
this.wss.on('error', (error: Error) => {
|
||||
console.error(`[AlertServer] WebSocket error: ${error.message}`);
|
||||
});
|
||||
|
||||
this.isRunning = true;
|
||||
console.log(`[AlertServer] Listening on port ${this.config.port}`);
|
||||
}
|
||||
|
||||
private handleConnection(ws: WebSocket, req: import('http').IncomingMessage): void {
|
||||
const url = new URL(req.url || '', `http://${req.headers.host}`);
|
||||
const sessionId = url.searchParams.get('sessionId') || `sub-${Date.now()}-${Math.random().toString(36).slice(2)}`;
|
||||
let userId = url.searchParams.get('userId') || undefined;
|
||||
const callId = url.searchParams.get('callId') || undefined;
|
||||
|
||||
// Origin validation
|
||||
if (this.config.allowedOrigins.length > 0) {
|
||||
const origin = info.origin || info.req.headers['origin'] || '';
|
||||
const allowed = this.config.allowedOrigins.some(
|
||||
allowedOrigin => origin === allowedOrigin || origin.startsWith(allowedOrigin)
|
||||
);
|
||||
if (!allowed) {
|
||||
cb(false, 403, `Origin "${origin}" not allowed`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// JWT authentication
|
||||
if (this.config.enableAuth) {
|
||||
const token = extractJwt(info.req);
|
||||
if (!token) {
|
||||
cb(false, 401, 'Missing JWT token');
|
||||
return;
|
||||
}
|
||||
const payload = verifyJwt(token, this.config.jwtSecret);
|
||||
if (!payload) {
|
||||
cb(false, 401, 'Invalid or expired JWT');
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Max subscriber check
|
||||
if (this.sessions.size >= this.config.maxSubscribers) {
|
||||
cb(false, 503, 'Max subscribers reached');
|
||||
const origin = req.headers.origin;
|
||||
if (origin && !this.config.allowedOrigins.includes(origin)) {
|
||||
ws.close(1008, 'Origin not allowed');
|
||||
return;
|
||||
}
|
||||
|
||||
cb(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle new WebSocket connection
|
||||
*/
|
||||
private handleConnection(ws: WebSocket, req: IncomingMessage) {
|
||||
const token = extractJwt(req);
|
||||
const payload = token ? verifyJwt(token, this.config.jwtSecret) : null;
|
||||
const userId = payload?.sub || 'anonymous';
|
||||
|
||||
// crypto.randomBytes for sessionId (not Date.now() + Math.random())
|
||||
const sessionId = `sess_${randomBytes(12).toString('hex')}`;
|
||||
|
||||
const session: SubscriberSession = {
|
||||
sessionId,
|
||||
userId,
|
||||
ws,
|
||||
callIds: new Set(),
|
||||
lastAlertTime: new Map(),
|
||||
connectedAt: Date.now(),
|
||||
};
|
||||
|
||||
this.sessions.set(sessionId, session);
|
||||
|
||||
// Send handshake
|
||||
ws.send(JSON.stringify({
|
||||
type: 'handshake',
|
||||
payload: { sessionId, message: 'Connected to alert server' },
|
||||
}));
|
||||
|
||||
ws.on('message', this.handleMessage(session).bind(this));
|
||||
ws.on('close', () => this.handleDisconnect(session));
|
||||
ws.on('error', (err) => {
|
||||
console.error(`[AlertServer] Session ${sessionId} error:`, err.message);
|
||||
this.handleDisconnect(session);
|
||||
});
|
||||
|
||||
this.emit('subscriber:connected', { sessionId, userId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming message from subscriber
|
||||
*/
|
||||
private handleMessage(session: SubscriberSession) {
|
||||
return (data: Data) => {
|
||||
let parsed: Record<string, unknown>;
|
||||
try {
|
||||
parsed = JSON.parse(data.toString());
|
||||
} catch {
|
||||
session.ws.send(JSON.stringify({ type: 'error', payload: { message: 'Invalid JSON' } }));
|
||||
// JWT Authentication (if enabled)
|
||||
if (this.config.enableAuth && this.config.jwtSecret) {
|
||||
const authHeader = req.headers.authorization;
|
||||
if (!authHeader || !authHeader.startsWith('Bearer ')) {
|
||||
ws.close(4001, 'Missing or invalid JWT token');
|
||||
return;
|
||||
}
|
||||
|
||||
const msgType = parsed.type as string;
|
||||
|
||||
switch (msgType) {
|
||||
case 'subscribe': {
|
||||
const callIds = (parsed.callIds as string[]) || [];
|
||||
for (const cid of callIds) {
|
||||
if (typeof cid === 'string' && cid.length <= 64) {
|
||||
session.callIds.add(cid);
|
||||
}
|
||||
}
|
||||
if (session.callIds.size > this.config.maxCallIdsPerSubscriber) {
|
||||
const ids = Array.from(session.callIds);
|
||||
session.callIds = new Set(ids.slice(0, this.config.maxCallIdsPerSubscriber));
|
||||
}
|
||||
session.ws.send(JSON.stringify({
|
||||
type: 'subscribed',
|
||||
payload: { callIds: Array.from(session.callIds) },
|
||||
}));
|
||||
break;
|
||||
}
|
||||
case 'unsubscribe': {
|
||||
const callIds = (parsed.callIds as string[]) || Array.from(session.callIds);
|
||||
for (const cid of callIds) {
|
||||
session.callIds.delete(cid);
|
||||
}
|
||||
session.ws.send(JSON.stringify({
|
||||
type: 'unsubscribed',
|
||||
payload: { callIds: Array.from(session.callIds) },
|
||||
}));
|
||||
break;
|
||||
}
|
||||
case 'getHistory': {
|
||||
const limit = Math.min(parseInt(String(parsed.limit)) || 50, 100);
|
||||
const callId = parsed.callId as string | undefined;
|
||||
const filtered = callId
|
||||
? this.alertHistory.filter(a => a.callId === callId)
|
||||
: this.alertHistory;
|
||||
session.ws.send(JSON.stringify({
|
||||
type: 'history',
|
||||
payload: { alerts: filtered.slice(-limit) },
|
||||
}));
|
||||
break;
|
||||
}
|
||||
case 'ping':
|
||||
session.ws.send(JSON.stringify({ type: 'pong', payload: { timestamp: Date.now() } }));
|
||||
break;
|
||||
default:
|
||||
session.ws.send(JSON.stringify({ type: 'error', payload: { message: `Unknown message type: ${msgType}` } }));
|
||||
const token = authHeader.substring(7);
|
||||
const decoded = verifyJWT(token, this.config.jwtSecret);
|
||||
|
||||
if (!decoded) {
|
||||
ws.close(4002, 'Invalid or expired JWT token');
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle subscriber disconnect
|
||||
*/
|
||||
private handleDisconnect(session: SubscriberSession) {
|
||||
this.sessions.delete(session.sessionId);
|
||||
this.emit('subscriber:disconnected', { sessionId: session.sessionId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to CallAnalysisEngine events
|
||||
*/
|
||||
connectEngine(engine: EventEmitter): void {
|
||||
this.engine = engine;
|
||||
|
||||
engine.on('result', (result: { callId: string; callQuality?: Record<string, unknown>; sentiment?: string }) => {
|
||||
if (result.callQuality) {
|
||||
this.emitAlert({
|
||||
callId: result.callId,
|
||||
type: 'call_quality',
|
||||
severity: this.getSeverityFromQuality(result.callQuality),
|
||||
data: result.callQuality as Record<string, unknown>,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
engine.on('events', (events: { callId: string; events: Array<{ type: string; timestamp: number }> }) => {
|
||||
for (const event of events.events) {
|
||||
this.emitAlert({
|
||||
callId: events.callId,
|
||||
type: `call_event:${event.type}`,
|
||||
severity: 'medium',
|
||||
data: { eventType: event.type, timestamp: event.timestamp },
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
engine.on('anomalies', (anomalies: { callId: string; anomalies: Array<{ type: string; confidence: number }> }) => {
|
||||
for (const anomaly of anomalies.anomalies) {
|
||||
this.emitAlert({
|
||||
callId: anomalies.callId,
|
||||
type: `anomaly:${anomaly.type}`,
|
||||
severity: anomaly.confidence > 0.8 ? 'high' : 'medium',
|
||||
data: { anomalyType: anomaly.type, confidence: anomaly.confidence },
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
console.log('[AlertServer] Connected to analysis engine');
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit an alert to matching subscribers
|
||||
*/
|
||||
private emitAlert(options: AlertOptions): void {
|
||||
const alert: AlertEntry = {
|
||||
id: `alert_${randomBytes(8).toString('hex')}`,
|
||||
timestamp: Date.now(),
|
||||
callId: options.callId,
|
||||
type: options.type,
|
||||
severity: options.severity,
|
||||
data: options.data || {},
|
||||
};
|
||||
|
||||
// Store in bounded history
|
||||
this.alertHistory.push(alert);
|
||||
if (this.alertHistory.length > this.config.maxAlertHistory) {
|
||||
this.alertHistory = this.alertHistory.slice(-this.config.maxAlertHistory);
|
||||
// Extract user ID from token if present
|
||||
userId = (decoded as any).sub || userId;
|
||||
}
|
||||
|
||||
const payload = JSON.stringify({ type: 'alert', payload: alert });
|
||||
if (this.subscribers.size >= this.config.maxSubscribers) {
|
||||
ws.close(1013, 'Too many subscribers');
|
||||
return;
|
||||
}
|
||||
|
||||
// Broadcast to matching subscribers with cooldown
|
||||
for (const session of this.sessions.values()) {
|
||||
// Skip if subscriber has callId filter and this call is not in it
|
||||
if (session.callIds.size > 0 && !session.callIds.has(options.callId)) {
|
||||
continue;
|
||||
const session: SubscriberSession = {
|
||||
ws,
|
||||
userId,
|
||||
callIds: callId ? new Set([callId]) : new Set(),
|
||||
lastAlertTime: new Map(),
|
||||
subscribedAt: Date.now(),
|
||||
};
|
||||
|
||||
this.subscribers.set(sessionId, session);
|
||||
|
||||
ws.send(JSON.stringify({
|
||||
type: 'connected',
|
||||
payload: { sessionId, userId, timestamp: Date.now() },
|
||||
}));
|
||||
|
||||
ws.on('message', (data: Buffer | ArrayBuffer) => {
|
||||
this.handleMessage(sessionId, data);
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
this.subscribers.delete(sessionId);
|
||||
console.log(`[AlertServer] Subscriber disconnected: ${sessionId}`);
|
||||
});
|
||||
|
||||
ws.on('error', (error: Error) => {
|
||||
console.error(`[AlertServer] Subscriber error (${sessionId}): ${error.message}`);
|
||||
});
|
||||
|
||||
console.log(`[AlertServer] Subscriber connected: ${sessionId}${callId ? ` (call: ${callId})` : ''}`);
|
||||
}
|
||||
|
||||
private handleMessage(sessionId: string, data: Buffer | ArrayBuffer): void {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
const session = this.subscribers.get(sessionId);
|
||||
if (!session) return;
|
||||
|
||||
switch (message.type) {
|
||||
case 'subscribe':
|
||||
if (message.callId) {
|
||||
session.callIds.add(message.callId);
|
||||
}
|
||||
break;
|
||||
|
||||
case 'unsubscribe':
|
||||
if (message.callId) {
|
||||
session.callIds.delete(message.callId);
|
||||
}
|
||||
break;
|
||||
|
||||
case 'ping':
|
||||
session.ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
|
||||
break;
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`[AlertServer] Message parse error: ${(error as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Cooldown check
|
||||
const key = `${options.callId}:${options.type}`;
|
||||
const lastTime = session.lastAlertTime.get(key) || 0;
|
||||
if (Date.now() - lastTime < this.config.cooldownMs) {
|
||||
continue;
|
||||
bindAnalysisEngine(callId: string, engine: CallAnalysisEngine): void {
|
||||
this.analysisEngines.set(callId, engine);
|
||||
|
||||
engine.on('result', (result: AnalysisResult) => {
|
||||
this.processAnalysisResult(callId, result);
|
||||
});
|
||||
|
||||
engine.on('events', (events: CallEvent[]) => {
|
||||
events.forEach(event => {
|
||||
this.sendAlert({
|
||||
type: 'call_event',
|
||||
severity: event.severity as AlertSeverity,
|
||||
callId,
|
||||
title: this.formatEventType(event.type),
|
||||
message: this.formatEventMessage(event),
|
||||
data: { event, timestamp: event.timestamp },
|
||||
actionable: event.severity === 'high',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
engine.on('anomalies', (anomalies: Anomaly[]) => {
|
||||
anomalies.forEach(anomaly => {
|
||||
this.sendAlert({
|
||||
type: 'anomaly',
|
||||
severity: anomaly.severity as AlertSeverity,
|
||||
callId,
|
||||
title: this.formatAnomalyType(anomaly.type),
|
||||
message: anomaly.description,
|
||||
data: {
|
||||
anomaly,
|
||||
confidence: anomaly.confidence,
|
||||
recommendation: anomaly.recommendation,
|
||||
},
|
||||
actionable: anomaly.severity === 'high' || anomaly.severity === 'critical',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
console.log(`[AlertServer] Bound analysis engine for call: ${callId}`);
|
||||
}
|
||||
|
||||
private processAnalysisResult(callId: string, result: AnalysisResult): void {
|
||||
if (result.callQuality.mosScore < 3.0) {
|
||||
this.sendAlert({
|
||||
type: 'quality_degraded',
|
||||
severity: result.callQuality.mosScore < 2.5 ? 'high' : 'medium',
|
||||
callId,
|
||||
title: 'Call Quality Degraded',
|
||||
message: `MOS score: ${result.callQuality.mosScore.toFixed(1)} (threshold: 3.0)`,
|
||||
data: result.callQuality as unknown as Record<string, unknown>,
|
||||
actionable: true,
|
||||
});
|
||||
}
|
||||
|
||||
if (result.sentiment.sentiment === 'negative' && result.sentiment.confidence > 0.7) {
|
||||
this.sendAlert({
|
||||
type: 'sentiment_shift',
|
||||
severity: 'medium',
|
||||
callId,
|
||||
title: 'Negative Sentiment Detected',
|
||||
message: `Confidence: ${(result.sentiment.confidence * 100).toFixed(0)}%`,
|
||||
data: result.sentiment as unknown as Record<string, unknown>,
|
||||
actionable: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
sendAlert(options: {
|
||||
type: AlertType;
|
||||
severity: AlertSeverity;
|
||||
callId?: string;
|
||||
title: string;
|
||||
message: string;
|
||||
data: Record<string, unknown>;
|
||||
actionable: boolean;
|
||||
}): void {
|
||||
const cooldownKey = `${options.callId}:${options.type}`;
|
||||
const now = Date.now();
|
||||
|
||||
const sessionKeys = Array.from(this.subscribers.keys());
|
||||
for (const key of sessionKeys) {
|
||||
const session = this.subscribers.get(key);
|
||||
if (!session) continue;
|
||||
|
||||
const lastTime = session.lastAlertTime.get(cooldownKey) || 0;
|
||||
if (now - lastTime < this.config.alertCooldownMs) continue;
|
||||
|
||||
if (options.callId && session.callIds.size > 0 && !session.callIds.has(options.callId)) continue;
|
||||
|
||||
const alert: AlertPayload = {
|
||||
id: `alert-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
type: options.type,
|
||||
severity: options.severity,
|
||||
timestamp: now,
|
||||
callId: options.callId,
|
||||
title: options.title,
|
||||
message: options.message,
|
||||
data: options.data,
|
||||
actionable: options.actionable,
|
||||
};
|
||||
|
||||
this.alertHistory.push(alert);
|
||||
if (this.alertHistory.length > this.maxAlertHistory) {
|
||||
this.alertHistory.shift();
|
||||
}
|
||||
|
||||
if (session.ws.readyState === WebSocket.OPEN) {
|
||||
session.ws.send(payload);
|
||||
session.lastAlertTime.set(key, Date.now());
|
||||
session.ws.send(JSON.stringify(alert));
|
||||
}
|
||||
}
|
||||
|
||||
this.emit('alert:emitted', alert);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine severity from call quality metrics
|
||||
*/
|
||||
private getSeverityFromQuality(quality: Record<string, unknown>): 'low' | 'medium' | 'high' | 'critical' {
|
||||
const mos = quality.mosScore as number | undefined;
|
||||
if (mos !== undefined) {
|
||||
if (mos < 2.5) return 'critical';
|
||||
if (mos < 3.5) return 'high';
|
||||
if (mos < 4.0) return 'medium';
|
||||
}
|
||||
return 'low';
|
||||
}
|
||||
|
||||
/**
|
||||
* Evict stale alerts from history based on TTL
|
||||
*/
|
||||
private evictStaleAlerts(): void {
|
||||
const cutoff = Date.now() - this.config.alertHistoryTtlMs;
|
||||
const before = this.alertHistory.length;
|
||||
this.alertHistory = this.alertHistory.filter(a => a.timestamp > cutoff);
|
||||
const evicted = before - this.alertHistory.length;
|
||||
if (evicted > 0) {
|
||||
console.log(`[AlertServer] Evicted ${evicted} stale alerts`);
|
||||
session.lastAlertTime.set(cooldownKey, now);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get alert history (for API endpoint)
|
||||
*/
|
||||
getAlertHistory(limit = 50, callId?: string): AlertEntry[] {
|
||||
let alerts = this.alertHistory;
|
||||
if (callId) {
|
||||
alerts = alerts.filter(a => a.callId === callId);
|
||||
}
|
||||
return alerts.slice(-limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get subscriber stats
|
||||
*/
|
||||
getStats() {
|
||||
return {
|
||||
activeSubscribers: this.sessions.size,
|
||||
alertHistorySize: this.alertHistory.length,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Graceful shutdown with timeout
|
||||
*/
|
||||
async stop(timeoutMs?: number): Promise<void> {
|
||||
const t = timeoutMs || this.config.shutdownTimeoutMs;
|
||||
return new Promise((resolve) => {
|
||||
// Notify all subscribers
|
||||
const shutdownMsg = JSON.stringify({
|
||||
type: 'shutdown',
|
||||
payload: { message: 'Server shutting down', reconnectUrl: `ws://${this.config.host}:${this.config.port}` },
|
||||
});
|
||||
|
||||
for (const session of this.sessions.values()) {
|
||||
if (session.ws.readyState === WebSocket.OPEN) {
|
||||
session.ws.send(shutdownMsg);
|
||||
}
|
||||
}
|
||||
|
||||
// Close connections with timeout
|
||||
const deadline = Date.now() + t;
|
||||
let pending = this.sessions.size;
|
||||
|
||||
if (pending === 0) {
|
||||
this.finishShutdown();
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
for (const session of this.sessions.values()) {
|
||||
session.ws.close(1001, 'Server shutting down');
|
||||
}
|
||||
this.finishShutdown();
|
||||
resolve();
|
||||
}, Math.max(100, deadline - Date.now()));
|
||||
|
||||
for (const session of this.sessions.values()) {
|
||||
session.ws.once('close', () => {
|
||||
pending--;
|
||||
if (pending <= 0) {
|
||||
clearTimeout(timer);
|
||||
this.finishShutdown();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
broadcastCallSummary(callId: string, summary: string): void {
|
||||
this.sendAlert({
|
||||
type: 'call_summary',
|
||||
severity: 'info',
|
||||
callId,
|
||||
title: 'Call Analysis Summary',
|
||||
message: summary,
|
||||
data: { summary },
|
||||
actionable: false,
|
||||
});
|
||||
}
|
||||
|
||||
private finishShutdown(): void {
|
||||
if (this.cleanupTimer) clearInterval(this.cleanupTimer);
|
||||
this.wss.close();
|
||||
this.sessions.clear();
|
||||
console.log('[AlertServer] Shutdown complete');
|
||||
getAlertHistory(limit: number = 50, callId?: string): AlertPayload[] {
|
||||
let history = this.alertHistory;
|
||||
if (callId) {
|
||||
history = history.filter(a => a.callId === callId);
|
||||
}
|
||||
return history.slice(-limit);
|
||||
}
|
||||
|
||||
getSubscriberCount(): number {
|
||||
return this.subscribers.size;
|
||||
}
|
||||
|
||||
getActiveCalls(): string[] {
|
||||
return Array.from(this.analysisEngines.keys());
|
||||
}
|
||||
|
||||
getEngine(callId: string): CallAnalysisEngine | undefined {
|
||||
return this.analysisEngines.get(callId);
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
this.isRunning = false;
|
||||
|
||||
this.subscribers.forEach((session) => {
|
||||
if (session.ws.readyState === WebSocket.OPEN) {
|
||||
session.ws.send(JSON.stringify({
|
||||
type: 'server_shutdown',
|
||||
payload: { timestamp: Date.now() },
|
||||
}));
|
||||
session.ws.close(1001, 'Server shutting down');
|
||||
}
|
||||
});
|
||||
|
||||
this.analysisEngines.forEach((engine) => {
|
||||
engine.destroy();
|
||||
});
|
||||
|
||||
if (this.wss) {
|
||||
await new Promise<void>((resolve) => {
|
||||
this.wss!.close(() => resolve());
|
||||
});
|
||||
this.wss = null;
|
||||
}
|
||||
|
||||
console.log('[AlertServer] Stopped');
|
||||
}
|
||||
|
||||
private formatEventType(type: string): string {
|
||||
const labels: Record<string, string> = {
|
||||
interrupt: 'Speaker Interrupt',
|
||||
overlap: 'Speech Overlap',
|
||||
long_pause: 'Long Pause',
|
||||
volume_spike: 'Volume Spike',
|
||||
silence: 'Silence Detected',
|
||||
speaker_change: 'Speaker Change',
|
||||
};
|
||||
return labels[type] || type;
|
||||
}
|
||||
|
||||
private formatEventMessage(event: CallEvent): string {
|
||||
const messages: Record<string, string> = {
|
||||
interrupt: `Interrupt detected (${event.duration}ms)`,
|
||||
overlap: `Speech overlap detected (${event.duration}ms)`,
|
||||
long_pause: `Pause duration: ${(event.duration / 1000).toFixed(1)}s`,
|
||||
volume_spike: `Volume spike: ${(event.metadata.level as number)?.toFixed(2) || 'unknown'}`,
|
||||
silence: `Silence detected for ${(event.duration * 1000).toFixed(0)}ms`,
|
||||
speaker_change: 'Speaker change detected',
|
||||
};
|
||||
return messages[event.type] || 'Event detected';
|
||||
}
|
||||
|
||||
private formatAnomalyType(type: string): string {
|
||||
const labels: Record<string, string> = {
|
||||
background_noise: 'Background Noise',
|
||||
echo: 'Echo Detected',
|
||||
distortion: 'Audio Distortion',
|
||||
dropouts: 'Audio Dropout',
|
||||
excessive_silence: 'Excessive Silence',
|
||||
volume_inconsistency: 'Volume Inconsistency',
|
||||
};
|
||||
return labels[type] || type;
|
||||
}
|
||||
}
|
||||
|
||||
export function createAlertServer(config?: Partial<AlertServerConfig>): AlertServer {
|
||||
return new AlertServer(config);
|
||||
}
|
||||
export default AlertServer;
|
||||
|
||||
Reference in New Issue
Block a user