diff --git a/services/mana-ai/src/db/missions-projection.test.ts b/services/mana-ai/src/db/missions-projection.test.ts new file mode 100644 index 000000000..a91cc80f8 --- /dev/null +++ b/services/mana-ai/src/db/missions-projection.test.ts @@ -0,0 +1,133 @@ +import { describe, it, expect } from 'bun:test'; +import { mergeAndFilter } from './missions-projection'; + +function row(overrides: Record) { + return { + table_name: 'aiMissions', + record_id: 'm-1', + user_id: 'u-1', + op: 'insert', + data: null, + field_timestamps: null, + created_at: new Date('2026-04-15T00:00:00Z'), + ...overrides, + } as Parameters[0][number]; +} + +const NOW = '2026-04-15T12:00:00Z'; + +describe('mergeAndFilter', () => { + it('returns an active due mission', () => { + const rows = [ + row({ + op: 'insert', + data: { + state: 'active', + title: 'x', + objective: 'y', + conceptMarkdown: '', + inputs: [], + cadence: { kind: 'manual' }, + iterations: [], + nextRunAt: '2026-04-15T00:00:00Z', + }, + }), + ]; + const out = mergeAndFilter(rows, 'u-1', NOW); + expect(out).toHaveLength(1); + expect(out[0].title).toBe('x'); + }); + + it('drops missions whose nextRunAt is in the future', () => { + const rows = [ + row({ + data: { + state: 'active', + title: 'x', + objective: 'y', + inputs: [], + cadence: { kind: 'manual' }, + iterations: [], + nextRunAt: '2099-01-01T00:00:00Z', + }, + }), + ]; + expect(mergeAndFilter(rows, 'u-1', NOW)).toHaveLength(0); + }); + + it('drops paused / done / deleted missions', () => { + const rows = [ + row({ + data: { + state: 'paused', + inputs: [], + cadence: { kind: 'manual' }, + iterations: [], + nextRunAt: '2026-04-15T00:00:00Z', + }, + }), + ]; + expect(mergeAndFilter(rows, 'u-1', NOW)).toHaveLength(0); + }); + + it('honours delete tombstones', () => { + const rows = [ + row({ + op: 'insert', + data: { + state: 'active', + inputs: [], + cadence: { kind: 'manual' }, + iterations: [], + nextRunAt: '2026-04-15T00:00:00Z', + }, + }), + row({ op: 'delete', created_at: new Date('2026-04-15T01:00:00Z') }), + ]; + expect(mergeAndFilter(rows, 'u-1', NOW)).toHaveLength(0); + }); + + it('LWW-merges field updates from multiple rows', () => { + const rows = [ + row({ + created_at: new Date('2026-04-15T00:00:00Z'), + data: { + state: 'active', + title: 'old', + inputs: [], + cadence: { kind: 'manual' }, + iterations: [], + nextRunAt: '2026-04-15T00:00:00Z', + }, + field_timestamps: { + state: '2026-04-15T00:00:00Z', + title: '2026-04-15T00:00:00Z', + nextRunAt: '2026-04-15T00:00:00Z', + }, + }), + row({ + created_at: new Date('2026-04-15T01:00:00Z'), + data: { title: 'new' }, + field_timestamps: { title: '2026-04-15T01:00:00Z' }, + }), + ]; + const out = mergeAndFilter(rows, 'u-1', NOW); + expect(out).toHaveLength(1); + expect(out[0].title).toBe('new'); + }); + + it('stamps the supplied userId on each result', () => { + const rows = [ + row({ + data: { + state: 'active', + inputs: [], + cadence: { kind: 'manual' }, + iterations: [], + nextRunAt: '2026-04-15T00:00:00Z', + }, + }), + ]; + expect(mergeAndFilter(rows, 'different-user', NOW)[0].userId).toBe('different-user'); + }); +}); diff --git a/services/mana-ai/src/db/missions-projection.ts b/services/mana-ai/src/db/missions-projection.ts index 7453614ab..d2a41c003 100644 --- a/services/mana-ai/src/db/missions-projection.ts +++ b/services/mana-ai/src/db/missions-projection.ts @@ -11,6 +11,7 @@ */ import type { Sql } from './connection'; +import { withUser } from './connection'; /** * Subset of the Mission shape the server needs. Matches @@ -40,62 +41,108 @@ interface ChangeRow { } /** - * Return all currently-active missions whose `nextRunAt` has passed. - * Server-side equivalent of `listMissions({ dueBefore: now })` in the - * webapp store. + * Return the distinct user_ids that have ever written an aiMissions row. + * + * This is the ONE query that needs to see across users. It runs WITHOUT + * `withUser`, so the DB role hosting mana-ai either: + * - has BYPASSRLS (simplest — ops choice), or + * - owns sync_changes and the FORCE RLS policy excludes owner (default + * Postgres semantics; requires dropping `FORCE ROW LEVEL SECURITY`) + * + * The per-user read paths below scope through RLS normally, so a leaky + * user_ids discovery is the only cross-user surface this service exposes. + */ +export async function listMissionUsers(sql: Sql): Promise { + const rows = await sql<{ user_id: string }[]>` + SELECT DISTINCT user_id + FROM sync_changes + WHERE app_id = 'ai' AND table_name = 'aiMissions' + `; + return rows.map((r) => r.user_id); +} + +/** + * Return active missions for a single user whose `nextRunAt` has passed. + * RLS-scoped via `withUser` — defense-in-depth against a query wandering + * outside its user. + */ +async function listDueMissionsForUser( + sql: Sql, + userId: string, + now: string +): Promise { + const rows = await withUser( + sql, + userId, + async (tx) => + tx` + SELECT table_name, record_id, user_id, op, data, field_timestamps, created_at + FROM sync_changes + WHERE app_id = 'ai' AND table_name = 'aiMissions' AND user_id = ${userId} + ORDER BY created_at ASC + ` + ); + return mergeAndFilter(rows as ChangeRow[], userId, now); +} + +/** + * Return all currently-active missions whose `nextRunAt` has passed, + * across every user. Two-phase: discover users (cross-user), then + * RLS-scope per user. * * @param now ISO timestamp used as the due-before cutoff. */ export async function listDueMissions(sql: Sql, now: string): Promise { - // Pull every event for the ai app across users. For a real deploy - // we'd scope per-user or shard; the pre-launch user count makes this - // single scan defensible. - const rows = await sql` - SELECT table_name, record_id, user_id, op, data, field_timestamps, created_at - FROM sync_changes - WHERE app_id = 'ai' AND table_name = 'aiMissions' - ORDER BY created_at ASC - `; + const users = await listMissionUsers(sql); + const perUser = await Promise.all(users.map((u) => listDueMissionsForUser(sql, u, now))); + return perUser.flat(); +} - // Replay per record. Map key: userId::recordId (user isolation is kept - // even though we fetched across users in one scan — the result goes - // back to whichever user owns each row). - const merged = new Map }>(); +/** + * Merge `sync_changes` rows for ONE user's aiMissions set via field-level + * LWW, then filter down to due + active records. + * + * Pure function — no DB access, no ambient state. Exported for tests. + */ +export function mergeAndFilter( + rows: readonly ChangeRow[], + userId: string, + now: string +): ServerMission[] { + const merged = new Map>(); for (const row of rows) { - const key = `${row.user_id}::${row.record_id}`; - const entry = merged.get(key); + const existing = merged.get(row.record_id); if (row.op === 'delete') { - merged.delete(key); + merged.delete(row.record_id); continue; } - if (!entry) { + if (!existing) { if (row.data) { - merged.set(key, { userId: row.user_id, record: { id: row.record_id, ...row.data } }); + merged.set(row.record_id, { id: row.record_id, ...row.data }); } continue; } - // Field-level LWW: newer timestamps overwrite. - const prevFT = (entry.record.__fieldTimestamps as Record | undefined) ?? {}; + const prevFT = (existing.__fieldTimestamps as Record | undefined) ?? {}; const nextFT = { ...prevFT }; if (row.data) { for (const [k, v] of Object.entries(row.data)) { const serverTime = row.field_timestamps?.[k] ?? row.created_at.toISOString(); const localTime = prevFT[k] ?? ''; if (serverTime >= localTime) { - entry.record[k] = v; + existing[k] = v; nextFT[k] = serverTime; } } } - entry.record.__fieldTimestamps = nextFT; + existing.__fieldTimestamps = nextFT; } const missions: ServerMission[] = []; - for (const { userId, record } of merged.values()) { + for (const record of merged.values()) { const state = record.state as ServerMission['state']; const nextRunAt = record.nextRunAt as string | undefined; const deletedAt = record.deletedAt as string | undefined;