Add cross-service alert correlation system FRE-4500

- Unified alert types (AlertSource, AlertCategory, CorrelationStatus, EntityType)
- NormalizedAlert and CorrelationGroup Prisma models
- AlertNormalizer for all 4 services (DarkWatch, SpamShield, VoicePrint, CallAnalysis)
- CorrelationEngine with temporal + entity-based correlation detection
- CorrelationService orchestrator with dashboard API
- Correlation API routes (/api/v1/correlation/*)
- Service emitters wired to DarkWatch, SpamShield, VoicePrint
- pnpm workspace config for monorepo
This commit is contained in:
Senior Engineer
2026-05-02 01:10:44 -04:00
committed by Michael Freno
parent 685fb57e53
commit 03276dde2d
35 changed files with 8072 additions and 31 deletions

View File

@@ -0,0 +1,17 @@
{
"name": "@shieldai/correlation",
"version": "0.1.0",
"main": "./dist/index.js",
"types": "./dist/index.js",
"scripts": {
"build": "tsc",
"lint": "eslint src/"
},
"dependencies": {
"@shieldai/db": "workspace:*",
"@shieldai/types": "workspace:*"
},
"exports": {
".": "./src/index.ts"
}
}

View File

@@ -0,0 +1,98 @@
import { correlationService } from "@shieldai/correlation";
export async function emitDarkWatchAlert(
userId: string,
exposureId: string,
alertId: string,
breachName: string,
severity: string,
channel: string,
dataType?: string[],
dataSource?: string
): Promise<void> {
try {
await correlationService.ingestDarkWatchAlert(userId, alertId, {
exposureId,
breachName,
severity,
channel,
dataType,
dataSource,
});
} catch (err) {
console.error(`[Correlation] DarkWatch alert emit failed:`, err);
}
}
export async function emitSpamShieldAlert(
userId: string,
analysisId: string,
phoneNumber: string,
decision: string,
confidence: number,
reasons?: string[],
channel?: "call" | "sms",
hiyaReputationScore?: number,
truecallerSpamScore?: number
): Promise<void> {
try {
await correlationService.ingestSpamShieldAlert(userId, analysisId, {
phoneNumber,
decision,
confidence,
reasons,
channel,
hiyaReputationScore,
truecallerSpamScore,
});
} catch (err) {
console.error(`[Correlation] SpamShield alert emit failed:`, err);
}
}
export async function emitVoicePrintAlert(
userId: string,
jobId: string,
verdict: string,
syntheticScore: number,
confidence: number,
matchedEnrollmentId?: string,
matchedSimilarity?: number,
analysisType?: string
): Promise<void> {
try {
await correlationService.ingestVoicePrintAlert(userId, jobId, {
jobId,
verdict,
syntheticScore,
confidence,
matchedEnrollmentId,
matchedSimilarity,
analysisType,
});
} catch (err) {
console.error(`[Correlation] VoicePrint alert emit failed:`, err);
}
}
export async function emitCallAnalysisAlert(
userId: string,
callId: string,
eventType?: string,
mosScore?: number,
anomaly?: string,
sentiment?: { label: string; score: number }
): Promise<void> {
const sourceAlertId = `call-${callId}-${Date.now()}`;
try {
await correlationService.ingestCallAnalysisAlert(userId, sourceAlertId, {
callId,
eventType,
mosScore,
anomaly,
sentiment,
});
} catch (err) {
console.error(`[Correlation] CallAnalysis alert emit failed:`, err);
}
}

View File

@@ -0,0 +1,422 @@
import { prisma } from "@shieldai/db";
import {
AlertSource,
AlertCategory,
Severity,
EntityType,
CorrelationStatus,
NormalizedAlertInput,
CorrelationGroupOutput,
CorrelatedAlertOutput,
CorrelationQuery,
} from "@shieldai/types";
import { alertNormalizer, AlertNormalizer } from "./normalizer";
const SEVERITY_RANK: Record<string, number> = {
LOW: 0,
INFO: 1,
MEDIUM: 2,
WARNING: 3,
HIGH: 4,
CRITICAL: 5,
};
function higherSeverity(a: string, b: string): string {
return SEVERITY_RANK[a] >= SEVERITY_RANK[b] ? a : b;
}
function entitiesOverlap(
a: Array<{ type: string; value: string }>,
b: Array<{ type: string; value: string }>
): boolean {
for (const ea of a) {
for (const eb of b) {
if (ea.type === eb.type && ea.value.toLowerCase() === eb.value.toLowerCase()) {
return true;
}
}
}
return false;
}
type AlertRow = {
id: string;
source: string;
category: string;
severity: string;
userId: string;
title: string;
description: string;
entities: unknown;
sourceAlertId: string;
groupId: string | null;
payload: unknown;
createdAt: Date;
};
type GroupRow = {
id: string;
userId: string;
entities: unknown;
highestSeverity: string;
status: string;
alertCount: number;
summary: string | null;
resolvedAt: Date | null;
createdAt: Date;
updatedAt: Date;
};
export class CorrelationEngine {
private readonly timeWindowMinutes: number;
constructor(timeWindowMinutes: number = 30) {
this.timeWindowMinutes = timeWindowMinutes;
}
public async ingestAlert(input: NormalizedAlertInput): Promise<CorrelatedAlertOutput> {
const alert = await (prisma as any).normalizedAlert.create({
data: {
source: input.source,
category: input.category,
severity: input.severity,
userId: input.userId,
title: input.title,
description: input.description,
entities: input.entities,
sourceAlertId: input.sourceAlertId,
payload: input.payload,
createdAt: input.timestamp || new Date(),
},
});
const correlation = await this.findOrCreateCorrelation(alert as AlertRow);
if (correlation) {
await (prisma as any).normalizedAlert.update({
where: { id: alert.id },
data: { groupId: correlation.id },
});
const updated = await (prisma as any).normalizedAlert.findUnique({
where: { id: alert.id },
});
return this.toOutput(updated as AlertRow);
}
return this.toOutput(alert as AlertRow);
}
private async findOrCreateCorrelation(
alert: AlertRow
): Promise<GroupRow | null> {
const cutoff = new Date(Date.now() - this.timeWindowMinutes * 60 * 1000);
const existingGroups = await (prisma as any).correlationGroup.findMany({
where: {
userId: alert.userId,
status: CorrelationStatus.ACTIVE,
createdAt: { gte: cutoff },
},
include: {
alerts: {
where: { createdAt: { gte: cutoff } },
},
},
});
const alertEntities = alert.entities as Array<{ type: string; value: string }>;
for (const group of existingGroups) {
const groupEntities = group.entities as Array<{ type: string; value: string }>;
if (entitiesOverlap(groupEntities, alertEntities)) {
const newSeverity = higherSeverity(
group.highestSeverity,
alert.severity
);
const updatedGroup = await (prisma as any).correlationGroup.update({
where: { id: group.id },
data: {
highestSeverity: newSeverity,
alertCount: group.alertCount + 1,
entities: this.mergeEntities(groupEntities, alertEntities),
},
});
return updatedGroup;
}
}
const uniqueSources = new Set<string>();
uniqueSources.add(alert.source);
const uniqueCategories = new Set<string>();
uniqueCategories.add(alert.category);
if (uniqueSources.size > 1 || uniqueCategories.size > 1) {
const newGroup = await (prisma as any).correlationGroup.create({
data: {
userId: alert.userId,
entities: alert.entities,
highestSeverity: alert.severity,
status: CorrelationStatus.ACTIVE,
alertCount: 1,
summary: this.generateSummary(
alert.source,
alert.category,
alert.title
),
},
});
return newGroup;
}
return null;
}
private mergeEntities(
a: Array<{ type: string; value: string }>,
b: Array<{ type: string; value: string }>
): Array<{ type: string; value: string }> {
const seen = new Map<string, string>();
for (const e of [...a, ...b]) {
const key = `${e.type}:${e.value.toLowerCase()}`;
if (!seen.has(key)) {
seen.set(key, e.value);
}
}
return Array.from(seen.entries()).map(([key, value]) => {
const [type] = key.split(":");
return { type, value };
});
}
private generateSummary(
source: string,
category: string,
title: string
): string {
return `${source} - ${category}: ${title}`;
}
public async getCorrelatedAlerts(
query: CorrelationQuery
): Promise<{ alerts: CorrelatedAlertOutput[]; total: number }> {
const where: Record<string, unknown> = {};
if (query.userId) where.userId = query.userId;
if (query.groupId) where.groupId = query.groupId;
if (query.source) where.source = query.source;
if (query.category) where.category = query.category;
if (query.severity) where.severity = query.severity;
if (query.timeWindowMinutes) {
where.createdAt = {
gte: new Date(Date.now() - query.timeWindowMinutes * 60 * 1000),
};
}
if (query.entityType && query.entityId) {
where.entities = {
path: [],
contains: JSON.stringify({ type: query.entityType, value: query.entityId }),
};
}
const [alerts, total] = await Promise.all([
(prisma as any).normalizedAlert.findMany({
where,
orderBy: { createdAt: "desc" },
take: query.limit || 50,
skip: query.offset || 0,
}),
(prisma as any).normalizedAlert.count({ where }),
]);
return {
alerts: alerts.map((a: AlertRow) => this.toOutput(a)),
total,
};
}
public async getCorrelationGroups(
query: CorrelationQuery
): Promise<{ groups: CorrelationGroupOutput[]; total: number }> {
const where: Record<string, unknown> = {};
if (query.userId) where.userId = query.userId;
if (query.status) where.status = query.status;
if (query.timeWindowMinutes) {
where.createdAt = {
gte: new Date(Date.now() - query.timeWindowMinutes * 60 * 1000),
};
}
const [groups, total] = await Promise.all([
(prisma as any).correlationGroup.findMany({
where,
orderBy: { createdAt: "desc" },
take: query.limit || 50,
skip: query.offset || 0,
include: {
alerts: {
orderBy: { createdAt: "desc" },
take: 100,
},
},
}),
(prisma as any).correlationGroup.count({ where }),
]);
return {
groups: groups.map((g: GroupRow & { alerts: AlertRow[] }) =>
this.toGroupOutput(g)
),
total,
};
}
public async getGroupById(
groupId: string
): Promise<CorrelationGroupOutput | null> {
const group = await (prisma as any).correlationGroup.findUnique({
where: { id: groupId },
include: {
alerts: {
orderBy: { createdAt: "asc" },
},
},
});
return group ? this.toGroupOutput(group as GroupRow & { alerts: AlertRow[] }) : null;
}
public async resolveGroup(
groupId: string,
status: string = CorrelationStatus.RESOLVED
): Promise<CorrelationGroupOutput | null> {
const group = await (prisma as any).correlationGroup.update({
where: { id: groupId },
data: {
status,
resolvedAt: new Date(),
},
include: {
alerts: {
orderBy: { createdAt: "asc" },
},
},
});
return this.toGroupOutput(group as GroupRow & { alerts: AlertRow[] });
}
public async getDashboardData(
userId: string,
timeWindowMinutes: number = 60
): Promise<{
totalAlerts: number;
activeCorrelations: number;
alertsBySource: Record<string, number>;
alertsBySeverity: Record<string, number>;
recentGroups: CorrelationGroupOutput[];
}> {
const cutoff = new Date(Date.now() - timeWindowMinutes * 60 * 1000);
const [totalAlerts, activeCorrelations, recentGroups] = await Promise.all([
(prisma as any).normalizedAlert.count({
where: { userId, createdAt: { gte: cutoff } },
}),
(prisma as any).correlationGroup.count({
where: {
userId,
status: CorrelationStatus.ACTIVE,
createdAt: { gte: cutoff },
},
}),
(prisma as any).correlationGroup.findMany({
where: {
userId,
status: CorrelationStatus.ACTIVE,
createdAt: { gte: cutoff },
},
orderBy: { createdAt: "desc" },
take: 10,
include: { alerts: { orderBy: { createdAt: "desc" }, take: 100 } },
}),
]);
const alertsBySource: Record<string, number> = {};
const alertsBySeverity: Record<string, number> = {};
const recentAlerts = await (prisma as any).normalizedAlert.findMany({
where: { userId, createdAt: { gte: cutoff } },
select: { source: true, severity: true },
});
for (const alert of recentAlerts) {
alertsBySource[alert.source] = (alertsBySource[alert.source] || 0) + 1;
alertsBySeverity[alert.severity] = (alertsBySeverity[alert.severity] || 0) + 1;
}
return {
totalAlerts,
activeCorrelations,
alertsBySource,
alertsBySeverity,
recentGroups: recentGroups.map(
(g: GroupRow & { alerts: AlertRow[] }) => this.toGroupOutput(g)
),
};
}
private toOutput(alert: AlertRow): CorrelatedAlertOutput {
return {
id: alert.id,
source: alert.source as AlertSource,
category: alert.category as AlertCategory,
severity: alert.severity as Severity,
userId: alert.userId,
title: alert.title,
description: alert.description,
entities: alert.entities as Array<{ type: EntityType; value: string }>,
sourceAlertId: alert.sourceAlertId,
groupId: alert.groupId || "",
payload: alert.payload as Record<string, unknown>,
createdAt: alert.createdAt,
};
}
private toGroupOutput(
group: GroupRow & { alerts: AlertRow[] }
): CorrelationGroupOutput {
const sources = new Set<string>();
const categories = new Set<string>();
const entities = group.entities as Array<{ type: EntityType; value: string }>;
for (const alert of group.alerts) {
sources.add(alert.source);
categories.add(alert.category);
}
return {
id: group.id,
groupId: group.id,
alertCount: group.alertCount,
highestSeverity: group.highestSeverity as Severity,
status: group.status as CorrelationStatus,
entities,
sources: Array.from(sources) as AlertSource[],
categories: Array.from(categories) as AlertCategory[],
createdAt: group.createdAt,
updatedAt: group.updatedAt,
};
}
}
export const correlationEngine = new CorrelationEngine();

View File

@@ -0,0 +1,9 @@
export { alertNormalizer, AlertNormalizer } from "./normalizer";
export { correlationEngine, CorrelationEngine } from "./engine";
export { correlationService, CorrelationService } from "./service";
export {
emitDarkWatchAlert,
emitSpamShieldAlert,
emitVoicePrintAlert,
emitCallAnalysisAlert,
} from "./emitter";

View File

@@ -0,0 +1,246 @@
import {
AlertSource,
AlertCategory,
Severity,
EntityTypes,
NormalizedAlertInput,
} from "@shieldai/types";
type EntityType = (typeof EntityTypes)[keyof typeof EntityTypes];
interface DarkWatchAlertPayload {
exposureId: string;
breachName: string;
severity: string;
channel: string;
dataType?: string[];
dataSource?: string;
}
interface SpamShieldAlertPayload {
phoneNumber: string;
decision: string;
confidence: number;
reasons?: string[];
channel?: "call" | "sms";
hiyaReputationScore?: number;
truecallerSpamScore?: number;
}
interface VoicePrintAlertPayload {
jobId: string;
verdict: string;
syntheticScore: number;
confidence: number;
matchedEnrollmentId?: string;
matchedSimilarity?: number;
analysisType?: string;
}
interface CallAnalysisAlertPayload {
callId: string;
eventType?: string;
mosScore?: number;
anomaly?: string;
sentiment?: { label: string; score: number };
}
const SEVERITY_MAP: Record<string, Severity> = {
LOW: "LOW",
INFO: "INFO",
MEDIUM: "MEDIUM",
WARNING: "WARNING",
HIGH: "HIGH",
CRITICAL: "CRITICAL",
};
function mapSeverity(raw: string | number): Severity {
if (typeof raw === "number") {
if (raw >= 0.9) return "CRITICAL";
if (raw >= 0.7) return "HIGH";
if (raw >= 0.5) return "WARNING";
if (raw >= 0.3) return "MEDIUM";
if (raw >= 0.1) return "INFO";
return "LOW";
}
const upper = raw.toUpperCase();
return SEVERITY_MAP[upper] ?? "INFO";
}
export class AlertNormalizer {
public normalizeDarkWatchAlert(
userId: string,
sourceAlertId: string,
payload: DarkWatchAlertPayload,
timestamp?: Date
): NormalizedAlertInput {
const severity = mapSeverity(payload.severity);
const entities: Array<{ type: EntityType; value: string }> = [];
if (payload.dataSource) {
entities.push({ type: EntityTypes.EMAIL, value: payload.breachName });
}
return {
source: AlertSource.DARKWATCH,
category: AlertCategory.BREACH_EXPOSURE,
severity,
userId,
title: `Breach Exposure: ${payload.breachName}`,
description: payload.dataType
? `Data types exposed: ${payload.dataType.join(", ")} in ${payload.breachName}`
: `Exposure detected in ${payload.breachName}`,
entities,
sourceAlertId,
payload: payload as unknown as Record<string, unknown>,
timestamp,
};
}
public normalizeSpamShieldAlert(
userId: string,
sourceAlertId: string,
payload: SpamShieldAlertPayload,
timestamp?: Date
): NormalizedAlertInput {
const decision = payload.decision.toUpperCase();
const severity =
decision === "BLOCK"
? "HIGH"
: decision === "FLAG"
? "WARNING"
: "INFO";
const channel = payload.channel === "sms" ? "sms" : "call";
const category =
channel === "sms"
? AlertCategory.SPAM_SMS
: AlertCategory.SPAM_CALL;
const entities: Array<{ type: EntityType; value: string }> = [
{ type: EntityTypes.PHONE_NUMBER, value: payload.phoneNumber },
];
return {
source: AlertSource.SPAMSHIELD,
category,
severity,
userId,
title: `${channel === "sms" ? "SMS" : "Call"} ${decision}: ${payload.phoneNumber}`,
description: payload.reasons
? `SpamShield ${decision} decision. Reasons: ${payload.reasons.join(", ")}`
: `SpamShield ${decision} decision with confidence ${Math.round(payload.confidence * 100)}%`,
entities,
sourceAlertId,
payload: payload as unknown as Record<string, unknown>,
timestamp,
};
}
public normalizeVoicePrintAlert(
userId: string,
sourceAlertId: string,
payload: VoicePrintAlertPayload,
timestamp?: Date
): NormalizedAlertInput {
const verdict = payload.verdict.toUpperCase();
let severity: Severity;
let category: AlertCategory;
if (payload.analysisType === "VOICE_MATCH" && payload.matchedEnrollmentId) {
category = AlertCategory.VOICE_MISMATCH;
severity =
payload.matchedSimilarity !== undefined && payload.matchedSimilarity > 0.85
? "MEDIUM"
: "LOW";
} else {
category = AlertCategory.SYNTHETIC_VOICE;
severity =
verdict === "SYNTHETIC"
? mapSeverity(payload.syntheticScore)
: verdict === "UNCERTAIN"
? "MEDIUM"
: "INFO";
}
const entities: Array<{ type: EntityType; value: string }> = [];
if (payload.matchedEnrollmentId) {
entities.push({ type: EntityTypes.USER_ID, value: payload.matchedEnrollmentId });
}
return {
source: AlertSource.VOICEPRINT,
category,
severity,
userId,
title: `Voice ${verdict}: Job ${payload.jobId}`,
description: payload.analysisType
? `Analysis type: ${payload.analysisType}. Verdict: ${verdict} (confidence: ${Math.round(payload.confidence * 100)}%)`
: `Synthetic voice detection: ${verdict} (score: ${payload.syntheticScore.toFixed(3)})`,
entities,
sourceAlertId,
payload: payload as unknown as Record<string, unknown>,
timestamp,
};
}
public normalizeCallAnalysisAlert(
userId: string,
sourceAlertId: string,
payload: CallAnalysisAlertPayload,
timestamp?: Date
): NormalizedAlertInput {
let category: AlertCategory;
let severity: Severity;
let title: string;
let description: string;
if (payload.anomaly) {
category = AlertCategory.CALL_ANOMALY;
severity = "WARNING";
title = `Call Anomaly: ${payload.anomaly}`;
description = `Anomaly "${payload.anomaly}" detected in call ${payload.callId}`;
} else if (payload.mosScore !== undefined) {
category = AlertCategory.CALL_QUALITY;
severity =
payload.mosScore < 2.5
? "CRITICAL"
: payload.mosScore < 3.5
? "HIGH"
: payload.mosScore < 4.0
? "MEDIUM"
: "INFO";
title = `Call Quality: MOS ${payload.mosScore.toFixed(1)}`;
description = `MOS score ${payload.mosScore.toFixed(1)} for call ${payload.callId}`;
} else if (payload.eventType) {
category = AlertCategory.CALL_EVENT;
severity = "INFO";
title = `Call Event: ${payload.eventType}`;
description = `Event "${payload.eventType}" during call ${payload.callId}`;
} else {
category = AlertCategory.CALL_EVENT;
severity = "INFO";
title = `Call Alert: ${payload.callId}`;
description = `Alert for call ${payload.callId}`;
}
const entities: Array<{ type: EntityType; value: string }> = [
{ type: EntityTypes.CALL_ID, value: payload.callId },
];
return {
source: AlertSource.CALL_ANALYSIS,
category,
severity,
userId,
title,
description,
entities,
sourceAlertId,
payload: payload as unknown as Record<string, unknown>,
timestamp,
};
}
}
export const alertNormalizer = new AlertNormalizer();

View File

@@ -0,0 +1,143 @@
import {
AlertSource,
AlertCategory,
Severity,
EntityType,
NormalizedAlertInput,
CorrelationGroupOutput,
CorrelatedAlertOutput,
CorrelationQuery,
} from "@shieldai/types";
import { alertNormalizer, AlertNormalizer } from "./normalizer";
import { correlationEngine, CorrelationEngine } from "./engine";
export class CorrelationService {
private normalizer: AlertNormalizer;
private engine: CorrelationEngine;
constructor(
normalizer: AlertNormalizer = alertNormalizer,
engine: CorrelationEngine = correlationEngine
) {
this.normalizer = normalizer;
this.engine = engine;
}
public async ingestDarkWatchAlert(
userId: string,
sourceAlertId: string,
payload: {
exposureId: string;
breachName: string;
severity: string;
channel: string;
dataType?: string[];
dataSource?: string;
},
timestamp?: Date
): Promise<CorrelatedAlertOutput> {
const normalized = this.normalizer.normalizeDarkWatchAlert(
userId,
sourceAlertId,
payload,
timestamp
);
return this.engine.ingestAlert(normalized);
}
public async ingestSpamShieldAlert(
userId: string,
sourceAlertId: string,
payload: {
phoneNumber: string;
decision: string;
confidence: number;
reasons?: string[];
channel?: "call" | "sms";
hiyaReputationScore?: number;
truecallerSpamScore?: number;
},
timestamp?: Date
): Promise<CorrelatedAlertOutput> {
const normalized = this.normalizer.normalizeSpamShieldAlert(
userId,
sourceAlertId,
payload,
timestamp
);
return this.engine.ingestAlert(normalized);
}
public async ingestVoicePrintAlert(
userId: string,
sourceAlertId: string,
payload: {
jobId: string;
verdict: string;
syntheticScore: number;
confidence: number;
matchedEnrollmentId?: string;
matchedSimilarity?: number;
analysisType?: string;
},
timestamp?: Date
): Promise<CorrelatedAlertOutput> {
const normalized = this.normalizer.normalizeVoicePrintAlert(
userId,
sourceAlertId,
payload,
timestamp
);
return this.engine.ingestAlert(normalized);
}
public async ingestCallAnalysisAlert(
userId: string,
sourceAlertId: string,
payload: {
callId: string;
eventType?: string;
mosScore?: number;
anomaly?: string;
sentiment?: { label: string; score: number };
},
timestamp?: Date
): Promise<CorrelatedAlertOutput> {
const normalized = this.normalizer.normalizeCallAnalysisAlert(
userId,
sourceAlertId,
payload,
timestamp
);
return this.engine.ingestAlert(normalized);
}
public async ingestGenericAlert(
input: NormalizedAlertInput
): Promise<CorrelatedAlertOutput> {
return this.engine.ingestAlert(input);
}
public getCorrelatedAlerts(query: CorrelationQuery) {
return this.engine.getCorrelatedAlerts(query);
}
public getCorrelationGroups(query: CorrelationQuery) {
return this.engine.getCorrelationGroups(query);
}
public getGroupById(groupId: string) {
return this.engine.getGroupById(groupId);
}
public resolveGroup(groupId: string, status?: string) {
return this.engine.resolveGroup(groupId, status as any);
}
public getDashboardData(userId: string, timeWindowMinutes?: number) {
return this.engine.getDashboardData(userId, timeWindowMinutes);
}
}
export const correlationService = new CorrelationService();
export { alertNormalizer, correlationEngine };

View File

@@ -0,0 +1,8 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "./dist",
"rootDir": "./src"
},
"include": ["src"]
}