Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ognkrmms committed Jan 2, 2024
1 parent a721f3a commit c7cd078
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ set(LIBDATACHANNEL_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/metronome.cpp
)

set(LIBDATACHANNEL_HEADERS
Expand Down Expand Up @@ -119,6 +120,7 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/metronome.hpp
)

set(LIBDATACHANNEL_IMPL_SOURCES
Expand Down
28 changes: 28 additions & 0 deletions examples/streamer/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,32 @@ const uint16_t defaultPort = 8000;
string ip_address = defaultIPAddress;
uint16_t port = defaultPort;

class BucketPacer final : public rtc::PacerAlgorithm {
using clock = std::chrono::steady_clock;
// Budget is remaining bitrate. Pace is the maximum bitrate.
unsigned int mBudget, mPaceInBytes;
clock::time_point mPrevTime;

public:
BucketPacer(unsigned int paceInBytes) : mPaceInBytes(paceInBytes), mBudget(paceInBytes) {
mPrevTime = clock::now();
}

unsigned int getBudget() override {
auto now = clock::now();
if (now - mPrevTime > std::chrono::seconds(1)) {
mBudget = mPaceInBytes;
mPrevTime = now;
}
return mBudget;
}
unsigned int getPace() override { return mPaceInBytes; }
void setPace(unsigned int pace) override { mPaceInBytes = pace; };
void setBudget(unsigned int budget) override { mBudget = budget; };
void resetBudget() override { mBudget = mPaceInBytes; }
};


/// Incomming message handler for websocket
/// @param message Incommint message
/// @param config Configuration
Expand Down Expand Up @@ -217,6 +243,8 @@ shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const
// add RTCP NACK handler
auto nackResponder = make_shared<RtcpNackResponder>();
packetizer->addToChain(nackResponder);
auto pacer = make_shared<Metronome>(1000000, make_shared<BucketPacer>(300000), nullptr);
packetizer->addToChain(pacer);
// set handler
track->setMediaHandler(packetizer);
track->onOpen(onOpen);
Expand Down
44 changes: 44 additions & 0 deletions include/rtc/metronome.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#ifndef PACER_H
#define PACER_H

#if RTC_ENABLE_MEDIA

#include <deque>
#include <chrono>
#include "mediahandler.hpp"

namespace rtc {

class RTC_CPP_EXPORT PacerAlgorithm {
public:
virtual unsigned int getBudget() = 0;
virtual unsigned int getPace() = 0;
virtual void setPace(unsigned int pace) = 0;
virtual void setBudget(unsigned int budget) = 0;
virtual void resetBudget() = 0;
};

class RTC_CPP_EXPORT Metronome final : public MediaHandler {
std::deque<message_ptr> mSendQueue;
size_t mQueueSizeInBytes;
size_t mMaxQueueSizeInBytes;
std::mutex mSendQueueMutex;

std::chrono::milliseconds mThreadDelay;
std::function<void(message_vector&)> mProcessPacketsCallback; // For bookkeeping of sent packets
std::shared_ptr<PacerAlgorithm> mPacerAlgorithm;

public:
Metronome(size_t maxQueueSizeInBytes, std::shared_ptr<PacerAlgorithm> pacerAlgorithm,
std::function<void(message_vector &)> processPacketsCallback);

void outgoing(message_vector &messages, const message_callback &send) override;
void senderProcess(const message_callback &send);

};

} // namespace rtc

#endif /* RTC_ENABLE_MEDIA */

#endif /* PACER_H */
2 changes: 1 addition & 1 deletion include/rtc/rtc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@
#include "rtcpreceivingsession.hpp"
#include "rtcpsrreporter.hpp"
#include "rtppacketizer.hpp"

#include "metronome.hpp"
#endif // RTC_ENABLE_MEDIA
5 changes: 3 additions & 2 deletions src/impl/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ static LogCounter COUNTER_QUEUE_FULL(plog::warning,

Track::Track(weak_ptr<PeerConnection> pc, Description::Media desc)
: mPeerConnection(pc), mMediaDescription(std::move(desc)),
mRecvQueue(RECV_QUEUE_LIMIT, [](const message_ptr &m) { return m->size(); }) {
mRecvQueue(RECV_QUEUE_LIMIT, [](const message_ptr &m) { return m->size(); }),
send_callback([this](message_ptr m) { transportSend(m); }) {

// Discard messages by default if track is send only
if (mMediaDescription.direction() == Description::Direction::SendOnly)
Expand Down Expand Up @@ -176,7 +177,7 @@ bool Track::outgoing(message_ptr message) {

if (handler) {
message_vector messages{std::move(message)};
handler->outgoingChain(messages, [this](message_ptr m) { transportSend(m); });
handler->outgoingChain(messages, send_callback);
bool ret = false;
for (auto &m : messages)
ret = transportSend(std::move(m));
Expand Down
1 change: 1 addition & 0 deletions src/impl/track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Track final : public std::enable_shared_from_this<Track>, public Channel {
#endif

bool transportSend(message_ptr message);
const message_callback send_callback;

private:
const weak_ptr<PeerConnection> mPeerConnection;
Expand Down
68 changes: 68 additions & 0 deletions src/metronome.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#if RTC_ENABLE_MEDIA

#include "metronome.hpp"
#include "impl/threadpool.hpp"
#include "rtp.hpp"

namespace rtc {

using ThreadPool = rtc::impl::ThreadPool;

Metronome::Metronome(size_t maxQueueSizeInBytes, std::shared_ptr<PacerAlgorithm> pacer,
std::function<void(message_vector &)> processPacketsCallback)
: mMaxQueueSizeInBytes(maxQueueSizeInBytes), mPacerAlgorithm(pacer),
mProcessPacketsCallback(processPacketsCallback), mThreadDelay(5), mQueueSizeInBytes(0) {}

void Metronome::outgoing(message_vector &messages, const message_callback &send) {
message_vector others;
std::unique_lock<std::mutex> lock(mSendQueueMutex);
for (auto &message : messages) {
if (message->type != Message::Binary) {
others.push_back(std::move(message));
continue;
}
mQueueSizeInBytes += message->size();
mSendQueue.push_back(std::move(message));
}
while (mQueueSizeInBytes > mMaxQueueSizeInBytes) {
size_t msg_size = mSendQueue.front()->size();
// I am not dropping a packet that is just over the limit.
if (mMaxQueueSizeInBytes > mQueueSizeInBytes - msg_size)
break;
mQueueSizeInBytes -= msg_size;
mSendQueue.pop_front();
}
messages.swap(others);
ThreadPool::Instance().schedule(std::chrono::milliseconds(0), [this, &send]() { senderProcess(send); });
}

void Metronome::senderProcess(const message_callback &send) {
if (!mSendQueue.empty()) {
unsigned int budget = mPacerAlgorithm->getBudget();
message_vector outgoing;
{
std::unique_lock<std::mutex> lock(mSendQueueMutex);
while (!mSendQueue.empty() && budget >= mSendQueue.front()->size()) {
size_t msg_size = mSendQueue.front()->size();
budget -= msg_size;
mQueueSizeInBytes -= msg_size;
outgoing.push_back(std::move(mSendQueue.front()));
mSendQueue.pop_front();
}
}
mPacerAlgorithm->setBudget(budget);
for (const auto &message : outgoing) {
send(message);
}
if (mProcessPacketsCallback) {
mProcessPacketsCallback(outgoing);
}
if (!mSendQueue.empty()) {
ThreadPool::Instance().schedule(mThreadDelay, [this, &send]() { senderProcess(send); });
}
}
}

}

#endif

0 comments on commit c7cd078

Please sign in to comment.