# 22. Background Jobs — Scheduler, Scan Workers, and Reminders

meta:
  id: kordant-unified-restructure-22
  feature: kordant-unified-restructure
  priority: P1
  depends_on: [kordant-unified-restructure-20]
  tags: [backend, jobs, scheduler, workers, background]

objective:
  - Build the background job system for the unified monolith. Port logic from `packages/jobs/` and all service schedulers into a unified job queue and worker system that handles recurring scans, report generation, and reminder notifications.

deliverables:
  - `web/src/server/jobs/queue.ts` — Job queue abstraction:
    - Interface for enqueueing, dequeuing, and marking jobs complete/failed
    - Implementation using `bullmq` with Redis, or a lightweight in-memory queue for local development
    - Job types: `darkwatch.scan`, `voiceprint.batch`, `hometitle.scan`, `removebrokers.process`, `reports.generate`, `notifications.send`
  - `web/src/server/jobs/worker.ts` — Worker process:
    - Polls queue for jobs
    - Dispatches to correct handler based on job type
    - Handles retries with exponential backoff
    - Logs all job execution
  - `web/src/server/jobs/handlers/` — Job handlers:
    - `darkwatch.scan.ts` — scheduled dark web scans per subscription tier
    - `voiceprint.batch.ts` — batch voice analysis jobs
    - `hometitle.scan.ts` — scheduled property record scans
    - `removebrokers.process.ts` — process pending removal requests
    - `reports.generate.ts` — scheduled report generation
    - `notifications.send.ts` — queued notification delivery
  - `web/src/server/jobs/scheduler.ts` — Cron scheduler:
    - Registers recurring jobs based on schedules in DB
    - Tier-based frequencies:
      - Basic: DarkWatch monthly, HomeTitle monthly
      - Plus: DarkWatch weekly, HomeTitle weekly, Reports monthly
      - Premium: DarkWatch daily, HomeTitle daily, Reports weekly, real-time monitoring
    - Uses `node-cron` or `bullmq` repeatables
  - `web/src/server/jobs/index.ts` — Entry point:
    - Initializes queue, scheduler, and workers
    - Graceful shutdown handling
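
The tier table in the scheduler deliverable can be kept as data, so scheduling code and its tests read from one place. The following is a minimal sketch, assuming standard five-field cron expressions (a syntax that both `node-cron` and BullMQ repeatable jobs accept). The names `TIER_SCHEDULES` and `schedulesForTier`, and the concrete run times (03:00 daily, Mondays, the 1st of the month), are hypothetical. Premium real-time monitoring is not a cron cadence, so it is left out.

```typescript
// Hypothetical names: Tier, Frequency, TIER_SCHEDULES, schedulesForTier.
type Tier = "basic" | "plus" | "premium";
type ScheduledJobType = "darkwatch.scan" | "hometitle.scan" | "reports.generate";
type Frequency = "daily" | "weekly" | "monthly";

// Five-field cron: minute hour day-of-month month day-of-week.
// The run times are placeholders, not part of the spec.
const CRON: Record<Frequency, string> = {
  daily: "0 3 * * *", // every day at 03:00
  weekly: "0 3 * * 1", // Mondays at 03:00
  monthly: "0 3 1 * *", // 1st of the month at 03:00
};

// Tier table from the spec. Premium "real-time monitoring" is not a cron
// cadence and is intentionally omitted.
const TIER_SCHEDULES: Record<Tier, Partial<Record<ScheduledJobType, Frequency>>> = {
  basic: { "darkwatch.scan": "monthly", "hometitle.scan": "monthly" },
  plus: { "darkwatch.scan": "weekly", "hometitle.scan": "weekly", "reports.generate": "monthly" },
  premium: { "darkwatch.scan": "daily", "hometitle.scan": "daily", "reports.generate": "weekly" },
};

interface ScheduleSpec {
  jobType: ScheduledJobType;
  cron: string;
}

// Expands a tier into one cron schedule per recurring job type.
function schedulesForTier(tier: Tier): ScheduleSpec[] {
  const entries = Object.entries(TIER_SCHEDULES[tier]) as [ScheduledJobType, Frequency][];
  return entries.map(([jobType, frequency]) => ({ jobType, cron: CRON[frequency] }));
}
```

For example, `schedulesForTier("plus")` yields three entries: weekly DarkWatch and HomeTitle scans plus monthly reports. `scheduleForSubscription` could then register one repeatable job per entry.
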

steps:
  1. Install dependencies in `web/`:
     - `bullmq` (Redis-based queue)
     - `ioredis` (Redis client)
     - `node-cron` (cron expressions)
  2. Create `web/src/server/jobs/queue.ts`:
     - Define `JobType` enum
     - Define `JobPayload` union type per job type
     - `enqueue(jobType, payload, options?)` — add job to queue
     - `dequeue()` — get next job (used by worker)
     - `markComplete(jobId)`, `markFailed(jobId, error)` — update status
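
Implemented in memory, the step 2 interface also serves as the Redis-free development fallback mentioned in the notes. This is a minimal sketch, assuming string ids and a per-job `delayMs` option. It uses a string-literal union where the step says enum (either works in TypeScript). The payload fields in `JobPayloads` are hypothetical placeholders. A BullMQ-backed version would expose the same methods.

```typescript
// Job types from the spec, written as a string-literal union.
type JobType =
  | "darkwatch.scan"
  | "voiceprint.batch"
  | "hometitle.scan"
  | "removebrokers.process"
  | "reports.generate"
  | "notifications.send";

// Hypothetical payload shapes, one per job type (placeholders, not the real fields).
interface JobPayloads {
  "darkwatch.scan": { subscriptionId: string };
  "voiceprint.batch": { analysisJobIds: string[] };
  "hometitle.scan": { subscriptionId: string };
  "removebrokers.process": { batchSize: number };
  "reports.generate": { subscriptionId: string; period: "weekly" | "monthly" };
  "notifications.send": { notificationId: string };
}

type JobStatus = "pending" | "running" | "completed" | "failed";

interface Job<T extends JobType = JobType> {
  id: string;
  type: T;
  payload: JobPayloads[T];
  status: JobStatus;
  attemptsMade: number;
  error?: string;
}

interface EnqueueOptions {
  delayMs?: number; // job becomes eligible for dequeue only after this delay
}

// In-memory queue for local development; not durable, single process only.
class InMemoryQueue {
  private jobs: Job[] = [];
  private runAt = new Map<string, number>();
  private nextId = 1;

  enqueue<T extends JobType>(type: T, payload: JobPayloads[T], options: EnqueueOptions = {}): Job<T> {
    const job: Job<T> = { id: String(this.nextId++), type, payload, status: "pending", attemptsMade: 0 };
    this.jobs.push(job);
    this.runAt.set(job.id, Date.now() + (options.delayMs ?? 0));
    return job;
  }

  // Returns the oldest pending job whose delay has elapsed and marks it running.
  dequeue(now: number = Date.now()): Job | undefined {
    const job = this.jobs.find((j) => j.status === "pending" && (this.runAt.get(j.id) ?? 0) <= now);
    if (job) {
      job.status = "running";
      job.attemptsMade += 1;
    }
    return job;
  }

  markComplete(jobId: string): void {
    this.getOrThrow(jobId).status = "completed";
  }

  markFailed(jobId: string, error: Error): void {
    const job = this.getOrThrow(jobId);
    job.status = "failed";
    job.error = error.message;
  }

  getStatus(jobId: string): JobStatus | undefined {
    return this.jobs.find((j) => j.id === jobId)?.status;
  }

  private getOrThrow(jobId: string): Job {
    const job = this.jobs.find((j) => j.id === jobId);
    if (!job) throw new Error(`Unknown job ${jobId}`);
    return job;
  }
}
```
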

  3. Create `web/src/server/jobs/worker.ts`:
     - `startWorker()` — begin polling loop
     - `processJob(job)` — switch on `job.type`, call handler
     - `stopWorker()` — graceful shutdown
     - Retry logic: retry a failed job up to 3 times (4 attempts in total), with backoff delays of 1 min, 5 min, then 15 min
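
The retry rule can be kept as a pure function so the schedule is unit-testable. This sketch reads the three listed delays as three retries after the initial attempt (four attempts in total), which matches the acceptance criterion "retried up to 3 times". In BullMQ terms that is `attempts: 4`, since that option counts the first attempt. Because 1/5/15 minutes is not a constant-ratio progression, the delays are an explicit table rather than a formula. With BullMQ this would need a custom backoff strategy, because the built-in `exponential` type doubles the delay each time. `runWithRetries` and the injected `sleep` are hypothetical. A queue-backed worker would re-enqueue the job with a delay instead of holding a worker slot while sleeping.

```typescript
// Delays before each retry, from the spec: 1 min, 5 min, 15 min.
const RETRY_DELAYS_MS = [60_000, 5 * 60_000, 15 * 60_000];
const MAX_RETRIES = RETRY_DELAYS_MS.length; // 3 retries, 4 attempts in total

// Returns the delay before the next attempt, or null once retries are exhausted.
// attemptsMade counts attempts already run, including the one that just failed.
function nextRetryDelayMs(attemptsMade: number): number | null {
  const retryIndex = attemptsMade - 1;
  return retryIndex < MAX_RETRIES ? RETRY_DELAYS_MS[retryIndex] : null;
}

type Sleep = (ms: number) => Promise<void>;

// Runs `task`, retrying on failure per the schedule above. `sleep` is injected
// so tests can record delays instead of actually waiting (hypothetical helper).
async function runWithRetries<T>(task: () => Promise<T>, sleep: Sleep): Promise<T> {
  for (let attemptsMade = 1; ; attemptsMade++) {
    try {
      return await task();
    } catch (err) {
      const delay = nextRetryDelayMs(attemptsMade);
      if (delay === null) throw err; // out of retries: caller marks the job failed
      console.warn(`attempt ${attemptsMade} failed, retrying in ${delay} ms`);
      await sleep(delay);
    }
  }
}
```
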

  4. Create job handlers:
     - Each handler is an async function that takes a `payload` and returns a result
     - `darkwatch.scan`: query all active watchlists, run the scan engine (task 15), create alerts
     - `voiceprint.batch`: process pending `AnalysisJob` records, run ML inference
     - `hometitle.scan`: query all watched properties, run county scans (task 18), detect changes
     - `removebrokers.process`: query PENDING requests, attempt removal, update status
     - `reports.generate`: compile data, render HTML, generate PDF, save to storage
     - `notifications.send`: send queued email/push/SMS via the notification service (task 14)
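
Handlers are easier to test with mocked services (step 8) and to keep idempotent (as the notes require) if their dependencies are passed in. This is a minimal sketch of a `darkwatch.scan` handler, assuming a hypothetical `DarkWatchDeps` interface. The real scan engine from task 15 has its own API. The dedupe-key format and the `upsertAlert` contract (create the alert only if the key is new) are assumptions.

```typescript
// Hypothetical domain types; the real ones come from the DarkWatch service.
interface Watchlist { id: string; subscriptionId: string; }
interface Finding { source: string; fingerprint: string; }

// Hypothetical dependencies, injected so tests can pass mocks.
interface DarkWatchDeps {
  listActiveWatchlists(subscriptionId: string): Promise<Watchlist[]>;
  runScan(watchlist: Watchlist): Promise<Finding[]>;
  // Creates the alert only if no alert with this key exists; returns true if created.
  upsertAlert(dedupeKey: string, watchlistId: string, finding: Finding): Promise<boolean>;
}

interface DarkWatchScanResult { scanned: number; alertsCreated: number; }

// Async function of (payload, deps) returning a result, as step 4 describes.
async function darkwatchScan(
  payload: { subscriptionId: string },
  deps: DarkWatchDeps,
): Promise<DarkWatchScanResult> {
  const watchlists = await deps.listActiveWatchlists(payload.subscriptionId);
  let alertsCreated = 0;
  for (const watchlist of watchlists) {
    const findings = await deps.runScan(watchlist);
    for (const finding of findings) {
      // Deterministic key: re-running the same job cannot create a duplicate alert.
      const dedupeKey = `${watchlist.id}:${finding.source}:${finding.fingerprint}`;
      if (await deps.upsertAlert(dedupeKey, watchlist.id, finding)) alertsCreated++;
    }
  }
  return { scanned: watchlists.length, alertsCreated };
}
```

A test can pass a plain object of async stubs as `deps`. Running the handler twice against the same stub store should report `alertsCreated: 0` on the second run.
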

  5. Create `web/src/server/jobs/scheduler.ts`:
     - `registerSchedules()` — read `SchedulerConfig` from DB, create cron jobs
     - `scheduleForSubscription(subscription)` — create tier-appropriate schedules
     - `removeSchedulesForSubscription(subscriptionId)` — clean up on cancellation
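
For step 5, deriving each schedule's key from the subscription id and job type makes `scheduleForSubscription` safe to call again after a tier change. It also gives cancellation a precise set of keys to remove. This is a minimal sketch against a hypothetical `ScheduleBackend` adapter, which would wrap BullMQ repeatable jobs or `node-cron` task handles. To stay self-contained, it takes the tier's schedule list as an argument instead of a subscription object. The `subscriptionId:jobType` key format is an assumption.

```typescript
interface ScheduleSpec {
  jobType: string; // e.g. "darkwatch.scan"
  cron: string; // five-field cron expression
}

// Hypothetical adapter over BullMQ repeatable jobs or node-cron tasks.
interface ScheduleBackend {
  add(key: string, spec: ScheduleSpec, payload: { subscriptionId: string }): void;
  remove(key: string): void;
}

class SubscriptionScheduler {
  private backend: ScheduleBackend;
  // subscriptionId -> keys of the schedules currently registered for it
  private keysBySubscription = new Map<string, Set<string>>();

  constructor(backend: ScheduleBackend) {
    this.backend = backend;
  }

  // Replaces any existing schedules, so calling it twice never duplicates jobs.
  scheduleForSubscription(subscriptionId: string, specs: ScheduleSpec[]): void {
    this.removeSchedulesForSubscription(subscriptionId);
    const keys = new Set<string>();
    for (const spec of specs) {
      const key = `${subscriptionId}:${spec.jobType}`; // assumed key format
      this.backend.add(key, spec, { subscriptionId });
      keys.add(key);
    }
    this.keysBySubscription.set(subscriptionId, keys);
  }

  // Called on cancellation: removes every schedule owned by the subscription.
  removeSchedulesForSubscription(subscriptionId: string): void {
    const keys = this.keysBySubscription.get(subscriptionId);
    if (!keys) return;
    keys.forEach((key) => this.backend.remove(key));
    this.keysBySubscription.delete(subscriptionId);
  }
}
```

Because the key map lives in memory, `registerSchedules()` would have to rebuild it from the DB at startup.
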

  6. Create `web/src/server/jobs/index.ts`:
     - Initialize Redis connection
     - Start scheduler
     - Start worker(s)
     - Handle SIGINT/SIGTERM for graceful shutdown
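
The graceful-shutdown requirement (finish in-flight jobs before exiting) comes down to two steps: stop taking new jobs, then wait for the running ones to settle. This is a minimal sketch of that bookkeeping for the in-process worker. It assumes a hypothetical `InFlightTracker`, which the polling loop checks before dequeuing, and a hypothetical `closeConnections` hook for the scheduler and Redis. A BullMQ `Worker` has a `close()` method that waits for active jobs, so the tracker mainly matters for the in-memory fallback.

```typescript
// Tracks running jobs so shutdown can wait for them (hypothetical helper).
class InFlightTracker {
  private accepting = true;
  private inFlight = new Set<Promise<unknown>>();

  // The polling loop should stop dequeuing once this is false.
  get isAccepting(): boolean {
    return this.accepting;
  }

  // Runs one job; refuses new work once draining has started.
  run<T>(work: () => Promise<T>): Promise<T> {
    if (!this.accepting) return Promise.reject(new Error("worker is shutting down"));
    const p = work();
    this.inFlight.add(p);
    // Forget the promise once it settles, whether it resolved or rejected.
    void p.then(
      () => this.inFlight.delete(p),
      () => this.inFlight.delete(p),
    );
    return p;
  }

  // Stops accepting jobs, then resolves once every in-flight job has settled.
  async drain(): Promise<void> {
    this.accepting = false;
    await Promise.allSettled(Array.from(this.inFlight));
  }
}

// Wires SIGINT/SIGTERM to a drain. closeConnections is a hypothetical hook that
// would stop the scheduler and quit the Redis connection.
function installShutdownHandlers(
  tracker: InFlightTracker,
  closeConnections: () => Promise<void>,
): void {
  const shutdown = async (signal: string): Promise<void> => {
    console.log(`${signal} received, waiting for in-flight jobs`);
    await tracker.drain();
    await closeConnections();
    process.exit(0);
  };
  process.once("SIGINT", () => void shutdown("SIGINT"));
  process.once("SIGTERM", () => void shutdown("SIGTERM"));
}
```
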

  7. Integrate with tRPC:
     - Add `scheduler.runJobNow` admin procedure for manual job triggering
     - Add `scheduler.getJobStatus` procedure for checking job progress
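
`scheduler.getJobStatus` reports the status values listed in the acceptance criteria (pending, running, completed, failed), and both the worker and the `runJobNow` path write them. A small transition table keeps either path from recording an impossible state. This is a minimal sketch: `JobStatusStore` is hypothetical, and allowing `failed` to return to `pending` (for a retry or manual re-run) is an assumption. A tRPC procedure would call `get` for the requested id.

```typescript
type JobStatus = "pending" | "running" | "completed" | "failed";

// Allowed status changes. failed -> pending (retry or manual re-run) is an assumption.
const TRANSITIONS: Record<JobStatus, JobStatus[]> = {
  pending: ["running"],
  running: ["completed", "failed"],
  completed: [],
  failed: ["pending"],
};

// Hypothetical status store that getJobStatus would read from.
class JobStatusStore {
  private statuses = new Map<string, JobStatus>();

  // New jobs, including ones created by runJobNow, start as pending.
  create(jobId: string): void {
    this.statuses.set(jobId, "pending");
  }

  // Applies a status change, rejecting any transition not in the table.
  transition(jobId: string, next: JobStatus): void {
    const current = this.statuses.get(jobId);
    if (current === undefined) throw new Error(`Unknown job ${jobId}`);
    if (!TRANSITIONS[current].includes(next)) {
      throw new Error(`Illegal transition ${current} -> ${next} for job ${jobId}`);
    }
    this.statuses.set(jobId, next);
  }

  get(jobId: string): JobStatus | undefined {
    return this.statuses.get(jobId);
  }
}
```
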

  8. Write unit tests for handlers with mocked services.

tests:
  - Unit: `enqueue` adds job to queue with correct payload
  - Unit: `processJob` dispatches to correct handler
  - Unit: `darkwatch.scan` handler queries watchlists and triggers scans
  - Unit: Retry logic retries a failing job 3 times with the correct delays (1 min, 5 min, 15 min)
  - Integration: Scheduled job runs and creates expected DB records
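
The first two unit tests are simplest if `processJob` receives its handlers as a registry, so a test can pass stubs and check which one ran. This is a minimal sketch, assuming a hypothetical `HandlerRegistry` keyed by job type. Typing it as `Record<JobType, Handler>` makes a missing handler a compile error, and the runtime check covers untyped input. Payloads are `unknown` here for brevity; the per-type `JobPayload` union from step 2 would narrow them. The `switch` on `job.type` described in step 3 is an equally valid shape.

```typescript
type JobType =
  | "darkwatch.scan"
  | "voiceprint.batch"
  | "hometitle.scan"
  | "removebrokers.process"
  | "reports.generate"
  | "notifications.send";

interface QueuedJob {
  id: string;
  type: JobType;
  payload: unknown; // narrowed per job type in a fuller implementation
}

type Handler = (payload: unknown) => Promise<unknown>;
// Record<JobType, Handler> makes a missing handler a compile-time error.
type HandlerRegistry = Record<JobType, Handler>;

// Looks up the handler for job.type, runs it, and logs the outcome.
async function processJob(job: QueuedJob, handlers: HandlerRegistry): Promise<unknown> {
  const handler = handlers[job.type];
  if (!handler) {
    // Only reachable if untyped data (e.g. read back from Redis) has an unknown type.
    throw new Error(`No handler for job type ${job.type}`);
  }
  const started = Date.now();
  try {
    const result = await handler(job.payload);
    console.log(`job ${job.id} (${job.type}) completed in ${Date.now() - started} ms`);
    return result;
  } catch (err) {
    console.error(`job ${job.id} (${job.type}) failed`, err);
    throw err; // rethrow so the retry policy can decide what happens next
  }
}
```
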

acceptance_criteria:
  - [ ] Jobs can be enqueued with type-specific payloads
  - [ ] Worker processes jobs and dispatches to correct handler
  - [ ] Failed jobs are retried up to 3 times with exponential backoff
  - [ ] Scheduled scans run at tier-appropriate frequencies
  - [ ] Manual job triggering works via admin API
  - [ ] Job status is queryable (pending, running, completed, failed)
  - [ ] Graceful shutdown completes in-flight jobs before exiting
  - [ ] All job handlers are tested with mocked dependencies

validation:
  - Enqueue a test job and verify it appears in queue
  - Start worker and verify job is processed
  - Cause a handler to throw and verify retry behavior
  - Verify scheduled jobs are created for a Premium subscription
  - Run `cd web && pnpm test` for job unit tests

notes:
  - Reference legacy: `packages/jobs/src/`, `services/*/src/scheduler.service.ts`
  - Redis is required for BullMQ. For local development without Redis, implement a simple in-memory queue fallback.
  - The worker can run in the same process as the web server (dev) or as a separate process (production). Design for both.
  - Job handlers should be idempotent: running the same job twice should not create duplicates.
  - Consider using BullMQ's repeatable jobs instead of `node-cron`: the schedules are stored in Redis, so they survive process restarts and do not fire once per process when several instances run the scheduler.
  - Monitor queue depth and job failure rates. Add alerting if the queue grows unexpectedly.
  - For high-volume operations (e.g., scanning thousands of watchlist items), consider batching jobs or using worker concurrency.
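
For the high-volume note, one option is to split a large scan into fixed-size batches and cap how many batches run at once inside a single handler. This complements BullMQ's worker-level `concurrency` option, which limits jobs rather than work within a job. This is a minimal sketch; `chunk` and `processInBatches` are hypothetical helpers, and the batch size and concurrency would be tuned against the real services.

```typescript
// Splits items into consecutive chunks of at most batchSize.
function chunk<T>(items: T[], batchSize: number): T[][] {
  if (batchSize < 1) throw new Error("batchSize must be at least 1");
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Runs `worker` on every batch with at most `concurrency` batches in flight.
// Results are stored by batch index, so they come back in input order.
async function processInBatches<T, R>(
  items: T[],
  batchSize: number,
  concurrency: number,
  worker: (batch: T[]) => Promise<R>,
): Promise<R[]> {
  if (concurrency < 1) throw new Error("concurrency must be at least 1");
  const batches = chunk(items, batchSize);
  const results: R[] = new Array(batches.length);
  let next = 0;
  // Each runner claims the next unprocessed batch until none remain. Claiming
  // needs no lock because JavaScript runs one callback at a time.
  const runner = async (): Promise<void> => {
    while (next < batches.length) {
      const index = next++;
      results[index] = await worker(batches[index]);
    }
  };
  await Promise.all(Array.from({ length: Math.min(concurrency, batches.length) }, runner));
  return results;
}
```

Because results are written by index, they come back in input order even when batches finish out of order.
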