mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 19:01:08 +02:00
refactor(mana-ai): RLS-scope mission reads via per-user two-phase query
Closes the "cross-user scan" caveat on the mission read path. The
earlier implementation pulled every aiMissions row server-wide and
partitioned by user_id in memory — fine for a pre-launch single-user
deploy, but not for cross-user infrastructure.
New flow:
1. `listMissionUsers(sql)` — one cross-user DISTINCT query. This is
the ONLY surface that still reads across users; documented as
requiring BYPASSRLS on the service's DB role (or ownership without
FORCE).
2. `listDueMissionsForUser(sql, userId, now)` — RLS-scoped via
`withUser(sql, userId, tx => ...)` just like the write path in
`iteration-writer.ts`. Defense-in-depth: even if the SELECT mis-
filters, RLS drops any row whose user_id doesn't match the session
setting.
3. `listDueMissions(sql, now)` — two-phase composition of the above.
The LWW merge + due-filter logic is factored out into a pure
`mergeAndFilter(rows, userId, now)`. Fully unit-tested (6 Bun cases):
active-due happy-path, future nextRunAt, non-active state, delete
tombstone, multi-row LWW merge, userId stamping.
Matches the pattern already in use for writes (`db/connection.ts:withUser`
+ `db/iteration-writer.ts`). Docstring on `listMissionUsers` spells out
the remaining BYPASSRLS dependency so ops knows what role the service
needs.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b03bbe132e
commit
ad1659f036
2 changed files with 206 additions and 26 deletions
133
services/mana-ai/src/db/missions-projection.test.ts
Normal file
133
services/mana-ai/src/db/missions-projection.test.ts
Normal file
|
|
@@ -0,0 +1,133 @@
|
|||
import { describe, it, expect } from 'bun:test';
|
||||
import { mergeAndFilter } from './missions-projection';
|
||||
|
||||
function row(overrides: Record<string, unknown>) {
|
||||
return {
|
||||
table_name: 'aiMissions',
|
||||
record_id: 'm-1',
|
||||
user_id: 'u-1',
|
||||
op: 'insert',
|
||||
data: null,
|
||||
field_timestamps: null,
|
||||
created_at: new Date('2026-04-15T00:00:00Z'),
|
||||
...overrides,
|
||||
} as Parameters<typeof mergeAndFilter>[0][number];
|
||||
}
|
||||
|
||||
const NOW = '2026-04-15T12:00:00Z';
|
||||
|
||||
describe('mergeAndFilter', () => {
|
||||
it('returns an active due mission', () => {
|
||||
const rows = [
|
||||
row({
|
||||
op: 'insert',
|
||||
data: {
|
||||
state: 'active',
|
||||
title: 'x',
|
||||
objective: 'y',
|
||||
conceptMarkdown: '',
|
||||
inputs: [],
|
||||
cadence: { kind: 'manual' },
|
||||
iterations: [],
|
||||
nextRunAt: '2026-04-15T00:00:00Z',
|
||||
},
|
||||
}),
|
||||
];
|
||||
const out = mergeAndFilter(rows, 'u-1', NOW);
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].title).toBe('x');
|
||||
});
|
||||
|
||||
it('drops missions whose nextRunAt is in the future', () => {
|
||||
const rows = [
|
||||
row({
|
||||
data: {
|
||||
state: 'active',
|
||||
title: 'x',
|
||||
objective: 'y',
|
||||
inputs: [],
|
||||
cadence: { kind: 'manual' },
|
||||
iterations: [],
|
||||
nextRunAt: '2099-01-01T00:00:00Z',
|
||||
},
|
||||
}),
|
||||
];
|
||||
expect(mergeAndFilter(rows, 'u-1', NOW)).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('drops paused / done / deleted missions', () => {
|
||||
const rows = [
|
||||
row({
|
||||
data: {
|
||||
state: 'paused',
|
||||
inputs: [],
|
||||
cadence: { kind: 'manual' },
|
||||
iterations: [],
|
||||
nextRunAt: '2026-04-15T00:00:00Z',
|
||||
},
|
||||
}),
|
||||
];
|
||||
expect(mergeAndFilter(rows, 'u-1', NOW)).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('honours delete tombstones', () => {
|
||||
const rows = [
|
||||
row({
|
||||
op: 'insert',
|
||||
data: {
|
||||
state: 'active',
|
||||
inputs: [],
|
||||
cadence: { kind: 'manual' },
|
||||
iterations: [],
|
||||
nextRunAt: '2026-04-15T00:00:00Z',
|
||||
},
|
||||
}),
|
||||
row({ op: 'delete', created_at: new Date('2026-04-15T01:00:00Z') }),
|
||||
];
|
||||
expect(mergeAndFilter(rows, 'u-1', NOW)).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('LWW-merges field updates from multiple rows', () => {
|
||||
const rows = [
|
||||
row({
|
||||
created_at: new Date('2026-04-15T00:00:00Z'),
|
||||
data: {
|
||||
state: 'active',
|
||||
title: 'old',
|
||||
inputs: [],
|
||||
cadence: { kind: 'manual' },
|
||||
iterations: [],
|
||||
nextRunAt: '2026-04-15T00:00:00Z',
|
||||
},
|
||||
field_timestamps: {
|
||||
state: '2026-04-15T00:00:00Z',
|
||||
title: '2026-04-15T00:00:00Z',
|
||||
nextRunAt: '2026-04-15T00:00:00Z',
|
||||
},
|
||||
}),
|
||||
row({
|
||||
created_at: new Date('2026-04-15T01:00:00Z'),
|
||||
data: { title: 'new' },
|
||||
field_timestamps: { title: '2026-04-15T01:00:00Z' },
|
||||
}),
|
||||
];
|
||||
const out = mergeAndFilter(rows, 'u-1', NOW);
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].title).toBe('new');
|
||||
});
|
||||
|
||||
it('stamps the supplied userId on each result', () => {
|
||||
const rows = [
|
||||
row({
|
||||
data: {
|
||||
state: 'active',
|
||||
inputs: [],
|
||||
cadence: { kind: 'manual' },
|
||||
iterations: [],
|
||||
nextRunAt: '2026-04-15T00:00:00Z',
|
||||
},
|
||||
}),
|
||||
];
|
||||
expect(mergeAndFilter(rows, 'different-user', NOW)[0].userId).toBe('different-user');
|
||||
});
|
||||
});
|
||||
|
|
@@ -11,6 +11,7 @@
|
|||
*/
|
||||
|
||||
import type { Sql } from './connection';
|
||||
import { withUser } from './connection';
|
||||
|
||||
/**
|
||||
* Subset of the Mission shape the server needs. Matches
|
||||
|
|
@@ -40,62 +41,108 @@ interface ChangeRow {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return all currently-active missions whose `nextRunAt` has passed.
|
||||
* Server-side equivalent of `listMissions({ dueBefore: now })` in the
|
||||
* webapp store.
|
||||
* Return the distinct user_ids that have ever written an aiMissions row.
|
||||
*
|
||||
* This is the ONE query that needs to see across users. It runs WITHOUT
|
||||
* `withUser`, so the DB role hosting mana-ai either:
|
||||
* - has BYPASSRLS (simplest — ops choice), or
|
||||
* - owns sync_changes and the FORCE RLS policy excludes owner (default
|
||||
* Postgres semantics; requires dropping `FORCE ROW LEVEL SECURITY`)
|
||||
*
|
||||
* The per-user read paths below scope through RLS normally, so a leaky
|
||||
* user_ids discovery is the only cross-user surface this service exposes.
|
||||
*/
|
||||
export async function listMissionUsers(sql: Sql): Promise<string[]> {
|
||||
const rows = await sql<{ user_id: string }[]>`
|
||||
SELECT DISTINCT user_id
|
||||
FROM sync_changes
|
||||
WHERE app_id = 'ai' AND table_name = 'aiMissions'
|
||||
`;
|
||||
return rows.map((r) => r.user_id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return active missions for a single user whose `nextRunAt` has passed.
|
||||
* RLS-scoped via `withUser` — defense-in-depth against a query wandering
|
||||
* outside its user.
|
||||
*/
|
||||
async function listDueMissionsForUser(
|
||||
sql: Sql,
|
||||
userId: string,
|
||||
now: string
|
||||
): Promise<ServerMission[]> {
|
||||
const rows = await withUser(
|
||||
sql,
|
||||
userId,
|
||||
async (tx) =>
|
||||
tx<ChangeRow[]>`
|
||||
SELECT table_name, record_id, user_id, op, data, field_timestamps, created_at
|
||||
FROM sync_changes
|
||||
WHERE app_id = 'ai' AND table_name = 'aiMissions' AND user_id = ${userId}
|
||||
ORDER BY created_at ASC
|
||||
`
|
||||
);
|
||||
return mergeAndFilter(rows as ChangeRow[], userId, now);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all currently-active missions whose `nextRunAt` has passed,
|
||||
* across every user. Two-phase: discover users (cross-user), then
|
||||
* RLS-scope per user.
|
||||
*
|
||||
* @param now ISO timestamp used as the due-before cutoff.
|
||||
*/
|
||||
export async function listDueMissions(sql: Sql, now: string): Promise<ServerMission[]> {
|
||||
// Pull every event for the ai app across users. For a real deploy
|
||||
// we'd scope per-user or shard; the pre-launch user count makes this
|
||||
// single scan defensible.
|
||||
const rows = await sql<ChangeRow[]>`
|
||||
SELECT table_name, record_id, user_id, op, data, field_timestamps, created_at
|
||||
FROM sync_changes
|
||||
WHERE app_id = 'ai' AND table_name = 'aiMissions'
|
||||
ORDER BY created_at ASC
|
||||
`;
|
||||
const users = await listMissionUsers(sql);
|
||||
const perUser = await Promise.all(users.map((u) => listDueMissionsForUser(sql, u, now)));
|
||||
return perUser.flat();
|
||||
}
|
||||
|
||||
// Replay per record. Map key: userId::recordId (user isolation is kept
|
||||
// even though we fetched across users in one scan — the result goes
|
||||
// back to whichever user owns each row).
|
||||
const merged = new Map<string, { userId: string; record: Record<string, unknown> }>();
|
||||
/**
|
||||
* Merge `sync_changes` rows for ONE user's aiMissions set via field-level
|
||||
* LWW, then filter down to due + active records.
|
||||
*
|
||||
* Pure function — no DB access, no ambient state. Exported for tests.
|
||||
*/
|
||||
export function mergeAndFilter(
|
||||
rows: readonly ChangeRow[],
|
||||
userId: string,
|
||||
now: string
|
||||
): ServerMission[] {
|
||||
const merged = new Map<string, Record<string, unknown>>();
|
||||
|
||||
for (const row of rows) {
|
||||
const key = `${row.user_id}::${row.record_id}`;
|
||||
const entry = merged.get(key);
|
||||
const existing = merged.get(row.record_id);
|
||||
|
||||
if (row.op === 'delete') {
|
||||
merged.delete(key);
|
||||
merged.delete(row.record_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!entry) {
|
||||
if (!existing) {
|
||||
if (row.data) {
|
||||
merged.set(key, { userId: row.user_id, record: { id: row.record_id, ...row.data } });
|
||||
merged.set(row.record_id, { id: row.record_id, ...row.data });
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Field-level LWW: newer timestamps overwrite.
|
||||
const prevFT = (entry.record.__fieldTimestamps as Record<string, string> | undefined) ?? {};
|
||||
const prevFT = (existing.__fieldTimestamps as Record<string, string> | undefined) ?? {};
|
||||
const nextFT = { ...prevFT };
|
||||
if (row.data) {
|
||||
for (const [k, v] of Object.entries(row.data)) {
|
||||
const serverTime = row.field_timestamps?.[k] ?? row.created_at.toISOString();
|
||||
const localTime = prevFT[k] ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
entry.record[k] = v;
|
||||
existing[k] = v;
|
||||
nextFT[k] = serverTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
entry.record.__fieldTimestamps = nextFT;
|
||||
existing.__fieldTimestamps = nextFT;
|
||||
}
|
||||
|
||||
const missions: ServerMission[] = [];
|
||||
for (const { userId, record } of merged.values()) {
|
||||
for (const record of merged.values()) {
|
||||
const state = record.state as ServerMission['state'];
|
||||
const nextRunAt = record.nextRunAt as string | undefined;
|
||||
const deletedAt = record.deletedAt as string | undefined;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue