managarten/apps/transcriber/legacy/api_server.py
Till-JS 4b08c41547 feat(transcriber): Add YouTube transcriber app to monorepo
Integrate new transcriber application for AI-powered YouTube video
transcription with full monorepo structure and Groq Whisper API support.

## App Structure
- apps/transcriber/apps/backend - NestJS API server (port 3006)
- apps/transcriber/apps/web - SvelteKit web application
- apps/transcriber/apps/landing - Astro marketing/content site
- apps/transcriber/apps/mobile - Expo React Native app
- apps/transcriber/packages/shared-types - Shared TypeScript types

## Backend Features
- YouTube video download via yt-dlp (child_process)
- Ultra-fast transcription via Groq Whisper API (~300x realtime)
- Fallback to local Whisper for offline use
- Job queue with background processing
- Real-time progress updates via WebSocket (Socket.io)
- Playlist management for batch processing
- Health check endpoints

## API Endpoints
- POST /transcription - Start transcription job
- GET /transcription - List all jobs
- GET /transcription/:id - Get job status
- DELETE /transcription/:id - Cancel job
- GET /transcription/stats - Statistics
- GET /whisper/models - Available models
- GET/POST/DELETE /playlist - Playlist management
- GET /health - Health checks

## Whisper Models
- Groq: whisper-large-v3-turbo (fast, $0.04/hr)
- Groq: whisper-large-v3 (accurate, $0.111/hr)
- Local: tiny, base, small, medium, large

## Monorepo Integration
- Added to pnpm workspace via apps/*/apps/* pattern
- Root scripts: transcriber:dev, dev:transcriber:*
- Package naming: @transcriber/{backend,web,landing,mobile}
- Turbo tasks: dev, build, lint, type-check
- CLAUDE.md documentation

## Technology Stack
- Backend: NestJS 10, TypeScript, Socket.io
- Web: SvelteKit 2, Svelte 5, Tailwind CSS
- Landing: Astro 4, Solid.js, Tailwind CSS
- Mobile: Expo 52, React Native, NativeWind, Zustand
- Transcription: Groq Whisper API (OpenAI-compatible)

## Migration from Python
- Original Python/FastAPI code preserved in legacy/
- Full rewrite to TypeScript/NestJS
- Same functionality with improved architecture

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-27 14:21:49 +01:00

372 lines
No EOL
12 KiB
Python

#!/usr/bin/env python3
"""
FastAPI Server für YouTube Transcriber Web Interface
"""
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
from typing import List, Optional, Dict, Any
import asyncio
import json
import os
from pathlib import Path
from datetime import datetime
import uuid
from enum import Enum
# Import existing transcriber modules
from transcriber_v4_parallel import ParallelTranscriber
import whisper
app = FastAPI(title="YouTube Transcriber API", version="1.0.0")
# CORS middleware for Astro frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:4321", "http://localhost:3000"], # Astro dev server
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global state
class JobStatus(str, Enum):
PENDING = "pending"
DOWNLOADING = "downloading"
TRANSCRIBING = "transcribing"
COMPLETED = "completed"
FAILED = "failed"
class TranscriptionJob:
def __init__(self, job_id: str, url: str, model: str = "base", language: str = "de"):
self.id = job_id
self.url = url
self.model = model
self.language = language
self.status = JobStatus.PENDING
self.progress = 0
self.created_at = datetime.now()
self.completed_at = None
self.transcript_path = None
self.error = None
self.video_info = {}
# Store active jobs
active_jobs: Dict[str, TranscriptionJob] = {}
websocket_connections: List[WebSocket] = []
# Request/Response models
class TranscribeRequest(BaseModel):
url: HttpUrl
model: str = "base"
language: str = "de"
class PlaylistRequest(BaseModel):
name: str
description: Optional[str] = None
urls: List[HttpUrl]
class JobResponse(BaseModel):
id: str
url: str
status: str
progress: int
created_at: datetime
completed_at: Optional[datetime]
transcript_path: Optional[str]
error: Optional[str]
video_info: Dict[str, Any]
# WebSocket manager
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: dict):
for connection in self.active_connections:
try:
await connection.send_json(message)
except:
pass
manager = ConnectionManager()
# API Endpoints
@app.get("/")
async def root():
return {"message": "YouTube Transcriber API", "version": "1.0.0"}
@app.post("/api/transcribe", response_model=JobResponse)
async def start_transcription(request: TranscribeRequest, background_tasks: BackgroundTasks):
"""Start a new transcription job"""
job_id = str(uuid.uuid4())
job = TranscriptionJob(job_id, str(request.url), request.model, request.language)
active_jobs[job_id] = job
# Start transcription in background
background_tasks.add_task(process_transcription, job)
return JobResponse(
id=job.id,
url=job.url,
status=job.status,
progress=job.progress,
created_at=job.created_at,
completed_at=job.completed_at,
transcript_path=job.transcript_path,
error=job.error,
video_info=job.video_info
)
@app.get("/api/status/{job_id}", response_model=JobResponse)
async def get_job_status(job_id: str):
"""Get status of a transcription job"""
if job_id not in active_jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = active_jobs[job_id]
return JobResponse(
id=job.id,
url=job.url,
status=job.status,
progress=job.progress,
created_at=job.created_at,
completed_at=job.completed_at,
transcript_path=job.transcript_path,
error=job.error,
video_info=job.video_info
)
@app.get("/api/jobs")
async def list_jobs():
"""List all transcription jobs"""
return [
JobResponse(
id=job.id,
url=job.url,
status=job.status,
progress=job.progress,
created_at=job.created_at,
completed_at=job.completed_at,
transcript_path=job.transcript_path,
error=job.error,
video_info=job.video_info
)
for job in active_jobs.values()
]
@app.get("/api/transcripts")
async def list_transcripts():
"""List all available transcripts"""
transcript_dir = Path("transcripts")
transcripts = []
if transcript_dir.exists():
for playlist_dir in transcript_dir.iterdir():
if playlist_dir.is_dir():
for channel_dir in playlist_dir.iterdir():
if channel_dir.is_dir():
for transcript_file in channel_dir.glob("*.txt"):
transcripts.append({
"playlist": playlist_dir.name,
"channel": channel_dir.name,
"filename": transcript_file.name,
"path": str(transcript_file),
"size": transcript_file.stat().st_size,
"modified": datetime.fromtimestamp(transcript_file.stat().st_mtime)
})
return transcripts
@app.get("/api/transcript/{transcript_path:path}")
async def get_transcript(transcript_path: str):
"""Get transcript content"""
file_path = Path(transcript_path)
if not file_path.exists() or not file_path.is_file():
raise HTTPException(status_code=404, detail="Transcript not found")
return FileResponse(file_path)
@app.get("/api/playlists")
async def list_playlists():
"""List all playlists"""
playlist_dir = Path("playlists")
playlists = []
if playlist_dir.exists():
for category_dir in playlist_dir.iterdir():
if category_dir.is_dir():
for playlist_file in category_dir.glob("*.txt"):
urls = []
with open(playlist_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
urls.append(line)
playlists.append({
"category": category_dir.name,
"name": playlist_file.stem,
"path": str(playlist_file),
"url_count": len(urls),
"urls": urls
})
return playlists
@app.post("/api/playlists")
async def create_playlist(request: PlaylistRequest):
"""Create a new playlist"""
# Extract category and name from the playlist name (e.g., "tech/python_tutorials")
parts = request.name.split('/')
if len(parts) == 2:
category, name = parts
else:
category = "general"
name = request.name
playlist_dir = Path("playlists") / category
playlist_dir.mkdir(parents=True, exist_ok=True)
playlist_file = playlist_dir / f"{name}.txt"
with open(playlist_file, 'w') as f:
if request.description:
f.write(f"# {request.description}\n")
f.write("# Eine URL pro Zeile\n\n")
for url in request.urls:
f.write(f"{url}\n")
return {"message": "Playlist created", "path": str(playlist_file)}
@app.delete("/api/jobs/{job_id}")
async def cancel_job(job_id: str):
"""Cancel a transcription job"""
if job_id not in active_jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = active_jobs[job_id]
job.status = JobStatus.FAILED
job.error = "Cancelled by user"
await manager.broadcast({
"type": "job_cancelled",
"job_id": job_id
})
return {"message": "Job cancelled"}
@app.websocket("/ws/progress")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket for real-time progress updates"""
await manager.connect(websocket)
try:
while True:
# Keep connection alive
await asyncio.sleep(1)
# Send heartbeat
await websocket.send_json({"type": "heartbeat"})
except WebSocketDisconnect:
manager.disconnect(websocket)
# Background task for processing
async def process_transcription(job: TranscriptionJob):
"""Process a transcription job"""
try:
# Update status
job.status = JobStatus.DOWNLOADING
await manager.broadcast({
"type": "job_update",
"job_id": job.id,
"status": job.status,
"progress": 10
})
# Initialize transcriber
transcriber = ParallelTranscriber(
model_size=job.model,
language=job.language,
max_downloads=1, # Single job
max_transcriptions=1
)
# Simulate processing (replace with actual transcriber call)
job.status = JobStatus.TRANSCRIBING
job.progress = 50
await manager.broadcast({
"type": "job_update",
"job_id": job.id,
"status": job.status,
"progress": job.progress
})
# TODO: Integrate actual transcription
# result = await transcriber.process_single(job.url)
# Mark as completed
job.status = JobStatus.COMPLETED
job.progress = 100
job.completed_at = datetime.now()
await manager.broadcast({
"type": "job_complete",
"job_id": job.id,
"status": job.status,
"progress": job.progress
})
except Exception as e:
job.status = JobStatus.FAILED
job.error = str(e)
await manager.broadcast({
"type": "job_error",
"job_id": job.id,
"error": job.error
})
@app.get("/api/models")
async def get_available_models():
"""Get available Whisper models"""
return {
"models": [
{"name": "tiny", "size": "39 MB", "speed": "~10x", "accuracy": "75%"},
{"name": "base", "size": "74 MB", "speed": "~7x", "accuracy": "85%"},
{"name": "small", "size": "244 MB", "speed": "~4x", "accuracy": "91%"},
{"name": "medium", "size": "769 MB", "speed": "~2x", "accuracy": "94%"},
{"name": "large", "size": "1.5 GB", "speed": "~1x", "accuracy": "96-98%"}
]
}
@app.get("/api/stats")
async def get_statistics():
"""Get system statistics"""
transcript_dir = Path("transcripts")
total_transcripts = 0
total_size = 0
if transcript_dir.exists():
for file in transcript_dir.rglob("*.txt"):
total_transcripts += 1
total_size += file.stat().st_size
return {
"total_transcripts": total_transcripts,
"total_size_mb": round(total_size / 1024 / 1024, 2),
"active_jobs": len([j for j in active_jobs.values() if j.status in [JobStatus.PENDING, JobStatus.DOWNLOADING, JobStatus.TRANSCRIBING]]),
"completed_jobs": len([j for j in active_jobs.values() if j.status == JobStatus.COMPLETED]),
"failed_jobs": len([j for j in active_jobs.values() if j.status == JobStatus.FAILED])
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)