managarten/services/mana-sync/cmd/server/main.go
Till JS ed76f53b00 feat(sync): Phase 2 — server-side billing gate, cron charging, email notifications
Server-side gating (mana-sync Go):
- New billing.Checker with 5-minute cache per user
- Middleware wraps POST/GET /sync/{appId} endpoints
- Returns 402 Payment Required when sync subscription inactive
- Fail-open: if mana-credits is unreachable, sync is allowed
- Config: MANA_CREDITS_URL + MANA_SERVICE_KEY env vars

Recurring charge cron (mana-credits):
- Hourly setInterval checks for due sync subscriptions
- Calls chargeRecurring() which debits credits and advances nextChargeAt
- On insufficient credits: pauses subscription, sends email via mana-notify

Email notifications:
- Sends "Cloud Sync pausiert" email via mana-notify when subscription paused
- Uses POST /api/v1/notifications/send with X-Service-Key auth

Client-side 402 handling:
- sync.ts detects 402 from push/pull, fires onBillingRequired callback
- Layout wires callback to reload syncBilling store → shows pause banner

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 22:28:57 +02:00

136 lines
4.2 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/mana/mana-sync/internal/auth"
"github.com/mana/mana-sync/internal/billing"
"github.com/mana/mana-sync/internal/config"
"github.com/mana/mana-sync/internal/store"
syncHandler "github.com/mana/mana-sync/internal/sync"
"github.com/mana/mana-sync/internal/ws"
"github.com/rs/cors"
)
func main() {
// Structured logging
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
})))
cfg := config.Load()
ctx := context.Background()
// Connect to PostgreSQL
db, err := store.New(ctx, cfg.DatabaseURL)
if err != nil {
slog.Error("failed to connect to database", "error", err)
os.Exit(1)
}
defer db.Close()
// Run migrations
if err := db.Migrate(ctx); err != nil {
slog.Error("failed to run migrations", "error", err)
os.Exit(1)
}
// Initialize JWT validator
validator := auth.NewValidator(cfg.JWKSUrl)
// Initialize WebSocket hub (with JWT validator for auth)
hub := ws.NewHub(validator)
// Initialize billing checker (verifies sync subscription via mana-credits)
billingChecker := billing.NewChecker(cfg.ManaCreditsURL, cfg.ServiceKey)
billingMiddleware := billingChecker.Middleware(validator)
// Initialize sync handler
handler := syncHandler.NewHandler(db, validator, hub)
// Set up routes
mux := http.NewServeMux()
// Sync endpoints (Go 1.22+ routing patterns) — gated by billing check
mux.Handle("POST /sync/{appId}", billingMiddleware(http.HandlerFunc(handler.HandleSync)))
mux.Handle("GET /sync/{appId}/pull", billingMiddleware(http.HandlerFunc(handler.HandlePull)))
mux.Handle("GET /sync/{appId}/stream", billingMiddleware(http.HandlerFunc(handler.HandleStream)))
// WebSocket endpoints
// Unified: one connection per user, receives all app notifications with appId in payload
mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
hub.HandleWebSocket(w, r, "") // empty appID = unified mode
})
// Legacy: one connection per app (backward-compatible)
mux.HandleFunc("/ws/{appId}", func(w http.ResponseWriter, r *http.Request) {
appID := r.PathValue("appId")
hub.HandleWebSocket(w, r, appID)
})
// Health check
mux.HandleFunc("GET /health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"status": "ok",
"service": "mana-sync",
"timestamp": time.Now().UTC().Format(time.RFC3339),
"connections": hub.TotalConnections(),
"users": hub.ConnectedUsers(),
})
})
// Metrics (Prometheus-compatible)
mux.HandleFunc("GET /metrics", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
fmt.Fprintf(w, "# HELP mana_sync_connections_total Total WebSocket connections\n")
fmt.Fprintf(w, "# TYPE mana_sync_connections_total gauge\n")
fmt.Fprintf(w, "mana_sync_connections_total %d\n", hub.TotalConnections())
fmt.Fprintf(w, "# HELP mana_sync_users_connected Connected unique users\n")
fmt.Fprintf(w, "# TYPE mana_sync_users_connected gauge\n")
fmt.Fprintf(w, "mana_sync_users_connected %d\n", hub.ConnectedUsers())
})
// CORS
origins := strings.Split(cfg.CORSOrigins, ",")
c := cors.New(cors.Options{
AllowedOrigins: origins,
AllowedMethods: []string{"GET", "POST", "OPTIONS"},
AllowedHeaders: []string{"Authorization", "Content-Type", "X-Client-Id"},
AllowCredentials: true,
})
server := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: c.Handler(mux),
ReadTimeout: 15 * time.Second,
WriteTimeout: 0, // Disabled for SSE streaming (long-lived connections)
IdleTimeout: 120 * time.Second,
}
// Graceful shutdown
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
slog.Info("shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
server.Shutdown(ctx)
}()
slog.Info("mana-sync starting", "port", cfg.Port, "jwks", cfg.JWKSUrl)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
slog.Error("server error", "error", err)
os.Exit(1)
}
}