feat(ai): SSE streaming for foreground Mission Runner

Enable real-time token streaming during the planner "calling-llm" phase
so the user sees live progress ("empfange Plan… 128 tokens") instead of
a static spinner. The parser still receives the full text once complete —
no partial-JSON risk.

Changes:
- Extract shared SSE parser from playground into @mana/shared-llm/sse-parser
- remote.ts: use stream:true when onToken callback is provided
- AiPlanInput: add optional onToken field (shared-ai)
- ai-plan task: pass onToken through to backend.generate()
- runner.ts: throttled (500ms) phaseDetail updates during streaming
- Playground: refactored to use shared SSE parser

Also includes: AI agent architecture comparison report (docs/reports/)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-16 12:32:43 +02:00
parent 8a0bf93699
commit be81d11dc3
9 changed files with 633 additions and 106 deletions

View file

@ -51,6 +51,11 @@ const RESEARCH_TRIGGER = /\b(recherchier|research|news|finde|suche|aktuelle|neue
* 5 is generous for read-act-refine patterns ("list_notes → tag them")
* without running the LLM bill dry on stuck missions. */
const MAX_REASONING_LOOP_ITERATIONS = 5;
/** Min interval between Dexie phaseDetail writes during streaming.
* 50 tokens/s × 500ms = ~25 tokens between writes frequent enough
* for the UI to feel live, infrequent enough to avoid Dexie thrashing. */
const STREAMING_PHASE_THROTTLE_MS = 500;
/** Singleton row id of the kontext doc kept in sync with
* `modules/kontext/types.ts` (KONTEXT_SINGLETON_ID). */
const KONTEXT_SINGLETON_ID = 'singleton';
@ -274,8 +279,32 @@ export async function runMission(
: `Planner Runde ${loopIndex + 1}/${MAX_REASONING_LOOP_ITERATIONS}`
);
let plan: AiPlanOutput;
// Streaming: show live token progress while waiting for the
// planner response. Throttled to avoid Dexie write floods.
let streamTokenCount = 0;
let lastStreamWrite = 0;
const roundLabel = loopIndex === 0 ? '' : ` (Runde ${loopIndex + 1})`;
const onToken = (_delta: string) => {
streamTokenCount++;
const now = Date.now();
if (now - lastStreamWrite < STREAMING_PHASE_THROTTLE_MS) return;
lastStreamWrite = now;
void setIterationPhase(
mission!.id,
iterationId,
'calling-llm',
`empfange Plan${roundLabel}${streamTokenCount} tokens`
);
};
try {
plan = await deps.plan({ mission: mission!, resolvedInputs: loopInputs, availableTools });
plan = await deps.plan({
mission: mission!,
resolvedInputs: loopInputs,
availableTools,
onToken,
});
} catch (err) {
if (isAiDebugEnabled()) {
void recordAiDebug({

View file

@ -51,6 +51,7 @@ export const aiPlanTask: LlmTask<AiPlanInput, AiPlanOutput> = {
// (e.g. 10 notes → 10 add_tag_to_note calls). 4096 fits ~15-20
// step objects while still fast on browser tier.
maxTokens: 4096,
onToken: input.onToken,
});
// Always populate debug payload (cheap — strings already in memory).

View file

@ -27,6 +27,7 @@
type CreditOperationType,
} from '@mana/credits';
import { ManaEvents } from '@mana/shared-utils/analytics';
import { authStore } from '$lib/stores/auth.svelte';
let balance = $state<CreditBalance | null>(null);
let transactions = $state<CreditTransaction[]>([]);
@ -135,21 +136,35 @@
async function loadData() {
loading = true;
error = null;
try {
const [balanceData, transactionsData, packagesData] = await Promise.all([
creditsService.getBalance(),
creditsService.getTransactions(20),
creditsService.getPackages(),
]);
balance = balanceData;
transactions = transactionsData;
packages = packagesData.filter((p) => p.active).sort((a, b) => a.sortOrder - b.sortOrder);
} catch (e) {
error = e instanceof Error ? e.message : $_('common.error_loading');
console.error('Failed to load credits data:', e);
} finally {
// API calls require auth — skip for guests, still show costs tab (static data)
if (!authStore.isAuthenticated) {
loading = false;
return;
}
// Load each independently so a single 401/failure doesn't blank the whole page
const results = await Promise.allSettled([
creditsService.getBalance(),
creditsService.getTransactions(20),
creditsService.getPackages(),
]);
if (results[0].status === 'fulfilled') balance = results[0].value;
if (results[1].status === 'fulfilled') transactions = results[1].value;
if (results[2].status === 'fulfilled') {
packages = results[2].value.filter((p) => p.active).sort((a, b) => a.sortOrder - b.sortOrder);
}
// Only show error if ALL three failed
const allFailed = results.every((r) => r.status === 'rejected');
if (allFailed) {
const firstErr = results.find((r) => r.status === 'rejected') as PromiseRejectedResult;
error =
firstErr.reason instanceof Error ? firstErr.reason.message : $_('common.error_loading');
}
loading = false;
}
// ── Helpers ────────────────────────────────────────────

View file

@ -2,16 +2,12 @@
* Playground LLM client thin wrapper around mana-llm's OpenAI-compatible
* `/v1/chat/completions` (streaming) and `/v1/models` endpoints.
*
* Lives next to the playground UI rather than in a shared package because
* the playground is the only consumer right now. If chat / todo enrichment
* / period insights end up calling the same surface in the future, lift
* this into `$lib/data/llm-client.ts`.
*
* The chunk parser is hand-rolled rather than pulled from a library: the
* SSE wire format from mana-llm is straight OpenAI (`data: {…}\n\n` lines
* with a sentinel `[DONE]`), so a 30-line reader is simpler than a dep.
* The SSE chunk parser lives in `@mana/shared-llm/sse-parser` and is shared
* with the LLM orchestrator's remote backend (backends/remote.ts).
*/
import { consumeSSEStream } from '@mana/shared-llm/sse-parser';
const DEFAULT_LLM_URL = 'http://localhost:3025';
/** Resolve the mana-llm base URL from the window-injected env, falling
@ -91,49 +87,42 @@ export async function* streamCompletion(opts: CompletionOptions): AsyncGenerator
throw new Error(`mana-llm: ${res.status} ${res.statusText}${text ? `${text}` : ''}`);
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
// Collect chunks via the shared SSE parser, then yield them.
// We use a queue pattern so the async generator can yield chunks as
// they arrive from the callback-based consumeSSEStream.
const chunks: StreamChunk[] = [];
let resolve: (() => void) | null = null;
let done = false;
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const streamPromise = consumeSSEStream(
res.body,
(content) => {
chunks.push({ type: 'delta', content });
resolve?.();
},
(usage) => {
chunks.push({
type: 'usage',
promptTokens: usage.promptTokens,
completionTokens: usage.completionTokens,
});
resolve?.();
}
).then(() => {
done = true;
resolve?.();
});
// SSE frames are separated by blank lines. Process complete frames
// and leave any partial trailing frame in the buffer for the next
// chunk.
let sep: number;
while ((sep = buffer.indexOf('\n\n')) !== -1) {
const frame = buffer.slice(0, sep);
buffer = buffer.slice(sep + 2);
for (const line of frame.split('\n')) {
if (!line.startsWith('data:')) continue;
const data = line.slice(5).trim();
if (!data || data === '[DONE]') continue;
try {
const json = JSON.parse(data) as {
choices?: Array<{ delta?: { content?: string } }>;
usage?: { prompt_tokens?: number; completion_tokens?: number };
};
const delta = json.choices?.[0]?.delta?.content;
if (delta) yield { type: 'delta', content: delta };
// Usage stats — typically in the final chunk
if (json.usage?.prompt_tokens != null) {
yield {
type: 'usage',
promptTokens: json.usage.prompt_tokens,
completionTokens: json.usage.completion_tokens ?? 0,
};
}
} catch {
// Malformed frame — skip silently. mana-llm occasionally
// emits keepalive comments and we don't want them to
// crash the stream.
}
}
while (!done || chunks.length > 0) {
if (chunks.length > 0) {
yield chunks.shift()!;
} else {
await new Promise<void>((r) => {
resolve = r;
});
}
}
// Ensure the stream promise settles (propagate any errors).
await streamPromise;
}