mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-15 00:01:10 +02:00
docs(plans): mark llm-fallback-aliases SHIPPED, add M-by-M commit table
All 5 milestones landed today in one continuous session: registry, health cache, fallback router, observability, and consumer migration. 115 service-side tests, validator covers 2538 files.
This commit is contained in:
parent
30eb7ef72d
commit
7766ea5021
27 changed files with 662 additions and 346 deletions
|
|
@ -78,6 +78,11 @@ export async function readLatestRecords(
|
|||
/**
|
||||
* Write a new record via sync_changes INSERT. The record will appear
|
||||
* on the user's devices on their next sync cycle.
|
||||
*
|
||||
* MCP-Tool calls always carry `origin='agent'` because the pipeline
|
||||
* that produced the value is an AI agent invoking a tool — the
|
||||
* actor's `kind` may be `system` (the MCP server itself) but the
|
||||
* write semantics are agent-driven for conflict-detection purposes.
|
||||
*/
|
||||
export async function writeRecord(
|
||||
userId: string,
|
||||
|
|
@ -86,17 +91,18 @@ export async function writeRecord(
|
|||
recordId: string,
|
||||
op: 'insert' | 'update' | 'delete',
|
||||
data: Record<string, unknown>,
|
||||
fieldTimestamps: Record<string, string>
|
||||
fieldMeta: Record<string, string>
|
||||
): Promise<void> {
|
||||
await withUser(userId, async (tx) => {
|
||||
await tx`
|
||||
INSERT INTO sync_changes
|
||||
(app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version, actor)
|
||||
(app_id, table_name, record_id, user_id, op, data, field_meta, client_id, schema_version, actor, origin)
|
||||
VALUES
|
||||
(${appId}, ${tableName}, ${recordId}, ${userId}, ${op},
|
||||
${tx.json(data as never)}, ${tx.json(fieldTimestamps as never)},
|
||||
${tx.json(data as never)}, ${tx.json(fieldMeta as never)},
|
||||
'mcp-server', 1,
|
||||
${tx.json({ kind: 'system', principalId: 'system:mcp', displayName: 'MCP Server' } as never)})
|
||||
${tx.json({ kind: 'system', principalId: 'system:mcp', displayName: 'MCP Server' } as never)},
|
||||
'agent')
|
||||
`;
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,9 +2,8 @@
|
|||
* Mission store — CRUD + lifecycle operations.
|
||||
*
|
||||
* Missions go through the unified Dexie write path, which means the Dexie
|
||||
* hooks stamp `userId`, `__lastActor`, `__fieldTimestamps`, `__fieldActors`
|
||||
* and track the row into `_pendingChanges`. Callers never touch those
|
||||
* fields directly.
|
||||
* hooks stamp `userId` + `__fieldMeta` and track the row into
|
||||
* `_pendingChanges`. Callers never touch those fields directly.
|
||||
*
|
||||
* Iterations are intentionally stored inline (`Mission.iterations`) rather
|
||||
* than in a child table. They are append-only, each Mission stays small
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@
|
|||
|
||||
import { db } from './database';
|
||||
import { subscribeSyncConflicts, type SyncConflictPayload } from './sync';
|
||||
import { FIELD_TIMESTAMPS_KEY } from './database';
|
||||
|
||||
/** How long a conflict stays visible before auto-dismissing. */
|
||||
const CONFLICT_TTL_MS = 30_000;
|
||||
|
|
@ -155,24 +154,16 @@ async function restore(id: string): Promise<void> {
|
|||
|
||||
const now = new Date().toISOString();
|
||||
const updates: Record<string, unknown> = { updatedAt: now };
|
||||
const ftPatch: Record<string, string> = {};
|
||||
|
||||
for (const [field, info] of Object.entries(conflict.fields)) {
|
||||
updates[field] = info.wasLocal;
|
||||
ftPatch[field] = now;
|
||||
}
|
||||
|
||||
// Read the current row's __fieldTimestamps and merge our patch in
|
||||
// so we don't blow away unrelated server-side timestamps.
|
||||
const row = await db.table(conflict.tableName).get(conflict.recordId);
|
||||
if (row) {
|
||||
const existingFT =
|
||||
((row as Record<string, unknown>)[FIELD_TIMESTAMPS_KEY] as Record<string, string>) ?? {};
|
||||
updates[FIELD_TIMESTAMPS_KEY] = { ...existingFT, ...ftPatch };
|
||||
} else {
|
||||
updates[FIELD_TIMESTAMPS_KEY] = ftPatch;
|
||||
}
|
||||
|
||||
// The Dexie updating-hook re-stamps `__fieldMeta` for every modified
|
||||
// field with origin='user' and `at: now`, which is exactly what we
|
||||
// want here: the restore is a fresh user edit that should win LWW
|
||||
// against the server's overwrite on the next sync round. No manual
|
||||
// __fieldMeta patching needed.
|
||||
try {
|
||||
await db.table(conflict.tableName).update(conflict.recordId, updates);
|
||||
} catch (err) {
|
||||
|
|
|
|||
|
|
@ -20,8 +20,8 @@ import { fire as fireTrigger } from '$lib/triggers/registry';
|
|||
import { checkInlineSuggestion } from '$lib/triggers/inline-suggest';
|
||||
import { getEffectiveUserId, GUEST_USER_ID } from './current-user';
|
||||
import { getEffectiveSpaceId } from './scope/active-space.svelte';
|
||||
import { getCurrentActor } from './events/actor';
|
||||
import type { Actor } from './events/actor';
|
||||
import { getCurrentActor, makeFieldMeta } from './events/actor';
|
||||
import type { Actor, FieldMeta, FieldOrigin } from './events/actor';
|
||||
import { isQuotaError, notifyQuotaExceeded } from './quota-detect';
|
||||
import {
|
||||
SYNC_APP_MAP,
|
||||
|
|
@ -1231,6 +1231,35 @@ db.version(50).upgrade(async (tx) => {
|
|||
}
|
||||
});
|
||||
|
||||
// v51 — Lasts module (docs/plans/lasts-module.md M1).
|
||||
// Mirror sibling to firsts: the *last* time you did/felt/saw something —
|
||||
// either marked manually or surfaced retrospectively by the inference
|
||||
// scanner that watches places/contacts/food/habits for frequency drops.
|
||||
//
|
||||
// Single space-scoped table. Index strategy:
|
||||
// - status for the suspected/confirmed/reclaimed tab filter
|
||||
// - category for the category tab filter
|
||||
// - date for chronological sort + anniversary scans
|
||||
// - recognisedAt for the "recognised X years ago" reminder
|
||||
// - isPinned, isArchived for the standard meta-filters
|
||||
db.version(51).stores({
|
||||
lasts: 'id, status, category, date, recognisedAt, isPinned, isArchived',
|
||||
});
|
||||
|
||||
// v52 — Lasts inference cooldown (docs/plans/lasts-module.md M3).
|
||||
// Records dismissed inference candidates so the scanner doesn't keep
|
||||
// re-suggesting the same place / contact / habit for ~12 months. ID is
|
||||
// deterministic (`${refTable}:${refId}`) for structural idempotency:
|
||||
// re-dismissing the same candidate is a Dexie put no-op-equivalent.
|
||||
//
|
||||
// Plaintext only — refTable/refId/dismissedAt are all metadata, no
|
||||
// user-typed content. Indexed by refTable + dismissedAt so the scanner
|
||||
// can quickly probe "is this place on cooldown?" and the cooldown sweep
|
||||
// can expire entries by age.
|
||||
db.version(52).stores({
|
||||
lastsCooldown: 'id, refTable, dismissedAt, [refTable+refId]',
|
||||
});
|
||||
|
||||
// ─── Sync Routing ──────────────────────────────────────────
|
||||
// SYNC_APP_MAP, TABLE_TO_SYNC_NAME, TABLE_TO_APP, SYNC_NAME_TO_TABLE,
|
||||
// toSyncName() and fromSyncName() are now derived from per-module
|
||||
|
|
@ -1360,29 +1389,22 @@ function trackActivity(
|
|||
}
|
||||
|
||||
/**
|
||||
* Hidden field on every synced record holding per-field LWW timestamps.
|
||||
* Not indexed, not sent to the server in pending-change payloads.
|
||||
* Hidden field on every synced record carrying per-field write metadata.
|
||||
*
|
||||
* Shape: `{ [fieldKey]: FieldMeta }` where `FieldMeta = { at, actor, origin }`.
|
||||
* Replaces the older triple `__fieldTimestamps` + `__fieldActors` +
|
||||
* `__lastActor` — same information, single source of truth.
|
||||
*
|
||||
* Not indexed, never sent to the server as a top-level payload field
|
||||
* (the wire format carries it as part of `change.fields[k]` instead).
|
||||
*
|
||||
* For `__lastActor` consumers: the previous "actor that last wrote the
|
||||
* record as a whole" is now derived as `__fieldMeta[argmax(at)].actor`.
|
||||
*/
|
||||
export const FIELD_TIMESTAMPS_KEY = '__fieldTimestamps';
|
||||
/**
|
||||
* Hidden field holding the {@link Actor} that last wrote the record as a
|
||||
* whole. Used by the Workbench UI to badge records the AI has touched.
|
||||
*/
|
||||
export const LAST_ACTOR_KEY = '__lastActor';
|
||||
/**
|
||||
* Hidden field holding the per-field {@link Actor} map, mirroring
|
||||
* `__fieldTimestamps`. Enables "the AI changed the due date, the user
|
||||
* changed the title" attribution when rendering diffs.
|
||||
*/
|
||||
export const FIELD_ACTORS_KEY = '__fieldActors';
|
||||
export const FIELD_META_KEY = '__fieldMeta';
|
||||
|
||||
function isInternalKey(key: string): boolean {
|
||||
return (
|
||||
key === 'id' ||
|
||||
key === FIELD_TIMESTAMPS_KEY ||
|
||||
key === LAST_ACTOR_KEY ||
|
||||
key === FIELD_ACTORS_KEY
|
||||
);
|
||||
return key === 'id' || key === FIELD_META_KEY;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1392,8 +1414,7 @@ function isInternalKey(key: string): boolean {
|
|||
* creating-hook continues to stamp `userId` on these; data tables
|
||||
* (tasks, events, tags, …) stopped carrying `userId` in Phase 2c of
|
||||
* the space-scoped data model rollout — attribution there lives on
|
||||
* the Actor fields (`__lastActor` / `__fieldActors`) and tenancy on
|
||||
* `spaceId`.
|
||||
* `__fieldMeta` and tenancy on `spaceId`.
|
||||
*
|
||||
* Keeping this list explicit instead of inferring by naming
|
||||
* convention: the audit in docs/plans/space-scoped-data-model.md
|
||||
|
|
@ -1476,27 +1497,22 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
}
|
||||
}
|
||||
|
||||
// Stamp every real field with the create-time so future LWW comparisons
|
||||
// have a baseline, and with the actor so field-level attribution works.
|
||||
// Mutates obj in place — Dexie persists the mutation.
|
||||
const ft: Record<string, string> = {};
|
||||
const fa: Record<string, Actor> = {};
|
||||
// Stamp every user-data field with `__fieldMeta[key] = { at, actor, origin }`.
|
||||
// `at` drives field-LWW ordering, `actor` carries attribution forward
|
||||
// across renames, `origin` distinguishes user edits from system /
|
||||
// migration / agent / server-replay writes for conflict-detection.
|
||||
// F1 hardcodes `origin: 'user'` here — F2 will derive it from the
|
||||
// active actor.kind so AI-runner writes land as `'agent'` etc.
|
||||
const origin: FieldOrigin = 'user';
|
||||
const fieldMeta: Record<string, FieldMeta> = {};
|
||||
for (const key of Object.keys(obj)) {
|
||||
if (isInternalKey(key)) continue;
|
||||
ft[key] = now;
|
||||
fa[key] = actor;
|
||||
fieldMeta[key] = makeFieldMeta(now, actor, origin);
|
||||
}
|
||||
objRecord[FIELD_TIMESTAMPS_KEY] = ft;
|
||||
objRecord[FIELD_ACTORS_KEY] = fa;
|
||||
objRecord[LAST_ACTOR_KEY] = actor;
|
||||
objRecord[FIELD_META_KEY] = fieldMeta;
|
||||
|
||||
// Build payload for pending-change WITHOUT the internal bookkeeping fields
|
||||
const {
|
||||
[FIELD_TIMESTAMPS_KEY]: _ft,
|
||||
[FIELD_ACTORS_KEY]: _fa,
|
||||
[LAST_ACTOR_KEY]: _la,
|
||||
...dataForSync
|
||||
} = obj as Record<string, unknown>;
|
||||
// Build payload for pending-change WITHOUT the internal bookkeeping field.
|
||||
const { [FIELD_META_KEY]: _fm, ...dataForSync } = obj as Record<string, unknown>;
|
||||
|
||||
trackPendingChange(tableName, {
|
||||
appId,
|
||||
|
|
@ -1505,6 +1521,7 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
op: 'insert',
|
||||
data: dataForSync,
|
||||
actor,
|
||||
origin,
|
||||
createdAt: now,
|
||||
spaceId: typeof objRecord.spaceId === 'string' ? (objRecord.spaceId as string) : undefined,
|
||||
});
|
||||
|
|
@ -1525,7 +1542,8 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
if (_applyingTables.has(tableName)) return undefined;
|
||||
const now = new Date().toISOString();
|
||||
const actor: Actor = getCurrentActor();
|
||||
const fields: Record<string, { value: unknown; updatedAt: string }> = {};
|
||||
const origin: FieldOrigin = 'user';
|
||||
const fields: Record<string, { value: unknown; at: string }> = {};
|
||||
|
||||
// userId is immutable after creation. Silently strip any attempt to
|
||||
// reassign it from a local update so a buggy or malicious caller can
|
||||
|
|
@ -1542,23 +1560,19 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
if ('spaceId' in mods) delete mods.spaceId;
|
||||
if ('authorId' in mods) delete mods.authorId;
|
||||
|
||||
// Merge field timestamps and field actors: keep existing, overwrite
|
||||
// each modified field with now / current actor.
|
||||
const existingFT =
|
||||
((obj as Record<string, unknown>)[FIELD_TIMESTAMPS_KEY] as
|
||||
| Record<string, string>
|
||||
// Merge __fieldMeta: keep existing entries (so untouched fields
|
||||
// retain their original at/actor/origin), overwrite each modified
|
||||
// field with the current write's metadata.
|
||||
const existingMeta =
|
||||
((obj as Record<string, unknown>)[FIELD_META_KEY] as
|
||||
| Record<string, FieldMeta>
|
||||
| undefined) ?? {};
|
||||
const existingFA =
|
||||
((obj as Record<string, unknown>)[FIELD_ACTORS_KEY] as Record<string, Actor> | undefined) ??
|
||||
{};
|
||||
const newFT: Record<string, string> = { ...existingFT };
|
||||
const newFA: Record<string, Actor> = { ...existingFA };
|
||||
const newMeta: Record<string, FieldMeta> = { ...existingMeta };
|
||||
|
||||
for (const [key, value] of Object.entries(modifications)) {
|
||||
if (isInternalKey(key)) continue;
|
||||
fields[key] = { value, updatedAt: now };
|
||||
newFT[key] = now;
|
||||
newFA[key] = actor;
|
||||
fields[key] = { value, at: now };
|
||||
newMeta[key] = makeFieldMeta(now, actor, origin);
|
||||
}
|
||||
|
||||
const op = (modifications as Record<string, unknown>).deletedAt ? 'delete' : 'update';
|
||||
|
|
@ -1577,6 +1591,7 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
op,
|
||||
fields,
|
||||
actor,
|
||||
origin,
|
||||
deletedAt: (modifications as Record<string, unknown>).deletedAt as string | undefined,
|
||||
createdAt: now,
|
||||
spaceId: existingSpaceId,
|
||||
|
|
@ -1585,13 +1600,10 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
fireTrigger(appId, tableName, op, modifications as Record<string, unknown>);
|
||||
|
||||
// Returning an object from a Dexie 'updating' hook merges it into the
|
||||
// modifications applied to the record — use this to persist the new
|
||||
// per-field timestamps, per-field actors, and last-actor alongside
|
||||
// the user's update.
|
||||
// modifications applied to the record — use this to persist the merged
|
||||
// __fieldMeta alongside the user's data update.
|
||||
return {
|
||||
[FIELD_TIMESTAMPS_KEY]: newFT,
|
||||
[FIELD_ACTORS_KEY]: newFA,
|
||||
[LAST_ACTOR_KEY]: actor,
|
||||
[FIELD_META_KEY]: newMeta,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,6 +28,8 @@ export type {
|
|||
AiActor,
|
||||
SystemActor,
|
||||
SystemSource,
|
||||
FieldMeta,
|
||||
FieldOrigin,
|
||||
} from '@mana/shared-ai';
|
||||
export {
|
||||
SYSTEM_PROJECTION,
|
||||
|
|
@ -47,6 +49,8 @@ export {
|
|||
isAiActor,
|
||||
isSystemActor,
|
||||
isFromMissionRunner,
|
||||
makeFieldMeta,
|
||||
isUserOriginatedField,
|
||||
} from '@mana/shared-ai';
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@
|
|||
* real user id.
|
||||
*/
|
||||
|
||||
import { db, SYNC_APP_MAP, FIELD_TIMESTAMPS_KEY } from './database';
|
||||
import { db, SYNC_APP_MAP, FIELD_META_KEY } from './database';
|
||||
import { GUEST_USER_ID } from './current-user';
|
||||
import { encryptRecord } from './crypto/record-helpers';
|
||||
import { waitForActiveKey } from './crypto/key-provider';
|
||||
|
|
@ -110,9 +110,9 @@ export async function migrateGuestDataToUser(newUserId: string): Promise<GuestMi
|
|||
// Strip the bookkeeping fields the creating-hook will rebuild.
|
||||
// Importantly, drop `userId` so the hook stamps the new id from
|
||||
// getEffectiveUserId() instead of preserving 'guest'.
|
||||
const { userId: _oldUser, [FIELD_TIMESTAMPS_KEY]: _oldFt, ...clean } = record;
|
||||
const { userId: _oldUser, [FIELD_META_KEY]: _oldMeta, ...clean } = record;
|
||||
void _oldUser;
|
||||
void _oldFt;
|
||||
void _oldMeta;
|
||||
|
||||
// Catch-up encryption: guest writes left these fields as
|
||||
// plaintext because no key was available. Now that the
|
||||
|
|
|
|||
|
|
@ -35,13 +35,14 @@ vi.mock('$lib/triggers/inline-suggest', () => ({
|
|||
|
||||
import {
|
||||
isValidSyncChange,
|
||||
readFieldTimestamps,
|
||||
readFieldMeta,
|
||||
applyServerChanges,
|
||||
subscribeSyncConflicts,
|
||||
type SyncChange,
|
||||
type SyncConflictPayload,
|
||||
} from './sync';
|
||||
import { db, FIELD_TIMESTAMPS_KEY } from './database';
|
||||
import { db, FIELD_META_KEY } from './database';
|
||||
import { makeFieldMeta, USER_ACTOR } from './events/actor';
|
||||
|
||||
// ─── Pure tests ──────────────────────────────────────────────────
|
||||
|
||||
|
|
@ -63,8 +64,8 @@ describe('isValidSyncChange', () => {
|
|||
id: 'task-1',
|
||||
op: 'update',
|
||||
fields: {
|
||||
title: { value: 'updated', updatedAt: '2026-04-01T10:00:00Z' },
|
||||
priority: { value: 'high', updatedAt: '2026-04-01T10:01:00Z' },
|
||||
title: { value: 'updated', at: '2026-04-01T10:00:00Z' },
|
||||
priority: { value: 'high', at: '2026-04-01T10:01:00Z' },
|
||||
},
|
||||
};
|
||||
expect(isValidSyncChange(change)).toBe(true);
|
||||
|
|
@ -112,12 +113,12 @@ describe('isValidSyncChange', () => {
|
|||
})
|
||||
).toBe(false);
|
||||
|
||||
// updatedAt must be a string when present
|
||||
// `at` must be a string when present
|
||||
expect(
|
||||
isValidSyncChange({
|
||||
...baseInsert,
|
||||
op: 'update',
|
||||
fields: { title: { value: 'x', updatedAt: 12345 } },
|
||||
fields: { title: { value: 'x', at: 12345 } },
|
||||
})
|
||||
).toBe(false);
|
||||
});
|
||||
|
|
@ -131,25 +132,28 @@ describe('isValidSyncChange', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('readFieldTimestamps', () => {
|
||||
it('returns the field-timestamps map when present', () => {
|
||||
const ft = { title: '2026-04-01T10:00:00Z', priority: '2026-04-01T11:00:00Z' };
|
||||
const record = { id: 'x', [FIELD_TIMESTAMPS_KEY]: ft };
|
||||
expect(readFieldTimestamps(record)).toEqual(ft);
|
||||
describe('readFieldMeta', () => {
|
||||
it('returns the field-meta map when present', () => {
|
||||
const fieldMeta = {
|
||||
title: makeFieldMeta('2026-04-01T10:00:00Z', USER_ACTOR, 'user'),
|
||||
priority: makeFieldMeta('2026-04-01T11:00:00Z', USER_ACTOR, 'user'),
|
||||
};
|
||||
const record = { id: 'x', [FIELD_META_KEY]: fieldMeta };
|
||||
expect(readFieldMeta(record)).toEqual(fieldMeta);
|
||||
});
|
||||
|
||||
it('returns an empty map when the field is missing (legacy record)', () => {
|
||||
expect(readFieldTimestamps({ id: 'x' })).toEqual({});
|
||||
expect(readFieldMeta({ id: 'x' })).toEqual({});
|
||||
});
|
||||
|
||||
it('handles null and non-object inputs gracefully', () => {
|
||||
expect(readFieldTimestamps(null)).toEqual({});
|
||||
expect(readFieldTimestamps(undefined)).toEqual({});
|
||||
expect(readFieldTimestamps(42)).toEqual({});
|
||||
expect(readFieldMeta(null)).toEqual({});
|
||||
expect(readFieldMeta(undefined)).toEqual({});
|
||||
expect(readFieldMeta(42)).toEqual({});
|
||||
});
|
||||
|
||||
it('returns an empty map if __fieldTimestamps is not an object', () => {
|
||||
expect(readFieldTimestamps({ id: 'x', [FIELD_TIMESTAMPS_KEY]: 'not-a-map' })).toEqual({});
|
||||
it('returns an empty map if __fieldMeta is not an object', () => {
|
||||
expect(readFieldMeta({ id: 'x', [FIELD_META_KEY]: 'not-a-map' })).toEqual({});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -169,7 +173,7 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
}
|
||||
});
|
||||
|
||||
it('inserts a new record with __fieldTimestamps populated', async () => {
|
||||
it('inserts a new record with __fieldMeta populated', async () => {
|
||||
await applyServerChanges('todo', [
|
||||
{
|
||||
table: 'tasks',
|
||||
|
|
@ -189,9 +193,11 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
const stored = await db.table('tasks').get('task-A');
|
||||
expect(stored).toBeDefined();
|
||||
expect(stored.title).toBe('Buy milk');
|
||||
const ft = readFieldTimestamps(stored);
|
||||
expect(ft.title).toBe('2026-04-01T10:00:00Z');
|
||||
expect(ft.priority).toBe('2026-04-01T10:00:00Z');
|
||||
const fm = readFieldMeta(stored);
|
||||
expect(fm.title?.at).toBe('2026-04-01T10:00:00Z');
|
||||
expect(fm.priority?.at).toBe('2026-04-01T10:00:00Z');
|
||||
// applyServerChanges stamps replays with origin='server-replay'
|
||||
expect(fm.title?.origin).toBe('server-replay');
|
||||
});
|
||||
|
||||
it('field-level LWW: server wins per-field when newer', async () => {
|
||||
|
|
@ -213,8 +219,8 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
id: 'task-B',
|
||||
op: 'update',
|
||||
fields: {
|
||||
title: { value: 'new title', updatedAt: '2099-01-01T00:00:00Z' },
|
||||
priority: { value: 'high', updatedAt: '2099-01-01T00:00:00Z' },
|
||||
title: { value: 'new title', at: '2099-01-01T00:00:00Z' },
|
||||
priority: { value: 'high', at: '2099-01-01T00:00:00Z' },
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
|
@ -223,9 +229,9 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
expect(stored.title).toBe('new title');
|
||||
expect(stored.priority).toBe('high');
|
||||
|
||||
const ft = readFieldTimestamps(stored);
|
||||
expect(ft.title).toBe('2099-01-01T00:00:00Z');
|
||||
expect(ft.priority).toBe('2099-01-01T00:00:00Z');
|
||||
const fm = readFieldMeta(stored);
|
||||
expect(fm.title?.at).toBe('2099-01-01T00:00:00Z');
|
||||
expect(fm.priority?.at).toBe('2099-01-01T00:00:00Z');
|
||||
});
|
||||
|
||||
it('field-level LWW: split outcome when one field is newer and one older', async () => {
|
||||
|
|
@ -257,8 +263,8 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
id: 'task-C',
|
||||
op: 'update',
|
||||
fields: {
|
||||
title: { value: 'server title (loser)', updatedAt: '1970-01-01T00:00:00Z' },
|
||||
priority: { value: 'medium (winner)', updatedAt: '2099-01-01T00:00:00Z' },
|
||||
title: { value: 'server title (loser)', at: '1970-01-01T00:00:00Z' },
|
||||
priority: { value: 'medium (winner)', at: '2099-01-01T00:00:00Z' },
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
|
@ -381,7 +387,7 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
id: 'task-conflict-1',
|
||||
op: 'update',
|
||||
fields: {
|
||||
title: { value: 'their version', updatedAt: '2099-01-01T00:00:00Z' },
|
||||
title: { value: 'their version', at: '2099-01-01T00:00:00Z' },
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
|
@ -413,7 +419,7 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
id: 'task-conflict-2',
|
||||
op: 'update',
|
||||
fields: {
|
||||
title: { value: 'first server title', updatedAt: '2099-01-01T00:00:00Z' },
|
||||
title: { value: 'first server title', at: '2099-01-01T00:00:00Z' },
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
|
@ -438,7 +444,7 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
id: 'task-conflict-3',
|
||||
op: 'update',
|
||||
fields: {
|
||||
title: { value: 'same value', updatedAt: '2099-01-01T00:00:00Z' },
|
||||
title: { value: 'same value', at: '2099-01-01T00:00:00Z' },
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
|
@ -463,8 +469,8 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
id: 'task-conflict-4',
|
||||
op: 'update',
|
||||
fields: {
|
||||
title: { value: 'server title', updatedAt: '2099-01-01T00:00:00Z' },
|
||||
priority: { value: 'high', updatedAt: '2099-01-01T00:00:00Z' },
|
||||
title: { value: 'server title', at: '2099-01-01T00:00:00Z' },
|
||||
priority: { value: 'high', at: '2099-01-01T00:00:00Z' },
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
|
@ -500,7 +506,7 @@ describe('applyServerChanges (Dexie integration)', () => {
|
|||
id: 'task-conflict-5',
|
||||
op: 'update',
|
||||
fields: {
|
||||
title: { value: 'changed', updatedAt: '2026-04-01T00:00:00Z' }, // exact tie
|
||||
title: { value: 'changed', at: '2026-04-01T00:00:00Z' }, // exact tie
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
|
|
|||
|
|
@ -20,20 +20,28 @@ import {
|
|||
toSyncName,
|
||||
fromSyncName,
|
||||
beginApplyingTables,
|
||||
FIELD_TIMESTAMPS_KEY,
|
||||
FIELD_ACTORS_KEY,
|
||||
LAST_ACTOR_KEY,
|
||||
FIELD_META_KEY,
|
||||
setPendingChangeListener,
|
||||
} from './database';
|
||||
import { isQuotaError, cleanupTombstones, notifyQuotaExceeded } from './quota';
|
||||
import { emitSyncTelemetry, categorizeSyncError } from './sync-telemetry';
|
||||
import { USER_ACTOR, type Actor } from './events/actor';
|
||||
import {
|
||||
USER_ACTOR,
|
||||
makeFieldMeta,
|
||||
type Actor,
|
||||
type FieldMeta,
|
||||
type FieldOrigin,
|
||||
} from './events/actor';
|
||||
|
||||
/** Reads the per-field actor map off a record; empty for legacy rows. */
|
||||
function readFieldActors(record: unknown): Record<string, Actor> {
|
||||
/**
|
||||
* Reads the per-field write metadata off a record. Returns an empty
|
||||
* map for records that pre-date the field-meta migration so callers
|
||||
* can fall back to record-level `updatedAt`.
|
||||
*/
|
||||
export function readFieldMeta(record: unknown): Record<string, FieldMeta> {
|
||||
if (!record || typeof record !== 'object') return {};
|
||||
const fa = (record as Record<string, unknown>)[FIELD_ACTORS_KEY];
|
||||
return fa && typeof fa === 'object' ? (fa as Record<string, Actor>) : {};
|
||||
const fm = (record as Record<string, unknown>)[FIELD_META_KEY];
|
||||
return fm && typeof fm === 'object' ? (fm as Record<string, FieldMeta>) : {};
|
||||
}
|
||||
|
||||
// ─── Types ────────────────────────────────────────────────────
|
||||
|
|
@ -41,10 +49,12 @@ function readFieldActors(record: unknown): Record<string, Actor> {
|
|||
/** Operations the sync protocol supports. */
|
||||
export type SyncOp = 'insert' | 'update' | 'delete';
|
||||
|
||||
/** A single field-level change carrying its own LWW timestamp. */
|
||||
/** A single field-level change carrying its own LWW timestamp.
|
||||
* Per-field actor + origin live at the row level on `SyncChange`
|
||||
* (each push = one actor + one origin), never per field. */
|
||||
export interface FieldChange {
|
||||
value: unknown;
|
||||
updatedAt: string;
|
||||
at: string;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -87,6 +97,15 @@ export interface SyncChange {
|
|||
* for back-compat with pre-actor clients.
|
||||
*/
|
||||
actor?: Actor;
|
||||
/**
|
||||
* Pipeline origin of the write — what kind of code path produced the
|
||||
* value. Drives conflict-detection: only `'user'`-origin writes can
|
||||
* lose to a later server overwrite and surface as a conflict toast.
|
||||
* Server-applied pulls always take `'server-replay'` regardless of
|
||||
* what the originating client stamped, because the local client
|
||||
* never typed the value itself.
|
||||
*/
|
||||
origin?: FieldOrigin;
|
||||
}
|
||||
|
||||
interface PendingChange {
|
||||
|
|
@ -99,6 +118,7 @@ interface PendingChange {
|
|||
data?: Record<string, unknown>;
|
||||
deletedAt?: string;
|
||||
actor?: Actor;
|
||||
origin?: FieldOrigin;
|
||||
createdAt: string;
|
||||
/**
|
||||
* The Space (Better Auth organization id) the record belongs to. Stamped
|
||||
|
|
@ -126,7 +146,7 @@ interface SyncMeta {
|
|||
function isFieldChange(v: unknown): v is FieldChange {
|
||||
if (!v || typeof v !== 'object') return false;
|
||||
const f = v as Record<string, unknown>;
|
||||
return 'value' in f && (f.updatedAt === undefined || typeof f.updatedAt === 'string');
|
||||
return 'value' in f && (f.at === undefined || typeof f.at === 'string');
|
||||
}
|
||||
|
||||
function isFieldsMap(v: unknown): v is Record<string, FieldChange> {
|
||||
|
|
@ -162,22 +182,15 @@ export function isValidSyncChange(v: unknown): v is SyncChange {
|
|||
// malformed actor doesn't corrupt data; worst case the Workbench shows
|
||||
// "unknown" for that change.
|
||||
if (c.actor !== undefined && (typeof c.actor !== 'object' || c.actor === null)) return false;
|
||||
// `origin` is a fixed enum on the producing side; we accept any string
|
||||
// here so a future enum extension on the server doesn't fail validation
|
||||
// on older clients. The apply-path forces `'server-replay'` regardless.
|
||||
if (c.origin !== undefined && typeof c.origin !== 'string') return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// ─── Apply Server Changes (top-level so unit tests can import directly) ──
|
||||
|
||||
/**
|
||||
* Reads the per-field LWW timestamps off a record. Returns an empty map for
|
||||
* legacy records that pre-date __fieldTimestamps so callers can fall back to
|
||||
* record-level `updatedAt`.
|
||||
*/
|
||||
export function readFieldTimestamps(record: unknown): Record<string, string> {
|
||||
if (!record || typeof record !== 'object') return {};
|
||||
const ft = (record as Record<string, unknown>)[FIELD_TIMESTAMPS_KEY];
|
||||
return ft && typeof ft === 'object' ? (ft as Record<string, string>) : {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies a batch of server changes to the local Dexie database with
|
||||
* field-level Last-Write-Wins conflict resolution.
|
||||
|
|
@ -320,24 +333,33 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
|
|||
for (const change of tableChanges) {
|
||||
const recordId = change.id;
|
||||
|
||||
// All writes from this path are server-replays from the
|
||||
// local client's perspective: even an originally `'user'`-
|
||||
// origin push from another device arrives here as a pull,
|
||||
// and the local user never typed it. Stamping `'server-replay'`
|
||||
// keeps conflict-detection (F2) from treating later
|
||||
// overwrites as "lost edits".
|
||||
const replayOrigin: FieldOrigin = 'server-replay';
|
||||
|
||||
if (change.deletedAt || change.op === 'delete') {
|
||||
const existing = await table.get(recordId);
|
||||
if (!existing) continue;
|
||||
if (change.deletedAt) {
|
||||
const localFT = readFieldTimestamps(existing);
|
||||
const localMeta = readFieldMeta(existing);
|
||||
const serverTime = change.deletedAt;
|
||||
const localDeletedAtTime =
|
||||
localFT.deletedAt ??
|
||||
localMeta.deletedAt?.at ??
|
||||
((existing as Record<string, unknown>).deletedAt as string | undefined) ??
|
||||
'';
|
||||
if (serverTime >= localDeletedAtTime) {
|
||||
const tombActor: Actor = change.actor ?? USER_ACTOR;
|
||||
await table.update(recordId, {
|
||||
deletedAt: serverTime,
|
||||
updatedAt: serverTime,
|
||||
[FIELD_TIMESTAMPS_KEY]: {
|
||||
...localFT,
|
||||
deletedAt: serverTime,
|
||||
updatedAt: serverTime,
|
||||
[FIELD_META_KEY]: {
|
||||
...localMeta,
|
||||
deletedAt: makeFieldMeta(serverTime, tombActor, replayOrigin),
|
||||
updatedAt: makeFieldMeta(serverTime, tombActor, replayOrigin),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
@ -359,51 +381,32 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
|
|||
const changeActor: Actor = change.actor ?? USER_ACTOR;
|
||||
|
||||
if (!existing) {
|
||||
const ft: Record<string, string> = {};
|
||||
const fa: Record<string, Actor> = {};
|
||||
const fieldMeta: Record<string, FieldMeta> = {};
|
||||
for (const key of Object.keys(changeData)) {
|
||||
if (
|
||||
key === 'id' ||
|
||||
key === FIELD_TIMESTAMPS_KEY ||
|
||||
key === FIELD_ACTORS_KEY ||
|
||||
key === LAST_ACTOR_KEY
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
ft[key] = recordTime;
|
||||
fa[key] = changeActor;
|
||||
if (key === 'id' || key === FIELD_META_KEY) continue;
|
||||
fieldMeta[key] = makeFieldMeta(recordTime, changeActor, replayOrigin);
|
||||
}
|
||||
await table.put({
|
||||
...changeData,
|
||||
id: recordId,
|
||||
[FIELD_TIMESTAMPS_KEY]: ft,
|
||||
[FIELD_ACTORS_KEY]: fa,
|
||||
[LAST_ACTOR_KEY]: changeActor,
|
||||
[FIELD_META_KEY]: fieldMeta,
|
||||
});
|
||||
} else {
|
||||
const localFT = readFieldTimestamps(existing);
|
||||
const localFA = readFieldActors(existing);
|
||||
const localMeta = readFieldMeta(existing);
|
||||
const localUpdatedAt =
|
||||
((existing as Record<string, unknown>).updatedAt as string | undefined) ?? '';
|
||||
const updates: Record<string, unknown> = {};
|
||||
const newFT: Record<string, string> = { ...localFT };
|
||||
const newFA: Record<string, Actor> = { ...localFA };
|
||||
const newMeta: Record<string, FieldMeta> = { ...localMeta };
|
||||
|
||||
for (const [key, val] of Object.entries(changeData)) {
|
||||
if (
|
||||
key === 'id' ||
|
||||
key === FIELD_TIMESTAMPS_KEY ||
|
||||
key === FIELD_ACTORS_KEY ||
|
||||
key === LAST_ACTOR_KEY
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
const localFieldTime = localFT[key] ?? localUpdatedAt;
|
||||
if (key === 'id' || key === FIELD_META_KEY) continue;
|
||||
const localFieldTime = localMeta[key]?.at ?? localUpdatedAt;
|
||||
if (recordTime >= localFieldTime) {
|
||||
// Conflict signal: server STRICTLY wins (>) and the local
|
||||
// field had a non-empty value that differs from the new
|
||||
// one. Equal-time ties don't fire because there's no
|
||||
// edit to lose.
|
||||
// edit to lose. F2 will additionally gate this on
|
||||
// localMeta[key].origin === 'user'.
|
||||
const localValue = (existing as Record<string, unknown>)[key];
|
||||
if (
|
||||
recordTime > localFieldTime &&
|
||||
|
|
@ -421,14 +424,11 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
|
|||
});
|
||||
}
|
||||
updates[key] = val;
|
||||
newFT[key] = recordTime;
|
||||
newFA[key] = changeActor;
|
||||
newMeta[key] = makeFieldMeta(recordTime, changeActor, replayOrigin);
|
||||
}
|
||||
}
|
||||
if (Object.keys(updates).length > 0) {
|
||||
updates[FIELD_TIMESTAMPS_KEY] = newFT;
|
||||
updates[FIELD_ACTORS_KEY] = newFA;
|
||||
updates[LAST_ACTOR_KEY] = changeActor;
|
||||
updates[FIELD_META_KEY] = newMeta;
|
||||
await table.update(recordId, updates);
|
||||
}
|
||||
}
|
||||
|
|
@ -443,35 +443,29 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
|
|||
// record was deleted locally — recreate it under the server's
|
||||
// authority.
|
||||
const record: Record<string, unknown> = { id: recordId };
|
||||
const ft: Record<string, string> = {};
|
||||
const fa: Record<string, Actor> = {};
|
||||
const fieldMeta: Record<string, FieldMeta> = {};
|
||||
const fallback = new Date().toISOString();
|
||||
for (const [key, fc] of Object.entries(serverFields)) {
|
||||
record[key] = fc.value;
|
||||
ft[key] = fc.updatedAt ?? fallback;
|
||||
fa[key] = changeActor;
|
||||
fieldMeta[key] = makeFieldMeta(fc.at ?? fallback, changeActor, replayOrigin);
|
||||
}
|
||||
record[FIELD_TIMESTAMPS_KEY] = ft;
|
||||
record[FIELD_ACTORS_KEY] = fa;
|
||||
record[LAST_ACTOR_KEY] = changeActor;
|
||||
record[FIELD_META_KEY] = fieldMeta;
|
||||
await table.put(record);
|
||||
} else {
|
||||
// Per-field comparison. Falls back to record-level updatedAt
|
||||
// only for legacy records that pre-date __fieldTimestamps.
|
||||
const localFT = readFieldTimestamps(existing);
|
||||
const localFA = readFieldActors(existing);
|
||||
// Per-field comparison.
|
||||
const localMeta = readFieldMeta(existing);
|
||||
const localUpdatedAt =
|
||||
((existing as Record<string, unknown>).updatedAt as string | undefined) ?? '';
|
||||
const updates: Record<string, unknown> = {};
|
||||
const newFT: Record<string, string> = { ...localFT };
|
||||
const newFA: Record<string, Actor> = { ...localFA };
|
||||
const newMeta: Record<string, FieldMeta> = { ...localMeta };
|
||||
|
||||
for (const [key, fc] of Object.entries(serverFields)) {
|
||||
const serverTime = fc.updatedAt ?? '';
|
||||
const localFieldTime = localFT[key] ?? localUpdatedAt;
|
||||
const serverTime = fc.at ?? '';
|
||||
const localFieldTime = localMeta[key]?.at ?? localUpdatedAt;
|
||||
if (serverTime >= localFieldTime) {
|
||||
// Same conflict criteria as the insert-as-update path:
|
||||
// strictly newer + non-empty local + actually different.
|
||||
// F2 will additionally gate on localMeta[key].origin === 'user'.
|
||||
const localValue = (existing as Record<string, unknown>)[key];
|
||||
if (
|
||||
serverTime > localFieldTime &&
|
||||
|
|
@ -489,14 +483,11 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
|
|||
});
|
||||
}
|
||||
updates[key] = fc.value;
|
||||
newFT[key] = serverTime;
|
||||
newFA[key] = changeActor;
|
||||
newMeta[key] = makeFieldMeta(serverTime, changeActor, replayOrigin);
|
||||
}
|
||||
}
|
||||
if (Object.keys(updates).length > 0) {
|
||||
updates[FIELD_TIMESTAMPS_KEY] = newFT;
|
||||
updates[FIELD_ACTORS_KEY] = newFA;
|
||||
updates[LAST_ACTOR_KEY] = changeActor;
|
||||
updates[FIELD_META_KEY] = newMeta;
|
||||
await table.update(recordId, updates);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -198,8 +198,8 @@ export const tagPresetsStore = {
|
|||
}
|
||||
|
||||
// Encrypt + write each row. The Dexie creating-hook stamps
|
||||
// __lastActor / __fieldActors automatically; spaceId is
|
||||
// pre-populated here so the hook leaves it alone.
|
||||
// __fieldMeta automatically; spaceId is pre-populated here so
|
||||
// the hook leaves it alone.
|
||||
await db.transaction('rw', db.table('globalTags'), db.table('tagGroups'), async () => {
|
||||
for (const group of groupsToWrite) {
|
||||
await encryptRecord('tagGroups', group);
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ export const memosStore = {
|
|||
// createdAt + updatedAt are required by LocalMemo's type but the
|
||||
// previous create() never set them — DetailView showed
|
||||
// "Erstellt: Invalid Date" for every memo. The Dexie creating
|
||||
// hook only auto-stamps userId + __fieldTimestamps; module
|
||||
// hook only auto-stamps userId + __fieldMeta; module
|
||||
// stores have to set their own createdAt/updatedAt explicitly
|
||||
// (consistent with the rest of the Mana modules).
|
||||
createdAt: now,
|
||||
|
|
|
|||
|
|
@ -94,8 +94,8 @@ describe('notes encryption pilot', () => {
|
|||
expect(stored.isPinned).toBe(false);
|
||||
expect(stored.isArchived).toBe(false);
|
||||
expect(stored.userId).toBe('test-user');
|
||||
// Auto-stamped __fieldTimestamps stays plaintext too — LWW relies on it.
|
||||
expect((stored as unknown as Record<string, unknown>).__fieldTimestamps).toBeDefined();
|
||||
// Auto-stamped __fieldMeta stays plaintext too — LWW relies on it.
|
||||
expect((stored as unknown as Record<string, unknown>).__fieldMeta).toBeDefined();
|
||||
});
|
||||
|
||||
it('updates encrypt the modified content fields, leave flags untouched', async () => {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,20 @@
|
|||
# LLM-Fallback via Model-Aliases — Plan
|
||||
|
||||
_Drafted 2026-04-26. Status: **spec**, nicht implementiert. Bau-Trigger: User-Entscheidung nach dem Schreiben-Modul-Generation-Ausfall heute (GPU-Server offline → 75 s Hang → mana-llm 500 → Frontend "Fehlgeschlagen")._
|
||||
_Drafted 2026-04-26. **Status: SHIPPED** (M1–M5 alle gemerged am 2026-04-26)._
|
||||
Auslöser: Schreiben-Modul-Generation-Ausfall (GPU-Server offline → 75 s Hang → mana-llm 500). Lösung: in einem Tag in fünf Schritten gebaut + getestet (115 Tests grün).
|
||||
|
||||
| M | Commit | Geliefert |
|
||||
|---|---|---|
|
||||
| M1 | `dff8629e1` | `AliasRegistry` + `aliases.yaml` SSOT, 32 Tests |
|
||||
| M2 | `59557e62d` | `ProviderHealthCache` + `HealthProbe`, 32 Tests |
|
||||
| M3 | `3046da3b1` | `_execute_with_fallback`, Streaming pre-first-byte, 22 Tests, alle Legacy-Fallback-Pfade purged |
|
||||
| M4 | `8a49e3ffd` | `X-Mana-LLM-Resolved`-Header, 3 Prometheus-Metriken, `/v1/aliases` + `/v1/health` Endpoints, SIGHUP-Reload, 16 Tests |
|
||||
| M5 | `fea3adf5f` | 14 Consumer-Sites migriert, SSOT in `@mana/shared-ai`, `validate-llm-strings.mjs` Drift-Gate über 2538 Files |
|
||||
|
||||
**Status der "Open Questions" am Ende:** alle drei dokumentiert geblieben (kein Showstopper für Phase 1):
|
||||
- 429-Rate-Limit kürzeres Backoff: aktuell wie ConnectError behandelt; Refinement bei Bedarf.
|
||||
- Alias-Versionierung: nicht nötig solange Reload atomar bleibt.
|
||||
- mana-credits Modell→Preis-Tabelle: bei nächster Credits-Code-Änderung prüfen.
|
||||
|
||||
Macht die LLM-Pipeline resilient gegen Provider-Ausfälle (heute: GPU-Server `mana-gpu` offline, Ollama unerreichbar; morgen: Groq-API-Limit, Anthropic-Outage, …). Statt jedem Consumer eine Retry-Logik mit konkreten Modell-Strings beizubringen, gibt mana-llm zukünftig **Model-Aliases** aus, die Health-Cache-bewusst auf eine Provider-Chain auflösen. Consumer-Code kennt nur noch `mana/long-form`, nicht mehr `ollama/gemma3:12b`.
|
||||
|
||||
|
|
|
|||
173
docs/plans/sync-field-meta-overhaul.md
Normal file
173
docs/plans/sync-field-meta-overhaul.md
Normal file
|
|
@ -0,0 +1,173 @@
|
|||
# Sync Field-Meta Overhaul
|
||||
|
||||
_Started 2026-04-26._
|
||||
_Pre-live assumption: no production data, no live clients. Hard-cut everywhere — kein Dual-Write, keine Schema-Migration, kein Translation-Layer. `mana_sync.sync_changes` wird im Zuge des Rollouts truncated, alle Browser-IndexedDBs laufen einmalig durch ein neues Dexie-Versions-Upgrade, fertig._
|
||||
|
||||
## Problem
|
||||
|
||||
Die heutige Conflict-Detection feuert massiv False-Positives, beobachtet konkret als 4 Toasts beim Start einer frischen Dev-Session:
|
||||
|
||||
- 3× `meImages überschrieben — Feld updatedAt`
|
||||
- 1× `userContext überschrieben — 10 Felder`
|
||||
|
||||
Diagnose im Detail in der Conversation, die diesen Plan ausgelöst hat. Vier strukturelle Wurzeln:
|
||||
|
||||
1. **`updatedAt` ist ein syncbares User-Datenfeld.** Bei jedem Write wird ein neuer ISO-String geschrieben. Sobald zwei Sessions denselben Record berühren, divergieren ihre `updatedAt`-Strings zwingend → Field-LWW erkennt das als Konflikt. Strukturell garantierter Konflikt-Trigger.
|
||||
|
||||
2. **Conflict-Detection unterscheidet nicht zwischen User-Edit und Replay-Delta.** `applyServerChanges` (`apps/mana/apps/web/src/lib/data/sync.ts:469-490`) vergleicht den Server-Wert gegen den lokalen Wert. Der lokale Wert kann aus drei Quellen stammen: echter User-Edit, vorherige Iteration desselben Pulls, vorheriger Pull. Conflict-Toast soll nur bei (1) feuern, feuert aber bei allen drei.
|
||||
|
||||
3. **Singletons werden clientseitig race-anfällig erstellt.** `userContextStore.ensureDoc()` (`profile/stores/user-context.svelte.ts:21-27`) und analoge Patterns für `kontextDoc`. Mehrfach-Inserts derselben ID landen im Insert-as-Update-Merge-Pfad in `sync.ts:380-432`, der für jedes Feld einen Konflikt-Vergleich macht.
|
||||
|
||||
4. **`client_id` ist an localStorage gekoppelt.** `getOrCreateClientId()` in `sync.ts:1226`. Browser-State-Wipe → neue ID. Konkret: 5 distinkte `client_id`s für eine User-Identity in der lokalen `mana_sync` (Query-Beweis: 18.04, 23.04, 24.04, 25.04, 26.04 — jeder Tag ein neuer Client). Aus Sync-Sicht wird derselbe physische Browser zu fünf verschiedenen Clients, deren Schreibhistorien sich gegenseitig als "fremde Sitzung" sehen.
|
||||
|
||||
## Decision
|
||||
|
||||
Eine einzige Wahrheitsquelle für Per-Field-Metadata: `__fieldMeta`. Origin-Tracking als Pflichtbestandteil. `updatedAt` wird zum reinen Read-Side-Computed. Singletons werden serverseitig vom `mana-auth`-Bootstrap angelegt. `client_id` lebt in der Dexie-DB.
|
||||
|
||||
### Datenmodell
|
||||
|
||||
Vorher (heute):
|
||||
|
||||
```ts
|
||||
{
|
||||
id, ...userFields,
|
||||
updatedAt: '2026-04-25T11:23:20.212Z', // syncbar, conflict-getrackt
|
||||
__fieldTimestamps: { [field]: ISO },
|
||||
__fieldActors: { [field]: Actor },
|
||||
__lastActor: Actor,
|
||||
}
|
||||
```
|
||||
|
||||
Nachher:
|
||||
|
||||
```ts
|
||||
{
|
||||
id, ...userFields,
|
||||
__fieldMeta: {
|
||||
[field]: {
|
||||
at: ISO,
|
||||
actor: Actor,
|
||||
origin: 'user' | 'agent' | 'system' | 'migration' | 'server-replay',
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`updatedAt` ist nicht mehr im Datensatz, nicht mehr im Sync, nicht mehr in den Drizzle-Schemas. UI-Layer bekommt `updatedAt` als Computed: `max(__fieldMeta[*].at)`, exposed im Type-Converter.
|
||||
|
||||
`__lastActor` entfällt — gleichbedeutend zu `__fieldMeta[max(at)].actor`, on-demand berechnet.
|
||||
|
||||
### Conflict-Trigger
|
||||
|
||||
Neu in `applyServerChanges`:
|
||||
|
||||
```
|
||||
notifyConflict() feuert nur, wenn ALLE gelten:
|
||||
serverTime > localFieldMeta.at
|
||||
localFieldMeta.origin === 'user'
|
||||
localValue != null
|
||||
!valuesEqual(localValue, serverValue)
|
||||
```
|
||||
|
||||
Sobald der lokale Schreibvorgang aus einem Server-Replay kam, ist `origin === 'server-replay'` → kein Conflict-Toast. Sobald das Feld aus einer Migration oder einem System-Bootstrap stammt, ebenfalls kein Toast.
|
||||
|
||||
### Sortier-Indizes
|
||||
|
||||
`db.table('tasks').orderBy('updatedAt')` ist heute über die ganze Codebase verstreut. Da `updatedAt` nicht mehr persistiert wird, wäre Dexie-Sortierung kaputt. Ersatz: ein lokaler, **nicht-syncbarer** Schatten-Index `_updatedAtIndex: ISO`, der vom Dexie-`updating`/`creating`-Hook auf jeden Modify-Vorgang automatisch auf `now` gesetzt wird. Reine Lokal-Spalte ohne Sync-Bedeutung; landet nicht in `pendingChange`. UI-Code, der `orderBy('updatedAt')` macht, switcht auf `orderBy('_updatedAtIndex')` (Sed-Codemod).
|
||||
|
||||
### Singleton-Bootstrap
|
||||
|
||||
`userContext`, `kontextDoc` und alle anderen Singletons werden vom `mana-auth`-Service beim First-Login eines Users in `mana_sync.sync_changes` als `op='insert'` mit `client_id='system:bootstrap'` geschrieben. Inhalt: das Schema-Default (`emptyUserContext()` etc., extrahiert in ein shared Package, das Server + Client teilen).
|
||||
|
||||
Clients pullen den Singleton beim First-Sync. Wenn lokal noch nicht angekommen, blockiert die Profile-View mit einem Loader-State. Kein lokaler `add()` mehr.
|
||||
|
||||
### Stable client_id
|
||||
|
||||
Neue Dexie-Tabelle `_clientIdentity` mit genau einem Row `{ id: 'self', clientId: UUID, createdAt: ISO }`. Identity überlebt localStorage-Wipes; nur ein vollständiger IndexedDB-Reset vergibt eine neue ID. localStorage bleibt als Sync-Read-Cache (vermeidet async-Block in Sync-Pfad), wird bei Miss aus Dexie rehydriert.
|
||||
|
||||
### First-Pull als privilegierter Modus (Belt-and-Suspenders)
|
||||
|
||||
Der allererste Pull eines Clients (Cursor `''`) hat per Definition keine User-Edits, die überschrieben werden könnten. `applyServerChanges` bekommt einen `isInitialHydration: boolean`-Parameter, der jegliche `notifyConflict()`-Aufrufe gatet. Doppelte Sicherheit gegenüber dem Origin-Check.
|
||||
|
||||
## Phasen
|
||||
|
||||
Je Phase ein PR. Pre-live, also pro PR: Code, Tests, manueller Smoke-Test, ohne Soft/Hard-Stages, ohne Backwards-Compat.
|
||||
|
||||
| Phase | Scope | Done-Kriterien |
|
||||
| --- | --- | --- |
|
||||
| **F1** — `__fieldMeta` Hard-Cut | Neue Dexie-Version mit `__fieldMeta`. `__fieldTimestamps`, `__fieldActors`, `__lastActor` aus `database.ts` und allen Lesepfaden gelöscht. Hooks in `database.ts:1497-1610` umgeschrieben. `apps/api`/`mana-sync` Drizzle-Schemas: `field_timestamps` JSONB-Spalte umbenannt zu `field_meta`, neuer Eintragstyp `{at, actor, origin}`. Truncate `mana_sync.sync_changes`. | Alle Reads und Writes nutzen `__fieldMeta`. `validate:all` grün. `_pendingChanges` schreibt `field_meta` statt `field_timestamps`. |
|
||||
| **F2** — Origin-Tracking + Conflict-Gate | Origin-Werte in alle Write-Pfade einsetzen: regulärer User-Write `'user'`, Seeds `'system'`, Repair-Migrationen `'migration'`, AI-Mission-Runner `'agent'`, `applyServerChanges` schreibt `'server-replay'`. Conflict-Trigger in `sync.ts:476-489` umstellen. `isInitialHydration` durch alle Aufrufketten reichen. | `sync.test.ts` umfasst alle Origin-Kombinationen. False-Positive-Replay-Tests grün (10 Server-Changes hintereinander auf demselben Record → 0 Conflicts). User-Edit-vs-Server-Push-Test grün (1 Conflict). |
|
||||
| **F3** — `updatedAt` Hard-Drop | Aus jedem `Local*`-Type, jedem Type-Converter, jedem Module-Store-Patch, jedem Drizzle-Schema, jedem `crypto/registry.ts`-Eintrag. `_updatedAtIndex` als lokale Schatten-Spalte einführen. Dexie-Hook stempelt `_updatedAtIndex = now` auf jedem Insert/Update. Sed-Codemod: `orderBy('updatedAt')` → `orderBy('_updatedAtIndex')`. Type-Converter: `updatedAt: max(__fieldMeta[*].at)`. | Kein Treffer mehr für `updatedAt:.*new Date` in `apps/mana/apps/web/src/lib/modules/`. Keine `updated_at`-Spalte mehr in den Drizzle-Schemas außer mana-auth (User-Tabelle, dort legitim). Sortier-Verhalten in den UI-Listen unverändert (Smoke-Test über `/todo`, `/notes`, `/cards`, `/profile/me-images`). |
|
||||
| **F4** — Server-Side Singleton Bootstrap | Neuer Endpoint in `mana-auth` (oder eigenes `mana-bootstrap`-Modul) der beim First-Login eines Users die Singleton-Inserts für `userContext` + `kontextDoc` + sonstige `id='singleton'`/`id='self'`-Records in `mana_sync.sync_changes` mit `client_id='system:bootstrap'` und `origin='system'` schreibt. Default-Inhalt aus einem geteilten Package `@mana/data-defaults` (extrahiert aus `profile/types.ts:emptyUserContext` + analogen). | Postcondition `mana-auth.users-create`: für jeden `userContext`-/`kontextDoc`-Singleton existiert genau eine Insert-Row in `sync_changes`. Test: Frische User-Identity → erster Pull bringt vollständigen Singleton ohne lokalen `ensureDoc()`-Aufruf. |
|
||||
| **F5** — `ensureDoc()` Hard-Drop | Methoden aus `userContextStore`, `kontextStore` etc. löschen. UI-Views (`ContextOverview.svelte`, `ContextInterview.svelte`, `ContextFreeform.svelte`, `KontextView.svelte`, `MissionGrantDialog.svelte`, `AiMissionsListView.svelte`) lesen über `useLiveQuery` ohne `ensureDoc()`-Vorlauf. Wenn Singleton lokal noch nicht da: Loader-State. | Keine `ensureDoc`-Aufrufe mehr im Code. Frische Browser-DB → Profile-View zeigt Loader → erster Pull kommt → View rendert. |
|
||||
| **F6** — Stable client_id | Dexie-Version mit `_clientIdentity`-Tabelle. `getOrCreateClientId()` umstellen auf Dexie-Read mit localStorage-Cache. Boot-Pfad: erst Dexie öffnen, dann `clientId` lesen, dann sync starten. | localStorage komplett clearen → `client_id` bleibt stabil. IndexedDB löschen → neuer `client_id`. |
|
||||
| **F7** — Repair-Migrationen löschen | `apps/mana/apps/web/src/lib/modules/profile/migration/repair-silent-twin.ts` und `legacy-avatar.ts` löschen, alle Aufrufer in `MeImagesView.svelte` und `wardrobe/ListView.svelte` entfernen. Begründung: F1-F3 machen die zugrundeliegenden Probleme (silent-twin, legacy-avatar-Spillover) strukturell unmöglich, weil `updatedAt` nicht mehr explizit gepatched wird und Origin-Tracking Replay-Konflikte unterdrückt. | Grep nach `repairSilentTwin`/`migrateLegacyAvatar` leer. Frische User-Identity → kein meImages-Toast mehr. |
|
||||
|
||||
## Test-Plan
|
||||
|
||||
Drei Test-Stufen pro Phase:
|
||||
|
||||
1. **Unit:** `apps/mana/apps/web/src/lib/data/sync.test.ts` und `database.test.ts` decken Origin-Kombinationen, First-Pull-Hydration, `__fieldMeta`-Hook-Stempel ab.
|
||||
|
||||
2. **Integration:** Ein neuer Test `sync-replay-no-false-positives.test.ts` mit fake-indexeddb: schreibt 10 sequenzielle `update`-Server-Changes für denselben Record (verschiedene `client_id`s, monoton steigende Timestamps), erwartet 0 Conflict-Notifications.
|
||||
|
||||
3. **End-to-End-Smoke (manuell):** Browser-DB löschen → `pnpm run mana:dev` → Login → Profile-View → Wardrobe → Workbench. Erwartet: keine Conflict-Toasts, alle Singletons da, alle Sortierungen korrekt.
|
||||
|
||||
## Stolperfallen
|
||||
|
||||
- **`__fieldMeta` und Encryption.** Aktuell ist `__fieldTimestamps` plaintext (Dexie-Hook stempelt nach `encryptRecord`). `__fieldMeta.actor` enthält `displayName` — potenziell sensibel. Entscheidung: Origin und `at` bleiben plaintext (für LWW nötig); `actor.displayName` wird im selben Encryption-Pass mit-encrypted. Wenn das zu kompliziert wird, fallback auf `actor.principalId` only und `displayName` über separate Read-Side-Lookup.
|
||||
|
||||
- **`mana-sync` Hub-Notify und SSE-Push.** Beide spiegeln das Change-Format 1:1. F1 muss den Go-Code in `services/mana-sync/internal/sync/types.go` und `handler.go` mit umstellen. Tests in `handler_test.go` und `spaces_test.go` anpassen.
|
||||
|
||||
- **`@mana/shared-ai` und `mana-ai` Server.** Beide schreiben in `_pendingChanges` bzw. direkt in `sync_changes` mit `actor: { kind: 'agent' }`. F2 muss sicherstellen, dass diese Pfade `origin: 'agent'` setzen. Sonst landen Agent-Writes als `'user'` und triggern wieder Conflicts.
|
||||
|
||||
- **Type-Converter-Sweep.** Etwa 40+ Module haben `to<Module>()`-Funktionen, die `updatedAt` aus dem Record lesen. Codemod-fähig (immer dasselbe Pattern), aber muss gewissenhaft validiert werden, weil einige Module den Wert nicht nur fürs Sortieren, sondern auch für UI-Darstellung nutzen ("zuletzt geändert vor 5 Min").
|
||||
|
||||
- **`_pendingChanges`-Format.** Heute trägt jede Pending-Row `fields: {[key]: {value, updatedAt}}`. Im neuen Format wird das zu `fields: {[key]: {value, at, actor, origin}}`. Server muss matchen.
|
||||
|
||||
- **AI Workbench Activity-Log.** `_activity`-Tabelle und Workbench-Timeline rendern Actor-Strings. Sicherstellen, dass die Read-Pfade weiterhin funktionieren wenn `__lastActor` weg ist (statt: aus `__fieldMeta` ableiten).
|
||||
|
||||
## Open Questions
|
||||
|
||||
- **Brauchen wir `origin: 'migration'` als eigene Kategorie**, oder reicht `'system'` für Bootstrap-Repair-Calls? Argument für separate Kategorie: Audit-Trail im Workbench. Argument gegen: nochmal eine Origin-Variante mehr für wenig Mehrwert. Default: `'system'` reicht, kann später erweitert werden.
|
||||
|
||||
- **Soll `_updatedAtIndex` indexiert werden?** Ja — sonst sind die `orderBy()`-Pfade O(n). Dexie-Schema `, _updatedAtIndex` hinzufügen für jede Tabelle, die heute auf `updatedAt` sortiert. Inventar in F3.
|
||||
|
||||
- **Was mit `createdAt`?** Symmetrisches Problem? Nein — `createdAt` ist immutable nach dem Insert, kann also nicht durch Folge-Pulls "überschrieben" werden, also nie Conflict-Trigger. Bleibt als reguläres Feld.
|
||||
|
||||
- **`mana-auth` als Owner des Bootstrap-Schritts** vs. einem separaten `mana-bootstrap`-Service? Default: einbauen in mana-auth, weil dort der "User wird erstmalig gesehen"-Hook sowieso lebt. Kann später extrahiert werden.
|
||||
|
||||
- **Wann genau truncaten wir `mana_sync.sync_changes`?** Vor F1 oder am Ende. Default: am Ende von F1 (sobald die neue Schemaversion existiert), dann ist die Tabelle leer und alle Folge-Phasen schreiben direkt im Zielformat.
|
||||
|
||||
## Shipping Log
|
||||
|
||||
_Wird befüllt während der Ausführung._
|
||||
|
||||
| Phase | Commit | Notiz |
|
||||
| --- | --- | --- |
|
||||
| F1 | _staged, uncommitted_ | Web + mana-sync (Go) + mana-ai + apps/api/mcp + tests + DB schema reset. Type-checks grün, mana-sync Go-Tests grün, mana-ai Bun-Tests grün (61 pass). DB truncated + recreated mit `field_meta` JSONB + `origin` TEXT. Browser-IndexedDB-Wipe + Smoke-Test stehen aus (User-Action). |
|
||||
| F2 | _pending_ | |
|
||||
| F3 | _pending_ | |
|
||||
| F4 | _pending_ | |
|
||||
| F5 | _pending_ | |
|
||||
| F6 | _pending_ | |
|
||||
| F7 | _pending_ | |
|
||||
|
||||
## F1 — Implementation Notes
|
||||
|
||||
Wire-Format-Entscheidung: `FieldChange` wurde von `{ value, updatedAt }` auf `{ value, at }` umbenannt. Per-Field-Actor + Origin werden NICHT pro-Field transmitted — sie leben am Row-Level auf `SyncChange.actor` + `SyncChange.origin`, weil jeder Push genau eine `(actor, origin)`-Kombination beschreibt. Per-Field-Differenzierung wäre redundant.
|
||||
|
||||
Client-vs-Server-Asymmetrie bewusst: lokale IndexedDB hält per-Field das volle `FieldMeta = { at, actor, origin }`-Triple, weil ein Record über mehrere Schreibzyklen mit unterschiedlichen Actors/Origins entstanden sein kann. Server-side ist `field_meta` JSONB nur eine `{[k]: at}`-Map — Actor + Origin liegen Row-Level. Die Asymmetrie ist saubere Trennung "transmit minimum" vs "track everything".
|
||||
|
||||
Origin-Werte in F1:
|
||||
- Web Dexie creating/updating Hook: hardcoded `'user'` (F2 differenziert per actor.kind)
|
||||
- `applyServerChanges`: hardcoded `'server-replay'` (Belt-and-suspenders gegen False-Positive-Conflicts beim History-Replay)
|
||||
- mana-ai `iteration-writer`: `'agent'` (server-side iteration writes ARE agent writes)
|
||||
- apps/api MCP `writeRecord`: `'agent'` (MCP-Tool-Calls sind by definition Agent-driven)
|
||||
- conflict-store `restore`: implicit `'user'` (vom Hook gesetzt)
|
||||
|
||||
`__lastActor` und `__fieldActors` wurden ersatzlos gelöscht — `__fieldMeta` enthält dieselbe Information per-Field. Konsumenten, die "wer hat zuletzt was angefasst" brauchen, leiten das ab über `argmax(__fieldMeta[k].at).actor`.
|
||||
|
||||
`updatedAt` als syncbares Datenfeld bleibt in F1 noch erhalten — F3 entfernt es.
|
||||
|
||||
Conflict-Detection-Trigger in `sync.ts:476-489` ist strukturell unverändert: noch derselbe `serverTime > localFieldTime && localValue != null && !valuesEqual(...)`-Test. F2 fügt das Origin-Gate hinzu (`localFieldMeta.origin === 'user'`).
|
||||
69
packages/shared-ai/src/field-meta.ts
Normal file
69
packages/shared-ai/src/field-meta.ts
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Per-field write metadata — the unified replacement for the older
|
||||
* triple of `__fieldTimestamps` + `__fieldActors` + `__lastActor`.
|
||||
*
|
||||
* Every synced record carries one `__fieldMeta` map, keyed by user-data
|
||||
* field name. Each entry records:
|
||||
*
|
||||
* - `at` — ISO timestamp of the write (drives field-LWW ordering)
|
||||
* - `actor` — who wrote it (drives Workbench attribution + revert)
|
||||
* - `origin` — from what kind of pipeline the value came (drives
|
||||
* conflict-detection: only `'user'`-origin writes can
|
||||
* lose to a server overwrite)
|
||||
*
|
||||
* The "who" and the "from where" are separate concerns deliberately:
|
||||
* an AI agent (`actor.kind === 'ai'`) writes with `origin === 'agent'`
|
||||
* during normal operation, but the SAME agent's writes arrive at OTHER
|
||||
* clients as `origin === 'server-replay'`. The actor identity travels
|
||||
* unchanged; the origin describes the pipeline this particular client
|
||||
* saw the value through.
|
||||
*
|
||||
* Lives in shared-ai because both runtimes (browser + mana-ai service)
|
||||
* read and write the same shape.
|
||||
*/
|
||||
|
||||
import type { Actor } from './actor';
|
||||
|
||||
/**
|
||||
* Pipeline that produced a given field value, from the perspective of
|
||||
* the local client that holds the record.
|
||||
*
|
||||
* - `'user'` — direct user edit through a module store
|
||||
* - `'agent'` — write performed by an AI agent (mission runner
|
||||
* or tool executor); the value originated locally
|
||||
* - `'system'` — system bootstrap (singleton creation, projection,
|
||||
* rule cascade)
|
||||
* - `'migration'` — one-shot data-migration write (Dexie upgrade,
|
||||
* repair routine)
|
||||
* - `'server-replay'` — value applied from a mana-sync pull; never
|
||||
* represents a local edit and therefore never
|
||||
* triggers conflict-detection
|
||||
*/
|
||||
export type FieldOrigin = 'user' | 'agent' | 'system' | 'migration' | 'server-replay';
|
||||
|
||||
/**
|
||||
* One entry in a record's `__fieldMeta` map. Frozen by the factory so
|
||||
* downstream consumers can pass the same object to multiple fields
|
||||
* without worrying about accidental mutation.
|
||||
*/
|
||||
export interface FieldMeta {
|
||||
readonly at: string;
|
||||
readonly actor: Actor;
|
||||
readonly origin: FieldOrigin;
|
||||
}
|
||||
|
||||
/** Build a frozen FieldMeta entry. Prefer this over inline object
|
||||
* literals so the read-side never sees a half-populated entry. */
|
||||
export function makeFieldMeta(at: string, actor: Actor, origin: FieldOrigin): FieldMeta {
|
||||
return Object.freeze({ at, actor, origin });
|
||||
}
|
||||
|
||||
/** True iff a field write may be overwritten silently by a server pull
|
||||
* without surfacing a conflict toast. Server-replay, system, and
|
||||
* migration writes are pipeline-internal — the user never typed them
|
||||
* and therefore can't "lose" them. Agent writes lose silently too:
|
||||
* they are visible separately via the proposal/mission UI, not via
|
||||
* the conflict toast. */
|
||||
export function isUserOriginatedField(meta: FieldMeta | undefined): boolean {
|
||||
return meta?.origin === 'user';
|
||||
}
|
||||
|
|
@ -6,7 +6,7 @@ interface ChangeRow {
|
|||
record_id: string;
|
||||
op: string;
|
||||
data: Record<string, unknown> | null;
|
||||
field_timestamps: Record<string, string> | null;
|
||||
field_meta: Record<string, string> | null;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
|
|
@ -16,7 +16,7 @@ function row(overrides: Record<string, unknown>): ChangeRow {
|
|||
record_id: 'agent-1',
|
||||
op: 'insert',
|
||||
data: null,
|
||||
field_timestamps: null,
|
||||
field_meta: null,
|
||||
created_at: new Date('2026-04-15T00:00:00Z'),
|
||||
...overrides,
|
||||
} as ChangeRow;
|
||||
|
|
@ -48,7 +48,7 @@ describe('mergeRaw (agents)', () => {
|
|||
row({
|
||||
op: 'insert',
|
||||
data: { name: 'A', role: 'old role', state: 'active' },
|
||||
field_timestamps: {
|
||||
field_meta: {
|
||||
name: '2026-04-15T00:00:00Z',
|
||||
role: '2026-04-15T00:00:00Z',
|
||||
state: '2026-04-15T00:00:00Z',
|
||||
|
|
@ -58,7 +58,7 @@ describe('mergeRaw (agents)', () => {
|
|||
row({
|
||||
op: 'update',
|
||||
data: { role: 'new role' },
|
||||
field_timestamps: { role: '2026-04-15T12:00:00Z' },
|
||||
field_meta: { role: '2026-04-15T12:00:00Z' },
|
||||
created_at: new Date('2026-04-15T12:00:00Z'),
|
||||
}),
|
||||
]);
|
||||
|
|
@ -71,12 +71,12 @@ describe('mergeRaw (agents)', () => {
|
|||
row({
|
||||
op: 'insert',
|
||||
data: { name: 'A' },
|
||||
field_timestamps: { name: '2026-04-15T12:00:00Z' },
|
||||
field_meta: { name: '2026-04-15T12:00:00Z' },
|
||||
}),
|
||||
row({
|
||||
op: 'update',
|
||||
data: { name: 'B' },
|
||||
field_timestamps: { name: '2026-04-15T00:00:00Z' },
|
||||
field_meta: { name: '2026-04-15T00:00:00Z' },
|
||||
created_at: new Date('2026-04-14T00:00:00Z'),
|
||||
}),
|
||||
]);
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ interface ChangeRow {
|
|||
record_id: string;
|
||||
op: string;
|
||||
data: Record<string, unknown> | null;
|
||||
field_timestamps: Record<string, string> | null;
|
||||
field_meta: Record<string, string> | null;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
|
|
@ -115,7 +115,7 @@ async function refreshOne(
|
|||
userId,
|
||||
async (tx) =>
|
||||
tx<ChangeRow[]>`
|
||||
SELECT user_id, record_id, op, data, field_timestamps, created_at
|
||||
SELECT user_id, record_id, op, data, field_meta, created_at
|
||||
FROM sync_changes
|
||||
WHERE app_id = 'ai'
|
||||
AND table_name = 'agents'
|
||||
|
|
@ -141,8 +141,7 @@ async function refreshOne(
|
|||
record_id: agentId,
|
||||
op: 'insert',
|
||||
data: seed.record,
|
||||
field_timestamps:
|
||||
(seed.record.__fieldTimestamps as Record<string, string> | undefined) ?? null,
|
||||
field_meta: (seed.record.__fieldMeta as Record<string, string> | undefined) ?? null,
|
||||
created_at: seed.last_applied_at,
|
||||
},
|
||||
]
|
||||
|
|
@ -177,29 +176,29 @@ async function refreshOne(
|
|||
* filters to apply here. Exported for unit tests only. */
|
||||
export function mergeRaw(rows: readonly ChangeRow[]): Record<string, unknown> | null {
|
||||
let record: Record<string, unknown> | null = null;
|
||||
let ft: Record<string, string> = {};
|
||||
let fm: Record<string, string> = {};
|
||||
|
||||
for (const row of rows) {
|
||||
if (row.op === 'delete') return null;
|
||||
if (!record) {
|
||||
record = row.data ? { id: row.record_id, ...row.data } : { id: row.record_id };
|
||||
ft = { ...(row.field_timestamps ?? {}) };
|
||||
fm = { ...(row.field_meta ?? {}) };
|
||||
continue;
|
||||
}
|
||||
if (!row.data) continue;
|
||||
const rowFT = row.field_timestamps ?? {};
|
||||
const rowFM = row.field_meta ?? {};
|
||||
for (const [k, v] of Object.entries(row.data)) {
|
||||
const serverTime = rowFT[k] ?? row.created_at.toISOString();
|
||||
const localTime = ft[k] ?? '';
|
||||
const serverTime = rowFM[k] ?? row.created_at.toISOString();
|
||||
const localTime = fm[k] ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
record[k] = v;
|
||||
ft[k] = serverTime;
|
||||
fm[k] = serverTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (record && (record.deletedAt as string | undefined)) return null;
|
||||
if (record) record.__fieldTimestamps = ft;
|
||||
if (record) record.__fieldMeta = fm;
|
||||
return record;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -88,17 +88,15 @@ function buildActor(input: AppendIterationInput): Actor {
|
|||
|
||||
export async function appendServerIteration(sql: Sql, input: AppendIterationInput): Promise<void> {
|
||||
const { userId, missionId, allIterations, nowIso } = input;
|
||||
const fieldsPayload = {
|
||||
iterations: { value: allIterations, updatedAt: nowIso },
|
||||
updatedAt: { value: nowIso, updatedAt: nowIso },
|
||||
};
|
||||
const fieldTimestamps = {
|
||||
const fieldMeta = {
|
||||
iterations: nowIso,
|
||||
updatedAt: nowIso,
|
||||
};
|
||||
// The mana-sync Go handler stores `data` on inserts and `fields` on
|
||||
// updates — for our update we populate the `data` JSONB with the
|
||||
// winning values and `field_timestamps` with the per-field stamps.
|
||||
// winning values and `field_meta` with the per-field stamps. Per-row
|
||||
// origin is `'agent'` — every server-side iteration write is an agent
|
||||
// write from the point of view of the originating "client".
|
||||
const data = {
|
||||
iterations: allIterations,
|
||||
updatedAt: nowIso,
|
||||
|
|
@ -109,25 +107,19 @@ export async function appendServerIteration(sql: Sql, input: AppendIterationInpu
|
|||
// inferred type. Cast at the boundary — the JSON serialization still
|
||||
// happens correctly at runtime.
|
||||
const dataJson = data as unknown;
|
||||
const ftJson = fieldTimestamps as unknown;
|
||||
const fmJson = fieldMeta as unknown;
|
||||
const actorJson = buildActor(input) as unknown;
|
||||
|
||||
await withUser(sql, userId, async (tx) => {
|
||||
await tx`
|
||||
INSERT INTO sync_changes
|
||||
(app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version, actor)
|
||||
(app_id, table_name, record_id, user_id, op, data, field_meta, client_id, schema_version, actor, origin)
|
||||
VALUES
|
||||
('ai', 'aiMissions', ${missionId}, ${userId}, 'update',
|
||||
${tx.json(dataJson as never)}, ${tx.json(ftJson as never)},
|
||||
'mana-ai-runner', 1, ${tx.json(actorJson as never)})
|
||||
${tx.json(dataJson as never)}, ${tx.json(fmJson as never)},
|
||||
'mana-ai-runner', 1, ${tx.json(actorJson as never)}, 'agent')
|
||||
`;
|
||||
});
|
||||
|
||||
// fieldsPayload is kept as a named local so a future refactor that
|
||||
// needs to emit a `fields`-shaped payload (if mana-sync ever rejects
|
||||
// `data` for updates) has a ready-made map to send. Current contract
|
||||
// accepts either.
|
||||
void fieldsPayload;
|
||||
}
|
||||
|
||||
/** Convert an {@link AiPlanOutput} from the shared parser into the
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ function row(overrides: Record<string, unknown>) {
|
|||
user_id: 'u-1',
|
||||
op: 'insert',
|
||||
data: null,
|
||||
field_timestamps: null,
|
||||
field_meta: null,
|
||||
created_at: new Date('2026-04-15T00:00:00Z'),
|
||||
...overrides,
|
||||
} as Parameters<typeof mergeAndFilter>[0][number];
|
||||
|
|
@ -99,7 +99,7 @@ describe('mergeAndFilter', () => {
|
|||
iterations: [],
|
||||
nextRunAt: '2026-04-15T00:00:00Z',
|
||||
},
|
||||
field_timestamps: {
|
||||
field_meta: {
|
||||
state: '2026-04-15T00:00:00Z',
|
||||
title: '2026-04-15T00:00:00Z',
|
||||
nextRunAt: '2026-04-15T00:00:00Z',
|
||||
|
|
@ -108,7 +108,7 @@ describe('mergeAndFilter', () => {
|
|||
row({
|
||||
created_at: new Date('2026-04-15T01:00:00Z'),
|
||||
data: { title: 'new' },
|
||||
field_timestamps: { title: '2026-04-15T01:00:00Z' },
|
||||
field_meta: { title: '2026-04-15T01:00:00Z' },
|
||||
}),
|
||||
];
|
||||
const out = mergeAndFilter(rows, 'u-1', NOW);
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ interface ChangeRow {
|
|||
user_id: string;
|
||||
op: string;
|
||||
data: Record<string, unknown> | null;
|
||||
field_timestamps: Record<string, string> | null;
|
||||
field_meta: Record<string, string> | null;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
|
|
@ -117,19 +117,19 @@ export function mergeAndFilter(
|
|||
continue;
|
||||
}
|
||||
|
||||
const prevFT = (existing.__fieldTimestamps as Record<string, string> | undefined) ?? {};
|
||||
const nextFT = { ...prevFT };
|
||||
const prevFM = (existing.__fieldMeta as Record<string, string> | undefined) ?? {};
|
||||
const nextFM = { ...prevFM };
|
||||
if (row.data) {
|
||||
for (const [k, v] of Object.entries(row.data)) {
|
||||
const serverTime = row.field_timestamps?.[k] ?? row.created_at.toISOString();
|
||||
const localTime = prevFT[k] ?? '';
|
||||
const serverTime = row.field_meta?.[k] ?? row.created_at.toISOString();
|
||||
const localTime = prevFM[k] ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
existing[k] = v;
|
||||
nextFT[k] = serverTime;
|
||||
nextFM[k] = serverTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
existing.__fieldTimestamps = nextFT;
|
||||
existing.__fieldMeta = nextFM;
|
||||
}
|
||||
|
||||
const missions: ServerMission[] = [];
|
||||
|
|
|
|||
|
|
@ -118,7 +118,7 @@ describe('encrypted resolver', () => {
|
|||
{
|
||||
op: 'insert',
|
||||
data: { title: encTitle, content: encContent, createdAt: '2026-04-15' },
|
||||
field_timestamps: null,
|
||||
field_meta: null,
|
||||
created_at: new Date(0),
|
||||
},
|
||||
],
|
||||
|
|
@ -157,7 +157,7 @@ describe('encrypted resolver', () => {
|
|||
{
|
||||
op: 'insert',
|
||||
data: { title: enc },
|
||||
field_timestamps: null,
|
||||
field_meta: null,
|
||||
created_at: new Date(0),
|
||||
},
|
||||
],
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import { withUser } from '../connection';
|
|||
interface ChangeRow {
|
||||
op: string;
|
||||
data: Record<string, unknown> | null;
|
||||
field_timestamps: Record<string, string> | null;
|
||||
field_meta: Record<string, string> | null;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
|
|
@ -29,7 +29,7 @@ export async function replayRecord(
|
|||
): Promise<Record<string, unknown> | null> {
|
||||
return withUser(sql, userId, async (tx) => {
|
||||
const rows = await tx<ChangeRow[]>`
|
||||
SELECT op, data, field_timestamps, created_at
|
||||
SELECT op, data, field_meta, created_at
|
||||
FROM sync_changes
|
||||
WHERE user_id = ${userId}
|
||||
AND app_id = ${appId}
|
||||
|
|
@ -40,7 +40,7 @@ export async function replayRecord(
|
|||
if (rows.length === 0) return null;
|
||||
|
||||
let record: Record<string, unknown> | null = null;
|
||||
let fieldTimestamps: Record<string, string> = {};
|
||||
let fieldMeta: Record<string, string> = {};
|
||||
|
||||
for (const row of rows) {
|
||||
if (row.op === 'delete') {
|
||||
|
|
@ -49,20 +49,20 @@ export async function replayRecord(
|
|||
|
||||
if (!record) {
|
||||
record = row.data ? { id: recordId, ...row.data } : { id: recordId };
|
||||
if (row.field_timestamps) {
|
||||
fieldTimestamps = { ...row.field_timestamps };
|
||||
if (row.field_meta) {
|
||||
fieldMeta = { ...row.field_meta };
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!row.data) continue;
|
||||
const rowFT = row.field_timestamps ?? {};
|
||||
const rowFM = row.field_meta ?? {};
|
||||
for (const [k, v] of Object.entries(row.data)) {
|
||||
const serverTime = rowFT[k] ?? row.created_at.toISOString();
|
||||
const localTime = fieldTimestamps[k] ?? '';
|
||||
const serverTime = rowFM[k] ?? row.created_at.toISOString();
|
||||
const localTime = fieldMeta[k] ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
record[k] = v;
|
||||
fieldTimestamps[k] = serverTime;
|
||||
fieldMeta[k] = serverTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ interface ChangeRow {
|
|||
record_id: string;
|
||||
op: string;
|
||||
data: Record<string, unknown> | null;
|
||||
field_timestamps: Record<string, string> | null;
|
||||
field_meta: Record<string, string> | null;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
|
|
@ -96,7 +96,7 @@ async function refreshOne(
|
|||
userId,
|
||||
async (tx) =>
|
||||
tx<ChangeRow[]>`
|
||||
SELECT user_id, record_id, op, data, field_timestamps, created_at
|
||||
SELECT user_id, record_id, op, data, field_meta, created_at
|
||||
FROM sync_changes
|
||||
WHERE app_id = 'ai'
|
||||
AND table_name = 'aiMissions'
|
||||
|
|
@ -124,8 +124,7 @@ async function refreshOne(
|
|||
record_id: missionId,
|
||||
op: 'insert',
|
||||
data: seed.record,
|
||||
field_timestamps:
|
||||
(seed.record.__fieldTimestamps as Record<string, string> | undefined) ?? null,
|
||||
field_meta: (seed.record.__fieldMeta as Record<string, string> | undefined) ?? null,
|
||||
created_at: seed.last_applied_at,
|
||||
},
|
||||
]
|
||||
|
|
@ -167,26 +166,26 @@ async function refreshOne(
|
|||
*/
|
||||
function mergeRaw(rows: readonly ChangeRow[]): Record<string, unknown> | null {
|
||||
let record: Record<string, unknown> | null = null;
|
||||
let ft: Record<string, string> = {};
|
||||
let fm: Record<string, string> = {};
|
||||
|
||||
for (const row of rows) {
|
||||
if (row.op === 'delete') return null;
|
||||
if (!record) {
|
||||
record = row.data ? { id: row.record_id, ...row.data } : { id: row.record_id };
|
||||
ft = { ...(row.field_timestamps ?? {}) };
|
||||
fm = { ...(row.field_meta ?? {}) };
|
||||
continue;
|
||||
}
|
||||
if (!row.data) continue;
|
||||
const rowFT = row.field_timestamps ?? {};
|
||||
const rowFM = row.field_meta ?? {};
|
||||
for (const [k, v] of Object.entries(row.data)) {
|
||||
const serverTime = rowFT[k] ?? row.created_at.toISOString();
|
||||
const localTime = ft[k] ?? '';
|
||||
const serverTime = rowFM[k] ?? row.created_at.toISOString();
|
||||
const localTime = fm[k] ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
record[k] = v;
|
||||
ft[k] = serverTime;
|
||||
fm[k] = serverTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (record) record.__fieldTimestamps = ft;
|
||||
if (record) record.__fieldMeta = fm;
|
||||
return record;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,9 +57,11 @@ CLIENT -> SERVER:
|
|||
"id": "todo-123",
|
||||
"op": "update",
|
||||
"fields": {
|
||||
"title": { "value": "Buy milk", "updatedAt": "2024-01-01T10:05:00Z" },
|
||||
"completed": { "value": true, "updatedAt": "2024-01-01T10:06:00Z" }
|
||||
}
|
||||
"title": { "value": "Buy milk", "at": "2024-01-01T10:05:00Z" },
|
||||
"completed": { "value": true, "at": "2024-01-01T10:06:00Z" }
|
||||
},
|
||||
"actor": { "kind": "user", "principalId": "user-1", "displayName": "Du" },
|
||||
"origin": "user"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
@ -159,16 +161,20 @@ sync_changes (
|
|||
user_id TEXT NOT NULL,
|
||||
op TEXT NOT NULL CHECK (insert | update | delete),
|
||||
data JSONB,
|
||||
field_timestamps JSONB DEFAULT '{}',
|
||||
field_meta JSONB DEFAULT '{}',
|
||||
client_id TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
schema_version INT NOT NULL DEFAULT 1,
|
||||
actor JSONB -- AI Workbench attribution: { kind: user|ai|system, ... }
|
||||
actor JSONB, -- AI Workbench attribution: { kind: user|ai|system, ... }
|
||||
origin TEXT, -- pipeline: user | agent | system | migration
|
||||
space_id TEXT
|
||||
)
|
||||
```
|
||||
|
||||
**`actor` column (2026-04-14)**: Opaque JSON blob the webapp stamps on every change to distinguish user writes from autonomous AI writes and derived subsystem writes. Server does NOT parse the shape — just persists + re-emits. Pre-actor clients omit the field; the column is nullable. See `apps/mana/apps/web/src/lib/data/events/actor.ts` for the discriminated union + `COMPANION_BRAIN_ARCHITECTURE.md §20` for the full pipeline.
|
||||
|
||||
**`origin` column + `field_meta` rename (2026-04-26, F1 of `docs/plans/sync-field-meta-overhaul.md`)**: `field_timestamps` was renamed to `field_meta` for symmetry with the client-side `__fieldMeta` and to reserve room for richer per-field metadata. The new `origin` column carries the pipeline that produced the write on the originating client (`user` / `agent` / `system` / `migration`) — drives client-side conflict-detection: only `'user'`-origin writes can lose to a server overwrite and surface a conflict toast (F2). FieldChange wire shape changed from `{ value, updatedAt }` to `{ value, at }` to match.
|
||||
|
||||
Indexes: `(user_id, app_id, created_at)`, `(table_name, record_id, created_at)`, `(user_id, app_id, table_name, created_at)`
|
||||
|
||||
## Configuration
|
||||
|
|
|
|||
|
|
@ -51,7 +51,13 @@ func (s *Store) Migrate(ctx context.Context) error {
|
|||
user_id TEXT NOT NULL,
|
||||
op TEXT NOT NULL CHECK (op IN ('insert', 'update', 'delete')),
|
||||
data JSONB,
|
||||
field_timestamps JSONB DEFAULT '{}',
|
||||
-- field_meta: per-field write timestamps as { [field]: ISO }.
|
||||
-- Replaces the older field_timestamps column with the same
|
||||
-- semantics; kept JSONB for forward compatibility if we ever
|
||||
-- need to attach per-field actor or origin separately again.
|
||||
-- Renamed alongside the docs/plans/sync-field-meta-overhaul.md
|
||||
-- rollout (F1) — see plan for the full architecture.
|
||||
field_meta JSONB DEFAULT '{}',
|
||||
client_id TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
-- M2: schema_version lets us evolve the Change wire shape over time.
|
||||
|
|
@ -62,7 +68,12 @@ func (s *Store) Migrate(ctx context.Context) error {
|
|||
-- AI Workbench: opaque actor JSON (user / ai / system). Null for
|
||||
-- pre-actor clients; the webapp stamps every change with it from
|
||||
-- the Dexie hook onward. Server-side we just persist and re-emit.
|
||||
actor JSONB
|
||||
actor JSONB,
|
||||
-- Pipeline origin of the write on the originating client:
|
||||
-- 'user', 'agent', 'system', or 'migration'. Receiving clients
|
||||
-- treat every server-pulled change as 'server-replay' locally
|
||||
-- regardless. See packages/shared-ai/src/field-meta.ts.
|
||||
origin TEXT
|
||||
);
|
||||
|
||||
-- Idempotent add for databases created before M2 shipped.
|
||||
|
|
@ -73,6 +84,10 @@ func (s *Store) Migrate(ctx context.Context) error {
|
|||
ALTER TABLE sync_changes
|
||||
ADD COLUMN IF NOT EXISTS actor JSONB;
|
||||
|
||||
-- Idempotent add for databases created before the field-meta overhaul.
|
||||
ALTER TABLE sync_changes
|
||||
ADD COLUMN IF NOT EXISTS origin TEXT;
|
||||
|
||||
-- Idempotent add for databases created before the Spaces foundation.
|
||||
-- Nullable so pre-v28 clients (which don't stamp a spaceId) can
|
||||
-- keep pushing. The RLS policy is intentionally NOT space-aware
|
||||
|
|
@ -215,7 +230,11 @@ func (s *Store) withUserAndMemberships(
|
|||
// `actor` is the opaque JSON blob the webapp stamps on every change (see
|
||||
// `data/events/actor.ts`). Pass nil for pre-actor callers; the column is
|
||||
// nullable and cross-device consumers treat a missing actor as `user`.
|
||||
func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, spaceID, op, clientID string, data map[string]any, fieldTimestamps map[string]string, schemaVersion int, actor json.RawMessage) error {
|
||||
//
|
||||
// `origin` describes the pipeline that produced the write on the originating
|
||||
// client (`user` / `agent` / `system` / `migration`). Empty for pre-origin
|
||||
// callers; the column is nullable.
|
||||
func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, userID, spaceID, op, clientID string, data map[string]any, fieldMeta map[string]string, schemaVersion int, actor json.RawMessage, origin string) error {
|
||||
if schemaVersion <= 0 {
|
||||
schemaVersion = 1
|
||||
}
|
||||
|
|
@ -225,9 +244,9 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us
|
|||
return fmt.Errorf("marshal data: %w", err)
|
||||
}
|
||||
|
||||
ftJSON, err := json.Marshal(fieldTimestamps)
|
||||
fmJSON, err := json.Marshal(fieldMeta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal field_timestamps: %w", err)
|
||||
return fmt.Errorf("marshal field_meta: %w", err)
|
||||
}
|
||||
|
||||
// pgx serializes a nil []byte as NULL for JSONB columns, which is what
|
||||
|
|
@ -246,12 +265,18 @@ func (s *Store) RecordChange(ctx context.Context, appID, tableName, recordID, us
|
|||
spaceIDParam = &spaceID
|
||||
}
|
||||
|
||||
// Same nullable handling for origin: empty string lands as SQL NULL.
|
||||
var originParam *string
|
||||
if origin != "" {
|
||||
originParam = &origin
|
||||
}
|
||||
|
||||
return s.withUser(ctx, userID, func(tx pgx.Tx) error {
|
||||
query := `
|
||||
INSERT INTO sync_changes (app_id, table_name, record_id, user_id, space_id, op, data, field_timestamps, client_id, schema_version, actor)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||
INSERT INTO sync_changes (app_id, table_name, record_id, user_id, space_id, op, data, field_meta, client_id, schema_version, actor, origin)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
||||
`
|
||||
_, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, spaceIDParam, op, dataJSON, ftJSON, clientID, schemaVersion, actorJSON)
|
||||
_, err := tx.Exec(ctx, query, appID, tableName, recordID, userID, spaceIDParam, op, dataJSON, fmJSON, clientID, schemaVersion, actorJSON, originParam)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
|
@ -270,7 +295,7 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
|
|||
var changes []ChangeRow
|
||||
err = s.withUserAndMemberships(ctx, userID, spaceIDs, func(tx pgx.Tx) error {
|
||||
query := `
|
||||
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, space_id, actor
|
||||
SELECT id, table_name, record_id, op, data, field_meta, client_id, created_at, schema_version, space_id, actor, origin
|
||||
FROM sync_changes
|
||||
WHERE (user_id = $1 OR space_id = ANY($7)) AND app_id = $2 AND table_name = $3
|
||||
AND created_at > $4 AND client_id != $5
|
||||
|
|
@ -285,24 +310,27 @@ func (s *Store) GetChangesSince(ctx context.Context, userID, appID, tableName, s
|
|||
|
||||
for rows.Next() {
|
||||
var c ChangeRow
|
||||
var dataJSON, ftJSON, actorJSON []byte
|
||||
var spaceID *string
|
||||
var dataJSON, fmJSON, actorJSON []byte
|
||||
var spaceID, origin *string
|
||||
|
||||
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON); err != nil {
|
||||
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &fmJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON, &origin); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if spaceID != nil {
|
||||
c.SpaceID = *spaceID
|
||||
}
|
||||
if origin != nil {
|
||||
c.Origin = *origin
|
||||
}
|
||||
if dataJSON != nil {
|
||||
if err := json.Unmarshal(dataJSON, &c.Data); err != nil {
|
||||
return fmt.Errorf("unmarshal data for record %s: %w", c.RecordID, err)
|
||||
}
|
||||
}
|
||||
if ftJSON != nil {
|
||||
if err := json.Unmarshal(ftJSON, &c.FieldTimestamps); err != nil {
|
||||
return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err)
|
||||
if fmJSON != nil {
|
||||
if err := json.Unmarshal(fmJSON, &c.FieldMeta); err != nil {
|
||||
return fmt.Errorf("unmarshal field_meta for record %s: %w", c.RecordID, err)
|
||||
}
|
||||
}
|
||||
if len(actorJSON) > 0 {
|
||||
|
|
@ -328,7 +356,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
|
|||
var changes []ChangeRow
|
||||
err = s.withUserAndMemberships(ctx, userID, spaceIDs, func(tx pgx.Tx) error {
|
||||
query := `
|
||||
SELECT id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, space_id, actor
|
||||
SELECT id, table_name, record_id, op, data, field_meta, client_id, created_at, schema_version, space_id, actor, origin
|
||||
FROM sync_changes
|
||||
WHERE (user_id = $1 OR space_id = ANY($5)) AND app_id = $2
|
||||
AND created_at > $3 AND client_id != $4
|
||||
|
|
@ -343,24 +371,27 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
|
|||
|
||||
for rows.Next() {
|
||||
var c ChangeRow
|
||||
var dataJSON, ftJSON, actorJSON []byte
|
||||
var spaceID *string
|
||||
var dataJSON, fmJSON, actorJSON []byte
|
||||
var spaceID, origin *string
|
||||
|
||||
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON); err != nil {
|
||||
if err := rows.Scan(&c.ID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &fmJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON, &origin); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if spaceID != nil {
|
||||
c.SpaceID = *spaceID
|
||||
}
|
||||
if origin != nil {
|
||||
c.Origin = *origin
|
||||
}
|
||||
if dataJSON != nil {
|
||||
if err := json.Unmarshal(dataJSON, &c.Data); err != nil {
|
||||
return fmt.Errorf("unmarshal data for record %s: %w", c.RecordID, err)
|
||||
}
|
||||
}
|
||||
if ftJSON != nil {
|
||||
if err := json.Unmarshal(ftJSON, &c.FieldTimestamps); err != nil {
|
||||
return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err)
|
||||
if fmJSON != nil {
|
||||
if err := json.Unmarshal(fmJSON, &c.FieldMeta); err != nil {
|
||||
return fmt.Errorf("unmarshal field_meta for record %s: %w", c.RecordID, err)
|
||||
}
|
||||
}
|
||||
if len(actorJSON) > 0 {
|
||||
|
|
@ -382,7 +413,7 @@ func (s *Store) GetAllChangesSince(ctx context.Context, userID, appID, since, ex
|
|||
func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func(ChangeRow) error) error {
|
||||
return s.withUser(ctx, userID, func(tx pgx.Tx) error {
|
||||
query := `
|
||||
SELECT id, app_id, table_name, record_id, op, data, field_timestamps, client_id, created_at, schema_version, space_id, actor
|
||||
SELECT id, app_id, table_name, record_id, op, data, field_meta, client_id, created_at, schema_version, space_id, actor, origin
|
||||
FROM sync_changes
|
||||
WHERE user_id = $1
|
||||
ORDER BY created_at ASC, id ASC
|
||||
|
|
@ -395,22 +426,25 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func
|
|||
|
||||
for rows.Next() {
|
||||
var c ChangeRow
|
||||
var dataJSON, ftJSON, actorJSON []byte
|
||||
var spaceID *string
|
||||
if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &ftJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON); err != nil {
|
||||
var dataJSON, fmJSON, actorJSON []byte
|
||||
var spaceID, origin *string
|
||||
if err := rows.Scan(&c.ID, &c.AppID, &c.TableName, &c.RecordID, &c.Op, &dataJSON, &fmJSON, &c.ClientID, &c.CreatedAt, &c.SchemaVersion, &spaceID, &actorJSON, &origin); err != nil {
|
||||
return fmt.Errorf("scan: %w", err)
|
||||
}
|
||||
if spaceID != nil {
|
||||
c.SpaceID = *spaceID
|
||||
}
|
||||
if origin != nil {
|
||||
c.Origin = *origin
|
||||
}
|
||||
if dataJSON != nil {
|
||||
if err := json.Unmarshal(dataJSON, &c.Data); err != nil {
|
||||
return fmt.Errorf("unmarshal data for record %s: %w", c.RecordID, err)
|
||||
}
|
||||
}
|
||||
if ftJSON != nil {
|
||||
if err := json.Unmarshal(ftJSON, &c.FieldTimestamps); err != nil {
|
||||
return fmt.Errorf("unmarshal field_timestamps for record %s: %w", c.RecordID, err)
|
||||
if fmJSON != nil {
|
||||
if err := json.Unmarshal(fmJSON, &c.FieldMeta); err != nil {
|
||||
return fmt.Errorf("unmarshal field_meta for record %s: %w", c.RecordID, err)
|
||||
}
|
||||
}
|
||||
if len(actorJSON) > 0 {
|
||||
|
|
@ -426,16 +460,20 @@ func (s *Store) StreamAllUserChanges(ctx context.Context, userID string, fn func
|
|||
|
||||
// ChangeRow is a row from the sync_changes table.
|
||||
type ChangeRow struct {
|
||||
AppID string
|
||||
ID string
|
||||
TableName string
|
||||
RecordID string
|
||||
Op string
|
||||
Data map[string]any
|
||||
FieldTimestamps map[string]string
|
||||
ClientID string
|
||||
CreatedAt time.Time
|
||||
SchemaVersion int
|
||||
AppID string
|
||||
ID string
|
||||
TableName string
|
||||
RecordID string
|
||||
Op string
|
||||
Data map[string]any
|
||||
// FieldMeta carries per-field write timestamps as { [field]: ISO }.
|
||||
// Per-field actor + origin live at the row level (Actor + Origin
|
||||
// below) — each push represents one (actor, origin) tuple, so
|
||||
// duplicating them per-field would be redundant on the wire.
|
||||
FieldMeta map[string]string
|
||||
ClientID string
|
||||
CreatedAt time.Time
|
||||
SchemaVersion int
|
||||
// SpaceID is empty for pre-v28 rows. Consumers use it to partition
|
||||
// the reader cache per space; an empty string means "implicit personal"
|
||||
// until the bootstrap reconciliation fills it in.
|
||||
|
|
@ -443,4 +481,8 @@ type ChangeRow struct {
|
|||
// Actor is nil for rows written by pre-actor clients. Consumers on
|
||||
// other devices render a missing actor as "user".
|
||||
Actor json.RawMessage
|
||||
// Origin describes the pipeline that produced the write on the
|
||||
// originating client. Empty for pre-origin clients; consumers treat
|
||||
// missing as "user". See packages/shared-ai/src/field-meta.ts.
|
||||
Origin string
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,18 +90,19 @@ func changeFromRow(row store.ChangeRow) Change {
|
|||
Op: row.Op,
|
||||
SpaceID: row.SpaceID,
|
||||
Actor: row.Actor,
|
||||
Origin: row.Origin,
|
||||
}
|
||||
switch row.Op {
|
||||
case "insert":
|
||||
c.Data = row.Data
|
||||
case "update":
|
||||
c.Fields = make(map[string]*FieldChange)
|
||||
for field, ts := range row.FieldTimestamps {
|
||||
for field, ts := range row.FieldMeta {
|
||||
value, ok := row.Data[field]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
c.Fields[field] = &FieldChange{Value: value, UpdatedAt: ts}
|
||||
c.Fields[field] = &FieldChange{Value: value, At: ts}
|
||||
}
|
||||
case "delete":
|
||||
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
|
||||
|
|
@ -181,15 +182,15 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
|
|||
for _, change := range changeset.Changes {
|
||||
affectedTables[change.Table] = struct{}{}
|
||||
|
||||
// Build data and field timestamps
|
||||
// Build data and field metadata.
|
||||
data := change.Data
|
||||
fieldTimestamps := make(map[string]string)
|
||||
fieldMeta := make(map[string]string)
|
||||
|
||||
if change.Op == "update" && change.Fields != nil {
|
||||
data = make(map[string]any)
|
||||
for field, fc := range change.Fields {
|
||||
data[field] = fc.Value
|
||||
fieldTimestamps[field] = fc.UpdatedAt
|
||||
fieldMeta[field] = fc.At
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -215,7 +216,7 @@ func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
|
|||
// clients still get indexed correctly. Empty string lands as SQL
|
||||
// NULL via RecordChange.
|
||||
spaceID := extractSpaceID(change)
|
||||
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, spaceID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion, change.Actor)
|
||||
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, spaceID, change.Op, clientID, data, fieldMeta, rowSchemaVersion, change.Actor, change.Origin)
|
||||
if err != nil {
|
||||
slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID)
|
||||
http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError)
|
||||
|
|
@ -459,15 +460,15 @@ func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) {
|
|||
func (h *Handler) convertChanges(rows []store.ChangeRow) []Change {
|
||||
changes := make([]Change, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op, Actor: row.Actor}
|
||||
c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op, Actor: row.Actor, Origin: row.Origin}
|
||||
switch row.Op {
|
||||
case "insert":
|
||||
c.Data = row.Data
|
||||
case "update":
|
||||
c.Fields = make(map[string]*FieldChange)
|
||||
for field, ts := range row.FieldTimestamps {
|
||||
for field, ts := range row.FieldMeta {
|
||||
if value, ok := row.Data[field]; ok {
|
||||
c.Fields[field] = &FieldChange{Value: value, UpdatedAt: ts}
|
||||
c.Fields[field] = &FieldChange{Value: value, At: ts}
|
||||
}
|
||||
}
|
||||
case "delete":
|
||||
|
|
|
|||
|
|
@ -19,13 +19,13 @@ type mockStore struct {
|
|||
type recordedChange struct {
|
||||
appID, table, recordID, userID, op, clientID string
|
||||
data map[string]any
|
||||
fieldTimestamps map[string]string
|
||||
fieldMeta map[string]string
|
||||
}
|
||||
|
||||
type mockChangeRow struct {
|
||||
ID, TableName, RecordID, Op, ClientID string
|
||||
Data map[string]any
|
||||
FieldTimestamps map[string]string
|
||||
FieldMeta map[string]string
|
||||
}
|
||||
|
||||
// mockValidator always returns a fixed user ID.
|
||||
|
|
@ -102,7 +102,7 @@ func TestChangesetValidation(t *testing.T) {
|
|||
Since: "2024-01-01T00:00:00Z",
|
||||
Changes: []Change{
|
||||
{Table: "todos", ID: "todo-1", Op: "update", Fields: map[string]*FieldChange{
|
||||
"title": {Value: "Updated", UpdatedAt: "2024-01-01T10:00:00Z"},
|
||||
"title": {Value: "Updated", At: "2024-01-01T10:00:00Z"},
|
||||
}},
|
||||
},
|
||||
},
|
||||
|
|
@ -260,8 +260,8 @@ func TestFieldChangeRoundTrip(t *testing.T) {
|
|||
ID: "todo-1",
|
||||
Op: "update",
|
||||
Fields: map[string]*FieldChange{
|
||||
"title": {Value: "Buy milk", UpdatedAt: "2024-01-01T10:05:00Z"},
|
||||
"completed": {Value: true, UpdatedAt: "2024-01-01T10:06:00Z"},
|
||||
"title": {Value: "Buy milk", At: "2024-01-01T10:05:00Z"},
|
||||
"completed": {Value: true, At: "2024-01-01T10:06:00Z"},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -286,8 +286,8 @@ func TestFieldChangeRoundTrip(t *testing.T) {
|
|||
if titleField.Value != "Buy milk" {
|
||||
t.Errorf("title value = %v, want 'Buy milk'", titleField.Value)
|
||||
}
|
||||
if titleField.UpdatedAt != "2024-01-01T10:05:00Z" {
|
||||
t.Errorf("title updatedAt = %q, want '2024-01-01T10:05:00Z'", titleField.UpdatedAt)
|
||||
if titleField.At != "2024-01-01T10:05:00Z" {
|
||||
t.Errorf("title at = %q, want '2024-01-01T10:05:00Z'", titleField.At)
|
||||
}
|
||||
|
||||
completedField := decoded.Fields["completed"]
|
||||
|
|
|
|||
|
|
@ -47,12 +47,23 @@ type Change struct {
|
|||
// clients so cross-device attribution works. Pre-actor clients omit
|
||||
// the field; the column is nullable.
|
||||
Actor json.RawMessage `json:"actor,omitempty"`
|
||||
// Origin describes the pipeline that produced the write on the
|
||||
// originating client: 'user', 'agent', 'system', or 'migration'.
|
||||
// The server stores it verbatim and re-emits to other clients —
|
||||
// receiving clients then apply with origin='server-replay' locally
|
||||
// regardless. Pre-origin clients omit the field; the column is
|
||||
// nullable. See packages/shared-ai/src/field-meta.ts for the
|
||||
// full enumeration and conflict-detection semantics.
|
||||
Origin string `json:"origin,omitempty"`
|
||||
}
|
||||
|
||||
// FieldChange holds a value and the timestamp when it was last changed.
|
||||
// FieldChange holds a value and the timestamp when the field was last
|
||||
// written. Per-field actor + origin are not transmitted at the field
|
||||
// level — they live at the row level on Change.Actor + Change.Origin
|
||||
// because each push represents one (actor, origin) tuple.
|
||||
type FieldChange struct {
|
||||
Value any `json:"value"`
|
||||
UpdatedAt string `json:"updatedAt"`
|
||||
Value any `json:"value"`
|
||||
At string `json:"at"`
|
||||
}
|
||||
|
||||
// Changeset is a batch of changes sent by a client.
|
||||
|
|
@ -95,15 +106,16 @@ type PullRequest struct {
|
|||
|
||||
// SyncRecord is a row in the sync_changes table.
|
||||
type SyncRecord struct {
|
||||
ID string `json:"id"`
|
||||
AppID string `json:"appId"`
|
||||
TableName string `json:"tableName"`
|
||||
RecordID string `json:"recordId"`
|
||||
UserID string `json:"userId"`
|
||||
Op string `json:"op"`
|
||||
Fields map[string]any `json:"fields,omitempty"`
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
FieldTimestamps map[string]string `json:"fieldTimestamps,omitempty"`
|
||||
ClientID string `json:"clientId"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
ID string `json:"id"`
|
||||
AppID string `json:"appId"`
|
||||
TableName string `json:"tableName"`
|
||||
RecordID string `json:"recordId"`
|
||||
UserID string `json:"userId"`
|
||||
Op string `json:"op"`
|
||||
Fields map[string]any `json:"fields,omitempty"`
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
FieldMeta map[string]string `json:"fieldMeta,omitempty"`
|
||||
Origin string `json:"origin,omitempty"`
|
||||
ClientID string `json:"clientId"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue