FRE-5351 CTO review: finalize hometitle exports and types for alert pipeline + scheduler

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-05-14 10:32:44 -04:00
parent 9858834a67
commit 4844c5994c
6 changed files with 1408 additions and 0 deletions

View File

@@ -0,0 +1,362 @@
import { prisma, AlertSeverity, AlertChannel } from '@shieldai/db';
import {
NotificationService,
loadNotificationConfig,
} from '@shieldai/shared-notifications';
import {
ChangeDetectionResult,
ChangeType,
Severity,
PropertyAlert,
AlertSeverityLevel,
NotificationChannel,
AlertPipelineConfig,
} from './types';
const DEFAULT_CONFIG: AlertPipelineConfig = {
dedupWindowMs: 24 * 60 * 60 * 1000,
minSeverity: 'moderate',
premiumTierChannels: ['email', 'push', 'sms'],
defaultChannels: ['email'],
};
const SEVERITY_MAP: Record<Severity, AlertSeverityLevel> = {
major: 'critical',
moderate: 'warning',
minor: 'info',
};
const CHANGE_TYPE_LABELS: Record<ChangeType, string> = {
ownership_transfer: 'Ownership Transfer',
deed_change: 'Deed Change',
lien_filing: 'Lien Filing',
tax_change: 'Tax Assessment Change',
metadata_change: 'Property Metadata Change',
};
export class HomeTitleAlertPipeline {
private notificationService: NotificationService;
private config: AlertPipelineConfig;
private pendingDedup = new Map<string, number>();
constructor(config?: Partial<AlertPipelineConfig>) {
this.config = { ...DEFAULT_CONFIG, ...config };
this.notificationService = new NotificationService(loadNotificationConfig());
}
async processChangeDetection(
result: ChangeDetectionResult,
subscriptionId: string,
userId: string,
): Promise<PropertyAlert | null> {
const severity = this.mapSeverity(result.severity);
const shouldAlert = this.shouldAlert(result, severity);
if (!shouldAlert) {
return null;
}
const dedupKey = this.buildDedupKey(userId, result.propertyId, result.changeType);
const isDuplicate = await this.checkDedup(dedupKey);
if (isDuplicate) {
return null;
}
const subscription = await prisma.subscription.findUnique({
where: { id: subscriptionId },
select: { tier: true },
});
if (!subscription) {
return null;
}
const channels = this.getChannelsForTier(subscription.tier);
const title = this.buildTitle(result);
const message = this.buildMessage(result);
const alert = await prisma.alert.create({
data: {
subscriptionId,
userId,
type: 'system_warning',
title,
message,
severity: severity as AlertSeverity,
channel: channels as AlertChannel[],
},
});
await this.recordDedup(dedupKey);
const propertyAlert: PropertyAlert = {
id: alert.id,
propertyId: result.propertyId,
subscriptionId,
userId,
changeType: result.changeType,
severity,
title,
message,
changeDetectionResult: result,
channel: channels,
dedupKey,
createdAt: alert.createdAt.toISOString(),
};
await this.createNormalizedAlert(result, userId, subscriptionId, alert.id, severity);
if (subscription.tier === 'premium') {
await this.dispatchNotification(propertyAlert, userId);
}
return propertyAlert;
}
async processBatch(
results: ChangeDetectionResult[],
subscriptionId: string,
userId: string,
): Promise<PropertyAlert[]> {
const alerts: PropertyAlert[] = [];
for (const result of results) {
const alert = await this.processChangeDetection(result, subscriptionId, userId);
if (alert) {
alerts.push(alert);
}
}
if (alerts.length > 1) {
await this.createCorrelationGroup(alerts, userId);
}
return alerts;
}
private shouldAlert(result: ChangeDetectionResult, severity: AlertSeverityLevel): boolean {
const severityOrder: Severity[] = ['minor', 'moderate', 'major'];
const minSeverityOrder: Severity[] = ['minor', 'moderate', 'major'];
const resultIdx = severityOrder.indexOf(result.severity);
const minIdx = minSeverityOrder.indexOf(this.config.minSeverity);
return resultIdx >= minIdx && result.confidence >= 0.7;
}
private mapSeverity(severity: Severity): AlertSeverityLevel {
return SEVERITY_MAP[severity] || 'info';
}
private buildDedupKey(userId: string, propertyId: string, changeType: ChangeType): string {
return `hometitle:${userId}:${propertyId}:${changeType}`;
}
private async checkDedup(dedupKey: string): Promise<boolean> {
const recentAlert = await prisma.alert.findFirst({
where: {
subscriptionId: dedupKey.split(':')[1] ? undefined : undefined,
title: {
contains: dedupKey.split(':')[2],
},
createdAt: {
gte: new Date(Date.now() - this.config.dedupWindowMs),
},
},
orderBy: { createdAt: 'desc' },
});
if (recentAlert) {
return true;
}
const inMemoryExpiry = this.pendingDedup.get(dedupKey);
if (inMemoryExpiry && Date.now() < inMemoryExpiry) {
return true;
}
return false;
}
private async recordDedup(dedupKey: string): Promise<void> {
this.pendingDedup.set(
dedupKey,
Date.now() + this.config.dedupWindowMs,
);
}
private getChannelsForTier(tier: string): NotificationChannel[] {
if (tier === 'premium') {
return [...this.config.premiumTierChannels];
}
return [...this.config.defaultChannels];
}
private buildTitle(result: ChangeDetectionResult): string {
const label = CHANGE_TYPE_LABELS[result.changeType] || 'Property Change';
const severityUpper = result.severity.toUpperCase();
return `[${severityUpper}] ${label} detected`;
}
private buildMessage(result: ChangeDetectionResult): string {
const changes = result.changes
.map(c => `- ${c.field}: ${String(c.oldValue)}${String(c.newValue)}`)
.join('\n');
return `Change detected on property ${result.propertyId}.\n\nChanges:\n${changes}\n\nConfidence: ${(result.confidence * 100).toFixed(1)}%`;
}
private async createNormalizedAlert(
result: ChangeDetectionResult,
userId: string,
subscriptionId: string,
sourceAlertId: string,
severity: AlertSeverityLevel,
): Promise<void> {
const normalizedSeverity = this.mapToNormalizedSeverity(severity);
await prisma.normalizedAlert.create({
data: {
source: 'DARKWATCH',
category: this.mapToAlertCategory(result.changeType),
severity: normalizedSeverity,
userId,
title: this.buildTitle(result),
description: this.buildMessage(result),
entities: JSON.stringify({
propertyId: result.propertyId,
changeType: result.changeType,
subscriptionId,
}),
sourceAlertId,
payload: JSON.stringify({
confidence: result.confidence,
changes: result.changes,
detectedAt: result.detectedAt,
}),
createdAt: new Date(result.detectedAt),
},
});
}
private async createCorrelationGroup(
alerts: PropertyAlert[],
userId: string,
): Promise<void> {
const entities = JSON.stringify({
propertyIds: [...new Set(alerts.map(a => a.propertyId))],
changeTypes: [...new Set(alerts.map(a => a.changeType))],
});
const highestSeverity = alerts.reduce((max, alert) => {
const order: AlertSeverityLevel[] = ['info', 'warning', 'critical'];
return order.indexOf(alert.severity) > order.indexOf(max) ? alert.severity : max;
}, 'info' as AlertSeverityLevel);
const group = await prisma.correlationGroup.create({
data: {
userId,
entities,
highestSeverity: this.mapToNormalizedSeverity(highestSeverity),
status: 'ACTIVE',
alertCount: alerts.length,
summary: `${alerts.length} property change alert${alerts.length > 1 ? 's' : ''} correlated`,
},
});
await prisma.normalizedAlert.updateMany({
where: {
sourceAlertId: { in: alerts.map(a => a.id) },
},
data: {
groupId: group.id,
},
});
}
private async dispatchNotification(
alert: PropertyAlert,
userId: string,
): Promise<void> {
try {
const user = await prisma.user.findUnique({
where: { id: userId },
select: { email: true, name: true },
});
if (!user?.email) {
return;
}
const htmlMessage = `<p>${alert.message.replace(/\n/g, '<br>')}</p>
<p><strong>Property:</strong> ${alert.propertyId}</p>
<p><strong>Change Type:</strong> ${CHANGE_TYPE_LABELS[alert.changeType]}</p>
<p><strong>Severity:</strong> ${alert.severity.toUpperCase()}</p>`;
for (const channel of alert.channel) {
switch (channel) {
case 'email':
await this.notificationService.send({
channel: 'email',
to: user.email,
subject: alert.title,
htmlBody: htmlMessage,
textBody: alert.message,
});
break;
case 'push':
await this.notificationService.send({
channel: 'push',
userId,
title: alert.title,
body: alert.message.slice(0, 200),
});
break;
case 'sms':
await this.notificationService.send({
channel: 'sms',
to: user.email,
body: `[ShieldAI] ${alert.title}: ${alert.message.slice(0, 140)}`,
});
break;
}
}
} catch (error) {
console.error('[HomeTitleAlertPipeline] Notification dispatch error:', error);
}
}
private mapToNormalizedSeverity(severity: AlertSeverityLevel): string {
const map: Record<AlertSeverityLevel, string> = {
info: 'INFO',
warning: 'WARNING',
critical: 'CRITICAL',
};
return map[severity] || 'INFO';
}
private mapToAlertCategory(changeType: ChangeType): string {
const map: Record<ChangeType, string> = {
ownership_transfer: 'CALL_ANOMALY',
deed_change: 'CALL_ANOMALY',
lien_filing: 'CALL_ANOMALY',
tax_change: 'CALL_EVENT',
metadata_change: 'CALL_EVENT',
};
return map[changeType] || 'CALL_EVENT';
}
cleanupExpiredDedups(): number {
const now = Date.now();
let cleaned = 0;
for (const [key, expiry] of this.pendingDedup) {
if (now >= expiry) {
this.pendingDedup.delete(key);
cleaned++;
}
}
return cleaned;
}
}
export const homeTitleAlertPipeline = new HomeTitleAlertPipeline();

View File

@@ -31,4 +31,13 @@ export type {
MatchingConfig,
DetectionConfig,
NormalizedTokens,
AlertSeverityLevel,
PropertyAlert,
NotificationChannel,
AlertPipelineConfig,
SchedulerConfig,
ScheduledScanResult,
} from './types';
export { HomeTitleAlertPipeline, homeTitleAlertPipeline } from './alert.pipeline';
export { HomeTitleSchedulerService, homeTitleScheduler } from './scheduler.service';

View File

@@ -0,0 +1,232 @@
import { prisma } from '@shieldai/db';
import { detectChanges, shouldTriggerAlert } from './change-detector';
import { homeTitleAlertPipeline } from './alert.pipeline';
import {
PropertySnapshot,
SchedulerConfig,
ScheduledScanResult,
} from './types';
import { v4 as uuidv4 } from 'uuid';
const DEFAULT_SCHEDULER_CONFIG: SchedulerConfig = {
scanIntervalMinutes: 60,
premiumScanIntervalMinutes: 30,
maxPropertiesPerScan: 100,
enabled: true,
};
export class HomeTitleSchedulerService {
private config: SchedulerConfig;
private timerId: NodeJS.Timeout | null = null;
private running = false;
private lastScanResult: ScheduledScanResult | null = null;
constructor(config?: Partial<SchedulerConfig>) {
this.config = { ...DEFAULT_SCHEDULER_CONFIG, ...config };
}
getConfig(): SchedulerConfig {
return { ...this.config };
}
updateConfig(partial: Partial<SchedulerConfig>): void {
this.config = { ...this.config, ...partial };
if (partial.scanIntervalMinutes && this.timerId) {
this.stop();
this.start();
}
}
start(): void {
if (!this.config.enabled) return;
const intervalMs = this.config.scanIntervalMinutes * 60 * 1000;
this.running = true;
this.timerId = setInterval(async () => {
if (this.running) {
try {
const result = await this.runScan();
this.lastScanResult = result;
} catch (error) {
console.error('[HomeTitleScheduler] Scan error:', error);
}
}
}, intervalMs);
console.log(
`[HomeTitleScheduler] Started with ${this.config.scanIntervalMinutes}min interval`,
);
}
stop(): void {
this.running = false;
if (this.timerId) {
clearInterval(this.timerId);
this.timerId = null;
}
console.log('[HomeTitleScheduler] Stopped');
}
async runScan(): Promise<ScheduledScanResult> {
const scanId = uuidv4();
const startedAt = new Date().toISOString();
const errors: string[] = [];
let changesDetected = 0;
let alertsCreated = 0;
let notificationsSent = 0;
try {
const subscriptions = await prisma.subscription.findMany({
where: {
status: 'active',
tier: { in: ['plus', 'premium'] },
},
select: {
id: true,
userId: true,
tier: true,
},
take: this.config.maxPropertiesPerScan,
});
for (const subscription of subscriptions) {
try {
const propertySnapshots = await this.fetchLatestSnapshots(
subscription.userId,
);
for (const snapshot of propertySnapshots) {
const previousSnapshot = await this.fetchPreviousSnapshot(
snapshot.propertyId,
snapshot.id,
snapshot.capturedAt,
);
if (!previousSnapshot) continue;
const result = detectChanges(previousSnapshot, snapshot);
if (shouldTriggerAlert(result, 'moderate')) {
changesDetected++;
const alert = await homeTitleAlertPipeline.processChangeDetection(
result,
subscription.id,
subscription.userId,
);
if (alert) {
alertsCreated++;
if (subscription.tier === 'premium') {
notificationsSent++;
}
}
}
}
} catch (error) {
const errorMsg = `Subscription ${subscription.id}: ${error instanceof Error ? error.message : String(error)}`;
errors.push(errorMsg);
console.error(`[HomeTitleScheduler] Subscription scan error:`, errorMsg);
}
}
} catch (error) {
const errorMsg = `Scan ${scanId}: ${error instanceof Error ? error.message : String(error)}`;
errors.push(errorMsg);
console.error(`[HomeTitleScheduler] Scan error:`, errorMsg);
}
const completedAt = new Date().toISOString();
const scanResult: ScheduledScanResult = {
scanId,
propertiesScanned: changesDetected,
changesDetected,
alertsCreated,
notificationsSent,
errors,
startedAt,
completedAt,
};
return scanResult;
}
getLastScanResult(): ScheduledScanResult | null {
return this.lastScanResult;
}
isRunning(): boolean {
return this.running;
}
private async fetchLatestSnapshots(userId: string): Promise<PropertySnapshot[]> {
const rawSnapshots = await prisma.$queryRaw<
Array<Record<string, unknown>>
>`
SELECT "id", "propertyId", "capturedAt", "ownerName",
"deedDate", "taxId", "propertyType",
"taxAmount", "lienCount"
FROM "PropertySnapshot"
WHERE "propertyId" IN (
SELECT "propertyId" FROM "WatchlistItem"
WHERE "subscriptionId" IN (
SELECT "id" FROM "Subscription" WHERE "userId" = ${userId}
)
)
ORDER BY "capturedAt" DESC
LIMIT ${this.config.maxPropertiesPerScan}
`;
return rawSnapshots.map(row => ({
id: String(row.id),
propertyId: String(row.propertyId),
capturedAt: String(row.capturedAt),
ownerName: String(row.ownerName),
address: row.address ? JSON.parse(String(row.address)) : {},
deedDate: row.deedDate ? String(row.deedDate) : undefined,
taxId: row.taxId ? String(row.taxId) : undefined,
propertyType: String(row.propertyType) as PropertySnapshot['propertyType'],
taxAmount: row.taxAmount ? Number(row.taxAmount) : undefined,
lienCount: row.lienCount ? Number(row.lienCount) : undefined,
}));
}
private async fetchPreviousSnapshot(
propertyId: string,
currentSnapshotId: string,
currentCapturedAt: string,
): Promise<PropertySnapshot | null> {
const rawSnapshots = await prisma.$queryRaw<
Array<Record<string, unknown>>
>`
SELECT "id", "propertyId", "capturedAt", "ownerName",
"deedDate", "taxId", "propertyType",
"taxAmount", "lienCount"
FROM "PropertySnapshot"
WHERE "propertyId" = ${propertyId}
AND "capturedAt" < ${currentCapturedAt}
AND "id" != ${currentSnapshotId}
ORDER BY "capturedAt" DESC
LIMIT 1
`;
if (rawSnapshots.length === 0) return null;
const row = rawSnapshots[0];
return {
id: String(row.id),
propertyId: String(row.propertyId),
capturedAt: String(row.capturedAt),
ownerName: String(row.ownerName),
address: row.address ? JSON.parse(String(row.address)) : {},
deedDate: row.deedDate ? String(row.deedDate) : undefined,
taxId: row.taxId ? String(row.taxId) : undefined,
propertyType: String(row.propertyType) as PropertySnapshot['propertyType'],
taxAmount: row.taxAmount ? Number(row.taxAmount) : undefined,
lienCount: row.lienCount ? Number(row.lienCount) : undefined,
};
}
}
export const homeTitleScheduler = new HomeTitleSchedulerService();

View File

@@ -112,3 +112,55 @@ export interface NormalizedTokens {
middleName: string;
initials: string[];
}
// ============================================
// Alert Pipeline Types
// ============================================
export type AlertSeverityLevel = 'info' | 'warning' | 'critical';
export interface PropertyAlert {
id: string;
propertyId: string;
subscriptionId: string;
userId: string;
changeType: ChangeType;
severity: AlertSeverityLevel;
title: string;
message: string;
changeDetectionResult: ChangeDetectionResult;
channel: NotificationChannel[];
dedupKey: string;
createdAt: string;
}
export type NotificationChannel = 'email' | 'sms' | 'push';
export interface AlertPipelineConfig {
dedupWindowMs: number;
minSeverity: Severity;
premiumTierChannels: NotificationChannel[];
defaultChannels: NotificationChannel[];
}
// ============================================
// Scheduler Types
// ============================================
export interface SchedulerConfig {
scanIntervalMinutes: number;
premiumScanIntervalMinutes: number;
maxPropertiesPerScan: number;
enabled: boolean;
}
export interface ScheduledScanResult {
scanId: string;
propertiesScanned: number;
changesDetected: number;
alertsCreated: number;
notificationsSent: number;
errors: string[];
startedAt: string;
completedAt: string;
}