test(sync): extract WriteBackup + 4 Go integration tests

Refactor: HTTP handler becomes a thin shim over a pure WriteBackup(w,
userID, createdAt, iter) function. RowIterator abstracts the store, so
tests feed synthetic ChangeRow slices and production feeds
StreamAllUserChanges. Zero behavior change in production — same bytes
on the wire.

Tests (all pass):

- TestWriteBackup_Roundtrip: three rows across two apps, assert zip has
  2 entries, events.jsonl has 3 JSON lines in order, insert omits
  fieldTimestamps, update surfaces them, manifest apps are sorted,
  eventsSha256 equals a recomputed sha of the decompressed body.
- TestWriteBackup_EmptyUser: empty userID refused up-front.
- TestWriteBackup_NoRows: zero-row export still produces a valid zip
  with an empty events.jsonl and a manifest with eventCount=0 and a
  non-empty sha (sha of empty input).
- TestWriteBackup_DefaultsSchemaVersionZeroRowsToOne: legacy rows with
  schema_version=0 clamp to 1 so the manifest never claims a protocol
  version that never existed.

Paired with the vitest zip parser suite on the TS side, this closes
the Go-writes / JS-reads round-trip without needing a live mana-sync instance.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-14 17:44:37 +02:00
parent d5cabed14d
commit cf3d93fac1
3 changed files with 408 additions and 141 deletions

View file

@ -26,19 +26,13 @@
package backup
import (
"archive/zip"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"context"
"fmt"
"io"
"log/slog"
"net/http"
"sort"
"time"
"github.com/mana/mana-sync/internal/auth"
syncproto "github.com/mana/mana-sync/internal/sync"
"github.com/mana/mana-sync/internal/store"
)
@ -58,10 +52,9 @@ func NewHandler(s *store.Store, v *auth.Validator) *Handler {
return &Handler{store: s, validator: v}
}
// exportLine is the on-wire shape of one row inside events.jsonl. Field
// names mirror the sync-protocol Change shape so the restore side can feed
// lines straight into applyServerChanges() after running them through the
// migration chain keyed on schemaVersion.
// exportLine is the on-wire shape of one row inside events.jsonl. Shared
// with writer.go so both the HTTP path and the writer tests serialize
// identically.
type exportLine struct {
EventID string `json:"eventId"`
SchemaVersion int `json:"schemaVersion"`
@ -75,11 +68,10 @@ type exportLine struct {
CreatedAt string `json:"createdAt"`
}
// manifestFile is the header object serialized as manifest.json. Kept small
// and declarative so tools can parse it without loading events.jsonl.
// manifestFile is the header object serialized as manifest.json.
type manifestFile struct {
FormatVersion int `json:"formatVersion"`
SchemaVersion int `json:"schemaVersion"` // max event schemaVersion this server knows
SchemaVersion int `json:"schemaVersion"`
UserID string `json:"userId"`
CreatedAt string `json:"createdAt"`
EventCount int `json:"eventCount"`
@ -90,8 +82,10 @@ type manifestFile struct {
SchemaVersionMax int `json:"schemaVersionMax,omitempty"`
}
// HandleExport streams a .mana zip archive containing the user's full
// sync-event log plus a manifest with integrity hash.
// HandleExport is an HTTP shim over WriteBackup: it authenticates, sets
// download headers, and hands the response writer plus a store-backed
// iterator to the shared writer. Tests talk to WriteBackup directly with
// a synthetic iterator.
func (h *Handler) HandleExport(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
@ -113,133 +107,22 @@ func (h *Handler) HandleExport(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Accel-Buffering", "no")
w.Header().Set("Cache-Control", "no-store")
zw := zip.NewWriter(w)
// Only close once — closing writes the central directory, which we need
// even if streaming errored partway so the file is at least a valid zip.
zipClosed := false
closeZip := func() {
if zipClosed {
return
}
zipClosed = true
if err := zw.Close(); err != nil {
slog.Error("backup: zip close failed", "user_id", userID, "error", err)
}
}
defer closeZip()
// ─── events.jsonl entry ──────────────────────────────────────
eventsWriter, err := zw.CreateHeader(&zip.FileHeader{
Name: "events.jsonl",
Method: zip.Deflate,
Modified: createdAt,
})
if err != nil {
slog.Error("backup: create events.jsonl entry", "user_id", userID, "error", err)
return
}
hasher := sha256.New()
// Tee so the deflate entry and the hash both see every byte — the hash
// is over the *decompressed* JSONL, which is what the restore side will
// re-hash after unzipping.
teed := io.MultiWriter(eventsWriter, hasher)
encoder := json.NewEncoder(teed)
var (
count int
appSet = make(map[string]struct{})
minVer int
maxVer int
)
streamErr := h.store.StreamAllUserChanges(r.Context(), userID, func(row store.ChangeRow) error {
sv := row.SchemaVersion
if sv <= 0 {
sv = 1
}
if count == 0 {
minVer = sv
maxVer = sv
} else {
if sv < minVer {
minVer = sv
}
if sv > maxVer {
maxVer = sv
}
}
line := exportLine{
EventID: row.ID,
SchemaVersion: sv,
AppID: row.AppID,
Table: row.TableName,
RecordID: row.RecordID,
Op: row.Op,
Data: row.Data,
FieldTimestamps: row.FieldTimestamps,
ClientID: row.ClientID,
CreatedAt: row.CreatedAt.UTC().Format(time.RFC3339Nano),
}
if err := encoder.Encode(line); err != nil {
return err
}
appSet[row.AppID] = struct{}{}
count++
return nil
})
if streamErr != nil {
slog.Error("backup: stream failed", "user_id", userID, "written", count, "error", streamErr)
// Headers are flushed; best we can do is close the zip so the file
// isn't corrupt. The manifest won't land, and the absence of it is
iter := storeIterator(r.Context(), h.store, userID)
if err := WriteBackup(w, userID, createdAt, iter); err != nil {
// Headers are flushed so we cannot downgrade to a 500 here; closing
// the zip partial is the best we can do. The missing manifest is
// itself a signal to the importer that the export was truncated.
slog.Error("backup: write failed", "user_id", userID, "error", err)
return
}
// ─── manifest.json entry ─────────────────────────────────────
apps := make([]string, 0, len(appSet))
for a := range appSet {
apps = append(apps, a)
}
sort.Strings(apps)
manifest := manifestFile{
FormatVersion: BackupFormatVersion,
SchemaVersion: syncproto.CurrentSchemaVersion,
UserID: userID,
CreatedAt: createdAt.Format(time.RFC3339Nano),
EventCount: count,
EventsSHA256: hex.EncodeToString(hasher.Sum(nil)),
Apps: apps,
ProducedBy: "mana-sync",
SchemaVersionMin: minVer,
SchemaVersionMax: maxVer,
}
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
slog.Error("backup: marshal manifest", "user_id", userID, "error", err)
return
}
manifestWriter, err := zw.CreateHeader(&zip.FileHeader{
Name: "manifest.json",
Method: zip.Deflate,
Modified: createdAt,
})
if err != nil {
slog.Error("backup: create manifest entry", "user_id", userID, "error", err)
return
}
if _, err := manifestWriter.Write(manifestBytes); err != nil {
slog.Error("backup: write manifest", "user_id", userID, "error", err)
return
}
closeZip()
slog.Info("backup export ok",
"user_id", userID,
"rows", count,
"apps", len(apps),
"schema_min", minVer,
"schema_max", maxVer,
)
slog.Info("backup export ok", "user_id", userID)
}
// storeIterator adapts store.Store.StreamAllUserChanges into the
// RowIterator shape WriteBackup consumes. The request context and user ID
// are captured in the closure so the writer itself stays transport- and
// storage-agnostic.
func storeIterator(ctx context.Context, s *store.Store, userID string) RowIterator {
	iter := func(fn func(store.ChangeRow) error) error {
		return s.StreamAllUserChanges(ctx, userID, fn)
	}
	return iter
}

View file

@ -0,0 +1,133 @@
package backup
import (
	"archive/zip"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"sort"
	"time"

	"github.com/mana/mana-sync/internal/store"
	syncproto "github.com/mana/mana-sync/internal/sync"
)
// RowIterator yields every sync_changes row that belongs in a backup,
// invoking fn once per row. Implementations are expected to stop and
// return the first error fn reports (the test iterator does; presumably
// the store stream does too — confirm against StreamAllUserChanges).
// The HTTP handler wires this to store.StreamAllUserChanges; tests wire
// it to an in-memory slice so the zip writer can be exercised without
// Postgres.
type RowIterator func(fn func(store.ChangeRow) error) error
// WriteBackup serializes the user's sync_changes as a .mana zip archive
// into dst. This is the integration point with io.Writer so both the HTTP
// streaming path and tests share the same byte-for-byte production code.
//
// Single pass: events.jsonl is written first while sha256 tees through the
// encoder; manifest.json lands as a second zip entry carrying the final
// hash, event count, sorted app list, and the min/max schemaVersion seen
// (rows with schema_version <= 0 are clamped to 1).
//
// On success the zip's central directory has been written, so dst holds a
// fully valid archive whenever err == nil. On error the archive is closed
// best-effort so partial output is still a parseable (if truncated) zip.
func WriteBackup(dst io.Writer, userID string, createdAt time.Time, iter RowIterator) error {
	if userID == "" {
		return errors.New("backup: empty userID")
	}
	zw := zip.NewWriter(dst)
	// Close exactly once: the defer covers error returns so the central
	// directory is still emitted (the truncated file stays a parseable
	// zip); the success path closes explicitly and surfaces the error.
	// The previous version both deferred and returned zw.Close(), so the
	// second close's "writer closed twice" error was silently discarded.
	closed := false
	defer func() {
		if !closed {
			_ = zw.Close() // best-effort on error paths
		}
	}()
	eventsWriter, err := zw.CreateHeader(&zip.FileHeader{
		Name:     "events.jsonl",
		Method:   zip.Deflate,
		Modified: createdAt,
	})
	if err != nil {
		return fmt.Errorf("backup: create events.jsonl entry: %w", err)
	}
	// Tee so the deflate entry and the hash both see every byte — the hash
	// is over the *decompressed* JSONL, which is what the restore side
	// re-hashes after unzipping.
	hasher := sha256.New()
	encoder := json.NewEncoder(io.MultiWriter(eventsWriter, hasher))
	var (
		count  int
		appSet = make(map[string]struct{})
		minVer int
		maxVer int
	)
	if err := iter(func(row store.ChangeRow) error {
		// Legacy rows stored before the schema_version column existed
		// scan as 0; clamp to 1 so the manifest never claims a protocol
		// version that never existed.
		sv := row.SchemaVersion
		if sv <= 0 {
			sv = 1
		}
		if count == 0 {
			minVer, maxVer = sv, sv
		} else {
			minVer = min(minVer, sv)
			maxVer = max(maxVer, sv)
		}
		line := exportLine{
			EventID:         row.ID,
			SchemaVersion:   sv,
			AppID:           row.AppID,
			Table:           row.TableName,
			RecordID:        row.RecordID,
			Op:              row.Op,
			Data:            row.Data,
			FieldTimestamps: row.FieldTimestamps,
			ClientID:        row.ClientID,
			CreatedAt:       row.CreatedAt.UTC().Format(time.RFC3339Nano),
		}
		if err := encoder.Encode(line); err != nil {
			return fmt.Errorf("encode event %s: %w", row.ID, err)
		}
		appSet[row.AppID] = struct{}{}
		count++
		return nil
	}); err != nil {
		return fmt.Errorf("backup: iterate rows: %w", err)
	}
	// Deterministic manifest: app list sorted regardless of map order.
	apps := make([]string, 0, len(appSet))
	for a := range appSet {
		apps = append(apps, a)
	}
	sort.Strings(apps)
	manifest := manifestFile{
		FormatVersion:    BackupFormatVersion,
		SchemaVersion:    syncproto.CurrentSchemaVersion,
		UserID:           userID,
		CreatedAt:        createdAt.UTC().Format(time.RFC3339Nano),
		EventCount:       count,
		EventsSHA256:     hex.EncodeToString(hasher.Sum(nil)),
		Apps:             apps,
		ProducedBy:       "mana-sync",
		SchemaVersionMin: minVer,
		SchemaVersionMax: maxVer,
	}
	manifestBytes, err := json.MarshalIndent(manifest, "", " ")
	if err != nil {
		return fmt.Errorf("backup: marshal manifest: %w", err)
	}
	manifestWriter, err := zw.CreateHeader(&zip.FileHeader{
		Name:     "manifest.json",
		Method:   zip.Deflate,
		Modified: createdAt,
	})
	if err != nil {
		return fmt.Errorf("backup: create manifest entry: %w", err)
	}
	if _, err := manifestWriter.Write(manifestBytes); err != nil {
		return fmt.Errorf("backup: write manifest: %w", err)
	}
	closed = true
	return zw.Close()
}

View file

@ -0,0 +1,251 @@
package backup
import (
"archive/zip"
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"io"
"strings"
"testing"
"time"
"github.com/mana/mana-sync/internal/store"
)
// rowsIterator builds a RowIterator over a fixed in-memory slice, standing
// in for the Postgres-backed store so the writer can be driven end-to-end
// without a live DB. Iteration stops at the first error fn returns.
func rowsIterator(rows []store.ChangeRow) RowIterator {
	return func(fn func(store.ChangeRow) error) error {
		for i := range rows {
			if err := fn(rows[i]); err != nil {
				return err
			}
		}
		return nil
	}
}
// sampleRows builds the shared fixture: two "todo" rows (an insert, then
// an update with fieldTimestamps on the same record) followed by one
// "calendar" insert, all at schemaVersion 1.
func sampleRows() []store.ChangeRow {
	mustParse := func(s string) time.Time {
		parsed, err := time.Parse(time.RFC3339Nano, s)
		if err != nil {
			panic(err)
		}
		return parsed
	}
	return []store.ChangeRow{
		{
			ID:            "evt-1",
			AppID:         "todo",
			TableName:     "tasks",
			RecordID:      "task-1",
			Op:            "insert",
			Data:          map[string]any{"title": "Buy milk"},
			ClientID:      "client-a",
			CreatedAt:     mustParse("2026-04-14T10:00:00.000Z"),
			SchemaVersion: 1,
		},
		{
			ID:              "evt-2",
			AppID:           "todo",
			TableName:       "tasks",
			RecordID:        "task-1",
			Op:              "update",
			Data:            map[string]any{"completed": true},
			FieldTimestamps: map[string]string{"completed": "2026-04-14T10:05:00.000Z"},
			ClientID:        "client-a",
			CreatedAt:       mustParse("2026-04-14T10:05:00.000Z"),
			SchemaVersion:   1,
		},
		{
			ID:            "evt-3",
			AppID:         "calendar",
			TableName:     "events",
			RecordID:      "evt-42",
			Op:            "insert",
			Data:          map[string]any{"title": "Meeting"},
			ClientID:      "client-b",
			CreatedAt:     mustParse("2026-04-14T11:00:00.000Z"),
			SchemaVersion: 1,
		},
	}
}
// TestWriteBackup_Roundtrip drives the writer with three rows across two
// apps and verifies the full archive: zip structure, events.jsonl content
// and ordering (including the third event, previously unchecked),
// fieldTimestamps presence rules, and every declared manifest field
// including createdAt and the integrity hash.
func TestWriteBackup_Roundtrip(t *testing.T) {
	var buf bytes.Buffer
	createdAt := time.Date(2026, 4, 14, 12, 0, 0, 0, time.UTC)
	if err := WriteBackup(&buf, "user-123", createdAt, rowsIterator(sampleRows())); err != nil {
		t.Fatalf("WriteBackup: %v", err)
	}
	// Archive must parse as a valid zip with exactly two entries.
	zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
	if err != nil {
		t.Fatalf("zip.NewReader: %v", err)
	}
	if len(zr.File) != 2 {
		t.Fatalf("expected 2 entries, got %d", len(zr.File))
	}
	events := readZipEntry(t, zr, "events.jsonl")
	manifestBytes := readZipEntry(t, zr, "manifest.json")
	// events.jsonl: three newline-separated JSON records in input order.
	lines := strings.Split(strings.TrimRight(string(events), "\n"), "\n")
	if len(lines) != 3 {
		t.Fatalf("expected 3 events, got %d", len(lines))
	}
	// Event 1 is insert with data, no fieldTimestamps.
	var e1 map[string]any
	if err := json.Unmarshal([]byte(lines[0]), &e1); err != nil {
		t.Fatalf("parse line 0: %v", err)
	}
	if e1["op"] != "insert" || e1["eventId"] != "evt-1" || e1["appId"] != "todo" {
		t.Fatalf("event 0 unexpected: %#v", e1)
	}
	if _, ok := e1["fieldTimestamps"]; ok {
		t.Fatalf("event 0 should omit fieldTimestamps (insert)")
	}
	// Event 2 is update with fieldTimestamps surfaced.
	var e2 map[string]any
	if err := json.Unmarshal([]byte(lines[1]), &e2); err != nil {
		t.Fatalf("parse line 1: %v", err)
	}
	ft, ok := e2["fieldTimestamps"].(map[string]any)
	if !ok {
		t.Fatalf("event 1 fieldTimestamps missing")
	}
	if ft["completed"] != "2026-04-14T10:05:00.000Z" {
		t.Fatalf("event 1 fieldTimestamps wrong: %#v", ft)
	}
	// Event 3: confirm input order held through the app switch.
	var e3 map[string]any
	if err := json.Unmarshal([]byte(lines[2]), &e3); err != nil {
		t.Fatalf("parse line 2: %v", err)
	}
	if e3["eventId"] != "evt-3" || e3["appId"] != "calendar" {
		t.Fatalf("event 2 unexpected: %#v", e3)
	}
	// Manifest: all declared fields match what we wrote.
	var m manifestFile
	if err := json.Unmarshal(manifestBytes, &m); err != nil {
		t.Fatalf("parse manifest: %v", err)
	}
	if m.FormatVersion != BackupFormatVersion {
		t.Fatalf("formatVersion=%d want %d", m.FormatVersion, BackupFormatVersion)
	}
	if m.UserID != "user-123" {
		t.Fatalf("userId=%q want user-123", m.UserID)
	}
	// RFC3339Nano drops trailing zeros, so noon UTC renders with no
	// fractional part.
	if m.CreatedAt != "2026-04-14T12:00:00Z" {
		t.Fatalf("createdAt=%q want 2026-04-14T12:00:00Z", m.CreatedAt)
	}
	if m.EventCount != 3 {
		t.Fatalf("eventCount=%d want 3", m.EventCount)
	}
	if m.SchemaVersionMin != 1 || m.SchemaVersionMax != 1 {
		t.Fatalf("schemaVersion range=[%d,%d] want [1,1]", m.SchemaVersionMin, m.SchemaVersionMax)
	}
	if len(m.Apps) != 2 || m.Apps[0] != "calendar" || m.Apps[1] != "todo" {
		t.Fatalf("apps=%v want sorted [calendar todo]", m.Apps)
	}
	if m.ProducedBy != "mana-sync" {
		t.Fatalf("producedBy=%q want mana-sync", m.ProducedBy)
	}
	// eventsSha256 must match a fresh SHA of the decompressed events body.
	h := sha256.New()
	h.Write(events)
	want := hex.EncodeToString(h.Sum(nil))
	if m.EventsSHA256 != want {
		t.Fatalf("eventsSha256 mismatch: manifest=%s recomputed=%s", m.EventsSHA256, want)
	}
}
// TestWriteBackup_EmptyUser verifies the up-front guard: an empty userID
// is refused before any archive bytes are produced.
func TestWriteBackup_EmptyUser(t *testing.T) {
	var out bytes.Buffer
	err := WriteBackup(&out, "", time.Now(), rowsIterator(nil))
	switch {
	case err == nil:
		t.Fatal("expected error for empty userID")
	case !strings.Contains(err.Error(), "empty userID"):
		t.Fatalf("unexpected error: %v", err)
	}
}
// TestWriteBackup_NoRows ensures a zero-row export still yields a valid
// archive: an empty events.jsonl, eventCount=0, no apps, and the sha of
// the empty input rather than a blank hash.
func TestWriteBackup_NoRows(t *testing.T) {
	createdAt := time.Date(2026, 4, 14, 12, 0, 0, 0, time.UTC)
	var out bytes.Buffer
	if err := WriteBackup(&out, "user-x", createdAt, rowsIterator(nil)); err != nil {
		t.Fatalf("WriteBackup: %v", err)
	}
	zr, err := zip.NewReader(bytes.NewReader(out.Bytes()), int64(out.Len()))
	if err != nil {
		t.Fatalf("zip.NewReader: %v", err)
	}
	if events := readZipEntry(t, zr, "events.jsonl"); len(events) != 0 {
		t.Fatalf("expected empty events.jsonl, got %d bytes", len(events))
	}
	var m manifestFile
	if err := json.Unmarshal(readZipEntry(t, zr, "manifest.json"), &m); err != nil {
		t.Fatalf("parse manifest: %v", err)
	}
	if m.EventCount != 0 {
		t.Fatalf("eventCount=%d want 0", m.EventCount)
	}
	if len(m.Apps) != 0 {
		t.Fatalf("apps=%v want empty", m.Apps)
	}
	// Empty body still needs a valid sha.
	if m.EventsSHA256 == "" {
		t.Fatal("eventsSha256 empty even for zero-row export")
	}
}
// TestWriteBackup_DefaultsSchemaVersionZeroRowsToOne covers legacy data:
// rows stored before the schema_version column existed scan as 0, and the
// writer must clamp them to 1 so the manifest's schemaVersionMin/Max
// never claims a nonexistent protocol version.
func TestWriteBackup_DefaultsSchemaVersionZeroRowsToOne(t *testing.T) {
	legacy := store.ChangeRow{
		ID:            "e1",
		AppID:         "todo",
		TableName:     "tasks",
		RecordID:      "t1",
		Op:            "insert",
		Data:          map[string]any{"x": 1},
		ClientID:      "c",
		CreatedAt:     time.Now(),
		SchemaVersion: 0,
	}
	var out bytes.Buffer
	if err := WriteBackup(&out, "u", time.Now(), rowsIterator([]store.ChangeRow{legacy})); err != nil {
		t.Fatalf("WriteBackup: %v", err)
	}
	zr, err := zip.NewReader(bytes.NewReader(out.Bytes()), int64(out.Len()))
	if err != nil {
		t.Fatalf("zip.NewReader: %v", err)
	}
	events := readZipEntry(t, zr, "events.jsonl")
	if !strings.Contains(string(events), `"schemaVersion":1`) {
		t.Fatalf("expected schemaVersion:1 in events body, got: %s", events)
	}
}
// readZipEntry reads the named entry out of a zip archive in full. Fails
// the test if the entry is missing or cannot be decompressed.
func readZipEntry(t *testing.T, zr *zip.Reader, name string) []byte {
t.Helper()
for _, f := range zr.File {
if f.Name != name {
continue
}
rc, err := f.Open()
if err != nil {
t.Fatalf("open %s: %v", name, err)
}
defer rc.Close()
body, err := io.ReadAll(rc)
if err != nil {
t.Fatalf("read %s: %v", name, err)
}
return body
}
t.Fatalf("entry %q not found in zip", name)
return nil
}