Skip to content

Commit

Permalink
Send local participant SID with reconnect=1. (#558)
Browse files Browse the repository at this point in the history
Also, some minor clean up as room was reaching into client and setting a
callback (for track unpublished) which was also going through engine.
Just make it so that the chain is always room -> engine -> signalclient.
  • Loading branch information
boks1971 authored Nov 14, 2024
1 parent 9fe0702 commit 9e20a7d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
13 changes: 12 additions & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type RTCEngine struct {

// callbacks
OnLocalTrackUnpublished func(response *livekit.TrackUnpublishedResponse)
OnTrackRemoteMuted func(request *livekit.MuteTrackRequest)
OnDisconnected func(reason DisconnectionReason)
OnMediaTrack func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver)
OnParticipantUpdate func([]*livekit.ParticipantInfo)
Expand All @@ -80,6 +81,9 @@ type RTCEngine struct {
OnRestarted func(*livekit.JoinResponse)
OnResuming func()
OnResumed func()

// callbacks to get data
CbGetLocalParticipantSID func() string
}

func NewRTCEngine() *RTCEngine {
Expand All @@ -102,6 +106,7 @@ func NewRTCEngine() *RTCEngine {
}
e.client.OnLocalTrackPublished = e.handleLocalTrackPublished
e.client.OnLocalTrackUnpublished = e.handleLocalTrackUnpublished
e.client.OnTrackRemoteMuted = e.handleTrackRemoteMuted
e.client.OnConnectionQuality = func(cqi []*livekit.ConnectionQualityInfo) {
if f := e.OnConnectionQuality; f != nil {
f(cqi)
Expand Down Expand Up @@ -501,6 +506,12 @@ func (e *RTCEngine) handleLocalTrackUnpublished(res *livekit.TrackUnpublishedRes
}
}

func (e *RTCEngine) handleTrackRemoteMuted(request *livekit.MuteTrackRequest) {
if e.OnTrackRemoteMuted != nil {
e.OnTrackRemoteMuted(request)
}
}

func (e *RTCEngine) handleDataPacket(msg webrtc.DataChannelMessage) {
packet, err := e.readDataPacket(msg)
if err != nil {
Expand Down Expand Up @@ -605,7 +616,7 @@ func (e *RTCEngine) handleDisconnect(fullReconnect bool) {
}

func (e *RTCEngine) resumeConnection() error {
reconnect, err := e.client.Reconnect(e.url, e.token.Load(), *e.connParams)
reconnect, err := e.client.Reconnect(e.url, e.token.Load(), *e.connParams, e.CbGetLocalParticipantSID())
if err != nil {
return err
}
Expand Down
13 changes: 11 additions & 2 deletions room.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,11 @@ func NewRoom(callback *RoomCallback) *Room {
engine.OnRestarted = r.handleRestarted
engine.OnResuming = r.handleResuming
engine.OnResumed = r.handleResumed
engine.client.OnLocalTrackUnpublished = r.handleLocalTrackUnpublished
engine.client.OnTrackRemoteMuted = r.handleTrackRemoteMuted
engine.OnLocalTrackUnpublished = r.handleLocalTrackUnpublished
engine.OnTrackRemoteMuted = r.handleTrackRemoteMuted

// callbacks engine can use to get data
engine.CbGetLocalParticipantSID = r.getLocalParticipantSID

return r
}
Expand Down Expand Up @@ -818,6 +821,12 @@ func (r *Room) Simulate(scenario SimulateScenario) {
}
}

func (r *Room) getLocalParticipantSID() string {
return r.LocalParticipant.SID()
}

// ---------------------------------------------------------

func unpackStreamID(packed string) (participantId string, trackId string) {
parts := strings.Split(packed, "|")
if len(parts) > 1 {
Expand Down
11 changes: 7 additions & 4 deletions signalclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *SignalClient) IsStarted() bool {
}

func (c *SignalClient) Join(urlPrefix string, token string, params connectParams) (*livekit.JoinResponse, error) {
res, err := c.connect(urlPrefix, token, params)
res, err := c.connect(urlPrefix, token, params, "")
if err != nil {
return nil, err
}
Expand All @@ -97,9 +97,9 @@ func (c *SignalClient) Join(urlPrefix string, token string, params connectParams

// Reconnect starts a new WebSocket connection to the server, passing in reconnect=1
// when successful, it'll return a ReconnectResponse; older versions of the server will not send back a ReconnectResponse
func (c *SignalClient) Reconnect(urlPrefix string, token string, params connectParams) (*livekit.ReconnectResponse, error) {
func (c *SignalClient) Reconnect(urlPrefix string, token string, params connectParams, participantSID string) (*livekit.ReconnectResponse, error) {
params.Reconnect = true
res, err := c.connect(urlPrefix, token, params)
res, err := c.connect(urlPrefix, token, params, participantSID)
if err != nil {
return nil, err
}
Expand All @@ -121,7 +121,7 @@ func (c *SignalClient) Reconnect(urlPrefix string, token string, params connectP
return nil, nil
}

func (c *SignalClient) connect(urlPrefix string, token string, params connectParams) (*livekit.SignalResponse, error) {
func (c *SignalClient) connect(urlPrefix string, token string, params connectParams, participantSID string) (*livekit.SignalResponse, error) {
if urlPrefix == "" {
return nil, ErrURLNotProvided
}
Expand All @@ -135,6 +135,9 @@ func (c *SignalClient) connect(urlPrefix string, token string, params connectPar
}
if params.Reconnect {
urlSuffix += "&reconnect=1"
if participantSID != "" {
urlSuffix += fmt.Sprintf("&sid=%s", participantSID)
}
}
urlSuffix += getStatsParamString()

Expand Down

0 comments on commit 9e20a7d

Please sign in to comment.