Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix]Cancel running tasks on stages when transitioning away #536

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
WebRTCCoordinator.StateMachine.Stage,
@unchecked Sendable
{
private let disposableBag = DisposableBag()

/// Initializes a new instance of `CleanUpStage`.
/// - Parameter context: The context for the clean-up stage.
init(
Expand Down Expand Up @@ -59,7 +61,6 @@ extension WebRTCCoordinator.StateMachine.Stage {

/// Executes the clean-up process.
private func execute() {
context.sfuEventObserver = nil
Task { [weak self] in
do {
guard
Expand All @@ -71,6 +72,8 @@ extension WebRTCCoordinator.StateMachine.Stage {

try Task.checkCancellation()

context.sfuEventObserver = nil

await coordinator.stateAdapter.cleanUp()
context = .init(coordinator: context.coordinator)

Expand All @@ -79,6 +82,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
self?.transitionErrorOrLog(error)
}
}
.store(in: disposableBag)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
let ring: Bool
let notify: Bool

private let disposableBag = DisposableBag()

/// Initializes a new instance of `ConnectingStage`.
/// - Parameters:
/// - context: The context for the connecting stage.
Expand Down Expand Up @@ -129,12 +131,16 @@ extension WebRTCCoordinator.StateMachine.Stage {
throw ClientError("WebRTCCoordinator instance not available in stage id:\(id).")
}

try Task.checkCancellation()

if updateSession {
/// By refreshing the session, we are asking the stateAdapter to update
/// the sessionId to a new one.
await coordinator.stateAdapter.refreshSession()
}

try Task.checkCancellation()

/// The authenticator will fetch a ``JoinCallResponse`` and will use it to
/// create an ``SFUAdapter`` instance that we can later use in our flow.
let (sfuAdapter, response) = try await context
Expand All @@ -148,11 +154,15 @@ extension WebRTCCoordinator.StateMachine.Stage {
options: options
)

try Task.checkCancellation()

/// We provide the ``SFUAdapter`` to the authenticator which will ensure
/// that we will continue only when the WS `connectionState` on the
/// ``SFUAdapter`` has changed to `.authenticating`.
try await context.authenticator.waitForAuthentication(on: sfuAdapter)

try Task.checkCancellation()

/// With the ``SFUAdapter`` having a `connectionState` to
/// `.authenticating`, we store the instance on the ``WebRTCStateAdapter``.
await coordinator.stateAdapter.set(sfuAdapter: sfuAdapter)
Expand All @@ -161,6 +171,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
/// during `migration`.
context.currentSFU = response.credentials.server.edgeName

try Task.checkCancellation()

/// We are going to transition to the next stage ``.connected``. If that transition
/// fail for any reason, we will transition to ``.disconnected`` to allow for
/// reconnection.
Expand All @@ -171,6 +183,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
transitionDisconnectOrError(error)
}
}
.store(in: disposableBag)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
WebRTCCoordinator.StateMachine.Stage,
@unchecked Sendable
{
private let disposableBag = DisposableBag()

/// Initializes a new instance of `FastReconnectingStage`.
/// - Parameter context: The context for the fast reconnecting stage.
init(
Expand Down Expand Up @@ -68,6 +70,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
throw ClientError("WebRCTAdapter instance not available.")
}

try Task.checkCancellation()

log.debug("Refreshing webSocket", subsystems: .webRTC)
sfuAdapter.refresh(
webSocketConfiguration: .init(
Expand All @@ -76,23 +80,22 @@ extension WebRTCCoordinator.StateMachine.Stage {
)
)

try Task.checkCancellation()

log.debug(
"Waiting for webSocket state to change to authenticating",
subsystems: .webRTC
)

try await context.authenticator.waitForAuthentication(on: sfuAdapter)

try transition?(
.fastReconnected(
context
)
)
transitionOrDisconnect(.fastReconnected(context))
} catch {
context.reconnectionStrategy = context.nextReconnectionStrategy()
transitionDisconnectOrError(error)
}
}
.store(in: disposableBag)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,66 @@ extension WebRTCCoordinator.StateMachine.Stage {
)
}

try Task.checkCancellation()

// We set the reconnectionStrategy to rejoin as default.
context.reconnectionStrategy = .rejoin

try Task.checkCancellation()

let migrationStatusObserver = context.migrationStatusObserver
let previousSFUAdapter = context.previousSFUAdapter
await cleanUpPreviousSessionIfRequired()

try Task.checkCancellation()

try await observeMigrationStatusIfRequired(
migrationStatusObserver,
previousSFUAdapter: previousSFUAdapter
)

observeInternetConnection()

try Task.checkCancellation()

await observeForSubscriptionUpdates()

try Task.checkCancellation()

await observeConnection()

try Task.checkCancellation()

await observeCallEndedEvent()

try Task.checkCancellation()

await observeMigrationEvent()

try Task.checkCancellation()

await observeDisconnectEvent()

try Task.checkCancellation()

await observePreferredReconnectionStrategy()

try Task.checkCancellation()

await observeCallSettingsUpdates()

try Task.checkCancellation()

await observePeerConnectionState()

try Task.checkCancellation()

await configureStatsCollectionAndDelivery()
} catch {
transitionDisconnectOrError(error)
}
}
.store(in: disposableBag)
}

/// Cleans up the previous session if required.
Expand Down Expand Up @@ -391,14 +424,18 @@ extension WebRTCCoordinator.StateMachine.Stage {
let statsReporter = WebRTCStatsReporter(
sessionID: await stateAdapter.sessionID
)
statsReporter.interval = await stateAdapter.statsReporter?.interval ?? 0
statsReporter.publisher = await stateAdapter.publisher
statsReporter.subscriber = await stateAdapter.subscriber
statsReporter.sfuAdapter = await stateAdapter.sfuAdapter
await stateAdapter.set(statsReporter: statsReporter)
} else {
let statsReporter = await stateAdapter.statsReporter
statsReporter?.interval = await stateAdapter.statsReporter?.interval ?? 0
statsReporter?.publisher = await stateAdapter.publisher
statsReporter?.subscriber = await stateAdapter.subscriber
statsReporter?.sfuAdapter = await stateAdapter.sfuAdapter
}

let statsReporter = await stateAdapter.statsReporter
statsReporter?.interval = await stateAdapter.statsReporter?.interval ?? 0
statsReporter?.publisher = await stateAdapter.publisher
statsReporter?.subscriber = await stateAdapter.subscriber
statsReporter?.sfuAdapter = await stateAdapter.sfuAdapter
}

/// Observes internet connection status.
Expand Down
Loading
Loading