feat(sync): add SSE streaming endpoint for real-time sync

New endpoint GET /sync/{appId}/stream sends Server-Sent Events with
change data directly, replacing the WebSocket notification + HTTP pull
round-trip pattern.

Server (Go):
- HandleStream() in handler.go: SSE endpoint with initial sync + live streaming
- Hub.Subscribe()/Unsubscribe() in hub.go: channel-based SSE subscriber system
- Notification type for type-safe SSE events
- convertChanges() helper extracted from duplicated code
- WriteTimeout set to 0 for SSE long-lived connections

Protocol: Client connects to /sync/{appId}/stream?collections=a,b&since=...
Server sends initial changes, then streams live changes as other clients sync.
Heartbeat every 30s keeps connection alive. Push still uses POST /sync/{appId}.

WebSocket remains available as fallback (not removed).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-02 22:24:10 +02:00
parent 8ba3c4c10d
commit 068a64b275
3 changed files with 216 additions and 6 deletions

View file

@ -58,6 +58,7 @@ func main() {
// Sync endpoints (Go 1.22+ routing patterns)
mux.HandleFunc("POST /sync/{appId}", handler.HandleSync)
mux.HandleFunc("GET /sync/{appId}/pull", handler.HandlePull)
mux.HandleFunc("GET /sync/{appId}/stream", handler.HandleStream)
// WebSocket endpoints
// Unified: one connection per user, receives all app notifications with appId in payload
@ -106,7 +107,7 @@ func main() {
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: c.Handler(mux),
ReadTimeout: 15 * time.Second,
WriteTimeout: 30 * time.Second,
WriteTimeout: 0, // Disabled for SSE streaming (long-lived connections)
IdleTimeout: 120 * time.Second,
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"github.com/manacore/mana-sync/internal/auth"
@ -270,3 +271,160 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
// HandleStream provides a Server-Sent Events (SSE) endpoint that streams
// changes to the client in real-time. Replaces the WebSocket notification +
// HTTP pull round-trip with a single persistent connection.
//
// GET /sync/{appId}/stream?collections=tasks,projects&since=2024-01-01T10:00:00Z
func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
userID, err := h.validator.UserIDFromRequest(r)
if err != nil {
http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}
appID := r.PathValue("appId")
if appID == "" {
http.Error(w, "missing appId", http.StatusBadRequest)
return
}
clientID := r.Header.Get("X-Client-Id")
since := r.URL.Query().Get("since")
if since == "" {
since = "1970-01-01T00:00:00.000Z"
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
// SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
ctx := r.Context()
const batchLimit = 1000
// Parse requested collections
var collections []string
if q := r.URL.Query().Get("collections"); q != "" {
for _, c := range strings.Split(q, ",") {
c = strings.TrimSpace(c)
if c != "" {
collections = append(collections, c)
}
}
}
// Track cursors per collection
cursors := make(map[string]string)
for _, coll := range collections {
cursors[coll] = since
}
// Initial sync: send pending changes since cursor for each collection
for _, coll := range collections {
changes, err := h.store.GetChangesSince(ctx, userID, appID, coll, since, clientID, batchLimit+1)
if err != nil {
slog.Error("SSE initial pull failed", "error", err, "collection", coll)
continue
}
hasMore := len(changes) > batchLimit
if hasMore {
changes = changes[:batchLimit]
}
if len(changes) > 0 {
cursor := changes[len(changes)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
cursors[coll] = cursor
sendChangeEvent(w, coll, h.convertChanges(changes), cursor, hasMore)
flusher.Flush()
}
}
// Subscribe to hub notifications for real-time updates
ch := h.hub.Subscribe(userID)
defer h.hub.Unsubscribe(userID, ch)
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case notification := <-ch:
if notification.AppID != appID {
continue
}
for _, table := range notification.Tables {
cursor := cursors[table]
if cursor == "" {
cursor = since
}
changes, err := h.store.GetChangesSince(ctx, userID, appID, table, cursor, clientID, batchLimit+1)
if err != nil || len(changes) == 0 {
continue
}
hasMore := len(changes) > batchLimit
if hasMore {
changes = changes[:batchLimit]
}
newCursor := changes[len(changes)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
cursors[table] = newCursor
sendChangeEvent(w, table, h.convertChanges(changes), newCursor, hasMore)
flusher.Flush()
}
case <-heartbeat.C:
fmt.Fprint(w, "event: heartbeat\ndata: {}\n\n")
flusher.Flush()
case <-ctx.Done():
return
}
}
}
// convertChanges transforms store rows into sync Change objects.
func (h *Handler) convertChanges(rows []store.ChangeRow) []Change {
changes := make([]Change, 0, len(rows))
for _, row := range rows {
c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op}
switch row.Op {
case "insert":
c.Data = row.Data
case "update":
c.Fields = make(map[string]*FieldChange)
for field, ts := range row.FieldTimestamps {
if value, ok := row.Data[field]; ok {
c.Fields[field] = &FieldChange{Value: value, UpdatedAt: ts}
}
}
case "delete":
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
c.DeletedAt = &deletedAt
}
}
changes = append(changes, c)
}
return changes
}
func sendChangeEvent(w http.ResponseWriter, table string, changes []Change, syncedUntil string, hasMore bool) {
event := map[string]any{
"table": table, "changes": changes,
"syncedUntil": syncedUntil, "hasMore": hasMore,
}
data, _ := json.Marshal(event)
fmt.Fprintf(w, "event: changes\ndata: %s\n\n", data)
}

View file

@ -28,12 +28,14 @@ type Client struct {
cancel context.CancelFunc
}
// Hub manages WebSocket connections and broadcasts sync notifications.
// Hub manages WebSocket connections, SSE subscribers, and broadcasts sync notifications.
type Hub struct {
// clients maps userID -> set of clients
clients map[string]map[*Client]struct{}
mu sync.RWMutex
validator *auth.Validator
// clients maps userID -> set of WebSocket clients
clients map[string]map[*Client]struct{}
// sseSubscribers maps userID -> set of SSE notification channels
sseSubscribers map[string][]chan Notification
mu sync.RWMutex
validator *auth.Validator
}
// NewHub creates a new WebSocket hub.
@ -114,6 +116,22 @@ func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string)
}
}(client)
}
// Also notify SSE subscribers
h.mu.RLock()
sseSubs := h.sseSubscribers[userID]
h.mu.RUnlock()
if len(sseSubs) > 0 {
notification := Notification{AppID: appID, Tables: tables}
for _, ch := range sseSubs {
select {
case ch <- notification:
default:
// Drop if channel full (subscriber is slow)
}
}
}
}
func (h *Hub) readLoop(ctx context.Context, client *Client) {
@ -252,6 +270,39 @@ func (h *Hub) removeClient(client *Client) {
slog.Info("client disconnected", "userID", client.UserID, "appID", client.AppID)
}
// Notification is sent to SSE subscribers when a sync event occurs.
type Notification struct {
AppID string
Tables []string
}
// Subscribe creates a channel that receives notifications for a user.
// Used by SSE stream handlers to get notified of changes.
func (h *Hub) Subscribe(userID string) chan Notification {
ch := make(chan Notification, 32)
h.mu.Lock()
defer h.mu.Unlock()
if h.sseSubscribers == nil {
h.sseSubscribers = make(map[string][]chan Notification)
}
h.sseSubscribers[userID] = append(h.sseSubscribers[userID], ch)
return ch
}
// Unsubscribe removes an SSE subscriber channel.
func (h *Hub) Unsubscribe(userID string, ch chan Notification) {
h.mu.Lock()
defer h.mu.Unlock()
subs := h.sseSubscribers[userID]
for i, sub := range subs {
if sub == ch {
h.sseSubscribers[userID] = append(subs[:i], subs[i+1:]...)
break
}
}
close(ch)
}
// ConnectedUsers returns the number of unique connected users.
func (h *Hub) ConnectedUsers() int {
h.mu.RLock()