feat(events): Actor attribution on every DomainEvent

Introduces a discriminated Actor union (user | ai | system) threaded through
the event pipeline so downstream consumers can distinguish human writes from
AI-initiated ones and derived subsystem writes.

- `EventMeta.actor: Actor` is required (no legacy fallback — pre-launch)
- `emitDomainEvent` takes an options bag `{ actor?, causedBy? }`; falls back
  to the ambient actor set by `runAs` / `runAsAsync`
- `runAs` / `runAsAsync` pin the actor at defined boundaries (tool executor,
  mission runner, projection dispatcher) — primitives capture synchronously
  so ambient context is never SoT past the write moment

Foundation for the AI Workbench. Follow-up: mana-sync server must accept
and persist `actor` in pending-change payloads.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-14 20:48:03 +02:00
parent 2fb2bb60fb
commit a18506caf6
10 changed files with 257 additions and 21 deletions

View file

@ -13,6 +13,7 @@ vi.mock('$lib/triggers/inline-suggest', () => ({
import { db } from '$lib/data/database';
import { eventBus } from '$lib/data/events/event-bus';
import { USER_ACTOR } from '$lib/data/events/actor';
import { goalStore, startGoalTracker, stopGoalTracker, GOAL_TEMPLATES } from './index';
import type { LocalGoal } from './types';
@ -77,6 +78,7 @@ describe('goal event tracking', () => {
collection: 'tasks',
recordId: '1',
userId: 'u1',
actor: USER_ACTOR,
},
});
await flush();
@ -102,6 +104,7 @@ describe('goal event tracking', () => {
collection: 'drinkEntries',
recordId: '1',
userId: 'u1',
actor: USER_ACTOR,
},
});
await flush();
@ -120,6 +123,7 @@ describe('goal event tracking', () => {
collection: 'drinkEntries',
recordId: '2',
userId: 'u1',
actor: USER_ACTOR,
},
});
await flush();
@ -144,6 +148,7 @@ describe('goal event tracking', () => {
collection: 'tasks',
recordId: '1',
userId: 'u1',
actor: USER_ACTOR,
},
});
await flush();

View file

@ -0,0 +1,118 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { runAs, runAsAsync, getCurrentActor, USER_ACTOR, isAiActor, isSystemActor } from './actor';
import { emitDomainEvent } from './emit';
import { eventBus } from './event-bus';
import type { DomainEvent } from './types';
const AI_ACTOR = {
kind: 'ai',
missionId: 'm-1',
iterationId: 'i-1',
rationale: 'test',
} as const;
const SYSTEM_ACTOR = { kind: 'system', source: 'projection' } as const;
describe('actor context', () => {
it('defaults to the user actor', () => {
expect(getCurrentActor()).toEqual(USER_ACTOR);
});
it('pins the actor inside runAs and restores on exit', () => {
runAs(AI_ACTOR, () => {
expect(isAiActor(getCurrentActor())).toBe(true);
});
expect(getCurrentActor()).toEqual(USER_ACTOR);
});
it('recognises the system actor', () => {
runAs(SYSTEM_ACTOR, () => {
expect(isSystemActor(getCurrentActor())).toBe(true);
expect(isAiActor(getCurrentActor())).toBe(false);
});
});
it('restores the previous actor even when the body throws', () => {
expect(() =>
runAs(AI_ACTOR, () => {
throw new Error('boom');
})
).toThrow('boom');
expect(getCurrentActor()).toEqual(USER_ACTOR);
});
it('supports nesting', () => {
runAs({ ...AI_ACTOR, missionId: 'outer' }, () => {
expect((getCurrentActor() as { missionId: string }).missionId).toBe('outer');
runAs({ ...AI_ACTOR, missionId: 'inner' }, () => {
expect((getCurrentActor() as { missionId: string }).missionId).toBe('inner');
});
expect((getCurrentActor() as { missionId: string }).missionId).toBe('outer');
});
});
it('preserves the actor across awaits inside runAsAsync', async () => {
await runAsAsync({ ...AI_ACTOR, missionId: 'async' }, async () => {
await Promise.resolve();
expect((getCurrentActor() as { missionId: string }).missionId).toBe('async');
});
expect(getCurrentActor()).toEqual(USER_ACTOR);
});
});
describe('emitDomainEvent actor attribution', () => {
let received: DomainEvent[] = [];
beforeEach(() => {
received = [];
});
it('stamps the ambient actor onto the event meta', async () => {
const unsub = eventBus.on('TestEvent', (e) => received.push(e));
try {
runAs(AI_ACTOR, () => {
emitDomainEvent('TestEvent', 'todo', 'tasks', 'rec-1', { n: 1 });
});
await new Promise((r) => queueMicrotask(() => r(undefined)));
expect(received).toHaveLength(1);
expect(received[0].meta.actor).toEqual(AI_ACTOR);
} finally {
unsub();
}
});
it('defaults to the user actor outside runAs', async () => {
const unsub = eventBus.on('TestEvent', (e) => received.push(e));
try {
emitDomainEvent('TestEvent', 'todo', 'tasks', 'rec-2', { n: 2 });
await new Promise((r) => queueMicrotask(() => r(undefined)));
expect(received[0].meta.actor).toEqual(USER_ACTOR);
} finally {
unsub();
}
});
it('honours an explicit actor in options over the ambient one', async () => {
const unsub = eventBus.on('TestEvent', (e) => received.push(e));
try {
runAs(AI_ACTOR, () => {
emitDomainEvent('TestEvent', 'todo', 'tasks', 'rec-3', { n: 3 }, { actor: SYSTEM_ACTOR });
});
await new Promise((r) => queueMicrotask(() => r(undefined)));
expect(received[0].meta.actor).toEqual(SYSTEM_ACTOR);
} finally {
unsub();
}
});
it('carries causedBy through the options bag', async () => {
const unsub = eventBus.on('TestEvent', (e) => received.push(e));
try {
emitDomainEvent('TestEvent', 'todo', 'tasks', 'rec-4', { n: 4 }, { causedBy: 'parent-id' });
await new Promise((r) => queueMicrotask(() => r(undefined)));
expect(received[0].meta.causedBy).toBe('parent-id');
} finally {
unsub();
}
});
});

View file

@ -0,0 +1,95 @@
/**
* Actor attribution who triggered a write.
*
* Every DomainEvent, pending-change row, and synced record carries an Actor so
* the UI can distinguish user-initiated work from AI-initiated work, render
* ghost state for proposals, attribute field-level edits, and let the user
* revert a whole mission.
*
* Three actor kinds:
* - `user` the human at the keyboard
* - `ai` autonomous AI work, carrying mission/iteration metadata so the
* Workbench can group, review, and revert per-mission
* - `system` derived writes (projections, rule engines, data migrations)
* that are neither user nor AI
*
* Threading model: a module-level "current actor" acts like an AsyncLocalStorage
* fiber. The browser is single-threaded and the Dexie write path is synchronous,
* so a mutable slot wrapped by `runAs(actor, fn)` is enough at the boundaries
* (UI handlers, executor, runners). At the primitive sites (Dexie hooks,
* `emitDomainEvent`) the actor is **captured synchronously** and frozen onto
* the data ambient context is never the source of truth past that point.
*/
export type Actor =
| { readonly kind: 'user' }
| {
readonly kind: 'ai';
/** Mission this write belongs to. */
readonly missionId: string;
/** Iteration within the mission (nth autonomous run). */
readonly iterationId: string;
/** Human-readable reason the AI took this action. */
readonly rationale: string;
}
| {
readonly kind: 'system';
/** Subsystem responsible for this derived write. */
readonly source: 'projection' | 'rule' | 'migration';
};
export const USER_ACTOR: Actor = Object.freeze({ kind: 'user' });
let currentActor: Actor = USER_ACTOR;
/** Returns the actor attributed to the currently executing write. */
export function getCurrentActor(): Actor {
return currentActor;
}
/**
* Run `fn` with the given actor pinned to the current context. Restores the
* previous actor on exit, even if `fn` throws. Supports nesting.
*
* Use this at the three defined boundaries only:
* 1. Tool executor (AI-initiated tool calls)
* 2. Mission runner (background AI loop)
* 3. Projection / rule dispatcher (system-initiated cascades)
* Past those boundaries every write primitive freezes the actor onto data
* do not rely on ambient context across `setTimeout` / `queueMicrotask` hops.
*/
export function runAs<T>(actor: Actor, fn: () => T): T {
const previous = currentActor;
currentActor = actor;
try {
return fn();
} finally {
currentActor = previous;
}
}
/**
* Async variant of {@link runAs}. The actor stays pinned across awaits within
* the same Promise chain, but NOT across `setTimeout` or un-awaited work.
* That is fine only because every primitive (emitDomainEvent, Dexie hooks)
* captures the actor synchronously at the write moment.
*/
export async function runAsAsync<T>(actor: Actor, fn: () => Promise<T>): Promise<T> {
const previous = currentActor;
currentActor = actor;
try {
return await fn();
} finally {
currentActor = previous;
}
}
/** True when an AI agent wrote this record/event/field. */
export function isAiActor(actor: Actor | undefined): boolean {
return actor?.kind === 'ai';
}
/** True when a derived subsystem (projection / rule / migration) wrote it. */
export function isSystemActor(actor: Actor | undefined): boolean {
return actor?.kind === 'system';
}

View file

@ -1,23 +1,30 @@
/**
* Convenience helper for emitting domain events from module stores.
*
* Builds the EventMeta automatically so stores only need to specify
* the event type, routing info, and payload.
* Builds the EventMeta automatically so stores only need to specify the event
* type, routing info, payload, and (optionally) an explicit actor / cause.
*/
import { eventBus } from './event-bus';
import { getEffectiveUserId } from '../current-user';
import { getCurrentActor } from './actor';
import type { Actor } from './actor';
import type { DomainEvent } from './types';
export interface EmitOptions {
/**
* Who triggered this event. Defaults to the ambient actor set by `runAs`
* (which is `{ kind: 'user' }` when nothing else is active). Pass an
* explicit actor when crossing async boundaries where ambient context
* can't be trusted (e.g. deferred `setTimeout` callbacks).
*/
actor?: Actor;
/** Parent event ID (for trigger chains / cascades). */
causedBy?: string;
}
/**
* Emit a domain event on the shared bus.
*
* @example
* ```ts
* emitDomainEvent('TaskCompleted', 'todo', 'tasks', id, {
* taskId: id, title: task.title, wasOverdue: true,
* });
* ```
*/
export function emitDomainEvent<P>(
type: string,
@ -25,7 +32,7 @@ export function emitDomainEvent<P>(
collection: string,
recordId: string,
payload: P,
causedBy?: string
opts: EmitOptions = {}
): void {
const event: DomainEvent<string, P> = {
type,
@ -37,7 +44,8 @@ export function emitDomainEvent<P>(
collection,
recordId,
userId: getEffectiveUserId(),
causedBy,
actor: opts.actor ?? getCurrentActor(),
causedBy: opts.causedBy,
},
};
eventBus.emit(event);

View file

@ -9,6 +9,7 @@
import { describe, it, expect, vi } from 'vitest';
import { createEventBus } from './event-bus';
import { USER_ACTOR } from './actor';
import type { DomainEvent } from './types';
function makeEvent(type: string, payload: unknown = {}): DomainEvent {
@ -22,6 +23,7 @@ function makeEvent(type: string, payload: unknown = {}): DomainEvent {
collection: 'test',
recordId: '1',
userId: 'user1',
actor: USER_ACTOR,
},
};
}

View file

@ -53,15 +53,7 @@ interface StoredEvent {
seq?: number;
type: string;
payload: unknown;
meta: {
id: string;
timestamp: string;
appId: string;
collection: string;
recordId: string;
userId: string;
causedBy?: string;
};
meta: DomainEvent['meta'];
}
/** Query persisted events. Most recent first. */

View file

@ -1,4 +1,7 @@
export { eventBus, createEventBus } from './event-bus';
export { emitDomainEvent } from './emit';
export type { EmitOptions } from './emit';
export { runAs, runAsAsync, getCurrentActor, isAiActor, isSystemActor, USER_ACTOR } from './actor';
export type { Actor } from './actor';
export type { DomainEvent, EventMeta, EventBus, EventHandler } from './types';
export type * from './catalog';

View file

@ -7,6 +7,8 @@
* the LLM Context Builder to work without reverse-engineering field diffs.
*/
import type { Actor } from './actor';
// ── Core Event Shape ────────────────────────────────
export interface DomainEvent<T extends string = string, P = unknown> {
@ -28,7 +30,9 @@ export interface EventMeta {
readonly recordId: string;
/** User who triggered this */
readonly userId: string;
/** Parent event ID (for trigger chains / cascades) */
/** Who triggered this write — user, AI, or a derived subsystem. */
readonly actor: Actor;
/** Parent event ID (for trigger chains / cascades). */
readonly causedBy?: string;
}

View file

@ -16,6 +16,7 @@ vi.mock('$lib/triggers/inline-suggest', () => ({
import { db } from '../database';
import { computeCorrelations } from './correlations';
import { USER_ACTOR } from '../events/actor';
const EVENTS_TABLE = '_events';
@ -30,6 +31,7 @@ function makeEvent(type: string, payload: Record<string, unknown>, date: string)
collection: 'test',
recordId: crypto.randomUUID(),
userId: 'user1',
actor: USER_ACTOR,
},
};
}

View file

@ -13,6 +13,7 @@ vi.mock('$lib/triggers/inline-suggest', () => ({
import { db } from '../database';
import { eventBus } from '../events/event-bus';
import { USER_ACTOR } from '../events/actor';
import { startStreakTracker, stopStreakTracker, useStreaks } from './streaks';
const TABLE = '_streakState';
@ -47,6 +48,7 @@ describe('Streak Tracker', () => {
collection: 'tasks',
recordId: '1',
userId: 'u1',
actor: USER_ACTOR,
},
});
await flush();
@ -72,6 +74,7 @@ describe('Streak Tracker', () => {
collection: 'tasks',
recordId: `${i}`,
userId: 'u1',
actor: USER_ACTOR,
},
});
}
@ -95,6 +98,7 @@ describe('Streak Tracker', () => {
collection: 'drinkEntries',
recordId: '1',
userId: 'u1',
actor: USER_ACTOR,
},
});
await flush();
@ -117,6 +121,7 @@ describe('Streak Tracker', () => {
collection: 'drinkEntries',
recordId: '1',
userId: 'u1',
actor: USER_ACTOR,
},
});
await flush();
@ -139,6 +144,7 @@ describe('Streak Tracker', () => {
collection: 'tasks',
recordId: '1',
userId: 'u1',
actor: USER_ACTOR,
},
});
eventBus.emit({
@ -157,6 +163,7 @@ describe('Streak Tracker', () => {
collection: 'meals',
recordId: '2',
userId: 'u1',
actor: USER_ACTOR,
},
});
await flush();