feat(sync): schemaVersion + eventId on wire (M2 protocol hardening)

- sync_changes gains schema_version column (default 1, idempotent ADD)
- Change/Changeset carry schemaVersion; server refuses > MaxSupported
- server->client changes now carry eventId + schemaVersion so the
  restore path can dedup via eventId and route through a migration
  chain keyed on schemaVersion
- backup JSONL gains schemaVersion per line

Pre-M2 clients (omit the field) are treated as v1 for compatibility.
This is the stability contract we commit to before launch: once v1
events are in the wild, all future builds must replay them forward.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-14 15:25:32 +02:00
parent e4f0a410d1
commit 79996f946a
5 changed files with 301 additions and 76 deletions

View file

@ -37,6 +37,17 @@ export interface FieldChange {
updatedAt: string;
}
/**
 * Wire protocol version this build emits on every push. Bumped together with
 * a matching migration on both client and mana-sync so older events can be
 * replayed forward during import and live pulls.
 *
 * Pre-M2 events land as v1 (server default). Anything above this on the wire
 * from the server means a newer peer produced the event; it is tolerated and
 * routed through the migration chain on apply.
 */
export const CURRENT_SCHEMA_VERSION = 1;
/**
* One row of a changeset on the wire. Pending changes (local) and server
* changes (remote) share the same shape so the validator can be reused.
@ -45,8 +56,14 @@ export interface FieldChange {
* - `op === 'update'` requires `fields` (record-level `data` is ignored).
* - `op === 'insert'` requires `data`.
* - A `deletedAt` flag implies a soft delete regardless of `op`.
*
* `eventId` and `schemaVersion` are only populated on server->client payloads.
* Clients should use `eventId` to dedup on import replay; `schemaVersion`
* decides which migration chain to run before apply.
*/
export interface SyncChange {
eventId?: string;
schemaVersion?: number;
table: string;
id: string;
op: SyncOp;
@ -113,6 +130,8 @@ export function isValidSyncChange(v: unknown): v is SyncChange {
if (c.fields !== undefined && !isFieldsMap(c.fields)) return false;
if (c.data !== undefined && (typeof c.data !== 'object' || c.data === null)) return false;
if (c.deletedAt !== undefined && typeof c.deletedAt !== 'string') return false;
if (c.eventId !== undefined && typeof c.eventId !== 'string') return false;
if (c.schemaVersion !== undefined && typeof c.schemaVersion !== 'number') return false;
return true;
}
@ -1033,6 +1052,7 @@ export function createUnifiedSync(
return {
clientId: cid,
since,
schemaVersion: CURRENT_SCHEMA_VERSION,
changes: pending.map((p) => ({
table: toSyncName(p.collection),
id: p.recordId,

View file

@ -0,0 +1,123 @@
// Package backup implements the M1 thin-slice user-data backup endpoint.
//
// Streams every sync_changes row owned by the authenticated user as JSON Lines
// (one Change per line). The body is the raw event stream from mana-sync —
// identical in shape to what live sync emits, so a future restore endpoint can
// replay it via the existing applyServerChanges() path on the client.
//
// Field-level ciphertext passes through untouched: the registry-encrypted
// fields are already encrypted when they reach this table, so the file is
// effectively encrypted at rest for sensitive fields.
package backup
import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/mana/mana-sync/internal/auth"
"github.com/mana/mana-sync/internal/store"
)
// Handler serves the /backup/export endpoint.
type Handler struct {
	// store is the row source: it streams the authenticated user's
	// sync_changes log for the JSONL export body.
	store *store.Store
	// validator resolves the authenticated user ID from the request.
	validator *auth.Validator
}
// NewHandler wires a store and an auth validator into a backup Handler.
func NewHandler(s *store.Store, v *auth.Validator) *Handler {
	h := &Handler{
		store:     s,
		validator: v,
	}
	return h
}
// exportLine is the on-wire shape of one row in the JSONL body. Field names
// mirror the sync-protocol Change shape as closely as possible; the restore
// side maps these back into SyncChange objects.
type exportLine struct {
	EventID         string            `json:"eventId"`       // server-assigned sync_changes row ID, used for dedup on replay
	SchemaVersion   int               `json:"schemaVersion"` // wire-protocol version of this event (>= 1)
	AppID           string            `json:"appId"`
	Table           string            `json:"table"`
	RecordID        string            `json:"id"`
	Op              string            `json:"op"` // "insert", "update", or "delete"
	Data            map[string]any    `json:"data,omitempty"`
	FieldTimestamps map[string]string `json:"fieldTimestamps,omitempty"`
	ClientID        string            `json:"clientId"`
	CreatedAt       string            `json:"createdAt"` // RFC3339Nano, UTC
}
// HandleExport streams the authenticated user's full sync_changes log as
// JSONL. This is the M1 thin slice of the backup/restore feature — no zip,
// no manifest, no signature yet. Those land in M3.
//
// Data-portability note: the route is wired outside the billing middleware
// in main.go, so users can always export their data even if their sync
// subscription is inactive.
func (h *Handler) HandleExport(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	userID, err := h.validator.UserIDFromRequest(r)
	if err != nil {
		http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
		return
	}

	// Only pass header- and filesystem-safe characters from the user ID into
	// the Content-Disposition filename. Quotes, control characters, or path
	// separators in an ID would otherwise corrupt the header or the saved
	// filename (header-injection hardening; IDs are expected to be UUID-like,
	// but we don't rely on that here).
	safeID := make([]byte, 0, len(userID))
	for i := 0; i < len(userID); i++ {
		c := userID[i]
		switch {
		case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', c >= '0' && c <= '9', c == '-', c == '_':
			safeID = append(safeID, c)
		}
	}

	filename := fmt.Sprintf("mana-backup-%s-%s.jsonl", safeID, time.Now().UTC().Format("20060102-150405"))
	w.Header().Set("Content-Type", "application/x-ndjson")
	w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, filename))
	w.Header().Set("X-Content-Type-Options", "nosniff")
	// Disable proxy buffering so the response streams as rows arrive.
	w.Header().Set("X-Accel-Buffering", "no")
	w.Header().Set("Cache-Control", "no-store")

	flusher, _ := w.(http.Flusher)
	encoder := json.NewEncoder(w)
	var count int

	streamErr := h.store.StreamAllUserChanges(r.Context(), userID, func(row store.ChangeRow) error {
		// Rows written before the schema_version column existed come back as
		// 0; normalize to v1 so the restore side always has a valid version.
		sv := row.SchemaVersion
		if sv <= 0 {
			sv = 1
		}
		line := exportLine{
			EventID:         row.ID,
			SchemaVersion:   sv,
			AppID:           row.AppID,
			Table:           row.TableName,
			RecordID:        row.RecordID,
			Op:              row.Op,
			Data:            row.Data,
			FieldTimestamps: row.FieldTimestamps,
			ClientID:        row.ClientID,
			CreatedAt:       row.CreatedAt.UTC().Format(time.RFC3339Nano),
		}
		if err := encoder.Encode(line); err != nil {
			return err
		}
		count++
		// Flush every ~500 rows so big exports show progress over the wire.
		if flusher != nil && count%500 == 0 {
			flusher.Flush()
		}
		return nil
	})
	if flusher != nil {
		flusher.Flush()
	}
	if streamErr != nil {
		// Headers are already sent, so we cannot change the status code.
		// Log and let the client detect truncation via the row count it expected.
		// (M3 will add a manifest with eventCount + sha256 for integrity checking.)
		slog.Error("backup export stream failed", "user_id", userID, "written", count, "error", streamErr)
		return
	}
	slog.Info("backup export ok", "user_id", userID, "rows", count)
}

View file

@ -52,9 +52,18 @@ func (s *Store) Migrate(ctx context.Context) error {
data JSONB,
field_timestamps JSONB DEFAULT '{}',
client_id TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- M2: schema_version lets us evolve the Change wire shape over time.
-- Default 1 covers rows written before the column existed so the
-- backup/restore pipeline can always feed them through a migration
-- chain keyed on this value.
schema_version INT NOT NULL DEFAULT 1
);
-- Idempotent add for databases created before M2 shipped.
ALTER TABLE sync_changes
ADD COLUMN IF NOT EXISTS schema_version INT NOT NULL DEFAULT 1;
CREATE INDEX IF NOT EXISTS idx_sync_changes_user_app
ON sync_changes (user_id, app_id, created_at);
@ -112,7 +121,11 @@ func (s *Store) withUser(ctx context.Context, userID string, fn func(pgx.Tx) err
// RecordChange stores a client change in the database. The insert is performed
// inside an RLS-scoped transaction so the user_id column is double-checked
// against the policy on the way in — a mismatched user_id would fail WITH CHECK.
func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string) error {
func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int) error {
if schemaVersion <= 0 {
schemaVersion = 1
}
dataJSON, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("marshal data: %w", err)
@ -125,10 +138,10 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us
return s.withUser(ctx, userID, func(tx pgx.Tx) error {
query := `
INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`
_, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID)
_, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID, schemaVersion)
return err
})
}
@ -145,7 +158,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
var changes []ChangeRow
err = s.withUser(ctx, userID, func(tx pgx.Tx) error {
query := `
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
FROM sync_changes
WHERE user_id = $1 AND app_id = $2 AND table_name = $3
AND created_at > $4 AND client_id != $5
@ -162,7 +175,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
var c ChangeRow
var dataJSON, ftJSON []byte
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt); err != nil {
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil {
return err
}
@ -194,7 +207,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
var changes []ChangeRow
err = s.withUser(ctx, userID, func(tx pgx.Tx) error {
query := `
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
FROM sync_changes
WHERE user_id = $1 AND app_id = $2
AND created_at > $3 AND client_id != $4
@ -211,7 +224,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
var c ChangeRow
var dataJSON, ftJSON []byte
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt); err != nil {
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil {
return err
}
@ -233,8 +246,52 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
return changes, err
}
// StreamAllUserChanges iterates every sync_changes row owned by userID, across
// all apps/tables, in chronological order, invoking fn for each row. Designed
// for the backup/export endpoint — unbounded result set, so rows are streamed
// via a cursor-free single query (pgx streams rows as they arrive from the
// server). If fn returns an error, iteration stops and the error is returned.
func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func(ChangeRow) error) error {
	const query = `
SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
FROM sync_changes
WHERE user_id = $1
ORDER BY created_at ASC, id ASC
`
	return s.withUser(ctx, userID, func(tx pgx.Tx) error {
		rows, err := tx.Query(ctx, query, userID)
		if err != nil {
			return fmt.Errorf("query: %w", err)
		}
		defer rows.Close()

		for rows.Next() {
			var (
				rec              ChangeRow
				rawData, rawFTs  []byte
			)
			if err := rows.Scan(&rec.ID, &rec.AppID, &rec.TableName, &rec.RecordID, &rec.Op, &rawData, &rawFTs, &rec.ClientID, &rec.CreatedAt, &rec.SchemaVersion); err != nil {
				return fmt.Errorf("scan: %w", err)
			}
			if rawData != nil {
				if err := json.Unmarshal(rawData, &rec.Data); err != nil {
					return fmt.Errorf("unmarshal data for record %s: %w", rec.RecordID, err)
				}
			}
			if rawFTs != nil {
				if err := json.Unmarshal(rawFTs, &rec.FieldTimestamps); err != nil {
					return fmt.Errorf("unmarshal field_timestamps for record %s: %w", rec.RecordID, err)
				}
			}
			if err := fn(rec); err != nil {
				return err
			}
		}
		return rows.Err()
	})
}
// ChangeRow is a row from the sync_changes table.
type ChangeRow struct {
AppID string
ID string
TableName string
RecordID string
@ -243,4 +300,5 @@ type ChangeRow struct {
FieldTimestamps map[string]string
ClientID string
CreatedAt time.Time
SchemaVersion int
}

View file

@ -28,6 +28,41 @@ func NewHandler(s *store.Store, v *auth.Validator, h *ws.Hub) *Handler {
// maxBodySize is the maximum allowed request body (10 MB).
const maxBodySize = 10 * 1024 * 1024
// changeFromRow projects a stored sync_changes row onto the wire Change shape.
// Carries eventId + schemaVersion through so clients can dedup on replay and
// route through the migration chain.
func changeFromRow(row store.ChangeRow) Change {
	version := row.SchemaVersion
	if version <= 0 {
		// Rows written before the schema_version column existed scan as 0;
		// normalize to v1.
		version = 1
	}
	out := Change{
		EventID:       row.ID,
		SchemaVersion: version,
		Table:         row.TableName,
		ID:            row.RecordID,
		Op:            row.Op,
	}
	switch row.Op {
	case "insert":
		out.Data = row.Data
	case "update":
		fields := make(map[string]*FieldChange, len(row.FieldTimestamps))
		for name, updatedAt := range row.FieldTimestamps {
			// Skip timestamps with no matching value; only materialized
			// fields go on the wire.
			if value, ok := row.Data[name]; ok {
				fields[name] = &FieldChange{Value: value, UpdatedAt: updatedAt}
			}
		}
		out.Fields = fields
	case "delete":
		if deletedAt, ok := row.Data["deletedAt"].(string); ok {
			out.DeletedAt = &deletedAt
		}
	}
	return out
}
// validOps is the set of sync operation types accepted on the wire; any
// other op is rejected during changeset validation.
var validOps = map[string]bool{"insert": true, "update": true, "delete": true}
@ -63,6 +98,18 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
return
}
// Normalize + validate protocol version. Pre-M2 clients omit the field
// (treated as v1); a client newer than this build is refused so we don't
// silently store events we can't fully interpret.
schemaVersion := changeset.SchemaVersion
if schemaVersion <= 0 {
schemaVersion = 1
}
if schemaVersion > MaxSupportedSchemaVersion {
http.Error(w, fmt.Sprintf("unsupported schemaVersion %d (max %d)", schemaVersion, MaxSupportedSchemaVersion), http.StatusBadRequest)
return
}
// Validate changes
for i, change := range changeset.Changes {
if !validOps[change.Op] {
@ -107,7 +154,14 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
}
}
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps)
// Per-change schemaVersion falls back to the changeset-level value
// so a well-formed pre-M2 change nested in an M2 changeset still
// lands on the right version.
rowSchemaVersion := change.SchemaVersion
if rowSchemaVersion <= 0 {
rowSchemaVersion = schemaVersion
}
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion)
if err != nil {
slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID)
http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError)
@ -126,34 +180,7 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
// Convert store rows to sync changes
responseChanges := make([]Change, 0, len(serverChanges))
for _, row := range serverChanges {
c := Change{
Table: row.TableName,
ID: row.RecordID,
Op: row.Op,
}
switch row.Op {
case "insert":
c.Data = row.Data
case "update":
c.Fields = make(map[string]*FieldChange)
for field, ts := range row.FieldTimestamps {
value, ok := row.Data[field]
if !ok {
continue
}
c.Fields[field] = &FieldChange{
Value: value,
UpdatedAt: ts,
}
}
case "delete":
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
c.DeletedAt = &deletedAt
}
}
responseChanges = append(responseChanges, c)
responseChanges = append(responseChanges, changeFromRow(row))
}
now := time.Now().UTC().Format(time.RFC3339Nano)
@ -223,34 +250,7 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) {
responseChanges := make([]Change, 0, len(serverChanges))
for _, row := range serverChanges {
c := Change{
Table: row.TableName,
ID: row.RecordID,
Op: row.Op,
}
switch row.Op {
case "insert":
c.Data = row.Data
case "update":
c.Fields = make(map[string]*FieldChange)
for field, ts := range row.FieldTimestamps {
value, ok := row.Data[field]
if !ok {
continue
}
c.Fields[field] = &FieldChange{
Value: value,
UpdatedAt: ts,
}
}
case "delete":
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
c.DeletedAt = &deletedAt
}
}
responseChanges = append(responseChanges, c)
responseChanges = append(responseChanges, changeFromRow(row))
}
// When paginating, use last row's timestamp as cursor; otherwise now()

View file

@ -2,14 +2,33 @@ package sync
import "time"
const (
	// CurrentSchemaVersion is the protocol version that this build emits for
	// every new change. Bump only with a matching migration registered on
	// both the Go server and the TS client so older events can be replayed
	// forward during import and live sync.
	CurrentSchemaVersion = 1

	// MaxSupportedSchemaVersion is the highest schemaVersion the server will
	// accept from a client today. A client running ahead of the server is
	// refused with 400; the reverse case (server ahead) replays old events
	// through the migration chain on the receiving side.
	MaxSupportedSchemaVersion = 1
)
// Change represents a single field-level change to a record.
//
// EventID and SchemaVersion are populated on server->client payloads so
// clients can dedup on replay (import flow) and route events through the
// migration chain. Client->server pushes leave EventID empty — the server
// assigns a UUID on insert.
type Change struct {
Table string `json:"table"`
ID string `json:"id"`
Op string `json:"op"` // "insert", "update", "delete"
Fields map[string]*FieldChange `json:"fields,omitempty"`
Data map[string]any `json:"data,omitempty"`
DeletedAt *string `json:"deletedAt,omitempty"`
EventID string `json:"eventId,omitempty"`
SchemaVersion int `json:"schemaVersion,omitempty"`
Table string `json:"table"`
ID string `json:"id"`
Op string `json:"op"` // "insert", "update", "delete"
Fields map[string]*FieldChange `json:"fields,omitempty"`
Data map[string]any `json:"data,omitempty"`
DeletedAt *string `json:"deletedAt,omitempty"`
}
// FieldChange holds a value and the timestamp when it was last changed.
@ -19,11 +38,16 @@ type FieldChange struct {
}
// Changeset is a batch of changes sent by a client.
//
// SchemaVersion is the protocol version the client is emitting. Missing/zero
// is treated as 1 for compatibility with pre-M2 clients; anything above
// MaxSupportedSchemaVersion is refused.
type Changeset struct {
ClientID string `json:"clientId"`
AppID string `json:"appId"`
Since string `json:"since"` // ISO timestamp
Changes []Change `json:"changes"`
ClientID string `json:"clientId"`
AppID string `json:"appId"`
Since string `json:"since"` // ISO timestamp
Changes []Change `json:"changes"`
SchemaVersion int `json:"schemaVersion,omitempty"`
}
// SyncResponse is returned after processing a changeset.