import { Queue, Worker, Job } from "bullmq";
import { Redis } from "ioredis";

/** Name of the BullMQ queue shared by the producer and the worker. */
const QUEUE_NAME = "voiceprint-analysis";

/** BullMQ expresses job-retention ages in seconds, not milliseconds. */
const SECONDS_PER_DAY = 24 * 60 * 60;

/** Payload stored on every job placed on the analysis queue. */
interface AnalysisJobData {
  userId: string;
  /** Audio bytes, base64-encoded so they survive JSON serialisation. */
  audioBuffer: string;
  sampleRate?: number;
  analysisType?: string;
}

/**
 * Reads an integer from the environment.
 *
 * @param name - Environment variable to read.
 * @param fallback - Value used when the variable is unset, empty, or not an integer.
 * @returns The parsed integer, or `fallback`.
 */
function readIntEnv(name: string, fallback: number): number {
  const parsed = Number.parseInt(process.env[name] ?? "", 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

const redisUrl = process.env.REDIS_URL || "redis://localhost:6379";

// Parsed once at load time instead of on every reconnect attempt.
const maxConnectionRetries = readIntEnv("VOICEPRINT_MAX_RETRIES", 5);

// The URL is handed to ioredis directly rather than being split by hand:
// `URL.host` includes the port ("localhost:6379"), and manual splitting also
// discards credentials, the db index, and `rediss://` TLS.
const connection = new Redis(redisUrl, {
  // BullMQ requires this to be null on connections used by a Worker.
  maxRetriesPerRequest: null,
  // Linear back-off capped at 5s; returning null stops further reconnects.
  retryStrategy: (times: number) => {
    const delay = Math.min(times * 1000, 5000);
    return times < maxConnectionRetries ? delay : null;
  },
});

const analysisQueue = new Queue(QUEUE_NAME, { connection });

/** Tunables for the worker and for jobs it processes, overridable via env. */
const VOICEPRINT_CONFIG = {
  concurrency: readIntEnv("VOICEPRINT_CONCURRENCY", 2),
  maxAttempts: readIntEnv("VOICEPRINT_MAX_ATTEMPTS", 3),
  defaultBackoffDelay: readIntEnv("VOICEPRINT_BACKOFF_DELAY", 5000),
};

/**
 * Creates the worker that consumes the voiceprint analysis queue.
 *
 * Each job's base64 audio is decoded and passed to the analysis service.
 * Completed jobs are retained for 7 days (max 1000) and failed jobs for
 * 30 days (max 500). Completion, failure, and worker errors are logged.
 *
 * @returns The started BullMQ worker.
 */
export function createAnalysisWorker(): Worker {
  const analysisWorker = new Worker<AnalysisJobData>(
    QUEUE_NAME,
    async (job: Job<AnalysisJobData>) => {
      // NOTE(review): sampleRate and analysisType travel in the payload but
      // are not forwarded to the analysis service — confirm that is intended.
      const { userId, audioBuffer } = job.data;

      if (typeof audioBuffer !== "string") {
        throw new Error(
          `[VoicePrint] Job ${job.id} has no base64 audioBuffer payload`
        );
      }

      // Import the analysis service within the job handler to avoid circular deps.
      const { analysisService } = await import("@shieldai/voiceprint");

      const decodedAudio = Buffer.from(audioBuffer, "base64");

      // Called on the service object (not destructured) so `this` stays bound
      // in case `analyze` is an instance method.
      const result = await analysisService.analyze(userId, decodedAudio, {
        enrollmentId: undefined,
        audioUrl: undefined,
      });

      return { jobId: result.id, completedAt: new Date().toISOString() };
    },
    {
      connection,
      concurrency: VOICEPRINT_CONFIG.concurrency,
      removeOnComplete: {
        age: 7 * SECONDS_PER_DAY, // 7 days
        count: 1000,
      },
      removeOnFail: {
        age: 30 * SECONDS_PER_DAY, // 30 days
        count: 500,
      },
    }
  );

  analysisWorker.on("completed", (job: Job | undefined) => {
    if (job) {
      console.log(
        `[VoicePrint] Job ${job.id} completed: ${JSON.stringify(job.returnvalue)}`
      );
    }
  });

  analysisWorker.on("failed", (job: Job | undefined, err: Error) => {
    // The job may be undefined here; still surface the error in that case.
    console.error(
      `[VoicePrint] Job ${job?.id ?? "unknown"} failed: ${err.message}`
    );
  });

  analysisWorker.on("error", (err: Error) => {
    console.error("[VoicePrint] Worker error:", err.message);
  });

  return analysisWorker;
}

/**
 * Enqueues a voiceprint analysis job.
 *
 * The audio is base64-encoded for transport. Retry count and exponential
 * back-off delay come from `VOICEPRINT_CONFIG` (defaults: 3 attempts, 5000 ms).
 *
 * @param userId - User the audio belongs to; also embedded in the job id.
 * @param audioBuffer - Raw audio bytes.
 * @param sampleRate - Optional sample rate stored on the job payload.
 * @param analysisType - Optional analysis type stored on the job payload.
 * @returns The job as returned by the queue.
 */
export async function addAnalysisJob(
  userId: string,
  audioBuffer: Buffer,
  sampleRate?: number,
  analysisType?: string
): Promise<Job> {
  const payload: AnalysisJobData = {
    userId,
    audioBuffer: audioBuffer.toString("base64"),
    sampleRate,
    analysisType,
  };

  return analysisQueue.add("analyze", payload, {
    attempts: VOICEPRINT_CONFIG.maxAttempts,
    backoff: {
      type: "exponential",
      delay: VOICEPRINT_CONFIG.defaultBackoffDelay,
    },
    // NOTE(review): two calls for the same user within one millisecond yield
    // the same id — confirm how the queue treats duplicate ids.
    jobId: `vp-${userId}-${Date.now()}`,
  });
}