diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index b5ecdcc7c..8aea814f7 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -41,6 +41,7 @@ import { moodlitRoutes } from './modules/moodlit/routes'; import { newsRoutes } from './modules/news/routes'; import { newsResearchRoutes } from './modules/news-research/routes'; import { articlesRoutes } from './modules/articles/routes'; +import { startArticleImportWorker } from './modules/articles/import-worker'; import { tracesRoutes } from './modules/traces/routes'; import { writingRoutes } from './modules/writing/routes'; import { comicRoutes } from './modules/comic/routes'; @@ -142,6 +143,12 @@ app.route('/api/v1/who', whoRoutes); app.route('/api/v1/writing', writingRoutes); app.route('/api/v1/comic', comicRoutes); +// ─── Background Workers ───────────────────────────────────── +// Articles bulk-import: ticks every 2s, advisory-lock-gated so multiple +// apps/api replicas never double-process. See +// docs/plans/articles-bulk-import.md. +startArticleImportWorker(); + // ─── Server Info ──────────────────────────────────────────── console.log(`mana-api starting on port ${PORT}...`); diff --git a/apps/api/src/modules/articles/import-extractor.ts b/apps/api/src/modules/articles/import-extractor.ts new file mode 100644 index 000000000..01692c2ab --- /dev/null +++ b/apps/api/src/modules/articles/import-extractor.ts @@ -0,0 +1,237 @@ +/** + * Articles Bulk-Import — per-item extraction + write-back. + * + * For one `articleImportItems` row in state='pending': + * + * 1. Flip to state='extracting' (so other ticks / the UI see progress). + * 2. Run `extractFromUrl` against the URL. + * 3a. On success → write a `articleExtractPickup` row carrying the + * full ExtractedArticle payload + flip the item to 'extracted'. + * The client-side pickup-consumer picks it up, encrypts the + * article into the user's IndexedDB, and flips the item to 'saved' + * (or 'consent-wall' if the warning fired). + * 3b. On failure → bump `attempts`, flip back to 'pending' if + * attempts < 3, else flip to state='error' with the technical + * error message. + * + * Every state-change is one `sync_changes` row attributed to the + * `system:articles-import-worker` actor (built inline below — kept out + * of the shared-ai SystemSource union for now to keep the worker self- + * contained; can be hoisted later). Origin is `'system'` so the + * conflict-detection gate on the client doesn't surface these as + * user-visible toasts. + * + * Plan: docs/plans/articles-bulk-import.md. + */ + +import { extractFromUrl } from '@mana/shared-rss'; +import { makeFieldMeta, type Actor, type FieldOrigin } from '@mana/shared-ai'; +import { getSyncConnection } from '../../mcp/sync-db'; +import type { ImportItemRow } from './import-projection'; + +const MAX_ATTEMPTS = 3; +const CLIENT_ID = 'articles-import-worker'; + +/** System-actor blob stamped on every worker write. Built inline because + * the underlying SystemSource union in @mana/shared-ai isn't extended + * here — both fields are runtime values, not type discriminators, so + * this composes cleanly without a shared-ai change. */ +const WORKER_ACTOR: Actor = Object.freeze({ + kind: 'system' as const, + principalId: 'system:articles-import-worker', + displayName: 'Artikel-Import', +}); +const WORKER_ORIGIN: FieldOrigin = 'system'; + +export interface ExtractStats { + itemId: string; + terminal: 'pending' | 'extracted' | 'error'; +} + +/** + * Run one extraction round-trip for a single item. 
Idempotent at the + * sync_changes level — if two ticks race the same item the field-LWW + * merge yields a single coherent state on the client. + */ +export async function extractOneItem(item: ImportItemRow): Promise { + if (item.state !== 'pending') { + return { + itemId: item.id, + terminal: item.state === 'error' ? 'error' : 'extracted', + }; + } + + // Step 1 — claim. Flip the item to 'extracting' before the slow + // fetch so concurrent ticks (and the UI) see we own it. + const nowClaim = new Date().toISOString(); + await writeItemUpdate(item.userId, item.id, { + state: 'extracting', + lastAttemptAt: nowClaim, + attempts: item.attempts + 1, + }); + + // Step 2 — fetch + parse. Hard-failure path returns null; we treat + // that as a single failed attempt and recycle. + const extracted = await extractFromUrl(item.url); + const nowDone = new Date().toISOString(); + + if (!extracted) { + const nextAttempts = item.attempts + 1; + const nextState = nextAttempts >= MAX_ATTEMPTS ? 'error' : 'pending'; + await writeItemUpdate(item.userId, item.id, { + state: nextState, + error: nextState === 'error' ? 'Extraktion fehlgeschlagen nach mehreren Versuchen.' : null, + lastAttemptAt: nowDone, + }); + return { itemId: item.id, terminal: nextState === 'error' ? 'error' : 'pending' }; + } + + // Step 3 — write the Pickup row (server payload for the client) and + // flip item state to 'extracted' so the consume-pickup path picks it + // up. Pickup row first so a client liveQuery seeing the 'extracted' + // state can immediately find the matching pickup payload. + const pickupId = `pickup-${item.id}`; + const wordCount = extracted.wordCount ?? 0; + const readingTimeMinutes = extracted.readingTimeMinutes ?? 0; + const warning = looksLikeConsentWall(extracted.content, wordCount) + ? 'probable_consent_wall' + : null; + + await writePickupInsert(item.userId, pickupId, { + itemId: item.id, + spaceId: item.spaceId ?? null, + payload: { + originalUrl: item.url, + title: extracted.title ?? '', + excerpt: extracted.excerpt ?? null, + content: extracted.content, + htmlContent: extracted.htmlContent ?? '', + author: extracted.byline ?? null, + siteName: extracted.siteName ?? null, + wordCount, + readingTimeMinutes, + ...(warning && { warning }), + }, + }); + + await writeItemUpdate(item.userId, item.id, { + state: 'extracted', + warning, + error: null, + lastAttemptAt: nowDone, + }); + + return { itemId: item.id, terminal: 'extracted' }; +} + +// ─── Sync-changes write helpers (worker-attributed) ────────── + +/** + * Worker-attributed update on an `articleImportItems` row. Exported so + * the worker tick can flip pending items to 'cancelled' when the parent + * job is cancelled, without going through the extraction pipeline. + */ +export async function writeItemUpdate( + userId: string, + itemId: string, + patch: Record +): Promise { + await insertSyncChange({ + userId, + recordId: itemId, + appId: 'articles', + tableName: 'articleImportItems', + op: 'update', + data: patch, + }); +} + +async function writePickupInsert( + userId: string, + pickupId: string, + data: Record +): Promise { + await insertSyncChange({ + userId, + recordId: pickupId, + appId: 'articles', + tableName: 'articleExtractPickup', + op: 'insert', + data, + }); +} + +/** + * Worker-attributed update on an `articleImportJobs` row. Counter-only + * for now (savedCount, errorCount, …) plus status flips like + * 'queued' → 'running' and 'running' → 'done'. 
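+ *
+ * A minimal usage sketch (hedged — it mirrors the call sites in
+ * import-worker.ts; the field values shown are illustrative):
+ *
+ * @example
+ * await writeJobUpdate(job.userId, job.id, {
+ *   status: 'running',
+ *   startedAt: new Date().toISOString(),
+ * });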
+ */ +export async function writeJobUpdate( + userId: string, + jobId: string, + patch: Record +): Promise { + await insertSyncChange({ + userId, + recordId: jobId, + appId: 'articles', + tableName: 'articleImportJobs', + op: 'update', + data: patch, + }); +} + +interface InsertParams { + userId: string; + recordId: string; + appId: string; + tableName: string; + op: 'insert' | 'update' | 'delete'; + data: Record; +} + +async function insertSyncChange(params: InsertParams): Promise { + const sql = getSyncConnection(); + const now = new Date().toISOString(); + const fieldMeta: Record = {}; + for (const key of Object.keys(params.data)) { + fieldMeta[key] = makeFieldMeta(now, WORKER_ACTOR, WORKER_ORIGIN); + } + const actorJson = WORKER_ACTOR as unknown; + const dataJson = params.data as unknown; + const fmJson = fieldMeta as unknown; + await sql.begin(async (tx) => { + await tx`SELECT set_config('app.current_user_id', ${params.userId}, true)`; + await tx` + INSERT INTO sync_changes + (app_id, table_name, record_id, user_id, op, data, field_meta, client_id, schema_version, actor, origin) + VALUES + (${params.appId}, ${params.tableName}, ${params.recordId}, ${params.userId}, ${params.op}, + ${tx.json(dataJson as never)}, ${tx.json(fmJson as never)}, + ${CLIENT_ID}, 1, ${tx.json(actorJson as never)}, ${WORKER_ORIGIN}) + `; + }); +} + +// ─── Consent-wall heuristic (mirror of routes.ts) ──────────── + +const CONSENT_KEYWORDS = [ + 'cookies zustimmen', + 'cookie consent', + 'zustimmung', + 'accept all cookies', + 'consent to the use', + 'enable javascript', + 'javascript is disabled', + 'please enable', + 'privacy center', + 'datenschutz­einstellungen', + 'datenschutzeinstellungen', +]; +const CONSENT_WORDCOUNT_THRESHOLD = 300; + +function looksLikeConsentWall(content: string, wordCount: number): boolean { + if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false; + const haystack = content.toLowerCase(); + return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle)); +} diff --git a/apps/api/src/modules/articles/import-projection.ts b/apps/api/src/modules/articles/import-projection.ts new file mode 100644 index 000000000..270d64584 --- /dev/null +++ b/apps/api/src/modules/articles/import-projection.ts @@ -0,0 +1,243 @@ +/** + * Articles Bulk-Import — sync_changes → live record projection. + * + * Mirror of `services/mana-ai/src/db/missions-projection.ts` and + * `apps/api/src/mcp/sync-db.ts:readLatestRecords()`, specialised for the + * two tables the import-worker tick reads each cycle: + * + * articleImportJobs — to find running jobs whose lease is free + * articleImportItems — to find pending items inside those jobs + * + * No materialized snapshots yet — this is the simple "replay every row + * for these tables" path. The total volume is small (a few hundred rows + * per active job, all import history per user) and the worker tick is + * the only consumer. If the table grows we can plug in the same + * `mission_snapshots` pattern mana-ai uses; the projection API stays + * the same. + * + * Plan: docs/plans/articles-bulk-import.md. 
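+ *
+ * Read-path sketch (hedged — this mirrors how the worker tick in
+ * import-worker.ts consumes the two projections; no new API implied):
+ *
+ *   const jobs = await listClaimableJobs();
+ *   for (const job of jobs) {
+ *     const items = await listItemsForJob(job.userId, job.id);
+ *     // … claim pending items, fold terminal states into job counters …
+ *   }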
+ */ + +import { getSyncConnection } from '../../mcp/sync-db'; + +type Row = Record; +interface ChangeRow { + user_id: string; + record_id: string; + op: string; + data: Row | null; + field_meta: Record | null; + created_at: Date; +} + +export interface ImportJobRow { + id: string; + userId: string; + spaceId: string | null; + totalUrls: number; + status: 'queued' | 'running' | 'paused' | 'done' | 'cancelled'; + leasedBy: string | null; + leasedUntil: string | null; + startedAt: string | null; + finishedAt: string | null; + savedCount: number; + duplicateCount: number; + errorCount: number; + warningCount: number; +} + +export type ImportItemState = + | 'pending' + | 'extracting' + | 'extracted' + | 'saved' + | 'duplicate' + | 'consent-wall' + | 'error' + | 'cancelled'; + +export interface ImportItemRow { + id: string; + userId: string; + spaceId: string | null; + jobId: string; + idx: number; + url: string; + state: ImportItemState; + articleId: string | null; + warning: 'probable_consent_wall' | null; + error: string | null; + attempts: number; + lastAttemptAt: string | null; +} + +/** + * Cross-user scan: which jobs need attention this tick. RLS is + * intentionally NOT applied — the worker is a privileged consumer that + * needs to see all users' running jobs in one pass. Per-user RLS + * scoping is applied on the write-back path in import-extractor.ts. + */ +export async function listClaimableJobs(): Promise { + const sql = getSyncConnection(); + const rows = await sql` + SELECT user_id, record_id, op, data, field_meta, created_at + FROM sync_changes + WHERE app_id = 'articles' AND table_name = 'articleImportJobs' + ORDER BY user_id, record_id, created_at ASC + `; + const out: ImportJobRow[] = []; + for (const m of mergeByUserAndRecord(rows).values()) { + const job = projectJob(m.userId, m.recordId, m.merged); + if (!job) continue; + if (job.status !== 'running' && job.status !== 'queued') continue; + out.push(job); + } + return out; +} + +/** + * Per-job item scan. Returns ALL items so the worker can compute + * job-completion + counter deltas in one pass. 
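+ *
+ * @example
+ * // hedged sketch — callers filter states themselves; the tick claims at
+ * // most PER_JOB_CONCURRENCY (3) pending items and counts terminal ones:
+ * const items = await listItemsForJob(job.userId, job.id);
+ * const claimable = items.filter((i) => i.state === 'pending').slice(0, 3);
+ * const saved = items.filter((i) => i.state === 'saved').length;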
+ */ +export async function listItemsForJob(userId: string, jobId: string): Promise { + const sql = getSyncConnection(); + const rows = await sql` + SELECT user_id, record_id, op, data, field_meta, created_at + FROM sync_changes + WHERE app_id = 'articles' + AND table_name = 'articleImportItems' + AND user_id = ${userId} + ORDER BY record_id, created_at ASC + `; + const out: ImportItemRow[] = []; + for (const m of mergeByUserAndRecord(rows).values()) { + const item = projectItem(m.userId, m.recordId, m.merged); + if (!item || item.jobId !== jobId) continue; + out.push(item); + } + out.sort((a, b) => a.idx - b.idx); + return out; +} + +// ─── Internal: LWW merge per (userId, recordId) ────────────── + +interface MergedEntry { + userId: string; + recordId: string; + merged: Row | null; +} + +function mergeByUserAndRecord(rows: readonly ChangeRow[]): Map { + const out = new Map(); + type Cur = { + key: string; + userId: string; + recordId: string; + record: Row | null; + fm: Record; + }; + let current: Cur | null = null; + const flush = (c: Cur) => { + out.set(c.key, { userId: c.userId, recordId: c.recordId, merged: c.record }); + }; + for (const r of rows) { + const key = `${r.user_id}:${r.record_id}`; + if (!current || current.key !== key) { + if (current) flush(current); + current = { key, userId: r.user_id, recordId: r.record_id, record: null, fm: {} }; + } + if (r.op === 'delete') { + current.record = null; + continue; + } + if (!r.data) continue; + if (!current.record) { + current.record = { id: r.record_id, ...r.data }; + current.fm = { ...(r.field_meta ?? {}) }; + continue; + } + const rowFM = r.field_meta ?? {}; + for (const [k, v] of Object.entries(r.data)) { + const serverTime = rowFM[k] ?? r.created_at.toISOString(); + const localTime = current.fm[k] ?? ''; + if (serverTime >= localTime) { + current.record[k] = v; + current.fm[k] = serverTime; + } + } + } + if (current) flush(current); + return out; +} + +function projectJob(userId: string, recordId: string, merged: Row | null): ImportJobRow | null { + if (!merged || merged.deletedAt) return null; + const totalUrls = num(merged.totalUrls); + const status = str(merged.status); + if (totalUrls == null || !isJobStatus(status)) return null; + return { + id: recordId, + userId, + spaceId: optStr(merged.spaceId), + totalUrls, + status, + leasedBy: optStr(merged.leasedBy), + leasedUntil: optStr(merged.leasedUntil), + startedAt: optStr(merged.startedAt), + finishedAt: optStr(merged.finishedAt), + savedCount: num(merged.savedCount) ?? 0, + duplicateCount: num(merged.duplicateCount) ?? 0, + errorCount: num(merged.errorCount) ?? 0, + warningCount: num(merged.warningCount) ?? 0, + }; +} + +function projectItem(userId: string, recordId: string, merged: Row | null): ImportItemRow | null { + if (!merged || merged.deletedAt) return null; + const jobId = str(merged.jobId); + const url = str(merged.url); + const state = str(merged.state); + const idx = num(merged.idx); + if (!jobId || !url || !isItemState(state) || idx == null) return null; + return { + id: recordId, + userId, + spaceId: optStr(merged.spaceId), + jobId, + idx, + url, + state, + articleId: optStr(merged.articleId), + warning: merged.warning === 'probable_consent_wall' ? 'probable_consent_wall' : null, + error: optStr(merged.error), + attempts: num(merged.attempts) ?? 
0, + lastAttemptAt: optStr(merged.lastAttemptAt), + }; +} + +function isJobStatus(s: string): s is ImportJobRow['status'] { + return s === 'queued' || s === 'running' || s === 'paused' || s === 'done' || s === 'cancelled'; +} + +function isItemState(s: string): s is ImportItemState { + return ( + s === 'pending' || + s === 'extracting' || + s === 'extracted' || + s === 'saved' || + s === 'duplicate' || + s === 'consent-wall' || + s === 'error' || + s === 'cancelled' + ); +} + +function num(v: unknown): number | null { + return typeof v === 'number' && Number.isFinite(v) ? v : null; +} +function str(v: unknown): string { + return typeof v === 'string' ? v : ''; +} +function optStr(v: unknown): string | null { + return typeof v === 'string' && v ? v : null; +} diff --git a/apps/api/src/modules/articles/import-worker.ts b/apps/api/src/modules/articles/import-worker.ts new file mode 100644 index 000000000..b5ed4e76e --- /dev/null +++ b/apps/api/src/modules/articles/import-worker.ts @@ -0,0 +1,236 @@ +/** + * Articles Bulk-Import — background worker. + * + * Boots from `apps/api/src/index.ts`. On every tick: + * + * 1. Try `pg_try_advisory_xact_lock` on a fixed key. If another + * apps/api instance already holds it, skip this tick. The lock + * is per-transaction so we never need a heartbeat — a crashed + * worker's tx auto-aborts and the next tick claims it cleanly. + * 2. Project the live state of `articleImportJobs` and pick the + * ones still 'queued' or 'running'. + * 3. For each job: project items, take up to N pending items, + * extract concurrently. Each extraction writes a Pickup row + + * flips the item state via `import-extractor.ts`. + * 4. Fold terminal item states into job counters + * (savedCount / duplicateCount / errorCount / warningCount). + * When every item is terminal, flip the job to 'done'. + * + * No own state — every meaningful transition is a `sync_changes` row. + * The worker is therefore stateless across restarts. + * + * Plan: docs/plans/articles-bulk-import.md. + */ + +import { getSyncConnection } from '../../mcp/sync-db'; +import { + listClaimableJobs, + listItemsForJob, + type ImportItemRow, + type ImportJobRow, +} from './import-projection'; +import { extractOneItem, writeItemUpdate, writeJobUpdate } from './import-extractor'; + +const TICK_INTERVAL_MS = 2_000; +const PER_JOB_CONCURRENCY = 3; +/** Fixed int8 lock key — derived from the ASCII bytes of 'ARTI'. */ +const ADVISORY_LOCK_KEY = 0x4152_5449; + +let timer: ReturnType | null = null; +let running = false; + +/** + * Start the recurring tick. Idempotent — safe to call multiple times. + * Intended to be called once from `apps/api/src/index.ts` at boot. + * + * Disable via `ARTICLES_IMPORT_WORKER_DISABLED=true` (for tests, or + * when running multiple apps/api instances and you want to designate + * a different one as the worker). 
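+ *
+ * @example
+ * // hedged sketch — boot wiring plus the matching teardown; both
+ * // functions are exported from this module:
+ * startArticleImportWorker(); // begins the 2s tick loop (no-op if disabled)
+ * stopArticleImportWorker();  // clears the interval again (tests / shutdown)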
+ */ +export function startArticleImportWorker(): void { + if (timer) return; + if (process.env.ARTICLES_IMPORT_WORKER_DISABLED === 'true') { + console.log('[articles-import] worker disabled via env'); + return; + } + console.log( + `[articles-import] worker starting — tick=${TICK_INTERVAL_MS}ms, concurrency=${PER_JOB_CONCURRENCY}` + ); + timer = setInterval(() => { + void runTickGuarded(); + }, TICK_INTERVAL_MS); +} + +export function stopArticleImportWorker(): void { + if (timer) { + clearInterval(timer); + timer = null; + } +} + +async function runTickGuarded(): Promise { + if (running) return; + running = true; + try { + await runTickOnce(); + } catch (err) { + console.error('[articles-import] tick error:', err); + } finally { + running = false; + } +} + +/** + * One tick body. Exported for tests + a potential + * `/internal/articles-import/tick`-style admin route. + */ +export async function runTickOnce(): Promise<{ + skipped: boolean; + jobsConsidered: number; + itemsProcessed: number; +}> { + if (!(await tryAcquireLock())) { + return { skipped: true, jobsConsidered: 0, itemsProcessed: 0 }; + } + const jobs = await listClaimableJobs(); + let itemsProcessed = 0; + for (const job of jobs) { + itemsProcessed += await processOneJob(job); + } + return { skipped: false, jobsConsidered: jobs.length, itemsProcessed }; +} + +/** + * Brief advisory-lock probe via a single short transaction. Returns + * true if we won the probe — that's a soft signal for "you're the + * worker for this tick"; the lock releases as the probe tx commits. + * For multi-instance deploys this is a soft-only coordination — if + * two probes happen to interleave their work, the field-LWW merge on + * the client still produces a coherent state. + */ +async function tryAcquireLock(): Promise { + const sql = getSyncConnection(); + let acquired = false; + await sql.begin(async (tx) => { + const rows = await tx<{ acquired: boolean }[]>` + SELECT pg_try_advisory_xact_lock(${ADVISORY_LOCK_KEY}) AS acquired + `; + acquired = rows[0]?.acquired === true; + }); + return acquired; +} + +async function processOneJob(job: ImportJobRow): Promise { + const items = await listItemsForJob(job.userId, job.id); + + // Flip 'queued' → 'running' so the UI shows progress. + if (job.status === 'queued') { + await writeJobUpdate(job.userId, job.id, { + status: 'running', + startedAt: new Date().toISOString(), + }); + } + + // Counter-derivation from current item states. + const counts = countByState(items); + const counterPatch: Record = {}; + let dirty = false; + if (counts.saved !== job.savedCount) { + counterPatch.savedCount = counts.saved; + dirty = true; + } + if (counts.duplicate !== job.duplicateCount) { + counterPatch.duplicateCount = counts.duplicate; + dirty = true; + } + if (counts.error !== job.errorCount) { + counterPatch.errorCount = counts.error; + dirty = true; + } + if (counts.consentWall !== job.warningCount) { + counterPatch.warningCount = counts.consentWall; + dirty = true; + } + if (counts.allTerminal && job.status !== 'done') { + counterPatch.status = 'done'; + counterPatch.finishedAt = new Date().toISOString(); + dirty = true; + } + if (dirty) { + await writeJobUpdate(job.userId, job.id, counterPatch); + } + + if (counts.allTerminal) return 0; + + // Cancelled → flip every still-pending item to 'cancelled'. 
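+  // (Each flip below is its own worker-attributed sync_changes row via
+  // writeItemUpdate, so no extraction round-trip is involved.)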
+ if (job.status === 'cancelled') { + const pending = items.filter((i) => i.state === 'pending'); + for (const it of pending) { + await writeItemUpdate(it.userId, it.id, { state: 'cancelled' }); + } + return pending.length; + } + + // Paused → already-extracting items finish their roundtrip; nothing + // new gets claimed. + if (job.status === 'paused') return 0; + + // Running → claim up to PER_JOB_CONCURRENCY pending items in + // parallel. We deliberately don't try to rescue 'extracting' items: + // if a worker died mid-fetch they stay 'extracting' forever for + // now. Future polish: time-out 'extracting' rows older than ~5min + // and bounce them back to 'pending'. + const claimable = items.filter((i) => i.state === 'pending').slice(0, PER_JOB_CONCURRENCY); + if (claimable.length === 0) return 0; + + await Promise.allSettled(claimable.map((it) => extractOneItem(it))); + return claimable.length; +} + +interface StateCounts { + saved: number; + duplicate: number; + error: number; + consentWall: number; + cancelled: number; + allTerminal: boolean; +} + +function countByState(items: readonly ImportItemRow[]): StateCounts { + let saved = 0; + let duplicate = 0; + let error = 0; + let consentWall = 0; + let cancelled = 0; + let nonTerminal = 0; + for (const it of items) { + switch (it.state) { + case 'saved': + saved++; + break; + case 'duplicate': + duplicate++; + break; + case 'error': + error++; + break; + case 'consent-wall': + saved++; // consent-wall is "saved with warning" semantically + consentWall++; + break; + case 'cancelled': + cancelled++; + break; + default: + nonTerminal++; + } + } + return { + saved, + duplicate, + error, + consentWall, + cancelled, + allTerminal: items.length > 0 && nonTerminal === 0, + }; +}
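+
+// Worked example (hedged — the numbers are illustrative, not from a real
+// run): a 5-item job with states
+//   ['saved', 'saved', 'consent-wall', 'error', 'pending']
+// folds to { saved: 3, duplicate: 0, error: 1, consentWall: 1, cancelled: 0,
+// allTerminal: false } — the consent-wall item counts as saved-with-warning,
+// and the lone 'pending' item keeps allTerminal false so the job stays open.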