Skip to content

Commit

Permalink
Add length to control messages
Browse files Browse the repository at this point in the history
Summary:
This adds an explicit length to all control messages.  It's encoded as a varint, which is annoying.  moxygen always writes it as a 2 byte varint, and fails writing control messages longer than 2^14.

This removes all the calls to cursor.totalLength() from the control stream, which was a bit of a DoS vector.  There's still one on the object-stream path, which we can work on removing as we iterate.

This diff started as a backout of D57702981.

Reviewed By: yuandagits

Differential Revision: D63587022

fbshipit-source-id: df291d4d289efc69a645547605e81ec5860f6c43
  • Loading branch information
afrind authored and facebook-github-bot committed Oct 1, 2024
1 parent 583ec92 commit 3395237
Show file tree
Hide file tree
Showing 5 changed files with 379 additions and 214 deletions.
98 changes: 61 additions & 37 deletions moxygen/MoQCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ void MoQCodec::onIngressStart(std::unique_ptr<folly::IOBuf> data) {

void MoQControlCodec::onIngress(std::unique_ptr<folly::IOBuf> data, bool eom) {
onIngressStart(std::move(data));
size_t remainingLength = ingress_.chainLength();
folly::io::Cursor cursor(ingress_.front());
while (!connError_ && (ingress_.chainLength() > 0 && !cursor.isAtEnd())) {
while (!connError_ && remainingLength > 0) {
switch (parseState_) {
case ParseState::FRAME_HEADER_TYPE: {
auto newCursor = cursor;
auto type = quic::decodeQuicInteger(newCursor);
auto type = quic::decodeQuicInteger(cursor);
if (!type) {
XLOG(DBG1) << __func__ << " underflow";
XLOG(DBG6) << __func__ << " underflow";
connError_ = ErrorCode::PARSE_UNDERFLOW;
break;
}
cursor = newCursor;
curFrameType_ = FrameType(type->first);
remainingLength -= type->second;
auto res = checkFrameAllowed(curFrameType_);
if (!res) {
XLOG(DBG4) << "Frame not allowed: 0x" << std::setfill('0')
Expand All @@ -42,34 +42,54 @@ void MoQControlCodec::onIngress(std::unique_ptr<folly::IOBuf> data, bool eom) {
if (callback_) {
callback_->onFrame(curFrameType_);
}
parseState_ = ParseState::FRAME_LENGTH;
[[fallthrough]];
}
case ParseState::FRAME_LENGTH: {
auto length = quic::decodeQuicInteger(cursor);
if (!length) {
XLOG(DBG6) << __func__ << " underflow";
connError_ = ErrorCode::PARSE_UNDERFLOW;
break;
}
curFrameLength_ = length->first;
remainingLength -= length->second;
parseState_ = ParseState::FRAME_PAYLOAD;
[[fallthrough]];
}
case ParseState::FRAME_PAYLOAD: {
auto newCursor = cursor;
auto res = parseFrame(newCursor);
if (remainingLength < curFrameLength_) {
XLOG(DBG6) << __func__ << " underflow";
connError_ = ErrorCode::PARSE_UNDERFLOW;
break;
}
auto res = parseFrame(cursor);
if (res.hasError()) {
XLOG(DBG1) << __func__ << " " << uint32_t(res.error());
connError_ = res.error();
XLOG(DBG6) << __func__ << " " << uint32_t(res.error());
if (res.error() == ErrorCode::PARSE_UNDERFLOW) {
XLOG(ERR) << "Frame underflow -> parse error";
connError_ = ErrorCode::PARSE_ERROR;
} else {
connError_ = res.error();
}
break;
}
parseState_ = ParseState::FRAME_HEADER_TYPE;
cursor = newCursor;
remainingLength -= curFrameLength_;
break;
}
}
}
onIngressEnd(cursor, eom, callback_);
onIngressEnd(remainingLength, eom, callback_);
}

void MoQCodec::onIngressEnd(
folly::io::Cursor& cursor,
size_t remainingLength,
bool eom,
Callback* callback) {
if (connError_) {
if (connError_.value() == ErrorCode::PARSE_UNDERFLOW && !eom) {
auto remainingLen = cursor.totalLength(); // must be less than 1 message
ingress_.trimStart(ingress_.chainLength() - remainingLen);
ingress_.trimStart(ingress_.chainLength() - remainingLength);
connError_.reset();
return;
} else if (callback) {
Expand All @@ -94,7 +114,7 @@ void MoQObjectStreamCodec::onIngress(
auto newCursor = cursor;
auto type = quic::decodeQuicInteger(newCursor);
if (!type) {
XLOG(DBG1) << __func__ << " underflow";
XLOG(DBG6) << __func__ << " underflow";
connError_ = ErrorCode::PARSE_UNDERFLOW;
break;
}
Expand All @@ -120,9 +140,9 @@ void MoQObjectStreamCodec::onIngress(
}
case ParseState::DATAGRAM: {
auto newCursor = cursor;
auto res = parseObjectHeader(newCursor);
auto res = parseObjectHeader(newCursor, ingress_.chainLength());
if (res.hasError()) {
XLOG(DBG1) << __func__ << " " << uint32_t(res.error());
XLOG(DBG6) << __func__ << " " << uint32_t(res.error());
connError_ = res.error();
break;
}
Expand All @@ -138,7 +158,7 @@ void MoQObjectStreamCodec::onIngress(
auto newCursor = cursor;
auto res = parseStreamHeader(newCursor, streamType_);
if (res.hasError()) {
XLOG(DBG1) << __func__ << " " << uint32_t(res.error());
XLOG(DBG6) << __func__ << " " << uint32_t(res.error());
connError_ = res.error();
break;
}
Expand All @@ -152,7 +172,7 @@ void MoQObjectStreamCodec::onIngress(
auto res =
parseMultiObjectHeader(newCursor, streamType_, curObjectHeader_);
if (res.hasError()) {
XLOG(DBG1) << __func__ << " " << uint32_t(res.error());
XLOG(DBG6) << __func__ << " " << uint32_t(res.error());
connError_ = res.error();
break;
}
Expand Down Expand Up @@ -184,7 +204,7 @@ void MoQObjectStreamCodec::onIngress(
*curObjectHeader_.length -= chunkLen;
if (eom && *curObjectHeader_.length != 0) {
connError_ = ErrorCode::PARSE_ERROR;
XLOG(DBG1) << __func__ << " " << uint32_t(*connError_);
XLOG(DBG4) << __func__ << " " << uint32_t(*connError_);
break;
}
bool endOfObject = (*curObjectHeader_.length == 0);
Expand Down Expand Up @@ -232,15 +252,19 @@ void MoQObjectStreamCodec::onIngress(
#endif
}
}
onIngressEnd(cursor, eom, callback_);
size_t remainingLength = 0;
if (!eom && !cursor.isAtEnd()) {
remainingLength = cursor.totalLength(); // must be less than 1 message
}
onIngressEnd(remainingLength, eom, callback_);
}

folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
folly::io::Cursor& cursor) {
XLOG(DBG4) << "parsing frame type=" << folly::to_underlying(curFrameType_);
switch (curFrameType_) {
case FrameType::CLIENT_SETUP: {
auto res = parseClientSetup(cursor);
auto res = parseClientSetup(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onClientSetup(std::move(res.value()));
Expand All @@ -251,7 +275,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::SERVER_SETUP: {
auto res = parseServerSetup(cursor);
auto res = parseServerSetup(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onServerSetup(std::move(res.value()));
Expand All @@ -262,7 +286,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::SUBSCRIBE: {
auto res = parseSubscribeRequest(cursor);
auto res = parseSubscribeRequest(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onSubscribe(std::move(res.value()));
Expand All @@ -273,7 +297,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::SUBSCRIBE_UPDATE: {
auto res = parseSubscribeUpdate(cursor);
auto res = parseSubscribeUpdate(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onSubscribeUpdate(std::move(res.value()));
Expand All @@ -284,7 +308,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::SUBSCRIBE_OK: {
auto res = parseSubscribeOk(cursor);
auto res = parseSubscribeOk(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onSubscribeOk(std::move(res.value()));
Expand All @@ -295,7 +319,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::SUBSCRIBE_ERROR: {
auto res = parseSubscribeError(cursor);
auto res = parseSubscribeError(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onSubscribeError(std::move(res.value()));
Expand All @@ -306,7 +330,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::UNSUBSCRIBE: {
auto res = parseUnsubscribe(cursor);
auto res = parseUnsubscribe(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onUnsubscribe(std::move(res.value()));
Expand All @@ -317,7 +341,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::SUBSCRIBE_DONE: {
auto res = parseSubscribeDone(cursor);
auto res = parseSubscribeDone(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onSubscribeDone(std::move(res.value()));
Expand All @@ -328,7 +352,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::ANNOUNCE: {
auto res = parseAnnounce(cursor);
auto res = parseAnnounce(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onAnnounce(std::move(res.value()));
Expand All @@ -339,7 +363,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::ANNOUNCE_OK: {
auto res = parseAnnounceOk(cursor);
auto res = parseAnnounceOk(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onAnnounceOk(std::move(res.value()));
Expand All @@ -350,7 +374,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::ANNOUNCE_ERROR: {
auto res = parseAnnounceError(cursor);
auto res = parseAnnounceError(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onAnnounceError(std::move(res.value()));
Expand All @@ -361,7 +385,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::UNANNOUNCE: {
auto res = parseUnannounce(cursor);
auto res = parseUnannounce(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onUnannounce(std::move(res.value()));
Expand All @@ -372,7 +396,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::ANNOUNCE_CANCEL: {
auto res = parseAnnounceCancel(cursor);
auto res = parseAnnounceCancel(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onAnnounceCancel(std::move(res.value()));
Expand All @@ -383,7 +407,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::TRACK_STATUS_REQUEST: {
auto res = parseTrackStatusRequest(cursor);
auto res = parseTrackStatusRequest(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onTrackStatusRequest(std::move(res.value()));
Expand All @@ -394,7 +418,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::TRACK_STATUS: {
auto res = parseTrackStatus(cursor);
auto res = parseTrackStatus(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onTrackStatus(std::move(res.value()));
Expand All @@ -405,7 +429,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(
break;
}
case FrameType::GOAWAY: {
auto res = parseGoaway(cursor);
auto res = parseGoaway(cursor, curFrameLength_);
if (res) {
if (callback_) {
callback_->onGoaway(std::move(res.value()));
Expand Down
4 changes: 3 additions & 1 deletion moxygen/MoQCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class MoQCodec {

protected:
void onIngressStart(std::unique_ptr<folly::IOBuf> data);
void onIngressEnd(folly::io::Cursor& cursor, bool eom, Callback* callback);
void onIngressEnd(size_t remainingLength, bool eom, Callback* callback);

uint64_t streamId_{std::numeric_limits<uint64_t>::max()};
folly::IOBufQueue ingress_{folly::IOBufQueue::cacheChainLength()};
Expand Down Expand Up @@ -103,8 +103,10 @@ class MoQControlCodec : public MoQCodec {
Direction dir_;
ControlCallback* callback_{nullptr};
FrameType curFrameType_;
size_t curFrameLength_{0};
enum class ParseState {
FRAME_HEADER_TYPE,
FRAME_LENGTH,
FRAME_PAYLOAD,
};
ParseState parseState_{ParseState::FRAME_HEADER_TYPE};
Expand Down
Loading

0 comments on commit 3395237

Please sign in to comment.