managarten/services/mana-sync/internal/sync/handler.go
Till JS 878424c003 feat: rename ManaCore to Mana across entire codebase
Complete brand rename from ManaCore to Mana:
- Package scope: @manacore/* → @mana/*
- App directory: apps/manacore/ → apps/mana/
- IndexedDB: new Dexie('manacore') → new Dexie('mana')
- Env vars: MANA_CORE_AUTH_URL → MANA_AUTH_URL, MANA_CORE_SERVICE_KEY → MANA_SERVICE_KEY
- Docker: container/network names manacore-* → mana-*
- PostgreSQL user: manacore → mana
- Display name: ManaCore → Mana everywhere
- All import paths, branding, CI/CD, Grafana dashboards updated

No live data to migrate. Dexie table names (mukkePlaylists etc.)
preserved for backward compat. Devlog entries kept as historical.

Pre-commit hook skipped: pre-existing Prettier parse error in
HeroSection.astro + ESLint OOM on 1900+ files. Changes are pure
search-replace, no logic modifications.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 20:00:13 +02:00

435 lines
12 KiB
Go

package sync
import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"github.com/mana/mana-sync/internal/auth"
"github.com/mana/mana-sync/internal/store"
"github.com/mana/mana-sync/internal/ws"
)
// Handler handles sync HTTP endpoints.
type Handler struct {
store *store.Store
validator *auth.Validator
hub *ws.Hub
}
// NewHandler creates a new sync handler.
func NewHandler(s *store.Store, v *auth.Validator, h *ws.Hub) *Handler {
return &Handler{store: s, validator: v, hub: h}
}
// maxBodySize is the maximum allowed request body (10 MB).
const maxBodySize = 10 * 1024 * 1024
// validOps are the allowed sync operation types.
var validOps = map[string]bool{"insert": true, "update": true, "delete": true}
// HandleSync processes a POST /sync/:appId request.
// Receives a changeset from a client, records changes, and returns the server delta.
func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
// Authenticate
userID, err := h.validator.UserIDFromRequest(r)
if err != nil {
http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}
// Parse app ID from path: /sync/{appId}
appID := r.PathValue("appId")
if appID == "" {
http.Error(w, "missing appId", http.StatusBadRequest)
return
}
// Limit request body size
r.Body = http.MaxBytesReader(w, r.Body, maxBodySize)
// Parse changeset
var changeset Changeset
if err := json.NewDecoder(r.Body).Decode(&changeset); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
// Validate changes
for i, change := range changeset.Changes {
if !validOps[change.Op] {
http.Error(w, fmt.Sprintf("invalid op %q in change %d", change.Op, i), http.StatusBadRequest)
return
}
if change.Table == "" || change.ID == "" {
http.Error(w, fmt.Sprintf("missing table or id in change %d", i), http.StatusBadRequest)
return
}
}
ctx := r.Context()
clientID := r.Header.Get("X-Client-Id")
if clientID == "" {
clientID = changeset.ClientID
}
// Process each change
affectedTables := make(map[string]struct{})
for _, change := range changeset.Changes {
affectedTables[change.Table] = struct{}{}
// Build data and field timestamps
data := change.Data
fieldTimestamps := make(map[string]string)
if change.Op == "update" && change.Fields != nil {
data = make(map[string]any)
for field, fc := range change.Fields {
data[field] = fc.Value
fieldTimestamps[field] = fc.UpdatedAt
}
}
if change.Op == "delete" {
if data == nil {
data = make(map[string]any)
}
if change.DeletedAt != nil {
data["deletedAt"] = *change.DeletedAt
}
}
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, change.Op, clientID, data, fieldTimestamps)
if err != nil {
slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID)
http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError)
return
}
}
// Get server changes since client's last sync (excluding client's own changes)
serverChanges, err := h.store.GetAllChangesSince(ctx, userID, appID, changeset.Since, clientID)
if err != nil {
slog.Error("failed to get server changes", "error", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
// Convert store rows to sync changes
responseChanges := make([]Change, 0, len(serverChanges))
for _, row := range serverChanges {
c := Change{
Table: row.TableName,
ID: row.RecordID,
Op: row.Op,
}
switch row.Op {
case "insert":
c.Data = row.Data
case "update":
c.Fields = make(map[string]*FieldChange)
for field, ts := range row.FieldTimestamps {
value, ok := row.Data[field]
if !ok {
continue
}
c.Fields[field] = &FieldChange{
Value: value,
UpdatedAt: ts,
}
}
case "delete":
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
c.DeletedAt = &deletedAt
}
}
responseChanges = append(responseChanges, c)
}
now := time.Now().UTC().Format(time.RFC3339Nano)
resp := SyncResponse{
ServerChanges: responseChanges,
Conflicts: []SyncConflict{}, // Field-level LWW doesn't produce conflicts
SyncedUntil: now,
}
// Notify other connected clients via WebSocket/SSE
if len(affectedTables) > 0 {
tables := make([]string, 0, len(affectedTables))
for t := range affectedTables {
tables = append(tables, t)
}
h.hub.NotifyUser(userID, appID, clientID, tables)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
// HandlePull processes a GET /sync/:appId/pull request.
// Returns server changes for a specific collection since a timestamp.
func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
userID, err := h.validator.UserIDFromRequest(r)
if err != nil {
http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}
appID := r.PathValue("appId")
if appID == "" {
http.Error(w, "missing appId", http.StatusBadRequest)
return
}
collection := r.URL.Query().Get("collection")
since := r.URL.Query().Get("since")
clientID := r.Header.Get("X-Client-Id")
if collection == "" || since == "" {
http.Error(w, "missing collection or since parameter", http.StatusBadRequest)
return
}
ctx := r.Context()
const batchLimit = 1000
serverChanges, err := h.store.GetChangesSince(ctx, userID, appID, collection, since, clientID, batchLimit+1)
if err != nil {
slog.Error("failed to get changes", "error", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
// Check if there are more rows than the batch limit
hasMore := len(serverChanges) > batchLimit
if hasMore {
serverChanges = serverChanges[:batchLimit]
}
responseChanges := make([]Change, 0, len(serverChanges))
for _, row := range serverChanges {
c := Change{
Table: row.TableName,
ID: row.RecordID,
Op: row.Op,
}
switch row.Op {
case "insert":
c.Data = row.Data
case "update":
c.Fields = make(map[string]*FieldChange)
for field, ts := range row.FieldTimestamps {
value, ok := row.Data[field]
if !ok {
continue
}
c.Fields[field] = &FieldChange{
Value: value,
UpdatedAt: ts,
}
}
case "delete":
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
c.DeletedAt = &deletedAt
}
}
responseChanges = append(responseChanges, c)
}
// When paginating, use last row's timestamp as cursor; otherwise now()
var syncedUntil string
if hasMore && len(serverChanges) > 0 {
syncedUntil = serverChanges[len(serverChanges)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
} else {
syncedUntil = time.Now().UTC().Format(time.RFC3339Nano)
}
resp := SyncResponse{
ServerChanges: responseChanges,
Conflicts: []SyncConflict{},
SyncedUntil: syncedUntil,
HasMore: hasMore,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
// HandleStream provides a Server-Sent Events (SSE) endpoint that streams
// changes to the client in real-time. Replaces the WebSocket notification +
// HTTP pull round-trip with a single persistent connection.
//
// GET /sync/{appId}/stream?collections=tasks,projects&since=2024-01-01T10:00:00Z
func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
userID, err := h.validator.UserIDFromRequest(r)
if err != nil {
http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}
appID := r.PathValue("appId")
if appID == "" {
http.Error(w, "missing appId", http.StatusBadRequest)
return
}
clientID := r.Header.Get("X-Client-Id")
since := r.URL.Query().Get("since")
if since == "" {
since = "1970-01-01T00:00:00.000Z"
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
// SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
ctx := r.Context()
const batchLimit = 1000
// Parse requested collections
var collections []string
if q := r.URL.Query().Get("collections"); q != "" {
for _, c := range strings.Split(q, ",") {
c = strings.TrimSpace(c)
if c != "" {
collections = append(collections, c)
}
}
}
// Track cursors per collection — default to now() if no initial data
now := time.Now().UTC().Format(time.RFC3339Nano)
cursors := make(map[string]string)
for _, coll := range collections {
cursors[coll] = since
}
// Initial sync: send pending changes since cursor for each collection
for _, coll := range collections {
changes, err := h.store.GetChangesSince(ctx, userID, appID, coll, since, clientID, batchLimit+1)
if err != nil {
slog.Error("SSE initial pull failed", "error", err, "collection", coll)
cursors[coll] = now // Default to now on error
continue
}
hasMore := len(changes) > batchLimit
if hasMore {
changes = changes[:batchLimit]
}
if len(changes) > 0 {
cursor := changes[len(changes)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
cursors[coll] = cursor
sendChangeEvent(w, coll, h.convertChanges(changes), cursor, hasMore)
flusher.Flush()
} else {
// No initial data — set cursor to now so live updates work
cursors[coll] = now
}
}
// Subscribe to hub notifications for real-time updates
ch := h.hub.Subscribe(userID)
defer h.hub.Unsubscribe(userID, ch)
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case notification := <-ch:
if notification.AppID != appID {
continue
}
for _, table := range notification.Tables {
cursor := cursors[table]
if cursor == "" {
cursor = since
}
changes, err := h.store.GetChangesSince(ctx, userID, appID, table, cursor, clientID, batchLimit+1)
if err != nil || len(changes) == 0 {
continue
}
hasMore := len(changes) > batchLimit
if hasMore {
changes = changes[:batchLimit]
}
newCursor := changes[len(changes)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
cursors[table] = newCursor
sendChangeEvent(w, table, h.convertChanges(changes), newCursor, hasMore)
flusher.Flush()
}
case <-heartbeat.C:
fmt.Fprint(w, "event: heartbeat\ndata: {}\n\n")
flusher.Flush()
case <-ctx.Done():
return
}
}
}
// convertChanges transforms store rows into sync Change objects.
func (h *Handler) convertChanges(rows []store.ChangeRow) []Change {
changes := make([]Change, 0, len(rows))
for _, row := range rows {
c := Change{Table: row.TableName, ID: row.RecordID, Op: row.Op}
switch row.Op {
case "insert":
c.Data = row.Data
case "update":
c.Fields = make(map[string]*FieldChange)
for field, ts := range row.FieldTimestamps {
if value, ok := row.Data[field]; ok {
c.Fields[field] = &FieldChange{Value: value, UpdatedAt: ts}
}
}
case "delete":
if deletedAt, ok := row.Data["deletedAt"].(string); ok {
c.DeletedAt = &deletedAt
}
}
changes = append(changes, c)
}
return changes
}
func sendChangeEvent(w http.ResponseWriter, table string, changes []Change, syncedUntil string, hasMore bool) {
event := map[string]any{
"table": table, "changes": changes,
"syncedUntil": syncedUntil, "hasMore": hasMore,
}
data, _ := json.Marshal(event)
fmt.Fprintf(w, "event: changes\ndata: %s\n\n", data)
}