diff --git a/apps/manacore/apps/web/src/lib/data/sync.ts b/apps/manacore/apps/web/src/lib/data/sync.ts index 3e0f6ab22..5f2720be8 100644 --- a/apps/manacore/apps/web/src/lib/data/sync.ts +++ b/apps/manacore/apps/web/src/lib/data/sync.ts @@ -73,7 +73,7 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise void> = []; - let unifiedWs: WebSocket | null = null; + const sseAbortControllers = new Map(); // ─── Lifecycle ────────────────────────────────────────── @@ -90,16 +90,11 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise {}); - channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL); + connectSSE(appId); } - // Lazy apps: no pull until ensureAppSynced() is called + // Lazy apps: no SSE until ensureAppSynced() is called } - // Single unified WebSocket for all apps - connectUnifiedWs(); - // Listen for online/offline if (typeof window !== 'undefined') { window.addEventListener('online', handleOnline); @@ -112,14 +107,14 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise Promise { if (!online) return; - const wsUrl = serverUrl.replace(/^http/, 'ws') + '/ws'; + const channel = channels.get(appId); + if (!channel) return; + + // Abort existing SSE connection for this app + sseAbortControllers.get(appId)?.abort(); + + const token = await getToken(); + if (!token) return; + + // Build collections list (backend names) + const collections = channel.tables.map(toSyncName).join(','); + + // Get oldest cursor across all collections for this app + const since = await getOldestSyncCursor(appId); + + const controller = new AbortController(); + sseAbortControllers.set(appId, controller); try { - const ws = new WebSocket(wsUrl); - - ws.onopen = async () => { - unifiedWs = ws; - // Authenticate — backend requires auth within 10 seconds - const token = await getToken(); - if (token && ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify({ type: 'auth', token })); + const res = await fetch( + `${serverUrl}/sync/${appId}/stream?collections=${encodeURIComponent(collections)}&since=${encodeURIComponent(since)}`, + { + headers: { + Authorization: `Bearer ${token}`, + 'X-Client-Id': clientId, + Accept: 'text/event-stream', + }, + signal: controller.signal, } - }; + ); - ws.onmessage = (event) => { - try { - const msg = JSON.parse(event.data); - if (msg.type === 'sync-available' && msg.appId) { - // Server notifies us of changes for a specific app — pull only that app - pull(msg.appId).catch(() => {}); + if (!res.ok || !res.body) { + // Fallback to polling if SSE not available + channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL); + return; + } + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Parse SSE events from buffer + const events = buffer.split('\n\n'); + buffer = events.pop() ?? ''; // Keep incomplete last event + + for (const eventBlock of events) { + if (!eventBlock.trim()) continue; + + let eventType = ''; + let eventData = ''; + + for (const line of eventBlock.split('\n')) { + if (line.startsWith('event: ')) { + eventType = line.slice(7); + } else if (line.startsWith('data: ')) { + eventData = line.slice(6); + } } - } catch { - // Ignore malformed messages - } - }; - ws.onclose = () => { - unifiedWs = null; - // Reconnect after delay - if (channels.size > 0 && online) { - setTimeout(() => connectUnifiedWs(), WS_RECONNECT_DELAY); + if (eventType === 'changes' && eventData) { + try { + const data = JSON.parse(eventData); + if (data.changes?.length > 0) { + await applyServerChanges(appId, data.changes); + } + if (data.syncedUntil && data.table) { + // Map backend table name to unified name for cursor storage + const unifiedTable = fromSyncName(appId, data.table); + await setSyncCursor(appId, unifiedTable, data.syncedUntil); + } + } catch { + // Ignore malformed event data + } + } + // heartbeat events are no-ops (keep connection alive) } - }; + } + } catch (err: unknown) { + if (err instanceof Error && err.name === 'AbortError') return; + channel.lastError = err instanceof Error ? err.message : 'SSE failed'; + } - ws.onerror = () => { - ws.close(); - }; - } catch { - // WebSocket not available or blocked + // Connection ended — reconnect after delay if still active + sseAbortControllers.delete(appId); + if (channels.has(appId) && online) { + setTimeout(() => connectSSE(appId), WS_RECONNECT_DELAY); } } @@ -447,24 +495,22 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise {}); - } - // Reconnect unified WebSocket - if (!unifiedWs) { - connectUnifiedWs(); + // Reconnect SSE streams for active channels + for (const [appId, channel] of channels) { + if (EAGER_APPS.has(appId) || channel.pullTimer) { + connectSSE(appId); + } } } function handleOffline() { online = false; setStatus('offline'); - // Close unified WebSocket - if (unifiedWs) { - unifiedWs.close(); - unifiedWs = null; + // Abort all SSE connections + for (const [, controller] of sseAbortControllers) { + controller.abort(); } + sseAbortControllers.clear(); } function setStatus(s: SyncStatus) { @@ -476,15 +522,15 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise {}); - channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL); + connectSSE(appId); } return {