diff --git a/web/src/server/api/root.ts b/web/src/server/api/root.ts index 972114d..909e3dd 100644 --- a/web/src/server/api/root.ts +++ b/web/src/server/api/root.ts @@ -2,6 +2,7 @@ import { exampleRouter } from "./routers/example"; import { userRouter } from "./routers/user"; import { billingRouter } from "./routers/billing"; import { notificationRouter } from "./routers/notification"; +import { darkwatchRouter } from "./routers/darkwatch"; import { createTRPCRouter } from "./utils"; export const appRouter = createTRPCRouter({ @@ -9,6 +10,7 @@ export const appRouter = createTRPCRouter({ user: userRouter, billing: billingRouter, notification: notificationRouter, + darkwatch: darkwatchRouter, }); export type AppRouter = typeof appRouter; diff --git a/web/src/server/api/routers/darkwatch.test.ts b/web/src/server/api/routers/darkwatch.test.ts new file mode 100644 index 0000000..97a2a63 --- /dev/null +++ b/web/src/server/api/routers/darkwatch.test.ts @@ -0,0 +1,202 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { initTRPC, TRPCError } from "@trpc/server"; +import { wrap } from "@typeschema/valibot"; +import { + AddWatchlistItemSchema, + RemoveWatchlistItemSchema, + ExposureFilterSchema, + ExposureDetailsSchema, + RunScanSchema, + ReportFilterSchema, +} from "../schemas/darkwatch"; + +vi.mock("~/server/services/darkwatch.service", () => ({ + getWatchlistItems: vi.fn(), + addWatchlistItem: vi.fn(), + removeWatchlistItem: vi.fn(), + getExposures: vi.fn(), + getExposureDetails: vi.fn(), + runScan: vi.fn(), + getScanStatus: vi.fn(), + getReports: vi.fn(), +})); + +import * as darkwatchService from "~/server/services/darkwatch.service"; + +const mockGetWatchlistItems = vi.mocked(darkwatchService.getWatchlistItems); +const mockAddWatchlistItem = vi.mocked(darkwatchService.addWatchlistItem); +const mockRemoveWatchlistItem = vi.mocked(darkwatchService.removeWatchlistItem); +const mockGetExposures = vi.mocked(darkwatchService.getExposures); +const mockGetExposureDetails = vi.mocked(darkwatchService.getExposureDetails); +const mockRunScan = vi.mocked(darkwatchService.runScan); +const mockGetScanStatus = vi.mocked(darkwatchService.getScanStatus); +const mockGetReports = vi.mocked(darkwatchService.getReports); + +type User = { + id: string; email: string; name: string | null; image: string | null; + role: string; emailVerified: Date | null; deletedAt: Date | null; + stripeCustomerId: string | null; + createdAt: Date; updatedAt: Date; +}; +type Ctx = { db: object; user: User | null; apiKey: string | null }; + +function createCaller(user: User | null) { + const t = initTRPC.context().create(); + const isAuthed = t.middleware(({ ctx, next }) => { + if (!ctx.user) throw new TRPCError({ code: "UNAUTHORIZED" }); + return next({ ctx: { ...ctx, user: ctx.user } }); + }); + + const router = t.router({ + getWatchlist: t.procedure.use(isAuthed).query(async ({ ctx }) => { + return mockGetWatchlistItems(ctx.user.id); + }), + addWatchlistItem: t.procedure.use(isAuthed) + .input(wrap(AddWatchlistItemSchema)) + .mutation(async ({ ctx, input }) => { + return mockAddWatchlistItem(ctx.user.id, input.type, input.value); + }), + removeWatchlistItem: t.procedure.use(isAuthed) + .input(wrap(RemoveWatchlistItemSchema)) + .mutation(async ({ ctx, input }) => { + return mockRemoveWatchlistItem(ctx.user.id, input.itemId); + }), + getExposures: t.procedure.use(isAuthed) + .input(wrap(ExposureFilterSchema)) + .query(async ({ ctx, input }) => { + return mockGetExposures(ctx.user.id, input); + }), + getExposureDetails: t.procedure.use(isAuthed) + .input(wrap(ExposureDetailsSchema)) + .query(async ({ ctx, input }) => { + return mockGetExposureDetails(ctx.user.id, input.exposureId); + }), + runScan: t.procedure.use(isAuthed) + .input(wrap(RunScanSchema)) + .mutation(async ({ ctx }) => { + return mockRunScan(ctx.user.id); + }), + getScanStatus: t.procedure.use(isAuthed).query(async ({ ctx }) => { + return mockGetScanStatus(ctx.user.id); + }), + getReports: t.procedure.use(isAuthed) + .input(wrap(ReportFilterSchema)) + .query(async ({ ctx, input }) => { + return mockGetReports(ctx.user.id, input); + }), + }); + + const caller = t.createCallerFactory(router); + return caller({ db: {} as never, user, apiKey: null }); +} + +const baseUser: User = { + id: "user-1", email: "a@b.com", name: "Test", image: null, + role: "user", emailVerified: null, deletedAt: null, + stripeCustomerId: null, + createdAt: new Date(), updatedAt: new Date(), +}; + +function makeUser(overrides: Partial = {}): User { + return { ...baseUser, ...overrides }; +} + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe("darkwatch.getWatchlist", () => { + it("returns watchlist items for authenticated user", async () => { + const items = [{ id: "w1", type: "email", value: "test@example.com" }]; + mockGetWatchlistItems.mockResolvedValue(items as never); + const api = createCaller(makeUser()); + expect(await api.getWatchlist()).toEqual(items); + }); + + it("rejects unauthenticated", async () => { + const api = createCaller(null); + await expect(api.getWatchlist()).rejects.toThrow(TRPCError); + }); +}); + +describe("darkwatch.addWatchlistItem", () => { + it("adds a watchlist item", async () => { + const item = { id: "w1", type: "email", value: "test@example.com" }; + mockAddWatchlistItem.mockResolvedValue(item as never); + const api = createCaller(makeUser()); + const result = await api.addWatchlistItem({ type: "email", value: "test@example.com" }); + expect(result).toEqual(item); + }); + + it("rejects invalid type", async () => { + const api = createCaller(makeUser()); + await expect( + api.addWatchlistItem({ type: "invalid" as never, value: "test" }), + ).rejects.toThrow(); + }); +}); + +describe("darkwatch.removeWatchlistItem", () => { + it("removes a watchlist item", async () => { + mockRemoveWatchlistItem.mockResolvedValue({ id: "w1", isActive: false } as never); + const api = createCaller(makeUser()); + const result = await api.removeWatchlistItem({ itemId: "w1" }); + expect(result.isActive).toBe(false); + }); +}); + +describe("darkwatch.getExposures", () => { + it("returns exposures with pagination", async () => { + const data = { items: [], total: 0, page: 1, limit: 20, totalPages: 0 }; + mockGetExposures.mockResolvedValue(data); + const api = createCaller(makeUser()); + const result = await api.getExposures({ page: 1, limit: 20 }); + expect(result.total).toBe(0); + }); + + it("passes severity filter", async () => { + const data = { items: [], total: 0, page: 1, limit: 20, totalPages: 0 }; + mockGetExposures.mockResolvedValue(data); + const api = createCaller(makeUser()); + await api.getExposures({ severity: "critical" }); + expect(mockGetExposures).toHaveBeenCalledWith("user-1", { severity: "critical", page: 1, limit: 20 }); + }); +}); + +describe("darkwatch.getExposureDetails", () => { + it("returns exposure details", async () => { + const exposure = { id: "e1", identifier: "test@example.com", watchlistItem: null }; + mockGetExposureDetails.mockResolvedValue(exposure as never); + const api = createCaller(makeUser()); + const result = await api.getExposureDetails({ exposureId: "e1" }); + expect(result.id).toBe("e1"); + }); +}); + +describe("darkwatch.runScan", () => { + it("triggers a scan", async () => { + mockRunScan.mockResolvedValue({ scanId: "s1" }); + const api = createCaller(makeUser()); + const result = await api.runScan({}); + expect(result.scanId).toBe("s1"); + }); +}); + +describe("darkwatch.getScanStatus", () => { + it("returns scan status", async () => { + mockGetScanStatus.mockResolvedValue({ status: "idle", startedAt: null, completedAt: null, progress: 0, error: null }); + const api = createCaller(makeUser()); + const result = await api.getScanStatus(); + expect(result.status).toBe("idle"); + }); +}); + +describe("darkwatch.getReports", () => { + it("returns reports", async () => { + const data = { items: [], total: 0, page: 1, limit: 20, totalPages: 0 }; + mockGetReports.mockResolvedValue(data); + const api = createCaller(makeUser()); + const result = await api.getReports({ page: 1, limit: 20 }); + expect(result.total).toBe(0); + }); +}); diff --git a/web/src/server/api/routers/darkwatch.ts b/web/src/server/api/routers/darkwatch.ts new file mode 100644 index 0000000..169f995 --- /dev/null +++ b/web/src/server/api/routers/darkwatch.ts @@ -0,0 +1,57 @@ +import { wrap } from "@typeschema/valibot"; +import { createTRPCRouter, protectedProcedure } from "../utils"; +import { + AddWatchlistItemSchema, + RemoveWatchlistItemSchema, + ExposureFilterSchema, + ExposureDetailsSchema, + RunScanSchema, + ReportFilterSchema, +} from "../schemas/darkwatch"; +import * as darkwatchService from "~/server/services/darkwatch.service"; + +export const darkwatchRouter = createTRPCRouter({ + getWatchlist: protectedProcedure.query(async ({ ctx }) => { + return darkwatchService.getWatchlistItems(ctx.user.id); + }), + + addWatchlistItem: protectedProcedure + .input(wrap(AddWatchlistItemSchema)) + .mutation(async ({ ctx, input }) => { + return darkwatchService.addWatchlistItem(ctx.user.id, input.type, input.value); + }), + + removeWatchlistItem: protectedProcedure + .input(wrap(RemoveWatchlistItemSchema)) + .mutation(async ({ ctx, input }) => { + return darkwatchService.removeWatchlistItem(ctx.user.id, input.itemId); + }), + + getExposures: protectedProcedure + .input(wrap(ExposureFilterSchema)) + .query(async ({ ctx, input }) => { + return darkwatchService.getExposures(ctx.user.id, input); + }), + + getExposureDetails: protectedProcedure + .input(wrap(ExposureDetailsSchema)) + .query(async ({ ctx, input }) => { + return darkwatchService.getExposureDetails(ctx.user.id, input.exposureId); + }), + + runScan: protectedProcedure + .input(wrap(RunScanSchema)) + .mutation(async ({ ctx }) => { + return darkwatchService.runScan(ctx.user.id); + }), + + getScanStatus: protectedProcedure.query(async ({ ctx }) => { + return darkwatchService.getScanStatus(ctx.user.id); + }), + + getReports: protectedProcedure + .input(wrap(ReportFilterSchema)) + .query(async ({ ctx, input }) => { + return darkwatchService.getReports(ctx.user.id, input); + }), +}); diff --git a/web/src/server/api/schemas/darkwatch.ts b/web/src/server/api/schemas/darkwatch.ts new file mode 100644 index 0000000..fff1736 --- /dev/null +++ b/web/src/server/api/schemas/darkwatch.ts @@ -0,0 +1,28 @@ +import { object, string, minLength, optional, number, picklist } from "valibot"; + +export const AddWatchlistItemSchema = object({ + type: picklist(["email", "phoneNumber", "ssn", "address", "domain"]), + value: string([minLength(1)]), +}); + +export const RemoveWatchlistItemSchema = object({ + itemId: string([minLength(1)]), +}); + +export const ExposureFilterSchema = object({ + severity: optional(picklist(["info", "warning", "critical"])), + source: optional(picklist(["hibp", "securityTrails", "censys", "darkWebForum", "shodan", "honeypot"])), + page: optional(number(), 1), + limit: optional(number(), 20), +}); + +export const ExposureDetailsSchema = object({ + exposureId: string([minLength(1)]), +}); + +export const RunScanSchema = object({}); + +export const ReportFilterSchema = object({ + page: optional(number(), 1), + limit: optional(number(), 20), +}); diff --git a/web/src/server/services/darkwatch.service.test.ts b/web/src/server/services/darkwatch.service.test.ts new file mode 100644 index 0000000..16ba194 --- /dev/null +++ b/web/src/server/services/darkwatch.service.test.ts @@ -0,0 +1,138 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { TRPCError } from "@trpc/server"; + +const mockSelectFromWhereLimit = vi.fn(); +const mockSelectFromWhereOrderByLimitOffset = vi.fn(); +const mockInsertValuesReturning = vi.fn(); +const mockUpdateSetWhereReturning = vi.fn(); +const mockSubFindFirst = vi.fn(); +const mockWhereLimit = vi.fn(); +const mockCountSelectFromWhere = vi.fn(); + +vi.mock("~/server/db", () => ({ + db: { + query: { + subscriptions: { + findFirst: mockSubFindFirst, + }, + }, + select: vi.fn((...args: unknown[]) => { + const firstArg = args[0] as { count?: unknown } | undefined; + const isCount = args.length > 0 && firstArg?.count; + + if (isCount) { + return { + from: vi.fn(() => ({ + where: vi.fn(() => mockCountSelectFromWhere()), + })), + }; + } + + return { + from: vi.fn(() => ({ + where: vi.fn(() => ({ + limit: mockSelectFromWhereLimit, + orderBy: vi.fn(() => ({ + limit: vi.fn(() => ({ + offset: mockSelectFromWhereOrderByLimitOffset, + })), + })), + })), + innerJoin: vi.fn(() => ({ + where: vi.fn(() => ({ + limit: vi.fn().mockResolvedValue([]), + })), + })), + })), + }; + }), + insert: vi.fn(() => ({ + values: vi.fn(() => ({ + returning: mockInsertValuesReturning, + })), + })), + update: vi.fn(() => ({ + set: vi.fn(() => ({ + where: vi.fn(() => ({ + returning: mockUpdateSetWhereReturning, + })), + })), + })), + }, +})); + +const mockSub = { + id: "sub-1", + userId: "user-1", + tier: "premium" as const, + status: "active" as const, + stripeId: null as string | null, + familyGroupId: null as string | null, + currentPeriodStart: new Date(), + currentPeriodEnd: new Date(Date.now() + 86400000), + cancelAtPeriodEnd: false, + createdAt: new Date(), + updatedAt: new Date(), +}; + +beforeEach(() => { + vi.clearAllMocks(); + mockSelectFromWhereLimit.mockResolvedValue([mockSub]); +}); + +describe("checkTierLimits", () => { + it("allows premium tier always", async () => { + mockSubFindFirst.mockResolvedValue({ ...mockSub, tier: "premium" as const }); + const { checkTierLimits } = await import("./darkwatch.service"); + const result = await checkTierLimits("user-1"); + expect(result.allowed).toBe(true); + }); + + it("allows basic tier if under limit", async () => { + mockSubFindFirst.mockResolvedValue({ ...mockSub, tier: "basic" as const }); + mockCountSelectFromWhere.mockResolvedValue([{ count: 0 }]); + const { checkTierLimits } = await import("./darkwatch.service"); + const result = await checkTierLimits("user-1"); + expect(result.allowed).toBe(true); + }); +}); + +describe("addWatchlistItem", () => { + it("hashes and deduplicates values", async () => { + mockSubFindFirst.mockResolvedValue(mockSub); + mockSelectFromWhereLimit + .mockResolvedValueOnce([mockSub]) + .mockResolvedValueOnce([]); + mockInsertValuesReturning.mockResolvedValue([{ + id: "w1", subscriptionId: "sub-1", type: "email", value: "test@example.com", + hash: "some-hash", isActive: true, + createdAt: new Date(), updatedAt: new Date(), + }]); + const { addWatchlistItem } = await import("./darkwatch.service"); + const result = await addWatchlistItem("user-1", "email", "test@example.com"); + expect(result.type).toBe("email"); + expect(result.value).toBe("test@example.com"); + }); +}); + +describe("removeWatchlistItem", () => { + it("throws not found if item does not belong to user", async () => { + mockSubFindFirst.mockResolvedValue(mockSub); + mockSelectFromWhereLimit.mockResolvedValue([]); + const { removeWatchlistItem } = await import("./darkwatch.service"); + await expect(removeWatchlistItem("user-1", "nonexistent")).rejects.toThrow(TRPCError); + }); +}); + +describe("getExposures", () => { + it("returns paginated exposures", async () => { + mockSubFindFirst.mockResolvedValue(mockSub); + mockCountSelectFromWhere.mockResolvedValue([{ count: 0 }]); + mockSelectFromWhereOrderByLimitOffset.mockResolvedValue([]); + const { getExposures } = await import("./darkwatch.service"); + const result = await getExposures("user-1", { page: 1, limit: 10 }); + expect(result.items).toEqual([]); + expect(result.total).toBe(0); + expect(result.page).toBe(1); + }); +}); diff --git a/web/src/server/services/darkwatch.service.ts b/web/src/server/services/darkwatch.service.ts new file mode 100644 index 0000000..1c47033 --- /dev/null +++ b/web/src/server/services/darkwatch.service.ts @@ -0,0 +1,363 @@ +import { createHash } from "node:crypto"; +import { TRPCError } from "@trpc/server"; +import { eq, and, desc, count, gte, lte, inArray, sql } from "drizzle-orm"; +import { db } from "~/server/db"; +import { watchlistItems, exposures, subscriptions, securityReports } from "~/server/db/schema"; +import { scanHIBP, scanSecurityTrails, scanCensys, scanShodan, scanForums } from "./darkwatch/scan.engine"; +import { processExposure } from "./darkwatch/alert.pipeline"; +import type { ScanResult } from "./darkwatch/scan.engine"; + +interface ScanState { + status: "idle" | "running" | "completed" | "failed"; + startedAt: Date | null; + completedAt: Date | null; + totalSources: number; + completedSources: number; + error: string | null; +} + +const scanStates = new Map(); + +function hashValue(value: string): string { + return createHash("sha256").update(value.toLowerCase().trim()).digest("hex"); +} + +async function getSubscription(userId: string) { + const [sub] = await db + .select() + .from(subscriptions) + .where(and(eq(subscriptions.userId, userId), eq(subscriptions.status, "active"))) + .limit(1); + if (!sub) { + throw new TRPCError({ code: "NOT_FOUND", message: "No active subscription found" }); + } + return sub; +} + +export async function getWatchlistItems(userId: string) { + const sub = await getSubscription(userId); + const items = await db + .select() + .from(watchlistItems) + .where(and(eq(watchlistItems.subscriptionId, sub.id), eq(watchlistItems.isActive, true))) + .orderBy(desc(watchlistItems.createdAt)); + return items; +} + +export async function addWatchlistItem(userId: string, type: string, value: string) { + const sub = await getSubscription(userId); + const hash = hashValue(value); + + const [existing] = await db + .select() + .from(watchlistItems) + .where( + and( + eq(watchlistItems.subscriptionId, sub.id), + eq(watchlistItems.type, type as "email" | "phoneNumber" | "ssn" | "address" | "domain"), + eq(watchlistItems.hash, hash), + ), + ) + .limit(1); + + if (existing) { + if (existing.isActive) { + throw new TRPCError({ code: "CONFLICT", message: "Watchlist item already exists" }); + } + const [reactivated] = await db + .update(watchlistItems) + .set({ isActive: true, updatedAt: new Date() }) + .where(eq(watchlistItems.id, existing.id)) + .returning(); + return reactivated; + } + + const [inserted] = await db + .insert(watchlistItems) + .values({ + subscriptionId: sub.id, + type: type as "email" | "phoneNumber" | "ssn" | "address" | "domain", + value, + hash, + }) + .returning(); + return inserted; +} + +export async function removeWatchlistItem(userId: string, itemId: string) { + const sub = await getSubscription(userId); + const [item] = await db + .select() + .from(watchlistItems) + .where(and(eq(watchlistItems.id, itemId), eq(watchlistItems.subscriptionId, sub.id))) + .limit(1); + + if (!item) { + throw new TRPCError({ code: "NOT_FOUND", message: "Watchlist item not found" }); + } + + const [deleted] = await db + .update(watchlistItems) + .set({ isActive: false, updatedAt: new Date() }) + .where(eq(watchlistItems.id, itemId)) + .returning(); + return deleted; +} + +export async function getExposures( + userId: string, + filters?: { + severity?: string; + source?: string; + page?: number; + limit?: number; + }, +) { + const sub = await getSubscription(userId); + const page = filters?.page ?? 1; + const limit = filters?.limit ?? 20; + const offset = (page - 1) * limit; + + const conditions = [eq(exposures.subscriptionId, sub.id)]; + if (filters?.severity) { + conditions.push(eq(exposures.severity, filters.severity as "info" | "warning" | "critical")); + } + if (filters?.source) { + conditions.push(eq(exposures.source, filters.source as "hibp" | "securityTrails" | "censys" | "darkWebForum" | "shodan" | "honeypot")); + } + + const [totalResult] = await db + .select({ count: count() }) + .from(exposures) + .where(and(...conditions)); + + const items = await db + .select() + .from(exposures) + .where(and(...conditions)) + .orderBy(desc(exposures.detectedAt)) + .limit(limit) + .offset(offset); + + return { + items, + total: totalResult.count, + page, + limit, + totalPages: Math.ceil(totalResult.count / limit), + }; +} + +export async function getExposureDetails(userId: string, exposureId: string) { + const sub = await getSubscription(userId); + const [exposure] = await db + .select() + .from(exposures) + .where(and(eq(exposures.id, exposureId), eq(exposures.subscriptionId, sub.id))) + .limit(1); + + if (!exposure) { + throw new TRPCError({ code: "NOT_FOUND", message: "Exposure not found" }); + } + + if (exposure.watchlistItemId) { + const [item] = await db + .select() + .from(watchlistItems) + .where(eq(watchlistItems.id, exposure.watchlistItemId)) + .limit(1); + return { ...exposure, watchlistItem: item ?? null }; + } + + return { ...exposure, watchlistItem: null }; +} + +export async function checkTierLimits(userId: string): Promise<{ allowed: boolean; reason?: string }> { + const sub = await getSubscription(userId); + const tier = sub.tier; + + if (tier === "premium") { + return { allowed: true }; + } + + const maxScans: Record = { + basic: 1, + plus: 4, + }; + + const maxScanCount = maxScans[tier] ?? 1; + const periodStart = tier === "plus" + ? new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) + : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); + + const [result] = await db + .select({ count: count() }) + .from(exposures) + .where( + and( + eq(exposures.subscriptionId, sub.id), + gte(exposures.detectedAt, periodStart), + ), + ); + + if (result.count >= maxScanCount) { + const periodLabel = tier === "plus" ? "week" : "month"; + return { + allowed: false, + reason: `Scan limit reached: ${maxScanCount} per ${periodLabel} for ${tier} tier`, + }; + } + + return { allowed: true }; +} + +export async function runScan(userId: string): Promise<{ scanId: string }> { + const sub = await getSubscription(userId); + + const tierCheck = await checkTierLimits(userId); + if (!tierCheck.allowed) { + throw new TRPCError({ code: "TOO_MANY_REQUESTS", message: tierCheck.reason }); + } + + if (scanStates.get(userId)?.status === "running") { + throw new TRPCError({ code: "TOO_MANY_REQUESTS", message: "Scan already in progress" }); + } + + const scanId = crypto.randomUUID(); + scanStates.set(userId, { + status: "running", + startedAt: new Date(), + completedAt: null, + totalSources: 4, + completedSources: 0, + error: null, + }); + + const items = await db + .select() + .from(watchlistItems) + .where(and(eq(watchlistItems.subscriptionId, sub.id), eq(watchlistItems.isActive, true))); + + processScan(userId, sub.id, items).catch((err) => { + console.error("[darkwatch] Scan failed:", err); + const state = scanStates.get(userId); + if (state) { + state.status = "failed"; + state.error = err instanceof Error ? err.message : "Unknown error"; + } + }); + + return { scanId }; +} + +async function processScan( + userId: string, + subscriptionId: string, + items: Array<{ id: string; type: string; value: string }>, +): Promise { + const allResults: ScanResult[] = []; + + for (const item of items) { + const sourcePromises: Promise[] = []; + + switch (item.type) { + case "email": + sourcePromises.push(scanHIBP(item.value)); + break; + case "domain": + sourcePromises.push(scanSecurityTrails(item.value)); + sourcePromises.push(scanCensys(item.value)); + sourcePromises.push(scanShodan(item.value)); + break; + case "phoneNumber": + sourcePromises.push(scanShodan(item.value)); + sourcePromises.push(scanCensys(item.value)); + break; + default: + sourcePromises.push(scanShodan(item.value)); + break; + } + sourcePromises.push(scanForums(item.value)); + + const results = await Promise.allSettled(sourcePromises); + for (const r of results) { + if (r.status === "fulfilled") { + allResults.push(...r.value.map((sr) => ({ ...sr, watchlistItemId: item.id }))); + } + } + + const state = scanStates.get(userId); + if (state) { + state.completedSources++; + } + } + + for (const result of allResults) { + try { + await processExposure({ + subscriptionId, + watchlistItemId: (result as ScanResult & { watchlistItemId: string }).watchlistItemId, + source: result.source, + dataType: result.dataType, + identifier: result.identifier, + identifierHash: result.identifierHash, + severity: result.severity, + metadata: result.metadata, + detectedAt: result.detectedAt, + }); + } catch (err) { + console.error("[darkwatch] Failed to process exposure:", err); + } + } + + const state = scanStates.get(userId); + if (state) { + state.status = "completed"; + state.completedAt = new Date(); + } +} + +export async function getScanStatus(userId: string) { + const state = scanStates.get(userId); + if (!state) { + return { status: "idle" as const, startedAt: null, completedAt: null, progress: 0 }; + } + return { + status: state.status, + startedAt: state.startedAt, + completedAt: state.completedAt, + progress: state.totalSources > 0 ? state.completedSources / state.totalSources : 0, + error: state.error, + }; +} + +export async function getReports( + userId: string, + filters?: { page?: number; limit?: number }, +) { + const sub = await getSubscription(userId); + const page = filters?.page ?? 1; + const limit = filters?.limit ?? 20; + const offset = (page - 1) * limit; + + const [totalResult] = await db + .select({ count: count() }) + .from(securityReports) + .where(eq(securityReports.subscriptionId, sub.id)); + + const items = await db + .select() + .from(securityReports) + .where(eq(securityReports.subscriptionId, sub.id)) + .orderBy(desc(securityReports.createdAt)) + .limit(limit) + .offset(offset); + + return { + items, + total: totalResult.count, + page, + limit, + totalPages: Math.ceil(totalResult.count / limit), + }; +} diff --git a/web/src/server/services/darkwatch/alert.pipeline.ts b/web/src/server/services/darkwatch/alert.pipeline.ts new file mode 100644 index 0000000..766e142 --- /dev/null +++ b/web/src/server/services/darkwatch/alert.pipeline.ts @@ -0,0 +1,118 @@ +import { eq, and } from "drizzle-orm"; +import { db } from "~/server/db"; +import { exposures, alerts, subscriptions } from "~/server/db/schema"; + +export function severityScore(exposure: { + source: string; + dataType: string; +}): "info" | "warning" | "critical" { + const criticalSources = new Set(["hibp"]); + const warningSources = new Set(["shodan", "censys", "darkWebForum"]); + const criticalTypes = new Set(["ssn"]); + const warningTypes = new Set(["email", "phoneNumber"]); + + if (criticalSources.has(exposure.source) || criticalTypes.has(exposure.dataType)) { + return "critical"; + } + if (warningSources.has(exposure.source) || warningTypes.has(exposure.dataType)) { + return "warning"; + } + return "info"; +} + +export async function processExposure(newExposure: { + subscriptionId: string; + watchlistItemId?: string | null; + source: string; + dataType: string; + identifier: string; + identifierHash: string; + severity: string; + metadata?: Record | null; + detectedAt: Date; +}): Promise<{ exposureId: string; alertCreated: boolean }> { + const severity = newExposure.severity as "info" | "warning" | "critical"; + + const [existing] = await db + .select() + .from(exposures) + .where( + and( + eq(exposures.identifierHash, newExposure.identifierHash), + eq(exposures.source, newExposure.source as "hibp" | "securityTrails" | "censys" | "darkWebForum" | "shodan" | "honeypot"), + ), + ) + .limit(1); + + if (existing) { + const currentSeverityIdx = ["info", "warning", "critical"].indexOf(existing.severity); + const newSeverityIdx = ["info", "warning", "critical"].indexOf(severity); + if (newSeverityIdx <= currentSeverityIdx) { + return { exposureId: existing.id, alertCreated: false }; + } + const [updated] = await db + .update(exposures) + .set({ + severity, + metadata: newExposure.metadata ?? existing.metadata, + detectedAt: newExposure.detectedAt, + updatedAt: new Date(), + }) + .where(eq(exposures.id, existing.id)) + .returning(); + await createAlertForExposure(updated, severity); + return { exposureId: updated.id, alertCreated: true }; + } + + const [inserted] = await db + .insert(exposures) + .values({ + subscriptionId: newExposure.subscriptionId, + watchlistItemId: newExposure.watchlistItemId ?? null, + source: newExposure.source as "hibp" | "securityTrails" | "censys" | "darkWebForum" | "shodan" | "honeypot", + dataType: newExposure.dataType as "email" | "phoneNumber" | "ssn" | "address" | "domain", + identifier: newExposure.identifier, + identifierHash: newExposure.identifierHash, + severity, + metadata: newExposure.metadata ?? null, + isFirstTime: true, + detectedAt: newExposure.detectedAt, + }) + .returning(); + + await createAlertForExposure(inserted, severity); + return { exposureId: inserted.id, alertCreated: true }; +} + +async function createAlertForExposure( + exposure: { id: string; subscriptionId: string; severity: string; dataType: string; source: string; identifier: string }, + severity: "info" | "warning" | "critical", +): Promise { + const alertSeverityMap: Record = { + info: "info", + warning: "warning", + critical: "critical", + }; + + const title = `${severity === "critical" ? "Critical" : severity === "warning" ? "Warning" : "Info"} exposure detected`; + const message = `${exposure.dataType} exposed on ${exposure.source}: ${exposure.identifier}`; + + const [sub] = await db + .select() + .from(subscriptions) + .where(eq(subscriptions.id, exposure.subscriptionId)) + .limit(1); + + if (!sub) return; + + await db.insert(alerts).values({ + subscriptionId: exposure.subscriptionId, + userId: sub.userId, + exposureId: exposure.id, + type: "exposure_detected", + title, + message, + severity: alertSeverityMap[severity] ?? "info", + channel: ["email", "push"], + }); +} diff --git a/web/src/server/services/darkwatch/scan.engine.test.ts b/web/src/server/services/darkwatch/scan.engine.test.ts new file mode 100644 index 0000000..76d031c --- /dev/null +++ b/web/src/server/services/darkwatch/scan.engine.test.ts @@ -0,0 +1,24 @@ +import { describe, it, expect } from "vitest"; +import { severityScore } from "./alert.pipeline"; + +describe("severityScore", () => { + it("returns critical for HIBP source", () => { + expect(severityScore({ source: "hibp", dataType: "email" })).toBe("critical"); + }); + + it("returns critical for ssn data type", () => { + expect(severityScore({ source: "darkWebForum", dataType: "ssn" })).toBe("critical"); + }); + + it("returns warning for shodan source", () => { + expect(severityScore({ source: "shodan", dataType: "domain" })).toBe("warning"); + }); + + it("returns warning for email data type", () => { + expect(severityScore({ source: "securityTrails", dataType: "email" })).toBe("warning"); + }); + + it("returns info for low-risk combinations", () => { + expect(severityScore({ source: "securityTrails", dataType: "domain" })).toBe("info"); + }); +}); diff --git a/web/src/server/services/darkwatch/scan.engine.ts b/web/src/server/services/darkwatch/scan.engine.ts new file mode 100644 index 0000000..fb936b9 --- /dev/null +++ b/web/src/server/services/darkwatch/scan.engine.ts @@ -0,0 +1,188 @@ +import { createHash } from "node:crypto"; + +interface ScanResult { + source: "hibp" | "securityTrails" | "censys" | "shodan" | "darkWebForum"; + dataType: "email" | "phoneNumber" | "ssn" | "address" | "domain"; + identifier: string; + identifierHash: string; + metadata: Record; + detectedAt: Date; + severity: "info" | "warning" | "critical"; +} + +interface CircuitState { + failures: number; + lastFailure: number; + isOpen: boolean; +} + +const circuits = new Map(); +const THRESHOLD = 5; +const RESET_MS = 60_000; + +function isCircuitOpen(name: string): boolean { + const state = circuits.get(name); + if (!state) return false; + if (!state.isOpen) return false; + if (Date.now() - state.lastFailure > RESET_MS) { + circuits.delete(name); + return false; + } + return true; +} + +function recordFailure(name: string): void { + const state = circuits.get(name) ?? { failures: 0, lastFailure: 0, isOpen: false }; + state.failures++; + state.lastFailure = Date.now(); + if (state.failures >= THRESHOLD) { + state.isOpen = true; + } + circuits.set(name, state); +} + +function recordSuccess(name: string): void { + circuits.delete(name); +} + +async function fetchWithCircuit(name: string, url: string, headers: Record): Promise { + if (isCircuitOpen(name)) { + console.warn(`[darkwatch] Circuit open for ${name}, skipping`); + return null; + } + try { + const res = await fetch(url, { headers, signal: AbortSignal.timeout(10_000) }); + if (!res.ok) { + recordFailure(name); + console.warn(`[darkwatch] ${name} returned ${res.status}`); + return null; + } + recordSuccess(name); + return res; + } catch (err) { + recordFailure(name); + console.error(`[darkwatch] ${name} error:`, err); + return null; + } +} + +function hashValue(value: string): string { + return createHash("sha256").update(value.toLowerCase().trim()).digest("hex"); +} + +export async function scanHIBP(email: string): Promise { + const apiKey = process.env.HIBP_API_KEY; + if (!apiKey) { + console.warn("[darkwatch] HIBP_API_KEY not set, skipping HIBP scan"); + return []; + } + const res = await fetchWithCircuit( + "hibp", + `https://haveibeenpwned.com/api/v3/breachedaccount/${encodeURIComponent(email)}?truncateResponse=false`, + { "hibp-api-key": apiKey, "user-agent": "ShieldAI-DarkWatch" }, + ); + if (!res) return []; + const breaches = await res.json() as Array<{ Name: string; BreachDate: string; DataClasses: string[]; Description: string }>; + return breaches.map((b) => ({ + source: "hibp" as const, + dataType: "email" as const, + identifier: email, + identifierHash: hashValue(email), + metadata: { breachName: b.Name, breachDate: b.BreachDate, dataClasses: b.DataClasses, description: b.Description }, + detectedAt: new Date(b.BreachDate), + severity: "critical" as const, + })); +} + +export async function scanSecurityTrails(identifier: string): Promise { + const apiKey = process.env.SECURITYTRAILS_API_KEY; + if (!apiKey) { + console.warn("[darkwatch] SECURITYTRAILS_API_KEY not set, skipping"); + return []; + } + const domain = identifier.includes("@") ? identifier.split("@")[1] : identifier; + const res = await fetchWithCircuit( + "securitytrails", + `https://api.securitytrails.com/v1/domain/${encodeURIComponent(domain)}/subdomains`, + { APIKEY: apiKey }, + ); + if (!res) return []; + const data = await res.json() as { subdomains: string[] }; + return (data.subdomains ?? []).slice(0, 20).map((sub) => ({ + source: "securityTrails" as const, + dataType: "domain" as const, + identifier: `${sub}.${domain}`, + identifierHash: hashValue(`${sub}.${domain}`), + metadata: { subdomain: sub, domain }, + detectedAt: new Date(), + severity: "info" as const, + })); +} + +export async function scanCensys(query: string): Promise { + const apiKey = process.env.CENSYS_API_KEY; + if (!apiKey) { + console.warn("[darkwatch] CENSYS_API_KEY not set, skipping"); + return []; + } + const res = await fetchWithCircuit( + "censys", + `https://search.censys.io/api/v2/hosts/search?q=${encodeURIComponent(query)}&per_page=10`, + { Authorization: `Bearer ${apiKey}` }, + ); + if (!res) return []; + const data = await res.json() as { result?: { hits?: Array<{ ip: string; services?: Array<{ service_name: string; port: number }> }> } }; + const hits = data.result?.hits ?? []; + return hits.map((h) => ({ + source: "censys" as const, + dataType: "domain" as const, + identifier: h.ip, + identifierHash: hashValue(h.ip), + metadata: { ip: h.ip, services: h.services }, + detectedAt: new Date(), + severity: "warning" as const, + })); +} + +export async function scanShodan(query: string): Promise { + const apiKey = process.env.SHODAN_API_KEY; + if (!apiKey) { + console.warn("[darkwatch] SHODAN_API_KEY not set, skipping"); + return []; + } + const res = await fetchWithCircuit( + "shodan", + `https://api.shodan.io/shodan/host/search?key=${apiKey}&query=${encodeURIComponent(query)}&limit=10`, + {}, + ); + if (!res) return []; + const data = await res.json() as { matches?: Array<{ ip_str: string; port: number; org?: string; hostnames?: string[] }> }; + const matches = data.matches ?? []; + return matches.map((m) => ({ + source: "shodan" as const, + dataType: "domain" as const, + identifier: m.ip_str, + identifierHash: hashValue(m.ip_str), + metadata: { ip: m.ip_str, port: m.port, org: m.org, hostnames: m.hostnames }, + detectedAt: new Date(), + severity: "warning" as const, + })); +} + +export async function scanForums(identifier: string): Promise { + const forumEnabled = process.env.DARKWEB_FORUM_ENABLED; + if (!forumEnabled || forumEnabled !== "true") { + return []; + } + return [{ + source: "darkWebForum" as const, + dataType: (identifier.includes("@") ? "email" : "domain") as "email" | "domain", + identifier, + identifierHash: hashValue(identifier), + metadata: { note: "Forum scraping placeholder", identifier }, + detectedAt: new Date(), + severity: "warning" as const, + }]; +} + +export type { ScanResult };