- Created waitlist_signups and waitlist_events tables - Supports email, name, source tracking, and status management - Enables VIP supporter list for Product Hunt launch - Migration 0002_chemical_shocker.sql generated - Fixed brand color in product-hunt-assets-brief.md (#518ac8)
237 lines
6.5 KiB
TypeScript
237 lines
6.5 KiB
TypeScript
/**
 * WebSocket Server for Yjs CRDT Sync
 * Node.js server using y-websocket adapter
 */
|
|
|
|
import { WebSocket, WebSocketServer } from 'ws';
import { applyUpdate, encodeStateAsUpdate, Doc } from 'yjs';
|
|
|
|
/**
 * A message tagged `'sync'` that carries exactly one binary payload.
 * Structurally identical to `SyncStep1Message`.
 */
type DocMessage = {
  type: 'sync';
  args: [Uint8Array];
};
|
|
|
|
/**
 * A `'sync'` message with a single payload. The sync handler treats a
 * one-element `args` as sync step 1 and replies with the stored document state.
 */
type SyncStep1Message = {
  type: 'sync';
  args: [Uint8Array];
};
|
|
|
|
/**
 * A `'sync'` message with two payloads. The sync handler treats a two-element
 * `args` as sync step 2 and replies with a JSON `'sync'` message.
 */
type SyncStep2Message = {
  type: 'sync';
  args: [Uint8Array, Uint8Array];
};
|
|
|
|
/**
 * An `'update'` message. `args[0]` is the Yjs update that the update handler
 * applies to the stored document and relays to the other clients.
 */
type UpdateMessage = {
  type: 'update';
  args: [Uint8Array];
};
|
|
|
|
/**
 * Every message shape the server accepts, discriminated by `type`.
 *
 * NOTE(review): incoming frames are decoded with `JSON.parse`, so at runtime
 * the `args` payloads presumably arrive as plain number arrays rather than
 * `Uint8Array` instances — confirm against the client implementation.
 */
export type Message =
  | DocMessage
  | SyncStep1Message
  | SyncStep2Message
  | UpdateMessage;
|
|
|
|
// In-memory registries keyed by document name (in production, use Redis or
// persistent storage instead):
//   docs    - latest encoded state of each document
//   clients - sockets currently attached to each document
const docs = new Map<string, Uint8Array>();
const clients = new Map<string, Set<WebSocket>>();
|
|
|
|
/**
 * A socket annotated with the name of the document it is attached to.
 * `docName` is assigned by the connection handler once the connection has
 * passed the origin and authentication checks.
 */
interface WebSocketWithDoc extends WebSocket {
  docName?: string;
}
|
|
|
|
/**
 * Create a WebSocket server that keeps per-document state in memory and relays
 * updates between the clients connected to the same document.
 *
 * The document name is the last segment of the request path, falling back to
 * `'default'` when that segment is empty.
 *
 * When `options.authMiddleware` is supplied:
 * - a request carrying an `Origin` header must match an entry of the
 *   comma-separated `ALLOWED_ORIGINS` environment variable (default
 *   `http://localhost:3000`); otherwise the socket is closed with code 4002;
 * - a `token` query parameter is required and handed to the middleware; a
 *   missing or rejected token closes the socket with code 4001.
 *
 * Any unexpected failure while setting up a connection closes that socket with
 * code 1011 instead of surfacing as an unhandled promise rejection.
 *
 * @param port - Port passed to the `WebSocketServer` constructor.
 * @param options - Optional settings.
 * @param options.authMiddleware - Resolves a token to the caller's identity; a
 *   rejection is treated as failed authentication.
 * @returns The created `WebSocketServer`.
 */
export function createWebSocketServer(
  port: number,
  options: {
    authMiddleware?: (token: string) => Promise<{ userId: string; projectId: string }>;
  } = {}
): WebSocketServer {
  const { authMiddleware } = options;

  const server = new WebSocketServer({ port });

  server.on('connection', async (ws: WebSocketWithDoc, req) => {
    try {
      // The document name comes from the last path segment of the request URL.
      // `new URL` throws on malformed request targets, hence the outer try/catch.
      const url = new URL(req.url ?? '', `http://${req.headers.host}`);
      const docName = url.pathname.split('/').pop() || 'default';

      // Validate origin to prevent WebSocket CSRF. Requests without an Origin
      // header (non-browser clients) are not subject to this check.
      const origin = req.headers.origin;
      if (authMiddleware && origin) {
        const allowedOrigins = process.env.ALLOWED_ORIGINS?.split(',')
          .map((entry) => entry.trim())
          .filter((entry) => entry.length > 0) ?? ['http://localhost:3000'];
        if (!allowedOrigins.includes(origin)) {
          console.warn(`Origin validation failed: ${origin}`);
          ws.close(4002);
          return;
        }
      }

      // Authenticate connection if auth middleware provided
      const token = url.searchParams.get('token');
      let userId: string | undefined;
      let projectId: string | undefined;

      if (authMiddleware) {
        if (!token) {
          console.warn('Authentication required but no token provided');
          ws.send(JSON.stringify({ type: 'error', message: 'Authentication required' }));
          ws.close(4001);
          return;
        }

        try {
          // NOTE(review): the message listener is attached only after this
          // await, so frames a client sends while authentication is pending
          // are not processed.
          const auth = await authMiddleware(token);
          userId = auth.userId;
          projectId = auth.projectId;
          console.log(`WebSocket connection authenticated: ${userId} for ${docName}`);
        } catch (error) {
          console.error('Authentication failed:', error);
          ws.send(JSON.stringify({ type: 'error', message: 'Authentication failed' }));
          // Same close code as the missing-token path so clients can tell
          // authentication failures apart from other closures.
          ws.close(4001);
          return;
        }
      }

      // The peer may have gone away while authentication was pending. Its
      // 'close' event has then already fired, so registering it now would leave
      // a stale socket in the client set forever.
      if (ws.readyState !== ws.OPEN) {
        return;
      }

      // NOTE(review): `projectId` is not compared with `docName` here, so any
      // authenticated caller can attach to any document — confirm intended.
      ws.docName = docName;

      // Initialize document state if not exists
      if (!docs.has(docName)) {
        docs.set(docName, new Uint8Array());
      }

      // Add client to the document's client set, creating the set on demand so
      // the two maps do not have to be populated in lockstep.
      let docClients = clients.get(docName);
      if (!docClients) {
        docClients = new Set();
        clients.set(docName, docClients);
      }
      docClients.add(ws);

      // Send initial sync
      const initialState = docs.get(docName) ?? new Uint8Array();
      ws.send(encodeSyncStep1(initialState));

      // Handle incoming messages
      ws.on('message', (data: Buffer | ArrayBuffer) => {
        handleMessage(ws, docName, data, userId, projectId);
      });

      // Handle disconnection
      ws.on('close', () => {
        clients.get(docName)?.delete(ws);
        console.log(`Client disconnected from ${docName}. Remaining clients: ${clients.get(docName)?.size || 0}`);
      });

      console.log(`Client connected to ${docName}${userId ? ` (user: ${userId})` : ''}`);
    } catch (error) {
      // An async listener's rejection is not handled by the event emitter, so
      // contain setup failures here and drop only the affected connection.
      console.error('Unexpected error while setting up connection:', error);
      ws.close(1011);
    }
  });

  return server;
}
|
|
|
|
/**
 * Produce the frame sent for sync step 1.
 *
 * The stored document state is already binary, so it is passed through as-is:
 * the returned value is the very same `Uint8Array` instance that was given.
 *
 * @param state - Encoded document state.
 * @returns `state`, unchanged.
 */
function encodeSyncStep1(state: Uint8Array): Uint8Array {
  const frame = state;
  return frame;
}
|
|
|
|
/**
 * Decode one incoming WebSocket frame as JSON and dispatch it by `type`.
 *
 * `'sync'` messages go to `handleSync` and `'update'` messages to
 * `handleUpdate`. Messages with any other `type` are logged and ignored.
 * A frame that is not valid JSON, is not an object, or has no `args` array
 * makes the server reply with an `'Invalid message format'` error message;
 * the same reply is sent if a handler throws.
 *
 * @param ws - Socket the frame arrived on; error replies are sent to it.
 * @param docName - Name of the document the socket is attached to.
 * @param data - Raw frame payload.
 * @param userId - Authenticated user, if any. Currently not forwarded: the
 *   handlers take no identity arguments.
 * @param projectId - Authenticated project, if any. Currently not forwarded.
 */
function handleMessage(
  ws: WebSocketWithDoc,
  docName: string,
  data: Buffer | ArrayBuffer,
  userId?: string,
  projectId?: string
) {
  try {
    // ArrayBuffer#toString() yields "[object ArrayBuffer]", so wrap it in a
    // Buffer view (no copy) before decoding.
    const text = Buffer.isBuffer(data) ? data.toString('utf8') : Buffer.from(data).toString('utf8');

    const message = JSON.parse(text) as Message | null;

    // JSON.parse accepts any JSON value; reject anything that is not an
    // object before reading fields off it.
    if (message === null || typeof message !== 'object') {
      throw new TypeError('Message must be a JSON object');
    }

    switch (message.type) {
      case 'sync':
        if (!Array.isArray(message.args)) {
          throw new TypeError('Message "args" must be an array');
        }
        handleSync(ws, docName, message);
        break;

      case 'update':
        if (!Array.isArray(message.args)) {
          throw new TypeError('Message "args" must be an array');
        }
        handleUpdate(ws, docName, message);
        break;

      default:
        // Unknown types are ignored rather than answered with an error, but
        // leave a trace so protocol mismatches are visible.
        console.warn(`Ignoring message with unsupported type for ${docName}`);
        break;
    }
  } catch (error) {
    console.error('Error handling message:', error);
    ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
  }
}
|
|
|
|
/**
 * Answer a `'sync'` message with the server's stored state for the document.
 *
 * The number of entries in `message.args` selects the reply format:
 * - one entry (sync step 1): the stored state is sent as raw bytes;
 * - two entries (sync step 2): the stored state is sent as a UTF-8 encoded
 *   JSON `{ type: 'sync', args: [number[]] }` message.
 *
 * In both cases the full stored state is sent; the payloads supplied by the
 * client are not inspected (no diff is computed). Any other number of entries
 * produces no reply.
 *
 * @param ws - Socket to reply on.
 * @param docName - Document whose state is sent; an unknown name yields an
 *   empty state.
 * @param message - The received sync message.
 */
function handleSync(ws: WebSocketWithDoc, docName: string, message: SyncStep1Message | SyncStep2Message) {
  const currentState = docs.get(docName) ?? new Uint8Array();

  if (message.args.length === 1) {
    // Sync step 1: reply with the full document state as a binary frame.
    ws.send(encodeSyncStep1(currentState));
  } else if (message.args.length === 2) {
    // Sync step 2: simplified — the whole state stands in for the "missing
    // updates" (in production use the Yjs sync protocol).
    const response = JSON.stringify({
      type: 'sync',
      args: [Array.from(currentState)],
    });
    ws.send(new TextEncoder().encode(response));
  }
}
|
|
|
|
/**
 * Merge a client's Yjs update into the stored document state and relay the
 * update to every other open client of the same document.
 *
 * The stored state is replayed into a fresh `Doc` before the new update is
 * applied, so the re-encoded state contains earlier content as well as the new
 * change. This costs work proportional to the stored state on every update.
 *
 * If the payload is not a byte array, or applying or broadcasting fails, the
 * stored state is left as it was and the sender receives a
 * `'Failed to apply update'` error message.
 *
 * @param ws - Socket the update came from; it is skipped when broadcasting.
 * @param docName - Document the update belongs to.
 * @param message - The received update message; `args[0]` is the update.
 */
function handleUpdate(ws: WebSocketWithDoc, docName: string, message: UpdateMessage) {
  try {
    // Messages are JSON-decoded, so the payload normally arrives as a plain
    // number[]; Yjs needs a real Uint8Array to decode it.
    const rawUpdate: unknown = message.args[0];
    let update: Uint8Array;
    if (rawUpdate instanceof Uint8Array) {
      update = rawUpdate;
    } else if (Array.isArray(rawUpdate)) {
      update = Uint8Array.from(rawUpdate);
    } else {
      throw new TypeError('Update payload must be a byte array');
    }

    // Rebuild the document from the stored state first; applying only the new
    // update would discard everything received before it. A new document is
    // stored as an empty array, which is not a decodable update, so skip it.
    const doc = new Doc();
    const previousState = docs.get(docName);
    if (previousState && previousState.length > 0) {
      applyUpdate(doc, previousState);
    }
    applyUpdate(doc, update);

    const currentState = encodeStateAsUpdate(doc);
    docs.set(docName, currentState);

    console.log(`Update applied to ${docName}. Size: ${currentState.length} bytes`);

    // Broadcast update to all other clients. The frame is identical for every
    // recipient, so encode it once.
    const broadcastFrame = new TextEncoder().encode(
      JSON.stringify({
        type: 'update',
        args: [Array.from(update)],
      })
    );

    clients.get(docName)?.forEach(client => {
      // Compare against the instance's own OPEN constant so this does not
      // depend on a global `WebSocket` being defined at runtime.
      if (client !== ws && client.readyState === client.OPEN) {
        client.send(broadcastFrame);
      }
    });
  } catch (error) {
    console.error('Error applying update:', error);
    ws.send(JSON.stringify({ type: 'error', message: 'Failed to apply update' }));
  }
}
|
|
|
|
/**
 * Snapshot of every stored document, for monitoring.
 *
 * @returns An object keyed by document name. Each entry reports how many
 *   clients are currently registered for the document (0 when none are) and
 *   the byte length of its stored state.
 */
export function getDocStats(): Record<string, { clientCount: number; stateSize: number }> {
  const snapshot: Record<string, { clientCount: number; stateSize: number }> = {};

  for (const [name, bytes] of docs) {
    const connected = clients.get(name);
    snapshot[name] = {
      clientCount: connected?.size || 0,
      stateSize: bytes.length,
    };
  }

  return snapshot;
}
|