feat(wisekeep): rename transcriber app to wisekeep

Rebranding the transcriber application to better reflect its purpose:
AI-powered wisdom extraction from video content.

Changes:
- Renamed folder: apps/transcriber → apps/wisekeep
- Updated all package names to @wisekeep/* namespace:
  - @wisekeep/backend
  - @wisekeep/web
  - @wisekeep/landing
  - @wisekeep/mobile
  - @wisekeep/shared-types
- Updated root package.json scripts:
  - wisekeep:dev, dev:wisekeep:backend, dev:wisekeep:web, etc.
- Updated documentation in CLAUDE.md files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Till-JS 2025-11-27 14:44:44 +01:00
parent ea3285dcbb
commit cb5657579b
113 changed files with 28 additions and 24 deletions

View file

@ -0,0 +1,372 @@
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>YouTube Transcriber - Admin Dashboard</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #0f172a 0%, #1e293b 100%);
color: #e2e8f0;
min-height: 100vh;
}
.header {
background: rgba(15, 23, 42, 0.8);
backdrop-filter: blur(10px);
border-bottom: 1px solid rgba(148, 163, 184, 0.1);
padding: 1.5rem;
}
.header h1 {
font-size: 1.5rem;
font-weight: 600;
color: #60a5fa;
}
.container {
max-width: 1400px;
margin: 0 auto;
padding: 2rem;
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 1.5rem;
margin-bottom: 2rem;
}
.stat-card {
background: rgba(30, 41, 59, 0.5);
backdrop-filter: blur(10px);
border: 1px solid rgba(148, 163, 184, 0.1);
border-radius: 12px;
padding: 1.5rem;
transition: transform 0.2s;
}
.stat-card:hover {
transform: translateY(-2px);
}
.stat-value {
font-size: 2.5rem;
font-weight: 700;
background: linear-gradient(135deg, #60a5fa, #a78bfa);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin-bottom: 0.5rem;
}
.stat-label {
color: #94a3b8;
font-size: 0.875rem;
text-transform: uppercase;
letter-spacing: 0.05em;
}
.quick-action {
background: rgba(30, 41, 59, 0.5);
backdrop-filter: blur(10px);
border: 1px solid rgba(148, 163, 184, 0.1);
border-radius: 12px;
padding: 2rem;
margin-bottom: 2rem;
}
.quick-action h2 {
margin-bottom: 1rem;
color: #cbd5e1;
}
.input-group {
display: flex;
gap: 1rem;
}
input {
flex: 1;
padding: 0.75rem 1rem;
background: rgba(15, 23, 42, 0.6);
border: 1px solid rgba(148, 163, 184, 0.2);
border-radius: 8px;
color: #e2e8f0;
font-size: 1rem;
}
input:focus {
outline: none;
border-color: #60a5fa;
box-shadow: 0 0 0 3px rgba(96, 165, 250, 0.1);
}
select {
padding: 0.75rem 1rem;
background: rgba(15, 23, 42, 0.6);
border: 1px solid rgba(148, 163, 184, 0.2);
border-radius: 8px;
color: #e2e8f0;
font-size: 1rem;
cursor: pointer;
}
button {
padding: 0.75rem 2rem;
background: linear-gradient(135deg, #3b82f6, #8b5cf6);
border: none;
border-radius: 8px;
color: white;
font-weight: 600;
cursor: pointer;
transition: all 0.2s;
}
button:hover {
transform: translateY(-1px);
box-shadow: 0 10px 20px rgba(59, 130, 246, 0.3);
}
button:active {
transform: translateY(0);
}
.jobs-list {
background: rgba(30, 41, 59, 0.5);
backdrop-filter: blur(10px);
border: 1px solid rgba(148, 163, 184, 0.1);
border-radius: 12px;
padding: 2rem;
}
.job-item {
background: rgba(15, 23, 42, 0.4);
border-radius: 8px;
padding: 1rem;
margin-bottom: 1rem;
display: flex;
justify-content: space-between;
align-items: center;
}
.job-status {
padding: 0.25rem 0.75rem;
border-radius: 9999px;
font-size: 0.875rem;
font-weight: 600;
}
.status-completed {
background: rgba(34, 197, 94, 0.2);
color: #4ade80;
}
.status-processing {
background: rgba(251, 191, 36, 0.2);
color: #fbbf24;
}
.status-failed {
background: rgba(239, 68, 68, 0.2);
color: #f87171;
}
.loader {
display: inline-block;
width: 20px;
height: 20px;
border: 3px solid rgba(96, 165, 250, 0.3);
border-radius: 50%;
border-top-color: #60a5fa;
animation: spin 1s ease-in-out infinite;
}
@keyframes spin {
to { transform: rotate(360deg); }
}
.nav-links {
display: flex;
gap: 2rem;
margin-top: 1rem;
}
.nav-links a {
color: #94a3b8;
text-decoration: none;
transition: color 0.2s;
}
.nav-links a:hover {
color: #60a5fa;
}
</style>
</head>
<body>
<div class="header">
<div class="container">
<h1>🎥 YouTube Transcriber - Admin Dashboard</h1>
<div class="nav-links">
<a href="http://localhost:4321">→ Public Website</a>
<a href="http://localhost:8000/docs">→ API Docs</a>
</div>
</div>
</div>
<div class="container">
<div class="stats-grid">
<div class="stat-card">
<div class="stat-value" id="total-transcripts">-</div>
<div class="stat-label">Transkripte</div>
</div>
<div class="stat-card">
<div class="stat-value" id="active-jobs">-</div>
<div class="stat-label">Aktive Jobs</div>
</div>
<div class="stat-card">
<div class="stat-value" id="total-size">-</div>
<div class="stat-label">Speicher (MB)</div>
</div>
<div class="stat-card">
<div class="stat-value" id="playlists-count">-</div>
<div class="stat-label">Playlists</div>
</div>
</div>
<div class="quick-action">
<h2>🚀 Neue Transkription starten</h2>
<div class="input-group">
<input type="text" id="url-input" placeholder="YouTube URL eingeben...">
<select id="model-select">
<option value="tiny">Tiny (Schnell)</option>
<option value="base" selected>Base</option>
<option value="small">Small</option>
<option value="medium">Medium</option>
<option value="large">Large (Beste Qualität)</option>
</select>
<select id="language-select">
<option value="de" selected>Deutsch</option>
<option value="en">English</option>
</select>
<button onclick="startTranscription()">Transkribieren</button>
</div>
</div>
<div class="jobs-list">
<h2 style="margin-bottom: 1rem;">📋 Aktuelle Jobs</h2>
<div id="jobs-container">
<div class="job-item">
<span style="color: #94a3b8;">Keine aktiven Jobs</span>
</div>
</div>
</div>
</div>
<script>
const API_URL = 'http://localhost:8000';
// Fetch aggregate statistics from the API and render them into the stat cards.
// Failures are only logged; the cards keep whatever they showed before.
async function loadStats() {
    const setText = (id, value) => {
        document.getElementById(id).textContent = value;
    };
    try {
        const stats = await (await fetch(`${API_URL}/api/stats`)).json();
        setText('total-transcripts', stats.total_transcripts || '0');
        setText('active-jobs', stats.active_jobs || '0');
        setText('total-size', stats.total_size_mb?.toFixed(1) || '0');
    } catch (error) {
        console.error('Error loading stats:', error);
    }
}
// Fetch the playlist collection and display how many playlists exist.
async function loadPlaylists() {
    try {
        const counter = document.getElementById('playlists-count');
        const playlists = await fetch(`${API_URL}/api/playlists`).then((res) => res.json());
        counter.textContent = playlists.length || '0';
    } catch (error) {
        console.error('Error loading playlists:', error);
    }
}
// Fetch all jobs from the API and render them into the jobs list.
// Job fields are HTML-escaped before being interpolated into markup, and every
// API status is mapped onto one of the badge classes this page defines
// (status-completed, status-processing, status-failed).
async function loadJobs() {
    const escapeHtml = (value) => String(value)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;')
        .replace(/"/g, '&quot;')
        .replace(/'/g, '&#39;');
    // pending / downloading / transcribing all share the "processing" badge.
    const badgeClass = (status) => (
        status === 'completed' || status === 'failed'
            ? `status-${status}`
            : 'status-processing'
    );
    try {
        const response = await fetch(`${API_URL}/api/jobs`);
        if (!response.ok) {
            // Do not try to render an error payload as a job list.
            throw new Error(`HTTP ${response.status}`);
        }
        const jobs = await response.json();
        const container = document.getElementById('jobs-container');
        if (jobs.length === 0) {
            container.innerHTML = '<div class="job-item"><span style="color: #94a3b8;">Keine aktiven Jobs</span></div>';
            return;
        }
        container.innerHTML = jobs.map((job) => {
            const status = String(job.status);
            const spinner = status === 'transcribing' ? '<span class="loader"></span>' : '';
            return `
                <div class="job-item">
                    <div>
                        <div style="font-weight: 600; margin-bottom: 0.25rem;">${escapeHtml(job.url)}</div>
                        <div style="color: #94a3b8; font-size: 0.875rem;">
                            ${escapeHtml(new Date(job.created_at).toLocaleString('de-DE'))}
                        </div>
                    </div>
                    <span class="job-status ${badgeClass(status)}">
                        ${spinner}
                        ${escapeHtml(status.toUpperCase())}
                    </span>
                </div>
            `;
        }).join('');
    } catch (error) {
        console.error('Error loading jobs:', error);
    }
}
// Read the form, require a URL, and POST a new transcription job to the API.
// The outcome is reported to the user through alert dialogs.
async function startTranscription() {
    const urlInput = document.getElementById('url-input');
    const payload = {
        url: urlInput.value,
        model: document.getElementById('model-select').value,
        language: document.getElementById('language-select').value,
    };
    if (!payload.url) {
        alert('Bitte YouTube URL eingeben');
        return;
    }
    try {
        const response = await fetch(`${API_URL}/api/transcribe`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(payload),
        });
        if (!response.ok) {
            alert('Fehler beim Starten der Transkription');
            return;
        }
        // Success: clear the field and refresh the dashboard right away.
        urlInput.value = '';
        alert('Transkription gestartet!');
        loadStats();
        loadJobs();
    } catch (error) {
        console.error('Error starting transcription:', error);
        alert('Fehler: API nicht erreichbar');
    }
}
// Populate every dashboard section once when the page loads.
[loadStats, loadPlaylists, loadJobs].forEach((load) => load());
// Poll stats and jobs every 5 seconds; playlists are only loaded once.
const REFRESH_INTERVAL_MS = 5000;
setInterval(() => {
    loadStats();
    loadJobs();
}, REFRESH_INTERVAL_MS);
</script>
</body>
</html>

View file

@ -0,0 +1,372 @@
#!/usr/bin/env python3
"""
FastAPI Server für YouTube Transcriber Web Interface
"""
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
from typing import List, Optional, Dict, Any
import asyncio
import json
import os
from pathlib import Path
from datetime import datetime
import uuid
from enum import Enum
# Import existing transcriber modules
from transcriber_v4_parallel import ParallelTranscriber
import whisper
app = FastAPI(title="YouTube Transcriber API", version="1.0.0")
# CORS middleware for Astro frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:4321", "http://localhost:3000"], # Astro dev server
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global state
class JobStatus(str, Enum):
    """Lifecycle states of a transcription job.

    The ``str`` mixin makes every member compare equal to its plain string
    value (``JobStatus.PENDING == "pending"``).
    """

    # Created but not yet picked up by the background task.
    PENDING = "pending"
    # In-flight phases.
    DOWNLOADING = "downloading"
    TRANSCRIBING = "transcribing"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
class TranscriptionJob:
    """In-memory record of one transcription job and its current state."""

    def __init__(self, job_id: str, url: str, model: str = "base", language: str = "de"):
        # Request parameters supplied by the caller.
        self.id, self.url = job_id, url
        self.model, self.language = model, language
        # A new job starts out pending with no progress.
        self.status = JobStatus.PENDING
        self.progress = 0
        self.created_at = datetime.now()
        self.completed_at = None
        # Outcome fields; empty until processing code fills them in.
        self.transcript_path = None
        self.error = None
        self.video_info = {}
# Store active jobs
active_jobs: Dict[str, TranscriptionJob] = {}
websocket_connections: List[WebSocket] = []
# Request/Response models
class TranscribeRequest(BaseModel):
    """Request body for ``POST /api/transcribe``."""

    # Video URL to transcribe; validated as an HTTP(S) URL by pydantic.
    url: HttpUrl
    # Whisper model name and transcription language; both are free-form
    # strings here and are not checked against a list of known values.
    model: str = "base"
    language: str = "de"
class PlaylistRequest(BaseModel):
    """Request body for ``POST /api/playlists``."""

    # Either "name" or "category/name"; create_playlist splits on "/".
    name: str
    # Optional text written as a comment at the top of the playlist file.
    description: Optional[str] = None
    # Video URLs, written one per line.
    urls: List[HttpUrl]
class JobResponse(BaseModel):
    """Serialized view of a ``TranscriptionJob`` returned by the job endpoints."""

    id: str
    url: str
    status: str
    progress: int
    created_at: datetime
    # The remaining fields mirror job attributes that start out as None / {}.
    completed_at: Optional[datetime]
    transcript_path: Optional[str]
    error: Optional[str]
    video_info: Dict[str, Any]
# WebSocket manager
class ConnectionManager:
    """Track open WebSocket connections and broadcast JSON messages to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; a no-op if it is not (or no longer) tracked."""
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed, e.g. by broadcast() after a failed send.
            pass

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every tracked connection.

        A connection whose send fails is dropped from the list so that a dead
        client neither breaks the broadcast for the others nor is retried
        forever.
        """
        # Iterate over a snapshot: the list may change while a send is awaited.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                self.disconnect(connection)
manager = ConnectionManager()
# API Endpoints
@app.get("/")
async def root():
    """Return the service name and version."""
    service_info = {"message": "YouTube Transcriber API", "version": "1.0.0"}
    return service_info
@app.post("/api/transcribe", response_model=JobResponse)
async def start_transcription(request: TranscribeRequest, background_tasks: BackgroundTasks):
    """Register a new transcription job and schedule it as a background task.

    Returns the freshly created job so the client can poll it by id.
    """
    job = TranscriptionJob(
        str(uuid.uuid4()),
        str(request.url),
        request.model,
        request.language,
    )
    active_jobs[job.id] = job
    # Processing runs after the response has been sent.
    background_tasks.add_task(process_transcription, job)
    response_fields = (
        "id", "url", "status", "progress", "created_at",
        "completed_at", "transcript_path", "error", "video_info",
    )
    return JobResponse(**{field: getattr(job, field) for field in response_fields})
@app.get("/api/status/{job_id}", response_model=JobResponse)
async def get_job_status(job_id: str):
    """Return the current state of one job; responds 404 for an unknown id."""
    job = active_jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    snapshot = dict(
        id=job.id,
        url=job.url,
        status=job.status,
        progress=job.progress,
        created_at=job.created_at,
        completed_at=job.completed_at,
        transcript_path=job.transcript_path,
        error=job.error,
        video_info=job.video_info,
    )
    return JobResponse(**snapshot)
@app.get("/api/jobs")
async def list_jobs():
    """Return every job held in memory, in any state, as ``JobResponse`` objects."""
    responses = []
    for job in active_jobs.values():
        responses.append(
            JobResponse(
                id=job.id,
                url=job.url,
                status=job.status,
                progress=job.progress,
                created_at=job.created_at,
                completed_at=job.completed_at,
                transcript_path=job.transcript_path,
                error=job.error,
                video_info=job.video_info,
            )
        )
    return responses
@app.get("/api/transcripts")
async def list_transcripts():
    """List transcript files stored as ``transcripts/<playlist>/<channel>/*.txt``."""
    root_dir = Path("transcripts")
    if not root_dir.exists():
        return []
    found = []
    for playlist_dir in root_dir.iterdir():
        if not playlist_dir.is_dir():
            continue
        for channel_dir in playlist_dir.iterdir():
            if not channel_dir.is_dir():
                continue
            for transcript_file in channel_dir.glob("*.txt"):
                file_stat = transcript_file.stat()
                found.append({
                    "playlist": playlist_dir.name,
                    "channel": channel_dir.name,
                    "filename": transcript_file.name,
                    "path": str(transcript_file),
                    "size": file_stat.st_size,
                    "modified": datetime.fromtimestamp(file_stat.st_mtime),
                })
    return found
@app.get("/api/transcript/{transcript_path:path}")
async def get_transcript(transcript_path: str):
    """Return the content of one transcript file.

    ``transcript_path`` is the ``path`` value reported by ``/api/transcripts``
    (relative to the working directory, i.e. it starts with ``transcripts/``).
    Only regular files located inside the ``transcripts`` directory are
    served; anything else, including ``..`` traversal, absolute paths and
    symlinks pointing outside, is answered with 404.

    Raises:
        HTTPException: 404 if the path is outside ``transcripts`` or is not
            an existing file.
    """
    not_found = HTTPException(status_code=404, detail="Transcript not found")
    try:
        transcripts_root = Path("transcripts").resolve()
        file_path = Path(transcript_path).resolve()
        # relative_to() raises ValueError when file_path is not below the root.
        file_path.relative_to(transcripts_root)
    except (ValueError, OSError):
        raise not_found from None
    if not file_path.is_file():
        raise not_found
    return FileResponse(file_path)
@app.get("/api/playlists")
async def list_playlists():
    """List playlist files stored as ``playlists/<category>/<name>.txt``.

    Each playlist file holds one URL per line; blank lines and lines starting
    with ``#`` are ignored.
    """
    root_dir = Path("playlists")
    if not root_dir.exists():
        return []
    collected = []
    for category_dir in root_dir.iterdir():
        if not category_dir.is_dir():
            continue
        for playlist_file in category_dir.glob("*.txt"):
            with open(playlist_file, 'r') as handle:
                stripped = (raw.strip() for raw in handle)
                urls = [entry for entry in stripped if entry and not entry.startswith('#')]
            collected.append({
                "category": category_dir.name,
                "name": playlist_file.stem,
                "path": str(playlist_file),
                "url_count": len(urls),
                "urls": urls,
            })
    return collected
@app.post("/api/playlists")
async def create_playlist(request: PlaylistRequest):
    """Create (or overwrite) a playlist file under ``playlists/<category>/``.

    ``request.name`` is either ``"name"`` (stored in the ``general`` category)
    or ``"category/name"`` (e.g. ``"tech/python_tutorials"``). Both components
    are validated so the resulting file can never be placed outside the
    ``playlists`` directory.

    Raises:
        HTTPException: 400 if the name contains an empty, ``.``/``..`` or
            separator-containing component.
    """
    parts = request.name.split('/')
    if len(parts) == 2:
        category, name = parts
    else:
        category, name = "general", request.name
    for component in (category, name):
        has_separator = any(ch in component for ch in ("/", "\\", "\x00"))
        if component in ("", ".", "..") or has_separator:
            raise HTTPException(status_code=400, detail="Invalid playlist name")
    playlist_dir = Path("playlists") / category
    playlist_dir.mkdir(parents=True, exist_ok=True)
    playlist_file = playlist_dir / f"{name}.txt"
    with open(playlist_file, 'w') as f:
        if request.description:
            # Comment out every line so that a multi-line description cannot
            # smuggle extra entries into the URL list.
            for line in request.description.splitlines():
                f.write(f"# {line}\n")
        f.write("# Eine URL pro Zeile\n\n")
        for url in request.urls:
            f.write(f"{url}\n")
    return {"message": "Playlist created", "path": str(playlist_file)}
@app.delete("/api/jobs/{job_id}")
async def cancel_job(job_id: str):
    """Cancel a pending or running transcription job.

    Cancellation is recorded by marking the job as failed with the error
    "Cancelled by user" and is announced to WebSocket clients. Jobs that have
    already completed or failed are left untouched so their result or
    original error is not overwritten.

    Raises:
        HTTPException: 404 for an unknown id, 409 if the job already finished.
    """
    if job_id not in active_jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    job = active_jobs[job_id]
    if job.status in (JobStatus.COMPLETED, JobStatus.FAILED):
        raise HTTPException(status_code=409, detail="Job already finished")
    job.status = JobStatus.FAILED
    job.error = "Cancelled by user"
    await manager.broadcast({
        "type": "job_cancelled",
        "job_id": job_id
    })
    return {"message": "Job cancelled"}
@app.websocket("/ws/progress")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket for real-time progress updates.

    Registers the connection with the manager and sends a heartbeat message
    once per second. The connection is unregistered whenever the loop ends,
    whether through a clean disconnect or any other send error, so that dead
    sockets do not stay in the broadcast list.
    """
    await manager.connect(websocket)
    try:
        while True:
            # Keep connection alive
            await asyncio.sleep(1)
            # Send heartbeat
            await websocket.send_json({"type": "heartbeat"})
    except WebSocketDisconnect:
        pass
    finally:
        # Guarded so that cleanup never raises if the connection was already
        # removed elsewhere.
        if websocket in manager.active_connections:
            manager.disconnect(websocket)
# Background task for processing
async def process_transcription(job: TranscriptionJob):
    """Run one transcription job and broadcast its progress over WebSocket.

    The real transcription call is not wired in yet (see the TODO below); for
    now the function only walks the job through its states. A job that was
    cancelled in the meantime (``cancel_job`` marks it FAILED) is left as it
    is instead of being overwritten with a later state.

    Any exception marks the job as failed and is broadcast as ``job_error``.
    """
    def cancelled() -> bool:
        # Only cancel_job() can have set FAILED while this task was waiting.
        return job.status == JobStatus.FAILED

    if cancelled():
        return
    try:
        # Update status
        job.status = JobStatus.DOWNLOADING
        # Keep the stored progress in sync with the value that is broadcast.
        job.progress = 10
        await manager.broadcast({
            "type": "job_update",
            "job_id": job.id,
            "status": job.status,
            "progress": job.progress
        })
        if cancelled():
            return
        # Initialize transcriber
        transcriber = ParallelTranscriber(
            model_size=job.model,
            language=job.language,
            max_downloads=1,  # Single job
            max_transcriptions=1
        )
        # Simulate processing (replace with actual transcriber call)
        job.status = JobStatus.TRANSCRIBING
        job.progress = 50
        await manager.broadcast({
            "type": "job_update",
            "job_id": job.id,
            "status": job.status,
            "progress": job.progress
        })
        if cancelled():
            return
        # TODO: Integrate actual transcription
        # result = await transcriber.process_single(job.url)
        # Mark as completed
        job.status = JobStatus.COMPLETED
        job.progress = 100
        job.completed_at = datetime.now()
        await manager.broadcast({
            "type": "job_complete",
            "job_id": job.id,
            "status": job.status,
            "progress": job.progress
        })
    except Exception as e:
        job.status = JobStatus.FAILED
        job.error = str(e)
        await manager.broadcast({
            "type": "job_error",
            "job_id": job.id,
            "error": job.error
        })
@app.get("/api/models")
async def get_available_models():
    """Return the static catalogue of Whisper models offered by this API."""
    columns = ("name", "size", "speed", "accuracy")
    catalogue = (
        ("tiny", "39 MB", "~10x", "75%"),
        ("base", "74 MB", "~7x", "85%"),
        ("small", "244 MB", "~4x", "91%"),
        ("medium", "769 MB", "~2x", "94%"),
        ("large", "1.5 GB", "~1x", "96-98%"),
    )
    return {"models": [dict(zip(columns, row)) for row in catalogue]}
@app.get("/api/stats")
async def get_statistics():
    """Return transcript counts and sizes on disk plus per-state job counts."""
    transcript_dir = Path("transcripts")
    sizes = []
    if transcript_dir.exists():
        sizes = [txt_file.stat().st_size for txt_file in transcript_dir.rglob("*.txt")]

    def count_jobs(*states):
        # Number of in-memory jobs whose status is one of ``states``.
        return sum(1 for job in active_jobs.values() if job.status in states)

    return {
        "total_transcripts": len(sizes),
        "total_size_mb": round(sum(sizes) / 1024 / 1024, 2),
        "active_jobs": count_jobs(JobStatus.PENDING, JobStatus.DOWNLOADING, JobStatus.TRANSCRIBING),
        "completed_jobs": count_jobs(JobStatus.COMPLETED),
        "failed_jobs": count_jobs(JobStatus.FAILED),
    }
if __name__ == "__main__":
    import uvicorn

    # Auto-reload only works when the application is passed as an import
    # string; given an app object, uvicorn refuses to start with reload=True.
    # The module name matches the one used by start.sh (`uvicorn api_server:app`).
    uvicorn.run("api_server:app", host="0.0.0.0", port=8000, reload=True)

View file

@ -0,0 +1,31 @@
{
"default_model": "small",
"default_language": "de",
"models": {
"tiny": {
"size_mb": 39,
"speed": "~10x Echtzeit",
"accuracy": "75%"
},
"base": {
"size_mb": 74,
"speed": "~7x Echtzeit",
"accuracy": "85%"
},
"small": {
"size_mb": 244,
"speed": "~4x Echtzeit",
"accuracy": "91%"
},
"medium": {
"size_mb": 769,
"speed": "~2x Echtzeit",
"accuracy": "94%"
},
"large": {
"size_mb": 1550,
"speed": "~1x Echtzeit",
"accuracy": "96-98%"
}
}
}

View file

@ -0,0 +1,44 @@
#!/bin/bash
# YouTube Transcriber - quick selection menu
#
# Activates the local virtual environment, shows a small menu and runs the
# matching transcriber command. All prompts use `read -r` so backslashes in
# pasted input are kept literally.
source venv/bin/activate
echo "🎥 YouTube Transcriber - Modell-Auswahl"
echo "========================================"
echo ""
echo "1) 🚀 TINY - Schneller Test (39MB, ~10x Speed)"
echo "2) 🎯 LARGE - Beste Qualität (1.5GB, ~1x Speed)"
echo "3) 📋 SCAN - Alle Playlists scannen"
echo "4) ⚡ PARALLEL - Mehrere Videos parallel (3x Speed)"
echo ""
read -r -p "Wähle Modell (1-4): " choice
case "$choice" in
    1)
        echo "→ Nutze TINY Modell für schnellen Test"
        read -r -p "YouTube URL: " url
        python3 transcriber_v3.py process "$url" --model tiny
        ;;
    2)
        echo "→ Nutze LARGE Modell für beste Qualität"
        read -r -p "YouTube URL: " url
        python3 transcriber_v3.py process "$url" --model large
        ;;
    3)
        echo "→ Scanne alle Playlists mit LARGE Modell"
        python3 transcriber_v3.py scan --model large
        ;;
    4)
        echo "→ Parallel-Verarbeitung (3x schneller!)"
        echo "Gib URLs ein (mit Leerzeichen getrennt, oder Enter für Playlist):"
        # Read into an array: the URLs are split on whitespace exactly once
        # and can then be passed on quoted, so characters such as "?" in a
        # URL are never treated as glob patterns.
        read -r -p "URLs: " -a url_list
        if [ "${#url_list[@]}" -eq 0 ]; then
            python3 transcriber_v4_parallel.py process --playlist people/rory-sutherland --model large
        else
            python3 transcriber_v4_parallel.py process --urls "${url_list[@]}" --model large
        fi
        ;;
    *)
        echo "Ungültige Auswahl"
        ;;
esac

View file

@ -0,0 +1,4 @@
yt-dlp
openai-whisper
ffmpeg-python
rich

47
apps/wisekeep/legacy/start.sh Executable file
View file

@ -0,0 +1,47 @@
#!/bin/bash
# YouTube Transcriber - Start Script
#
# Creates the virtual environment on first run, then starts the FastAPI
# backend and the Astro frontend in the background and waits until the user
# interrupts the script, at which point both services are stopped.
echo "🎥 YouTube Transcriber System"
echo "============================="
echo ""
# Check if virtual environment exists
if [ ! -d "venv" ]; then
    echo "Creating virtual environment..."
    python3 -m venv venv
    source venv/bin/activate
    pip install -r requirements.txt
else
    source venv/bin/activate
fi
# Start services
echo "Starting services..."
echo ""
# Start FastAPI backend
echo "1⃣ Starting API Server (Port 8000)..."
uvicorn api_server:app --reload --host 0.0.0.0 --port 8000 &
API_PID=$!
# Wait for API to start
sleep 3
# Start Astro frontend
echo "2⃣ Starting Website (Port 4321)..."
# Run in an explicit subshell and `exec` the dev server so that $! is the PID
# of the dev server itself rather than of a wrapper shell; otherwise `kill`
# could leave the server running.
(cd website && exec npx astro dev) &
WEB_PID=$!
echo ""
echo "✅ System started!"
echo ""
echo "📍 Access points:"
echo " • Public Website: http://localhost:4321"
echo " • Admin Panel: http://localhost:4321/admin"
echo " • API Docs: http://localhost:8000/docs"
echo ""
echo "Press CTRL+C to stop all services"

# Stop both background services; errors from already-exited processes are
# ignored.
stop_services() {
    echo 'Stopping services...'
    kill "$API_PID" "$WEB_PID" 2>/dev/null
    exit
}
# Wait for interrupt (also clean up when the script itself is terminated)
trap stop_services INT TERM
wait

View file

@ -0,0 +1,294 @@
#!/usr/bin/env python3
"""
YouTube Auto-Transcriber MVP
Phase 1: Core Functionality - Download und Transkription
"""
import os
import sys
import argparse
from pathlib import Path
from datetime import datetime
import yt_dlp
import whisper
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
class YouTubeTranscriber:
    """Download the audio of a YouTube video and transcribe it with Whisper."""

    def __init__(self, model_size="base", output_dir="transcripts"):
        """
        Initialize the transcriber.

        Args:
            model_size: Whisper model size (tiny, base, small, medium, large)
            output_dir: Output directory for transcripts
        """
        self.output_dir = Path(output_dir)
        # parents=True so that a nested output path works on the first run.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.temp_dir = Path("temp_audio")
        self.temp_dir.mkdir(exist_ok=True)
        print(f"Lade Whisper Model '{model_size}'...")
        self.model = whisper.load_model(model_size)
        print(f"Model geladen: {model_size}")
        self.ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': str(self.temp_dir / '%(title)s.%(ext)s'),
            'quiet': True,
            'no_warnings': True,
        }

    def download_audio(self, url):
        """
        Download the audio track of a YouTube video as mp3.

        Args:
            url: YouTube URL

        Returns:
            Tuple (audio_path, video_info); (None, None) if the download fails
        """
        print(f"\nLade Video von: {url}")
        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
            try:
                info = ydl.extract_info(url, download=True)
                # Keys may be present with a None value, hence `or`.
                title = info.get('title') or 'unknown'
                channel = info.get('uploader') or 'unknown'
                # Duration can be missing or a float; normalize to whole seconds.
                duration = int(info.get('duration') or 0)
                # Derive the output file from the name yt-dlp used for this
                # download; the audio extraction swaps the extension to mp3.
                audio_file = Path(ydl.prepare_filename(info)).with_suffix('.mp3')
                if not audio_file.exists():
                    # Fallback: newest mp3 written within the last minute.
                    cutoff = datetime.now().timestamp() - 60
                    recent = [
                        candidate for candidate in self.temp_dir.glob("*.mp3")
                        if candidate.stat().st_mtime > cutoff
                    ]
                    audio_file = max(
                        recent, key=lambda p: p.stat().st_mtime, default=None
                    )
                if not audio_file:
                    raise Exception("Audio-Datei nicht gefunden")
                print(f"✓ Download abgeschlossen: {title}")
                print(f" Kanal: {channel}")
                print(f" Dauer: {duration//60}:{duration%60:02d} Minuten")
                return audio_file, {
                    'title': title,
                    'channel': channel,
                    'duration': duration,
                    'url': url
                }
            except Exception as e:
                print(f"✗ Fehler beim Download: {e}")
                return None, None

    def transcribe_audio(self, audio_path, language="de"):
        """
        Transcribe an audio file with Whisper.

        Args:
            audio_path: Path to the audio file
            language: Language used for transcription

        Returns:
            The transcript text, or None if transcription fails
        """
        print(f"\nStarte Transkription...")
        print(f" Sprache: {language}")
        try:
            result = self.model.transcribe(
                str(audio_path),
                language=language,
                verbose=False
            )
            print(f"✓ Transkription abgeschlossen")
            print(f" Erkannte Sprache: {result.get('language', 'unbekannt')}")
            return result['text']
        except Exception as e:
            print(f"✗ Fehler bei Transkription: {e}")
            return None

    def save_transcript(self, text, video_info):
        """
        Save a transcript as a text file inside a per-channel folder.

        Args:
            text: Transcript text
            video_info: Video metadata (title, channel, duration, url)

        Returns:
            Path of the written file
        """
        # Build a filesystem-safe file name from the title.
        safe_title = "".join(
            c for c in video_info['title'] if c.isalnum() or c in (' ', '-', '_')
        )[:100]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{safe_title}_{timestamp}.txt"
        # Create the channel folder; names that would resolve to the output
        # directory itself or to its parent are replaced.
        channel_name = str(video_info['channel']).replace('/', '_')
        if channel_name in ('', '.', '..'):
            channel_name = 'unknown'
        channel_dir = self.output_dir / channel_name
        channel_dir.mkdir(parents=True, exist_ok=True)
        filepath = channel_dir / filename
        # Duration may arrive as a float or None; format whole seconds.
        duration = int(video_info.get('duration') or 0)
        # Write the transcript preceded by a metadata header.
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("YouTube Transkription\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"Titel: {video_info['title']}\n")
            f.write(f"Kanal: {video_info['channel']}\n")
            f.write(f"URL: {video_info['url']}\n")
            f.write(f"Dauer: {duration//60}:{duration%60:02d} Minuten\n")
            f.write(f"Transkribiert am: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}\n")
            f.write("\n" + "=" * 50 + "\n\n")
            f.write("TRANSKRIPTION:\n\n")
            f.write(text)
        print(f"\n✓ Transkript gespeichert: {filepath}")
        return filepath

    def cleanup_temp_files(self):
        """Delete the temporary mp3 files (best effort)."""
        for file in self.temp_dir.glob("*.mp3"):
            try:
                file.unlink()
            except OSError:
                # A file that cannot be removed must not abort the run.
                pass
        print("✓ Temporäre Dateien aufgeräumt")

    def process_video(self, url, language="de"):
        """
        Complete workflow: download, transcribe, save.

        Args:
            url: YouTube URL
            language: Language used for transcription

        Returns:
            Path of the transcript file, or None on failure
        """
        print("\n" + "=" * 60)
        print(f"VERARBEITE VIDEO")
        print("=" * 60)
        # 1. Download audio
        audio_path, video_info = self.download_audio(url)
        if not audio_path:
            return None
        try:
            # 2. Transcribe
            transcript = self.transcribe_audio(audio_path, language)
            if not transcript:
                return None
            # 3. Save
            output_path = self.save_transcript(transcript, video_info)
        finally:
            # 4. Clean up, also when transcription or saving failed, so stale
            # audio files do not pile up in the temp directory.
            self.cleanup_temp_files()
        print("\n✓ Video erfolgreich verarbeitet!")
        return output_path
def main():
    """Command-line entry point.

    Supports three modes: batch (``--batch``, URLs read from stdin), single
    video (positional ``url``) and an interactive prompt when neither is
    given. The interactive prompt ends on ``q``/``quit``/``exit``, Ctrl+C or
    end of input (Ctrl+D).
    """
    parser = argparse.ArgumentParser(
        description='YouTube Video Transcriber - Transkribiert YouTube Videos mit Whisper'
    )
    parser.add_argument(
        'url',
        nargs='?',
        help='YouTube Video URL'
    )
    parser.add_argument(
        '--model',
        default='base',
        choices=['tiny', 'base', 'small', 'medium', 'large'],
        help='Whisper Model Größe (default: base)'
    )
    parser.add_argument(
        '--language',
        default='de',
        help='Sprache für Transkription (default: de)'
    )
    parser.add_argument(
        '--output',
        default='transcripts',
        help='Ausgabe-Verzeichnis (default: transcripts)'
    )
    parser.add_argument(
        '--batch',
        action='store_true',
        help='Batch-Modus: URLs aus stdin lesen'
    )
    args = parser.parse_args()
    # Initialize the transcriber (loads the Whisper model)
    transcriber = YouTubeTranscriber(
        model_size=args.model,
        output_dir=args.output
    )
    if args.batch:
        # Batch mode: read URLs from stdin, ignoring anything that is not a URL
        print("Batch-Modus: Gebe URLs ein (eine pro Zeile, beende mit Ctrl+D):")
        urls = []
        try:
            for line in sys.stdin:
                url = line.strip()
                if url.startswith('http'):
                    urls.append(url)
        except KeyboardInterrupt:
            pass
        print(f"\n{len(urls)} Videos zu verarbeiten")
        for i, url in enumerate(urls, 1):
            print(f"\n[{i}/{len(urls)}] Verarbeite Video...")
            transcriber.process_video(url, args.language)
    elif args.url:
        # Single video
        transcriber.process_video(args.url, args.language)
    else:
        # Interactive mode
        print("\nYouTube Transcriber - Interaktiver Modus")
        print("=" * 50)
        print(f"Model: {args.model}")
        print(f"Sprache: {args.language}")
        print(f"Ausgabe: {args.output}/")
        print("=" * 50)
        print("\nGebe YouTube URL ein (oder 'q' zum Beenden):")
        while True:
            try:
                url = input("\nURL: ").strip()
            except (KeyboardInterrupt, EOFError):
                # input() raises EOFError on Ctrl+D / closed stdin; treat it
                # like Ctrl+C and leave the loop instead of crashing.
                break
            if url.lower() in ['q', 'quit', 'exit']:
                break
            if not url.startswith('http'):
                print("Ungültige URL. Bitte YouTube URL eingeben.")
                continue
            try:
                transcriber.process_video(url, args.language)
            except KeyboardInterrupt:
                break
        print("\nAuf Wiedersehen!")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,476 @@
#!/usr/bin/env python3
"""
YouTube Auto-Transcriber v2.0
Mit verbesserter Download-Experience und Rich UI
"""
import os
import sys
import json
import argparse
import hashlib
from pathlib import Path
from datetime import datetime, timedelta
import time
import yt_dlp
import whisper
import warnings
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
TimeElapsedColumn,
DownloadColumn,
TransferSpeedColumn
)
from rich.table import Table
from rich.panel import Panel
from rich.live import Live
from rich.layout import Layout
from rich import print as rprint
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
console = Console()
# ASCII Art Logo
LOGO = """
[bold cyan]
[bold white]🎥 YouTube Auto-Transcriber v2.0[/bold white]
[dim]Powered by OpenAI Whisper & yt-dlp[/dim]
[/bold cyan]
"""
class YouTubeTranscriber:
def __init__(self, model_size="base", output_dir="transcripts", cache_dir=".cache"):
    """
    Set up directories, the transcription cache, the Whisper model and the
    yt-dlp download options.
    """
    self.output_dir = Path(output_dir)
    self.output_dir.mkdir(exist_ok=True)
    self.cache_dir = Path(cache_dir)
    self.cache_dir.mkdir(exist_ok=True)
    self.cache_file = self.cache_dir / "transcribed_videos.json"
    self.temp_dir = Path("temp_audio")
    self.temp_dir.mkdir(exist_ok=True)
    # Previously transcribed videos
    self.cache = self.load_cache()
    # Load the Whisper model while showing a status spinner
    loading_message = f"[bold green]⏳ Lade Whisper Model '{model_size}'..."
    with console.status(loading_message, spinner="dots"):
        self.model = whisper.load_model(model_size)
    console.print(f"[bold green]✅ Model geladen: {model_size}[/bold green]")
    # Approximate speed factor per model size
    self.model_speeds = dict(tiny=10, base=7, small=4, medium=2, large=1)
    self.model_size = model_size
    self.speed_factor = self.model_speeds.get(model_size, 3)
    audio_extraction = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    self.ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [audio_extraction],
        'outtmpl': str(self.temp_dir / '%(title)s.%(ext)s'),
        'quiet': True,
        'no_warnings': True,
        'progress_hooks': [self._download_progress_hook],
    }
    # Progress display state; assigned when a download starts
    self.current_progress = None
    self.download_task = None
def load_cache(self):
"""Lädt den Cache bereits transkribierter Videos"""
if self.cache_file.exists():
with open(self.cache_file, 'r') as f:
return json.load(f)
return {}
def save_cache(self):
"""Speichert den Cache"""
with open(self.cache_file, 'w') as f:
json.dump(self.cache, f, indent=2)
def get_video_hash(self, url):
    """Return the MD5 hex digest of the video URL, used as the cache key."""
    digest = hashlib.md5()
    digest.update(url.encode())
    return digest.hexdigest()
def is_cached(self, url):
"""Prüft ob Video bereits transkribiert wurde"""
video_hash = self.get_video_hash(url)
if video_hash in self.cache:
cached_info = self.cache[video_hash]
output_file = Path(cached_info['output_file'])
if output_file.exists():
return cached_info
return None
def _download_progress_hook(self, d):
"""Progress Hook für yt-dlp"""
if d['status'] == 'downloading' and self.download_task:
if d.get('total_bytes'):
downloaded = d.get('downloaded_bytes', 0)
total = d['total_bytes']
self.current_progress.update(self.download_task, completed=downloaded, total=total)
elif d.get('total_bytes_estimate'):
downloaded = d.get('downloaded_bytes', 0)
total = d['total_bytes_estimate']
self.current_progress.update(self.download_task, completed=downloaded, total=total)
def get_video_info(self, url):
    """
    Fetch video metadata BEFORE downloading anything.

    Returns:
        Dict with title, channel, duration, view_count, upload_date,
        description (first 200 characters) and filesize, or None if the
        metadata could not be retrieved. Fields that are missing or None in
        the extractor result fall back to the defaults below.
    """
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, download=False)
            # `or` rather than a .get() default: a key may be present with
            # the value None, which would break slicing and formatting.
            return {
                'title': info.get('title') or 'Unbekannt',
                'channel': info.get('uploader') or 'Unbekannt',
                'duration': info.get('duration') or 0,
                'view_count': info.get('view_count') or 0,
                'upload_date': info.get('upload_date') or '',
                'description': (info.get('description') or '')[:200],
                'filesize': info.get('filesize') or info.get('filesize_approx') or 0
            }
        except Exception as e:
            console.print(f"[red]❌ Fehler beim Abrufen der Video-Info: {e}[/red]")
            return None
def display_video_info(self, info):
    """Print the video metadata and an estimated transcription time.

    Args:
        info: Metadata dictionary as produced by ``get_video_info``.

    Returns:
        float | None: Estimated transcription time in seconds (duration
        divided by ``self.speed_factor``), or ``None`` if ``info`` is empty.
    """
    if not info:
        return None

    title = info['title']
    # Whole seconds; also guards against a missing or fractional duration.
    duration = int(info.get('duration') or 0)

    table = Table(title="📹 Video Information", show_header=False, box=None)
    table.add_column("Property", style="cyan", width=20)
    table.add_column("Value", style="white")
    table.add_row("Titel", title[:60] + "..." if len(title) > 60 else title)
    table.add_row("Kanal", info['channel'])
    table.add_row("Dauer", f"{duration // 60}:{duration % 60:02d} Minuten")

    # Estimated transcription time. Round to whole seconds before splitting
    # into minutes and seconds so the seconds field can never show "60".
    estimated_time = duration / self.speed_factor
    eta_minutes, eta_seconds = divmod(int(round(estimated_time)), 60)
    eta_str = f"~{eta_minutes}:{eta_seconds:02d} Minuten"
    table.add_row("Geschätzte Zeit", f"{eta_str} (mit {self.model_size} model)")

    if info.get('view_count'):
        # Use "." as thousands separator.
        views = f"{info['view_count']:,}".replace(',', '.')
        table.add_row("Aufrufe", views)

    console.print(Panel(table, border_style="cyan"))

    # Warn about long videos (more than 30 minutes).
    if duration > 1800:
        console.print("[yellow]⚠️ Hinweis: Dieses Video ist über 30 Minuten lang. Die Transkription kann einige Zeit dauern.[/yellow]")
    return estimated_time
def download_audio(self, url, progress):
    """Download the audio track of a video and report progress.

    Args:
        url: URL of the video to download.
        progress: Rich ``Progress`` instance that receives the download task.

    Returns:
        tuple: ``(audio_file, info)`` where ``audio_file`` is the path of the
        MP3 file and ``info`` holds ``title``, ``channel``, ``duration`` and
        ``url``; ``(None, None)`` if the download fails.
    """
    self.current_progress = progress
    self.download_task = progress.add_task(
        "[cyan]📥 Download Audio...",
        total=None
    )
    with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, download=True)

            # Ask yt-dlp for the file name it used instead of guessing. The
            # result is expected to carry an .mp3 extension, which is what
            # this method has always searched for.
            audio_file = Path(ydl.prepare_filename(info)).with_suffix('.mp3')
            if not audio_file.exists():
                # Fallback: the newest MP3 written within the last minute.
                cutoff = datetime.now().timestamp() - 60
                recent = [
                    candidate for candidate in self.temp_dir.glob("*.mp3")
                    if candidate.stat().st_mtime > cutoff
                ]
                audio_file = max(recent, key=lambda p: p.stat().st_mtime, default=None)
            if audio_file is None:
                raise Exception("Audio-Datei nicht gefunden")

            progress.update(self.download_task, completed=100, total=100)
            return audio_file, {
                'title': info.get('title', 'unknown'),
                'channel': info.get('uploader', 'unknown'),
                'duration': info.get('duration', 0),
                'url': url
            }
        except Exception as e:
            console.print(f"[red]❌ Fehler beim Download: {e}[/red]")
            return None, None
def transcribe_audio(self, audio_path, language="de", progress=None):
    """Transcribe an audio file with the loaded Whisper model.

    Args:
        audio_path: Path of the audio file.
        language: Language code handed to the model.
        progress: Optional Rich ``Progress``. The model call reports no
            intermediate progress, so the task only jumps to 100 at the end.

    Returns:
        tuple: ``(text, language)``; ``(None, None)`` if transcription fails.
    """
    task = None
    if progress:
        task = progress.add_task(
            f"[green]🎙️ Transkribiere mit {self.model_size} model...",
            total=100
        )

    try:
        result = self.model.transcribe(
            str(audio_path),
            language=language,
            verbose=False,
            fp16=False  # as in the original: "for M1 Mac"
        )
        if progress:
            progress.update(task, completed=100)
        transcript = result['text']
        detected = result.get('language', 'unbekannt')
    except Exception as e:
        console.print(f"[red]❌ Fehler bei Transkription: {e}[/red]")
        return None, None
    return transcript, detected
def save_transcript(self, text, video_info, detected_language=None):
    """Write the transcript and its metadata header to a text file.

    The file is stored as ``<output_dir>/<channel>/<title>_<timestamp>.txt``.

    Args:
        text: Transcribed text.
        video_info: Dictionary with ``title``, ``channel``, ``url`` and
            ``duration`` (seconds).
        detected_language: Optional language code to note in the header.

    Returns:
        Path: Location of the written file.
    """
    title = video_info.get('title') or 'unknown'
    channel = video_info.get('channel') or 'unknown'
    duration = int(video_info.get('duration') or 0)

    # File name: keep only characters that are unproblematic in file names.
    safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_'))[:100]
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{safe_title}_{timestamp}.txt"

    # The channel name comes from remote metadata: neutralise path separators
    # and names that would resolve outside the output directory.
    channel_name = channel.replace('/', '_').replace('\\', '_').strip()
    if channel_name in ('', '.', '..'):
        channel_name = 'unknown'
    channel_dir = self.output_dir / channel_name
    channel_dir.mkdir(parents=True, exist_ok=True)
    filepath = channel_dir / filename

    header = [
        "YouTube Transkription\n",
        "=" * 50 + "\n\n",
        f"Titel: {title}\n",
        f"Kanal: {channel}\n",
        f"URL: {video_info['url']}\n",
        f"Dauer: {duration // 60}:{duration % 60:02d} Minuten\n",
    ]
    if detected_language:
        header.append(f"Erkannte Sprache: {detected_language}\n")
    header += [
        f"Transkribiert am: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}\n",
        f"Whisper Model: {self.model_size}\n",
        "\n" + "=" * 50 + "\n\n",
        "TRANSKRIPTION:\n\n",
    ]

    with open(filepath, 'w', encoding='utf-8') as f:
        f.writelines(header)
        f.write(text)
    return filepath
def cleanup_temp_files(self):
    """Delete all temporary MP3 files in the temp directory (best effort)."""
    for file in self.temp_dir.glob("*.mp3"):
        try:
            file.unlink()
        except OSError:
            # Best effort: a file that is locked or already gone is skipped.
            # Unlike a bare ``except`` this lets KeyboardInterrupt through.
            pass
def process_video(self, url, language="de", force_reprocess=False):
    """Run the complete workflow for one video: info, download, transcribe, save.

    Args:
        url: URL of the video.
        language: Language code handed to the transcription step.
        force_reprocess: If true, ignore an existing cache entry.

    Returns:
        The path of the transcript file (the cached path string when the
        video was already transcribed), or ``None`` if any step fails.
    """
    console.rule("[bold blue]Verarbeite Video[/bold blue]")

    # Check the cache first.
    if not force_reprocess:
        cached = self.is_cached(url)
        if cached:
            console.print("[yellow]⚠️ Video bereits transkribiert:[/yellow]")
            console.print(f" 📁 {cached['output_file']}")
            console.print(f" 📅 {cached['transcribed_at']}")
            console.print("[dim] (Nutze --force um neu zu transkribieren)[/dim]")
            return cached['output_file']

    # Fetch and show the metadata before downloading anything.
    console.print("\n[cyan]📊 Lade Video-Informationen...[/cyan]")
    video_info = self.get_video_info(url)
    if not video_info:
        return None
    self.display_video_info(video_info)

    try:
        # One progress display for both download and transcription.
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            TimeElapsedColumn(),
            console=console
        ) as progress:
            # 1. Download audio
            audio_path, download_info = self.download_audio(url, progress)
            if not audio_path:
                return None

            # 2. Transcribe
            transcript, detected_lang = self.transcribe_audio(audio_path, language, progress)
            if not transcript:
                return None

        # 3. Save
        output_path = self.save_transcript(transcript, download_info, detected_lang)

        # 4. Update the cache
        self.cache[self.get_video_hash(url)] = {
            'url': url,
            'title': download_info['title'],
            'output_file': str(output_path),
            'transcribed_at': datetime.now().isoformat(),
            'model': self.model_size,
            'language': detected_lang
        }
        self.save_cache()
    finally:
        # 5. Clean up on success AND on failure, so that leftover audio
        # cannot be mistaken for the download of the next video.
        self.cleanup_temp_files()

    console.print("\n[bold green]✅ Video erfolgreich verarbeitet![/bold green]")
    console.print(f"📁 Gespeichert: [cyan]{output_path}[/cyan]")
    return output_path
def main():
    """Command-line entry point: single URL, batch (stdin) or interactive mode."""
    parser = argparse.ArgumentParser(
        description='YouTube Video Transcriber v2.0 - Mit verbesserter UI'
    )
    parser.add_argument(
        'url',
        nargs='?',
        help='YouTube Video URL'
    )
    parser.add_argument(
        '--model',
        default='base',
        choices=['tiny', 'base', 'small', 'medium', 'large'],
        help='Whisper Model Größe (default: base)'
    )
    parser.add_argument(
        '--language',
        default='de',
        help='Sprache für Transkription (default: de)'
    )
    parser.add_argument(
        '--output',
        default='transcripts',
        help='Ausgabe-Verzeichnis (default: transcripts)'
    )
    parser.add_argument(
        '--batch',
        action='store_true',
        help='Batch-Modus: URLs aus stdin lesen'
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='Ignoriere Cache und transkribiere neu'
    )
    args = parser.parse_args()

    console.print(LOGO)

    transcriber = YouTubeTranscriber(
        model_size=args.model,
        output_dir=args.output
    )

    if args.batch:
        # Batch mode: collect every line from stdin that looks like a URL.
        console.print("[cyan]📋 Batch-Modus: Gebe URLs ein (eine pro Zeile, beende mit Ctrl+D):[/cyan]")
        urls = []
        try:
            for line in sys.stdin:
                url = line.strip()
                if url.startswith('http'):
                    urls.append(url)
        except KeyboardInterrupt:
            pass

        console.print(f"\n[bold]{len(urls)} Videos zu verarbeiten[/bold]")
        for i, url in enumerate(urls, 1):
            console.print(f"\n[bold cyan]━━━ Video {i}/{len(urls)} ━━━[/bold cyan]")
            transcriber.process_video(url, args.language, args.force)
    elif args.url:
        # Single video given on the command line.
        transcriber.process_video(args.url, args.language, args.force)
    else:
        # Interactive mode: prompt for URLs until the user quits.
        console.print("[bold cyan]🎬 Interaktiver Modus[/bold cyan]")
        console.print(f"Model: [green]{args.model}[/green]")
        console.print(f"Sprache: [green]{args.language}[/green]")
        console.print(f"Ausgabe: [green]{args.output}/[/green]")
        console.print("\nGebe YouTube URL ein (oder 'q' zum Beenden):\n")
        while True:
            try:
                url = console.input("[bold cyan]URL ▶ [/bold cyan]").strip()
            except (KeyboardInterrupt, EOFError):
                # Ctrl+C and end-of-input (Ctrl+D, closed pipe) both end the
                # session cleanly instead of producing a traceback.
                break
            if url.lower() in ['q', 'quit', 'exit']:
                break
            if url.startswith('http'):
                try:
                    transcriber.process_video(url, args.language, args.force)
                except KeyboardInterrupt:
                    break
            else:
                console.print("[red]❌ Ungültige URL. Bitte YouTube URL eingeben.[/red]")
        console.print("\n[bold green]👋 Auf Wiedersehen![/bold green]")


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,603 @@
#!/usr/bin/env python3
"""
YouTube Auto-Transcriber v3.0
Mit Playlist-Management und Themen-Ordnern
"""
import os
import sys
import json
import argparse
import hashlib
from pathlib import Path
from datetime import datetime, timedelta
import time
from typing import List, Dict, Tuple
import yt_dlp
import whisper
import warnings
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
TimeElapsedColumn,
MofNCompleteColumn
)
from rich.table import Table
from rich.panel import Panel
from rich.tree import Tree
from rich import print as rprint
# Suppress every FutureWarning and UserWarning for the whole process.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# Shared Rich console used for all terminal output in this module.
console = Console()
# Start-up banner (Rich markup), printed once by main().
LOGO = """
[bold cyan]
[bold white]🎥 YouTube Auto-Transcriber v3.0[/bold white]
[dim]Playlist Management & Batch Processing[/dim]
[/bold cyan]
"""
class PlaylistManager:
    """Manage playlists, i.e. plain-text files that list video URLs.

    Playlists are ``*.txt`` files located directly in ``playlists_dir`` or in
    one of its immediate sub-folders.
    """

    def __init__(self, playlists_dir="playlists"):
        """Create the playlist directory and, if it is empty, an example file.

        Args:
            playlists_dir: Directory that holds the playlist files.
        """
        self.playlists_dir = Path(playlists_dir)
        # parents=True so that nested paths such as "data/playlists" work.
        self.playlists_dir.mkdir(parents=True, exist_ok=True)
        self._create_example_structure()

    def _create_example_structure(self):
        """Write a commented example playlist if no ``*.txt`` file exists yet."""
        if any(self.playlists_dir.glob("*.txt")):
            return
        example_file = self.playlists_dir / "example_tech.txt"
        # Same encoding as read_playlist uses for reading.
        with open(example_file, 'w', encoding='utf-8') as f:
            f.writelines([
                "# Tech Videos - Beispiel Playlist\n",
                "# Zeilen mit # werden ignoriert\n",
                "# Eine URL pro Zeile:\n",
                "#\n",
                "# https://www.youtube.com/watch?v=VIDEO_ID\n",
            ])

    def get_all_playlists(self) -> Dict[str, Path]:
        """Find all playlist files.

        Returns:
            Mapping of playlist name to file path. Files in the main folder
            are named by their stem, files in a sub-folder as
            ``<folder>/<stem>``. Entries are sorted, matching the order used
            by ``display_playlists_tree``.
        """
        playlists = {}
        # *.txt files in the main folder
        for file in sorted(self.playlists_dir.glob("*.txt")):
            playlists[file.stem] = file
        # *.txt files one level down
        for folder in sorted(self.playlists_dir.iterdir()):
            if folder.is_dir():
                for file in sorted(folder.glob("*.txt")):
                    playlists[f"{folder.name}/{file.stem}"] = file
        return playlists

    def read_playlist(self, playlist_path: Path) -> List[str]:
        """Read the video URLs from a playlist file.

        Blank lines, lines starting with ``#`` and lines that contain neither
        ``youtube.com`` nor ``youtu.be`` are ignored.

        Args:
            playlist_path: Path of the playlist file.

        Returns:
            The URLs in file order; an empty list if the file does not exist.
        """
        if not playlist_path.exists():
            return []
        urls = []
        with open(playlist_path, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                if 'youtube.com' in line or 'youtu.be' in line:
                    urls.append(line)
        return urls

    def display_playlists_tree(self):
        """Print all playlists as a tree and return the ``Tree`` object."""
        tree = Tree("[bold cyan]📁 Playlists[/bold cyan]")
        # Files in the main folder
        for file in sorted(self.playlists_dir.glob("*.txt")):
            urls = self.read_playlist(file)
            tree.add(f"📄 {file.stem} ({len(urls)} URLs)")
        # Sub-folders
        for folder in sorted(self.playlists_dir.iterdir()):
            if folder.is_dir():
                branch = tree.add(f"📂 {folder.name}/")
                for file in sorted(folder.glob("*.txt")):
                    urls = self.read_playlist(file)
                    branch.add(f"📄 {file.stem} ({len(urls)} URLs)")
        console.print(tree)
        return tree
class YouTubeTranscriber:
    """Download video audio, transcribe it with Whisper and store the text.

    Finished transcripts are recorded in a JSON cache keyed by a hash of the
    URL, so a URL whose transcript file still exists is not processed again.
    """

    def __init__(self, model_size="base", output_dir="transcripts", cache_dir=".cache"):
        """Create the working directories, load the cache and the Whisper model.

        Args:
            model_size: Name of the Whisper model to load.
            output_dir: Directory that receives the transcript files.
            cache_dir: Directory that holds the transcription cache file.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_file = self.cache_dir / "transcribed_videos.json"
        self.temp_dir = Path("temp_audio")
        self.temp_dir.mkdir(exist_ok=True)

        self.cache = self.load_cache()

        # Load the Whisper model while showing a spinner.
        with console.status(f"[bold green]⏳ Lade Whisper Model '{model_size}'...", spinner="dots"):
            self.model = whisper.load_model(model_size)
        console.print(f"[bold green]✅ Model geladen: {model_size}[/bold green]")

        # Relative speed factor per model size; unknown sizes fall back to 3.
        self.model_speeds = {
            'tiny': 10,
            'base': 7,
            'small': 4,
            'medium': 2,
            'large': 1
        }
        self.model_size = model_size
        self.speed_factor = self.model_speeds.get(model_size, 3)

        self.ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': str(self.temp_dir / '%(title)s.%(ext)s'),
            'quiet': True,
            'no_warnings': True,
            'progress_hooks': [self._download_progress_hook],
        }
        self.current_progress = None
        self.download_task = None

    @staticmethod
    def _safe_dir_name(name):
        """Turn a playlist or channel name into one safe directory name."""
        cleaned = (name or '').replace('/', '_').replace('\\', '_').strip()
        # "." and ".." would resolve outside the intended folder.
        return 'unknown' if cleaned in ('', '.', '..') else cleaned

    def load_cache(self):
        """Load the cache of already transcribed videos.

        Returns:
            dict: Mapping of URL hash to cache entry; empty when the file is
            missing, unreadable or not a JSON object.
        """
        if not self.cache_file.exists():
            return {}
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError):
            # A corrupt cache must not prevent start-up; the only cost is
            # transcribing videos again.
            return {}
        return data if isinstance(data, dict) else {}

    def save_cache(self):
        """Write the cache to disk via a temporary file and an atomic move."""
        tmp_file = self.cache_file.with_name(self.cache_file.name + '.tmp')
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(self.cache, f, indent=2)
        tmp_file.replace(self.cache_file)

    def get_video_hash(self, url):
        """Return the hexadecimal MD5 digest of the video URL (the cache key)."""
        return hashlib.md5(url.encode()).hexdigest()

    def is_cached(self, url):
        """Return the cache entry for ``url`` if its transcript still exists, else ``None``."""
        cached_info = self.cache.get(self.get_video_hash(url))
        if cached_info is not None and Path(cached_info['output_file']).exists():
            return cached_info
        return None

    def _download_progress_hook(self, d):
        """Forward yt-dlp download progress to the active Rich progress task."""
        # Task IDs start at 0, so compare against None instead of testing
        # truthiness, and never touch a progress object that is not set.
        if self.current_progress is None or self.download_task is None:
            return
        if d.get('status') != 'downloading':
            return
        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        if total:
            self.current_progress.update(
                self.download_task,
                completed=d.get('downloaded_bytes', 0),
                total=total,
            )

    def get_video_info(self, url):
        """Fetch video metadata without downloading the media.

        Returns:
            dict | None: ``title``, ``channel``, ``duration`` and ``url``;
            ``None`` if the lookup fails (the error is printed).
        """
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                info = ydl.extract_info(url, download=False)
                # Fields may be present but None; ``or`` covers that too.
                return {
                    'title': info.get('title') or 'Unbekannt',
                    'channel': info.get('uploader') or 'Unbekannt',
                    'duration': info.get('duration') or 0,
                    'url': url
                }
            except Exception as e:
                console.print(f"[red]❌ Fehler beim Abrufen der Video-Info: {e}[/red]")
                return None

    def download_audio(self, url, progress=None):
        """Download the audio track of a video as MP3.

        Args:
            url: URL of the video.
            progress: Optional Rich ``Progress`` that receives a download task.

        Returns:
            tuple: ``(audio_file, info)`` with ``info`` holding ``title``,
            ``channel``, ``duration`` and ``url``; ``(None, None)`` on failure.
        """
        self.current_progress = progress
        # Reset first so a task ID from an earlier call is never reused.
        self.download_task = None
        if progress:
            self.download_task = progress.add_task(
                "[cyan]📥 Download...",
                total=None
            )

        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
            try:
                info = ydl.extract_info(url, download=True)

                # Ask yt-dlp for the file name it used; the audio
                # post-processor configured above converts to .mp3.
                audio_file = Path(ydl.prepare_filename(info)).with_suffix('.mp3')
                if not audio_file.exists():
                    # Fallback: the newest MP3 written within the last minute.
                    cutoff = datetime.now().timestamp() - 60
                    recent = [
                        candidate for candidate in self.temp_dir.glob("*.mp3")
                        if candidate.stat().st_mtime > cutoff
                    ]
                    audio_file = max(recent, key=lambda p: p.stat().st_mtime, default=None)
                if audio_file is None:
                    raise Exception("Audio-Datei nicht gefunden")

                if progress and self.download_task is not None:
                    progress.update(self.download_task, completed=100, total=100)

                return audio_file, {
                    'title': info.get('title', 'unknown'),
                    'channel': info.get('uploader', 'unknown'),
                    'duration': info.get('duration', 0),
                    'url': url
                }
            except Exception as e:
                console.print(f"[red]❌ Download-Fehler: {e}[/red]")
                return None, None

    def transcribe_audio(self, audio_path, language="de", progress=None):
        """Transcribe an audio file with the loaded Whisper model.

        Returns:
            tuple: ``(text, language)``; ``(None, None)`` if transcription fails.
        """
        task = None
        if progress:
            task = progress.add_task(
                "[green]🎙️ Transkribiere...",
                total=100
            )

        try:
            result = self.model.transcribe(
                str(audio_path),
                language=language,
                verbose=False,
                fp16=False
            )
            if progress:
                progress.update(task, completed=100)
            return result['text'], result.get('language', 'unbekannt')
        except Exception as e:
            console.print(f"[red]❌ Transkriptions-Fehler: {e}[/red]")
            return None, None

    def save_transcript(self, text, video_info, playlist_name=None):
        """Write the transcript and a metadata header to a text file.

        The file is stored as
        ``<output_dir>[/<playlist>]/<channel>/<title>_<timestamp>.txt``.

        Returns:
            Path: Location of the written file.
        """
        title = video_info.get('title') or 'unknown'
        channel = video_info.get('channel') or 'unknown'

        base_dir = self.output_dir
        if playlist_name:
            base_dir = base_dir / self._safe_dir_name(playlist_name)
        # Channel names come from remote metadata, hence the sanitising.
        channel_dir = base_dir / self._safe_dir_name(channel)
        channel_dir.mkdir(parents=True, exist_ok=True)

        safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_'))[:100]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = channel_dir / f"{safe_title}_{timestamp}.txt"

        header = [
            "YouTube Transkription\n",
            "=" * 50 + "\n\n",
            f"Titel: {title}\n",
            f"Kanal: {channel}\n",
            f"URL: {video_info['url']}\n",
        ]
        if playlist_name:
            header.append(f"Playlist: {playlist_name}\n")
        header += [
            f"Transkribiert am: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}\n",
            f"Whisper Model: {self.model_size}\n",
            "\n" + "=" * 50 + "\n\n",
            "TRANSKRIPTION:\n\n",
        ]
        with open(filepath, 'w', encoding='utf-8') as f:
            f.writelines(header)
            f.write(text)
        return filepath

    def cleanup_temp_files(self):
        """Delete all temporary MP3 files in the temp directory (best effort)."""
        for file in self.temp_dir.glob("*.mp3"):
            try:
                file.unlink()
            except OSError:
                # Skip files that are locked or already gone.
                pass

    def process_video(self, url, language="de", playlist_name=None, progress=None):
        """Process a single video.

        Returns:
            tuple: ``(output_path, was_cached)``. ``output_path`` is ``None``
            if any step fails; ``was_cached`` is true when an existing
            transcript was reused.
        """
        cached = self.is_cached(url)
        if cached:
            return cached['output_file'], True

        # Validates that the video is reachable before downloading.
        video_info = self.get_video_info(url)
        if not video_info:
            return None, False

        try:
            audio_path, download_info = self.download_audio(url, progress)
            if not audio_path:
                return None, False

            transcript, detected_lang = self.transcribe_audio(audio_path, language, progress)
            if not transcript:
                return None, False

            output_path = self.save_transcript(transcript, download_info, playlist_name)

            self.cache[self.get_video_hash(url)] = {
                'url': url,
                'title': download_info['title'],
                'output_file': str(output_path),
                'transcribed_at': datetime.now().isoformat(),
                'model': self.model_size,
                'playlist': playlist_name
            }
            self.save_cache()
        finally:
            # Clean up on success AND on failure, so that leftover audio
            # cannot be mistaken for the download of the next video.
            self.cleanup_temp_files()

        return output_path, False

    def process_playlist(self, playlist_name: str, urls: List[str], language="de"):
        """Transcribe every video of a playlist that is not cached yet.

        Args:
            playlist_name: Name shown in the UI and used as output sub-folder.
            urls: Video URLs of the playlist.
            language: Language code handed to the transcription step.
        """
        console.rule(f"[bold cyan]📋 Playlist: {playlist_name}[/bold cyan]")

        # Split into already transcribed and new videos.
        new_urls = [url for url in urls if not self.is_cached(url)]
        cached_count = len(urls) - len(new_urls)

        table = Table(show_header=False, box=None)
        table.add_column("Info", style="cyan")
        table.add_column("Wert", style="white")
        table.add_row("📊 Gesamt Videos:", str(len(urls)))
        table.add_row("✅ Bereits transkribiert:", str(cached_count))
        table.add_row("🆕 Neu zu transkribieren:", str(len(new_urls)))
        console.print(Panel(table, title="Playlist Status", border_style="cyan"))

        if not new_urls:
            console.print("[green]✅ Alle Videos bereits transkribiert![/green]")
            return

        success_count = 0
        error_count = 0
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            console=console
        ) as progress:
            playlist_task = progress.add_task(
                f"[cyan]Verarbeite {playlist_name}...",
                total=len(new_urls)
            )
            for i, url in enumerate(new_urls, 1):
                progress.update(
                    playlist_task,
                    description=f"[cyan]Video {i}/{len(new_urls)}..."
                )
                output_path, _was_cached = self.process_video(
                    url,
                    language,
                    playlist_name,
                    progress
                )
                if output_path:
                    success_count += 1
                    console.print(f"{Path(output_path).name}")
                else:
                    error_count += 1
                    console.print(f" ❌ Fehler bei: {url}")
                progress.update(playlist_task, advance=1)

        # Summary
        console.print("\n" + "=" * 50)
        console.print(f"[bold green]✅ Erfolgreich: {success_count}[/bold green]")
        if error_count > 0:
            console.print(f"[bold red]❌ Fehler: {error_count}[/bold red]")
        # Show the folder save_transcript really used (sanitised name).
        console.print(f"[bold cyan]📁 Gespeichert in: {self.output_dir}/{self._safe_dir_name(playlist_name)}/[/bold cyan]")
def process_all_playlists(transcriber, playlist_manager, language="de"):
    """Show an overview of all playlists, then transcribe each of them.

    Args:
        transcriber: Object providing ``is_cached`` and ``process_playlist``.
        playlist_manager: Object providing ``get_all_playlists``,
            ``read_playlist``, ``display_playlists_tree`` and ``playlists_dir``.
        language: Language code handed on to ``process_playlist``.
    """
    all_playlists = playlist_manager.get_all_playlists()
    if not all_playlists:
        console.print("[yellow]⚠️ Keine Playlists gefunden![/yellow]")
        console.print(f"Erstelle .txt Dateien in: {playlist_manager.playlists_dir}/")
        return

    console.print("\n[bold cyan]🔍 Gefundene Playlists:[/bold cyan]")
    playlist_manager.display_playlists_tree()

    # Overall statistics across every playlist.
    video_total = 0
    pending_total = 0
    for playlist_path in all_playlists.values():
        playlist_urls = playlist_manager.read_playlist(playlist_path)
        pending_total += len([u for u in playlist_urls if not transcriber.is_cached(u)])
        video_total += len(playlist_urls)
    console.print(f"\n[bold]📊 Gesamt: {video_total} Videos, {pending_total} neu zu transkribieren[/bold]")

    # Process the playlists one after another; empty ones are skipped.
    for playlist_name, playlist_path in all_playlists.items():
        playlist_urls = playlist_manager.read_playlist(playlist_path)
        if not playlist_urls:
            continue
        console.print("\n" + "=" * 60)
        transcriber.process_playlist(playlist_name, playlist_urls, language)

    console.print("\n[bold green]🎉 Alle Playlists verarbeitet![/bold green]")
def main():
    """Command-line entry point for the playlist-based transcriber.

    Commands: ``scan`` (all playlists, or one via ``--playlist``), ``list``
    (show playlists) and ``process`` (a single URL).
    """
    parser = argparse.ArgumentParser(
        description='YouTube Transcriber v3.0 - Playlist Management'
    )
    parser.add_argument(
        'command',
        nargs='?',
        choices=['scan', 'list', 'process'],
        default='scan',
        help='Befehl: scan (alle Playlists), list (zeige Playlists), process (einzelne URL)'
    )
    parser.add_argument('url', nargs='?', help='YouTube URL (nur für process)')
    # Optional flags, registered in the order they appear in --help.
    option_specs = (
        ('--playlist', {'help': 'Spezifische Playlist verarbeiten'}),
        ('--model', {
            'default': 'base',
            'choices': ['tiny', 'base', 'small', 'medium', 'large'],
            'help': 'Whisper Model (default: base)',
        }),
        ('--language', {'default': 'de', 'help': 'Sprache (default: de)'}),
        ('--playlists-dir', {
            'default': 'playlists',
            'help': 'Ordner mit Playlist-Dateien (default: playlists)',
        }),
        ('--output', {
            'default': 'transcripts',
            'help': 'Ausgabe-Ordner (default: transcripts)',
        }),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    console.print(LOGO)

    playlist_manager = PlaylistManager(args.playlists_dir)
    transcriber = YouTubeTranscriber(
        model_size=args.model,
        output_dir=args.output
    )

    command = args.command
    if command == 'scan':
        if not args.playlist:
            # No playlist given: process all of them.
            process_all_playlists(transcriber, playlist_manager, args.language)
        else:
            available = playlist_manager.get_all_playlists()
            if args.playlist not in available:
                console.print(f"[red]❌ Playlist '{args.playlist}' nicht gefunden![/red]")
                console.print("Verfügbare Playlists:")
                for name in available:
                    console.print(f"{name}")
            else:
                selected_urls = playlist_manager.read_playlist(available[args.playlist])
                transcriber.process_playlist(args.playlist, selected_urls, args.language)
    elif command == 'process':
        # Single URL given on the command line.
        if not args.url:
            console.print("[red]❌ Bitte URL angeben für 'process' Befehl[/red]")
        else:
            output, _ = transcriber.process_video(args.url, args.language)
            if output:
                console.print(f"[green]✅ Gespeichert: {output}[/green]")
    elif command == 'list':
        # Only show what is available.
        available = playlist_manager.get_all_playlists()
        if not available:
            console.print("[yellow]Keine Playlists gefunden![/yellow]")
            console.print(f"Erstelle .txt Dateien in: {args.playlists_dir}/")
        else:
            console.print("[bold cyan]📁 Verfügbare Playlists:[/bold cyan]\n")
            playlist_manager.display_playlists_tree()
            console.print("\n[bold]Details:[/bold]")
            for name, path in available.items():
                urls = playlist_manager.read_playlist(path)
                new_count = len([u for u in urls if not transcriber.is_cached(u)])
                console.print(f"{name}: {len(urls)} URLs ({new_count} neu)")

    console.print("\n[bold green]✨ Fertig![/bold green]")


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,559 @@
#!/usr/bin/env python3
"""
YouTube Auto-Transcriber v4.0 - PARALLEL EDITION
Mit Multi-Threading für 3-4x schnellere Verarbeitung
"""
import os
import sys
import json
import argparse
import hashlib
from pathlib import Path
from datetime import datetime
import time
from typing import List, Dict, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue, Empty
import threading
from dataclasses import dataclass
import multiprocessing
import yt_dlp
import whisper
import warnings
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
TimeElapsedColumn,
MofNCompleteColumn
)
from rich.table import Table
from rich.panel import Panel
from rich.live import Live
from rich.layout import Layout
from rich.columns import Columns
from rich import print as rprint
# Suppress every FutureWarning and UserWarning for the whole process.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# Shared Rich console used for all terminal output in this module.
console = Console()
# Start-up banner (Rich markup).
LOGO = """
[bold cyan]
[bold white]🚀 YouTube Transcriber v4.0 - PARALLEL[/bold white]
[dim]Multi-Threading für 3-4x Speed![/dim]
[/bold cyan]
"""
@dataclass
class VideoJob:
    """Data class describing one video job and its processing state."""
    # URL of the video to process.
    url: str
    # Name of the playlist the video was taken from, if any.
    playlist_name: Optional[str] = None
    # Language code handed to the transcription call.
    language: str = "de"
    status: str = "pending"  # pending, downloading, transcribing, completed, failed
    # Error message recorded when the job fails.
    error: Optional[str] = None
    # Path of the written transcript once the job has completed.
    output_path: Optional[str] = None
    # Title and duration as reported by the downloader.
    title: Optional[str] = None
    duration: Optional[int] = None
class ParallelTranscriber:
def __init__(self,
             model_size="base",
             output_dir="transcripts",
             cache_dir=".cache",
             max_downloads=3,
             max_transcriptions=2):
    """Set up directories, worker pools and the cache.

    Args:
        model_size: Name of the Whisper model to use.
        output_dir: Directory that receives the transcript files.
        cache_dir: Directory that holds the transcription cache file.
        max_downloads: Maximum number of parallel downloads (at least 1).
        max_transcriptions: Maximum number of parallel transcriptions
            (at least 1; capped at 2 for ``large`` and at 4 for
            ``tiny``/``base``).
    """
    self.output_dir = Path(output_dir)
    # parents=True so that nested paths such as "data/out" work.
    self.output_dir.mkdir(parents=True, exist_ok=True)
    self.cache_dir = Path(cache_dir)
    self.cache_dir.mkdir(parents=True, exist_ok=True)
    self.cache_file = self.cache_dir / "transcribed_videos.json"
    self.temp_dir = Path("temp_audio")
    self.temp_dir.mkdir(exist_ok=True)

    # Parallel-processing settings. ThreadPoolExecutor rejects
    # max_workers < 1, so both limits are clamped to at least one worker.
    self.max_downloads = max(1, max_downloads)
    self.max_transcriptions = max(1, max_transcriptions)
    # Caps described by the original author as suited to M1/M2 Macs.
    if model_size == "large":
        self.max_transcriptions = min(2, self.max_transcriptions)
    elif model_size in ("tiny", "base"):
        self.max_transcriptions = min(4, self.max_transcriptions)

    # Queues for the pipeline stages.
    self.download_queue = Queue()
    self.transcribe_queue = Queue()
    self.completed_queue = Queue()

    # Thread pools
    self.download_pool = ThreadPoolExecutor(max_workers=self.max_downloads)
    self.transcribe_pool = ThreadPoolExecutor(max_workers=self.max_transcriptions)

    # Job tracking; the lock guards state shared between worker threads.
    self.jobs: Dict[str, VideoJob] = {}
    self.lock = threading.Lock()

    self.cache = self.load_cache()

    # Model settings: relative speed factor per model size.
    self.model_size = model_size
    self.model_speeds = {
        'tiny': 10,
        'base': 7,
        'small': 4,
        'medium': 2,
        'large': 1
    }

    # Progress tracking
    self.progress = None
    self.main_task = None

    console.print("[bold green]⚡ Parallel-Modus aktiviert:[/bold green]")
    console.print(f" • Max Downloads: {self.max_downloads}")
    console.print(f" • Max Transkriptionen: {self.max_transcriptions}")
    console.print(f" • Whisper Model: {model_size}")
def load_cache(self):
    """Load the transcription cache.

    Returns:
        dict: Mapping of URL hash to cache entry. An empty dict is returned
        when the cache file is missing, unreadable or not a JSON object.
    """
    if not self.cache_file.exists():
        return {}
    try:
        with open(self.cache_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError):
        # A truncated or corrupt cache must not prevent start-up; the only
        # cost is transcribing videos again.
        return {}
    # Anything but a JSON object cannot serve as the hash -> entry mapping.
    return data if isinstance(data, dict) else {}
def save_cache(self):
    """Persist the cache to the cache file as JSON.

    A snapshot of the cache is written to a temporary file that is unique
    per thread and then moved over the real file. This keeps the file
    intact if a write is interrupted and avoids dumping a dict that another
    thread is modifying at the same moment.
    """
    snapshot = dict(self.cache)
    tmp_file = self.cache_file.with_name(
        f"{self.cache_file.name}.{threading.get_ident()}.tmp"
    )
    with open(tmp_file, 'w', encoding='utf-8') as f:
        json.dump(snapshot, f, indent=2)
    tmp_file.replace(self.cache_file)
def get_video_hash(self, url):
    """Return the MD5 hex digest of ``url``; used as key into the cache."""
    encoded_url = url.encode()
    return hashlib.md5(encoded_url).hexdigest()
def is_cached(self, url):
    """Look up ``url`` in the cache.

    Returns:
        The cache entry if one exists and its ``output_file`` is still on
        disk, otherwise ``None``.
    """
    key = self.get_video_hash(url)
    if key not in self.cache:
        return None
    entry = self.cache[key]
    if not Path(entry['output_file']).exists():
        return None
    return entry
def download_worker(self, job: VideoJob) -> Tuple[Optional[Path], Dict]:
    """Download the audio of one job as MP3; runs in a worker thread.

    Args:
        job: The job to download. Its ``status``, ``title``, ``duration``
            and ``error`` fields are updated under ``self.lock``.

    Returns:
        ``(audio_file, video_info)`` with ``video_info`` holding ``title``,
        ``channel``, ``duration`` and ``url``; ``(None, {})`` on failure.
    """
    try:
        with self.lock:
            job.status = "downloading"

        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            # The video id in the file name keeps parallel downloads apart.
            'outtmpl': str(self.temp_dir / '%(id)s_%(title)s.%(ext)s'),
            'quiet': True,
            'no_warnings': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(job.url, download=True)
            # Ask yt-dlp for the file name it used; the audio
            # post-processor configured above converts to .mp3.
            audio_file = Path(ydl.prepare_filename(info)).with_suffix('.mp3')

        if not audio_file.exists():
            # Fallback: search by video id. An empty id must never be used,
            # because "*.mp3" would match the files of other jobs.
            video_id = info.get('id') or ''
            audio_files = list(self.temp_dir.glob(f"{video_id}*.mp3")) if video_id else []
            if not audio_files:
                raise Exception("Audio-Datei nicht gefunden")
            audio_file = audio_files[0]

        video_info = {
            'title': info.get('title', 'unknown'),
            'channel': info.get('uploader', 'unknown'),
            'duration': info.get('duration', 0),
            'url': job.url
        }
        with self.lock:
            job.title = video_info['title']
            job.duration = video_info['duration']
        return audio_file, video_info
    except Exception as e:
        with self.lock:
            job.status = "failed"
            job.error = str(e)
        console.print(f"[red]❌ Download-Fehler für {job.url}: {e}[/red]")
        return None, {}
def transcribe_worker(self, model, audio_path: Path, job: VideoJob, video_info: Dict) -> Optional[str]:
    """Transcribe one downloaded audio file; runs in a worker thread.

    Args:
        model: Whisper model instance to use for this call.
        audio_path: Path of the audio file; deleted after success.
        job: The job being processed; ``status``, ``output_path`` and
            ``error`` are updated under ``self.lock``.
        video_info: Metadata dictionary produced by ``download_worker``.

    Returns:
        The transcript path as string, or ``None`` if transcription fails.
    """
    try:
        with self.lock:
            job.status = "transcribing"

        result = model.transcribe(
            str(audio_path),
            language=job.language,
            verbose=False,
            fp16=False  # as in the original: "for M1 Mac"
        )
        transcript = result['text']

        output_path = self.save_transcript(transcript, video_info, job.playlist_name)

        # The cache dict and its file are shared by all worker threads, so
        # updating and saving must happen under the lock.
        with self.lock:
            self.cache[self.get_video_hash(job.url)] = {
                'url': job.url,
                'title': video_info['title'],
                'output_file': str(output_path),
                'transcribed_at': datetime.now().isoformat(),
                'model': self.model_size,
                'playlist': job.playlist_name
            }
            self.save_cache()

        # The audio file is no longer needed (best effort).
        try:
            audio_path.unlink()
        except OSError:
            pass

        with self.lock:
            job.status = "completed"
            job.output_path = str(output_path)
        return str(output_path)
    except Exception as e:
        with self.lock:
            job.status = "failed"
            job.error = str(e)
        console.print(f"[red]❌ Transkriptions-Fehler: {e}[/red]")
        return None
def save_transcript(self, text, video_info, playlist_name=None):
    """Write the transcript and a metadata header to a text file.

    The file is stored as
    ``<output_dir>[/<playlist>]/<channel>/<title>_<timestamp>.txt``.

    Args:
        text: Transcribed text.
        video_info: Dictionary with ``title``, ``channel`` and ``url``.
        playlist_name: Optional playlist name, used as sub-folder and noted
            in the header.

    Returns:
        Path: Location of the written file.
    """
    def safe_dir_name(name):
        # Names come from user input or remote metadata: neutralise path
        # separators and names that would resolve outside the target folder.
        cleaned = (name or '').replace('/', '_').replace('\\', '_').strip()
        return 'unknown' if cleaned in ('', '.', '..') else cleaned

    title = video_info.get('title') or 'unknown'
    channel = video_info.get('channel') or 'unknown'

    base_dir = self.output_dir
    if playlist_name:
        base_dir = base_dir / safe_dir_name(playlist_name)
    channel_dir = base_dir / safe_dir_name(channel)
    channel_dir.mkdir(parents=True, exist_ok=True)

    safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_'))[:100]
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filepath = channel_dir / f"{safe_title}_{timestamp}.txt"

    header = [
        "YouTube Transkription\n",
        "=" * 50 + "\n\n",
        f"Titel: {title}\n",
        f"Kanal: {channel}\n",
        f"URL: {video_info['url']}\n",
    ]
    if playlist_name:
        header.append(f"Playlist: {playlist_name}\n")
    header += [
        f"Transkribiert am: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}\n",
        f"Whisper Model: {self.model_size}\n",
        "\n" + "=" * 50 + "\n\n",
        "TRANSKRIPTION:\n\n",
    ]

    with open(filepath, 'w', encoding='utf-8') as f:
        f.writelines(header)
        f.write(text)
    return filepath
def process_pipeline(self, urls: List[str], playlist_name: Optional[str] = None, language: str = "de"):
    """Download and transcribe videos with overlapping download/transcription stages.

    URLs for which ``self.is_cached`` is true are skipped. Each remaining
    URL gets a ``VideoJob`` that is registered in ``self.jobs``, downloaded
    via ``self.download_pool`` and then transcribed via
    ``self.transcribe_pool`` using one Whisper model from a small pool.
    Progress and a final summary are printed to the console.

    A finished download waits in a queue until a model is free. Without
    that queue, a download completing while all models were busy would
    never be transcribed.

    Args:
        urls: Video URLs to process.
        playlist_name: Optional playlist name passed on to every job.
        language: Language code passed on to every job.

    Returns:
        None.
    """
    # Local import keeps this block self-contained with respect to the
    # module's import section.
    from collections import deque

    # Skip videos that are already transcribed.
    jobs_to_process = []
    cached_count = 0
    for url in urls:
        if self.is_cached(url):
            cached_count += 1
            continue
        job = VideoJob(url=url, playlist_name=playlist_name, language=language)
        self.jobs[url] = job
        jobs_to_process.append(job)

    if not jobs_to_process:
        console.print("[green]✅ Alle Videos bereits transkribiert![/green]")
        return

    # Status overview
    console.print(Panel(
        f"[bold]🚀 Starte parallele Verarbeitung[/bold]\n\n"
        f"📊 Gesamt: {len(urls)} Videos\n"
        f"✅ Gecached: {cached_count}\n"
        f"🆕 Zu verarbeiten: {len(jobs_to_process)}\n\n"
        f"⚡ Downloads: {self.max_downloads} parallel\n"
        f"🎙️ Transkriptionen: {self.max_transcriptions} parallel",
        border_style="cyan"
    ))

    # At least one download slot and one model are required, otherwise the
    # scheduling loop below could never make progress.
    download_slots = max(1, self.max_downloads)
    model_count = max(1, self.max_transcriptions)

    # Load one Whisper model per concurrent transcription.
    console.print(f"\n[cyan]⏳ Lade {model_count}x Whisper {self.model_size} Modelle...[/cyan]")
    models = []
    for i in range(model_count):
        models.append(whisper.load_model(self.model_size))
        console.print(f"  ✅ Model {i+1}/{model_count} geladen")

    completed_count = 0

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        console=console
    ) as progress:
        main_task = progress.add_task(
            "[cyan]Verarbeite Videos...",
            total=len(jobs_to_process)
        )

        download_futures = {}      # future -> job
        transcribe_futures = {}    # future -> (job, model)
        model_pool = list(models)  # models that are currently idle
        pending_downloads = deque(jobs_to_process)
        # Finished downloads waiting for a free model:
        # (job, audio_path, video_info)
        ready_for_transcription = deque()

        while (download_futures or transcribe_futures
               or pending_downloads or ready_for_transcription):
            # Keep the download slots filled.
            while pending_downloads and len(download_futures) < download_slots:
                next_job = pending_downloads.popleft()
                future = self.download_pool.submit(self.download_worker, next_job)
                download_futures[future] = next_job

            # Collect finished downloads.
            for future in [f for f in download_futures if f.done()]:
                job = download_futures.pop(future)
                # NOTE(review): an exception raised inside download_worker
                # propagates from result() here, as it did before.
                audio_path, video_info = future.result()
                if audio_path:
                    ready_for_transcription.append((job, audio_path, video_info))
                else:
                    # No audio file: nothing to transcribe, but the job is
                    # finished as far as the progress bar is concerned.
                    progress.update(main_task, advance=1)

            # Collect finished transcriptions and return their models.
            for future in [f for f in transcribe_futures if f.done()]:
                job, model = transcribe_futures.pop(future)
                result = future.result()
                model_pool.append(model)
                # Advance for failures too, so the bar can reach its total.
                progress.update(main_task, advance=1)
                if result:
                    completed_count += 1
                    # NOTE(review): falls back to the URL when the title is
                    # empty/None - confirm against VideoJob.
                    label = (job.title or job.url)[:50]
                    console.print(f"{label}")
                else:
                    console.print(f"  ❌ Fehler bei: {job.url}")

            # Hand waiting audio files to idle models.
            while ready_for_transcription and model_pool:
                job, audio_path, video_info = ready_for_transcription.popleft()
                model = model_pool.pop()
                trans_future = self.transcribe_pool.submit(
                    self.transcribe_worker, model, audio_path, job, video_info
                )
                transcribe_futures[trans_future] = (job, model)

            # Short pause so the polling loop does not spin the CPU.
            time.sleep(0.1)

    # Summary
    console.print("\n" + "=" * 60)
    console.print("[bold green]✅ Verarbeitung abgeschlossen![/bold green]")
    console.print(f"Erfolgreich: {completed_count}/{len(jobs_to_process)}")

    # List failed jobs, if any.
    failed_jobs = [j for j in jobs_to_process if j.status == "failed"]
    if failed_jobs:
        console.print("\n[red]Fehlerhafte Videos:[/red]")
        for job in failed_jobs:
            console.print(f"{job.url}: {job.error}")
def process_playlist_file(self, playlist_path: Path, language: str = "de"):
    """Read video URLs from a playlist file and run them through the pipeline.

    Each line is stripped. Blank lines and lines starting with ``#`` are
    ignored; of the rest, only lines containing ``youtube.com`` or
    ``youtu.be`` are kept. The file name without its suffix is used as the
    playlist name. If no URL is found, a notice is printed instead.
    """
    def looks_like_video_url(entry):
        if not entry or entry.startswith('#'):
            return False
        return 'youtube.com' in entry or 'youtu.be' in entry

    with open(playlist_path, 'r', encoding='utf-8') as handle:
        stripped_lines = [raw.strip() for raw in handle]

    urls = list(filter(looks_like_video_url, stripped_lines))

    if not urls:
        console.print(f"[yellow]Keine URLs in {playlist_path}[/yellow]")
        return

    self.process_pipeline(urls, playlist_path.stem, language)
def benchmark_parallel_vs_sequential():
    """Print a comparison table of sequential versus parallel processing.

    The figures are hard-coded estimates written into this function; no
    measurement is performed here.
    """
    console.print("\n[bold cyan]📊 Performance-Vergleich[/bold cyan]")

    # (column heading, style)
    columns = (
        ("Modus", "cyan"),
        ("10 Videos (je 5 Min)", "white"),
        ("Speedup", "green"),
    )
    # (mode, duration, speedup)
    rows = (
        ("Sequenziell (v3)", "~50 Minuten", "1x"),
        ("Parallel 2 Downloads", "~25 Minuten", "2x"),
        ("Parallel 3 Downloads + 2 Transkriptionen", "~15 Minuten", "3.3x"),
    )

    table = Table(title="Geschwindigkeitsvergleich")
    for heading, colour in columns:
        table.add_column(heading, style=colour)
    for cells in rows:
        table.add_row(*cells)

    console.print(table)
def main():
    """Command-line entry point.

    Parses the arguments, prints the logo and then either shows the
    benchmark table or runs the transcriber on a playlist file or on the
    URLs given directly. Without a playlist or URLs, usage examples are
    printed.
    """
    parser = argparse.ArgumentParser(
        description='YouTube Transcriber v4.0 - PARALLEL EDITION'
    )

    # (names, add_argument keyword options), registered in this order.
    argument_specs = [
        (('command',), {
            'nargs': '?',
            'choices': ['process', 'benchmark'],
            'default': 'process',
            'help': 'Befehl: process oder benchmark',
        }),
        (('--playlist',), {
            'help': 'Playlist-Datei',
        }),
        (('--urls',), {
            'nargs': '+',
            'help': 'Direkte URL-Liste',
        }),
        (('--model',), {
            'default': 'base',
            'choices': ['tiny', 'base', 'small', 'medium', 'large'],
            'help': 'Whisper Model',
        }),
        (('--language',), {
            'default': 'de',
            'help': 'Sprache',
        }),
        (('--max-downloads',), {
            'type': int,
            'default': 3,
            'help': 'Max parallele Downloads (default: 3)',
        }),
        (('--max-transcriptions',), {
            'type': int,
            'default': 2,
            'help': 'Max parallele Transkriptionen (default: 2)',
        }),
    ]
    for names, options in argument_specs:
        parser.add_argument(*names, **options)

    args = parser.parse_args()

    # Show the logo
    console.print(LOGO)

    if args.command == 'benchmark':
        benchmark_parallel_vs_sequential()
        return

    # Set up the parallel transcriber
    transcriber = ParallelTranscriber(
        model_size=args.model,
        max_downloads=args.max_downloads,
        max_transcriptions=args.max_transcriptions
    )

    if args.playlist:
        # A playlist file takes precedence over --urls.
        playlist_path = Path(args.playlist)
        if not playlist_path.exists():
            console.print(f"[red]Playlist nicht gefunden: {args.playlist}[/red]")
            return
        transcriber.process_playlist_file(playlist_path, args.language)
        return

    if args.urls:
        # URLs passed directly on the command line
        transcriber.process_pipeline(args.urls, language=args.language)
        return

    # Nothing to do: print usage examples.
    console.print("[yellow]Bitte URLs oder Playlist angeben![/yellow]")
    console.print("\nBeispiele:")
    console.print("  python3 transcriber_v4_parallel.py --urls URL1 URL2 URL3")
    console.print("  python3 transcriber_v4_parallel.py --playlist playlists/tech/python.txt")
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()