Skip to content

Commit

Permalink
Simplify handling of voice tracks (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 authored Oct 31, 2022
1 parent b256ced commit 33bef20
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 116 deletions.
16 changes: 7 additions & 9 deletions service/rtc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ func (c *call) addSession(cfg SessionConfig, rtcConn *webrtc.PeerConnection, clo
}

s := &session{
cfg: cfg,
rtcConn: rtcConn,
iceInCh: make(chan []byte, signalChSize*2),
sdpInCh: make(chan []byte, signalChSize),
closeCh: make(chan struct{}),
closeCb: closeCb,
tracksCh: make(chan *webrtc.TrackLocalStaticRTP, tracksChSize),
trackEnableCh: make(chan bool, tracksChSize),
rtpSendersMap: make(map[*webrtc.TrackLocalStaticRTP]*webrtc.RTPSender),
cfg: cfg,
rtcConn: rtcConn,
iceInCh: make(chan []byte, signalChSize*2),
sdpInCh: make(chan []byte, signalChSize),
closeCh: make(chan struct{}),
closeCb: closeCb,
tracksCh: make(chan *webrtc.TrackLocalStaticRTP, tracksChSize),
}

c.sessions[cfg.SessionID] = s
Expand Down
22 changes: 18 additions & 4 deletions service/rtc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,25 @@ func (s *Server) msgReader() {
}
call.mut.Unlock()
case MuteMessage, UnmuteMessage:
select {
case session.trackEnableCh <- (msg.Type == MuteMessage):
default:
s.log.Error("failed to send track enable message: channel is full")
session.mut.RLock()
track := session.outVoiceTrack
session.mut.RUnlock()
if track == nil {
continue
}

var enabled bool
if msg.Type == UnmuteMessage {
enabled = true
}

s.log.Debug("setting voice track state",
mlog.Bool("enabled", enabled),
mlog.String("sessionID", session.cfg.SessionID))

session.mut.Lock()
session.outVoiceTrackEnabled = enabled
session.mut.Unlock()
default:
s.log.Error("received unexpected message type")
}
Expand Down
38 changes: 11 additions & 27 deletions service/rtc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"sync"
"time"

"github.com/mattermost/rtcd/service/random"

"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"

Expand All @@ -35,8 +33,6 @@ type session struct {
remoteScreenTrack *webrtc.TrackRemote
rtcConn *webrtc.PeerConnection
tracksCh chan *webrtc.TrackLocalStaticRTP
trackEnableCh chan bool
rtpSendersMap map[*webrtc.TrackLocalStaticRTP]*webrtc.RTPSender
iceInCh chan []byte
sdpInCh chan []byte

Expand Down Expand Up @@ -113,18 +109,18 @@ func (s *session) handleICE(log mlog.LoggerIFace, m Metrics) {

var candidate webrtc.ICECandidateInit
if err := json.Unmarshal(data, &candidate); err != nil {
log.Error("failed to encode ice candidate", mlog.Err(err))
log.Error("failed to encode ice candidate", mlog.Err(err), mlog.String("sessionID", s.cfg.SessionID))
continue
}

if candidate.Candidate == "" {
continue
}

log.Debug("setting ICE candidate for remote")
log.Debug("setting ICE candidate for remote", mlog.String("sessionID", s.cfg.SessionID))

if err := s.rtcConn.AddICECandidate(candidate); err != nil {
log.Error("failed to add ice candidate", mlog.Err(err))
log.Error("failed to add ice candidate", mlog.Err(err), mlog.String("sessionID", s.cfg.SessionID))
m.IncRTCErrors(s.cfg.GroupID, "ice")
continue
}
Expand All @@ -141,25 +137,26 @@ func (s *session) handlePLI(log mlog.LoggerIFace, call *call, sender *webrtc.RTP
for {
pkts, _, err := sender.ReadRTCP()
if err != nil {
log.Error("failed to read RTCP packet", mlog.Err(err))
log.Error("failed to read RTCP packet",
mlog.Err(err), mlog.String("sessionID", s.cfg.SessionID))
return
}
for _, pkt := range pkts {
if _, ok := pkt.(*rtcp.PictureLossIndication); ok {
screenSession := call.getScreenSession()
if screenSession == nil {
log.Error("screenSession should not be nil")
log.Error("screenSession should not be nil", mlog.String("sessionID", s.cfg.SessionID))
return
}

screenTrack := screenSession.getRemoteScreenTrack()
if screenTrack == nil {
log.Error("screenTrack should not be nil")
log.Error("screenTrack should not be nil", mlog.String("sessionID", s.cfg.SessionID))
return
}

if err := screenSession.rtcConn.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(screenTrack.SSRC())}}); err != nil {
log.Error("failed to write RTCP packet", mlog.Err(err))
log.Error("failed to write RTCP packet", mlog.Err(err), mlog.String("sessionID", s.cfg.SessionID))
return
}
}
Expand All @@ -168,20 +165,11 @@ func (s *session) handlePLI(log mlog.LoggerIFace, call *call, sender *webrtc.RTP
}

// addTrack adds the given track to the peer and starts negotiation.
func (s *session) addTrack(log mlog.LoggerIFace, c *call, sdpOutCh chan<- Message, track *webrtc.TrackLocalStaticRTP, enabled bool) error {
t := track
if !enabled {
dummyTrack, err := webrtc.NewTrackLocalStaticRTP(rtpAudioCodec, "voice", random.NewID())
if err != nil {
return fmt.Errorf("failed to create new static track: %w", err)
}
t = dummyTrack
}

sender, err := s.rtcConn.AddTrack(t)
func (s *session) addTrack(log mlog.LoggerIFace, c *call, sdpOutCh chan<- Message, track *webrtc.TrackLocalStaticRTP) error {
sender, err := s.rtcConn.AddTrack(track)
if err != nil {
return fmt.Errorf("failed to add track: %w", err)
} else if t.Kind() == webrtc.RTPCodecTypeVideo {
} else if track.Kind() == webrtc.RTPCodecTypeVideo {
go s.handlePLI(log, c, sender)
}

Expand Down Expand Up @@ -223,10 +211,6 @@ func (s *session) addTrack(log mlog.LoggerIFace, c *call, sdpOutCh chan<- Messag
return fmt.Errorf("failed to set remote description: %w", err)
}

s.mut.Lock()
s.rtpSendersMap[track] = sender
s.mut.Unlock()

return nil
}

Expand Down
Loading

0 comments on commit 33bef20

Please sign in to comment.