From ad0215863d620ea88bde5b6e9b0518ba2c39f48b Mon Sep 17 00:00:00 2001 From: Till JS Date: Tue, 7 Apr 2026 14:57:21 +0200 Subject: [PATCH] perf(mana/web): pipeline SSE reads against sequential apply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two improvements to the SSE event loop in connectSSE: 1. Read/apply pipelining The previous loop did read → parse → await applyServerChanges → read. A slow apply blocked the network reader, so each event incurred the latency of the previous event's IndexedDB write before the next chunk could even start streaming in. Now apply work is enqueued onto a sequential promise chain (applyChain) and the read loop returns to draining the network immediately. LWW correctness still requires in-order application, so the chain serialises applies — the win is just decoupling I/O from disk work, not parallelism. The chain is awaited once at the end so the SSE state never resumes from a cursor that hasn't been written. 2. Allocation-light parser indexOf/slice replaces split('\n\n') and split('\n'). The previous parser allocated a fresh array of strings on every chunk; the new one walks the rolling buffer in place and only materialises the one event block currently being inspected. Same complexity, less GC pressure on busy streams. Drive-by: tightens the JSON.parse error handling to skip malformed events explicitly instead of swallowing them inside an outer try. Verified: 20/20 sync.test.ts still passing. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/mana/apps/web/src/lib/data/sync.ts | 94 ++++++++++++++++--------- 1 file changed, 62 insertions(+), 32 deletions(-) diff --git a/apps/mana/apps/web/src/lib/data/sync.ts b/apps/mana/apps/web/src/lib/data/sync.ts index 220a93d4d..2e5fbb5f6 100644 --- a/apps/mana/apps/web/src/lib/data/sync.ts +++ b/apps/mana/apps/web/src/lib/data/sync.ts @@ -673,48 +673,78 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise = Promise.resolve(); + const enqueueApply = (work: () => Promise) => { + applyChain = applyChain.then(work).catch((err) => { + console.error('[mana-sync] SSE apply failed:', err); + }); + }; - buffer += decoder.decode(value, { stream: true }); - - // Parse SSE events from buffer - const events = buffer.split('\n\n'); - buffer = events.pop() ?? ''; // Keep incomplete last event - - for (const eventBlock of events) { + // Streaming parser: scan the rolling buffer for complete events + // (terminated by `\n\n`) using indexOf+slice instead of split. + // Each event block is sub-parsed line-by-line for `event:` and + // `data:` fields, then enqueued for application. + const flushCompleteEvents = () => { + while (true) { + const boundary = buffer.indexOf('\n\n'); + if (boundary === -1) return; + const eventBlock = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); if (!eventBlock.trim()) continue; let eventType = ''; let eventData = ''; - - for (const line of eventBlock.split('\n')) { - if (line.startsWith('event: ')) { - eventType = line.slice(7); - } else if (line.startsWith('data: ')) { - eventData = line.slice(6); - } + // Manual line walk avoids allocating an intermediate + // string array per event block. + let lineStart = 0; + while (lineStart <= eventBlock.length) { + const lineEnd = eventBlock.indexOf('\n', lineStart); + const line = + lineEnd === -1 ? eventBlock.slice(lineStart) : eventBlock.slice(lineStart, lineEnd); + if (line.startsWith('event: ')) eventType = line.slice(7); + else if (line.startsWith('data: ')) eventData = line.slice(6); + if (lineEnd === -1) break; + lineStart = lineEnd + 1; } - if (eventType === 'changes' && eventData) { - try { - const data = JSON.parse(eventData); - if (data.changes?.length > 0) { - await applyServerChanges(appId, data.changes); - } - if (data.syncedUntil && data.table) { - // Map backend table name to unified name for cursor storage - const unifiedTable = fromSyncName(appId, data.table); - await setSyncCursor(appId, unifiedTable, data.syncedUntil); - } - } catch { - // Ignore malformed event data - } + if (eventType !== 'changes' || !eventData) continue; + // Heartbeat / unknown event types fall through silently. + + let parsed: { changes?: unknown[]; syncedUntil?: string; table?: string }; + try { + parsed = JSON.parse(eventData); + } catch { + continue; // malformed event data — skip } - // heartbeat events are no-ops (keep connection alive) + + enqueueApply(async () => { + if (parsed.changes && parsed.changes.length > 0) { + await applyServerChanges(appId, parsed.changes); + } + if (parsed.syncedUntil && parsed.table) { + const unifiedTable = fromSyncName(appId, parsed.table); + await setSyncCursor(appId, unifiedTable, parsed.syncedUntil); + } + }); } + }; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + flushCompleteEvents(); } + + // Drain any final apply work before letting the connection + // settle into reconnect — otherwise the SSE state could resume + // from a cursor that hasn't actually been written yet. + await applyChain; } catch (err: unknown) { if (err instanceof Error && err.name === 'AbortError') return; channel.lastError = err instanceof Error ? err.message : 'SSE failed';