diff --git a/packages/jobs/src/voiceprint.jobs.ts b/packages/jobs/src/voiceprint.jobs.ts index 95dc98d..9398a63 100644 --- a/packages/jobs/src/voiceprint.jobs.ts +++ b/packages/jobs/src/voiceprint.jobs.ts @@ -1,37 +1,75 @@ -import { Queue, Worker } from "bullmq"; +import { Queue, Worker, Job } from "bullmq"; import { Redis } from "ioredis"; -import { AnalysisService } from "@shieldai/voiceprint"; const redisUrl = process.env.REDIS_URL || "redis://localhost:6379"; -const connection = new Redis(redisUrl); +const { host, port } = new URL(redisUrl); +const connection = new Redis({ + host, + port: parseInt(port, 10), + retryStrategy: (times: number) => { + const maxAttempts = parseInt(process.env.VOICEPRINT_MAX_RETRIES || "5", 10); + const delay = Math.min(times * 1000, 5000); + return times < maxAttempts ? delay : null; + }, +}); const analysisQueue = new Queue("voiceprint-analysis", { connection }); -const analysisWorker = new Worker( - "voiceprint-analysis", - async (job) => { - const { userId, audioBuffer, sampleRate, analysisType } = job.data; - const analysisService = new AnalysisService(); - const result = await analysisService.analyze( - { - audioBuffer: Buffer.from(audioBuffer, "base64"), - sampleRate, - analysisType, +const VOICEPRINT_CONFIG = { + concurrency: parseInt(process.env.VOICEPRINT_CONCURRENCY || "2", 10), + maxAttempts: parseInt(process.env.VOICEPRINT_MAX_ATTEMPTS || "3", 10), + defaultBackoffDelay: parseInt(process.env.VOICEPRINT_BACKOFF_DELAY || "5000", 10), +}; + +export function createAnalysisWorker(): Worker { + const analysisWorker = new Worker( + "voiceprint-analysis", + async (job: Job) => { + const { userId, audioBuffer, sampleRate, analysisType } = job.data; + + // Import AnalysisService within job handler to avoid circular deps + const { analysisService: { analyze } } = await import("@shieldai/voiceprint"); + + const decodedAudio = Buffer.from(audioBuffer, "base64"); + const result = await analyze(userId, decodedAudio, { + enrollmentId: undefined, + audioUrl: undefined, + }); + + return { jobId: result.id, completedAt: new Date().toISOString() }; + }, + { + connection, + concurrency: VOICEPRINT_CONFIG.concurrency, + removeOnComplete: { + age: 7 * 24 * 60 * 60 * 1000, // 7 days + count: 1000, }, - userId - ); - return { jobId: result.jobId, completedAt: new Date().toISOString() }; - }, - { connection, concurrency: 2 } -); + removeOnFail: { + age: 30 * 24 * 60 * 60 * 1000, // 30 days + count: 500, + }, + } + ); -analysisWorker.on("completed", (job) => { - console.log(`[VoicePrint] Job ${job.id} completed: ${JSON.stringify(job.returnvalue)}`); -}); + analysisWorker.on("completed", (job: Job | undefined) => { + if (job) { + console.log(`[VoicePrint] Job ${job.id} completed: ${JSON.stringify(job.returnvalue)}`); + } + }); -analysisWorker.on("failed", (job, err) => { - console.error(`[VoicePrint] Job ${job.id} failed: ${err.message}`); -}); + analysisWorker.on("failed", (job: Job | undefined, err: Error) => { + if (job) { + console.error(`[VoicePrint] Job ${job.id} failed: ${err.message}`); + } + }); + + analysisWorker.on("error", (err: Error) => { + console.error("[VoicePrint] Worker error:", err.message); + }); + + return analysisWorker; +} export async function addAnalysisJob( userId: string, @@ -51,4 +89,3 @@ export async function addAnalysisJob( }); } -console.log("[VoicePrint] Analysis worker started");