From 4f7882a10d7520393912ac93f0763763db696530 Mon Sep 17 00:00:00 2001 From: Michael Freno Date: Mon, 25 May 2026 16:55:31 -0400 Subject: [PATCH] feat: add alert correlation & normalization engine with tRPC router Implement the cross-service alert correlation and normalization engine: - correlation router with 6 procedures: getAlerts, getAlertDetails, getGroups, getGroupDetails, resolveAlert, getStats - correlation service with normalizeAlert, correlateAlerts, getAlertTimeline, resolveAlert, getThreatScore, getAlertStats - correlation engine with findRelatedAlerts, createCorrelationGroup, updateGroupSeverity, deduplicateAlerts - alert normalizer with service-specific converters for DarkWatch, SpamShield, VoicePrint, HomeTitle, and RemoveBrokers - Entity extraction (emails, phones, SSNs) and threat scoring with severity-weighted decay over 30-day window - 52 unit tests across engine, service, normalizer, and router --- web/src/server/api/root.ts | 2 + .../server/api/routers/correlation.test.ts | 225 +++++++++ web/src/server/api/routers/correlation.ts | 46 ++ web/src/server/api/schemas/correlation.ts | 36 ++ .../services/correlation.service.test.ts | 185 +++++++ .../server/services/correlation.service.ts | 472 ++++++++++++++++++ .../services/correlation/engine.test.ts | 257 ++++++++++ web/src/server/services/correlation/engine.ts | 127 +++++ .../services/correlation/normalizer.test.ts | 164 ++++++ .../server/services/correlation/normalizer.ts | 148 ++++++ 10 files changed, 1662 insertions(+) create mode 100644 web/src/server/api/routers/correlation.test.ts create mode 100644 web/src/server/api/routers/correlation.ts create mode 100644 web/src/server/api/schemas/correlation.ts create mode 100644 web/src/server/services/correlation.service.test.ts create mode 100644 web/src/server/services/correlation.service.ts create mode 100644 web/src/server/services/correlation/engine.test.ts create mode 100644 web/src/server/services/correlation/engine.ts create mode 100644 web/src/server/services/correlation/normalizer.test.ts create mode 100644 web/src/server/services/correlation/normalizer.ts diff --git a/web/src/server/api/root.ts b/web/src/server/api/root.ts index d130ef6..57e09c5 100644 --- a/web/src/server/api/root.ts +++ b/web/src/server/api/root.ts @@ -7,6 +7,7 @@ import { voiceprintRouter } from "./routers/voiceprint"; import { spamshieldRouter } from "./routers/spamshield"; import { hometitleRouter } from "./routers/hometitle"; import { removebrokersRouter } from "./routers/removebrokers"; +import { correlationRouter } from "./routers/correlation"; import { createTRPCRouter } from "./utils"; export const appRouter = createTRPCRouter({ @@ -19,6 +20,7 @@ export const appRouter = createTRPCRouter({ spamshield: spamshieldRouter, hometitle: hometitleRouter, removebrokers: removebrokersRouter, + correlation: correlationRouter, }); export type AppRouter = typeof appRouter; diff --git a/web/src/server/api/routers/correlation.test.ts b/web/src/server/api/routers/correlation.test.ts new file mode 100644 index 0000000..e3204ad --- /dev/null +++ b/web/src/server/api/routers/correlation.test.ts @@ -0,0 +1,225 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { initTRPC, TRPCError } from "@trpc/server"; +import { wrap } from "@typeschema/valibot"; +import { + AlertFilterSchema, + AlertDetailsSchema, + GroupFilterSchema, + GroupDetailsSchema, + ResolveAlertSchema, +} from "../schemas/correlation"; + +vi.mock("~/server/services/correlation.service", () => ({ + getAlertTimeline: vi.fn(), + getAlertDetails: vi.fn(), + getCorrelationGroups: vi.fn(), + getCorrelationGroupDetails: vi.fn(), + resolveAlert: vi.fn(), + getAlertStats: vi.fn(), +})); + +import * as correlationService from "~/server/services/correlation.service"; + +const mockGetAlertTimeline = vi.mocked(correlationService.getAlertTimeline); +const mockGetAlertDetails = vi.mocked(correlationService.getAlertDetails); +const mockGetCorrelationGroups = vi.mocked(correlationService.getCorrelationGroups); +const mockGetCorrelationGroupDetails = vi.mocked(correlationService.getCorrelationGroupDetails); +const mockResolveAlert = vi.mocked(correlationService.resolveAlert); +const mockGetAlertStats = vi.mocked(correlationService.getAlertStats); + +type User = { + id: string; email: string; name: string | null; image: string | null; + role: string; emailVerified: Date | null; deletedAt: Date | null; + stripeCustomerId: string | null; + createdAt: Date; updatedAt: Date; +}; +type Ctx = { db: object; user: User | null; apiKey: string | null }; + +function createCaller(user: User | null) { + const t = initTRPC.context().create(); + const isAuthed = t.middleware(({ ctx, next }) => { + if (!ctx.user) throw new TRPCError({ code: "UNAUTHORIZED" }); + return next({ ctx: { ...ctx, user: ctx.user } }); + }); + + const router = t.router({ + getAlerts: t.procedure.use(isAuthed) + .input(wrap(AlertFilterSchema)) + .query(async ({ ctx, input }) => { + return mockGetAlertTimeline(ctx.user.id, input); + }), + getAlertDetails: t.procedure.use(isAuthed) + .input(wrap(AlertDetailsSchema)) + .query(async ({ ctx, input }) => { + return mockGetAlertDetails(ctx.user.id, input.alertId); + }), + getGroups: t.procedure.use(isAuthed) + .input(wrap(GroupFilterSchema)) + .query(async ({ ctx, input }) => { + return mockGetCorrelationGroups(ctx.user.id, input); + }), + getGroupDetails: t.procedure.use(isAuthed) + .input(wrap(GroupDetailsSchema)) + .query(async ({ ctx, input }) => { + return mockGetCorrelationGroupDetails(ctx.user.id, input.groupId); + }), + resolveAlert: t.procedure.use(isAuthed) + .input(wrap(ResolveAlertSchema)) + .mutation(async ({ ctx, input }) => { + return mockResolveAlert(ctx.user.id, input.alertId, input.resolution); + }), + getStats: t.procedure.use(isAuthed).query(async ({ ctx }) => { + return mockGetAlertStats(ctx.user.id); + }), + }); + + const caller = t.createCallerFactory(router); + return caller({ db: {} as never, user, apiKey: null }); +} + +const baseUser: User = { + id: "user-1", email: "a@b.com", name: "Test", image: null, + role: "user", emailVerified: null, deletedAt: null, + stripeCustomerId: null, + createdAt: new Date(), updatedAt: new Date(), +}; + +function makeUser(overrides: Partial = {}): User { + return { ...baseUser, ...overrides }; +} + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe("correlation.getAlerts", () => { + it("returns alert timeline for authenticated user", async () => { + const data = { + items: [{ id: "a1", source: "DARKWATCH", severity: "HIGH" }], + total: 1, page: 1, limit: 20, totalPages: 1, + }; + mockGetAlertTimeline.mockResolvedValue(data as never); + const api = createCaller(makeUser()); + const result = await api.getAlerts({}); + expect(result.items).toHaveLength(1); + expect(result.total).toBe(1); + }); + + it("rejects unauthenticated requests", async () => { + const api = createCaller(null); + await expect(api.getAlerts({})).rejects.toThrow(TRPCError); + }); + + it("passes filters to service", async () => { + mockGetAlertTimeline.mockResolvedValue({ + items: [], total: 0, page: 1, limit: 20, totalPages: 0, + } as never); + const api = createCaller(makeUser()); + await api.getAlerts({ source: "DARKWATCH", severity: "HIGH", page: 1, limit: 10 }); + expect(mockGetAlertTimeline).toHaveBeenCalledWith( + "user-1", + { source: "DARKWATCH", severity: "HIGH", page: 1, limit: 10 }, + ); + }); + + it("rejects invalid source value", async () => { + const api = createCaller(makeUser()); + await expect( + api.getAlerts({ source: "INVALID" as never }), + ).rejects.toThrow(); + }); +}); + +describe("correlation.getAlertDetails", () => { + it("returns alert details", async () => { + const data = { alert: { id: "a1" }, group: null, relatedAlerts: [] }; + mockGetAlertDetails.mockResolvedValue(data as never); + const api = createCaller(makeUser()); + const result = await api.getAlertDetails({ alertId: "a1" }); + expect(result.alert.id).toBe("a1"); + }); +}); + +describe("correlation.getGroups", () => { + it("returns correlation groups", async () => { + const data = { + items: [{ id: "g1", highestSeverity: "CRITICAL" }], + total: 1, page: 1, limit: 20, totalPages: 1, + }; + mockGetCorrelationGroups.mockResolvedValue(data as never); + const api = createCaller(makeUser()); + const result = await api.getGroups({}); + expect(result.items).toHaveLength(1); + }); + + it("filters by status", async () => { + mockGetCorrelationGroups.mockResolvedValue({ + items: [], total: 0, page: 1, limit: 20, totalPages: 0, + } as never); + const api = createCaller(makeUser()); + await api.getGroups({ status: "ACTIVE" }); + expect(mockGetCorrelationGroups).toHaveBeenCalledWith( + "user-1", + { status: "ACTIVE", page: 1, limit: 20 }, + ); + }); +}); + +describe("correlation.getGroupDetails", () => { + it("returns group details with alerts", async () => { + const data = { + group: { id: "g1", highestSeverity: "HIGH" }, + alerts: [{ id: "a1" }, { id: "a2" }], + }; + mockGetCorrelationGroupDetails.mockResolvedValue(data as never); + const api = createCaller(makeUser()); + const result = await api.getGroupDetails({ groupId: "g1" }); + expect(result.group.id).toBe("g1"); + expect(result.alerts).toHaveLength(2); + }); +}); + +describe("correlation.resolveAlert", () => { + it("resolves an alert", async () => { + const data = { id: "g1", status: "RESOLVED" }; + mockResolveAlert.mockResolvedValue(data as never); + const api = createCaller(makeUser()); + const result = await api.resolveAlert({ alertId: "a1", resolution: "RESOLVED" }); + expect(result.status).toBe("RESOLVED"); + }); + + it("marks as false positive", async () => { + const data = { id: "g1", status: "FALSE_POSITIVE" }; + mockResolveAlert.mockResolvedValue(data as never); + const api = createCaller(makeUser()); + const result = await api.resolveAlert({ alertId: "a1", resolution: "FALSE_POSITIVE" }); + expect(result.status).toBe("FALSE_POSITIVE"); + }); + + it("rejects invalid resolution", async () => { + const api = createCaller(makeUser()); + await expect( + api.resolveAlert({ alertId: "a1", resolution: "INVALID" as never }), + ).rejects.toThrow(); + }); +}); + +describe("correlation.getStats", () => { + it("returns alert statistics", async () => { + const stats = { + totalAlerts: 10, + bySeverity: { HIGH: 5, LOW: 5 }, + bySource: { DARKWATCH: 10 }, + activeGroups: 2, + resolvedCount: 1, + falsePositiveCount: 0, + threatScore: 45, + threatBreakdown: [{ source: "DARKWATCH", score: 45 }], + }; + mockGetAlertStats.mockResolvedValue(stats as never); + const api = createCaller(makeUser()); + const result = await api.getStats(); + expect(result.totalAlerts).toBe(10); + expect(result.threatScore).toBe(45); + }); +}); diff --git a/web/src/server/api/routers/correlation.ts b/web/src/server/api/routers/correlation.ts new file mode 100644 index 0000000..cef6c89 --- /dev/null +++ b/web/src/server/api/routers/correlation.ts @@ -0,0 +1,46 @@ +import { wrap } from "@typeschema/valibot"; +import { createTRPCRouter, protectedProcedure } from "../utils"; +import { + AlertFilterSchema, + AlertDetailsSchema, + GroupFilterSchema, + GroupDetailsSchema, + ResolveAlertSchema, +} from "../schemas/correlation"; +import * as correlationService from "~/server/services/correlation.service"; + +export const correlationRouter = createTRPCRouter({ + getAlerts: protectedProcedure + .input(wrap(AlertFilterSchema)) + .query(async ({ ctx, input }) => { + return correlationService.getAlertTimeline(ctx.user.id, input); + }), + + getAlertDetails: protectedProcedure + .input(wrap(AlertDetailsSchema)) + .query(async ({ ctx, input }) => { + return correlationService.getAlertDetails(ctx.user.id, input.alertId); + }), + + getGroups: protectedProcedure + .input(wrap(GroupFilterSchema)) + .query(async ({ ctx, input }) => { + return correlationService.getCorrelationGroups(ctx.user.id, input); + }), + + getGroupDetails: protectedProcedure + .input(wrap(GroupDetailsSchema)) + .query(async ({ ctx, input }) => { + return correlationService.getCorrelationGroupDetails(ctx.user.id, input.groupId); + }), + + resolveAlert: protectedProcedure + .input(wrap(ResolveAlertSchema)) + .mutation(async ({ ctx, input }) => { + return correlationService.resolveAlert(ctx.user.id, input.alertId, input.resolution); + }), + + getStats: protectedProcedure.query(async ({ ctx }) => { + return correlationService.getAlertStats(ctx.user.id); + }), +}); diff --git a/web/src/server/api/schemas/correlation.ts b/web/src/server/api/schemas/correlation.ts new file mode 100644 index 0000000..412f369 --- /dev/null +++ b/web/src/server/api/schemas/correlation.ts @@ -0,0 +1,36 @@ +import { object, string, minLength, optional, number, picklist } from "valibot"; + +export const AlertFilterSchema = object({ + source: optional( + picklist(["DARKWATCH", "SPAMSHIELD", "VOICEPRINT", "CALL_ANALYSIS", "HOME_TITLE", "INFO_BROKER"]), + ), + severity: optional( + picklist(["LOW", "INFO", "MEDIUM", "WARNING", "HIGH", "CRITICAL"]), + ), + status: optional( + picklist(["ACTIVE", "RESOLVED", "FALSE_POSITIVE"]), + ), + page: optional(number(), 1), + limit: optional(number(), 20), +}); + +export const AlertDetailsSchema = object({ + alertId: string([minLength(1)]), +}); + +export const GroupFilterSchema = object({ + status: optional( + picklist(["ACTIVE", "RESOLVED", "FALSE_POSITIVE"]), + ), + page: optional(number(), 1), + limit: optional(number(), 20), +}); + +export const GroupDetailsSchema = object({ + groupId: string([minLength(1)]), +}); + +export const ResolveAlertSchema = object({ + alertId: string([minLength(1)]), + resolution: picklist(["RESOLVED", "FALSE_POSITIVE"]), +}); diff --git a/web/src/server/services/correlation.service.test.ts b/web/src/server/services/correlation.service.test.ts new file mode 100644 index 0000000..edb85e5 --- /dev/null +++ b/web/src/server/services/correlation.service.test.ts @@ -0,0 +1,185 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const mockSelect = vi.fn(); +const mockInsert = vi.fn(); +const mockUpdate = vi.fn(); + +vi.mock("~/server/db", () => ({ + db: { + select: mockSelect, + insert: mockInsert, + update: mockUpdate, + }, +})); + +vi.mock("~/server/db/schema", () => ({ + normalizedAlerts: {}, + correlationGroups: {}, + auditLogs: {}, +})); + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe("getThreatScore", () => { + function makeSelectChain(data: unknown) { + const where = vi.fn().mockResolvedValue(data); + const from = vi.fn().mockReturnValue({ where }); + mockSelect.mockReturnValue({ from }); + } + + function daysAgo(n: number): Date { + return new Date(Date.now() - n * 24 * 60 * 60 * 1000); + } + + it("returns score 0 with no alerts", async () => { + makeSelectChain([]); + + const { getThreatScore } = await import("./correlation.service"); + const result = await getThreatScore("user-1"); + expect(result.score).toBe(0); + expect(result.breakdown).toEqual([]); + }); + + it("returns higher score for more severe alerts", async () => { + const highAlert = { + id: "a1", + severity: "CRITICAL", + source: "DARKWATCH", + createdAt: daysAgo(1), + }; + makeSelectChain([highAlert]); + + const { getThreatScore } = await import("./correlation.service"); + const result = await getThreatScore("user-1"); + expect(result.score).toBeGreaterThan(0); + expect(result.breakdown[0].source).toBe("DARKWATCH"); + }); + + it("returns lower score for less severe alerts", async () => { + const lowAlert = { + id: "a1", + severity: "LOW", + source: "DARKWATCH", + createdAt: daysAgo(1), + }; + makeSelectChain([lowAlert]); + + const { getThreatScore } = await import("./correlation.service"); + const result = await getThreatScore("user-1"); + expect(result.score).toBeGreaterThan(0); + }); + + it("assigns higher weight to CRITICAL than LOW", async () => { + const { getThreatScore: getScore } = await import("./correlation.service"); + + makeSelectChain([{ + id: "a1", + severity: "CRITICAL", + source: "DARKWATCH", + createdAt: daysAgo(1), + }]); + const highResult = await getScore("user-1"); + + makeSelectChain([{ + id: "a2", + severity: "LOW", + source: "DARKWATCH", + createdAt: daysAgo(1), + }]); + const lowResult = await getScore("user-1"); + + expect(highResult.score).toBeGreaterThan(lowResult.score); + }); + + it("ignores alerts older than 30 days", async () => { + makeSelectChain([]); + + const { getThreatScore } = await import("./correlation.service"); + const result = await getThreatScore("user-1"); + expect(result.score).toBe(0); + }); + + it("provides breakdown by source", async () => { + const alerts = [ + { id: "a1", severity: "HIGH", source: "DARKWATCH", createdAt: daysAgo(1) }, + { id: "a2", severity: "WARNING", source: "SPAMSHIELD", createdAt: daysAgo(1) }, + ]; + makeSelectChain(alerts); + + const { getThreatScore } = await import("./correlation.service"); + const result = await getThreatScore("user-1"); + expect(result.breakdown.length).toBeGreaterThanOrEqual(2); + }); +}); + +describe("getAlertTimeline", () => { + it("returns paginated alerts for user", async () => { + const countWhere = vi.fn().mockResolvedValue([{ count: 0 }]); + const countFrom = vi.fn().mockReturnValue({ where: countWhere }); + mockSelect.mockReturnValueOnce({ from: countFrom }); + + const offset = vi.fn().mockResolvedValue([]); + const limit = vi.fn().mockReturnValue({ offset }); + const orderBy = vi.fn().mockReturnValue({ limit }); + const dataWhere = vi.fn().mockReturnValue({ orderBy }); + const dataFrom = vi.fn().mockReturnValue({ where: dataWhere }); + mockSelect.mockReturnValue({ from: dataFrom }); + + const { getAlertTimeline } = await import("./correlation.service"); + const result = await getAlertTimeline("user-1", { page: 1, limit: 20 }); + expect(result.page).toBe(1); + expect(result.limit).toBe(20); + expect(result.total).toBe(0); + }); +}); + +describe("resolveAlert", () => { + it("creates a group for ungrouped alert and resolves it", async () => { + const selectLimit = vi.fn().mockResolvedValue([{ + id: "a1", + userId: "user-1", + source: "DARKWATCH", + severity: "HIGH", + entities: { emails: [], phones: [], ssns: [] }, + title: "Test", + }]); + const selectWhere = vi.fn().mockReturnValue({ limit: selectLimit }); + const selectFrom = vi.fn().mockReturnValue({ where: selectWhere }); + mockSelect.mockReturnValue({ from: selectFrom }); + + const insertGroupReturning = vi.fn().mockResolvedValue([{ + id: "new-group", + userId: "user-1", + status: "ACTIVE", + entities: { emails: [], phones: [], ssns: [] }, + highestSeverity: "HIGH", + alertCount: 1, + }]); + const insertGroupValues = vi.fn().mockReturnValue({ returning: insertGroupReturning }); + mockInsert.mockReturnValueOnce({ values: insertGroupValues }); + + const updateAlertReturning = vi.fn().mockResolvedValue([{ id: "a1", groupId: "new-group" }]); + const updateAlertWhere = vi.fn().mockReturnValue({ returning: updateAlertReturning }); + const updateAlertSet = vi.fn().mockReturnValue({ where: updateAlertWhere }); + mockUpdate.mockReturnValueOnce({ set: updateAlertSet }); + + const updateGroupReturning = vi.fn().mockResolvedValue([{ + id: "new-group", + status: "RESOLVED", + resolvedAt: new Date(), + }]); + const updateGroupWhere = vi.fn().mockReturnValue({ returning: updateGroupReturning }); + const updateGroupSet = vi.fn().mockReturnValue({ where: updateGroupWhere }); + mockUpdate.mockReturnValueOnce({ set: updateGroupSet }); + + const auditReturning = vi.fn().mockResolvedValue([{}]); + const auditValues = vi.fn().mockReturnValue({ returning: auditReturning }); + mockInsert.mockReturnValue({ values: auditValues }); + + const { resolveAlert } = await import("./correlation.service"); + const result = await resolveAlert("user-1", "a1", "RESOLVED"); + expect(result.status).toBe("RESOLVED"); + }); +}); diff --git a/web/src/server/services/correlation.service.ts b/web/src/server/services/correlation.service.ts new file mode 100644 index 0000000..2a3cc40 --- /dev/null +++ b/web/src/server/services/correlation.service.ts @@ -0,0 +1,472 @@ +import { TRPCError } from "@trpc/server"; +import { and, desc, eq, count, gte, inArray, sql, lte } from "drizzle-orm"; +import { db } from "~/server/db"; +import { normalizedAlerts, correlationGroups, auditLogs } from "~/server/db/schema"; +import { + findRelatedAlerts, + createCorrelationGroup, + updateGroupSeverity, + deduplicateAlerts, +} from "./correlation/engine"; +import type { NormalizedAlertInput, EntitySet } from "./correlation/normalizer"; + +const SEVERITY_WEIGHTS: Record = { + CRITICAL: 40, + HIGH: 25, + WARNING: 15, + MEDIUM: 10, + INFO: 5, + LOW: 1, +}; + +async function ensureGroupForAlert(alertId: string, userId: string): Promise { + const [alert] = await db + .select() + .from(normalizedAlerts) + .where(eq(normalizedAlerts.id, alertId)) + .limit(1); + + if (!alert) throw new TRPCError({ code: "NOT_FOUND", message: "Alert not found" }); + + if (alert.groupId) return alert.groupId; + + const [group] = await db + .insert(correlationGroups) + .values({ + userId, + entities: alert.entities as Record, + highestSeverity: alert.severity as "LOW" | "INFO" | "MEDIUM" | "WARNING" | "HIGH" | "CRITICAL", + alertCount: 1, + summary: alert.title, + }) + .returning(); + + await db + .update(normalizedAlerts) + .set({ groupId: group.id }) + .where(eq(normalizedAlerts.id, alertId)); + + return group.id; +} + +export async function normalizeAlert( + source: NormalizedAlertInput["source"], + sourceAlertId: string, + category: NormalizedAlertInput["category"], + severity: NormalizedAlertInput["severity"], + userId: string, + title: string, + description: string, + entities: EntitySet, + payload?: Record, +) { + const [deduped] = await deduplicateAlerts([ + { source, sourceAlertId, category, severity, title, description, entities, payload } as NormalizedAlertInput, + ]); + if (!deduped) return null; + + const [alert] = await db + .insert(normalizedAlerts) + .values({ + source, + sourceAlertId, + category, + severity, + userId, + title, + description, + entities: entities as unknown as Record, + payload: payload ?? null, + createdAt: new Date(), + }) + .returning(); + + return alert; +} + +export async function correlateAlerts(userId: string): Promise { + const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); + + const alerts = await db + .select() + .from(normalizedAlerts) + .where( + and( + eq(normalizedAlerts.userId, userId), + gte(normalizedAlerts.createdAt, thirtyDaysAgo), + ), + ); + + const grouped = new Set(); + for (let i = 0; i < alerts.length; i++) { + if (grouped.has(alerts[i].id)) continue; + + const entityA = alerts[i].entities as EntitySet; + const related = alerts.filter((a, j) => { + if (i === j || grouped.has(a.id)) return false; + if (a.groupId && a.groupId === alerts[i].groupId) return false; + return entitiesOverlap(entityA, a.entities as EntitySet); + }); + + if (related.length === 0) continue; + + const groupAlerts = [alerts[i], ...related]; + for (const a of groupAlerts) grouped.add(a.id); + + const existingGroupId = groupAlerts.find((a) => a.groupId)?.groupId; + if (existingGroupId) { + const ungrouped = groupAlerts.filter((a) => !a.groupId || a.groupId !== existingGroupId); + if (ungrouped.length > 0) { + const ungroupedIds = ungrouped.map((a) => a.id); + await db + .update(normalizedAlerts) + .set({ groupId: existingGroupId }) + .where(inArray(normalizedAlerts.id, ungroupedIds)); + } + await updateGroupSeverity(existingGroupId); + } else { + const mergedEntities = mergeEntities(groupAlerts.map((a) => a.entities as EntitySet)); + const group = await createCorrelationGroup(groupAlerts, userId, mergedEntities); + for (const a of groupAlerts) grouped.add(a.id); + } + } +} + +function entitiesOverlap(a: EntitySet, b: EntitySet): boolean { + const aSet = new Set([...a.emails, ...a.phones, ...a.ssns]); + for (const val of [...b.emails, ...b.phones, ...b.ssns]) { + if (aSet.has(val)) return true; + } + return false; +} + +function mergeEntities(entitySets: EntitySet[]): EntitySet { + const emails = [...new Set(entitySets.flatMap((e) => e.emails))]; + const phones = [...new Set(entitySets.flatMap((e) => e.phones))]; + const ssns = [...new Set(entitySets.flatMap((e) => e.ssns))]; + return { emails, phones, ssns }; +} + +export interface TimelineFilter { + source?: string; + severity?: string; + status?: string; + page?: number; + limit?: number; +} + +export async function getAlertTimeline( + userId: string, + filters: TimelineFilter = {}, +): Promise<{ + items: Array>; + total: number; + page: number; + limit: number; + totalPages: number; +}> { + const page = filters.page ?? 1; + const limit = filters.limit ?? 20; + const offset = (page - 1) * limit; + + const conditions = [eq(normalizedAlerts.userId, userId)]; + if (filters.source) { + conditions.push(eq(normalizedAlerts.source, filters.source as typeof normalizedAlerts.$inferSelect.source)); + } + if (filters.severity) { + conditions.push(eq(normalizedAlerts.severity, filters.severity as typeof normalizedAlerts.$inferSelect.severity)); + } + + let groupStatusCondition = undefined; + if (filters.status) { + groupStatusCondition = filters.status; + } + + let items: Array>; + + if (groupStatusCondition) { + const [totalResult] = await db + .select({ count: count() }) + .from(normalizedAlerts) + .leftJoin(correlationGroups, eq(normalizedAlerts.groupId, correlationGroups.id)) + .where( + and( + ...conditions, + eq(correlationGroups.status, groupStatusCondition as "ACTIVE" | "RESOLVED" | "FALSE_POSITIVE"), + ), + ); + + const rows = await db + .select() + .from(normalizedAlerts) + .leftJoin(correlationGroups, eq(normalizedAlerts.groupId, correlationGroups.id)) + .where( + and( + ...conditions, + eq(correlationGroups.status, groupStatusCondition as "ACTIVE" | "RESOLVED" | "FALSE_POSITIVE"), + ), + ) + .orderBy(desc(normalizedAlerts.createdAt)) + .limit(limit) + .offset(offset); + + items = rows.map((r) => ({ + ...r.normalized_alerts, + groupStatus: r.correlation_groups?.status ?? "ACTIVE", + group: r.correlation_groups ?? null, + })); + + return { + items, + total: totalResult.count, + page, + limit, + totalPages: Math.ceil(totalResult.count / limit), + }; + } + + const [totalResult] = await db + .select({ count: count() }) + .from(normalizedAlerts) + .where(and(...conditions)); + + const rows = await db + .select() + .from(normalizedAlerts) + .where(and(...conditions)) + .orderBy(desc(normalizedAlerts.createdAt)) + .limit(limit) + .offset(offset); + + items = rows as unknown as Array>; + + return { + items, + total: totalResult.count, + page, + limit, + totalPages: Math.ceil(totalResult.count / limit), + }; +} + +export async function getAlertDetails(userId: string, alertId: string) { + const [alert] = await db + .select() + .from(normalizedAlerts) + .where(and(eq(normalizedAlerts.id, alertId), eq(normalizedAlerts.userId, userId))) + .limit(1); + + if (!alert) throw new TRPCError({ code: "NOT_FOUND", message: "Alert not found" }); + + let group = null; + let relatedAlerts: Array = []; + + if (alert.groupId) { + const [foundGroup] = await db + .select() + .from(correlationGroups) + .where(eq(correlationGroups.id, alert.groupId)) + .limit(1); + group = foundGroup ?? null; + + relatedAlerts = await db + .select() + .from(normalizedAlerts) + .where( + and( + eq(normalizedAlerts.groupId, alert.groupId), + eq(normalizedAlerts.userId, userId), + ), + ); + } + + return { alert, group, relatedAlerts }; +} + +export async function getCorrelationGroups( + userId: string, + filters: { status?: string; page?: number; limit?: number } = {}, +) { + const page = filters.page ?? 1; + const limit = filters.limit ?? 20; + const offset = (page - 1) * limit; + + const conditions = [eq(correlationGroups.userId, userId)]; + if (filters.status) { + conditions.push(eq(correlationGroups.status, filters.status as "ACTIVE" | "RESOLVED" | "FALSE_POSITIVE")); + } + + const [totalResult] = await db + .select({ count: count() }) + .from(correlationGroups) + .where(and(...conditions)); + + const items = await db + .select() + .from(correlationGroups) + .where(and(...conditions)) + .orderBy(desc(correlationGroups.createdAt)) + .limit(limit) + .offset(offset); + + return { + items, + total: totalResult.count, + page, + limit, + totalPages: Math.ceil(totalResult.count / limit), + }; +} + +export async function getCorrelationGroupDetails(userId: string, groupId: string) { + const [group] = await db + .select() + .from(correlationGroups) + .where(and(eq(correlationGroups.id, groupId), eq(correlationGroups.userId, userId))) + .limit(1); + + if (!group) throw new TRPCError({ code: "NOT_FOUND", message: "Group not found" }); + + const alerts = await db + .select() + .from(normalizedAlerts) + .where(eq(normalizedAlerts.groupId, groupId)) + .orderBy(desc(normalizedAlerts.createdAt)); + + return { group, alerts }; +} + +export async function resolveAlert( + userId: string, + alertId: string, + resolution: "RESOLVED" | "FALSE_POSITIVE", +) { + const groupId = await ensureGroupForAlert(alertId, userId); + + const [updated] = await db + .update(correlationGroups) + .set({ + status: resolution as "ACTIVE" | "RESOLVED" | "FALSE_POSITIVE", + resolvedAt: new Date(), + }) + .where(and(eq(correlationGroups.id, groupId), eq(correlationGroups.userId, userId))) + .returning(); + + if (!updated) throw new TRPCError({ code: "NOT_FOUND", message: "Group not found" }); + + await db + .insert(auditLogs) + .values({ + userId, + action: "alert_resolve", + resource: "normalized_alert", + resourceId: alertId, + changes: { resolution, groupId }, + metadata: { source: "correlation_router" }, + }); + + return updated; +} + +export async function getThreatScore(userId: string): Promise<{ + score: number; + breakdown: Array<{ source: string; score: number }>; +}> { + const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); + + const alerts = await db + .select() + .from(normalizedAlerts) + .where( + and( + eq(normalizedAlerts.userId, userId), + gte(normalizedAlerts.createdAt, thirtyDaysAgo), + ), + ); + + const now = Date.now(); + let totalScore = 0; + const sourceScores: Record = {}; + + for (const alert of alerts) { + const weight = SEVERITY_WEIGHTS[alert.severity] ?? 1; + const ageDays = (now - alert.createdAt.getTime()) / (1000 * 60 * 60 * 24); + const decay = Math.exp(-ageDays / 30); + const contribution = weight * decay; + + totalScore += contribution; + sourceScores[alert.source] = (sourceScores[alert.source] ?? 0) + contribution; + } + + const finalScore = Math.min(100, Math.round(totalScore)); + const breakdown = Object.entries(sourceScores).map(([source, score]) => ({ + source, + score: Math.round(score * 10) / 10, + })); + + return { score: finalScore, breakdown }; +} + +export async function getAlertStats(userId: string) { + const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); + + const [totalResult] = await db + .select({ count: count() }) + .from(normalizedAlerts) + .where(and(eq(normalizedAlerts.userId, userId), gte(normalizedAlerts.createdAt, thirtyDaysAgo))); + + const bySeverity = await db + .select({ severity: normalizedAlerts.severity, count: count() }) + .from(normalizedAlerts) + .where(and(eq(normalizedAlerts.userId, userId), gte(normalizedAlerts.createdAt, thirtyDaysAgo))) + .groupBy(normalizedAlerts.severity); + + const bySource = await db + .select({ source: normalizedAlerts.source, count: count() }) + .from(normalizedAlerts) + .where(and(eq(normalizedAlerts.userId, userId), gte(normalizedAlerts.createdAt, thirtyDaysAgo))) + .groupBy(normalizedAlerts.source); + + const [activeGroupsResult] = await db + .select({ count: count() }) + .from(correlationGroups) + .where( + and( + eq(correlationGroups.userId, userId), + eq(correlationGroups.status, "ACTIVE"), + ), + ); + + const [resolvedResult] = await db + .select({ count: count() }) + .from(correlationGroups) + .where( + and( + eq(correlationGroups.userId, userId), + eq(correlationGroups.status, "RESOLVED"), + ), + ); + + const [fpResult] = await db + .select({ count: count() }) + .from(correlationGroups) + .where( + and( + eq(correlationGroups.userId, userId), + eq(correlationGroups.status, "FALSE_POSITIVE"), + ), + ); + + const threat = await getThreatScore(userId); + + return { + totalAlerts: totalResult.count, + bySeverity: Object.fromEntries(bySeverity.map((r) => [r.severity, r.count])), + bySource: Object.fromEntries(bySource.map((r) => [r.source, r.count])), + activeGroups: activeGroupsResult.count, + resolvedCount: resolvedResult.count, + falsePositiveCount: fpResult.count, + threatScore: threat.score, + threatBreakdown: threat.breakdown, + }; +} diff --git a/web/src/server/services/correlation/engine.test.ts b/web/src/server/services/correlation/engine.test.ts new file mode 100644 index 0000000..73547dd --- /dev/null +++ b/web/src/server/services/correlation/engine.test.ts @@ -0,0 +1,257 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const mockSelect = vi.fn(); +const mockInsert = vi.fn(); +const mockUpdate = vi.fn(); + +vi.mock("~/server/db", () => ({ + db: { + select: mockSelect, + insert: mockInsert, + update: mockUpdate, + }, +})); + +vi.mock("~/server/db/schema", () => ({ + normalizedAlerts: {}, + correlationGroups: {}, +})); + +interface MockChain { + from: ReturnType; + where: ReturnType; + values: ReturnType; + returning: ReturnType; + set: ReturnType; +} + +function makeSelectChain(data: unknown): MockChain { + const where = vi.fn().mockResolvedValue(data); + const from = vi.fn().mockReturnValue({ where }); + const chain = { from, where } as MockChain; + mockSelect.mockReturnValue({ from }); + return chain; +} + +function makeInsertChain(data: unknown): MockChain { + const returning = vi.fn().mockResolvedValue(data); + const values = vi.fn().mockReturnValue({ returning }); + const chain = { values, returning } as MockChain; + mockInsert.mockReturnValue({ values }); + return chain; +} + +function makeUpdateChain(): { set: ReturnType; where: ReturnType } { + const where = vi.fn().mockResolvedValue([]); + const set = vi.fn().mockReturnValue({ where }); + mockUpdate.mockReturnValue({ set }); + return { set, where }; +} + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe("getHighestSeverity", () => { + it("returns LOW for empty input", async () => { + const { getHighestSeverity } = await import("./engine"); + expect(getHighestSeverity([])).toBe("LOW"); + }); + + it("returns highest severity from list", async () => { + const { getHighestSeverity } = await import("./engine"); + expect(getHighestSeverity(["LOW", "HIGH", "INFO"])).toBe("HIGH"); + }); + + it("returns CRITICAL as highest", async () => { + const { getHighestSeverity } = await import("./engine"); + expect(getHighestSeverity(["INFO", "WARNING", "CRITICAL"])).toBe("CRITICAL"); + }); + + it("returns the only severity", async () => { + const { getHighestSeverity } = await import("./engine"); + expect(getHighestSeverity(["MEDIUM"])).toBe("MEDIUM"); + }); +}); + +describe("deduplicateAlerts", () => { + it("returns all inputs when none exist", async () => { + makeSelectChain([]); + + const { deduplicateAlerts } = await import("./engine"); + const inputs = [ + { source: "DARKWATCH" as const, sourceAlertId: "dw:1", category: "BREACH_EXPOSURE" as const, severity: "HIGH" as const, title: "Test", description: "Test", entities: { emails: [], phones: [], ssns: [] } }, + ]; + const result = await deduplicateAlerts(inputs); + expect(result).toHaveLength(1); + }); + + it("filters out existing sourceAlertIds", async () => { + makeSelectChain([{ sourceAlertId: "dw:1" }]); + + const { deduplicateAlerts } = await import("./engine"); + const inputs = [ + { source: "DARKWATCH" as const, sourceAlertId: "dw:1", category: "BREACH_EXPOSURE" as const, severity: "HIGH" as const, title: "Test", description: "Test", entities: { emails: [], phones: [], ssns: [] } }, + { source: "SPAMSHIELD" as const, sourceAlertId: "ss:2", category: "SPAM_CALL" as const, severity: "WARNING" as const, title: "Test2", description: "Test2", entities: { emails: [], phones: [], ssns: [] } }, + ]; + const result = await deduplicateAlerts(inputs); + expect(result).toHaveLength(1); + expect(result[0].sourceAlertId).toBe("ss:2"); + }); + + it("returns empty for all duplicates", async () => { + makeSelectChain([{ sourceAlertId: "dw:1" }, { sourceAlertId: "ss:2" }]); + + const { deduplicateAlerts } = await import("./engine"); + const inputs = [ + { source: "DARKWATCH" as const, sourceAlertId: "dw:1", category: "BREACH_EXPOSURE" as const, severity: "HIGH" as const, title: "Test", description: "Test", entities: { emails: [], phones: [], ssns: [] } }, + { source: "SPAMSHIELD" as const, sourceAlertId: "ss:2", category: "SPAM_CALL" as const, severity: "WARNING" as const, title: "Test2", description: "Test2", entities: { emails: [], phones: [], ssns: [] } }, + ]; + const result = await deduplicateAlerts(inputs); + expect(result).toHaveLength(0); + }); + + it("returns empty array for empty input", async () => { + const { deduplicateAlerts } = await import("./engine"); + const result = await deduplicateAlerts([]); + expect(result).toEqual([]); + }); +}); + +describe("createCorrelationGroup", () => { + it("creates group with correct highest severity", async () => { + makeInsertChain([{ + id: "group-1", + userId: "user-1", + entities: { emails: ["test@example.com"], phones: [], ssns: [] }, + highestSeverity: "CRITICAL", + alertCount: 3, + summary: "Correlated group of 3 alert(s)", + }]); + makeUpdateChain(); + + const { createCorrelationGroup } = await import("./engine"); + const alerts = [ + { id: "a1", severity: "HIGH" }, + { id: "a2", severity: "CRITICAL" }, + { id: "a3", severity: "INFO" }, + ] as Array>; + + const group = await createCorrelationGroup( + alerts as never, + "user-1", + { emails: ["test@example.com"], phones: [], ssns: [] }, + ); + expect(group.id).toBe("group-1"); + expect(group.highestSeverity).toBe("CRITICAL"); + expect(group.alertCount).toBe(3); + }); + + it("creates group with unique alert IDs", async () => { + makeInsertChain([{ + id: "group-2", + userId: "user-1", + entities: { emails: [], phones: ["+1234567890"], ssns: [] }, + highestSeverity: "WARNING", + alertCount: 1, + summary: "Correlated group of 1 alert(s)", + }]); + makeUpdateChain(); + + const { createCorrelationGroup } = await import("./engine"); + const alerts = [ + { id: "a1", severity: "WARNING" }, + { id: "a1", severity: "WARNING" }, + ] as Array>; + + const group = await createCorrelationGroup( + alerts as never, + "user-1", + { emails: [], phones: ["+1234567890"], ssns: [] }, + ); + expect(group.alertCount).toBe(1); + }); +}); + +describe("updateGroupSeverity", () => { + it("updates group severity to highest among alerts", async () => { + makeSelectChain([ + { id: "a1", severity: "LOW" }, + { id: "a2", severity: "HIGH" }, + ]); + makeUpdateChain(); + + const { updateGroupSeverity } = await import("./engine"); + await updateGroupSeverity("group-1"); + const setCall = mockUpdate.mock.results[0]?.value.set; + expect(setCall).toHaveBeenCalledWith( + expect.objectContaining({ highestSeverity: "HIGH", alertCount: 2 }), + ); + }); + + it("does nothing for empty group", async () => { + makeSelectChain([]); + + const { updateGroupSeverity } = await import("./engine"); + await updateGroupSeverity("group-empty"); + expect(mockUpdate).not.toHaveBeenCalled(); + }); +}); + +describe("findRelatedAlerts", () => { + it("finds alerts sharing an email address", async () => { + const existingAlerts = [ + { id: "a2", userId: "user-1", entities: { emails: ["shared@example.com"], phones: [], ssns: [] }, severity: "HIGH" }, + { id: "a3", userId: "user-1", entities: { emails: ["other@example.com"], phones: [], ssns: [] }, severity: "LOW" }, + ]; + + makeSelectChain(existingAlerts); + + const { findRelatedAlerts } = await import("./engine"); + const result = await findRelatedAlerts("a1", "user-1", { emails: ["shared@example.com"], phones: [], ssns: [] }); + expect(result).toHaveLength(1); + expect(result[0].id).toBe("a2"); + }); + + it("finds alerts sharing a phone number", async () => { + const existingAlerts = [ + { id: "a2", userId: "user-1", entities: { emails: [], phones: ["+1234567890"], ssns: [] }, severity: "HIGH" }, + ]; + + makeSelectChain(existingAlerts); + + const { findRelatedAlerts } = await import("./engine"); + const result = await findRelatedAlerts("a1", "user-1", { emails: [], phones: ["+1234567890"], ssns: [] }); + expect(result).toHaveLength(1); + }); + + it("finds alerts sharing an SSN", async () => { + const existingAlerts = [ + { id: "a2", userId: "user-1", entities: { emails: [], phones: [], ssns: ["123-45-6789"] }, severity: "CRITICAL" }, + ]; + + makeSelectChain(existingAlerts); + + const { findRelatedAlerts } = await import("./engine"); + const result = await findRelatedAlerts("a1", "user-1", { emails: [], phones: [], ssns: ["123-45-6789"] }); + expect(result).toHaveLength(1); + }); + + it("returns empty when no entities match", async () => { + const existingAlerts = [ + { id: "a2", userId: "user-1", entities: { emails: ["other@example.com"], phones: [], ssns: [] }, severity: "HIGH" }, + ]; + + makeSelectChain(existingAlerts); + + const { findRelatedAlerts } = await import("./engine"); + const result = await findRelatedAlerts("a1", "user-1", { emails: ["unrelated@example.com"], phones: [], ssns: [] }); + expect(result).toHaveLength(0); + }); + + it("returns empty when no entities provided", async () => { + const { findRelatedAlerts } = await import("./engine"); + const result = await findRelatedAlerts("a1", "user-1", { emails: [], phones: [], ssns: [] }); + expect(result).toEqual([]); + }); +}); diff --git a/web/src/server/services/correlation/engine.ts b/web/src/server/services/correlation/engine.ts new file mode 100644 index 0000000..cbee197 --- /dev/null +++ b/web/src/server/services/correlation/engine.ts @@ -0,0 +1,127 @@ +import { and, eq, gte, inArray, not, sql } from "drizzle-orm"; +import type { NodePgDatabase } from "drizzle-orm/node-postgres"; +import { db } from "~/server/db"; +import { normalizedAlerts, correlationGroups } from "~/server/db/schema"; +import type { NormalizedAlertInput, EntitySet } from "./normalizer"; +import type * as schema from "~/server/db/schema"; + +const SEVERITY_ORDER: Record = { + LOW: 0, + INFO: 1, + MEDIUM: 2, + WARNING: 3, + HIGH: 4, + CRITICAL: 5, +}; + +export function getHighestSeverity(severities: string[]): string { + let highest = "LOW"; + let highestOrder = 0; + for (const s of severities) { + const order = SEVERITY_ORDER[s] ?? 0; + if (order > highestOrder) { + highestOrder = order; + highest = s; + } + } + return highest; +} + +type NormalizedAlert = typeof normalizedAlerts.$inferSelect; + +export async function findRelatedAlerts( + alertId: string, + userId: string, + entities: EntitySet, +): Promise { + const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); + const allEntityValues = [...entities.emails, ...entities.phones, ...entities.ssns]; + if (allEntityValues.length === 0) return []; + + const alerts = await db + .select() + .from(normalizedAlerts) + .where( + and( + eq(normalizedAlerts.userId, userId), + not(eq(normalizedAlerts.id, alertId)), + gte(normalizedAlerts.createdAt, thirtyDaysAgo), + ), + ); + + return alerts.filter((a) => entitiesOverlap(a.entities as EntitySet, entities)); +} + +function entitiesOverlap(a: EntitySet, b: EntitySet): boolean { + const aSet = new Set([...a.emails, ...a.phones, ...a.ssns]); + for (const val of [...b.emails, ...b.phones, ...b.ssns]) { + if (aSet.has(val)) return true; + } + return false; +} + +export async function createCorrelationGroup( + alerts: NormalizedAlert[], + userId: string, + entities: EntitySet, +): Promise { + const severities = alerts.map((a) => a.severity); + const highestSeverity = getHighestSeverity(severities); + + const alertIds = [...new Set(alerts.map((a) => a.id))]; + + const [group] = await db + .insert(correlationGroups) + .values({ + userId, + entities: entities as unknown as Record, + highestSeverity: highestSeverity as "LOW" | "INFO" | "MEDIUM" | "WARNING" | "HIGH" | "CRITICAL", + alertCount: alertIds.length, + summary: `Correlated group of ${alertIds.length} alert(s)`, + }) + .returning(); + + if (alertIds.length > 0) { + await db + .update(normalizedAlerts) + .set({ groupId: group.id }) + .where(inArray(normalizedAlerts.id, alertIds)); + } + + return group; +} + +export async function updateGroupSeverity(groupId: string): Promise { + const groupAlerts = await db + .select() + .from(normalizedAlerts) + .where(eq(normalizedAlerts.groupId, groupId)); + + if (groupAlerts.length === 0) return; + + const severities = groupAlerts.map((a) => a.severity); + const highestSeverity = getHighestSeverity(severities); + + await db + .update(correlationGroups) + .set({ + highestSeverity: highestSeverity as "LOW" | "INFO" | "MEDIUM" | "WARNING" | "HIGH" | "CRITICAL", + alertCount: groupAlerts.length, + }) + .where(eq(correlationGroups.id, groupId)); +} + +export async function deduplicateAlerts( + inputs: NormalizedAlertInput[], +): Promise { + if (inputs.length === 0) return []; + + const sourceAlertIds = inputs.map((i) => i.sourceAlertId); + const existing = await db + .select({ sourceAlertId: normalizedAlerts.sourceAlertId }) + .from(normalizedAlerts) + .where(inArray(normalizedAlerts.sourceAlertId, sourceAlertIds)); + + const existingSet = new Set(existing.map((e) => e.sourceAlertId)); + return inputs.filter((i) => !existingSet.has(i.sourceAlertId)); +} diff --git a/web/src/server/services/correlation/normalizer.test.ts b/web/src/server/services/correlation/normalizer.test.ts new file mode 100644 index 0000000..b49b359 --- /dev/null +++ b/web/src/server/services/correlation/normalizer.test.ts @@ -0,0 +1,164 @@ +import { describe, it, expect } from "vitest"; + +describe("extractEntities", () => { + it("extracts emails from text", async () => { + const { extractEntities } = await import("./normalizer"); + const result = extractEntities("Contact me at user@example.com or admin@test.com"); + expect(result.emails).toContain("user@example.com"); + expect(result.emails).toContain("admin@test.com"); + }); + + it("extracts phone numbers from text", async () => { + const { extractEntities } = await import("./normalizer"); + const result = extractEntities("Call +14155551234 or 2125551234"); + expect(result.phones.length).toBeGreaterThan(0); + }); + + it("extracts SSNs from text", async () => { + const { extractEntities } = await import("./normalizer"); + const result = extractEntities("SSN: 123-45-6789"); + expect(result.ssns).toContain("123-45-6789"); + }); + + it("returns empty arrays for no matches", async () => { + const { extractEntities } = await import("./normalizer"); + const result = extractEntities("No entities here"); + expect(result.emails).toEqual([]); + expect(result.phones).toEqual([]); + expect(result.ssns).toEqual([]); + }); + + it("deduplicates entities", async () => { + const { extractEntities } = await import("./normalizer"); + const result = extractEntities("a@b.com a@b.com"); + expect(result.emails).toEqual(["a@b.com"]); + }); +}); + +describe("normalizeDarkWatchAlert", () => { + it("creates NormalizedAlert with correct severity mapping", async () => { + const { normalizeDarkWatchAlert } = await import("./normalizer"); + const result = normalizeDarkWatchAlert({ + id: "exp-1", + identifier: "user@example.com", + severity: "critical", + source: "hibp", + }); + expect(result.source).toBe("DARKWATCH"); + expect(result.sourceAlertId).toBe("darkwatch:exp-1"); + expect(result.category).toBe("BREACH_EXPOSURE"); + expect(result.severity).toBe("CRITICAL"); + expect(result.entities.emails).toContain("user@example.com"); + }); + + it("maps warning severity correctly", async () => { + const { normalizeDarkWatchAlert } = await import("./normalizer"); + const result = normalizeDarkWatchAlert({ + id: "exp-2", + identifier: "+1234567890", + severity: "warning", + source: "shodan", + }); + expect(result.severity).toBe("WARNING"); + }); + + it("maps info severity correctly", async () => { + const { normalizeDarkWatchAlert } = await import("./normalizer"); + const result = normalizeDarkWatchAlert({ + id: "exp-3", + identifier: "domain.com", + severity: "info", + source: "censys", + }); + expect(result.severity).toBe("INFO"); + }); +}); + +describe("normalizeSpamShieldAlert", () => { + it("creates SPAM_CALL alert for phone calls", async () => { + const { normalizeSpamShieldAlert } = await import("./normalizer"); + const result = normalizeSpamShieldAlert({ + id: "det-1", + callerNumber: "+1234567890", + verdict: "SYNTHETIC", + }); + expect(result.source).toBe("SPAMSHIELD"); + expect(result.category).toBe("SPAM_CALL"); + expect(result.severity).toBe("HIGH"); + }); + + it("creates SPAM_SMS alert for SMS", async () => { + const { normalizeSpamShieldAlert } = await import("./normalizer"); + const result = normalizeSpamShieldAlert({ + id: "det-2", + callerNumber: "+1234567890", + verdict: "UNCERTAIN", + type: "sms", + }); + expect(result.category).toBe("SPAM_SMS"); + expect(result.severity).toBe("WARNING"); + }); + + it("uses INFO for NATURAL verdict", async () => { + const { normalizeSpamShieldAlert } = await import("./normalizer"); + const result = normalizeSpamShieldAlert({ + id: "det-3", + callerNumber: "+1234567890", + verdict: "NATURAL", + }); + expect(result.severity).toBe("INFO"); + }); +}); + +describe("normalizeVoicePrintAlert", () => { + it("creates SYNTHETIC_VOICE alert", async () => { + const { normalizeVoicePrintAlert } = await import("./normalizer"); + const result = normalizeVoicePrintAlert({ + id: "an-1", + recordingId: "rec-1", + verdict: "SYNTHETIC", + }); + expect(result.source).toBe("VOICEPRINT"); + expect(result.category).toBe("SYNTHETIC_VOICE"); + expect(result.severity).toBe("CRITICAL"); + }); + + it("uses WARNING for UNCERTAIN verdict", async () => { + const { normalizeVoicePrintAlert } = await import("./normalizer"); + const result = normalizeVoicePrintAlert({ + id: "an-2", + recordingId: "rec-2", + verdict: "UNCERTAIN", + }); + expect(result.severity).toBe("WARNING"); + }); +}); + +describe("normalizeHomeTitleAlert", () => { + it("creates HOME_TITLE alert", async () => { + const { normalizeHomeTitleAlert } = await import("./normalizer"); + const result = normalizeHomeTitleAlert({ + id: "ch-1", + propertyAddress: "123 Main St, Springfield", + changeType: "ownership_transfer", + severity: "critical", + }); + expect(result.source).toBe("HOME_TITLE"); + expect(result.category).toBe("HOME_TITLE"); + expect(result.severity).toBe("CRITICAL"); + expect(result.entities.emails.length).toBe(0); + }); +}); + +describe("normalizeRemoveBrokersAlert", () => { + it("creates INFO_BROKER alert", async () => { + const { normalizeRemoveBrokersAlert } = await import("./normalizer"); + const result = normalizeRemoveBrokersAlert({ + id: "br-1", + brokerName: "Spokeo", + }); + expect(result.source).toBe("INFO_BROKER"); + expect(result.category).toBe("INFO_BROKER_LISTING"); + expect(result.severity).toBe("WARNING"); + }); +}); diff --git a/web/src/server/services/correlation/normalizer.ts b/web/src/server/services/correlation/normalizer.ts new file mode 100644 index 0000000..28a25cd --- /dev/null +++ b/web/src/server/services/correlation/normalizer.ts @@ -0,0 +1,148 @@ +import type { + alertSource, + alertCategory, + normalizedAlertSeverity, +} from "~/server/db/schema/enums"; + +export type AlertSource = (typeof alertSource.enumValues)[number]; +export type AlertCategory = (typeof alertCategory.enumValues)[number]; +export type NormalizedSeverity = (typeof normalizedAlertSeverity.enumValues)[number]; + +export interface EntitySet { + emails: string[]; + phones: string[]; + ssns: string[]; +} + +export interface NormalizedAlertInput { + source: AlertSource; + sourceAlertId: string; + category: AlertCategory; + severity: NormalizedSeverity; + title: string; + description: string; + entities: EntitySet; + payload?: Record; +} + +const EMAIL_RE = /[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}/g; +const PHONE_RE = /\+?1?\d{10,15}/g; +const SSN_RE = /\d{3}-\d{2}-\d{4}/g; + +export function extractEntities(text: string): EntitySet { + const emails = [...new Set(text.match(EMAIL_RE) ?? [])]; + const phones = [...new Set(text.match(PHONE_RE) ?? [])]; + const ssns = [...new Set(text.match(SSN_RE) ?? [])]; + return { emails, phones, ssns }; +} + +function mapToNormalizedSeverity(severity: string): NormalizedSeverity { + switch (severity) { + case "critical": + return "CRITICAL"; + case "warning": + return "WARNING"; + case "info": + return "INFO"; + default: + return "LOW"; + } +} + +export function normalizeDarkWatchAlert(exposure: { + id: string; + identifier: string; + severity: string; + source: string; + dataType?: string; + metadata?: Record; +}): NormalizedAlertInput { + return { + source: "DARKWATCH", + sourceAlertId: `darkwatch:${exposure.id}`, + category: "BREACH_EXPOSURE", + severity: mapToNormalizedSeverity(exposure.severity), + title: `Data breach exposure for ${exposure.identifier}`, + description: `Exposure detected from ${exposure.source} involving ${exposure.dataType ?? "personal data"}`, + entities: extractEntities(exposure.identifier), + payload: exposure.metadata, + }; +} + +export function normalizeSpamShieldAlert(detection: { + id: string; + callerNumber: string; + verdict: string; + type?: string; + metadata?: Record; +}): NormalizedAlertInput { + const category = detection.type === "sms" ? "SPAM_SMS" : "SPAM_CALL"; + const severity = detection.verdict === "SYNTHETIC" ? "HIGH" : detection.verdict === "UNCERTAIN" ? "WARNING" : "INFO"; + return { + source: "SPAMSHIELD", + sourceAlertId: `spamshield:${detection.id}`, + category, + severity, + title: `Spam ${detection.type ?? "call"} detected from ${detection.callerNumber}`, + description: `${detection.type ?? "Call"} flagged as ${detection.verdict} spam`, + entities: extractEntities(detection.callerNumber), + payload: detection.metadata, + }; +} + +export function normalizeVoicePrintAlert(analysis: { + id: string; + recordingId: string; + verdict: string; + callerNumber?: string; + metadata?: Record; +}): NormalizedAlertInput { + const severity = analysis.verdict === "SYNTHETIC" ? "CRITICAL" : analysis.verdict === "UNCERTAIN" ? "WARNING" : "INFO"; + return { + source: "VOICEPRINT", + sourceAlertId: `voiceprint:${analysis.id}`, + category: "SYNTHETIC_VOICE", + severity, + title: `Voice analysis alert for recording ${analysis.recordingId}`, + description: `Voice analysis verdict: ${analysis.verdict}`, + entities: extractEntities(analysis.callerNumber ?? analysis.recordingId), + payload: analysis.metadata, + }; +} + +export function normalizeHomeTitleAlert(change: { + id: string; + propertyAddress: string; + changeType: string; + severity: string; + metadata?: Record; +}): NormalizedAlertInput { + return { + source: "HOME_TITLE", + sourceAlertId: `hometitle:${change.id}`, + category: "HOME_TITLE", + severity: mapToNormalizedSeverity(change.severity), + title: `Property change detected: ${change.changeType}`, + description: `${change.changeType} detected for ${change.propertyAddress}`, + entities: extractEntities(change.propertyAddress), + payload: change.metadata, + }; +} + +export function normalizeRemoveBrokersAlert(listing: { + id: string; + brokerName: string; + listingUrl?: string; + metadata?: Record; +}): NormalizedAlertInput { + return { + source: "INFO_BROKER", + sourceAlertId: `infobroker:${listing.id}`, + category: "INFO_BROKER_LISTING", + severity: "WARNING", + title: `Broker listing found on ${listing.brokerName}`, + description: `Personal information listed on ${listing.brokerName}${listing.listingUrl ? ` (${listing.listingUrl})` : ""}`, + entities: { emails: [], phones: [], ssns: [] }, + payload: listing.metadata, + }; +}