FRE-4521 Implement Redis integration for rate limiting and deduplication

- Add ioredis dependency for Redis connection pooling
- Create RedisService singleton with connection management
- Add Redis config (url, dedupWindowSeconds) to notification.config.ts
- Implement NotificationService.checkRateLimit using Redis INCR+EXPIRE
- Implement NotificationService.deduplicateNotification using Redis SET/NX
- Add configurable rate limit windows and thresholds via env vars
- Add 29 unit tests covering Redis operations, rate limiting, and dedup
- All tests pass, TypeScript compiles cleanly for new files
This commit is contained in:
Senior Engineer
2026-05-01 16:13:17 -04:00
committed by Michael Freno
parent 7aed2d8b2b
commit 574bcf2264
9 changed files with 598 additions and 3 deletions

View File

@@ -14,7 +14,8 @@
"firebase-admin": "^12.0.0",
"twilio": "^4.0.0",
"zod": "^3.22.0",
"express": "^4.18.0"
"express": "^4.18.0",
"ioredis": "^5.4.0"
},
"devDependencies": {
"@types/express": "^4.17.0",

View File

@@ -25,6 +25,11 @@ export const NotificationConfigSchema = z.object({
emailPerMinute: z.number().default(60),
smsPerMinute: z.number().default(30),
pushPerMinute: z.number().default(100),
windowSeconds: z.number().default(60),
}),
redis: z.object({
url: z.string().default('redis://localhost:6379'),
dedupWindowSeconds: z.number().default(300),
}),
});
@@ -55,5 +60,10 @@ export const loadNotificationConfig = (): NotificationConfig => ({
emailPerMinute: parseInt(process.env.EMAIL_RATE_LIMIT || '60', 10),
smsPerMinute: parseInt(process.env.SMS_RATE_LIMIT || '30', 10),
pushPerMinute: parseInt(process.env.PUSH_RATE_LIMIT || '100', 10),
windowSeconds: parseInt(process.env.RATE_LIMIT_WINDOW_SECONDS || '60', 10),
},
redis: {
url: process.env.REDIS_URL || 'redis://localhost:6379',
dedupWindowSeconds: parseInt(process.env.DEDUP_WINDOW_SECONDS || '300', 10),
},
});

View File

@@ -1,7 +1,8 @@
export { EmailService } from './services/email.service';
export { SMSService } from './services/sms.service';
export { PushService } from './services/push.service';
export { NotificationService } from './services/notification.service';
export { NotificationService, RateLimitResult } from './services/notification.service';
export { RedisService } from './services/redis.service';
export { TemplateService } from './services/template.service';
export { loadNotificationConfig, NotificationConfigSchema } from './config/notification.config';
export { notificationRoutes } from './routes/notification.routes';

View File

@@ -0,0 +1,321 @@
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { NotificationService } from '../services/notification.service';
import { RedisService } from '../services/redis.service';
vi.mock('../services/email.service', () => ({
EmailService: {
getInstance: vi.fn(() => ({
send: vi.fn(async () => ({
notificationId: 'email-123',
channel: 'email',
status: 'sent',
externalId: 'ext-123',
})),
getRateLimitStatus: vi.fn(() => ({ remaining: 50, limit: 60 })),
})),
},
}));
vi.mock('../services/sms.service', () => ({
SMSService: {
getInstance: vi.fn(() => ({
send: vi.fn(async () => ({
notificationId: 'sms-123',
channel: 'sms',
status: 'sent',
externalId: 'ext-456',
})),
getRateLimitStatus: vi.fn(() => ({ remaining: 25, limit: 30 })),
})),
},
}));
vi.mock('../services/push.service', () => ({
PushService: {
getInstance: vi.fn(() => ({
send: vi.fn(async () => ({
notificationId: 'push-123',
channel: 'push',
status: 'sent',
externalId: 'ext-789',
})),
getRateLimitStatus: vi.fn(() => ({ remaining: 90, limit: 100 })),
})),
},
}));
vi.mock('../config/notification.config', () => ({
loadNotificationConfig: vi.fn(() => ({
resend: { apiKey: 'test', baseUrl: 'https://api.resend.com' },
fcm: { privateKey: 'test', projectId: 'test', clientEmail: 'test@test.com' },
apns: { key: 'test', keyId: 'test', teamId: 'test', bundleId: 'test' },
twilio: { accountSid: 'test', authToken: 'test', messagingServiceSid: 'test' },
rateLimits: {
emailPerMinute: 60,
smsPerMinute: 30,
pushPerMinute: 100,
windowSeconds: 60,
},
redis: {
url: 'redis://localhost:6379',
dedupWindowSeconds: 300,
},
})),
}));
describe('NotificationService - Rate Limiting', () => {
let notificationService: NotificationService;
let redis: RedisService;
beforeEach(async () => {
vi.clearAllMocks();
process.env.EMAIL_RATE_LIMIT = '60';
process.env.SMS_RATE_LIMIT = '30';
process.env.PUSH_RATE_LIMIT = '100';
process.env.RATE_LIMIT_WINDOW_SECONDS = '60';
process.env.DEDUP_WINDOW_SECONDS = '300';
redis = RedisService.getInstance({ url: 'redis://localhost:6379' });
await redis.getClient().flushdb();
notificationService = NotificationService.getInstance();
});
afterEach(async () => {
await redis.getClient().flushdb();
});
describe('checkRateLimit', () => {
it('should allow request within limit', async () => {
const result = await notificationService.checkRateLimit('user-1', 'email');
expect(result.allowed).toBe(true);
expect(result.currentCount).toBe(1);
expect(result.limit).toBe(60);
expect(result.remaining).toBe(59);
expect(result.resetInSeconds).toBeGreaterThan(0);
});
it('should allow multiple requests within limit', async () => {
for (let i = 0; i < 5; i++) {
const result = await notificationService.checkRateLimit('user-2', 'email');
expect(result.allowed).toBe(true);
expect(result.currentCount).toBe(i + 1);
expect(result.remaining).toBe(60 - (i + 1));
}
});
it('should deny request when limit is exceeded', async () => {
for (let i = 0; i < 60; i++) {
await notificationService.checkRateLimit('user-3', 'email');
}
const result = await notificationService.checkRateLimit('user-3', 'email');
expect(result.allowed).toBe(false);
expect(result.currentCount).toBe(61);
expect(result.remaining).toBe(0);
});
it('should use custom limit when provided', async () => {
const result = await notificationService.checkRateLimit('user-4', 'email', 5);
expect(result.limit).toBe(5);
expect(result.currentCount).toBe(1);
expect(result.remaining).toBe(4);
});
it('should use custom window when provided', async () => {
const result = await notificationService.checkRateLimit('user-5', 'email', 10, 120);
expect(result.resetInSeconds).toBeLessThanOrEqual(120);
expect(result.resetInSeconds).toBeGreaterThan(0);
});
it('should use SMS default limit', async () => {
const result = await notificationService.checkRateLimit('user-6', 'sms');
expect(result.limit).toBe(30);
expect(result.allowed).toBe(true);
});
it('should use push default limit', async () => {
const result = await notificationService.checkRateLimit('user-7', 'push');
expect(result.limit).toBe(100);
expect(result.allowed).toBe(true);
});
it('should track different identifiers independently', async () => {
await notificationService.checkRateLimit('user-a', 'email');
await notificationService.checkRateLimit('user-a', 'email');
const resultA = await notificationService.checkRateLimit('user-a', 'email');
expect(resultA.currentCount).toBe(3);
const resultB = await notificationService.checkRateLimit('user-b', 'email');
expect(resultB.currentCount).toBe(1);
});
it('should track different channels independently', async () => {
await notificationService.checkRateLimit('user-8', 'email');
await notificationService.checkRateLimit('user-8', 'email');
const emailResult = await notificationService.checkRateLimit('user-8', 'email');
expect(emailResult.currentCount).toBe(3);
const smsResult = await notificationService.checkRateLimit('user-8', 'sms');
expect(smsResult.currentCount).toBe(1);
});
});
});
describe('NotificationService - Deduplication', () => {
let notificationService: NotificationService;
let redis: RedisService;
beforeEach(async () => {
vi.clearAllMocks();
process.env.EMAIL_RATE_LIMIT = '60';
process.env.SMS_RATE_LIMIT = '30';
process.env.PUSH_RATE_LIMIT = '100';
process.env.RATE_LIMIT_WINDOW_SECONDS = '60';
process.env.DEDUP_WINDOW_SECONDS = '300';
redis = RedisService.getInstance({ url: 'redis://localhost:6379' });
await redis.getClient().flushdb();
notificationService = NotificationService.getInstance();
});
afterEach(async () => {
await redis.getClient().flushdb();
});
describe('deduplicateNotification', () => {
it('should return true for first notification', async () => {
const result = await notificationService.deduplicateNotification({
userId: 'user-1',
templateId: 'welcome-email',
key: 'initial',
});
expect(result).toBe(true);
});
it('should return false for duplicate notification', async () => {
await notificationService.deduplicateNotification({
userId: 'user-2',
templateId: 'welcome-email',
key: 'initial',
});
const result = await notificationService.deduplicateNotification({
userId: 'user-2',
templateId: 'welcome-email',
key: 'initial',
});
expect(result).toBe(false);
});
it('should allow different keys for same user', async () => {
await notificationService.deduplicateNotification({
userId: 'user-3',
templateId: 'welcome-email',
key: 'initial',
});
const result = await notificationService.deduplicateNotification({
userId: 'user-3',
templateId: 'welcome-email',
key: 'followup',
});
expect(result).toBe(true);
});
it('should allow different users with same template', async () => {
await notificationService.deduplicateNotification({
userId: 'user-4',
templateId: 'welcome-email',
key: 'initial',
});
const result = await notificationService.deduplicateNotification({
userId: 'user-5',
templateId: 'welcome-email',
key: 'initial',
});
expect(result).toBe(true);
});
it('should use custom window when provided', async () => {
const result = await notificationService.deduplicateNotification(
{
userId: 'user-6',
templateId: 'test',
key: 'custom-window',
},
60
);
expect(result).toBe(true);
const ttl = await redis.getTTL('dedup:user-6:test:custom-window');
expect(ttl).toBeLessThanOrEqual(60);
expect(ttl).toBeGreaterThan(0);
});
it('should use windowSeconds from dedupKey', async () => {
const result = await notificationService.deduplicateNotification({
userId: 'user-7',
templateId: 'test',
key: 'key-window',
windowSeconds: 120,
});
expect(result).toBe(true);
const ttl = await redis.getTTL('dedup:user-7:test:key-window');
expect(ttl).toBeLessThanOrEqual(120);
expect(ttl).toBeGreaterThan(0);
});
it('should use default dedup window from config', async () => {
const result = await notificationService.deduplicateNotification({
userId: 'user-8',
templateId: 'test',
key: 'default-window',
});
expect(result).toBe(true);
const ttl = await redis.getTTL('dedup:user-8:test:default-window');
expect(ttl).toBeLessThanOrEqual(300);
expect(ttl).toBeGreaterThan(0);
});
});
});
describe('NotificationService - Rate Limit Config', () => {
let notificationService: NotificationService;
beforeEach(() => {
vi.clearAllMocks();
process.env.EMAIL_RATE_LIMIT = '60';
process.env.SMS_RATE_LIMIT = '30';
process.env.PUSH_RATE_LIMIT = '100';
process.env.RATE_LIMIT_WINDOW_SECONDS = '60';
process.env.DEDUP_WINDOW_SECONDS = '300';
notificationService = NotificationService.getInstance();
});
describe('getRateLimitConfig', () => {
it('should return configured rate limits', () => {
const config = notificationService.getRateLimitConfig();
expect(config.emailPerMinute).toBe(60);
expect(config.smsPerMinute).toBe(30);
expect(config.pushPerMinute).toBe(100);
expect(config.windowSeconds).toBe(60);
});
});
});

View File

@@ -2,6 +2,8 @@ import { EmailService } from './email.service';
import { SMSService } from './sms.service';
import { PushService } from './push.service';
import { TemplateService } from './template.service';
import { RedisService } from './redis.service';
import { loadNotificationConfig } from '../config/notification.config';
import type {
Notification,
NotificationChannel,
@@ -11,11 +13,21 @@ import type {
} from '../types/notification.types';
import type { TemplateResolutionOptions } from '../types/template.types';
export interface RateLimitResult {
allowed: boolean;
currentCount: number;
limit: number;
remaining: number;
resetInSeconds: number;
}
export class NotificationService {
private static instance: NotificationService;
private emailService: EmailService;
private smsService: SMSService;
private pushService: PushService;
private redisService: RedisService;
private config: ReturnType<typeof loadNotificationConfig>;
private pendingDeduplication = new Map<string, Set<string>>();
private preferenceCache = new Map<string, NotificationPreference>();
@@ -23,6 +35,8 @@ export class NotificationService {
this.emailService = EmailService.getInstance();
this.smsService = SMSService.getInstance();
this.pushService = PushService.getInstance();
this.config = loadNotificationConfig();
this.redisService = RedisService.getInstance({ url: this.config.redis.url });
}
static getInstance(): NotificationService {
@@ -204,7 +218,65 @@ export class NotificationService {
}
}
async checkRateLimit(
identifier: string,
channel: NotificationChannel,
customLimit?: number,
customWindowSeconds?: number
): Promise<RateLimitResult> {
const limit = customLimit || this.getLimitForChannel(channel);
const windowSeconds = customWindowSeconds || this.config.rateLimits.windowSeconds;
const key = `rl:${channel}:${identifier}`;
const currentCount = await this.redisService.increment(key, windowSeconds);
const ttl = await this.redisService.getTTL(key);
return {
allowed: currentCount <= limit,
currentCount,
limit,
remaining: Math.max(0, limit - currentCount),
resetInSeconds: Math.max(1, ttl),
};
}
async deduplicateNotification(
dedupKey: DeduplicationKey,
customWindowSeconds?: number
): Promise<boolean> {
const dedupId = `dedup:${dedupKey.userId}:${dedupKey.templateId}:${dedupKey.key}`;
const windowSeconds = customWindowSeconds || dedupKey.windowSeconds || this.config.redis.dedupWindowSeconds;
const wasSet = await this.redisService.setIfNotExists(dedupId, '1', windowSeconds);
return wasSet;
}
getRateLimitConfig(): {
emailPerMinute: number;
smsPerMinute: number;
pushPerMinute: number;
windowSeconds: number;
} {
return {
emailPerMinute: this.config.rateLimits.emailPerMinute,
smsPerMinute: this.config.rateLimits.smsPerMinute,
pushPerMinute: this.config.rateLimits.pushPerMinute,
windowSeconds: this.config.rateLimits.windowSeconds,
};
}
getTemplateService(): TemplateService {
return TemplateService.getInstance();
}
private getLimitForChannel(channel: NotificationChannel): number {
switch (channel) {
case 'email':
return this.config.rateLimits.emailPerMinute;
case 'sms':
return this.config.rateLimits.smsPerMinute;
case 'push':
return this.config.rateLimits.pushPerMinute;
}
}
}

View File

@@ -0,0 +1,107 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { RedisService } from '../services/redis.service';
describe('RedisService', () => {
let redis: RedisService;
beforeEach(async () => {
redis = RedisService.getInstance({ url: 'redis://localhost:6379' });
});
afterEach(async () => {
const client = redis.getClient();
await client.flushdb();
});
describe('increment', () => {
it('should increment counter and set expiry', async () => {
const count = await redis.increment('test:key', 60);
expect(count).toBe(1);
const count2 = await redis.increment('test:key', 60);
expect(count2).toBe(2);
});
it('should set TTL on first increment', async () => {
await redis.increment('test:ttl', 60);
const ttl = await redis.getTTL('test:ttl');
expect(ttl).toBeGreaterThan(0);
expect(ttl).toBeLessThanOrEqual(60);
});
});
describe('getCounter', () => {
it('should return 0 for non-existent key', async () => {
const count = await redis.getCounter('test:nonexistent');
expect(count).toBe(0);
});
it('should return current count', async () => {
await redis.increment('test:counter', 60);
await redis.increment('test:counter', 60);
const count = await redis.getCounter('test:counter');
expect(count).toBe(2);
});
});
describe('setIfNotExists', () => {
it('should return true on first set', async () => {
const result = await redis.setIfNotExists('test:unique', 'value', 60);
expect(result).toBe(true);
});
it('should return false on duplicate set', async () => {
await redis.setIfNotExists('test:dup', 'value', 60);
const result = await redis.setIfNotExists('test:dup', 'value', 60);
expect(result).toBe(false);
});
it('should set TTL', async () => {
await redis.setIfNotExists('test:ttl', 'value', 60);
const ttl = await redis.getTTL('test:ttl');
expect(ttl).toBeGreaterThan(0);
expect(ttl).toBeLessThanOrEqual(60);
});
});
describe('setWithExpiry', () => {
it('should set value with expiry', async () => {
const result = await redis.setWithExpiry('test:ex', 'value', 60);
expect(result).toBe('OK');
const value = await redis.get('test:ex');
expect(value).toBe('value');
});
});
describe('get', () => {
it('should return null for non-existent key', async () => {
const value = await redis.get('test:missing');
expect(value).toBeNull();
});
it('should return value for existing key', async () => {
await redis.setWithExpiry('test:get', 'hello', 60);
const value = await redis.get('test:get');
expect(value).toBe('hello');
});
});
describe('delete', () => {
it('should delete a key', async () => {
await redis.setWithExpiry('test:del', 'value', 60);
const deleted = await redis.delete('test:del');
expect(deleted).toBe(1);
const value = await redis.get('test:del');
expect(value).toBeNull();
});
});
describe('isConnected', () => {
it('should return connection status', async () => {
const connected = await redis.isConnected();
expect(connected).toBe(true);
});
});
});

View File

@@ -0,0 +1,73 @@
import { Redis, RedisOptions } from 'ioredis';
export interface RedisServiceConfig {
url?: string;
options?: RedisOptions;
}
export class RedisService {
private static instance: RedisService;
private client: Redis;
private constructor(config?: RedisServiceConfig) {
const url = config?.url || process.env.REDIS_URL || 'redis://localhost:6379';
this.client = new Redis(url, config?.options || {});
}
static getInstance(config?: RedisServiceConfig): RedisService {
if (!RedisService.instance) {
RedisService.instance = new RedisService(config);
}
return RedisService.instance;
}
getClient(): Redis {
return this.client;
}
async isConnected(): Promise<boolean> {
return this.client.status === 'ready';
}
async increment(key: string, expirySeconds?: number): Promise<number> {
const current = await this.client.incr(key);
if (current === 1 && expirySeconds) {
await this.client.expire(key, expirySeconds);
}
return current;
}
async getCounter(key: string): Promise<number> {
const value = await this.client.get(key);
return value ? parseInt(value, 10) : 0;
}
async setWithExpiry(key: string, value: string, seconds: number): Promise<'OK' | 1> {
return this.client.setex(key, seconds, value);
}
async setIfNotExists(key: string, value: string, seconds: number): Promise<boolean> {
const result = await this.client.set(key, value, 'EX', seconds, 'NX');
return result === 'OK';
}
async get(key: string): Promise<string | null> {
return this.client.get(key);
}
async ttl(key: string): Promise<number> {
return this.client.ttl(key);
}
async delete(key: string): Promise<number> {
return this.client.del(key);
}
async getTTL(key: string): Promise<number> {
return this.client.ttl(key);
}
async close(): Promise<void> {
await this.client.quit();
}
}

View File

@@ -86,5 +86,6 @@ export interface DeduplicationKey {
userId: string;
templateId: string;
key: string;
windowMinutes: number;
windowMinutes?: number;
windowSeconds?: number;
}

View File

@@ -0,0 +1,9 @@
import { defineConfig } from 'vitest/config';
export default defineConfig({
test: {
globals: true,
environment: 'node',
include: ['src/**/*.test.ts'],
},
});