From 5c165d2a7b0155bc879c65b405faefaa801efe85 Mon Sep 17 00:00:00 2001 From: Jordi Cenzano Ferret Date: Wed, 25 Dec 2024 15:02:42 -0800 Subject: [PATCH] Adapt API Changes to MoQFlvStreamerClient (Video/Audio -> MOQT) (#14) Summary: - Adapts MoQFlvStreamerClient to work with MOQ refactored API - Optimization of MoQFlvStreamerClient Differential Revision: D66176642 --- .../MoQFlvStreamerClient.cpp | 187 +++++++++--------- 1 file changed, 93 insertions(+), 94 deletions(-) diff --git a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp index 79acba0..65d37ac 100644 --- a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp +++ b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp @@ -41,7 +41,6 @@ class MoQFlvStreamerClient { std::move(url), (FLAGS_quic_transport ? MoQClient::TransportType::QUIC : MoQClient::TransportType::H3_WEBTRANSPORT)), - evb_(evb), fullVideoTrackName_(std::move(fvtn)), fullAudioTrackName_(std::move(fatn)) {} @@ -63,11 +62,7 @@ class MoQFlvStreamerClient { if (annResp.hasValue()) { trackNamespace_ = annResp->trackNamespace; - publishLoop() - .scheduleOn(folly::getGlobalIOExecutor()) - .start() - .via(evb_) - .thenTry([this](auto&&) { stop(); }); + folly::getGlobalIOExecutor()->add([this] { publishLoop(); }); } else { XLOG(INFO) << "Announce error trackNamespace=" << annResp->trackNamespace @@ -89,11 +84,11 @@ class MoQFlvStreamerClient { } } - folly::coro::Task publishLoop() { + void publishLoop() { XLOG(INFO) << __func__; auto g = folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); - + folly::Executor::KeepAlive keepAlive(moqClient_.getEventBase()); FlvSequentialReader flvSeqReader(FLAGS_input_flv_file); while (moqClient_.moqSession_) { auto item = flvSeqReader.getNextItem(); @@ -102,57 +97,29 @@ class MoQFlvStreamerClient { break; } for (auto& sub : subscriptions_) { - XLOG(DBG1) << "Sending item: " << item->id + XLOG(DBG1) << "Evaluating to send item: " << item->id << ", type: " << folly::to_underlying(item->type) << ", to subID-TrackAlias: " << sub.second.subscribeID << "-" << sub.second.trackAlias; if (sub.second.fullTrackName == fullVideoTrackName_ && videoPub_) { - if (item->isEOF && - MoQFlvStreamerClient::isAnyElementSent(latestVideo_)) { - // EOF detected and an some video element was sent, close group - moqClient_.getEventBase()->runInEventBaseThread([this] { - latestVideo_.object++, - XLOG(DBG1) - << "Closing group because EOF. objId=" << latestVideo_.object; - if (videoSgPub_) { - videoSgPub_->endOfGroup(latestVideo_.object); - videoSgPub_.reset(); - } - }); - } - - if (item->type == FlvSequentialReader::MediaType::VIDEO) { - // Video + if (item->data && + (item->type == FlvSequentialReader::MediaType::VIDEO || + item->isEOF)) { + // Send audio data in a thread (stream per object) moqClient_.getEventBase()->runInEventBaseThread( - [this, item]() mutable { publishVideoItem(std::move(item)); }); + [self(this), item] { self->PublishVideo(item); }); } } - if (sub.second.fullTrackName == fullAudioTrackName_ && - item->type == FlvSequentialReader::MediaType::AUDIO && audioPub_) { + if (sub.second.fullTrackName == fullAudioTrackName_ && audioPub_) { // Audio - if (item->data) { + if (item->data && + (item->type == FlvSequentialReader::MediaType::AUDIO || + item->isEOF)) { // Send audio data in a thread (stream per object) - ObjectHeader objHeader = ObjectHeader{ - sub.second.trackAlias, - latestAudio_.group++, - /*subgroup=*/0, - latestAudio_.object, - AUDIO_STREAM_PRIORITY, - ForwardPreference::Subgroup, - ObjectStatus::NORMAL}; - moqClient_.getEventBase()->runInEventBaseThread( - [this, objHeader, item] { - auto objPayload = encodeToMoQMi(item); - if (!objPayload) { - XLOG(ERR) << "Failed to encode audio frame"; - } else { - XLOG(DBG1) << "Sending audio frame" << objHeader - << ", payload size: " - << objPayload->computeChainDataLength(); - audioPub_->objectStream(objHeader, std::move(objPayload)); - } + [self(this), trackAlias(sub.second.trackAlias), item] { + self->PublishAudio(trackAlias, item); }); } } @@ -162,44 +129,6 @@ class MoQFlvStreamerClient { break; } } - co_return; - } - - void publishVideoItem(std::shared_ptr item) { - if (item->isIdr && MoQFlvStreamerClient::isAnyElementSent(latestVideo_) && - videoSgPub_) { - // Close group - latestVideo_.object++, - XLOG(DBG1) << "Closing group because IDR. objHeader: " - << latestVideo_.object; - videoSgPub_->endOfGroup(latestVideo_.object); - videoSgPub_.reset(); - - // Start new group - latestVideo_.group++; - latestVideo_.object = 0; - } - if (!videoSgPub_) { - auto res = videoPub_->beginSubgroup( - latestVideo_.group, 0, VIDEO_STREAM_PRIORITY); - if (!res) { - XLOG(FATAL) << "Error creating subgroup"; - } - videoSgPub_ = std::move(res.value()); - } - auto objPayload = encodeToMoQMi(item); - if (!objPayload) { - XLOG(ERR) << "Failed to encode video frame"; - } else { - XLOG(DBG1) << "Sending video frame={" << latestVideo_.group << "," - << latestVideo_.object - << "}, payload size: " << objPayload->computeChainDataLength(); - videoSgPub_->object( - latestVideo_.object, - std::move(objPayload), - /*finSubgroup=*/false); - } - latestVideo_.object++; } folly::coro::Task controlReadLoop() { @@ -284,17 +213,88 @@ class MoQFlvStreamerClient { XLOG(INFO) << "Session closed"; } - private: - static const uint8_t AUDIO_STREAM_PRIORITY = 100; /* Lower is higher pri */ - static const uint8_t VIDEO_STREAM_PRIORITY = 200; + void PublishAudio( + TrackAlias trackAlias, + std::shared_ptr item) { + if (item->isEOF) { + XLOG(INFO) << "FLV audio received EOF"; + return; + } + auto objPayload = encodeToMoQMi(item); + if (!objPayload) { + XLOG(ERR) << "Failed to encode audio frame"; + return; + } + ObjectHeader objHeader = ObjectHeader{ + trackAlias, + latestAudio_.group++, + /*subgroup=*/0, + latestAudio_.object, + AUDIO_STREAM_PRIORITY, + ForwardPreference::Subgroup, + ObjectStatus::NORMAL}; + + XLOG(DBG1) << "Sending audio frame" << objHeader + << ", payload size: " << objPayload->computeChainDataLength(); + audioPub_->objectStream(objHeader, std::move(objPayload)); + } + + void PublishVideo(std::shared_ptr item) { + if (item->isEOF) { + XLOG(INFO) << "FLV video received EOF"; + if (videoPub_ && videoSgPub_) { + videoSgPub_->endOfGroup(latestVideo_.object); + videoSgPub_.reset(); + + latestVideo_.group++; + latestVideo_.object = 0; + } + return; + } + + auto objPayload = encodeToMoQMi(item); + if (!objPayload) { + XLOG(ERR) << "Failed to encode video frame"; + return; + } + + if (!item->isIdr && !videoSgPub_) { + XLOG(INFO) << "Discarding non-IDR frame before subgroup started"; + return; + } - static bool isAnyElementSent(const AbsoluteLocation& loc) { - if (loc.group == 0 && loc.object == 0) { - return false; + if (item->isIdr) { + if (videoSgPub_) { + // Close previous subgroup + videoSgPub_->endOfGroup(latestVideo_.object); + videoSgPub_.reset(); + latestVideo_.group++; + latestVideo_.object = 0; + } + // Open new subgroup + auto res = videoPub_->beginSubgroup( + latestVideo_.group, 0, VIDEO_STREAM_PRIORITY); + if (!res) { + XLOG(ERR) << "Error creating subgroup"; + } + videoSgPub_ = std::move(res.value()); + } + + // Send video data + if (videoSgPub_) { + XLOG(DBG1) << "Sending video frame. grp-obj: " << latestVideo_.group + << "-" << latestVideo_.object + << ". Payload size: " << objPayload->computeChainDataLength(); + videoSgPub_->object(latestVideo_.object++, std::move(objPayload)); + } else { + XLOG(ERR) << "Should not happen"; } - return true; } + private: + static const uint8_t AUDIO_STREAM_PRIORITY = 100; /* Lower is higher pri */ + static const uint8_t VIDEO_STREAM_PRIORITY = 200; + static std::unique_ptr encodeToMoQMi( std::shared_ptr item) { if (item->type == FlvSequentialReader::MediaType::VIDEO) { @@ -326,7 +326,6 @@ class MoQFlvStreamerClient { } MoQClient moqClient_; - folly::EventBase* evb_; TrackNamespace trackNamespace_; FullTrackName fullVideoTrackName_; FullTrackName fullAudioTrackName_;