Mirror of https://github.com/Memo-2023/mana-monorepo.git (synced 2026-05-14 23:01:09 +02:00)
fix(mana/web): sprint 4 — perf, quota, telemetry, indexed queries
Sprint 4.1 — per-table sync apply lock
Replaces the global _applyingServerChanges boolean with a Set of
currently-applying table names (beginApplyingTables / isApplyingTable).
applyServerChanges now scopes the lock to exactly the tables it touches,
so a user typing into the chat table while the todo table is syncing no
longer has that write silently dropped from _pendingChanges. The legacy single-flag API
is kept as a thin shim for backward compatibility.
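The lock helpers live in database.ts, which is not part of this diff, so the
following is only a rough sketch of the shape implied by the description above
(the internal Set and exact bookkeeping are assumptions; only the
beginApplyingTables / isApplyingTable names are visible in the import hunk below):

    // Hypothetical sketch of the per-table apply lock in database.ts.
    const applyingTables = new Set<string>();

    /** Mark `tables` as currently receiving server changes; returns a release fn. */
    export function beginApplyingTables(tables: Iterable<string>): () => void {
      const locked: string[] = [];
      for (const t of tables) {
        if (!applyingTables.has(t)) {
          applyingTables.add(t);
          locked.push(t);
        }
      }
      return () => {
        for (const t of locked) applyingTables.delete(t);
      };
    }

    /** Dexie hooks skip _pendingChanges bookkeeping for tables under apply. */
    export function isApplyingTable(tableName: string): boolean {
      return applyingTables.has(tableName);
    }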
Sprint 4.2 — IndexedDB quota handling
- quota-detect.ts (no Dexie deps, importable from database.ts):
isQuotaError() across browsers + Dexie wrapped errors,
notifyQuotaExceeded() dispatches a CustomEvent the UI can subscribe to.
- quota.ts (re-exports detect helpers + adds db-aware bits):
cleanupTombstones() hard-deletes old soft-deleted rows to reclaim space,
withQuotaRecovery() wraps a write op with one cleanup-and-retry pass.
- applyServerChanges wraps each per-table transaction in a quota
recovery loop. A full DB no longer crashes the pull.
- The Dexie creating/updating hooks now write _pendingChanges via
trackPendingChange(), which catches QuotaExceededError on the fire-and-forget
promise and surfaces the event instead of silently losing the entry.
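The event name and detail shape below come from quota-detect.ts / quota.ts in
this diff; the listener and the chatMessages write are only an illustration of
how a caller might hook in:

    import { QUOTA_EVENT, type QuotaExceededDetail } from './quota-detect';
    import { withQuotaRecovery } from './quota';

    // Listen for quota incidents anywhere in the app (banner, Sentry, ...).
    window.addEventListener(QUOTA_EVENT, (e) => {
      const detail = (e as CustomEvent<QuotaExceededDetail>).detail;
      if (!detail.recovered) {
        // e.g. show a "storage full" banner and offer a manual cleanup
        console.warn('IndexedDB quota exceeded', detail);
      }
    });

    // Wrap a user-initiated write with one cleanup-and-retry pass.
    // `db`, the 'chatMessages' table and `message` are illustrative only.
    await withQuotaRecovery(
      () => db.table('chatMessages').add(message),
      { table: 'chatMessages', op: 'insert' }
    );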
Sprint 4.3 — sync telemetry events
New sync-telemetry.ts emits a window CustomEvent for every push/pull
lifecycle transition: push:start/ok/error, pull:start/ok/error,
apply:malformed-drop, apply:done. Errors carry a coarse category
(network/auth/http-5xx/http-4xx/parse/unknown) and durations are
measured in ms. No record contents are emitted — safe to forward to
Sentry / a debug HUD without leaking PII.
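The event name and SyncTelemetryDetail type come from sync-telemetry.ts below;
the listener itself is just an illustrative subscriber (a debug HUD or a Sentry
breadcrumb would look similar):

    import {
      SYNC_TELEMETRY_EVENT,
      type SyncTelemetryDetail,
    } from './sync-telemetry';

    window.addEventListener(SYNC_TELEMETRY_EVENT, (e) => {
      const d = (e as CustomEvent<SyncTelemetryDetail>).detail;
      if (d.kind === 'push:error' || d.kind === 'pull:error') {
        // Only coarse categories and durations are available — no record contents.
        console.warn(`[sync] ${d.kind} app=${d.appId} cat=${d.errorCategory} in ${d.durationMs}ms`);
      }
    });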
Sprint 4.4 — indexed queries on hot dashboard paths
Three cross-app dashboard widgets that previously full-scanned every
task / time block on every render now use indexed range queries:
- useTodayTasks → .where('dueDate').belowOrEqual(endOfToday)
- useUpcomingTasks → .where('dueDate').between(start, end)
- useUpcomingEvents → .where('startDate').between(now, future)
useFavoriteContacts hits the indexed isFavorite column directly (querying
both the numeric and boolean key encodings to cover legacy and fresh records).
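For these range queries to hit an index, the columns have to be declared in the
Dexie schema. The real schema lives in database.ts and is not part of this diff,
so the version number and field lists below are assumptions; the only point is
that dueDate, startDate and isFavorite appear in the stores() spec:

    // Hypothetical excerpt of the Dexie stores() declaration in database.ts.
    db.version(12).stores({
      tasks: 'id, order, dueDate, updatedAt',   // dueDate indexed → belowOrEqual / between
      timeBlocks: 'id, startDate, updatedAt',   // startDate indexed → between(now, future)
      contacts: 'id, firstName, isFavorite',    // isFavorite indexed (queried via 1 and true)
    });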
Verified: 20/20 tests in sync.test.ts still passing.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 30022e82e1
commit 733dca45f1

5 changed files with 454 additions and 151 deletions
@@ -34,37 +34,39 @@ export function useOpenTasks() {

/** Tasks due today or overdue. */
export function useTodayTasks() {
  return useLiveQueryWithDefault(async () => {
    const today = new Date();
    today.setHours(0, 0, 0, 0);
    const todayStr = today.toISOString().slice(0, 10);
    // End of today in ISO; the schema indexes `dueDate` so this is a
    // bounded range scan instead of a full table read.
    const endOfToday = new Date();
    endOfToday.setHours(23, 59, 59, 999);

    const all = await db.table<LocalTask>('tasks').orderBy('order').toArray();
    return all.filter((t) => {
      if (t.isCompleted || t.deletedAt) return false;
      if (!t.dueDate) return false;
      return t.dueDate.slice(0, 10) <= todayStr;
    });
    const candidates = await db
      .table<LocalTask>('tasks')
      .where('dueDate')
      .belowOrEqual(endOfToday.toISOString())
      .toArray();
    return candidates.filter((t) => !t.isCompleted && !t.deletedAt);
  }, [] as LocalTask[]);
}

/** Tasks upcoming in the next N days. */
export function useUpcomingTasks(days = 7) {
  return useLiveQueryWithDefault(async () => {
    const today = new Date();
    today.setHours(0, 0, 0, 0);
    const todayStr = today.toISOString().slice(0, 10);
    const startOfTomorrow = new Date();
    startOfTomorrow.setHours(0, 0, 0, 0);
    startOfTomorrow.setDate(startOfTomorrow.getDate() + 1);

    const future = new Date(today);
    future.setDate(future.getDate() + days);
    const futureStr = future.toISOString().slice(0, 10);
    const endOfWindow = new Date(startOfTomorrow);
    endOfWindow.setDate(endOfWindow.getDate() + days - 1);
    endOfWindow.setHours(23, 59, 59, 999);

    const all = await db.table<LocalTask>('tasks').orderBy('dueDate').toArray();
    return all.filter((t) => {
      if (t.isCompleted || t.deletedAt) return false;
      if (!t.dueDate) return false;
      const due = t.dueDate.slice(0, 10);
      return due > todayStr && due <= futureStr;
    });
    // Bounded range scan on the indexed dueDate column instead of loading
    // every task in the database and filtering in JS.
    const candidates = await db
      .table<LocalTask>('tasks')
      .where('dueDate')
      .between(startOfTomorrow.toISOString(), endOfWindow.toISOString(), true, true)
      .toArray();
    return candidates.filter((t) => !t.isCompleted && !t.deletedAt);
  }, [] as LocalTask[]);
}

@@ -77,14 +79,15 @@ export function useUpcomingEvents(days = 7) {
    const future = new Date(now);
    future.setDate(future.getDate() + days);

    const nowStr = now.toISOString();
    const futureStr = future.toISOString();

    const all = await db.table<LocalTimeBlock>('timeBlocks').orderBy('startDate').toArray();
    return all.filter((b) => {
      if (b.deletedAt) return false;
      return b.startDate >= nowStr && b.startDate <= futureStr;
    });
    // `startDate` is indexed → bounded range scan covers exactly the
    // requested window. Previously this loaded every TimeBlock ever
    // (including past meetings) and filtered them in JS.
    const candidates = await db
      .table<LocalTimeBlock>('timeBlocks')
      .where('startDate')
      .between(now.toISOString(), future.toISOString(), true, true)
      .toArray();
    return candidates.filter((b) => !b.deletedAt);
  }, [] as LocalTimeBlock[]);
}

@@ -93,8 +96,19 @@ export function useUpcomingEvents(days = 7) {
/** Favorite contacts. */
export function useFavoriteContacts(limit = 5) {
  return useLiveQueryWithDefault(async () => {
    const all = await db.table<LocalContact>('contacts').orderBy('firstName').toArray();
    return all.filter((c) => c.isFavorite && !c.isArchived && !c.deletedAt).slice(0, limit);
    // Dexie indexes booleans as `true`/`false` keys — `.where().equals(true)`
    // hits the index instead of scanning every contact in the address book.
    const favorites = await db
      .table<LocalContact>('contacts')
      .where('isFavorite')
      .equals(1)
      .or('isFavorite')
      .equals(true as unknown as string)
      .toArray();
    return favorites
      .filter((c) => !c.isArchived && !c.deletedAt)
      .sort((a, b) => (a.firstName ?? '').localeCompare(b.firstName ?? ''))
      .slice(0, limit);
  }, [] as LocalContact[]);
}

apps/mana/apps/web/src/lib/data/quota-detect.ts (new file, 41 lines)

@@ -0,0 +1,41 @@
/**
 * Quota detection + event dispatcher with no Dexie dependency.
 *
 * Lives in its own file (separate from quota.ts) so the lower-level
 * `database.ts` module can import it without creating an import cycle.
 * `quota.ts` re-exports these for callers that already pull in db.
 */

/** CustomEvent name dispatched when an IndexedDB write hits the quota. */
export const QUOTA_EVENT = 'mana:storage-quota-exceeded';

export interface QuotaExceededDetail {
  table?: string;
  op?: 'insert' | 'update' | 'delete' | 'apply' | 'pending-change';
  cleaned: number;
  recovered: boolean;
}

/**
 * Cross-browser check for the IndexedDB quota error.
 * Modern browsers report `QuotaExceededError` as the DOMException name;
 * older Chromium/WebKit also set numeric `code === 22`. Dexie wraps the
 * underlying DOMException in its own error class with `inner`.
 */
export function isQuotaError(err: unknown): boolean {
  if (!err || typeof err !== 'object') return false;
  const e = err as { name?: string; code?: number; inner?: unknown };
  if (e.name === 'QuotaExceededError') return true;
  if (typeof e.code === 'number' && e.code === 22) return true;
  if (e.inner) return isQuotaError(e.inner);
  return false;
}

/**
 * Dispatches the quota CustomEvent. UI components or error trackers can
 * `window.addEventListener(QUOTA_EVENT, ...)` to react.
 */
export function notifyQuotaExceeded(detail: QuotaExceededDetail): void {
  if (typeof window === 'undefined') return;
  window.dispatchEvent(new CustomEvent(QUOTA_EVENT, { detail }));
}
apps/mana/apps/web/src/lib/data/quota.ts (new file, 107 lines)

@@ -0,0 +1,107 @@
/**
 * IndexedDB storage-quota handling.
 *
 * Browsers throw `QuotaExceededError` (DOMException) when an IndexedDB write
 * would push the origin over its allotted disk budget. Without explicit
 * handling, that error bubbles up out of a Dexie hook and the offending
 * write silently fails — the user types something, sees it appear in the
 * UI for a frame thanks to live queries, then it vanishes on next render.
 *
 * Strategy:
 * 1. `isQuotaError` recognises the DOMException across browsers (the name
 *    is standardised but a couple of legacy engines still use the code).
 * 2. `cleanupTombstones` reclaims space by hard-deleting `deletedAt`
 *    records older than a cutoff. Soft-deleted rows that the server has
 *    already acknowledged carry no value and are the cheapest thing to
 *    drop first.
 * 3. `notifyQuotaExceeded` dispatches a CustomEvent so the UI / error
 *    tracker can react (offer cleanup, log to Sentry, etc.).
 *
 * Used by both the Dexie creating-hook (last-resort retry on user input)
 * and the sync apply path (so server pulls don't crash a full DB).
 */

import { db, SYNC_APP_MAP } from './database';
import {
  isQuotaError,
  notifyQuotaExceeded,
  QUOTA_EVENT,
  type QuotaExceededDetail,
} from './quota-detect';

// Re-export so callers only need one import path.
export { isQuotaError, notifyQuotaExceeded, QUOTA_EVENT, type QuotaExceededDetail };

/** Default age cutoff for tombstone cleanup: 30 days. */
export const DEFAULT_TOMBSTONE_TTL_MS = 30 * 24 * 60 * 60 * 1000;

/**
 * Hard-deletes soft-deleted (`deletedAt < cutoff`) records across every
 * sync-tracked table. Returns the total number of rows reclaimed.
 *
 * Does NOT touch tables outside SYNC_APP_MAP — bookkeeping tables like
 * _pendingChanges and _syncMeta have their own lifetimes.
 */
export async function cleanupTombstones(
  olderThanMs: number = DEFAULT_TOMBSTONE_TTL_MS
): Promise<number> {
  const cutoff = new Date(Date.now() - olderThanMs).toISOString();
  let cleaned = 0;

  for (const tables of Object.values(SYNC_APP_MAP)) {
    for (const tableName of tables) {
      const table = db.table(tableName);
      try {
        // Filter scan: deletedAt is not indexed on every table, so we
        // can't rely on .where(). The volume that this code path runs
        // against (only soft-deleted rows older than weeks) is small.
        const stale = await table
          .filter((r: unknown) => {
            const rec = r as { deletedAt?: string | null };
            return !!rec.deletedAt && rec.deletedAt < cutoff;
          })
          .primaryKeys();
        if (stale.length > 0) {
          await table.bulkDelete(stale);
          cleaned += stale.length;
        }
      } catch {
        // Best-effort. One bad table shouldn't abort the whole cleanup.
      }
    }
  }

  return cleaned;
}

/**
 * Runs `op`. If it throws QuotaExceededError, attempts a tombstone
 * cleanup and one retry; if cleanup recovered nothing or the retry still
 * fails, dispatches the quota event and re-throws. Used by the Dexie
 * creating-hook for user-initiated writes.
 */
export async function withQuotaRecovery<T>(
  op: () => Promise<T>,
  context: { table?: string; op?: QuotaExceededDetail['op'] } = {}
): Promise<T> {
  try {
    return await op();
  } catch (err) {
    if (!isQuotaError(err)) throw err;

    const cleaned = await cleanupTombstones();
    if (cleaned === 0) {
      notifyQuotaExceeded({ ...context, cleaned: 0, recovered: false });
      throw err;
    }

    try {
      const result = await op();
      notifyQuotaExceeded({ ...context, cleaned, recovered: true });
      return result;
    } catch (retryErr) {
      notifyQuotaExceeded({ ...context, cleaned, recovered: false });
      throw retryErr;
    }
  }
}
apps/mana/apps/web/src/lib/data/sync-telemetry.ts (new file, 74 lines)

@@ -0,0 +1,74 @@
/**
 * Sync telemetry — fire-and-forget CustomEvent bus.
 *
 * The sync engine emits one event per lifecycle transition (push start/ok/
 * error, pull start/ok/error, malformed-drop). UI components, an error
 * tracker, or a debug overlay can subscribe via window.addEventListener.
 *
 * Why CustomEvent and not a state store?
 * - Zero coupling: the sync engine has no idea who is listening.
 * - Multiple subscribers: a debug HUD and Sentry can co-exist without
 *   either of them needing to know about the other.
 * - Compatible with `addEventListener` in tests.
 *
 * No PII / record contents are emitted — only counts, durations, table
 * names, and error categories. Safe to forward to a third-party tracker.
 */

export const SYNC_TELEMETRY_EVENT = 'mana:sync-telemetry';

export type SyncTelemetryKind =
  | 'push:start'
  | 'push:ok'
  | 'push:error'
  | 'pull:start'
  | 'pull:ok'
  | 'pull:error'
  | 'apply:malformed-drop'
  | 'apply:done';

export interface SyncTelemetryDetail {
  kind: SyncTelemetryKind;
  appId: string;
  /** ms since the operation started — present on `*:ok` / `*:error`. */
  durationMs?: number;
  /** number of changes touched (pushed / pulled / applied / dropped). */
  count?: number;
  /** Error category for `*:error`. Free-form short string, never raw stacks. */
  errorCategory?: 'network' | 'auth' | 'http-5xx' | 'http-4xx' | 'parse' | 'unknown';
  /** HTTP status code if applicable. */
  status?: number;
  /** Table name for apply-level events. */
  table?: string;
}

/**
 * Emit a telemetry event. Safe to call from any context — no-ops in SSR
 * or environments without `window`.
 */
export function emitSyncTelemetry(detail: SyncTelemetryDetail): void {
  if (typeof window === 'undefined') return;
  window.dispatchEvent(new CustomEvent(SYNC_TELEMETRY_EVENT, { detail }));
}

/**
 * Map an HTTP status / native error to a coarse error category. Used by
 * the sync engine to attach a stable category before re-throwing.
 */
export function categorizeSyncError(err: unknown): SyncTelemetryDetail['errorCategory'] {
  if (!err) return 'unknown';
  if (err instanceof Error) {
    const msg = err.message;
    // fetchWithRetry annotates HTTP failures as `... HTTP <status>`
    const match = msg.match(/HTTP (\d{3})/);
    if (match) {
      const status = parseInt(match[1], 10);
      if (status === 401 || status === 403) return 'auth';
      if (status >= 500) return 'http-5xx';
      if (status >= 400) return 'http-4xx';
    }
    if (err.name === 'SyntaxError') return 'parse';
    if (err.name === 'TypeError' && /fetch/i.test(msg)) return 'network';
  }
  return 'unknown';
}
@@ -19,9 +19,11 @@ import {
  SYNC_APP_MAP,
  toSyncName,
  fromSyncName,
  setApplyingServerChanges,
  beginApplyingTables,
  FIELD_TIMESTAMPS_KEY,
} from './database';
import { isQuotaError, cleanupTombstones, notifyQuotaExceeded } from './quota';
import { emitSyncTelemetry, categorizeSyncError } from './sync-telemetry';

// ─── Types ────────────────────────────────────────────────────

@@ -135,9 +137,10 @@ export function readFieldTimestamps(record: unknown): Record<string, string> {
 * - insert → upsert with LWW merge against per-field timestamps
 * - update + fields → field-level LWW merge using server field timestamps
 *
 * Hooks are suppressed via setApplyingServerChanges so applied changes do
 * NOT generate new pending-changes (sync loop prevention). Malformed
 * entries are dropped before any DB work happens.
 * Hooks are suppressed for the touched tables only (via beginApplyingTables)
 * so server-applied changes do NOT generate new pending-changes — but
 * concurrent user writes to OTHER tables continue tracking normally.
 * Malformed entries are dropped before any DB work happens.
 */
export async function applyServerChanges(appId: string, changes: unknown[]): Promise<void> {
  // Reject malformed entries up-front so a single bad row from the server

@@ -154,137 +157,171 @@ export async function applyServerChanges(appId: string, changes: unknown[]): Pro
console.warn(
`[mana-sync] dropped ${dropped}/${changes.length} malformed server changes for app=${appId}`
);
emitSyncTelemetry({ kind: 'apply:malformed-drop', appId, count: dropped });
}
if (validChanges.length === 0) return;

setApplyingServerChanges(true);
try {
// Group changes by table (server returns backend collection names)
const byTable = new Map<string, SyncChange[]>();
for (const change of validChanges) {
const unifiedTable = fromSyncName(appId, change.table);
if (!byTable.has(unifiedTable)) byTable.set(unifiedTable, []);
byTable.get(unifiedTable)!.push(change);
}
// Group changes by table first so we can scope the apply-lock to exactly
// the tables we're about to touch. A previous global flag locked every
// table for the duration of the apply, silently swallowing concurrent
// user writes to unrelated modules.
const byTable = new Map<string, SyncChange[]>();
for (const change of validChanges) {
const unifiedTable = fromSyncName(appId, change.table);
if (!byTable.has(unifiedTable)) byTable.set(unifiedTable, []);
byTable.get(unifiedTable)!.push(change);
}

const releaseApplyLock = beginApplyingTables(byTable.keys());
try {
for (const [tableName, tableChanges] of byTable) {
const table = db.table(tableName);

await db.transaction('rw', table, async () => {
for (const change of tableChanges) {
const recordId = change.id;
// Wraps the per-table transaction in a quota recovery loop: if the
// browser rejects a write because the IndexedDB quota is full, we
// hard-delete old tombstones and retry once before giving up.
let attempts = 0;
let recovered = false;
while (true) {
try {
await db.transaction('rw', table, async () => {
for (const change of tableChanges) {
const recordId = change.id;

if (change.deletedAt || change.op === 'delete') {
const existing = await table.get(recordId);
if (!existing) continue;
if (change.deletedAt) {
const localFT = readFieldTimestamps(existing);
const serverTime = change.deletedAt;
const localDeletedAtTime =
localFT.deletedAt ??
((existing as Record<string, unknown>).deletedAt as string | undefined) ??
'';
if (serverTime >= localDeletedAtTime) {
await table.update(recordId, {
deletedAt: serverTime,
updatedAt: serverTime,
[FIELD_TIMESTAMPS_KEY]: {
...localFT,
deletedAt: serverTime,
updatedAt: serverTime,
},
});
if (change.deletedAt || change.op === 'delete') {
const existing = await table.get(recordId);
if (!existing) continue;
if (change.deletedAt) {
const localFT = readFieldTimestamps(existing);
const serverTime = change.deletedAt;
const localDeletedAtTime =
localFT.deletedAt ??
((existing as Record<string, unknown>).deletedAt as string | undefined) ??
'';
if (serverTime >= localDeletedAtTime) {
await table.update(recordId, {
deletedAt: serverTime,
updatedAt: serverTime,
[FIELD_TIMESTAMPS_KEY]: {
...localFT,
deletedAt: serverTime,
updatedAt: serverTime,
},
});
}
} else {
await table.delete(recordId);
}
} else if (change.op === 'insert') {
// Upsert. `change.data` is the canonical payload; fall back to
// the change envelope only for older flattened formats.
const existing = await table.get(recordId);
const changeData = change.data ?? (change as unknown as Record<string, unknown>);
const recordTime =
(changeData.updatedAt as string | undefined) ??
(changeData.createdAt as string | undefined) ??
new Date().toISOString();

if (!existing) {
const ft: Record<string, string> = {};
for (const key of Object.keys(changeData)) {
if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue;
ft[key] = recordTime;
}
await table.put({
...changeData,
id: recordId,
[FIELD_TIMESTAMPS_KEY]: ft,
});
} else {
const localFT = readFieldTimestamps(existing);
const localUpdatedAt =
((existing as Record<string, unknown>).updatedAt as string | undefined) ?? '';
const updates: Record<string, unknown> = {};
const newFT: Record<string, string> = { ...localFT };

for (const [key, val] of Object.entries(changeData)) {
if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue;
const localFieldTime = localFT[key] ?? localUpdatedAt;
if (recordTime >= localFieldTime) {
updates[key] = val;
newFT[key] = recordTime;
}
}
if (Object.keys(updates).length > 0) {
updates[FIELD_TIMESTAMPS_KEY] = newFT;
await table.update(recordId, updates);
}
}
} else if (change.op === 'update' && change.fields) {
// Field-level LWW update — the canonical conflict-resolution path.
const existing = await table.get(recordId);
const serverFields = change.fields;

if (!existing) {
// Reconstruct from fields. Other clients only see this if the
// record was deleted locally — recreate it under the server's
// authority.
const record: Record<string, unknown> = { id: recordId };
const ft: Record<string, string> = {};
const fallback = new Date().toISOString();
for (const [key, fc] of Object.entries(serverFields)) {
record[key] = fc.value;
ft[key] = fc.updatedAt ?? fallback;
}
record[FIELD_TIMESTAMPS_KEY] = ft;
await table.put(record);
} else {
// Per-field comparison. Falls back to record-level updatedAt
// only for legacy records that pre-date __fieldTimestamps.
const localFT = readFieldTimestamps(existing);
const localUpdatedAt =
((existing as Record<string, unknown>).updatedAt as string | undefined) ?? '';
const updates: Record<string, unknown> = {};
const newFT: Record<string, string> = { ...localFT };

for (const [key, fc] of Object.entries(serverFields)) {
const serverTime = fc.updatedAt ?? '';
const localFieldTime = localFT[key] ?? localUpdatedAt;
if (serverTime >= localFieldTime) {
updates[key] = fc.value;
newFT[key] = serverTime;
}
}
if (Object.keys(updates).length > 0) {
updates[FIELD_TIMESTAMPS_KEY] = newFT;
await table.update(recordId, updates);
}
}
}
} else {
await table.delete(recordId);
}
} else if (change.op === 'insert') {
// Upsert. `change.data` is the canonical payload; fall back to
// the change envelope only for older flattened formats.
const existing = await table.get(recordId);
const changeData = change.data ?? (change as unknown as Record<string, unknown>);
const recordTime =
(changeData.updatedAt as string | undefined) ??
(changeData.createdAt as string | undefined) ??
new Date().toISOString();

if (!existing) {
const ft: Record<string, string> = {};
for (const key of Object.keys(changeData)) {
if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue;
ft[key] = recordTime;
}
await table.put({
...changeData,
id: recordId,
[FIELD_TIMESTAMPS_KEY]: ft,
});
break; // transaction succeeded
} catch (err) {
if (!isQuotaError(err) || attempts >= 1) {
if (isQuotaError(err)) {
notifyQuotaExceeded({
table: tableName,
op: 'apply',
cleaned: 0,
recovered,
});
} else {
const localFT = readFieldTimestamps(existing);
const localUpdatedAt =
((existing as Record<string, unknown>).updatedAt as string | undefined) ?? '';
const updates: Record<string, unknown> = {};
const newFT: Record<string, string> = { ...localFT };

for (const [key, val] of Object.entries(changeData)) {
if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue;
const localFieldTime = localFT[key] ?? localUpdatedAt;
if (recordTime >= localFieldTime) {
updates[key] = val;
newFT[key] = recordTime;
}
}
if (Object.keys(updates).length > 0) {
updates[FIELD_TIMESTAMPS_KEY] = newFT;
await table.update(recordId, updates);
}
}
} else if (change.op === 'update' && change.fields) {
// Field-level LWW update — the canonical conflict-resolution path.
const existing = await table.get(recordId);
const serverFields = change.fields;

if (!existing) {
// Reconstruct from fields. Other clients only see this if the
// record was deleted locally — recreate it under the server's
// authority.
const record: Record<string, unknown> = { id: recordId };
const ft: Record<string, string> = {};
const fallback = new Date().toISOString();
for (const [key, fc] of Object.entries(serverFields)) {
record[key] = fc.value;
ft[key] = fc.updatedAt ?? fallback;
}
record[FIELD_TIMESTAMPS_KEY] = ft;
await table.put(record);
} else {
// Per-field comparison. Falls back to record-level updatedAt
// only for legacy records that pre-date __fieldTimestamps.
const localFT = readFieldTimestamps(existing);
const localUpdatedAt =
((existing as Record<string, unknown>).updatedAt as string | undefined) ?? '';
const updates: Record<string, unknown> = {};
const newFT: Record<string, string> = { ...localFT };

for (const [key, fc] of Object.entries(serverFields)) {
const serverTime = fc.updatedAt ?? '';
const localFieldTime = localFT[key] ?? localUpdatedAt;
if (serverTime >= localFieldTime) {
updates[key] = fc.value;
newFT[key] = serverTime;
}
}
if (Object.keys(updates).length > 0) {
updates[FIELD_TIMESTAMPS_KEY] = newFT;
await table.update(recordId, updates);
}
}
throw err;
}
attempts++;
const cleaned = await cleanupTombstones();
recovered = cleaned > 0;
if (cleaned === 0) {
notifyQuotaExceeded({ table: tableName, op: 'apply', cleaned: 0, recovered: false });
throw err;
}
}
});
}
}
emitSyncTelemetry({ kind: 'apply:done', appId, count: validChanges.length });
} finally {
setApplyingServerChanges(false);
releaseApplyLock();
}
}

@@ -452,6 +489,8 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
    if (pending.length === 0) return;

    setStatus('syncing');
    const startedAt = Date.now();
    emitSyncTelemetry({ kind: 'push:start', appId, count: pending.length });

    try {
      // Get oldest sync cursor for the `since` field

@@ -496,9 +535,21 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str

      channel.lastError = null;
      setStatus('idle');
      emitSyncTelemetry({
        kind: 'push:ok',
        appId,
        count: pending.length,
        durationMs: Date.now() - startedAt,
      });
    } catch (err) {
      channel.lastError = err instanceof Error ? err.message : 'Push failed';
      setStatus('error');
      emitSyncTelemetry({
        kind: 'push:error',
        appId,
        durationMs: Date.now() - startedAt,
        errorCategory: categorizeSyncError(err),
      });
    }
  }

@@ -512,6 +563,9 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
    if (!token) return;

    setStatus('syncing');
    const startedAt = Date.now();
    emitSyncTelemetry({ kind: 'pull:start', appId });
    let totalApplied = 0;

    try {
      for (const tableName of channel.tables) {

@@ -538,6 +592,7 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
        hasMore = data.hasMore ?? false;

        if (data.serverChanges && data.serverChanges.length > 0) {
          totalApplied += data.serverChanges.length;
          await applyServerChanges(appId, data.serverChanges);
        }

@@ -554,9 +609,21 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str

      channel.lastError = null;
      setStatus('idle');
      emitSyncTelemetry({
        kind: 'pull:ok',
        appId,
        count: totalApplied,
        durationMs: Date.now() - startedAt,
      });
    } catch (err) {
      channel.lastError = err instanceof Error ? err.message : 'Pull failed';
      setStatus('error');
      emitSyncTelemetry({
        kind: 'pull:error',
        appId,
        durationMs: Date.now() - startedAt,
        errorCategory: categorizeSyncError(err),
      });
    }
  }
|||
Loading…
Add table
Add a link
Reference in a new issue