import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";
import { ScanService, ScanScheduler, WebhookHandler } from "@shieldai/darkwatch";
import { AlertPipeline } from "@shieldai/darkwatch";

const redisUrl = process.env.REDIS_URL || "redis://localhost:6379";

// BullMQ requires `maxRetriesPerRequest: null` on connections used by Workers
// (they issue blocking commands). This single connection is shared by every
// queue and worker created in this module.
const connection = new Redis(redisUrl, { maxRetriesPerRequest: null });

const scanQueue = new Queue("darkwatch-scans", { connection });
const alertQueue = new Queue("darkwatch-alerts", { connection });
const scheduleQueue = new Queue("darkwatch-scheduler", { connection });
// Created once here, like the other queues, instead of on every call to
// scheduleWebhookProcessor().
const webhookQueue = new Queue("darkwatch-webhooks", { connection });

/**
 * Registers the shared lifecycle logging on a worker.
 *
 * Logs `completed` and `failed` jobs under the given label, and also listens
 * for the worker-level `error` event so that an emitted error is logged
 * rather than surfacing as an unhandled EventEmitter exception.
 *
 * @param worker - Worker to attach the listeners to.
 * @param label - Tag placed in square brackets at the start of each log line.
 */
function attachWorkerLogging(worker: Worker, label: string): void {
  worker.on("completed", (job) => {
    console.log(`[${label}] Job ${job?.id} completed: ${JSON.stringify(job?.returnvalue)}`);
  });
  worker.on("failed", (job, err) => {
    console.error(`[${label}] Job ${job?.id} failed: ${err.message}`);
  });
  worker.on("error", (err) => {
    console.error(`[${label}] Worker error: ${err.message}`);
  });
}

// Runs a scan for the user (and optional source) carried in the job payload.
const scanWorker = new Worker(
  "darkwatch-scans",
  async (job) => {
    const { userId, source } = job.data;
    const scanService = new ScanService();
    const resultCount = await scanService.runScan(userId, source);
    return { resultCount, completedAt: new Date().toISOString() };
  },
  { connection, concurrency: 3 }
);

// Sends whatever alerts the pipeline reports as pending; the job payload is unused.
const alertWorker = new Worker(
  "darkwatch-alerts",
  async () => {
    const pipeline = new AlertPipeline();
    const sent = await pipeline.sendPendingAlerts();
    return { sent, processedAt: new Date().toISOString() };
  },
  { connection, concurrency: 1 }
);

// Looks up due schedules and enqueues one scan job per schedule.
const scheduleWorker = new Worker(
  "darkwatch-scheduler",
  async () => {
    const scheduler = new ScanScheduler();
    const dueSchedules = await scheduler.getDueSchedules();
    const results: Array<{ userId: string; queued: boolean }> = [];

    for (const schedule of dueSchedules) {
      try {
        await scanQueue.add(
          "scheduled-scan",
          {
            userId: schedule.userId,
            source: undefined,
          },
          {
            attempts: 3,
            backoff: { type: "exponential", delay: 5000 },
            jobId: `scheduled-scan-${schedule.userId}-${Date.now()}`,
          }
        );
        // Only mark the schedule as scanned once the job was queued successfully.
        await scheduler.markScanned(schedule.userId);
        results.push({ userId: schedule.userId, queued: true });
      } catch (err) {
        // A failure for one schedule is logged and recorded; the loop continues
        // with the remaining schedules.
        console.error(`[Scheduler] Failed to queue scan for ${schedule.userId}:`, err);
        results.push({ userId: schedule.userId, queued: false });
      }
    }

    return { processed: results.length, completedAt: new Date().toISOString() };
  },
  { connection, concurrency: 1 }
);

// Processes pending webhook events; the job payload is unused.
const webhookWorker = new Worker(
  "darkwatch-webhooks",
  async () => {
    const handler = new WebhookHandler();
    const processed = await handler.processPendingEvents();
    return { processed, completedAt: new Date().toISOString() };
  },
  { connection, concurrency: 1 }
);

attachWorkerLogging(scanWorker, "Scan");
attachWorkerLogging(alertWorker, "Alert");
attachWorkerLogging(scheduleWorker, "Scheduler");
attachWorkerLogging(webhookWorker, "Webhook");

/**
 * Enqueues a one-off scan job on the `darkwatch-scans` queue.
 *
 * The job is configured with 3 attempts and exponential backoff (base delay
 * 5000 ms). The job id embeds `Date.now()`, so each call produces a new id.
 *
 * @param userId - User the scan is run for.
 * @param source - Optional source forwarded to the scan worker.
 * @returns The promise returned by `Queue.add`.
 */
export async function addScanJob(userId: string, source?: string) {
  return scanQueue.add(
    "scan",
    { userId, source },
    {
      attempts: 3,
      backoff: { type: "exponential", delay: 5000 },
      jobId: `scan-${userId}-${Date.now()}`,
    }
  );
}

/**
 * Registers the repeating `process-alerts` job (cron pattern `*\/5 * * * *`)
 * on the `darkwatch-alerts` queue.
 *
 * @returns The promise returned by `Queue.add`.
 */
export async function scheduleAlertProcessing() {
  return alertQueue.add(
    "process-alerts",
    {},
    {
      repeat: { pattern: "*/5 * * * *" },
      jobId: "alert-processor-recurring",
    }
  );
}

/**
 * Registers the repeating `check-due-scans` job (cron pattern `*\/10 * * * *`)
 * on the `darkwatch-scheduler` queue.
 *
 * @returns The promise returned by `Queue.add`.
 */
export async function schedulePeriodicScanCheck() {
  return scheduleQueue.add(
    "check-due-scans",
    {},
    {
      repeat: { pattern: "*/10 * * * *" },
      jobId: "scheduler-recurring",
    }
  );
}

/**
 * Registers the repeating `process-pending-webhooks` job (cron pattern
 * `*\/2 * * * *`) on the `darkwatch-webhooks` queue.
 *
 * @returns The promise returned by `Queue.add`.
 */
export async function scheduleWebhookProcessor() {
  return webhookQueue.add(
    "process-pending-webhooks",
    {},
    {
      repeat: { pattern: "*/2 * * * *" },
      jobId: "webhook-processor-recurring",
    }
  );
}

// Waitlist email worker
import { EmailService } from '@shieldai/shared-notifications';

// Maps a waitlist job name to the email template id it should be sent with.
const WAITLIST_TEMPLATE_IDS: Record<string, string> = {
  'send-waitlist-intro': 'waitlist_intro',
  'send-waitlist-features': 'waitlist_features',
  'send-waitlist-launch-teaser': 'waitlist_launch_teaser',
};

const waitlistEmailWorker = new Worker(
  "waitlist-emails",
  async (job) => {
    const { email, name, entryId } = job.data;

    const templateId = WAITLIST_TEMPLATE_IDS[job.name];
    if (!templateId) {
      throw new Error(`Unknown waitlist email job: ${job.name}`);
    }

    const emailService = EmailService.getInstance();
    const result = await emailService.sendWithTemplate(email, {
      templateId,
      variables: { name, entryId },
    });

    // Throw on a reported send failure so the job is marked as failed
    // instead of completing.
    if (result.status === 'failed') {
      throw new Error(`Failed to send ${templateId} to ${email}: ${result.error}`);
    }

    return { templateId, email, deliveredAt: result.deliveredAt };
  },
  { connection, concurrency: 5 }
);

waitlistEmailWorker.on("completed", (job) => {
  console.log(`[WaitlistEmail] Job ${job?.id} (${job?.name}) completed for ${job?.data?.email}`);
});
waitlistEmailWorker.on("failed", (job, err) => {
  console.error(`[WaitlistEmail] Job ${job?.id} (${job?.name}) failed: ${err.message}`);
});
waitlistEmailWorker.on("error", (err) => {
  console.error(`[WaitlistEmail] Worker error: ${err.message}`);
});

console.log("Job workers started");

// Report generation workers
import {
  reportGenerationWorker,
  reportSchedulerWorker,
  scheduleReportProcessor,
  scheduleMonthlyReportTrigger,
  scheduleAnnualReportTrigger,
} from './report.jobs';

// Fire-and-forget registration of the report schedules; failures are logged.
scheduleReportProcessor().catch(console.error);
scheduleMonthlyReportTrigger().catch(console.error);
scheduleAnnualReportTrigger().catch(console.error);