From 99287252b786637455f009235a5b3f760ef9f2a7 Mon Sep 17 00:00:00 2001 From: Paul-Louis Ageneau Date: Sat, 17 Feb 2024 20:32:08 +0100 Subject: [PATCH] Properly implement the renegotiation needed mechanism --- include/rtc/description.hpp | 6 +- include/rtc/peerconnection.hpp | 1 + src/capi.cpp | 4 +- src/description.cpp | 11 ++- src/impl/peerconnection.cpp | 122 ++++++++++++++++++++++++++------- src/impl/peerconnection.hpp | 8 ++- src/peerconnection.cpp | 56 ++++++++------- 7 files changed, 146 insertions(+), 62 deletions(-) diff --git a/include/rtc/description.hpp b/include/rtc/description.hpp index ac24f4306..c1ce87b1d 100644 --- a/include/rtc/description.hpp +++ b/include/rtc/description.hpp @@ -281,9 +281,9 @@ class RTC_CPP_EXPORT Description { int addAudio(string mid = "audio", Direction dir = Direction::SendOnly); void clearMedia(); - variant media(unsigned int index); - variant media(unsigned int index) const; - unsigned int mediaCount() const; + variant media(int index); + variant media(int index) const; + int mediaCount() const; const Application *application() const; Application *application(); diff --git a/include/rtc/peerconnection.hpp b/include/rtc/peerconnection.hpp index 4f0a049ca..d2d73537d 100644 --- a/include/rtc/peerconnection.hpp +++ b/include/rtc/peerconnection.hpp @@ -86,6 +86,7 @@ class RTC_CPP_EXPORT PeerConnection final : CheshireCat { IceState iceState() const; GatheringState gatheringState() const; SignalingState signalingState() const; + bool negotiationNeeded() const; bool hasMedia() const; optional localDescription() const; optional remoteDescription() const; diff --git a/src/capi.cpp b/src/capi.cpp index 4fa456392..bf7bca3c0 100644 --- a/src/capi.cpp +++ b/src/capi.cpp @@ -1440,7 +1440,7 @@ int rtcGetSsrcsForType(const char *mediaType, const char *sdp, uint32_t *buffer, auto oldSDP = string(sdp); auto description = Description(oldSDP, "unspec"); auto mediaCount = description.mediaCount(); - for (unsigned int i = 0; i < mediaCount; i++) { + for (int i = 0; i < mediaCount; i++) { if (std::holds_alternative(description.media(i))) { auto media = std::get(description.media(i)); auto currentMediaType = lowercased(media->type()); @@ -1461,7 +1461,7 @@ int rtcSetSsrcForType(const char *mediaType, const char *sdp, char *buffer, cons auto prevSDP = string(sdp); auto description = Description(prevSDP, "unspec"); auto mediaCount = description.mediaCount(); - for (unsigned int i = 0; i < mediaCount; i++) { + for (int i = 0; i < mediaCount; i++) { if (std::holds_alternative(description.media(i))) { auto media = std::get(description.media(i)); auto currentMediaType = lowercased(media->type()); diff --git a/src/description.cpp b/src/description.cpp index 77d0d8033..27bee555a 100644 --- a/src/description.cpp +++ b/src/description.cpp @@ -493,8 +493,8 @@ void Description::clearMedia() { mApplication.reset(); } -variant Description::media(unsigned int index) { - if (index >= mEntries.size()) +variant Description::media(int index) { + if (index < 0 || index >= int(mEntries.size())) throw std::out_of_range("Media index out of range"); const auto &entry = mEntries[index]; @@ -514,9 +514,8 @@ variant Description::media(uns } } -variant -Description::media(unsigned int index) const { - if (index >= mEntries.size()) +variant Description::media(int index) const { + if (index < 0 || index >= int(mEntries.size())) throw std::out_of_range("Media index out of range"); const auto &entry = mEntries[index]; @@ -536,7 +535,7 @@ Description::media(unsigned int index) const { } } -unsigned int Description::mediaCount() const { return unsigned(mEntries.size()); } +int Description::mediaCount() const { return int(mEntries.size()); } Description::Entry::Entry(const string &mline, string mid, Direction dir) : mMid(std::move(mid)), mDirection(dir) { diff --git a/src/impl/peerconnection.cpp b/src/impl/peerconnection.cpp index 39f94dd44..aee3906f7 100644 --- a/src/impl/peerconnection.cpp +++ b/src/impl/peerconnection.cpp @@ -85,7 +85,6 @@ PeerConnection::~PeerConnection() { } void PeerConnection::close() { - negotiationNeeded = false; if (!closing.exchange(true)) { PLOG_VERBOSE << "Closing PeerConnection"; if (auto transport = std::atomic_load(&mSctpTransport)) @@ -829,27 +828,58 @@ void PeerConnection::iterateTracks(std::function track)> } } +void PeerConnection::iterateRemoteTracks(std::function track)> func) { + auto remote = remoteDescription(); + if(!remote) + return; + + std::vector> locked; + { + std::shared_lock lock(mTracksMutex); // read-only + locked.reserve(mTracks.size()); + for(int i = 0; i < remote->mediaCount(); ++i) { + if (std::holds_alternative(remote->media(i))) { + auto remoteMedia = std::get(remote->media(i)); + if (!remoteMedia->isRemoved()) + if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) + if (auto track = it->second.lock()) + locked.push_back(std::move(track)); + } + } + } + + for (auto &track : locked) { + try { + func(std::move(track)); + } catch (const std::exception &e) { + PLOG_WARNING << e.what(); + } + } +} + + void PeerConnection::openTracks() { #if RTC_ENABLE_MEDIA - if (auto transport = std::atomic_load(&mDtlsTransport)) { - auto srtpTransport = std::dynamic_pointer_cast(transport); - - iterateTracks([&](const shared_ptr &track) { - if (!track->isOpen()) { - if (srtpTransport) { - track->open(srtpTransport); - } else { - // A track was added during a latter renegotiation, whereas SRTP transport was - // not initialized. This is an optimization to use the library with data - // channels only. Set forceMediaTransport to true to initialize the transport - // before dynamically adding tracks. - auto errorMsg = "The connection has no media transport"; - PLOG_ERROR << errorMsg; - track->triggerError(errorMsg); - } + auto transport = std::atomic_load(&mDtlsTransport); + if (!transport) + return; + + auto srtpTransport = std::dynamic_pointer_cast(transport); + iterateRemoteTracks([&](shared_ptr track) { + if(!track->isOpen()) { + if (srtpTransport) { + track->open(srtpTransport); + } else { + // A track was added during a latter renegotiation, whereas SRTP transport was + // not initialized. This is an optimization to use the library with data + // channels only. Set forceMediaTransport to true to initialize the transport + // before dynamically adding tracks. + auto errorMsg = "The connection has no media transport"; + PLOG_ERROR << errorMsg; + track->triggerError(errorMsg); } - }); - } + } + }); #endif } @@ -872,7 +902,7 @@ void PeerConnection::validateRemoteDescription(const Description &description) { throw std::invalid_argument("Remote description has no media line"); int activeMediaCount = 0; - for (unsigned int i = 0; i < description.mediaCount(); ++i) + for (int i = 0; i < description.mediaCount(); ++i) std::visit(rtc::overloaded{[&](const Description::Application *application) { if (!application->isRemoved()) ++activeMediaCount; @@ -900,7 +930,7 @@ void PeerConnection::processLocalDescription(Description description) { if (auto remote = remoteDescription()) { // Reciprocate remote description - for (unsigned int i = 0; i < remote->mediaCount(); ++i) + for (int i = 0; i < remote->mediaCount(); ++i) std::visit( // reciprocate each media rtc::overloaded{ [&](Description::Application *remoteApp) { @@ -1027,8 +1057,7 @@ void PeerConnection::processLocalDescription(Description description) { } } - // There might be no media at this point if the user created a Track, deleted it, - // then called setLocalDescription(). + // There might be no media at this point, for instance if the user deleted tracks if (description.mediaCount() == 0) throw std::runtime_error("No DataChannel or Track to negotiate"); } @@ -1102,8 +1131,8 @@ void PeerConnection::processRemoteDescription(Description description) { mRemoteDescription->addCandidates(std::move(existingCandidates)); } + auto dtlsTransport = std::atomic_load(&mDtlsTransport); if (description.hasApplication()) { - auto dtlsTransport = std::atomic_load(&mDtlsTransport); auto sctpTransport = std::atomic_load(&mSctpTransport); if (!sctpTransport && dtlsTransport && dtlsTransport->state() == Transport::State::Connected) @@ -1111,6 +1140,10 @@ void PeerConnection::processRemoteDescription(Description description) { } else { mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this()); } + + if (dtlsTransport && dtlsTransport->state() == Transport::State::Connected) + mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this()); + } void PeerConnection::processRemoteCandidate(Candidate candidate) { @@ -1156,6 +1189,45 @@ string PeerConnection::localBundleMid() const { return mLocalDescription ? mLocalDescription->bundleMid() : "0"; } +bool PeerConnection::negotiationNeeded() const { + auto description = localDescription(); + + { + std::shared_lock lock(mDataChannelsMutex); + if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) + if(!description || !description->hasApplication()) { + PLOG_DEBUG << "Negotiation needed for data channels"; + return true; + } + } + + { + std::shared_lock lock(mTracksMutex); + for(const auto &[mid, weakTrack] : mTracks) + if (auto track = weakTrack.lock()) + if (!description || !description->hasMid(track->mid())) { + PLOG_DEBUG << "Negotiation needed to add track, mid=" << track->mid(); + return true; + } + + if(description) { + for(int i = 0; i < description->mediaCount(); ++i) { + if (std::holds_alternative(description->media(i))) { + auto media = std::get(description->media(i)); + if (!media->isRemoved()) + if (auto it = mTracks.find(media->mid()); it != mTracks.end()) + if (auto track = it->second.lock(); !track || track->isClosed()) { + PLOG_DEBUG << "Negotiation needed to remove track, mid=" << track->mid(); + return true; + } + } + } + } + } + + return false; +} + void PeerConnection::setMediaHandler(shared_ptr handler) { std::unique_lock lock(mMediaHandlerMutex); mMediaHandler = handler; @@ -1321,7 +1393,7 @@ void PeerConnection::updateTrackSsrcCache(const Description &description) { std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc // Setup SSRC -> Track mapping - for (unsigned int i = 0; i < description.mediaCount(); ++i) + for (int i = 0; i < description.mediaCount(); ++i) std::visit( // ssrc -> track mapping rtc::overloaded{ [&](Description::Application const *) { return; }, diff --git a/src/impl/peerconnection.hpp b/src/impl/peerconnection.hpp index a72f58575..8f98ff6ed 100644 --- a/src/impl/peerconnection.hpp +++ b/src/impl/peerconnection.hpp @@ -70,6 +70,7 @@ struct PeerConnection : std::enable_shared_from_this { shared_ptr emplaceTrack(Description::Media description); void iterateTracks(std::function track)> func); + void iterateRemoteTracks(std::function track)> func); void openTracks(); void closeTracks(); @@ -80,6 +81,8 @@ struct PeerConnection : std::enable_shared_from_this { void processRemoteCandidate(Candidate candidate); string localBundleMid() const; + bool negotiationNeeded() const; + void setMediaHandler(shared_ptr handler); shared_ptr getMediaHandler(); @@ -115,7 +118,6 @@ struct PeerConnection : std::enable_shared_from_this { std::atomic iceState = IceState::New; std::atomic gatheringState = GatheringState::New; std::atomic signalingState = SignalingState::Stable; - std::atomic negotiationNeeded = false; std::atomic closing = false; std::mutex signalingMutex; @@ -154,12 +156,12 @@ struct PeerConnection : std::enable_shared_from_this { std::unordered_map> mDataChannels; // by stream ID std::vector> mUnassignedDataChannels; - std::shared_mutex mDataChannelsMutex; + mutable std::shared_mutex mDataChannelsMutex; std::unordered_map> mTracks; // by mid std::unordered_map> mTracksBySsrc; // by SSRC std::vector> mTrackLines; // by SDP order - std::shared_mutex mTracksMutex; + mutable std::shared_mutex mTracksMutex; Queue> mPendingDataChannels; Queue> mPendingTracks; diff --git a/src/peerconnection.cpp b/src/peerconnection.cpp index 20d6b1f75..5e6cd2739 100644 --- a/src/peerconnection.cpp +++ b/src/peerconnection.cpp @@ -61,6 +61,10 @@ PeerConnection::SignalingState PeerConnection::signalingState() const { return impl()->signalingState; } +bool PeerConnection::negotiationNeeded() const { + return impl()->negotiationNeeded(); +} + optional PeerConnection::localDescription() const { return impl()->localDescription(); } @@ -98,12 +102,6 @@ void PeerConnection::setLocalDescription(Description::Type type, LocalDescriptio type = Description::Type::Offer; } - // Only a local offer resets the negotiation needed flag - if (type == Description::Type::Offer && !impl()->negotiationNeeded.exchange(false)) { - PLOG_DEBUG << "No negotiation needed"; - return; - } - // Get the new signaling state SignalingState newSignalingState; switch (signalingState) { @@ -151,6 +149,12 @@ void PeerConnection::setLocalDescription(Description::Type type, LocalDescriptio impl()->changeSignalingState(newSignalingState); signalingLock.unlock(); + if (!impl()->config.disableAutoNegotiation && newSignalingState == SignalingState::Stable) { + // We might need to make a new offer + if (impl()->negotiationNeeded()) + setLocalDescription(Description::Type::Offer); + } + if (impl()->gatheringState == GatheringState::New && !impl()->config.disableAutoGathering) { iceTransport->gatherLocalCandidates(impl()->localBundleMid()); } @@ -239,7 +243,6 @@ void PeerConnection::setRemoteDescription(Description description) { // Candidates will be added at the end, extract them for now auto remoteCandidates = description.extractCandidates(); - auto type = description.type(); auto iceTransport = impl()->initIceTransport(); if (!iceTransport) @@ -251,14 +254,26 @@ void PeerConnection::setRemoteDescription(Description description) { impl()->changeSignalingState(newSignalingState); signalingLock.unlock(); - if (type == Description::Type::Offer) { - // This is an offer, we need to answer - if (!impl()->config.disableAutoNegotiation) - setLocalDescription(Description::Type::Answer); - } - for (const auto &candidate : remoteCandidates) addRemoteCandidate(candidate); + + if (!impl()->config.disableAutoNegotiation) { + switch (newSignalingState) { + case SignalingState::Stable: + // We might need to make a new offer + if (impl()->negotiationNeeded()) + setLocalDescription(Description::Type::Offer); + break; + + case SignalingState::HaveRemoteOffer: + // We need to answer + setLocalDescription(Description::Type::Answer); + break; + + default: + break; + } + } } void PeerConnection::addRemoteCandidate(Candidate candidate) { @@ -289,13 +304,11 @@ shared_ptr PeerConnection::createDataChannel(string label, DataChan auto channelImpl = impl()->emplaceDataChannel(std::move(label), std::move(init)); auto channel = std::make_shared(channelImpl); - // Renegotiation is needed iff the current local description does not have application - auto local = impl()->localDescription(); - if (!local || !local->hasApplication()) - impl()->negotiationNeeded = true; - - if (!impl()->config.disableAutoNegotiation) - setLocalDescription(); + if (!impl()->config.disableAutoNegotiation && impl()->signalingState.load() == SignalingState::Stable) { + // We might need to make a new offer + if (impl()->negotiationNeeded()) + setLocalDescription(Description::Type::Offer); + } return channel; } @@ -310,9 +323,6 @@ std::shared_ptr PeerConnection::addTrack(Description::Media description) auto trackImpl = impl()->emplaceTrack(std::move(description)); auto track = std::make_shared(trackImpl); - // Renegotiation is needed for the new or updated track - impl()->negotiationNeeded = true; - return track; }