diff --git a/CMakeLists.txt b/CMakeLists.txt index cd8f0476a..034697b03 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp ) set(LIBDATACHANNEL_HEADERS @@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h ) diff --git a/include/rtc/pacinghandler.hpp b/include/rtc/pacinghandler.hpp new file mode 100644 index 000000000..c8687792b --- /dev/null +++ b/include/rtc/pacinghandler.hpp @@ -0,0 +1,54 @@ +/** + * Copyright (c) 2020 Staz Modrzynski + * Copyright (c) 2020 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#ifndef RTC_PACING_HANDLER_H +#define RTC_PACING_HANDLER_H + +#if RTC_ENABLE_MEDIA + +#include "mediahandler.hpp" +#include "utils.hpp" + +#include +#include + +namespace rtc { + +// Paced sending of RTP packets. Takes a stream of RTP packets that can an +// uneven bitrate. It then delivers these packets in a smoother manner by +// sending a fixed size of them on an interval +class RTC_CPP_EXPORT PacingHandler : public MediaHandler { +public: + static constexpr float DefaultOverageFactor = 2.0; + PacingHandler(float bytesPerMillisecond, std::chrono::milliseconds sendInterval, + float overageFactor = DefaultOverageFactor); + + void outgoing(message_vector &messages, const message_callback &send) override; + +private: + float mBytesPerMillisecond; + std::chrono::milliseconds mSendInterval; + + std::atomic mHaveScheduled = false; + std::chrono::time_point mLastRun; + + size_t mOverage; + float mOverageFactor; + + std::mutex mMutex; + std::deque mRtpBuffer = {}; + + void schedule(const message_callback &send); +}; + +} // namespace rtc + +#endif // RTC_ENABLE_MEDIA + +#endif // RTC_PACING_HANDLER_H diff --git a/include/rtc/rtc.hpp b/include/rtc/rtc.hpp index 683219fc9..2f3f46b59 100644 --- a/include/rtc/rtc.hpp +++ b/include/rtc/rtc.hpp @@ -34,6 +34,7 @@ #include "h265rtppacketizer.hpp" #include "mediahandler.hpp" #include "plihandler.hpp" +#include "pacinghandler.hpp" #include "rtcpnackresponder.hpp" #include "rtcpreceivingsession.hpp" #include "rtcpsrreporter.hpp" diff --git a/src/pacinghandler.cpp b/src/pacinghandler.cpp new file mode 100644 index 000000000..51eaa6221 --- /dev/null +++ b/src/pacinghandler.cpp @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2020 Filip Klembara (in2core) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#if RTC_ENABLE_MEDIA + +#include + +#include "pacinghandler.hpp" + +#include "impl/internals.hpp" +#include "impl/threadpool.hpp" + +namespace rtc { + +PacingHandler::PacingHandler(float bitsPerSecond, std::chrono::milliseconds sendInterval, + float overageFactor) + : mBytesPerMillisecond((bitsPerSecond / 1000) / 8), mSendInterval(sendInterval), + mOverageFactor(overageFactor){}; + +void PacingHandler::schedule(const message_callback &send) { + if (!mHaveScheduled.exchange(true)) { + return; + } + + impl::ThreadPool::Instance().schedule(mSendInterval, [weak_this = weak_from_this(), send]() { + if (auto locked = std::dynamic_pointer_cast(weak_this.lock())) { + const std::lock_guard lock(locked->mMutex); + locked->mHaveScheduled.store(false); + + // set byteBudget to how many milliseconds have elapsed since last run + size_t byteBudget = locked->mSendInterval.count(); + if (locked->mLastRun != + std::chrono::time_point::min()) { + byteBudget = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - locked->mLastRun) + .count(); + } + + // byteBudget is now the total amount of bytes that we can send since last run + byteBudget = static_cast(byteBudget * locked->mBytesPerMillisecond); + + size_t amountSent = 0; + while (!locked->mRtpBuffer.empty()) { + auto pktSize = locked->mRtpBuffer.front()->size(); + + // If overage is available spend it on current packet + // Otherwise don't send anymore packets + if ((amountSent + pktSize) >= byteBudget) { + if (locked->mOverage > pktSize) { + locked->mOverage -= pktSize; + } else { + return; + } + } + + send(std::move(locked->mRtpBuffer.front())); + amountSent += pktSize; + locked->mRtpBuffer.pop_front(); + } + + // The remaining byteBudget is added to the overage + locked->mOverage += (byteBudget - amountSent); + locked->mOverage = std::min(static_cast(byteBudget * locked->mOverageFactor), + locked->mOverage); + locked->mLastRun = std::chrono::high_resolution_clock::now(); + } + }); +} + +void PacingHandler::outgoing(message_vector &messages, const message_callback &send) { + + std::lock_guard lock(mMutex); + + std::move(std::begin(messages), std::end(messages), std::back_inserter(mRtpBuffer)); + messages.clear(); + + schedule(send); +} + +} // namespace rtc + +#endif /* RTC_ENABLE_MEDIA */