feat(articles): client-side pickup consumer (Phase 3)

Watches `articleExtractPickup` via liveQuery. For each row the server-
worker drops:

  1. Look up the matching `articleImportItems` row. Stale → just clean
     the inbox.
  2. Dedupe race: if the URL has been single-saved meanwhile, point
     the import item at the existing article (state='duplicate'),
     don't create a second row.
  3. Happy path: call existing articlesStore.saveFromExtracted (which
     runs encryptRecord + articleTable.add and emits ArticleSaved)
     → flip item to 'saved' (or 'consent-wall' on warning).
  4. Delete the pickup row so the inbox stays empty in steady state.

Multi-tab coordination via `navigator.locks.request('mana:articles:pickup')`
with `ifAvailable: true` — only the lock-holder consumes; other tabs
just observe the liveQuery and exit. Falls back to per-row in-memory
dedupe when the Locks API isn't available; the field-LWW server merge
forgives the rare double-process.
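
In outline, the lock acquisition in consume-pickup.ts (full code below) looks like:

  await navigator.locks.request('mana:articles:pickup', { ifAvailable: true }, async (lock) => {
    if (!lock) return;   // another tab holds the lock and will drain the rows
    await drain(rows);   // this tab is the consumer for this pass
  });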

Wired from data-layer-listeners.ts so it boots once with the rest of
the data layer and disposes on layout unmount.

End-to-end pipeline now live:
  Client writes items (state='pending')
    → sync_changes
    → server-worker tick (Phase 2)
    → Pickup row + state='extracted'
    → sync pull → liveQuery
    → saveFromExtracted (encrypt) → flip 'saved' / 'duplicate' / 'consent-wall'
    → delete pickup row

What's still needed for first user-visible test: Phase 4 (store
methods to create a job) + Phase 5 (UI). Without those there's no
way yet to inject items.
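
A rough sketch of what that Phase 4 entry point could look like (hypothetical:
`createImportJob`, the `jobId`/`url` fields, and the raw table write are
assumptions, not part of this commit):

  // Hypothetical Phase 4 sketch: enqueue pending items for the Phase 2
  // server-worker tick to pick up. The real store method would likely go
  // through the articles store / encryption layer instead of raw table writes.
  import { articleImportItemTable } from '$lib/modules/articles/collections';

  export async function createImportJob(urls: string[]): Promise<string> {
    const jobId = crypto.randomUUID();   // assumed grouping id
    await articleImportItemTable.bulkAdd(
      urls.map((url) => ({
        id: crypto.randomUUID(),
        jobId,                           // assumption: items reference their job
        url,                             // assumption: field name
        state: 'pending',                // server-worker picks these up (Phase 2)
      }))
    );
    return jobId;
  }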

Plan: docs/plans/articles-bulk-import.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Till JS 2026-04-28 22:16:10 +02:00
parent 2bbcf14aba
commit a9bcd4183a
2 changed files with 166 additions and 0 deletions

data-layer-listeners.ts

@@ -23,6 +23,7 @@ import { cleanupTombstones } from './quota';
import { pruneActivityLog } from './activity';
import { SYNC_TELEMETRY_EVENT, type SyncTelemetryDetail } from './sync-telemetry';
import { installConflictListener } from './conflict-store.svelte';
+import { startArticlePickupConsumer } from '$lib/modules/articles/consume-pickup';
/** How often to run the tombstone cleanup. 24h is a comfortable cadence
 * given that the cutoff is 30 days runs roughly once per app session. */
@@ -106,6 +107,12 @@ export function installDataLayerListeners(): () => void {
// coalescing, auto-dismiss, and the restore-write path.
const disposeConflict = installConflictListener();
+// ─── Articles bulk-import: pickup consumer ─────────────────
+// Drains `articleExtractPickup` rows the server-worker drops for
+// successful URL extractions. Web-Lock-coordinated for multi-tab
+// safety. See docs/plans/articles-bulk-import.md.
+const disposeArticlePickup = startArticlePickupConsumer();
// ─── Periodic cleanup loop ─────────────────────────────────
// Runs once on boot, then daily. Two independent jobs share the
// schedule so we never have a third interval competing for the same
@@ -151,5 +158,6 @@ export function installDataLayerListeners(): () => void {
window.removeEventListener(SYNC_TELEMETRY_EVENT, handleTelemetry);
window.clearInterval(cleanupTimer);
disposeConflict();
+disposeArticlePickup();
};
}

$lib/modules/articles/consume-pickup.ts

@@ -0,0 +1,158 @@
/**
* Articles Bulk-Import client-side Pickup Consumer.
*
* The server-side import-worker drops `articleExtractPickup` rows for
* each successful URL extraction. This consumer:
*
* 1. Watches the pickup table via `liveQuery`.
* 2. For each new row: calls `articlesStore.saveFromExtracted()` so
* the existing Single-URL save-path runs unchanged (encrypt →
* `articleTable.add()` → emit ArticleSaved domain event).
* 3. Updates the matching `articleImportItems` row to state='saved'
* (or 'duplicate' / 'consent-wall') with the resulting articleId.
* 4. Deletes the pickup row so the inbox stays empty in steady state.
*
* Multi-tab coordination via `navigator.locks.request('mana:articles:pickup')`:
* any number of tabs can subscribe, but only the lock-holder consumes.
* Falls back to per-row in-memory dedupe when locks aren't available
* (older Safari); the field-LWW merge on the server forgives the rare
* double-process.
*
* Plan: docs/plans/articles-bulk-import.md.
*/
import { liveQuery, type Subscription } from 'dexie';
import { articleExtractPickupTable, articleImportItemTable } from './collections';
import { articlesStore } from './stores/articles.svelte';
import type { ArticleImportItemState, LocalArticleExtractPickup } from './types';
const LOCK_NAME = 'mana:articles:pickup';
/** In-memory guard so a quick liveQuery double-tick doesn't race the
* same pickup row through `consumeOne` twice. Reset on tab close. */
const inFlight = new Set<string>();
let subscription: Subscription | null = null;
/**
* Start watching the pickup inbox. Idempotent: a second call returns
* the existing dispose function.
*
* Disable via the `mana:articles:pickup:disabled` localStorage key (string
* 'true'): an escape hatch for users who want to debug without the
* consumer running.
*/
export function startArticlePickupConsumer(): () => void {
if (typeof window === 'undefined') return () => {};
if (subscription) return stopArticlePickupConsumer;
if (window.localStorage?.getItem('mana:articles:pickup:disabled') === 'true') {
console.log('[articles-import] pickup consumer disabled via localStorage');
return () => {};
}
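// Watch every non-tombstoned pickup row; liveQuery re-runs the querier when the pickup table changes.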
const query = liveQuery(async () =>
articleExtractPickupTable.filter((r) => !r.deletedAt).toArray()
);
subscription = query.subscribe({
next: (rows: LocalArticleExtractPickup[]) => {
void runConsume(rows);
},
error: (err) => {
console.error('[articles-import] pickup liveQuery error:', err);
},
});
return stopArticlePickupConsumer;
}
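/** Tear down the liveQuery subscription and clear the in-flight guard.
* Safe to call even if the consumer was never started. */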
export function stopArticlePickupConsumer(): void {
subscription?.unsubscribe();
subscription = null;
inFlight.clear();
}
/**
* Drain the current set of pickup rows under the multi-tab Web-Lock.
* If the lock is held by another tab, this returns immediately and the
* other tab's run handles the rows.
*/
async function runConsume(rows: readonly LocalArticleExtractPickup[]): Promise<void> {
if (rows.length === 0) return;
const locks = (navigator as Navigator & { locks?: LockManager }).locks;
if (!locks) {
await drain(rows);
return;
}
await locks.request(LOCK_NAME, { ifAvailable: true }, async (lock) => {
if (!lock) {
// Another tab is the consumer — leave the rows alone.
return;
}
await drain(rows);
});
}
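/** Process the snapshot one row at a time; the `inFlight` set keeps an
* overlapping liveQuery emission from racing the same row through twice. */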
async function drain(rows: readonly LocalArticleExtractPickup[]): Promise<void> {
for (const row of rows) {
if (inFlight.has(row.id)) continue;
inFlight.add(row.id);
try {
await consumeOne(row);
} catch (err) {
console.error('[articles-import] consumeOne failed:', row.id, err);
} finally {
inFlight.delete(row.id);
}
}
}
async function consumeOne(row: LocalArticleExtractPickup): Promise<void> {
const item = await articleImportItemTable.get(row.itemId);
// Stale pickup row — item was deleted, cancelled, or already
// consumed by a previous tab. Just clean up the inbox.
if (!item || item.state !== 'extracted' || item.deletedAt) {
await articleExtractPickupTable.delete(row.id);
return;
}
// Dedupe race: user may have single-saved this URL via QuickAddInput
// while the bulk job was running. Don't write a duplicate Article
// row; just point the import item at the existing one.
const existing = await articlesStore.findByUrl(row.payload.originalUrl);
if (existing) {
await articleImportItemTable.update(item.id, {
state: 'duplicate',
articleId: existing.id,
});
await articleExtractPickupTable.delete(row.id);
return;
}
// Happy path: persist via the existing single-URL pipeline. This
// runs encryptRecord + articleTable.add and emits the ArticleSaved
// domain event, exactly like a manual `Save URL` would.
const article = await articlesStore.saveFromExtracted({
originalUrl: row.payload.originalUrl,
title: row.payload.title,
excerpt: row.payload.excerpt,
content: row.payload.content,
htmlContent: row.payload.htmlContent,
author: row.payload.author,
siteName: row.payload.siteName,
wordCount: row.payload.wordCount,
readingTimeMinutes: row.payload.readingTimeMinutes,
warning: row.payload.warning,
});
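// The article is already saved either way; a consent-wall warning just records the caveat on the item.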
const nextState: ArticleImportItemState =
row.payload.warning === 'probable_consent_wall' ? 'consent-wall' : 'saved';
await articleImportItemTable.update(item.id, {
state: nextState,
articleId: article.id,
warning: row.payload.warning ?? null,
});
await articleExtractPickupTable.delete(row.id);
}