moodlit-native/Sources/Core/Sync/PresenceClient.swift
till bb7415b7ac feat(presence): μ-8.3 iOS+macOS PresenceClient + Banner-UI
Cross-Device-Live-Sync gegen mana-presence (Port 3079).

Was geht:
- `PresenceClient` @Observable + interner Actor:
  - PUT /v1/sessions bei Player-Open/Pause/Close mit MoodlitSessionPayload
  - SSE /v1/stream/moodlit als langlebige URLSession.bytes-Connection
  - Reconnect mit exponential backoff (500ms → 30s)
  - Last-Event-ID-Catch-Up
  - Echo-Filter: eigene originDevice-ID wird in remote ignoriert
  - 401-Retry mit Refresh-Token
- AppConfig.presenceBaseURL (prod presence.mana.how, MANA_PRESENCE_URL-Override)
- MoodlitNativeApp injiziert PresenceClient als Environment
- RootView: connect on .active + signedIn, disconnect on .background + signedOut
- MoodPlayerView: publishSession on open/pause/close, endSession on close
- MoodListView: "Läuft auf einem anderen Gerät"-Banner + Tap-to-Mirror

UX-Entscheidung V1: Banner zeigt remote-Mood wenn lokaler Player zu ist.
Auto-Switch innerhalb laufender Player kommt in μ-8.5 (Bidirectional-Mirror).
Eigenes Device-Echo wird per UUID in App-Group-UserDefaults gefiltert.

Build iOS+macOS grün, 11/11 Tests grün.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 13:18:12 +02:00

299 lines
8.8 KiB
Swift

import Foundation
import ManaCore
import OSLog
/// Client for `mana-presence` (live cross-device state).
///
/// - **Publish:** `PUT /v1/sessions` with the mood payload and
///   `DELETE /v1/sessions/<appId>` when the session ends. Both requests carry
///   a bearer token obtained from `AuthClient`.
/// - **Subscribe:** SSE `GET /v1/stream/<appId>` as a long-lived HTTP
///   connection. Reconnects with exponential backoff and sends the last seen
///   revision as `Last-Event-ID`.
/// - **Mirror UX:** `remote` holds the session most recently reported by a
///   *different* device. The UI decides whether to show, ignore or mirror it;
///   `PresenceClient` is only the transport.
///
/// Lifecycle: `connect()` starts the stream, `disconnect()` stops it. When
/// exactly these are called (app start, scene phase changes, grace periods)
/// is up to the caller.
@MainActor
@Observable
public final class PresenceClient {
    /// Connection state of the SSE subscription.
    public enum State: Sendable, Equatable {
        case disconnected
        case connecting
        case connected(revision: Int64)
        case failed(message: String)
    }

    /// Current state of the stream connection.
    public private(set) var state: State = .disconnected
    /// Last session received from another device, or `nil` after a
    /// `deleted` / `empty` event.
    public private(set) var remote: PresenceSession?
    /// Message of the most recent stream failure; cleared on a successful connect.
    public private(set) var lastError: String?

    /// Unique ID of *this* device. Sent as `originDevice` when publishing;
    /// incoming sessions whose `originDevice` equals this value are our own
    /// echo and are not exposed via `remote`. Persisted in `UserDefaults`
    /// (the app-group suite when one is passed to `init`, otherwise `.standard`).
    public let deviceId: String

    private let auth: AuthClient
    private let baseURL: URL
    private let appId: String
    private let session: URLSession
    private let decoder = JSONDecoder()
    private let encoder = JSONEncoder()
    // Created once instead of per publish; formatter construction is comparatively costly.
    private let timestampFormatter = ISO8601DateFormatter()
    private let log = Logger(subsystem: "ev.mana.moodlit", category: "presence")
    private var streamTask: Task<Void, Never>?
    private var lastRevision: Int64 = 0

    /// Creates a client.
    ///
    /// - Parameters:
    ///   - auth: Source of access tokens (`freshAccessToken` / `refreshAccessToken`).
    ///   - baseURL: Base URL of the presence service.
    ///   - appId: App identifier used in request bodies and URL paths.
    ///   - deviceIdKey: `UserDefaults` key under which the device ID is stored.
    ///   - appGroup: Optional suite name; falls back to `UserDefaults.standard`.
    ///   - session: `URLSession` used for all requests.
    public init(
        auth: AuthClient,
        baseURL: URL,
        appId: String = "moodlit",
        deviceIdKey: String = "moodlit.presence.deviceId",
        appGroup: String? = nil,
        session: URLSession = .shared
    ) {
        self.auth = auth
        self.baseURL = baseURL
        self.appId = appId
        self.session = session
        self.deviceId = Self.loadOrCreateDeviceId(key: deviceIdKey, appGroup: appGroup)
    }

    // MARK: - Lifecycle

    /// Starts the SSE stream loop. No-op while already connecting or connected;
    /// from `.failed` it cancels the pending retry and reconnects immediately.
    public func connect() async {
        switch state {
        case .connecting, .connected:
            return
        case .disconnected, .failed:
            break
        }
        state = .connecting
        streamTask?.cancel()
        streamTask = Task { [weak self] in
            await self?.streamLoop()
        }
    }

    /// Cancels the stream loop and sets the state to `.disconnected`.
    /// `remote` is left untouched.
    public func disconnect() {
        streamTask?.cancel()
        streamTask = nil
        state = .disconnected
    }

    // MARK: - Publish

    /// Publishes the local player state via `PUT /v1/sessions`.
    ///
    /// - Parameters:
    ///   - payload: Session payload to store.
    ///   - source: Passed through to the request body unchanged.
    ///   - isPaused: Whether the local player is paused.
    ///   - startedAt: Optional start time, sent as an ISO 8601 string.
    /// - Returns: The `revision` of the session returned by the server.
    /// - Throws: Encoding/decoding errors, token errors from `AuthClient`, or
    ///   `URLError` for transport and non-2xx responses.
    @discardableResult
    public func publish(
        payload: MoodlitSessionPayload,
        source: String,
        isPaused: Bool,
        startedAt: Date? = nil
    ) async throws -> Int64 {
        let body = PresenceUpsertBody(
            appId: appId,
            payload: payload,
            source: source,
            originDevice: deviceId,
            isPaused: isPaused,
            startedAt: startedAt.map { timestampFormatter.string(from: $0) }
        )
        let responseData = try await request(
            path: "/v1/sessions",
            method: "PUT",
            body: try encoder.encode(body)
        )
        let stored = try decoder.decode(PresenceSession.self, from: responseData)
        // Never move backwards: the stream may already have delivered a newer
        // revision while this request was in flight.
        lastRevision = max(lastRevision, stored.revision)
        return stored.revision
    }

    /// Ends the session via `DELETE /v1/sessions/<appId>`. Best effort:
    /// failures are logged and not propagated.
    public func endSession() async {
        do {
            _ = try await request(path: "/v1/sessions/\(appId)", method: "DELETE", body: nil)
        } catch {
            log.warning("DELETE /v1/sessions failed: \(error.localizedDescription, privacy: .public)")
        }
    }

    // MARK: - Internal Stream

    /// Runs the stream until the task is cancelled, reconnecting with
    /// exponential backoff (500 ms doubling up to 30 s).
    private func streamLoop() async {
        let initialBackoffMs: UInt64 = 500
        let maxBackoffMs: UInt64 = 30_000
        var backoffMs = initialBackoffMs

        while !Task.isCancelled {
            var failure: String?
            do {
                try await runStreamOnce()
            } catch is CancellationError {
                return
            } catch {
                failure = (error as? URLError)?.localizedDescription ?? String(describing: error)
            }

            // A cancelled URLSession transfer surfaces as `URLError(.cancelled)`
            // rather than `CancellationError`. Bail out before touching state so
            // `disconnect()` (or a newer `connect()`) is not overwritten.
            guard !Task.isCancelled else { return }

            // `.connected` here means this attempt got through before ending,
            // so the next retry starts from the shortest delay again.
            if case .connected = state {
                backoffMs = initialBackoffMs
            }

            let delayMs = backoffMs
            if let failure {
                log.warning("SSE-Loop unterbrochen, reconnect in \(delayMs)ms: \(failure, privacy: .public)")
                state = .failed(message: failure)
                lastError = failure
            } else {
                // Server closed the stream without an error. Still wait before
                // reconnecting so an immediately-closing endpoint cannot cause a
                // tight reconnect loop.
                log.info("SSE stream ended by server, reconnect in \(delayMs)ms")
                state = .connecting
            }

            try? await Task.sleep(nanoseconds: delayMs * 1_000_000)
            backoffMs = min(backoffMs * 2, maxBackoffMs)
        }
    }

    /// Opens one SSE connection and processes it until it ends or fails.
    ///
    /// - Throws: `URLError(.userAuthenticationRequired)` after a 401 (the token
    ///   has been refreshed; the caller reconnects), `URLError(.badServerResponse)`
    ///   for any other non-200 response, `CancellationError` if the task was
    ///   cancelled, or whatever the transport throws.
    private func runStreamOnce() async throws {
        let token = try await auth.freshAccessToken()
        var streamRequest = URLRequest(url: baseURL.appendingPathComponent("/v1/stream/\(appId)"))
        streamRequest.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
        streamRequest.setValue("text/event-stream", forHTTPHeaderField: "Accept")
        if lastRevision > 0 {
            streamRequest.setValue(String(lastRevision), forHTTPHeaderField: "Last-Event-ID")
        }
        // Intended to keep the long-lived SSE request from timing out.
        // NOTE(review): confirm how URLSession interprets a timeout of 0.
        streamRequest.timeoutInterval = 0

        let (bytes, response) = try await session.bytes(for: streamRequest)
        guard let http = response as? HTTPURLResponse else {
            throw URLError(.badServerResponse)
        }
        if http.statusCode == 401 {
            _ = try await auth.refreshAccessToken()
            throw URLError(.userAuthenticationRequired) // triggers a reconnect with a fresh token
        }
        guard http.statusCode == 200 else {
            throw URLError(.badServerResponse)
        }

        // Do not report `.connected` from a task that has been cancelled in the meantime.
        try Task.checkCancellation()
        state = .connected(revision: lastRevision)
        lastError = nil

        // SSE parser: a frame consists of `id:`, `event:` and `data:` lines and
        // is terminated by an empty line. Multiple `data:` lines are joined
        // with "\n".
        //
        // Lines are split by hand (LF, CR or CRLF) instead of using
        // `bytes.lines`, because that sequence skips empty lines and the
        // frame terminator would never be seen.
        var pendingId: String?
        var pendingEvent: String?
        var pendingData = ""
        var lineBytes: [UInt8] = []
        var previousWasCR = false

        for try await byte in bytes {
            let isLF = byte == 0x0A
            let isCR = byte == 0x0D
            if isLF && previousWasCR {
                // Second half of a CRLF pair; the line was already emitted on CR.
                previousWasCR = false
                continue
            }
            previousWasCR = isCR
            guard isLF || isCR else {
                lineBytes.append(byte)
                continue
            }

            let line = String(decoding: lineBytes, as: UTF8.self)
            lineBytes.removeAll(keepingCapacity: true)

            if line.isEmpty {
                dispatchFrame(id: pendingId, event: pendingEvent, data: pendingData)
                pendingId = nil
                pendingEvent = nil
                pendingData = ""
            } else if line.hasPrefix(":") {
                continue // SSE comment (heartbeat)
            } else if let value = line.value(after: "id:") {
                pendingId = value
            } else if let value = line.value(after: "event:") {
                pendingEvent = value
            } else if let value = line.value(after: "data:") {
                if !pendingData.isEmpty { pendingData.append("\n") }
                pendingData.append(value)
            }
        }
    }

    /// Applies one completed SSE frame.
    ///
    /// Frames without data are ignored entirely (including `deleted` / `empty`
    /// / `ping` frames that carry no `data:` line). A numeric `id` advances
    /// `lastRevision`. `session` frames (or frames without an event name)
    /// update `remote` unless they originate from this device.
    private func dispatchFrame(id: String?, event: String?, data: String) {
        guard !data.isEmpty else { return }

        if let id, let revision = Int64(id) {
            lastRevision = max(lastRevision, revision)
            state = .connected(revision: lastRevision)
        }

        switch event {
        case "session", nil:
            guard let json = data.data(using: .utf8) else { return }
            do {
                let incoming = try decoder.decode(PresenceSession.self, from: json)
                // Do not treat our own echo as a remote session.
                guard incoming.originDevice != deviceId else { return }
                remote = incoming
            } catch {
                log.warning("SSE session-decode failed: \(error.localizedDescription, privacy: .public)")
            }
        case "deleted":
            remote = nil
        case "empty":
            // Initial snapshot without an active session.
            remote = nil
        case "ping":
            return // heartbeat
        default:
            return
        }
    }

    // MARK: - Request-Helper

    /// Sends an authenticated request and returns the response body.
    ///
    /// On a 401 the token is refreshed and the request is retried exactly once.
    ///
    /// - Throws: `URLError(.badServerResponse)` for non-HTTP responses,
    ///   `URLError(.userAuthenticationRequired)` if the retry does not return
    ///   2xx, or a `URLError` whose code's raw value is the HTTP status for any
    ///   other non-2xx response.
    private func request(path: String, method: String, body: Data?) async throws -> Data {
        let token = try await auth.freshAccessToken()
        var req = URLRequest(url: baseURL.appendingPathComponent(path))
        req.httpMethod = method
        req.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
        if let body {
            req.httpBody = body
            req.setValue("application/json", forHTTPHeaderField: "Content-Type")
        }

        let (data, response) = try await session.data(for: req)
        guard let http = response as? HTTPURLResponse else {
            throw URLError(.badServerResponse)
        }

        if http.statusCode == 401 {
            _ = try await auth.refreshAccessToken()
            let retryToken = try await auth.freshAccessToken()
            req.setValue("Bearer \(retryToken)", forHTTPHeaderField: "Authorization")
            let (retryData, retryResponse) = try await session.data(for: req)
            guard
                let retryHTTP = retryResponse as? HTTPURLResponse,
                (200...299).contains(retryHTTP.statusCode)
            else {
                throw URLError(.userAuthenticationRequired)
            }
            return retryData
        }

        guard (200...299).contains(http.statusCode) else {
            // The HTTP status is carried as the URLError code's raw value.
            throw URLError(.init(rawValue: http.statusCode))
        }
        return data
    }

    // MARK: - Device-ID

    /// Returns the stored device ID for `key`, or creates, stores and returns
    /// a new one when none exists or the stored value is empty.
    private static func loadOrCreateDeviceId(key: String, appGroup: String?) -> String {
        let defaults = appGroup.flatMap { UserDefaults(suiteName: $0) } ?? UserDefaults.standard
        if let existing = defaults.string(forKey: key), !existing.isEmpty {
            return existing
        }
        // NOTE(review): the "ios-" prefix is applied regardless of platform —
        // confirm whether other platforms should get their own prefix.
        let new = "ios-\(UUID().uuidString.lowercased())"
        defaults.set(new, forKey: key)
        return new
    }
}
private extension String {
    /// Returns the value part of a `field:value` line.
    ///
    /// If the string starts with `prefix`, the remainder is returned with at
    /// most one leading space removed, which is how the server-sent-events
    /// format defines a field value. Any further whitespace is kept, so
    /// `data:` payloads pass through unchanged. Returns `nil` when the string
    /// does not start with `prefix`.
    func value(after prefix: String) -> String? {
        guard hasPrefix(prefix) else { return nil }
        let rest = dropFirst(prefix.count)
        return String(rest.first == " " ? rest.dropFirst() : rest)
    }
}