From fed38efb8b02a097674edce6eee59fa2026b8638 Mon Sep 17 00:00:00 2001 From: Till JS Date: Thu, 2 Apr 2026 23:39:46 +0200 Subject: [PATCH] =?UTF-8?q?fix(sync):=20fix=20SSE=20live=20updates=20?= =?UTF-8?q?=E2=80=94=202=20bugs=20found=20during=20E2E=20testing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug 1: NotifyUser() early-returned when no WebSocket clients existed, skipping SSE subscriber notifications entirely. Fixed by restructuring to check WS clients and SSE subscribers independently. Bug 2: SSE stream cursor defaulted to client's `since` parameter when no initial data existed. If `since` was in the future (or very recent), live updates had created_at < cursor and were silently filtered out. Fixed by defaulting cursor to now() when no initial data is returned. Also fixed: NotifyUser iterated the shared sseSubs slice after releasing the read lock (race condition); it now iterates sseSubsCopy, a copy taken while the read lock is held. Verified E2E: Push from client A → SSE stream on client B receives live change event with correct data within ~1 second. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- services/mana-sync/internal/sync/handler.go | 9 ++- services/mana-sync/internal/ws/hub.go | 70 ++++++++++----------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/services/mana-sync/internal/sync/handler.go b/services/mana-sync/internal/sync/handler.go index 7c53c38ef..d17617f01 100644 --- a/services/mana-sync/internal/sync/handler.go +++ b/services/mana-sync/internal/sync/handler.go @@ -164,7 +164,7 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) { SyncedUntil: now, } - // Notify other connected clients via WebSocket + // Notify other connected clients via WebSocket/SSE if len(affectedTables) > 0 { tables := make([]string, 0, len(affectedTables)) for t := range affectedTables { @@ -326,7 +326,8 @@ func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) { } } - // Track cursors per collection + // Track cursors per collection — default to now() if no initial data + now := time.Now().UTC().Format(time.RFC3339Nano) cursors := make(map[string]string) for _, coll := range collections { cursors[coll] = since @@ -337,6 +338,7 @@ func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) { changes, err := h.store.GetChangesSince(ctx, userID, appID, coll, since, clientID, batchLimit+1) if err != nil { slog.Error("SSE initial pull failed", "error", err, "collection", coll) + cursors[coll] = now // Default to now on error continue } @@ -350,6 +352,9 @@ func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) { cursors[coll] = cursor sendChangeEvent(w, coll, h.convertChanges(changes), cursor, hasMore) flusher.Flush() + } else { + // No initial data — set cursor to now so live updates work + cursors[coll] = now } } diff --git a/services/mana-sync/internal/ws/hub.go b/services/mana-sync/internal/ws/hub.go index 58c33b545..dc382f875 100644 --- a/services/mana-sync/internal/ws/hub.go +++ b/services/mana-sync/internal/ws/hub.go @@ -75,60 +75,57 @@ 
func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, appID stri // For legacy per-app connections, only clients matching the appId are notified. func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string) { h.mu.RLock() - clients, ok := h.clients[userID] - if !ok { - h.mu.RUnlock() - return - } + clients := h.clients[userID] + sseSubs := h.sseSubscribers[userID] - // Copy the client set under read lock to avoid holding lock during writes - clientsCopy := make([]*Client, 0, len(clients)) + // Copy WS clients under read lock + var clientsCopy []*Client for client := range clients { - // Unified clients (AppID=="") receive all notifications. - // Legacy per-app clients only receive notifications for their app. if client.AppID == "" || client.AppID == appID { clientsCopy = append(clientsCopy, client) } } + + // Copy SSE subscribers + sseSubsCopy := make([]chan Notification, len(sseSubs)) + copy(sseSubsCopy, sseSubs) h.mu.RUnlock() - if len(clientsCopy) == 0 { + // Nothing to notify + if len(clientsCopy) == 0 && len(sseSubsCopy) == 0 { return } - msg := Message{ - Type: "sync-available", - AppID: appID, - Tables: tables, - } - data, err := json.Marshal(msg) - if err != nil { - slog.Error("failed to marshal notification", "error", err) - return - } - - for _, client := range clientsCopy { - go func(c *Client) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := c.Conn.Write(ctx, websocket.MessageText, data); err != nil { - h.removeClient(c) + // Notify WebSocket clients + if len(clientsCopy) > 0 { + msg := Message{ + Type: "sync-available", + AppID: appID, + Tables: tables, + } + data, err := json.Marshal(msg) + if err == nil { + for _, client := range clientsCopy { + go func(c *Client) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.Conn.Write(ctx, websocket.MessageText, data); err != nil { + h.removeClient(c) + } + 
}(client) } - }(client) + } } - // Also notify SSE subscribers - h.mu.RLock() - sseSubs := h.sseSubscribers[userID] - h.mu.RUnlock() - - if len(sseSubs) > 0 { + // Notify SSE subscribers + if len(sseSubsCopy) > 0 { notification := Notification{AppID: appID, Tables: tables} - for _, ch := range sseSubs { + for _, ch := range sseSubsCopy { select { case ch <- notification: + // sent default: - // Drop if channel full (subscriber is slow) + slog.Warn("SSE notification dropped (channel full)", "appID", appID) } } } @@ -286,6 +283,7 @@ func (h *Hub) Subscribe(userID string) chan Notification { h.sseSubscribers = make(map[string][]chan Notification) } h.sseSubscribers[userID] = append(h.sseSubscribers[userID], ch) + slog.Debug("SSE subscribed", "userID", userID, "totalSubscribers", len(h.sseSubscribers[userID])) return ch }