Skip to content

Commit

Permalink
Implement FETCH streams (#8)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #8

Adds support to MoQFramer, MoQCodec and MoQSession to send and receive fetch streams

Reviewed By: sharmafb

Differential Revision: D65532676

fbshipit-source-id: a667a677c1a9f3af98e9681e508769cf3df050c9
  • Loading branch information
afrind authored and facebook-github-bot committed Dec 6, 2024
1 parent 82bec08 commit 43e3659
Show file tree
Hide file tree
Showing 12 changed files with 734 additions and 54 deletions.
18 changes: 17 additions & 1 deletion moxygen/MoQCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ void MoQObjectStreamCodec::onIngress(
case StreamType::STREAM_HEADER_SUBGROUP:
parseState_ = ParseState::OBJECT_STREAM;
break;
// CONTROL doesn't have a wire type yet.
case StreamType::FETCH_HEADER:
parseState_ = ParseState::FETCH_HEADER;
break;
// CONTROL doesn't have a wire type yet.
default:
XLOG(DBG4) << "Stream not allowed: 0x" << std::setfill('0')
<< std::setw(sizeof(uint64_t) * 2) << std::hex
Expand All @@ -154,6 +157,19 @@ void MoQObjectStreamCodec::onIngress(
}
break;
}
case ParseState::FETCH_HEADER: {
auto newCursor = cursor;
auto res = parseFetchHeader(newCursor);
if (res.hasError()) {
XLOG(DBG6) << __func__ << " " << uint32_t(res.error());
connError_ = res.error();
break;
}
curObjectHeader_.trackIdentifier = SubscribeID(res.value());
parseState_ = ParseState::MULTI_OBJECT_HEADER;
cursor = newCursor;
break;
}
case ParseState::OBJECT_STREAM: {
auto newCursor = cursor;
auto res = parseStreamHeader(newCursor, streamType_);
Expand Down
2 changes: 2 additions & 0 deletions moxygen/MoQCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class MoQObjectStreamCodec : public MoQCodec {
public:
~ObjectCallback() override = default;

virtual void onFetchHeader(uint64_t subscribeID) = 0;
virtual void onObjectHeader(ObjectHeader objectHeader) = 0;

virtual void onObjectPayload(
Expand Down Expand Up @@ -168,6 +169,7 @@ class MoQObjectStreamCodec : public MoQCodec {
STREAM_HEADER_TYPE,
DATAGRAM,
OBJECT_STREAM,
FETCH_HEADER,
MULTI_OBJECT_HEADER,
OBJECT_PAYLOAD,
// OBJECT_PAYLOAD_NO_LENGTH
Expand Down
47 changes: 42 additions & 5 deletions moxygen/MoQFramer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ folly::Expected<ServerSetup, ErrorCode> parseServerSetup(
return serverSetup;
}

folly::Expected<uint64_t, ErrorCode> parseFetchHeader(
folly::io::Cursor& cursor) noexcept {
auto subscribeID = quic::decodeQuicInteger(cursor);
if (!subscribeID) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
return subscribeID->first;
}

folly::Expected<ObjectHeader, ErrorCode> parseObjectHeader(
folly::io::Cursor& cursor,
size_t length) noexcept {
Expand Down Expand Up @@ -287,10 +296,13 @@ folly::Expected<ObjectHeader, ErrorCode> parseMultiObjectHeader(
const ObjectHeader& headerTemplate) noexcept {
DCHECK(
streamType == StreamType::STREAM_HEADER_TRACK ||
streamType == StreamType::STREAM_HEADER_SUBGROUP);
streamType == StreamType::STREAM_HEADER_SUBGROUP ||
streamType == StreamType::FETCH_HEADER);
// TODO get rid of this
auto length = cursor.totalLength();
ObjectHeader objectHeader = headerTemplate;
if (streamType == StreamType::STREAM_HEADER_TRACK) {
if (streamType == StreamType::STREAM_HEADER_TRACK ||
streamType == StreamType::FETCH_HEADER) {
auto group = quic::decodeQuicInteger(cursor, length);
if (!group) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
Expand All @@ -301,12 +313,28 @@ folly::Expected<ObjectHeader, ErrorCode> parseMultiObjectHeader(
} else {
objectHeader.forwardPreference = ForwardPreference::Subgroup;
}
if (streamType == StreamType::FETCH_HEADER) {
objectHeader.forwardPreference = ForwardPreference::Fetch;
auto subgroup = quic::decodeQuicInteger(cursor, length);
if (!subgroup) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
length -= subgroup->second;
objectHeader.subgroup = subgroup->first;
}
auto id = quic::decodeQuicInteger(cursor, length);
if (!id) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
length -= id->second;
objectHeader.id = id->first;
if (streamType == StreamType::FETCH_HEADER) {
if (length < 2) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
objectHeader.priority = cursor.readBE<uint8_t>();
length--;
}
auto payloadLength = quic::decodeQuicInteger(cursor, length);
if (!payloadLength) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
Expand Down Expand Up @@ -1153,6 +1181,9 @@ WriteResult writeStreamHeader(
folly::to_underlying(StreamType::STREAM_HEADER_SUBGROUP),
size,
error);
} else if (objectHeader.forwardPreference == ForwardPreference::Fetch) {
writeVarint(
writeBuf, folly::to_underlying(StreamType::FETCH_HEADER), size, error);
} else {
LOG(FATAL) << "Unsupported forward preference to stream header";
}
Expand All @@ -1161,8 +1192,10 @@ WriteResult writeStreamHeader(
writeVarint(writeBuf, objectHeader.group, size, error);
writeVarint(writeBuf, objectHeader.subgroup, size, error);
}
writeBuf.append(&objectHeader.priority, 1);
size += 1;
if (objectHeader.forwardPreference != ForwardPreference::Fetch) {
writeBuf.append(&objectHeader.priority, 1);
size += 1;
}
if (error) {
return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR);
}
Expand Down Expand Up @@ -1199,12 +1232,16 @@ WriteResult writeObject(
if (objectHeader.forwardPreference != ForwardPreference::Subgroup) {
writeVarint(writeBuf, objectHeader.group, size, error);
}
if (objectHeader.forwardPreference == ForwardPreference::Fetch) {
writeVarint(writeBuf, objectHeader.subgroup, size, error);
}
writeVarint(writeBuf, objectHeader.id, size, error);
CHECK(
objectHeader.status != ObjectStatus::NORMAL ||
(objectHeader.length && *objectHeader.length > 0))
<< "Normal objects require non-zero length";
if (objectHeader.forwardPreference == ForwardPreference::Datagram) {
if (objectHeader.forwardPreference == ForwardPreference::Datagram ||
objectHeader.forwardPreference == ForwardPreference::Fetch) {
writeBuf.append(&objectHeader.priority, 1);
size += 1;
}
Expand Down
8 changes: 6 additions & 2 deletions moxygen/MoQFramer.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ enum class StreamType : uint64_t {
OBJECT_DATAGRAM = 1,
STREAM_HEADER_TRACK = 0x2,
STREAM_HEADER_SUBGROUP = 0x4,
FETCH_HEADER = 0x5,
CONTROL = 100000000
};

Expand Down Expand Up @@ -148,7 +149,7 @@ constexpr uint64_t kVersionDraft06_exp =
constexpr uint64_t kVersionDraft07_exp = 0xff070001; // Draft 7 FETCH support
constexpr uint64_t kVersionDraft07_exp2 =
0xff070002; // Draft 7 FETCH + removal of Subscribe ID on objects
constexpr uint64_t kVersionDraftCurrent = kVersionDraft07_exp2;
constexpr uint64_t kVersionDraftCurrent = kVersionDraft07;

struct ClientSetup {
std::vector<uint64_t> supportedVersions;
Expand All @@ -168,7 +169,7 @@ folly::Expected<ServerSetup, ErrorCode> parseServerSetup(
folly::io::Cursor& cursor,
size_t length) noexcept;

enum class ForwardPreference : uint8_t { Track, Subgroup, Datagram };
enum class ForwardPreference : uint8_t { Track, Subgroup, Datagram, Fetch };

enum class ObjectStatus : uint64_t {
NORMAL = 0,
Expand Down Expand Up @@ -264,6 +265,9 @@ folly::Expected<ObjectHeader, ErrorCode> parseObjectHeader(
folly::io::Cursor& cursor,
size_t length) noexcept;

folly::Expected<uint64_t, ErrorCode> parseFetchHeader(
folly::io::Cursor& cursor) noexcept;

folly::Expected<ObjectHeader, ErrorCode> parseStreamHeader(
folly::io::Cursor& cursor,
StreamType streamType) noexcept;
Expand Down
12 changes: 0 additions & 12 deletions moxygen/MoQServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,6 @@ void MoQServer::ControlVisitor::operator()(Fetch fetch) const {
XLOG(INFO) << "Fetch id=" << fetch.subscribeID;
}

void MoQServer::ControlVisitor::operator()(FetchCancel fetchCancel) const {
XLOG(INFO) << "FetchCancel id=" << fetchCancel.subscribeID;
}

void MoQServer::ControlVisitor::operator()(FetchOk fetchOk) const {
XLOG(INFO) << "FetchOk id=" << fetchOk.subscribeID;
}

void MoQServer::ControlVisitor::operator()(FetchError fetchError) const {
XLOG(INFO) << "FetchError id=" << fetchError.subscribeID;
}

void MoQServer::ControlVisitor::operator()(SubscribeDone subscribeDone) const {
XLOG(INFO) << "SubscribeDone id=" << subscribeDone.subscribeID
<< " code=" << folly::to_underlying(subscribeDone.statusCode)
Expand Down
3 changes: 0 additions & 3 deletions moxygen/MoQServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ class MoQServer : public MoQSession::ServerSetupCallback {
void operator()(SubscribeRequest subscribeReq) const override;
void operator()(SubscribeUpdate subscribeUpdate) const override;
void operator()(Fetch fetch) const override;
void operator()(FetchCancel fetchCancel) const override;
void operator()(FetchOk fetchOk) const override;
void operator()(FetchError fetchError) const override;
void operator()(Unannounce unannounce) const override;
void operator()(AnnounceCancel announceCancel) const override;
void operator()(SubscribeAnnounces subscribeAnnounces) const override;
Expand Down
Loading

0 comments on commit 43e3659

Please sign in to comment.