From c7cd078bef5f0aa95d5df791bcde6cb9d1663e77 Mon Sep 17 00:00:00 2001 From: ognkrmms <12650228+ognkrmms@users.noreply.github.com> Date: Tue, 2 Jan 2024 23:05:37 +0300 Subject: [PATCH] initial commit --- CMakeLists.txt | 2 ++ examples/streamer/main.cpp | 28 ++++++++++++++++ include/rtc/metronome.hpp | 44 ++++++++++++++++++++++++ include/rtc/rtc.hpp | 2 +- src/impl/track.cpp | 5 +-- src/impl/track.hpp | 1 + src/metronome.cpp | 68 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 147 insertions(+), 3 deletions(-) create mode 100644 include/rtc/metronome.hpp create mode 100644 src/metronome.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5600db211..5e6ca2038 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,6 +86,7 @@ set(LIBDATACHANNEL_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/metronome.cpp ) set(LIBDATACHANNEL_HEADERS @@ -119,6 +120,7 @@ set(LIBDATACHANNEL_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/metronome.hpp ) set(LIBDATACHANNEL_IMPL_SOURCES diff --git a/examples/streamer/main.cpp b/examples/streamer/main.cpp index 75a509b36..e38f1e871 100644 --- a/examples/streamer/main.cpp +++ b/examples/streamer/main.cpp @@ -71,6 +71,32 @@ const uint16_t defaultPort = 8000; string ip_address = defaultIPAddress; uint16_t port = defaultPort; +class BucketPacer final : public rtc::PacerAlgorithm { + using clock = std::chrono::steady_clock; + // Budget is remaining bitrate. Pace is the maximum bitrate. + unsigned int mBudget, mPaceInBytes; + clock::time_point mPrevTime; + +public: + BucketPacer(unsigned int paceInBytes) : mPaceInBytes(paceInBytes), mBudget(paceInBytes) { + mPrevTime = clock::now(); + } + + unsigned int getBudget() override { + auto now = clock::now(); + if (now - mPrevTime > std::chrono::seconds(1)) { + mBudget = mPaceInBytes; + mPrevTime = now; + } + return mBudget; + } + unsigned int getPace() override { return mPaceInBytes; } + void setPace(unsigned int pace) override { mPaceInBytes = pace; }; + void setBudget(unsigned int budget) override { mBudget = budget; }; + void resetBudget() override { mBudget = mPaceInBytes; } +}; + + /// Incomming message handler for websocket /// @param message Incommint message /// @param config Configuration @@ -217,6 +243,8 @@ shared_ptr addVideo(const shared_ptr pc, const // add RTCP NACK handler auto nackResponder = make_shared(); packetizer->addToChain(nackResponder); + auto pacer = make_shared(1000000, make_shared(300000), nullptr); + packetizer->addToChain(pacer); // set handler track->setMediaHandler(packetizer); track->onOpen(onOpen); diff --git a/include/rtc/metronome.hpp b/include/rtc/metronome.hpp new file mode 100644 index 000000000..a8df20558 --- /dev/null +++ b/include/rtc/metronome.hpp @@ -0,0 +1,44 @@ +#ifndef PACER_H +#define PACER_H + +#if RTC_ENABLE_MEDIA + +#include +#include +#include "mediahandler.hpp" + +namespace rtc { + +class RTC_CPP_EXPORT PacerAlgorithm { +public: + virtual unsigned int getBudget() = 0; + virtual unsigned int getPace() = 0; + virtual void setPace(unsigned int pace) = 0; + virtual void setBudget(unsigned int budget) = 0; + virtual void resetBudget() = 0; +}; + +class RTC_CPP_EXPORT Metronome final : public MediaHandler { + std::deque mSendQueue; + size_t mQueueSizeInBytes; + size_t mMaxQueueSizeInBytes; + std::mutex mSendQueueMutex; + + std::chrono::milliseconds mThreadDelay; + std::function mProcessPacketsCallback; // For bookkeeping of sent packets + std::shared_ptr mPacerAlgorithm; + +public: + Metronome(size_t maxQueueSizeInBytes, std::shared_ptr pacerAlgorithm, + std::function processPacketsCallback); + + void outgoing(message_vector &messages, const message_callback &send) override; + void senderProcess(const message_callback &send); + +}; + +} // namespace rtc + +#endif /* RTC_ENABLE_MEDIA */ + +#endif /* PACER_H */ diff --git a/include/rtc/rtc.hpp b/include/rtc/rtc.hpp index ed10fa31f..d3910eeb0 100644 --- a/include/rtc/rtc.hpp +++ b/include/rtc/rtc.hpp @@ -37,5 +37,5 @@ #include "rtcpreceivingsession.hpp" #include "rtcpsrreporter.hpp" #include "rtppacketizer.hpp" - +#include "metronome.hpp" #endif // RTC_ENABLE_MEDIA diff --git a/src/impl/track.cpp b/src/impl/track.cpp index 80927fd76..a01a5620e 100644 --- a/src/impl/track.cpp +++ b/src/impl/track.cpp @@ -21,7 +21,8 @@ static LogCounter COUNTER_QUEUE_FULL(plog::warning, Track::Track(weak_ptr pc, Description::Media desc) : mPeerConnection(pc), mMediaDescription(std::move(desc)), - mRecvQueue(RECV_QUEUE_LIMIT, [](const message_ptr &m) { return m->size(); }) { + mRecvQueue(RECV_QUEUE_LIMIT, [](const message_ptr &m) { return m->size(); }), + send_callback([this](message_ptr m) { transportSend(m); }) { // Discard messages by default if track is send only if (mMediaDescription.direction() == Description::Direction::SendOnly) @@ -176,7 +177,7 @@ bool Track::outgoing(message_ptr message) { if (handler) { message_vector messages{std::move(message)}; - handler->outgoingChain(messages, [this](message_ptr m) { transportSend(m); }); + handler->outgoingChain(messages, send_callback); bool ret = false; for (auto &m : messages) ret = transportSend(std::move(m)); diff --git a/src/impl/track.hpp b/src/impl/track.hpp index ea1446b34..756bfefe8 100644 --- a/src/impl/track.hpp +++ b/src/impl/track.hpp @@ -56,6 +56,7 @@ class Track final : public std::enable_shared_from_this, public Channel { #endif bool transportSend(message_ptr message); + const message_callback send_callback; private: const weak_ptr mPeerConnection; diff --git a/src/metronome.cpp b/src/metronome.cpp new file mode 100644 index 000000000..7d1959c05 --- /dev/null +++ b/src/metronome.cpp @@ -0,0 +1,68 @@ +#if RTC_ENABLE_MEDIA + +#include "metronome.hpp" +#include "impl/threadpool.hpp" +#include "rtp.hpp" + +namespace rtc { + +using ThreadPool = rtc::impl::ThreadPool; + +Metronome::Metronome(size_t maxQueueSizeInBytes, std::shared_ptr pacer, + std::function processPacketsCallback) + : mMaxQueueSizeInBytes(maxQueueSizeInBytes), mPacerAlgorithm(pacer), + mProcessPacketsCallback(processPacketsCallback), mThreadDelay(5), mQueueSizeInBytes(0) {} + +void Metronome::outgoing(message_vector &messages, const message_callback &send) { + message_vector others; + std::unique_lock lock(mSendQueueMutex); + for (auto &message : messages) { + if (message->type != Message::Binary) { + others.push_back(std::move(message)); + continue; + } + mQueueSizeInBytes += message->size(); + mSendQueue.push_back(std::move(message)); + } + while (mQueueSizeInBytes > mMaxQueueSizeInBytes) { + size_t msg_size = mSendQueue.front()->size(); + // I am not dropping a packet that is just over the limit. + if (mMaxQueueSizeInBytes > mQueueSizeInBytes - msg_size) + break; + mQueueSizeInBytes -= msg_size; + mSendQueue.pop_front(); + } + messages.swap(others); + ThreadPool::Instance().schedule(std::chrono::milliseconds(0), [this, &send]() { senderProcess(send); }); +} + +void Metronome::senderProcess(const message_callback &send) { + if (!mSendQueue.empty()) { + unsigned int budget = mPacerAlgorithm->getBudget(); + message_vector outgoing; + { + std::unique_lock lock(mSendQueueMutex); + while (!mSendQueue.empty() && budget >= mSendQueue.front()->size()) { + size_t msg_size = mSendQueue.front()->size(); + budget -= msg_size; + mQueueSizeInBytes -= msg_size; + outgoing.push_back(std::move(mSendQueue.front())); + mSendQueue.pop_front(); + } + } + mPacerAlgorithm->setBudget(budget); + for (const auto &message : outgoing) { + send(message); + } + if (mProcessPacketsCallback) { + mProcessPacketsCallback(outgoing); + } + if (!mSendQueue.empty()) { + ThreadPool::Instance().schedule(mThreadDelay, [this, &send]() { senderProcess(send); }); + } + } +} + +} + +#endif