mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-15 16:19:39 +02:00
feat(articles): server-side bulk-import worker (Phase 2)
apps/api/src/modules/articles/:
- import-projection.ts: sync_changes → live LWW projection of jobs
+ items. Cross-user scan for claimable jobs, per-job item scan.
- import-extractor.ts: per-item state-machine. Claim → fetch → write
pickup + flip extracted, OR retry up to 3x then 'error'. All writes
attributed to system:articles-import-worker actor (built inline so
no shared-ai SystemSource extension needed for now).
- import-worker.ts: 2s tick, pg_try_advisory_xact_lock keyed on 'ARTI'
so multi-instance apps/api never double-processes. Concurrency 3
pending items per job per tick. Job-counter rollups + status flips
derived from current item states.
- apps/api/src/index.ts: start the worker at boot.
Pipeline (server side):
Client write articleImportItems(state='pending')
→ sync push → mana_sync.sync_changes
→ server-worker tick projects 'pending' items
→ extractFromUrl (shared-rss / Readability)
→ write articleExtractPickup row + flip item → 'extracted'
Phase 3 (client-side pickup consumer) and Phase 4+ (store + UI) follow.
Plan: docs/plans/articles-bulk-import.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
fc49198992
commit
5535f2da48
4 changed files with 723 additions and 0 deletions
|
|
@ -41,6 +41,7 @@ import { moodlitRoutes } from './modules/moodlit/routes';
|
||||||
import { newsRoutes } from './modules/news/routes';
|
import { newsRoutes } from './modules/news/routes';
|
||||||
import { newsResearchRoutes } from './modules/news-research/routes';
|
import { newsResearchRoutes } from './modules/news-research/routes';
|
||||||
import { articlesRoutes } from './modules/articles/routes';
|
import { articlesRoutes } from './modules/articles/routes';
|
||||||
|
import { startArticleImportWorker } from './modules/articles/import-worker';
|
||||||
import { tracesRoutes } from './modules/traces/routes';
|
import { tracesRoutes } from './modules/traces/routes';
|
||||||
import { writingRoutes } from './modules/writing/routes';
|
import { writingRoutes } from './modules/writing/routes';
|
||||||
import { comicRoutes } from './modules/comic/routes';
|
import { comicRoutes } from './modules/comic/routes';
|
||||||
|
|
@ -142,6 +143,12 @@ app.route('/api/v1/who', whoRoutes);
|
||||||
app.route('/api/v1/writing', writingRoutes);
|
app.route('/api/v1/writing', writingRoutes);
|
||||||
app.route('/api/v1/comic', comicRoutes);
|
app.route('/api/v1/comic', comicRoutes);
|
||||||
|
|
||||||
|
// ─── Background Workers ─────────────────────────────────────
|
||||||
|
// Articles bulk-import: ticks every 2s, advisory-lock-gated so multiple
|
||||||
|
// apps/api replicas never double-process. See
|
||||||
|
// docs/plans/articles-bulk-import.md.
|
||||||
|
startArticleImportWorker();
|
||||||
|
|
||||||
// ─── Server Info ────────────────────────────────────────────
|
// ─── Server Info ────────────────────────────────────────────
|
||||||
console.log(`mana-api starting on port ${PORT}...`);
|
console.log(`mana-api starting on port ${PORT}...`);
|
||||||
|
|
||||||
|
|
|
||||||
237
apps/api/src/modules/articles/import-extractor.ts
Normal file
237
apps/api/src/modules/articles/import-extractor.ts
Normal file
|
|
@ -0,0 +1,237 @@
|
||||||
|
/**
|
||||||
|
* Articles Bulk-Import — per-item extraction + write-back.
|
||||||
|
*
|
||||||
|
* For one `articleImportItems` row in state='pending':
|
||||||
|
*
|
||||||
|
* 1. Flip to state='extracting' (so other ticks / the UI see progress).
|
||||||
|
* 2. Run `extractFromUrl` against the URL.
|
||||||
|
* 3a. On success → write a `articleExtractPickup` row carrying the
|
||||||
|
* full ExtractedArticle payload + flip the item to 'extracted'.
|
||||||
|
* The client-side pickup-consumer picks it up, encrypts the
|
||||||
|
* article into the user's IndexedDB, and flips the item to 'saved'
|
||||||
|
* (or 'consent-wall' if the warning fired).
|
||||||
|
* 3b. On failure → bump `attempts`, flip back to 'pending' if
|
||||||
|
* attempts < 3, else flip to state='error' with the technical
|
||||||
|
* error message.
|
||||||
|
*
|
||||||
|
* Every state-change is one `sync_changes` row attributed to the
|
||||||
|
* `system:articles-import-worker` actor (built inline below — kept out
|
||||||
|
* of the shared-ai SystemSource union for now to keep the worker self-
|
||||||
|
* contained; can be hoisted later). Origin is `'system'` so the
|
||||||
|
* conflict-detection gate on the client doesn't surface these as
|
||||||
|
* user-visible toasts.
|
||||||
|
*
|
||||||
|
* Plan: docs/plans/articles-bulk-import.md.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { extractFromUrl } from '@mana/shared-rss';
|
||||||
|
import { makeFieldMeta, type Actor, type FieldOrigin } from '@mana/shared-ai';
|
||||||
|
import { getSyncConnection } from '../../mcp/sync-db';
|
||||||
|
import type { ImportItemRow } from './import-projection';
|
||||||
|
|
||||||
|
// Max extraction attempts per item before it is parked in state='error'.
const MAX_ATTEMPTS = 3;
// client_id column value stamped on every sync_changes row this worker writes.
const CLIENT_ID = 'articles-import-worker';

/** System-actor blob stamped on every worker write. Built inline because
 * the underlying SystemSource union in @mana/shared-ai isn't extended
 * here — both fields are runtime values, not type discriminators, so
 * this composes cleanly without a shared-ai change. */
const WORKER_ACTOR: Actor = Object.freeze({
  kind: 'system' as const,
  principalId: 'system:articles-import-worker',
  displayName: 'Artikel-Import',
});
// 'system' origin keeps these writes out of user-visible conflict toasts
// (per the client-side conflict-detection gate described in the file header).
const WORKER_ORIGIN: FieldOrigin = 'system';

// Result of one extractOneItem pass.
export interface ExtractStats {
  itemId: string;
  // 'pending' = failed attempt, will be retried next tick;
  // 'extracted' / 'error' are the success / give-up outcomes.
  terminal: 'pending' | 'extracted' | 'error';
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run one extraction round-trip for a single item. Idempotent at the
|
||||||
|
* sync_changes level — if two ticks race the same item the field-LWW
|
||||||
|
* merge yields a single coherent state on the client.
|
||||||
|
*/
|
||||||
|
export async function extractOneItem(item: ImportItemRow): Promise<ExtractStats> {
|
||||||
|
if (item.state !== 'pending') {
|
||||||
|
return {
|
||||||
|
itemId: item.id,
|
||||||
|
terminal: item.state === 'error' ? 'error' : 'extracted',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 1 — claim. Flip the item to 'extracting' before the slow
|
||||||
|
// fetch so concurrent ticks (and the UI) see we own it.
|
||||||
|
const nowClaim = new Date().toISOString();
|
||||||
|
await writeItemUpdate(item.userId, item.id, {
|
||||||
|
state: 'extracting',
|
||||||
|
lastAttemptAt: nowClaim,
|
||||||
|
attempts: item.attempts + 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Step 2 — fetch + parse. Hard-failure path returns null; we treat
|
||||||
|
// that as a single failed attempt and recycle.
|
||||||
|
const extracted = await extractFromUrl(item.url);
|
||||||
|
const nowDone = new Date().toISOString();
|
||||||
|
|
||||||
|
if (!extracted) {
|
||||||
|
const nextAttempts = item.attempts + 1;
|
||||||
|
const nextState = nextAttempts >= MAX_ATTEMPTS ? 'error' : 'pending';
|
||||||
|
await writeItemUpdate(item.userId, item.id, {
|
||||||
|
state: nextState,
|
||||||
|
error: nextState === 'error' ? 'Extraktion fehlgeschlagen nach mehreren Versuchen.' : null,
|
||||||
|
lastAttemptAt: nowDone,
|
||||||
|
});
|
||||||
|
return { itemId: item.id, terminal: nextState === 'error' ? 'error' : 'pending' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3 — write the Pickup row (server payload for the client) and
|
||||||
|
// flip item state to 'extracted' so the consume-pickup path picks it
|
||||||
|
// up. Pickup row first so a client liveQuery seeing the 'extracted'
|
||||||
|
// state can immediately find the matching pickup payload.
|
||||||
|
const pickupId = `pickup-${item.id}`;
|
||||||
|
const wordCount = extracted.wordCount ?? 0;
|
||||||
|
const readingTimeMinutes = extracted.readingTimeMinutes ?? 0;
|
||||||
|
const warning = looksLikeConsentWall(extracted.content, wordCount)
|
||||||
|
? 'probable_consent_wall'
|
||||||
|
: null;
|
||||||
|
|
||||||
|
await writePickupInsert(item.userId, pickupId, {
|
||||||
|
itemId: item.id,
|
||||||
|
spaceId: item.spaceId ?? null,
|
||||||
|
payload: {
|
||||||
|
originalUrl: item.url,
|
||||||
|
title: extracted.title ?? '',
|
||||||
|
excerpt: extracted.excerpt ?? null,
|
||||||
|
content: extracted.content,
|
||||||
|
htmlContent: extracted.htmlContent ?? '',
|
||||||
|
author: extracted.byline ?? null,
|
||||||
|
siteName: extracted.siteName ?? null,
|
||||||
|
wordCount,
|
||||||
|
readingTimeMinutes,
|
||||||
|
...(warning && { warning }),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await writeItemUpdate(item.userId, item.id, {
|
||||||
|
state: 'extracted',
|
||||||
|
warning,
|
||||||
|
error: null,
|
||||||
|
lastAttemptAt: nowDone,
|
||||||
|
});
|
||||||
|
|
||||||
|
return { itemId: item.id, terminal: 'extracted' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Sync-changes write helpers (worker-attributed) ──────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Worker-attributed update on an `articleImportItems` row. Exported so
|
||||||
|
* the worker tick can flip pending items to 'cancelled' when the parent
|
||||||
|
* job is cancelled, without going through the extraction pipeline.
|
||||||
|
*/
|
||||||
|
export async function writeItemUpdate(
|
||||||
|
userId: string,
|
||||||
|
itemId: string,
|
||||||
|
patch: Record<string, unknown>
|
||||||
|
): Promise<void> {
|
||||||
|
await insertSyncChange({
|
||||||
|
userId,
|
||||||
|
recordId: itemId,
|
||||||
|
appId: 'articles',
|
||||||
|
tableName: 'articleImportItems',
|
||||||
|
op: 'update',
|
||||||
|
data: patch,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function writePickupInsert(
|
||||||
|
userId: string,
|
||||||
|
pickupId: string,
|
||||||
|
data: Record<string, unknown>
|
||||||
|
): Promise<void> {
|
||||||
|
await insertSyncChange({
|
||||||
|
userId,
|
||||||
|
recordId: pickupId,
|
||||||
|
appId: 'articles',
|
||||||
|
tableName: 'articleExtractPickup',
|
||||||
|
op: 'insert',
|
||||||
|
data,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Worker-attributed update on an `articleImportJobs` row. Counter-only
|
||||||
|
* for now (savedCount, errorCount, …) plus status flips like
|
||||||
|
* 'queued' → 'running' and 'running' → 'done'.
|
||||||
|
*/
|
||||||
|
export async function writeJobUpdate(
|
||||||
|
userId: string,
|
||||||
|
jobId: string,
|
||||||
|
patch: Record<string, unknown>
|
||||||
|
): Promise<void> {
|
||||||
|
await insertSyncChange({
|
||||||
|
userId,
|
||||||
|
recordId: jobId,
|
||||||
|
appId: 'articles',
|
||||||
|
tableName: 'articleImportJobs',
|
||||||
|
op: 'update',
|
||||||
|
data: patch,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parameters for one worker-attributed sync_changes insert.
interface InsertParams {
  userId: string;
  recordId: string;
  appId: string;
  tableName: string;
  op: 'insert' | 'update' | 'delete';
  data: Record<string, unknown>;
}

// Append one sync_changes row, stamping every key in `data` with fresh
// worker field_meta so client-side LWW merges attribute the write to
// the worker actor at a single consistent timestamp.
async function insertSyncChange(params: InsertParams): Promise<void> {
  const sql = getSyncConnection();
  const now = new Date().toISOString();
  // One field_meta entry per changed key, all sharing the same timestamp.
  const fieldMeta: Record<string, unknown> = {};
  for (const key of Object.keys(params.data)) {
    fieldMeta[key] = makeFieldMeta(now, WORKER_ACTOR, WORKER_ORIGIN);
  }
  // Widened for tx.json() below, whose parameter type doesn't accept
  // these shapes directly.
  const actorJson = WORKER_ACTOR as unknown;
  const dataJson = params.data as unknown;
  const fmJson = fieldMeta as unknown;
  await sql.begin(async (tx) => {
    // Set the per-tx user id before inserting — NOTE(review): presumably
    // consumed by an RLS policy on sync_changes; confirm against the DB.
    await tx`SELECT set_config('app.current_user_id', ${params.userId}, true)`;
    await tx`
      INSERT INTO sync_changes
        (app_id, table_name, record_id, user_id, op, data, field_meta, client_id, schema_version, actor, origin)
      VALUES
        (${params.appId}, ${params.tableName}, ${params.recordId}, ${params.userId}, ${params.op},
         ${tx.json(dataJson as never)}, ${tx.json(fmJson as never)},
         ${CLIENT_ID}, 1, ${tx.json(actorJson as never)}, ${WORKER_ORIGIN})
    `;
  });
}
|
||||||
|
|
||||||
|
// ─── Consent-wall heuristic (mirror of routes.ts) ────────────
|
||||||
|
|
||||||
|
const CONSENT_KEYWORDS = [
|
||||||
|
'cookies zustimmen',
|
||||||
|
'cookie consent',
|
||||||
|
'zustimmung',
|
||||||
|
'accept all cookies',
|
||||||
|
'consent to the use',
|
||||||
|
'enable javascript',
|
||||||
|
'javascript is disabled',
|
||||||
|
'please enable',
|
||||||
|
'privacy center',
|
||||||
|
'datenschutzeinstellungen',
|
||||||
|
'datenschutzeinstellungen',
|
||||||
|
];
|
||||||
|
const CONSENT_WORDCOUNT_THRESHOLD = 300;
|
||||||
|
|
||||||
|
function looksLikeConsentWall(content: string, wordCount: number): boolean {
|
||||||
|
if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false;
|
||||||
|
const haystack = content.toLowerCase();
|
||||||
|
return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle));
|
||||||
|
}
|
||||||
243
apps/api/src/modules/articles/import-projection.ts
Normal file
243
apps/api/src/modules/articles/import-projection.ts
Normal file
|
|
@ -0,0 +1,243 @@
|
||||||
|
/**
|
||||||
|
* Articles Bulk-Import — sync_changes → live record projection.
|
||||||
|
*
|
||||||
|
* Mirror of `services/mana-ai/src/db/missions-projection.ts` and
|
||||||
|
* `apps/api/src/mcp/sync-db.ts:readLatestRecords()`, specialised for the
|
||||||
|
* two tables the import-worker tick reads each cycle:
|
||||||
|
*
|
||||||
|
* articleImportJobs — to find running jobs whose lease is free
|
||||||
|
* articleImportItems — to find pending items inside those jobs
|
||||||
|
*
|
||||||
|
* No materialized snapshots yet — this is the simple "replay every row
|
||||||
|
* for these tables" path. The total volume is small (a few hundred rows
|
||||||
|
* per active job, all import history per user) and the worker tick is
|
||||||
|
* the only consumer. If the table grows we can plug in the same
|
||||||
|
* `mission_snapshots` pattern mana-ai uses; the projection API stays
|
||||||
|
* the same.
|
||||||
|
*
|
||||||
|
* Plan: docs/plans/articles-bulk-import.md.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { getSyncConnection } from '../../mcp/sync-db';
|
||||||
|
|
||||||
|
// Loose merged-record shape; values are whatever JSON the client wrote.
type Row = Record<string, unknown>;

// Raw sync_changes row as selected by the projection queries below.
interface ChangeRow {
  user_id: string;
  record_id: string;
  op: string;                                 // 'insert' | 'update' | 'delete'
  data: Row | null;                           // field patch; null on deletes
  field_meta: Record<string, string> | null;  // per-field LWW timestamps (compared as strings)
  created_at: Date;                           // fallback timestamp when field_meta lacks a key
}

// Projected live state of an `articleImportJobs` record.
export interface ImportJobRow {
  id: string;
  userId: string;
  spaceId: string | null;
  totalUrls: number;
  status: 'queued' | 'running' | 'paused' | 'done' | 'cancelled';
  leasedBy: string | null;
  leasedUntil: string | null;
  startedAt: string | null;
  finishedAt: string | null;
  savedCount: number;
  duplicateCount: number;
  errorCount: number;
  warningCount: number;
}

// Item lifecycle. The worker's countByState treats 'pending',
// 'extracting' and 'extracted' as non-terminal; the rest are terminal.
export type ImportItemState =
  | 'pending'
  | 'extracting'
  | 'extracted'
  | 'saved'
  | 'duplicate'
  | 'consent-wall'
  | 'error'
  | 'cancelled';

// Projected live state of an `articleImportItems` record.
export interface ImportItemRow {
  id: string;
  userId: string;
  spaceId: string | null;
  jobId: string;
  idx: number;          // position used as the sort key by listItemsForJob
  url: string;
  state: ImportItemState;
  articleId: string | null;
  warning: 'probable_consent_wall' | null;
  error: string | null;
  attempts: number;
  lastAttemptAt: string | null;
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cross-user scan: which jobs need attention this tick. RLS is
|
||||||
|
* intentionally NOT applied — the worker is a privileged consumer that
|
||||||
|
* needs to see all users' running jobs in one pass. Per-user RLS
|
||||||
|
* scoping is applied on the write-back path in import-extractor.ts.
|
||||||
|
*/
|
||||||
|
export async function listClaimableJobs(): Promise<ImportJobRow[]> {
|
||||||
|
const sql = getSyncConnection();
|
||||||
|
const rows = await sql<ChangeRow[]>`
|
||||||
|
SELECT user_id, record_id, op, data, field_meta, created_at
|
||||||
|
FROM sync_changes
|
||||||
|
WHERE app_id = 'articles' AND table_name = 'articleImportJobs'
|
||||||
|
ORDER BY user_id, record_id, created_at ASC
|
||||||
|
`;
|
||||||
|
const out: ImportJobRow[] = [];
|
||||||
|
for (const m of mergeByUserAndRecord(rows).values()) {
|
||||||
|
const job = projectJob(m.userId, m.recordId, m.merged);
|
||||||
|
if (!job) continue;
|
||||||
|
if (job.status !== 'running' && job.status !== 'queued') continue;
|
||||||
|
out.push(job);
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Per-job item scan. Returns ALL items so the worker can compute
|
||||||
|
* job-completion + counter deltas in one pass.
|
||||||
|
*/
|
||||||
|
export async function listItemsForJob(userId: string, jobId: string): Promise<ImportItemRow[]> {
|
||||||
|
const sql = getSyncConnection();
|
||||||
|
const rows = await sql<ChangeRow[]>`
|
||||||
|
SELECT user_id, record_id, op, data, field_meta, created_at
|
||||||
|
FROM sync_changes
|
||||||
|
WHERE app_id = 'articles'
|
||||||
|
AND table_name = 'articleImportItems'
|
||||||
|
AND user_id = ${userId}
|
||||||
|
ORDER BY record_id, created_at ASC
|
||||||
|
`;
|
||||||
|
const out: ImportItemRow[] = [];
|
||||||
|
for (const m of mergeByUserAndRecord(rows).values()) {
|
||||||
|
const item = projectItem(m.userId, m.recordId, m.merged);
|
||||||
|
if (!item || item.jobId !== jobId) continue;
|
||||||
|
out.push(item);
|
||||||
|
}
|
||||||
|
out.sort((a, b) => a.idx - b.idx);
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Internal: LWW merge per (userId, recordId) ──────────────
|
||||||
|
|
||||||
|
interface MergedEntry {
|
||||||
|
userId: string;
|
||||||
|
recordId: string;
|
||||||
|
merged: Row | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function mergeByUserAndRecord(rows: readonly ChangeRow[]): Map<string, MergedEntry> {
|
||||||
|
const out = new Map<string, MergedEntry>();
|
||||||
|
type Cur = {
|
||||||
|
key: string;
|
||||||
|
userId: string;
|
||||||
|
recordId: string;
|
||||||
|
record: Row | null;
|
||||||
|
fm: Record<string, string>;
|
||||||
|
};
|
||||||
|
let current: Cur | null = null;
|
||||||
|
const flush = (c: Cur) => {
|
||||||
|
out.set(c.key, { userId: c.userId, recordId: c.recordId, merged: c.record });
|
||||||
|
};
|
||||||
|
for (const r of rows) {
|
||||||
|
const key = `${r.user_id}:${r.record_id}`;
|
||||||
|
if (!current || current.key !== key) {
|
||||||
|
if (current) flush(current);
|
||||||
|
current = { key, userId: r.user_id, recordId: r.record_id, record: null, fm: {} };
|
||||||
|
}
|
||||||
|
if (r.op === 'delete') {
|
||||||
|
current.record = null;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!r.data) continue;
|
||||||
|
if (!current.record) {
|
||||||
|
current.record = { id: r.record_id, ...r.data };
|
||||||
|
current.fm = { ...(r.field_meta ?? {}) };
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const rowFM = r.field_meta ?? {};
|
||||||
|
for (const [k, v] of Object.entries(r.data)) {
|
||||||
|
const serverTime = rowFM[k] ?? r.created_at.toISOString();
|
||||||
|
const localTime = current.fm[k] ?? '';
|
||||||
|
if (serverTime >= localTime) {
|
||||||
|
current.record[k] = v;
|
||||||
|
current.fm[k] = serverTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (current) flush(current);
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
function projectJob(userId: string, recordId: string, merged: Row | null): ImportJobRow | null {
|
||||||
|
if (!merged || merged.deletedAt) return null;
|
||||||
|
const totalUrls = num(merged.totalUrls);
|
||||||
|
const status = str(merged.status);
|
||||||
|
if (totalUrls == null || !isJobStatus(status)) return null;
|
||||||
|
return {
|
||||||
|
id: recordId,
|
||||||
|
userId,
|
||||||
|
spaceId: optStr(merged.spaceId),
|
||||||
|
totalUrls,
|
||||||
|
status,
|
||||||
|
leasedBy: optStr(merged.leasedBy),
|
||||||
|
leasedUntil: optStr(merged.leasedUntil),
|
||||||
|
startedAt: optStr(merged.startedAt),
|
||||||
|
finishedAt: optStr(merged.finishedAt),
|
||||||
|
savedCount: num(merged.savedCount) ?? 0,
|
||||||
|
duplicateCount: num(merged.duplicateCount) ?? 0,
|
||||||
|
errorCount: num(merged.errorCount) ?? 0,
|
||||||
|
warningCount: num(merged.warningCount) ?? 0,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function projectItem(userId: string, recordId: string, merged: Row | null): ImportItemRow | null {
|
||||||
|
if (!merged || merged.deletedAt) return null;
|
||||||
|
const jobId = str(merged.jobId);
|
||||||
|
const url = str(merged.url);
|
||||||
|
const state = str(merged.state);
|
||||||
|
const idx = num(merged.idx);
|
||||||
|
if (!jobId || !url || !isItemState(state) || idx == null) return null;
|
||||||
|
return {
|
||||||
|
id: recordId,
|
||||||
|
userId,
|
||||||
|
spaceId: optStr(merged.spaceId),
|
||||||
|
jobId,
|
||||||
|
idx,
|
||||||
|
url,
|
||||||
|
state,
|
||||||
|
articleId: optStr(merged.articleId),
|
||||||
|
warning: merged.warning === 'probable_consent_wall' ? 'probable_consent_wall' : null,
|
||||||
|
error: optStr(merged.error),
|
||||||
|
attempts: num(merged.attempts) ?? 0,
|
||||||
|
lastAttemptAt: optStr(merged.lastAttemptAt),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function isJobStatus(s: string): s is ImportJobRow['status'] {
|
||||||
|
return s === 'queued' || s === 'running' || s === 'paused' || s === 'done' || s === 'cancelled';
|
||||||
|
}
|
||||||
|
|
||||||
|
function isItemState(s: string): s is ImportItemState {
|
||||||
|
return (
|
||||||
|
s === 'pending' ||
|
||||||
|
s === 'extracting' ||
|
||||||
|
s === 'extracted' ||
|
||||||
|
s === 'saved' ||
|
||||||
|
s === 'duplicate' ||
|
||||||
|
s === 'consent-wall' ||
|
||||||
|
s === 'error' ||
|
||||||
|
s === 'cancelled'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function num(v: unknown): number | null {
|
||||||
|
return typeof v === 'number' && Number.isFinite(v) ? v : null;
|
||||||
|
}
|
||||||
|
function str(v: unknown): string {
|
||||||
|
return typeof v === 'string' ? v : '';
|
||||||
|
}
|
||||||
|
function optStr(v: unknown): string | null {
|
||||||
|
return typeof v === 'string' && v ? v : null;
|
||||||
|
}
|
||||||
236
apps/api/src/modules/articles/import-worker.ts
Normal file
236
apps/api/src/modules/articles/import-worker.ts
Normal file
|
|
@ -0,0 +1,236 @@
|
||||||
|
/**
|
||||||
|
* Articles Bulk-Import — background worker.
|
||||||
|
*
|
||||||
|
* Boots from `apps/api/src/index.ts`. On every tick:
|
||||||
|
*
|
||||||
|
* 1. Try `pg_try_advisory_xact_lock` on a fixed key. If another
|
||||||
|
* apps/api instance already holds it, skip this tick. The lock
|
||||||
|
* is per-transaction so we never need a heartbeat — a crashed
|
||||||
|
* worker's tx auto-aborts and the next tick claims it cleanly.
|
||||||
|
* 2. Project the live state of `articleImportJobs` and pick the
|
||||||
|
* ones still 'queued' or 'running'.
|
||||||
|
* 3. For each job: project items, take up to N pending items,
|
||||||
|
* extract concurrently. Each extraction writes a Pickup row +
|
||||||
|
* flips the item state via `import-extractor.ts`.
|
||||||
|
* 4. Fold terminal item states into job counters
|
||||||
|
* (savedCount / duplicateCount / errorCount / warningCount).
|
||||||
|
* When every item is terminal, flip the job to 'done'.
|
||||||
|
*
|
||||||
|
* No own state — every meaningful transition is a `sync_changes` row.
|
||||||
|
* The worker is therefore stateless across restarts.
|
||||||
|
*
|
||||||
|
* Plan: docs/plans/articles-bulk-import.md.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { getSyncConnection } from '../../mcp/sync-db';
|
||||||
|
import {
|
||||||
|
listClaimableJobs,
|
||||||
|
listItemsForJob,
|
||||||
|
type ImportItemRow,
|
||||||
|
type ImportJobRow,
|
||||||
|
} from './import-projection';
|
||||||
|
import { extractOneItem, writeItemUpdate, writeJobUpdate } from './import-extractor';
|
||||||
|
|
||||||
|
// Tick cadence of the worker loop.
const TICK_INTERVAL_MS = 2_000;
// Max pending items claimed per job per tick.
const PER_JOB_CONCURRENCY = 3;
/** Fixed int8 lock key — derived from the ASCII bytes of 'ARTI'. */
const ADVISORY_LOCK_KEY = 0x4152_5449;

// Interval handle; non-null while the worker is started.
let timer: ReturnType<typeof setInterval> | null = null;
// In-process re-entrancy guard: true while a tick body is executing.
let running = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the recurring tick. Idempotent — safe to call multiple times.
|
||||||
|
* Intended to be called once from `apps/api/src/index.ts` at boot.
|
||||||
|
*
|
||||||
|
* Disable via `ARTICLES_IMPORT_WORKER_DISABLED=true` (for tests, or
|
||||||
|
* when running multiple apps/api instances and you want to designate
|
||||||
|
* a different one as the worker).
|
||||||
|
*/
|
||||||
|
export function startArticleImportWorker(): void {
|
||||||
|
if (timer) return;
|
||||||
|
if (process.env.ARTICLES_IMPORT_WORKER_DISABLED === 'true') {
|
||||||
|
console.log('[articles-import] worker disabled via env');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
console.log(
|
||||||
|
`[articles-import] worker starting — tick=${TICK_INTERVAL_MS}ms, concurrency=${PER_JOB_CONCURRENCY}`
|
||||||
|
);
|
||||||
|
timer = setInterval(() => {
|
||||||
|
void runTickGuarded();
|
||||||
|
}, TICK_INTERVAL_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function stopArticleImportWorker(): void {
|
||||||
|
if (timer) {
|
||||||
|
clearInterval(timer);
|
||||||
|
timer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Serialize ticks in-process: if the previous tick is still running,
// skip this one entirely; errors are logged, never rethrown, so the
// interval keeps firing.
async function runTickGuarded(): Promise<void> {
  if (running) return;
  running = true;
  try {
    await runTickOnce();
  } catch (err) {
    console.error('[articles-import] tick error:', err);
  } finally {
    // Always release the guard, even when the tick threw.
    running = false;
  }
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* One tick body. Exported for tests + a potential
|
||||||
|
* `/internal/articles-import/tick`-style admin route.
|
||||||
|
*/
|
||||||
|
export async function runTickOnce(): Promise<{
|
||||||
|
skipped: boolean;
|
||||||
|
jobsConsidered: number;
|
||||||
|
itemsProcessed: number;
|
||||||
|
}> {
|
||||||
|
if (!(await tryAcquireLock())) {
|
||||||
|
return { skipped: true, jobsConsidered: 0, itemsProcessed: 0 };
|
||||||
|
}
|
||||||
|
const jobs = await listClaimableJobs();
|
||||||
|
let itemsProcessed = 0;
|
||||||
|
for (const job of jobs) {
|
||||||
|
itemsProcessed += await processOneJob(job);
|
||||||
|
}
|
||||||
|
return { skipped: false, jobsConsidered: jobs.length, itemsProcessed };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Brief advisory-lock probe via a single short transaction. Returns
|
||||||
|
* true if we won the probe — that's a soft signal for "you're the
|
||||||
|
* worker for this tick"; the lock releases as the probe tx commits.
|
||||||
|
* For multi-instance deploys this is a soft-only coordination — if
|
||||||
|
* two probes happen to interleave their work, the field-LWW merge on
|
||||||
|
* the client still produces a coherent state.
|
||||||
|
*/
|
||||||
|
async function tryAcquireLock(): Promise<boolean> {
|
||||||
|
const sql = getSyncConnection();
|
||||||
|
let acquired = false;
|
||||||
|
await sql.begin(async (tx) => {
|
||||||
|
const rows = await tx<{ acquired: boolean }[]>`
|
||||||
|
SELECT pg_try_advisory_xact_lock(${ADVISORY_LOCK_KEY}) AS acquired
|
||||||
|
`;
|
||||||
|
acquired = rows[0]?.acquired === true;
|
||||||
|
});
|
||||||
|
return acquired;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance one job by one tick: flip queued→running, roll item-state
// counts up into the job record, and claim up to PER_JOB_CONCURRENCY
// pending items for extraction. Returns the number of items touched.
async function processOneJob(job: ImportJobRow): Promise<number> {
  const items = await listItemsForJob(job.userId, job.id);

  // Flip 'queued' → 'running' so the UI shows progress.
  if (job.status === 'queued') {
    await writeJobUpdate(job.userId, job.id, {
      status: 'running',
      startedAt: new Date().toISOString(),
    });
  }

  // Counter-derivation from current item states.
  // Only changed counters go into the patch, so no-op ticks write nothing.
  const counts = countByState(items);
  const counterPatch: Record<string, unknown> = {};
  let dirty = false;
  if (counts.saved !== job.savedCount) {
    counterPatch.savedCount = counts.saved;
    dirty = true;
  }
  if (counts.duplicate !== job.duplicateCount) {
    counterPatch.duplicateCount = counts.duplicate;
    dirty = true;
  }
  if (counts.error !== job.errorCount) {
    counterPatch.errorCount = counts.error;
    dirty = true;
  }
  // consent-wall items feed the job's warningCount.
  if (counts.consentWall !== job.warningCount) {
    counterPatch.warningCount = counts.consentWall;
    dirty = true;
  }
  if (counts.allTerminal && job.status !== 'done') {
    counterPatch.status = 'done';
    counterPatch.finishedAt = new Date().toISOString();
    dirty = true;
  }
  if (dirty) {
    await writeJobUpdate(job.userId, job.id, counterPatch);
  }

  if (counts.allTerminal) return 0;

  // Cancelled → flip every still-pending item to 'cancelled'.
  // NOTE(review): listClaimableJobs only yields 'queued'/'running'
  // jobs, so this branch looks unreachable from the tick path —
  // presumably kept for direct/admin invocation; verify.
  if (job.status === 'cancelled') {
    const pending = items.filter((i) => i.state === 'pending');
    for (const it of pending) {
      await writeItemUpdate(it.userId, it.id, { state: 'cancelled' });
    }
    return pending.length;
  }

  // Paused → already-extracting items finish their roundtrip; nothing
  // new gets claimed.
  if (job.status === 'paused') return 0;

  // Running → claim up to PER_JOB_CONCURRENCY pending items in
  // parallel. We deliberately don't try to rescue 'extracting' items:
  // if a worker died mid-fetch they stay 'extracting' forever for
  // now. Future polish: time-out 'extracting' rows older than ~5min
  // and bounce them back to 'pending'.
  const claimable = items.filter((i) => i.state === 'pending').slice(0, PER_JOB_CONCURRENCY);
  if (claimable.length === 0) return 0;

  // allSettled: one failed extraction must not abort its siblings.
  await Promise.allSettled(claimable.map((it) => extractOneItem(it)));
  return claimable.length;
}
|
||||||
|
|
||||||
|
interface StateCounts {
|
||||||
|
saved: number;
|
||||||
|
duplicate: number;
|
||||||
|
error: number;
|
||||||
|
consentWall: number;
|
||||||
|
cancelled: number;
|
||||||
|
allTerminal: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
function countByState(items: readonly ImportItemRow[]): StateCounts {
|
||||||
|
let saved = 0;
|
||||||
|
let duplicate = 0;
|
||||||
|
let error = 0;
|
||||||
|
let consentWall = 0;
|
||||||
|
let cancelled = 0;
|
||||||
|
let nonTerminal = 0;
|
||||||
|
for (const it of items) {
|
||||||
|
switch (it.state) {
|
||||||
|
case 'saved':
|
||||||
|
saved++;
|
||||||
|
break;
|
||||||
|
case 'duplicate':
|
||||||
|
duplicate++;
|
||||||
|
break;
|
||||||
|
case 'error':
|
||||||
|
error++;
|
||||||
|
break;
|
||||||
|
case 'consent-wall':
|
||||||
|
saved++; // consent-wall is "saved with warning" semantically
|
||||||
|
consentWall++;
|
||||||
|
break;
|
||||||
|
case 'cancelled':
|
||||||
|
cancelled++;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
nonTerminal++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
saved,
|
||||||
|
duplicate,
|
||||||
|
error,
|
||||||
|
consentWall,
|
||||||
|
cancelled,
|
||||||
|
allTerminal: items.length > 0 && nonTerminal === 0,
|
||||||
|
};
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue