mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 18:41:08 +02:00
feat(sync): Phase 2 — server-side billing gate, cron charging, email notifications
Server-side gating (mana-sync Go):
- New billing.Checker with 5-minute cache per user
- Middleware wraps POST/GET /sync/{appId} endpoints
- Returns 402 Payment Required when sync subscription inactive
- Fail-open: if mana-credits is unreachable, sync is allowed
- Config: MANA_CREDITS_URL + MANA_SERVICE_KEY env vars
Recurring charge cron (mana-credits):
- Hourly setInterval checks for due sync subscriptions
- Calls chargeRecurring() which debits credits and advances nextChargeAt
- On insufficient credits: pauses subscription, sends email via mana-notify
Email notifications:
- Sends "Cloud Sync pausiert" email via mana-notify when subscription paused
- Uses POST /api/v1/notifications/send with X-Service-Key auth
Client-side 402 handling:
- sync.ts detects 402 from push/pull, fires onBillingRequired callback
- Layout wires callback to reload syncBilling store → shows pause banner
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
7102063afc
commit
ed76f53b00
7 changed files with 209 additions and 13 deletions
|
|
@ -539,6 +539,7 @@ export function createUnifiedSync(
|
|||
let status: SyncStatus = 'idle';
|
||||
let online = typeof navigator !== 'undefined' ? navigator.onLine : true;
|
||||
let _statusListeners: Array<(s: SyncStatus) => void> = [];
|
||||
let _billingRequiredCallback: (() => void) | null = null;
|
||||
const sseAbortControllers = new Map<string, AbortController>();
|
||||
|
||||
// ─── Lifecycle ──────────────────────────────────────────
|
||||
|
|
@ -714,6 +715,10 @@ export function createUnifiedSync(
|
|||
`push[${appId}]`
|
||||
);
|
||||
|
||||
if (res.status === 402) {
|
||||
_billingRequiredCallback?.();
|
||||
return;
|
||||
}
|
||||
if (!res.ok) throw new Error(`Push failed: ${res.status}`);
|
||||
|
||||
const data = await res.json();
|
||||
|
|
@ -813,6 +818,10 @@ export function createUnifiedSync(
|
|||
`pull[${appId}/${syncName}]`
|
||||
);
|
||||
|
||||
if (res.status === 402) {
|
||||
_billingRequiredCallback?.();
|
||||
return;
|
||||
}
|
||||
if (!res.ok) break;
|
||||
|
||||
const data = await res.json();
|
||||
|
|
@ -1089,6 +1098,9 @@ export function createUnifiedSync(
|
|||
_statusListeners = _statusListeners.filter((l) => l !== listener);
|
||||
};
|
||||
},
|
||||
onBillingRequired(callback: () => void) {
|
||||
_billingRequiredCallback = callback;
|
||||
},
|
||||
getChannel: (appId: string) => channels.get(appId),
|
||||
pushNow: push,
|
||||
pullNow: pull,
|
||||
|
|
|
|||
|
|
@ -471,6 +471,10 @@
|
|||
// Update pending count when sync status changes
|
||||
await refreshPendingCount();
|
||||
});
|
||||
unifiedSync.onBillingRequired(() => {
|
||||
// Server returned 402 — sync subscription expired or paused
|
||||
syncBilling.load();
|
||||
});
|
||||
unifiedSync.startAll();
|
||||
// Seed the badge count on mount: onStatusChange only fires on
|
||||
// transitions, so without this the badge stays at its last known
|
||||
|
|
|
|||
|
|
@ -74,6 +74,22 @@ app.route(
|
|||
createWebhookRoutes(stripeService, creditsService, config.stripe.webhookSecret)
|
||||
);
|
||||
|
||||
// ─── Cron: Daily sync billing charge ────────────────────────
|
||||
|
||||
const CHARGE_INTERVAL_MS = 60 * 60 * 1000; // Check every hour
|
||||
setInterval(async () => {
|
||||
try {
|
||||
const result = await syncBillingService.chargeRecurring();
|
||||
if (result.total > 0) {
|
||||
console.log(
|
||||
`[sync-billing] Recurring charge: ${result.charged} charged, ${result.paused} paused, ${result.errors} errors`
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('[sync-billing] Recurring charge failed:', err);
|
||||
}
|
||||
}, CHARGE_INTERVAL_MS);
|
||||
|
||||
// ─── Start ──────────────────────────────────────────────────
|
||||
|
||||
console.log(`mana-credits starting on port ${config.port}...`);
|
||||
|
|
|
|||
|
|
@ -10,6 +10,34 @@ import { BadRequestError, NotFoundError, InsufficientCreditsError } from '../lib
|
|||
|
||||
type BillingInterval = 'monthly' | 'quarterly' | 'yearly';
|
||||
|
||||
const MANA_NOTIFY_URL = () => process.env.MANA_NOTIFY_URL || 'http://localhost:3040';
|
||||
const SERVICE_KEY = () => process.env.MANA_SERVICE_KEY || '';
|
||||
|
||||
async function sendPauseNotification(userId: string): Promise<void> {
|
||||
const key = SERVICE_KEY();
|
||||
if (!key) return;
|
||||
|
||||
try {
|
||||
await fetch(`${MANA_NOTIFY_URL()}/api/v1/notifications/send`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Service-Key': key,
|
||||
},
|
||||
body: JSON.stringify({
|
||||
channel: 'email',
|
||||
appId: 'sync',
|
||||
userId,
|
||||
subject: 'Cloud Sync pausiert',
|
||||
body: 'Dein Cloud Sync wurde pausiert, weil deine Credits nicht ausreichen. Lade Credits auf, um die Synchronisation fortzusetzen.',
|
||||
data: { type: 'sync-paused', resumeUrl: 'https://mana.how/settings/sync' },
|
||||
}),
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(`[sync-billing] Failed to send pause notification for ${userId}:`, err);
|
||||
}
|
||||
}
|
||||
|
||||
const SYNC_PRICES: Record<BillingInterval, number> = {
|
||||
monthly: 30,
|
||||
quarterly: 90,
|
||||
|
|
@ -222,7 +250,7 @@ export class SyncBillingService {
|
|||
.where(eq(syncSubscriptions.userId, sub.userId));
|
||||
|
||||
paused++;
|
||||
// TODO Phase 2: send notification via mana-notify
|
||||
sendPauseNotification(sub.userId).catch(() => {});
|
||||
} else {
|
||||
errors++;
|
||||
console.error(`[sync-billing] Failed to charge user ${sub.userId}:`, error);
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/mana/mana-sync/internal/auth"
|
||||
"github.com/mana/mana-sync/internal/billing"
|
||||
"github.com/mana/mana-sync/internal/config"
|
||||
"github.com/mana/mana-sync/internal/store"
|
||||
syncHandler "github.com/mana/mana-sync/internal/sync"
|
||||
|
|
@ -49,16 +50,20 @@ func main() {
|
|||
// Initialize WebSocket hub (with JWT validator for auth)
|
||||
hub := ws.NewHub(validator)
|
||||
|
||||
// Initialize billing checker (verifies sync subscription via mana-credits)
|
||||
billingChecker := billing.NewChecker(cfg.ManaCreditsURL, cfg.ServiceKey)
|
||||
billingMiddleware := billingChecker.Middleware(validator)
|
||||
|
||||
// Initialize sync handler
|
||||
handler := syncHandler.NewHandler(db, validator, hub)
|
||||
|
||||
// Set up routes
|
||||
mux := http.NewServeMux()
|
||||
|
||||
// Sync endpoints (Go 1.22+ routing patterns)
|
||||
mux.HandleFunc("POST /sync/{appId}", handler.HandleSync)
|
||||
mux.HandleFunc("GET /sync/{appId}/pull", handler.HandlePull)
|
||||
mux.HandleFunc("GET /sync/{appId}/stream", handler.HandleStream)
|
||||
// Sync endpoints (Go 1.22+ routing patterns) — gated by billing check
|
||||
mux.Handle("POST /sync/{appId}", billingMiddleware(http.HandlerFunc(handler.HandleSync)))
|
||||
mux.Handle("GET /sync/{appId}/pull", billingMiddleware(http.HandlerFunc(handler.HandlePull)))
|
||||
mux.Handle("GET /sync/{appId}/stream", billingMiddleware(http.HandlerFunc(handler.HandleStream)))
|
||||
|
||||
// WebSocket endpoints
|
||||
// Unified: one connection per user, receives all app notifications with appId in payload
|
||||
|
|
|
|||
127
services/mana-sync/internal/billing/check.go
Normal file
127
services/mana-sync/internal/billing/check.go
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
// Package billing provides sync billing status checks against mana-credits.
|
||||
package billing
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SyncStatus represents the billing status for a user's sync subscription.
|
||||
type SyncStatus struct {
|
||||
Active bool `json:"active"`
|
||||
Interval string `json:"interval"`
|
||||
PausedAt *string `json:"pausedAt"`
|
||||
}
|
||||
|
||||
type cachedStatus struct {
|
||||
status SyncStatus
|
||||
fetchedAt time.Time
|
||||
}
|
||||
|
||||
// Checker verifies sync billing status via the mana-credits service.
|
||||
// Results are cached per user for cacheTTL to avoid hitting mana-credits on every request.
|
||||
type Checker struct {
|
||||
creditsURL string
|
||||
serviceKey string
|
||||
cacheTTL time.Duration
|
||||
client *http.Client
|
||||
|
||||
mu sync.RWMutex
|
||||
cache map[string]cachedStatus
|
||||
}
|
||||
|
||||
// NewChecker creates a billing checker.
|
||||
func NewChecker(creditsURL, serviceKey string) *Checker {
|
||||
return &Checker{
|
||||
creditsURL: creditsURL,
|
||||
serviceKey: serviceKey,
|
||||
cacheTTL: 5 * time.Minute,
|
||||
client: &http.Client{Timeout: 5 * time.Second},
|
||||
cache: make(map[string]cachedStatus),
|
||||
}
|
||||
}
|
||||
|
||||
// IsActive checks whether a user has an active sync subscription.
|
||||
// Returns true if the billing check fails (fail-open to not block sync on service outage).
|
||||
func (c *Checker) IsActive(userID string) bool {
|
||||
// Check cache first
|
||||
c.mu.RLock()
|
||||
entry, ok := c.cache[userID]
|
||||
c.mu.RUnlock()
|
||||
|
||||
if ok && time.Since(entry.fetchedAt) < c.cacheTTL {
|
||||
return entry.status.Active
|
||||
}
|
||||
|
||||
// Fetch from mana-credits
|
||||
status, err := c.fetchStatus(userID)
|
||||
if err != nil {
|
||||
slog.Warn("billing check failed, allowing sync (fail-open)", "userID", userID, "error", err)
|
||||
return true // Fail-open: don't block sync if billing service is down
|
||||
}
|
||||
|
||||
// Update cache
|
||||
c.mu.Lock()
|
||||
c.cache[userID] = cachedStatus{status: status, fetchedAt: time.Now()}
|
||||
c.mu.Unlock()
|
||||
|
||||
return status.Active
|
||||
}
|
||||
|
||||
func (c *Checker) fetchStatus(userID string) (SyncStatus, error) {
|
||||
url := fmt.Sprintf("%s/api/v1/internal/sync/status/%s", c.creditsURL, userID)
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return SyncStatus{}, err
|
||||
}
|
||||
req.Header.Set("X-Service-Key", c.serviceKey)
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return SyncStatus{}, fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return SyncStatus{}, fmt.Errorf("unexpected status: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var status SyncStatus
|
||||
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
|
||||
return SyncStatus{}, fmt.Errorf("decode failed: %w", err)
|
||||
}
|
||||
|
||||
return status, nil
|
||||
}
|
||||
|
||||
// Middleware returns an HTTP middleware that checks sync billing status.
|
||||
// Returns 402 Payment Required if the user's sync subscription is not active.
|
||||
func (c *Checker) Middleware(validator interface{ UserIDFromRequest(*http.Request) (string, error) }) func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
userID, err := validator.UserIDFromRequest(r)
|
||||
if err != nil {
|
||||
// Let the downstream handler deal with auth errors
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
if !c.IsActive(userID) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusPaymentRequired)
|
||||
json.NewEncoder(w).Encode(map[string]string{
|
||||
"error": "sync_inactive",
|
||||
"message": "Cloud Sync ist nicht aktiv. Aktiviere Sync in den Einstellungen.",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -7,10 +7,12 @@ import (
|
|||
|
||||
// Config holds all configuration for the sync server.
|
||||
type Config struct {
|
||||
Port int
|
||||
DatabaseURL string
|
||||
JWKSUrl string // mana-auth JWKS endpoint for JWT validation
|
||||
CORSOrigins string
|
||||
Port int
|
||||
DatabaseURL string
|
||||
JWKSUrl string // mana-auth JWKS endpoint for JWT validation
|
||||
CORSOrigins string
|
||||
ManaCreditsURL string // mana-credits service URL for billing checks
|
||||
ServiceKey string // Service-to-service auth key
|
||||
}
|
||||
|
||||
// Load reads configuration from environment variables with sensible defaults.
|
||||
|
|
@ -18,10 +20,12 @@ func Load() *Config {
|
|||
port, _ := strconv.Atoi(getEnv("PORT", "3050"))
|
||||
|
||||
return &Config{
|
||||
Port: port,
|
||||
DatabaseURL: getEnv("DATABASE_URL", "postgresql://mana:devpassword@localhost:5432/mana_sync"),
|
||||
JWKSUrl: getEnv("JWKS_URL", "http://localhost:3001/api/auth/jwks"),
|
||||
CORSOrigins: getEnv("CORS_ORIGINS", "http://localhost:5173,http://localhost:5188"),
|
||||
Port: port,
|
||||
DatabaseURL: getEnv("DATABASE_URL", "postgresql://mana:devpassword@localhost:5432/mana_sync"),
|
||||
JWKSUrl: getEnv("JWKS_URL", "http://localhost:3001/api/auth/jwks"),
|
||||
CORSOrigins: getEnv("CORS_ORIGINS", "http://localhost:5173,http://localhost:5188"),
|
||||
ManaCreditsURL: getEnv("MANA_CREDITS_URL", "http://localhost:3061"),
|
||||
ServiceKey: getEnv("MANA_SERVICE_KEY", "dev-service-key"),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue