mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 20:41:09 +02:00
feat(articles): client-side pickup consumer (Phase 3)
Watches `articleExtractPickup` via liveQuery. For each row the server-
worker drops:
1. Look up the matching `articleImportItems` row. Stale → just clean
the inbox.
2. Dedupe race: if the URL has been single-saved in the meantime, point
   the import item at the existing article (state='duplicate') rather
   than creating a second row.
3. Happy path: call existing articlesStore.saveFromExtracted (which
runs encryptRecord + articleTable.add and emits ArticleSaved)
→ flip item to 'saved' (or 'consent-wall' on warning).
4. Delete the pickup row so the inbox stays empty in steady state.
Multi-tab coordination via `navigator.locks.request('mana:articles:pickup')`
with `ifAvailable: true` — only the lock-holder consumes; other tabs
just observe the liveQuery and exit. Falls back to per-row in-memory
dedupe when the Locks API isn't available; the field-LWW server merge
forgives the rare double-process.
Wired from data-layer-listeners.ts so it boots once with the rest of
the data layer and disposes on layout unmount.
End-to-end pipeline now live:
Client write items(state='pending')
→ sync_changes
→ server-worker tick (Phase 2)
→ Pickup row + state='extracted'
→ sync pull → liveQuery
→ saveFromExtracted (encrypt) → flip 'saved' / 'duplicate' / 'consent-wall'
→ delete pickup row
What's still needed for first user-visible test: Phase 4 (store
methods to create a job) + Phase 5 (UI). Without those there's no
way yet to inject items.
Plan: docs/plans/articles-bulk-import.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
2bbcf14aba
commit
a9bcd4183a
2 changed files with 166 additions and 0 deletions
|
|
@ -23,6 +23,7 @@ import { cleanupTombstones } from './quota';
|
|||
import { pruneActivityLog } from './activity';
|
||||
import { SYNC_TELEMETRY_EVENT, type SyncTelemetryDetail } from './sync-telemetry';
|
||||
import { installConflictListener } from './conflict-store.svelte';
|
||||
import { startArticlePickupConsumer } from '$lib/modules/articles/consume-pickup';
|
||||
|
||||
/** How often to run the tombstone cleanup. 24h is a comfortable cadence
|
||||
* given that the cutoff is 30 days — runs roughly once per app session. */
|
||||
|
|
@ -106,6 +107,12 @@ export function installDataLayerListeners(): () => void {
|
|||
// coalescing, auto-dismiss, and the restore-write path.
|
||||
const disposeConflict = installConflictListener();
|
||||
|
||||
// ─── Articles bulk-import: pickup consumer ─────────────────
|
||||
// Drains `articleExtractPickup` rows the server-worker drops for
|
||||
// successful URL extractions. Web-Lock-coordinated for multi-tab
|
||||
// safety. See docs/plans/articles-bulk-import.md.
|
||||
const disposeArticlePickup = startArticlePickupConsumer();
|
||||
|
||||
// ─── Periodic cleanup loop ─────────────────────────────────
|
||||
// Runs once on boot, then daily. Two independent jobs share the
|
||||
// schedule so we never have a third interval competing for the same
|
||||
|
|
@ -151,5 +158,6 @@ export function installDataLayerListeners(): () => void {
|
|||
window.removeEventListener(SYNC_TELEMETRY_EVENT, handleTelemetry);
|
||||
window.clearInterval(cleanupTimer);
|
||||
disposeConflict();
|
||||
disposeArticlePickup();
|
||||
};
|
||||
}
|
||||
|
|
|
|||
158
apps/mana/apps/web/src/lib/modules/articles/consume-pickup.ts
Normal file
158
apps/mana/apps/web/src/lib/modules/articles/consume-pickup.ts
Normal file
|
|
@ -0,0 +1,158 @@
|
|||
/**
|
||||
* Articles Bulk-Import — client-side Pickup Consumer.
|
||||
*
|
||||
* The server-side import-worker drops `articleExtractPickup` rows for
|
||||
* each successful URL extraction. This consumer:
|
||||
*
|
||||
* 1. Watches the pickup table via `liveQuery`.
|
||||
* 2. For each new row: calls `articlesStore.saveFromExtracted()` so
|
||||
* the existing Single-URL save-path runs unchanged (encrypt →
|
||||
* `articleTable.add()` → emit ArticleSaved domain event).
|
||||
* 3. Updates the matching `articleImportItems` row to state='saved'
|
||||
* (or 'duplicate' / 'consent-wall') with the resulting articleId.
|
||||
* 4. Deletes the pickup row so the inbox stays empty in steady state.
|
||||
*
|
||||
* Multi-tab coordination via `navigator.locks.request('mana:articles:pickup')`:
|
||||
* any number of tabs can subscribe, but only the lock-holder consumes.
|
||||
* Falls back to per-row in-memory dedupe when locks aren't available
|
||||
* (older Safari) — the field-LWW merge on the server forgives the rare
|
||||
* double-process.
|
||||
*
|
||||
* Plan: docs/plans/articles-bulk-import.md.
|
||||
*/
|
||||
|
||||
import { liveQuery, type Subscription } from 'dexie';
|
||||
import { articleExtractPickupTable, articleImportItemTable } from './collections';
|
||||
import { articlesStore } from './stores/articles.svelte';
|
||||
import type { ArticleImportItemState, LocalArticleExtractPickup } from './types';
|
||||
|
||||
/** Web-Lock name shared by every tab; only the current holder drains the inbox. */
const LOCK_NAME = 'mana:articles:pickup';

/** In-memory guard so a quick liveQuery double-tick doesn't race the
 * same pickup row through `consumeOne` twice. Reset on tab close. */
const inFlight = new Set<string>();

/** Active liveQuery subscription; null whenever the consumer is stopped. */
let subscription: Subscription | null = null;
|
||||
|
||||
/**
|
||||
* Start watching the pickup inbox. Idempotent — second call returns
|
||||
* the existing dispose function.
|
||||
*
|
||||
* Disable via `localStorage('mana:articles:pickup:disabled')` (string
|
||||
* 'true') — escape hatch for users who want to debug without the
|
||||
* consumer running.
|
||||
*/
|
||||
export function startArticlePickupConsumer(): () => void {
|
||||
if (typeof window === 'undefined') return () => {};
|
||||
if (subscription) return stopArticlePickupConsumer;
|
||||
if (window.localStorage?.getItem('mana:articles:pickup:disabled') === 'true') {
|
||||
console.log('[articles-import] pickup consumer disabled via localStorage');
|
||||
return () => {};
|
||||
}
|
||||
|
||||
const query = liveQuery(async () =>
|
||||
articleExtractPickupTable.filter((r) => !r.deletedAt).toArray()
|
||||
);
|
||||
subscription = query.subscribe({
|
||||
next: (rows: LocalArticleExtractPickup[]) => {
|
||||
void runConsume(rows);
|
||||
},
|
||||
error: (err) => {
|
||||
console.error('[articles-import] pickup liveQuery error:', err);
|
||||
},
|
||||
});
|
||||
return stopArticlePickupConsumer;
|
||||
}
|
||||
|
||||
export function stopArticlePickupConsumer(): void {
|
||||
subscription?.unsubscribe();
|
||||
subscription = null;
|
||||
inFlight.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Drain the current set of pickup rows under the multi-tab Web-Lock.
|
||||
* If the lock is held by another tab, this returns immediately and the
|
||||
* other tab's run handles the rows.
|
||||
*/
|
||||
async function runConsume(rows: readonly LocalArticleExtractPickup[]): Promise<void> {
|
||||
if (rows.length === 0) return;
|
||||
|
||||
const locks = (navigator as Navigator & { locks?: LockManager }).locks;
|
||||
if (!locks) {
|
||||
await drain(rows);
|
||||
return;
|
||||
}
|
||||
|
||||
await locks.request(LOCK_NAME, { ifAvailable: true }, async (lock) => {
|
||||
if (!lock) {
|
||||
// Another tab is the consumer — leave the rows alone.
|
||||
return;
|
||||
}
|
||||
await drain(rows);
|
||||
});
|
||||
}
|
||||
|
||||
async function drain(rows: readonly LocalArticleExtractPickup[]): Promise<void> {
|
||||
for (const row of rows) {
|
||||
if (inFlight.has(row.id)) continue;
|
||||
inFlight.add(row.id);
|
||||
try {
|
||||
await consumeOne(row);
|
||||
} catch (err) {
|
||||
console.error('[articles-import] consumeOne failed:', row.id, err);
|
||||
} finally {
|
||||
inFlight.delete(row.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function consumeOne(row: LocalArticleExtractPickup): Promise<void> {
|
||||
const item = await articleImportItemTable.get(row.itemId);
|
||||
|
||||
// Stale pickup row — item was deleted, cancelled, or already
|
||||
// consumed by a previous tab. Just clean up the inbox.
|
||||
if (!item || item.state !== 'extracted' || item.deletedAt) {
|
||||
await articleExtractPickupTable.delete(row.id);
|
||||
return;
|
||||
}
|
||||
|
||||
// Dedupe race: user may have single-saved this URL via QuickAddInput
|
||||
// while the bulk job was running. Don't write a duplicate Article
|
||||
// row; just point the import item at the existing one.
|
||||
const existing = await articlesStore.findByUrl(row.payload.originalUrl);
|
||||
if (existing) {
|
||||
await articleImportItemTable.update(item.id, {
|
||||
state: 'duplicate',
|
||||
articleId: existing.id,
|
||||
});
|
||||
await articleExtractPickupTable.delete(row.id);
|
||||
return;
|
||||
}
|
||||
|
||||
// Happy path: persist via the existing single-URL pipeline. This
|
||||
// runs encryptRecord + articleTable.add and emits the ArticleSaved
|
||||
// domain event, exactly like a manual `Save URL` would.
|
||||
const article = await articlesStore.saveFromExtracted({
|
||||
originalUrl: row.payload.originalUrl,
|
||||
title: row.payload.title,
|
||||
excerpt: row.payload.excerpt,
|
||||
content: row.payload.content,
|
||||
htmlContent: row.payload.htmlContent,
|
||||
author: row.payload.author,
|
||||
siteName: row.payload.siteName,
|
||||
wordCount: row.payload.wordCount,
|
||||
readingTimeMinutes: row.payload.readingTimeMinutes,
|
||||
warning: row.payload.warning,
|
||||
});
|
||||
|
||||
const nextState: ArticleImportItemState =
|
||||
row.payload.warning === 'probable_consent_wall' ? 'consent-wall' : 'saved';
|
||||
|
||||
await articleImportItemTable.update(item.id, {
|
||||
state: nextState,
|
||||
articleId: article.id,
|
||||
warning: row.payload.warning ?? null,
|
||||
});
|
||||
await articleExtractPickupTable.delete(row.id);
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue