diff --git a/apps/api/drizzle.config.ts b/apps/api/drizzle.config.ts new file mode 100644 index 000000000..2fe27573b --- /dev/null +++ b/apps/api/drizzle.config.ts @@ -0,0 +1,20 @@ +import { defineConfig } from 'drizzle-kit'; + +/** + * Drizzle config for the unified mana-api. + * + * Most modules in apps/api inline their schemas in routes.ts and create + * tables out-of-band (or piggyback on schemas owned by other services). + * This config currently only manages the `research` schema introduced for + * the deep-research feature; expand the `schema` glob and `schemaFilter` + * as more modules adopt managed migrations. + */ +export default defineConfig({ + schema: './src/modules/research/schema.ts', + out: './drizzle/research', + dialect: 'postgresql', + dbCredentials: { + url: process.env.DATABASE_URL || 'postgresql://mana:devpassword@localhost:5432/mana_platform', + }, + schemaFilter: ['research'], +}); diff --git a/apps/api/drizzle/research/0000_init.sql b/apps/api/drizzle/research/0000_init.sql new file mode 100644 index 000000000..b73f8d116 --- /dev/null +++ b/apps/api/drizzle/research/0000_init.sql @@ -0,0 +1,47 @@ +-- Research module — initial schema (manually authored to match +-- apps/api/src/modules/research/schema.ts). +-- +-- Once `drizzle-kit` is installed in apps/api, future migrations should +-- be generated via `pnpm --filter @mana/api db:generate` and this file +-- can become the canonical baseline. +-- +-- Apply with: +-- psql "$DATABASE_URL" -f apps/api/drizzle/research/0000_init.sql + +CREATE SCHEMA IF NOT EXISTS "research"; + +CREATE TABLE IF NOT EXISTS "research"."research_results" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), + "user_id" text NOT NULL, + "question_id" text NOT NULL, + "depth" text NOT NULL, + "status" text NOT NULL, + "sub_queries" jsonb, + "summary" text, + "key_points" jsonb, + "follow_up_questions" jsonb, + "error_message" text, + "started_at" timestamptz NOT NULL DEFAULT now(), + "finished_at" timestamptz +); + +CREATE INDEX IF NOT EXISTS "research_results_user_id_idx" + ON "research"."research_results" ("user_id"); + +CREATE INDEX IF NOT EXISTS "research_results_question_id_idx" + ON "research"."research_results" ("question_id"); + +CREATE TABLE IF NOT EXISTS "research"."sources" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), + "research_result_id" uuid NOT NULL REFERENCES "research"."research_results"("id") ON DELETE CASCADE, + "url" text NOT NULL, + "title" text, + "snippet" text, + "extracted_content" text, + "category" text, + "rank" integer NOT NULL, + "created_at" timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS "sources_research_result_id_idx" + ON "research"."sources" ("research_result_id"); diff --git a/apps/api/package.json b/apps/api/package.json index 46f2e3aec..c77215999 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -7,7 +7,9 @@ "dev": "bun --hot run src/index.ts", "build": "bun build src/index.ts --outdir dist --target bun", "start": "bun run dist/index.js", - "type-check": "tsc --noEmit" + "type-check": "tsc --noEmit", + "db:generate": "drizzle-kit generate", + "db:push": "drizzle-kit push" }, "dependencies": { "@mana/media-client": "workspace:*", @@ -24,6 +26,7 @@ "devDependencies": { "@types/bun": "latest", "@types/jsdom": "^21.1.0", + "drizzle-kit": "^0.30.0", "typescript": "^5.8.0" } } diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 275edd6ce..b00aa28dc 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -31,6 +31,7 @@ import { moodlitRoutes } 
from './modules/moodlit/routes'; import { newsRoutes } from './modules/news/routes'; import { tracesRoutes } from './modules/traces/routes'; import { presiRoutes } from './modules/presi/routes'; +import { researchRoutes } from './modules/research/routes'; const PORT = parseInt(process.env.PORT || '3060', 10); const CORS_ORIGINS = (process.env.CORS_ORIGINS || 'http://localhost:5173').split(','); @@ -61,6 +62,7 @@ app.route('/api/v1/moodlit', moodlitRoutes); app.route('/api/v1/news', newsRoutes); app.route('/api/v1/traces', tracesRoutes); app.route('/api/v1/presi', presiRoutes); +app.route('/api/v1/research', researchRoutes); // ─── Server Info ──────────────────────────────────────────── console.log(`mana-api starting on port ${PORT}...`); diff --git a/apps/api/src/lib/llm.ts b/apps/api/src/lib/llm.ts new file mode 100644 index 000000000..0f879b525 --- /dev/null +++ b/apps/api/src/lib/llm.ts @@ -0,0 +1,175 @@ +/** + * Thin client for the mana-llm gateway. + * + * Two helpers, deliberately small: + * + * llmJson() — non-streaming, parses the model response as JSON. + * Used for plan/structuring steps where we need a typed object. + * + * llmStream() — streaming, calls onToken() for each delta and returns + * the full concatenated text at the end. Used for synthesis. + * + * mana-llm exposes an OpenAI-compatible /api/v1/chat/completions endpoint + * (see services/mana-llm). Models are namespaced as `provider/model`, e.g. + * `ollama/gemma3:4b`, `openrouter/meta-llama/llama-3.1-70b-instruct`. + * + * Internal service-to-service calls — no auth on the wire (private network). + */ + +const LLM_URL = process.env.MANA_LLM_URL || 'http://localhost:3025'; + +export interface LlmMessage { + role: 'system' | 'user' | 'assistant'; + content: string; +} + +export interface LlmJsonOptions { + model: string; + system?: string; + user: string; + temperature?: number; + maxTokens?: number; +} + +export interface LlmStreamOptions { + model: string; + system?: string; + user: string; + temperature?: number; + maxTokens?: number; + onToken: (delta: string) => void | Promise; + signal?: AbortSignal; +} + +export class LlmError extends Error { + constructor( + message: string, + public readonly status?: number, + public readonly body?: string + ) { + super(message); + this.name = 'LlmError'; + } +} + +function buildMessages(system: string | undefined, user: string): LlmMessage[] { + const msgs: LlmMessage[] = []; + if (system) msgs.push({ role: 'system', content: system }); + msgs.push({ role: 'user', content: user }); + return msgs; +} + +/** + * Call the LLM and parse the response as JSON. + * + * Strips markdown code fences if the model wraps its output in ```json ... ```. + * Throws LlmError on transport/HTTP failure or if the body isn't valid JSON. + */ +export async function llmJson(opts: LlmJsonOptions): Promise { + const res = await fetch(`${LLM_URL}/api/v1/chat/completions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + model: opts.model, + messages: buildMessages(opts.system, opts.user), + temperature: opts.temperature ?? 0.2, + max_tokens: opts.maxTokens ?? 
1000, + response_format: { type: 'json_object' }, + }), + }); + + if (!res.ok) { + const body = await res.text().catch(() => ''); + throw new LlmError(`mana-llm returned ${res.status}`, res.status, body); + } + + const data = (await res.json()) as { + choices?: Array<{ message?: { content?: string } }>; + }; + const raw = data.choices?.[0]?.message?.content; + if (!raw) throw new LlmError('mana-llm response missing content'); + + const cleaned = stripCodeFence(raw); + try { + return JSON.parse(cleaned) as T; + } catch (err) { + throw new LlmError( + `mana-llm returned non-JSON content: ${(err as Error).message}`, + undefined, + raw + ); + } +} + +/** + * Call the LLM in streaming mode. Invokes onToken() for each delta and + * returns the full concatenated text once the stream completes. + * + * Parses OpenAI-style SSE: lines beginning with `data: ` and the + * sentinel `data: [DONE]`. + */ +export async function llmStream(opts: LlmStreamOptions): Promise { + const res = await fetch(`${LLM_URL}/api/v1/chat/completions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + model: opts.model, + messages: buildMessages(opts.system, opts.user), + temperature: opts.temperature ?? 0.5, + max_tokens: opts.maxTokens ?? 2000, + stream: true, + }), + signal: opts.signal, + }); + + if (!res.ok || !res.body) { + const body = await res.text().catch(() => ''); + throw new LlmError(`mana-llm stream returned ${res.status}`, res.status, body); + } + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + let full = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // SSE frames are separated by blank lines, but mana-llm forwards + // line-by-line — split on \n and keep the last (possibly partial) line. + const lines = buffer.split('\n'); + buffer = lines.pop() ?? ''; + + for (const line of lines) { + if (!line.startsWith('data: ')) continue; + const payload = line.slice(6).trim(); + if (!payload || payload === '[DONE]') continue; + + try { + const chunk = JSON.parse(payload) as { + choices?: Array<{ delta?: { content?: string } }>; + }; + const delta = chunk.choices?.[0]?.delta?.content; + if (delta) { + full += delta; + await opts.onToken(delta); + } + } catch { + // ignore malformed frames — keepalives, comments, etc. + } + } + } + + return full; +} + +function stripCodeFence(text: string): string { + const trimmed = text.trim(); + if (!trimmed.startsWith('```')) return trimmed; + // ```json\n...\n``` or ```\n...\n``` + const withoutOpen = trimmed.replace(/^```(?:json)?\s*\n?/, ''); + return withoutOpen.replace(/\n?```\s*$/, ''); +} diff --git a/apps/api/src/lib/responses.ts b/apps/api/src/lib/responses.ts new file mode 100644 index 000000000..2e3d296b6 --- /dev/null +++ b/apps/api/src/lib/responses.ts @@ -0,0 +1,118 @@ +/** + * Standard response helpers for mana-api modules. + * + * Background: A pre-launch audit (April 2026, see + * `docs/REFACTORING_AUDIT_2026_04.md` item #5) flagged that error and + * list responses were inconsistent across the 15+ modules. The actual + * inconsistency turned out to be smaller than reported — every module + * already returns errors as `{ error: 'message' }` — but using these + * helpers gives us: + * + * 1. **Type-safe status codes** — TS catches stray `c.json(..., 999)` + * 2. 
**One place to enrich the envelope** — when we add `code`, + * `requestId`, or `details` later, we change one file instead of + * grepping 79 callsites. + * 3. **Consistent list shape** — `{ items, count }` regardless of + * what the items are. Frontend `apps/mana/apps/web` doesn't have + * to special-case `events` vs `contacts` vs `occurrences`. + * + * The shape is wire-compatible with the existing inline `c.json(...)` + * calls, so adoption can be incremental: new code uses these helpers, + * old code keeps working until someone touches the file. + * + * @example + * ```ts + * import { errorResponse, listResponse, validationError } from '../../lib/responses'; + * + * routes.get('/things', async (c) => { + * const things = await db.select().from(thingsTable); + * return listResponse(c, things); + * }); + * + * routes.post('/things', async (c) => { + * const parsed = thingSchema.safeParse(await c.req.json()); + * if (!parsed.success) return validationError(c, parsed.error.issues); + * // ... + * }); + * ``` + */ + +import type { Context } from 'hono'; +import type { ContentfulStatusCode } from 'hono/utils/http-status'; + +/** + * Standard error response envelope. + * + * Wire-compatible with the inline `c.json({ error: '...' }, status)` + * pattern that already dominates the codebase. Future fields like + * `code` (machine-readable error code) and `details` (validation issues, + * etc.) can be added without touching callsites. + */ +export type ErrorBody = { + error: string; + code?: string; + details?: unknown; +}; + +/** + * Standard list response envelope. + * + * Always uses `items` as the field name, regardless of what's inside. + * The frontend hits a stable shape: `{ items: T[], count: number }`. + */ +export type ListBody = { + items: T[]; + count: number; +}; + +/** + * Return a structured error response. + * + * @param c Hono context + * @param error Human-readable message (also used as fallback for code) + * @param status HTTP status (default 500) + * @param extra Optional extra fields — `code` for machine-readable + * identification, `details` for validation issues, etc. + */ +export function errorResponse( + c: Context, + error: string, + status: ContentfulStatusCode = 500, + extra?: { code?: string; details?: unknown } +) { + const body: ErrorBody = { error }; + if (extra?.code) body.code = extra.code; + if (extra?.details !== undefined) body.details = extra.details; + return c.json(body, status); +} + +/** + * Return a validation error response (400) with structured issues. + * + * Convenience over `errorResponse` for the common Zod case — extracts + * the first error message as the human string and attaches the full + * issue list under `details`. + */ +export function validationError(c: Context, issues: unknown[], status: ContentfulStatusCode = 400) { + const firstMessage = + Array.isArray(issues) && + issues.length > 0 && + typeof issues[0] === 'object' && + issues[0] !== null && + 'message' in issues[0] + ? String((issues[0] as { message: unknown }).message) + : 'Invalid input'; + return errorResponse(c, firstMessage, status, { code: 'VALIDATION', details: issues }); +} + +/** + * Return a standard list response. Always wraps in `{ items, count }`, + * regardless of what `items` are. This is the *opposite* of the current + * convention where each module names its own field + * (`{ events, count }`, `{ contacts, count }`) — frontends benefit + * from a single uniform unwrap step. 
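+ *
+ * Consumer-side sketch (the item type below is illustrative; only the
+ * `{ items, count }` envelope is guaranteed by this helper):
+ * @example
+ * ```ts
+ * const res = await fetch('/api/v1/research/<id>/sources');
+ * const body = (await res.json()) as ListBody<{ url: string; rank: number }>;
+ * console.log(`${body.count} sources`, body.items.map((s) => s.url));
+ * ```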
+ */ +export function listResponse(c: Context, items: T[], status: ContentfulStatusCode = 200) { + const body: ListBody = { items, count: items.length }; + return c.json(body, status); +} diff --git a/apps/api/src/lib/search.ts b/apps/api/src/lib/search.ts new file mode 100644 index 000000000..33e787c6a --- /dev/null +++ b/apps/api/src/lib/search.ts @@ -0,0 +1,120 @@ +/** + * Thin client for the mana-search Go service. + * + * Two helpers, scoped to what the research orchestrator needs: + * + * webSearch() — POST /api/v1/search, returns ranked SearXNG results. + * bulkExtract() — POST /api/v1/extract/bulk, returns Readability text per URL. + * + * Internal service-to-service calls — no auth on the wire (private network). + */ + +const SEARCH_URL = process.env.MANA_SEARCH_URL || 'http://localhost:3021'; + +export interface SearchHit { + url: string; + title: string; + snippet: string; + engine: string; + score: number; + publishedDate?: string; + category: string; +} + +export interface ExtractedContent { + title: string; + description?: string; + author?: string; + publishedDate?: string; + siteName?: string; + text: string; + wordCount: number; +} + +export interface BulkExtractResult { + url: string; + success: boolean; + content?: ExtractedContent; + error?: string; +} + +export class SearchError extends Error { + constructor( + message: string, + public readonly status?: number + ) { + super(message); + this.name = 'SearchError'; + } +} + +export interface WebSearchOptions { + query: string; + limit?: number; + categories?: string[]; // 'general' | 'news' | 'science' | 'it' + language?: string; + signal?: AbortSignal; +} + +/** Run one SearXNG query via mana-search and return normalised hits. */ +export async function webSearch(opts: WebSearchOptions): Promise { + const res = await fetch(`${SEARCH_URL}/api/v1/search`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + query: opts.query, + options: { + limit: opts.limit ?? 10, + categories: opts.categories, + language: opts.language ?? 'de-DE', + }, + }), + signal: opts.signal, + }); + + if (!res.ok) { + throw new SearchError(`mana-search returned ${res.status}`, res.status); + } + + const data = (await res.json()) as { results?: SearchHit[] }; + return data.results ?? []; +} + +/** + * Extract Readability content for a batch of URLs in parallel server-side. + * mana-search caps at 20 URLs per call; we slice if more come in. + */ +export async function bulkExtract( + urls: string[], + opts: { maxLength?: number; concurrency?: number; signal?: AbortSignal } = {} +): Promise { + if (urls.length === 0) return []; + + const batches: string[][] = []; + for (let i = 0; i < urls.length; i += 20) batches.push(urls.slice(i, i + 20)); + + const all: BulkExtractResult[] = []; + for (const batch of batches) { + const res = await fetch(`${SEARCH_URL}/api/v1/extract/bulk`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + urls: batch, + concurrency: opts.concurrency ?? 5, + options: { + maxLength: opts.maxLength ?? 
8000, + }, + }), + signal: opts.signal, + }); + + if (!res.ok) { + throw new SearchError(`mana-search bulk extract returned ${res.status}`, res.status); + } + + const data = (await res.json()) as { results?: BulkExtractResult[] }; + if (data.results) all.push(...data.results); + } + + return all; +} diff --git a/apps/api/src/modules/research/orchestrator.ts b/apps/api/src/modules/research/orchestrator.ts new file mode 100644 index 000000000..c2809d1a4 --- /dev/null +++ b/apps/api/src/modules/research/orchestrator.ts @@ -0,0 +1,366 @@ +/** + * Research orchestrator — three linear phases: + * + * 1. Plan — mana-llm produces N sub-queries (JSON) + * 2. Retrieve — mana-search runs each sub-query in parallel, + * deduplicates, optionally extracts full text + * 3. Synthesise — mana-llm streams a structured answer (summary, + * key points, follow-ups) over the source corpus + * + * Each phase persists its progress to research_results/sources so a + * caller can either await the whole thing (sync mode) or subscribe to + * progress events (will land in routes.ts via a small in-process pubsub). + * + * Errors flip status='error' and surface errorMessage; they never throw + * past runPipeline() so background invocations don't crash the worker. + */ + +import { eq } from 'drizzle-orm'; +import { db, researchResults, sources, type ResearchDepth } from './schema'; +import { llmJson, llmStream, LlmError } from '../../lib/llm'; +import { webSearch, bulkExtract, type SearchHit, SearchError } from '../../lib/search'; + +// ─── Depth configuration ──────────────────────────────────── + +interface DepthConfig { + subQueryCount: number; + hitsPerQuery: number; + maxSources: number; + extract: boolean; + categories: string[]; + planModel: string; + synthModel: string; +} + +const DEPTH_CONFIG: Record = { + quick: { + subQueryCount: 1, + hitsPerQuery: 5, + maxSources: 5, + extract: false, + categories: ['general'], + planModel: 'ollama/gemma3:4b', + synthModel: 'ollama/gemma3:4b', + }, + standard: { + subQueryCount: 3, + hitsPerQuery: 8, + maxSources: 15, + extract: true, + categories: ['general', 'news'], + planModel: 'ollama/gemma3:4b', + synthModel: 'ollama/gemma3:12b', + }, + deep: { + subQueryCount: 6, + hitsPerQuery: 8, + maxSources: 30, + extract: true, + categories: ['general', 'news', 'science', 'it'], + planModel: 'ollama/gemma3:12b', + synthModel: 'ollama/gemma3:12b', + }, +}; + +// ─── Progress events (consumed by routes.ts pubsub later) ─── + +export type ProgressEvent = + | { type: 'status'; status: 'planning' | 'searching' | 'extracting' | 'synthesizing' } + | { type: 'plan'; subQueries: string[] } + | { type: 'sources'; count: number } + | { type: 'token'; delta: string } + | { type: 'done'; researchResultId: string } + | { type: 'error'; message: string }; + +export type ProgressEmitter = (event: ProgressEvent) => void; + +const noop: ProgressEmitter = () => {}; + +// ─── Pipeline input ───────────────────────────────────────── + +export interface PipelineInput { + researchResultId: string; + questionTitle: string; + questionDescription?: string; + depth: ResearchDepth; +} + +// ─── Synthesis JSON shape ─────────────────────────────────── + +interface SynthesisPayload { + summary: string; + keyPoints: string[]; + followUps: string[]; +} + +// ─── Public entrypoint ────────────────────────────────────── + +/** + * Run the full pipeline. Resolves once the row is in `done` or `error` + * state. Never throws — all failures are caught and persisted. 
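+ *
+ * Invocation sketch, condensed from runPipelineWithPubsub() in routes.ts
+ * (`row` and `input` come from that handler, they are not defined here):
+ * @example
+ * ```ts
+ * void runPipeline(
+ *   {
+ *     researchResultId: row.id,
+ *     questionTitle: input.title,
+ *     questionDescription: input.description,
+ *     depth: input.depth,
+ *   },
+ *   (event) => publish(row.id, event)
+ * );
+ * ```
+ */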
+ */ +export async function runPipeline( + input: PipelineInput, + emit: ProgressEmitter = noop +): Promise { + const cfg = DEPTH_CONFIG[input.depth]; + const id = input.researchResultId; + + try { + // ─── Phase 1: Plan ───────────────────────────────── + await setStatus(id, 'planning'); + emit({ type: 'status', status: 'planning' }); + + const subQueries = await planSubQueries(input, cfg); + await db.update(researchResults).set({ subQueries }).where(eq(researchResults.id, id)); + emit({ type: 'plan', subQueries }); + + // ─── Phase 2: Retrieve ───────────────────────────── + await setStatus(id, 'searching'); + emit({ type: 'status', status: 'searching' }); + + const hits = await runSearches(subQueries, cfg); + const ranked = dedupeAndRank(hits).slice(0, cfg.maxSources); + + let enriched = ranked.map((h) => ({ + hit: h, + extractedText: undefined as string | undefined, + })); + + if (cfg.extract && ranked.length > 0) { + await setStatus(id, 'extracting'); + emit({ type: 'status', status: 'extracting' }); + + const extracts = await bulkExtract( + ranked.map((h) => h.url), + { maxLength: 8000 } + ); + const byUrl = new Map(extracts.map((e) => [e.url, e])); + enriched = ranked.map((h) => ({ + hit: h, + extractedText: byUrl.get(h.url)?.content?.text, + })); + } + + // Persist sources with stable rank order so citations [n] map to sources[n-1]. + await db.insert(sources).values( + enriched.map((e, idx) => ({ + researchResultId: id, + url: e.hit.url, + title: e.hit.title, + snippet: e.hit.snippet, + extractedContent: e.extractedText, + category: e.hit.category, + rank: idx + 1, + })) + ); + emit({ type: 'sources', count: enriched.length }); + + // ─── Phase 3: Synthesise ─────────────────────────── + await setStatus(id, 'synthesizing'); + emit({ type: 'status', status: 'synthesizing' }); + + const synthesis = await synthesise(input, enriched, cfg, emit); + + await db + .update(researchResults) + .set({ + status: 'done', + summary: synthesis.summary, + keyPoints: synthesis.keyPoints, + followUpQuestions: synthesis.followUps, + finishedAt: new Date(), + }) + .where(eq(researchResults.id, id)); + + emit({ type: 'done', researchResultId: id }); + } catch (err) { + const message = formatError(err); + console.error(`[research:${id}] pipeline failed:`, err); + await db + .update(researchResults) + .set({ status: 'error', errorMessage: message, finishedAt: new Date() }) + .where(eq(researchResults.id, id)) + .catch(() => {}); + emit({ type: 'error', message }); + } +} + +// ─── Phase 1: Plan ────────────────────────────────────────── + +async function planSubQueries(input: PipelineInput, cfg: DepthConfig): Promise { + if (cfg.subQueryCount === 1) { + // Cheap path: skip the LLM round-trip, just use the question itself. + return [input.questionTitle]; + } + + const system = + 'Du planst eine Web-Recherche. Antworte ausschließlich als JSON-Objekt mit dem Schlüssel "subQueries" (Array aus Strings). Kein Fließtext, kein Markdown.'; + + const user = [ + `Frage: ${input.questionTitle}`, + input.questionDescription ? 
`Kontext: ${input.questionDescription}` : null, + '', + `Erzeuge genau ${cfg.subQueryCount} präzise, sich gegenseitig ergänzende Web-Suchanfragen.`, + 'Mische deutsche und englische Anfragen, wenn das die Trefferqualität verbessert.', + 'Jede Anfrage soll einen anderen Aspekt der Frage abdecken.', + ] + .filter(Boolean) + .join('\n'); + + const result = await llmJson<{ subQueries?: unknown }>({ + model: cfg.planModel, + system, + user, + temperature: 0.3, + maxTokens: 400, + }); + + const queries = Array.isArray(result.subQueries) + ? result.subQueries.filter((q): q is string => typeof q === 'string' && q.trim().length > 0) + : []; + + if (queries.length === 0) { + // Fallback: don't fail the whole run because the planner produced garbage. + return [input.questionTitle]; + } + + return queries.slice(0, cfg.subQueryCount); +} + +// ─── Phase 2: Retrieve ────────────────────────────────────── + +async function runSearches(queries: string[], cfg: DepthConfig): Promise { + const results = await Promise.allSettled( + queries.map((q) => + webSearch({ + query: q, + limit: cfg.hitsPerQuery, + categories: cfg.categories, + }) + ) + ); + + const hits: SearchHit[] = []; + for (const r of results) { + if (r.status === 'fulfilled') hits.push(...r.value); + else console.warn('[research] sub-query failed:', r.reason); + } + return hits; +} + +/** + * Deduplicate by URL, keeping the highest-scored hit per URL. + * Sort by score descending so the best sources land at the top of the prompt. + */ +function dedupeAndRank(hits: SearchHit[]): SearchHit[] { + const byUrl = new Map(); + for (const h of hits) { + const existing = byUrl.get(h.url); + if (!existing || h.score > existing.score) byUrl.set(h.url, h); + } + return [...byUrl.values()].sort((a, b) => b.score - a.score); +} + +// ─── Phase 3: Synthesise ──────────────────────────────────── + +async function synthesise( + input: PipelineInput, + enriched: Array<{ hit: SearchHit; extractedText?: string }>, + cfg: DepthConfig, + emit: ProgressEmitter +): Promise { + const context = enriched + .map((e, i) => { + const body = e.extractedText ?? e.hit.snippet ?? ''; + return `[${i + 1}] ${e.hit.title}\n${e.hit.url}\n${truncate(body, 2000)}`; + }) + .join('\n\n---\n\n'); + + const system = [ + 'Du bist ein gründlicher Research-Assistent.', + 'Antworte ausschließlich als JSON-Objekt mit dieser exakten Form:', + '{ "summary": string, "keyPoints": string[], "followUps": string[] }', + '', + 'Regeln:', + '- summary: 2–4 Absätze auf Deutsch, jeder belegbare Claim bekommt eine Citation [n], die auf die Quellen-Nummer verweist.', + '- keyPoints: 3–6 Stichpunkte, jeweils mit mindestens einer [n]-Citation.', + '- followUps: 2–4 weiterführende Fragen, ohne Citations.', + '- Verwende ausschließlich Informationen aus den bereitgestellten Quellen. Wenn die Quellen die Frage nicht beantworten, sag das im summary.', + '- Kein Markdown, keine Code-Fences, nur reines JSON.', + ].join('\n'); + + const user = [ + `Frage: ${input.questionTitle}`, + input.questionDescription ? `Kontext: ${input.questionDescription}` : null, + '', + 'Quellen:', + context, + ] + .filter(Boolean) + .join('\n'); + + // We stream tokens to the client for live UI feedback, then parse the + // fully-collected text as JSON. The final structured payload is what + // gets persisted; the live tokens are just visual progress. 
+ const fullText = await llmStream({ + model: cfg.synthModel, + system, + user, + temperature: 0.4, + maxTokens: 2000, + onToken: (delta) => emit({ type: 'token', delta }), + }); + + return parseSynthesis(fullText); +} + +function parseSynthesis(raw: string): SynthesisPayload { + const trimmed = stripCodeFence(raw.trim()); + let parsed: unknown; + try { + parsed = JSON.parse(trimmed); + } catch { + // Last-ditch fallback: surface the raw text as the summary so the + // user at least sees what the model produced. + return { summary: raw.trim(), keyPoints: [], followUps: [] }; + } + + const obj = (parsed ?? {}) as Record; + return { + summary: typeof obj.summary === 'string' ? obj.summary : '', + keyPoints: Array.isArray(obj.keyPoints) + ? obj.keyPoints.filter((k): k is string => typeof k === 'string') + : [], + followUps: Array.isArray(obj.followUps) + ? obj.followUps.filter((k): k is string => typeof k === 'string') + : [], + }; +} + +// ─── Helpers ──────────────────────────────────────────────── + +async function setStatus( + id: string, + status: 'planning' | 'searching' | 'extracting' | 'synthesizing' +): Promise { + await db.update(researchResults).set({ status }).where(eq(researchResults.id, id)); +} + +function truncate(s: string, max: number): string { + if (s.length <= max) return s; + return s.slice(0, max) + '…'; +} + +function stripCodeFence(text: string): string { + if (!text.startsWith('```')) return text; + const withoutOpen = text.replace(/^```(?:json)?\s*\n?/, ''); + return withoutOpen.replace(/\n?```\s*$/, ''); +} + +function formatError(err: unknown): string { + if (err instanceof LlmError) return `LLM: ${err.message}`; + if (err instanceof SearchError) return `Search: ${err.message}`; + if (err instanceof Error) return err.message; + return String(err); +} diff --git a/apps/api/src/modules/research/pubsub.ts b/apps/api/src/modules/research/pubsub.ts new file mode 100644 index 000000000..1f3e3fea2 --- /dev/null +++ b/apps/api/src/modules/research/pubsub.ts @@ -0,0 +1,64 @@ +/** + * In-process pubsub for research progress events. + * + * Single-node only — keeps a Map> in + * memory. When apps/api scales horizontally, swap this for a Redis Pub/Sub + * implementation behind the same publish/subscribe interface. + * + * Subscribers are kept until either the pipeline emits a terminal event + * (`done` / `error`) or the consumer unsubscribes (e.g. SSE client closed). + * Late subscribers do NOT receive backfilled events — the routes layer is + * expected to read the current DB state once before subscribing. + */ + +import type { ProgressEvent } from './orchestrator'; + +type Subscriber = (event: ProgressEvent) => void; + +const channels = new Map>(); + +/** + * Publish an event to all current subscribers of `researchResultId`. + * Subscriber callbacks are wrapped in try/catch so a single misbehaving + * listener cannot block the orchestrator's progress. + */ +export function publish(researchResultId: string, event: ProgressEvent): void { + const subs = channels.get(researchResultId); + if (!subs) return; + for (const sub of subs) { + try { + sub(event); + } catch (err) { + console.error(`[research:pubsub] subscriber threw on ${event.type}:`, err); + } + } +} + +/** + * Subscribe to events for `researchResultId`. Returns an unsubscribe fn. + * The channel is GC'd once the last subscriber leaves. 
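+ *
+ * Lifecycle sketch (subscribe before events start flowing, unsubscribe on
+ * a terminal event, roughly as the SSE route in routes.ts does):
+ * @example
+ * ```ts
+ * const unsubscribe = subscribe(researchResultId, (event) => {
+ *   if (event.type === 'token') process.stdout.write(event.delta);
+ *   if (event.type === 'done' || event.type === 'error') unsubscribe();
+ * });
+ * ```
+ */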
+ */ +export function subscribe(researchResultId: string, fn: Subscriber): () => void { + let subs = channels.get(researchResultId); + if (!subs) { + subs = new Set(); + channels.set(researchResultId, subs); + } + subs.add(fn); + + return () => { + const current = channels.get(researchResultId); + if (!current) return; + current.delete(fn); + if (current.size === 0) channels.delete(researchResultId); + }; +} + +/** + * Drop a channel entirely. Called by the orchestrator wrapper after a + * terminal event has been published, so any leftover subscribers (e.g. a + * lingering SSE connection that hasn't ticked yet) get cleaned up. + */ +export function closeChannel(researchResultId: string): void { + channels.delete(researchResultId); +} diff --git a/apps/api/src/modules/research/routes.ts b/apps/api/src/modules/research/routes.ts new file mode 100644 index 000000000..b1fdbe600 --- /dev/null +++ b/apps/api/src/modules/research/routes.ts @@ -0,0 +1,302 @@ +/** + * Research module — HTTP routes. + * + * POST /api/v1/research/start — fire-and-forget, returns + * { researchResultId } immediately. + * The pipeline runs in the + * background and emits progress + * events via the in-process pubsub. + * GET /api/v1/research/:id/stream — SSE for live progress. Sends an + * initial `snapshot` event with the + * current DB state, then forwards + * pubsub events until `done`/`error`. + * POST /api/v1/research/start-sync — synchronous variant, blocks + * until the pipeline finishes. + * Kept for end-to-end testing. + * GET /api/v1/research/:id — fetch a research_results row. + * GET /api/v1/research/:id/sources — fetch all sources for a run. + */ + +import { Hono } from 'hono'; +import { streamSSE } from 'hono/streaming'; +import { z } from 'zod'; +import { and, asc, eq } from 'drizzle-orm'; +import { db, researchResults, sources } from './schema'; +import { runPipeline, type ProgressEvent } from './orchestrator'; +import { publish, subscribe, closeChannel } from './pubsub'; +import { errorResponse, listResponse, validationError } from '../../lib/responses'; +import type { AuthVariables } from '@mana/shared-hono'; +import { validateCredits, consumeCredits } from '@mana/shared-hono/credits'; +import type { ResearchDepth } from './schema'; + +const routes = new Hono<{ Variables: AuthVariables }>(); + +const StartSchema = z.object({ + questionId: z.string().min(1).max(200), + title: z.string().min(3).max(500), + description: z.string().max(4000).optional(), + depth: z.enum(['quick', 'standard', 'deep']), +}); + +/** + * Credit cost per research depth. Loosely calibrated against the chat + * module: a single local Ollama call costs 0.1 credits there, so a + * `quick` run (one search + one synthesis pass on Ollama) at 1 credit is + * roughly 10× a single chat completion. `deep` is 15× because it does + * 6 sub-queries × 8 hits + bulk extraction + a long synthesis pass. + */ +const RESEARCH_COST: Record = { + quick: 1, + standard: 5, + deep: 15, +}; + +const RESEARCH_OPERATION = 'AI_RESEARCH'; + +// ─── POST /start ──────────────────────────────────────────── +// Fire-and-forget: returns immediately, pipeline runs in background and +// emits progress through pubsub. Subscribe via GET /:id/stream. + +routes.post('/start', async (c) => { + const userId = c.get('userId'); + const parsed = StartSchema.safeParse(await c.req.json().catch(() => null)); + if (!parsed.success) { + return validationError(c, parsed.error.issues); + } + + // Reserve credits up-front. 
We don't deduct yet — credits are consumed + // only when the pipeline reaches `done`. Failed runs cost nothing, + // which keeps the UX forgiving for transient errors. + const cost = RESEARCH_COST[parsed.data.depth]; + const validation = await validateCredits(userId, RESEARCH_OPERATION, cost); + if (!validation.hasCredits) { + return errorResponse(c, 'Insufficient credits', 402, { + code: 'INSUFFICIENT_CREDITS', + details: { + required: cost, + available: validation.availableCredits, + }, + }); + } + + const [row] = await db + .insert(researchResults) + .values({ + userId, + questionId: parsed.data.questionId, + depth: parsed.data.depth, + status: 'planning', + }) + .returning(); + + void runPipelineWithPubsub(row.id, userId, parsed.data); + + return c.json({ researchResultId: row.id, cost }, 202); +}); + +/** + * Wraps runPipeline so every event is published to the in-process pubsub, + * credits are consumed only on success, and the channel is closed once a + * terminal event has gone out. + */ +async function runPipelineWithPubsub( + researchResultId: string, + userId: string, + input: z.infer +): Promise { + let succeeded = false; + const emit = (event: ProgressEvent) => { + if (event.type === 'done') succeeded = true; + publish(researchResultId, event); + }; + + try { + await runPipeline( + { + researchResultId, + questionTitle: input.title, + questionDescription: input.description, + depth: input.depth, + }, + emit + ); + + // Best-effort consume — log on failure but don't block the user. + // The DB row is already in `done` state at this point. + if (succeeded) { + const cost = RESEARCH_COST[input.depth]; + const ok = await consumeCredits( + userId, + RESEARCH_OPERATION, + cost, + `Research (${input.depth}): ${input.title.slice(0, 80)}`, + { researchResultId, depth: input.depth } + ); + if (!ok) { + console.warn( + `[research:${researchResultId}] consumeCredits failed — pipeline already finished, leaving as-is` + ); + } + } + } finally { + // Give SSE consumers a tick to flush the terminal event before we + // drop the channel. + setTimeout(() => closeChannel(researchResultId), 1000); + } +} + +// ─── GET /:id/stream ──────────────────────────────────────── +// SSE: emit current DB snapshot first (so late subscribers don't miss the +// run if they connect after `done`), then forward pubsub events until the +// pipeline reaches a terminal state or the client disconnects. + +routes.get('/:id/stream', async (c) => { + const userId = c.get('userId'); + const id = c.req.param('id'); + + const [row] = await db + .select() + .from(researchResults) + .where(and(eq(researchResults.id, id), eq(researchResults.userId, userId))) + .limit(1); + + if (!row) return errorResponse(c, 'not found', 404, { code: 'NOT_FOUND' }); + + return streamSSE(c, async (stream) => { + let closed = false; + const queue: ProgressEvent[] = []; + let resolveNext: (() => void) | null = null; + + const wake = () => { + if (resolveNext) { + const r = resolveNext; + resolveNext = null; + r(); + } + }; + + const unsubscribe = subscribe(id, (event) => { + queue.push(event); + wake(); + }); + + stream.onAbort(() => { + closed = true; + unsubscribe(); + wake(); + }); + + // 1. Initial snapshot — so a client that reconnects mid-run (or + // after a finished run) immediately sees state. + await stream.writeSSE({ + event: 'snapshot', + data: JSON.stringify(row), + }); + + // If the run is already terminal, just close. + if (row.status === 'done' || row.status === 'error') { + unsubscribe(); + return; + } + + // 2. 
Drain the live event queue until terminal. + while (!closed) { + while (queue.length > 0) { + const event = queue.shift()!; + await stream.writeSSE({ + event: event.type, + data: JSON.stringify(event), + }); + if (event.type === 'done' || event.type === 'error') { + closed = true; + break; + } + } + if (closed) break; + await new Promise((resolve) => { + resolveNext = resolve; + }); + } + + unsubscribe(); + }); +}); + +// ─── POST /start-sync ─────────────────────────────────────── +// Synchronous variant, blocks until the pipeline finishes. Useful for +// end-to-end smoke tests against real mana-search + mana-llm. + +routes.post('/start-sync', async (c) => { + const userId = c.get('userId'); + const parsed = StartSchema.safeParse(await c.req.json().catch(() => null)); + if (!parsed.success) { + return validationError(c, parsed.error.issues); + } + + const [row] = await db + .insert(researchResults) + .values({ + userId, + questionId: parsed.data.questionId, + depth: parsed.data.depth, + status: 'planning', + }) + .returning(); + + await runPipeline({ + researchResultId: row.id, + questionTitle: parsed.data.title, + questionDescription: parsed.data.description, + depth: parsed.data.depth, + }); + + const [finished] = await db + .select() + .from(researchResults) + .where(eq(researchResults.id, row.id)) + .limit(1); + + return c.json(finished); +}); + +// ─── GET /:id ─────────────────────────────────────────────── + +routes.get('/:id', async (c) => { + const userId = c.get('userId'); + const id = c.req.param('id'); + + const [row] = await db + .select() + .from(researchResults) + .where(and(eq(researchResults.id, id), eq(researchResults.userId, userId))) + .limit(1); + + if (!row) return errorResponse(c, 'not found', 404, { code: 'NOT_FOUND' }); + return c.json(row); +}); + +// ─── GET /:id/sources ─────────────────────────────────────── + +routes.get('/:id/sources', async (c) => { + const userId = c.get('userId'); + const id = c.req.param('id'); + + // Ownership check via the parent research_results row. + const [parent] = await db + .select({ id: researchResults.id }) + .from(researchResults) + .where(and(eq(researchResults.id, id), eq(researchResults.userId, userId))) + .limit(1); + + if (!parent) return errorResponse(c, 'not found', 404, { code: 'NOT_FOUND' }); + + const rows = await db + .select() + .from(sources) + .where(eq(sources.researchResultId, id)) + .orderBy(asc(sources.rank)); + + return listResponse(c, rows); +}); + +export { routes as researchRoutes }; diff --git a/apps/api/src/modules/research/schema.ts b/apps/api/src/modules/research/schema.ts new file mode 100644 index 000000000..9b4ceacb2 --- /dev/null +++ b/apps/api/src/modules/research/schema.ts @@ -0,0 +1,73 @@ +/** + * Research module — DB schema (Drizzle / pgSchema 'research') + * + * Server-side store for deep-research runs orchestrated by apps/api. + * Lives in mana_platform under its own pgSchema. + * + * - research_results: one row per research run, holds plan + final synthesis + * - sources: one row per web source consumed by a run + * + * The local-first questions module references research_results.id from + * LocalAnswer.researchResultId; sources are fetched on-demand via the API + * and never mirrored into IndexedDB (they're public web content). + */ + +import { drizzle } from 'drizzle-orm/postgres-js'; +import postgres from 'postgres'; +import { pgSchema, uuid, text, timestamp, integer, jsonb } from 'drizzle-orm/pg-core'; + +const DATABASE_URL = + process.env.DATABASE_URL ?? 
'postgresql://mana:devpassword@localhost:5432/mana_platform'; + +export const researchSchema = pgSchema('research'); + +/** + * One row per research run. Created in `planning` state immediately on + * /start, then updated as the orchestrator advances through phases. + */ +export const researchResults = researchSchema.table('research_results', { + id: uuid('id').defaultRandom().primaryKey(), + userId: text('user_id').notNull(), + questionId: text('question_id').notNull(), // mirrors local LocalQuestion.id (UUID) + depth: text('depth').notNull(), // 'quick' | 'standard' | 'deep' + status: text('status').notNull(), // 'planning' | 'searching' | 'extracting' | 'synthesizing' | 'done' | 'error' + subQueries: jsonb('sub_queries').$type(), + summary: text('summary'), + keyPoints: jsonb('key_points').$type(), + followUpQuestions: jsonb('follow_up_questions').$type(), + errorMessage: text('error_message'), + startedAt: timestamp('started_at', { withTimezone: true }).defaultNow().notNull(), + finishedAt: timestamp('finished_at', { withTimezone: true }), +}); + +/** + * Sources consumed during a research run. Rank reflects ordering in the + * synthesis prompt so citation [n] in the summary maps to sources[n-1]. + */ +export const sources = researchSchema.table('sources', { + id: uuid('id').defaultRandom().primaryKey(), + researchResultId: uuid('research_result_id') + .notNull() + .references(() => researchResults.id, { onDelete: 'cascade' }), + url: text('url').notNull(), + title: text('title'), + snippet: text('snippet'), + extractedContent: text('extracted_content'), + category: text('category'), + rank: integer('rank').notNull(), + createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(), +}); + +const connection = postgres(DATABASE_URL, { max: 5, idle_timeout: 20 }); +export const db = drizzle(connection, { schema: { researchResults, sources } }); + +export type ResearchResult = typeof researchResults.$inferSelect; +export type Source = typeof sources.$inferSelect; +export type ResearchDepth = 'quick' | 'standard' | 'deep'; +export type ResearchStatus = + | 'planning' + | 'searching' + | 'extracting' + | 'synthesizing' + | 'done' + | 'error'; diff --git a/apps/mana/apps/web/src/hooks.server.ts b/apps/mana/apps/web/src/hooks.server.ts index d1f46cb49..f20bb2e88 100644 --- a/apps/mana/apps/web/src/hooks.server.ts +++ b/apps/mana/apps/web/src/hooks.server.ts @@ -43,6 +43,8 @@ const PUBLIC_MANA_LLM_URL_CLIENT = process.env.PUBLIC_MANA_LLM_URL_CLIENT || process.env.PUBLIC_MANA_LLM_URL || ''; const PUBLIC_MANA_EVENTS_URL_CLIENT = process.env.PUBLIC_MANA_EVENTS_URL_CLIENT || process.env.PUBLIC_MANA_EVENTS_URL || ''; +const PUBLIC_MANA_API_URL_CLIENT = + process.env.PUBLIC_MANA_API_URL_CLIENT || process.env.PUBLIC_MANA_API_URL || ''; // Map of app subdomains to internal paths const APP_SUBDOMAINS = new Set([ @@ -92,6 +94,7 @@ window.__PUBLIC_ULOAD_SERVER_URL__ = ${JSON.stringify(PUBLIC_ULOAD_SERVER_URL_CL window.__PUBLIC_MANA_MEDIA_URL__ = ${JSON.stringify(PUBLIC_MANA_MEDIA_URL_CLIENT)}; window.__PUBLIC_MANA_LLM_URL__ = ${JSON.stringify(PUBLIC_MANA_LLM_URL_CLIENT)}; window.__PUBLIC_MANA_EVENTS_URL__ = ${JSON.stringify(PUBLIC_MANA_EVENTS_URL_CLIENT)}; +window.__PUBLIC_MANA_API_URL__ = ${JSON.stringify(PUBLIC_MANA_API_URL_CLIENT)}; window.__PUBLIC_GLITCHTIP_DSN__ = ${JSON.stringify(PUBLIC_GLITCHTIP_DSN)}; `; return injectUmamiAnalytics(html.replace('', `${envScript}`)); @@ -107,6 +110,7 @@ window.__PUBLIC_GLITCHTIP_DSN__ = ${JSON.stringify(PUBLIC_GLITCHTIP_DSN)}; 
PUBLIC_MANA_MEDIA_URL_CLIENT, PUBLIC_MANA_LLM_URL_CLIENT, PUBLIC_MANA_EVENTS_URL_CLIENT, + PUBLIC_MANA_API_URL_CLIENT, 'wss://sync.mana.how', // @mana/local-llm (WebLLM) downloads model weights + config from // the mlc-ai HuggingFace repos and the WebGPU model library WASM diff --git a/apps/mana/apps/web/src/lib/api/config.ts b/apps/mana/apps/web/src/lib/api/config.ts index 8874ba75e..456ec2d54 100644 --- a/apps/mana/apps/web/src/lib/api/config.ts +++ b/apps/mana/apps/web/src/lib/api/config.ts @@ -33,3 +33,16 @@ export function getManaEventsUrl(): string { } return process.env.PUBLIC_MANA_EVENTS_URL || 'http://localhost:3065'; } + +/** + * Get the unified mana-api URL (Hono/Bun, port 3060 in dev). + * Hosts module-specific compute endpoints under /api/v1/{module}/*. + */ +export function getManaApiUrl(): string { + if (browser && typeof window !== 'undefined') { + const injected = (window as unknown as { __PUBLIC_MANA_API_URL__?: string }) + .__PUBLIC_MANA_API_URL__; + return injected || 'http://localhost:3060'; + } + return process.env.PUBLIC_MANA_API_URL || 'http://localhost:3060'; +} diff --git a/apps/mana/apps/web/src/lib/api/research.ts b/apps/mana/apps/web/src/lib/api/research.ts new file mode 100644 index 000000000..8105090ec --- /dev/null +++ b/apps/mana/apps/web/src/lib/api/research.ts @@ -0,0 +1,255 @@ +/** + * Research API client — talks to mana-api `/api/v1/research/*`. + * + * Backed by the unified deep-research pipeline (mana-search retrieval + + * mana-llm synthesis). See apps/api/src/modules/research/ for the server. + * + * The streaming endpoint uses `fetch` + a ReadableStream parser instead of + * `EventSource` because EventSource cannot send Authorization headers, + * and we don't want to leak the JWT into a query string. + */ + +import { authStore } from '$lib/stores/auth.svelte'; +import { getManaApiUrl } from './config'; + +// ─── Types — mirror apps/api/src/modules/research/{schema,orchestrator}.ts + +export type ResearchDepth = 'quick' | 'standard' | 'deep'; + +export type ResearchStatus = + | 'planning' + | 'searching' + | 'extracting' + | 'synthesizing' + | 'done' + | 'error'; + +export interface ResearchResult { + id: string; + userId: string; + questionId: string; + depth: ResearchDepth; + status: ResearchStatus; + subQueries: string[] | null; + summary: string | null; + keyPoints: string[] | null; + followUpQuestions: string[] | null; + errorMessage: string | null; + startedAt: string; + finishedAt: string | null; +} + +export interface ResearchSource { + id: string; + researchResultId: string; + url: string; + title: string | null; + snippet: string | null; + extractedContent: string | null; + category: string | null; + rank: number; + createdAt: string; +} + +export interface StartResearchInput { + questionId: string; + title: string; + description?: string; + depth: ResearchDepth; +} + +/** + * Live progress events forwarded from the server pubsub. Mirrors the + * `ProgressEvent` union in apps/api/src/modules/research/orchestrator.ts. + * + * `snapshot` is a synthetic event the server emits once at the start of + * the SSE stream so late subscribers see the current DB state immediately. 
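+ *
+ * Handling sketch: the union is discriminated on `type`, so a plain switch
+ * narrows each branch (`appendDelta` is illustrative, not a real helper):
+ * @example
+ * ```ts
+ * function handle(event: ResearchEvent) {
+ *   switch (event.type) {
+ *     case 'snapshot': return console.log('run status:', event.snapshot.status);
+ *     case 'token':    return appendDelta(event.delta);
+ *     case 'error':    return console.error(event.message);
+ *     default:         return; // 'status' | 'plan' | 'sources' | 'done'
+ *   }
+ * }
+ * ```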
+ */ +export type ResearchEvent = + | { type: 'snapshot'; snapshot: ResearchResult } + | { type: 'status'; status: 'planning' | 'searching' | 'extracting' | 'synthesizing' } + | { type: 'plan'; subQueries: string[] } + | { type: 'sources'; count: number } + | { type: 'token'; delta: string } + | { type: 'done'; researchResultId: string } + | { type: 'error'; message: string }; + +export class ResearchApiError extends Error { + constructor( + message: string, + public readonly status?: number + ) { + super(message); + this.name = 'ResearchApiError'; + } +} + +// ─── Internal helpers ─────────────────────────────────────── + +async function authHeaders(extra: HeadersInit = {}): Promise { + const token = await authStore.getAccessToken(); + return { + 'Content-Type': 'application/json', + ...(token ? { Authorization: `Bearer ${token}` } : {}), + ...extra, + }; +} + +async function jsonRequest(path: string, init: RequestInit = {}): Promise { + const res = await fetch(`${getManaApiUrl()}${path}`, { + ...init, + headers: await authHeaders(init.headers), + }); + + if (!res.ok) { + const body = await res.text().catch(() => ''); + + // Special-case the structured errorResponse() body so the UI can + // show a friendly message for the most common failure modes. + if (res.status === 402) { + try { + const parsed = JSON.parse(body) as { + details?: { required?: number; available?: number }; + }; + const required = parsed.details?.required; + const available = parsed.details?.available; + if (required !== undefined) { + throw new ResearchApiError( + `Nicht genug Credits (benötigt: ${required}, verfügbar: ${available ?? 0})`, + 402 + ); + } + } catch (err) { + if (err instanceof ResearchApiError) throw err; + // Fall through to generic message + } + throw new ResearchApiError('Nicht genug Credits für diese Recherche', 402); + } + + throw new ResearchApiError(`mana-api ${path} returned ${res.status}: ${body}`, res.status); + } + + return (await res.json()) as T; +} + +// ─── Public API ───────────────────────────────────────────── + +export const researchApi = { + /** + * Kick off a research run. Returns immediately with the new + * researchResultId — the pipeline runs in the background. + */ + async start(input: StartResearchInput): Promise<{ researchResultId: string }> { + return jsonRequest('/api/v1/research/start', { + method: 'POST', + body: JSON.stringify(input), + }); + }, + + /** Fetch a single research result row by id. */ + async get(researchResultId: string): Promise { + return jsonRequest(`/api/v1/research/${researchResultId}`); + }, + + /** Fetch all sources consumed by a research run, ordered by rank. */ + async listSources(researchResultId: string): Promise { + const body = await jsonRequest<{ items: ResearchSource[] } | ResearchSource[]>( + `/api/v1/research/${researchResultId}/sources` + ); + // listResponse() in apps/api wraps results as { items, total } — + // fall back to a bare array for forward-compat. + if (Array.isArray(body)) return body; + return body.items ?? []; + }, + + /** + * Subscribe to live progress for a research run. Calls onEvent for + * each parsed SSE event. Returns a cleanup function that aborts the + * underlying fetch. + * + * Uses fetch+ReadableStream rather than EventSource so we can attach + * the JWT via Authorization header. 
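+ *
+ * Usage sketch (`id` and `draft` are illustrative): the caller is expected
+ * to invoke the returned cleanup function on unmount or after a terminal
+ * event, as the answers store does.
+ * @example
+ * ```ts
+ * let draft = '';
+ * const stop = researchApi.streamProgress(id, (event) => {
+ *   if (event.type === 'token') draft += event.delta;
+ *   if (event.type === 'done' || event.type === 'error') stop();
+ * });
+ * ```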
+ */ + streamProgress(researchResultId: string, onEvent: (event: ResearchEvent) => void): () => void { + const controller = new AbortController(); + + void (async () => { + try { + const res = await fetch(`${getManaApiUrl()}/api/v1/research/${researchResultId}/stream`, { + headers: await authHeaders({ Accept: 'text/event-stream' }), + signal: controller.signal, + }); + + if (!res.ok || !res.body) { + onEvent({ + type: 'error', + message: `Stream connect failed: ${res.status}`, + }); + return; + } + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + + // SSE frames are separated by blank lines. + let sep: number; + while ((sep = buffer.indexOf('\n\n')) !== -1) { + const frame = buffer.slice(0, sep); + buffer = buffer.slice(sep + 2); + const parsed = parseSseFrame(frame); + if (parsed) onEvent(parsed); + } + } + } catch (err) { + if ((err as Error).name === 'AbortError') return; + onEvent({ + type: 'error', + message: (err as Error).message ?? 'stream failed', + }); + } + })(); + + return () => controller.abort(); + }, +}; + +/** + * Parse a single SSE frame (one or more `event:` / `data:` lines) into a + * ResearchEvent. Returns null for keepalives, comments, or unknown shapes. + * + * The server always emits both an `event:` line (the type) and a `data:` + * line (JSON-encoded full event). The data field is the source of truth + * — we use the event line only as a sanity check. + */ +function parseSseFrame(frame: string): ResearchEvent | null { + let dataLine = ''; + let eventLine = ''; + for (const line of frame.split('\n')) { + if (line.startsWith('data:')) dataLine = line.slice(5).trim(); + else if (line.startsWith('event:')) eventLine = line.slice(6).trim(); + } + if (!dataLine) return null; + + try { + const parsed = JSON.parse(dataLine) as Record; + + // snapshot uses a non-discriminated DB row shape — wrap it. + if (eventLine === 'snapshot') { + return { type: 'snapshot', snapshot: parsed as unknown as ResearchResult }; + } + + // All other events already carry their `type` field. + if (typeof parsed.type === 'string') { + return parsed as unknown as ResearchEvent; + } + } catch { + // malformed frame — ignore + } + return null; +} diff --git a/apps/mana/apps/web/src/lib/modules/questions/components/AnswerCitations.svelte b/apps/mana/apps/web/src/lib/modules/questions/components/AnswerCitations.svelte new file mode 100644 index 000000000..a8be67e36 --- /dev/null +++ b/apps/mana/apps/web/src/lib/modules/questions/components/AnswerCitations.svelte @@ -0,0 +1,195 @@ + + + +
+ {#each segments as segment, i (i)} + {#if segment.kind === 'text'}{segment.text}{:else} + {@const src = sourceForRank(segment.rank)} + + + {#if hoveredRank === segment.rank} + showPopover(segment.rank)} + onmouseleave={hidePopover} + > + {#if loading && !src} + Lade Quelle… + {:else if loadError} + Fehler: {loadError} + {:else if src} + + {src.title ?? hostFromUrl(src.url)} + + + {hostFromUrl(src.url)} + + {#if src.snippet} + + {src.snippet} + + {/if} + + Öffnen + + {:else if loaded} + + Quelle {segment.rank} nicht gefunden + + {/if} + + {/if} + + {/if} + {/each} +
+ + diff --git a/apps/mana/apps/web/src/lib/modules/questions/stores/answers.svelte.ts b/apps/mana/apps/web/src/lib/modules/questions/stores/answers.svelte.ts new file mode 100644 index 000000000..6ec91fd19 --- /dev/null +++ b/apps/mana/apps/web/src/lib/modules/questions/stores/answers.svelte.ts @@ -0,0 +1,265 @@ +/** + * Answers store — write-side mutations for question answers. + * + * Per the apps/mana/CLAUDE.md module pattern, reads happen in queries.ts; + * this file only mutates. Two flavours of answer creation: + * + * createManual() — user types an answer themselves. Plain Dexie write, + * encrypted before persist. + * + * startResearch() — kicks off the deep-research pipeline against + * mana-api, creates an optimistic empty answer row, + * and streams synthesis tokens into it as they arrive. + * Marks the question as 'researching' for the duration. + * + * Encryption note: every write goes through encryptRecord('answers', …) + * because the `answers` table is in the crypto registry. The token-stream + * path decrypts → appends → re-encrypts on each tick. That's wasteful for + * very chatty streams but keeps invariants simple, and synthesis output + * runs at LLM speed, not keystroke speed. + */ + +import { db } from '$lib/data/database'; +import { encryptRecord, decryptRecord } from '$lib/data/crypto'; +import { researchApi, type ResearchEvent, type ResearchSource } from '$lib/api/research'; +import type { LocalAnswer, LocalQuestion } from '../types'; + +// ─── Manual answer creation ───────────────────────────────── + +export interface CreateManualAnswerInput { + questionId: string; + content: string; +} + +async function createManual(input: CreateManualAnswerInput): Promise { + const now = new Date().toISOString(); + const id = crypto.randomUUID(); + const row: Record = { + id, + questionId: input.questionId, + researchResultId: null, + content: input.content, + citations: [], + rating: null, + isAccepted: false, + createdAt: now, + updatedAt: now, + }; + await encryptRecord('answers', row); + await db.table('answers').add(row); + return id; +} + +// ─── Research-driven answer ───────────────────────────────── + +export interface ResearchHandle { + answerId: string; + researchResultId: string; + /** Cancel the SSE subscription. Does NOT cancel the server-side run. */ + cancel: () => void; +} + +export interface StartResearchOptions { + question: LocalQuestion; + /** Optional progress callback for the UI (phase indicator etc.). */ + onEvent?: (event: ResearchEvent) => void; +} + +/** + * Start a research run for `question`. Creates an optimistic empty answer + * locally, opens an SSE stream to mana-api, and appends each streamed + * token into the answer row. When the run completes (`done`), the row is + * finalised with citations and the question is flipped to 'answered'. + * + * Failures flip the question back to 'open' and surface the error message + * via onEvent. The optimistic answer row is left in place so the user can + * see what was produced before things went sideways. + */ +async function startResearch(opts: StartResearchOptions): Promise { + const { question, onEvent } = opts; + + // 1. Mark the question as researching so the UI flips immediately. + await db.table('questions').update(question.id, { + status: 'researching', + updatedAt: new Date().toISOString(), + }); + + // 2. Kick off the server-side pipeline. + const { researchResultId } = await researchApi.start({ + questionId: question.id, + title: question.title, + description: question.description ?? 
+import { db } from '$lib/data/database';
+import { encryptRecord, decryptRecord } from '$lib/data/crypto';
+import { researchApi, type ResearchEvent, type ResearchSource } from '$lib/api/research';
+import type { LocalAnswer, LocalQuestion } from '../types';
+
+// ─── Manual answer creation ─────────────────────────────────
+
+export interface CreateManualAnswerInput {
+  questionId: string;
+  content: string;
+}
+
+async function createManual(input: CreateManualAnswerInput): Promise<string> {
+  const now = new Date().toISOString();
+  const id = crypto.randomUUID();
+  const row: Record<string, unknown> = {
+    id,
+    questionId: input.questionId,
+    researchResultId: null,
+    content: input.content,
+    citations: [],
+    rating: null,
+    isAccepted: false,
+    createdAt: now,
+    updatedAt: now,
+  };
+  await encryptRecord('answers', row);
+  await db.table('answers').add(row);
+  return id;
+}
+
+// ─── Research-driven answer ─────────────────────────────────
+
+export interface ResearchHandle {
+  answerId: string;
+  researchResultId: string;
+  /** Cancel the SSE subscription. Does NOT cancel the server-side run. */
+  cancel: () => void;
+}
+
+export interface StartResearchOptions {
+  question: LocalQuestion;
+  /** Optional progress callback for the UI (phase indicator etc.). */
+  onEvent?: (event: ResearchEvent) => void;
+}
+
+/**
+ * Start a research run for `question`. Creates an optimistic empty answer
+ * locally, opens an SSE stream to mana-api, and appends each streamed
+ * token into the answer row. When the run completes (`done`), the row is
+ * finalised with citations and the question is flipped to 'answered'.
+ *
+ * Failures flip the question back to 'open' and surface the error message
+ * via onEvent. The optimistic answer row is left in place so the user can
+ * see what was produced before things went sideways.
+ */
+async function startResearch(opts: StartResearchOptions): Promise<ResearchHandle> {
+  const { question, onEvent } = opts;
+
+  // 1. Mark the question as researching so the UI flips immediately.
+  await db.table('questions').update(question.id, {
+    status: 'researching',
+    updatedAt: new Date().toISOString(),
+  });
+
+  // 2. Kick off the server-side pipeline.
+  const { researchResultId } = await researchApi.start({
+    questionId: question.id,
+    title: question.title,
+    description: question.description ?? undefined,
+    depth: question.researchDepth,
+  });
+
+  // 3. Create the optimistic, empty answer row that the stream will fill in.
+  const answerId = crypto.randomUUID();
+  const now = new Date().toISOString();
+  const draft: Record<string, unknown> = {
+    id: answerId,
+    questionId: question.id,
+    researchResultId,
+    content: '',
+    citations: [],
+    rating: null,
+    isAccepted: false,
+    createdAt: now,
+    updatedAt: now,
+  };
+  await encryptRecord('answers', draft);
+  await db.table('answers').add(draft);
+
+  // 4. Subscribe to SSE. Buffer streamed tokens locally and flush them
+  //    in small batches to avoid encrypting on every single token.
+  let pendingDelta = '';
+  let flushScheduled = false;
+
+  const flush = async () => {
+    flushScheduled = false;
+    if (!pendingDelta) return;
+    const delta = pendingDelta;
+    pendingDelta = '';
+
+    const existing = (await db.table('answers').get(answerId)) as
+      | LocalAnswer
+      | undefined;
+    if (!existing) return;
+
+    const decrypted = (await decryptRecord('answers', { ...existing })) as LocalAnswer;
+    const updated: Record<string, unknown> = {
+      content: (decrypted.content ?? '') + delta,
+      updatedAt: new Date().toISOString(),
+    };
+    await encryptRecord('answers', updated);
+    await db.table('answers').update(answerId, updated);
+  };
+
+  const scheduleFlush = () => {
+    if (flushScheduled) return;
+    flushScheduled = true;
+    // 100ms debounce — synthesis output is fast enough that batching
+    // makes a real difference, slow enough that the UI still feels live.
+    setTimeout(() => {
+      void flush();
+    }, 100);
+  };
+
+  const cancel = researchApi.streamProgress(researchResultId, async (event) => {
+    onEvent?.(event);
+
+    switch (event.type) {
+      case 'token': {
+        pendingDelta += event.delta;
+        scheduleFlush();
+        break;
+      }
+      case 'done': {
+        await flush();
+        await finaliseAnswer(answerId, researchResultId, question.id);
+        cancel();
+        break;
+      }
+      case 'error': {
+        await flush();
+        await db.table('questions').update(question.id, {
+          status: 'open',
+          updatedAt: new Date().toISOString(),
+        });
+        cancel();
+        break;
+      }
+    }
+  });
+
+  return { answerId, researchResultId, cancel };
+}
+
+/**
+ * Replace the streamed-in raw text with the structured server-side
+ * payload (parsed summary) and attach citations resolved from the
+ * server-side sources table. Flips the parent question to 'answered'.
+ */
+async function finaliseAnswer(
+  answerId: string,
+  researchResultId: string,
+  questionId: string
+): Promise<void> {
+  let result;
+  let sources: ResearchSource[];
+  try {
+    [result, sources] = await Promise.all([
+      researchApi.get(researchResultId),
+      researchApi.listSources(researchResultId),
+    ]);
+  } catch (err) {
+    console.error('[answers] failed to finalise research answer:', err);
+    return;
+  }
+
+  // Build the final content from the structured payload. We prefer the
+  // server-side parsed summary over the raw streamed tokens because the
+  // stream may contain JSON scaffolding (`{ "summary": "...`) that
+  // shouldn't be shown to the user.
+  const parts: string[] = [];
+  if (result.summary) parts.push(result.summary);
+  if (result.keyPoints && result.keyPoints.length > 0) {
+    parts.push('', '**Kernpunkte:**', ...result.keyPoints.map((k) => `- ${k}`));
+  }
+  if (result.followUpQuestions && result.followUpQuestions.length > 0) {
+    parts.push('', '**Weiterführende Fragen:**', ...result.followUpQuestions.map((q) => `- ${q}`));
+  }
+  const content = parts.join('\n');
+
+  // Citations[n].sourceId points at the server-side source UUID with rank n.
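+  // Illustrative shapes (assumed — field names are taken from how the objects
+  // are used here and in AnswerCitations, not from a schema dump):
+  //   sources   ≈ [{ id: 'uuid-a', rank: 1, url: 'https://example.org', title: 'Beispiel' }]
+  //   citations ≈ [{ sourceId: 'uuid-a', text: 'Beispiel' }]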
+  const citations = sources.map((s) => ({
+    sourceId: s.id,
+    text: s.title ?? s.url,
+  }));
+
+  const update: Record<string, unknown> = {
+    content,
+    citations,
+    updatedAt: new Date().toISOString(),
+  };
+  await encryptRecord('answers', update);
+  await db.table('answers').update(answerId, update);
+
+  await db.table('questions').update(questionId, {
+    status: 'answered',
+    updatedAt: new Date().toISOString(),
+  });
+}
+
+// ─── Other mutations (acceptance / deletion) ────────────────
+
+async function accept(answerId: string, questionId: string): Promise<void> {
+  const all = (await db.table('answers').toArray()).filter(
+    (a) => a.questionId === questionId && !a.deletedAt
+  );
+  for (const a of all) {
+    if (a.isAccepted) {
+      await db.table('answers').update(a.id, {
+        isAccepted: false,
+        updatedAt: new Date().toISOString(),
+      });
+    }
+  }
+  await db.table('answers').update(answerId, {
+    isAccepted: true,
+    updatedAt: new Date().toISOString(),
+  });
+}
+
+async function softDelete(answerId: string): Promise<void> {
+  await db.table('answers').update(answerId, {
+    deletedAt: new Date().toISOString(),
+    updatedAt: new Date().toISOString(),
+  });
+}
+
+export const answersStore = {
+  createManual,
+  startResearch,
+  accept,
+  softDelete,
+};
diff --git a/apps/mana/apps/web/src/lib/modules/questions/stores/sources.svelte.ts b/apps/mana/apps/web/src/lib/modules/questions/stores/sources.svelte.ts
new file mode 100644
index 000000000..69351218f
--- /dev/null
+++ b/apps/mana/apps/web/src/lib/modules/questions/stores/sources.svelte.ts
@@ -0,0 +1,49 @@
+/**
+ * Session-scoped cache for research sources.
+ *
+ * Sources live exclusively on the server (research.sources table) — they
+ * are public web content and we deliberately don't sync them into
+ * IndexedDB. The downside: every time the user opens an answer with
+ * citations we'd otherwise re-hit /api/v1/research/:id/sources.
+ *
+ * This little store keeps the result in memory for the lifetime of the
+ * tab (no persistence) and de-duplicates concurrent fetches so opening
+ * three citation popovers in a row only triggers one network round-trip.
+ */
+
+import { researchApi, type ResearchSource } from '$lib/api/research';
+
+const cache = new Map<string, ResearchSource[]>();
+const inFlight = new Map<string, Promise<ResearchSource[]>>();
+
+/**
+ * Fetch (or return cached) sources for a research run. Concurrent calls
+ * for the same id share the same underlying fetch.
+ */
+export async function loadSources(researchResultId: string): Promise<ResearchSource[]> {
+  const cached = cache.get(researchResultId);
+  if (cached) return cached;
+
+  const pending = inFlight.get(researchResultId);
+  if (pending) return pending;
+
+  const promise = researchApi
+    .listSources(researchResultId)
+    .then((sources) => {
+      cache.set(researchResultId, sources);
+      inFlight.delete(researchResultId);
+      return sources;
+    })
+    .catch((err) => {
+      inFlight.delete(researchResultId);
+      throw err;
+    });
+
+  inFlight.set(researchResultId, promise);
+  return promise;
+}
+
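+// Illustrative call-site (assumed): popovers that open concurrently all await
+// the same in-flight promise, so only one request goes out.
+//   const [a, b, c] = await Promise.all([loadSources(id), loadSources(id), loadSources(id)]);
+//   // a === b && b === c — the same cached array instance
+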
+/** Drop a single research run from the cache (e.g. after re-running). */
+export function invalidateSources(researchResultId: string): void {
+  cache.delete(researchResultId);
+}
diff --git a/apps/mana/apps/web/src/routes/(app)/questions/[id]/+page.svelte b/apps/mana/apps/web/src/routes/(app)/questions/[id]/+page.svelte
index f4a3b95c0..6606cb724 100644
--- a/apps/mana/apps/web/src/routes/(app)/questions/[id]/+page.svelte
+++ b/apps/mana/apps/web/src/routes/(app)/questions/[id]/+page.svelte
@@ -9,6 +9,11 @@
     getQuestionById,
   } from '$lib/modules/questions/queries';
   import type { Question, Answer } from '$lib/modules/questions/queries';
+  import { answersStore } from '$lib/modules/questions/stores/answers.svelte';
+  import AnswerCitations from '$lib/modules/questions/components/AnswerCitations.svelte';
+  import type { LocalQuestion } from '$lib/modules/questions/types';
+  import type { ResearchEvent } from '$lib/api/research';
+  import { toastStore } from '@mana/shared-ui/toast';
   import {
     ArrowLeft,
     Clock,
@@ -17,6 +22,8 @@
     Archive,
     PencilSimple,
     Trash,
+    MagnifyingGlass,
+    ArrowCounterClockwise,
   } from '@mana/shared-icons';

   const allQuestions = useAllQuestions();
@@ -34,6 +41,92 @@
   let newAnswer = $state('');
   let savingAnswer = $state(false);

+  // ─── Deep-research state ─────────────────────────────────
+  let researchHandle = $state<{ cancel: () => void } | null>(null);
+  let researchPhase = $state<string | null>(null);
+  let researchSourceCount = $state<number | null>(null);
+
+  const phaseLabels: Record<string, string> = {
+    planning: 'Plane Recherche…',
+    searching: 'Suche im Web…',
+    extracting: 'Lese Quellen…',
+    synthesizing: 'Schreibe Antwort…',
+  };
+
+  function resetResearchState() {
+    researchPhase = null;
+    researchSourceCount = null;
+    researchHandle = null;
+  }
+
+  function handleResearchEvent(event: ResearchEvent) {
+    switch (event.type) {
+      case 'snapshot':
+        if (event.snapshot.status !== 'done' && event.snapshot.status !== 'error') {
+          researchPhase = event.snapshot.status;
+        }
+        break;
+      case 'status':
+        researchPhase = event.status;
+        break;
+      case 'sources':
+        researchSourceCount = event.count;
+        break;
+      case 'done':
+        resetResearchState();
+        toastStore.success('Recherche abgeschlossen');
+        break;
+      case 'error':
+        resetResearchState();
+        toastStore.error(`Recherche fehlgeschlagen: ${event.message}`);
+        break;
+    }
+  }
+
+  async function startResearchRun() {
+    if (!question || researchHandle) return;
+    const confirmed = confirm(
+      'Diese Frage wird an Web-Suchmaschinen und LLM-Anbieter übermittelt. Lokale Verschlüsselung gilt nur für die Speicherung auf diesem Gerät. Recherche starten?'
+    );
+    if (!confirmed) return;
+
+    try {
+      researchHandle = await answersStore.startResearch({
+        question: question as unknown as LocalQuestion,
+        onEvent: handleResearchEvent,
+      });
+    } catch (err) {
+      researchHandle = null;
+      toastStore.error(`Recherche konnte nicht gestartet werden: ${(err as Error).message}`);
+    }
+  }
+
+  /**
+   * Re-run research for a question that already has an answer. Soft-deletes
+   * any prior research-driven answers (manual ones are kept) and kicks off
+   * a fresh pipeline. Old sources stay on the server but are no longer
+   * referenced from the local store.
+   */
+  async function rerunResearch() {
+    if (!question || researchHandle) return;
+    const confirmed = confirm(
+      'Vorherige Recherche-Antworten werden in den Papierkorb verschoben. Erneut recherchieren?'
+    );
+    if (!confirmed) return;
+
+    const previous = (answers as Answer[]).filter((a) => a.researchResultId);
+    for (const a of previous) {
+      await answersStore.softDelete(a.id);
+    }
+    await startResearchRun();
+  }
+
+  function cancelResearch() {
+    researchHandle?.cancel();
+    resetResearchState();
+    toastStore.info('Recherche-Stream beendet');
+  }
+
   const statusLabels: Record = {
     open: {
       label: 'Offen',
@@ -287,6 +380,58 @@
       {/each}
+
+      <div class="research-card">
+        <div class="research-card-header">
+          <MagnifyingGlass />
+          <h3>Recherche</h3>
+        </div>
+
+        <p>
+          {#if question.researchDepth === 'quick'}
+            Schnell · 5 Quellen · keine Volltext-Extraktion
+          {:else if question.researchDepth === 'standard'}
+            Standard · bis zu 15 Quellen · mit Volltext-Extraktion
+          {:else}
+            Tiefgehend · bis zu 30 Quellen · alle Kategorien
+          {/if}
+        </p>
+
+        <div class="research-card-actions">
+          {#if researchHandle}
+            <button type="button" onclick={cancelResearch}>Abbrechen</button>
+          {:else if (answers as Answer[]).some((a) => a.researchResultId)}
+            <button type="button" onclick={rerunResearch}>
+              <ArrowCounterClockwise />
+              Erneut recherchieren
+            </button>
+          {:else}
+            <button type="button" onclick={startResearchRun}>
+              <MagnifyingGlass />
+              Recherche starten
+            </button>
+          {/if}
+        </div>
+
+        {#if researchPhase}
+          <div class="research-card-progress">
+            {phaseLabels[researchPhase] ?? researchPhase}
+            {#if researchSourceCount !== null}
+              · {researchSourceCount} Quellen
+            {/if}
+          </div>
+        {/if}
+      </div>
@@ -315,9 +460,10 @@

           {/if}
-          <p>
-            {answer.content}
-          </p>
+          <AnswerCitations
+            content={answer.content}
+            researchResultId={answer.researchResultId}
+          />