diff --git a/CMakeLists.txt b/CMakeLists.txt index ca0f12e52..79837a012 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -99,6 +99,7 @@ set(LIBDATACHANNEL_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/common.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/global.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/message.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/mediasample.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/peerconnection.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/reliability.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.h diff --git a/include/rtc/h264rtpdepacketizer.hpp b/include/rtc/h264rtpdepacketizer.hpp index 663e459a4..bf4505687 100644 --- a/include/rtc/h264rtpdepacketizer.hpp +++ b/include/rtc/h264rtpdepacketizer.hpp @@ -32,7 +32,8 @@ class RTC_CPP_EXPORT H264RtpDepacketizer : public MediaHandler { private: std::vector mRtpBuffer; - message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt); + message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt, + uint32_t timestamp); }; } // namespace rtc diff --git a/include/rtc/mediasample.hpp b/include/rtc/mediasample.hpp new file mode 100644 index 000000000..af41ff580 --- /dev/null +++ b/include/rtc/mediasample.hpp @@ -0,0 +1,26 @@ +/** + * Copyright (c) 2019-2020 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#ifndef RTC_FRAMEINFO_H +#define RTC_FRAMEINFO_H + +#include "common.hpp" + +namespace rtc { + +struct RTC_CPP_EXPORT MediaSample : binary { + MediaSample(uint32_t timestamp) : timestamp(timestamp){}; + MediaSample(binary &&data, uint32_t timestamp) + : binary(std::move(data)), timestamp(timestamp){}; + + uint32_t timestamp = 0; // RTP Timestamp +}; + +} // namespace rtc + +#endif // RTC_FRAMEINFO_H diff --git a/include/rtc/message.hpp b/include/rtc/message.hpp index 486210723..a4f937b38 100644 --- a/include/rtc/message.hpp +++ b/include/rtc/message.hpp @@ -10,6 +10,7 @@ #define RTC_MESSAGE_H #include "common.hpp" +#include "mediasample.hpp" #include "reliability.hpp" #include @@ -32,6 +33,7 @@ struct RTC_CPP_EXPORT Message : binary { unsigned int stream = 0; // Stream id (SCTP stream or SSRC) unsigned int dscp = 0; // Differentiated Services Code Point shared_ptr reliability; + shared_ptr media_sample; }; using message_ptr = shared_ptr; @@ -44,10 +46,12 @@ inline size_t message_size_func(const message_ptr &m) { template message_ptr make_message(Iterator begin, Iterator end, Message::Type type = Message::Binary, - unsigned int stream = 0, shared_ptr reliability = nullptr) { + unsigned int stream = 0, shared_ptr reliability = nullptr, + shared_ptr media_sample = nullptr) { auto message = std::make_shared(begin, end, type); message->stream = stream; message->reliability = reliability; + message->media_sample = media_sample; return message; } @@ -57,7 +61,8 @@ RTC_CPP_EXPORT message_ptr make_message(size_t size, Message::Type type = Messag RTC_CPP_EXPORT message_ptr make_message(binary &&data, Message::Type type = Message::Binary, unsigned int stream = 0, - shared_ptr reliability = nullptr); + shared_ptr reliability = nullptr, + shared_ptr media_sample = nullptr); RTC_CPP_EXPORT message_ptr make_message(size_t size, message_ptr orig); diff --git a/include/rtc/track.hpp b/include/rtc/track.hpp index 4bd9fde3e..a97175bd2 100644 --- a/include/rtc/track.hpp +++ b/include/rtc/track.hpp @@ -41,6 +41,8 @@ class RTC_CPP_EXPORT Track final : private CheshireCat, public Chan bool isClosed(void) const override; size_t maxMessageSize() const override; + void onMediaSample(std::function callback); + bool requestKeyframe(); bool requestBitrate(unsigned int bitrate); diff --git a/src/h264rtpdepacketizer.cpp b/src/h264rtpdepacketizer.cpp index 3e8cc1d00..6d6f41e7f 100644 --- a/src/h264rtpdepacketizer.cpp +++ b/src/h264rtpdepacketizer.cpp @@ -32,9 +32,10 @@ const uint8_t naluTypeSTAPA = 24; const uint8_t naluTypeFUA = 28; message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin, - message_vector::iterator end) { + message_vector::iterator end, uint32_t timestamp) { message_vector out = {}; auto fua_buffer = std::vector{}; + auto media_sample = std::make_shared(timestamp); for (auto it = begin; it != end; it++) { auto pkt = it->get(); @@ -58,11 +59,13 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin, fua_buffer.at(0) = std::byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType()); - out.push_back(make_message(std::move(fua_buffer))); + out.push_back( + make_message(std::move(fua_buffer), Message::Binary, 0, nullptr, media_sample)); fua_buffer.clear(); } } else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) { - out.push_back(make_message(pkt->begin() + headerSize, pkt->end())); + out.push_back(make_message(pkt->begin() + headerSize, pkt->end(), Message::Binary, 0, + nullptr, media_sample)); } else if (nalUnitHeader.unitType() == naluTypeSTAPA) { auto currOffset = stapaHeaderSize + headerSize; @@ -76,11 +79,11 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin, throw std::runtime_error("STAP-A declared size is larger then buffer"); } - out.push_back( - make_message(pkt->begin() + currOffset, pkt->begin() + currOffset + naluSize)); + out.push_back(make_message(pkt->begin() + currOffset, + pkt->begin() + currOffset + naluSize, Message::Binary, 0, + nullptr, media_sample)); currOffset += naluSize; } - } else { throw std::runtime_error("Unknown H264 RTP Packetization"); } @@ -90,20 +93,22 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin, } void H264RtpDepacketizer::incoming(message_vector &messages, const message_callback &) { - for (auto message : messages) { - if (message->type == Message::Control) { - continue; // RTCP - } - - if (message->size() < sizeof(RtpHeader)) { - PLOG_VERBOSE << "RTP packet is too small, size=" << message->size(); - continue; - } - - mRtpBuffer.push_back(message); - } - - messages.clear(); + messages.erase(std::remove_if(messages.begin(), messages.end(), + [&](message_ptr message) { + if (message->type == Message::Control) { + return false; + } + + if (message->size() < sizeof(RtpHeader)) { + PLOG_VERBOSE << "RTP packet is too small, size=" + << message->size(); + return false; + } + + mRtpBuffer.push_back(std::move(message)); + return true; + }), + messages.end()); while (mRtpBuffer.size() != 0) { uint32_t current_timestamp = 0; @@ -128,7 +133,7 @@ void H264RtpDepacketizer::incoming(message_vector &messages, const message_callb auto begin = mRtpBuffer.begin(); auto end = mRtpBuffer.begin() + (packets_in_timestamp - 1); - auto frames = buildFrames(begin, end + 1); + auto frames = buildFrames(begin, end + 1, current_timestamp); messages.insert(messages.end(), frames.begin(), frames.end()); mRtpBuffer.erase(mRtpBuffer.begin(), mRtpBuffer.begin() + packets_in_timestamp); } diff --git a/src/impl/channel.hpp b/src/impl/channel.hpp index 85093b8d5..267d76774 100644 --- a/src/impl/channel.hpp +++ b/src/impl/channel.hpp @@ -28,7 +28,7 @@ struct Channel { virtual void triggerAvailable(size_t count); virtual void triggerBufferedAmount(size_t amount); - void flushPendingMessages(); + virtual void flushPendingMessages(); void resetOpenCallback(); void resetCallbacks(); @@ -43,7 +43,7 @@ struct Channel { std::atomic bufferedAmount = 0; std::atomic bufferedAmountLowThreshold = 0; -private: +protected: std::atomic mOpenTriggered = false; }; diff --git a/src/impl/track.cpp b/src/impl/track.cpp index 80927fd76..66820d0e3 100644 --- a/src/impl/track.cpp +++ b/src/impl/track.cpp @@ -75,24 +75,23 @@ void Track::close() { resetCallbacks(); } +message_variant Track::trackMessageToVariant(message_ptr message) { + if (message->type == Message::Control) + return to_variant(*message); // The same message may be frowarded into multiple Tracks + else + return to_variant(std::move(*message)); +} + optional Track::receive() { if (auto next = mRecvQueue.pop()) { - message_ptr message = *next; - if (message->type == Message::Control) - return to_variant(**next); // The same message may be frowarded into multiple Tracks - else - return to_variant(std::move(*message)); + return trackMessageToVariant(*next); } return nullopt; } optional Track::peek() { if (auto next = mRecvQueue.peek()) { - message_ptr message = *next; - if (message->type == Message::Control) - return to_variant(**next); // The same message may be forwarded into multiple Tracks - else - return to_variant(std::move(*message)); + return trackMessageToVariant(*next); } return nullopt; } @@ -226,4 +225,32 @@ shared_ptr Track::getMediaHandler() { return mMediaHandler; } +void Track::onMediaSample(std::function callback) { + mediaSampleCallback = callback; + flushPendingMessages(); +} + +void Track::flushPendingMessages() { + if (!mOpenTriggered) + return; + + while (messageCallback || mediaSampleCallback) { + auto next = mRecvQueue.pop(); + if (!next) + break; + + auto message = next.value(); + try { + if (message->media_sample != nullptr && mediaSampleCallback) { + mediaSampleCallback( + MediaSample(std::move(*message), message->media_sample->timestamp)); + } else if (message->media_sample == nullptr && messageCallback) { + messageCallback(trackMessageToVariant(message)); + } + } catch (const std::exception &e) { + PLOG_WARNING << "Uncaught exception in callback: " << e.what(); + } + } +} + } // namespace rtc::impl diff --git a/src/impl/track.hpp b/src/impl/track.hpp index ea1446b34..a6913d414 100644 --- a/src/impl/track.hpp +++ b/src/impl/track.hpp @@ -38,6 +38,10 @@ class Track final : public std::enable_shared_from_this, public Channel { optional receive() override; optional peek() override; size_t availableAmount() const override; + void flushPendingMessages() override; + message_variant trackMessageToVariant(message_ptr message); + + void onMediaSample(std::function callback); bool isOpen() const; bool isClosed() const; @@ -71,6 +75,8 @@ class Track final : public std::enable_shared_from_this, public Channel { std::atomic mIsClosed = false; Queue mRecvQueue; + + synchronized_callback mediaSampleCallback; }; } // namespace rtc::impl diff --git a/src/message.cpp b/src/message.cpp index a0e3c35aa..f15cda17e 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -19,10 +19,12 @@ message_ptr make_message(size_t size, Message::Type type, unsigned int stream, } message_ptr make_message(binary &&data, Message::Type type, unsigned int stream, - shared_ptr reliability) { + shared_ptr reliability, + shared_ptr media_sample) { auto message = std::make_shared(std::move(data), type); message->stream = stream; message->reliability = reliability; + message->media_sample = media_sample; return message; } diff --git a/src/track.cpp b/src/track.cpp index 70bda4672..188ff8465 100644 --- a/src/track.cpp +++ b/src/track.cpp @@ -70,4 +70,8 @@ bool Track::requestBitrate(unsigned int bitrate) { shared_ptr Track::getMediaHandler() { return impl()->getMediaHandler(); } +void Track::onMediaSample(std::function callback) { + impl()->onMediaSample(callback); +} + } // namespace rtc