diff --git a/peerconnection.go b/peerconnection.go index e3389f5fbb4..900dae1a8af 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1608,68 +1608,71 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err // undeclaredMediaProcessor handles RTP/RTCP packets that don't match any a:ssrc lines func (pc *PeerConnection) undeclaredMediaProcessor() { - go func() { - var simulcastRoutineCount uint64 - for { - srtpSession, err := pc.dtlsTransport.getSRTPSession() - if err != nil { - pc.log.Warnf("undeclaredMediaProcessor failed to open SrtpSession: %v", err) - return - } + go pc.undeclaredRTPMediaProcessor() + go pc.undeclaredRTCPMediaProcessor() +} - stream, ssrc, err := srtpSession.AcceptStream() - if err != nil { - pc.log.Warnf("Failed to accept RTP %v", err) - return - } +func (pc *PeerConnection) undeclaredRTPMediaProcessor() { + var simulcastRoutineCount uint64 + for { + srtpSession, err := pc.dtlsTransport.getSRTPSession() + if err != nil { + pc.log.Warnf("undeclaredMediaProcessor failed to open SrtpSession: %v", err) + return + } - if pc.isClosed.get() { - if err = stream.Close(); err != nil { - pc.log.Warnf("Failed to close RTP stream %v", err) - } - continue - } + stream, ssrc, err := srtpSession.AcceptStream() + if err != nil { + pc.log.Warnf("Failed to accept RTP %v", err) + return + } - if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines { - atomic.AddUint64(&simulcastRoutineCount, ^uint64(0)) - pc.log.Warn(ErrSimulcastProbeOverflow.Error()) - pc.dtlsTransport.storeSimulcastStream(stream) - continue + if pc.isClosed.get() { + if err = stream.Close(); err != nil { + pc.log.Warnf("Failed to close RTP stream %v", err) } + continue + } - go func(rtpStream io.Reader, ssrc SSRC) { - if err := pc.handleIncomingSSRC(rtpStream, ssrc); err != nil { - pc.log.Errorf(incomingUnhandledRTPSsrc, ssrc, err) - pc.dtlsTransport.storeSimulcastStream(stream) - } - atomic.AddUint64(&simulcastRoutineCount, ^uint64(0)) - }(stream, SSRC(ssrc)) + if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines { + atomic.AddUint64(&simulcastRoutineCount, ^uint64(0)) + pc.log.Warn(ErrSimulcastProbeOverflow.Error()) + pc.dtlsTransport.storeSimulcastStream(stream) + continue } - }() - go func() { - var unhandledStreams []*srtp.ReadStreamSRTCP - defer func() { - for _, s := range unhandledStreams { - s.Close() - } - }() - for { - srtcpSession, err := pc.dtlsTransport.getSRTCPSession() - if err != nil { - pc.log.Warnf("undeclaredMediaProcessor failed to open SrtcpSession: %v", err) - return + go func(rtpStream io.Reader, ssrc SSRC) { + if err := pc.handleIncomingSSRC(rtpStream, ssrc); err != nil { + pc.log.Errorf(incomingUnhandledRTPSsrc, ssrc, err) + pc.dtlsTransport.storeSimulcastStream(stream) } + atomic.AddUint64(&simulcastRoutineCount, ^uint64(0)) + }(stream, SSRC(ssrc)) + } +} - stream, ssrc, err := srtcpSession.AcceptStream() - if err != nil { - pc.log.Warnf("Failed to accept RTCP %v", err) - return - } - pc.log.Warnf("Incoming unhandled RTCP ssrc(%d), OnTrack will not be fired", ssrc) - unhandledStreams = append(unhandledStreams, stream) +func (pc *PeerConnection) undeclaredRTCPMediaProcessor() { + var unhandledStreams []*srtp.ReadStreamSRTCP + defer func() { + for _, s := range unhandledStreams { + _ = s.Close() } }() + for { + srtcpSession, err := pc.dtlsTransport.getSRTCPSession() + if err != nil { + pc.log.Warnf("undeclaredMediaProcessor failed to open SrtcpSession: %v", err) + return + } + + stream, ssrc, err := srtcpSession.AcceptStream() + if err != nil { + pc.log.Warnf("Failed to accept RTCP %v", err) + return + } + pc.log.Warnf("Incoming unhandled RTCP ssrc(%d), OnTrack will not be fired", ssrc) + unhandledStreams = append(unhandledStreams, stream) + } } // RemoteDescription returns pendingRemoteDescription if it is not null and