mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 19:21:10 +02:00
fix(articles, mana-ai): rollout-block hardening for sync_changes projections
Four cross-cutting fixes that make the bulk-import worker safe to run
under real production load. All four were called out as live-rollout
risks in the post-ship review of docs/plans/articles-bulk-import.md.
#1 — Same fieldMetaTime bug fixed in mana-ai
The articles fix in 054b9e5be hoists the helper to its own file
`apps/api/src/modules/articles/field-meta.ts`. The same naive
`rowFM[k] >= localTime` LWW comparison existed in three more
projections under services/mana-ai (missions-projection,
snapshot-refresh, agents-projection). Once any F3 stamp lands
beside a legacy-string stamp, the comparison evaluates
`'ISO-…' >= '[object Object]'` (false) and the older value wins.
New `services/mana-ai/src/db/field-meta.ts` — same helper,
deliberately duplicated (each service treats sync_changes as a
read-only event log; sharing infra across services is out of
scope here). All 61 mana-ai bun tests still pass.
#2 — Stale 'extracting' items recycle
If the worker dies mid-fetch (OOM, pod restart), items stay in
state='extracting' forever and the job never completes. New sweep
at the start of `processOneJob`: items whose lastAttemptAt is
older than 5 minutes get bounced back to 'pending' so the next
tick re-claims them. STALE_EXTRACTING_MS tuned for the 15s
shared-rss fetch + JSDOM-parse worst case.
#3 — Pickup-row GC
Every 30 ticks (~once per minute) the worker hard-deletes
articleExtractPickup rows older than 24h. Without this a stuck
pickup-consumer (all tabs closed, Web-Lock mismatch) would let
sync_changes accumulate without bound. Logs the row count when
non-zero so we can spot stuck consumers in the wild.
#4 — DRY consent-wall heuristic
Identical CONSENT_KEYWORDS + threshold lived in routes.ts AND
import-extractor.ts. Hoisted to
`apps/api/src/modules/articles/consent-wall.ts`; both call sites
now share one heuristic.
Plan: docs/plans/articles-bulk-import.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
e99fea1938
commit
b297f68ee4
10 changed files with 223 additions and 69 deletions
37
apps/api/src/modules/articles/consent-wall.ts
Normal file
37
apps/api/src/modules/articles/consent-wall.ts
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Consent-wall heuristic shared by every server-side article-extract
|
||||||
|
* path:
|
||||||
|
* - `/api/v1/articles/extract` and `/extract/html` (single-URL)
|
||||||
|
* - The bulk-import worker's `extractOneItem` (background)
|
||||||
|
*
|
||||||
|
* When the extracted text is suspiciously short AND contains GDPR /
|
||||||
|
* cookie-consent vocabulary, the server's anonymous fetch most likely
|
||||||
|
* hit a consent dialog instead of the article itself. The caller can
|
||||||
|
* use the flag to nudge the user toward the browser-HTML bookmarklet
|
||||||
|
* (which fetches with the user's existing session cookies) rather
|
||||||
|
* than silently persisting the GDPR overlay text as the article body.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
 * Phrases whose presence in a short extract suggests the fetch landed on
 * a cookie-consent or "enable JavaScript" interstitial instead of the
 * article. They are matched as substrings of the lower-cased content, so
 * every entry must itself be lower-case. Entries are unique — a repeated
 * phrase adds nothing to a substring scan.
 */
const CONSENT_KEYWORDS: readonly string[] = [
	'cookies zustimmen',
	'cookie consent',
	'zustimmung',
	'accept all cookies',
	'consent to the use',
	'enable javascript',
	'javascript is disabled',
	'please enable',
	'privacy center',
	'datenschutzeinstellungen',
];

/**
 * Word-count ceiling for the heuristic: extracts with this many words or
 * more are never flagged. Real articles are typically >300 words;
 * consent dialogs are <50.
 */
const CONSENT_WORDCOUNT_THRESHOLD = 300;

/**
 * Decides whether an extracted text looks like a consent wall rather
 * than an article body.
 *
 * @param content   Extracted text; compared case-insensitively.
 * @param wordCount Word count of the extract, as computed by the caller.
 * @returns `true` only when `wordCount` is below
 *   `CONSENT_WORDCOUNT_THRESHOLD` AND `content` contains at least one
 *   entry of `CONSENT_KEYWORDS`; `false` otherwise.
 */
export function looksLikeConsentWall(content: string, wordCount: number): boolean {
	// Long extracts are assumed to be real articles even if they mention cookies.
	if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false;
	const haystack = content.toLowerCase();
	return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle));
}
|
||||||
32
apps/api/src/modules/articles/field-meta.ts
Normal file
32
apps/api/src/modules/articles/field-meta.ts
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
/**
|
||||||
|
* Wire-shape adapter for `sync_changes.field_meta`.
|
||||||
|
*
|
||||||
|
* Two shapes coexist on the wire today:
|
||||||
|
*
|
||||||
|
* - Legacy plaintext writes: { state: 'ISO-8601' }
|
||||||
|
* - Field-meta-overhaul (F3): { state: { at, actor, origin } }
|
||||||
|
*
|
||||||
|
* Any LWW projection that string-compares per-field timestamps MUST
|
||||||
|
* fold both into a comparable form, otherwise the moment one side is
|
||||||
|
* an F3 object the comparison becomes `'ISO…' >= '[object Object]'`
|
||||||
|
* (false), the older value wins and the projection lies.
|
||||||
|
*
|
||||||
|
* Sister helper at `services/mana-ai/src/db/field-meta.ts` — same
|
||||||
|
* logic, deliberately duplicated. Both services treat sync_changes as
|
||||||
|
* a read-only event log; sharing infrastructure code across services
|
||||||
|
* (apps/api ↔ services/mana-ai) is out of scope.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
 * Extracts the comparable timestamp string from a single `field_meta[k]`
 * slot, whichever wire shape it carries:
 *
 *   - legacy plain string           → returned as-is (not validated)
 *   - F3 `{ at, actor, origin }`    → its `at` value, when that is a string
 *
 * @param meta Raw slot value read from `sync_changes.field_meta`.
 * @returns The timestamp string, or the empty string when the slot is
 *   neither a string nor an object with a string `at`. The empty string
 *   makes the LWW comparison treat the field as never-stamped (callers
 *   fall back to row.created_at).
 */
export function fieldMetaTime(meta: unknown): string {
	if (typeof meta === 'string') return meta;
	// `typeof null === 'object'`, so null has to be excluded explicitly.
	if (typeof meta === 'object' && meta !== null) {
		const { at } = meta as { at?: unknown };
		if (typeof at === 'string') return at;
	}
	return '';
}
|
||||||
|
|
@ -27,6 +27,7 @@
|
||||||
import { extractFromUrl } from '@mana/shared-rss';
|
import { extractFromUrl } from '@mana/shared-rss';
|
||||||
import { makeFieldMeta, type Actor, type FieldOrigin } from '@mana/shared-ai';
|
import { makeFieldMeta, type Actor, type FieldOrigin } from '@mana/shared-ai';
|
||||||
import { getSyncConnection } from '../../mcp/sync-db';
|
import { getSyncConnection } from '../../mcp/sync-db';
|
||||||
|
import { looksLikeConsentWall } from './consent-wall';
|
||||||
import type { ImportItemRow } from './import-projection';
|
import type { ImportItemRow } from './import-projection';
|
||||||
|
|
||||||
const MAX_ATTEMPTS = 3;
|
const MAX_ATTEMPTS = 3;
|
||||||
|
|
@ -213,25 +214,4 @@ async function insertSyncChange(params: InsertParams): Promise<void> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// ─── Consent-wall heuristic (mirror of routes.ts) ────────────
|
// looksLikeConsentWall lives in ./consent-wall.ts — shared with routes.ts.
|
||||||
|
|
||||||
const CONSENT_KEYWORDS = [
|
|
||||||
'cookies zustimmen',
|
|
||||||
'cookie consent',
|
|
||||||
'zustimmung',
|
|
||||||
'accept all cookies',
|
|
||||||
'consent to the use',
|
|
||||||
'enable javascript',
|
|
||||||
'javascript is disabled',
|
|
||||||
'please enable',
|
|
||||||
'privacy center',
|
|
||||||
'datenschutzeinstellungen',
|
|
||||||
'datenschutzeinstellungen',
|
|
||||||
];
|
|
||||||
const CONSENT_WORDCOUNT_THRESHOLD = 300;
|
|
||||||
|
|
||||||
function looksLikeConsentWall(content: string, wordCount: number): boolean {
|
|
||||||
if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false;
|
|
||||||
const haystack = content.toLowerCase();
|
|
||||||
return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle));
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -19,34 +19,20 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { getSyncConnection } from '../../mcp/sync-db';
|
import { getSyncConnection } from '../../mcp/sync-db';
|
||||||
|
import { fieldMetaTime } from './field-meta';
|
||||||
|
|
||||||
type Row = Record<string, unknown>;
|
type Row = Record<string, unknown>;
|
||||||
/**
|
|
||||||
* `field_meta` is one of two shapes on the wire:
|
|
||||||
* - Legacy plaintext writes: `{[fieldName]: ISOString}`
|
|
||||||
* - Field-meta-overhaul writes: `{[fieldName]: {at, actor, origin}}`
|
|
||||||
* `fieldMetaTime()` below normalises both into the comparable ISO string.
|
|
||||||
*/
|
|
||||||
interface ChangeRow {
|
interface ChangeRow {
|
||||||
user_id: string;
|
user_id: string;
|
||||||
record_id: string;
|
record_id: string;
|
||||||
op: string;
|
op: string;
|
||||||
data: Row | null;
|
data: Row | null;
|
||||||
|
/** See `./field-meta.ts` — wire shape is two-tone (legacy ISO string
|
||||||
|
* vs. F3 `{at, actor, origin}` object). */
|
||||||
field_meta: Record<string, unknown> | null;
|
field_meta: Record<string, unknown> | null;
|
||||||
created_at: Date;
|
created_at: Date;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Pull the timestamp out of either shape. Falls back to empty string
|
|
||||||
* so the LWW comparison never throws on undefined. */
|
|
||||||
function fieldMetaTime(meta: unknown): string {
|
|
||||||
if (typeof meta === 'string') return meta;
|
|
||||||
if (meta && typeof meta === 'object') {
|
|
||||||
const at = (meta as { at?: unknown }).at;
|
|
||||||
if (typeof at === 'string') return at;
|
|
||||||
}
|
|
||||||
return '';
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ImportJobRow {
|
export interface ImportJobRow {
|
||||||
id: string;
|
id: string;
|
||||||
userId: string;
|
userId: string;
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,30 @@ import {
|
||||||
} from './import-projection';
|
} from './import-projection';
|
||||||
import { extractOneItem, writeItemUpdate, writeJobUpdate } from './import-extractor';
|
import { extractOneItem, writeItemUpdate, writeJobUpdate } from './import-extractor';
|
||||||
|
|
||||||
|
/** Counts ticks so the pickup-GC sweep can run every Nth one rather
|
||||||
|
* than on every 2-second cycle (the DELETE is cheap but not free). */
|
||||||
|
let tickCount = 0;
|
||||||
|
/** Run pickup-GC every 30 ticks ≈ once per minute. */
|
||||||
|
const PICKUP_GC_EVERY_N_TICKS = 30;
|
||||||
|
|
||||||
const TICK_INTERVAL_MS = 2_000;
|
const TICK_INTERVAL_MS = 2_000;
|
||||||
const PER_JOB_CONCURRENCY = 3;
|
const PER_JOB_CONCURRENCY = 3;
|
||||||
|
/**
|
||||||
|
* If an item has been in `state='extracting'` longer than this without
|
||||||
|
* a follow-up state-write, it's orphaned (worker crashed mid-fetch,
|
||||||
|
* pod restart, OOM, …) and gets bounced back to `pending` so the next
|
||||||
|
* tick can re-claim it.
|
||||||
|
*
|
||||||
|
* Tuned so a slow but live extraction (15 s shared-rss fetch timeout +
|
||||||
|
* a few seconds of JSDOM parse on a 2 MB page) doesn't reset
|
||||||
|
* prematurely — 5 minutes is comfortable headroom.
|
||||||
|
*/
|
||||||
|
const STALE_EXTRACTING_MS = 5 * 60 * 1000;
|
||||||
|
/** TTL for `articleExtractPickup` rows. The pickup-consumer normally
|
||||||
|
* deletes them seconds after the worker writes them; anything older
|
||||||
|
* than this is garbage from a stuck consumer (all tabs closed,
|
||||||
|
* Web-Lock mismatch, …) and would otherwise accumulate without bound. */
|
||||||
|
const PICKUP_TTL_MS = 24 * 60 * 60 * 1000;
|
||||||
/** Fixed int8 lock key — derived from the ASCII bytes of 'ARTI'. */
|
/** Fixed int8 lock key — derived from the ASCII bytes of 'ARTI'. */
|
||||||
const ADVISORY_LOCK_KEY = 0x4152_5449;
|
const ADVISORY_LOCK_KEY = 0x4152_5449;
|
||||||
|
|
||||||
|
|
@ -88,16 +110,51 @@ export async function runTickOnce(): Promise<{
|
||||||
skipped: boolean;
|
skipped: boolean;
|
||||||
jobsConsidered: number;
|
jobsConsidered: number;
|
||||||
itemsProcessed: number;
|
itemsProcessed: number;
|
||||||
|
pickupGcRows?: number;
|
||||||
}> {
|
}> {
|
||||||
if (!(await tryAcquireLock())) {
|
if (!(await tryAcquireLock())) {
|
||||||
return { skipped: true, jobsConsidered: 0, itemsProcessed: 0 };
|
return { skipped: true, jobsConsidered: 0, itemsProcessed: 0 };
|
||||||
}
|
}
|
||||||
|
tickCount++;
|
||||||
|
let pickupGcRows: number | undefined;
|
||||||
|
if (tickCount % PICKUP_GC_EVERY_N_TICKS === 0) {
|
||||||
|
pickupGcRows = await runPickupGc();
|
||||||
|
}
|
||||||
const jobs = await listClaimableJobs();
|
const jobs = await listClaimableJobs();
|
||||||
let itemsProcessed = 0;
|
let itemsProcessed = 0;
|
||||||
for (const job of jobs) {
|
for (const job of jobs) {
|
||||||
itemsProcessed += await processOneJob(job);
|
itemsProcessed += await processOneJob(job);
|
||||||
}
|
}
|
||||||
return { skipped: false, jobsConsidered: jobs.length, itemsProcessed };
|
return { skipped: false, jobsConsidered: jobs.length, itemsProcessed, pickupGcRows };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Hard-deletes `articleExtractPickup` rows from `sync_changes` whose
 * `created_at` is older than `PICKUP_TTL_MS`.
 *
 * The pickup-consumer on a healthy client removes each row seconds after
 * the worker writes it; anything older is residue from a stuck consumer
 * (all tabs closed, Web-Lock mismatch). Without this sweep the rows would
 * accumulate without bound in sync_changes.
 *
 * Runs against `sync_changes` directly, not via a soft-delete on the row
 * data — pickup rows are server-write inbox only, never editable by
 * users; a hard DELETE keeps the table tight.
 *
 * @returns The number of rows deleted (0 when nothing was stale).
 */
async function runPickupGc(): Promise<number> {
	const sql = getSyncConnection();
	const cutoff = new Date(Date.now() - PICKUP_TTL_MS).toISOString();
	// The DELETE sits in a CTE so one round-trip both removes the rows and
	// reports how many were removed; count is cast to text, hence the parse.
	const rows = await sql<{ count: string }[]>`
		WITH deleted AS (
			DELETE FROM sync_changes
			WHERE app_id = 'articles'
				AND table_name = 'articleExtractPickup'
				AND created_at < ${cutoff}
			RETURNING 1
		)
		SELECT count(*)::text AS count FROM deleted
	`;
	const n = Number.parseInt(rows[0]?.count ?? '0', 10);
	if (n > 0) {
		// Derive the age from the constant so the message cannot drift from the TTL.
		const ttlHours = PICKUP_TTL_MS / (60 * 60 * 1000);
		console.log(`[articles-import] pickup-gc: removed ${n} rows older than ${ttlHours}h`);
	}
	return n;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -123,6 +180,23 @@ async function tryAcquireLock(): Promise<boolean> {
|
||||||
async function processOneJob(job: ImportJobRow): Promise<number> {
|
async function processOneJob(job: ImportJobRow): Promise<number> {
|
||||||
const items = await listItemsForJob(job.userId, job.id);
|
const items = await listItemsForJob(job.userId, job.id);
|
||||||
|
|
||||||
|
// Crash-recovery sweep — bounce items that have been 'extracting'
|
||||||
|
// for too long back to 'pending'. Without this, a worker that
|
||||||
|
// crashed (or got OOM'd, restarted mid-extract) leaves orphaned
|
||||||
|
// items in 'extracting' forever; the job never completes. Worker
|
||||||
|
// re-attribution happens via the next tick's claim path.
|
||||||
|
const now = Date.now();
|
||||||
|
for (const it of items) {
|
||||||
|
if (it.state !== 'extracting') continue;
|
||||||
|
const since = it.lastAttemptAt ? Date.parse(it.lastAttemptAt) : 0;
|
||||||
|
if (!Number.isFinite(since)) continue;
|
||||||
|
if (now - since < STALE_EXTRACTING_MS) continue;
|
||||||
|
console.warn(
|
||||||
|
`[articles-import] resetting stale extracting item ${it.id} (job=${job.id}) — ${Math.round((now - since) / 1000)}s old`
|
||||||
|
);
|
||||||
|
await writeItemUpdate(it.userId, it.id, { state: 'pending' });
|
||||||
|
}
|
||||||
|
|
||||||
// Flip 'queued' → 'running' so the UI shows progress.
|
// Flip 'queued' → 'running' so the UI shows progress.
|
||||||
if (job.status === 'queued') {
|
if (job.status === 'queued') {
|
||||||
await writeJobUpdate(job.userId, job.id, {
|
await writeJobUpdate(job.userId, job.id, {
|
||||||
|
|
|
||||||
|
|
@ -26,30 +26,10 @@
|
||||||
|
|
||||||
import { Hono } from 'hono';
|
import { Hono } from 'hono';
|
||||||
import { extractFromUrl, extractFromHtml } from '@mana/shared-rss';
|
import { extractFromUrl, extractFromHtml } from '@mana/shared-rss';
|
||||||
|
import { looksLikeConsentWall } from './consent-wall';
|
||||||
|
|
||||||
const routes = new Hono();
|
const routes = new Hono();
|
||||||
|
|
||||||
const CONSENT_KEYWORDS = [
|
|
||||||
'cookies zustimmen',
|
|
||||||
'cookie consent',
|
|
||||||
'zustimmung',
|
|
||||||
'accept all cookies',
|
|
||||||
'consent to the use',
|
|
||||||
'enable javascript',
|
|
||||||
'javascript is disabled',
|
|
||||||
'please enable',
|
|
||||||
'privacy center',
|
|
||||||
'datenschutzeinstellungen',
|
|
||||||
'datenschutzeinstellungen',
|
|
||||||
];
|
|
||||||
const CONSENT_WORDCOUNT_THRESHOLD = 300;
|
|
||||||
|
|
||||||
function looksLikeConsentWall(content: string, wordCount: number): boolean {
|
|
||||||
if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false;
|
|
||||||
const haystack = content.toLowerCase();
|
|
||||||
return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle));
|
|
||||||
}
|
|
||||||
|
|
||||||
function isValidHttpUrl(url: string): boolean {
|
function isValidHttpUrl(url: string): boolean {
|
||||||
try {
|
try {
|
||||||
const u = new URL(url);
|
const u = new URL(url);
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
import type { Sql } from './connection';
|
import type { Sql } from './connection';
|
||||||
import { withUser } from './connection';
|
import { withUser } from './connection';
|
||||||
|
import { fieldMetaTime } from './field-meta';
|
||||||
import type { AiPolicy } from '@mana/shared-ai';
|
import type { AiPolicy } from '@mana/shared-ai';
|
||||||
|
|
||||||
export interface ServerAgent {
|
export interface ServerAgent {
|
||||||
|
|
@ -54,7 +55,9 @@ interface ChangeRow {
|
||||||
record_id: string;
|
record_id: string;
|
||||||
op: string;
|
op: string;
|
||||||
data: Record<string, unknown> | null;
|
data: Record<string, unknown> | null;
|
||||||
field_meta: Record<string, string> | null;
|
/** See `./field-meta.ts` — wire shape is two-tone (legacy ISO string
|
||||||
|
* vs. F3 `{at, actor, origin}` object). */
|
||||||
|
field_meta: Record<string, unknown> | null;
|
||||||
created_at: Date;
|
created_at: Date;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -180,15 +183,20 @@ export function mergeRaw(rows: readonly ChangeRow[]): Record<string, unknown> |
|
||||||
|
|
||||||
for (const row of rows) {
|
for (const row of rows) {
|
||||||
if (row.op === 'delete') return null;
|
if (row.op === 'delete') return null;
|
||||||
|
const rowCreatedAt = row.created_at.toISOString();
|
||||||
if (!record) {
|
if (!record) {
|
||||||
record = row.data ? { id: row.record_id, ...row.data } : { id: row.record_id };
|
record = row.data ? { id: row.record_id, ...row.data } : { id: row.record_id };
|
||||||
fm = { ...(row.field_meta ?? {}) };
|
const initFM = row.field_meta ?? {};
|
||||||
|
fm = {};
|
||||||
|
for (const k of Object.keys(initFM)) {
|
||||||
|
fm[k] = fieldMetaTime(initFM[k]) || rowCreatedAt;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!row.data) continue;
|
if (!row.data) continue;
|
||||||
const rowFM = row.field_meta ?? {};
|
const rowFM = row.field_meta ?? {};
|
||||||
for (const [k, v] of Object.entries(row.data)) {
|
for (const [k, v] of Object.entries(row.data)) {
|
||||||
const serverTime = rowFM[k] ?? row.created_at.toISOString();
|
const serverTime = fieldMetaTime(rowFM[k]) || rowCreatedAt;
|
||||||
const localTime = fm[k] ?? '';
|
const localTime = fm[k] ?? '';
|
||||||
if (serverTime >= localTime) {
|
if (serverTime >= localTime) {
|
||||||
record[k] = v;
|
record[k] = v;
|
||||||
|
|
|
||||||
39
services/mana-ai/src/db/field-meta.ts
Normal file
39
services/mana-ai/src/db/field-meta.ts
Normal file
|
|
@ -0,0 +1,39 @@
|
||||||
|
/**
|
||||||
|
* Wire-shape adapter for `sync_changes.field_meta`.
|
||||||
|
*
|
||||||
|
* Two shapes coexist on the wire today:
|
||||||
|
*
|
||||||
|
* - Legacy plaintext writes: { state: 'ISO-8601' }
|
||||||
|
* - Field-meta-overhaul (F3): { state: { at, actor, origin } }
|
||||||
|
*
|
||||||
|
* Every projection / snapshot-refresh in this service performs LWW
|
||||||
|
* merges by string-comparing the per-field timestamp. A naive
|
||||||
|
* `rowFM[k] >= localTime` works for the all-legacy case but silently
|
||||||
|
* collapses the moment one side is an F3 object — the comparison
|
||||||
|
* becomes `'ISO-…' >= '[object Object]'` (false), the older value
|
||||||
|
* wins and the projection lies.
|
||||||
|
*
|
||||||
|
* This single helper folds both shapes into a comparable ISO string.
|
||||||
|
* Any consumer that reads `field_meta` for LWW MUST go through it.
|
||||||
|
*
|
||||||
|
* Same helper exists in `apps/api/src/modules/articles/field-meta.ts`
|
||||||
|
* (kept duplicated for now — both services treat sync_changes as a
|
||||||
|
* read-only event log; sharing infrastructure code across services
|
||||||
|
* is out of scope here).
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
 * Extracts the comparable timestamp string from a single `field_meta[k]`
 * slot, whichever wire shape it carries:
 *
 *   - legacy plain string           → returned as-is (not validated)
 *   - F3 `{ at, actor, origin }`    → its `at` value, when that is a string
 *
 * @param meta Raw slot value read from `sync_changes.field_meta`.
 * @returns The timestamp string, or the empty string when the slot is
 *   neither a string nor an object with a string `at`. The empty string
 *   makes the LWW comparison treat the field as never-stamped (callers
 *   fall back to row.created_at).
 */
export function fieldMetaTime(meta: unknown): string {
	if (typeof meta === 'string') return meta;
	// `typeof null === 'object'`, so null has to be excluded explicitly.
	if (typeof meta === 'object' && meta !== null) {
		const { at } = meta as { at?: unknown };
		if (typeof at === 'string') return at;
	}
	return '';
}
|
||||||
|
|
@ -12,6 +12,7 @@
|
||||||
|
|
||||||
import type { MissionGrant } from '@mana/shared-ai';
|
import type { MissionGrant } from '@mana/shared-ai';
|
||||||
import type { Sql } from './connection';
|
import type { Sql } from './connection';
|
||||||
|
import { fieldMetaTime } from './field-meta';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subset of the Mission shape the server needs. Matches
|
* Subset of the Mission shape the server needs. Matches
|
||||||
|
|
@ -44,10 +45,18 @@ interface ChangeRow {
|
||||||
user_id: string;
|
user_id: string;
|
||||||
op: string;
|
op: string;
|
||||||
data: Record<string, unknown> | null;
|
data: Record<string, unknown> | null;
|
||||||
field_meta: Record<string, string> | null;
|
/**
|
||||||
|
* Two-shaped on the wire:
|
||||||
|
* - Legacy plaintext writes: { state: 'ISO-8601' }
|
||||||
|
* - F3 field-meta-overhaul: { state: { at, actor, origin } }
|
||||||
|
* The merge uses `fieldMetaTime` to fold both into a comparable string.
|
||||||
|
*/
|
||||||
|
field_meta: Record<string, unknown> | null;
|
||||||
created_at: Date;
|
created_at: Date;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fieldMetaTime imported from ./field-meta — see comment in that file.
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return all currently-active missions whose `nextRunAt` has passed.
|
* Return all currently-active missions whose `nextRunAt` has passed.
|
||||||
*
|
*
|
||||||
|
|
@ -120,8 +129,9 @@ export function mergeAndFilter(
|
||||||
const prevFM = (existing.__fieldMeta as Record<string, string> | undefined) ?? {};
|
const prevFM = (existing.__fieldMeta as Record<string, string> | undefined) ?? {};
|
||||||
const nextFM = { ...prevFM };
|
const nextFM = { ...prevFM };
|
||||||
if (row.data) {
|
if (row.data) {
|
||||||
|
const rowCreatedAt = row.created_at.toISOString();
|
||||||
for (const [k, v] of Object.entries(row.data)) {
|
for (const [k, v] of Object.entries(row.data)) {
|
||||||
const serverTime = row.field_meta?.[k] ?? row.created_at.toISOString();
|
const serverTime = fieldMetaTime(row.field_meta?.[k]) || rowCreatedAt;
|
||||||
const localTime = prevFM[k] ?? '';
|
const localTime = prevFM[k] ?? '';
|
||||||
if (serverTime >= localTime) {
|
if (serverTime >= localTime) {
|
||||||
existing[k] = v;
|
existing[k] = v;
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
import type { Sql } from './connection';
|
import type { Sql } from './connection';
|
||||||
import { withUser } from './connection';
|
import { withUser } from './connection';
|
||||||
|
import { fieldMetaTime } from './field-meta';
|
||||||
|
|
||||||
interface SnapshotRow {
|
interface SnapshotRow {
|
||||||
user_id: string;
|
user_id: string;
|
||||||
|
|
@ -29,7 +30,9 @@ interface ChangeRow {
|
||||||
record_id: string;
|
record_id: string;
|
||||||
op: string;
|
op: string;
|
||||||
data: Record<string, unknown> | null;
|
data: Record<string, unknown> | null;
|
||||||
field_meta: Record<string, string> | null;
|
/** See `./field-meta.ts` — wire shape is two-tone (legacy ISO string
|
||||||
|
* vs. F3 `{at, actor, origin}` object). */
|
||||||
|
field_meta: Record<string, unknown> | null;
|
||||||
created_at: Date;
|
created_at: Date;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -170,15 +173,20 @@ function mergeRaw(rows: readonly ChangeRow[]): Record<string, unknown> | null {
|
||||||
|
|
||||||
for (const row of rows) {
|
for (const row of rows) {
|
||||||
if (row.op === 'delete') return null;
|
if (row.op === 'delete') return null;
|
||||||
|
const rowCreatedAt = row.created_at.toISOString();
|
||||||
if (!record) {
|
if (!record) {
|
||||||
record = row.data ? { id: row.record_id, ...row.data } : { id: row.record_id };
|
record = row.data ? { id: row.record_id, ...row.data } : { id: row.record_id };
|
||||||
fm = { ...(row.field_meta ?? {}) };
|
const initFM = row.field_meta ?? {};
|
||||||
|
fm = {};
|
||||||
|
for (const k of Object.keys(initFM)) {
|
||||||
|
fm[k] = fieldMetaTime(initFM[k]) || rowCreatedAt;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!row.data) continue;
|
if (!row.data) continue;
|
||||||
const rowFM = row.field_meta ?? {};
|
const rowFM = row.field_meta ?? {};
|
||||||
for (const [k, v] of Object.entries(row.data)) {
|
for (const [k, v] of Object.entries(row.data)) {
|
||||||
const serverTime = rowFM[k] ?? row.created_at.toISOString();
|
const serverTime = fieldMetaTime(rowFM[k]) || rowCreatedAt;
|
||||||
const localTime = fm[k] ?? '';
|
const localTime = fm[k] ?? '';
|
||||||
if (serverTime >= localTime) {
|
if (serverTime >= localTime) {
|
||||||
record[k] = v;
|
record[k] = v;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue