- ScanScheduler: tier-based scheduling (BASIC=24h, PLUS=6h, PREMIUM=1h)
- WebhookHandler: HMAC-verified webhook ingestion with SCAN_TRIGGER support
- API routes: /scheduler and /webhooks endpoints under /api/v1/darkwatch
- Jobs: scheduled scan checker + webhook retry processor via BullMQ
- Schema: ScanSchedule, WebhookEvent models; ScanJob.scheduledBy field
- Types: ScheduleStatus, WebhookEventType, WebhookTriggerInput
- Tests: scheduler lifecycle + webhook signature/processing tests

Co-Authored-By: Paperclip <noreply@paperclip.ing>
169 lines
4.4 KiB
TypeScript
169 lines
4.4 KiB
TypeScript
import prisma from "@shieldai/db";
|
|
import { SubscriptionTier } from "@shieldai/types";
|
|
|
|
/**
 * Scan cadence per subscription tier.
 *
 * Each entry pairs the interval in minutes with a cron expression for the
 * same cadence, so callers can use whichever form they need.
 */
const TIER_CONFIG = {
  // Once a day, at midnight.
  [SubscriptionTier.BASIC]: {
    intervalMinutes: 1440,
    cron: "0 0 * * *",
  },
  // Every six hours, on the hour.
  [SubscriptionTier.PLUS]: {
    intervalMinutes: 360,
    cron: "0 */6 * * *",
  },
  // Every hour, on the hour.
  [SubscriptionTier.PREMIUM]: {
    intervalMinutes: 60,
    cron: "0 * * * *",
  },
} as const;
|
|
|
|
/**
 * Manages per-user scan schedules stored in the `scanSchedule` table.
 *
 * The scan cadence is derived from the user's subscription tier via
 * `TIER_CONFIG`; unknown or missing tiers fall back to the BASIC cadence.
 */
export class ScanScheduler {
  /**
   * Get the scan interval (in minutes) for a given subscription tier.
   *
   * @param tier - Subscription tier to look up.
   * @returns The tier's interval, or the BASIC interval when the tier has no
   *   entry in `TIER_CONFIG`.
   */
  public static getIntervalForTier(tier: SubscriptionTier): number {
    return TIER_CONFIG[tier]?.intervalMinutes ?? TIER_CONFIG[SubscriptionTier.BASIC].intervalMinutes;
  }

  /**
   * Get the cron expression for a given subscription tier.
   *
   * @param tier - Subscription tier to look up.
   * @returns The tier's cron expression, or the BASIC one when the tier has no
   *   entry in `TIER_CONFIG`.
   */
  public static getCronForTier(tier: SubscriptionTier): string {
    return TIER_CONFIG[tier]?.cron ?? TIER_CONFIG[SubscriptionTier.BASIC].cron;
  }

  /**
   * Ensure a user has a scan schedule matching their subscription tier.
   *
   * Creates the schedule (as ACTIVE) when none exists; otherwise updates the
   * interval, cron expression and next scan time. The status of an existing
   * schedule is left untouched, so a paused schedule stays paused.
   *
   * @param userId - Id of the user whose schedule should exist.
   * @returns `scheduled` is true when the stored schedule is ACTIVE;
   *   `intervalMinutes` is the stored interval. Returns
   *   `{ scheduled: false, intervalMinutes: 0 }` when the user does not exist.
   */
  async ensureScheduleForUser(userId: string): Promise<{ scheduled: boolean; intervalMinutes: number }> {
    const user = await prisma.user.findUnique({
      where: { id: userId },
      select: { subscriptionTier: true },
    });

    if (!user) {
      return { scheduled: false, intervalMinutes: 0 };
    }

    const tier = user.subscriptionTier ?? SubscriptionTier.BASIC;
    // Same fallback as the static getters: a tier value without a config entry
    // is treated as BASIC instead of crashing on `undefined.intervalMinutes`.
    const config = TIER_CONFIG[tier] ?? TIER_CONFIG[SubscriptionTier.BASIC];
    // The next scan must honour the tier interval, not the helper's 60-minute default.
    const nextScan = this.calculateNextScan(config.intervalMinutes);

    const schedule = await prisma.scanSchedule.upsert({
      where: { userId },
      update: {
        intervalMinutes: config.intervalMinutes,
        cronExpression: config.cron,
        nextScanAt: nextScan,
      },
      create: {
        userId,
        intervalMinutes: config.intervalMinutes,
        cronExpression: config.cron,
        status: "ACTIVE",
        nextScanAt: nextScan,
      },
    });

    return {
      scheduled: schedule.status === "ACTIVE",
      intervalMinutes: schedule.intervalMinutes,
    };
  }

  /**
   * Get all active schedules that are due for scanning.
   *
   * A schedule is due when its `nextScanAt` is at or before the current time,
   * or when `nextScanAt` has never been set.
   *
   * @returns The user id, interval and cron expression of every due schedule.
   */
  async getDueSchedules(): Promise<Array<{ userId: string; intervalMinutes: number; cronExpression: string }>> {
    const now = new Date();

    const due = await prisma.scanSchedule.findMany({
      where: {
        status: "ACTIVE",
        OR: [
          { nextScanAt: { lte: now } },
          { nextScanAt: null },
        ],
      },
      select: {
        userId: true,
        intervalMinutes: true,
        cronExpression: true,
      },
    });

    return due;
  }

  /**
   * Mark a schedule as scanned and compute the next scan time.
   *
   * @param userId - Id of the user whose schedule was just scanned.
   * @returns The newly stored `nextScanAt`.
   * @throws Error when the user has no schedule.
   */
  async markScanned(userId: string): Promise<Date> {
    const schedule = await prisma.scanSchedule.findUnique({ where: { userId } });

    if (!schedule) {
      throw new Error(`ScanSchedule not found for user ${userId}`);
    }

    // Use one timestamp so nextScanAt is exactly lastScanAt + interval.
    const scannedAt = new Date();
    const nextScan = this.calculateNextScan(schedule.intervalMinutes, scannedAt);

    await prisma.scanSchedule.update({
      where: { userId },
      data: {
        lastScanAt: scannedAt,
        nextScanAt: nextScan,
      },
    });

    return nextScan;
  }

  /**
   * Pause scheduling for a user (e.g., on subscription downgrade or pause).
   *
   * Only an ACTIVE schedule is changed; a missing schedule is a no-op.
   *
   * @param userId - Id of the user whose schedule should be paused.
   */
  async pauseSchedule(userId: string): Promise<void> {
    await prisma.scanSchedule.updateMany({
      where: { userId, status: "ACTIVE" },
      data: { status: "PAUSED" },
    });
  }

  /**
   * Resume scheduling for a user and recalculate based on current tier.
   *
   * @param userId - Id of the user whose schedule should be resumed.
   */
  async resumeSchedule(userId: string): Promise<void> {
    // Refresh interval, cron and next scan time from the current tier
    // (also creates the schedule if it does not exist yet).
    await this.ensureScheduleForUser(userId);

    // ensureScheduleForUser does not modify the status of an existing row, so
    // a paused schedule has to be reactivated explicitly. Mirrors pauseSchedule:
    // only PAUSED rows are touched and a missing row is a no-op.
    await prisma.scanSchedule.updateMany({
      where: { userId, status: "PAUSED" },
      data: { status: "ACTIVE" },
    });
  }

  /**
   * Get the current schedule for a user.
   *
   * @param userId - Id of the user to look up.
   * @returns The schedule record, or null when the user has none.
   */
  async getSchedule(userId: string) {
    return prisma.scanSchedule.findUnique({
      where: { userId },
    });
  }

  /**
   * List all active schedules (for admin/monitoring), soonest next scan first.
   *
   * @param limit - Maximum number of schedules to return.
   * @param offset - Number of schedules to skip.
   * @returns Active schedules, each including the owning user's id, email and
   *   subscription tier.
   */
  async listActiveSchedules(limit = 100, offset = 0) {
    return prisma.scanSchedule.findMany({
      where: { status: "ACTIVE" },
      include: {
        user: {
          select: {
            id: true,
            email: true,
            subscriptionTier: true,
          },
        },
      },
      orderBy: { nextScanAt: "asc" },
      take: limit,
      skip: offset,
    });
  }

  /**
   * Calculate the next scan time based on interval.
   *
   * @param intervalMinutes - Minutes until the next scan; defaults to 60.
   * @param from - Instant to count from; defaults to the current time.
   * @returns A new Date exactly `intervalMinutes` after `from`.
   */
  private calculateNextScan(intervalMinutes?: number, from: Date = new Date()): Date {
    const minutes = intervalMinutes ?? 60;
    // Epoch arithmetic keeps the real elapsed time exact. Date#setMinutes works
    // on local wall-clock fields, which shifts the result by the DST offset
    // whenever the interval spans a daylight-saving transition.
    return new Date(from.getTime() + minutes * 60 * 1000);
  }
}
|