mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-17 23:09:39 +02:00
fix(articles): import-projection accepts F3 + legacy field_meta shapes
Live-test caught it: the worker projects sync_changes via field-level
LWW, comparing `field_meta[k]` directly. But field_meta is two-shaped
on the wire:
- Legacy plaintext writes: { state: '2026-04-28T…' }
- Field-meta-overhaul writes: { state: { at, actor, origin } }
The naive `rowFM[k] >= localTime` worked for the all-legacy case, but
once a client write (legacy string) followed a worker write (F3
object), the comparison evaluated `'2026-04-28T…' >= '[object …]'`
and the projection silently kept the older value. Live symptom: an
item that was correctly flipped to 'saved' on the client was reported
back as 'extracted' by the projection.
Fix: `fieldMetaTime()` helper that pulls the ISO string out of either
shape; both write paths now compare apples-to-apples.
Verified end-to-end:
- Synthetic job + item written into sync_changes
- runTickOnce() → claim → extractFromUrl(example.com) → pickup row
with title='Example Domain', wordCount=16, actor=
system:articles-import-worker
- Item transitions pending → extracting → extracted
- Simulated client write 'saved'
- Next tick rolls counters: savedCount 0→1, status running→done,
finishedAt stamped
Plan: docs/plans/articles-bulk-import.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
18f13e19b2
commit
054b9e5beb
11 changed files with 28 additions and 92 deletions
|
|
@ -21,15 +21,32 @@
|
|||
import { getSyncConnection } from '../../mcp/sync-db';
|
||||
|
||||
type Row = Record<string, unknown>;
|
||||
/**
|
||||
* `field_meta` is one of two shapes on the wire:
|
||||
* - Legacy plaintext writes: `{[fieldName]: ISOString}`
|
||||
* - Field-meta-overhaul writes: `{[fieldName]: {at, actor, origin}}`
|
||||
* `fieldMetaTime()` below normalises both into the comparable ISO string.
|
||||
*/
|
||||
interface ChangeRow {
|
||||
user_id: string;
|
||||
record_id: string;
|
||||
op: string;
|
||||
data: Row | null;
|
||||
field_meta: Record<string, string> | null;
|
||||
field_meta: Record<string, unknown> | null;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
/** Pull the timestamp out of either shape. Falls back to empty string
|
||||
* so the LWW comparison never throws on undefined. */
|
||||
function fieldMetaTime(meta: unknown): string {
|
||||
if (typeof meta === 'string') return meta;
|
||||
if (meta && typeof meta === 'object') {
|
||||
const at = (meta as { at?: unknown }).at;
|
||||
if (typeof at === 'string') return at;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
export interface ImportJobRow {
|
||||
id: string;
|
||||
userId: string;
|
||||
|
|
@ -134,6 +151,9 @@ function mergeByUserAndRecord(rows: readonly ChangeRow[]): Map<string, MergedEnt
|
|||
userId: string;
|
||||
recordId: string;
|
||||
record: Row | null;
|
||||
/** Per-field LWW timestamps (normalised to ISO strings — see
|
||||
* fieldMetaTime). Both wire shapes are folded down to plain
|
||||
* strings here so the projection comparison stays trivial. */
|
||||
fm: Record<string, string>;
|
||||
};
|
||||
let current: Cur | null = null;
|
||||
|
|
@ -151,14 +171,19 @@ function mergeByUserAndRecord(rows: readonly ChangeRow[]): Map<string, MergedEnt
|
|||
continue;
|
||||
}
|
||||
if (!r.data) continue;
|
||||
const rowCreatedAt = r.created_at.toISOString();
|
||||
if (!current.record) {
|
||||
current.record = { id: r.record_id, ...r.data };
|
||||
current.fm = { ...(r.field_meta ?? {}) };
|
||||
const initFM = r.field_meta ?? {};
|
||||
current.fm = {};
|
||||
for (const k of Object.keys(initFM)) {
|
||||
current.fm[k] = fieldMetaTime(initFM[k]) || rowCreatedAt;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const rowFM = r.field_meta ?? {};
|
||||
for (const [k, v] of Object.entries(r.data)) {
|
||||
const serverTime = rowFM[k] ?? r.created_at.toISOString();
|
||||
const serverTime = fieldMetaTime(rowFM[k]) || rowCreatedAt;
|
||||
const localTime = current.fm[k] ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
current.record[k] = v;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue