diff --git a/apps/mana/apps/web/src/lib/data/database.ts b/apps/mana/apps/web/src/lib/data/database.ts index d5d86c9e2..7e0a4cd5d 100644 --- a/apps/mana/apps/web/src/lib/data/database.ts +++ b/apps/mana/apps/web/src/lib/data/database.ts @@ -882,6 +882,7 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) { data: dataForSync, actor, createdAt: now, + spaceId: typeof objRecord.spaceId === 'string' ? (objRecord.spaceId as string) : undefined, }); trackActivity(appId, tableName, obj.id, 'insert'); trackFirstContent(appId); @@ -937,6 +938,14 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) { } const op = (modifications as Record).deletedAt ? 'delete' : 'update'; + // spaceId is immutable and therefore not in `fields` for updates — + // but the server wants it as a first-class column on every row. + // Read it from the pre-update record so the pending-change row + // carries the right space for routing even when only a title changed. + const existingSpaceId = + typeof (obj as Record).spaceId === 'string' + ? ((obj as Record).spaceId as string) + : undefined; trackPendingChange(tableName, { appId, collection: tableName, @@ -946,6 +955,7 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) { actor, deletedAt: (modifications as Record).deletedAt as string | undefined, createdAt: now, + spaceId: existingSpaceId, }); trackActivity(appId, tableName, primKey as string, op); fireTrigger(appId, tableName, op, modifications as Record); diff --git a/apps/mana/apps/web/src/lib/data/sync.ts b/apps/mana/apps/web/src/lib/data/sync.ts index 01531e903..ebd7d80bb 100644 --- a/apps/mana/apps/web/src/lib/data/sync.ts +++ b/apps/mana/apps/web/src/lib/data/sync.ts @@ -100,6 +100,14 @@ interface PendingChange { deletedAt?: string; actor?: Actor; createdAt: string; + /** + * The Space (Better Auth organization id) the record belongs to. Stamped + * on the pending-change row at write time so the server gets it as a + * first-class column even for updates (where it isn't in `fields` + * because it's immutable). Empty string / undefined means "pre-v28 + * record" — the server tolerates NULL on the column. + */ + spaceId?: string; } interface SyncMeta { @@ -1120,6 +1128,7 @@ export function createUnifiedSync( data: p.data, deletedAt: p.deletedAt, actor: p.actor, + spaceId: p.spaceId, })), }; } diff --git a/services/mana-sync/internal/store/postgres.go b/services/mana-sync/internal/store/postgres.go index f925eaef4..06f82f503 100644 --- a/services/mana-sync/internal/store/postgres.go +++ b/services/mana-sync/internal/store/postgres.go @@ -72,6 +72,15 @@ func (s *Store) Migrate(ctx context.Context) error { ALTER TABLE sync_changes ADD COLUMN IF NOT EXISTS actor JSONB; + -- Idempotent add for databases created before the Spaces foundation. + -- Nullable so pre-v28 clients (which don't stamp a spaceId) can + -- keep pushing. The RLS policy is intentionally NOT space-aware + -- yet — user_id remains the primary guard. Multi-member scoping + -- for shared spaces will add a second policy in a follow-up. + -- See docs/plans/spaces-foundation.md. + ALTER TABLE sync_changes + ADD COLUMN IF NOT EXISTS space_id TEXT; + CREATE INDEX IF NOT EXISTS idx_sync_changes_user_app ON sync_changes (user_id, app_id, created_at); @@ -81,6 +90,13 @@ func (s *Store) Migrate(ctx context.Context) error { CREATE INDEX IF NOT EXISTS idx_sync_changes_since ON sync_changes (user_id, app_id, table_name, created_at); + -- Fast "all changes for a space since X" queries once shared spaces + -- go live. Safe to create with nullable space_id — Postgres partial + -- indexes skip NULLs unless asked otherwise. + CREATE INDEX IF NOT EXISTS idx_sync_changes_user_space_app_since + ON sync_changes (user_id, space_id, app_id, created_at) + WHERE space_id IS NOT NULL; + ALTER TABLE sync_changes ENABLE ROW LEVEL SECURITY; -- FORCE makes RLS apply even to the table owner so that the application -- role used by mana-sync cannot bypass policies, regardless of grants. @@ -130,10 +146,15 @@ func (s *Store) withUser(ctx context.Context, userID string, fn func(pgx.Tx) err // inside an RLS-scoped transaction so the user_id column is double-checked // against the policy on the way in — a mismatched user_id would fail WITH CHECK. // +// `spaceID` is the Better Auth organization id the record belongs to. +// Pass empty string for pre-v28 callers; the column is nullable so mixed +// populations of pre- and post-v28 clients are fine. When multi-member +// space RLS lands, empty space_id rows will need a one-off backfill. +// // `actor` is the opaque JSON blob the webapp stamps on every change (see // `data/events/actor.ts`). Pass nil for pre-actor callers; the column is // nullable and cross-device consumers treat a missing actor as `user`. -func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int, actor json.RawMessage) error { +func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, spaceID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int, actor json.RawMessage) error { if schemaVersion <= 0 { schemaVersion = 1 } @@ -156,12 +177,20 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us actorJSON = []byte(actor) } + // pgx interprets a Go empty string as empty, not NULL — use *string so + // an unset space_id lands as a real SQL NULL and the partial index + // skips the row. + var spaceIDParam *string + if spaceID != "" { + spaceIDParam = &spaceID + } + return s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - INSERT INTO sync_changes (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version, actor) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + INSERT INTO sync_changes (app_id, table_name, record_id, user_id, space_id, op, data, field_timestamps, client_id, schema_version, actor) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ` - _, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, op, dataJSON, ftJSON, clientID, schemaVersion, actorJSON) + _, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, spaceIDParam, op, dataJSON, ftJSON, clientID, schemaVersion, actorJSON) return err }) } @@ -178,7 +207,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s var changes []ChangeRow err = s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor + SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, space_id, actor FROM sync_changes WHERE user_id = $1 AND app_id = $2 AND table_name = $3 AND created_at > $4 AND client_id != $5 @@ -194,11 +223,15 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s for rows.Next() { var c ChangeRow var dataJSON, ftJSON, actorJSON []byte + var spaceID *string - if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil { + if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON); err != nil { return err } + if spaceID != nil { + c.SpaceID = *spaceID + } if dataJSON != nil { if err := json.Unmarshal(dataJSON, &c.Data); err != nil { return fmt.Errorf("unmarshal data for record %s: %w", c.RecordID, err) @@ -230,7 +263,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex var changes []ChangeRow err = s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor + SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, space_id, actor FROM sync_changes WHERE user_id = $1 AND app_id = $2 AND created_at > $3 AND client_id != $4 @@ -246,11 +279,15 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex for rows.Next() { var c ChangeRow var dataJSON, ftJSON, actorJSON []byte + var spaceID *string - if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil { + if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON); err != nil { return err } + if spaceID != nil { + c.SpaceID = *spaceID + } if dataJSON != nil { if err := json.Unmarshal(dataJSON, &c.Data); err != nil { return fmt.Errorf("unmarshal data for record %s: %w", c.RecordID, err) @@ -280,7 +317,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func(ChangeRow) error) error { return s.withUser(ctx, userID, func(tx pgx.Tx) error { query := ` - SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, actor + SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, space_id, actor FROM sync_changes WHERE user_id = $1 ORDER BY created_at ASC, id ASC @@ -294,9 +331,13 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func for rows.Next() { var c ChangeRow var dataJSON, ftJSON, actorJSON []byte - if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &actorJSON); err != nil { + var spaceID *string + if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON); err != nil { return fmt.Errorf("scan: %w", err) } + if spaceID != nil { + c.SpaceID = *spaceID + } if dataJSON != nil { if err := json.Unmarshal(dataJSON, &c.Data); err != nil { return fmt.Errorf("unmarshal data for record %s: %w", c.RecordID, err) @@ -330,6 +371,10 @@ type ChangeRow struct { ClientID string CreatedAt time.Time SchemaVersion int + // SpaceID is empty for pre-v28 rows. Consumers use it to partition + // the reader cache per space; an empty string means "implicit personal" + // until the bootstrap reconciliation fills it in. + SpaceID string // Actor is nil for rows written by pre-actor clients. Consumers on // other devices render a missing actor as "user". Actor json.RawMessage diff --git a/services/mana-sync/internal/sync/handler.go b/services/mana-sync/internal/sync/handler.go index b6d4f558a..8d62b8e52 100644 --- a/services/mana-sync/internal/sync/handler.go +++ b/services/mana-sync/internal/sync/handler.go @@ -28,6 +28,38 @@ func NewHandler(s *store.Store, v *auth.Validator, h *ws.Hub) *Handler { // maxBodySize is the maximum allowed request body (10 MB). const maxBodySize = 10 * 1024 * 1024 +// extractSpaceID pulls the Space id out of an incoming change payload. The +// protocol preference is: +// +// 1. Top-level `spaceId` (v28+ clients stamp it explicitly — cheap to parse). +// 2. `data.spaceId` — present on inserts from pre-protocol clients that only +// send the full record object. +// 3. `fields.spaceId.value` — present on updates that happen to touch the +// scope field (rare; spaceId is marked immutable in the Dexie updating +// hook, so in practice this branch only covers edge cases). +// +// Returns the empty string when none of the above yield a usable value. An +// empty string lands as SQL NULL in the space_id column so partial indexes +// keep skipping legacy rows cleanly. +func extractSpaceID(change Change) string { + if change.SpaceID != "" { + return change.SpaceID + } + if change.Data != nil { + if v, ok := change.Data["spaceId"].(string); ok && v != "" { + return v + } + } + if change.Fields != nil { + if fc, ok := change.Fields["spaceId"]; ok && fc != nil { + if v, ok := fc.Value.(string); ok && v != "" { + return v + } + } + } + return "" +} + // changeFromRow projects a stored sync_changes row onto the wire Change shape. // Carries eventId + schemaVersion through so clients can dedup on replay and // route through the migration chain. @@ -42,6 +74,7 @@ func changeFromRow(row store.ChangeRow) Change { Table: row.TableName, ID: row.RecordID, Op: row.Op, + SpaceID: row.SpaceID, Actor: row.Actor, } switch row.Op { @@ -162,7 +195,13 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) { if rowSchemaVersion <= 0 { rowSchemaVersion = schemaVersion } - err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion, change.Actor) + // spaceId for this change: prefer the top-level field (post-v28 + // clients stamp it explicitly), fall back to data.spaceId for + // inserts and fields.spaceId.value for updates so pre-protocol + // clients still get indexed correctly. Empty string lands as SQL + // NULL via RecordChange. + spaceID := extractSpaceID(change) + err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, spaceID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion, change.Actor) if err != nil { slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID) http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError) diff --git a/services/mana-sync/internal/sync/spaces_test.go b/services/mana-sync/internal/sync/spaces_test.go new file mode 100644 index 000000000..e3140c168 --- /dev/null +++ b/services/mana-sync/internal/sync/spaces_test.go @@ -0,0 +1,84 @@ +package sync + +import ( + "testing" + + "github.com/mana/mana-sync/internal/store" +) + +func TestExtractSpaceID_TopLevelWins(t *testing.T) { + got := extractSpaceID(Change{ + SpaceID: "space-top", + Data: map[string]any{"spaceId": "space-data"}, + Fields: map[string]*FieldChange{"spaceId": {Value: "space-field"}}, + }) + if got != "space-top" { + t.Fatalf("want space-top, got %q", got) + } +} + +func TestExtractSpaceID_FallsBackToData(t *testing.T) { + got := extractSpaceID(Change{ + Data: map[string]any{"spaceId": "space-from-data"}, + }) + if got != "space-from-data" { + t.Fatalf("want space-from-data, got %q", got) + } +} + +func TestExtractSpaceID_FallsBackToFields(t *testing.T) { + got := extractSpaceID(Change{ + Fields: map[string]*FieldChange{ + "spaceId": {Value: "space-from-fields"}, + }, + }) + if got != "space-from-fields" { + t.Fatalf("want space-from-fields, got %q", got) + } +} + +func TestExtractSpaceID_EmptyWhenMissing(t *testing.T) { + cases := []struct { + name string + c Change + }{ + {"nothing", Change{}}, + {"empty data", Change{Data: map[string]any{}}}, + {"data non-string", Change{Data: map[string]any{"spaceId": 42}}}, + {"fields non-string", Change{Fields: map[string]*FieldChange{"spaceId": {Value: nil}}}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := extractSpaceID(tc.c); got != "" { + t.Fatalf("want empty, got %q", got) + } + }) + } +} + +func TestChangeFromRow_PropagatesSpaceID(t *testing.T) { + row := store.ChangeRow{ + ID: "evt-1", + TableName: "tasks", + RecordID: "task-1", + Op: "insert", + SpaceID: "org-edisconet", + } + got := changeFromRow(row) + if got.SpaceID != "org-edisconet" { + t.Fatalf("want space id to round-trip, got %q", got.SpaceID) + } +} + +func TestChangeFromRow_EmptySpaceIDStaysEmpty(t *testing.T) { + row := store.ChangeRow{ + ID: "evt-2", + TableName: "tasks", + RecordID: "task-2", + Op: "insert", + } + got := changeFromRow(row) + if got.SpaceID != "" { + t.Fatalf("want empty space id, got %q", got.SpaceID) + } +} diff --git a/services/mana-sync/internal/sync/types.go b/services/mana-sync/internal/sync/types.go index 99030da3b..af5242c78 100644 --- a/services/mana-sync/internal/sync/types.go +++ b/services/mana-sync/internal/sync/types.go @@ -32,6 +32,13 @@ type Change struct { Fields map[string]*FieldChange `json:"fields,omitempty"` Data map[string]any `json:"data,omitempty"` DeletedAt *string `json:"deletedAt,omitempty"` + // SpaceID is the Better Auth organization id the record belongs to. + // Stamped client-side by the Dexie v28 hook from the user's active + // space (or the `_personal:` sentinel during the bootstrap + // window). Stored server-side in the space_id column so future + // queries can partition by space. Pre-v28 clients omit it; the + // column is nullable. See docs/plans/spaces-foundation.md. + SpaceID string `json:"spaceId,omitempty"` // Actor is the opaque JSON object the webapp stamps on every pending // change (see `data/events/actor.ts`): one of `{ kind: 'user' }`, // `{ kind: 'ai', missionId, iterationId, rationale }`, or