From ed76f53b000e84554c11a4a5e9846bb392c355d1 Mon Sep 17 00:00:00 2001 From: Till JS Date: Fri, 10 Apr 2026 22:28:57 +0200 Subject: [PATCH] =?UTF-8?q?feat(sync):=20Phase=202=20=E2=80=94=20server-si?= =?UTF-8?q?de=20billing=20gate,=20cron=20charging,=20email=20notifications?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server-side gating (mana-sync Go): - New billing.Checker with 5-minute cache per user - Middleware wraps POST/GET /sync/{appId} endpoints - Returns 402 Payment Required when sync subscription inactive - Fail-open: if mana-credits is unreachable, sync is allowed - Config: MANA_CREDITS_URL + MANA_SERVICE_KEY env vars Recurring charge cron (mana-credits): - Hourly setInterval checks for due sync subscriptions - Calls chargeRecurring() which debits credits and advances nextChargeAt - On insufficient credits: pauses subscription, sends email via mana-notify Email notifications: - Sends "Cloud Sync pausiert" email via mana-notify when subscription paused - Uses POST /api/v1/notifications/send with X-Service-Key auth Client-side 402 handling: - sync.ts detects 402 from push/pull, fires onBillingRequired callback - Layout wires callback to reload syncBilling store → shows pause banner Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/mana/apps/web/src/lib/data/sync.ts | 12 ++ .../apps/web/src/routes/(app)/+layout.svelte | 4 + services/mana-credits/src/index.ts | 16 +++ .../mana-credits/src/services/sync-billing.ts | 30 ++++- services/mana-sync/cmd/server/main.go | 13 +- services/mana-sync/internal/billing/check.go | 127 ++++++++++++++++++ services/mana-sync/internal/config/config.go | 20 +-- 7 files changed, 209 insertions(+), 13 deletions(-) create mode 100644 services/mana-sync/internal/billing/check.go diff --git a/apps/mana/apps/web/src/lib/data/sync.ts b/apps/mana/apps/web/src/lib/data/sync.ts index 7e4c1a23d..b6a1a60ed 100644 --- a/apps/mana/apps/web/src/lib/data/sync.ts +++ b/apps/mana/apps/web/src/lib/data/sync.ts @@ -539,6 +539,7 @@ export function createUnifiedSync( let status: SyncStatus = 'idle'; let online = typeof navigator !== 'undefined' ? navigator.onLine : true; let _statusListeners: Array<(s: SyncStatus) => void> = []; + let _billingRequiredCallback: (() => void) | null = null; const sseAbortControllers = new Map(); // ─── Lifecycle ────────────────────────────────────────── @@ -714,6 +715,10 @@ export function createUnifiedSync( `push[${appId}]` ); + if (res.status === 402) { + _billingRequiredCallback?.(); + return; + } if (!res.ok) throw new Error(`Push failed: ${res.status}`); const data = await res.json(); @@ -813,6 +818,10 @@ export function createUnifiedSync( `pull[${appId}/${syncName}]` ); + if (res.status === 402) { + _billingRequiredCallback?.(); + return; + } if (!res.ok) break; const data = await res.json(); @@ -1089,6 +1098,9 @@ export function createUnifiedSync( _statusListeners = _statusListeners.filter((l) => l !== listener); }; }, + onBillingRequired(callback: () => void) { + _billingRequiredCallback = callback; + }, getChannel: (appId: string) => channels.get(appId), pushNow: push, pullNow: pull, diff --git a/apps/mana/apps/web/src/routes/(app)/+layout.svelte b/apps/mana/apps/web/src/routes/(app)/+layout.svelte index 917edf4b6..454e05ac8 100644 --- a/apps/mana/apps/web/src/routes/(app)/+layout.svelte +++ b/apps/mana/apps/web/src/routes/(app)/+layout.svelte @@ -471,6 +471,10 @@ // Update pending count when sync status changes await refreshPendingCount(); }); + unifiedSync.onBillingRequired(() => { + // Server returned 402 — sync subscription expired or paused + syncBilling.load(); + }); unifiedSync.startAll(); // Seed the badge count on mount: onStatusChange only fires on // transitions, so without this the badge stays at its last known diff --git a/services/mana-credits/src/index.ts b/services/mana-credits/src/index.ts index 17cb06e38..234bace45 100644 --- a/services/mana-credits/src/index.ts +++ b/services/mana-credits/src/index.ts @@ -74,6 +74,22 @@ app.route( createWebhookRoutes(stripeService, creditsService, config.stripe.webhookSecret) ); +// ─── Cron: Daily sync billing charge ──────────────────────── + +const CHARGE_INTERVAL_MS = 60 * 60 * 1000; // Check every hour +setInterval(async () => { + try { + const result = await syncBillingService.chargeRecurring(); + if (result.total > 0) { + console.log( + `[sync-billing] Recurring charge: ${result.charged} charged, ${result.paused} paused, ${result.errors} errors` + ); + } + } catch (err) { + console.error('[sync-billing] Recurring charge failed:', err); + } +}, CHARGE_INTERVAL_MS); + // ─── Start ────────────────────────────────────────────────── console.log(`mana-credits starting on port ${config.port}...`); diff --git a/services/mana-credits/src/services/sync-billing.ts b/services/mana-credits/src/services/sync-billing.ts index 93c46f9ae..0fed9bcc2 100644 --- a/services/mana-credits/src/services/sync-billing.ts +++ b/services/mana-credits/src/services/sync-billing.ts @@ -10,6 +10,34 @@ import { BadRequestError, NotFoundError, InsufficientCreditsError } from '../lib type BillingInterval = 'monthly' | 'quarterly' | 'yearly'; +const MANA_NOTIFY_URL = () => process.env.MANA_NOTIFY_URL || 'http://localhost:3040'; +const SERVICE_KEY = () => process.env.MANA_SERVICE_KEY || ''; + +async function sendPauseNotification(userId: string): Promise { + const key = SERVICE_KEY(); + if (!key) return; + + try { + await fetch(`${MANA_NOTIFY_URL()}/api/v1/notifications/send`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Service-Key': key, + }, + body: JSON.stringify({ + channel: 'email', + appId: 'sync', + userId, + subject: 'Cloud Sync pausiert', + body: 'Dein Cloud Sync wurde pausiert, weil deine Credits nicht ausreichen. Lade Credits auf, um die Synchronisation fortzusetzen.', + data: { type: 'sync-paused', resumeUrl: 'https://mana.how/settings/sync' }, + }), + }); + } catch (err) { + console.error(`[sync-billing] Failed to send pause notification for ${userId}:`, err); + } +} + const SYNC_PRICES: Record = { monthly: 30, quarterly: 90, @@ -222,7 +250,7 @@ export class SyncBillingService { .where(eq(syncSubscriptions.userId, sub.userId)); paused++; - // TODO Phase 2: send notification via mana-notify + sendPauseNotification(sub.userId).catch(() => {}); } else { errors++; console.error(`[sync-billing] Failed to charge user ${sub.userId}:`, error); diff --git a/services/mana-sync/cmd/server/main.go b/services/mana-sync/cmd/server/main.go index b566b5b02..73a151a1e 100644 --- a/services/mana-sync/cmd/server/main.go +++ b/services/mana-sync/cmd/server/main.go @@ -13,6 +13,7 @@ import ( "time" "github.com/mana/mana-sync/internal/auth" + "github.com/mana/mana-sync/internal/billing" "github.com/mana/mana-sync/internal/config" "github.com/mana/mana-sync/internal/store" syncHandler "github.com/mana/mana-sync/internal/sync" @@ -49,16 +50,20 @@ func main() { // Initialize WebSocket hub (with JWT validator for auth) hub := ws.NewHub(validator) + // Initialize billing checker (verifies sync subscription via mana-credits) + billingChecker := billing.NewChecker(cfg.ManaCreditsURL, cfg.ServiceKey) + billingMiddleware := billingChecker.Middleware(validator) + // Initialize sync handler handler := syncHandler.NewHandler(db, validator, hub) // Set up routes mux := http.NewServeMux() - // Sync endpoints (Go 1.22+ routing patterns) - mux.HandleFunc("POST /sync/{appId}", handler.HandleSync) - mux.HandleFunc("GET /sync/{appId}/pull", handler.HandlePull) - mux.HandleFunc("GET /sync/{appId}/stream", handler.HandleStream) + // Sync endpoints (Go 1.22+ routing patterns) — gated by billing check + mux.Handle("POST /sync/{appId}", billingMiddleware(http.HandlerFunc(handler.HandleSync))) + mux.Handle("GET /sync/{appId}/pull", billingMiddleware(http.HandlerFunc(handler.HandlePull))) + mux.Handle("GET /sync/{appId}/stream", billingMiddleware(http.HandlerFunc(handler.HandleStream))) // WebSocket endpoints // Unified: one connection per user, receives all app notifications with appId in payload diff --git a/services/mana-sync/internal/billing/check.go b/services/mana-sync/internal/billing/check.go new file mode 100644 index 000000000..526f40854 --- /dev/null +++ b/services/mana-sync/internal/billing/check.go @@ -0,0 +1,127 @@ +// Package billing provides sync billing status checks against mana-credits. +package billing + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "sync" + "time" +) + +// SyncStatus represents the billing status for a user's sync subscription. +type SyncStatus struct { + Active bool `json:"active"` + Interval string `json:"interval"` + PausedAt *string `json:"pausedAt"` +} + +type cachedStatus struct { + status SyncStatus + fetchedAt time.Time +} + +// Checker verifies sync billing status via the mana-credits service. +// Results are cached per user for cacheTTL to avoid hitting mana-credits on every request. +type Checker struct { + creditsURL string + serviceKey string + cacheTTL time.Duration + client *http.Client + + mu sync.RWMutex + cache map[string]cachedStatus +} + +// NewChecker creates a billing checker. +func NewChecker(creditsURL, serviceKey string) *Checker { + return &Checker{ + creditsURL: creditsURL, + serviceKey: serviceKey, + cacheTTL: 5 * time.Minute, + client: &http.Client{Timeout: 5 * time.Second}, + cache: make(map[string]cachedStatus), + } +} + +// IsActive checks whether a user has an active sync subscription. +// Returns true if the billing check fails (fail-open to not block sync on service outage). +func (c *Checker) IsActive(userID string) bool { + // Check cache first + c.mu.RLock() + entry, ok := c.cache[userID] + c.mu.RUnlock() + + if ok && time.Since(entry.fetchedAt) < c.cacheTTL { + return entry.status.Active + } + + // Fetch from mana-credits + status, err := c.fetchStatus(userID) + if err != nil { + slog.Warn("billing check failed, allowing sync (fail-open)", "userID", userID, "error", err) + return true // Fail-open: don't block sync if billing service is down + } + + // Update cache + c.mu.Lock() + c.cache[userID] = cachedStatus{status: status, fetchedAt: time.Now()} + c.mu.Unlock() + + return status.Active +} + +func (c *Checker) fetchStatus(userID string) (SyncStatus, error) { + url := fmt.Sprintf("%s/api/v1/internal/sync/status/%s", c.creditsURL, userID) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return SyncStatus{}, err + } + req.Header.Set("X-Service-Key", c.serviceKey) + + resp, err := c.client.Do(req) + if err != nil { + return SyncStatus{}, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return SyncStatus{}, fmt.Errorf("unexpected status: %d", resp.StatusCode) + } + + var status SyncStatus + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return SyncStatus{}, fmt.Errorf("decode failed: %w", err) + } + + return status, nil +} + +// Middleware returns an HTTP middleware that checks sync billing status. +// Returns 402 Payment Required if the user's sync subscription is not active. +func (c *Checker) Middleware(validator interface{ UserIDFromRequest(*http.Request) (string, error) }) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + userID, err := validator.UserIDFromRequest(r) + if err != nil { + // Let the downstream handler deal with auth errors + next.ServeHTTP(w, r) + return + } + + if !c.IsActive(userID) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusPaymentRequired) + json.NewEncoder(w).Encode(map[string]string{ + "error": "sync_inactive", + "message": "Cloud Sync ist nicht aktiv. Aktiviere Sync in den Einstellungen.", + }) + return + } + + next.ServeHTTP(w, r) + }) + } +} diff --git a/services/mana-sync/internal/config/config.go b/services/mana-sync/internal/config/config.go index 5a6413d09..5b1badf81 100644 --- a/services/mana-sync/internal/config/config.go +++ b/services/mana-sync/internal/config/config.go @@ -7,10 +7,12 @@ import ( // Config holds all configuration for the sync server. type Config struct { - Port int - DatabaseURL string - JWKSUrl string // mana-auth JWKS endpoint for JWT validation - CORSOrigins string + Port int + DatabaseURL string + JWKSUrl string // mana-auth JWKS endpoint for JWT validation + CORSOrigins string + ManaCreditsURL string // mana-credits service URL for billing checks + ServiceKey string // Service-to-service auth key } // Load reads configuration from environment variables with sensible defaults. @@ -18,10 +20,12 @@ func Load() *Config { port, _ := strconv.Atoi(getEnv("PORT", "3050")) return &Config{ - Port: port, - DatabaseURL: getEnv("DATABASE_URL", "postgresql://mana:devpassword@localhost:5432/mana_sync"), - JWKSUrl: getEnv("JWKS_URL", "http://localhost:3001/api/auth/jwks"), - CORSOrigins: getEnv("CORS_ORIGINS", "http://localhost:5173,http://localhost:5188"), + Port: port, + DatabaseURL: getEnv("DATABASE_URL", "postgresql://mana:devpassword@localhost:5432/mana_sync"), + JWKSUrl: getEnv("JWKS_URL", "http://localhost:3001/api/auth/jwks"), + CORSOrigins: getEnv("CORS_ORIGINS", "http://localhost:5173,http://localhost:5188"), + ManaCreditsURL: getEnv("MANA_CREDITS_URL", "http://localhost:3061"), + ServiceKey: getEnv("MANA_SERVICE_KEY", "dev-service-key"), } }