From c58345b593407de11826ba1617485cbaa810a483 Mon Sep 17 00:00:00 2001 From: Gulzar Date: Wed, 31 Jul 2024 16:59:20 +0400 Subject: [PATCH] feat: add support for sfu migration --- .../src/connection/HMSConnection.ts | 2 +- .../connection/publish/publishConnection.ts | 6 +- .../subscribe/subscribeConnection.ts | 4 +- .../src/interfaces/update-listener.ts | 1 + .../src/media/streams/HMSLocalStream.ts | 4 + .../src/media/tracks/HMSLocalAudioTrack.ts | 13 +- .../src/media/tracks/HMSLocalVideoTrack.ts | 13 +- .../HMSNotificationMethod.ts | 1 + .../notification-manager/HMSNotifications.ts | 4 + .../NotificationManager.ts | 5 + .../managers/TrackManager.ts | 1 + .../notification-manager.test.ts | 29 +- .../src/reactive-store/HMSSDKActions.ts | 4 + .../src/sdk/LocalTrackManager.test.ts | 12 +- packages/hms-video-store/src/sdk/index.ts | 4 + .../hms-video-store/src/sdk/store/Store.ts | 4 + .../src/signal/jsonrpc/index.ts | 14 +- .../hms-video-store/src/transport/index.ts | 518 +++++++++++------- .../hms-video-store/src/utils/constants.ts | 2 +- .../Prebuilt/components/AppData/AppData.tsx | 2 +- 20 files changed, 404 insertions(+), 239 deletions(-) diff --git a/packages/hms-video-store/src/connection/HMSConnection.ts b/packages/hms-video-store/src/connection/HMSConnection.ts index aa8619b67d..265dc4d1bb 100644 --- a/packages/hms-video-store/src/connection/HMSConnection.ts +++ b/packages/hms-video-store/src/connection/HMSConnection.ts @@ -198,7 +198,7 @@ export default abstract class HMSConnection { return await this.nativeConnection.getStats(); } - async close() { + close() { this.nativeConnection.close(); } diff --git a/packages/hms-video-store/src/connection/publish/publishConnection.ts b/packages/hms-video-store/src/connection/publish/publishConnection.ts index 042410914c..422c7eb8c8 100644 --- a/packages/hms-video-store/src/connection/publish/publishConnection.ts +++ b/packages/hms-video-store/src/connection/publish/publishConnection.ts @@ -32,7 +32,6 @@ export default class HMSPublishConnection extends HMSConnection { this.observer.onIceConnectionChange(this.nativeConnection.iceConnectionState); }; - // @TODO(eswar): Remove this. Use iceconnectionstate change with interval and threshold. this.nativeConnection.onconnectionstatechange = () => { this.observer.onConnectionStateChange(this.nativeConnection.connectionState); @@ -51,6 +50,11 @@ export default class HMSPublishConnection extends HMSConnection { }; } + close() { + super.close(); + this.channel.close(); + } + initAfterJoin() { this.nativeConnection.onnegotiationneeded = async () => { HMSLogger.d(this.TAG, `onnegotiationneeded`); diff --git a/packages/hms-video-store/src/connection/subscribe/subscribeConnection.ts b/packages/hms-video-store/src/connection/subscribe/subscribeConnection.ts index 2758b7f290..3f95f886b2 100644 --- a/packages/hms-video-store/src/connection/subscribe/subscribeConnection.ts +++ b/packages/hms-video-store/src/connection/subscribe/subscribeConnection.ts @@ -154,8 +154,8 @@ export default class HMSSubscribeConnection extends HMSConnection { return this.sendMessage(request, id); } - async close() { - await super.close(); + close() { + super.close(); this.apiChannel?.close(); } diff --git a/packages/hms-video-store/src/interfaces/update-listener.ts b/packages/hms-video-store/src/interfaces/update-listener.ts index 315713cb34..adbe9973ad 100644 --- a/packages/hms-video-store/src/interfaces/update-listener.ts +++ b/packages/hms-video-store/src/interfaces/update-listener.ts @@ -83,6 +83,7 @@ export interface HMSUpdateListener extends DeviceChangeListener, SessionStoreLis onError(error: HMSException): void; onReconnecting(error: HMSException): void; onReconnected(): void; + onSFUMigration?: () => void; onRoleChangeRequest(request: HMSRoleChangeRequest): void; onRoleUpdate(newRole: string): void; onChangeTrackStateRequest(request: HMSChangeTrackStateRequest): void; diff --git a/packages/hms-video-store/src/media/streams/HMSLocalStream.ts b/packages/hms-video-store/src/media/streams/HMSLocalStream.ts index 6b0b551e87..45a4e19e55 100644 --- a/packages/hms-video-store/src/media/streams/HMSLocalStream.ts +++ b/packages/hms-video-store/src/media/streams/HMSLocalStream.ts @@ -15,6 +15,10 @@ export class HMSLocalStream extends HMSMediaStream { this.connection = connection; } + clone() { + return new HMSLocalStream(this.nativeStream.clone()); + } + addTransceiver(track: HMSLocalTrack, simulcastLayers: SimulcastLayer[]) { const transceiver = this.connection!.addTransceiver(track.getTrackBeingSent(), { streams: [this.nativeStream], diff --git a/packages/hms-video-store/src/media/tracks/HMSLocalAudioTrack.ts b/packages/hms-video-store/src/media/tracks/HMSLocalAudioTrack.ts index c0e6a99fc6..713bbc5496 100644 --- a/packages/hms-video-store/src/media/tracks/HMSLocalAudioTrack.ts +++ b/packages/hms-video-store/src/media/tracks/HMSLocalAudioTrack.ts @@ -46,7 +46,7 @@ export class HMSLocalAudioTrack extends HMSAudioTrack { source: string, private eventBus: EventBus, settings: HMSAudioTrackSettings = new HMSAudioTrackSettingsBuilder().build(), - room?: Room, + private room?: Room, ) { super(stream, track, source); stream.tracks.push(this); @@ -64,6 +64,17 @@ export class HMSLocalAudioTrack extends HMSAudioTrack { } } + clone(stream?: HMSLocalStream) { + return new HMSLocalAudioTrack( + stream || (this.stream as HMSLocalStream).clone(), + this.nativeTrack.clone(), + this.source!, + this.eventBus, + this.settings, + this.room, + ); + } + getManuallySelectedDeviceId() { return this.manuallySelectedDeviceId; } diff --git a/packages/hms-video-store/src/media/tracks/HMSLocalVideoTrack.ts b/packages/hms-video-store/src/media/tracks/HMSLocalVideoTrack.ts index 758881a3d0..faa7537dd8 100644 --- a/packages/hms-video-store/src/media/tracks/HMSLocalVideoTrack.ts +++ b/packages/hms-video-store/src/media/tracks/HMSLocalVideoTrack.ts @@ -70,7 +70,7 @@ export class HMSLocalVideoTrack extends HMSVideoTrack { source: string, private eventBus: EventBus, settings: HMSVideoTrackSettings = new HMSVideoTrackSettingsBuilder().build(), - room?: Room, + private room?: Room, ) { super(stream, track, source); stream.tracks.push(this); @@ -89,6 +89,17 @@ export class HMSLocalVideoTrack extends HMSVideoTrack { } } + clone(stream?: HMSLocalStream) { + return new HMSLocalVideoTrack( + stream || (this.stream as HMSLocalStream).clone(), + this.nativeTrack.clone(), + this.source!, + this.eventBus, + this.settings, + this.room, + ); + } + /** @internal */ setSimulcastDefinitons(definitions: HMSSimulcastLayerDefinition[]) { this._layerDefinitions = definitions; diff --git a/packages/hms-video-store/src/notification-manager/HMSNotificationMethod.ts b/packages/hms-video-store/src/notification-manager/HMSNotificationMethod.ts index 28616e29da..c5a287dde2 100644 --- a/packages/hms-video-store/src/notification-manager/HMSNotificationMethod.ts +++ b/packages/hms-video-store/src/notification-manager/HMSNotificationMethod.ts @@ -35,5 +35,6 @@ export enum HMSNotificationMethod { POLL_STATS = 'on-poll-stats', ROOM_INFO = 'room-info', SESSION_INFO = 'session-info', + NODE_INFO = 'node-info', WHITEBOARD_UPDATE = 'on-whiteboard-update', } diff --git a/packages/hms-video-store/src/notification-manager/HMSNotifications.ts b/packages/hms-video-store/src/notification-manager/HMSNotifications.ts index 9350179bc2..11fffbb254 100644 --- a/packages/hms-video-store/src/notification-manager/HMSNotifications.ts +++ b/packages/hms-video-store/src/notification-manager/HMSNotifications.ts @@ -393,3 +393,7 @@ export interface WhiteboardInfo { state?: string; attributes?: Array<{ name: string; value: unknown }>; } + +export interface NodeInfo { + sfu_node_id: string; +} diff --git a/packages/hms-video-store/src/notification-manager/NotificationManager.ts b/packages/hms-video-store/src/notification-manager/NotificationManager.ts index cb6922b20a..64fcf9c314 100644 --- a/packages/hms-video-store/src/notification-manager/NotificationManager.ts +++ b/packages/hms-video-store/src/notification-manager/NotificationManager.ts @@ -14,6 +14,7 @@ import { WhiteboardManager } from './managers/WhiteboardManager'; import { HMSNotificationMethod } from './HMSNotificationMethod'; import { ConnectionQualityList, + NodeInfo, OnTrackLayerUpdateNotification, PolicyParams, SpeakerList, @@ -168,6 +169,10 @@ export class NotificationManager { this.policyChangeManager.handlePolicyChange(notification as PolicyParams); break; + case HMSNotificationMethod.NODE_INFO: + this.transport.setSFUNodeId((notification as NodeInfo).sfu_node_id); + break; + default: break; } diff --git a/packages/hms-video-store/src/notification-manager/managers/TrackManager.ts b/packages/hms-video-store/src/notification-manager/managers/TrackManager.ts index f84d7a2b5a..bd8c9077ab 100644 --- a/packages/hms-video-store/src/notification-manager/managers/TrackManager.ts +++ b/packages/hms-video-store/src/notification-manager/managers/TrackManager.ts @@ -205,6 +205,7 @@ export class TrackManager { track.type === HMSTrackType.AUDIO ? this.eventBus.audioTrackAdded.publish({ track: track as HMSRemoteAudioTrack, peer: hmsPeer as HMSRemotePeer }) : this.listener?.onTrackUpdate(HMSTrackUpdate.TRACK_ADDED, track, hmsPeer); + this.store.removeTrackState(track.trackId); this.tracksToProcess.delete(track.trackId); }); }; diff --git a/packages/hms-video-store/src/notification-manager/notification-manager.test.ts b/packages/hms-video-store/src/notification-manager/notification-manager.test.ts index b1344e1f8b..8b33228517 100644 --- a/packages/hms-video-store/src/notification-manager/notification-manager.test.ts +++ b/packages/hms-video-store/src/notification-manager/notification-manager.test.ts @@ -11,6 +11,7 @@ import HMSRoom from '../sdk/models/HMSRoom'; import { HMSRemotePeer } from '../sdk/models/peer'; import { Store } from '../sdk/store'; import HMSTransport from '../transport'; +import ITransportObserver from '../transport/ITransportObserver'; let joinHandler: jest.Mock; let previewHandler: jest.Mock; @@ -37,6 +38,9 @@ const store: Store = new Store(); let notificationManager: NotificationManager; let eventBus: EventBus; let transport: HMSTransport; +let deviceManager: DeviceManager; +let analyticsTimer: AnalyticsTimer; +let observer: ITransportObserver; beforeEach(() => { joinHandler = jest.fn(); @@ -58,6 +62,16 @@ beforeEach(() => { pollsUpdateHandler = jest.fn(); whiteboardUpdateHandler = jest.fn(); eventBus = new EventBus(); + deviceManager = new DeviceManager(store, eventBus); + analyticsTimer = new AnalyticsTimer(); + observer = { + onNotification: jest.fn(), + onTrackAdd: jest.fn(), + onTrackRemove: jest.fn(), + onFailure: jest.fn(), + onStateChange: jest.fn(), + onConnected: jest.fn(), + }; const mockMediaStream = { id: 'native-stream-id', getVideoTracks: jest.fn(() => [ @@ -83,19 +97,12 @@ beforeEach(() => { global.HTMLCanvasElement.prototype.captureStream = jest.fn().mockImplementation(() => mockMediaStream); transport = new HMSTransport( - { - onNotification: jest.fn(), - onTrackAdd: jest.fn(), - onTrackRemove: jest.fn(), - onFailure: jest.fn(), - onStateChange: jest.fn(), - onConnected: jest.fn(), - }, - new DeviceManager(store, eventBus), + observer, + deviceManager, store, eventBus, new AnalyticsEventsService(store), - new AnalyticsTimer(), + analyticsTimer, new PluginUsageTracker(eventBus), ); store.setRoom(new HMSRoom('1234')); @@ -120,6 +127,8 @@ beforeEach(() => { onWhiteboardUpdate: whiteboardUpdateHandler, }; + transport.setListener(listener); + audioListener = { onAudioLevelUpdate: audioUpdateHandler }; notificationManager = new NotificationManager(store, eventBus, transport, listener, audioListener); diff --git a/packages/hms-video-store/src/reactive-store/HMSSDKActions.ts b/packages/hms-video-store/src/reactive-store/HMSSDKActions.ts index 6f1d22e6f6..094ff6aac5 100644 --- a/packages/hms-video-store/src/reactive-store/HMSSDKActions.ts +++ b/packages/hms-video-store/src/reactive-store/HMSSDKActions.ts @@ -862,6 +862,7 @@ export class HMSSDKActions { type: HMSPeerType.REGULAR, }); testStore.addPeer(localPeer); + analyticsTimer = new AnalyticsTimer(); }); it('instantiates without any issues', () => { @@ -243,7 +245,7 @@ describe('LocalTrackManager', () => { testObserver, new DeviceManager(testStore, testEventBus), testEventBus, - new AnalyticsTimer(), + analyticsTimer, ); expect(manager).toBeDefined(); }); @@ -254,7 +256,7 @@ describe('LocalTrackManager', () => { testObserver, new DeviceManager(testStore, testEventBus), testEventBus, - new AnalyticsTimer(), + analyticsTimer, ); testStore.setKnownRoles(policyParams); await manager.getTracksToPublish({}); @@ -276,7 +278,7 @@ describe('LocalTrackManager', () => { testObserver, new DeviceManager(testStore, testEventBus), testEventBus, - new AnalyticsTimer(), + analyticsTimer, ); global.navigator.mediaDevices.getUserMedia = mockDenyGetUserMedia as any; testStore.setKnownRoles(policyParams); @@ -436,7 +438,7 @@ describe('LocalTrackManager', () => { testObserver, new DeviceManager(testStore, testEventBus), testEventBus, - new AnalyticsTimer(), + analyticsTimer, ); testStore.setKnownRoles(policyParams); const tracksToPublish = await manager.getTracksToPublish({}); @@ -465,7 +467,7 @@ describe('LocalTrackManager', () => { testObserver, new DeviceManager(testStore, testEventBus), testEventBus, - new AnalyticsTimer(), + analyticsTimer, ); testStore.setKnownRoles(policyParams); const tracksToPublish = await manager.getTracksToPublish({}); diff --git a/packages/hms-video-store/src/sdk/index.ts b/packages/hms-video-store/src/sdk/index.ts index c99eb87ef6..cb45ca5d57 100644 --- a/packages/hms-video-store/src/sdk/index.ts +++ b/packages/hms-video-store/src/sdk/index.ts @@ -169,6 +169,7 @@ export class HMSSdk implements HMSInterface { this.notificationManager?.setListener(this.listener); this.audioSinkManager.setListener(this.listener); this.interactivityCenter.setListener(this.listener); + this.transport.setListener(this.listener); return; } @@ -838,6 +839,9 @@ export class HMSSdk implements HMSInterface { }); return; } + this.transport.setOnScreenshareStop(() => { + this.stopEndedScreenshare(onStop); + }); await this.transport.publish(tracks); tracks.forEach(track => { track.peerId = this.localPeer?.peerId; diff --git a/packages/hms-video-store/src/sdk/store/Store.ts b/packages/hms-video-store/src/sdk/store/Store.ts index 320310813c..24cdc4f106 100644 --- a/packages/hms-video-store/src/sdk/store/Store.ts +++ b/packages/hms-video-store/src/sdk/store/Store.ts @@ -280,6 +280,10 @@ class Store { this.peerTrackStates[trackStateEntry.trackInfo.track_id] = trackStateEntry; } + removeTrackState(trackId: string) { + delete this.peerTrackStates[trackId]; + } + removePeer(peerId: string) { if (this.localPeerId === peerId) { this.localPeerId = undefined; diff --git a/packages/hms-video-store/src/signal/jsonrpc/index.ts b/packages/hms-video-store/src/signal/jsonrpc/index.ts index e00fea019d..ab4cf7c5cd 100644 --- a/packages/hms-video-store/src/signal/jsonrpc/index.ts +++ b/packages/hms-video-store/src/signal/jsonrpc/index.ts @@ -244,7 +244,7 @@ export default class JsonRpcSignal { simulcast: boolean, onDemandTracks: boolean, offer?: RTCSessionDescriptionInit, - ): Promise { + ): Promise { if (!this.isConnected) { throw ErrorFactory.WebSocketConnectionErrors.WebSocketConnectionLost( HMSAction.JOIN, @@ -260,7 +260,10 @@ export default class JsonRpcSignal { simulcast, onDemandTracks, }; - const response: RTCSessionDescriptionInit = await this.internalCall(HMSSignalMethod.JOIN, params); + const response: RTCSessionDescriptionInit & { sfu_node_id: string | undefined } = await this.internalCall( + HMSSignalMethod.JOIN, + params, + ); this.isJoinCompleted = true; this.pendingTrickle.forEach(({ target, candidate }) => this.trickle(target, candidate)); @@ -278,10 +281,15 @@ export default class JsonRpcSignal { } } - async offer(desc: RTCSessionDescriptionInit, tracks: Map): Promise { + async offer( + desc: RTCSessionDescriptionInit, + tracks: Map, + sfuNodeId?: string, + ): Promise { const response = await this.call(HMSSignalMethod.OFFER, { desc, tracks: Object.fromEntries(tracks), + sfu_node_id: sfuNodeId, }); return response as RTCSessionDescriptionInit; } diff --git a/packages/hms-video-store/src/transport/index.ts b/packages/hms-video-store/src/transport/index.ts index 54cc1848bc..67fdbe83c1 100644 --- a/packages/hms-video-store/src/transport/index.ts +++ b/packages/hms-video-store/src/transport/index.ts @@ -24,7 +24,7 @@ import { ErrorFactory } from '../error/ErrorFactory'; import { HMSAction } from '../error/HMSAction'; import { HMSException } from '../error/HMSException'; import { EventBus } from '../events/EventBus'; -import { HMSICEServer, HMSRole } from '../interfaces'; +import { HMSICEServer, HMSRole, HMSUpdateListener } from '../interfaces'; import { HMSLocalStream } from '../media/streams/HMSLocalStream'; import { HMSLocalTrack, HMSLocalVideoTrack, HMSTrack } from '../media/tracks'; import { TrackState } from '../notification-manager'; @@ -80,7 +80,11 @@ export default class HMSTransport { private subscribeStatsAnalytics?: SubscribeStatsAnalytics; private maxSubscribeBitrate = 0; private connectivityListener?: HMSDiagnosticsConnectivityListener; + private sfuNodeId?: string; joinRetryCount = 0; + private publishDisconnectTimer = 0; + private listener?: HMSUpdateListener; + private onScreenshareStop = () => {}; constructor( private observer: ITransportObserver, @@ -122,6 +126,14 @@ export default class HMSTransport { */ private readonly callbacks = new Map(); + setListener = (listener: HMSUpdateListener) => { + this.listener = listener; + }; + + setOnScreenshareStop = (onStop: () => void) => { + this.onScreenshareStop = onStop; + }; + private signalObserver: ISignalEventsObserver = { onOffer: async (jsep: RTCSessionDescriptionInit) => { try { @@ -149,7 +161,7 @@ export default class HMSTransport { if (err instanceof HMSException) { ex = err; } else { - ex = ErrorFactory.GenericErrors.Unknown(HMSAction.PUBLISH, (err as Error).message); + ex = ErrorFactory.GenericErrors.Unknown(HMSAction.SUBSCRIBE, (err as Error).message); } this.observer.onFailure(ex); this.eventBus.analytics.publish(AnalyticsEventFactory.subscribeFail(ex)); @@ -218,189 +230,6 @@ export default class HMSTransport { private publishDtlsStateTimer = 0; private lastPublishDtlsState: RTCDtlsTransportState = 'new'; - private publishConnectionObserver: IPublishConnectionObserver = { - onRenegotiationNeeded: async () => { - await this.performPublishRenegotiation(); - }, - - // eslint-disable-next-line complexity - onDTLSTransportStateChange: (state?: RTCDtlsTransportState) => { - const log = state === 'failed' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); - log(TAG, `Publisher on dtls transport state change: ${state}`); - - if (!state || this.lastPublishDtlsState === state) { - return; - } - - this.lastPublishDtlsState = state; - if (this.publishDtlsStateTimer !== 0) { - clearTimeout(this.publishDtlsStateTimer); - this.publishDtlsStateTimer = 0; - } - - if (state !== 'connecting' && state !== 'failed') { - return; - } - - const timeout = this.initConfig?.config?.dtlsStateTimeouts?.[state]; - if (!timeout || timeout <= 0) { - return; - } - - // if we're in connecting check again after timeout - // hotfix: mitigate https://100ms.atlassian.net/browse/LIVE-1924 - this.publishDtlsStateTimer = window.setTimeout(() => { - const newState = this.publishConnection?.nativeConnection.connectionState; - if (newState && state && newState === state) { - // stuck in either `connecting` or `failed` state for long time - const err = ErrorFactory.WebrtcErrors.ICEFailure( - HMSAction.PUBLISH, - `DTLS transport state ${state} timeout:${timeout}ms`, - true, - ); - this.eventBus.analytics.publish(AnalyticsEventFactory.disconnect(err)); - this.observer.onFailure(err); - } - }, timeout); - }, - - onDTLSTransportError: (error: Error) => { - HMSLogger.e(TAG, `onDTLSTransportError ${error.name} ${error.message}`, error); - this.eventBus.analytics.publish(AnalyticsEventFactory.disconnect(error)); - }, - - onIceConnectionChange: async (newState: RTCIceConnectionState) => { - const log = newState === 'disconnected' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); - log(TAG, `Publish ice connection state change: ${newState}`); - - // @TODO: Uncomment this and remove connectionstatechange - if (newState === 'failed') { - // await this.handleIceConnectionFailure(HMSConnectionRole.Publish); - } - }, - - // @TODO(eswar): Remove this. Use iceconnectionstate change with interval and threshold. - onConnectionStateChange: async (newState: RTCPeerConnectionState) => { - const log = newState === 'disconnected' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); - log(TAG, `Publish connection state change: ${newState}`); - - if (newState === 'connected') { - this.connectivityListener?.onICESuccess(true); - this.publishConnection?.handleSelectedIceCandidatePairs(); - } - - if (newState === 'disconnected') { - // if state stays disconnected for 5 seconds, retry - setTimeout(() => { - if (this.publishConnection?.connectionState === 'disconnected') { - this.handleIceConnectionFailure( - HMSConnectionRole.Publish, - ErrorFactory.WebrtcErrors.ICEDisconnected( - HMSAction.PUBLISH, - `local candidate - ${this.publishConnection?.selectedCandidatePair?.local?.candidate}; remote candidate - ${this.publishConnection?.selectedCandidatePair?.remote?.candidate}`, - ), - ); - } - }, ICE_DISCONNECTION_TIMEOUT); - } - - if (newState === 'failed') { - await this.handleIceConnectionFailure( - HMSConnectionRole.Publish, - ErrorFactory.WebrtcErrors.ICEFailure( - HMSAction.PUBLISH, - `local candidate - ${this.publishConnection?.selectedCandidatePair?.local?.candidate}; remote candidate - ${this.publishConnection?.selectedCandidatePair?.remote?.candidate}`, - ), - ); - } - }, - - onIceCandidate: candidate => { - this.connectivityListener?.onICECandidate(candidate, true); - }, - - onSelectedCandidatePairChange: candidatePair => { - this.connectivityListener?.onSelectedICECandidatePairChange(candidatePair, true); - }, - }; - - private subscribeConnectionObserver: ISubscribeConnectionObserver = { - onApiChannelMessage: (message: string) => { - this.observer.onNotification(JSON.parse(message)); - }, - - onTrackAdd: (track: HMSTrack) => { - HMSLogger.d(TAG, '[Subscribe] onTrackAdd', `${track}`); - this.observer.onTrackAdd(track); - }, - - onTrackRemove: (track: HMSTrack) => { - HMSLogger.d(TAG, '[Subscribe] onTrackRemove', `${track}`); - this.observer.onTrackRemove(track); - }, - - onIceConnectionChange: async (newState: RTCIceConnectionState) => { - const log = newState === 'disconnected' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); - log(TAG, `Subscribe ice connection state change: ${newState}`); - - if (newState === 'failed') { - // await this.handleIceConnectionFailure(HMSConnectionRole.Subscribe); - } - - if (newState === 'connected') { - const callback = this.callbacks.get(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID); - this.callbacks.delete(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID); - - this.connectivityListener?.onICESuccess(false); - if (callback) { - callback.promise.resolve(true); - } - } - }, - - // @TODO(eswar): Remove this. Use iceconnectionstate change with interval and threshold. - onConnectionStateChange: async (newState: RTCPeerConnectionState) => { - const log = newState === 'disconnected' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); - log(TAG, `Subscribe connection state change: ${newState}`); - - if (newState === 'failed') { - await this.handleIceConnectionFailure( - HMSConnectionRole.Subscribe, - ErrorFactory.WebrtcErrors.ICEFailure( - HMSAction.SUBSCRIBE, - `local candidate - ${this.subscribeConnection?.selectedCandidatePair?.local?.candidate}; remote candidate - ${this.subscribeConnection?.selectedCandidatePair?.remote?.candidate}`, - ), - ); - } - - if (newState === 'disconnected') { - setTimeout(() => { - if (this.subscribeConnection?.connectionState === 'disconnected') { - this.handleIceConnectionFailure( - HMSConnectionRole.Subscribe, - ErrorFactory.WebrtcErrors.ICEDisconnected( - HMSAction.SUBSCRIBE, - `local candidate - ${this.subscribeConnection?.selectedCandidatePair?.local?.candidate}; remote candidate - ${this.subscribeConnection?.selectedCandidatePair?.remote?.candidate}`, - ), - ); - } - }, ICE_DISCONNECTION_TIMEOUT); - } - - if (newState === 'connected') { - this.handleSubscribeConnectionConnected(); - } - }, - - onIceCandidate: candidate => { - this.connectivityListener?.onICECandidate(candidate, false); - }, - - onSelectedCandidatePairChange: candidatePair => { - this.connectivityListener?.onSelectedICECandidatePairChange(candidatePair, false); - }, - }; - getWebrtcInternals() { return this.webrtcInternals; } @@ -533,8 +362,7 @@ export default class HMSTransport { this.publishStatsAnalytics?.stop(); this.subscribeStatsAnalytics?.stop(); this.webrtcInternals?.cleanup(); - await this.publishConnection?.close(); - await this.subscribeConnection?.close(); + this.clearPeerConnections(); if (notifyServer) { try { this.signal.leave(); @@ -591,6 +419,98 @@ export default class HMSTransport { } } + setSFUNodeId(id: string) { + if (!this.sfuNodeId) { + this.sfuNodeId = id; + } else if (this.sfuNodeId !== id) { + this.sfuNodeId = id; + this.handleSFUMigration(); + } + } + + // eslint-disable-next-line complexity + async handleSFUMigration() { + HMSLogger.time('sfu migration'); + const peers = this.store.getPeerMap(); + for (const peerId in peers) { + const peer = peers[peerId]; + if (peer.isLocal) { + continue; + } + if (peer.audioTrack) { + this.store.removeTrack(peer.audioTrack); + peer.audioTrack = undefined; + } + if (peer.videoTrack) { + this.store.removeTrack(peer.videoTrack); + peer.videoTrack = undefined; + } + while (peer.auxiliaryTracks.length > 0) { + const track = peer.auxiliaryTracks.shift(); + if (track) { + this.store.removeTrack(track); + } + } + } + this.clearPeerConnections(); + this.createPeerConnections(); + await this.negotiateOnFirstPublish(); + const localPeer = this.store.getLocalPeer(); + if (!localPeer) { + return; + } + + let tracksToPublish = []; + const streamMap = new Map(); + if (localPeer.audioTrack) { + const stream = localPeer.audioTrack.stream as HMSLocalStream; + if (!streamMap.get(stream.id)) { + streamMap.set(stream.id, stream.clone()); + } + const newTrack = localPeer.audioTrack.clone(streamMap.get(stream.id)); + this.store.removeTrack(localPeer.audioTrack); + tracksToPublish.push(newTrack); + localPeer.audioTrack = newTrack; + } + + if (localPeer.videoTrack) { + const stream = localPeer.videoTrack.stream as HMSLocalStream; + if (!streamMap.get(stream.id)) { + streamMap.set(stream.id, stream.clone()); + } + this.store.removeTrack(localPeer.videoTrack); + const newTrack = localPeer.videoTrack.clone(streamMap.get(stream.id)); + tracksToPublish.push(newTrack); + localPeer.videoTrack = newTrack; + } + + const auxTracks = []; + while (localPeer.auxiliaryTracks.length > 0) { + const track = localPeer.auxiliaryTracks.shift(); + if (track) { + const stream = track.stream as HMSLocalStream; + if (!streamMap.get(stream.id)) { + streamMap.set(stream.id, stream.clone()); + } + this.store.removeTrack(track); + const newTrack = track.clone(streamMap.get(stream.id)); + if (newTrack.type === 'video' && newTrack.source === 'screen') { + newTrack.nativeTrack.addEventListener('ended', this.onScreenshareStop); + } + auxTracks.push(newTrack); + } + } + localPeer.auxiliaryTracks = auxTracks; + tracksToPublish = tracksToPublish.concat(auxTracks); + streamMap.clear(); + for (const track of tracksToPublish) { + await this.publishTrack(track); + // this.listener?.onTrackUpdate(HMSTrackUpdate.TRACK_ADDED, track, localPeer); + } + this.listener?.onSFUMigration?.(); + HMSLogger.timeEnd('sfu migration'); + } + /** * TODO: check if track.publishedTrackId be used instead of the hack to match with track with same type and * source. The hack won't work if there are multiple tracks with same source and type. @@ -653,7 +573,7 @@ export default class HMSTransport { HMSLogger.d(TAG, `✅ publishTrack: trackId=${track.trackId}`, `${track}`, this.callbacks); } - private async unpublishTrack(track: HMSLocalTrack): Promise { + private async unpublishTrack(track: HMSLocalTrack, sfuMigration = false): Promise { HMSLogger.d(TAG, `⏳ unpublishTrack: trackId=${track.trackId}`, `${track}`); if (track.publishedTrackId && this.trackStates.has(track.publishedTrackId)) { this.trackStates.delete(track.publishedTrackId); @@ -679,12 +599,26 @@ export default class HMSTransport { const stream = track.stream as HMSLocalStream; stream.removeSender(track); await p; - await track.cleanup(); - // remove track from store on unpublish - this.store.removeTrack(track); + if (!sfuMigration) { + await track.cleanup(); + // remove track from store on unpublish + this.store.removeTrack(track); + } HMSLogger.d(TAG, `✅ unpublishTrack: trackId=${track.trackId}`, this.callbacks); } + private async clearPeerConnections() { + clearTimeout(this.publishDtlsStateTimer); + this.publishDtlsStateTimer = 0; + clearTimeout(this.publishDisconnectTimer); + this.publishDisconnectTimer = 0; + this.lastPublishDtlsState = 'new'; + this.publishConnection?.close(); + this.subscribeConnection?.close(); + this.publishConnection = null; + this.subscribeConnection = null; + } + private waitForLocalRoleAvailability() { if (this.store.hasRoleDetailsArrived()) { return; @@ -716,11 +650,186 @@ export default class HMSTransport { private createPeerConnections() { if (this.initConfig) { + const publishConnectionObserver: IPublishConnectionObserver = { + onRenegotiationNeeded: async () => { + await this.performPublishRenegotiation(); + }, + + // eslint-disable-next-line complexity + onDTLSTransportStateChange: (state?: RTCDtlsTransportState) => { + const log = state === 'failed' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); + log(TAG, `Publisher on dtls transport state change: ${state}`); + + if (!state || this.lastPublishDtlsState === state) { + return; + } + + this.lastPublishDtlsState = state; + if (this.publishDtlsStateTimer !== 0) { + clearTimeout(this.publishDtlsStateTimer); + this.publishDtlsStateTimer = 0; + } + + if (state !== 'connecting' && state !== 'failed') { + return; + } + + const timeout = this.initConfig?.config?.dtlsStateTimeouts?.[state]; + if (!timeout || timeout <= 0) { + return; + } + + // if we're in connecting check again after timeout + // hotfix: mitigate https://100ms.atlassian.net/browse/LIVE-1924 + this.publishDtlsStateTimer = window.setTimeout(() => { + const newState = this.publishConnection?.nativeConnection.connectionState; + if (newState && state && newState === state) { + // stuck in either `connecting` or `failed` state for long time + const err = ErrorFactory.WebrtcErrors.ICEFailure( + HMSAction.PUBLISH, + `DTLS transport state ${state} timeout:${timeout}ms`, + true, + ); + this.eventBus.analytics.publish(AnalyticsEventFactory.disconnect(err)); + this.observer.onFailure(err); + } + }, timeout); + }, + + onDTLSTransportError: (error: Error) => { + HMSLogger.e(TAG, `onDTLSTransportError ${error.name} ${error.message}`, error); + this.eventBus.analytics.publish(AnalyticsEventFactory.disconnect(error)); + }, + + onIceConnectionChange: async (newState: RTCIceConnectionState) => { + const log = newState === 'disconnected' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); + log(TAG, `Publish ice connection state change: ${newState}`); + }, + + onConnectionStateChange: async (newState: RTCPeerConnectionState) => { + const log = newState === 'disconnected' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); + log(TAG, `Publish connection state change: ${newState}`); + if (newState === 'new') { + return; + } + + if (newState === 'connected') { + this.connectivityListener?.onICESuccess(true); + this.publishConnection?.handleSelectedIceCandidatePairs(); + } else if (newState === 'failed') { + await this.handleIceConnectionFailure( + HMSConnectionRole.Publish, + ErrorFactory.WebrtcErrors.ICEFailure( + HMSAction.PUBLISH, + `local candidate - ${this.publishConnection?.selectedCandidatePair?.local?.candidate}; remote candidate - ${this.publishConnection?.selectedCandidatePair?.remote?.candidate}`, + ), + ); + } else { + this.publishDisconnectTimer = window.setTimeout(() => { + if (this.publishConnection?.connectionState !== 'connected') { + this.handleIceConnectionFailure( + HMSConnectionRole.Publish, + ErrorFactory.WebrtcErrors.ICEDisconnected( + HMSAction.PUBLISH, + `local candidate - ${this.publishConnection?.selectedCandidatePair?.local?.candidate}; remote candidate - ${this.publishConnection?.selectedCandidatePair?.remote?.candidate}`, + ), + ); + } + }, ICE_DISCONNECTION_TIMEOUT); + } + }, + + onIceCandidate: candidate => { + this.connectivityListener?.onICECandidate(candidate, true); + }, + + onSelectedCandidatePairChange: candidatePair => { + this.connectivityListener?.onSelectedICECandidatePairChange(candidatePair, true); + }, + }; + + const subscribeConnectionObserver: ISubscribeConnectionObserver = { + onApiChannelMessage: (message: string) => { + this.observer.onNotification(JSON.parse(message)); + }, + + onTrackAdd: (track: HMSTrack) => { + HMSLogger.d(TAG, '[Subscribe] onTrackAdd', `${track}`); + this.observer.onTrackAdd(track); + }, + + onTrackRemove: (track: HMSTrack) => { + HMSLogger.d(TAG, '[Subscribe] onTrackRemove', `${track}`); + this.observer.onTrackRemove(track); + }, + + onIceConnectionChange: async (newState: RTCIceConnectionState) => { + const log = newState === 'disconnected' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); + log(TAG, `Subscribe ice connection state change: ${newState}`); + + // if (newState === 'failed') { + // // await this.handleIceConnectionFailure(HMSConnectionRole.Subscribe); + // } + + if (newState === 'connected') { + const callback = this.callbacks.get(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID); + this.callbacks.delete(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID); + + this.connectivityListener?.onICESuccess(false); + if (callback) { + callback.promise.resolve(true); + } + } + }, + + onConnectionStateChange: async (newState: RTCPeerConnectionState) => { + const log = newState === 'disconnected' ? HMSLogger.w.bind(HMSLogger) : HMSLogger.d.bind(HMSLogger); + log(TAG, `Subscribe connection state change: ${newState}`); + + if (newState === 'failed') { + await this.handleIceConnectionFailure( + HMSConnectionRole.Subscribe, + ErrorFactory.WebrtcErrors.ICEFailure( + HMSAction.SUBSCRIBE, + `local candidate - ${this.subscribeConnection?.selectedCandidatePair?.local?.candidate}; remote candidate - ${this.subscribeConnection?.selectedCandidatePair?.remote?.candidate}`, + ), + ); + } else if (newState === 'disconnected') { + setTimeout(() => { + if (this.subscribeConnection?.connectionState === 'disconnected') { + this.handleIceConnectionFailure( + HMSConnectionRole.Subscribe, + ErrorFactory.WebrtcErrors.ICEDisconnected( + HMSAction.SUBSCRIBE, + `local candidate - ${this.subscribeConnection?.selectedCandidatePair?.local?.candidate}; remote candidate - ${this.subscribeConnection?.selectedCandidatePair?.remote?.candidate}`, + ), + ); + } + }, ICE_DISCONNECTION_TIMEOUT); + } else if (newState === 'connected') { + this.subscribeConnection?.handleSelectedIceCandidatePairs(); + const callback = this.callbacks.get(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID); + this.callbacks.delete(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID); + + if (callback) { + callback.promise.resolve(true); + } + } + }, + + onIceCandidate: candidate => { + this.connectivityListener?.onICECandidate(candidate, false); + }, + + onSelectedCandidatePairChange: candidatePair => { + this.connectivityListener?.onSelectedICECandidatePairChange(candidatePair, false); + }, + }; if (!this.publishConnection) { this.publishConnection = new HMSPublishConnection( this.signal, this.initConfig.rtcConfiguration, - this.publishConnectionObserver, + publishConnectionObserver, ); } @@ -729,7 +838,7 @@ export default class HMSTransport { this.signal, this.initConfig.rtcConfiguration, this.isFlagEnabled.bind(this), - this.subscribeConnectionObserver, + subscribeConnectionObserver, ); } } @@ -838,6 +947,7 @@ export default class HMSTransport { simulcast, onDemandTracks, ); + this.sfuNodeId = response?.sfu_node_id; return !!response; } @@ -852,7 +962,7 @@ export default class HMSTransport { } const offer = await this.publishConnection.createOffer(this.trackStates); await this.publishConnection.setLocalDescription(offer); - const answer = await this.signal.offer(offer, this.trackStates); + const answer = await this.signal.offer(offer, this.trackStates, this.sfuNodeId); await this.publishConnection.setRemoteDescription(answer); for (const candidate of this.publishConnection.candidates) { await this.publishConnection.addIceCandidate(candidate); @@ -878,7 +988,7 @@ export default class HMSTransport { const offer = await this.publishConnection.createOffer(this.trackStates, constraints); await this.publishConnection.setLocalDescription(offer); HMSLogger.time(`renegotiation-offer-exchange`); - const answer = await this.signal.offer(offer, this.trackStates); + const answer = await this.signal.offer(offer, this.trackStates, this.sfuNodeId); this.callbacks.delete(RENEGOTIATION_CALLBACK_ID); HMSLogger.timeEnd(`renegotiation-offer-exchange`); await this.publishConnection.setRemoteDescription(answer); @@ -1081,15 +1191,7 @@ export default class HMSTransport { * Do iceRestart only if not connected */ if (this.publishConnection) { - const p = new Promise((resolve, reject) => { - this.callbacks.set(RENEGOTIATION_CALLBACK_ID, { - promise: { resolve, reject }, - action: HMSAction.RESTART_ICE, - extra: {}, - }); - }); await this.performPublishRenegotiation({ iceRestart: this.publishConnection.connectionState !== 'connected' }); - await p; } return true; @@ -1139,16 +1241,6 @@ export default class HMSTransport { return ok; }; - private handleSubscribeConnectionConnected() { - this.subscribeConnection?.handleSelectedIceCandidatePairs(); - const callback = this.callbacks.get(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID); - this.callbacks.delete(SUBSCRIBE_ICE_CONNECTION_CALLBACK_ID); - - if (callback) { - callback.promise.resolve(true); - } - } - private setTransportStateForConnect() { if (this.state === TransportState.Failed) { this.state = TransportState.Disconnected; diff --git a/packages/hms-video-store/src/utils/constants.ts b/packages/hms-video-store/src/utils/constants.ts index 5372663c7b..b31ecbfc32 100644 --- a/packages/hms-video-store/src/utils/constants.ts +++ b/packages/hms-video-store/src/utils/constants.ts @@ -59,7 +59,7 @@ export const HMSEvents = { export const PROTOCOL_VERSION = '2.5'; -export const PROTOCOL_SPEC = '20240521'; +export const PROTOCOL_SPEC = '20240720'; export const HAND_RAISE_GROUP_NAME = '_handraise'; diff --git a/packages/roomkit-react/src/Prebuilt/components/AppData/AppData.tsx b/packages/roomkit-react/src/Prebuilt/components/AppData/AppData.tsx index e84667484c..0d1d866e68 100644 --- a/packages/roomkit-react/src/Prebuilt/components/AppData/AppData.tsx +++ b/packages/roomkit-react/src/Prebuilt/components/AppData/AppData.tsx @@ -117,7 +117,7 @@ export const AppData = React.memo(() => { ...uiSettings, [UI_SETTINGS.isAudioOnly]: undefined, [UI_SETTINGS.uiViewMode]: uiSettings.uiViewMode || UI_MODE_GRID, - [UI_SETTINGS.maxTileCount]: elements?.video_tile_layout?.grid?.tiles_in_view || 9, + [UI_SETTINGS.maxTileCount]: Number(elements?.video_tile_layout?.grid?.tiles_in_view) || 9, }; hmsActions.setAppData(APP_DATA.uiSettings, updatedSettings, true); }, [preferences, hmsActions, elements?.video_tile_layout]);