feat: add alert correlation & normalization engine with tRPC router

Implement the cross-service alert correlation and normalization engine:

- correlation router with 6 procedures: getAlerts, getAlertDetails,
  getGroups, getGroupDetails, resolveAlert, getStats
- correlation service with normalizeAlert, correlateAlerts,
  getAlertTimeline, resolveAlert, getThreatScore, getAlertStats
- correlation engine with findRelatedAlerts, createCorrelationGroup,
  updateGroupSeverity, deduplicateAlerts
- alert normalizer with service-specific converters for DarkWatch,
  SpamShield, VoicePrint, HomeTitle, and RemoveBrokers
- Entity extraction (emails, phones, SSNs) and threat scoring
  with severity-weighted decay over 30-day window
- 52 unit tests across engine, service, normalizer, and router
This commit is contained in:
2026-05-25 16:55:31 -04:00
parent d84595bf72
commit 4f7882a10d
10 changed files with 1662 additions and 0 deletions

View File

@@ -7,6 +7,7 @@ import { voiceprintRouter } from "./routers/voiceprint";
import { spamshieldRouter } from "./routers/spamshield";
import { hometitleRouter } from "./routers/hometitle";
import { removebrokersRouter } from "./routers/removebrokers";
import { correlationRouter } from "./routers/correlation";
import { createTRPCRouter } from "./utils";
export const appRouter = createTRPCRouter({
@@ -19,6 +20,7 @@ export const appRouter = createTRPCRouter({
spamshield: spamshieldRouter,
hometitle: hometitleRouter,
removebrokers: removebrokersRouter,
correlation: correlationRouter,
});
export type AppRouter = typeof appRouter;

View File

@@ -0,0 +1,225 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { initTRPC, TRPCError } from "@trpc/server";
import { wrap } from "@typeschema/valibot";
import {
AlertFilterSchema,
AlertDetailsSchema,
GroupFilterSchema,
GroupDetailsSchema,
ResolveAlertSchema,
} from "../schemas/correlation";
vi.mock("~/server/services/correlation.service", () => ({
getAlertTimeline: vi.fn(),
getAlertDetails: vi.fn(),
getCorrelationGroups: vi.fn(),
getCorrelationGroupDetails: vi.fn(),
resolveAlert: vi.fn(),
getAlertStats: vi.fn(),
}));
import * as correlationService from "~/server/services/correlation.service";
const mockGetAlertTimeline = vi.mocked(correlationService.getAlertTimeline);
const mockGetAlertDetails = vi.mocked(correlationService.getAlertDetails);
const mockGetCorrelationGroups = vi.mocked(correlationService.getCorrelationGroups);
const mockGetCorrelationGroupDetails = vi.mocked(correlationService.getCorrelationGroupDetails);
const mockResolveAlert = vi.mocked(correlationService.resolveAlert);
const mockGetAlertStats = vi.mocked(correlationService.getAlertStats);
type User = {
id: string; email: string; name: string | null; image: string | null;
role: string; emailVerified: Date | null; deletedAt: Date | null;
stripeCustomerId: string | null;
createdAt: Date; updatedAt: Date;
};
type Ctx = { db: object; user: User | null; apiKey: string | null };
function createCaller(user: User | null) {
const t = initTRPC.context<Ctx>().create();
const isAuthed = t.middleware(({ ctx, next }) => {
if (!ctx.user) throw new TRPCError({ code: "UNAUTHORIZED" });
return next({ ctx: { ...ctx, user: ctx.user } });
});
const router = t.router({
getAlerts: t.procedure.use(isAuthed)
.input(wrap(AlertFilterSchema))
.query(async ({ ctx, input }) => {
return mockGetAlertTimeline(ctx.user.id, input);
}),
getAlertDetails: t.procedure.use(isAuthed)
.input(wrap(AlertDetailsSchema))
.query(async ({ ctx, input }) => {
return mockGetAlertDetails(ctx.user.id, input.alertId);
}),
getGroups: t.procedure.use(isAuthed)
.input(wrap(GroupFilterSchema))
.query(async ({ ctx, input }) => {
return mockGetCorrelationGroups(ctx.user.id, input);
}),
getGroupDetails: t.procedure.use(isAuthed)
.input(wrap(GroupDetailsSchema))
.query(async ({ ctx, input }) => {
return mockGetCorrelationGroupDetails(ctx.user.id, input.groupId);
}),
resolveAlert: t.procedure.use(isAuthed)
.input(wrap(ResolveAlertSchema))
.mutation(async ({ ctx, input }) => {
return mockResolveAlert(ctx.user.id, input.alertId, input.resolution);
}),
getStats: t.procedure.use(isAuthed).query(async ({ ctx }) => {
return mockGetAlertStats(ctx.user.id);
}),
});
const caller = t.createCallerFactory(router);
return caller({ db: {} as never, user, apiKey: null });
}
const baseUser: User = {
id: "user-1", email: "a@b.com", name: "Test", image: null,
role: "user", emailVerified: null, deletedAt: null,
stripeCustomerId: null,
createdAt: new Date(), updatedAt: new Date(),
};
function makeUser(overrides: Partial<User> = {}): User {
return { ...baseUser, ...overrides };
}
beforeEach(() => {
vi.clearAllMocks();
});
describe("correlation.getAlerts", () => {
it("returns alert timeline for authenticated user", async () => {
const data = {
items: [{ id: "a1", source: "DARKWATCH", severity: "HIGH" }],
total: 1, page: 1, limit: 20, totalPages: 1,
};
mockGetAlertTimeline.mockResolvedValue(data as never);
const api = createCaller(makeUser());
const result = await api.getAlerts({});
expect(result.items).toHaveLength(1);
expect(result.total).toBe(1);
});
it("rejects unauthenticated requests", async () => {
const api = createCaller(null);
await expect(api.getAlerts({})).rejects.toThrow(TRPCError);
});
it("passes filters to service", async () => {
mockGetAlertTimeline.mockResolvedValue({
items: [], total: 0, page: 1, limit: 20, totalPages: 0,
} as never);
const api = createCaller(makeUser());
await api.getAlerts({ source: "DARKWATCH", severity: "HIGH", page: 1, limit: 10 });
expect(mockGetAlertTimeline).toHaveBeenCalledWith(
"user-1",
{ source: "DARKWATCH", severity: "HIGH", page: 1, limit: 10 },
);
});
it("rejects invalid source value", async () => {
const api = createCaller(makeUser());
await expect(
api.getAlerts({ source: "INVALID" as never }),
).rejects.toThrow();
});
});
describe("correlation.getAlertDetails", () => {
it("returns alert details", async () => {
const data = { alert: { id: "a1" }, group: null, relatedAlerts: [] };
mockGetAlertDetails.mockResolvedValue(data as never);
const api = createCaller(makeUser());
const result = await api.getAlertDetails({ alertId: "a1" });
expect(result.alert.id).toBe("a1");
});
});
describe("correlation.getGroups", () => {
it("returns correlation groups", async () => {
const data = {
items: [{ id: "g1", highestSeverity: "CRITICAL" }],
total: 1, page: 1, limit: 20, totalPages: 1,
};
mockGetCorrelationGroups.mockResolvedValue(data as never);
const api = createCaller(makeUser());
const result = await api.getGroups({});
expect(result.items).toHaveLength(1);
});
it("filters by status", async () => {
mockGetCorrelationGroups.mockResolvedValue({
items: [], total: 0, page: 1, limit: 20, totalPages: 0,
} as never);
const api = createCaller(makeUser());
await api.getGroups({ status: "ACTIVE" });
expect(mockGetCorrelationGroups).toHaveBeenCalledWith(
"user-1",
{ status: "ACTIVE", page: 1, limit: 20 },
);
});
});
describe("correlation.getGroupDetails", () => {
it("returns group details with alerts", async () => {
const data = {
group: { id: "g1", highestSeverity: "HIGH" },
alerts: [{ id: "a1" }, { id: "a2" }],
};
mockGetCorrelationGroupDetails.mockResolvedValue(data as never);
const api = createCaller(makeUser());
const result = await api.getGroupDetails({ groupId: "g1" });
expect(result.group.id).toBe("g1");
expect(result.alerts).toHaveLength(2);
});
});
describe("correlation.resolveAlert", () => {
it("resolves an alert", async () => {
const data = { id: "g1", status: "RESOLVED" };
mockResolveAlert.mockResolvedValue(data as never);
const api = createCaller(makeUser());
const result = await api.resolveAlert({ alertId: "a1", resolution: "RESOLVED" });
expect(result.status).toBe("RESOLVED");
});
it("marks as false positive", async () => {
const data = { id: "g1", status: "FALSE_POSITIVE" };
mockResolveAlert.mockResolvedValue(data as never);
const api = createCaller(makeUser());
const result = await api.resolveAlert({ alertId: "a1", resolution: "FALSE_POSITIVE" });
expect(result.status).toBe("FALSE_POSITIVE");
});
it("rejects invalid resolution", async () => {
const api = createCaller(makeUser());
await expect(
api.resolveAlert({ alertId: "a1", resolution: "INVALID" as never }),
).rejects.toThrow();
});
});
describe("correlation.getStats", () => {
it("returns alert statistics", async () => {
const stats = {
totalAlerts: 10,
bySeverity: { HIGH: 5, LOW: 5 },
bySource: { DARKWATCH: 10 },
activeGroups: 2,
resolvedCount: 1,
falsePositiveCount: 0,
threatScore: 45,
threatBreakdown: [{ source: "DARKWATCH", score: 45 }],
};
mockGetAlertStats.mockResolvedValue(stats as never);
const api = createCaller(makeUser());
const result = await api.getStats();
expect(result.totalAlerts).toBe(10);
expect(result.threatScore).toBe(45);
});
});

View File

@@ -0,0 +1,46 @@
import { wrap } from "@typeschema/valibot";
import { createTRPCRouter, protectedProcedure } from "../utils";
import {
AlertFilterSchema,
AlertDetailsSchema,
GroupFilterSchema,
GroupDetailsSchema,
ResolveAlertSchema,
} from "../schemas/correlation";
import * as correlationService from "~/server/services/correlation.service";
export const correlationRouter = createTRPCRouter({
getAlerts: protectedProcedure
.input(wrap(AlertFilterSchema))
.query(async ({ ctx, input }) => {
return correlationService.getAlertTimeline(ctx.user.id, input);
}),
getAlertDetails: protectedProcedure
.input(wrap(AlertDetailsSchema))
.query(async ({ ctx, input }) => {
return correlationService.getAlertDetails(ctx.user.id, input.alertId);
}),
getGroups: protectedProcedure
.input(wrap(GroupFilterSchema))
.query(async ({ ctx, input }) => {
return correlationService.getCorrelationGroups(ctx.user.id, input);
}),
getGroupDetails: protectedProcedure
.input(wrap(GroupDetailsSchema))
.query(async ({ ctx, input }) => {
return correlationService.getCorrelationGroupDetails(ctx.user.id, input.groupId);
}),
resolveAlert: protectedProcedure
.input(wrap(ResolveAlertSchema))
.mutation(async ({ ctx, input }) => {
return correlationService.resolveAlert(ctx.user.id, input.alertId, input.resolution);
}),
getStats: protectedProcedure.query(async ({ ctx }) => {
return correlationService.getAlertStats(ctx.user.id);
}),
});

View File

@@ -0,0 +1,36 @@
import { object, string, minLength, optional, number, picklist } from "valibot";
export const AlertFilterSchema = object({
source: optional(
picklist(["DARKWATCH", "SPAMSHIELD", "VOICEPRINT", "CALL_ANALYSIS", "HOME_TITLE", "INFO_BROKER"]),
),
severity: optional(
picklist(["LOW", "INFO", "MEDIUM", "WARNING", "HIGH", "CRITICAL"]),
),
status: optional(
picklist(["ACTIVE", "RESOLVED", "FALSE_POSITIVE"]),
),
page: optional(number(), 1),
limit: optional(number(), 20),
});
export const AlertDetailsSchema = object({
alertId: string([minLength(1)]),
});
export const GroupFilterSchema = object({
status: optional(
picklist(["ACTIVE", "RESOLVED", "FALSE_POSITIVE"]),
),
page: optional(number(), 1),
limit: optional(number(), 20),
});
export const GroupDetailsSchema = object({
groupId: string([minLength(1)]),
});
export const ResolveAlertSchema = object({
alertId: string([minLength(1)]),
resolution: picklist(["RESOLVED", "FALSE_POSITIVE"]),
});