diff --git a/src/etlng/CMakeLists.txt b/src/etlng/CMakeLists.txt index f54f2dd39..5d9e29158 100644 --- a/src/etlng/CMakeLists.txt +++ b/src/etlng/CMakeLists.txt @@ -1,5 +1,5 @@ add_library(clio_etlng) -target_sources(clio_etlng PRIVATE impl/Extraction.cpp) +target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp) target_link_libraries(clio_etlng PUBLIC clio_data) diff --git a/src/etlng/InitialLoadObserverInterface.hpp b/src/etlng/InitialLoadObserverInterface.hpp new file mode 100644 index 000000000..1aa164b7c --- /dev/null +++ b/src/etlng/InitialLoadObserverInterface.hpp @@ -0,0 +1,54 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etlng/Models.hpp" + +#include + +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief The interface for observing the initial ledger load + */ +struct InitialLoadObserverInterface { + virtual ~InitialLoadObserverInterface() = default; + + /** + * @brief Callback for each incoming batch of objects during initial ledger load + * + * @param seq The sequence for this batch of objects + * @param data The batch of objects + * @param lastKey The last key of the previous batch if there was one + */ + virtual void + onInitialLoadGotMoreObjects( + uint32_t seq, + std::vector const& data, + std::optional lastKey = std::nullopt + ) = 0; +}; + +} // namespace etlng diff --git a/src/etlng/impl/AsyncGrpcCall.cpp b/src/etlng/impl/AsyncGrpcCall.cpp new file mode 100644 index 000000000..6fd44dced --- /dev/null +++ b/src/etlng/impl/AsyncGrpcCall.cpp @@ -0,0 +1,188 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/impl/AsyncGrpcCall.hpp" + +#include "etl/ETLHelpers.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/Models.hpp" +#include "etlng/impl/Extraction.hpp" +#include "util/Assert.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace etlng::impl { + +AsyncGrpcCall::AsyncGrpcCall( + uint32_t seq, + ripple::uint256 const& marker, + std::optional const& nextMarker +) +{ + request_.set_user("ETL"); + request_.mutable_ledger()->set_sequence(seq); + + if (marker.isNonZero()) + request_.set_marker(marker.data(), ripple::uint256::size()); + + nextPrefix_ = nextMarker ? nextMarker->data()[0] : 0x00; + auto const prefix = marker.data()[0]; + + LOG(log_.debug()) << "Setting up AsyncGrpcCall. marker = " << ripple::strHex(marker) + << ". prefix = " << ripple::strHex(std::string(1, prefix)) + << ". nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_)); + + ASSERT( + nextPrefix_ > prefix or nextPrefix_ == 0x00, + "Next prefix must be greater than current prefix. Got: nextPrefix_ = {}, prefix = {}", + nextPrefix_, + prefix + ); + + cur_ = std::make_unique(); + next_ = std::make_unique(); + context_ = std::make_unique(); +} + +AsyncGrpcCall::CallStatus +AsyncGrpcCall::process( + std::unique_ptr& stub, + grpc::CompletionQueue& cq, + etlng::InitialLoadObserverInterface& loader, + bool abort +) +{ + LOG(log_.trace()) << "Processing response. " + << "Marker prefix = " << getMarkerPrefix(); + + if (abort) { + LOG(log_.error()) << "AsyncGrpcCall aborted"; + return CallStatus::ERRORED; + } + + if (!status_.ok()) { + LOG(log_.error()) << "AsyncGrpcCall status_ not ok: code = " << status_.error_code() + << " message = " << status_.error_message(); + + return CallStatus::ERRORED; + } + + if (!next_->is_unlimited()) { + LOG(log_.warn()) << "AsyncGrpcCall is_unlimited is false. " + << "Make sure secure_gateway is set correctly at the ETL source"; + } + + std::swap(cur_, next_); + auto more = true; + + // if no marker returned, we are done + if (cur_->marker().empty()) + more = false; + + // if returned marker is greater than our end, we are done + auto const prefix = cur_->marker()[0]; + if (nextPrefix_ != 0x00 && prefix >= nextPrefix_) + more = false; + + // if we are not done, make the next async call + if (more) { + request_.set_marker(cur_->marker()); + call(stub, cq); + } + + auto const numObjects = cur_->ledger_objects().objects_size(); + std::vector data; + data.reserve(numObjects); + + for (int i = 0; i < numObjects; ++i) { + auto obj = std::move(*(cur_->mutable_ledger_objects()->mutable_objects(i))); + if (!more && nextPrefix_ != 0x00) { + if (static_cast(obj.key()[0]) >= nextPrefix_) + continue; + } + + lastKey_ = obj.key(); // this will end up the last key we actually touched eventually + data.push_back(etlng::impl::extractObj(std::move(obj))); + } + + if (not data.empty()) + loader.onInitialLoadGotMoreObjects(request_.ledger().sequence(), data, predecessorKey_); + + predecessorKey_ = lastKey_; // but for ongoing onInitialObjects calls we need to pass along the key we left + // off at so that we can link the two lists correctly + + return more ? CallStatus::MORE : CallStatus::DONE; +} + +void +AsyncGrpcCall::call(std::unique_ptr& stub, grpc::CompletionQueue& cq) +{ + context_ = std::make_unique(); + auto rpc = stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq); + + rpc->StartCall(); + rpc->Finish(next_.get(), &status_, this); +} + +std::string +AsyncGrpcCall::getMarkerPrefix() +{ + return next_->marker().empty() ? std::string{} : ripple::strHex(std::string{next_->marker().data()[0]}); +} + +// this is used to generate edgeKeys - keys that were the last one in the onInitialObjects list +// then we write them all in one go getting the successor from the cache once it's full +std::string +AsyncGrpcCall::getLastKey() +{ + return lastKey_; +} + +std::vector +AsyncGrpcCall::makeAsyncCalls(uint32_t const sequence, uint32_t const numMarkers) +{ + auto const markers = etl::getMarkers(numMarkers); + + std::vector result; + result.reserve(markers.size()); + + for (size_t i = 0; i + 1 < markers.size(); ++i) + result.emplace_back(sequence, markers[i], markers[i + 1]); + + if (not markers.empty()) + result.emplace_back(sequence, markers.back(), std::nullopt); + + return result; +} + +} // namespace etlng::impl diff --git a/src/etlng/impl/AsyncGrpcCall.hpp b/src/etlng/impl/AsyncGrpcCall.hpp new file mode 100644 index 000000000..8cb3edb84 --- /dev/null +++ b/src/etlng/impl/AsyncGrpcCall.hpp @@ -0,0 +1,85 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etlng/InitialLoadObserverInterface.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace etlng::impl { + +class AsyncGrpcCall { +public: + enum class CallStatus { MORE, DONE, ERRORED }; + using RequestType = org::xrpl::rpc::v1::GetLedgerDataRequest; + using ResponseType = org::xrpl::rpc::v1::GetLedgerDataResponse; + using StubType = org::xrpl::rpc::v1::XRPLedgerAPIService::Stub; + +private: + util::Logger log_{"ETL"}; + + std::unique_ptr cur_; + std::unique_ptr next_; + + RequestType request_; + std::unique_ptr context_; + + grpc::Status status_; + unsigned char nextPrefix_; + + std::string lastKey_; + std::optional predecessorKey_; + +public: + AsyncGrpcCall(uint32_t seq, ripple::uint256 const& marker, std::optional const& nextMarker); + + static std::vector + makeAsyncCalls(uint32_t const sequence, uint32_t const numMarkers); + + CallStatus + process( + std::unique_ptr& stub, + grpc::CompletionQueue& cq, + etlng::InitialLoadObserverInterface& loader, + bool abort + ); + + void + call(std::unique_ptr& stub, grpc::CompletionQueue& cq); + + std::string + getMarkerPrefix(); + + std::string + getLastKey(); +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/GrpcSource.cpp b/src/etlng/impl/GrpcSource.cpp new file mode 100644 index 000000000..126eb7136 --- /dev/null +++ b/src/etlng/impl/GrpcSource.cpp @@ -0,0 +1,162 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/impl/GrpcSource.hpp" + +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/impl/AsyncGrpcCall.hpp" +#include "util/Assert.hpp" +#include "util/log/Logger.hpp" +#include "web/Resolver.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +std::string +resolve(std::string const& ip, std::string const& port) +{ + web::Resolver resolver; + + if (auto const results = resolver.resolve(ip, port); not results.empty()) { + std::cout << "resolved ip: '" << results.at(0) << '\n'; + return results.at(0); + } + + throw std::runtime_error("Failed to resolve " + ip + ":" + port); +} + +} // namespace + +namespace etlng::impl { + +GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort) + : log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)) +{ + try { + grpc::ChannelArguments chArgs; + chArgs.SetMaxReceiveMessageSize(-1); + + stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub( + grpc::CreateCustomChannel(resolve(ip, grpcPort), grpc::InsecureChannelCredentials(), chArgs) + ); + + LOG(log_.debug()) << "Made stub for remote."; + } catch (std::exception const& e) { + LOG(log_.warn()) << "Exception while creating stub: " << e.what() << "."; + } +} + +std::pair +GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighbors) +{ + org::xrpl::rpc::v1::GetLedgerResponse response; + if (!stub_) + return {{grpc::StatusCode::INTERNAL, "No Stub"}, response}; + + // Ledger header with txns and metadata + org::xrpl::rpc::v1::GetLedgerRequest request; + grpc::ClientContext context; + + request.mutable_ledger()->set_sequence(sequence); + request.set_transactions(true); + request.set_expand(true); + request.set_get_objects(getObjects); + request.set_get_object_neighbors(getObjectNeighbors); + request.set_user("ETL"); + + grpc::Status const status = stub_->GetLedger(&context, request, &response); + + if (status.ok() and not response.is_unlimited()) { + log_.warn() << "is_unlimited is false. Make sure secure_gateway is set correctly on the ETL source. Status = " + << status.error_message(); + } + + return {status, std::move(response)}; +} + +std::pair, bool> +GrpcSource::loadInitialLedger( + uint32_t const sequence, + uint32_t const numMarkers, + etlng::InitialLoadObserverInterface& observer +) +{ + if (!stub_) + return {{}, false}; + + std::vector calls = AsyncGrpcCall::makeAsyncCalls(sequence, numMarkers); + + LOG(log_.debug()) << "Starting data download for ledger " << sequence << "."; + + grpc::CompletionQueue queue; + for (auto& call : calls) + call.call(stub_, queue); + + std::vector edgeKeys; + void* tag = nullptr; + bool ok = false; + bool abort = false; + size_t numFinished = 0; + + while (numFinished < calls.size() && queue.Next(&tag, &ok)) { + ASSERT(tag != nullptr, "Tag can't be null."); + auto ptr = static_cast(tag); + + if (!ok) { + LOG(log_.error()) << "loadInitialLedger - ok is false"; + return {{}, false}; // cancelled + } + + LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix(); + + auto result = ptr->process(stub_, queue, observer, abort); + if (result != AsyncGrpcCall::CallStatus::MORE) { + ++numFinished; + LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished; + + if (auto lastKey = ptr->getLastKey(); !lastKey.empty()) + edgeKeys.push_back(std::move(lastKey)); + } + + if (result == AsyncGrpcCall::CallStatus::ERRORED) + abort = true; + } + + return {std::move(edgeKeys), !abort}; +} + +} // namespace etlng::impl diff --git a/src/etlng/impl/GrpcSource.hpp b/src/etlng/impl/GrpcSource.hpp new file mode 100644 index 000000000..0111f3330 --- /dev/null +++ b/src/etlng/impl/GrpcSource.hpp @@ -0,0 +1,70 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etlng/InitialLoadObserverInterface.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace etlng::impl { + +class GrpcSource { + util::Logger log_; + std::unique_ptr stub_; + +public: + GrpcSource(std::string const& ip, std::string const& grpcPort); + + /** + * @brief Fetch data for a specific ledger. + * + * This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger + * is found in the database, or the server is shutting down. + * + * @param sequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true + * @param getObjectNeighbors Whether to request object neighbors; defaults to false + * @return A std::pair of the response status and the response itself + */ + std::pair + fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false); + + /** + * @brief Download a ledger in full. + * + * @param sequence Sequence of the ledger to download + * @param numMarkers Number of markers to generate for async calls + * @param observer InitialLoadObserverInterface implementation + * @return A std::pair of the data and a bool indicating whether the download was successful + */ + std::pair, bool> + loadInitialLedger(uint32_t sequence, uint32_t numMarkers, etlng::InitialLoadObserverInterface& observer); +}; + +} // namespace etlng::impl diff --git a/src/web/Resolver.cpp b/src/web/Resolver.cpp index f5e9ea25b..33588154a 100644 --- a/src/web/Resolver.cpp +++ b/src/web/Resolver.cpp @@ -19,10 +19,15 @@ #include "web/Resolver.hpp" +#include "util/Assert.hpp" + #include +#include +#include #include #include +#include #include namespace asio = boost::asio; @@ -50,19 +55,51 @@ isAddress(std::string_view hostname) return ec == boost::system::errc::success; } +std::string +toString(auto const& endpoint) +{ + std::stringstream ss; + ss << endpoint; + return ss.str(); +} + } // namespace std::vector Resolver::resolve(std::string_view hostname, std::string_view service) { - if (isAddress(hostname)) { + ASSERT(not service.empty(), "Service is unspecified. Use `resolve(hostname)` instead."); + + if (isAddress(hostname)) + return {std::string(hostname) + ':' + std::string(service)}; + + std::vector endpoints; + for (auto const& endpoint : doResolve(hostname, service)) + endpoints.push_back(toString(endpoint)); + + return endpoints; +} + +std::vector +Resolver::resolve(std::string_view hostname) +{ + if (isAddress(hostname)) return {std::string(hostname)}; - } std::vector endpoints; - for (auto const& endpoint : resolver_.resolve(hostname, service)) { - endpoints.push_back(endpoint.endpoint().address().to_string()); - } + for (auto const& endpoint : doResolve(hostname, "")) + endpoints.push_back(endpoint.address().to_string()); + + return endpoints; +} + +std::vector +Resolver::doResolve(std::string_view hostname, std::string_view service) +{ + std::vector endpoints; + for (auto&& endpoint : resolver_.resolve(hostname, service)) + endpoints.push_back(std::move(endpoint)); + return endpoints; } diff --git a/src/web/Resolver.hpp b/src/web/Resolver.hpp index 2ed643c56..9612a45dd 100644 --- a/src/web/Resolver.hpp +++ b/src/web/Resolver.hpp @@ -37,12 +37,16 @@ template concept SomeResolver = requires(T t) { std::is_default_constructible_v; { t.resolve(std::string_view{}, std::string_view{}) } -> std::same_as>; + { t.resolve(std::string_view{}) } -> std::same_as>; }; /** * @brief Simple hostnames to IP addresses resolver. */ class Resolver { + boost::asio::io_context ioContext_; + boost::asio::ip::tcp::resolver resolver_{ioContext_}; + public: /** * @brief Resolve hostname to IP addresses. @@ -50,15 +54,26 @@ class Resolver { * @throw This method throws an exception when the hostname cannot be resolved. * * @param hostname Hostname to resolve - * @param service Service to resolve (could be empty or port number or http) * @return IP addresses of the hostname */ std::vector - resolve(std::string_view hostname, std::string_view service = ""); + resolve(std::string_view hostname); + + /** + * @brief Resolve to IP addresses with port. + * + * @throw This method throws an exception when the hostname cannot be resolved. + * + * @param hostname Hostname to resolve + * @param service Service to resolve + * @return IP addresses of the hostname + */ + std::vector + resolve(std::string_view hostname, std::string_view service); private: - boost::asio::io_context ioContext_; - boost::asio::ip::tcp::resolver resolver_{ioContext_}; + std::vector + doResolve(std::string_view hostname, std::string_view service); }; } // namespace web diff --git a/src/web/dosguard/WhitelistHandler.hpp b/src/web/dosguard/WhitelistHandler.hpp index d6f4f845f..e4d72b9b3 100644 --- a/src/web/dosguard/WhitelistHandler.hpp +++ b/src/web/dosguard/WhitelistHandler.hpp @@ -134,7 +134,7 @@ class WhitelistHandler : public WhitelistHandlerInterface { // resolve hostnames to ips std::unordered_set ips; for (auto const& hostname : hostnames) { - auto resolvedIps = resolver.resolve(hostname, ""); + auto resolvedIps = resolver.resolve(hostname); for (auto& ip : resolvedIps) { ips.insert(std::move(ip)); } diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 867a9a805..01045e716 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -32,8 +32,9 @@ target_sources( etl/SubscriptionSourceTests.cpp etl/TransformerTests.cpp # ETLng - etlng/RegistryTests.cpp etlng/ExtractionTests.cpp + etlng/GrpcSourceTests.cpp + etlng/RegistryTests.cpp # Feed feed/BookChangesFeedTests.cpp feed/ForwardFeedTests.cpp diff --git a/tests/unit/etl/GrpcSourceTests.cpp b/tests/unit/etl/GrpcSourceTests.cpp index d59b791ee..8f41aff95 100644 --- a/tests/unit/etl/GrpcSourceTests.cpp +++ b/tests/unit/etl/GrpcSourceTests.cpp @@ -49,7 +49,7 @@ struct GrpcSourceTests : NoLoggerFixture, util::prometheus::WithPrometheus, test } std::shared_ptr> mockBackend_; - testing::StrictMock grpcSource_; + GrpcSource grpcSource_; }; TEST_F(GrpcSourceTests, fetchLedger) @@ -82,14 +82,14 @@ TEST_F(GrpcSourceTests, fetchLedger) TEST_F(GrpcSourceTests, fetchLedgerNoStub) { - testing::StrictMock wrongGrpcSource{"wrong", "wrong", mockBackend_}; + GrpcSource wrongGrpcSource{"wrong", "wrong", mockBackend_}; auto const [status, _response] = wrongGrpcSource.fetchLedger(0, false, false); EXPECT_EQ(status.error_code(), grpc::StatusCode::INTERNAL); } TEST_F(GrpcSourceTests, loadInitialLedgerNoStub) { - testing::StrictMock wrongGrpcSource{"wrong", "wrong", mockBackend_}; + GrpcSource wrongGrpcSource{"wrong", "wrong", mockBackend_}; auto const [data, success] = wrongGrpcSource.loadInitialLedger(0, 0, false); EXPECT_TRUE(data.empty()); EXPECT_FALSE(success); diff --git a/tests/unit/etlng/GrpcSourceTests.cpp b/tests/unit/etlng/GrpcSourceTests.cpp new file mode 100644 index 000000000..1b573a55d --- /dev/null +++ b/tests/unit/etlng/GrpcSourceTests.cpp @@ -0,0 +1,290 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "data/DBHelpers.hpp" +#include "etl/ETLHelpers.hpp" +#include "etl/impl/GrpcSource.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/Models.hpp" +#include "etlng/impl/GrpcSource.hpp" +#include "util/Assert.hpp" +#include "util/LoggerFixtures.hpp" +#include "util/MockXrpLedgerAPIService.hpp" +#include "util/TestObject.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace etlng::model; + +namespace { + +struct MockLoadObserver : etlng::InitialLoadObserverInterface { + MOCK_METHOD( + void, + onInitialLoadGotMoreObjects, + (uint32_t, std::vector const&, std::optional), + (override) + ); +}; + +struct GrpcSourceNgTests : NoLoggerFixture, tests::util::WithMockXrpLedgerAPIService { + GrpcSourceNgTests() + : WithMockXrpLedgerAPIService("localhost:0"), grpcSource_("localhost", std::to_string(getXRPLMockPort())) + { + } + + class KeyStore { + std::vector keys_; + std::map, std::greater<>> store_; + + std::mutex mtx_; + + public: + KeyStore(std::size_t totalKeys, std::size_t numMarkers) : keys_(etl::getMarkers(totalKeys)) + { + auto const totalPerMarker = totalKeys / numMarkers; + auto const markers = etl::getMarkers(numMarkers); + + for (auto mi = 0uz; mi < markers.size(); ++mi) { + for (auto i = 0uz; i < totalPerMarker; ++i) { + auto const mapKey = ripple::strHex(markers.at(mi)).substr(0, 2); + store_[mapKey].push(keys_.at(mi * totalPerMarker + i)); + } + } + } + + std::optional + next(std::string const& marker) + { + std::scoped_lock lock(mtx_); + + auto const mapKey = ripple::strHex(marker).substr(0, 2); + auto it = store_.lower_bound(mapKey); + ASSERT(it != store_.end(), "Lower bound not found for '{}'", mapKey); + + auto& queue = it->second; + if (queue.empty()) + return std::nullopt; + + auto data = queue.front(); + queue.pop(); + + return std::make_optional(uint256ToString(data)); + }; + + std::optional + peek(std::string const& marker) + { + std::scoped_lock lock(mtx_); + + auto const mapKey = ripple::strHex(marker).substr(0, 2); + auto it = store_.lower_bound(mapKey); + ASSERT(it != store_.end(), "Lower bound not found for '{}'", mapKey); + + auto& queue = it->second; + if (queue.empty()) + return std::nullopt; + + auto data = queue.front(); + return std::make_optional(uint256ToString(data)); + }; + }; + + testing::StrictMock observer_; + etlng::impl::GrpcSource grpcSource_; +}; + +struct GrpcSourceNgLoadInitialLedgerTests : GrpcSourceNgTests { + uint32_t const sequence_ = 123u; + uint32_t const numMarkers_ = 4u; + bool const cacheOnly_ = false; +}; + +} // namespace + +TEST_F(GrpcSourceNgTests, BasicFetchLedger) +{ + uint32_t const sequence = 123u; + bool const getObjects = true; + bool const getObjectNeighbors = false; + + EXPECT_CALL(mockXrpLedgerAPIService, GetLedger) + .WillOnce([&](grpc::ServerContext* /*context*/, + org::xrpl::rpc::v1::GetLedgerRequest const* request, + org::xrpl::rpc::v1::GetLedgerResponse* response) { + EXPECT_EQ(request->ledger().sequence(), sequence); + EXPECT_TRUE(request->transactions()); + EXPECT_TRUE(request->expand()); + EXPECT_EQ(request->get_objects(), getObjects); + EXPECT_EQ(request->get_object_neighbors(), getObjectNeighbors); + EXPECT_EQ(request->user(), "ETL"); + + response->set_validated(true); + response->set_is_unlimited(false); + response->set_object_neighbors_included(false); + + return grpc::Status{}; + }); + + auto const [status, response] = grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors); + ASSERT_TRUE(status.ok()); + EXPECT_TRUE(response.validated()); + EXPECT_FALSE(response.is_unlimited()); + EXPECT_FALSE(response.object_neighbors_included()); +} + +TEST_F(GrpcSourceNgLoadInitialLedgerTests, GetLedgerDataNotFound) +{ + EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData) + .Times(numMarkers_) + .WillRepeatedly([&](grpc::ServerContext* /*context*/, + org::xrpl::rpc::v1::GetLedgerDataRequest const* request, + org::xrpl::rpc::v1::GetLedgerDataResponse* /*response*/) { + EXPECT_EQ(request->ledger().sequence(), sequence_); + EXPECT_EQ(request->user(), "ETL"); + + return grpc::Status{grpc::StatusCode::NOT_FOUND, "Not found"}; + }); + + auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_); + EXPECT_TRUE(data.empty()); + EXPECT_FALSE(success); +} + +TEST_F(GrpcSourceNgLoadInitialLedgerTests, ObserverCalledCorrectly) +{ + auto const key = ripple::uint256{4}; + auto const keyStr = uint256ToString(key); + auto const object = CreateTicketLedgerObject("rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", sequence_); + auto const objectData = object.getSerializer().peekData(); + + EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData) + .Times(numMarkers_) + .WillRepeatedly([&](grpc::ServerContext* /*context*/, + org::xrpl::rpc::v1::GetLedgerDataRequest const* request, + org::xrpl::rpc::v1::GetLedgerDataResponse* response) { + EXPECT_EQ(request->ledger().sequence(), sequence_); + EXPECT_EQ(request->user(), "ETL"); + + response->set_is_unlimited(true); + auto newObject = response->mutable_ledger_objects()->add_objects(); + newObject->set_key(uint256ToString(key)); + newObject->set_data(objectData.data(), objectData.size()); + + return grpc::Status{}; + }); + + EXPECT_CALL(observer_, onInitialLoadGotMoreObjects) + .Times(numMarkers_) + .WillRepeatedly([&](uint32_t, std::vector const& data, std::optional lastKey) { + EXPECT_FALSE(lastKey.has_value()); + EXPECT_EQ(data.size(), 1); + }); + + auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_); + + EXPECT_TRUE(success); + EXPECT_EQ(data.size(), numMarkers_); + + EXPECT_EQ(data, std::vector(4, keyStr)); +} + +TEST_F(GrpcSourceNgLoadInitialLedgerTests, DataTransferredAndObserverCalledCorrectly) +{ + auto const totalKeys = 256uz; + auto const totalPerMarker = totalKeys / numMarkers_; + auto const batchSize = totalPerMarker / 4uz; + auto const batchesPerMarker = totalPerMarker / batchSize; + + auto keyStore = KeyStore(totalKeys, numMarkers_); + + auto const object = CreateTicketLedgerObject("rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", sequence_); + auto const objectData = object.getSerializer().peekData(); + + EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData) + .Times(numMarkers_ * batchesPerMarker) + .WillRepeatedly([&](grpc::ServerContext* /*context*/, + org::xrpl::rpc::v1::GetLedgerDataRequest const* request, + org::xrpl::rpc::v1::GetLedgerDataResponse* response) { + EXPECT_EQ(request->ledger().sequence(), sequence_); + EXPECT_EQ(request->user(), "ETL"); + + response->set_is_unlimited(true); + + auto next = request->marker().empty() ? std::string("00") : request->marker(); + for (auto i = 0uz; i < batchSize; ++i) { + if (auto maybeLastKey = keyStore.next(next); maybeLastKey.has_value()) { + next = *maybeLastKey; + + auto newObject = response->mutable_ledger_objects()->add_objects(); + newObject->set_key(next); + newObject->set_data(objectData.data(), objectData.size()); + } + } + + if (auto maybeNext = keyStore.peek(next); maybeNext.has_value()) + response->set_marker(*maybeNext); + + return grpc::Status::OK; + }); + + std::atomic_uint total = 0u; + [[maybe_unused]] testing::InSequence seqGuard; + + EXPECT_CALL(observer_, onInitialLoadGotMoreObjects) + .Times(numMarkers_) + .WillRepeatedly([&](uint32_t, std::vector const& data, std::optional lastKey) { + EXPECT_LE(data.size(), batchSize); + EXPECT_FALSE(lastKey.has_value()); + total += data.size(); + }); + + EXPECT_CALL(observer_, onInitialLoadGotMoreObjects) + .Times((numMarkers_ - 1) * batchesPerMarker) + .WillRepeatedly([&](uint32_t, std::vector const& data, std::optional lastKey) { + EXPECT_LE(data.size(), batchSize); + EXPECT_TRUE(lastKey.has_value()); + total += data.size(); + }); + + auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_); + + EXPECT_TRUE(success); + EXPECT_EQ(data.size(), numMarkers_); + EXPECT_EQ(total, totalKeys); +} diff --git a/tests/unit/web/dosguard/WhitelistHandlerTests.cpp b/tests/unit/web/dosguard/WhitelistHandlerTests.cpp index b16a95a94..5ea627512 100644 --- a/tests/unit/web/dosguard/WhitelistHandlerTests.cpp +++ b/tests/unit/web/dosguard/WhitelistHandlerTests.cpp @@ -37,6 +37,7 @@ TEST_F(WhitelistHandlerTest, TestWhiteListIPV4) { struct MockResolver { MOCK_METHOD(std::vector, resolve, (std::string_view, std::string_view)); + MOCK_METHOD(std::vector, resolve, (std::string_view)); }; testing::StrictMock mockResolver; @@ -53,9 +54,9 @@ TEST_F(WhitelistHandlerTest, TestWhiteListIPV4) } )JSON"; - EXPECT_CALL(mockResolver, resolve(testing::_, "")) + EXPECT_CALL(mockResolver, resolve(testing::_)) .Times(3) - .WillRepeatedly([](auto hostname, auto) -> std::vector { return {std::string{hostname}}; }); + .WillRepeatedly([](auto hostname) -> std::vector { return {std::string{hostname}}; }); Config const cfg{boost::json::parse(JSONDataIPV4)}; WhitelistHandler const whitelistHandler{cfg, mockResolver};