feat(sync): F2 — origin-gated conflict-detection

Closes the false-positive conflict-toast loop on history-replay. Conflict
notifications now fire only when the local field meta records origin='user'
AND the pull is not an initial hydration round.

Origin source-of-truth:
- shared-ai/field-meta.ts → originFromActor(actor) maps actor.kind onto
  the FieldOrigin enum: user→'user', ai→'agent', system+SYSTEM_MIGRATION
  →'migration', any other system source→'system'.
- Dexie creating/updating hooks call it once per write so every persisted
  field carries the right pipeline tag.
- repair-silent-twin + legacy-avatar wrap their writes in
  runAsAsync(makeSystemActor(SYSTEM_MIGRATION, ...)) so the hook stamps
  origin='migration'. Future replays of those rows from another device
  will not surface as conflicts.

applyServerChanges options:
- New ApplyServerChangesOptions { isInitialHydration?: boolean }.
- Push-response and pull-paged-loop callers compute it from the cursor
  state (`!oldestCursor` / `!cursor`). Pagination resets the flag after
  the first page.
- Conflict-trigger gates on `!options.isInitialHydration && localMeta[k]
  ?.origin === 'user'` in addition to the prior tests.

Tests (sync.test.ts):
- New: replay-burst (10 sequential server updates → 0 conflicts)
- New: agent-origin local write + server overwrite → 0 conflicts
- New: isInitialHydration suppresses everything → 0 conflicts
- New: real user edit + server overwrite → 1 conflict
- All 25 prior tests still pass.

29/29 vitest sync.test.ts cases green; svelte-check 0 errors over 7647
files.

Plan: docs/plans/sync-field-meta-overhaul.md F2 done-criteria met.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-26 21:38:56 +02:00
parent 6c942e3ab2
commit ad5e04a554
9 changed files with 279 additions and 50 deletions

View file

@ -20,8 +20,8 @@ import { fire as fireTrigger } from '$lib/triggers/registry';
import { checkInlineSuggestion } from '$lib/triggers/inline-suggest'; import { checkInlineSuggestion } from '$lib/triggers/inline-suggest';
import { getEffectiveUserId, GUEST_USER_ID } from './current-user'; import { getEffectiveUserId, GUEST_USER_ID } from './current-user';
import { getEffectiveSpaceId } from './scope/active-space.svelte'; import { getEffectiveSpaceId } from './scope/active-space.svelte';
import { getCurrentActor, makeFieldMeta } from './events/actor'; import { getCurrentActor, makeFieldMeta, originFromActor } from './events/actor';
import type { Actor, FieldMeta, FieldOrigin } from './events/actor'; import type { Actor, FieldMeta } from './events/actor';
import { isQuotaError, notifyQuotaExceeded } from './quota-detect'; import { isQuotaError, notifyQuotaExceeded } from './quota-detect';
import { import {
SYNC_APP_MAP, SYNC_APP_MAP,
@ -1501,9 +1501,12 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
// `at` drives field-LWW ordering, `actor` carries attribution forward // `at` drives field-LWW ordering, `actor` carries attribution forward
// across renames, `origin` distinguishes user edits from system / // across renames, `origin` distinguishes user edits from system /
// migration / agent / server-replay writes for conflict-detection. // migration / agent / server-replay writes for conflict-detection.
// F1 hardcodes `origin: 'user'` here — F2 will derive it from the // `origin` is derived from `actor.kind`:
// active actor.kind so AI-runner writes land as `'agent'` etc. // user → 'user'
const origin: FieldOrigin = 'user'; // ai → 'agent' (mission-runner / tool executor)
// system + SYSTEM_MIGRATION → 'migration' (Dexie upgrades, repair routines)
// any other system source → 'system' (projection, rule, stream, …)
const origin = originFromActor(actor);
const fieldMeta: Record<string, FieldMeta> = {}; const fieldMeta: Record<string, FieldMeta> = {};
for (const key of Object.keys(obj)) { for (const key of Object.keys(obj)) {
if (isInternalKey(key)) continue; if (isInternalKey(key)) continue;
@ -1542,7 +1545,7 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
if (_applyingTables.has(tableName)) return undefined; if (_applyingTables.has(tableName)) return undefined;
const now = new Date().toISOString(); const now = new Date().toISOString();
const actor: Actor = getCurrentActor(); const actor: Actor = getCurrentActor();
const origin: FieldOrigin = 'user'; const origin = originFromActor(actor);
const fields: Record<string, { value: unknown; at: string }> = {}; const fields: Record<string, { value: unknown; at: string }> = {};
// userId is immutable after creation. Silently strip any attempt to // userId is immutable after creation. Silently strip any attempt to

View file

@ -51,6 +51,7 @@ export {
isFromMissionRunner, isFromMissionRunner,
makeFieldMeta, makeFieldMeta,
isUserOriginatedField, isUserOriginatedField,
originFromActor,
} from '@mana/shared-ai'; } from '@mana/shared-ai';
/** /**

View file

@ -515,5 +515,152 @@ describe('applyServerChanges (Dexie integration)', () => {
// Tied — LWW lets server win silently (no edit-loss to surface) // Tied — LWW lets server win silently (no edit-loss to surface)
expect(conflicts).toHaveLength(0); expect(conflicts).toHaveLength(0);
}); });
// ─── F2: Origin-gated conflict detection ─────────────────
it('does NOT fire on a sequential server-replay burst (history-replay)', async () => {
// Reproduces the original bug: a fresh client pulls history of
// 10 distinct server changes for the same record across N
// pseudo-sessions. None of them is a local user edit; all
// applyServerChanges writes are stamped origin='server-replay'.
// No conflict toast must fire — the surface is reserved for
// user edits that lose to a later server overwrite.
const conflicts = await captureConflicts(async () => {
// Initial insert (server-replay).
await applyServerChanges('todo', [
{
table: 'tasks',
id: 'task-replay-burst',
op: 'insert',
data: {
id: 'task-replay-burst',
title: 'gen-0',
priority: 'low',
isCompleted: false,
order: 0,
updatedAt: '2026-04-01T00:00:00Z',
},
},
]);
// 10 follow-up updates with monotonically increasing timestamps.
for (let i = 1; i <= 10; i++) {
const ts = `2026-04-01T00:0${i}:00Z`;
await applyServerChanges('todo', [
{
table: 'tasks',
id: 'task-replay-burst',
op: 'update',
fields: { title: { value: `gen-${i}`, at: ts } },
},
]);
}
});
expect(conflicts).toHaveLength(0);
});
it('does NOT fire when the local write came from an AI agent (origin=agent)', async () => {
const { runAs } = await import('./events/actor');
const { makeAgentActor } = await import('@mana/shared-ai');
const agent = makeAgentActor({
agentId: 'agent-test',
displayName: 'Test-Agent',
missionId: 'mission-1',
iterationId: 'iter-1',
rationale: 'unit-test',
});
// Seed locally under the agent actor — the creating-hook
// stamps origin='agent' for every field.
runAs(agent, () => {
void db.table('tasks').add({
id: 'task-agent-write',
title: 'agent-typed value',
priority: 'low',
isCompleted: false,
order: 0,
});
});
// Wait for the (synchronous) Dexie put to drain.
await db.table('tasks').get('task-agent-write');
const conflicts = await captureConflicts(async () => {
await applyServerChanges('todo', [
{
table: 'tasks',
id: 'task-agent-write',
op: 'update',
fields: {
title: { value: 'their version', at: '2099-01-01T00:00:00Z' },
},
},
]);
});
// Agent writes are visible in the proposal/mission UI, not the
// conflict toast — server overwriting them is silent.
expect(conflicts).toHaveLength(0);
});
it('does NOT fire when isInitialHydration is set (belt-and-suspenders)', async () => {
// Even with a real local user edit present, the hydration mode
// suppresses the entire conflict surface for the round.
await db.table('tasks').add({
id: 'task-hydration',
title: 'my draft',
priority: 'low',
isCompleted: false,
order: 0,
});
const conflicts = await captureConflicts(async () => {
await applyServerChanges(
'todo',
[
{
table: 'tasks',
id: 'task-hydration',
op: 'update',
fields: {
title: { value: 'server overwrite', at: '2099-01-01T00:00:00Z' },
},
},
],
{ isInitialHydration: true }
);
});
expect(conflicts).toHaveLength(0);
});
it('fires for a real user edit getting overwritten (origin=user)', async () => {
// Belt: the existing "fires when..." test at the top of the
// suite already covers this — restated here so the F2 block
// reads as a complete spec for the gating rules.
await db.table('tasks').add({
id: 'task-real-conflict',
title: 'I typed this',
priority: 'low',
isCompleted: false,
order: 0,
});
const conflicts = await captureConflicts(async () => {
await applyServerChanges('todo', [
{
table: 'tasks',
id: 'task-real-conflict',
op: 'update',
fields: {
title: { value: 'somebody else', at: '2099-01-01T00:00:00Z' },
},
},
]);
});
expect(conflicts).toHaveLength(1);
expect(conflicts[0].field).toBe('title');
});
}); });
}); });

View file

@ -287,7 +287,22 @@ function valuesEqual(a: unknown, b: unknown): boolean {
} }
} }
export async function applyServerChanges(appId: string, changes: unknown[]): Promise<void> { /** Options for {@link applyServerChanges}.
*
* `isInitialHydration` flips off conflict-detection entirely used when
* the calling pull/push knows the local client has never seen this app's
* data before (cursor is empty), so any "newer server value over local"
* is by definition initial-fill, not a lost edit. Belt-and-suspenders
* with the per-field `origin === 'user'` gate inside the inner loop. */
export interface ApplyServerChangesOptions {
isInitialHydration?: boolean;
}
export async function applyServerChanges(
appId: string,
changes: unknown[],
options: ApplyServerChangesOptions = {}
): Promise<void> {
// Reject malformed entries up-front so a single bad row from the server // Reject malformed entries up-front so a single bad row from the server
// can never write garbage into IndexedDB. Drops are logged once and the // can never write garbage into IndexedDB. Drops are logged once and the
// good entries proceed — partial degradation beats a hard crash on a // good entries proceed — partial degradation beats a hard crash on a
@ -402,16 +417,20 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
if (key === 'id' || key === FIELD_META_KEY) continue; if (key === 'id' || key === FIELD_META_KEY) continue;
const localFieldTime = localMeta[key]?.at ?? localUpdatedAt; const localFieldTime = localMeta[key]?.at ?? localUpdatedAt;
if (recordTime >= localFieldTime) { if (recordTime >= localFieldTime) {
// Conflict signal: server STRICTLY wins (>) and the local // Conflict signal: server STRICTLY wins (>), the local
// field had a non-empty value that differs from the new // field had a non-empty value that differs from the new
// one. Equal-time ties don't fire because there's no // one, AND the local write was a real user edit (not a
// edit to lose. F2 will additionally gate this on // server-replay / system / migration / agent write).
// localMeta[key].origin === 'user'. // Initial-hydration pulls bypass the conflict surface
// entirely — by definition no edit can be lost when
// the table has never been seen locally before.
const localValue = (existing as Record<string, unknown>)[key]; const localValue = (existing as Record<string, unknown>)[key];
if ( if (
!options.isInitialHydration &&
recordTime > localFieldTime && recordTime > localFieldTime &&
localValue != null && localValue != null &&
!valuesEqual(localValue, val) !valuesEqual(localValue, val) &&
localMeta[key]?.origin === 'user'
) { ) {
notifyConflict({ notifyConflict({
tableName, tableName,
@ -464,13 +483,16 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
const localFieldTime = localMeta[key]?.at ?? localUpdatedAt; const localFieldTime = localMeta[key]?.at ?? localUpdatedAt;
if (serverTime >= localFieldTime) { if (serverTime >= localFieldTime) {
// Same conflict criteria as the insert-as-update path: // Same conflict criteria as the insert-as-update path:
// strictly newer + non-empty local + actually different. // strictly newer + non-empty local + actually different
// F2 will additionally gate on localMeta[key].origin === 'user'. // + local write came from a real user edit. Initial
// hydration suppresses the surface entirely.
const localValue = (existing as Record<string, unknown>)[key]; const localValue = (existing as Record<string, unknown>)[key];
if ( if (
!options.isInitialHydration &&
serverTime > localFieldTime && serverTime > localFieldTime &&
localValue != null && localValue != null &&
!valuesEqual(localValue, fc.value) !valuesEqual(localValue, fc.value) &&
localMeta[key]?.origin === 'user'
) { ) {
notifyConflict({ notifyConflict({
tableName, tableName,
@ -777,6 +799,12 @@ export function createUnifiedSync(
// Build changeset in backend protocol format // Build changeset in backend protocol format
const changeset = buildChangeset(pending, clientId, oldestCursor); const changeset = buildChangeset(pending, clientId, oldestCursor);
// First contact with this app's data: cursor empty means the
// server is about to send everything-since-epoch. No local edit
// can be lost in that batch by definition — flip on the
// hydration mode so applyServerChanges suppresses any conflict
// notifications for this round.
const isInitialHydration = !oldestCursor;
const res = await fetchWithRetry( const res = await fetchWithRetry(
`${serverUrl}/sync/${appId}`, `${serverUrl}/sync/${appId}`,
@ -802,7 +830,7 @@ export function createUnifiedSync(
// Apply server changes from the response // Apply server changes from the response
if (data.serverChanges?.length > 0) { if (data.serverChanges?.length > 0) {
await applyServerChanges(appId, data.serverChanges); await applyServerChanges(appId, data.serverChanges, { isInitialHydration });
} }
// Update sync cursor // Update sync cursor
@ -881,6 +909,12 @@ export function createUnifiedSync(
const syncName = toSyncName(tableName); const syncName = toSyncName(tableName);
let cursor = await getSyncCursor(appId, tableName); let cursor = await getSyncCursor(appId, tableName);
let hasMore = true; let hasMore = true;
// Hydration applies to the first page of an empty-cursor
// pull. Subsequent pages carry a real cursor (set after the
// first response lands) so they fall back to normal
// conflict-detection — by then any user edits made during
// pagination are real edits worth surfacing.
let isInitialHydration = !cursor;
// Paginated pull: continue fetching until server signals no more data // Paginated pull: continue fetching until server signals no more data
while (hasMore) { while (hasMore) {
@ -906,11 +940,12 @@ export function createUnifiedSync(
if (data.serverChanges && data.serverChanges.length > 0) { if (data.serverChanges && data.serverChanges.length > 0) {
totalApplied += data.serverChanges.length; totalApplied += data.serverChanges.length;
await applyServerChanges(appId, data.serverChanges); await applyServerChanges(appId, data.serverChanges, { isInitialHydration });
} }
if (data.syncedUntil) { if (data.syncedUntil) {
cursor = data.syncedUntil; cursor = data.syncedUntil;
isInitialHydration = false;
} else { } else {
break; break;
} }

View file

@ -25,6 +25,7 @@ import { authStore } from '$lib/stores/auth.svelte';
import { profileService } from '$lib/api/profile'; import { profileService } from '$lib/api/profile';
import { meImagesTable } from '../collections'; import { meImagesTable } from '../collections';
import { meImagesStore } from '../stores/me-images.svelte'; import { meImagesStore } from '../stores/me-images.svelte';
import { makeSystemActor, runAsAsync, SYSTEM_MIGRATION } from '$lib/data/events/actor';
export async function migrateLegacyAvatarIfNeeded(): Promise<void> { export async function migrateLegacyAvatarIfNeeded(): Promise<void> {
const user = authStore.user; const user = authStore.user;
@ -63,28 +64,40 @@ export async function migrateLegacyAvatarIfNeeded(): Promise<void> {
return; return;
} }
await meImagesStore.createMeImage({ // Pin the narrowed `profile.image` to a const so the type stays
kind: 'face', // `string` across the runAsAsync closure (control-flow narrowing
// Sentinel mediaId: not a real mana-media reference. The generate- // doesn't survive nested callbacks).
// with-reference path (M3) gates on MediaClient.list({app:'me'}), const imageUrl = profile.image;
// so this id will naturally bounce if ever used for generation.
mediaId: `legacy-avatar:${user.id}`, // Run the seed insert under a migration-system actor so the Dexie
storagePath: profile.image, // creating-hook stamps `origin: 'migration'` on every field —
publicUrl: profile.image, // conflict-detection ignores this row when the same migration fires
thumbnailUrl: profile.image, // later on a different device.
width: 0, const migrationActor = makeSystemActor(SYSTEM_MIGRATION, 'Migration: legacy avatar');
height: 0, await runAsAsync(migrationActor, async () => {
label: 'Bisheriges Profilbild', await meImagesStore.createMeImage({
usage: { aiReference: false, showInProfile: true }, kind: 'face',
primaryFor: 'avatar', // Sentinel mediaId: not a real mana-media reference. The generate-
// Legacy avatar is the user's global SSO identity (Better Auth // with-reference path (M3) gates on MediaClient.list({app:'me'}),
// `users.image`) — it belongs explicitly in the *personal* space, // so this id will naturally bounce if ever used for generation.
// regardless of which space the user happens to be in when the mediaId: `legacy-avatar:${user.id}`,
// migration fires. Use the `_personal:<uid>` sentinel that storagePath: imageUrl,
// reconcileSentinels() rewrites to the real personal-space id on publicUrl: imageUrl,
// the next active-space bootstrap (same pattern v28 used for the thumbnailUrl: imageUrl,
// blanket data-table migration). width: 0,
spaceId: `_personal:${user.id}`, height: 0,
label: 'Bisheriges Profilbild',
usage: { aiReference: false, showInProfile: true },
primaryFor: 'avatar',
// Legacy avatar is the user's global SSO identity (Better Auth
// `users.image`) — it belongs explicitly in the *personal* space,
// regardless of which space the user happens to be in when the
// migration fires. Use the `_personal:<uid>` sentinel that
// reconcileSentinels() rewrites to the real personal-space id on
// the next active-space bootstrap (same pattern v28 used for the
// blanket data-table migration).
spaceId: `_personal:${user.id}`,
});
}); });
try { try {

View file

@ -24,6 +24,7 @@
import { authStore } from '$lib/stores/auth.svelte'; import { authStore } from '$lib/stores/auth.svelte';
import { meImagesTable } from '../collections'; import { meImagesTable } from '../collections';
import { makeSystemActor, runAsAsync, SYSTEM_MIGRATION } from '$lib/data/events/actor';
export async function repairSilentTwinAvatarRows(): Promise<void> { export async function repairSilentTwinAvatarRows(): Promise<void> {
const user = authStore.user; const user = authStore.user;
@ -60,14 +61,22 @@ export async function repairSilentTwinAvatarRows(): Promise<void> {
victims.sort((a, b) => (b.updatedAt ?? '').localeCompare(a.updatedAt ?? '')); victims.sort((a, b) => (b.updatedAt ?? '').localeCompare(a.updatedAt ?? ''));
const nowIso = new Date().toISOString(); const nowIso = new Date().toISOString();
await meImagesTable.db.transaction('rw', meImagesTable, async () => { // Run the rewrite under a migration-system actor so the Dexie
for (let i = 0; i < victims.length; i++) { // updating-hook stamps `origin: 'migration'` on every touched field.
const row = victims[i]; // Conflict-detection later treats these writes as pipeline-internal —
await meImagesTable.update(row.id, { // a fresh client pulling the same updates from another device must
primaryFor: i === 0 ? 'face-ref' : null, // NOT see "another session overwrote your edit" toasts.
updatedAt: nowIso, const migrationActor = makeSystemActor(SYSTEM_MIGRATION, 'Repair: silent-twin');
}); await runAsAsync(migrationActor, async () => {
} await meImagesTable.db.transaction('rw', meImagesTable, async () => {
for (let i = 0; i < victims.length; i++) {
const row = victims[i];
await meImagesTable.update(row.id, {
primaryFor: i === 0 ? 'face-ref' : null,
updatedAt: nowIso,
});
}
});
}); });
try { try {

View file

@ -145,8 +145,8 @@ _Wird befüllt während der Ausführung._
| Phase | Commit | Notiz | | Phase | Commit | Notiz |
| --- | --- | --- | | --- | --- | --- |
| F1 | _staged, uncommitted_ | Web + mana-sync (Go) + mana-ai + apps/api/mcp + tests + DB schema reset. Type-checks grün, mana-sync Go-Tests grün, mana-ai Bun-Tests grün (61 pass). DB truncated + recreated mit `field_meta` JSONB + `origin` TEXT. Browser-IndexedDB-Wipe + Smoke-Test stehen aus (User-Action). | | F1 | `7766ea502` | Web + mana-sync (Go) + mana-ai + apps/api/mcp + tests + DB schema reset. **Note: Commit-Titel `docs(plans): mark llm-fallback-aliases SHIPPED` ist irreführend — Multi-Terminal-Race hat F1 in einem fremden Commit zusammengeführt. Code ist trotzdem korrekt drin (27 F1 Files + Plan).** Tests grün, DB migriert. |
| F2 | _pending_ | | | F2 | _staged, uncommitted_ | Origin-Gate aktiviert. `originFromActor()` in shared-ai/field-meta.ts maps actor.kind → 'user'/'agent'/'system'/'migration'. Hooks nutzen es. Repair-Migrations (repair-silent-twin, legacy-avatar) wrappen ihre Writes in `runAsAsync(systemMigrationActor, ...)`. `applyServerChanges` bekommt `ApplyServerChangesOptions.isInitialHydration` Parameter, beide Caller (push-response + pull) setzen ihn aus dem Cursor-State. Conflict-Trigger feuert nur noch wenn `localMeta[k]?.origin === 'user' && !options.isInitialHydration`. **29 Tests grün** inkl. 4 neuer (replay-burst no-conflict, agent-origin no-conflict, hydration no-conflict, user-edit fires-conflict). |
| F3 | _pending_ | | | F3 | _pending_ | |
| F4 | _pending_ | | | F4 | _pending_ | |
| F5 | _pending_ | | | F5 | _pending_ | |

View file

@ -23,6 +23,7 @@
*/ */
import type { Actor } from './actor'; import type { Actor } from './actor';
import { SYSTEM_MIGRATION } from './actor';
/** /**
* Pipeline that produced a given field value, from the perspective of * Pipeline that produced a given field value, from the perspective of
@ -67,3 +68,23 @@ export function makeFieldMeta(at: string, actor: Actor, origin: FieldOrigin): Fi
export function isUserOriginatedField(meta: FieldMeta | undefined): boolean { export function isUserOriginatedField(meta: FieldMeta | undefined): boolean {
return meta?.origin === 'user'; return meta?.origin === 'user';
} }
/**
* Map an actor onto the pipeline-origin we stamp at the Dexie hook.
*
* Rules:
* - `kind: 'user'` `'user'`
* - `kind: 'ai'` `'agent'`
* - `kind: 'system'` with `principalId === SYSTEM_MIGRATION` `'migration'`
* - any other system source `'system'`
*
* Server-replay is set explicitly by `applyServerChanges`, never derived
* here the local hook only sees writes the local code initiated.
*/
export function originFromActor(actor: Actor): FieldOrigin {
if (actor.kind === 'ai') return 'agent';
if (actor.kind === 'system') {
return actor.principalId === SYSTEM_MIGRATION ? 'migration' : 'system';
}
return 'user';
}

View file

@ -38,7 +38,7 @@ export {
} from './actor'; } from './actor';
export type { FieldMeta, FieldOrigin } from './field-meta'; export type { FieldMeta, FieldOrigin } from './field-meta';
export { makeFieldMeta, isUserOriginatedField } from './field-meta'; export { makeFieldMeta, isUserOriginatedField, originFromActor } from './field-meta';
export type { export type {
IterationPhase, IterationPhase,