diff --git a/services/mana-sync/internal/store/postgres.go b/services/mana-sync/internal/store/postgres.go index 9b38a63c6..f925eaef4 100644 --- a/services/mana-sync/internal/store/postgres.go +++ b/services/mana-sync/internal/store/postgres.go @@ -57,13 +57,21 @@ func (s *Store) Migrate(ctx context.Context) error { -- Default 1 covers rows written before the column existed so the -- backup/restore pipeline can always feed them through a migration -- chain keyed on this value. - schema_version INT NOT NULL DEFAULT 1 + schema_version INT NOT NULL DEFAULT 1, + -- AI Workbench: opaque actor JSON (user / ai / system). Null for + -- pre-actor clients; the webapp stamps every change with it from + -- the Dexie hook onward. Server-side we just persist and re-emit. + actor JSONB ); -- Idempotent add for databases created before M2 shipped. ALTER TABLE sync_changes ADD COLUMN IF NOT EXISTS schema_version INT NOT NULL DEFAULT 1; + -- Idempotent add for databases created before the AI Workbench. + ALTER TABLE sync_changes + ADD COLUMN IF NOT EXISTS actor JSONB; + CREATE INDEX IF NOT EXISTS idx_sync_changes_user_app ON sync_changes (user_id, app_id, created_at); @@ -121,7 +129,11 @@ func (s *Store) withUser(ctx context.Context, userID string, fn func(pgx.Tx) err // RecordChange stores a client change in the database. The insert is performed // inside an RLS-scoped transaction so the user_id column is double-checked // against the policy on the way in — a mismatched user_id would fail WITH CHECK. -func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int) error { +// +// `actor` is the opaque JSON blob the webapp stamps on every change (see +// `data/events/actor.ts`). Pass nil for pre-actor callers; the column is +// nullable and cross-device consumers treat a missing actor as `user`. +func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int, actor json.RawMessage) error { if schemaVersion <= 0 { schemaVersion = 1 } @@ -136,12 +148,20 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us return fmt.Errorf("marshal field_timestamps: %w", err) } + // pgx serializes a nil []byte as NULL for JSONB columns, which is what + // we want for pre-actor clients. Non-empty raw JSON is passed through + // unchanged — we don't validate the shape, that's the webapp's contract. + var actorJSON []byte + if len(actor) > 0 { + actorJSON = []byte(actor) + } + return s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version, actor) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ` - _, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID, schemaVersion) + _, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID, schemaVersion, actorJSON) return err }) } @@ -158,7 +178,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s var changes []ChangeRow err = s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version + SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor FROM sync_changes WHERE user_id = $1 AND app_id = $2 AND table_name = $3 AND created_at > $4 AND client_id != $5 @@ -173,9 +193,9 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s for rows.Next() { var c ChangeRow - var dataJSON, ftJSON []byte + var dataJSON, ftJSON, actorJSON []byte - if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil { + if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil { return err } @@ -189,6 +209,9 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err) } } + if len(actorJSON) > 0 { + c.Actor = json.RawMessage(actorJSON) + } changes = append(changes, c) } @@ -207,7 +230,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex var changes []ChangeRow err = s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version + SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor FROM sync_changes WHERE user_id = $1 AND app_id = $2 AND created_at > $3 AND client_id != $4 @@ -222,9 +245,9 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex for rows.Next() { var c ChangeRow - var dataJSON, ftJSON []byte + var dataJSON, ftJSON, actorJSON []byte - if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil { + if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil { return err } @@ -238,6 +261,9 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err) } } + if len(actorJSON) > 0 { + c.Actor = json.RawMessage(actorJSON) + } changes = append(changes, c) } @@ -254,7 +280,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func(ChangeRow) error) error { return s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version + SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor FROM sync_changes WHERE user_id = $1 ORDER BY created_at ASC, id ASC @@ -267,8 +293,8 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func for rows.Next() { var c ChangeRow - var dataJSON, ftJSON []byte - if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil { + var dataJSON, ftJSON, actorJSON []byte + if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil { return fmt.Errorf("scan: %w", err) } if dataJSON != nil { @@ -281,6 +307,9 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err) } } + if len(actorJSON) > 0 { + c.Actor = json.RawMessage(actorJSON) + } if err := fn(c); err != nil { return err } @@ -291,7 +320,7 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func // ChangeRow is a row from the sync_changes table. type ChangeRow struct { - AppID string + AppID string ID string TableName string RecordID string @@ -301,4 +330,7 @@ type ChangeRow struct { ClientID string CreatedAt time.Time SchemaVersion int + // Actor is nil for rows written by pre-actor clients. Consumers on + // other devices render a missing actor as "user". + Actor json.RawMessage } diff --git a/services/mana-sync/internal/sync/handler.go b/services/mana-sync/internal/sync/handler.go index 14d295344..b6d4f558a 100644 --- a/services/mana-sync/internal/sync/handler.go +++ b/services/mana-sync/internal/sync/handler.go @@ -42,6 +42,7 @@ func changeFromRow(row store.ChangeRow) Change { Table: row.TableName, ID: row.RecordID, Op: row.Op, + Actor: row.Actor, } switch row.Op { case "insert": @@ -161,7 +162,7 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) { if rowSchemaVersion <= 0 { rowSchemaVersion = schemaVersion } - err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion) + err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion, change.Actor) if err != nil { slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID) http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError) @@ -404,7 +405,7 @@ func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) { func (h *Handler) convertChanges(rows []store.ChangeRow) []Change { changes := make([]Change, 0, len(rows)) for _, row := range rows { - c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op} + c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op, Actor: row.Actor} switch row.Op { case "insert": c.Data = row.Data diff --git a/services/mana-sync/internal/sync/handler_test.go b/services/mana-sync/internal/sync/handler_test.go index 864006035..6e0694ef7 100644 --- a/services/mana-sync/internal/sync/handler_test.go +++ b/services/mana-sync/internal/sync/handler_test.go @@ -298,3 +298,64 @@ func TestFieldChangeRoundTrip(t *testing.T) { t.Errorf("completed value = %v, want true", completedField.Value) } } + +// TestActorPassthrough verifies that an AI-attributed change round-trips +// through JSON encoding/decoding with the actor payload intact as opaque +// bytes — we don't parse the actor shape server-side, just store and re-emit. +func TestActorPassthrough(t *testing.T) { + aiActor := json.RawMessage(`{"kind":"ai","missionId":"m-1","iterationId":"it-1","rationale":"weekly goals review"}`) + change := Change{ + Table: "todos", + ID: "todo-1", + Op: "insert", + Data: map[string]any{"title": "Staged by AI"}, + Actor: aiActor, + } + + encoded, err := json.Marshal(change) + if err != nil { + t.Fatal(err) + } + + var decoded Change + if err := json.Unmarshal(encoded, &decoded); err != nil { + t.Fatal(err) + } + + if len(decoded.Actor) == 0 { + t.Fatal("actor was dropped during round-trip") + } + + // Shape-check that the opaque blob still holds the AI payload + var shape struct { + Kind string `json:"kind"` + MissionID string `json:"missionId"` + IterationID string `json:"iterationId"` + } + if err := json.Unmarshal(decoded.Actor, &shape); err != nil { + t.Fatalf("actor not valid JSON after round-trip: %v", err) + } + if shape.Kind != "ai" || shape.MissionID != "m-1" || shape.IterationID != "it-1" { + t.Errorf("actor shape lost: %+v", shape) + } +} + +// TestActorOmittedWhenAbsent verifies that pre-actor clients (no actor +// field) don't emit a null or empty "actor" key on the wire — the +// omitempty tag should suppress it entirely. +func TestActorOmittedWhenAbsent(t *testing.T) { + change := Change{ + Table: "todos", + ID: "todo-1", + Op: "insert", + Data: map[string]any{"title": "Legacy client write"}, + } + + encoded, err := json.Marshal(change) + if err != nil { + t.Fatal(err) + } + if bytes.Contains(encoded, []byte(`"actor"`)) { + t.Errorf("absent actor was serialized into payload: %s", encoded) + } +} diff --git a/services/mana-sync/internal/sync/types.go b/services/mana-sync/internal/sync/types.go index f30d66749..99030da3b 100644 --- a/services/mana-sync/internal/sync/types.go +++ b/services/mana-sync/internal/sync/types.go @@ -1,6 +1,9 @@ package sync -import "time" +import ( + "encoding/json" + "time" +) // CurrentSchemaVersion is the protocol version that this build emits for every // new change. Bump only with a matching migration registered on both the Go @@ -29,6 +32,14 @@ type Change struct { Fields map[string]*FieldChange `json:"fields,omitempty"` Data map[string]any `json:"data,omitempty"` DeletedAt *string `json:"deletedAt,omitempty"` + // Actor is the opaque JSON object the webapp stamps on every pending + // change (see `data/events/actor.ts`): one of `{ kind: 'user' }`, + // `{ kind: 'ai', missionId, iterationId, rationale }`, or + // `{ kind: 'system', source }`. Stored as-is server-side — the server + // doesn't parse the shape, it just persists + re-emits to other + // clients so cross-device attribution works. Pre-actor clients omit + // the field; the column is nullable. + Actor json.RawMessage `json:"actor,omitempty"` } // FieldChange holds a value and the timestamp when it was last changed.