From 845cde3248fee789fe6a0a064acae5661f8f9cee Mon Sep 17 00:00:00 2001 From: Ilias Pavlidakis Date: Mon, 23 Sep 2024 14:44:49 +0300 Subject: [PATCH] [Fix]Cancel stage task to avoid unwanted transitions --- .../Stages/WebRTCCoordinator+CleanUp.swift | 6 ++- .../Stages/WebRTCCoordinator+Connecting.swift | 13 +++++ .../WebRTCCoordinator+FastReconnecting.swift | 13 +++-- .../Stages/WebRTCCoordinator+Joined.swift | 49 ++++++++++++++--- .../Stages/WebRTCCoordinator+Joining.swift | 54 ++++++++++++------- .../Stages/WebRTCCoordinator+Leaving.swift | 5 ++ .../Stages/WebRTCCoordinator+Migrated.swift | 9 ++-- .../Stages/WebRTCCoordinator+Migrating.swift | 9 ++-- .../Stages/WebRTCCoordinator+Rejoining.swift | 8 +-- .../Stages/WebRTCCoordinator+Stage.swift | 8 ++- .../XCTestCase+PredicateFulfillment.swift | 39 ++++++++++++++ ...dinatorStateMachine_JoinedStageTests.swift | 31 +++++++++-- 12 files changed, 194 insertions(+), 50 deletions(-) diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+CleanUp.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+CleanUp.swift index 1e4b839f0..948f99fb0 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+CleanUp.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+CleanUp.swift @@ -27,6 +27,8 @@ extension WebRTCCoordinator.StateMachine.Stage { WebRTCCoordinator.StateMachine.Stage, @unchecked Sendable { + private let disposableBag = DisposableBag() + /// Initializes a new instance of `CleanUpStage`. /// - Parameter context: The context for the clean-up stage. init( @@ -59,7 +61,6 @@ extension WebRTCCoordinator.StateMachine.Stage { /// Executes the clean-up process. private func execute() { - context.sfuEventObserver = nil Task { [weak self] in do { guard @@ -71,6 +72,8 @@ extension WebRTCCoordinator.StateMachine.Stage { try Task.checkCancellation() + context.sfuEventObserver = nil + await coordinator.stateAdapter.cleanUp() context = .init(coordinator: context.coordinator) @@ -79,6 +82,7 @@ extension WebRTCCoordinator.StateMachine.Stage { self?.transitionErrorOrLog(error) } } + .store(in: disposableBag) } } } diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Connecting.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Connecting.swift index 38287eeda..ba35fc677 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Connecting.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Connecting.swift @@ -45,6 +45,8 @@ extension WebRTCCoordinator.StateMachine.Stage { let ring: Bool let notify: Bool + private let disposableBag = DisposableBag() + /// Initializes a new instance of `ConnectingStage`. /// - Parameters: /// - context: The context for the connecting stage. @@ -129,12 +131,16 @@ extension WebRTCCoordinator.StateMachine.Stage { throw ClientError("WebRTCCoordinator instance not available in stage id:\(id).") } + try Task.checkCancellation() + if updateSession { /// By refreshing the session, we are asking the stateAdapter to update /// the sessionId to a new one. await coordinator.stateAdapter.refreshSession() } + try Task.checkCancellation() + /// The authenticator will fetch a ``JoinCallResponse`` and will use it to /// create an ``SFUAdapter`` instance that we can later use in our flow. let (sfuAdapter, response) = try await context @@ -148,11 +154,15 @@ extension WebRTCCoordinator.StateMachine.Stage { options: options ) + try Task.checkCancellation() + /// We provide the ``SFUAdapter`` to the authenticator which will ensure /// that we will continue only when the WS `connectionState` on the /// ``SFUAdapter`` has changed to `.authenticating`. try await context.authenticator.waitForAuthentication(on: sfuAdapter) + try Task.checkCancellation() + /// With the ``SFUAdapter`` having a `connectionState` to /// `.authenticating`, we store the instance on the ``WebRTCStateAdapter``. await coordinator.stateAdapter.set(sfuAdapter: sfuAdapter) @@ -161,6 +171,8 @@ extension WebRTCCoordinator.StateMachine.Stage { /// during `migration`. context.currentSFU = response.credentials.server.edgeName + try Task.checkCancellation() + /// We are going to transition to the next stage ``.connected``. If that transition /// fail for any reason, we will transition to ``.disconnected`` to allow for /// reconnection. @@ -171,6 +183,7 @@ extension WebRTCCoordinator.StateMachine.Stage { transitionDisconnectOrError(error) } } + .store(in: disposableBag) } } } diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+FastReconnecting.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+FastReconnecting.swift index 381dc0da5..121247a7e 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+FastReconnecting.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+FastReconnecting.swift @@ -29,6 +29,8 @@ extension WebRTCCoordinator.StateMachine.Stage { WebRTCCoordinator.StateMachine.Stage, @unchecked Sendable { + private let disposableBag = DisposableBag() + /// Initializes a new instance of `FastReconnectingStage`. /// - Parameter context: The context for the fast reconnecting stage. init( @@ -68,6 +70,8 @@ extension WebRTCCoordinator.StateMachine.Stage { throw ClientError("WebRCTAdapter instance not available.") } + try Task.checkCancellation() + log.debug("Refreshing webSocket", subsystems: .webRTC) sfuAdapter.refresh( webSocketConfiguration: .init( @@ -76,6 +80,8 @@ extension WebRTCCoordinator.StateMachine.Stage { ) ) + try Task.checkCancellation() + log.debug( "Waiting for webSocket state to change to authenticating", subsystems: .webRTC @@ -83,16 +89,13 @@ extension WebRTCCoordinator.StateMachine.Stage { try await context.authenticator.waitForAuthentication(on: sfuAdapter) - try transition?( - .fastReconnected( - context - ) - ) + transitionOrDisconnect(.fastReconnected(context)) } catch { context.reconnectionStrategy = context.nextReconnectionStrategy() transitionDisconnectOrError(error) } } + .store(in: disposableBag) } } } diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joined.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joined.swift index c96185b6c..3277f2046 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joined.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joined.swift @@ -80,13 +80,19 @@ extension WebRTCCoordinator.StateMachine.Stage { ) } + try Task.checkCancellation() + // We set the reconnectionStrategy to rejoin as default. context.reconnectionStrategy = .rejoin + try Task.checkCancellation() + let migrationStatusObserver = context.migrationStatusObserver let previousSFUAdapter = context.previousSFUAdapter await cleanUpPreviousSessionIfRequired() + try Task.checkCancellation() + try await observeMigrationStatusIfRequired( migrationStatusObserver, previousSFUAdapter: previousSFUAdapter @@ -94,19 +100,46 @@ extension WebRTCCoordinator.StateMachine.Stage { observeInternetConnection() + try Task.checkCancellation() + await observeForSubscriptionUpdates() + + try Task.checkCancellation() + await observeConnection() + + try Task.checkCancellation() + await observeCallEndedEvent() + + try Task.checkCancellation() + await observeMigrationEvent() + + try Task.checkCancellation() + await observeDisconnectEvent() + + try Task.checkCancellation() + await observePreferredReconnectionStrategy() + + try Task.checkCancellation() + await observeCallSettingsUpdates() + + try Task.checkCancellation() + await observePeerConnectionState() + + try Task.checkCancellation() + await configureStatsCollectionAndDelivery() } catch { transitionDisconnectOrError(error) } } + .store(in: disposableBag) } /// Cleans up the previous session if required. @@ -391,14 +424,18 @@ extension WebRTCCoordinator.StateMachine.Stage { let statsReporter = WebRTCStatsReporter( sessionID: await stateAdapter.sessionID ) + statsReporter.interval = await stateAdapter.statsReporter?.interval ?? 0 + statsReporter.publisher = await stateAdapter.publisher + statsReporter.subscriber = await stateAdapter.subscriber + statsReporter.sfuAdapter = await stateAdapter.sfuAdapter await stateAdapter.set(statsReporter: statsReporter) + } else { + let statsReporter = await stateAdapter.statsReporter + statsReporter?.interval = await stateAdapter.statsReporter?.interval ?? 0 + statsReporter?.publisher = await stateAdapter.publisher + statsReporter?.subscriber = await stateAdapter.subscriber + statsReporter?.sfuAdapter = await stateAdapter.sfuAdapter } - - let statsReporter = await stateAdapter.statsReporter - statsReporter?.interval = await stateAdapter.statsReporter?.interval ?? 0 - statsReporter?.publisher = await stateAdapter.publisher - statsReporter?.subscriber = await stateAdapter.subscriber - statsReporter?.sfuAdapter = await stateAdapter.sfuAdapter } /// Observes internet connection status. diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift index d13028a07..1fde26bb9 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift @@ -28,6 +28,8 @@ extension WebRTCCoordinator.StateMachine.Stage { WebRTCCoordinator.StateMachine.Stage, @unchecked Sendable { + private let disposableBag = DisposableBag() + /// Initializes a new instance of `JoiningStage`. /// - Parameter context: The context for the joining stage. init( @@ -83,10 +85,14 @@ extension WebRTCCoordinator.StateMachine.Stage { ) } + try Task.checkCancellation() + if !isFastReconnecting { try await coordinator.stateAdapter.configurePeerConnections() } + try Task.checkCancellation() + await sfuAdapter.sendJoinRequest( WebRTCJoinRequestFactory() .buildRequest( @@ -103,21 +109,21 @@ extension WebRTCCoordinator.StateMachine.Stage { ) ) + try Task.checkCancellation() + try await join( coordinator: coordinator, sfuAdapter: sfuAdapter ) + try Task.checkCancellation() + if isFastReconnecting { await coordinator.stateAdapter.publisher?.restartICE() await coordinator.stateAdapter.subscriber?.restartICE() } - try transition?( - .joined( - context - ) - ) + transitionOrDisconnect(.joined(context)) } catch { context.reconnectionStrategy = context .reconnectionStrategy @@ -125,6 +131,7 @@ extension WebRTCCoordinator.StateMachine.Stage { transitionDisconnectOrError(error) } } + .store(in: disposableBag) } /// Executes the migration process. @@ -142,8 +149,12 @@ extension WebRTCCoordinator.StateMachine.Stage { ) } + try Task.checkCancellation() + try await coordinator.stateAdapter.configurePeerConnections() + try Task.checkCancellation() + await sfuAdapter.sendJoinRequest( WebRTCJoinRequestFactory() .buildRequest( @@ -162,24 +173,20 @@ extension WebRTCCoordinator.StateMachine.Stage { context.reconnectAttempts += 1 + try Task.checkCancellation() + try await join( coordinator: coordinator, sfuAdapter: sfuAdapter ) - try transition?( - .joined( - context - ) - ) + transitionOrDisconnect(.joined(context)) } catch { - if let clientError = error as? ClientError { - log.error(clientError) - } context.reconnectionStrategy = .rejoin transitionDisconnectOrError(error) } } + .store(in: disposableBag) } /// Executes the rejoining process. @@ -198,8 +205,12 @@ extension WebRTCCoordinator.StateMachine.Stage { ) } + try Task.checkCancellation() + try await coordinator.stateAdapter.configurePeerConnections() + try Task.checkCancellation() + await sfuAdapter.sendJoinRequest( WebRTCJoinRequestFactory() .buildRequest( @@ -217,20 +228,19 @@ extension WebRTCCoordinator.StateMachine.Stage { ) context.reconnectAttempts += 1 + try Task.checkCancellation() + try await join( coordinator: coordinator, sfuAdapter: sfuAdapter ) - try transition?( - .joined( - context - ) - ) + transitionOrDisconnect(.joined(context)) } catch { transitionDisconnectOrError(error) } } + .store(in: disposableBag) } /// Builds the subscriber session description. @@ -286,10 +296,14 @@ extension WebRTCCoordinator.StateMachine.Stage { ) } + try Task.checkCancellation() + let joinResponse = try await sfuAdapter .publisher(eventType: Stream_Video_Sfu_Event_JoinResponse.self) .nextValue(timeout: WebRTCConfiguration.timeout.join) + try Task.checkCancellation() + try await coordinator .stateAdapter .publisher? @@ -301,6 +315,8 @@ extension WebRTCCoordinator.StateMachine.Stage { joinResponse.fastReconnectDeadlineSeconds ) + try Task.checkCancellation() + try await context.authenticator.waitForConnect(on: sfuAdapter) let participants = joinResponse @@ -309,6 +325,8 @@ extension WebRTCCoordinator.StateMachine.Stage { .map { $0.toCallParticipant() } .reduce(into: [String: CallParticipant]()) { $0[$1.sessionId] = $1 } + try Task.checkCancellation() + await coordinator .stateAdapter .didUpdateParticipants(participants) diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Leaving.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Leaving.swift index 4f714b12f..bef986875 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Leaving.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Leaving.swift @@ -27,6 +27,8 @@ extension WebRTCCoordinator.StateMachine.Stage { WebRTCCoordinator.StateMachine.Stage, @unchecked Sendable { + private let disposableBag = DisposableBag() + /// Initializes a new instance of `LeavingStage`. /// - Parameter context: The context for the leaving stage. init( @@ -64,6 +66,8 @@ extension WebRTCCoordinator.StateMachine.Stage { throw ClientError("WebRCTAdapter instance not available.") } + try Task.checkCancellation() + if let sfuAdapter = await coordinator.stateAdapter.sfuAdapter { if case .connected = sfuAdapter.connectionState { await sfuAdapter.sendLeaveRequest( @@ -78,6 +82,7 @@ extension WebRTCCoordinator.StateMachine.Stage { transitionErrorOrLog(error) } } + .store(in: disposableBag) } } } diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Migrated.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Migrated.swift index 463fc0056..7bbc8d7c1 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Migrated.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Migrated.swift @@ -27,6 +27,8 @@ extension WebRTCCoordinator.StateMachine.Stage { WebRTCCoordinator.StateMachine.Stage, @unchecked Sendable { + private let disposableBag = DisposableBag() + /// Initializes a new instance of `MigratedStage`. /// - Parameter context: The context for the migrated stage. init( @@ -107,15 +109,12 @@ extension WebRTCCoordinator.StateMachine.Stage { context.migrationStatusObserver = nil } - try transition?( - .joining( - context - ) - ) + transitionOrDisconnect(.joining(context)) } catch { transitionDisconnectOrError(error) } } + .store(in: disposableBag) } } } diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Migrating.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Migrating.swift index f72e55179..b7ca0faba 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Migrating.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Migrating.swift @@ -25,6 +25,8 @@ extension WebRTCCoordinator.StateMachine.Stage { WebRTCCoordinator.StateMachine.Stage, @unchecked Sendable { + private let disposableBag = DisposableBag() + /// Initializes a new instance of `MigratingStage`. /// - Parameter context: The context for the migrating stage. init( @@ -85,15 +87,12 @@ extension WebRTCCoordinator.StateMachine.Stage { .stateAdapter .cleanUpForReconnection() - try transition?( - .migrated( - context - ) - ) + transitionOrDisconnect(.migrated(context)) } catch { transitionDisconnectOrError(error) } } + .store(in: disposableBag) return self default: return nil diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Rejoining.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Rejoining.swift index 8524f8ffc..810889f2e 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Rejoining.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Rejoining.swift @@ -107,7 +107,7 @@ extension WebRTCCoordinator.StateMachine.Stage { try Task.checkCancellation() - try transition?( + transitionOrDisconnect( .connecting( context, create: false, @@ -117,11 +117,7 @@ extension WebRTCCoordinator.StateMachine.Stage { ) ) } catch { - if error is CancellationError { - /* No-op */ - } else { - transitionDisconnectOrError(error) - } + transitionDisconnectOrError(error) } } .store(in: disposableBag) diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Stage.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Stage.swift index 8cbf65efd..d06314994 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Stage.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Stage.swift @@ -94,7 +94,13 @@ extension WebRTCCoordinator.StateMachine { /// Attempts to transition to a disconnected state or an error state. /// - Parameter error: The error that occurred. - func transitionDisconnectOrError(_ error: Error) { + func transitionDisconnectOrError( + _ error: Error, + ignoresCancellationError: Bool = true + ) { + guard (error as? CancellationError) == nil else { + return + } context.flowError = error transitionOrError(.disconnected(context)) } diff --git a/StreamVideoTests/Utilities/Extensions/XCTestCase+PredicateFulfillment.swift b/StreamVideoTests/Utilities/Extensions/XCTestCase+PredicateFulfillment.swift index 493321791..0788c1ccd 100644 --- a/StreamVideoTests/Utilities/Extensions/XCTestCase+PredicateFulfillment.swift +++ b/StreamVideoTests/Utilities/Extensions/XCTestCase+PredicateFulfillment.swift @@ -2,6 +2,7 @@ // Copyright © 2024 Stream.io Inc. All rights reserved. // +import Combine import Foundation import XCTest @@ -31,6 +32,44 @@ extension XCTestCase { XCTAssertTrue(block(), message(), file: file, line: line) } + @MainActor + func fulfillment( + timeout: TimeInterval = defaultTimeout, + _ message: @autoclosure () -> String = "", + file: StaticString = #file, + line: UInt = #line, + block: @MainActor @Sendable @escaping () async -> Bool + ) async { + let stepInterval = 0.1 + var maxIterations = Int(timeout / stepInterval) + var iterations = 0 + var cancellable: AnyCancellable? + let waitExpectation = expectation(description: "Wait for completion.") + cancellable = Foundation + .Timer + .publish(every: stepInterval, on: .main, in: .default) + .autoconnect() + .sink { _ in + Task { + defer { iterations += 1 } + guard await block() || iterations > maxIterations else { + return + } + cancellable?.cancel() + waitExpectation.fulfill() + } + } + + await safeFulfillment( + of: [waitExpectation], + timeout: timeout, + file: file, + line: line + ) + let value = await block() + XCTAssertTrue(value, message(), file: file, line: line) + } + @MainActor func safeFulfillment( of expectations: [XCTestExpectation], diff --git a/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_JoinedStageTests.swift b/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_JoinedStageTests.swift index 7a4add0ca..e89173898 100644 --- a/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_JoinedStageTests.swift +++ b/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_JoinedStageTests.swift @@ -566,7 +566,7 @@ final class WebRTCCoordinatorStateMachine_JoinedStageTests: XCTestCase, @uncheck // MARK: configureStatsCollectionAndDelivery - func test_transition_configuresStatsReporter() async throws { + func test_transition_sameSessionId_configuresStatsReporter() async throws { let stateAdapter = mockCoordinatorStack.coordinator.stateAdapter let sfuAdapter = mockCoordinatorStack.sfuStack.adapter await stateAdapter.set(sfuAdapter: sfuAdapter) @@ -574,7 +574,7 @@ final class WebRTCCoordinatorStateMachine_JoinedStageTests: XCTestCase, @uncheck try await stateAdapter.configurePeerConnections() let publisher = await stateAdapter.publisher let subscriber = await stateAdapter.subscriber - let initialStatsReporter = WebRTCStatsReporter(interval: 10, sessionID: sessionId) + let initialStatsReporter = WebRTCStatsReporter(interval: 12, sessionID: sessionId) await stateAdapter.set(statsReporter: initialStatsReporter) subject.context.coordinator = mockCoordinatorStack.coordinator @@ -582,7 +582,32 @@ final class WebRTCCoordinatorStateMachine_JoinedStageTests: XCTestCase, @uncheck await wait(for: 1) let newStatsReporter = await stateAdapter.statsReporter - XCTAssertEqual(newStatsReporter?.interval, 10) + XCTAssertEqual(newStatsReporter?.interval, 12) + XCTAssertTrue(newStatsReporter?.publisher === publisher) + XCTAssertTrue(newStatsReporter?.subscriber === subscriber) + XCTAssertTrue(newStatsReporter?.sfuAdapter === sfuAdapter) + } + + func test_transition_differentSessionId_configuresStatsReporter() async throws { + let stateAdapter = mockCoordinatorStack.coordinator.stateAdapter + let sfuAdapter = mockCoordinatorStack.sfuStack.adapter + await stateAdapter.set(sfuAdapter: sfuAdapter) + let sessionId = await stateAdapter.sessionID + try await stateAdapter.configurePeerConnections() + let publisher = await stateAdapter.publisher + let subscriber = await stateAdapter.subscriber + let initialStatsReporter = WebRTCStatsReporter(interval: 11, sessionID: .unique) + await stateAdapter.set(statsReporter: initialStatsReporter) + subject.context.coordinator = mockCoordinatorStack.coordinator + + _ = subject.transition(from: .joining(subject.context)) + + await fulfillment { + let newStatsReporter = await stateAdapter.statsReporter + return newStatsReporter !== initialStatsReporter && newStatsReporter?.interval == 11 + } + let newStatsReporter = await stateAdapter.statsReporter + XCTAssertEqual(newStatsReporter?.interval, 11) XCTAssertTrue(newStatsReporter?.publisher === publisher) XCTAssertTrue(newStatsReporter?.subscriber === subscriber) XCTAssertTrue(newStatsReporter?.sfuAdapter === sfuAdapter)