import { prisma } from '@shieldai/db';
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
import { reportService } from '@shieldai/report';

/**
 * Reads a positive integer from an environment variable.
 *
 * Falls back to `fallback` when the variable is unset, empty, not parseable
 * as a base-10 integer, or smaller than 1, so that a typo in the environment
 * can never hand `NaN` or `0` to the Redis client or to BullMQ.
 *
 * @param name - Name of the environment variable to read.
 * @param fallback - Value used when the variable is missing or invalid.
 * @returns The parsed integer, or `fallback`.
 */
function readPositiveIntEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (!raw) {
    return fallback;
  }
  const parsed = Number.parseInt(raw, 10);
  return Number.isFinite(parsed) && parsed >= 1 ? parsed : fallback;
}

const redisHost = process.env.REDIS_HOST || 'localhost';
const redisPort = readPositiveIntEnv('REDIS_PORT', 6379);

/** Redis connection handed to every queue and worker created in this module. */
const connection = new Redis({
  host: redisHost,
  port: redisPort,
  // BullMQ documents that connections used by Workers must disable the
  // per-request retry limit; otherwise it complains at Worker construction.
  maxRetriesPerRequest: null,
  // Linear reconnect back-off: 50ms per attempt, capped at 2s.
  retryStrategy: (times: number) => Math.min(times * 50, 2000),
});

/** Queue names and tunables; numeric values can be overridden via env vars. */
const QUEUE_CONFIG = {
  reportGeneration: {
    name: 'report-generation',
    concurrency: readPositiveIntEnv('REPORT_CONCURRENCY', 3),
    // NOTE(review): not referenced anywhere in the visible part of this file.
    defaultJobTimeout: readPositiveIntEnv('REPORT_JOB_TIMEOUT', 30000),
    maxAttempts: readPositiveIntEnv('REPORT_MAX_ATTEMPTS', 2),
  },
  reportScheduler: {
    name: 'report-scheduler',
    concurrency: 1,
  },
};

/** Queue that carries individual report-generation jobs. */
export const reportGenerationQueue = new Queue(
  QUEUE_CONFIG.reportGeneration.name,
  { connection }
);

/** Queue that carries the recurring "look for due reports" jobs. */
export const reportSchedulerQueue = new Queue(
  QUEUE_CONFIG.reportScheduler.name,
  { connection }
);

/** Payload of a job on the report-generation queue. */
interface ReportGenerationJobData {
  reportId: string;
  userId: string;
  subscriptionId: string;
  reportType: string;
  /** ISO date string; converted to a `Date` before generation. */
  periodStart?: string;
  /** ISO date string; converted to a `Date` before generation. */
  periodEnd?: string;
  /** When set, a "report ready" email is sent to this address on success. */
  notifyEmail?: string;
}

/**
 * Publishes a progress value for `job` without ever failing the job.
 *
 * `job.updateProgress` returns a promise; leaving it un-awaited turns a Redis
 * hiccup into an unhandled rejection. Progress is only telemetry, so failures
 * are logged and swallowed here.
 */
async function reportProgress(job: Job, progress: number): Promise<void> {
  try {
    await job.updateProgress(progress);
  } catch (err) {
    console.warn(`[Report:Generate] Could not update progress for job ${job.id}:`, err);
  }
}

/**
 * Worker processor for the report-generation queue.
 *
 * Generates the report through `reportService`, optionally emails the
 * recipient and marks the report `DELIVERED`, and on any failure marks the
 * report `FAILED` before re-throwing so BullMQ can apply its retry policy.
 *
 * @param job - Job whose `data` describes the report to generate.
 * @returns The generated report's status, id and title.
 * @throws The original error (or an `Error` wrapping a non-Error throwable)
 *   when generation, notification or the delivery update fails.
 */
async function processReportGeneration(job: Job<ReportGenerationJobData>) {
  const { reportId, userId, subscriptionId, reportType, periodStart, periodEnd, notifyEmail } = job.data;

  await reportProgress(job, 10);
  console.log(`[Report:Generate] Starting report ${reportId} for user ${userId}`);

  try {
    const report = await reportService.generateReport({
      userId,
      subscriptionId,
      reportType,
      periodStart: periodStart ? new Date(periodStart) : undefined,
      periodEnd: periodEnd ? new Date(periodEnd) : undefined,
    });

    await reportProgress(job, 80);

    // NOTE(review): a failure anywhere in this notification branch lands in
    // the catch below and marks an already-generated report as FAILED —
    // confirm that is the intended semantics.
    if (notifyEmail && report.status === 'COMPLETED') {
      // Loaded lazily so the notifications package is only pulled in when needed.
      const { EmailService } = await import('@shieldai/shared-notifications');
      const emailService = EmailService.getInstance();
      const dashboardUrl = process.env.DASHBOARD_URL || 'https://app.shieldai.com';

      const user = await prisma.user.findUnique({
        where: { id: userId },
        select: { name: true, email: true },
      });

      // Fall back to the local part of the address when no display name exists.
      const userName = user?.name || notifyEmail.split('@')[0];

      await emailService.sendWithTemplate(notifyEmail, {
        templateId: 'report_ready',
        variables: {
          name: userName,
          report_title: report.title,
          report_summary: report.summary || 'Your protection report contains detailed statistics and recommendations.',
          report_url: `${dashboardUrl}/reports/${report.id}`,
          pdf_url: report.pdfUrl || `${dashboardUrl}/api/v1/reports/${report.id}/pdf`,
        },
      });

      await prisma.securityReport.update({
        where: { id: report.id },
        data: {
          status: 'DELIVERED',
          deliveredAt: new Date(),
        },
      });

      await reportProgress(job, 95);
    }

    await reportProgress(job, 100);

    return {
      status: report.status,
      reportId: report.id,
      title: report.title,
    };
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Report generation failed';
    console.error(`[Report:Generate] Job ${job.id} failed:`, message);

    // Best-effort bookkeeping: a failure here must not replace the original
    // error, which is what BullMQ should record and retry on.
    // NOTE(review): this keys on job.data.reportId while the success path
    // keys on report.id — presumably the same record; verify.
    try {
      await prisma.securityReport.update({
        where: { id: reportId },
        data: {
          status: 'FAILED',
          error: message,
        },
      });
    } catch (updateError) {
      console.error(`[Report:Generate] Could not mark report ${reportId} as FAILED:`, updateError);
    }

    await reportProgress(job, 100);

    // Re-throw the original error to keep its stack and class intact.
    throw error instanceof Error ? error : new Error(message);
  }
}

/**
 * Worker processor for the report-scheduler queue.
 *
 * Finds every `PENDING` security report whose `scheduledFor` is now or in the
 * past and enqueues one generation job per report. The job id is derived from
 * the report id alone. The incoming job's name and payload are not inspected,
 * so every job on this queue performs the same check.
 *
 * @param _job - The scheduler job (unused).
 * @returns Number of reports examined and the completion timestamp.
 * @throws Re-throws any error raised while querying pending reports.
 */
async function processReportScheduler(_job: Job) {
  console.log('[Report:Scheduler] Running scheduled report check');

  try {
    const pendingReports = await prisma.securityReport.findMany({
      where: {
        status: 'PENDING',
        scheduledFor: {
          lte: new Date(),
        },
      },
      include: {
        user: { select: { email: true } },
      },
    });

    const results: Array<{ reportId: string; queued: boolean }> = [];

    for (const report of pendingReports) {
      // One failing enqueue must not prevent the remaining reports from being queued.
      try {
        await reportGenerationQueue.add(
          'generate-report',
          {
            reportId: report.id,
            userId: report.userId,
            subscriptionId: report.subscriptionId,
            reportType: report.reportType,
            periodStart: report.periodStart.toISOString(),
            periodEnd: report.periodEnd.toISOString(),
            notifyEmail: report.user?.email,
          },
          {
            attempts: QUEUE_CONFIG.reportGeneration.maxAttempts,
            backoff: { type: 'exponential', delay: 5000 },
            jobId: `report-gen-${report.id}`,
          }
        );
        results.push({ reportId: report.id, queued: true });
      } catch (err) {
        console.error(`[Report:Scheduler] Failed to queue report ${report.id}:`, err);
        results.push({ reportId: report.id, queued: false });
      }
    }

    // `processed` counts every report examined, whether or not queueing succeeded.
    return { processed: results.length, completedAt: new Date().toISOString() };
  } catch (error) {
    console.error('[Report:Scheduler] Error:', error);
    throw error;
  }
}

/** Worker that executes report-generation jobs. */
export const reportGenerationWorker = new Worker(
  QUEUE_CONFIG.reportGeneration.name,
  processReportGeneration,
  {
    connection,
    concurrency: QUEUE_CONFIG.reportGeneration.concurrency,
    // Retention: completed jobs for 7 days (max 500), failed for 30 days (max 100).
    removeOnComplete: {
      age: 7 * 24 * 60 * 60,
      count: 500,
    },
    removeOnFail: {
      age: 30 * 24 * 60 * 60,
      count: 100,
    },
  }
);

/** Worker that executes scheduler jobs, one at a time. */
export const reportSchedulerWorker = new Worker(
  QUEUE_CONFIG.reportScheduler.name,
  processReportScheduler,
  {
    connection,
    concurrency: QUEUE_CONFIG.reportScheduler.concurrency,
  }
);
// ---------------------------------------------------------------------------
// Worker lifecycle logging
// ---------------------------------------------------------------------------

reportGenerationWorker.on('completed', (job, result) => {
  console.log(`[Report:Generate] Job ${job.id} completed:`, result);
});

reportGenerationWorker.on('failed', (job, err) => {
  // `job` can be undefined here, hence the optional chaining.
  console.error(`[Report:Generate] Job ${job?.id} failed:`, err.message);
});

reportGenerationWorker.on('error', (err) => {
  console.error('[Report:Generate] Worker error:', err.message);
});

reportSchedulerWorker.on('completed', (job, result) => {
  console.log(`[Report:Scheduler] Job ${job.id} completed:`, result);
});

reportSchedulerWorker.on('failed', (job, err) => {
  console.error(`[Report:Scheduler] Job ${job?.id} failed:`, err.message);
});

// An 'error' event with no listener is thrown by Node's EventEmitter, so the
// scheduler worker gets the same handler the generation worker already has.
reportSchedulerWorker.on('error', (err) => {
  console.error('[Report:Scheduler] Worker error:', err.message);
});

/**
 * Enqueues a single report-generation job.
 *
 * The job id embeds the current timestamp, so every call creates a distinct
 * job even when the same `reportId` is queued repeatedly.
 *
 * @param data - Report to generate; `periodStart` / `periodEnd` are date
 *   strings and `notifyEmail`, when present, receives the "ready" email.
 * @returns The job created on the report-generation queue.
 */
export async function queueReportGeneration(data: {
  reportId: string;
  userId: string;
  subscriptionId: string;
  reportType: string;
  periodStart?: string;
  periodEnd?: string;
  notifyEmail?: string;
}) {
  return reportGenerationQueue.add('generate-report', data, {
    attempts: QUEUE_CONFIG.reportGeneration.maxAttempts,
    backoff: { type: 'exponential', delay: 5000 },
    jobId: `report-gen-${data.reportId}-${Date.now()}`,
  });
}

/**
 * Registers the repeatable "check pending reports" job on the scheduler
 * queue. The cron pattern fires at minute 0 of every sixth hour.
 *
 * @returns The job returned by the scheduler queue.
 */
export async function scheduleReportProcessor() {
  return reportSchedulerQueue.add(
    'check-pending-reports',
    {},
    {
      repeat: { pattern: '0 */6 * * *' },
      jobId: 'report-scheduler-recurring',
    }
  );
}

/**
 * Registers the repeatable monthly trigger on the scheduler queue. The cron
 * pattern fires at 00:00 on the first day of every month.
 *
 * @returns The job returned by the scheduler queue.
 */
export async function scheduleMonthlyReportTrigger() {
  return reportSchedulerQueue.add(
    'trigger-monthly-reports',
    {},
    {
      repeat: { pattern: '0 0 1 * *' },
      jobId: 'monthly-report-trigger',
    }
  );
}

/**
 * Registers the repeatable annual trigger on the scheduler queue. The cron
 * pattern fires at 00:00 on the first of January.
 *
 * @returns The job returned by the scheduler queue.
 */
export async function scheduleAnnualReportTrigger() {
  return reportSchedulerQueue.add(
    'trigger-annual-reports',
    {},
    {
      repeat: { pattern: '0 0 1 1 *' },
      jobId: 'annual-report-trigger',
    }
  );
}

/** Convenience bundle of the queues and workers defined in this module. */
export default {
  reportGenerationQueue,
  reportGenerationWorker,
  reportSchedulerQueue,
  reportSchedulerWorker,
};