Skip to content

Commit

Permalink
Update publish-alpha (#3164)
Browse files Browse the repository at this point in the history
Co-authored-by: Ravi theja <[email protected]>
Co-authored-by: Eswar Prasad Clinton. A <[email protected]>
  • Loading branch information
3 people authored Aug 9, 2024
1 parent 9cbdaec commit 4ec5db3
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export class HMSLocalAudioTrack extends HMSAudioTrack {
this.settings,
this.room,
);
track.peerId = this.peerId;

if (this.pluginsManager.pluginsMap.size > 0) {
this.pluginsManager.pluginsMap.forEach(value => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ export class HMSLocalVideoTrack extends HMSVideoTrack {
this.settings,
this.room,
);
track.peerId = this.peerId;

if (this.pluginsManager.pluginsMap.size > 0) {
this.pluginsManager.pluginsMap.forEach(value => {
track
Expand Down
39 changes: 15 additions & 24 deletions packages/hms-video-store/src/rtc-stats/HMSWebrtcInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,18 @@ export class HMSWebrtcInternals {
private isMonitored = false;
private hmsStats?: HMSWebrtcStats;

constructor(
private readonly store: Store,
private readonly eventBus: EventBus,
private publishConnection?: RTCPeerConnection,
private subscribeConnection?: RTCPeerConnection,
) {}
constructor(private readonly store: Store, private readonly eventBus: EventBus) {}

getPublishPeerConnection() {
return this.publishConnection;
getCurrentStats() {
return this.hmsStats;
}

getSubscribePeerConnection() {
return this.subscribeConnection;
getPublishPeerConnection() {
return this.hmsStats?.getPublishPeerConnection();
}

getCurrentStats() {
return this.hmsStats;
getSubscribePeerConnection() {
return this.hmsStats?.getSubscribePeerConnection();
}

onStatsChange(statsChangeCb: (stats: HMSWebrtcStats) => void) {
Expand All @@ -42,25 +37,21 @@ export class HMSWebrtcInternals {

private handleStatsUpdate = async () => {
await this.hmsStats?.updateStats();
this.eventBus.statsUpdate.publish(this.hmsStats);
if (this.hmsStats) {
this.eventBus.statsUpdate.publish(this.hmsStats);
}
};

/**
*
* @internal
*/
setPeerConnections({ publish, subscribe }: { publish?: RTCPeerConnection; subscribe?: RTCPeerConnection }) {
this.publishConnection = publish;
this.subscribeConnection = subscribe;

this.hmsStats = new HMSWebrtcStats(
{
publish: this.publishConnection?.getStats.bind(this.publishConnection),
subscribe: this.subscribeConnection?.getStats.bind(this.subscribeConnection),
},
this.store,
this.eventBus,
);
if (this.hmsStats) {
this.hmsStats.setPeerConnections({ publish, subscribe });
} else {
this.hmsStats = new HMSWebrtcStats(this.store, this.eventBus, publish, subscribe);
}
}

/**
Expand Down
23 changes: 18 additions & 5 deletions packages/hms-video-store/src/rtc-stats/HMSWebrtcStats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import AnalyticsEventFactory from '../analytics/AnalyticsEventFactory';
import { ErrorFactory } from '../error/ErrorFactory';
import { HMSAction } from '../error/HMSAction';
import { EventBus } from '../events/EventBus';
import { HMSPeerStats, HMSTrackStats, PeerConnectionType } from '../interfaces/webrtc-stats';
import { HMSPeerStats, HMSTrackStats } from '../interfaces/webrtc-stats';
import { HMSLocalTrack, HMSRemoteAudioTrack, HMSRemoteTrack, HMSRemoteVideoTrack } from '../media/tracks';
import { Store } from '../sdk/store';
import HMSLogger from '../utils/logger';
Expand All @@ -27,13 +27,27 @@ export class HMSWebrtcStats {
* this is initialized
*/
constructor(
private getStats: Record<PeerConnectionType, RTCPeerConnection['getStats'] | undefined>,
private store: Store,
private readonly eventBus: EventBus,
private publishConnection?: RTCPeerConnection,
private subscribeConnection?: RTCPeerConnection,
) {
this.localPeerID = this.store.getLocalPeer()?.peerId;
}

setPeerConnections({ publish, subscribe }: { publish?: RTCPeerConnection; subscribe?: RTCPeerConnection }) {
this.publishConnection = publish;
this.subscribeConnection = subscribe;
}

getPublishPeerConnection() {
return this.publishConnection;
}

getSubscribePeerConnection() {
return this.subscribeConnection;
}

getLocalPeerStats = (): HMSPeerStats | undefined => {
return this.peerStats[this.localPeerID!];
};
Expand Down Expand Up @@ -63,7 +77,7 @@ export class HMSWebrtcStats {
const prevLocalPeerStats = this.getLocalPeerStats();
let publishReport: RTCStatsReport | undefined;
try {
publishReport = await this.getStats.publish?.();
publishReport = await this.publishConnection?.getStats();
} catch (err: any) {
this.eventBus.analytics.publish(
AnalyticsEventFactory.rtcStatsFailed(ErrorFactory.WebrtcErrors.StatsFailed(HMSAction.PUBLISH, err.message)),
Expand All @@ -72,10 +86,9 @@ export class HMSWebrtcStats {
}
const publishStats: HMSPeerStats['publish'] | undefined =
publishReport && getLocalPeerStatsFromReport('publish', publishReport, prevLocalPeerStats);

let subscribeReport: RTCStatsReport | undefined;
try {
subscribeReport = await this.getStats.subscribe?.();
subscribeReport = await this.subscribeConnection?.getStats();
} catch (err: any) {
this.eventBus.analytics.publish(
AnalyticsEventFactory.rtcStatsFailed(ErrorFactory.WebrtcErrors.StatsFailed(HMSAction.SUBSCRIBE, err.message)),
Expand Down
49 changes: 31 additions & 18 deletions packages/hms-video-store/src/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export default class HMSTransport {
private publishDisconnectTimer = 0;
private listener?: HMSUpdateListener;
private onScreenshareStop = () => {};
private screenStream = new Set<MediaStream>();

constructor(
private observer: ITransportObserver,
Expand All @@ -95,12 +96,7 @@ export default class HMSTransport {
private analyticsTimer: AnalyticsTimer,
private pluginUsageTracker: PluginUsageTracker,
) {
this.webrtcInternals = new HMSWebrtcInternals(
this.store,
this.eventBus,
this.publishConnection?.nativeConnection,
this.subscribeConnection?.nativeConnection,
);
this.webrtcInternals = new HMSWebrtcInternals(this.store, this.eventBus);

const onStateChange = async (state: TransportState, error?: HMSException) => {
if (state !== this.state) {
Expand Down Expand Up @@ -281,7 +277,7 @@ export default class HMSTransport {
if (this.initConfig) {
await this.waitForLocalRoleAvailability();
await this.createConnectionsAndNegotiateJoin(customData, autoSubscribeVideo);
await this.initRtcStatsMonitor();
this.initStatsAnalytics();

HMSLogger.d(TAG, '✅ join: Negotiated over PUBLISH connection');
}
Expand Down Expand Up @@ -429,7 +425,7 @@ export default class HMSTransport {
this.sfuNodeId = id;
this.publishConnection?.setSfuNodeId(id);
this.subscribeConnection?.setSfuNodeId(id);
} else if (this.sfuNodeId !== id) {
} else if (id && this.sfuNodeId !== id) {
this.sfuNodeId = id;
this.handleSFUMigration();
}
Expand Down Expand Up @@ -489,11 +485,23 @@ export default class HMSTransport {
if (track) {
const stream = track.stream as HMSLocalStream;
if (!streamMap.get(stream.id)) {
streamMap.set(stream.id, new HMSLocalStream(new MediaStream()));
/**
* For screenshare, you need to clone the current stream only, cloning the track will not work otherwise, it will have all
* correct states but bytes sent and all other stats would be 0
**/
streamMap.set(
stream.id,
new HMSLocalStream(track.source === 'screen' ? stream.nativeStream.clone() : new MediaStream()),
);
}
this.store.removeTrack(track);
const newTrack = track.clone(streamMap.get(stream.id)!);
if (newTrack.type === 'video' && newTrack.source === 'screen') {
/**
* Store all the stream so they can be stopped when screenshare stopped. Stopping before is not helping
*/
this.screenStream.add(stream.nativeStream);
this.screenStream.add(newTrack.stream.nativeStream);
newTrack.nativeTrack.addEventListener('ended', this.onScreenshareStop);
}
track.cleanup();
Expand Down Expand Up @@ -596,6 +604,15 @@ export default class HMSTransport {
stream.removeSender(track);
await p;
await track.cleanup();
if (track.source === 'screen' && this.screenStream) {
// stop older screenshare tracks to remove the screenshare banner
this.screenStream.forEach(stream => {
stream.getTracks().forEach(_track => {
_track.stop();
});
this.screenStream.delete(stream);
});
}
// remove track from store on unpublish
this.store.removeTrack(track);
HMSLogger.d(TAG, `✅ unpublishTrack: trackId=${track.trackId}`, this.callbacks);
Expand Down Expand Up @@ -836,6 +853,11 @@ export default class HMSTransport {
);
}
}

this.webrtcInternals?.setPeerConnections({
publish: this.publishConnection?.nativeConnection,
subscribe: this.subscribeConnection?.nativeConnection,
});
}

private async negotiateJoinWithRetry({
Expand Down Expand Up @@ -1126,15 +1148,6 @@ export default class HMSTransport {
HMSLogger.d(TAG, '✅ internal connect: connected to ws endpoint');
}

private async initRtcStatsMonitor() {
this.webrtcInternals?.setPeerConnections({
publish: this.publishConnection?.nativeConnection,
subscribe: this.subscribeConnection?.nativeConnection,
});

this.initStatsAnalytics();
}

private initStatsAnalytics() {
if (this.isFlagEnabled(InitFlags.FLAG_PUBLISH_STATS)) {
this.publishStatsAnalytics = new PublishStatsAnalytics(
Expand Down

0 comments on commit 4ec5db3

Please sign in to comment.