diff --git a/apps/manacore/apps/web/src/lib/data/sync.ts b/apps/manacore/apps/web/src/lib/data/sync.ts index c5a31811d..0f3d593e5 100644 --- a/apps/manacore/apps/web/src/lib/data/sync.ts +++ b/apps/manacore/apps/web/src/lib/data/sync.ts @@ -40,7 +40,6 @@ interface SyncMeta { interface SyncChannelState { appId: string; tables: string[]; - ws: WebSocket | null; pushTimer: ReturnType | null; pullTimer: ReturnType | null; lastError: string | null; @@ -53,8 +52,6 @@ export type SyncStatus = 'idle' | 'syncing' | 'error' | 'offline'; const PUSH_DEBOUNCE = 1000; const PULL_INTERVAL = 30_000; const WS_RECONNECT_DELAY = 5000; -const WS_AUTH_TIMEOUT = 10_000; - // ─── Unified Sync Manager ───────────────────────────────────── export function createUnifiedSync(serverUrl: string, getToken: () => Promise) { @@ -63,6 +60,7 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise void> = []; + let unifiedWs: WebSocket | null = null; // ─── Lifecycle ────────────────────────────────────────── @@ -71,7 +69,6 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise Promise {}); channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL); - - // Connect WebSocket for real-time push notifications - connectWs(appId); } + // Single unified WebSocket for all apps + connectUnifiedWs(); + // Listen for online/offline if (typeof window !== 'undefined') { window.addEventListener('online', handleOnline); @@ -97,14 +94,15 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise Promise { - channel.ws = ws; + unifiedWs = ws; // Authenticate — backend requires auth within 10 seconds const token = await getToken(); if (token && ws.readyState === WebSocket.OPEN) { @@ -260,9 +257,9 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise { try { const msg = JSON.parse(event.data); - if (msg.type === 'sync-available') { - // Server notifies us of new changes — trigger pull - pull(appId).catch(() => {}); + if (msg.type === 'sync-available' && msg.appId) { + // Server notifies us of changes for a specific app — pull only that app + pull(msg.appId).catch(() => {}); } } catch { // Ignore malformed messages @@ -270,10 +267,10 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise { - channel.ws = null; + unifiedWs = null; // Reconnect after delay - if (channels.has(appId) && online) { - setTimeout(() => connectWs(appId), WS_RECONNECT_DELAY); + if (channels.size > 0 && online) { + setTimeout(() => connectUnifiedWs(), WS_RECONNECT_DELAY); } }; @@ -427,19 +424,20 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise {}); - connectWs(appId); + } + // Reconnect unified WebSocket + if (!unifiedWs) { + connectUnifiedWs(); } } function handleOffline() { online = false; setStatus('offline'); - // Close all WebSockets - for (const channel of channels.values()) { - if (channel.ws) { - channel.ws.close(); - channel.ws = null; - } + // Close unified WebSocket + if (unifiedWs) { + unifiedWs.close(); + unifiedWs = null; } } diff --git a/services/mana-sync/CLAUDE.md b/services/mana-sync/CLAUDE.md index e29c9f32a..2a3979e0c 100644 --- a/services/mana-sync/CLAUDE.md +++ b/services/mana-sync/CLAUDE.md @@ -82,22 +82,32 @@ Header: X-Client-Id: chrome-tab-abc123 Header: Authorization: Bearer ``` -### WebSocket (GET /ws/{appId}) +### WebSocket — Unified (GET /ws) [Recommended] -Real-time notifications when other clients sync. Client must authenticate first. +Single connection per user. Receives notifications for all apps with `appId` in the payload. ``` CLIENT -> SERVER: { "type": "auth", "token": "" } SERVER -> CLIENT: { "type": "auth-ok" } // When another client syncs: -SERVER -> CLIENT: { "type": "sync-available", "tables": ["todos"] } +SERVER -> CLIENT: { "type": "sync-available", "appId": "todo", "tables": ["tasks"] } // Keepalive: CLIENT -> SERVER: { "type": "ping" } SERVER -> CLIENT: { "type": "pong" } ``` +### WebSocket — Legacy (GET /ws/{appId}) + +One connection per app. Only receives notifications for that specific app. Backward-compatible. + +``` +CLIENT -> SERVER: { "type": "auth", "token": "" } +SERVER -> CLIENT: { "type": "auth-ok" } +SERVER -> CLIENT: { "type": "sync-available", "appId": "todo", "tables": ["tasks"] } +``` + ## Conflict Resolution: Field-Level LWW Each field update carries a timestamp. When the same field is modified by multiple clients, the latest timestamp wins. @@ -119,7 +129,8 @@ Result: title="Buy eggs", completed=true (merged — different fields) |----------|--------|------|-------------| | `POST /sync/{appId}` | POST | JWT | Push changes, get server delta | | `GET /sync/{appId}/pull` | GET | JWT | Pull changes for a collection | -| `GET /ws/{appId}` | WS | JWT (in-band) | Real-time sync notifications | +| `GET /ws` | WS | JWT (in-band) | Unified real-time sync (all apps, one connection) | +| `GET /ws/{appId}` | WS | JWT (in-band) | Legacy per-app sync notifications | | `GET /health` | GET | No | Health check with connection stats | | `GET /metrics` | GET | No | Prometheus metrics | diff --git a/services/mana-sync/cmd/server/main.go b/services/mana-sync/cmd/server/main.go index dba32e779..5439348df 100644 --- a/services/mana-sync/cmd/server/main.go +++ b/services/mana-sync/cmd/server/main.go @@ -59,7 +59,12 @@ func main() { mux.HandleFunc("POST /sync/{appId}", handler.HandleSync) mux.HandleFunc("GET /sync/{appId}/pull", handler.HandlePull) - // WebSocket endpoint + // WebSocket endpoints + // Unified: one connection per user, receives all app notifications with appId in payload + mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + hub.HandleWebSocket(w, r, "") // empty appID = unified mode + }) + // Legacy: one connection per app (backward-compatible) mux.HandleFunc("/ws/{appId}", func(w http.ResponseWriter, r *http.Request) { appID := r.PathValue("appId") hub.HandleWebSocket(w, r, appID) diff --git a/services/mana-sync/internal/ws/hub.go b/services/mana-sync/internal/ws/hub.go index 7ab5a8a7c..1b677de28 100644 --- a/services/mana-sync/internal/ws/hub.go +++ b/services/mana-sync/internal/ws/hub.go @@ -15,6 +15,7 @@ import ( // Message types sent over WebSocket. type Message struct { Type string `json:"type"` + AppID string `json:"appId,omitempty"` Tables []string `json:"tables,omitempty"` Token string `json:"token,omitempty"` } @@ -45,6 +46,7 @@ func NewHub(validator *auth.Validator) *Hub { // HandleWebSocket upgrades an HTTP connection to WebSocket and registers the client. // The client must send an auth message with a valid JWT before receiving notifications. +// Supports both unified (/ws) and legacy per-app (/ws/{appId}) connections. func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, appID string) { conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ OriginPatterns: []string{"*"}, @@ -56,7 +58,7 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, appID stri ctx, cancel := context.WithCancel(r.Context()) client := &Client{ - AppID: appID, + AppID: appID, // empty for unified connections, set for legacy per-app Conn: conn, cancel: cancel, } @@ -67,6 +69,8 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, appID stri // NotifyUser sends a sync-available message to all connected clients of a user, // except the client that originated the change. +// For unified connections (AppID==""), all clients receive the notification with appId in the payload. +// For legacy per-app connections, only clients matching the appId are notified. func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string) { h.mu.RLock() clients, ok := h.clients[userID] @@ -78,7 +82,9 @@ func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string) // Copy the client set under read lock to avoid holding lock during writes clientsCopy := make([]*Client, 0, len(clients)) for client := range clients { - if client.AppID == appID { + // Unified clients (AppID=="") receive all notifications. + // Legacy per-app clients only receive notifications for their app. + if client.AppID == "" || client.AppID == appID { clientsCopy = append(clientsCopy, client) } } @@ -90,6 +96,7 @@ func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string) msg := Message{ Type: "sync-available", + AppID: appID, Tables: tables, } data, err := json.Marshal(msg) @@ -175,7 +182,11 @@ func (h *Hub) readLoop(ctx context.Context, client *Client) { ackData, _ := json.Marshal(ackMsg) client.Conn.Write(ctx, websocket.MessageText, ackData) - slog.Info("websocket authenticated", "userID", client.UserID, "appID", client.AppID) + mode := "unified" + if client.AppID != "" { + mode = "legacy:" + client.AppID + } + slog.Info("websocket authenticated", "userID", client.UserID, "mode", mode) case "ping": pongMsg := Message{Type: "pong"} @@ -221,7 +232,7 @@ func (h *Hub) addClient(client *Client) { } h.clients[client.UserID][client] = struct{}{} - slog.Info("client connected", "userID", client.UserID, "appID", client.AppID) + slog.Info("client connected", "userID", client.UserID, "appID", client.AppID, "unified", client.AppID == "") } func (h *Hub) removeClient(client *Client) {