diff --git a/CLAUDE.md b/CLAUDE.md index 502fb35fd..9c2f0485c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -138,7 +138,8 @@ manacore-monorepo/ │ ├── mana-landing-builder/# Org landing page builder (Astro → Cloudflare Pages) │ ├── mana-media/ # Central media platform (CAS, thumbnails) │ ├── mana-api-gateway/ # API gateway with rate limiting -│ ├── mana-notify/ # Notification service (push, email, in-app) +│ ├── mana-notify/ # Notification service (NestJS, legacy) +│ ├── mana-notify-go/ # Notification service (Go, active) │ ├── mana-image-gen/ # Local AI image generation (FLUX) │ ├── mana-stt/ # Speech-to-text service │ ├── mana-tts/ # Text-to-speech service diff --git a/services/mana-notify-go/.gitignore b/services/mana-notify-go/.gitignore new file mode 100644 index 000000000..e660fd93d --- /dev/null +++ b/services/mana-notify-go/.gitignore @@ -0,0 +1 @@ +bin/ diff --git a/services/mana-notify-go/CLAUDE.md b/services/mana-notify-go/CLAUDE.md new file mode 100644 index 000000000..d4f7e4a8e --- /dev/null +++ b/services/mana-notify-go/CLAUDE.md @@ -0,0 +1,76 @@ +# mana-notify (Go) + +Go replacement for the NestJS mana-notify service. Unified notification microservice for email, push, Matrix, and webhook notifications. + +## Architecture + +- **Language:** Go 1.25 +- **Database:** PostgreSQL (pgx v5, 5 tables in `notify` schema) +- **Queue:** Go channels + goroutine worker pool (replaces BullMQ) +- **Metrics:** Prometheus +- **Port:** 3040 + +## Endpoints + +### Notifications (X-Service-Key auth) +- `POST /api/v1/notifications/send` — Send immediately +- `POST /api/v1/notifications/schedule` — Schedule for future +- `POST /api/v1/notifications/batch` — Batch send (max 100) +- `GET /api/v1/notifications/{id}` — Get status +- `DELETE /api/v1/notifications/{id}` — Cancel pending + +### Templates (X-Service-Key auth) +- `GET /api/v1/templates` — List all +- `GET /api/v1/templates/{slug}` — Get by slug +- `POST /api/v1/templates` — Create +- `PUT /api/v1/templates/{slug}` — Update +- `DELETE /api/v1/templates/{slug}` — Delete +- `POST /api/v1/templates/{slug}/preview` — Preview +- `POST /api/v1/templates/preview` — Preview custom + +### Devices (JWT auth) +- `POST /api/v1/devices/register` — Register push device +- `GET /api/v1/devices` — List devices +- `DELETE /api/v1/devices/{id}` — Unregister + +### Preferences (JWT auth) +- `GET /api/v1/preferences` — Get preferences +- `PUT /api/v1/preferences` — Update preferences + +### System +- `GET /health` — Health check +- `GET /metrics` — Prometheus metrics + +## Notification Channels + +| Channel | Service | Worker Concurrency | Max Retries | +|---------|---------|-------------------|-------------| +| Email | Brevo SMTP | 5 | 3 | +| Push | Expo Push API | 10 | 3 | +| Matrix | Matrix Homeserver API | 5 | 3 | +| Webhook | HTTP callback | 10 | 5 | + +## Commands + +```bash +go run ./cmd/server # Dev +go build -o bin/mana-notify ./cmd/server # Build +go test ./... # Test +``` + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `PORT` | 3040 | Server port | +| `DATABASE_URL` | postgresql://...localhost:5432/mana_notify | PostgreSQL | +| `SERVICE_KEY` | dev-service-key | Service-to-service auth | +| `MANA_CORE_AUTH_URL` | http://localhost:3001 | JWT validation | +| `SMTP_HOST` | smtp-relay.brevo.com | SMTP host | +| `SMTP_PORT` | 587 | SMTP port | +| `SMTP_USER` | | SMTP username | +| `SMTP_PASSWORD` | | SMTP password | +| `SMTP_FROM` | ManaCore | Default from | +| `EXPO_ACCESS_TOKEN` | | Expo push token | +| `MATRIX_HOMESERVER_URL` | | Matrix homeserver | +| `MATRIX_ACCESS_TOKEN` | | Matrix bot token | diff --git a/services/mana-notify-go/Dockerfile b/services/mana-notify-go/Dockerfile new file mode 100644 index 000000000..5ca835ef7 --- /dev/null +++ b/services/mana-notify-go/Dockerfile @@ -0,0 +1,23 @@ +FROM golang:1.25-alpine AS builder + +WORKDIR /app +COPY services/mana-notify-go/go.mod services/mana-notify-go/go.sum ./ +RUN go mod download + +COPY services/mana-notify-go/ . +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /mana-notify ./cmd/server + +FROM alpine:3.21 + +RUN apk --no-cache add ca-certificates tzdata && \ + addgroup -g 1000 mana && adduser -u 1000 -G mana -s /sbin/nologin -D mana + +COPY --from=builder /mana-notify /usr/local/bin/mana-notify + +USER mana +EXPOSE 3040 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ + CMD wget -q --spider http://localhost:3040/health || exit 1 + +ENTRYPOINT ["mana-notify"] diff --git a/services/mana-notify-go/cmd/server/main.go b/services/mana-notify-go/cmd/server/main.go new file mode 100644 index 000000000..82b0d7ff5 --- /dev/null +++ b/services/mana-notify-go/cmd/server/main.go @@ -0,0 +1,137 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rs/cors" + + "github.com/manacore/mana-notify/internal/auth" + "github.com/manacore/mana-notify/internal/channel" + "github.com/manacore/mana-notify/internal/config" + "github.com/manacore/mana-notify/internal/db" + "github.com/manacore/mana-notify/internal/handler" + "github.com/manacore/mana-notify/internal/metrics" + "github.com/manacore/mana-notify/internal/queue" + tmpl "github.com/manacore/mana-notify/internal/template" +) + +func main() { + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + }))) + + cfg := config.Load() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + database, err := db.New(ctx, cfg.DatabaseURL) + cancel() + if err != nil { + slog.Error("database init failed", "error", err) + os.Exit(1) + } + defer database.Close() + + // Initialize services + m := metrics.New() + emailSvc := channel.NewEmailService(cfg) + pushSvc := channel.NewPushService(cfg) + matrixSvc := channel.NewMatrixService(cfg) + webhookSvc := channel.NewWebhookService() + engine := tmpl.NewEngine(database) + + // Seed default templates + engine.SeedDefaults(context.Background()) + + // Start worker pool + workerPool := queue.NewWorkerPool(database, emailSvc, pushSvc, matrixSvc, webhookSvc, m) + workerPool.Start() + defer workerPool.Stop() + + // Handlers + notifHandler := handler.NewNotificationsHandler(database, workerPool, engine) + tmplHandler := handler.NewTemplatesHandler(database, engine) + devicesHandler := handler.NewDevicesHandler(database) + prefsHandler := handler.NewPreferencesHandler(database) + healthHandler := handler.NewHealthHandler(database) + + // Middleware + serviceAuth := auth.ValidateServiceKey(cfg.ServiceKey) + jwtAuth := auth.ValidateJWT(cfg.ManaCoreAuthURL) + + mux := http.NewServeMux() + + // System endpoints (no auth) + mux.HandleFunc("GET /health", healthHandler.Health) + mux.Handle("GET /metrics", promhttp.Handler()) + + // Notification endpoints (service key auth) + mux.Handle("POST /api/v1/notifications/send", serviceAuth(http.HandlerFunc(notifHandler.Send))) + mux.Handle("POST /api/v1/notifications/schedule", serviceAuth(http.HandlerFunc(notifHandler.Schedule))) + mux.Handle("POST /api/v1/notifications/batch", serviceAuth(http.HandlerFunc(notifHandler.Batch))) + mux.Handle("GET /api/v1/notifications/{id}", serviceAuth(http.HandlerFunc(notifHandler.GetNotification))) + mux.Handle("DELETE /api/v1/notifications/{id}", serviceAuth(http.HandlerFunc(notifHandler.CancelNotification))) + + // Template endpoints (service key auth) + mux.Handle("GET /api/v1/templates", serviceAuth(http.HandlerFunc(tmplHandler.List))) + mux.Handle("POST /api/v1/templates", serviceAuth(http.HandlerFunc(tmplHandler.Create))) + mux.Handle("POST /api/v1/templates/preview", serviceAuth(http.HandlerFunc(tmplHandler.PreviewCustom))) + mux.Handle("GET /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Get))) + mux.Handle("PUT /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Update))) + mux.Handle("DELETE /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Delete))) + mux.Handle("POST /api/v1/templates/{slug}/preview", serviceAuth(http.HandlerFunc(tmplHandler.Preview))) + + // Device endpoints (JWT auth) + mux.Handle("POST /api/v1/devices/register", jwtAuth(http.HandlerFunc(devicesHandler.Register))) + mux.Handle("GET /api/v1/devices", jwtAuth(http.HandlerFunc(devicesHandler.List))) + mux.Handle("DELETE /api/v1/devices/{id}", jwtAuth(http.HandlerFunc(devicesHandler.Delete))) + + // Preference endpoints (JWT auth) + mux.Handle("GET /api/v1/preferences", jwtAuth(http.HandlerFunc(prefsHandler.Get))) + mux.Handle("PUT /api/v1/preferences", jwtAuth(http.HandlerFunc(prefsHandler.Update))) + + corsHandler := cors.New(cors.Options{ + AllowedOrigins: cfg.CORSOrigins, + AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, + AllowedHeaders: []string{"Content-Type", "Authorization", "X-Service-Key"}, + AllowCredentials: true, + }).Handler(mux) + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", cfg.Port), + Handler: corsHandler, + ReadTimeout: 30 * time.Second, + WriteTimeout: 60 * time.Second, + IdleTimeout: 120 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + go func() { + slog.Info("mana-notify started", "port", cfg.Port) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + slog.Error("server error", "error", err) + os.Exit(1) + } + }() + + <-sigCh + slog.Info("shutting down...") + + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + slog.Error("shutdown error", "error", err) + } + + slog.Info("server stopped") +} diff --git a/services/mana-notify-go/go.mod b/services/mana-notify-go/go.mod new file mode 100644 index 000000000..e4c68da66 --- /dev/null +++ b/services/mana-notify-go/go.mod @@ -0,0 +1,26 @@ +module github.com/manacore/mana-notify + +go 1.25.0 + +require ( + github.com/golang-jwt/jwt/v5 v5.3.1 + github.com/jackc/pgx/v5 v5.9.1 + github.com/prometheus/client_golang v1.22.0 + github.com/rs/cors v1.11.1 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.62.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.29.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect +) diff --git a/services/mana-notify-go/go.sum b/services/mana-notify-go/go.sum new file mode 100644 index 000000000..de1070a5d --- /dev/null +++ b/services/mana-notify-go/go.sum @@ -0,0 +1,54 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= +github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= +github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= +github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/services/mana-notify-go/internal/auth/auth.go b/services/mana-notify-go/internal/auth/auth.go new file mode 100644 index 000000000..e50dabc0d --- /dev/null +++ b/services/mana-notify-go/internal/auth/auth.go @@ -0,0 +1,126 @@ +package auth + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strings" + "time" + + "github.com/golang-jwt/jwt/v5" +) + +type User struct { + UserID string `json:"userId"` + Email string `json:"email"` + Role string `json:"role"` + SessionID string `json:"sessionId"` +} + +type contextKey string + +const UserContextKey contextKey = "user" + +// ValidateServiceKey checks the X-Service-Key header. +func ValidateServiceKey(serviceKey string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + key := r.Header.Get("X-Service-Key") + if key == "" || key != serviceKey { + http.Error(w, `{"error":"unauthorized: invalid service key"}`, http.StatusUnauthorized) + return + } + next.ServeHTTP(w, r) + }) + } +} + +// ValidateJWT validates Bearer tokens against mana-core-auth JWKS. +func ValidateJWT(authURL string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + header := r.Header.Get("Authorization") + if !strings.HasPrefix(header, "Bearer ") { + http.Error(w, `{"error":"unauthorized: missing token"}`, http.StatusUnauthorized) + return + } + tokenStr := strings.TrimPrefix(header, "Bearer ") + + // Validate against auth service + user, err := validateToken(r.Context(), authURL, tokenStr) + if err != nil { + slog.Warn("jwt validation failed", "error", err) + http.Error(w, `{"error":"unauthorized: invalid token"}`, http.StatusUnauthorized) + return + } + + ctx := context.WithValue(r.Context(), UserContextKey, user) + next.ServeHTTP(w, r.WithContext(ctx)) + }) + } +} + +// GetUser extracts the authenticated user from context. +func GetUser(r *http.Request) *User { + u, ok := r.Context().Value(UserContextKey).(*User) + if !ok { + return nil + } + return u +} + +func validateToken(ctx context.Context, authURL, tokenStr string) (*User, error) { + // Parse without verification first to get claims + parser := jwt.NewParser(jwt.WithoutClaimsValidation()) + token, _, err := parser.ParseUnverified(tokenStr, jwt.MapClaims{}) + if err != nil { + return nil, fmt.Errorf("parse token: %w", err) + } + + claims, ok := token.Claims.(jwt.MapClaims) + if !ok { + return nil, fmt.Errorf("invalid claims") + } + + // Validate via auth service + client := &http.Client{Timeout: 5 * time.Second} + req, err := http.NewRequestWithContext(ctx, http.MethodPost, authURL+"/api/v1/auth/validate", strings.NewReader(`{"token":"`+tokenStr+`"}`)) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("auth service unavailable: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("token validation failed: %d", resp.StatusCode) + } + + var result struct { + Valid bool `json:"valid"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + if !result.Valid { + return nil, fmt.Errorf("token invalid") + } + + sub, _ := claims["sub"].(string) + email, _ := claims["email"].(string) + role, _ := claims["role"].(string) + sid, _ := claims["sid"].(string) + + return &User{ + UserID: sub, + Email: email, + Role: role, + SessionID: sid, + }, nil +} diff --git a/services/mana-notify-go/internal/channel/email.go b/services/mana-notify-go/internal/channel/email.go new file mode 100644 index 000000000..941c83e20 --- /dev/null +++ b/services/mana-notify-go/internal/channel/email.go @@ -0,0 +1,144 @@ +package channel + +import ( + "crypto/tls" + "fmt" + "log/slog" + "net/smtp" + "strings" + "time" + + "github.com/manacore/mana-notify/internal/config" +) + +type EmailService struct { + host string + port int + user string + password string + from string +} + +func NewEmailService(cfg *config.Config) *EmailService { + return &EmailService{ + host: cfg.SMTPHost, + port: cfg.SMTPPort, + user: cfg.SMTPUser, + password: cfg.SMTPPassword, + from: cfg.SMTPFrom, + } +} + +type EmailMessage struct { + To string + Subject string + HTML string + Text string + From string + ReplyTo string +} + +type EmailResult struct { + Success bool + MessageID string + Error string +} + +func (s *EmailService) IsConfigured() bool { + return s.user != "" && s.password != "" +} + +func (s *EmailService) Send(msg *EmailMessage) EmailResult { + start := time.Now() + + if !s.IsConfigured() { + slog.Warn("smtp not configured, skipping email", "to", msg.To) + return EmailResult{Success: false, Error: "SMTP not configured"} + } + + from := s.from + if msg.From != "" { + from = msg.From + } + + // Build email headers and body + var builder strings.Builder + builder.WriteString(fmt.Sprintf("From: %s\r\n", from)) + builder.WriteString(fmt.Sprintf("To: %s\r\n", msg.To)) + builder.WriteString(fmt.Sprintf("Subject: %s\r\n", msg.Subject)) + if msg.ReplyTo != "" { + builder.WriteString(fmt.Sprintf("Reply-To: %s\r\n", msg.ReplyTo)) + } + builder.WriteString("MIME-Version: 1.0\r\n") + builder.WriteString("Content-Type: text/html; charset=\"UTF-8\"\r\n") + builder.WriteString("\r\n") + + if msg.HTML != "" { + builder.WriteString(msg.HTML) + } else { + builder.WriteString(msg.Text) + } + + // Extract email from "Name " format + fromAddr := extractEmail(from) + addr := fmt.Sprintf("%s:%d", s.host, s.port) + + auth := smtp.PlainAuth("", s.user, s.password, s.host) + + tlsConfig := &tls.Config{ServerName: s.host} + conn, err := tls.Dial("tcp", addr, tlsConfig) + if err != nil { + // Try STARTTLS fallback + err = smtp.SendMail(addr, auth, fromAddr, []string{msg.To}, []byte(builder.String())) + if err != nil { + slog.Error("email send failed", "to", msg.To, "error", err, "duration", time.Since(start)) + return EmailResult{Success: false, Error: err.Error()} + } + slog.Info("email sent via STARTTLS", "to", msg.To, "duration", time.Since(start)) + return EmailResult{Success: true} + } + defer conn.Close() + + client, err := smtp.NewClient(conn, s.host) + if err != nil { + return EmailResult{Success: false, Error: err.Error()} + } + defer client.Close() + + if err := client.Auth(auth); err != nil { + return EmailResult{Success: false, Error: err.Error()} + } + + if err := client.Mail(fromAddr); err != nil { + return EmailResult{Success: false, Error: err.Error()} + } + if err := client.Rcpt(msg.To); err != nil { + return EmailResult{Success: false, Error: err.Error()} + } + + w, err := client.Data() + if err != nil { + return EmailResult{Success: false, Error: err.Error()} + } + if _, err := w.Write([]byte(builder.String())); err != nil { + return EmailResult{Success: false, Error: err.Error()} + } + if err := w.Close(); err != nil { + return EmailResult{Success: false, Error: err.Error()} + } + + client.Quit() + + slog.Info("email sent", "to", msg.To, "duration", time.Since(start)) + return EmailResult{Success: true} +} + +func extractEmail(from string) string { + if idx := strings.Index(from, "<"); idx != -1 { + end := strings.Index(from, ">") + if end > idx { + return from[idx+1 : end] + } + } + return from +} diff --git a/services/mana-notify-go/internal/channel/matrix.go b/services/mana-notify-go/internal/channel/matrix.go new file mode 100644 index 000000000..7403db666 --- /dev/null +++ b/services/mana-notify-go/internal/channel/matrix.go @@ -0,0 +1,97 @@ +package channel + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "math/rand" + "net/http" + "time" + + "github.com/manacore/mana-notify/internal/config" +) + +type MatrixService struct { + homeserverURL string + accessToken string + client *http.Client +} + +func NewMatrixService(cfg *config.Config) *MatrixService { + return &MatrixService{ + homeserverURL: cfg.MatrixHomeserverURL, + accessToken: cfg.MatrixAccessToken, + client: &http.Client{Timeout: 10 * time.Second}, + } +} + +type MatrixMessage struct { + RoomID string + Body string + FormattedBody string + MsgType string // "m.text" or "m.notice" +} + +type MatrixResult struct { + Success bool + EventID string + Error string +} + +func (s *MatrixService) IsConfigured() bool { + return s.homeserverURL != "" && s.accessToken != "" +} + +func (s *MatrixService) Send(ctx context.Context, msg *MatrixMessage) MatrixResult { + if !s.IsConfigured() { + return MatrixResult{Success: false, Error: "Matrix not configured"} + } + + msgType := msg.MsgType + if msgType == "" { + msgType = "m.text" + } + + txnID := fmt.Sprintf("mana_%d_%d", time.Now().UnixMilli(), rand.Intn(100000)) + + payload := map[string]string{ + "msgtype": msgType, + "body": msg.Body, + } + if msg.FormattedBody != "" { + payload["format"] = "org.matrix.custom.html" + payload["formatted_body"] = msg.FormattedBody + } + + body, _ := json.Marshal(payload) + + url := fmt.Sprintf("%s/_matrix/client/v3/rooms/%s/send/m.room.message/%s", + s.homeserverURL, msg.RoomID, txnID) + + req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body)) + if err != nil { + return MatrixResult{Success: false, Error: err.Error()} + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+s.accessToken) + + resp, err := s.client.Do(req) + if err != nil { + slog.Error("matrix send failed", "room", msg.RoomID, "error", err) + return MatrixResult{Success: false, Error: err.Error()} + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return MatrixResult{Success: false, Error: fmt.Sprintf("matrix returned %d", resp.StatusCode)} + } + + var result struct { + EventID string `json:"event_id"` + } + json.NewDecoder(resp.Body).Decode(&result) + + return MatrixResult{Success: true, EventID: result.EventID} +} diff --git a/services/mana-notify-go/internal/channel/push.go b/services/mana-notify-go/internal/channel/push.go new file mode 100644 index 000000000..ebaa88129 --- /dev/null +++ b/services/mana-notify-go/internal/channel/push.go @@ -0,0 +1,169 @@ +package channel + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "strings" + "time" + + "github.com/manacore/mana-notify/internal/config" +) + +const expoPushURL = "https://exp.host/--/api/v2/push/send" + +type PushService struct { + accessToken string + client *http.Client +} + +func NewPushService(cfg *config.Config) *PushService { + return &PushService{ + accessToken: cfg.ExpoAccessToken, + client: &http.Client{Timeout: 30 * time.Second}, + } +} + +type PushMessage struct { + To string `json:"to"` + Title string `json:"title,omitempty"` + Body string `json:"body"` + Data map[string]any `json:"data,omitempty"` + Sound string `json:"sound,omitempty"` + Badge *int `json:"badge,omitempty"` + ChannelID string `json:"channelId,omitempty"` +} + +type PushResult struct { + Success bool + TicketID string + Error string +} + +func (s *PushService) IsConfigured() bool { + return s.accessToken != "" +} + +// IsExpoPushToken validates Expo token format. +func IsExpoPushToken(token string) bool { + return strings.HasPrefix(token, "ExponentPushToken[") || strings.HasPrefix(token, "ExpoPushToken[") +} + +// SendToTokens sends push notifications in batches of 100 (Expo limit). +func (s *PushService) SendToTokens(ctx context.Context, tokens []string, title, body string, data map[string]any, sound string, badge *int) map[string]PushResult { + results := make(map[string]PushResult, len(tokens)) + + if !s.IsConfigured() { + for _, t := range tokens { + results[t] = PushResult{Success: false, Error: "Expo not configured"} + } + return results + } + + // Build messages + messages := make([]PushMessage, 0, len(tokens)) + for _, token := range tokens { + messages = append(messages, PushMessage{ + To: token, + Title: title, + Body: body, + Data: data, + Sound: sound, + Badge: badge, + }) + } + + // Chunk into batches of 100 + for i := 0; i < len(messages); i += 100 { + end := i + 100 + if end > len(messages) { + end = len(messages) + } + chunk := messages[i:end] + + batchResults := s.sendBatch(ctx, chunk) + for token, result := range batchResults { + results[token] = result + } + } + + return results +} + +func (s *PushService) sendBatch(ctx context.Context, messages []PushMessage) map[string]PushResult { + results := make(map[string]PushResult, len(messages)) + + body, err := json.Marshal(messages) + if err != nil { + for _, m := range messages { + results[m.To] = PushResult{Success: false, Error: "marshal error"} + } + return results + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, expoPushURL, bytes.NewReader(body)) + if err != nil { + for _, m := range messages { + results[m.To] = PushResult{Success: false, Error: "request error"} + } + return results + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + if s.accessToken != "" { + req.Header.Set("Authorization", "Bearer "+s.accessToken) + } + + resp, err := s.client.Do(req) + if err != nil { + slog.Error("expo push failed", "error", err) + for _, m := range messages { + results[m.To] = PushResult{Success: false, Error: err.Error()} + } + return results + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + + var expoResp struct { + Data []struct { + Status string `json:"status"` + ID string `json:"id"` + Message string `json:"message"` + Details struct { + Error string `json:"error"` + } `json:"details"` + } `json:"data"` + } + + if err := json.Unmarshal(respBody, &expoResp); err != nil { + for _, m := range messages { + results[m.To] = PushResult{Success: false, Error: fmt.Sprintf("decode error: %s", err)} + } + return results + } + + for i, ticket := range expoResp.Data { + if i >= len(messages) { + break + } + token := messages[i].To + if ticket.Status == "ok" { + results[token] = PushResult{Success: true, TicketID: ticket.ID} + } else { + errMsg := ticket.Message + if ticket.Details.Error != "" { + errMsg = ticket.Details.Error + } + results[token] = PushResult{Success: false, Error: errMsg} + } + } + + return results +} diff --git a/services/mana-notify-go/internal/channel/webhook.go b/services/mana-notify-go/internal/channel/webhook.go new file mode 100644 index 000000000..5a1e28b08 --- /dev/null +++ b/services/mana-notify-go/internal/channel/webhook.go @@ -0,0 +1,91 @@ +package channel + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "time" +) + +type WebhookService struct { + client *http.Client +} + +func NewWebhookService() *WebhookService { + return &WebhookService{ + client: &http.Client{Timeout: 10 * time.Second}, + } +} + +type WebhookMessage struct { + URL string + Method string // POST or PUT + Headers map[string]string + Body map[string]any + Timeout int // ms +} + +type WebhookResult struct { + Success bool + StatusCode int + Error string + DurationMs int +} + +func (s *WebhookService) Send(ctx context.Context, msg *WebhookMessage) WebhookResult { + start := time.Now() + + method := msg.Method + if method == "" { + method = http.MethodPost + } + + timeout := 10 * time.Second + if msg.Timeout > 0 { + timeout = time.Duration(msg.Timeout) * time.Millisecond + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + body, err := json.Marshal(msg.Body) + if err != nil { + return WebhookResult{Success: false, Error: "marshal error", DurationMs: int(time.Since(start).Milliseconds())} + } + + req, err := http.NewRequestWithContext(ctx, method, msg.URL, bytes.NewReader(body)) + if err != nil { + return WebhookResult{Success: false, Error: err.Error(), DurationMs: int(time.Since(start).Milliseconds())} + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "ManaNotify/1.0") + for k, v := range msg.Headers { + req.Header.Set(k, v) + } + + resp, err := s.client.Do(req) + durationMs := int(time.Since(start).Milliseconds()) + if err != nil { + slog.Error("webhook send failed", "url", msg.URL, "error", err) + return WebhookResult{Success: false, Error: err.Error(), DurationMs: durationMs} + } + defer resp.Body.Close() + io.Copy(io.Discard, resp.Body) + + success := resp.StatusCode >= 200 && resp.StatusCode < 300 + if !success { + return WebhookResult{ + Success: false, + StatusCode: resp.StatusCode, + Error: fmt.Sprintf("webhook returned %d", resp.StatusCode), + DurationMs: durationMs, + } + } + + return WebhookResult{Success: true, StatusCode: resp.StatusCode, DurationMs: durationMs} +} diff --git a/services/mana-notify-go/internal/config/config.go b/services/mana-notify-go/internal/config/config.go new file mode 100644 index 000000000..7366fdbe5 --- /dev/null +++ b/services/mana-notify-go/internal/config/config.go @@ -0,0 +1,98 @@ +package config + +import ( + "os" + "strconv" + "strings" +) + +type Config struct { + Port int + + // Database + DatabaseURL string + + // Redis + RedisHost string + RedisPort int + RedisPassword string + + // Auth + ServiceKey string + ManaCoreAuthURL string + + // SMTP (Brevo) + SMTPHost string + SMTPPort int + SMTPUser string + SMTPPassword string + SMTPFrom string + + // Expo Push + ExpoAccessToken string + + // Matrix + MatrixHomeserverURL string + MatrixAccessToken string + + // Rate Limits + RateLimitEmailPerMinute int + RateLimitPushPerMinute int + + // CORS + CORSOrigins []string +} + +func Load() *Config { + return &Config{ + Port: getEnvInt("PORT", 3040), + + DatabaseURL: getEnv("DATABASE_URL", "postgresql://manacore:manacore@localhost:5432/mana_notify"), + + RedisHost: getEnv("REDIS_HOST", "localhost"), + RedisPort: getEnvInt("REDIS_PORT", 6379), + RedisPassword: getEnv("REDIS_PASSWORD", ""), + + ServiceKey: getEnv("SERVICE_KEY", "dev-service-key"), + ManaCoreAuthURL: getEnv("MANA_CORE_AUTH_URL", "http://localhost:3001"), + + SMTPHost: getEnv("SMTP_HOST", "smtp-relay.brevo.com"), + SMTPPort: getEnvInt("SMTP_PORT", 587), + SMTPUser: getEnv("SMTP_USER", ""), + SMTPPassword: getEnv("SMTP_PASSWORD", ""), + SMTPFrom: getEnv("SMTP_FROM", "ManaCore "), + + ExpoAccessToken: getEnv("EXPO_ACCESS_TOKEN", ""), + + MatrixHomeserverURL: getEnv("MATRIX_HOMESERVER_URL", ""), + MatrixAccessToken: getEnv("MATRIX_ACCESS_TOKEN", ""), + + RateLimitEmailPerMinute: getEnvInt("RATE_LIMIT_EMAIL_PER_MINUTE", 10), + RateLimitPushPerMinute: getEnvInt("RATE_LIMIT_PUSH_PER_MINUTE", 100), + + CORSOrigins: getEnvSlice("CORS_ORIGINS", []string{"http://localhost:3000", "http://localhost:5173"}), + } +} + +func getEnv(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func getEnvInt(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if i, err := strconv.Atoi(v); err == nil { + return i + } + } + return fallback +} + +func getEnvSlice(key string, fallback []string) []string { + if v := os.Getenv(key); v != "" { + return strings.Split(v, ",") + } + return fallback +} diff --git a/services/mana-notify-go/internal/db/db.go b/services/mana-notify-go/internal/db/db.go new file mode 100644 index 000000000..9aa7966b4 --- /dev/null +++ b/services/mana-notify-go/internal/db/db.go @@ -0,0 +1,62 @@ +package db + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type DB struct { + Pool *pgxpool.Pool +} + +func New(ctx context.Context, databaseURL string) (*DB, error) { + cfg, err := pgxpool.ParseConfig(databaseURL) + if err != nil { + return nil, fmt.Errorf("parse database url: %w", err) + } + + cfg.MaxConns = 20 + cfg.MinConns = 2 + cfg.MaxConnLifetime = 30 * time.Minute + cfg.MaxConnIdleTime = 5 * time.Minute + + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("connect to database: %w", err) + } + + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("ping database: %w", err) + } + + slog.Info("database connected", "url", redactURL(databaseURL)) + + db := &DB{Pool: pool} + if err := db.migrate(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("run migrations: %w", err) + } + + return db, nil +} + +func (d *DB) Close() { + d.Pool.Close() +} + +func (d *DB) HealthCheck(ctx context.Context) error { + return d.Pool.Ping(ctx) +} + +func redactURL(url string) string { + // Hide password from logs + if idx := len(url); idx > 30 { + return url[:30] + "..." + } + return url +} diff --git a/services/mana-notify-go/internal/db/migrations.go b/services/mana-notify-go/internal/db/migrations.go new file mode 100644 index 000000000..8c0e9e5af --- /dev/null +++ b/services/mana-notify-go/internal/db/migrations.go @@ -0,0 +1,136 @@ +package db + +import ( + "context" + "log/slog" +) + +func (d *DB) migrate(ctx context.Context) error { + slog.Info("running database migrations") + + migrations := []string{ + `CREATE SCHEMA IF NOT EXISTS notify`, + + // Enum types (idempotent with DO blocks) + `DO $$ BEGIN + CREATE TYPE notify.channel_type AS ENUM ('email', 'push', 'matrix', 'webhook'); + EXCEPTION WHEN duplicate_object THEN NULL; + END $$`, + + `DO $$ BEGIN + CREATE TYPE notify.notification_status AS ENUM ('pending', 'processing', 'delivered', 'failed', 'cancelled'); + EXCEPTION WHEN duplicate_object THEN NULL; + END $$`, + + `DO $$ BEGIN + CREATE TYPE notify.priority_type AS ENUM ('low', 'normal', 'high', 'critical'); + EXCEPTION WHEN duplicate_object THEN NULL; + END $$`, + + // Notifications table + `CREATE TABLE IF NOT EXISTS notify.notifications ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id TEXT, + app_id VARCHAR(50) NOT NULL, + channel notify.channel_type NOT NULL, + template_id VARCHAR(100), + subject VARCHAR(500), + body TEXT, + data JSONB, + status notify.notification_status NOT NULL DEFAULT 'pending', + priority notify.priority_type NOT NULL DEFAULT 'normal', + scheduled_for TIMESTAMPTZ, + recipient VARCHAR(500), + external_id VARCHAR(255), + attempts INTEGER NOT NULL DEFAULT 0, + delivered_at TIMESTAMPTZ, + error_message TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + + `CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notify.notifications (user_id)`, + `CREATE INDEX IF NOT EXISTS idx_notifications_app_id ON notify.notifications (app_id)`, + `CREATE INDEX IF NOT EXISTS idx_notifications_status ON notify.notifications (status)`, + `CREATE INDEX IF NOT EXISTS idx_notifications_scheduled_for ON notify.notifications (scheduled_for)`, + `CREATE INDEX IF NOT EXISTS idx_notifications_external_id ON notify.notifications (external_id)`, + + // Templates table + `CREATE TABLE IF NOT EXISTS notify.templates ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + slug VARCHAR(100) NOT NULL, + app_id VARCHAR(50), + channel notify.channel_type NOT NULL, + subject VARCHAR(500), + body_template TEXT NOT NULL, + locale VARCHAR(10) NOT NULL DEFAULT 'de-DE', + is_active BOOLEAN NOT NULL DEFAULT true, + is_system BOOLEAN NOT NULL DEFAULT false, + variables JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(slug, locale) + )`, + + `CREATE INDEX IF NOT EXISTS idx_templates_app_id ON notify.templates (app_id)`, + `CREATE INDEX IF NOT EXISTS idx_templates_channel ON notify.templates (channel)`, + + // Devices table + `CREATE TABLE IF NOT EXISTS notify.devices ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id TEXT NOT NULL, + push_token TEXT NOT NULL UNIQUE, + token_type VARCHAR(20) NOT NULL DEFAULT 'expo', + platform VARCHAR(20), + device_name VARCHAR(100), + app_id VARCHAR(50), + is_active BOOLEAN NOT NULL DEFAULT true, + last_seen_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + + `CREATE INDEX IF NOT EXISTS idx_devices_user_id ON notify.devices (user_id)`, + + // Preferences table + `CREATE TABLE IF NOT EXISTS notify.preferences ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id TEXT NOT NULL UNIQUE, + email_enabled BOOLEAN NOT NULL DEFAULT false, + push_enabled BOOLEAN NOT NULL DEFAULT true, + quiet_hours_enabled BOOLEAN NOT NULL DEFAULT false, + quiet_hours_start VARCHAR(5), + quiet_hours_end VARCHAR(5), + timezone VARCHAR(50) NOT NULL DEFAULT 'Europe/Berlin', + category_preferences JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + + // Delivery logs table + `CREATE TABLE IF NOT EXISTS notify.delivery_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + notification_id UUID NOT NULL REFERENCES notify.notifications(id) ON DELETE CASCADE, + attempt_number INTEGER NOT NULL, + channel notify.channel_type NOT NULL, + success BOOLEAN NOT NULL, + status_code INTEGER, + error_message TEXT, + provider_id VARCHAR(255), + duration_ms INTEGER, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + + `CREATE INDEX IF NOT EXISTS idx_delivery_logs_notification_id ON notify.delivery_logs (notification_id)`, + `CREATE INDEX IF NOT EXISTS idx_delivery_logs_success ON notify.delivery_logs (success)`, + } + + for _, sql := range migrations { + if _, err := d.Pool.Exec(ctx, sql); err != nil { + return err + } + } + + slog.Info("migrations completed") + return nil +} diff --git a/services/mana-notify-go/internal/db/models.go b/services/mana-notify-go/internal/db/models.go new file mode 100644 index 000000000..c1bab62af --- /dev/null +++ b/services/mana-notify-go/internal/db/models.go @@ -0,0 +1,80 @@ +package db + +import "time" + +type Notification struct { + ID string `json:"id"` + UserID *string `json:"userId,omitempty"` + AppID string `json:"appId"` + Channel string `json:"channel"` + TemplateID *string `json:"templateId,omitempty"` + Subject *string `json:"subject,omitempty"` + Body *string `json:"body,omitempty"` + Data []byte `json:"data,omitempty"` // JSONB + Status string `json:"status"` + Priority string `json:"priority"` + ScheduledFor *time.Time `json:"scheduledFor,omitempty"` + Recipient *string `json:"recipient,omitempty"` + ExternalID *string `json:"externalId,omitempty"` + Attempts int `json:"attempts"` + DeliveredAt *time.Time `json:"deliveredAt,omitempty"` + ErrorMessage *string `json:"errorMessage,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type Template struct { + ID string `json:"id"` + Slug string `json:"slug"` + AppID *string `json:"appId,omitempty"` + Channel string `json:"channel"` + Subject *string `json:"subject,omitempty"` + BodyTemplate string `json:"bodyTemplate"` + Locale string `json:"locale"` + IsActive bool `json:"isActive"` + IsSystem bool `json:"isSystem"` + Variables []byte `json:"variables,omitempty"` // JSONB + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type Device struct { + ID string `json:"id"` + UserID string `json:"userId"` + PushToken string `json:"pushToken"` + TokenType string `json:"tokenType"` + Platform *string `json:"platform,omitempty"` + DeviceName *string `json:"deviceName,omitempty"` + AppID *string `json:"appId,omitempty"` + IsActive bool `json:"isActive"` + LastSeenAt *time.Time `json:"lastSeenAt,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type Preference struct { + ID string `json:"id"` + UserID string `json:"userId"` + EmailEnabled bool `json:"emailEnabled"` + PushEnabled bool `json:"pushEnabled"` + QuietHoursEnabled bool `json:"quietHoursEnabled"` + QuietHoursStart *string `json:"quietHoursStart,omitempty"` + QuietHoursEnd *string `json:"quietHoursEnd,omitempty"` + Timezone string `json:"timezone"` + CategoryPreferences []byte `json:"categoryPreferences,omitempty"` // JSONB + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type DeliveryLog struct { + ID string `json:"id"` + NotificationID string `json:"notificationId"` + AttemptNumber int `json:"attemptNumber"` + Channel string `json:"channel"` + Success bool `json:"success"` + StatusCode *int `json:"statusCode,omitempty"` + ErrorMessage *string `json:"errorMessage,omitempty"` + ProviderID *string `json:"providerId,omitempty"` + DurationMs *int `json:"durationMs,omitempty"` + CreatedAt time.Time `json:"createdAt"` +} diff --git a/services/mana-notify-go/internal/handler/common.go b/services/mana-notify-go/internal/handler/common.go new file mode 100644 index 000000000..69d3f747e --- /dev/null +++ b/services/mana-notify-go/internal/handler/common.go @@ -0,0 +1,24 @@ +package handler + +import ( + "encoding/json" + "net/http" + "time" +) + +func writeJSON(w http.ResponseWriter, status int, data any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(data) +} + +func writeError(w http.ResponseWriter, status int, message string) { + writeJSON(w, status, map[string]any{ + "success": false, + "error": map[string]any{ + "statusCode": status, + "message": message, + "timestamp": time.Now().UTC().Format(time.RFC3339), + }, + }) +} diff --git a/services/mana-notify-go/internal/handler/devices.go b/services/mana-notify-go/internal/handler/devices.go new file mode 100644 index 000000000..57588a358 --- /dev/null +++ b/services/mana-notify-go/internal/handler/devices.go @@ -0,0 +1,121 @@ +package handler + +import ( + "encoding/json" + "net/http" + + "github.com/manacore/mana-notify/internal/auth" + "github.com/manacore/mana-notify/internal/db" +) + +type DevicesHandler struct { + db *db.DB +} + +func NewDevicesHandler(database *db.DB) *DevicesHandler { + return &DevicesHandler{db: database} +} + +// Register handles POST /api/v1/devices/register +func (h *DevicesHandler) Register(w http.ResponseWriter, r *http.Request) { + user := auth.GetUser(r) + if user == nil { + writeError(w, http.StatusUnauthorized, "unauthorized") + return + } + + var req struct { + PushToken string `json:"pushToken"` + TokenType string `json:"tokenType,omitempty"` + Platform string `json:"platform,omitempty"` + DeviceName string `json:"deviceName,omitempty"` + AppID string `json:"appId,omitempty"` + } + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + if req.PushToken == "" { + writeError(w, http.StatusBadRequest, "pushToken is required") + return + } + + tokenType := req.TokenType + if tokenType == "" { + tokenType = "expo" + } + + // Upsert: transfer ownership if token exists for different user + var id string + err := h.db.Pool.QueryRow(r.Context(), + `INSERT INTO notify.devices (user_id, push_token, token_type, platform, device_name, app_id) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (push_token) DO UPDATE SET + user_id = EXCLUDED.user_id, + is_active = true, + last_seen_at = NOW(), + updated_at = NOW() + RETURNING id`, + user.UserID, req.PushToken, tokenType, nilIfEmpty(req.Platform), nilIfEmpty(req.DeviceName), nilIfEmpty(req.AppID), + ).Scan(&id) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to register device") + return + } + + writeJSON(w, http.StatusCreated, map[string]any{"device": map[string]any{"id": id}}) +} + +// List handles GET /api/v1/devices +func (h *DevicesHandler) List(w http.ResponseWriter, r *http.Request) { + user := auth.GetUser(r) + if user == nil { + writeError(w, http.StatusUnauthorized, "unauthorized") + return + } + + rows, err := h.db.Pool.Query(r.Context(), + `SELECT id, user_id, push_token, token_type, platform, device_name, app_id, is_active, last_seen_at, created_at, updated_at + FROM notify.devices WHERE user_id = $1 AND is_active = true ORDER BY created_at DESC`, user.UserID) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list devices") + return + } + defer rows.Close() + + var devices []db.Device + for rows.Next() { + var d db.Device + if err := rows.Scan(&d.ID, &d.UserID, &d.PushToken, &d.TokenType, &d.Platform, + &d.DeviceName, &d.AppID, &d.IsActive, &d.LastSeenAt, &d.CreatedAt, &d.UpdatedAt); err != nil { + continue + } + devices = append(devices, d) + } + + writeJSON(w, http.StatusOK, map[string]any{"devices": devices}) +} + +// Delete handles DELETE /api/v1/devices/{id} +func (h *DevicesHandler) Delete(w http.ResponseWriter, r *http.Request) { + user := auth.GetUser(r) + if user == nil { + writeError(w, http.StatusUnauthorized, "unauthorized") + return + } + + id := r.PathValue("id") + result, err := h.db.Pool.Exec(r.Context(), + `UPDATE notify.devices SET is_active = false, updated_at = NOW() WHERE id = $1 AND user_id = $2`, id, user.UserID) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to delete device") + return + } + if result.RowsAffected() == 0 { + writeError(w, http.StatusNotFound, "device not found") + return + } + + writeJSON(w, http.StatusOK, map[string]any{"deleted": true}) +} diff --git a/services/mana-notify-go/internal/handler/health.go b/services/mana-notify-go/internal/handler/health.go new file mode 100644 index 000000000..9d1089aec --- /dev/null +++ b/services/mana-notify-go/internal/handler/health.go @@ -0,0 +1,38 @@ +package handler + +import ( + "net/http" + "time" + + "github.com/manacore/mana-notify/internal/db" +) + +type HealthHandler struct { + db *db.DB + startTime time.Time +} + +func NewHealthHandler(database *db.DB) *HealthHandler { + return &HealthHandler{db: database, startTime: time.Now()} +} + +func (h *HealthHandler) Health(w http.ResponseWriter, r *http.Request) { + dbOK := h.db.HealthCheck(r.Context()) == nil + + status := "healthy" + if !dbOK { + status = "unhealthy" + } + + writeJSON(w, http.StatusOK, map[string]any{ + "status": status, + "version": "1.0.0", + "service": "mana-notify", + "uptime": time.Since(h.startTime).Seconds(), + "timestamp": time.Now().UTC().Format(time.RFC3339), + "services": map[string]any{ + "database": dbOK, + "redis": true, + }, + }) +} diff --git a/services/mana-notify-go/internal/handler/notifications.go b/services/mana-notify-go/internal/handler/notifications.go new file mode 100644 index 000000000..16ec586cf --- /dev/null +++ b/services/mana-notify-go/internal/handler/notifications.go @@ -0,0 +1,504 @@ +package handler + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/manacore/mana-notify/internal/db" + "github.com/manacore/mana-notify/internal/queue" + tmpl "github.com/manacore/mana-notify/internal/template" +) + +type NotificationsHandler struct { + db *db.DB + pool *queue.WorkerPool + engine *tmpl.Engine +} + +func NewNotificationsHandler(database *db.DB, pool *queue.WorkerPool, engine *tmpl.Engine) *NotificationsHandler { + return &NotificationsHandler{db: database, pool: pool, engine: engine} +} + +type SendRequest struct { + Channel string `json:"channel"` + AppID string `json:"appId"` + UserID string `json:"userId,omitempty"` + Recipient string `json:"recipient,omitempty"` + Recipients []string `json:"recipients,omitempty"` + Template string `json:"template,omitempty"` + Subject string `json:"subject,omitempty"` + Body string `json:"body,omitempty"` + Data map[string]any `json:"data,omitempty"` + Priority string `json:"priority,omitempty"` + ExternalID string `json:"externalId,omitempty"` + + EmailOptions *EmailOptions `json:"emailOptions,omitempty"` + PushOptions *PushOptions `json:"pushOptions,omitempty"` + WebhookOptions *WebhookOptions `json:"webhookOptions,omitempty"` + MatrixOptions *MatrixOptions `json:"matrixOptions,omitempty"` +} + +type EmailOptions struct { + From string `json:"from,omitempty"` + ReplyTo string `json:"replyTo,omitempty"` +} + +type PushOptions struct { + Sound string `json:"sound,omitempty"` + Badge *int `json:"badge,omitempty"` + ChannelID string `json:"channelId,omitempty"` +} + +type WebhookOptions struct { + Method string `json:"method,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + Timeout int `json:"timeout,omitempty"` +} + +type MatrixOptions struct { + MsgType string `json:"msgtype,omitempty"` + FormattedBody string `json:"formattedBody,omitempty"` +} + +type ScheduleRequest struct { + SendRequest + ScheduledFor string `json:"scheduledFor"` +} + +type BatchRequest struct { + Notifications []SendRequest `json:"notifications"` +} + +// Send handles POST /api/v1/notifications/send +func (h *NotificationsHandler) Send(w http.ResponseWriter, r *http.Request) { + var req SendRequest + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + if err := validateSendRequest(&req); err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + // Check idempotency + if req.ExternalID != "" { + var existingID string + err := h.db.Pool.QueryRow(r.Context(), + `SELECT id FROM notify.notifications WHERE external_id = $1`, req.ExternalID, + ).Scan(&existingID) + if err == nil { + writeJSON(w, http.StatusOK, map[string]any{ + "notification": map[string]any{"id": existingID, "status": "existing"}, + "deduplicated": true, + }) + return + } + } + + // Check user preferences + if req.UserID != "" { + blocked, reason := h.checkPreferences(r.Context(), req.UserID, req.Channel) + if blocked { + writeJSON(w, http.StatusOK, map[string]any{ + "notification": map[string]any{"status": "cancelled", "reason": reason}, + }) + return + } + } + + // Render template + subject := req.Subject + body := req.Body + if req.Template != "" { + rendered, err := h.engine.RenderBySlug(r.Context(), req.Template, req.Data, "") + if err != nil { + slog.Warn("template render failed", "template", req.Template, "error", err) + } else { + if rendered.Subject != "" { + subject = rendered.Subject + } + if rendered.Body != "" { + body = rendered.Body + } + } + } + + priority := req.Priority + if priority == "" { + priority = "normal" + } + + // Create notification record + var notificationID string + err := h.db.Pool.QueryRow(r.Context(), + `INSERT INTO notify.notifications (user_id, app_id, channel, template_id, subject, body, data, priority, recipient, external_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + RETURNING id`, + nilIfEmpty(req.UserID), req.AppID, req.Channel, nilIfEmpty(req.Template), + nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(req.Data), + priority, nilIfEmpty(req.Recipient), nilIfEmpty(req.ExternalID), + ).Scan(¬ificationID) + if err != nil { + slog.Error("create notification failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to create notification") + return + } + + // Build and enqueue job + job := queue.Job{ + NotificationID: notificationID, + Channel: req.Channel, + AppID: req.AppID, + Recipient: req.Recipient, + Subject: subject, + Body: body, + Data: req.Data, + } + + if req.EmailOptions != nil { + job.From = req.EmailOptions.From + job.ReplyTo = req.EmailOptions.ReplyTo + } + if req.PushOptions != nil { + job.Sound = req.PushOptions.Sound + job.Badge = req.PushOptions.Badge + } + if req.MatrixOptions != nil { + job.RoomID = req.Recipient + job.MsgType = req.MatrixOptions.MsgType + job.FormattedBody = req.MatrixOptions.FormattedBody + } + if req.WebhookOptions != nil { + job.WebhookMethod = req.WebhookOptions.Method + job.WebhookHeaders = req.WebhookOptions.Headers + job.WebhookTimeout = req.WebhookOptions.Timeout + } + if req.Channel == "matrix" { + job.RoomID = req.Recipient + } + + h.pool.Enqueue(job) + + writeJSON(w, http.StatusAccepted, map[string]any{ + "notification": map[string]any{ + "id": notificationID, + "status": "pending", + }, + }) +} + +// Schedule handles POST /api/v1/notifications/schedule +func (h *NotificationsHandler) Schedule(w http.ResponseWriter, r *http.Request) { + var req ScheduleRequest + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + scheduledFor, err := time.Parse(time.RFC3339, req.ScheduledFor) + if err != nil { + writeError(w, http.StatusBadRequest, "scheduledFor must be a valid RFC3339 timestamp") + return + } + if scheduledFor.Before(time.Now()) { + writeError(w, http.StatusBadRequest, "scheduledFor must be in the future") + return + } + + if err := validateSendRequest(&req.SendRequest); err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + // Render template + subject := req.Subject + body := req.Body + if req.Template != "" { + rendered, err := h.engine.RenderBySlug(r.Context(), req.Template, req.Data, "") + if err == nil { + if rendered.Subject != "" { + subject = rendered.Subject + } + if rendered.Body != "" { + body = rendered.Body + } + } + } + + priority := req.Priority + if priority == "" { + priority = "normal" + } + + var notificationID string + err = h.db.Pool.QueryRow(r.Context(), + `INSERT INTO notify.notifications (user_id, app_id, channel, template_id, subject, body, data, priority, recipient, external_id, scheduled_for) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + RETURNING id`, + nilIfEmpty(req.UserID), req.AppID, req.Channel, nilIfEmpty(req.Template), + nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(req.Data), + priority, nilIfEmpty(req.Recipient), nilIfEmpty(req.ExternalID), scheduledFor, + ).Scan(¬ificationID) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to create notification") + return + } + + job := queue.Job{ + NotificationID: notificationID, + Channel: req.Channel, + AppID: req.AppID, + Recipient: req.Recipient, + Subject: subject, + Body: body, + Data: req.Data, + ScheduleAt: &scheduledFor, + } + h.pool.Enqueue(job) + + writeJSON(w, http.StatusAccepted, map[string]any{ + "notification": map[string]any{ + "id": notificationID, + "status": "pending", + "scheduledFor": scheduledFor.Format(time.RFC3339), + }, + }) +} + +// Batch handles POST /api/v1/notifications/batch +func (h *NotificationsHandler) Batch(w http.ResponseWriter, r *http.Request) { + var req BatchRequest + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 5<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + if len(req.Notifications) == 0 { + writeError(w, http.StatusBadRequest, "notifications array is required") + return + } + if len(req.Notifications) > 100 { + writeError(w, http.StatusBadRequest, "maximum 100 notifications per batch") + return + } + + type batchResult struct { + ID string `json:"id,omitempty"` + Status string `json:"status"` + Error string `json:"error,omitempty"` + } + + results := make([]batchResult, len(req.Notifications)) + succeeded := 0 + failed := 0 + + for i, n := range req.Notifications { + if err := validateSendRequest(&n); err != nil { + results[i] = batchResult{Status: "failed", Error: err.Error()} + failed++ + continue + } + + subject := n.Subject + body := n.Body + if n.Template != "" { + rendered, err := h.engine.RenderBySlug(r.Context(), n.Template, n.Data, "") + if err == nil { + if rendered.Subject != "" { + subject = rendered.Subject + } + if rendered.Body != "" { + body = rendered.Body + } + } + } + + priority := n.Priority + if priority == "" { + priority = "normal" + } + + var notificationID string + err := h.db.Pool.QueryRow(r.Context(), + `INSERT INTO notify.notifications (user_id, app_id, channel, subject, body, data, priority, recipient) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING id`, + nilIfEmpty(n.UserID), n.AppID, n.Channel, + nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(n.Data), priority, nilIfEmpty(n.Recipient), + ).Scan(¬ificationID) + if err != nil { + results[i] = batchResult{Status: "failed", Error: "database error"} + failed++ + continue + } + + h.pool.Enqueue(queue.Job{ + NotificationID: notificationID, + Channel: n.Channel, + AppID: n.AppID, + Recipient: n.Recipient, + Subject: subject, + Body: body, + Data: n.Data, + }) + + results[i] = batchResult{ID: notificationID, Status: "pending"} + succeeded++ + } + + writeJSON(w, http.StatusAccepted, map[string]any{ + "results": results, + "succeeded": succeeded, + "failed": failed, + }) +} + +// GetNotification handles GET /api/v1/notifications/{id} +func (h *NotificationsHandler) GetNotification(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + writeError(w, http.StatusBadRequest, "notification id required") + return + } + + var n db.Notification + err := h.db.Pool.QueryRow(r.Context(), + `SELECT id, user_id, app_id, channel, template_id, subject, body, status, priority, scheduled_for, recipient, external_id, attempts, delivered_at, error_message, created_at, updated_at + FROM notify.notifications WHERE id = $1`, id, + ).Scan(&n.ID, &n.UserID, &n.AppID, &n.Channel, &n.TemplateID, &n.Subject, &n.Body, + &n.Status, &n.Priority, &n.ScheduledFor, &n.Recipient, &n.ExternalID, + &n.Attempts, &n.DeliveredAt, &n.ErrorMessage, &n.CreatedAt, &n.UpdatedAt) + + if err != nil { + writeError(w, http.StatusNotFound, "notification not found") + return + } + + writeJSON(w, http.StatusOK, map[string]any{"notification": n}) +} + +// CancelNotification handles DELETE /api/v1/notifications/{id} +func (h *NotificationsHandler) CancelNotification(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + writeError(w, http.StatusBadRequest, "notification id required") + return + } + + result, err := h.db.Pool.Exec(r.Context(), + `UPDATE notify.notifications SET status = 'cancelled', updated_at = NOW() WHERE id = $1 AND status = 'pending'`, id) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to cancel notification") + return + } + if result.RowsAffected() == 0 { + writeError(w, http.StatusNotFound, "notification not found or already processed") + return + } + + writeJSON(w, http.StatusOK, map[string]any{"cancelled": true}) +} + +func (h *NotificationsHandler) checkPreferences(ctx context.Context, userID, ch string) (bool, string) { + var emailEnabled, pushEnabled, quietEnabled bool + var quietStart, quietEnd, timezone *string + + err := h.db.Pool.QueryRow(ctx, + `SELECT email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone + FROM notify.preferences WHERE user_id = $1`, userID, + ).Scan(&emailEnabled, &pushEnabled, &quietEnabled, &quietStart, &quietEnd, &timezone) + + if err != nil { + return false, "" // No preferences = allow + } + + // Check channel preferences + if ch == "email" && !emailEnabled { + return true, "email notifications disabled by user" + } + if ch == "push" && !pushEnabled { + return true, "push notifications disabled by user" + } + + // Check quiet hours + if quietEnabled && quietStart != nil && quietEnd != nil { + tz := "Europe/Berlin" + if timezone != nil { + tz = *timezone + } + loc, err := time.LoadLocation(tz) + if err == nil { + now := time.Now().In(loc) + nowMinutes := now.Hour()*60 + now.Minute() + + startH, startM := parseTime(*quietStart) + endH, endM := parseTime(*quietEnd) + startMinutes := startH*60 + startM + endMinutes := endH*60 + endM + + var inQuiet bool + if startMinutes <= endMinutes { + inQuiet = nowMinutes >= startMinutes && nowMinutes < endMinutes + } else { + // Spans midnight (e.g. 22:00 to 08:00) + inQuiet = nowMinutes >= startMinutes || nowMinutes < endMinutes + } + + if inQuiet { + return true, "quiet hours active" + } + } + } + + return false, "" +} + +func validateSendRequest(req *SendRequest) error { + if req.Channel == "" { + return fmt.Errorf("channel is required") + } + validChannels := map[string]bool{"email": true, "push": true, "matrix": true, "webhook": true} + if !validChannels[req.Channel] { + return fmt.Errorf("channel must be email, push, matrix, or webhook") + } + if req.AppID == "" { + return fmt.Errorf("appId is required") + } + if req.Recipient == "" && len(req.Recipients) == 0 && req.UserID == "" { + return fmt.Errorf("recipient, recipients, or userId is required") + } + if req.Template == "" && req.Body == "" { + return fmt.Errorf("template or body is required") + } + return nil +} + +func parseTime(s string) (int, int) { + var h, m int + fmt.Sscanf(s, "%d:%d", &h, &m) + return h, m +} + +func nilIfEmpty(s string) *string { + if s == "" { + return nil + } + return &s +} + +func jsonOrNil(data map[string]any) []byte { + if data == nil { + return nil + } + b, err := json.Marshal(data) + if err != nil { + return nil + } + return b +} diff --git a/services/mana-notify-go/internal/handler/preferences.go b/services/mana-notify-go/internal/handler/preferences.go new file mode 100644 index 000000000..3c98a6b17 --- /dev/null +++ b/services/mana-notify-go/internal/handler/preferences.go @@ -0,0 +1,95 @@ +package handler + +import ( + "encoding/json" + "net/http" + + "github.com/manacore/mana-notify/internal/auth" + "github.com/manacore/mana-notify/internal/db" +) + +type PreferencesHandler struct { + db *db.DB +} + +func NewPreferencesHandler(database *db.DB) *PreferencesHandler { + return &PreferencesHandler{db: database} +} + +// Get handles GET /api/v1/preferences +func (h *PreferencesHandler) Get(w http.ResponseWriter, r *http.Request) { + user := auth.GetUser(r) + if user == nil { + writeError(w, http.StatusUnauthorized, "unauthorized") + return + } + + var p db.Preference + err := h.db.Pool.QueryRow(r.Context(), + `SELECT id, user_id, email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone, category_preferences, created_at, updated_at + FROM notify.preferences WHERE user_id = $1`, user.UserID, + ).Scan(&p.ID, &p.UserID, &p.EmailEnabled, &p.PushEnabled, &p.QuietHoursEnabled, + &p.QuietHoursStart, &p.QuietHoursEnd, &p.Timezone, &p.CategoryPreferences, &p.CreatedAt, &p.UpdatedAt) + + if err != nil { + // Return defaults + writeJSON(w, http.StatusOK, map[string]any{ + "preferences": map[string]any{ + "emailEnabled": false, + "pushEnabled": true, + "quietHoursEnabled": false, + "timezone": "Europe/Berlin", + }, + }) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"preferences": p}) +} + +// Update handles PUT /api/v1/preferences +func (h *PreferencesHandler) Update(w http.ResponseWriter, r *http.Request) { + user := auth.GetUser(r) + if user == nil { + writeError(w, http.StatusUnauthorized, "unauthorized") + return + } + + var req struct { + EmailEnabled *bool `json:"emailEnabled,omitempty"` + PushEnabled *bool `json:"pushEnabled,omitempty"` + QuietHoursEnabled *bool `json:"quietHoursEnabled,omitempty"` + QuietHoursStart *string `json:"quietHoursStart,omitempty"` + QuietHoursEnd *string `json:"quietHoursEnd,omitempty"` + Timezone *string `json:"timezone,omitempty"` + CategoryPreferences map[string]any `json:"categoryPreferences,omitempty"` + } + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + catJSON, _ := json.Marshal(req.CategoryPreferences) + + _, err := h.db.Pool.Exec(r.Context(), + `INSERT INTO notify.preferences (user_id, email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone, category_preferences) + VALUES ($1, COALESCE($2, false), COALESCE($3, true), COALESCE($4, false), $5, $6, COALESCE($7, 'Europe/Berlin'), $8) + ON CONFLICT (user_id) DO UPDATE SET + email_enabled = COALESCE($2, notify.preferences.email_enabled), + push_enabled = COALESCE($3, notify.preferences.push_enabled), + quiet_hours_enabled = COALESCE($4, notify.preferences.quiet_hours_enabled), + quiet_hours_start = COALESCE($5, notify.preferences.quiet_hours_start), + quiet_hours_end = COALESCE($6, notify.preferences.quiet_hours_end), + timezone = COALESCE($7, notify.preferences.timezone), + category_preferences = COALESCE($8, notify.preferences.category_preferences), + updated_at = NOW()`, + user.UserID, req.EmailEnabled, req.PushEnabled, req.QuietHoursEnabled, + req.QuietHoursStart, req.QuietHoursEnd, req.Timezone, catJSON, + ) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to update preferences") + return + } + + writeJSON(w, http.StatusOK, map[string]any{"updated": true}) +} diff --git a/services/mana-notify-go/internal/handler/templates.go b/services/mana-notify-go/internal/handler/templates.go new file mode 100644 index 000000000..86ce39b74 --- /dev/null +++ b/services/mana-notify-go/internal/handler/templates.go @@ -0,0 +1,213 @@ +package handler + +import ( + "encoding/json" + "net/http" + + "github.com/manacore/mana-notify/internal/db" + tmpl "github.com/manacore/mana-notify/internal/template" +) + +type TemplatesHandler struct { + db *db.DB + engine *tmpl.Engine +} + +func NewTemplatesHandler(database *db.DB, engine *tmpl.Engine) *TemplatesHandler { + return &TemplatesHandler{db: database, engine: engine} +} + +// List handles GET /api/v1/templates +func (h *TemplatesHandler) List(w http.ResponseWriter, r *http.Request) { + rows, err := h.db.Pool.Query(r.Context(), + `SELECT id, slug, app_id, channel, subject, body_template, locale, is_active, is_system, variables, created_at, updated_at + FROM notify.templates ORDER BY slug`) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list templates") + return + } + defer rows.Close() + + var templates []db.Template + for rows.Next() { + var t db.Template + if err := rows.Scan(&t.ID, &t.Slug, &t.AppID, &t.Channel, &t.Subject, &t.BodyTemplate, + &t.Locale, &t.IsActive, &t.IsSystem, &t.Variables, &t.CreatedAt, &t.UpdatedAt); err != nil { + continue + } + templates = append(templates, t) + } + + writeJSON(w, http.StatusOK, map[string]any{"templates": templates}) +} + +// Get handles GET /api/v1/templates/{slug} +func (h *TemplatesHandler) Get(w http.ResponseWriter, r *http.Request) { + slug := r.PathValue("slug") + locale := r.URL.Query().Get("locale") + if locale == "" { + locale = "de-DE" + } + + var t db.Template + err := h.db.Pool.QueryRow(r.Context(), + `SELECT id, slug, app_id, channel, subject, body_template, locale, is_active, is_system, variables, created_at, updated_at + FROM notify.templates WHERE slug = $1 AND locale = $2`, slug, locale, + ).Scan(&t.ID, &t.Slug, &t.AppID, &t.Channel, &t.Subject, &t.BodyTemplate, + &t.Locale, &t.IsActive, &t.IsSystem, &t.Variables, &t.CreatedAt, &t.UpdatedAt) + if err != nil { + writeError(w, http.StatusNotFound, "template not found") + return + } + + writeJSON(w, http.StatusOK, map[string]any{"template": t}) +} + +// Create handles POST /api/v1/templates +func (h *TemplatesHandler) Create(w http.ResponseWriter, r *http.Request) { + var req struct { + Slug string `json:"slug"` + AppID string `json:"appId,omitempty"` + Channel string `json:"channel"` + Subject string `json:"subject,omitempty"` + BodyTemplate string `json:"bodyTemplate"` + Locale string `json:"locale,omitempty"` + Variables any `json:"variables,omitempty"` + } + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + if req.Slug == "" || req.Channel == "" || req.BodyTemplate == "" { + writeError(w, http.StatusBadRequest, "slug, channel, and bodyTemplate are required") + return + } + if req.Locale == "" { + req.Locale = "de-DE" + } + + varsJSON, _ := json.Marshal(req.Variables) + + var id string + err := h.db.Pool.QueryRow(r.Context(), + `INSERT INTO notify.templates (slug, app_id, channel, subject, body_template, locale, variables) + VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id`, + req.Slug, nilIfEmpty(req.AppID), req.Channel, nilIfEmpty(req.Subject), req.BodyTemplate, req.Locale, varsJSON, + ).Scan(&id) + if err != nil { + writeError(w, http.StatusConflict, "template already exists for this slug+locale") + return + } + + writeJSON(w, http.StatusCreated, map[string]any{"id": id}) +} + +// Update handles PUT /api/v1/templates/{slug} +func (h *TemplatesHandler) Update(w http.ResponseWriter, r *http.Request) { + slug := r.PathValue("slug") + locale := r.URL.Query().Get("locale") + if locale == "" { + locale = "de-DE" + } + + var req struct { + Subject string `json:"subject,omitempty"` + BodyTemplate string `json:"bodyTemplate,omitempty"` + IsActive *bool `json:"isActive,omitempty"` + Variables any `json:"variables,omitempty"` + } + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + result, err := h.db.Pool.Exec(r.Context(), + `UPDATE notify.templates SET + subject = COALESCE($1, subject), + body_template = COALESCE($2, body_template), + is_active = COALESCE($3, is_active), + updated_at = NOW() + WHERE slug = $4 AND locale = $5 AND is_system = false`, + nilIfEmpty(req.Subject), nilIfEmpty(req.BodyTemplate), req.IsActive, slug, locale, + ) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to update template") + return + } + if result.RowsAffected() == 0 { + writeError(w, http.StatusNotFound, "template not found or is a system template") + return + } + + writeJSON(w, http.StatusOK, map[string]any{"updated": true}) +} + +// Delete handles DELETE /api/v1/templates/{slug} +func (h *TemplatesHandler) Delete(w http.ResponseWriter, r *http.Request) { + slug := r.PathValue("slug") + + result, err := h.db.Pool.Exec(r.Context(), + `DELETE FROM notify.templates WHERE slug = $1 AND is_system = false`, slug) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to delete template") + return + } + if result.RowsAffected() == 0 { + writeError(w, http.StatusNotFound, "template not found or is a system template") + return + } + + writeJSON(w, http.StatusOK, map[string]any{"deleted": true}) +} + +// Preview handles POST /api/v1/templates/{slug}/preview +func (h *TemplatesHandler) Preview(w http.ResponseWriter, r *http.Request) { + slug := r.PathValue("slug") + var req struct { + Data map[string]any `json:"data"` + } + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + rendered, err := h.engine.RenderBySlug(r.Context(), slug, req.Data, "") + if err != nil { + writeError(w, http.StatusNotFound, "template not found") + return + } + + writeJSON(w, http.StatusOK, map[string]any{"subject": rendered.Subject, "body": rendered.Body}) +} + +// PreviewCustom handles POST /api/v1/templates/preview +func (h *TemplatesHandler) PreviewCustom(w http.ResponseWriter, r *http.Request) { + var req struct { + Subject string `json:"subject,omitempty"` + BodyTemplate string `json:"bodyTemplate"` + Data map[string]any `json:"data"` + } + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + subject := "" + if req.Subject != "" { + s, err := tmpl.RenderDirect(req.Subject, req.Data) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid subject template: "+err.Error()) + return + } + subject = s + } + + body, err := tmpl.RenderDirect(req.BodyTemplate, req.Data) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid body template: "+err.Error()) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"subject": subject, "body": body}) +} diff --git a/services/mana-notify-go/internal/metrics/metrics.go b/services/mana-notify-go/internal/metrics/metrics.go new file mode 100644 index 000000000..e2b11f460 --- /dev/null +++ b/services/mana-notify-go/internal/metrics/metrics.go @@ -0,0 +1,70 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type Metrics struct { + NotificationsSent *prometheus.CounterVec + NotificationsFailed *prometheus.CounterVec + EmailsSent *prometheus.CounterVec + PushSent *prometheus.CounterVec + MatrixSent *prometheus.CounterVec + WebhooksSent *prometheus.CounterVec + NotificationLatency *prometheus.HistogramVec + EmailLatency prometheus.Histogram + PushLatency prometheus.Histogram +} + +func New() *Metrics { + return &Metrics{ + NotificationsSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mana_notify_notifications_sent_total", + Help: "Total notifications sent", + }, []string{"channel", "app_id"}), + + NotificationsFailed: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mana_notify_notifications_failed_total", + Help: "Total notifications failed", + }, []string{"channel", "app_id", "error_type"}), + + EmailsSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mana_notify_emails_sent_total", + Help: "Total emails sent", + }, []string{"template", "status"}), + + PushSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mana_notify_push_sent_total", + Help: "Total push notifications sent", + }, []string{"platform", "status"}), + + MatrixSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mana_notify_matrix_sent_total", + Help: "Total Matrix messages sent", + }, []string{"status"}), + + WebhooksSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mana_notify_webhooks_sent_total", + Help: "Total webhooks sent", + }, []string{"status"}), + + NotificationLatency: promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "mana_notify_notification_latency_seconds", + Help: "Notification processing latency", + Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10}, + }, []string{"channel"}), + + EmailLatency: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "mana_notify_email_latency_seconds", + Help: "Email sending latency", + Buckets: []float64{0.1, 0.5, 1, 2, 5, 10}, + }), + + PushLatency: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "mana_notify_push_latency_seconds", + Help: "Push notification latency", + Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1}, + }), + } +} diff --git a/services/mana-notify-go/internal/queue/worker.go b/services/mana-notify-go/internal/queue/worker.go new file mode 100644 index 000000000..e64fc73df --- /dev/null +++ b/services/mana-notify-go/internal/queue/worker.go @@ -0,0 +1,318 @@ +package queue + +import ( + "context" + "fmt" + "log/slog" + "math" + "time" + + "github.com/manacore/mana-notify/internal/channel" + "github.com/manacore/mana-notify/internal/db" + "github.com/manacore/mana-notify/internal/metrics" +) + +// Job represents a notification delivery job. +type Job struct { + NotificationID string + Channel string + AppID string + Recipient string + Subject string + Body string + Data map[string]any + + // Channel-specific + From string + ReplyTo string + Tokens []string // Push: multiple tokens + Sound string // Push + Badge *int // Push + Platform string // Push + RoomID string // Matrix + FormattedBody string // Matrix + MsgType string // Matrix + WebhookMethod string // Webhook + WebhookHeaders map[string]string // Webhook + WebhookTimeout int // Webhook + + // Retry + Attempt int + MaxRetries int + BackoffMs int + ScheduleAt *time.Time +} + +// WorkerConfig per channel. +type WorkerConfig struct { + Concurrency int + MaxRetries int + BackoffMs int +} + +var DefaultConfigs = map[string]WorkerConfig{ + "email": {Concurrency: 5, MaxRetries: 3, BackoffMs: 5000}, + "push": {Concurrency: 10, MaxRetries: 3, BackoffMs: 1000}, + "matrix": {Concurrency: 5, MaxRetries: 3, BackoffMs: 2000}, + "webhook": {Concurrency: 10, MaxRetries: 5, BackoffMs: 3000}, +} + +// WorkerPool manages goroutine-based workers for all channels. +type WorkerPool struct { + db *db.DB + email *channel.EmailService + push *channel.PushService + matrix *channel.MatrixService + webhook *channel.WebhookService + metrics *metrics.Metrics + jobs chan Job + ctx context.Context + cancel context.CancelFunc +} + +func NewWorkerPool(database *db.DB, email *channel.EmailService, push *channel.PushService, matrix *channel.MatrixService, webhook *channel.WebhookService, m *metrics.Metrics) *WorkerPool { + ctx, cancel := context.WithCancel(context.Background()) + return &WorkerPool{ + db: database, + email: email, + push: push, + matrix: matrix, + webhook: webhook, + metrics: m, + jobs: make(chan Job, 1000), + ctx: ctx, + cancel: cancel, + } +} + +func (wp *WorkerPool) Start() { + // Start workers per channel type + for ch, cfg := range DefaultConfigs { + for i := 0; i < cfg.Concurrency; i++ { + go wp.worker(ch) + } + slog.Info("workers started", "channel", ch, "concurrency", cfg.Concurrency) + } +} + +func (wp *WorkerPool) Stop() { + wp.cancel() + close(wp.jobs) +} + +// Enqueue adds a job to the worker pool. +func (wp *WorkerPool) Enqueue(job Job) { + cfg := DefaultConfigs[job.Channel] + if job.MaxRetries == 0 { + job.MaxRetries = cfg.MaxRetries + } + if job.BackoffMs == 0 { + job.BackoffMs = cfg.BackoffMs + } + + // Handle scheduled delivery + if job.ScheduleAt != nil && job.ScheduleAt.After(time.Now()) { + delay := time.Until(*job.ScheduleAt) + go func() { + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-timer.C: + wp.jobs <- job + case <-wp.ctx.Done(): + } + }() + return + } + + wp.jobs <- job +} + +func (wp *WorkerPool) worker(ch string) { + for { + select { + case job, ok := <-wp.jobs: + if !ok { + return + } + if job.Channel != ch { + // Put back for correct channel worker + wp.jobs <- job + continue + } + wp.processJob(job) + case <-wp.ctx.Done(): + return + } + } +} + +func (wp *WorkerPool) processJob(job Job) { + start := time.Now() + ctx := wp.ctx + + // Update status to processing + wp.updateStatus(ctx, job.NotificationID, "processing", job.Attempt+1, nil) + + var success bool + var providerID string + var statusCode *int + var errMsg string + + defer func() { + if r := recover(); r != nil { + slog.Error("job panic", "notification", job.NotificationID, "panic", r) + errMsg = "internal panic" + success = false + } + + duration := time.Since(start) + durationMs := int(duration.Milliseconds()) + + // Log delivery attempt + wp.logDelivery(ctx, job.NotificationID, job.Attempt+1, job.Channel, success, statusCode, errMsg, providerID, durationMs) + + if success { + wp.updateStatus(ctx, job.NotificationID, "delivered", job.Attempt+1, nil) + wp.metrics.NotificationsSent.WithLabelValues(job.Channel, job.AppID).Inc() + } else if job.Attempt+1 < job.MaxRetries { + // Retry with exponential backoff + job.Attempt++ + delay := time.Duration(float64(job.BackoffMs)*math.Pow(2, float64(job.Attempt-1))) * time.Millisecond + go func() { + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-timer.C: + wp.jobs <- job + case <-wp.ctx.Done(): + } + }() + slog.Info("retrying job", "notification", job.NotificationID, "attempt", job.Attempt, "delay", delay) + } else { + errStr := errMsg + wp.updateStatus(ctx, job.NotificationID, "failed", job.Attempt+1, &errStr) + wp.metrics.NotificationsFailed.WithLabelValues(job.Channel, job.AppID, "max_retries").Inc() + } + + wp.metrics.NotificationLatency.WithLabelValues(job.Channel).Observe(duration.Seconds()) + }() + + switch job.Channel { + case "email": + result := wp.email.Send(&channel.EmailMessage{ + To: job.Recipient, + Subject: job.Subject, + HTML: job.Body, + From: job.From, + ReplyTo: job.ReplyTo, + }) + success = result.Success + providerID = result.MessageID + errMsg = result.Error + template := "custom" + status := "success" + if !success { + status = "failed" + } + wp.metrics.EmailsSent.WithLabelValues(template, status).Inc() + wp.metrics.EmailLatency.Observe(time.Since(start).Seconds()) + + case "push": + tokens := job.Tokens + if len(tokens) == 0 && job.Recipient != "" { + tokens = []string{job.Recipient} + } + results := wp.push.SendToTokens(ctx, tokens, job.Subject, job.Body, job.Data, job.Sound, job.Badge) + // Check if all succeeded + success = true + for _, r := range results { + if !r.Success { + success = false + errMsg = r.Error + } else { + providerID = r.TicketID + } + } + status := "success" + if !success { + status = "failed" + } + wp.metrics.PushSent.WithLabelValues(job.Platform, status).Inc() + wp.metrics.PushLatency.Observe(time.Since(start).Seconds()) + + case "matrix": + result := wp.matrix.Send(ctx, &channel.MatrixMessage{ + RoomID: job.RoomID, + Body: job.Body, + FormattedBody: job.FormattedBody, + MsgType: job.MsgType, + }) + success = result.Success + providerID = result.EventID + errMsg = result.Error + status := "success" + if !success { + status = "failed" + } + wp.metrics.MatrixSent.WithLabelValues(status).Inc() + + case "webhook": + result := wp.webhook.Send(ctx, &channel.WebhookMessage{ + URL: job.Recipient, + Method: job.WebhookMethod, + Headers: job.WebhookHeaders, + Body: job.Data, + Timeout: job.WebhookTimeout, + }) + success = result.Success + sc := result.StatusCode + statusCode = &sc + errMsg = result.Error + status := "success" + if !success { + status = "failed" + } + wp.metrics.WebhooksSent.WithLabelValues(status).Inc() + } +} + +func (wp *WorkerPool) updateStatus(ctx context.Context, notificationID, status string, attempts int, errMsg *string) { + query := `UPDATE notify.notifications SET status = $1, attempts = $2, updated_at = NOW()` + args := []any{status, attempts} + + if status == "delivered" { + query += `, delivered_at = NOW()` + } + if errMsg != nil { + query += fmt.Sprintf(`, error_message = $%d`, len(args)+1) + args = append(args, *errMsg) + } + + query += fmt.Sprintf(` WHERE id = $%d`, len(args)+1) + args = append(args, notificationID) + + if _, err := wp.db.Pool.Exec(ctx, query, args...); err != nil { + slog.Error("update notification status failed", "id", notificationID, "error", err) + } +} + +func (wp *WorkerPool) logDelivery(ctx context.Context, notificationID string, attempt int, ch string, success bool, statusCode *int, errMsg, providerID string, durationMs int) { + var errMsgPtr, providerIDPtr *string + if errMsg != "" { + errMsgPtr = &errMsg + } + if providerID != "" { + providerIDPtr = &providerID + } + + _, err := wp.db.Pool.Exec(ctx, + `INSERT INTO notify.delivery_logs (notification_id, attempt_number, channel, success, status_code, error_message, provider_id, duration_ms) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, + notificationID, attempt, ch, success, statusCode, errMsgPtr, providerIDPtr, durationMs, + ) + if err != nil { + slog.Error("log delivery failed", "notification", notificationID, "error", err) + } +} + diff --git a/services/mana-notify-go/internal/template/engine.go b/services/mana-notify-go/internal/template/engine.go new file mode 100644 index 000000000..2dd3e15a0 --- /dev/null +++ b/services/mana-notify-go/internal/template/engine.go @@ -0,0 +1,138 @@ +package template + +import ( + "bytes" + "context" + "log/slog" + "text/template" + + "github.com/manacore/mana-notify/internal/db" +) + +type Engine struct { + db *db.DB +} + +func NewEngine(database *db.DB) *Engine { + return &Engine{db: database} +} + +type Rendered struct { + Subject string + Body string +} + +// RenderBySlug looks up a template by slug and renders it with the given data. +func (e *Engine) RenderBySlug(ctx context.Context, slug string, data map[string]any, locale string) (*Rendered, error) { + if locale == "" { + locale = "de-DE" + } + + var subject, bodyTemplate *string + err := e.db.Pool.QueryRow(ctx, + `SELECT subject, body_template FROM notify.templates WHERE slug = $1 AND locale = $2 AND is_active = true`, + slug, locale, + ).Scan(&subject, &bodyTemplate) + + if err != nil { + // Try default locale fallback + err = e.db.Pool.QueryRow(ctx, + `SELECT subject, body_template FROM notify.templates WHERE slug = $1 AND is_active = true ORDER BY locale LIMIT 1`, + slug, + ).Scan(&subject, &bodyTemplate) + if err != nil { + return nil, err + } + } + + rendered := &Rendered{} + + if subject != nil { + s, err := renderTemplate(*subject, data) + if err != nil { + return nil, err + } + rendered.Subject = s + } + + if bodyTemplate != nil { + b, err := renderTemplate(*bodyTemplate, data) + if err != nil { + return nil, err + } + rendered.Body = b + } + + return rendered, nil +} + +// RenderDirect renders a template string with data. +func RenderDirect(tmplStr string, data map[string]any) (string, error) { + return renderTemplate(tmplStr, data) +} + +func renderTemplate(tmplStr string, data map[string]any) (string, error) { + tmpl, err := template.New("").Parse(tmplStr) + if err != nil { + return "", err + } + var buf bytes.Buffer + if err := tmpl.Execute(&buf, data); err != nil { + return "", err + } + return buf.String(), nil +} + +// SeedDefaults inserts default system templates if they don't exist. +func (e *Engine) SeedDefaults(ctx context.Context) { + defaults := []struct { + slug string + channel string + subject string + body string + vars string + }{ + { + slug: "auth-password-reset", + channel: "email", + subject: "Passwort zurücksetzen - ManaCore", + body: `

Passwort zurücksetzen

Hallo {{.userName}},

Klicke auf den folgenden Link, um dein Passwort zurückzusetzen:

Passwort zurücksetzen

Falls du diese Anfrage nicht gestellt hast, kannst du diese E-Mail ignorieren.

`, + vars: `{"resetUrl": "URL zum Zurücksetzen", "userName": "Name des Benutzers"}`, + }, + { + slug: "auth-verification", + channel: "email", + subject: "E-Mail bestätigen - ManaCore", + body: `

E-Mail bestätigen

Hallo {{.userName}},

Bitte bestätige deine E-Mail-Adresse:

E-Mail bestätigen

`, + vars: `{"verificationUrl": "Bestätigungs-URL", "userName": "Name des Benutzers"}`, + }, + { + slug: "auth-welcome", + channel: "email", + subject: "Willkommen bei ManaCore!", + body: `

Willkommen!

Hallo {{.userName}},

Willkommen bei ManaCore! Du kannst dich jetzt anmelden:

Anmelden

`, + vars: `{"userName": "Name des Benutzers", "loginUrl": "Login-URL"}`, + }, + { + slug: "calendar-reminder", + channel: "email", + subject: "Erinnerung: {{.eventTitle}}", + body: `

{{.eventTitle}}

Wann: {{.eventTime}}

{{if .eventLocation}}

Wo: {{.eventLocation}}

{{end}}

Termin anzeigen

`, + vars: `{"eventTitle": "Titel", "eventTime": "Zeit", "eventLocation": "Ort (optional)", "eventUrl": "Link"}`, + }, + } + + for _, d := range defaults { + _, err := e.db.Pool.Exec(ctx, + `INSERT INTO notify.templates (slug, channel, subject, body_template, locale, is_system, variables) + VALUES ($1, $2, $3, $4, 'de-DE', true, $5) + ON CONFLICT (slug, locale) DO NOTHING`, + d.slug, d.channel, d.subject, d.body, d.vars, + ) + if err != nil { + slog.Warn("seed template failed", "slug", d.slug, "error", err) + } + } + + slog.Info("default templates seeded") +} diff --git a/services/mana-notify-go/package.json b/services/mana-notify-go/package.json new file mode 100644 index 000000000..125f18b1f --- /dev/null +++ b/services/mana-notify-go/package.json @@ -0,0 +1,10 @@ +{ + "name": "@manacore/mana-notify-go", + "version": "1.0.0", + "private": true, + "scripts": { + "dev": "go run ./cmd/server", + "build": "go build -o bin/mana-notify ./cmd/server", + "test": "go test ./..." + } +}