feat(sync): F2 — origin-gated conflict-detection

Closes the false-positive conflict-toast loop on history-replay. Conflict
notifications now fire only when the local field meta records origin='user'
AND the pull is not an initial hydration round.

Origin source-of-truth:
- shared-ai/field-meta.ts → originFromActor(actor) maps actor.kind onto
  the FieldOrigin enum: user→'user', ai→'agent', system+SYSTEM_MIGRATION
  →'migration', any other system source→'system'.
- Dexie creating/updating hooks call it once per write so every persisted
  field carries the right pipeline tag.
- repair-silent-twin + legacy-avatar wrap their writes in
  runAsAsync(makeSystemActor(SYSTEM_MIGRATION, ...)) so the hook stamps
  origin='migration'. Future replays of those rows from another device
  will not surface as conflicts.

applyServerChanges options:
- New ApplyServerChangesOptions { isInitialHydration?: boolean }.
- Push-response and pull-paged-loop callers compute it from the cursor
  state (`!oldestCursor` / `!cursor`). Pagination resets the flag after
  the first page.
- Conflict-trigger gates on `!options.isInitialHydration && localMeta[k]
  ?.origin === 'user'` in addition to the prior tests.

Tests (sync.test.ts):
- New: replay-burst (10 sequential server updates → 0 conflicts)
- New: agent-origin local write + server overwrite → 0 conflicts
- New: isInitialHydration suppresses everything → 0 conflicts
- New: real user edit + server overwrite → 1 conflict
- All 25 prior tests still pass.

29/29 vitest sync.test.ts cases green; svelte-check 0 errors over 7647
files.

Plan: docs/plans/sync-field-meta-overhaul.md F2 done-criteria met.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-26 21:38:56 +02:00
parent 6c942e3ab2
commit ad5e04a554
9 changed files with 279 additions and 50 deletions

View file

@ -20,8 +20,8 @@ import { fire as fireTrigger } from '$lib/triggers/registry';
import { checkInlineSuggestion } from '$lib/triggers/inline-suggest';
import { getEffectiveUserId, GUEST_USER_ID } from './current-user';
import { getEffectiveSpaceId } from './scope/active-space.svelte';
import { getCurrentActor, makeFieldMeta } from './events/actor';
import type { Actor, FieldMeta, FieldOrigin } from './events/actor';
import { getCurrentActor, makeFieldMeta, originFromActor } from './events/actor';
import type { Actor, FieldMeta } from './events/actor';
import { isQuotaError, notifyQuotaExceeded } from './quota-detect';
import {
SYNC_APP_MAP,
@ -1501,9 +1501,12 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
// `at` drives field-LWW ordering, `actor` carries attribution forward
// across renames, `origin` distinguishes user edits from system /
// migration / agent / server-replay writes for conflict-detection.
// F1 hardcodes `origin: 'user'` here — F2 will derive it from the
// active actor.kind so AI-runner writes land as `'agent'` etc.
const origin: FieldOrigin = 'user';
// `origin` is derived from `actor.kind`:
// user → 'user'
// ai → 'agent' (mission-runner / tool executor)
// system + SYSTEM_MIGRATION → 'migration' (Dexie upgrades, repair routines)
// any other system source → 'system' (projection, rule, stream, …)
const origin = originFromActor(actor);
const fieldMeta: Record<string, FieldMeta> = {};
for (const key of Object.keys(obj)) {
if (isInternalKey(key)) continue;
@ -1542,7 +1545,7 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
if (_applyingTables.has(tableName)) return undefined;
const now = new Date().toISOString();
const actor: Actor = getCurrentActor();
const origin: FieldOrigin = 'user';
const origin = originFromActor(actor);
const fields: Record<string, { value: unknown; at: string }> = {};
// userId is immutable after creation. Silently strip any attempt to

View file

@ -51,6 +51,7 @@ export {
isFromMissionRunner,
makeFieldMeta,
isUserOriginatedField,
originFromActor,
} from '@mana/shared-ai';
/**

View file

@ -515,5 +515,152 @@ describe('applyServerChanges (Dexie integration)', () => {
// Tied — LWW lets server win silently (no edit-loss to surface)
expect(conflicts).toHaveLength(0);
});
// ─── F2: Origin-gated conflict detection ─────────────────
it('does NOT fire on a sequential server-replay burst (history-replay)', async () => {
// Reproduces the original bug: a fresh client pulls history of
// 10 distinct server changes for the same record across N
// pseudo-sessions. None of them is a local user edit; all
// applyServerChanges writes are stamped origin='server-replay'.
// No conflict toast must fire — the surface is reserved for
// user edits that lose to a later server overwrite.
const conflicts = await captureConflicts(async () => {
// Initial insert (server-replay).
await applyServerChanges('todo', [
{
table: 'tasks',
id: 'task-replay-burst',
op: 'insert',
data: {
id: 'task-replay-burst',
title: 'gen-0',
priority: 'low',
isCompleted: false,
order: 0,
updatedAt: '2026-04-01T00:00:00Z',
},
},
]);
// 10 follow-up updates with monotonically increasing timestamps.
for (let i = 1; i <= 10; i++) {
const ts = `2026-04-01T00:0${i}:00Z`;
await applyServerChanges('todo', [
{
table: 'tasks',
id: 'task-replay-burst',
op: 'update',
fields: { title: { value: `gen-${i}`, at: ts } },
},
]);
}
});
expect(conflicts).toHaveLength(0);
});
it('does NOT fire when the local write came from an AI agent (origin=agent)', async () => {
const { runAs } = await import('./events/actor');
const { makeAgentActor } = await import('@mana/shared-ai');
const agent = makeAgentActor({
agentId: 'agent-test',
displayName: 'Test-Agent',
missionId: 'mission-1',
iterationId: 'iter-1',
rationale: 'unit-test',
});
// Seed locally under the agent actor — the creating-hook
// stamps origin='agent' for every field.
runAs(agent, () => {
void db.table('tasks').add({
id: 'task-agent-write',
title: 'agent-typed value',
priority: 'low',
isCompleted: false,
order: 0,
});
});
// Wait for the (synchronous) Dexie put to drain.
await db.table('tasks').get('task-agent-write');
const conflicts = await captureConflicts(async () => {
await applyServerChanges('todo', [
{
table: 'tasks',
id: 'task-agent-write',
op: 'update',
fields: {
title: { value: 'their version', at: '2099-01-01T00:00:00Z' },
},
},
]);
});
// Agent writes are visible in the proposal/mission UI, not the
// conflict toast — server overwriting them is silent.
expect(conflicts).toHaveLength(0);
});
it('does NOT fire when isInitialHydration is set (belt-and-suspenders)', async () => {
// Even with a real local user edit present, the hydration mode
// suppresses the entire conflict surface for the round.
await db.table('tasks').add({
id: 'task-hydration',
title: 'my draft',
priority: 'low',
isCompleted: false,
order: 0,
});
const conflicts = await captureConflicts(async () => {
await applyServerChanges(
'todo',
[
{
table: 'tasks',
id: 'task-hydration',
op: 'update',
fields: {
title: { value: 'server overwrite', at: '2099-01-01T00:00:00Z' },
},
},
],
{ isInitialHydration: true }
);
});
expect(conflicts).toHaveLength(0);
});
it('fires for a real user edit getting overwritten (origin=user)', async () => {
// Belt: the existing "fires when..." test at the top of the
// suite already covers this — restated here so the F2 block
// reads as a complete spec for the gating rules.
await db.table('tasks').add({
id: 'task-real-conflict',
title: 'I typed this',
priority: 'low',
isCompleted: false,
order: 0,
});
const conflicts = await captureConflicts(async () => {
await applyServerChanges('todo', [
{
table: 'tasks',
id: 'task-real-conflict',
op: 'update',
fields: {
title: { value: 'somebody else', at: '2099-01-01T00:00:00Z' },
},
},
]);
});
expect(conflicts).toHaveLength(1);
expect(conflicts[0].field).toBe('title');
});
});
});

View file

@ -287,7 +287,22 @@ function valuesEqual(a: unknown, b: unknown): boolean {
}
}
export async function applyServerChanges(appId: string, changes: unknown[]): Promise<void> {
/** Options for {@link applyServerChanges}.
*
* `isInitialHydration` flips off conflict-detection entirely used when
* the calling pull/push knows the local client has never seen this app's
* data before (cursor is empty), so any "newer server value over local"
* is by definition initial-fill, not a lost edit. Belt-and-suspenders
* with the per-field `origin === 'user'` gate inside the inner loop. */
export interface ApplyServerChangesOptions {
isInitialHydration?: boolean;
}
export async function applyServerChanges(
appId: string,
changes: unknown[],
options: ApplyServerChangesOptions = {}
): Promise<void> {
// Reject malformed entries up-front so a single bad row from the server
// can never write garbage into IndexedDB. Drops are logged once and the
// good entries proceed — partial degradation beats a hard crash on a
@ -402,16 +417,20 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
if (key === 'id' || key === FIELD_META_KEY) continue;
const localFieldTime = localMeta[key]?.at ?? localUpdatedAt;
if (recordTime >= localFieldTime) {
// Conflict signal: server STRICTLY wins (>) and the local
// Conflict signal: server STRICTLY wins (>), the local
// field had a non-empty value that differs from the new
// one. Equal-time ties don't fire because there's no
// edit to lose. F2 will additionally gate this on
// localMeta[key].origin === 'user'.
// one, AND the local write was a real user edit (not a
// server-replay / system / migration / agent write).
// Initial-hydration pulls bypass the conflict surface
// entirely — by definition no edit can be lost when
// the table has never been seen locally before.
const localValue = (existing as Record<string, unknown>)[key];
if (
!options.isInitialHydration &&
recordTime > localFieldTime &&
localValue != null &&
!valuesEqual(localValue, val)
!valuesEqual(localValue, val) &&
localMeta[key]?.origin === 'user'
) {
notifyConflict({
tableName,
@ -464,13 +483,16 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
const localFieldTime = localMeta[key]?.at ?? localUpdatedAt;
if (serverTime >= localFieldTime) {
// Same conflict criteria as the insert-as-update path:
// strictly newer + non-empty local + actually different.
// F2 will additionally gate on localMeta[key].origin === 'user'.
// strictly newer + non-empty local + actually different
// + local write came from a real user edit. Initial
// hydration suppresses the surface entirely.
const localValue = (existing as Record<string, unknown>)[key];
if (
!options.isInitialHydration &&
serverTime > localFieldTime &&
localValue != null &&
!valuesEqual(localValue, fc.value)
!valuesEqual(localValue, fc.value) &&
localMeta[key]?.origin === 'user'
) {
notifyConflict({
tableName,
@ -777,6 +799,12 @@ export function createUnifiedSync(
// Build changeset in backend protocol format
const changeset = buildChangeset(pending, clientId, oldestCursor);
// First contact with this app's data: cursor empty means the
// server is about to send everything-since-epoch. No local edit
// can be lost in that batch by definition — flip on the
// hydration mode so applyServerChanges suppresses any conflict
// notifications for this round.
const isInitialHydration = !oldestCursor;
const res = await fetchWithRetry(
`${serverUrl}/sync/${appId}`,
@ -802,7 +830,7 @@ export function createUnifiedSync(
// Apply server changes from the response
if (data.serverChanges?.length > 0) {
await applyServerChanges(appId, data.serverChanges);
await applyServerChanges(appId, data.serverChanges, { isInitialHydration });
}
// Update sync cursor
@ -881,6 +909,12 @@ export function createUnifiedSync(
const syncName = toSyncName(tableName);
let cursor = await getSyncCursor(appId, tableName);
let hasMore = true;
// Hydration applies to the first page of an empty-cursor
// pull. Subsequent pages carry a real cursor (set after the
// first response lands) so they fall back to normal
// conflict-detection — by then any user edits made during
// pagination are real edits worth surfacing.
let isInitialHydration = !cursor;
// Paginated pull: continue fetching until server signals no more data
while (hasMore) {
@ -906,11 +940,12 @@ export function createUnifiedSync(
if (data.serverChanges && data.serverChanges.length > 0) {
totalApplied += data.serverChanges.length;
await applyServerChanges(appId, data.serverChanges);
await applyServerChanges(appId, data.serverChanges, { isInitialHydration });
}
if (data.syncedUntil) {
cursor = data.syncedUntil;
isInitialHydration = false;
} else {
break;
}

View file

@ -25,6 +25,7 @@ import { authStore } from '$lib/stores/auth.svelte';
import { profileService } from '$lib/api/profile';
import { meImagesTable } from '../collections';
import { meImagesStore } from '../stores/me-images.svelte';
import { makeSystemActor, runAsAsync, SYSTEM_MIGRATION } from '$lib/data/events/actor';
export async function migrateLegacyAvatarIfNeeded(): Promise<void> {
const user = authStore.user;
@ -63,28 +64,40 @@ export async function migrateLegacyAvatarIfNeeded(): Promise<void> {
return;
}
await meImagesStore.createMeImage({
kind: 'face',
// Sentinel mediaId: not a real mana-media reference. The generate-
// with-reference path (M3) gates on MediaClient.list({app:'me'}),
// so this id will naturally bounce if ever used for generation.
mediaId: `legacy-avatar:${user.id}`,
storagePath: profile.image,
publicUrl: profile.image,
thumbnailUrl: profile.image,
width: 0,
height: 0,
label: 'Bisheriges Profilbild',
usage: { aiReference: false, showInProfile: true },
primaryFor: 'avatar',
// Legacy avatar is the user's global SSO identity (Better Auth
// `users.image`) — it belongs explicitly in the *personal* space,
// regardless of which space the user happens to be in when the
// migration fires. Use the `_personal:<uid>` sentinel that
// reconcileSentinels() rewrites to the real personal-space id on
// the next active-space bootstrap (same pattern v28 used for the
// blanket data-table migration).
spaceId: `_personal:${user.id}`,
// Pin the narrowed `profile.image` to a const so the type stays
// `string` across the runAsAsync closure (control-flow narrowing
// doesn't survive nested callbacks).
const imageUrl = profile.image;
// Run the seed insert under a migration-system actor so the Dexie
// creating-hook stamps `origin: 'migration'` on every field —
// conflict-detection ignores this row when the same migration fires
// later on a different device.
const migrationActor = makeSystemActor(SYSTEM_MIGRATION, 'Migration: legacy avatar');
await runAsAsync(migrationActor, async () => {
await meImagesStore.createMeImage({
kind: 'face',
// Sentinel mediaId: not a real mana-media reference. The generate-
// with-reference path (M3) gates on MediaClient.list({app:'me'}),
// so this id will naturally bounce if ever used for generation.
mediaId: `legacy-avatar:${user.id}`,
storagePath: imageUrl,
publicUrl: imageUrl,
thumbnailUrl: imageUrl,
width: 0,
height: 0,
label: 'Bisheriges Profilbild',
usage: { aiReference: false, showInProfile: true },
primaryFor: 'avatar',
// Legacy avatar is the user's global SSO identity (Better Auth
// `users.image`) — it belongs explicitly in the *personal* space,
// regardless of which space the user happens to be in when the
// migration fires. Use the `_personal:<uid>` sentinel that
// reconcileSentinels() rewrites to the real personal-space id on
// the next active-space bootstrap (same pattern v28 used for the
// blanket data-table migration).
spaceId: `_personal:${user.id}`,
});
});
try {

View file

@ -24,6 +24,7 @@
import { authStore } from '$lib/stores/auth.svelte';
import { meImagesTable } from '../collections';
import { makeSystemActor, runAsAsync, SYSTEM_MIGRATION } from '$lib/data/events/actor';
export async function repairSilentTwinAvatarRows(): Promise<void> {
const user = authStore.user;
@ -60,14 +61,22 @@ export async function repairSilentTwinAvatarRows(): Promise<void> {
victims.sort((a, b) => (b.updatedAt ?? '').localeCompare(a.updatedAt ?? ''));
const nowIso = new Date().toISOString();
await meImagesTable.db.transaction('rw', meImagesTable, async () => {
for (let i = 0; i < victims.length; i++) {
const row = victims[i];
await meImagesTable.update(row.id, {
primaryFor: i === 0 ? 'face-ref' : null,
updatedAt: nowIso,
});
}
// Run the rewrite under a migration-system actor so the Dexie
// updating-hook stamps `origin: 'migration'` on every touched field.
// Conflict-detection later treats these writes as pipeline-internal —
// a fresh client pulling the same updates from another device must
// NOT see "another session overwrote your edit" toasts.
const migrationActor = makeSystemActor(SYSTEM_MIGRATION, 'Repair: silent-twin');
await runAsAsync(migrationActor, async () => {
await meImagesTable.db.transaction('rw', meImagesTable, async () => {
for (let i = 0; i < victims.length; i++) {
const row = victims[i];
await meImagesTable.update(row.id, {
primaryFor: i === 0 ? 'face-ref' : null,
updatedAt: nowIso,
});
}
});
});
try {