Files
Kordant/tasks/kordant-unified-restructure/22-background-jobs.md
2026-05-25 22:49:37 -04:00

5.7 KiB

22. Background Jobs — Scheduler, Scan Workers, and Reminders

meta: id: kordant-unified-restructure-22 feature: kordant-unified-restructure priority: P1 depends_on: [kordant-unified-restructure-20] tags: [backend, jobs, scheduler, workers, background]

objective:

  • Build the background job system for the unified monolith. Port logic from packages/jobs/ and all service schedulers into a unified job queue and worker system that handles recurring scans, report generation, and reminder notifications.

deliverables:

  • web/src/server/jobs/queue.ts — Job queue abstraction:
    • Interface for enqueueing, dequeuing, and marking jobs complete/failed
    • Implementation using bullmq with Redis, or lightweight in-memory queue for dev
    • Job types: darkwatch.scan, voiceprint.batch, hometitle.scan, removebrokers.process, reports.generate, notifications.send
  • web/src/server/jobs/worker.ts — Worker process:
    • Polls queue for jobs
    • Dispatches to correct handler based on job type
    • Handles retries with exponential backoff
    • Logs all job execution
  • web/src/server/jobs/handlers/ — Job handlers:
    • darkwatch.scan.ts — scheduled dark web scans per subscription tier
    • voiceprint.batch.ts — batch voice analysis jobs
    • hometitle.scan.ts — scheduled property record scans
    • removebrokers.process.ts — process pending removal requests
    • reports.generate.ts — scheduled report generation
    • notifications.send.ts — queued notification delivery
  • web/src/server/jobs/scheduler.ts — Cron scheduler:
    • Registers recurring jobs based on schedules in DB
    • Tier-based frequencies:
      • Basic: DarkWatch monthly, HomeTitle monthly
      • Plus: DarkWatch weekly, HomeTitle weekly, Reports monthly
      • Premium: DarkWatch daily, HomeTitle daily, Reports weekly, real-time monitoring
    • Uses node-cron or bullmq repeatables
  • web/src/server/jobs/index.ts — Entry point:
    • Initializes queue, scheduler, and workers
    • Graceful shutdown handling

steps:

  1. Install dependencies in web/:
    • bullmq (Redis-based queue)
    • ioredis (Redis client)
    • node-cron (cron expressions)
  2. Create web/src/server/jobs/queue.ts:
    • Define JobType enum
    • Define JobPayload union type per job type
    • enqueue(jobType, payload, options?) — add job to queue
    • dequeue() — get next job (used by worker)
    • markComplete(jobId), markFailed(jobId, error) — update status
  3. Create web/src/server/jobs/worker.ts:
    • startWorker() — begin polling loop
    • processJob(job) — switch on job.type, call handler
    • stopWorker() — graceful shutdown
    • Retry logic: max 3 attempts, exponential backoff (1min, 5min, 15min)
  4. Create job handlers:
    • Each handler is an async function taking payload and returning result
    • darkwatch.scan: query all active watchlists, run scan engine (task 15), create alerts
    • voiceprint.batch: process pending AnalysisJob records, run ML inference
    • hometitle.scan: query all watched properties, run county scans (task 18), detect changes
    • removebrokers.process: query PENDING requests, attempt removal, update status
    • reports.generate: compile data, render HTML, generate PDF, save to storage
    • notifications.send: send queued email/push/SMS via notification service (task 14)
  5. Create web/src/server/jobs/scheduler.ts:
    • registerSchedules() — read SchedulerConfig from DB, create cron jobs
    • scheduleForSubscription(subscription) — create tier-appropriate schedules
    • removeSchedulesForSubscription(subscriptionId) — clean up on cancellation
  6. Create web/src/server/jobs/index.ts:
    • Initialize Redis connection
    • Start scheduler
    • Start worker(s)
    • Handle SIGINT/SIGTERM for graceful shutdown
  7. Integrate with tRPC:
    • Add scheduler.runJobNow admin procedure for manual job triggering
    • Add scheduler.getJobStatus procedure for checking job progress
  8. Write unit tests for handlers with mocked services.

steps:

  • Unit: enqueue adds job to queue with correct payload
  • Unit: processJob dispatches to correct handler
  • Unit: darkwatch.scan handler queries watchlists and triggers scans
  • Unit: Retry logic attempts job 3 times with correct delays
  • Integration: Scheduled job runs and creates expected DB records

acceptance_criteria:

  • Jobs can be enqueued with type-specific payloads
  • Worker processes jobs and dispatches to correct handler
  • Failed jobs are retried up to 3 times with exponential backoff
  • Scheduled scans run at tier-appropriate frequencies
  • Manual job triggering works via admin API
  • Job status is queryable (pending, running, completed, failed)
  • Graceful shutdown completes in-flight jobs before exiting
  • All job handlers are tested with mocked dependencies

validation:

  • Enqueue a test job and verify it appears in queue
  • Start worker and verify job is processed
  • Cause a handler to throw and verify retry behavior
  • Verify scheduled jobs are created for a Premium subscription
  • Run cd web && pnpm test for job unit tests

notes:

  • Reference legacy: packages/jobs/src/, services/*/src/scheduler.service.ts
  • Redis is required for BullMQ. For local development without Redis, implement a simple in-memory queue fallback.
  • The worker can run in the same process as the web server (dev) or as a separate process (production). Design for both.
  • Job handlers should be idempotent: running the same job twice should not create duplicates.
  • Consider using bullmq's built-in cron support instead of node-cron for better reliability.
  • Monitor queue depth and job failure rates. Add alerting if queue grows unexpectedly.
  • For high-volume operations (e.g., scanning thousands of watchlist items), consider batching jobs or using worker concurrency.