feat(hometitle): add Backend Router — HomeTitle (Property Monitoring)
- Add hometitle schema (Valibot input schemas) - Add change detector (fuzzy matching, severity, change detection) - Add scanner module (geocoding, county records placeholder) - Add hometitle service (property CRUD, scan, alert pipeline) - Add hometitle router (7 tRPC procedures) - Wire into api root - Add alert type 'property_change' to enum - Write unit tests (10 tests, all passing)
This commit is contained in:
458
web/src/server/services/hometitle.service.ts
Normal file
458
web/src/server/services/hometitle.service.ts
Normal file
@@ -0,0 +1,458 @@
|
||||
import { TRPCError } from "@trpc/server";
|
||||
import { eq, and, desc, count, gte } from "drizzle-orm";
|
||||
import { db } from "~/server/db";
|
||||
import {
|
||||
subscriptions,
|
||||
propertyWatchlistItems,
|
||||
propertySnapshots,
|
||||
propertyChanges,
|
||||
alerts,
|
||||
normalizedAlerts,
|
||||
} from "~/server/db/schema";
|
||||
import {
|
||||
detectChanges,
|
||||
type DetectedChange,
|
||||
type SnapshotData,
|
||||
} from "./hometitle/change.detector";
|
||||
import {
|
||||
geocodeAddress,
|
||||
fetchCountyRecords,
|
||||
parseAddress,
|
||||
getLastSnapshot,
|
||||
} from "./hometitle/scanner";
|
||||
|
||||
async function getSubscription(userId: string) {
|
||||
const [sub] = await db
|
||||
.select()
|
||||
.from(subscriptions)
|
||||
.where(and(eq(subscriptions.userId, userId), eq(subscriptions.status, "active")))
|
||||
.limit(1);
|
||||
if (!sub) {
|
||||
throw new TRPCError({ code: "NOT_FOUND", message: "No active subscription found" });
|
||||
}
|
||||
return sub;
|
||||
}
|
||||
|
||||
export async function getProperties(userId: string) {
|
||||
const sub = await getSubscription(userId);
|
||||
const items = await db
|
||||
.select()
|
||||
.from(propertyWatchlistItems)
|
||||
.where(
|
||||
and(
|
||||
eq(propertyWatchlistItems.subscriptionId, sub.id),
|
||||
eq(propertyWatchlistItems.isActive, true),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(propertyWatchlistItems.createdAt));
|
||||
return items;
|
||||
}
|
||||
|
||||
export async function addProperty(
|
||||
userId: string,
|
||||
address: string,
|
||||
parcelId?: string,
|
||||
ownerName?: string,
|
||||
) {
|
||||
const sub = await getSubscription(userId);
|
||||
|
||||
const parsed = parseAddress(address);
|
||||
const coords = await geocodeAddress(address);
|
||||
|
||||
const [inserted] = await db
|
||||
.insert(propertyWatchlistItems)
|
||||
.values({
|
||||
subscriptionId: sub.id,
|
||||
address,
|
||||
parcelId: parcelId ?? null,
|
||||
ownerName: ownerName ?? null,
|
||||
streetAddress: parsed.streetAddress,
|
||||
city: parsed.city,
|
||||
state: parsed.state,
|
||||
zipCode: parsed.zipCode,
|
||||
latitude: coords?.latitude ?? null,
|
||||
longitude: coords?.longitude ?? null,
|
||||
})
|
||||
.returning();
|
||||
|
||||
await db.insert(propertySnapshots).values({
|
||||
propertyWatchlistItemId: inserted.id,
|
||||
subscriptionId: sub.id,
|
||||
capturedAt: new Date(),
|
||||
ownerName: ownerName ?? "Unknown",
|
||||
address: { full: address, ...parsed },
|
||||
deedDate: null,
|
||||
taxId: null,
|
||||
propertyType: "residential",
|
||||
taxAmount: null,
|
||||
lienCount: 0,
|
||||
});
|
||||
|
||||
return inserted;
|
||||
}
|
||||
|
||||
export async function removeProperty(userId: string, propertyId: string) {
|
||||
const sub = await getSubscription(userId);
|
||||
|
||||
const [item] = await db
|
||||
.select()
|
||||
.from(propertyWatchlistItems)
|
||||
.where(
|
||||
and(
|
||||
eq(propertyWatchlistItems.id, propertyId),
|
||||
eq(propertyWatchlistItems.subscriptionId, sub.id),
|
||||
),
|
||||
)
|
||||
.limit(1);
|
||||
|
||||
if (!item) {
|
||||
throw new TRPCError({ code: "NOT_FOUND", message: "Property not found" });
|
||||
}
|
||||
|
||||
const [deleted] = await db
|
||||
.update(propertyWatchlistItems)
|
||||
.set({ isActive: false, updatedAt: new Date() })
|
||||
.where(eq(propertyWatchlistItems.id, propertyId))
|
||||
.returning();
|
||||
|
||||
return deleted;
|
||||
}
|
||||
|
||||
export async function getSnapshots(userId: string, propertyId: string) {
|
||||
const sub = await getSubscription(userId);
|
||||
|
||||
const [item] = await db
|
||||
.select()
|
||||
.from(propertyWatchlistItems)
|
||||
.where(
|
||||
and(
|
||||
eq(propertyWatchlistItems.id, propertyId),
|
||||
eq(propertyWatchlistItems.subscriptionId, sub.id),
|
||||
),
|
||||
)
|
||||
.limit(1);
|
||||
|
||||
if (!item) {
|
||||
throw new TRPCError({ code: "NOT_FOUND", message: "Property not found" });
|
||||
}
|
||||
|
||||
const snapshots = await db
|
||||
.select()
|
||||
.from(propertySnapshots)
|
||||
.where(eq(propertySnapshots.propertyWatchlistItemId, propertyId))
|
||||
.orderBy(desc(propertySnapshots.capturedAt));
|
||||
|
||||
return snapshots;
|
||||
}
|
||||
|
||||
export async function getChanges(
|
||||
userId: string,
|
||||
propertyId: string,
|
||||
filters?: { severity?: string; changeType?: string },
|
||||
) {
|
||||
const sub = await getSubscription(userId);
|
||||
|
||||
const [item] = await db
|
||||
.select()
|
||||
.from(propertyWatchlistItems)
|
||||
.where(
|
||||
and(
|
||||
eq(propertyWatchlistItems.id, propertyId),
|
||||
eq(propertyWatchlistItems.subscriptionId, sub.id),
|
||||
),
|
||||
)
|
||||
.limit(1);
|
||||
|
||||
if (!item) {
|
||||
throw new TRPCError({ code: "NOT_FOUND", message: "Property not found" });
|
||||
}
|
||||
|
||||
const conditions = [eq(propertyChanges.propertyWatchlistItemId, propertyId)];
|
||||
if (filters?.severity) {
|
||||
conditions.push(
|
||||
eq(
|
||||
propertyChanges.severity,
|
||||
filters.severity as "info" | "warning" | "critical",
|
||||
),
|
||||
);
|
||||
}
|
||||
if (filters?.changeType) {
|
||||
conditions.push(
|
||||
eq(
|
||||
propertyChanges.changeType,
|
||||
filters.changeType as
|
||||
| "tax_change"
|
||||
| "deed_change"
|
||||
| "ownership_transfer"
|
||||
| "lien_filing"
|
||||
| "metadata_change",
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
const items = await db
|
||||
.select()
|
||||
.from(propertyChanges)
|
||||
.where(and(...conditions))
|
||||
.orderBy(desc(propertyChanges.detectedAt));
|
||||
|
||||
return items;
|
||||
}
|
||||
|
||||
export async function getAlerts(userId: string) {
|
||||
const sub = await getSubscription(userId);
|
||||
|
||||
const items = await db
|
||||
.select()
|
||||
.from(propertyChanges)
|
||||
.where(
|
||||
and(
|
||||
eq(propertyChanges.severity, "warning"),
|
||||
eq(propertyChanges.propertyWatchlistItemId, sub.id),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(propertyChanges.detectedAt));
|
||||
|
||||
const propertyIds = [...new Set(items.map((c) => c.propertyWatchlistItemId))];
|
||||
const properties = await Promise.all(
|
||||
propertyIds.map((pid) =>
|
||||
db
|
||||
.select({ id: propertyWatchlistItems.id, address: propertyWatchlistItems.address })
|
||||
.from(propertyWatchlistItems)
|
||||
.where(eq(propertyWatchlistItems.id, pid))
|
||||
.limit(1)
|
||||
.then((r) => r[0]),
|
||||
),
|
||||
);
|
||||
|
||||
const propertyMap = new Map(properties.filter(Boolean).map((p) => [p.id, p.address]));
|
||||
|
||||
const criticalItems = await db
|
||||
.select()
|
||||
.from(propertyChanges)
|
||||
.where(
|
||||
and(
|
||||
eq(propertyChanges.severity, "critical"),
|
||||
eq(propertyChanges.propertyWatchlistItemId, sub.id),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(propertyChanges.detectedAt));
|
||||
|
||||
const allChanges = [...criticalItems, ...items];
|
||||
return allChanges.map((c) => ({
|
||||
...c,
|
||||
propertyAddress: propertyMap.get(c.propertyWatchlistItemId) ?? null,
|
||||
}));
|
||||
}
|
||||
|
||||
async function checkTierLimits(userId: string): Promise<{ allowed: boolean; reason?: string }> {
|
||||
const sub = await getSubscription(userId);
|
||||
const tier = sub.tier;
|
||||
|
||||
if (tier === "premium") {
|
||||
return { allowed: true };
|
||||
}
|
||||
|
||||
const maxScans: Record<string, number> = {
|
||||
basic: 1,
|
||||
plus: 4,
|
||||
};
|
||||
|
||||
const maxScanCount = maxScans[tier] ?? 1;
|
||||
const periodStart =
|
||||
tier === "plus"
|
||||
? new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
|
||||
: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
|
||||
|
||||
const [result] = await db
|
||||
.select({ count: count() })
|
||||
.from(propertySnapshots)
|
||||
.where(
|
||||
and(
|
||||
eq(propertySnapshots.subscriptionId, sub.id),
|
||||
gte(propertySnapshots.capturedAt, periodStart),
|
||||
),
|
||||
);
|
||||
|
||||
if (result.count >= maxScanCount) {
|
||||
const periodLabel = tier === "plus" ? "week" : "month";
|
||||
return {
|
||||
allowed: false,
|
||||
reason: `Scan limit reached: ${maxScanCount} per ${periodLabel} for ${tier} tier`,
|
||||
};
|
||||
}
|
||||
|
||||
return { allowed: true };
|
||||
}
|
||||
|
||||
export async function runScan(userId: string): Promise<{ scanId: string }> {
|
||||
const sub = await getSubscription(userId);
|
||||
|
||||
const tierCheck = await checkTierLimits(userId);
|
||||
if (!tierCheck.allowed) {
|
||||
throw new TRPCError({ code: "TOO_MANY_REQUESTS", message: tierCheck.reason });
|
||||
}
|
||||
|
||||
const items = await db
|
||||
.select()
|
||||
.from(propertyWatchlistItems)
|
||||
.where(
|
||||
and(
|
||||
eq(propertyWatchlistItems.subscriptionId, sub.id),
|
||||
eq(propertyWatchlistItems.isActive, true),
|
||||
),
|
||||
);
|
||||
|
||||
const scanId = crypto.randomUUID();
|
||||
|
||||
for (const item of items) {
|
||||
try {
|
||||
const lastSnapshot = await getLastSnapshot(item.id);
|
||||
const county = item.state || "Unknown";
|
||||
const currentRecord = await fetchCountyRecords(item.parcelId, county, item.state ?? "");
|
||||
|
||||
if (!currentRecord) continue;
|
||||
|
||||
const newData: SnapshotData = {
|
||||
ownerName: currentRecord.ownerName,
|
||||
deedDate: currentRecord.deedDate,
|
||||
taxAmount: currentRecord.taxAmount,
|
||||
lienCount: currentRecord.lienCount,
|
||||
propertyType: currentRecord.propertyType,
|
||||
taxId: currentRecord.taxId,
|
||||
};
|
||||
|
||||
const changes = detectChanges(
|
||||
lastSnapshot
|
||||
? {
|
||||
ownerName: lastSnapshot.ownerName,
|
||||
deedDate: lastSnapshot.deedDate,
|
||||
taxAmount: lastSnapshot.taxAmount,
|
||||
lienCount: lastSnapshot.lienCount,
|
||||
propertyType: lastSnapshot.propertyType,
|
||||
taxId: lastSnapshot.taxId,
|
||||
}
|
||||
: null,
|
||||
newData,
|
||||
);
|
||||
|
||||
const [snapshot] = await db
|
||||
.insert(propertySnapshots)
|
||||
.values({
|
||||
propertyWatchlistItemId: item.id,
|
||||
subscriptionId: sub.id,
|
||||
capturedAt: new Date(),
|
||||
ownerName: currentRecord.ownerName,
|
||||
address: currentRecord.address,
|
||||
deedDate: currentRecord.deedDate,
|
||||
taxId: currentRecord.taxId,
|
||||
propertyType: currentRecord.propertyType,
|
||||
taxAmount: currentRecord.taxAmount,
|
||||
lienCount: currentRecord.lienCount,
|
||||
})
|
||||
.returning();
|
||||
|
||||
for (const change of changes) {
|
||||
await createPropertyChange(sub, item.id, snapshot.id, change);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[hometitle] Scan failed for property ${item.id}:`, err);
|
||||
}
|
||||
}
|
||||
|
||||
return { scanId };
|
||||
}
|
||||
|
||||
async function createPropertyChange(
|
||||
sub: { id: string; userId: string },
|
||||
propertyWatchlistItemId: string,
|
||||
snapshotId: string,
|
||||
change: DetectedChange,
|
||||
) {
|
||||
const [inserted] = await db
|
||||
.insert(propertyChanges)
|
||||
.values({
|
||||
propertyWatchlistItemId,
|
||||
snapshotId,
|
||||
changeType: change.changeType,
|
||||
severity: change.severity,
|
||||
details: change.details,
|
||||
})
|
||||
.returning();
|
||||
|
||||
if (change.severity === "warning" || change.severity === "critical") {
|
||||
await generateAlert(sub, propertyWatchlistItemId, inserted, change);
|
||||
}
|
||||
|
||||
return inserted;
|
||||
}
|
||||
|
||||
async function generateAlert(
|
||||
sub: { id: string; userId: string },
|
||||
propertyWatchlistItemId: string,
|
||||
change: { id: string; severity: string; changeType: string; details: unknown },
|
||||
detectedChange: DetectedChange,
|
||||
) {
|
||||
const [property] = await db
|
||||
.select({ address: propertyWatchlistItems.address })
|
||||
.from(propertyWatchlistItems)
|
||||
.where(eq(propertyWatchlistItems.id, propertyWatchlistItemId))
|
||||
.limit(1);
|
||||
|
||||
const severityLabel =
|
||||
change.severity === "critical" ? "Critical" : change.severity === "warning" ? "Warning" : "Info";
|
||||
const changeLabels: Record<string, string> = {
|
||||
ownership_transfer: "Ownership Transfer",
|
||||
lien_filing: "Lien Filing",
|
||||
tax_change: "Tax Change",
|
||||
deed_change: "Deed Change",
|
||||
metadata_change: "Metadata Change",
|
||||
};
|
||||
const changeLabel = changeLabels[detectedChange.changeType] ?? detectedChange.changeType;
|
||||
const propertyAddress = property?.address ?? "Unknown";
|
||||
|
||||
const title = `${severityLabel} property change detected`;
|
||||
const message = `${changeLabel} at ${propertyAddress}`;
|
||||
|
||||
const [alert] = await db
|
||||
.insert(alerts)
|
||||
.values({
|
||||
subscriptionId: sub.id,
|
||||
userId: sub.userId,
|
||||
type: "property_change",
|
||||
title,
|
||||
message,
|
||||
severity: change.severity as "info" | "warning" | "critical",
|
||||
channel: ["email", "push"],
|
||||
})
|
||||
.returning();
|
||||
|
||||
const severityMap: Record<string, "LOW" | "INFO" | "MEDIUM" | "WARNING" | "HIGH" | "CRITICAL"> = {
|
||||
info: "INFO",
|
||||
warning: "WARNING",
|
||||
critical: "CRITICAL",
|
||||
};
|
||||
|
||||
await db
|
||||
.insert(normalizedAlerts)
|
||||
.values({
|
||||
source: "HOME_TITLE",
|
||||
category: "HOME_TITLE",
|
||||
severity: severityMap[change.severity] ?? "INFO",
|
||||
userId: sub.userId,
|
||||
title,
|
||||
description: message,
|
||||
entities: {
|
||||
propertyWatchlistItemId,
|
||||
changeId: change.id,
|
||||
propertyAddress,
|
||||
changeType: detectedChange.changeType,
|
||||
details: detectedChange.details,
|
||||
},
|
||||
sourceAlertId: `hometitle:${change.id}`,
|
||||
createdAt: new Date(),
|
||||
})
|
||||
.returning();
|
||||
|
||||
return alert;
|
||||
}
|
||||
Reference in New Issue
Block a user