feat: add Ollama memory optimization, LLM metrics, and chat streaming

Three improvements to the unified LLM infrastructure:

1. Ollama memory optimization (scripts/mac-mini/configure-ollama.sh):
   - OLLAMA_KEEP_ALIVE=5m → models unload after 5min idle (saves 3-16GB RAM)
   - OLLAMA_NUM_PARALLEL=1 → predictable memory usage
   - OLLAMA_MAX_LOADED_MODELS=1 → max 1 model in RAM at a time

2. Request-level metrics in @manacore/shared-llm:
   - LlmRequestMetrics interface (model, latency, tokens, fallback detection)
   - LlmMetricsCollector class with summary stats (for health endpoints)
   - Optional onMetrics callback in LlmModuleOptions
   - Automatic metrics emission in chatMessages() (success + error)

3. Chat streaming (token-by-token SSE):
   - Backend: POST /chat/completions/stream SSE endpoint
   - OllamaService.createStreamingCompletion() via llm.chatStreamMessages()
   - ChatService.createStreamingCompletion() with upfront credit consumption
   - Web: chatApi.createStreamingCompletion() SSE consumer
   - Chat store: sendMessage() now streams tokens into assistant message
   - UI updates reactively as each token arrives

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-03-24 09:41:33 +01:00
parent ecda4535d8
commit 56ffcbac39
13 changed files with 462 additions and 29 deletions

View file

@ -1,4 +1,5 @@
import { Body, Controller, Get, Post, UseGuards } from '@nestjs/common';
import { Body, Controller, Get, Post, Res, UseGuards } from '@nestjs/common';
import type { Response } from 'express';
import { isOk } from '@manacore/shared-errors';
import { ChatService } from './chat.service';
import { ChatCompletionDto } from './dto/chat-completion.dto';
@ -24,9 +25,33 @@ export class ChatController {
const result = await this.chatService.createCompletion(dto, user.userId);
if (!isOk(result)) {
throw result.error; // Caught by AppExceptionFilter
throw result.error;
}
return result.value;
}
@Post('completions/stream')
async createStreamingCompletion(
@Body() dto: ChatCompletionDto,
@CurrentUser() user: CurrentUserData,
@Res() res: Response
): Promise<void> {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
try {
for await (const token of this.chatService.createStreamingCompletion(dto, user.userId)) {
res.write(`data: ${JSON.stringify({ token })}\n\n`);
}
res.write('data: [DONE]\n\n');
} catch (error) {
const message = error instanceof Error ? error.message : 'Stream failed';
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
} finally {
res.end();
}
}
}

View file

@ -159,6 +159,68 @@ export class ChatService {
return params?.model || model.provider;
}
/**
* Create a streaming completion. Yields text tokens as they arrive.
* Credits are consumed upfront (estimated cost) since we don't know final token count.
*/
async *createStreamingCompletion(dto: ChatCompletionDto, userId?: string): AsyncIterable<string> {
const model = await this.getModelById(dto.modelId);
if (!model) {
throw new Error(`Model ${dto.modelId} not found`);
}
// Consume credits upfront for streaming
if (userId) {
const creditOperation = this.getCreditOperationForModel(model);
const creditCost = CREDIT_COSTS[creditOperation];
const validation = await this.creditClient.validateCredits(
userId,
creditOperation,
creditCost
);
if (!validation.hasCredits) {
throw new Error(
`Insufficient credits: need ${creditCost}, have ${validation.availableCredits}`
);
}
await this.creditClient.consumeCredits(
userId,
creditOperation,
creditCost,
`Chat stream with ${this.getModelDisplayName(model)}`,
{ modelId: dto.modelId, provider: model.provider, streaming: true }
);
}
const params = model.parameters as {
model?: string;
temperature?: number;
max_tokens?: number;
} | null;
const modelName = params?.model || 'gemma3:4b';
const prefixedModel =
model.provider === 'openrouter'
? modelName.includes('/')
? `openrouter/${modelName}`
: modelName
: modelName;
const temperature = dto.temperature ?? params?.temperature ?? 0.7;
const maxTokens = dto.maxTokens ?? params?.max_tokens ?? 4096;
yield* this.ollamaService.createStreamingCompletion(
prefixedModel,
dto.messages.map((msg) => ({
role: msg.role as 'system' | 'user' | 'assistant',
content: msg.content,
})),
temperature,
maxTokens
);
}
private async createOllamaCompletion(
model: Model,
dto: ChatCompletionDto

View file

@ -89,6 +89,22 @@ export class OllamaService {
}
}
async *createStreamingCompletion(
modelName: string,
messages: ChatMessage[],
temperature?: number,
maxTokens?: number
): AsyncIterable<string> {
const normalizedModel = modelName.includes('/') ? modelName : `ollama/${modelName}`;
this.logger.log(`Streaming request to mana-llm model: ${normalizedModel}`);
yield* this.llm.chatStreamMessages(messages, {
model: normalizedModel,
temperature,
maxTokens,
});
}
async listModels(): Promise<string[]> {
try {
const models = await this.llm.listModels();

View file

@ -595,4 +595,68 @@ export const chatApi = {
}
return data;
},
/**
* Create a streaming completion. Returns an async generator of text tokens.
* Uses Server-Sent Events (SSE) for real-time token delivery.
*/
async *createStreamingCompletion(options: {
messages: ChatMessage[];
modelId: string;
temperature?: number;
maxTokens?: number;
}): AsyncGenerator<string> {
const authToken = await authStore.getValidToken();
if (!authToken) throw new Error('No authentication token');
const response = await fetch(`${API_BASE}/api/v1/chat/completions/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${authToken}`,
},
body: JSON.stringify(options),
});
if (!response.ok) {
throw new Error(`Stream error: ${response.status}`);
}
if (!response.body) {
throw new Error('No response body for stream');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || !trimmed.startsWith('data: ')) continue;
const data = trimmed.slice(6);
if (data === '[DONE]') return;
try {
const parsed = JSON.parse(data);
if (parsed.error) throw new Error(parsed.error);
if (parsed.token) yield parsed.token;
} catch (e) {
if (e instanceof Error && e.message !== data) throw e;
}
}
}
} finally {
reader.releaseLock();
}
},
};

View file

@ -28,7 +28,7 @@ export const chatService = {
},
/**
* Send chat completion request
* Send chat completion request (non-streaming)
*/
async createCompletion(request: ChatCompletionRequest): Promise<ChatCompletionResponse | null> {
return chatApi.createCompletion({
@ -38,4 +38,17 @@ export const chatService = {
maxTokens: request.maxTokens ?? 1000,
});
},
/**
* Send streaming chat completion request.
* Returns an async generator that yields text tokens as they arrive.
*/
async *createStreamingCompletion(request: ChatCompletionRequest): AsyncGenerator<string> {
yield* chatApi.createStreamingCompletion({
messages: request.messages,
modelId: request.modelId,
temperature: request.temperature ?? 0.7,
maxTokens: request.maxTokens ?? 1000,
});
},
};

View file

@ -80,36 +80,53 @@ export const chatStore = {
};
messages = [...messages, userMessage];
// Add placeholder assistant message for streaming
const assistantId = `temp-${++messageCounter}`;
const assistantMessage: Message = {
id: assistantId,
conversationId: '',
sender: 'assistant',
messageText: '',
createdAt: new Date().toISOString(),
};
messages = [...messages, assistantMessage];
try {
// Build chat messages for API
const chatMessages: ChatMessage[] = messages.map((m) => ({
role: m.sender === 'user' ? 'user' : 'assistant',
content: m.messageText,
}));
const chatMessages: ChatMessage[] = messages
.filter((m) => m.id !== assistantId)
.map((m) => ({
role: m.sender === 'user' ? 'user' : 'assistant',
content: m.messageText,
}));
const request: ChatCompletionRequest = {
messages: chatMessages,
modelId: selectedModelId,
};
const response = await chatService.createCompletion(request);
// Stream tokens into the assistant message
let fullContent = '';
for await (const token of chatService.createStreamingCompletion(request)) {
fullContent += token;
// Update the assistant message reactively
messages = messages.map((m) =>
m.id === assistantId ? { ...m, messageText: fullContent } : m
);
}
if (response) {
// Add assistant message
const assistantMessage: Message = {
id: `temp-${++messageCounter}`,
conversationId: '',
sender: 'assistant',
messageText: response.content,
createdAt: new Date().toISOString(),
};
messages = [...messages, assistantMessage];
ChatEvents.messageSent(selectedModelId);
} else {
if (!fullContent) {
error = 'Failed to get response';
messages = messages.filter((m) => m.id !== assistantId);
} else {
ChatEvents.messageSent(selectedModelId);
}
} catch (e) {
error = e instanceof Error ? e.message : 'Failed to send message';
// Remove empty assistant message on error
const msg = messages.find((m) => m.id === assistantId);
if (msg && !msg.messageText) {
messages = messages.filter((m) => m.id !== assistantId);
}
} finally {
isSending = false;
}