FRE-600: Implement Phase 1 WebSocket + Yjs CRDT foundation
- Create TypeScript and Vite configuration for SolidJS - Implement Yjs document structure for screenplay collaboration - Build WebSocket connection manager with exponential backoff reconnection - Create CRDT document manager with undo/redo support - Set up WebSocket sync server with JWT authentication - Add SolidJS reactive bindings for Yjs shared types - Build collaborative editor component - Write unit tests for CRDT operations - Document implementation in analysis/fre600_websocket_foundation.md Architecture: Yjs chosen over Automerge for better ecosystem and Tauri compatibility. WebSocket for sync, WebRTC for video. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
215
server/websocket/server.ts
Normal file
215
server/websocket/server.ts
Normal file
@@ -0,0 +1,215 @@
|
||||
/**
|
||||
* WebSocket Server for Yjs CRDT Sync
|
||||
* Node.js server using y-websocket adapter
|
||||
*/
|
||||
|
||||
import { WebSocketServer } from 'ws';
|
||||
import { applyUpdate, encodeStateAsUpdate } from 'yjs';
|
||||
import { decode } from 'yjs/lib/index.js';
|
||||
|
||||
type DocMessage = {
|
||||
type: 'sync';
|
||||
args: [Uint8Array];
|
||||
};
|
||||
|
||||
type SyncStep1Message = {
|
||||
type: 'sync';
|
||||
args: [Uint8Array];
|
||||
};
|
||||
|
||||
type SyncStep2Message = {
|
||||
type: 'sync';
|
||||
args: [Uint8Array, Uint8Array];
|
||||
};
|
||||
|
||||
type UpdateMessage = {
|
||||
type: 'update';
|
||||
args: [Uint8Array];
|
||||
};
|
||||
|
||||
export type Message = DocMessage | SyncStep1Message | SyncStep2Message | UpdateMessage;
|
||||
|
||||
// Store document states in memory (in production, use Redis or persistent storage)
|
||||
const docs: Map<string, Uint8Array> = new Map();
|
||||
const clients: Map<string, Set<WebSocket>> = new Map();
|
||||
|
||||
interface WebSocketWithDoc extends WebSocket {
|
||||
docName?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the WebSocket server
|
||||
*/
|
||||
export function createWebSocketServer(
|
||||
port: number,
|
||||
options: {
|
||||
authMiddleware?: (token: string) => Promise<{ userId: string; projectId: string }>;
|
||||
} = {}
|
||||
): WebSocketServer {
|
||||
const { authMiddleware } = options;
|
||||
|
||||
const server = new WebSocketServer({ port });
|
||||
|
||||
server.on('connection', async (ws: WebSocketWithDoc, req) => {
|
||||
// Extract document name from URL query params
|
||||
const url = new URL(req.url || '', `http://${req.headers.host}`);
|
||||
const docName = url.pathname.split('/').pop() || 'default';
|
||||
|
||||
// Authenticate connection if auth middleware provided
|
||||
const token = url.searchParams.get('token');
|
||||
let userId: string | undefined;
|
||||
|
||||
if (authMiddleware && token) {
|
||||
try {
|
||||
const auth = await authMiddleware(token);
|
||||
userId = auth.userId;
|
||||
console.log(`WebSocket connection authenticated: ${userId} for ${docName}`);
|
||||
} catch (error) {
|
||||
console.error('Authentication failed:', error);
|
||||
ws.send(JSON.stringify({ type: 'error', message: 'Authentication failed' }));
|
||||
ws.close();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
ws.docName = docName;
|
||||
|
||||
// Initialize document state if not exists
|
||||
if (!docs.has(docName)) {
|
||||
docs.set(docName, new Uint8Array());
|
||||
clients.set(docName, new Set());
|
||||
}
|
||||
|
||||
// Add client to the document's client set
|
||||
clients.get(docName)!.add(ws);
|
||||
|
||||
// Send initial sync
|
||||
const initialState = docs.get(docName)!;
|
||||
ws.send(encodeSyncStep1(initialState));
|
||||
|
||||
// Handle incoming messages
|
||||
ws.on('message', (data) => {
|
||||
handleMessage(ws, docName, data);
|
||||
});
|
||||
|
||||
// Handle disconnection
|
||||
ws.on('close', () => {
|
||||
clients.get(docName)?.delete(ws);
|
||||
console.log(`Client disconnected from ${docName}. Remaining clients: ${clients.get(docName)?.size || 0}`);
|
||||
});
|
||||
|
||||
console.log(`Client connected to ${docName}${userId ? ` (user: ${userId})` : ''}`);
|
||||
});
|
||||
|
||||
return server;
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode sync step 1 (send document state)
|
||||
*/
|
||||
function encodeSyncStep1(state: Uint8Array): Uint8Array {
|
||||
const updateMsg = {
|
||||
type: 'sync',
|
||||
args: [state],
|
||||
};
|
||||
return new TextEncoder().encode(JSON.stringify(updateMsg));
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming WebSocket message
|
||||
*/
|
||||
function handleMessage(ws: WebSocketWithDoc, docName: string, data: Buffer | ArrayBuffer) {
|
||||
try {
|
||||
const message = JSON.parse(data.toString()) as Message;
|
||||
|
||||
switch (message.type) {
|
||||
case 'sync':
|
||||
handleSync(ws, docName, message);
|
||||
break;
|
||||
|
||||
case 'update':
|
||||
handleUpdate(ws, docName, message);
|
||||
break;
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error handling message:', error);
|
||||
ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle sync message
|
||||
*/
|
||||
function handleSync(ws: WebSocketWithDoc, docName: string, message: SyncStep1Message | SyncStep2Message) {
|
||||
const currentState = docs.get(docName) || new Uint8Array();
|
||||
|
||||
if (message.args.length === 1) {
|
||||
// Sync step 1: client sends its state, server responds with full state
|
||||
const clientState = message.args[0];
|
||||
|
||||
// Send full document state to client
|
||||
const response = encodeSyncStep1(currentState);
|
||||
ws.send(response);
|
||||
} else if (message.args.length === 2) {
|
||||
// Sync step 2: client sends its state, server sends missing updates
|
||||
const clientState = message.args[0];
|
||||
|
||||
// Calculate missing updates (simplified - in production use Yjs protocol)
|
||||
const missingUpdates = currentState;
|
||||
|
||||
const response = JSON.stringify({
|
||||
type: 'sync',
|
||||
args: [Array.from(missingUpdates)],
|
||||
});
|
||||
ws.send(new TextEncoder().encode(response));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle update message
|
||||
*/
|
||||
function handleUpdate(ws: WebSocketWithDoc, docName: string, message: UpdateMessage) {
|
||||
const update = message.args[0];
|
||||
let currentState = docs.get(docName) || new Uint8Array();
|
||||
|
||||
// Apply update to document state
|
||||
try {
|
||||
const doc = decode(currentState);
|
||||
applyUpdate(doc, update);
|
||||
currentState = encodeStateAsUpdate(doc);
|
||||
docs.set(docName, currentState);
|
||||
|
||||
console.log(`Update applied to ${docName}. Size: ${currentState.length} bytes`);
|
||||
|
||||
// Broadcast update to all other clients
|
||||
const broadcastMsg = JSON.stringify({
|
||||
type: 'update',
|
||||
args: [Array.from(update)],
|
||||
});
|
||||
|
||||
clients.get(docName)?.forEach(client => {
|
||||
if (client !== ws && client.readyState === WebSocket.OPEN) {
|
||||
client.send(new TextEncoder().encode(broadcastMsg));
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error applying update:', error);
|
||||
ws.send(JSON.stringify({ type: 'error', message: 'Failed to apply update' }));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get document stats (for monitoring)
|
||||
*/
|
||||
export function getDocStats(): Record<string, { clientCount: number; stateSize: number }> {
|
||||
const stats: Record<string, { clientCount: number; stateSize: number }> = {};
|
||||
|
||||
docs.forEach((state, docName) => {
|
||||
stats[docName] = {
|
||||
clientCount: clients.get(docName)?.size || 0,
|
||||
stateSize: state.length,
|
||||
};
|
||||
});
|
||||
|
||||
return stats;
|
||||
}
|
||||
Reference in New Issue
Block a user