mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-15 20:59:40 +02:00
Move inactive projects out of active workspace: - bauntown (community website) - maerchenzauber (AI story generation) - memoro (voice memo app) - news (news aggregation) - nutriphi (nutrition tracking) - reader (reading app) - uload (URL shortener) - wisekeep (AI wisdom extraction) Update CLAUDE.md documentation: - Add presi to active projects - Document archived projects section - Update workspace configuration Archived apps can be re-activated by moving back to apps/ 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
476 lines
No EOL
17 KiB
Python
Executable file
476 lines
No EOL
17 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
YouTube Auto-Transcriber v2.0
|
|
Mit verbesserter Download-Experience und Rich UI
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import hashlib
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
import time
|
|
import yt_dlp
|
|
import whisper
|
|
import warnings
|
|
|
|
from rich.console import Console
|
|
from rich.progress import (
|
|
Progress,
|
|
SpinnerColumn,
|
|
TextColumn,
|
|
BarColumn,
|
|
TaskProgressColumn,
|
|
TimeRemainingColumn,
|
|
TimeElapsedColumn,
|
|
DownloadColumn,
|
|
TransferSpeedColumn
|
|
)
|
|
from rich.table import Table
|
|
from rich.panel import Panel
|
|
from rich.live import Live
|
|
from rich.layout import Layout
|
|
from rich import print as rprint
|
|
|
|
# Globally ignore FutureWarning and UserWarning categories
# (presumably to hide noise emitted by third-party libraries — confirm).
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Shared Rich console used by YouTubeTranscriber and main() for terminal output.
console = Console()

# ASCII-art banner in Rich markup; printed once by main() at startup.
# NOTE(review): the interior padding of the box looks whitespace-collapsed in
# this copy of the file — confirm the spacing against the upstream source.
LOGO = """
[bold cyan]╔═══════════════════════════════════════════════════════╗
║ ║
║ [bold white]🎥 YouTube Auto-Transcriber v2.0[/bold white] ║
║ [dim]Powered by OpenAI Whisper & yt-dlp[/dim] ║
║ ║
╚═══════════════════════════════════════════════════════╝[/bold cyan]
"""
|
|
|
|
class YouTubeTranscriber:
    """Download the audio of a YouTube video and transcribe it with Whisper.

    The workflow (see ``process_video``) is: look the URL up in a JSON cache,
    fetch the video metadata, download the audio as MP3 via yt-dlp,
    transcribe it with the loaded Whisper model, write the transcript to
    ``<output_dir>/<channel>/<title>_<timestamp>.txt`` and record the result
    in the cache. Progress and messages are rendered with the module-level
    Rich ``console``.
    """

    def __init__(self, model_size="base", output_dir="transcripts", cache_dir=".cache"):
        """Create working directories, load the cache and the Whisper model.

        Args:
            model_size: Name of the Whisper model passed to
                ``whisper.load_model``.
            output_dir: Directory that receives the transcripts.
            cache_dir: Directory holding ``transcribed_videos.json``.
        """
        # parents=True so that nested paths such as "--output a/b" work.
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_file = self.cache_dir / "transcribed_videos.json"

        # Downloaded MP3 files live here until cleanup_temp_files() runs.
        self.temp_dir = Path("temp_audio")
        self.temp_dir.mkdir(parents=True, exist_ok=True)

        # Mapping: md5(url) -> metadata of an already transcribed video.
        self.cache = self.load_cache()

        # Loading the model can take a while, so show a spinner meanwhile.
        with console.status(f"[bold green]⏳ Lade Whisper Model '{model_size}'...", spinner="dots"):
            self.model = whisper.load_model(model_size)

        console.print(f"[bold green]✅ Model geladen: {model_size}[/bold green]")

        # Approximate speed factors per model; display_video_info() divides
        # the video duration by this factor to estimate transcription time.
        self.model_speeds = {
            'tiny': 10,
            'base': 7,
            'small': 4,
            'medium': 2,
            'large': 1
        }
        self.model_size = model_size
        self.speed_factor = self.model_speeds.get(model_size, 3)

        # State shared with the yt-dlp progress hook; set by download_audio().
        self.current_progress = None
        self.download_task = None

        self.ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': str(self.temp_dir / '%(title)s.%(ext)s'),
            'quiet': True,
            'no_warnings': True,
            'progress_hooks': [self._download_progress_hook],
        }

    def load_cache(self):
        """Load the cache of already transcribed videos.

        Returns:
            The cached mapping, or an empty dict when the cache file does not
            exist, cannot be read, or does not contain a JSON object.
        """
        if not self.cache_file.exists():
            return {}
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            # A damaged cache must not prevent the tool from starting.
            console.print(f"[yellow]⚠️ Cache konnte nicht gelesen werden: {e}[/yellow]")
            return {}
        return data if isinstance(data, dict) else {}

    def save_cache(self):
        """Write the in-memory cache to ``self.cache_file`` as UTF-8 JSON."""
        with open(self.cache_file, 'w', encoding='utf-8') as f:
            json.dump(self.cache, f, indent=2, ensure_ascii=False)

    def get_video_hash(self, url):
        """Return the MD5 hex digest of ``url``; used only as a cache key."""
        return hashlib.md5(url.encode()).hexdigest()

    def is_cached(self, url):
        """Return the cache entry for ``url`` if its transcript still exists.

        Returns:
            The cached metadata dict, or ``None`` when the URL is unknown or
            the recorded output file is missing.
        """
        cached_info = self.cache.get(self.get_video_hash(url))
        if not cached_info:
            return None
        output_file = cached_info.get('output_file')
        if output_file and Path(output_file).exists():
            return cached_info
        return None

    def _download_progress_hook(self, d):
        """yt-dlp progress hook: mirror download progress into the Rich task.

        Args:
            d: Status dict supplied by yt-dlp.
        """
        if d.get('status') != 'downloading':
            return
        # Compare against None explicitly: Rich task ids start at 0, so a
        # truthiness test would silently disable updates for the first task.
        if self.download_task is None or self.current_progress is None:
            return
        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        if not total:
            return
        self.current_progress.update(
            self.download_task,
            completed=d.get('downloaded_bytes') or 0,
            total=total,
        )

    def get_video_info(self, url):
        """Fetch video metadata without downloading the media.

        Returns:
            A dict with the keys ``title``, ``channel``, ``duration``,
            ``view_count``, ``upload_date``, ``description`` (first 200
            characters) and ``filesize``; ``None`` if the lookup failed.
        """
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                info = ydl.extract_info(url, download=False) or {}
                # Fields may be present but None (e.g. no duration for live
                # streams), so fall back with "or" instead of get() defaults.
                return {
                    'title': info.get('title') or 'Unbekannt',
                    'channel': info.get('uploader') or 'Unbekannt',
                    'duration': info.get('duration') or 0,
                    'view_count': info.get('view_count') or 0,
                    'upload_date': info.get('upload_date') or '',
                    'description': (info.get('description') or '')[:200],
                    'filesize': info.get('filesize') or info.get('filesize_approx') or 0
                }
            except Exception as e:
                console.print(f"[red]❌ Fehler beim Abrufen der Video-Info: {e}[/red]")
                return None

    def display_video_info(self, info):
        """Print the video metadata as a table and estimate transcription time.

        Args:
            info: Dict as returned by ``get_video_info`` (may be ``None``).

        Returns:
            Estimated transcription time in seconds, or ``None`` when
            ``info`` is empty.
        """
        if not info:
            return

        table = Table(title="📹 Video Information", show_header=False, box=None)
        table.add_column("Property", style="cyan", width=20)
        table.add_column("Value", style="white")

        title = info['title']
        table.add_row("Titel", title[:60] + "..." if len(title) > 60 else title)
        table.add_row("Kanal", info['channel'])

        # Duration may be a float or None; normalise to whole seconds so the
        # integer format specifiers below cannot fail.
        duration = int(info.get('duration') or 0)
        duration_str = f"{duration // 60}:{duration % 60:02d} Minuten"
        table.add_row("Dauer", duration_str)

        # Time estimate for the transcription
        estimated_time = duration / self.speed_factor
        # Round once before splitting so the seconds field can never show "60".
        eta_minutes, eta_seconds = divmod(int(round(estimated_time)), 60)
        eta_str = f"~{eta_minutes}:{eta_seconds:02d} Minuten"
        table.add_row("Geschätzte Zeit", f"{eta_str} (mit {self.model_size} model)")

        if info.get('view_count'):
            # Use "." as thousands separator.
            views = f"{info['view_count']:,}".replace(',', '.')
            table.add_row("Aufrufe", views)

        console.print(Panel(table, border_style="cyan"))

        # Warn about long videos
        if duration > 1800:  # 30 minutes
            console.print("[yellow]⚠️ Hinweis: Dieses Video ist über 30 Minuten lang. Die Transkription kann einige Zeit dauern.[/yellow]")

        return estimated_time

    def _resolve_audio_file(self, ydl, info):
        """Return the path of the MP3 produced for ``info``, or ``None``."""
        candidates = []
        # Preferred: the final path that yt-dlp recorded for the download.
        for item in info.get('requested_downloads') or []:
            if item.get('filepath'):
                candidates.append(Path(item['filepath']))
        # Next: the templated file name with the post-processed extension.
        candidates.append(Path(ydl.prepare_filename(info)).with_suffix('.mp3'))
        for candidate in candidates:
            if candidate.is_file():
                return candidate
        # Last resort: the most recently modified MP3 in the temp directory.
        mp3_files = sorted(self.temp_dir.glob("*.mp3"), key=lambda p: p.stat().st_mtime)
        return mp3_files[-1] if mp3_files else None

    def download_audio(self, url, progress):
        """Download the audio track as MP3 while updating a progress bar.

        Args:
            url: Video URL.
            progress: Rich ``Progress`` instance that receives the task.

        Returns:
            ``(audio_path, metadata)`` where ``metadata`` has the keys
            ``title``, ``channel``, ``duration`` and ``url``; ``(None, None)``
            if the download failed.
        """
        task_id = progress.add_task(
            "[cyan]📥 Download Audio...",
            total=None
        )
        self.current_progress = progress
        self.download_task = task_id

        try:
            with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True) or {}
                # Locate the file via yt-dlp's own bookkeeping instead of
                # guessing by modification time, which is unreliable.
                audio_file = self._resolve_audio_file(ydl, info)

            if not audio_file:
                raise FileNotFoundError("Audio-Datei nicht gefunden")

            progress.update(task_id, completed=100, total=100)

            return audio_file, {
                'title': info.get('title') or 'unknown',
                'channel': info.get('uploader') or 'unknown',
                'duration': info.get('duration') or 0,
                'url': url
            }

        except Exception as e:
            console.print(f"[red]❌ Fehler beim Download: {e}[/red]")
            return None, None

        finally:
            # Detach the hook state so nothing updates a finished task later.
            self.current_progress = None
            self.download_task = None

    def transcribe_audio(self, audio_path, language="de", progress=None):
        """Transcribe an audio file with the loaded Whisper model.

        Args:
            audio_path: Path of the audio file.
            language: Language code passed to Whisper.
            progress: Optional Rich ``Progress``; the task is only set to
                100 % once transcription is done, because the
                ``transcribe`` call used here reports no intermediate
                progress.

        Returns:
            ``(text, language)`` on success, ``(None, None)`` on error.
        """
        task = None
        if progress is not None:
            task = progress.add_task(
                f"[green]🎙️ Transkribiere mit {self.model_size} model...",
                total=100
            )

        try:
            result = self.model.transcribe(
                str(audio_path),
                language=language,
                verbose=False,
                fp16=False  # for M1 Mac
            )

            if progress is not None:
                progress.update(task, completed=100)

            return result['text'], result.get('language', 'unbekannt')

        except Exception as e:
            console.print(f"[red]❌ Fehler bei Transkription: {e}[/red]")
            return None, None

    def save_transcript(self, text, video_info, detected_language=None):
        """Write the transcript plus a metadata header to a text file.

        Args:
            text: Transcribed text.
            video_info: Dict with ``title``, ``channel``, ``url`` and
                ``duration``.
            detected_language: Optional language to mention in the header.

        Returns:
            Path of the written file,
            ``<output_dir>/<channel>/<safe title>_<timestamp>.txt``.
        """
        now = datetime.now()

        # Build a filesystem-safe file name
        safe_title = "".join(
            c for c in video_info['title'] if c.isalnum() or c in (' ', '-', '_')
        )[:100]
        filename = f"{safe_title}_{now.strftime('%Y%m%d_%H%M%S')}.txt"

        # One sub-directory per channel. Path separators are replaced and
        # empty or dot-only names are rejected so the directory always stays
        # inside output_dir.
        channel_name = (video_info['channel'] or '').replace('/', '_').replace('\\', '_').strip()
        if channel_name in ('', '.', '..'):
            channel_name = 'unknown'
        channel_dir = self.output_dir / channel_name
        channel_dir.mkdir(parents=True, exist_ok=True)

        filepath = channel_dir / filename

        # Duration may be a float or None; normalise before integer formatting.
        duration = int(video_info.get('duration') or 0)

        # Write the transcript with its metadata header
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("YouTube Transkription\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"Titel: {video_info['title']}\n")
            f.write(f"Kanal: {video_info['channel']}\n")
            f.write(f"URL: {video_info['url']}\n")
            f.write(f"Dauer: {duration // 60}:{duration % 60:02d} Minuten\n")
            if detected_language:
                f.write(f"Erkannte Sprache: {detected_language}\n")
            f.write(f"Transkribiert am: {now.strftime('%d.%m.%Y %H:%M:%S')}\n")
            f.write(f"Whisper Model: {self.model_size}\n")
            f.write("\n" + "=" * 50 + "\n\n")
            f.write("TRANSKRIPTION:\n\n")
            f.write(text)

        return filepath

    def cleanup_temp_files(self):
        """Delete all temporary MP3 files in ``self.temp_dir`` (best effort)."""
        for file in self.temp_dir.glob("*.mp3"):
            try:
                file.unlink()
            except OSError:
                # A file that cannot be removed must not abort the run.
                pass

    def process_video(self, url, language="de", force_reprocess=False):
        """Run the complete workflow for one video.

        Args:
            url: Video URL.
            language: Language code for the transcription.
            force_reprocess: When true, ignore an existing cache entry.

        Returns:
            Path of the transcript (a ``str`` when served from the cache, a
            ``Path`` for a fresh transcription), or ``None`` on failure.
        """
        console.rule("[bold blue]Verarbeite Video[/bold blue]")

        # Check the cache
        if not force_reprocess:
            cached = self.is_cached(url)
            if cached:
                console.print("[yellow]⚠️ Video bereits transkribiert:[/yellow]")
                console.print(f" 📁 {cached['output_file']}")
                console.print(f" 📅 {cached.get('transcribed_at', 'unbekannt')}")
                console.print("[dim] (Nutze --force um neu zu transkribieren)[/dim]")
                return cached['output_file']

        # Fetch the video information up front
        console.print("\n[cyan]📊 Lade Video-Informationen...[/cyan]")
        video_info = self.get_video_info(url)
        if not video_info:
            return None

        self.display_video_info(video_info)

        try:
            # Shared progress display for download and transcription
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TaskProgressColumn(),
                TimeElapsedColumn(),
                console=console
            ) as progress:

                # 1. Download the audio
                audio_path, download_info = self.download_audio(url, progress)
                if not audio_path:
                    return None

                # 2. Transcribe
                transcript, detected_lang = self.transcribe_audio(audio_path, language, progress)
                if not transcript:
                    return None

            # 3. Save
            output_path = self.save_transcript(transcript, download_info, detected_lang)

            # 4. Update the cache
            video_hash = self.get_video_hash(url)
            self.cache[video_hash] = {
                'url': url,
                'title': download_info['title'],
                'output_file': str(output_path),
                'transcribed_at': datetime.now().isoformat(),
                'model': self.model_size,
                'language': detected_lang
            }
            self.save_cache()

        finally:
            # 5. Clean up, also on failure, so that no stale audio file is
            # left behind for the next run.
            self.cleanup_temp_files()

        # Success message
        console.print("\n[bold green]✅ Video erfolgreich verarbeitet![/bold green]")
        console.print(f"📁 Gespeichert: [cyan]{output_path}[/cyan]")

        return output_path
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the arguments, prints the logo, creates a ``YouTubeTranscriber``
    and then runs in one of three modes:

    * ``--batch``: read URLs from stdin (one per line) and process them all.
    * positional ``url``: process that single video.
    * neither: interactive prompt that processes URLs until the user quits
      with ``q``/``quit``/``exit``, Ctrl+C or Ctrl+D.
    """
    parser = argparse.ArgumentParser(
        description='YouTube Video Transcriber v2.0 - Mit verbesserter UI'
    )
    parser.add_argument(
        'url',
        nargs='?',
        help='YouTube Video URL'
    )
    parser.add_argument(
        '--model',
        default='base',
        choices=['tiny', 'base', 'small', 'medium', 'large'],
        help='Whisper Model Größe (default: base)'
    )
    parser.add_argument(
        '--language',
        default='de',
        help='Sprache für Transkription (default: de)'
    )
    parser.add_argument(
        '--output',
        default='transcripts',
        help='Ausgabe-Verzeichnis (default: transcripts)'
    )
    parser.add_argument(
        '--batch',
        action='store_true',
        help='Batch-Modus: URLs aus stdin lesen'
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='Ignoriere Cache und transkribiere neu'
    )

    args = parser.parse_args()

    # Show the logo
    console.print(LOGO)

    # Initialise the transcriber
    transcriber = YouTubeTranscriber(
        model_size=args.model,
        output_dir=args.output
    )

    if args.batch:
        # Batch mode
        console.print("[cyan]📋 Batch-Modus: Gebe URLs ein (eine pro Zeile, beende mit Ctrl+D):[/cyan]")
        urls = []
        try:
            for line in sys.stdin:
                url = line.strip()
                # Lines that do not look like a URL are skipped silently.
                if url and url.startswith('http'):
                    urls.append(url)
        except KeyboardInterrupt:
            # Ctrl+C ends the input phase; process what was collected so far.
            pass

        console.print(f"\n[bold]{len(urls)} Videos zu verarbeiten[/bold]")

        for i, url in enumerate(urls, 1):
            console.print(f"\n[bold cyan]━━━ Video {i}/{len(urls)} ━━━[/bold cyan]")
            transcriber.process_video(url, args.language, args.force)

    elif args.url:
        # Single video
        transcriber.process_video(args.url, args.language, args.force)

    else:
        # Interactive mode
        console.print("[bold cyan]🎬 Interaktiver Modus[/bold cyan]")
        console.print(f"Model: [green]{args.model}[/green]")
        console.print(f"Sprache: [green]{args.language}[/green]")
        console.print(f"Ausgabe: [green]{args.output}/[/green]")
        console.print("\nGebe YouTube URL ein (oder 'q' zum Beenden):\n")

        while True:
            try:
                url = console.input("[bold cyan]URL ▶ [/bold cyan]").strip()
                if url.lower() in ['q', 'quit', 'exit']:
                    break
                if url.startswith('http'):
                    transcriber.process_video(url, args.language, args.force)
                else:
                    console.print("[red]❌ Ungültige URL. Bitte YouTube URL eingeben.[/red]")
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin closed / Ctrl+D — leave the loop cleanly
                # instead of terminating with a traceback.
                break

        console.print("\n[bold green]👋 Auf Wiedersehen![/bold green]")
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()