From c237a34eef7495a4d3ba425ac11ee29adb02ea8b Mon Sep 17 00:00:00 2001
From: Michael Freno
Date: Thu, 30 Apr 2026 03:23:57 -0400
Subject: [PATCH] FRE-4497: Implement WebRTC real-time call analysis pipeline

Add a WebSocket alert server, a real-time call analysis engine, and an
audio stream capture module for live call analysis with sentiment
detection, anomaly detection, and quality metrics.

Co-Authored-By: Paperclip
---
 server/alerts/alert-server.ts             | 378 ++++++++++++++++
 src/lib/audio/audio-stream-capture.ts     | 331 ++++++++++++++
 src/lib/inference/call-analysis-engine.ts | 499 ++++++++++++++++++++++
 3 files changed, 1208 insertions(+)
 create mode 100644 server/alerts/alert-server.ts
 create mode 100644 src/lib/audio/audio-stream-capture.ts
 create mode 100644 src/lib/inference/call-analysis-engine.ts

diff --git a/server/alerts/alert-server.ts b/server/alerts/alert-server.ts
new file mode 100644
index 000000000..e2357c66f
--- /dev/null
+++ b/server/alerts/alert-server.ts
@@ -0,0 +1,378 @@
+/**
+ * WebSocket Alert Server
+ * Real-time alert broadcasting for call analysis events and anomalies
+ * Connects to CallAnalysisEngine and pushes alerts to subscribed clients
+ */
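+
+/*
+ * Usage sketch (illustrative only): start the server and bind one analysis
+ * engine per call. The port, origin, and call id below are assumptions.
+ *
+ *   import { AlertServer } from './alert-server';
+ *   import { CallAnalysisEngine } from '../../src/lib/inference/call-analysis-engine';
+ *
+ *   const alerts = new AlertServer({ port: 8088, allowedOrigins: ['http://localhost:3000'] });
+ *   await alerts.start();
+ *   alerts.bindAnalysisEngine('call-123', new CallAnalysisEngine());
+ */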
+
+import { WebSocketServer, WebSocket } from 'ws';
+import { CallAnalysisEngine, CallEvent, Anomaly, AnalysisResult } from '../../src/lib/inference/call-analysis-engine';
+
+export type AlertType =
+  | 'anomaly'
+  | 'call_event'
+  | 'quality_degraded'
+  | 'sentiment_shift'
+  | 'call_summary'
+  | 'connection'
+  | 'disconnection';
+
+export type AlertSeverity = 'info' | 'low' | 'medium' | 'high' | 'critical';
+
+export interface AlertPayload {
+  id: string;
+  type: AlertType;
+  severity: AlertSeverity;
+  timestamp: number;
+  callId?: string;
+  title: string;
+  message: string;
+  data: Record<string, unknown>;
+  actionable: boolean;
+}
+
+export interface AlertServerConfig {
+  port?: number;
+  enableAuth?: boolean;
+  jwtSecret?: string;
+  allowedOrigins?: string[];
+  alertCooldownMs?: number;
+  maxSubscribers?: number;
+  enableCallCorrelation?: boolean;
+}
+
+export interface SubscriberSession {
+  ws: WebSocket;
+  userId?: string;
+  callIds: Set<string>;
+  lastAlertTime: Map<string, number>;
+  subscribedAt: number;
+}
+
+const DEFAULT_CONFIG: Required<AlertServerConfig> = {
+  port: 8088,
+  enableAuth: false,
+  jwtSecret: '',
+  allowedOrigins: ['http://localhost:3000'],
+  alertCooldownMs: 5000,
+  maxSubscribers: 100,
+  enableCallCorrelation: true,
+};
+
+export class AlertServer {
+  private wss: WebSocketServer | null = null;
+  private config: Required<AlertServerConfig>;
+  private subscribers: Map<string, SubscriberSession> = new Map();
+  private analysisEngines: Map<string, CallAnalysisEngine> = new Map();
+  private alertHistory: AlertPayload[] = [];
+  private maxAlertHistory: number = 500;
+  private isRunning: boolean = false;
+
+  constructor(config: AlertServerConfig = {}) {
+    this.config = { ...DEFAULT_CONFIG, ...config };
+  }
+
+  async start(): Promise<void> {
+    this.wss = new WebSocketServer({
+      port: this.config.port,
+      maxPayload: 1024 * 1024,
+    });
+
+    this.wss.on('connection', (ws: WebSocket, req) => {
+      this.handleConnection(ws, req);
+    });
+
+    this.wss.on('error', (error: Error) => {
+      console.error(`[AlertServer] WebSocket error: ${error.message}`);
+    });
+
+    this.isRunning = true;
+    console.log(`[AlertServer] Listening on port ${this.config.port}`);
+  }
+
+  private handleConnection(ws: WebSocket, req: import('http').IncomingMessage): void {
+    const url = new URL(req.url || '', `http://${req.headers.host}`);
+    const sessionId = url.searchParams.get('sessionId') ||
+      `sub-${Date.now()}-${Math.random().toString(36).slice(2)}`;
+    const userId = url.searchParams.get('userId') || undefined;
+    const callId = url.searchParams.get('callId') || undefined;
+
+    // Reject connections from origins outside the allowlist before registering.
+    const origin = req.headers.origin;
+    if (origin && !this.config.allowedOrigins.includes(origin)) {
+      ws.close(1008, 'Origin not allowed');
+      return;
+    }
+
+    if (this.subscribers.size >= this.config.maxSubscribers) {
+      ws.close(1013, 'Too many subscribers');
+      return;
+    }
+
+    const session: SubscriberSession = {
+      ws,
+      userId,
+      callIds: callId ? new Set([callId]) : new Set<string>(),
+      lastAlertTime: new Map(),
+      subscribedAt: Date.now(),
+    };
+
+    this.subscribers.set(sessionId, session);
+
+    ws.send(JSON.stringify({
+      type: 'connected',
+      payload: { sessionId, userId, timestamp: Date.now() },
+    }));
+
+    ws.on('message', (data: Buffer | ArrayBuffer) => {
+      this.handleMessage(sessionId, data);
+    });
+
+    ws.on('close', () => {
+      this.subscribers.delete(sessionId);
+      console.log(`[AlertServer] Subscriber disconnected: ${sessionId}`);
+    });
+
+    ws.on('error', (error: Error) => {
+      console.error(`[AlertServer] Subscriber error (${sessionId}): ${error.message}`);
+    });
+
+    console.log(`[AlertServer] Subscriber connected: ${sessionId}${callId ? ` (call: ${callId})` : ''}`);
+  }
+
+  private handleMessage(sessionId: string, data: Buffer | ArrayBuffer): void {
+    try {
+      const message = JSON.parse(data.toString());
+      const session = this.subscribers.get(sessionId);
+      if (!session) return;
+
+      switch (message.type) {
+        case 'subscribe':
+          if (message.callId) {
+            session.callIds.add(message.callId);
+          }
+          break;
+
+        case 'unsubscribe':
+          if (message.callId) {
+            session.callIds.delete(message.callId);
+          }
+          break;
+
+        case 'ping':
+          session.ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
+          break;
+      }
+    } catch (error) {
+      console.error(`[AlertServer] Message parse error: ${(error as Error).message}`);
+    }
+  }
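+
+/*
+ * Client-side sketch of the message protocol handled above (hypothetical
+ * browser client; the URL, sessionId, and callId values are assumptions):
+ *
+ *   const ws = new WebSocket('ws://localhost:8088?sessionId=s1&callId=call-123');
+ *   ws.onopen = () => {
+ *     ws.send(JSON.stringify({ type: 'subscribe', callId: 'call-456' }));
+ *     ws.send(JSON.stringify({ type: 'ping' }));
+ *   };
+ *   ws.onmessage = (e) => console.log('alert', JSON.parse(e.data));
+ */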
+
+  bindAnalysisEngine(callId: string, engine: CallAnalysisEngine): void {
+    this.analysisEngines.set(callId, engine);
+
+    engine.on('result', (result: AnalysisResult) => {
+      this.processAnalysisResult(callId, result);
+    });
+
+    engine.on('events', (events: CallEvent[]) => {
+      events.forEach(event => {
+        this.sendAlert({
+          type: 'call_event',
+          severity: event.severity as AlertSeverity,
+          callId,
+          title: this.formatEventType(event.type),
+          message: this.formatEventMessage(event),
+          data: { event, timestamp: event.timestamp },
+          actionable: event.severity === 'high',
+        });
+      });
+    });
+
+    engine.on('anomalies', (anomalies: Anomaly[]) => {
+      anomalies.forEach(anomaly => {
+        this.sendAlert({
+          type: 'anomaly',
+          severity: anomaly.severity as AlertSeverity,
+          callId,
+          title: this.formatAnomalyType(anomaly.type),
+          message: anomaly.description,
+          data: {
+            anomaly,
+            confidence: anomaly.confidence,
+            recommendation: anomaly.recommendation,
+          },
+          actionable: anomaly.severity === 'high' || anomaly.severity === 'critical',
+        });
+      });
+    });
+
+    console.log(`[AlertServer] Bound analysis engine for call: ${callId}`);
+  }
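+
+/*
+ * End-to-end wiring sketch: one engine per call, frames arriving from an
+ * AudioStreamCapture instance. AudioFrame is structurally compatible with
+ * AudioFrameInput, so frames can be forwarded directly (call id assumed):
+ *
+ *   const engine = new CallAnalysisEngine({ anomalySensitivity: 'high' });
+ *   alertServer.bindAnalysisEngine('call-123', engine);
+ *   capture.on('frame', (frame) => engine.processFrame(frame));
+ */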
+
+  private processAnalysisResult(callId: string, result: AnalysisResult): void {
+    if (result.callQuality.mosScore < 3.0) {
+      this.sendAlert({
+        type: 'quality_degraded',
+        severity: result.callQuality.mosScore < 2.5 ? 'high' : 'medium',
+        callId,
+        title: 'Call Quality Degraded',
+        message: `MOS score: ${result.callQuality.mosScore.toFixed(1)} (threshold: 3.0)`,
+        data: result.callQuality as unknown as Record<string, unknown>,
+        actionable: true,
+      });
+    }
+
+    if (result.sentiment.sentiment === 'negative' && result.sentiment.confidence > 0.7) {
+      this.sendAlert({
+        type: 'sentiment_shift',
+        severity: 'medium',
+        callId,
+        title: 'Negative Sentiment Detected',
+        message: `Confidence: ${(result.sentiment.confidence * 100).toFixed(0)}%`,
+        data: result.sentiment as unknown as Record<string, unknown>,
+        actionable: false,
+      });
+    }
+  }
+
+  sendAlert(options: {
+    type: AlertType;
+    severity: AlertSeverity;
+    callId?: string;
+    title: string;
+    message: string;
+    data: Record<string, unknown>;
+    actionable: boolean;
+  }): void {
+    const cooldownKey = `${options.callId}:${options.type}`;
+    const now = Date.now();
+
+    // Build the alert once and record it once in history, then fan it out to
+    // each eligible subscriber (per-session cooldown and call filtering below).
+    const alert: AlertPayload = {
+      id: `alert-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
+      type: options.type,
+      severity: options.severity,
+      timestamp: now,
+      callId: options.callId,
+      title: options.title,
+      message: options.message,
+      data: options.data,
+      actionable: options.actionable,
+    };
+
+    this.alertHistory.push(alert);
+    if (this.alertHistory.length > this.maxAlertHistory) {
+      this.alertHistory.shift();
+    }
+
+    const sessionKeys = Array.from(this.subscribers.keys());
+    for (const key of sessionKeys) {
+      const session = this.subscribers.get(key);
+      if (!session) continue;
+
+      const lastTime = session.lastAlertTime.get(cooldownKey) || 0;
+      if (now - lastTime < this.config.alertCooldownMs) continue;
+
+      if (options.callId && session.callIds.size > 0 && !session.callIds.has(options.callId)) continue;
+
+      if (session.ws.readyState === WebSocket.OPEN) {
+        session.ws.send(JSON.stringify(alert));
+      }
+      session.lastAlertTime.set(cooldownKey, now);
+    }
+  }
+
+  broadcastCallSummary(callId: string, summary: string): void {
+    this.sendAlert({
+      type: 'call_summary',
+      severity: 'info',
+      callId,
+      title: 'Call Analysis Summary',
+      message: summary,
+      data: { summary },
+      actionable: false,
+    });
+  }
+
+  getAlertHistory(limit: number = 50, callId?: string): AlertPayload[] {
+    let history = this.alertHistory;
+    if (callId) {
+      history = history.filter(a => a.callId === callId);
+    }
+    return history.slice(-limit);
+  }
+
+  getSubscriberCount(): number {
+    return this.subscribers.size;
+  }
+
+  getActiveCalls(): string[] {
+    return Array.from(this.analysisEngines.keys());
+  }
+
+  getEngine(callId: string): CallAnalysisEngine | undefined {
+    return this.analysisEngines.get(callId);
+  }
+
+  async stop(): Promise<void> {
+    this.isRunning = false;
+
+    this.subscribers.forEach((session) => {
+      if (session.ws.readyState === WebSocket.OPEN) {
+        session.ws.send(JSON.stringify({
+          type: 'server_shutdown',
+          payload: { timestamp: Date.now() },
+        }));
+        session.ws.close(1001, 'Server shutting down');
+      }
+    });
+
+    this.analysisEngines.forEach((engine) => {
+      engine.destroy();
+    });
+
+    if (this.wss) {
+      await new Promise<void>((resolve) => {
+        this.wss!.close(() => resolve());
+      });
+      this.wss = null;
+    }
+
+    console.log('[AlertServer] Stopped');
+  }
+
+  private formatEventType(type: string): string {
+    const labels: Record<string, string> = {
+      interrupt: 'Speaker Interrupt',
+      overlap: 'Speech Overlap',
+      long_pause: 'Long Pause',
+      volume_spike: 'Volume Spike',
+      silence: 'Silence Detected',
+      speaker_change: 'Speaker Change',
+    };
+    return labels[type] || type;
+  }
+
+  private formatEventMessage(event: CallEvent): string {
+    const messages: Record<string, string> = {
+      interrupt: `Interrupt detected (${event.duration}ms)`,
+      overlap: `Speech overlap detected (${event.duration}ms)`,
+      long_pause: `Pause duration: ${(event.duration / 1000).toFixed(1)}s`,
+      volume_spike: `Volume spike: ${(event.metadata.level as number)?.toFixed(2) || 'unknown'}`,
+      // Silence events carry frame duration in seconds; convert to ms here.
+      silence: `Silence detected for ${(event.duration * 1000).toFixed(0)}ms`,
+      speaker_change: 'Speaker change detected',
+    };
+    return messages[event.type] || 'Event detected';
+  }
+
+  private formatAnomalyType(type: string): string {
+    const labels: Record<string, string> = {
+      background_noise: 'Background Noise',
+      echo: 'Echo Detected',
+      distortion: 'Audio Distortion',
+      dropouts: 'Audio Dropout',
+      excessive_silence: 'Excessive Silence',
+      volume_inconsistency: 'Volume Inconsistency',
+    };
+    return labels[type] || type;
+  }
+}
+
+export default AlertServer;
diff --git a/src/lib/audio/audio-stream-capture.ts b/src/lib/audio/audio-stream-capture.ts
new file mode 100644
index 000000000..8e42799d7
--- /dev/null
+++ b/src/lib/audio/audio-stream-capture.ts
@@ -0,0 +1,331 @@
+/**
+ * Audio Stream Capture Module
+ * Captures and processes audio tracks from WebRTC peer connections
+ * Provides real-time audio frames for downstream analysis
+ */
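+
+/*
+ * Usage sketch (illustrative): capture microphone audio and observe frames.
+ * Requires a browser context with getUserMedia; the values are assumptions.
+ *
+ *   const capture = new AudioStreamCapture({ sampleRate: 16000, frameSize: 1024 });
+ *   capture.on('frame', (frame) => console.log(frame.rmsLevel.toFixed(3)));
+ *   capture.on('clip:detected', (peak) => console.warn(`clipping at ${peak}`));
+ *   await capture.startFromMicrophone();
+ *   // ...later: await capture.stop();
+ */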
+
+import { EventEmitter } from 'events';
+
+export type AudioCaptureState = 'idle' | 'capturing' | 'paused' | 'error';
+
+export interface AudioFrame {
+  timestamp: number;
+  samples: Float32Array;
+  sampleRate: number;
+  channelCount: number;
+  duration: number;
+  rmsLevel: number;
+}
+
+export interface AudioCaptureOptions {
+  sampleRate?: number;
+  channelCount?: number;
+  frameSize?: number;
+  enableEchoCancellation?: boolean;
+  enableNoiseSuppression?: boolean;
+  enableAutoGainControl?: boolean;
+}
+
+export interface AudioMetrics {
+  peakLevel: number;
+  averageLevel: number;
+  rmsLevel: number;
+  silenceRatio: number;
+  clipCount: number;
+  totalSamples: number;
+  duration: number;
+}
+
+export type AudioEventType =
+  | 'frame'
+  | 'state:changed'
+  | 'metrics:update'
+  | 'error'
+  | 'silence:detected'
+  | 'clip:detected';
+
+export interface AudioEvents {
+  'frame': (frame: AudioFrame) => void;
+  'state:changed': (state: AudioCaptureState) => void;
+  'metrics:update': (metrics: AudioMetrics) => void;
+  'error': (error: Error) => void;
+  'silence:detected': (duration: number) => void;
+  'clip:detected': (peakLevel: number) => void;
+}
+
+const DEFAULT_OPTIONS: Required<AudioCaptureOptions> = {
+  sampleRate: 16000,
+  channelCount: 1,
+  frameSize: 1024,
+  enableEchoCancellation: true,
+  enableNoiseSuppression: true,
+  enableAutoGainControl: true,
+};
+
+const SILENCE_THRESHOLD = 0.01;
+const CLIP_THRESHOLD = 0.95;
+
+export class AudioStreamCapture extends EventEmitter {
+  private state: AudioCaptureState = 'idle';
+  private options: Required<AudioCaptureOptions>;
+  private audioContext: AudioContext | null = null;
+  private sourceNode: MediaStreamAudioSourceNode | null = null;
+  private scriptProcessor: ScriptProcessorNode | null = null;
+  private stream: MediaStream | null = null;
+  private metrics: AudioMetrics;
+  private startTime: number = 0;
+  private silenceStart: number = 0;
+  private silenceReported: boolean = false;
+  private frameCount: number = 0;
+
+  constructor(options: AudioCaptureOptions = {}) {
+    super();
+    this.options = { ...DEFAULT_OPTIONS, ...options };
+    this.metrics = {
+      peakLevel: 0,
+      averageLevel: 0,
+      rmsLevel: 0,
+      silenceRatio: 0,
+      clipCount: 0,
+      totalSamples: 0,
+      duration: 0,
+    };
+  }
+
+  async startFromMicrophone(): Promise<void> {
+    try {
+      this.stream = await navigator.mediaDevices.getUserMedia({
+        audio: {
+          sampleRate: this.options.sampleRate,
+          channelCount: this.options.channelCount,
+          echoCancellation: this.options.enableEchoCancellation,
+          noiseSuppression: this.options.enableNoiseSuppression,
+          autoGainControl: this.options.enableAutoGainControl,
+        },
+      });
+
+      this.setupProcessingPipeline();
+    } catch (error) {
+      this.updateState('error');
+      this.emit('error', error instanceof Error ? error : new Error(String(error)));
+      throw error;
+    }
+  }
+
+  async startFromStream(mediaStream: MediaStream): Promise<void> {
+    const audioTracks = mediaStream.getAudioTracks();
+    if (audioTracks.length === 0) {
+      throw new Error('No audio tracks found in MediaStream');
+    }
+
+    this.stream = new MediaStream(audioTracks);
+    this.setupProcessingPipeline();
+  }
+
+  private setupProcessingPipeline(): void {
+    this.audioContext = new AudioContext({
+      sampleRate: this.options.sampleRate,
+    });
+
+    this.sourceNode = this.audioContext.createMediaStreamSource(this.stream!);
+    this.scriptProcessor = this.audioContext.createScriptProcessor(
+      this.options.frameSize,
+      this.options.channelCount,
+      this.options.channelCount
+    );
+
+    this.sourceNode.connect(this.scriptProcessor);
+    this.scriptProcessor.connect(this.audioContext.destination);
+
+    this.scriptProcessor.onaudioprocess = (event: AudioProcessingEvent) => {
+      const inputBuffer = event.inputBuffer;
+      const outputBuffer = event.outputBuffer;
+      const now = Date.now();
+
+      if (!this.startTime) {
+        this.startTime = now;
+      }
+
+      const frames = this.processAudioBuffer(inputBuffer, now);
+      frames.forEach(frame => this.emit('frame', frame));
+
+      // updateMetrics mutates this.metrics and returns a snapshot for emission.
+      this.emit('metrics:update', this.updateMetrics(inputBuffer));
+
+      this.detectSilence();
+      this.detectClipping(inputBuffer);
+
+      // Pass audio through unchanged so the node stays active in the graph.
+      outputBuffer.getChannelData(0).set(inputBuffer.getChannelData(0));
+    };
+
+    this.updateState('capturing');
+  }
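+
+  // Note: ScriptProcessorNode is deprecated in the Web Audio API. A sketch of
+  // an AudioWorklet-based replacement (the module path, processor name, and
+  // worklet file are assumptions; the worklet itself is not in this patch):
+  //
+  //   await this.audioContext.audioWorklet.addModule('/worklets/capture-processor.js');
+  //   const workletNode = new AudioWorkletNode(this.audioContext, 'capture-processor');
+  //   workletNode.port.onmessage = (e) => { /* Float32Array frames posted by the worklet */ };
+  //   this.sourceNode.connect(workletNode);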
+
+  private processAudioBuffer(
+    buffer: AudioBuffer,
+    timestamp: number
+  ): AudioFrame[] {
+    const channelData = buffer.getChannelData(0);
+    const sampleRate = buffer.sampleRate;
+    const duration = channelData.length / sampleRate;
+
+    const rms = this.calculateRMS(channelData);
+
+    const frame: AudioFrame = {
+      timestamp,
+      samples: new Float32Array(channelData),
+      sampleRate,
+      channelCount: buffer.numberOfChannels,
+      duration,
+      rmsLevel: rms,
+    };
+
+    // Sample and duration accounting lives in updateMetrics to avoid
+    // double-counting; only the frame counter is tracked here.
+    this.frameCount++;
+
+    return [frame];
+  }
+
+  private calculateRMS(samples: Float32Array): number {
+    let sum = 0;
+    for (let i = 0; i < samples.length; i++) {
+      const val = samples[i] ?? 0;
+      sum += val * val;
+    }
+    return Math.sqrt(sum / samples.length);
+  }
+
+  private updateMetrics(buffer: AudioBuffer): AudioMetrics {
+    const channelData = buffer.getChannelData(0);
+    let peak = 0;
+    let sum = 0;
+    let clips = 0;
+    let silentSamples = 0;
+
+    for (let i = 0; i < channelData.length; i++) {
+      const sample = channelData[i] ?? 0;
+      const abs = Math.abs(sample);
+      if (abs > peak) peak = abs;
+      sum += abs;
+      if (abs > CLIP_THRESHOLD) clips++;
+      if (abs < SILENCE_THRESHOLD) silentSamples++;
+    }
+
+    const rms = this.calculateRMS(channelData);
+    const total = channelData.length;
+
+    this.metrics.peakLevel = Math.max(this.metrics.peakLevel, peak);
+    this.metrics.averageLevel = sum / total;
+    this.metrics.rmsLevel = rms;
+    this.metrics.silenceRatio = silentSamples / total;
+    this.metrics.clipCount += clips;
+    this.metrics.totalSamples += total;
+    this.metrics.duration = (Date.now() - this.startTime) / 1000;
+
+    return { ...this.metrics };
+  }
+
+  private detectSilence(): void {
+    if (this.metrics.rmsLevel < SILENCE_THRESHOLD) {
+      if (!this.silenceStart) {
+        this.silenceStart = Date.now();
+      } else if (!this.silenceReported) {
+        const silenceDuration = (Date.now() - this.silenceStart) / 1000;
+        if (silenceDuration >= 2) {
+          // Report once per silent stretch rather than on every frame.
+          this.emit('silence:detected', silenceDuration);
+          this.silenceReported = true;
+        }
+      }
+    } else {
+      this.silenceStart = 0;
+      this.silenceReported = false;
+    }
+  }
+
+  private detectClipping(buffer: AudioBuffer): void {
+    const channelData = buffer.getChannelData(0);
+    for (let i = 0; i < channelData.length; i++) {
+      const sample = channelData[i] ?? 0;
+      const abs = Math.abs(sample);
+      if (abs > CLIP_THRESHOLD) {
+        this.emit('clip:detected', abs);
+        break;
+      }
+    }
+  }
+
+  async pause(): Promise<void> {
+    if (this.state === 'capturing') {
+      if (this.audioContext) {
+        await this.audioContext.suspend();
+      }
+      this.updateState('paused');
+    }
+  }
+
+  async resume(): Promise<void> {
+    if (this.state === 'paused') {
+      if (this.audioContext) {
+        await this.audioContext.resume();
+      }
+      this.updateState('capturing');
+    }
+  }
+
+  async stop(): Promise<void> {
+    if (this.scriptProcessor) {
+      this.scriptProcessor.disconnect();
+      this.scriptProcessor = null;
+    }
+
+    if (this.sourceNode) {
+      this.sourceNode.disconnect();
+      this.sourceNode = null;
+    }
+
+    if (this.audioContext) {
+      await this.audioContext.close();
+      this.audioContext = null;
+    }
+
+    if (this.stream) {
+      this.stream.getTracks().forEach(track => track.stop());
+      this.stream = null;
+    }
+
+    this.updateState('idle');
+    this.frameCount = 0;
+    this.startTime = 0;
+    this.silenceStart = 0;
+    this.silenceReported = false;
+  }
+
+  getMetrics(): AudioMetrics {
+    return { ...this.metrics };
+  }
+
+  getState(): AudioCaptureState {
+    return this.state;
+  }
+
+  getFrameCount(): number {
+    return this.frameCount;
+  }
+
+  getSampleRate(): number {
+    return this.options.sampleRate;
+  }
+
+  private updateState(newState: AudioCaptureState): void {
+    if (this.state !== newState) {
+      this.state = newState;
+      this.emit('state:changed', newState);
+    }
+  }
+
+  destroy(): void {
+    this.stop().then(() => {
+      this.removeAllListeners();
+    });
+  }
+}
+
+export default AudioStreamCapture;
diff --git a/src/lib/inference/call-analysis-engine.ts b/src/lib/inference/call-analysis-engine.ts
new file mode 100644
index 000000000..fe76fbcf7
--- /dev/null
+++ b/src/lib/inference/call-analysis-engine.ts
@@ -0,0 +1,499 @@
+/**
+ * Call Analysis Inference Engine
+ * Real-time analysis of audio streams for call quality, sentiment, and anomaly detection
+ * Processes audio frames from AudioStreamCapture and emits analysis results
+ */
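+
+/*
+ * Usage sketch (illustrative): feed frames in and listen for analysis output.
+ * Event names match the emits in processFrame; the threshold is an assumption.
+ *
+ *   const engine = new CallAnalysisEngine({ pauseThresholdMs: 3000 });
+ *   engine.on('result', (r) => console.log(r.summary));
+ *   engine.on('anomalies', (a) => console.warn('anomalies:', a));
+ *   capture.on('frame', (frame) => engine.processFrame(frame));
+ */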
+
+import { EventEmitter } from 'events';
+
+export type AnalysisState = 'idle' | 'analyzing' | 'paused' | 'error';
+
+export type Sentiment = 'positive' | 'neutral' | 'negative' | 'mixed';
+
+export type CallEventType =
+  | 'interrupt'
+  | 'overlap'
+  | 'long_pause'
+  | 'volume_spike'
+  | 'silence'
+  | 'speaker_change';
+
+export type AnomalyType =
+  | 'background_noise'
+  | 'echo'
+  | 'distortion'
+  | 'dropouts'
+  | 'excessive_silence'
+  | 'volume_inconsistency';
+
+export interface AnalysisConfig {
+  sentimentWindowMs?: number;
+  interruptThresholdMs?: number;
+  overlapThresholdMs?: number;
+  pauseThresholdMs?: number;
+  volumeSpikeThreshold?: number;
+  anomalySensitivity?: 'low' | 'medium' | 'high';
+  enableSpeakerDiarization?: boolean;
+}
+
+export interface CallEvent {
+  type: CallEventType;
+  timestamp: number;
+  duration: number;
+  severity: 'low' | 'medium' | 'high';
+  metadata: Record<string, unknown>;
+}
+
+export interface Anomaly {
+  type: AnomalyType;
+  timestamp: number;
+  confidence: number;
+  severity: 'low' | 'medium' | 'high' | 'critical';
+  description: string;
+  recommendation: string;
+}
+
+export interface SentimentAnalysis {
+  sentiment: Sentiment;
+  confidence: number;
+  score: number;
+  timestamp: number;
+}
+
+export interface CallQualityMetrics {
+  mosScore: number;
+  jitter: number;
+  packetLoss: number;
+  latency: number;
+  clarity: number;
+  noiseLevel: number;
+}
+
+export interface AnalysisResult {
+  timestamp: number;
+  callQuality: CallQualityMetrics;
+  sentiment: SentimentAnalysis;
+  events: CallEvent[];
+  anomalies: Anomaly[];
+  speakerActivity: SpeakerActivity[];
+  summary: string;
+}
+
+export interface SpeakerActivity {
+  speakerId: string;
+  startTime: number;
+  endTime: number;
+  duration: number;
+  energy: number;
+}
+
+export interface AudioFrameInput {
+  samples: Float32Array;
+  sampleRate: number;
+  timestamp: number;
+  rmsLevel: number;
+  duration: number;
+}
+
+const DEFAULT_CONFIG: Required<AnalysisConfig> = {
+  sentimentWindowMs: 5000,
+  interruptThresholdMs: 200,
+  overlapThresholdMs: 300,
+  pauseThresholdMs: 2000,
+  volumeSpikeThreshold: 0.8,
+  anomalySensitivity: 'medium',
+  enableSpeakerDiarization: false,
+};
+
+const SENSITIVITY_MULTIPLIERS: Record<'low' | 'medium' | 'high', number> = {
+  low: 1.5,
+  medium: 1.0,
+  high: 0.6,
+};
+
+export class CallAnalysisEngine extends EventEmitter {
+  private state: AnalysisState = 'idle';
+  private config: Required<AnalysisConfig>;
+  private eventBuffer: CallEvent[] = [];
+  private anomalyBuffer: Anomaly[] = [];
+  private sentimentHistory: number[] = [];
+  private qualityMetrics: CallQualityMetrics;
+  private speakerActivity: SpeakerActivity[] = [];
+  private currentSpeaker: string | null = null;
+  private speakerStartTime: number = 0;
+  private callStartTime: number = 0;
+  private frameCount: number = 0;
+  private speakerFrameCount: number = 0;
+  private energyHistory: number[] = [];
+  private lastPauseEnd: number = 0;
+  private lastVolumeSpike: number = 0;
+  private totalEnergy: number = 0;
+  private frameCountForQuality: number = 0;
+  private accumulatedJitter: number = 0;
+  private accumulatedLatency: number = 0;
+  private accumulatedPacketLoss: number = 0;
+
+  constructor(config: AnalysisConfig = {}) {
+    super();
+    this.config = { ...DEFAULT_CONFIG, ...config };
+    this.qualityMetrics = {
+      mosScore: 4.0,
+      jitter: 0,
+      packetLoss: 0,
+      latency: 0,
+      clarity: 1.0,
+      noiseLevel: 0,
+    };
+  }
+
+  processFrame(frame: AudioFrameInput): AnalysisResult {
+    if (this.state === 'idle') {
+      this.callStartTime = frame.timestamp;
+      this.updateState('analyzing');
+    }
+
+    this.frameCount++;
+    this.frameCountForQuality++;
+
+    const events = this.detectEvents(frame);
+    const anomalies = this.detectAnomalies(frame);
+    const sentiment = this.analyzeSentiment(frame);
+    const quality = this.updateQualityMetrics(frame);
+
+    if (this.config.enableSpeakerDiarization) {
+      this.trackSpeakerActivity(frame);
+    }
+
+    this.eventBuffer.push(...events);
+    this.anomalyBuffer.push(...anomalies);
+
+    const result: AnalysisResult = {
+      timestamp: frame.timestamp,
+      callQuality: quality,
+      sentiment,
+      events,
+      anomalies,
+      speakerActivity: this.speakerActivity,
+      summary: this.generateSummary(events, anomalies, sentiment),
+    };
+
+    this.emit('result', result);
+
+    if (events.length > 0) {
+      this.emit('events', events);
+    }
+    if (anomalies.length > 0) {
+      this.emit('anomalies', anomalies);
+    }
+
+    return result;
+  }
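+
+  // Synthetic driver sketch (e.g. for a quick test): build a 1 kHz tone frame
+  // and push it through processFrame. All values here are assumptions.
+  //
+  //   const sampleRate = 16000;
+  //   const samples = new Float32Array(1024);
+  //   for (let i = 0; i < samples.length; i++) {
+  //     samples[i] = 0.5 * Math.sin((2 * Math.PI * 1000 * i) / sampleRate);
+  //   }
+  //   const rms = Math.sqrt(samples.reduce((s, v) => s + v * v, 0) / samples.length);
+  //   engine.processFrame({ samples, sampleRate, timestamp: Date.now(),
+  //     rmsLevel: rms, duration: samples.length / sampleRate });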
+
+  private detectEvents(frame: AudioFrameInput): CallEvent[] {
+    const events: CallEvent[] = [];
+    const now = frame.timestamp;
+
+    if (frame.rmsLevel < 0.01 && this.lastPauseEnd > 0) {
+      const pauseDuration = now - this.lastPauseEnd;
+      if (pauseDuration >= this.config.pauseThresholdMs) {
+        events.push({
+          type: 'long_pause',
+          timestamp: now,
+          duration: pauseDuration,
+          severity: pauseDuration > 5000 ? 'high' : 'medium',
+          metadata: { pauseDuration },
+        });
+      }
+    } else if (frame.rmsLevel >= 0.01) {
+      // Track the most recent active-speech timestamp so pauses are measured
+      // from the end of speech, not from the first utterance of the call.
+      this.lastPauseEnd = now;
+    }
+
+    if (frame.rmsLevel > this.config.volumeSpikeThreshold) {
+      const timeSinceLastSpike = now - this.lastVolumeSpike;
+      if (timeSinceLastSpike > 1000) {
+        events.push({
+          type: 'volume_spike',
+          timestamp: now,
+          duration: 0,
+          severity: frame.rmsLevel > 0.95 ? 'high' : 'medium',
+          metadata: { level: frame.rmsLevel },
+        });
+        this.lastVolumeSpike = now;
+      }
+    }
+
+    if (frame.rmsLevel < 0.005) {
+      events.push({
+        type: 'silence',
+        timestamp: now,
+        duration: frame.duration,
+        severity: 'low',
+        metadata: { rmsLevel: frame.rmsLevel },
+      });
+    }
+
+    return events;
+  }
+
+  private detectAnomalies(frame: AudioFrameInput): Anomaly[] {
+    const anomalies: Anomaly[] = [];
+    const sensitivity = SENSITIVITY_MULTIPLIERS[this.config.anomalySensitivity] || 1.0;
+    const now = frame.timestamp;
+
+    this.energyHistory.push(frame.rmsLevel);
+    if (this.energyHistory.length > 100) {
+      this.energyHistory.shift();
+    }
+
+    if (this.energyHistory.length >= 10) {
+      const recentAvg = this.energyHistory.slice(-10).reduce((a, b) => a + b, 0) / 10;
+      const overallAvg = this.energyHistory.reduce((a, b) => a + b, 0) / this.energyHistory.length;
+      const variance = Math.abs(recentAvg - overallAvg) / (overallAvg || 0.01);
+
+      if (variance > 2.0 * sensitivity) {
+        anomalies.push({
+          type: 'volume_inconsistency',
+          timestamp: now,
+          confidence: Math.min(variance / 4.0, 1.0),
+          severity: variance > 3.0 ? 'high' : 'medium',
+          description: 'Significant volume inconsistency detected in recent audio',
+          recommendation: 'Check microphone gain settings and speaker distance',
+        });
+      }
+    }
+
+    const noiseFloor = this.estimateNoiseFloor(frame);
+    if (noiseFloor > 0.15 * sensitivity) {
+      anomalies.push({
+        type: 'background_noise',
+        timestamp: now,
+        confidence: Math.min(noiseFloor / 0.3, 1.0),
+        severity: noiseFloor > 0.25 ? 'high' : 'medium',
+        description: 'Elevated background noise levels detected',
+        recommendation: 'Consider noise suppression or quieter environment',
+      });
+    }
+
+    if (frame.rmsLevel > 0.95) {
+      anomalies.push({
+        type: 'distortion',
+        timestamp: now,
+        confidence: (frame.rmsLevel - 0.95) / 0.05,
+        severity: 'high',
+        description: 'Audio signal approaching clipping levels',
+        recommendation: 'Reduce input gain or increase distance from microphone',
+      });
+    }
+
+    const dropoutDetected = this.detectDropouts(frame);
+    if (dropoutDetected) {
+      anomalies.push({
+        type: 'dropouts',
+        timestamp: now,
+        confidence: 0.8,
+        severity: 'medium',
+        description: 'Brief audio dropout detected',
+        recommendation: 'Check network stability and codec settings',
+      });
+    }
+
+    return anomalies;
+  }
+
+  private estimateNoiseFloor(frame: AudioFrameInput): number {
+    if (this.energyHistory.length < 20) return 0;
+    // Use the mean of the quietest quartile of recent frames as the noise floor.
+    const sorted = [...this.energyHistory].sort((a, b) => a - b);
+    const bottomQuarter = sorted.slice(0, Math.floor(sorted.length / 4));
+    return bottomQuarter.reduce((a, b) => a + b, 0) / bottomQuarter.length;
+  }
+
+  private detectDropouts(frame: AudioFrameInput): boolean {
+    if (this.energyHistory.length < 5) return false;
+    const recent = this.energyHistory.slice(-5);
+    const prevAvg = recent.slice(0, -1).reduce((a, b) => a + b, 0) / 4;
+    const current = recent[recent.length - 1] ?? 0;
+
+    // A dropout is a sudden collapse to under 10% of the recent average level.
+    if (prevAvg > 0.1 && current < prevAvg * 0.1) {
+      return true;
+    }
+    return false;
+  }
+
+  private analyzeSentiment(frame: AudioFrameInput): SentimentAnalysis {
+    // Heuristic proxy: map vocal energy bands to a sentiment score.
+    const energy = frame.rmsLevel;
+    let score = 0.5;
+
+    if (energy > 0.5) {
+      score = 0.7;
+    } else if (energy > 0.3) {
+      score = 0.6;
+    } else if (energy > 0.1) {
+      score = 0.5;
+    } else if (energy > 0.05) {
+      score = 0.4;
+    } else {
+      score = 0.3;
+    }
+
+    this.sentimentHistory.push(score);
+    if (this.sentimentHistory.length > this.config.sentimentWindowMs / 100) {
+      this.sentimentHistory.shift();
+    }
+
+    const avgScore = this.sentimentHistory.reduce((a, b) => a + b, 0) / this.sentimentHistory.length;
+    const confidence = Math.min(this.sentimentHistory.length / 10, 1.0);
+
+    let sentiment: Sentiment;
+    if (avgScore > 0.65) sentiment = 'positive';
+    else if (avgScore > 0.55) sentiment = 'neutral';
+    else if (avgScore > 0.4) sentiment = 'mixed';
+    else sentiment = 'negative';
+
+    return {
+      sentiment,
+      confidence,
+      score: avgScore,
+      timestamp: frame.timestamp,
+    };
+  }
+
+  private updateQualityMetrics(frame: AudioFrameInput): CallQualityMetrics {
+    // Jitter, latency, and packet loss are simulated placeholders here; real
+    // values would come from the transport (e.g. RTCPeerConnection.getStats()).
+    const jitterDelta = Math.random() * 2 - 1;
+    this.accumulatedJitter += jitterDelta;
+    this.accumulatedLatency += Math.random() * 5;
+    this.accumulatedPacketLoss += Math.random() > 0.95 ? 0.001 : 0;
+
+    const avgJitter = Math.abs(this.accumulatedJitter) / this.frameCountForQuality;
+    const avgLatency = this.accumulatedLatency / this.frameCountForQuality;
+    const avgPacketLoss = this.accumulatedPacketLoss / this.frameCountForQuality;
+
+    const noiseLevel = this.estimateNoiseFloor(frame);
+    const clarity = Math.max(0, 1.0 - noiseLevel - avgPacketLoss);
+
+    const mosScore = Math.max(1, Math.min(5, 4.5 - (avgJitter * 0.5 + avgPacketLoss * 10 + noiseLevel * 2)));
+
+    this.qualityMetrics = {
+      mosScore: Math.round(mosScore * 10) / 10,
+      jitter: Math.round(avgJitter * 100) / 100,
+      packetLoss: Math.round(avgPacketLoss * 1000) / 1000,
+      latency: Math.round(avgLatency * 10) / 10,
+      clarity: Math.round(clarity * 100) / 100,
+      noiseLevel: Math.round(noiseLevel * 100) / 100,
+    };
+
+    return { ...this.qualityMetrics };
+  }
+
+  private trackSpeakerActivity(frame: AudioFrameInput): void {
+    const energy = frame.rmsLevel;
+    const isActive = energy > 0.05;
+
+    if (isActive && !this.currentSpeaker) {
+      this.currentSpeaker = `speaker-${Date.now()}`;
+      this.speakerStartTime = frame.timestamp;
+    } else if (!isActive && this.currentSpeaker) {
+      this.speakerActivity.push({
+        speakerId: this.currentSpeaker,
+        startTime: this.speakerStartTime,
+        endTime: frame.timestamp,
+        duration: frame.timestamp - this.speakerStartTime,
+        // Use the dedicated per-speaker counter; frameCount tracks the whole call.
+        energy: this.totalEnergy / Math.max(this.speakerFrameCount, 1),
+      });
+      this.currentSpeaker = null;
+      this.totalEnergy = 0;
+      this.speakerFrameCount = 0;
+    }
+
+    if (isActive) {
+      this.totalEnergy += energy;
+      this.speakerFrameCount++;
+    }
+  }
+
+  private generateSummary(events: CallEvent[], anomalies: Anomaly[], sentiment: SentimentAnalysis): string {
+    const parts: string[] = [];
+
+    if (anomalies.length > 0) {
+      const critical = anomalies.filter(a => a.severity === 'critical' || a.severity === 'high');
+      if (critical.length > 0) {
+        parts.push(`${critical.length} high-severity anomalies detected`);
+      }
+    }
+
+    if (events.length > 0) {
+      const significant = events.filter(e => e.severity === 'high');
+      if (significant.length > 0) {
+        parts.push(`${significant.length} significant call events`);
+      }
+    }
+
+    parts.push(`sentiment: ${sentiment.sentiment} (${(sentiment.confidence * 100).toFixed(0)}% confidence)`);
+
+    if (this.qualityMetrics.mosScore < 3.5) {
+      parts.push(`call quality: ${this.qualityMetrics.mosScore.toFixed(1)} MOS`);
+    }
+
+    return parts.join('; ') || 'Call analysis nominal';
+  }
+
+  getQualityMetrics(): CallQualityMetrics {
+    return { ...this.qualityMetrics };
+  }
+
+  getEvents(): CallEvent[] {
+    return [...this.eventBuffer];
+  }
+
+  getAnomalies(): Anomaly[] {
+    return [...this.anomalyBuffer];
+  }
+
+  getState(): AnalysisState {
+    return this.state;
+  }
+
+  getCallDuration(): number {
+    if (this.callStartTime === 0) return 0;
+    return (Date.now() - this.callStartTime) / 1000;
+  }
+
+  reset(): void {
+    this.eventBuffer = [];
+    this.anomalyBuffer = [];
+    this.sentimentHistory = [];
+    this.speakerActivity = [];
+    this.energyHistory = [];
+    this.currentSpeaker = null;
+    this.callStartTime = 0;
+    this.frameCount = 0;
+    this.speakerFrameCount = 0;
+    this.frameCountForQuality = 0;
+    this.totalEnergy = 0;
+    this.accumulatedJitter = 0;
+    this.accumulatedLatency = 0;
+    this.accumulatedPacketLoss = 0;
+    this.lastPauseEnd = 0;
+    this.lastVolumeSpike = 0;
+    this.qualityMetrics = {
+      mosScore: 4.0,
+      jitter: 0,
+      packetLoss: 0,
+      latency: 0,
+      clarity: 1.0,
+      noiseLevel: 0,
+    };
+    this.updateState('idle');
+  }
+
+  private updateState(newState: AnalysisState): void {
+    if (this.state !== newState) {
+      this.state = newState;
+      this.emit('state:changed', newState);
+    }
+  }
+
+  destroy(): void {
+    this.reset();
+    this.removeAllListeners();
+  }
+}
+
+export default CallAnalysisEngine;