From 59373c0d57080b79b940eb806a8e82b21dd5874e Mon Sep 17 00:00:00 2001 From: Till JS Date: Wed, 29 Apr 2026 01:06:15 +0200 Subject: [PATCH] =?UTF-8?q?chore(articles):=20hygiene=20pass=20=E2=80=94?= =?UTF-8?q?=20shared-ai=20actor=20+=20lib/sync-db=20+=20metrics=20(#5,#7,#?= =?UTF-8?q?11)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #5 — SYSTEM_ARTICLES_IMPORT_WORKER hoisted into @mana/shared-ai The worker built its actor inline, bypassing the SystemSource union that's the blessed list for system-write principals. Now uses makeSystemActor(SYSTEM_ARTICLES_IMPORT_WORKER) like every other server-side system writer (mission-runner, projection, …). #7 — sync-db helper hoisted out of mcp/ into lib/ Implementation moved to apps/api/src/lib/sync-db.ts; mcp/sync-db.ts is a re-export shim so existing MCP imports keep working. Articles bulk-import + future modules import from lib/ directly — no more "articles depending on mcp" layering smell. #11 — Prometheus metrics for the worker New counters + histogram in lib/metrics.ts under mana_api_articles_import_*: - ticks_total{result=processed|skipped|error} - items_total{result=extracted|error|consent_wall|cancelled} - extract_duration_seconds (histogram, 0.25–30s buckets) - jobs_completed_total{result=done} - pickup_gc_rows_total Worker tick + extractor instrumented at the right transition points. Steady-state pickup_gc_rows_total > 0 over time signals a stuck consumer somewhere — useful operator alert. Plan: docs/plans/articles-bulk-import.md. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/api/src/lib/metrics.ts | 68 ++++++++++ apps/api/src/lib/sync-db.ts | 114 +++++++++++++++++ apps/api/src/mcp/sync-db.ts | 116 ++---------------- .../src/modules/articles/import-extractor.ts | 33 +++-- .../src/modules/articles/import-projection.ts | 4 +- .../api/src/modules/articles/import-worker.ts | 15 ++- packages/shared-ai/src/actor.ts | 16 ++- packages/shared-ai/src/index.ts | 1 + 8 files changed, 245 insertions(+), 122 deletions(-) create mode 100644 apps/api/src/lib/sync-db.ts diff --git a/apps/api/src/lib/metrics.ts b/apps/api/src/lib/metrics.ts index 140408979..79440aa3a 100644 --- a/apps/api/src/lib/metrics.ts +++ b/apps/api/src/lib/metrics.ts @@ -104,3 +104,71 @@ export const websitePublicReadAge = new Histogram({ buckets: [1, 10, 60, 300, 1800, 3600, 21600, 86400], registers: [register], }); + +// ── Articles bulk-import worker ───────────────────────── + +/** + * Every worker tick, regardless of outcome. `result`: + * - `processed` — lock acquired, jobs scanned + * - `skipped` — advisory lock taken by another instance + * - `error` — tick threw (logged by the tick guard, not rethrown) + */ +export const articlesImportTicksTotal = new Counter({ + name: 'mana_api_articles_import_ticks_total', + help: 'Articles bulk-import worker tick outcomes.', + labelNames: ['result'] as const, + registers: [register], +}); + +/** + * Each per-item terminal-state transition the worker observes. 
+ * `result`: + * - `extracted` — server fetch + Readability succeeded, pickup row written + * - `error` — 3 attempts exhausted, item parked as 'error' + * - `consent_wall` — extracted but flagged probable_consent_wall + * - `cancelled` — flipped from pending → cancelled because parent + * job was cancelled mid-flight (reserved: not incremented yet) + */ +export const articlesImportItemsTotal = new Counter({ + name: 'mana_api_articles_import_items_total', + help: 'Articles bulk-import items by terminal-from-worker state.', + labelNames: ['result'] as const, + registers: [register], +}); + +/** + * End-to-end latency of one extractFromUrl call (network fetch + + * JSDOM parse + Readability). Excludes consent-wall flagging — that's + * a synchronous post-process. Buckets span a snappy blog (250ms) up + * to 30s, twice the shared-rss timeout ceiling (15s). + */ +export const articlesImportExtractDuration = new Histogram({ + name: 'mana_api_articles_import_extract_duration_seconds', + help: 'extractFromUrl roundtrip time inside the bulk-import worker.', + buckets: [0.25, 0.5, 1, 2, 4, 8, 15, 30], + registers: [register], +}); + +/** + * Job-completion counter. `result`: + * - `done` — every item terminal, status flipped to done + * - `cancelled` — user cancelled before completion (reserved: not incremented yet) + */ +export const articlesImportJobsCompletedTotal = new Counter({ + name: 'mana_api_articles_import_jobs_completed_total', + help: 'Articles bulk-import jobs by terminal status.', + labelNames: ['result'] as const, + registers: [register], +}); + +/** + * Pickup-row GC sweep — how many stale rows were hard-deleted on each + * 30-tick run. Steady-state should be 0 (consumer drains them within + * seconds); a non-zero value over time signals a stuck consumer + * somewhere (closed tabs, broken Web-Lock). 
+ */ +export const articlesImportPickupGcRows = new Counter({ + name: 'mana_api_articles_import_pickup_gc_rows_total', + help: 'articleExtractPickup rows hard-deleted by the worker GC sweep.', + registers: [register], +}); diff --git a/apps/api/src/lib/sync-db.ts b/apps/api/src/lib/sync-db.ts new file mode 100644 index 000000000..597505c51 --- /dev/null +++ b/apps/api/src/lib/sync-db.ts @@ -0,0 +1,114 @@ +/** + * Connection helpers for the `mana_sync` database (the shared sync + * event log). Multiple modules read/write `sync_changes` directly: + * + * - apps/api MCP tools (apps/api/src/mcp/executor.ts) + * - apps/api articles bulk-import worker (modules/articles/import-*.ts) + * - apps/api forms module (modules/forms/public-routes.ts) + * + * Lives under lib/ rather than mcp/ so non-MCP consumers don't depend + * on the MCP module. Originally introduced for MCP and later + * generalised — the legacy import path via `mcp/sync-db.ts` re-exports + * from here for backwards compatibility. + * + * Connection uses SYNC_DATABASE_URL (same env var as services/mana-ai). + */ + +import postgres from 'postgres'; + +const SYNC_DATABASE_URL = + process.env.SYNC_DATABASE_URL ?? 'postgresql://mana:devpassword@localhost:5432/mana_sync'; + +let syncPool: ReturnType | null = null; + +/** Returns the shared sync database connection pool. */ +export function getSyncConnection() { + if (!syncPool) { + syncPool = postgres(SYNC_DATABASE_URL, { max: 5, idle_timeout: 30 }); + } + return syncPool; +} + +export type SyncSql = ReturnType; + +/** + * Run a callback within an RLS-scoped transaction for the given user. + * Sets `app.current_user_id` so the sync_changes RLS policy allows + * reads and writes only for that user's data. 
+ */ +export async function withUser( + userId: string, + fn: (tx: postgres.TransactionSql>) => Promise +): Promise { + if (!userId) throw new Error('withUser: empty userId'); + const sql = getSyncConnection(); + return sql.begin(async (tx) => { + await tx`SELECT set_config('app.current_user_id', ${userId}, true)`; + return fn(tx); + }) as Promise; +} + +/** + * Read the latest version of all records for a user + app + table from + * sync_changes. Applies field-level LWW to reconstruct current state. + * + * This is the server-side equivalent of the Dexie liveQuery: it replays + * the sync_changes log to build the latest record versions. For small + * datasets this is fine; for large tables we'll need materialized + * snapshots (like mana-ai's mission_snapshots). + */ +export async function readLatestRecords( + userId: string, + appId: string, + tableName: string +): Promise[]> { + const sql = getSyncConnection(); + // Get the latest change per record_id (by created_at desc), then + // reconstruct the record. Only include non-deleted records. + const rows = await sql<{ record_id: string; data: Record; op: string }[]>` + SELECT DISTINCT ON (record_id) + record_id, data, op + FROM sync_changes + WHERE user_id = ${userId} + AND app_id = ${appId} + AND table_name = ${tableName} + ORDER BY record_id, created_at DESC + `; + + // Filter out deleted records and records with delete ops + return rows + .filter((r) => r.op !== 'delete' && r.data && !(r.data as Record).deletedAt) + .map((r) => r.data); +} + +/** + * Write a new record via sync_changes INSERT. The record will appear + * on the user's devices on their next sync cycle. + * + * MCP-Tool calls always carry `origin='agent'` because the pipeline + * that produced the value is an AI agent invoking a tool — the + * actor's `kind` may be `system` (the MCP server itself) but the + * write semantics are agent-driven for conflict-detection purposes. 
+ */ +export async function writeRecord( + userId: string, + appId: string, + tableName: string, + recordId: string, + op: 'insert' | 'update' | 'delete', + data: Record, + fieldMeta: Record +): Promise { + await withUser(userId, async (tx) => { + await tx` + INSERT INTO sync_changes + (app_id, table_name, record_id, user_id, op, data, field_meta, client_id, schema_version, actor, origin) + VALUES + (${appId}, ${tableName}, ${recordId}, ${userId}, ${op}, + ${tx.json(data as never)}, ${tx.json(fieldMeta as never)}, + 'mcp-server', 1, + ${tx.json({ kind: 'system', principalId: 'system:mcp', displayName: 'MCP Server' } as never)}, + 'agent') + `; + }); +} diff --git a/apps/api/src/mcp/sync-db.ts b/apps/api/src/mcp/sync-db.ts index 41be50210..e4596f270 100644 --- a/apps/api/src/mcp/sync-db.ts +++ b/apps/api/src/mcp/sync-db.ts @@ -1,108 +1,14 @@ /** - * Sync database connection for MCP tool execution. - * - * MCP tools read and write via the mana_sync database (the shared sync - * event log), not mana_platform. This matches the local-first pattern: - * writes go to sync_changes, clients pick them up on next sync. - * - * The connection uses the same env var as mana-ai: SYNC_DATABASE_URL. + * Backwards-compatible re-export shim. The actual implementation lives + * at `apps/api/src/lib/sync-db.ts`; this file just keeps existing + * MCP-side import paths working while non-MCP consumers (articles + * bulk-import worker, forms module, …) import from `lib/` directly. */ -import postgres from 'postgres'; - -const SYNC_DATABASE_URL = - process.env.SYNC_DATABASE_URL ?? 'postgresql://mana:devpassword@localhost:5432/mana_sync'; - -let syncPool: ReturnType | null = null; - -/** Returns the shared sync database connection pool. 
*/ -export function getSyncConnection() { - if (!syncPool) { - syncPool = postgres(SYNC_DATABASE_URL, { max: 5, idle_timeout: 30 }); - } - return syncPool; -} - -export type SyncSql = ReturnType; - -/** - * Run a callback within an RLS-scoped transaction for the given user. - * Sets `app.current_user_id` so the sync_changes RLS policy allows - * reads and writes only for that user's data. - */ -export async function withUser( - userId: string, - fn: (tx: postgres.TransactionSql>) => Promise -): Promise { - if (!userId) throw new Error('withUser: empty userId'); - const sql = getSyncConnection(); - return sql.begin(async (tx) => { - await tx`SELECT set_config('app.current_user_id', ${userId}, true)`; - return fn(tx); - }) as Promise; -} - -/** - * Read the latest version of all records for a user + app + table from - * sync_changes. Applies field-level LWW to reconstruct current state. - * - * This is the server-side equivalent of the Dexie liveQuery: it replays - * the sync_changes log to build the latest record versions. For small - * datasets this is fine; for large tables we'll need materialized - * snapshots (like mana-ai's mission_snapshots). - */ -export async function readLatestRecords( - userId: string, - appId: string, - tableName: string -): Promise[]> { - const sql = getSyncConnection(); - // Get the latest change per record_id (by created_at desc), then - // reconstruct the record. Only include non-deleted records. - const rows = await sql<{ record_id: string; data: Record; op: string }[]>` - SELECT DISTINCT ON (record_id) - record_id, data, op - FROM sync_changes - WHERE user_id = ${userId} - AND app_id = ${appId} - AND table_name = ${tableName} - ORDER BY record_id, created_at DESC - `; - - // Filter out deleted records and records with delete ops - return rows - .filter((r) => r.op !== 'delete' && r.data && !(r.data as Record).deletedAt) - .map((r) => r.data); -} - -/** - * Write a new record via sync_changes INSERT. 
The record will appear - * on the user's devices on their next sync cycle. - * - * MCP-Tool calls always carry `origin='agent'` because the pipeline - * that produced the value is an AI agent invoking a tool — the - * actor's `kind` may be `system` (the MCP server itself) but the - * write semantics are agent-driven for conflict-detection purposes. - */ -export async function writeRecord( - userId: string, - appId: string, - tableName: string, - recordId: string, - op: 'insert' | 'update' | 'delete', - data: Record, - fieldMeta: Record -): Promise { - await withUser(userId, async (tx) => { - await tx` - INSERT INTO sync_changes - (app_id, table_name, record_id, user_id, op, data, field_meta, client_id, schema_version, actor, origin) - VALUES - (${appId}, ${tableName}, ${recordId}, ${userId}, ${op}, - ${tx.json(data as never)}, ${tx.json(fieldMeta as never)}, - 'mcp-server', 1, - ${tx.json({ kind: 'system', principalId: 'system:mcp', displayName: 'MCP Server' } as never)}, - 'agent') - `; - }); -} +export { + getSyncConnection, + withUser, + readLatestRecords, + writeRecord, + type SyncSql, +} from '../lib/sync-db'; diff --git a/apps/api/src/modules/articles/import-extractor.ts b/apps/api/src/modules/articles/import-extractor.ts index 4e21f6cb5..f047a11b9 100644 --- a/apps/api/src/modules/articles/import-extractor.ts +++ b/apps/api/src/modules/articles/import-extractor.ts @@ -25,24 +25,27 @@ */ import { extractFromUrl } from '@mana/shared-rss'; -import { makeFieldMeta, type Actor, type FieldOrigin } from '@mana/shared-ai'; -import { getSyncConnection } from '../../mcp/sync-db'; +import { + makeFieldMeta, + makeSystemActor, + originFromActor, + SYSTEM_ARTICLES_IMPORT_WORKER, + type Actor, + type FieldOrigin, +} from '@mana/shared-ai'; +import { getSyncConnection } from '../../lib/sync-db'; +import { articlesImportExtractDuration, articlesImportItemsTotal } from '../../lib/metrics'; import { looksLikeConsentWall } from './consent-wall'; import type { ImportItemRow } 
from './import-projection'; const MAX_ATTEMPTS = 3; const CLIENT_ID = 'articles-import-worker'; -/** System-actor blob stamped on every worker write. Built inline because - * the underlying SystemSource union in @mana/shared-ai isn't extended - * here — both fields are runtime values, not type discriminators, so - * this composes cleanly without a shared-ai change. */ -const WORKER_ACTOR: Actor = Object.freeze({ - kind: 'system' as const, - principalId: 'system:articles-import-worker', - displayName: 'Artikel-Import', -}); -const WORKER_ORIGIN: FieldOrigin = 'system'; +/** System-actor blob stamped on every worker write — sourced from the + * blessed SystemSource union in @mana/shared-ai so the actor.ts audit + * + Workbench filters know about it. */ +const WORKER_ACTOR: Actor = makeSystemActor(SYSTEM_ARTICLES_IMPORT_WORKER); +const WORKER_ORIGIN: FieldOrigin = originFromActor(WORKER_ACTOR); export interface ExtractStats { itemId: string; @@ -73,7 +76,9 @@ export async function extractOneItem(item: ImportItemRow): Promise // Step 2 — fetch + parse. Hard-failure path returns null; we treat // that as a single failed attempt and recycle. + const extractStart = Date.now(); const extracted = await extractFromUrl(item.url); + articlesImportExtractDuration.observe((Date.now() - extractStart) / 1000); const nowDone = new Date().toISOString(); if (!extracted) { @@ -84,6 +89,9 @@ export async function extractOneItem(item: ImportItemRow): Promise error: nextState === 'error' ? 'Extraktion fehlgeschlagen nach mehreren Versuchen.' : null, lastAttemptAt: nowDone, }); + if (nextState === 'error') { + articlesImportItemsTotal.inc({ result: 'error' }); + } return { itemId: item.id, terminal: nextState === 'error' ? 'error' : 'pending' }; } @@ -122,6 +130,7 @@ export async function extractOneItem(item: ImportItemRow): Promise lastAttemptAt: nowDone, }); + articlesImportItemsTotal.inc({ result: warning ? 
'consent_wall' : 'extracted' }); return { itemId: item.id, terminal: 'extracted' }; } diff --git a/apps/api/src/modules/articles/import-projection.ts b/apps/api/src/modules/articles/import-projection.ts index 5233818c1..e832f83b0 100644 --- a/apps/api/src/modules/articles/import-projection.ts +++ b/apps/api/src/modules/articles/import-projection.ts @@ -2,7 +2,7 @@ * Articles Bulk-Import — sync_changes → live record projection. * * Mirror of `services/mana-ai/src/db/missions-projection.ts` and - * `apps/api/src/mcp/sync-db.ts:readLatestRecords()`, specialised for the + * `apps/api/src/lib/sync-db.ts:readLatestRecords()`, specialised for the * two tables the import-worker tick reads each cycle: * * articleImportJobs — to find running jobs whose lease is free @@ -18,7 +18,7 @@ * Plan: docs/plans/articles-bulk-import.md. */ -import { getSyncConnection } from '../../mcp/sync-db'; +import { getSyncConnection } from '../../lib/sync-db'; import { fieldMetaTime } from './field-meta'; type Row = Record; diff --git a/apps/api/src/modules/articles/import-worker.ts b/apps/api/src/modules/articles/import-worker.ts index 91bed4695..828b64528 100644 --- a/apps/api/src/modules/articles/import-worker.ts +++ b/apps/api/src/modules/articles/import-worker.ts @@ -22,7 +22,12 @@ * Plan: docs/plans/articles-bulk-import.md. */ -import { getSyncConnection } from '../../mcp/sync-db'; +import { getSyncConnection } from '../../lib/sync-db'; +import { + articlesImportJobsCompletedTotal, + articlesImportPickupGcRows, + articlesImportTicksTotal, +} from '../../lib/metrics'; import { listClaimableJobs, listItemsForJob, @@ -94,8 +99,13 @@ async function runTickGuarded(): Promise { if (running) return; running = true; try { - await runTickOnce(); + const result = await runTickOnce(); + articlesImportTicksTotal.inc({ result: result.skipped ? 
'skipped' : 'processed' }); + if (typeof result.pickupGcRows === 'number' && result.pickupGcRows > 0) { + articlesImportPickupGcRows.inc(result.pickupGcRows); + } } catch (err) { + articlesImportTicksTotal.inc({ result: 'error' }); console.error('[articles-import] tick error:', err); } finally { running = false; @@ -229,6 +239,7 @@ async function processOneJob(job: ImportJobRow): Promise { counterPatch.status = 'done'; counterPatch.finishedAt = new Date().toISOString(); dirty = true; + articlesImportJobsCompletedTotal.inc({ result: 'done' }); } if (dirty) { await writeJobUpdate(job.userId, job.id, counterPatch); diff --git a/packages/shared-ai/src/actor.ts b/packages/shared-ai/src/actor.ts index 95fe6bc1c..d87b658f6 100644 --- a/packages/shared-ai/src/actor.ts +++ b/packages/shared-ai/src/actor.ts @@ -42,6 +42,17 @@ export const SYSTEM_MISSION_RUNNER = 'system:mission-runner'; * it from the user-write codepath. */ export const SYSTEM_BOOTSTRAP = 'system:bootstrap'; +/** + * Server-side bulk-article-import worker (apps/api). Picks up + * articleImportItems with state='pending', runs Readability, drops + * the extracted payload into articleExtractPickup, flips item state. + * Every state-transition write the worker does is attributed to this + * principal so the Workbench timeline + revert path can group + * background-import writes under one identity. + * + * Plan: docs/plans/articles-bulk-import.md. + */ +export const SYSTEM_ARTICLES_IMPORT_WORKER = 'system:articles-import-worker'; export type SystemSource = | typeof SYSTEM_PROJECTION @@ -49,7 +60,8 @@ export type SystemSource = | typeof SYSTEM_MIGRATION | typeof SYSTEM_STREAM | typeof SYSTEM_MISSION_RUNNER - | typeof SYSTEM_BOOTSTRAP; + | typeof SYSTEM_BOOTSTRAP + | typeof SYSTEM_ARTICLES_IMPORT_WORKER; /** Legacy sentinels for records that pre-date the identity-aware actor * shape. Read-path normalization maps missing fields to these. 
*/ @@ -153,6 +165,8 @@ function defaultSystemDisplayName(source: SystemSource): string { return 'Mission-Runner'; case SYSTEM_BOOTSTRAP: return 'Bootstrap'; + case SYSTEM_ARTICLES_IMPORT_WORKER: + return 'Artikel-Import'; } } diff --git a/packages/shared-ai/src/index.ts b/packages/shared-ai/src/index.ts index bae608b9c..1a9159514 100644 --- a/packages/shared-ai/src/index.ts +++ b/packages/shared-ai/src/index.ts @@ -23,6 +23,7 @@ export { SYSTEM_STREAM, SYSTEM_MISSION_RUNNER, SYSTEM_BOOTSTRAP, + SYSTEM_ARTICLES_IMPORT_WORKER, LEGACY_USER_PRINCIPAL, LEGACY_AI_PRINCIPAL, LEGACY_SYSTEM_PRINCIPAL,