From a18506caf6637a79c11a4fc9a118166953541d2e Mon Sep 17 00:00:00 2001 From: Till JS Date: Tue, 14 Apr 2026 20:48:03 +0200 Subject: [PATCH] feat(events): Actor attribution on every DomainEvent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a discriminated Actor union (user | ai | system) threaded through the event pipeline so downstream consumers can distinguish human writes from AI-initiated ones and derived subsystem writes. - `EventMeta.actor: Actor` is required (no legacy fallback — pre-launch) - `emitDomainEvent` takes an options bag `{ actor?, causedBy? }`; falls back to the ambient actor set by `runAs` / `runAsAsync` - `runAs` / `runAsAsync` pin the actor at defined boundaries (tool executor, mission runner, projection dispatcher) — primitives capture synchronously so ambient context is never SoT past the write moment Foundation for the AI Workbench. Follow-up: mana-sync server must accept and persist `actor` in pending-change payloads. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../web/src/lib/companion/goals/store.test.ts | 5 + .../web/src/lib/data/events/actor.test.ts | 118 ++++++++++++++++++ .../apps/web/src/lib/data/events/actor.ts | 95 ++++++++++++++ .../mana/apps/web/src/lib/data/events/emit.ts | 30 +++-- .../web/src/lib/data/events/event-bus.test.ts | 2 + .../web/src/lib/data/events/event-store.ts | 10 +- .../apps/web/src/lib/data/events/index.ts | 3 + .../apps/web/src/lib/data/events/types.ts | 6 +- .../lib/data/projections/correlations.test.ts | 2 + .../src/lib/data/projections/streaks.test.ts | 7 ++ 10 files changed, 257 insertions(+), 21 deletions(-) create mode 100644 apps/mana/apps/web/src/lib/data/events/actor.test.ts create mode 100644 apps/mana/apps/web/src/lib/data/events/actor.ts diff --git a/apps/mana/apps/web/src/lib/companion/goals/store.test.ts b/apps/mana/apps/web/src/lib/companion/goals/store.test.ts index 70b7add13..b8dff482b 100644 --- a/apps/mana/apps/web/src/lib/companion/goals/store.test.ts +++ b/apps/mana/apps/web/src/lib/companion/goals/store.test.ts @@ -13,6 +13,7 @@ vi.mock('$lib/triggers/inline-suggest', () => ({ import { db } from '$lib/data/database'; import { eventBus } from '$lib/data/events/event-bus'; +import { USER_ACTOR } from '$lib/data/events/actor'; import { goalStore, startGoalTracker, stopGoalTracker, GOAL_TEMPLATES } from './index'; import type { LocalGoal } from './types'; @@ -77,6 +78,7 @@ describe('goal event tracking', () => { collection: 'tasks', recordId: '1', userId: 'u1', + actor: USER_ACTOR, }, }); await flush(); @@ -102,6 +104,7 @@ describe('goal event tracking', () => { collection: 'drinkEntries', recordId: '1', userId: 'u1', + actor: USER_ACTOR, }, }); await flush(); @@ -120,6 +123,7 @@ describe('goal event tracking', () => { collection: 'drinkEntries', recordId: '2', userId: 'u1', + actor: USER_ACTOR, }, }); await flush(); @@ -144,6 +148,7 @@ describe('goal event tracking', () => { collection: 'tasks', recordId: '1', userId: 'u1', + actor: USER_ACTOR, }, }); await flush(); diff --git a/apps/mana/apps/web/src/lib/data/events/actor.test.ts b/apps/mana/apps/web/src/lib/data/events/actor.test.ts new file mode 100644 index 000000000..73866b77b --- /dev/null +++ b/apps/mana/apps/web/src/lib/data/events/actor.test.ts @@ -0,0 +1,118 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { runAs, runAsAsync, getCurrentActor, USER_ACTOR, isAiActor, isSystemActor } from './actor'; +import { emitDomainEvent } from './emit'; +import { eventBus } from './event-bus'; +import type { DomainEvent } from './types'; + +const AI_ACTOR = { + kind: 'ai', + missionId: 'm-1', + iterationId: 'i-1', + rationale: 'test', +} as const; + +const SYSTEM_ACTOR = { kind: 'system', source: 'projection' } as const; + +describe('actor context', () => { + it('defaults to the user actor', () => { + expect(getCurrentActor()).toEqual(USER_ACTOR); + }); + + it('pins the actor inside runAs and restores on exit', () => { + runAs(AI_ACTOR, () => { + expect(isAiActor(getCurrentActor())).toBe(true); + }); + expect(getCurrentActor()).toEqual(USER_ACTOR); + }); + + it('recognises the system actor', () => { + runAs(SYSTEM_ACTOR, () => { + expect(isSystemActor(getCurrentActor())).toBe(true); + expect(isAiActor(getCurrentActor())).toBe(false); + }); + }); + + it('restores the previous actor even when the body throws', () => { + expect(() => + runAs(AI_ACTOR, () => { + throw new Error('boom'); + }) + ).toThrow('boom'); + expect(getCurrentActor()).toEqual(USER_ACTOR); + }); + + it('supports nesting', () => { + runAs({ ...AI_ACTOR, missionId: 'outer' }, () => { + expect((getCurrentActor() as { missionId: string }).missionId).toBe('outer'); + runAs({ ...AI_ACTOR, missionId: 'inner' }, () => { + expect((getCurrentActor() as { missionId: string }).missionId).toBe('inner'); + }); + expect((getCurrentActor() as { missionId: string }).missionId).toBe('outer'); + }); + }); + + it('preserves the actor across awaits inside runAsAsync', async () => { + await runAsAsync({ ...AI_ACTOR, missionId: 'async' }, async () => { + await Promise.resolve(); + expect((getCurrentActor() as { missionId: string }).missionId).toBe('async'); + }); + expect(getCurrentActor()).toEqual(USER_ACTOR); + }); +}); + +describe('emitDomainEvent actor attribution', () => { + let received: DomainEvent[] = []; + + beforeEach(() => { + received = []; + }); + + it('stamps the ambient actor onto the event meta', async () => { + const unsub = eventBus.on('TestEvent', (e) => received.push(e)); + try { + runAs(AI_ACTOR, () => { + emitDomainEvent('TestEvent', 'todo', 'tasks', 'rec-1', { n: 1 }); + }); + await new Promise((r) => queueMicrotask(() => r(undefined))); + expect(received).toHaveLength(1); + expect(received[0].meta.actor).toEqual(AI_ACTOR); + } finally { + unsub(); + } + }); + + it('defaults to the user actor outside runAs', async () => { + const unsub = eventBus.on('TestEvent', (e) => received.push(e)); + try { + emitDomainEvent('TestEvent', 'todo', 'tasks', 'rec-2', { n: 2 }); + await new Promise((r) => queueMicrotask(() => r(undefined))); + expect(received[0].meta.actor).toEqual(USER_ACTOR); + } finally { + unsub(); + } + }); + + it('honours an explicit actor in options over the ambient one', async () => { + const unsub = eventBus.on('TestEvent', (e) => received.push(e)); + try { + runAs(AI_ACTOR, () => { + emitDomainEvent('TestEvent', 'todo', 'tasks', 'rec-3', { n: 3 }, { actor: SYSTEM_ACTOR }); + }); + await new Promise((r) => queueMicrotask(() => r(undefined))); + expect(received[0].meta.actor).toEqual(SYSTEM_ACTOR); + } finally { + unsub(); + } + }); + + it('carries causedBy through the options bag', async () => { + const unsub = eventBus.on('TestEvent', (e) => received.push(e)); + try { + emitDomainEvent('TestEvent', 'todo', 'tasks', 'rec-4', { n: 4 }, { causedBy: 'parent-id' }); + await new Promise((r) => queueMicrotask(() => r(undefined))); + expect(received[0].meta.causedBy).toBe('parent-id'); + } finally { + unsub(); + } + }); +}); diff --git a/apps/mana/apps/web/src/lib/data/events/actor.ts b/apps/mana/apps/web/src/lib/data/events/actor.ts new file mode 100644 index 000000000..200f369eb --- /dev/null +++ b/apps/mana/apps/web/src/lib/data/events/actor.ts @@ -0,0 +1,95 @@ +/** + * Actor attribution — who triggered a write. + * + * Every DomainEvent, pending-change row, and synced record carries an Actor so + * the UI can distinguish user-initiated work from AI-initiated work, render + * ghost state for proposals, attribute field-level edits, and let the user + * revert a whole mission. + * + * Three actor kinds: + * - `user` — the human at the keyboard + * - `ai` — autonomous AI work, carrying mission/iteration metadata so the + * Workbench can group, review, and revert per-mission + * - `system` — derived writes (projections, rule engines, data migrations) + * that are neither user nor AI + * + * Threading model: a module-level "current actor" acts like an AsyncLocalStorage + * fiber. The browser is single-threaded and the Dexie write path is synchronous, + * so a mutable slot wrapped by `runAs(actor, fn)` is enough at the boundaries + * (UI handlers, executor, runners). At the primitive sites (Dexie hooks, + * `emitDomainEvent`) the actor is **captured synchronously** and frozen onto + * the data — ambient context is never the source of truth past that point. + */ + +export type Actor = + | { readonly kind: 'user' } + | { + readonly kind: 'ai'; + /** Mission this write belongs to. */ + readonly missionId: string; + /** Iteration within the mission (nth autonomous run). */ + readonly iterationId: string; + /** Human-readable reason the AI took this action. */ + readonly rationale: string; + } + | { + readonly kind: 'system'; + /** Subsystem responsible for this derived write. */ + readonly source: 'projection' | 'rule' | 'migration'; + }; + +export const USER_ACTOR: Actor = Object.freeze({ kind: 'user' }); + +let currentActor: Actor = USER_ACTOR; + +/** Returns the actor attributed to the currently executing write. */ +export function getCurrentActor(): Actor { + return currentActor; +} + +/** + * Run `fn` with the given actor pinned to the current context. Restores the + * previous actor on exit, even if `fn` throws. Supports nesting. + * + * Use this at the three defined boundaries only: + * 1. Tool executor (AI-initiated tool calls) + * 2. Mission runner (background AI loop) + * 3. Projection / rule dispatcher (system-initiated cascades) + * Past those boundaries every write primitive freezes the actor onto data — + * do not rely on ambient context across `setTimeout` / `queueMicrotask` hops. + */ +export function runAs(actor: Actor, fn: () => T): T { + const previous = currentActor; + currentActor = actor; + try { + return fn(); + } finally { + currentActor = previous; + } +} + +/** + * Async variant of {@link runAs}. The actor stays pinned across awaits within + * the same Promise chain, but NOT across `setTimeout` or un-awaited work. + * That is fine only because every primitive (emitDomainEvent, Dexie hooks) + * captures the actor synchronously at the write moment. + */ +export async function runAsAsync(actor: Actor, fn: () => Promise): Promise { + const previous = currentActor; + currentActor = actor; + try { + return await fn(); + } finally { + currentActor = previous; + } +} + +/** True when an AI agent wrote this record/event/field. */ +export function isAiActor(actor: Actor | undefined): boolean { + return actor?.kind === 'ai'; +} + +/** True when a derived subsystem (projection / rule / migration) wrote it. */ +export function isSystemActor(actor: Actor | undefined): boolean { + return actor?.kind === 'system'; +} diff --git a/apps/mana/apps/web/src/lib/data/events/emit.ts b/apps/mana/apps/web/src/lib/data/events/emit.ts index d87516a13..b65680015 100644 --- a/apps/mana/apps/web/src/lib/data/events/emit.ts +++ b/apps/mana/apps/web/src/lib/data/events/emit.ts @@ -1,23 +1,30 @@ /** * Convenience helper for emitting domain events from module stores. * - * Builds the EventMeta automatically so stores only need to specify - * the event type, routing info, and payload. + * Builds the EventMeta automatically so stores only need to specify the event + * type, routing info, payload, and (optionally) an explicit actor / cause. */ import { eventBus } from './event-bus'; import { getEffectiveUserId } from '../current-user'; +import { getCurrentActor } from './actor'; +import type { Actor } from './actor'; import type { DomainEvent } from './types'; +export interface EmitOptions { + /** + * Who triggered this event. Defaults to the ambient actor set by `runAs` + * (which is `{ kind: 'user' }` when nothing else is active). Pass an + * explicit actor when crossing async boundaries where ambient context + * can't be trusted (e.g. deferred `setTimeout` callbacks). + */ + actor?: Actor; + /** Parent event ID (for trigger chains / cascades). */ + causedBy?: string; +} + /** * Emit a domain event on the shared bus. - * - * @example - * ```ts - * emitDomainEvent('TaskCompleted', 'todo', 'tasks', id, { - * taskId: id, title: task.title, wasOverdue: true, - * }); - * ``` */ export function emitDomainEvent

( type: string, @@ -25,7 +32,7 @@ export function emitDomainEvent

( collection: string, recordId: string, payload: P, - causedBy?: string + opts: EmitOptions = {} ): void { const event: DomainEvent = { type, @@ -37,7 +44,8 @@ export function emitDomainEvent

( collection, recordId, userId: getEffectiveUserId(), - causedBy, + actor: opts.actor ?? getCurrentActor(), + causedBy: opts.causedBy, }, }; eventBus.emit(event); diff --git a/apps/mana/apps/web/src/lib/data/events/event-bus.test.ts b/apps/mana/apps/web/src/lib/data/events/event-bus.test.ts index 99ac0a58d..f1cbacdd3 100644 --- a/apps/mana/apps/web/src/lib/data/events/event-bus.test.ts +++ b/apps/mana/apps/web/src/lib/data/events/event-bus.test.ts @@ -9,6 +9,7 @@ import { describe, it, expect, vi } from 'vitest'; import { createEventBus } from './event-bus'; +import { USER_ACTOR } from './actor'; import type { DomainEvent } from './types'; function makeEvent(type: string, payload: unknown = {}): DomainEvent { @@ -22,6 +23,7 @@ function makeEvent(type: string, payload: unknown = {}): DomainEvent { collection: 'test', recordId: '1', userId: 'user1', + actor: USER_ACTOR, }, }; } diff --git a/apps/mana/apps/web/src/lib/data/events/event-store.ts b/apps/mana/apps/web/src/lib/data/events/event-store.ts index a6d94f5ed..babe33988 100644 --- a/apps/mana/apps/web/src/lib/data/events/event-store.ts +++ b/apps/mana/apps/web/src/lib/data/events/event-store.ts @@ -53,15 +53,7 @@ interface StoredEvent { seq?: number; type: string; payload: unknown; - meta: { - id: string; - timestamp: string; - appId: string; - collection: string; - recordId: string; - userId: string; - causedBy?: string; - }; + meta: DomainEvent['meta']; } /** Query persisted events. Most recent first. */ diff --git a/apps/mana/apps/web/src/lib/data/events/index.ts b/apps/mana/apps/web/src/lib/data/events/index.ts index 3687db1f0..0208d06c7 100644 --- a/apps/mana/apps/web/src/lib/data/events/index.ts +++ b/apps/mana/apps/web/src/lib/data/events/index.ts @@ -1,4 +1,7 @@ export { eventBus, createEventBus } from './event-bus'; export { emitDomainEvent } from './emit'; +export type { EmitOptions } from './emit'; +export { runAs, runAsAsync, getCurrentActor, isAiActor, isSystemActor, USER_ACTOR } from './actor'; +export type { Actor } from './actor'; export type { DomainEvent, EventMeta, EventBus, EventHandler } from './types'; export type * from './catalog'; diff --git a/apps/mana/apps/web/src/lib/data/events/types.ts b/apps/mana/apps/web/src/lib/data/events/types.ts index 0abb305b3..cf04b71a2 100644 --- a/apps/mana/apps/web/src/lib/data/events/types.ts +++ b/apps/mana/apps/web/src/lib/data/events/types.ts @@ -7,6 +7,8 @@ * the LLM Context Builder to work without reverse-engineering field diffs. */ +import type { Actor } from './actor'; + // ── Core Event Shape ──────────────────────────────── export interface DomainEvent { @@ -28,7 +30,9 @@ export interface EventMeta { readonly recordId: string; /** User who triggered this */ readonly userId: string; - /** Parent event ID (for trigger chains / cascades) */ + /** Who triggered this write — user, AI, or a derived subsystem. */ + readonly actor: Actor; + /** Parent event ID (for trigger chains / cascades). */ readonly causedBy?: string; } diff --git a/apps/mana/apps/web/src/lib/data/projections/correlations.test.ts b/apps/mana/apps/web/src/lib/data/projections/correlations.test.ts index 70e646dcc..ea1bddb9e 100644 --- a/apps/mana/apps/web/src/lib/data/projections/correlations.test.ts +++ b/apps/mana/apps/web/src/lib/data/projections/correlations.test.ts @@ -16,6 +16,7 @@ vi.mock('$lib/triggers/inline-suggest', () => ({ import { db } from '../database'; import { computeCorrelations } from './correlations'; +import { USER_ACTOR } from '../events/actor'; const EVENTS_TABLE = '_events'; @@ -30,6 +31,7 @@ function makeEvent(type: string, payload: Record, date: string) collection: 'test', recordId: crypto.randomUUID(), userId: 'user1', + actor: USER_ACTOR, }, }; } diff --git a/apps/mana/apps/web/src/lib/data/projections/streaks.test.ts b/apps/mana/apps/web/src/lib/data/projections/streaks.test.ts index bc9d76157..7f89a6269 100644 --- a/apps/mana/apps/web/src/lib/data/projections/streaks.test.ts +++ b/apps/mana/apps/web/src/lib/data/projections/streaks.test.ts @@ -13,6 +13,7 @@ vi.mock('$lib/triggers/inline-suggest', () => ({ import { db } from '../database'; import { eventBus } from '../events/event-bus'; +import { USER_ACTOR } from '../events/actor'; import { startStreakTracker, stopStreakTracker, useStreaks } from './streaks'; const TABLE = '_streakState'; @@ -47,6 +48,7 @@ describe('Streak Tracker', () => { collection: 'tasks', recordId: '1', userId: 'u1', + actor: USER_ACTOR, }, }); await flush(); @@ -72,6 +74,7 @@ describe('Streak Tracker', () => { collection: 'tasks', recordId: `${i}`, userId: 'u1', + actor: USER_ACTOR, }, }); } @@ -95,6 +98,7 @@ describe('Streak Tracker', () => { collection: 'drinkEntries', recordId: '1', userId: 'u1', + actor: USER_ACTOR, }, }); await flush(); @@ -117,6 +121,7 @@ describe('Streak Tracker', () => { collection: 'drinkEntries', recordId: '1', userId: 'u1', + actor: USER_ACTOR, }, }); await flush(); @@ -139,6 +144,7 @@ describe('Streak Tracker', () => { collection: 'tasks', recordId: '1', userId: 'u1', + actor: USER_ACTOR, }, }); eventBus.emit({ @@ -157,6 +163,7 @@ describe('Streak Tracker', () => { collection: 'meals', recordId: '2', userId: 'u1', + actor: USER_ACTOR, }, }); await flush();