chore(cutover): remove services/mana-tts/ — moved to mana-platform

Live containers on the Mac Mini build out of `../mana/services/mana-tts/`
since the 8-Doppel-Cutover commit (774852ba2). Smoke test green
2026-05-08 — health endpoints, JWKS, login flow, and Stripe webhook all
reachable from the new build path. Removing the now-stale duplicate.

Was 148K in this repo, gone now. Active code lives in
`Code/mana/services/mana-tts/` (see ../mana/CLAUDE.md).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Till JS 2026-05-08 18:53:53 +02:00
parent 3c4a6d4f69
commit 6103d4d2d9
19 changed files with 0 additions and 3360 deletions

@@ -1,36 +0,0 @@
# Mana TTS Service Configuration
# Copy to .env and adjust values as needed
# Server
PORT=3022
# Models
# Set to true to preload models on startup (slower startup, faster first request)
PRELOAD_MODELS=false
# Text Limits
MAX_TEXT_LENGTH=1000
# CORS Origins (comma-separated)
CORS_ORIGINS=https://mana.how,https://chat.mana.how,http://localhost:5173
# ===========================================
# Authentication
# ===========================================
# Enable API key authentication (default: true for production)
REQUIRE_AUTH=true
# API Keys (comma-separated, format: key:name)
# Example: sk-abc123:myapp,sk-def456:testuser
API_KEYS=
# Internal API key (no rate limit, for internal services)
# Generate with: openssl rand -hex 32
INTERNAL_API_KEY=
# Rate Limiting
# Requests per window per API key
RATE_LIMIT_REQUESTS=60
# Window size in seconds
RATE_LIMIT_WINDOW=60

@@ -1,127 +0,0 @@
# mana-tts
Text-to-Speech microservice. Wraps Kokoro (English presets), Piper (German, local ONNX), Orpheus-3B (German, high-quality pre-generation), Zonos (expressive multilingual), and F5-TTS (voice cloning) behind a small FastAPI surface. Lives on the Windows GPU server (`mana-server-gpu`, RTX 3090).
> ⚠️ **Earlier history**: this directory used to contain MLX-optimized
> Mac-Mini code (`f5-tts-mlx`, `mlx-audio`, `setup.sh` with Apple Silicon
> checks, `com.mana.mana-tts.plist` launchd setup). All of that moved to
> the Windows GPU box and was removed from the repo. If you need the
> MLX path, see git history.
## Tech Stack
| Layer | Technology |
|-------|------------|
| **Runtime** | Python 3.11 + uvicorn (Windows) |
| **Framework** | FastAPI |
| **English (preset)** | Kokoro-82M (`kokoro_service.py`) |
| **German (local)** | Piper ONNX with `kerstin_low.onnx` and `thorsten_medium.onnx` voices (`piper_service.py`) |
| **German (high-quality)** | Orpheus-3B German finetune (`orpheus_service.py`) — best for pre-generation |
| **Multilingual (expressive)** | Zonos v0.1 by Zyphra (`zonos_service.py`) — emotion control, 200k hours training |
| **Voice cloning** | F5-TTS on CUDA (`f5_service.py`) |
| **Audio I/O** | `soundfile`, `pydub` |
| **Auth** | Per-key + internal-key API auth (`auth.py`) + JWT via mana-auth (`external_auth.py`) |
| **VRAM** | Shared `vram_manager.py` (same module as mana-stt + mana-image-gen) |
| **Process supervision** | Windows Scheduled Task `ManaTTS` (AtLogOn) |
## Port: 3022
## Where it runs
| Host | Path on disk | Entrypoint |
|------|--------------|------------|
| Windows GPU server (`192.168.178.11`) | `C:\mana\services\mana-tts\` | `service.pyw` via Scheduled Task `ManaTTS` |
Public URL: `https://gpu-tts.mana.how`.
## API Endpoints
| Method | Path | Description |
|--------|------|-------------|
| GET | `/health` | Liveness + which backends are loaded |
| GET | `/models` | Available TTS models |
| GET | `/voices` | List all voices (preset + custom) |
| POST | `/voices` | Register a custom voice (reference audio + transcript) |
| DELETE | `/voices/{voice_id}` | Delete a custom voice |
| POST | `/synthesize/kokoro` | Kokoro synthesis (English presets) |
| POST | `/synthesize` | F5-TTS voice cloning |
| POST | `/synthesize/orpheus` | Orpheus synthesis (German, high-quality, pre-generation) |
| POST | `/synthesize/zonos` | Zonos synthesis (multilingual, expressive, emotion control) |
| POST | `/synthesize/auto` | Routing helper — picks the right backend for the requested voice |
All non-health endpoints require `Authorization: Bearer <token>` (per-app key, internal key, or mana-auth JWT).
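`/synthesize/auto` picks a backend from the requested voice ID. The real routing table lives in `main.py` (not shown here); the following is a plausible sketch inferred from the voice naming in this document — the prefixes and the fall-through to F5-TTS are assumptions, not the actual code:

```python
def route_backend(voice: str) -> str:
    """Map a voice ID to a TTS backend (illustrative only)."""
    if voice.startswith(("af_", "am_", "bf_", "bm_")):
        return "kokoro"   # English preset voices
    if voice.startswith("de_"):
        return "piper"    # German local ONNX voices
    if voice in ("tara", "leo", "emma"):
        return "orpheus"  # Orpheus built-in German speakers
    return "f5"           # custom/cloned voices fall through to F5-TTS

print(route_backend("af_heart"))  # kokoro
```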
## Voices
### Kokoro-82M (English presets)
~300 MB download. 30+ preset English voices. Fast, no reference audio needed.
### Piper (German, local ONNX)
~63 MB per voice. 100% local, GDPR-compliant. Available:
- `de_kerstin` (female, default)
- `de_thorsten` (male)
Fallback to Edge TTS cloud voices if Piper isn't loaded.
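The Edge TTS fallback amounts to an availability check before synthesis. A minimal sketch — the mapping below is an assumption for illustration (Edge voice IDs do follow the `de-DE-*Neural` pattern, but the service's actual pairing is not shown in this diff):

```python
# Hypothetical mapping from local Piper voices to Edge TTS cloud fallbacks
EDGE_FALLBACK = {
    "de_kerstin": "de-DE-KatjaNeural",
    "de_thorsten": "de-DE-ConradNeural",
}

def pick_german_voice(requested: str, piper_loaded: bool) -> tuple[str, str]:
    """Return (engine, voice): Piper when loaded, else the Edge TTS fallback."""
    if piper_loaded:
        return ("piper", requested)
    return ("edge", EDGE_FALLBACK.get(requested, "de-DE-KatjaNeural"))
```

Note that falling back to a cloud voice trades away the "100% local, GDPR-compliant" property of the Piper path.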
### Orpheus-3B German (high-quality pre-generation)
~8 GB VRAM. German finetune (`Kartoffel/Orpheus-3B_german_natural-v0.1`). Natural intonation, built-in speaker voices (tara, leo, emma, ...). Best quality for pre-generating static audio files. Not real-time.
### Zonos v0.1 (expressive multilingual)
~5 GB VRAM. By Zyphra, trained on 200k hours. Explicit German support. Fine-grained control: emotion (neutral/friendly/warm/curious), speaking rate, pitch variation. Can clone voices from 5s reference audio.
### F5-TTS (voice cloning)
~6 GB. Requires reference audio + transcript. Higher quality, slower. Custom voices live in `voices/` (reference audio + transcript per voice ID).
## Configuration (`.env` on the Windows GPU box)
```env
PORT=3022
PRELOAD_MODELS=false
MAX_TEXT_LENGTH=1000
REQUIRE_AUTH=true
API_KEYS=sk-app1:app1,sk-app2:app2
INTERNAL_API_KEY=...
CORS_ORIGINS=https://mana.how,https://chat.mana.how
```
## Code layout
```
services/mana-tts/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI endpoints
│ ├── kokoro_service.py # Kokoro (English presets)
│ ├── piper_service.py # Piper (German, local ONNX)
│ ├── f5_service.py # F5-TTS (voice cloning, CUDA)
│ ├── orpheus_service.py # Orpheus-3B German (high-quality)
│ ├── zonos_service.py # Zonos v0.1 (expressive multilingual)
│ ├── voice_manager.py # Custom voice registry
│ ├── audio_utils.py # Format conversion, resampling
│ ├── auth.py # API-key auth
│ ├── external_auth.py # JWT validation via mana-auth
│ └── vram_manager.py # Shared VRAM accountant
└── service.pyw # Windows runner (used by ManaTTS scheduled task)
```
The Piper voice ONNX files live alongside the service on the GPU box (`C:\mana\services\mana-tts\piper_voices\*.onnx`) — too big to commit, downloaded once during setup.
## Operations
```powershell
# Status
Get-ScheduledTask -TaskName "ManaTTS" | Format-List TaskName, State
Get-NetTCPConnection -LocalPort 3022 -State Listen
# Restart
Stop-ScheduledTask -TaskName "ManaTTS"
Start-ScheduledTask -TaskName "ManaTTS"
# Logs
Get-Content C:\mana\services\mana-tts\service.log -Tail 50
```
## Reference
- `docs/WINDOWS_GPU_SERVER_SETUP.md` — Windows box setup, scheduled tasks, firewall, Cloudflare tunnel
- `docs/PORT_SCHEMA.md` — port assignments across services

@@ -1,36 +0,0 @@
# Mana TTS
Text-to-Speech microservice running on the Windows GPU server (`mana-server-gpu`, RTX 3090). Wraps **Kokoro** (English presets), **Piper** (German, local ONNX), and **F5-TTS** (CUDA voice cloning).
For architecture, deployment, configuration, and operations see [`CLAUDE.md`](./CLAUDE.md) and [`docs/WINDOWS_GPU_SERVER_SETUP.md`](../../docs/WINDOWS_GPU_SERVER_SETUP.md).
## Port: 3022
## Public URL
`https://gpu-tts.mana.how` (via Cloudflare Tunnel + Mac Mini gpu-proxy)
## API Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check + which backends are loaded |
| `/models` | GET | List available models |
| `/voices` | GET | List preset + custom voices |
| `/voices` | POST | Register a custom voice (reference audio + transcript) |
| `/voices/{id}` | DELETE | Delete a custom voice |
| `/synthesize/kokoro` | POST | Kokoro (English presets) |
| `/synthesize` | POST | F5-TTS voice cloning |
| `/synthesize/auto` | POST | Auto-select best backend for the requested voice |
All non-health endpoints require `Authorization: Bearer <token>`.
## Quick Test
```bash
curl -X POST https://gpu-tts.mana.how/synthesize/kokoro \
-H "Authorization: Bearer $INTERNAL_API_KEY" \
-H "Content-Type: application/json" \
-d '{"text":"Hello world","voice":"af_heart"}' \
--output test.wav
```

@@ -1,224 +0,0 @@
"""
Audio conversion utilities for the TTS service.
Handles format conversion between WAV and MP3.
"""
import io
import logging
import tempfile
from pathlib import Path
from typing import Optional
import numpy as np
import soundfile as sf
logger = logging.getLogger(__name__)
# Supported output formats
SUPPORTED_FORMATS = ["wav", "mp3"]
DEFAULT_FORMAT = "wav"
DEFAULT_SAMPLE_RATE = 24000
def audio_to_wav_bytes(
audio_data: np.ndarray,
sample_rate: int = DEFAULT_SAMPLE_RATE,
) -> bytes:
"""
Convert numpy audio array to WAV bytes.
Args:
audio_data: Audio samples as numpy array
sample_rate: Sample rate in Hz
Returns:
WAV file as bytes
"""
buffer = io.BytesIO()
sf.write(buffer, audio_data, sample_rate, format="WAV")
buffer.seek(0)
return buffer.read()
def audio_to_mp3_bytes(
audio_data: np.ndarray,
sample_rate: int = DEFAULT_SAMPLE_RATE,
bitrate: str = "192k",
) -> bytes:
"""
Convert numpy audio array to MP3 bytes.
Requires ffmpeg to be installed.
Args:
audio_data: Audio samples as numpy array
sample_rate: Sample rate in Hz
bitrate: MP3 bitrate (e.g., "128k", "192k", "320k")
Returns:
MP3 file as bytes
"""
try:
from pydub import AudioSegment
except ImportError:
logger.error("pydub not installed, falling back to WAV")
return audio_to_wav_bytes(audio_data, sample_rate)
# First convert to WAV
wav_bytes = audio_to_wav_bytes(audio_data, sample_rate)
# Then convert to MP3 using pydub
try:
audio_segment = AudioSegment.from_wav(io.BytesIO(wav_bytes))
buffer = io.BytesIO()
audio_segment.export(buffer, format="mp3", bitrate=bitrate)
buffer.seek(0)
return buffer.read()
except Exception as e:
logger.error(f"MP3 conversion failed: {e}, falling back to WAV")
return wav_bytes
def convert_audio(
audio_data: np.ndarray,
sample_rate: int = DEFAULT_SAMPLE_RATE,
output_format: str = DEFAULT_FORMAT,
) -> tuple[bytes, str]:
"""
Convert audio data to the specified format.
Args:
audio_data: Audio samples as numpy array
sample_rate: Sample rate in Hz
output_format: Output format ("wav" or "mp3")
Returns:
Tuple of (audio bytes, content type)
"""
output_format = output_format.lower()
if output_format not in SUPPORTED_FORMATS:
logger.warning(f"Unsupported format '{output_format}', using WAV")
output_format = "wav"
if output_format == "mp3":
return audio_to_mp3_bytes(audio_data, sample_rate), "audio/mpeg"
else:
return audio_to_wav_bytes(audio_data, sample_rate), "audio/wav"
def get_content_type(format: str) -> str:
"""Get MIME content type for audio format."""
content_types = {
"wav": "audio/wav",
"mp3": "audio/mpeg",
}
return content_types.get(format.lower(), "audio/wav")
def load_reference_audio(
file_path: str | Path,
) -> tuple[np.ndarray, int]:
"""
Load reference audio file for voice cloning.
Args:
file_path: Path to the audio file
Returns:
Tuple of (audio data as numpy array, sample rate)
"""
audio_data, sample_rate = sf.read(file_path)
# Convert to mono if stereo
if len(audio_data.shape) > 1:
audio_data = np.mean(audio_data, axis=1)
return audio_data, sample_rate
def resample_audio(
audio_data: np.ndarray,
original_sr: int,
target_sr: int = DEFAULT_SAMPLE_RATE,
) -> np.ndarray:
"""
Resample audio to target sample rate.
Args:
audio_data: Audio samples as numpy array
original_sr: Original sample rate
target_sr: Target sample rate
Returns:
Resampled audio data
"""
if original_sr == target_sr:
return audio_data
from scipy import signal
# Calculate resampling ratio
num_samples = int(len(audio_data) * target_sr / original_sr)
resampled = signal.resample(audio_data, num_samples)
return resampled.astype(np.float32)
def normalize_audio(
audio_data: np.ndarray,
target_db: float = -3.0,
) -> np.ndarray:
"""
Normalize audio to target dB level.
Args:
audio_data: Audio samples as numpy array
target_db: Target peak level in dB
Returns:
Normalized audio data
"""
# Calculate current peak
peak = np.max(np.abs(audio_data))
if peak == 0:
return audio_data
# Calculate target peak from dB
target_peak = 10 ** (target_db / 20)
# Apply gain
gain = target_peak / peak
return audio_data * gain
def save_temp_audio(
audio_bytes: bytes,
suffix: str = ".wav",
) -> str:
"""
Save audio bytes to a temporary file.
Args:
audio_bytes: Audio data as bytes
suffix: File extension
Returns:
Path to temporary file
"""
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
tmp.write(audio_bytes)
return tmp.name
def cleanup_temp_file(file_path: str) -> None:
"""
Clean up a temporary file.
Args:
file_path: Path to the file to delete
"""
try:
Path(file_path).unlink()
except Exception:
pass # Silent cleanup failure
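As a quick sanity check on the gain math in `normalize_audio` above (stdlib only, no numpy — editor's worked example):

```python
# A -3 dBFS target corresponds to a linear peak of 10 ** (-3 / 20) ≈ 0.708
target_peak = 10 ** (-3.0 / 20)

samples = [0.1, -0.5, 0.25]
peak = max(abs(s) for s in samples)   # current peak: 0.5
gain = target_peak / peak
normalized = [s * gain for s in samples]
# The loudest sample now sits exactly at the -3 dB target peak
```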

@@ -1,271 +0,0 @@
"""
API Key Authentication for ManaCore STT Service
Supports two authentication modes:
1. Local API keys: Configured via environment variables
2. External API keys: Validated via mana-core-auth service (when EXTERNAL_AUTH_ENABLED=true)
Usage:
# Local keys
API_KEYS=sk-key1:name1,sk-key2:name2
INTERNAL_API_KEY=sk-internal-xxx
# External auth (for user-created keys via mana.how)
EXTERNAL_AUTH_ENABLED=true
MANA_CORE_AUTH_URL=http://localhost:3001
"""
import os
import time
import logging
from typing import Optional
from collections import defaultdict
from dataclasses import dataclass, field
from fastapi import HTTPException, Security, Request
from fastapi.security import APIKeyHeader
from .external_auth import (
is_external_auth_enabled,
validate_api_key_external,
ExternalValidationResult,
)
logger = logging.getLogger(__name__)
# Configuration
API_KEYS_ENV = os.getenv("API_KEYS", "") # Format: "sk-key1:name1,sk-key2:name2"
INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "") # Unlimited internal key
REQUIRE_AUTH = os.getenv("REQUIRE_AUTH", "true").lower() == "true"
RATE_LIMIT_REQUESTS = int(os.getenv("RATE_LIMIT_REQUESTS", "60")) # Per minute
RATE_LIMIT_WINDOW = int(os.getenv("RATE_LIMIT_WINDOW", "60")) # Seconds
@dataclass
class APIKey:
"""API Key with metadata."""
key: str
name: str
is_internal: bool = False
rate_limit: int = RATE_LIMIT_REQUESTS # Requests per window
@dataclass
class RateLimitInfo:
"""Rate limit tracking per key."""
requests: list = field(default_factory=list)
def is_allowed(self, limit: int, window: int) -> bool:
"""Check if request is allowed within rate limit."""
now = time.time()
# Remove old requests outside window
self.requests = [t for t in self.requests if now - t < window]
if len(self.requests) >= limit:
return False
self.requests.append(now)
return True
def remaining(self, limit: int, window: int) -> int:
"""Get remaining requests in current window."""
now = time.time()
self.requests = [t for t in self.requests if now - t < window]
return max(0, limit - len(self.requests))
# Parse API keys from environment
def _parse_api_keys() -> dict[str, APIKey]:
"""Parse API keys from environment variables."""
keys = {}
# Parse comma-separated keys
if API_KEYS_ENV:
for entry in API_KEYS_ENV.split(","):
entry = entry.strip()
if ":" in entry:
key, name = entry.split(":", 1)
else:
key, name = entry, "default"
keys[key.strip()] = APIKey(key=key.strip(), name=name.strip())
# Add internal key with no rate limit
if INTERNAL_API_KEY:
keys[INTERNAL_API_KEY] = APIKey(
key=INTERNAL_API_KEY,
name="internal",
is_internal=True,
rate_limit=999999, # Effectively unlimited
)
return keys
# Global state
_api_keys = _parse_api_keys()
_rate_limits: dict[str, RateLimitInfo] = defaultdict(RateLimitInfo)
# Security scheme
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
@dataclass
class AuthResult:
"""Result of authentication check."""
authenticated: bool
key_name: Optional[str] = None
is_internal: bool = False
rate_limit_remaining: Optional[int] = None
user_id: Optional[str] = None # Set when using external auth
async def verify_api_key(
request: Request,
api_key: Optional[str] = Security(api_key_header),
) -> AuthResult:
"""
Verify API key and check rate limits.
Supports two authentication modes:
1. External auth via mana-core-auth (for sk_live_ keys)
2. Local auth via environment variables
Returns AuthResult with authentication status.
Raises HTTPException if auth fails or rate limited.
"""
# Skip auth for health and docs endpoints
path = request.url.path
if path in ["/health", "/docs", "/openapi.json", "/redoc"]:
return AuthResult(authenticated=True, key_name="public")
# If auth not required, allow all
if not REQUIRE_AUTH:
return AuthResult(authenticated=True, key_name="anonymous")
# Check for API key
if not api_key:
logger.warning(f"Missing API key for {path} from {request.client.host if request.client else 'unknown'}")
raise HTTPException(
status_code=401,
detail="Missing API key. Provide X-API-Key header.",
headers={"WWW-Authenticate": "ApiKey"},
)
# Try external auth first for sk_live_ keys (user-created keys via mana.how)
if api_key.startswith("sk_live_") and is_external_auth_enabled():
external_result = await validate_api_key_external(api_key, "stt")
if external_result is not None:
if external_result.valid:
# Use rate limits from external auth
rate_info = _rate_limits[api_key]
limit = external_result.rate_limit_requests
window = external_result.rate_limit_window
if not rate_info.is_allowed(limit, window):
remaining = rate_info.remaining(limit, window)
logger.warning(f"Rate limit exceeded for external key")
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {window} seconds.",
headers={
"X-RateLimit-Limit": str(limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + window),
"Retry-After": str(window),
},
)
remaining = rate_info.remaining(limit, window)
logger.debug(f"Authenticated external request from user {external_result.user_id} to {path}")
return AuthResult(
authenticated=True,
key_name="external",
is_internal=False,
rate_limit_remaining=remaining,
user_id=external_result.user_id,
)
else:
# External auth returned invalid
logger.warning(f"External auth failed: {external_result.error}")
raise HTTPException(
status_code=401,
detail=external_result.error or "Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
# If external_result is None, fall through to local auth
# Local auth: Validate key against environment variables
if api_key not in _api_keys:
logger.warning(f"Invalid API key attempt for {path}")
raise HTTPException(
status_code=401,
detail="Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
key_info = _api_keys[api_key]
# Check rate limit (skip for internal keys)
if not key_info.is_internal:
rate_info = _rate_limits[api_key]
if not rate_info.is_allowed(key_info.rate_limit, RATE_LIMIT_WINDOW):
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
logger.warning(f"Rate limit exceeded for key '{key_info.name}'")
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {RATE_LIMIT_WINDOW} seconds.",
headers={
"X-RateLimit-Limit": str(key_info.rate_limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + RATE_LIMIT_WINDOW),
"Retry-After": str(RATE_LIMIT_WINDOW),
},
)
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
else:
remaining = None
logger.debug(f"Authenticated request from '{key_info.name}' to {path}")
return AuthResult(
authenticated=True,
key_name=key_info.name,
is_internal=key_info.is_internal,
rate_limit_remaining=remaining,
)
def get_api_key_stats() -> dict:
"""Get statistics about API keys (for admin endpoint)."""
stats = {
"total_keys": len(_api_keys),
"auth_required": REQUIRE_AUTH,
"rate_limit": {
"requests_per_window": RATE_LIMIT_REQUESTS,
"window_seconds": RATE_LIMIT_WINDOW,
},
"keys": [],
}
for key, info in _api_keys.items():
# Don't expose actual keys, just metadata
masked_key = key[:8] + "..." if len(key) > 8 else "***"
rate_info = _rate_limits.get(key, RateLimitInfo())
stats["keys"].append({
"name": info.name,
"key_prefix": masked_key,
"is_internal": info.is_internal,
"requests_in_window": len(rate_info.requests),
"remaining": rate_info.remaining(info.rate_limit, RATE_LIMIT_WINDOW),
})
return stats
def reload_api_keys():
"""Reload API keys from environment (for runtime updates)."""
global _api_keys
_api_keys = _parse_api_keys()
logger.info(f"Reloaded {len(_api_keys)} API keys")

@@ -1,145 +0,0 @@
"""
External API Key Validation via mana-core-auth
When EXTERNAL_AUTH_ENABLED=true, API keys are validated against the
central mana-core-auth service. This allows users to create and manage
API keys from the mana.how web interface.
Results are cached for 5 minutes to reduce load on the auth service.
"""
import os
import time
import logging
import httpx
from typing import Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# Configuration
EXTERNAL_AUTH_ENABLED = os.getenv("EXTERNAL_AUTH_ENABLED", "false").lower() == "true"
MANA_CORE_AUTH_URL = os.getenv("MANA_CORE_AUTH_URL", "http://localhost:3001")
API_KEY_CACHE_TTL = int(os.getenv("API_KEY_CACHE_TTL", "300")) # 5 minutes
EXTERNAL_AUTH_TIMEOUT = float(os.getenv("EXTERNAL_AUTH_TIMEOUT", "5.0")) # seconds
@dataclass
class ExternalValidationResult:
"""Result from external API key validation."""
valid: bool
user_id: Optional[str] = None
scopes: Optional[list] = None
rate_limit_requests: int = 60
rate_limit_window: int = 60
error: Optional[str] = None
cached_at: float = 0.0
# In-memory cache for validation results
# Key: API key, Value: ExternalValidationResult
_validation_cache: dict[str, ExternalValidationResult] = {}
def is_external_auth_enabled() -> bool:
"""Check if external authentication is enabled."""
return EXTERNAL_AUTH_ENABLED
def _get_cached_result(api_key: str) -> Optional[ExternalValidationResult]:
"""Get cached validation result if still valid."""
result = _validation_cache.get(api_key)
if result and (time.time() - result.cached_at) < API_KEY_CACHE_TTL:
return result
return None
def _cache_result(api_key: str, result: ExternalValidationResult):
"""Cache a validation result."""
result.cached_at = time.time()
_validation_cache[api_key] = result
# Clean up old entries periodically (keep cache size manageable)
if len(_validation_cache) > 1000:
now = time.time()
expired_keys = [
k for k, v in _validation_cache.items()
if (now - v.cached_at) >= API_KEY_CACHE_TTL
]
for k in expired_keys:
del _validation_cache[k]
async def validate_api_key_external(api_key: str, scope: str) -> Optional[ExternalValidationResult]:
"""
Validate an API key against mana-core-auth service.
Args:
api_key: The API key to validate (e.g., "sk_live_...")
scope: The required scope (e.g., "stt" or "tts")
Returns:
ExternalValidationResult if external auth is enabled and the key was validated.
None if external auth is disabled or the service is unavailable (fallback to local).
"""
if not EXTERNAL_AUTH_ENABLED:
return None
# Check cache first
cached = _get_cached_result(api_key)
if cached:
logger.debug(f"Using cached validation result for key prefix: {api_key[:12]}...")
# Check scope against cached result
if cached.valid and cached.scopes and scope not in cached.scopes:
return ExternalValidationResult(
valid=False,
error=f"API key does not have scope: {scope}",
)
return cached
# Call mana-core-auth validation endpoint
try:
async with httpx.AsyncClient(timeout=EXTERNAL_AUTH_TIMEOUT) as client:
response = await client.post(
f"{MANA_CORE_AUTH_URL}/api/v1/api-keys/validate",
json={"apiKey": api_key, "scope": scope},
)
if response.status_code == 200:
data = response.json()
result = ExternalValidationResult(
valid=data.get("valid", False),
user_id=data.get("userId"),
scopes=data.get("scopes", []),
rate_limit_requests=data.get("rateLimit", {}).get("requests", 60),
rate_limit_window=data.get("rateLimit", {}).get("window", 60),
error=data.get("error"),
)
_cache_result(api_key, result)
return result
else:
logger.warning(
f"External auth returned status {response.status_code}: {response.text}"
)
# Don't cache errors - allow retry
return ExternalValidationResult(
valid=False,
error=f"Auth service returned {response.status_code}",
)
except httpx.TimeoutException:
logger.warning("External auth service timeout - falling back to local auth")
return None
except httpx.ConnectError:
logger.warning("Cannot connect to external auth service - falling back to local auth")
return None
except Exception as e:
logger.error(f"External auth error: {e}")
return None
def clear_cache():
"""Clear the validation cache (for testing or runtime updates)."""
global _validation_cache
_validation_cache.clear()
logger.info("External auth cache cleared")

@@ -1,178 +0,0 @@
"""
F5-TTS Service for voice cloning synthesis.
CUDA version using f5-tts PyTorch package.
"""
import logging
import os
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import numpy as np
logger = logging.getLogger(__name__)
# Global singleton for lazy initialization
_f5_api = None
# Default model
DEFAULT_F5_MODEL = os.getenv("F5_MODEL", "F5-TTS")
# Default generation parameters
DEFAULT_STEPS = 32
DEFAULT_CFG_STRENGTH = 2.0
DEFAULT_SWAY_COEF = -1.0
DEFAULT_SPEED = 1.0
@dataclass
class F5Result:
"""Result from F5-TTS synthesis."""
audio: np.ndarray
sample_rate: int
duration: float
voice_id: Optional[str] = None
def get_f5_model(model_name: str = DEFAULT_F5_MODEL):
"""Get or create F5-TTS API instance (singleton pattern)."""
global _f5_api
if _f5_api is not None:
return _f5_api
logger.info(f"Loading F5-TTS model: {model_name}")
try:
from f5_tts.api import F5TTS
_f5_api = F5TTS(model_type="F5-TTS")
logger.info("F5-TTS model loaded successfully (CUDA)")
return _f5_api
except ImportError as e:
logger.error(f"Failed to import f5_tts: {e}")
raise RuntimeError(
"f5-tts not installed. Run: pip install f5-tts"
)
except Exception as e:
logger.error(f"Failed to load F5-TTS model: {e}")
raise
def is_f5_loaded() -> bool:
"""Check if F5-TTS model is currently loaded."""
return _f5_api is not None
async def synthesize_f5(
text: str,
reference_audio_path: str,
reference_text: str,
duration: Optional[float] = None,
steps: int = DEFAULT_STEPS,
cfg_strength: float = DEFAULT_CFG_STRENGTH,
sway_coef: float = DEFAULT_SWAY_COEF,
speed: float = DEFAULT_SPEED,
model_name: str = DEFAULT_F5_MODEL,
) -> F5Result:
"""
Synthesize speech using F5-TTS with voice cloning.
Args:
text: Text to synthesize
reference_audio_path: Path to reference audio file
reference_text: Transcript of the reference audio
duration: Target duration in seconds (auto-calculated if None)
steps: Number of diffusion steps
cfg_strength: Classifier-free guidance strength
sway_coef: Sway sampling coefficient
speed: Speech speed multiplier
model_name: Model identifier
Returns:
F5Result with audio data
"""
import asyncio
api = get_f5_model(model_name)
logger.info(
f"Synthesizing with F5-TTS: text_length={len(text)}, "
f"ref_audio={reference_audio_path}, steps={steps}"
)
try:
# F5-TTS API infer method (runs synchronously, offload to thread)
loop = asyncio.get_event_loop()
def _generate():
wav, sr, _ = api.infer(
ref_file=reference_audio_path,
ref_text=reference_text,
gen_text=text,
nfe_step=steps,
cfg_strength=cfg_strength,
sway_sampling_coeff=sway_coef,
speed=speed,
)
return wav, sr
audio, sample_rate = await loop.run_in_executor(None, _generate)
# Convert to numpy if needed
if not isinstance(audio, np.ndarray):
audio = np.array(audio, dtype=np.float32)
# Calculate duration
audio_duration = len(audio) / sample_rate
logger.info(f"F5-TTS synthesis complete: duration={audio_duration:.2f}s")
return F5Result(
audio=audio,
sample_rate=sample_rate,
duration=audio_duration,
)
except Exception as e:
logger.error(f"F5-TTS synthesis failed: {e}")
raise RuntimeError(f"Voice cloning synthesis failed: {e}")
async def synthesize_f5_from_bytes(
text: str,
reference_audio_bytes: bytes,
reference_text: str,
audio_extension: str = ".wav",
**kwargs,
) -> F5Result:
"""Synthesize speech using F5-TTS with reference audio as bytes."""
with tempfile.NamedTemporaryFile(suffix=audio_extension, delete=False) as tmp:
tmp.write(reference_audio_bytes)
tmp_path = tmp.name
try:
result = await synthesize_f5(
text=text,
reference_audio_path=tmp_path,
reference_text=reference_text,
**kwargs,
)
return result
finally:
try:
Path(tmp_path).unlink()
except Exception:
pass
def estimate_duration(text: str, speed: float = 1.0) -> float:
"""Estimate audio duration from text."""
words = len(text) / 5
minutes = words / 150
seconds = minutes * 60
return seconds / speed

@@ -1,165 +0,0 @@
"""
Kokoro TTS Service for fast preset voice synthesis.
CUDA version using kokoro PyTorch package.
"""
import logging
from dataclasses import dataclass
from typing import Optional
import numpy as np
logger = logging.getLogger(__name__)
# Global singleton for lazy initialization
_kokoro_pipeline = None
# Default model
DEFAULT_KOKORO_MODEL = "hexgrad/Kokoro-82M"
# Available Kokoro voices (American Female/Male, British Female/Male)
KOKORO_VOICES = {
# American Female voices
"af_heart": "American Female - Heart (warm, emotional)",
"af_alloy": "American Female - Alloy (neutral, professional)",
"af_aoede": "American Female - Aoede (clear, articulate)",
"af_bella": "American Female - Bella (friendly, approachable)",
"af_jessica": "American Female - Jessica (confident, clear)",
"af_kore": "American Female - Kore (calm, measured)",
"af_nicole": "American Female - Nicole (bright, energetic)",
"af_nova": "American Female - Nova (modern, dynamic)",
"af_river": "American Female - River (smooth, flowing)",
"af_sarah": "American Female - Sarah (warm, conversational)",
"af_sky": "American Female - Sky (light, airy)",
# American Male voices
"am_adam": "American Male - Adam (deep, authoritative)",
"am_echo": "American Male - Echo (resonant, clear)",
"am_eric": "American Male - Eric (professional, neutral)",
"am_fenrir": "American Male - Fenrir (strong, commanding)",
"am_liam": "American Male - Liam (friendly, casual)",
"am_michael": "American Male - Michael (warm, trustworthy)",
"am_onyx": "American Male - Onyx (deep, smooth)",
"am_puck": "American Male - Puck (playful, light)",
# British Female voices
"bf_alice": "British Female - Alice (refined, elegant)",
"bf_emma": "British Female - Emma (clear, professional)",
"bf_isabella": "British Female - Isabella (sophisticated, warm)",
"bf_lily": "British Female - Lily (soft, gentle)",
# British Male voices
"bm_daniel": "British Male - Daniel (classic, authoritative)",
"bm_fable": "British Male - Fable (storyteller, expressive)",
"bm_george": "British Male - George (traditional, clear)",
"bm_lewis": "British Male - Lewis (modern, approachable)",
}
DEFAULT_VOICE = "af_heart"
@dataclass
class KokoroResult:
"""Result from Kokoro TTS synthesis."""
audio: np.ndarray
sample_rate: int
voice: str
duration: float
def get_kokoro_model(model_name: str = DEFAULT_KOKORO_MODEL):
"""Get or create Kokoro pipeline instance (singleton pattern)."""
global _kokoro_pipeline
if _kokoro_pipeline is not None:
return _kokoro_pipeline
logger.info(f"Loading Kokoro model: {model_name}")
try:
from kokoro import KPipeline
_kokoro_pipeline = KPipeline(lang_code="a") # 'a' for American English
logger.info("Kokoro pipeline loaded successfully")
return _kokoro_pipeline
except ImportError as e:
logger.error(f"Failed to import kokoro: {e}")
raise RuntimeError(
"kokoro not installed. Run: pip install kokoro"
)
except Exception as e:
logger.error(f"Failed to load Kokoro model: {e}")
raise
def is_kokoro_loaded() -> bool:
"""Check if Kokoro model is currently loaded."""
return _kokoro_pipeline is not None
def get_available_voices() -> dict[str, str]:
"""Get dictionary of available Kokoro voices."""
return KOKORO_VOICES.copy()
async def synthesize_kokoro(
text: str,
voice: str = DEFAULT_VOICE,
speed: float = 1.0,
model_name: str = DEFAULT_KOKORO_MODEL,
) -> KokoroResult:
"""
Synthesize speech using Kokoro TTS.
Args:
text: Text to synthesize
voice: Voice ID from KOKORO_VOICES
speed: Speech speed multiplier (0.5-2.0)
model_name: Model identifier
Returns:
KokoroResult with audio data
"""
# Validate voice
if voice not in KOKORO_VOICES:
logger.warning(f"Unknown voice '{voice}', using default '{DEFAULT_VOICE}'")
voice = DEFAULT_VOICE
# Clamp speed to valid range
speed = max(0.5, min(2.0, speed))
# Get model
pipeline = get_kokoro_model(model_name)
logger.info(f"Synthesizing with Kokoro: voice={voice}, speed={speed}, text_length={len(text)}")
try:
# Generate audio using kokoro pipeline
audio_chunks = []
sample_rate = 24000 # Kokoro default
for result in pipeline(text, voice=voice, speed=speed):
# result is a KPipelineResult with .audio (tensor) and .graphemes/.phonemes
audio_np = result.audio.numpy()
audio_chunks.append(audio_np)
# Concatenate all chunks
if audio_chunks:
full_audio = np.concatenate(audio_chunks)
else:
raise RuntimeError("No audio generated")
# Calculate duration from audio length
total_duration = len(full_audio) / sample_rate
logger.info(f"Kokoro synthesis complete: duration={total_duration:.2f}s")
return KokoroResult(
audio=full_audio,
sample_rate=sample_rate,
voice=voice,
duration=total_duration,
)
except Exception as e:
logger.error(f"Kokoro synthesis failed: {e}")
raise RuntimeError(f"TTS synthesis failed: {e}")
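The chunk handling in `synthesize_kokoro` reduces to concatenating per-segment arrays and dividing total samples by the sample rate; a numpy sketch of that bookkeeping (names are illustrative):

```python
import numpy as np

SAMPLE_RATE = 24000  # Kokoro's default output rate

def assemble(chunks: list[np.ndarray]) -> tuple[np.ndarray, float]:
    """Concatenate streamed audio chunks and compute duration in seconds."""
    if not chunks:
        raise RuntimeError("No audio generated")
    audio = np.concatenate(chunks)
    return audio, len(audio) / SAMPLE_RATE
```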


@@ -1,844 +0,0 @@
"""
Mana TTS - Text-to-Speech Microservice
Provides TTS synthesis using:
- Kokoro: Fast preset voices
- F5-TTS: Voice cloning with reference audio
- Piper: Local German voices (with Edge TTS fallback)
- Orpheus / Zonos: High-quality and expressive German synthesis
Optimized for Apple Silicon (MLX).
"""
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Response, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from .auth import verify_api_key, AuthResult, REQUIRE_AUTH
from .audio_utils import convert_audio, SUPPORTED_FORMATS, cleanup_temp_file, save_temp_audio
from .kokoro_service import (
synthesize_kokoro,
get_kokoro_model,
is_kokoro_loaded,
KOKORO_VOICES,
DEFAULT_VOICE as DEFAULT_KOKORO_VOICE,
DEFAULT_KOKORO_MODEL,
)
from .f5_service import (
synthesize_f5,
synthesize_f5_from_bytes,
get_f5_model,
is_f5_loaded,
DEFAULT_F5_MODEL,
)
from .voice_manager import get_voice_manager, CustomVoice
from .piper_service import (
synthesize_piper,
PIPER_VOICES,
is_piper_loaded,
)
from .orpheus_service import (
synthesize_orpheus,
is_orpheus_loaded,
ORPHEUS_VOICES,
DEFAULT_VOICE as DEFAULT_ORPHEUS_VOICE,
)
from .zonos_service import (
synthesize_zonos,
is_zonos_loaded,
EMOTION_PRESETS as ZONOS_EMOTIONS,
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Configuration from environment
PORT = int(os.getenv("PORT", "3022"))
PRELOAD_MODELS = os.getenv("PRELOAD_MODELS", "false").lower() == "true"
MAX_TEXT_LENGTH = int(os.getenv("MAX_TEXT_LENGTH", "1000"))
CORS_ORIGINS = os.getenv(
"CORS_ORIGINS",
"https://mana.how,https://chat.mana.how,https://todo.mana.how,http://localhost:5173",
).split(",")
# Supported audio extensions for uploads
SUPPORTED_AUDIO_EXTENSIONS = {".wav", ".mp3", ".m4a", ".flac", ".ogg"}
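The configuration block above is plain environment parsing. A sketch of the `CORS_ORIGINS` handling under the same comma-separated convention (the whitespace stripping is an addition the original split does not do):

```python
import os

def parse_origins(default: str = "http://localhost:5173") -> list[str]:
    """Split CORS_ORIGINS on commas, falling back to a default list."""
    raw = os.getenv("CORS_ORIGINS", default)
    return [origin.strip() for origin in raw.split(",") if origin.strip()]
```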
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager for startup/shutdown."""
logger.info(f"Starting Mana TTS service on port {PORT}")
# Initialize voice manager (scans voices directory)
voice_manager = get_voice_manager()
logger.info(f"Voice manager initialized with {len(voice_manager.list_voices())} custom voices")
if PRELOAD_MODELS:
logger.info("Pre-loading models (PRELOAD_MODELS=true)...")
try:
get_kokoro_model()
logger.info("Kokoro model pre-loaded")
except Exception as e:
logger.warning(f"Failed to pre-load Kokoro: {e}")
try:
get_f5_model()
logger.info("F5-TTS model pre-loaded")
except Exception as e:
logger.warning(f"Failed to pre-load F5-TTS: {e}")
else:
logger.info("Models will be loaded on first request (lazy loading)")
yield
logger.info("Shutting down Mana TTS service")
# Create FastAPI app
app = FastAPI(
title="Mana TTS",
description="Text-to-Speech service with voice cloning support",
version="1.0.0",
lifespan=lifespan,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================================================
# Request/Response Models
# ============================================================================
class KokoroRequest(BaseModel):
"""Request for Kokoro TTS synthesis."""
text: str = Field(..., description="Text to synthesize", max_length=5000)
voice: str = Field(DEFAULT_KOKORO_VOICE, description="Voice ID")
speed: float = Field(1.0, ge=0.5, le=2.0, description="Speech speed")
output_format: str = Field("wav", description="Output format (wav, mp3)")
class AutoRequest(BaseModel):
"""Request for auto-selection TTS synthesis."""
text: str = Field(..., description="Text to synthesize", max_length=5000)
voice: Optional[str] = Field(None, description="Voice ID (Kokoro preset or registered)")
speed: float = Field(1.0, ge=0.5, le=2.0, description="Speech speed")
output_format: str = Field("wav", description="Output format (wav, mp3)")
class RegisterVoiceRequest(BaseModel):
"""Request to register a new custom voice."""
voice_id: str = Field(..., description="Unique voice identifier", min_length=2, max_length=50)
name: str = Field(..., description="Display name")
description: str = Field("", description="Voice description")
transcript: str = Field(..., description="Transcript of the reference audio")
class HealthResponse(BaseModel):
"""Health check response."""
status: str
service: str
models_loaded: dict
auth_required: bool
class ModelsResponse(BaseModel):
"""Available models response."""
kokoro: dict
f5: dict
class VoiceInfo(BaseModel):
"""Voice information."""
id: str
name: str
description: str
type: str # "kokoro" or "f5_custom"
class VoicesResponse(BaseModel):
"""Available voices response."""
kokoro_voices: list[VoiceInfo]
custom_voices: list[VoiceInfo]
class VoiceRegisteredResponse(BaseModel):
"""Response after registering a voice."""
voice_id: str
message: str
class VoiceDeletedResponse(BaseModel):
"""Response after deleting a voice."""
voice_id: str
message: str
# ============================================================================
# Health & Info Endpoints
# ============================================================================
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Check service health and model status."""
return HealthResponse(
status="healthy",
service="mana-tts",
models_loaded={
"kokoro": is_kokoro_loaded(),
"f5": is_f5_loaded(),
"orpheus": is_orpheus_loaded(),
"zonos": is_zonos_loaded(),
},
auth_required=REQUIRE_AUTH,
)
@app.get("/models", response_model=ModelsResponse)
async def get_models(auth: AuthResult = Depends(verify_api_key)):
"""Get information about available models."""
return ModelsResponse(
kokoro={
"name": "Kokoro-82M",
"description": "Fast TTS with preset voices",
"model_id": DEFAULT_KOKORO_MODEL,
"loaded": is_kokoro_loaded(),
"voice_count": len(KOKORO_VOICES),
},
f5={
"name": "F5-TTS",
"description": "Voice cloning with reference audio",
"model_id": DEFAULT_F5_MODEL,
"loaded": is_f5_loaded(),
"supports_cloning": True,
},
)
# ============================================================================
# Voice Management Endpoints
# ============================================================================
@app.get("/voices", response_model=VoicesResponse)
async def get_voices(auth: AuthResult = Depends(verify_api_key)):
"""Get all available voices."""
# Kokoro preset voices
kokoro_voices = [
VoiceInfo(
id=voice_id,
name=voice_id,
description=description,
type="kokoro",
)
for voice_id, description in KOKORO_VOICES.items()
]
# Custom voices from voice manager
voice_manager = get_voice_manager()
custom_voices = [
VoiceInfo(
id=voice.id,
name=voice.name,
description=voice.description,
type="f5_custom",
)
for voice in voice_manager.list_voices()
]
return VoicesResponse(
kokoro_voices=kokoro_voices,
custom_voices=custom_voices,
)
@app.post("/voices", response_model=VoiceRegisteredResponse)
async def register_voice(
voice_id: str = Form(..., description="Unique voice identifier"),
name: str = Form(..., description="Display name"),
description: str = Form("", description="Voice description"),
transcript: str = Form(..., description="Transcript of the reference audio"),
reference_audio: UploadFile = File(..., description="Reference audio file"),
auth: AuthResult = Depends(verify_api_key),
):
"""
Register a new custom voice for F5-TTS voice cloning.
Requires:
- Reference audio file (WAV, MP3, M4A, FLAC, OGG)
- Transcript of what is said in the audio
"""
# Validate file extension
if reference_audio.filename:
ext = Path(reference_audio.filename).suffix.lower()
if ext not in SUPPORTED_AUDIO_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Unsupported audio format. Use one of: {SUPPORTED_AUDIO_EXTENSIONS}",
)
else:
ext = ".wav"
# Read audio bytes
audio_bytes = await reference_audio.read()
if len(audio_bytes) == 0:
raise HTTPException(status_code=400, detail="Audio file is empty")
if len(audio_bytes) > 50 * 1024 * 1024: # 50 MB limit
raise HTTPException(status_code=400, detail="Audio file too large (max 50 MB)")
# Register voice
voice_manager = get_voice_manager()
try:
voice_manager.register_voice(
voice_id=voice_id,
name=name,
description=description,
audio_bytes=audio_bytes,
transcript=transcript,
audio_extension=ext,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return VoiceRegisteredResponse(
voice_id=voice_id,
message=f"Voice '{voice_id}' registered successfully",
)
@app.delete("/voices/{voice_id}", response_model=VoiceDeletedResponse)
async def delete_voice(voice_id: str, auth: AuthResult = Depends(verify_api_key)):
"""Delete a registered custom voice."""
voice_manager = get_voice_manager()
if not voice_manager.delete_voice(voice_id):
raise HTTPException(status_code=404, detail=f"Voice '{voice_id}' not found")
return VoiceDeletedResponse(
voice_id=voice_id,
message=f"Voice '{voice_id}' deleted successfully",
)
# ============================================================================
# Kokoro TTS Endpoint
# ============================================================================
@app.post("/synthesize/kokoro")
async def synthesize_with_kokoro(
request: KokoroRequest,
auth: AuthResult = Depends(verify_api_key),
):
"""
Synthesize speech using Kokoro with preset voices.
Fast synthesis with high-quality preset voices.
"""
# Validate text length
if len(request.text) > MAX_TEXT_LENGTH:
raise HTTPException(
status_code=400,
detail=f"Text exceeds maximum length of {MAX_TEXT_LENGTH} characters",
)
if not request.text.strip():
raise HTTPException(status_code=400, detail="Text cannot be empty")
# Validate output format
output_format = request.output_format.lower()
if output_format not in SUPPORTED_FORMATS:
raise HTTPException(
status_code=400,
detail=f"Unsupported format. Use one of: {SUPPORTED_FORMATS}",
)
try:
# Synthesize
result = await synthesize_kokoro(
text=request.text,
voice=request.voice,
speed=request.speed,
)
# Convert to requested format
audio_bytes, content_type = convert_audio(
result.audio,
result.sample_rate,
output_format,
)
# Return audio response
return Response(
content=audio_bytes,
media_type=content_type,
headers={
"X-Voice": result.voice,
"X-Duration": str(result.duration),
"X-Sample-Rate": str(result.sample_rate),
},
)
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Kokoro synthesis error: {e}")
raise HTTPException(status_code=500, detail=f"Synthesis failed: {e}")
# ============================================================================
# F5-TTS Endpoint
# ============================================================================
@app.post("/synthesize")
async def synthesize_with_f5(
text: str = Form(..., description="Text to synthesize"),
voice_id: Optional[str] = Form(None, description="Registered voice ID"),
reference_audio: Optional[UploadFile] = File(None, description="Reference audio for cloning"),
reference_text: Optional[str] = Form(None, description="Transcript of reference audio"),
output_format: str = Form("wav", description="Output format (wav, mp3)"),
speed: float = Form(1.0, ge=0.5, le=2.0, description="Speech speed"),
steps: int = Form(32, ge=8, le=64, description="Diffusion steps"),
auth: AuthResult = Depends(verify_api_key),
):
"""
Synthesize speech using F5-TTS with voice cloning.
Provide either:
- voice_id: Use a pre-registered voice
- reference_audio + reference_text: Clone voice from audio sample
"""
# Validate text
if len(text) > MAX_TEXT_LENGTH:
raise HTTPException(
status_code=400,
detail=f"Text exceeds maximum length of {MAX_TEXT_LENGTH} characters",
)
if not text.strip():
raise HTTPException(status_code=400, detail="Text cannot be empty")
# Validate output format
output_format = output_format.lower()
if output_format not in SUPPORTED_FORMATS:
raise HTTPException(
status_code=400,
detail=f"Unsupported format. Use one of: {SUPPORTED_FORMATS}",
)
voice_manager = get_voice_manager()
ref_audio_path: Optional[str] = None
ref_text: Optional[str] = None
temp_file_path: Optional[str] = None
try:
# Option 1: Use registered voice
if voice_id:
voice = voice_manager.get_voice(voice_id)
if not voice:
raise HTTPException(
status_code=404,
detail=f"Voice '{voice_id}' not found. Register it first or provide reference audio.",
)
ref_audio_path = voice.audio_path
ref_text = voice.transcript
# Option 2: Use uploaded reference audio
elif reference_audio and reference_text:
# Get file extension
ext = ".wav"
if reference_audio.filename:
ext = Path(reference_audio.filename).suffix.lower()
if ext not in SUPPORTED_AUDIO_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Unsupported audio format. Use one of: {SUPPORTED_AUDIO_EXTENSIONS}",
)
# Read and save to temp file
audio_bytes = await reference_audio.read()
if len(audio_bytes) == 0:
raise HTTPException(status_code=400, detail="Reference audio is empty")
temp_file_path = save_temp_audio(audio_bytes, suffix=ext)
ref_audio_path = temp_file_path
ref_text = reference_text
else:
raise HTTPException(
status_code=400,
detail="Provide either voice_id or reference_audio + reference_text",
)
# Synthesize with F5-TTS
result = await synthesize_f5(
text=text,
reference_audio_path=ref_audio_path,
reference_text=ref_text,
speed=speed,
steps=steps,
)
# Convert to requested format
audio_bytes, content_type = convert_audio(
result.audio,
result.sample_rate,
output_format,
)
# Return audio response
return Response(
content=audio_bytes,
media_type=content_type,
headers={
"X-Model": "f5-tts",
"X-Voice-ID": voice_id or "custom",
"X-Duration": str(result.duration),
"X-Sample-Rate": str(result.sample_rate),
},
)
except HTTPException:
raise
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"F5-TTS synthesis error: {e}")
raise HTTPException(status_code=500, detail=f"Voice cloning synthesis failed: {e}")
finally:
# Clean up temp file
if temp_file_path:
cleanup_temp_file(temp_file_path)
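The `finally` block above is the standard temp-file lifecycle: persist the upload, hand a path to the synthesizer, and always delete it afterwards. A stdlib sketch of that pattern (`process_fn` stands in for `synthesize_f5`):

```python
import os
import tempfile

def with_temp_audio(data: bytes, process_fn, suffix: str = ".wav"):
    """Write bytes to a temp file, call process_fn(path), always clean up."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        return process_fn(path)
    finally:
        # Cleanup runs whether process_fn succeeded or raised
        if os.path.exists(path):
            os.remove(path)
```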
# ============================================================================
# Orpheus TTS Endpoint (German, high-quality)
# ============================================================================
class OrpheusRequest(BaseModel):
"""Request for Orpheus TTS synthesis."""
text: str = Field(..., description="Text to synthesize (German)", max_length=5000)
voice: str = Field(DEFAULT_ORPHEUS_VOICE, description="Speaker voice")
output_format: str = Field("wav", description="Output format (wav, mp3)")
temperature: float = Field(0.6, ge=0.1, le=1.5, description="Sampling temperature")
@app.post("/synthesize/orpheus")
async def synthesize_with_orpheus(
request: OrpheusRequest,
auth: AuthResult = Depends(verify_api_key),
):
"""
Synthesize German speech using Orpheus TTS.
High-quality German synthesis with natural intonation.
Not optimized for real-time; designed for pre-generation.
"""
if not request.text.strip():
raise HTTPException(status_code=400, detail="Text cannot be empty")
if len(request.text) > MAX_TEXT_LENGTH:
raise HTTPException(
status_code=400,
detail=f"Text exceeds maximum length of {MAX_TEXT_LENGTH} characters",
)
output_format = request.output_format.lower()
if output_format not in SUPPORTED_FORMATS:
raise HTTPException(
status_code=400,
detail=f"Unsupported format. Use one of: {SUPPORTED_FORMATS}",
)
try:
result = await synthesize_orpheus(
text=request.text,
voice=request.voice,
temperature=request.temperature,
)
audio_bytes, content_type = convert_audio(
result.audio,
result.sample_rate,
output_format,
)
return Response(
content=audio_bytes,
media_type=content_type,
headers={
"X-Model": "orpheus-german",
"X-Voice": result.voice,
"X-Duration": str(result.duration),
"X-Sample-Rate": str(result.sample_rate),
},
)
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Orpheus synthesis error: {e}")
raise HTTPException(status_code=500, detail=f"Orpheus synthesis failed: {e}")
# ============================================================================
# Zonos TTS Endpoint (Multilingual, expressive)
# ============================================================================
class ZonosRequest(BaseModel):
"""Request for Zonos TTS synthesis."""
text: str = Field(..., description="Text to synthesize", max_length=5000)
language: str = Field("de", description="Language code")
emotion: str = Field("friendly", description="Emotion preset: neutral, friendly, warm, curious")
speaking_rate: float = Field(13.0, ge=5.0, le=25.0, description="Phonemes per second")
pitch_std: float = Field(20.0, ge=5.0, le=50.0, description="Pitch variation in Hz")
output_format: str = Field("wav", description="Output format (wav, mp3)")
@app.post("/synthesize/zonos")
async def synthesize_with_zonos(
request: ZonosRequest,
auth: AuthResult = Depends(verify_api_key),
):
"""
Synthesize speech using Zonos TTS by Zyphra.
Expressive multilingual synthesis with emotion control.
Trained on 200k hours of speech, with explicit German support.
"""
if not request.text.strip():
raise HTTPException(status_code=400, detail="Text cannot be empty")
if len(request.text) > MAX_TEXT_LENGTH:
raise HTTPException(
status_code=400,
detail=f"Text exceeds maximum length of {MAX_TEXT_LENGTH} characters",
)
output_format = request.output_format.lower()
if output_format not in SUPPORTED_FORMATS:
raise HTTPException(
status_code=400,
detail=f"Unsupported format. Use one of: {SUPPORTED_FORMATS}",
)
if request.emotion not in ZONOS_EMOTIONS:
raise HTTPException(
status_code=400,
detail=f"Unknown emotion. Use one of: {list(ZONOS_EMOTIONS.keys())}",
)
try:
result = await synthesize_zonos(
text=request.text,
language=request.language,
emotion=request.emotion,
speaking_rate=request.speaking_rate,
pitch_std=request.pitch_std,
)
audio_bytes, content_type = convert_audio(
result.audio,
result.sample_rate,
output_format,
)
return Response(
content=audio_bytes,
media_type=content_type,
headers={
"X-Model": "zonos-v0.1",
"X-Emotion": result.emotion,
"X-Duration": str(result.duration),
"X-Sample-Rate": str(result.sample_rate),
},
)
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Zonos synthesis error: {e}")
raise HTTPException(status_code=500, detail=f"Zonos synthesis failed: {e}")
# ============================================================================
# Auto-Selection Endpoint
# ============================================================================
@app.post("/synthesize/auto")
async def synthesize_auto(
request: AutoRequest,
auth: AuthResult = Depends(verify_api_key),
):
"""
Auto-select the best TTS model based on voice parameter.
- If voice is a Kokoro preset: Use Kokoro
- If voice is a registered custom voice: Use F5-TTS
- If no voice specified: Use Kokoro with default voice
"""
# Validate text
if len(request.text) > MAX_TEXT_LENGTH:
raise HTTPException(
status_code=400,
detail=f"Text exceeds maximum length of {MAX_TEXT_LENGTH} characters",
)
if not request.text.strip():
raise HTTPException(status_code=400, detail="Text cannot be empty")
# Determine which model to use
voice = request.voice or DEFAULT_KOKORO_VOICE
# Check if it's a Kokoro voice
if voice in KOKORO_VOICES:
kokoro_request = KokoroRequest(
text=request.text,
voice=voice,
speed=request.speed,
output_format=request.output_format,
)
        return await synthesize_with_kokoro(kokoro_request, auth)
# Check if it's a Piper/German voice
if voice in PIPER_VOICES:
try:
# Convert speed to length_scale (inverse relationship)
# speed > 1 means faster, so length_scale < 1
length_scale = 1.0 / request.speed
result = await synthesize_piper(
text=request.text,
voice=voice,
length_scale=length_scale,
)
# Convert to requested format
output_format = request.output_format.lower()
audio_bytes, content_type = convert_audio(
result.audio,
result.sample_rate,
output_format,
)
return Response(
content=audio_bytes,
media_type=content_type,
headers={
"X-Model": "piper",
"X-Voice": voice,
"X-Duration": str(result.duration),
"X-Sample-Rate": str(result.sample_rate),
},
)
except Exception as e:
logger.error(f"Piper synthesis error: {e}")
raise HTTPException(status_code=500, detail=f"German voice synthesis failed: {e}")
# Check if it's a registered custom voice
voice_manager = get_voice_manager()
if voice_manager.voice_exists(voice):
        # Call F5-TTS directly with the registered voice's reference audio and transcript
custom_voice = voice_manager.get_voice(voice)
try:
result = await synthesize_f5(
text=request.text,
reference_audio_path=custom_voice.audio_path,
reference_text=custom_voice.transcript,
speed=request.speed,
)
# Convert to requested format
output_format = request.output_format.lower()
audio_bytes, content_type = convert_audio(
result.audio,
result.sample_rate,
output_format,
)
return Response(
content=audio_bytes,
media_type=content_type,
headers={
"X-Model": "f5-tts",
"X-Voice-ID": voice,
"X-Duration": str(result.duration),
"X-Sample-Rate": str(result.sample_rate),
},
)
except Exception as e:
logger.error(f"F5-TTS auto synthesis error: {e}")
raise HTTPException(status_code=500, detail=f"Voice synthesis failed: {e}")
# Unknown voice - fall back to Kokoro with default
logger.warning(f"Unknown voice '{voice}', falling back to Kokoro default")
kokoro_request = KokoroRequest(
text=request.text,
voice=DEFAULT_KOKORO_VOICE,
speed=request.speed,
output_format=request.output_format,
)
    return await synthesize_with_kokoro(kokoro_request, auth)
# ============================================================================
# Error Handler
# ============================================================================
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Handle uncaught exceptions."""
    import json  # serialize safely: the exception text may contain quotes

    logger.error(f"Unhandled exception: {exc}")
    return Response(
        content=json.dumps({"error": "Internal server error", "detail": str(exc)}),
        status_code=500,
        media_type="application/json",
    )
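The global handler must emit valid JSON even when the exception text contains quotes or backslashes; serializing with `json.dumps` guarantees that, where hand-built f-strings do not. A sketch:

```python
import json

def error_body(exc: Exception) -> str:
    """Serialize an exception into a JSON error payload safely."""
    return json.dumps({"error": "Internal server error", "detail": str(exc)})
```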
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=PORT)


@@ -1,229 +0,0 @@
"""
Orpheus TTS: high-quality German speech synthesis.
Uses the Orpheus-TTS model with a German finetune for natural-sounding
interview question generation. Not optimized for real-time; quality first.
Model: Kartoffel_Orpheus-3B_german_natural-v0.1 (HuggingFace)
VRAM: ~8 GB (fits comfortably on RTX 3090 alongside other models)
"""
import logging
import asyncio
from dataclasses import dataclass
from typing import Optional
import numpy as np
logger = logging.getLogger(__name__)
# Lazy-loaded model state
_model = None
_tokenizer = None
_loaded = False
MODEL_ID = "Vishalshendge3198/orpheus-3b-tts-german-emotional-merged"
SAMPLE_RATE = 24000
# Available voices (Orpheus built-in speaker tags)
ORPHEUS_VOICES = {
"tara": "Female, warm and clear (default)",
"leah": "Female, soft and friendly",
"jess": "Female, energetic",
"leo": "Male, calm and professional",
"dan": "Male, deep and warm",
"mia": "Female, young and bright",
"zac": "Male, confident",
"emma": "Female, neutral",
}
DEFAULT_VOICE = "tara"
@dataclass
class OrpheusResult:
audio: np.ndarray
sample_rate: int
duration: float
voice: str
def is_orpheus_loaded() -> bool:
return _loaded
def get_orpheus_model():
"""Load the Orpheus German model (lazy, first call only)."""
global _model, _tokenizer, _loaded
if _loaded:
return _model, _tokenizer
logger.info(f"Loading Orpheus German model: {MODEL_ID}")
try:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
_tokenizer = AutoTokenizer.from_pretrained(
MODEL_ID,
trust_remote_code=True,
)
_model = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
torch_dtype=torch.bfloat16,
device_map="cuda",
trust_remote_code=True,
)
_model.eval()
_loaded = True
logger.info("Orpheus German model loaded successfully")
return _model, _tokenizer
except Exception as e:
logger.error(f"Failed to load Orpheus model: {e}")
raise RuntimeError(f"Failed to load Orpheus model: {e}")
def unload_orpheus():
"""Free VRAM by unloading the model."""
global _model, _tokenizer, _loaded
import torch
if _model is not None:
del _model
_model = None
if _tokenizer is not None:
del _tokenizer
_tokenizer = None
_loaded = False
torch.cuda.empty_cache()
logger.info("Orpheus model unloaded")
async def synthesize_orpheus(
text: str,
voice: str = DEFAULT_VOICE,
temperature: float = 0.6,
top_p: float = 0.95,
max_new_tokens: int = 4096,
) -> OrpheusResult:
"""
Synthesize German speech using Orpheus TTS.
Returns OrpheusResult with audio as numpy float32 array.
"""
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside coroutines
return await loop.run_in_executor(
None,
_synthesize_sync,
text,
voice,
temperature,
top_p,
max_new_tokens,
)
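`synthesize_orpheus` offloads the blocking `generate` call to a worker thread so the event loop stays responsive; the pattern in isolation, with a trivial stand-in for the GPU-bound work:

```python
import asyncio

def blocking_work(x: int) -> int:
    """Stand-in for a CPU/GPU-bound synthesis call."""
    return x * 2

async def run_offloaded(x: int) -> int:
    # None selects the default ThreadPoolExecutor
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_work, x)
```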
def _synthesize_sync(
text: str,
voice: str,
temperature: float,
top_p: float,
max_new_tokens: int,
) -> OrpheusResult:
"""Synchronous synthesis (runs in thread pool)."""
import torch
model, tokenizer = get_orpheus_model()
# Orpheus uses a specific prompt format with speaker tags
prompt = f"<|speaker:{voice}|>{text}"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=max_new_tokens,
temperature=temperature,
top_p=top_p,
do_sample=True,
)
# Extract audio tokens (model-specific decoding)
audio_tokens = outputs[0][inputs["input_ids"].shape[1]:]
# Decode audio tokens to waveform
# Orpheus uses a SNAC-based codec — tokens map to audio via the model's decode method
if hasattr(model, "decode_audio"):
audio_np = model.decode_audio(audio_tokens).cpu().numpy().flatten()
else:
# Fallback: use the tokenizer's decode if model doesn't have decode_audio
# This handles different Orpheus model versions
audio_np = _decode_orpheus_tokens(audio_tokens, model)
duration = len(audio_np) / SAMPLE_RATE
return OrpheusResult(
audio=audio_np,
sample_rate=SAMPLE_RATE,
duration=duration,
voice=voice,
)
def _decode_orpheus_tokens(tokens, model) -> np.ndarray:
"""
Decode Orpheus audio tokens using SNAC codec.
Orpheus generates special audio tokens that need to be decoded
through the SNAC vocoder to produce the final waveform.
"""
import torch
try:
from snac import SNAC
snac = SNAC.from_pretrained("hubertsiuzdak/snac_24khz").to(model.device)
# Filter to audio-only tokens (above text vocab range)
audio_token_ids = tokens[tokens >= 128256].tolist()
if not audio_token_ids:
logger.warning("No audio tokens generated")
return np.zeros(SAMPLE_RATE, dtype=np.float32) # 1s silence
# Orpheus interleaves 3 codebook levels: [c1, c2, c3, c1, c2, c3, ...]
# Redistribute into separate codebook tensors
codes_0, codes_1, codes_2 = [], [], []
for i, token_id in enumerate(audio_token_ids):
# Offset tokens back to codebook range
code = token_id - 128256
level = i % 3
if level == 0:
codes_0.append(code)
elif level == 1:
codes_1.append(code)
else:
codes_2.append(code)
# Trim to equal lengths
min_len = min(len(codes_0), len(codes_1), len(codes_2))
if min_len == 0:
return np.zeros(SAMPLE_RATE, dtype=np.float32)
codes = [
torch.tensor(codes_0[:min_len], device=model.device).unsqueeze(0),
torch.tensor(codes_1[:min_len], device=model.device).unsqueeze(0),
torch.tensor(codes_2[:min_len], device=model.device).unsqueeze(0),
]
with torch.no_grad():
audio = snac.decode(codes).squeeze().cpu().numpy()
return audio.astype(np.float32)
except ImportError:
logger.error("snac package not installed — pip install snac")
raise RuntimeError("snac package required for Orpheus audio decoding")
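The redistribution loop above is the key step: Orpheus emits the three SNAC codebook levels interleaved as `[c1, c2, c3, c1, ...]`, and they must be shifted back into codebook range, split per level, and trimmed to equal length. A pure-Python sketch of that reshaping:

```python
def deinterleave(tokens: list[int], offset: int = 128256) -> list[list[int]]:
    """Split interleaved 3-level codec tokens into per-codebook lists."""
    levels: list[list[int]] = [[], [], []]
    for i, t in enumerate(tokens):
        levels[i % 3].append(t - offset)   # shift back into codebook range
    n = min(len(level) for level in levels)  # trim ragged tail to equal length
    return [level[:n] for level in levels]
```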


@@ -1,385 +0,0 @@
"""
German TTS Service - Piper TTS (local, fast) with Edge TTS fallback.
Primary: Piper TTS - 100% local, GDPR-compliant (DSGVO), very fast
Fallback: Edge TTS - Cloud-based (Microsoft), high quality but sends data externally
"""
import logging
import tempfile
import os
import asyncio
from dataclasses import dataclass
from typing import Optional
from pathlib import Path
import numpy as np
import soundfile as sf
logger = logging.getLogger(__name__)
# Paths for Piper models
PIPER_VOICES_DIR = Path(__file__).parent.parent / "piper_voices"
# Available German voices
PIPER_VOICES = {
# === LOCAL PIPER VOICES (Primary - 100% local) ===
"de_thorsten": {
"type": "piper",
"model": "thorsten_medium.onnx",
"name": "Thorsten",
"description": "Deutsche Männerstimme (lokal, schnell)",
"language": "de",
"gender": "male",
"local": True,
},
"de_kerstin": {
"type": "piper",
"model": "kerstin_low.onnx",
"name": "Kerstin",
"description": "Deutsche Frauenstimme (lokal, schnell)",
"language": "de",
"gender": "female",
"local": True,
},
# === EDGE TTS VOICES (Fallback - Cloud) ===
"de_katja": {
"type": "edge",
"edge_voice": "de-DE-KatjaNeural",
"name": "Katja",
"description": "Deutsche Frauenstimme (Cloud)",
"language": "de",
"gender": "female",
"local": False,
},
"de_conrad": {
"type": "edge",
"edge_voice": "de-DE-ConradNeural",
"name": "Conrad",
"description": "Deutsche Männerstimme (Cloud)",
"language": "de",
"gender": "male",
"local": False,
},
"de_amala": {
"type": "edge",
"edge_voice": "de-DE-AmalaNeural",
"name": "Amala",
"description": "Deutsche Frauenstimme jung (Cloud)",
"language": "de",
"gender": "female",
"local": False,
},
"de_florian": {
"type": "edge",
"edge_voice": "de-DE-FlorianNeural",
"name": "Florian",
"description": "Deutsche Männerstimme jung (Cloud)",
"language": "de",
"gender": "male",
"local": False,
},
# Legacy alias - maps to local Thorsten
"de_anna": {
"type": "piper",
"model": "thorsten_medium.onnx",
"name": "Anna (→ Thorsten)",
"description": "Alias für Thorsten (lokal)",
"language": "de",
"gender": "male",
"local": True,
},
}
DEFAULT_PIPER_VOICE = "de_thorsten"
# Cached Piper voice instances (one per model)
_piper_voices: dict = {}
_piper_available = None
_edge_available = None
def _get_piper_model_path(model_name: str) -> Path:
"""Get full path to a Piper model."""
return PIPER_VOICES_DIR / model_name
def check_piper_available() -> bool:
"""Check if Piper TTS is available."""
global _piper_available
if _piper_available is not None:
return _piper_available
try:
from piper import PiperVoice
model_path = _get_piper_model_path("thorsten_medium.onnx")
if model_path.exists():
_piper_available = True
logger.info(f"Piper TTS available with model: {model_path}")
else:
_piper_available = False
logger.warning(f"Piper model not found: {model_path}")
except ImportError as e:
_piper_available = False
logger.warning(f"Piper TTS not installed: {e}")
return _piper_available
def _check_edge_available() -> bool:
"""Check if Edge TTS is available."""
global _edge_available
if _edge_available is not None:
return _edge_available
try:
import edge_tts
_edge_available = True
logger.info("Edge TTS available as fallback")
except ImportError:
_edge_available = False
logger.warning("Edge TTS not installed")
return _edge_available
def is_piper_loaded() -> bool:
"""Check if any TTS is available."""
return check_piper_available() or _check_edge_available()
def _get_piper_voice(model_name: str = "thorsten_medium.onnx"):
"""Get or create cached Piper voice instance for a specific model."""
global _piper_voices
if model_name in _piper_voices:
return _piper_voices[model_name]
if not check_piper_available():
return None
try:
from piper import PiperVoice
model_path = _get_piper_model_path(model_name)
config_path = _get_piper_model_path(f"{model_name}.json")
logger.info(f"Loading Piper voice from {model_path}")
voice = PiperVoice.load(str(model_path), str(config_path))
_piper_voices[model_name] = voice
logger.info(f"Piper voice {model_name} loaded successfully")
return voice
except Exception as e:
logger.error(f"Failed to load Piper voice {model_name}: {e}")
return None
@dataclass
class PiperSynthesisResult:
"""Result of TTS synthesis."""
audio: np.ndarray
sample_rate: int
duration: float
voice: str
async def _synthesize_with_piper(
text: str,
voice_id: str = "de_thorsten",
length_scale: float = 1.0,
) -> PiperSynthesisResult:
"""Synthesize using local Piper TTS."""
# Get the model name for this voice
voice_config = PIPER_VOICES.get(voice_id, PIPER_VOICES["de_thorsten"])
model_name = voice_config.get("model", "thorsten_medium.onnx")
piper_voice = _get_piper_voice(model_name)
if piper_voice is None:
raise RuntimeError(f"Piper voice {voice_id} not available")
logger.debug(f"Piper synthesizing with {voice_id}: \"{text[:50]}...\"")
# Piper uses length_scale directly (1.0 = normal, >1 = slower)
# Run in thread pool to not block async
    loop = asyncio.get_running_loop()
def _synth():
audio_data = []
for audio_chunk in piper_voice.synthesize_stream_raw(text, length_scale=length_scale):
audio_data.append(audio_chunk)
return b"".join(audio_data)
audio_bytes = await loop.run_in_executor(None, _synth)
# Convert to numpy (16-bit PCM)
audio = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0
sample_rate = piper_voice.config.sample_rate
duration = len(audio) / sample_rate
logger.debug(f"Piper synthesis complete: {duration:.2f}s, {sample_rate}Hz")
return PiperSynthesisResult(
audio=audio,
sample_rate=sample_rate,
duration=duration,
voice=voice_id,
)
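The raw Piper output is 16-bit PCM; the normalization step above can be sketched in isolation (illustrative values, not service code):

```python
import numpy as np

# Sketch of the PCM normalization used above: little-endian int16 bytes
# from Piper are scaled into float32 in [-1.0, 1.0).
pcm_bytes = np.array([0, 16384, -32768], dtype=np.int16).tobytes()
audio = np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32) / 32768.0
# audio → [0.0, 0.5, -1.0]
```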
async def _synthesize_with_edge(
text: str,
edge_voice: str,
length_scale: float = 1.0,
) -> PiperSynthesisResult:
"""Synthesize using Edge TTS (cloud fallback)."""
import edge_tts
logger.debug(f"Edge TTS synthesizing: \"{text[:50]}...\" with voice={edge_voice}")
# Convert length_scale to rate string
rate_percent = int((1.0 / length_scale - 1.0) * 100)
rate_str = f"{rate_percent:+d}%"
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
tmp_path = tmp_file.name
try:
communicate = edge_tts.Communicate(text, edge_voice, rate=rate_str)
await communicate.save(tmp_path)
audio, sample_rate = sf.read(tmp_path)
if len(audio.shape) > 1:
audio = audio.mean(axis=1)
audio = audio.astype(np.float32)
duration = len(audio) / sample_rate
logger.debug(f"Edge TTS synthesis complete: {duration:.2f}s, {sample_rate}Hz")
return PiperSynthesisResult(
audio=audio,
sample_rate=sample_rate,
duration=duration,
voice=edge_voice,
)
finally:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
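The length_scale-to-rate mapping above can be factored out and checked on a few values. A sketch; `length_scale_to_edge_rate` is an illustrative helper name, not part of the service:

```python
def length_scale_to_edge_rate(length_scale: float) -> str:
    """Mirror of the conversion above: Piper-style length_scale
    (>1 = slower) to an Edge TTS rate string like '-50%'."""
    rate_percent = int((1.0 / length_scale - 1.0) * 100)
    return f"{rate_percent:+d}%"

print(length_scale_to_edge_rate(1.0))   # +0%
print(length_scale_to_edge_rate(2.0))   # -50%
print(length_scale_to_edge_rate(0.5))   # +100%
```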
async def synthesize_piper(
text: str,
voice: str = DEFAULT_PIPER_VOICE,
length_scale: float = 1.0,
) -> PiperSynthesisResult:
"""
Synthesize speech - uses local Piper if available, falls back to Edge TTS.
Args:
text: Text to synthesize
voice: Voice ID (e.g., "de_thorsten", "de_katja")
length_scale: Speed control (1.0 = normal, >1 = slower, <1 = faster)
Returns:
PiperSynthesisResult with audio data
"""
if not text.strip():
raise ValueError("Text cannot be empty")
# Get voice config
if voice not in PIPER_VOICES:
logger.warning(f"Unknown voice: {voice}, using default {DEFAULT_PIPER_VOICE}")
voice = DEFAULT_PIPER_VOICE
voice_config = PIPER_VOICES[voice]
voice_type = voice_config.get("type", "piper")
# Try local Piper first for piper-type voices
if voice_type == "piper" and check_piper_available():
try:
return await _synthesize_with_piper(text, voice, length_scale)
except Exception as e:
logger.warning(f"Piper synthesis failed, trying Edge fallback: {e}")
# Use Edge TTS for edge-type voices or as fallback
if _check_edge_available():
edge_voice = voice_config.get("edge_voice", "de-DE-ConradNeural")
if voice_type == "piper":
# Fallback: use appropriate Edge voice based on gender
gender = voice_config.get("gender", "male")
edge_voice = "de-DE-KatjaNeural" if gender == "female" else "de-DE-ConradNeural"
return await _synthesize_with_edge(text, edge_voice, length_scale)
raise RuntimeError("No TTS backend available (neither Piper nor Edge TTS)")
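The fallback rule in `synthesize_piper` can be summarized as a small pure function. A sketch under the voice-config shape used above; `pick_fallback_edge_voice` is an illustrative name:

```python
def pick_fallback_edge_voice(voice_config: dict) -> str:
    """Sketch of the fallback rule above: edge-type voices keep their
    configured Edge voice; piper-type voices fall back to a
    gender-matched German Edge voice."""
    if voice_config.get("type", "piper") == "edge":
        return voice_config.get("edge_voice", "de-DE-ConradNeural")
    gender = voice_config.get("gender", "male")
    return "de-DE-KatjaNeural" if gender == "female" else "de-DE-ConradNeural"
```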
def list_piper_voices() -> list[dict]:
"""List all available German voices."""
voices = []
piper_available = check_piper_available()
edge_available = _check_edge_available()
for voice_id, config in PIPER_VOICES.items():
# Skip legacy alias
if voice_id == "de_anna":
continue
voice_type = config.get("type", "piper")
is_available = (voice_type == "piper" and piper_available) or \
(voice_type == "edge" and edge_available)
voices.append({
"id": voice_id,
"name": config["name"],
"description": config["description"],
"language": config["language"],
"gender": config.get("gender", "unknown"),
"local": config.get("local", False),
"installed": is_available,
"loaded": is_available,
})
# Sort: local voices first
voices.sort(key=lambda v: (not v["local"], v["id"]))
return voices
def get_piper_voice(voice_id: str) -> Optional[dict]:
"""Get voice configuration by ID."""
if voice_id not in PIPER_VOICES:
return None
config = PIPER_VOICES[voice_id]
voice_type = config.get("type", "piper")
piper_available = check_piper_available()
edge_available = _check_edge_available()
is_available = (voice_type == "piper" and piper_available) or \
(voice_type == "edge" and edge_available)
return {
"id": voice_id,
"name": config["name"],
"description": config["description"],
"language": config["language"],
"gender": config.get("gender", "unknown"),
"local": config.get("local", False),
"installed": is_available,
"loaded": is_available,
}
async def download_piper_voice(voice_id: str) -> bool:
"""Check if voice is available."""
if voice_id not in PIPER_VOICES:
return False
config = PIPER_VOICES[voice_id]
voice_type = config.get("type", "piper")
if voice_type == "piper":
return check_piper_available()
elif voice_type == "edge":
return _check_edge_available()
return False


@@ -1,275 +0,0 @@
"""
Voice Manager for registering and managing custom voices.
Handles pre-defined voices from the voices/ directory and runtime-registered voices.
"""
import json
import logging
import os
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# Base directory for voices
VOICES_DIR = Path(__file__).parent.parent / "voices"
# Registry file for custom voices
REGISTRY_FILE = VOICES_DIR / "registry.json"
@dataclass
class CustomVoice:
"""Custom voice registration."""
id: str
name: str
description: str
audio_path: str
transcript: str
created_at: str # ISO format timestamp
class VoiceManager:
"""Manages custom voice registrations for F5-TTS."""
def __init__(self, voices_dir: Path = VOICES_DIR):
self.voices_dir = voices_dir
self.registry_file = voices_dir / "registry.json"
self._voices: dict[str, CustomVoice] = {}
self._load_registry()
self._scan_predefined_voices()
def _load_registry(self) -> None:
"""Load voice registry from disk."""
if not self.registry_file.exists():
logger.info("No voice registry found, starting fresh")
return
try:
with open(self.registry_file, "r") as f:
data = json.load(f)
for voice_id, voice_data in data.items():
# Verify audio file exists
if Path(voice_data["audio_path"]).exists():
self._voices[voice_id] = CustomVoice(**voice_data)
else:
logger.warning(
f"Voice '{voice_id}' audio file not found: {voice_data['audio_path']}"
)
logger.info(f"Loaded {len(self._voices)} custom voices from registry")
except Exception as e:
logger.error(f"Failed to load voice registry: {e}")
def _save_registry(self) -> None:
"""Save voice registry to disk."""
try:
data = {
voice_id: asdict(voice)
for voice_id, voice in self._voices.items()
}
with open(self.registry_file, "w") as f:
json.dump(data, f, indent=2)
logger.info("Voice registry saved")
except Exception as e:
logger.error(f"Failed to save voice registry: {e}")
def _scan_predefined_voices(self) -> None:
"""Scan voices directory for pre-defined voices."""
if not self.voices_dir.exists():
return
# Look for voice directories with audio + transcript
for voice_dir in self.voices_dir.iterdir():
if not voice_dir.is_dir():
continue
voice_id = voice_dir.name
if voice_id in self._voices:
continue # Already registered
# Look for audio file
audio_file = None
for ext in [".wav", ".mp3", ".m4a", ".flac"]:
candidate = voice_dir / f"reference{ext}"
if candidate.exists():
audio_file = candidate
break
# Look for transcript
transcript_file = voice_dir / "transcript.txt"
if not transcript_file.exists():
continue
if not audio_file:
logger.warning(f"No reference audio found in {voice_dir}")
continue
# Load transcript
try:
transcript = transcript_file.read_text().strip()
except Exception as e:
logger.warning(f"Failed to read transcript for {voice_id}: {e}")
continue
# Load metadata if exists
metadata_file = voice_dir / "metadata.json"
name = voice_id
description = f"Pre-defined voice: {voice_id}"
if metadata_file.exists():
try:
with open(metadata_file, "r") as f:
metadata = json.load(f)
name = metadata.get("name", name)
description = metadata.get("description", description)
except Exception:
pass
# Register pre-defined voice
from datetime import datetime
self._voices[voice_id] = CustomVoice(
id=voice_id,
name=name,
description=description,
audio_path=str(audio_file),
transcript=transcript,
created_at=datetime.now().isoformat(),
)
logger.info(f"Found pre-defined voice: {voice_id}")
def register_voice(
self,
voice_id: str,
name: str,
description: str,
audio_bytes: bytes,
transcript: str,
audio_extension: str = ".wav",
) -> CustomVoice:
"""
Register a new custom voice.
Args:
voice_id: Unique voice identifier
name: Display name
description: Voice description
audio_bytes: Reference audio data
transcript: Transcript of the reference audio
audio_extension: Audio file extension
Returns:
Registered CustomVoice
Raises:
ValueError: If voice_id already exists
"""
if voice_id in self._voices:
raise ValueError(f"Voice '{voice_id}' already exists")
# Validate voice_id format
if not voice_id.replace("_", "").replace("-", "").isalnum():
raise ValueError("Voice ID must be alphanumeric (with _ or -)")
# Create voice directory
voice_dir = self.voices_dir / voice_id
voice_dir.mkdir(parents=True, exist_ok=True)
# Save audio file
audio_path = voice_dir / f"reference{audio_extension}"
with open(audio_path, "wb") as f:
f.write(audio_bytes)
# Save transcript
transcript_file = voice_dir / "transcript.txt"
with open(transcript_file, "w") as f:
f.write(transcript)
# Create voice entry
from datetime import datetime
voice = CustomVoice(
id=voice_id,
name=name,
description=description,
audio_path=str(audio_path),
transcript=transcript,
created_at=datetime.now().isoformat(),
)
# Save metadata
metadata_file = voice_dir / "metadata.json"
with open(metadata_file, "w") as f:
json.dump(
{"name": name, "description": description},
f,
indent=2,
)
# Add to registry
self._voices[voice_id] = voice
self._save_registry()
logger.info(f"Registered new voice: {voice_id}")
return voice
def get_voice(self, voice_id: str) -> Optional[CustomVoice]:
"""Get a voice by ID."""
return self._voices.get(voice_id)
def delete_voice(self, voice_id: str) -> bool:
"""
Delete a custom voice.
Args:
voice_id: Voice to delete
Returns:
True if deleted, False if not found
"""
if voice_id not in self._voices:
return False
voice = self._voices[voice_id]
# Remove voice directory
voice_dir = self.voices_dir / voice_id
if voice_dir.exists():
import shutil
try:
shutil.rmtree(voice_dir)
except Exception as e:
logger.error(f"Failed to delete voice directory: {e}")
# Remove from registry
del self._voices[voice_id]
self._save_registry()
logger.info(f"Deleted voice: {voice_id}")
return True
def list_voices(self) -> list[CustomVoice]:
"""List all registered custom voices."""
return list(self._voices.values())
def voice_exists(self, voice_id: str) -> bool:
"""Check if a voice exists."""
return voice_id in self._voices
# Global singleton instance
_voice_manager: Optional[VoiceManager] = None
def get_voice_manager() -> VoiceManager:
"""Get the global VoiceManager instance."""
global _voice_manager
if _voice_manager is None:
_voice_manager = VoiceManager()
return _voice_manager
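The voice-ID check inside `register_voice` relies on `str.isalnum` after stripping the two allowed separators. A standalone sketch (`is_valid_voice_id` is an illustrative name):

```python
def is_valid_voice_id(voice_id: str) -> bool:
    """Sketch of the validation in register_voice: alphanumeric plus
    '_' and '-' only. Empty or separator-only IDs are rejected too,
    because ''.isalnum() is False."""
    return voice_id.replace("_", "").replace("-", "").isalnum()

print(is_valid_voice_id("till_de-1"))   # True
print(is_valid_voice_id("bad id!"))     # False
```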


@@ -1,114 +0,0 @@
"""
VRAM Manager: automatic model unloading after idle timeout.
Tracks last usage time per model and unloads after configurable timeout.
Designed for shared GPU environments (multiple services on one RTX 3090).
Usage in a service:
from vram_manager import VramManager
vram = VramManager(idle_timeout=300) # 5 min
# Before using a model
vram.touch()
# Call periodically (e.g., from health check or background task)
    vram.check_and_unload(unload_fn=my_unload_function)
"""
import os
import time
import logging
import threading
from typing import Optional, Callable
logger = logging.getLogger(__name__)
DEFAULT_IDLE_TIMEOUT = int(os.getenv("VRAM_IDLE_TIMEOUT", "300")) # 5 minutes
class VramManager:
def __init__(self, idle_timeout: int = DEFAULT_IDLE_TIMEOUT, service_name: str = "unknown"):
self.idle_timeout = idle_timeout
self.service_name = service_name
self.last_used: float = 0.0
self.model_loaded: bool = False
self._lock = threading.Lock()
self._timer: Optional[threading.Timer] = None
def touch(self):
"""Mark the model as recently used. Call before/after each inference."""
with self._lock:
self.last_used = time.time()
self.model_loaded = True
self._schedule_check()
def mark_loaded(self):
"""Mark that a model has been loaded into VRAM."""
with self._lock:
self.model_loaded = True
self.last_used = time.time()
self._schedule_check()
logger.info(f"[{self.service_name}] Model loaded, idle timeout: {self.idle_timeout}s")
def mark_unloaded(self):
"""Mark that a model has been unloaded from VRAM."""
with self._lock:
self.model_loaded = False
if self._timer:
self._timer.cancel()
self._timer = None
logger.info(f"[{self.service_name}] Model unloaded, VRAM freed")
def is_idle(self) -> bool:
"""Check if the model has been idle longer than the timeout."""
if not self.model_loaded:
return False
return (time.time() - self.last_used) > self.idle_timeout
def seconds_until_unload(self) -> Optional[float]:
"""Seconds until the model will be unloaded, or None if not loaded."""
if not self.model_loaded:
return None
remaining = self.idle_timeout - (time.time() - self.last_used)
return max(0, remaining)
def check_and_unload(self, unload_fn: Callable[[], None]) -> bool:
"""Check if idle and unload if so. Returns True if unloaded."""
if self.is_idle():
logger.info(f"[{self.service_name}] Idle for >{self.idle_timeout}s, unloading model...")
try:
unload_fn()
self.mark_unloaded()
return True
except Exception as e:
logger.error(f"[{self.service_name}] Failed to unload: {e}")
return False
def _schedule_check(self):
"""Schedule an idle check after the timeout period."""
if self._timer:
self._timer.cancel()
self._timer = threading.Timer(
self.idle_timeout + 5, # Small buffer
self._auto_check,
)
self._timer.daemon = True
self._timer.start()
def _auto_check(self):
"""Auto-triggered idle check (called by timer)."""
# This is just a log — actual unloading needs the unload_fn
# which depends on the service. The service should call check_and_unload.
if self.is_idle():
logger.info(f"[{self.service_name}] Model idle for >{self.idle_timeout}s — ready to unload")
def status(self) -> dict:
"""Get current VRAM manager status."""
return {
"model_loaded": self.model_loaded,
"idle_seconds": round(time.time() - self.last_used, 1) if self.model_loaded else None,
"idle_timeout": self.idle_timeout,
"seconds_until_unload": round(self.seconds_until_unload(), 1) if self.model_loaded else None,
}
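The idle bookkeeping above is plain timestamp arithmetic; a minimal sketch with a pretend last-use timestamp 400 s in the past (names chosen for illustration):

```python
import time

# Minimal sketch of the arithmetic VramManager implements:
# a model last touched 400 s ago with a 300 s idle timeout.
idle_timeout = 300
last_used = time.time() - 400

is_idle = (time.time() - last_used) > idle_timeout
seconds_until_unload = max(0, idle_timeout - (time.time() - last_used))
# is_idle → True, seconds_until_unload → 0
```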


@@ -1,205 +0,0 @@
"""
Zonos TTS: expressive multilingual speech synthesis by Zyphra.
Trained on 200k hours of speech data with explicit German support.
Fine-grained control over pitch, speaking rate, and emotions.
Model: Zyphra/Zonos-v0.1-transformer (HuggingFace)
VRAM: ~5 GB (fits comfortably on RTX 3090)
"""
import logging
import asyncio
import os
from dataclasses import dataclass
from typing import Optional
import numpy as np
# Disable torch.compile (requires MSVC cl.exe on Windows which we don't have)
os.environ["TORCHDYNAMO_DISABLE"] = "1"
logger = logging.getLogger(__name__)
# Lazy-loaded model state
_model = None
_loaded = False
MODEL_ID = "Zyphra/Zonos-v0.1-transformer"
SAMPLE_RATE = 44100 # Zonos outputs 44.1 kHz audio
# Emotion presets for the interview context
EMOTION_PRESETS = {
"neutral": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0], # neutral dominant
"friendly": [0.5, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5], # happiness + neutral
"warm": [0.3, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.7], # slight warmth
"curious": [0.2, 0.1, 0.0, 0.0, 0.0, 0.0, 0.0, 0.7], # interested
}
DEFAULT_EMOTION = "friendly"
@dataclass
class ZonosResult:
audio: np.ndarray
sample_rate: int
duration: float
emotion: str
def is_zonos_loaded() -> bool:
return _loaded
def get_zonos_model():
"""Load the Zonos model (lazy, first call only)."""
global _model, _loaded
if _loaded:
return _model
logger.info(f"Loading Zonos model: {MODEL_ID}")
try:
import torch
# Zonos provides its own loader
# Try the official zonos package first, fall back to transformers
try:
from zonos.model import Zonos
_model = Zonos.from_pretrained(MODEL_ID, device="cuda")
except ImportError:
# If zonos package not installed, use transformers
logger.info("zonos package not found, trying transformers loading")
from transformers import AutoModel
_model = AutoModel.from_pretrained(
MODEL_ID,
torch_dtype=torch.float32,
trust_remote_code=True,
).to("cuda")
_loaded = True
logger.info("Zonos model loaded successfully")
return _model
except Exception as e:
logger.error(f"Failed to load Zonos model: {e}")
raise RuntimeError(f"Failed to load Zonos model: {e}")
def unload_zonos():
"""Free VRAM by unloading the model."""
global _model, _loaded
import torch
if _model is not None:
del _model
_model = None
_loaded = False
torch.cuda.empty_cache()
logger.info("Zonos model unloaded")
async def synthesize_zonos(
text: str,
language: str = "de",
emotion: str = DEFAULT_EMOTION,
speaking_rate: float = 13.0,
pitch_std: float = 20.0,
speaker_audio: Optional[bytes] = None,
) -> ZonosResult:
"""
Synthesize speech using Zonos TTS.
Args:
text: Text to synthesize
language: Language code (default: 'de' for German)
emotion: Emotion preset name or custom emotion vector
speaking_rate: Speaking rate in phonemes/sec (default 13.0, range ~8-20)
pitch_std: Pitch variation in Hz (default 20.0, range ~5-50)
speaker_audio: Optional reference audio bytes for voice cloning
Returns ZonosResult with audio as numpy float32 array.
"""
    loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None,
_synthesize_sync,
text,
language,
emotion,
speaking_rate,
pitch_std,
speaker_audio,
)
def _synthesize_sync(
text: str,
language: str,
emotion: str,
speaking_rate: float,
pitch_std: float,
speaker_audio: Optional[bytes],
) -> ZonosResult:
"""Synchronous synthesis (runs in thread pool)."""
import torch
    # NOTE: assumes the official zonos package; the transformers fallback
    # path in get_zonos_model does not provide make_cond_dict
    from zonos.conditioning import make_cond_dict
model = get_zonos_model()
# Resolve emotion preset
emotion_values = EMOTION_PRESETS.get(emotion, EMOTION_PRESETS["friendly"])
# Build speaker embedding if reference audio provided
speaker_embedding = None
if speaker_audio:
speaker_embedding = _embed_speaker(speaker_audio, model)
# Map language codes: Zonos expects espeak language codes like 'de' or 'en-us'
lang_map = {"de": "de", "en": "en-us", "fr": "fr-fr", "es": "es", "it": "it"}
espeak_lang = lang_map.get(language, language)
# Build conditioning using Zonos's own helper
cond = make_cond_dict(
text=text,
language=espeak_lang,
emotion=emotion_values,
speaking_rate=speaking_rate,
pitch_std=pitch_std,
speaker=speaker_embedding,
)
# Generate
with torch.no_grad():
conditioning = model.prepare_conditioning(cond)
codes = model.generate(conditioning)
audio = model.autoencoder.decode(codes).squeeze().cpu().numpy()
audio = audio.astype(np.float32)
duration = len(audio) / SAMPLE_RATE
return ZonosResult(
audio=audio,
sample_rate=SAMPLE_RATE,
duration=duration,
emotion=emotion,
)
def _embed_speaker(audio_bytes: bytes, model) -> "torch.Tensor":
"""Create speaker embedding from reference audio bytes."""
import torch
import io
import soundfile as sf
audio_data, sr = sf.read(io.BytesIO(audio_bytes))
if len(audio_data.shape) > 1:
audio_data = audio_data.mean(axis=1) # mono
audio_tensor = torch.tensor(audio_data, dtype=torch.float32, device="cuda").unsqueeze(0)
return model.make_speaker_embedding(audio_tensor, sr)
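The emotion handling in `_synthesize_sync` is a dictionary lookup with a fallback to "friendly". A standalone sketch using two of the 8-dim preset vectors from above (last slot = neutral; `resolve_emotion` is an illustrative name):

```python
EMOTION_PRESETS = {
    "neutral":  [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
    "friendly": [0.5, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5],
}

def resolve_emotion(name: str) -> list[float]:
    """Unknown preset names fall back to 'friendly', mirroring
    the lookup in _synthesize_sync."""
    return EMOTION_PRESETS.get(name, EMOTION_PRESETS["friendly"])
```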


@@ -1,35 +0,0 @@
# Web Framework
fastapi>=0.115.0
uvicorn[standard]>=0.34.0
python-multipart>=0.0.20
# TTS Models (MLX optimized for Apple Silicon)
f5-tts-mlx>=0.2.6
mlx-audio>=0.1.0
mlx>=0.21.0
# Kokoro dependencies (phonemizer)
misaki[en]>=0.9.0
# Audio Processing
soundfile>=0.13.0
scipy>=1.11.0
numpy>=1.26.0
pydub>=0.25.1
tqdm>=4.67.0
# Utilities
aiofiles>=24.1.0
# External Auth (mana-core-auth integration)
httpx>=0.27.0
# ── Orpheus TTS (German high-quality) ──
# Uses transformers + SNAC codec for audio decoding
transformers>=4.44.0
snac>=1.2.0
torch>=2.1.0
# ── Zonos TTS (expressive multilingual by Zyphra) ──
# Install via: pip install git+https://github.com/Zyphra/Zonos.git
# (the 'zonos' package pulls its own deps including torch, encodec, etc.)


@@ -1,74 +0,0 @@
#!/usr/bin/env bash
#
# Compare Orpheus vs Zonos vs Piper for German interview questions.
# Run this after both models are installed on the GPU box.
#
# Usage: ./compare-german-tts.sh [TTS_URL] [API_KEY]
#
# Generates WAV files in ./comparison/ for side-by-side listening.
set -euo pipefail
TTS_URL="${1:-https://gpu-tts.mana.how}"
API_KEY="${2:-${MANA_TTS_API_KEY:-}}"
OUT="./comparison"
mkdir -p "$OUT"
# Sample interview questions (subset)
QUESTIONS=(
"Was machst du beruflich?"
"Wo lebst du?"
"Welche Sprachen sprichst du?"
"Erzähl kurz von dir."
"Wann stehst du normalerweise auf?"
"Was sind deine Interessen und Hobbys?"
"Was sind deine aktuellen Ziele?"
)
AUTH_HEADER=""
if [ -n "$API_KEY" ]; then
AUTH_HEADER="Authorization: Bearer $API_KEY"
fi
echo "=== German TTS Comparison ==="
echo "Server: $TTS_URL"
echo "Output: $OUT/"
echo ""
for i in "${!QUESTIONS[@]}"; do
q="${QUESTIONS[$i]}"
idx=$(printf "%02d" $((i + 1)))
echo "[$idx] \"$q\""
# Piper (baseline)
echo " → Piper..."
curl -s -X POST "$TTS_URL/synthesize/auto" \
${AUTH_HEADER:+-H "$AUTH_HEADER"} \
-H "Content-Type: application/json" \
-d "{\"text\": \"$q\", \"voice\": \"de_kerstin\"}" \
-o "$OUT/${idx}_piper.wav" 2>/dev/null || echo " ✗ Piper failed"
# Orpheus
echo " → Orpheus..."
curl -s -X POST "$TTS_URL/synthesize/orpheus" \
${AUTH_HEADER:+-H "$AUTH_HEADER"} \
-H "Content-Type: application/json" \
-d "{\"text\": \"$q\", \"voice\": \"tara\"}" \
-o "$OUT/${idx}_orpheus.wav" 2>/dev/null || echo " ✗ Orpheus failed"
# Zonos (friendly)
echo " → Zonos..."
curl -s -X POST "$TTS_URL/synthesize/zonos" \
${AUTH_HEADER:+-H "$AUTH_HEADER"} \
-H "Content-Type: application/json" \
-d "{\"text\": \"$q\", \"language\": \"de\", \"emotion\": \"friendly\"}" \
-o "$OUT/${idx}_zonos.wav" 2>/dev/null || echo " ✗ Zonos failed"
echo ""
done
echo "Done! Compare files in $OUT/"
echo ""
echo "Quick listen (macOS):"
echo " for f in $OUT/01_*.wav; do echo \"\$f\"; afplay \"\$f\"; sleep 1; done"


@@ -1,17 +0,0 @@
"""mana-tts service runner."""
import os
import sys
os.chdir(r"C:\mana\services\mana-tts")
sys.path.insert(0, r"C:\mana\services\mana-tts")
# Load .env file
from dotenv import load_dotenv
load_dotenv(r"C:\mana\services\mana-tts\.env")
# Redirect stdout/stderr to log file
log = open(r"C:\mana\services\mana-tts\service.log", "w", buffering=1)
sys.stdout = log
sys.stderr = log
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=3022, log_level="info")