- Create TypeScript and Vite configuration for SolidJS - Implement Yjs document structure for screenplay collaboration - Build WebSocket connection manager with exponential backoff reconnection - Create CRDT document manager with undo/redo support - Set up WebSocket sync server with JWT authentication - Add SolidJS reactive bindings for Yjs shared types - Build collaborative editor component - Write unit tests for CRDT operations - Document implementation in analysis/fre600_websocket_foundation.md Architecture: Yjs chosen over Automerge for better ecosystem and Tauri compatibility. WebSocket for sync, WebRTC for video. Co-Authored-By: Paperclip <noreply@paperclip.ing>
216 lines
5.9 KiB
TypeScript
216 lines
5.9 KiB
TypeScript
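/**
 * WebSocket Server for Yjs CRDT Sync
 *
 * Node.js server built on the `ws` package. Clients exchange JSON-encoded
 * envelopes (see `Message`) whose payloads are Yjs updates; document state is
 * kept in memory per document name.
 */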
|
|
|
|
import { WebSocketServer } from 'ws';
import { Doc, applyUpdate, encodeStateAsUpdate } from 'yjs';
// NOTE(review): `decode` does not appear to be part of the public Yjs API and
// 'yjs/lib/index.js' may not resolve — confirm and remove if unused.
import { decode } from 'yjs/lib/index.js';
|
|
|
|
/** Generic `sync` envelope carrying a single binary payload. */
type DocMessage = { type: 'sync'; args: [Uint8Array] };

/**
 * First sync step: a `sync` envelope with exactly one payload.
 * Structurally identical to `DocMessage`; the sync handler tells the two
 * sync steps apart by `args.length`, not by `type`.
 */
type SyncStep1Message = { type: 'sync'; args: [Uint8Array] };

/** Second sync step: a `sync` envelope with two payloads. */
type SyncStep2Message = { type: 'sync'; args: [Uint8Array, Uint8Array] };

/** Incremental document update carrying one payload. */
type UpdateMessage = { type: 'update'; args: [Uint8Array] };

/**
 * Every envelope the server accepts.
 *
 * NOTE(review): incoming messages are produced by `JSON.parse`, so at runtime
 * `args` presumably holds JSON values rather than real `Uint8Array` instances —
 * confirm against the client's encoding.
 */
export type Message = DocMessage | SyncStep1Message | SyncStep2Message | UpdateMessage;
|
|
|
|
// In-memory registries keyed by document name. Nothing is persisted across
// restarts (in production, use Redis or persistent storage).

/** Latest encoded state of each known document. */
const docs = new Map<string, Uint8Array>();

/** Sockets currently attached to each document. */
const clients = new Map<string, Set<WebSocket>>();
|
|
|
|
/**
 * A socket annotated with the name of the document it is attached to.
 * `docName` is assigned by the connection handler once the request URL
 * has been parsed, hence optional.
 */
interface WebSocketWithDoc extends WebSocket {
  docName?: string;
}
|
|
|
|
/**
 * Start a WebSocket server that relays document updates between clients.
 *
 * The document name is the last path segment of the connection URL (falling
 * back to `'default'`). Credentials are read from the `token` query parameter.
 *
 * @param port - Port handed to the `WebSocketServer` constructor.
 * @param options - Optional settings. When `authMiddleware` is supplied, every
 *   connection must present a token that the middleware accepts; a missing
 *   token or a rejected promise closes the socket. When it is omitted,
 *   connections are accepted without authentication.
 * @returns The created `WebSocketServer`.
 */
export function createWebSocketServer(
  port: number,
  options: {
    authMiddleware?: (token: string) => Promise<{ userId: string; projectId: string }>;
  } = {}
): WebSocketServer {
  const { authMiddleware } = options;

  const server = new WebSocketServer({ port });

  server.on('connection', async (ws: WebSocketWithDoc, req) => {
    // Register an error listener first so a socket-level error is logged
    // instead of surfacing as an unhandled 'error' event.
    ws.on('error', (error: unknown) => {
      console.error('WebSocket error:', error);
    });

    // Parse the request URL. `new URL` throws on malformed request targets;
    // inside this async handler that would become an unhandled rejection.
    let url: URL;
    try {
      url = new URL(req.url || '', `http://${req.headers.host}`);
    } catch (error) {
      console.error('Invalid connection URL:', error);
      ws.close();
      return;
    }

    // The document name is the last path segment, not a query parameter.
    const docName = url.pathname.split('/').pop() || 'default';

    // Authenticate whenever a middleware is configured. A missing token is
    // treated as a failure so that omitting it cannot bypass authentication.
    // NOTE(review): `auth.projectId` is not compared against `docName` —
    // confirm whether per-document authorization is enforced elsewhere.
    // NOTE(review): the 'message' listener is attached only after this await,
    // so frames sent by the client while authentication is pending are not
    // handled — confirm clients wait for the initial sync before sending.
    const token = url.searchParams.get('token');
    let userId: string | undefined;

    if (authMiddleware) {
      try {
        if (!token) {
          throw new Error('Missing authentication token');
        }
        const auth = await authMiddleware(token);
        userId = auth.userId;
        console.log(`WebSocket connection authenticated: ${userId} for ${docName}`);
      } catch (error) {
        console.error('Authentication failed:', error);
        ws.send(JSON.stringify({ type: 'error', message: 'Authentication failed' }));
        ws.close();
        return;
      }

      // The peer may have disconnected while authentication was pending; its
      // 'close' event has then already fired, so registering it now would
      // leave a dead socket in the client set forever.
      if (ws.readyState !== ws.OPEN) {
        return;
      }
    }

    ws.docName = docName;

    // Lazily create the document state and its client set.
    if (!docs.has(docName)) {
      docs.set(docName, new Uint8Array());
    }
    let peers = clients.get(docName);
    if (peers === undefined) {
      peers = new Set();
      clients.set(docName, peers);
    }
    peers.add(ws);

    // Send the current document state as the initial sync.
    const initialState = docs.get(docName) ?? new Uint8Array();
    ws.send(encodeSyncStep1(initialState));

    // Route incoming frames to the message dispatcher.
    ws.on('message', (data) => {
      handleMessage(ws, docName, data);
    });

    // Drop the socket from the document's client set on disconnect.
    ws.on('close', () => {
      clients.get(docName)?.delete(ws);
      console.log(`Client disconnected from ${docName}. Remaining clients: ${clients.get(docName)?.size || 0}`);
    });

    console.log(`Client connected to ${docName}${userId ? ` (user: ${userId})` : ''}`);
  });

  return server;
}
|
|
|
|
/**
 * Encode a `sync` envelope carrying the given document state.
 *
 * The result is the UTF-8 encoding of the JSON text
 * `{"type":"sync","args":[[...bytes]]}`.
 *
 * @param state - Encoded document state to send.
 * @returns The UTF-8 bytes of the JSON envelope.
 */
function encodeSyncStep1(state: Uint8Array): Uint8Array {
  const envelope = {
    type: 'sync',
    // JSON.stringify renders a Uint8Array as an index-keyed object
    // ({"0":1,"1":2,...}). Convert to a plain number array so the payload
    // uses the same wire format as the other messages in this file.
    args: [Array.from(state)],
  };
  return new TextEncoder().encode(JSON.stringify(envelope));
}
|
|
|
|
/**
 * Parse one incoming WebSocket frame as a JSON envelope and dispatch it.
 *
 * `sync` messages go to `handleSync`, `update` messages to `handleUpdate`;
 * any other `type` is ignored. If the frame cannot be decoded or parsed, or a
 * handler throws, an `Invalid message format` error envelope is sent back.
 *
 * @param ws - Socket the frame arrived on.
 * @param docName - Name of the document the socket is attached to.
 * @param data - Raw frame: a `Buffer`, an `ArrayBuffer`, or a list of
 *   `Buffer` fragments.
 */
function handleMessage(
  ws: WebSocketWithDoc,
  docName: string,
  data: Buffer | ArrayBuffer | Buffer[]
) {
  try {
    // Decode explicitly per input kind: `toString()` on an ArrayBuffer yields
    // "[object ArrayBuffer]" and on an array joins the fragments with commas,
    // neither of which is the frame's text.
    let text: string;
    if (Array.isArray(data)) {
      text = Buffer.concat(data).toString('utf8');
    } else if (data instanceof ArrayBuffer) {
      text = Buffer.from(data).toString('utf8');
    } else {
      text = data.toString('utf8');
    }

    const message = JSON.parse(text) as Message;

    switch (message.type) {
      case 'sync':
        handleSync(ws, docName, message);
        break;

      case 'update':
        handleUpdate(ws, docName, message);
        break;
    }
  } catch (error) {
    console.error('Error handling message:', error);
    ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
  }
}
|
|
|
|
/**
 * Answer a `sync` request with the server's current document state.
 *
 * Both sync steps (one or two `args`) receive the same reply: a `sync`
 * envelope whose single argument is the full stored state as a number array.
 * The client-supplied state is not inspected, and no diff is computed
 * (simplified — in production use the Yjs sync protocol). Messages with any
 * other number of arguments get no reply.
 *
 * @param ws - Socket to reply on.
 * @param docName - Document whose state is sent.
 * @param message - The parsed `sync` envelope.
 */
function handleSync(ws: WebSocketWithDoc, docName: string, message: SyncStep1Message | SyncStep2Message) {
  const argCount: number = message.args.length;
  if (argCount !== 1 && argCount !== 2) {
    return;
  }

  const currentState = docs.get(docName) ?? new Uint8Array();

  // Serialise the bytes as a plain number array for both steps;
  // JSON.stringify on a raw Uint8Array would emit an index-keyed object.
  const response = JSON.stringify({
    type: 'sync',
    args: [Array.from(currentState)],
  });
  ws.send(new TextEncoder().encode(response));
}
|
|
|
|
/**
 * Merge a client update into the stored document state and relay it.
 *
 * The stored state is loaded into a temporary Yjs document, the incoming
 * update is applied, and the re-encoded state replaces the stored one. The
 * update is then broadcast to every other open socket on the same document.
 * On any failure the sender receives a `Failed to apply update` error
 * envelope and the stored state is left untouched.
 *
 * @param ws - Socket the update came from (excluded from the broadcast).
 * @param docName - Document the update belongs to.
 * @param message - The parsed `update` envelope.
 */
function handleUpdate(ws: WebSocketWithDoc, docName: string, message: UpdateMessage) {
  try {
    // The envelope arrives via JSON.parse, so the payload is normally a plain
    // number array; accept that as well as a real Uint8Array.
    const rawUpdate: unknown = message.args[0];
    let update: Uint8Array;
    if (rawUpdate instanceof Uint8Array) {
      update = rawUpdate;
    } else if (
      Array.isArray(rawUpdate) &&
      rawUpdate.every((byte) => Number.isInteger(byte) && byte >= 0 && byte <= 255)
    ) {
      update = Uint8Array.from(rawUpdate as number[]);
    } else {
      throw new TypeError('Update payload must be an array of bytes');
    }

    const previousState = docs.get(docName) ?? new Uint8Array();

    // Rebuild the document from the stored state, then apply the new update.
    const doc = new Doc();
    if (previousState.length > 0) {
      // A freshly created document is stored as zero bytes, which is not a
      // decodable update, so only replay non-empty state.
      applyUpdate(doc, previousState);
    }
    applyUpdate(doc, update);
    const currentState = encodeStateAsUpdate(doc);
    doc.destroy();
    docs.set(docName, currentState);

    console.log(`Update applied to ${docName}. Size: ${currentState.length} bytes`);

    // Broadcast update to all other clients; encode the frame once.
    const broadcastMsg = JSON.stringify({
      type: 'update',
      args: [Array.from(update)],
    });
    const frame = new TextEncoder().encode(broadcastMsg);

    clients.get(docName)?.forEach((client) => {
      // Compare against the instance constant so this does not depend on a
      // global `WebSocket` being defined in the server runtime.
      if (client !== ws && client.readyState === client.OPEN) {
        client.send(frame);
      }
    });
  } catch (error) {
    console.error('Error applying update:', error);
    ws.send(JSON.stringify({ type: 'error', message: 'Failed to apply update' }));
  }
}
|
|
|
|
/**
 * Snapshot per-document metrics for monitoring.
 *
 * @returns An object keyed by document name giving the number of attached
 *   clients and the byte length of the stored state.
 */
export function getDocStats(): Record<string, { clientCount: number; stateSize: number }> {
  const stats: Record<string, { clientCount: number; stateSize: number }> = {};

  for (const [name, encodedState] of docs) {
    const attached = clients.get(name);
    stats[name] = {
      clientCount: attached ? attached.size : 0,
      stateSize: encodedState.length,
    };
  }

  return stats;
}
|