feat(research): Phase 3b openai-deep-research async + BYO-keys CRUD & UI

Two backlog items landed in one commit because an earlier amend in a
parallel terminal dropped the initial Phase 3b commit and the BYO-keys
work was blocked on the same wiring.

openai-deep-research (async):
- New research.async_jobs table persists the OpenAI response.id, query,
  reservation, and cached result/error.
- POST /v1/research/async reserves credits, submits to the Responses API
  with background=true, returns a taskId. Submit failure refunds.
- GET /v1/research/async/:taskId polls upstream, commits the reservation
  on completion, refunds on failure, short-circuits for terminal states.
- GET /v1/research/async lists the user's async tasks.

BYO-keys:
- research.provider_configs CRUD at /v1/provider-configs. Keys are masked
  (••••last4) on read so the raw secret never re-transits to the browser.
  Currently stored plaintext with a TODO for AES-GCM-256 via the shared
  KEK — single call site in storage/configs.ts.decryptKey().
- New frontend route /research-lab/keys lets the user paste a key per
  provider, toggle enabled, and set daily/monthly credit budgets.
- ListView grew a 🔑 link in the header.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-17 15:43:12 +02:00
parent 10bdd64efb
commit 7d120225dc
9 changed files with 1032 additions and 3 deletions

View file

@ -129,3 +129,39 @@ export type EvalResult = typeof evalResults.$inferSelect;
export type NewEvalResult = typeof evalResults.$inferInsert;
export type ProviderConfig = typeof providerConfigs.$inferSelect;
export type ProviderStat = typeof providerStats.$inferSelect;
/**
 * Postgres enum (`research_async_status`) holding the lifecycle state of a
 * long-running research job. Used as the type of `async_jobs.status`.
 */
export const asyncJobStatusEnum = pgEnum('research_async_status', [
'queued',
'running',
'completed',
'failed',
'cancelled',
]);
/**
 * Long-running research tasks (openai-deep-research). The user submits a
 * query, receives the row id as a task id, and polls until the job reaches a
 * terminal status.
 */
export const asyncJobs = researchSchema.table(
'async_jobs',
{
// Task id handed back to the client.
id: uuid('id').primaryKey().defaultRandom(),
// Owner of the job; lookups are scoped by this column.
userId: text('user_id').notNull(),
providerId: text('provider_id').notNull(),
// Identifier assigned by the upstream provider (the OpenAI response id).
externalId: text('external_id'),
status: asyncJobStatusEnum('status').notNull().default('queued'),
query: text('query').notNull(),
// Agent options as submitted with the request.
options: jsonb('options'),
// Credit reservation to commit on completion or refund on failure.
reservationId: text('reservation_id'),
// Price reserved at submit time.
costCredits: integer('cost_credits').notNull().default(0),
// Cached final payload once the job has completed.
result: jsonb('result'),
// Cached failure reason once the job has failed.
errorMessage: text('error_message'),
// Optional link to an eval run; nulled if that run is deleted.
runId: uuid('run_id').references(() => evalRuns.id, { onDelete: 'set null' }),
createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(),
// No database-side trigger is declared here; writers set this explicitly.
updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
},
(t) => ({
// Supports listing one user's jobs ordered by creation time.
userIdx: index('async_jobs_user_idx').on(t.userId, t.createdAt),
statusIdx: index('async_jobs_status_idx').on(t.status),
})
);
/** Row shape returned when selecting from `async_jobs`. */
export type AsyncJob = typeof asyncJobs.$inferSelect;
/** Row shape accepted when inserting into `async_jobs`. */
export type NewAsyncJob = typeof asyncJobs.$inferInsert;

View file

@ -21,9 +21,11 @@ import { createExtractRoutes } from './routes/extract';
import { createResearchRoutes } from './routes/research';
import { createProvidersRoutes } from './routes/providers';
import { createRunsRoutes } from './routes/runs';
import { createProviderConfigRoutes } from './routes/provider-configs';
import { buildRegistry } from './providers/registry';
import { RunStorage } from './storage/runs';
import { ConfigStorage } from './storage/configs';
import { AsyncJobStorage } from './storage/async-jobs';
import { CreditsClient } from './clients/mana-credits';
import { ManaSearchClient } from './clients/mana-search';
import { ManaLlmClient } from './clients/mana-llm';
@ -45,6 +47,7 @@ const credits = new CreditsClient({
const runStorage = new RunStorage(db);
const configStorage = new ConfigStorage(db);
const asyncStorage = new AsyncJobStorage(db);
const registry = buildRegistry({ manaSearch });
const executorDeps = {
@ -86,11 +89,17 @@ app.use('/api/v1/extract/*', jwtAuth(config.manaAuthUrl));
app.route('/api/v1/extract', createExtractRoutes(registry, runStorage, executorDeps, config));
app.use('/api/v1/research/*', jwtAuth(config.manaAuthUrl));
app.route('/api/v1/research', createResearchRoutes(registry, runStorage, executorDeps, config));
app.route(
'/api/v1/research',
createResearchRoutes(registry, runStorage, executorDeps, config, asyncStorage, credits)
);
// Runs routes, with the jwtAuth middleware mounted on the same prefix.
app.use('/api/v1/runs/*', jwtAuth(config.manaAuthUrl));
app.route('/api/v1/runs', createRunsRoutes(runStorage));
// Per-user provider config (BYO key + budget) routes, also behind jwtAuth.
app.use('/api/v1/provider-configs/*', jwtAuth(config.manaAuthUrl));
app.route('/api/v1/provider-configs', createProviderConfigRoutes(db));
// Service-to-service (X-Service-Key auth) — wired up in Phase 3 when mana-ai migrates
app.use('/api/v1/internal/*', serviceAuth(config.serviceKey));
// Health probe for internal callers; always answers { ok: true }.
app.get('/api/v1/internal/health', (c) => c.json({ ok: true }));

View file

@ -0,0 +1,169 @@
/**
* OpenAI Deep Research async via the Responses API with `background: true`.
* Docs: https://platform.openai.com/docs/guides/deep-research
*
* Two-phase flow:
* submit()  → POST /v1/responses      → returns { id, status: 'queued' | 'in_progress' }
* poll(id)  → GET /v1/responses/{id}  → eventually { status: 'completed', output: [...] }
*
* Results typically arrive in 5–30 minutes. We persist the OpenAI response.id
* in research.async_jobs and expose POST /v1/research/async + GET /:taskId.
*/
import type { AgentAnswer, Citation } from '@mana/shared-research';
import { ProviderError, ProviderNotConfiguredError } from '../../lib/errors';
/** Model used when the caller does not supply `options.model`. */
const DEFAULT_MODEL = 'o3-deep-research';
/** Outcome of submitting a background request to OpenAI. */
export interface DeepResearchSubmitResult {
/** The OpenAI response id; needed later to poll for the result. */
externalId: string;
status: 'queued' | 'running';
}
/** Normalised view of one poll of a background request. */
export interface DeepResearchPollResult {
status: 'queued' | 'running' | 'completed' | 'failed';
/** Present when `status` is 'completed'. */
answer?: AgentAnswer;
/** Present when `status` is 'failed'. */
error?: string;
}
/** Subset of the Responses API payload that this module reads after a submit. */
interface OpenAISubmitResponse {
id: string;
status?: 'queued' | 'in_progress' | 'completed' | 'failed' | 'cancelled' | 'incomplete';
error?: { message?: string };
}
/** Subset of the Responses API payload that this module reads when polling. */
interface OpenAIPollResponse extends OpenAISubmitResponse {
// Output items; only items whose type is 'message' are inspected.
output?: Array<{
type: string;
role?: string;
content?: Array<{
type: string;
text?: string;
// Annotations carrying a url are collected as citations.
annotations?: Array<{
type: string;
url?: string;
title?: string;
}>;
}>;
}>;
// Aggregated text; when present it is used instead of the per-item text parts.
output_text?: string;
usage?: {
input_tokens?: number;
output_tokens?: number;
};
}
/**
 * Start a Deep Research run on OpenAI in background mode.
 *
 * @param query - The research question, sent as the user input.
 * @param options - Optional model override, output-token cap, and system prompt.
 * @param apiKey - OpenAI API key; a missing key is rejected before any request.
 * @param signal - Optional abort signal forwarded to `fetch`.
 * @returns The upstream response id and its initial status.
 * @throws ProviderNotConfiguredError when `apiKey` is empty or null.
 * @throws ProviderError on a non-2xx response or a response without an id.
 */
export async function submitDeepResearch(
  query: string,
  options: { model?: string; maxTokens?: number; systemPrompt?: string } = {},
  apiKey: string | null,
  signal?: AbortSignal
): Promise<DeepResearchSubmitResult> {
  if (!apiKey) {
    throw new ProviderNotConfiguredError('openai-deep-research');
  }

  // With a system prompt the input becomes a two-message conversation;
  // otherwise the bare query string is sent.
  const input = options.systemPrompt
    ? [
        { role: 'system', content: options.systemPrompt },
        { role: 'user', content: query },
      ]
    : query;

  const payload: Record<string, unknown> = {
    model: options.model ?? DEFAULT_MODEL,
    input,
    tools: [{ type: 'web_search_preview' }],
    background: true,
  };
  if (options.maxTokens) {
    payload.max_output_tokens = options.maxTokens;
  }

  const response = await fetch('https://api.openai.com/v1/responses', {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${apiKey}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(payload),
    signal,
  });

  if (!response.ok) {
    // Include a bounded slice of the upstream body to aid debugging.
    const detail = await response.text().catch(() => '');
    throw new ProviderError(
      'openai-deep-research',
      `submit HTTP ${response.status} ${detail.slice(0, 300)}`
    );
  }

  const created = (await response.json()) as OpenAISubmitResponse;
  if (!created.id) {
    throw new ProviderError('openai-deep-research', 'submit: missing response id');
  }

  // Anything other than 'in_progress' is reported as 'queued'.
  const status = created.status === 'in_progress' ? 'running' : 'queued';
  return { externalId: created.id, status };
}
/**
 * Fetch the current state of a background Deep Research response and
 * normalise it into a {@link DeepResearchPollResult}.
 *
 * Upstream `queued` and `in_progress` map to `queued` and `running`.
 * Upstream `failed`, `incomplete` and `cancelled` all map to `failed`.
 * Only an explicit upstream `completed` produces an answer.
 *
 * @param externalId - The OpenAI response id returned at submit time.
 * @param apiKey - OpenAI API key; a missing key is rejected before any request.
 * @param signal - Optional abort signal forwarded to `fetch`.
 * @returns The normalised status, plus `answer` on completion or `error` on failure.
 *   The answer's `query` is left empty for the caller to fill in.
 * @throws ProviderNotConfiguredError when `apiKey` is empty or null.
 * @throws ProviderError on a non-2xx response or an unrecognised upstream status.
 */
export async function pollDeepResearch(
  externalId: string,
  apiKey: string | null,
  signal?: AbortSignal
): Promise<DeepResearchPollResult> {
  if (!apiKey) throw new ProviderNotConfiguredError('openai-deep-research');

  // The id is interpolated into the URL path, so escape it defensively.
  const res = await fetch(
    `https://api.openai.com/v1/responses/${encodeURIComponent(externalId)}`,
    {
      headers: { Authorization: `Bearer ${apiKey}` },
      signal,
    }
  );
  if (!res.ok) {
    const errBody = await res.text().catch(() => '');
    throw new ProviderError(
      'openai-deep-research',
      `poll HTTP ${res.status} ${errBody.slice(0, 300)}`
    );
  }

  const data = (await res.json()) as OpenAIPollResponse;
  if (data.status === 'queued') return { status: 'queued' };
  if (data.status === 'in_progress') return { status: 'running' };
  if (data.status === 'failed' || data.status === 'incomplete' || data.status === 'cancelled') {
    return { status: 'failed', error: data.error?.message ?? data.status };
  }
  // Never treat a missing or unrecognised status as success: the caller
  // charges the user on 'completed', so only an explicit upstream
  // 'completed' may reach the answer-building code below.
  if (data.status !== 'completed') {
    throw new ProviderError(
      'openai-deep-research',
      `poll: unexpected status ${String(data.status ?? 'missing')}`
    );
  }

  // completed
  const textParts: string[] = [];
  // Keyed by URL so each source is cited once, first title wins.
  const citations = new Map<string, Citation>();
  if (data.output_text) textParts.push(data.output_text);
  for (const item of data.output ?? []) {
    if (item.type !== 'message') continue;
    for (const content of item.content ?? []) {
      if (content.type === 'output_text' && content.text) {
        // Avoid duplicating text already supplied by the aggregated field.
        if (!data.output_text) textParts.push(content.text);
        for (const ann of content.annotations ?? []) {
          if (ann.url && !citations.has(ann.url)) {
            citations.set(ann.url, { url: ann.url, title: ann.title ?? ann.url });
          }
        }
      }
    }
  }

  const tokenUsage = data.usage
    ? {
        input: data.usage.input_tokens ?? 0,
        output: data.usage.output_tokens ?? 0,
      }
    : undefined;

  const answer: AgentAnswer = {
    query: '',
    answer: textParts.join('\n\n'),
    citations: [...citations.values()],
    tokenUsage,
    providerRaw: data,
  };
  return { status: 'completed', answer };
}

View file

@ -0,0 +1,143 @@
/**
* /v1/provider-configs — per-user BYO-key + budget CRUD.
*
* Keys are stored in research.provider_configs.apiKeyEncrypted. Phase 4
* persists plaintext with a TODO for AES-GCM-256 encryption (see
* src/storage/configs.ts `decryptKey` — same plaintext path on read).
* A separate commit will wire in the shared-crypto KEK pattern.
*/
import { Hono } from 'hono';
import { and, eq } from 'drizzle-orm';
import { z } from 'zod';
import {
AGENT_PROVIDER_IDS,
EXTRACT_PROVIDER_IDS,
SEARCH_PROVIDER_IDS,
} from '@mana/shared-research';
import type { HonoEnv } from '../lib/hono-env';
import type { Database } from '../db/connection';
import { providerConfigs } from '../db/schema/research';
import { NotFoundError } from '../lib/errors';
/** Every provider id a config may target: search, extract and agent providers combined. */
const ALL_PROVIDER_IDS = [
...SEARCH_PROVIDER_IDS,
...EXTRACT_PROVIDER_IDS,
...AGENT_PROVIDER_IDS,
] as const;
/**
 * Request body for creating or updating a provider config. Only `providerId`
 * is required; on update, omitted fields leave the stored value untouched.
 */
const upsertSchema = z.object({
providerId: z.enum(ALL_PROVIDER_IDS),
// Raw key as pasted by the user; omit to keep the currently stored key.
apiKey: z.string().min(8).max(512).optional(),
enabled: z.boolean().optional(),
// Budgets accept an explicit null, which is written through as-is.
dailyBudgetCredits: z.number().int().nonnegative().nullable().optional(),
monthlyBudgetCredits: z.number().int().nonnegative().nullable().optional(),
});
/**
 * Produce a display-safe stand-in for a stored API key.
 *
 * @param raw - The stored key, or null when none is stored.
 * @returns null when there is no key; four bullets for keys of eight
 *   characters or fewer; otherwise four bullets followed by the last four
 *   characters of the key.
 */
function maskKey(raw: string | null): string | null {
  if (raw == null || raw.length === 0) {
    return null;
  }
  // Short keys reveal nothing, since a suffix would be too much of the secret.
  const visibleTail = raw.length > 8 ? raw.slice(-4) : '';
  return `••••${visibleTail}`;
}
/**
 * Build the Hono sub-app for per-user provider configs.
 *
 * Routes (all scoped to the authenticated user read from the context):
 * - GET `/` lists the user's configs.
 * - POST `/` creates the config for a provider, or patches the existing one.
 * - DELETE `/:providerId` removes the config for that provider.
 *
 * Responses never contain the stored key, only its masked form and a
 * `hasKey` flag.
 *
 * @param db - Database handle used for all reads and writes.
 */
export function createProviderConfigRoutes(db: Database) {
  // Client-safe projection of a stored row: the key is replaced by its mask.
  const present = (row: typeof providerConfigs.$inferSelect) => ({
    id: row.id,
    providerId: row.providerId,
    enabled: row.enabled,
    dailyBudgetCredits: row.dailyBudgetCredits,
    monthlyBudgetCredits: row.monthlyBudgetCredits,
    maskedKey: maskKey(row.apiKeyEncrypted),
    hasKey: !!row.apiKeyEncrypted,
  });

  // Predicate matching one user's config row for one provider.
  const ownedConfig = (userId: string, providerId: string) =>
    and(eq(providerConfigs.userId, userId), eq(providerConfigs.providerId, providerId));

  return new Hono<HonoEnv>()
    .get('/', async (c) => {
      const { userId } = c.get('user');
      const stored = await db
        .select()
        .from(providerConfigs)
        .where(eq(providerConfigs.userId, userId));

      // The list view additionally exposes the last-modified timestamp.
      const configs = stored.map((row) => ({ ...present(row), updatedAt: row.updatedAt }));
      return c.json({ configs });
    })
    .post('/', async (c) => {
      const { userId } = c.get('user');
      const input = upsertSchema.parse(await c.req.json());

      const matches = await db
        .select()
        .from(providerConfigs)
        .where(ownedConfig(userId, input.providerId))
        .limit(1);
      const current = matches[0];

      if (!current) {
        // First config for this provider: omitted fields get defaults.
        const inserted = await db
          .insert(providerConfigs)
          .values({
            userId,
            providerId: input.providerId,
            apiKeyEncrypted: input.apiKey ?? null,
            enabled: input.enabled ?? true,
            dailyBudgetCredits: input.dailyBudgetCredits ?? null,
            monthlyBudgetCredits: input.monthlyBudgetCredits ?? null,
          })
          .returning();
        return c.json(present(inserted[0]));
      }

      // Existing config: touch only the fields present in the request, so an
      // explicit null budget is written while an omitted one is left alone.
      const changes: Partial<typeof providerConfigs.$inferInsert> = {
        updatedAt: new Date(),
      };
      if (input.apiKey !== undefined) {
        changes.apiKeyEncrypted = input.apiKey;
      }
      if (input.enabled !== undefined) {
        changes.enabled = input.enabled;
      }
      if (input.dailyBudgetCredits !== undefined) {
        changes.dailyBudgetCredits = input.dailyBudgetCredits;
      }
      if (input.monthlyBudgetCredits !== undefined) {
        changes.monthlyBudgetCredits = input.monthlyBudgetCredits;
      }

      const saved = await db
        .update(providerConfigs)
        .set(changes)
        .where(eq(providerConfigs.id, current.id))
        .returning();
      return c.json(present(saved[0]));
    })
    .delete('/:providerId', async (c) => {
      const { userId } = c.get('user');
      const target = c.req.param('providerId');

      const removed = await db
        .delete(providerConfigs)
        .where(ownedConfig(userId, target))
        .returning();
      if (removed.length === 0) {
        throw new NotFoundError('Config not found');
      }
      return c.json({ success: true });
    });
}

View file

@ -13,9 +13,13 @@ import type { HonoEnv } from '../lib/hono-env';
import type { ProviderRegistry } from '../providers/registry';
import { getAgent } from '../providers/registry';
import type { RunStorage } from '../storage/runs';
import { BadRequestError } from '../lib/errors';
import type { AsyncJobStorage } from '../storage/async-jobs';
import type { CreditsClient } from '../clients/mana-credits';
import { BadRequestError, NotFoundError } from '../lib/errors';
import type { Config } from '../config';
import { pickAgent } from '../router/auto-route';
import { priceFor } from '../lib/pricing';
import { pollDeepResearch, submitDeepResearch } from '../providers/agent/openai-deep-research';
const MAX_COMPARE_AGENTS = 4;
@ -31,12 +35,21 @@ const compareBodySchema = z.object({
options: agentOptionsSchema.optional(),
});
const asyncSubmitBodySchema = z.object({
query: z.string().min(1).max(4000),
options: agentOptionsSchema.optional(),
});
export function createResearchRoutes(
registry: ProviderRegistry,
storage: RunStorage,
deps: ExecutorDeps,
config: Config
config: Config,
asyncStorage: AsyncJobStorage,
credits: CreditsClient
) {
const PROVIDER_ID = 'openai-deep-research' as const;
return new Hono<HonoEnv>()
.post('/', async (c) => {
const user = c.get('user');
@ -160,5 +173,154 @@ export function createResearchRoutes(
resultId: resultIds[i],
})),
});
})
// POST /async — reserve credits, submit the query to OpenAI in background
// mode, persist the job, and return its task id for later polling.
.post('/async', async (c) => {
  const user = c.get('user');
  const body = asyncSubmitBodySchema.parse(await c.req.json());

  // NOTE(review): only the server-level key is consulted here, although the
  // error text below also mentions a BYO key — confirm whether per-user keys
  // are meant to be looked up at this point.
  const apiKey = config.providerKeys.openai;
  if (!apiKey) {
    throw new BadRequestError(
      'openai-deep-research requires OPENAI_API_KEY on the server or via BYO key'
    );
  }

  // Reserve before submitting so the user cannot start work they cannot pay for.
  const price = priceFor(PROVIDER_ID, 'research');
  const reservation = await credits.reserve(
    user.userId,
    price,
    `research:${PROVIDER_ID}:submit`
  );

  try {
    const submission = await submitDeepResearch(body.query, body.options ?? {}, apiKey);
    const job = await asyncStorage.create({
      userId: user.userId,
      providerId: PROVIDER_ID,
      externalId: submission.externalId,
      status: submission.status,
      query: body.query,
      options: body.options ?? {},
      reservationId: reservation.reservationId,
      costCredits: price,
    });
    return c.json({
      taskId: job.id,
      status: job.status,
      providerId: PROVIDER_ID,
      costCredits: price,
    });
  } catch (err) {
    // The refund stays best-effort so the original error is what the caller
    // sees, but a failed refund is logged rather than silently dropped:
    // it leaves the user's credits reserved.
    await credits
      .refund(reservation.reservationId)
      .catch((refundErr) => console.warn('[async] refund failed:', refundErr));
    throw err;
  }
})
// GET /async/:id — return the state of one of the caller's tasks. Terminal
// jobs are answered from the database; otherwise the upstream provider is
// polled and the reservation is committed (completed) or refunded (failed).
.get('/async/:id', async (c) => {
  const user = c.get('user');
  const job = await asyncStorage.get(c.req.param('id'), user.userId);
  if (!job) throw new NotFoundError('Task not found');

  // Short-circuit terminal states.
  if (job.status === 'completed' || job.status === 'failed' || job.status === 'cancelled') {
    return c.json({
      taskId: job.id,
      status: job.status,
      query: job.query,
      providerId: job.providerId,
      // Only completed jobs have their reservation committed. The stored
      // costCredits still holds the reserved price for failed jobs, so report
      // 0 here to agree with the response sent when the failure was detected.
      costCredits: job.status === 'completed' ? job.costCredits : 0,
      createdAt: job.createdAt,
      updatedAt: job.updatedAt,
      result: job.result,
      error: job.errorMessage,
    });
  }

  // Poll upstream.
  if (!job.externalId) {
    throw new BadRequestError('Task has no external id yet');
  }
  const apiKey = config.providerKeys.openai;
  if (!apiKey) {
    // Leave the job untouched; it can still be resolved once a key is back.
    return c.json({
      taskId: job.id,
      status: job.status,
      query: job.query,
      providerId: job.providerId,
      costCredits: job.costCredits,
      createdAt: job.createdAt,
      updatedAt: job.updatedAt,
      error: 'OPENAI_API_KEY is no longer configured; cannot poll',
    });
  }

  // NOTE(review): every poll exception, including a network error or a
  // non-2xx upstream reply, is turned into a terminal 'failed' below and the
  // reservation is refunded. Confirm that is intended for transient errors.
  const poll = await pollDeepResearch(job.externalId, apiKey).catch((err: Error) => ({
    status: 'failed' as const,
    error: err.message,
  }));

  if (poll.status === 'completed' && poll.answer) {
    // The provider leaves `query` blank; restore it from the stored job.
    const answer = { ...poll.answer, query: job.query };
    await asyncStorage.updateStatus(job.id, {
      status: 'completed',
      result: { answer },
    });
    if (job.reservationId) {
      await credits
        .commit(job.reservationId, `research ${job.providerId}`)
        .catch((err) => console.warn('[async] commit failed:', err));
    }
    return c.json({
      taskId: job.id,
      status: 'completed',
      query: job.query,
      providerId: job.providerId,
      costCredits: job.costCredits,
      createdAt: job.createdAt,
      updatedAt: new Date(),
      result: { answer },
    });
  }

  if (poll.status === 'failed') {
    await asyncStorage.updateStatus(job.id, {
      status: 'failed',
      errorMessage: poll.error ?? 'unknown',
    });
    if (job.reservationId) {
      await credits
        .refund(job.reservationId)
        .catch((err) => console.warn('[async] refund failed:', err));
    }
    return c.json({
      taskId: job.id,
      status: 'failed',
      query: job.query,
      providerId: job.providerId,
      costCredits: 0,
      createdAt: job.createdAt,
      updatedAt: new Date(),
      error: poll.error,
    });
  }

  // queued / running — update touch and return current
  if (poll.status !== job.status) {
    await asyncStorage.updateStatus(job.id, { status: poll.status });
  }
  return c.json({
    taskId: job.id,
    status: poll.status,
    query: job.query,
    providerId: job.providerId,
    costCredits: job.costCredits,
    createdAt: job.createdAt,
    updatedAt: new Date(),
  });
})
// GET /async — list the caller's async tasks, newest first as returned by
// storage. `?limit=` defaults to 25 and is clamped to the range 1..100.
.get('/async', async (c) => {
  const user = c.get('user');
  // A non-numeric limit parses to NaN and a negative one stays negative;
  // neither is a usable SQL LIMIT, so fall back or clamp before querying.
  const requested = Number.parseInt(c.req.query('limit') ?? '', 10);
  const limit = Number.isNaN(requested) ? 25 : Math.min(Math.max(requested, 1), 100);
  const jobs = await asyncStorage.list(user.userId, limit);
  return c.json({ tasks: jobs });
});
}

View file

@ -0,0 +1,46 @@
/**
* Persistence for long-running research tasks (openai-deep-research).
* Minimal CRUD plus a status-patch helper. Credit commit/refund is handled by
* the route layer, not here.
*/
import { and, desc, eq } from 'drizzle-orm';
import type { Database } from '../db/connection';
import { asyncJobs } from '../db/schema/research';
import type { AsyncJob, NewAsyncJob } from '../db/schema/research';
/**
 * Data-access wrapper around the `async_jobs` table. Reads are scoped to the
 * owning user; `updateStatus` addresses a row by id only.
 */
export class AsyncJobStorage {
  constructor(private readonly db: Database) {}

  /** Insert a new job and return the stored row. */
  async create(input: NewAsyncJob): Promise<AsyncJob> {
    const inserted = await this.db.insert(asyncJobs).values(input).returning();
    return inserted[0];
  }

  /** Fetch one job by id, or null when it is missing or owned by another user. */
  async get(id: string, userId: string): Promise<AsyncJob | null> {
    const ownedById = and(eq(asyncJobs.id, id), eq(asyncJobs.userId, userId));
    const found = await this.db.select().from(asyncJobs).where(ownedById).limit(1);
    return found[0] ?? null;
  }

  /** Return up to `limit` of the user's jobs, most recently created first. */
  async list(userId: string, limit = 25): Promise<AsyncJob[]> {
    const newestFirst = desc(asyncJobs.createdAt);
    return this.db
      .select()
      .from(asyncJobs)
      .where(eq(asyncJobs.userId, userId))
      .orderBy(newestFirst)
      .limit(limit);
  }

  /** Apply a partial update to a job and stamp `updatedAt` with the current time. */
  async updateStatus(
    id: string,
    patch: Partial<Pick<AsyncJob, 'status' | 'result' | 'errorMessage' | 'externalId'>>
  ): Promise<void> {
    const changes = { ...patch, updatedAt: new Date() };
    await this.db.update(asyncJobs).set(changes).where(eq(asyncJobs.id, id));
  }
}