Beats kommen jetzt mit Server-Wall-Clock-Zeitstempel rein, lokal
gegen monotonic ContinuousClock geschedult — kompensiert WS-Latenz
und Hue-CLIP-Round-Trip.
Was:
- Sources/Core/Sync/ClockSync.swift @MainActor:
- refresh(samples: 5): GET /v1/time fünfmal, Sample mit kleinster
RTT gewinnt (Jitter-resistent); offsetMs = serverTime - midpoint
- localDeadline(forServerTimeMs:): mappt Server-Wall-Clock zurück
auf lokalen ContinuousClock.Instant für Task.sleep
- currentServerTimeMs(): lokaler now + offset
- BeatSubscriber.openOnce(): refresht ClockSync vor dem WS-Connect
- BeatSubscriber.scheduleBeat(beat): Task.sleep bis localDeadline,
dann onBeat-Callback. Falls beat.at fehlt: sofort feuern (V1-Fallback)
- BeatEvent.at type Int → Int64 (passend zu Server-Wire-Format)
Build iOS+macOS BUILD SUCCEEDED, 18/18 Tests grün.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
166 lines
4.8 KiB
Swift
166 lines
4.8 KiB
Swift
import Foundation
|
||
import ManaCore
|
||
import OSLog
|
||
|
||
/// Subscribiert den WebSocket-Beat-Channel auf `mana-presence` und
|
||
/// löst pro Beat einen kurzen Brightness-Puls auf den ausgewählten
|
||
/// Hue/HomeKit-Lampen aus.
|
||
///
|
||
/// V1-Verhalten: Brightness-Pulse via CLIP-API (Hue's REST) — bei
|
||
/// 120–180 BPM = 2–3 Hz, weit unter Hue's 10/s-Limit pro Lampe. Echte
|
||
/// Entertainment-DTLS (50Hz, alle Lampen in einem Frame) kommt mit
|
||
/// μ-10.6.
|
||
///
|
||
/// Bei Mood-Switch oder Player-Close wird der Subscriber gestoppt;
|
||
/// reconnects laufen mit exponential backoff.
|
||
@MainActor
|
||
@Observable
|
||
final class BeatSubscriber {
|
||
enum State: Sendable, Equatable {
|
||
case disconnected
|
||
case connecting
|
||
case connected
|
||
case failed(message: String)
|
||
}
|
||
|
||
private(set) var state: State = .disconnected
|
||
|
||
private let auth: AuthClient
|
||
private let presenceBaseURL: URL
|
||
private let urlSession: URLSession
|
||
private let log = Logger(subsystem: "ev.mana.moodlit", category: "beat-subscriber")
|
||
private let deviceId: String
|
||
|
||
private var wsTask: URLSessionWebSocketTask?
|
||
private var reconnectTask: Task<Void, Never>?
|
||
private var receiveTask: Task<Void, Never>?
|
||
private var onBeatCallback: ((BeatEvent) -> Void)?
|
||
private var backoffMs: UInt64 = 500
|
||
private let clockSync: ClockSync
|
||
|
||
struct BeatEvent: Decodable, Sendable {
|
||
let type: String
|
||
let at: Int64? // Server-Wall-Clock-Zeitstempel, wann der Beat PHYSISCH erklingen soll
|
||
let songId: String?
|
||
let intensity: Double?
|
||
}
|
||
|
||
init(
|
||
auth: AuthClient,
|
||
presenceBaseURL: URL,
|
||
session: URLSession = .shared
|
||
) {
|
||
self.auth = auth
|
||
self.presenceBaseURL = presenceBaseURL
|
||
self.urlSession = session
|
||
self.deviceId = "moodlit-beats-\(UUID().uuidString.lowercased())"
|
||
self.clockSync = ClockSync(baseURL: presenceBaseURL, session: session)
|
||
}
|
||
|
||
/// Startet eine WS-Connection. `onBeat` wird auf MainActor pro
|
||
/// empfangenem Beat-Event aufgerufen.
|
||
func start(onBeat: @escaping (BeatEvent) -> Void) {
|
||
onBeatCallback = onBeat
|
||
guard wsTask == nil else { return }
|
||
state = .connecting
|
||
Task { await connectLoop() }
|
||
}
|
||
|
||
func stop() {
|
||
reconnectTask?.cancel()
|
||
reconnectTask = nil
|
||
receiveTask?.cancel()
|
||
receiveTask = nil
|
||
wsTask?.cancel(with: .goingAway, reason: nil)
|
||
wsTask = nil
|
||
state = .disconnected
|
||
onBeatCallback = nil
|
||
}
|
||
|
||
private func connectLoop() async {
|
||
while !Task.isCancelled, onBeatCallback != nil {
|
||
do {
|
||
try await openOnce()
|
||
try await receiveLoop()
|
||
} catch is CancellationError {
|
||
return
|
||
} catch {
|
||
let msg = error.localizedDescription
|
||
state = .failed(message: msg)
|
||
log.warning("Beat-WS unterbrochen, reconnect in \(self.backoffMs)ms: \(msg, privacy: .public)")
|
||
try? await Task.sleep(nanoseconds: backoffMs * 1_000_000)
|
||
backoffMs = min(backoffMs * 2, 30_000)
|
||
}
|
||
}
|
||
}
|
||
|
||
private func openOnce() async throws {
|
||
// Erst Clock-Sync — sonst können wir keine Lookahead-Deadlines
|
||
// rechnen. Bei Fehler: log + weiter mit offset=0 (degrades to
|
||
// "render immediately").
|
||
do {
|
||
try await clockSync.refresh()
|
||
} catch {
|
||
log.warning("ClockSync skipped: \(error.localizedDescription, privacy: .public)")
|
||
}
|
||
|
||
let token = try await auth.freshAccessToken()
|
||
var comps = URLComponents(url: presenceBaseURL, resolvingAgainstBaseURL: false)!
|
||
comps.scheme = comps.scheme == "https" ? "wss" : "ws"
|
||
comps.path = "/v1/beats/moodlit"
|
||
comps.queryItems = [
|
||
URLQueryItem(name: "token", value: token),
|
||
URLQueryItem(name: "senderId", value: deviceId),
|
||
]
|
||
let task = urlSession.webSocketTask(with: comps.url!)
|
||
task.resume()
|
||
wsTask = task
|
||
state = .connected
|
||
backoffMs = 500
|
||
}
|
||
|
||
private func receiveLoop() async throws {
|
||
guard let task = wsTask else { return }
|
||
while !Task.isCancelled {
|
||
let message = try await task.receive()
|
||
switch message {
|
||
case .string(let str):
|
||
dispatch(text: str)
|
||
case .data(let data):
|
||
if let str = String(data: data, encoding: .utf8) {
|
||
dispatch(text: str)
|
||
}
|
||
@unknown default:
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
private func dispatch(text: String) {
|
||
guard let data = text.data(using: .utf8) else { return }
|
||
do {
|
||
let beat = try JSONDecoder().decode(BeatEvent.self, from: data)
|
||
guard beat.type == "beat" else { return }
|
||
scheduleBeat(beat)
|
||
} catch {
|
||
log.warning("Beat decode failed: \(error.localizedDescription, privacy: .public)")
|
||
}
|
||
}
|
||
|
||
/// Pro empfangenem Beat wird ein Sleep-Task gescheduled, der bei
|
||
/// der Server-Wall-Clock-Zeit `beat.at` (in lokaler monotonic
|
||
/// Zeit übersetzt) den `onBeat`-Callback feuert. Wenn `beat.at`
|
||
/// fehlt oder schon vergangen ist, feuern wir sofort.
|
||
private func scheduleBeat(_ beat: BeatEvent) {
|
||
guard let cb = onBeatCallback else { return }
|
||
guard let atMs = beat.at else {
|
||
cb(beat)
|
||
return
|
||
}
|
||
let deadline = clockSync.localDeadline(forServerTimeMs: atMs)
|
||
Task {
|
||
try? await Task.sleep(until: deadline, clock: ContinuousClock())
|
||
cb(beat)
|
||
}
|
||
}
|
||
}
|