mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 22:21:10 +02:00
feat(ai): auto-execute server-planned iterations on the client
When mana-ai plans a mission tick in the background, it writes an iteration with source='server' and plan[].status='planned' — the server itself has no Dexie access, so those planned tool_calls have to run on the user's device. This commit adds the missing half. - server-iteration-executor.ts subscribes via Dexie liveQuery to server-sourced iterations with planned steps. For each one it reconstructs the AI actor (mission + agent + iteration), runs every step through executeTool, and writes the result status (approved / failed) back into the iteration. - Idempotency: a new local-only Dexie table `_serverIterationExecutions` (v30) marks iterations we've already run, so sync replays and page reloads don't re-execute. Also guarded by an in-flight Set because liveQuery fires rapidly during the execution's own writes. - Wired into (app)/+layout.svelte alongside startMissionTick — lives for the whole session, stops on teardown. This is the mirror of the old server-iteration-staging.ts but direct- execute instead of proposal-stage — no manual approval step. Same user-facing behaviour as foreground mission runs: they show up in the Workbench Timeline, revertable per iteration. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6d8637b837
commit
728027c478
3 changed files with 177 additions and 0 deletions
|
|
@ -0,0 +1,158 @@
|
|||
/**
|
||||
* Server-iteration executor — applies server-planned iterations on the
|
||||
* user's device.
|
||||
*
|
||||
* mana-ai produces iterations with source='server' and plan[] entries
|
||||
* at status='planned'. The server can't touch Dexie, so those steps
|
||||
* have to execute on the client when sync delivers them. This module
|
||||
* subscribes to the Mission table, picks up newly arrived server
|
||||
* iterations, runs each tool_call through the local executor under
|
||||
* the AI actor, and flips the status in place (planned → approved /
|
||||
* failed). A Dexie marker table gives us idempotency so a sync replay
|
||||
* or a page reload doesn't re-run anything.
|
||||
*
|
||||
* Writes go to Dexie directly rather than through store mutations —
|
||||
* we need to update an iteration embedded in the Mission's iterations
|
||||
* array, which the store API doesn't expose yet.
|
||||
*/
|
||||
|
||||
import { liveQuery, type Observable, type Subscription } from 'dexie';
|
||||
import { db } from '../../database';
|
||||
import { executeTool } from '../../tools/executor';
|
||||
import { getAgent } from '../agents/store';
|
||||
import { makeAgentActor, LEGACY_AI_PRINCIPAL } from '../../events/actor';
|
||||
import { DEFAULT_AGENT_NAME } from '../agents/types';
|
||||
import { MISSIONS_TABLE } from './types';
|
||||
import type { Mission, MissionIteration, PlanStep } from './types';
|
||||
|
||||
/** Local-only Dexie table — tracks iterations we've already run so a
|
||||
* sync replay doesn't re-execute their tool_calls. */
|
||||
const MARKER_TABLE = '_serverIterationExecutions';
|
||||
|
||||
interface ExecutionMarker {
|
||||
iterationId: string;
|
||||
missionId: string;
|
||||
executedAt: string;
|
||||
overallStatus: 'approved' | 'failed';
|
||||
}
|
||||
|
||||
let subscription: Subscription | null = null;
|
||||
/** Per-iteration lock so concurrent subscription ticks don't both
|
||||
* try to execute the same iteration. liveQuery can fire rapidly
|
||||
* on Dexie writes (including ours, during tool execution). */
|
||||
const inFlight = new Set<string>();
|
||||
|
||||
export function startServerIterationExecutor(): void {
|
||||
if (subscription) return;
|
||||
|
||||
const stream: Observable<Array<{ mission: Mission; iteration: MissionIteration }>> = liveQuery(
|
||||
async () => {
|
||||
const missions = await db.table<Mission>(MISSIONS_TABLE).toArray();
|
||||
const work: Array<{ mission: Mission; iteration: MissionIteration }> = [];
|
||||
for (const m of missions) {
|
||||
for (const it of m.iterations) {
|
||||
if (it.source !== 'server') continue;
|
||||
if (!it.plan.some((s) => s.status === 'planned')) continue;
|
||||
work.push({ mission: m, iteration: it });
|
||||
}
|
||||
}
|
||||
return work;
|
||||
}
|
||||
);
|
||||
|
||||
subscription = stream.subscribe({
|
||||
next: async (work) => {
|
||||
for (const entry of work) {
|
||||
if (inFlight.has(entry.iteration.id)) continue;
|
||||
const already = await db.table<ExecutionMarker>(MARKER_TABLE).get(entry.iteration.id);
|
||||
if (already) continue;
|
||||
inFlight.add(entry.iteration.id);
|
||||
try {
|
||||
await executeServerIteration(entry.mission, entry.iteration);
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`[ServerIterationExecutor] mission=${entry.mission.id} iter=${entry.iteration.id} failed:`,
|
||||
err
|
||||
);
|
||||
} finally {
|
||||
inFlight.delete(entry.iteration.id);
|
||||
}
|
||||
}
|
||||
},
|
||||
error: (err) => console.error('[ServerIterationExecutor] stream error:', err),
|
||||
});
|
||||
}
|
||||
|
||||
export function stopServerIterationExecutor(): void {
|
||||
subscription?.unsubscribe();
|
||||
subscription = null;
|
||||
inFlight.clear();
|
||||
}
|
||||
|
||||
/** Internal: run one iteration's planned steps, then rewrite the
|
||||
* Mission record with updated step status + overallStatus. */
|
||||
async function executeServerIteration(
|
||||
mission: Mission,
|
||||
iteration: MissionIteration
|
||||
): Promise<void> {
|
||||
const owningAgent = mission.agentId ? await getAgent(mission.agentId) : null;
|
||||
const aiActor = makeAgentActor({
|
||||
agentId: owningAgent?.id ?? LEGACY_AI_PRINCIPAL,
|
||||
displayName: owningAgent?.name ?? DEFAULT_AGENT_NAME,
|
||||
missionId: mission.id,
|
||||
iterationId: iteration.id,
|
||||
rationale: mission.objective,
|
||||
});
|
||||
|
||||
const nextSteps: PlanStep[] = [];
|
||||
let failed = 0;
|
||||
|
||||
for (const step of iteration.plan) {
|
||||
if (step.status !== 'planned') {
|
||||
nextSteps.push(step);
|
||||
continue;
|
||||
}
|
||||
if (step.intent.kind !== 'toolCall') {
|
||||
nextSteps.push({ ...step, status: 'skipped' });
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const result = await executeTool(step.intent.toolName, step.intent.params, aiActor);
|
||||
nextSteps.push({
|
||||
...step,
|
||||
status: result.success ? 'approved' : 'failed',
|
||||
summary: result.success ? step.summary : `${step.summary} (FEHLER: ${result.message})`,
|
||||
});
|
||||
if (!result.success) failed++;
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
nextSteps.push({ ...step, status: 'failed', summary: `${step.summary} (FEHLER: ${msg})` });
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
|
||||
const overallStatus: MissionIteration['overallStatus'] =
|
||||
nextSteps.length === 0 ? 'approved' : failed === nextSteps.length ? 'failed' : 'approved';
|
||||
|
||||
// Write the updated iteration back into the Mission record.
|
||||
// Dexie's modify() passes a mutable view; the Mission type marks
|
||||
// iterations as readonly at the TS level for observer callers —
|
||||
// cast is scoped to this mutation.
|
||||
await db
|
||||
.table<Mission>(MISSIONS_TABLE)
|
||||
.where('id')
|
||||
.equals(mission.id)
|
||||
.modify((m) => {
|
||||
const iterations = m.iterations as MissionIteration[];
|
||||
const idx = iterations.findIndex((it) => it.id === iteration.id);
|
||||
if (idx < 0) return;
|
||||
iterations[idx] = { ...iteration, plan: nextSteps, overallStatus };
|
||||
});
|
||||
|
||||
await db.table<ExecutionMarker>(MARKER_TABLE).put({
|
||||
iterationId: iteration.id,
|
||||
missionId: mission.id,
|
||||
executedAt: new Date().toISOString(),
|
||||
overallStatus,
|
||||
});
|
||||
}
|
||||
|
|
@ -661,6 +661,17 @@ db.version(29).stores({
|
|||
pendingProposals: null,
|
||||
});
|
||||
|
||||
// v30 — Local-only marker for server-iteration execution. When mana-ai
|
||||
// plans a mission iteration in the background, it syncs down with
|
||||
// source='server' and plan[].status='planned'. A sync-listener on the
|
||||
// client (data/ai/missions/server-iteration-executor.ts) runs those
|
||||
// planned tool_calls locally and flips the status. This marker table
|
||||
// guarantees idempotency across sync replays and page reloads — once
|
||||
// an iteration id lands here, the listener skips it. Never synced.
|
||||
db.version(30).stores({
|
||||
_serverIterationExecutions: 'iterationId, missionId, executedAt',
|
||||
});
|
||||
|
||||
// ─── Sync Routing ──────────────────────────────────────────
|
||||
// SYNC_APP_MAP, TABLE_TO_SYNC_NAME, TABLE_TO_APP, SYNC_NAME_TO_TABLE,
|
||||
// toSyncName() and fromSyncName() are now derived from per-module
|
||||
|
|
|
|||
|
|
@ -8,6 +8,10 @@
|
|||
import { todoReminderSource } from '$lib/modules/todo/reminder-source';
|
||||
import { startEventStore, stopEventStore } from '$lib/data/events/event-store';
|
||||
import { startMissionTick, stopMissionTick } from '$lib/data/ai/missions/setup';
|
||||
import {
|
||||
startServerIterationExecutor,
|
||||
stopServerIterationExecutor,
|
||||
} from '$lib/data/ai/missions/server-iteration-executor';
|
||||
import { initTools } from '$lib/data/tools/init';
|
||||
import { startEventBridge, stopEventBridge } from '$lib/triggers/event-bridge';
|
||||
import { startStreakTracker, stopStreakTracker } from '$lib/data/projections/streaks';
|
||||
|
|
@ -536,6 +540,9 @@
|
|||
// interval and runs any that are due. Safe idempotent; see
|
||||
// data/ai/missions/setup.ts.
|
||||
startMissionTick();
|
||||
// Apply server-planned iterations locally on sync — see
|
||||
// data/ai/missions/server-iteration-executor.ts.
|
||||
startServerIterationExecutor();
|
||||
});
|
||||
|
||||
// Restore nav collapsed state (cheap, keep inline)
|
||||
|
|
@ -650,6 +657,7 @@
|
|||
stopStreakTracker();
|
||||
stopGoalTracker();
|
||||
stopMissionTick();
|
||||
stopServerIterationExecutor();
|
||||
guestMode?.destroy();
|
||||
// Fire-and-forget — we don't need to await; the in-flight task
|
||||
// will finish in the background and the next page session will
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue