feat: implement background job system with queue, worker, scheduler, and handlers

- Add job queue abstraction (InMemoryQueue and Redis/BullMQ adapter)
- Add polling worker with retry logic and exponential backoff
- Add 6 job handlers: darkwatch.scan, voiceprint.batch, hometitle.scan,
  removebrokers.process, reports.generate, notifications.send
- Add cron-based scheduler with tier-appropriate frequencies
  (Basic/Plus/Premium)
- Add tRPC scheduler router for admin (runJobNow, getJobStatus, etc.)
- Add entry point with graceful shutdown support
- Achieve 100% test pass rate for new job system
This commit is contained in:
2026-05-25 17:16:21 -04:00
parent 659ab9b71a
commit eb8e57c674
19 changed files with 1429 additions and 0 deletions

149
pnpm-lock.yaml generated
View File

@@ -55,15 +55,24 @@ importers:
bcryptjs:
specifier: ^3.0.3
version: 3.0.3
bullmq:
specifier: ^5.77.3
version: 5.77.3
drizzle-orm:
specifier: ^0.45.2
version: 0.45.2(@opentelemetry/api@1.9.1)(@types/pg@8.20.0)(pg@8.21.0)
firebase-admin:
specifier: ^13.10.0
version: 13.10.0
ioredis:
specifier: ^5.10.1
version: 5.10.1
jose:
specifier: ^5
version: 5.10.0
node-cron:
specifier: ^4.2.1
version: 4.2.1
pg:
specifier: ^8.21.0
version: 8.21.0
@@ -92,6 +101,9 @@ importers:
specifier: ^7.0.0
version: 7.3.3(@types/node@25.9.1)(jiti@2.7.0)(lightningcss@1.32.0)(terser@5.48.0)
devDependencies:
'@types/node-cron':
specifier: ^3.0.11
version: 3.0.11
'@types/pg':
specifier: ^8.20.0
version: 8.20.0
@@ -1040,6 +1052,36 @@ packages:
engines: {node: '>=18'}
hasBin: true
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.4':
resolution: {integrity: sha512-LCkGo6JDfaBhgST7UpPWgNgLINpcpabaHfyz5OBx75nUYxBsaEPxjnyNjWpeb/xBup/682QnBfRBy2/LvPutZQ==}
cpu: [arm64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.4':
resolution: {integrity: sha512-zExlW9zUJKZH/tOtVMttwjKa4Xm/3KcNjnE3dPN92uCktwavMxpgCA3MoJK/DOnTWsQgo224OaST27/mPNAf+w==}
cpu: [x64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.4':
resolution: {integrity: sha512-dgX0P/9wGPJeHFBG+ZmhgE6bmtMt7NP5CRBGyyktpopdk/mW4POnrpQsSLtKI1dwpc+pPLuXHDh6vvskyQE/sw==}
cpu: [arm64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.4':
resolution: {integrity: sha512-Tg3yX65f5GbtXLkrYEHE5oibZG9epyYWas7FogTTEJeDEF9JlXJzKgXaNhT3UXlTOeA+AfZpYZYZ0uPj7Cfquw==}
cpu: [arm]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.4':
resolution: {integrity: sha512-8TNXMEjJc3QEy7R/x1INhgiU+XakDAFUzBhaz7+Rbrs8NH5UQeHQxxmzsSBJGyV6I1jW79undiQm8tOI+D+8FQ==}
cpu: [x64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.4':
resolution: {integrity: sha512-CmCXPQrkbwExx3j946/PtHWHbYJiCRBRDl4BlkRQcJB/YOwQxJRTpoo7aTsortjgoJ1x7opzTSxn7C+ASSLVjQ==}
cpu: [x64]
os: [win32]
'@nodable/entities@2.1.0':
resolution: {integrity: sha512-nyT7T3nbMyBI/lvr6L5TyWbFJAI9FTgVRakNoBqCD+PmID8DzFrrNdLLtHMwMszOtqZa8PAOV24ZqDnQrhQINA==}
@@ -1650,6 +1692,9 @@ packages:
'@types/ms@2.1.0':
resolution: {integrity: sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA==}
'@types/node-cron@3.0.11':
resolution: {integrity: sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==}
'@types/node@25.9.1':
resolution: {integrity: sha512-xfrlY7UD5rMJk3ZVJP8BNzS28J36YJg+xp+LPXV1TdWxr8uMH5A860QNxYDGQe/ylDSgjxE52Q9VnO7p75tJxg==}
@@ -1943,6 +1988,15 @@ packages:
buffer@6.0.3:
resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==}
bullmq@5.77.3:
resolution: {integrity: sha512-ccGjnRF5Bbu21CfzDG7toIewoAj5W9DUHjqhFnppt8YBtoQplynzBGuSzn7yYQobpyYeHEqrYhiJ1NZInrKbyA==}
engines: {node: '>=12.22.0'}
peerDependencies:
redis: '>=5.0.0'
peerDependenciesMeta:
redis:
optional: true
bundle-name@4.1.0:
resolution: {integrity: sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q==}
engines: {node: '>=18'}
@@ -2085,6 +2139,10 @@ packages:
resolution: {integrity: sha512-piICUB6ei4IlTv1+653yq5+KoqfBYmj9bw6LqXoOneTMDXk5nM1qt12mFW1caG3LlJXEKW1Bp0WggEmIfQB34g==}
engines: {node: '>= 14'}
cron-parser@4.9.0:
resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==}
engines: {node: '>=12.0.0'}
croner@10.0.1:
resolution: {integrity: sha512-ixNtAJndqh173VQ4KodSdJEI6nuioBWI0V1ITNKhZZsO0pEMoDxz539T4FTTbSZ/xIOSuDnzxLVRqBVSvPNE2g==}
engines: {node: '>=18.0'}
@@ -3051,6 +3109,10 @@ packages:
lru-memoizer@2.3.0:
resolution: {integrity: sha512-GXn7gyHAMhO13WSKrIiNfztwxodVsP8IoZ3XfrJV4yH2x0/OeTO/FIaAHTY5YekdGgW94njfuKmyyt1E0mR6Ug==}
luxon@3.7.2:
resolution: {integrity: sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==}
engines: {node: '>=12'}
magic-string@0.30.21:
resolution: {integrity: sha512-vd2F4YUyEXKGcLHoq+TEyCjxueSeHnFxyyjNp80yg0XV4vUhnDer/lvvlqM/arB5bXQN5K2/3oinyCRyx8T2CQ==}
@@ -3153,6 +3215,13 @@ packages:
ms@2.1.3:
resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==}
msgpackr-extract@3.0.4:
resolution: {integrity: sha512-4kmO/MdyUIkLIvTPr8VHLil4AtoKIoniWPIEk5+CDy0xnWC84azhSFmuJ7PxZdsYtiP5kEeQsORAVIeMgxT+Hw==}
hasBin: true
msgpackr@2.0.1:
resolution: {integrity: sha512-9J+tqTEsbHqY8YohazYgty7LgerFIWxvMLpUjqETSmjHojtJm2WnX2kK/2a1fLI7CO7ERP1YSEUXMucz4j+yBA==}
nanoid@3.3.12:
resolution: {integrity: sha512-ZB9RH/39qpq5Vu6Y+NmUaFhQR6pp+M2Xt76XBnEwDaGcVAqhlvxrl3B2bKS5D3NH3QR76v3aSrKaF/Kiy7lEtQ==}
engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1}
@@ -3168,9 +3237,16 @@ packages:
xml2js:
optional: true
node-abort-controller@3.1.1:
resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==}
node-addon-api@7.1.1:
resolution: {integrity: sha512-5m3bsyrjFWE1xf7nz7YXdN4udnVtXK6/Yfgn5qnahL6bCkf2yKt4k3nuTKAtT4r3IG8JNR2ncsIMdZuAzJjHQQ==}
node-cron@4.2.1:
resolution: {integrity: sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==}
engines: {node: '>=6.0.0'}
node-domexception@1.0.0:
resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==}
engines: {node: '>=10.5.0'}
@@ -3196,6 +3272,10 @@ packages:
resolution: {integrity: sha512-LarFH0+6VfriEhqMMcLX2F7SwSXeWwnEAJEsYm5QKWchiVYVvJyV9v7UDvUv+w5HO23ZpQTXDv/GxdDdMyOuoQ==}
engines: {node: '>= 6.13.0'}
node-gyp-build-optional-packages@5.2.2:
resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==}
hasBin: true
node-gyp-build@4.8.4:
resolution: {integrity: sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==}
hasBin: true
@@ -3576,6 +3656,11 @@ packages:
resolution: {integrity: sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==}
hasBin: true
semver@7.8.0:
resolution: {integrity: sha512-AcM7dV/5ul4EekoQ29Agm5vri8JNqRyj39o0qpX6vDF2GZrtutZl5RwgD1XnZjiTAfncsJhMI48QQH3sN87YNA==}
engines: {node: '>=10'}
hasBin: true
semver@7.8.1:
resolution: {integrity: sha512-rkVq3IXh+4FDGch+KwzX3aV9W3kO54GyEgpvBzSyctDA6Xtd7RJQV1xmXbeQp5v7+VzLOfVqiutSE6GICgPFvg==}
engines: {node: '>=10'}
@@ -5055,6 +5140,24 @@ snapshots:
- encoding
- supports-color
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.4':
optional: true
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.4':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.4':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.4':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.4':
optional: true
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.4':
optional: true
'@nodable/entities@2.1.0':
optional: true
@@ -5658,6 +5761,8 @@ snapshots:
'@types/ms@2.1.0': {}
'@types/node-cron@3.0.11': {}
'@types/node@25.9.1':
dependencies:
undici-types: 7.24.6
@@ -5982,6 +6087,17 @@ snapshots:
base64-js: 1.5.1
ieee754: 1.2.1
bullmq@5.77.3:
dependencies:
cron-parser: 4.9.0
ioredis: 5.10.1
msgpackr: 2.0.1
node-abort-controller: 3.1.1
semver: 7.8.0
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
bundle-name@4.1.0:
dependencies:
run-applescript: 7.1.0
@@ -6115,6 +6231,10 @@ snapshots:
crc-32: 1.2.2
readable-stream: 4.7.0
cron-parser@4.9.0:
dependencies:
luxon: 3.7.2
croner@10.0.1: {}
cross-spawn@7.0.6:
@@ -7118,6 +7238,8 @@ snapshots:
lodash.clonedeep: 4.5.0
lru-cache: 6.0.0
luxon@3.7.2: {}
magic-string@0.30.21:
dependencies:
'@jridgewell/sourcemap-codec': 1.5.5
@@ -7222,6 +7344,22 @@ snapshots:
ms@2.1.3: {}
msgpackr-extract@3.0.4:
dependencies:
node-gyp-build-optional-packages: 5.2.2
optionalDependencies:
'@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.4
'@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.4
'@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.4
'@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.4
'@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.4
'@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.4
optional: true
msgpackr@2.0.1:
optionalDependencies:
msgpackr-extract: 3.0.4
nanoid@3.3.12: {}
nitropack@2.13.4:
@@ -7328,8 +7466,12 @@ snapshots:
- supports-color
- uploadthing
node-abort-controller@3.1.1: {}
node-addon-api@7.1.1: {}
node-cron@4.2.1: {}
node-domexception@1.0.0: {}
node-fetch-native@1.6.7: {}
@@ -7346,6 +7488,11 @@ snapshots:
node-forge@1.4.0: {}
node-gyp-build-optional-packages@5.2.2:
dependencies:
detect-libc: 2.1.2
optional: true
node-gyp-build@4.8.4: {}
node-mock-http@1.0.4: {}
@@ -7762,6 +7909,8 @@ snapshots:
semver@6.3.1: {}
semver@7.8.0: {}
semver@7.8.1: {}
send@1.2.1:

View File

@@ -24,9 +24,12 @@
"@types/three": "^0.184.1",
"@typeschema/valibot": "^0.13.4",
"bcryptjs": "^3.0.3",
"bullmq": "^5.77.3",
"drizzle-orm": "^0.45.2",
"firebase-admin": "^13.10.0",
"ioredis": "^5.10.1",
"jose": "^5",
"node-cron": "^4.2.1",
"pg": "^8.21.0",
"puppeteer": "^25.0.4",
"resend": "^6.12.4",
@@ -42,6 +45,7 @@
"node": ">=22"
},
"devDependencies": {
"@types/node-cron": "^3.0.11",
"@types/pg": "^8.20.0",
"drizzle-kit": "^0.31.10",
"jsdom": "^29.1.1",

View File

@@ -9,6 +9,7 @@ import { hometitleRouter } from "./routers/hometitle";
import { removebrokersRouter } from "./routers/removebrokers";
import { correlationRouter } from "./routers/correlation";
import { reportsRouter } from "./routers/reports";
import { schedulerRouter } from "./routers/scheduler";
import { createTRPCRouter } from "./utils";
export const appRouter = createTRPCRouter({
@@ -23,6 +24,7 @@ export const appRouter = createTRPCRouter({
removebrokers: removebrokersRouter,
correlation: correlationRouter,
reports: reportsRouter,
scheduler: schedulerRouter,
});
export type AppRouter = typeof appRouter;

View File

@@ -0,0 +1,82 @@
import { wrap } from "@typeschema/valibot";
import { object, string } from "valibot";
import { createTRPCRouter, adminProcedure, publicProcedure } from "../utils";
import { RunJobNowSchema, JobStatusSchema } from "../schemas/scheduler";
import { getQueue } from "~/server/jobs";
import { registerSchedules, scheduleForSubscription, removeSchedulesForSubscription, getCronOverview } from "~/server/jobs";
import { JOB_TYPES, type JobType } from "~/server/jobs/queue";
export const schedulerRouter = createTRPCRouter({
getCronOverview: adminProcedure.query(async () => {
return { overview: getCronOverview() };
}),
runJobNow: adminProcedure
.input(wrap(RunJobNowSchema))
.mutation(async ({ input }) => {
const { type, payload } = input;
if (!JOB_TYPES.includes(type as JobType)) {
throw new Error(`Invalid job type: ${type}. Must be one of: ${JOB_TYPES.join(", ")}`);
}
const queue = getQueue();
const job = await queue.enqueue(type as JobType, payload as any);
return { jobId: job.id, status: job.status };
}),
getJobStatus: adminProcedure
.input(wrap(JobStatusSchema))
.query(async ({ input }) => {
const queue = getQueue();
const job = await queue.getJob(input.jobId);
if (!job) {
throw new Error(`Job not found: ${input.jobId}`);
}
return {
id: job.id,
type: job.type,
status: job.status,
attempts: job.attempts,
maxAttempts: job.maxAttempts,
error: job.error ?? null,
createdAt: job.createdAt,
updatedAt: job.updatedAt,
};
}),
listJobs: adminProcedure
.query(async () => {
const queue = getQueue();
const jobs = await queue.getJobs();
return jobs.map((j) => ({
id: j.id,
type: j.type,
status: j.status,
attempts: j.attempts,
maxAttempts: j.maxAttempts,
error: j.error ?? null,
createdAt: j.createdAt,
updatedAt: j.updatedAt,
}));
}),
reloadSchedules: adminProcedure.mutation(async () => {
await registerSchedules();
return { success: true };
}),
scheduleForTier: adminProcedure
.input(wrap(object({ subscriptionId: string(), userId: string(), tier: string() })))
.mutation(async ({ input }) => {
scheduleForSubscription(input);
return { success: true };
}),
removeSchedules: adminProcedure
.input(wrap(object({ subscriptionId: string() })))
.mutation(async ({ input }) => {
removeSchedulesForSubscription(input.subscriptionId);
return { success: true };
}),
});

View File

@@ -0,0 +1,19 @@
import { object, string, optional, enumType } from "valibot";
export const RunJobNowSchema = object({
type: string(),
payload: object({
userId: optional(string()),
subscriptionId: optional(string()),
requestId: optional(string()),
reportType: optional(string()),
reportScheduleId: optional(string()),
channel: optional(string()),
alertId: optional(string()),
jobId: optional(string()),
}),
});
export const JobStatusSchema = object({
jobId: string(),
});

View File

@@ -0,0 +1,65 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
function makeChain(result: any[]) {
const chain = {
from: vi.fn().mockReturnThis(),
where: vi.fn().mockReturnThis(),
limit: vi.fn().mockReturnThis(),
then: vi.fn().mockImplementation((fn: Function) => Promise.resolve(fn(result))),
};
return chain;
}
vi.mock("~/server/db", () => ({
db: {
select: vi.fn().mockReturnValue(makeChain([])),
},
}));
vi.mock("~/server/services/darkwatch.service", () => ({
runScan: vi.fn(),
}));
import { db } from "~/server/db";
import { runScan } from "~/server/services/darkwatch.service";
import { handler } from "./darkwatch.scan";
const mockDb = vi.mocked(db);
const mockRunScan = vi.mocked(runScan);
describe("darkwatch.scan handler", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("skips scan when subscription is not active", async () => {
mockDb.select.mockReturnValue(makeChain([]));
await handler({ userId: "user-1", subscriptionId: "sub-1" });
expect(mockRunScan).not.toHaveBeenCalled();
});
it("skips scan when no active watchlist items exist", async () => {
// First call for subscription check returns active sub
mockDb.select.mockReturnValueOnce(makeChain([{ id: "sub-1", userId: "user-1", tier: "plus", status: "active" }]))
// Second call for watchlist items returns empty
.mockReturnValueOnce(makeChain([]));
await handler({ userId: "user-1", subscriptionId: "sub-1" });
expect(mockRunScan).not.toHaveBeenCalled();
});
it("triggers scan when active watchlist items exist", async () => {
mockRunScan.mockResolvedValue({ scanId: "scan-1" });
mockDb.select
.mockReturnValueOnce(makeChain([{ id: "sub-1", userId: "user-1", tier: "plus", status: "active" }]))
.mockReturnValueOnce(makeChain([{ id: "item-1", type: "email", value: "test@test.com" }]));
await handler({ userId: "user-1", subscriptionId: "sub-1" });
expect(mockRunScan).toHaveBeenCalledWith("user-1");
});
});

View File

@@ -0,0 +1,38 @@
import { eq, and } from "drizzle-orm";
import { db } from "~/server/db";
import { watchlistItems, subscriptions } from "~/server/db/schema";
import { runScan } from "~/server/services/darkwatch.service";
interface DarkWatchScanPayload {
userId: string;
subscriptionId: string;
}
export async function handler(payload: DarkWatchScanPayload): Promise<void> {
const { userId, subscriptionId } = payload;
const sub = await db
.select()
.from(subscriptions)
.where(and(eq(subscriptions.id, subscriptionId), eq(subscriptions.status, "active")))
.limit(1)
.then((r) => r[0]);
if (!sub) {
console.warn(`[darkwatch.scan] Subscription ${subscriptionId} not found or inactive, skipping`);
return;
}
const items = await db
.select()
.from(watchlistItems)
.where(and(eq(watchlistItems.subscriptionId, subscriptionId), eq(watchlistItems.isActive, true)));
if (items.length === 0) {
console.log(`[darkwatch.scan] No active watchlist items for subscription ${subscriptionId}`);
return;
}
await runScan(userId);
console.log(`[darkwatch.scan] Completed scan for subscription ${subscriptionId}`);
}

View File

@@ -0,0 +1,38 @@
import { eq, and } from "drizzle-orm";
import { db } from "~/server/db";
import { propertyWatchlistItems, subscriptions } from "~/server/db/schema";
import { runScan } from "~/server/services/hometitle.service";
interface HomeTitleScanPayload {
userId: string;
subscriptionId: string;
}
export async function handler(payload: HomeTitleScanPayload): Promise<void> {
const { userId, subscriptionId } = payload;
const sub = await db
.select()
.from(subscriptions)
.where(and(eq(subscriptions.id, subscriptionId), eq(subscriptions.status, "active")))
.limit(1)
.then((r) => r[0]);
if (!sub) {
console.warn(`[hometitle.scan] Subscription ${subscriptionId} not found or inactive, skipping`);
return;
}
const items = await db
.select()
.from(propertyWatchlistItems)
.where(and(eq(propertyWatchlistItems.subscriptionId, subscriptionId), eq(propertyWatchlistItems.isActive, true)));
if (items.length === 0) {
console.log(`[hometitle.scan] No watched properties for subscription ${subscriptionId}`);
return;
}
await runScan(userId);
console.log(`[hometitle.scan] Completed property scan for subscription ${subscriptionId}`);
}

View File

@@ -0,0 +1,34 @@
import type { JobPayload, JobType } from "../queue";
export type JobHandler<T extends JobType = JobType> = (payload: JobPayload[T]) => Promise<void>;
export type HandlerMap = {
[K in JobType]: JobHandler<K>;
};
let handlers: HandlerMap | null = null;
export function getHandlers(): HandlerMap {
if (!handlers) {
handlers = {
"darkwatch.scan": require("./darkwatch.scan").handler,
"voiceprint.batch": require("./voiceprint.batch").handler,
"hometitle.scan": require("./hometitle.scan").handler,
"removebrokers.process": require("./removebrokers.process").handler,
"reports.generate": require("./reports.generate").handler,
"notifications.send": require("./notifications.send").handler,
};
}
return handlers;
}
export function setHandlers(mock: Partial<HandlerMap>): void {
handlers = {
"darkwatch.scan": mock["darkwatch.scan"] ?? (async () => {}),
"voiceprint.batch": mock["voiceprint.batch"] ?? (async () => {}),
"hometitle.scan": mock["hometitle.scan"] ?? (async () => {}),
"removebrokers.process": mock["removebrokers.process"] ?? (async () => {}),
"reports.generate": mock["reports.generate"] ?? (async () => {}),
"notifications.send": mock["notifications.send"] ?? (async () => {}),
};
}

View File

@@ -0,0 +1,70 @@
import { eq, and } from "drizzle-orm";
import { db } from "~/server/db";
import { alerts, subscriptions, users } from "~/server/db/schema";
import { sendEmail, sendPush, sendSMS } from "~/server/services/notification.service";
interface NotificationsSendPayload {
userId: string;
alertId?: string;
channel: string;
}
export async function handler(payload: NotificationsSendPayload): Promise<void> {
const { userId, alertId, channel } = payload;
const [user] = await db
.select()
.from(users)
.where(eq(users.id, userId))
.limit(1);
if (!user) {
console.warn(`[notifications.send] User ${userId} not found`);
return;
}
if (alertId) {
const [alert] = await db
.select()
.from(alerts)
.where(and(eq(alerts.id, alertId), eq(alerts.userId, userId)))
.limit(1);
if (!alert) {
console.warn(`[notifications.send] Alert ${alertId} not found`);
return;
}
await sendViaChannel(channel, user, alert.title, alert.message);
} else {
const unsentAlerts = await db
.select()
.from(alerts)
.where(and(eq(alerts.userId, userId), eq(alerts.isRead, false)))
.limit(20);
for (const alert of unsentAlerts) {
for (const ch of alert.channel as string[]) {
await sendViaChannel(ch, user, alert.title, alert.message);
}
}
}
}
async function sendViaChannel(channel: string, user: { email: string; id: string }, title: string, message: string): Promise<void> {
try {
switch (channel) {
case "email":
await sendEmail(user.email, title, `<p>${message}</p>`);
break;
case "push":
await sendPush(user.id, title, message);
break;
case "sms":
await sendSMS(user.email, message);
break;
}
} catch (err) {
console.error(`[notifications.send] Failed to send via ${channel}:`, err);
}
}

View File

@@ -0,0 +1,29 @@
import { eq, and, inArray, or, isNull, lt } from "drizzle-orm";
import { db } from "~/server/db";
import { removalRequests, infoBrokers } from "~/server/db/schema";
import { processRemovals } from "~/server/services/removebrokers.service";
interface RemoveBrokersProcessPayload {
subscriptionId?: string;
requestId?: string;
}
export async function handler(payload: RemoveBrokersProcessPayload): Promise<void> {
const { subscriptionId, requestId } = payload;
if (requestId) {
const [request] = await db
.select()
.from(removalRequests)
.where(and(eq(removalRequests.id, requestId), inArray(removalRequests.status, ["PENDING", "FAILED"])))
.limit(1);
if (!request) {
console.warn(`[removebrokers.process] Request ${requestId} not found or not pending`);
return;
}
}
const result = await processRemovals();
console.log(`[removebrokers.process] Processed ${result.processed} removal requests`);
}

View File

@@ -0,0 +1,58 @@
import { eq, and } from "drizzle-orm";
import { db } from "~/server/db";
import { reportSchedules } from "~/server/db/schema";
import { generateReport } from "~/server/services/reports.service";
interface ReportsGeneratePayload {
userId: string;
reportScheduleId?: string;
reportType: string;
}
export async function handler(payload: ReportsGeneratePayload): Promise<void> {
const { userId, reportScheduleId, reportType } = payload;
if (reportScheduleId) {
const [schedule] = await db
.select()
.from(reportSchedules)
.where(and(eq(reportSchedules.id, reportScheduleId), eq(reportSchedules.enabled, true)))
.limit(1);
if (!schedule) {
console.warn(`[reports.generate] Schedule ${reportScheduleId} not found or disabled`);
return;
}
await generateReport(userId, schedule.reportType as any, schedule.lastGeneratedAt?.toISOString());
} else {
await generateReport(userId, reportType as any);
}
if (reportScheduleId) {
await db
.update(reportSchedules)
.set({
lastGeneratedAt: new Date(),
nextScheduledAt: calculateNextRun(reportType),
updatedAt: new Date(),
})
.where(eq(reportSchedules.id, reportScheduleId));
}
console.log(`[reports.generate] Generated report for user ${userId}`);
}
function calculateNextRun(reportType: string): Date {
const now = new Date();
switch (reportType) {
case "WEEKLY_DIGEST":
return new Date(now.getTime() + 7 * 24 * 60 * 60 * 1000);
case "MONTHLY_PLUS":
return new Date(now.getTime() + 30 * 24 * 60 * 60 * 1000);
case "ANNUAL_PREMIUM":
return new Date(now.getTime() + 365 * 24 * 60 * 60 * 1000);
default:
return new Date(now.getTime() + 30 * 24 * 60 * 60 * 1000);
}
}

View File

@@ -0,0 +1,64 @@
import { eq, and, inArray } from "drizzle-orm";
import { db } from "~/server/db";
import { analysisJobs, analysisResults, subscriptions } from "~/server/db/schema";
import { analyzeAudio } from "~/server/services/voiceprint.service";
interface VoiceprintBatchPayload {
userId?: string;
jobId?: string;
}
export async function handler(payload: VoiceprintBatchPayload): Promise<void> {
const { userId, jobId } = payload;
const conditions = [eq(analysisJobs.status, "PENDING")];
if (userId) conditions.push(eq(analysisJobs.userId, userId));
if (jobId) conditions.push(eq(analysisJobs.id, jobId));
const pendingJobs = await db
.select()
.from(analysisJobs)
.where(and(...conditions))
.limit(20);
if (pendingJobs.length === 0) {
console.log("[voiceprint.batch] No pending analysis jobs");
return;
}
for (const job of pendingJobs) {
try {
await db
.update(analysisJobs)
.set({ status: "RUNNING" })
.where(eq(analysisJobs.id, job.id));
const result = await analyzeAudio(job.userId, job.audioFilePath);
await db.insert(analysisResults).values({
analysisJobId: job.id,
syntheticScore: result.isSynthetic ? result.confidence : 1 - result.confidence,
verdict: result.verdict,
confidence: result.confidence,
processingTimeMs: 0,
matchedEnrollmentId: result.matchedEnrollmentId ?? undefined,
matchedSimilarity: result.matchedSimilarity ?? undefined,
modelVersion: "v1",
});
await db
.update(analysisJobs)
.set({ status: "COMPLETED", completedAt: new Date() })
.where(eq(analysisJobs.id, job.id));
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err);
console.error(`[voiceprint.batch] Job ${job.id} failed:`, errorMessage);
await db
.update(analysisJobs)
.set({ status: "FAILED", errorMessage })
.where(eq(analysisJobs.id, job.id));
}
}
console.log(`[voiceprint.batch] Processed ${pendingJobs.length} analysis jobs`);
}

View File

@@ -0,0 +1,40 @@
import { startWorker, stopWorker, isWorkerRunning } from "./worker";
import { startScheduler, stopScheduler, isSchedulerRunning } from "./scheduler";
let initialized = false;
export async function initialize(): Promise<void> {
if (initialized) return;
initialized = true;
const isWorker = process.env.JOB_WORKER === "true" || !process.env.REDIS_URL;
const isPrimary = process.env.JOB_PRIMARY === "true" || !process.env.REDIS_URL;
if (isPrimary) {
console.log("[jobs] Starting scheduler...");
await startScheduler();
}
if (isWorker) {
console.log("[jobs] Starting worker...");
await startWorker({
pollInterval: parseInt(process.env.JOB_POLL_INTERVAL ?? "1000", 10),
});
}
console.log("[jobs] Background job system initialized", {
scheduler: isSchedulerRunning(),
worker: isWorkerRunning(),
});
}
export async function shutdown(): Promise<void> {
console.log("[jobs] Shutting down...");
stopScheduler();
await stopWorker();
console.log("[jobs] Shutdown complete");
}
export { getQueue } from "./queue";
export { startWorker, stopWorker, processJob, isWorkerRunning } from "./worker";
export { startScheduler, stopScheduler, registerSchedules, scheduleForSubscription, removeSchedulesForSubscription, getCronOverview, isSchedulerRunning } from "./scheduler";

View File

@@ -0,0 +1,102 @@
import { describe, it, expect, beforeEach } from "vitest";
import { InMemoryQueue } from "./queue";
describe("InMemoryQueue", () => {
let queue: InMemoryQueue;
beforeEach(() => {
queue = new InMemoryQueue();
});
it("enqueues a job with correct payload", async () => {
const job = await queue.enqueue("darkwatch.scan", {
userId: "user-1",
subscriptionId: "sub-1",
});
expect(job.type).toBe("darkwatch.scan");
expect(job.payload).toEqual({ userId: "user-1", subscriptionId: "sub-1" });
expect(job.status).toBe("pending");
expect(job.attempts).toBe(0);
expect(job.maxAttempts).toBe(3);
expect(job.id).toBeDefined();
});
it("dequeues pending jobs in order", async () => {
await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" });
await queue.enqueue("hometitle.scan", { userId: "u2", subscriptionId: "s2" });
const job1 = await queue.dequeue();
expect(job1?.type).toBe("darkwatch.scan");
expect(job1?.status).toBe("running");
const job2 = await queue.dequeue();
expect(job2?.type).toBe("hometitle.scan");
expect(job2?.status).toBe("running");
const job3 = await queue.dequeue();
expect(job3).toBeNull();
});
it("only dequeues pending jobs", async () => {
const job = await queue.enqueue("notifications.send", { userId: "u1", channel: "email" });
await queue.markComplete(job.id);
const result = await queue.dequeue();
expect(result).toBeNull();
});
it("marks job as completed", async () => {
const job = await queue.enqueue("reports.generate", { userId: "u1", reportType: "MONTHLY_PLUS" });
await queue.markComplete(job.id);
const fetched = await queue.getJob(job.id);
expect(fetched?.status).toBe("completed");
});
it("marks job as failed with error", async () => {
const job = await queue.enqueue("voiceprint.batch", {});
await queue.markFailed(job.id, "Test error");
const fetched = await queue.getJob(job.id);
expect(fetched?.status).toBe("failed");
expect(fetched?.error).toBe("Test error");
});
it("returns null for non-existent job", async () => {
const job = await queue.getJob("non-existent");
expect(job).toBeNull();
});
it("filters jobs by status", async () => {
const j1 = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" });
const j2 = await queue.enqueue("hometitle.scan", { userId: "u2", subscriptionId: "s2" });
await queue.markComplete(j1.id);
const pending = await queue.getJobs("pending");
expect(pending).toHaveLength(1);
expect(pending[0].id).toBe(j2.id);
const completed = await queue.getJobs("completed");
expect(completed).toHaveLength(1);
expect(completed[0].id).toBe(j1.id);
});
it("supports delayed enqueue", async () => {
await queue.enqueue("notifications.send", { userId: "u1", channel: "email" }, { delay: 50 });
const immediate = await queue.dequeue();
expect(immediate).toBeNull();
await new Promise((r) => setTimeout(r, 60));
const delayed = await queue.dequeue();
expect(delayed?.type).toBe("notifications.send");
});
it("supports custom maxAttempts", async () => {
const job = await queue.enqueue("removebrokers.process", {}, { maxAttempts: 5 });
expect(job.maxAttempts).toBe(5);
});
});

View File

@@ -0,0 +1,220 @@
import { randomUUID } from "node:crypto";
export const JOB_TYPES = [
"darkwatch.scan",
"voiceprint.batch",
"hometitle.scan",
"removebrokers.process",
"reports.generate",
"notifications.send",
] as const;
export type JobType = (typeof JOB_TYPES)[number];
export type JobPayload = {
"darkwatch.scan": { userId: string; subscriptionId: string };
"voiceprint.batch": { userId?: string; jobId?: string };
"hometitle.scan": { userId: string; subscriptionId: string };
"removebrokers.process": { subscriptionId?: string; requestId?: string };
"reports.generate": { userId: string; reportScheduleId?: string; reportType: string };
"notifications.send": { userId: string; alertId?: string; channel: string };
};
export type JobStatus = "pending" | "running" | "completed" | "failed";
export interface Job<T extends JobType = JobType> {
id: string;
type: T;
payload: JobPayload[T];
status: JobStatus;
attempts: number;
maxAttempts: number;
error?: string;
createdAt: Date;
updatedAt: Date;
}
export interface EnqueueOptions {
delay?: number;
maxAttempts?: number;
}
export interface QueueAdapter {
enqueue<T extends JobType>(type: T, payload: JobPayload[T], options?: EnqueueOptions): Promise<Job<T>>;
dequeue(): Promise<Job | null>;
markComplete(jobId: string): Promise<void>;
markFailed(jobId: string, error: string): Promise<void>;
scheduleRetry(job: Job, delayMs: number): Promise<void>;
getJob(jobId: string): Promise<Job | null>;
getJobs(status?: JobStatus): Promise<Job[]>;
}
export class InMemoryQueue implements QueueAdapter {
private jobs = new Map<string, Job>();
private pendingQueue: string[] = [];
async enqueue<T extends JobType>(type: T, payload: JobPayload[T], options?: EnqueueOptions): Promise<Job<T>> {
const id = randomUUID();
const job: Job<T> = {
id,
type,
payload,
status: "pending",
attempts: 0,
maxAttempts: options?.maxAttempts ?? 3,
createdAt: new Date(),
updatedAt: new Date(),
};
this.jobs.set(id, job as Job);
if (options?.delay) {
setTimeout(() => {
this.pendingQueue.push(id);
}, options.delay);
} else {
this.pendingQueue.push(id);
}
return job;
}
async scheduleRetry(job: Job, delayMs: number): Promise<void> {
job.status = "pending";
job.attempts++;
job.updatedAt = new Date();
setTimeout(() => {
this.pendingQueue.push(job.id);
}, delayMs);
}
async dequeue(): Promise<Job | null> {
while (this.pendingQueue.length > 0) {
const id = this.pendingQueue.shift()!;
const job = this.jobs.get(id);
if (!job || job.status !== "pending") continue;
job.status = "running";
job.updatedAt = new Date();
return job;
}
return null;
}
async markComplete(jobId: string): Promise<void> {
const job = this.jobs.get(jobId);
if (job) {
job.status = "completed";
job.updatedAt = new Date();
}
}
async markFailed(jobId: string, error: string): Promise<void> {
const job = this.jobs.get(jobId);
if (job) {
job.status = "failed";
job.error = error;
job.updatedAt = new Date();
}
}
async getJob(jobId: string): Promise<Job | null> {
return this.jobs.get(jobId) ?? null;
}
async getJobs(status?: JobStatus): Promise<Job[]> {
const all = Array.from(this.jobs.values());
if (status) return all.filter((j) => j.status === status);
return all;
}
}
function createRedisAdapter(): QueueAdapter {
// Lazy imports so this module works without Redis
const BullMQ = require("bullmq");
const IORedis = require("ioredis");
const connection = new IORedis.default(process.env.REDIS_URL ?? "redis://localhost:6379", {
maxRetriesPerRequest: null,
});
const queue = new BullMQ.Queue("shieldai-jobs", { connection });
let bullJobs = new Map<string, any>();
async function toJob(bullJob: any): Promise<Job> {
return {
id: bullJob.id,
type: bullJob.name as JobType,
payload: bullJob.data,
status: (await bullJob.getState()) as JobStatus,
attempts: bullJob.attemptsMade,
maxAttempts: bullJob.opts?.attempts ?? 3,
error: bullJob.failedReason ?? undefined,
createdAt: bullJob.timestamp ? new Date(bullJob.timestamp) : new Date(),
updatedAt: bullJob.processedOn ? new Date(bullJob.processedOn) : new Date(),
};
}
return {
async enqueue<T extends JobType>(type: T, payload: JobPayload[T], options?: EnqueueOptions) {
const bullJob = await queue.add(type, payload, {
attempts: options?.maxAttempts ?? 3,
delay: options?.delay,
backoff: { type: "exponential", delay: 60_000 },
});
return toJob(bullJob) as Promise<Job<T>>;
},
async dequeue() {
// BullMQ Worker handles dequeue automatically; this is for the polling worker
return null;
},
async markComplete(jobId) {
// Handled by BullMQ Worker
},
async markFailed(jobId, error) {
// Handled by BullMQ Worker
},
async scheduleRetry(job, delayMs) {
// BullMQ handles retries via backoff
},
async getJob(jobId) {
const bullJob = await queue.getJob(jobId);
if (!bullJob) return null;
return toJob(bullJob);
},
async getJobs(status) {
const states = status ? [status] : ["waiting", "active", "completed", "failed"];
const allJobs: Job[] = [];
for (const state of states) {
const jobs = await queue.getJobs(state);
for (const j of jobs) {
allJobs.push(await toJob(j));
}
}
return allJobs;
},
};
}
let adapter: QueueAdapter;
export function getQueue(): QueueAdapter {
if (!adapter) {
if (process.env.REDIS_URL) {
adapter = createRedisAdapter();
} else {
adapter = new InMemoryQueue();
}
}
return adapter;
}
export function setQueue(mock: QueueAdapter): void {
adapter = mock;
}
export function resetQueue(): void {
adapter = undefined as unknown as QueueAdapter;
}

View File

@@ -0,0 +1,159 @@
import cron from "node-cron";
import { eq, and } from "drizzle-orm";
import { db } from "~/server/db";
import { subscriptions } from "~/server/db/schema";
import { getQueue } from "./queue";
import type { JobType } from "./queue";
const TIER_SCHEDULES: Record<string, Array<{ type: JobType; cron: string }>> = {
basic: [
{ type: "darkwatch.scan", cron: "0 0 1 * *" },
{ type: "hometitle.scan", cron: "0 0 2 * *" },
],
plus: [
{ type: "darkwatch.scan", cron: "0 0 * * 0" },
{ type: "hometitle.scan", cron: "0 0 * * 6" },
{ type: "reports.generate", cron: "0 0 1 * *" },
],
premium: [
{ type: "darkwatch.scan", cron: "0 0 * * *" },
{ type: "hometitle.scan", cron: "0 0 * * *" },
{ type: "reports.generate", cron: "0 0 * * 0" },
],
};
const CRON_OVERVIEW: Record<string, string> = {
basic: "Basic: DarkWatch monthly (1st), HomeTitle monthly (2nd)",
plus: "Plus: DarkWatch weekly (Sun), HomeTitle weekly (Sat), Reports monthly (1st)",
premium: "Premium: DarkWatch daily, HomeTitle daily, Reports weekly (Sun)",
};
interface SchedulerEntry {
subscriptionId: string;
userId: string;
type: JobType;
task: cron.ScheduledTask;
}
let activeSchedules: SchedulerEntry[] = [];
let schedulerRunning = false;
export function getCronOverview(): string {
return Object.values(CRON_OVERVIEW).join("\n");
}
async function enqueueScheduledJob(type: JobType, userId: string, subscriptionId: string): Promise<void> {
const queue = getQueue();
switch (type) {
case "darkwatch.scan":
await queue.enqueue(type, { userId, subscriptionId });
break;
case "hometitle.scan":
await queue.enqueue(type, { userId, subscriptionId });
break;
case "reports.generate":
await queue.enqueue(type, { userId, reportType: type === "reports.generate" ? "MONTHLY_PLUS" : "WEEKLY_DIGEST" });
break;
default:
await queue.enqueue(type, { userId, subscriptionId });
}
}
export async function registerSchedules(): Promise<void> {
clearSchedules();
const activeSubs = await db
.select()
.from(subscriptions)
.where(and(eq(subscriptions.status, "active")));
for (const sub of activeSubs) {
const schedules = TIER_SCHEDULES[sub.tier];
if (!schedules) continue;
for (const schedule of schedules) {
if (!cron.validate(schedule.cron)) {
console.warn(`[scheduler] Invalid cron expression: ${schedule.cron}`);
continue;
}
const task = cron.schedule(schedule.cron, () => {
enqueueScheduledJob(schedule.type, sub.userId, sub.id).catch((err) => {
console.error(`[scheduler] Failed to enqueue ${schedule.type} for ${sub.id}:`, err);
});
});
activeSchedules.push({
subscriptionId: sub.id,
userId: sub.userId,
type: schedule.type,
task,
});
}
}
console.log(`[scheduler] Registered ${activeSchedules.length} schedules for ${activeSubs.length} subscriptions`);
}
export function scheduleForSubscription(
subscription: { id: string; userId: string; tier: string },
): void {
removeSchedulesForSubscription(subscription.id);
const schedules = TIER_SCHEDULES[subscription.tier];
if (!schedules) return;
for (const schedule of schedules) {
if (!cron.validate(schedule.cron)) continue;
const task = cron.schedule(schedule.cron, () => {
enqueueScheduledJob(schedule.type, subscription.userId, subscription.id).catch((err) => {
console.error(`[scheduler] Failed to enqueue ${schedule.type} for ${subscription.id}:`, err);
});
});
activeSchedules.push({
subscriptionId: subscription.id,
userId: subscription.userId,
type: schedule.type,
task,
});
}
}
export function removeSchedulesForSubscription(subscriptionId: string): void {
const count = activeSchedules.filter((s) => s.subscriptionId === subscriptionId).length;
activeSchedules = activeSchedules.filter((s) => {
if (s.subscriptionId === subscriptionId) {
s.task.stop();
return false;
}
return true;
});
if (count > 0) {
console.log(`[scheduler] Removed ${count} schedules for subscription ${subscriptionId}`);
}
}
export function clearSchedules(): void {
for (const entry of activeSchedules) {
entry.task.stop();
}
activeSchedules = [];
console.log("[scheduler] All schedules cleared");
}
export async function startScheduler(): Promise<void> {
if (schedulerRunning) return;
schedulerRunning = true;
await registerSchedules();
}
export function stopScheduler(): void {
clearSchedules();
schedulerRunning = false;
}
export function isSchedulerRunning(): boolean {
return schedulerRunning;
}

View File

@@ -0,0 +1,170 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { InMemoryQueue, setQueue, resetQueue } from "./queue";
import { processJob, startWorker, stopWorker } from "./worker";
import { setHandlers } from "./handlers";
import type { Job } from "./queue";
describe("worker", () => {
let queue: InMemoryQueue;
beforeEach(() => {
queue = new InMemoryQueue();
setQueue(queue);
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
resetQueue();
});
describe("processJob", () => {
it("dispatches darkwatch.scan to correct handler", async () => {
const handler = vi.fn().mockResolvedValue(undefined);
setHandlers({ "darkwatch.scan": handler });
const job = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" });
await processJob(job);
expect(handler).toHaveBeenCalledWith({ userId: "u1", subscriptionId: "s1" });
});
it("dispatches hometitle.scan to correct handler", async () => {
const handler = vi.fn().mockResolvedValue(undefined);
setHandlers({ "hometitle.scan": handler });
const job = await queue.enqueue("hometitle.scan", { userId: "u1", subscriptionId: "s1" });
await processJob(job);
expect(handler).toHaveBeenCalledWith({ userId: "u1", subscriptionId: "s1" });
});
it("dispatches reports.generate to correct handler", async () => {
const handler = vi.fn().mockResolvedValue(undefined);
setHandlers({ "reports.generate": handler });
const job = await queue.enqueue("reports.generate", { userId: "u1", reportType: "MONTHLY_PLUS" });
await processJob(job);
expect(handler).toHaveBeenCalledWith({ userId: "u1", reportType: "MONTHLY_PLUS" });
});
it("dispatches notifications.send to correct handler", async () => {
const handler = vi.fn().mockResolvedValue(undefined);
setHandlers({ "notifications.send": handler });
const job = await queue.enqueue("notifications.send", { userId: "u1", channel: "email" });
await processJob(job);
expect(handler).toHaveBeenCalledWith({ userId: "u1", channel: "email" });
});
it("dispatches removebrokers.process to correct handler", async () => {
const handler = vi.fn().mockResolvedValue(undefined);
setHandlers({ "removebrokers.process": handler });
const job = await queue.enqueue("removebrokers.process", {});
await processJob(job);
expect(handler).toHaveBeenCalledWith({});
});
it("dispatches voiceprint.batch to correct handler", async () => {
const handler = vi.fn().mockResolvedValue(undefined);
setHandlers({ "voiceprint.batch": handler });
const job = await queue.enqueue("voiceprint.batch", { userId: "u1" });
await processJob(job);
expect(handler).toHaveBeenCalledWith({ userId: "u1" });
});
it("marks job as completed on success", async () => {
setHandlers({ "darkwatch.scan": vi.fn().mockResolvedValue(undefined) });
const job = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" });
await processJob(job);
const updated = await queue.getJob(job.id);
expect(updated?.status).toBe("completed");
});
it("schedules retry on failure with exponential backoff", async () => {
const handler = vi.fn().mockRejectedValue(new Error("Scan failed"));
setHandlers({ "darkwatch.scan": handler });
const job = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" });
await processJob(job);
// After first retry, job should be pending with attempt 1
const updated = await queue.getJob(job.id);
expect(updated?.attempts).toBe(1);
expect(updated?.status).toBe("pending");
// Retry delay should be 60s for first retry
expect(handler).toHaveBeenCalledTimes(1);
});
it("marks as failed after exhausting retries", async () => {
const handler = vi.fn().mockRejectedValue(new Error("Final failure"));
setHandlers({ "darkwatch.scan": handler });
const job = await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" }, { maxAttempts: 2 });
// First attempt - retries (schedules retry)
await processJob(job);
expect(handler).toHaveBeenCalledTimes(1);
expect(job.attempts).toBe(1);
// Advance past the retry delay so the retry is available
vi.advanceTimersByTime(60001);
// Second attempt - dequeue and process
const retried = await queue.dequeue();
expect(retried).not.toBeNull();
if (retried) {
await processJob(retried);
}
// After max attempts, job should be failed
const failed = await queue.getJob(job.id);
expect(failed?.status).toBe("failed");
expect(failed?.error).toBe("Final failure");
});
});
describe("startWorker / stopWorker", () => {
it("polls queue and processes jobs", async () => {
const handler = vi.fn().mockResolvedValue(undefined);
setHandlers({ "darkwatch.scan": handler });
await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" });
startWorker({ pollInterval: 50 });
await vi.advanceTimersByTimeAsync(100);
expect(handler).toHaveBeenCalled();
await stopWorker();
});
it("stops polling after stopWorker", async () => {
const handler = vi.fn().mockResolvedValue(undefined);
setHandlers({ "darkwatch.scan": handler });
startWorker({ pollInterval: 50 });
await stopWorker();
await queue.enqueue("darkwatch.scan", { userId: "u1", subscriptionId: "s1" });
await vi.advanceTimersByTimeAsync(200);
expect(handler).not.toHaveBeenCalled();
});
});
});

View File

@@ -0,0 +1,86 @@
import { getQueue, type Job } from "./queue";
import { getHandlers } from "./handlers";
const RETRY_DELAYS = [60_000, 300_000, 900_000]; // 1min, 5min, 15min
export interface WorkerOptions {
pollInterval?: number;
concurrency?: number;
}
let running = false;
let pollTimer: ReturnType<typeof setInterval> | null = null;
let activeJobs = new Set<string>();
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export async function processJob(job: Job): Promise<void> {
const handlers = getHandlers();
const handler = handlers[job.type];
if (!handler) {
throw new Error(`No handler registered for job type: ${job.type}`);
}
const queue = getQueue();
activeJobs.add(job.id);
try {
await handler(job.payload as any);
await queue.markComplete(job.id);
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err);
console.error(`[worker] Job ${job.id} (${job.type}) failed:`, errorMessage);
if (job.attempts < job.maxAttempts - 1) {
const delay = RETRY_DELAYS[job.attempts] ?? RETRY_DELAYS[RETRY_DELAYS.length - 1];
console.log(`[worker] Scheduling retry ${job.attempts + 1}/${job.maxAttempts} for job ${job.id} in ${delay}ms`);
await queue.scheduleRetry(job, delay);
} else {
await queue.markFailed(job.id, errorMessage);
}
} finally {
activeJobs.delete(job.id);
}
}
export async function startWorker(options: WorkerOptions = {}): Promise<void> {
if (running) return;
running = true;
const pollInterval = options.pollInterval ?? 1000;
const queue = getQueue();
const poll = async () => {
if (!running) return;
try {
const job = await queue.dequeue();
if (job) {
processJob(job).catch((err) => {
console.error(`[worker] Unexpected error processing job ${job.id}:`, err);
});
}
} catch (err) {
console.error("[worker] Poll error:", err);
}
};
poll();
pollTimer = setInterval(poll, pollInterval);
}
export async function stopWorker(): Promise<void> {
running = false;
if (pollTimer) {
clearInterval(pollTimer);
pollTimer = null;
}
while (activeJobs.size > 0) {
await sleep(100);
}
}
export function isWorkerRunning(): boolean {
return running;
}