5.7 KiB
5.7 KiB
22. Background Jobs — Scheduler, Scan Workers, and Reminders
meta: id: kordant-unified-restructure-22 feature: kordant-unified-restructure priority: P1 depends_on: [kordant-unified-restructure-20] tags: [backend, jobs, scheduler, workers, background]
objective:
- Build the background job system for the unified monolith. Port logic from
packages/jobs/and all service schedulers into a unified job queue and worker system that handles recurring scans, report generation, and reminder notifications.
deliverables:
web/src/server/jobs/queue.ts— Job queue abstraction:- Interface for enqueueing, dequeuing, and marking jobs complete/failed
- Implementation using
bullmqwith Redis, or lightweight in-memory queue for dev - Job types:
darkwatch.scan,voiceprint.batch,hometitle.scan,removebrokers.process,reports.generate,notifications.send
web/src/server/jobs/worker.ts— Worker process:- Polls queue for jobs
- Dispatches to correct handler based on job type
- Handles retries with exponential backoff
- Logs all job execution
web/src/server/jobs/handlers/— Job handlers:darkwatch.scan.ts— scheduled dark web scans per subscription tiervoiceprint.batch.ts— batch voice analysis jobshometitle.scan.ts— scheduled property record scansremovebrokers.process.ts— process pending removal requestsreports.generate.ts— scheduled report generationnotifications.send.ts— queued notification delivery
web/src/server/jobs/scheduler.ts— Cron scheduler:- Registers recurring jobs based on schedules in DB
- Tier-based frequencies:
- Basic: DarkWatch monthly, HomeTitle monthly
- Plus: DarkWatch weekly, HomeTitle weekly, Reports monthly
- Premium: DarkWatch daily, HomeTitle daily, Reports weekly, real-time monitoring
- Uses
node-cronorbullmqrepeatables
web/src/server/jobs/index.ts— Entry point:- Initializes queue, scheduler, and workers
- Graceful shutdown handling
steps:
- Install dependencies in
web/:bullmq(Redis-based queue)ioredis(Redis client)node-cron(cron expressions)
- Create
web/src/server/jobs/queue.ts:- Define
JobTypeenum - Define
JobPayloadunion type per job type enqueue(jobType, payload, options?)— add job to queuedequeue()— get next job (used by worker)markComplete(jobId),markFailed(jobId, error)— update status
- Define
- Create
web/src/server/jobs/worker.ts:startWorker()— begin polling loopprocessJob(job)— switch onjob.type, call handlerstopWorker()— graceful shutdown- Retry logic: max 3 attempts, exponential backoff (1min, 5min, 15min)
- Create job handlers:
- Each handler is an async function taking
payloadand returning result darkwatch.scan: query all active watchlists, run scan engine (task 15), create alertsvoiceprint.batch: process pendingAnalysisJobrecords, run ML inferencehometitle.scan: query all watched properties, run county scans (task 18), detect changesremovebrokers.process: query PENDING requests, attempt removal, update statusreports.generate: compile data, render HTML, generate PDF, save to storagenotifications.send: send queued email/push/SMS via notification service (task 14)
- Each handler is an async function taking
- Create
web/src/server/jobs/scheduler.ts:registerSchedules()— readSchedulerConfigfrom DB, create cron jobsscheduleForSubscription(subscription)— create tier-appropriate schedulesremoveSchedulesForSubscription(subscriptionId)— clean up on cancellation
- Create
web/src/server/jobs/index.ts:- Initialize Redis connection
- Start scheduler
- Start worker(s)
- Handle SIGINT/SIGTERM for graceful shutdown
- Integrate with tRPC:
- Add
scheduler.runJobNowadmin procedure for manual job triggering - Add
scheduler.getJobStatusprocedure for checking job progress
- Add
- Write unit tests for handlers with mocked services.
steps:
- Unit:
enqueueadds job to queue with correct payload - Unit:
processJobdispatches to correct handler - Unit:
darkwatch.scanhandler queries watchlists and triggers scans - Unit: Retry logic attempts job 3 times with correct delays
- Integration: Scheduled job runs and creates expected DB records
acceptance_criteria:
- Jobs can be enqueued with type-specific payloads
- Worker processes jobs and dispatches to correct handler
- Failed jobs are retried up to 3 times with exponential backoff
- Scheduled scans run at tier-appropriate frequencies
- Manual job triggering works via admin API
- Job status is queryable (pending, running, completed, failed)
- Graceful shutdown completes in-flight jobs before exiting
- All job handlers are tested with mocked dependencies
validation:
- Enqueue a test job and verify it appears in queue
- Start worker and verify job is processed
- Cause a handler to throw and verify retry behavior
- Verify scheduled jobs are created for a Premium subscription
- Run
cd web && pnpm testfor job unit tests
notes:
- Reference legacy:
packages/jobs/src/,services/*/src/scheduler.service.ts - Redis is required for BullMQ. For local development without Redis, implement a simple in-memory queue fallback.
- The worker can run in the same process as the web server (dev) or as a separate process (production). Design for both.
- Job handlers should be idempotent: running the same job twice should not create duplicates.
- Consider using
bullmq's built-in cron support instead ofnode-cronfor better reliability. - Monitor queue depth and job failure rates. Add alerting if queue grows unexpectedly.
- For high-volume operations (e.g., scanning thousands of watchlist items), consider batching jobs or using worker concurrency.