Skip to content

Commit

Permalink
Merge pull request #38 from DolbyIO/feature/sdk_bump_1.5.2
Browse files Browse the repository at this point in the history
Update to Millicast SDK 1.5.2
  • Loading branch information
aravind-raveendran authored Oct 10, 2023
2 parents 833391f + 8623501 commit 36b3e97
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 42 deletions.
24 changes: 24 additions & 0 deletions Sources/DolbyIORTSCore/Manager/MillicastLoggerHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//
// MillicastLoggerHandler.swift
//

import Foundation
import MillicastSDK
import os

final class MillicastLoggerHandler: NSObject {

private static let logger = Logger.make(category: String(describing: MillicastLoggerHandler.self))

override init() {
super.init()
MCLogger.setDelegate(self)
MCLogger.disableWebsocketLogs(true)
}
}

extension MillicastLoggerHandler: MCLoggerDelegate {
func onLog(withMessage message: String!, level: MCLogLevel) {
Self.logger.debug("🪵 onLog - \(message), log-level - \(level.rawValue)")
}
}
65 changes: 41 additions & 24 deletions Sources/DolbyIORTSCore/Manager/SubscriptionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protocol SubscriptionManagerDelegate: AnyObject {
protocol SubscriptionManagerProtocol: AnyObject {
var delegate: SubscriptionManagerDelegate? { get set }

func connect(streamName: String, accountID: String) async -> Bool
func connect(streamName: String, accountID: String, configuration: SubscriptionConfiguration) async -> Bool
func startSubscribe() async -> Bool
func stopSubscribe() async -> Bool
func addRemoteTrack(_ sourceBuilder: StreamSourceBuilder)
Expand All @@ -50,6 +50,15 @@ protocol SubscriptionManagerProtocol: AnyObject {
func unprojectAudio(for source: StreamSource)
}

struct SubscriptionConfiguration {
let autoReconnect = false
let videoJitterMinimumDelayMs: UInt = 20
let statsDelayMs: UInt = 1000
let forcePlayoutDelay = false
let disableAudio = false
let rtcEventLogOutputPath: String? = nil
}

final class SubscriptionManager: SubscriptionManagerProtocol {
private enum Defaults {
static let subscribeURL = "https://director.millicast.com/api/director/subscribe"
Expand All @@ -60,12 +69,12 @@ final class SubscriptionManager: SubscriptionManagerProtocol {

weak var delegate: SubscriptionManagerDelegate?

func connect(streamName: String, accountID: String) async -> Bool {
func connect(streamName: String, accountID: String, configuration: SubscriptionConfiguration) async -> Bool {
if subscriber != nil {
_ = await stopSubscribe()
}

guard let subscriber = makeSubscriber() else {
guard let subscriber = makeSubscriber(with: configuration) else {
Self.logger.error("💼 Failed to initialise subscriber")
return false
}
Expand Down Expand Up @@ -160,15 +169,15 @@ final class SubscriptionManager: SubscriptionManagerProtocol {
}

func addRemoteTrack(_ sourceBuilder: StreamSourceBuilder) {
Self.logger.debug("💼 Add remote track for source - \(sourceBuilder.sourceId.value ?? "MAIN")")
Self.logger.debug("💼 Add remote track for source - \(sourceBuilder.sourceId)")
sourceBuilder.supportedTrackItems.forEach { subscriber.addRemoteTrack($0.mediaType.rawValue) }
}

func projectVideo(for source: StreamSource, withQuality quality: VideoQuality) {
let videoTrack = source.videoTrack
let matchingVideoQuality = source.lowLevelVideoQualityList.matching(videoQuality: quality)

Self.logger.debug("💼 Project video for source \(String(describing: source.sourceId.value)) with quality - \(String(describing: matchingVideoQuality?.description))")
Self.logger.debug("💼 Project video for source \(source.sourceId) with quality - \(String(describing: matchingVideoQuality?.description))")

let projectionData = MCProjectionData()
projectionData.media = videoTrack.trackInfo.mediaType.rawValue
Expand All @@ -180,13 +189,13 @@ final class SubscriptionManager: SubscriptionManagerProtocol {
}

func unprojectVideo(for source: StreamSource) {
Self.logger.debug("💼 Unproject video for source \(source.sourceId.value ?? "N/A")")
Self.logger.debug("💼 Unproject video for source \(source.sourceId)")
let videoTrack = source.videoTrack
subscriber.unproject([videoTrack.trackInfo.mid])
}

func projectAudio(for source: StreamSource) {
Self.logger.debug("💼 Project audio for source \(source.sourceId.value ?? "N/A")")
Self.logger.debug("💼 Project audio for source \(source.sourceId)")
guard let audioTrack = source.audioTracks.first else {
return
}
Expand All @@ -202,7 +211,7 @@ final class SubscriptionManager: SubscriptionManagerProtocol {
}

func unprojectAudio(for source: StreamSource) {
Self.logger.debug("💼 Unproject audio for source \(source.sourceId.value ?? "N/A")")
Self.logger.debug("💼 Unproject audio for source \(source.sourceId)")
guard let audioTrack = source.audioTracks.first else {
return
}
Expand All @@ -215,15 +224,24 @@ final class SubscriptionManager: SubscriptionManagerProtocol {

private extension SubscriptionManager {

func makeSubscriber() -> MCSubscriber? {
func makeSubscriber(with configuration: SubscriptionConfiguration) -> MCSubscriber? {
let subscriber = MCSubscriber.create()

subscriber?.enableStats(true)

let options = MCClientOptions()
options.autoReconnect = false
options.autoReconnect = configuration.autoReconnect
options.videoJitterMinimumDelayMs = Int32(configuration.videoJitterMinimumDelayMs)
options.statsDelayMs = Int32(configuration.statsDelayMs)
if let rtcEventLogOutputPath = configuration.rtcEventLogOutputPath {
options.rtcEventLogOutputPath = rtcEventLogOutputPath
}
options.disableAudio = configuration.disableAudio
options.forcePlayoutDelay = configuration.forcePlayoutDelay

subscriber?.setOptions(options)

subscriber?.enableStats(true)

return subscriber
}

Expand All @@ -241,7 +259,6 @@ private extension SubscriptionManager {
// MARK: MCSubscriberListener implementation

extension SubscriptionManager: MCSubscriberListener {

func onDisconnected() {
Self.logger.debug("💼 Delegate - onDisconnected")
delegate?.onDisconnected()
Expand All @@ -252,28 +269,28 @@ extension SubscriptionManager: MCSubscriberListener {
delegate?.onSubscribed()
}

func onSubscribedError(_ reason: String!) {
func onSubscribedError(_ reason: String) {
Self.logger.error("💼 Delegate - onSubscribedError \(reason)")
delegate?.onSubscribedError(reason)
}

func onVideoTrack(_ track: MCVideoTrack!, withMid mid: String!) {
func onVideoTrack(_ track: MCVideoTrack, withMid mid: String) {
Self.logger.debug("💼 Delegate - onVideoTrack with mid \(mid)")
delegate?.onVideoTrack(track, withMid: mid)
}

func onAudioTrack(_ track: MCAudioTrack!, withMid mid: String!) {
func onAudioTrack(_ track: MCAudioTrack, withMid mid: String) {
Self.logger.debug("💼 Delegate - onAudioTrack with mid \(mid)")
delegate?.onAudioTrack(track, withMid: mid)
}

func onActive(_ streamId: String!, tracks: [String]!, sourceId: String!) {
Self.logger.debug("💼 Delegate - onActive with sourceId \(sourceId ?? "NULL"), tracks - \(tracks)")
func onActive(_ streamId: String, tracks: [String], sourceId: String) {
Self.logger.debug("💼 Delegate - onActive with sourceId \(sourceId), tracks - \(tracks)")
delegate?.onActive(streamId, tracks: tracks, sourceId: sourceId)
}

func onInactive(_ streamId: String!, sourceId: String!) {
Self.logger.debug("💼 Delegate - onInactive with sourceId \(sourceId ?? "NULL")")
func onInactive(_ streamId: String, sourceId: String) {
Self.logger.debug("💼 Delegate - onInactive with sourceId \(sourceId)")
delegate?.onInactive(streamId, sourceId: sourceId)
}

Expand All @@ -282,11 +299,11 @@ extension SubscriptionManager: MCSubscriberListener {
delegate?.onStopped()
}

func onVad(_ mid: String!, sourceId: String!) {
func onVad(_ mid: String, sourceId: String) {
Self.logger.debug("💼 Delegate - onVad with mid \(mid), sourceId \(sourceId)")
}

func onLayers(_ mid: String!, activeLayers: [MCLayerData]!, inactiveLayers: [MCLayerData]!) {
func onLayers(_ mid: String, activeLayers: [MCLayerData], inactiveLayers: [MCLayerData]) {
Self.logger.debug("💼 Delegate - onLayers for mid - \(mid) with activeLayers \(activeLayers), inactiveLayers \(inactiveLayers)")
delegate?.onLayers(mid, activeLayers: activeLayers, inactiveLayers: inactiveLayers)
}
Expand All @@ -296,17 +313,17 @@ extension SubscriptionManager: MCSubscriberListener {
delegate?.onConnected()
}

func onConnectionError(_ status: Int32, withReason reason: String!) {
func onConnectionError(_ status: Int32, withReason reason: String) {
Self.logger.error("💼 Delegate - onConnectionError")
delegate?.onConnectionError(status, withReason: reason)
}

func onSignalingError(_ message: String!) {
func onSignalingError(_ message: String) {
Self.logger.error("💼 Delegate - onSignalingError")
delegate?.onSignalingError(message)
}

func onStatsReport(_ report: MCStatsReport!) {
func onStatsReport(_ report: MCStatsReport) {
Self.logger.debug("💼 Delegate - onStatsReport")
delegate?.onStatsReport(report)
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/DolbyIORTSCore/Model/StreamSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public struct StreamSource: Equatable, Hashable, Identifiable {

init(id: String?) {
switch id {
case .none:
case .none, .some(""):
self = .main
case let .some(id):
self = .other(sourceId: id)
Expand Down
29 changes: 25 additions & 4 deletions Sources/DolbyIORTSCore/State/State.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@
import Foundation
import MillicastSDK

struct VideoTrackAndMid {
let videoTrack: MCVideoTrack
let mid: String
}

struct AudioTrackAndMid {
let audioTrack: MCAudioTrack
let mid: String
}

enum State: CustomStringConvertible {
case disconnected
case connecting
Expand Down Expand Up @@ -39,16 +49,27 @@ struct SubscribedState {
private(set) var streamSourceBuilders: [StreamSourceBuilder]
private(set) var numberOfStreamViewers: Int
private(set) var streamingStats: AllStreamingStatistics?
private(set) var cachedSourceZeroVideoTrackAndMid: VideoTrackAndMid?
private(set) var cachedSourceZeroAudioTrackAndMid: AudioTrackAndMid?

init() {
init(cachedVideoTrackDetail: VideoTrackAndMid?, cachedAudioTrackDetail: AudioTrackAndMid?) {
cachedSourceZeroVideoTrackAndMid = cachedVideoTrackDetail
cachedSourceZeroAudioTrackAndMid = cachedAudioTrackDetail
streamSourceBuilders = []
numberOfStreamViewers = 0
}

mutating func add(streamId: String, sourceId: String?, tracks: [String]) {
streamSourceBuilders.append(
StreamSourceBuilder.init(streamId: streamId, sourceId: sourceId, tracks: tracks)
)
let streamSourceBuilder = StreamSourceBuilder.init(streamId: streamId, sourceId: sourceId, tracks: tracks)
if let videoTrackAndMid = cachedSourceZeroVideoTrackAndMid {
streamSourceBuilder.addVideoTrack(videoTrackAndMid.videoTrack, mid: videoTrackAndMid.mid)
cachedSourceZeroVideoTrackAndMid = nil
}
if let audioTrackAndMid = cachedSourceZeroAudioTrackAndMid {
streamSourceBuilder.addAudioTrack(audioTrackAndMid.audioTrack, mid: audioTrackAndMid.mid)
cachedSourceZeroAudioTrackAndMid = nil
}
streamSourceBuilders.append(streamSourceBuilder)
}

mutating func remove(streamId: String, sourceId: String?) {
Expand Down
14 changes: 13 additions & 1 deletion Sources/DolbyIORTSCore/State/StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ final class StateMachine {

private let stateSubject: PassthroughSubject<State, Never> = PassthroughSubject()
lazy var statePublisher: AnyPublisher<State, Never> = stateSubject.eraseToAnyPublisher()
private(set) var cachedSourceZeroVideoTrackAndMid: VideoTrackAndMid?
private(set) var cachedSourceZeroAudioTrackAndMid: AudioTrackAndMid?

init(initialState: State) {
currentState = initialState
Expand Down Expand Up @@ -69,7 +71,15 @@ final class StateMachine {
}

func onSubscribed() {
currentState = .subscribed(.init())
currentState = .subscribed(
.init(
cachedVideoTrackDetail: cachedSourceZeroVideoTrackAndMid,
cachedAudioTrackDetail: cachedSourceZeroAudioTrackAndMid
)
)
cachedSourceZeroAudioTrackAndMid = nil
cachedSourceZeroAudioTrackAndMid = nil

}

func onSubscribedError(_ reason: String) {
Expand Down Expand Up @@ -115,6 +125,7 @@ final class StateMachine {
currentState = .subscribed(state)

default:
self.cachedSourceZeroVideoTrackAndMid = VideoTrackAndMid(videoTrack: track, mid: mid)
Self.logger.error("🛑 Unexpected state on onVideoTrack - \(self.currentState.description)")
}
}
Expand All @@ -125,6 +136,7 @@ final class StateMachine {
state.addAudioTrack(track, mid: mid)
currentState = .subscribed(state)
default:
self.cachedSourceZeroAudioTrackAndMid = AudioTrackAndMid(audioTrack: track, mid: mid)
Self.logger.error("🛑 Unexpected state on onAudioTrack - \(self.currentState.description)")
}
}
Expand Down
18 changes: 9 additions & 9 deletions Sources/DolbyIORTSCore/StreamOrchestrator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public final actor StreamOrchestrator {
.eraseToAnyPublisher()
private var activeStreamDetail: StreamDetail?
private static var configuration: StreamOrchestrator.Configuration = .init()

private let logHandler: MillicastLoggerHandler = .init()

private init() {
self.init(
subscriptionManager: SubscriptionManager(),
Expand Down Expand Up @@ -69,7 +70,7 @@ public final actor StreamOrchestrator {
Self.logger.debug("👮‍♂️ Start subscribe")

async let startConnectionStateUpdate: Void = stateMachine.startConnection(streamName: streamName, accountID: accountID)
async let startConnection = subscriptionManager.connect(streamName: streamName, accountID: accountID)
async let startConnection = subscriptionManager.connect(streamName: streamName, accountID: accountID, configuration: .init())

let (_, connectionResult) = await (startConnectionStateUpdate, startConnection)
if connectionResult {
Expand Down Expand Up @@ -257,10 +258,10 @@ private extension StreamOrchestrator {
return await subscriptionManager.startSubscribe()
}

func stopAudio(for sourceId: String) {
func stopAudio(for sourceId: String?) {
switch stateSubject.value {
case let .subscribed(sources: sources, numberOfStreamViewers: _):
if let source = sources.first (where: { $0.sourceId.value == sourceId }), source.isPlayingAudio {
if let source = sources.first (where: { $0.sourceId == StreamSource.SourceId(id: sourceId) }), source.isPlayingAudio {
subscriptionManager.unprojectAudio(for: source)
}
default: break
Expand Down Expand Up @@ -367,10 +368,11 @@ extension StreamOrchestrator: SubscriptionManagerDelegate {
let stateMachineState = self.stateMachine.currentState
switch stateMachineState {
case let .subscribed(state):
guard let sourceBuilder = state.streamSourceBuilders.first(where: { $0.sourceId.value == sourceId }) else {
guard let sourceBuilder = state.streamSourceBuilders.first(where: { $0.sourceId == StreamSource.SourceId(id: sourceId) }) else {
return
}
self.subscriptionManager.addRemoteTrack(sourceBuilder)

default:
return
}
Expand All @@ -382,10 +384,8 @@ extension StreamOrchestrator: SubscriptionManagerDelegate {
guard let self = self else { return }

// Unproject audio whose source is inactive
if let sourceId = sourceId {
await self.stopAudio(for: sourceId)
}

await self.stopAudio(for: sourceId)

self.stateMachine.onInactive(streamId, sourceId: sourceId)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,6 @@ final class StreamViewModel: ObservableObject {
self.internalState = .error(ErrorViewModel(error: streamError))
case .stopped, .disconnected:
self.internalState = .error(.streamOffline)
default:
// Handle's scenario where there is no sources
self.internalState = .error(.genericError)
}
}
.store(in: &self.subscriptions)
Expand Down

0 comments on commit 36b3e97

Please sign in to comment.