Skip to content

Commit

Permalink
Use Consumer interface for MoQSession reads (#13)
Browse files Browse the repository at this point in the history
Summary:

This is the second half of the MoQSession rewrite. subscribe and fetch callers now supply a Consumer which the library drives as a callback.

To make the consumer API work required changing the codec callbacks.

The relay now connects a Forwarder (Consumer) to the upstream subscription directly.

Differential Revision: D66881617
  • Loading branch information
afrind authored and facebook-github-bot committed Dec 19, 2024
1 parent b21fed2 commit 42faf08
Show file tree
Hide file tree
Showing 15 changed files with 1,067 additions and 589 deletions.
3 changes: 2 additions & 1 deletion moxygen/MoQClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ void MoQClient::HTTPHandler::onError(
void MoQClient::onSessionEnd(folly::Optional<uint32_t>) {
// TODO: cleanup?
XLOG(DBG1) << "resetting moqSession_";
moqSession_.reset();
auto moqSession = std::move(moqSession_);
moqSession.reset();
CHECK(!moqSession_);
}

Expand Down
126 changes: 80 additions & 46 deletions moxygen/MoQCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ void MoQCodec::onIngressEnd(

void MoQObjectStreamCodec::onIngress(
std::unique_ptr<folly::IOBuf> data,
bool eom) {
bool endOfStream) {
onIngressStart(std::move(data));
folly::io::Cursor cursor(ingress_.front());
bool isFetch = std::get_if<SubscribeID>(&curObjectHeader_.trackIdentifier);
while (!connError_ &&
((ingress_.chainLength() > 0 && !cursor.isAtEnd())/* ||
(eom && parseState_ == ParseState::OBJECT_PAYLOAD_NO_LENGTH)*/)) {
(endOfStream && parseState_ == ParseState::OBJECT_PAYLOAD_NO_LENGTH)*/)) {
switch (parseState_) {
case ParseState::STREAM_HEADER_TYPE: {
auto newCursor = cursor;
Expand Down Expand Up @@ -146,7 +147,12 @@ void MoQObjectStreamCodec::onIngress(
connError_ = res.error();
break;
}
curObjectHeader_.trackIdentifier = SubscribeID(res.value());
auto subscribeID = SubscribeID(res.value());
curObjectHeader_.trackIdentifier = subscribeID;
isFetch = true;
if (callback_) {
callback_->onFetchHeader(subscribeID);
}
parseState_ = ParseState::MULTI_OBJECT_HEADER;
cursor = newCursor;
break;
Expand All @@ -160,6 +166,16 @@ void MoQObjectStreamCodec::onIngress(
break;
}
curObjectHeader_ = res.value();
auto trackAlias =
std::get_if<TrackAlias>(&curObjectHeader_.trackIdentifier);
XCHECK(trackAlias);
if (callback_) {
callback_->onSubgroup(
*trackAlias,
curObjectHeader_.group,
curObjectHeader_.subgroup,
curObjectHeader_.priority);
}
parseState_ = ParseState::MULTI_OBJECT_HEADER;
cursor = newCursor;
[[fallthrough]];
Expand All @@ -174,84 +190,102 @@ void MoQObjectStreamCodec::onIngress(
break;
}
curObjectHeader_ = res.value();
if (callback_) {
callback_->onObjectHeader(std::move(res.value()));
}
cursor = newCursor;
if (curObjectHeader_.status == ObjectStatus::NORMAL) {
parseState_ = ParseState::OBJECT_PAYLOAD;
XLOG(DBG2) << "Parsing object with length, need="
<< *curObjectHeader_.length
<< " have=" << cursor.totalLength();
std::unique_ptr<folly::IOBuf> payload;
auto chunkLen = cursor.cloneAtMost(payload, *curObjectHeader_.length);
auto endOfObject = chunkLen == *curObjectHeader_.length;
if (endOfStream && !endOfObject) {
connError_ = ErrorCode::PARSE_ERROR;
XLOG(DBG4) << __func__ << " " << uint32_t(*connError_);
break;
}
if (callback_) {
callback_->onObjectBegin(
curObjectHeader_.group,
curObjectHeader_.subgroup,
curObjectHeader_.id,
*curObjectHeader_.length,
std::move(payload),
endOfObject,
endOfStream && cursor.isAtEnd());
}
*curObjectHeader_.length -= chunkLen;
if (endOfObject) {
if (endOfStream && cursor.isAtEnd()) {
parseState_ = ParseState::STREAM_FIN_DELIVERED;
} else {
parseState_ = ParseState::MULTI_OBJECT_HEADER;
}
break;
} else {
parseState_ = ParseState::OBJECT_PAYLOAD;
}
} else {
parseState_ = ParseState::MULTI_OBJECT_HEADER;
if (callback_) {
callback_->onObjectStatus(
curObjectHeader_.group,
curObjectHeader_.subgroup,
curObjectHeader_.id,
curObjectHeader_.status);
}
if (curObjectHeader_.status == ObjectStatus::END_OF_TRACK_AND_GROUP ||
(!isFetch &&
curObjectHeader_.status == ObjectStatus::END_OF_GROUP)) {
parseState_ = ParseState::STREAM_FIN_DELIVERED;
} else {
parseState_ = ParseState::MULTI_OBJECT_HEADER;
}
break;
}
[[fallthrough]];
}
case ParseState::OBJECT_PAYLOAD: {
auto newCursor = cursor;
// need to check for bufLen == 0?
std::unique_ptr<folly::IOBuf> payload;
// TODO: skip clone and do split
uint64_t chunkLen = 0;
XCHECK(curObjectHeader_.length);
XLOG(DBG2) << "Parsing object with length, need="
<< *curObjectHeader_.length;
if (ingress_.chainLength() > 0 && newCursor.canAdvance(1)) {
chunkLen = newCursor.cloneAtMost(payload, *curObjectHeader_.length);
if (ingress_.chainLength() > 0 && cursor.canAdvance(1)) {
chunkLen = cursor.cloneAtMost(payload, *curObjectHeader_.length);
}
*curObjectHeader_.length -= chunkLen;
if (eom && *curObjectHeader_.length != 0) {
if (endOfStream && *curObjectHeader_.length != 0) {
connError_ = ErrorCode::PARSE_ERROR;
XLOG(DBG4) << __func__ << " " << uint32_t(*connError_);
break;
}
bool endOfObject = (*curObjectHeader_.length == 0);
if (callback_ && (payload || endOfObject)) {
callback_->onObjectPayload(
curObjectHeader_.trackIdentifier,
curObjectHeader_.group,
curObjectHeader_.id,
std::move(payload),
endOfObject);
callback_->onObjectPayload(std::move(payload), endOfObject);
}
if (endOfObject) {
parseState_ = ParseState::MULTI_OBJECT_HEADER;
}
cursor = newCursor;
break;
}
#if 0
// This code is no longer reachable, but I'm leaving it here in case
// the wire format changes back
case ParseState::OBJECT_PAYLOAD_NO_LENGTH: {
auto newCursor = cursor;
// need to check for bufLen == 0?
std::unique_ptr<folly::IOBuf> payload;
// TODO: skip clone and do split
if (ingress_.chainLength() > 0 && newCursor.canAdvance(1)) {
newCursor.cloneAtMost(payload, std::numeric_limits<uint64_t>::max());
}
XCHECK(!curObjectHeader_.length);
if (callback_ && (payload || eom)) {
callback_->onObjectPayload(
curObjectHeader_.trackIdentifier,
curObjectHeader_.group,
curObjectHeader_.id,
std::move(payload),
eom);
}
if (eom) {
parseState_ = ParseState::FRAME_HEADER_TYPE;
}
cursor = newCursor;
case ParseState::STREAM_FIN_DELIVERED: {
XLOG(DBG2) << "Bytes=" << cursor.totalLength()
<< " remaining in STREAM_FIN_DELIVERED";
connError_ = ErrorCode::PARSE_ERROR;
break;
}
#endif
}
}
size_t remainingLength = 0;
if (!eom && !cursor.isAtEnd()) {
if (!endOfStream && !cursor.isAtEnd()) {
remainingLength = cursor.totalLength(); // must be less than 1 message
}
onIngressEnd(remainingLength, eom, callback_);
if (endOfStream && parseState_ != ParseState::STREAM_FIN_DELIVERED &&
!connError_ && callback_) {
callback_->onEndOfStream();
}
onIngressEnd(remainingLength, endOfStream, callback_);
}

folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
Expand Down
35 changes: 22 additions & 13 deletions moxygen/MoQCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,27 @@ class MoQObjectStreamCodec : public MoQCodec {
public:
~ObjectCallback() override = default;

virtual void onFetchHeader(uint64_t subscribeID) = 0;
virtual void onObjectHeader(ObjectHeader objectHeader) = 0;

virtual void onObjectPayload(
TrackIdentifier trackIdentifier,
uint64_t groupID,
uint64_t id,
std::unique_ptr<folly::IOBuf> payload,
bool eom) = 0;
virtual void onFetchHeader(SubscribeID subscribeID) = 0;
virtual void onSubgroup(
TrackAlias alias,
uint64_t group,
uint64_t subgroup,
uint8_t priority) = 0;
virtual void onObjectBegin(
uint64_t group,
uint64_t subgroup,
uint64_t objectID,
uint64_t length,
Payload initialPayload,
bool objectComplete,
bool subgroupComplete) = 0;
virtual void onObjectStatus(
uint64_t group,
uint64_t subgroup,
uint64_t objectID,
ObjectStatus status) = 0;
virtual void onObjectPayload(Payload payload, bool objectComplete) = 0;
virtual void onEndOfStream() = 0;
};

MoQObjectStreamCodec(ObjectCallback* callback) : callback_(callback) {}
Expand All @@ -160,17 +172,14 @@ class MoQObjectStreamCodec : public MoQCodec {

void onIngress(std::unique_ptr<folly::IOBuf> data, bool eom) override;

TrackIdentifier getTrackIdentifier() const {
return curObjectHeader_.trackIdentifier;
}

private:
enum class ParseState {
STREAM_HEADER_TYPE,
OBJECT_STREAM,
FETCH_HEADER,
MULTI_OBJECT_HEADER,
OBJECT_PAYLOAD,
STREAM_FIN_DELIVERED,
// OBJECT_PAYLOAD_NO_LENGTH
};
ParseState parseState_{ParseState::STREAM_HEADER_TYPE};
Expand Down
6 changes: 0 additions & 6 deletions moxygen/MoQServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,6 @@ void MoQServer::ControlVisitor::operator()(Fetch fetch) const {
XLOG(INFO) << "Fetch id=" << fetch.subscribeID;
}

void MoQServer::ControlVisitor::operator()(SubscribeDone subscribeDone) const {
XLOG(INFO) << "SubscribeDone id=" << subscribeDone.subscribeID
<< " code=" << folly::to_underlying(subscribeDone.statusCode)
<< " reason=" << subscribeDone.reasonPhrase;
}

void MoQServer::ControlVisitor::operator()(Unsubscribe unsubscribe) const {
XLOG(INFO) << "Unsubscribe id=" << unsubscribe.subscribeID;
}
Expand Down
1 change: 0 additions & 1 deletion moxygen/MoQServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class MoQServer : public MoQSession::ServerSetupCallback {
void operator()(AnnounceCancel announceCancel) const override;
void operator()(SubscribeAnnounces subscribeAnnounces) const override;
void operator()(UnsubscribeAnnounces unsubscribeAnnounces) const override;
void operator()(SubscribeDone subscribeDone) const override;
void operator()(Unsubscribe unsubscribe) const override;
void operator()(TrackStatusRequest trackStatusRequest) const override;
void operator()(TrackStatus trackStatus) const override;
Expand Down
Loading

0 comments on commit 42faf08

Please sign in to comment.