diff --git a/services/mana-notify/.gitignore b/services/mana-notify/.gitignore deleted file mode 100644 index f2823069e..000000000 --- a/services/mana-notify/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -bin/ -server diff --git a/services/mana-notify/CLAUDE.md b/services/mana-notify/CLAUDE.md deleted file mode 100644 index 4d23cc432..000000000 --- a/services/mana-notify/CLAUDE.md +++ /dev/null @@ -1,73 +0,0 @@ -# mana-notify (Go) - -Go replacement for the NestJS mana-notify service. Unified notification microservice for email, push, and webhook notifications. - -## Architecture - -- **Language:** Go 1.25 -- **Database:** PostgreSQL (pgx v5, 5 tables in `notify` schema) -- **Queue:** Go channels + goroutine worker pool (replaces BullMQ) -- **Metrics:** Prometheus -- **Port:** 3040 - -## Endpoints - -### Notifications (X-Service-Key auth) -- `POST /api/v1/notifications/send` — Send immediately -- `POST /api/v1/notifications/schedule` — Schedule for future -- `POST /api/v1/notifications/batch` — Batch send (max 100) -- `GET /api/v1/notifications/{id}` — Get status -- `DELETE /api/v1/notifications/{id}` — Cancel pending - -### Templates (X-Service-Key auth) -- `GET /api/v1/templates` — List all -- `GET /api/v1/templates/{slug}` — Get by slug -- `POST /api/v1/templates` — Create -- `PUT /api/v1/templates/{slug}` — Update -- `DELETE /api/v1/templates/{slug}` — Delete -- `POST /api/v1/templates/{slug}/preview` — Preview -- `POST /api/v1/templates/preview` — Preview custom - -### Devices (JWT auth) -- `POST /api/v1/devices/register` — Register push device -- `GET /api/v1/devices` — List devices -- `DELETE /api/v1/devices/{id}` — Unregister - -### Preferences (JWT auth) -- `GET /api/v1/preferences` — Get preferences -- `PUT /api/v1/preferences` — Update preferences - -### System -- `GET /health` — Health check -- `GET /metrics` — Prometheus metrics - -## Notification Channels - -| Channel | Service | Worker Concurrency | Max Retries | -|---------|---------|-------------------|-------------| -| Email | Stalwart SMTP (self-hosted, see `docs/MAIL_SERVER.md`) | 5 | 3 | -| Push | Expo Push API | 10 | 3 | -| Webhook | HTTP callback | 10 | 5 | - -## Commands - -```bash -go run ./cmd/server # Dev -go build -o bin/mana-notify ./cmd/server # Build -go test ./... # Test -``` - -## Environment Variables - -| Variable | Default | Description | -|----------|---------|-------------| -| `PORT` | 3040 | Server port | -| `DATABASE_URL` | postgresql://...localhost:5432/mana_notify | PostgreSQL | -| `SERVICE_KEY` | dev-service-key | Service-to-service auth | -| `MANA_AUTH_URL` | http://localhost:3001 | JWT validation | -| `SMTP_HOST` | stalwart | SMTP host (self-hosted Stalwart) | -| `SMTP_PORT` | 587 | SMTP port | -| `SMTP_USER` | | SMTP username | -| `SMTP_PASSWORD` | | SMTP password | -| `SMTP_FROM` | Mana | Default from | -| `EXPO_ACCESS_TOKEN` | | Expo push token | diff --git a/services/mana-notify/Dockerfile b/services/mana-notify/Dockerfile deleted file mode 100644 index e30192cce..000000000 --- a/services/mana-notify/Dockerfile +++ /dev/null @@ -1,25 +0,0 @@ -FROM golang:1.25-alpine AS builder - -WORKDIR /app -COPY packages/shared-go/ /shared-go/ -COPY services/mana-notify/go.mod services/mana-notify/go.sum ./ -RUN go mod edit -replace github.com/mana/shared-go=/shared-go && go mod download - -COPY services/mana-notify/ . -RUN go mod edit -replace github.com/mana/shared-go=/shared-go && \ - CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /mana-notify ./cmd/server - -FROM alpine:3.21 - -RUN apk --no-cache add ca-certificates tzdata && \ - addgroup -g 1000 mana && adduser -u 1000 -G mana -s /sbin/nologin -D mana - -COPY --from=builder /mana-notify /usr/local/bin/mana-notify - -USER mana -EXPOSE 3040 - -HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ - CMD wget -q --spider http://localhost:3040/health || exit 1 - -ENTRYPOINT ["mana-notify"] diff --git a/services/mana-notify/cmd/server/main.go b/services/mana-notify/cmd/server/main.go deleted file mode 100644 index ab9de9526..000000000 --- a/services/mana-notify/cmd/server/main.go +++ /dev/null @@ -1,136 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log/slog" - "net/http" - "os" - "os/signal" - "syscall" - "time" - - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/rs/cors" - - "github.com/mana/mana-notify/internal/auth" - "github.com/mana/mana-notify/internal/channel" - "github.com/mana/mana-notify/internal/config" - "github.com/mana/mana-notify/internal/db" - "github.com/mana/mana-notify/internal/handler" - "github.com/mana/mana-notify/internal/metrics" - "github.com/mana/mana-notify/internal/queue" - tmpl "github.com/mana/mana-notify/internal/template" -) - -func main() { - slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: slog.LevelInfo, - }))) - - cfg := config.Load() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - database, err := db.New(ctx, cfg.DatabaseURL) - cancel() - if err != nil { - slog.Error("database init failed", "error", err) - os.Exit(1) - } - defer database.Close() - - // Initialize services - m := metrics.New() - emailSvc := channel.NewEmailService(cfg) - pushSvc := channel.NewPushService(cfg) - webhookSvc := channel.NewWebhookService() - engine := tmpl.NewEngine(database) - - // Seed default templates - engine.SeedDefaults(context.Background()) - - // Start worker pool - workerPool := queue.NewWorkerPool(database, emailSvc, pushSvc, webhookSvc, m) - workerPool.Start() - defer workerPool.Stop() - - // Handlers - notifHandler := handler.NewNotificationsHandler(database, workerPool, engine) - tmplHandler := handler.NewTemplatesHandler(database, engine) - devicesHandler := handler.NewDevicesHandler(database) - prefsHandler := handler.NewPreferencesHandler(database) - healthHandler := handler.NewHealthHandler(database) - - // Middleware - serviceAuth := auth.ValidateServiceKey(cfg.ServiceKey) - jwtAuth := auth.ValidateJWT(cfg.ManaAuthURL) - - mux := http.NewServeMux() - - // System endpoints (no auth) - mux.HandleFunc("GET /health", healthHandler.Health) - mux.Handle("GET /metrics", promhttp.Handler()) - - // Notification endpoints (service key auth) - mux.Handle("POST /api/v1/notifications/send", serviceAuth(http.HandlerFunc(notifHandler.Send))) - mux.Handle("POST /api/v1/notifications/schedule", serviceAuth(http.HandlerFunc(notifHandler.Schedule))) - mux.Handle("POST /api/v1/notifications/batch", serviceAuth(http.HandlerFunc(notifHandler.Batch))) - mux.Handle("GET /api/v1/notifications/{id}", serviceAuth(http.HandlerFunc(notifHandler.GetNotification))) - mux.Handle("DELETE /api/v1/notifications/{id}", serviceAuth(http.HandlerFunc(notifHandler.CancelNotification))) - - // Template endpoints (service key auth) - mux.Handle("GET /api/v1/templates", serviceAuth(http.HandlerFunc(tmplHandler.List))) - mux.Handle("POST /api/v1/templates", serviceAuth(http.HandlerFunc(tmplHandler.Create))) - mux.Handle("POST /api/v1/templates/preview", serviceAuth(http.HandlerFunc(tmplHandler.PreviewCustom))) - mux.Handle("GET /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Get))) - mux.Handle("PUT /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Update))) - mux.Handle("DELETE /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Delete))) - mux.Handle("POST /api/v1/templates/{slug}/preview", serviceAuth(http.HandlerFunc(tmplHandler.Preview))) - - // Device endpoints (JWT auth) - mux.Handle("POST /api/v1/devices/register", jwtAuth(http.HandlerFunc(devicesHandler.Register))) - mux.Handle("GET /api/v1/devices", jwtAuth(http.HandlerFunc(devicesHandler.List))) - mux.Handle("DELETE /api/v1/devices/{id}", jwtAuth(http.HandlerFunc(devicesHandler.Delete))) - - // Preference endpoints (JWT auth) - mux.Handle("GET /api/v1/preferences", jwtAuth(http.HandlerFunc(prefsHandler.Get))) - mux.Handle("PUT /api/v1/preferences", jwtAuth(http.HandlerFunc(prefsHandler.Update))) - - corsHandler := cors.New(cors.Options{ - AllowedOrigins: cfg.CORSOrigins, - AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, - AllowedHeaders: []string{"Content-Type", "Authorization", "X-Service-Key"}, - AllowCredentials: true, - }).Handler(mux) - - server := &http.Server{ - Addr: fmt.Sprintf(":%d", cfg.Port), - Handler: corsHandler, - ReadTimeout: 30 * time.Second, - WriteTimeout: 60 * time.Second, - IdleTimeout: 120 * time.Second, - MaxHeaderBytes: 1 << 20, - } - - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - - go func() { - slog.Info("mana-notify started", "port", cfg.Port) - if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - slog.Error("server error", "error", err) - os.Exit(1) - } - }() - - <-sigCh - slog.Info("shutting down...") - - ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := server.Shutdown(ctx); err != nil { - slog.Error("shutdown error", "error", err) - } - - slog.Info("server stopped") -} diff --git a/services/mana-notify/go.mod b/services/mana-notify/go.mod deleted file mode 100644 index 4d7bd6540..000000000 --- a/services/mana-notify/go.mod +++ /dev/null @@ -1,29 +0,0 @@ -module github.com/mana/mana-notify - -go 1.25.0 - -require ( - github.com/jackc/pgx/v5 v5.9.1 - github.com/mana/shared-go v0.0.0 - github.com/prometheus/client_golang v1.22.0 - github.com/rs/cors v1.11.1 -) - -require ( - github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/golang-jwt/jwt/v5 v5.3.1 // indirect - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.62.0 // indirect - github.com/prometheus/procfs v0.15.1 // indirect - golang.org/x/sync v0.17.0 // indirect - golang.org/x/sys v0.30.0 // indirect - golang.org/x/text v0.29.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect -) - -replace github.com/mana/shared-go => ../../packages/shared-go diff --git a/services/mana-notify/go.sum b/services/mana-notify/go.sum deleted file mode 100644 index de1070a5d..000000000 --- a/services/mana-notify/go.sum +++ /dev/null @@ -1,54 +0,0 @@ -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= -github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= -github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= -github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= -github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= -github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= -github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= -github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= -github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= -github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= -github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= -github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= -github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= -github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= -github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= -github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= -github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= -golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/services/mana-notify/internal/auth/auth.go b/services/mana-notify/internal/auth/auth.go deleted file mode 100644 index 4568e08dd..000000000 --- a/services/mana-notify/internal/auth/auth.go +++ /dev/null @@ -1,31 +0,0 @@ -// Package auth provides authentication middleware for mana-notify. -// Delegates to shared-go/authutil for JWT and service key validation. -package auth - -import ( - "net/http" - - "github.com/mana/shared-go/authutil" -) - -// Re-export types for backward compatibility. -type User = authutil.User - -// UserContextKey is the context key for the authenticated user. -const UserContextKey = authutil.UserContextKey - -// ValidateServiceKey checks the X-Service-Key header. -func ValidateServiceKey(serviceKey string) func(http.Handler) http.Handler { - return authutil.ServiceKeyMiddleware(serviceKey) -} - -// ValidateJWT validates Bearer tokens against mana-auth. -func ValidateJWT(authURL string) func(http.Handler) http.Handler { - validator := authutil.NewRemoteValidator(authURL) - return authutil.JWTMiddleware(validator) -} - -// GetUser extracts the authenticated user from context. -func GetUser(r *http.Request) *User { - return authutil.GetUser(r) -} diff --git a/services/mana-notify/internal/auth/auth_test.go b/services/mana-notify/internal/auth/auth_test.go deleted file mode 100644 index 9554e2c3b..000000000 --- a/services/mana-notify/internal/auth/auth_test.go +++ /dev/null @@ -1,174 +0,0 @@ -package auth - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" -) - -func TestValidateServiceKey(t *testing.T) { - const validKey = "test-service-key-123" - - okHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"ok":true}`)) - }) - - middleware := ValidateServiceKey(validKey) - handler := middleware(okHandler) - - tests := []struct { - name string - key string - wantStatus int - }{ - { - name: "valid key passes through", - key: validKey, - wantStatus: http.StatusOK, - }, - { - name: "missing key returns 401", - key: "", - wantStatus: http.StatusUnauthorized, - }, - { - name: "wrong key returns 401", - key: "wrong-key", - wantStatus: http.StatusUnauthorized, - }, - { - name: "partial key returns 401", - key: "test-service-key", - wantStatus: http.StatusUnauthorized, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "/api/v1/test", nil) - if tt.key != "" { - req.Header.Set("X-Service-Key", tt.key) - } - rec := httptest.NewRecorder() - - handler.ServeHTTP(rec, req) - - if rec.Code != tt.wantStatus { - t.Fatalf("status = %d, want %d", rec.Code, tt.wantStatus) - } - }) - } -} - -func TestValidateServiceKey_NextHandlerCalled(t *testing.T) { - called := false - next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - called = true - w.WriteHeader(http.StatusOK) - }) - - handler := ValidateServiceKey("key123")(next) - - req := httptest.NewRequest(http.MethodGet, "/", nil) - req.Header.Set("X-Service-Key", "key123") - rec := httptest.NewRecorder() - - handler.ServeHTTP(rec, req) - - if !called { - t.Fatal("next handler was not called with valid key") - } -} - -func TestValidateServiceKey_NextHandlerNotCalledOnInvalidKey(t *testing.T) { - called := false - next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - called = true - }) - - handler := ValidateServiceKey("correct-key")(next) - - req := httptest.NewRequest(http.MethodGet, "/", nil) - req.Header.Set("X-Service-Key", "wrong-key") - rec := httptest.NewRecorder() - - handler.ServeHTTP(rec, req) - - if called { - t.Fatal("next handler should not be called with invalid key") - } -} - -func TestValidateJWT_MissingBearer(t *testing.T) { - next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - t.Fatal("next handler should not be called without token") - }) - - handler := ValidateJWT("http://localhost:3001")(next) - - tests := []struct { - name string - header string - }{ - {"no header", ""}, - {"no Bearer prefix", "Token abc123"}, - {"basic auth", "Basic dXNlcjpwYXNz"}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "/", nil) - if tt.header != "" { - req.Header.Set("Authorization", tt.header) - } - rec := httptest.NewRecorder() - - handler.ServeHTTP(rec, req) - - if rec.Code != http.StatusUnauthorized { - t.Fatalf("status = %d, want 401", rec.Code) - } - }) - } -} - -func TestGetUser_NoUser(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "/", nil) - user := GetUser(req) - if user != nil { - t.Fatal("expected nil user from empty context") - } -} - -func TestGetUser_WithUser(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "/", nil) - expected := &User{ - UserID: "user-123", - Email: "test@example.com", - Role: "user", - SessionID: "sess-456", - } - - ctx := req.Context() - ctx = context.WithValue(ctx, UserContextKey, expected) - req = req.WithContext(ctx) - - user := GetUser(req) - if user == nil { - t.Fatal("expected non-nil user") - } - if user.UserID != expected.UserID { - t.Fatalf("UserID = %q, want %q", user.UserID, expected.UserID) - } - if user.Email != expected.Email { - t.Fatalf("Email = %q, want %q", user.Email, expected.Email) - } - if user.Role != expected.Role { - t.Fatalf("Role = %q, want %q", user.Role, expected.Role) - } - if user.SessionID != expected.SessionID { - t.Fatalf("SessionID = %q, want %q", user.SessionID, expected.SessionID) - } -} diff --git a/services/mana-notify/internal/channel/email.go b/services/mana-notify/internal/channel/email.go deleted file mode 100644 index b7dccc160..000000000 --- a/services/mana-notify/internal/channel/email.go +++ /dev/null @@ -1,219 +0,0 @@ -package channel - -import ( - "crypto/tls" - "fmt" - "log/slog" - "net/smtp" - "strings" - "time" - - "github.com/mana/mana-notify/internal/config" -) - -type EmailService struct { - host string - port int - user string - password string - from string - insecureTLS bool -} - -func NewEmailService(cfg *config.Config) *EmailService { - return &EmailService{ - host: cfg.SMTPHost, - port: cfg.SMTPPort, - user: cfg.SMTPUser, - password: cfg.SMTPPassword, - from: cfg.SMTPFrom, - insecureTLS: cfg.SMTPInsecureTLS, - } -} - -type EmailMessage struct { - To string - Subject string - HTML string - Text string - From string - ReplyTo string -} - -type EmailResult struct { - Success bool - MessageID string - Error string -} - -// loginAuth implements smtp.Auth for LOGIN mechanism (some servers need this). -// Also bypasses Go's PlainAuth hostname check for internal connections. -type loginAuth struct { - username, password string -} - -func (a *loginAuth) Start(server *smtp.ServerInfo) (string, []byte, error) { - return "LOGIN", []byte(a.username), nil -} - -func (a *loginAuth) Next(fromServer []byte, more bool) ([]byte, error) { - if more { - prompt := strings.TrimSpace(string(fromServer)) - switch strings.ToLower(prompt) { - case "username:": - return []byte(a.username), nil - case "password:": - return []byte(a.password), nil - default: - return nil, fmt.Errorf("unexpected server prompt: %s", prompt) - } - } - return nil, nil -} - -func (s *EmailService) IsConfigured() bool { - return s.user != "" && s.password != "" -} - -func (s *EmailService) Send(msg *EmailMessage) EmailResult { - start := time.Now() - - if !s.IsConfigured() { - slog.Warn("smtp not configured, skipping email", "to", msg.To) - return EmailResult{Success: false, Error: "SMTP not configured"} - } - - from := s.from - if msg.From != "" { - from = msg.From - } - - // Build email headers and body - msgID := fmt.Sprintf("<%d.%s@mana.how>", time.Now().UnixNano(), msg.To) - - var builder strings.Builder - builder.WriteString(fmt.Sprintf("From: %s\r\n", from)) - builder.WriteString(fmt.Sprintf("To: %s\r\n", msg.To)) - builder.WriteString(fmt.Sprintf("Subject: %s\r\n", msg.Subject)) - builder.WriteString(fmt.Sprintf("Message-ID: %s\r\n", msgID)) - builder.WriteString(fmt.Sprintf("Date: %s\r\n", time.Now().UTC().Format(time.RFC1123Z))) - if msg.ReplyTo != "" { - builder.WriteString(fmt.Sprintf("Reply-To: %s\r\n", msg.ReplyTo)) - } - builder.WriteString("MIME-Version: 1.0\r\n") - builder.WriteString("Content-Type: text/html; charset=\"UTF-8\"\r\n") - builder.WriteString("\r\n") - - if msg.HTML != "" { - builder.WriteString(msg.HTML) - } else { - builder.WriteString(msg.Text) - } - - fromAddr := extractEmail(from) - addr := fmt.Sprintf("%s:%d", s.host, s.port) - body := []byte(builder.String()) - - tlsConfig := &tls.Config{ServerName: s.host, InsecureSkipVerify: s.insecureTLS} - - // Try implicit TLS first (port 465 style) - conn, err := tls.Dial("tcp", addr, tlsConfig) - if err == nil { - defer conn.Close() - result := s.sendViaClient(conn, s.host, fromAddr, msg.To, body, start) - if result.Success { - slog.Info("email sent via TLS", "to", msg.To, "duration", time.Since(start)) - } - return result - } - - // Fallback: STARTTLS on plain connection - c, dialErr := smtp.Dial(addr) - if dialErr != nil { - slog.Error("smtp dial failed", "to", msg.To, "error", dialErr, "duration", time.Since(start)) - return EmailResult{Success: false, Error: dialErr.Error()} - } - defer c.Close() - - // Try STARTTLS - if err := c.StartTLS(tlsConfig); err != nil { - if s.insecureTLS { - slog.Warn("STARTTLS failed, continuing without TLS", "error", err) - } else { - slog.Error("STARTTLS failed", "to", msg.To, "error", err) - return EmailResult{Success: false, Error: err.Error()} - } - } - - // Auth — use loginAuth to bypass Go's PlainAuth hostname restriction - auth := &loginAuth{username: s.user, password: s.password} - if err := c.Auth(auth); err != nil { - slog.Error("smtp auth failed", "to", msg.To, "error", err, "duration", time.Since(start)) - return EmailResult{Success: false, Error: err.Error()} - } - - if err := c.Mail(fromAddr); err != nil { - slog.Error("smtp MAIL FROM failed", "to", msg.To, "error", err) - return EmailResult{Success: false, Error: err.Error()} - } - if err := c.Rcpt(msg.To); err != nil { - slog.Error("smtp RCPT TO failed", "to", msg.To, "error", err) - return EmailResult{Success: false, Error: err.Error()} - } - - w, err := c.Data() - if err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - if _, err := w.Write(body); err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - if err := w.Close(); err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - c.Quit() - - slog.Info("email sent via STARTTLS", "to", msg.To, "duration", time.Since(start)) - return EmailResult{Success: true} -} - -func (s *EmailService) sendViaClient(conn *tls.Conn, host string, from, to string, body []byte, start time.Time) EmailResult { - client, err := smtp.NewClient(conn, host) - if err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - defer client.Close() - - auth := &loginAuth{username: s.user, password: s.password} - if err := client.Auth(auth); err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - if err := client.Mail(from); err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - if err := client.Rcpt(to); err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - w, err := client.Data() - if err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - if _, err := w.Write(body); err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - if err := w.Close(); err != nil { - return EmailResult{Success: false, Error: err.Error()} - } - client.Quit() - return EmailResult{Success: true} -} - -func extractEmail(from string) string { - if idx := strings.Index(from, "<"); idx != -1 { - end := strings.Index(from, ">") - if end > idx { - return from[idx+1 : end] - } - } - return from -} diff --git a/services/mana-notify/internal/channel/push.go b/services/mana-notify/internal/channel/push.go deleted file mode 100644 index 65d946fc4..000000000 --- a/services/mana-notify/internal/channel/push.go +++ /dev/null @@ -1,169 +0,0 @@ -package channel - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "log/slog" - "net/http" - "strings" - "time" - - "github.com/mana/mana-notify/internal/config" -) - -const expoPushURL = "https://exp.host/--/api/v2/push/send" - -type PushService struct { - accessToken string - client *http.Client -} - -func NewPushService(cfg *config.Config) *PushService { - return &PushService{ - accessToken: cfg.ExpoAccessToken, - client: &http.Client{Timeout: 30 * time.Second}, - } -} - -type PushMessage struct { - To string `json:"to"` - Title string `json:"title,omitempty"` - Body string `json:"body"` - Data map[string]any `json:"data,omitempty"` - Sound string `json:"sound,omitempty"` - Badge *int `json:"badge,omitempty"` - ChannelID string `json:"channelId,omitempty"` -} - -type PushResult struct { - Success bool - TicketID string - Error string -} - -func (s *PushService) IsConfigured() bool { - return s.accessToken != "" -} - -// IsExpoPushToken validates Expo token format. -func IsExpoPushToken(token string) bool { - return strings.HasPrefix(token, "ExponentPushToken[") || strings.HasPrefix(token, "ExpoPushToken[") -} - -// SendToTokens sends push notifications in batches of 100 (Expo limit). -func (s *PushService) SendToTokens(ctx context.Context, tokens []string, title, body string, data map[string]any, sound string, badge *int) map[string]PushResult { - results := make(map[string]PushResult, len(tokens)) - - if !s.IsConfigured() { - for _, t := range tokens { - results[t] = PushResult{Success: false, Error: "Expo not configured"} - } - return results - } - - // Build messages - messages := make([]PushMessage, 0, len(tokens)) - for _, token := range tokens { - messages = append(messages, PushMessage{ - To: token, - Title: title, - Body: body, - Data: data, - Sound: sound, - Badge: badge, - }) - } - - // Chunk into batches of 100 - for i := 0; i < len(messages); i += 100 { - end := i + 100 - if end > len(messages) { - end = len(messages) - } - chunk := messages[i:end] - - batchResults := s.sendBatch(ctx, chunk) - for token, result := range batchResults { - results[token] = result - } - } - - return results -} - -func (s *PushService) sendBatch(ctx context.Context, messages []PushMessage) map[string]PushResult { - results := make(map[string]PushResult, len(messages)) - - body, err := json.Marshal(messages) - if err != nil { - for _, m := range messages { - results[m.To] = PushResult{Success: false, Error: "marshal error"} - } - return results - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, expoPushURL, bytes.NewReader(body)) - if err != nil { - for _, m := range messages { - results[m.To] = PushResult{Success: false, Error: "request error"} - } - return results - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Accept", "application/json") - if s.accessToken != "" { - req.Header.Set("Authorization", "Bearer "+s.accessToken) - } - - resp, err := s.client.Do(req) - if err != nil { - slog.Error("expo push failed", "error", err) - for _, m := range messages { - results[m.To] = PushResult{Success: false, Error: err.Error()} - } - return results - } - defer resp.Body.Close() - - respBody, _ := io.ReadAll(resp.Body) - - var expoResp struct { - Data []struct { - Status string `json:"status"` - ID string `json:"id"` - Message string `json:"message"` - Details struct { - Error string `json:"error"` - } `json:"details"` - } `json:"data"` - } - - if err := json.Unmarshal(respBody, &expoResp); err != nil { - for _, m := range messages { - results[m.To] = PushResult{Success: false, Error: fmt.Sprintf("decode error: %s", err)} - } - return results - } - - for i, ticket := range expoResp.Data { - if i >= len(messages) { - break - } - token := messages[i].To - if ticket.Status == "ok" { - results[token] = PushResult{Success: true, TicketID: ticket.ID} - } else { - errMsg := ticket.Message - if ticket.Details.Error != "" { - errMsg = ticket.Details.Error - } - results[token] = PushResult{Success: false, Error: errMsg} - } - } - - return results -} diff --git a/services/mana-notify/internal/channel/push_test.go b/services/mana-notify/internal/channel/push_test.go deleted file mode 100644 index 738f9a871..000000000 --- a/services/mana-notify/internal/channel/push_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package channel - -import "testing" - -func TestIsExpoPushToken(t *testing.T) { - tests := []struct { - token string - want bool - }{ - {"ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx]", true}, - {"ExpoPushToken[xxxxxxxxxxxxxxxxxxxxxx]", true}, - {"ExponentPushToken[abc123]", true}, - {"ExpoPushToken[abc123]", true}, - {"ExponentPushToken[]", true}, - {"", false}, - {"some-random-token", false}, - {"Bearer ExponentPushToken[abc]", false}, - {"exponentpushtoken[abc]", false}, // case sensitive - {"ExpoPush[abc]", false}, - {"fcm:token123", false}, - } - - for _, tt := range tests { - t.Run(tt.token, func(t *testing.T) { - got := IsExpoPushToken(tt.token) - if got != tt.want { - t.Fatalf("IsExpoPushToken(%q) = %v, want %v", tt.token, got, tt.want) - } - }) - } -} diff --git a/services/mana-notify/internal/channel/webhook.go b/services/mana-notify/internal/channel/webhook.go deleted file mode 100644 index 5a1e28b08..000000000 --- a/services/mana-notify/internal/channel/webhook.go +++ /dev/null @@ -1,91 +0,0 @@ -package channel - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "log/slog" - "net/http" - "time" -) - -type WebhookService struct { - client *http.Client -} - -func NewWebhookService() *WebhookService { - return &WebhookService{ - client: &http.Client{Timeout: 10 * time.Second}, - } -} - -type WebhookMessage struct { - URL string - Method string // POST or PUT - Headers map[string]string - Body map[string]any - Timeout int // ms -} - -type WebhookResult struct { - Success bool - StatusCode int - Error string - DurationMs int -} - -func (s *WebhookService) Send(ctx context.Context, msg *WebhookMessage) WebhookResult { - start := time.Now() - - method := msg.Method - if method == "" { - method = http.MethodPost - } - - timeout := 10 * time.Second - if msg.Timeout > 0 { - timeout = time.Duration(msg.Timeout) * time.Millisecond - } - - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - body, err := json.Marshal(msg.Body) - if err != nil { - return WebhookResult{Success: false, Error: "marshal error", DurationMs: int(time.Since(start).Milliseconds())} - } - - req, err := http.NewRequestWithContext(ctx, method, msg.URL, bytes.NewReader(body)) - if err != nil { - return WebhookResult{Success: false, Error: err.Error(), DurationMs: int(time.Since(start).Milliseconds())} - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("User-Agent", "ManaNotify/1.0") - for k, v := range msg.Headers { - req.Header.Set(k, v) - } - - resp, err := s.client.Do(req) - durationMs := int(time.Since(start).Milliseconds()) - if err != nil { - slog.Error("webhook send failed", "url", msg.URL, "error", err) - return WebhookResult{Success: false, Error: err.Error(), DurationMs: durationMs} - } - defer resp.Body.Close() - io.Copy(io.Discard, resp.Body) - - success := resp.StatusCode >= 200 && resp.StatusCode < 300 - if !success { - return WebhookResult{ - Success: false, - StatusCode: resp.StatusCode, - Error: fmt.Sprintf("webhook returned %d", resp.StatusCode), - DurationMs: durationMs, - } - } - - return WebhookResult{Success: true, StatusCode: resp.StatusCode, DurationMs: durationMs} -} diff --git a/services/mana-notify/internal/channel/webhook_test.go b/services/mana-notify/internal/channel/webhook_test.go deleted file mode 100644 index b7d628b82..000000000 --- a/services/mana-notify/internal/channel/webhook_test.go +++ /dev/null @@ -1,166 +0,0 @@ -package channel - -import ( - "context" - "encoding/json" - "io" - "net/http" - "net/http/httptest" - "testing" -) - -func TestWebhookService_Send(t *testing.T) { - tests := []struct { - name string - handler http.HandlerFunc - msg WebhookMessage - wantOK bool - wantStatus int - }{ - { - name: "successful POST with 200", - handler: func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - t.Errorf("expected POST, got %s", r.Method) - } - if ct := r.Header.Get("Content-Type"); ct != "application/json" { - t.Errorf("expected Content-Type application/json, got %s", ct) - } - if ua := r.Header.Get("User-Agent"); ua != "ManaNotify/1.0" { - t.Errorf("expected User-Agent ManaNotify/1.0, got %s", ua) - } - w.WriteHeader(http.StatusOK) - }, - msg: WebhookMessage{Body: map[string]any{"test": true}}, - wantOK: true, - wantStatus: 200, - }, - { - name: "successful PUT", - handler: func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPut { - t.Errorf("expected PUT, got %s", r.Method) - } - w.WriteHeader(http.StatusOK) - }, - msg: WebhookMessage{Method: "PUT", Body: map[string]any{"update": true}}, - wantOK: true, - wantStatus: 200, - }, - { - name: "custom headers are sent", - handler: func(w http.ResponseWriter, r *http.Request) { - if v := r.Header.Get("X-Custom"); v != "test-value" { - t.Errorf("expected X-Custom=test-value, got %s", v) - } - w.WriteHeader(http.StatusOK) - }, - msg: WebhookMessage{ - Headers: map[string]string{"X-Custom": "test-value"}, - Body: map[string]any{}, - }, - wantOK: true, - wantStatus: 200, - }, - { - name: "body is sent as JSON", - handler: func(w http.ResponseWriter, r *http.Request) { - body, _ := io.ReadAll(r.Body) - var m map[string]any - if err := json.Unmarshal(body, &m); err != nil { - t.Errorf("body is not valid JSON: %v", err) - } - if m["event"] != "test" { - t.Errorf("expected event=test, got %v", m["event"]) - } - w.WriteHeader(http.StatusOK) - }, - msg: WebhookMessage{Body: map[string]any{"event": "test"}}, - wantOK: true, - wantStatus: 200, - }, - { - name: "server error returns failure", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - }, - msg: WebhookMessage{Body: map[string]any{}}, - wantOK: false, - wantStatus: 500, - }, - { - name: "404 returns failure", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNotFound) - }, - msg: WebhookMessage{Body: map[string]any{}}, - wantOK: false, - wantStatus: 404, - }, - { - name: "201 is treated as success", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusCreated) - }, - msg: WebhookMessage{Body: map[string]any{}}, - wantOK: true, - wantStatus: 201, - }, - } - - svc := NewWebhookService() - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - srv := httptest.NewServer(tt.handler) - defer srv.Close() - - tt.msg.URL = srv.URL - result := svc.Send(context.Background(), &tt.msg) - - if result.Success != tt.wantOK { - t.Fatalf("Success = %v, want %v (error: %s)", result.Success, tt.wantOK, result.Error) - } - if tt.wantStatus != 0 && result.StatusCode != tt.wantStatus { - t.Fatalf("StatusCode = %d, want %d", result.StatusCode, tt.wantStatus) - } - if result.DurationMs < 0 { - t.Fatalf("DurationMs should be >= 0, got %d", result.DurationMs) - } - }) - } -} - -func TestWebhookService_Send_InvalidURL(t *testing.T) { - svc := NewWebhookService() - result := svc.Send(context.Background(), &WebhookMessage{ - URL: "http://localhost:1", // unreachable port - Body: map[string]any{}, - }) - - if result.Success { - t.Fatal("expected failure for unreachable URL") - } - if result.Error == "" { - t.Fatal("expected non-empty error") - } -} - -func TestWebhookService_Send_DefaultMethod(t *testing.T) { - var gotMethod string - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - gotMethod = r.Method - w.WriteHeader(http.StatusOK) - })) - defer srv.Close() - - svc := NewWebhookService() - svc.Send(context.Background(), &WebhookMessage{ - URL: srv.URL, - Body: map[string]any{}, - }) - - if gotMethod != "POST" { - t.Fatalf("default method should be POST, got %s", gotMethod) - } -} diff --git a/services/mana-notify/internal/config/config.go b/services/mana-notify/internal/config/config.go deleted file mode 100644 index 7886d6d58..000000000 --- a/services/mana-notify/internal/config/config.go +++ /dev/null @@ -1,68 +0,0 @@ -package config - -import ( - "github.com/mana/shared-go/envutil" -) - -type Config struct { - Port int - - // Database - DatabaseURL string - - // Redis - RedisHost string - RedisPort int - RedisPassword string - - // Auth - ServiceKey string - ManaAuthURL string - - // SMTP - SMTPHost string - SMTPPort int - SMTPUser string - SMTPPassword string - SMTPFrom string - SMTPInsecureTLS bool - - // Expo Push - ExpoAccessToken string - - // Rate Limits - RateLimitEmailPerMinute int - RateLimitPushPerMinute int - - // CORS - CORSOrigins []string -} - -func Load() *Config { - return &Config{ - Port: envutil.GetInt("PORT", 3040), - - DatabaseURL: envutil.Get("DATABASE_URL", "postgresql://mana:mana@localhost:5432/mana_notify"), - - RedisHost: envutil.Get("REDIS_HOST", "localhost"), - RedisPort: envutil.GetInt("REDIS_PORT", 6379), - RedisPassword: envutil.Get("REDIS_PASSWORD", ""), - - ServiceKey: envutil.Get("SERVICE_KEY", "dev-service-key"), - ManaAuthURL: envutil.Get("MANA_AUTH_URL", "http://localhost:3001"), - - SMTPHost: envutil.Get("SMTP_HOST", "smtp-relay.brevo.com"), - SMTPPort: envutil.GetInt("SMTP_PORT", 587), - SMTPUser: envutil.Get("SMTP_USER", ""), - SMTPPassword: envutil.Get("SMTP_PASSWORD", ""), - SMTPFrom: envutil.Get("SMTP_FROM", "Mana "), - SMTPInsecureTLS: envutil.GetBool("SMTP_INSECURE_TLS", false), - - ExpoAccessToken: envutil.Get("EXPO_ACCESS_TOKEN", ""), - - RateLimitEmailPerMinute: envutil.GetInt("RATE_LIMIT_EMAIL_PER_MINUTE", 10), - RateLimitPushPerMinute: envutil.GetInt("RATE_LIMIT_PUSH_PER_MINUTE", 100), - - CORSOrigins: envutil.GetSlice("CORS_ORIGINS", []string{"http://localhost:3000", "http://localhost:5173"}), - } -} diff --git a/services/mana-notify/internal/db/db.go b/services/mana-notify/internal/db/db.go deleted file mode 100644 index 9aa7966b4..000000000 --- a/services/mana-notify/internal/db/db.go +++ /dev/null @@ -1,62 +0,0 @@ -package db - -import ( - "context" - "fmt" - "log/slog" - "time" - - "github.com/jackc/pgx/v5/pgxpool" -) - -type DB struct { - Pool *pgxpool.Pool -} - -func New(ctx context.Context, databaseURL string) (*DB, error) { - cfg, err := pgxpool.ParseConfig(databaseURL) - if err != nil { - return nil, fmt.Errorf("parse database url: %w", err) - } - - cfg.MaxConns = 20 - cfg.MinConns = 2 - cfg.MaxConnLifetime = 30 * time.Minute - cfg.MaxConnIdleTime = 5 * time.Minute - - pool, err := pgxpool.NewWithConfig(ctx, cfg) - if err != nil { - return nil, fmt.Errorf("connect to database: %w", err) - } - - if err := pool.Ping(ctx); err != nil { - pool.Close() - return nil, fmt.Errorf("ping database: %w", err) - } - - slog.Info("database connected", "url", redactURL(databaseURL)) - - db := &DB{Pool: pool} - if err := db.migrate(ctx); err != nil { - pool.Close() - return nil, fmt.Errorf("run migrations: %w", err) - } - - return db, nil -} - -func (d *DB) Close() { - d.Pool.Close() -} - -func (d *DB) HealthCheck(ctx context.Context) error { - return d.Pool.Ping(ctx) -} - -func redactURL(url string) string { - // Hide password from logs - if idx := len(url); idx > 30 { - return url[:30] + "..." - } - return url -} diff --git a/services/mana-notify/internal/db/migrations.go b/services/mana-notify/internal/db/migrations.go deleted file mode 100644 index a1666ce77..000000000 --- a/services/mana-notify/internal/db/migrations.go +++ /dev/null @@ -1,136 +0,0 @@ -package db - -import ( - "context" - "log/slog" -) - -func (d *DB) migrate(ctx context.Context) error { - slog.Info("running database migrations") - - migrations := []string{ - `CREATE SCHEMA IF NOT EXISTS notify`, - - // Enum types (idempotent with DO blocks) - `DO $$ BEGIN - CREATE TYPE notify.channel_type AS ENUM ('email', 'push', 'webhook'); - EXCEPTION WHEN duplicate_object THEN NULL; - END $$`, - - `DO $$ BEGIN - CREATE TYPE notify.notification_status AS ENUM ('pending', 'processing', 'delivered', 'failed', 'cancelled'); - EXCEPTION WHEN duplicate_object THEN NULL; - END $$`, - - `DO $$ BEGIN - CREATE TYPE notify.priority_type AS ENUM ('low', 'normal', 'high', 'critical'); - EXCEPTION WHEN duplicate_object THEN NULL; - END $$`, - - // Notifications table - `CREATE TABLE IF NOT EXISTS notify.notifications ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - user_id TEXT, - app_id VARCHAR(50) NOT NULL, - channel notify.channel_type NOT NULL, - template_id VARCHAR(100), - subject VARCHAR(500), - body TEXT, - data JSONB, - status notify.notification_status NOT NULL DEFAULT 'pending', - priority notify.priority_type NOT NULL DEFAULT 'normal', - scheduled_for TIMESTAMPTZ, - recipient VARCHAR(500), - external_id VARCHAR(255), - attempts INTEGER NOT NULL DEFAULT 0, - delivered_at TIMESTAMPTZ, - error_message TEXT, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() - )`, - - `CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notify.notifications (user_id)`, - `CREATE INDEX IF NOT EXISTS idx_notifications_app_id ON notify.notifications (app_id)`, - `CREATE INDEX IF NOT EXISTS idx_notifications_status ON notify.notifications (status)`, - `CREATE INDEX IF NOT EXISTS idx_notifications_scheduled_for ON notify.notifications (scheduled_for)`, - `CREATE INDEX IF NOT EXISTS idx_notifications_external_id ON notify.notifications (external_id)`, - - // Templates table - `CREATE TABLE IF NOT EXISTS notify.templates ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - slug VARCHAR(100) NOT NULL, - app_id VARCHAR(50), - channel notify.channel_type NOT NULL, - subject VARCHAR(500), - body_template TEXT NOT NULL, - locale VARCHAR(10) NOT NULL DEFAULT 'de-DE', - is_active BOOLEAN NOT NULL DEFAULT true, - is_system BOOLEAN NOT NULL DEFAULT false, - variables JSONB, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - UNIQUE(slug, locale) - )`, - - `CREATE INDEX IF NOT EXISTS idx_templates_app_id ON notify.templates (app_id)`, - `CREATE INDEX IF NOT EXISTS idx_templates_channel ON notify.templates (channel)`, - - // Devices table - `CREATE TABLE IF NOT EXISTS notify.devices ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - user_id TEXT NOT NULL, - push_token TEXT NOT NULL UNIQUE, - token_type VARCHAR(20) NOT NULL DEFAULT 'expo', - platform VARCHAR(20), - device_name VARCHAR(100), - app_id VARCHAR(50), - is_active BOOLEAN NOT NULL DEFAULT true, - last_seen_at TIMESTAMPTZ, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() - )`, - - `CREATE INDEX IF NOT EXISTS idx_devices_user_id ON notify.devices (user_id)`, - - // Preferences table - `CREATE TABLE IF NOT EXISTS notify.preferences ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - user_id TEXT NOT NULL UNIQUE, - email_enabled BOOLEAN NOT NULL DEFAULT false, - push_enabled BOOLEAN NOT NULL DEFAULT true, - quiet_hours_enabled BOOLEAN NOT NULL DEFAULT false, - quiet_hours_start VARCHAR(5), - quiet_hours_end VARCHAR(5), - timezone VARCHAR(50) NOT NULL DEFAULT 'Europe/Berlin', - category_preferences JSONB, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() - )`, - - // Delivery logs table - `CREATE TABLE IF NOT EXISTS notify.delivery_logs ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - notification_id UUID NOT NULL REFERENCES notify.notifications(id) ON DELETE CASCADE, - attempt_number INTEGER NOT NULL, - channel notify.channel_type NOT NULL, - success BOOLEAN NOT NULL, - status_code INTEGER, - error_message TEXT, - provider_id VARCHAR(255), - duration_ms INTEGER, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() - )`, - - `CREATE INDEX IF NOT EXISTS idx_delivery_logs_notification_id ON notify.delivery_logs (notification_id)`, - `CREATE INDEX IF NOT EXISTS idx_delivery_logs_success ON notify.delivery_logs (success)`, - } - - for _, sql := range migrations { - if _, err := d.Pool.Exec(ctx, sql); err != nil { - return err - } - } - - slog.Info("migrations completed") - return nil -} diff --git a/services/mana-notify/internal/db/models.go b/services/mana-notify/internal/db/models.go deleted file mode 100644 index c1bab62af..000000000 --- a/services/mana-notify/internal/db/models.go +++ /dev/null @@ -1,80 +0,0 @@ -package db - -import "time" - -type Notification struct { - ID string `json:"id"` - UserID *string `json:"userId,omitempty"` - AppID string `json:"appId"` - Channel string `json:"channel"` - TemplateID *string `json:"templateId,omitempty"` - Subject *string `json:"subject,omitempty"` - Body *string `json:"body,omitempty"` - Data []byte `json:"data,omitempty"` // JSONB - Status string `json:"status"` - Priority string `json:"priority"` - ScheduledFor *time.Time `json:"scheduledFor,omitempty"` - Recipient *string `json:"recipient,omitempty"` - ExternalID *string `json:"externalId,omitempty"` - Attempts int `json:"attempts"` - DeliveredAt *time.Time `json:"deliveredAt,omitempty"` - ErrorMessage *string `json:"errorMessage,omitempty"` - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` -} - -type Template struct { - ID string `json:"id"` - Slug string `json:"slug"` - AppID *string `json:"appId,omitempty"` - Channel string `json:"channel"` - Subject *string `json:"subject,omitempty"` - BodyTemplate string `json:"bodyTemplate"` - Locale string `json:"locale"` - IsActive bool `json:"isActive"` - IsSystem bool `json:"isSystem"` - Variables []byte `json:"variables,omitempty"` // JSONB - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` -} - -type Device struct { - ID string `json:"id"` - UserID string `json:"userId"` - PushToken string `json:"pushToken"` - TokenType string `json:"tokenType"` - Platform *string `json:"platform,omitempty"` - DeviceName *string `json:"deviceName,omitempty"` - AppID *string `json:"appId,omitempty"` - IsActive bool `json:"isActive"` - LastSeenAt *time.Time `json:"lastSeenAt,omitempty"` - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` -} - -type Preference struct { - ID string `json:"id"` - UserID string `json:"userId"` - EmailEnabled bool `json:"emailEnabled"` - PushEnabled bool `json:"pushEnabled"` - QuietHoursEnabled bool `json:"quietHoursEnabled"` - QuietHoursStart *string `json:"quietHoursStart,omitempty"` - QuietHoursEnd *string `json:"quietHoursEnd,omitempty"` - Timezone string `json:"timezone"` - CategoryPreferences []byte `json:"categoryPreferences,omitempty"` // JSONB - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` -} - -type DeliveryLog struct { - ID string `json:"id"` - NotificationID string `json:"notificationId"` - AttemptNumber int `json:"attemptNumber"` - Channel string `json:"channel"` - Success bool `json:"success"` - StatusCode *int `json:"statusCode,omitempty"` - ErrorMessage *string `json:"errorMessage,omitempty"` - ProviderID *string `json:"providerId,omitempty"` - DurationMs *int `json:"durationMs,omitempty"` - CreatedAt time.Time `json:"createdAt"` -} diff --git a/services/mana-notify/internal/handler/devices.go b/services/mana-notify/internal/handler/devices.go deleted file mode 100644 index 5a105f330..000000000 --- a/services/mana-notify/internal/handler/devices.go +++ /dev/null @@ -1,123 +0,0 @@ -package handler - -import ( - "encoding/json" - "net/http" - - "github.com/mana/shared-go/httputil" - - "github.com/mana/mana-notify/internal/auth" - "github.com/mana/mana-notify/internal/db" -) - -type DevicesHandler struct { - db *db.DB -} - -func NewDevicesHandler(database *db.DB) *DevicesHandler { - return &DevicesHandler{db: database} -} - -// Register handles POST /api/v1/devices/register -func (h *DevicesHandler) Register(w http.ResponseWriter, r *http.Request) { - user := auth.GetUser(r) - if user == nil { - httputil.WriteError(w, http.StatusUnauthorized, "unauthorized") - return - } - - var req struct { - PushToken string `json:"pushToken"` - TokenType string `json:"tokenType,omitempty"` - Platform string `json:"platform,omitempty"` - DeviceName string `json:"deviceName,omitempty"` - AppID string `json:"appId,omitempty"` - } - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - if req.PushToken == "" { - httputil.WriteError(w, http.StatusBadRequest, "pushToken is required") - return - } - - tokenType := req.TokenType - if tokenType == "" { - tokenType = "expo" - } - - // Upsert: transfer ownership if token exists for different user - var id string - err := h.db.Pool.QueryRow(r.Context(), - `INSERT INTO notify.devices (user_id, push_token, token_type, platform, device_name, app_id) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (push_token) DO UPDATE SET - user_id = EXCLUDED.user_id, - is_active = true, - last_seen_at = NOW(), - updated_at = NOW() - RETURNING id`, - user.UserID, req.PushToken, tokenType, nilIfEmpty(req.Platform), nilIfEmpty(req.DeviceName), nilIfEmpty(req.AppID), - ).Scan(&id) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to register device") - return - } - - httputil.WriteJSON(w, http.StatusCreated, map[string]any{"device": map[string]any{"id": id}}) -} - -// List handles GET /api/v1/devices -func (h *DevicesHandler) List(w http.ResponseWriter, r *http.Request) { - user := auth.GetUser(r) - if user == nil { - httputil.WriteError(w, http.StatusUnauthorized, "unauthorized") - return - } - - rows, err := h.db.Pool.Query(r.Context(), - `SELECT id, user_id, push_token, token_type, platform, device_name, app_id, is_active, last_seen_at, created_at, updated_at - FROM notify.devices WHERE user_id = $1 AND is_active = true ORDER BY created_at DESC`, user.UserID) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to list devices") - return - } - defer rows.Close() - - var devices []db.Device - for rows.Next() { - var d db.Device - if err := rows.Scan(&d.ID, &d.UserID, &d.PushToken, &d.TokenType, &d.Platform, - &d.DeviceName, &d.AppID, &d.IsActive, &d.LastSeenAt, &d.CreatedAt, &d.UpdatedAt); err != nil { - continue - } - devices = append(devices, d) - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"devices": devices}) -} - -// Delete handles DELETE /api/v1/devices/{id} -func (h *DevicesHandler) Delete(w http.ResponseWriter, r *http.Request) { - user := auth.GetUser(r) - if user == nil { - httputil.WriteError(w, http.StatusUnauthorized, "unauthorized") - return - } - - id := r.PathValue("id") - result, err := h.db.Pool.Exec(r.Context(), - `UPDATE notify.devices SET is_active = false, updated_at = NOW() WHERE id = $1 AND user_id = $2`, id, user.UserID) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to delete device") - return - } - if result.RowsAffected() == 0 { - httputil.WriteError(w, http.StatusNotFound, "device not found") - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"deleted": true}) -} diff --git a/services/mana-notify/internal/handler/health.go b/services/mana-notify/internal/handler/health.go deleted file mode 100644 index fae905d09..000000000 --- a/services/mana-notify/internal/handler/health.go +++ /dev/null @@ -1,40 +0,0 @@ -package handler - -import ( - "net/http" - - "github.com/mana/shared-go/httputil" - "time" - - "github.com/mana/mana-notify/internal/db" -) - -type HealthHandler struct { - db *db.DB - startTime time.Time -} - -func NewHealthHandler(database *db.DB) *HealthHandler { - return &HealthHandler{db: database, startTime: time.Now()} -} - -func (h *HealthHandler) Health(w http.ResponseWriter, r *http.Request) { - dbOK := h.db.HealthCheck(r.Context()) == nil - - status := "healthy" - if !dbOK { - status = "unhealthy" - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{ - "status": status, - "version": "1.0.0", - "service": "mana-notify", - "uptime": time.Since(h.startTime).Seconds(), - "timestamp": time.Now().UTC().Format(time.RFC3339), - "services": map[string]any{ - "database": dbOK, - "redis": true, - }, - }) -} diff --git a/services/mana-notify/internal/handler/notifications.go b/services/mana-notify/internal/handler/notifications.go deleted file mode 100644 index 514a8657f..000000000 --- a/services/mana-notify/internal/handler/notifications.go +++ /dev/null @@ -1,492 +0,0 @@ -package handler - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "net/http" - - "github.com/mana/shared-go/httputil" - "time" - - "github.com/mana/mana-notify/internal/db" - "github.com/mana/mana-notify/internal/queue" - tmpl "github.com/mana/mana-notify/internal/template" -) - -type NotificationsHandler struct { - db *db.DB - pool *queue.WorkerPool - engine *tmpl.Engine -} - -func NewNotificationsHandler(database *db.DB, pool *queue.WorkerPool, engine *tmpl.Engine) *NotificationsHandler { - return &NotificationsHandler{db: database, pool: pool, engine: engine} -} - -type SendRequest struct { - Channel string `json:"channel"` - AppID string `json:"appId"` - UserID string `json:"userId,omitempty"` - Recipient string `json:"recipient,omitempty"` - Recipients []string `json:"recipients,omitempty"` - Template string `json:"template,omitempty"` - Subject string `json:"subject,omitempty"` - Body string `json:"body,omitempty"` - Data map[string]any `json:"data,omitempty"` - Priority string `json:"priority,omitempty"` - ExternalID string `json:"externalId,omitempty"` - - EmailOptions *EmailOptions `json:"emailOptions,omitempty"` - PushOptions *PushOptions `json:"pushOptions,omitempty"` - WebhookOptions *WebhookOptions `json:"webhookOptions,omitempty"` -} - -type EmailOptions struct { - From string `json:"from,omitempty"` - ReplyTo string `json:"replyTo,omitempty"` -} - -type PushOptions struct { - Sound string `json:"sound,omitempty"` - Badge *int `json:"badge,omitempty"` - ChannelID string `json:"channelId,omitempty"` -} - -type WebhookOptions struct { - Method string `json:"method,omitempty"` - Headers map[string]string `json:"headers,omitempty"` - Timeout int `json:"timeout,omitempty"` -} - -type ScheduleRequest struct { - SendRequest - ScheduledFor string `json:"scheduledFor"` -} - -type BatchRequest struct { - Notifications []SendRequest `json:"notifications"` -} - -// Send handles POST /api/v1/notifications/send -func (h *NotificationsHandler) Send(w http.ResponseWriter, r *http.Request) { - var req SendRequest - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - if err := validateSendRequest(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, err.Error()) - return - } - - // Check idempotency - if req.ExternalID != "" { - var existingID string - err := h.db.Pool.QueryRow(r.Context(), - `SELECT id FROM notify.notifications WHERE external_id = $1`, req.ExternalID, - ).Scan(&existingID) - if err == nil { - httputil.WriteJSON(w, http.StatusOK, map[string]any{ - "notification": map[string]any{"id": existingID, "status": "existing"}, - "deduplicated": true, - }) - return - } - } - - // Check user preferences - if req.UserID != "" { - blocked, reason := h.checkPreferences(r.Context(), req.UserID, req.Channel) - if blocked { - httputil.WriteJSON(w, http.StatusOK, map[string]any{ - "notification": map[string]any{"status": "cancelled", "reason": reason}, - }) - return - } - } - - // Render template - subject := req.Subject - body := req.Body - if req.Template != "" { - rendered, err := h.engine.RenderBySlug(r.Context(), req.Template, req.Data, "") - if err != nil { - slog.Warn("template render failed", "template", req.Template, "error", err) - } else { - if rendered.Subject != "" { - subject = rendered.Subject - } - if rendered.Body != "" { - body = rendered.Body - } - } - } - - priority := req.Priority - if priority == "" { - priority = "normal" - } - - // Create notification record - var notificationID string - err := h.db.Pool.QueryRow(r.Context(), - `INSERT INTO notify.notifications (user_id, app_id, channel, template_id, subject, body, data, priority, recipient, external_id) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - RETURNING id`, - nilIfEmpty(req.UserID), req.AppID, req.Channel, nilIfEmpty(req.Template), - nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(req.Data), - priority, nilIfEmpty(req.Recipient), nilIfEmpty(req.ExternalID), - ).Scan(¬ificationID) - if err != nil { - slog.Error("create notification failed", "error", err) - httputil.WriteError(w, http.StatusInternalServerError, "failed to create notification") - return - } - - // Build and enqueue job - job := queue.Job{ - NotificationID: notificationID, - Channel: req.Channel, - AppID: req.AppID, - Recipient: req.Recipient, - Subject: subject, - Body: body, - Data: req.Data, - } - - if req.EmailOptions != nil { - job.From = req.EmailOptions.From - job.ReplyTo = req.EmailOptions.ReplyTo - } - if req.PushOptions != nil { - job.Sound = req.PushOptions.Sound - job.Badge = req.PushOptions.Badge - } - if req.WebhookOptions != nil { - job.WebhookMethod = req.WebhookOptions.Method - job.WebhookHeaders = req.WebhookOptions.Headers - job.WebhookTimeout = req.WebhookOptions.Timeout - } - - h.pool.Enqueue(job) - - httputil.WriteJSON(w, http.StatusAccepted, map[string]any{ - "notification": map[string]any{ - "id": notificationID, - "status": "pending", - }, - }) -} - -// Schedule handles POST /api/v1/notifications/schedule -func (h *NotificationsHandler) Schedule(w http.ResponseWriter, r *http.Request) { - var req ScheduleRequest - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - scheduledFor, err := time.Parse(time.RFC3339, req.ScheduledFor) - if err != nil { - httputil.WriteError(w, http.StatusBadRequest, "scheduledFor must be a valid RFC3339 timestamp") - return - } - if scheduledFor.Before(time.Now()) { - httputil.WriteError(w, http.StatusBadRequest, "scheduledFor must be in the future") - return - } - - if err := validateSendRequest(&req.SendRequest); err != nil { - httputil.WriteError(w, http.StatusBadRequest, err.Error()) - return - } - - // Render template - subject := req.Subject - body := req.Body - if req.Template != "" { - rendered, err := h.engine.RenderBySlug(r.Context(), req.Template, req.Data, "") - if err == nil { - if rendered.Subject != "" { - subject = rendered.Subject - } - if rendered.Body != "" { - body = rendered.Body - } - } - } - - priority := req.Priority - if priority == "" { - priority = "normal" - } - - var notificationID string - err = h.db.Pool.QueryRow(r.Context(), - `INSERT INTO notify.notifications (user_id, app_id, channel, template_id, subject, body, data, priority, recipient, external_id, scheduled_for) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - RETURNING id`, - nilIfEmpty(req.UserID), req.AppID, req.Channel, nilIfEmpty(req.Template), - nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(req.Data), - priority, nilIfEmpty(req.Recipient), nilIfEmpty(req.ExternalID), scheduledFor, - ).Scan(¬ificationID) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to create notification") - return - } - - job := queue.Job{ - NotificationID: notificationID, - Channel: req.Channel, - AppID: req.AppID, - Recipient: req.Recipient, - Subject: subject, - Body: body, - Data: req.Data, - ScheduleAt: &scheduledFor, - } - h.pool.Enqueue(job) - - httputil.WriteJSON(w, http.StatusAccepted, map[string]any{ - "notification": map[string]any{ - "id": notificationID, - "status": "pending", - "scheduledFor": scheduledFor.Format(time.RFC3339), - }, - }) -} - -// Batch handles POST /api/v1/notifications/batch -func (h *NotificationsHandler) Batch(w http.ResponseWriter, r *http.Request) { - var req BatchRequest - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 5<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - if len(req.Notifications) == 0 { - httputil.WriteError(w, http.StatusBadRequest, "notifications array is required") - return - } - if len(req.Notifications) > 100 { - httputil.WriteError(w, http.StatusBadRequest, "maximum 100 notifications per batch") - return - } - - type batchResult struct { - ID string `json:"id,omitempty"` - Status string `json:"status"` - Error string `json:"error,omitempty"` - } - - results := make([]batchResult, len(req.Notifications)) - succeeded := 0 - failed := 0 - - for i, n := range req.Notifications { - if err := validateSendRequest(&n); err != nil { - results[i] = batchResult{Status: "failed", Error: err.Error()} - failed++ - continue - } - - subject := n.Subject - body := n.Body - if n.Template != "" { - rendered, err := h.engine.RenderBySlug(r.Context(), n.Template, n.Data, "") - if err == nil { - if rendered.Subject != "" { - subject = rendered.Subject - } - if rendered.Body != "" { - body = rendered.Body - } - } - } - - priority := n.Priority - if priority == "" { - priority = "normal" - } - - var notificationID string - err := h.db.Pool.QueryRow(r.Context(), - `INSERT INTO notify.notifications (user_id, app_id, channel, subject, body, data, priority, recipient) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - RETURNING id`, - nilIfEmpty(n.UserID), n.AppID, n.Channel, - nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(n.Data), priority, nilIfEmpty(n.Recipient), - ).Scan(¬ificationID) - if err != nil { - results[i] = batchResult{Status: "failed", Error: "database error"} - failed++ - continue - } - - h.pool.Enqueue(queue.Job{ - NotificationID: notificationID, - Channel: n.Channel, - AppID: n.AppID, - Recipient: n.Recipient, - Subject: subject, - Body: body, - Data: n.Data, - }) - - results[i] = batchResult{ID: notificationID, Status: "pending"} - succeeded++ - } - - httputil.WriteJSON(w, http.StatusAccepted, map[string]any{ - "results": results, - "succeeded": succeeded, - "failed": failed, - }) -} - -// GetNotification handles GET /api/v1/notifications/{id} -func (h *NotificationsHandler) GetNotification(w http.ResponseWriter, r *http.Request) { - id := r.PathValue("id") - if id == "" { - httputil.WriteError(w, http.StatusBadRequest, "notification id required") - return - } - - var n db.Notification - err := h.db.Pool.QueryRow(r.Context(), - `SELECT id, user_id, app_id, channel, template_id, subject, body, status, priority, scheduled_for, recipient, external_id, attempts, delivered_at, error_message, created_at, updated_at - FROM notify.notifications WHERE id = $1`, id, - ).Scan(&n.ID, &n.UserID, &n.AppID, &n.Channel, &n.TemplateID, &n.Subject, &n.Body, - &n.Status, &n.Priority, &n.ScheduledFor, &n.Recipient, &n.ExternalID, - &n.Attempts, &n.DeliveredAt, &n.ErrorMessage, &n.CreatedAt, &n.UpdatedAt) - - if err != nil { - httputil.WriteError(w, http.StatusNotFound, "notification not found") - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"notification": n}) -} - -// CancelNotification handles DELETE /api/v1/notifications/{id} -func (h *NotificationsHandler) CancelNotification(w http.ResponseWriter, r *http.Request) { - id := r.PathValue("id") - if id == "" { - httputil.WriteError(w, http.StatusBadRequest, "notification id required") - return - } - - result, err := h.db.Pool.Exec(r.Context(), - `UPDATE notify.notifications SET status = 'cancelled', updated_at = NOW() WHERE id = $1 AND status = 'pending'`, id) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to cancel notification") - return - } - if result.RowsAffected() == 0 { - httputil.WriteError(w, http.StatusNotFound, "notification not found or already processed") - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"cancelled": true}) -} - -func (h *NotificationsHandler) checkPreferences(ctx context.Context, userID, ch string) (bool, string) { - var emailEnabled, pushEnabled, quietEnabled bool - var quietStart, quietEnd, timezone *string - - err := h.db.Pool.QueryRow(ctx, - `SELECT email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone - FROM notify.preferences WHERE user_id = $1`, userID, - ).Scan(&emailEnabled, &pushEnabled, &quietEnabled, &quietStart, &quietEnd, &timezone) - - if err != nil { - return false, "" // No preferences = allow - } - - // Check channel preferences - if ch == "email" && !emailEnabled { - return true, "email notifications disabled by user" - } - if ch == "push" && !pushEnabled { - return true, "push notifications disabled by user" - } - - // Check quiet hours - if quietEnabled && quietStart != nil && quietEnd != nil { - tz := "Europe/Berlin" - if timezone != nil { - tz = *timezone - } - loc, err := time.LoadLocation(tz) - if err == nil { - now := time.Now().In(loc) - nowMinutes := now.Hour()*60 + now.Minute() - - startH, startM := parseTime(*quietStart) - endH, endM := parseTime(*quietEnd) - startMinutes := startH*60 + startM - endMinutes := endH*60 + endM - - var inQuiet bool - if startMinutes <= endMinutes { - inQuiet = nowMinutes >= startMinutes && nowMinutes < endMinutes - } else { - // Spans midnight (e.g. 22:00 to 08:00) - inQuiet = nowMinutes >= startMinutes || nowMinutes < endMinutes - } - - if inQuiet { - return true, "quiet hours active" - } - } - } - - return false, "" -} - -func validateSendRequest(req *SendRequest) error { - if req.Channel == "" { - return fmt.Errorf("channel is required") - } - validChannels := map[string]bool{"email": true, "push": true, "webhook": true} - if !validChannels[req.Channel] { - return fmt.Errorf("channel must be email, push, or webhook") - } - if req.AppID == "" { - return fmt.Errorf("appId is required") - } - if req.Recipient == "" && len(req.Recipients) == 0 && req.UserID == "" { - return fmt.Errorf("recipient, recipients, or userId is required") - } - if req.Template == "" && req.Body == "" { - return fmt.Errorf("template or body is required") - } - return nil -} - -func parseTime(s string) (int, int) { - var h, m int - fmt.Sscanf(s, "%d:%d", &h, &m) - return h, m -} - -func nilIfEmpty(s string) *string { - if s == "" { - return nil - } - return &s -} - -func jsonOrNil(data map[string]any) []byte { - if data == nil { - return nil - } - b, err := json.Marshal(data) - if err != nil { - return nil - } - return b -} diff --git a/services/mana-notify/internal/handler/notifications_test.go b/services/mana-notify/internal/handler/notifications_test.go deleted file mode 100644 index cc2e03ff4..000000000 --- a/services/mana-notify/internal/handler/notifications_test.go +++ /dev/null @@ -1,196 +0,0 @@ -package handler - -import ( - "encoding/json" - "testing" -) - -func TestValidateSendRequest(t *testing.T) { - tests := []struct { - name string - req SendRequest - wantErr string - }{ - { - name: "missing channel", - req: SendRequest{AppID: "app1", Recipient: "user@test.com", Body: "hello"}, - wantErr: "channel is required", - }, - { - name: "invalid channel", - req: SendRequest{Channel: "sms", AppID: "app1", Recipient: "user@test.com", Body: "hello"}, - wantErr: "channel must be email, push, or webhook", - }, - { - name: "missing appId", - req: SendRequest{Channel: "email", Recipient: "user@test.com", Body: "hello"}, - wantErr: "appId is required", - }, - { - name: "missing recipient and userId", - req: SendRequest{Channel: "email", AppID: "app1", Body: "hello"}, - wantErr: "recipient, recipients, or userId is required", - }, - { - name: "missing template and body", - req: SendRequest{Channel: "email", AppID: "app1", Recipient: "user@test.com"}, - wantErr: "template or body is required", - }, - { - name: "valid with recipient and body", - req: SendRequest{Channel: "email", AppID: "app1", Recipient: "user@test.com", Body: "hello"}, - }, - { - name: "valid with userId and template", - req: SendRequest{Channel: "push", AppID: "app1", UserID: "u1", Template: "welcome"}, - }, - { - name: "valid with recipients", - req: SendRequest{Channel: "webhook", AppID: "app1", Recipients: []string{"url1"}, Body: "data"}, - }, - { - name: "valid email channel", - req: SendRequest{Channel: "email", AppID: "app1", Recipient: "a@b.com", Body: "hi"}, - }, - { - name: "valid push channel", - req: SendRequest{Channel: "push", AppID: "app1", Recipient: "token", Body: "hi"}, - }, - { - name: "valid webhook channel", - req: SendRequest{Channel: "webhook", AppID: "app1", Recipient: "https://hook.example.com", Body: "{}"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := validateSendRequest(&tt.req) - if tt.wantErr != "" { - if err == nil { - t.Fatalf("expected error %q, got nil", tt.wantErr) - } - if err.Error() != tt.wantErr { - t.Fatalf("expected error %q, got %q", tt.wantErr, err.Error()) - } - } else { - if err != nil { - t.Fatalf("expected no error, got %q", err.Error()) - } - } - }) - } -} - -func TestParseTime(t *testing.T) { - tests := []struct { - input string - wantH int - wantM int - }{ - {"22:00", 22, 0}, - {"08:30", 8, 30}, - {"0:00", 0, 0}, - {"23:59", 23, 59}, - {"invalid", 0, 0}, - {"", 0, 0}, - } - - for _, tt := range tests { - t.Run(tt.input, func(t *testing.T) { - h, m := parseTime(tt.input) - if h != tt.wantH || m != tt.wantM { - t.Fatalf("parseTime(%q) = (%d, %d), want (%d, %d)", tt.input, h, m, tt.wantH, tt.wantM) - } - }) - } -} - -func TestNilIfEmpty(t *testing.T) { - tests := []struct { - name string - input string - isNil bool - }{ - {"empty string returns nil", "", true}, - {"non-empty returns pointer", "hello", false}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := nilIfEmpty(tt.input) - if tt.isNil { - if result != nil { - t.Fatal("expected nil, got non-nil") - } - } else { - if result == nil { - t.Fatal("expected non-nil, got nil") - } - if *result != tt.input { - t.Fatalf("expected %q, got %q", tt.input, *result) - } - } - }) - } -} - -func TestJsonOrNil(t *testing.T) { - tests := []struct { - name string - input map[string]any - isNil bool - verify func(t *testing.T, b []byte) - }{ - { - name: "nil map returns nil", - input: nil, - isNil: true, - }, - { - name: "empty map returns valid JSON", - input: map[string]any{}, - isNil: false, - verify: func(t *testing.T, b []byte) { - var m map[string]any - if err := json.Unmarshal(b, &m); err != nil { - t.Fatalf("invalid JSON: %v", err) - } - if len(m) != 0 { - t.Fatalf("expected empty map, got %v", m) - } - }, - }, - { - name: "map with data returns valid JSON", - input: map[string]any{"key": "value", "num": float64(42)}, - isNil: false, - verify: func(t *testing.T, b []byte) { - var m map[string]any - if err := json.Unmarshal(b, &m); err != nil { - t.Fatalf("invalid JSON: %v", err) - } - if m["key"] != "value" { - t.Fatalf("expected key=value, got %v", m["key"]) - } - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := jsonOrNil(tt.input) - if tt.isNil { - if result != nil { - t.Fatal("expected nil, got non-nil") - } - } else { - if result == nil { - t.Fatal("expected non-nil, got nil") - } - if tt.verify != nil { - tt.verify(t, result) - } - } - }) - } -} diff --git a/services/mana-notify/internal/handler/preferences.go b/services/mana-notify/internal/handler/preferences.go deleted file mode 100644 index 004a2f3f4..000000000 --- a/services/mana-notify/internal/handler/preferences.go +++ /dev/null @@ -1,97 +0,0 @@ -package handler - -import ( - "encoding/json" - "net/http" - - "github.com/mana/shared-go/httputil" - - "github.com/mana/mana-notify/internal/auth" - "github.com/mana/mana-notify/internal/db" -) - -type PreferencesHandler struct { - db *db.DB -} - -func NewPreferencesHandler(database *db.DB) *PreferencesHandler { - return &PreferencesHandler{db: database} -} - -// Get handles GET /api/v1/preferences -func (h *PreferencesHandler) Get(w http.ResponseWriter, r *http.Request) { - user := auth.GetUser(r) - if user == nil { - httputil.WriteError(w, http.StatusUnauthorized, "unauthorized") - return - } - - var p db.Preference - err := h.db.Pool.QueryRow(r.Context(), - `SELECT id, user_id, email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone, category_preferences, created_at, updated_at - FROM notify.preferences WHERE user_id = $1`, user.UserID, - ).Scan(&p.ID, &p.UserID, &p.EmailEnabled, &p.PushEnabled, &p.QuietHoursEnabled, - &p.QuietHoursStart, &p.QuietHoursEnd, &p.Timezone, &p.CategoryPreferences, &p.CreatedAt, &p.UpdatedAt) - - if err != nil { - // Return defaults - httputil.WriteJSON(w, http.StatusOK, map[string]any{ - "preferences": map[string]any{ - "emailEnabled": false, - "pushEnabled": true, - "quietHoursEnabled": false, - "timezone": "Europe/Berlin", - }, - }) - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"preferences": p}) -} - -// Update handles PUT /api/v1/preferences -func (h *PreferencesHandler) Update(w http.ResponseWriter, r *http.Request) { - user := auth.GetUser(r) - if user == nil { - httputil.WriteError(w, http.StatusUnauthorized, "unauthorized") - return - } - - var req struct { - EmailEnabled *bool `json:"emailEnabled,omitempty"` - PushEnabled *bool `json:"pushEnabled,omitempty"` - QuietHoursEnabled *bool `json:"quietHoursEnabled,omitempty"` - QuietHoursStart *string `json:"quietHoursStart,omitempty"` - QuietHoursEnd *string `json:"quietHoursEnd,omitempty"` - Timezone *string `json:"timezone,omitempty"` - CategoryPreferences map[string]any `json:"categoryPreferences,omitempty"` - } - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - catJSON, _ := json.Marshal(req.CategoryPreferences) - - _, err := h.db.Pool.Exec(r.Context(), - `INSERT INTO notify.preferences (user_id, email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone, category_preferences) - VALUES ($1, COALESCE($2, false), COALESCE($3, true), COALESCE($4, false), $5, $6, COALESCE($7, 'Europe/Berlin'), $8) - ON CONFLICT (user_id) DO UPDATE SET - email_enabled = COALESCE($2, notify.preferences.email_enabled), - push_enabled = COALESCE($3, notify.preferences.push_enabled), - quiet_hours_enabled = COALESCE($4, notify.preferences.quiet_hours_enabled), - quiet_hours_start = COALESCE($5, notify.preferences.quiet_hours_start), - quiet_hours_end = COALESCE($6, notify.preferences.quiet_hours_end), - timezone = COALESCE($7, notify.preferences.timezone), - category_preferences = COALESCE($8, notify.preferences.category_preferences), - updated_at = NOW()`, - user.UserID, req.EmailEnabled, req.PushEnabled, req.QuietHoursEnabled, - req.QuietHoursStart, req.QuietHoursEnd, req.Timezone, catJSON, - ) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to update preferences") - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"updated": true}) -} diff --git a/services/mana-notify/internal/handler/templates.go b/services/mana-notify/internal/handler/templates.go deleted file mode 100644 index 534af95ce..000000000 --- a/services/mana-notify/internal/handler/templates.go +++ /dev/null @@ -1,215 +0,0 @@ -package handler - -import ( - "encoding/json" - "net/http" - - "github.com/mana/shared-go/httputil" - - "github.com/mana/mana-notify/internal/db" - tmpl "github.com/mana/mana-notify/internal/template" -) - -type TemplatesHandler struct { - db *db.DB - engine *tmpl.Engine -} - -func NewTemplatesHandler(database *db.DB, engine *tmpl.Engine) *TemplatesHandler { - return &TemplatesHandler{db: database, engine: engine} -} - -// List handles GET /api/v1/templates -func (h *TemplatesHandler) List(w http.ResponseWriter, r *http.Request) { - rows, err := h.db.Pool.Query(r.Context(), - `SELECT id, slug, app_id, channel, subject, body_template, locale, is_active, is_system, variables, created_at, updated_at - FROM notify.templates ORDER BY slug`) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to list templates") - return - } - defer rows.Close() - - var templates []db.Template - for rows.Next() { - var t db.Template - if err := rows.Scan(&t.ID, &t.Slug, &t.AppID, &t.Channel, &t.Subject, &t.BodyTemplate, - &t.Locale, &t.IsActive, &t.IsSystem, &t.Variables, &t.CreatedAt, &t.UpdatedAt); err != nil { - continue - } - templates = append(templates, t) - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"templates": templates}) -} - -// Get handles GET /api/v1/templates/{slug} -func (h *TemplatesHandler) Get(w http.ResponseWriter, r *http.Request) { - slug := r.PathValue("slug") - locale := r.URL.Query().Get("locale") - if locale == "" { - locale = "de-DE" - } - - var t db.Template - err := h.db.Pool.QueryRow(r.Context(), - `SELECT id, slug, app_id, channel, subject, body_template, locale, is_active, is_system, variables, created_at, updated_at - FROM notify.templates WHERE slug = $1 AND locale = $2`, slug, locale, - ).Scan(&t.ID, &t.Slug, &t.AppID, &t.Channel, &t.Subject, &t.BodyTemplate, - &t.Locale, &t.IsActive, &t.IsSystem, &t.Variables, &t.CreatedAt, &t.UpdatedAt) - if err != nil { - httputil.WriteError(w, http.StatusNotFound, "template not found") - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"template": t}) -} - -// Create handles POST /api/v1/templates -func (h *TemplatesHandler) Create(w http.ResponseWriter, r *http.Request) { - var req struct { - Slug string `json:"slug"` - AppID string `json:"appId,omitempty"` - Channel string `json:"channel"` - Subject string `json:"subject,omitempty"` - BodyTemplate string `json:"bodyTemplate"` - Locale string `json:"locale,omitempty"` - Variables any `json:"variables,omitempty"` - } - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - if req.Slug == "" || req.Channel == "" || req.BodyTemplate == "" { - httputil.WriteError(w, http.StatusBadRequest, "slug, channel, and bodyTemplate are required") - return - } - if req.Locale == "" { - req.Locale = "de-DE" - } - - varsJSON, _ := json.Marshal(req.Variables) - - var id string - err := h.db.Pool.QueryRow(r.Context(), - `INSERT INTO notify.templates (slug, app_id, channel, subject, body_template, locale, variables) - VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id`, - req.Slug, nilIfEmpty(req.AppID), req.Channel, nilIfEmpty(req.Subject), req.BodyTemplate, req.Locale, varsJSON, - ).Scan(&id) - if err != nil { - httputil.WriteError(w, http.StatusConflict, "template already exists for this slug+locale") - return - } - - httputil.WriteJSON(w, http.StatusCreated, map[string]any{"id": id}) -} - -// Update handles PUT /api/v1/templates/{slug} -func (h *TemplatesHandler) Update(w http.ResponseWriter, r *http.Request) { - slug := r.PathValue("slug") - locale := r.URL.Query().Get("locale") - if locale == "" { - locale = "de-DE" - } - - var req struct { - Subject string `json:"subject,omitempty"` - BodyTemplate string `json:"bodyTemplate,omitempty"` - IsActive *bool `json:"isActive,omitempty"` - Variables any `json:"variables,omitempty"` - } - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - result, err := h.db.Pool.Exec(r.Context(), - `UPDATE notify.templates SET - subject = COALESCE($1, subject), - body_template = COALESCE($2, body_template), - is_active = COALESCE($3, is_active), - updated_at = NOW() - WHERE slug = $4 AND locale = $5 AND is_system = false`, - nilIfEmpty(req.Subject), nilIfEmpty(req.BodyTemplate), req.IsActive, slug, locale, - ) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to update template") - return - } - if result.RowsAffected() == 0 { - httputil.WriteError(w, http.StatusNotFound, "template not found or is a system template") - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"updated": true}) -} - -// Delete handles DELETE /api/v1/templates/{slug} -func (h *TemplatesHandler) Delete(w http.ResponseWriter, r *http.Request) { - slug := r.PathValue("slug") - - result, err := h.db.Pool.Exec(r.Context(), - `DELETE FROM notify.templates WHERE slug = $1 AND is_system = false`, slug) - if err != nil { - httputil.WriteError(w, http.StatusInternalServerError, "failed to delete template") - return - } - if result.RowsAffected() == 0 { - httputil.WriteError(w, http.StatusNotFound, "template not found or is a system template") - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"deleted": true}) -} - -// Preview handles POST /api/v1/templates/{slug}/preview -func (h *TemplatesHandler) Preview(w http.ResponseWriter, r *http.Request) { - slug := r.PathValue("slug") - var req struct { - Data map[string]any `json:"data"` - } - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - rendered, err := h.engine.RenderBySlug(r.Context(), slug, req.Data, "") - if err != nil { - httputil.WriteError(w, http.StatusNotFound, "template not found") - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"subject": rendered.Subject, "body": rendered.Body}) -} - -// PreviewCustom handles POST /api/v1/templates/preview -func (h *TemplatesHandler) PreviewCustom(w http.ResponseWriter, r *http.Request) { - var req struct { - Subject string `json:"subject,omitempty"` - BodyTemplate string `json:"bodyTemplate"` - Data map[string]any `json:"data"` - } - if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid request body") - return - } - - subject := "" - if req.Subject != "" { - s, err := tmpl.RenderDirect(req.Subject, req.Data) - if err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid subject template: "+err.Error()) - return - } - subject = s - } - - body, err := tmpl.RenderDirect(req.BodyTemplate, req.Data) - if err != nil { - httputil.WriteError(w, http.StatusBadRequest, "invalid body template: "+err.Error()) - return - } - - httputil.WriteJSON(w, http.StatusOK, map[string]any{"subject": subject, "body": body}) -} diff --git a/services/mana-notify/internal/metrics/metrics.go b/services/mana-notify/internal/metrics/metrics.go deleted file mode 100644 index 3ed135c10..000000000 --- a/services/mana-notify/internal/metrics/metrics.go +++ /dev/null @@ -1,64 +0,0 @@ -package metrics - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -type Metrics struct { - NotificationsSent *prometheus.CounterVec - NotificationsFailed *prometheus.CounterVec - EmailsSent *prometheus.CounterVec - PushSent *prometheus.CounterVec - WebhooksSent *prometheus.CounterVec - NotificationLatency *prometheus.HistogramVec - EmailLatency prometheus.Histogram - PushLatency prometheus.Histogram -} - -func New() *Metrics { - return &Metrics{ - NotificationsSent: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "mana_notify_notifications_sent_total", - Help: "Total notifications sent", - }, []string{"channel", "app_id"}), - - NotificationsFailed: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "mana_notify_notifications_failed_total", - Help: "Total notifications failed", - }, []string{"channel", "app_id", "error_type"}), - - EmailsSent: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "mana_notify_emails_sent_total", - Help: "Total emails sent", - }, []string{"template", "status"}), - - PushSent: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "mana_notify_push_sent_total", - Help: "Total push notifications sent", - }, []string{"platform", "status"}), - - WebhooksSent: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "mana_notify_webhooks_sent_total", - Help: "Total webhooks sent", - }, []string{"status"}), - - NotificationLatency: promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "mana_notify_notification_latency_seconds", - Help: "Notification processing latency", - Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10}, - }, []string{"channel"}), - - EmailLatency: promauto.NewHistogram(prometheus.HistogramOpts{ - Name: "mana_notify_email_latency_seconds", - Help: "Email sending latency", - Buckets: []float64{0.1, 0.5, 1, 2, 5, 10}, - }), - - PushLatency: promauto.NewHistogram(prometheus.HistogramOpts{ - Name: "mana_notify_push_latency_seconds", - Help: "Push notification latency", - Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1}, - }), - } -} diff --git a/services/mana-notify/internal/queue/worker.go b/services/mana-notify/internal/queue/worker.go deleted file mode 100644 index b0802212d..000000000 --- a/services/mana-notify/internal/queue/worker.go +++ /dev/null @@ -1,296 +0,0 @@ -package queue - -import ( - "context" - "fmt" - "log/slog" - "math" - "time" - - "github.com/mana/mana-notify/internal/channel" - "github.com/mana/mana-notify/internal/db" - "github.com/mana/mana-notify/internal/metrics" -) - -// Job represents a notification delivery job. -type Job struct { - NotificationID string - Channel string - AppID string - Recipient string - Subject string - Body string - Data map[string]any - - // Channel-specific - From string - ReplyTo string - Tokens []string // Push: multiple tokens - Sound string // Push - Badge *int // Push - Platform string // Push - WebhookMethod string // Webhook - WebhookHeaders map[string]string // Webhook - WebhookTimeout int // Webhook - - // Retry - Attempt int - MaxRetries int - BackoffMs int - ScheduleAt *time.Time -} - -// WorkerConfig per channel. -type WorkerConfig struct { - Concurrency int - MaxRetries int - BackoffMs int -} - -var DefaultConfigs = map[string]WorkerConfig{ - "email": {Concurrency: 5, MaxRetries: 3, BackoffMs: 5000}, - "push": {Concurrency: 10, MaxRetries: 3, BackoffMs: 1000}, - "webhook": {Concurrency: 10, MaxRetries: 5, BackoffMs: 3000}, -} - -// WorkerPool manages goroutine-based workers for all channels. -type WorkerPool struct { - db *db.DB - email *channel.EmailService - push *channel.PushService - webhook *channel.WebhookService - metrics *metrics.Metrics - jobs chan Job - ctx context.Context - cancel context.CancelFunc -} - -func NewWorkerPool(database *db.DB, email *channel.EmailService, push *channel.PushService, webhook *channel.WebhookService, m *metrics.Metrics) *WorkerPool { - ctx, cancel := context.WithCancel(context.Background()) - return &WorkerPool{ - db: database, - email: email, - push: push, - webhook: webhook, - metrics: m, - jobs: make(chan Job, 1000), - ctx: ctx, - cancel: cancel, - } -} - -func (wp *WorkerPool) Start() { - // Start workers per channel type - for ch, cfg := range DefaultConfigs { - for i := 0; i < cfg.Concurrency; i++ { - go wp.worker(ch) - } - slog.Info("workers started", "channel", ch, "concurrency", cfg.Concurrency) - } -} - -func (wp *WorkerPool) Stop() { - wp.cancel() - close(wp.jobs) -} - -// Enqueue adds a job to the worker pool. -func (wp *WorkerPool) Enqueue(job Job) { - cfg := DefaultConfigs[job.Channel] - if job.MaxRetries == 0 { - job.MaxRetries = cfg.MaxRetries - } - if job.BackoffMs == 0 { - job.BackoffMs = cfg.BackoffMs - } - - // Handle scheduled delivery - if job.ScheduleAt != nil && job.ScheduleAt.After(time.Now()) { - delay := time.Until(*job.ScheduleAt) - go func() { - timer := time.NewTimer(delay) - defer timer.Stop() - select { - case <-timer.C: - wp.jobs <- job - case <-wp.ctx.Done(): - } - }() - return - } - - wp.jobs <- job -} - -func (wp *WorkerPool) worker(ch string) { - for { - select { - case job, ok := <-wp.jobs: - if !ok { - return - } - if job.Channel != ch { - // Put back for correct channel worker - wp.jobs <- job - continue - } - wp.processJob(job) - case <-wp.ctx.Done(): - return - } - } -} - -func (wp *WorkerPool) processJob(job Job) { - start := time.Now() - ctx := wp.ctx - - // Update status to processing - wp.updateStatus(ctx, job.NotificationID, "processing", job.Attempt+1, nil) - - var success bool - var providerID string - var statusCode *int - var errMsg string - - defer func() { - if r := recover(); r != nil { - slog.Error("job panic", "notification", job.NotificationID, "panic", r) - errMsg = "internal panic" - success = false - } - - duration := time.Since(start) - durationMs := int(duration.Milliseconds()) - - // Log delivery attempt - wp.logDelivery(ctx, job.NotificationID, job.Attempt+1, job.Channel, success, statusCode, errMsg, providerID, durationMs) - - if success { - wp.updateStatus(ctx, job.NotificationID, "delivered", job.Attempt+1, nil) - wp.metrics.NotificationsSent.WithLabelValues(job.Channel, job.AppID).Inc() - } else if job.Attempt+1 < job.MaxRetries { - // Retry with exponential backoff - job.Attempt++ - delay := time.Duration(float64(job.BackoffMs)*math.Pow(2, float64(job.Attempt-1))) * time.Millisecond - go func() { - timer := time.NewTimer(delay) - defer timer.Stop() - select { - case <-timer.C: - wp.jobs <- job - case <-wp.ctx.Done(): - } - }() - slog.Info("retrying job", "notification", job.NotificationID, "attempt", job.Attempt, "delay", delay) - } else { - errStr := errMsg - wp.updateStatus(ctx, job.NotificationID, "failed", job.Attempt+1, &errStr) - wp.metrics.NotificationsFailed.WithLabelValues(job.Channel, job.AppID, "max_retries").Inc() - } - - wp.metrics.NotificationLatency.WithLabelValues(job.Channel).Observe(duration.Seconds()) - }() - - switch job.Channel { - case "email": - result := wp.email.Send(&channel.EmailMessage{ - To: job.Recipient, - Subject: job.Subject, - HTML: job.Body, - From: job.From, - ReplyTo: job.ReplyTo, - }) - success = result.Success - providerID = result.MessageID - errMsg = result.Error - template := "custom" - status := "success" - if !success { - status = "failed" - } - wp.metrics.EmailsSent.WithLabelValues(template, status).Inc() - wp.metrics.EmailLatency.Observe(time.Since(start).Seconds()) - - case "push": - tokens := job.Tokens - if len(tokens) == 0 && job.Recipient != "" { - tokens = []string{job.Recipient} - } - results := wp.push.SendToTokens(ctx, tokens, job.Subject, job.Body, job.Data, job.Sound, job.Badge) - // Check if all succeeded - success = true - for _, r := range results { - if !r.Success { - success = false - errMsg = r.Error - } else { - providerID = r.TicketID - } - } - status := "success" - if !success { - status = "failed" - } - wp.metrics.PushSent.WithLabelValues(job.Platform, status).Inc() - wp.metrics.PushLatency.Observe(time.Since(start).Seconds()) - - case "webhook": - result := wp.webhook.Send(ctx, &channel.WebhookMessage{ - URL: job.Recipient, - Method: job.WebhookMethod, - Headers: job.WebhookHeaders, - Body: job.Data, - Timeout: job.WebhookTimeout, - }) - success = result.Success - sc := result.StatusCode - statusCode = &sc - errMsg = result.Error - status := "success" - if !success { - status = "failed" - } - wp.metrics.WebhooksSent.WithLabelValues(status).Inc() - } -} - -func (wp *WorkerPool) updateStatus(ctx context.Context, notificationID, status string, attempts int, errMsg *string) { - query := `UPDATE notify.notifications SET status = $1, attempts = $2, updated_at = NOW()` - args := []any{status, attempts} - - if status == "delivered" { - query += `, delivered_at = NOW()` - } - if errMsg != nil { - query += fmt.Sprintf(`, error_message = $%d`, len(args)+1) - args = append(args, *errMsg) - } - - query += fmt.Sprintf(` WHERE id = $%d`, len(args)+1) - args = append(args, notificationID) - - if _, err := wp.db.Pool.Exec(ctx, query, args...); err != nil { - slog.Error("update notification status failed", "id", notificationID, "error", err) - } -} - -func (wp *WorkerPool) logDelivery(ctx context.Context, notificationID string, attempt int, ch string, success bool, statusCode *int, errMsg, providerID string, durationMs int) { - var errMsgPtr, providerIDPtr *string - if errMsg != "" { - errMsgPtr = &errMsg - } - if providerID != "" { - providerIDPtr = &providerID - } - - _, err := wp.db.Pool.Exec(ctx, - `INSERT INTO notify.delivery_logs (notification_id, attempt_number, channel, success, status_code, error_message, provider_id, duration_ms) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, - notificationID, attempt, ch, success, statusCode, errMsgPtr, providerIDPtr, durationMs, - ) - if err != nil { - slog.Error("log delivery failed", "notification", notificationID, "error", err) - } -} - diff --git a/services/mana-notify/internal/template/engine.go b/services/mana-notify/internal/template/engine.go deleted file mode 100644 index a15a6a78e..000000000 --- a/services/mana-notify/internal/template/engine.go +++ /dev/null @@ -1,145 +0,0 @@ -package template - -import ( - "bytes" - "context" - "log/slog" - "text/template" - - "github.com/mana/mana-notify/internal/db" -) - -type Engine struct { - db *db.DB -} - -func NewEngine(database *db.DB) *Engine { - return &Engine{db: database} -} - -type Rendered struct { - Subject string - Body string -} - -// RenderBySlug looks up a template by slug and renders it with the given data. -func (e *Engine) RenderBySlug(ctx context.Context, slug string, data map[string]any, locale string) (*Rendered, error) { - if locale == "" { - locale = "de-DE" - } - - var subject, bodyTemplate *string - err := e.db.Pool.QueryRow(ctx, - `SELECT subject, body_template FROM notify.templates WHERE slug = $1 AND locale = $2 AND is_active = true`, - slug, locale, - ).Scan(&subject, &bodyTemplate) - - if err != nil { - // Try default locale fallback - err = e.db.Pool.QueryRow(ctx, - `SELECT subject, body_template FROM notify.templates WHERE slug = $1 AND is_active = true ORDER BY locale LIMIT 1`, - slug, - ).Scan(&subject, &bodyTemplate) - if err != nil { - return nil, err - } - } - - rendered := &Rendered{} - - if subject != nil { - s, err := renderTemplate(*subject, data) - if err != nil { - return nil, err - } - rendered.Subject = s - } - - if bodyTemplate != nil { - b, err := renderTemplate(*bodyTemplate, data) - if err != nil { - return nil, err - } - rendered.Body = b - } - - return rendered, nil -} - -// RenderDirect renders a template string with data. -func RenderDirect(tmplStr string, data map[string]any) (string, error) { - return renderTemplate(tmplStr, data) -} - -func renderTemplate(tmplStr string, data map[string]any) (string, error) { - tmpl, err := template.New("").Parse(tmplStr) - if err != nil { - return "", err - } - var buf bytes.Buffer - if err := tmpl.Execute(&buf, data); err != nil { - return "", err - } - return buf.String(), nil -} - -// SeedDefaults inserts default system templates if they don't exist. -func (e *Engine) SeedDefaults(ctx context.Context) { - defaults := []struct { - slug string - channel string - subject string - body string - vars string - }{ - { - slug: "auth-password-reset", - channel: "email", - subject: "Passwort zurücksetzen - Mana", - body: `

Passwort zurücksetzen

Hallo {{.userName}},

Klicke auf den folgenden Link, um dein Passwort zurückzusetzen:

Passwort zurücksetzen

Falls du diese Anfrage nicht gestellt hast, kannst du diese E-Mail ignorieren.

`, - vars: `{"resetUrl": "URL zum Zurücksetzen", "userName": "Name des Benutzers"}`, - }, - { - slug: "auth-verification", - channel: "email", - subject: "E-Mail bestätigen - Mana", - body: `

E-Mail bestätigen

Hallo {{.userName}},

Bitte bestätige deine E-Mail-Adresse:

E-Mail bestätigen

`, - vars: `{"verificationUrl": "Bestätigungs-URL", "userName": "Name des Benutzers"}`, - }, - { - slug: "auth-welcome", - channel: "email", - subject: "Willkommen bei Mana!", - body: `

Willkommen!

Hallo {{.userName}},

Willkommen bei Mana! Du kannst dich jetzt anmelden:

Anmelden

`, - vars: `{"userName": "Name des Benutzers", "loginUrl": "Login-URL"}`, - }, - { - slug: "calendar-reminder", - channel: "email", - subject: "Erinnerung: {{.eventTitle}}", - body: `

{{.eventTitle}}

Wann: {{.eventTime}}

{{if .eventLocation}}

Wo: {{.eventLocation}}

{{end}}

Termin anzeigen

`, - vars: `{"eventTitle": "Titel", "eventTime": "Zeit", "eventLocation": "Ort (optional)", "eventUrl": "Link"}`, - }, - { - slug: "task-reminder", - channel: "email", - subject: "Erinnerung: {{.taskTitle}}", - body: `

{{.taskTitle}}

{{if .dueDate}}

Fällig: {{.dueDate}}

{{end}}

Aufgabe anzeigen

`, - vars: `{"taskTitle": "Aufgabentitel", "dueDate": "Fälligkeitsdatum (optional)", "taskUrl": "Link zur Aufgabe"}`, - }, - } - - for _, d := range defaults { - _, err := e.db.Pool.Exec(ctx, - `INSERT INTO notify.templates (slug, channel, subject, body_template, locale, is_system, variables) - VALUES ($1, $2, $3, $4, 'de-DE', true, $5) - ON CONFLICT (slug, locale) DO NOTHING`, - d.slug, d.channel, d.subject, d.body, d.vars, - ) - if err != nil { - slog.Warn("seed template failed", "slug", d.slug, "error", err) - } - } - - slog.Info("default templates seeded") -} diff --git a/services/mana-notify/internal/template/engine_test.go b/services/mana-notify/internal/template/engine_test.go deleted file mode 100644 index bdaa34ea6..000000000 --- a/services/mana-notify/internal/template/engine_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package template - -import "testing" - -func TestRenderDirect(t *testing.T) { - tests := []struct { - name string - tmpl string - data map[string]any - want string - wantErr bool - }{ - { - name: "simple variable substitution", - tmpl: "Hello {{.name}}!", - data: map[string]any{"name": "Till"}, - want: "Hello Till!", - }, - { - name: "multiple variables", - tmpl: "{{.greeting}}, {{.name}}! Your code is {{.code}}.", - data: map[string]any{"greeting": "Hi", "name": "User", "code": "ABC123"}, - want: "Hi, User! Your code is ABC123.", - }, - { - name: "no variables", - tmpl: "Static text here", - data: map[string]any{}, - want: "Static text here", - }, - { - name: "nil data", - tmpl: "No data needed", - data: nil, - want: "No data needed", - }, - { - name: "conditional", - tmpl: "{{if .show}}visible{{else}}hidden{{end}}", - data: map[string]any{"show": true}, - want: "visible", - }, - { - name: "conditional false", - tmpl: "{{if .show}}visible{{else}}hidden{{end}}", - data: map[string]any{"show": false}, - want: "hidden", - }, - { - name: "invalid template syntax", - tmpl: "{{.name", - data: map[string]any{"name": "test"}, - wantErr: true, - }, - { - name: "HTML content preserved", - tmpl: "

{{.title}}

{{.body}}

", - data: map[string]any{"title": "Welcome", "body": "Hello world"}, - want: "

Welcome

Hello world

", - }, - { - name: "email template pattern", - tmpl: "Hallo {{.userName}}, klicke hier: {{.resetUrl}}", - data: map[string]any{"userName": "Max", "resetUrl": "https://mana.how/reset/abc"}, - want: "Hallo Max, klicke hier: https://mana.how/reset/abc", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := RenderDirect(tt.tmpl, tt.data) - if tt.wantErr { - if err == nil { - t.Fatal("expected error, got nil") - } - return - } - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if got != tt.want { - t.Fatalf("got %q, want %q", got, tt.want) - } - }) - } -} diff --git a/services/mana-notify/package.json b/services/mana-notify/package.json deleted file mode 100644 index b7767af9a..000000000 --- a/services/mana-notify/package.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "name": "@mana/mana-notify", - "version": "1.0.0", - "private": true, - "scripts": { - "dev": "go run ./cmd/server", - "build": "go build -o bin/mana-notify ./cmd/server", - "test": "go test ./..." - } -}