Skip to content

Commit

Permalink
Add PacingHandler
Browse files Browse the repository at this point in the history
MediaHandler that can be used to pace packet delivery

Resolves #1017
  • Loading branch information
Sean-Der committed Apr 17, 2024
1 parent fe7bec8 commit ffc534c
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp
)

set(LIBDATACHANNEL_HEADERS
Expand Down Expand Up @@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h
)

Expand Down
54 changes: 54 additions & 0 deletions include/rtc/pacinghandler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (c) 2020 Staz Modrzynski
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#ifndef RTC_PACING_HANDLER_H
#define RTC_PACING_HANDLER_H

#if RTC_ENABLE_MEDIA

#include "mediahandler.hpp"
#include "utils.hpp"

#include <atomic>
#include <deque>

namespace rtc {

// Paced sending of RTP packets. Takes a stream of RTP packets that can an
// uneven bitrate. It then delivers these packets in a smoother manner by
// sending a fixed size of them on an interval
class RTC_CPP_EXPORT PacingHandler : public MediaHandler {
public:
static constexpr float DefaultOverageFactor = 2.0;
PacingHandler(float bytesPerMillisecond, std::chrono::milliseconds sendInterval,
float overageFactor = DefaultOverageFactor);

void outgoing(message_vector &messages, const message_callback &send) override;

private:
float mBytesPerMillisecond;
std::chrono::milliseconds mSendInterval;

std::atomic<bool> mHaveScheduled = false;
std::chrono::time_point<std::chrono::high_resolution_clock> mLastRun;

size_t mOverage;
float mOverageFactor;

std::mutex mMutex;
std::deque<message_ptr> mRtpBuffer = {};

void schedule(const message_callback &send);
};

} // namespace rtc

#endif // RTC_ENABLE_MEDIA

#endif // RTC_PACING_HANDLER_H
1 change: 1 addition & 0 deletions include/rtc/rtc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "h265rtppacketizer.hpp"
#include "mediahandler.hpp"
#include "plihandler.hpp"
#include "pacinghandler.hpp"
#include "rtcpnackresponder.hpp"
#include "rtcpreceivingsession.hpp"
#include "rtcpsrreporter.hpp"
Expand Down
87 changes: 87 additions & 0 deletions src/pacinghandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#if RTC_ENABLE_MEDIA

#include <memory>

#include "pacinghandler.hpp"

#include "impl/internals.hpp"
#include "impl/threadpool.hpp"

namespace rtc {

PacingHandler::PacingHandler(float bitsPerSecond, std::chrono::milliseconds sendInterval,
float overageFactor)
: mBytesPerMillisecond((bitsPerSecond / 1000) / 8), mSendInterval(sendInterval),
mOverageFactor(overageFactor){};

void PacingHandler::schedule(const message_callback &send) {
if (!mHaveScheduled.exchange(true)) {
return;
}

impl::ThreadPool::Instance().schedule(mSendInterval, [weak_this = weak_from_this(), send]() {
if (auto locked = std::dynamic_pointer_cast<PacingHandler>(weak_this.lock())) {
const std::lock_guard<std::mutex> lock(locked->mMutex);
locked->mHaveScheduled.store(false);

// set byteBudget to how many milliseconds have elapsed since last run
size_t byteBudget = locked->mSendInterval.count();
if (locked->mLastRun !=
std::chrono::time_point<std::chrono::high_resolution_clock>::min()) {
byteBudget = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - locked->mLastRun)
.count();
}

// byteBudget is now the total amount of bytes that we can send since last run
byteBudget = static_cast<size_t>(byteBudget * locked->mBytesPerMillisecond);

size_t amountSent = 0;
while (!locked->mRtpBuffer.empty()) {
auto pktSize = locked->mRtpBuffer.front()->size();

// If overage is available spend it on current packet
// Otherwise don't send anymore packets
if ((amountSent + pktSize) >= byteBudget) {
if (locked->mOverage > pktSize) {
locked->mOverage -= pktSize;
} else {
return;
}
}

send(std::move(locked->mRtpBuffer.front()));
amountSent += pktSize;
locked->mRtpBuffer.pop_front();
}

// The remaining byteBudget is added to the overage
locked->mOverage += (byteBudget - amountSent);
locked->mOverage = std::min(static_cast<size_t>(byteBudget * locked->mOverageFactor),
locked->mOverage);
locked->mLastRun = std::chrono::high_resolution_clock::now();
}
});
}

void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {

std::lock_guard<std::mutex> lock(mMutex);

std::move(std::begin(messages), std::end(messages), std::back_inserter(mRtpBuffer));
messages.clear();

schedule(send);
}

} // namespace rtc

#endif /* RTC_ENABLE_MEDIA */

0 comments on commit ffc534c

Please sign in to comment.