FRE-4473: Add VoicePrint job workers and Python ML service

- packages/jobs/: BullMQ-based async job queue for audio analysis
  with concurrency control and retry logic
- services/voiceprint-ml/: FastAPI microservice for ECAPA-TDNN
  inference with mock model, preprocessing, embedding extraction,
  and synthetic voice detection endpoints
- Includes Dockerfile and requirements.txt for ML service

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-29 17:18:27 -04:00
parent 0495ee5bd2
commit 29303799ed
6 changed files with 315 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
{
"name": "@shieldsai/jobs",
"version": "0.1.0",
"private": true,
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
"scripts": {
"start": "tsx src/index.ts",
"dev": "tsx watch src/index.ts"
},
"dependencies": {
"@shieldsai/shared-db": "*",
"@shieldsai/shared-utils": "*",
"bullmq": "^5.1.0",
"ioredis": "^5.3.0",
"zod": "^4.3.6"
},
"devDependencies": {
"tsx": "^4.7.1",
"typescript": "^5.3.3"
}
}

View File

@@ -0,0 +1,4 @@
export {
voiceprintAnalysisQueue,
voiceprintAnalysisWorker,
} from './voiceprint.jobs';

View File

@@ -0,0 +1,93 @@
import { prisma } from '@shieldsai/shared-db';
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
// Redis connection
const redisHost = process.env.REDIS_HOST || 'localhost';
const redisPort = parseInt(process.env.REDIS_PORT || '6379', 10);
const connection = new Redis({
host: redisHost,
port: redisPort,
retryStrategy: (times: number) => Math.min(times * 50, 2000),
});
// Queue configuration
const QUEUE_CONFIG = {
voiceprintAnalysis: {
name: 'voiceprint-analysis',
concurrency: parseInt(process.env.VOICEPRINT_CONCURRENCY || '3', 10),
defaultJobTimeout: parseInt(process.env.VOICEPRINT_JOB_TIMEOUT || '30000', 10),
maxAttempts: parseInt(process.env.VOICEPRINT_MAX_ATTEMPTS || '3', 10),
},
};
// Create queues
export const voiceprintAnalysisQueue = new Queue(
QUEUE_CONFIG.voiceprintAnalysis.name,
{ connection }
);
// VoicePrint analysis job processor
async function processVoiceprintAnalysis(job: Job<{
userId: string;
audioBuffer: Buffer;
enrollmentId?: string;
audioUrl?: string;
}>) {
const { userId, audioBuffer, enrollmentId, audioUrl } = job.data;
// Import analysis service dynamically to avoid circular dependencies
const { analysisService } = await import(
'../../../apps/api/src/services/voiceprint'
);
try {
const result = await analysisService.analyze(userId, audioBuffer, {
enrollmentId,
audioUrl,
});
return {
analysisId: result.id,
isSynthetic: result.isSynthetic,
confidence: result.confidence,
};
} catch (error) {
const message = error instanceof Error ? error.message : 'Analysis failed';
job.updateProgress(100);
throw new Error(message);
}
}
// Create worker
export const voiceprintAnalysisWorker = new Worker(
QUEUE_CONFIG.voiceprintAnalysis.name,
processVoiceprintAnalysis,
{
connection,
concurrency: QUEUE_CONFIG.voiceprintAnalysis.concurrency,
limiter: {
max: 10,
duration: 1000,
},
}
);
// Add event handlers
voiceprintAnalysisWorker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed:`, result);
});
voiceprintAnalysisWorker.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
});
voiceprintAnalysisWorker.on('error', (err) => {
console.error('Worker error:', err.message);
});
export default {
voiceprintAnalysisQueue,
voiceprintAnalysisWorker,
};