mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-15 16:39:39 +02:00
Move inactive projects out of active workspace: - bauntown (community website) - maerchenzauber (AI story generation) - memoro (voice memo app) - news (news aggregation) - nutriphi (nutrition tracking) - reader (reading app) - uload (URL shortener) - wisekeep (AI wisdom extraction) Update CLAUDE.md documentation: - Add presi to active projects - Document archived projects section - Update workspace configuration Archived apps can be re-activated by moving back to apps/ 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
603 lines
No EOL
21 KiB
Python
Executable file
603 lines
No EOL
21 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
YouTube Auto-Transcriber v3.0
|
|
Mit Playlist-Management und Themen-Ordnern
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import hashlib
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
import time
|
|
from typing import List, Dict, Tuple
|
|
import yt_dlp
|
|
import whisper
|
|
import warnings
|
|
|
|
from rich.console import Console
|
|
from rich.progress import (
|
|
Progress,
|
|
SpinnerColumn,
|
|
TextColumn,
|
|
BarColumn,
|
|
TaskProgressColumn,
|
|
TimeRemainingColumn,
|
|
TimeElapsedColumn,
|
|
MofNCompleteColumn
|
|
)
|
|
from rich.table import Table
|
|
from rich.panel import Panel
|
|
from rich.tree import Tree
|
|
from rich import print as rprint
|
|
|
|
# Ignore every FutureWarning and UserWarning for the whole process.
# NOTE(review): presumably meant to hide noise from the whisper / yt_dlp
# dependencies - confirm, because warnings from this script are hidden too.
warnings.filterwarnings("ignore", category=FutureWarning)
|
|
warnings.filterwarnings("ignore", category=UserWarning)
|
|
|
|
# Shared Rich console used for the output produced throughout this file.
console = Console()
|
|
|
|
# Start-up banner printed by main(); the bracketed tags ([bold cyan], [dim],
# ...) are Rich console markup.
|
|
LOGO = """
|
|
[bold cyan]╔═══════════════════════════════════════════════════════╗
|
|
║ ║
|
|
║ [bold white]🎥 YouTube Auto-Transcriber v3.0[/bold white] ║
|
|
║ [dim]Playlist Management & Batch Processing[/dim] ║
|
|
║ ║
|
|
╚═══════════════════════════════════════════════════════╝[/bold cyan]
|
|
"""
|
|
|
|
class PlaylistManager:
    """Manage playlist files, i.e. plain-text lists of YouTube URLs.

    A playlist is a ``.txt`` file located either directly in
    ``playlists_dir`` or in one of its immediate sub-folders. Playlists in a
    sub-folder are named ``"<folder>/<file stem>"``.
    """

    def __init__(self, playlists_dir="playlists"):
        """Create the playlist directory if needed and seed an example file.

        Args:
            playlists_dir: Directory that holds the playlist ``.txt`` files.
        """
        self.playlists_dir = Path(playlists_dir)
        # parents=True so that a nested path such as "data/playlists" works.
        self.playlists_dir.mkdir(parents=True, exist_ok=True)

        # Seed an example playlist when the directory has none yet.
        self._create_example_structure()

    def _create_example_structure(self):
        """Write a commented example playlist if no top-level ``.txt`` exists."""
        example_file = self.playlists_dir / "example_tech.txt"
        if example_file.exists() or any(self.playlists_dir.glob("*.txt")):
            return

        example_lines = (
            "# Tech Videos - Beispiel Playlist\n",
            "# Zeilen mit # werden ignoriert\n",
            "# Eine URL pro Zeile:\n",
            "#\n",
            "# https://www.youtube.com/watch?v=VIDEO_ID\n",
        )
        # Explicit encoding: read_playlist() decodes playlist files as UTF-8.
        with open(example_file, 'w', encoding='utf-8') as f:
            f.writelines(example_lines)

    def get_all_playlists(self) -> Dict[str, Path]:
        """Return all playlist files keyed by playlist name.

        Top-level files come first, then the files of each immediate
        sub-folder. Entries are sorted so the order is deterministic and
        matches ``display_playlists_tree``.
        """
        playlists = {}

        # .txt files directly inside the playlist directory
        for file in sorted(self.playlists_dir.glob("*.txt")):
            playlists[file.stem] = file

        # .txt files one level down, named "<folder>/<stem>"
        for folder in sorted(self.playlists_dir.iterdir()):
            if folder.is_dir():
                for file in sorted(folder.glob("*.txt")):
                    playlists[f"{folder.name}/{file.stem}"] = file

        return playlists

    def read_playlist(self, playlist_path: Path) -> List[str]:
        """Return the YouTube URLs listed in a playlist file.

        Blank lines, lines starting with ``#`` and lines that contain neither
        ``youtube.com`` nor ``youtu.be`` are skipped. A missing file yields an
        empty list.
        """
        urls = []
        if not playlist_path.exists():
            return urls

        # utf-8-sig drops a leading byte-order mark. Without that, a first
        # line such as "<BOM># https://youtube.com/..." does not start with
        # '#' and would be returned as a URL.
        with open(playlist_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                if 'youtube.com' in line or 'youtu.be' in line:
                    urls.append(line)

        return urls

    def display_playlists_tree(self):
        """Print all playlists as a tree with their URL counts.

        Returns:
            The ``Tree`` object that was printed.
        """
        tree = Tree("[bold cyan]📁 Playlists[/bold cyan]")

        # Files in the playlist directory itself
        for file in sorted(self.playlists_dir.glob("*.txt")):
            urls = self.read_playlist(file)
            tree.add(f"📄 {file.stem} ({len(urls)} URLs)")

        # One branch per sub-folder
        for folder in sorted(self.playlists_dir.iterdir()):
            if folder.is_dir():
                branch = tree.add(f"📂 {folder.name}/")
                for file in sorted(folder.glob("*.txt")):
                    urls = self.read_playlist(file)
                    branch.add(f"📄 {file.stem} ({len(urls)} URLs)")

        console.print(tree)
        return tree
|
|
|
|
|
|
class YouTubeTranscriber:
    """Download YouTube audio, transcribe it with Whisper and save the text.

    Finished transcriptions are recorded in a JSON cache keyed by a hash of
    the video URL, so videos that were already processed are skipped as long
    as their transcript file still exists.
    """

    def __init__(self, model_size="base", output_dir="transcripts", cache_dir=".cache"):
        """Prepare the working directories, load the cache and the model.

        Args:
            model_size: Whisper model name passed to ``whisper.load_model``.
            output_dir: Directory that receives the transcript files.
            cache_dir: Directory that holds ``transcribed_videos.json``.
        """
        # parents=True so nested paths such as "data/transcripts" work.
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_file = self.cache_dir / "transcribed_videos.json"

        self.temp_dir = Path("temp_audio")
        self.temp_dir.mkdir(exist_ok=True)

        # Load the cache of already transcribed videos
        self.cache = self.load_cache()

        # Load the Whisper model while showing a spinner
        with console.status(f"[bold green]⏳ Lade Whisper Model '{model_size}'...", spinner="dots"):
            self.model = whisper.load_model(model_size)

        console.print(f"[bold green]✅ Model geladen: {model_size}[/bold green]")

        # Per-model speed table; speed_factor is stored here but is not read
        # by any other method of this class.
        self.model_speeds = {
            'tiny': 10,
            'base': 7,
            'small': 4,
            'medium': 2,
            'large': 1
        }
        self.model_size = model_size
        self.speed_factor = self.model_speeds.get(model_size, 3)

        self.ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': str(self.temp_dir / '%(title)s.%(ext)s'),
            'quiet': True,
            'no_warnings': True,
            'progress_hooks': [self._download_progress_hook],
        }

        # Progress display and task id of the download currently running
        self.current_progress = None
        self.download_task = None

    def load_cache(self):
        """Load the cache of already transcribed videos.

        Returns:
            The cached dict, or an empty dict when the cache file is missing,
            unreadable or does not contain a JSON object.
        """
        if not self.cache_file.exists():
            return {}

        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # A damaged cache must not prevent start-up; videos are simply
            # transcribed again. json.JSONDecodeError is a ValueError.
            console.print(f"[yellow]⚠️ Cache nicht lesbar, starte mit leerem Cache: {e}[/yellow]")
            return {}

        return data if isinstance(data, dict) else {}

    def save_cache(self):
        """Write the in-memory cache to the cache file as JSON."""
        with open(self.cache_file, 'w', encoding='utf-8') as f:
            json.dump(self.cache, f, indent=2)

    def get_video_hash(self, url):
        """Return the MD5 hex digest of ``url``, used as the cache key."""
        return hashlib.md5(url.encode()).hexdigest()

    def is_cached(self, url):
        """Return the cache entry for ``url`` if its transcript still exists.

        Returns:
            The cache entry (a dict) when the URL is cached and the recorded
            output file exists, otherwise ``None``.
        """
        cached_info = self.cache.get(self.get_video_hash(url))
        if not isinstance(cached_info, dict):
            return None

        # Tolerate entries without 'output_file' instead of raising KeyError.
        output_file = cached_info.get('output_file')
        if output_file and Path(output_file).exists():
            return cached_info
        return None

    def _download_progress_hook(self, d):
        """Forward yt-dlp download progress to the current Rich task."""
        if d.get('status') != 'downloading':
            return
        # Compare against None: a Rich task id is an integer and may be 0.
        if self.current_progress is None or self.download_task is None:
            return

        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        if total:
            downloaded = d.get('downloaded_bytes') or 0
            self.current_progress.update(self.download_task, completed=downloaded, total=total)

    def get_video_info(self, url):
        """Fetch video metadata without downloading the video.

        Returns:
            A dict with ``title``, ``channel``, ``duration`` and ``url``, or
            ``None`` when the lookup failed (the error is printed).
        """
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                info = ydl.extract_info(url, download=False)
                return {
                    'title': info.get('title', 'Unbekannt'),
                    'channel': info.get('uploader', 'Unbekannt'),
                    'duration': info.get('duration', 0),
                    'url': url
                }
            except Exception as e:
                console.print(f"[red]❌ Fehler beim Abrufen der Video-Info: {e}[/red]")
                return None

    def _find_downloaded_audio(self, info):
        """Return the MP3 file produced by the last download, or ``None``."""
        # Preferred: the path recorded in the info dict, when the installed
        # yt-dlp version provides it and the file is really there.
        for item in info.get('requested_downloads') or []:
            filepath = item.get('filepath')
            if filepath and Path(filepath).exists():
                return Path(filepath)

        # Fallback: the newest MP3 in the temp dir changed within the last
        # minute (the heuristic this class has always used).
        cutoff = datetime.now().timestamp() - 60
        recent = [f for f in self.temp_dir.glob("*.mp3") if f.stat().st_mtime > cutoff]
        return max(recent, key=lambda f: f.stat().st_mtime, default=None)

    def download_audio(self, url, progress=None):
        """Download the audio track of ``url`` as MP3 into the temp directory.

        Args:
            url: Video URL.
            progress: Optional Rich ``Progress`` that receives a download task.

        Returns:
            ``(audio_file, info)`` where ``info`` has ``title``, ``channel``,
            ``duration`` and ``url``; ``(None, None)`` on failure (the error
            is printed).
        """
        self.current_progress = progress
        # Reset first so a task id from an earlier call is never combined
        # with a different (or missing) progress display.
        self.download_task = None
        if progress is not None:
            self.download_task = progress.add_task(
                "[cyan]📥 Download...",
                total=None
            )

        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
            try:
                info = ydl.extract_info(url, download=True)
                # "or" also covers keys that are present with a None value.
                title = info.get('title') or 'unknown'
                channel = info.get('uploader') or 'unknown'
                duration = info.get('duration') or 0

                audio_file = self._find_downloaded_audio(info)
                if audio_file is None:
                    raise RuntimeError("Audio-Datei nicht gefunden")

                if progress is not None and self.download_task is not None:
                    progress.update(self.download_task, completed=100, total=100)

                return audio_file, {
                    'title': title,
                    'channel': channel,
                    'duration': duration,
                    'url': url
                }

            except Exception as e:
                console.print(f"[red]❌ Download-Fehler: {e}[/red]")
                return None, None

    def transcribe_audio(self, audio_path, language="de", progress=None):
        """Transcribe an audio file with the loaded Whisper model.

        Returns:
            ``(text, language)``, or ``(None, None)`` when transcription
            failed (the error is printed).
        """
        task = None
        if progress is not None:
            task = progress.add_task(
                "[green]🎙️ Transkribiere...",
                total=100
            )

        try:
            result = self.model.transcribe(
                str(audio_path),
                language=language,
                verbose=False,
                fp16=False
            )

            if task is not None:
                progress.update(task, completed=100)

            return result['text'], result.get('language', 'unbekannt')

        except Exception as e:
            console.print(f"[red]❌ Transkriptions-Fehler: {e}[/red]")
            return None, None

    def save_transcript(self, text, video_info, playlist_name=None):
        """Write a transcript below ``output_dir`` and return its path.

        The file is stored as
        ``<output_dir>/[<playlist>/]<channel>/<title>_<timestamp>.txt``;
        ``/`` in the playlist and channel names is replaced by ``_``.

        Args:
            text: Transcript text.
            video_info: Dict with ``title``, ``channel`` and ``url``.
            playlist_name: Optional playlist used as an extra folder level.
        """
        base_dir = self.output_dir
        if playlist_name:
            base_dir = base_dir / playlist_name.replace('/', '_')

        # Missing or None metadata must not crash the save step.
        channel = str(video_info.get('channel') or 'unknown')
        title = str(video_info.get('title') or 'unknown')

        channel_folder = channel.replace('/', '_')
        if not channel_folder.strip('. '):
            # Names such as "." or ".." would point outside the channel level.
            channel_folder = 'unknown'
        channel_dir = base_dir / channel_folder
        channel_dir.mkdir(parents=True, exist_ok=True)

        # File name: sanitised title (max. 100 characters) plus timestamp
        safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_'))[:100]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = channel_dir / f"{safe_title}_{timestamp}.txt"

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("YouTube Transkription\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"Titel: {title}\n")
            f.write(f"Kanal: {channel}\n")
            f.write(f"URL: {video_info['url']}\n")
            if playlist_name:
                f.write(f"Playlist: {playlist_name}\n")
            f.write(f"Transkribiert am: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}\n")
            f.write(f"Whisper Model: {self.model_size}\n")
            f.write("\n" + "=" * 50 + "\n\n")
            f.write("TRANSKRIPTION:\n\n")
            f.write(text)

        return filepath

    def cleanup_temp_files(self):
        """Delete the temporary MP3 files (best effort)."""
        for file in self.temp_dir.glob("*.mp3"):
            try:
                file.unlink()
            except OSError:
                # A file that cannot be removed must not abort the run.
                pass

    def process_video(self, url, language="de", playlist_name=None, progress=None):
        """Download, transcribe and save a single video.

        Returns:
            ``(output_path, was_cached)``. ``output_path`` is the cached path
            string for cached videos, the new transcript ``Path`` for new
            ones, or ``None`` when a step failed.
        """
        cached = self.is_cached(url)
        if cached:
            return cached['output_file'], True  # True = taken from the cache

        # Check that the video is reachable before downloading
        video_info = self.get_video_info(url)
        if not video_info:
            return None, False

        try:
            audio_path, download_info = self.download_audio(url, progress)
            if not audio_path:
                return None, False

            transcript, _ = self.transcribe_audio(audio_path, language, progress)
            if not transcript:
                return None, False

            output_path = self.save_transcript(transcript, download_info, playlist_name)
        finally:
            # Always remove the audio, also on failure, so a leftover file
            # cannot be mistaken for the next video's download.
            self.cleanup_temp_files()

        self.cache[self.get_video_hash(url)] = {
            'url': url,
            'title': download_info['title'],
            'output_file': str(output_path),
            'transcribed_at': datetime.now().isoformat(),
            'model': self.model_size,
            'playlist': playlist_name
        }
        self.save_cache()

        return output_path, False  # False = newly transcribed

    def process_playlist(self, playlist_name: str, urls: List[str], language="de"):
        """Transcribe all videos of a playlist that are not cached yet.

        Prints a status panel, processes the new URLs with a progress display
        and finishes with a summary.
        """
        console.rule(f"[bold cyan]📋 Playlist: {playlist_name}[/bold cyan]")

        # Skip videos that were already transcribed
        new_urls = [url for url in urls if not self.is_cached(url)]
        cached_count = len(urls) - len(new_urls)

        # Status overview
        table = Table(show_header=False, box=None)
        table.add_column("Info", style="cyan")
        table.add_column("Wert", style="white")

        table.add_row("📊 Gesamt Videos:", str(len(urls)))
        table.add_row("✅ Bereits transkribiert:", str(cached_count))
        table.add_row("🆕 Neu zu transkribieren:", str(len(new_urls)))

        console.print(Panel(table, title="Playlist Status", border_style="cyan"))

        if not new_urls:
            console.print("[green]✅ Alle Videos bereits transkribiert![/green]")
            return

        success_count = 0
        error_count = 0

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            console=console
        ) as progress:

            playlist_task = progress.add_task(
                f"[cyan]Verarbeite {playlist_name}...",
                total=len(new_urls)
            )

            for i, url in enumerate(new_urls, 1):
                progress.update(
                    playlist_task,
                    description=f"[cyan]Video {i}/{len(new_urls)}..."
                )

                output_path, _ = self.process_video(
                    url,
                    language,
                    playlist_name,
                    progress
                )

                if output_path:
                    success_count += 1
                    console.print(f" ✅ {Path(output_path).name}")
                else:
                    error_count += 1
                    console.print(f" ❌ Fehler bei: {url}")

                progress.update(playlist_task, advance=1)

        # Summary
        console.print("\n" + "=" * 50)
        console.print(f"[bold green]✅ Erfolgreich: {success_count}[/bold green]")
        if error_count > 0:
            console.print(f"[bold red]❌ Fehler: {error_count}[/bold red]")
        # Show the folder save_transcript() really writes to ('/' -> '_').
        target_dir = self.output_dir / playlist_name.replace('/', '_')
        console.print(f"[bold cyan]📁 Gespeichert in: {target_dir}/[/bold cyan]")
|
|
|
|
|
|
def process_all_playlists(transcriber, playlist_manager, language="de"):
    """Process every playlist known to ``playlist_manager``.

    Prints the playlist tree and overall statistics, then passes each
    non-empty playlist to ``transcriber.process_playlist``.

    Args:
        transcriber: Object providing ``is_cached(url)`` and
            ``process_playlist(name, urls, language)``.
        playlist_manager: Object providing ``get_all_playlists()``,
            ``read_playlist(path)``, ``display_playlists_tree()`` and
            ``playlists_dir``.
        language: Language code forwarded to the transcriber.
    """
    playlists = playlist_manager.get_all_playlists()

    if not playlists:
        console.print("[yellow]⚠️ Keine Playlists gefunden![/yellow]")
        console.print(f"Erstelle .txt Dateien in: {playlist_manager.playlists_dir}/")
        return

    console.print("\n[bold cyan]🔍 Gefundene Playlists:[/bold cyan]")
    playlist_manager.display_playlists_tree()

    # Read every playlist exactly once so the statistics and the processing
    # below work on the same URL lists.
    playlist_urls = {
        name: playlist_manager.read_playlist(path)
        for name, path in playlists.items()
    }

    # Collect statistics
    total_urls = sum(len(urls) for urls in playlist_urls.values())
    total_new = sum(
        1
        for urls in playlist_urls.values()
        for url in urls
        if not transcriber.is_cached(url)
    )

    console.print(f"\n[bold]📊 Gesamt: {total_urls} Videos, {total_new} neu zu transkribieren[/bold]")

    # Process each non-empty playlist
    for name, urls in playlist_urls.items():
        if urls:
            console.print("\n" + "=" * 60)
            transcriber.process_playlist(name, urls, language)

    console.print("\n[bold green]🎉 Alle Playlists verarbeitet![/bold green]")
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Commands:
        scan: transcribe all playlists, or only ``--playlist NAME``.
        list: show the playlists and how many of their URLs are new.
        process: transcribe the single URL given as second argument.

    Returns:
        ``0`` on success, ``1`` when the arguments were unusable or the
        single video could not be processed.
    """
    parser = argparse.ArgumentParser(
        description='YouTube Transcriber v3.0 - Playlist Management'
    )
    parser.add_argument(
        'command',
        nargs='?',
        choices=['scan', 'list', 'process'],
        default='scan',
        help='Befehl: scan (alle Playlists), list (zeige Playlists), process (einzelne URL)'
    )
    parser.add_argument(
        'url',
        nargs='?',
        help='YouTube URL (nur für process)'
    )
    parser.add_argument(
        '--playlist',
        help='Spezifische Playlist verarbeiten'
    )
    parser.add_argument(
        '--model',
        default='base',
        choices=['tiny', 'base', 'small', 'medium', 'large'],
        help='Whisper Model (default: base)'
    )
    parser.add_argument(
        '--language',
        default='de',
        help='Sprache (default: de)'
    )
    parser.add_argument(
        '--playlists-dir',
        default='playlists',
        help='Ordner mit Playlist-Dateien (default: playlists)'
    )
    parser.add_argument(
        '--output',
        default='transcripts',
        help='Ausgabe-Ordner (default: transcripts)'
    )

    args = parser.parse_args()

    # Show the banner
    console.print(LOGO)

    playlist_manager = PlaylistManager(args.playlists_dir)

    # Reject unusable arguments before the (slow) Whisper model is loaded.
    if args.command == 'process' and not args.url:
        console.print("[red]❌ Bitte URL angeben für 'process' Befehl[/red]")
        return 1

    if args.command == 'scan' and args.playlist:
        playlists = playlist_manager.get_all_playlists()
        if args.playlist not in playlists:
            console.print(f"[red]❌ Playlist '{args.playlist}' nicht gefunden![/red]")
            console.print("Verfügbare Playlists:")
            for name in playlists:
                console.print(f" • {name}")
            return 1

    transcriber = YouTubeTranscriber(
        model_size=args.model,
        output_dir=args.output
    )

    exit_code = 0

    if args.command == 'list':
        # Only show the playlists
        playlists = playlist_manager.get_all_playlists()
        if playlists:
            console.print("[bold cyan]📁 Verfügbare Playlists:[/bold cyan]\n")
            playlist_manager.display_playlists_tree()

            # Per-playlist details
            console.print("\n[bold]Details:[/bold]")
            for name, path in playlists.items():
                urls = playlist_manager.read_playlist(path)
                new_count = sum(1 for url in urls if not transcriber.is_cached(url))
                console.print(f" • {name}: {len(urls)} URLs ({new_count} neu)")
        else:
            console.print("[yellow]Keine Playlists gefunden![/yellow]")
            console.print(f"Erstelle .txt Dateien in: {args.playlists_dir}/")

    elif args.command == 'process':
        # Process a single URL (its presence was checked above)
        output, _ = transcriber.process_video(args.url, args.language)
        if output:
            console.print(f"[green]✅ Gespeichert: {output}[/green]")
        else:
            # process_video() already printed the reason.
            exit_code = 1

    elif args.command == 'scan':
        if args.playlist:
            # One specific playlist (its existence was checked above)
            path = playlist_manager.get_all_playlists()[args.playlist]
            urls = playlist_manager.read_playlist(path)
            transcriber.process_playlist(args.playlist, urls, args.language)
        else:
            # All playlists
            process_all_playlists(transcriber, playlist_manager, args.language)

    console.print("\n[bold green]✨ Fertig![/bold green]")
    return exit_code


if __name__ == "__main__":
    sys.exit(main())