diff --git a/apps/api/src/modules/articles/consent-wall.ts b/apps/api/src/modules/articles/consent-wall.ts new file mode 100644 index 000000000..b74cee52f --- /dev/null +++ b/apps/api/src/modules/articles/consent-wall.ts @@ -0,0 +1,37 @@ +/** + * Consent-wall heuristic shared by every server-side article-extract + * path: + * - `/api/v1/articles/extract` and `/extract/html` (single-URL) + * - The bulk-import worker's `extractOneItem` (background) + * + * When the extracted text is suspiciously short AND contains GDPR / + * cookie-consent vocabulary, the server's anonymous fetch most likely + * hit a consent dialog instead of the article itself. The caller can + * use the flag to nudge the user toward the browser-HTML bookmarklet + * (which fetches with the user's existing session cookies) rather + * than silently persisting the GDPR overlay text as the article body. + */ + +const CONSENT_KEYWORDS = [ + 'cookies zustimmen', + 'cookie consent', + 'zustimmung', + 'accept all cookies', + 'consent to the use', + 'enable javascript', + 'javascript is disabled', + 'please enable', + 'privacy center', + 'datenschutz­einstellungen', + 'datenschutzeinstellungen', +]; + +/** Wordcount floor below which the heuristic is considered. Real + * articles are typically >300 words; consent dialogs are <50. */ +const CONSENT_WORDCOUNT_THRESHOLD = 300; + +export function looksLikeConsentWall(content: string, wordCount: number): boolean { + if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false; + const haystack = content.toLowerCase(); + return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle)); +} diff --git a/apps/api/src/modules/articles/field-meta.ts b/apps/api/src/modules/articles/field-meta.ts new file mode 100644 index 000000000..8766cc9c5 --- /dev/null +++ b/apps/api/src/modules/articles/field-meta.ts @@ -0,0 +1,32 @@ +/** + * Wire-shape adapter for `sync_changes.field_meta`. + * + * Two shapes coexist on the wire today: + * + * - Legacy plaintext writes: { state: 'ISO-8601' } + * - Field-meta-overhaul (F3): { state: { at, actor, origin } } + * + * Any LWW projection that string-compares per-field timestamps MUST + * fold both into a comparable form, otherwise the moment one side is + * an F3 object the comparison becomes `'[object Object]' >= 'ISO…'` + * (false), the older value wins and the projection lies. + * + * Sister helper at `services/mana-ai/src/db/field-meta.ts` — same + * logic, deliberately duplicated. Both services treat sync_changes as + * a read-only event log; sharing infrastructure code across services + * (apps/api ↔ services/mana-ai) is out of scope. + */ + +/** Returns the ISO-string timestamp of a single `field_meta[k]` slot, + * regardless of whether the wire format is the legacy plain string + * or the F3 `{ at, actor, origin }` object. Returns the empty string + * when no usable value is present so the LWW comparison treats the + * field as never-stamped (callers fall back to row.created_at). */ +export function fieldMetaTime(meta: unknown): string { + if (typeof meta === 'string') return meta; + if (meta && typeof meta === 'object') { + const at = (meta as { at?: unknown }).at; + if (typeof at === 'string') return at; + } + return ''; +} diff --git a/apps/api/src/modules/articles/import-extractor.ts b/apps/api/src/modules/articles/import-extractor.ts index 01692c2ab..4e21f6cb5 100644 --- a/apps/api/src/modules/articles/import-extractor.ts +++ b/apps/api/src/modules/articles/import-extractor.ts @@ -27,6 +27,7 @@ import { extractFromUrl } from '@mana/shared-rss'; import { makeFieldMeta, type Actor, type FieldOrigin } from '@mana/shared-ai'; import { getSyncConnection } from '../../mcp/sync-db'; +import { looksLikeConsentWall } from './consent-wall'; import type { ImportItemRow } from './import-projection'; const MAX_ATTEMPTS = 3; @@ -213,25 +214,4 @@ async function insertSyncChange(params: InsertParams): Promise { }); } -// ─── Consent-wall heuristic (mirror of routes.ts) ──────────── - -const CONSENT_KEYWORDS = [ - 'cookies zustimmen', - 'cookie consent', - 'zustimmung', - 'accept all cookies', - 'consent to the use', - 'enable javascript', - 'javascript is disabled', - 'please enable', - 'privacy center', - 'datenschutz­einstellungen', - 'datenschutzeinstellungen', -]; -const CONSENT_WORDCOUNT_THRESHOLD = 300; - -function looksLikeConsentWall(content: string, wordCount: number): boolean { - if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false; - const haystack = content.toLowerCase(); - return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle)); -} +// looksLikeConsentWall lives in ./consent-wall.ts — shared with routes.ts. diff --git a/apps/api/src/modules/articles/import-projection.ts b/apps/api/src/modules/articles/import-projection.ts index ea7bf7f3e..5233818c1 100644 --- a/apps/api/src/modules/articles/import-projection.ts +++ b/apps/api/src/modules/articles/import-projection.ts @@ -19,34 +19,20 @@ */ import { getSyncConnection } from '../../mcp/sync-db'; +import { fieldMetaTime } from './field-meta'; type Row = Record; -/** - * `field_meta` is one of two shapes on the wire: - * - Legacy plaintext writes: `{[fieldName]: ISOString}` - * - Field-meta-overhaul writes: `{[fieldName]: {at, actor, origin}}` - * `fieldMetaTime()` below normalises both into the comparable ISO string. - */ interface ChangeRow { user_id: string; record_id: string; op: string; data: Row | null; + /** See `./field-meta.ts` — wire shape is two-tone (legacy ISO string + * vs. F3 `{at, actor, origin}` object). */ field_meta: Record | null; created_at: Date; } -/** Pull the timestamp out of either shape. Falls back to empty string - * so the LWW comparison never throws on undefined. */ -function fieldMetaTime(meta: unknown): string { - if (typeof meta === 'string') return meta; - if (meta && typeof meta === 'object') { - const at = (meta as { at?: unknown }).at; - if (typeof at === 'string') return at; - } - return ''; -} - export interface ImportJobRow { id: string; userId: string; diff --git a/apps/api/src/modules/articles/import-worker.ts b/apps/api/src/modules/articles/import-worker.ts index b5ed4e76e..91bed4695 100644 --- a/apps/api/src/modules/articles/import-worker.ts +++ b/apps/api/src/modules/articles/import-worker.ts @@ -31,8 +31,30 @@ import { } from './import-projection'; import { extractOneItem, writeItemUpdate, writeJobUpdate } from './import-extractor'; +/** Counts ticks so the pickup-GC sweep can run every Nth one rather + * than on every 2-second cycle (the DELETE is cheap but not free). */ +let tickCount = 0; +/** Run pickup-GC every 30 ticks ≈ once per minute. */ +const PICKUP_GC_EVERY_N_TICKS = 30; + const TICK_INTERVAL_MS = 2_000; const PER_JOB_CONCURRENCY = 3; +/** + * If an item has been in `state='extracting'` longer than this without + * a follow-up state-write, it's orphaned (worker crashed mid-fetch, + * pod restart, OOM, …) and gets bounced back to `pending` so the next + * tick can re-claim it. + * + * Tuned so a slow but live extraction (15 s shared-rss fetch timeout + + * a few seconds of JSDOM parse on a 2 MB page) doesn't reset + * prematurely — 5 minutes is comfortable headroom. + */ +const STALE_EXTRACTING_MS = 5 * 60 * 1000; +/** TTL for `articleExtractPickup` rows. The pickup-consumer normally + * deletes them seconds after the worker writes them; anything older + * than this is garbage from a stuck consumer (all tabs closed, + * Web-Lock mismatch, …) and would otherwise accumulate without bound. */ +const PICKUP_TTL_MS = 24 * 60 * 60 * 1000; /** Fixed int8 lock key — derived from the ASCII bytes of 'ARTI'. */ const ADVISORY_LOCK_KEY = 0x4152_5449; @@ -88,16 +110,51 @@ export async function runTickOnce(): Promise<{ skipped: boolean; jobsConsidered: number; itemsProcessed: number; + pickupGcRows?: number; }> { if (!(await tryAcquireLock())) { return { skipped: true, jobsConsidered: 0, itemsProcessed: 0 }; } + tickCount++; + let pickupGcRows: number | undefined; + if (tickCount % PICKUP_GC_EVERY_N_TICKS === 0) { + pickupGcRows = await runPickupGc(); + } const jobs = await listClaimableJobs(); let itemsProcessed = 0; for (const job of jobs) { itemsProcessed += await processOneJob(job); } - return { skipped: false, jobsConsidered: jobs.length, itemsProcessed }; + return { skipped: false, jobsConsidered: jobs.length, itemsProcessed, pickupGcRows }; +} + +/** + * Hard-delete pickup rows older than `PICKUP_TTL_MS`. The + * pickup-consumer on a healthy client removes each row seconds after + * the worker writes it; anything older is residue from a stuck + * consumer (all tabs closed, Web-Lock mismatch). Without this sweep + * the rows would accumulate without bound in sync_changes. + * + * Runs against `sync_changes` directly, not via a soft-delete on the + * row data — pickup rows are server-write inbox only, never editable + * by users; a hard DELETE keeps the table tight. + */ +async function runPickupGc(): Promise { + const sql = getSyncConnection(); + const cutoff = new Date(Date.now() - PICKUP_TTL_MS).toISOString(); + const rows = await sql<{ count: string }[]>` + WITH deleted AS ( + DELETE FROM sync_changes + WHERE app_id = 'articles' + AND table_name = 'articleExtractPickup' + AND created_at < ${cutoff} + RETURNING 1 + ) + SELECT count(*)::text AS count FROM deleted + `; + const n = parseInt(rows[0]?.count ?? '0', 10); + if (n > 0) console.log(`[articles-import] pickup-gc: removed ${n} rows older than 24h`); + return n; } /** @@ -123,6 +180,23 @@ async function tryAcquireLock(): Promise { async function processOneJob(job: ImportJobRow): Promise { const items = await listItemsForJob(job.userId, job.id); + // Crash-recovery sweep — bounce items that have been 'extracting' + // for too long back to 'pending'. Without this, a worker that + // crashed (or got OOM'd, restarted mid-extract) leaves orphaned + // items in 'extracting' forever; the job never completes. Worker + // re-attribution happens via the next tick's claim path. + const now = Date.now(); + for (const it of items) { + if (it.state !== 'extracting') continue; + const since = it.lastAttemptAt ? Date.parse(it.lastAttemptAt) : 0; + if (!Number.isFinite(since)) continue; + if (now - since < STALE_EXTRACTING_MS) continue; + console.warn( + `[articles-import] resetting stale extracting item ${it.id} (job=${job.id}) — ${Math.round((now - since) / 1000)}s old` + ); + await writeItemUpdate(it.userId, it.id, { state: 'pending' }); + } + // Flip 'queued' → 'running' so the UI shows progress. if (job.status === 'queued') { await writeJobUpdate(job.userId, job.id, { diff --git a/apps/api/src/modules/articles/routes.ts b/apps/api/src/modules/articles/routes.ts index 8772a2f39..060d5e5fa 100644 --- a/apps/api/src/modules/articles/routes.ts +++ b/apps/api/src/modules/articles/routes.ts @@ -26,30 +26,10 @@ import { Hono } from 'hono'; import { extractFromUrl, extractFromHtml } from '@mana/shared-rss'; +import { looksLikeConsentWall } from './consent-wall'; const routes = new Hono(); -const CONSENT_KEYWORDS = [ - 'cookies zustimmen', - 'cookie consent', - 'zustimmung', - 'accept all cookies', - 'consent to the use', - 'enable javascript', - 'javascript is disabled', - 'please enable', - 'privacy center', - 'datenschutz­einstellungen', - 'datenschutzeinstellungen', -]; -const CONSENT_WORDCOUNT_THRESHOLD = 300; - -function looksLikeConsentWall(content: string, wordCount: number): boolean { - if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false; - const haystack = content.toLowerCase(); - return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle)); -} - function isValidHttpUrl(url: string): boolean { try { const u = new URL(url); diff --git a/services/mana-ai/src/db/agents-projection.ts b/services/mana-ai/src/db/agents-projection.ts index 568a5ec22..2b783f17d 100644 --- a/services/mana-ai/src/db/agents-projection.ts +++ b/services/mana-ai/src/db/agents-projection.ts @@ -19,6 +19,7 @@ import type { Sql } from './connection'; import { withUser } from './connection'; +import { fieldMetaTime } from './field-meta'; import type { AiPolicy } from '@mana/shared-ai'; export interface ServerAgent { @@ -54,7 +55,9 @@ interface ChangeRow { record_id: string; op: string; data: Record | null; - field_meta: Record | null; + /** See `./field-meta.ts` — wire shape is two-tone (legacy ISO string + * vs. F3 `{at, actor, origin}` object). */ + field_meta: Record | null; created_at: Date; } @@ -180,15 +183,20 @@ export function mergeRaw(rows: readonly ChangeRow[]): Record | for (const row of rows) { if (row.op === 'delete') return null; + const rowCreatedAt = row.created_at.toISOString(); if (!record) { record = row.data ? { id: row.record_id, ...row.data } : { id: row.record_id }; - fm = { ...(row.field_meta ?? {}) }; + const initFM = row.field_meta ?? {}; + fm = {}; + for (const k of Object.keys(initFM)) { + fm[k] = fieldMetaTime(initFM[k]) || rowCreatedAt; + } continue; } if (!row.data) continue; const rowFM = row.field_meta ?? {}; for (const [k, v] of Object.entries(row.data)) { - const serverTime = rowFM[k] ?? row.created_at.toISOString(); + const serverTime = fieldMetaTime(rowFM[k]) || rowCreatedAt; const localTime = fm[k] ?? ''; if (serverTime >= localTime) { record[k] = v; diff --git a/services/mana-ai/src/db/field-meta.ts b/services/mana-ai/src/db/field-meta.ts new file mode 100644 index 000000000..21c4e05bd --- /dev/null +++ b/services/mana-ai/src/db/field-meta.ts @@ -0,0 +1,39 @@ +/** + * Wire-shape adapter for `sync_changes.field_meta`. + * + * Two shapes coexist on the wire today: + * + * - Legacy plaintext writes: { state: 'ISO-8601' } + * - Field-meta-overhaul (F3): { state: { at, actor, origin } } + * + * Every projection / snapshot-refresh in this service performs LWW + * merges by string-comparing the per-field timestamp. A naive + * `rowFM[k] >= localTime` works for the all-legacy case but silently + * collapses the moment one side is an F3 object — the comparison + * becomes `'[object Object]' >= 'ISO-…'` (false), the older value + * wins and the projection lies. + * + * This single helper folds both shapes into a comparable ISO string. + * Any consumer that reads `field_meta` for LWW MUST go through it. + * + * Same helper exists in `apps/api/src/modules/articles/import-projection.ts` + * (kept duplicated for now — both services treat sync_changes as a + * read-only event log; sharing infrastructure code across services + * is out of scope here). + */ + +/** + * Returns the ISO-string timestamp of a single `field_meta[k]` slot, + * regardless of whether the wire format is the legacy plain string + * or the F3 `{ at, actor, origin }` object. Returns the empty string + * when no usable value is present so the LWW comparison treats the + * field as never-stamped (callers fall back to row.created_at). + */ +export function fieldMetaTime(meta: unknown): string { + if (typeof meta === 'string') return meta; + if (meta && typeof meta === 'object') { + const at = (meta as { at?: unknown }).at; + if (typeof at === 'string') return at; + } + return ''; +} diff --git a/services/mana-ai/src/db/missions-projection.ts b/services/mana-ai/src/db/missions-projection.ts index 76e2ae47b..236c81ae4 100644 --- a/services/mana-ai/src/db/missions-projection.ts +++ b/services/mana-ai/src/db/missions-projection.ts @@ -12,6 +12,7 @@ import type { MissionGrant } from '@mana/shared-ai'; import type { Sql } from './connection'; +import { fieldMetaTime } from './field-meta'; /** * Subset of the Mission shape the server needs. Matches @@ -44,10 +45,18 @@ interface ChangeRow { user_id: string; op: string; data: Record | null; - field_meta: Record | null; + /** + * Two-shaped on the wire: + * - Legacy plaintext writes: { state: 'ISO-8601' } + * - F3 field-meta-overhaul: { state: { at, actor, origin } } + * The merge uses `fieldMetaTime` to fold both into a comparable string. + */ + field_meta: Record | null; created_at: Date; } +// fieldMetaTime imported from ./field-meta — see comment in that file. + /** * Return all currently-active missions whose `nextRunAt` has passed. * @@ -120,8 +129,9 @@ export function mergeAndFilter( const prevFM = (existing.__fieldMeta as Record | undefined) ?? {}; const nextFM = { ...prevFM }; if (row.data) { + const rowCreatedAt = row.created_at.toISOString(); for (const [k, v] of Object.entries(row.data)) { - const serverTime = row.field_meta?.[k] ?? row.created_at.toISOString(); + const serverTime = fieldMetaTime(row.field_meta?.[k]) || rowCreatedAt; const localTime = prevFM[k] ?? ''; if (serverTime >= localTime) { existing[k] = v; diff --git a/services/mana-ai/src/db/snapshot-refresh.ts b/services/mana-ai/src/db/snapshot-refresh.ts index eee4198a5..c562367a2 100644 --- a/services/mana-ai/src/db/snapshot-refresh.ts +++ b/services/mana-ai/src/db/snapshot-refresh.ts @@ -16,6 +16,7 @@ import type { Sql } from './connection'; import { withUser } from './connection'; +import { fieldMetaTime } from './field-meta'; interface SnapshotRow { user_id: string; @@ -29,7 +30,9 @@ interface ChangeRow { record_id: string; op: string; data: Record | null; - field_meta: Record | null; + /** See `./field-meta.ts` — wire shape is two-tone (legacy ISO string + * vs. F3 `{at, actor, origin}` object). */ + field_meta: Record | null; created_at: Date; } @@ -170,15 +173,20 @@ function mergeRaw(rows: readonly ChangeRow[]): Record | null { for (const row of rows) { if (row.op === 'delete') return null; + const rowCreatedAt = row.created_at.toISOString(); if (!record) { record = row.data ? { id: row.record_id, ...row.data } : { id: row.record_id }; - fm = { ...(row.field_meta ?? {}) }; + const initFM = row.field_meta ?? {}; + fm = {}; + for (const k of Object.keys(initFM)) { + fm[k] = fieldMetaTime(initFM[k]) || rowCreatedAt; + } continue; } if (!row.data) continue; const rowFM = row.field_meta ?? {}; for (const [k, v] of Object.entries(row.data)) { - const serverTime = rowFM[k] ?? row.created_at.toISOString(); + const serverTime = fieldMetaTime(rowFM[k]) || rowCreatedAt; const localTime = fm[k] ?? ''; if (serverTime >= localTime) { record[k] = v;