Skip to content

Commit

Permalink
Add PacingHandler
Browse files Browse the repository at this point in the history
MediaHandler that can be used to pace packet delivery

Resolves #1017

Co-authored-by: Paul-Louis Ageneau <[email protected]>
  • Loading branch information
Sean-Der and paullouisageneau committed Apr 26, 2024
1 parent fe7bec8 commit 56d8de0
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp
)

set(LIBDATACHANNEL_HEADERS
Expand Down Expand Up @@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h
)

Expand Down
51 changes: 51 additions & 0 deletions include/rtc/pacinghandler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright (c) 2020 Staz Modrzynski
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#ifndef RTC_PACING_HANDLER_H
#define RTC_PACING_HANDLER_H

#if RTC_ENABLE_MEDIA

#include "mediahandler.hpp"
#include "utils.hpp"

#include <atomic>
#include <queue>

namespace rtc {

// Paced sending of RTP packets. Takes a stream of RTP packets that can an
// uneven bitrate. It then delivers these packets in a smoother manner by
// sending a fixed size of them on an interval
class RTC_CPP_EXPORT PacingHandler : public MediaHandler {
public:
PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval);

void outgoing(message_vector &messages, const message_callback &send) override;

private:
std::atomic<bool> mHaveScheduled = false;

double mBytesPerSecond;
double mBudget;

std::chrono::milliseconds mSendInterval;
std::chrono::time_point<std::chrono::high_resolution_clock> mLastRun;

std::mutex mMutex;
std::queue<message_ptr> mRtpBuffer;

void schedule(const message_callback &send);
};

} // namespace rtc

#endif // RTC_ENABLE_MEDIA

#endif // RTC_PACING_HANDLER_H
1 change: 1 addition & 0 deletions include/rtc/rtc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "h265rtppacketizer.hpp"
#include "mediahandler.hpp"
#include "plihandler.hpp"
#include "pacinghandler.hpp"
#include "rtcpnackresponder.hpp"
#include "rtcpreceivingsession.hpp"
#include "rtcpsrreporter.hpp"
Expand Down
12 changes: 10 additions & 2 deletions src/impl/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ void Track::incoming(message_ptr message) {

message_vector messages{std::move(message)};
if (auto handler = getMediaHandler())
handler->incomingChain(messages, [this](message_ptr m) { transportSend(m); });
handler->incomingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
if (auto locked = weak_this.lock()) {
transportSend(m);
}
});

for (auto &m : messages) {
// Tail drop if queue is full
Expand Down Expand Up @@ -175,7 +179,11 @@ bool Track::outgoing(message_ptr message) {

if (handler) {
message_vector messages{std::move(message)};
handler->outgoingChain(messages, [this](message_ptr m) { transportSend(m); });
handler->outgoingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
if (auto locked = weak_this.lock()) {
transportSend(m);
}
});
bool ret = false;
for (auto &m : messages)
ret = transportSend(std::move(m));
Expand Down
75 changes: 75 additions & 0 deletions src/pacinghandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#if RTC_ENABLE_MEDIA

#include <memory>

#include "pacinghandler.hpp"

#include "impl/internals.hpp"
#include "impl/threadpool.hpp"

namespace rtc {

PacingHandler::PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval)
: mBytesPerSecond(bitsPerSecond / 8), mBudget(0), mSendInterval(sendInterval){};

void PacingHandler::schedule(const message_callback &send) {
if (!mHaveScheduled.exchange(true)) {
return;
}

impl::ThreadPool::Instance().schedule(mSendInterval, [this, weak_this = weak_from_this(),
send]() {
if (auto locked = weak_this.lock()) {
const std::lock_guard<std::mutex> lock(mMutex);
mHaveScheduled.store(false);

// Update the budget and cap it
auto newBudget =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - mLastRun)
.count() *
mBytesPerSecond;
auto maxBudget = std::chrono::duration<double>(mSendInterval).count() * mBytesPerSecond;
mBudget = std::min(mBudget + newBudget, maxBudget);
mLastRun = std::chrono::high_resolution_clock::now();

// Send packets while there is budget, allow a single partial packet over budget
while (!mRtpBuffer.empty() && mBudget > 0) {
auto size = int(mRtpBuffer.front()->size());
send(std::move(mRtpBuffer.front()));
mRtpBuffer.pop();
mBudget -= size;
}

if (!mRtpBuffer.empty()) {
printf("\n more \n");
schedule(send);
} else {
printf("\n empty \n");
}
}
});
}

void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {

std::lock_guard<std::mutex> lock(mMutex);

for (auto &m : messages) {
mRtpBuffer.push(std::move(m));
}
messages.clear();

schedule(send);
}

} // namespace rtc

#endif /* RTC_ENABLE_MEDIA */

0 comments on commit 56d8de0

Please sign in to comment.