managarten/services/mana-ai/src/db/migrate.ts
Till JS 0af50f0166 feat(mana-ai): agent-aware tick loop + snapshot projection (Phase 3)
Third phase of the Multi-Agent Workbench. The background mission
runner now respects the owning Agent: agent state gates whether
a mission runs, concurrency is capped per-agent, and server-produced
iterations carry the agent's identity as their Actor.

Data layer:
- db/migrate.ts: new mana_ai.agent_snapshots table (mirrors
  mission_snapshots) with indexes on (user_id, last_applied_at) and
  a partial index on active agents.
- db/agents-projection.ts: refreshAgentSnapshots (incremental LWW
  replay over sync_changes appId='ai' table='agents') +
  loadActiveAgents / loadAgent helpers. mergeRaw exported for tests.
- db/missions-projection.ts: ServerMission.agentId + projection
  reads the JSONB field (undefined for legacy missions).

Tick integration (cron/tick.ts):
- Refreshes both snapshot tables on every pass (parallel).
- Per-user in-tick agent cache (Map<userId, Map<agentId, Agent>>)
  so N missions for one user hit the DB once.
- Gate order: agent archived → skip silently; agent paused → skip;
  per-agent maxConcurrentMissions exhausted this tick → defer to next.
  All skip paths bump mana_ai_agent_decisions_total{decision}.
- Prompt injection: withAgentContext prepends an <agent_context>
  block to the system prompt with the agent's name + role, and
  plaintext systemPrompt + memory when available. Ciphertext
  (enc:1:… blobs) are skipped — server has no key by design. Mirrors
  the Mission Grant privacy stance: encrypted context belongs to the
  foreground runner.

Iteration writer (db/iteration-writer.ts):
- New optional `agent` + `iterationId` + `rationale` inputs.
- When agent is present, the sync_changes row is stamped with a
  makeAgentActor actor (principalId=agentId, displayName=agent.name)
  so the webapp timeline groups the write under the right agent.
- Falls back to an AI actor with LEGACY_AI_PRINCIPAL + 'Mana' when
  the mission has no owning agent; ultimate fallback to the
  mission-runner system actor when iterationId is also missing.

Metrics:
- mana_ai_agent_decisions_total{decision=ran|skipped-paused|
  skipped-archived|skipped-concurrency}. Missions without an agent
  don't produce this metric — plansWrittenBackTotal is the universal
  "did we run" counter.

Tests: 41/41 (was 35) including 6 new cases for the agent LWW merge.
mana-ai type-check clean. Webapp svelte-check: 0 errors (4 unrelated
warnings in a different module).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 20:46:57 +02:00

121 lines
4.2 KiB
TypeScript

/**
* Schema migration for mana-ai's own derived state.
*
* Per the service's contract, mana-ai reads `sync_changes` (owned by
* mana-sync) and writes back to it through the public sync protocol.
* It doesn't own that schema. But it DOES need a small amount of
* persistent derived state — notably the mission snapshot table that
* caches LWW-merged records so the tick loop doesn't scan the full
* event log every minute.
*
* Such derived state lives in the `mana_ai` schema in the same
* Postgres database (`mana_sync`). One schema, one migration, called
* idempotently on service boot.
*/
import type { Sql } from './connection';
export async function migrate(sql: Sql): Promise<void> {
await sql`CREATE SCHEMA IF NOT EXISTS mana_ai`;
await sql`
CREATE TABLE IF NOT EXISTS mana_ai.mission_snapshots (
user_id TEXT NOT NULL,
mission_id TEXT NOT NULL,
record JSONB NOT NULL,
last_applied_at TIMESTAMPTZ NOT NULL DEFAULT 'epoch',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (user_id, mission_id)
)
`;
await sql`
CREATE INDEX IF NOT EXISTS idx_mission_snapshots_due
ON mana_ai.mission_snapshots ((record->>'state'), (record->>'nextRunAt'))
WHERE record->>'state' = 'active'
`;
await sql`
CREATE INDEX IF NOT EXISTS idx_mission_snapshots_user
ON mana_ai.mission_snapshots (user_id, last_applied_at)
`;
// ─── Mission Grant decrypt audit ─────────────────────────────
// Every server-side decrypt of an encrypted record (triggered by a
// Mission with a valid Grant) writes one row here. Surfaces in the
// webapp under "Mission → Datenzugriff" so the user can see exactly
// what the runner has read. Keep the row shape flat + append-only;
// never mutate after insert.
//
// Why in mana_ai and not mana-auth? The write originates here and
// the read is operator-scoped to a specific mission — keeping it
// adjacent to `mission_snapshots` means `withUser` transactions
// already have the right RLS set up.
await sql`
CREATE TABLE IF NOT EXISTS mana_ai.decrypt_audit (
id BIGSERIAL PRIMARY KEY,
user_id TEXT NOT NULL,
mission_id TEXT NOT NULL,
iteration_id TEXT,
table_name TEXT NOT NULL,
record_id TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('ok', 'failed', 'scope-violation')),
reason TEXT,
ts TIMESTAMPTZ NOT NULL DEFAULT now()
)
`;
await sql`
CREATE INDEX IF NOT EXISTS idx_decrypt_audit_mission
ON mana_ai.decrypt_audit (user_id, mission_id, ts DESC)
`;
// Mirror the RLS pattern used on sync_changes / mission_snapshots:
// every read goes through a `withUser` transaction that sets
// `app.user_id`; the policy gates row visibility to that user.
await sql`ALTER TABLE mana_ai.decrypt_audit ENABLE ROW LEVEL SECURITY`;
await sql`
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_policies
WHERE schemaname = 'mana_ai'
AND tablename = 'decrypt_audit'
AND policyname = 'decrypt_audit_user_scope'
) THEN
CREATE POLICY decrypt_audit_user_scope ON mana_ai.decrypt_audit
USING (user_id = current_setting('app.current_user_id', true));
END IF;
END $$
`;
// ─── Agent snapshots (Multi-Agent Workbench, Phase 3) ────────
// Mirrors mission_snapshots: a materialized LWW-merged view of the
// agents table from sync_changes. Runner loads agents per-user
// per-tick from here instead of replaying the event log each time.
// systemPrompt + memory stay as they arrived (encrypted strings for
// most users); the runner opts to skip injecting ciphertext rather
// than requiring a per-agent Grant.
await sql`
CREATE TABLE IF NOT EXISTS mana_ai.agent_snapshots (
user_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
record JSONB NOT NULL,
last_applied_at TIMESTAMPTZ NOT NULL DEFAULT 'epoch',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (user_id, agent_id)
)
`;
await sql`
CREATE INDEX IF NOT EXISTS idx_agent_snapshots_user
ON mana_ai.agent_snapshots (user_id, last_applied_at)
`;
await sql`
CREATE INDEX IF NOT EXISTS idx_agent_snapshots_state
ON mana_ai.agent_snapshots ((record->>'state'))
WHERE record->>'state' = 'active'
`;
}