Skip to content

Commit

Permalink
Add MediaSample
Browse files Browse the repository at this point in the history
Depacketizers will include metadata about assembled media.
  • Loading branch information
Sean-Der committed Mar 4, 2024
1 parent 437a758 commit fab084d
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 37 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/common.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/global.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/message.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/mediasample.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/peerconnection.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/reliability.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.h
Expand Down
3 changes: 2 additions & 1 deletion include/rtc/h264rtpdepacketizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class RTC_CPP_EXPORT H264RtpDepacketizer : public MediaHandler {
private:
std::vector<message_ptr> mRtpBuffer;

message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt);
message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt,
uint32_t timestamp);
};

} // namespace rtc
Expand Down
27 changes: 27 additions & 0 deletions include/rtc/mediasample.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2019-2020 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#ifndef RTC_FRAMEINFO_H
#define RTC_FRAMEINFO_H

#if RTC_ENABLE_MEDIA

namespace rtc {

struct RTC_CPP_EXPORT MediaSample : binary {
MediaSample(uint32_t timestamp) : timestamp(timestamp){};
MediaSample(binary &&data, uint32_t timestamp)
: binary(std::move(data)), timestamp(timestamp){};

uint32_t timestamp = 0; // RTP Timestamp
};

} // namespace rtc

#endif // RTC_ENABLE_MEDIA
#endif // RTC_FRAMEINFO_H
12 changes: 10 additions & 2 deletions include/rtc/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#define RTC_MESSAGE_H

#include "common.hpp"
#include "mediasample.hpp"
#include "reliability.hpp"

#include <functional>
Expand All @@ -32,6 +33,10 @@ struct RTC_CPP_EXPORT Message : binary {
unsigned int stream = 0; // Stream id (SCTP stream or SSRC)
unsigned int dscp = 0; // Differentiated Services Code Point
shared_ptr<Reliability> reliability;

#if RTC_ENABLE_MEDIA
shared_ptr<MediaSample> media_sample;
#endif
};

using message_ptr = shared_ptr<Message>;
Expand All @@ -44,10 +49,12 @@ inline size_t message_size_func(const message_ptr &m) {

template <typename Iterator>
message_ptr make_message(Iterator begin, Iterator end, Message::Type type = Message::Binary,
unsigned int stream = 0, shared_ptr<Reliability> reliability = nullptr) {
unsigned int stream = 0, shared_ptr<Reliability> reliability = nullptr,
shared_ptr<MediaSample> media_sample = nullptr) {
auto message = std::make_shared<Message>(begin, end, type);
message->stream = stream;
message->reliability = reliability;
message->media_sample = media_sample;
return message;
}

Expand All @@ -57,7 +64,8 @@ RTC_CPP_EXPORT message_ptr make_message(size_t size, Message::Type type = Messag

RTC_CPP_EXPORT message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
unsigned int stream = 0,
shared_ptr<Reliability> reliability = nullptr);
shared_ptr<Reliability> reliability = nullptr,
shared_ptr<MediaSample> media_sample = nullptr);

RTC_CPP_EXPORT message_ptr make_message(size_t size, message_ptr orig);

Expand Down
2 changes: 2 additions & 0 deletions include/rtc/track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class RTC_CPP_EXPORT Track final : private CheshireCat<impl::Track>, public Chan
bool isClosed(void) const override;
size_t maxMessageSize() const override;

void onMediaSample(std::function<void(MediaSample data)> callback);

bool requestKeyframe();
bool requestBitrate(unsigned int bitrate);

Expand Down
47 changes: 26 additions & 21 deletions src/h264rtpdepacketizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ const uint8_t naluTypeSTAPA = 24;
const uint8_t naluTypeFUA = 28;

message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
message_vector::iterator end) {
message_vector::iterator end, uint32_t timestamp) {
message_vector out = {};
auto fua_buffer = std::vector<std::byte>{};
auto media_sample = std::make_shared<MediaSample>(timestamp);

for (auto it = begin; it != end; it++) {
auto pkt = it->get();
Expand All @@ -58,11 +59,13 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
fua_buffer.at(0) =
std::byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType());

out.push_back(make_message(std::move(fua_buffer)));
out.push_back(
make_message(std::move(fua_buffer), Message::Binary, 0, nullptr, media_sample));
fua_buffer.clear();
}
} else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) {
out.push_back(make_message(pkt->begin() + headerSize, pkt->end()));
out.push_back(make_message(pkt->begin() + headerSize, pkt->end(), Message::Binary, 0,
nullptr, media_sample));
} else if (nalUnitHeader.unitType() == naluTypeSTAPA) {
auto currOffset = stapaHeaderSize + headerSize;

Expand All @@ -76,11 +79,11 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
throw std::runtime_error("STAP-A declared size is larger then buffer");
}

out.push_back(
make_message(pkt->begin() + currOffset, pkt->begin() + currOffset + naluSize));
out.push_back(make_message(pkt->begin() + currOffset,
pkt->begin() + currOffset + naluSize, Message::Binary, 0,
nullptr, media_sample));
currOffset += naluSize;
}

} else {
throw std::runtime_error("Unknown H264 RTP Packetization");
}
Expand All @@ -90,20 +93,22 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
}

void H264RtpDepacketizer::incoming(message_vector &messages, const message_callback &) {
for (auto message : messages) {
if (message->type == Message::Control) {
continue; // RTCP
}

if (message->size() < sizeof(RtpHeader)) {
PLOG_VERBOSE << "RTP packet is too small, size=" << message->size();
continue;
}

mRtpBuffer.push_back(message);
}

messages.clear();
messages.erase(std::remove_if(messages.begin(), messages.end(),
[&](message_ptr message) {
if (message->type == Message::Control) {
return false;
}

if (message->size() < sizeof(RtpHeader)) {
PLOG_VERBOSE << "RTP packet is too small, size="
<< message->size();
return false;
}

mRtpBuffer.push_back(std::move(message));
return true;
}),
messages.end());

while (mRtpBuffer.size() != 0) {
uint32_t current_timestamp = 0;
Expand All @@ -128,7 +133,7 @@ void H264RtpDepacketizer::incoming(message_vector &messages, const message_callb
auto begin = mRtpBuffer.begin();
auto end = mRtpBuffer.begin() + (packets_in_timestamp - 1);

auto frames = buildFrames(begin, end + 1);
auto frames = buildFrames(begin, end + 1, current_timestamp);
messages.insert(messages.end(), frames.begin(), frames.end());
mRtpBuffer.erase(mRtpBuffer.begin(), mRtpBuffer.begin() + packets_in_timestamp);
}
Expand Down
4 changes: 2 additions & 2 deletions src/impl/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct Channel {
virtual void triggerAvailable(size_t count);
virtual void triggerBufferedAmount(size_t amount);

void flushPendingMessages();
virtual void flushPendingMessages();
void resetOpenCallback();
void resetCallbacks();

Expand All @@ -43,7 +43,7 @@ struct Channel {
std::atomic<size_t> bufferedAmount = 0;
std::atomic<size_t> bufferedAmountLowThreshold = 0;

private:
protected:
std::atomic<bool> mOpenTriggered = false;
};

Expand Down
47 changes: 37 additions & 10 deletions src/impl/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,23 @@ void Track::close() {
resetCallbacks();
}

message_variant Track::trackMessageToVariant(message_ptr message) {
if (message->type == Message::Control)
return to_variant(*message); // The same message may be frowarded into multiple Tracks
else
return to_variant(std::move(*message));
}

optional<message_variant> Track::receive() {
if (auto next = mRecvQueue.pop()) {
message_ptr message = *next;
if (message->type == Message::Control)
return to_variant(**next); // The same message may be frowarded into multiple Tracks
else
return to_variant(std::move(*message));
return trackMessageToVariant(*next);
}
return nullopt;
}

optional<message_variant> Track::peek() {
if (auto next = mRecvQueue.peek()) {
message_ptr message = *next;
if (message->type == Message::Control)
return to_variant(**next); // The same message may be forwarded into multiple Tracks
else
return to_variant(std::move(*message));
return trackMessageToVariant(*next);
}
return nullopt;
}
Expand Down Expand Up @@ -226,4 +225,32 @@ shared_ptr<MediaHandler> Track::getMediaHandler() {
return mMediaHandler;
}

void Track::onMediaSample(std::function<void(MediaSample data)> callback) {
mediaSampleCallback = callback;
flushPendingMessages();
}

void Track::flushPendingMessages() {
if (!mOpenTriggered)
return;

while (messageCallback || mediaSampleCallback) {
auto next = mRecvQueue.pop();
if (!next)
break;

auto message = next.value();
try {
if (message->media_sample != nullptr && mediaSampleCallback) {
mediaSampleCallback(
MediaSample(std::move(*message), message->media_sample->timestamp));
} else if (message->media_sample == nullptr && messageCallback) {
messageCallback(trackMessageToVariant(message));
}
} catch (const std::exception &e) {
PLOG_WARNING << "Uncaught exception in callback: " << e.what();
}
}
}

} // namespace rtc::impl
6 changes: 6 additions & 0 deletions src/impl/track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class Track final : public std::enable_shared_from_this<Track>, public Channel {
optional<message_variant> receive() override;
optional<message_variant> peek() override;
size_t availableAmount() const override;
void flushPendingMessages() override;
message_variant trackMessageToVariant(message_ptr message);

void onMediaSample(std::function<void(MediaSample data)> callback);

bool isOpen() const;
bool isClosed() const;
Expand Down Expand Up @@ -71,6 +75,8 @@ class Track final : public std::enable_shared_from_this<Track>, public Channel {
std::atomic<bool> mIsClosed = false;

Queue<message_ptr> mRecvQueue;

synchronized_callback<MediaSample> mediaSampleCallback;
};

} // namespace rtc::impl
Expand Down
4 changes: 3 additions & 1 deletion src/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ message_ptr make_message(size_t size, Message::Type type, unsigned int stream,
}

message_ptr make_message(binary &&data, Message::Type type, unsigned int stream,
shared_ptr<Reliability> reliability) {
shared_ptr<Reliability> reliability,
shared_ptr<MediaSample> media_sample) {
auto message = std::make_shared<Message>(std::move(data), type);
message->stream = stream;
message->reliability = reliability;
message->media_sample = media_sample;
return message;
}

Expand Down
4 changes: 4 additions & 0 deletions src/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,8 @@ bool Track::requestBitrate(unsigned int bitrate) {

shared_ptr<MediaHandler> Track::getMediaHandler() { return impl()->getMediaHandler(); }

void Track::onMediaSample(std::function<void(MediaSample data)> callback) {
impl()->onMediaSample(callback);
}

} // namespace rtc

0 comments on commit fab084d

Please sign in to comment.