feat(mana-notify): rewrite notification service from NestJS to Go

Replaces the NestJS mana-notify service with a Go implementation.
Features: 4 notification channels (email/SMTP, Expo push, Matrix,
webhook), goroutine worker pool with retry/backoff (replaces BullMQ),
Go template engine (replaces Handlebars), PostgreSQL with auto-migrations
(5 tables), user preferences with quiet hours, idempotency via
externalId, batch sending, scheduled delivery, JWT + service key auth.

22 API endpoints, 1:1 compatible. Binary: 21 MB.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-03-27 22:28:19 +01:00
parent d0ef6676b5
commit 585cdc1753
26 changed files with 2853 additions and 1 deletions

View file

@ -138,7 +138,8 @@ manacore-monorepo/
│ ├── mana-landing-builder/# Org landing page builder (Astro → Cloudflare Pages)
│ ├── mana-media/ # Central media platform (CAS, thumbnails)
│ ├── mana-api-gateway/ # API gateway with rate limiting
│ ├── mana-notify/ # Notification service (push, email, in-app)
│ ├── mana-notify/ # Notification service (NestJS, legacy)
│ ├── mana-notify-go/ # Notification service (Go, active)
│ ├── mana-image-gen/ # Local AI image generation (FLUX)
│ ├── mana-stt/ # Speech-to-text service
│ ├── mana-tts/ # Text-to-speech service

1
services/mana-notify-go/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
bin/

View file

@ -0,0 +1,76 @@
# mana-notify (Go)
Go replacement for the NestJS mana-notify service. Unified notification microservice for email, push, Matrix, and webhook notifications.
## Architecture
- **Language:** Go 1.25
- **Database:** PostgreSQL (pgx v5, 5 tables in `notify` schema)
- **Queue:** Go channels + goroutine worker pool (replaces BullMQ)
- **Metrics:** Prometheus
- **Port:** 3040
## Endpoints
### Notifications (X-Service-Key auth)
- `POST /api/v1/notifications/send` — Send immediately
- `POST /api/v1/notifications/schedule` — Schedule for future
- `POST /api/v1/notifications/batch` — Batch send (max 100)
- `GET /api/v1/notifications/{id}` — Get status
- `DELETE /api/v1/notifications/{id}` — Cancel pending
### Templates (X-Service-Key auth)
- `GET /api/v1/templates` — List all
- `GET /api/v1/templates/{slug}` — Get by slug
- `POST /api/v1/templates` — Create
- `PUT /api/v1/templates/{slug}` — Update
- `DELETE /api/v1/templates/{slug}` — Delete
- `POST /api/v1/templates/{slug}/preview` — Preview
- `POST /api/v1/templates/preview` — Preview custom
### Devices (JWT auth)
- `POST /api/v1/devices/register` — Register push device
- `GET /api/v1/devices` — List devices
- `DELETE /api/v1/devices/{id}` — Unregister
### Preferences (JWT auth)
- `GET /api/v1/preferences` — Get preferences
- `PUT /api/v1/preferences` — Update preferences
### System
- `GET /health` — Health check
- `GET /metrics` — Prometheus metrics
## Notification Channels
| Channel | Service | Worker Concurrency | Max Retries |
|---------|---------|-------------------|-------------|
| Email | Brevo SMTP | 5 | 3 |
| Push | Expo Push API | 10 | 3 |
| Matrix | Matrix Homeserver API | 5 | 3 |
| Webhook | HTTP callback | 10 | 5 |
## Commands
```bash
go run ./cmd/server # Dev
go build -o bin/mana-notify ./cmd/server # Build
go test ./... # Test
```
## Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `PORT` | 3040 | Server port |
| `DATABASE_URL` | postgresql://...localhost:5432/mana_notify | PostgreSQL |
| `SERVICE_KEY` | dev-service-key | Service-to-service auth |
| `MANA_CORE_AUTH_URL` | http://localhost:3001 | JWT validation |
| `SMTP_HOST` | smtp-relay.brevo.com | SMTP host |
| `SMTP_PORT` | 587 | SMTP port |
| `SMTP_USER` | | SMTP username |
| `SMTP_PASSWORD` | | SMTP password |
| `SMTP_FROM` | ManaCore <noreply@mana.how> | Default from |
| `EXPO_ACCESS_TOKEN` | | Expo push token |
| `MATRIX_HOMESERVER_URL` | | Matrix homeserver |
| `MATRIX_ACCESS_TOKEN` | | Matrix bot token |

View file

@ -0,0 +1,23 @@
FROM golang:1.25-alpine AS builder
WORKDIR /app
COPY services/mana-notify-go/go.mod services/mana-notify-go/go.sum ./
RUN go mod download
COPY services/mana-notify-go/ .
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /mana-notify ./cmd/server
FROM alpine:3.21
RUN apk --no-cache add ca-certificates tzdata && \
addgroup -g 1000 mana && adduser -u 1000 -G mana -s /sbin/nologin -D mana
COPY --from=builder /mana-notify /usr/local/bin/mana-notify
USER mana
EXPOSE 3040
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
CMD wget -q --spider http://localhost:3040/health || exit 1
ENTRYPOINT ["mana-notify"]

View file

@ -0,0 +1,137 @@
package main
import (
"context"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
"github.com/manacore/mana-notify/internal/auth"
"github.com/manacore/mana-notify/internal/channel"
"github.com/manacore/mana-notify/internal/config"
"github.com/manacore/mana-notify/internal/db"
"github.com/manacore/mana-notify/internal/handler"
"github.com/manacore/mana-notify/internal/metrics"
"github.com/manacore/mana-notify/internal/queue"
tmpl "github.com/manacore/mana-notify/internal/template"
)
func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
})))
cfg := config.Load()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
database, err := db.New(ctx, cfg.DatabaseURL)
cancel()
if err != nil {
slog.Error("database init failed", "error", err)
os.Exit(1)
}
defer database.Close()
// Initialize services
m := metrics.New()
emailSvc := channel.NewEmailService(cfg)
pushSvc := channel.NewPushService(cfg)
matrixSvc := channel.NewMatrixService(cfg)
webhookSvc := channel.NewWebhookService()
engine := tmpl.NewEngine(database)
// Seed default templates
engine.SeedDefaults(context.Background())
// Start worker pool
workerPool := queue.NewWorkerPool(database, emailSvc, pushSvc, matrixSvc, webhookSvc, m)
workerPool.Start()
defer workerPool.Stop()
// Handlers
notifHandler := handler.NewNotificationsHandler(database, workerPool, engine)
tmplHandler := handler.NewTemplatesHandler(database, engine)
devicesHandler := handler.NewDevicesHandler(database)
prefsHandler := handler.NewPreferencesHandler(database)
healthHandler := handler.NewHealthHandler(database)
// Middleware
serviceAuth := auth.ValidateServiceKey(cfg.ServiceKey)
jwtAuth := auth.ValidateJWT(cfg.ManaCoreAuthURL)
mux := http.NewServeMux()
// System endpoints (no auth)
mux.HandleFunc("GET /health", healthHandler.Health)
mux.Handle("GET /metrics", promhttp.Handler())
// Notification endpoints (service key auth)
mux.Handle("POST /api/v1/notifications/send", serviceAuth(http.HandlerFunc(notifHandler.Send)))
mux.Handle("POST /api/v1/notifications/schedule", serviceAuth(http.HandlerFunc(notifHandler.Schedule)))
mux.Handle("POST /api/v1/notifications/batch", serviceAuth(http.HandlerFunc(notifHandler.Batch)))
mux.Handle("GET /api/v1/notifications/{id}", serviceAuth(http.HandlerFunc(notifHandler.GetNotification)))
mux.Handle("DELETE /api/v1/notifications/{id}", serviceAuth(http.HandlerFunc(notifHandler.CancelNotification)))
// Template endpoints (service key auth)
mux.Handle("GET /api/v1/templates", serviceAuth(http.HandlerFunc(tmplHandler.List)))
mux.Handle("POST /api/v1/templates", serviceAuth(http.HandlerFunc(tmplHandler.Create)))
mux.Handle("POST /api/v1/templates/preview", serviceAuth(http.HandlerFunc(tmplHandler.PreviewCustom)))
mux.Handle("GET /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Get)))
mux.Handle("PUT /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Update)))
mux.Handle("DELETE /api/v1/templates/{slug}", serviceAuth(http.HandlerFunc(tmplHandler.Delete)))
mux.Handle("POST /api/v1/templates/{slug}/preview", serviceAuth(http.HandlerFunc(tmplHandler.Preview)))
// Device endpoints (JWT auth)
mux.Handle("POST /api/v1/devices/register", jwtAuth(http.HandlerFunc(devicesHandler.Register)))
mux.Handle("GET /api/v1/devices", jwtAuth(http.HandlerFunc(devicesHandler.List)))
mux.Handle("DELETE /api/v1/devices/{id}", jwtAuth(http.HandlerFunc(devicesHandler.Delete)))
// Preference endpoints (JWT auth)
mux.Handle("GET /api/v1/preferences", jwtAuth(http.HandlerFunc(prefsHandler.Get)))
mux.Handle("PUT /api/v1/preferences", jwtAuth(http.HandlerFunc(prefsHandler.Update)))
corsHandler := cors.New(cors.Options{
AllowedOrigins: cfg.CORSOrigins,
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Content-Type", "Authorization", "X-Service-Key"},
AllowCredentials: true,
}).Handler(mux)
server := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: corsHandler,
ReadTimeout: 30 * time.Second,
WriteTimeout: 60 * time.Second,
IdleTimeout: 120 * time.Second,
MaxHeaderBytes: 1 << 20,
}
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
slog.Info("mana-notify started", "port", cfg.Port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("server error", "error", err)
os.Exit(1)
}
}()
<-sigCh
slog.Info("shutting down...")
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
slog.Error("shutdown error", "error", err)
}
slog.Info("server stopped")
}

View file

@ -0,0 +1,26 @@
module github.com/manacore/mana-notify
go 1.25.0
require (
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/jackc/pgx/v5 v5.9.1
github.com/prometheus/client_golang v1.22.0
github.com/rs/cors v1.11.1
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.29.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
)

View file

@ -0,0 +1,54 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View file

@ -0,0 +1,126 @@
package auth
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"github.com/golang-jwt/jwt/v5"
)
type User struct {
UserID string `json:"userId"`
Email string `json:"email"`
Role string `json:"role"`
SessionID string `json:"sessionId"`
}
type contextKey string
const UserContextKey contextKey = "user"
// ValidateServiceKey checks the X-Service-Key header.
func ValidateServiceKey(serviceKey string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := r.Header.Get("X-Service-Key")
if key == "" || key != serviceKey {
http.Error(w, `{"error":"unauthorized: invalid service key"}`, http.StatusUnauthorized)
return
}
next.ServeHTTP(w, r)
})
}
}
// ValidateJWT validates Bearer tokens against mana-core-auth JWKS.
func ValidateJWT(authURL string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
header := r.Header.Get("Authorization")
if !strings.HasPrefix(header, "Bearer ") {
http.Error(w, `{"error":"unauthorized: missing token"}`, http.StatusUnauthorized)
return
}
tokenStr := strings.TrimPrefix(header, "Bearer ")
// Validate against auth service
user, err := validateToken(r.Context(), authURL, tokenStr)
if err != nil {
slog.Warn("jwt validation failed", "error", err)
http.Error(w, `{"error":"unauthorized: invalid token"}`, http.StatusUnauthorized)
return
}
ctx := context.WithValue(r.Context(), UserContextKey, user)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
// GetUser extracts the authenticated user from context.
func GetUser(r *http.Request) *User {
u, ok := r.Context().Value(UserContextKey).(*User)
if !ok {
return nil
}
return u
}
func validateToken(ctx context.Context, authURL, tokenStr string) (*User, error) {
// Parse without verification first to get claims
parser := jwt.NewParser(jwt.WithoutClaimsValidation())
token, _, err := parser.ParseUnverified(tokenStr, jwt.MapClaims{})
if err != nil {
return nil, fmt.Errorf("parse token: %w", err)
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
return nil, fmt.Errorf("invalid claims")
}
// Validate via auth service
client := &http.Client{Timeout: 5 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, authURL+"/api/v1/auth/validate", strings.NewReader(`{"token":"`+tokenStr+`"}`))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("auth service unavailable: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("token validation failed: %d", resp.StatusCode)
}
var result struct {
Valid bool `json:"valid"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
if !result.Valid {
return nil, fmt.Errorf("token invalid")
}
sub, _ := claims["sub"].(string)
email, _ := claims["email"].(string)
role, _ := claims["role"].(string)
sid, _ := claims["sid"].(string)
return &User{
UserID: sub,
Email: email,
Role: role,
SessionID: sid,
}, nil
}

View file

@ -0,0 +1,144 @@
package channel
import (
"crypto/tls"
"fmt"
"log/slog"
"net/smtp"
"strings"
"time"
"github.com/manacore/mana-notify/internal/config"
)
type EmailService struct {
host string
port int
user string
password string
from string
}
func NewEmailService(cfg *config.Config) *EmailService {
return &EmailService{
host: cfg.SMTPHost,
port: cfg.SMTPPort,
user: cfg.SMTPUser,
password: cfg.SMTPPassword,
from: cfg.SMTPFrom,
}
}
type EmailMessage struct {
To string
Subject string
HTML string
Text string
From string
ReplyTo string
}
type EmailResult struct {
Success bool
MessageID string
Error string
}
func (s *EmailService) IsConfigured() bool {
return s.user != "" && s.password != ""
}
func (s *EmailService) Send(msg *EmailMessage) EmailResult {
start := time.Now()
if !s.IsConfigured() {
slog.Warn("smtp not configured, skipping email", "to", msg.To)
return EmailResult{Success: false, Error: "SMTP not configured"}
}
from := s.from
if msg.From != "" {
from = msg.From
}
// Build email headers and body
var builder strings.Builder
builder.WriteString(fmt.Sprintf("From: %s\r\n", from))
builder.WriteString(fmt.Sprintf("To: %s\r\n", msg.To))
builder.WriteString(fmt.Sprintf("Subject: %s\r\n", msg.Subject))
if msg.ReplyTo != "" {
builder.WriteString(fmt.Sprintf("Reply-To: %s\r\n", msg.ReplyTo))
}
builder.WriteString("MIME-Version: 1.0\r\n")
builder.WriteString("Content-Type: text/html; charset=\"UTF-8\"\r\n")
builder.WriteString("\r\n")
if msg.HTML != "" {
builder.WriteString(msg.HTML)
} else {
builder.WriteString(msg.Text)
}
// Extract email from "Name <email@example.com>" format
fromAddr := extractEmail(from)
addr := fmt.Sprintf("%s:%d", s.host, s.port)
auth := smtp.PlainAuth("", s.user, s.password, s.host)
tlsConfig := &tls.Config{ServerName: s.host}
conn, err := tls.Dial("tcp", addr, tlsConfig)
if err != nil {
// Try STARTTLS fallback
err = smtp.SendMail(addr, auth, fromAddr, []string{msg.To}, []byte(builder.String()))
if err != nil {
slog.Error("email send failed", "to", msg.To, "error", err, "duration", time.Since(start))
return EmailResult{Success: false, Error: err.Error()}
}
slog.Info("email sent via STARTTLS", "to", msg.To, "duration", time.Since(start))
return EmailResult{Success: true}
}
defer conn.Close()
client, err := smtp.NewClient(conn, s.host)
if err != nil {
return EmailResult{Success: false, Error: err.Error()}
}
defer client.Close()
if err := client.Auth(auth); err != nil {
return EmailResult{Success: false, Error: err.Error()}
}
if err := client.Mail(fromAddr); err != nil {
return EmailResult{Success: false, Error: err.Error()}
}
if err := client.Rcpt(msg.To); err != nil {
return EmailResult{Success: false, Error: err.Error()}
}
w, err := client.Data()
if err != nil {
return EmailResult{Success: false, Error: err.Error()}
}
if _, err := w.Write([]byte(builder.String())); err != nil {
return EmailResult{Success: false, Error: err.Error()}
}
if err := w.Close(); err != nil {
return EmailResult{Success: false, Error: err.Error()}
}
client.Quit()
slog.Info("email sent", "to", msg.To, "duration", time.Since(start))
return EmailResult{Success: true}
}
func extractEmail(from string) string {
if idx := strings.Index(from, "<"); idx != -1 {
end := strings.Index(from, ">")
if end > idx {
return from[idx+1 : end]
}
}
return from
}

View file

@ -0,0 +1,97 @@
package channel
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"math/rand"
"net/http"
"time"
"github.com/manacore/mana-notify/internal/config"
)
type MatrixService struct {
homeserverURL string
accessToken string
client *http.Client
}
func NewMatrixService(cfg *config.Config) *MatrixService {
return &MatrixService{
homeserverURL: cfg.MatrixHomeserverURL,
accessToken: cfg.MatrixAccessToken,
client: &http.Client{Timeout: 10 * time.Second},
}
}
type MatrixMessage struct {
RoomID string
Body string
FormattedBody string
MsgType string // "m.text" or "m.notice"
}
type MatrixResult struct {
Success bool
EventID string
Error string
}
func (s *MatrixService) IsConfigured() bool {
return s.homeserverURL != "" && s.accessToken != ""
}
func (s *MatrixService) Send(ctx context.Context, msg *MatrixMessage) MatrixResult {
if !s.IsConfigured() {
return MatrixResult{Success: false, Error: "Matrix not configured"}
}
msgType := msg.MsgType
if msgType == "" {
msgType = "m.text"
}
txnID := fmt.Sprintf("mana_%d_%d", time.Now().UnixMilli(), rand.Intn(100000))
payload := map[string]string{
"msgtype": msgType,
"body": msg.Body,
}
if msg.FormattedBody != "" {
payload["format"] = "org.matrix.custom.html"
payload["formatted_body"] = msg.FormattedBody
}
body, _ := json.Marshal(payload)
url := fmt.Sprintf("%s/_matrix/client/v3/rooms/%s/send/m.room.message/%s",
s.homeserverURL, msg.RoomID, txnID)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
if err != nil {
return MatrixResult{Success: false, Error: err.Error()}
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+s.accessToken)
resp, err := s.client.Do(req)
if err != nil {
slog.Error("matrix send failed", "room", msg.RoomID, "error", err)
return MatrixResult{Success: false, Error: err.Error()}
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return MatrixResult{Success: false, Error: fmt.Sprintf("matrix returned %d", resp.StatusCode)}
}
var result struct {
EventID string `json:"event_id"`
}
json.NewDecoder(resp.Body).Decode(&result)
return MatrixResult{Success: true, EventID: result.EventID}
}

View file

@ -0,0 +1,169 @@
package channel
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
"github.com/manacore/mana-notify/internal/config"
)
const expoPushURL = "https://exp.host/--/api/v2/push/send"
type PushService struct {
accessToken string
client *http.Client
}
func NewPushService(cfg *config.Config) *PushService {
return &PushService{
accessToken: cfg.ExpoAccessToken,
client: &http.Client{Timeout: 30 * time.Second},
}
}
type PushMessage struct {
To string `json:"to"`
Title string `json:"title,omitempty"`
Body string `json:"body"`
Data map[string]any `json:"data,omitempty"`
Sound string `json:"sound,omitempty"`
Badge *int `json:"badge,omitempty"`
ChannelID string `json:"channelId,omitempty"`
}
type PushResult struct {
Success bool
TicketID string
Error string
}
func (s *PushService) IsConfigured() bool {
return s.accessToken != ""
}
// IsExpoPushToken validates Expo token format.
func IsExpoPushToken(token string) bool {
return strings.HasPrefix(token, "ExponentPushToken[") || strings.HasPrefix(token, "ExpoPushToken[")
}
// SendToTokens sends push notifications in batches of 100 (Expo limit).
func (s *PushService) SendToTokens(ctx context.Context, tokens []string, title, body string, data map[string]any, sound string, badge *int) map[string]PushResult {
results := make(map[string]PushResult, len(tokens))
if !s.IsConfigured() {
for _, t := range tokens {
results[t] = PushResult{Success: false, Error: "Expo not configured"}
}
return results
}
// Build messages
messages := make([]PushMessage, 0, len(tokens))
for _, token := range tokens {
messages = append(messages, PushMessage{
To: token,
Title: title,
Body: body,
Data: data,
Sound: sound,
Badge: badge,
})
}
// Chunk into batches of 100
for i := 0; i < len(messages); i += 100 {
end := i + 100
if end > len(messages) {
end = len(messages)
}
chunk := messages[i:end]
batchResults := s.sendBatch(ctx, chunk)
for token, result := range batchResults {
results[token] = result
}
}
return results
}
func (s *PushService) sendBatch(ctx context.Context, messages []PushMessage) map[string]PushResult {
results := make(map[string]PushResult, len(messages))
body, err := json.Marshal(messages)
if err != nil {
for _, m := range messages {
results[m.To] = PushResult{Success: false, Error: "marshal error"}
}
return results
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, expoPushURL, bytes.NewReader(body))
if err != nil {
for _, m := range messages {
results[m.To] = PushResult{Success: false, Error: "request error"}
}
return results
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
if s.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+s.accessToken)
}
resp, err := s.client.Do(req)
if err != nil {
slog.Error("expo push failed", "error", err)
for _, m := range messages {
results[m.To] = PushResult{Success: false, Error: err.Error()}
}
return results
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
var expoResp struct {
Data []struct {
Status string `json:"status"`
ID string `json:"id"`
Message string `json:"message"`
Details struct {
Error string `json:"error"`
} `json:"details"`
} `json:"data"`
}
if err := json.Unmarshal(respBody, &expoResp); err != nil {
for _, m := range messages {
results[m.To] = PushResult{Success: false, Error: fmt.Sprintf("decode error: %s", err)}
}
return results
}
for i, ticket := range expoResp.Data {
if i >= len(messages) {
break
}
token := messages[i].To
if ticket.Status == "ok" {
results[token] = PushResult{Success: true, TicketID: ticket.ID}
} else {
errMsg := ticket.Message
if ticket.Details.Error != "" {
errMsg = ticket.Details.Error
}
results[token] = PushResult{Success: false, Error: errMsg}
}
}
return results
}

View file

@ -0,0 +1,91 @@
package channel
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"time"
)
type WebhookService struct {
client *http.Client
}
func NewWebhookService() *WebhookService {
return &WebhookService{
client: &http.Client{Timeout: 10 * time.Second},
}
}
type WebhookMessage struct {
URL string
Method string // POST or PUT
Headers map[string]string
Body map[string]any
Timeout int // ms
}
type WebhookResult struct {
Success bool
StatusCode int
Error string
DurationMs int
}
func (s *WebhookService) Send(ctx context.Context, msg *WebhookMessage) WebhookResult {
start := time.Now()
method := msg.Method
if method == "" {
method = http.MethodPost
}
timeout := 10 * time.Second
if msg.Timeout > 0 {
timeout = time.Duration(msg.Timeout) * time.Millisecond
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
body, err := json.Marshal(msg.Body)
if err != nil {
return WebhookResult{Success: false, Error: "marshal error", DurationMs: int(time.Since(start).Milliseconds())}
}
req, err := http.NewRequestWithContext(ctx, method, msg.URL, bytes.NewReader(body))
if err != nil {
return WebhookResult{Success: false, Error: err.Error(), DurationMs: int(time.Since(start).Milliseconds())}
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "ManaNotify/1.0")
for k, v := range msg.Headers {
req.Header.Set(k, v)
}
resp, err := s.client.Do(req)
durationMs := int(time.Since(start).Milliseconds())
if err != nil {
slog.Error("webhook send failed", "url", msg.URL, "error", err)
return WebhookResult{Success: false, Error: err.Error(), DurationMs: durationMs}
}
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
success := resp.StatusCode >= 200 && resp.StatusCode < 300
if !success {
return WebhookResult{
Success: false,
StatusCode: resp.StatusCode,
Error: fmt.Sprintf("webhook returned %d", resp.StatusCode),
DurationMs: durationMs,
}
}
return WebhookResult{Success: true, StatusCode: resp.StatusCode, DurationMs: durationMs}
}

View file

@ -0,0 +1,98 @@
package config
import (
"os"
"strconv"
"strings"
)
type Config struct {
Port int
// Database
DatabaseURL string
// Redis
RedisHost string
RedisPort int
RedisPassword string
// Auth
ServiceKey string
ManaCoreAuthURL string
// SMTP (Brevo)
SMTPHost string
SMTPPort int
SMTPUser string
SMTPPassword string
SMTPFrom string
// Expo Push
ExpoAccessToken string
// Matrix
MatrixHomeserverURL string
MatrixAccessToken string
// Rate Limits
RateLimitEmailPerMinute int
RateLimitPushPerMinute int
// CORS
CORSOrigins []string
}
func Load() *Config {
return &Config{
Port: getEnvInt("PORT", 3040),
DatabaseURL: getEnv("DATABASE_URL", "postgresql://manacore:manacore@localhost:5432/mana_notify"),
RedisHost: getEnv("REDIS_HOST", "localhost"),
RedisPort: getEnvInt("REDIS_PORT", 6379),
RedisPassword: getEnv("REDIS_PASSWORD", ""),
ServiceKey: getEnv("SERVICE_KEY", "dev-service-key"),
ManaCoreAuthURL: getEnv("MANA_CORE_AUTH_URL", "http://localhost:3001"),
SMTPHost: getEnv("SMTP_HOST", "smtp-relay.brevo.com"),
SMTPPort: getEnvInt("SMTP_PORT", 587),
SMTPUser: getEnv("SMTP_USER", ""),
SMTPPassword: getEnv("SMTP_PASSWORD", ""),
SMTPFrom: getEnv("SMTP_FROM", "ManaCore <noreply@mana.how>"),
ExpoAccessToken: getEnv("EXPO_ACCESS_TOKEN", ""),
MatrixHomeserverURL: getEnv("MATRIX_HOMESERVER_URL", ""),
MatrixAccessToken: getEnv("MATRIX_ACCESS_TOKEN", ""),
RateLimitEmailPerMinute: getEnvInt("RATE_LIMIT_EMAIL_PER_MINUTE", 10),
RateLimitPushPerMinute: getEnvInt("RATE_LIMIT_PUSH_PER_MINUTE", 100),
CORSOrigins: getEnvSlice("CORS_ORIGINS", []string{"http://localhost:3000", "http://localhost:5173"}),
}
}
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
func getEnvInt(key string, fallback int) int {
if v := os.Getenv(key); v != "" {
if i, err := strconv.Atoi(v); err == nil {
return i
}
}
return fallback
}
func getEnvSlice(key string, fallback []string) []string {
if v := os.Getenv(key); v != "" {
return strings.Split(v, ",")
}
return fallback
}

View file

@ -0,0 +1,62 @@
package db
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type DB struct {
Pool *pgxpool.Pool
}
func New(ctx context.Context, databaseURL string) (*DB, error) {
cfg, err := pgxpool.ParseConfig(databaseURL)
if err != nil {
return nil, fmt.Errorf("parse database url: %w", err)
}
cfg.MaxConns = 20
cfg.MinConns = 2
cfg.MaxConnLifetime = 30 * time.Minute
cfg.MaxConnIdleTime = 5 * time.Minute
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("ping database: %w", err)
}
slog.Info("database connected", "url", redactURL(databaseURL))
db := &DB{Pool: pool}
if err := db.migrate(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("run migrations: %w", err)
}
return db, nil
}
func (d *DB) Close() {
d.Pool.Close()
}
func (d *DB) HealthCheck(ctx context.Context) error {
return d.Pool.Ping(ctx)
}
func redactURL(url string) string {
// Hide password from logs
if idx := len(url); idx > 30 {
return url[:30] + "..."
}
return url
}

View file

@ -0,0 +1,136 @@
package db
import (
"context"
"log/slog"
)
func (d *DB) migrate(ctx context.Context) error {
slog.Info("running database migrations")
migrations := []string{
`CREATE SCHEMA IF NOT EXISTS notify`,
// Enum types (idempotent with DO blocks)
`DO $$ BEGIN
CREATE TYPE notify.channel_type AS ENUM ('email', 'push', 'matrix', 'webhook');
EXCEPTION WHEN duplicate_object THEN NULL;
END $$`,
`DO $$ BEGIN
CREATE TYPE notify.notification_status AS ENUM ('pending', 'processing', 'delivered', 'failed', 'cancelled');
EXCEPTION WHEN duplicate_object THEN NULL;
END $$`,
`DO $$ BEGIN
CREATE TYPE notify.priority_type AS ENUM ('low', 'normal', 'high', 'critical');
EXCEPTION WHEN duplicate_object THEN NULL;
END $$`,
// Notifications table
`CREATE TABLE IF NOT EXISTS notify.notifications (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id TEXT,
app_id VARCHAR(50) NOT NULL,
channel notify.channel_type NOT NULL,
template_id VARCHAR(100),
subject VARCHAR(500),
body TEXT,
data JSONB,
status notify.notification_status NOT NULL DEFAULT 'pending',
priority notify.priority_type NOT NULL DEFAULT 'normal',
scheduled_for TIMESTAMPTZ,
recipient VARCHAR(500),
external_id VARCHAR(255),
attempts INTEGER NOT NULL DEFAULT 0,
delivered_at TIMESTAMPTZ,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)`,
`CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notify.notifications (user_id)`,
`CREATE INDEX IF NOT EXISTS idx_notifications_app_id ON notify.notifications (app_id)`,
`CREATE INDEX IF NOT EXISTS idx_notifications_status ON notify.notifications (status)`,
`CREATE INDEX IF NOT EXISTS idx_notifications_scheduled_for ON notify.notifications (scheduled_for)`,
`CREATE INDEX IF NOT EXISTS idx_notifications_external_id ON notify.notifications (external_id)`,
// Templates table
`CREATE TABLE IF NOT EXISTS notify.templates (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
slug VARCHAR(100) NOT NULL,
app_id VARCHAR(50),
channel notify.channel_type NOT NULL,
subject VARCHAR(500),
body_template TEXT NOT NULL,
locale VARCHAR(10) NOT NULL DEFAULT 'de-DE',
is_active BOOLEAN NOT NULL DEFAULT true,
is_system BOOLEAN NOT NULL DEFAULT false,
variables JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(slug, locale)
)`,
`CREATE INDEX IF NOT EXISTS idx_templates_app_id ON notify.templates (app_id)`,
`CREATE INDEX IF NOT EXISTS idx_templates_channel ON notify.templates (channel)`,
// Devices table
`CREATE TABLE IF NOT EXISTS notify.devices (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id TEXT NOT NULL,
push_token TEXT NOT NULL UNIQUE,
token_type VARCHAR(20) NOT NULL DEFAULT 'expo',
platform VARCHAR(20),
device_name VARCHAR(100),
app_id VARCHAR(50),
is_active BOOLEAN NOT NULL DEFAULT true,
last_seen_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)`,
`CREATE INDEX IF NOT EXISTS idx_devices_user_id ON notify.devices (user_id)`,
// Preferences table
`CREATE TABLE IF NOT EXISTS notify.preferences (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id TEXT NOT NULL UNIQUE,
email_enabled BOOLEAN NOT NULL DEFAULT false,
push_enabled BOOLEAN NOT NULL DEFAULT true,
quiet_hours_enabled BOOLEAN NOT NULL DEFAULT false,
quiet_hours_start VARCHAR(5),
quiet_hours_end VARCHAR(5),
timezone VARCHAR(50) NOT NULL DEFAULT 'Europe/Berlin',
category_preferences JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)`,
// Delivery logs table
`CREATE TABLE IF NOT EXISTS notify.delivery_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
notification_id UUID NOT NULL REFERENCES notify.notifications(id) ON DELETE CASCADE,
attempt_number INTEGER NOT NULL,
channel notify.channel_type NOT NULL,
success BOOLEAN NOT NULL,
status_code INTEGER,
error_message TEXT,
provider_id VARCHAR(255),
duration_ms INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)`,
`CREATE INDEX IF NOT EXISTS idx_delivery_logs_notification_id ON notify.delivery_logs (notification_id)`,
`CREATE INDEX IF NOT EXISTS idx_delivery_logs_success ON notify.delivery_logs (success)`,
}
for _, sql := range migrations {
if _, err := d.Pool.Exec(ctx, sql); err != nil {
return err
}
}
slog.Info("migrations completed")
return nil
}

View file

@ -0,0 +1,80 @@
package db
import "time"
type Notification struct {
ID string `json:"id"`
UserID *string `json:"userId,omitempty"`
AppID string `json:"appId"`
Channel string `json:"channel"`
TemplateID *string `json:"templateId,omitempty"`
Subject *string `json:"subject,omitempty"`
Body *string `json:"body,omitempty"`
Data []byte `json:"data,omitempty"` // JSONB
Status string `json:"status"`
Priority string `json:"priority"`
ScheduledFor *time.Time `json:"scheduledFor,omitempty"`
Recipient *string `json:"recipient,omitempty"`
ExternalID *string `json:"externalId,omitempty"`
Attempts int `json:"attempts"`
DeliveredAt *time.Time `json:"deliveredAt,omitempty"`
ErrorMessage *string `json:"errorMessage,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type Template struct {
ID string `json:"id"`
Slug string `json:"slug"`
AppID *string `json:"appId,omitempty"`
Channel string `json:"channel"`
Subject *string `json:"subject,omitempty"`
BodyTemplate string `json:"bodyTemplate"`
Locale string `json:"locale"`
IsActive bool `json:"isActive"`
IsSystem bool `json:"isSystem"`
Variables []byte `json:"variables,omitempty"` // JSONB
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type Device struct {
ID string `json:"id"`
UserID string `json:"userId"`
PushToken string `json:"pushToken"`
TokenType string `json:"tokenType"`
Platform *string `json:"platform,omitempty"`
DeviceName *string `json:"deviceName,omitempty"`
AppID *string `json:"appId,omitempty"`
IsActive bool `json:"isActive"`
LastSeenAt *time.Time `json:"lastSeenAt,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type Preference struct {
ID string `json:"id"`
UserID string `json:"userId"`
EmailEnabled bool `json:"emailEnabled"`
PushEnabled bool `json:"pushEnabled"`
QuietHoursEnabled bool `json:"quietHoursEnabled"`
QuietHoursStart *string `json:"quietHoursStart,omitempty"`
QuietHoursEnd *string `json:"quietHoursEnd,omitempty"`
Timezone string `json:"timezone"`
CategoryPreferences []byte `json:"categoryPreferences,omitempty"` // JSONB
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type DeliveryLog struct {
ID string `json:"id"`
NotificationID string `json:"notificationId"`
AttemptNumber int `json:"attemptNumber"`
Channel string `json:"channel"`
Success bool `json:"success"`
StatusCode *int `json:"statusCode,omitempty"`
ErrorMessage *string `json:"errorMessage,omitempty"`
ProviderID *string `json:"providerId,omitempty"`
DurationMs *int `json:"durationMs,omitempty"`
CreatedAt time.Time `json:"createdAt"`
}

View file

@ -0,0 +1,24 @@
package handler
import (
"encoding/json"
"net/http"
"time"
)
func writeJSON(w http.ResponseWriter, status int, data any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(data)
}
func writeError(w http.ResponseWriter, status int, message string) {
writeJSON(w, status, map[string]any{
"success": false,
"error": map[string]any{
"statusCode": status,
"message": message,
"timestamp": time.Now().UTC().Format(time.RFC3339),
},
})
}

View file

@ -0,0 +1,121 @@
package handler
import (
"encoding/json"
"net/http"
"github.com/manacore/mana-notify/internal/auth"
"github.com/manacore/mana-notify/internal/db"
)
type DevicesHandler struct {
db *db.DB
}
func NewDevicesHandler(database *db.DB) *DevicesHandler {
return &DevicesHandler{db: database}
}
// Register handles POST /api/v1/devices/register
func (h *DevicesHandler) Register(w http.ResponseWriter, r *http.Request) {
user := auth.GetUser(r)
if user == nil {
writeError(w, http.StatusUnauthorized, "unauthorized")
return
}
var req struct {
PushToken string `json:"pushToken"`
TokenType string `json:"tokenType,omitempty"`
Platform string `json:"platform,omitempty"`
DeviceName string `json:"deviceName,omitempty"`
AppID string `json:"appId,omitempty"`
}
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.PushToken == "" {
writeError(w, http.StatusBadRequest, "pushToken is required")
return
}
tokenType := req.TokenType
if tokenType == "" {
tokenType = "expo"
}
// Upsert: transfer ownership if token exists for different user
var id string
err := h.db.Pool.QueryRow(r.Context(),
`INSERT INTO notify.devices (user_id, push_token, token_type, platform, device_name, app_id)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (push_token) DO UPDATE SET
user_id = EXCLUDED.user_id,
is_active = true,
last_seen_at = NOW(),
updated_at = NOW()
RETURNING id`,
user.UserID, req.PushToken, tokenType, nilIfEmpty(req.Platform), nilIfEmpty(req.DeviceName), nilIfEmpty(req.AppID),
).Scan(&id)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to register device")
return
}
writeJSON(w, http.StatusCreated, map[string]any{"device": map[string]any{"id": id}})
}
// List handles GET /api/v1/devices
func (h *DevicesHandler) List(w http.ResponseWriter, r *http.Request) {
user := auth.GetUser(r)
if user == nil {
writeError(w, http.StatusUnauthorized, "unauthorized")
return
}
rows, err := h.db.Pool.Query(r.Context(),
`SELECT id, user_id, push_token, token_type, platform, device_name, app_id, is_active, last_seen_at, created_at, updated_at
FROM notify.devices WHERE user_id = $1 AND is_active = true ORDER BY created_at DESC`, user.UserID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list devices")
return
}
defer rows.Close()
var devices []db.Device
for rows.Next() {
var d db.Device
if err := rows.Scan(&d.ID, &d.UserID, &d.PushToken, &d.TokenType, &d.Platform,
&d.DeviceName, &d.AppID, &d.IsActive, &d.LastSeenAt, &d.CreatedAt, &d.UpdatedAt); err != nil {
continue
}
devices = append(devices, d)
}
writeJSON(w, http.StatusOK, map[string]any{"devices": devices})
}
// Delete handles DELETE /api/v1/devices/{id}
func (h *DevicesHandler) Delete(w http.ResponseWriter, r *http.Request) {
user := auth.GetUser(r)
if user == nil {
writeError(w, http.StatusUnauthorized, "unauthorized")
return
}
id := r.PathValue("id")
result, err := h.db.Pool.Exec(r.Context(),
`UPDATE notify.devices SET is_active = false, updated_at = NOW() WHERE id = $1 AND user_id = $2`, id, user.UserID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to delete device")
return
}
if result.RowsAffected() == 0 {
writeError(w, http.StatusNotFound, "device not found")
return
}
writeJSON(w, http.StatusOK, map[string]any{"deleted": true})
}

View file

@ -0,0 +1,38 @@
package handler
import (
"net/http"
"time"
"github.com/manacore/mana-notify/internal/db"
)
type HealthHandler struct {
db *db.DB
startTime time.Time
}
func NewHealthHandler(database *db.DB) *HealthHandler {
return &HealthHandler{db: database, startTime: time.Now()}
}
func (h *HealthHandler) Health(w http.ResponseWriter, r *http.Request) {
dbOK := h.db.HealthCheck(r.Context()) == nil
status := "healthy"
if !dbOK {
status = "unhealthy"
}
writeJSON(w, http.StatusOK, map[string]any{
"status": status,
"version": "1.0.0",
"service": "mana-notify",
"uptime": time.Since(h.startTime).Seconds(),
"timestamp": time.Now().UTC().Format(time.RFC3339),
"services": map[string]any{
"database": dbOK,
"redis": true,
},
})
}

View file

@ -0,0 +1,504 @@
package handler
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/manacore/mana-notify/internal/db"
"github.com/manacore/mana-notify/internal/queue"
tmpl "github.com/manacore/mana-notify/internal/template"
)
type NotificationsHandler struct {
db *db.DB
pool *queue.WorkerPool
engine *tmpl.Engine
}
func NewNotificationsHandler(database *db.DB, pool *queue.WorkerPool, engine *tmpl.Engine) *NotificationsHandler {
return &NotificationsHandler{db: database, pool: pool, engine: engine}
}
type SendRequest struct {
Channel string `json:"channel"`
AppID string `json:"appId"`
UserID string `json:"userId,omitempty"`
Recipient string `json:"recipient,omitempty"`
Recipients []string `json:"recipients,omitempty"`
Template string `json:"template,omitempty"`
Subject string `json:"subject,omitempty"`
Body string `json:"body,omitempty"`
Data map[string]any `json:"data,omitempty"`
Priority string `json:"priority,omitempty"`
ExternalID string `json:"externalId,omitempty"`
EmailOptions *EmailOptions `json:"emailOptions,omitempty"`
PushOptions *PushOptions `json:"pushOptions,omitempty"`
WebhookOptions *WebhookOptions `json:"webhookOptions,omitempty"`
MatrixOptions *MatrixOptions `json:"matrixOptions,omitempty"`
}
type EmailOptions struct {
From string `json:"from,omitempty"`
ReplyTo string `json:"replyTo,omitempty"`
}
type PushOptions struct {
Sound string `json:"sound,omitempty"`
Badge *int `json:"badge,omitempty"`
ChannelID string `json:"channelId,omitempty"`
}
type WebhookOptions struct {
Method string `json:"method,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
Timeout int `json:"timeout,omitempty"`
}
type MatrixOptions struct {
MsgType string `json:"msgtype,omitempty"`
FormattedBody string `json:"formattedBody,omitempty"`
}
type ScheduleRequest struct {
SendRequest
ScheduledFor string `json:"scheduledFor"`
}
type BatchRequest struct {
Notifications []SendRequest `json:"notifications"`
}
// Send handles POST /api/v1/notifications/send
func (h *NotificationsHandler) Send(w http.ResponseWriter, r *http.Request) {
var req SendRequest
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if err := validateSendRequest(&req); err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
// Check idempotency
if req.ExternalID != "" {
var existingID string
err := h.db.Pool.QueryRow(r.Context(),
`SELECT id FROM notify.notifications WHERE external_id = $1`, req.ExternalID,
).Scan(&existingID)
if err == nil {
writeJSON(w, http.StatusOK, map[string]any{
"notification": map[string]any{"id": existingID, "status": "existing"},
"deduplicated": true,
})
return
}
}
// Check user preferences
if req.UserID != "" {
blocked, reason := h.checkPreferences(r.Context(), req.UserID, req.Channel)
if blocked {
writeJSON(w, http.StatusOK, map[string]any{
"notification": map[string]any{"status": "cancelled", "reason": reason},
})
return
}
}
// Render template
subject := req.Subject
body := req.Body
if req.Template != "" {
rendered, err := h.engine.RenderBySlug(r.Context(), req.Template, req.Data, "")
if err != nil {
slog.Warn("template render failed", "template", req.Template, "error", err)
} else {
if rendered.Subject != "" {
subject = rendered.Subject
}
if rendered.Body != "" {
body = rendered.Body
}
}
}
priority := req.Priority
if priority == "" {
priority = "normal"
}
// Create notification record
var notificationID string
err := h.db.Pool.QueryRow(r.Context(),
`INSERT INTO notify.notifications (user_id, app_id, channel, template_id, subject, body, data, priority, recipient, external_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING id`,
nilIfEmpty(req.UserID), req.AppID, req.Channel, nilIfEmpty(req.Template),
nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(req.Data),
priority, nilIfEmpty(req.Recipient), nilIfEmpty(req.ExternalID),
).Scan(&notificationID)
if err != nil {
slog.Error("create notification failed", "error", err)
writeError(w, http.StatusInternalServerError, "failed to create notification")
return
}
// Build and enqueue job
job := queue.Job{
NotificationID: notificationID,
Channel: req.Channel,
AppID: req.AppID,
Recipient: req.Recipient,
Subject: subject,
Body: body,
Data: req.Data,
}
if req.EmailOptions != nil {
job.From = req.EmailOptions.From
job.ReplyTo = req.EmailOptions.ReplyTo
}
if req.PushOptions != nil {
job.Sound = req.PushOptions.Sound
job.Badge = req.PushOptions.Badge
}
if req.MatrixOptions != nil {
job.RoomID = req.Recipient
job.MsgType = req.MatrixOptions.MsgType
job.FormattedBody = req.MatrixOptions.FormattedBody
}
if req.WebhookOptions != nil {
job.WebhookMethod = req.WebhookOptions.Method
job.WebhookHeaders = req.WebhookOptions.Headers
job.WebhookTimeout = req.WebhookOptions.Timeout
}
if req.Channel == "matrix" {
job.RoomID = req.Recipient
}
h.pool.Enqueue(job)
writeJSON(w, http.StatusAccepted, map[string]any{
"notification": map[string]any{
"id": notificationID,
"status": "pending",
},
})
}
// Schedule handles POST /api/v1/notifications/schedule
func (h *NotificationsHandler) Schedule(w http.ResponseWriter, r *http.Request) {
var req ScheduleRequest
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
scheduledFor, err := time.Parse(time.RFC3339, req.ScheduledFor)
if err != nil {
writeError(w, http.StatusBadRequest, "scheduledFor must be a valid RFC3339 timestamp")
return
}
if scheduledFor.Before(time.Now()) {
writeError(w, http.StatusBadRequest, "scheduledFor must be in the future")
return
}
if err := validateSendRequest(&req.SendRequest); err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
// Render template
subject := req.Subject
body := req.Body
if req.Template != "" {
rendered, err := h.engine.RenderBySlug(r.Context(), req.Template, req.Data, "")
if err == nil {
if rendered.Subject != "" {
subject = rendered.Subject
}
if rendered.Body != "" {
body = rendered.Body
}
}
}
priority := req.Priority
if priority == "" {
priority = "normal"
}
var notificationID string
err = h.db.Pool.QueryRow(r.Context(),
`INSERT INTO notify.notifications (user_id, app_id, channel, template_id, subject, body, data, priority, recipient, external_id, scheduled_for)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING id`,
nilIfEmpty(req.UserID), req.AppID, req.Channel, nilIfEmpty(req.Template),
nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(req.Data),
priority, nilIfEmpty(req.Recipient), nilIfEmpty(req.ExternalID), scheduledFor,
).Scan(&notificationID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create notification")
return
}
job := queue.Job{
NotificationID: notificationID,
Channel: req.Channel,
AppID: req.AppID,
Recipient: req.Recipient,
Subject: subject,
Body: body,
Data: req.Data,
ScheduleAt: &scheduledFor,
}
h.pool.Enqueue(job)
writeJSON(w, http.StatusAccepted, map[string]any{
"notification": map[string]any{
"id": notificationID,
"status": "pending",
"scheduledFor": scheduledFor.Format(time.RFC3339),
},
})
}
// Batch handles POST /api/v1/notifications/batch
func (h *NotificationsHandler) Batch(w http.ResponseWriter, r *http.Request) {
var req BatchRequest
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 5<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if len(req.Notifications) == 0 {
writeError(w, http.StatusBadRequest, "notifications array is required")
return
}
if len(req.Notifications) > 100 {
writeError(w, http.StatusBadRequest, "maximum 100 notifications per batch")
return
}
type batchResult struct {
ID string `json:"id,omitempty"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
}
results := make([]batchResult, len(req.Notifications))
succeeded := 0
failed := 0
for i, n := range req.Notifications {
if err := validateSendRequest(&n); err != nil {
results[i] = batchResult{Status: "failed", Error: err.Error()}
failed++
continue
}
subject := n.Subject
body := n.Body
if n.Template != "" {
rendered, err := h.engine.RenderBySlug(r.Context(), n.Template, n.Data, "")
if err == nil {
if rendered.Subject != "" {
subject = rendered.Subject
}
if rendered.Body != "" {
body = rendered.Body
}
}
}
priority := n.Priority
if priority == "" {
priority = "normal"
}
var notificationID string
err := h.db.Pool.QueryRow(r.Context(),
`INSERT INTO notify.notifications (user_id, app_id, channel, subject, body, data, priority, recipient)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id`,
nilIfEmpty(n.UserID), n.AppID, n.Channel,
nilIfEmpty(subject), nilIfEmpty(body), jsonOrNil(n.Data), priority, nilIfEmpty(n.Recipient),
).Scan(&notificationID)
if err != nil {
results[i] = batchResult{Status: "failed", Error: "database error"}
failed++
continue
}
h.pool.Enqueue(queue.Job{
NotificationID: notificationID,
Channel: n.Channel,
AppID: n.AppID,
Recipient: n.Recipient,
Subject: subject,
Body: body,
Data: n.Data,
})
results[i] = batchResult{ID: notificationID, Status: "pending"}
succeeded++
}
writeJSON(w, http.StatusAccepted, map[string]any{
"results": results,
"succeeded": succeeded,
"failed": failed,
})
}
// GetNotification handles GET /api/v1/notifications/{id}
func (h *NotificationsHandler) GetNotification(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
if id == "" {
writeError(w, http.StatusBadRequest, "notification id required")
return
}
var n db.Notification
err := h.db.Pool.QueryRow(r.Context(),
`SELECT id, user_id, app_id, channel, template_id, subject, body, status, priority, scheduled_for, recipient, external_id, attempts, delivered_at, error_message, created_at, updated_at
FROM notify.notifications WHERE id = $1`, id,
).Scan(&n.ID, &n.UserID, &n.AppID, &n.Channel, &n.TemplateID, &n.Subject, &n.Body,
&n.Status, &n.Priority, &n.ScheduledFor, &n.Recipient, &n.ExternalID,
&n.Attempts, &n.DeliveredAt, &n.ErrorMessage, &n.CreatedAt, &n.UpdatedAt)
if err != nil {
writeError(w, http.StatusNotFound, "notification not found")
return
}
writeJSON(w, http.StatusOK, map[string]any{"notification": n})
}
// CancelNotification handles DELETE /api/v1/notifications/{id}
func (h *NotificationsHandler) CancelNotification(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
if id == "" {
writeError(w, http.StatusBadRequest, "notification id required")
return
}
result, err := h.db.Pool.Exec(r.Context(),
`UPDATE notify.notifications SET status = 'cancelled', updated_at = NOW() WHERE id = $1 AND status = 'pending'`, id)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to cancel notification")
return
}
if result.RowsAffected() == 0 {
writeError(w, http.StatusNotFound, "notification not found or already processed")
return
}
writeJSON(w, http.StatusOK, map[string]any{"cancelled": true})
}
func (h *NotificationsHandler) checkPreferences(ctx context.Context, userID, ch string) (bool, string) {
var emailEnabled, pushEnabled, quietEnabled bool
var quietStart, quietEnd, timezone *string
err := h.db.Pool.QueryRow(ctx,
`SELECT email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone
FROM notify.preferences WHERE user_id = $1`, userID,
).Scan(&emailEnabled, &pushEnabled, &quietEnabled, &quietStart, &quietEnd, &timezone)
if err != nil {
return false, "" // No preferences = allow
}
// Check channel preferences
if ch == "email" && !emailEnabled {
return true, "email notifications disabled by user"
}
if ch == "push" && !pushEnabled {
return true, "push notifications disabled by user"
}
// Check quiet hours
if quietEnabled && quietStart != nil && quietEnd != nil {
tz := "Europe/Berlin"
if timezone != nil {
tz = *timezone
}
loc, err := time.LoadLocation(tz)
if err == nil {
now := time.Now().In(loc)
nowMinutes := now.Hour()*60 + now.Minute()
startH, startM := parseTime(*quietStart)
endH, endM := parseTime(*quietEnd)
startMinutes := startH*60 + startM
endMinutes := endH*60 + endM
var inQuiet bool
if startMinutes <= endMinutes {
inQuiet = nowMinutes >= startMinutes && nowMinutes < endMinutes
} else {
// Spans midnight (e.g. 22:00 to 08:00)
inQuiet = nowMinutes >= startMinutes || nowMinutes < endMinutes
}
if inQuiet {
return true, "quiet hours active"
}
}
}
return false, ""
}
func validateSendRequest(req *SendRequest) error {
if req.Channel == "" {
return fmt.Errorf("channel is required")
}
validChannels := map[string]bool{"email": true, "push": true, "matrix": true, "webhook": true}
if !validChannels[req.Channel] {
return fmt.Errorf("channel must be email, push, matrix, or webhook")
}
if req.AppID == "" {
return fmt.Errorf("appId is required")
}
if req.Recipient == "" && len(req.Recipients) == 0 && req.UserID == "" {
return fmt.Errorf("recipient, recipients, or userId is required")
}
if req.Template == "" && req.Body == "" {
return fmt.Errorf("template or body is required")
}
return nil
}
func parseTime(s string) (int, int) {
var h, m int
fmt.Sscanf(s, "%d:%d", &h, &m)
return h, m
}
func nilIfEmpty(s string) *string {
if s == "" {
return nil
}
return &s
}
func jsonOrNil(data map[string]any) []byte {
if data == nil {
return nil
}
b, err := json.Marshal(data)
if err != nil {
return nil
}
return b
}

View file

@ -0,0 +1,95 @@
package handler
import (
"encoding/json"
"net/http"
"github.com/manacore/mana-notify/internal/auth"
"github.com/manacore/mana-notify/internal/db"
)
type PreferencesHandler struct {
db *db.DB
}
func NewPreferencesHandler(database *db.DB) *PreferencesHandler {
return &PreferencesHandler{db: database}
}
// Get handles GET /api/v1/preferences
func (h *PreferencesHandler) Get(w http.ResponseWriter, r *http.Request) {
user := auth.GetUser(r)
if user == nil {
writeError(w, http.StatusUnauthorized, "unauthorized")
return
}
var p db.Preference
err := h.db.Pool.QueryRow(r.Context(),
`SELECT id, user_id, email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone, category_preferences, created_at, updated_at
FROM notify.preferences WHERE user_id = $1`, user.UserID,
).Scan(&p.ID, &p.UserID, &p.EmailEnabled, &p.PushEnabled, &p.QuietHoursEnabled,
&p.QuietHoursStart, &p.QuietHoursEnd, &p.Timezone, &p.CategoryPreferences, &p.CreatedAt, &p.UpdatedAt)
if err != nil {
// Return defaults
writeJSON(w, http.StatusOK, map[string]any{
"preferences": map[string]any{
"emailEnabled": false,
"pushEnabled": true,
"quietHoursEnabled": false,
"timezone": "Europe/Berlin",
},
})
return
}
writeJSON(w, http.StatusOK, map[string]any{"preferences": p})
}
// Update handles PUT /api/v1/preferences
func (h *PreferencesHandler) Update(w http.ResponseWriter, r *http.Request) {
user := auth.GetUser(r)
if user == nil {
writeError(w, http.StatusUnauthorized, "unauthorized")
return
}
var req struct {
EmailEnabled *bool `json:"emailEnabled,omitempty"`
PushEnabled *bool `json:"pushEnabled,omitempty"`
QuietHoursEnabled *bool `json:"quietHoursEnabled,omitempty"`
QuietHoursStart *string `json:"quietHoursStart,omitempty"`
QuietHoursEnd *string `json:"quietHoursEnd,omitempty"`
Timezone *string `json:"timezone,omitempty"`
CategoryPreferences map[string]any `json:"categoryPreferences,omitempty"`
}
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
catJSON, _ := json.Marshal(req.CategoryPreferences)
_, err := h.db.Pool.Exec(r.Context(),
`INSERT INTO notify.preferences (user_id, email_enabled, push_enabled, quiet_hours_enabled, quiet_hours_start, quiet_hours_end, timezone, category_preferences)
VALUES ($1, COALESCE($2, false), COALESCE($3, true), COALESCE($4, false), $5, $6, COALESCE($7, 'Europe/Berlin'), $8)
ON CONFLICT (user_id) DO UPDATE SET
email_enabled = COALESCE($2, notify.preferences.email_enabled),
push_enabled = COALESCE($3, notify.preferences.push_enabled),
quiet_hours_enabled = COALESCE($4, notify.preferences.quiet_hours_enabled),
quiet_hours_start = COALESCE($5, notify.preferences.quiet_hours_start),
quiet_hours_end = COALESCE($6, notify.preferences.quiet_hours_end),
timezone = COALESCE($7, notify.preferences.timezone),
category_preferences = COALESCE($8, notify.preferences.category_preferences),
updated_at = NOW()`,
user.UserID, req.EmailEnabled, req.PushEnabled, req.QuietHoursEnabled,
req.QuietHoursStart, req.QuietHoursEnd, req.Timezone, catJSON,
)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to update preferences")
return
}
writeJSON(w, http.StatusOK, map[string]any{"updated": true})
}

View file

@ -0,0 +1,213 @@
package handler
import (
"encoding/json"
"net/http"
"github.com/manacore/mana-notify/internal/db"
tmpl "github.com/manacore/mana-notify/internal/template"
)
type TemplatesHandler struct {
db *db.DB
engine *tmpl.Engine
}
func NewTemplatesHandler(database *db.DB, engine *tmpl.Engine) *TemplatesHandler {
return &TemplatesHandler{db: database, engine: engine}
}
// List handles GET /api/v1/templates
func (h *TemplatesHandler) List(w http.ResponseWriter, r *http.Request) {
rows, err := h.db.Pool.Query(r.Context(),
`SELECT id, slug, app_id, channel, subject, body_template, locale, is_active, is_system, variables, created_at, updated_at
FROM notify.templates ORDER BY slug`)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list templates")
return
}
defer rows.Close()
var templates []db.Template
for rows.Next() {
var t db.Template
if err := rows.Scan(&t.ID, &t.Slug, &t.AppID, &t.Channel, &t.Subject, &t.BodyTemplate,
&t.Locale, &t.IsActive, &t.IsSystem, &t.Variables, &t.CreatedAt, &t.UpdatedAt); err != nil {
continue
}
templates = append(templates, t)
}
writeJSON(w, http.StatusOK, map[string]any{"templates": templates})
}
// Get handles GET /api/v1/templates/{slug}
func (h *TemplatesHandler) Get(w http.ResponseWriter, r *http.Request) {
slug := r.PathValue("slug")
locale := r.URL.Query().Get("locale")
if locale == "" {
locale = "de-DE"
}
var t db.Template
err := h.db.Pool.QueryRow(r.Context(),
`SELECT id, slug, app_id, channel, subject, body_template, locale, is_active, is_system, variables, created_at, updated_at
FROM notify.templates WHERE slug = $1 AND locale = $2`, slug, locale,
).Scan(&t.ID, &t.Slug, &t.AppID, &t.Channel, &t.Subject, &t.BodyTemplate,
&t.Locale, &t.IsActive, &t.IsSystem, &t.Variables, &t.CreatedAt, &t.UpdatedAt)
if err != nil {
writeError(w, http.StatusNotFound, "template not found")
return
}
writeJSON(w, http.StatusOK, map[string]any{"template": t})
}
// Create handles POST /api/v1/templates
func (h *TemplatesHandler) Create(w http.ResponseWriter, r *http.Request) {
var req struct {
Slug string `json:"slug"`
AppID string `json:"appId,omitempty"`
Channel string `json:"channel"`
Subject string `json:"subject,omitempty"`
BodyTemplate string `json:"bodyTemplate"`
Locale string `json:"locale,omitempty"`
Variables any `json:"variables,omitempty"`
}
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.Slug == "" || req.Channel == "" || req.BodyTemplate == "" {
writeError(w, http.StatusBadRequest, "slug, channel, and bodyTemplate are required")
return
}
if req.Locale == "" {
req.Locale = "de-DE"
}
varsJSON, _ := json.Marshal(req.Variables)
var id string
err := h.db.Pool.QueryRow(r.Context(),
`INSERT INTO notify.templates (slug, app_id, channel, subject, body_template, locale, variables)
VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id`,
req.Slug, nilIfEmpty(req.AppID), req.Channel, nilIfEmpty(req.Subject), req.BodyTemplate, req.Locale, varsJSON,
).Scan(&id)
if err != nil {
writeError(w, http.StatusConflict, "template already exists for this slug+locale")
return
}
writeJSON(w, http.StatusCreated, map[string]any{"id": id})
}
// Update handles PUT /api/v1/templates/{slug}
func (h *TemplatesHandler) Update(w http.ResponseWriter, r *http.Request) {
slug := r.PathValue("slug")
locale := r.URL.Query().Get("locale")
if locale == "" {
locale = "de-DE"
}
var req struct {
Subject string `json:"subject,omitempty"`
BodyTemplate string `json:"bodyTemplate,omitempty"`
IsActive *bool `json:"isActive,omitempty"`
Variables any `json:"variables,omitempty"`
}
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
result, err := h.db.Pool.Exec(r.Context(),
`UPDATE notify.templates SET
subject = COALESCE($1, subject),
body_template = COALESCE($2, body_template),
is_active = COALESCE($3, is_active),
updated_at = NOW()
WHERE slug = $4 AND locale = $5 AND is_system = false`,
nilIfEmpty(req.Subject), nilIfEmpty(req.BodyTemplate), req.IsActive, slug, locale,
)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to update template")
return
}
if result.RowsAffected() == 0 {
writeError(w, http.StatusNotFound, "template not found or is a system template")
return
}
writeJSON(w, http.StatusOK, map[string]any{"updated": true})
}
// Delete handles DELETE /api/v1/templates/{slug}
func (h *TemplatesHandler) Delete(w http.ResponseWriter, r *http.Request) {
slug := r.PathValue("slug")
result, err := h.db.Pool.Exec(r.Context(),
`DELETE FROM notify.templates WHERE slug = $1 AND is_system = false`, slug)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to delete template")
return
}
if result.RowsAffected() == 0 {
writeError(w, http.StatusNotFound, "template not found or is a system template")
return
}
writeJSON(w, http.StatusOK, map[string]any{"deleted": true})
}
// Preview handles POST /api/v1/templates/{slug}/preview
func (h *TemplatesHandler) Preview(w http.ResponseWriter, r *http.Request) {
slug := r.PathValue("slug")
var req struct {
Data map[string]any `json:"data"`
}
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
rendered, err := h.engine.RenderBySlug(r.Context(), slug, req.Data, "")
if err != nil {
writeError(w, http.StatusNotFound, "template not found")
return
}
writeJSON(w, http.StatusOK, map[string]any{"subject": rendered.Subject, "body": rendered.Body})
}
// PreviewCustom handles POST /api/v1/templates/preview
func (h *TemplatesHandler) PreviewCustom(w http.ResponseWriter, r *http.Request) {
var req struct {
Subject string `json:"subject,omitempty"`
BodyTemplate string `json:"bodyTemplate"`
Data map[string]any `json:"data"`
}
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
subject := ""
if req.Subject != "" {
s, err := tmpl.RenderDirect(req.Subject, req.Data)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid subject template: "+err.Error())
return
}
subject = s
}
body, err := tmpl.RenderDirect(req.BodyTemplate, req.Data)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid body template: "+err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"subject": subject, "body": body})
}

View file

@ -0,0 +1,70 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type Metrics struct {
NotificationsSent *prometheus.CounterVec
NotificationsFailed *prometheus.CounterVec
EmailsSent *prometheus.CounterVec
PushSent *prometheus.CounterVec
MatrixSent *prometheus.CounterVec
WebhooksSent *prometheus.CounterVec
NotificationLatency *prometheus.HistogramVec
EmailLatency prometheus.Histogram
PushLatency prometheus.Histogram
}
func New() *Metrics {
return &Metrics{
NotificationsSent: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mana_notify_notifications_sent_total",
Help: "Total notifications sent",
}, []string{"channel", "app_id"}),
NotificationsFailed: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mana_notify_notifications_failed_total",
Help: "Total notifications failed",
}, []string{"channel", "app_id", "error_type"}),
EmailsSent: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mana_notify_emails_sent_total",
Help: "Total emails sent",
}, []string{"template", "status"}),
PushSent: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mana_notify_push_sent_total",
Help: "Total push notifications sent",
}, []string{"platform", "status"}),
MatrixSent: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mana_notify_matrix_sent_total",
Help: "Total Matrix messages sent",
}, []string{"status"}),
WebhooksSent: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mana_notify_webhooks_sent_total",
Help: "Total webhooks sent",
}, []string{"status"}),
NotificationLatency: promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "mana_notify_notification_latency_seconds",
Help: "Notification processing latency",
Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10},
}, []string{"channel"}),
EmailLatency: promauto.NewHistogram(prometheus.HistogramOpts{
Name: "mana_notify_email_latency_seconds",
Help: "Email sending latency",
Buckets: []float64{0.1, 0.5, 1, 2, 5, 10},
}),
PushLatency: promauto.NewHistogram(prometheus.HistogramOpts{
Name: "mana_notify_push_latency_seconds",
Help: "Push notification latency",
Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1},
}),
}
}

View file

@ -0,0 +1,318 @@
package queue
import (
"context"
"fmt"
"log/slog"
"math"
"time"
"github.com/manacore/mana-notify/internal/channel"
"github.com/manacore/mana-notify/internal/db"
"github.com/manacore/mana-notify/internal/metrics"
)
// Job represents a notification delivery job.
type Job struct {
NotificationID string
Channel string
AppID string
Recipient string
Subject string
Body string
Data map[string]any
// Channel-specific
From string
ReplyTo string
Tokens []string // Push: multiple tokens
Sound string // Push
Badge *int // Push
Platform string // Push
RoomID string // Matrix
FormattedBody string // Matrix
MsgType string // Matrix
WebhookMethod string // Webhook
WebhookHeaders map[string]string // Webhook
WebhookTimeout int // Webhook
// Retry
Attempt int
MaxRetries int
BackoffMs int
ScheduleAt *time.Time
}
// WorkerConfig per channel.
type WorkerConfig struct {
Concurrency int
MaxRetries int
BackoffMs int
}
var DefaultConfigs = map[string]WorkerConfig{
"email": {Concurrency: 5, MaxRetries: 3, BackoffMs: 5000},
"push": {Concurrency: 10, MaxRetries: 3, BackoffMs: 1000},
"matrix": {Concurrency: 5, MaxRetries: 3, BackoffMs: 2000},
"webhook": {Concurrency: 10, MaxRetries: 5, BackoffMs: 3000},
}
// WorkerPool manages goroutine-based workers for all channels.
type WorkerPool struct {
db *db.DB
email *channel.EmailService
push *channel.PushService
matrix *channel.MatrixService
webhook *channel.WebhookService
metrics *metrics.Metrics
jobs chan Job
ctx context.Context
cancel context.CancelFunc
}
func NewWorkerPool(database *db.DB, email *channel.EmailService, push *channel.PushService, matrix *channel.MatrixService, webhook *channel.WebhookService, m *metrics.Metrics) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
return &WorkerPool{
db: database,
email: email,
push: push,
matrix: matrix,
webhook: webhook,
metrics: m,
jobs: make(chan Job, 1000),
ctx: ctx,
cancel: cancel,
}
}
func (wp *WorkerPool) Start() {
// Start workers per channel type
for ch, cfg := range DefaultConfigs {
for i := 0; i < cfg.Concurrency; i++ {
go wp.worker(ch)
}
slog.Info("workers started", "channel", ch, "concurrency", cfg.Concurrency)
}
}
func (wp *WorkerPool) Stop() {
wp.cancel()
close(wp.jobs)
}
// Enqueue adds a job to the worker pool.
func (wp *WorkerPool) Enqueue(job Job) {
cfg := DefaultConfigs[job.Channel]
if job.MaxRetries == 0 {
job.MaxRetries = cfg.MaxRetries
}
if job.BackoffMs == 0 {
job.BackoffMs = cfg.BackoffMs
}
// Handle scheduled delivery
if job.ScheduleAt != nil && job.ScheduleAt.After(time.Now()) {
delay := time.Until(*job.ScheduleAt)
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-timer.C:
wp.jobs <- job
case <-wp.ctx.Done():
}
}()
return
}
wp.jobs <- job
}
func (wp *WorkerPool) worker(ch string) {
for {
select {
case job, ok := <-wp.jobs:
if !ok {
return
}
if job.Channel != ch {
// Put back for correct channel worker
wp.jobs <- job
continue
}
wp.processJob(job)
case <-wp.ctx.Done():
return
}
}
}
func (wp *WorkerPool) processJob(job Job) {
start := time.Now()
ctx := wp.ctx
// Update status to processing
wp.updateStatus(ctx, job.NotificationID, "processing", job.Attempt+1, nil)
var success bool
var providerID string
var statusCode *int
var errMsg string
defer func() {
if r := recover(); r != nil {
slog.Error("job panic", "notification", job.NotificationID, "panic", r)
errMsg = "internal panic"
success = false
}
duration := time.Since(start)
durationMs := int(duration.Milliseconds())
// Log delivery attempt
wp.logDelivery(ctx, job.NotificationID, job.Attempt+1, job.Channel, success, statusCode, errMsg, providerID, durationMs)
if success {
wp.updateStatus(ctx, job.NotificationID, "delivered", job.Attempt+1, nil)
wp.metrics.NotificationsSent.WithLabelValues(job.Channel, job.AppID).Inc()
} else if job.Attempt+1 < job.MaxRetries {
// Retry with exponential backoff
job.Attempt++
delay := time.Duration(float64(job.BackoffMs)*math.Pow(2, float64(job.Attempt-1))) * time.Millisecond
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-timer.C:
wp.jobs <- job
case <-wp.ctx.Done():
}
}()
slog.Info("retrying job", "notification", job.NotificationID, "attempt", job.Attempt, "delay", delay)
} else {
errStr := errMsg
wp.updateStatus(ctx, job.NotificationID, "failed", job.Attempt+1, &errStr)
wp.metrics.NotificationsFailed.WithLabelValues(job.Channel, job.AppID, "max_retries").Inc()
}
wp.metrics.NotificationLatency.WithLabelValues(job.Channel).Observe(duration.Seconds())
}()
switch job.Channel {
case "email":
result := wp.email.Send(&channel.EmailMessage{
To: job.Recipient,
Subject: job.Subject,
HTML: job.Body,
From: job.From,
ReplyTo: job.ReplyTo,
})
success = result.Success
providerID = result.MessageID
errMsg = result.Error
template := "custom"
status := "success"
if !success {
status = "failed"
}
wp.metrics.EmailsSent.WithLabelValues(template, status).Inc()
wp.metrics.EmailLatency.Observe(time.Since(start).Seconds())
case "push":
tokens := job.Tokens
if len(tokens) == 0 && job.Recipient != "" {
tokens = []string{job.Recipient}
}
results := wp.push.SendToTokens(ctx, tokens, job.Subject, job.Body, job.Data, job.Sound, job.Badge)
// Check if all succeeded
success = true
for _, r := range results {
if !r.Success {
success = false
errMsg = r.Error
} else {
providerID = r.TicketID
}
}
status := "success"
if !success {
status = "failed"
}
wp.metrics.PushSent.WithLabelValues(job.Platform, status).Inc()
wp.metrics.PushLatency.Observe(time.Since(start).Seconds())
case "matrix":
result := wp.matrix.Send(ctx, &channel.MatrixMessage{
RoomID: job.RoomID,
Body: job.Body,
FormattedBody: job.FormattedBody,
MsgType: job.MsgType,
})
success = result.Success
providerID = result.EventID
errMsg = result.Error
status := "success"
if !success {
status = "failed"
}
wp.metrics.MatrixSent.WithLabelValues(status).Inc()
case "webhook":
result := wp.webhook.Send(ctx, &channel.WebhookMessage{
URL: job.Recipient,
Method: job.WebhookMethod,
Headers: job.WebhookHeaders,
Body: job.Data,
Timeout: job.WebhookTimeout,
})
success = result.Success
sc := result.StatusCode
statusCode = &sc
errMsg = result.Error
status := "success"
if !success {
status = "failed"
}
wp.metrics.WebhooksSent.WithLabelValues(status).Inc()
}
}
func (wp *WorkerPool) updateStatus(ctx context.Context, notificationID, status string, attempts int, errMsg *string) {
query := `UPDATE notify.notifications SET status = $1, attempts = $2, updated_at = NOW()`
args := []any{status, attempts}
if status == "delivered" {
query += `, delivered_at = NOW()`
}
if errMsg != nil {
query += fmt.Sprintf(`, error_message = $%d`, len(args)+1)
args = append(args, *errMsg)
}
query += fmt.Sprintf(` WHERE id = $%d`, len(args)+1)
args = append(args, notificationID)
if _, err := wp.db.Pool.Exec(ctx, query, args...); err != nil {
slog.Error("update notification status failed", "id", notificationID, "error", err)
}
}
func (wp *WorkerPool) logDelivery(ctx context.Context, notificationID string, attempt int, ch string, success bool, statusCode *int, errMsg, providerID string, durationMs int) {
var errMsgPtr, providerIDPtr *string
if errMsg != "" {
errMsgPtr = &errMsg
}
if providerID != "" {
providerIDPtr = &providerID
}
_, err := wp.db.Pool.Exec(ctx,
`INSERT INTO notify.delivery_logs (notification_id, attempt_number, channel, success, status_code, error_message, provider_id, duration_ms)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
notificationID, attempt, ch, success, statusCode, errMsgPtr, providerIDPtr, durationMs,
)
if err != nil {
slog.Error("log delivery failed", "notification", notificationID, "error", err)
}
}

View file

@ -0,0 +1,138 @@
package template
import (
"bytes"
"context"
"log/slog"
"text/template"
"github.com/manacore/mana-notify/internal/db"
)
type Engine struct {
db *db.DB
}
func NewEngine(database *db.DB) *Engine {
return &Engine{db: database}
}
type Rendered struct {
Subject string
Body string
}
// RenderBySlug looks up a template by slug and renders it with the given data.
func (e *Engine) RenderBySlug(ctx context.Context, slug string, data map[string]any, locale string) (*Rendered, error) {
if locale == "" {
locale = "de-DE"
}
var subject, bodyTemplate *string
err := e.db.Pool.QueryRow(ctx,
`SELECT subject, body_template FROM notify.templates WHERE slug = $1 AND locale = $2 AND is_active = true`,
slug, locale,
).Scan(&subject, &bodyTemplate)
if err != nil {
// Try default locale fallback
err = e.db.Pool.QueryRow(ctx,
`SELECT subject, body_template FROM notify.templates WHERE slug = $1 AND is_active = true ORDER BY locale LIMIT 1`,
slug,
).Scan(&subject, &bodyTemplate)
if err != nil {
return nil, err
}
}
rendered := &Rendered{}
if subject != nil {
s, err := renderTemplate(*subject, data)
if err != nil {
return nil, err
}
rendered.Subject = s
}
if bodyTemplate != nil {
b, err := renderTemplate(*bodyTemplate, data)
if err != nil {
return nil, err
}
rendered.Body = b
}
return rendered, nil
}
// RenderDirect renders a template string with data.
func RenderDirect(tmplStr string, data map[string]any) (string, error) {
return renderTemplate(tmplStr, data)
}
func renderTemplate(tmplStr string, data map[string]any) (string, error) {
tmpl, err := template.New("").Parse(tmplStr)
if err != nil {
return "", err
}
var buf bytes.Buffer
if err := tmpl.Execute(&buf, data); err != nil {
return "", err
}
return buf.String(), nil
}
// SeedDefaults inserts default system templates if they don't exist.
func (e *Engine) SeedDefaults(ctx context.Context) {
defaults := []struct {
slug string
channel string
subject string
body string
vars string
}{
{
slug: "auth-password-reset",
channel: "email",
subject: "Passwort zurücksetzen - ManaCore",
body: `<!DOCTYPE html><html><body><h1>Passwort zurücksetzen</h1><p>Hallo {{.userName}},</p><p>Klicke auf den folgenden Link, um dein Passwort zurückzusetzen:</p><p><a href="{{.resetUrl}}">Passwort zurücksetzen</a></p><p>Falls du diese Anfrage nicht gestellt hast, kannst du diese E-Mail ignorieren.</p></body></html>`,
vars: `{"resetUrl": "URL zum Zurücksetzen", "userName": "Name des Benutzers"}`,
},
{
slug: "auth-verification",
channel: "email",
subject: "E-Mail bestätigen - ManaCore",
body: `<!DOCTYPE html><html><body><h1>E-Mail bestätigen</h1><p>Hallo {{.userName}},</p><p>Bitte bestätige deine E-Mail-Adresse:</p><p><a href="{{.verificationUrl}}">E-Mail bestätigen</a></p></body></html>`,
vars: `{"verificationUrl": "Bestätigungs-URL", "userName": "Name des Benutzers"}`,
},
{
slug: "auth-welcome",
channel: "email",
subject: "Willkommen bei ManaCore!",
body: `<!DOCTYPE html><html><body><h1>Willkommen!</h1><p>Hallo {{.userName}},</p><p>Willkommen bei ManaCore! Du kannst dich jetzt anmelden:</p><p><a href="{{.loginUrl}}">Anmelden</a></p></body></html>`,
vars: `{"userName": "Name des Benutzers", "loginUrl": "Login-URL"}`,
},
{
slug: "calendar-reminder",
channel: "email",
subject: "Erinnerung: {{.eventTitle}}",
body: `<!DOCTYPE html><html><body><h1>{{.eventTitle}}</h1><p>Wann: {{.eventTime}}</p>{{if .eventLocation}}<p>Wo: {{.eventLocation}}</p>{{end}}<p><a href="{{.eventUrl}}">Termin anzeigen</a></p></body></html>`,
vars: `{"eventTitle": "Titel", "eventTime": "Zeit", "eventLocation": "Ort (optional)", "eventUrl": "Link"}`,
},
}
for _, d := range defaults {
_, err := e.db.Pool.Exec(ctx,
`INSERT INTO notify.templates (slug, channel, subject, body_template, locale, is_system, variables)
VALUES ($1, $2, $3, $4, 'de-DE', true, $5)
ON CONFLICT (slug, locale) DO NOTHING`,
d.slug, d.channel, d.subject, d.body, d.vars,
)
if err != nil {
slog.Warn("seed template failed", "slug", d.slug, "error", err)
}
}
slog.Info("default templates seeded")
}

View file

@ -0,0 +1,10 @@
{
"name": "@manacore/mana-notify-go",
"version": "1.0.0",
"private": true,
"scripts": {
"dev": "go run ./cmd/server",
"build": "go build -o bin/mana-notify ./cmd/server",
"test": "go test ./..."
}
}