feat(agent-loop): M1 — policy gate + reminder channel + parallel reads
Three Claude-Code-inspired primitives for runPlannerLoop, derived from the
reverse-engineering reports in docs/reports/:
1. **Policy gate** (@mana/tool-registry) — evaluatePolicy() gates every tool
   dispatch: it denies admin-scope tools, denies destructive tools that are
   not on the user's opt-in list, rate-limits per tool (default 30 calls per
   60 s), and flags prompt-injection markers in freetext without blocking.
   Wired into mana-mcp with a per-user rolling invocation log and a
   POLICY_MODE env var (off|log-only|enforce, default log-only); a sketch of
   the gating rules follows this list. mana-ai uses detectInjectionMarker
   only — tool dispatch there is plan-only, so the rate-limit and
   destructive checks don't apply yet.
2. **Reminder channel** (packages/shared-ai/src/planner/loop.ts) — new
   reminderChannel callback in PlannerLoopInput. Called once per round with a
   LoopState snapshot (round, toolCallCount, usage, lastCall); returned
   strings are wrapped in <reminder> tags and injected as transient system
   messages into THIS LLM request only. They are never pushed to messages[]
   — the Claude-Code <system-reminder> pattern that keeps the KV-cache
   prefix stable.
3. **Parallel reads** (loop.ts) — an isParallelSafe predicate enables
   Promise.all dispatch when every tool_call in a round is parallel-safe,
   in batches of PARALLEL_TOOL_BATCH_SIZE=10. Any non-safe call downgrades
   the whole round to sequential. Results always append to messages[] in
   source order, never completion order, so the debug log stays linear.
   Default-off (an undefined predicate) preserves pre-M1 behaviour.
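Since evaluatePolicy's exact signature is not shown in this diff, the
following is a minimal self-contained sketch of the gating rules described
in (1). The names (GateConfig, gate, destructiveOptIn) and the
injection-marker regex are illustrative assumptions, not the actual
@mana/tool-registry API.

// Illustrative sketch only: approximates the rules evaluatePolicy enforces,
// per the commit message above. Not the real @mana/tool-registry code.
type PolicyMode = 'off' | 'log-only' | 'enforce';

interface GateConfig {
  mode: PolicyMode;              // POLICY_MODE env: off | log-only | enforce
  destructiveOptIn: Set<string>; // destructive tools the user opted into
  maxCalls: number;              // default 30
  windowMs: number;              // default 60_000
}

type GateVerdict =
  | { deny: true; reason: string }
  | { deny: false; injectionFlagged: boolean };

function gate(
  cfg: GateConfig,
  tool: { name: string; destructive: boolean; adminScope: boolean },
  freetext: string,
  recentCallTimes: number[], // per-user rolling invocation log for this tool
  now = Date.now(),
): GateVerdict {
  if (tool.adminScope) return { deny: true, reason: 'admin scope' };
  if (tool.destructive && !cfg.destructiveOptIn.has(tool.name)) {
    return { deny: true, reason: 'destructive tool without opt-in' };
  }
  const inWindow = recentCallTimes.filter((t) => now - t < cfg.windowMs);
  if (inWindow.length >= cfg.maxCalls) {
    return { deny: true, reason: `rate limit: ${cfg.maxCalls} per ${cfg.windowMs}ms` };
  }
  // Injection markers are flagged, never blocking (assumed marker list).
  const injectionFlagged =
    /<system-reminder>|ignore (all )?previous instructions/i.test(freetext);
  return { deny: false, injectionFlagged };
}

// In enforce mode a deny verdict blocks dispatch; in log-only mode the
// caller logs the verdict and dispatches anyway; off skips the gate.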
Tests: 21 new in tool-registry (policy), 9 new in shared-ai (5 parallel,
4 reminder). All 74 green, type-check clean across 4 packages.
Design/plan: docs/plans/agent-loop-improvements-m1.md
Reports: docs/reports/claude-code-architecture.md,
docs/reports/mana-agent-improvements-from-claude-code.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 493db0c3b2
commit e5d230e599
19 changed files with 2550 additions and 29 deletions
@@ -20,9 +20,11 @@ export type {
   LlmCompletionRequest,
   LlmCompletionResponse,
   LlmFinishReason,
+  LoopState,
   LoopStopReason,
   PlannerLoopInput,
   PlannerLoopResult,
+  ReminderChannel,
   TokenUsage,
   ToolCallRequest,
   ToolResult,
@@ -148,3 +148,302 @@ describe('runPlannerLoop', () => {
    expect(result.executedCalls).toHaveLength(3);
  });
});

describe('runPlannerLoop — parallel reads', () => {
  it('runs a batch of parallel-safe tools via Promise.all', async () => {
    const llm = new MockLlmClient()
      .enqueueToolCalls([
        { name: 'list_things', args: { i: 1 } },
        { name: 'list_things', args: { i: 2 } },
        { name: 'list_things', args: { i: 3 } },
      ])
      .enqueueStop();

    let concurrent = 0;
    let peakConcurrent = 0;
    let completed = 0;
    const onToolCall = async (_call: ToolCallRequest): Promise<ToolResult> => {
      concurrent++;
      peakConcurrent = Math.max(peakConcurrent, concurrent);
      await new Promise((r) => setTimeout(r, 10));
      concurrent--;
      completed++;
      return { success: true, message: `done-${completed}` };
    };

    await runPlannerLoop({
      llm,
      input: {
        systemPrompt: 's',
        userPrompt: 'u',
        tools,
        model: 'm',
        isParallelSafe: (name) => name === 'list_things',
      },
      onToolCall,
    });

    // All three ran concurrently — peak should be 3, not 1.
    expect(peakConcurrent).toBe(3);
  });

  it('preserves source order in messages despite parallel completion', async () => {
    const llm = new MockLlmClient()
      .enqueueToolCalls([
        { name: 'list_things', args: { i: 'a' } },
        { name: 'list_things', args: { i: 'b' } },
        { name: 'list_things', args: { i: 'c' } },
      ])
      .enqueueStop();

    // Reverse completion order: first call finishes last.
    const delays: Record<string, number> = { a: 30, b: 10, c: 1 };
    const onToolCall = async (call: ToolCallRequest): Promise<ToolResult> => {
      const i = call.arguments.i as string;
      await new Promise((r) => setTimeout(r, delays[i]));
      return { success: true, message: `item-${i}` };
    };

    const result = await runPlannerLoop({
      llm,
      input: {
        systemPrompt: 's',
        userPrompt: 'u',
        tools,
        model: 'm',
        isParallelSafe: () => true,
      },
      onToolCall,
    });

    // executedCalls follows source order
    expect(result.executedCalls.map((ec) => ec.call.arguments.i)).toEqual(['a', 'b', 'c']);

    // Tool messages on the NEXT LLM call are in source order too
    const toolMsgs = llm.calls[1].messages.filter((m) => m.role === 'tool');
    expect(toolMsgs.map((m) => m.content)).toEqual([
      expect.stringContaining('item-a'),
      expect.stringContaining('item-b'),
      expect.stringContaining('item-c'),
    ]);
  });

  it('falls back to sequential when any call is not parallel-safe', async () => {
    const llm = new MockLlmClient()
      .enqueueToolCalls([
        { name: 'list_things', args: {} },
        { name: 'create_thing', args: { title: 'x' } }, // unsafe
        { name: 'list_things', args: {} },
      ])
      .enqueueStop();

    let concurrent = 0;
    let peakConcurrent = 0;
    const onToolCall = async (): Promise<ToolResult> => {
      concurrent++;
      peakConcurrent = Math.max(peakConcurrent, concurrent);
      await new Promise((r) => setTimeout(r, 5));
      concurrent--;
      return { success: true, message: 'ok' };
    };

    await runPlannerLoop({
      llm,
      input: {
        systemPrompt: 's',
        userPrompt: 'u',
        tools,
        model: 'm',
        isParallelSafe: (name) => name === 'list_things',
      },
      onToolCall,
    });

    // Mixed batch ran sequentially — peak concurrency stayed at 1.
    expect(peakConcurrent).toBe(1);
  });

  it('batches more than PARALLEL_TOOL_BATCH_SIZE calls', async () => {
    const N = 15; // > 10-call ceiling
    const llm = new MockLlmClient()
      .enqueueToolCalls(Array.from({ length: N }, (_, i) => ({ name: 'list_things', args: { i } })))
      .enqueueStop();

    let concurrent = 0;
    let peakConcurrent = 0;
    const onToolCall = async (): Promise<ToolResult> => {
      concurrent++;
      peakConcurrent = Math.max(peakConcurrent, concurrent);
      await new Promise((r) => setTimeout(r, 15));
      concurrent--;
      return { success: true, message: 'ok' };
    };

    const result = await runPlannerLoop({
      llm,
      input: {
        systemPrompt: 's',
        userPrompt: 'u',
        tools,
        model: 'm',
        isParallelSafe: () => true,
      },
      onToolCall,
    });

    // Capped at the batch size — the 11th onwards had to wait.
    expect(peakConcurrent).toBeLessThanOrEqual(10);
    // All still executed, all in source order.
    expect(result.executedCalls).toHaveLength(N);
    expect(result.executedCalls.map((ec) => ec.call.arguments.i)).toEqual(
      Array.from({ length: N }, (_, i) => i)
    );
  });

  it('stays sequential when isParallelSafe is not provided (pre-M1 default)', async () => {
    const llm = new MockLlmClient()
      .enqueueToolCalls([
        { name: 'list_things', args: {} },
        { name: 'list_things', args: {} },
      ])
      .enqueueStop();

    let concurrent = 0;
    let peakConcurrent = 0;
    const onToolCall = async (): Promise<ToolResult> => {
      concurrent++;
      peakConcurrent = Math.max(peakConcurrent, concurrent);
      await new Promise((r) => setTimeout(r, 5));
      concurrent--;
      return { success: true, message: 'ok' };
    };

    await runPlannerLoop({
      llm,
      input: { systemPrompt: 's', userPrompt: 'u', tools, model: 'm' },
      onToolCall,
    });

    expect(peakConcurrent).toBe(1);
  });
});

describe('runPlannerLoop — reminderChannel', () => {
  it('injects reminders as transient system messages on the LLM call', async () => {
    const llm = new MockLlmClient().enqueueStop('done');
    const result = await runPlannerLoop({
      llm,
      input: {
        systemPrompt: 's',
        userPrompt: 'u',
        tools,
        model: 'm',
        reminderChannel: () => ['budget 80%', 'mission overdue'],
      },
      onToolCall: vi.fn(),
    });

    // The request messages the mock saw must include the reminders
    // AFTER the user turn, each wrapped in <reminder> tags.
    const seenByLlm = llm.calls[0].messages;
    expect(seenByLlm).toHaveLength(4); // system + user + 2 reminders
    expect(seenByLlm[0].role).toBe('system');
    expect(seenByLlm[0].content).toBe('s');
    expect(seenByLlm[1].role).toBe('user');
    expect(seenByLlm[2].role).toBe('system');
    expect(seenByLlm[2].content).toBe('<reminder>budget 80%</reminder>');
    expect(seenByLlm[3].role).toBe('system');
    expect(seenByLlm[3].content).toBe('<reminder>mission overdue</reminder>');

    // And the persisted history must NOT contain them.
    expect(result.messages.find((m) => m.content?.includes('<reminder>'))).toBeUndefined();
  });

  it('is called per round with fresh state — round 2 does not see round 1 reminders', async () => {
    const llm = new MockLlmClient()
      .enqueueToolCalls([{ name: 'list_things', args: {} }])
      .enqueueStop('done');

    const channelCalls: Array<{ round: number; reminders: string[] }> = [];
    const channel = vi.fn((state) => {
      const reminders = [`round-${state.round}`];
      channelCalls.push({ round: state.round, reminders });
      return reminders;
    });

    await runPlannerLoop({
      llm,
      input: {
        systemPrompt: 's',
        userPrompt: 'u',
        tools,
        model: 'm',
        reminderChannel: channel,
      },
      onToolCall: async () => ({ success: true, message: 'ok' }),
    });

    expect(channel).toHaveBeenCalledTimes(2);
    expect(channelCalls).toEqual([
      { round: 1, reminders: ['round-1'] },
      { round: 2, reminders: ['round-2'] },
    ]);

    // Round 2's request must have ONLY round-2's reminder, not round-1's.
    const round2Seen = llm.calls[1].messages;
    const reminders = round2Seen.filter((m) => m.content?.includes('<reminder>'));
    expect(reminders).toHaveLength(1);
    expect(reminders[0].content).toBe('<reminder>round-2</reminder>');
  });

  it('surfaces loop state — toolCallCount and lastCall — to the channel', async () => {
    const llm = new MockLlmClient()
      .enqueueToolCalls([{ name: 'list_things', args: {} }])
      .enqueueToolCalls([{ name: 'create_thing', args: { title: 'x' } }])
      .enqueueStop('done');

    const snapshots: Array<{ round: number; toolCallCount: number; lastName?: string }> = [];
    await runPlannerLoop({
      llm,
      input: {
        systemPrompt: 's',
        userPrompt: 'u',
        tools,
        model: 'm',
        reminderChannel: (state) => {
          snapshots.push({
            round: state.round,
            toolCallCount: state.toolCallCount,
            lastName: state.lastCall?.call.name,
          });
          return [];
        },
      },
      onToolCall: async () => ({ success: true, message: 'ok' }),
    });

    expect(snapshots).toEqual([
      { round: 1, toolCallCount: 0, lastName: undefined },
      { round: 2, toolCallCount: 1, lastName: 'list_things' },
      { round: 3, toolCallCount: 2, lastName: 'create_thing' },
    ]);
  });

  it('empty reminders array leaves the request unchanged', async () => {
    const llm = new MockLlmClient().enqueueStop('done');
    await runPlannerLoop({
      llm,
      input: {
        systemPrompt: 's',
        userPrompt: 'u',
        tools,
        model: 'm',
        reminderChannel: () => [],
      },
      onToolCall: vi.fn(),
    });

    const seenByLlm = llm.calls[0].messages;
    expect(seenByLlm).toHaveLength(2); // just system + user
  });
});
@@ -69,6 +69,38 @@ export interface LlmClient {

// ─── Loop input / result ────────────────────────────────────────────

/**
 * Transient loop state surfaced to the reminderChannel. The reminder
 * callback is pure — it reads this snapshot and returns hints; it does
 * not mutate anything.
 */
export interface LoopState {
  /** 1-based round index for the CURRENT LLM call (before it runs). */
  readonly round: number;
  /** Number of tool calls executed across all prior rounds. */
  readonly toolCallCount: number;
  /** Accumulated tokens reported by the provider, up to (but not
   * including) the current round's call. Zero when the provider
   * hasn't reported usage. */
  readonly usage: TokenUsage;
  /** The most recent ExecutedCall, or undefined in round 1. Handy for
   * "the last tool failed — warn the LLM" producers. */
  readonly lastCall?: ExecutedCall;
}

/**
 * Callback that yields transient system-message strings to attach to the
 * NEXT LLM request only. Returned strings are wrapped in
 * `<reminder>…</reminder>` tags and injected as system messages AFTER the
 * persistent `messages` history. They are NEVER written back to
 * `messages[]` and therefore NEVER appear in `PlannerLoopResult.messages`.
 *
 * This is the Claude-Code `<system-reminder>` pattern: steering the model
 * per-turn without polluting the persisted conversation log or
 * invalidating the provider's KV-cache on stable prefixes.
 */
export type ReminderChannel = (state: LoopState) => readonly string[];

export interface PlannerLoopInput {
  readonly systemPrompt: string;
  readonly userPrompt: string;
@@ -82,8 +114,29 @@ export interface PlannerLoopInput {
  /** Hard ceiling on planner rounds. Each round = one LLM call plus
   * whatever tool executions its output triggered. Defaults to 5. */
  readonly maxRounds?: number;
  /** Optional per-round reminder producer — see ReminderChannel docs. */
  readonly reminderChannel?: ReminderChannel;
  /**
   * Predicate that decides whether a tool is safe to execute in parallel
   * with other tools of the same stripe. Claude-Code `gW5` pattern: when
   * every tool_call in a round is parallel-safe, they run via Promise.all
   * in batches of 10; if any call is NOT parallel-safe, the whole batch
   * falls back to sequential (preserves ordering invariants for
   * write-after-read chains).
   *
   * Default: `() => false` → fully sequential, matching pre-M1 behaviour.
   *
   * The predicate is called once per tool_call per round, so cheap
   * constant-time lookups are expected (registry hit, name-prefix check).
   */
  readonly isParallelSafe?: (toolName: string) => boolean;
}

/** Max concurrent tool executions per round. Mirrors Claude Code's gW5
 * ceiling. Keeps tail latency bounded when the LLM requests many reads
 * at once and protects downstream services from unbounded fan-out. */
export const PARALLEL_TOOL_BATCH_SIZE = 10;

export interface ExecutedCall {
  readonly round: number;
  readonly call: ToolCallRequest;
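The LoopState surface above is enough to write a producer. As one example,
a budget-and-failure reminder channel might look like the sketch below; the
100k budget and 80% threshold are illustrative, and the import path assumes
the package name matches @mana/tool-registry's naming scheme.

// Example ReminderChannel producer (illustrative thresholds and import path).
import type { LoopState, ReminderChannel } from '@mana/shared-ai';

const TOKEN_BUDGET = 100_000; // assumed budget, not a project default

export const budgetAndFailureReminders: ReminderChannel = (state: LoopState) => {
  const reminders: string[] = [];
  const used = state.usage.totalTokens;
  if (used > 0.8 * TOKEN_BUDGET) {
    reminders.push(`token budget ${Math.round((100 * used) / TOKEN_BUDGET)}% used; wrap up`);
  }
  // ExecutedCall carries the ToolResult the loop recorded for that call.
  if (state.lastCall && !state.lastCall.result.success) {
    reminders.push(`previous tool call ${state.lastCall.call.name} failed; reconsider before retrying`);
  }
  return reminders;
};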
@@ -142,8 +195,35 @@ export async function runPlannerLoop(opts: {

   while (rounds < maxRounds) {
     rounds++;

+    // Per-round reminder injection: ask the channel for transient
+    // hints, wrap each in <reminder> tags, and append them as system
+    // messages to THIS request only. Nothing gets pushed to `messages`
+    // — the reminders are ephemeral steering, not conversation.
+    let requestMessages: readonly ChatMessage[] = messages;
+    if (input.reminderChannel) {
+      const state: LoopState = {
+        round: rounds,
+        toolCallCount: executedCalls.length,
+        usage: {
+          promptTokens,
+          completionTokens,
+          totalTokens: promptTokens + completionTokens,
+        },
+        lastCall: executedCalls[executedCalls.length - 1],
+      };
+      const reminders = input.reminderChannel(state);
+      if (reminders.length > 0) {
+        const reminderMessages: ChatMessage[] = reminders.map((text) => ({
+          role: 'system',
+          content: `<reminder>${text}</reminder>`,
+        }));
+        requestMessages = [...messages, ...reminderMessages];
+      }
+    }
+
     const response = await llm.complete({
-      messages,
+      messages: requestMessages,
       tools: toolSpecs,
       model: input.model,
       temperature: input.temperature,
@@ -169,22 +249,56 @@ export async function runPlannerLoop(opts: {
       break;
     }

-    // Execute each tool_call sequentially. Parallel execution is a
-    // perfectly valid optimisation for pure-read tools but we keep
-    // order here so the message log tells a linear story when the
-    // user debugs a failure.
-    for (const call of response.toolCalls) {
-      const result = await onToolCall(call);
-      executedCalls.push({ round: rounds, call, result });
-      messages.push({
-        role: 'tool',
-        toolCallId: call.id,
-        content: JSON.stringify({
-          success: result.success,
-          message: result.message,
-          ...(result.data !== undefined ? { data: result.data } : {}),
-        }),
-      });
-    }
+    // Tool execution.
+    //
+    // Sequential by default. When the caller supplies `isParallelSafe`
+    // and EVERY call in this round passes it, we dispatch in batches
+    // of PARALLEL_TOOL_BATCH_SIZE via Promise.all. A single unsafe
+    // call in the batch downgrades the whole round to sequential —
+    // this preserves semantics for write-after-read chains without
+    // pushing the decision onto the model.
+    //
+    // In both modes we append to `messages` in the LLM's original
+    // call order, not completion order, so the debug log stays linear.
+    const calls = response.toolCalls;
+    const allParallelSafe =
+      !!input.isParallelSafe &&
+      calls.length > 1 &&
+      calls.every((c) => input.isParallelSafe!(c.name));
+
+    if (allParallelSafe) {
+      for (let i = 0; i < calls.length; i += PARALLEL_TOOL_BATCH_SIZE) {
+        const batch = calls.slice(i, i + PARALLEL_TOOL_BATCH_SIZE);
+        const results = await Promise.all(batch.map((call) => onToolCall(call)));
+        for (let j = 0; j < batch.length; j++) {
+          const call = batch[j];
+          const result = results[j];
+          executedCalls.push({ round: rounds, call, result });
+          messages.push({
+            role: 'tool',
+            toolCallId: call.id,
+            content: JSON.stringify({
+              success: result.success,
+              message: result.message,
+              ...(result.data !== undefined ? { data: result.data } : {}),
+            }),
+          });
+        }
+      }
+    } else {
+      for (const call of calls) {
+        const result = await onToolCall(call);
+        executedCalls.push({ round: rounds, call, result });
+        messages.push({
+          role: 'tool',
+          toolCallId: call.id,
+          content: JSON.stringify({
+            success: result.success,
+            message: result.message,
+            ...(result.data !== undefined ? { data: result.data } : {}),
+          }),
+        });
+      }
+    }

     // If the round limit is about to hit, surface it as the reason —
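Putting the two new knobs together, a caller might wire the loop roughly as
in the sketch below; the call shape matches the tests above, but the import
path, host bindings (llmClient, myTools, executeTool), the model name, and
the 'read_' prefix convention are illustrative assumptions, not part of
this commit.

// Hedged wiring sketch combining isParallelSafe and reminderChannel.
import { runPlannerLoop } from '@mana/shared-ai';
import type {
  LlmClient,
  PlannerLoopInput,
  ToolCallRequest,
  ToolResult,
} from '@mana/shared-ai';

// Hypothetical host bindings; supplied by the embedding service.
declare const llmClient: LlmClient;
declare const myTools: PlannerLoopInput['tools'];
declare function executeTool(call: ToolCallRequest): Promise<ToolResult>;

const result = await runPlannerLoop({
  llm: llmClient,
  input: {
    systemPrompt: 'You are the mana planner.',
    userPrompt: 'Summarise my open missions.',
    tools: myTools,
    model: 'some-model',
    maxRounds: 5,
    // Opt only read-style tools into concurrent dispatch; anything else
    // keeps the whole round sequential ('read_' prefix is illustrative).
    isParallelSafe: (name) => name.startsWith('read_'),
    reminderChannel: budgetAndFailureReminders, // producer from the earlier example
  },
  onToolCall: (call) => executeTool(call),
});

console.log(result.executedCalls.length, result.messages.length);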