mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 18:41:08 +02:00
fix(mana/web): sprint 1 data integrity (LWW, retry, atomic cascades)
- Per-field LWW: Dexie hooks pflegen __fieldTimestamps; applyServerChanges vergleicht jetzt feldweise statt Record-Level updatedAt. Verhindert stillen Datenverlust bei parallelen Edits unterschiedlicher Felder.
- Sync-Retry: fetchWithRetry mit exponentiellem Backoff + Jitter (max 3 Versuche, retried nur 5xx/429/Netzwerk, 4xx/Abort sofort durchgereicht).
- Atomare Cascade-Soft-Deletes via db.transaction in cards, chat, presi, music – verhindert Orphan-Children bei Crash mitten im Cascade-Loop.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b900df5ee0
commit
0909538827
6 changed files with 228 additions and 61 deletions
|
|
@@ -558,6 +558,16 @@ export function setApplyingServerChanges(v: boolean): void {
|
|||
|
||||
const pendingChangesTable = db.table('_pendingChanges');
|
||||
|
||||
/**
|
||||
* Hidden field on every synced record holding per-field LWW timestamps.
|
||||
* Not indexed, not sent to the server in pending-change payloads.
|
||||
*/
|
||||
export const FIELD_TIMESTAMPS_KEY = '__fieldTimestamps';
|
||||
|
||||
function isInternalKey(key: string): boolean {
|
||||
return key === 'id' || key === FIELD_TIMESTAMPS_KEY;
|
||||
}
|
||||
|
||||
for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
||||
for (const tableName of tables) {
|
||||
const table = db.table(tableName);
|
||||
|
|
@@ -565,18 +575,31 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
table.hook('creating', function (_primKey, obj) {
|
||||
if (_applyingServerChanges) return;
|
||||
const now = new Date().toISOString();
|
||||
|
||||
// Stamp every real field with the create-time so future LWW comparisons
|
||||
// have a baseline. Mutates obj in place — Dexie persists the mutation.
|
||||
const ft: Record<string, string> = {};
|
||||
for (const key of Object.keys(obj)) {
|
||||
if (isInternalKey(key)) continue;
|
||||
ft[key] = now;
|
||||
}
|
||||
(obj as Record<string, unknown>)[FIELD_TIMESTAMPS_KEY] = ft;
|
||||
|
||||
// Build payload for pending-change WITHOUT the internal timestamp map
|
||||
const { [FIELD_TIMESTAMPS_KEY]: _omit, ...dataForSync } = obj as Record<string, unknown>;
|
||||
|
||||
pendingChangesTable.add({
|
||||
appId,
|
||||
collection: tableName,
|
||||
recordId: obj.id,
|
||||
op: 'insert',
|
||||
data: { ...obj },
|
||||
data: dataForSync,
|
||||
createdAt: now,
|
||||
});
|
||||
trackFirstContent(appId);
|
||||
fireTrigger(appId, tableName, 'insert', { ...obj });
|
||||
fireTrigger(appId, tableName, 'insert', { ...dataForSync });
|
||||
// Defer cross-table reads outside the Dexie hook's transaction scope
|
||||
const objCopy = { ...obj };
|
||||
const objCopy = { ...dataForSync };
|
||||
setTimeout(() => {
|
||||
checkInlineSuggestion(appId, tableName, objCopy).then((sug) => {
|
||||
if (sug)
|
||||
|
|
@@ -585,14 +608,24 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
}, 0);
|
||||
});
|
||||
|
||||
table.hook('updating', function (modifications, primKey) {
|
||||
if (_applyingServerChanges) return;
|
||||
table.hook('updating', function (modifications, primKey, obj) {
|
||||
if (_applyingServerChanges) return undefined;
|
||||
const now = new Date().toISOString();
|
||||
const fields: Record<string, { value: unknown; updatedAt: string }> = {};
|
||||
|
||||
// Merge field timestamps: keep existing, overwrite for each modified field
|
||||
const existingFT =
|
||||
((obj as Record<string, unknown>)[FIELD_TIMESTAMPS_KEY] as
|
||||
| Record<string, string>
|
||||
| undefined) ?? {};
|
||||
const newFT: Record<string, string> = { ...existingFT };
|
||||
|
||||
for (const [key, value] of Object.entries(modifications)) {
|
||||
if (key === 'id') continue;
|
||||
if (isInternalKey(key)) continue;
|
||||
fields[key] = { value, updatedAt: now };
|
||||
newFT[key] = now;
|
||||
}
|
||||
|
||||
pendingChangesTable.add({
|
||||
appId,
|
||||
collection: tableName,
|
||||
|
|
@@ -604,6 +637,11 @@ for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
|||
});
|
||||
const op = (modifications as Record<string, unknown>).deletedAt ? 'delete' : 'update';
|
||||
fireTrigger(appId, tableName, op, modifications as Record<string, unknown>);
|
||||
|
||||
// Returning an object from a Dexie 'updating' hook merges it into the
|
||||
// modifications applied to the record — use this to persist the new
|
||||
// per-field timestamps alongside the user's update.
|
||||
return { [FIELD_TIMESTAMPS_KEY]: newFT };
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@@ -14,7 +14,14 @@
|
|||
* WS: GET /ws/{appId} — auth: { type: "auth", token: "..." }
|
||||
*/
|
||||
|
||||
import { db, SYNC_APP_MAP, toSyncName, fromSyncName, setApplyingServerChanges } from './database';
|
||||
import {
|
||||
db,
|
||||
SYNC_APP_MAP,
|
||||
toSyncName,
|
||||
fromSyncName,
|
||||
setApplyingServerChanges,
|
||||
FIELD_TIMESTAMPS_KEY,
|
||||
} from './database';
|
||||
|
||||
// ─── Types ────────────────────────────────────────────────────
|
||||
|
||||
|
|
@@ -53,6 +60,52 @@ const PUSH_DEBOUNCE = 1000;
|
|||
const PULL_INTERVAL = 30_000;
|
||||
const WS_RECONNECT_DELAY = 5000;
|
||||
|
||||
// Retry config for transient sync failures (network drops, 5xx).
|
||||
// 4xx (auth, validation) is treated as permanent and not retried.
|
||||
const RETRY_MAX_ATTEMPTS = 3;
|
||||
const RETRY_BASE_DELAY_MS = 500;
|
||||
const RETRY_MAX_DELAY_MS = 8_000;
|
||||
|
||||
function isRetriableStatus(status: number): boolean {
|
||||
return status === 0 || status === 408 || status === 429 || status >= 500;
|
||||
}
|
||||
|
||||
function backoffDelay(attempt: number): number {
|
||||
const exp = Math.min(RETRY_MAX_DELAY_MS, RETRY_BASE_DELAY_MS * 2 ** attempt);
|
||||
// Full jitter to avoid thundering herd when many clients reconnect together.
|
||||
return Math.floor(Math.random() * exp);
|
||||
}
|
||||
|
||||
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
|
||||
|
||||
/**
|
||||
* Wraps a fetch call with exponential backoff. Re-throws after the final
|
||||
* attempt or immediately for non-retriable HTTP errors.
|
||||
*/
|
||||
async function fetchWithRetry(
|
||||
input: RequestInfo | URL,
|
||||
init: RequestInit,
|
||||
label: string
|
||||
): Promise<Response> {
|
||||
let lastError: unknown = null;
|
||||
for (let attempt = 0; attempt < RETRY_MAX_ATTEMPTS; attempt++) {
|
||||
try {
|
||||
const res = await fetch(input, init);
|
||||
if (res.ok) return res;
|
||||
if (!isRetriableStatus(res.status)) return res; // permanent — let caller handle
|
||||
lastError = new Error(`${label} failed: HTTP ${res.status}`);
|
||||
} catch (err) {
|
||||
// AbortError must propagate immediately (caller-initiated cancel).
|
||||
if (err instanceof Error && err.name === 'AbortError') throw err;
|
||||
lastError = err;
|
||||
}
|
||||
if (attempt < RETRY_MAX_ATTEMPTS - 1) {
|
||||
await sleep(backoffDelay(attempt));
|
||||
}
|
||||
}
|
||||
throw lastError instanceof Error ? lastError : new Error(`${label} failed`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Eager apps are synced at startup (needed for dashboard widgets).
|
||||
* Lazy apps are synced on first module visit via ensureAppSynced().
|
||||
|
|
@@ -163,15 +216,19 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
// Build changeset in backend protocol format
|
||||
const changeset = buildChangeset(pending, clientId, oldestCursor);
|
||||
|
||||
const res = await fetch(`${serverUrl}/sync/${appId}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: `Bearer ${token}`,
|
||||
'X-Client-Id': clientId,
|
||||
const res = await fetchWithRetry(
|
||||
`${serverUrl}/sync/${appId}`,
|
||||
{
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: `Bearer ${token}`,
|
||||
'X-Client-Id': clientId,
|
||||
},
|
||||
body: JSON.stringify(changeset),
|
||||
},
|
||||
body: JSON.stringify(changeset),
|
||||
});
|
||||
`push[${appId}]`
|
||||
);
|
||||
|
||||
if (!res.ok) throw new Error(`Push failed: ${res.status}`);
|
||||
|
||||
|
|
@@ -220,14 +277,15 @@
|
|||
|
||||
// Paginated pull: continue fetching until server signals no more data
|
||||
while (hasMore) {
|
||||
const res = await fetch(
|
||||
const res = await fetchWithRetry(
|
||||
`${serverUrl}/sync/${appId}/pull?collection=${encodeURIComponent(syncName)}&since=${encodeURIComponent(cursor)}`,
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
'X-Client-Id': clientId,
|
||||
},
|
||||
}
|
||||
},
|
||||
`pull[${appId}/${syncName}]`
|
||||
);
|
||||
|
||||
if (!res.ok) break;
|
||||
|
|
@@ -360,6 +418,12 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
|
||||
// ─── Apply Server Changes ───────────────────────────────
|
||||
|
||||
function readFieldTimestamps(record: unknown): Record<string, string> {
|
||||
if (!record || typeof record !== 'object') return {};
|
||||
const ft = (record as Record<string, unknown>)[FIELD_TIMESTAMPS_KEY];
|
||||
return ft && typeof ft === 'object' ? (ft as Record<string, string>) : {};
|
||||
}
|
||||
|
||||
async function applyServerChanges(appId: string, changes: any[]): Promise<void> {
|
||||
setApplyingServerChanges(true);
|
||||
try {
|
||||
|
|
@@ -381,14 +445,26 @@
|
|||
const recordId = change.id;
|
||||
|
||||
if (change.deletedAt || change.op === 'delete') {
|
||||
// Soft delete or hard delete
|
||||
// Soft delete (deletedAt) or hard delete
|
||||
const existing = await table.get(recordId);
|
||||
if (existing) {
|
||||
if (change.deletedAt) {
|
||||
await table.update(recordId, {
|
||||
deletedAt: change.deletedAt,
|
||||
updatedAt: change.deletedAt,
|
||||
});
|
||||
const localFT = readFieldTimestamps(existing);
|
||||
const serverTime = change.deletedAt as string;
|
||||
// LWW guard: only apply if newer than local deletedAt timestamp
|
||||
const localDeletedAtTime = localFT.deletedAt ?? (existing as any).deletedAt ?? '';
|
||||
if (serverTime >= localDeletedAtTime) {
|
||||
const newFT = {
|
||||
...localFT,
|
||||
deletedAt: serverTime,
|
||||
updatedAt: serverTime,
|
||||
};
|
||||
await table.update(recordId, {
|
||||
deletedAt: serverTime,
|
||||
updatedAt: serverTime,
|
||||
[FIELD_TIMESTAMPS_KEY]: newFT,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
await table.delete(recordId);
|
||||
}
|
||||
|
|
@@ -396,41 +472,82 @@
|
|||
} else if (change.op === 'insert') {
|
||||
// Upsert for inserts
|
||||
const existing = await table.get(recordId);
|
||||
const changeData = (change.data ?? change) as Record<string, unknown>;
|
||||
const recordTime =
|
||||
(changeData.updatedAt as string | undefined) ??
|
||||
(changeData.createdAt as string | undefined) ??
|
||||
new Date().toISOString();
|
||||
|
||||
if (!existing) {
|
||||
await table.put(change.data ?? { id: recordId, ...change });
|
||||
// Stamp every field at the record's timestamp
|
||||
const ft: Record<string, string> = {};
|
||||
for (const key of Object.keys(changeData)) {
|
||||
if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue;
|
||||
ft[key] = recordTime;
|
||||
}
|
||||
await table.put({
|
||||
...changeData,
|
||||
id: recordId,
|
||||
[FIELD_TIMESTAMPS_KEY]: ft,
|
||||
});
|
||||
} else {
|
||||
// Record exists — merge with LWW
|
||||
// Existing record — merge with field-level LWW using recordTime as
|
||||
// the timestamp for every incoming field.
|
||||
const localFT = readFieldTimestamps(existing);
|
||||
const localUpdatedAt = (existing as any).updatedAt ?? '';
|
||||
const updates: Record<string, unknown> = {};
|
||||
const changeData = change.data ?? change;
|
||||
const newFT: Record<string, string> = { ...localFT };
|
||||
|
||||
for (const [key, val] of Object.entries(changeData)) {
|
||||
if (key === 'id') continue;
|
||||
updates[key] = val;
|
||||
if (key === 'id' || key === FIELD_TIMESTAMPS_KEY) continue;
|
||||
const localFieldTime = localFT[key] ?? localUpdatedAt;
|
||||
if (recordTime >= localFieldTime) {
|
||||
updates[key] = val;
|
||||
newFT[key] = recordTime;
|
||||
}
|
||||
}
|
||||
if (Object.keys(updates).length > 0) {
|
||||
updates[FIELD_TIMESTAMPS_KEY] = newFT;
|
||||
await table.update(recordId, updates);
|
||||
}
|
||||
}
|
||||
} else if (change.op === 'update' && change.fields) {
|
||||
// Field-level LWW update
|
||||
const existing = await table.get(recordId);
|
||||
const serverFields = change.fields as Record<
|
||||
string,
|
||||
{ value: unknown; updatedAt?: string }
|
||||
>;
|
||||
|
||||
if (!existing) {
|
||||
// Record doesn't exist locally — reconstruct from fields
|
||||
const record: Record<string, unknown> = { id: recordId };
|
||||
for (const [key, fc] of Object.entries(change.fields as Record<string, any>)) {
|
||||
const ft: Record<string, string> = {};
|
||||
const fallback = new Date().toISOString();
|
||||
for (const [key, fc] of Object.entries(serverFields)) {
|
||||
record[key] = fc.value;
|
||||
ft[key] = fc.updatedAt ?? fallback;
|
||||
}
|
||||
record[FIELD_TIMESTAMPS_KEY] = ft;
|
||||
await table.put(record);
|
||||
} else {
|
||||
// Merge — only update fields that are newer
|
||||
// Merge — compare per-field timestamps. Falls back to record-level
|
||||
// updatedAt for legacy records that pre-date __fieldTimestamps.
|
||||
const localFT = readFieldTimestamps(existing);
|
||||
const localUpdatedAt = (existing as any).updatedAt ?? '';
|
||||
const updates: Record<string, unknown> = {};
|
||||
for (const [key, fc] of Object.entries(change.fields as Record<string, any>)) {
|
||||
const newFT: Record<string, string> = { ...localFT };
|
||||
|
||||
for (const [key, fc] of Object.entries(serverFields)) {
|
||||
const serverTime = fc.updatedAt ?? '';
|
||||
const localTime = (existing as any).updatedAt ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
const localFieldTime = localFT[key] ?? localUpdatedAt;
|
||||
if (serverTime >= localFieldTime) {
|
||||
updates[key] = fc.value;
|
||||
newFT[key] = serverTime;
|
||||
}
|
||||
}
|
||||
if (Object.keys(updates).length > 0) {
|
||||
updates[FIELD_TIMESTAMPS_KEY] = newFT;
|
||||
await table.update(recordId, updates);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@@ -6,6 +6,7 @@
|
|||
*/
|
||||
|
||||
import { CardsEvents } from '@mana/shared-utils/analytics';
|
||||
import { db } from '$lib/data/database';
|
||||
import { cardDeckTable, cardTable } from '../collections';
|
||||
import { toDeck } from '../queries';
|
||||
import type { LocalDeck } from '../types';
|
||||
|
|
@@ -63,14 +64,16 @@ export const deckStore = {
|
|||
try {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
// Soft-delete all cards belonging to this deck
|
||||
const cards = await cardTable.where('deckId').equals(id).toArray();
|
||||
for (const card of cards) {
|
||||
await cardTable.update(card.id, { deletedAt: now, updatedAt: now });
|
||||
}
|
||||
|
||||
// Soft-delete the deck
|
||||
await cardDeckTable.update(id, { deletedAt: now, updatedAt: now });
|
||||
// Atomic cascade: deck + all child cards are soft-deleted in one
|
||||
// Dexie transaction. If any write fails, the whole operation aborts —
|
||||
// no orphaned cards left pointing at a deleted deck.
|
||||
await db.transaction('rw', cardDeckTable, cardTable, async () => {
|
||||
const cards = await cardTable.where('deckId').equals(id).toArray();
|
||||
for (const card of cards) {
|
||||
await cardTable.update(card.id, { deletedAt: now, updatedAt: now });
|
||||
}
|
||||
await cardDeckTable.update(id, { deletedAt: now, updatedAt: now });
|
||||
});
|
||||
CardsEvents.deckDeleted();
|
||||
} catch (err: any) {
|
||||
error = err.message || 'Failed to delete deck';
|
||||
|
|
|
|||
|
|
@@ -5,6 +5,7 @@
|
|||
* This store only handles writes to IndexedDB via the unified database.
|
||||
*/
|
||||
|
||||
import { db } from '$lib/data/database';
|
||||
import { conversationTable, messageTable } from '../collections';
|
||||
import { toConversation } from '../queries';
|
||||
import { createArchiveOps } from '@mana/shared-stores';
|
||||
|
|
@@ -78,15 +79,18 @@ export const conversationsStore = {
|
|||
});
|
||||
},
|
||||
|
||||
/** Soft-delete a conversation and its messages. */
|
||||
/** Soft-delete a conversation and its messages atomically. */
|
||||
async delete(id: string) {
|
||||
const now = new Date().toISOString();
|
||||
await conversationTable.update(id, { deletedAt: now, updatedAt: now });
|
||||
// Cascade soft-delete to messages
|
||||
const msgs = await messageTable.where('conversationId').equals(id).toArray();
|
||||
for (const msg of msgs) {
|
||||
await messageTable.update(msg.id, { deletedAt: now, updatedAt: now });
|
||||
}
|
||||
// Atomic cascade: conversation + all messages in one Dexie transaction.
|
||||
// Aborts as a unit on failure to avoid orphaned messages.
|
||||
await db.transaction('rw', conversationTable, messageTable, async () => {
|
||||
await conversationTable.update(id, { deletedAt: now, updatedAt: now });
|
||||
const msgs = await messageTable.where('conversationId').equals(id).toArray();
|
||||
for (const msg of msgs) {
|
||||
await messageTable.update(msg.id, { deletedAt: now, updatedAt: now });
|
||||
}
|
||||
});
|
||||
ChatEvents.conversationDeleted();
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@@ -5,6 +5,7 @@
|
|||
* Handles playlist CRUD and song associations.
|
||||
*/
|
||||
|
||||
import { db } from '$lib/data/database';
|
||||
import { musicPlaylistTable, playlistSongTable } from '../collections';
|
||||
import { toPlaylist } from '../queries';
|
||||
import { MusicEvents } from '@mana/shared-utils/analytics';
|
||||
|
|
@@ -32,15 +33,17 @@ export const playlistsStore = {
|
|||
});
|
||||
},
|
||||
|
||||
/** Soft-delete a playlist and its song associations. */
|
||||
/** Soft-delete a playlist and its song associations atomically. */
|
||||
async delete(id: string) {
|
||||
const now = new Date().toISOString();
|
||||
await musicPlaylistTable.update(id, { deletedAt: now, updatedAt: now });
|
||||
// Soft-delete associated playlistSongs
|
||||
const allPS = await playlistSongTable.where('playlistId').equals(id).toArray();
|
||||
for (const ps of allPS) {
|
||||
await playlistSongTable.update(ps.id, { deletedAt: now, updatedAt: now });
|
||||
}
|
||||
// Atomic cascade: playlist + playlistSongs in one Dexie transaction.
|
||||
await db.transaction('rw', musicPlaylistTable, playlistSongTable, async () => {
|
||||
await musicPlaylistTable.update(id, { deletedAt: now, updatedAt: now });
|
||||
const allPS = await playlistSongTable.where('playlistId').equals(id).toArray();
|
||||
for (const ps of allPS) {
|
||||
await playlistSongTable.update(ps.id, { deletedAt: now, updatedAt: now });
|
||||
}
|
||||
});
|
||||
MusicEvents.playlistDeleted();
|
||||
},
|
||||
|
||||
|
|
|
|||
|
|
@@ -5,6 +5,7 @@
|
|||
* This store only handles writes to IndexedDB via the unified database.
|
||||
*/
|
||||
|
||||
import { db } from '$lib/data/database';
|
||||
import { presiDeckTable, slideTable } from '../collections';
|
||||
import { toDeck, toSlide } from '../queries';
|
||||
import { PresiEvents } from '@mana/shared-utils/analytics';
|
||||
|
|
@@ -70,13 +71,14 @@ function createDecksStore() {
|
|||
error = null;
|
||||
try {
|
||||
const now = new Date().toISOString();
|
||||
// Soft-delete all slides belonging to this deck
|
||||
const slides = await slideTable.where('deckId').equals(id).toArray();
|
||||
for (const slide of slides) {
|
||||
await slideTable.update(slide.id, { deletedAt: now, updatedAt: now });
|
||||
}
|
||||
// Soft-delete the deck
|
||||
await presiDeckTable.update(id, { deletedAt: now, updatedAt: now });
|
||||
// Atomic cascade: deck + all slides in one Dexie transaction.
|
||||
await db.transaction('rw', presiDeckTable, slideTable, async () => {
|
||||
const slides = await slideTable.where('deckId').equals(id).toArray();
|
||||
for (const slide of slides) {
|
||||
await slideTable.update(slide.id, { deletedAt: now, updatedAt: now });
|
||||
}
|
||||
await presiDeckTable.update(id, { deletedAt: now, updatedAt: now });
|
||||
});
|
||||
PresiEvents.deckDeleted();
|
||||
return true;
|
||||
} catch (e) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue