From 5922abbbd8f1820d8279180db0d37dea02006827 Mon Sep 17 00:00:00 2001 From: Till JS Date: Tue, 14 Apr 2026 23:37:19 +0200 Subject: [PATCH] feat(sync): stamp __lastActor + __fieldActors on incoming server changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the cross-device attribution loop. When another device pushes a change with `actor: { kind: 'ai', missionId, … }`, the receiving device now persists that attribution onto the record so the Workbench timeline and per-module ghost badges render the same way they would on the originating device. - `readFieldActors()` sibling helper next to `readFieldTimestamps` for reading the per-field actor map off a record - `applyServerChanges`: - Insert-new-record: stamp every field with `change.actor`, set `__lastActor` on the whole record - Insert-as-upsert: stamp only the winning fields (same LWW condition as the timestamp merge), update `__lastActor` to the change actor - Field-level update: same per-field + whole-record stamping - Pre-actor clients (change.actor undefined) fall back to USER_ACTOR so legacy rows still have a valid stamp - All three paths also add the new hidden keys to their "skip" lists so incoming payloads can't smuggle old bookkeeping fields through With this, the full pipeline is cross-device: Device A (AI writes) → meta.actor + __lastActor + pendingChange.actor mana-sync (Go) → persists actor JSONB on sync_changes row Device B (sync pull) → applyServerChanges re-stamps __lastActor + __fieldActors from the incoming change Device B (Workbench) → renders the AI's activity from `_events` with correct rationale + mission context Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/mana/apps/web/src/lib/data/sync.ts | 52 +++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/apps/mana/apps/web/src/lib/data/sync.ts b/apps/mana/apps/web/src/lib/data/sync.ts index 034aba4d1..01531e903 100644 --- a/apps/mana/apps/web/src/lib/data/sync.ts +++ b/apps/mana/apps/web/src/lib/data/sync.ts @@ -21,11 +21,20 @@ import { fromSyncName, beginApplyingTables, FIELD_TIMESTAMPS_KEY, + FIELD_ACTORS_KEY, + LAST_ACTOR_KEY, setPendingChangeListener, } from './database'; import { isQuotaError, cleanupTombstones, notifyQuotaExceeded } from './quota'; import { emitSyncTelemetry, categorizeSyncError } from './sync-telemetry'; -import type { Actor } from './events/actor'; +import { USER_ACTOR, type Actor } from './events/actor'; + +/** Reads the per-field actor map off a record; empty for legacy rows. */ +function readFieldActors(record: unknown): Record { + if (!record || typeof record !== 'object') return {}; + const fa = (record as Record)[FIELD_ACTORS_KEY]; + return fa && typeof fa === 'object' ? (fa as Record) : {}; +} // ─── Types ──────────────────────────────────────────────────── @@ -336,27 +345,51 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro (changeData.updatedAt as string | undefined) ?? (changeData.createdAt as string | undefined) ?? new Date().toISOString(); + // Actor stamped by the originating device. Pre-actor clients + // omit the field → fall back to USER_ACTOR so the record + // still has a valid stamp for the Workbench to render. + const changeActor: Actor = change.actor ?? USER_ACTOR; if (!existing) { const ft: Record = {}; + const fa: Record = {}; for (const key of Object.keys(changeData)) { - if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue; + if ( + key === 'id' || + key === FIELD_TIMESTAMPS_KEY || + key === FIELD_ACTORS_KEY || + key === LAST_ACTOR_KEY + ) { + continue; + } ft[key] = recordTime; + fa[key] = changeActor; } await table.put({ ...changeData, id: recordId, [FIELD_TIMESTAMPS_KEY]: ft, + [FIELD_ACTORS_KEY]: fa, + [LAST_ACTOR_KEY]: changeActor, }); } else { const localFT = readFieldTimestamps(existing); + const localFA = readFieldActors(existing); const localUpdatedAt = ((existing as Record).updatedAt as string | undefined) ?? ''; const updates: Record = {}; const newFT: Record = { ...localFT }; + const newFA: Record = { ...localFA }; for (const [key, val] of Object.entries(changeData)) { - if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue; + if ( + key === 'id' || + key === FIELD_TIMESTAMPS_KEY || + key === FIELD_ACTORS_KEY || + key === LAST_ACTOR_KEY + ) { + continue; + } const localFieldTime = localFT[key] ?? localUpdatedAt; if (recordTime >= localFieldTime) { // Conflict signal: server STRICTLY wins (>) and the local @@ -381,10 +414,13 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro } updates[key] = val; newFT[key] = recordTime; + newFA[key] = changeActor; } } if (Object.keys(updates).length > 0) { updates[FIELD_TIMESTAMPS_KEY] = newFT; + updates[FIELD_ACTORS_KEY] = newFA; + updates[LAST_ACTOR_KEY] = changeActor; await table.update(recordId, updates); } } @@ -392,6 +428,7 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro // Field-level LWW update — the canonical conflict-resolution path. const existing = await table.get(recordId); const serverFields = change.fields; + const changeActor: Actor = change.actor ?? USER_ACTOR; if (!existing) { // Reconstruct from fields. Other clients only see this if the @@ -399,21 +436,27 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro // authority. const record: Record = { id: recordId }; const ft: Record = {}; + const fa: Record = {}; const fallback = new Date().toISOString(); for (const [key, fc] of Object.entries(serverFields)) { record[key] = fc.value; ft[key] = fc.updatedAt ?? fallback; + fa[key] = changeActor; } record[FIELD_TIMESTAMPS_KEY] = ft; + record[FIELD_ACTORS_KEY] = fa; + record[LAST_ACTOR_KEY] = changeActor; await table.put(record); } else { // Per-field comparison. Falls back to record-level updatedAt // only for legacy records that pre-date __fieldTimestamps. const localFT = readFieldTimestamps(existing); + const localFA = readFieldActors(existing); const localUpdatedAt = ((existing as Record).updatedAt as string | undefined) ?? ''; const updates: Record = {}; const newFT: Record = { ...localFT }; + const newFA: Record = { ...localFA }; for (const [key, fc] of Object.entries(serverFields)) { const serverTime = fc.updatedAt ?? ''; @@ -439,10 +482,13 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro } updates[key] = fc.value; newFT[key] = serverTime; + newFA[key] = changeActor; } } if (Object.keys(updates).length > 0) { updates[FIELD_TIMESTAMPS_KEY] = newFT; + updates[FIELD_ACTORS_KEY] = newFA; + updates[LAST_ACTOR_KEY] = changeActor; await table.update(recordId, updates); } }