diff --git a/services/mana-sync/internal/backup/handler.go b/services/mana-sync/internal/backup/handler.go index ecb458347..55f1e5124 100644 --- a/services/mana-sync/internal/backup/handler.go +++ b/services/mana-sync/internal/backup/handler.go @@ -26,19 +26,13 @@ package backup import ( - "archive/zip" - "crypto/sha256" - "encoding/hex" - "encoding/json" + "context" "fmt" - "io" "log/slog" "net/http" - "sort" "time" "github.com/mana/mana-sync/internal/auth" - syncproto "github.com/mana/mana-sync/internal/sync" "github.com/mana/mana-sync/internal/store" ) @@ -58,10 +52,9 @@ func NewHandler(s *store.Store, v *auth.Validator) *Handler { return &Handler{store: s, validator: v} } -// exportLine is the on-wire shape of one row inside events.jsonl. Field -// names mirror the sync-protocol Change shape so the restore side can feed -// lines straight into applyServerChanges() after running them through the -// migration chain keyed on schemaVersion. +// exportLine is the on-wire shape of one row inside events.jsonl. Shared +// with writer.go so both the HTTP path and the writer tests serialize +// identically. type exportLine struct { EventID string `json:"eventId"` SchemaVersion int `json:"schemaVersion"` @@ -75,11 +68,10 @@ type exportLine struct { CreatedAt string `json:"createdAt"` } -// manifestFile is the header object serialized as manifest.json. Kept small -// and declarative so tools can parse it without loading events.jsonl. +// manifestFile is the header object serialized as manifest.json. 
type manifestFile struct { FormatVersion int `json:"formatVersion"` - SchemaVersion int `json:"schemaVersion"` // max event schemaVersion this server knows + SchemaVersion int `json:"schemaVersion"` UserID string `json:"userId"` CreatedAt string `json:"createdAt"` EventCount int `json:"eventCount"` @@ -90,8 +82,10 @@ type manifestFile struct { SchemaVersionMax int `json:"schemaVersionMax,omitempty"` } -// HandleExport streams a .mana zip archive containing the user's full -// sync-event log plus a manifest with integrity hash. +// HandleExport is an HTTP shim over WriteBackup: it authenticates, sets +// download headers, and hands the response writer plus a store-backed +// iterator to the shared writer. Tests talk to WriteBackup directly with +// a synthetic iterator. func (h *Handler) HandleExport(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) @@ -113,133 +107,22 @@ func (h *Handler) HandleExport(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Accel-Buffering", "no") w.Header().Set("Cache-Control", "no-store") - zw := zip.NewWriter(w) - // Only close once — closing writes the central directory, which we need - // even if streaming errored partway so the file is at least a valid zip. 
- zipClosed := false - closeZip := func() { - if zipClosed { - return - } - zipClosed = true - if err := zw.Close(); err != nil { - slog.Error("backup: zip close failed", "user_id", userID, "error", err) - } - } - defer closeZip() - - // ─── events.jsonl entry ────────────────────────────────────── - eventsWriter, err := zw.CreateHeader(&zip.FileHeader{ - Name: "events.jsonl", - Method: zip.Deflate, - Modified: createdAt, - }) - if err != nil { - slog.Error("backup: create events.jsonl entry", "user_id", userID, "error", err) - return - } - - hasher := sha256.New() - // Tee so the deflate entry and the hash both see every byte — the hash - // is over the *decompressed* JSONL, which is what the restore side will - // re-hash after unzipping. - teed := io.MultiWriter(eventsWriter, hasher) - encoder := json.NewEncoder(teed) - - var ( - count int - appSet = make(map[string]struct{}) - minVer int - maxVer int - ) - - streamErr := h.store.StreamAllUserChanges(r.Context(), userID, func(row store.ChangeRow) error { - sv := row.SchemaVersion - if sv <= 0 { - sv = 1 - } - if count == 0 { - minVer = sv - maxVer = sv - } else { - if sv < minVer { - minVer = sv - } - if sv > maxVer { - maxVer = sv - } - } - line := exportLine{ - EventID: row.ID, - SchemaVersion: sv, - AppID: row.AppID, - Table: row.TableName, - RecordID: row.RecordID, - Op: row.Op, - Data: row.Data, - FieldTimestamps: row.FieldTimestamps, - ClientID: row.ClientID, - CreatedAt: row.CreatedAt.UTC().Format(time.RFC3339Nano), - } - if err := encoder.Encode(line); err != nil { - return err - } - appSet[row.AppID] = struct{}{} - count++ - return nil - }) - if streamErr != nil { - slog.Error("backup: stream failed", "user_id", userID, "written", count, "error", streamErr) - // Headers are flushed; best we can do is close the zip so the file - // isn't corrupt. 
The manifest won't land, and the absence of it is + iter := storeIterator(r.Context(), h.store, userID) + if err := WriteBackup(w, userID, createdAt, iter); err != nil { + // Headers are flushed so we cannot downgrade to a 500 here; closing + // the zip partial is the best we can do. The missing manifest is // itself a signal to the importer that the export was truncated. + slog.Error("backup: write failed", "user_id", userID, "error", err) return } - // ─── manifest.json entry ───────────────────────────────────── - apps := make([]string, 0, len(appSet)) - for a := range appSet { - apps = append(apps, a) - } - sort.Strings(apps) - - manifest := manifestFile{ - FormatVersion: BackupFormatVersion, - SchemaVersion: syncproto.CurrentSchemaVersion, - UserID: userID, - CreatedAt: createdAt.Format(time.RFC3339Nano), - EventCount: count, - EventsSHA256: hex.EncodeToString(hasher.Sum(nil)), - Apps: apps, - ProducedBy: "mana-sync", - SchemaVersionMin: minVer, - SchemaVersionMax: maxVer, - } - manifestBytes, err := json.MarshalIndent(manifest, "", " ") - if err != nil { - slog.Error("backup: marshal manifest", "user_id", userID, "error", err) - return - } - manifestWriter, err := zw.CreateHeader(&zip.FileHeader{ - Name: "manifest.json", - Method: zip.Deflate, - Modified: createdAt, - }) - if err != nil { - slog.Error("backup: create manifest entry", "user_id", userID, "error", err) - return - } - if _, err := manifestWriter.Write(manifestBytes); err != nil { - slog.Error("backup: write manifest", "user_id", userID, "error", err) - return - } - - closeZip() - slog.Info("backup export ok", - "user_id", userID, - "rows", count, - "apps", len(apps), - "schema_min", minVer, - "schema_max", maxVer, - ) + slog.Info("backup export ok", "user_id", userID) +} + +// storeIterator adapts store.Store.StreamAllUserChanges to the RowIterator +// shape WriteBackup expects, holding the request context in the closure. 
+func storeIterator(ctx context.Context, s *store.Store, userID string) RowIterator {
+	return func(fn func(store.ChangeRow) error) error {
+		return s.StreamAllUserChanges(ctx, userID, fn)
+	}
 }
diff --git a/services/mana-sync/internal/backup/writer.go b/services/mana-sync/internal/backup/writer.go
new file mode 100644
index 000000000..e4eeeb6d9
--- /dev/null
+++ b/services/mana-sync/internal/backup/writer.go
@@ -0,0 +1,152 @@
+package backup
+
+import (
+	"archive/zip"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
+	"sort"
+	"time"
+
+	syncproto "github.com/mana/mana-sync/internal/sync"
+	"github.com/mana/mana-sync/internal/store"
+)
+
+// RowIterator yields every sync_changes row that belongs in a backup,
+// invoking fn for each. The HTTP handler wires this to
+// store.StreamAllUserChanges; tests wire it to an in-memory slice so the
+// zip writer can be exercised without Postgres.
+type RowIterator func(fn func(store.ChangeRow) error) error
+
+// WriteBackup serializes the rows yielded by iter as a .mana zip archive
+// into dst. It takes a plain io.Writer so the HTTP streaming path and the
+// tests run the same production code.
+//
+// Single pass: events.jsonl is written first, one JSON object per row,
+// with a sha256 hasher teed behind the encoder; manifest.json lands as a
+// second zip entry carrying that hash, the row count, the sorted app IDs
+// and the observed schemaVersion range. Rows whose SchemaVersion is <= 0
+// are exported as version 1.
+//
+// It returns an error when userID is empty (nothing is written), when a
+// zip entry cannot be created or written, when iter fails, or when the
+// final Close fails. The zip writer is closed exactly once, by the
+// deferred call below: on success its error becomes the return value, so
+// err == nil means the central directory was written to dst; on failure
+// Close is still attempted so a partial archive gets a central directory,
+// and the earlier error is the one returned.
+func WriteBackup(dst io.Writer, userID string, createdAt time.Time, iter RowIterator) (err error) {
+	if userID == "" {
+		return fmt.Errorf("backup: empty userID")
+	}
+
+	zw := zip.NewWriter(dst)
+	defer func() {
+		// Calling Close a second time on a zip.Writer only yields a
+		// "closed twice" error, so close here once and surface the result
+		// unless an earlier failure already owns the return value.
+		if cerr := zw.Close(); err == nil {
+			err = cerr
+		}
+	}()
+
+	eventsWriter, err := zw.CreateHeader(&zip.FileHeader{
+		Name:     "events.jsonl",
+		Method:   zip.Deflate,
+		Modified: createdAt,
+	})
+	if err != nil {
+		return fmt.Errorf("backup: create events.jsonl entry: %w", err)
+	}
+
+	// Tee so the zip entry and the hash both see every encoded byte; the
+	// digest therefore covers the uncompressed JSONL.
+	hasher := sha256.New()
+	teed := io.MultiWriter(eventsWriter, hasher)
+	encoder := json.NewEncoder(teed)
+
+	var (
+		count  int
+		appSet = make(map[string]struct{})
+		minVer int
+		maxVer int
+	)
+
+	if err := iter(func(row store.ChangeRow) error {
+		// Non-positive versions are exported as 1.
+		sv := row.SchemaVersion
+		if sv <= 0 {
+			sv = 1
+		}
+		if count == 0 {
+			minVer = sv
+			maxVer = sv
+		} else {
+			if sv < minVer {
+				minVer = sv
+			}
+			if sv > maxVer {
+				maxVer = sv
+			}
+		}
+		line := exportLine{
+			EventID:         row.ID,
+			SchemaVersion:   sv,
+			AppID:           row.AppID,
+			Table:           row.TableName,
+			RecordID:        row.RecordID,
+			Op:              row.Op,
+			Data:            row.Data,
+			FieldTimestamps: row.FieldTimestamps,
+			ClientID:        row.ClientID,
+			CreatedAt:       row.CreatedAt.UTC().Format(time.RFC3339Nano),
+		}
+		if err := encoder.Encode(line); err != nil {
+			return err
+		}
+		appSet[row.AppID] = struct{}{}
+		count++
+		return nil
+	}); err != nil {
+		return fmt.Errorf("backup: iterate rows: %w", err)
+	}
+
+	// Map iteration order is random; sort so the manifest is deterministic.
+	apps := make([]string, 0, len(appSet))
+	for a := range appSet {
+		apps = append(apps, a)
+	}
+	sort.Strings(apps)
+
+	manifest := manifestFile{
+		FormatVersion:    BackupFormatVersion,
+		SchemaVersion:    syncproto.CurrentSchemaVersion,
+		UserID:           userID,
+		CreatedAt:        createdAt.UTC().Format(time.RFC3339Nano),
+		EventCount:       count,
+		EventsSHA256:     hex.EncodeToString(hasher.Sum(nil)),
+		Apps:             apps,
+		ProducedBy:       "mana-sync",
+		SchemaVersionMin: minVer,
+		SchemaVersionMax: maxVer,
+	}
+	manifestBytes, err := json.MarshalIndent(manifest, "", " ")
+	if err != nil {
+		return fmt.Errorf("backup: marshal manifest: %w", err)
+	}
+	manifestWriter, err := zw.CreateHeader(&zip.FileHeader{
+		Name:     "manifest.json",
+		Method:   zip.Deflate,
+		Modified: createdAt,
+	})
+	if err != nil {
+		return fmt.Errorf("backup: create manifest entry: %w", err)
+	}
+	if _, err := manifestWriter.Write(manifestBytes); err != nil {
+		return fmt.Errorf("backup: write manifest: %w", err)
+	}
+
+	return nil
+}
diff --git a/services/mana-sync/internal/backup/writer_test.go b/services/mana-sync/internal/backup/writer_test.go
new file mode 100644
index 000000000..5e19c5152
--- /dev/null
+++ b/services/mana-sync/internal/backup/writer_test.go
@@ -0,0 +1,279 @@
+package backup
+
+import (
+	"archive/zip"
+	"bytes"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"io"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/mana/mana-sync/internal/store"
+)
+
+// rowsIterator returns a RowIterator that walks a fixed slice of rows.
+// Used in place of the Postgres store so tests exercise the writer
+// end-to-end without a live DB.
+func rowsIterator(rows []store.ChangeRow) RowIterator {
+	return func(fn func(store.ChangeRow) error) error {
+		for _, r := range rows {
+			if err := fn(r); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+}
+
+func sampleRows() []store.ChangeRow {
+	ts := func(s string) time.Time {
+		t, err := time.Parse(time.RFC3339Nano, s)
+		if err != nil {
+			panic(err)
+		}
+		return t
+	}
+	return []store.ChangeRow{
+		{
+			ID:            "evt-1",
+			AppID:         "todo",
+			TableName:     "tasks",
+			RecordID:      "task-1",
+			Op:            "insert",
+			Data:          map[string]any{"title": "Buy milk"},
+			ClientID:      "client-a",
+			CreatedAt:     ts("2026-04-14T10:00:00.000Z"),
+			SchemaVersion: 1,
+		},
+		{
+			ID:              "evt-2",
+			AppID:           "todo",
+			TableName:       "tasks",
+			RecordID:        "task-1",
+			Op:              "update",
+			Data:            map[string]any{"completed": true},
+			FieldTimestamps: map[string]string{"completed": "2026-04-14T10:05:00.000Z"},
+			ClientID:        "client-a",
+			CreatedAt:       ts("2026-04-14T10:05:00.000Z"),
+			SchemaVersion:   1,
+		},
+		{
+			ID:            "evt-3",
+			AppID:         "calendar",
+			TableName:     "events",
+			RecordID:      "evt-42",
+			Op:            "insert",
+			Data:          map[string]any{"title": "Meeting"},
+			ClientID:      "client-b",
+			CreatedAt:     ts("2026-04-14T11:00:00.000Z"),
+			SchemaVersion: 1,
+		},
+	}
+}
+
+func TestWriteBackup_Roundtrip(t *testing.T) {
+	var buf bytes.Buffer
+	createdAt := time.Date(2026, 4, 14, 12, 0, 0, 0, time.UTC)
+
+	if err := WriteBackup(&buf, "user-123", createdAt, rowsIterator(sampleRows())); err != nil {
+		t.Fatalf("WriteBackup: %v", err)
+	}
+
+	// Archive must parse as a valid zip with exactly two entries.
+	zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
+	if err != nil {
+		t.Fatalf("zip.NewReader: %v", err)
+	}
+	if len(zr.File) != 2 {
+		t.Fatalf("expected 2 entries, got %d", len(zr.File))
+	}
+
+	events := readZipEntry(t, zr, "events.jsonl")
+	manifestBytes := readZipEntry(t, zr, "manifest.json")
+
+	// events.jsonl: three newline-separated JSON records in input order.
+	lines := strings.Split(strings.TrimRight(string(events), "\n"), "\n")
+	if len(lines) != 3 {
+		t.Fatalf("expected 3 events, got %d", len(lines))
+	}
+
+	// Event 1 is insert with data, no fieldTimestamps.
+	var e1 map[string]any
+	if err := json.Unmarshal([]byte(lines[0]), &e1); err != nil {
+		t.Fatalf("parse line 0: %v", err)
+	}
+	if e1["op"] != "insert" || e1["eventId"] != "evt-1" || e1["appId"] != "todo" {
+		t.Fatalf("event 0 unexpected: %#v", e1)
+	}
+	if _, ok := e1["fieldTimestamps"]; ok {
+		t.Fatalf("event 0 should omit fieldTimestamps (insert)")
+	}
+
+	// Event 2 is update with fieldTimestamps surfaced.
+	var e2 map[string]any
+	if err := json.Unmarshal([]byte(lines[1]), &e2); err != nil {
+		t.Fatalf("parse line 1: %v", err)
+	}
+	ft, ok := e2["fieldTimestamps"].(map[string]any)
+	if !ok {
+		t.Fatalf("event 1 fieldTimestamps missing")
+	}
+	if ft["completed"] != "2026-04-14T10:05:00.000Z" {
+		t.Fatalf("event 1 fieldTimestamps wrong: %#v", ft)
+	}
+
+	// Manifest: all declared fields match what we wrote.
+	var m manifestFile
+	if err := json.Unmarshal(manifestBytes, &m); err != nil {
+		t.Fatalf("parse manifest: %v", err)
+	}
+	if m.FormatVersion != BackupFormatVersion {
+		t.Fatalf("formatVersion=%d want %d", m.FormatVersion, BackupFormatVersion)
+	}
+	if m.UserID != "user-123" {
+		t.Fatalf("userId=%q want user-123", m.UserID)
+	}
+	if m.EventCount != 3 {
+		t.Fatalf("eventCount=%d want 3", m.EventCount)
+	}
+	if m.SchemaVersionMin != 1 || m.SchemaVersionMax != 1 {
+		t.Fatalf("schemaVersion range=[%d,%d] want [1,1]", m.SchemaVersionMin, m.SchemaVersionMax)
+	}
+	if len(m.Apps) != 2 || m.Apps[0] != "calendar" || m.Apps[1] != "todo" {
+		t.Fatalf("apps=%v want sorted [calendar todo]", m.Apps)
+	}
+	if m.ProducedBy != "mana-sync" {
+		t.Fatalf("producedBy=%q want mana-sync", m.ProducedBy)
+	}
+
+	// eventsSha256 must match a fresh SHA of the decompressed events body.
+	h := sha256.New()
+	h.Write(events)
+	want := hex.EncodeToString(h.Sum(nil))
+	if m.EventsSHA256 != want {
+		t.Fatalf("eventsSha256 mismatch: manifest=%s recomputed=%s", m.EventsSHA256, want)
+	}
+}
+
+func TestWriteBackup_EmptyUser(t *testing.T) {
+	var buf bytes.Buffer
+	err := WriteBackup(&buf, "", time.Now(), rowsIterator(nil))
+	if err == nil {
+		t.Fatal("expected error for empty userID")
+	}
+	if !strings.Contains(err.Error(), "empty userID") {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
+
+func TestWriteBackup_NoRows(t *testing.T) {
+	var buf bytes.Buffer
+	createdAt := time.Date(2026, 4, 14, 12, 0, 0, 0, time.UTC)
+
+	if err := WriteBackup(&buf, "user-x", createdAt, rowsIterator(nil)); err != nil {
+		t.Fatalf("WriteBackup: %v", err)
+	}
+
+	zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
+	if err != nil {
+		t.Fatalf("zip.NewReader: %v", err)
+	}
+
+	events := readZipEntry(t, zr, "events.jsonl")
+	if len(events) != 0 {
+		t.Fatalf("expected empty events.jsonl, got %d bytes", len(events))
+	}
+
+	manifestBytes := readZipEntry(t, zr, "manifest.json")
+	var m manifestFile
+	if err := json.Unmarshal(manifestBytes, &m); err != nil {
+		t.Fatalf("parse manifest: %v", err)
+	}
+	if m.EventCount != 0 {
+		t.Fatalf("eventCount=%d want 0", m.EventCount)
+	}
+	if len(m.Apps) != 0 {
+		t.Fatalf("apps=%v want empty", m.Apps)
+	}
+	// Empty body still needs a valid sha.
+	if m.EventsSHA256 == "" {
+		t.Fatal("eventsSha256 empty even for zero-row export")
+	}
+}
+
+func TestWriteBackup_DefaultsSchemaVersionZeroRowsToOne(t *testing.T) {
+	// Legacy rows stored before the schema_version column existed scan as
+	// 0. The writer must clamp them to 1 so the manifest's
+	// schemaVersionMin/Max never claims a nonexistent protocol version.
+	rows := []store.ChangeRow{{
+		ID: "e1", AppID: "todo", TableName: "tasks", RecordID: "t1",
+		Op: "insert", Data: map[string]any{"x": 1}, ClientID: "c",
+		CreatedAt: time.Now(), SchemaVersion: 0,
+	}}
+	var buf bytes.Buffer
+	if err := WriteBackup(&buf, "u", time.Now(), rowsIterator(rows)); err != nil {
+		t.Fatalf("WriteBackup: %v", err)
+	}
+	zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
+	if err != nil {
+		t.Fatalf("zip.NewReader: %v", err)
+	}
+	events := readZipEntry(t, zr, "events.jsonl")
+	if !strings.Contains(string(events), `"schemaVersion":1`) {
+		t.Fatalf("expected schemaVersion:1 in events body, got: %s", events)
+	}
+}
+
+func TestWriteBackup_IteratorError(t *testing.T) {
+	// A failing iterator must surface as a wrapped error, and the bytes
+	// written so far must still form a readable zip without a manifest.
+	boom := errors.New("boom")
+	rows := sampleRows()
+	iter := func(fn func(store.ChangeRow) error) error {
+		if err := fn(rows[0]); err != nil {
+			return err
+		}
+		return boom
+	}
+
+	var buf bytes.Buffer
+	err := WriteBackup(&buf, "user-123", time.Now(), iter)
+	if !errors.Is(err, boom) {
+		t.Fatalf("err=%v want wrapped boom", err)
+	}
+
+	zr, zerr := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
+	if zerr != nil {
+		t.Fatalf("zip.NewReader: %v", zerr)
+	}
+	if len(zr.File) != 1 || zr.File[0].Name != "events.jsonl" {
+		t.Fatalf("expected only events.jsonl, got %d entries", len(zr.File))
+	}
+}
+
+// readZipEntry reads the named entry out of a zip archive in full. Fails
+// the test if the entry is missing or cannot be decompressed.
+func readZipEntry(t *testing.T, zr *zip.Reader, name string) []byte {
+	t.Helper()
+	for _, f := range zr.File {
+		if f.Name != name {
+			continue
+		}
+		rc, err := f.Open()
+		if err != nil {
+			t.Fatalf("open %s: %v", name, err)
+		}
+		defer rc.Close()
+		body, err := io.ReadAll(rc)
+		if err != nil {
+			t.Fatalf("read %s: %v", name, err)
+		}
+		return body
+	}
+	t.Fatalf("entry %q not found in zip", name)
+	return nil
+}