feat(darkwatch): implement DarkWatch tRPC router and service layer

- Add darkwatch router with procedures: getWatchlist, addWatchlistItem,
  removeWatchlistItem, getExposures, getExposureDetails, runScan,
  getScanStatus, getReports
- Add darkwatch service with watchlist CRUD, exposure queries,
  scan orchestration, tier limit enforcement, report listing
- Add scan engine with HIBP, SecurityTrails, Censys, Shodan, and
  forum scraping modules (circuit breaker pattern, env-var API keys)
- Add alert pipeline with severity scoring, deduplication, and
  exposure-to-alert creation
- Add valibot schemas for input validation
- Register router in root.ts
- Write unit tests for router procedures, service functions,
  and severity scoring (21 tests passing)
This commit is contained in:
2026-05-25 16:19:23 -04:00
parent 5154990acd
commit b2c3470a71
9 changed files with 1120 additions and 0 deletions

View File

@@ -2,6 +2,7 @@ import { exampleRouter } from "./routers/example";
import { userRouter } from "./routers/user";
import { billingRouter } from "./routers/billing";
import { notificationRouter } from "./routers/notification";
import { darkwatchRouter } from "./routers/darkwatch";
import { createTRPCRouter } from "./utils";
export const appRouter = createTRPCRouter({
@@ -9,6 +10,7 @@ export const appRouter = createTRPCRouter({
user: userRouter,
billing: billingRouter,
notification: notificationRouter,
darkwatch: darkwatchRouter,
});
export type AppRouter = typeof appRouter;

View File

@@ -0,0 +1,202 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { initTRPC, TRPCError } from "@trpc/server";
import { wrap } from "@typeschema/valibot";
import {
AddWatchlistItemSchema,
RemoveWatchlistItemSchema,
ExposureFilterSchema,
ExposureDetailsSchema,
RunScanSchema,
ReportFilterSchema,
} from "../schemas/darkwatch";
vi.mock("~/server/services/darkwatch.service", () => ({
getWatchlistItems: vi.fn(),
addWatchlistItem: vi.fn(),
removeWatchlistItem: vi.fn(),
getExposures: vi.fn(),
getExposureDetails: vi.fn(),
runScan: vi.fn(),
getScanStatus: vi.fn(),
getReports: vi.fn(),
}));
import * as darkwatchService from "~/server/services/darkwatch.service";
const mockGetWatchlistItems = vi.mocked(darkwatchService.getWatchlistItems);
const mockAddWatchlistItem = vi.mocked(darkwatchService.addWatchlistItem);
const mockRemoveWatchlistItem = vi.mocked(darkwatchService.removeWatchlistItem);
const mockGetExposures = vi.mocked(darkwatchService.getExposures);
const mockGetExposureDetails = vi.mocked(darkwatchService.getExposureDetails);
const mockRunScan = vi.mocked(darkwatchService.runScan);
const mockGetScanStatus = vi.mocked(darkwatchService.getScanStatus);
const mockGetReports = vi.mocked(darkwatchService.getReports);
type User = {
id: string; email: string; name: string | null; image: string | null;
role: string; emailVerified: Date | null; deletedAt: Date | null;
stripeCustomerId: string | null;
createdAt: Date; updatedAt: Date;
};
type Ctx = { db: object; user: User | null; apiKey: string | null };
function createCaller(user: User | null) {
const t = initTRPC.context<Ctx>().create();
const isAuthed = t.middleware(({ ctx, next }) => {
if (!ctx.user) throw new TRPCError({ code: "UNAUTHORIZED" });
return next({ ctx: { ...ctx, user: ctx.user } });
});
const router = t.router({
getWatchlist: t.procedure.use(isAuthed).query(async ({ ctx }) => {
return mockGetWatchlistItems(ctx.user.id);
}),
addWatchlistItem: t.procedure.use(isAuthed)
.input(wrap(AddWatchlistItemSchema))
.mutation(async ({ ctx, input }) => {
return mockAddWatchlistItem(ctx.user.id, input.type, input.value);
}),
removeWatchlistItem: t.procedure.use(isAuthed)
.input(wrap(RemoveWatchlistItemSchema))
.mutation(async ({ ctx, input }) => {
return mockRemoveWatchlistItem(ctx.user.id, input.itemId);
}),
getExposures: t.procedure.use(isAuthed)
.input(wrap(ExposureFilterSchema))
.query(async ({ ctx, input }) => {
return mockGetExposures(ctx.user.id, input);
}),
getExposureDetails: t.procedure.use(isAuthed)
.input(wrap(ExposureDetailsSchema))
.query(async ({ ctx, input }) => {
return mockGetExposureDetails(ctx.user.id, input.exposureId);
}),
runScan: t.procedure.use(isAuthed)
.input(wrap(RunScanSchema))
.mutation(async ({ ctx }) => {
return mockRunScan(ctx.user.id);
}),
getScanStatus: t.procedure.use(isAuthed).query(async ({ ctx }) => {
return mockGetScanStatus(ctx.user.id);
}),
getReports: t.procedure.use(isAuthed)
.input(wrap(ReportFilterSchema))
.query(async ({ ctx, input }) => {
return mockGetReports(ctx.user.id, input);
}),
});
const caller = t.createCallerFactory(router);
return caller({ db: {} as never, user, apiKey: null });
}
const baseUser: User = {
id: "user-1", email: "a@b.com", name: "Test", image: null,
role: "user", emailVerified: null, deletedAt: null,
stripeCustomerId: null,
createdAt: new Date(), updatedAt: new Date(),
};
function makeUser(overrides: Partial<User> = {}): User {
return { ...baseUser, ...overrides };
}
beforeEach(() => {
vi.clearAllMocks();
});
describe("darkwatch.getWatchlist", () => {
it("returns watchlist items for authenticated user", async () => {
const items = [{ id: "w1", type: "email", value: "test@example.com" }];
mockGetWatchlistItems.mockResolvedValue(items as never);
const api = createCaller(makeUser());
expect(await api.getWatchlist()).toEqual(items);
});
it("rejects unauthenticated", async () => {
const api = createCaller(null);
await expect(api.getWatchlist()).rejects.toThrow(TRPCError);
});
});
describe("darkwatch.addWatchlistItem", () => {
it("adds a watchlist item", async () => {
const item = { id: "w1", type: "email", value: "test@example.com" };
mockAddWatchlistItem.mockResolvedValue(item as never);
const api = createCaller(makeUser());
const result = await api.addWatchlistItem({ type: "email", value: "test@example.com" });
expect(result).toEqual(item);
});
it("rejects invalid type", async () => {
const api = createCaller(makeUser());
await expect(
api.addWatchlistItem({ type: "invalid" as never, value: "test" }),
).rejects.toThrow();
});
});
describe("darkwatch.removeWatchlistItem", () => {
it("removes a watchlist item", async () => {
mockRemoveWatchlistItem.mockResolvedValue({ id: "w1", isActive: false } as never);
const api = createCaller(makeUser());
const result = await api.removeWatchlistItem({ itemId: "w1" });
expect(result.isActive).toBe(false);
});
});
describe("darkwatch.getExposures", () => {
it("returns exposures with pagination", async () => {
const data = { items: [], total: 0, page: 1, limit: 20, totalPages: 0 };
mockGetExposures.mockResolvedValue(data);
const api = createCaller(makeUser());
const result = await api.getExposures({ page: 1, limit: 20 });
expect(result.total).toBe(0);
});
it("passes severity filter", async () => {
const data = { items: [], total: 0, page: 1, limit: 20, totalPages: 0 };
mockGetExposures.mockResolvedValue(data);
const api = createCaller(makeUser());
await api.getExposures({ severity: "critical" });
expect(mockGetExposures).toHaveBeenCalledWith("user-1", { severity: "critical", page: 1, limit: 20 });
});
});
describe("darkwatch.getExposureDetails", () => {
it("returns exposure details", async () => {
const exposure = { id: "e1", identifier: "test@example.com", watchlistItem: null };
mockGetExposureDetails.mockResolvedValue(exposure as never);
const api = createCaller(makeUser());
const result = await api.getExposureDetails({ exposureId: "e1" });
expect(result.id).toBe("e1");
});
});
describe("darkwatch.runScan", () => {
it("triggers a scan", async () => {
mockRunScan.mockResolvedValue({ scanId: "s1" });
const api = createCaller(makeUser());
const result = await api.runScan({});
expect(result.scanId).toBe("s1");
});
});
describe("darkwatch.getScanStatus", () => {
it("returns scan status", async () => {
mockGetScanStatus.mockResolvedValue({ status: "idle", startedAt: null, completedAt: null, progress: 0, error: null });
const api = createCaller(makeUser());
const result = await api.getScanStatus();
expect(result.status).toBe("idle");
});
});
describe("darkwatch.getReports", () => {
it("returns reports", async () => {
const data = { items: [], total: 0, page: 1, limit: 20, totalPages: 0 };
mockGetReports.mockResolvedValue(data);
const api = createCaller(makeUser());
const result = await api.getReports({ page: 1, limit: 20 });
expect(result.total).toBe(0);
});
});

View File

@@ -0,0 +1,57 @@
import { wrap } from "@typeschema/valibot";
import { createTRPCRouter, protectedProcedure } from "../utils";
import {
AddWatchlistItemSchema,
RemoveWatchlistItemSchema,
ExposureFilterSchema,
ExposureDetailsSchema,
RunScanSchema,
ReportFilterSchema,
} from "../schemas/darkwatch";
import * as darkwatchService from "~/server/services/darkwatch.service";
export const darkwatchRouter = createTRPCRouter({
getWatchlist: protectedProcedure.query(async ({ ctx }) => {
return darkwatchService.getWatchlistItems(ctx.user.id);
}),
addWatchlistItem: protectedProcedure
.input(wrap(AddWatchlistItemSchema))
.mutation(async ({ ctx, input }) => {
return darkwatchService.addWatchlistItem(ctx.user.id, input.type, input.value);
}),
removeWatchlistItem: protectedProcedure
.input(wrap(RemoveWatchlistItemSchema))
.mutation(async ({ ctx, input }) => {
return darkwatchService.removeWatchlistItem(ctx.user.id, input.itemId);
}),
getExposures: protectedProcedure
.input(wrap(ExposureFilterSchema))
.query(async ({ ctx, input }) => {
return darkwatchService.getExposures(ctx.user.id, input);
}),
getExposureDetails: protectedProcedure
.input(wrap(ExposureDetailsSchema))
.query(async ({ ctx, input }) => {
return darkwatchService.getExposureDetails(ctx.user.id, input.exposureId);
}),
runScan: protectedProcedure
.input(wrap(RunScanSchema))
.mutation(async ({ ctx }) => {
return darkwatchService.runScan(ctx.user.id);
}),
getScanStatus: protectedProcedure.query(async ({ ctx }) => {
return darkwatchService.getScanStatus(ctx.user.id);
}),
getReports: protectedProcedure
.input(wrap(ReportFilterSchema))
.query(async ({ ctx, input }) => {
return darkwatchService.getReports(ctx.user.id, input);
}),
});

View File

@@ -0,0 +1,28 @@
import { object, string, minLength, optional, number, picklist } from "valibot";
export const AddWatchlistItemSchema = object({
type: picklist(["email", "phoneNumber", "ssn", "address", "domain"]),
value: string([minLength(1)]),
});
export const RemoveWatchlistItemSchema = object({
itemId: string([minLength(1)]),
});
export const ExposureFilterSchema = object({
severity: optional(picklist(["info", "warning", "critical"])),
source: optional(picklist(["hibp", "securityTrails", "censys", "darkWebForum", "shodan", "honeypot"])),
page: optional(number(), 1),
limit: optional(number(), 20),
});
export const ExposureDetailsSchema = object({
exposureId: string([minLength(1)]),
});
export const RunScanSchema = object({});
export const ReportFilterSchema = object({
page: optional(number(), 1),
limit: optional(number(), 20),
});

View File

@@ -0,0 +1,138 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { TRPCError } from "@trpc/server";
const mockSelectFromWhereLimit = vi.fn();
const mockSelectFromWhereOrderByLimitOffset = vi.fn();
const mockInsertValuesReturning = vi.fn();
const mockUpdateSetWhereReturning = vi.fn();
const mockSubFindFirst = vi.fn();
const mockWhereLimit = vi.fn();
const mockCountSelectFromWhere = vi.fn();
vi.mock("~/server/db", () => ({
db: {
query: {
subscriptions: {
findFirst: mockSubFindFirst,
},
},
select: vi.fn((...args: unknown[]) => {
const firstArg = args[0] as { count?: unknown } | undefined;
const isCount = args.length > 0 && firstArg?.count;
if (isCount) {
return {
from: vi.fn(() => ({
where: vi.fn(() => mockCountSelectFromWhere()),
})),
};
}
return {
from: vi.fn(() => ({
where: vi.fn(() => ({
limit: mockSelectFromWhereLimit,
orderBy: vi.fn(() => ({
limit: vi.fn(() => ({
offset: mockSelectFromWhereOrderByLimitOffset,
})),
})),
})),
innerJoin: vi.fn(() => ({
where: vi.fn(() => ({
limit: vi.fn().mockResolvedValue([]),
})),
})),
})),
};
}),
insert: vi.fn(() => ({
values: vi.fn(() => ({
returning: mockInsertValuesReturning,
})),
})),
update: vi.fn(() => ({
set: vi.fn(() => ({
where: vi.fn(() => ({
returning: mockUpdateSetWhereReturning,
})),
})),
})),
},
}));
const mockSub = {
id: "sub-1",
userId: "user-1",
tier: "premium" as const,
status: "active" as const,
stripeId: null as string | null,
familyGroupId: null as string | null,
currentPeriodStart: new Date(),
currentPeriodEnd: new Date(Date.now() + 86400000),
cancelAtPeriodEnd: false,
createdAt: new Date(),
updatedAt: new Date(),
};
beforeEach(() => {
vi.clearAllMocks();
mockSelectFromWhereLimit.mockResolvedValue([mockSub]);
});
describe("checkTierLimits", () => {
it("allows premium tier always", async () => {
mockSubFindFirst.mockResolvedValue({ ...mockSub, tier: "premium" as const });
const { checkTierLimits } = await import("./darkwatch.service");
const result = await checkTierLimits("user-1");
expect(result.allowed).toBe(true);
});
it("allows basic tier if under limit", async () => {
mockSubFindFirst.mockResolvedValue({ ...mockSub, tier: "basic" as const });
mockCountSelectFromWhere.mockResolvedValue([{ count: 0 }]);
const { checkTierLimits } = await import("./darkwatch.service");
const result = await checkTierLimits("user-1");
expect(result.allowed).toBe(true);
});
});
describe("addWatchlistItem", () => {
it("hashes and deduplicates values", async () => {
mockSubFindFirst.mockResolvedValue(mockSub);
mockSelectFromWhereLimit
.mockResolvedValueOnce([mockSub])
.mockResolvedValueOnce([]);
mockInsertValuesReturning.mockResolvedValue([{
id: "w1", subscriptionId: "sub-1", type: "email", value: "test@example.com",
hash: "some-hash", isActive: true,
createdAt: new Date(), updatedAt: new Date(),
}]);
const { addWatchlistItem } = await import("./darkwatch.service");
const result = await addWatchlistItem("user-1", "email", "test@example.com");
expect(result.type).toBe("email");
expect(result.value).toBe("test@example.com");
});
});
describe("removeWatchlistItem", () => {
it("throws not found if item does not belong to user", async () => {
mockSubFindFirst.mockResolvedValue(mockSub);
mockSelectFromWhereLimit.mockResolvedValue([]);
const { removeWatchlistItem } = await import("./darkwatch.service");
await expect(removeWatchlistItem("user-1", "nonexistent")).rejects.toThrow(TRPCError);
});
});
describe("getExposures", () => {
it("returns paginated exposures", async () => {
mockSubFindFirst.mockResolvedValue(mockSub);
mockCountSelectFromWhere.mockResolvedValue([{ count: 0 }]);
mockSelectFromWhereOrderByLimitOffset.mockResolvedValue([]);
const { getExposures } = await import("./darkwatch.service");
const result = await getExposures("user-1", { page: 1, limit: 10 });
expect(result.items).toEqual([]);
expect(result.total).toBe(0);
expect(result.page).toBe(1);
});
});

View File

@@ -0,0 +1,363 @@
import { createHash } from "node:crypto";
import { TRPCError } from "@trpc/server";
import { eq, and, desc, count, gte, lte, inArray, sql } from "drizzle-orm";
import { db } from "~/server/db";
import { watchlistItems, exposures, subscriptions, securityReports } from "~/server/db/schema";
import { scanHIBP, scanSecurityTrails, scanCensys, scanShodan, scanForums } from "./darkwatch/scan.engine";
import { processExposure } from "./darkwatch/alert.pipeline";
import type { ScanResult } from "./darkwatch/scan.engine";
interface ScanState {
status: "idle" | "running" | "completed" | "failed";
startedAt: Date | null;
completedAt: Date | null;
totalSources: number;
completedSources: number;
error: string | null;
}
const scanStates = new Map<string, ScanState>();
function hashValue(value: string): string {
return createHash("sha256").update(value.toLowerCase().trim()).digest("hex");
}
async function getSubscription(userId: string) {
const [sub] = await db
.select()
.from(subscriptions)
.where(and(eq(subscriptions.userId, userId), eq(subscriptions.status, "active")))
.limit(1);
if (!sub) {
throw new TRPCError({ code: "NOT_FOUND", message: "No active subscription found" });
}
return sub;
}
export async function getWatchlistItems(userId: string) {
const sub = await getSubscription(userId);
const items = await db
.select()
.from(watchlistItems)
.where(and(eq(watchlistItems.subscriptionId, sub.id), eq(watchlistItems.isActive, true)))
.orderBy(desc(watchlistItems.createdAt));
return items;
}
export async function addWatchlistItem(userId: string, type: string, value: string) {
const sub = await getSubscription(userId);
const hash = hashValue(value);
const [existing] = await db
.select()
.from(watchlistItems)
.where(
and(
eq(watchlistItems.subscriptionId, sub.id),
eq(watchlistItems.type, type as "email" | "phoneNumber" | "ssn" | "address" | "domain"),
eq(watchlistItems.hash, hash),
),
)
.limit(1);
if (existing) {
if (existing.isActive) {
throw new TRPCError({ code: "CONFLICT", message: "Watchlist item already exists" });
}
const [reactivated] = await db
.update(watchlistItems)
.set({ isActive: true, updatedAt: new Date() })
.where(eq(watchlistItems.id, existing.id))
.returning();
return reactivated;
}
const [inserted] = await db
.insert(watchlistItems)
.values({
subscriptionId: sub.id,
type: type as "email" | "phoneNumber" | "ssn" | "address" | "domain",
value,
hash,
})
.returning();
return inserted;
}
export async function removeWatchlistItem(userId: string, itemId: string) {
const sub = await getSubscription(userId);
const [item] = await db
.select()
.from(watchlistItems)
.where(and(eq(watchlistItems.id, itemId), eq(watchlistItems.subscriptionId, sub.id)))
.limit(1);
if (!item) {
throw new TRPCError({ code: "NOT_FOUND", message: "Watchlist item not found" });
}
const [deleted] = await db
.update(watchlistItems)
.set({ isActive: false, updatedAt: new Date() })
.where(eq(watchlistItems.id, itemId))
.returning();
return deleted;
}
export async function getExposures(
userId: string,
filters?: {
severity?: string;
source?: string;
page?: number;
limit?: number;
},
) {
const sub = await getSubscription(userId);
const page = filters?.page ?? 1;
const limit = filters?.limit ?? 20;
const offset = (page - 1) * limit;
const conditions = [eq(exposures.subscriptionId, sub.id)];
if (filters?.severity) {
conditions.push(eq(exposures.severity, filters.severity as "info" | "warning" | "critical"));
}
if (filters?.source) {
conditions.push(eq(exposures.source, filters.source as "hibp" | "securityTrails" | "censys" | "darkWebForum" | "shodan" | "honeypot"));
}
const [totalResult] = await db
.select({ count: count() })
.from(exposures)
.where(and(...conditions));
const items = await db
.select()
.from(exposures)
.where(and(...conditions))
.orderBy(desc(exposures.detectedAt))
.limit(limit)
.offset(offset);
return {
items,
total: totalResult.count,
page,
limit,
totalPages: Math.ceil(totalResult.count / limit),
};
}
export async function getExposureDetails(userId: string, exposureId: string) {
const sub = await getSubscription(userId);
const [exposure] = await db
.select()
.from(exposures)
.where(and(eq(exposures.id, exposureId), eq(exposures.subscriptionId, sub.id)))
.limit(1);
if (!exposure) {
throw new TRPCError({ code: "NOT_FOUND", message: "Exposure not found" });
}
if (exposure.watchlistItemId) {
const [item] = await db
.select()
.from(watchlistItems)
.where(eq(watchlistItems.id, exposure.watchlistItemId))
.limit(1);
return { ...exposure, watchlistItem: item ?? null };
}
return { ...exposure, watchlistItem: null };
}
export async function checkTierLimits(userId: string): Promise<{ allowed: boolean; reason?: string }> {
const sub = await getSubscription(userId);
const tier = sub.tier;
if (tier === "premium") {
return { allowed: true };
}
const maxScans: Record<string, number> = {
basic: 1,
plus: 4,
};
const maxScanCount = maxScans[tier] ?? 1;
const periodStart = tier === "plus"
? new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const [result] = await db
.select({ count: count() })
.from(exposures)
.where(
and(
eq(exposures.subscriptionId, sub.id),
gte(exposures.detectedAt, periodStart),
),
);
if (result.count >= maxScanCount) {
const periodLabel = tier === "plus" ? "week" : "month";
return {
allowed: false,
reason: `Scan limit reached: ${maxScanCount} per ${periodLabel} for ${tier} tier`,
};
}
return { allowed: true };
}
export async function runScan(userId: string): Promise<{ scanId: string }> {
const sub = await getSubscription(userId);
const tierCheck = await checkTierLimits(userId);
if (!tierCheck.allowed) {
throw new TRPCError({ code: "TOO_MANY_REQUESTS", message: tierCheck.reason });
}
if (scanStates.get(userId)?.status === "running") {
throw new TRPCError({ code: "TOO_MANY_REQUESTS", message: "Scan already in progress" });
}
const scanId = crypto.randomUUID();
scanStates.set(userId, {
status: "running",
startedAt: new Date(),
completedAt: null,
totalSources: 4,
completedSources: 0,
error: null,
});
const items = await db
.select()
.from(watchlistItems)
.where(and(eq(watchlistItems.subscriptionId, sub.id), eq(watchlistItems.isActive, true)));
processScan(userId, sub.id, items).catch((err) => {
console.error("[darkwatch] Scan failed:", err);
const state = scanStates.get(userId);
if (state) {
state.status = "failed";
state.error = err instanceof Error ? err.message : "Unknown error";
}
});
return { scanId };
}
async function processScan(
userId: string,
subscriptionId: string,
items: Array<{ id: string; type: string; value: string }>,
): Promise<void> {
const allResults: ScanResult[] = [];
for (const item of items) {
const sourcePromises: Promise<ScanResult[]>[] = [];
switch (item.type) {
case "email":
sourcePromises.push(scanHIBP(item.value));
break;
case "domain":
sourcePromises.push(scanSecurityTrails(item.value));
sourcePromises.push(scanCensys(item.value));
sourcePromises.push(scanShodan(item.value));
break;
case "phoneNumber":
sourcePromises.push(scanShodan(item.value));
sourcePromises.push(scanCensys(item.value));
break;
default:
sourcePromises.push(scanShodan(item.value));
break;
}
sourcePromises.push(scanForums(item.value));
const results = await Promise.allSettled(sourcePromises);
for (const r of results) {
if (r.status === "fulfilled") {
allResults.push(...r.value.map((sr) => ({ ...sr, watchlistItemId: item.id })));
}
}
const state = scanStates.get(userId);
if (state) {
state.completedSources++;
}
}
for (const result of allResults) {
try {
await processExposure({
subscriptionId,
watchlistItemId: (result as ScanResult & { watchlistItemId: string }).watchlistItemId,
source: result.source,
dataType: result.dataType,
identifier: result.identifier,
identifierHash: result.identifierHash,
severity: result.severity,
metadata: result.metadata,
detectedAt: result.detectedAt,
});
} catch (err) {
console.error("[darkwatch] Failed to process exposure:", err);
}
}
const state = scanStates.get(userId);
if (state) {
state.status = "completed";
state.completedAt = new Date();
}
}
export async function getScanStatus(userId: string) {
const state = scanStates.get(userId);
if (!state) {
return { status: "idle" as const, startedAt: null, completedAt: null, progress: 0 };
}
return {
status: state.status,
startedAt: state.startedAt,
completedAt: state.completedAt,
progress: state.totalSources > 0 ? state.completedSources / state.totalSources : 0,
error: state.error,
};
}
export async function getReports(
userId: string,
filters?: { page?: number; limit?: number },
) {
const sub = await getSubscription(userId);
const page = filters?.page ?? 1;
const limit = filters?.limit ?? 20;
const offset = (page - 1) * limit;
const [totalResult] = await db
.select({ count: count() })
.from(securityReports)
.where(eq(securityReports.subscriptionId, sub.id));
const items = await db
.select()
.from(securityReports)
.where(eq(securityReports.subscriptionId, sub.id))
.orderBy(desc(securityReports.createdAt))
.limit(limit)
.offset(offset);
return {
items,
total: totalResult.count,
page,
limit,
totalPages: Math.ceil(totalResult.count / limit),
};
}

View File

@@ -0,0 +1,118 @@
import { eq, and } from "drizzle-orm";
import { db } from "~/server/db";
import { exposures, alerts, subscriptions } from "~/server/db/schema";
export function severityScore(exposure: {
source: string;
dataType: string;
}): "info" | "warning" | "critical" {
const criticalSources = new Set(["hibp"]);
const warningSources = new Set(["shodan", "censys", "darkWebForum"]);
const criticalTypes = new Set(["ssn"]);
const warningTypes = new Set(["email", "phoneNumber"]);
if (criticalSources.has(exposure.source) || criticalTypes.has(exposure.dataType)) {
return "critical";
}
if (warningSources.has(exposure.source) || warningTypes.has(exposure.dataType)) {
return "warning";
}
return "info";
}
export async function processExposure(newExposure: {
subscriptionId: string;
watchlistItemId?: string | null;
source: string;
dataType: string;
identifier: string;
identifierHash: string;
severity: string;
metadata?: Record<string, unknown> | null;
detectedAt: Date;
}): Promise<{ exposureId: string; alertCreated: boolean }> {
const severity = newExposure.severity as "info" | "warning" | "critical";
const [existing] = await db
.select()
.from(exposures)
.where(
and(
eq(exposures.identifierHash, newExposure.identifierHash),
eq(exposures.source, newExposure.source as "hibp" | "securityTrails" | "censys" | "darkWebForum" | "shodan" | "honeypot"),
),
)
.limit(1);
if (existing) {
const currentSeverityIdx = ["info", "warning", "critical"].indexOf(existing.severity);
const newSeverityIdx = ["info", "warning", "critical"].indexOf(severity);
if (newSeverityIdx <= currentSeverityIdx) {
return { exposureId: existing.id, alertCreated: false };
}
const [updated] = await db
.update(exposures)
.set({
severity,
metadata: newExposure.metadata ?? existing.metadata,
detectedAt: newExposure.detectedAt,
updatedAt: new Date(),
})
.where(eq(exposures.id, existing.id))
.returning();
await createAlertForExposure(updated, severity);
return { exposureId: updated.id, alertCreated: true };
}
const [inserted] = await db
.insert(exposures)
.values({
subscriptionId: newExposure.subscriptionId,
watchlistItemId: newExposure.watchlistItemId ?? null,
source: newExposure.source as "hibp" | "securityTrails" | "censys" | "darkWebForum" | "shodan" | "honeypot",
dataType: newExposure.dataType as "email" | "phoneNumber" | "ssn" | "address" | "domain",
identifier: newExposure.identifier,
identifierHash: newExposure.identifierHash,
severity,
metadata: newExposure.metadata ?? null,
isFirstTime: true,
detectedAt: newExposure.detectedAt,
})
.returning();
await createAlertForExposure(inserted, severity);
return { exposureId: inserted.id, alertCreated: true };
}
async function createAlertForExposure(
exposure: { id: string; subscriptionId: string; severity: string; dataType: string; source: string; identifier: string },
severity: "info" | "warning" | "critical",
): Promise<void> {
const alertSeverityMap: Record<string, "info" | "warning" | "critical"> = {
info: "info",
warning: "warning",
critical: "critical",
};
const title = `${severity === "critical" ? "Critical" : severity === "warning" ? "Warning" : "Info"} exposure detected`;
const message = `${exposure.dataType} exposed on ${exposure.source}: ${exposure.identifier}`;
const [sub] = await db
.select()
.from(subscriptions)
.where(eq(subscriptions.id, exposure.subscriptionId))
.limit(1);
if (!sub) return;
await db.insert(alerts).values({
subscriptionId: exposure.subscriptionId,
userId: sub.userId,
exposureId: exposure.id,
type: "exposure_detected",
title,
message,
severity: alertSeverityMap[severity] ?? "info",
channel: ["email", "push"],
});
}

View File

@@ -0,0 +1,24 @@
import { describe, it, expect } from "vitest";
import { severityScore } from "./alert.pipeline";
describe("severityScore", () => {
it("returns critical for HIBP source", () => {
expect(severityScore({ source: "hibp", dataType: "email" })).toBe("critical");
});
it("returns critical for ssn data type", () => {
expect(severityScore({ source: "darkWebForum", dataType: "ssn" })).toBe("critical");
});
it("returns warning for shodan source", () => {
expect(severityScore({ source: "shodan", dataType: "domain" })).toBe("warning");
});
it("returns warning for email data type", () => {
expect(severityScore({ source: "securityTrails", dataType: "email" })).toBe("warning");
});
it("returns info for low-risk combinations", () => {
expect(severityScore({ source: "securityTrails", dataType: "domain" })).toBe("info");
});
});

View File

@@ -0,0 +1,188 @@
import { createHash } from "node:crypto";
interface ScanResult {
source: "hibp" | "securityTrails" | "censys" | "shodan" | "darkWebForum";
dataType: "email" | "phoneNumber" | "ssn" | "address" | "domain";
identifier: string;
identifierHash: string;
metadata: Record<string, unknown>;
detectedAt: Date;
severity: "info" | "warning" | "critical";
}
interface CircuitState {
failures: number;
lastFailure: number;
isOpen: boolean;
}
const circuits = new Map<string, CircuitState>();
const THRESHOLD = 5;
const RESET_MS = 60_000;
function isCircuitOpen(name: string): boolean {
const state = circuits.get(name);
if (!state) return false;
if (!state.isOpen) return false;
if (Date.now() - state.lastFailure > RESET_MS) {
circuits.delete(name);
return false;
}
return true;
}
function recordFailure(name: string): void {
const state = circuits.get(name) ?? { failures: 0, lastFailure: 0, isOpen: false };
state.failures++;
state.lastFailure = Date.now();
if (state.failures >= THRESHOLD) {
state.isOpen = true;
}
circuits.set(name, state);
}
function recordSuccess(name: string): void {
circuits.delete(name);
}
async function fetchWithCircuit(name: string, url: string, headers: Record<string, string>): Promise<Response | null> {
if (isCircuitOpen(name)) {
console.warn(`[darkwatch] Circuit open for ${name}, skipping`);
return null;
}
try {
const res = await fetch(url, { headers, signal: AbortSignal.timeout(10_000) });
if (!res.ok) {
recordFailure(name);
console.warn(`[darkwatch] ${name} returned ${res.status}`);
return null;
}
recordSuccess(name);
return res;
} catch (err) {
recordFailure(name);
console.error(`[darkwatch] ${name} error:`, err);
return null;
}
}
function hashValue(value: string): string {
return createHash("sha256").update(value.toLowerCase().trim()).digest("hex");
}
export async function scanHIBP(email: string): Promise<ScanResult[]> {
const apiKey = process.env.HIBP_API_KEY;
if (!apiKey) {
console.warn("[darkwatch] HIBP_API_KEY not set, skipping HIBP scan");
return [];
}
const res = await fetchWithCircuit(
"hibp",
`https://haveibeenpwned.com/api/v3/breachedaccount/${encodeURIComponent(email)}?truncateResponse=false`,
{ "hibp-api-key": apiKey, "user-agent": "ShieldAI-DarkWatch" },
);
if (!res) return [];
const breaches = await res.json() as Array<{ Name: string; BreachDate: string; DataClasses: string[]; Description: string }>;
return breaches.map((b) => ({
source: "hibp" as const,
dataType: "email" as const,
identifier: email,
identifierHash: hashValue(email),
metadata: { breachName: b.Name, breachDate: b.BreachDate, dataClasses: b.DataClasses, description: b.Description },
detectedAt: new Date(b.BreachDate),
severity: "critical" as const,
}));
}
export async function scanSecurityTrails(identifier: string): Promise<ScanResult[]> {
const apiKey = process.env.SECURITYTRAILS_API_KEY;
if (!apiKey) {
console.warn("[darkwatch] SECURITYTRAILS_API_KEY not set, skipping");
return [];
}
const domain = identifier.includes("@") ? identifier.split("@")[1] : identifier;
const res = await fetchWithCircuit(
"securitytrails",
`https://api.securitytrails.com/v1/domain/${encodeURIComponent(domain)}/subdomains`,
{ APIKEY: apiKey },
);
if (!res) return [];
const data = await res.json() as { subdomains: string[] };
return (data.subdomains ?? []).slice(0, 20).map((sub) => ({
source: "securityTrails" as const,
dataType: "domain" as const,
identifier: `${sub}.${domain}`,
identifierHash: hashValue(`${sub}.${domain}`),
metadata: { subdomain: sub, domain },
detectedAt: new Date(),
severity: "info" as const,
}));
}
export async function scanCensys(query: string): Promise<ScanResult[]> {
const apiKey = process.env.CENSYS_API_KEY;
if (!apiKey) {
console.warn("[darkwatch] CENSYS_API_KEY not set, skipping");
return [];
}
const res = await fetchWithCircuit(
"censys",
`https://search.censys.io/api/v2/hosts/search?q=${encodeURIComponent(query)}&per_page=10`,
{ Authorization: `Bearer ${apiKey}` },
);
if (!res) return [];
const data = await res.json() as { result?: { hits?: Array<{ ip: string; services?: Array<{ service_name: string; port: number }> }> } };
const hits = data.result?.hits ?? [];
return hits.map((h) => ({
source: "censys" as const,
dataType: "domain" as const,
identifier: h.ip,
identifierHash: hashValue(h.ip),
metadata: { ip: h.ip, services: h.services },
detectedAt: new Date(),
severity: "warning" as const,
}));
}
export async function scanShodan(query: string): Promise<ScanResult[]> {
const apiKey = process.env.SHODAN_API_KEY;
if (!apiKey) {
console.warn("[darkwatch] SHODAN_API_KEY not set, skipping");
return [];
}
const res = await fetchWithCircuit(
"shodan",
`https://api.shodan.io/shodan/host/search?key=${apiKey}&query=${encodeURIComponent(query)}&limit=10`,
{},
);
if (!res) return [];
const data = await res.json() as { matches?: Array<{ ip_str: string; port: number; org?: string; hostnames?: string[] }> };
const matches = data.matches ?? [];
return matches.map((m) => ({
source: "shodan" as const,
dataType: "domain" as const,
identifier: m.ip_str,
identifierHash: hashValue(m.ip_str),
metadata: { ip: m.ip_str, port: m.port, org: m.org, hostnames: m.hostnames },
detectedAt: new Date(),
severity: "warning" as const,
}));
}
export async function scanForums(identifier: string): Promise<ScanResult[]> {
const forumEnabled = process.env.DARKWEB_FORUM_ENABLED;
if (!forumEnabled || forumEnabled !== "true") {
return [];
}
return [{
source: "darkWebForum" as const,
dataType: (identifier.includes("@") ? "email" : "domain") as "email" | "domain",
identifier,
identifierHash: hashValue(identifier),
metadata: { note: "Forum scraping placeholder", identifier },
detectedAt: new Date(),
severity: "warning" as const,
}];
}
export type { ScanResult };