From eb8e57c674d22f17a852b887023d75cc0758d894 Mon Sep 17 00:00:00 2001 From: Michael Freno Date: Mon, 25 May 2026 17:16:21 -0400 Subject: [PATCH] feat: implement background job system with queue, worker, scheduler, and handlers - Add job queue abstraction (InMemoryQueue and Redis/BullMQ adapter) - Add polling worker with retry logic and exponential backoff - Add 6 job handlers: darkwatch.scan, voiceprint.batch, hometitle.scan, removebrokers.process, reports.generate, notifications.send - Add cron-based scheduler with tier-appropriate frequencies (Basic/Plus/Premium) - Add tRPC scheduler router for admin (runJobNow, getJobStatus, etc.) - Add entry point with graceful shutdown support - Achieve 100% test pass rate for new job system --- pnpm-lock.yaml | 149 ++++++++++++ web/package.json | 4 + web/src/server/api/root.ts | 2 + web/src/server/api/routers/scheduler.ts | 82 +++++++ web/src/server/api/schemas/scheduler.ts | 19 ++ .../jobs/handlers/darkwatch.scan.test.ts | 65 ++++++ .../server/jobs/handlers/darkwatch.scan.ts | 38 +++ .../server/jobs/handlers/hometitle.scan.ts | 38 +++ web/src/server/jobs/handlers/index.ts | 34 +++ .../jobs/handlers/notifications.send.ts | 70 ++++++ .../jobs/handlers/removebrokers.process.ts | 29 +++ .../server/jobs/handlers/reports.generate.ts | 58 +++++ .../server/jobs/handlers/voiceprint.batch.ts | 64 +++++ web/src/server/jobs/index.ts | 40 ++++ web/src/server/jobs/queue.test.ts | 102 ++++++++ web/src/server/jobs/queue.ts | 220 ++++++++++++++++++ web/src/server/jobs/scheduler.ts | 159 +++++++++++++ web/src/server/jobs/worker.test.ts | 170 ++++++++++++++ web/src/server/jobs/worker.ts | 86 +++++++ 19 files changed, 1429 insertions(+) create mode 100644 web/src/server/api/routers/scheduler.ts create mode 100644 web/src/server/api/schemas/scheduler.ts create mode 100644 web/src/server/jobs/handlers/darkwatch.scan.test.ts create mode 100644 web/src/server/jobs/handlers/darkwatch.scan.ts create mode 100644 web/src/server/jobs/handlers/hometitle.scan.ts create mode 100644 web/src/server/jobs/handlers/index.ts create mode 100644 web/src/server/jobs/handlers/notifications.send.ts create mode 100644 web/src/server/jobs/handlers/removebrokers.process.ts create mode 100644 web/src/server/jobs/handlers/reports.generate.ts create mode 100644 web/src/server/jobs/handlers/voiceprint.batch.ts create mode 100644 web/src/server/jobs/index.ts create mode 100644 web/src/server/jobs/queue.test.ts create mode 100644 web/src/server/jobs/queue.ts create mode 100644 web/src/server/jobs/scheduler.ts create mode 100644 web/src/server/jobs/worker.test.ts create mode 100644 web/src/server/jobs/worker.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 08edce8..a35aab2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -55,15 +55,24 @@ importers: bcryptjs: specifier: ^3.0.3 version: 3.0.3 + bullmq: + specifier: ^5.77.3 + version: 5.77.3 drizzle-orm: specifier: ^0.45.2 version: 0.45.2(@opentelemetry/api@1.9.1)(@types/pg@8.20.0)(pg@8.21.0) firebase-admin: specifier: ^13.10.0 version: 13.10.0 + ioredis: + specifier: ^5.10.1 + version: 5.10.1 jose: specifier: ^5 version: 5.10.0 + node-cron: + specifier: ^4.2.1 + version: 4.2.1 pg: specifier: ^8.21.0 version: 8.21.0 @@ -92,6 +101,9 @@ importers: specifier: ^7.0.0 version: 7.3.3(@types/node@25.9.1)(jiti@2.7.0)(lightningcss@1.32.0)(terser@5.48.0) devDependencies: + '@types/node-cron': + specifier: ^3.0.11 + version: 3.0.11 '@types/pg': specifier: ^8.20.0 version: 8.20.0 @@ -1040,6 +1052,36 @@ packages: engines: {node: '>=18'} hasBin: true + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.4': + resolution: {integrity: sha512-LCkGo6JDfaBhgST7UpPWgNgLINpcpabaHfyz5OBx75nUYxBsaEPxjnyNjWpeb/xBup/682QnBfRBy2/LvPutZQ==} + cpu: [arm64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.4': + resolution: {integrity: sha512-zExlW9zUJKZH/tOtVMttwjKa4Xm/3KcNjnE3dPN92uCktwavMxpgCA3MoJK/DOnTWsQgo224OaST27/mPNAf+w==} + cpu: [x64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.4': + resolution: {integrity: sha512-dgX0P/9wGPJeHFBG+ZmhgE6bmtMt7NP5CRBGyyktpopdk/mW4POnrpQsSLtKI1dwpc+pPLuXHDh6vvskyQE/sw==} + cpu: [arm64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.4': + resolution: {integrity: sha512-Tg3yX65f5GbtXLkrYEHE5oibZG9epyYWas7FogTTEJeDEF9JlXJzKgXaNhT3UXlTOeA+AfZpYZYZ0uPj7Cfquw==} + cpu: [arm] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.4': + resolution: {integrity: sha512-8TNXMEjJc3QEy7R/x1INhgiU+XakDAFUzBhaz7+Rbrs8NH5UQeHQxxmzsSBJGyV6I1jW79undiQm8tOI+D+8FQ==} + cpu: [x64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.4': + resolution: {integrity: sha512-CmCXPQrkbwExx3j946/PtHWHbYJiCRBRDl4BlkRQcJB/YOwQxJRTpoo7aTsortjgoJ1x7opzTSxn7C+ASSLVjQ==} + cpu: [x64] + os: [win32] + '@nodable/entities@2.1.0': resolution: {integrity: sha512-nyT7T3nbMyBI/lvr6L5TyWbFJAI9FTgVRakNoBqCD+PmID8DzFrrNdLLtHMwMszOtqZa8PAOV24ZqDnQrhQINA==} @@ -1650,6 +1692,9 @@ packages: '@types/ms@2.1.0': resolution: {integrity: sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA==} + '@types/node-cron@3.0.11': + resolution: {integrity: sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==} + '@types/node@25.9.1': resolution: {integrity: sha512-xfrlY7UD5rMJk3ZVJP8BNzS28J36YJg+xp+LPXV1TdWxr8uMH5A860QNxYDGQe/ylDSgjxE52Q9VnO7p75tJxg==} @@ -1943,6 +1988,15 @@ packages: buffer@6.0.3: resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==} + bullmq@5.77.3: + resolution: {integrity: sha512-ccGjnRF5Bbu21CfzDG7toIewoAj5W9DUHjqhFnppt8YBtoQplynzBGuSzn7yYQobpyYeHEqrYhiJ1NZInrKbyA==} + engines: {node: '>=12.22.0'} + peerDependencies: + redis: '>=5.0.0' + peerDependenciesMeta: + redis: + optional: true + bundle-name@4.1.0: resolution: {integrity: sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q==} engines: {node: '>=18'} @@ -2085,6 +2139,10 @@ packages: resolution: {integrity: sha512-piICUB6ei4IlTv1+653yq5+KoqfBYmj9bw6LqXoOneTMDXk5nM1qt12mFW1caG3LlJXEKW1Bp0WggEmIfQB34g==} engines: {node: '>= 14'} + cron-parser@4.9.0: + resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==} + engines: {node: '>=12.0.0'} + croner@10.0.1: resolution: {integrity: sha512-ixNtAJndqh173VQ4KodSdJEI6nuioBWI0V1ITNKhZZsO0pEMoDxz539T4FTTbSZ/xIOSuDnzxLVRqBVSvPNE2g==} engines: {node: '>=18.0'} @@ -3051,6 +3109,10 @@ packages: lru-memoizer@2.3.0: resolution: {integrity: sha512-GXn7gyHAMhO13WSKrIiNfztwxodVsP8IoZ3XfrJV4yH2x0/OeTO/FIaAHTY5YekdGgW94njfuKmyyt1E0mR6Ug==} + luxon@3.7.2: + resolution: {integrity: sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==} + engines: {node: '>=12'} + magic-string@0.30.21: resolution: {integrity: sha512-vd2F4YUyEXKGcLHoq+TEyCjxueSeHnFxyyjNp80yg0XV4vUhnDer/lvvlqM/arB5bXQN5K2/3oinyCRyx8T2CQ==} @@ -3153,6 +3215,13 @@ packages: ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} + msgpackr-extract@3.0.4: + resolution: {integrity: sha512-4kmO/MdyUIkLIvTPr8VHLil4AtoKIoniWPIEk5+CDy0xnWC84azhSFmuJ7PxZdsYtiP5kEeQsORAVIeMgxT+Hw==} + hasBin: true + + msgpackr@2.0.1: + resolution: {integrity: sha512-9J+tqTEsbHqY8YohazYgty7LgerFIWxvMLpUjqETSmjHojtJm2WnX2kK/2a1fLI7CO7ERP1YSEUXMucz4j+yBA==} + nanoid@3.3.12: resolution: {integrity: sha512-ZB9RH/39qpq5Vu6Y+NmUaFhQR6pp+M2Xt76XBnEwDaGcVAqhlvxrl3B2bKS5D3NH3QR76v3aSrKaF/Kiy7lEtQ==} engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} @@ -3168,9 +3237,16 @@ packages: xml2js: optional: true + node-abort-controller@3.1.1: + resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==} + node-addon-api@7.1.1: resolution: {integrity: sha512-5m3bsyrjFWE1xf7nz7YXdN4udnVtXK6/Yfgn5qnahL6bCkf2yKt4k3nuTKAtT4r3IG8JNR2ncsIMdZuAzJjHQQ==} + node-cron@4.2.1: + resolution: {integrity: sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==} + engines: {node: '>=6.0.0'} + node-domexception@1.0.0: resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==} engines: {node: '>=10.5.0'} @@ -3196,6 +3272,10 @@ packages: resolution: {integrity: sha512-LarFH0+6VfriEhqMMcLX2F7SwSXeWwnEAJEsYm5QKWchiVYVvJyV9v7UDvUv+w5HO23ZpQTXDv/GxdDdMyOuoQ==} engines: {node: '>= 6.13.0'} + node-gyp-build-optional-packages@5.2.2: + resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==} + hasBin: true + node-gyp-build@4.8.4: resolution: {integrity: sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==} hasBin: true @@ -3576,6 +3656,11 @@ packages: resolution: {integrity: sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==} hasBin: true + semver@7.8.0: + resolution: {integrity: sha512-AcM7dV/5ul4EekoQ29Agm5vri8JNqRyj39o0qpX6vDF2GZrtutZl5RwgD1XnZjiTAfncsJhMI48QQH3sN87YNA==} + engines: {node: '>=10'} + hasBin: true + semver@7.8.1: resolution: {integrity: sha512-rkVq3IXh+4FDGch+KwzX3aV9W3kO54GyEgpvBzSyctDA6Xtd7RJQV1xmXbeQp5v7+VzLOfVqiutSE6GICgPFvg==} engines: {node: '>=10'} @@ -5055,6 +5140,24 @@ snapshots: - encoding - supports-color + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.4': + optional: true + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.4': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.4': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.4': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.4': + optional: true + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.4': + optional: true + '@nodable/entities@2.1.0': optional: true @@ -5658,6 +5761,8 @@ snapshots: '@types/ms@2.1.0': {} + '@types/node-cron@3.0.11': {} + '@types/node@25.9.1': dependencies: undici-types: 7.24.6 @@ -5982,6 +6087,17 @@ snapshots: base64-js: 1.5.1 ieee754: 1.2.1 + bullmq@5.77.3: + dependencies: + cron-parser: 4.9.0 + ioredis: 5.10.1 + msgpackr: 2.0.1 + node-abort-controller: 3.1.1 + semver: 7.8.0 + tslib: 2.8.1 + transitivePeerDependencies: + - supports-color + bundle-name@4.1.0: dependencies: run-applescript: 7.1.0 @@ -6115,6 +6231,10 @@ snapshots: crc-32: 1.2.2 readable-stream: 4.7.0 + cron-parser@4.9.0: + dependencies: + luxon: 3.7.2 + croner@10.0.1: {} cross-spawn@7.0.6: @@ -7118,6 +7238,8 @@ snapshots: lodash.clonedeep: 4.5.0 lru-cache: 6.0.0 + luxon@3.7.2: {} + magic-string@0.30.21: dependencies: '@jridgewell/sourcemap-codec': 1.5.5 @@ -7222,6 +7344,22 @@ snapshots: ms@2.1.3: {} + msgpackr-extract@3.0.4: + dependencies: + node-gyp-build-optional-packages: 5.2.2 + optionalDependencies: + '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.4 + '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.4 + '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.4 + '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.4 + '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.4 + '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.4 + optional: true + + msgpackr@2.0.1: + optionalDependencies: + msgpackr-extract: 3.0.4 + nanoid@3.3.12: {} nitropack@2.13.4: @@ -7328,8 +7466,12 @@ snapshots: - supports-color - uploadthing + node-abort-controller@3.1.1: {} + node-addon-api@7.1.1: {} + node-cron@4.2.1: {} + node-domexception@1.0.0: {} node-fetch-native@1.6.7: {} @@ -7346,6 +7488,11 @@ snapshots: node-forge@1.4.0: {} + node-gyp-build-optional-packages@5.2.2: + dependencies: + detect-libc: 2.1.2 + optional: true + node-gyp-build@4.8.4: {} node-mock-http@1.0.4: {} @@ -7762,6 +7909,8 @@ snapshots: semver@6.3.1: {} + semver@7.8.0: {} + semver@7.8.1: {} send@1.2.1: diff --git a/web/package.json b/web/package.json index 177e24b..d157e81 100644 --- a/web/package.json +++ b/web/package.json @@ -24,9 +24,12 @@ "@types/three": "^0.184.1", "@typeschema/valibot": "^0.13.4", "bcryptjs": "^3.0.3", + "bullmq": "^5.77.3", "drizzle-orm": "^0.45.2", "firebase-admin": "^13.10.0", + "ioredis": "^5.10.1", "jose": "^5", + "node-cron": "^4.2.1", "pg": "^8.21.0", "puppeteer": "^25.0.4", "resend": "^6.12.4", @@ -42,6 +45,7 @@ "node": ">=22" }, "devDependencies": { + "@types/node-cron": "^3.0.11", "@types/pg": "^8.20.0", "drizzle-kit": "^0.31.10", "jsdom": "^29.1.1", diff --git a/web/src/server/api/root.ts b/web/src/server/api/root.ts index 89fccea..2d5132a 100644 --- a/web/src/server/api/root.ts +++ b/web/src/server/api/root.ts @@ -9,6 +9,7 @@ import { hometitleRouter } from "./routers/hometitle"; import { removebrokersRouter } from "./routers/removebrokers"; import { correlationRouter } from "./routers/correlation"; import { reportsRouter } from "./routers/reports"; +import { schedulerRouter } from "./routers/scheduler"; import { createTRPCRouter } from "./utils"; export const appRouter = createTRPCRouter({ @@ -23,6 +24,7 @@ export const appRouter = createTRPCRouter({ removebrokers: removebrokersRouter, correlation: correlationRouter, reports: reportsRouter, + scheduler: schedulerRouter, }); export type AppRouter = typeof appRouter; diff --git a/web/src/server/api/routers/scheduler.ts b/web/src/server/api/routers/scheduler.ts new file mode 100644 index 0000000..7b55386 --- /dev/null +++ b/web/src/server/api/routers/scheduler.ts @@ -0,0 +1,82 @@ +import { wrap } from "@typeschema/valibot"; +import { object, string } from "valibot"; +import { createTRPCRouter, adminProcedure, publicProcedure } from "../utils"; +import { RunJobNowSchema, JobStatusSchema } from "../schemas/scheduler"; +import { getQueue } from "~/server/jobs"; +import { registerSchedules, scheduleForSubscription, removeSchedulesForSubscription, getCronOverview } from "~/server/jobs"; +import { JOB_TYPES, type JobType } from "~/server/jobs/queue"; + +export const schedulerRouter = createTRPCRouter({ + getCronOverview: adminProcedure.query(async () => { + return { overview: getCronOverview() }; + }), + + runJobNow: adminProcedure + .input(wrap(RunJobNowSchema)) + .mutation(async ({ input }) => { + const { type, payload } = input; + + if (!JOB_TYPES.includes(type as JobType)) { + throw new Error(`Invalid job type: ${type}. Must be one of: ${JOB_TYPES.join(", ")}`); + } + + const queue = getQueue(); + const job = await queue.enqueue(type as JobType, payload as any); + return { jobId: job.id, status: job.status }; + }), + + getJobStatus: adminProcedure + .input(wrap(JobStatusSchema)) + .query(async ({ input }) => { + const queue = getQueue(); + const job = await queue.getJob(input.jobId); + if (!job) { + throw new Error(`Job not found: ${input.jobId}`); + } + return { + id: job.id, + type: job.type, + status: job.status, + attempts: job.attempts, + maxAttempts: job.maxAttempts, + error: job.error ?? null, + createdAt: job.createdAt, + updatedAt: job.updatedAt, + }; + }), + + listJobs: adminProcedure + .query(async () => { + const queue = getQueue(); + const jobs = await queue.getJobs(); + return jobs.map((j) => ({ + id: j.id, + type: j.type, + status: j.status, + attempts: j.attempts, + maxAttempts: j.maxAttempts, + error: j.error ?? null, + createdAt: j.createdAt, + updatedAt: j.updatedAt, + })); + }), + + reloadSchedules: adminProcedure.mutation(async () => { + await registerSchedules(); + return { success: true }; + }), + + scheduleForTier: adminProcedure + .input(wrap(object({ subscriptionId: string(), userId: string(), tier: string() }))) + .mutation(async ({ input }) => { + scheduleForSubscription(input); + return { success: true }; + }), + + removeSchedules: adminProcedure + .input(wrap(object({ subscriptionId: string() }))) + .mutation(async ({ input }) => { + removeSchedulesForSubscription(input.subscriptionId); + return { success: true }; + }), +}); diff --git a/web/src/server/api/schemas/scheduler.ts b/web/src/server/api/schemas/scheduler.ts new file mode 100644 index 0000000..dbf7cd6 --- /dev/null +++ b/web/src/server/api/schemas/scheduler.ts @@ -0,0 +1,19 @@ +import { object, string, optional, enumType } from "valibot"; + +export const RunJobNowSchema = object({ + type: string(), + payload: object({ + userId: optional(string()), + subscriptionId: optional(string()), + requestId: optional(string()), + reportType: optional(string()), + reportScheduleId: optional(string()), + channel: optional(string()), + alertId: optional(string()), + jobId: optional(string()), + }), +}); + +export const JobStatusSchema = object({ + jobId: string(), +}); diff --git a/web/src/server/jobs/handlers/darkwatch.scan.test.ts b/web/src/server/jobs/handlers/darkwatch.scan.test.ts new file mode 100644 index 0000000..df7ac62 --- /dev/null +++ b/web/src/server/jobs/handlers/darkwatch.scan.test.ts @@ -0,0 +1,65 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +function makeChain(result: any[]) { + const chain = { + from: vi.fn().mockReturnThis(), + where: vi.fn().mockReturnThis(), + limit: vi.fn().mockReturnThis(), + then: vi.fn().mockImplementation((fn: Function) => Promise.resolve(fn(result))), + }; + return chain; +} + +vi.mock("~/server/db", () => ({ + db: { + select: vi.fn().mockReturnValue(makeChain([])), + }, +})); + +vi.mock("~/server/services/darkwatch.service", () => ({ + runScan: vi.fn(), +})); + +import { db } from "~/server/db"; +import { runScan } from "~/server/services/darkwatch.service"; +import { handler } from "./darkwatch.scan"; + +const mockDb = vi.mocked(db); +const mockRunScan = vi.mocked(runScan); + +describe("darkwatch.scan handler", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("skips scan when subscription is not active", async () => { + mockDb.select.mockReturnValue(makeChain([])); + + await handler({ userId: "user-1", subscriptionId: "sub-1" }); + + expect(mockRunScan).not.toHaveBeenCalled(); + }); + + it("skips scan when no active watchlist items exist", async () => { + // First call for subscription check returns active sub + mockDb.select.mockReturnValueOnce(makeChain([{ id: "sub-1", userId: "user-1", tier: "plus", status: "active" }])) + // Second call for watchlist items returns empty + .mockReturnValueOnce(makeChain([])); + + await handler({ userId: "user-1", subscriptionId: "sub-1" }); + + expect(mockRunScan).not.toHaveBeenCalled(); + }); + + it("triggers scan when active watchlist items exist", async () => { + mockRunScan.mockResolvedValue({ scanId: "scan-1" }); + + mockDb.select + .mockReturnValueOnce(makeChain([{ id: "sub-1", userId: "user-1", tier: "plus", status: "active" }])) + .mockReturnValueOnce(makeChain([{ id: "item-1", type: "email", value: "test@test.com" }])); + + await handler({ userId: "user-1", subscriptionId: "sub-1" }); + + expect(mockRunScan).toHaveBeenCalledWith("user-1"); + }); +}); diff --git a/web/src/server/jobs/handlers/darkwatch.scan.ts b/web/src/server/jobs/handlers/darkwatch.scan.ts new file mode 100644 index 0000000..490f0fa --- /dev/null +++ b/web/src/server/jobs/handlers/darkwatch.scan.ts @@ -0,0 +1,38 @@ +import { eq, and } from "drizzle-orm"; +import { db } from "~/server/db"; +import { watchlistItems, subscriptions } from "~/server/db/schema"; +import { runScan } from "~/server/services/darkwatch.service"; + +interface DarkWatchScanPayload { + userId: string; + subscriptionId: string; +} + +export async function handler(payload: DarkWatchScanPayload): Promise { + const { userId, subscriptionId } = payload; + + const sub = await db + .select() + .from(subscriptions) + .where(and(eq(subscriptions.id, subscriptionId), eq(subscriptions.status, "active"))) + .limit(1) + .then((r) => r[0]); + + if (!sub) { + console.warn(`[darkwatch.scan] Subscription ${subscriptionId} not found or inactive, skipping`); + return; + } + + const items = await db + .select() + .from(watchlistItems) + .where(and(eq(watchlistItems.subscriptionId, subscriptionId), eq(watchlistItems.isActive, true))); + + if (items.length === 0) { + console.log(`[darkwatch.scan] No active watchlist items for subscription ${subscriptionId}`); + return; + } + + await runScan(userId); + console.log(`[darkwatch.scan] Completed scan for subscription ${subscriptionId}`); +} diff --git a/web/src/server/jobs/handlers/hometitle.scan.ts b/web/src/server/jobs/handlers/hometitle.scan.ts new file mode 100644 index 0000000..2a23109 --- /dev/null +++ b/web/src/server/jobs/handlers/hometitle.scan.ts @@ -0,0 +1,38 @@ +import { eq, and } from "drizzle-orm"; +import { db } from "~/server/db"; +import { propertyWatchlistItems, subscriptions } from "~/server/db/schema"; +import { runScan } from "~/server/services/hometitle.service"; + +interface HomeTitleScanPayload { + userId: string; + subscriptionId: string; +} + +export async function handler(payload: HomeTitleScanPayload): Promise { + const { userId, subscriptionId } = payload; + + const sub = await db + .select() + .from(subscriptions) + .where(and(eq(subscriptions.id, subscriptionId), eq(subscriptions.status, "active"))) + .limit(1) + .then((r) => r[0]); + + if (!sub) { + console.warn(`[hometitle.scan] Subscription ${subscriptionId} not found or inactive, skipping`); + return; + } + + const items = await db + .select() + .from(propertyWatchlistItems) + .where(and(eq(propertyWatchlistItems.subscriptionId, subscriptionId), eq(propertyWatchlistItems.isActive, true))); + + if (items.length === 0) { + console.log(`[hometitle.scan] No watched properties for subscription ${subscriptionId}`); + return; + } + + await runScan(userId); + console.log(`[hometitle.scan] Completed property scan for subscription ${subscriptionId}`); +} diff --git a/web/src/server/jobs/handlers/index.ts b/web/src/server/jobs/handlers/index.ts new file mode 100644 index 0000000..b73b4d1 --- /dev/null +++ b/web/src/server/jobs/handlers/index.ts @@ -0,0 +1,34 @@ +import type { JobPayload, JobType } from "../queue"; + +export type JobHandler = (payload: JobPayload[T]) => Promise; + +export type HandlerMap = { + [K in JobType]: JobHandler; +}; + +let handlers: HandlerMap | null = null; + +export function getHandlers(): HandlerMap { + if (!handlers) { + handlers = { + "darkwatch.scan": require("./darkwatch.scan").handler, + "voiceprint.batch": require("./voiceprint.batch").handler, + "hometitle.scan": require("./hometitle.scan").handler, + "removebrokers.process": require("./removebrokers.process").handler, + "reports.generate": require("./reports.generate").handler, + "notifications.send": require("./notifications.send").handler, + }; + } + return handlers; +} + +export function setHandlers(mock: Partial): void { + handlers = { + "darkwatch.scan": mock["darkwatch.scan"] ?? (async () => {}), + "voiceprint.batch": mock["voiceprint.batch"] ?? (async () => {}), + "hometitle.scan": mock["hometitle.scan"] ?? (async () => {}), + "removebrokers.process": mock["removebrokers.process"] ?? (async () => {}), + "reports.generate": mock["reports.generate"] ?? (async () => {}), + "notifications.send": mock["notifications.send"] ?? (async () => {}), + }; +} diff --git a/web/src/server/jobs/handlers/notifications.send.ts b/web/src/server/jobs/handlers/notifications.send.ts new file mode 100644 index 0000000..32a8854 --- /dev/null +++ b/web/src/server/jobs/handlers/notifications.send.ts @@ -0,0 +1,70 @@ +import { eq, and } from "drizzle-orm"; +import { db } from "~/server/db"; +import { alerts, subscriptions, users } from "~/server/db/schema"; +import { sendEmail, sendPush, sendSMS } from "~/server/services/notification.service"; + +interface NotificationsSendPayload { + userId: string; + alertId?: string; + channel: string; +} + +export async function handler(payload: NotificationsSendPayload): Promise { + const { userId, alertId, channel } = payload; + + const [user] = await db + .select() + .from(users) + .where(eq(users.id, userId)) + .limit(1); + + if (!user) { + console.warn(`[notifications.send] User ${userId} not found`); + return; + } + + if (alertId) { + const [alert] = await db + .select() + .from(alerts) + .where(and(eq(alerts.id, alertId), eq(alerts.userId, userId))) + .limit(1); + + if (!alert) { + console.warn(`[notifications.send] Alert ${alertId} not found`); + return; + } + + await sendViaChannel(channel, user, alert.title, alert.message); + } else { + const unsentAlerts = await db + .select() + .from(alerts) + .where(and(eq(alerts.userId, userId), eq(alerts.isRead, false))) + .limit(20); + + for (const alert of unsentAlerts) { + for (const ch of alert.channel as string[]) { + await sendViaChannel(ch, user, alert.title, alert.message); + } + } + } +} + +async function sendViaChannel(channel: string, user: { email: string; id: string }, title: string, message: string): Promise { + try { + switch (channel) { + case "email": + await sendEmail(user.email, title, `

${message}

`); + break; + case "push": + await sendPush(user.id, title, message); + break; + case "sms": + await sendSMS(user.email, message); + break; + } + } catch (err) { + console.error(`[notifications.send] Failed to send via ${channel}:`, err); + } +} diff --git a/web/src/server/jobs/handlers/removebrokers.process.ts b/web/src/server/jobs/handlers/removebrokers.process.ts new file mode 100644 index 0000000..1452906 --- /dev/null +++ b/web/src/server/jobs/handlers/removebrokers.process.ts @@ -0,0 +1,29 @@ +import { eq, and, inArray, or, isNull, lt } from "drizzle-orm"; +import { db } from "~/server/db"; +import { removalRequests, infoBrokers } from "~/server/db/schema"; +import { processRemovals } from "~/server/services/removebrokers.service"; + +interface RemoveBrokersProcessPayload { + subscriptionId?: string; + requestId?: string; +} + +export async function handler(payload: RemoveBrokersProcessPayload): Promise { + const { subscriptionId, requestId } = payload; + + if (requestId) { + const [request] = await db + .select() + .from(removalRequests) + .where(and(eq(removalRequests.id, requestId), inArray(removalRequests.status, ["PENDING", "FAILED"]))) + .limit(1); + + if (!request) { + console.warn(`[removebrokers.process] Request ${requestId} not found or not pending`); + return; + } + } + + const result = await processRemovals(); + console.log(`[removebrokers.process] Processed ${result.processed} removal requests`); +} diff --git a/web/src/server/jobs/handlers/reports.generate.ts b/web/src/server/jobs/handlers/reports.generate.ts new file mode 100644 index 0000000..3c5bc90 --- /dev/null +++ b/web/src/server/jobs/handlers/reports.generate.ts @@ -0,0 +1,58 @@ +import { eq, and } from "drizzle-orm"; +import { db } from "~/server/db"; +import { reportSchedules } from "~/server/db/schema"; +import { generateReport } from "~/server/services/reports.service"; + +interface ReportsGeneratePayload { + userId: string; + reportScheduleId?: string; + reportType: string; +} + +export async function handler(payload: ReportsGeneratePayload): Promise { + const { userId, reportScheduleId, reportType } = payload; + + if (reportScheduleId) { + const [schedule] = await db + .select() + .from(reportSchedules) + .where(and(eq(reportSchedules.id, reportScheduleId), eq(reportSchedules.enabled, true))) + .limit(1); + + if (!schedule) { + console.warn(`[reports.generate] Schedule ${reportScheduleId} not found or disabled`); + return; + } + + await generateReport(userId, schedule.reportType as any, schedule.lastGeneratedAt?.toISOString()); + } else { + await generateReport(userId, reportType as any); + } + + if (reportScheduleId) { + await db + .update(reportSchedules) + .set({ + lastGeneratedAt: new Date(), + nextScheduledAt: calculateNextRun(reportType), + updatedAt: new Date(), + }) + .where(eq(reportSchedules.id, reportScheduleId)); + } + + console.log(`[reports.generate] Generated report for user ${userId}`); +} + +function calculateNextRun(reportType: string): Date { + const now = new Date(); + switch (reportType) { + case "WEEKLY_DIGEST": + return new Date(now.getTime() + 7 * 24 * 60 * 60 * 1000); + case "MONTHLY_PLUS": + return new Date(now.getTime() + 30 * 24 * 60 * 60 * 1000); + case "ANNUAL_PREMIUM": + return new Date(now.getTime() + 365 * 24 * 60 * 60 * 1000); + default: + return new Date(now.getTime() + 30 * 24 * 60 * 60 * 1000); + } +} diff --git a/web/src/server/jobs/handlers/voiceprint.batch.ts b/web/src/server/jobs/handlers/voiceprint.batch.ts new file mode 100644 index 0000000..a987a16 --- /dev/null +++ b/web/src/server/jobs/handlers/voiceprint.batch.ts @@ -0,0 +1,64 @@ +import { eq, and, inArray } from "drizzle-orm"; +import { db } from "~/server/db"; +import { analysisJobs, analysisResults, subscriptions } from "~/server/db/schema"; +import { analyzeAudio } from "~/server/services/voiceprint.service"; + +interface VoiceprintBatchPayload { + userId?: string; + jobId?: string; +} + +export async function handler(payload: VoiceprintBatchPayload): Promise { + const { userId, jobId } = payload; + + const conditions = [eq(analysisJobs.status, "PENDING")]; + if (userId) conditions.push(eq(analysisJobs.userId, userId)); + if (jobId) conditions.push(eq(analysisJobs.id, jobId)); + + const pendingJobs = await db + .select() + .from(analysisJobs) + .where(and(...conditions)) + .limit(20); + + if (pendingJobs.length === 0) { + console.log("[voiceprint.batch] No pending analysis jobs"); + return; + } + + for (const job of pendingJobs) { + try { + await db + .update(analysisJobs) + .set({ status: "RUNNING" }) + .where(eq(analysisJobs.id, job.id)); + + const result = await analyzeAudio(job.userId, job.audioFilePath); + + await db.insert(analysisResults).values({ + analysisJobId: job.id, + syntheticScore: result.isSynthetic ? result.confidence : 1 - result.confidence, + verdict: result.verdict, + confidence: result.confidence, + processingTimeMs: 0, + matchedEnrollmentId: result.matchedEnrollmentId ?? undefined, + matchedSimilarity: result.matchedSimilarity ?? undefined, + modelVersion: "v1", + }); + + await db + .update(analysisJobs) + .set({ status: "COMPLETED", completedAt: new Date() }) + .where(eq(analysisJobs.id, job.id)); + } catch (err) { + const errorMessage = err instanceof Error ? err.message : String(err); + console.error(`[voiceprint.batch] Job ${job.id} failed:`, errorMessage); + await db + .update(analysisJobs) + .set({ status: "FAILED", errorMessage }) + .where(eq(analysisJobs.id, job.id)); + } + } + + console.log(`[voiceprint.batch] Processed ${pendingJobs.length} analysis jobs`); +} diff --git a/web/src/server/jobs/index.ts b/web/src/server/jobs/index.ts new file mode 100644 index 0000000..7e3e045 --- /dev/null +++ b/web/src/server/jobs/index.ts @@ -0,0 +1,40 @@ +import { startWorker, stopWorker, isWorkerRunning } from "./worker"; +import { startScheduler, stopScheduler, isSchedulerRunning } from "./scheduler"; + +let initialized = false; + +export async function initialize(): Promise { + if (initialized) return; + initialized = true; + + const isWorker = process.env.JOB_WORKER === "true" || !process.env.REDIS_URL; + const isPrimary = process.env.JOB_PRIMARY === "true" || !process.env.REDIS_URL; + + if (isPrimary) { + console.log("[jobs] Starting scheduler..."); + await startScheduler(); + } + + if (isWorker) { + console.log("[jobs] Starting worker..."); + await startWorker({ + pollInterval: parseInt(process.env.JOB_POLL_INTERVAL ?? "1000", 10), + }); + } + + console.log("[jobs] Background job system initialized", { + scheduler: isSchedulerRunning(), + worker: isWorkerRunning(), + }); +} + +export async function shutdown(): Promise { + console.log("[jobs] Shutting down..."); + stopScheduler(); + await stopWorker(); + console.log("[jobs] Shutdown complete"); +} + +export { getQueue } from "./queue"; +export { startWorker, stopWorker, processJob, isWorkerRunning } from "./worker"; +export { startScheduler, stopScheduler, registerSchedules, scheduleForSubscription, removeSchedulesForSubscription, getCronOverview, isSchedulerRunning } from "./scheduler"; diff --git a/web/src/server/jobs/queue.test.ts b/web/src/server/jobs/queue.test.ts new file mode 100644 index 0000000..00e0c2d --- /dev/null +++ b/web/src/server/jobs/queue.test.ts @@ -0,0 +1,102 @@ +import { describe, it, expect, beforeEach } from "vitest"; +import { InMemoryQueue } from "./queue"; + +describe("InMemoryQueue", () => { + let queue: InMemoryQueue; + + beforeEach(() => { + queue = new InMemoryQueue(); + }); + + it("enqueues a job with correct payload", async () => { + const job = await queue.enqueue("darkwatch.scan", { + userId: "user-1", + subscriptionId: "sub-1", + }); + + expect(job.type).toBe("darkwatch.scan"); + expect(job.payload).toEqual({ userId: "user-1", subscriptionId: "sub-1" }); + expect(job.status).toBe("pending"); + expect(job.attempts).toBe(0); + expect(job.maxAttempts).toBe(3); + expect(job.id).toBeDefined(); + }); + + it("dequeues pending jobs in order", async () => { + await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }); + await queue.enqueue("hometitle.scan", { userId: "u2", subscriptionId: "s2" }); + + const job1 = await queue.dequeue(); + expect(job1?.type).toBe("darkwatch.scan"); + expect(job1?.status).toBe("running"); + + const job2 = await queue.dequeue(); + expect(job2?.type).toBe("hometitle.scan"); + expect(job2?.status).toBe("running"); + + const job3 = await queue.dequeue(); + expect(job3).toBeNull(); + }); + + it("only dequeues pending jobs", async () => { + const job = await queue.enqueue("notifications.send", { userId: "u1", channel: "email" }); + await queue.markComplete(job.id); + + const result = await queue.dequeue(); + expect(result).toBeNull(); + }); + + it("marks job as completed", async () => { + const job = await queue.enqueue("reports.generate", { userId: "u1", reportType: "MONTHLY_PLUS" }); + await queue.markComplete(job.id); + + const fetched = await queue.getJob(job.id); + expect(fetched?.status).toBe("completed"); + }); + + it("marks job as failed with error", async () => { + const job = await queue.enqueue("voiceprint.batch", {}); + await queue.markFailed(job.id, "Test error"); + + const fetched = await queue.getJob(job.id); + expect(fetched?.status).toBe("failed"); + expect(fetched?.error).toBe("Test error"); + }); + + it("returns null for non-existent job", async () => { + const job = await queue.getJob("non-existent"); + expect(job).toBeNull(); + }); + + it("filters jobs by status", async () => { + const j1 = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }); + const j2 = await queue.enqueue("hometitle.scan", { userId: "u2", subscriptionId: "s2" }); + + await queue.markComplete(j1.id); + + const pending = await queue.getJobs("pending"); + expect(pending).toHaveLength(1); + expect(pending[0].id).toBe(j2.id); + + const completed = await queue.getJobs("completed"); + expect(completed).toHaveLength(1); + expect(completed[0].id).toBe(j1.id); + }); + + it("supports delayed enqueue", async () => { + await queue.enqueue("notifications.send", { userId: "u1", channel: "email" }, { delay: 50 }); + + const immediate = await queue.dequeue(); + expect(immediate).toBeNull(); + + await new Promise((r) => setTimeout(r, 60)); + + const delayed = await queue.dequeue(); + expect(delayed?.type).toBe("notifications.send"); + }); + + it("supports custom maxAttempts", async () => { + const job = await queue.enqueue("removebrokers.process", {}, { maxAttempts: 5 }); + expect(job.maxAttempts).toBe(5); + }); +}); diff --git a/web/src/server/jobs/queue.ts b/web/src/server/jobs/queue.ts new file mode 100644 index 0000000..50c77c8 --- /dev/null +++ b/web/src/server/jobs/queue.ts @@ -0,0 +1,220 @@ +import { randomUUID } from "node:crypto"; + +export const JOB_TYPES = [ + "darkwatch.scan", + "voiceprint.batch", + "hometitle.scan", + "removebrokers.process", + "reports.generate", + "notifications.send", +] as const; + +export type JobType = (typeof JOB_TYPES)[number]; + +export type JobPayload = { + "darkwatch.scan": { userId: string; subscriptionId: string }; + "voiceprint.batch": { userId?: string; jobId?: string }; + "hometitle.scan": { userId: string; subscriptionId: string }; + "removebrokers.process": { subscriptionId?: string; requestId?: string }; + "reports.generate": { userId: string; reportScheduleId?: string; reportType: string }; + "notifications.send": { userId: string; alertId?: string; channel: string }; +}; + +export type JobStatus = "pending" | "running" | "completed" | "failed"; + +export interface Job { + id: string; + type: T; + payload: JobPayload[T]; + status: JobStatus; + attempts: number; + maxAttempts: number; + error?: string; + createdAt: Date; + updatedAt: Date; +} + +export interface EnqueueOptions { + delay?: number; + maxAttempts?: number; +} + +export interface QueueAdapter { + enqueue(type: T, payload: JobPayload[T], options?: EnqueueOptions): Promise>; + dequeue(): Promise; + markComplete(jobId: string): Promise; + markFailed(jobId: string, error: string): Promise; + scheduleRetry(job: Job, delayMs: number): Promise; + getJob(jobId: string): Promise; + getJobs(status?: JobStatus): Promise; +} + +export class InMemoryQueue implements QueueAdapter { + private jobs = new Map(); + private pendingQueue: string[] = []; + + async enqueue(type: T, payload: JobPayload[T], options?: EnqueueOptions): Promise> { + const id = randomUUID(); + const job: Job = { + id, + type, + payload, + status: "pending", + attempts: 0, + maxAttempts: options?.maxAttempts ?? 3, + createdAt: new Date(), + updatedAt: new Date(), + }; + this.jobs.set(id, job as Job); + if (options?.delay) { + setTimeout(() => { + this.pendingQueue.push(id); + }, options.delay); + } else { + this.pendingQueue.push(id); + } + return job; + } + + async scheduleRetry(job: Job, delayMs: number): Promise { + job.status = "pending"; + job.attempts++; + job.updatedAt = new Date(); + setTimeout(() => { + this.pendingQueue.push(job.id); + }, delayMs); + } + + async dequeue(): Promise { + while (this.pendingQueue.length > 0) { + const id = this.pendingQueue.shift()!; + const job = this.jobs.get(id); + if (!job || job.status !== "pending") continue; + job.status = "running"; + job.updatedAt = new Date(); + return job; + } + return null; + } + + async markComplete(jobId: string): Promise { + const job = this.jobs.get(jobId); + if (job) { + job.status = "completed"; + job.updatedAt = new Date(); + } + } + + async markFailed(jobId: string, error: string): Promise { + const job = this.jobs.get(jobId); + if (job) { + job.status = "failed"; + job.error = error; + job.updatedAt = new Date(); + } + } + + async getJob(jobId: string): Promise { + return this.jobs.get(jobId) ?? null; + } + + async getJobs(status?: JobStatus): Promise { + const all = Array.from(this.jobs.values()); + if (status) return all.filter((j) => j.status === status); + return all; + } +} + +function createRedisAdapter(): QueueAdapter { + // Lazy imports so this module works without Redis + const BullMQ = require("bullmq"); + const IORedis = require("ioredis"); + + const connection = new IORedis.default(process.env.REDIS_URL ?? "redis://localhost:6379", { + maxRetriesPerRequest: null, + }); + + const queue = new BullMQ.Queue("shieldai-jobs", { connection }); + let bullJobs = new Map(); + + async function toJob(bullJob: any): Promise { + return { + id: bullJob.id, + type: bullJob.name as JobType, + payload: bullJob.data, + status: (await bullJob.getState()) as JobStatus, + attempts: bullJob.attemptsMade, + maxAttempts: bullJob.opts?.attempts ?? 3, + error: bullJob.failedReason ?? undefined, + createdAt: bullJob.timestamp ? new Date(bullJob.timestamp) : new Date(), + updatedAt: bullJob.processedOn ? new Date(bullJob.processedOn) : new Date(), + }; + } + + return { + async enqueue(type: T, payload: JobPayload[T], options?: EnqueueOptions) { + const bullJob = await queue.add(type, payload, { + attempts: options?.maxAttempts ?? 3, + delay: options?.delay, + backoff: { type: "exponential", delay: 60_000 }, + }); + return toJob(bullJob) as Promise>; + }, + + async dequeue() { + // BullMQ Worker handles dequeue automatically; this is for the polling worker + return null; + }, + + async markComplete(jobId) { + // Handled by BullMQ Worker + }, + + async markFailed(jobId, error) { + // Handled by BullMQ Worker + }, + + async scheduleRetry(job, delayMs) { + // BullMQ handles retries via backoff + }, + + async getJob(jobId) { + const bullJob = await queue.getJob(jobId); + if (!bullJob) return null; + return toJob(bullJob); + }, + + async getJobs(status) { + const states = status ? [status] : ["waiting", "active", "completed", "failed"]; + const allJobs: Job[] = []; + for (const state of states) { + const jobs = await queue.getJobs(state); + for (const j of jobs) { + allJobs.push(await toJob(j)); + } + } + return allJobs; + }, + }; +} + +let adapter: QueueAdapter; + +export function getQueue(): QueueAdapter { + if (!adapter) { + if (process.env.REDIS_URL) { + adapter = createRedisAdapter(); + } else { + adapter = new InMemoryQueue(); + } + } + return adapter; +} + +export function setQueue(mock: QueueAdapter): void { + adapter = mock; +} + +export function resetQueue(): void { + adapter = undefined as unknown as QueueAdapter; +} diff --git a/web/src/server/jobs/scheduler.ts b/web/src/server/jobs/scheduler.ts new file mode 100644 index 0000000..54108d1 --- /dev/null +++ b/web/src/server/jobs/scheduler.ts @@ -0,0 +1,159 @@ +import cron from "node-cron"; +import { eq, and } from "drizzle-orm"; +import { db } from "~/server/db"; +import { subscriptions } from "~/server/db/schema"; +import { getQueue } from "./queue"; +import type { JobType } from "./queue"; + +const TIER_SCHEDULES: Record> = { + basic: [ + { type: "darkwatch.scan", cron: "0 0 1 * *" }, + { type: "hometitle.scan", cron: "0 0 2 * *" }, + ], + plus: [ + { type: "darkwatch.scan", cron: "0 0 * * 0" }, + { type: "hometitle.scan", cron: "0 0 * * 6" }, + { type: "reports.generate", cron: "0 0 1 * *" }, + ], + premium: [ + { type: "darkwatch.scan", cron: "0 0 * * *" }, + { type: "hometitle.scan", cron: "0 0 * * *" }, + { type: "reports.generate", cron: "0 0 * * 0" }, + ], +}; + +const CRON_OVERVIEW: Record = { + basic: "Basic: DarkWatch monthly (1st), HomeTitle monthly (2nd)", + plus: "Plus: DarkWatch weekly (Sun), HomeTitle weekly (Sat), Reports monthly (1st)", + premium: "Premium: DarkWatch daily, HomeTitle daily, Reports weekly (Sun)", +}; + +interface SchedulerEntry { + subscriptionId: string; + userId: string; + type: JobType; + task: cron.ScheduledTask; +} + +let activeSchedules: SchedulerEntry[] = []; +let schedulerRunning = false; + +export function getCronOverview(): string { + return Object.values(CRON_OVERVIEW).join("\n"); +} + +async function enqueueScheduledJob(type: JobType, userId: string, subscriptionId: string): Promise { + const queue = getQueue(); + switch (type) { + case "darkwatch.scan": + await queue.enqueue(type, { userId, subscriptionId }); + break; + case "hometitle.scan": + await queue.enqueue(type, { userId, subscriptionId }); + break; + case "reports.generate": + await queue.enqueue(type, { userId, reportType: type === "reports.generate" ? "MONTHLY_PLUS" : "WEEKLY_DIGEST" }); + break; + default: + await queue.enqueue(type, { userId, subscriptionId }); + } +} + +export async function registerSchedules(): Promise { + clearSchedules(); + + const activeSubs = await db + .select() + .from(subscriptions) + .where(and(eq(subscriptions.status, "active"))); + + for (const sub of activeSubs) { + const schedules = TIER_SCHEDULES[sub.tier]; + if (!schedules) continue; + + for (const schedule of schedules) { + if (!cron.validate(schedule.cron)) { + console.warn(`[scheduler] Invalid cron expression: ${schedule.cron}`); + continue; + } + + const task = cron.schedule(schedule.cron, () => { + enqueueScheduledJob(schedule.type, sub.userId, sub.id).catch((err) => { + console.error(`[scheduler] Failed to enqueue ${schedule.type} for ${sub.id}:`, err); + }); + }); + + activeSchedules.push({ + subscriptionId: sub.id, + userId: sub.userId, + type: schedule.type, + task, + }); + } + } + + console.log(`[scheduler] Registered ${activeSchedules.length} schedules for ${activeSubs.length} subscriptions`); +} + +export function scheduleForSubscription( + subscription: { id: string; userId: string; tier: string }, +): void { + removeSchedulesForSubscription(subscription.id); + + const schedules = TIER_SCHEDULES[subscription.tier]; + if (!schedules) return; + + for (const schedule of schedules) { + if (!cron.validate(schedule.cron)) continue; + + const task = cron.schedule(schedule.cron, () => { + enqueueScheduledJob(schedule.type, subscription.userId, subscription.id).catch((err) => { + console.error(`[scheduler] Failed to enqueue ${schedule.type} for ${subscription.id}:`, err); + }); + }); + + activeSchedules.push({ + subscriptionId: subscription.id, + userId: subscription.userId, + type: schedule.type, + task, + }); + } +} + +export function removeSchedulesForSubscription(subscriptionId: string): void { + const count = activeSchedules.filter((s) => s.subscriptionId === subscriptionId).length; + activeSchedules = activeSchedules.filter((s) => { + if (s.subscriptionId === subscriptionId) { + s.task.stop(); + return false; + } + return true; + }); + if (count > 0) { + console.log(`[scheduler] Removed ${count} schedules for subscription ${subscriptionId}`); + } +} + +export function clearSchedules(): void { + for (const entry of activeSchedules) { + entry.task.stop(); + } + activeSchedules = []; + console.log("[scheduler] All schedules cleared"); +} + +export async function startScheduler(): Promise { + if (schedulerRunning) return; + schedulerRunning = true; + await registerSchedules(); +} + +export function stopScheduler(): void { + clearSchedules(); + schedulerRunning = false; +} + +export function isSchedulerRunning(): boolean { + return schedulerRunning; +} diff --git a/web/src/server/jobs/worker.test.ts b/web/src/server/jobs/worker.test.ts new file mode 100644 index 0000000..1319cb9 --- /dev/null +++ b/web/src/server/jobs/worker.test.ts @@ -0,0 +1,170 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { InMemoryQueue, setQueue, resetQueue } from "./queue"; +import { processJob, startWorker, stopWorker } from "./worker"; +import { setHandlers } from "./handlers"; +import type { Job } from "./queue"; + +describe("worker", () => { + let queue: InMemoryQueue; + + beforeEach(() => { + queue = new InMemoryQueue(); + setQueue(queue); + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + resetQueue(); + }); + + describe("processJob", () => { + it("dispatches darkwatch.scan to correct handler", async () => { + const handler = vi.fn().mockResolvedValue(undefined); + setHandlers({ "darkwatch.scan": handler }); + + const job = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }); + + await processJob(job); + + expect(handler).toHaveBeenCalledWith({ userId: "u1", subscriptionId: "s1" }); + }); + + it("dispatches hometitle.scan to correct handler", async () => { + const handler = vi.fn().mockResolvedValue(undefined); + setHandlers({ "hometitle.scan": handler }); + + const job = await queue.enqueue("hometitle.scan", { userId: "u1", subscriptionId: "s1" }); + + await processJob(job); + + expect(handler).toHaveBeenCalledWith({ userId: "u1", subscriptionId: "s1" }); + }); + + it("dispatches reports.generate to correct handler", async () => { + const handler = vi.fn().mockResolvedValue(undefined); + setHandlers({ "reports.generate": handler }); + + const job = await queue.enqueue("reports.generate", { userId: "u1", reportType: "MONTHLY_PLUS" }); + + await processJob(job); + + expect(handler).toHaveBeenCalledWith({ userId: "u1", reportType: "MONTHLY_PLUS" }); + }); + + it("dispatches notifications.send to correct handler", async () => { + const handler = vi.fn().mockResolvedValue(undefined); + setHandlers({ "notifications.send": handler }); + + const job = await queue.enqueue("notifications.send", { userId: "u1", channel: "email" }); + + await processJob(job); + + expect(handler).toHaveBeenCalledWith({ userId: "u1", channel: "email" }); + }); + + it("dispatches removebrokers.process to correct handler", async () => { + const handler = vi.fn().mockResolvedValue(undefined); + setHandlers({ "removebrokers.process": handler }); + + const job = await queue.enqueue("removebrokers.process", {}); + + await processJob(job); + + expect(handler).toHaveBeenCalledWith({}); + }); + + it("dispatches voiceprint.batch to correct handler", async () => { + const handler = vi.fn().mockResolvedValue(undefined); + setHandlers({ "voiceprint.batch": handler }); + + const job = await queue.enqueue("voiceprint.batch", { userId: "u1" }); + + await processJob(job); + + expect(handler).toHaveBeenCalledWith({ userId: "u1" }); + }); + + it("marks job as completed on success", async () => { + setHandlers({ "darkwatch.scan": vi.fn().mockResolvedValue(undefined) }); + + const job = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }); + await processJob(job); + + const updated = await queue.getJob(job.id); + expect(updated?.status).toBe("completed"); + }); + + it("schedules retry on failure with exponential backoff", async () => { + const handler = vi.fn().mockRejectedValue(new Error("Scan failed")); + setHandlers({ "darkwatch.scan": handler }); + + const job = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }); + await processJob(job); + + // After first retry, job should be pending with attempt 1 + const updated = await queue.getJob(job.id); + expect(updated?.attempts).toBe(1); + expect(updated?.status).toBe("pending"); + + // Retry delay should be 60s for first retry + expect(handler).toHaveBeenCalledTimes(1); + }); + + it("marks as failed after exhausting retries", async () => { + const handler = vi.fn().mockRejectedValue(new Error("Final failure")); + setHandlers({ "darkwatch.scan": handler }); + + const job = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }, { maxAttempts: 2 }); + + // First attempt - retries (schedules retry) + await processJob(job); + expect(handler).toHaveBeenCalledTimes(1); + expect(job.attempts).toBe(1); + + // Advance past the retry delay so the retry is available + vi.advanceTimersByTime(60001); + + // Second attempt - dequeue and process + const retried = await queue.dequeue(); + expect(retried).not.toBeNull(); + if (retried) { + await processJob(retried); + } + + // After max attempts, job should be failed + const failed = await queue.getJob(job.id); + expect(failed?.status).toBe("failed"); + expect(failed?.error).toBe("Final failure"); + }); + }); + + describe("startWorker / stopWorker", () => { + it("polls queue and processes jobs", async () => { + const handler = vi.fn().mockResolvedValue(undefined); + setHandlers({ "darkwatch.scan": handler }); + + await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }); + + startWorker({ pollInterval: 50 }); + await vi.advanceTimersByTimeAsync(100); + + expect(handler).toHaveBeenCalled(); + + await stopWorker(); + }); + + it("stops polling after stopWorker", async () => { + const handler = vi.fn().mockResolvedValue(undefined); + setHandlers({ "darkwatch.scan": handler }); + + startWorker({ pollInterval: 50 }); + await stopWorker(); + + await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }); + await vi.advanceTimersByTimeAsync(200); + + expect(handler).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/web/src/server/jobs/worker.ts b/web/src/server/jobs/worker.ts new file mode 100644 index 0000000..9425d41 --- /dev/null +++ b/web/src/server/jobs/worker.ts @@ -0,0 +1,86 @@ +import { getQueue, type Job } from "./queue"; +import { getHandlers } from "./handlers"; + +const RETRY_DELAYS = [60_000, 300_000, 900_000]; // 1min, 5min, 15min + +export interface WorkerOptions { + pollInterval?: number; + concurrency?: number; +} + +let running = false; +let pollTimer: ReturnType | null = null; +let activeJobs = new Set(); + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export async function processJob(job: Job): Promise { + const handlers = getHandlers(); + const handler = handlers[job.type]; + if (!handler) { + throw new Error(`No handler registered for job type: ${job.type}`); + } + + const queue = getQueue(); + activeJobs.add(job.id); + try { + await handler(job.payload as any); + await queue.markComplete(job.id); + } catch (err) { + const errorMessage = err instanceof Error ? err.message : String(err); + console.error(`[worker] Job ${job.id} (${job.type}) failed:`, errorMessage); + + if (job.attempts < job.maxAttempts - 1) { + const delay = RETRY_DELAYS[job.attempts] ?? RETRY_DELAYS[RETRY_DELAYS.length - 1]; + console.log(`[worker] Scheduling retry ${job.attempts + 1}/${job.maxAttempts} for job ${job.id} in ${delay}ms`); + await queue.scheduleRetry(job, delay); + } else { + await queue.markFailed(job.id, errorMessage); + } + } finally { + activeJobs.delete(job.id); + } +} + +export async function startWorker(options: WorkerOptions = {}): Promise { + if (running) return; + running = true; + + const pollInterval = options.pollInterval ?? 1000; + const queue = getQueue(); + + const poll = async () => { + if (!running) return; + try { + const job = await queue.dequeue(); + if (job) { + processJob(job).catch((err) => { + console.error(`[worker] Unexpected error processing job ${job.id}:`, err); + }); + } + } catch (err) { + console.error("[worker] Poll error:", err); + } + }; + + poll(); + pollTimer = setInterval(poll, pollInterval); +} + +export async function stopWorker(): Promise { + running = false; + if (pollTimer) { + clearInterval(pollTimer); + pollTimer = null; + } + + while (activeJobs.size > 0) { + await sleep(100); + } +} + +export function isWorkerRunning(): boolean { + return running; +}