feat(sync): stamp __lastActor + __fieldActors on incoming server changes

Closes the cross-device attribution loop. When another device pushes a
change with `actor: { kind: 'ai', missionId, … }`, the receiving device
now persists that attribution onto the record so the Workbench timeline
and per-module ghost badges render the same way they would on the
originating device.

- `readFieldActors()` sibling helper next to `readFieldTimestamps` for
  reading the per-field actor map off a record
- `applyServerChanges`:
  - Insert-new-record: stamp every field with `change.actor`, set
    `__lastActor` on the whole record
  - Insert-as-upsert: stamp only the winning fields (same LWW condition
    as the timestamp merge), update `__lastActor` to the change actor
  - Field-level update: same per-field + whole-record stamping
  - Pre-actor clients (change.actor undefined) fall back to USER_ACTOR so
    legacy rows still have a valid stamp
- All three paths also add the new hidden keys to their "skip" lists so
  incoming payloads can't smuggle old bookkeeping fields through

With this, the full pipeline is cross-device:
  Device A (AI writes)  → meta.actor + __lastActor + pendingChange.actor
  mana-sync (Go)        → persists actor JSONB on sync_changes row
  Device B (sync pull)  → applyServerChanges re-stamps __lastActor +
                          __fieldActors from the incoming change
  Device B (Workbench)  → renders the AI's activity from `_events` with
                          correct rationale + mission context

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-14 23:37:19 +02:00
parent 615b1c23c3
commit 5922abbbd8

View file

@ -21,11 +21,20 @@ import {
fromSyncName,
beginApplyingTables,
FIELD_TIMESTAMPS_KEY,
FIELD_ACTORS_KEY,
LAST_ACTOR_KEY,
setPendingChangeListener,
} from './database';
import { isQuotaError, cleanupTombstones, notifyQuotaExceeded } from './quota';
import { emitSyncTelemetry, categorizeSyncError } from './sync-telemetry';
import type { Actor } from './events/actor';
import { USER_ACTOR, type Actor } from './events/actor';
/** Reads the per-field actor map off a record; empty for legacy rows. */
function readFieldActors(record: unknown): Record<string, Actor> {
if (!record || typeof record !== 'object') return {};
const fa = (record as Record<string, unknown>)[FIELD_ACTORS_KEY];
return fa && typeof fa === 'object' ? (fa as Record<string, Actor>) : {};
}
// ─── Types ────────────────────────────────────────────────────
@ -336,27 +345,51 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
(changeData.updatedAt as string | undefined) ??
(changeData.createdAt as string | undefined) ??
new Date().toISOString();
// Actor stamped by the originating device. Pre-actor clients
// omit the field → fall back to USER_ACTOR so the record
// still has a valid stamp for the Workbench to render.
const changeActor: Actor = change.actor ?? USER_ACTOR;
if (!existing) {
const ft: Record<string, string> = {};
const fa: Record<string, Actor> = {};
for (const key of Object.keys(changeData)) {
if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue;
if (
key === 'id' ||
key === FIELD_TIMESTAMPS_KEY ||
key === FIELD_ACTORS_KEY ||
key === LAST_ACTOR_KEY
) {
continue;
}
ft[key] = recordTime;
fa[key] = changeActor;
}
await table.put({
...changeData,
id: recordId,
[FIELD_TIMESTAMPS_KEY]: ft,
[FIELD_ACTORS_KEY]: fa,
[LAST_ACTOR_KEY]: changeActor,
});
} else {
const localFT = readFieldTimestamps(existing);
const localFA = readFieldActors(existing);
const localUpdatedAt =
((existing as Record<string, unknown>).updatedAt as string | undefined) ?? '';
const updates: Record<string, unknown> = {};
const newFT: Record<string, string> = { ...localFT };
const newFA: Record<string, Actor> = { ...localFA };
for (const [key, val] of Object.entries(changeData)) {
if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue;
if (
key === 'id' ||
key === FIELD_TIMESTAMPS_KEY ||
key === FIELD_ACTORS_KEY ||
key === LAST_ACTOR_KEY
) {
continue;
}
const localFieldTime = localFT[key] ?? localUpdatedAt;
if (recordTime >= localFieldTime) {
// Conflict signal: server STRICTLY wins (>) and the local
@ -381,10 +414,13 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
}
updates[key] = val;
newFT[key] = recordTime;
newFA[key] = changeActor;
}
}
if (Object.keys(updates).length > 0) {
updates[FIELD_TIMESTAMPS_KEY] = newFT;
updates[FIELD_ACTORS_KEY] = newFA;
updates[LAST_ACTOR_KEY] = changeActor;
await table.update(recordId, updates);
}
}
@ -392,6 +428,7 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
// Field-level LWW update — the canonical conflict-resolution path.
const existing = await table.get(recordId);
const serverFields = change.fields;
const changeActor: Actor = change.actor ?? USER_ACTOR;
if (!existing) {
// Reconstruct from fields. Other clients only see this if the
@ -399,21 +436,27 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
// authority.
const record: Record<string, unknown> = { id: recordId };
const ft: Record<string, string> = {};
const fa: Record<string, Actor> = {};
const fallback = new Date().toISOString();
for (const [key, fc] of Object.entries(serverFields)) {
record[key] = fc.value;
ft[key] = fc.updatedAt ?? fallback;
fa[key] = changeActor;
}
record[FIELD_TIMESTAMPS_KEY] = ft;
record[FIELD_ACTORS_KEY] = fa;
record[LAST_ACTOR_KEY] = changeActor;
await table.put(record);
} else {
// Per-field comparison. Falls back to record-level updatedAt
// only for legacy records that pre-date __fieldTimestamps.
const localFT = readFieldTimestamps(existing);
const localFA = readFieldActors(existing);
const localUpdatedAt =
((existing as Record<string, unknown>).updatedAt as string | undefined) ?? '';
const updates: Record<string, unknown> = {};
const newFT: Record<string, string> = { ...localFT };
const newFA: Record<string, Actor> = { ...localFA };
for (const [key, fc] of Object.entries(serverFields)) {
const serverTime = fc.updatedAt ?? '';
@ -439,10 +482,13 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
}
updates[key] = fc.value;
newFT[key] = serverTime;
newFA[key] = changeActor;
}
}
if (Object.keys(updates).length > 0) {
updates[FIELD_TIMESTAMPS_KEY] = newFT;
updates[FIELD_ACTORS_KEY] = newFA;
updates[LAST_ACTOR_KEY] = changeActor;
await table.update(recordId, updates);
}
}