managarten/services/mana-sync/cmd/server/main.go
Till JS 38d35247cd feat(spaces): end-to-end shared-space sync (membership lookup + plaintext)
Closes the gap between "invite flow UI exists" and "two users in the
same space actually see each other's data". Three pieces land together
because they're meaningless without each other.

mana-auth — new internal endpoint:
  GET /api/v1/internal/users/:userId/memberships
  Returns [{organizationId, role}, ...] for the user. mana-sync uses
  this to populate the multi-member RLS session config.

mana-sync — membership lookup:
  new internal/memberships package with an HTTP client + 5 min
  per-user cache, fail-open (empty list = pre-Spaces behavior).
  Config gets MANA_AUTH_URL (default http://localhost:3001).
  Handler.NewHandler takes the Lookup. Every Push/Pull/Stream call
  now passes spaceIDsFor(userID) to Store methods.
  GetChangesSince + GetAllChangesSince extend their WHERE clause:
    WHERE (user_id = $1 OR space_id = ANY($memberSpaces))
  so co-members see each other's rows, not just the author.

apps/web — encryption skip for shared-space records:
  encryptRecord now checks record.spaceId:
    - `_personal:<userId>` sentinel OR no active shared space → encrypt
      with user master key (E2E as today).
    - Active space resolves to non-personal type AND spaceId matches
      that space → skip encryption; write lands plaintext.
  decryptRecord is unchanged because its per-field isEncrypted() guard
  already passes plaintext through.
  Phase-1 compromise: shared-space data is protected by server RLS
  only, not E2E. Phase 2 adds per-Space shared keys with per-member
  wrap — tracked in docs/plans/spaces-foundation.md.

Plus docs/plans/shared-space-smoketest.md: step-by-step Zwei-User-Test
mit erwarteten Ergebnissen und Debugging-Hinweisen bei Problemen.

Build + go test + web check all green.

Plan: docs/plans/spaces-foundation.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 20:46:53 +02:00

150 lines
4.9 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/mana/mana-sync/internal/auth"
"github.com/mana/mana-sync/internal/backup"
"github.com/mana/mana-sync/internal/billing"
"github.com/mana/mana-sync/internal/config"
"github.com/mana/mana-sync/internal/memberships"
"github.com/mana/mana-sync/internal/store"
syncHandler "github.com/mana/mana-sync/internal/sync"
"github.com/mana/mana-sync/internal/ws"
"github.com/rs/cors"
)
func main() {
// Structured logging
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
})))
cfg := config.Load()
ctx := context.Background()
// Connect to PostgreSQL
db, err := store.New(ctx, cfg.DatabaseURL)
if err != nil {
slog.Error("failed to connect to database", "error", err)
os.Exit(1)
}
defer db.Close()
// Run migrations
if err := db.Migrate(ctx); err != nil {
slog.Error("failed to run migrations", "error", err)
os.Exit(1)
}
// Initialize JWT validator
validator := auth.NewValidator(cfg.JWKSUrl)
// Initialize WebSocket hub (with JWT validator for auth)
hub := ws.NewHub(validator)
// Initialize billing checker (verifies sync subscription via mana-credits)
billingChecker := billing.NewChecker(cfg.ManaCreditsURL, cfg.ServiceKey)
billingMiddleware := billingChecker.Middleware(validator)
// Initialize Space-membership lookup against mana-auth. The handler
// passes the caller's membership list into every sync query so the
// multi-member RLS policy lets co-members of a shared Space see each
// other's records.
membershipLookup := memberships.New(cfg.ManaAuthURL, cfg.ServiceKey)
// Initialize sync handler
handler := syncHandler.NewHandler(db, validator, hub, membershipLookup)
// Set up routes
mux := http.NewServeMux()
// Sync endpoints (Go 1.22+ routing patterns) — gated by billing check
mux.Handle("POST /sync/{appId}", billingMiddleware(http.HandlerFunc(handler.HandleSync)))
mux.Handle("GET /sync/{appId}/pull", billingMiddleware(http.HandlerFunc(handler.HandlePull)))
mux.Handle("GET /sync/{appId}/stream", billingMiddleware(http.HandlerFunc(handler.HandleStream)))
// Backup/export — GDPR-grade, auth-only (no billing gate so users can
// always retrieve their data). M1 thin slice: streams raw sync_changes
// as JSONL. Manifest + zip container land in M3.
backupHandler := backup.NewHandler(db, validator)
mux.Handle("GET /backup/export", http.HandlerFunc(backupHandler.HandleExport))
// WebSocket endpoints
// Unified: one connection per user, receives all app notifications with appId in payload
mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
hub.HandleWebSocket(w, r, "") // empty appID = unified mode
})
// Legacy: one connection per app (backward-compatible)
mux.HandleFunc("/ws/{appId}", func(w http.ResponseWriter, r *http.Request) {
appID := r.PathValue("appId")
hub.HandleWebSocket(w, r, appID)
})
// Health check
mux.HandleFunc("GET /health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"status": "ok",
"service": "mana-sync",
"timestamp": time.Now().UTC().Format(time.RFC3339),
"connections": hub.TotalConnections(),
"users": hub.ConnectedUsers(),
})
})
// Metrics (Prometheus-compatible)
mux.HandleFunc("GET /metrics", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
fmt.Fprintf(w, "# HELP mana_sync_connections_total Total WebSocket connections\n")
fmt.Fprintf(w, "# TYPE mana_sync_connections_total gauge\n")
fmt.Fprintf(w, "mana_sync_connections_total %d\n", hub.TotalConnections())
fmt.Fprintf(w, "# HELP mana_sync_users_connected Connected unique users\n")
fmt.Fprintf(w, "# TYPE mana_sync_users_connected gauge\n")
fmt.Fprintf(w, "mana_sync_users_connected %d\n", hub.ConnectedUsers())
})
// CORS
origins := strings.Split(cfg.CORSOrigins, ",")
c := cors.New(cors.Options{
AllowedOrigins: origins,
AllowedMethods: []string{"GET", "POST", "OPTIONS"},
AllowedHeaders: []string{"Authorization", "Content-Type", "X-Client-Id"},
AllowCredentials: true,
})
server := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: c.Handler(mux),
ReadTimeout: 15 * time.Second,
WriteTimeout: 0, // Disabled for SSE streaming (long-lived connections)
IdleTimeout: 120 * time.Second,
}
// Graceful shutdown
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
slog.Info("shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
server.Shutdown(ctx)
}()
slog.Info("mana-sync starting", "port", cfg.Port, "jwks", cfg.JWKSUrl)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
slog.Error("server error", "error", err)
os.Exit(1)
}
}