managarten/services/mana-research/src/routes/internal-research.ts
Till JS f10a95e842 feat(mana-research): add Gemini 3.1 Pro Deep Research async providers
- New providers gemini-deep-research + gemini-deep-research-max on the
  Interactions API (preview-04-2026). Submit/poll split, tier parameter
  selects between standard (~minutes, $1–3) and max (up to 60 min, $3–7).
- Parser matches the real response shape: flat `outputs` array of
  thought|text|image items, url_citation annotations without title,
  `usage.total_input_tokens` / `total_output_tokens`.
- Route generalisation: /v1/research/async accepts `provider` with
  default 'openai-deep-research' (backward compatible) and dispatches
  to the right submit/poll pair.
- New internal service-to-service endpoint /v1/internal/research/async
  gated by X-Service-Key + X-User-Id for credit accounting. Enables
  mana-ai to drive deep-research jobs on the mission owner's wallet
  without requiring a user JWT.
- Pricing: 300 credits (standard) / 1500 credits (max). Conservative
  markup over the ~$3/$7 ceiling so the first runs can't surprise us.
- Docs: AGENT_PROVIDER_IDS + pricing + env map + auto-router stay in
  sync; CLAUDE.md Phase 3b now current; API_KEYS.md references the
  new providers under GOOGLE_GENAI_API_KEY.

Verified with a real smoke test against the Gemini API: submit + poll
both succeed, completed response parsed cleanly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:55:30 +02:00

242 lines
7.5 KiB
TypeScript

/**
* Internal service-to-service research routes.
*
* POST /api/v1/internal/research/async — submit an async research job for a user
* GET /api/v1/internal/research/async/:id — poll a job (scoped to the X-User-Id header)
*
* Callers (mana-ai today) authenticate via `X-Service-Key` and pass the
* target user id in `X-User-Id`. Credits are reserved against that user
* exactly like the user-facing path; the difference is only in how the
* caller is authorised.
*
* Keep the logic here a thin wrapper over the same submit/poll helpers
* the user-facing /async route uses — divergence would be silent and
* surprising for anyone debugging a mana-ai request against the user API
* later.
*/
import { Hono } from 'hono';
import { z } from 'zod';
import { agentOptionsSchema } from '@mana/shared-research';
import type { HonoEnv } from '../lib/hono-env';
import type { AsyncJobStorage } from '../storage/async-jobs';
import type { CreditsClient } from '../clients/mana-credits';
import { BadRequestError, NotFoundError, UnauthorizedError } from '../lib/errors';
import type { Config } from '../config';
import { priceFor } from '../lib/pricing';
import { pollDeepResearch, submitDeepResearch } from '../providers/agent/openai-deep-research';
import {
pollGeminiDeepResearch,
submitGeminiDeepResearch,
} from '../providers/agent/gemini-deep-research';
/**
* Provider ids this module can run as submit/poll jobs. The tuple is the
* single source for both the request-body enum and the runtime check applied
* to provider ids read back from stored jobs.
*/
const ASYNC_PROVIDER_IDS = [
'openai-deep-research',
'gemini-deep-research',
'gemini-deep-research-max',
] as const;
/** Union of the literal ids listed in `ASYNC_PROVIDER_IDS`. */
type AsyncProviderId = (typeof ASYNC_PROVIDER_IDS)[number];
/**
* Body schema for POST /async.
*
* - `query`: non-empty string of at most 4000 characters.
* - `provider`: required; must be one of `ASYNC_PROVIDER_IDS` (no default is
*   applied by this schema).
* - `options`: optional; validated by the shared `agentOptionsSchema`.
*/
const submitBodySchema = z.object({
query: z.string().min(1).max(4000),
provider: z.enum(ASYNC_PROVIDER_IDS),
options: agentOptionsSchema.optional(),
});
/**
* Everything the routes need to drive one async provider: the configured API
* key (if any), the message to surface when that key is missing, and the
* provider's submit/poll pair behind a uniform signature.
*/
interface AsyncDispatch {
/** Configured API key for the provider, or `null` when none is set. */
apiKey: string | null;
/** Human-readable explanation used when `apiKey` is `null`. */
missingKeyMessage: string;
/**
* Starts a job at the provider.
* Resolves with the provider-side id and the job's initial status.
*/
submit(
query: string,
options: { systemPrompt?: string; maxTokens?: number; model?: string },
apiKey: string,
signal?: AbortSignal
): Promise<{ externalId: string; status: 'queued' | 'running' }>;
/**
* Fetches the current state of a previously submitted job.
* `answer` and `error` are optional on the result, so callers must not
* assume `answer` is present just because `status` is 'completed'.
*/
poll(
externalId: string,
apiKey: string,
signal?: AbortSignal
): Promise<{
status: 'queued' | 'running' | 'completed' | 'failed';
answer?: import('@mana/shared-research').AgentAnswer;
error?: string;
}>;
}
/**
 * Resolves the API key, missing-key message and submit/poll pair for an
 * async provider.
 *
 * The two Gemini ids share the same key and helpers and differ only in the
 * tier ('standard' vs 'max') passed as the first argument.
 *
 * @param providerId - One of `ASYNC_PROVIDER_IDS`.
 * @param config - Service config; only `providerKeys` is read.
 * @returns The dispatch record for the provider. `apiKey` is `null` when the
 *   corresponding key is not configured.
 * @throws Error if `providerId` is not a known async provider. This can only
 *   happen when a caller bypasses the type system (e.g. an unchecked cast of a
 *   stored value); without the guard the function would fall off the switch
 *   and return `undefined`.
 */
function dispatchAsync(providerId: AsyncProviderId, config: Config): AsyncDispatch {
  switch (providerId) {
    case 'openai-deep-research':
      return {
        apiKey: config.providerKeys.openai ?? null,
        missingKeyMessage: 'openai-deep-research requires OPENAI_API_KEY',
        submit: (q, o, k, s) => submitDeepResearch(q, o, k, s),
        poll: (id, k, s) => pollDeepResearch(id, k, s),
      };
    case 'gemini-deep-research':
      return {
        apiKey: config.providerKeys.googleGenai ?? null,
        missingKeyMessage: 'gemini-deep-research requires GOOGLE_GENAI_API_KEY',
        submit: (q, o, k, s) => submitGeminiDeepResearch('standard', q, o, k, s),
        poll: (id, k, s) => pollGeminiDeepResearch('standard', id, k, s),
      };
    case 'gemini-deep-research-max':
      return {
        apiKey: config.providerKeys.googleGenai ?? null,
        missingKeyMessage: 'gemini-deep-research-max requires GOOGLE_GENAI_API_KEY',
        submit: (q, o, k, s) => submitGeminiDeepResearch('max', q, o, k, s),
        poll: (id, k, s) => pollGeminiDeepResearch('max', id, k, s),
      };
    default: {
      // Compile-time exhaustiveness check: adding an id to ASYNC_PROVIDER_IDS
      // without a case above makes this assignment a type error.
      const unhandled: never = providerId;
      throw new Error(`Unsupported async provider: ${String(unhandled)}`);
    }
  }
}
/**
 * Returns the caller-supplied user id, rejecting requests that omit it.
 *
 * @param userId - Raw value of the `X-User-Id` header (undefined when absent).
 * @returns The same id, now typed as a definite string.
 * @throws UnauthorizedError when the header is missing or empty.
 */
function requireUserId(userId: string | undefined): string {
  if (userId) return userId;
  throw new UnauthorizedError('X-User-Id header is required on internal research routes');
}
/** Type guard: true when `value` is one of the provider ids this module can dispatch. */
function isAsyncProviderId(value: unknown): value is AsyncProviderId {
  return typeof value === 'string' && (ASYNC_PROVIDER_IDS as readonly string[]).includes(value);
}

/** Turns an arbitrary thrown value into a message suitable for storing on a job. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Builds the internal (service-to-service) async research router.
 *
 * Routes:
 * - `POST /async` — reserves credits for the user named in `X-User-Id`,
 *   submits the job to the requested provider and stores it. If submission or
 *   storage fails the reservation is refunded and the original error rethrown.
 * - `GET /async/:id` — returns the stored job if it is already terminal;
 *   otherwise polls the provider, persists the new state and settles credits
 *   (commit on success, refund on failure).
 *
 * @param config - Service config; used to resolve provider API keys.
 * @param asyncStorage - Persistence for async jobs (create / get / updateStatus).
 * @param credits - Credits client used to reserve, commit and refund.
 * @returns A Hono app exposing the two routes above.
 * @throws UnauthorizedError (via `requireUserId`) when `X-User-Id` is missing.
 * @throws BadRequestError when the provider key is not configured on submit,
 *   when a job has no external id yet, or when a stored provider id is unknown.
 * @throws NotFoundError when the job does not exist for the given user.
 */
export function createInternalResearchRoutes(
  config: Config,
  asyncStorage: AsyncJobStorage,
  credits: CreditsClient
) {
  return new Hono<HonoEnv>()
    .post('/async', async (c) => {
      const userId = requireUserId(c.req.header('X-User-Id'));
      const body = submitBodySchema.parse(await c.req.json());
      const providerId = body.provider;
      const dispatch = dispatchAsync(providerId, config);
      // Reject before touching the wallet: no key means the job can never run.
      const apiKey = dispatch.apiKey;
      if (!apiKey) throw new BadRequestError(dispatch.missingKeyMessage);

      const price = priceFor(providerId, 'research');
      const reservation = await credits.reserve(
        userId,
        price,
        `research:${providerId}:internal-submit`
      );
      try {
        const submission = await dispatch.submit(body.query, body.options ?? {}, apiKey);
        const job = await asyncStorage.create({
          userId,
          providerId,
          externalId: submission.externalId,
          status: submission.status,
          query: body.query,
          options: body.options ?? {},
          reservationId: reservation.reservationId,
          costCredits: price,
        });
        return c.json({
          taskId: job.id,
          status: job.status,
          providerId,
          costCredits: price,
        });
      } catch (err) {
        // Best-effort refund: never let a refund failure mask the original
        // error, but do leave a trace so a stuck reservation can be found.
        await credits
          .refund(reservation.reservationId)
          .catch((refundErr: unknown) => console.warn('[internal] refund failed:', refundErr));
        throw err;
      }
    })
    .get('/async/:id', async (c) => {
      const userId = requireUserId(c.req.header('X-User-Id'));
      const job = await asyncStorage.get(c.req.param('id'), userId);
      if (!job) throw new NotFoundError('Task not found');

      // Terminal jobs are served straight from storage; credits were already
      // settled when the job reached that state.
      if (job.status === 'completed' || job.status === 'failed' || job.status === 'cancelled') {
        return c.json({
          taskId: job.id,
          status: job.status,
          query: job.query,
          providerId: job.providerId,
          costCredits: job.costCredits,
          createdAt: job.createdAt,
          updatedAt: job.updatedAt,
          result: job.result,
          error: job.errorMessage,
        });
      }

      if (!job.externalId) throw new BadRequestError('Task has no external id yet');

      // The provider id comes from storage, so validate it before dispatching
      // instead of trusting a cast.
      const storedProviderId = job.providerId;
      if (!isAsyncProviderId(storedProviderId)) {
        throw new BadRequestError(`Unknown async provider on job: ${job.providerId}`);
      }
      const dispatch = dispatchAsync(storedProviderId, config);
      if (!dispatch.apiKey) {
        // Key was removed after submit: report it, but leave the job and its
        // reservation untouched so polling can resume once the key is back.
        return c.json({
          taskId: job.id,
          status: job.status,
          query: job.query,
          providerId: job.providerId,
          costCredits: job.costCredits,
          createdAt: job.createdAt,
          updatedAt: job.updatedAt,
          error: `${dispatch.missingKeyMessage}; cannot poll`,
        });
      }

      // A throwing poll is folded into a 'failed' result so it takes the same
      // persist-and-refund path as a provider-reported failure.
      const poll = await dispatch.poll(job.externalId, dispatch.apiKey).catch((err: unknown) => ({
        status: 'failed' as const,
        error: describeError(err),
      }));

      if (poll.status === 'completed' && poll.answer) {
        const answer = { ...poll.answer, query: job.query };
        await asyncStorage.updateStatus(job.id, { status: 'completed', result: { answer } });
        if (job.reservationId) {
          await credits
            .commit(job.reservationId, `research ${job.providerId}`)
            .catch((err: unknown) => console.warn('[internal] commit failed:', err));
        }
        return c.json({
          taskId: job.id,
          status: 'completed',
          query: job.query,
          providerId: job.providerId,
          costCredits: job.costCredits,
          createdAt: job.createdAt,
          updatedAt: new Date(),
          result: { answer },
        });
      }

      if (poll.status === 'failed' || poll.status === 'completed') {
        // 'completed' can only reach this point without an answer. Treat that
        // as a failure: otherwise the job would be stored as completed with no
        // result and its reservation would never be committed or refunded.
        const reason =
          poll.status === 'failed'
            ? (poll.error ?? 'unknown')
            : 'Provider reported completion without an answer';
        await asyncStorage.updateStatus(job.id, {
          status: 'failed',
          errorMessage: reason,
        });
        if (job.reservationId) {
          await credits
            .refund(job.reservationId)
            .catch((err: unknown) => console.warn('[internal] refund failed:', err));
        }
        return c.json({
          taskId: job.id,
          status: 'failed',
          query: job.query,
          providerId: job.providerId,
          // The reservation is refunded, so the caller is charged nothing.
          costCredits: 0,
          createdAt: job.createdAt,
          updatedAt: new Date(),
          // Same reason that was persisted, so this response matches later polls.
          error: reason,
        });
      }

      // Still queued/running: persist a status transition if there was one.
      if (poll.status !== job.status) {
        await asyncStorage.updateStatus(job.id, { status: poll.status });
      }
      return c.json({
        taskId: job.id,
        status: poll.status,
        query: job.query,
        providerId: job.providerId,
        costCredits: job.costCredits,
        createdAt: job.createdAt,
        updatedAt: new Date(),
      });
    });
}