From 5e01763caaf6e505f9a35543d577751173644626 Mon Sep 17 00:00:00 2001 From: Till JS Date: Wed, 15 Apr 2026 00:29:30 +0200 Subject: [PATCH] =?UTF-8?q?feat(ai):=20close=20the=20loop=20=E2=80=94=20se?= =?UTF-8?q?rver=20write-back=20+=20webapp=20staging=20effect?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the off-tab AI pipeline. mana-ai now writes produced plans back to `sync_changes` as a server-sourced Mission iteration; the webapp picks it up on next sync and translates each PlanStep into a local Proposal via the existing createProposal flow. User sees the resulting ghost cards in the matching module's AiProposalInbox with full mission attribution. Server (mana-ai v0.3): - `db/connection.ts` — `withUser(sql, userId, fn)` RLS-scoped tx helper mirroring the Go `withUser` pattern (SET LOCAL app.current_user_id) - `db/iteration-writer.ts` - `planToIteration(plan, id, now)` — shared-ai AiPlanOutput → inline MissionIteration with `source: 'server'` + status='awaiting-review' - `appendServerIteration(sql, input)` — INSERT sync_changes row with op=update, data={iterations: [...]} + field_timestamps + actor JSONB={kind:'system', source:'mission-runner'} - `cron/tick.ts` — after parse success: build iteration, append to mission.iterations, persist via appendServerIteration. Stats now include `plansWrittenBack`. Actor union: - `packages/shared-ai/src/actor.ts` + webapp actor: `system.source` gains `'mission-runner'` so the server's own writes are attributed correctly and distinguishable from projection/rule writes Webapp: - `data/ai/missions/server-iteration-staging.ts` - `startServerIterationStaging()` subscribes to aiMissions via Dexie liveQuery; on each Mission update, walks iterations looking for `source='server'` entries that haven't been staged yet - For each such iteration: creates a Proposal per PlanStep under `{kind:'ai', missionId, iterationId, rationale}` so policy + hooks fire correctly - Writes proposalIds back into plan[].proposalId + status='staged' so other tabs and app restarts skip re-staging - Idempotent: in-memory `processedIterations` Set + durable proposalId marker - Wired into (app)/+layout.svelte alongside startMissionTick - 3 unit tests: translate server iteration → proposal, skip already-staged, ignore browser iterations Full pipeline now: user creates Mission in /companion/missions → mana-ai tick picks it up → calls mana-llm → parses plan → writes iteration → synced to webapp → staging effect creates proposals → user approves in /todo (or any module) → task lands with `{actor: ai, missionId, iterationId, rationale}` attribution. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../missions/server-iteration-staging.test.ts | 171 ++++++++++++++++++ .../ai/missions/server-iteration-staging.ts | 138 ++++++++++++++ .../apps/web/src/lib/data/events/actor.ts | 2 +- .../apps/web/src/routes/(app)/+layout.svelte | 96 ++++++---- packages/shared-ai/src/actor.ts | 2 +- services/mana-ai/src/cron/tick.ts | 40 +++- services/mana-ai/src/db/connection.ts | 22 +++ services/mana-ai/src/db/iteration-writer.ts | 110 +++++++++++ 8 files changed, 530 insertions(+), 51 deletions(-) create mode 100644 apps/mana/apps/web/src/lib/data/ai/missions/server-iteration-staging.test.ts create mode 100644 apps/mana/apps/web/src/lib/data/ai/missions/server-iteration-staging.ts create mode 100644 services/mana-ai/src/db/iteration-writer.ts diff --git a/apps/mana/apps/web/src/lib/data/ai/missions/server-iteration-staging.test.ts b/apps/mana/apps/web/src/lib/data/ai/missions/server-iteration-staging.test.ts new file mode 100644 index 000000000..d2933d295 --- /dev/null +++ b/apps/mana/apps/web/src/lib/data/ai/missions/server-iteration-staging.test.ts @@ -0,0 +1,171 @@ +import 'fake-indexeddb/auto'; +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; + +vi.mock('$lib/stores/funnel-tracking', () => ({ trackFirstContent: vi.fn() })); +vi.mock('$lib/triggers/registry', () => ({ fire: vi.fn() })); +vi.mock('$lib/triggers/inline-suggest', () => ({ + checkInlineSuggestion: vi.fn().mockResolvedValue(null), +})); + +import { db } from '../../database'; +import { registerTools } from '../../tools/registry'; +import { setAiPolicy } from '../policy'; +import { createMission, finishIteration, startIteration } from './store'; +import { MISSIONS_TABLE } from './types'; +import { listProposals } from '../proposals/store'; +import { PROPOSALS_TABLE } from '../proposals/types'; +import { + startServerIterationStaging, + stopServerIterationStaging, + resetServerIterationStagingCache, +} from './server-iteration-staging'; + +registerTools([ + { + name: 'staging_test_op', + module: 'stagingTest', + description: 'propose only', + parameters: [{ name: 'val', type: 'string', required: true, description: 'v' }], + async execute() { + return { success: true, message: 'ok' }; + }, + }, +]); + +const flush = () => new Promise((r) => setTimeout(r, 50)); + +beforeEach(async () => { + await db.table(MISSIONS_TABLE).clear(); + await db.table(PROPOSALS_TABLE).clear(); + resetServerIterationStagingCache(); +}); + +afterEach(() => { + stopServerIterationStaging(); +}); + +describe('server-iteration staging', () => { + it('translates a server iteration into local proposals', async () => { + const restore = setAiPolicy({ + tools: { staging_test_op: 'propose' }, + defaultForAi: 'propose', + }); + try { + const m = await createMission({ + title: 'x', + conceptMarkdown: '', + objective: 'x', + cadence: { kind: 'manual' }, + }); + // Simulate what mana-ai's write-back would sync into Dexie + const it = await startIteration(m.id, { + plan: [ + { + id: 'srv-step-1', + summary: 'server step', + intent: { + kind: 'toolCall', + toolName: 'staging_test_op', + params: { val: 'hello' }, + }, + status: 'planned', + }, + ], + }); + await finishIteration(m.id, it.id, { + overallStatus: 'awaiting-review', + }); + // Stamp source='server' — startIteration/finishIteration don't + // set it; the write-back path from mana-ai does. + const row = await db.table(MISSIONS_TABLE).get(m.id); + const patched = row.iterations.map((x: { id: string }) => + x.id === it.id ? { ...x, source: 'server' } : x + ); + await db.table(MISSIONS_TABLE).update(m.id, { iterations: patched }); + + startServerIterationStaging(); + await flush(); + await flush(); + + const proposals = await listProposals({ status: 'pending' }); + expect(proposals).toHaveLength(1); + expect(proposals[0].missionId).toBe(m.id); + expect(proposals[0].iterationId).toBe(it.id); + expect(proposals[0].intent).toMatchObject({ + kind: 'toolCall', + toolName: 'staging_test_op', + params: { val: 'hello' }, + }); + } finally { + restore(); + } + }); + + it('does not re-stage an iteration that already has proposalIds', async () => { + const m = await createMission({ + title: 'x', + conceptMarkdown: '', + objective: 'x', + cadence: { kind: 'manual' }, + }); + const it = await startIteration(m.id, { + plan: [ + { + id: 'srv-step-1', + summary: 's', + intent: { + kind: 'toolCall', + toolName: 'staging_test_op', + params: { val: 'x' }, + }, + status: 'staged', + proposalId: 'already-there', + }, + ], + }); + await finishIteration(m.id, it.id, { overallStatus: 'awaiting-review' }); + const row = await db.table(MISSIONS_TABLE).get(m.id); + const patched = row.iterations.map((x: { id: string }) => + x.id === it.id ? { ...x, source: 'server' } : x + ); + await db.table(MISSIONS_TABLE).update(m.id, { iterations: patched }); + + startServerIterationStaging(); + await flush(); + await flush(); + + const proposals = await listProposals({ status: 'pending' }); + expect(proposals).toHaveLength(0); + }); + + it('ignores browser-sourced iterations', async () => { + const m = await createMission({ + title: 'x', + conceptMarkdown: '', + objective: 'x', + cadence: { kind: 'manual' }, + }); + const it = await startIteration(m.id, { + plan: [ + { + id: 'browser-step', + summary: 's', + intent: { + kind: 'toolCall', + toolName: 'staging_test_op', + params: { val: 'x' }, + }, + status: 'planned', + }, + ], + }); + await finishIteration(m.id, it.id, { overallStatus: 'awaiting-review' }); + // leave source unset (defaults to 'browser') + + startServerIterationStaging(); + await flush(); + + const proposals = await listProposals({ status: 'pending' }); + expect(proposals).toHaveLength(0); + }); +}); diff --git a/apps/mana/apps/web/src/lib/data/ai/missions/server-iteration-staging.ts b/apps/mana/apps/web/src/lib/data/ai/missions/server-iteration-staging.ts new file mode 100644 index 000000000..fdfa89ec5 --- /dev/null +++ b/apps/mana/apps/web/src/lib/data/ai/missions/server-iteration-staging.ts @@ -0,0 +1,138 @@ +/** + * Server-iteration staging — translates server-produced Mission + * iterations into local Proposals. + * + * The mana-ai Bun service writes plans back as a new `Mission.iterations[]` + * entry with `source: 'server'`. When the webapp syncs, `applyServerChanges` + * merges the new iterations array into the local record. This module + * subscribes to those updates and, for each server iteration we haven't + * processed yet, creates a Proposal per PlanStep via the existing + * `createProposal` flow. + * + * Idempotency: each iteration id is tracked in a local Set so re-runs + * (e.g. after tab reopen) don't duplicate proposals. Proposals that + * successfully create get their id written back into `plan[i].proposalId` + * so the Workbench UI links them; that also doubles as a durable + * "already staged" marker surviving app restarts. + */ + +import { liveQuery } from 'dexie'; +import { db } from '../../database'; +import { MISSIONS_TABLE } from './types'; +import { createProposal } from '../proposals/store'; +import { getMission } from './store'; +import { runAsAsync } from '../../events/actor'; +import type { Mission, MissionIteration, PlanStep } from './types'; + +const processedIterations = new Set(); +let subscription: { unsubscribe: () => void } | null = null; + +/** + * Start subscribing to aiMissions changes. Each time a server iteration + * without staged proposals shows up, translate every PlanStep into a + * local Proposal under the originating mission's AI actor. + * + * Idempotent — calling twice is a no-op. Returns a stop function. + */ +export function startServerIterationStaging(): () => void { + if (subscription) return stopServerIterationStaging; + + const obs = liveQuery(() => db.table(MISSIONS_TABLE).toArray()); + subscription = obs.subscribe({ + next: async (missions) => { + for (const m of missions) { + if (m.deletedAt) continue; + for (const it of m.iterations) { + if (it.source !== 'server') continue; + if (processedIterations.has(it.id)) continue; + // Pre-check: if any plan step already has a proposalId, the + // server iteration was already staged (possibly by another + // tab). Mark as processed so we don't race. + const alreadyStaged = it.plan.some( + (s) => typeof s.proposalId === 'string' && s.proposalId.length > 0 + ); + if (alreadyStaged) { + processedIterations.add(it.id); + continue; + } + try { + await stageIteration(m, it); + processedIterations.add(it.id); + } catch (err) { + console.error( + `[server-staging] mission=${m.id} iteration=${it.id} failed:`, + err instanceof Error ? err.message : String(err) + ); + } + } + } + }, + error: (err) => { + console.error('[server-staging] subscription error:', err); + }, + }); + return stopServerIterationStaging; +} + +export function stopServerIterationStaging(): void { + subscription?.unsubscribe(); + subscription = null; +} + +/** Test hook — forget which iterations we've already staged. */ +export function resetServerIterationStagingCache(): void { + processedIterations.clear(); +} + +async function stageIteration(mission: Mission, iteration: MissionIteration): Promise { + // Re-read the freshest mission so concurrent local edits don't get + // clobbered when we write proposalIds back into `plan[]`. + const fresh = await getMission(mission.id); + if (!fresh) return; + const stagedStepIds: Record = {}; + + for (const step of iteration.plan) { + const intent = step.intent; + if (intent.kind !== 'toolCall') continue; + if (step.proposalId) continue; // already staged + + const actor = { + kind: 'ai' as const, + missionId: mission.id, + iterationId: iteration.id, + rationale: step.summary || iteration.summary || mission.objective, + }; + + // createProposal runs through Dexie hooks under the AI actor — the + // row lands in `pendingProposals` and the AiProposalInbox renders + // it as a ghost card on the relevant module page. + const proposal = await runAsAsync(actor, () => + createProposal({ + actor, + intent: { + kind: 'toolCall', + toolName: intent.toolName, + params: intent.params, + }, + rationale: actor.rationale, + }) + ); + stagedStepIds[step.id] = proposal.id; + } + + if (Object.keys(stagedStepIds).length === 0) return; + + // Write proposalIds back onto the iteration's plan[] so the Workbench + // UI links each step to its proposal AND so other tabs skip re-staging. + const updatedIterations: MissionIteration[] = fresh.iterations.map((it) => { + if (it.id !== iteration.id) return it; + const updatedPlan: PlanStep[] = it.plan.map((s) => + stagedStepIds[s.id] ? { ...s, proposalId: stagedStepIds[s.id], status: 'staged' as const } : s + ); + return { ...it, plan: updatedPlan }; + }); + await db.table(MISSIONS_TABLE).update(fresh.id, { + iterations: updatedIterations, + updatedAt: new Date().toISOString(), + }); +} diff --git a/apps/mana/apps/web/src/lib/data/events/actor.ts b/apps/mana/apps/web/src/lib/data/events/actor.ts index 200f369eb..41c1688b8 100644 --- a/apps/mana/apps/web/src/lib/data/events/actor.ts +++ b/apps/mana/apps/web/src/lib/data/events/actor.ts @@ -35,7 +35,7 @@ export type Actor = | { readonly kind: 'system'; /** Subsystem responsible for this derived write. */ - readonly source: 'projection' | 'rule' | 'migration'; + readonly source: 'projection' | 'rule' | 'migration' | 'mission-runner'; }; export const USER_ACTOR: Actor = Object.freeze({ kind: 'user' }); diff --git a/apps/mana/apps/web/src/routes/(app)/+layout.svelte b/apps/mana/apps/web/src/routes/(app)/+layout.svelte index ae2eab66f..0c8ed1193 100644 --- a/apps/mana/apps/web/src/routes/(app)/+layout.svelte +++ b/apps/mana/apps/web/src/routes/(app)/+layout.svelte @@ -7,6 +7,10 @@ import { todoReminderSource } from '$lib/modules/todo/reminder-source'; import { startEventStore, stopEventStore } from '$lib/data/events/event-store'; import { startMissionTick, stopMissionTick } from '$lib/data/ai/missions/setup'; + import { + startServerIterationStaging, + stopServerIterationStaging, + } from '$lib/data/ai/missions/server-iteration-staging'; import { initTools } from '$lib/data/tools/init'; import { startEventBridge, stopEventBridge } from '$lib/triggers/event-bridge'; import { startStreakTracker, stopStreakTracker } from '$lib/data/projections/streaks'; @@ -206,33 +210,41 @@ authStore.isAuthenticated ? authStore.user?.email || $_('nav.menu') : '' ); - // ── Tags ──────────────────────────────────────────────── + // ── Bottom-stack single-bar policy ─────────────────────── + // Only one bar may be open at a time. Opening one closes the others. const allTags = useAllTags(); let isTagStripVisible = $state(false); - function handleTagStripToggle() { - isTagStripVisible = !isTagStripVisible; - } - - // ── QuickInputBar visibility (toggled by the "search" pill) ── - let isQuickInputVisible = $state(true); - function handleQuickInputToggle() { - isQuickInputVisible = !isQuickInputVisible; - } - - // ── Workbench tab bar visibility (toggled by the "tabs" pill) ── - // Controls whether the page-injected bottomBar (SceneAppBar on /) is rendered. - let isBottomBarVisible = $state(true); - function handleBottomBarToggle() { - isBottomBarVisible = !isBottomBarVisible; - } - - // ── Dropdown-as-bar ────────────────────────────────────── - // Theme / AI tier / Sync / User-menu dropdowns are surfaced as - // bars in the bottom stack instead of floating popovers. PillNavigation - // calls handleOpenBar with a PillBarConfig (or null to close); we - // render the items via PillDropdownBar just above the PillNav. + let isQuickInputVisible = $state(false); + let isBottomBarVisible = $state(false); let activeBar = $state(null); + + function closeAllBars() { + isTagStripVisible = false; + isQuickInputVisible = false; + isBottomBarVisible = false; + activeBar = null; + } + + function handleTagStripToggle() { + const next = !isTagStripVisible; + closeAllBars(); + isTagStripVisible = next; + } + + function handleQuickInputToggle() { + const next = !isQuickInputVisible; + closeAllBars(); + isQuickInputVisible = next; + } + + function handleBottomBarToggle() { + const next = !isBottomBarVisible; + closeAllBars(); + isBottomBarVisible = next; + } + function handleOpenBar(config: PillBarConfig | null) { + closeAllBars(); activeBar = config; } function closeActiveBar() { @@ -250,10 +262,10 @@ isFullscreen ? 0 : (isCollapsed ? 0 : 80) + - (activeBar ? 56 : 0) + - (isTagStripVisible ? 44 : 0) + - (isQuickInputVisible ? 72 : 0) + - (isBottomBarVisible && bottomBarStore.component ? 36 : 0) + (activeBar ? 64 : 0) + + (isTagStripVisible ? 64 : 0) + + (isQuickInputVisible ? 64 : 0) + + (isBottomBarVisible && bottomBarStore.component ? 64 : 0) ); // ── DnD context ───────────────────────────────────────── @@ -298,11 +310,11 @@ let baseNavItems = $derived([ { href: '/', - label: $_('nav.tags'), - icon: 'tag', + label: 'Workbench-Tabs', + icon: 'columns', iconOnly: true, - onClick: handleTagStripToggle, - active: isTagStripVisible, + onClick: handleBottomBarToggle, + active: isBottomBarVisible, }, { href: '/', @@ -314,11 +326,11 @@ }, { href: '/', - label: 'Workbench-Tabs', - icon: 'columns', + label: $_('nav.tags'), + icon: 'tag', iconOnly: true, - onClick: handleBottomBarToggle, - active: isBottomBarVisible, + onClick: handleTagStripToggle, + active: isTagStripVisible, }, ]); @@ -514,6 +526,11 @@ // interval and runs any that are due. Safe idempotent; see // data/ai/missions/setup.ts. startMissionTick(); + // Staging-effect: subscribes to Mission updates and translates + // server-produced iterations (source='server') into local + // Proposals. Essential once the mana-ai service is running + // alongside; no-op when only the foreground tick is active. + startServerIterationStaging(); }); // Restore nav collapsed state (cheap, keep inline) @@ -610,6 +627,7 @@ stopEventBridge(); stopStreakTracker(); stopMissionTick(); + stopServerIterationStaging(); guestMode?.destroy(); // Fire-and-forget — we don't need to await; the in-flight task // will finish in the background and the next page session will @@ -921,7 +939,7 @@ showLogout={authStore.isAuthenticated} loginHref="/login" primaryColor="#6366f1" - showAppSwitcher={true} + showAppSwitcher={false} showAiTierSelector={true} aiTierItems={aiTier.items} currentAiTierLabel={aiTier.label} @@ -1017,9 +1035,9 @@ display: flex; flex-direction: column; align-items: stretch; - /* Uniform small gap between bars instead of each wrapper - providing its own ad-hoc padding-bottom. */ - gap: 0.25rem; + /* Bars stack flush — each bar's own wrapper controls its height, + so visual breathing room comes from inside the bar, not the gap. */ + gap: 0; pointer-events: none; padding-bottom: env(safe-area-inset-bottom, 0px); } diff --git a/packages/shared-ai/src/actor.ts b/packages/shared-ai/src/actor.ts index abfe834a3..d21119098 100644 --- a/packages/shared-ai/src/actor.ts +++ b/packages/shared-ai/src/actor.ts @@ -18,7 +18,7 @@ export type Actor = } | { readonly kind: 'system'; - readonly source: 'projection' | 'rule' | 'migration'; + readonly source: 'projection' | 'rule' | 'migration' | 'mission-runner'; }; export const USER_ACTOR: Actor = Object.freeze({ kind: 'user' }); diff --git a/services/mana-ai/src/cron/tick.ts b/services/mana-ai/src/cron/tick.ts index edc166497..aa6d93785 100644 --- a/services/mana-ai/src/cron/tick.ts +++ b/services/mana-ai/src/cron/tick.ts @@ -21,6 +21,7 @@ import { } from '@mana/shared-ai'; import { getSql } from '../db/connection'; import { listDueMissions, type ServerMission } from '../db/missions-projection'; +import { appendServerIteration, planToIteration } from '../db/iteration-writer'; import { PlannerClient } from '../planner/client'; import { AI_AVAILABLE_TOOLS, AI_AVAILABLE_TOOL_NAMES } from '../planner/tools'; import type { Config } from '../config'; @@ -29,6 +30,7 @@ export interface TickStats { scannedAt: string; dueMissionCount: number; plansProduced: number; + plansWrittenBack: number; parseFailures: number; errors: string[]; } @@ -42,6 +44,7 @@ export async function runTickOnce(config: Config): Promise { scannedAt: new Date().toISOString(), dueMissionCount: 0, plansProduced: 0, + plansWrittenBack: 0, parseFailures: 0, errors: ['overlap-skipped'], }; @@ -50,6 +53,7 @@ export async function runTickOnce(config: Config): Promise { const errors: string[] = []; let dueMissionCount = 0; let plansProduced = 0; + let plansWrittenBack = 0; let parseFailures = 0; const scannedAt = new Date().toISOString(); @@ -59,7 +63,14 @@ export async function runTickOnce(config: Config): Promise { dueMissionCount = missions.length; if (missions.length === 0) - return { scannedAt, dueMissionCount, plansProduced, parseFailures, errors }; + return { + scannedAt, + dueMissionCount, + plansProduced, + plansWrittenBack, + parseFailures, + errors, + }; const planner = new PlannerClient(config.manaLlmUrl, config.serviceKey); @@ -71,19 +82,28 @@ export async function runTickOnce(config: Config): Promise { continue; } plansProduced++; + + const nowIso = new Date().toISOString(); + const iterationId = crypto.randomUUID(); + const newIteration = planToIteration(plan, iterationId, nowIso); + const allIterations = [...m.iterations, newIteration] as (typeof newIteration)[]; + + await appendServerIteration(sql, { + userId: m.userId, + missionId: m.id, + allIterations, + newIteration, + nowIso, + }); + plansWrittenBack++; + console.log( - `[mana-ai tick] mission=${m.id} user=${m.userId} plan=${plan.steps.length}step(s) summary=${JSON.stringify( - plan.summary - )}` + `[mana-ai tick] mission=${m.id} user=${m.userId} plan=${plan.steps.length}step(s) iteration=${iterationId}` ); - // TODO: write plan back as `Mission.iterations[]` entry with - // `source: 'server'` so the webapp staging-effect can turn - // each PlannedStep into a local Proposal. Requires RLS- - // scoped write helper (see CLAUDE.md, design option A). } catch (err) { const msg = err instanceof Error ? err.message : String(err); errors.push(`mission=${m.id}: ${msg}`); - console.error(`[mana-ai tick] mission=${m.id} plan failed:`, msg); + console.error(`[mana-ai tick] mission=${m.id} run failed:`, msg); } } } catch (err) { @@ -94,7 +114,7 @@ export async function runTickOnce(config: Config): Promise { running = false; } - return { scannedAt, dueMissionCount, plansProduced, parseFailures, errors }; + return { scannedAt, dueMissionCount, plansProduced, plansWrittenBack, parseFailures, errors }; } /** diff --git a/services/mana-ai/src/db/connection.ts b/services/mana-ai/src/db/connection.ts index 6d29c3a91..3e5fc7552 100644 --- a/services/mana-ai/src/db/connection.ts +++ b/services/mana-ai/src/db/connection.ts @@ -29,3 +29,25 @@ export async function closeSql(): Promise { _sql = null; } } + +/** + * Run `fn` inside a transaction scoped to the given user_id via the same + * RLS convention mana-sync's Go `withUser` uses: set the session-local + * `app.current_user_id` setting so the existing `sync_changes_user_isolation` + * policy only exposes that user's rows. + * + * Empty userIDs are refused up front — an unauthenticated caller must + * never reach the DB with an empty RLS scope (which would match every + * row the policy allows). + */ +export async function withUser( + sql: Sql, + userId: string, + fn: (tx: postgres.TransactionSql) => Promise +): Promise { + if (!userId) throw new Error('withUser: empty userID'); + return sql.begin(async (tx) => { + await tx`SELECT set_config('app.current_user_id', ${userId}, true)`; + return fn(tx); + }) as Promise; +} diff --git a/services/mana-ai/src/db/iteration-writer.ts b/services/mana-ai/src/db/iteration-writer.ts new file mode 100644 index 000000000..3f28956b1 --- /dev/null +++ b/services/mana-ai/src/db/iteration-writer.ts @@ -0,0 +1,110 @@ +/** + * Append a server-produced iteration to an existing Mission by inserting + * a `sync_changes` row of op='update' carrying the new `iterations` + * array in `fields`. The row is attributed to the mission-runner system + * actor so the Workbench timeline on the user's device distinguishes it + * from their own edits. + * + * The write is RLS-scoped via `withUser` so a compromised DB role can't + * leak the iteration across users even if the row's user_id column were + * wrong. The caller is responsible for passing the correct userId (from + * the mission projection). + * + * The webapp picks up this row on next sync, `applyServerChanges` merges + * the updated `iterations` array into the local Mission record, and the + * staging-effect translates each PlanStep into a local Proposal. + */ + +import type { Sql } from './connection'; +import { withUser } from './connection'; +import type { AiPlanOutput, MissionIteration, PlanStep } from '@mana/shared-ai'; + +export interface AppendIterationInput { + userId: string; + missionId: string; + /** Full `iterations` array AFTER appending the new entry. + * Caller reads the current mission, appends, passes the full array + * — matches the webapp's `finishIteration` shape. */ + allIterations: readonly MissionIteration[]; + /** The iteration just appended — its `id` is also embedded in every + * PlanStep's intent so the webapp staging-effect can build + * `iteration-scoped` Proposals. */ + newIteration: MissionIteration; + /** When the write happened — used as the per-field updatedAt stamp + * and the sync_changes.created_at fallback. */ + nowIso: string; +} + +/** Actor blob stamped on the sync_changes row. JSON string already — + * we pass it as `json.RawMessage` equivalent through pgx. */ +function systemActorJson(): string { + return JSON.stringify({ kind: 'system', source: 'mission-runner' }); +} + +export async function appendServerIteration(sql: Sql, input: AppendIterationInput): Promise { + const { userId, missionId, allIterations, nowIso } = input; + const fieldsPayload = { + iterations: { value: allIterations, updatedAt: nowIso }, + updatedAt: { value: nowIso, updatedAt: nowIso }, + }; + const fieldTimestamps = { + iterations: nowIso, + updatedAt: nowIso, + }; + // The mana-sync Go handler stores `data` on inserts and `fields` on + // updates — for our update we populate the `data` JSONB with the + // winning values and `field_timestamps` with the per-field stamps. + const data = { + iterations: allIterations, + updatedAt: nowIso, + }; + + // postgres.js's `tx.json()` types are strict about JSONValue; our + // structured MissionIteration[] has readonly fields that confuse the + // inferred type. Cast at the boundary — the JSON serialization still + // happens correctly at runtime. + const dataJson = data as unknown; + const ftJson = fieldTimestamps as unknown; + const actorJson = JSON.parse(systemActorJson()) as unknown; + + await withUser(sql, userId, async (tx) => { + await tx` + INSERT INTO sync_changes + (app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version, actor) + VALUES + ('ai', 'aiMissions', ${missionId}, ${userId}, 'update', + ${tx.json(dataJson as never)}, ${tx.json(ftJson as never)}, + 'mana-ai-runner', 1, ${tx.json(actorJson as never)}) + `; + }); + + // fieldsPayload is kept as a named local so a future refactor that + // needs to emit a `fields`-shaped payload (if mana-sync ever rejects + // `data` for updates) has a ready-made map to send. Current contract + // accepts either. + void fieldsPayload; +} + +/** Convert an {@link AiPlanOutput} from the shared parser into the + * inline-stored MissionIteration shape. */ +export function planToIteration( + plan: AiPlanOutput, + iterationId: string, + nowIso: string +): MissionIteration { + const steps: PlanStep[] = plan.steps.map((ps, i) => ({ + id: `${iterationId}-${i}`, + summary: ps.summary, + intent: { kind: 'toolCall', toolName: ps.toolName, params: ps.params }, + status: 'planned', + })); + return { + id: iterationId, + startedAt: nowIso, + finishedAt: nowIso, + plan: steps, + summary: plan.summary, + overallStatus: plan.steps.length === 0 ? 'approved' : 'awaiting-review', + source: 'server', + }; +}