Skip to content

Commit

Permalink
Prevent race in participant update. (#308)
Browse files Browse the repository at this point in the history
In load test, participant updates were racing causing subscription
failures. The sequence is
- Join response received
- Read worker started
- Add remote participants from join response
- But, before that could happen, because the read worker is active,
  there was a participant update which contained track information. That
  path added the remote participant before join response handler
  could add it.
- Eventually join response path comes around and adds the remote
  participant, but that version of remote participant info does not have
  the tracks yet. And it ends up overwriting the participant info. And
  it also drops tracks that are not there in the update.
- As the order got reversed, tracks added by participant update path
  gets wiped out by join response path.
- And subscription fails.

Adding a simple fix to reject participant info updates that are older
version.

It will probably be better to process join response fully?
Maybe, start the read worker only after processing join response?

NOTE: When the remote participant gets created via the participant
update path, the local participant itself is not set up fully.
  • Loading branch information
boks1971 authored Sep 9, 2023
1 parent 0513228 commit a30cb3f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
8 changes: 7 additions & 1 deletion participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,13 @@ func (p *baseParticipant) setConnectionQualityInfo(info *livekit.ConnectionQuali
p.roomCallback.OnConnectionQualityChanged(info, p)
}

func (p *baseParticipant) updateInfo(pi *livekit.ParticipantInfo, participant Participant) {
func (p *baseParticipant) updateInfo(pi *livekit.ParticipantInfo, participant Participant) bool {
p.lock.Lock()
if p.info != nil && p.info.Version > pi.Version {
// already updated with a later version
p.lock.Unlock()
return false
}
p.info = pi
p.identity = pi.Identity
p.sid = pi.Sid
Expand All @@ -177,6 +182,7 @@ func (p *baseParticipant) updateInfo(pi *livekit.ParticipantInfo, participant Pa
p.Callback.OnMetadataChanged(oldMetadata, participant)
p.roomCallback.OnMetadataChanged(oldMetadata, participant)
}
return true
}

func (p *baseParticipant) addPublication(publication TrackPublication) {
Expand Down
5 changes: 4 additions & 1 deletion remoteparticipant.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ func newRemoteParticipant(pi *livekit.ParticipantInfo, roomCallback *RoomCallbac
}

func (p *RemoteParticipant) updateInfo(pi *livekit.ParticipantInfo) {
p.baseParticipant.updateInfo(pi, p)
if !p.baseParticipant.updateInfo(pi, p) {
// not a valid update, could be due to older version
return
}
// update tracks
validPubs := make(map[string]TrackPublication)
newPubs := make(map[string]TrackPublication)
Expand Down
14 changes: 7 additions & 7 deletions room.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,19 +350,19 @@ func (r *Room) handleParticipantUpdate(participants []*livekit.ParticipantInfo)
continue
}

p := r.GetParticipant(pi.Sid)
isNew := p == nil
rp := r.GetParticipant(pi.Sid)
isNew := rp == nil

if pi.State == livekit.ParticipantInfo_DISCONNECTED {
// remove
if p != nil {
r.handleParticipantDisconnect(p)
if rp != nil {
r.handleParticipantDisconnect(rp)
}
} else if isNew {
p = r.addRemoteParticipant(pi, true)
go r.callback.OnParticipantConnected(p)
rp = r.addRemoteParticipant(pi, true)
go r.callback.OnParticipantConnected(rp)
} else {
p.updateInfo(pi)
rp.updateInfo(pi)
}
}
}
Expand Down

0 comments on commit a30cb3f

Please sign in to comment.