From 9ec2c0b3841a9801978b76febe661bdcbba36e2c Mon Sep 17 00:00:00 2001 From: Jacob Steinebronn Date: Fri, 13 Dec 2024 13:16:45 -0800 Subject: [PATCH] HTTPTunnelSink -> HTTPConnectSink Summary: The naming of "HTTP Tunnel" was confusing folks, conflating it with the tunnel service. Hopefully this rename will be less confusing Reviewed By: hanidamlaj Differential Revision: D67170085 fbshipit-source-id: 802252a77d108e178e30ce3e5e7fe8da04c907db --- .../lib/http/sink/HTTPConnectSink.cpp | 192 +++++++++++++++ .../proxygen/lib/http/sink/HTTPConnectSink.h | 218 ++++++++++++++++++ ...elSinkTest.cpp => HTTPConnectSinkTest.cpp} | 38 +-- 3 files changed, 429 insertions(+), 19 deletions(-) create mode 100644 third-party/proxygen/src/proxygen/lib/http/sink/HTTPConnectSink.cpp create mode 100644 third-party/proxygen/src/proxygen/lib/http/sink/HTTPConnectSink.h rename third-party/proxygen/src/proxygen/lib/http/sink/test/{HTTPTunnelSinkTest.cpp => HTTPConnectSinkTest.cpp} (89%) diff --git a/third-party/proxygen/src/proxygen/lib/http/sink/HTTPConnectSink.cpp b/third-party/proxygen/src/proxygen/lib/http/sink/HTTPConnectSink.cpp new file mode 100644 index 00000000000000..e72362cc4935b4 --- /dev/null +++ b/third-party/proxygen/src/proxygen/lib/http/sink/HTTPConnectSink.cpp @@ -0,0 +1,192 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include + +namespace proxygen { + +constexpr uint16_t kMinReadSize = 1460; +constexpr uint16_t kMaxReadSize = 4000; +constexpr uint8_t kMaxOutstandingWrites = 1; + +void HTTPConnectSink::detachAndAbortIfIncomplete( + std::unique_ptr self) { + sock_->setReadCB(nullptr); + handler_ = nullptr; + // If we haven't seen either EOM, sock is still active so close it + if (!egressEOMSeen_ && !ingressEOMRead_) { + sock_->closeWithReset(); + } + XCHECK(self.get() == this); + if (outstandingWrites_ > 0) { + destroyOnWriteComplete_ = true; + void(self.release()); + } +} + +void HTTPConnectSink::sendBody(std::unique_ptr body) { + DestructorCheck::Safety safety(*this); + resetIdleTimeout(); + ++outstandingWrites_; + sock_->writeChain(this, std::move(body)); + if (safety.destroyed()) { + return; + } + if (outstandingWrites_ >= kMaxOutstandingWrites && !handlerEgressPaused_) { + handlerEgressPaused_ = true; + handler_->onEgressPaused(); + } +} + +void HTTPConnectSink::sendEOM() { + sock_->shutdownWrite(); + egressEOMSeen_ = true; + if (ingressEOMRead_) { + handler_->detachTransaction(); + } +} + +bool HTTPConnectSink::isEgressEOMSeen() { + return egressEOMSeen_; +} + +void HTTPConnectSink::sendAbort() { + sock_->closeWithReset(); + handler_->detachTransaction(); +} + +void HTTPConnectSink::getCurrentTransportInfo( + wangle::TransportInfo* tinfo) const { + auto sock = sock_->getUnderlyingTransport(); + if (sock) { + tinfo->initWithSocket(sock); +#if defined(__linux__) || defined(__FreeBSD__) + tinfo->readTcpCongestionControl(sock); + tinfo->readMaxPacingRate(sock); +#endif // defined(__linux__) || defined(__FreeBSD__) + tinfo->totalBytes = sock->getRawBytesWritten(); + } +} + +void HTTPConnectSink::pauseIngress() { + sock_->setReadCB(nullptr); +} + +void HTTPConnectSink::resumeIngress() { + sock_->setReadCB(this); +} + +[[nodiscard]] bool HTTPConnectSink::isIngressPaused() const { + return sock_->getReadCallback() == nullptr; +} + +[[nodiscard]] bool HTTPConnectSink::isEgressPaused() const { + return outstandingWrites_ >= kMaxOutstandingWrites; +} + +void HTTPConnectSink::timeoutExpired() noexcept { + XLOG(DBG4) << "Closing socket now"; + sock_->closeNow(); + if (handler_) { + DestructorCheck::Safety safety(*this); + handler_->onError(HTTPException( + HTTPException::Direction::INGRESS_AND_EGRESS, "Idle timeout expired")); + if (!safety.destroyed() && handler_) { + handler_->detachTransaction(); + } + } + idleTimeout_ = std::chrono::milliseconds(0); +} + +void HTTPConnectSink::setIdleTimeout(std::chrono::milliseconds timeout) { + if (timeout.count() != 0) { + idleTimeout_ = timeout; + resetIdleTimeout(); + } +} + +// ReadCallback methods +void HTTPConnectSink::getReadBuffer(void** buf, size_t* bufSize) { + std::pair readSpace = + readBuf_.preallocate(kMinReadSize, kMaxReadSize); + *buf = readSpace.first; + *bufSize = readSpace.second; +} + +void HTTPConnectSink::readDataAvailable(size_t readSize) noexcept { + resetIdleTimeout(); + readBuf_.postallocate(readSize); + while (!readBuf_.empty()) { + // Skip any 0 length buffers. Since readBuf_ is not empty, we are + // guaranteed to find a non-empty buffer. + while (readBuf_.front()->length() == 0) { + readBuf_.pop_front(); + } + handler_->onBody(readBuf_.pop_front()); + } +} + +void HTTPConnectSink::readEOF() noexcept { + DestructorCheck::Safety safety(*this); + ingressEOMRead_ = true; + handler_->onEOM(); + if (!safety.destroyed() && egressEOMSeen_ && handler_) { + handler_->detachTransaction(); + } +} + +void HTTPConnectSink::readErr(const folly::AsyncSocketException& err) noexcept { + DestructorCheck::Safety safety(*this); + handler_->onError( + HTTPException(HTTPException::Direction::INGRESS_AND_EGRESS, err.what())); + if (!safety.destroyed() && handler_) { + handler_->detachTransaction(); + } +} + +// Returns true if this sink is destroyed +bool HTTPConnectSink::writeComplete() { + outstandingWrites_--; + if (outstandingWrites_ == 0 && destroyOnWriteComplete_) { + delete this; + return true; + } + + return false; +} + +// WriteCallback methods +void HTTPConnectSink::writeSuccess() noexcept { + bool destroyed = writeComplete(); + if (!destroyed) { + // If we drop below the max outstanding writes, resume egress + if (outstandingWrites_ < kMaxOutstandingWrites && handlerEgressPaused_ && + handler_) { + handlerEgressPaused_ = false; + handler_->onEgressResumed(); + } + resetIdleTimeout(); + } +} + +void HTTPConnectSink::writeErr( + size_t, const folly::AsyncSocketException& err) noexcept { + bool destroyed = writeComplete(); + if (!destroyed && handler_) { + DestructorCheck::Safety safety(*this); + handler_->onError(HTTPException( + HTTPException::Direction::INGRESS_AND_EGRESS, err.what())); + if (!safety.destroyed() && handler_) { + handler_->detachTransaction(); + } + } +} + +} // namespace proxygen diff --git a/third-party/proxygen/src/proxygen/lib/http/sink/HTTPConnectSink.h b/third-party/proxygen/src/proxygen/lib/http/sink/HTTPConnectSink.h new file mode 100644 index 00000000000000..20b92c85f1640e --- /dev/null +++ b/third-party/proxygen/src/proxygen/lib/http/sink/HTTPConnectSink.h @@ -0,0 +1,218 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once +#include +#include +#include +#include +#include + +namespace proxygen { + +/** + * A HTTPConnectSink writes data straight to an AsyncTransport. + */ +class HTTPConnectSink + : public HTTPSink + , public folly::DestructorCheck + , public folly::AsyncTransport::ReadCallback + , private folly::AsyncTransport::WriteCallback + , private folly::HHWheelTimer::Callback { + + public: + explicit HTTPConnectSink(folly::AsyncTransport::UniquePtr socket, + HTTPTransactionHandler* handler) + : sock_(std::move(socket)), handler_(handler) { + CHECK(sock_); + sock_->getLocalAddress(&localAddress_); + sock_->getPeerAddress(&peerAddress_); + } + + ~HTTPConnectSink() override = default; + + [[nodiscard]] folly::Optional getStreamID() + const override { + return folly::none; + } + + [[nodiscard]] CodecProtocol getCodecProtocol() const override { + // This is meaningless :( + return CodecProtocol::HTTP_1_1; + } + + [[nodiscard]] const folly::SocketAddress& getLocalAddress() const override { + return localAddress_; + } + + [[nodiscard]] const folly::SocketAddress& getPeerAddress() const override { + return peerAddress_; + } + + [[nodiscard]] const folly::AsyncTransportCertificate* getPeerCertificate() + const override { + return sock_->getPeerCertificate(); + } + + [[nodiscard]] folly::Optional getHTTPPriority() const override { + return folly::none; + } + + [[nodiscard]] int getTCPTransportFD() const override { + return CHECK_NOTNULL(sock_->getUnderlyingTransport()) + ->getNetworkSocket() + .toFd(); + } + + [[nodiscard]] quic::QuicSocket* getQUICTransport() const override { + return nullptr; + } + + [[nodiscard]] std::chrono::seconds getSessionIdleDuration() const override { + return std::chrono::seconds(0); + } + + void getCurrentFlowControlInfo(FlowControlInfo*) const override { + } + + [[nodiscard]] CompressionInfo getHeaderCompressionInfo() const override { + return {}; + } + + void detachAndAbortIfIncomplete(std::unique_ptr self) override; + // Sending data + void sendHeaders(const HTTPMessage&) override { + } + + bool sendHeadersWithDelegate(const HTTPMessage&, + std::unique_ptr) override { + return true; + } + + void sendHeadersWithEOM(const HTTPMessage&) override { + sendEOM(); + } + + void sendHeadersWithOptionalEOM(const HTTPMessage&, bool eom) override { + if (eom) { + sendEOM(); + } + } + + void sendBody(std::unique_ptr body) override; + + void sendChunkHeader(size_t) override { + } + + void sendChunkTerminator() override { + } + + void sendTrailers(const HTTPHeaders&) override { + } + + void sendPadding(uint16_t) override { + } + + void sendEOM() override; + + bool isEgressEOMSeen() override; + + void sendAbort() override; + + void updateAndSendPriority(HTTPPriority) override { + } + + bool trackEgressBodyOffset(uint64_t, ByteEventFlags) override { + return false; + } + + // Check state + [[nodiscard]] bool canSendHeaders() const override { + return sock_->good(); + } + + const wangle::TransportInfo& getSetupTransportInfo() const noexcept override { + return transportInfo_; + } + + void getCurrentTransportInfo(wangle::TransportInfo* tinfo) const override; + + // Flow control + void pauseIngress() override; + + void resumeIngress() override; + + [[nodiscard]] bool isIngressPaused() const override; + + [[nodiscard]] bool isEgressPaused() const override; + + void setEgressRateLimit(uint64_t) override { + } + + void timeoutExpired() noexcept override; + + void setIdleTimeout(std::chrono::milliseconds timeout) override; + + // Capabilities + bool safeToUpgrade(HTTPMessage*) const override { + return false; + } + + [[nodiscard]] bool supportsPush() const override { + return false; + } + + // Logging + void describe(std::ostream& os) override { + os << "HTTPConnectSink on " << localAddress_ << " to " << peerAddress_; + } + + // Callback methods + void getReadBuffer(void** buf, size_t* bufSize) override; + + void readDataAvailable(size_t readSize) noexcept override; + + void readEOF() noexcept override; + + void readErr(const folly::AsyncSocketException& err) noexcept override; + + void writeSuccess() noexcept override; + + void writeErr(size_t, + const folly::AsyncSocketException& err) noexcept override; + + private: + void resetIdleTimeout() { + cancelTimeout(); + if (idleTimeout_.count()) { + sock_->getEventBase()->timer().scheduleTimeout(this, idleTimeout_); + } + } + + // Returns true if this sink was destroyed + bool writeComplete(); + + folly::AsyncTransport::UniquePtr sock_; + folly::SocketAddress peerAddress_; + folly::SocketAddress localAddress_; + wangle::TransportInfo transportInfo_; + + /** Chain of ingress IOBufs */ + folly::IOBufQueue readBuf_{folly::IOBufQueue::cacheChainLength()}; + HTTPTransactionHandler* handler_{nullptr}; + + std::chrono::milliseconds idleTimeout_{0}; + bool destroyOnWriteComplete_{false}; + uint8_t outstandingWrites_{0}; + + bool ingressEOMRead_{false}; + bool egressEOMSeen_{false}; + bool handlerEgressPaused_{false}; +}; + +} // namespace proxygen diff --git a/third-party/proxygen/src/proxygen/lib/http/sink/test/HTTPTunnelSinkTest.cpp b/third-party/proxygen/src/proxygen/lib/http/sink/test/HTTPConnectSinkTest.cpp similarity index 89% rename from third-party/proxygen/src/proxygen/lib/http/sink/test/HTTPTunnelSinkTest.cpp rename to third-party/proxygen/src/proxygen/lib/http/sink/test/HTTPConnectSinkTest.cpp index 7792a52e77080b..94724df6ac9621 100644 --- a/third-party/proxygen/src/proxygen/lib/http/sink/test/HTTPTunnelSinkTest.cpp +++ b/third-party/proxygen/src/proxygen/lib/http/sink/test/HTTPConnectSinkTest.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include "proxygen/facebook/revproxy/test/MockHTTPTransactionHandler.h" @@ -19,10 +19,10 @@ using namespace testing; using namespace folly::test; // Subclass which tracks deletion -class TestHTTPTunnelSink : public HTTPTunnelSink { +class TestHTTPConnectSink : public HTTPConnectSink { public: - using HTTPTunnelSink::HTTPTunnelSink; - ~TestHTTPTunnelSink() override { + using HTTPConnectSink::HTTPConnectSink; + ~TestHTTPConnectSink() override { if (isDeleted != nullptr) { *isDeleted = true; } @@ -30,13 +30,13 @@ class TestHTTPTunnelSink : public HTTPTunnelSink { std::shared_ptr isDeleted; }; -class HTTPTunnelSinkTest : public Test { +class HTTPConnectSinkTest : public Test { public: void SetUp() override { evb_ = folly::EventBaseManager::get()->getEventBase(); mockSocket_ = new MockAsyncTransport(); mockHandler_ = new MockHTTPTransactionHandler(); - sink_ = std::unique_ptr(new TestHTTPTunnelSink( + sink_ = std::unique_ptr(new TestHTTPConnectSink( folly::AsyncTransport::UniquePtr(mockSocket_), mockHandler_)); sinkDeleted_ = std::make_shared(false); sink_->isDeleted = sinkDeleted_; @@ -55,12 +55,12 @@ class HTTPTunnelSinkTest : public Test { std::shared_ptr sinkDeleted_; - std::unique_ptr sink_; + std::unique_ptr sink_; }; using WriteCB = folly::AsyncTransport::WriteCallback; -TEST_F(HTTPTunnelSinkTest, test_egress_backpressure) { +TEST_F(HTTPConnectSinkTest, test_egress_backpressure) { // Test that egress backpressure is propagated to the handler // Mock underlying write to just store the callbacks for later @@ -97,7 +97,7 @@ TEST_F(HTTPTunnelSinkTest, test_egress_backpressure) { sink_->sendBody(folly::IOBuf::copyBuffer("We made it!")); } -TEST_F(HTTPTunnelSinkTest, test_ingress_control) { +TEST_F(HTTPConnectSinkTest, test_ingress_control) { folly::AsyncTransport::ReadCallback *read_cb = sink_.get(); EXPECT_CALL(*mockSocket_, setReadCB).WillRepeatedly(Invoke([&](auto cb) { read_cb = cb; @@ -113,7 +113,7 @@ TEST_F(HTTPTunnelSinkTest, test_ingress_control) { EXPECT_EQ(read_cb, sink_.get()); } -TEST_F(HTTPTunnelSinkTest, test_bytes_read) { +TEST_F(HTTPConnectSinkTest, test_bytes_read) { // Test that read bytes are propagated to the handler std::string bytes = "Shoelace is a cute dog!"; size_t num_bytes = bytes.size(); @@ -140,7 +140,7 @@ void sleep_evb(folly::EventBase *evb, std::chrono::milliseconds timeout) { } }; -TEST_F(HTTPTunnelSinkTest, test_idle_timeout) { +TEST_F(HTTPConnectSinkTest, test_idle_timeout) { EXPECT_CALL(*mockSocket_, getEventBase).WillRepeatedly(Invoke([&]() { return evb_; })); @@ -175,7 +175,7 @@ TEST_F(HTTPTunnelSinkTest, test_idle_timeout) { sleep_evb(evb_, std::chrono::milliseconds(8)); } -TEST_F(HTTPTunnelSinkTest, test_delayed_destruction) { +TEST_F(HTTPConnectSinkTest, test_delayed_destruction) { // Mock underlying write to just store the callbacks for later std::vector writeCallbacks; EXPECT_CALL(*mockSocket_, writeChain) @@ -195,7 +195,7 @@ TEST_F(HTTPTunnelSinkTest, test_delayed_destruction) { EXPECT_EQ(*sinkDeleted_, true); } -TEST_F(HTTPTunnelSinkTest, test_destruction_on_abort) { +TEST_F(HTTPConnectSinkTest, test_destruction_on_abort) { EXPECT_CALL(*mockSocket_, writeChain) .WillRepeatedly( Invoke([](WriteCB *cb, auto &&, auto &&) { cb->writeSuccess(); })); @@ -208,7 +208,7 @@ TEST_F(HTTPTunnelSinkTest, test_destruction_on_abort) { EXPECT_EQ(*sinkDeleted_, true); } -TEST_F(HTTPTunnelSinkTest, test_ingress_eom) { +TEST_F(HTTPConnectSinkTest, test_ingress_eom) { // expect handler onEOM EXPECT_CALL(*mockHandler_, onEOM).WillOnce(Return()); sink_->readEOF(); @@ -221,20 +221,20 @@ TEST_F(HTTPTunnelSinkTest, test_ingress_eom) { EXPECT_TRUE(sink_->isEgressEOMSeen()); } -TEST_F(HTTPTunnelSinkTest, test_send_abort) { +TEST_F(HTTPConnectSinkTest, test_send_abort) { EXPECT_CALL(*mockHandler_, detachTransaction).WillOnce(Return()); EXPECT_CALL(*mockSocket_, closeWithReset).WillOnce(Return()); sink_->sendAbort(); } -TEST_F(HTTPTunnelSinkTest, test_read_error) { +TEST_F(HTTPConnectSinkTest, test_read_error) { EXPECT_CALL(*mockHandler_, onError).WillOnce(Return()); EXPECT_CALL(*mockHandler_, detachTransaction).WillOnce(Return()); sink_->readErr(folly::AsyncSocketException( folly::AsyncSocketException::AsyncSocketExceptionType::UNKNOWN, "test")); } -TEST_F(HTTPTunnelSinkTest, test_read_error_with_abort) { +TEST_F(HTTPConnectSinkTest, test_read_error_with_abort) { EXPECT_CALL(*mockHandler_, onError).WillOnce(Invoke([&](auto &&) { sink_->detachAndAbortIfIncomplete(std::move(sink_)); })); @@ -243,7 +243,7 @@ TEST_F(HTTPTunnelSinkTest, test_read_error_with_abort) { folly::AsyncSocketException::AsyncSocketExceptionType::UNKNOWN, "test")); } -TEST_F(HTTPTunnelSinkTest, test_write_error) { +TEST_F(HTTPConnectSinkTest, test_write_error) { EXPECT_CALL(*mockSocket_, writeChain) .WillRepeatedly(Invoke([](WriteCB *cb, auto &&, auto &&) { cb->writeErr( @@ -257,7 +257,7 @@ TEST_F(HTTPTunnelSinkTest, test_write_error) { sink_->sendBody(folly::IOBuf::copyBuffer("hello world")); } -TEST_F(HTTPTunnelSinkTest, test_write_error_with_abort) { +TEST_F(HTTPConnectSinkTest, test_write_error_with_abort) { EXPECT_CALL(*mockSocket_, writeChain) .WillRepeatedly(Invoke([](WriteCB *cb, auto &&, auto &&) { cb->writeErr(