Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to MoQFlvStreamerClient to work with refectored MoQSession + MoQFlvReceiverClient (to test) #14

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 93 additions & 94 deletions moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class MoQFlvStreamerClient {
std::move(url),
(FLAGS_quic_transport ? MoQClient::TransportType::QUIC
: MoQClient::TransportType::H3_WEBTRANSPORT)),
evb_(evb),
fullVideoTrackName_(std::move(fvtn)),
fullAudioTrackName_(std::move(fatn)) {}

Expand All @@ -63,11 +62,7 @@ class MoQFlvStreamerClient {
if (annResp.hasValue()) {
trackNamespace_ = annResp->trackNamespace;

publishLoop()
.scheduleOn(folly::getGlobalIOExecutor())
.start()
.via(evb_)
.thenTry([this](auto&&) { stop(); });
folly::getGlobalIOExecutor()->add([this] { publishLoop(); });
} else {
XLOG(INFO) << "Announce error trackNamespace="
<< annResp->trackNamespace
Expand All @@ -89,11 +84,11 @@ class MoQFlvStreamerClient {
}
}

folly::coro::Task<void> publishLoop() {
void publishLoop() {
XLOG(INFO) << __func__;
auto g =
folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; });

folly::Executor::KeepAlive keepAlive(moqClient_.getEventBase());
FlvSequentialReader flvSeqReader(FLAGS_input_flv_file);
while (moqClient_.moqSession_) {
auto item = flvSeqReader.getNextItem();
Expand All @@ -102,57 +97,29 @@ class MoQFlvStreamerClient {
break;
}
for (auto& sub : subscriptions_) {
XLOG(DBG1) << "Sending item: " << item->id
XLOG(DBG1) << "Evaluating to send item: " << item->id
<< ", type: " << folly::to_underlying(item->type)
<< ", to subID-TrackAlias: " << sub.second.subscribeID << "-"
<< sub.second.trackAlias;

if (sub.second.fullTrackName == fullVideoTrackName_ && videoPub_) {
if (item->isEOF &&
MoQFlvStreamerClient::isAnyElementSent(latestVideo_)) {
// EOF detected and an some video element was sent, close group
moqClient_.getEventBase()->runInEventBaseThread([this] {
latestVideo_.object++,
XLOG(DBG1)
<< "Closing group because EOF. objId=" << latestVideo_.object;
if (videoSgPub_) {
videoSgPub_->endOfGroup(latestVideo_.object);
videoSgPub_.reset();
}
});
}

if (item->type == FlvSequentialReader::MediaType::VIDEO) {
// Video
if (item->data &&
(item->type == FlvSequentialReader::MediaType::VIDEO ||
item->isEOF)) {
// Send audio data in a thread (stream per object)
moqClient_.getEventBase()->runInEventBaseThread(
[this, item]() mutable { publishVideoItem(std::move(item)); });
[self(this), item] { self->PublishVideo(item); });
}
}
if (sub.second.fullTrackName == fullAudioTrackName_ &&
item->type == FlvSequentialReader::MediaType::AUDIO && audioPub_) {
if (sub.second.fullTrackName == fullAudioTrackName_ && audioPub_) {
// Audio
if (item->data) {
if (item->data &&
(item->type == FlvSequentialReader::MediaType::AUDIO ||
item->isEOF)) {
// Send audio data in a thread (stream per object)
ObjectHeader objHeader = ObjectHeader{
sub.second.trackAlias,
latestAudio_.group++,
/*subgroup=*/0,
latestAudio_.object,
AUDIO_STREAM_PRIORITY,
ForwardPreference::Subgroup,
ObjectStatus::NORMAL};

moqClient_.getEventBase()->runInEventBaseThread(
[this, objHeader, item] {
auto objPayload = encodeToMoQMi(item);
if (!objPayload) {
XLOG(ERR) << "Failed to encode audio frame";
} else {
XLOG(DBG1) << "Sending audio frame" << objHeader
<< ", payload size: "
<< objPayload->computeChainDataLength();
audioPub_->objectStream(objHeader, std::move(objPayload));
}
[self(this), trackAlias(sub.second.trackAlias), item] {
self->PublishAudio(trackAlias, item);
});
}
}
Expand All @@ -162,44 +129,6 @@ class MoQFlvStreamerClient {
break;
}
}
co_return;
}

void publishVideoItem(std::shared_ptr<FlvSequentialReader::MediaItem> item) {
if (item->isIdr && MoQFlvStreamerClient::isAnyElementSent(latestVideo_) &&
videoSgPub_) {
// Close group
latestVideo_.object++,
XLOG(DBG1) << "Closing group because IDR. objHeader: "
<< latestVideo_.object;
videoSgPub_->endOfGroup(latestVideo_.object);
videoSgPub_.reset();

// Start new group
latestVideo_.group++;
latestVideo_.object = 0;
}
if (!videoSgPub_) {
auto res = videoPub_->beginSubgroup(
latestVideo_.group, 0, VIDEO_STREAM_PRIORITY);
if (!res) {
XLOG(FATAL) << "Error creating subgroup";
}
videoSgPub_ = std::move(res.value());
}
auto objPayload = encodeToMoQMi(item);
if (!objPayload) {
XLOG(ERR) << "Failed to encode video frame";
} else {
XLOG(DBG1) << "Sending video frame={" << latestVideo_.group << ","
<< latestVideo_.object
<< "}, payload size: " << objPayload->computeChainDataLength();
videoSgPub_->object(
latestVideo_.object,
std::move(objPayload),
/*finSubgroup=*/false);
}
latestVideo_.object++;
}

folly::coro::Task<void> controlReadLoop() {
Expand Down Expand Up @@ -284,17 +213,88 @@ class MoQFlvStreamerClient {
XLOG(INFO) << "Session closed";
}

private:
static const uint8_t AUDIO_STREAM_PRIORITY = 100; /* Lower is higher pri */
static const uint8_t VIDEO_STREAM_PRIORITY = 200;
void PublishAudio(
TrackAlias trackAlias,
std::shared_ptr<FlvSequentialReader::MediaItem> item) {
if (item->isEOF) {
XLOG(INFO) << "FLV audio received EOF";
return;
}
auto objPayload = encodeToMoQMi(item);
if (!objPayload) {
XLOG(ERR) << "Failed to encode audio frame";
return;
}
ObjectHeader objHeader = ObjectHeader{
trackAlias,
latestAudio_.group++,
/*subgroup=*/0,
latestAudio_.object,
AUDIO_STREAM_PRIORITY,
ForwardPreference::Subgroup,
ObjectStatus::NORMAL};

XLOG(DBG1) << "Sending audio frame" << objHeader
<< ", payload size: " << objPayload->computeChainDataLength();
audioPub_->objectStream(objHeader, std::move(objPayload));
}

void PublishVideo(std::shared_ptr<FlvSequentialReader::MediaItem> item) {
if (item->isEOF) {
XLOG(INFO) << "FLV video received EOF";
if (videoPub_ && videoSgPub_) {
videoSgPub_->endOfGroup(latestVideo_.object);
videoSgPub_.reset();

latestVideo_.group++;
latestVideo_.object = 0;
}
return;
}

auto objPayload = encodeToMoQMi(item);
if (!objPayload) {
XLOG(ERR) << "Failed to encode video frame";
return;
}

if (!item->isIdr && !videoSgPub_) {
XLOG(INFO) << "Discarding non-IDR frame before subgroup started";
return;
}

static bool isAnyElementSent(const AbsoluteLocation& loc) {
if (loc.group == 0 && loc.object == 0) {
return false;
if (item->isIdr) {
if (videoSgPub_) {
// Close previous subgroup
videoSgPub_->endOfGroup(latestVideo_.object);
videoSgPub_.reset();
latestVideo_.group++;
latestVideo_.object = 0;
}
// Open new subgroup
auto res = videoPub_->beginSubgroup(
latestVideo_.group, 0, VIDEO_STREAM_PRIORITY);
if (!res) {
XLOG(ERR) << "Error creating subgroup";
}
videoSgPub_ = std::move(res.value());
}

// Send video data
if (videoSgPub_) {
XLOG(DBG1) << "Sending video frame. grp-obj: " << latestVideo_.group
<< "-" << latestVideo_.object
<< ". Payload size: " << objPayload->computeChainDataLength();
videoSgPub_->object(latestVideo_.object++, std::move(objPayload));
} else {
XLOG(ERR) << "Should not happen";
}
return true;
}

private:
static const uint8_t AUDIO_STREAM_PRIORITY = 100; /* Lower is higher pri */
static const uint8_t VIDEO_STREAM_PRIORITY = 200;

static std::unique_ptr<folly::IOBuf> encodeToMoQMi(
std::shared_ptr<FlvSequentialReader::MediaItem> item) {
if (item->type == FlvSequentialReader::MediaType::VIDEO) {
Expand Down Expand Up @@ -326,7 +326,6 @@ class MoQFlvStreamerClient {
}

MoQClient moqClient_;
folly::EventBase* evb_;
TrackNamespace trackNamespace_;
FullTrackName fullVideoTrackName_;
FullTrackName fullAudioTrackName_;
Expand Down
Loading