From 9e20a7d5b458da9dc228d22799aef188270dc7d1 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 14 Nov 2024 11:01:54 +0530 Subject: [PATCH] Send local participant SID with reconnect=1. (#558) Also, some minor clean up as room was reaching into client and setting a callback (for track unpublished) which was also going through engine. Just make it so that the chain is always room -> engine -> signalclient. --- engine.go | 13 ++++++++++++- room.go | 13 +++++++++++-- signalclient.go | 11 +++++++---- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/engine.go b/engine.go index c92b077..5d899cd 100644 --- a/engine.go +++ b/engine.go @@ -68,6 +68,7 @@ type RTCEngine struct { // callbacks OnLocalTrackUnpublished func(response *livekit.TrackUnpublishedResponse) + OnTrackRemoteMuted func(request *livekit.MuteTrackRequest) OnDisconnected func(reason DisconnectionReason) OnMediaTrack func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) OnParticipantUpdate func([]*livekit.ParticipantInfo) @@ -80,6 +81,9 @@ type RTCEngine struct { OnRestarted func(*livekit.JoinResponse) OnResuming func() OnResumed func() + + // callbacks to get data + CbGetLocalParticipantSID func() string } func NewRTCEngine() *RTCEngine { @@ -102,6 +106,7 @@ func NewRTCEngine() *RTCEngine { } e.client.OnLocalTrackPublished = e.handleLocalTrackPublished e.client.OnLocalTrackUnpublished = e.handleLocalTrackUnpublished + e.client.OnTrackRemoteMuted = e.handleTrackRemoteMuted e.client.OnConnectionQuality = func(cqi []*livekit.ConnectionQualityInfo) { if f := e.OnConnectionQuality; f != nil { f(cqi) @@ -501,6 +506,12 @@ func (e *RTCEngine) handleLocalTrackUnpublished(res *livekit.TrackUnpublishedRes } } +func (e *RTCEngine) handleTrackRemoteMuted(request *livekit.MuteTrackRequest) { + if e.OnTrackRemoteMuted != nil { + e.OnTrackRemoteMuted(request) + } +} + func (e *RTCEngine) handleDataPacket(msg webrtc.DataChannelMessage) { packet, err := e.readDataPacket(msg) if err != nil { @@ -605,7 +616,7 @@ func (e *RTCEngine) handleDisconnect(fullReconnect bool) { } func (e *RTCEngine) resumeConnection() error { - reconnect, err := e.client.Reconnect(e.url, e.token.Load(), *e.connParams) + reconnect, err := e.client.Reconnect(e.url, e.token.Load(), *e.connParams, e.CbGetLocalParticipantSID()) if err != nil { return err } diff --git a/room.go b/room.go index 30d8498..a84f3fd 100644 --- a/room.go +++ b/room.go @@ -197,8 +197,11 @@ func NewRoom(callback *RoomCallback) *Room { engine.OnRestarted = r.handleRestarted engine.OnResuming = r.handleResuming engine.OnResumed = r.handleResumed - engine.client.OnLocalTrackUnpublished = r.handleLocalTrackUnpublished - engine.client.OnTrackRemoteMuted = r.handleTrackRemoteMuted + engine.OnLocalTrackUnpublished = r.handleLocalTrackUnpublished + engine.OnTrackRemoteMuted = r.handleTrackRemoteMuted + + // callbacks engine can use to get data + engine.CbGetLocalParticipantSID = r.getLocalParticipantSID return r } @@ -818,6 +821,12 @@ func (r *Room) Simulate(scenario SimulateScenario) { } } +func (r *Room) getLocalParticipantSID() string { + return r.LocalParticipant.SID() +} + +// --------------------------------------------------------- + func unpackStreamID(packed string) (participantId string, trackId string) { parts := strings.Split(packed, "|") if len(parts) > 1 { diff --git a/signalclient.go b/signalclient.go index e6dfef0..382fbc3 100644 --- a/signalclient.go +++ b/signalclient.go @@ -82,7 +82,7 @@ func (c *SignalClient) IsStarted() bool { } func (c *SignalClient) Join(urlPrefix string, token string, params connectParams) (*livekit.JoinResponse, error) { - res, err := c.connect(urlPrefix, token, params) + res, err := c.connect(urlPrefix, token, params, "") if err != nil { return nil, err } @@ -97,9 +97,9 @@ func (c *SignalClient) Join(urlPrefix string, token string, params connectParams // Reconnect starts a new WebSocket connection to the server, passing in reconnect=1 // when successful, it'll return a ReconnectResponse; older versions of the server will not send back a ReconnectResponse -func (c *SignalClient) Reconnect(urlPrefix string, token string, params connectParams) (*livekit.ReconnectResponse, error) { +func (c *SignalClient) Reconnect(urlPrefix string, token string, params connectParams, participantSID string) (*livekit.ReconnectResponse, error) { params.Reconnect = true - res, err := c.connect(urlPrefix, token, params) + res, err := c.connect(urlPrefix, token, params, participantSID) if err != nil { return nil, err } @@ -121,7 +121,7 @@ func (c *SignalClient) Reconnect(urlPrefix string, token string, params connectP return nil, nil } -func (c *SignalClient) connect(urlPrefix string, token string, params connectParams) (*livekit.SignalResponse, error) { +func (c *SignalClient) connect(urlPrefix string, token string, params connectParams, participantSID string) (*livekit.SignalResponse, error) { if urlPrefix == "" { return nil, ErrURLNotProvided } @@ -135,6 +135,9 @@ func (c *SignalClient) connect(urlPrefix string, token string, params connectPar } if params.Reconnect { urlSuffix += "&reconnect=1" + if participantSID != "" { + urlSuffix += fmt.Sprintf("&sid=%s", participantSID) + } } urlSuffix += getStatsParamString()