FRE-4471: Scaffold DarkWatch MVP — monorepo, schema, services, API routes, tests
- Turborepo monorepo structure (packages: api, db, types, jobs; services: darkwatch) - Prisma schema: User, WatchListItem, Exposure, Alert, ScanJob models - WatchListService: CRUD with normalization, dedup, tier-based limits - HIBPService: API integration with severity scoring - MatchingEngine: exact-match with content hash dedup - AlertPipeline: dedup window, email notifications - ScanService: orchestrates watch list -> HIBP -> match -> alert flow - BullMQ job workers for scan and alert processing - Fastify API routes: watchlist, exposures, alerts, scan - Docker Compose: PostgreSQL 16 + Redis 7 - 15 unit tests passing - Implementation plan document uploaded
This commit is contained in:
142
services/darkwatch/src/alerts/AlertPipeline.ts
Normal file
142
services/darkwatch/src/alerts/AlertPipeline.ts
Normal file
@@ -0,0 +1,142 @@
|
||||
import prisma from "@shieldai/db";
|
||||
import { AlertChannel, AlertStatus, Severity } from "@shieldai/types";
|
||||
import { createHash } from "crypto";
|
||||
import NodeCache from "node-cache";
|
||||
|
||||
export class AlertPipeline {
|
||||
private cache: NodeCache;
|
||||
private dedupWindowMs = 24 * 60 * 60 * 1000;
|
||||
|
||||
constructor() {
|
||||
this.cache = new NodeCache({ stdTTL: 3600, checkperiod: 600 });
|
||||
}
|
||||
|
||||
async createAlert(
|
||||
userId: string,
|
||||
exposureId: string,
|
||||
severity: Severity,
|
||||
channel: AlertChannel = AlertChannel.EMAIL
|
||||
): Promise<boolean> {
|
||||
const dedupKey = this.computeDedupKey(userId, exposureId);
|
||||
|
||||
const cached = this.cache.get(dedupKey);
|
||||
if (cached) return false;
|
||||
|
||||
const existing = await prisma.alert.findFirst({
|
||||
where: { dedupKey, status: AlertStatus.SENT },
|
||||
});
|
||||
|
||||
if (existing) return false;
|
||||
|
||||
await prisma.alert.create({
|
||||
data: {
|
||||
userId,
|
||||
exposureId,
|
||||
severity,
|
||||
channel,
|
||||
status: AlertStatus.PENDING,
|
||||
dedupKey,
|
||||
},
|
||||
});
|
||||
|
||||
this.cache.set(dedupKey, true, this.dedupWindowMs / 1000);
|
||||
return true;
|
||||
}
|
||||
|
||||
async sendPendingAlerts(): Promise<number> {
|
||||
const pending = await prisma.alert.findMany({
|
||||
where: { status: AlertStatus.PENDING },
|
||||
include: { user: true, exposure: true },
|
||||
orderBy: { createdAt: "asc" },
|
||||
take: 100,
|
||||
});
|
||||
|
||||
let sent = 0;
|
||||
for (const alert of pending) {
|
||||
try {
|
||||
await this.sendNotification(alert);
|
||||
await prisma.alert.update({
|
||||
where: { id: alert.id },
|
||||
data: { status: AlertStatus.SENT, sentAt: new Date() },
|
||||
});
|
||||
sent++;
|
||||
} catch (err) {
|
||||
console.error(`Alert send failed for ${alert.id}:`, err);
|
||||
}
|
||||
}
|
||||
|
||||
return sent;
|
||||
}
|
||||
|
||||
async markRead(alertId: string, userId: string): Promise<void> {
|
||||
await prisma.alert.updateMany({
|
||||
where: { id: alertId, userId },
|
||||
data: { status: AlertStatus.READ },
|
||||
});
|
||||
}
|
||||
|
||||
async getUserAlerts(userId: string, limit = 50, offset = 0) {
|
||||
return prisma.alert.findMany({
|
||||
where: { userId },
|
||||
include: { exposure: true },
|
||||
orderBy: { createdAt: "desc" },
|
||||
take: limit,
|
||||
skip: offset,
|
||||
});
|
||||
}
|
||||
|
||||
async countUnread(userId: string): Promise<number> {
|
||||
return prisma.alert.count({
|
||||
where: { userId, status: { in: [AlertStatus.PENDING, AlertStatus.SENT] } },
|
||||
});
|
||||
}
|
||||
|
||||
private async sendNotification(alert: {
|
||||
id: string;
|
||||
userId: string;
|
||||
severity: Severity;
|
||||
channel: AlertChannel;
|
||||
user: { email: string; name: string | null };
|
||||
exposure: { breachName: string; exposedAt: Date; dataType: string[] };
|
||||
}): Promise<void> {
|
||||
switch (alert.channel) {
|
||||
case AlertChannel.EMAIL:
|
||||
await this.sendEmail(alert);
|
||||
break;
|
||||
case AlertChannel.PUSH:
|
||||
console.log(`[Push] Alert ${alert.id} for user ${alert.userId}`);
|
||||
break;
|
||||
case AlertChannel.SMS:
|
||||
console.log(`[SMS] Alert ${alert.id} for user ${alert.userId}`);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private async sendEmail(alert: {
|
||||
user: { email: string; name: string | null };
|
||||
severity: Severity;
|
||||
exposure: { breachName: string; exposedAt: Date; dataType: string[] };
|
||||
}): Promise<void> {
|
||||
const subject = `[DarkWatch] ${alert.severity} Exposure Detected`;
|
||||
const body = `
|
||||
Dear ${alert.user.name || "User"},
|
||||
|
||||
A new data exposure has been detected:
|
||||
|
||||
Breach: ${alert.exposure.breachName}
|
||||
Date: ${alert.exposure.exposedAt.toISOString().split("T")[0]}
|
||||
Severity: ${alert.severity}
|
||||
Data Types: ${alert.exposure.dataType.join(", ")}
|
||||
|
||||
Login to your DarkWatch dashboard for details.
|
||||
|
||||
— ShieldAI Team
|
||||
`.trim();
|
||||
|
||||
console.log(`[Email] To: ${alert.user.email}, Subject: ${subject}`);
|
||||
}
|
||||
|
||||
computeDedupKey(userId: string, exposureId: string): string {
|
||||
return createHash("sha256").update(`${userId}:${exposureId}`).digest("hex");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user