feat(sync): replace WebSocket with SSE client for real-time sync

Client now connects to GET /sync/{appId}/stream via fetch + ReadableStream
instead of WebSocket + HTTP pull. Each app gets its own SSE connection that
delivers initial sync data + live updates in one persistent stream.

Changes:
- Remove WebSocket connection (connectUnifiedWs)
- Add connectSSE() per app using fetch + ReadableStream
- Parse SSE events (changes, heartbeat) from streamed response
- Auto-reconnect on disconnect with WS_RECONNECT_DELAY
- Fallback to polling if SSE endpoint not available
- ensureAppSynced() connects SSE for lazy apps on first visit
- handleOnline() reconnects all active SSE streams
- handleOffline() aborts all SSE connections

Benefits: 1 connection instead of 2 (WS + HTTP), data delivered instantly
without notification → pull round-trip, works through HTTP proxies/CDN.

Push (POST /sync/{appId}) remains unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-02 22:27:30 +02:00
parent 068a64b275
commit c8daa443fc

View file

@ -73,7 +73,7 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
let status: SyncStatus = 'idle';
let online = typeof navigator !== 'undefined' ? navigator.onLine : true;
let _statusListeners: Array<(s: SyncStatus) => void> = [];
let unifiedWs: WebSocket | null = null;
const sseAbortControllers = new Map<string, AbortController>();
// ─── Lifecycle ──────────────────────────────────────────
@ -90,16 +90,11 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
channels.set(appId, channel);
if (EAGER_APPS.has(appId)) {
// Eager: pull now + start periodic sync
pull(appId).catch(() => {});
channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL);
connectSSE(appId);
}
// Lazy apps: no pull until ensureAppSynced() is called
// Lazy apps: no SSE until ensureAppSynced() is called
}
// Single unified WebSocket for all apps
connectUnifiedWs();
// Listen for online/offline
if (typeof window !== 'undefined') {
window.addEventListener('online', handleOnline);
@ -112,14 +107,14 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
if (channel.pushTimer) clearTimeout(channel.pushTimer);
if (channel.pullTimer) clearInterval(channel.pullTimer);
}
// Abort all SSE connections
for (const [, controller] of sseAbortControllers) {
controller.abort();
}
sseAbortControllers.clear();
channels.clear();
_statusListeners = [];
if (unifiedWs) {
unifiedWs.close();
unifiedWs = null;
}
if (typeof window !== 'undefined') {
window.removeEventListener('online', handleOnline);
window.removeEventListener('offline', handleOffline);
@ -261,50 +256,103 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
}
}
// ─── WebSocket (unified — one connection for all apps) ──
// ─── SSE Stream (one per app — replaces WebSocket + Pull) ──
function connectUnifiedWs(): void {
async function connectSSE(appId: string): Promise<void> {
if (!online) return;
const wsUrl = serverUrl.replace(/^http/, 'ws') + '/ws';
const channel = channels.get(appId);
if (!channel) return;
// Abort existing SSE connection for this app
sseAbortControllers.get(appId)?.abort();
const token = await getToken();
if (!token) return;
// Build collections list (backend names)
const collections = channel.tables.map(toSyncName).join(',');
// Get oldest cursor across all collections for this app
const since = await getOldestSyncCursor(appId);
const controller = new AbortController();
sseAbortControllers.set(appId, controller);
try {
const ws = new WebSocket(wsUrl);
ws.onopen = async () => {
unifiedWs = ws;
// Authenticate — backend requires auth within 10 seconds
const token = await getToken();
if (token && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'auth', token }));
const res = await fetch(
`${serverUrl}/sync/${appId}/stream?collections=${encodeURIComponent(collections)}&since=${encodeURIComponent(since)}`,
{
headers: {
Authorization: `Bearer ${token}`,
'X-Client-Id': clientId,
Accept: 'text/event-stream',
},
signal: controller.signal,
}
};
);
ws.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
if (msg.type === 'sync-available' && msg.appId) {
// Server notifies us of changes for a specific app — pull only that app
pull(msg.appId).catch(() => {});
if (!res.ok || !res.body) {
// Fallback to polling if SSE not available
channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL);
return;
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse SSE events from buffer
const events = buffer.split('\n\n');
buffer = events.pop() ?? ''; // Keep incomplete last event
for (const eventBlock of events) {
if (!eventBlock.trim()) continue;
let eventType = '';
let eventData = '';
for (const line of eventBlock.split('\n')) {
if (line.startsWith('event: ')) {
eventType = line.slice(7);
} else if (line.startsWith('data: ')) {
eventData = line.slice(6);
}
}
} catch {
// Ignore malformed messages
}
};
ws.onclose = () => {
unifiedWs = null;
// Reconnect after delay
if (channels.size > 0 && online) {
setTimeout(() => connectUnifiedWs(), WS_RECONNECT_DELAY);
if (eventType === 'changes' && eventData) {
try {
const data = JSON.parse(eventData);
if (data.changes?.length > 0) {
await applyServerChanges(appId, data.changes);
}
if (data.syncedUntil && data.table) {
// Map backend table name to unified name for cursor storage
const unifiedTable = fromSyncName(appId, data.table);
await setSyncCursor(appId, unifiedTable, data.syncedUntil);
}
} catch {
// Ignore malformed event data
}
}
// heartbeat events are no-ops (keep connection alive)
}
};
}
} catch (err: unknown) {
if (err instanceof Error && err.name === 'AbortError') return;
channel.lastError = err instanceof Error ? err.message : 'SSE failed';
}
ws.onerror = () => {
ws.close();
};
} catch {
// WebSocket not available or blocked
// Connection ended — reconnect after delay if still active
sseAbortControllers.delete(appId);
if (channels.has(appId) && online) {
setTimeout(() => connectSSE(appId), WS_RECONNECT_DELAY);
}
}
@ -447,24 +495,22 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
function handleOnline() {
online = true;
setStatus('idle');
// Resume sync for all channels
for (const appId of channels.keys()) {
pull(appId).catch(() => {});
}
// Reconnect unified WebSocket
if (!unifiedWs) {
connectUnifiedWs();
// Reconnect SSE streams for active channels
for (const [appId, channel] of channels) {
if (EAGER_APPS.has(appId) || channel.pullTimer) {
connectSSE(appId);
}
}
}
function handleOffline() {
online = false;
setStatus('offline');
// Close unified WebSocket
if (unifiedWs) {
unifiedWs.close();
unifiedWs = null;
// Abort all SSE connections
for (const [, controller] of sseAbortControllers) {
controller.abort();
}
sseAbortControllers.clear();
}
function setStatus(s: SyncStatus) {
@ -476,15 +522,15 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
/**
* Ensure a lazy app's collections are synced (called on module navigation).
* If already synced (has pullTimer), this is a no-op.
* Connects SSE stream if not already active.
*/
function ensureAppSynced(appId: string): void {
const channel = channels.get(appId);
if (!channel || channel.pullTimer) return; // Already active
if (!channel) return;
// Already connected via SSE
if (sseAbortControllers.has(appId)) return;
// Start sync for this lazy app
pull(appId).catch(() => {});
channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL);
connectSSE(appId);
}
return {