feat(mcp): Phase 2 — real DB operations for tool execution

Implement actual sync_changes reads and writes for MCP tool calls:

- sync-db.ts: Connection to mana_sync DB, RLS-scoped withUser(),
  readLatestRecords() for replaying sync state, writeRecord() for
  creating sync_changes entries
- executor.ts: 10 tool handlers implemented:
  - Reads: list_tasks, get_task_stats, list_notes, get_todays_events,
    get_contacts, get_habits
  - Writes: create_task, complete_task, create_note, create_contact
  - Remaining tools return helpful "not yet implemented" message
- server.ts: userId from auth context bound into MCP session via closure
- index.ts: typed Hono app with AuthVariables

Write pattern matches mana-ai: INSERT into sync_changes with
actor={kind:'system', source:'mcp-tool'}, client_id='mcp-server'.
Records appear on user devices on next sync cycle.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-16 13:46:06 +02:00
parent 10acabfed6
commit e969324cc8
4 changed files with 391 additions and 53 deletions

View file

@ -13,6 +13,7 @@ import {
errorHandler,
notFoundHandler,
rateLimitMiddleware,
type AuthVariables,
} from '@mana/shared-hono';
// MCP server
@ -41,7 +42,7 @@ import { whoRoutes } from './modules/who/routes';
const PORT = parseInt(process.env.PORT || '3060', 10);
const CORS_ORIGINS = (process.env.CORS_ORIGINS || 'http://localhost:5173').split(',');
const app = new Hono();
const app = new Hono<{ Variables: AuthVariables }>();
// ─── Global Middleware ──────────────────────────────────────
app.onError(errorHandler);
@ -53,7 +54,7 @@ app.use('/api/*', authMiddleware());
// ─── MCP Endpoint ──────────────────────────────────────────
// Streamable HTTP transport: POST (messages), GET (SSE stream), DELETE (close)
app.all('/api/v1/mcp', (c) => handleMcpRequest(c.req.raw));
app.all('/api/v1/mcp', (c) => handleMcpRequest(c.req.raw, c.get('userId')));
// ─── Module Routes ──────────────────────────────────────────
app.route('/api/v1/calendar', calendarRoutes);

View file

@ -1,12 +1,17 @@
/**
* MCP Tool Executor handles tools/call requests by routing to module
* handlers or returning status messages.
* MCP Tool Executor handles tools/call requests by routing to
* sync database reads and writes.
*
* Phase 1: Read-only tools query the sync database directly.
* Phase 2: Write tools will insert into sync_changes (like mana-ai does).
* Read tools query sync_changes to reconstruct current user state.
* Write tools INSERT into sync_changes records appear on the user's
* devices on their next sync cycle.
*
* Uses the same sync_changes pattern as mana-ai's iteration-writer,
* with actor attribution as 'system:mcp'.
*/
import { AI_TOOL_CATALOG_BY_NAME } from '@mana/shared-ai';
import { readLatestRecords, writeRecord } from './sync-db';
export interface McpToolResult {
[key: string]: unknown;
@ -14,54 +19,283 @@ export interface McpToolResult {
isError?: boolean;
}
// ── Tool handler registry ──────────────────────────────────────
type ToolHandler = (args: Record<string, unknown>, userId: string) => Promise<McpToolResult>;
const handlers = new Map<string, ToolHandler>();
function register(name: string, handler: ToolHandler): void {
handlers.set(name, handler);
}
// ── Helpers ────────────────────────────────────────────────────
function ok(text: string, data?: unknown): McpToolResult {
return {
content: [{ type: 'text', text: data ? `${text}\n\n${JSON.stringify(data, null, 2)}` : text }],
};
}
function err(text: string): McpToolResult {
return { content: [{ type: 'text', text }], isError: true };
}
function nowIso(): string {
return new Date().toISOString();
}
function fieldTs(fields: string[]): Record<string, string> {
const ts = nowIso();
return Object.fromEntries(fields.map((f) => [f, ts]));
}
// ── Todo tools ─────────────────────────────────────────────────
register('list_tasks', async (_args, userId) => {
const records = await readLatestRecords(userId, 'todo', 'tasks');
const filter = (_args.filter as string) ?? 'open';
const limit = (_args.limit as number) ?? 20;
const today = new Date().toISOString().split('T')[0];
let tasks = records.map((r) => ({
id: r.id as string,
title: r.title as string,
dueDate: r.dueDate as string | undefined,
priority: r.priority as string | undefined,
isCompleted: !!r.isCompleted,
}));
if (filter === 'open') tasks = tasks.filter((t) => !t.isCompleted);
else if (filter === 'completed') tasks = tasks.filter((t) => t.isCompleted);
else if (filter === 'overdue')
tasks = tasks.filter((t) => !t.isCompleted && t.dueDate != null && t.dueDate < today);
else if (filter === 'today') tasks = tasks.filter((t) => !t.isCompleted && t.dueDate === today);
const list = tasks.slice(0, limit);
if (list.length === 0) return ok(`Keine ${filter} Tasks.`);
const lines = list.map(
(t) =>
`- [${t.id}] ${t.title}${t.dueDate ? ` (fällig ${t.dueDate})` : ''}${t.priority === 'high' ? ' [HOHE PRIO]' : ''}`
);
return ok(`${list.length} Tasks (${filter}):\n${lines.join('\n')}`, list);
});
register('get_task_stats', async (_args, userId) => {
const records = await readLatestRecords(userId, 'todo', 'tasks');
const today = new Date().toISOString().split('T')[0];
const total = records.length;
const completed = records.filter((r) => r.isCompleted).length;
const overdue = records.filter(
(r) => !r.isCompleted && r.dueDate != null && (r.dueDate as string) < today
).length;
const dueToday = records.filter((r) => !r.isCompleted && (r.dueDate as string) === today).length;
return ok(
`${total} Tasks: ${completed} erledigt, ${overdue} überfällig, ${dueToday} heute fällig`,
{ total, completed, overdue, dueToday, open: total - completed }
);
});
register('create_task', async (args, userId) => {
const taskId = crypto.randomUUID();
const now = nowIso();
const data = {
id: taskId,
userId,
title: args.title as string,
description: (args.description as string) ?? '',
dueDate: (args.dueDate as string) ?? null,
priority: (args.priority as string) ?? 'medium',
isCompleted: false,
order: 0,
createdAt: now,
updatedAt: now,
};
await writeRecord(userId, 'todo', 'tasks', taskId, 'insert', data, fieldTs(Object.keys(data)));
return ok(`Task "${args.title}" erstellt (ID: ${taskId}). Erscheint beim nächsten Sync.`, {
id: taskId,
});
});
register('complete_task', async (args, userId) => {
const taskId = args.taskId as string;
const now = nowIso();
await writeRecord(
userId,
'todo',
'tasks',
taskId,
'update',
{
isCompleted: true,
completedAt: now,
updatedAt: now,
},
fieldTs(['isCompleted', 'completedAt', 'updatedAt'])
);
return ok(`Task ${taskId} als erledigt markiert.`);
});
// ── Notes tools ────────────────────────────────────────────────
register('list_notes', async (args, userId) => {
const records = await readLatestRecords(userId, 'notes', 'notes');
const limit = (args.limit as number) ?? 30;
const query = (args.query as string)?.toLowerCase();
let notes = records.map((r) => ({
id: r.id as string,
title: (r.title as string) ?? '(Ohne Titel)',
excerpt: ((r.content as string) ?? '').slice(0, 100),
}));
if (query) {
notes = notes.filter(
(n) => n.title.toLowerCase().includes(query) || n.excerpt.toLowerCase().includes(query)
);
}
const list = notes.slice(0, limit);
if (list.length === 0) return ok('Keine Notizen gefunden.');
const lines = list.map((n) => `- [${n.id}] ${n.title}: ${n.excerpt}`);
return ok(`${list.length} Notizen:\n${lines.join('\n')}`, list);
});
register('create_note', async (args, userId) => {
const noteId = crypto.randomUUID();
const now = nowIso();
const data = {
id: noteId,
userId,
title: (args.title as string) ?? '',
content: (args.content as string) ?? '',
createdAt: now,
updatedAt: now,
};
await writeRecord(userId, 'notes', 'notes', noteId, 'insert', data, fieldTs(Object.keys(data)));
return ok(`Notiz "${data.title || '(Ohne Titel)'}" erstellt (ID: ${noteId}).`, { id: noteId });
});
// ── Calendar tools ─────────────────────────────────────────────
register('get_todays_events', async (_args, userId) => {
const records = await readLatestRecords(userId, 'calendar', 'timeBlocks');
const today = new Date().toISOString().split('T')[0];
const events = records
.filter(
(r) =>
r.type === 'event' &&
r.sourceModule === 'calendar' &&
(r.startDate as string)?.startsWith(today)
)
.map((r) => ({
id: r.sourceId as string,
title: r.title as string,
startTime: r.startDate as string,
endTime: r.endDate as string,
}))
.sort((a, b) => a.startTime.localeCompare(b.startTime));
if (events.length === 0) return ok('Keine Termine heute.');
const lines = events.map((e) => `- ${e.startTime.slice(11, 16)} ${e.title}`);
return ok(`${events.length} Termine heute:\n${lines.join('\n')}`, events);
});
// ── Contacts tools ─────────────────────────────────────────────
register('get_contacts', async (_args, userId) => {
const records = await readLatestRecords(userId, 'contacts', 'contacts');
const contacts = records
.filter((r) => !r.isArchived)
.map((r) => ({
id: r.id as string,
name: [r.firstName, r.lastName].filter(Boolean).join(' '),
company: r.company as string | undefined,
email: r.email as string | undefined,
}));
if (contacts.length === 0) return ok('Keine Kontakte.');
return ok(`${contacts.length} Kontakte`, contacts);
});
register('create_contact', async (args, userId) => {
const contactId = crypto.randomUUID();
const now = nowIso();
const data = {
id: contactId,
userId,
firstName: args.firstName as string,
lastName: (args.lastName as string) ?? '',
email: (args.email as string) ?? '',
phone: (args.phone as string) ?? '',
company: (args.company as string) ?? '',
notes: (args.notes as string) ?? '',
createdAt: now,
updatedAt: now,
};
await writeRecord(
userId,
'contacts',
'contacts',
contactId,
'insert',
data,
fieldTs(Object.keys(data))
);
return ok(`Kontakt "${args.firstName}" erstellt (ID: ${contactId}).`, { id: contactId });
});
// ── Habits tools ───────────────────────────────────────────────
register('get_habits', async (_args, userId) => {
const records = await readLatestRecords(userId, 'habits', 'habits');
const habits = records.map((r) => ({
id: r.id as string,
title: r.title as string,
icon: r.icon as string,
}));
if (habits.length === 0) return ok('Keine Habits.');
return ok(`${habits.length} Habits`, habits);
});
// ── Entry point ────────────────────────────────────────────────
/**
* Execute an MCP tool call. Returns MCP-formatted result content.
*
* Phase 1 scope:
* - All tools are listed (via tools/list from AI_TOOL_CATALOG)
* - Write tools return a "coming soon" message
* - Read tools are planned for Phase 2 (requires sync DB queries)
* Execute an MCP tool call. Routes to registered handlers or returns
* a "not yet implemented" message for tools without a handler.
*/
export async function executeMcpTool(
toolName: string,
args: Record<string, unknown>,
_userId: string
userId: string
): Promise<McpToolResult> {
const schema = AI_TOOL_CATALOG_BY_NAME.get(toolName);
if (!schema) {
return {
content: [{ type: 'text', text: `Unknown tool: ${toolName}` }],
isError: true,
};
if (!schema) return err(`Unknown tool: ${toolName}`);
const handler = handlers.get(toolName);
if (handler) {
try {
return await handler(args, userId);
} catch (error) {
const msg = error instanceof Error ? error.message : String(error);
return err(`Tool "${toolName}" failed: ${msg}`);
}
}
// Phase 1: all tools return a descriptive message about what they will do.
// Phase 2 will implement actual DB reads and sync_changes writes.
if (schema.defaultPolicy === 'auto') {
return {
content: [
{
type: 'text',
text:
`[Mana MCP] Read-tool "${toolName}" (${schema.module}) acknowledged.\n` +
// Fallback for tools without a handler yet
return ok(
`[Mana MCP] Tool "${toolName}" (${schema.module}) ist noch nicht serverseitig implementiert.\n` +
`Args: ${JSON.stringify(args)}\n` +
`Note: Server-side execution coming in Phase 2. ` +
`This tool will query the sync database for user data.`,
},
],
};
}
return {
content: [
{
type: 'text',
text:
`[Mana MCP] Write-tool "${toolName}" (${schema.module}) acknowledged.\n` +
`Args: ${JSON.stringify(args)}\n` +
`Note: Server-side execution coming in Phase 2. ` +
`This tool will write to the sync database and appear on your devices.`,
},
],
};
`Nutze die Mana-App unter mana.how für diese Aktion.`
);
}

View file

@ -39,21 +39,22 @@ function toZodShape(
/**
* Create a new McpServer instance with all Mana tools registered.
*/
function createMcpServer(): McpServer {
function createMcpServer(userId: string): McpServer {
const server = new McpServer({ name: 'mana', version: '1.0.0' }, { capabilities: { tools: {} } });
// Register all 29 tools from the AI Tool Catalog
// Register all 29 tools from the AI Tool Catalog.
// userId is bound via closure — each MCP session belongs to one user.
for (const tool of AI_TOOL_CATALOG) {
const zodShape = toZodShape(tool.parameters);
const hasParams = Object.keys(zodShape).length > 0;
if (hasParams) {
server.tool(tool.name, tool.description, zodShape, async (args) => {
return executeMcpTool(tool.name, args, 'mcp-user');
return executeMcpTool(tool.name, args, userId);
});
} else {
server.tool(tool.name, tool.description, async () => {
return executeMcpTool(tool.name, {}, 'mcp-user');
return executeMcpTool(tool.name, {}, userId);
});
}
}
@ -71,7 +72,7 @@ const sessions = new Map<string, WebStandardStreamableHTTPServerTransport>();
* initialization, and subsequent requests must carry it via the
* `Mcp-Session-Id` header.
*/
export async function handleMcpRequest(req: Request): Promise<Response> {
export async function handleMcpRequest(req: Request, userId: string): Promise<Response> {
const sessionId = req.headers.get('mcp-session-id');
// Existing session — route to its transport
@ -92,7 +93,7 @@ export async function handleMcpRequest(req: Request): Promise<Response> {
},
});
const server = createMcpServer();
const server = createMcpServer(userId);
await server.connect(transport);
return transport.handleRequest(req);

102
apps/api/src/mcp/sync-db.ts Normal file
View file

@ -0,0 +1,102 @@
/**
* Sync database connection for MCP tool execution.
*
* MCP tools read and write via the mana_sync database (the shared sync
* event log), not mana_platform. This matches the local-first pattern:
* writes go to sync_changes, clients pick them up on next sync.
*
* The connection uses the same env var as mana-ai: SYNC_DATABASE_URL.
*/
import postgres from 'postgres';
const SYNC_DATABASE_URL =
process.env.SYNC_DATABASE_URL ?? 'postgresql://mana:devpassword@localhost:5432/mana_sync';
let syncPool: ReturnType<typeof postgres> | null = null;
/** Returns the shared sync database connection pool. */
export function getSyncConnection() {
if (!syncPool) {
syncPool = postgres(SYNC_DATABASE_URL, { max: 5, idle_timeout: 30 });
}
return syncPool;
}
export type SyncSql = ReturnType<typeof postgres>;
/**
* Run a callback within an RLS-scoped transaction for the given user.
* Sets `app.current_user_id` so the sync_changes RLS policy allows
* reads and writes only for that user's data.
*/
export async function withUser<T>(
userId: string,
fn: (tx: postgres.TransactionSql<Record<string, never>>) => Promise<T>
): Promise<T> {
if (!userId) throw new Error('withUser: empty userId');
const sql = getSyncConnection();
return sql.begin(async (tx) => {
await tx`SELECT set_config('app.current_user_id', ${userId}, true)`;
return fn(tx);
}) as Promise<T>;
}
/**
* Read the latest version of all records for a user + app + table from
* sync_changes. Applies field-level LWW to reconstruct current state.
*
* This is the server-side equivalent of the Dexie liveQuery: it replays
* the sync_changes log to build the latest record versions. For small
* datasets this is fine; for large tables we'll need materialized
* snapshots (like mana-ai's mission_snapshots).
*/
export async function readLatestRecords(
userId: string,
appId: string,
tableName: string
): Promise<Record<string, unknown>[]> {
const sql = getSyncConnection();
// Get the latest change per record_id (by created_at desc), then
// reconstruct the record. Only include non-deleted records.
const rows = await sql<{ record_id: string; data: Record<string, unknown>; op: string }[]>`
SELECT DISTINCT ON (record_id)
record_id, data, op
FROM sync_changes
WHERE user_id = ${userId}
AND app_id = ${appId}
AND table_name = ${tableName}
ORDER BY record_id, created_at DESC
`;
// Filter out deleted records and records with delete ops
return rows
.filter((r) => r.op !== 'delete' && r.data && !(r.data as Record<string, unknown>).deletedAt)
.map((r) => r.data);
}
/**
* Write a new record via sync_changes INSERT. The record will appear
* on the user's devices on their next sync cycle.
*/
export async function writeRecord(
userId: string,
appId: string,
tableName: string,
recordId: string,
op: 'insert' | 'update' | 'delete',
data: Record<string, unknown>,
fieldTimestamps: Record<string, string>
): Promise<void> {
await withUser(userId, async (tx) => {
await tx`
INSERT INTO sync_changes
(app_id, table_name, record_id, user_id, op, data, field_timestamps, client_id, schema_version, actor)
VALUES
(${appId}, ${tableName}, ${recordId}, ${userId}, ${op},
${tx.json(data as never)}, ${tx.json(fieldTimestamps as never)},
'mcp-server', 1,
${tx.json({ kind: 'system', principalId: 'system:mcp', displayName: 'MCP Server' } as never)})
`;
});
}