managarten/apps-archived/wisekeep/legacy/api_server.py
Till JS 076e0c843d chore: restore archived apps (bauntown, news, reader, uload, wisekeep)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 01:08:51 +01:00

372 lines
No EOL
12 KiB
Python

#!/usr/bin/env python3
"""
FastAPI Server für YouTube Transcriber Web Interface
"""
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
from typing import List, Optional, Dict, Any
import asyncio
import json
import os
from pathlib import Path
from datetime import datetime
import uuid
from enum import Enum
# Import existing transcriber modules
from transcriber_v4_parallel import ParallelTranscriber
import whisper
# Application object; every route in this module is registered on it.
app = FastAPI(title="YouTube Transcriber API", version="1.0.0")

# CORS settings for the local front-end dev servers (Astro and port 3000).
_cors_settings = {
    "allow_origins": ["http://localhost:4321", "http://localhost:3000"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
# Global state
class JobStatus(str, Enum):
    """Lifecycle states of a transcription job.

    Members are also ``str`` instances, so they can be used directly where
    a plain string status is expected.
    """

    # Initial state assigned by TranscriptionJob.__init__.
    PENDING = "pending"
    # Set by process_transcription before and during the work.
    DOWNLOADING = "downloading"
    TRANSCRIBING = "transcribing"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
class TranscriptionJob:
    """Mutable in-memory record describing one transcription request.

    The constructor stores the request parameters and initialises the
    bookkeeping fields; other code in this module updates them as the job
    moves through its lifecycle.
    """

    def __init__(self, job_id: str, url: str, model: str = "base", language: str = "de"):
        # Request parameters, stored exactly as passed in.
        self.id, self.url = job_id, url
        self.model, self.language = model, language

        # Lifecycle state: a new job is pending with no progress.
        self.status = JobStatus.PENDING
        self.progress = 0

        # Timestamps: creation is recorded now, completion is filled in later.
        self.created_at = datetime.now()
        self.completed_at: Optional[datetime] = None

        # Outcome fields, empty until something is produced or goes wrong.
        self.transcript_path: Optional[str] = None
        self.error: Optional[str] = None
        self.video_info: Dict[str, Any] = {}
# In-memory registry of jobs, keyed by job id. Entries are added by
# start_transcription; nothing in the visible source removes them.
active_jobs: Dict[str, TranscriptionJob] = {}
# NOTE(review): not referenced anywhere else in the visible source; the
# ConnectionManager instance keeps its own list of connections.
websocket_connections: List[WebSocket] = []
# Request/Response models
class TranscribeRequest(BaseModel):
    """Request body accepted by POST /api/transcribe."""

    # Video URL; validated as an HTTP(S) URL by pydantic.
    url: HttpUrl
    # Model name, later passed to ParallelTranscriber as ``model_size``.
    model: str = "base"
    # Language code, later passed to ParallelTranscriber as ``language``.
    language: str = "de"
class PlaylistRequest(BaseModel):
    """Request body accepted by POST /api/playlists."""

    # Either "name" or "category/name" (see create_playlist).
    name: str
    # Optional text written as a comment at the top of the playlist file.
    description: Optional[str] = None
    # URLs written to the playlist file, one per line.
    urls: List[HttpUrl]
class JobResponse(BaseModel):
    """Serialised view of a TranscriptionJob returned by the job endpoints.

    The fields mirror the attributes of TranscriptionJob one to one.
    """

    id: str
    url: str
    # Filled from TranscriptionJob.status, i.e. a JobStatus value.
    status: str
    progress: int
    created_at: datetime
    # The remaining optional fields stay None until the job sets them.
    completed_at: Optional[datetime]
    transcript_path: Optional[str]
    error: Optional[str]
    video_info: Dict[str, Any]
# WebSocket manager
class ConnectionManager:
    """Keep track of open WebSocket clients and fan messages out to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*.

        Safe to call for a connection that is no longer tracked (for example
        one already dropped by ``broadcast``); in that case nothing happens.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: dict):
        """Send *message* as JSON to every tracked connection.

        Delivery is best effort: a connection whose send fails is dropped
        from the list instead of aborting the broadcast.
        """
        # Iterate over a snapshot: other tasks may connect or disconnect
        # while this coroutine is suspended in send_json().
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                # ``Exception`` (not a bare except) so task cancellation
                # still propagates. A failed send means the peer is gone.
                self.disconnect(connection)
# Single module-wide manager used by the WebSocket endpoint, cancel_job and
# process_transcription to reach connected clients.
manager = ConnectionManager()
# API Endpoints
@app.get("/")
async def root():
    """Return a short banner naming the API and its version."""
    banner = {"message": "YouTube Transcriber API", "version": "1.0.0"}
    return banner
@app.post("/api/transcribe", response_model=JobResponse)
async def start_transcription(request: TranscribeRequest, background_tasks: BackgroundTasks):
    """Start a new transcription job.

    Creates a job with a fresh UUID, registers it in ``active_jobs``, queues
    ``process_transcription`` as a background task and returns the job's
    current state.
    """
    job = TranscriptionJob(
        job_id=str(uuid.uuid4()),
        url=str(request.url),
        model=request.model,
        language=request.language,
    )
    active_jobs[job.id] = job

    # Queue the actual work on FastAPI's background task runner.
    background_tasks.add_task(process_transcription, job)

    # JobResponse fields are named exactly like the job's attributes.
    response_fields = (
        "id",
        "url",
        "status",
        "progress",
        "created_at",
        "completed_at",
        "transcript_path",
        "error",
        "video_info",
    )
    return JobResponse(**{name: getattr(job, name) for name in response_fields})
@app.get("/api/status/{job_id}", response_model=JobResponse)
async def get_job_status(job_id: str):
    """Get status of a transcription job.

    Raises:
        HTTPException: 404 when no job with ``job_id`` is registered.
    """
    job = active_jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    # JobResponse fields are named exactly like the job's attributes.
    snapshot = {
        name: getattr(job, name)
        for name in (
            "id",
            "url",
            "status",
            "progress",
            "created_at",
            "completed_at",
            "transcript_path",
            "error",
            "video_info",
        )
    }
    return JobResponse(**snapshot)
@app.get("/api/jobs")
async def list_jobs():
    """List all transcription jobs.

    Returns one ``JobResponse`` per entry in ``active_jobs``, in the
    dictionary's iteration order.
    """
    field_names = (
        "id",
        "url",
        "status",
        "progress",
        "created_at",
        "completed_at",
        "transcript_path",
        "error",
        "video_info",
    )
    responses = []
    for job in active_jobs.values():
        payload = {name: getattr(job, name) for name in field_names}
        responses.append(JobResponse(**payload))
    return responses
@app.get("/api/transcripts")
async def list_transcripts():
    """List all available transcripts.

    Scans ``transcripts/<playlist>/<channel>/*.txt`` relative to the working
    directory and returns one dict per file with its playlist, channel,
    filename, path, size in bytes and modification time. Returns an empty
    list when the ``transcripts`` directory is missing.
    """
    transcript_root = Path("transcripts")
    # is_dir() rather than exists(): a plain file named "transcripts" must
    # not reach iterdir(), which would raise.
    if not transcript_root.is_dir():
        return []

    transcripts = []
    for playlist_dir in transcript_root.iterdir():
        if not playlist_dir.is_dir():
            continue
        for channel_dir in playlist_dir.iterdir():
            if not channel_dir.is_dir():
                continue
            for transcript_file in channel_dir.glob("*.txt"):
                try:
                    # One stat() call so size and mtime describe the same
                    # state of the file.
                    file_info = transcript_file.stat()
                except OSError:
                    # Broken symlink, or the file vanished after the
                    # directory was listed: skip it instead of failing the
                    # whole request.
                    continue
                transcripts.append({
                    "playlist": playlist_dir.name,
                    "channel": channel_dir.name,
                    "filename": transcript_file.name,
                    "path": str(transcript_file),
                    "size": file_info.st_size,
                    "modified": datetime.fromtimestamp(file_info.st_mtime),
                })
    return transcripts
@app.get("/api/transcript/{transcript_path:path}")
async def get_transcript(transcript_path: str):
    """Get transcript content.

    ``transcript_path`` is expected in the form returned by
    ``list_transcripts`` (for example ``transcripts/<playlist>/<channel>/x.txt``).
    The path comes straight from the URL, so it is resolved and must lie
    inside the ``transcripts`` directory; anything else is reported as not
    found rather than served.

    Raises:
        HTTPException: 404 when the path is outside ``transcripts``, cannot
            be resolved, or does not name an existing regular file.
    """
    not_found = HTTPException(status_code=404, detail="Transcript not found")

    try:
        transcripts_root = Path("transcripts").resolve()
        file_path = Path(transcript_path).resolve()
        # Raises ValueError when file_path is not below transcripts_root,
        # which covers absolute paths and ".." escapes alike.
        file_path.relative_to(transcripts_root)
    except (OSError, RuntimeError, ValueError):
        # OSError/RuntimeError: unresolvable path (e.g. symlink loop);
        # ValueError: outside the root or an embedded NUL byte.
        raise not_found from None

    if not file_path.is_file():
        raise not_found
    return FileResponse(file_path)
@app.get("/api/playlists")
async def list_playlists():
    """List all playlists.

    Scans ``playlists/<category>/*.txt`` relative to the working directory.
    In each file, blank lines and lines starting with ``#`` are ignored and
    every other line (stripped) is treated as a URL. Returns an empty list
    when the ``playlists`` directory is missing.
    """
    playlist_root = Path("playlists")
    # is_dir() rather than exists(): a plain file named "playlists" must
    # not reach iterdir(), which would raise.
    if not playlist_root.is_dir():
        return []

    playlists = []
    for category_dir in playlist_root.iterdir():
        if not category_dir.is_dir():
            continue
        for playlist_file in category_dir.glob("*.txt"):
            # Explicit encoding so the result does not depend on the host
            # locale; errors="replace" keeps one odd byte (e.g. in a
            # comment line) from failing the whole listing.
            with open(playlist_file, "r", encoding="utf-8", errors="replace") as f:
                stripped_lines = (line.strip() for line in f)
                urls = [
                    line
                    for line in stripped_lines
                    if line and not line.startswith("#")
                ]
            playlists.append({
                "category": category_dir.name,
                "name": playlist_file.stem,
                "path": str(playlist_file),
                "url_count": len(urls),
                "urls": urls,
            })
    return playlists
@app.post("/api/playlists")
async def create_playlist(request: PlaylistRequest):
    """Create a new playlist.

    ``request.name`` is either ``"name"`` (stored under the ``general``
    category) or ``"category/name"`` (e.g. ``"tech/python_tutorials"``).
    The playlist is written to ``playlists/<category>/<name>.txt`` with the
    optional description as leading comment lines and one URL per line.

    The name comes from the client and is used to build a file path, so it
    is validated: more than one ``/``, empty or dot-only components,
    backslashes, NUL bytes, or any path that would end up outside the
    ``playlists`` directory are rejected.

    Raises:
        HTTPException: 400 when the playlist name is not acceptable.
    """
    invalid_name = HTTPException(status_code=400, detail="Invalid playlist name")

    parts = request.name.split('/')
    if len(parts) == 1:
        category, name = "general", parts[0]
    elif len(parts) == 2:
        category, name = parts
    else:
        raise invalid_name

    for component in (category, name):
        # Rejects "", ".", ".." and other dot/space-only names, plus
        # characters that could smuggle in another path segment.
        if component.strip(" .") == "" or "\\" in component or "\0" in component:
            raise invalid_name

    playlist_root = Path("playlists")
    playlist_dir = playlist_root / category
    playlist_file = playlist_dir / f"{name}.txt"

    # Defence in depth: whatever the components look like, the final path
    # must stay below the playlists directory.
    try:
        playlist_file.resolve().relative_to(playlist_root.resolve())
    except (OSError, RuntimeError, ValueError):
        raise invalid_name from None

    playlist_dir.mkdir(parents=True, exist_ok=True)
    with open(playlist_file, 'w') as f:
        if request.description:
            # Prefix every line so a multi-line description cannot leak
            # non-comment lines into the URL list.
            for description_line in request.description.splitlines():
                f.write(f"# {description_line}\n")
        f.write("# Eine URL pro Zeile\n\n")
        for url in request.urls:
            f.write(f"{url}\n")
    return {"message": "Playlist created", "path": str(playlist_file)}
@app.delete("/api/jobs/{job_id}")
async def cancel_job(job_id: str):
    """Cancel a transcription job.

    Marks a job that has not finished yet as failed with the error
    "Cancelled by user" and broadcasts a ``job_cancelled`` message. This
    function only records the cancellation on the job object. A job that
    has already completed or failed is left untouched so its final state
    and original error are not overwritten.

    Raises:
        HTTPException: 404 when no job with ``job_id`` is registered.
    """
    job = active_jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    # Nothing to cancel once the job has reached a terminal state.
    if job.status in (JobStatus.COMPLETED, JobStatus.FAILED):
        return {"message": "Job already finished"}

    job.status = JobStatus.FAILED
    job.error = "Cancelled by user"
    await manager.broadcast({
        "type": "job_cancelled",
        "job_id": job_id
    })
    return {"message": "Job cancelled"}
@app.websocket("/ws/progress")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket for real-time progress updates.

    Registers the connection with ``manager`` and then sends a
    ``{"type": "heartbeat"}`` message once per second until the connection
    ends. The connection is always unregistered on the way out, whatever
    ended the loop.
    """
    await manager.connect(websocket)
    try:
        while True:
            # Keep connection alive
            await asyncio.sleep(1)
            # Send heartbeat
            await websocket.send_json({"type": "heartbeat"})
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    finally:
        # Runs for every exit path (disconnect, send error, task
        # cancellation) so the manager never keeps a dead connection.
        # The membership check avoids a double removal.
        if websocket in manager.active_connections:
            manager.disconnect(websocket)
# Background task for processing
async def process_transcription(job: TranscriptionJob):
    """Process a transcription job.

    Moves ``job`` through DOWNLOADING -> TRANSCRIBING -> COMPLETED, updating
    its status/progress and broadcasting each transition through
    ``manager``. The actual transcription call is still a TODO, so the job
    currently completes without producing a transcript.

    A job that has been flagged as failed in the meantime (``cancel_job``
    sets the status to FAILED) is left alone instead of being driven on to
    COMPLETED. Any exception marks the job FAILED and is broadcast as a
    ``job_error`` message.
    """

    def cancelled() -> bool:
        # cancel_job signals cancellation by setting the status to FAILED.
        return job.status == JobStatus.FAILED

    try:
        # The job may have been cancelled before this task started.
        if cancelled():
            return

        # Update status; keep job.progress in sync with the broadcast value
        # so the status endpoint and the WebSocket agree.
        job.status = JobStatus.DOWNLOADING
        job.progress = 10
        await manager.broadcast({
            "type": "job_update",
            "job_id": job.id,
            "status": job.status,
            "progress": job.progress
        })

        # Initialize transcriber. Not used yet (see TODO below), but kept so
        # a constructor failure still surfaces as a failed job.
        transcriber = ParallelTranscriber(
            model_size=job.model,
            language=job.language,
            max_downloads=1,  # Single job
            max_transcriptions=1
        )

        if cancelled():
            return

        # Simulate processing (replace with actual transcriber call)
        job.status = JobStatus.TRANSCRIBING
        job.progress = 50
        await manager.broadcast({
            "type": "job_update",
            "job_id": job.id,
            "status": job.status,
            "progress": job.progress
        })

        # TODO: Integrate actual transcription
        # result = await transcriber.process_single(job.url)

        if cancelled():
            return

        # Mark as completed
        job.status = JobStatus.COMPLETED
        job.progress = 100
        job.completed_at = datetime.now()
        await manager.broadcast({
            "type": "job_complete",
            "job_id": job.id,
            "status": job.status,
            "progress": job.progress
        })
    except Exception as e:
        job.status = JobStatus.FAILED
        job.error = str(e)
        await manager.broadcast({
            "type": "job_error",
            "job_id": job.id,
            "error": job.error
        })
@app.get("/api/models")
async def get_available_models():
    """Get available Whisper models.

    Returns a fixed table with name, download size, relative speed and
    accuracy figure for each model.
    """
    columns = ("name", "size", "speed", "accuracy")
    rows = (
        ("tiny", "39 MB", "~10x", "75%"),
        ("base", "74 MB", "~7x", "85%"),
        ("small", "244 MB", "~4x", "91%"),
        ("medium", "769 MB", "~2x", "94%"),
        ("large", "1.5 GB", "~1x", "96-98%"),
    )
    return {"models": [dict(zip(columns, row)) for row in rows]}
@app.get("/api/stats")
async def get_statistics():
    """Get system statistics.

    Reports how many ``*.txt`` files exist anywhere below ``transcripts``
    and their combined size in MiB (two decimals), plus the number of
    registered jobs that are active, completed and failed.
    """
    transcript_dir = Path("transcripts")
    if transcript_dir.exists():
        file_sizes = [entry.stat().st_size for entry in transcript_dir.rglob("*.txt")]
    else:
        file_sizes = []

    def count_jobs(*states):
        # Number of registered jobs whose status is one of ``states``.
        return sum(1 for job in active_jobs.values() if job.status in states)

    return {
        "total_transcripts": len(file_sizes),
        "total_size_mb": round(sum(file_sizes) / 1024 / 1024, 2),
        "active_jobs": count_jobs(
            JobStatus.PENDING, JobStatus.DOWNLOADING, JobStatus.TRANSCRIBING
        ),
        "completed_jobs": count_jobs(JobStatus.COMPLETED),
        "failed_jobs": count_jobs(JobStatus.FAILED),
    }
if __name__ == "__main__":
    import uvicorn

    # Auto-reload only works when uvicorn receives the application as an
    # import string ("module:attribute"); given an app object together with
    # reload=True it refuses to start. The module name is derived from this
    # file so the string stays correct if the file is renamed.
    uvicorn.run(f"{Path(__file__).stem}:app", host="0.0.0.0", port=8000, reload=True)