From f7f5c9eb3a48dab7bb86095c2a850698645e0fe3 Mon Sep 17 00:00:00 2001 From: Till JS Date: Thu, 2 Apr 2026 22:17:20 +0200 Subject: [PATCH] feat(sync): add pull pagination with hasMore flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server now returns hasMore: true when there are more than 1000 changes pending for a collection. Client continues pulling in a loop until hasMore is false, using the last row's timestamp as cursor. Prevents data loss after long offline periods where >1000 changes accumulated for a single collection. Server changes (Go): - GetChangesSince() accepts limit parameter - HandlePull() fetches limit+1, trims, sets hasMore - SyncedUntil uses last row's timestamp when paginating Client changes (TypeScript): - Pull loop: while (hasMore) { fetch → apply → advance cursor } - Cursor only persisted after all pages fetched Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/manacore/apps/web/src/lib/data/sync.ts | 49 +++++++++++-------- services/mana-sync/internal/store/postgres.go | 9 ++-- services/mana-sync/internal/sync/handler.go | 20 ++++++-- services/mana-sync/internal/sync/types.go | 1 + 4 files changed, 52 insertions(+), 27 deletions(-) diff --git a/apps/manacore/apps/web/src/lib/data/sync.ts b/apps/manacore/apps/web/src/lib/data/sync.ts index 0f3d593e5..8458d6fdf 100644 --- a/apps/manacore/apps/web/src/lib/data/sync.ts +++ b/apps/manacore/apps/web/src/lib/data/sync.ts @@ -201,30 +201,39 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise 0) { + await applyServerChanges(appId, data.serverChanges); } - ); - if (!res.ok) continue; - - const data = await res.json(); - if (!data.serverChanges || data.serverChanges.length === 0) continue; - - // Apply changes to local DB - await applyServerChanges(appId, data.serverChanges); - - // Update cursor - if (data.syncedUntil) { - await setSyncCursor(appId, tableName, data.syncedUntil); + if (data.syncedUntil) { + cursor = data.syncedUntil; + } else { + break; + } } + + // Update cursor after all pages fetched + await setSyncCursor(appId, tableName, cursor); } channel.lastError = null; diff --git a/services/mana-sync/internal/store/postgres.go b/services/mana-sync/internal/store/postgres.go index 18ec6db37..a3714e4a6 100644 --- a/services/mana-sync/internal/store/postgres.go +++ b/services/mana-sync/internal/store/postgres.go @@ -84,9 +84,10 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us return err } -// GetChangesSince returns all changes for a user+app+table since a given timestamp, +// GetChangesSince returns changes for a user+app+table since a given timestamp, // excluding changes from the requesting client (to avoid echo). -func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, since, excludeClientID string) ([]ChangeRow, error) { +// The limit parameter controls maximum rows returned (caller should pass limit+1 to detect hasMore). +func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, since, excludeClientID string, limit int) ([]ChangeRow, error) { sinceTime, err := time.Parse(time.RFC3339Nano, since) if err != nil { sinceTime = time.Unix(0, 0) @@ -98,10 +99,10 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s WHERE user_id = $1 AND app_id = $2 AND table_name = $3 AND created_at > $4 AND client_id != $5 ORDER BY created_at ASC - LIMIT 1000 + LIMIT $6 ` - rows, err := s.pool.Query(ctx, query, userID, appID, tableName, sinceTime, excludeClientID) + rows, err := s.pool.Query(ctx, query, userID, appID, tableName, sinceTime, excludeClientID, limit) if err != nil { return nil, err } diff --git a/services/mana-sync/internal/sync/handler.go b/services/mana-sync/internal/sync/handler.go index 68fa59722..40d239eb9 100644 --- a/services/mana-sync/internal/sync/handler.go +++ b/services/mana-sync/internal/sync/handler.go @@ -206,13 +206,20 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) { } ctx := r.Context() - serverChanges, err := h.store.GetChangesSince(ctx, userID, appID, collection, since, clientID) + const batchLimit = 1000 + serverChanges, err := h.store.GetChangesSince(ctx, userID, appID, collection, since, clientID, batchLimit+1) if err != nil { slog.Error("failed to get changes", "error", err) http.Error(w, "internal error", http.StatusInternalServerError) return } + // Check if there are more rows than the batch limit + hasMore := len(serverChanges) > batchLimit + if hasMore { + serverChanges = serverChanges[:batchLimit] + } + responseChanges := make([]Change, 0, len(serverChanges)) for _, row := range serverChanges { c := Change{ @@ -245,12 +252,19 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) { responseChanges = append(responseChanges, c) } - now := time.Now().UTC().Format(time.RFC3339Nano) + // When paginating, use last row's timestamp as cursor; otherwise now() + var syncedUntil string + if hasMore && len(serverChanges) > 0 { + syncedUntil = serverChanges[len(serverChanges)-1].CreatedAt.UTC().Format(time.RFC3339Nano) + } else { + syncedUntil = time.Now().UTC().Format(time.RFC3339Nano) + } resp := SyncResponse{ ServerChanges: responseChanges, Conflicts: []SyncConflict{}, - SyncedUntil: now, + SyncedUntil: syncedUntil, + HasMore: hasMore, } w.Header().Set("Content-Type", "application/json") diff --git a/services/mana-sync/internal/sync/types.go b/services/mana-sync/internal/sync/types.go index e60226bb7..42c365384 100644 --- a/services/mana-sync/internal/sync/types.go +++ b/services/mana-sync/internal/sync/types.go @@ -31,6 +31,7 @@ type SyncResponse struct { ServerChanges []Change `json:"serverChanges"` Conflicts []SyncConflict `json:"conflicts"` SyncedUntil string `json:"syncedUntil"` + HasMore bool `json:"hasMore,omitempty"` } // SyncConflict describes a conflict that couldn't be auto-resolved.