Skip to content

Commit

Permalink
Update to Millicast SDK 1.5.2
Browse files Browse the repository at this point in the history
Add log handler
Add logic to use the video and audio tracks received before the onSubscribed or onActive callback.
Make `MCClientOptions` configurable at the time of connect
Update app for SDK’s API changes
Update API changes to `MCSubscriberListener`
  • Loading branch information
aravind-raveendran committed Oct 10, 2023
1 parent 833391f commit e5c1212
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 42 deletions.
24 changes: 24 additions & 0 deletions Sources/DolbyIORTSCore/Manager/MillicastLoggerHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//
// MillicastLoggerHandler.swift
//

import Foundation
import MillicastSDK
import os

final class MillicastLoggerHandler: NSObject {

private static let logger = Logger.make(category: String(describing: MillicastLoggerHandler.self))

override init() {
super.init()
MCLogger.setDelegate(self)
MCLogger.disableWebsocketLogs(true)
}
}

extension MillicastLoggerHandler: MCLoggerDelegate {
func onLog(withMessage message: String!, level: MCLogLevel) {
Self.logger.log("🪵 onLog - \(message), log-level - \(level.rawValue)")
}
}
65 changes: 41 additions & 24 deletions Sources/DolbyIORTSCore/Manager/SubscriptionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protocol SubscriptionManagerDelegate: AnyObject {
protocol SubscriptionManagerProtocol: AnyObject {
var delegate: SubscriptionManagerDelegate? { get set }

func connect(streamName: String, accountID: String) async -> Bool
func connect(streamName: String, accountID: String, configuration: SubscriptionConfiguration) async -> Bool
func startSubscribe() async -> Bool
func stopSubscribe() async -> Bool
func addRemoteTrack(_ sourceBuilder: StreamSourceBuilder)
Expand All @@ -50,6 +50,15 @@ protocol SubscriptionManagerProtocol: AnyObject {
func unprojectAudio(for source: StreamSource)
}

struct SubscriptionConfiguration {
let autoReconnect = false
let videoJitterMinimumDelayMs: UInt = 20
let statsDelayMs: UInt = 1000
let forcePlayoutDelay = false
let disableAudio = false
let rtcEventLogOutputPath: String? = nil
}

final class SubscriptionManager: SubscriptionManagerProtocol {
private enum Defaults {
static let subscribeURL = "https://director.millicast.com/api/director/subscribe"
Expand All @@ -60,12 +69,12 @@ final class SubscriptionManager: SubscriptionManagerProtocol {

weak var delegate: SubscriptionManagerDelegate?

func connect(streamName: String, accountID: String) async -> Bool {
func connect(streamName: String, accountID: String, configuration: SubscriptionConfiguration) async -> Bool {
if subscriber != nil {
_ = await stopSubscribe()
}

guard let subscriber = makeSubscriber() else {
guard let subscriber = makeSubscriber(with: configuration) else {
Self.logger.error("💼 Failed to initialise subscriber")
return false
}
Expand Down Expand Up @@ -160,15 +169,15 @@ final class SubscriptionManager: SubscriptionManagerProtocol {
}

func addRemoteTrack(_ sourceBuilder: StreamSourceBuilder) {
Self.logger.debug("💼 Add remote track for source - \(sourceBuilder.sourceId.value ?? "MAIN")")
Self.logger.debug("💼 Add remote track for source - \(sourceBuilder.sourceId)")
sourceBuilder.supportedTrackItems.forEach { subscriber.addRemoteTrack($0.mediaType.rawValue) }
}

func projectVideo(for source: StreamSource, withQuality quality: VideoQuality) {
let videoTrack = source.videoTrack
let matchingVideoQuality = source.lowLevelVideoQualityList.matching(videoQuality: quality)

Self.logger.debug("💼 Project video for source \(String(describing: source.sourceId.value)) with quality - \(String(describing: matchingVideoQuality?.description))")
Self.logger.debug("💼 Project video for source \(source.sourceId) with quality - \(String(describing: matchingVideoQuality?.description))")

let projectionData = MCProjectionData()
projectionData.media = videoTrack.trackInfo.mediaType.rawValue
Expand All @@ -180,13 +189,13 @@ final class SubscriptionManager: SubscriptionManagerProtocol {
}

func unprojectVideo(for source: StreamSource) {
Self.logger.debug("💼 Unproject video for source \(source.sourceId.value ?? "N/A")")
Self.logger.debug("💼 Unproject video for source \(source.sourceId)")
let videoTrack = source.videoTrack
subscriber.unproject([videoTrack.trackInfo.mid])
}

func projectAudio(for source: StreamSource) {
Self.logger.debug("💼 Project audio for source \(source.sourceId.value ?? "N/A")")
Self.logger.debug("💼 Project audio for source \(source.sourceId)")
guard let audioTrack = source.audioTracks.first else {
return
}
Expand All @@ -202,7 +211,7 @@ final class SubscriptionManager: SubscriptionManagerProtocol {
}

func unprojectAudio(for source: StreamSource) {
Self.logger.debug("💼 Unproject audio for source \(source.sourceId.value ?? "N/A")")
Self.logger.debug("💼 Unproject audio for source \(source.sourceId)")
guard let audioTrack = source.audioTracks.first else {
return
}
Expand All @@ -215,15 +224,24 @@ final class SubscriptionManager: SubscriptionManagerProtocol {

private extension SubscriptionManager {

func makeSubscriber() -> MCSubscriber? {
func makeSubscriber(with configuration: SubscriptionConfiguration) -> MCSubscriber? {
let subscriber = MCSubscriber.create()

subscriber?.enableStats(true)

let options = MCClientOptions()
options.autoReconnect = false
options.autoReconnect = configuration.autoReconnect
options.videoJitterMinimumDelayMs = Int32(configuration.videoJitterMinimumDelayMs)
options.statsDelayMs = Int32(configuration.statsDelayMs)
if let rtcEventLogOutputPath = configuration.rtcEventLogOutputPath {
options.rtcEventLogOutputPath = rtcEventLogOutputPath
}
options.disableAudio = configuration.disableAudio
options.forcePlayoutDelay = configuration.forcePlayoutDelay

subscriber?.setOptions(options)

subscriber?.enableStats(true)

return subscriber
}

Expand All @@ -241,7 +259,6 @@ private extension SubscriptionManager {
// MARK: MCSubscriberListener implementation

extension SubscriptionManager: MCSubscriberListener {

func onDisconnected() {
Self.logger.debug("💼 Delegate - onDisconnected")
delegate?.onDisconnected()
Expand All @@ -252,28 +269,28 @@ extension SubscriptionManager: MCSubscriberListener {
delegate?.onSubscribed()
}

func onSubscribedError(_ reason: String!) {
func onSubscribedError(_ reason: String) {
Self.logger.error("💼 Delegate - onSubscribedError \(reason)")
delegate?.onSubscribedError(reason)
}

func onVideoTrack(_ track: MCVideoTrack!, withMid mid: String!) {
func onVideoTrack(_ track: MCVideoTrack, withMid mid: String) {
Self.logger.debug("💼 Delegate - onVideoTrack with mid \(mid)")
delegate?.onVideoTrack(track, withMid: mid)
}

func onAudioTrack(_ track: MCAudioTrack!, withMid mid: String!) {
func onAudioTrack(_ track: MCAudioTrack, withMid mid: String) {
Self.logger.debug("💼 Delegate - onAudioTrack with mid \(mid)")
delegate?.onAudioTrack(track, withMid: mid)
}

func onActive(_ streamId: String!, tracks: [String]!, sourceId: String!) {
Self.logger.debug("💼 Delegate - onActive with sourceId \(sourceId ?? "NULL"), tracks - \(tracks)")
func onActive(_ streamId: String, tracks: [String], sourceId: String) {
Self.logger.debug("💼 Delegate - onActive with sourceId \(sourceId), tracks - \(tracks)")
delegate?.onActive(streamId, tracks: tracks, sourceId: sourceId)
}

func onInactive(_ streamId: String!, sourceId: String!) {
Self.logger.debug("💼 Delegate - onInactive with sourceId \(sourceId ?? "NULL")")
func onInactive(_ streamId: String, sourceId: String) {
Self.logger.debug("💼 Delegate - onInactive with sourceId \(sourceId)")
delegate?.onInactive(streamId, sourceId: sourceId)
}

Expand All @@ -282,11 +299,11 @@ extension SubscriptionManager: MCSubscriberListener {
delegate?.onStopped()
}

func onVad(_ mid: String!, sourceId: String!) {
func onVad(_ mid: String, sourceId: String) {
Self.logger.debug("💼 Delegate - onVad with mid \(mid), sourceId \(sourceId)")
}

func onLayers(_ mid: String!, activeLayers: [MCLayerData]!, inactiveLayers: [MCLayerData]!) {
func onLayers(_ mid: String, activeLayers: [MCLayerData], inactiveLayers: [MCLayerData]) {
Self.logger.debug("💼 Delegate - onLayers for mid - \(mid) with activeLayers \(activeLayers), inactiveLayers \(inactiveLayers)")
delegate?.onLayers(mid, activeLayers: activeLayers, inactiveLayers: inactiveLayers)
}
Expand All @@ -296,17 +313,17 @@ extension SubscriptionManager: MCSubscriberListener {
delegate?.onConnected()
}

func onConnectionError(_ status: Int32, withReason reason: String!) {
func onConnectionError(_ status: Int32, withReason reason: String) {
Self.logger.error("💼 Delegate - onConnectionError")
delegate?.onConnectionError(status, withReason: reason)
}

func onSignalingError(_ message: String!) {
func onSignalingError(_ message: String) {
Self.logger.error("💼 Delegate - onSignalingError")
delegate?.onSignalingError(message)
}

func onStatsReport(_ report: MCStatsReport!) {
func onStatsReport(_ report: MCStatsReport) {
Self.logger.debug("💼 Delegate - onStatsReport")
delegate?.onStatsReport(report)
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/DolbyIORTSCore/Model/StreamSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public struct StreamSource: Equatable, Hashable, Identifiable {

init(id: String?) {
switch id {
case .none:
case .none, .some(""):
self = .main
case let .some(id):
self = .other(sourceId: id)
Expand Down
29 changes: 25 additions & 4 deletions Sources/DolbyIORTSCore/State/State.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@
import Foundation
import MillicastSDK

struct VideoTrackAndMid {
let videoTrack: MCVideoTrack
let mid: String
}

struct AudioTrackAndMid {
let audioTrack: MCAudioTrack
let mid: String
}

enum State: CustomStringConvertible {
case disconnected
case connecting
Expand Down Expand Up @@ -39,16 +49,27 @@ struct SubscribedState {
private(set) var streamSourceBuilders: [StreamSourceBuilder]
private(set) var numberOfStreamViewers: Int
private(set) var streamingStats: AllStreamingStatistics?
private(set) var cachedSourceZeroVideoTrackAndMid: VideoTrackAndMid?
private(set) var cachedSourceZeroAudioTrackAndMid: AudioTrackAndMid?

init() {
init(cachedVideoTrackDetail: VideoTrackAndMid?, cachedAudioTrackDetail: AudioTrackAndMid?) {
cachedSourceZeroVideoTrackAndMid = cachedVideoTrackDetail
cachedSourceZeroAudioTrackAndMid = cachedAudioTrackDetail
streamSourceBuilders = []
numberOfStreamViewers = 0
}

mutating func add(streamId: String, sourceId: String?, tracks: [String]) {
streamSourceBuilders.append(
StreamSourceBuilder.init(streamId: streamId, sourceId: sourceId, tracks: tracks)
)
let streamSourceBuilder = StreamSourceBuilder.init(streamId: streamId, sourceId: sourceId, tracks: tracks)
if let videoTrackAndMid = cachedSourceZeroVideoTrackAndMid {
streamSourceBuilder.addVideoTrack(videoTrackAndMid.videoTrack, mid: videoTrackAndMid.mid)
cachedSourceZeroVideoTrackAndMid = nil
}
if let audioTrackAndMid = cachedSourceZeroAudioTrackAndMid {
streamSourceBuilder.addAudioTrack(audioTrackAndMid.audioTrack, mid: audioTrackAndMid.mid)
cachedSourceZeroAudioTrackAndMid = nil
}
streamSourceBuilders.append(streamSourceBuilder)
}

mutating func remove(streamId: String, sourceId: String?) {
Expand Down
14 changes: 13 additions & 1 deletion Sources/DolbyIORTSCore/State/StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ final class StateMachine {

private let stateSubject: PassthroughSubject<State, Never> = PassthroughSubject()
lazy var statePublisher: AnyPublisher<State, Never> = stateSubject.eraseToAnyPublisher()
private(set) var cachedSourceZeroVideoTrackAndMid: VideoTrackAndMid?
private(set) var cachedSourceZeroAudioTrackAndMid: AudioTrackAndMid?

init(initialState: State) {
currentState = initialState
Expand Down Expand Up @@ -69,7 +71,15 @@ final class StateMachine {
}

func onSubscribed() {
currentState = .subscribed(.init())
currentState = .subscribed(
.init(
cachedVideoTrackDetail: cachedSourceZeroVideoTrackAndMid,
cachedAudioTrackDetail: cachedSourceZeroAudioTrackAndMid
)
)
cachedSourceZeroAudioTrackAndMid = nil
cachedSourceZeroAudioTrackAndMid = nil

}

func onSubscribedError(_ reason: String) {
Expand Down Expand Up @@ -115,6 +125,7 @@ final class StateMachine {
currentState = .subscribed(state)

default:
self.cachedSourceZeroVideoTrackAndMid = VideoTrackAndMid(videoTrack: track, mid: mid)
Self.logger.error("🛑 Unexpected state on onVideoTrack - \(self.currentState.description)")
}
}
Expand All @@ -125,6 +136,7 @@ final class StateMachine {
state.addAudioTrack(track, mid: mid)
currentState = .subscribed(state)
default:
self.cachedSourceZeroAudioTrackAndMid = AudioTrackAndMid(audioTrack: track, mid: mid)
Self.logger.error("🛑 Unexpected state on onAudioTrack - \(self.currentState.description)")
}
}
Expand Down
18 changes: 9 additions & 9 deletions Sources/DolbyIORTSCore/StreamOrchestrator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public final actor StreamOrchestrator {
.eraseToAnyPublisher()
private var activeStreamDetail: StreamDetail?
private static var configuration: StreamOrchestrator.Configuration = .init()

private let logHandler: MillicastLoggerHandler = .init()

private init() {
self.init(
subscriptionManager: SubscriptionManager(),
Expand Down Expand Up @@ -69,7 +70,7 @@ public final actor StreamOrchestrator {
Self.logger.debug("👮‍♂️ Start subscribe")

async let startConnectionStateUpdate: Void = stateMachine.startConnection(streamName: streamName, accountID: accountID)
async let startConnection = subscriptionManager.connect(streamName: streamName, accountID: accountID)
async let startConnection = subscriptionManager.connect(streamName: streamName, accountID: accountID, configuration: .init())

let (_, connectionResult) = await (startConnectionStateUpdate, startConnection)
if connectionResult {
Expand Down Expand Up @@ -257,10 +258,10 @@ private extension StreamOrchestrator {
return await subscriptionManager.startSubscribe()
}

func stopAudio(for sourceId: String) {
func stopAudio(for sourceId: String?) {
switch stateSubject.value {
case let .subscribed(sources: sources, numberOfStreamViewers: _):
if let source = sources.first (where: { $0.sourceId.value == sourceId }), source.isPlayingAudio {
if let source = sources.first (where: { $0.sourceId == StreamSource.SourceId(id: sourceId) }), source.isPlayingAudio {
subscriptionManager.unprojectAudio(for: source)
}
default: break
Expand Down Expand Up @@ -367,10 +368,11 @@ extension StreamOrchestrator: SubscriptionManagerDelegate {
let stateMachineState = self.stateMachine.currentState
switch stateMachineState {
case let .subscribed(state):
guard let sourceBuilder = state.streamSourceBuilders.first(where: { $0.sourceId.value == sourceId }) else {
guard let sourceBuilder = state.streamSourceBuilders.first(where: { $0.sourceId == StreamSource.SourceId(id: sourceId) }) else {
return
}
self.subscriptionManager.addRemoteTrack(sourceBuilder)

default:
return
}
Expand All @@ -382,10 +384,8 @@ extension StreamOrchestrator: SubscriptionManagerDelegate {
guard let self = self else { return }

// Unproject audio whose source is inactive
if let sourceId = sourceId {
await self.stopAudio(for: sourceId)
}

await self.stopAudio(for: sourceId)

self.stateMachine.onInactive(streamId, sourceId: sourceId)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,6 @@ final class StreamViewModel: ObservableObject {
self.internalState = .error(ErrorViewModel(error: streamError))
case .stopped, .disconnected:
self.internalState = .error(.streamOffline)
default:
// Handle's scenario where there is no sources
self.internalState = .error(.genericError)
}
}
.store(in: &self.subscriptions)
Expand Down

0 comments on commit e5c1212

Please sign in to comment.