managarten/services/mana-notify/internal/queue/worker.go
Till JS 8e8b6ac65f fix(mana-auth) + chore: rewrite /api/v1/auth/login JWT mint, remove Matrix stack
This commit bundles two unrelated changes that were swept together by an
accidental `git add -A` in another working session. Documented here so the
history reflects what's actually inside.

═══════════════════════════════════════════════════════════════════════
1. fix(mana-auth): /api/v1/auth/login mints JWT via auth.handler instead
   of api.signInEmail
═══════════════════════════════════════════════════════════════════════

Previous attempt (commit 55cc75e7d) tried to fix the broken JWT mint in
/api/v1/auth/login by switching the cookie name from `mana.session_token`
to `__Secure-mana.session_token` for production. That was necessary but
not sufficient: Better Auth's session cookie value isn't just the raw
session token, it's `<token>.<HMAC>` where the HMAC is derived from the
better-auth secret. Reconstructing the cookie from auth.api.signInEmail's
JSON response only gave us the raw token, so /api/auth/token's
get-session middleware still couldn't validate it and the JWT mint kept
silently failing.

Real fix: do the sign-in via auth.handler (the HTTP path) rather than
auth.api.signInEmail (the SDK path). The handler returns a real fetch
Response with a Set-Cookie header containing the fully signed cookie
envelope. We capture that header verbatim and forward it as the cookie
on the /api/auth/token request, which now passes validation and mints
the JWT correctly.
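
For reference, a minimal TypeScript sketch of that flow — illustrative only,
not the actual mana-auth route: the import path, function shape, and the
error handling are assumptions, and /api/auth/sign-in/email is Better Auth's
default sign-in path (the /api/auth/token mint endpoint is the one named
above):

  import { auth } from "./auth"; // the Better Auth instance (assumed import path)

  export async function loginAndMintJwt(
    baseURL: string,
    clientIp: string,
    email: string,
    password: string,
  ) {
    // Sign in through Better Auth's fetch-style HTTP handler instead of the
    // SDK, so the Response carries the fully signed session cookie.
    const signInResponse = await auth.handler(
      new Request(`${baseURL}/api/auth/sign-in/email`, {
        method: "POST",
        headers: {
          "content-type": "application/json",
          // forward the real client IP for the rate limiter / security log
          "x-forwarded-for": clientIp,
        },
        body: JSON.stringify({ email, password }),
      }),
    );

    // Email-not-verified surfaces as a plain 403 on this path.
    if (signInResponse.status === 403) {
      throw new Error("email_not_verified");
    }

    // Capture the signed cookie envelope (<token>.<HMAC>) verbatim and
    // forward it to the JWT mint endpoint.
    const sessionCookie = signInResponse.headers.get("set-cookie") ?? "";
    const tokenResponse = await auth.handler(
      new Request(`${baseURL}/api/auth/token`, {
        headers: { cookie: sessionCookie },
      }),
    );
    const { token: accessToken } = await tokenResponse.json(); // field name assumed
    return accessToken;
  }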

Verified end-to-end on auth.mana.how:

  $ curl -X POST https://auth.mana.how/api/v1/auth/login \
      -d '{"email":"...","password":"..."}'
  {
    "user": {...},
    "token": "<session token>",
    "accessToken": "eyJhbGciOiJFZERTQSI...",   ← real JWT now
    "refreshToken": "<session token>"
  }

Side benefits:
- Email-not-verified is now handled by checking
  signInResponse.status === 403 directly; there is no longer any need to
  catch APIError or work around the async-stream footgun the old comment
  warned about.
- X-Forwarded-For is forwarded explicitly so Better Auth's rate limiter
  and our security log see the real client IP.
- The leftover catch block now handles only unexpected exceptions (network
  errors, etc.); its FORBIDDEN-checking logic is dead but harmless and is
  left in as defense in depth.

═══════════════════════════════════════════════════════════════════════
2. chore: remove the entire self-hosted Matrix stack (Synapse, Element,
   Manalink, mana-matrix-bot)
═══════════════════════════════════════════════════════════════════════

The Matrix subsystem ran parallel to the main Mana product without any
load-bearing integration: the unified web app never imported matrix-js-sdk,
the chat module uses mana-sync (local-first), and mana-matrix-bot's
plugins duplicated features the unified app already ships natively.
Keeping it alive cost a Synapse + Element + matrix-web + bot container
quartet, three Cloudflare routes, an OIDC provider plugin in mana-auth,
and a steady drip of devlog/dependency churn.

Removed:
- apps/matrix (Manalink web + mobile, ~150 files)
- services/mana-matrix-bot (Go bot with ~20 plugins)
- docker/matrix configs (Synapse + Element)
- synapse/element-web/matrix-web/mana-matrix-bot services in
  docker-compose.macmini.yml
- matrix.mana.how/element.mana.how/link.mana.how Cloudflare tunnel routes
- OIDC provider plugin + matrix-synapse trustedClient + matrixUserLinks
  table from mana-auth (oauth_* schema definitions also removed)
- MatrixService import path in mana-media (importFromMatrix endpoint)
- Matrix notification channel in mana-notify (worker, metrics, config,
  channel_type enum, MatrixOptions handler)
- Matrix entries from shared-branding (mana-apps + app-icons),
  notify-client, the i18n bundle, the observatory map, the credits
  app-label list, the landing footer/apps page, the prometheus + alerts
  + promtail tier mappings, and the matrix-related deploy paths in
  cd-macmini.yml + ci.yml

Devlog/manascore/blueprint entries that mention Matrix are left intact as a
historical record. The oauth_* + matrix_user_links Postgres tables stay on
existing prod databases; code can no longer write to them. Drop them in a
follow-up migration if you want them gone for real.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 16:32:13 +02:00


package queue

import (
	"context"
	"fmt"
	"log/slog"
	"math"
	"time"

	"github.com/mana/mana-notify/internal/channel"
	"github.com/mana/mana-notify/internal/db"
	"github.com/mana/mana-notify/internal/metrics"
)

// Job represents a notification delivery job.
type Job struct {
	NotificationID string
	Channel        string
	AppID          string
	Recipient      string
	Subject        string
	Body           string
	Data           map[string]any

	// Channel-specific
	From           string
	ReplyTo        string
	Tokens         []string          // Push: multiple tokens
	Sound          string            // Push
	Badge          *int              // Push
	Platform       string            // Push
	WebhookMethod  string            // Webhook
	WebhookHeaders map[string]string // Webhook
	WebhookTimeout int               // Webhook

	// Retry
	Attempt    int
	MaxRetries int
	BackoffMs  int
	ScheduleAt *time.Time
}

// WorkerConfig per channel.
type WorkerConfig struct {
	Concurrency int
	MaxRetries  int
	BackoffMs   int
}

var DefaultConfigs = map[string]WorkerConfig{
	"email":   {Concurrency: 5, MaxRetries: 3, BackoffMs: 5000},
	"push":    {Concurrency: 10, MaxRetries: 3, BackoffMs: 1000},
	"webhook": {Concurrency: 10, MaxRetries: 5, BackoffMs: 3000},
}

// WorkerPool manages goroutine-based workers for all channels.
type WorkerPool struct {
	db      *db.DB
	email   *channel.EmailService
	push    *channel.PushService
	webhook *channel.WebhookService
	metrics *metrics.Metrics
	jobs    chan Job
	ctx     context.Context
	cancel  context.CancelFunc
}

// NewWorkerPool wires the channel services, database handle and metrics into
// a pool backed by a single buffered job queue.
func NewWorkerPool(database *db.DB, email *channel.EmailService, push *channel.PushService, webhook *channel.WebhookService, m *metrics.Metrics) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())
	return &WorkerPool{
		db:      database,
		email:   email,
		push:    push,
		webhook: webhook,
		metrics: m,
		jobs:    make(chan Job, 1000),
		ctx:     ctx,
		cancel:  cancel,
	}
}

// Start launches cfg.Concurrency workers for every channel in DefaultConfigs.
func (wp *WorkerPool) Start() {
	for ch, cfg := range DefaultConfigs {
		for i := 0; i < cfg.Concurrency; i++ {
			go wp.worker(ch)
		}
		slog.Info("workers started", "channel", ch, "concurrency", cfg.Concurrency)
	}
}

// Stop cancels the pool context and closes the job channel.
func (wp *WorkerPool) Stop() {
	wp.cancel()
	close(wp.jobs)
}

// Enqueue adds a job to the worker pool, filling in per-channel retry defaults.
func (wp *WorkerPool) Enqueue(job Job) {
	cfg := DefaultConfigs[job.Channel]
	if job.MaxRetries == 0 {
		job.MaxRetries = cfg.MaxRetries
	}
	if job.BackoffMs == 0 {
		job.BackoffMs = cfg.BackoffMs
	}
	// Scheduled delivery: hold the job in a timer goroutine until ScheduleAt.
	if job.ScheduleAt != nil && job.ScheduleAt.After(time.Now()) {
		delay := time.Until(*job.ScheduleAt)
		go func() {
			timer := time.NewTimer(delay)
			defer timer.Stop()
			select {
			case <-timer.C:
				wp.jobs <- job
			case <-wp.ctx.Done():
			}
		}()
		return
	}
	wp.jobs <- job
}

// worker consumes jobs from the shared queue and processes those matching its
// channel; jobs for other channels are re-queued for the right worker.
func (wp *WorkerPool) worker(ch string) {
	for {
		select {
		case job, ok := <-wp.jobs:
			if !ok {
				return
			}
			if job.Channel != ch {
				// Put back for correct channel worker
				wp.jobs <- job
				continue
			}
			wp.processJob(job)
		case <-wp.ctx.Done():
			return
		}
	}
}

// processJob dispatches a job to its channel service. The deferred block
// records the delivery attempt, updates the notification status, emits
// metrics, and schedules a retry with exponential backoff on failure.
func (wp *WorkerPool) processJob(job Job) {
	start := time.Now()
	ctx := wp.ctx

	// Update status to processing
	wp.updateStatus(ctx, job.NotificationID, "processing", job.Attempt+1, nil)

	var success bool
	var providerID string
	var statusCode *int
	var errMsg string

	defer func() {
		if r := recover(); r != nil {
			slog.Error("job panic", "notification", job.NotificationID, "panic", r)
			errMsg = "internal panic"
			success = false
		}
		duration := time.Since(start)
		durationMs := int(duration.Milliseconds())

		// Log delivery attempt
		wp.logDelivery(ctx, job.NotificationID, job.Attempt+1, job.Channel, success, statusCode, errMsg, providerID, durationMs)

		if success {
			wp.updateStatus(ctx, job.NotificationID, "delivered", job.Attempt+1, nil)
			wp.metrics.NotificationsSent.WithLabelValues(job.Channel, job.AppID).Inc()
		} else if job.Attempt+1 < job.MaxRetries {
			// Retry with exponential backoff
			job.Attempt++
			delay := time.Duration(float64(job.BackoffMs)*math.Pow(2, float64(job.Attempt-1))) * time.Millisecond
			go func() {
				timer := time.NewTimer(delay)
				defer timer.Stop()
				select {
				case <-timer.C:
					wp.jobs <- job
				case <-wp.ctx.Done():
				}
			}()
			slog.Info("retrying job", "notification", job.NotificationID, "attempt", job.Attempt, "delay", delay)
		} else {
			errStr := errMsg
			wp.updateStatus(ctx, job.NotificationID, "failed", job.Attempt+1, &errStr)
			wp.metrics.NotificationsFailed.WithLabelValues(job.Channel, job.AppID, "max_retries").Inc()
		}
		wp.metrics.NotificationLatency.WithLabelValues(job.Channel).Observe(duration.Seconds())
	}()

	switch job.Channel {
	case "email":
		result := wp.email.Send(&channel.EmailMessage{
			To:      job.Recipient,
			Subject: job.Subject,
			HTML:    job.Body,
			From:    job.From,
			ReplyTo: job.ReplyTo,
		})
		success = result.Success
		providerID = result.MessageID
		errMsg = result.Error
		template := "custom"
		status := "success"
		if !success {
			status = "failed"
		}
		wp.metrics.EmailsSent.WithLabelValues(template, status).Inc()
		wp.metrics.EmailLatency.Observe(time.Since(start).Seconds())
	case "push":
		tokens := job.Tokens
		if len(tokens) == 0 && job.Recipient != "" {
			tokens = []string{job.Recipient}
		}
		results := wp.push.SendToTokens(ctx, tokens, job.Subject, job.Body, job.Data, job.Sound, job.Badge)
		// Check if all succeeded
		success = true
		for _, r := range results {
			if !r.Success {
				success = false
				errMsg = r.Error
			} else {
				providerID = r.TicketID
			}
		}
		status := "success"
		if !success {
			status = "failed"
		}
		wp.metrics.PushSent.WithLabelValues(job.Platform, status).Inc()
		wp.metrics.PushLatency.Observe(time.Since(start).Seconds())
	case "webhook":
		result := wp.webhook.Send(ctx, &channel.WebhookMessage{
			URL:     job.Recipient,
			Method:  job.WebhookMethod,
			Headers: job.WebhookHeaders,
			Body:    job.Data,
			Timeout: job.WebhookTimeout,
		})
		success = result.Success
		sc := result.StatusCode
		statusCode = &sc
		errMsg = result.Error
		status := "success"
		if !success {
			status = "failed"
		}
		wp.metrics.WebhooksSent.WithLabelValues(status).Inc()
	}
}

// updateStatus writes the notification's current status (and optional error
// message) back to notify.notifications.
func (wp *WorkerPool) updateStatus(ctx context.Context, notificationID, status string, attempts int, errMsg *string) {
	query := `UPDATE notify.notifications SET status = $1, attempts = $2, updated_at = NOW()`
	args := []any{status, attempts}
	if status == "delivered" {
		query += `, delivered_at = NOW()`
	}
	if errMsg != nil {
		query += fmt.Sprintf(`, error_message = $%d`, len(args)+1)
		args = append(args, *errMsg)
	}
	query += fmt.Sprintf(` WHERE id = $%d`, len(args)+1)
	args = append(args, notificationID)
	if _, err := wp.db.Pool.Exec(ctx, query, args...); err != nil {
		slog.Error("update notification status failed", "id", notificationID, "error", err)
	}
}

// logDelivery records a single delivery attempt in notify.delivery_logs;
// empty error/provider strings are stored as NULL.
func (wp *WorkerPool) logDelivery(ctx context.Context, notificationID string, attempt int, ch string, success bool, statusCode *int, errMsg, providerID string, durationMs int) {
	var errMsgPtr, providerIDPtr *string
	if errMsg != "" {
		errMsgPtr = &errMsg
	}
	if providerID != "" {
		providerIDPtr = &providerID
	}
	_, err := wp.db.Pool.Exec(ctx,
		`INSERT INTO notify.delivery_logs (notification_id, attempt_number, channel, success, status_code, error_message, provider_id, duration_ms)
		 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
		notificationID, attempt, ch, success, statusCode, errMsgPtr, providerIDPtr, durationMs,
	)
	if err != nil {
		slog.Error("log delivery failed", "notification", notificationID, "error", err)
	}
}