managarten/apps-archived/wisekeep/legacy/transcriber_v4_parallel.py
Till-JS 61d181fbc2 chore: archive inactive projects to apps-archived/
Move inactive projects out of active workspace:
- bauntown (community website)
- maerchenzauber (AI story generation)
- memoro (voice memo app)
- news (news aggregation)
- nutriphi (nutrition tracking)
- reader (reading app)
- uload (URL shortener)
- wisekeep (AI wisdom extraction)

Update CLAUDE.md documentation:
- Add presi to active projects
- Document archived projects section
- Update workspace configuration

Archived apps can be re-activated by moving back to apps/

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-29 07:03:59 +01:00

559 lines
No EOL
19 KiB
Python

#!/usr/bin/env python3
"""
YouTube Auto-Transcriber v4.0 - PARALLEL EDITION
Mit Multi-Threading für 3-4x schnellere Verarbeitung
"""
import os
import sys
import json
import argparse
import hashlib
from pathlib import Path
from datetime import datetime
import time
from typing import List, Dict, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue, Empty
import threading
from dataclasses import dataclass
import multiprocessing
import yt_dlp
import whisper
import warnings
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
TimeElapsedColumn,
MofNCompleteColumn
)
from rich.table import Table
from rich.panel import Panel
from rich.live import Live
from rich.layout import Layout
from rich.columns import Columns
from rich import print as rprint
# Suppress FutureWarning and UserWarning messages for the whole process.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# Shared rich console; every message in this module is printed through it.
console = Console()
# ASCII-art banner (rich markup) that main() prints on start-up.
LOGO = """
[bold cyan]╔═══════════════════════════════════════════════════════╗
║ ║
║ [bold white]🚀 YouTube Transcriber v4.0 - PARALLEL[/bold white] ║
║ [dim]Multi-Threading für 3-4x Speed![/dim] ║
║ ║
╚═══════════════════════════════════════════════════════╝[/bold cyan]
"""
@dataclass
class VideoJob:
    """Mutable record describing one video on its way through the pipeline.

    Only ``url`` is required. The workers of ``ParallelTranscriber`` fill in
    the remaining fields while the job is downloaded and transcribed.
    """

    # Address of the video to download.
    url: str
    # Name of the playlist the URL came from; None for directly passed URLs.
    playlist_name: Optional[str] = None
    # Language code handed to the Whisper model.
    language: str = "de"
    # Lifecycle state: pending, downloading, transcribing, completed, failed.
    status: str = "pending"
    # Text of the exception that made the job fail, if any.
    error: Optional[str] = None
    # Path of the written transcript once the job has completed.
    output_path: Optional[str] = None
    # Video title as reported by the downloader.
    title: Optional[str] = None
    # Video duration as reported by the downloader.
    duration: Optional[int] = None
class ParallelTranscriber:
    """Download YouTube audio and transcribe it with Whisper in parallel.

    Downloads and transcriptions run in two separate thread pools. Every
    finished transcript is written below ``output_dir`` and recorded in a JSON
    cache file so that already transcribed URLs are skipped on later runs.
    """

    def __init__(self,
                 model_size="base",
                 output_dir="transcripts",
                 cache_dir=".cache",
                 max_downloads=3,
                 max_transcriptions=2):
        """Create the working directories, thread pools and load the cache.

        Args:
            model_size: Whisper model name passed to ``whisper.load_model``.
            output_dir: Directory that receives the transcript files.
            cache_dir: Directory that holds ``transcribed_videos.json``.
            max_downloads: Maximum number of parallel downloads.
            max_transcriptions: Maximum number of parallel transcriptions.
                Capped at 2 for the ``large`` model and at 4 for
                ``tiny``/``base``.

        Raises:
            ValueError: From ``ThreadPoolExecutor`` if a worker count is < 1.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_file = self.cache_dir / "transcribed_videos.json"
        self.temp_dir = Path("temp_audio")
        self.temp_dir.mkdir(exist_ok=True)

        # Parallel processing settings
        self.max_downloads = max_downloads
        self.max_transcriptions = max_transcriptions
        # Caps chosen by the original author for M1/M2 Macs
        if model_size == "large":
            self.max_transcriptions = min(2, max_transcriptions)
        elif model_size in ["tiny", "base"]:
            self.max_transcriptions = min(4, max_transcriptions)

        # Queues for the pipeline (kept for compatibility; not used below)
        self.download_queue = Queue()
        self.transcribe_queue = Queue()
        self.completed_queue = Queue()

        # Thread pools
        self.download_pool = ThreadPoolExecutor(max_workers=self.max_downloads)
        self.transcribe_pool = ThreadPoolExecutor(max_workers=self.max_transcriptions)

        # Job tracking; the lock guards job fields and the cache
        self.jobs: Dict[str, "VideoJob"] = {}
        self.lock = threading.Lock()

        # Load cache
        self.cache = self.load_cache()

        # Model settings
        self.model_size = model_size
        self.model_speeds = {
            'tiny': 10,
            'base': 7,
            'small': 4,
            'medium': 2,
            'large': 1
        }

        # Progress tracking
        self.progress = None
        self.main_task = None

        console.print("[bold green]⚡ Parallel-Modus aktiviert:[/bold green]")
        console.print(f" • Max Downloads: {self.max_downloads}")
        console.print(f" • Max Transkriptionen: {self.max_transcriptions}")
        console.print(f" • Whisper Model: {model_size}")

    def load_cache(self):
        """Return the cache stored on disk.

        Returns:
            The decoded cache dict, or ``{}`` when the file does not exist,
            cannot be decoded, or does not contain a JSON object.
        """
        if not self.cache_file.exists():
            return {}
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            # A damaged cache must not prevent the tool from starting.
            console.print(f"[yellow]⚠️ Cache nicht lesbar, starte mit leerem Cache: {e}[/yellow]")
            return {}
        return data if isinstance(data, dict) else {}

    def save_cache(self):
        """Write ``self.cache`` to the cache file.

        The data is written to a temporary sibling file first and then moved
        over the real file, so an interrupted write cannot leave a truncated
        cache behind. This method does not take ``self.lock`` itself; worker
        threads must hold it while calling.
        """
        tmp_file = self.cache_file.with_name(self.cache_file.name + ".tmp")
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(self.cache, f, indent=2)
        tmp_file.replace(self.cache_file)

    def get_video_hash(self, url):
        """Return the MD5 hex digest of *url*, used as the cache key."""
        return hashlib.md5(url.encode()).hexdigest()

    def is_cached(self, url):
        """Return the cache entry for *url* if its transcript file still exists.

        Returns:
            The cached info dict, or ``None`` when the URL is unknown or the
            recorded output file is missing.
        """
        cached_info = self.cache.get(self.get_video_hash(url))
        if not cached_info:
            return None
        output_file = cached_info.get('output_file')
        if output_file and Path(output_file).exists():
            return cached_info
        return None

    def download_worker(self, job: "VideoJob") -> Tuple[Optional[Path], Dict]:
        """Download the audio track of *job* as MP3 (runs in a pool thread).

        Returns:
            ``(audio_file, video_info)`` on success. On any error the job is
            marked ``failed`` and ``(None, {})`` is returned; the exception is
            reported on the console and not re-raised.
        """
        try:
            with self.lock:
                job.status = "downloading"
            ydl_opts = {
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }],
                'outtmpl': str(self.temp_dir / '%(id)s_%(title)s.%(ext)s'),
                'quiet': True,
                'no_warnings': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(job.url, download=True)

            # Locate the downloaded file via its id prefix
            video_id = info.get('id', '')
            if not video_id:
                # An empty id would turn the pattern into "*.mp3" and could
                # pick up the audio file of another, parallel download.
                raise RuntimeError("Video-ID nicht gefunden")
            audio_files = list(self.temp_dir.glob(f"{video_id}*.mp3"))
            if not audio_files:
                raise RuntimeError("Audio-Datei nicht gefunden")
            audio_file = audio_files[0]

            video_info = {
                'title': info.get('title', 'unknown'),
                'channel': info.get('uploader', 'unknown'),
                'duration': info.get('duration', 0),
                'url': job.url
            }
            with self.lock:
                job.title = video_info['title']
                job.duration = video_info['duration']
            return audio_file, video_info
        except Exception as e:
            with self.lock:
                job.status = "failed"
                job.error = str(e)
            console.print(f"[red]❌ Download-Fehler für {job.url}: {e}[/red]")
            return None, {}

    def transcribe_worker(self, model, audio_path: Path, job: "VideoJob", video_info: Dict) -> Optional[str]:
        """Transcribe *audio_path* with *model* (runs in a pool thread).

        On success the transcript is saved, the cache is updated and persisted
        and the audio file is deleted.

        Returns:
            The transcript path as a string, or ``None`` on failure (the job is
            then marked ``failed`` and the error is printed, not re-raised).
        """
        try:
            with self.lock:
                job.status = "transcribing"

            result = model.transcribe(
                str(audio_path),
                language=job.language,
                verbose=False,
                fp16=False  # for M1 Macs
            )
            transcript = result['text']

            output_path = self.save_transcript(transcript, video_info, job.playlist_name)

            cache_entry = {
                'url': job.url,
                'title': video_info['title'],
                'output_file': str(output_path),
                'transcribed_at': datetime.now().isoformat(),
                'model': self.model_size,
                'playlist': job.playlist_name
            }
            # Several transcription threads share the cache dict and the cache
            # file; update and persist them under the lock.
            with self.lock:
                self.cache[self.get_video_hash(job.url)] = cache_entry
                self.save_cache()

            # Removing the temporary audio file is best effort only
            try:
                audio_path.unlink()
            except OSError:
                pass

            with self.lock:
                job.status = "completed"
                job.output_path = str(output_path)
            return str(output_path)
        except Exception as e:
            with self.lock:
                job.status = "failed"
                job.error = str(e)
            console.print(f"[red]❌ Transkriptions-Fehler: {e}[/red]")
            return None

    def save_transcript(self, text, video_info, playlist_name=None):
        """Write *text* with a metadata header to a new transcript file.

        The file is placed in ``output_dir[/playlist]/channel`` and named after
        the sanitised title plus a timestamp.

        Args:
            text: Transcribed text.
            video_info: Dict with the keys ``title``, ``channel`` and ``url``.
            playlist_name: Optional playlist used as an extra directory level.

        Returns:
            Path of the written file.
        """
        base_dir = self.output_dir
        if playlist_name:
            base_dir = base_dir / playlist_name.replace('/', '_')
        base_dir.mkdir(parents=True, exist_ok=True)
        channel_dir = base_dir / video_info['channel'].replace('/', '_')
        channel_dir.mkdir(parents=True, exist_ok=True)

        # Keep only characters that are safe in file names; cap the length
        safe_title = "".join(c for c in video_info['title'] if c.isalnum() or c in (' ', '-', '_'))[:100]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = channel_dir / f"{safe_title}_{timestamp}.txt"

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("YouTube Transkription\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"Titel: {video_info['title']}\n")
            f.write(f"Kanal: {video_info['channel']}\n")
            f.write(f"URL: {video_info['url']}\n")
            if playlist_name:
                f.write(f"Playlist: {playlist_name}\n")
            f.write(f"Transkribiert am: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}\n")
            f.write(f"Whisper Model: {self.model_size}\n")
            f.write("\n" + "=" * 50 + "\n\n")
            f.write("TRANSKRIPTION:\n\n")
            f.write(text)
        return filepath

    def _run_jobs(self, jobs, models):
        """Drive *jobs* through download and transcription.

        At most ``max_downloads`` downloads are in flight. A finished download
        waits in a FIFO until one of *models* is idle, so no download is lost
        when all models are busy.

        Args:
            jobs: Jobs to process, in submission order.
            models: Loaded models; each one is used by one transcription at a
                time.

        Yields:
            ``(job, output_path, stage)`` exactly once per job. ``output_path``
            is ``None`` on failure and ``stage`` is ``"download"`` or
            ``"transcribe"``, naming the step that produced the result.

        Raises:
            ValueError: If *models* is empty.
        """
        from concurrent.futures import FIRST_COMPLETED, wait

        idle_models = list(models)
        if not idle_models:
            raise ValueError("at least one model is required")

        job_iter = iter(jobs)
        downloads = {}       # future -> job
        transcriptions = {}  # future -> (job, model)
        ready = []           # (job, audio_path, video_info) awaiting a model

        def start_next_download():
            next_job = next(job_iter, None)
            if next_job is not None:
                future = self.download_pool.submit(self.download_worker, next_job)
                downloads[future] = next_job

        for _ in range(self.max_downloads):
            start_next_download()

        while downloads or transcriptions:
            # Block until something finishes instead of polling
            done, _pending = wait([*downloads, *transcriptions], return_when=FIRST_COMPLETED)
            for future in done:
                if future in downloads:
                    job = downloads.pop(future)
                    audio_path, video_info = future.result()
                    start_next_download()
                    if audio_path:
                        ready.append((job, audio_path, video_info))
                    else:
                        yield job, None, "download"
                else:
                    job, model = transcriptions.pop(future)
                    idle_models.append(model)
                    yield job, future.result(), "transcribe"

            # Hand waiting downloads to idle models, oldest first
            while ready and idle_models:
                job, audio_path, video_info = ready.pop(0)
                model = idle_models.pop()
                future = self.transcribe_pool.submit(
                    self.transcribe_worker, model, audio_path, job, video_info
                )
                transcriptions[future] = (job, model)

    def process_pipeline(self, urls: List[str], playlist_name: Optional[str] = None, language: str = "de"):
        """Download and transcribe all *urls* that are not cached yet.

        Duplicate URLs are processed once. The progress bar advances for every
        finished job, successful or not; failures are listed in the summary.

        Args:
            urls: Video URLs to process.
            playlist_name: Optional playlist name used for the output folder.
            language: Language code passed to Whisper.
        """
        # Two parallel jobs for the same URL would fight over one temp file
        unique_urls = list(dict.fromkeys(urls))

        # Skip videos that were transcribed before
        jobs_to_process = []
        cached_count = 0
        for url in unique_urls:
            if self.is_cached(url):
                cached_count += 1
            else:
                job = VideoJob(url=url, playlist_name=playlist_name, language=language)
                self.jobs[url] = job
                jobs_to_process.append(job)

        if not jobs_to_process:
            console.print("[green]✅ Alle Videos bereits transkribiert![/green]")
            return

        # Status overview
        console.print(Panel(
            f"[bold]🚀 Starte parallele Verarbeitung[/bold]\n\n"
            f"📊 Gesamt: {len(unique_urls)} Videos\n"
            f"✅ Gecached: {cached_count}\n"
            f"🆕 Zu verarbeiten: {len(jobs_to_process)}\n\n"
            f"⚡ Downloads: {self.max_downloads} parallel\n"
            f"🎙️ Transkriptionen: {self.max_transcriptions} parallel",
            border_style="cyan"
        ))

        # Load one Whisper model per transcription thread
        console.print(f"\n[cyan]⏳ Lade {self.max_transcriptions}x Whisper {self.model_size} Modelle...[/cyan]")
        models = []
        for i in range(self.max_transcriptions):
            models.append(whisper.load_model(self.model_size))
            console.print(f" ✅ Model {i+1}/{self.max_transcriptions} geladen")

        completed_count = 0
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
            console=console
        ) as progress:
            main_task = progress.add_task(
                "[cyan]Verarbeite Videos...",
                total=len(jobs_to_process)
            )
            for job, output_path, stage in self._run_jobs(jobs_to_process, models):
                progress.update(main_task, advance=1)
                if output_path:
                    completed_count += 1
                    console.print(f"{(job.title or job.url)[:50]}")
                elif stage == "transcribe":
                    # Download errors were already reported with their URL by
                    # the download worker.
                    console.print(f" ❌ Fehler bei: {job.url}")

        # Summary
        console.print("\n" + "=" * 60)
        console.print("[bold green]✅ Verarbeitung abgeschlossen![/bold green]")
        console.print(f"Erfolgreich: {completed_count}/{len(jobs_to_process)}")

        # List failures, if any
        failed_jobs = [j for j in jobs_to_process if j.status == "failed"]
        if failed_jobs:
            console.print("\n[red]Fehlerhafte Videos:[/red]")
            for job in failed_jobs:
                console.print(f"{job.url}: {job.error}")

    def process_playlist_file(self, playlist_path: Path, language: str = "de"):
        """Process every YouTube URL listed in a playlist text file.

        Blank lines, lines starting with ``#`` and lines that contain neither
        ``youtube.com`` nor ``youtu.be`` are ignored. The file stem is used as
        the playlist name.
        """
        urls = []
        with open(playlist_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                if 'youtube.com' in line or 'youtu.be' in line:
                    urls.append(line)

        if urls:
            self.process_pipeline(urls, playlist_path.stem, language)
        else:
            console.print(f"[yellow]Keine URLs in {playlist_path}[/yellow]")
def benchmark_parallel_vs_sequential():
    """Print a comparison table of processing modes.

    The durations and speed-up factors are fixed example figures; nothing is
    measured here.
    """
    console.print("\n[bold cyan]📊 Performance-Vergleich[/bold cyan]")

    column_specs = (
        ("Modus", "cyan"),
        ("10 Videos (je 5 Min)", "white"),
        ("Speedup", "green"),
    )
    rows = (
        ("Sequenziell (v3)", "~50 Minuten", "1x"),
        ("Parallel 2 Downloads", "~25 Minuten", "2x"),
        ("Parallel 3 Downloads + 2 Transkriptionen", "~15 Minuten", "3.3x"),
    )

    table = Table(title="Geschwindigkeitsvergleich")
    for header, colour in column_specs:
        table.add_column(header, style=colour)
    for cells in rows:
        table.add_row(*cells)
    console.print(table)
def _positive_int(value):
    """argparse ``type`` callable: parse *value* as an integer >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"ungültige Ganzzahl: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"muss mindestens 1 sein: {value!r}")
    return number


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including worker
            counts below 1, which would otherwise fail later inside
            ``ThreadPoolExecutor``.
    """
    parser = argparse.ArgumentParser(
        description='YouTube Transcriber v4.0 - PARALLEL EDITION'
    )
    parser.add_argument(
        'command',
        nargs='?',
        choices=['process', 'benchmark'],
        default='process',
        help='Befehl: process oder benchmark'
    )
    parser.add_argument(
        '--playlist',
        help='Playlist-Datei'
    )
    parser.add_argument(
        '--urls',
        nargs='+',
        help='Direkte URL-Liste'
    )
    parser.add_argument(
        '--model',
        default='base',
        choices=['tiny', 'base', 'small', 'medium', 'large'],
        help='Whisper Model'
    )
    parser.add_argument(
        '--language',
        default='de',
        help='Sprache'
    )
    parser.add_argument(
        '--max-downloads',
        type=_positive_int,
        default=3,
        help='Max parallele Downloads (default: 3)'
    )
    parser.add_argument(
        '--max-transcriptions',
        type=_positive_int,
        default=2,
        help='Max parallele Transkriptionen (default: 2)'
    )
    args = parser.parse_args(argv)

    # Show the banner
    console.print(LOGO)

    if args.command == 'benchmark':
        benchmark_parallel_vs_sequential()
        return

    # Set up the parallel transcriber
    transcriber = ParallelTranscriber(
        model_size=args.model,
        max_downloads=args.max_downloads,
        max_transcriptions=args.max_transcriptions
    )

    if args.playlist:
        # A playlist file takes precedence over --urls
        playlist_path = Path(args.playlist)
        if playlist_path.exists():
            transcriber.process_playlist_file(playlist_path, args.language)
        else:
            console.print(f"[red]Playlist nicht gefunden: {args.playlist}[/red]")
    elif args.urls:
        # Process the URLs given on the command line
        transcriber.process_pipeline(args.urls, language=args.language)
    else:
        console.print("[yellow]Bitte URLs oder Playlist angeben![/yellow]")
        console.print("\nBeispiele:")
        console.print(" python3 transcriber_v4_parallel.py --urls URL1 URL2 URL3")
        console.print(" python3 transcriber_v4_parallel.py --playlist playlists/tech/python.txt")


if __name__ == "__main__":
    main()