diff --git a/services/mana-sync/cmd/server/main.go b/services/mana-sync/cmd/server/main.go index 5439348df..545e12fcc 100644 --- a/services/mana-sync/cmd/server/main.go +++ b/services/mana-sync/cmd/server/main.go @@ -58,6 +58,7 @@ func main() { // Sync endpoints (Go 1.22+ routing patterns) mux.HandleFunc("POST /sync/{appId}", handler.HandleSync) mux.HandleFunc("GET /sync/{appId}/pull", handler.HandlePull) + mux.HandleFunc("GET /sync/{appId}/stream", handler.HandleStream) // WebSocket endpoints // Unified: one connection per user, receives all app notifications with appId in payload @@ -106,7 +107,7 @@ func main() { Addr: fmt.Sprintf(":%d", cfg.Port), Handler: c.Handler(mux), ReadTimeout: 15 * time.Second, - WriteTimeout: 30 * time.Second, + WriteTimeout: 0, // Disabled for SSE streaming (long-lived connections) IdleTimeout: 120 * time.Second, } diff --git a/services/mana-sync/internal/sync/handler.go b/services/mana-sync/internal/sync/handler.go index 40d239eb9..7c53c38ef 100644 --- a/services/mana-sync/internal/sync/handler.go +++ b/services/mana-sync/internal/sync/handler.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "net/http" + "strings" "time" "github.com/manacore/mana-sync/internal/auth" @@ -270,3 +271,160 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(resp) } + +// HandleStream provides a Server-Sent Events (SSE) endpoint that streams +// changes to the client in real-time. Replaces the WebSocket notification + +// HTTP pull round-trip with a single persistent connection. +// +// GET /sync/{appId}/stream?collections=tasks,projects&since=2024-01-01T10:00:00Z +func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + userID, err := h.validator.UserIDFromRequest(r) + if err != nil { + http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized) + return + } + + appID := r.PathValue("appId") + if appID == "" { + http.Error(w, "missing appId", http.StatusBadRequest) + return + } + + clientID := r.Header.Get("X-Client-Id") + since := r.URL.Query().Get("since") + if since == "" { + since = "1970-01-01T00:00:00.000Z" + } + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + // SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + ctx := r.Context() + const batchLimit = 1000 + + // Parse requested collections + var collections []string + if q := r.URL.Query().Get("collections"); q != "" { + for _, c := range strings.Split(q, ",") { + c = strings.TrimSpace(c) + if c != "" { + collections = append(collections, c) + } + } + } + + // Track cursors per collection + cursors := make(map[string]string) + for _, coll := range collections { + cursors[coll] = since + } + + // Initial sync: send pending changes since cursor for each collection + for _, coll := range collections { + changes, err := h.store.GetChangesSince(ctx, userID, appID, coll, since, clientID, batchLimit+1) + if err != nil { + slog.Error("SSE initial pull failed", "error", err, "collection", coll) + continue + } + + hasMore := len(changes) > batchLimit + if hasMore { + changes = changes[:batchLimit] + } + + if len(changes) > 0 { + cursor := changes[len(changes)-1].CreatedAt.UTC().Format(time.RFC3339Nano) + cursors[coll] = cursor + sendChangeEvent(w, coll, h.convertChanges(changes), cursor, hasMore) + flusher.Flush() + } + } + + // Subscribe to hub notifications for real-time updates + ch := h.hub.Subscribe(userID) + defer h.hub.Unsubscribe(userID, ch) + + heartbeat := time.NewTicker(30 * time.Second) + defer heartbeat.Stop() + + for { + select { + case notification := <-ch: + if notification.AppID != appID { + continue + } + for _, table := range notification.Tables { + cursor := cursors[table] + if cursor == "" { + cursor = since + } + changes, err := h.store.GetChangesSince(ctx, userID, appID, table, cursor, clientID, batchLimit+1) + if err != nil || len(changes) == 0 { + continue + } + hasMore := len(changes) > batchLimit + if hasMore { + changes = changes[:batchLimit] + } + newCursor := changes[len(changes)-1].CreatedAt.UTC().Format(time.RFC3339Nano) + cursors[table] = newCursor + sendChangeEvent(w, table, h.convertChanges(changes), newCursor, hasMore) + flusher.Flush() + } + + case <-heartbeat.C: + fmt.Fprint(w, "event: heartbeat\ndata: {}\n\n") + flusher.Flush() + + case <-ctx.Done(): + return + } + } +} + +// convertChanges transforms store rows into sync Change objects. +func (h *Handler) convertChanges(rows []store.ChangeRow) []Change { + changes := make([]Change, 0, len(rows)) + for _, row := range rows { + c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op} + switch row.Op { + case "insert": + c.Data = row.Data + case "update": + c.Fields = make(map[string]*FieldChange) + for field, ts := range row.FieldTimestamps { + if value, ok := row.Data[field]; ok { + c.Fields[field] = &FieldChange{Value: value, UpdatedAt: ts} + } + } + case "delete": + if deletedAt, ok := row.Data["deletedAt"].(string); ok { + c.DeletedAt = &deletedAt + } + } + changes = append(changes, c) + } + return changes +} + +func sendChangeEvent(w http.ResponseWriter, table string, changes []Change, syncedUntil string, hasMore bool) { + event := map[string]any{ + "table": table, "changes": changes, + "syncedUntil": syncedUntil, "hasMore": hasMore, + } + data, _ := json.Marshal(event) + fmt.Fprintf(w, "event: changes\ndata: %s\n\n", data) +} diff --git a/services/mana-sync/internal/ws/hub.go b/services/mana-sync/internal/ws/hub.go index 1b677de28..58c33b545 100644 --- a/services/mana-sync/internal/ws/hub.go +++ b/services/mana-sync/internal/ws/hub.go @@ -28,12 +28,14 @@ type Client struct { cancel context.CancelFunc } -// Hub manages WebSocket connections and broadcasts sync notifications. +// Hub manages WebSocket connections, SSE subscribers, and broadcasts sync notifications. type Hub struct { - // clients maps userID -> set of clients - clients map[string]map[*Client]struct{} - mu sync.RWMutex - validator *auth.Validator + // clients maps userID -> set of WebSocket clients + clients map[string]map[*Client]struct{} + // sseSubscribers maps userID -> set of SSE notification channels + sseSubscribers map[string][]chan Notification + mu sync.RWMutex + validator *auth.Validator } // NewHub creates a new WebSocket hub. @@ -114,6 +116,22 @@ func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string) } }(client) } + + // Also notify SSE subscribers + h.mu.RLock() + sseSubs := h.sseSubscribers[userID] + h.mu.RUnlock() + + if len(sseSubs) > 0 { + notification := Notification{AppID: appID, Tables: tables} + for _, ch := range sseSubs { + select { + case ch <- notification: + default: + // Drop if channel full (subscriber is slow) + } + } + } } func (h *Hub) readLoop(ctx context.Context, client *Client) { @@ -252,6 +270,39 @@ func (h *Hub) removeClient(client *Client) { slog.Info("client disconnected", "userID", client.UserID, "appID", client.AppID) } +// Notification is sent to SSE subscribers when a sync event occurs. +type Notification struct { + AppID string + Tables []string +} + +// Subscribe creates a channel that receives notifications for a user. +// Used by SSE stream handlers to get notified of changes. +func (h *Hub) Subscribe(userID string) chan Notification { + ch := make(chan Notification, 32) + h.mu.Lock() + defer h.mu.Unlock() + if h.sseSubscribers == nil { + h.sseSubscribers = make(map[string][]chan Notification) + } + h.sseSubscribers[userID] = append(h.sseSubscribers[userID], ch) + return ch +} + +// Unsubscribe removes an SSE subscriber channel. +func (h *Hub) Unsubscribe(userID string, ch chan Notification) { + h.mu.Lock() + defer h.mu.Unlock() + subs := h.sseSubscribers[userID] + for i, sub := range subs { + if sub == ch { + h.sseSubscribers[userID] = append(subs[:i], subs[i+1:]...) + break + } + } + close(ch) +} + // ConnectedUsers returns the number of unique connected users. func (h *Hub) ConnectedUsers() int { h.mu.RLock()