Skip to content

Commit

Permalink
Adapt API Changes to MoQFlvStreamerClient (Video/Audio -> MOQT) (#14)
Browse files Browse the repository at this point in the history
Summary:

- Adapts MoQFlvStreamerClient to work with MOQ refactored API
- Optimization of MoQFlvStreamerClient

Differential Revision: D66176642
  • Loading branch information
jordicenzano authored and facebook-github-bot committed Dec 25, 2024
1 parent 58ba359 commit 5c165d2
Showing 1 changed file with 93 additions and 94 deletions.
187 changes: 93 additions & 94 deletions moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class MoQFlvStreamerClient {
std::move(url),
(FLAGS_quic_transport ? MoQClient::TransportType::QUIC
: MoQClient::TransportType::H3_WEBTRANSPORT)),
evb_(evb),
fullVideoTrackName_(std::move(fvtn)),
fullAudioTrackName_(std::move(fatn)) {}

Expand All @@ -63,11 +62,7 @@ class MoQFlvStreamerClient {
if (annResp.hasValue()) {
trackNamespace_ = annResp->trackNamespace;

publishLoop()
.scheduleOn(folly::getGlobalIOExecutor())
.start()
.via(evb_)
.thenTry([this](auto&&) { stop(); });
folly::getGlobalIOExecutor()->add([this] { publishLoop(); });
} else {
XLOG(INFO) << "Announce error trackNamespace="
<< annResp->trackNamespace
Expand All @@ -89,11 +84,11 @@ class MoQFlvStreamerClient {
}
}

folly::coro::Task<void> publishLoop() {
void publishLoop() {
XLOG(INFO) << __func__;
auto g =
folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; });

folly::Executor::KeepAlive keepAlive(moqClient_.getEventBase());
FlvSequentialReader flvSeqReader(FLAGS_input_flv_file);
while (moqClient_.moqSession_) {
auto item = flvSeqReader.getNextItem();
Expand All @@ -102,57 +97,29 @@ class MoQFlvStreamerClient {
break;
}
for (auto& sub : subscriptions_) {
XLOG(DBG1) << "Sending item: " << item->id
XLOG(DBG1) << "Evaluating to send item: " << item->id
<< ", type: " << folly::to_underlying(item->type)
<< ", to subID-TrackAlias: " << sub.second.subscribeID << "-"
<< sub.second.trackAlias;

if (sub.second.fullTrackName == fullVideoTrackName_ && videoPub_) {
if (item->isEOF &&
MoQFlvStreamerClient::isAnyElementSent(latestVideo_)) {
// EOF detected and an some video element was sent, close group
moqClient_.getEventBase()->runInEventBaseThread([this] {
latestVideo_.object++,
XLOG(DBG1)
<< "Closing group because EOF. objId=" << latestVideo_.object;
if (videoSgPub_) {
videoSgPub_->endOfGroup(latestVideo_.object);
videoSgPub_.reset();
}
});
}

if (item->type == FlvSequentialReader::MediaType::VIDEO) {
// Video
if (item->data &&
(item->type == FlvSequentialReader::MediaType::VIDEO ||
item->isEOF)) {
// Send audio data in a thread (stream per object)
moqClient_.getEventBase()->runInEventBaseThread(
[this, item]() mutable { publishVideoItem(std::move(item)); });
[self(this), item] { self->PublishVideo(item); });
}
}
if (sub.second.fullTrackName == fullAudioTrackName_ &&
item->type == FlvSequentialReader::MediaType::AUDIO && audioPub_) {
if (sub.second.fullTrackName == fullAudioTrackName_ && audioPub_) {
// Audio
if (item->data) {
if (item->data &&
(item->type == FlvSequentialReader::MediaType::AUDIO ||
item->isEOF)) {
// Send audio data in a thread (stream per object)
ObjectHeader objHeader = ObjectHeader{
sub.second.trackAlias,
latestAudio_.group++,
/*subgroup=*/0,
latestAudio_.object,
AUDIO_STREAM_PRIORITY,
ForwardPreference::Subgroup,
ObjectStatus::NORMAL};

moqClient_.getEventBase()->runInEventBaseThread(
[this, objHeader, item] {
auto objPayload = encodeToMoQMi(item);
if (!objPayload) {
XLOG(ERR) << "Failed to encode audio frame";
} else {
XLOG(DBG1) << "Sending audio frame" << objHeader
<< ", payload size: "
<< objPayload->computeChainDataLength();
audioPub_->objectStream(objHeader, std::move(objPayload));
}
[self(this), trackAlias(sub.second.trackAlias), item] {
self->PublishAudio(trackAlias, item);
});
}
}
Expand All @@ -162,44 +129,6 @@ class MoQFlvStreamerClient {
break;
}
}
co_return;
}

void publishVideoItem(std::shared_ptr<FlvSequentialReader::MediaItem> item) {
if (item->isIdr && MoQFlvStreamerClient::isAnyElementSent(latestVideo_) &&
videoSgPub_) {
// Close group
latestVideo_.object++,
XLOG(DBG1) << "Closing group because IDR. objHeader: "
<< latestVideo_.object;
videoSgPub_->endOfGroup(latestVideo_.object);
videoSgPub_.reset();

// Start new group
latestVideo_.group++;
latestVideo_.object = 0;
}
if (!videoSgPub_) {
auto res = videoPub_->beginSubgroup(
latestVideo_.group, 0, VIDEO_STREAM_PRIORITY);
if (!res) {
XLOG(FATAL) << "Error creating subgroup";
}
videoSgPub_ = std::move(res.value());
}
auto objPayload = encodeToMoQMi(item);
if (!objPayload) {
XLOG(ERR) << "Failed to encode video frame";
} else {
XLOG(DBG1) << "Sending video frame={" << latestVideo_.group << ","
<< latestVideo_.object
<< "}, payload size: " << objPayload->computeChainDataLength();
videoSgPub_->object(
latestVideo_.object,
std::move(objPayload),
/*finSubgroup=*/false);
}
latestVideo_.object++;
}

folly::coro::Task<void> controlReadLoop() {
Expand Down Expand Up @@ -284,17 +213,88 @@ class MoQFlvStreamerClient {
XLOG(INFO) << "Session closed";
}

private:
static const uint8_t AUDIO_STREAM_PRIORITY = 100; /* Lower is higher pri */
static const uint8_t VIDEO_STREAM_PRIORITY = 200;
void PublishAudio(
TrackAlias trackAlias,
std::shared_ptr<FlvSequentialReader::MediaItem> item) {
if (item->isEOF) {
XLOG(INFO) << "FLV audio received EOF";
return;
}
auto objPayload = encodeToMoQMi(item);
if (!objPayload) {
XLOG(ERR) << "Failed to encode audio frame";
return;
}
ObjectHeader objHeader = ObjectHeader{
trackAlias,
latestAudio_.group++,
/*subgroup=*/0,
latestAudio_.object,
AUDIO_STREAM_PRIORITY,
ForwardPreference::Subgroup,
ObjectStatus::NORMAL};

XLOG(DBG1) << "Sending audio frame" << objHeader
<< ", payload size: " << objPayload->computeChainDataLength();
audioPub_->objectStream(objHeader, std::move(objPayload));
}

void PublishVideo(std::shared_ptr<FlvSequentialReader::MediaItem> item) {
if (item->isEOF) {
XLOG(INFO) << "FLV video received EOF";
if (videoPub_ && videoSgPub_) {
videoSgPub_->endOfGroup(latestVideo_.object);
videoSgPub_.reset();

latestVideo_.group++;
latestVideo_.object = 0;
}
return;
}

auto objPayload = encodeToMoQMi(item);
if (!objPayload) {
XLOG(ERR) << "Failed to encode video frame";
return;
}

if (!item->isIdr && !videoSgPub_) {
XLOG(INFO) << "Discarding non-IDR frame before subgroup started";
return;
}

static bool isAnyElementSent(const AbsoluteLocation& loc) {
if (loc.group == 0 && loc.object == 0) {
return false;
if (item->isIdr) {
if (videoSgPub_) {
// Close previous subgroup
videoSgPub_->endOfGroup(latestVideo_.object);
videoSgPub_.reset();
latestVideo_.group++;
latestVideo_.object = 0;
}
// Open new subgroup
auto res = videoPub_->beginSubgroup(
latestVideo_.group, 0, VIDEO_STREAM_PRIORITY);
if (!res) {
XLOG(ERR) << "Error creating subgroup";
}
videoSgPub_ = std::move(res.value());
}

// Send video data
if (videoSgPub_) {
XLOG(DBG1) << "Sending video frame. grp-obj: " << latestVideo_.group
<< "-" << latestVideo_.object
<< ". Payload size: " << objPayload->computeChainDataLength();
videoSgPub_->object(latestVideo_.object++, std::move(objPayload));
} else {
XLOG(ERR) << "Should not happen";
}
return true;
}

private:
static const uint8_t AUDIO_STREAM_PRIORITY = 100; /* Lower is higher pri */
static const uint8_t VIDEO_STREAM_PRIORITY = 200;

static std::unique_ptr<folly::IOBuf> encodeToMoQMi(
std::shared_ptr<FlvSequentialReader::MediaItem> item) {
if (item->type == FlvSequentialReader::MediaType::VIDEO) {
Expand Down Expand Up @@ -326,7 +326,6 @@ class MoQFlvStreamerClient {
}

MoQClient moqClient_;
folly::EventBase* evb_;
TrackNamespace trackNamespace_;
FullTrackName fullVideoTrackName_;
FullTrackName fullAudioTrackName_;
Expand Down

0 comments on commit 5c165d2

Please sign in to comment.