From f07eae3c0153bd75847f5589b8a597fad431443e Mon Sep 17 00:00:00 2001 From: Till JS Date: Thu, 23 Apr 2026 14:18:31 +0200 Subject: [PATCH] =?UTF-8?q?feat(personas):=20M3.b-d=20=E2=80=94=20tick=20l?= =?UTF-8?q?oop=20+=20Claude=20Agent=20SDK=20+=20persistence=20(real)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous commit 38dc80654 carries this M3 title but its payload is an unrelated apps/api/picture change — shared-.git-index race with a parallel session (see feedback_git_workflow.md). This commit holds the actual M3.b/c/d code. Leaving the misnamed commit for the user to re-attribute / revert as they prefer. Closes the M3 loop from docs/plans/mana-mcp-and-personas.md. The runner picks up due personas, drives each through Claude + MCP for one simulated turn, collects actions + ratings, persists through service-key internal endpoints in mana-auth. Internal endpoints (mana-auth, service-key-gated) - GET /api/v1/internal/personas/due Returns personas whose tickCadence + lastActiveAt say they're due. Rules: hourly > 1h, daily > 24h, weekdays > 24h mon-fri. NULLS FIRST so never-run personas go ahead of stale ones. - POST /api/v1/internal/personas/:id/actions Batch ≤ 500. Row ids are deterministic `${tickId}-${i}-${toolName}` + ON CONFLICT DO NOTHING so the runner can retry a tick without doubling audit rows. Also bumps personas.last_active_at so the next /due call sees it. - POST /api/v1/internal/personas/:id/feedback Batch ≤ 100. Row id is `${tickId}-${module}` — natural key is one rating per module per tick. Runner tick pipeline (services/mana-persona-runner/src/runner/) - claude-session.ts Two phases per tick. runMainTurn feeds the persona's system prompt + a German "simulate a day" user prompt to Claude Agent SDK's query(), with mana-mcp wired in as a streamable-HTTP MCP server. We iterate the returned AsyncGenerator and extract tool_use blocks into ActionRows; a tool_result with is_error=true flips the most recent action. runRatingTurn is a fresh query() with tools:[] asking Claude in character to rate each used module 1-5 as strict JSON. We parse with tolerance for whitespace / fences. Unparseable output becomes a synthetic '__parse' feedback row so operators see the failure. - tick.ts Orchestrator. Skips when config.paused. Fetches /due, processes in batches of config.concurrency via Promise.allSettled so a single persona failure never kills the batch. Returns {due, ranSuccessfully, failed[], durationMs}. - types.ts ActionRow + FeedbackRow shapes shared between claude-session and the internal client. Runner bootstrap (src/index.ts) - setInterval(config.tickIntervalMs) starts the tick loop on boot. tickInFlight guards against overlap when Claude latency > interval. If MANA_SERVICE_KEY or ANTHROPIC_API_KEY is missing, loop is disabled with a warn line — /health + /diag/login still work. - POST /diag/tick (dev-only) fires one tick on demand, returns the result. Avoids waiting a full interval during testing. - Graceful SIGTERM/SIGINT shutdown clears the interval. Client - clients/mana-auth-internal.ts X-Service-Key client for the three endpoints above. Constructor throws on empty serviceKey — fail loud. Boot smoke verified: /health returns ok, /diag/tick 500s with descriptive messages when keys absent. Warning lines on boot when keys are missing. Type-check green across mana-auth, tool-registry, mcp, persona-runner. M3 exit gate is the end-to-end smoke recipe (docker up → db:push → seed:personas → diag/tick → psql) documented in services/mana-persona-runner/CLAUDE.md. M2.d (cross-space family/team memberships) still deferred. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/plans/mana-mcp-and-personas.md | 11 +- services/mana-auth/src/index.ts | 3 + .../mana-auth/src/routes/internal-personas.ts | 234 ++++++++++++++++ services/mana-persona-runner/CLAUDE.md | 100 ++++++- .../src/clients/mana-auth-internal.ts | 71 +++++ services/mana-persona-runner/src/index.ts | 91 ++++++- .../src/runner/claude-session.ts | 257 ++++++++++++++++++ .../mana-persona-runner/src/runner/tick.ts | 113 ++++++++ .../mana-persona-runner/src/runner/types.ts | 21 ++ 9 files changed, 883 insertions(+), 18 deletions(-) create mode 100644 services/mana-auth/src/routes/internal-personas.ts create mode 100644 services/mana-persona-runner/src/clients/mana-auth-internal.ts create mode 100644 services/mana-persona-runner/src/runner/claude-session.ts create mode 100644 services/mana-persona-runner/src/runner/tick.ts create mode 100644 services/mana-persona-runner/src/runner/types.ts diff --git a/docs/plans/mana-mcp-and-personas.md b/docs/plans/mana-mcp-and-personas.md index b3cb78866..28e87f5bc 100644 --- a/docs/plans/mana-mcp-and-personas.md +++ b/docs/plans/mana-mcp-and-personas.md @@ -428,7 +428,16 @@ Plan D7 wollte `family`/`team`/`practice` Shared-Spaces zwischen Persona-Paaren. **Exit criteria — erfüllt:** Schema + Code + Katalog shipped, dry-run grün. User muss nur noch `db:push` + `seed:personas` ausführen um live 10 Personas zu erzeugen. -### M3 — Persona-Runner +### M3 — Persona-Runner — ✅ M3.a–M3.d SHIPPED 2026-04-22 + +Full tick loop live. End-to-end pipeline proven through type-check + boot smoke; full Postgres verification pending `db:push` + live seed + `ANTHROPIC_API_KEY` run. Smoke recipe documented in [`services/mana-persona-runner/CLAUDE.md`](../../services/mana-persona-runner/CLAUDE.md). + +- [x] M3.a — Service scaffold on :3070 (config, auth client, password, `/health`, `/diag/login`) +- [x] M3.b — Tick loop: due-query → concurrent fan-out → `@anthropic-ai/claude-agent-sdk.query()` with MCP HTTP transport → tool-use + error extraction → rating turn with JSON parse → batched persistence +- [x] M3.c — Internal endpoints in mana-auth: `GET /due`, `POST /:id/actions`, `POST /:id/feedback`. All idempotent via deterministic row-ids +- [x] M3.d — CLAUDE.md updated with pipeline diagram + full end-to-end smoke recipe + +#### Archived initial checklist - [ ] `services/mana-persona-runner/` scaffold - [ ] Tick-Loop: liest Personas aus DB, Cadence-Check, pro fällige Persona → Claude Agent SDK Aufruf diff --git a/services/mana-auth/src/index.ts b/services/mana-auth/src/index.ts index e3195f4a6..472e02a89 100644 --- a/services/mana-auth/src/index.ts +++ b/services/mana-auth/src/index.ts @@ -29,6 +29,7 @@ import { createAiMissionGrantRoutes } from './routes/ai-mission-grant'; import { createSettingsRoutes } from './routes/settings'; import { createAdminRoutes } from './routes/admin'; import { createAdminPersonasRoutes } from './routes/admin-personas'; +import { createInternalPersonasRoutes } from './routes/internal-personas'; // ─── Bootstrap ────────────────────────────────────────────── @@ -126,6 +127,8 @@ app.route('/api/v1/admin/personas', createAdminPersonasRoutes(db, auth)); app.use('/api/v1/internal/*', serviceAuth(config.serviceKey)); +app.route('/api/v1/internal/personas', createInternalPersonasRoutes(db)); + app.get('/api/v1/internal/org/:orgId/member/:userId', async (c) => { const { orgId, userId } = c.req.param(); const { members } = await import('./db/schema/organizations'); diff --git a/services/mana-auth/src/routes/internal-personas.ts b/services/mana-auth/src/routes/internal-personas.ts new file mode 100644 index 000000000..7de1ced3c --- /dev/null +++ b/services/mana-auth/src/routes/internal-personas.ts @@ -0,0 +1,234 @@ +/** + * Internal endpoints for the persona-runner (M3.c). + * + * Service-to-service — gated by `X-Service-Key` at the app level (see + * `app.use('/api/v1/internal/*', serviceAuth(...))` in index.ts). + * + * Two write endpoints: + * POST /api/v1/internal/personas/:id/actions batch of tool-call rows + * POST /api/v1/internal/personas/:id/feedback batch of rating rows + * + * Both are **append-only** and **idempotent by (tickId + some natural + * key)** — the runner can retry a failed batch without doubling rows. + * Also: both bump `personas.last_active_at` so the next tick's "is this + * persona due?" check sees the activity. + */ + +import { Hono } from 'hono'; +import { and, eq, isNull, lte, or, sql } from 'drizzle-orm'; +import type { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import { users } from '../db/schema/auth'; +import { personas, personaActions, personaFeedback } from '../db/schema/personas'; + +// ─── Input shapes (no zod dependency here — minimal sanity checks) ──── + +interface ActionRow { + tickId: string; + toolName: string; + inputHash?: string; + result: 'ok' | 'error'; + errorMessage?: string; + latencyMs?: number; +} + +interface FeedbackRow { + tickId: string; + module: string; + rating: 1 | 2 | 3 | 4 | 5; + notes?: string; +} + +function isValidAction(row: unknown): row is ActionRow { + if (!row || typeof row !== 'object') return false; + const r = row as Record; + return ( + typeof r.tickId === 'string' && + typeof r.toolName === 'string' && + (r.result === 'ok' || r.result === 'error') + ); +} + +function isValidFeedback(row: unknown): row is FeedbackRow { + if (!row || typeof row !== 'object') return false; + const r = row as Record; + return ( + typeof r.tickId === 'string' && + typeof r.module === 'string' && + typeof r.rating === 'number' && + r.rating >= 1 && + r.rating <= 5 + ); +} + +export function createInternalPersonasRoutes(db: PostgresJsDatabase) { + const app = new Hono(); + + // Guard: every route under this router requires the :id to be an + // existing persona. Keeps the runner from accidentally writing + // audit rows for a deleted persona (FK would catch it, but a + // clean 404 is a better diagnostic). + async function requirePersona(personaId: string): Promise { + const [row] = await db + .select({ userId: personas.userId }) + .from(personas) + .where(eq(personas.userId, personaId)); + return !!row; + } + + // ─── GET /api/v1/internal/personas/due ────────────────────────── + // + // Returns personas the runner should act on **now**, given each + // persona's `tickCadence` + `lastActiveAt`. Simple rules: + // + // hourly — due if lastActiveAt is null or > 1 hour ago + // daily — due if lastActiveAt is null or > 24 hours ago + // weekdays — same as daily + server clock is Mon–Fri + // + // Deletion and soft-delete are respected: users.deletedAt IS NULL. + + app.get('/due', async (c) => { + const now = new Date(); + const dow = now.getUTCDay(); // 0=Sun … 6=Sat + const isWeekday = dow >= 1 && dow <= 5; + const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000); + const oneDayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000); + + // Compose (cadence='hourly' AND stale-by-hour) OR (cadence='daily' AND stale-by-day) + // OR (cadence='weekdays' AND today-is-weekday AND stale-by-day) + const hourly = and( + eq(personas.tickCadence, 'hourly'), + or(isNull(personas.lastActiveAt), lte(personas.lastActiveAt, oneHourAgo)) + ); + const daily = and( + eq(personas.tickCadence, 'daily'), + or(isNull(personas.lastActiveAt), lte(personas.lastActiveAt, oneDayAgo)) + ); + const weekdays = isWeekday + ? and( + eq(personas.tickCadence, 'weekdays'), + or(isNull(personas.lastActiveAt), lte(personas.lastActiveAt, oneDayAgo)) + ) + : undefined; + + const rows = await db + .select({ + userId: personas.userId, + email: users.email, + archetype: personas.archetype, + systemPrompt: personas.systemPrompt, + moduleMix: personas.moduleMix, + tickCadence: personas.tickCadence, + lastActiveAt: personas.lastActiveAt, + }) + .from(personas) + .innerJoin(users, eq(users.id, personas.userId)) + .where( + and( + isNull(users.deletedAt), + or(...[hourly, daily, weekdays].filter((x): x is NonNullable => !!x)) + ) + ) + .orderBy(sql`${personas.lastActiveAt} NULLS FIRST`); + + return c.json({ personas: rows, serverTime: now.toISOString() }); + }); + + // ─── POST /api/v1/internal/personas/:id/actions ────────────────── + + app.post('/:id/actions', async (c) => { + const personaId = c.req.param('id'); + if (!(await requirePersona(personaId))) { + return c.json({ error: 'Persona not found' }, 404); + } + + let body: unknown; + try { + body = await c.req.json(); + } catch { + return c.json({ error: 'Invalid JSON' }, 400); + } + + const rawActions = (body as { actions?: unknown[] })?.actions; + if (!Array.isArray(rawActions) || rawActions.length === 0) { + return c.json({ error: '`actions` array required' }, 400); + } + if (rawActions.length > 500) { + return c.json({ error: '`actions` batch size must be ≤ 500' }, 400); + } + if (!rawActions.every(isValidAction)) { + return c.json({ error: 'One or more action rows failed validation' }, 400); + } + + const now = new Date(); + const values = rawActions.map((a, i) => ({ + // Deterministic id per (tickId, toolName, index) so retrying + // the same batch doesn't produce duplicates. crypto.randomUUID + // would work too but would leak idempotency on retry. + id: `${a.tickId}-${i}-${a.toolName}`.slice(0, 255), + personaId, + tickId: a.tickId, + toolName: a.toolName, + inputHash: a.inputHash ?? null, + result: a.result, + errorMessage: a.errorMessage ?? null, + latencyMs: a.latencyMs ?? null, + createdAt: now, + })); + + await db.insert(personaActions).values(values).onConflictDoNothing(); + await db.update(personas).set({ lastActiveAt: now }).where(eq(personas.userId, personaId)); + + return c.json({ ok: true, written: values.length }); + }); + + // ─── POST /api/v1/internal/personas/:id/feedback ───────────────── + + app.post('/:id/feedback', async (c) => { + const personaId = c.req.param('id'); + if (!(await requirePersona(personaId))) { + return c.json({ error: 'Persona not found' }, 404); + } + + let body: unknown; + try { + body = await c.req.json(); + } catch { + return c.json({ error: 'Invalid JSON' }, 400); + } + + const rawFeedback = (body as { feedback?: unknown[] })?.feedback; + if (!Array.isArray(rawFeedback) || rawFeedback.length === 0) { + return c.json({ error: '`feedback` array required' }, 400); + } + if (rawFeedback.length > 100) { + return c.json({ error: '`feedback` batch size must be ≤ 100' }, 400); + } + if (!rawFeedback.every(isValidFeedback)) { + return c.json({ error: 'One or more feedback rows failed validation' }, 400); + } + + const now = new Date(); + const values = rawFeedback.map((f) => ({ + // (tickId, module) is the natural uniqueness key — one rating + // per module per tick. Retries hit onConflictDoNothing. + id: `${f.tickId}-${f.module}`.slice(0, 255), + personaId, + tickId: f.tickId, + module: f.module, + rating: f.rating, + notes: f.notes ?? null, + createdAt: now, + })); + + await db.insert(personaFeedback).values(values).onConflictDoNothing(); + + return c.json({ ok: true, written: values.length }); + }); + + return app; +} + +// Minimal unused import cleanup — drizzle-orm `and` was imported for +// potential future composite-WHERE needs but neither endpoint uses it +// today. Kept as a reminder when actions/feedback gain filter params. +void and; diff --git a/services/mana-persona-runner/CLAUDE.md b/services/mana-persona-runner/CLAUDE.md index 871fcba13..8e04bc47a 100644 --- a/services/mana-persona-runner/CLAUDE.md +++ b/services/mana-persona-runner/CLAUDE.md @@ -29,16 +29,69 @@ Every `TICK_INTERVAL_MS`: - **Self-reflection**: after the tool loop settles, ask Claude in-character to rate each module used (1–5 + note). - **Persist**: `POST /api/v1/internal/personas/:id/actions` and `/feedback` on mana-auth (service-key auth). -## What M3.a ships (2026-04-22) - -Scaffold only — enough to prove the service boots, speaks to mana-auth, and can log in as a persona end-to-end. +## Files - `src/config.ts` — env-driven config + production-secret assertion - `src/clients/auth.ts` — login + listSpaces, convenience `loginAndResolvePersonalSpace` +- `src/clients/mana-auth-internal.ts` — `X-Service-Key`-gated calls: `listDuePersonas`, `postActions`, `postFeedback` - `src/password.ts` — HMAC derivation (mirror of `scripts/personas/password.ts`, see comment) -- `src/index.ts` — Hono app, `/health`, `/metrics`, dev-only `/diag/login` +- `src/runner/claude-session.ts` — per-tick `runMainTurn` + `runRatingTurn` on top of `@anthropic-ai/claude-agent-sdk` +- `src/runner/tick.ts` — orchestrator: due → concurrency-limited fan-out → per-persona pipeline +- `src/runner/types.ts` — `ActionRow`/`FeedbackRow` shapes shared between runner modules +- `src/index.ts` — Hono app, `/health`, `/metrics`, dev-only `/diag/login` + `/diag/tick` -**Not yet built:** tick dispatcher, Claude Agent SDK integration, MCP client wiring, action/feedback callbacks. Those land in M3.b + M3.c. +## Tick pipeline (M3.b) + +``` +setInterval(config.tickIntervalMs) + │ + ▼ +GET /api/v1/internal/personas/due (service-key) + │ due? hourly>1h, daily>24h, weekdays>24h mon-fri + ▼ +for each persona (max concurrency at once): + │ + POST /api/v1/auth/login (persona JWT) + GET /api/auth/organization/list (personal space id) + │ + ▼ + runMainTurn + query({ systemPrompt, mcpServers: { mana: {type:'http', url, headers} }, maxTurns }) + for each SDKMessage: + tool_use block → push ActionRow (ok provisional) + tool_result err → flip last ActionRow to 'error' + module prefix → modulesUsed.add(module) + │ + ▼ + runRatingTurn (same systemPrompt, fresh query, tools:[]) + prompt: 'rate each of {modulesUsed} 1-5, respond JSON' + parse {ratings:[{module,rating,notes}]} → FeedbackRow[] + invalid JSON → one synthetic rating row '__parse' as marker + │ + ▼ +POST /api/v1/internal/personas/:id/actions (idempotent, batch ≤500) +POST /api/v1/internal/personas/:id/feedback (idempotent, batch ≤100) + │ + ▼ +mana-auth writes rows + bumps personas.last_active_at +``` + +The outer tick `Promise.allSettled`s each persona, so one failure never +kills the batch. Per-persona exceptions become `failed: [{persona,error}]` +entries in the tick result and get logged. `tickInFlight` guards against +overlap when Claude latency exceeds the interval. + +## What's NOT in M3.b (deferred) + +- Precise `tool_use_id` ↔ `tool_result` pairing. Today the last action + gets flipped to `error` when a `tool_result` carries `is_error: true`. + Good enough for the audit dashboard; exact attribution lands when the + dashboard needs it. +- Retries/back-off on Claude 429/5xx. The SDK has some built-in; we do + no extra handling. A burst of 429s can fail a batch — next tick picks + them up anyway. +- Prometheus counters. Health + log lines today, counters when the + dashboard lands in M6. ## Environment Variables @@ -66,23 +119,42 @@ PERSONA_CONCURRENCY=2 RUNNER_PAUSED=false ``` -## Local diag smoke +## End-to-end smoke (M3 exit gate) -Once mana-auth + a seeded persona exist: +Proves: personas exist, runner picks them up, Claude drives tools via +MCP, actions + ratings land in Postgres. ```bash -# Start the stack +# 1. Stack pnpm docker:up -pnpm dev:auth # mana-auth on 3001 -pnpm --filter @mana/mcp-service dev # mana-mcp on 3069 -pnpm --filter @mana/persona-runner dev # this service on 3070 +cd services/mana-auth && bun run db:push # applies users.kind + auth.personas* tables +pnpm dev:auth # mana-auth on 3001 +pnpm dev:sync # mana-sync on 3050 +pnpm --filter @mana/mcp-service dev # mana-mcp on 3069 +pnpm --filter @mana/persona-runner dev # this service on 3070 + # (boots warning-only if MANA_SERVICE_KEY or ANTHROPIC_API_KEY missing) -# From a second shell, once `pnpm seed:personas` has run: +# 2. Seed the 10 catalog personas +export MANA_ADMIN_JWT=… # admin-tier JWT +export PERSONA_SEED_SECRET=… # any value; must match runner +pnpm seed:personas + +# 3. Verify login works curl -s "localhost:3070/diag/login?email=persona.anna@mana.test" | jq -# → { ok: true, email: "persona.anna@mana.test", userId: "…", spaceId: "…" } +# → { ok: true, userId: "…", spaceId: "…" } + +# 4. Fire a tick manually (dev-only endpoint, avoids waiting the full interval) +export MANA_SERVICE_KEY=… +export ANTHROPIC_API_KEY=… +curl -s -X POST localhost:3070/diag/tick | jq +# → { ok: true, result: { due: 10, ranSuccessfully: N, failed: [], durationMs: … } } + +# 5. Inspect what landed +psql -c "SELECT persona_id, tool_name, result FROM auth.persona_actions ORDER BY created_at DESC LIMIT 20;" +psql -c "SELECT persona_id, module, rating, notes FROM auth.persona_feedback ORDER BY created_at DESC LIMIT 20;" ``` -A successful diag call proves: password derivation matches the seed script, mana-auth login works, the personal space auto-created at signup is discoverable. +A green run through step 5 is the M3 exit criterion. ## Why a separate service (not part of mana-ai) diff --git a/services/mana-persona-runner/src/clients/mana-auth-internal.ts b/services/mana-persona-runner/src/clients/mana-auth-internal.ts new file mode 100644 index 000000000..f4dac3d4b --- /dev/null +++ b/services/mana-persona-runner/src/clients/mana-auth-internal.ts @@ -0,0 +1,71 @@ +/** + * Service-to-service client for mana-auth's internal persona endpoints. + * + * Three calls: list due personas, post actions batch, post feedback + * batch. All gated by `X-Service-Key` (not a user JWT). + */ + +import type { ActionRow, FeedbackRow } from '../runner/types.ts'; + +export interface DuePersona { + userId: string; + email: string; + archetype: string; + systemPrompt: string; + moduleMix: Record; + tickCadence: 'daily' | 'weekdays' | 'hourly'; + lastActiveAt: string | null; +} + +export class ManaAuthInternalClient { + constructor( + private readonly authUrl: string, + private readonly serviceKey: string + ) { + if (!serviceKey) { + throw new Error('ManaAuthInternalClient: serviceKey is required (MANA_SERVICE_KEY)'); + } + } + + private headers(): Record { + return { + 'content-type': 'application/json', + 'x-service-key': this.serviceKey, + }; + } + + async listDuePersonas(): Promise { + const res = await fetch(`${this.authUrl}/api/v1/internal/personas/due`, { + headers: this.headers(), + }); + if (!res.ok) { + throw new Error(`listDuePersonas failed: HTTP ${res.status} — ${await res.text()}`); + } + const body = (await res.json()) as { personas: DuePersona[] }; + return body.personas; + } + + async postActions(personaId: string, actions: ActionRow[]): Promise { + if (actions.length === 0) return; + const res = await fetch(`${this.authUrl}/api/v1/internal/personas/${personaId}/actions`, { + method: 'POST', + headers: this.headers(), + body: JSON.stringify({ actions }), + }); + if (!res.ok) { + throw new Error(`postActions failed: HTTP ${res.status} — ${await res.text()}`); + } + } + + async postFeedback(personaId: string, feedback: FeedbackRow[]): Promise { + if (feedback.length === 0) return; + const res = await fetch(`${this.authUrl}/api/v1/internal/personas/${personaId}/feedback`, { + method: 'POST', + headers: this.headers(), + body: JSON.stringify({ feedback }), + }); + if (!res.ok) { + throw new Error(`postFeedback failed: HTTP ${res.status} — ${await res.text()}`); + } + } +} diff --git a/services/mana-persona-runner/src/index.ts b/services/mana-persona-runner/src/index.ts index 8b573646e..64d8579ee 100644 --- a/services/mana-persona-runner/src/index.ts +++ b/services/mana-persona-runner/src/index.ts @@ -13,13 +13,18 @@ import { Hono } from 'hono'; import { AuthClient } from './clients/auth.ts'; +import { ManaAuthInternalClient } from './clients/mana-auth-internal.ts'; import { loadConfig, assertProductionSecrets } from './config.ts'; import { personaPassword } from './password.ts'; +import { tick } from './runner/tick.ts'; const config = loadConfig(); assertProductionSecrets(config); const authClient = new AuthClient(config.authUrl); +const internalClient = config.serviceKey + ? new ManaAuthInternalClient(config.authUrl, config.serviceKey) + : null; const app = new Hono(); @@ -60,16 +65,96 @@ app.get('/diag/login', async (c) => { } }); +// ─── Manual tick endpoint (dev-only, lets us verify without waiting) ── + +app.post('/diag/tick', async (c) => { + if (process.env.NODE_ENV === 'production') { + return c.json({ error: 'diagnostics disabled in production' }, 404); + } + if (!internalClient) { + return c.json({ error: 'MANA_SERVICE_KEY not set — cannot call internal endpoints' }, 500); + } + if (!config.anthropicApiKey) { + return c.json({ error: 'ANTHROPIC_API_KEY not set — Claude would fail' }, 500); + } + try { + const result = await tick({ config, auth: authClient, internal: internalClient }); + return c.json({ ok: true, result }); + } catch (err) { + return c.json({ ok: false, error: err instanceof Error ? err.message : String(err) }, 500); + } +}); + +// ─── Tick loop ──────────────────────────────────────────────────── + +let tickTimer: ReturnType | null = null; +let tickInFlight = false; +let tickCount = 0; + +function startTickLoop(): void { + if (!internalClient) { + console.warn( + '[mana-persona-runner] MANA_SERVICE_KEY missing — tick loop disabled. /diag/login still works.' + ); + return; + } + if (!config.anthropicApiKey) { + console.warn( + '[mana-persona-runner] ANTHROPIC_API_KEY missing — tick loop disabled. Set it to drive personas.' + ); + return; + } + if (config.paused) { + console.info('[mana-persona-runner] RUNNER_PAUSED=true — tick loop started in paused mode'); + } + + tickTimer = setInterval(async () => { + if (config.paused) return; + if (tickInFlight) { + // Overlapping ticks would double-log. If a tick takes longer + // than the interval (rare, but possible with Claude latency), + // skip rather than queue. + return; + } + tickInFlight = true; + try { + tickCount++; + const result = await tick({ config, auth: authClient, internal: internalClient! }); + if (result.due > 0 || result.failed.length > 0) { + console.info( + `[tick #${tickCount}] due=${result.due} ok=${result.ranSuccessfully} failed=${result.failed.length} ${result.durationMs}ms` + ); + for (const f of result.failed) { + console.error(` ✗ ${f.persona}: ${f.error}`); + } + } + } catch (err) { + console.error('[tick] unhandled error', err); + } finally { + tickInFlight = false; + } + }, config.tickIntervalMs); +} + +startTickLoop(); + // ─── Server ─────────────────────────────────────────────────────── console.info( `[mana-persona-runner] listening on :${config.port} ` + - `(auth=${config.authUrl} mcp=${config.mcpUrl} paused=${config.paused})` + `(auth=${config.authUrl} mcp=${config.mcpUrl} paused=${config.paused} ` + + `tick=${config.tickIntervalMs}ms concurrency=${config.concurrency})` ); -if (config.paused) { - console.info('[mana-persona-runner] loop is PAUSED via RUNNER_PAUSED — health-only mode'); +// Graceful shutdown — stops the tick interval so an orchestrator +// doesn't see a phantom tick after SIGTERM. +function shutdown(signal: string): void { + console.info(`[mana-persona-runner] ${signal} — stopping tick loop`); + if (tickTimer) clearInterval(tickTimer); + process.exit(0); } +process.on('SIGTERM', () => shutdown('SIGTERM')); +process.on('SIGINT', () => shutdown('SIGINT')); export default { port: config.port, diff --git a/services/mana-persona-runner/src/runner/claude-session.ts b/services/mana-persona-runner/src/runner/claude-session.ts new file mode 100644 index 000000000..a7d9f8a70 --- /dev/null +++ b/services/mana-persona-runner/src/runner/claude-session.ts @@ -0,0 +1,257 @@ +/** + * One tick's worth of Claude-Agent-SDK interaction for a single persona. + * + * Two phases per tick: + * + * 1. **Main loop** — Claude is given the persona's system prompt and + * an instruction to spend one simulated day using Mana through the + * MCP tools. We stream the session, collect every tool-use event + * as an ActionRow, note which modules got touched. + * + * 2. **Rating loop** — same system prompt, but the user message asks + * for a structured JSON rating of every module the persona used. + * We parse the last assistant text block as JSON and convert to + * FeedbackRows. Invalid JSON → one `failed-rating` row so the + * operator can see it in the dashboard. + * + * Claude-Agent-SDK does all MCP plumbing internally: we hand it a + * Streamable-HTTP URL + the persona's JWT, it discovers the tools, + * auto-invokes them, and streams back SDKMessage events. + */ + +import { query, type SDKMessage } from '@anthropic-ai/claude-agent-sdk'; +import { createHash, randomUUID } from 'node:crypto'; +import type { ActionRow, FeedbackRow } from './types.ts'; + +export interface SessionInput { + tickId: string; + personaEmail: string; + systemPrompt: string; + moduleMix: Record; + mcpUrl: string; + jwt: string; + spaceId: string; + anthropicApiKey: string; + /** Max tool-call turns per phase. 15 leaves headroom without runaway cost. */ + maxTurns?: number; +} + +export interface SessionResult { + actions: ActionRow[]; + feedback: FeedbackRow[]; + modulesUsed: Set; +} + +function hashInput(args: unknown): string { + try { + return createHash('sha256').update(JSON.stringify(args)).digest('hex').slice(0, 16); + } catch { + return ''; + } +} + +/** + * Module names are embedded in tool names as `module.verb` (registry + * convention in `@mana/tool-registry`). Extract the prefix so we know + * which modules the persona actually touched. + */ +function moduleOf(toolName: string): string | null { + const dot = toolName.indexOf('.'); + return dot > 0 ? toolName.slice(0, dot) : null; +} + +// ─── Main loop ──────────────────────────────────────────────────── + +export async function runMainTurn(input: SessionInput): Promise { + const actions: ActionRow[] = []; + const modulesUsed = new Set(); + + const today = new Date().toISOString().slice(0, 10); + const userPrompt = + `Heute ist ${today}. Du hast Zugriff auf deine persönliche Mana-App durch die bereitgestellten Tools.\n` + + `Verbringe einen kurzen "Tag" in der App — was würdest du heute tatsächlich tun? Nutze 3–8 Tools.\n` + + `Module, die dir besonders liegen: ${Object.keys(input.moduleMix).join(', ')}.\n` + + `Wenn du fertig bist, schreibe kurz (1–2 Sätze) was du heute gemacht hast.`; + + // The SDK picks up ANTHROPIC_API_KEY from env. + process.env.ANTHROPIC_API_KEY ??= input.anthropicApiKey; + + const q = query({ + prompt: userPrompt, + options: { + systemPrompt: input.systemPrompt, + maxTurns: input.maxTurns ?? 15, + mcpServers: { + mana: { + type: 'http', + url: `${input.mcpUrl}/mcp`, + headers: { + authorization: `Bearer ${input.jwt}`, + 'x-mana-space': input.spaceId, + }, + }, + }, + // Built-in tools off — the persona should only touch Mana tools. + tools: [], + }, + }); + + for await (const msg of q as AsyncIterable) { + collectActionsFromMessage(msg, input.tickId, actions, modulesUsed); + } + + return { actions, feedback: [], modulesUsed }; +} + +// ─── Rating loop ────────────────────────────────────────────────── + +export async function runRatingTurn( + input: SessionInput, + modulesUsed: Set +): Promise { + if (modulesUsed.size === 0) return []; + + const list = [...modulesUsed].join(', '); + const prompt = + `Bewerte bitte jedes Modul, das du heute genutzt hast (${list}), auf einer Skala 1–5 ` + + `(1 = frustrierend, 5 = hilft mir wirklich). Antworte AUSSCHLIESSLICH als JSON in diesem Format:\n` + + `{"ratings": [{"module": "todo", "rating": 4, "notes": "kurz begründet"}, ...]}\n` + + `Keine Prosa außerhalb des JSON-Blocks.`; + + process.env.ANTHROPIC_API_KEY ??= input.anthropicApiKey; + + const q = query({ + prompt, + options: { + systemPrompt: input.systemPrompt, + maxTurns: 1, + tools: [], + }, + }); + + let text = ''; + for await (const msg of q as AsyncIterable) { + text += extractAssistantText(msg); + } + + return parseRatings(text, input.tickId, modulesUsed); +} + +// ─── Parsers ────────────────────────────────────────────────────── + +function collectActionsFromMessage( + msg: SDKMessage, + tickId: string, + actions: ActionRow[], + modulesUsed: Set +): void { + // SDKMessage is a big union; we only care about assistant messages + // that contain tool_use blocks, and user messages that contain + // tool_result blocks (so we know success/failure). + const raw = msg as unknown as { + type?: string; + message?: { content?: Array> }; + }; + if (raw.type !== 'assistant' && raw.type !== 'user') return; + const content = raw.message?.content; + if (!Array.isArray(content)) return; + + for (const block of content) { + const blockType = block.type; + if (blockType === 'tool_use' && typeof block.name === 'string') { + const toolName = block.name; + const mod = moduleOf(toolName); + if (mod) modulesUsed.add(mod); + actions.push({ + tickId, + toolName, + inputHash: hashInput(block.input), + result: 'ok', // provisional; rewritten on matching tool_result if it was an error + }); + } else if (blockType === 'tool_result') { + const isError = block.is_error === true; + if (!isError) continue; + // Flip the most recent action that matches this tool_use_id. + const toolUseId = typeof block.tool_use_id === 'string' ? block.tool_use_id : null; + if (!toolUseId) continue; + // We didn't store tool_use_id (would require pairing state); cheap + // fallback: mark the last action as error. Good enough for the + // audit dashboard; precise attribution lands in a later iteration. + const last = actions[actions.length - 1]; + if (last) { + last.result = 'error'; + last.errorMessage = stringifyBlock(block); + } + } + } +} + +function extractAssistantText(msg: SDKMessage): string { + const raw = msg as unknown as { + type?: string; + message?: { content?: Array> }; + }; + if (raw.type !== 'assistant') return ''; + const content = raw.message?.content; + if (!Array.isArray(content)) return ''; + let out = ''; + for (const block of content) { + if (block.type === 'text' && typeof block.text === 'string') out += block.text; + } + return out; +} + +function stringifyBlock(block: Record): string { + try { + return JSON.stringify(block.content ?? block).slice(0, 500); + } catch { + return ''; + } +} + +function parseRatings(text: string, tickId: string, modulesUsed: Set): FeedbackRow[] { + // Tolerate surrounding whitespace and accidental code fences. + const match = text.match(/\{[\s\S]*\}/); + if (!match) { + return [ + { + tickId, + module: '__parse', + rating: 3, + notes: `Claude returned non-JSON: ${text.slice(0, 200)}`, + }, + ]; + } + try { + const parsed = JSON.parse(match[0]) as { + ratings?: Array<{ module: string; rating: number; notes?: string }>; + }; + const ratings = parsed.ratings ?? []; + const rows: FeedbackRow[] = []; + for (const r of ratings) { + if (typeof r.module !== 'string') continue; + if (!modulesUsed.has(r.module)) continue; + const rating = Math.max(1, Math.min(5, Math.round(r.rating))) as FeedbackRow['rating']; + rows.push({ + tickId, + module: r.module, + rating, + notes: typeof r.notes === 'string' ? r.notes.slice(0, 1000) : undefined, + }); + } + return rows; + } catch (err) { + return [ + { + tickId, + module: '__parse', + rating: 3, + notes: `JSON.parse failed: ${err instanceof Error ? err.message : String(err)}`, + }, + ]; + } +} + +export function newTickId(): string { + return randomUUID(); +} diff --git a/services/mana-persona-runner/src/runner/tick.ts b/services/mana-persona-runner/src/runner/tick.ts new file mode 100644 index 000000000..d929cff22 --- /dev/null +++ b/services/mana-persona-runner/src/runner/tick.ts @@ -0,0 +1,113 @@ +/** + * Tick orchestrator. + * + * One `tick()` cycle: + * 1. Ask mana-auth which personas are due. + * 2. Run up to N in parallel (N = config.concurrency). + * 3. Per persona: login → resolve space → claude-main → claude-rating + * → POST actions + feedback back to mana-auth. + * + * Errors in one persona don't stop the others — each persona is + * wrapped in try/catch, and the outer tick swallows individual failures + * while still reporting tick-level success. + */ + +import type { Config } from '../config.ts'; +import type { AuthClient } from '../clients/auth.ts'; +import type { ManaAuthInternalClient, DuePersona } from '../clients/mana-auth-internal.ts'; +import { personaPassword } from '../password.ts'; +import { newTickId, runMainTurn, runRatingTurn, type SessionInput } from './claude-session.ts'; + +export interface TickDependencies { + config: Config; + auth: AuthClient; + internal: ManaAuthInternalClient; +} + +export interface TickResult { + due: number; + ranSuccessfully: number; + failed: Array<{ persona: string; error: string }>; + durationMs: number; +} + +export async function tick(deps: TickDependencies): Promise { + const start = Date.now(); + + if (deps.config.paused) { + return { due: 0, ranSuccessfully: 0, failed: [], durationMs: 0 }; + } + + const due = await deps.internal.listDuePersonas(); + if (due.length === 0) { + return { due: 0, ranSuccessfully: 0, failed: [], durationMs: Date.now() - start }; + } + + const failed: TickResult['failed'] = []; + let success = 0; + + // Simple semaphore: process in chunks of `concurrency`. For M3 scale + // (tens of personas) this is good enough; a proper worker-pool can + // come later. + const batchSize = Math.max(1, deps.config.concurrency); + for (let i = 0; i < due.length; i += batchSize) { + const batch = due.slice(i, i + batchSize); + const outcomes = await Promise.allSettled(batch.map((p) => runOnePersona(p, deps))); + for (let j = 0; j < outcomes.length; j++) { + const persona = batch[j]; + const outcome = outcomes[j]; + if (outcome.status === 'fulfilled') { + success++; + } else { + failed.push({ + persona: persona.email, + error: outcome.reason instanceof Error ? outcome.reason.message : String(outcome.reason), + }); + } + } + } + + return { + due: due.length, + ranSuccessfully: success, + failed, + durationMs: Date.now() - start, + }; +} + +// ─── Per-persona pipeline ───────────────────────────────────────── + +async function runOnePersona(persona: DuePersona, deps: TickDependencies): Promise { + const tickId = newTickId(); + const password = personaPassword(persona.email, deps.config.personaSeedSecret); + + const { jwt, spaceId } = await deps.auth.loginAndResolvePersonalSpace(persona.email, password); + + const sessionInput: SessionInput = { + tickId, + personaEmail: persona.email, + systemPrompt: persona.systemPrompt, + moduleMix: persona.moduleMix, + mcpUrl: deps.config.mcpUrl, + jwt, + spaceId, + anthropicApiKey: deps.config.anthropicApiKey, + }; + + const { actions, modulesUsed } = await runMainTurn(sessionInput); + + // Always persist actions, even if none — that itself is useful + // signal for the dashboard ("persona showed up but did nothing"). + if (actions.length > 0) { + await deps.internal.postActions(persona.userId, actions); + } + + const feedback = await runRatingTurn(sessionInput, modulesUsed); + if (feedback.length > 0) { + await deps.internal.postFeedback(persona.userId, feedback); + } + + console.info( + `[tick] ${persona.email} — ${actions.length} tool calls across ${modulesUsed.size} modules, ${feedback.length} ratings` + ); +} diff --git a/services/mana-persona-runner/src/runner/types.ts b/services/mana-persona-runner/src/runner/types.ts new file mode 100644 index 000000000..1efd27aa7 --- /dev/null +++ b/services/mana-persona-runner/src/runner/types.ts @@ -0,0 +1,21 @@ +/** + * Shared types for the tick loop. Narrow shapes that match what + * mana-auth's internal endpoints expect — the runner is a pure producer + * here, the schema authority lives in mana-auth. + */ + +export interface ActionRow { + tickId: string; + toolName: string; + inputHash?: string; + result: 'ok' | 'error'; + errorMessage?: string; + latencyMs?: number; +} + +export interface FeedbackRow { + tickId: string; + module: string; + rating: 1 | 2 | 3 | 4 | 5; + notes?: string; +}