mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 19:41:09 +02:00
feat(sync): schemaVersion + eventId on wire (M2 protocol hardening)
- sync_changes gains schema_version column (default 1, idempotent ADD) - Change/Changeset carry schemaVersion; server refuses > MaxSupported - server->client changes now carry eventId + schemaVersion so the restore path can dedup via eventId and route through a migration chain keyed on schemaVersion - backup JSONL gains schemaVersion per line Pre-M2 clients (omit the field) are treated as v1 for compatibility. This is the stability contract we commit to before launch: once v1 events are in the wild, all future builds must replay them forward. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
e4f0a410d1
commit
79996f946a
5 changed files with 301 additions and 76 deletions
|
|
@ -37,6 +37,17 @@ export interface FieldChange {
|
|||
updatedAt: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wire protocol version this build emits on every push. Bumped together with
|
||||
* a matching migration on both client and mana-sync so older events can be
|
||||
* replayed forward during import and live pulls.
|
||||
*
|
||||
* Pre-M2 events land as v1 (server default). Anything above this on the wire
|
||||
* from the server is an older client talking to a newer one — tolerated and
|
||||
* routed through the migration chain on apply.
|
||||
*/
|
||||
export const CURRENT_SCHEMA_VERSION = 1;
|
||||
|
||||
/**
|
||||
* One row of a changeset on the wire. Pending changes (local) and server
|
||||
* changes (remote) share the same shape so the validator can be reused.
|
||||
|
|
@ -45,8 +56,14 @@ export interface FieldChange {
|
|||
* - `op === 'update'` requires `fields` (record-level `data` is ignored).
|
||||
* - `op === 'insert'` requires `data`.
|
||||
* - A `deletedAt` flag implies a soft delete regardless of `op`.
|
||||
*
|
||||
* `eventId` and `schemaVersion` are only populated on server->client payloads.
|
||||
* Clients should use `eventId` to dedup on import replay; `schemaVersion`
|
||||
* decides which migration chain to run before apply.
|
||||
*/
|
||||
export interface SyncChange {
|
||||
eventId?: string;
|
||||
schemaVersion?: number;
|
||||
table: string;
|
||||
id: string;
|
||||
op: SyncOp;
|
||||
|
|
@ -113,6 +130,8 @@ export function isValidSyncChange(v: unknown): v is SyncChange {
|
|||
if (c.fields !== undefined && !isFieldsMap(c.fields)) return false;
|
||||
if (c.data !== undefined && (typeof c.data !== 'object' || c.data === null)) return false;
|
||||
if (c.deletedAt !== undefined && typeof c.deletedAt !== 'string') return false;
|
||||
if (c.eventId !== undefined && typeof c.eventId !== 'string') return false;
|
||||
if (c.schemaVersion !== undefined && typeof c.schemaVersion !== 'number') return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -1033,6 +1052,7 @@ export function createUnifiedSync(
|
|||
return {
|
||||
clientId: cid,
|
||||
since,
|
||||
schemaVersion: CURRENT_SCHEMA_VERSION,
|
||||
changes: pending.map((p) => ({
|
||||
table: toSyncName(p.collection),
|
||||
id: p.recordId,
|
||||
|
|
|
|||
123
services/mana-sync/internal/backup/handler.go
Normal file
123
services/mana-sync/internal/backup/handler.go
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
// Package backup implements the M1 thin-slice user-data backup endpoint.
|
||||
//
|
||||
// Streams every sync_changes row owned by the authenticated user as JSON Lines
|
||||
// (one Change per line). The body is the raw event stream from mana-sync —
|
||||
// identical in shape to what live sync emits, so a future restore endpoint can
|
||||
// replay it via the existing applyServerChanges() path on the client.
|
||||
//
|
||||
// Field-level ciphertext passes through untouched: the registry-encrypted
|
||||
// fields are already encrypted when they reach this table, so the file is
|
||||
// effectively encrypted at rest for sensitive fields.
|
||||
package backup
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/mana/mana-sync/internal/auth"
|
||||
"github.com/mana/mana-sync/internal/store"
|
||||
)
|
||||
|
||||
// Handler serves the /backup/export endpoint.
|
||||
type Handler struct {
|
||||
store *store.Store
|
||||
validator *auth.Validator
|
||||
}
|
||||
|
||||
// NewHandler constructs a backup handler.
|
||||
func NewHandler(s *store.Store, v *auth.Validator) *Handler {
|
||||
return &Handler{store: s, validator: v}
|
||||
}
|
||||
|
||||
// exportLine is the on-wire shape of one row in the JSONL body. Field names
|
||||
// mirror the sync-protocol Change shape as closely as possible; the restore
|
||||
// side maps these back into SyncChange objects.
|
||||
type exportLine struct {
|
||||
EventID string `json:"eventId"`
|
||||
SchemaVersion int `json:"schemaVersion"`
|
||||
AppID string `json:"appId"`
|
||||
Table string `json:"table"`
|
||||
RecordID string `json:"id"`
|
||||
Op string `json:"op"`
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
FieldTimestamps map[string]string `json:"fieldTimestamps,omitempty"`
|
||||
ClientID string `json:"clientId"`
|
||||
CreatedAt string `json:"createdAt"`
|
||||
}
|
||||
|
||||
// HandleExport streams the authenticated user's full sync_changes log as
|
||||
// JSONL. This is the M1 thin slice of the backup/restore feature — no zip,
|
||||
// no manifest, no signature yet. Those land in M3.
|
||||
//
|
||||
// GDPR-bypass for billing: the route is wired outside the billing middleware
|
||||
// in main.go, so users can always export their data even if their sync
|
||||
// subscription is inactive.
|
||||
func (h *Handler) HandleExport(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
userID, err := h.validator.UserIDFromRequest(r)
|
||||
if err != nil {
|
||||
http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
filename := fmt.Sprintf("mana-backup-%s-%s.jsonl", userID, time.Now().UTC().Format("20060102-150405"))
|
||||
|
||||
w.Header().Set("Content-Type", "application/x-ndjson")
|
||||
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, filename))
|
||||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
// Disable proxy buffering so the response streams as rows arrive.
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
w.Header().Set("Cache-Control", "no-store")
|
||||
|
||||
flusher, _ := w.(http.Flusher)
|
||||
encoder := json.NewEncoder(w)
|
||||
|
||||
var count int
|
||||
streamErr := h.store.StreamAllUserChanges(r.Context(), userID, func(row store.ChangeRow) error {
|
||||
sv := row.SchemaVersion
|
||||
if sv <= 0 {
|
||||
sv = 1
|
||||
}
|
||||
line := exportLine{
|
||||
EventID: row.ID,
|
||||
SchemaVersion: sv,
|
||||
AppID: row.AppID,
|
||||
Table: row.TableName,
|
||||
RecordID: row.RecordID,
|
||||
Op: row.Op,
|
||||
Data: row.Data,
|
||||
FieldTimestamps: row.FieldTimestamps,
|
||||
ClientID: row.ClientID,
|
||||
CreatedAt: row.CreatedAt.UTC().Format(time.RFC3339Nano),
|
||||
}
|
||||
if err := encoder.Encode(line); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
// Flush every ~500 rows so big exports show progress over the wire.
|
||||
if flusher != nil && count%500 == 0 {
|
||||
flusher.Flush()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if flusher != nil {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
if streamErr != nil {
|
||||
// Headers are already sent, so we cannot change the status code.
|
||||
// Log and let the client detect truncation via the row count it expected.
|
||||
// (M3 will add a manifest with eventCount + sha256 for integrity checking.)
|
||||
slog.Error("backup export stream failed", "user_id", userID, "written", count, "error", streamErr)
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("backup export ok", "user_id", userID, "rows", count)
|
||||
}
|
||||
|
|
@ -52,9 +52,18 @@ func (s *Store) Migrate(ctx context.Context) error {
|
|||
data JSONB,
|
||||
field_timestamps JSONB DEFAULT '{}',
|
||||
client_id TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
-- M2: schema_version lets us evolve the Change wire shape over time.
|
||||
-- Default 1 covers rows written before the column existed so the
|
||||
-- backup/restore pipeline can always feed them through a migration
|
||||
-- chain keyed on this value.
|
||||
schema_version INT NOT NULL DEFAULT 1
|
||||
);
|
||||
|
||||
-- Idempotent add for databases created before M2 shipped.
|
||||
ALTER TABLE sync_changes
|
||||
ADD COLUMN IF NOT EXISTS schema_version INT NOT NULL DEFAULT 1;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_sync_changes_user_app
|
||||
ON sync_changes (user_id, app_id, created_at);
|
||||
|
||||
|
|
@ -112,7 +121,11 @@ func (s *Store) withUser(ctx context.Context, userID string, fn func(pgx.Tx) err
|
|||
// RecordChange stores a client change in the database. The insert is performed
|
||||
// inside an RLS-scoped transaction so the user_id column is double-checked
|
||||
// against the policy on the way in — a mismatched user_id would fail WITH CHECK.
|
||||
func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string) error {
|
||||
func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int) error {
|
||||
if schemaVersion <= 0 {
|
||||
schemaVersion = 1
|
||||
}
|
||||
|
||||
dataJSON, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal data: %w", err)
|
||||
|
|
@ -125,10 +138,10 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us
|
|||
|
||||
return s.withUser(ctx, userID, func(tx pgx.Tx) error {
|
||||
query := `
|
||||
INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
`
|
||||
_, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID)
|
||||
_, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID, schemaVersion)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
|
@ -145,7 +158,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
|
|||
var changes []ChangeRow
|
||||
err = s.withUser(ctx, userID, func(tx pgx.Tx) error {
|
||||
query := `
|
||||
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at
|
||||
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
|
||||
FROM sync_changes
|
||||
WHERE user_id = $1 AND app_id = $2 AND table_name = $3
|
||||
AND created_at > $4 AND client_id != $5
|
||||
|
|
@ -162,7 +175,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
|
|||
var c ChangeRow
|
||||
var dataJSON, ftJSON []byte
|
||||
|
||||
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt); err != nil {
|
||||
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -194,7 +207,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
|
|||
var changes []ChangeRow
|
||||
err = s.withUser(ctx, userID, func(tx pgx.Tx) error {
|
||||
query := `
|
||||
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at
|
||||
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
|
||||
FROM sync_changes
|
||||
WHERE user_id = $1 AND app_id = $2
|
||||
AND created_at > $3 AND client_id != $4
|
||||
|
|
@ -211,7 +224,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
|
|||
var c ChangeRow
|
||||
var dataJSON, ftJSON []byte
|
||||
|
||||
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt); err != nil {
|
||||
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -233,8 +246,52 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
|
|||
return changes, err
|
||||
}
|
||||
|
||||
// StreamAllUserChanges iterates every sync_changes row owned by userID, across
|
||||
// all apps/tables, in chronological order, invoking fn for each row. Designed
|
||||
// for the backup/export endpoint — unbounded result set, so rows are streamed
|
||||
// via a cursor-free single query (pgx streams rows as they arrive from the
|
||||
// server). If fn returns an error, iteration stops and the error is returned.
|
||||
func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func(ChangeRow) error) error {
|
||||
return s.withUser(ctx, userID, func(tx pgx.Tx) error {
|
||||
query := `
|
||||
SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
|
||||
FROM sync_changes
|
||||
WHERE user_id = $1
|
||||
ORDER BY created_at ASC, id ASC
|
||||
`
|
||||
rows, err := tx.Query(ctx, query, userID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("query: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var c ChangeRow
|
||||
var dataJSON, ftJSON []byte
|
||||
if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil {
|
||||
return fmt.Errorf("scan: %w", err)
|
||||
}
|
||||
if dataJSON != nil {
|
||||
if err := json.Unmarshal(dataJSON, &c.Data); err != nil {
|
||||
return fmt.Errorf("unmarshal data for record %s: %w", c.RecordID, err)
|
||||
}
|
||||
}
|
||||
if ftJSON != nil {
|
||||
if err := json.Unmarshal(ftJSON, &c.FieldTimestamps); err != nil {
|
||||
return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err)
|
||||
}
|
||||
}
|
||||
if err := fn(c); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return rows.Err()
|
||||
})
|
||||
}
|
||||
|
||||
// ChangeRow is a row from the sync_changes table.
|
||||
type ChangeRow struct {
|
||||
AppID string
|
||||
ID string
|
||||
TableName string
|
||||
RecordID string
|
||||
|
|
@ -243,4 +300,5 @@ type ChangeRow struct {
|
|||
FieldTimestamps map[string]string
|
||||
ClientID string
|
||||
CreatedAt time.Time
|
||||
SchemaVersion int
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,6 +28,41 @@ func NewHandler(s *store.Store, v *auth.Validator, h *ws.Hub) *Handler {
|
|||
// maxBodySize is the maximum allowed request body (10 MB).
|
||||
const maxBodySize = 10 * 1024 * 1024
|
||||
|
||||
// changeFromRow projects a stored sync_changes row onto the wire Change shape.
|
||||
// Carries eventId + schemaVersion through so clients can dedup on replay and
|
||||
// route through the migration chain.
|
||||
func changeFromRow(row store.ChangeRow) Change {
|
||||
sv := row.SchemaVersion
|
||||
if sv <= 0 {
|
||||
sv = 1
|
||||
}
|
||||
c := Change{
|
||||
EventID: row.ID,
|
||||
SchemaVersion: sv,
|
||||
Table: row.TableName,
|
||||
ID: row.RecordID,
|
||||
Op: row.Op,
|
||||
}
|
||||
switch row.Op {
|
||||
case "insert":
|
||||
c.Data = row.Data
|
||||
case "update":
|
||||
c.Fields = make(map[string]*FieldChange)
|
||||
for field, ts := range row.FieldTimestamps {
|
||||
value, ok := row.Data[field]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
c.Fields[field] = &FieldChange{Value: value, UpdatedAt: ts}
|
||||
}
|
||||
case "delete":
|
||||
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
|
||||
c.DeletedAt = &deletedAt
|
||||
}
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// validOps are the allowed sync operation types.
|
||||
var validOps = map[string]bool{"insert": true, "update": true, "delete": true}
|
||||
|
||||
|
|
@ -63,6 +98,18 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
// Normalize + validate protocol version. Pre-M2 clients omit the field
|
||||
// (treated as v1); a client newer than this build is refused so we don't
|
||||
// silently store events we can't fully interpret.
|
||||
schemaVersion := changeset.SchemaVersion
|
||||
if schemaVersion <= 0 {
|
||||
schemaVersion = 1
|
||||
}
|
||||
if schemaVersion > MaxSupportedSchemaVersion {
|
||||
http.Error(w, fmt.Sprintf("unsupported schemaVersion %d (max %d)", schemaVersion, MaxSupportedSchemaVersion), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Validate changes
|
||||
for i, change := range changeset.Changes {
|
||||
if !validOps[change.Op] {
|
||||
|
|
@ -107,7 +154,14 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps)
|
||||
// Per-change schemaVersion falls back to the changeset-level value
|
||||
// so a well-formed pre-M2 change nested in an M2 changeset still
|
||||
// lands on the right version.
|
||||
rowSchemaVersion := change.SchemaVersion
|
||||
if rowSchemaVersion <= 0 {
|
||||
rowSchemaVersion = schemaVersion
|
||||
}
|
||||
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion)
|
||||
if err != nil {
|
||||
slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID)
|
||||
http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError)
|
||||
|
|
@ -126,34 +180,7 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
|
|||
// Convert store rows to sync changes
|
||||
responseChanges := make([]Change, 0, len(serverChanges))
|
||||
for _, row := range serverChanges {
|
||||
c := Change{
|
||||
Table: row.TableName,
|
||||
ID: row.RecordID,
|
||||
Op: row.Op,
|
||||
}
|
||||
|
||||
switch row.Op {
|
||||
case "insert":
|
||||
c.Data = row.Data
|
||||
case "update":
|
||||
c.Fields = make(map[string]*FieldChange)
|
||||
for field, ts := range row.FieldTimestamps {
|
||||
value, ok := row.Data[field]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
c.Fields[field] = &FieldChange{
|
||||
Value: value,
|
||||
UpdatedAt: ts,
|
||||
}
|
||||
}
|
||||
case "delete":
|
||||
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
|
||||
c.DeletedAt = &deletedAt
|
||||
}
|
||||
}
|
||||
|
||||
responseChanges = append(responseChanges, c)
|
||||
responseChanges = append(responseChanges, changeFromRow(row))
|
||||
}
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339Nano)
|
||||
|
|
@ -223,34 +250,7 @@ func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
responseChanges := make([]Change, 0, len(serverChanges))
|
||||
for _, row := range serverChanges {
|
||||
c := Change{
|
||||
Table: row.TableName,
|
||||
ID: row.RecordID,
|
||||
Op: row.Op,
|
||||
}
|
||||
|
||||
switch row.Op {
|
||||
case "insert":
|
||||
c.Data = row.Data
|
||||
case "update":
|
||||
c.Fields = make(map[string]*FieldChange)
|
||||
for field, ts := range row.FieldTimestamps {
|
||||
value, ok := row.Data[field]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
c.Fields[field] = &FieldChange{
|
||||
Value: value,
|
||||
UpdatedAt: ts,
|
||||
}
|
||||
}
|
||||
case "delete":
|
||||
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
|
||||
c.DeletedAt = &deletedAt
|
||||
}
|
||||
}
|
||||
|
||||
responseChanges = append(responseChanges, c)
|
||||
responseChanges = append(responseChanges, changeFromRow(row))
|
||||
}
|
||||
|
||||
// When paginating, use last row's timestamp as cursor; otherwise now()
|
||||
|
|
|
|||
|
|
@ -2,14 +2,33 @@ package sync
|
|||
|
||||
import "time"
|
||||
|
||||
// CurrentSchemaVersion is the protocol version that this build emits for every
|
||||
// new change. Bump only with a matching migration registered on both the Go
|
||||
// server and the TS client so older events can be replayed forward during
|
||||
// import and live sync.
|
||||
const CurrentSchemaVersion = 1
|
||||
|
||||
// MaxSupportedSchemaVersion is the highest schemaVersion the server will accept
|
||||
// from a client today. A client running ahead of the server is refused with
|
||||
// 400; the reverse case (server ahead) replays old events through the migration
|
||||
// chain on the receiving side.
|
||||
const MaxSupportedSchemaVersion = 1
|
||||
|
||||
// Change represents a single field-level change to a record.
|
||||
//
|
||||
// EventID and SchemaVersion are populated on server->client payloads so
|
||||
// clients can dedup on replay (import flow) and route events through the
|
||||
// migration chain. Client->server pushes leave EventID empty — the server
|
||||
// assigns a UUID on insert.
|
||||
type Change struct {
|
||||
Table string `json:"table"`
|
||||
ID string `json:"id"`
|
||||
Op string `json:"op"` // "insert", "update", "delete"
|
||||
Fields map[string]*FieldChange `json:"fields,omitempty"`
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
DeletedAt *string `json:"deletedAt,omitempty"`
|
||||
EventID string `json:"eventId,omitempty"`
|
||||
SchemaVersion int `json:"schemaVersion,omitempty"`
|
||||
Table string `json:"table"`
|
||||
ID string `json:"id"`
|
||||
Op string `json:"op"` // "insert", "update", "delete"
|
||||
Fields map[string]*FieldChange `json:"fields,omitempty"`
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
DeletedAt *string `json:"deletedAt,omitempty"`
|
||||
}
|
||||
|
||||
// FieldChange holds a value and the timestamp when it was last changed.
|
||||
|
|
@ -19,11 +38,16 @@ type FieldChange struct {
|
|||
}
|
||||
|
||||
// Changeset is a batch of changes sent by a client.
|
||||
//
|
||||
// SchemaVersion is the protocol version the client is emitting. Missing/zero
|
||||
// is treated as 1 for compatibility with pre-M2 clients; anything above
|
||||
// MaxSupportedSchemaVersion is refused.
|
||||
type Changeset struct {
|
||||
ClientID string `json:"clientId"`
|
||||
AppID string `json:"appId"`
|
||||
Since string `json:"since"` // ISO timestamp
|
||||
Changes []Change `json:"changes"`
|
||||
ClientID string `json:"clientId"`
|
||||
AppID string `json:"appId"`
|
||||
Since string `json:"since"` // ISO timestamp
|
||||
Changes []Change `json:"changes"`
|
||||
SchemaVersion int `json:"schemaVersion,omitempty"`
|
||||
}
|
||||
|
||||
// SyncResponse is returned after processing a changeset.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue