Cross-Device-Live-Sync gegen mana-presence (Port 3079). Was geht: - `PresenceClient` @Observable + interner Actor: - PUT /v1/sessions bei Player-Open/Pause/Close mit MoodlitSessionPayload - SSE /v1/stream/moodlit als langlebige URLSession.bytes-Connection - Reconnect mit exponential backoff (500ms → 30s) - Last-Event-ID-Catch-Up - Echo-Filter: eigene originDevice-ID wird in remote ignoriert - 401-Retry mit Refresh-Token - AppConfig.presenceBaseURL (prod presence.mana.how, MANA_PRESENCE_URL-Override) - MoodlitNativeApp injiziert PresenceClient als Environment - RootView: connect on .active + signedIn, disconnect on .background + signedOut - MoodPlayerView: publishSession on open/pause/close, endSession on close - MoodListView: "Läuft auf einem anderen Gerät"-Banner + Tap-to-Mirror UX-Entscheidung V1: Banner zeigt remote-Mood wenn lokaler Player zu ist. Auto-Switch innerhalb laufender Player kommt in μ-8.5 (Bidirectional-Mirror). Eigenes Device-Echo wird per UUID in App-Group-UserDefaults gefiltert. Build iOS+macOS grün, 11/11 Tests grün. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
299 lines
8.8 KiB
Swift
299 lines
8.8 KiB
Swift
import Foundation
|
|
import ManaCore
|
|
import OSLog
|
|
|
|
/// Client für `mana-presence` (Live-Cross-Device-State).
|
|
///
|
|
/// - **Publish:** `PUT /v1/sessions` mit Mood-Payload bei Player-
|
|
/// Open/Change/Close. JWT-Auth via `AuthenticatedTransport`.
|
|
/// - **Subscribe:** SSE `GET /v1/stream/moodlit` als langlebige
|
|
/// HTTP-Connection. Reconnect mit `Last-Event-ID`. Auto-Heartbeat
|
|
/// alle 25s vom Server.
|
|
/// - **Mirror-UX:** `remote` enthält den State, der gerade auf
|
|
/// *anderen* Geräten dieses Users läuft. Die UI entscheidet, ob sie
|
|
/// ihn anzeigt, ignoriert oder spiegelt — `PresenceClient` ist nur
|
|
/// der Transport.
|
|
///
|
|
/// Lebenszyklus: `connect()` beim App-Start nach `auth.bootstrap`,
|
|
/// `disconnect()` beim `scenePhase == .background` mit 30s Grace.
|
|
@MainActor
|
|
@Observable
|
|
public final class PresenceClient {
|
|
public enum State: Sendable, Equatable {
|
|
case disconnected
|
|
case connecting
|
|
case connected(revision: Int64)
|
|
case failed(message: String)
|
|
}
|
|
|
|
public private(set) var state: State = .disconnected
|
|
public private(set) var remote: PresenceSession?
|
|
public private(set) var lastError: String?
|
|
|
|
/// Eindeutige Geräte-ID. Identifiziert *dieses* Gerät in
|
|
/// Server-Events — wenn `remote.originDevice == self.deviceId`
|
|
/// ist es das eigene Echo und sollte nicht als „remote" gespiegelt
|
|
/// werden. Persistiert in App-Group-UserDefaults, damit
|
|
/// Reinstall/Reset einen neuen ID kriegt.
|
|
public let deviceId: String
|
|
|
|
private let auth: AuthClient
|
|
private let baseURL: URL
|
|
private let appId: String
|
|
private let session: URLSession
|
|
private let decoder = JSONDecoder()
|
|
private let encoder = JSONEncoder()
|
|
private let log = Logger(subsystem: "ev.mana.moodlit", category: "presence")
|
|
|
|
private var streamTask: Task<Void, Never>?
|
|
private var lastRevision: Int64 = 0
|
|
|
|
public init(
|
|
auth: AuthClient,
|
|
baseURL: URL,
|
|
appId: String = "moodlit",
|
|
deviceIdKey: String = "moodlit.presence.deviceId",
|
|
appGroup: String? = nil,
|
|
session: URLSession = .shared
|
|
) {
|
|
self.auth = auth
|
|
self.baseURL = baseURL
|
|
self.appId = appId
|
|
self.session = session
|
|
self.deviceId = Self.loadOrCreateDeviceId(key: deviceIdKey, appGroup: appGroup)
|
|
}
|
|
|
|
// MARK: - Lifecycle
|
|
|
|
public func connect() async {
|
|
switch state {
|
|
case .connecting, .connected:
|
|
return
|
|
default:
|
|
break
|
|
}
|
|
|
|
state = .connecting
|
|
streamTask?.cancel()
|
|
streamTask = Task { [weak self] in
|
|
await self?.streamLoop()
|
|
}
|
|
}
|
|
|
|
public func disconnect() {
|
|
streamTask?.cancel()
|
|
streamTask = nil
|
|
state = .disconnected
|
|
}
|
|
|
|
// MARK: - Publish
|
|
|
|
/// Publisht den lokalen Player-State zu mana-presence. Liefert
|
|
/// `revision` des Server-Ergebnisses zurück — kann verwendet
|
|
/// werden, um eigenes Echo (SSE-Event mit derselben revision) zu
|
|
/// ignorieren.
|
|
@discardableResult
|
|
public func publish(
|
|
payload: MoodlitSessionPayload,
|
|
source: String,
|
|
isPaused: Bool,
|
|
startedAt: Date? = nil
|
|
) async throws -> Int64 {
|
|
let body = PresenceUpsertBody(
|
|
appId: appId,
|
|
payload: payload,
|
|
source: source,
|
|
originDevice: deviceId,
|
|
isPaused: isPaused,
|
|
startedAt: startedAt.map { ISO8601DateFormatter().string(from: $0) }
|
|
)
|
|
let data = try await request(
|
|
path: "/v1/sessions",
|
|
method: "PUT",
|
|
body: try encoder.encode(body)
|
|
)
|
|
let session = try decoder.decode(PresenceSession.self, from: data)
|
|
lastRevision = session.revision
|
|
return session.revision
|
|
}
|
|
|
|
public func endSession() async {
|
|
do {
|
|
_ = try await request(path: "/v1/sessions/\(appId)", method: "DELETE", body: nil)
|
|
} catch {
|
|
log.warning("DELETE /v1/sessions failed: \(error.localizedDescription, privacy: .public)")
|
|
}
|
|
}
|
|
|
|
// MARK: - Internal Stream
|
|
|
|
private func streamLoop() async {
|
|
var backoffMs: UInt64 = 500
|
|
while !Task.isCancelled {
|
|
do {
|
|
try await runStreamOnce()
|
|
backoffMs = 500
|
|
} catch is CancellationError {
|
|
return
|
|
} catch {
|
|
let msg = (error as? URLError)?.localizedDescription ?? String(describing: error)
|
|
log.warning("SSE-Loop unterbrochen, reconnect in \(backoffMs)ms: \(msg, privacy: .public)")
|
|
await MainActor.run {
|
|
self.state = .failed(message: msg)
|
|
self.lastError = msg
|
|
}
|
|
try? await Task.sleep(nanoseconds: backoffMs * 1_000_000)
|
|
backoffMs = min(backoffMs * 2, 30_000)
|
|
}
|
|
}
|
|
}
|
|
|
|
private func runStreamOnce() async throws {
|
|
let token = try await auth.freshAccessToken()
|
|
var request = URLRequest(url: baseURL.appendingPathComponent("/v1/stream/\(appId)"))
|
|
request.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
|
|
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")
|
|
if lastRevision > 0 {
|
|
request.setValue(String(lastRevision), forHTTPHeaderField: "Last-Event-ID")
|
|
}
|
|
request.timeoutInterval = 0 // SSE = langlebig
|
|
|
|
let (bytes, response) = try await session.bytes(for: request)
|
|
guard let http = response as? HTTPURLResponse else {
|
|
throw URLError(.badServerResponse)
|
|
}
|
|
if http.statusCode == 401 {
|
|
_ = try await auth.refreshAccessToken()
|
|
throw URLError(.userAuthenticationRequired) // triggert Reconnect mit frischem Token
|
|
}
|
|
guard http.statusCode == 200 else {
|
|
throw URLError(.badServerResponse)
|
|
}
|
|
|
|
await MainActor.run {
|
|
self.state = .connected(revision: self.lastRevision)
|
|
self.lastError = nil
|
|
}
|
|
|
|
// SSE-Parser — Frames bestehen aus `id:`, `event:`, `data:` Zeilen,
|
|
// abgetrennt durch Leerzeile. `data:` kann mehrzeilig sein, wird
|
|
// joined.
|
|
var currentId: String?
|
|
var currentEvent: String?
|
|
var currentData = ""
|
|
|
|
for try await line in bytes.lines {
|
|
if line.isEmpty {
|
|
await dispatchFrame(id: currentId, event: currentEvent, data: currentData)
|
|
currentId = nil
|
|
currentEvent = nil
|
|
currentData = ""
|
|
continue
|
|
}
|
|
if line.hasPrefix(":") {
|
|
continue // SSE-Kommentar (Heartbeat)
|
|
}
|
|
if let value = line.value(after: "id:") {
|
|
currentId = value
|
|
} else if let value = line.value(after: "event:") {
|
|
currentEvent = value
|
|
} else if let value = line.value(after: "data:") {
|
|
if !currentData.isEmpty { currentData.append("\n") }
|
|
currentData.append(value)
|
|
}
|
|
}
|
|
}
|
|
|
|
private func dispatchFrame(id: String?, event: String?, data: String) async {
|
|
guard !data.isEmpty else { return }
|
|
|
|
if let id, let rev = Int64(id) {
|
|
lastRevision = max(lastRevision, rev)
|
|
await MainActor.run {
|
|
self.state = .connected(revision: self.lastRevision)
|
|
}
|
|
}
|
|
|
|
switch event {
|
|
case "session", nil:
|
|
guard let json = data.data(using: .utf8) else { return }
|
|
do {
|
|
let session = try decoder.decode(PresenceSession.self, from: json)
|
|
// Eigenes Echo nicht als „remote" interpretieren
|
|
guard session.originDevice != deviceId else { return }
|
|
await MainActor.run {
|
|
self.remote = session
|
|
}
|
|
} catch {
|
|
log.warning("SSE session-decode failed: \(error.localizedDescription, privacy: .public)")
|
|
}
|
|
case "deleted":
|
|
await MainActor.run {
|
|
self.remote = nil
|
|
}
|
|
case "empty":
|
|
// Initialer Snapshot ohne aktive Session
|
|
await MainActor.run {
|
|
self.remote = nil
|
|
}
|
|
case "ping":
|
|
return // Heartbeat
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
|
|
// MARK: - Request-Helper
|
|
|
|
private func request(path: String, method: String, body: Data?) async throws -> Data {
|
|
let token = try await auth.freshAccessToken()
|
|
let url = baseURL.appendingPathComponent(path)
|
|
var req = URLRequest(url: url)
|
|
req.httpMethod = method
|
|
req.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
|
|
if let body {
|
|
req.httpBody = body
|
|
req.setValue("application/json", forHTTPHeaderField: "Content-Type")
|
|
}
|
|
let (data, response) = try await session.data(for: req)
|
|
guard let http = response as? HTTPURLResponse else {
|
|
throw URLError(.badServerResponse)
|
|
}
|
|
if http.statusCode == 401 {
|
|
_ = try await auth.refreshAccessToken()
|
|
let token2 = try await auth.freshAccessToken()
|
|
req.setValue("Bearer \(token2)", forHTTPHeaderField: "Authorization")
|
|
let (data2, response2) = try await session.data(for: req)
|
|
guard let http2 = response2 as? HTTPURLResponse, (200...299).contains(http2.statusCode) else {
|
|
throw URLError(.userAuthenticationRequired)
|
|
}
|
|
return data2
|
|
}
|
|
guard (200...299).contains(http.statusCode) else {
|
|
throw URLError(.init(rawValue: http.statusCode))
|
|
}
|
|
return data
|
|
}
|
|
|
|
// MARK: - Device-ID
|
|
|
|
private static func loadOrCreateDeviceId(key: String, appGroup: String?) -> String {
|
|
let defaults = appGroup.flatMap { UserDefaults(suiteName: $0) } ?? UserDefaults.standard
|
|
if let existing = defaults.string(forKey: key), !existing.isEmpty {
|
|
return existing
|
|
}
|
|
let new = "ios-\(UUID().uuidString.lowercased())"
|
|
defaults.set(new, forKey: key)
|
|
return new
|
|
}
|
|
}
|
|
|
|
private extension String {
|
|
/// Wenn der String mit `prefix` beginnt: gib den Teil danach
|
|
/// zurück (führendes Whitespace getrimmt). Sonst `nil`.
|
|
func value(after prefix: String) -> String? {
|
|
guard hasPrefix(prefix) else { return nil }
|
|
let rest = dropFirst(prefix.count)
|
|
return String(rest).trimmingCharacters(in: .whitespaces)
|
|
}
|
|
}
|