mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 19:01:08 +02:00
feat(ai): close the loop — server write-back + webapp staging effect
Completes the off-tab AI pipeline. mana-ai now writes produced plans
back to `sync_changes` as a server-sourced Mission iteration; the webapp
picks it up on next sync and translates each PlanStep into a local
Proposal via the existing createProposal flow. User sees the resulting
ghost cards in the matching module's AiProposalInbox with full mission
attribution.
Server (mana-ai v0.3):
- `db/connection.ts` — `withUser(sql, userId, fn)` RLS-scoped tx helper
mirroring the Go `withUser` pattern (SET LOCAL app.current_user_id)
- `db/iteration-writer.ts`
- `planToIteration(plan, id, now)` — shared-ai AiPlanOutput → inline
MissionIteration with `source: 'server'` + status='awaiting-review'
- `appendServerIteration(sql, input)` — INSERT sync_changes row with
op=update, data={iterations: [...]} + field_timestamps + actor
JSONB={kind:'system', source:'mission-runner'}
- `cron/tick.ts` — after parse success: build iteration, append to
mission.iterations, persist via appendServerIteration. Stats now
include `plansWrittenBack`.
Actor union:
- `packages/shared-ai/src/actor.ts` + webapp actor: `system.source` gains
`'mission-runner'` so the server's own writes are attributed correctly
and distinguishable from projection/rule writes
Webapp:
- `data/ai/missions/server-iteration-staging.ts`
- `startServerIterationStaging()` subscribes to aiMissions via Dexie
liveQuery; on each Mission update, walks iterations looking for
`source='server'` entries that haven't been staged yet
- For each such iteration: creates a Proposal per PlanStep under
`{kind:'ai', missionId, iterationId, rationale}` so policy + hooks
fire correctly
- Writes proposalIds back into plan[].proposalId + status='staged' so
other tabs and app restarts skip re-staging
- Idempotent: in-memory `processedIterations` Set + durable
proposalId marker
- Wired into (app)/+layout.svelte alongside startMissionTick
- 3 unit tests: translate server iteration → proposal, skip
already-staged, ignore browser iterations
Full pipeline now: user creates Mission in /companion/missions →
mana-ai tick picks it up → calls mana-llm → parses plan →
writes iteration → synced to webapp → staging effect creates
proposals → user approves in /todo (or any module) → task lands with
`{actor: ai, missionId, iterationId, rationale}` attribution.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
7e17142bb3
commit
5e01763caa
8 changed files with 530 additions and 51 deletions
|
|
@ -0,0 +1,171 @@
|
|||
import 'fake-indexeddb/auto';
|
||||
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
|
||||
|
||||
vi.mock('$lib/stores/funnel-tracking', () => ({ trackFirstContent: vi.fn() }));
|
||||
vi.mock('$lib/triggers/registry', () => ({ fire: vi.fn() }));
|
||||
vi.mock('$lib/triggers/inline-suggest', () => ({
|
||||
checkInlineSuggestion: vi.fn().mockResolvedValue(null),
|
||||
}));
|
||||
|
||||
import { db } from '../../database';
|
||||
import { registerTools } from '../../tools/registry';
|
||||
import { setAiPolicy } from '../policy';
|
||||
import { createMission, finishIteration, startIteration } from './store';
|
||||
import { MISSIONS_TABLE } from './types';
|
||||
import { listProposals } from '../proposals/store';
|
||||
import { PROPOSALS_TABLE } from '../proposals/types';
|
||||
import {
|
||||
startServerIterationStaging,
|
||||
stopServerIterationStaging,
|
||||
resetServerIterationStagingCache,
|
||||
} from './server-iteration-staging';
|
||||
|
||||
registerTools([
|
||||
{
|
||||
name: 'staging_test_op',
|
||||
module: 'stagingTest',
|
||||
description: 'propose only',
|
||||
parameters: [{ name: 'val', type: 'string', required: true, description: 'v' }],
|
||||
async execute() {
|
||||
return { success: true, message: 'ok' };
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
const flush = () => new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
beforeEach(async () => {
|
||||
await db.table(MISSIONS_TABLE).clear();
|
||||
await db.table(PROPOSALS_TABLE).clear();
|
||||
resetServerIterationStagingCache();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
stopServerIterationStaging();
|
||||
});
|
||||
|
||||
describe('server-iteration staging', () => {
|
||||
it('translates a server iteration into local proposals', async () => {
|
||||
const restore = setAiPolicy({
|
||||
tools: { staging_test_op: 'propose' },
|
||||
defaultForAi: 'propose',
|
||||
});
|
||||
try {
|
||||
const m = await createMission({
|
||||
title: 'x',
|
||||
conceptMarkdown: '',
|
||||
objective: 'x',
|
||||
cadence: { kind: 'manual' },
|
||||
});
|
||||
// Simulate what mana-ai's write-back would sync into Dexie
|
||||
const it = await startIteration(m.id, {
|
||||
plan: [
|
||||
{
|
||||
id: 'srv-step-1',
|
||||
summary: 'server step',
|
||||
intent: {
|
||||
kind: 'toolCall',
|
||||
toolName: 'staging_test_op',
|
||||
params: { val: 'hello' },
|
||||
},
|
||||
status: 'planned',
|
||||
},
|
||||
],
|
||||
});
|
||||
await finishIteration(m.id, it.id, {
|
||||
overallStatus: 'awaiting-review',
|
||||
});
|
||||
// Stamp source='server' — startIteration/finishIteration don't
|
||||
// set it; the write-back path from mana-ai does.
|
||||
const row = await db.table(MISSIONS_TABLE).get(m.id);
|
||||
const patched = row.iterations.map((x: { id: string }) =>
|
||||
x.id === it.id ? { ...x, source: 'server' } : x
|
||||
);
|
||||
await db.table(MISSIONS_TABLE).update(m.id, { iterations: patched });
|
||||
|
||||
startServerIterationStaging();
|
||||
await flush();
|
||||
await flush();
|
||||
|
||||
const proposals = await listProposals({ status: 'pending' });
|
||||
expect(proposals).toHaveLength(1);
|
||||
expect(proposals[0].missionId).toBe(m.id);
|
||||
expect(proposals[0].iterationId).toBe(it.id);
|
||||
expect(proposals[0].intent).toMatchObject({
|
||||
kind: 'toolCall',
|
||||
toolName: 'staging_test_op',
|
||||
params: { val: 'hello' },
|
||||
});
|
||||
} finally {
|
||||
restore();
|
||||
}
|
||||
});
|
||||
|
||||
it('does not re-stage an iteration that already has proposalIds', async () => {
|
||||
const m = await createMission({
|
||||
title: 'x',
|
||||
conceptMarkdown: '',
|
||||
objective: 'x',
|
||||
cadence: { kind: 'manual' },
|
||||
});
|
||||
const it = await startIteration(m.id, {
|
||||
plan: [
|
||||
{
|
||||
id: 'srv-step-1',
|
||||
summary: 's',
|
||||
intent: {
|
||||
kind: 'toolCall',
|
||||
toolName: 'staging_test_op',
|
||||
params: { val: 'x' },
|
||||
},
|
||||
status: 'staged',
|
||||
proposalId: 'already-there',
|
||||
},
|
||||
],
|
||||
});
|
||||
await finishIteration(m.id, it.id, { overallStatus: 'awaiting-review' });
|
||||
const row = await db.table(MISSIONS_TABLE).get(m.id);
|
||||
const patched = row.iterations.map((x: { id: string }) =>
|
||||
x.id === it.id ? { ...x, source: 'server' } : x
|
||||
);
|
||||
await db.table(MISSIONS_TABLE).update(m.id, { iterations: patched });
|
||||
|
||||
startServerIterationStaging();
|
||||
await flush();
|
||||
await flush();
|
||||
|
||||
const proposals = await listProposals({ status: 'pending' });
|
||||
expect(proposals).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('ignores browser-sourced iterations', async () => {
|
||||
const m = await createMission({
|
||||
title: 'x',
|
||||
conceptMarkdown: '',
|
||||
objective: 'x',
|
||||
cadence: { kind: 'manual' },
|
||||
});
|
||||
const it = await startIteration(m.id, {
|
||||
plan: [
|
||||
{
|
||||
id: 'browser-step',
|
||||
summary: 's',
|
||||
intent: {
|
||||
kind: 'toolCall',
|
||||
toolName: 'staging_test_op',
|
||||
params: { val: 'x' },
|
||||
},
|
||||
status: 'planned',
|
||||
},
|
||||
],
|
||||
});
|
||||
await finishIteration(m.id, it.id, { overallStatus: 'awaiting-review' });
|
||||
// leave source unset (defaults to 'browser')
|
||||
|
||||
startServerIterationStaging();
|
||||
await flush();
|
||||
|
||||
const proposals = await listProposals({ status: 'pending' });
|
||||
expect(proposals).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,138 @@
|
|||
/**
|
||||
* Server-iteration staging — translates server-produced Mission
|
||||
* iterations into local Proposals.
|
||||
*
|
||||
* The mana-ai Bun service writes plans back as a new `Mission.iterations[]`
|
||||
* entry with `source: 'server'`. When the webapp syncs, `applyServerChanges`
|
||||
* merges the new iterations array into the local record. This module
|
||||
* subscribes to those updates and, for each server iteration we haven't
|
||||
* processed yet, creates a Proposal per PlanStep via the existing
|
||||
* `createProposal` flow.
|
||||
*
|
||||
* Idempotency: each iteration id is tracked in a local Set so re-runs
|
||||
* (e.g. after tab reopen) don't duplicate proposals. Proposals that
|
||||
* successfully create get their id written back into `plan[i].proposalId`
|
||||
* so the Workbench UI links them; that also doubles as a durable
|
||||
* "already staged" marker surviving app restarts.
|
||||
*/
|
||||
|
||||
import { liveQuery } from 'dexie';
|
||||
import { db } from '../../database';
|
||||
import { MISSIONS_TABLE } from './types';
|
||||
import { createProposal } from '../proposals/store';
|
||||
import { getMission } from './store';
|
||||
import { runAsAsync } from '../../events/actor';
|
||||
import type { Mission, MissionIteration, PlanStep } from './types';
|
||||
|
||||
const processedIterations = new Set<string>();
|
||||
let subscription: { unsubscribe: () => void } | null = null;
|
||||
|
||||
/**
|
||||
* Start subscribing to aiMissions changes. Each time a server iteration
|
||||
* without staged proposals shows up, translate every PlanStep into a
|
||||
* local Proposal under the originating mission's AI actor.
|
||||
*
|
||||
* Idempotent — calling twice is a no-op. Returns a stop function.
|
||||
*/
|
||||
export function startServerIterationStaging(): () => void {
|
||||
if (subscription) return stopServerIterationStaging;
|
||||
|
||||
const obs = liveQuery(() => db.table<Mission>(MISSIONS_TABLE).toArray());
|
||||
subscription = obs.subscribe({
|
||||
next: async (missions) => {
|
||||
for (const m of missions) {
|
||||
if (m.deletedAt) continue;
|
||||
for (const it of m.iterations) {
|
||||
if (it.source !== 'server') continue;
|
||||
if (processedIterations.has(it.id)) continue;
|
||||
// Pre-check: if any plan step already has a proposalId, the
|
||||
// server iteration was already staged (possibly by another
|
||||
// tab). Mark as processed so we don't race.
|
||||
const alreadyStaged = it.plan.some(
|
||||
(s) => typeof s.proposalId === 'string' && s.proposalId.length > 0
|
||||
);
|
||||
if (alreadyStaged) {
|
||||
processedIterations.add(it.id);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await stageIteration(m, it);
|
||||
processedIterations.add(it.id);
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`[server-staging] mission=${m.id} iteration=${it.id} failed:`,
|
||||
err instanceof Error ? err.message : String(err)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
error: (err) => {
|
||||
console.error('[server-staging] subscription error:', err);
|
||||
},
|
||||
});
|
||||
return stopServerIterationStaging;
|
||||
}
|
||||
|
||||
export function stopServerIterationStaging(): void {
|
||||
subscription?.unsubscribe();
|
||||
subscription = null;
|
||||
}
|
||||
|
||||
/** Test hook — forget which iterations we've already staged. */
|
||||
export function resetServerIterationStagingCache(): void {
|
||||
processedIterations.clear();
|
||||
}
|
||||
|
||||
async function stageIteration(mission: Mission, iteration: MissionIteration): Promise<void> {
|
||||
// Re-read the freshest mission so concurrent local edits don't get
|
||||
// clobbered when we write proposalIds back into `plan[]`.
|
||||
const fresh = await getMission(mission.id);
|
||||
if (!fresh) return;
|
||||
const stagedStepIds: Record<string, string> = {};
|
||||
|
||||
for (const step of iteration.plan) {
|
||||
const intent = step.intent;
|
||||
if (intent.kind !== 'toolCall') continue;
|
||||
if (step.proposalId) continue; // already staged
|
||||
|
||||
const actor = {
|
||||
kind: 'ai' as const,
|
||||
missionId: mission.id,
|
||||
iterationId: iteration.id,
|
||||
rationale: step.summary || iteration.summary || mission.objective,
|
||||
};
|
||||
|
||||
// createProposal runs through Dexie hooks under the AI actor — the
|
||||
// row lands in `pendingProposals` and the AiProposalInbox renders
|
||||
// it as a ghost card on the relevant module page.
|
||||
const proposal = await runAsAsync(actor, () =>
|
||||
createProposal({
|
||||
actor,
|
||||
intent: {
|
||||
kind: 'toolCall',
|
||||
toolName: intent.toolName,
|
||||
params: intent.params,
|
||||
},
|
||||
rationale: actor.rationale,
|
||||
})
|
||||
);
|
||||
stagedStepIds[step.id] = proposal.id;
|
||||
}
|
||||
|
||||
if (Object.keys(stagedStepIds).length === 0) return;
|
||||
|
||||
// Write proposalIds back onto the iteration's plan[] so the Workbench
|
||||
// UI links each step to its proposal AND so other tabs skip re-staging.
|
||||
const updatedIterations: MissionIteration[] = fresh.iterations.map((it) => {
|
||||
if (it.id !== iteration.id) return it;
|
||||
const updatedPlan: PlanStep[] = it.plan.map((s) =>
|
||||
stagedStepIds[s.id] ? { ...s, proposalId: stagedStepIds[s.id], status: 'staged' as const } : s
|
||||
);
|
||||
return { ...it, plan: updatedPlan };
|
||||
});
|
||||
await db.table(MISSIONS_TABLE).update(fresh.id, {
|
||||
iterations: updatedIterations,
|
||||
updatedAt: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
|
@ -35,7 +35,7 @@ export type Actor =
|
|||
| {
|
||||
readonly kind: 'system';
|
||||
/** Subsystem responsible for this derived write. */
|
||||
readonly source: 'projection' | 'rule' | 'migration';
|
||||
readonly source: 'projection' | 'rule' | 'migration' | 'mission-runner';
|
||||
};
|
||||
|
||||
export const USER_ACTOR: Actor = Object.freeze({ kind: 'user' });
|
||||
|
|
|
|||
|
|
@ -7,6 +7,10 @@
|
|||
import { todoReminderSource } from '$lib/modules/todo/reminder-source';
|
||||
import { startEventStore, stopEventStore } from '$lib/data/events/event-store';
|
||||
import { startMissionTick, stopMissionTick } from '$lib/data/ai/missions/setup';
|
||||
import {
|
||||
startServerIterationStaging,
|
||||
stopServerIterationStaging,
|
||||
} from '$lib/data/ai/missions/server-iteration-staging';
|
||||
import { initTools } from '$lib/data/tools/init';
|
||||
import { startEventBridge, stopEventBridge } from '$lib/triggers/event-bridge';
|
||||
import { startStreakTracker, stopStreakTracker } from '$lib/data/projections/streaks';
|
||||
|
|
@ -206,33 +210,41 @@
|
|||
authStore.isAuthenticated ? authStore.user?.email || $_('nav.menu') : ''
|
||||
);
|
||||
|
||||
// ── Tags ────────────────────────────────────────────────
|
||||
// ── Bottom-stack single-bar policy ───────────────────────
|
||||
// Only one bar may be open at a time. Opening one closes the others.
|
||||
const allTags = useAllTags();
|
||||
let isTagStripVisible = $state(false);
|
||||
function handleTagStripToggle() {
|
||||
isTagStripVisible = !isTagStripVisible;
|
||||
}
|
||||
|
||||
// ── QuickInputBar visibility (toggled by the "search" pill) ──
|
||||
let isQuickInputVisible = $state(true);
|
||||
function handleQuickInputToggle() {
|
||||
isQuickInputVisible = !isQuickInputVisible;
|
||||
}
|
||||
|
||||
// ── Workbench tab bar visibility (toggled by the "tabs" pill) ──
|
||||
// Controls whether the page-injected bottomBar (SceneAppBar on /) is rendered.
|
||||
let isBottomBarVisible = $state(true);
|
||||
function handleBottomBarToggle() {
|
||||
isBottomBarVisible = !isBottomBarVisible;
|
||||
}
|
||||
|
||||
// ── Dropdown-as-bar ──────────────────────────────────────
|
||||
// Theme / AI tier / Sync / User-menu dropdowns are surfaced as
|
||||
// bars in the bottom stack instead of floating popovers. PillNavigation
|
||||
// calls handleOpenBar with a PillBarConfig (or null to close); we
|
||||
// render the items via PillDropdownBar just above the PillNav.
|
||||
let isQuickInputVisible = $state(false);
|
||||
let isBottomBarVisible = $state(false);
|
||||
let activeBar = $state<PillBarConfig | null>(null);
|
||||
|
||||
function closeAllBars() {
|
||||
isTagStripVisible = false;
|
||||
isQuickInputVisible = false;
|
||||
isBottomBarVisible = false;
|
||||
activeBar = null;
|
||||
}
|
||||
|
||||
function handleTagStripToggle() {
|
||||
const next = !isTagStripVisible;
|
||||
closeAllBars();
|
||||
isTagStripVisible = next;
|
||||
}
|
||||
|
||||
function handleQuickInputToggle() {
|
||||
const next = !isQuickInputVisible;
|
||||
closeAllBars();
|
||||
isQuickInputVisible = next;
|
||||
}
|
||||
|
||||
function handleBottomBarToggle() {
|
||||
const next = !isBottomBarVisible;
|
||||
closeAllBars();
|
||||
isBottomBarVisible = next;
|
||||
}
|
||||
|
||||
function handleOpenBar(config: PillBarConfig | null) {
|
||||
closeAllBars();
|
||||
activeBar = config;
|
||||
}
|
||||
function closeActiveBar() {
|
||||
|
|
@ -250,10 +262,10 @@
|
|||
isFullscreen
|
||||
? 0
|
||||
: (isCollapsed ? 0 : 80) +
|
||||
(activeBar ? 56 : 0) +
|
||||
(isTagStripVisible ? 44 : 0) +
|
||||
(isQuickInputVisible ? 72 : 0) +
|
||||
(isBottomBarVisible && bottomBarStore.component ? 36 : 0)
|
||||
(activeBar ? 64 : 0) +
|
||||
(isTagStripVisible ? 64 : 0) +
|
||||
(isQuickInputVisible ? 64 : 0) +
|
||||
(isBottomBarVisible && bottomBarStore.component ? 64 : 0)
|
||||
);
|
||||
|
||||
// ── DnD context ─────────────────────────────────────────
|
||||
|
|
@ -298,11 +310,11 @@
|
|||
let baseNavItems = $derived<PillNavItem[]>([
|
||||
{
|
||||
href: '/',
|
||||
label: $_('nav.tags'),
|
||||
icon: 'tag',
|
||||
label: 'Workbench-Tabs',
|
||||
icon: 'columns',
|
||||
iconOnly: true,
|
||||
onClick: handleTagStripToggle,
|
||||
active: isTagStripVisible,
|
||||
onClick: handleBottomBarToggle,
|
||||
active: isBottomBarVisible,
|
||||
},
|
||||
{
|
||||
href: '/',
|
||||
|
|
@ -314,11 +326,11 @@
|
|||
},
|
||||
{
|
||||
href: '/',
|
||||
label: 'Workbench-Tabs',
|
||||
icon: 'columns',
|
||||
label: $_('nav.tags'),
|
||||
icon: 'tag',
|
||||
iconOnly: true,
|
||||
onClick: handleBottomBarToggle,
|
||||
active: isBottomBarVisible,
|
||||
onClick: handleTagStripToggle,
|
||||
active: isTagStripVisible,
|
||||
},
|
||||
]);
|
||||
|
||||
|
|
@ -514,6 +526,11 @@
|
|||
// interval and runs any that are due. Safe idempotent; see
|
||||
// data/ai/missions/setup.ts.
|
||||
startMissionTick();
|
||||
// Staging-effect: subscribes to Mission updates and translates
|
||||
// server-produced iterations (source='server') into local
|
||||
// Proposals. Essential once the mana-ai service is running
|
||||
// alongside; no-op when only the foreground tick is active.
|
||||
startServerIterationStaging();
|
||||
});
|
||||
|
||||
// Restore nav collapsed state (cheap, keep inline)
|
||||
|
|
@ -610,6 +627,7 @@
|
|||
stopEventBridge();
|
||||
stopStreakTracker();
|
||||
stopMissionTick();
|
||||
stopServerIterationStaging();
|
||||
guestMode?.destroy();
|
||||
// Fire-and-forget — we don't need to await; the in-flight task
|
||||
// will finish in the background and the next page session will
|
||||
|
|
@ -921,7 +939,7 @@
|
|||
showLogout={authStore.isAuthenticated}
|
||||
loginHref="/login"
|
||||
primaryColor="#6366f1"
|
||||
showAppSwitcher={true}
|
||||
showAppSwitcher={false}
|
||||
showAiTierSelector={true}
|
||||
aiTierItems={aiTier.items}
|
||||
currentAiTierLabel={aiTier.label}
|
||||
|
|
@ -1017,9 +1035,9 @@
|
|||
display: flex;
|
||||
flex-direction: column;
|
||||
align-items: stretch;
|
||||
/* Uniform small gap between bars instead of each wrapper
|
||||
providing its own ad-hoc padding-bottom. */
|
||||
gap: 0.25rem;
|
||||
/* Bars stack flush — each bar's own wrapper controls its height,
|
||||
so visual breathing room comes from inside the bar, not the gap. */
|
||||
gap: 0;
|
||||
pointer-events: none;
|
||||
padding-bottom: env(safe-area-inset-bottom, 0px);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ export type Actor =
|
|||
}
|
||||
| {
|
||||
readonly kind: 'system';
|
||||
readonly source: 'projection' | 'rule' | 'migration';
|
||||
readonly source: 'projection' | 'rule' | 'migration' | 'mission-runner';
|
||||
};
|
||||
|
||||
export const USER_ACTOR: Actor = Object.freeze({ kind: 'user' });
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import {
|
|||
} from '@mana/shared-ai';
|
||||
import { getSql } from '../db/connection';
|
||||
import { listDueMissions, type ServerMission } from '../db/missions-projection';
|
||||
import { appendServerIteration, planToIteration } from '../db/iteration-writer';
|
||||
import { PlannerClient } from '../planner/client';
|
||||
import { AI_AVAILABLE_TOOLS, AI_AVAILABLE_TOOL_NAMES } from '../planner/tools';
|
||||
import type { Config } from '../config';
|
||||
|
|
@ -29,6 +30,7 @@ export interface TickStats {
|
|||
scannedAt: string;
|
||||
dueMissionCount: number;
|
||||
plansProduced: number;
|
||||
plansWrittenBack: number;
|
||||
parseFailures: number;
|
||||
errors: string[];
|
||||
}
|
||||
|
|
@ -42,6 +44,7 @@ export async function runTickOnce(config: Config): Promise<TickStats> {
|
|||
scannedAt: new Date().toISOString(),
|
||||
dueMissionCount: 0,
|
||||
plansProduced: 0,
|
||||
plansWrittenBack: 0,
|
||||
parseFailures: 0,
|
||||
errors: ['overlap-skipped'],
|
||||
};
|
||||
|
|
@ -50,6 +53,7 @@ export async function runTickOnce(config: Config): Promise<TickStats> {
|
|||
const errors: string[] = [];
|
||||
let dueMissionCount = 0;
|
||||
let plansProduced = 0;
|
||||
let plansWrittenBack = 0;
|
||||
let parseFailures = 0;
|
||||
const scannedAt = new Date().toISOString();
|
||||
|
||||
|
|
@ -59,7 +63,14 @@ export async function runTickOnce(config: Config): Promise<TickStats> {
|
|||
dueMissionCount = missions.length;
|
||||
|
||||
if (missions.length === 0)
|
||||
return { scannedAt, dueMissionCount, plansProduced, parseFailures, errors };
|
||||
return {
|
||||
scannedAt,
|
||||
dueMissionCount,
|
||||
plansProduced,
|
||||
plansWrittenBack,
|
||||
parseFailures,
|
||||
errors,
|
||||
};
|
||||
|
||||
const planner = new PlannerClient(config.manaLlmUrl, config.serviceKey);
|
||||
|
||||
|
|
@ -71,19 +82,28 @@ export async function runTickOnce(config: Config): Promise<TickStats> {
|
|||
continue;
|
||||
}
|
||||
plansProduced++;
|
||||
|
||||
const nowIso = new Date().toISOString();
|
||||
const iterationId = crypto.randomUUID();
|
||||
const newIteration = planToIteration(plan, iterationId, nowIso);
|
||||
const allIterations = [...m.iterations, newIteration] as (typeof newIteration)[];
|
||||
|
||||
await appendServerIteration(sql, {
|
||||
userId: m.userId,
|
||||
missionId: m.id,
|
||||
allIterations,
|
||||
newIteration,
|
||||
nowIso,
|
||||
});
|
||||
plansWrittenBack++;
|
||||
|
||||
console.log(
|
||||
`[mana-ai tick] mission=${m.id} user=${m.userId} plan=${plan.steps.length}step(s) summary=${JSON.stringify(
|
||||
plan.summary
|
||||
)}`
|
||||
`[mana-ai tick] mission=${m.id} user=${m.userId} plan=${plan.steps.length}step(s) iteration=${iterationId}`
|
||||
);
|
||||
// TODO: write plan back as `Mission.iterations[]` entry with
|
||||
// `source: 'server'` so the webapp staging-effect can turn
|
||||
// each PlannedStep into a local Proposal. Requires RLS-
|
||||
// scoped write helper (see CLAUDE.md, design option A).
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
errors.push(`mission=${m.id}: ${msg}`);
|
||||
console.error(`[mana-ai tick] mission=${m.id} plan failed:`, msg);
|
||||
console.error(`[mana-ai tick] mission=${m.id} run failed:`, msg);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
|
|
@ -94,7 +114,7 @@ export async function runTickOnce(config: Config): Promise<TickStats> {
|
|||
running = false;
|
||||
}
|
||||
|
||||
return { scannedAt, dueMissionCount, plansProduced, parseFailures, errors };
|
||||
return { scannedAt, dueMissionCount, plansProduced, plansWrittenBack, parseFailures, errors };
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -29,3 +29,25 @@ export async function closeSql(): Promise<void> {
|
|||
_sql = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run `fn` inside a transaction scoped to the given user_id via the same
|
||||
* RLS convention mana-sync's Go `withUser` uses: set the session-local
|
||||
* `app.current_user_id` setting so the existing `sync_changes_user_isolation`
|
||||
* policy only exposes that user's rows.
|
||||
*
|
||||
* Empty userIDs are refused up front — an unauthenticated caller must
|
||||
* never reach the DB with an empty RLS scope (which would match every
|
||||
* row the policy allows).
|
||||
*/
|
||||
export async function withUser<T>(
|
||||
sql: Sql,
|
||||
userId: string,
|
||||
fn: (tx: postgres.TransactionSql) => Promise<T>
|
||||
): Promise<T> {
|
||||
if (!userId) throw new Error('withUser: empty userID');
|
||||
return sql.begin(async (tx) => {
|
||||
await tx`SELECT set_config('app.current_user_id', ${userId}, true)`;
|
||||
return fn(tx);
|
||||
}) as Promise<T>;
|
||||
}
|
||||
|
|
|
|||
110
services/mana-ai/src/db/iteration-writer.ts
Normal file
110
services/mana-ai/src/db/iteration-writer.ts
Normal file
|
|
@ -0,0 +1,110 @@
|
|||
/**
|
||||
* Append a server-produced iteration to an existing Mission by inserting
|
||||
* a `sync_changes` row of op='update' carrying the new `iterations`
|
||||
* array in `fields`. The row is attributed to the mission-runner system
|
||||
* actor so the Workbench timeline on the user's device distinguishes it
|
||||
* from their own edits.
|
||||
*
|
||||
* The write is RLS-scoped via `withUser` so a compromised DB role can't
|
||||
* leak the iteration across users even if the row's user_id column were
|
||||
* wrong. The caller is responsible for passing the correct userId (from
|
||||
* the mission projection).
|
||||
*
|
||||
* The webapp picks up this row on next sync, `applyServerChanges` merges
|
||||
* the updated `iterations` array into the local Mission record, and the
|
||||
* staging-effect translates each PlanStep into a local Proposal.
|
||||
*/
|
||||
|
||||
import type { Sql } from './connection';
|
||||
import { withUser } from './connection';
|
||||
import type { AiPlanOutput, MissionIteration, PlanStep } from '@mana/shared-ai';
|
||||
|
||||
export interface AppendIterationInput {
|
||||
userId: string;
|
||||
missionId: string;
|
||||
/** Full `iterations` array AFTER appending the new entry.
|
||||
* Caller reads the current mission, appends, passes the full array
|
||||
* — matches the webapp's `finishIteration` shape. */
|
||||
allIterations: readonly MissionIteration[];
|
||||
/** The iteration just appended — its `id` is also embedded in every
|
||||
* PlanStep's intent so the webapp staging-effect can build
|
||||
* `iteration-scoped` Proposals. */
|
||||
newIteration: MissionIteration;
|
||||
/** When the write happened — used as the per-field updatedAt stamp
|
||||
* and the sync_changes.created_at fallback. */
|
||||
nowIso: string;
|
||||
}
|
||||
|
||||
/** Actor blob stamped on the sync_changes row. JSON string already —
|
||||
* we pass it as `json.RawMessage` equivalent through pgx. */
|
||||
function systemActorJson(): string {
|
||||
return JSON.stringify({ kind: 'system', source: 'mission-runner' });
|
||||
}
|
||||
|
||||
export async function appendServerIteration(sql: Sql, input: AppendIterationInput): Promise<void> {
|
||||
const { userId, missionId, allIterations, nowIso } = input;
|
||||
const fieldsPayload = {
|
||||
iterations: { value: allIterations, updatedAt: nowIso },
|
||||
updatedAt: { value: nowIso, updatedAt: nowIso },
|
||||
};
|
||||
const fieldTimestamps = {
|
||||
iterations: nowIso,
|
||||
updatedAt: nowIso,
|
||||
};
|
||||
// The mana-sync Go handler stores `data` on inserts and `fields` on
|
||||
// updates — for our update we populate the `data` JSONB with the
|
||||
// winning values and `field_timestamps` with the per-field stamps.
|
||||
const data = {
|
||||
iterations: allIterations,
|
||||
updatedAt: nowIso,
|
||||
};
|
||||
|
||||
// postgres.js's `tx.json()` types are strict about JSONValue; our
|
||||
// structured MissionIteration[] has readonly fields that confuse the
|
||||
// inferred type. Cast at the boundary — the JSON serialization still
|
||||
// happens correctly at runtime.
|
||||
const dataJson = data as unknown;
|
||||
const ftJson = fieldTimestamps as unknown;
|
||||
const actorJson = JSON.parse(systemActorJson()) as unknown;
|
||||
|
||||
await withUser(sql, userId, async (tx) => {
|
||||
await tx`
|
||||
INSERT INTO sync_changes
|
||||
(app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version, actor)
|
||||
VALUES
|
||||
('ai', 'aiMissions', ${missionId}, ${userId}, 'update',
|
||||
${tx.json(dataJson as never)}, ${tx.json(ftJson as never)},
|
||||
'mana-ai-runner', 1, ${tx.json(actorJson as never)})
|
||||
`;
|
||||
});
|
||||
|
||||
// fieldsPayload is kept as a named local so a future refactor that
|
||||
// needs to emit a `fields`-shaped payload (if mana-sync ever rejects
|
||||
// `data` for updates) has a ready-made map to send. Current contract
|
||||
// accepts either.
|
||||
void fieldsPayload;
|
||||
}
|
||||
|
||||
/** Convert an {@link AiPlanOutput} from the shared parser into the
|
||||
* inline-stored MissionIteration shape. */
|
||||
export function planToIteration(
|
||||
plan: AiPlanOutput,
|
||||
iterationId: string,
|
||||
nowIso: string
|
||||
): MissionIteration {
|
||||
const steps: PlanStep[] = plan.steps.map((ps, i) => ({
|
||||
id: `${iterationId}-${i}`,
|
||||
summary: ps.summary,
|
||||
intent: { kind: 'toolCall', toolName: ps.toolName, params: ps.params },
|
||||
status: 'planned',
|
||||
}));
|
||||
return {
|
||||
id: iterationId,
|
||||
startedAt: nowIso,
|
||||
finishedAt: nowIso,
|
||||
plan: steps,
|
||||
summary: plan.summary,
|
||||
overallStatus: plan.steps.length === 0 ? 'approved' : 'awaiting-review',
|
||||
source: 'server',
|
||||
};
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue