Mirror of https://github.com/Memo-2023/mana-monorepo.git (synced 2026-05-15 06:01:09 +02:00)
perf(mana/web): pipeline SSE reads against sequential apply
Two improvements to the SSE event loop in connectSSE:
1. Read/apply pipelining
The previous loop did read → parse → await applyServerChanges →
read. A slow apply blocked the network reader, so each event
incurred the latency of the previous event's IndexedDB write
before the next chunk could even start streaming in.
Now apply work is enqueued onto a sequential promise chain
(applyChain) and the read loop returns to draining the network
immediately. LWW correctness still requires in-order application,
so the chain serialises applies — the win is just decoupling I/O
from disk work, not parallelism. The chain is awaited once at the
end so the SSE state never resumes from a cursor that hasn't been
written.
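The queueing pattern in isolation, condensed from the enqueueApply
helper in the diff below (sketch only; error handling trimmed to a
bare console.error):

    // Sequential background queue: each work item starts only after
    // the previous one settles, but the caller never awaits inline.
    let applyChain: Promise<void> = Promise.resolve();
    const enqueueApply = (work: () => Promise<void>) => {
      // .catch() keeps one failed apply from poisoning the chain.
      applyChain = applyChain.then(work).catch((err) => console.error(err));
    };
    // The read loop calls enqueueApply(...) and resumes reading at
    // once; a final `await applyChain` drains pending work.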
2. Allocation-light parser
indexOf/slice replaces split('\n\n') and split('\n'). The previous
parser allocated a fresh array of strings on every chunk; the new
one walks the rolling buffer in place and only materialises the
one event block currently being inspected. Same complexity, less
GC pressure on busy streams.
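The scanning shape as a standalone sketch (hypothetical onChunk
helper for illustration; the real code inlines this into
flushCompleteEvents and keeps the remainder in the rolling buffer):

    let buffer = '';
    function onChunk(chunk: string, handle: (eventBlock: string) => void) {
      buffer += chunk;
      let boundary: number;
      // Materialises only the complete event block at the head of the
      // buffer; split('\n\n') would allocate an array of all of them.
      while ((boundary = buffer.indexOf('\n\n')) !== -1) {
        handle(buffer.slice(0, boundary));
        buffer = buffer.slice(boundary + 2);
      }
    }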
Drive-by: tightens the JSON.parse error handling to skip malformed
events explicitly instead of swallowing them inside an outer try.
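The shape of that change, as a sketch over hypothetical inputs:

    for (const eventData of ['{"n":1}', 'not json', '{"n":2}']) {
      let parsed: unknown;
      // try/catch narrowed to the parse alone, so a failing apply can
      // no longer be mistaken for malformed event data.
      try {
        parsed = JSON.parse(eventData);
      } catch {
        continue; // skip malformed events explicitly
      }
      console.log(parsed);
    }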
Verified: 20/20 sync.test.ts still passing.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 333855c502
commit ad0215863d
1 changed file with 62 additions and 32 deletions
@@ -673,48 +673,78 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
       const decoder = new TextDecoder();
       let buffer = '';
 
-      while (true) {
-        const { done, value } = await reader.read();
-        if (done) break;
-        buffer += decoder.decode(value, { stream: true });
-
-        // Parse SSE events from buffer
-        const events = buffer.split('\n\n');
-        buffer = events.pop() ?? ''; // Keep incomplete last event
-
-        for (const eventBlock of events) {
+      // Apply queue: events are parsed and apply()ed sequentially in
+      // the background while the reader keeps draining the network.
+      // LWW correctness still requires in-order application, so we
+      // chain promises instead of running them concurrently. The win
+      // is throughput — read() no longer blocks on a slow apply.
+      let applyChain: Promise<void> = Promise.resolve();
+      const enqueueApply = (work: () => Promise<void>) => {
+        applyChain = applyChain.then(work).catch((err) => {
+          console.error('[mana-sync] SSE apply failed:', err);
+        });
+      };
+
+      // Streaming parser: scan the rolling buffer for complete events
+      // (terminated by `\n\n`) using indexOf+slice instead of split.
+      // Each event block is sub-parsed line-by-line for `event:` and
+      // `data:` fields, then enqueued for application.
+      const flushCompleteEvents = () => {
+        while (true) {
+          const boundary = buffer.indexOf('\n\n');
+          if (boundary === -1) return;
+          const eventBlock = buffer.slice(0, boundary);
+          buffer = buffer.slice(boundary + 2);
+          if (!eventBlock.trim()) continue;
+
           let eventType = '';
           let eventData = '';
 
-          for (const line of eventBlock.split('\n')) {
-            if (line.startsWith('event: ')) {
-              eventType = line.slice(7);
-            } else if (line.startsWith('data: ')) {
-              eventData = line.slice(6);
-            }
+          // Manual line walk avoids allocating an intermediate
+          // string array per event block.
+          let lineStart = 0;
+          while (lineStart <= eventBlock.length) {
+            const lineEnd = eventBlock.indexOf('\n', lineStart);
+            const line =
+              lineEnd === -1 ? eventBlock.slice(lineStart) : eventBlock.slice(lineStart, lineEnd);
+            if (line.startsWith('event: ')) eventType = line.slice(7);
+            else if (line.startsWith('data: ')) eventData = line.slice(6);
+            if (lineEnd === -1) break;
+            lineStart = lineEnd + 1;
           }
 
-          if (eventType === 'changes' && eventData) {
-            try {
-              const data = JSON.parse(eventData);
-              if (data.changes?.length > 0) {
-                await applyServerChanges(appId, data.changes);
-              }
-              if (data.syncedUntil && data.table) {
-                // Map backend table name to unified name for cursor storage
-                const unifiedTable = fromSyncName(appId, data.table);
-                await setSyncCursor(appId, unifiedTable, data.syncedUntil);
-              }
-            } catch {
-              // Ignore malformed event data
-            }
-          }
+          // Heartbeat / unknown event types fall through silently
+          // (they are no-ops that just keep the connection alive).
+          if (eventType !== 'changes' || !eventData) continue;
+
+          let parsed: { changes?: unknown[]; syncedUntil?: string; table?: string };
+          try {
+            parsed = JSON.parse(eventData);
+          } catch {
+            continue; // malformed event data — skip
+          }
+
+          enqueueApply(async () => {
+            if (parsed.changes && parsed.changes.length > 0) {
+              await applyServerChanges(appId, parsed.changes);
+            }
+            if (parsed.syncedUntil && parsed.table) {
+              // Map backend table name to unified name for cursor storage
+              const unifiedTable = fromSyncName(appId, parsed.table);
+              await setSyncCursor(appId, unifiedTable, parsed.syncedUntil);
+            }
+          });
         }
-      }
+      };
+
+      while (true) {
+        const { done, value } = await reader.read();
+        if (done) break;
+        buffer += decoder.decode(value, { stream: true });
+        flushCompleteEvents();
+      }
+
+      // Drain any final apply work before letting the connection
+      // settle into reconnect — otherwise the SSE state could resume
+      // from a cursor that hasn't actually been written yet.
+      await applyChain;
     } catch (err: unknown) {
       if (err instanceof Error && err.name === 'AbortError') return;
       channel.lastError = err instanceof Error ? err.message : 'SSE failed';