This commit is contained in:
2026-06-03 14:05:49 -04:00
parent a07c004f2d
commit 8e953cdd7c
11 changed files with 1281 additions and 1221 deletions

View File

@@ -54,7 +54,9 @@ export async function POST(event: APIEvent) {
const { name, email, password } = body;
if (!email || !password) {
return new Response(
JSON.stringify({ message: "Name, email, and password are required" }),
JSON.stringify({
message: "Name, email, and password are required",
}),
{ status: 400, headers: { "Content-Type": "application/json" } },
);
}
@@ -77,7 +79,9 @@ export async function POST(event: APIEvent) {
const { identityToken, authorizationCode, userIdentifier } = body;
if (!identityToken || !authorizationCode) {
return new Response(
JSON.stringify({ message: "identityToken and authorizationCode are required" }),
JSON.stringify({
message: "identityToken and authorizationCode are required",
}),
{ status: 400, headers: { "Content-Type": "application/json" } },
);
}
@@ -162,10 +166,15 @@ export async function POST(event: APIEvent) {
);
}
} catch (error: any) {
const statusCode = error.code === "UNAUTHORIZED" ? 401
: error.code === "CONFLICT" ? 409
: error.code === "NOT_FOUND" ? 404
: error.code === "FORBIDDEN" ? 403
const statusCode =
error.code === "UNAUTHORIZED"
? 401
: error.code === "CONFLICT"
? 409
: error.code === "NOT_FOUND"
? 404
: error.code === "FORBIDDEN"
? 403
: 500;
return new Response(

View File

@@ -1,7 +1,11 @@
import { wrap } from "@typeschema/valibot";
import { object, string, minLength, email as emailVal } from "valibot";
import { TRPCError } from "@trpc/server";
import { createTRPCRouter, publicProcedure, protectedProcedure } from "../utils";
import {
createTRPCRouter,
publicProcedure,
protectedProcedure,
} from "../utils";
import {
UpdateUserSchema,
InviteMemberSchema,
@@ -130,11 +134,12 @@ export const userRouter = createTRPCRouter({
.mutation(async ({ ctx, input }) => {
const group = await getFamilyGroup(ctx.user.id);
const callerMember = group.members.find(
(m) => m.userId === ctx.user.id,
);
const callerMember = group.members.find((m) => m.userId === ctx.user.id);
if (!callerMember || (callerMember.role !== "owner" && callerMember.role !== "admin")) {
if (
!callerMember ||
(callerMember.role !== "owner" && callerMember.role !== "admin")
) {
throw new TRPCError({
code: "FORBIDDEN",
message: "Only owner or admin can invite members",

View File

@@ -1,6 +1,8 @@
import type { JobPayload, JobType } from "../queue";
export type JobHandler<T extends JobType = JobType> = (payload: JobPayload[T]) => Promise<void>;
export type JobHandler<T extends JobType = JobType> = (
payload: JobPayload[T],
) => Promise<void>;
export type HandlerMap = {
[K in JobType]: JobHandler<K>;

View File

@@ -17,7 +17,11 @@ export type JobPayload = {
"voiceprint.batch": { userId?: string; jobId?: string };
"hometitle.scan": { userId: string; subscriptionId: string };
"removebrokers.process": { subscriptionId?: string; requestId?: string };
"reports.generate": { userId: string; reportScheduleId?: string; reportType: string };
"reports.generate": {
userId: string;
reportScheduleId?: string;
reportType: string;
};
};
export type JobStatus = "pending" | "running" | "completed" | "failed";
@@ -40,7 +44,11 @@ export interface EnqueueOptions {
}
export interface QueueAdapter {
enqueue<T extends JobType>(type: T, payload: JobPayload[T], options?: EnqueueOptions): Promise<Job<T>>;
enqueue<T extends JobType>(
type: T,
payload: JobPayload[T],
options?: EnqueueOptions,
): Promise<Job<T>>;
dequeue(): Promise<Job | null>;
markComplete(jobId: string): Promise<void>;
markFailed(jobId: string, error: string): Promise<void>;
@@ -53,7 +61,11 @@ export class InMemoryQueue implements QueueAdapter {
private jobs = new Map<string, Job>();
private pendingQueue: string[] = [];
async enqueue<T extends JobType>(type: T, payload: JobPayload[T], options?: EnqueueOptions): Promise<Job<T>> {
async enqueue<T extends JobType>(
type: T,
payload: JobPayload[T],
options?: EnqueueOptions,
): Promise<Job<T>> {
const id = randomUUID();
const job: Job<T> = {
id,
@@ -130,9 +142,12 @@ function createRedisAdapter(): QueueAdapter {
const BullMQ = require("bullmq");
const IORedis = require("ioredis");
const connection = new IORedis.default(process.env.REDIS_URL ?? "redis://localhost:6379", {
const connection = new IORedis.default(
process.env.REDIS_URL ?? "redis://localhost:6379",
{
maxRetriesPerRequest: null,
});
},
);
const queue = new BullMQ.Queue("kordant-jobs", { connection });
const bullJobs = new Map<string, any>();
@@ -147,12 +162,18 @@ function createRedisAdapter(): QueueAdapter {
maxAttempts: bullJob.opts?.attempts ?? 3,
error: bullJob.failedReason ?? undefined,
createdAt: bullJob.timestamp ? new Date(bullJob.timestamp) : new Date(),
updatedAt: bullJob.processedOn ? new Date(bullJob.processedOn) : new Date(),
updatedAt: bullJob.processedOn
? new Date(bullJob.processedOn)
: new Date(),
};
}
return {
async enqueue<T extends JobType>(type: T, payload: JobPayload[T], options?: EnqueueOptions) {
async enqueue<T extends JobType>(
type: T,
payload: JobPayload[T],
options?: EnqueueOptions,
) {
const bullJob = await queue.add(type, payload, {
attempts: options?.maxAttempts ?? 3,
delay: options?.delay,
@@ -185,7 +206,9 @@ function createRedisAdapter(): QueueAdapter {
},
async getJobs(status) {
const states = status ? [status] : ["waiting", "active", "completed", "failed"];
const states = status
? [status]
: ["waiting", "active", "completed", "failed"];
const allJobs: Job[] = [];
for (const state of states) {
const jobs = await queue.getJobs(state);

View File

@@ -32,8 +32,6 @@ const envSchema = object({
// Email
RESEND_API_KEY: optional(string()),
// SMS
TWILIO_ACCOUNT_SID: optional(string()),
TWILIO_AUTH_TOKEN: optional(string()),

View File

@@ -62,7 +62,9 @@ describe("alert.publisher", () => {
(db.db.select as ReturnType<typeof vi.fn>).mockReturnValue({
from: vi.fn().mockReturnValue({
where: vi.fn().mockReturnValue({
limit: vi.fn().mockResolvedValue([{ id: "user-1", email: "user@example.com" }]),
limit: vi
.fn()
.mockResolvedValue([{ id: "user-1", email: "user@example.com" }]),
}),
}),
});

View File

@@ -14,7 +14,10 @@ export interface PublishableAlert {
createdAt: Date;
}
export async function publishAlert(userId: string, alert: PublishableAlert): Promise<void> {
export async function publishAlert(
userId: string,
alert: PublishableAlert,
): Promise<void> {
const message = {
type: "alert" as const,
alert: {
@@ -52,7 +55,10 @@ export async function publishAlert(userId: string, alert: PublishableAlert): Pro
}
}
export async function publishToGroup(userIds: string[], alert: PublishableAlert): Promise<void> {
export async function publishToGroup(
userIds: string[],
alert: PublishableAlert,
): Promise<void> {
const promises = userIds.map((userId) => publishAlert(userId, alert));
await Promise.allSettled(promises);
}

View File

@@ -55,7 +55,9 @@ export async function shouldDigest(
/**
* Calculates the next scheduled digest date based on config.
*/
export function calculateNextDigestDate(config: DigestConfig = DEFAULT_DIGEST_CONFIG): Date {
export function calculateNextDigestDate(
config: DigestConfig = DEFAULT_DIGEST_CONFIG,
): Date {
const now = new Date();
const next = new Date(now);
@@ -155,7 +157,9 @@ export async function sendDigestEmail(
await db
.update(digestAlerts)
.set({ sent: true, sentAt: new Date() })
.where(and(eq(digestAlerts.userId, userId), eq(digestAlerts.id, alertIds[0])));
.where(
and(eq(digestAlerts.userId, userId), eq(digestAlerts.id, alertIds[0])),
);
// Update all matching alerts
for (const alertId of alertIds) {
@@ -165,7 +169,9 @@ export async function sendDigestEmail(
.where(eq(digestAlerts.id, alertId));
}
console.log(`[digest] Sent digest to ${user.email} with ${pendingAlerts.length} alerts`);
console.log(
`[digest] Sent digest to ${user.email} with ${pendingAlerts.length} alerts`,
);
return pendingAlerts.length;
} catch (err) {
console.error(`[digest] Failed to send digest for user ${userId}:`, err);
@@ -193,11 +199,7 @@ export async function processDueDigests(): Promise<void> {
scheduledDate: digestAlerts.scheduledDigestDate,
})
.from(digestAlerts)
.where(
and(
eq(digestAlerts.sent, false),
),
);
.where(and(eq(digestAlerts.sent, false)));
// Group by user
const userMap = new Map<string, Date[]>();
@@ -221,9 +223,9 @@ export async function processDueDigests(): Promise<void> {
// ---------------------------------------------------------------------------
function groupBySeverity(
alerts: typeof digestAlerts.$InferInsert[],
): Record<string, typeof digestAlerts.$InferInsert[]> {
const groups: Record<string, typeof digestAlerts.$InferInsert[]> = {
alerts: (typeof digestAlerts.$InferInsert)[],
): Record<string, (typeof digestAlerts.$InferInsert)[]> {
const groups: Record<string, (typeof digestAlerts.$InferInsert)[]> = {
critical: [],
warning: [],
info: [],
@@ -242,7 +244,7 @@ function groupBySeverity(
}
function buildDigestEmailHTML(
groups: Record<string, typeof digestAlerts.$InferInsert[]>,
groups: Record<string, (typeof digestAlerts.$InferInsert)[]>,
total: number,
): string {
const sections = [];
@@ -289,10 +291,13 @@ function buildDigestEmailHTML(
}
function buildDigestPlainText(
groups: Record<string, typeof digestAlerts.$InferInsert[]>,
groups: Record<string, (typeof digestAlerts.$InferInsert)[]>,
total: number,
): string {
const lines = [`Kordant Security Digest — ${total} alert${total > 1 ? "s" : ""}`, ""];
const lines = [
`Kordant Security Digest — ${total} alert${total > 1 ? "s" : ""}`,
"",
];
for (const [key, alerts] of Object.entries(groups)) {
if (!alerts.length) continue;
@@ -321,13 +326,7 @@ function escapeHtml(str: string): string {
export async function cleanupOldDigests(): Promise<void> {
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
await db
.delete(digestAlerts)
.where(
and(
eq(digestAlerts.sent, true),
),
);
await db.delete(digestAlerts).where(and(eq(digestAlerts.sent, true)));
console.log(`[digest] Cleaned up old digest records`);
}

View File

@@ -37,10 +37,7 @@ export async function createUserWithPassword(
return { user, sessionToken: session.sessionToken, accessToken };
}
export async function authenticateUser(
email: string,
password: string,
) {
export async function authenticateUser(email: string, password: string) {
const [user] = await db
.select()
.from(users)
@@ -70,8 +67,6 @@ export async function authenticateUser(
const APPLE_ISSUER = "https://appleid.apple.com";
const APPLE_JWKS_URL = new URL("https://appleid.apple.com/auth/keys");
/**
* Verifies an Apple identity token and authenticates the user.
* If the user does not exist, creates a new account.
@@ -90,14 +85,18 @@ export async function authenticateWithApple(
}
// Verify Apple ID token using Apple's JWKS
let payload: { sub: string; email?: string; is_private_email?: string; };
let payload: { sub: string; email?: string; is_private_email?: string };
try {
const JWKS = createRemoteJWKSet(APPLE_JWKS_URL);
const result = await jwtVerify(identityToken, JWKS, {
issuer: APPLE_ISSUER,
audience: process.env.IOS_BUNDLE_ID ?? "com.frenocorp.kordant",
});
payload = result.payload as unknown as { sub: string; email?: string; is_private_email?: string; };
payload = result.payload as unknown as {
sub: string;
email?: string;
is_private_email?: string;
};
} catch (err) {
throw new TRPCError({
code: "UNAUTHORIZED",
@@ -193,14 +192,30 @@ export async function authenticateWithApple(
// Create session and JWT
const session = await createSession(userId);
const accessToken = await signJWT({ sub: userId }, { expiresIn: "7d" });
const refreshToken = await signJWT({ sub: userId, type: "refresh" }, { expiresIn: "30d" });
const refreshToken = await signJWT(
{ sub: userId, type: "refresh" },
{ expiresIn: "30d" },
);
const [user] = await db.select().from(users).where(eq(users.id, userId)).limit(1);
const [user] = await db
.select()
.from(users)
.where(eq(users.id, userId))
.limit(1);
if (!user) {
throw new TRPCError({ code: "NOT_FOUND", message: "User not found after creation" });
throw new TRPCError({
code: "NOT_FOUND",
message: "User not found after creation",
});
}
return { user, sessionToken: session.sessionToken, accessToken, refreshToken, isNewUser };
return {
user,
sessionToken: session.sessionToken,
accessToken,
refreshToken,
isNewUser,
};
}
/**
@@ -241,7 +256,10 @@ export async function refreshAccessToken(refreshToken: string) {
}
const newAccessToken = await signJWT({ sub: userId }, { expiresIn: "7d" });
const newRefreshToken = await signJWT({ sub: userId, type: "refresh" }, { expiresIn: "30d" });
const newRefreshToken = await signJWT(
{ sub: userId, type: "refresh" },
{ expiresIn: "30d" },
);
return { accessToken: newAccessToken, refreshToken: newRefreshToken };
}
@@ -323,9 +341,7 @@ export async function resetPassword(token: string, newPassword: string) {
*/
export async function revokeUserSessions(userId: string) {
const { sessions } = await import("~/server/db/schema/auth");
await db
.delete(sessions)
.where(eq(sessions.userId, userId));
await db.delete(sessions).where(eq(sessions.userId, userId));
return { success: true };
}