Skip to content

Commit

Permalink
[Fix]Cancel stage task to avoid unwanted transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
ipavlidakis committed Sep 23, 2024
1 parent 8a4cdbf commit 845cde3
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
WebRTCCoordinator.StateMachine.Stage,
@unchecked Sendable
{
private let disposableBag = DisposableBag()

/// Initializes a new instance of `CleanUpStage`.
/// - Parameter context: The context for the clean-up stage.
init(
Expand Down Expand Up @@ -59,7 +61,6 @@ extension WebRTCCoordinator.StateMachine.Stage {

/// Executes the clean-up process.
private func execute() {
context.sfuEventObserver = nil
Task { [weak self] in
do {
guard
Expand All @@ -71,6 +72,8 @@ extension WebRTCCoordinator.StateMachine.Stage {

try Task.checkCancellation()

context.sfuEventObserver = nil

await coordinator.stateAdapter.cleanUp()
context = .init(coordinator: context.coordinator)

Expand All @@ -79,6 +82,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
self?.transitionErrorOrLog(error)
}
}
.store(in: disposableBag)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
let ring: Bool
let notify: Bool

private let disposableBag = DisposableBag()

/// Initializes a new instance of `ConnectingStage`.
/// - Parameters:
/// - context: The context for the connecting stage.
Expand Down Expand Up @@ -129,12 +131,16 @@ extension WebRTCCoordinator.StateMachine.Stage {
throw ClientError("WebRTCCoordinator instance not available in stage id:\(id).")
}

try Task.checkCancellation()

if updateSession {
/// By refreshing the session, we are asking the stateAdapter to update
/// the sessionId to a new one.
await coordinator.stateAdapter.refreshSession()
}

try Task.checkCancellation()

/// The authenticator will fetch a ``JoinCallResponse`` and will use it to
/// create an ``SFUAdapter`` instance that we can later use in our flow.
let (sfuAdapter, response) = try await context
Expand All @@ -148,11 +154,15 @@ extension WebRTCCoordinator.StateMachine.Stage {
options: options
)

try Task.checkCancellation()

/// We provide the ``SFUAdapter`` to the authenticator which will ensure
/// that we will continue only when the WS `connectionState` on the
/// ``SFUAdapter`` has changed to `.authenticating`.
try await context.authenticator.waitForAuthentication(on: sfuAdapter)

try Task.checkCancellation()

/// With the ``SFUAdapter`` having a `connectionState` to
/// `.authenticating`, we store the instance on the ``WebRTCStateAdapter``.
await coordinator.stateAdapter.set(sfuAdapter: sfuAdapter)
Expand All @@ -161,6 +171,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
/// during `migration`.
context.currentSFU = response.credentials.server.edgeName

try Task.checkCancellation()

/// We are going to transition to the next stage ``.connected``. If that transition
/// fail for any reason, we will transition to ``.disconnected`` to allow for
/// reconnection.
Expand All @@ -171,6 +183,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
transitionDisconnectOrError(error)
}
}
.store(in: disposableBag)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
WebRTCCoordinator.StateMachine.Stage,
@unchecked Sendable
{
private let disposableBag = DisposableBag()

/// Initializes a new instance of `FastReconnectingStage`.
/// - Parameter context: The context for the fast reconnecting stage.
init(
Expand Down Expand Up @@ -68,6 +70,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
throw ClientError("WebRCTAdapter instance not available.")
}

try Task.checkCancellation()

log.debug("Refreshing webSocket", subsystems: .webRTC)
sfuAdapter.refresh(
webSocketConfiguration: .init(
Expand All @@ -76,23 +80,22 @@ extension WebRTCCoordinator.StateMachine.Stage {
)
)

try Task.checkCancellation()

log.debug(
"Waiting for webSocket state to change to authenticating",
subsystems: .webRTC
)

try await context.authenticator.waitForAuthentication(on: sfuAdapter)

try transition?(
.fastReconnected(
context
)
)
transitionOrDisconnect(.fastReconnected(context))
} catch {
context.reconnectionStrategy = context.nextReconnectionStrategy()
transitionDisconnectOrError(error)
}
}
.store(in: disposableBag)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,66 @@ extension WebRTCCoordinator.StateMachine.Stage {
)
}

try Task.checkCancellation()

// We set the reconnectionStrategy to rejoin as default.
context.reconnectionStrategy = .rejoin

try Task.checkCancellation()

let migrationStatusObserver = context.migrationStatusObserver
let previousSFUAdapter = context.previousSFUAdapter
await cleanUpPreviousSessionIfRequired()

try Task.checkCancellation()

try await observeMigrationStatusIfRequired(
migrationStatusObserver,
previousSFUAdapter: previousSFUAdapter
)

observeInternetConnection()

try Task.checkCancellation()

await observeForSubscriptionUpdates()

try Task.checkCancellation()

await observeConnection()

try Task.checkCancellation()

await observeCallEndedEvent()

try Task.checkCancellation()

await observeMigrationEvent()

try Task.checkCancellation()

await observeDisconnectEvent()

try Task.checkCancellation()

await observePreferredReconnectionStrategy()

try Task.checkCancellation()

await observeCallSettingsUpdates()

try Task.checkCancellation()

await observePeerConnectionState()

try Task.checkCancellation()

await configureStatsCollectionAndDelivery()
} catch {
transitionDisconnectOrError(error)
}
}
.store(in: disposableBag)
}

/// Cleans up the previous session if required.
Expand Down Expand Up @@ -391,14 +424,18 @@ extension WebRTCCoordinator.StateMachine.Stage {
let statsReporter = WebRTCStatsReporter(
sessionID: await stateAdapter.sessionID
)
statsReporter.interval = await stateAdapter.statsReporter?.interval ?? 0
statsReporter.publisher = await stateAdapter.publisher
statsReporter.subscriber = await stateAdapter.subscriber
statsReporter.sfuAdapter = await stateAdapter.sfuAdapter
await stateAdapter.set(statsReporter: statsReporter)
} else {
let statsReporter = await stateAdapter.statsReporter
statsReporter?.interval = await stateAdapter.statsReporter?.interval ?? 0
statsReporter?.publisher = await stateAdapter.publisher
statsReporter?.subscriber = await stateAdapter.subscriber
statsReporter?.sfuAdapter = await stateAdapter.sfuAdapter
}

let statsReporter = await stateAdapter.statsReporter
statsReporter?.interval = await stateAdapter.statsReporter?.interval ?? 0
statsReporter?.publisher = await stateAdapter.publisher
statsReporter?.subscriber = await stateAdapter.subscriber
statsReporter?.sfuAdapter = await stateAdapter.sfuAdapter
}

/// Observes internet connection status.
Expand Down
Loading

0 comments on commit 845cde3

Please sign in to comment.