Add tier-based scan scheduler and webhook triggers (FRE-4498)

- ScanScheduler: tier-based scheduling (BASIC=24h, PLUS=6h, PREMIUM=1h)
- WebhookHandler: HMAC-verified webhook ingestion with SCAN_TRIGGER support
- API routes: /scheduler and /webhooks endpoints under /api/v1/darkwatch
- Jobs: scheduled scan checker + webhook retry processor via BullMQ
- Schema: ScanSchedule, WebhookEvent models; ScanJob.scheduledBy field
- Types: ScheduleStatus, WebhookEventType, WebhookTriggerInput
- Tests: scheduler lifecycle + webhook signature/processing tests

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-30 10:57:56 -04:00
parent 76d431e1ec
commit 9fb5379b7a
43 changed files with 7819 additions and 93 deletions

View File

@@ -3,3 +3,5 @@ export * from "./hibp/HIBPService";
export * from "./matching/MatchingEngine";
export * from "./alerts/AlertPipeline";
export * from "./scanner/ScanService";
export * from "./scheduler/ScanScheduler";
export * from "./webhooks/WebhookHandler";

View File

@@ -0,0 +1,168 @@
import prisma from "@shieldai/db";
import { SubscriptionTier } from "@shieldai/types";
const TIER_CONFIG = {
[SubscriptionTier.BASIC]: { intervalMinutes: 1440, cron: "0 0 * * *" },
[SubscriptionTier.PLUS]: { intervalMinutes: 360, cron: "0 */6 * * *" },
[SubscriptionTier.PREMIUM]: { intervalMinutes: 60, cron: "0 * * * *" },
} as const;
export class ScanScheduler {
/**
* Get the scan interval (in minutes) for a given subscription tier.
*/
public static getIntervalForTier(tier: SubscriptionTier): number {
return TIER_CONFIG[tier]?.intervalMinutes ?? TIER_CONFIG[SubscriptionTier.BASIC].intervalMinutes;
}
/**
* Get the cron expression for a given subscription tier.
*/
public static getCronForTier(tier: SubscriptionTier): string {
return TIER_CONFIG[tier]?.cron ?? TIER_CONFIG[SubscriptionTier.BASIC].cron;
}
/**
* Ensure a user has an active scan schedule based on their subscription tier.
* Creates or updates the schedule record.
*/
async ensureScheduleForUser(userId: string): Promise<{ scheduled: boolean; intervalMinutes: number }> {
const user = await prisma.user.findUnique({
where: { id: userId },
select: { subscriptionTier: true },
});
if (!user) {
return { scheduled: false, intervalMinutes: 0 };
}
const tier = user.subscriptionTier ?? SubscriptionTier.BASIC;
const config = TIER_CONFIG[tier];
const nextScan = this.calculateNextScan();
const schedule = await prisma.scanSchedule.upsert({
where: { userId },
update: {
intervalMinutes: config.intervalMinutes,
cronExpression: config.cron,
nextScanAt: nextScan,
},
create: {
userId,
intervalMinutes: config.intervalMinutes,
cronExpression: config.cron,
status: "ACTIVE",
nextScanAt: nextScan,
},
});
return {
scheduled: schedule.status === "ACTIVE",
intervalMinutes: schedule.intervalMinutes,
};
}
/**
* Get all active schedules that are due for scanning.
*/
async getDueSchedules(): Promise<Array<{ userId: string; intervalMinutes: number; cronExpression: string }>> {
const now = new Date();
const due = await prisma.scanSchedule.findMany({
where: {
status: "ACTIVE",
OR: [
{ nextScanAt: { lte: now } },
{ nextScanAt: null },
],
},
select: {
userId: true,
intervalMinutes: true,
cronExpression: true,
},
});
return due;
}
/**
* Mark a schedule as scanned and compute the next scan time.
*/
async markScanned(userId: string): Promise<Date> {
const schedule = await prisma.scanSchedule.findUnique({ where: { userId } });
if (!schedule) {
throw new Error(`ScanSchedule not found for user ${userId}`);
}
const nextScan = this.calculateNextScan(schedule.intervalMinutes);
await prisma.scanSchedule.update({
where: { userId },
data: {
lastScanAt: new Date(),
nextScanAt: nextScan,
},
});
return nextScan;
}
/**
* Pause scheduling for a user (e.g., on subscription downgrade or pause).
*/
async pauseSchedule(userId: string): Promise<void> {
await prisma.scanSchedule.updateMany({
where: { userId, status: "ACTIVE" },
data: { status: "PAUSED" },
});
}
/**
* Resume scheduling for a user and recalculate based on current tier.
*/
async resumeSchedule(userId: string): Promise<void> {
await this.ensureScheduleForUser(userId);
}
/**
* Get the current schedule for a user.
*/
async getSchedule(userId: string) {
return prisma.scanSchedule.findUnique({
where: { userId },
});
}
/**
* List all active schedules (for admin/monitoring).
*/
async listActiveSchedules(limit = 100, offset = 0) {
return prisma.scanSchedule.findMany({
where: { status: "ACTIVE" },
include: {
user: {
select: {
id: true,
email: true,
subscriptionTier: true,
},
},
},
orderBy: { nextScanAt: "asc" },
take: limit,
skip: offset,
});
}
/**
* Calculate the next scan time based on interval.
*/
private calculateNextScan(intervalMinutes?: number): Date {
const minutes = intervalMinutes ?? 60;
const next = new Date();
next.setMinutes(next.getMinutes() + minutes);
return next;
}
}

View File

@@ -0,0 +1,193 @@
import prisma from "@shieldai/db";
import { createHmac, timingSafeEqual } from "crypto";
import { DataSource, WebhookEventType } from "@shieldai/types";
export class WebhookHandler {
private secret: string;
constructor(secret?: string) {
this.secret = secret || process.env.WEBHOOK_SECRET || "default-webhook-secret";
}
/**
* Verify HMAC signature of incoming webhook payload.
*/
verifySignature(payload: string, signature: string | string[]): boolean {
if (!signature) return false;
const sigArray = Array.isArray(signature) ? signature : [signature];
const expected = this.computeSignature(payload);
for (const sig of sigArray) {
if (timingSafeEqual(Buffer.from(sig), Buffer.from(expected))) {
return true;
}
}
return false;
}
/**
* Process an incoming webhook event.
* Validates, stores, and triggers appropriate action.
*/
async processEvent(
eventType: string,
payload: Record<string, unknown>,
source?: string,
signature?: string
): Promise<{ eventId: string; scanTriggered: boolean }> {
const payloadStr = JSON.stringify(payload);
if (signature && !this.verifySignature(payloadStr, signature)) {
throw new Error("Webhook signature verification failed");
}
const eventTypeNormalized = this.normalizeEventType(eventType);
const event = await prisma.webhookEvent.create({
data: {
eventType: eventTypeNormalized,
payload: payloadStr,
source,
signature,
},
});
let scanTriggered = false;
if (eventTypeNormalized === WebhookEventType.SCAN_TRIGGER) {
const userId = payload.userId as string | undefined;
const source = (payload.dataSource as string) || undefined;
if (userId) {
scanTriggered = await this.triggerScanFromWebhook(event.id, userId, source);
}
}
await prisma.webhookEvent.update({
where: { id: event.id },
data: {
processed: true,
processedAt: new Date(),
},
});
return { eventId: event.id, scanTriggered };
}
/**
* Trigger a scan job from a webhook event.
*/
private async triggerScanFromWebhook(
eventId: string,
userId: string,
dataSource?: string
): Promise<boolean> {
try {
const user = await prisma.user.findUnique({ where: { id: userId } });
if (!user) {
return false;
}
const job = await prisma.scanJob.create({
data: {
userId,
status: "PENDING",
source: (dataSource as DataSource) || undefined,
scheduledBy: "webhook",
},
});
await prisma.webhookEvent.update({
where: { id: eventId },
data: { scanJobId: job.id },
});
return true;
} catch (err) {
console.error(`[Webhook] Scan trigger failed for event ${eventId}:`, err);
return false;
}
}
/**
* Get webhook event history.
*/
async getEventHistory(limit = 50, offset = 0) {
return prisma.webhookEvent.findMany({
orderBy: { createdAt: "desc" },
take: limit,
skip: offset,
include: { scanJob: true },
});
}
/**
* Get events for a specific user (via linked scan jobs).
*/
async getUserEvents(userId: string, limit = 50, offset = 0) {
return prisma.webhookEvent.findMany({
where: {
scanJob: { userId },
},
orderBy: { createdAt: "desc" },
take: limit,
skip: offset,
});
}
/**
* Process unprocessed webhook events (retry mechanism).
*/
async processPendingEvents(): Promise<number> {
const pending = await prisma.webhookEvent.findMany({
where: {
processed: false,
eventType: WebhookEventType.SCAN_TRIGGER,
},
orderBy: { createdAt: "asc" },
take: 50,
});
let processed = 0;
for (const event of pending) {
try {
const payload = JSON.parse(event.payload) as Record<string, unknown>;
const userId = payload.userId as string | undefined;
if (userId) {
const success = await this.triggerScanFromWebhook(
event.id,
userId,
payload.dataSource as string | undefined
);
if (success) {
await prisma.webhookEvent.update({
where: { id: event.id },
data: { processed: true, processedAt: new Date() },
});
processed++;
}
}
} catch (err) {
console.error(`[Webhook] Retry failed for event ${event.id}:`, err);
}
}
return processed;
}
private computeSignature(payload: string): string {
return createHmac("sha256", this.secret).update(payload).digest("hex");
}
private normalizeEventType(eventType: string): WebhookEventType {
const upper = eventType.toUpperCase().replace(/\s+/g, "_");
const validTypes: WebhookEventType[] = [WebhookEventType.SCAN_TRIGGER, WebhookEventType.BREACH_DETECTED, WebhookEventType.SUBSCRIPTION_CHANGE];
return validTypes.includes(upper as WebhookEventType) ? (upper as WebhookEventType) : WebhookEventType.SCAN_TRIGGER;
}
}