Mirror of https://github.com/Memo-2023/mana-monorepo.git
Synced 2026-05-15 01:21:09 +02:00
Move inactive projects out of active workspace:
- bauntown (community website)
- maerchenzauber (AI story generation)
- memoro (voice memo app)
- news (news aggregation)
- nutriphi (nutrition tracking)
- reader (reading app)
- uload (URL shortener)
- wisekeep (AI wisdom extraction)

Update CLAUDE.md documentation:
- Add presi to active projects
- Document archived projects section
- Update workspace configuration

Archived apps can be re-activated by moving back to apps/

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
559 lines · No EOL · 19 KiB · Python
#!/usr/bin/env python3
"""
YouTube Auto-Transcriber v4.0 - PARALLEL EDITION
Multi-threaded for 3-4x faster processing
"""

import json
import argparse
import hashlib
import threading
import time
import warnings
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

import yt_dlp
import whisper

from rich.console import Console
from rich.progress import (
    Progress,
    SpinnerColumn,
    TextColumn,
    BarColumn,
    TimeRemainingColumn,
    TimeElapsedColumn,
    MofNCompleteColumn,
)
from rich.table import Table
from rich.panel import Panel

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

console = Console()

# ASCII art logo
LOGO = """
[bold cyan]╔═══════════════════════════════════════════════════════╗
║                                                       ║
║     [bold white]🚀 YouTube Transcriber v4.0 - PARALLEL[/bold white]        ║
║     [dim]Multi-threading for 3-4x speed![/dim]                   ║
║                                                       ║
╚═══════════════════════════════════════════════════════╝[/bold cyan]
"""


@dataclass
class VideoJob:
    """Data class for video jobs."""
    url: str
    playlist_name: Optional[str] = None
    language: str = "de"
    status: str = "pending"  # pending, downloading, transcribing, completed, failed
    error: Optional[str] = None
    output_path: Optional[str] = None
    title: Optional[str] = None
    duration: Optional[int] = None
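
# Example (placeholder URL): VideoJob(url="https://youtu.be/VIDEO_ID", playlist_name="tech", language="de")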


class ParallelTranscriber:
    def __init__(self,
                 model_size="base",
                 output_dir="transcripts",
                 cache_dir=".cache",
                 max_downloads=3,
                 max_transcriptions=2):
        """
        Initializes the parallel transcriber.

        Args:
            max_downloads: maximum number of parallel downloads
            max_transcriptions: maximum number of parallel transcriptions
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.cache_file = self.cache_dir / "transcribed_videos.json"

        self.temp_dir = Path("temp_audio")
        self.temp_dir.mkdir(exist_ok=True)

        # Parallel-processing settings
        self.max_downloads = max_downloads
        self.max_transcriptions = max_transcriptions

        # Sensible caps for M1/M2 Macs
        if model_size == "large":
            self.max_transcriptions = min(2, max_transcriptions)  # at most 2 large models in parallel
        elif model_size in ["tiny", "base"]:
            self.max_transcriptions = min(4, max_transcriptions)  # up to 4 small models

        # Thread pools
        self.download_pool = ThreadPoolExecutor(max_workers=self.max_downloads)
        self.transcribe_pool = ThreadPoolExecutor(max_workers=self.max_transcriptions)

        # Job tracking
        self.jobs: Dict[str, VideoJob] = {}
        self.lock = threading.Lock()

        # Load the cache
        self.cache = self.load_cache()

        # Model settings (approximate realtime speed factors, informational)
        self.model_size = model_size
        self.model_speeds = {
            'tiny': 10,
            'base': 7,
            'small': 4,
            'medium': 2,
            'large': 1
        }

        console.print("[bold green]⚡ Parallel mode enabled:[/bold green]")
        console.print(f"   • Max downloads: {self.max_downloads}")
        console.print(f"   • Max transcriptions: {self.max_transcriptions}")
        console.print(f"   • Whisper model: {model_size}")

    def load_cache(self):
        """Loads the cache."""
        if self.cache_file.exists():
            with open(self.cache_file, 'r') as f:
                return json.load(f)
        return {}

    def save_cache(self):
        """Saves the cache."""
        with open(self.cache_file, 'w') as f:
            json.dump(self.cache, f, indent=2)

    def get_video_hash(self, url):
        """Builds a hash for the video URL (MD5 as a cache key, not for security)."""
        return hashlib.md5(url.encode()).hexdigest()

    def is_cached(self, url):
        """Checks whether a video was already transcribed and its output file still exists."""
        video_hash = self.get_video_hash(url)
        if video_hash in self.cache:
            cached_info = self.cache[video_hash]
            output_file = Path(cached_info['output_file'])
            if output_file.exists():
                return cached_info
        return None

    def download_worker(self, job: VideoJob) -> Tuple[Optional[Path], Dict]:
        """
        Worker function for downloads.
        Runs in a thread.
        """
        try:
            with self.lock:
                job.status = "downloading"

            ydl_opts = {
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }],
                'outtmpl': str(self.temp_dir / '%(id)s_%(title)s.%(ext)s'),
                'quiet': True,
                'no_warnings': True,
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(job.url, download=True)

            # Locate the downloaded file
            video_id = info.get('id', '')
            audio_files = list(self.temp_dir.glob(f"{video_id}*.mp3"))

            if not audio_files:
                raise FileNotFoundError("audio file not found after download")

            audio_file = audio_files[0]

            video_info = {
                'title': info.get('title', 'unknown'),
                'channel': info.get('uploader', 'unknown'),
                'duration': info.get('duration', 0),
                'url': job.url
            }

            with self.lock:
                job.title = video_info['title']
                job.duration = video_info['duration']

            return audio_file, video_info

        except Exception as e:
            with self.lock:
                job.status = "failed"
                job.error = str(e)
            console.print(f"[red]❌ Download error for {job.url}: {e}[/red]")
            return None, {}

    def transcribe_worker(self, model, audio_path: Path, job: VideoJob, video_info: Dict) -> Optional[str]:
        """
        Worker function for transcription.
        Runs in a thread with its own Whisper model.
        """
        try:
            with self.lock:
                job.status = "transcribing"

            # Transcribe
            result = model.transcribe(
                str(audio_path),
                language=job.language,
                verbose=False,
                fp16=False  # for M1 Macs
            )

            transcript = result['text']

            # Save the transcript
            output_path = self.save_transcript(transcript, video_info, job.playlist_name)

            # Update the cache
            video_hash = self.get_video_hash(job.url)
            self.cache[video_hash] = {
                'url': job.url,
                'title': video_info['title'],
                'output_file': str(output_path),
                'transcribed_at': datetime.now().isoformat(),
                'model': self.model_size,
                'playlist': job.playlist_name
            }
            self.save_cache()

            # Delete the audio file
            try:
                audio_path.unlink()
            except OSError:
                pass

            with self.lock:
                job.status = "completed"
                job.output_path = str(output_path)

            return str(output_path)

        except Exception as e:
            with self.lock:
                job.status = "failed"
                job.error = str(e)
            console.print(f"[red]❌ Transcription error: {e}[/red]")
            return None

    def save_transcript(self, text, video_info, playlist_name=None):
        """Saves a transcript."""
        base_dir = self.output_dir

        if playlist_name:
            base_dir = base_dir / playlist_name.replace('/', '_')
            base_dir.mkdir(parents=True, exist_ok=True)

        channel_dir = base_dir / video_info['channel'].replace('/', '_')
        channel_dir.mkdir(exist_ok=True)

        safe_title = "".join(c for c in video_info['title'] if c.isalnum() or c in (' ', '-', '_'))[:100]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{safe_title}_{timestamp}.txt"

        filepath = channel_dir / filename

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("YouTube Transcription\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"Title: {video_info['title']}\n")
            f.write(f"Channel: {video_info['channel']}\n")
            f.write(f"URL: {video_info['url']}\n")
            if playlist_name:
                f.write(f"Playlist: {playlist_name}\n")
            f.write(f"Transcribed at: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}\n")
            f.write(f"Whisper model: {self.model_size}\n")
            f.write("\n" + "=" * 50 + "\n\n")
            f.write("TRANSCRIPT:\n\n")
            f.write(text)

        return filepath

    def process_pipeline(self, urls: List[str], playlist_name: Optional[str] = None, language: str = "de"):
        """
        Main pipeline for parallel processing: downloads and transcriptions
        run in separate thread pools, coordinated by a polling loop.
        """
        # Filter out videos that were already transcribed
        jobs_to_process = []
        cached_count = 0

        for url in urls:
            if self.is_cached(url):
                cached_count += 1
            else:
                job = VideoJob(url=url, playlist_name=playlist_name, language=language)
                self.jobs[url] = job
                jobs_to_process.append(job)

        if not jobs_to_process:
            console.print("[green]✅ All videos already transcribed![/green]")
            return

        # Status overview
        console.print(Panel(
            f"[bold]🚀 Starting parallel processing[/bold]\n\n"
            f"📊 Total: {len(urls)} videos\n"
            f"✅ Cached: {cached_count}\n"
            f"🆕 To process: {len(jobs_to_process)}\n\n"
            f"⚡ Downloads: {self.max_downloads} in parallel\n"
            f"🎙️ Transcriptions: {self.max_transcriptions} in parallel",
            border_style="cyan"
        ))

        # Load the Whisper models (one per thread)
        console.print(f"\n[cyan]⏳ Loading {self.max_transcriptions}x Whisper {self.model_size} models...[/cyan]")
        models = []
        for i in range(self.max_transcriptions):
            model = whisper.load_model(self.model_size)
            models.append(model)
            console.print(f"   ✅ Model {i+1}/{self.max_transcriptions} loaded")

        # Progress bar
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
            console=console
        ) as progress:

            main_task = progress.add_task(
                "[cyan]Processing videos...",
                total=len(jobs_to_process)
            )

            # Futures for downloads and transcriptions
            download_futures = {}
            transcribe_futures = {}
            model_pool = models.copy()  # pool of available models
            pending_audio = []          # finished downloads waiting for a free model

            # Kick off the initial downloads
            for job in jobs_to_process[:self.max_downloads]:
                future = self.download_pool.submit(self.download_worker, job)
                download_futures[future] = job

            remaining_jobs = jobs_to_process[self.max_downloads:]
            completed_count = 0

            # Main loop
            while download_futures or transcribe_futures or remaining_jobs or pending_audio:

                # Check for finished downloads
                for future in list(download_futures.keys()):
                    if future.done():
                        job = download_futures.pop(future)
                        audio_path, video_info = future.result()

                        if audio_path:
                            # Queue the audio until a model is free
                            pending_audio.append((audio_path, job, video_info))
                        else:
                            # A failed download still counts towards the total
                            progress.update(main_task, advance=1)

                        # Start the next download
                        if remaining_jobs:
                            next_job = remaining_jobs.pop(0)
                            future = self.download_pool.submit(self.download_worker, next_job)
                            download_futures[future] = next_job

                # Hand queued audio to free models
                while pending_audio and model_pool:
                    audio_path, job, video_info = pending_audio.pop(0)
                    model = model_pool.pop()
                    trans_future = self.transcribe_pool.submit(
                        self.transcribe_worker, model, audio_path, job, video_info
                    )
                    transcribe_futures[trans_future] = (job, model)

                # Check for finished transcriptions
                for future in list(transcribe_futures.keys()):
                    if future.done():
                        job, model = transcribe_futures.pop(future)
                        result = future.result()

                        # Return the model to the pool
                        model_pool.append(model)

                        progress.update(main_task, advance=1)
                        if result:
                            completed_count += 1
                            console.print(f"   ✅ {job.title[:50]}")
                        else:
                            console.print(f"   ❌ Failed: {job.url}")

                # Brief pause so the polling loop doesn't spin the CPU
                time.sleep(0.1)

        # Summary (the loop above only exits once every job has drained)
        console.print("\n" + "=" * 60)
        console.print("[bold green]✅ Processing finished![/bold green]")
        console.print(f"Succeeded: {completed_count}/{len(jobs_to_process)}")

        # Show errors, if any
        failed_jobs = [j for j in jobs_to_process if j.status == "failed"]
        if failed_jobs:
            console.print("\n[red]Failed videos:[/red]")
            for job in failed_jobs:
                console.print(f"   • {job.url}: {job.error}")

    def process_playlist_file(self, playlist_path: Path, language: str = "de"):
        """Processes a playlist file (one URL per line, '#' starts a comment)."""
        urls = []
        with open(playlist_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#'):
                    if 'youtube.com' in line or 'youtu.be' in line:
                        urls.append(line)

        if urls:
            playlist_name = playlist_path.stem
            self.process_pipeline(urls, playlist_name, language)
        else:
            console.print(f"[yellow]No URLs in {playlist_path}[/yellow]")

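# Programmatic use (a minimal sketch; the URL is a placeholder):
#
#   transcriber = ParallelTranscriber(model_size="base", max_downloads=3, max_transcriptions=2)
#   transcriber.process_pipeline(["https://youtu.be/VIDEO_ID"], language="de")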

def benchmark_parallel_vs_sequential():
    """
    Prints an illustrative speed comparison (static estimates, not a live measurement).
    """
    console.print("\n[bold cyan]📊 Performance comparison[/bold cyan]")

    table = Table(title="Speed comparison")
    table.add_column("Mode", style="cyan")
    table.add_column("10 videos (5 min each)", style="white")
    table.add_column("Speedup", style="green")

    table.add_row(
        "Sequential (v3)",
        "~50 minutes",
        "1x"
    )
    table.add_row(
        "Parallel, 2 downloads",
        "~25 minutes",
        "2x"
    )
    table.add_row(
        "Parallel, 3 downloads + 2 transcriptions",
        "~15 minutes",
        "3.3x"
    )

    console.print(table)


def main():
    parser = argparse.ArgumentParser(
        description='YouTube Transcriber v4.0 - PARALLEL EDITION'
    )
    parser.add_argument(
        'command',
        nargs='?',
        choices=['process', 'benchmark'],
        default='process',
        help='Command: process or benchmark'
    )
    parser.add_argument(
        '--playlist',
        help='Playlist file'
    )
    parser.add_argument(
        '--urls',
        nargs='+',
        help='Direct list of URLs'
    )
    parser.add_argument(
        '--model',
        default='base',
        choices=['tiny', 'base', 'small', 'medium', 'large'],
        help='Whisper model'
    )
    parser.add_argument(
        '--language',
        default='de',
        help='Language'
    )
    parser.add_argument(
        '--max-downloads',
        type=int,
        default=3,
        help='Max parallel downloads (default: 3)'
    )
    parser.add_argument(
        '--max-transcriptions',
        type=int,
        default=2,
        help='Max parallel transcriptions (default: 2)'
    )

    args = parser.parse_args()

    # Show the logo
    console.print(LOGO)

    if args.command == 'benchmark':
        benchmark_parallel_vs_sequential()
        return

    # Initialize the parallel transcriber
    transcriber = ParallelTranscriber(
        model_size=args.model,
        max_downloads=args.max_downloads,
        max_transcriptions=args.max_transcriptions
    )

    if args.playlist:
        # Process a playlist file
        playlist_path = Path(args.playlist)
        if playlist_path.exists():
            transcriber.process_playlist_file(playlist_path, args.language)
        else:
            console.print(f"[red]Playlist not found: {args.playlist}[/red]")

    elif args.urls:
        # Process URLs directly
        transcriber.process_pipeline(args.urls, language=args.language)

    else:
        console.print("[yellow]Please provide URLs or a playlist![/yellow]")
        console.print("\nExamples:")
        console.print("  python3 transcriber_v4_parallel.py --urls URL1 URL2 URL3")
        console.print("  python3 transcriber_v4_parallel.py --playlist playlists/tech/python.txt")


if __name__ == "__main__":
    main()