moodlit-native/Sources/Core/Sync/BeatSubscriber.swift
till c92995b7e9 feat(beat-sync): μ-10.4 NTP-Style Lookahead-Timing für tight Beat-Sync
Beats kommen jetzt mit Server-Wall-Clock-Zeitstempel rein, lokal
gegen monotonic ContinuousClock geschedult — kompensiert WS-Latenz
und Hue-CLIP-Round-Trip.

Was:
- Sources/Core/Sync/ClockSync.swift @MainActor:
  - refresh(samples: 5): GET /v1/time fünfmal, Sample mit kleinster
    RTT gewinnt (Jitter-resistent); offsetMs = serverTime - midpoint
  - localDeadline(forServerTimeMs:): mappt Server-Wall-Clock zurück
    auf lokalen ContinuousClock.Instant für Task.sleep
  - currentServerTimeMs(): lokaler now + offset
- BeatSubscriber.openOnce(): refresht ClockSync vor dem WS-Connect
- BeatSubscriber.scheduleBeat(beat): Task.sleep bis localDeadline,
  dann onBeat-Callback. Falls beat.at fehlt: sofort feuern (V1-Fallback)
- BeatEvent.at type Int → Int64 (passend zu Server-Wire-Format)

Build iOS+macOS BUILD SUCCEEDED, 18/18 Tests grün.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 15:44:04 +02:00

166 lines
4.8 KiB
Swift
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import Foundation
import ManaCore
import OSLog
/// Subscribiert den WebSocket-Beat-Channel auf `mana-presence` und
/// löst pro Beat einen kurzen Brightness-Puls auf den ausgewählten
/// Hue/HomeKit-Lampen aus.
///
/// V1-Verhalten: Brightness-Pulse via CLIP-API (Hue's REST) bei
/// 120180 BPM = 23 Hz, weit unter Hue's 10/s-Limit pro Lampe. Echte
/// Entertainment-DTLS (50Hz, alle Lampen in einem Frame) kommt mit
/// μ-10.6.
///
/// Bei Mood-Switch oder Player-Close wird der Subscriber gestoppt;
/// reconnects laufen mit exponential backoff.
@MainActor
@Observable
final class BeatSubscriber {
enum State: Sendable, Equatable {
case disconnected
case connecting
case connected
case failed(message: String)
}
private(set) var state: State = .disconnected
private let auth: AuthClient
private let presenceBaseURL: URL
private let urlSession: URLSession
private let log = Logger(subsystem: "ev.mana.moodlit", category: "beat-subscriber")
private let deviceId: String
private var wsTask: URLSessionWebSocketTask?
private var reconnectTask: Task<Void, Never>?
private var receiveTask: Task<Void, Never>?
private var onBeatCallback: ((BeatEvent) -> Void)?
private var backoffMs: UInt64 = 500
private let clockSync: ClockSync
struct BeatEvent: Decodable, Sendable {
let type: String
let at: Int64? // Server-Wall-Clock-Zeitstempel, wann der Beat PHYSISCH erklingen soll
let songId: String?
let intensity: Double?
}
init(
auth: AuthClient,
presenceBaseURL: URL,
session: URLSession = .shared
) {
self.auth = auth
self.presenceBaseURL = presenceBaseURL
self.urlSession = session
self.deviceId = "moodlit-beats-\(UUID().uuidString.lowercased())"
self.clockSync = ClockSync(baseURL: presenceBaseURL, session: session)
}
/// Startet eine WS-Connection. `onBeat` wird auf MainActor pro
/// empfangenem Beat-Event aufgerufen.
func start(onBeat: @escaping (BeatEvent) -> Void) {
onBeatCallback = onBeat
guard wsTask == nil else { return }
state = .connecting
Task { await connectLoop() }
}
func stop() {
reconnectTask?.cancel()
reconnectTask = nil
receiveTask?.cancel()
receiveTask = nil
wsTask?.cancel(with: .goingAway, reason: nil)
wsTask = nil
state = .disconnected
onBeatCallback = nil
}
private func connectLoop() async {
while !Task.isCancelled, onBeatCallback != nil {
do {
try await openOnce()
try await receiveLoop()
} catch is CancellationError {
return
} catch {
let msg = error.localizedDescription
state = .failed(message: msg)
log.warning("Beat-WS unterbrochen, reconnect in \(self.backoffMs)ms: \(msg, privacy: .public)")
try? await Task.sleep(nanoseconds: backoffMs * 1_000_000)
backoffMs = min(backoffMs * 2, 30_000)
}
}
}
private func openOnce() async throws {
// Erst Clock-Sync sonst können wir keine Lookahead-Deadlines
// rechnen. Bei Fehler: log + weiter mit offset=0 (degrades to
// "render immediately").
do {
try await clockSync.refresh()
} catch {
log.warning("ClockSync skipped: \(error.localizedDescription, privacy: .public)")
}
let token = try await auth.freshAccessToken()
var comps = URLComponents(url: presenceBaseURL, resolvingAgainstBaseURL: false)!
comps.scheme = comps.scheme == "https" ? "wss" : "ws"
comps.path = "/v1/beats/moodlit"
comps.queryItems = [
URLQueryItem(name: "token", value: token),
URLQueryItem(name: "senderId", value: deviceId),
]
let task = urlSession.webSocketTask(with: comps.url!)
task.resume()
wsTask = task
state = .connected
backoffMs = 500
}
private func receiveLoop() async throws {
guard let task = wsTask else { return }
while !Task.isCancelled {
let message = try await task.receive()
switch message {
case .string(let str):
dispatch(text: str)
case .data(let data):
if let str = String(data: data, encoding: .utf8) {
dispatch(text: str)
}
@unknown default:
break
}
}
}
private func dispatch(text: String) {
guard let data = text.data(using: .utf8) else { return }
do {
let beat = try JSONDecoder().decode(BeatEvent.self, from: data)
guard beat.type == "beat" else { return }
scheduleBeat(beat)
} catch {
log.warning("Beat decode failed: \(error.localizedDescription, privacy: .public)")
}
}
/// Pro empfangenem Beat wird ein Sleep-Task gescheduled, der bei
/// der Server-Wall-Clock-Zeit `beat.at` (in lokaler monotonic
/// Zeit übersetzt) den `onBeat`-Callback feuert. Wenn `beat.at`
/// fehlt oder schon vergangen ist, feuern wir sofort.
private func scheduleBeat(_ beat: BeatEvent) {
guard let cb = onBeatCallback else { return }
guard let atMs = beat.at else {
cb(beat)
return
}
let deadline = clockSync.localDeadline(forServerTimeMs: atMs)
Task {
try? await Task.sleep(until: deadline, clock: ContinuousClock())
cb(beat)
}
}
}