
Airbyte - The Ultimate Guide for Memoro

What is Airbyte?

Airbyte is an open-source ELT (Extract, Load, Transform) platform that syncs data from 350+ sources into your database. Picture it: one tool that automatically collects all of your business data and stores it in one central place.

[Data Sources]  →  [Airbyte]  →  [Your Database]  →  [Dashboard]
    APIs             ELT Tool      PostgreSQL         Metabase

Why Airbyte is a perfect fit for Memoro

Your data sources are already supported:

| Data source | Airbyte connector | Status | Sync frequency |
|---|---|---|---|
| Google Search Console | Official | Stable | Daily |
| Google Play Console | Official | Beta | Daily |
| App Store Connect | Community | Beta | Daily |
| PostHog | Official | Stable | Hourly |
| RevenueCat | ⚠️ Custom API | Feasible | Real-time |
| Umami Analytics | ⚠️ Custom API | Feasible | Hourly |
| PostgreSQL (backup) | Official | Stable | Real-time |

Core features

  1. Incremental Sync - only loads new data (saves time & resources; see the sketch after this list)
  2. Schema Management - creates tables automatically
  3. Error Handling - automatic retries on failures
  4. Monitoring - a dashboard shows the status of all syncs
  5. Transformations - data can be cleaned up during import
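
What incremental sync means in practice: the connector remembers a cursor (e.g. the last synced date) and only requests newer records on the next run. Below is a minimal illustrative sketch in plain Python, not Airbyte's internals; the API URL is a placeholder.

# cursor_sketch.py - illustration only; Airbyte manages this state per stream for you
import json
from pathlib import Path

import requests

STATE_FILE = Path("state.json")

def load_cursor(default: str = "2024-01-01") -> str:
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())["cursor"]
    return default

def fetch_new_rows() -> list:
    cursor = load_cursor()
    # Hypothetical source API: only ask for rows newer than the saved cursor
    rows = requests.get(
        "https://api.example.com/records", params={"since": cursor}
    ).json()
    if rows:
        cursor = max(row["date"] for row in rows)  # advance the cursor
    STATE_FILE.write_text(json.dumps({"cursor": cursor}))
    return rows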

How does Airbyte work?

Architecture

┌─────────────────────────────────────────────────┐
│                  AIRBYTE CORE                    │
├───────────────────────────────────────────────────┤
│  Web UI  │  API  │  Scheduler  │  Worker Pool   │
└───────────────────────────────────────────────────┘
         ↓                              ↑
┌──────────────┐              ┌──────────────────┐
│  Connectors  │              │   Destinations   │
├──────────────┤              ├──────────────────┤
│ • APIs       │              │ • PostgreSQL     │
│ • Databases  │              │ • BigQuery       │
│ • Files      │              │ • Snowflake      │
│ • Webhooks   │              │ • S3             │
└──────────────┘              └──────────────────┘

Sync process

  1. Extract: the connector pulls data from the source API
  2. Normalize: the data is converted into a consistent format
  3. Load: the data is written to the destination database
  4. Log: all activity is recorded
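
Put together, a single sync run looks roughly like the sketch below (a simplified illustration, not Airbyte's actual code; the sample record is made up):

# sync_run_sketch.py - the four phases of one sync run, simplified
import datetime
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("sync")

def extract() -> list:
    # 1. Extract: pull raw records from the source (dummy data here)
    return [{"query": "voice memo app", "clicks": 12, "date": "2024-05-01"}]

def normalize(record: dict) -> dict:
    # 2. Normalize: map source fields onto a consistent schema
    return {
        "date": datetime.date.fromisoformat(record["date"]),
        "metric": "clicks",
        "value": record["clicks"],
        "dimension": record["query"],
    }

def load(rows: list) -> None:
    # 3. Load: write normalized rows to the destination (stand-in for an INSERT)
    for row in rows:
        print("INSERT", row)

def run_sync() -> None:
    try:
        rows = [normalize(r) for r in extract()]
        load(rows)
        log.info("Sync finished: %d records", len(rows))  # 4. Log
    except Exception:
        log.exception("Sync failed")  # failures are logged and can be retried

if __name__ == "__main__":
    run_sync()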

Installation & Setup

Option 1: Docker (recommended for development)

# 1. Download Airbyte
git clone https://github.com/airbytehq/airbyte.git
cd airbyte

# 2. Start it (takes ~5 minutes the first time)
./run-ab-platform.sh

# 3. Open your browser
# http://localhost:8000
# Username: airbyte
# Password: password

Option 2: Docker Compose (Production)

# docker-compose.yml
version: "3.8"

services:
  # PostgreSQL for Airbyte metadata
  db:
    image: postgres:13-alpine
    restart: unless-stopped
    environment:
      POSTGRES_DB: airbyte
      POSTGRES_USER: airbyte
      POSTGRES_PASSWORD: ${DATABASE_PASSWORD}
    volumes:
      - db_data:/var/lib/postgresql/data

  # Airbyte Server
  server:
    image: airbyte/server:0.50.0
    restart: unless-stopped
    environment:
      DATABASE_URL: postgresql://airbyte:${DATABASE_PASSWORD}@db:5432/airbyte
      WORKSPACE_ROOT: /tmp/workspace
      LOCAL_ROOT: /tmp/airbyte_local
      TRACKING_STRATEGY: segment
    ports:
      - "8001:8001"
    volumes:
      - workspace:/tmp/workspace
      - local_data:/tmp/airbyte_local

  # Airbyte WebApp
  webapp:
    image: airbyte/webapp:0.50.0
    restart: unless-stopped
    ports:
      - "8000:80"
    environment:
      INTERNAL_API_HOST: server:8001

  # Airbyte Scheduler
  scheduler:
    image: airbyte/scheduler:0.50.0
    restart: unless-stopped
    environment:
      DATABASE_URL: postgresql://airbyte:${DATABASE_PASSWORD}@db:5432/airbyte
      WORKSPACE_ROOT: /tmp/workspace
    volumes:
      - workspace:/tmp/workspace

  # Workers for sync jobs
  worker:
    image: airbyte/worker:0.50.0
    restart: unless-stopped
    environment:
      DATABASE_URL: postgresql://airbyte:${DATABASE_PASSWORD}@db:5432/airbyte
      WORKSPACE_ROOT: /tmp/workspace
      LOCAL_ROOT: /tmp/airbyte_local
    volumes:
      - workspace:/tmp/workspace
      - local_data:/tmp/airbyte_local

  # Your analytics database (destination)
  analytics-db:
    image: postgres:14-alpine
    restart: unless-stopped
    environment:
      POSTGRES_DB: memoro_analytics
      POSTGRES_USER: memoro
      POSTGRES_PASSWORD: ${ANALYTICS_DB_PASSWORD}
    ports:
      - "5432:5432"
    volumes:
      - analytics_data:/var/lib/postgresql/data

volumes:
  db_data:
  workspace:
  local_data:
  analytics_data:

Option 3: Airbyte Cloud (Managed)

# Free up to 1 GB/month
# Sign up at: https://cloud.airbyte.com
# No installation required!

Step by step: connecting your first data source

Example: Google Search Console → PostgreSQL

1. Set up the source (Google Search Console)

// In Airbyte UI:
// Sources → New Source → Search Console

{
  "site_urls": ["https://memoro.ai"],
  "start_date": "2024-01-01",
  "authorization": {
    "auth_type": "Service",
    "service_account_info": "credentials.json content"
  },
  "custom_reports": [
    {
      "name": "keyword_performance",
      "dimensions": ["query", "page", "country", "device"],
      "metrics": ["clicks", "impressions", "ctr", "position"]
    }
  ]
}

2. Set up the destination (PostgreSQL)

{
  "host": "localhost",
  "port": 5432,
  "database": "memoro_analytics",
  "username": "memoro",
  "password": "your_password",
  "schema": "search_console",  // Namespace für diese Daten
  "ssl": true
}

3. Create the connection

{
  "name": "Search Console to PostgreSQL",
  "source": "google-search-console",
  "destination": "postgres",
  "sync_frequency": "24 hours",
  "sync_mode": "incremental",  // Nur neue Daten
  "streams": [
    {
      "name": "search_analytics",
      "sync_mode": "incremental",
      "cursor_field": "date"
    }
  ]
}
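
Once the connection exists, you don't have to wait for the scheduler: a sync can also be triggered via Airbyte's API. Here is a small sketch (the endpoint follows the open-source config API; the connection ID is a placeholder you copy from the UI):

# trigger_sync.py - manually trigger a connection (sketch; adjust host and ID)
import requests

AIRBYTE_API = "http://localhost:8000/api/v1"
CONNECTION_ID = "your-connection-id"  # placeholder

response = requests.post(
    f"{AIRBYTE_API}/connections/sync",
    json={"connectionId": CONNECTION_ID},
    headers={"Content-Type": "application/json"},
)
response.raise_for_status()
print(response.json()["job"]["status"])  # e.g. "running"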

Practical example: collecting all Memoro data

1. Setup Script

# setup_airbyte_connections.py
import requests
import json
from typing import Dict, Any

class AirbyteSetup:
    def __init__(self, host="http://localhost:8000"):
        self.base_url = f"{host}/api/v1"
        self.headers = {"Content-Type": "application/json"}
    
    def create_source(self, config: Dict[str, Any]):
        """Erstellt eine neue Datenquelle"""
        response = requests.post(
            f"{self.base_url}/sources/create",
            json=config,
            headers=self.headers
        )
        return response.json()
    
    def create_destination(self, config: Dict[str, Any]):
        """Erstellt ein neues Ziel"""
        response = requests.post(
            f"{self.base_url}/destinations/create",
            json=config,
            headers=self.headers
        )
        return response.json()
    
    def create_connection(self, source_id: str, destination_id: str, config: Dict[str, Any]):
        """Verbindet Source mit Destination"""
        config.update({
            "sourceId": source_id,
            "destinationId": destination_id
        })
        response = requests.post(
            f"{self.base_url}/connections/create",
            json=config,
            headers=self.headers
        )
        return response.json()

def load_credentials() -> dict:
    """Load the Google service account key (assumed to be stored in credentials.json)"""
    with open("credentials.json") as f:
        return json.load(f)

def setup_memoro_analytics():
    airbyte = AirbyteSetup()
    
    # 1. PostgreSQL Destination
    print("📦 Creating PostgreSQL destination...")
    destination = airbyte.create_destination({
        "name": "Memoro Analytics DB",
        "destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",  # PostgreSQL
        "connectionConfiguration": {
            "host": "localhost",
            "port": 5432,
            "database": "memoro_analytics",
            "username": "memoro",
            "password": "secure_password",
            "schema": "public"
        }
    })
    dest_id = destination["destinationId"]
    
    # 2. Google Search Console Source
    print("🔍 Setting up Search Console...")
    search_console = airbyte.create_source({
        "name": "Google Search Console",
        "sourceDefinitionId": "eb4c9e00-db83-4d63-a386-39cfa91012a8",
        "connectionConfiguration": {
            "site_urls": ["https://memoro.ai"],
            "start_date": "2024-01-01",
            "authorization": {
                "auth_type": "Service",
                "service_account_info": json.dumps(load_credentials())
            }
        }
    })
    
    # 3. PostHog Source
    print("📊 Setting up PostHog...")
    posthog = airbyte.create_source({
        "name": "PostHog Analytics",
        "sourceDefinitionId": "af6d50ee-dddf-4126-a8ee-7faee990774f",
        "connectionConfiguration": {
            "api_key": "phc_xxxxxxxxxxxxx",
            "base_url": "https://app.posthog.com",
            "start_date": "2024-01-01T00:00:00Z"
        }
    })
    
    # 4. Create Connections
    print("🔗 Creating connections...")
    
    # Search Console → PostgreSQL
    airbyte.create_connection(
        search_console["sourceId"],
        dest_id,
        {
            "name": "Search Console → Analytics DB",
            "schedule": {
                "units": 24,
                "timeUnit": "hours"
            },
            "syncCatalog": {
                "streams": [{
                    "stream": {
                        "name": "search_analytics",
                        "jsonSchema": {},
                        "supportedSyncModes": ["incremental"]
                    },
                    "config": {
                        "syncMode": "incremental",
                        "cursorField": ["date"],
                        "destinationSyncMode": "append_dedup",
                        "primaryKey": [["date"], ["query"], ["page"]]
                    }
                }]
            }
        }
    )
    
    print("✅ Setup complete!")

if __name__ == "__main__":
    setup_memoro_analytics()

2. Custom connector for Umami/RevenueCat

# umami_connector_spec.yaml
documentationUrl: https://umami.is/docs/api
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Umami Analytics Spec
  type: object
  required:
    - api_url
    - api_key
    - website_id
  properties:
    api_url:
      type: string
      description: Umami API URL
      default: https://analytics.memoro.ai/api
    api_key:
      type: string
      description: API Key
      airbyte_secret: true
    website_id:
      type: string
      description: Website ID to track
    start_date:
      type: string
      format: date
      description: Start date for data sync

# umami_source.py
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
# Import path may differ depending on your airbyte-cdk version
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

class UmamiStream(HttpStream):
    url_base = "https://analytics.memoro.ai/api/"
    primary_key = "date"
    
    def __init__(self, website_id: str, start_date: str, **kwargs):
        super().__init__(**kwargs)
        self.website_id = website_id
        self.start_date = start_date
    
    def path(self, **kwargs) -> str:
        return f"websites/{self.website_id}/stats"
    
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # The stats endpoint is not paginated
        return None
    
    def request_params(self, **kwargs) -> Dict[str, Any]:
        return {
            "start_date": self.start_date,
            "end_date": "today",
            "unit": "day"
        }
    
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        data = response.json()
        for record in data["data"]:
            yield {
                "date": record["date"],
                "visitors": record["visitors"],
                "pageviews": record["pageviews"],
                "sessions": record["sessions"],
                "bounces": record["bounces"],
                "duration": record["duration"]
            }

class SourceUmami(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Optional[str]]:
        """Test the API connection"""
        try:
            response = requests.get(
                f"{config['api_url']}/websites",
                headers={"Authorization": f"Bearer {config['api_key']}"}
            )
            return response.status_code == 200, None
        except Exception as error:
            return False, str(error)
    
    def streams(self, config: Dict[str, Any]) -> List[Stream]:
        return [
            UmamiStream(
                authenticator=TokenAuthenticator(token=config["api_key"]),
                website_id=config["website_id"],
                start_date=config["start_date"]
            )
        ]
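
A quick local smoke test for this source might look like the following (a sketch; the config values are placeholders):

# test_umami_source.py - manual connection check for the custom source
from umami_source import SourceUmami

config = {
    "api_url": "https://analytics.memoro.ai/api",
    "api_key": "your-umami-api-key",   # placeholder
    "website_id": "your-website-id",   # placeholder
    "start_date": "2024-01-01",
}

ok, error = SourceUmami().check_connection(logger=None, config=config)
print("Connection OK" if ok else f"Connection failed: {error}")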

Data transformation with dbt

After the import, you can transform the data with dbt:

-- models/marts/unified_metrics.sql
WITH search_console AS (
  SELECT 
    date,
    'search_console' as source,
    SUM(clicks) as value,
    'clicks' as metric
  FROM {{ source('search_console', 'search_analytics') }}
  GROUP BY date
),

posthog_events AS (
  SELECT 
    DATE(timestamp) as date,
    'posthog' as source,
    COUNT(*) as value,
    'events' as metric  
  FROM {{ source('posthog', 'events') }}
  GROUP BY DATE(timestamp)
),

umami_stats AS (
  SELECT 
    date,
    'umami' as source,
    visitors as value,
    'visitors' as metric
  FROM {{ source('umami', 'daily_stats') }}
)

SELECT * FROM search_console
UNION ALL
SELECT * FROM posthog_events  
UNION ALL
SELECT * FROM umami_stats
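
Downstream tools (or a quick script) can then read the unified mart directly. Here is a minimal sketch, assuming the dbt model is materialized as a table called unified_metrics in the memoro_analytics database:

# read_unified_metrics.py - sketch; connection details are placeholders
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    dbname="memoro_analytics",
    user="memoro",
    password="your_password",  # placeholder
)
with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT date, source, metric, SUM(value) AS total
        FROM unified_metrics
        WHERE date >= CURRENT_DATE - INTERVAL '30 days'
        GROUP BY date, source, metric
        ORDER BY date
    """)
    for row in cur.fetchall():
        print(row)
conn.close()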

Monitoring & Alerting

Airbyte Monitoring API

# monitor.py
import requests
from datetime import datetime, timedelta

def check_sync_status():
    """Prüft Status aller Sync Jobs"""
    response = requests.get("http://localhost:8000/api/v1/jobs/list")
    jobs = response.json()["jobs"]
    
    failed_jobs = []
    stale_syncs = []
    
    for job in jobs:
        # Check for failures
        if job["status"] == "failed":
            failed_jobs.append({
                "connection": job["connectionName"],
                "error": job["failureReason"],
                "time": job["createdAt"]
            })
        
        # Check for stale data (no sync in 48h)
        last_sync = datetime.fromisoformat(job["createdAt"])
        if datetime.now() - last_sync > timedelta(hours=48):
            stale_syncs.append({
                "connection": job["connectionName"],
                "last_sync": last_sync
            })
    
    if failed_jobs or stale_syncs:
        send_alert(failed_jobs, stale_syncs)

def send_alert(failed_jobs, stale_syncs):
    """Sendet Slack Alert"""
    webhook_url = "https://hooks.slack.com/services/xxx"
    
    message = "⚠️ *Airbyte Alert*\n"
    
    if failed_jobs:
        message += "\n❌ *Failed Syncs:*\n"
        for job in failed_jobs:
            message += f"• {job['connection']}: {job['error']}\n"
    
    if stale_syncs:
        message += "\n⏰ *Stale Data:*\n"
        for sync in stale_syncs:
            message += f"• {sync['connection']}: Last sync {sync['last_sync']}\n"
    
    requests.post(webhook_url, json={"text": message})

# Run every hour
if __name__ == "__main__":
    check_sync_status()

Costs & Performance

Self-hosted costs

  • Server: ~€20-50/month (4 GB RAM minimum)
  • Storage: ~€0.10/GB/month
  • Example: 10 GB of data ≈ €30/month total

Airbyte Cloud pricing

  • Free: 1 GB/month
  • Starter: $100/month for 10 GB
  • Pro: custom pricing

Performance tips

  1. Use incremental sync wherever possible
  2. Adjust sync frequency (not everything needs to run hourly)
  3. Enable deduplication in the destination
  4. Set up monitoring for failed syncs

Troubleshooting

Common problems

Problem: "Connection refused"

# Docker is not running
docker ps
# If the output is empty:
./run-ab-platform.sh

Problem: "Out of Memory"

# Adjust docker-compose.yml
services:
  worker:
    mem_limit: 4g
    environment:
      JAVA_OPTS: "-Xmx3g"

Problem: "Sync dauert ewig"

-- Check active syncs
SELECT * FROM airbyte_internal.jobs 
WHERE status = 'running' 
ORDER BY created_at DESC;

-- Cancel stuck job
UPDATE airbyte_internal.jobs 
SET status = 'cancelled' 
WHERE id = 'stuck-job-id';
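
Instead of editing Airbyte's internal tables directly, a stuck job can usually also be cancelled through the API. A small sketch (the job ID is a placeholder, and the endpoint follows the open-source config API):

# cancel_job.py - cancel a stuck sync job via the API (sketch)
import requests

JOB_ID = 123  # placeholder: numeric job ID from the UI or the jobs list

response = requests.post(
    "http://localhost:8000/api/v1/jobs/cancel",
    json={"id": JOB_ID},
    headers={"Content-Type": "application/json"},
)
print(response.status_code)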

Alternative to Airbyte: Meltano

If Airbyte feels too complex, take a look at Meltano:

# Installation
pip install meltano

# Create a project
meltano init memoro-analytics

# Add the Google Search Console tap
meltano add extractor tap-google-search-console
meltano add loader target-postgres

# Configure & run
meltano config tap-google-search-console set ...
meltano run tap-google-search-console target-postgres

Summary

Airbyte is a great fit for Memoro because:

  • All of your data sources are supported
  • Open source = no vendor lock-in
  • Simple UI for non-technical team members
  • Scales with your growth
  • Active community & support

Next steps:

  1. Install Airbyte locally (30 min)
  2. Connect your first data source (15 min)
  3. Set up PostgreSQL as the destination (10 min)
  4. Install Metabase for visualization (20 min)
  5. Build your first dashboard (30 min)

= In 2 hours you have a working analytics system!