diff --git a/apps/mana/apps/web/src/lib/data/sync.ts b/apps/mana/apps/web/src/lib/data/sync.ts index b6a1a60ed..bc57faea5 100644 --- a/apps/mana/apps/web/src/lib/data/sync.ts +++ b/apps/mana/apps/web/src/lib/data/sync.ts @@ -37,6 +37,17 @@ export interface FieldChange { updatedAt: string; } +/** + * Wire protocol version this build emits on every push. Bumped together with + * a matching migration on both client and mana-sync so older events can be + * replayed forward during import and live pulls. + * + * Pre-M2 events land as v1 (server default). Anything above this on the wire + * from the server means a newer client produced the event and this (older) + * build is reading it — tolerated and routed through the migration chain on + * apply. + */ +export const CURRENT_SCHEMA_VERSION = 1; + /** * One row of a changeset on the wire. Pending changes (local) and server * changes (remote) share the same shape so the validator can be reused. @@ -45,8 +56,14 @@ export interface FieldChange { * - `op === 'update'` requires `fields` (record-level `data` is ignored). * - `op === 'insert'` requires `data`. * - A `deletedAt` flag implies a soft delete regardless of `op`. + * + * `eventId` and `schemaVersion` are only populated on server->client payloads. + * Clients should use `eventId` to dedup on import replay; `schemaVersion` + * decides which migration chain to run before apply. 
*/ export interface SyncChange { + eventId?: string; + schemaVersion?: number; table: string; id: string; op: SyncOp; @@ -113,6 +130,8 @@ export function isValidSyncChange(v: unknown): v is SyncChange { if (c.fields !== undefined && !isFieldsMap(c.fields)) return false; if (c.data !== undefined && (typeof c.data !== 'object' || c.data === null)) return false; if (c.deletedAt !== undefined && typeof c.deletedAt !== 'string') return false; + if (c.eventId !== undefined && typeof c.eventId !== 'string') return false; + if (c.schemaVersion !== undefined && typeof c.schemaVersion !== 'number') return false; return true; } @@ -1033,6 +1052,7 @@ export function createUnifiedSync( return { clientId: cid, since, + schemaVersion: CURRENT_SCHEMA_VERSION, changes: pending.map((p) => ({ table: toSyncName(p.collection), id: p.recordId, diff --git a/services/mana-sync/internal/backup/handler.go b/services/mana-sync/internal/backup/handler.go new file mode 100644 index 000000000..df9c77378 --- /dev/null +++ b/services/mana-sync/internal/backup/handler.go @@ -0,0 +1,123 @@ +// Package backup implements the M1 thin-slice user-data backup endpoint. +// +// Streams every sync_changes row owned by the authenticated user as JSON Lines +// (one Change per line). The body is the raw event stream from mana-sync — +// identical in shape to what live sync emits, so a future restore endpoint can +// replay it via the existing applyServerChanges() path on the client. +// +// Field-level ciphertext passes through untouched: the registry-encrypted +// fields are already encrypted when they reach this table, so the file is +// effectively encrypted at rest for sensitive fields. +package backup + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/mana/mana-sync/internal/auth" + "github.com/mana/mana-sync/internal/store" +) + +// Handler serves the /backup/export endpoint. 
+type Handler struct { + store *store.Store + validator *auth.Validator +} + +// NewHandler constructs a backup handler. +func NewHandler(s *store.Store, v *auth.Validator) *Handler { + return &Handler{store: s, validator: v} +} + +// exportLine is the on-wire shape of one row in the JSONL body. Field names +// mirror the sync-protocol Change shape as closely as possible; the restore +// side maps these back into SyncChange objects. +type exportLine struct { + EventID string `json:"eventId"` + SchemaVersion int `json:"schemaVersion"` + AppID string `json:"appId"` + Table string `json:"table"` + RecordID string `json:"id"` + Op string `json:"op"` + Data map[string]any `json:"data,omitempty"` + FieldTimestamps map[string]string `json:"fieldTimestamps,omitempty"` + ClientID string `json:"clientId"` + CreatedAt string `json:"createdAt"` +} + +// HandleExport streams the authenticated user's full sync_changes log as +// JSONL. This is the M1 thin slice of the backup/restore feature — no zip, +// no manifest, no signature yet. Those land in M3. +// +// Billing bypass (GDPR data portability): the route is wired outside the +// billing middleware in main.go, so users can always export their data even +// if their sync subscription is inactive. +func (h *Handler) HandleExport(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + userID, err := h.validator.UserIDFromRequest(r) + if err != nil { + http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized) + return + } + + filename := fmt.Sprintf("mana-backup-%s-%s.jsonl", userID, time.Now().UTC().Format("20060102-150405")) + + w.Header().Set("Content-Type", "application/x-ndjson") + w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, filename)) + w.Header().Set("X-Content-Type-Options", "nosniff") + // Disable proxy buffering so the response streams as rows arrive. 
+ w.Header().Set("X-Accel-Buffering", "no") + w.Header().Set("Cache-Control", "no-store") + + flusher, _ := w.(http.Flusher) + encoder := json.NewEncoder(w) + + var count int + streamErr := h.store.StreamAllUserChanges(r.Context(), userID, func(row store.ChangeRow) error { + sv := row.SchemaVersion + if sv <= 0 { + sv = 1 + } + line := exportLine{ + EventID: row.ID, + SchemaVersion: sv, + AppID: row.AppID, + Table: row.TableName, + RecordID: row.RecordID, + Op: row.Op, + Data: row.Data, + FieldTimestamps: row.FieldTimestamps, + ClientID: row.ClientID, + CreatedAt: row.CreatedAt.UTC().Format(time.RFC3339Nano), + } + if err := encoder.Encode(line); err != nil { + return err + } + count++ + // Flush every ~500 rows so big exports show progress over the wire. + if flusher != nil && count%500 == 0 { + flusher.Flush() + } + return nil + }) + if flusher != nil { + flusher.Flush() + } + + if streamErr != nil { + // Headers are already sent, so we cannot change the status code. + // Log and let the client detect truncation via the row count it expected. + // (M3 will add a manifest with eventCount + sha256 for integrity checking.) + slog.Error("backup export stream failed", "user_id", userID, "written", count, "error", streamErr) + return + } + + slog.Info("backup export ok", "user_id", userID, "rows", count) +} diff --git a/services/mana-sync/internal/store/postgres.go b/services/mana-sync/internal/store/postgres.go index 444bc66b0..9b38a63c6 100644 --- a/services/mana-sync/internal/store/postgres.go +++ b/services/mana-sync/internal/store/postgres.go @@ -52,9 +52,18 @@ func (s *Store) Migrate(ctx context.Context) error { data JSONB, field_timestamps JSONB DEFAULT '{}', client_id TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + -- M2: schema_version lets us evolve the Change wire shape over time. 
+ -- Default 1 covers rows written before the column existed so the + -- backup/restore pipeline can always feed them through a migration + -- chain keyed on this value. + schema_version INT NOT NULL DEFAULT 1 ); + -- Idempotent add for databases created before M2 shipped. + ALTER TABLE sync_changes + ADD COLUMN IF NOT EXISTS schema_version INT NOT NULL DEFAULT 1; + CREATE INDEX IF NOT EXISTS idx_sync_changes_user_app ON sync_changes (user_id, app_id, created_at); @@ -112,7 +121,11 @@ func (s *Store) withUser(ctx context.Context, userID string, fn func(pgx.Tx) err // RecordChange stores a client change in the database. The insert is performed // inside an RLS-scoped transaction so the user_id column is double-checked // against the policy on the way in — a mismatched user_id would fail WITH CHECK. -func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string) error { +func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int) error { + if schemaVersion <= 0 { + schemaVersion = 1 + } + dataJSON, err := json.Marshal(data) if err != nil { return fmt.Errorf("marshal data: %w", err) @@ -125,10 +138,10 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us return s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ` - _, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID) + _, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, 
clientID, schemaVersion) return err }) } @@ -145,7 +158,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s var changes []ChangeRow err = s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at + SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version FROM sync_changes WHERE user_id = $1 AND app_id = $2 AND table_name = $3 AND created_at > $4 AND client_id != $5 @@ -162,7 +175,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s var c ChangeRow var dataJSON, ftJSON []byte - if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt); err != nil { + if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil { return err } @@ -194,7 +207,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex var changes []ChangeRow err = s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at + SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version FROM sync_changes WHERE user_id = $1 AND app_id = $2 AND created_at > $3 AND client_id != $4 @@ -211,7 +224,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex var c ChangeRow var dataJSON, ftJSON []byte - if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt); err != nil { + if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil { return err } @@ -233,8 +246,52 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex return changes, err } +// StreamAllUserChanges 
iterates every sync_changes row owned by userID, across +// all apps/tables, in chronological order, invoking fn for each row. Designed +// for the backup/export endpoint — unbounded result set, so rows are streamed +// via a cursor-free single query (pgx streams rows as they arrive from the +// server). If fn returns an error, iteration stops and the error is returned. +func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func(ChangeRow) error) error { + return s.withUser(ctx, userID, func(tx pgx.Tx) error { + query := ` + SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version + FROM sync_changes + WHERE user_id = $1 + ORDER BY created_at ASC, id ASC + ` + rows, err := tx.Query(ctx, query, userID) + if err != nil { + return fmt.Errorf("query: %w", err) + } + defer rows.Close() + + for rows.Next() { + var c ChangeRow + var dataJSON, ftJSON []byte + if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil { + return fmt.Errorf("scan: %w", err) + } + if dataJSON != nil { + if err := json.Unmarshal(dataJSON, &c.Data); err != nil { + return fmt.Errorf("unmarshal data for record %s: %w", c.RecordID, err) + } + } + if ftJSON != nil { + if err := json.Unmarshal(ftJSON, &c.FieldTimestamps); err != nil { + return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err) + } + } + if err := fn(c); err != nil { + return err + } + } + return rows.Err() + }) +} + // ChangeRow is a row from the sync_changes table. 
type ChangeRow struct { + AppID string ID string TableName string RecordID string @@ -243,4 +300,5 @@ type ChangeRow struct { FieldTimestamps map[string]string ClientID string CreatedAt time.Time + SchemaVersion int } diff --git a/services/mana-sync/internal/sync/handler.go b/services/mana-sync/internal/sync/handler.go index 9757421d8..14d295344 100644 --- a/services/mana-sync/internal/sync/handler.go +++ b/services/mana-sync/internal/sync/handler.go @@ -28,6 +28,41 @@ func NewHandler(s *store.Store, v *auth.Validator, h *ws.Hub) *Handler { // maxBodySize is the maximum allowed request body (10 MB). const maxBodySize = 10 * 1024 * 1024 +// changeFromRow projects a stored sync_changes row onto the wire Change shape. +// Carries eventId + schemaVersion through so clients can dedup on replay and +// route through the migration chain. +func changeFromRow(row store.ChangeRow) Change { + sv := row.SchemaVersion + if sv <= 0 { + sv = 1 + } + c := Change{ + EventID: row.ID, + SchemaVersion: sv, + Table: row.TableName, + ID: row.RecordID, + Op: row.Op, + } + switch row.Op { + case "insert": + c.Data = row.Data + case "update": + c.Fields = make(map[string]*FieldChange) + for field, ts := range row.FieldTimestamps { + value, ok := row.Data[field] + if !ok { + continue + } + c.Fields[field] = &FieldChange{Value: value, UpdatedAt: ts} + } + case "delete": + if deletedAt, ok := row.Data["deletedAt"].(string); ok { + c.DeletedAt = &deletedAt + } + } + return c +} + // validOps are the allowed sync operation types. var validOps = map[string]bool{"insert": true, "update": true, "delete": true} @@ -63,6 +98,18 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) { return } + // Normalize + validate protocol version. Pre-M2 clients omit the field + // (treated as v1); a client newer than this build is refused so we don't + // silently store events we can't fully interpret. 
+ schemaVersion := changeset.SchemaVersion + if schemaVersion <= 0 { + schemaVersion = 1 + } + if schemaVersion > MaxSupportedSchemaVersion { + http.Error(w, fmt.Sprintf("unsupported schemaVersion %d (max %d)", schemaVersion, MaxSupportedSchemaVersion), http.StatusBadRequest) + return + } + // Validate changes for i, change := range changeset.Changes { if !validOps[change.Op] { @@ -107,7 +154,14 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) { } } - err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps) + // Per-change schemaVersion falls back to the changeset-level value + // so a well-formed pre-M2 change nested in an M2 changeset still + // lands on the right version. + rowSchemaVersion := change.SchemaVersion + if rowSchemaVersion <= 0 { + rowSchemaVersion = schemaVersion + } + err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion) if err != nil { slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID) http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError) @@ -126,34 +180,7 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) { // Convert store rows to sync changes responseChanges := make([]Change, 0, len(serverChanges)) for _, row := range serverChanges { - c := Change{ - Table: row.TableName, - ID: row.RecordID, - Op: row.Op, - } - - switch row.Op { - case "insert": - c.Data = row.Data - case "update": - c.Fields = make(map[string]*FieldChange) - for field, ts := range row.FieldTimestamps { - value, ok := row.Data[field] - if !ok { - continue - } - c.Fields[field] = &FieldChange{ - Value: value, - UpdatedAt: ts, - } - } - case "delete": - if deletedAt, ok := row.Data["deletedAt"].(string); ok { - c.DeletedAt = &deletedAt - } - } - - responseChanges = append(responseChanges, c) + responseChanges = 
append(responseChanges, changeFromRow(row)) } now := time.Now().UTC().Format(time.RFC3339Nano) @@ -223,34 +250,7 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) { responseChanges := make([]Change, 0, len(serverChanges)) for _, row := range serverChanges { - c := Change{ - Table: row.TableName, - ID: row.RecordID, - Op: row.Op, - } - - switch row.Op { - case "insert": - c.Data = row.Data - case "update": - c.Fields = make(map[string]*FieldChange) - for field, ts := range row.FieldTimestamps { - value, ok := row.Data[field] - if !ok { - continue - } - c.Fields[field] = &FieldChange{ - Value: value, - UpdatedAt: ts, - } - } - case "delete": - if deletedAt, ok := row.Data["deletedAt"].(string); ok { - c.DeletedAt = &deletedAt - } - } - - responseChanges = append(responseChanges, c) + responseChanges = append(responseChanges, changeFromRow(row)) } // When paginating, use last row's timestamp as cursor; otherwise now() diff --git a/services/mana-sync/internal/sync/types.go b/services/mana-sync/internal/sync/types.go index 42c365384..f30d66749 100644 --- a/services/mana-sync/internal/sync/types.go +++ b/services/mana-sync/internal/sync/types.go @@ -2,14 +2,33 @@ package sync import "time" +// CurrentSchemaVersion is the protocol version that this build emits for every +// new change. Bump only with a matching migration registered on both the Go +// server and the TS client so older events can be replayed forward during +// import and live sync. +const CurrentSchemaVersion = 1 + +// MaxSupportedSchemaVersion is the highest schemaVersion the server will accept +// from a client today. A client running ahead of the server is refused with +// 400; the reverse case (server ahead) replays old events through the migration +// chain on the receiving side. +const MaxSupportedSchemaVersion = 1 + // Change represents a single field-level change to a record. 
+// +// EventID and SchemaVersion are populated on server->client payloads so +// clients can dedup on replay (import flow) and route events through the +// migration chain. Client->server pushes leave EventID empty — the server +// assigns a UUID on insert. type Change struct { - Table string `json:"table"` - ID string `json:"id"` - Op string `json:"op"` // "insert", "update", "delete" - Fields map[string]*FieldChange `json:"fields,omitempty"` - Data map[string]any `json:"data,omitempty"` - DeletedAt *string `json:"deletedAt,omitempty"` + EventID string `json:"eventId,omitempty"` + SchemaVersion int `json:"schemaVersion,omitempty"` + Table string `json:"table"` + ID string `json:"id"` + Op string `json:"op"` // "insert", "update", "delete" + Fields map[string]*FieldChange `json:"fields,omitempty"` + Data map[string]any `json:"data,omitempty"` + DeletedAt *string `json:"deletedAt,omitempty"` } // FieldChange holds a value and the timestamp when it was last changed. @@ -19,11 +38,16 @@ type FieldChange struct { } // Changeset is a batch of changes sent by a client. +// +// SchemaVersion is the protocol version the client is emitting. Missing/zero +// is treated as 1 for compatibility with pre-M2 clients; anything above +// MaxSupportedSchemaVersion is refused. type Changeset struct { - ClientID string `json:"clientId"` - AppID string `json:"appId"` - Since string `json:"since"` // ISO timestamp - Changes []Change `json:"changes"` + ClientID string `json:"clientId"` + AppID string `json:"appId"` + Since string `json:"since"` // ISO timestamp + Changes []Change `json:"changes"` + SchemaVersion int `json:"schemaVersion,omitempty"` } // SyncResponse is returned after processing a changeset.