feat(mana-sync): persist actor JSON on every sync_changes row

Adds an opaque JSON `actor` column alongside the existing field_timestamps
so cross-device consumers can distinguish user / ai / system writes. The
server never parses the shape — it just stores and re-emits the blob the
webapp stamped in its Dexie hook.

- `sync/types.go` — Change.Actor as json.RawMessage with omitempty; nil
  for pre-actor clients so wire remains backward-compatible
- `store/postgres.go`
  - Migrate: CREATE TABLE includes `actor JSONB` for fresh DBs;
    ALTER TABLE ADD COLUMN IF NOT EXISTS actor JSONB for existing ones
    (idempotent, safe to re-run)
  - RecordChange signature takes json.RawMessage; pgx writes nil as NULL
  - All three SELECT paths (GetChangesSince, GetAllChangesSince,
    StreamAllUserChanges) return actor, Scan into ChangeRow.Actor
  - ChangeRow.Actor added with doc noting "missing = user" consumer rule
- `sync/handler.go` — Change.Actor threaded through HandleSync →
  RecordChange, and populated on both changeFromRow (pull/POST replies)
  and convertChanges (SSE stream)
- Tests: roundtrip of an AI-actor payload + omitempty verification for
  pre-actor clients. All existing tests still pass.

Webapp types still need `actor?: Actor` on SyncChange + PendingChange to
match the wire, and applyServerChanges needs to stamp __lastActor /
__fieldActors from incoming changes for Workbench attribution on other
devices — both tracked as separate follow-ups.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-14 23:31:01 +02:00
parent 6425135612
commit bfa1c0260f
4 changed files with 123 additions and 18 deletions

View file

@@ -57,13 +57,21 @@ func (s *Store) Migrate(ctx context.Context) error {
-- Default 1 covers rows written before the column existed so the
-- backup/restore pipeline can always feed them through a migration
-- chain keyed on this value.
schema_version INT NOT NULL DEFAULT 1
schema_version INT NOT NULL DEFAULT 1,
-- AI Workbench: opaque actor JSON (user / ai / system). Null for
-- pre-actor clients; the webapp stamps every change with it from
-- the Dexie hook onward. Server-side we just persist and re-emit.
actor JSONB
);
-- Idempotent add for databases created before M2 shipped.
ALTER TABLE sync_changes
ADD COLUMN IF NOT EXISTS schema_version INT NOT NULL DEFAULT 1;
-- Idempotent add for databases created before the AI Workbench.
ALTER TABLE sync_changes
ADD COLUMN IF NOT EXISTS actor JSONB;
CREATE INDEX IF NOT EXISTS idx_sync_changes_user_app
ON sync_changes (user_id, app_id, created_at);
@@ -121,7 +129,11 @@ func (s *Store) withUser(ctx context.Context, userID string, fn func(pgx.Tx) err
// RecordChange stores a client change in the database. The insert is performed
// inside an RLS-scoped transaction so the user_id column is double-checked
// against the policy on the way in — a mismatched user_id would fail WITH CHECK.
func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int) error {
//
// `actor` is the opaque JSON blob the webapp stamps on every change (see
// `data/events/actor.ts`). Pass nil for pre-actor callers; the column is
// nullable and cross-device consumers treat a missing actor as `user`.
func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int, actor json.RawMessage) error {
if schemaVersion <= 0 {
schemaVersion = 1
}
@@ -136,12 +148,20 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us
return fmt.Errorf("marshal field_timestamps: %w", err)
}
// pgx serializes a nil []byte as NULL for JSONB columns, which is what
// we want for pre-actor clients. Non-empty raw JSON is passed through
// unchanged — we don't validate the shape, that's the webapp's contract.
var actorJSON []byte
if len(actor) > 0 {
actorJSON = []byte(actor)
}
return s.withUser(ctx, userID, func(tx pgx.Tx) error {
query := `
INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version, actor)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`
_, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID, schemaVersion)
_, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID, schemaVersion, actorJSON)
return err
})
}
@@ -158,7 +178,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
var changes []ChangeRow
err = s.withUser(ctx, userID, func(tx pgx.Tx) error {
query := `
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor
FROM sync_changes
WHERE user_id = $1 AND app_id = $2 AND table_name = $3
AND created_at > $4 AND client_id != $5
@@ -173,9 +193,9 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
for rows.Next() {
var c ChangeRow
var dataJSON, ftJSON []byte
var dataJSON, ftJSON, actorJSON []byte
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil {
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil {
return err
}
@@ -189,6 +209,9 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err)
}
}
if len(actorJSON) > 0 {
c.Actor = json.RawMessage(actorJSON)
}
changes = append(changes, c)
}
@@ -207,7 +230,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
var changes []ChangeRow
err = s.withUser(ctx, userID, func(tx pgx.Tx) error {
query := `
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor
FROM sync_changes
WHERE user_id = $1 AND app_id = $2
AND created_at > $3 AND client_id != $4
@@ -222,9 +245,9 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
for rows.Next() {
var c ChangeRow
var dataJSON, ftJSON []byte
var dataJSON, ftJSON, actorJSON []byte
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil {
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil {
return err
}
@@ -238,6 +261,9 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err)
}
}
if len(actorJSON) > 0 {
c.Actor = json.RawMessage(actorJSON)
}
changes = append(changes, c)
}
@@ -254,7 +280,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func(ChangeRow) error) error {
return s.withUser(ctx, userID, func(tx pgx.Tx) error {
query := `
SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version
SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor
FROM sync_changes
WHERE user_id = $1
ORDER BY created_at ASC, id ASC
@@ -267,8 +293,8 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func
for rows.Next() {
var c ChangeRow
var dataJSON, ftJSON []byte
if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion); err != nil {
var dataJSON, ftJSON, actorJSON []byte
if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil {
return fmt.Errorf("scan: %w", err)
}
if dataJSON != nil {
@@ -281,6 +307,9 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func
return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err)
}
}
if len(actorJSON) > 0 {
c.Actor = json.RawMessage(actorJSON)
}
if err := fn(c); err != nil {
return err
}
@@ -291,7 +320,7 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func
// ChangeRow is a row from the sync_changes table.
type ChangeRow struct {
AppID string
AppID string
ID string
TableName string
RecordID string
@@ -301,4 +330,7 @@ type ChangeRow struct {
ClientID string
CreatedAt time.Time
SchemaVersion int
// Actor is nil for rows written by pre-actor clients. Consumers on
// other devices render a missing actor as "user".
Actor json.RawMessage
}

View file

@@ -42,6 +42,7 @@ func changeFromRow(row store.ChangeRow) Change {
Table: row.TableName,
ID: row.RecordID,
Op: row.Op,
Actor: row.Actor,
}
switch row.Op {
case "insert":
@@ -161,7 +162,7 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
if rowSchemaVersion <= 0 {
rowSchemaVersion = schemaVersion
}
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion)
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion, change.Actor)
if err != nil {
slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID)
http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError)
@@ -404,7 +405,7 @@ func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) {
func (h *Handler) convertChanges(rows []store.ChangeRow) []Change {
changes := make([]Change, 0, len(rows))
for _, row := range rows {
c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op}
c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op, Actor: row.Actor}
switch row.Op {
case "insert":
c.Data = row.Data

View file

@@ -298,3 +298,64 @@ func TestFieldChangeRoundTrip(t *testing.T) {
t.Errorf("completed value = %v, want true", completedField.Value)
}
}
// TestActorPassthrough verifies that an AI-attributed change round-trips
// through JSON encoding/decoding with the actor payload intact as opaque
// bytes — we don't parse the actor shape server-side, just store and re-emit.
func TestActorPassthrough(t *testing.T) {
aiActor := json.RawMessage(`{"kind":"ai","missionId":"m-1","iterationId":"it-1","rationale":"weekly goals review"}`)
change := Change{
Table: "todos",
ID: "todo-1",
Op: "insert",
Data: map[string]any{"title": "Staged by AI"},
Actor: aiActor,
}
encoded, err := json.Marshal(change)
if err != nil {
t.Fatal(err)
}
var decoded Change
if err := json.Unmarshal(encoded, &decoded); err != nil {
t.Fatal(err)
}
if len(decoded.Actor) == 0 {
t.Fatal("actor was dropped during round-trip")
}
// Shape-check that the opaque blob still holds the AI payload
var shape struct {
Kind string `json:"kind"`
MissionID string `json:"missionId"`
IterationID string `json:"iterationId"`
}
if err := json.Unmarshal(decoded.Actor, &shape); err != nil {
t.Fatalf("actor not valid JSON after round-trip: %v", err)
}
if shape.Kind != "ai" || shape.MissionID != "m-1" || shape.IterationID != "it-1" {
t.Errorf("actor shape lost: %+v", shape)
}
}
// TestActorOmittedWhenAbsent verifies that pre-actor clients (no actor
// field) don't emit a null or empty "actor" key on the wire — the
// omitempty tag should suppress it entirely.
func TestActorOmittedWhenAbsent(t *testing.T) {
change := Change{
Table: "todos",
ID: "todo-1",
Op: "insert",
Data: map[string]any{"title": "Legacy client write"},
}
encoded, err := json.Marshal(change)
if err != nil {
t.Fatal(err)
}
if bytes.Contains(encoded, []byte(`"actor"`)) {
t.Errorf("absent actor was serialized into payload: %s", encoded)
}
}

View file

@@ -1,6 +1,9 @@
package sync
import "time"
import (
"encoding/json"
"time"
)
// CurrentSchemaVersion is the protocol version that this build emits for every
// new change. Bump only with a matching migration registered on both the Go
@@ -29,6 +32,14 @@ type Change struct {
Fields map[string]*FieldChange `json:"fields,omitempty"`
Data map[string]any `json:"data,omitempty"`
DeletedAt *string `json:"deletedAt,omitempty"`
// Actor is the opaque JSON object the webapp stamps on every pending
// change (see `data/events/actor.ts`): one of `{ kind: 'user' }`,
// `{ kind: 'ai', missionId, iterationId, rationale }`, or
// `{ kind: 'system', source }`. Stored as-is server-side — the server
// doesn't parse the shape, it just persists + re-emits to other
// clients so cross-device attribution works. Pre-actor clients omit
// the field; the column is nullable.
Actor json.RawMessage `json:"actor,omitempty"`
}
// FieldChange holds a value and the timestamp when it was last changed.