feat(sync): add pull pagination with hasMore flag

Server now returns hasMore: true when there are more than 1000 changes
pending for a collection. Client continues pulling in a loop until
hasMore is false, using the last row's timestamp as cursor.

Prevents data loss after long offline periods where >1000 changes
accumulated for a single collection.

Server changes (Go):
- GetChangesSince() accepts limit parameter
- HandlePull() fetches limit+1, trims, sets hasMore
- SyncedUntil uses last row's timestamp when paginating

Client changes (TypeScript):
- Pull loop: while (hasMore) { fetch → apply → advance cursor }
- Cursor only persisted after all pages fetched

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-02 22:17:20 +02:00
parent 03434c2802
commit f7f5c9eb3a
4 changed files with 52 additions and 27 deletions

View file

@@ -201,30 +201,39 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
try {
for (const tableName of channel.tables) {
const syncName = toSyncName(tableName);
const cursor = await getSyncCursor(appId, tableName);
let cursor = await getSyncCursor(appId, tableName);
let hasMore = true;
const res = await fetch(
`${serverUrl}/sync/${appId}/pull?collection=${encodeURIComponent(syncName)}&since=${encodeURIComponent(cursor)}`,
{
headers: {
Authorization: `Bearer ${token}`,
'X-Client-Id': clientId,
},
// Paginated pull: continue fetching until server signals no more data
while (hasMore) {
const res = await fetch(
`${serverUrl}/sync/${appId}/pull?collection=${encodeURIComponent(syncName)}&since=${encodeURIComponent(cursor)}`,
{
headers: {
Authorization: `Bearer ${token}`,
'X-Client-Id': clientId,
},
}
);
if (!res.ok) break;
const data = await res.json();
hasMore = data.hasMore ?? false;
if (data.serverChanges && data.serverChanges.length > 0) {
await applyServerChanges(appId, data.serverChanges);
}
);
if (!res.ok) continue;
const data = await res.json();
if (!data.serverChanges || data.serverChanges.length === 0) continue;
// Apply changes to local DB
await applyServerChanges(appId, data.serverChanges);
// Update cursor
if (data.syncedUntil) {
await setSyncCursor(appId, tableName, data.syncedUntil);
if (data.syncedUntil) {
cursor = data.syncedUntil;
} else {
break;
}
}
// Update cursor after all pages fetched
await setSyncCursor(appId, tableName, cursor);
}
channel.lastError = null;

View file

@@ -84,9 +84,10 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us
return err
}
// GetChangesSince returns all changes for a user+app+table since a given timestamp,
// GetChangesSince returns changes for a user+app+table since a given timestamp,
// excluding changes from the requesting client (to avoid echo).
func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, since, excludeClientID string) ([]ChangeRow, error) {
// The limit parameter controls maximum rows returned (caller should pass limit+1 to detect hasMore).
func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, since, excludeClientID string, limit int) ([]ChangeRow, error) {
sinceTime, err := time.Parse(time.RFC3339Nano, since)
if err != nil {
sinceTime = time.Unix(0, 0)
@@ -98,10 +99,10 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
WHERE user_id = $1 AND app_id = $2 AND table_name = $3
AND created_at > $4 AND client_id != $5
ORDER BY created_at ASC
LIMIT 1000
LIMIT $6
`
rows, err := s.pool.Query(ctx, query, userID, appID, tableName, sinceTime, excludeClientID)
rows, err := s.pool.Query(ctx, query, userID, appID, tableName, sinceTime, excludeClientID, limit)
if err != nil {
return nil, err
}

View file

@@ -206,13 +206,20 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) {
}
ctx := r.Context()
serverChanges, err := h.store.GetChangesSince(ctx, userID, appID, collection, since, clientID)
const batchLimit = 1000
serverChanges, err := h.store.GetChangesSince(ctx, userID, appID, collection, since, clientID, batchLimit+1)
if err != nil {
slog.Error("failed to get changes", "error", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
// Check if there are more rows than the batch limit
hasMore := len(serverChanges) > batchLimit
if hasMore {
serverChanges = serverChanges[:batchLimit]
}
responseChanges := make([]Change, 0, len(serverChanges))
for _, row := range serverChanges {
c := Change{
@@ -245,12 +252,19 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) {
responseChanges = append(responseChanges, c)
}
now := time.Now().UTC().Format(time.RFC3339Nano)
// When paginating, use last row's timestamp as cursor; otherwise now()
var syncedUntil string
if hasMore && len(serverChanges) > 0 {
syncedUntil = serverChanges[len(serverChanges)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
} else {
syncedUntil = time.Now().UTC().Format(time.RFC3339Nano)
}
resp := SyncResponse{
ServerChanges: responseChanges,
Conflicts: []SyncConflict{},
SyncedUntil: now,
SyncedUntil: syncedUntil,
HasMore: hasMore,
}
w.Header().Set("Content-Type", "application/json")

View file

@@ -31,6 +31,7 @@ type SyncResponse struct {
ServerChanges []Change `json:"serverChanges"`
Conflicts []SyncConflict `json:"conflicts"`
SyncedUntil string `json:"syncedUntil"`
HasMore bool `json:"hasMore,omitempty"`
}
// SyncConflict describes a conflict that couldn't be auto-resolved.