From 29cbaf30f59eb6d9e48012948329f23419f5c345 Mon Sep 17 00:00:00 2001 From: Till JS Date: Tue, 28 Apr 2026 22:23:45 +0200 Subject: [PATCH] feat(articles): bulk-import store + queries (Phase 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit apps/mana/apps/web/src/lib/modules/articles/: - stores/imports.svelte.ts: new file. articleImportsStore with createJob (bulkAdd N items + 1 job), pauseJob, resumeJob, cancelJob, retryFailed, deleteJob. parseUrls exported as a pure function — splits on whitespace+comma, validates http(s) scheme, deduplicates while preserving input order; used by both the store and the UI's $derived live-validation in Phase 5. - queries.ts: toImportJob/toImportItem converters + useImportJobs (index list), useImportJob (detail header), useImportItems (per- job item list). All scope-aware via scopedForModule / scopedGet. Job creation: createJob(urls) → jobId. Items written first so a worker tick that races the job-write doesn't see a job with totalUrls=N but fewer items reachable. Server-worker picks up state='pending' items on its 2s tick. retryFailed re-arms the job to status='running' if it was 'done', because all-terminal-items had triggered the auto-completion in the worker's counter-rollup pass. deleteJob is soft (deletedAt stamp) on both job + items; already- landed Article rows are NOT touched. Phase 5 (UI) follows. Plan: docs/plans/articles-bulk-import.md. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../web/src/lib/modules/articles/queries.ts | 91 ++++++++- .../modules/articles/stores/imports.svelte.ts | 185 ++++++++++++++++++ 2 files changed, 275 insertions(+), 1 deletion(-) create mode 100644 apps/mana/apps/web/src/lib/modules/articles/stores/imports.svelte.ts diff --git a/apps/mana/apps/web/src/lib/modules/articles/queries.ts b/apps/mana/apps/web/src/lib/modules/articles/queries.ts index a8ce472cd..0819f9eae 100644 --- a/apps/mana/apps/web/src/lib/modules/articles/queries.ts +++ b/apps/mana/apps/web/src/lib/modules/articles/queries.ts @@ -11,7 +11,17 @@ import { deriveUpdatedAt } from '$lib/data/sync'; import { decryptRecords } from '$lib/data/crypto'; import { scopedForModule, scopedGet } from '$lib/data/scope'; import { articleTagOps } from './stores/tags.svelte'; -import type { LocalArticle, LocalHighlight, Article, Highlight, ArticleStatus } from './types'; +import type { + Article, + ArticleImportItem, + ArticleImportJob, + ArticleStatus, + Highlight, + LocalArticle, + LocalArticleImportItem, + LocalArticleImportJob, + LocalHighlight, +} from './types'; // ─── Type Converters ───────────────────────────────────── @@ -267,6 +277,85 @@ export function useArticleHighlights(articleId: string) { }, [] as Highlight[]); } +// ─── Bulk-Import (docs/plans/articles-bulk-import.md) ──── + +export function toImportJob(local: LocalArticleImportJob): ArticleImportJob { + const now = new Date().toISOString(); + return { + id: local.id, + totalUrls: local.totalUrls, + status: local.status, + leasedBy: local.leasedBy ?? null, + leasedUntil: local.leasedUntil ?? null, + startedAt: local.startedAt ?? null, + finishedAt: local.finishedAt ?? null, + savedCount: local.savedCount ?? 0, + duplicateCount: local.duplicateCount ?? 0, + errorCount: local.errorCount ?? 0, + warningCount: local.warningCount ?? 0, + createdAt: local.createdAt ?? now, + updatedAt: deriveUpdatedAt(local) ?? local.createdAt ?? now, + }; +} + +export function toImportItem(local: LocalArticleImportItem): ArticleImportItem { + const now = new Date().toISOString(); + return { + id: local.id, + jobId: local.jobId, + idx: local.idx, + url: local.url, + state: local.state, + articleId: local.articleId ?? null, + warning: local.warning ?? null, + error: local.error ?? null, + attempts: local.attempts ?? 0, + lastAttemptAt: local.lastAttemptAt ?? null, + createdAt: local.createdAt ?? now, + updatedAt: deriveUpdatedAt(local) ?? local.createdAt ?? now, + }; +} + +/** All bulk-import jobs in the active space, newest first. Drives the + * `/articles/import` index. */ +export function useImportJobs() { + return useScopedLiveQuery(async () => { + const locals = await scopedForModule( + 'articles', + 'articleImportJobs' + ).toArray(); + const visible = locals.filter((j) => !j.deletedAt); + visible.sort((a, b) => (deriveUpdatedAt(b) ?? '').localeCompare(deriveUpdatedAt(a) ?? '')); + return visible.map(toImportJob); + }, [] as ArticleImportJob[]); +} + +/** Single job — drives the `/articles/import/[jobId]` detail header. */ +export function useImportJob(jobId: string) { + return useScopedLiveQuery( + async () => { + const local = await scopedGet('articleImportJobs', jobId); + if (!local || local.deletedAt) return null; + return toImportJob(local); + }, + null as ArticleImportJob | null + ); +} + +/** Items for one job, in the original input order. Drives the per-row + * list on the detail view. */ +export function useImportItems(jobId: string) { + return useScopedLiveQuery(async () => { + const locals = await scopedForModule( + 'articles', + 'articleImportItems' + ).toArray(); + const forJob = locals.filter((i) => i.jobId === jobId && !i.deletedAt); + forJob.sort((a, b) => a.idx - b.idx); + return forJob.map(toImportItem); + }, [] as ArticleImportItem[]); +} + // ─── Pure Helpers ───────────────────────────────────────── export function filterByStatus(articles: Article[], status: ArticleStatus): Article[] { diff --git a/apps/mana/apps/web/src/lib/modules/articles/stores/imports.svelte.ts b/apps/mana/apps/web/src/lib/modules/articles/stores/imports.svelte.ts new file mode 100644 index 000000000..9c6d924a8 --- /dev/null +++ b/apps/mana/apps/web/src/lib/modules/articles/stores/imports.svelte.ts @@ -0,0 +1,185 @@ +/** + * Articles Bulk-Import — store (mutations only). + * + * Creates and steers `articleImportJobs` + `articleImportItems`. The + * server-side worker (apps/api/src/modules/articles/import-worker.ts) + * picks up `state='pending'` items, extracts them, drops Pickup rows + * the client-side `consume-pickup.ts` consumer translates into encrypted + * `articles` rows. + * + * Read-side queries live in `queries.ts` (a `useImportJob(id)` / + * `useImportItems(jobId)` pair will land alongside the UI in Phase 5). + * + * Plan: docs/plans/articles-bulk-import.md. + */ + +import { articleImportJobTable, articleImportItemTable } from '../collections'; +import type { + ArticleImportItemState, + LocalArticleImportItem, + LocalArticleImportJob, +} from '../types'; + +/** + * Pure URL parser — used by both the store (`createJob` accepts a raw + * textarea blob) and the UI's `$derived` live-validation. Splits on + * any whitespace + comma, drops empties, validates with `new URL`, + * deduplicates while preserving first-occurrence order. + * + * Exported as a standalone pure function so the unit-test file can + * import it without booting Dexie. + */ +export interface ParsedUrls { + valid: string[]; + invalid: string[]; + duplicates: string[]; +} + +export function parseUrls(raw: string): ParsedUrls { + const tokens = raw + .split(/[\s,]+/) + .map((t) => t.trim()) + .filter(Boolean); + const valid: string[] = []; + const invalid: string[] = []; + const duplicates: string[] = []; + const seen = new Set(); + for (const token of tokens) { + // Reject anything without an http(s) scheme — `new URL('foo.com')` + // would happily accept it as an opaque URI and the server-side + // fetch would then 400 on us. + let parsed: URL; + try { + parsed = new URL(token); + } catch { + invalid.push(token); + continue; + } + if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') { + invalid.push(token); + continue; + } + const canonical = parsed.toString(); + if (seen.has(canonical)) { + duplicates.push(canonical); + continue; + } + seen.add(canonical); + valid.push(canonical); + } + return { valid, invalid, duplicates }; +} + +export const articleImportsStore = { + /** + * Create a job with N items, all in state='pending'. Returns the + * job id so the caller can navigate to `/articles/import/[jobId]`. + * + * No URL validation here — `parseUrls` is the canonical entry, and + * the UI calls it for live feedback before submit. We accept a + * pre-cleaned string array so this method stays trivially testable. + */ + async createJob(urls: readonly string[]): Promise { + if (urls.length === 0) { + throw new Error('createJob: empty url list'); + } + const jobId = crypto.randomUUID(); + + const job: LocalArticleImportJob = { + id: jobId, + totalUrls: urls.length, + status: 'queued', + leasedBy: null, + leasedUntil: null, + startedAt: null, + finishedAt: null, + savedCount: 0, + duplicateCount: 0, + errorCount: 0, + warningCount: 0, + }; + + const items: LocalArticleImportItem[] = urls.map((url, idx) => ({ + id: crypto.randomUUID(), + jobId, + idx, + url, + state: 'pending' as ArticleImportItemState, + articleId: null, + warning: null, + error: null, + attempts: 0, + lastAttemptAt: null, + })); + + // Items first so a server-worker tick that races the job-write + // won't see a job with totalUrls=N but only N-1 items reachable. + // (Conservative ordering — the worker filters jobs to running/ + // queued before scanning items, but the bulkAdd is cheap.) + await articleImportItemTable.bulkAdd(items); + await articleImportJobTable.add(job); + + return jobId; + }, + + /** Pause a running job. Server-worker observes `status='paused'` and + * stops claiming new items. Already-extracting items finish their + * roundtrip; pickup/encrypt cycle for them runs normally. */ + async pauseJob(jobId: string): Promise { + await articleImportJobTable.update(jobId, { status: 'paused' }); + }, + + /** Resume a paused job. */ + async resumeJob(jobId: string): Promise { + await articleImportJobTable.update(jobId, { status: 'running' }); + }, + + /** Cancel a job. Server-worker flips every still-pending item to + * state='cancelled' on the next tick. */ + async cancelJob(jobId: string): Promise { + await articleImportJobTable.update(jobId, { status: 'cancelled' }); + }, + + /** + * Retry the failed items of a job — flip them back to 'pending' so + * the worker picks them up again. Resets attempts so the per-item + * 3-attempt budget restarts cleanly. Counter delta is left to the + * worker (it derives counters from current item states each tick). + */ + async retryFailed(jobId: string): Promise { + const failed = await articleImportItemTable + .where('[jobId+state]') + .equals([jobId, 'error']) + .toArray(); + for (const it of failed) { + await articleImportItemTable.update(it.id, { + state: 'pending' as ArticleImportItemState, + error: null, + attempts: 0, + }); + } + // If the job was 'done' because everything was terminal, re-arm it. + if (failed.length > 0) { + const job = await articleImportJobTable.get(jobId); + if (job?.status === 'done') { + await articleImportJobTable.update(jobId, { + status: 'running', + finishedAt: null, + }); + } + } + return failed.length; + }, + + /** Soft-delete the job + all its items. Article rows that already + * landed are NOT touched — the user's reading list is independent + * from the import job's history. */ + async deleteJob(jobId: string): Promise { + const now = new Date().toISOString(); + const items = await articleImportItemTable.where('jobId').equals(jobId).toArray(); + for (const it of items) { + await articleImportItemTable.update(it.id, { deletedAt: now }); + } + await articleImportJobTable.update(jobId, { deletedAt: now }); + }, +};