mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-14 22:21:10 +02:00
feat(manacore): Phase 7 — unified sync manager for multi-app DB
Create sync infrastructure for the single-DB multi-app architecture:
sync.ts — Unified Sync Manager:
- One sync channel per appId (todo, calendar, contacts, etc.)
- Push: collects _pendingChanges tagged with appId → POST /sync/{appId}/push
- Pull: fetches server delta per collection → applies field-level LWW
- WebSocket: connects to /sync/{appId}/ws for real-time push notifications
- Online/offline handling with automatic reconnect
- Uses SYNC_APP_MAP from database.ts for table→appId routing
change-tracker.ts — Write tracking helper:
- trackChange(): records writes to _pendingChanges with auto-appId lookup
- trackFieldUpdate(): field-level LWW tracking for partial updates
- trackDelete(): soft-delete tracking
The _pendingChanges table now includes appId as an indexed field,
and _syncMeta uses compound key [appId+collection] for per-app cursors.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
59e1e8e833
commit
a3a472ce39
2 changed files with 461 additions and 0 deletions
80
apps/manacore/apps/web/src/lib/data/change-tracker.ts
Normal file
80
apps/manacore/apps/web/src/lib/data/change-tracker.ts
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Change Tracker — records local writes to _pendingChanges with appId routing.
|
||||
*
|
||||
* Usage in mutation stores:
|
||||
* import { trackChange } from '$lib/data/change-tracker';
|
||||
* await taskTable.put(task);
|
||||
* await trackChange('tasks', task.id, 'insert', task);
|
||||
*/
|
||||
|
||||
import { db, TABLE_TO_APP } from './database';
|
||||
|
||||
interface PendingChange {
|
||||
appId: string;
|
||||
collection: string;
|
||||
recordId: string;
|
||||
op: 'insert' | 'update' | 'delete';
|
||||
fields?: Record<string, { value: unknown; updatedAt: string }>;
|
||||
data?: Record<string, unknown>;
|
||||
deletedAt?: string;
|
||||
createdAt: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a local change to _pendingChanges for later sync.
|
||||
*/
|
||||
export async function trackChange(
|
||||
collection: string,
|
||||
recordId: string,
|
||||
op: 'insert' | 'update' | 'delete',
|
||||
data?: Record<string, unknown>,
|
||||
fields?: Record<string, { value: unknown; updatedAt: string }>
|
||||
): Promise<void> {
|
||||
const appId = TABLE_TO_APP[collection];
|
||||
if (!appId) {
|
||||
console.warn(`[ChangeTracker] No appId mapping for collection "${collection}"`);
|
||||
return;
|
||||
}
|
||||
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const change: PendingChange = {
|
||||
appId,
|
||||
collection,
|
||||
recordId,
|
||||
op,
|
||||
createdAt: now,
|
||||
};
|
||||
|
||||
if (fields) change.fields = fields;
|
||||
if (data) change.data = data;
|
||||
if (op === 'delete') change.deletedAt = now;
|
||||
|
||||
await db.table('_pendingChanges').add(change);
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a field-level update change (LWW).
|
||||
* Only the changed fields are tracked, not the entire record.
|
||||
*/
|
||||
export async function trackFieldUpdate(
|
||||
collection: string,
|
||||
recordId: string,
|
||||
updatedFields: Record<string, unknown>
|
||||
): Promise<void> {
|
||||
const now = new Date().toISOString();
|
||||
const fields: Record<string, { value: unknown; updatedAt: string }> = {};
|
||||
|
||||
for (const [key, value] of Object.entries(updatedFields)) {
|
||||
fields[key] = { value, updatedAt: now };
|
||||
}
|
||||
|
||||
await trackChange(collection, recordId, 'update', undefined, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a soft-delete change.
|
||||
*/
|
||||
export async function trackDelete(collection: string, recordId: string): Promise<void> {
|
||||
await trackChange(collection, recordId, 'delete');
|
||||
}
|
||||
381
apps/manacore/apps/web/src/lib/data/sync.ts
Normal file
381
apps/manacore/apps/web/src/lib/data/sync.ts
Normal file
|
|
@ -0,0 +1,381 @@
|
|||
/**
|
||||
* Unified Sync Manager — orchestrates sync across all apps in one DB.
|
||||
*
|
||||
* Each appId gets its own sync "channel" to the mana-sync server,
|
||||
* but all share one Dexie database and one _pendingChanges table.
|
||||
*
|
||||
* Architecture:
|
||||
* Unified DB → PendingChange (tagged with appId) → SyncChannel per appId → mana-sync /sync/{appId}
|
||||
* mana-sync /sync/{appId} → WebSocket push → SyncChannel → applies to Unified DB
|
||||
*/
|
||||
|
||||
import { db, SYNC_APP_MAP, TABLE_TO_APP } from './database';
|
||||
import type Dexie from 'dexie';
|
||||
|
||||
// ─── Types ────────────────────────────────────────────────────
|
||||
|
||||
interface PendingChange {
|
||||
id?: number;
|
||||
appId: string;
|
||||
collection: string;
|
||||
recordId: string;
|
||||
op: 'insert' | 'update' | 'delete';
|
||||
fields?: Record<string, { value: unknown; updatedAt: string }>;
|
||||
data?: Record<string, unknown>;
|
||||
deletedAt?: string;
|
||||
createdAt: string;
|
||||
}
|
||||
|
||||
interface SyncMeta {
|
||||
appId: string;
|
||||
collection: string;
|
||||
lastSyncedAt: string;
|
||||
pendingCount: number;
|
||||
}
|
||||
|
||||
interface SyncChannelState {
|
||||
appId: string;
|
||||
tables: string[];
|
||||
ws: WebSocket | null;
|
||||
pushTimer: ReturnType<typeof setTimeout> | null;
|
||||
pullTimer: ReturnType<typeof setInterval> | null;
|
||||
lastError: string | null;
|
||||
}
|
||||
|
||||
type SyncStatus = 'idle' | 'syncing' | 'error' | 'offline';
|
||||
|
||||
// ─── Config ───────────────────────────────────────────────────
|
||||
|
||||
const PUSH_DEBOUNCE = 1000;
|
||||
const PULL_INTERVAL = 30_000;
|
||||
const WS_RECONNECT_DELAY = 5000;
|
||||
|
||||
// ─── Unified Sync Manager ─────────────────────────────────────
|
||||
|
||||
export function createUnifiedSync(serverUrl: string, getToken: () => Promise<string | null>) {
|
||||
const channels = new Map<string, SyncChannelState>();
|
||||
let clientId = getOrCreateClientId();
|
||||
let status: SyncStatus = 'idle';
|
||||
let online = typeof navigator !== 'undefined' ? navigator.onLine : true;
|
||||
|
||||
// ─── Lifecycle ──────────────────────────────────────────
|
||||
|
||||
function startAll(): void {
|
||||
for (const [appId, tables] of Object.entries(SYNC_APP_MAP)) {
|
||||
const channel: SyncChannelState = {
|
||||
appId,
|
||||
tables,
|
||||
ws: null,
|
||||
pushTimer: null,
|
||||
pullTimer: null,
|
||||
lastError: null,
|
||||
};
|
||||
channels.set(appId, channel);
|
||||
|
||||
// Initial pull, then start periodic sync
|
||||
pull(appId).catch(() => {});
|
||||
channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL);
|
||||
|
||||
// Connect WebSocket for real-time push notifications
|
||||
connectWs(appId);
|
||||
}
|
||||
|
||||
// Watch _pendingChanges for new writes
|
||||
db.table('_pendingChanges').hook('creating', (primKey, obj) => {
|
||||
// Auto-tag with appId based on collection
|
||||
if (!obj.appId && obj.collection) {
|
||||
obj.appId = TABLE_TO_APP[obj.collection] || 'manacore';
|
||||
}
|
||||
// Debounced push
|
||||
const appId = obj.appId;
|
||||
if (appId) schedulePush(appId);
|
||||
});
|
||||
|
||||
// Listen for online/offline
|
||||
if (typeof window !== 'undefined') {
|
||||
window.addEventListener('online', handleOnline);
|
||||
window.addEventListener('offline', handleOffline);
|
||||
}
|
||||
}
|
||||
|
||||
function stopAll(): void {
|
||||
for (const [appId, channel] of channels) {
|
||||
if (channel.pushTimer) clearTimeout(channel.pushTimer);
|
||||
if (channel.pullTimer) clearInterval(channel.pullTimer);
|
||||
if (channel.ws) {
|
||||
channel.ws.close();
|
||||
channel.ws = null;
|
||||
}
|
||||
}
|
||||
channels.clear();
|
||||
|
||||
if (typeof window !== 'undefined') {
|
||||
window.removeEventListener('online', handleOnline);
|
||||
window.removeEventListener('offline', handleOffline);
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Push: Local → Server ───────────────────────────────
|
||||
|
||||
function schedulePush(appId: string): void {
|
||||
const channel = channels.get(appId);
|
||||
if (!channel || !online) return;
|
||||
|
||||
if (channel.pushTimer) clearTimeout(channel.pushTimer);
|
||||
channel.pushTimer = setTimeout(() => push(appId).catch(() => {}), PUSH_DEBOUNCE);
|
||||
}
|
||||
|
||||
async function push(appId: string): Promise<void> {
|
||||
const channel = channels.get(appId);
|
||||
if (!channel) return;
|
||||
|
||||
const token = await getToken();
|
||||
if (!token) return;
|
||||
|
||||
// Get pending changes for this appId
|
||||
const pending: PendingChange[] = await db
|
||||
.table('_pendingChanges')
|
||||
.where('appId')
|
||||
.equals(appId)
|
||||
.sortBy('createdAt');
|
||||
|
||||
if (pending.length === 0) return;
|
||||
|
||||
status = 'syncing';
|
||||
|
||||
try {
|
||||
const changeset = buildChangeset(pending, clientId);
|
||||
const res = await fetch(`${serverUrl}/sync/${appId}/push`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: `Bearer ${token}`,
|
||||
},
|
||||
body: JSON.stringify(changeset),
|
||||
});
|
||||
|
||||
if (!res.ok) throw new Error(`Push failed: ${res.status}`);
|
||||
|
||||
// Clear synced pending changes
|
||||
const ids = pending.map((p) => p.id).filter((id): id is number => id !== undefined);
|
||||
await db.table('_pendingChanges').bulkDelete(ids);
|
||||
|
||||
channel.lastError = null;
|
||||
status = 'idle';
|
||||
} catch (err) {
|
||||
channel.lastError = err instanceof Error ? err.message : 'Push failed';
|
||||
status = 'error';
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Pull: Server → Local ───────────────────────────────
|
||||
|
||||
async function pull(appId: string): Promise<void> {
|
||||
const channel = channels.get(appId);
|
||||
if (!channel || !online) return;
|
||||
|
||||
const token = await getToken();
|
||||
if (!token) return;
|
||||
|
||||
status = 'syncing';
|
||||
|
||||
try {
|
||||
for (const tableName of channel.tables) {
|
||||
const cursor = await getSyncCursor(appId, tableName);
|
||||
|
||||
const res = await fetch(
|
||||
`${serverUrl}/sync/${appId}/pull?collection=${tableName}&since=${encodeURIComponent(cursor)}&clientId=${clientId}`,
|
||||
{
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
}
|
||||
);
|
||||
|
||||
if (!res.ok) continue;
|
||||
|
||||
const data = await res.json();
|
||||
if (!data.changes || data.changes.length === 0) continue;
|
||||
|
||||
// Apply changes to local DB
|
||||
await applyServerChanges(tableName, data.changes);
|
||||
|
||||
// Update cursor
|
||||
if (data.syncedUntil) {
|
||||
await setSyncCursor(appId, tableName, data.syncedUntil);
|
||||
}
|
||||
}
|
||||
|
||||
channel.lastError = null;
|
||||
status = 'idle';
|
||||
} catch (err) {
|
||||
channel.lastError = err instanceof Error ? err.message : 'Pull failed';
|
||||
status = 'error';
|
||||
}
|
||||
}
|
||||
|
||||
// ─── WebSocket ──────────────────────────────────────────
|
||||
|
||||
function connectWs(appId: string): void {
|
||||
const channel = channels.get(appId);
|
||||
if (!channel || !online) return;
|
||||
|
||||
const wsUrl = serverUrl.replace(/^http/, 'ws') + `/sync/${appId}/ws?clientId=${clientId}`;
|
||||
|
||||
try {
|
||||
const ws = new WebSocket(wsUrl);
|
||||
|
||||
ws.onopen = () => {
|
||||
channel.ws = ws;
|
||||
};
|
||||
|
||||
ws.onmessage = (event) => {
|
||||
try {
|
||||
const msg = JSON.parse(event.data);
|
||||
if (msg.type === 'push') {
|
||||
// Server notifies us of new changes — trigger pull
|
||||
pull(appId).catch(() => {});
|
||||
}
|
||||
} catch {}
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
channel.ws = null;
|
||||
// Reconnect after delay
|
||||
if (channels.has(appId) && online) {
|
||||
setTimeout(() => connectWs(appId), WS_RECONNECT_DELAY);
|
||||
}
|
||||
};
|
||||
|
||||
ws.onerror = () => {
|
||||
ws.close();
|
||||
};
|
||||
} catch {
|
||||
// WebSocket not available or blocked
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Helpers ─────────────────────────────────────────────
|
||||
|
||||
async function getSyncCursor(appId: string, collection: string): Promise<string> {
|
||||
const meta: SyncMeta | undefined = await db.table('_syncMeta').get([appId, collection]);
|
||||
return meta?.lastSyncedAt ?? '1970-01-01T00:00:00.000Z';
|
||||
}
|
||||
|
||||
async function setSyncCursor(
|
||||
appId: string,
|
||||
collection: string,
|
||||
syncedUntil: string
|
||||
): Promise<void> {
|
||||
await db.table('_syncMeta').put({
|
||||
appId,
|
||||
collection,
|
||||
lastSyncedAt: syncedUntil,
|
||||
pendingCount: 0,
|
||||
});
|
||||
}
|
||||
|
||||
async function applyServerChanges(tableName: string, changes: any[]): Promise<void> {
|
||||
const table = db.table(tableName);
|
||||
|
||||
await db.transaction('rw', table, async () => {
|
||||
for (const change of changes) {
|
||||
if (change.deletedAt) {
|
||||
// Soft delete
|
||||
const existing = await table.get(change.id);
|
||||
if (existing) {
|
||||
await table.update(change.id, {
|
||||
deletedAt: change.deletedAt,
|
||||
updatedAt: change.updatedAt,
|
||||
});
|
||||
}
|
||||
} else if (change.op === 'delete') {
|
||||
await table.delete(change.id);
|
||||
} else {
|
||||
// Upsert — field-level LWW
|
||||
const existing = await table.get(change.id);
|
||||
if (!existing) {
|
||||
await table.put(change.data ?? change);
|
||||
} else {
|
||||
// Only update fields that are newer
|
||||
const updates: Record<string, unknown> = {};
|
||||
const changeData = change.data ?? change;
|
||||
for (const [key, val] of Object.entries(changeData)) {
|
||||
if (key === 'id') continue;
|
||||
const serverTime = change.fields?.[key]?.updatedAt ?? change.updatedAt;
|
||||
const localTime = (existing as any).updatedAt ?? '';
|
||||
if (serverTime >= localTime) {
|
||||
updates[key] = val;
|
||||
}
|
||||
}
|
||||
if (Object.keys(updates).length > 0) {
|
||||
await table.update(change.id, updates);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function buildChangeset(pending: PendingChange[], cid: string) {
|
||||
return {
|
||||
clientId: cid,
|
||||
changes: pending.map((p) => ({
|
||||
collection: p.collection,
|
||||
recordId: p.recordId,
|
||||
op: p.op,
|
||||
fields: p.fields,
|
||||
data: p.data,
|
||||
deletedAt: p.deletedAt,
|
||||
createdAt: p.createdAt,
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
function handleOnline() {
|
||||
online = true;
|
||||
status = 'idle';
|
||||
// Resume sync for all channels
|
||||
for (const appId of channels.keys()) {
|
||||
pull(appId).catch(() => {});
|
||||
connectWs(appId);
|
||||
}
|
||||
}
|
||||
|
||||
function handleOffline() {
|
||||
online = false;
|
||||
status = 'offline';
|
||||
// Close all WebSockets
|
||||
for (const channel of channels.values()) {
|
||||
if (channel.ws) {
|
||||
channel.ws.close();
|
||||
channel.ws = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
startAll,
|
||||
stopAll,
|
||||
get status() {
|
||||
return status;
|
||||
},
|
||||
get online() {
|
||||
return online;
|
||||
},
|
||||
getChannel: (appId: string) => channels.get(appId),
|
||||
pushNow: push,
|
||||
pullNow: pull,
|
||||
};
|
||||
}
|
||||
|
||||
// ─── Client ID ────────────────────────────────────────────────
|
||||
|
||||
function getOrCreateClientId(): string {
|
||||
const key = 'manacore-sync-client-id';
|
||||
if (typeof localStorage === 'undefined') return crypto.randomUUID();
|
||||
let id = localStorage.getItem(key);
|
||||
if (!id) {
|
||||
id = crypto.randomUUID();
|
||||
localStorage.setItem(key, id);
|
||||
}
|
||||
return id;
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue