diff --git a/apps/mana/apps/web/src/lib/data/sync.ts b/apps/mana/apps/web/src/lib/data/sync.ts index 220a93d4d..2e5fbb5f6 100644 --- a/apps/mana/apps/web/src/lib/data/sync.ts +++ b/apps/mana/apps/web/src/lib/data/sync.ts @@ -673,48 +673,78 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise = Promise.resolve(); + const enqueueApply = (work: () => Promise) => { + applyChain = applyChain.then(work).catch((err) => { + console.error('[mana-sync] SSE apply failed:', err); + }); + }; - buffer += decoder.decode(value, { stream: true }); - - // Parse SSE events from buffer - const events = buffer.split('\n\n'); - buffer = events.pop() ?? ''; // Keep incomplete last event - - for (const eventBlock of events) { + // Streaming parser: scan the rolling buffer for complete events + // (terminated by `\n\n`) using indexOf+slice instead of split. + // Each event block is sub-parsed line-by-line for `event:` and + // `data:` fields, then enqueued for application. + const flushCompleteEvents = () => { + while (true) { + const boundary = buffer.indexOf('\n\n'); + if (boundary === -1) return; + const eventBlock = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); if (!eventBlock.trim()) continue; let eventType = ''; let eventData = ''; - - for (const line of eventBlock.split('\n')) { - if (line.startsWith('event: ')) { - eventType = line.slice(7); - } else if (line.startsWith('data: ')) { - eventData = line.slice(6); - } + // Manual line walk avoids allocating an intermediate + // string array per event block. + let lineStart = 0; + while (lineStart <= eventBlock.length) { + const lineEnd = eventBlock.indexOf('\n', lineStart); + const line = + lineEnd === -1 ? eventBlock.slice(lineStart) : eventBlock.slice(lineStart, lineEnd); + if (line.startsWith('event: ')) eventType = line.slice(7); + else if (line.startsWith('data: ')) eventData = line.slice(6); + if (lineEnd === -1) break; + lineStart = lineEnd + 1; } - if (eventType === 'changes' && eventData) { - try { - const data = JSON.parse(eventData); - if (data.changes?.length > 0) { - await applyServerChanges(appId, data.changes); - } - if (data.syncedUntil && data.table) { - // Map backend table name to unified name for cursor storage - const unifiedTable = fromSyncName(appId, data.table); - await setSyncCursor(appId, unifiedTable, data.syncedUntil); - } - } catch { - // Ignore malformed event data - } + if (eventType !== 'changes' || !eventData) continue; + // Heartbeat / unknown event types fall through silently. + + let parsed: { changes?: unknown[]; syncedUntil?: string; table?: string }; + try { + parsed = JSON.parse(eventData); + } catch { + continue; // malformed event data — skip } - // heartbeat events are no-ops (keep connection alive) + + enqueueApply(async () => { + if (parsed.changes && parsed.changes.length > 0) { + await applyServerChanges(appId, parsed.changes); + } + if (parsed.syncedUntil && parsed.table) { + const unifiedTable = fromSyncName(appId, parsed.table); + await setSyncCursor(appId, unifiedTable, parsed.syncedUntil); + } + }); } + }; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + flushCompleteEvents(); } + + // Drain any final apply work before letting the connection + // settle into reconnect — otherwise the SSE state could resume + // from a cursor that hasn't actually been written yet. + await applyChain; } catch (err: unknown) { if (err instanceof Error && err.name === 'AbortError') return; channel.lastError = err instanceof Error ? err.message : 'SSE failed';