import { TRPCError } from "@trpc/server"; import { eq, and, desc, count, gte, inArray } from "drizzle-orm"; import { db } from "~/server/db"; import { subscriptions, propertyWatchlistItems, propertySnapshots, propertyChanges, alerts, normalizedAlerts, } from "~/server/db/schema"; import { detectChanges, type DetectedChange, type SnapshotData, } from "./hometitle/change.detector"; import { publishAlert } from "~/server/services/alert.publisher"; import { geocodeAddress, fetchCountyRecords, parseAddress, getLastSnapshot, } from "./hometitle/scanner"; import { getEffectiveTier, getActiveTrials, hasFeatureAccess, type SubWithEffectiveTier, } from "~/server/lib/tier"; // --------------------------------------------------------------------------- // Tier limits for property monitoring // Guard (basic) – 1 property // Fortress (plus) – 3 properties // Family (premium) – 5 properties // --------------------------------------------------------------------------- const MAX_PROPERTIES: Record = { basic: 1, plus: 3, premium: 5, }; async function getSubscription(userId: string): Promise { const [sub] = await db .select() .from(subscriptions) .where(and( eq(subscriptions.userId, userId), inArray(subscriptions.status, ["active", "trialing"]), )) .limit(1); if (!sub) { throw new TRPCError({ code: "NOT_FOUND", message: "No active subscription found" }); } const trials = await getActiveTrials(userId); return { id: sub.id, userId: sub.userId, tier: sub.tier as SubWithEffectiveTier["tier"], status: sub.status as SubWithEffectiveTier["status"], effectiveTier: getEffectiveTier(sub.tier as SubWithEffectiveTier["tier"], sub.status as SubWithEffectiveTier["status"]), isTrialing: sub.status === "trialing", trials, }; } export async function getProperties(userId: string) { const sub = await getSubscription(userId); if (!hasFeatureAccess(sub, "hometitle")) { throw new TRPCError({ code: "FORBIDDEN", message: "HomeTitle requires a Plus subscription or active feature trial" }); } const items = await db .select() .from(propertyWatchlistItems) .where( and( eq(propertyWatchlistItems.subscriptionId, sub.id), eq(propertyWatchlistItems.isActive, true), ), ) .orderBy(desc(propertyWatchlistItems.createdAt)); return items; } export async function addProperty( userId: string, address: string, parcelId?: string, ownerName?: string, ) { const sub = await getSubscription(userId); if (!hasFeatureAccess(sub, "hometitle")) { throw new TRPCError({ code: "FORBIDDEN", message: "HomeTitle requires a Plus subscription or active feature trial" }); } // Enforce tier property count limits const maxAllowed = MAX_PROPERTIES[sub.effectiveTier] ?? 1; const [{ count: activeCount }] = await db .select({ count: count() }) .from(propertyWatchlistItems) .where( and( eq(propertyWatchlistItems.subscriptionId, sub.id), eq(propertyWatchlistItems.isActive, true), ), ); if (activeCount >= maxAllowed) { throw new TRPCError({ code: "TOO_MANY_REQUESTS", message: `Property limit reached: ${maxAllowed} active properties for ${sub.effectiveTier} tier`, }); } const parsed = parseAddress(address); const coords = await geocodeAddress(address); const [inserted] = await db .insert(propertyWatchlistItems) .values({ subscriptionId: sub.id, address, parcelId: parcelId ?? null, ownerName: ownerName ?? null, streetAddress: parsed.streetAddress, city: parsed.city, state: parsed.state, zipCode: parsed.zipCode, latitude: coords?.latitude ?? null, longitude: coords?.longitude ?? null, }) .returning(); // Try to fetch real property data from Attom for the initial snapshot let initialOwnerName = ownerName ?? "Unknown Owner"; let initialDeedDate: string | null = null; let initialTaxAmount: number | null = null; let initialLienCount = 0; let initialPropertyType = "residential"; let initialAddress: Record = { full: address, ...parsed }; let initialTaxId: string | null = null; try { const apiKey = process.env.ATTOM_API_KEY; if (apiKey && !ownerName) { const record = await fetchCountyRecords( parcelId ?? null, parsed.state, parsed.state, inserted.id, sub.id, userId, ); if (record) { initialOwnerName = record.ownerName; initialDeedDate = record.deedDate; initialTaxAmount = record.taxAmount ?? null; initialLienCount = record.lienCount; initialPropertyType = record.propertyType ?? "residential"; initialAddress = record.address; initialTaxId = record.taxId; // Update parcel ID from Attom lookup if (record.taxId && !parcelId) { db.update(propertyWatchlistItems) .set({ ownerName: record.ownerName, parcelId: record.taxId, updatedAt: new Date() }) .where(eq(propertyWatchlistItems.id, inserted.id)) .catch(() => {}); } } } } catch (err) { // Initial fetch is best-effort; log but proceed with defaults console.warn("[hometitle] Failed to fetch initial Attom data for property:", err); } await db.insert(propertySnapshots).values({ propertyWatchlistItemId: inserted.id, subscriptionId: sub.id, capturedAt: new Date(), ownerName: initialOwnerName, address: initialAddress, deedDate: initialDeedDate, taxId: initialTaxId, propertyType: initialPropertyType, taxAmount: initialTaxAmount, lienCount: initialLienCount, }); return inserted; } export async function removeProperty(userId: string, propertyId: string) { const sub = await getSubscription(userId); if (!hasFeatureAccess(sub, "hometitle")) { throw new TRPCError({ code: "FORBIDDEN", message: "HomeTitle requires a Plus subscription or active feature trial" }); } const [item] = await db .select() .from(propertyWatchlistItems) .where( and( eq(propertyWatchlistItems.id, propertyId), eq(propertyWatchlistItems.subscriptionId, sub.id), ), ) .limit(1); if (!item) { throw new TRPCError({ code: "NOT_FOUND", message: "Property not found" }); } const [deleted] = await db .update(propertyWatchlistItems) .set({ isActive: false, updatedAt: new Date() }) .where(eq(propertyWatchlistItems.id, propertyId)) .returning(); return deleted; } export async function getSnapshots(userId: string, propertyId: string) { const sub = await getSubscription(userId); if (!hasFeatureAccess(sub, "hometitle")) { throw new TRPCError({ code: "FORBIDDEN", message: "HomeTitle requires a Plus subscription or active feature trial" }); } const [item] = await db .select() .from(propertyWatchlistItems) .where( and( eq(propertyWatchlistItems.id, propertyId), eq(propertyWatchlistItems.subscriptionId, sub.id), ), ) .limit(1); if (!item) { throw new TRPCError({ code: "NOT_FOUND", message: "Property not found" }); } const snapshots = await db .select() .from(propertySnapshots) .where(eq(propertySnapshots.propertyWatchlistItemId, propertyId)) .orderBy(desc(propertySnapshots.capturedAt)); return snapshots; } export async function getChanges( userId: string, propertyId: string, filters?: { severity?: string; changeType?: string }, ) { const sub = await getSubscription(userId); if (!hasFeatureAccess(sub, "hometitle")) { throw new TRPCError({ code: "FORBIDDEN", message: "HomeTitle requires a Plus subscription or active feature trial" }); } const [item] = await db .select() .from(propertyWatchlistItems) .where( and( eq(propertyWatchlistItems.id, propertyId), eq(propertyWatchlistItems.subscriptionId, sub.id), ), ) .limit(1); if (!item) { throw new TRPCError({ code: "NOT_FOUND", message: "Property not found" }); } const conditions = [eq(propertyChanges.propertyWatchlistItemId, propertyId)]; if (filters?.severity) { conditions.push( eq( propertyChanges.severity, filters.severity as "info" | "warning" | "critical", ), ); } if (filters?.changeType) { conditions.push( eq( propertyChanges.changeType, filters.changeType as | "tax_change" | "deed_change" | "ownership_transfer" | "lien_filing" | "metadata_change", ), ); } const items = await db .select() .from(propertyChanges) .where(and(...conditions)) .orderBy(desc(propertyChanges.detectedAt)); return items; } export async function getAlerts(userId: string) { const sub = await getSubscription(userId); if (!hasFeatureAccess(sub, "hometitle")) { throw new TRPCError({ code: "FORBIDDEN", message: "HomeTitle requires a Plus subscription or active feature trial" }); } const items = await db .select() .from(propertyChanges) .where( and( eq(propertyChanges.severity, "warning"), eq(propertyChanges.propertyWatchlistItemId, sub.id), ), ) .orderBy(desc(propertyChanges.detectedAt)); const propertyIds = [...new Set(items.map((c) => c.propertyWatchlistItemId))]; const properties = await Promise.all( propertyIds.map((pid) => db .select({ id: propertyWatchlistItems.id, address: propertyWatchlistItems.address }) .from(propertyWatchlistItems) .where(eq(propertyWatchlistItems.id, pid)) .limit(1) .then((r) => r[0]), ), ); const propertyMap = new Map(properties.filter(Boolean).map((p) => [p.id, p.address])); const criticalItems = await db .select() .from(propertyChanges) .where( and( eq(propertyChanges.severity, "critical"), eq(propertyChanges.propertyWatchlistItemId, sub.id), ), ) .orderBy(desc(propertyChanges.detectedAt)); const allChanges = [...criticalItems, ...items]; return allChanges.map((c) => ({ ...c, propertyAddress: propertyMap.get(c.propertyWatchlistItemId) ?? null, })); } async function checkTierLimits(userId: string): Promise<{ allowed: boolean; reason?: string }> { const sub = await getSubscription(userId); const tier = sub.effectiveTier; // Premium has no scan limit if (tier === "premium") { return { allowed: true }; } const maxScans: Record = { basic: 1, plus: 3, }; const maxScanCount = maxScans[tier] ?? 1; // Basic = monthly window, Plus = weekly window const periodStart = tier === "plus" ? new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); const [result] = await db .select({ count: count() }) .from(propertySnapshots) .where( and( eq(propertySnapshots.subscriptionId, sub.id), gte(propertySnapshots.capturedAt, periodStart), ), ); if (result.count >= maxScanCount) { const periodLabel = tier === "plus" ? "week" : "month"; return { allowed: false, reason: `Scan limit reached: ${maxScanCount} per ${periodLabel} for ${tier} tier`, }; } return { allowed: true }; } export async function runScan(userId: string): Promise<{ scanId: string }> { const sub = await getSubscription(userId); if (!hasFeatureAccess(sub, "hometitle")) { throw new TRPCError({ code: "FORBIDDEN", message: "HomeTitle requires a Plus subscription or active feature trial" }); } const tierCheck = await checkTierLimits(userId); if (!tierCheck.allowed) { throw new TRPCError({ code: "TOO_MANY_REQUESTS", message: tierCheck.reason }); } const items = await db .select() .from(propertyWatchlistItems) .where( and( eq(propertyWatchlistItems.subscriptionId, sub.id), eq(propertyWatchlistItems.isActive, true), ), ); const scanId = crypto.randomUUID(); for (const item of items) { try { const lastSnapshot = await getLastSnapshot(item.id); const county = item.state || "Unknown"; const currentRecord = await fetchCountyRecords( item.parcelId, county, item.state ?? "", item.id, sub.id, userId, ); if (!currentRecord) { console.warn(`[hometitle] No Attom data returned for property ${item.id} — monitoring gap may exist`); continue; } const newData: SnapshotData = { ownerName: currentRecord.ownerName, deedDate: currentRecord.deedDate, taxAmount: currentRecord.taxAmount, lienCount: currentRecord.lienCount, propertyType: currentRecord.propertyType, taxId: currentRecord.taxId, }; const changes = detectChanges( lastSnapshot ? { ownerName: lastSnapshot.ownerName, deedDate: lastSnapshot.deedDate, taxAmount: lastSnapshot.taxAmount, lienCount: lastSnapshot.lienCount, propertyType: lastSnapshot.propertyType, taxId: lastSnapshot.taxId, } : null, newData, ); const [snapshot] = await db .insert(propertySnapshots) .values({ propertyWatchlistItemId: item.id, subscriptionId: sub.id, capturedAt: new Date(), ownerName: currentRecord.ownerName, address: currentRecord.address, deedDate: currentRecord.deedDate, taxId: currentRecord.taxId, propertyType: currentRecord.propertyType, taxAmount: currentRecord.taxAmount, lienCount: currentRecord.lienCount, }) .returning(); for (const change of changes) { await createPropertyChange(sub, item.id, snapshot.id, change); } } catch (err) { console.error(`[hometitle] Scan failed for property ${item.id}:`, err); } } return { scanId }; } async function createPropertyChange( sub: { id: string; userId: string }, propertyWatchlistItemId: string, snapshotId: string, change: DetectedChange, ) { const [inserted] = await db .insert(propertyChanges) .values({ propertyWatchlistItemId, snapshotId, changeType: change.changeType, severity: change.severity, details: change.details, }) .returning(); if (change.severity === "warning" || change.severity === "critical") { await generateAlert(sub, propertyWatchlistItemId, inserted, change); } return inserted; } async function generateAlert( sub: { id: string; userId: string }, propertyWatchlistItemId: string, change: { id: string; severity: string; changeType: string; details: unknown }, detectedChange: DetectedChange, ) { const [property] = await db .select({ address: propertyWatchlistItems.address }) .from(propertyWatchlistItems) .where(eq(propertyWatchlistItems.id, propertyWatchlistItemId)) .limit(1); const severityLabel = change.severity === "critical" ? "Critical" : change.severity === "warning" ? "Warning" : "Info"; const changeLabels: Record = { ownership_transfer: "Ownership Transfer", lien_filing: "Lien Filing", tax_change: "Tax Change", deed_change: "Deed Change", metadata_change: "Metadata Change", }; const changeLabel = changeLabels[detectedChange.changeType] ?? detectedChange.changeType; const propertyAddress = property?.address ?? "Unknown"; const title = `${severityLabel} property change detected`; const message = `${changeLabel} at ${propertyAddress}`; const [alert] = await db .insert(alerts) .values({ subscriptionId: sub.id, userId: sub.userId, type: "property_change", title, message, severity: change.severity as "info" | "warning" | "critical", channel: ["email", "push"], }) .returning(); const severityMap: Record = { info: "INFO", warning: "WARNING", critical: "CRITICAL", }; const [normalized] = await db .insert(normalizedAlerts) .values({ source: "HOME_TITLE", category: "HOME_TITLE", severity: severityMap[change.severity] ?? "INFO", userId: sub.userId, title, description: message, entities: { propertyWatchlistItemId, changeId: change.id, propertyAddress, changeType: detectedChange.changeType, details: detectedChange.details, }, sourceAlertId: `hometitle:${change.id}`, createdAt: new Date(), }) .returning(); publishAlert(sub.userId, { id: alert.id, title, message, severity: change.severity, source: "HOME_TITLE", category: "HOME_TITLE", createdAt: alert.createdAt, }).catch((err) => console.error("[hometitle] Failed to publish alert:", err)); return alert; }