feat(shared-llm): Phase 4 — persistent LLM task queue

Until now, modules wanting to use the orchestrator had to await each
LLM call inline in their store code. That's fine for foreground tasks
("user clicked summarize") but a non-starter for background work
("auto-tag every new note", "generate a title for every voice memo
after STT finishes"). Background tasks need to:

  - Queue up while no LLM tier is ready, then drain when one becomes
    available (e.g. user just enabled the browser tier from settings)
  - Survive page reloads, browser restarts, and the user navigating
    away mid-execution
  - Run one at a time without blocking the foreground UI
  - Allow modules to subscribe to results reactively without polling
  - Retry transient failures (network, model loading) but not
    semantic ones (tier-too-low, content blocked)

Phase 4 ships exactly that.

Architecture:

  packages/shared-llm/src/queue.ts — LlmTaskQueue class
    + QueuedTask interface (the persistent row shape)
    + EnqueueOptions (refType/refId/priority/maxAttempts)
    + TaskRegistry type (name → LlmTask map)
    + LlmTaskQueueOptions (table + orchestrator + registry +
                           retryBackoffMs + idleWakeupMs)

  Public API:
    - enqueue(task, input, opts) → string  (returns the queued id)
    - get(id), list(filter)
    - retry(id), cancel(id), purge(olderThanMs)
    - start(), stop()  (idempotent processor lifecycle)

  apps/mana/apps/web/src/lib/llm-queue.ts — web app singleton
    - Dedicated `mana-llm-queue` Dexie database (separate from the
      main `mana` IDB; see comment for the rationale: ephemeral
      per-device state, no encryption needed, no sync needed, doesn't
      belong in the long-frozen `mana` schema)
    - Wires up the queue with llmOrchestrator + taskRegistry
    - Exposes startLlmQueue() / stopLlmQueue() for the layout hook

  apps/mana/apps/web/src/lib/llm-task-registry.ts
    - Maps task names → task objects so the queue processor can
      look up the implementation when pulling rows off the table.
      Closures can't be persisted, so we round-trip via name.
    - Currently registers extractDateTask + summarizeTextTask;
      module-side tasks land here as we add them.

  apps/mana/apps/web/src/routes/(app)/+layout.svelte
    - startLlmQueue() in handleAuthReady's Phase A (auth-independent)
      so guests + authenticated users both get the queue
    - stopLlmQueue() in onDestroy as a fire-and-forget cleanup

Processor loop semantics (the heart of the implementation):

  1. On start(), reclaim any 'running' rows from a crashed previous
     session — reset them to 'pending'. The orphan recovery is the
     reason a crash mid-task doesn't leave the queue stuck.
  2. findNextRunnable() picks the highest-priority pending task whose
     `notBefore` (retry-backoff timestamp) is in the past. Sort key:
     priority desc, then enqueuedAt asc (FIFO within priority).
  3. Mark the task running, increment attempts, look up the LlmTask
     in the registry, hand it to orchestrator.run().
  4. On success: mark done, store result + source + finishedAt.
  5. On error:
       - TierTooLowError or ProviderBlockedError → fail immediately,
         no retry. These are not transient — the user's settings or
         the content itself need to change.
       - Anything else → if attempts < maxAttempts, reset to pending
         with notBefore = now + retryBackoffMs (default 60s). Else
         mark failed.
  6. When no work is pending, sleep on a Promise that resolves when
     either (a) someone calls enqueue() (which fires notifyWakeup),
     or (b) idleWakeupMs elapses (default 30s, safety net for any
     missed wakeup signal).

Module-side reactive reads use Dexie liveQuery directly on the queue
table — no special subscription API on the queue itself. This is
consistent with how every other Mana module reads its data, so the
mental model stays uniform:

  const tags = useLiveQuery(
    () => llmQueueDb.tasks
      .where({ refType: 'note', refId, taskName: 'common.extractTags' })
      .reverse().first(),
    [refId]
  );

Smoke test: a new "Queue" tab in /llm-test lets you enqueue the
existing extractDate / summarize tasks and watch the live state of
the queue table via liveQuery. The display includes per-row state
badge (pending/running/done/failed), tier source, attempt count,
input/output, and a "Done/failed löschen" button that exercises
purge().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-09 01:51:20 +02:00
parent 6e20c298ac
commit 3b5d58ecbe
8 changed files with 567 additions and 251 deletions

View file

@ -0,0 +1,66 @@
/**
* LLM task queue singleton for the Mana web app.
*
* Wires up @mana/shared-llm's LlmTaskQueue with:
*
* - A dedicated Dexie database (`mana-llm-queue`) separate from
* the main `mana` IndexedDB. The queue holds ephemeral, per-device
* state that does NOT need encryption (the inputs are user content
* they already see), does NOT need cross-device sync (running on
* device A doesn't help device B), and does NOT belong in the
* long-frozen `mana` schema with its 120+ collections. A separate
* small DB is the right granularity here.
*
* - The shared LlmOrchestrator singleton from @mana/shared-llm.
*
* - The task registry from $lib/llm-task-registry.ts every task
* name that the queue might encounter has to be listed there
* so the processor can look up the LlmTask object at execution
* time. (Closures can't be persisted, so we round-trip via name.)
*
* The queue is started from the (app)/+layout.svelte's onMount so it
* runs once per page session as long as the app is open.
*/
import Dexie, { type Table } from 'dexie';
import { LlmTaskQueue, llmOrchestrator, type QueuedTask } from '@mana/shared-llm';
import { taskRegistry } from './llm-task-registry';
class LlmQueueDb extends Dexie {
tasks!: Table<QueuedTask, string>;
constructor() {
super('mana-llm-queue');
this.version(1).stores({
// Indexes:
// id primary key (uuid string)
// state filter on pending/running/done/failed
// refType+refId compound index for module reactive reads
// ("show me all tasks for note X")
// taskName filter by task type
// enqueuedAt sort key for FIFO ordering
tasks: 'id, state, [refType+refId], taskName, enqueuedAt',
});
}
}
export const llmQueueDb = new LlmQueueDb();
export const llmTaskQueue = new LlmTaskQueue({
table: llmQueueDb.tasks,
orchestrator: llmOrchestrator,
registry: taskRegistry,
});
/** Start the background processor. Idempotent safe to call from
* layout onMount even if multiple components mount in parallel. */
export function startLlmQueue(): void {
if (typeof window === 'undefined') return;
llmTaskQueue.start();
}
/** Stop the queue and wait for the current task to finish. Used by
* tests and by the layout's onDestroy hook. */
export async function stopLlmQueue(): Promise<void> {
await llmTaskQueue.stop();
}

View file

@ -0,0 +1,27 @@
/**
* Central registry of all LlmTasks the Mana web app knows about.
*
* The persistent task queue stores task NAMES (strings) rather than
* task OBJECTS, because closures can't be serialised to IndexedDB.
* When the queue processor pulls a row off the table, it looks up
* the task name in this registry to recover the actual LlmTask
* object with its runLlm() / runRules() implementations.
*
* Adding a new task: import it here and add it to the map. The
* convention is `{module}.{action}` for the task name, matching
* the `name` field on the LlmTask itself.
*
* If you forget to register a task, the queue will mark any enqueued
* row with that name as failed with the error
* "Task '<name>' is not registered" which is at least loud and
* obvious enough to catch the typo immediately.
*/
import type { TaskRegistry } from '@mana/shared-llm';
import { extractDateTask } from './llm-tasks/extract-date';
import { summarizeTextTask } from './llm-tasks/summarize';
export const taskRegistry: TaskRegistry = {
[extractDateTask.name]: extractDateTask,
[summarizeTextTask.name]: summarizeTextTask,
};

View file

@ -36,6 +36,7 @@
import { tagLocalStore, tagMutations, useAllTags } from '@mana/shared-stores';
import { linkLocalStore, linkMutations } from '@mana/shared-links';
import { manaStore } from '$lib/data/local-store';
import { startLlmQueue, stopLlmQueue } from '$lib/llm-queue';
import { createUnifiedSync } from '$lib/data/sync';
import { networkStore } from '$lib/stores/network.svelte';
import { db } from '$lib/data/database';
@ -305,6 +306,12 @@
initSharedUload();
await dashboardStore.initialize();
// Start the persistent LLM task queue. Idempotent — safe to call
// repeatedly. The queue picks up any tasks left in 'pending' state
// from previous sessions (and reclaims orphaned 'running' rows
// from a crashed session) before going idle. See $lib/llm-queue.ts.
startLlmQueue();
// Restore nav collapsed state
if (typeof localStorage !== 'undefined') {
const savedCollapsed = localStorage.getItem(STORAGE_KEYS.NAV_COLLAPSED);
@ -373,6 +380,10 @@
unifiedSync?.stopAll();
reminderScheduler.stop();
guestMode?.destroy();
// Fire-and-forget — we don't need to await; the in-flight task
// will finish in the background and the next page session will
// pick up where we left off.
void stopLlmQueue();
});
// ── Search / Spotlight ───────────────────────────────────

View file

@ -21,6 +21,8 @@
} from '@mana/shared-llm';
import { extractDateTask } from '$lib/llm-tasks/extract-date';
import { summarizeTextTask } from '$lib/llm-tasks/summarize';
import { llmTaskQueue, llmQueueDb } from '$lib/llm-queue';
import { useLiveQueryWithDefault } from '@mana/local-store/svelte';
import { marked } from 'marked';
import { Robot, Trash, PaperPlaneRight, ClockCounterClockwise } from '@mana/shared-icons';
@ -53,9 +55,21 @@
// --- State ---
let selectedModel: ModelKey = $state('gemma-4-e2b');
let activeTab: 'chat' | 'extract' | 'classify' | 'compare' | 'benchmark' | 'router' =
let activeTab: 'chat' | 'extract' | 'classify' | 'compare' | 'benchmark' | 'router' | 'queue' =
$state('chat');
// --- Queue tab state ---
let queueInput = $state('Treffen mit Sara morgen 14:30');
let queueLastEnqueuedId = $state<string | null>(null);
const queueRows = useLiveQueryWithDefault(
async () => llmQueueDb.tasks.orderBy('enqueuedAt').reverse().limit(20).toArray(),
[]
);
async function enqueueTaskNow(task: typeof extractDateTask | typeof summarizeTextTask) {
queueLastEnqueuedId = await llmTaskQueue.enqueue(task, { text: queueInput });
}
// --- Router tab state ---
const settings = $derived(llmSettingsState.current);
let routerInput = $state('Treffen mit Sara morgen 14:30');
@ -631,7 +645,7 @@
<!-- Tabs -->
<div class="mb-4 flex gap-1 rounded-lg border border-border bg-card p-1">
{#each [{ id: 'chat', label: 'Chat' }, { id: 'extract', label: 'JSON Extract' }, { id: 'classify', label: 'Classify' }, { id: 'compare', label: 'Compare' }, { id: 'benchmark', label: 'Benchmark' }, { id: 'router', label: 'Router' }] as tab}
{#each [{ id: 'chat', label: 'Chat' }, { id: 'extract', label: 'JSON Extract' }, { id: 'classify', label: 'Classify' }, { id: 'compare', label: 'Compare' }, { id: 'benchmark', label: 'Benchmark' }, { id: 'router', label: 'Router' }, { id: 'queue', label: 'Queue' }] as tab}
<button
onclick={() => (activeTab = tab.id as typeof activeTab)}
class="flex-1 rounded-md px-3 py-1.5 text-sm font-medium transition-colors {activeTab ===
@ -1279,5 +1293,113 @@
{/if}
</div>
{/if}
<!-- Queue Tab — exercises the persistent LlmTaskQueue -->
{#if activeTab === 'queue'}
<div class="flex flex-col gap-4">
<div class="rounded-xl border border-border bg-card p-4">
<p class="mb-3 text-sm text-muted-foreground">
Smoke-Test für die persistente Task-Queue. Tasks werden in einer eigenen Dexie-DB (<code
class="rounded bg-muted px-1 py-0.5 text-[10px]">mana-llm-queue</code
>) gespeichert und im Hintergrund vom Queue-Processor abgearbeitet sobald ein passender
LLM-Tier verfügbar ist. Tasks überleben Page-Reloads — du kannst die Seite hart neuladen
und sie laufen weiter.
</p>
<input
type="text"
bind:value={queueInput}
placeholder="Eingabetext für den Task..."
class="mb-3 w-full rounded-lg border border-border bg-background px-3 py-2 text-sm text-foreground placeholder:text-muted-foreground focus:border-primary focus:outline-none"
/>
<div class="flex flex-wrap gap-2">
<button
onclick={() => enqueueTaskNow(extractDateTask)}
disabled={!queueInput.trim()}
class="rounded-lg bg-primary px-4 py-2 text-sm font-medium text-primary-foreground disabled:opacity-50"
>
Enqueue extractDate
</button>
<button
onclick={() => enqueueTaskNow(summarizeTextTask)}
disabled={!queueInput.trim()}
class="rounded-lg bg-primary px-4 py-2 text-sm font-medium text-primary-foreground disabled:opacity-50"
>
Enqueue summarize
</button>
</div>
{#if queueLastEnqueuedId}
<div class="mt-3 text-xs text-muted-foreground">
Letzte Task-ID:
<code class="rounded bg-muted px-1 py-0.5 font-mono">{queueLastEnqueuedId}</code>
</div>
{/if}
</div>
<!-- Live queue table view via Dexie liveQuery -->
<div class="rounded-xl border border-border bg-card p-4">
<div class="mb-3 flex items-center justify-between">
<h3 class="text-sm font-semibold">Letzte 20 Tasks</h3>
<button
onclick={() => llmTaskQueue.purge(0)}
class="rounded-md border border-border px-2 py-1 text-xs text-muted-foreground hover:text-foreground"
>
Done/failed löschen
</button>
</div>
{#if queueRows.value.length === 0}
<div class="rounded-lg bg-muted/20 p-3 text-sm text-muted-foreground">
Queue ist leer. Reihe oben einen Task ein.
</div>
{:else}
<div class="space-y-2">
{#each queueRows.value as row}
{@const stateColor =
row.state === 'done'
? 'border-emerald-500/40 bg-emerald-500/5 text-emerald-600 dark:text-emerald-400'
: row.state === 'failed'
? 'border-red-500/40 bg-red-500/5 text-red-600 dark:text-red-400'
: row.state === 'running'
? 'border-blue-500/40 bg-blue-500/5 text-blue-600 dark:text-blue-400'
: 'border-muted-foreground/30 bg-muted/10 text-muted-foreground'}
<div class="rounded-lg border p-3 text-xs {stateColor}">
<div class="flex flex-wrap items-center gap-2">
<span class="rounded-full border border-current px-2 py-0.5 font-medium">
{row.state}
</span>
<span class="font-mono text-foreground">{row.taskName}</span>
<span class="text-muted-foreground">
· {new Date(row.enqueuedAt).toLocaleTimeString()}
</span>
{#if row.attempts > 1}
<span class="text-muted-foreground">
· {row.attempts}/{row.maxAttempts} attempts
</span>
{/if}
{#if row.source}
<span class="text-muted-foreground">· via {row.source}</span>
{/if}
</div>
<div class="mt-1 truncate text-muted-foreground">
input: <code class="font-mono">{JSON.stringify(row.input)}</code>
</div>
{#if row.result !== undefined}
<div class="mt-1 text-foreground">
result: <code class="font-mono">{JSON.stringify(row.result)}</code>
</div>
{/if}
{#if row.error}
<div class="mt-1 text-red-400">error: {row.error}</div>
{/if}
</div>
{/each}
</div>
{/if}
</div>
</div>
{/if}
{/if}
</div>