Skip to content

Commit

Permalink
feat: add support for sfu migration
Browse files Browse the repository at this point in the history
  • Loading branch information
gulzar1996 authored Jul 31, 2024
1 parent 2cbb2a0 commit c58345b
Show file tree
Hide file tree
Showing 20 changed files with 404 additions and 239 deletions.
2 changes: 1 addition & 1 deletion packages/hms-video-store/src/connection/HMSConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export default abstract class HMSConnection {
return await this.nativeConnection.getStats();
}

async close() {
close() {
this.nativeConnection.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export default class HMSPublishConnection extends HMSConnection {
this.observer.onIceConnectionChange(this.nativeConnection.iceConnectionState);
};

// @TODO(eswar): Remove this. Use iceconnectionstate change with interval and threshold.
this.nativeConnection.onconnectionstatechange = () => {
this.observer.onConnectionStateChange(this.nativeConnection.connectionState);

Expand All @@ -51,6 +50,11 @@ export default class HMSPublishConnection extends HMSConnection {
};
}

close() {
super.close();
this.channel.close();
}

initAfterJoin() {
this.nativeConnection.onnegotiationneeded = async () => {
HMSLogger.d(this.TAG, `onnegotiationneeded`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ export default class HMSSubscribeConnection extends HMSConnection {
return this.sendMessage(request, id);
}

async close() {
await super.close();
close() {
super.close();
this.apiChannel?.close();
}

Expand Down
1 change: 1 addition & 0 deletions packages/hms-video-store/src/interfaces/update-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export interface HMSUpdateListener extends DeviceChangeListener, SessionStoreLis
onError(error: HMSException): void;
onReconnecting(error: HMSException): void;
onReconnected(): void;
onSFUMigration?: () => void;
onRoleChangeRequest(request: HMSRoleChangeRequest): void;
onRoleUpdate(newRole: string): void;
onChangeTrackStateRequest(request: HMSChangeTrackStateRequest): void;
Expand Down
4 changes: 4 additions & 0 deletions packages/hms-video-store/src/media/streams/HMSLocalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ export class HMSLocalStream extends HMSMediaStream {
this.connection = connection;
}

clone() {
return new HMSLocalStream(this.nativeStream.clone());
}

addTransceiver(track: HMSLocalTrack, simulcastLayers: SimulcastLayer[]) {
const transceiver = this.connection!.addTransceiver(track.getTrackBeingSent(), {
streams: [this.nativeStream],
Expand Down
13 changes: 12 additions & 1 deletion packages/hms-video-store/src/media/tracks/HMSLocalAudioTrack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class HMSLocalAudioTrack extends HMSAudioTrack {
source: string,
private eventBus: EventBus,
settings: HMSAudioTrackSettings = new HMSAudioTrackSettingsBuilder().build(),
room?: Room,
private room?: Room,
) {
super(stream, track, source);
stream.tracks.push(this);
Expand All @@ -64,6 +64,17 @@ export class HMSLocalAudioTrack extends HMSAudioTrack {
}
}

clone(stream?: HMSLocalStream) {
return new HMSLocalAudioTrack(
stream || (this.stream as HMSLocalStream).clone(),
this.nativeTrack.clone(),
this.source!,
this.eventBus,
this.settings,
this.room,
);
}

getManuallySelectedDeviceId() {
return this.manuallySelectedDeviceId;
}
Expand Down
13 changes: 12 additions & 1 deletion packages/hms-video-store/src/media/tracks/HMSLocalVideoTrack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class HMSLocalVideoTrack extends HMSVideoTrack {
source: string,
private eventBus: EventBus,
settings: HMSVideoTrackSettings = new HMSVideoTrackSettingsBuilder().build(),
room?: Room,
private room?: Room,
) {
super(stream, track, source);
stream.tracks.push(this);
Expand All @@ -89,6 +89,17 @@ export class HMSLocalVideoTrack extends HMSVideoTrack {
}
}

clone(stream?: HMSLocalStream) {
return new HMSLocalVideoTrack(
stream || (this.stream as HMSLocalStream).clone(),
this.nativeTrack.clone(),
this.source!,
this.eventBus,
this.settings,
this.room,
);
}

/** @internal */
setSimulcastDefinitons(definitions: HMSSimulcastLayerDefinition[]) {
this._layerDefinitions = definitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ export enum HMSNotificationMethod {
POLL_STATS = 'on-poll-stats',
ROOM_INFO = 'room-info',
SESSION_INFO = 'session-info',
NODE_INFO = 'node-info',
WHITEBOARD_UPDATE = 'on-whiteboard-update',
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,7 @@ export interface WhiteboardInfo {
state?: string;
attributes?: Array<{ name: string; value: unknown }>;
}

export interface NodeInfo {
sfu_node_id: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { WhiteboardManager } from './managers/WhiteboardManager';
import { HMSNotificationMethod } from './HMSNotificationMethod';
import {
ConnectionQualityList,
NodeInfo,
OnTrackLayerUpdateNotification,
PolicyParams,
SpeakerList,
Expand Down Expand Up @@ -168,6 +169,10 @@ export class NotificationManager {
this.policyChangeManager.handlePolicyChange(notification as PolicyParams);
break;

case HMSNotificationMethod.NODE_INFO:
this.transport.setSFUNodeId((notification as NodeInfo).sfu_node_id);
break;

default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ export class TrackManager {
track.type === HMSTrackType.AUDIO
? this.eventBus.audioTrackAdded.publish({ track: track as HMSRemoteAudioTrack, peer: hmsPeer as HMSRemotePeer })
: this.listener?.onTrackUpdate(HMSTrackUpdate.TRACK_ADDED, track, hmsPeer);
this.store.removeTrackState(track.trackId);
this.tracksToProcess.delete(track.trackId);
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import HMSRoom from '../sdk/models/HMSRoom';
import { HMSRemotePeer } from '../sdk/models/peer';
import { Store } from '../sdk/store';
import HMSTransport from '../transport';
import ITransportObserver from '../transport/ITransportObserver';

let joinHandler: jest.Mock<any, any>;
let previewHandler: jest.Mock<any, any>;
Expand All @@ -37,6 +38,9 @@ const store: Store = new Store();
let notificationManager: NotificationManager;
let eventBus: EventBus;
let transport: HMSTransport;
let deviceManager: DeviceManager;
let analyticsTimer: AnalyticsTimer;
let observer: ITransportObserver;

beforeEach(() => {
joinHandler = jest.fn();
Expand All @@ -58,6 +62,16 @@ beforeEach(() => {
pollsUpdateHandler = jest.fn();
whiteboardUpdateHandler = jest.fn();
eventBus = new EventBus();
deviceManager = new DeviceManager(store, eventBus);
analyticsTimer = new AnalyticsTimer();
observer = {
onNotification: jest.fn(),
onTrackAdd: jest.fn(),
onTrackRemove: jest.fn(),
onFailure: jest.fn(),
onStateChange: jest.fn(),
onConnected: jest.fn(),
};
const mockMediaStream = {
id: 'native-stream-id',
getVideoTracks: jest.fn(() => [
Expand All @@ -83,19 +97,12 @@ beforeEach(() => {
global.HTMLCanvasElement.prototype.captureStream = jest.fn().mockImplementation(() => mockMediaStream);

transport = new HMSTransport(
{
onNotification: jest.fn(),
onTrackAdd: jest.fn(),
onTrackRemove: jest.fn(),
onFailure: jest.fn(),
onStateChange: jest.fn(),
onConnected: jest.fn(),
},
new DeviceManager(store, eventBus),
observer,
deviceManager,
store,
eventBus,
new AnalyticsEventsService(store),
new AnalyticsTimer(),
analyticsTimer,
new PluginUsageTracker(eventBus),
);
store.setRoom(new HMSRoom('1234'));
Expand All @@ -120,6 +127,8 @@ beforeEach(() => {
onWhiteboardUpdate: whiteboardUpdateHandler,
};

transport.setListener(listener);

audioListener = { onAudioLevelUpdate: audioUpdateHandler };

notificationManager = new NotificationManager(store, eventBus, transport, listener, audioListener);
Expand Down
4 changes: 4 additions & 0 deletions packages/hms-video-store/src/reactive-store/HMSSDKActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ export class HMSSDKActions<T extends HMSGenericTypes = { sessionStore: Record<st
onSessionStoreUpdate: this.onSessionStoreUpdate.bind(this),
onPollsUpdate: this.onPollsUpdate.bind(this),
onWhiteboardUpdate: this.onWhiteboardUpdate.bind(this),
onSFUMigration: this.onSFUMigration.bind(this),
});
this.sdk.addAudioListener({
onAudioLevelUpdate: this.onAudioLevelUpdate.bind(this),
Expand All @@ -871,6 +872,9 @@ export class HMSSDKActions<T extends HMSGenericTypes = { sessionStore: Record<st
});
}

private onSFUMigration() {
this.syncRoomState('SFUMigration');
}
private onRemovedFromRoom(request: SDKHMSLeaveRoomRequest) {
const requestedBy = this.store.getState(selectPeerByID(request.requestedBy?.peerId));
this.hmsNotifications.sendLeaveRoom({
Expand Down
12 changes: 7 additions & 5 deletions packages/hms-video-store/src/sdk/LocalTrackManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const testObserver: ITransportObserver = {

let testStore = new Store();
let testEventBus = new EventBus();
let analyticsTimer = new AnalyticsTimer();

const policyParams: PolicyParams = {
name: 'host',
Expand Down Expand Up @@ -235,6 +236,7 @@ describe('LocalTrackManager', () => {
type: HMSPeerType.REGULAR,
});
testStore.addPeer(localPeer);
analyticsTimer = new AnalyticsTimer();
});

it('instantiates without any issues', () => {
Expand All @@ -243,7 +245,7 @@ describe('LocalTrackManager', () => {
testObserver,
new DeviceManager(testStore, testEventBus),
testEventBus,
new AnalyticsTimer(),
analyticsTimer,
);
expect(manager).toBeDefined();
});
Expand All @@ -254,7 +256,7 @@ describe('LocalTrackManager', () => {
testObserver,
new DeviceManager(testStore, testEventBus),
testEventBus,
new AnalyticsTimer(),
analyticsTimer,
);
testStore.setKnownRoles(policyParams);
await manager.getTracksToPublish({});
Expand All @@ -276,7 +278,7 @@ describe('LocalTrackManager', () => {
testObserver,
new DeviceManager(testStore, testEventBus),
testEventBus,
new AnalyticsTimer(),
analyticsTimer,
);
global.navigator.mediaDevices.getUserMedia = mockDenyGetUserMedia as any;
testStore.setKnownRoles(policyParams);
Expand Down Expand Up @@ -436,7 +438,7 @@ describe('LocalTrackManager', () => {
testObserver,
new DeviceManager(testStore, testEventBus),
testEventBus,
new AnalyticsTimer(),
analyticsTimer,
);
testStore.setKnownRoles(policyParams);
const tracksToPublish = await manager.getTracksToPublish({});
Expand Down Expand Up @@ -465,7 +467,7 @@ describe('LocalTrackManager', () => {
testObserver,
new DeviceManager(testStore, testEventBus),
testEventBus,
new AnalyticsTimer(),
analyticsTimer,
);
testStore.setKnownRoles(policyParams);
const tracksToPublish = await manager.getTracksToPublish({});
Expand Down
4 changes: 4 additions & 0 deletions packages/hms-video-store/src/sdk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ export class HMSSdk implements HMSInterface {
this.notificationManager?.setListener(this.listener);
this.audioSinkManager.setListener(this.listener);
this.interactivityCenter.setListener(this.listener);
this.transport.setListener(this.listener);
return;
}

Expand Down Expand Up @@ -838,6 +839,9 @@ export class HMSSdk implements HMSInterface {
});
return;
}
this.transport.setOnScreenshareStop(() => {
this.stopEndedScreenshare(onStop);
});
await this.transport.publish(tracks);
tracks.forEach(track => {
track.peerId = this.localPeer?.peerId;
Expand Down
4 changes: 4 additions & 0 deletions packages/hms-video-store/src/sdk/store/Store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ class Store {
this.peerTrackStates[trackStateEntry.trackInfo.track_id] = trackStateEntry;
}

removeTrackState(trackId: string) {
delete this.peerTrackStates[trackId];
}

removePeer(peerId: string) {
if (this.localPeerId === peerId) {
this.localPeerId = undefined;
Expand Down
14 changes: 11 additions & 3 deletions packages/hms-video-store/src/signal/jsonrpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ export default class JsonRpcSignal {
simulcast: boolean,
onDemandTracks: boolean,
offer?: RTCSessionDescriptionInit,
): Promise<RTCSessionDescriptionInit> {
): Promise<RTCSessionDescriptionInit & { sfu_node_id: string | undefined }> {
if (!this.isConnected) {
throw ErrorFactory.WebSocketConnectionErrors.WebSocketConnectionLost(
HMSAction.JOIN,
Expand All @@ -260,7 +260,10 @@ export default class JsonRpcSignal {
simulcast,
onDemandTracks,
};
const response: RTCSessionDescriptionInit = await this.internalCall(HMSSignalMethod.JOIN, params);
const response: RTCSessionDescriptionInit & { sfu_node_id: string | undefined } = await this.internalCall(
HMSSignalMethod.JOIN,
params,
);

this.isJoinCompleted = true;
this.pendingTrickle.forEach(({ target, candidate }) => this.trickle(target, candidate));
Expand All @@ -278,10 +281,15 @@ export default class JsonRpcSignal {
}
}

async offer(desc: RTCSessionDescriptionInit, tracks: Map<string, any>): Promise<RTCSessionDescriptionInit> {
async offer(
desc: RTCSessionDescriptionInit,
tracks: Map<string, any>,
sfuNodeId?: string,
): Promise<RTCSessionDescriptionInit> {
const response = await this.call(HMSSignalMethod.OFFER, {
desc,
tracks: Object.fromEntries(tracks),
sfu_node_id: sfuNodeId,
});
return response as RTCSessionDescriptionInit;
}
Expand Down
Loading

0 comments on commit c58345b

Please sign in to comment.