managarten/services/mana-mcp/src/transport.ts
Till JS e5d230e599 feat(agent-loop): M1 — policy gate + reminder channel + parallel reads
Three Claude-Code-inspired primitives for runPlannerLoop, derived from the
reverse-engineering reports in docs/reports/:

1. **Policy gate** (@mana/tool-registry) — evaluatePolicy() gates every tool
   dispatch: denies admin-scope, denies destructive tools not in the user's
   opt-in list, rate-limits per tool (30/60s default), flags prompt-injection
   markers in freetext without blocking. Wired into mana-mcp with a
   per-user rolling invocation log and POLICY_MODE env (off|log-only|enforce,
   default log-only). mana-ai uses detectInjectionMarker only — tool dispatch
   there is plan-only, so rate-limit/destructive checks don't apply yet.

2. **Reminder channel** (packages/shared-ai/src/planner/loop.ts) — new
   reminderChannel callback in PlannerLoopInput. Called once per round with a
   LoopState snapshot (round, toolCallCount, usage, lastCall); the returned
   strings are wrapped in <reminder> tags and injected as transient system
   messages into the current LLM request only. They are never pushed to
   messages[]: this is the Claude-Code <system-reminder> pattern, which keeps
   the KV-cache prefix stable.

3. **Parallel reads** (loop.ts) — isParallelSafe predicate enables
   Promise.all dispatch when every tool_call in a round is parallel-safe,
   in batches of PARALLEL_TOOL_BATCH_SIZE=10. Any non-safe call downgrades
   the whole round to sequential. messages[] always appends in source
   order, never completion order, so the debug log stays linear.
   Default-off (undefined predicate) preserves pre-M1 behaviour.
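
The parallel-reads rule in item 3 can be sketched roughly as follows. This is a minimal illustration, not the code in loop.ts: the `ToolCall` shape, `planRound`, and `runRound` are hypothetical names introduced here, and only `isParallelSafe` and `PARALLEL_TOOL_BATCH_SIZE` come from the description above.

```typescript
// Hypothetical shapes for illustration; the real types in shared-ai may differ.
interface ToolCall {
	id: string;
	name: string;
}

type DispatchPlan =
	| { mode: 'sequential' }
	| { mode: 'parallel'; batches: ToolCall[][] };

const PARALLEL_TOOL_BATCH_SIZE = 10;

/** Decide how one round is dispatched. An undefined predicate keeps the sequential default. */
function planRound(
	calls: ToolCall[],
	isParallelSafe?: (call: ToolCall) => boolean,
	batchSize: number = PARALLEL_TOOL_BATCH_SIZE
): DispatchPlan {
	// A single non-safe call downgrades the whole round.
	if (!isParallelSafe || !calls.every((call) => isParallelSafe(call))) {
		return { mode: 'sequential' };
	}
	const batches: ToolCall[][] = [];
	for (let i = 0; i < calls.length; i += batchSize) {
		batches.push(calls.slice(i, i + batchSize));
	}
	return { mode: 'parallel', batches };
}

/** Run one round; results are returned in source order, not completion order. */
async function runRound<R>(
	calls: ToolCall[],
	execute: (call: ToolCall) => Promise<R>,
	isParallelSafe?: (call: ToolCall) => boolean
): Promise<R[]> {
	const plan = planRound(calls, isParallelSafe);
	const results: R[] = [];
	if (plan.mode === 'sequential') {
		for (const call of calls) {
			results.push(await execute(call));
		}
		return results;
	}
	for (const batch of plan.batches) {
		// Promise.all resolves to an array in input order, so source order is kept.
		const settled = await Promise.all(batch.map((call) => execute(call)));
		results.push(...settled);
	}
	return results;
}
```

Separating the planning step from the dispatch step keeps the downgrade rule testable without running any tools; whether the actual loop is structured this way is an assumption.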

Tests: 21 new in tool-registry (policy), 9 new in shared-ai (5 parallel,
4 reminder). All 74 green, type-check clean across 4 packages.

Design/plan: docs/plans/agent-loop-improvements-m1.md
Reports: docs/reports/claude-code-architecture.md,
         docs/reports/mana-agent-improvements-from-claude-code.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 13:56:40 +02:00


/**
 * Streamable HTTP transport handler.
 *
 * Pattern lifted from apps/api/src/mcp/server.ts (the Mana-internal MCP
 * endpoint), but with per-request auth: every session is created against
 * a verified user, and sessions are scoped to that user for their lifetime.
 *
 * Lifecycle:
 *   POST   /mcp (no session id)   → initialize, returns Mcp-Session-Id header
 *   POST   /mcp (with session id) → JSON-RPC message in
 *   GET    /mcp (with session id) → SSE stream out
 *   DELETE /mcp (with session id) → close
 */

import { WebStandardStreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js';
import { createMcpServerForUser } from './mcp-adapter.ts';
import type { VerifiedUser } from './auth.ts';
import type { Config } from './config.ts';

interface SessionEntry {
	transport: WebStandardStreamableHTTPServerTransport;
	userId: string;
}

const sessions = new Map<string, SessionEntry>();

export async function handleMcpRequest(
	req: Request,
	user: VerifiedUser,
	config: Config
): Promise<Response> {
	const sessionId = req.headers.get('mcp-session-id');

	// Existing session: must belong to the same user.
	if (sessionId && sessions.has(sessionId)) {
		const entry = sessions.get(sessionId)!;
		if (entry.userId !== user.userId) {
			return new Response(JSON.stringify({ error: 'Session belongs to a different user' }), {
				status: 403,
				headers: { 'content-type': 'application/json' },
			});
		}
		return entry.transport.handleRequest(req);
	}

	// New session: only POST without session id is a valid initialization.
	if (req.method === 'POST' && !sessionId) {
		const transport = new WebStandardStreamableHTTPServerTransport({
			sessionIdGenerator: () => crypto.randomUUID(),
			onsessioninitialized: (id) => {
				sessions.set(id, { transport, userId: user.userId });
			},
			onsessionclosed: (id) => {
				sessions.delete(id);
			},
		});

		const server = createMcpServerForUser(user, config);
		await server.connect(transport);
		return transport.handleRequest(req);
	}

	if (sessionId && !sessions.has(sessionId)) {
		return new Response(JSON.stringify({ error: 'Session not found' }), {
			status: 404,
			headers: { 'content-type': 'application/json' },
		});
	}

	return new Response(JSON.stringify({ error: 'Bad request' }), {
		status: 400,
		headers: { 'content-type': 'application/json' },
	});
}

/** Test-only: sessions accumulate across requests otherwise. */
export function __resetSessionsForTests(): void {
	sessions.clear();
}
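
The branch order in handleMcpRequest can be summarised as a pure decision function, which may help when reasoning about the status codes. This is only a sketch: `Route` and `routeMcpRequest` are hypothetical names that do not exist in this file, and the `owners` map stands in for the module-level `sessions` map (session id to owning user id).

```typescript
// Hypothetical helper mirroring the branch order of handleMcpRequest.
type Route =
	| { kind: 'forward' }
	| { kind: 'initialize' }
	| { kind: 'reject'; status: 400 | 403 | 404 };

function routeMcpRequest(
	method: string,
	sessionId: string | null,
	requestUserId: string,
	owners: ReadonlyMap<string, string>
): Route {
	// Known session: forward only if it belongs to the requesting user.
	const owner = sessionId ? owners.get(sessionId) : undefined;
	if (owner !== undefined) {
		return owner === requestUserId ? { kind: 'forward' } : { kind: 'reject', status: 403 };
	}
	// No session id: only POST may start a new session.
	if (method === 'POST' && !sessionId) {
		return { kind: 'initialize' };
	}
	// A session id was sent but is not (or no longer) registered.
	if (sessionId) {
		return { kind: 'reject', status: 404 };
	}
	// Anything else, for example GET or DELETE without a session id.
	return { kind: 'reject', status: 400 };
}
```

Under these assumptions, a POST carrying an unknown session id yields 404 rather than a fresh session, which matches the ordering of the checks in the handler above.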