From 56d8de09fd0529996eb91606f3ee469faeab976c Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Sun, 7 Apr 2024 00:02:49 -0400 Subject: [PATCH 1/4] Add PacingHandler MediaHandler that can be used to pace packet delivery Resolves #1017 Co-authored-by: Paul-Louis Ageneau --- CMakeLists.txt | 2 + include/rtc/pacinghandler.hpp | 51 ++++++++++++++++++++++++ include/rtc/rtc.hpp | 1 + src/impl/track.cpp | 12 +++++- src/pacinghandler.cpp | 75 +++++++++++++++++++++++++++++++++++ 5 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 include/rtc/pacinghandler.hpp create mode 100644 src/pacinghandler.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index cd8f0476a..034697b03 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp ) set(LIBDATACHANNEL_HEADERS @@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h ) diff --git a/include/rtc/pacinghandler.hpp b/include/rtc/pacinghandler.hpp new file mode 100644 index 000000000..286a93f6e --- /dev/null +++ b/include/rtc/pacinghandler.hpp @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2020 Staz Modrzynski + * Copyright (c) 2020 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#ifndef RTC_PACING_HANDLER_H +#define RTC_PACING_HANDLER_H + +#if RTC_ENABLE_MEDIA + +#include "mediahandler.hpp" +#include "utils.hpp" + +#include +#include + +namespace rtc { + +// Paced sending of RTP packets. Takes a stream of RTP packets that can an +// uneven bitrate. It then delivers these packets in a smoother manner by +// sending a fixed size of them on an interval +class RTC_CPP_EXPORT PacingHandler : public MediaHandler { +public: + PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval); + + void outgoing(message_vector &messages, const message_callback &send) override; + +private: + std::atomic mHaveScheduled = false; + + double mBytesPerSecond; + double mBudget; + + std::chrono::milliseconds mSendInterval; + std::chrono::time_point mLastRun; + + std::mutex mMutex; + std::queue mRtpBuffer; + + void schedule(const message_callback &send); +}; + +} // namespace rtc + +#endif // RTC_ENABLE_MEDIA + +#endif // RTC_PACING_HANDLER_H diff --git a/include/rtc/rtc.hpp b/include/rtc/rtc.hpp index 683219fc9..2f3f46b59 100644 --- a/include/rtc/rtc.hpp +++ b/include/rtc/rtc.hpp @@ -34,6 +34,7 @@ #include "h265rtppacketizer.hpp" #include "mediahandler.hpp" #include "plihandler.hpp" +#include "pacinghandler.hpp" #include "rtcpnackresponder.hpp" #include "rtcpreceivingsession.hpp" #include "rtcpsrreporter.hpp" diff --git a/src/impl/track.cpp b/src/impl/track.cpp index bb2d29557..588045755 100644 --- a/src/impl/track.cpp +++ b/src/impl/track.cpp @@ -142,7 +142,11 @@ void Track::incoming(message_ptr message) { message_vector messages{std::move(message)}; if (auto handler = getMediaHandler()) - handler->incomingChain(messages, [this](message_ptr m) { transportSend(m); }); + handler->incomingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) { + if (auto locked = weak_this.lock()) { + transportSend(m); + } + }); for (auto &m : messages) { // Tail drop if queue is full @@ -175,7 +179,11 @@ bool Track::outgoing(message_ptr message) { if (handler) { message_vector messages{std::move(message)}; - handler->outgoingChain(messages, [this](message_ptr m) { transportSend(m); }); + handler->outgoingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) { + if (auto locked = weak_this.lock()) { + transportSend(m); + } + }); bool ret = false; for (auto &m : messages) ret = transportSend(std::move(m)); diff --git a/src/pacinghandler.cpp b/src/pacinghandler.cpp new file mode 100644 index 000000000..2dccadc01 --- /dev/null +++ b/src/pacinghandler.cpp @@ -0,0 +1,75 @@ +/** + * Copyright (c) 2020 Filip Klembara (in2core) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#if RTC_ENABLE_MEDIA + +#include + +#include "pacinghandler.hpp" + +#include "impl/internals.hpp" +#include "impl/threadpool.hpp" + +namespace rtc { + +PacingHandler::PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval) + : mBytesPerSecond(bitsPerSecond / 8), mBudget(0), mSendInterval(sendInterval){}; + +void PacingHandler::schedule(const message_callback &send) { + if (!mHaveScheduled.exchange(true)) { + return; + } + + impl::ThreadPool::Instance().schedule(mSendInterval, [this, weak_this = weak_from_this(), + send]() { + if (auto locked = weak_this.lock()) { + const std::lock_guard lock(mMutex); + mHaveScheduled.store(false); + + // Update the budget and cap it + auto newBudget = + std::chrono::duration(std::chrono::high_resolution_clock::now() - mLastRun) + .count() * + mBytesPerSecond; + auto maxBudget = std::chrono::duration(mSendInterval).count() * mBytesPerSecond; + mBudget = std::min(mBudget + newBudget, maxBudget); + mLastRun = std::chrono::high_resolution_clock::now(); + + // Send packets while there is budget, allow a single partial packet over budget + while (!mRtpBuffer.empty() && mBudget > 0) { + auto size = int(mRtpBuffer.front()->size()); + send(std::move(mRtpBuffer.front())); + mRtpBuffer.pop(); + mBudget -= size; + } + + if (!mRtpBuffer.empty()) { + printf("\n more \n"); + schedule(send); + } else { + printf("\n empty \n"); + } + } + }); +} + +void PacingHandler::outgoing(message_vector &messages, const message_callback &send) { + + std::lock_guard lock(mMutex); + + for (auto &m : messages) { + mRtpBuffer.push(std::move(m)); + } + messages.clear(); + + schedule(send); +} + +} // namespace rtc + +#endif /* RTC_ENABLE_MEDIA */ From 1ab7d869e79ffdd156c70682a481334eb7a2b6aa Mon Sep 17 00:00:00 2001 From: Paul-Louis Ageneau Date: Fri, 26 Apr 2024 22:26:42 +0200 Subject: [PATCH 2/4] Remove debug printf --- src/pacinghandler.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/pacinghandler.cpp b/src/pacinghandler.cpp index 2dccadc01..f1f1cbbd0 100644 --- a/src/pacinghandler.cpp +++ b/src/pacinghandler.cpp @@ -49,10 +49,7 @@ void PacingHandler::schedule(const message_callback &send) { } if (!mRtpBuffer.empty()) { - printf("\n more \n"); schedule(send); - } else { - printf("\n empty \n"); } } }); From f63702c669db5f2fd276ef74d8ef1e0b35125d59 Mon Sep 17 00:00:00 2001 From: Paul-Louis Ageneau Date: Fri, 26 Apr 2024 22:40:08 +0200 Subject: [PATCH 3/4] Fix copyright in pacinghandler.hpp --- include/rtc/pacinghandler.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/rtc/pacinghandler.hpp b/include/rtc/pacinghandler.hpp index 286a93f6e..5e6ace6e3 100644 --- a/include/rtc/pacinghandler.hpp +++ b/include/rtc/pacinghandler.hpp @@ -1,6 +1,5 @@ /** - * Copyright (c) 2020 Staz Modrzynski - * Copyright (c) 2020 Paul-Louis Ageneau + * Copyright (c) 2024 Sean DuBois * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this From 118a692e41d58773ded9426606bec1bd9e9649a0 Mon Sep 17 00:00:00 2001 From: Paul-Louis Ageneau Date: Fri, 26 Apr 2024 22:41:02 +0200 Subject: [PATCH 4/4] Fix copyright in pacinghandler.cpp --- src/pacinghandler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pacinghandler.cpp b/src/pacinghandler.cpp index f1f1cbbd0..c8c447292 100644 --- a/src/pacinghandler.cpp +++ b/src/pacinghandler.cpp @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020 Filip Klembara (in2core) + * Copyright (c) 2024 Sean DuBois * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this