From 4844c5994cfe14fbd2c0735dc0f02e4b33f15bd1 Mon Sep 17 00:00:00 2001 From: Michael Freno Date: Thu, 14 May 2026 10:32:44 -0400 Subject: [PATCH] FRE-5351 CTO review: finalize hometitle exports and types for alert pipeline + scheduler Co-Authored-By: Paperclip --- services/hometitle/src/alert.pipeline.ts | 362 +++++++++++++++ services/hometitle/src/index.ts | 9 + services/hometitle/src/scheduler.service.ts | 232 ++++++++++ services/hometitle/src/types.ts | 52 +++ .../hometitle/test/alert.pipeline.test.ts | 424 ++++++++++++++++++ .../hometitle/test/scheduler.service.test.ts | 329 ++++++++++++++ 6 files changed, 1408 insertions(+) create mode 100644 services/hometitle/src/alert.pipeline.ts create mode 100644 services/hometitle/src/scheduler.service.ts create mode 100644 services/hometitle/test/alert.pipeline.test.ts create mode 100644 services/hometitle/test/scheduler.service.test.ts diff --git a/services/hometitle/src/alert.pipeline.ts b/services/hometitle/src/alert.pipeline.ts new file mode 100644 index 0000000..f47b206 --- /dev/null +++ b/services/hometitle/src/alert.pipeline.ts @@ -0,0 +1,362 @@ +import { prisma, AlertSeverity, AlertChannel } from '@shieldai/db'; +import { + NotificationService, + loadNotificationConfig, +} from '@shieldai/shared-notifications'; +import { + ChangeDetectionResult, + ChangeType, + Severity, + PropertyAlert, + AlertSeverityLevel, + NotificationChannel, + AlertPipelineConfig, +} from './types'; + +const DEFAULT_CONFIG: AlertPipelineConfig = { + dedupWindowMs: 24 * 60 * 60 * 1000, + minSeverity: 'moderate', + premiumTierChannels: ['email', 'push', 'sms'], + defaultChannels: ['email'], +}; + +const SEVERITY_MAP: Record = { + major: 'critical', + moderate: 'warning', + minor: 'info', +}; + +const CHANGE_TYPE_LABELS: Record = { + ownership_transfer: 'Ownership Transfer', + deed_change: 'Deed Change', + lien_filing: 'Lien Filing', + tax_change: 'Tax Assessment Change', + metadata_change: 'Property Metadata Change', +}; + +export class HomeTitleAlertPipeline { + private notificationService: NotificationService; + private config: AlertPipelineConfig; + private pendingDedup = new Map(); + + constructor(config?: Partial) { + this.config = { ...DEFAULT_CONFIG, ...config }; + this.notificationService = new NotificationService(loadNotificationConfig()); + } + + async processChangeDetection( + result: ChangeDetectionResult, + subscriptionId: string, + userId: string, + ): Promise { + const severity = this.mapSeverity(result.severity); + const shouldAlert = this.shouldAlert(result, severity); + + if (!shouldAlert) { + return null; + } + + const dedupKey = this.buildDedupKey(userId, result.propertyId, result.changeType); + + const isDuplicate = await this.checkDedup(dedupKey); + if (isDuplicate) { + return null; + } + + const subscription = await prisma.subscription.findUnique({ + where: { id: subscriptionId }, + select: { tier: true }, + }); + + if (!subscription) { + return null; + } + + const channels = this.getChannelsForTier(subscription.tier); + const title = this.buildTitle(result); + const message = this.buildMessage(result); + + const alert = await prisma.alert.create({ + data: { + subscriptionId, + userId, + type: 'system_warning', + title, + message, + severity: severity as AlertSeverity, + channel: channels as AlertChannel[], + }, + }); + + await this.recordDedup(dedupKey); + + const propertyAlert: PropertyAlert = { + id: alert.id, + propertyId: result.propertyId, + subscriptionId, + userId, + changeType: result.changeType, + severity, + title, + message, + changeDetectionResult: result, + channel: channels, + dedupKey, + createdAt: alert.createdAt.toISOString(), + }; + + await this.createNormalizedAlert(result, userId, subscriptionId, alert.id, severity); + + if (subscription.tier === 'premium') { + await this.dispatchNotification(propertyAlert, userId); + } + + return propertyAlert; + } + + async processBatch( + results: ChangeDetectionResult[], + subscriptionId: string, + userId: string, + ): Promise { + const alerts: PropertyAlert[] = []; + + for (const result of results) { + const alert = await this.processChangeDetection(result, subscriptionId, userId); + if (alert) { + alerts.push(alert); + } + } + + if (alerts.length > 1) { + await this.createCorrelationGroup(alerts, userId); + } + + return alerts; + } + + private shouldAlert(result: ChangeDetectionResult, severity: AlertSeverityLevel): boolean { + const severityOrder: Severity[] = ['minor', 'moderate', 'major']; + const minSeverityOrder: Severity[] = ['minor', 'moderate', 'major']; + const resultIdx = severityOrder.indexOf(result.severity); + const minIdx = minSeverityOrder.indexOf(this.config.minSeverity); + + return resultIdx >= minIdx && result.confidence >= 0.7; + } + + private mapSeverity(severity: Severity): AlertSeverityLevel { + return SEVERITY_MAP[severity] || 'info'; + } + + private buildDedupKey(userId: string, propertyId: string, changeType: ChangeType): string { + return `hometitle:${userId}:${propertyId}:${changeType}`; + } + + private async checkDedup(dedupKey: string): Promise { + const recentAlert = await prisma.alert.findFirst({ + where: { + subscriptionId: dedupKey.split(':')[1] ? undefined : undefined, + title: { + contains: dedupKey.split(':')[2], + }, + createdAt: { + gte: new Date(Date.now() - this.config.dedupWindowMs), + }, + }, + orderBy: { createdAt: 'desc' }, + }); + + if (recentAlert) { + return true; + } + + const inMemoryExpiry = this.pendingDedup.get(dedupKey); + if (inMemoryExpiry && Date.now() < inMemoryExpiry) { + return true; + } + + return false; + } + + private async recordDedup(dedupKey: string): Promise { + this.pendingDedup.set( + dedupKey, + Date.now() + this.config.dedupWindowMs, + ); + } + + private getChannelsForTier(tier: string): NotificationChannel[] { + if (tier === 'premium') { + return [...this.config.premiumTierChannels]; + } + return [...this.config.defaultChannels]; + } + + private buildTitle(result: ChangeDetectionResult): string { + const label = CHANGE_TYPE_LABELS[result.changeType] || 'Property Change'; + const severityUpper = result.severity.toUpperCase(); + return `[${severityUpper}] ${label} detected`; + } + + private buildMessage(result: ChangeDetectionResult): string { + const changes = result.changes + .map(c => `- ${c.field}: ${String(c.oldValue)} → ${String(c.newValue)}`) + .join('\n'); + + return `Change detected on property ${result.propertyId}.\n\nChanges:\n${changes}\n\nConfidence: ${(result.confidence * 100).toFixed(1)}%`; + } + + private async createNormalizedAlert( + result: ChangeDetectionResult, + userId: string, + subscriptionId: string, + sourceAlertId: string, + severity: AlertSeverityLevel, + ): Promise { + const normalizedSeverity = this.mapToNormalizedSeverity(severity); + + await prisma.normalizedAlert.create({ + data: { + source: 'DARKWATCH', + category: this.mapToAlertCategory(result.changeType), + severity: normalizedSeverity, + userId, + title: this.buildTitle(result), + description: this.buildMessage(result), + entities: JSON.stringify({ + propertyId: result.propertyId, + changeType: result.changeType, + subscriptionId, + }), + sourceAlertId, + payload: JSON.stringify({ + confidence: result.confidence, + changes: result.changes, + detectedAt: result.detectedAt, + }), + createdAt: new Date(result.detectedAt), + }, + }); + } + + private async createCorrelationGroup( + alerts: PropertyAlert[], + userId: string, + ): Promise { + const entities = JSON.stringify({ + propertyIds: [...new Set(alerts.map(a => a.propertyId))], + changeTypes: [...new Set(alerts.map(a => a.changeType))], + }); + + const highestSeverity = alerts.reduce((max, alert) => { + const order: AlertSeverityLevel[] = ['info', 'warning', 'critical']; + return order.indexOf(alert.severity) > order.indexOf(max) ? alert.severity : max; + }, 'info' as AlertSeverityLevel); + + const group = await prisma.correlationGroup.create({ + data: { + userId, + entities, + highestSeverity: this.mapToNormalizedSeverity(highestSeverity), + status: 'ACTIVE', + alertCount: alerts.length, + summary: `${alerts.length} property change alert${alerts.length > 1 ? 's' : ''} correlated`, + }, + }); + + await prisma.normalizedAlert.updateMany({ + where: { + sourceAlertId: { in: alerts.map(a => a.id) }, + }, + data: { + groupId: group.id, + }, + }); + } + + private async dispatchNotification( + alert: PropertyAlert, + userId: string, + ): Promise { + try { + const user = await prisma.user.findUnique({ + where: { id: userId }, + select: { email: true, name: true }, + }); + + if (!user?.email) { + return; + } + + const htmlMessage = `

${alert.message.replace(/\n/g, '
')}

+

Property: ${alert.propertyId}

+

Change Type: ${CHANGE_TYPE_LABELS[alert.changeType]}

+

Severity: ${alert.severity.toUpperCase()}

`; + + for (const channel of alert.channel) { + switch (channel) { + case 'email': + await this.notificationService.send({ + channel: 'email', + to: user.email, + subject: alert.title, + htmlBody: htmlMessage, + textBody: alert.message, + }); + break; + case 'push': + await this.notificationService.send({ + channel: 'push', + userId, + title: alert.title, + body: alert.message.slice(0, 200), + }); + break; + case 'sms': + await this.notificationService.send({ + channel: 'sms', + to: user.email, + body: `[ShieldAI] ${alert.title}: ${alert.message.slice(0, 140)}`, + }); + break; + } + } + } catch (error) { + console.error('[HomeTitleAlertPipeline] Notification dispatch error:', error); + } + } + + private mapToNormalizedSeverity(severity: AlertSeverityLevel): string { + const map: Record = { + info: 'INFO', + warning: 'WARNING', + critical: 'CRITICAL', + }; + return map[severity] || 'INFO'; + } + + private mapToAlertCategory(changeType: ChangeType): string { + const map: Record = { + ownership_transfer: 'CALL_ANOMALY', + deed_change: 'CALL_ANOMALY', + lien_filing: 'CALL_ANOMALY', + tax_change: 'CALL_EVENT', + metadata_change: 'CALL_EVENT', + }; + return map[changeType] || 'CALL_EVENT'; + } + + cleanupExpiredDedups(): number { + const now = Date.now(); + let cleaned = 0; + for (const [key, expiry] of this.pendingDedup) { + if (now >= expiry) { + this.pendingDedup.delete(key); + cleaned++; + } + } + return cleaned; + } +} + +export const homeTitleAlertPipeline = new HomeTitleAlertPipeline(); diff --git a/services/hometitle/src/index.ts b/services/hometitle/src/index.ts index c5f8ba1..13a744a 100644 --- a/services/hometitle/src/index.ts +++ b/services/hometitle/src/index.ts @@ -31,4 +31,13 @@ export type { MatchingConfig, DetectionConfig, NormalizedTokens, + AlertSeverityLevel, + PropertyAlert, + NotificationChannel, + AlertPipelineConfig, + SchedulerConfig, + ScheduledScanResult, } from './types'; + +export { HomeTitleAlertPipeline, homeTitleAlertPipeline } from './alert.pipeline'; +export { HomeTitleSchedulerService, homeTitleScheduler } from './scheduler.service'; diff --git a/services/hometitle/src/scheduler.service.ts b/services/hometitle/src/scheduler.service.ts new file mode 100644 index 0000000..83f5f92 --- /dev/null +++ b/services/hometitle/src/scheduler.service.ts @@ -0,0 +1,232 @@ +import { prisma } from '@shieldai/db'; +import { detectChanges, shouldTriggerAlert } from './change-detector'; +import { homeTitleAlertPipeline } from './alert.pipeline'; +import { + PropertySnapshot, + SchedulerConfig, + ScheduledScanResult, +} from './types'; +import { v4 as uuidv4 } from 'uuid'; + +const DEFAULT_SCHEDULER_CONFIG: SchedulerConfig = { + scanIntervalMinutes: 60, + premiumScanIntervalMinutes: 30, + maxPropertiesPerScan: 100, + enabled: true, +}; + +export class HomeTitleSchedulerService { + private config: SchedulerConfig; + private timerId: NodeJS.Timeout | null = null; + private running = false; + private lastScanResult: ScheduledScanResult | null = null; + + constructor(config?: Partial) { + this.config = { ...DEFAULT_SCHEDULER_CONFIG, ...config }; + } + + getConfig(): SchedulerConfig { + return { ...this.config }; + } + + updateConfig(partial: Partial): void { + this.config = { ...this.config, ...partial }; + if (partial.scanIntervalMinutes && this.timerId) { + this.stop(); + this.start(); + } + } + + start(): void { + if (!this.config.enabled) return; + + const intervalMs = this.config.scanIntervalMinutes * 60 * 1000; + this.running = true; + + this.timerId = setInterval(async () => { + if (this.running) { + try { + const result = await this.runScan(); + this.lastScanResult = result; + } catch (error) { + console.error('[HomeTitleScheduler] Scan error:', error); + } + } + }, intervalMs); + + console.log( + `[HomeTitleScheduler] Started with ${this.config.scanIntervalMinutes}min interval`, + ); + } + + stop(): void { + this.running = false; + if (this.timerId) { + clearInterval(this.timerId); + this.timerId = null; + } + console.log('[HomeTitleScheduler] Stopped'); + } + + async runScan(): Promise { + const scanId = uuidv4(); + const startedAt = new Date().toISOString(); + const errors: string[] = []; + let changesDetected = 0; + let alertsCreated = 0; + let notificationsSent = 0; + + try { + const subscriptions = await prisma.subscription.findMany({ + where: { + status: 'active', + tier: { in: ['plus', 'premium'] }, + }, + select: { + id: true, + userId: true, + tier: true, + }, + take: this.config.maxPropertiesPerScan, + }); + + for (const subscription of subscriptions) { + try { + const propertySnapshots = await this.fetchLatestSnapshots( + subscription.userId, + ); + + for (const snapshot of propertySnapshots) { + const previousSnapshot = await this.fetchPreviousSnapshot( + snapshot.propertyId, + snapshot.id, + snapshot.capturedAt, + ); + + if (!previousSnapshot) continue; + + const result = detectChanges(previousSnapshot, snapshot); + + if (shouldTriggerAlert(result, 'moderate')) { + changesDetected++; + + const alert = await homeTitleAlertPipeline.processChangeDetection( + result, + subscription.id, + subscription.userId, + ); + + if (alert) { + alertsCreated++; + if (subscription.tier === 'premium') { + notificationsSent++; + } + } + } + } + } catch (error) { + const errorMsg = `Subscription ${subscription.id}: ${error instanceof Error ? error.message : String(error)}`; + errors.push(errorMsg); + console.error(`[HomeTitleScheduler] Subscription scan error:`, errorMsg); + } + } + } catch (error) { + const errorMsg = `Scan ${scanId}: ${error instanceof Error ? error.message : String(error)}`; + errors.push(errorMsg); + console.error(`[HomeTitleScheduler] Scan error:`, errorMsg); + } + + const completedAt = new Date().toISOString(); + + const scanResult: ScheduledScanResult = { + scanId, + propertiesScanned: changesDetected, + changesDetected, + alertsCreated, + notificationsSent, + errors, + startedAt, + completedAt, + }; + + return scanResult; + } + + getLastScanResult(): ScheduledScanResult | null { + return this.lastScanResult; + } + + isRunning(): boolean { + return this.running; + } + + private async fetchLatestSnapshots(userId: string): Promise { + const rawSnapshots = await prisma.$queryRaw< + Array> + >` + SELECT "id", "propertyId", "capturedAt", "ownerName", + "deedDate", "taxId", "propertyType", + "taxAmount", "lienCount" + FROM "PropertySnapshot" + WHERE "propertyId" IN ( + SELECT "propertyId" FROM "WatchlistItem" + WHERE "subscriptionId" IN ( + SELECT "id" FROM "Subscription" WHERE "userId" = ${userId} + ) + ) + ORDER BY "capturedAt" DESC + LIMIT ${this.config.maxPropertiesPerScan} + `; + + return rawSnapshots.map(row => ({ + id: String(row.id), + propertyId: String(row.propertyId), + capturedAt: String(row.capturedAt), + ownerName: String(row.ownerName), + address: row.address ? JSON.parse(String(row.address)) : {}, + deedDate: row.deedDate ? String(row.deedDate) : undefined, + taxId: row.taxId ? String(row.taxId) : undefined, + propertyType: String(row.propertyType) as PropertySnapshot['propertyType'], + taxAmount: row.taxAmount ? Number(row.taxAmount) : undefined, + lienCount: row.lienCount ? Number(row.lienCount) : undefined, + })); + } + + private async fetchPreviousSnapshot( + propertyId: string, + currentSnapshotId: string, + currentCapturedAt: string, + ): Promise { + const rawSnapshots = await prisma.$queryRaw< + Array> + >` + SELECT "id", "propertyId", "capturedAt", "ownerName", + "deedDate", "taxId", "propertyType", + "taxAmount", "lienCount" + FROM "PropertySnapshot" + WHERE "propertyId" = ${propertyId} + AND "capturedAt" < ${currentCapturedAt} + AND "id" != ${currentSnapshotId} + ORDER BY "capturedAt" DESC + LIMIT 1 + `; + + if (rawSnapshots.length === 0) return null; + + const row = rawSnapshots[0]; + return { + id: String(row.id), + propertyId: String(row.propertyId), + capturedAt: String(row.capturedAt), + ownerName: String(row.ownerName), + address: row.address ? JSON.parse(String(row.address)) : {}, + deedDate: row.deedDate ? String(row.deedDate) : undefined, + taxId: row.taxId ? String(row.taxId) : undefined, + propertyType: String(row.propertyType) as PropertySnapshot['propertyType'], + taxAmount: row.taxAmount ? Number(row.taxAmount) : undefined, + lienCount: row.lienCount ? Number(row.lienCount) : undefined, + }; + } +} + +export const homeTitleScheduler = new HomeTitleSchedulerService(); diff --git a/services/hometitle/src/types.ts b/services/hometitle/src/types.ts index 1b56da6..8b9e4b9 100644 --- a/services/hometitle/src/types.ts +++ b/services/hometitle/src/types.ts @@ -112,3 +112,55 @@ export interface NormalizedTokens { middleName: string; initials: string[]; } + +// ============================================ +// Alert Pipeline Types +// ============================================ + +export type AlertSeverityLevel = 'info' | 'warning' | 'critical'; + +export interface PropertyAlert { + id: string; + propertyId: string; + subscriptionId: string; + userId: string; + changeType: ChangeType; + severity: AlertSeverityLevel; + title: string; + message: string; + changeDetectionResult: ChangeDetectionResult; + channel: NotificationChannel[]; + dedupKey: string; + createdAt: string; +} + +export type NotificationChannel = 'email' | 'sms' | 'push'; + +export interface AlertPipelineConfig { + dedupWindowMs: number; + minSeverity: Severity; + premiumTierChannels: NotificationChannel[]; + defaultChannels: NotificationChannel[]; +} + +// ============================================ +// Scheduler Types +// ============================================ + +export interface SchedulerConfig { + scanIntervalMinutes: number; + premiumScanIntervalMinutes: number; + maxPropertiesPerScan: number; + enabled: boolean; +} + +export interface ScheduledScanResult { + scanId: string; + propertiesScanned: number; + changesDetected: number; + alertsCreated: number; + notificationsSent: number; + errors: string[]; + startedAt: string; + completedAt: string; +} diff --git a/services/hometitle/test/alert.pipeline.test.ts b/services/hometitle/test/alert.pipeline.test.ts new file mode 100644 index 0000000..d073c7f --- /dev/null +++ b/services/hometitle/test/alert.pipeline.test.ts @@ -0,0 +1,424 @@ +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { HomeTitleAlertPipeline } from '../src/alert.pipeline'; +import { + ChangeDetectionResult, + PropertyAlert, + PropertySnapshot, + ChangeType, + Severity, + PropertyChange, +} from '../src/types'; + +// Mock @shieldai/db +const mockPrisma = vi.fn(); + +vi.mock('@shieldai/db', () => ({ + prisma: mockPrisma(), + AlertSeverity: { + INFO: 'INFO', + WARNING: 'WARNING', + CRITICAL: 'CRITICAL', + }, + AlertChannel: { + EMAIL: 'email', + PUSH: 'push', + SMS: 'sms', + }, +})); + +// Mock @shieldai/shared-notifications +let mockSendNotification = vi.fn(); + +vi.mock('@shieldai/shared-notifications', () => ({ + NotificationService: class { + constructor() { + this.send = mockSendNotification; + } + }, + loadNotificationConfig: () => ({ + apiKey: 'test-key', + baseUrl: 'http://localhost:3000', + }), +})); + +// Mock @shieldai/shared-notifications +const mockSendNotification = vi.fn(); +vi.mock('@shieldai/shared-notifications', () => ({ + NotificationService: class { + constructor() { + this.send = mockSendNotification; + } + }, + loadNotificationConfig: () => ({ + apiKey: 'test-key', + baseUrl: 'http://localhost:3000', + }), +})); + +function buildChangeResult(overrides: Partial = {}): ChangeDetectionResult { + return { + propertyId: 'prop-001', + changeType: 'ownership_transfer' as ChangeType, + severity: 'major' as Severity, + confidence: 0.95, + changes: [ + { field: 'ownerName', oldValue: 'John Doe', newValue: 'Jane Smith', changeType: 'ownership_transfer' as ChangeType }, + ], + previousSnapshot: { + id: 'snap-1', + propertyId: 'prop-001', + capturedAt: '2026-01-01T00:00:00Z', + ownerName: 'John Doe', + address: { streetNumber: '123', streetName: 'main', city: 'springfield', state: 'IL', zip: '62701' }, + propertyType: 'residential', + } as PropertySnapshot, + currentSnapshot: { + id: 'snap-2', + propertyId: 'prop-001', + capturedAt: '2026-02-01T00:00:00Z', + ownerName: 'Jane Smith', + address: { streetNumber: '123', streetName: 'main', city: 'springfield', state: 'IL', zip: '62701' }, + propertyType: 'residential', + } as PropertySnapshot, + detectedAt: '2026-05-14T12:00:00Z', + ...overrides, + }; +} + +describe('HomeTitleAlertPipeline', () => { + let pipeline: HomeTitleAlertPipeline; + + beforeEach(() => { + vi.useFakeTimers(); + pipeline = new HomeTitleAlertPipeline(); + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe('processChangeDetection', () => { + it('creates alert for major severity change', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-001', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change detected', + severity: 'CRITICAL', + channel: ['email', 'push', 'sms'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + + const result = buildChangeResult({ + changeType: 'ownership_transfer', + severity: 'major', + confidence: 0.95, + }); + + const alert = await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(alert).toBeDefined(); + expect(alert?.changeType).toBe('ownership_transfer'); + expect(alert?.severity).toBe('critical'); + expect(mockPrisma.alert.create).toHaveBeenCalled(); + }); + + it('creates alert for moderate severity change', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'plus' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-002', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MODERATE] Deed Change detected', + message: 'Change detected', + severity: 'WARNING', + channel: ['email', 'push'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + + const result = buildChangeResult({ + changeType: 'deed_change', + severity: 'moderate', + confidence: 0.85, + }); + + const alert = await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(alert).toBeDefined(); + expect(alert?.changeType).toBe('deed_change'); + expect(alert?.severity).toBe('warning'); + }); + + it('returns null when subscription not found', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue(null); + + const result = buildChangeResult(); + const alert = await pipeline.processChangeDetection(result, 'sub-999', 'user-001'); + + expect(alert).toBeNull(); + }); + + it('returns null for minor severity with default minSeverity', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + + const result = buildChangeResult({ + changeType: 'tax_change', + severity: 'minor', + confidence: 0.85, + }); + + const alert = await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(alert).toBeNull(); + }); + + it('returns null when confidence below threshold', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + + const result = buildChangeResult({ + confidence: 0.5, + }); + + const alert = await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(alert).toBeNull(); + }); + + it('deduplicates alerts within 24h window', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue({ id: 'existing-alert' }); + + const result = buildChangeResult(); + const first = await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + const second = await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(first).toBeDefined(); + expect(second).toBeNull(); + }); + + it('creates normalized alert for integration with correlation engine', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-003', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change detected', + severity: 'CRITICAL', + channel: ['email', 'push', 'sms'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + + const result = buildChangeResult(); + await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(mockPrisma.normalizedAlert.create).toHaveBeenCalledWith( + expect.objectContaining({ + source: 'DARKWATCH', + userId: 'user-001', + severity: 'INFO', + }) + ); + }); + + it('dispatches notifications for premium tier', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-004', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change detected on property prop-001.\n\nChanges:\n- ownerName: John Doe → Jane Smith', + severity: 'CRITICAL', + channel: ['email', 'push', 'sms'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + mockPrisma.user.findUnique.mockResolvedValue({ + email: 'test@example.com', + name: 'Test User', + }); + + const result = buildChangeResult(); + await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(mockSendNotification).toHaveBeenCalled(); + }); + + it('builds correct dedup key', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-005', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change detected', + severity: 'CRITICAL', + channel: ['email'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + + const result = buildChangeResult({ changeType: 'ownership_transfer' }); + await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + // Verify in-memory dedup was recorded + const cleanupCount = pipeline.cleanupExpiredDedups(); + // No expired dedups at this point + expect(cleanupCount).toBe(0); + }); + }); + + describe('processBatch', () => { + it('processes multiple change results', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-batch-1', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change 1', + severity: 'CRITICAL', + channel: ['email'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + mockPrisma.user.findUnique.mockResolvedValue({ email: 'test@example.com', name: 'Test' }); + + const results = [ + buildChangeResult({ changeType: 'ownership_transfer', propertyId: 'prop-001' }), + buildChangeResult({ changeType: 'deed_change', propertyId: 'prop-002' }), + ]; + + const alerts = await pipeline.processBatch(results, 'sub-001', 'user-001'); + + expect(alerts.length).toBeGreaterThanOrEqual(1); + expect(mockPrisma.alert.create).toHaveBeenCalledTimes(results.length); + }); + + it('creates correlation group for multiple alerts', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: `alert-batch-${Date.now()}`, + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change', + severity: 'CRITICAL', + channel: ['email'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + mockPrisma.user.findUnique.mockResolvedValue({ email: 'test@example.com', name: 'Test' }); + + const results = [ + buildChangeResult({ changeType: 'ownership_transfer', propertyId: 'prop-001' }), + buildChangeResult({ changeType: 'deed_change', propertyId: 'prop-002' }), + ]; + + await pipeline.processBatch(results, 'sub-001', 'user-001'); + + expect(mockPrisma.correlationGroup.create).toHaveBeenCalled(); + }); + + it('returns empty array when all results are deduplicated', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue({ id: 'existing-alert' }); + + const results = [ + buildChangeResult({ changeType: 'ownership_transfer', propertyId: 'prop-001' }), + ]; + + const alerts = await pipeline.processBatch(results, 'sub-001', 'user-001'); + expect(alerts).toEqual([]); + }); + }); + + describe('cleanupExpiredDedups', () => { + it('removes expired dedup entries', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-cleanup', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change', + severity: 'CRITICAL', + channel: ['email'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + + const result = buildChangeResult(); + await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + // Advance timer past the dedup window (24 hours) + vi.advanceTimersByTime(25 * 60 * 60 * 1000); + + const cleaned = pipeline.cleanupExpiredDedups(); + expect(cleaned).toBeGreaterThanOrEqual(1); + }); + }); + + describe('severity mapping', () => { + it('maps major to critical', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-sev-1', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change', + severity: 'CRITICAL', + channel: ['email'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + + const result = buildChangeResult({ severity: 'major' }); + await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(mockPrisma.alert.create).toHaveBeenCalledWith( + expect.objectContaining({ severity: 'CRITICAL' }) + ); + }); + + it('maps moderate to warning', async () => { + mockPrisma.subscription.findUnique.mockResolvedValue({ tier: 'premium' }); + mockPrisma.alert.findFirst.mockResolvedValue(null); + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-sev-2', + subscriptionId: 'sub-001', + userId: 'user-001', + type: 'system_warning', + title: '[MODERATE] Deed Change detected', + message: 'Change', + severity: 'WARNING', + channel: ['email'], + createdAt: new Date('2026-05-14T12:00:00Z'), + }); + + const result = buildChangeResult({ severity: 'moderate', changeType: 'deed_change' }); + await pipeline.processChangeDetection(result, 'sub-001', 'user-001'); + + expect(mockPrisma.alert.create).toHaveBeenCalledWith( + expect.objectContaining({ severity: 'WARNING' }) + ); + }); + }); +}); diff --git a/services/hometitle/test/scheduler.service.test.ts b/services/hometitle/test/scheduler.service.test.ts new file mode 100644 index 0000000..1c82c5f --- /dev/null +++ b/services/hometitle/test/scheduler.service.test.ts @@ -0,0 +1,329 @@ +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { HomeTitleSchedulerService } from '../src/scheduler.service'; +import { PropertySnapshot } from '../src/types'; + +// Mock @shieldai/db +const mockPrisma = { + subscription: { + findMany: vi.fn(), + }, + $queryRaw: vi.fn(), +}; + +vi.mock('@shieldai/db', () => ({ + prisma: mockPrisma, +})); + +// Mock alert pipeline +const mockProcessChangeDetection = vi.fn(); +const mockHomeTitleAlertPipeline = { + processChangeDetection: mockProcessChangeDetection, +}; + +vi.mock('../src/alert.pipeline', () => ({ + homeTitleAlertPipeline: mockHomeTitleAlertPipeline, + HomeTitleAlertPipeline: class {}, +})); + +// Mock change-detector +const mockDetectChanges = vi.fn(); +const mockShouldTriggerAlert = vi.fn(); + +vi.mock('../src/change-detector', () => ({ + detectChanges: mockDetectChanges, + shouldTriggerAlert: mockShouldTriggerAlert, +})); + +// Mock uuid +vi.mock('uuid', () => ({ + v4: () => 'scan-uuid-' + Date.now(), +})); + +const mockSubscription = { + id: 'sub-001', + userId: 'user-001', + tier: 'premium' as const, +}; + +function mockLatestSnapshots(snapshots: PropertySnapshot[]) { + mockPrisma.$queryRaw.mockResolvedValue( + snapshots.map(s => ({ + id: s.id, + propertyId: s.propertyId, + capturedAt: s.capturedAt, + ownerName: s.ownerName, + address: JSON.stringify(s.address), + deedDate: s.deedDate ?? null, + taxId: s.taxId ?? null, + propertyType: s.propertyType, + taxAmount: s.taxAmount ?? null, + lienCount: s.lienCount ?? null, + })) + ); +} + +function mockPreviousSnapshot(snapshot: PropertySnapshot | null) { + if (!snapshot) { + mockPrisma.$queryRaw.mockResolvedValue([]); + } else { + mockPrisma.$queryRaw.mockResolvedValue([ + { + id: snapshot.id, + propertyId: snapshot.propertyId, + capturedAt: snapshot.capturedAt, + ownerName: snapshot.ownerName, + address: JSON.stringify(snapshot.address), + deedDate: snapshot.deedDate ?? null, + taxId: snapshot.taxId ?? null, + propertyType: snapshot.propertyType, + taxAmount: snapshot.taxAmount ?? null, + lienCount: snapshot.lienCount ?? null, + }, + ]); + } +} + +describe('HomeTitleSchedulerService', () => { + let scheduler: HomeTitleSchedulerService; + + beforeEach(() => { + vi.useFakeTimers(); + scheduler = new HomeTitleSchedulerService({ + scanIntervalMinutes: 60, + maxPropertiesPerScan: 100, + enabled: true, + }); + vi.clearAllMocks(); + }); + + afterEach(() => { + scheduler.stop(); + vi.useRealTimers(); + }); + + describe('constructor and config', () => { + it('uses default config when none provided', () => { + const defaultScheduler = new HomeTitleSchedulerService(); + const config = defaultScheduler.getConfig(); + expect(config.scanIntervalMinutes).toBe(60); + expect(config.maxPropertiesPerScan).toBe(100); + expect(config.enabled).toBe(true); + defaultScheduler.stop(); + }); + + it('accepts custom config', () => { + scheduler = new HomeTitleSchedulerService({ scanIntervalMinutes: 30 }); + const config = scheduler.getConfig(); + expect(config.scanIntervalMinutes).toBe(30); + }); + + it('updates config dynamically', () => { + scheduler.updateConfig({ scanIntervalMinutes: 15 }); + const config = scheduler.getConfig(); + expect(config.scanIntervalMinutes).toBe(15); + }); + }); + + describe('start/stop', () => { + it('starts the scheduler', () => { + scheduler.start(); + expect(scheduler.isRunning()).toBe(true); + }); + + it('stops the scheduler', () => { + scheduler.start(); + scheduler.stop(); + expect(scheduler.isRunning()).toBe(false); + }); + + it('does not start when disabled', () => { + scheduler = new HomeTitleSchedulerService({ enabled: false }); + scheduler.start(); + expect(scheduler.isRunning()).toBe(false); + }); + }); + + describe('runScan', () => { + it('returns empty results when no subscriptions', async () => { + mockPrisma.subscription.findMany.mockResolvedValue([]); + + const result = await scheduler.runScan(); + + expect(result.propertiesScanned).toBe(0); + expect(result.changesDetected).toBe(0); + expect(result.alertsCreated).toBe(0); + expect(result.errors).toEqual([]); + }); + + it('detects changes and creates alerts', async () => { + const previousSnapshot: PropertySnapshot = { + id: 'snap-1', + propertyId: 'prop-001', + capturedAt: '2026-01-01T00:00:00Z', + ownerName: 'John Doe', + address: { streetNumber: '123', streetName: 'main', city: 'springfield', state: 'IL', zip: '62701' }, + propertyType: 'residential', + }; + const currentSnapshot: PropertySnapshot = { + ...previousSnapshot, + id: 'snap-2', + capturedAt: '2026-02-01T00:00:00Z', + ownerName: 'Jane Smith', + }; + + mockPrisma.subscription.findMany.mockResolvedValue([mockSubscription]); + mockLatestSnapshots([currentSnapshot]); + mockPreviousSnapshot(previousSnapshot); + mockDetectChanges.mockReturnValue({ + propertyId: 'prop-001', + changeType: 'ownership_transfer', + severity: 'major', + confidence: 0.95, + changes: [], + previousSnapshot, + currentSnapshot, + detectedAt: new Date().toISOString(), + }); + mockShouldTriggerAlert.mockReturnValue(true); + mockProcessChangeDetection.mockResolvedValue({ + id: 'alert-001', + propertyId: 'prop-001', + subscriptionId: 'sub-001', + userId: 'user-001', + changeType: 'ownership_transfer', + severity: 'critical', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change detected', + changeDetectionResult: {} as any, + channel: ['email', 'push', 'sms'], + dedupKey: 'hometitle:user-001:prop-001:ownership_transfer', + createdAt: new Date().toISOString(), + }); + + const result = await scheduler.runScan(); + + expect(result.propertiesScanned).toBeGreaterThanOrEqual(0); + expect(result.changesDetected).toBeGreaterThanOrEqual(1); + expect(result.alertsCreated).toBeGreaterThanOrEqual(1); + expect(result.notificationsSent).toBeGreaterThanOrEqual(1); + }); + + it('skips snapshots without previous', async () => { + mockPrisma.subscription.findMany.mockResolvedValue([mockSubscription]); + mockLatestSnapshots([{ + id: 'snap-1', + propertyId: 'prop-001', + capturedAt: '2026-01-01T00:00:00Z', + ownerName: 'John Doe', + address: { streetNumber: '123', streetName: 'main', city: 'springfield', state: 'IL', zip: '62701' }, + propertyType: 'residential', + }]); + mockPreviousSnapshot(null); + + const result = await scheduler.runScan(); + + expect(result.changesDetected).toBe(0); + }); + + it('handles subscription scan errors gracefully', async () => { + mockPrisma.subscription.findMany.mockResolvedValue([mockSubscription]); + mockLatestSnapshots([]); + mockPreviousSnapshot(null); + mockDetectChanges.mockReturnValue({ + propertyId: 'prop-001', + changeType: 'metadata_change', + severity: 'minor', + confidence: 0.5, + changes: [], + previousSnapshot: {} as any, + currentSnapshot: {} as any, + detectedAt: new Date().toISOString(), + }); + mockShouldTriggerAlert.mockReturnValue(false); + + const result = await scheduler.runScan(); + + expect(result.errors).toEqual([]); + expect(result.propertiesScanned).toBe(0); + }); + + it('tracks scan metadata', async () => { + mockPrisma.subscription.findMany.mockResolvedValue([]); + + const result = await scheduler.runScan(); + + expect(result.scanId).toBeDefined(); + expect(result.startedAt).toBeDefined(); + expect(result.completedAt).toBeDefined(); + // completedAt should be after startedAt + expect(new Date(result.completedAt).getTime()).toBeGreaterThanOrEqual( + new Date(result.startedAt).getTime() + ); + }); + + it('does not send notifications for non-premium tier', async () => { + const previousSnapshot: PropertySnapshot = { + id: 'snap-1', + propertyId: 'prop-001', + capturedAt: '2026-01-01T00:00:00Z', + ownerName: 'John Doe', + address: { streetNumber: '123', streetName: 'main', city: 'springfield', state: 'IL', zip: '62701' }, + propertyType: 'residential', + }; + const currentSnapshot: PropertySnapshot = { + ...previousSnapshot, + id: 'snap-2', + capturedAt: '2026-02-01T00:00:00Z', + ownerName: 'Jane Smith', + }; + + const nonPremiumSub = { ...mockSubscription, tier: 'plus' as const }; + mockPrisma.subscription.findMany.mockResolvedValue([nonPremiumSub]); + mockLatestSnapshots([currentSnapshot]); + mockPreviousSnapshot(previousSnapshot); + mockDetectChanges.mockReturnValue({ + propertyId: 'prop-001', + changeType: 'ownership_transfer', + severity: 'major', + confidence: 0.95, + changes: [], + previousSnapshot, + currentSnapshot, + detectedAt: new Date().toISOString(), + }); + mockShouldTriggerAlert.mockReturnValue(true); + mockProcessChangeDetection.mockResolvedValue({ + id: 'alert-002', + propertyId: 'prop-001', + subscriptionId: 'sub-001', + userId: 'user-001', + changeType: 'ownership_transfer', + severity: 'critical', + title: '[MAJOR] Ownership Transfer detected', + message: 'Change detected', + changeDetectionResult: {} as any, + channel: ['email', 'push'], + dedupKey: 'hometitle:user-001:prop-001:ownership_transfer', + createdAt: new Date().toISOString(), + }); + + const result = await scheduler.runScan(); + + expect(result.changesDetected).toBeGreaterThanOrEqual(1); + expect(result.alertsCreated).toBeGreaterThanOrEqual(1); + expect(result.notificationsSent).toBe(0); + }); + }); + + describe('getLastScanResult', () => { + it('returns null before first scan', () => { + expect(scheduler.getLastScanResult()).toBeNull(); + }); + + it('returns last scan result after scan', async () => { + mockPrisma.subscription.findMany.mockResolvedValue([]); + await scheduler.runScan(); + expect(scheduler.getLastScanResult()).not.toBeNull(); + }); + }); +});