mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-21 09:36:42 +02:00
chore(mana+api): articles + Backend-Worker raus, pageta trägt allein
Some checks are pending
CD Mac Mini / Detect Changes (push) Waiting to run
CD Mac Mini / Deploy (push) Blocked by required conditions
CI / Detect Changes (push) Waiting to run
CI / Validate (push) Waiting to run
CI / Build mana-search (push) Blocked by required conditions
CI / Build mana-sync (push) Blocked by required conditions
CI / Build mana-api-gateway (push) Blocked by required conditions
CI / Build mana-crawler (push) Blocked by required conditions
Docker Validate / Validate Dockerfiles (push) Waiting to run
Docker Validate / Build calendar-web (push) Blocked by required conditions
Docker Validate / Build quotes-web (push) Blocked by required conditions
Docker Validate / Build todo-backend (push) Blocked by required conditions
Docker Validate / Build todo-web (push) Blocked by required conditions
Docker Validate / Build mana-auth (push) Blocked by required conditions
Docker Validate / Build mana-sync (push) Blocked by required conditions
Docker Validate / Build mana-media (push) Blocked by required conditions
Mirror to Forgejo / Push to Forgejo (push) Waiting to run
Some checks are pending
CD Mac Mini / Detect Changes (push) Waiting to run
CD Mac Mini / Deploy (push) Blocked by required conditions
CI / Detect Changes (push) Waiting to run
CI / Validate (push) Waiting to run
CI / Build mana-search (push) Blocked by required conditions
CI / Build mana-sync (push) Blocked by required conditions
CI / Build mana-api-gateway (push) Blocked by required conditions
CI / Build mana-crawler (push) Blocked by required conditions
Docker Validate / Validate Dockerfiles (push) Waiting to run
Docker Validate / Build calendar-web (push) Blocked by required conditions
Docker Validate / Build quotes-web (push) Blocked by required conditions
Docker Validate / Build todo-backend (push) Blocked by required conditions
Docker Validate / Build todo-web (push) Blocked by required conditions
Docker Validate / Build mana-auth (push) Blocked by required conditions
Docker Validate / Build mana-sync (push) Blocked by required conditions
Docker Validate / Build mana-media (push) Blocked by required conditions
Mirror to Forgejo / Push to Forgejo (push) Waiting to run
Pageta ist seit 2026-05-17 standalone live (pageta.mana.how + pageta.com, voll-featured laut STATUS.md) und deckt alle Articles-Module-Features ab + mehr (research, reactions, feed, share, snapshot, preferences). Keine User-Daten im managarten/articles-Modul (Till bestätigt). Frontend entfernt: - apps/mana/apps/web/src/routes/(app)/articles/ (9 Routes inkl. (tabs), [id], add, import, import/[jobId], settings) - apps/mana/apps/web/src/lib/modules/articles/ (5 Stores, Queries, Collections, Types, Tools, Components, Widgets, ArticlesTabShell, consume-pickup, tab-context, parse-urls) - apps/mana/apps/web/src/lib/i18n/locales/articles/ (DE/EN/ES/FR/IT) Backend entfernt: - apps/api/src/modules/articles/ (routes, import-worker, import-projection, import-extractor, consent-wall, field-meta, plus Tests) - apps/api/src/index.ts: articlesRoutes + startArticleImportWorker raus - apps/api/src/lib/metrics.ts: 5 articles-Metrics raus (articlesImportTicks/Items/Extract/JobsCompleted/PickupGc) "Save-to-Articles"-Features in anderen Modulen entfernt (User kann später direkt in pageta speichern via Share-Sheet): - news-research/ListView + routes/(app)/news-research/+page.svelte: "Speichern"-Button raus - writing/tools.ts: save_draft_as_article-Tool raus - writing/components/ExportMenu.svelte: "Als Artikel speichern"-Option raus - writing/components/ReferencePicker.svelte: 'article'-Mode raus - writing/components/ReferenceChip.svelte: KIND_ICON/LABEL ohne 'article' - writing/utils/reference-resolver.ts: resolveArticle + 'article'-case raus - writing/utils/reference-resolver.test.ts: kind: 'article' → 'note' in Aggregate-Budget-Tests - writing/utils/prompt-builder.test.ts: 'article'-Resolved-Reference raus - writing/views/DetailView.svelte: 'articles'-published-Chip raus - writing/types.ts: DraftReferenceKind ohne 'article', DraftPublishModule ohne 'articles' Aktualisiert (Cross-Refs raus): - module-registry.ts (articlesModuleConfig) - module-registry.test.ts (articles-Tabellen + sync-name-Mappings) - data-layer-listeners.ts (startArticlePickupConsumer) - app-registry/apps.ts (registerApp 'articles') - packages/shared-branding/src/mana-apps.ts (articles-Eintrag) - components/dashboard/widget-registry.ts (ArticlesUnreadWidget) - types/dashboard.ts (WidgetType 'articles-unread') - data/crypto/registry.ts (LocalArticle/LocalHighlight) - data/crypto/plaintext-allowlist.ts (articleTags/articleImportJobs/ articleImportItems/articleExtractPickup) - data/tools/init.ts (articlesTools) NICHT angefasst (mit Absicht): - data/database.ts db.version()-Stores — Schema-Snapshots sind frozen. Tabellen articles, articleHighlights, articleTags, articleImportJobs, articleImportItems, articleExtractPickup bleiben im IndexedDB-Schema, werden aber nicht mehr beschrieben. - packages/shared-branding/src/app-icons.ts APP_ICONS.articles (für Native-PNG-Generator, harmlos). - apps/api/src/lib/sync-db.ts Z6 Kommentar (historisches Beispiel). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
001548c74d
commit
0112161e78
82 changed files with 21 additions and 9057 deletions
|
|
@ -1,47 +0,0 @@
|
|||
import { describe, it, expect } from 'bun:test';
|
||||
import { looksLikeConsentWall } from './consent-wall';
|
||||
|
||||
describe('looksLikeConsentWall', () => {
|
||||
it('flags short text containing German consent vocabulary', () => {
|
||||
const text =
|
||||
'Cookies zustimmen — Wir und unsere Partner speichern Informationen auf einem Endgerät.';
|
||||
expect(looksLikeConsentWall(text, 14)).toBe(true);
|
||||
});
|
||||
|
||||
it('flags short English consent dialogs', () => {
|
||||
const text = 'Please accept all cookies to continue using this website.';
|
||||
expect(looksLikeConsentWall(text, 9)).toBe(true);
|
||||
});
|
||||
|
||||
it('flags JavaScript-disabled walls', () => {
|
||||
const text = 'JavaScript is disabled. Please enable JavaScript to continue.';
|
||||
expect(looksLikeConsentWall(text, 7)).toBe(true);
|
||||
});
|
||||
|
||||
it('does NOT flag long articles even if they mention cookies', () => {
|
||||
// Long-form article that happens to mention cookies in body. The
|
||||
// heuristic only fires below the wordcount threshold (300) so a
|
||||
// real article about cookies isn't misclassified.
|
||||
const text = 'cookie consent ' + 'lorem '.repeat(400);
|
||||
expect(looksLikeConsentWall(text, 800)).toBe(false);
|
||||
});
|
||||
|
||||
it('does NOT flag short text without consent vocabulary', () => {
|
||||
const text = 'A short blog post about hiking trails in the Black Forest.';
|
||||
expect(looksLikeConsentWall(text, 11)).toBe(false);
|
||||
});
|
||||
|
||||
it('is case-insensitive', () => {
|
||||
const text = 'COOKIES ZUSTIMMEN — KLICKE HIER';
|
||||
expect(looksLikeConsentWall(text, 4)).toBe(true);
|
||||
});
|
||||
|
||||
it('returns false on empty content', () => {
|
||||
expect(looksLikeConsentWall('', 0)).toBe(false);
|
||||
});
|
||||
|
||||
it('returns false at exactly the wordcount threshold (boundary check)', () => {
|
||||
const text = 'cookie consent ' + 'lorem '.repeat(300);
|
||||
expect(looksLikeConsentWall(text, 300)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
/**
|
||||
* Consent-wall heuristic shared by every server-side article-extract
|
||||
* path:
|
||||
* - `/api/v1/articles/extract` and `/extract/html` (single-URL)
|
||||
* - The bulk-import worker's `extractOneItem` (background)
|
||||
*
|
||||
* When the extracted text is suspiciously short AND contains GDPR /
|
||||
* cookie-consent vocabulary, the server's anonymous fetch most likely
|
||||
* hit a consent dialog instead of the article itself. The caller can
|
||||
* use the flag to nudge the user toward the browser-HTML bookmarklet
|
||||
* (which fetches with the user's existing session cookies) rather
|
||||
* than silently persisting the GDPR overlay text as the article body.
|
||||
*/
|
||||
|
||||
const CONSENT_KEYWORDS = [
|
||||
'cookies zustimmen',
|
||||
'cookie consent',
|
||||
'zustimmung',
|
||||
'accept all cookies',
|
||||
'consent to the use',
|
||||
'enable javascript',
|
||||
'javascript is disabled',
|
||||
'please enable',
|
||||
'privacy center',
|
||||
'datenschutzeinstellungen',
|
||||
'datenschutzeinstellungen',
|
||||
];
|
||||
|
||||
/** Wordcount floor below which the heuristic is considered. Real
|
||||
* articles are typically >300 words; consent dialogs are <50. */
|
||||
const CONSENT_WORDCOUNT_THRESHOLD = 300;
|
||||
|
||||
export function looksLikeConsentWall(content: string, wordCount: number): boolean {
|
||||
if (wordCount >= CONSENT_WORDCOUNT_THRESHOLD) return false;
|
||||
const haystack = content.toLowerCase();
|
||||
return CONSENT_KEYWORDS.some((needle) => haystack.includes(needle));
|
||||
}
|
||||
|
|
@ -1,51 +0,0 @@
|
|||
import { describe, it, expect } from 'bun:test';
|
||||
import { fieldMetaTime } from './field-meta';
|
||||
|
||||
describe('fieldMetaTime — wire-shape adapter for sync_changes.field_meta', () => {
|
||||
it('passes through legacy plain ISO strings unchanged', () => {
|
||||
expect(fieldMetaTime('2026-04-28T21:14:30.000Z')).toBe('2026-04-28T21:14:30.000Z');
|
||||
});
|
||||
|
||||
it('extracts the `at` field from F3 object stamps', () => {
|
||||
expect(
|
||||
fieldMetaTime({
|
||||
at: '2026-04-28T21:14:30.000Z',
|
||||
actor: { kind: 'system', principalId: 'system:foo', displayName: 'Foo' },
|
||||
origin: 'system',
|
||||
})
|
||||
).toBe('2026-04-28T21:14:30.000Z');
|
||||
});
|
||||
|
||||
it('returns "" for undefined / null (so callers can fall back)', () => {
|
||||
expect(fieldMetaTime(undefined)).toBe('');
|
||||
expect(fieldMetaTime(null)).toBe('');
|
||||
});
|
||||
|
||||
it('returns "" for malformed objects without an at-string', () => {
|
||||
expect(fieldMetaTime({})).toBe('');
|
||||
expect(fieldMetaTime({ at: 12345 })).toBe('');
|
||||
expect(fieldMetaTime({ at: null })).toBe('');
|
||||
});
|
||||
|
||||
it('returns "" for non-string non-object inputs', () => {
|
||||
expect(fieldMetaTime(42)).toBe('');
|
||||
expect(fieldMetaTime(true)).toBe('');
|
||||
expect(fieldMetaTime([])).toBe('');
|
||||
});
|
||||
|
||||
// Regression: this is the bug that triggered the cross-service fix.
|
||||
// Before fieldMetaTime, a string >= object compare evaluated to false
|
||||
// stably and the older value won. Now both shapes fold to comparable
|
||||
// ISO strings.
|
||||
it('makes string-vs-object comparison work correctly across both shapes', () => {
|
||||
const earlierLegacy = '2026-04-28T21:00:00.000Z';
|
||||
const laterF3 = {
|
||||
at: '2026-04-28T22:00:00.000Z',
|
||||
actor: { kind: 'user', principalId: 'u', displayName: 'Du' },
|
||||
origin: 'user',
|
||||
};
|
||||
// The F3 stamp is later in time, so its normalised form must
|
||||
// compare strictly greater than the legacy stamp.
|
||||
expect(fieldMetaTime(laterF3) > fieldMetaTime(earlierLegacy)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
|
@ -1,32 +0,0 @@
|
|||
/**
|
||||
* Wire-shape adapter for `sync_changes.field_meta`.
|
||||
*
|
||||
* Two shapes coexist on the wire today:
|
||||
*
|
||||
* - Legacy plaintext writes: { state: 'ISO-8601' }
|
||||
* - Field-meta-overhaul (F3): { state: { at, actor, origin } }
|
||||
*
|
||||
* Any LWW projection that string-compares per-field timestamps MUST
|
||||
* fold both into a comparable form, otherwise the moment one side is
|
||||
* an F3 object the comparison becomes `'[object Object]' >= 'ISO…'`
|
||||
* (false), the older value wins and the projection lies.
|
||||
*
|
||||
* Sister helper at `services/mana-ai/src/db/field-meta.ts` — same
|
||||
* logic, deliberately duplicated. Both services treat sync_changes as
|
||||
* a read-only event log; sharing infrastructure code across services
|
||||
* (apps/api ↔ services/mana-ai) is out of scope.
|
||||
*/
|
||||
|
||||
/** Returns the ISO-string timestamp of a single `field_meta[k]` slot,
|
||||
* regardless of whether the wire format is the legacy plain string
|
||||
* or the F3 `{ at, actor, origin }` object. Returns the empty string
|
||||
* when no usable value is present so the LWW comparison treats the
|
||||
* field as never-stamped (callers fall back to row.created_at). */
|
||||
export function fieldMetaTime(meta: unknown): string {
|
||||
if (typeof meta === 'string') return meta;
|
||||
if (meta && typeof meta === 'object') {
|
||||
const at = (meta as { at?: unknown }).at;
|
||||
if (typeof at === 'string') return at;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
|
@ -1,226 +0,0 @@
|
|||
/**
|
||||
* Articles Bulk-Import — per-item extraction + write-back.
|
||||
*
|
||||
* For one `articleImportItems` row in state='pending':
|
||||
*
|
||||
* 1. Flip to state='extracting' (so other ticks / the UI see progress).
|
||||
* 2. Run `extractFromUrl` against the URL.
|
||||
* 3a. On success → write a `articleExtractPickup` row carrying the
|
||||
* full ExtractedArticle payload + flip the item to 'extracted'.
|
||||
* The client-side pickup-consumer picks it up, encrypts the
|
||||
* article into the user's IndexedDB, and flips the item to 'saved'
|
||||
* (or 'consent-wall' if the warning fired).
|
||||
* 3b. On failure → bump `attempts`, flip back to 'pending' if
|
||||
* attempts < 3, else flip to state='error' with the technical
|
||||
* error message.
|
||||
*
|
||||
* Every state-change is one `sync_changes` row attributed to the
|
||||
* `system:articles-import-worker` actor (built inline below — kept out
|
||||
* of the shared-ai SystemSource union for now to keep the worker self-
|
||||
* contained; can be hoisted later). Origin is `'system'` so the
|
||||
* conflict-detection gate on the client doesn't surface these as
|
||||
* user-visible toasts.
|
||||
*
|
||||
* Plan: docs/plans/articles-bulk-import.md.
|
||||
*/
|
||||
|
||||
import { extractFromUrl } from '@mana/shared-rss';
|
||||
import {
|
||||
makeFieldMeta,
|
||||
makeSystemActor,
|
||||
originFromActor,
|
||||
SYSTEM_ARTICLES_IMPORT_WORKER,
|
||||
type Actor,
|
||||
type FieldOrigin,
|
||||
} from '@mana/shared-ai';
|
||||
import { getSyncConnection } from '../../lib/sync-db';
|
||||
import { articlesImportExtractDuration, articlesImportItemsTotal } from '../../lib/metrics';
|
||||
import { looksLikeConsentWall } from './consent-wall';
|
||||
import type { ImportItemRow } from './import-projection';
|
||||
|
||||
const MAX_ATTEMPTS = 3;
|
||||
const CLIENT_ID = 'articles-import-worker';
|
||||
|
||||
/** System-actor blob stamped on every worker write — sourced from the
|
||||
* blessed SystemSource union in @mana/shared-ai so the actor.ts audit
|
||||
* + Workbench filters know about it. */
|
||||
const WORKER_ACTOR: Actor = makeSystemActor(SYSTEM_ARTICLES_IMPORT_WORKER);
|
||||
const WORKER_ORIGIN: FieldOrigin = originFromActor(WORKER_ACTOR);
|
||||
|
||||
export interface ExtractStats {
|
||||
itemId: string;
|
||||
terminal: 'pending' | 'extracted' | 'error';
|
||||
}
|
||||
|
||||
/**
|
||||
* Run one extraction round-trip for a single item. Idempotent at the
|
||||
* sync_changes level — if two ticks race the same item the field-LWW
|
||||
* merge yields a single coherent state on the client.
|
||||
*/
|
||||
export async function extractOneItem(item: ImportItemRow): Promise<ExtractStats> {
|
||||
if (item.state !== 'pending') {
|
||||
return {
|
||||
itemId: item.id,
|
||||
terminal: item.state === 'error' ? 'error' : 'extracted',
|
||||
};
|
||||
}
|
||||
|
||||
// Step 1 — claim. Flip the item to 'extracting' before the slow
|
||||
// fetch so concurrent ticks (and the UI) see we own it.
|
||||
const nowClaim = new Date().toISOString();
|
||||
await writeItemUpdate(item.userId, item.id, {
|
||||
state: 'extracting',
|
||||
lastAttemptAt: nowClaim,
|
||||
attempts: item.attempts + 1,
|
||||
});
|
||||
|
||||
// Step 2 — fetch + parse. Hard-failure path returns null; we treat
|
||||
// that as a single failed attempt and recycle.
|
||||
const extractStart = Date.now();
|
||||
const extracted = await extractFromUrl(item.url);
|
||||
articlesImportExtractDuration.observe((Date.now() - extractStart) / 1000);
|
||||
const nowDone = new Date().toISOString();
|
||||
|
||||
if (!extracted) {
|
||||
const nextAttempts = item.attempts + 1;
|
||||
const nextState = nextAttempts >= MAX_ATTEMPTS ? 'error' : 'pending';
|
||||
await writeItemUpdate(item.userId, item.id, {
|
||||
state: nextState,
|
||||
error: nextState === 'error' ? 'Extraktion fehlgeschlagen nach mehreren Versuchen.' : null,
|
||||
lastAttemptAt: nowDone,
|
||||
});
|
||||
if (nextState === 'error') {
|
||||
articlesImportItemsTotal.inc({ result: 'error' });
|
||||
}
|
||||
return { itemId: item.id, terminal: nextState === 'error' ? 'error' : 'pending' };
|
||||
}
|
||||
|
||||
// Step 3 — write the Pickup row (server payload for the client) and
|
||||
// flip item state to 'extracted' so the consume-pickup path picks it
|
||||
// up. Pickup row first so a client liveQuery seeing the 'extracted'
|
||||
// state can immediately find the matching pickup payload.
|
||||
const pickupId = `pickup-${item.id}`;
|
||||
const wordCount = extracted.wordCount ?? 0;
|
||||
const readingTimeMinutes = extracted.readingTimeMinutes ?? 0;
|
||||
const warning = looksLikeConsentWall(extracted.content, wordCount)
|
||||
? 'probable_consent_wall'
|
||||
: null;
|
||||
|
||||
await writePickupInsert(item.userId, pickupId, {
|
||||
itemId: item.id,
|
||||
spaceId: item.spaceId ?? null,
|
||||
payload: {
|
||||
originalUrl: item.url,
|
||||
title: extracted.title ?? '',
|
||||
excerpt: extracted.excerpt ?? null,
|
||||
content: extracted.content,
|
||||
htmlContent: extracted.htmlContent ?? '',
|
||||
author: extracted.byline ?? null,
|
||||
siteName: extracted.siteName ?? null,
|
||||
wordCount,
|
||||
readingTimeMinutes,
|
||||
...(warning && { warning }),
|
||||
},
|
||||
});
|
||||
|
||||
await writeItemUpdate(item.userId, item.id, {
|
||||
state: 'extracted',
|
||||
warning,
|
||||
error: null,
|
||||
lastAttemptAt: nowDone,
|
||||
});
|
||||
|
||||
articlesImportItemsTotal.inc({ result: warning ? 'consent_wall' : 'extracted' });
|
||||
return { itemId: item.id, terminal: 'extracted' };
|
||||
}
|
||||
|
||||
// ─── Sync-changes write helpers (worker-attributed) ──────────
|
||||
|
||||
/**
|
||||
* Worker-attributed update on an `articleImportItems` row. Exported so
|
||||
* the worker tick can flip pending items to 'cancelled' when the parent
|
||||
* job is cancelled, without going through the extraction pipeline.
|
||||
*/
|
||||
export async function writeItemUpdate(
|
||||
userId: string,
|
||||
itemId: string,
|
||||
patch: Record<string, unknown>
|
||||
): Promise<void> {
|
||||
await insertSyncChange({
|
||||
userId,
|
||||
recordId: itemId,
|
||||
appId: 'articles',
|
||||
tableName: 'articleImportItems',
|
||||
op: 'update',
|
||||
data: patch,
|
||||
});
|
||||
}
|
||||
|
||||
async function writePickupInsert(
|
||||
userId: string,
|
||||
pickupId: string,
|
||||
data: Record<string, unknown>
|
||||
): Promise<void> {
|
||||
await insertSyncChange({
|
||||
userId,
|
||||
recordId: pickupId,
|
||||
appId: 'articles',
|
||||
tableName: 'articleExtractPickup',
|
||||
op: 'insert',
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker-attributed update on an `articleImportJobs` row. Counter-only
|
||||
* for now (savedCount, errorCount, …) plus status flips like
|
||||
* 'queued' → 'running' and 'running' → 'done'.
|
||||
*/
|
||||
export async function writeJobUpdate(
|
||||
userId: string,
|
||||
jobId: string,
|
||||
patch: Record<string, unknown>
|
||||
): Promise<void> {
|
||||
await insertSyncChange({
|
||||
userId,
|
||||
recordId: jobId,
|
||||
appId: 'articles',
|
||||
tableName: 'articleImportJobs',
|
||||
op: 'update',
|
||||
data: patch,
|
||||
});
|
||||
}
|
||||
|
||||
interface InsertParams {
|
||||
userId: string;
|
||||
recordId: string;
|
||||
appId: string;
|
||||
tableName: string;
|
||||
op: 'insert' | 'update' | 'delete';
|
||||
data: Record<string, unknown>;
|
||||
}
|
||||
|
||||
async function insertSyncChange(params: InsertParams): Promise<void> {
|
||||
const sql = getSyncConnection();
|
||||
const now = new Date().toISOString();
|
||||
const fieldMeta: Record<string, unknown> = {};
|
||||
for (const key of Object.keys(params.data)) {
|
||||
fieldMeta[key] = makeFieldMeta(now, WORKER_ACTOR, WORKER_ORIGIN);
|
||||
}
|
||||
const actorJson = WORKER_ACTOR as unknown;
|
||||
const dataJson = params.data as unknown;
|
||||
const fmJson = fieldMeta as unknown;
|
||||
await sql.begin(async (tx) => {
|
||||
await tx`SELECT set_config('app.current_user_id', ${params.userId}, true)`;
|
||||
await tx`
|
||||
INSERT INTO sync_changes
|
||||
(app_id, table_name, record_id, user_id, op, data, field_meta, client_id, schema_version, actor, origin)
|
||||
VALUES
|
||||
(${params.appId}, ${params.tableName}, ${params.recordId}, ${params.userId}, ${params.op},
|
||||
${tx.json(dataJson as never)}, ${tx.json(fmJson as never)},
|
||||
${CLIENT_ID}, 1, ${tx.json(actorJson as never)}, ${WORKER_ORIGIN})
|
||||
`;
|
||||
});
|
||||
}
|
||||
|
||||
// looksLikeConsentWall lives in ./consent-wall.ts — shared with routes.ts.
|
||||
|
|
@ -1,250 +0,0 @@
|
|||
/**
|
||||
* Articles Bulk-Import — sync_changes → live record projection.
|
||||
*
|
||||
* Mirror of `services/mana-ai/src/db/missions-projection.ts` and
|
||||
* `apps/api/src/lib/sync-db.ts:readLatestRecords()`, specialised for the
|
||||
* two tables the import-worker tick reads each cycle:
|
||||
*
|
||||
* articleImportJobs — to find running jobs whose lease is free
|
||||
* articleImportItems — to find pending items inside those jobs
|
||||
*
|
||||
* No materialized snapshots yet — this is the simple "replay every row
|
||||
* for these tables" path. The total volume is small (a few hundred rows
|
||||
* per active job, all import history per user) and the worker tick is
|
||||
* the only consumer. If the table grows we can plug in the same
|
||||
* `mission_snapshots` pattern mana-ai uses; the projection API stays
|
||||
* the same.
|
||||
*
|
||||
* Plan: docs/plans/articles-bulk-import.md.
|
||||
*/
|
||||
|
||||
import { getSyncConnection } from '../../lib/sync-db';
|
||||
import { fieldMetaTime } from './field-meta';
|
||||
|
||||
type Row = Record<string, unknown>;
|
||||
interface ChangeRow {
|
||||
user_id: string;
|
||||
record_id: string;
|
||||
op: string;
|
||||
data: Row | null;
|
||||
/** See `./field-meta.ts` — wire shape is two-tone (legacy ISO string
|
||||
* vs. F3 `{at, actor, origin}` object). */
|
||||
field_meta: Record<string, unknown> | null;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
export interface ImportJobRow {
|
||||
id: string;
|
||||
userId: string;
|
||||
spaceId: string | null;
|
||||
totalUrls: number;
|
||||
status: 'queued' | 'running' | 'paused' | 'done' | 'cancelled';
|
||||
startedAt: string | null;
|
||||
finishedAt: string | null;
|
||||
savedCount: number;
|
||||
duplicateCount: number;
|
||||
errorCount: number;
|
||||
warningCount: number;
|
||||
}
|
||||
|
||||
export type ImportItemState =
|
||||
| 'pending'
|
||||
| 'extracting'
|
||||
| 'extracted'
|
||||
| 'saved'
|
||||
| 'duplicate'
|
||||
| 'consent-wall'
|
||||
| 'error'
|
||||
| 'cancelled';
|
||||
|
||||
export interface ImportItemRow {
|
||||
id: string;
|
||||
userId: string;
|
||||
spaceId: string | null;
|
||||
jobId: string;
|
||||
idx: number;
|
||||
url: string;
|
||||
state: ImportItemState;
|
||||
articleId: string | null;
|
||||
warning: 'probable_consent_wall' | null;
|
||||
error: string | null;
|
||||
attempts: number;
|
||||
lastAttemptAt: string | null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cross-user scan: which jobs need attention this tick. RLS is
|
||||
* intentionally NOT applied — the worker is a privileged consumer that
|
||||
* needs to see all users' running jobs in one pass. Per-user RLS
|
||||
* scoping is applied on the write-back path in import-extractor.ts.
|
||||
*/
|
||||
export async function listClaimableJobs(): Promise<ImportJobRow[]> {
|
||||
const sql = getSyncConnection();
|
||||
const rows = await sql<ChangeRow[]>`
|
||||
SELECT user_id, record_id, op, data, field_meta, created_at
|
||||
FROM sync_changes
|
||||
WHERE app_id = 'articles' AND table_name = 'articleImportJobs'
|
||||
ORDER BY user_id, record_id, created_at ASC
|
||||
`;
|
||||
const out: ImportJobRow[] = [];
|
||||
for (const m of mergeByUserAndRecord(rows).values()) {
|
||||
const job = projectJob(m.userId, m.recordId, m.merged);
|
||||
if (!job) continue;
|
||||
if (job.status !== 'running' && job.status !== 'queued') continue;
|
||||
out.push(job);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-job item scan. Returns ALL items so the worker can compute
|
||||
* job-completion + counter deltas in one pass.
|
||||
*/
|
||||
export async function listItemsForJob(userId: string, jobId: string): Promise<ImportItemRow[]> {
|
||||
const sql = getSyncConnection();
|
||||
const rows = await sql<ChangeRow[]>`
|
||||
SELECT user_id, record_id, op, data, field_meta, created_at
|
||||
FROM sync_changes
|
||||
WHERE app_id = 'articles'
|
||||
AND table_name = 'articleImportItems'
|
||||
AND user_id = ${userId}
|
||||
ORDER BY record_id, created_at ASC
|
||||
`;
|
||||
const out: ImportItemRow[] = [];
|
||||
for (const m of mergeByUserAndRecord(rows).values()) {
|
||||
const item = projectItem(m.userId, m.recordId, m.merged);
|
||||
if (!item || item.jobId !== jobId) continue;
|
||||
out.push(item);
|
||||
}
|
||||
out.sort((a, b) => a.idx - b.idx);
|
||||
return out;
|
||||
}
|
||||
|
||||
// ─── Internal: LWW merge per (userId, recordId) ──────────────
|
||||
|
||||
interface MergedEntry {
|
||||
userId: string;
|
||||
recordId: string;
|
||||
merged: Row | null;
|
||||
}
|
||||
|
||||
function mergeByUserAndRecord(rows: readonly ChangeRow[]): Map<string, MergedEntry> {
|
||||
const out = new Map<string, MergedEntry>();
|
||||
type Cur = {
|
||||
key: string;
|
||||
userId: string;
|
||||
recordId: string;
|
||||
record: Row | null;
|
||||
/** Per-field LWW timestamps (normalised to ISO strings — see
|
||||
* fieldMetaTime). Both wire shapes are folded down to plain
|
||||
* strings here so the projection comparison stays trivial. */
|
||||
fm: Record<string, string>;
|
||||
};
|
||||
let current: Cur | null = null;
|
||||
const flush = (c: Cur) => {
|
||||
out.set(c.key, { userId: c.userId, recordId: c.recordId, merged: c.record });
|
||||
};
|
||||
for (const r of rows) {
|
||||
const key = `${r.user_id}:${r.record_id}`;
|
||||
if (!current || current.key !== key) {
|
||||
if (current) flush(current);
|
||||
current = { key, userId: r.user_id, recordId: r.record_id, record: null, fm: {} };
|
||||
}
|
||||
if (r.op === 'delete') {
|
||||
current.record = null;
|
||||
continue;
|
||||
}
|
||||
if (!r.data) continue;
|
||||
const rowCreatedAt = r.created_at.toISOString();
|
||||
if (!current.record) {
|
||||
current.record = { id: r.record_id, ...r.data };
|
||||
const initFM = r.field_meta ?? {};
|
||||
current.fm = {};
|
||||
for (const k of Object.keys(initFM)) {
|
||||
current.fm[k] = fieldMetaTime(initFM[k]) || rowCreatedAt;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const rowFM = r.field_meta ?? {};
|
||||
for (const [k, v] of Object.entries(r.data)) {
|
||||
const serverTime = fieldMetaTime(rowFM[k]) || rowCreatedAt;
|
||||
const localTime = current.fm[k] ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
current.record[k] = v;
|
||||
current.fm[k] = serverTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (current) flush(current);
|
||||
return out;
|
||||
}
|
||||
|
||||
function projectJob(userId: string, recordId: string, merged: Row | null): ImportJobRow | null {
|
||||
if (!merged || merged.deletedAt) return null;
|
||||
const totalUrls = num(merged.totalUrls);
|
||||
const status = str(merged.status);
|
||||
if (totalUrls == null || !isJobStatus(status)) return null;
|
||||
return {
|
||||
id: recordId,
|
||||
userId,
|
||||
spaceId: optStr(merged.spaceId),
|
||||
totalUrls,
|
||||
status,
|
||||
startedAt: optStr(merged.startedAt),
|
||||
finishedAt: optStr(merged.finishedAt),
|
||||
savedCount: num(merged.savedCount) ?? 0,
|
||||
duplicateCount: num(merged.duplicateCount) ?? 0,
|
||||
errorCount: num(merged.errorCount) ?? 0,
|
||||
warningCount: num(merged.warningCount) ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
function projectItem(userId: string, recordId: string, merged: Row | null): ImportItemRow | null {
|
||||
if (!merged || merged.deletedAt) return null;
|
||||
const jobId = str(merged.jobId);
|
||||
const url = str(merged.url);
|
||||
const state = str(merged.state);
|
||||
const idx = num(merged.idx);
|
||||
if (!jobId || !url || !isItemState(state) || idx == null) return null;
|
||||
return {
|
||||
id: recordId,
|
||||
userId,
|
||||
spaceId: optStr(merged.spaceId),
|
||||
jobId,
|
||||
idx,
|
||||
url,
|
||||
state,
|
||||
articleId: optStr(merged.articleId),
|
||||
warning: merged.warning === 'probable_consent_wall' ? 'probable_consent_wall' : null,
|
||||
error: optStr(merged.error),
|
||||
attempts: num(merged.attempts) ?? 0,
|
||||
lastAttemptAt: optStr(merged.lastAttemptAt),
|
||||
};
|
||||
}
|
||||
|
||||
function isJobStatus(s: string): s is ImportJobRow['status'] {
|
||||
return s === 'queued' || s === 'running' || s === 'paused' || s === 'done' || s === 'cancelled';
|
||||
}
|
||||
|
||||
function isItemState(s: string): s is ImportItemState {
|
||||
return (
|
||||
s === 'pending' ||
|
||||
s === 'extracting' ||
|
||||
s === 'extracted' ||
|
||||
s === 'saved' ||
|
||||
s === 'duplicate' ||
|
||||
s === 'consent-wall' ||
|
||||
s === 'error' ||
|
||||
s === 'cancelled'
|
||||
);
|
||||
}
|
||||
|
||||
function num(v: unknown): number | null {
|
||||
return typeof v === 'number' && Number.isFinite(v) ? v : null;
|
||||
}
|
||||
function str(v: unknown): string {
|
||||
return typeof v === 'string' ? v : '';
|
||||
}
|
||||
function optStr(v: unknown): string | null {
|
||||
return typeof v === 'string' && v ? v : null;
|
||||
}
|
||||
|
|
@ -1,80 +0,0 @@
|
|||
import { describe, it, expect } from 'bun:test';
|
||||
import { countByState } from './import-worker';
|
||||
import type { ImportItemRow } from './import-projection';
|
||||
|
||||
function item(state: ImportItemRow['state'], idx = 0): ImportItemRow {
|
||||
return {
|
||||
id: `i-${idx}`,
|
||||
userId: 'u-1',
|
||||
spaceId: 'sp-1',
|
||||
jobId: 'j-1',
|
||||
idx,
|
||||
url: `https://example.com/${idx}`,
|
||||
state,
|
||||
articleId: null,
|
||||
warning: null,
|
||||
error: null,
|
||||
attempts: 0,
|
||||
lastAttemptAt: null,
|
||||
};
|
||||
}
|
||||
|
||||
describe('countByState — worker job-counter rollup', () => {
|
||||
it('returns zeros for empty input + allTerminal=false', () => {
|
||||
const c = countByState([]);
|
||||
expect(c).toEqual({
|
||||
saved: 0,
|
||||
duplicate: 0,
|
||||
error: 0,
|
||||
consentWall: 0,
|
||||
cancelled: 0,
|
||||
allTerminal: false,
|
||||
});
|
||||
});
|
||||
|
||||
it('counts each terminal state independently', () => {
|
||||
const c = countByState([
|
||||
item('saved', 0),
|
||||
item('saved', 1),
|
||||
item('duplicate', 2),
|
||||
item('error', 3),
|
||||
item('cancelled', 4),
|
||||
]);
|
||||
expect(c.saved).toBe(2);
|
||||
expect(c.duplicate).toBe(1);
|
||||
expect(c.error).toBe(1);
|
||||
expect(c.cancelled).toBe(1);
|
||||
expect(c.allTerminal).toBe(true);
|
||||
});
|
||||
|
||||
it('treats consent-wall as semantically saved (so progress UI advances)', () => {
|
||||
// One real-saved + two consent-wall = three "saved" from the
|
||||
// user's perspective, but the warning counter tracks the wall hits.
|
||||
const c = countByState([item('saved', 0), item('consent-wall', 1), item('consent-wall', 2)]);
|
||||
expect(c.saved).toBe(3);
|
||||
expect(c.consentWall).toBe(2);
|
||||
expect(c.allTerminal).toBe(true);
|
||||
});
|
||||
|
||||
it('does not flag allTerminal when any item is non-terminal', () => {
|
||||
const states: ImportItemRow['state'][] = ['pending', 'extracting', 'extracted'];
|
||||
for (const nonTerminal of states) {
|
||||
const c = countByState([item('saved', 0), item(nonTerminal, 1)]);
|
||||
expect(c.allTerminal).toBe(false);
|
||||
}
|
||||
});
|
||||
|
||||
it('preserves the saved + consent-wall sum when both are present', () => {
|
||||
// Regression check: saved must include consent-wall items so the
|
||||
// finished-counter UI doesn't off-by-one.
|
||||
const c = countByState([
|
||||
item('saved', 0),
|
||||
item('saved', 1),
|
||||
item('consent-wall', 2),
|
||||
item('error', 3),
|
||||
]);
|
||||
expect(c.saved).toBe(3); // 2 saved + 1 consent-wall
|
||||
expect(c.consentWall).toBe(1);
|
||||
expect(c.error).toBe(1);
|
||||
});
|
||||
});
|
||||
|
|
@ -1,327 +0,0 @@
|
|||
/**
|
||||
* Articles Bulk-Import — background worker.
|
||||
*
|
||||
* Boots from `apps/api/src/index.ts`. On every tick:
|
||||
*
|
||||
* 1. Try `pg_try_advisory_xact_lock` on a fixed key. If another
|
||||
* apps/api instance already holds it, skip this tick. The lock
|
||||
* is per-transaction so we never need a heartbeat — a crashed
|
||||
* worker's tx auto-aborts and the next tick claims it cleanly.
|
||||
* 2. Project the live state of `articleImportJobs` and pick the
|
||||
* ones still 'queued' or 'running'.
|
||||
* 3. For each job: project items, take up to N pending items,
|
||||
* extract concurrently. Each extraction writes a Pickup row +
|
||||
* flips the item state via `import-extractor.ts`.
|
||||
* 4. Fold terminal item states into job counters
|
||||
* (savedCount / duplicateCount / errorCount / warningCount).
|
||||
* When every item is terminal, flip the job to 'done'.
|
||||
*
|
||||
* No own state — every meaningful transition is a `sync_changes` row.
|
||||
* The worker is therefore stateless across restarts.
|
||||
*
|
||||
* Plan: docs/plans/articles-bulk-import.md.
|
||||
*/
|
||||
|
||||
// Operational logs (boot, tick errors, GC summary, stale-recovery
|
||||
// sweep) go to console intentionally — same pattern as
|
||||
// services/mana-ai/src/cron/tick.ts. Captured by the apps/api stdout
|
||||
// aggregator; structured signal lives in Prometheus counters.
|
||||
/* eslint-disable no-console */
|
||||
|
||||
import { getSyncConnection } from '../../lib/sync-db';
|
||||
import {
|
||||
articlesImportJobsCompletedTotal,
|
||||
articlesImportPickupGcRows,
|
||||
articlesImportTicksTotal,
|
||||
} from '../../lib/metrics';
|
||||
import {
|
||||
listClaimableJobs,
|
||||
listItemsForJob,
|
||||
type ImportItemRow,
|
||||
type ImportJobRow,
|
||||
} from './import-projection';
|
||||
import { extractOneItem, writeItemUpdate, writeJobUpdate } from './import-extractor';
|
||||
|
||||
/** Counts ticks so the pickup-GC sweep can run every Nth one rather
|
||||
* than on every 2-second cycle (the DELETE is cheap but not free). */
|
||||
let tickCount = 0;
|
||||
/** Run pickup-GC every 30 ticks ≈ once per minute. */
|
||||
const PICKUP_GC_EVERY_N_TICKS = 30;
|
||||
|
||||
const TICK_INTERVAL_MS = 2_000;
|
||||
const PER_JOB_CONCURRENCY = 3;
|
||||
/**
|
||||
* If an item has been in `state='extracting'` longer than this without
|
||||
* a follow-up state-write, it's orphaned (worker crashed mid-fetch,
|
||||
* pod restart, OOM, …) and gets bounced back to `pending` so the next
|
||||
* tick can re-claim it.
|
||||
*
|
||||
* Tuned so a slow but live extraction (15 s shared-rss fetch timeout +
|
||||
* a few seconds of JSDOM parse on a 2 MB page) doesn't reset
|
||||
* prematurely — 5 minutes is comfortable headroom.
|
||||
*/
|
||||
const STALE_EXTRACTING_MS = 5 * 60 * 1000;
|
||||
/** TTL for `articleExtractPickup` rows. The pickup-consumer normally
|
||||
* deletes them seconds after the worker writes them; anything older
|
||||
* than this is garbage from a stuck consumer (all tabs closed,
|
||||
* Web-Lock mismatch, …) and would otherwise accumulate without bound. */
|
||||
const PICKUP_TTL_MS = 24 * 60 * 60 * 1000;
|
||||
/** Fixed int8 lock key — derived from the ASCII bytes of 'ARTI'. */
|
||||
const ADVISORY_LOCK_KEY = 0x4152_5449;
|
||||
|
||||
let timer: ReturnType<typeof setInterval> | null = null;
|
||||
let running = false;
|
||||
|
||||
/**
|
||||
* Start the recurring tick. Idempotent — safe to call multiple times.
|
||||
* Intended to be called once from `apps/api/src/index.ts` at boot.
|
||||
*
|
||||
* Disable via `ARTICLES_IMPORT_WORKER_DISABLED=true` (for tests, or
|
||||
* when running multiple apps/api instances and you want to designate
|
||||
* a different one as the worker).
|
||||
*/
|
||||
export function startArticleImportWorker(): void {
|
||||
if (timer) return;
|
||||
if (process.env.ARTICLES_IMPORT_WORKER_DISABLED === 'true') {
|
||||
console.log('[articles-import] worker disabled via env');
|
||||
return;
|
||||
}
|
||||
console.log(
|
||||
`[articles-import] worker starting — tick=${TICK_INTERVAL_MS}ms, concurrency=${PER_JOB_CONCURRENCY}`
|
||||
);
|
||||
timer = setInterval(() => {
|
||||
void runTickGuarded();
|
||||
}, TICK_INTERVAL_MS);
|
||||
}
|
||||
|
||||
export function stopArticleImportWorker(): void {
|
||||
if (timer) {
|
||||
clearInterval(timer);
|
||||
timer = null;
|
||||
}
|
||||
}
|
||||
|
||||
async function runTickGuarded(): Promise<void> {
|
||||
if (running) return;
|
||||
running = true;
|
||||
try {
|
||||
const result = await runTickOnce();
|
||||
articlesImportTicksTotal.inc({ result: result.skipped ? 'skipped' : 'processed' });
|
||||
if (typeof result.pickupGcRows === 'number' && result.pickupGcRows > 0) {
|
||||
articlesImportPickupGcRows.inc(result.pickupGcRows);
|
||||
}
|
||||
} catch (err) {
|
||||
articlesImportTicksTotal.inc({ result: 'error' });
|
||||
console.error('[articles-import] tick error:', err);
|
||||
} finally {
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* One tick body. Exported for tests + a potential
|
||||
* `/internal/articles-import/tick`-style admin route.
|
||||
*/
|
||||
export async function runTickOnce(): Promise<{
|
||||
skipped: boolean;
|
||||
jobsConsidered: number;
|
||||
itemsProcessed: number;
|
||||
pickupGcRows?: number;
|
||||
}> {
|
||||
if (!(await tryAcquireLock())) {
|
||||
return { skipped: true, jobsConsidered: 0, itemsProcessed: 0 };
|
||||
}
|
||||
tickCount++;
|
||||
let pickupGcRows: number | undefined;
|
||||
if (tickCount % PICKUP_GC_EVERY_N_TICKS === 0) {
|
||||
pickupGcRows = await runPickupGc();
|
||||
}
|
||||
const jobs = await listClaimableJobs();
|
||||
let itemsProcessed = 0;
|
||||
for (const job of jobs) {
|
||||
itemsProcessed += await processOneJob(job);
|
||||
}
|
||||
return { skipped: false, jobsConsidered: jobs.length, itemsProcessed, pickupGcRows };
|
||||
}
|
||||
|
||||
/**
|
||||
* Hard-delete pickup rows older than `PICKUP_TTL_MS`. The
|
||||
* pickup-consumer on a healthy client removes each row seconds after
|
||||
* the worker writes it; anything older is residue from a stuck
|
||||
* consumer (all tabs closed, Web-Lock mismatch). Without this sweep
|
||||
* the rows would accumulate without bound in sync_changes.
|
||||
*
|
||||
* Runs against `sync_changes` directly, not via a soft-delete on the
|
||||
* row data — pickup rows are server-write inbox only, never editable
|
||||
* by users; a hard DELETE keeps the table tight.
|
||||
*/
|
||||
async function runPickupGc(): Promise<number> {
|
||||
const sql = getSyncConnection();
|
||||
const cutoff = new Date(Date.now() - PICKUP_TTL_MS).toISOString();
|
||||
const rows = await sql<{ count: string }[]>`
|
||||
WITH deleted AS (
|
||||
DELETE FROM sync_changes
|
||||
WHERE app_id = 'articles'
|
||||
AND table_name = 'articleExtractPickup'
|
||||
AND created_at < ${cutoff}
|
||||
RETURNING 1
|
||||
)
|
||||
SELECT count(*)::text AS count FROM deleted
|
||||
`;
|
||||
const n = parseInt(rows[0]?.count ?? '0', 10);
|
||||
if (n > 0) console.log(`[articles-import] pickup-gc: removed ${n} rows older than 24h`);
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Brief advisory-lock probe via a single short transaction. Returns
|
||||
* true if we won the probe — that's a soft signal for "you're the
|
||||
* worker for this tick"; the lock releases as the probe tx commits.
|
||||
* For multi-instance deploys this is a soft-only coordination — if
|
||||
* two probes happen to interleave their work, the field-LWW merge on
|
||||
* the client still produces a coherent state.
|
||||
*/
|
||||
async function tryAcquireLock(): Promise<boolean> {
|
||||
const sql = getSyncConnection();
|
||||
let acquired = false;
|
||||
await sql.begin(async (tx) => {
|
||||
const rows = await tx<{ acquired: boolean }[]>`
|
||||
SELECT pg_try_advisory_xact_lock(${ADVISORY_LOCK_KEY}) AS acquired
|
||||
`;
|
||||
acquired = rows[0]?.acquired === true;
|
||||
});
|
||||
return acquired;
|
||||
}
|
||||
|
||||
async function processOneJob(job: ImportJobRow): Promise<number> {
|
||||
const items = await listItemsForJob(job.userId, job.id);
|
||||
|
||||
// Crash-recovery sweep — bounce items that have been 'extracting'
|
||||
// for too long back to 'pending'. Without this, a worker that
|
||||
// crashed (or got OOM'd, restarted mid-extract) leaves orphaned
|
||||
// items in 'extracting' forever; the job never completes. Worker
|
||||
// re-attribution happens via the next tick's claim path.
|
||||
const now = Date.now();
|
||||
for (const it of items) {
|
||||
if (it.state !== 'extracting') continue;
|
||||
const since = it.lastAttemptAt ? Date.parse(it.lastAttemptAt) : 0;
|
||||
if (!Number.isFinite(since)) continue;
|
||||
if (now - since < STALE_EXTRACTING_MS) continue;
|
||||
console.warn(
|
||||
`[articles-import] resetting stale extracting item ${it.id} (job=${job.id}) — ${Math.round((now - since) / 1000)}s old`
|
||||
);
|
||||
await writeItemUpdate(it.userId, it.id, { state: 'pending' });
|
||||
}
|
||||
|
||||
// Flip 'queued' → 'running' so the UI shows progress.
|
||||
if (job.status === 'queued') {
|
||||
await writeJobUpdate(job.userId, job.id, {
|
||||
status: 'running',
|
||||
startedAt: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
// Counter-derivation from current item states.
|
||||
const counts = countByState(items);
|
||||
const counterPatch: Record<string, unknown> = {};
|
||||
let dirty = false;
|
||||
if (counts.saved !== job.savedCount) {
|
||||
counterPatch.savedCount = counts.saved;
|
||||
dirty = true;
|
||||
}
|
||||
if (counts.duplicate !== job.duplicateCount) {
|
||||
counterPatch.duplicateCount = counts.duplicate;
|
||||
dirty = true;
|
||||
}
|
||||
if (counts.error !== job.errorCount) {
|
||||
counterPatch.errorCount = counts.error;
|
||||
dirty = true;
|
||||
}
|
||||
if (counts.consentWall !== job.warningCount) {
|
||||
counterPatch.warningCount = counts.consentWall;
|
||||
dirty = true;
|
||||
}
|
||||
if (counts.allTerminal && job.status !== 'done') {
|
||||
counterPatch.status = 'done';
|
||||
counterPatch.finishedAt = new Date().toISOString();
|
||||
dirty = true;
|
||||
articlesImportJobsCompletedTotal.inc({ result: 'done' });
|
||||
}
|
||||
if (dirty) {
|
||||
await writeJobUpdate(job.userId, job.id, counterPatch);
|
||||
}
|
||||
|
||||
if (counts.allTerminal) return 0;
|
||||
|
||||
// Cancelled → flip every still-pending item to 'cancelled'.
|
||||
if (job.status === 'cancelled') {
|
||||
const pending = items.filter((i) => i.state === 'pending');
|
||||
for (const it of pending) {
|
||||
await writeItemUpdate(it.userId, it.id, { state: 'cancelled' });
|
||||
}
|
||||
return pending.length;
|
||||
}
|
||||
|
||||
// Paused → already-extracting items finish their roundtrip; nothing
|
||||
// new gets claimed.
|
||||
if (job.status === 'paused') return 0;
|
||||
|
||||
// Running → claim up to PER_JOB_CONCURRENCY pending items in
|
||||
// parallel. We deliberately don't try to rescue 'extracting' items:
|
||||
// if a worker died mid-fetch they stay 'extracting' forever for
|
||||
// now. Future polish: time-out 'extracting' rows older than ~5min
|
||||
// and bounce them back to 'pending'.
|
||||
const claimable = items.filter((i) => i.state === 'pending').slice(0, PER_JOB_CONCURRENCY);
|
||||
if (claimable.length === 0) return 0;
|
||||
|
||||
await Promise.allSettled(claimable.map((it) => extractOneItem(it)));
|
||||
return claimable.length;
|
||||
}
|
||||
|
||||
export interface StateCounts {
|
||||
saved: number;
|
||||
duplicate: number;
|
||||
error: number;
|
||||
consentWall: number;
|
||||
cancelled: number;
|
||||
allTerminal: boolean;
|
||||
}
|
||||
|
||||
export function countByState(items: readonly ImportItemRow[]): StateCounts {
|
||||
let saved = 0;
|
||||
let duplicate = 0;
|
||||
let error = 0;
|
||||
let consentWall = 0;
|
||||
let cancelled = 0;
|
||||
let nonTerminal = 0;
|
||||
for (const it of items) {
|
||||
switch (it.state) {
|
||||
case 'saved':
|
||||
saved++;
|
||||
break;
|
||||
case 'duplicate':
|
||||
duplicate++;
|
||||
break;
|
||||
case 'error':
|
||||
error++;
|
||||
break;
|
||||
case 'consent-wall':
|
||||
saved++; // consent-wall is "saved with warning" semantically
|
||||
consentWall++;
|
||||
break;
|
||||
case 'cancelled':
|
||||
cancelled++;
|
||||
break;
|
||||
default:
|
||||
nonTerminal++;
|
||||
}
|
||||
}
|
||||
return {
|
||||
saved,
|
||||
duplicate,
|
||||
error,
|
||||
consentWall,
|
||||
cancelled,
|
||||
allTerminal: items.length > 0 && nonTerminal === 0,
|
||||
};
|
||||
}
|
||||
|
|
@ -1,128 +0,0 @@
|
|||
/**
|
||||
* Articles module — server-side URL extraction.
|
||||
*
|
||||
* Two endpoints, both thin wrappers around `@mana/shared-rss`:
|
||||
*
|
||||
* POST /extract ← server fetches the URL itself, then runs
|
||||
* Readability on the HTML it got back. Works
|
||||
* for simple sites but fails on anything behind
|
||||
* a cookie-consent wall or a paywall — the
|
||||
* server has no user session.
|
||||
* POST /extract/html ← client already has the rendered HTML (from a
|
||||
* browser bookmarklet running in the user's
|
||||
* own tab with all their cookies applied).
|
||||
* Server just runs Readability on that. This
|
||||
* is how we bypass Golem / Spiegel / Zeit /
|
||||
* Heise-style consent dialogs: use the user's
|
||||
* already-consented session, not the server's
|
||||
* anonymous fetch.
|
||||
*
|
||||
* Consent-wall heuristic: when /extract returns a suspiciously short
|
||||
* payload that contains consent-dialog vocabulary we still hand the
|
||||
* extracted text back but flag it with `warning: 'probable_consent_wall'`
|
||||
* so the client can offer the bookmarklet-v2 path instead of pretending
|
||||
* a 4-line "Cookies zustimmen" blob is the article.
|
||||
*/
|
||||
|
||||
import { Hono } from 'hono';
|
||||
import { extractFromUrl, extractFromHtml } from '@mana/shared-rss';
|
||||
import { looksLikeConsentWall } from './consent-wall';
|
||||
|
||||
const routes = new Hono();
|
||||
|
||||
function isValidHttpUrl(url: string): boolean {
|
||||
try {
|
||||
const u = new URL(url);
|
||||
return u.protocol === 'http:' || u.protocol === 'https:';
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// POST /extract — server fetches the URL + extracts. Legacy path.
|
||||
routes.post('/extract', async (c) => {
|
||||
const body = await c.req.json<{ url?: string }>().catch(() => ({}) as { url?: string });
|
||||
const url = body.url;
|
||||
if (!url || typeof url !== 'string') {
|
||||
return c.json({ error: 'URL is required' }, 400);
|
||||
}
|
||||
if (!isValidHttpUrl(url)) {
|
||||
return c.json({ error: 'Invalid URL' }, 400);
|
||||
}
|
||||
|
||||
const extracted = await extractFromUrl(url);
|
||||
if (!extracted) {
|
||||
return c.json({ error: 'Extraction failed' }, 502);
|
||||
}
|
||||
|
||||
const warning = looksLikeConsentWall(extracted.content, extracted.wordCount)
|
||||
? 'probable_consent_wall'
|
||||
: undefined;
|
||||
|
||||
return c.json({
|
||||
originalUrl: url,
|
||||
title: extracted.title,
|
||||
excerpt: extracted.excerpt,
|
||||
content: extracted.content,
|
||||
htmlContent: extracted.htmlContent,
|
||||
author: extracted.byline,
|
||||
siteName: extracted.siteName,
|
||||
wordCount: extracted.wordCount,
|
||||
readingTimeMinutes: extracted.readingTimeMinutes,
|
||||
...(warning && { warning }),
|
||||
});
|
||||
});
|
||||
|
||||
// POST /extract/html — client supplies HTML (from the user's browser
|
||||
// tab, where cookies + JS rendering already happened). We only run
|
||||
// Readability on it. Cap payload to 10 MiB so a pathological site
|
||||
// can't exhaust server memory via the bookmarklet — typical rendered
|
||||
// article HTML is 200-800 KB.
|
||||
const MAX_HTML_BYTES = 10 * 1024 * 1024;
|
||||
|
||||
routes.post('/extract/html', async (c) => {
|
||||
const body = await c.req
|
||||
.json<{ url?: string; html?: string }>()
|
||||
.catch(() => ({}) as { url?: string; html?: string });
|
||||
const url = body.url;
|
||||
const html = body.html;
|
||||
if (!url || typeof url !== 'string') {
|
||||
return c.json({ error: 'URL is required' }, 400);
|
||||
}
|
||||
if (!html || typeof html !== 'string') {
|
||||
return c.json({ error: 'HTML is required' }, 400);
|
||||
}
|
||||
if (!isValidHttpUrl(url)) {
|
||||
return c.json({ error: 'Invalid URL' }, 400);
|
||||
}
|
||||
if (html.length > MAX_HTML_BYTES) {
|
||||
return c.json({ error: 'HTML payload too large' }, 413);
|
||||
}
|
||||
|
||||
const extracted = await extractFromHtml(html, url);
|
||||
if (!extracted) {
|
||||
return c.json({ error: 'Extraction failed' }, 502);
|
||||
}
|
||||
|
||||
// The consent-wall heuristic still applies here — a rare case is
|
||||
// that the user bookmarklet-fires BEFORE the consent dialog is
|
||||
// dismissed. Flag it so the client doesn't silently persist garbage.
|
||||
const warning = looksLikeConsentWall(extracted.content, extracted.wordCount)
|
||||
? 'probable_consent_wall'
|
||||
: undefined;
|
||||
|
||||
return c.json({
|
||||
originalUrl: url,
|
||||
title: extracted.title,
|
||||
excerpt: extracted.excerpt,
|
||||
content: extracted.content,
|
||||
htmlContent: extracted.htmlContent,
|
||||
author: extracted.byline,
|
||||
siteName: extracted.siteName,
|
||||
wordCount: extracted.wordCount,
|
||||
readingTimeMinutes: extracted.readingTimeMinutes,
|
||||
...(warning && { warning }),
|
||||
});
|
||||
});
|
||||
|
||||
export { routes as articlesRoutes };
|
||||
Loading…
Add table
Add a link
Reference in a new issue