From 1d553f64b487f45193e77ad4496e3a435574d2bf Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Sun, 7 Apr 2024 00:02:49 -0400 Subject: [PATCH] Add PacingHandler MediaHandler that can be used to pace packet delivery Resolves #1017 --- CMakeLists.txt | 2 ++ include/rtc/pacinghandler.hpp | 51 ++++++++++++++++++++++++++ include/rtc/rtc.hpp | 1 + src/impl/track.cpp | 12 +++++-- src/pacinghandler.cpp | 68 +++++++++++++++++++++++++++++++++++ 5 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 include/rtc/pacinghandler.hpp create mode 100644 src/pacinghandler.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index cd8f0476a..034697b03 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp ) set(LIBDATACHANNEL_HEADERS @@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h ) diff --git a/include/rtc/pacinghandler.hpp b/include/rtc/pacinghandler.hpp new file mode 100644 index 000000000..cf8d78321 --- /dev/null +++ b/include/rtc/pacinghandler.hpp @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2020 Staz Modrzynski + * Copyright (c) 2020 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#ifndef RTC_PACING_HANDLER_H +#define RTC_PACING_HANDLER_H + +#if RTC_ENABLE_MEDIA + +#include "mediahandler.hpp" +#include "utils.hpp" + +#include +#include + +namespace rtc { + +// Paced sending of RTP packets. Takes a stream of RTP packets that can an +// uneven bitrate. It then delivers these packets in a smoother manner by +// sending a fixed size of them on an interval +class RTC_CPP_EXPORT PacingHandler : public MediaHandler { +public: + PacingHandler(double mBytesPerSecond, std::chrono::milliseconds sendInterval); + + void outgoing(message_vector &messages, const message_callback &send) override; + +private: + std::atomic mHaveScheduled = false; + + double mBytesPerSecond; + double mBudget; + + std::chrono::milliseconds mSendInterval; + std::chrono::time_point mLastRun; + + std::mutex mMutex; + std::queue mRtpBuffer = {}; + + void schedule(const message_callback &send); +}; + +} // namespace rtc + +#endif // RTC_ENABLE_MEDIA + +#endif // RTC_PACING_HANDLER_H diff --git a/include/rtc/rtc.hpp b/include/rtc/rtc.hpp index 683219fc9..2f3f46b59 100644 --- a/include/rtc/rtc.hpp +++ b/include/rtc/rtc.hpp @@ -34,6 +34,7 @@ #include "h265rtppacketizer.hpp" #include "mediahandler.hpp" #include "plihandler.hpp" +#include "pacinghandler.hpp" #include "rtcpnackresponder.hpp" #include "rtcpreceivingsession.hpp" #include "rtcpsrreporter.hpp" diff --git a/src/impl/track.cpp b/src/impl/track.cpp index bb2d29557..155c8afaf 100644 --- a/src/impl/track.cpp +++ b/src/impl/track.cpp @@ -142,7 +142,11 @@ void Track::incoming(message_ptr message) { message_vector messages{std::move(message)}; if (auto handler = getMediaHandler()) - handler->incomingChain(messages, [this](message_ptr m) { transportSend(m); }); + handler->incomingChain(messages, [weak_this = weak_from_this()](message_ptr m) { + if (auto locked = weak_this.lock()) { + locked->transportSend(m); + } + }); for (auto &m : messages) { // Tail drop if queue is full @@ -175,7 +179,11 @@ bool Track::outgoing(message_ptr message) { if (handler) { message_vector messages{std::move(message)}; - handler->outgoingChain(messages, [this](message_ptr m) { transportSend(m); }); + handler->outgoingChain(messages, [weak_this = weak_from_this()](message_ptr m) { + if (auto locked = weak_this.lock()) { + locked->transportSend(m); + } + }); bool ret = false; for (auto &m : messages) ret = transportSend(std::move(m)); diff --git a/src/pacinghandler.cpp b/src/pacinghandler.cpp new file mode 100644 index 000000000..b9b928a31 --- /dev/null +++ b/src/pacinghandler.cpp @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2020 Filip Klembara (in2core) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#if RTC_ENABLE_MEDIA + +#include + +#include "pacinghandler.hpp" + +#include "impl/internals.hpp" +#include "impl/threadpool.hpp" + +namespace rtc { + +PacingHandler::PacingHandler(double bytesPerSecond, std::chrono::milliseconds sendInterval) + : mBytesPerSecond(bytesPerSecond), mSendInterval(sendInterval){}; + +void PacingHandler::schedule(const message_callback &send) { + if (!mHaveScheduled.exchange(true)) { + return; + } + + impl::ThreadPool::Instance().schedule(mSendInterval, [weak_this = weak_from_this(), send]() { + if (auto locked = std::dynamic_pointer_cast(weak_this.lock())) { + const std::lock_guard lock(locked->mMutex); + locked->mHaveScheduled.store(false); + + // Update the budget and cap it + auto newBudget = std::chrono::duration( + std::chrono::high_resolution_clock::now() - locked->mLastRun) * + locked->mBytesPerSecond; + auto maxBudget = + std::chrono::duration(locked->mSendInterval) * locked->mBytesPerSecond; + locked->mBudget = std::min(locked->mBudget + newBudget.count(), maxBudget.count()); + + // Send packets while there is budget, allow a single partial packet over budget + while (!locked->mRtpBuffer.empty() && locked->mBudget > 0) { + auto size = int(locked->mRtpBuffer.front()->size()); + send(std::move(locked->mRtpBuffer.front())); + locked->mRtpBuffer.pop(); + locked->mBudget -= size; + } + + locked->mLastRun = std::chrono::high_resolution_clock::now(); + } + }); +} + +void PacingHandler::outgoing(message_vector &messages, const message_callback &send) { + + std::lock_guard lock(mMutex); + + while (messages.size() > 0) { + mRtpBuffer.push(std::move(messages.front())); + messages.erase(messages.begin()); + } + + schedule(send); +} + +} // namespace rtc + +#endif /* RTC_ENABLE_MEDIA */