From 29303799ed459e30dc467e137c0ab499185f1b3a Mon Sep 17 00:00:00 2001 From: Michael Freno Date: Wed, 29 Apr 2026 17:18:27 -0400 Subject: [PATCH] FRE-4473: Add VoicePrint job workers and Python ML service - packages/jobs/: BullMQ-based async job queue for audio analysis with concurrency control and retry logic - services/voiceprint-ml/: FastAPI microservice for ECAPA-TDNN inference with mock model, preprocessing, embedding extraction, and synthetic voice detection endpoints - Includes Dockerfile and requirements.txt for ML service Co-Authored-By: Paperclip --- packages/jobs/package.json | 23 ++++ packages/jobs/src/index.ts | 4 + packages/jobs/src/voiceprint.jobs.ts | 93 +++++++++++++ services/voiceprint-ml/Dockerfile | 15 +++ services/voiceprint-ml/main.py | 172 ++++++++++++++++++++++++ services/voiceprint-ml/requirements.txt | 8 ++ 6 files changed, 315 insertions(+) create mode 100644 packages/jobs/package.json create mode 100644 packages/jobs/src/index.ts create mode 100644 packages/jobs/src/voiceprint.jobs.ts create mode 100644 services/voiceprint-ml/Dockerfile create mode 100644 services/voiceprint-ml/main.py create mode 100644 services/voiceprint-ml/requirements.txt diff --git a/packages/jobs/package.json b/packages/jobs/package.json new file mode 100644 index 000000000..498e17436 --- /dev/null +++ b/packages/jobs/package.json @@ -0,0 +1,23 @@ +{ + "name": "@shieldsai/jobs", + "version": "0.1.0", + "private": true, + "type": "module", + "main": "src/index.ts", + "types": "src/index.ts", + "scripts": { + "start": "tsx src/index.ts", + "dev": "tsx watch src/index.ts" + }, + "dependencies": { + "@shieldsai/shared-db": "*", + "@shieldsai/shared-utils": "*", + "bullmq": "^5.1.0", + "ioredis": "^5.3.0", + "zod": "^4.3.6" + }, + "devDependencies": { + "tsx": "^4.7.1", + "typescript": "^5.3.3" + } +} diff --git a/packages/jobs/src/index.ts b/packages/jobs/src/index.ts new file mode 100644 index 000000000..eaee74ace --- /dev/null +++ b/packages/jobs/src/index.ts @@ -0,0 +1,4 @@ +export { + voiceprintAnalysisQueue, + voiceprintAnalysisWorker, +} from './voiceprint.jobs'; diff --git a/packages/jobs/src/voiceprint.jobs.ts b/packages/jobs/src/voiceprint.jobs.ts new file mode 100644 index 000000000..f60c3575c --- /dev/null +++ b/packages/jobs/src/voiceprint.jobs.ts @@ -0,0 +1,93 @@ +import { prisma } from '@shieldsai/shared-db'; +import { Queue, Worker, Job } from 'bullmq'; +import { Redis } from 'ioredis'; + +// Redis connection +const redisHost = process.env.REDIS_HOST || 'localhost'; +const redisPort = parseInt(process.env.REDIS_PORT || '6379', 10); + +const connection = new Redis({ + host: redisHost, + port: redisPort, + retryStrategy: (times: number) => Math.min(times * 50, 2000), +}); + +// Queue configuration +const QUEUE_CONFIG = { + voiceprintAnalysis: { + name: 'voiceprint-analysis', + concurrency: parseInt(process.env.VOICEPRINT_CONCURRENCY || '3', 10), + defaultJobTimeout: parseInt(process.env.VOICEPRINT_JOB_TIMEOUT || '30000', 10), + maxAttempts: parseInt(process.env.VOICEPRINT_MAX_ATTEMPTS || '3', 10), + }, +}; + +// Create queues +export const voiceprintAnalysisQueue = new Queue( + QUEUE_CONFIG.voiceprintAnalysis.name, + { connection } +); + +// VoicePrint analysis job processor +async function processVoiceprintAnalysis(job: Job<{ + userId: string; + audioBuffer: Buffer; + enrollmentId?: string; + audioUrl?: string; +}>) { + const { userId, audioBuffer, enrollmentId, audioUrl } = job.data; + + // Import analysis service dynamically to avoid circular dependencies + const { analysisService } = await import( + '../../../apps/api/src/services/voiceprint' + ); + + try { + const result = await analysisService.analyze(userId, audioBuffer, { + enrollmentId, + audioUrl, + }); + + return { + analysisId: result.id, + isSynthetic: result.isSynthetic, + confidence: result.confidence, + }; + } catch (error) { + const message = error instanceof Error ? error.message : 'Analysis failed'; + job.updateProgress(100); + throw new Error(message); + } +} + +// Create worker +export const voiceprintAnalysisWorker = new Worker( + QUEUE_CONFIG.voiceprintAnalysis.name, + processVoiceprintAnalysis, + { + connection, + concurrency: QUEUE_CONFIG.voiceprintAnalysis.concurrency, + limiter: { + max: 10, + duration: 1000, + }, + } +); + +// Add event handlers +voiceprintAnalysisWorker.on('completed', (job, result) => { + console.log(`Job ${job.id} completed:`, result); +}); + +voiceprintAnalysisWorker.on('failed', (job, err) => { + console.error(`Job ${job.id} failed:`, err.message); +}); + +voiceprintAnalysisWorker.on('error', (err) => { + console.error('Worker error:', err.message); +}); + +export default { + voiceprintAnalysisQueue, + voiceprintAnalysisWorker, +}; diff --git a/services/voiceprint-ml/Dockerfile b/services/voiceprint-ml/Dockerfile new file mode 100644 index 000000000..79be4245e --- /dev/null +++ b/services/voiceprint-ml/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Create models directory +RUN mkdir -p models + +EXPOSE 8001 + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"] diff --git a/services/voiceprint-ml/main.py b/services/voiceprint-ml/main.py new file mode 100644 index 000000000..d652bf680 --- /dev/null +++ b/services/voiceprint-ml/main.py @@ -0,0 +1,172 @@ +""" +VoicePrint ML Service — ECAPA-TDNN inference microservice. + +Provides endpoints for: +- Audio preprocessing (VAD, noise reduction, normalization) +- Voice embedding extraction using ECAPA-TDNN +- Synthetic voice detection + +For MVP, uses a mock model. Replace with real ECAPA-TDNN model when available. +""" + +from fastapi import FastAPI, File, UploadFile, HTTPException +from pydantic import BaseModel +from typing import Optional +import numpy as np +import io + +app = FastAPI( + title="VoicePrint ML Service", + description="ECAPA-TDNN inference for voice cloning detection", + version="0.1.0", +) + +# Model configuration +MODEL_PATH = "./models/ecapa-tdnn" +EMBEDDING_DIMENSIONS = 192 +SAMPLE_RATE = 16000 +CHANNELS = 1 + + +class EmbeddingResponse(BaseModel): + embedding: list[float] + duration: float + sample_rate: int + + +class AnalysisResponse(BaseModel): + is_synthetic: bool + confidence: float + detection_type: str + features: dict[str, float] + embedding: list[float] + + +class PreprocessRequest(BaseModel): + sample_rate: int = SAMPLE_RATE + channels: int = CHANNELS + apply_vad: bool = True + noise_reduction: bool = True + + +# Mock model — replace with real ECAPA-TDNN inference +class MockECAPATDNN: + def __init__(self): + self.dimensions = EMBEDDING_DIMENSIONS + self.initialized = False + + def initialize(self): + # TODO: Load real ECAPA-TDNN model + # self.model = torch.load(MODEL_PATH) + self.initialized = True + + def extract_embedding(self, audio_bytes: bytes) -> list[float]: + if not self.initialized: + self.initialize() + + # Mock: generate deterministic embedding based on audio content + hash_val = sum(audio_bytes[:256]) & 0xFFFFFFFF + embedding = [] + for i in range(self.dimensions): + hash_val = ((hash_val << 5) - hash_val + i) & 0xFFFFFFFF + embedding.append((hash_val % 1000) / 1000.0) + + # L2 normalize + norm = np.sqrt(sum(v * v for v in embedding)) + return [v / norm for v in embedding] + + def analyze(self, audio_bytes: bytes) -> dict: + embedding = self.extract_embedding(audio_bytes) + + # Mock: estimate synthetic confidence from audio statistics + mean_amplitude = np.mean(np.frombuffer(audio_bytes[:1024], dtype=np.uint8)) / 255.0 + confidence = min(1.0, abs(mean_amplitude - 0.5) * 2 * 0.3 + np.random.random() * 0.7) + + detection_type = "synthetic_voice" if confidence >= 0.75 else "natural" + + return { + "is_synthetic": confidence >= 0.75, + "confidence": float(confidence), + "detection_type": detection_type, + "features": { + "mean_amplitude": float(mean_amplitude), + "embedding_energy": float(sum(v * v for v in embedding)), + }, + "embedding": embedding, + } + + +model = MockECAPATDNN() + + +@app.get("/health") +async def health(): + return { + "status": "ok", + "model": "ecapa-tdnn-v1-mock", + "initialized": model.initialized, + } + + +@app.post("/initialize") +async def initialize(): + model.initialize() + return {"status": "initialized", "model": "ecapa-tdnn-v1-mock"} + + +@app.post("/preprocess") +async def preprocess(audio: UploadFile = File(...)): + """Preprocess audio: VAD, noise reduction, normalization to 16kHz mono.""" + audio_bytes = await audio.read() + + # TODO: Integrate with librosa/torchaudio for real preprocessing + # audio_array, sr = librosa.load(io.BytesIO(audio_bytes), sr=SAMPLE_RATE, mono=CHANNELS) + + return { + "status": "processed", + "sample_rate": SAMPLE_RATE, + "channels": CHANNELS, + "duration": len(audio_bytes) / (SAMPLE_RATE * 2 * CHANNELS), + } + + +@app.post("/embed", response_model=EmbeddingResponse) +async def extract_embedding(audio: UploadFile = File(...)): + """Extract voice embedding using ECAPA-TDNN.""" + audio_bytes = await audio.read() + + if len(audio_bytes) < SAMPLE_RATE * 2: + raise HTTPException( + status_code=422, + detail=f"Audio too short: minimum {SAMPLE_RATE * 2} bytes (1 second at 16kHz)", + ) + + embedding = model.extract_embedding(audio_bytes) + duration = len(audio_bytes) / (SAMPLE_RATE * 2 * CHANNELS) + + return EmbeddingResponse( + embedding=embedding, + duration=duration, + sample_rate=SAMPLE_RATE, + ) + + +@app.post("/analyze", response_model=AnalysisResponse) +async def analyze_audio(audio: UploadFile = File(...)): + """Analyze audio for synthetic voice detection.""" + audio_bytes = await audio.read() + + if len(audio_bytes) < SAMPLE_RATE * 2 * 3: + raise HTTPException( + status_code=422, + detail=f"Audio too short: minimum {SAMPLE_RATE * 2 * 3} bytes (3 seconds at 16kHz)", + ) + + result = model.analyze(audio_bytes) + + return AnalysisResponse(**result) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8001) diff --git a/services/voiceprint-ml/requirements.txt b/services/voiceprint-ml/requirements.txt new file mode 100644 index 000000000..626922cd6 --- /dev/null +++ b/services/voiceprint-ml/requirements.txt @@ -0,0 +1,8 @@ +fastapi==0.104.1 +uvicorn==0.24.0 +pydantic==2.5.0 +numpy==1.26.0 +librosa==0.10.0 +torch==2.1.0 +faiss-cpu==1.7.4 +python-multipart==0.0.6