mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-15 00:01:10 +02:00
feat(mana-sync): unified WebSocket — one connection per user instead of 27
Add unified /ws endpoint that serves all app notifications over a single connection.
The server now includes appId in the sync-available message payload so the client
knows which app to pull. Legacy /ws/{appId} endpoint remains for backward compatibility.
Backend (Go):
- hub.go: Message struct gains AppId field, NotifyUser sends to all user clients
(unified clients receive everything, legacy clients filtered by appId)
- main.go: new GET /ws route (empty appId = unified mode)
Frontend (sync.ts):
- Single connectUnifiedWs() replaces 27 per-app connectWs() calls
- Parses msg.appId from server to pull only the affected app
- Reconnect/offline logic simplified to one WS
This reduces WebSocket connections from 27 per user to 1, cutting server
connection overhead by ~96%.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
df7395e57a
commit
ee831992de
4 changed files with 64 additions and 39 deletions
|
|
@ -40,7 +40,6 @@ interface SyncMeta {
|
|||
interface SyncChannelState {
|
||||
appId: string;
|
||||
tables: string[];
|
||||
ws: WebSocket | null;
|
||||
pushTimer: ReturnType<typeof setTimeout> | null;
|
||||
pullTimer: ReturnType<typeof setInterval> | null;
|
||||
lastError: string | null;
|
||||
|
|
@ -53,8 +52,6 @@ export type SyncStatus = 'idle' | 'syncing' | 'error' | 'offline';
|
|||
const PUSH_DEBOUNCE = 1000;
|
||||
const PULL_INTERVAL = 30_000;
|
||||
const WS_RECONNECT_DELAY = 5000;
|
||||
const WS_AUTH_TIMEOUT = 10_000;
|
||||
|
||||
// ─── Unified Sync Manager ─────────────────────────────────────
|
||||
|
||||
export function createUnifiedSync(serverUrl: string, getToken: () => Promise<string | null>) {
|
||||
|
|
@ -63,6 +60,7 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
let status: SyncStatus = 'idle';
|
||||
let online = typeof navigator !== 'undefined' ? navigator.onLine : true;
|
||||
let _statusListeners: Array<(s: SyncStatus) => void> = [];
|
||||
let unifiedWs: WebSocket | null = null;
|
||||
|
||||
// ─── Lifecycle ──────────────────────────────────────────
|
||||
|
||||
|
|
@ -71,7 +69,6 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
const channel: SyncChannelState = {
|
||||
appId,
|
||||
tables,
|
||||
ws: null,
|
||||
pushTimer: null,
|
||||
pullTimer: null,
|
||||
lastError: null,
|
||||
|
|
@ -81,11 +78,11 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
// Initial pull, then start periodic sync
|
||||
pull(appId).catch(() => {});
|
||||
channel.pullTimer = setInterval(() => pull(appId).catch(() => {}), PULL_INTERVAL);
|
||||
|
||||
// Connect WebSocket for real-time push notifications
|
||||
connectWs(appId);
|
||||
}
|
||||
|
||||
// Single unified WebSocket for all apps
|
||||
connectUnifiedWs();
|
||||
|
||||
// Listen for online/offline
|
||||
if (typeof window !== 'undefined') {
|
||||
window.addEventListener('online', handleOnline);
|
||||
|
|
@ -97,14 +94,15 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
for (const [, channel] of channels) {
|
||||
if (channel.pushTimer) clearTimeout(channel.pushTimer);
|
||||
if (channel.pullTimer) clearInterval(channel.pullTimer);
|
||||
if (channel.ws) {
|
||||
channel.ws.close();
|
||||
channel.ws = null;
|
||||
}
|
||||
}
|
||||
channels.clear();
|
||||
_statusListeners = [];
|
||||
|
||||
if (unifiedWs) {
|
||||
unifiedWs.close();
|
||||
unifiedWs = null;
|
||||
}
|
||||
|
||||
if (typeof window !== 'undefined') {
|
||||
window.removeEventListener('online', handleOnline);
|
||||
window.removeEventListener('offline', handleOffline);
|
||||
|
|
@ -237,19 +235,18 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
}
|
||||
}
|
||||
|
||||
// ─── WebSocket ──────────────────────────────────────────
|
||||
// ─── WebSocket (unified — one connection for all apps) ──
|
||||
|
||||
function connectWs(appId: string): void {
|
||||
const channel = channels.get(appId);
|
||||
if (!channel || !online) return;
|
||||
function connectUnifiedWs(): void {
|
||||
if (!online) return;
|
||||
|
||||
const wsUrl = serverUrl.replace(/^http/, 'ws') + `/ws/${appId}`;
|
||||
const wsUrl = serverUrl.replace(/^http/, 'ws') + '/ws';
|
||||
|
||||
try {
|
||||
const ws = new WebSocket(wsUrl);
|
||||
|
||||
ws.onopen = async () => {
|
||||
channel.ws = ws;
|
||||
unifiedWs = ws;
|
||||
// Authenticate — backend requires auth within 10 seconds
|
||||
const token = await getToken();
|
||||
if (token && ws.readyState === WebSocket.OPEN) {
|
||||
|
|
@ -260,9 +257,9 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
ws.onmessage = (event) => {
|
||||
try {
|
||||
const msg = JSON.parse(event.data);
|
||||
if (msg.type === 'sync-available') {
|
||||
// Server notifies us of new changes — trigger pull
|
||||
pull(appId).catch(() => {});
|
||||
if (msg.type === 'sync-available' && msg.appId) {
|
||||
// Server notifies us of changes for a specific app — pull only that app
|
||||
pull(msg.appId).catch(() => {});
|
||||
}
|
||||
} catch {
|
||||
// Ignore malformed messages
|
||||
|
|
@ -270,10 +267,10 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
channel.ws = null;
|
||||
unifiedWs = null;
|
||||
// Reconnect after delay
|
||||
if (channels.has(appId) && online) {
|
||||
setTimeout(() => connectWs(appId), WS_RECONNECT_DELAY);
|
||||
if (channels.size > 0 && online) {
|
||||
setTimeout(() => connectUnifiedWs(), WS_RECONNECT_DELAY);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -427,19 +424,20 @@ export function createUnifiedSync(serverUrl: string, getToken: () => Promise<str
|
|||
// Resume sync for all channels
|
||||
for (const appId of channels.keys()) {
|
||||
pull(appId).catch(() => {});
|
||||
connectWs(appId);
|
||||
}
|
||||
// Reconnect unified WebSocket
|
||||
if (!unifiedWs) {
|
||||
connectUnifiedWs();
|
||||
}
|
||||
}
|
||||
|
||||
function handleOffline() {
|
||||
online = false;
|
||||
setStatus('offline');
|
||||
// Close all WebSockets
|
||||
for (const channel of channels.values()) {
|
||||
if (channel.ws) {
|
||||
channel.ws.close();
|
||||
channel.ws = null;
|
||||
}
|
||||
// Close unified WebSocket
|
||||
if (unifiedWs) {
|
||||
unifiedWs.close();
|
||||
unifiedWs = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -82,22 +82,32 @@ Header: X-Client-Id: chrome-tab-abc123
|
|||
Header: Authorization: Bearer <jwt>
|
||||
```
|
||||
|
||||
### WebSocket (GET /ws/{appId})
|
||||
### WebSocket — Unified (GET /ws) [Recommended]
|
||||
|
||||
Real-time notifications when other clients sync. Client must authenticate first.
|
||||
Single connection per user. Receives notifications for all apps with `appId` in the payload.
|
||||
|
||||
```
|
||||
CLIENT -> SERVER: { "type": "auth", "token": "<jwt>" }
|
||||
SERVER -> CLIENT: { "type": "auth-ok" }
|
||||
|
||||
// When another client syncs:
|
||||
SERVER -> CLIENT: { "type": "sync-available", "tables": ["todos"] }
|
||||
SERVER -> CLIENT: { "type": "sync-available", "appId": "todo", "tables": ["tasks"] }
|
||||
|
||||
// Keepalive:
|
||||
CLIENT -> SERVER: { "type": "ping" }
|
||||
SERVER -> CLIENT: { "type": "pong" }
|
||||
```
|
||||
|
||||
### WebSocket — Legacy (GET /ws/{appId})
|
||||
|
||||
One connection per app. Only receives notifications for that specific app. Backward-compatible.
|
||||
|
||||
```
|
||||
CLIENT -> SERVER: { "type": "auth", "token": "<jwt>" }
|
||||
SERVER -> CLIENT: { "type": "auth-ok" }
|
||||
SERVER -> CLIENT: { "type": "sync-available", "appId": "todo", "tables": ["tasks"] }
|
||||
```
|
||||
|
||||
## Conflict Resolution: Field-Level LWW
|
||||
|
||||
Each field update carries a timestamp. When the same field is modified by multiple clients, the latest timestamp wins.
|
||||
|
|
@ -119,7 +129,8 @@ Result: title="Buy eggs", completed=true (merged — different fields)
|
|||
|----------|--------|------|-------------|
|
||||
| `POST /sync/{appId}` | POST | JWT | Push changes, get server delta |
|
||||
| `GET /sync/{appId}/pull` | GET | JWT | Pull changes for a collection |
|
||||
| `GET /ws/{appId}` | WS | JWT (in-band) | Real-time sync notifications |
|
||||
| `GET /ws` | WS | JWT (in-band) | Unified real-time sync (all apps, one connection) |
|
||||
| `GET /ws/{appId}` | WS | JWT (in-band) | Legacy per-app sync notifications |
|
||||
| `GET /health` | GET | No | Health check with connection stats |
|
||||
| `GET /metrics` | GET | No | Prometheus metrics |
|
||||
|
||||
|
|
|
|||
|
|
@ -59,7 +59,12 @@ func main() {
|
|||
mux.HandleFunc("POST /sync/{appId}", handler.HandleSync)
|
||||
mux.HandleFunc("GET /sync/{appId}/pull", handler.HandlePull)
|
||||
|
||||
// WebSocket endpoint
|
||||
// WebSocket endpoints
|
||||
// Unified: one connection per user, receives all app notifications with appId in payload
|
||||
mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.HandleWebSocket(w, r, "") // empty appID = unified mode
|
||||
})
|
||||
// Legacy: one connection per app (backward-compatible)
|
||||
mux.HandleFunc("/ws/{appId}", func(w http.ResponseWriter, r *http.Request) {
|
||||
appID := r.PathValue("appId")
|
||||
hub.HandleWebSocket(w, r, appID)
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import (
|
|||
// Message types sent over WebSocket.
|
||||
type Message struct {
|
||||
Type string `json:"type"`
|
||||
AppID string `json:"appId,omitempty"`
|
||||
Tables []string `json:"tables,omitempty"`
|
||||
Token string `json:"token,omitempty"`
|
||||
}
|
||||
|
|
@ -45,6 +46,7 @@ func NewHub(validator *auth.Validator) *Hub {
|
|||
|
||||
// HandleWebSocket upgrades an HTTP connection to WebSocket and registers the client.
|
||||
// The client must send an auth message with a valid JWT before receiving notifications.
|
||||
// Supports both unified (/ws) and legacy per-app (/ws/{appId}) connections.
|
||||
func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, appID string) {
|
||||
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||
OriginPatterns: []string{"*"},
|
||||
|
|
@ -56,7 +58,7 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, appID stri
|
|||
|
||||
ctx, cancel := context.WithCancel(r.Context())
|
||||
client := &Client{
|
||||
AppID: appID,
|
||||
AppID: appID, // empty for unified connections, set for legacy per-app
|
||||
Conn: conn,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
|
@ -67,6 +69,8 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, appID stri
|
|||
|
||||
// NotifyUser sends a sync-available message to all connected clients of a user,
|
||||
// except the client that originated the change.
|
||||
// For unified connections (AppID==""), all clients receive the notification with appId in the payload.
|
||||
// For legacy per-app connections, only clients matching the appId are notified.
|
||||
func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string) {
|
||||
h.mu.RLock()
|
||||
clients, ok := h.clients[userID]
|
||||
|
|
@ -78,7 +82,9 @@ func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string)
|
|||
// Copy the client set under read lock to avoid holding lock during writes
|
||||
clientsCopy := make([]*Client, 0, len(clients))
|
||||
for client := range clients {
|
||||
if client.AppID == appID {
|
||||
// Unified clients (AppID=="") receive all notifications.
|
||||
// Legacy per-app clients only receive notifications for their app.
|
||||
if client.AppID == "" || client.AppID == appID {
|
||||
clientsCopy = append(clientsCopy, client)
|
||||
}
|
||||
}
|
||||
|
|
@ -90,6 +96,7 @@ func (h *Hub) NotifyUser(userID, appID, excludeClientID string, tables []string)
|
|||
|
||||
msg := Message{
|
||||
Type: "sync-available",
|
||||
AppID: appID,
|
||||
Tables: tables,
|
||||
}
|
||||
data, err := json.Marshal(msg)
|
||||
|
|
@ -175,7 +182,11 @@ func (h *Hub) readLoop(ctx context.Context, client *Client) {
|
|||
ackData, _ := json.Marshal(ackMsg)
|
||||
client.Conn.Write(ctx, websocket.MessageText, ackData)
|
||||
|
||||
slog.Info("websocket authenticated", "userID", client.UserID, "appID", client.AppID)
|
||||
mode := "unified"
|
||||
if client.AppID != "" {
|
||||
mode = "legacy:" + client.AppID
|
||||
}
|
||||
slog.Info("websocket authenticated", "userID", client.UserID, "mode", mode)
|
||||
|
||||
case "ping":
|
||||
pongMsg := Message{Type: "pong"}
|
||||
|
|
@ -221,7 +232,7 @@ func (h *Hub) addClient(client *Client) {
|
|||
}
|
||||
h.clients[client.UserID][client] = struct{}{}
|
||||
|
||||
slog.Info("client connected", "userID", client.UserID, "appID", client.AppID)
|
||||
slog.Info("client connected", "userID", client.UserID, "appID", client.AppID, "unified", client.AppID == "")
|
||||
}
|
||||
|
||||
func (h *Hub) removeClient(client *Client) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue