diff --git a/moxygen/samples/flv_receiver_client/CMakeLists.txt b/moxygen/samples/flv_receiver_client/CMakeLists.txt new file mode 100644 index 0000000..7be6744 --- /dev/null +++ b/moxygen/samples/flv_receiver_client/CMakeLists.txt @@ -0,0 +1,36 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +# MoQFlvReceiverClient +add_executable( + moqflvreceiverclient + MoQFlvReceiverClient.cpp +) +set_target_properties( + moqflvreceiverclient + PROPERTIES + BUILD_RPATH ${DEPS_LIBRARIES_DIR} + INSTALL_RPATH ${DEPS_LIBRARIES_DIR} +) +target_include_directories( + moqflvreceiverclient PUBLIC $ +) +target_compile_options( + moqflvreceiverclient PRIVATE + ${_MOXYGEN_COMMON_COMPILE_OPTIONS} +) +target_link_libraries( + moqflvreceiverclient PUBLIC + Folly::folly + moxygen +) + +install( + TARGETS moqflvreceiverclient + EXPORT moxygen-exports + ARCHIVE DESTINATION ${LIB_INSTALL_DIR} + LIBRARY DESTINATION ${LIB_INSTALL_DIR} +) diff --git a/moxygen/samples/flv_receiver_client/MoQFlvReceiverClient.cpp b/moxygen/samples/flv_receiver_client/MoQFlvReceiverClient.cpp new file mode 100644 index 0000000..d2d0242 --- /dev/null +++ b/moxygen/samples/flv_receiver_client/MoQFlvReceiverClient.cpp @@ -0,0 +1,227 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include "moxygen/MoQClient.h" +#include "moxygen/ObjectReceiver.h" + +#include +#include +#include + +DEFINE_string( + connect_url, + "https://localhost:4433/moq", + "URL for webtransport server"); +DEFINE_string(track_namespace, "flvstreamer", "Track Namespace"); +DEFINE_string(track_namespace_delimiter, "/", "Track Namespace Delimiter"); +// TODO DEFINE_string(video_track_name, "video0", "Video track Name"); +// TODO: Fix and add proper audo & video parsing. This is set to video0 on +// purpose to test the video track +DEFINE_string(track_name, "video0", "Track Name"); +DEFINE_int32(connect_timeout, 1000, "Connect timeout (ms)"); +DEFINE_int32(transaction_timeout, 120, "Transaction timeout (s)"); +DEFINE_bool(quic_transport, false, "Use raw QUIC transport"); +DEFINE_bool(fetch, false, "Use fetch rather than subscribe"); + +namespace { +using namespace moxygen; + +struct SubParams { + LocationType locType; + folly::Optional start; + folly::Optional end; +}; + +class TrackReceiverHandler : public ObjectReceiverCallback { + public: + ~TrackReceiverHandler() override = default; + FlowControlState onObject(const ObjectHeader&, Payload payload) override { + if (payload) { + std::cout << "Received payload. Size=" + << payload->computeChainDataLength() << std::endl; + } + return FlowControlState::UNBLOCKED; + } + void onObjectStatus(const ObjectHeader& objHeader) override { + std::cout << "ObjectStatus=" << uint32_t(objHeader.status) << std::endl; + } + void onEndOfStream() override {} + void onError(ResetStreamErrorCode error) override { + std::cout << "Stream Error=" << folly::to_underlying(error) << std::endl; + } + + void subscribeDone(SubscribeDone) override { + baton.post(); + } + + folly::coro::Baton baton; +}; + +class MoQFlvReceiverClient { + public: + MoQFlvReceiverClient( + folly::EventBase* evb, + proxygen::URL url, + FullTrackName ftn) + : moqClient_( + evb, + std::move(url), + (FLAGS_quic_transport ? MoQClient::TransportType::QUIC + : MoQClient::TransportType::H3_WEBTRANSPORT)), + fullTrackName_(std::move(ftn)) {} + + folly::coro::Task run(SubscribeRequest sub) noexcept { + XLOG(INFO) << __func__; + auto g = + folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); + try { + co_await moqClient_.setupMoQSession( + std::chrono::milliseconds(FLAGS_connect_timeout), + std::chrono::seconds(FLAGS_transaction_timeout), + Role::SUBSCRIBER); + auto exec = co_await folly::coro::co_current_executor; + controlReadLoop().scheduleOn(exec).start(); + + SubParams subParams{sub.locType, sub.start, sub.end}; + sub.locType = LocationType::LatestObject; + sub.start = folly::none; + sub.end = folly::none; + subRxHandler_ = std::make_shared( + ObjectReceiver::SUBSCRIBE, &trackReceiverHandler_); + auto track = + co_await moqClient_.moqSession_->subscribe(sub, subRxHandler_); + if (track.hasValue()) { + subscribeID_ = track->subscribeID; + XLOG(DBG1) << "subscribeID=" << subscribeID_; + auto latest = track->latest; + if (latest) { + XLOG(INFO) << "Latest={" << latest->group << ", " << latest->object + << "}"; + } + } else { + XLOG(INFO) << "SubscribeError id=" << track.error().subscribeID + << " code=" << track.error().errorCode + << " reason=" << track.error().reasonPhrase; + } + moqClient_.moqSession_->drain(); + } catch (const std::exception& ex) { + XLOG(ERR) << ex.what(); + co_return; + } + co_await trackReceiverHandler_.baton; + XLOG(INFO) << __func__ << " done"; + } + + void stop() { + moqClient_.moqSession_->unsubscribe({subscribeID_}); + moqClient_.moqSession_->close(); + } + + folly::coro::Task controlReadLoop() { + class ControlVisitor : public MoQSession::ControlVisitor { + public: + explicit ControlVisitor(MoQFlvReceiverClient& client) : client_(client) {} + + void operator()(Announce announce) const override { + XLOG(WARN) << "Announce ns=" << announce.trackNamespace; + // text client doesn't expect server or relay to announce anything, + // but announce OK anyways + client_.moqClient_.moqSession_->announceOk({announce.trackNamespace}); + } + + void operator()(SubscribeRequest subscribeReq) const override { + XLOG(INFO) << "SubscribeRequest"; + client_.moqClient_.moqSession_->subscribeError( + {subscribeReq.subscribeID, 404, "don't care"}); + } + + void operator()(Goaway) const override { + XLOG(INFO) << "Goaway"; + client_.moqClient_.moqSession_->unsubscribe({client_.subscribeID_}); + } + + private: + MoQFlvReceiverClient& client_; + }; + XLOG(INFO) << __func__; + auto g = + folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); + ControlVisitor visitor(*this); + MoQSession::ControlVisitor* vptr(&visitor); + while (auto msg = + co_await moqClient_.moqSession_->controlMessages().next()) { + boost::apply_visitor(*vptr, msg.value()); + } + } + + MoQClient moqClient_; + FullTrackName fullTrackName_; + SubscribeID subscribeID_{0}; + TrackReceiverHandler trackReceiverHandler_; + std::shared_ptr subRxHandler_; +}; +} // namespace + +using namespace moxygen; + +int main(int argc, char* argv[]) { + folly::Init init(&argc, &argv, false); + folly::EventBase eventBase; + proxygen::URL url(FLAGS_connect_url); + if (!url.isValid() || !url.hasHost()) { + XLOG(ERR) << "Invalid url: " << FLAGS_connect_url; + } + TrackNamespace ns = + TrackNamespace(FLAGS_track_namespace, FLAGS_track_namespace_delimiter); + MoQFlvReceiverClient flvReceiverClient( + &eventBase, + std::move(url), + moxygen::FullTrackName({ns, FLAGS_track_name})); + class SigHandler : public folly::AsyncSignalHandler { + public: + explicit SigHandler(folly::EventBase* evb, std::function fn) + : folly::AsyncSignalHandler(evb), fn_(std::move(fn)) { + registerSignalHandler(SIGTERM); + registerSignalHandler(SIGINT); + } + void signalReceived(int signum) noexcept override { + fn_(signum); + unreg(); + } + + void unreg() { + unregisterSignalHandler(SIGTERM); + unregisterSignalHandler(SIGINT); + } + + private: + std::function fn_; + }; + SigHandler handler(&eventBase, [&flvReceiverClient](int) mutable { + flvReceiverClient.stop(); + }); + auto subParams = + SubParams{LocationType::LatestObject, folly::none, folly::none}; + const auto subscribeID = 0; + const auto trackAlias = 1; + flvReceiverClient + .run( + {subscribeID, + trackAlias, + moxygen::FullTrackName({std::move(ns), FLAGS_track_name}), + 0, + GroupOrder::OldestFirst, + subParams.locType, + subParams.start, + subParams.end, + {}}) + .scheduleOn(&eventBase) + .start() + .via(&eventBase) + .thenTry([&handler](auto) { handler.unreg(); }); + eventBase.loop(); +} diff --git a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp index 2dd0410..82fcea7 100644 --- a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp +++ b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp @@ -41,7 +41,6 @@ class MoQFlvStreamerClient { std::move(url), (FLAGS_quic_transport ? MoQClient::TransportType::QUIC : MoQClient::TransportType::H3_WEBTRANSPORT)), - evb_(evb), fullVideoTrackName_(std::move(fvtn)), fullAudioTrackName_(std::move(fatn)) {} @@ -63,11 +62,7 @@ class MoQFlvStreamerClient { if (annResp.hasValue()) { trackNamespace_ = annResp->trackNamespace; - publishLoop() - .scheduleOn(folly::getGlobalIOExecutor()) - .start() - .via(evb_) - .thenTry([this](auto&&) { stop(); }); + folly::getGlobalIOExecutor()->add([this] { publishLoop(); }); } else { XLOG(INFO) << "Announce error trackNamespace=" << annResp->trackNamespace @@ -89,11 +84,11 @@ class MoQFlvStreamerClient { } } - folly::coro::Task publishLoop() { + void publishLoop() { XLOG(INFO) << __func__; auto g = folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); - + folly::Executor::KeepAlive keepAlive(moqClient_.getEventBase()); FlvSequentialReader flvSeqReader(FLAGS_input_flv_file); while (moqClient_.moqSession_) { auto item = flvSeqReader.getNextItem(); @@ -102,60 +97,31 @@ class MoQFlvStreamerClient { break; } for (auto& sub : subscriptions_) { - XLOG(DBG1) << "Sending item: " << item->id + XLOG(DBG1) << "Evaluating to send item: " << item->id << ", type: " << folly::to_underlying(item->type) << ", to subID-TrackAlias: " << sub.second.subscribeID << "-" << sub.second.trackAlias; if (sub.second.fullTrackName == fullVideoTrackName_ && videoPub_) { - if (item->isEOF && - MoQFlvStreamerClient::isAnyElementSent(latestVideo_)) { - // EOF detected and an some video element was sent, close group - moqClient_.getEventBase()->runInEventBaseThread( - [this, subscribeID(sub.second.subscribeID)] { - latestVideo_.object++, - XLOG(DBG1) << "Closing group because EOF. objId=" - << latestVideo_.object; - if (videoSgPub_) { - videoSgPub_->endOfGroup(latestVideo_.object); - videoSgPub_.reset(); - } - }); - } - - if (item->type == FlvSequentialReader::MediaType::VIDEO) { - // Video + if (item->data && + (item->type == FlvSequentialReader::MediaType::VIDEO || + item->isEOF)) { + // Send audio data in a thread (stream per object) moqClient_.getEventBase()->runInEventBaseThread( - [this, item, subscribeID(sub.second.subscribeID)] { - publishVideoItem(subscribeID, std::move(item)); + [self(this), trackAlias(sub.second.trackAlias), item] { + self->PublishVideo(trackAlias, item); }); } } - if (sub.second.fullTrackName == fullAudioTrackName_ && - item->type == FlvSequentialReader::MediaType::AUDIO && audioPub_) { + if (sub.second.fullTrackName == fullAudioTrackName_ && audioPub_) { // Audio - if (item->data) { + if (item->data && + (item->type == FlvSequentialReader::MediaType::AUDIO || + item->isEOF)) { // Send audio data in a thread (stream per object) - ObjectHeader objHeader = ObjectHeader{ - sub.second.trackAlias, - latestAudio_.group++, - /*subgroup=*/0, - latestAudio_.object, - AUDIO_STREAM_PRIORITY, - ForwardPreference::Subgroup, - ObjectStatus::NORMAL}; - moqClient_.getEventBase()->runInEventBaseThread( - [this, objHeader, item, subscribeID(sub.second.subscribeID)] { - auto objPayload = encodeToMoQMi(item); - if (!objPayload) { - XLOG(ERR) << "Failed to encode audio frame"; - } else { - XLOG(DBG1) << "Sending audio frame" << objHeader - << ", payload size: " - << objPayload->computeChainDataLength(); - audioPub_->objectStream(objHeader, std::move(objPayload)); - } + [self(this), trackAlias(sub.second.trackAlias), item] { + self->PublishAudio(trackAlias, item); }); } } @@ -165,46 +131,6 @@ class MoQFlvStreamerClient { break; } } - co_return; - } - - void publishVideoItem( - SubscribeID subscribeID, - std::shared_ptr item) { - if (item->isIdr && MoQFlvStreamerClient::isAnyElementSent(latestVideo_) && - videoSgPub_) { - // Close group - latestVideo_.object++, - XLOG(DBG1) << "Closing group because IDR. objHeader: " - << latestVideo_.object; - videoSgPub_->endOfGroup(latestVideo_.object); - videoSgPub_.reset(); - - // Start new group - latestVideo_.group++; - latestVideo_.object = 0; - } - if (!videoSgPub_) { - auto res = videoPub_->beginSubgroup( - latestVideo_.group, 0, VIDEO_STREAM_PRIORITY); - if (!res) { - XLOG(FATAL) << "Error creating subgroup"; - } - videoSgPub_ = std::move(res.value()); - } - auto objPayload = encodeToMoQMi(item); - if (!objPayload) { - XLOG(ERR) << "Failed to encode video frame"; - } else { - XLOG(DBG1) << "Sending video frame={" << latestVideo_.group << "," - << latestVideo_.object - << "}, payload size: " << objPayload->computeChainDataLength(); - videoSgPub_->object( - latestVideo_.object, - std::move(objPayload), - /*finSubgroup=*/false); - } - latestVideo_.object++; } folly::coro::Task controlReadLoop() { @@ -289,17 +215,90 @@ class MoQFlvStreamerClient { XLOG(INFO) << "Session closed"; } - private: - static const uint8_t AUDIO_STREAM_PRIORITY = 100; /* Lower is higher pri */ - static const uint8_t VIDEO_STREAM_PRIORITY = 200; + void PublishAudio( + TrackAlias trackAlias, + std::shared_ptr item) { + if (item->isEOF) { + XLOG(INFO) << "FLV audio received EOF"; + return; + } + auto objPayload = encodeToMoQMi(item); + if (!objPayload) { + XLOG(ERR) << "Failed to encode audio frame"; + return; + } + ObjectHeader objHeader = ObjectHeader{ + trackAlias, + latestAudio_.group++, + /*subgroup=*/0, + latestAudio_.object, + AUDIO_STREAM_PRIORITY, + ForwardPreference::Subgroup, + ObjectStatus::NORMAL}; + + XLOG(DBG1) << "Sending audio frame" << objHeader + << ", payload size: " << objPayload->computeChainDataLength(); + audioPub_->objectStream(objHeader, std::move(objPayload)); + } + + void PublishVideo( + TrackAlias trackAlias, + std::shared_ptr item) { + if (item->isEOF) { + XLOG(INFO) << "FLV video received EOF"; + if (videoPub_ && videoSgPub_) { + videoSgPub_->endOfGroup(latestVideo_.object); + videoSgPub_.reset(); + + latestVideo_.group++; + latestVideo_.object = 0; + } + return; + } - static bool isAnyElementSent(const AbsoluteLocation& loc) { - if (loc.group == 0 && loc.object == 0) { - return false; + auto objPayload = encodeToMoQMi(item); + if (!objPayload) { + XLOG(ERR) << "Failed to encode video frame"; + return; + } + + if (!item->isIdr && !videoSgPub_) { + XLOG(INFO) << "Discarding non-IDR frame before subgroup started"; + return; + } + + if (item->isIdr) { + if (videoSgPub_) { + // Close previous subgroup + videoSgPub_->endOfGroup(latestVideo_.object); + videoSgPub_.reset(); + latestVideo_.group++; + latestVideo_.object = 0; + } + // Open new subgroup + auto res = videoPub_->beginSubgroup( + latestVideo_.group, 0, VIDEO_STREAM_PRIORITY); + if (!res) { + XLOG(ERR) << "Error creating subgroup"; + } + videoSgPub_ = std::move(res.value()); + } + + // Send video data + if (videoSgPub_) { + XLOG(DBG1) << "Sending video frame. grp-obj: " << latestVideo_.group + << "-" << latestVideo_.object + << ". Payload size: " << objPayload->computeChainDataLength(); + videoSgPub_->object(latestVideo_.object++, std::move(objPayload)); + } else { + XLOG(ERR) << "Should not happen"; } - return true; } + private: + static const uint8_t AUDIO_STREAM_PRIORITY = 100; /* Lower is higher pri */ + static const uint8_t VIDEO_STREAM_PRIORITY = 200; + static std::unique_ptr encodeToMoQMi( std::shared_ptr item) { if (item->type == FlvSequentialReader::MediaType::VIDEO) { @@ -331,7 +330,6 @@ class MoQFlvStreamerClient { } MoQClient moqClient_; - folly::EventBase* evb_; TrackNamespace trackNamespace_; FullTrackName fullVideoTrackName_; FullTrackName fullAudioTrackName_;