Files
FrenoCorp/services/voiceprint-ml/main.py
Michael Freno 29303799ed FRE-4473: Add VoicePrint job workers and Python ML service
- packages/jobs/: BullMQ-based async job queue for audio analysis
  with concurrency control and retry logic
- services/voiceprint-ml/: FastAPI microservice for ECAPA-TDNN
  inference with mock model, preprocessing, embedding extraction,
  and synthetic voice detection endpoints
- Includes Dockerfile and requirements.txt for ML service

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-29 17:18:27 -04:00

173 lines
4.9 KiB
Python

"""
VoicePrint ML Service — ECAPA-TDNN inference microservice.
Provides endpoints for:
- Audio preprocessing (VAD, noise reduction, normalization)
- Voice embedding extraction using ECAPA-TDNN
- Synthetic voice detection
For MVP, uses a mock model. Replace with real ECAPA-TDNN model when available.
"""
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np
import io
# ASGI application object; the route decorators further down register their
# handlers on it. title/description/version are passed to the FastAPI constructor.
app = FastAPI(
    title="VoicePrint ML Service",
    description="ECAPA-TDNN inference for voice cloning detection",
    version="0.1.0",
)
# Model configuration
# Location of the model weights. In the visible code this is only mentioned in
# a commented-out torch.load() call inside the mock model.
MODEL_PATH = "./models/ecapa-tdnn"
# Number of components in each embedding vector the model produces.
EMBEDDING_DIMENSIONS = 192
# Sample rate (Hz) and channel count that the endpoints report back and use in
# their byte-length / duration arithmetic.
SAMPLE_RATE = 16000
CHANNELS = 1
# Response body of POST /embed (used as that route's response_model).
class EmbeddingResponse(BaseModel):
    # Embedding vector as returned by the model's extract_embedding().
    embedding: list[float]
    # Duration derived by the endpoint from the upload's byte length
    # (len / (SAMPLE_RATE * 2 * CHANNELS)).
    duration: float
    # The endpoint fills this with the SAMPLE_RATE constant.
    sample_rate: int
# Response body of POST /analyze; built by unpacking the dict returned from
# the model's analyze() method, so the field names mirror that dict's keys.
class AnalysisResponse(BaseModel):
    # True when the confidence score reaches the model's 0.75 threshold.
    is_synthetic: bool
    # Score the verdict is based on; the mock caps it at 1.0.
    confidence: float
    # "synthetic_voice" or "natural" in the mock implementation.
    detection_type: str
    # Named scalar features; the mock emits "mean_amplitude" and
    # "embedding_energy".
    features: dict[str, float]
    # Embedding vector computed for the same audio.
    embedding: list[float]
# Options for audio preprocessing.
# NOTE(review): no endpoint visible in this file references this model — the
# /preprocess route currently accepts only the uploaded file. Confirm whether
# it is meant to be wired in once real preprocessing lands.
class PreprocessRequest(BaseModel):
    # Defaults to the module-level SAMPLE_RATE constant.
    sample_rate: int = SAMPLE_RATE
    # Defaults to the module-level CHANNELS constant.
    channels: int = CHANNELS
    # Presumably toggles voice-activity detection — TODO confirm when implemented.
    apply_vad: bool = True
    # Presumably toggles noise reduction — TODO confirm when implemented.
    noise_reduction: bool = True
# Mock model — replace with real ECAPA-TDNN inference
class MockECAPATDNN:
    """Stand-in for the ECAPA-TDNN model that needs no weights.

    Embeddings are derived from a simple rolling hash of the leading audio
    bytes, and the synthetic-voice verdict is a blend of a byte-level
    statistic and a random term. All values returned are native Python
    types (``float`` / ``bool``), never NumPy scalars, so the results can be
    handed directly to JSON encoders and response models.
    """

    def __init__(self):
        # Length of the vectors produced by extract_embedding().
        self.dimensions = EMBEDDING_DIMENSIONS
        # Flipped to True by initialize(); reported by the health endpoint.
        self.initialized = False

    def initialize(self):
        """Mark the model as ready; the mock has nothing to load."""
        # TODO: Load real ECAPA-TDNN model
        # self.model = torch.load(MODEL_PATH)
        self.initialized = True

    def extract_embedding(self, audio_bytes: bytes) -> list[float]:
        """Return a deterministic, L2-normalised pseudo-embedding.

        Args:
            audio_bytes: Raw audio payload. Only the first 256 bytes
                influence the result.

        Returns:
            A list of ``self.dimensions`` Python floats with unit Euclidean
            norm (or all zeros in the degenerate zero-norm case).
        """
        if not self.initialized:
            # Lazy initialisation so callers need not hit /initialize first.
            self.initialize()

        # Mock: seed a 32-bit rolling hash with the sum of the leading bytes.
        hash_val = sum(audio_bytes[:256]) & 0xFFFFFFFF
        embedding = []
        for i in range(self.dimensions):
            # (h << 5) - h == h * 31; the mask keeps the state at 32 bits.
            hash_val = ((hash_val << 5) - hash_val + i) & 0xFFFFFFFF
            embedding.append((hash_val % 1000) / 1000.0)

        # L2 normalise. float() turns the NumPy scalar into a Python float so
        # the divided components are plain floats as well.
        norm = float(np.sqrt(sum(v * v for v in embedding)))
        if norm == 0.0:
            # Nothing to scale; avoid a division by zero.
            return embedding
        return [v / norm for v in embedding]

    def analyze(self, audio_bytes: bytes) -> dict:
        """Produce a mock synthetic-voice verdict for ``audio_bytes``.

        The confidence mixes a statistic of the first 1024 bytes (weight
        0.3) with a random draw (weight 0.7), so repeated calls on the same
        audio may return different verdicts.

        Args:
            audio_bytes: Raw audio payload. Empty input is accepted and is
                treated as having a mean amplitude of 0.0.

        Returns:
            A dict with keys ``is_synthetic`` (bool), ``confidence`` (float,
            capped at 1.0), ``detection_type`` (``"synthetic_voice"`` or
            ``"natural"``), ``features`` (dict of floats) and ``embedding``.
        """
        embedding = self.extract_embedding(audio_bytes)

        # Mock: mean of the leading bytes read as unsigned 8-bit values,
        # scaled by 255. An empty buffer would make the mean NaN, so it is
        # handled explicitly.
        window = np.frombuffer(audio_bytes[:1024], dtype=np.uint8)
        mean_amplitude = float(np.mean(window)) / 255.0 if window.size else 0.0

        noise = float(np.random.random())
        confidence = min(1.0, abs(mean_amplitude - 0.5) * 2 * 0.3 + noise * 0.7)
        # Plain-float comparison yields a real bool rather than a NumPy boolean.
        is_synthetic = confidence >= 0.75
        detection_type = "synthetic_voice" if is_synthetic else "natural"
        return {
            "is_synthetic": is_synthetic,
            "confidence": float(confidence),
            "detection_type": detection_type,
            "features": {
                "mean_amplitude": float(mean_amplitude),
                "embedding_energy": float(sum(v * v for v in embedding)),
            },
            "embedding": embedding,
        }
# Module-level model instance shared by all endpoints. It is constructed at
# import time but initialize() is not called here, so `initialized` starts False.
model = MockECAPATDNN()
@app.get("/health")
async def health():
    # Status probe: a fixed status/model label plus the shared model's
    # current `initialized` flag.
    report = {"status": "ok", "model": "ecapa-tdnn-v1-mock"}
    report["initialized"] = model.initialized
    return report
@app.post("/initialize")
async def initialize():
    # Initialise the shared model on request, then acknowledge with a fixed
    # status/model payload.
    model.initialize()
    acknowledgement = dict(status="initialized", model="ecapa-tdnn-v1-mock")
    return acknowledgement
@app.post("/preprocess")
async def preprocess(audio: UploadFile = File(...)):
    """Preprocess audio: VAD, noise reduction, normalization to 16kHz mono."""
    payload = await audio.read()
    # TODO: Integrate with librosa/torchaudio for real preprocessing, e.g.
    # audio_array, sr = librosa.load(io.BytesIO(payload), sr=SAMPLE_RATE, mono=CHANNELS)

    # No processing happens yet: the reply echoes the configured format and
    # derives the duration from the upload size at 2 bytes per sample.
    bytes_per_second = SAMPLE_RATE * 2 * CHANNELS
    summary = {
        "status": "processed",
        "sample_rate": SAMPLE_RATE,
        "channels": CHANNELS,
        "duration": len(payload) / bytes_per_second,
    }
    return summary
@app.post("/embed", response_model=EmbeddingResponse)
async def extract_embedding(audio: UploadFile = File(...)):
    """Extract voice embedding using ECAPA-TDNN."""
    payload = await audio.read()
    size = len(payload)

    # Reject uploads smaller than SAMPLE_RATE * 2 bytes with a 422.
    too_short = size < SAMPLE_RATE * 2
    if too_short:
        raise HTTPException(
            status_code=422,
            detail=f"Audio too short: minimum {SAMPLE_RATE * 2} bytes (1 second at 16kHz)",
        )

    # The embedding is computed first, then the duration from the byte count.
    return EmbeddingResponse(
        embedding=model.extract_embedding(payload),
        duration=size / (SAMPLE_RATE * 2 * CHANNELS),
        sample_rate=SAMPLE_RATE,
    )
@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_audio(audio: UploadFile = File(...)):
    """Analyze audio for synthetic voice detection."""
    payload = await audio.read()

    # Reject uploads smaller than SAMPLE_RATE * 2 * 3 bytes with a 422.
    long_enough = len(payload) >= SAMPLE_RATE * 2 * 3
    if not long_enough:
        raise HTTPException(
            status_code=422,
            detail=f"Audio too short: minimum {SAMPLE_RATE * 2 * 3} bytes (3 seconds at 16kHz)",
        )

    # The model returns a dict whose keys match the response model's fields.
    verdict = model.analyze(payload)
    return AnalysisResponse(**verdict)
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8001.
    import uvicorn

    uvicorn.run(app, port=8001, host="0.0.0.0")