managarten/services/mana-sync/internal/sync/handler.go
Till JS 38d35247cd feat(spaces): end-to-end shared-space sync (membership lookup + plaintext)
Closes the gap between "invite flow UI exists" and "two users in the
same space actually see each other's data". Three pieces land together
because they're meaningless without each other.

mana-auth — new internal endpoint:
  GET /api/v1/internal/users/:userId/memberships
  Returns [{organizationId, role}, ...] for the user. mana-sync uses
  this to populate the multi-member RLS session config.

mana-sync — membership lookup:
  new internal/memberships package with an HTTP client + 5 min
  per-user cache, fail-open (empty list = pre-Spaces behavior).
  Config gets MANA_AUTH_URL (default http://localhost:3001).
  Handler.NewHandler takes the Lookup. Every Push/Pull/Stream call
  now passes spaceIDsFor(userID) to Store methods.
  GetChangesSince + GetAllChangesSince extend their WHERE clause:
    WHERE (user_id = $1 OR space_id = ANY($memberSpaces))
  so co-members see each other's rows, not just the author.

apps/web — encryption skip for shared-space records:
  encryptRecord now checks record.spaceId:
    - `_personal:<userId>` sentinel OR no active shared space → encrypt
      with user master key (E2E as today).
    - Active space resolves to non-personal type AND spaceId matches
      that space → skip encryption; write lands plaintext.
  decryptRecord is unchanged because its per-field isEncrypted() guard
  already passes plaintext through.
  Phase-1 compromise: shared-space data is protected by server RLS
  only, not E2E. Phase 2 adds per-Space shared keys with per-member
  wrap — tracked in docs/plans/spaces-foundation.md.

Plus docs/plans/shared-space-smoketest.md: a step-by-step two-user test
with expected results and debugging hints for when things go wrong.

Build + go test + web check all green.

Plan: docs/plans/spaces-foundation.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 20:46:53 +02:00

490 lines
14 KiB
Go

package sync
import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"github.com/mana/mana-sync/internal/auth"
"github.com/mana/mana-sync/internal/memberships"
"github.com/mana/mana-sync/internal/store"
"github.com/mana/mana-sync/internal/ws"
)
// Handler handles sync HTTP endpoints.
// Every handler authenticates the caller via validator, reads/writes
// the change log through store, and pushes live-update notifications
// through hub. memberships may be nil (see NewHandler).
type Handler struct {
store *store.Store // persisted sync change log
validator *auth.Validator // resolves the calling user from the request
hub *ws.Hub // fan-out of change notifications to connected clients
memberships *memberships.Lookup // shared-space membership lookup; nil = no Spaces
}
// NewHandler constructs a sync Handler wired to the given store, token
// validator, and notification hub.
// The memberships lookup may be nil — if so, the handler treats every
// user as having no shared-space memberships (same as pre-Spaces
// behavior).
func NewHandler(s *store.Store, v *auth.Validator, h *ws.Hub, m *memberships.Lookup) *Handler {
	handler := &Handler{
		store:       s,
		validator:   v,
		hub:         h,
		memberships: m,
	}
	return handler
}
// spaceIDsFor resolves the caller's shared-space membership IDs, or
// returns nil when no lookup is configured. The result feeds the
// session config that the multi-member RLS policy reads.
func (h *Handler) spaceIDsFor(userID string) []string {
	lookup := h.memberships
	if lookup == nil {
		return nil
	}
	return lookup.For(userID)
}
// maxBodySize is the maximum allowed request body (10 MB).
// Enforced via http.MaxBytesReader in HandleSync so an oversized
// changeset fails the JSON decode instead of exhausting memory.
const maxBodySize = 10 * 1024 * 1024
// extractSpaceID pulls the Space id out of an incoming change payload,
// checking the protocol's preferred locations in order:
//
// 1. Top-level `spaceId` (v28+ clients stamp it explicitly — cheap to parse).
// 2. `data.spaceId` — present on inserts from pre-protocol clients that only
// send the full record object.
// 3. `fields.spaceId.value` — present on updates that happen to touch the
// scope field (rare; spaceId is marked immutable in the Dexie updating
// hook, so in practice this branch only covers edge cases).
//
// Returns the empty string when none of the above yield a usable value. An
// empty string lands as SQL NULL in the space_id column so partial indexes
// keep skipping legacy rows cleanly.
func extractSpaceID(change Change) string {
	if id := change.SpaceID; id != "" {
		return id
	}
	// Indexing a nil map is safe in Go, so no explicit nil checks are
	// needed for Data or Fields below.
	if raw, found := change.Data["spaceId"]; found {
		if id, ok := raw.(string); ok && id != "" {
			return id
		}
	}
	if fc := change.Fields["spaceId"]; fc != nil {
		if id, ok := fc.Value.(string); ok && id != "" {
			return id
		}
	}
	return ""
}
// changeFromRow projects a stored sync_changes row onto the wire Change
// shape. eventId and schemaVersion ride along so clients can dedup on
// replay and route the payload through their migration chain.
func changeFromRow(row store.ChangeRow) Change {
	version := row.SchemaVersion
	if version <= 0 {
		// Rows written before schema versioning existed count as v1.
		version = 1
	}
	out := Change{
		EventID:       row.ID,
		SchemaVersion: version,
		Table:         row.TableName,
		ID:            row.RecordID,
		Op:            row.Op,
		SpaceID:       row.SpaceID,
		Actor:         row.Actor,
	}
	switch row.Op {
	case "insert":
		// Inserts carry the full record payload.
		out.Data = row.Data
	case "update":
		// Updates carry only touched fields, each paired with its LWW
		// timestamp; fields lacking a stored value are skipped.
		fields := make(map[string]*FieldChange, len(row.FieldTimestamps))
		for name, updatedAt := range row.FieldTimestamps {
			if value, ok := row.Data[name]; ok {
				fields[name] = &FieldChange{Value: value, UpdatedAt: updatedAt}
			}
		}
		out.Fields = fields
	case "delete":
		if ts, ok := row.Data["deletedAt"].(string); ok {
			out.DeletedAt = &ts
		}
	}
	return out
}
// validOps are the allowed sync operation types.
// Checked in HandleSync's up-front validation pass before any change is
// written to the store.
var validOps = map[string]bool{"insert": true, "update": true, "delete": true}
// HandleSync processes a POST /sync/:appId request.
// Receives a changeset from a client, records changes, and returns the server delta.
//
// Flow: authenticate → validate the whole changeset up front → persist
// each change → read back the server-side delta (including co-members'
// shared-space rows) → respond and notify the user's other clients.
func (h *Handler) HandleSync(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
// Authenticate
userID, err := h.validator.UserIDFromRequest(r)
if err != nil {
http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}
// Parse app ID from path: /sync/{appId}
appID := r.PathValue("appId")
if appID == "" {
http.Error(w, "missing appId", http.StatusBadRequest)
return
}
// Limit request body size so an oversized payload fails the decode
// instead of exhausting memory.
r.Body = http.MaxBytesReader(w, r.Body, maxBodySize)
// Parse changeset
var changeset Changeset
if err := json.NewDecoder(r.Body).Decode(&changeset); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
// Normalize + validate protocol version. Pre-M2 clients omit the field
// (treated as v1); a client newer than this build is refused so we don't
// silently store events we can't fully interpret.
schemaVersion := changeset.SchemaVersion
if schemaVersion <= 0 {
schemaVersion = 1
}
if schemaVersion > MaxSupportedSchemaVersion {
http.Error(w, fmt.Sprintf("unsupported schemaVersion %d (max %d)", schemaVersion, MaxSupportedSchemaVersion), http.StatusBadRequest)
return
}
// Validate all changes before writing anything, so a malformed batch
// is rejected without partial persistence.
for i, change := range changeset.Changes {
if !validOps[change.Op] {
http.Error(w, fmt.Sprintf("invalid op %q in change %d", change.Op, i), http.StatusBadRequest)
return
}
if change.Table == "" || change.ID == "" {
http.Error(w, fmt.Sprintf("missing table or id in change %d", i), http.StatusBadRequest)
return
}
}
ctx := r.Context()
// Client id: the header wins; the body field is a fallback.
clientID := r.Header.Get("X-Client-Id")
if clientID == "" {
clientID = changeset.ClientID
}
// Process each change
affectedTables := make(map[string]struct{})
for _, change := range changeset.Changes {
affectedTables[change.Table] = struct{}{}
// Build data and field timestamps
data := change.Data
fieldTimestamps := make(map[string]string)
if change.Op == "update" && change.Fields != nil {
data = make(map[string]any)
for field, fc := range change.Fields {
data[field] = fc.Value
fieldTimestamps[field] = fc.UpdatedAt
}
}
if change.Op == "delete" {
if data == nil {
data = make(map[string]any)
}
if change.DeletedAt != nil {
data["deletedAt"] = *change.DeletedAt
}
}
// Per-change schemaVersion falls back to the changeset-level value
// so a well-formed pre-M2 change nested in an M2 changeset still
// lands on the right version.
rowSchemaVersion := change.SchemaVersion
if rowSchemaVersion <= 0 {
rowSchemaVersion = schemaVersion
}
// spaceId for this change: prefer the top-level field (post-v28
// clients stamp it explicitly), fall back to data.spaceId for
// inserts and fields.spaceId.value for updates so pre-protocol
// clients still get indexed correctly. Empty string lands as SQL
// NULL via RecordChange.
spaceID := extractSpaceID(change)
err := h.store.RecordChange(ctx, appID, change.Table, change.ID, userID, spaceID, change.Op, clientID, data, fieldTimestamps, rowSchemaVersion, change.Actor)
if err != nil {
// NOTE(review): a store failure mid-batch leaves earlier changes
// persisted; recovery presumably relies on the client retrying the
// whole changeset idempotently — confirm RecordChange upserts.
slog.Error("failed to record change", "error", err, "table", change.Table, "id", change.ID)
http.Error(w, "failed to record change: "+err.Error(), http.StatusInternalServerError)
return
}
}
// Get server changes since client's last sync (excluding client's own changes)
serverChanges, err := h.store.GetAllChangesSince(ctx, userID, appID, changeset.Since, clientID, h.spaceIDsFor(userID))
if err != nil {
slog.Error("failed to get server changes", "error", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
// Convert store rows to sync changes
responseChanges := make([]Change, 0, len(serverChanges))
for _, row := range serverChanges {
responseChanges = append(responseChanges, changeFromRow(row))
}
now := time.Now().UTC().Format(time.RFC3339Nano)
resp := SyncResponse{
ServerChanges: responseChanges,
Conflicts: []SyncConflict{}, // Field-level LWW doesn't produce conflicts
SyncedUntil: now,
}
// Notify other connected clients via WebSocket/SSE.
// NOTE(review): only this user's own other clients are notified;
// co-members of a shared space see these rows on their next pull or
// stream query, not via push — confirm that is the intended Phase-1
// behavior.
if len(affectedTables) > 0 {
tables := make([]string, 0, len(affectedTables))
for t := range affectedTables {
tables = append(tables, t)
}
h.hub.NotifyUser(userID, appID, clientID, tables)
}
w.Header().Set("Content-Type", "application/json")
// Encode error intentionally unchecked: status and headers are already
// written, so there is nothing actionable left to send the client.
json.NewEncoder(w).Encode(resp)
}
// HandlePull processes a GET /sync/:appId/pull request.
// Returns server changes for a single collection newer than the given
// timestamp, paginated at 1000 rows per response.
func (h *Handler) HandlePull(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	userID, err := h.validator.UserIDFromRequest(r)
	if err != nil {
		http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
		return
	}
	appID := r.PathValue("appId")
	if appID == "" {
		http.Error(w, "missing appId", http.StatusBadRequest)
		return
	}
	query := r.URL.Query()
	collection := query.Get("collection")
	since := query.Get("since")
	clientID := r.Header.Get("X-Client-Id")
	if collection == "" || since == "" {
		http.Error(w, "missing collection or since parameter", http.StatusBadRequest)
		return
	}
	// Ask for one row beyond the page size purely to detect pagination
	// without a second query.
	const batchLimit = 1000
	rows, err := h.store.GetChangesSince(r.Context(), userID, appID, collection, since, clientID, batchLimit+1, h.spaceIDsFor(userID))
	if err != nil {
		slog.Error("failed to get changes", "error", err)
		http.Error(w, "internal error", http.StatusInternalServerError)
		return
	}
	hasMore := len(rows) > batchLimit
	if hasMore {
		rows = rows[:batchLimit]
	}
	changes := make([]Change, 0, len(rows))
	for _, row := range rows {
		changes = append(changes, changeFromRow(row))
	}
	// A paginated response cursors on the last returned row so the next
	// pull resumes exactly there; a complete page is stamped with the
	// current server time.
	syncedUntil := time.Now().UTC().Format(time.RFC3339Nano)
	if hasMore && len(rows) > 0 {
		syncedUntil = rows[len(rows)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(SyncResponse{
		ServerChanges: changes,
		Conflicts:     []SyncConflict{},
		SyncedUntil:   syncedUntil,
		HasMore:       hasMore,
	})
}
// HandleStream provides a Server-Sent Events (SSE) endpoint that streams
// changes to the client in real-time. Replaces the WebSocket notification +
// HTTP pull round-trip with a single persistent connection.
//
// GET /sync/{appId}/stream?collections=tasks,projects&since=2024-01-01T10:00:00Z
//
// Protocol: one "changes" event per (collection, batch), each carrying a
// cursor (syncedUntil) and hasMore flag, plus a "heartbeat" event every
// 30 seconds to keep intermediaries from closing the idle connection.
func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
userID, err := h.validator.UserIDFromRequest(r)
if err != nil {
http.Error(w, "unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}
appID := r.PathValue("appId")
if appID == "" {
http.Error(w, "missing appId", http.StatusBadRequest)
return
}
clientID := r.Header.Get("X-Client-Id")
since := r.URL.Query().Get("since")
if since == "" {
// No cursor supplied: replay from the epoch (full history).
since = "1970-01-01T00:00:00.000Z"
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
// SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
ctx := r.Context()
const batchLimit = 1000
// Parse requested collections
var collections []string
if q := r.URL.Query().Get("collections"); q != "" {
for _, c := range strings.Split(q, ",") {
c = strings.TrimSpace(c)
if c != "" {
collections = append(collections, c)
}
}
}
// Track cursors per collection — default to now() if no initial data
now := time.Now().UTC().Format(time.RFC3339Nano)
cursors := make(map[string]string)
for _, coll := range collections {
cursors[coll] = since
}
// Initial sync: send pending changes since cursor for each collection.
// NOTE(review): membership is resolved once per stream; a user added to
// a Space mid-stream won't see its rows until reconnect — confirm that
// is acceptable for Phase 1.
memberSpaceIDs := h.spaceIDsFor(userID)
for _, coll := range collections {
changes, err := h.store.GetChangesSince(ctx, userID, appID, coll, since, clientID, batchLimit+1, memberSpaceIDs)
if err != nil {
slog.Error("SSE initial pull failed", "error", err, "collection", coll)
cursors[coll] = now // Default to now on error
continue
}
// batchLimit+1 rows were requested purely to detect pagination.
hasMore := len(changes) > batchLimit
if hasMore {
changes = changes[:batchLimit]
}
if len(changes) > 0 {
cursor := changes[len(changes)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
cursors[coll] = cursor
sendChangeEvent(w, coll, h.convertChanges(changes), cursor, hasMore)
flusher.Flush()
} else {
// No initial data — set cursor to now so live updates work
cursors[coll] = now
}
}
// Subscribe to hub notifications for real-time updates
ch := h.hub.Subscribe(userID)
defer h.hub.Unsubscribe(userID, ch)
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case notification := <-ch:
if notification.AppID != appID {
continue
}
for _, table := range notification.Tables {
// NOTE(review): tables outside the requested collections are also
// streamed here (their cursor falls back to `since`) — verify this
// broadcast-everything behavior is intended.
cursor := cursors[table]
if cursor == "" {
cursor = since
}
changes, err := h.store.GetChangesSince(ctx, userID, appID, table, cursor, clientID, batchLimit+1, memberSpaceIDs)
if err != nil || len(changes) == 0 {
// Best-effort: a transient query error drops this notification;
// the next one (or a client reconnect) catches up.
continue
}
hasMore := len(changes) > batchLimit
if hasMore {
changes = changes[:batchLimit]
}
newCursor := changes[len(changes)-1].CreatedAt.UTC().Format(time.RFC3339Nano)
cursors[table] = newCursor
sendChangeEvent(w, table, h.convertChanges(changes), newCursor, hasMore)
flusher.Flush()
}
case <-heartbeat.C:
// Keep-alive frame so proxies don't drop the idle connection.
fmt.Fprint(w, "event: heartbeat\ndata: {}\n\n")
flusher.Flush()
case <-ctx.Done():
// Client disconnected or server is shutting down.
return
}
}
}
// convertChanges transforms store rows into sync Change objects.
//
// Delegates to changeFromRow so SSE frames carry exactly the same
// projection as pull responses — including eventId, schemaVersion, and
// spaceId, which the previous hand-rolled copy of this loop dropped and
// which clients need for replay dedup and migration routing (see
// changeFromRow). Adding those fields is backward-compatible: they are
// extra JSON keys older clients simply ignore.
func (h *Handler) convertChanges(rows []store.ChangeRow) []Change {
	changes := make([]Change, 0, len(rows))
	for _, row := range rows {
		changes = append(changes, changeFromRow(row))
	}
	return changes
}
// sendChangeEvent writes one SSE "changes" frame to w. The payload
// mirrors the pull-response shape: table, changes, cursor (syncedUntil),
// and hasMore. The caller is responsible for flushing the writer.
func sendChangeEvent(w http.ResponseWriter, table string, changes []Change, syncedUntil string, hasMore bool) {
	event := map[string]any{
		"table":       table,
		"changes":     changes,
		"syncedUntil": syncedUntil,
		"hasMore":     hasMore,
	}
	data, err := json.Marshal(event)
	if err != nil {
		// Change payloads originate from JSON-decoded rows, so this
		// should be unreachable — but log and skip the frame rather
		// than silently emitting nothing (the error was previously
		// discarded with `_`).
		slog.Error("failed to marshal SSE event", "error", err, "table", table)
		return
	}
	fmt.Fprintf(w, "event: changes\ndata: %s\n\n", data)
}