"""
|
|
Mana Voice Bot - German Voice Assistant
|
|
|
|
Complete voice-to-voice pipeline:
|
|
1. STT: Whisper (mana-stt) - Speech to Text
|
|
2. LLM: Ollama (Gemma/Qwen) - Text Processing
|
|
3. TTS: Edge TTS - Text to Speech
|
|
|
|
Optimized for German language.
|
|
"""

import asyncio
import logging
import os
import tempfile
import time
from contextlib import asynccontextmanager
from typing import Optional

import aiohttp
import edge_tts
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from pydantic import BaseModel, Field

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Configuration
PORT = int(os.getenv("PORT", "3050"))
STT_URL = os.getenv("STT_URL", "http://localhost:3020")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gemma3:4b")
DEFAULT_VOICE = os.getenv("DEFAULT_VOICE", "de-DE-ConradNeural")
SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT", """Du bist ein freundlicher und hilfreicher deutscher Sprachassistent.
Antworte immer auf Deutsch, kurz und prägnant.
Halte deine Antworten unter 3 Sätzen, es sei denn, der Nutzer fragt nach mehr Details.""")

CORS_ORIGINS = os.getenv(
    "CORS_ORIGINS",
    "https://mana.how,http://localhost:5173,http://localhost:3000",
).split(",")

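# Example environment overrides (hypothetical deployment values, a sketch;
# none are required when running with the local defaults above):
#
#   PORT=8080 STT_URL=http://stt:3020 OLLAMA_URL=http://ollama:11434 \
#   DEFAULT_MODEL=qwen3:8b DEFAULT_VOICE=de-DE-KatjaNeural python app.py
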
# German Edge TTS voices
GERMAN_VOICES = {
    "de-DE-ConradNeural": "Male - Conrad (Professional)",
    "de-DE-KatjaNeural": "Female - Katja (Natural)",
    "de-DE-AmalaNeural": "Female - Amala (Friendly)",
    "de-DE-BerndNeural": "Male - Bernd (Calm)",
    "de-DE-ChristophNeural": "Male - Christoph (News)",
    "de-DE-ElkeNeural": "Female - Elke (Warm)",
    "de-DE-KillianNeural": "Male - Killian (Casual)",
    "de-DE-KlarissaNeural": "Female - Klarissa (Cheerful)",
    "de-DE-KlausNeural": "Male - Klaus (Storyteller)",
    "de-DE-LouisaNeural": "Female - Louisa (Assistant)",
    "de-DE-TanjaNeural": "Female - Tanja (Business)",
}

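# GERMAN_VOICES above is a curated subset of Edge TTS's de-DE catalog; the
# full list is available via the edge-tts CLI ("edge-tts --list-voices").
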
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager."""
    logger.info(f"Starting Mana Voice Bot on port {PORT}")
    logger.info(f"STT Service: {STT_URL}")
    logger.info(f"Ollama: {OLLAMA_URL}")
    logger.info(f"Default Model: {DEFAULT_MODEL}")
    logger.info(f"Default Voice: {DEFAULT_VOICE}")
    yield
    logger.info("Shutting down Mana Voice Bot")

app = FastAPI(
    title="Mana Voice Bot",
    description="German voice-to-voice assistant using Whisper + Ollama + Edge TTS",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ============================================================================
# Models
# ============================================================================


class ChatRequest(BaseModel):
    """Text chat request."""
    message: str = Field(..., description="User message")
    model: str = Field(DEFAULT_MODEL, description="Ollama model to use")
    voice: str = Field(DEFAULT_VOICE, description="TTS voice")
    system_prompt: Optional[str] = Field(None, description="Custom system prompt")


class TranscribeResponse(BaseModel):
    """Transcription response."""
    text: str
    language: str
    duration: float


class ChatResponse(BaseModel):
    """Chat response with text."""
    user_text: str
    assistant_text: str
    model: str
    processing_time: float


class VoiceResponse(BaseModel):
    """Voice processing response metadata."""
    user_text: str
    assistant_text: str
    model: str
    voice: str
    stt_time: float
    llm_time: float
    tts_time: float
    total_time: float

# ============================================================================
# Service Functions
# ============================================================================


async def transcribe_audio(audio_bytes: bytes, language: str = "de") -> dict:
    """Transcribe audio using mana-stt (Whisper)."""
    async with aiohttp.ClientSession() as session:
        data = aiohttp.FormData()
        data.add_field("file", audio_bytes, filename="audio.wav", content_type="audio/wav")
        data.add_field("language", language)

        async with session.post(f"{STT_URL}/transcribe", data=data) as response:
            if response.status != 200:
                error = await response.text()
                raise HTTPException(status_code=500, detail=f"STT error: {error}")
            return await response.json()

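# Callers below assume the mana-stt JSON response includes at least
# {"text": "...", "language": "..."}; any extra fields are ignored.
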
async def chat_with_ollama(
    message: str,
    model: str = DEFAULT_MODEL,
    system_prompt: str = SYSTEM_PROMPT,
) -> str:
    """Send a message to Ollama and return the response text."""
    async with aiohttp.ClientSession() as session:
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": message},
            ],
            "stream": False,
        }

        async with session.post(
            f"{OLLAMA_URL}/api/chat",
            json=payload,
            timeout=aiohttp.ClientTimeout(total=120),
        ) as response:
            if response.status != 200:
                error = await response.text()
                raise HTTPException(status_code=500, detail=f"Ollama error: {error}")

            result = await response.json()
            return result.get("message", {}).get("content", "")

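# With "stream": False, Ollama's /api/chat returns a single JSON object of the
# form {"model": ..., "message": {"role": "assistant", "content": "..."}, "done": true};
# only message.content is used above.
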
async def synthesize_speech(text: str, voice: str = DEFAULT_VOICE) -> bytes:
    """Synthesize speech using Edge TTS."""
    if voice not in GERMAN_VOICES:
        voice = DEFAULT_VOICE

    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
        output_path = f.name

    try:
        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(output_path)

        with open(output_path, "rb") as f:
            return f.read()
    finally:
        if os.path.exists(output_path):
            os.unlink(output_path)

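# The helper above buffers the full MP3 via a temp file. A chunked variant is
# possible with edge-tts' Communicate.stream() API; this is an illustrative
# sketch, not used by the endpoints below:
async def synthesize_speech_stream(text: str, voice: str = DEFAULT_VOICE):
    """Yield MP3 chunks as Edge TTS produces them (illustrative sketch)."""
    if voice not in GERMAN_VOICES:
        voice = DEFAULT_VOICE
    communicate = edge_tts.Communicate(text, voice)
    async for chunk in communicate.stream():
        # stream() yields "audio" chunks (raw MP3 bytes) and "WordBoundary" metadata
        if chunk["type"] == "audio":
            yield chunk["data"]
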
# ============================================================================
# Endpoints
# ============================================================================


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    # Check STT service
    stt_ok = False
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{STT_URL}/health", timeout=aiohttp.ClientTimeout(total=5)) as r:
                stt_ok = r.status == 200
    except Exception:
        pass

    # Check Ollama
    ollama_ok = False
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{OLLAMA_URL}/api/tags", timeout=aiohttp.ClientTimeout(total=5)) as r:
                ollama_ok = r.status == 200
    except Exception:
        pass

    return {
        "status": "healthy" if (stt_ok and ollama_ok) else "degraded",
        "services": {
            "stt": "ok" if stt_ok else "unavailable",
            "ollama": "ok" if ollama_ok else "unavailable",
            "tts": "ok",  # Edge TTS is a cloud API; not probed here
        },
        "config": {
            "default_model": DEFAULT_MODEL,
            "default_voice": DEFAULT_VOICE,
        },
    }

@app.get("/voices")
|
|
async def list_voices():
|
|
"""List available German voices."""
|
|
return {
|
|
"voices": GERMAN_VOICES,
|
|
"default": DEFAULT_VOICE,
|
|
}
|
|
|
|
|
|
@app.get("/models")
|
|
async def list_models():
|
|
"""List available Ollama models."""
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(f"{OLLAMA_URL}/api/tags") as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
models = [m["name"] for m in data.get("models", [])]
|
|
return {"models": models, "default": DEFAULT_MODEL}
|
|
except Exception as e:
|
|
logger.error(f"Failed to get models: {e}")
|
|
|
|
return {"models": [], "default": DEFAULT_MODEL, "error": "Could not fetch models"}
|
|
|
|
|
|
@app.post("/transcribe", response_model=TranscribeResponse)
|
|
async def transcribe(
|
|
audio: UploadFile = File(..., description="Audio file to transcribe"),
|
|
language: str = Form("de", description="Language code"),
|
|
):
|
|
"""Transcribe audio to text using Whisper."""
|
|
start = time.time()
|
|
|
|
audio_bytes = await audio.read()
|
|
if len(audio_bytes) == 0:
|
|
raise HTTPException(status_code=400, detail="Audio file is empty")
|
|
|
|
result = await transcribe_audio(audio_bytes, language)
|
|
|
|
return TranscribeResponse(
|
|
text=result.get("text", ""),
|
|
language=result.get("language", language),
|
|
duration=time.time() - start,
|
|
)
|
|
|
|
|
|
@app.post("/chat", response_model=ChatResponse)
|
|
async def chat(request: ChatRequest):
|
|
"""Text chat with the LLM."""
|
|
start = time.time()
|
|
|
|
response_text = await chat_with_ollama(
|
|
message=request.message,
|
|
model=request.model,
|
|
system_prompt=request.system_prompt or SYSTEM_PROMPT,
|
|
)
|
|
|
|
return ChatResponse(
|
|
user_text=request.message,
|
|
assistant_text=response_text,
|
|
model=request.model,
|
|
processing_time=time.time() - start,
|
|
)
|
|
|
|
|
|
@app.post("/chat/audio")
|
|
async def chat_audio(request: ChatRequest):
|
|
"""Text chat with audio response."""
|
|
start = time.time()
|
|
|
|
# Get LLM response
|
|
llm_start = time.time()
|
|
response_text = await chat_with_ollama(
|
|
message=request.message,
|
|
model=request.model,
|
|
system_prompt=request.system_prompt or SYSTEM_PROMPT,
|
|
)
|
|
llm_time = time.time() - llm_start
|
|
|
|
# Synthesize speech
|
|
tts_start = time.time()
|
|
audio_bytes = await synthesize_speech(response_text, request.voice)
|
|
tts_time = time.time() - tts_start
|
|
|
|
return Response(
|
|
content=audio_bytes,
|
|
media_type="audio/mpeg",
|
|
headers={
|
|
"X-User-Text": request.message[:100],
|
|
"X-Assistant-Text": response_text[:200],
|
|
"X-Model": request.model,
|
|
"X-Voice": request.voice,
|
|
"X-LLM-Time": str(round(llm_time, 2)),
|
|
"X-TTS-Time": str(round(tts_time, 2)),
|
|
"X-Total-Time": str(round(time.time() - start, 2)),
|
|
},
|
|
)
|
|
|
|
|
|
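# Note: Starlette encodes response header values as latin-1, so the metadata
# headers above are truncated and stripped of newlines; clients that need the
# full text should use /voice/metadata instead.
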
@app.post("/voice")
|
|
async def voice_to_voice(
|
|
audio: UploadFile = File(..., description="Audio input"),
|
|
model: str = Form(DEFAULT_MODEL, description="Ollama model"),
|
|
voice: str = Form(DEFAULT_VOICE, description="TTS voice"),
|
|
language: str = Form("de", description="Input language"),
|
|
system_prompt: Optional[str] = Form(None, description="Custom system prompt"),
|
|
):
|
|
"""
|
|
Complete voice-to-voice pipeline.
|
|
|
|
1. Transcribe audio input (Whisper)
|
|
2. Process with LLM (Ollama)
|
|
3. Synthesize response (Edge TTS)
|
|
|
|
Returns audio response with metadata headers.
|
|
"""
|
|
total_start = time.time()
|
|
|
|
# Read audio
|
|
audio_bytes = await audio.read()
|
|
if len(audio_bytes) == 0:
|
|
raise HTTPException(status_code=400, detail="Audio file is empty")
|
|
|
|
# 1. Speech-to-Text
|
|
stt_start = time.time()
|
|
try:
|
|
stt_result = await transcribe_audio(audio_bytes, language)
|
|
user_text = stt_result.get("text", "").strip()
|
|
except Exception as e:
|
|
logger.error(f"STT failed: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Transcription failed: {e}")
|
|
stt_time = time.time() - stt_start
|
|
|
|
if not user_text:
|
|
raise HTTPException(status_code=400, detail="Could not transcribe audio - no speech detected")
|
|
|
|
logger.info(f"Transcribed: {user_text}")
|
|
|
|
# 2. LLM Processing
|
|
llm_start = time.time()
|
|
try:
|
|
assistant_text = await chat_with_ollama(
|
|
message=user_text,
|
|
model=model,
|
|
system_prompt=system_prompt or SYSTEM_PROMPT,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"LLM failed: {e}")
|
|
raise HTTPException(status_code=500, detail=f"LLM processing failed: {e}")
|
|
llm_time = time.time() - llm_start
|
|
|
|
logger.info(f"LLM response: {assistant_text[:100]}...")
|
|
|
|
# 3. Text-to-Speech
|
|
tts_start = time.time()
|
|
try:
|
|
response_audio = await synthesize_speech(assistant_text, voice)
|
|
except Exception as e:
|
|
logger.error(f"TTS failed: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Speech synthesis failed: {e}")
|
|
tts_time = time.time() - tts_start
|
|
|
|
total_time = time.time() - total_start
|
|
|
|
logger.info(f"Pipeline complete - STT: {stt_time:.2f}s, LLM: {llm_time:.2f}s, TTS: {tts_time:.2f}s, Total: {total_time:.2f}s")
|
|
|
|
# Return audio with metadata
|
|
return Response(
|
|
content=response_audio,
|
|
media_type="audio/mpeg",
|
|
headers={
|
|
"X-User-Text": user_text[:200].replace("\n", " "),
|
|
"X-Assistant-Text": assistant_text[:500].replace("\n", " "),
|
|
"X-Model": model,
|
|
"X-Voice": voice,
|
|
"X-STT-Time": str(round(stt_time, 2)),
|
|
"X-LLM-Time": str(round(llm_time, 2)),
|
|
"X-TTS-Time": str(round(tts_time, 2)),
|
|
"X-Total-Time": str(round(total_time, 2)),
|
|
},
|
|
)
|
|
|
|
|
|
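# Minimal client sketch for /voice (assumes the service runs locally on PORT
# and that "question.wav" exists; the function and file names are illustrative,
# not part of the API):
async def _example_voice_client(path: str = "question.wav") -> bytes:
    async with aiohttp.ClientSession() as session:
        form = aiohttp.FormData()
        with open(path, "rb") as f:
            form.add_field("audio", f.read(), filename=path, content_type="audio/wav")
        form.add_field("voice", "de-DE-KatjaNeural")
        async with session.post(f"http://localhost:{PORT}/voice", data=form) as r:
            r.raise_for_status()
            print(r.headers.get("X-Assistant-Text"))
            return await r.read()  # MP3 bytes
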
@app.post("/voice/metadata", response_model=VoiceResponse)
|
|
async def voice_to_voice_with_metadata(
|
|
audio: UploadFile = File(..., description="Audio input"),
|
|
model: str = Form(DEFAULT_MODEL, description="Ollama model"),
|
|
voice: str = Form(DEFAULT_VOICE, description="TTS voice"),
|
|
language: str = Form("de", description="Input language"),
|
|
system_prompt: Optional[str] = Form(None, description="Custom system prompt"),
|
|
):
|
|
"""
|
|
Voice-to-voice pipeline returning JSON metadata (without audio).
|
|
Useful for debugging or when you need the text response.
|
|
"""
|
|
total_start = time.time()
|
|
|
|
audio_bytes = await audio.read()
|
|
if len(audio_bytes) == 0:
|
|
raise HTTPException(status_code=400, detail="Audio file is empty")
|
|
|
|
# STT
|
|
stt_start = time.time()
|
|
stt_result = await transcribe_audio(audio_bytes, language)
|
|
user_text = stt_result.get("text", "").strip()
|
|
stt_time = time.time() - stt_start
|
|
|
|
if not user_text:
|
|
raise HTTPException(status_code=400, detail="No speech detected")
|
|
|
|
# LLM
|
|
llm_start = time.time()
|
|
assistant_text = await chat_with_ollama(
|
|
message=user_text,
|
|
model=model,
|
|
system_prompt=system_prompt or SYSTEM_PROMPT,
|
|
)
|
|
llm_time = time.time() - llm_start
|
|
|
|
# TTS (just measure time, don't return audio)
|
|
tts_start = time.time()
|
|
await synthesize_speech(assistant_text, voice)
|
|
tts_time = time.time() - tts_start
|
|
|
|
return VoiceResponse(
|
|
user_text=user_text,
|
|
assistant_text=assistant_text,
|
|
model=model,
|
|
voice=voice,
|
|
stt_time=round(stt_time, 2),
|
|
llm_time=round(llm_time, 2),
|
|
tts_time=round(tts_time, 2),
|
|
total_time=round(time.time() - total_start, 2),
|
|
)
|
|
|
|
|
|
@app.post("/tts")
|
|
async def text_to_speech(
|
|
text: str = Form(..., description="Text to synthesize"),
|
|
voice: str = Form(DEFAULT_VOICE, description="Voice to use"),
|
|
):
|
|
"""Direct text-to-speech synthesis."""
|
|
if not text.strip():
|
|
raise HTTPException(status_code=400, detail="Text is empty")
|
|
|
|
audio_bytes = await synthesize_speech(text, voice)
|
|
|
|
return Response(
|
|
content=audio_bytes,
|
|
media_type="audio/mpeg",
|
|
headers={
|
|
"X-Voice": voice,
|
|
"X-Text-Length": str(len(text)),
|
|
},
|
|
)
|
|
|
|
|
|
# ============================================================================
# Main
# ============================================================================


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=PORT)