Skip to content

Commit

Permalink
Add an absolute cork (min) threshold for WINDOW_UPDATE
Browse files Browse the repository at this point in the history
Summary: Changed HTTPTransaction logic to check whether the minimum of 128KB and half the recv FC window has been delivered to the application/handler, effectively resulting in more frequent WINDOW_UPDATE frames.

Reviewed By: afrind

Differential Revision: D66999563

fbshipit-source-id: e5dac20bdfa8c4a6ae3e9879348bfba765d7169e
  • Loading branch information
Joanna Jo authored and facebook-github-bot committed Dec 14, 2024
1 parent 7025775 commit 7e851fc
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace {
const int64_t kApproximateMTU = 1400;
const std::chrono::seconds kRateLimitMaxDelay(10);
const uint64_t kMaxBufferPerTxn = 65536;
constexpr uint32_t kMinThreshold = 128 * 1024;

using namespace proxygen;
HTTPException stateMachineError(HTTPException::Direction dir, std::string msg) {
Expand Down Expand Up @@ -298,6 +299,7 @@ bool HTTPTransaction::updateContentLengthRemaining(size_t len) {
void HTTPTransaction::onIngressBody(unique_ptr<IOBuf> chain, uint16_t padding) {
FOLLY_SCOPED_TRACE_SECTION("HTTPTransaction - onIngressBody");
DestructorGuard g(this);
VLOG(6) << __func__ << " chain_length=" << chain->computeChainDataLength();
if (isIngressEOMSeen()) {
std::stringstream ss;
// Use stringstream to invoke operator << for this
Expand Down Expand Up @@ -369,7 +371,8 @@ void HTTPTransaction::processIngressBody(unique_ptr<IOBuf> chain, size_t len) {
// closed
divisor = 1;
}
if (uint32_t(recvToAck_) >= (recvWindow_.getCapacity() / divisor)) {
if (uint32_t(recvToAck_) >= kMinThreshold ||
uint32_t(recvToAck_) >= (recvWindow_.getCapacity() / divisor)) {
flushWindowUpdate();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,16 @@ class MockHTTPHandler
}
}

void expectBodyRepeatedly(
const std::function<void()>& callback = std::function<void()>()) {
if (callback) {
EXPECT_CALL(*this, _onBodyWithOffset(testing::_, testing::_))
.WillRepeatedly(testing::InvokeWithoutArgs(callback));
} else {
EXPECT_CALL(*this, _onBodyWithOffset(testing::_, testing::_));
}
}

void expectBody(
std::function<void(uint64_t, std::shared_ptr<folly::IOBuf>)> callback) {
EXPECT_CALL(*this, _onBodyWithOffset(testing::_, testing::_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3163,6 +3163,74 @@ TEST_F(HTTP2UpstreamSessionTest, TestPingPreserveData) {
httpSession_->destroy();
}

class H2LargeFlowControl : public HTTP2UpstreamSessionTest {
void SetUp() override {
constexpr uint32_t kCapacity = 1024 * 1024;
flowControl_ = {kCapacity, kCapacity, kCapacity};
HTTP2UpstreamSessionTest::SetUp();
}
};

/*
* Verifies that WINDOW_UPDATE is sent when at least kMinThreshold bytes are
* read.
*/
TEST_F(H2LargeFlowControl, WindowUpdateThresholdTest) {
auto handler = openTransaction();
handler->txn_->pauseIngress(); // buffer incoming bytes
handler->sendRequest(); // getRequest, sends request and eom
eventBase_.loopOnce();
eventBase_.loopOnce();

auto serverCodec = makeServerCodec();
folly::IOBufQueue output(folly::IOBufQueue::cacheChainLength());
serverCodec->generateConnectionPreface(output);
serverCodec->generateSettings(output);

// enqueue 1MB of data into txn
auto resp = makeResponse(200);
serverCodec->generateHeader(output, handler->txn_->getID(), *resp, false);
serverCodec->generateBody(output,
handler->txn_->getID(),
makeBuf(1024 * 1024),
folly::none /* padding */,
false /* eom */);

auto input = output.move();
input->coalesce();
readAndLoop(input->data(),
input->length()); // buffers server's headers and body to txn

uint32_t bytesReadSoFar = 0;
constexpr uint32_t kMinThreshold = 128 * 1024;

handler->expectHeaders();
// setting up expectation, pause ingress after reading 128KB in callback
handler->expectBodyRepeatedly([&] {
bytesReadSoFar += input->computeChainDataLength();
if (bytesReadSoFar >= kMinThreshold) {
handler->txn_->pauseIngress();
}
});

handler->txn_->resumeIngress(); // only sent to the handler after this point
handler->expectBodyRepeatedly([&] {});
handler->expectDetachTransaction();

// expect window update here
NiceMock<MockHTTPCodecCallback> callbacks;
serverCodec->setCallback(&callbacks);
EXPECT_CALL(callbacks, onWindowUpdate(0, _)).Times(AnyNumber());
EXPECT_CALL(callbacks, onWindowUpdate(handler->txn_->getID(), kMinThreshold))
.Times(AtLeast(1));

handler->txn_->resumeIngress();
eventBase_.loop();
handler->txn_->sendAbort();
parseOutput(*serverCodec); // client's buffer -> serverCodec
httpSession_->destroy();
}

TEST_F(HTTP2UpstreamSessionTest, TestConnectionToken) {
auto handler = openTransaction();
handler->expectError();
Expand Down

0 comments on commit 7e851fc

Please sign in to comment.