Skip to content

Commit

Permalink
Handle publishing non-normal object status
Browse files Browse the repository at this point in the history
Summary: As in title

Reviewed By: roticv

Differential Revision:
D59650047

Privacy Context Container: L1222497

fbshipit-source-id: b2f0c8e23b4d7a365d69ca86a343e71672c1e983
  • Loading branch information
afrind authored and facebook-github-bot committed Jul 12, 2024
1 parent cd66a74 commit 659d006
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
22 changes: 20 additions & 2 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ folly::coro::Task<void> MoQSession::readLoop(
codec.onIngress(std::move(streamData->data), streamData->fin);
}
fin = streamData->fin;
XLOG_IF(DBG3, fin) << "End of stream";
}
}
}
Expand Down Expand Up @@ -507,6 +508,20 @@ void MoQSession::publish(
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom) {
XCHECK_EQ(objHeader.status, ObjectStatus::NORMAL);
publishImpl(objHeader, payloadOffset, std::move(payload), eom);
}

void MoQSession::publishStatus(const ObjectHeader& objHeader) {
XCHECK_NE(objHeader.status, ObjectStatus::NORMAL);
publishImpl(objHeader, 0, nullptr, true);
}

void MoQSession::publishImpl(
const ObjectHeader& objHeader,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom) {
XLOG(DBG1) << __func__ << " sid=" << objHeader.subscribeID
<< " t=" << objHeader.trackAlias << " g=" << objHeader.group
<< " o=" << objHeader.id;
Expand Down Expand Up @@ -625,7 +640,11 @@ void MoQSession::publish(
publishDataMap_.erase(pubDataIt);
} else {
bool streamEOM =
eom && objHeader.forwardPreference == ForwardPreference::Object;
(eom && objHeader.forwardPreference == ForwardPreference::Object) ||
(objHeader.status == ObjectStatus::END_OF_GROUP ||
objHeader.status == ObjectStatus::END_OF_TRACK_AND_GROUP);
XLOG_IF(DBG1, streamEOM) << "End of stream";
// TODO: verify that pubDataIt->second.objectLength is empty or 0
wt_->writeStreamData(
pubDataIt->second.streamID, writeBuf.move(), streamEOM);
if (streamEOM) {
Expand All @@ -634,7 +653,6 @@ void MoQSession::publish(
if (eom) {
pubDataIt->second.offset = 0;
pubDataIt->second.objectLength.reset();
// TODO: we never close multi-object streams
} else {
pubDataIt->second.offset += payloadLength;
if (pubDataIt->second.objectLength) {
Expand Down
7 changes: 7 additions & 0 deletions moxygen/MoQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class MoQSession : public MoQCodec::Callback {
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom);
void publishStatus(const ObjectHeader& objHeader);

void onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh);
void onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh);
Expand Down Expand Up @@ -288,6 +289,12 @@ class MoQSession : public MoQCodec::Callback {
void onGoaway(Goaway goaway) override;
void onConnectionError(ErrorCode error) override;

void publishImpl(
const ObjectHeader& objHeader,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom);

struct PublishKey {
uint64_t subscribeID;
uint64_t group;
Expand Down
6 changes: 5 additions & 1 deletion moxygen/relay/MoQForwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,11 @@ class MoQForwarder {
eom]() mutable {
objHeader.subscribeID = subId;
objHeader.trackAlias = trackAlias;
session->publish(objHeader, payloadOffset, std::move(buf), eom);
if (objHeader.status != ObjectStatus::NORMAL) {
session->publishStatus(objHeader);
} else {
session->publish(objHeader, payloadOffset, std::move(buf), eom);
}
});
it++;
}
Expand Down

0 comments on commit 659d006

Please sign in to comment.