From f6d3ade70f41208fe7d84b4dca7cb8bec707fb0a Mon Sep 17 00:00:00 2001 From: Rob Walworth <110835868+rwalworth@users.noreply.github.com> Date: Tue, 3 Oct 2023 15:19:25 -0500 Subject: [PATCH] Some examples aren't working properly (#511) Signed-off-by: Rob Walworth --- sdk/examples/AccountCreateWithHtsExample.cc | 8 +- sdk/examples/ExemptCustomFeesExample.cc | 7 +- sdk/examples/TransferCryptoExample.cc | 1 + sdk/main/include/Client.h | 6 +- sdk/main/include/TokenMintTransaction.h | 2 +- sdk/main/include/impl/BaseNetwork.h | 19 ++++ sdk/main/include/impl/BaseNode.h | 54 +++++++-- sdk/main/include/impl/MirrorNode.h | 6 +- sdk/main/include/impl/Network.h | 29 ++++- sdk/main/include/impl/Node.h | 24 ++-- sdk/main/src/Client.cc | 52 +++++---- sdk/main/src/Executable.cc | 23 +++- sdk/main/src/Query.cc | 9 +- sdk/main/src/Transaction.cc | 1 + sdk/main/src/impl/BaseNetwork.cc | 43 ++++++- sdk/main/src/impl/BaseNode.cc | 27 ++++- sdk/main/src/impl/BaseNodeAddress.cc | 1 + sdk/main/src/impl/MirrorNetwork.cc | 4 + sdk/main/src/impl/MirrorNode.cc | 6 +- sdk/main/src/impl/Network.cc | 118 ++++++++++++-------- sdk/main/src/impl/Node.cc | 35 ++++-- 21 files changed, 340 insertions(+), 135 deletions(-) diff --git a/sdk/examples/AccountCreateWithHtsExample.cc b/sdk/examples/AccountCreateWithHtsExample.cc index d06825c0a..cbf66a09b 100644 --- a/sdk/examples/AccountCreateWithHtsExample.cc +++ b/sdk/examples/AccountCreateWithHtsExample.cc @@ -87,8 +87,12 @@ int main(int argc, char** argv) /** * Step 2: Mint the NFTs. */ - TransactionReceipt txReceipt = - TokenMintTransaction().setTokenId(tokenId).setMetadata(CIDs).execute(client).getReceipt(client); + TransactionReceipt txReceipt = TokenMintTransaction() + .setMaxTransactionFee(Hbar(10LL)) + .setTokenId(tokenId) + .setMetadata(CIDs) + .execute(client) + .getReceipt(client); std::cout << "Minted " << txReceipt.mSerialNumbers.size() << " NFTs" << std::endl; /** diff --git a/sdk/examples/ExemptCustomFeesExample.cc b/sdk/examples/ExemptCustomFeesExample.cc index e285484ca..47a42241e 100644 --- a/sdk/examples/ExemptCustomFeesExample.cc +++ b/sdk/examples/ExemptCustomFeesExample.cc @@ -98,12 +98,13 @@ int main(int argc, char** argv) /** * Step 2: Create a fungible token that has three fractional fees. - * - Fee #1 sends 1/100 of the transferred value to collector 0.0.A. - * - Fee #2 sends 2/100 of the transferred value to collector 0.0.B. - * - Fee #3 sends 3/100 of the transferred value to collector 0.0.C. + * - Fee #1 sends 1/10th of the transferred value to collector 0.0.A. + * - Fee #2 sends 2/10th of the transferred value to collector 0.0.B. + * - Fee #3 sends 3/10th of the transferred value to collector 0.0.C. */ const TokenId createdTokenId = TokenCreateTransaction() + .setMaxTransactionFee(Hbar(50LL)) .setTokenName("HIP-573 Token") .setTokenSymbol("H573") .setInitialSupply(100000000ULL) diff --git a/sdk/examples/TransferCryptoExample.cc b/sdk/examples/TransferCryptoExample.cc index 196ad3125..ec95012fc 100644 --- a/sdk/examples/TransferCryptoExample.cc +++ b/sdk/examples/TransferCryptoExample.cc @@ -24,6 +24,7 @@ #include "ED25519PrivateKey.h" #include "Hbar.h" #include "TransactionRecord.h" +#include "TransactionRecordQuery.h" #include "TransactionResponse.h" #include "TransferTransaction.h" diff --git a/sdk/main/include/Client.h b/sdk/main/include/Client.h index f16672af2..bb283a55d 100644 --- a/sdk/main/include/Client.h +++ b/sdk/main/include/Client.h @@ -315,11 +315,9 @@ class Client void startNetworkUpdateThread(const std::chrono::duration& period); /** - * Schedule a network update a certain period of time from when this is called. - * - * @param period The period of time to wait before a network update is performed. + * Schedule a network update. */ - void scheduleNetworkUpdate(const std::chrono::duration& period); + void scheduleNetworkUpdate(); /** * Cancel any scheduled network updates. diff --git a/sdk/main/include/TokenMintTransaction.h b/sdk/main/include/TokenMintTransaction.h index 0d780836f..353720201 100644 --- a/sdk/main/include/TokenMintTransaction.h +++ b/sdk/main/include/TokenMintTransaction.h @@ -182,7 +182,7 @@ class TokenMintTransaction : public Transaction * The amount of the token to mint. This is for tokens of type FUNGIBLE_COMMON. The amount provided must be in the * lowest denomination possible (i.e. if a token has 2 decimals, a value of 10,000 here will mint 100 tokens). */ - uint64_t mAmount; + uint64_t mAmount = 0ULL; /** * The metadata of the NFTs to mint. This for tokens of type NON_FUNGIBLE_UNIQUE. Once an NFT is minted, its metadata diff --git a/sdk/main/include/impl/BaseNetwork.h b/sdk/main/include/impl/BaseNetwork.h index da92fb041..5c8f3c29c 100644 --- a/sdk/main/include/impl/BaseNetwork.h +++ b/sdk/main/include/impl/BaseNetwork.h @@ -20,6 +20,7 @@ #ifndef HEDERA_SDK_CPP_IMPL_BASE_NETWORK_H_ #define HEDERA_SDK_CPP_IMPL_BASE_NETWORK_H_ +#include "AccountId.h" #include "BaseNode.h" #include "Defaults.h" #include "LedgerId.h" @@ -27,6 +28,7 @@ #include #include +#include #include #include #include @@ -222,6 +224,10 @@ class BaseNetwork { return mNetwork; } + [[nodiscard]] inline std::unordered_map>> getNetworkInternal() + { + return mNetwork; + } /** * Get the list of NodeTypes on this BaseNetwork. @@ -229,6 +235,14 @@ class BaseNetwork * @return The list of NodeTypes on this BaseNetwork. */ [[nodiscard]] inline const std::unordered_set>& getNodes() const { return mNodes; } + [[nodiscard]] inline std::unordered_set>& getNodes() { return mNodes; } + + /** + * Get this BaseNetwork's mutex. + * + * @return This BaseNetwork's mutex. + */ + [[nodiscard]] inline std::shared_ptr getLock() const { return mMutex; } private: /** @@ -317,6 +331,11 @@ class BaseNetwork * The ledger ID of the network. */ LedgerId mLedgerId; + + /** + * The mutex for this BaseNetwork, kept inside a std::shared_ptr to keep BaseNetwork copyable/movable. + */ + std::shared_ptr mMutex = std::make_shared(); }; } // namespace Hedera::internal diff --git a/sdk/main/include/impl/BaseNode.h b/sdk/main/include/impl/BaseNode.h index 2aa86f1f3..f647ab787 100644 --- a/sdk/main/include/impl/BaseNode.h +++ b/sdk/main/include/impl/BaseNode.h @@ -28,6 +28,7 @@ #include #include #include +#include namespace Hedera::internal { @@ -99,35 +100,62 @@ class BaseNode * * @return address The BaseNodeAddress of this BaseNode. */ - [[nodiscard]] inline BaseNodeAddress getAddress() const { return mAddress; } + [[nodiscard]] inline BaseNodeAddress getAddress() const + { + std::unique_lock lock(*mMutex); + return mAddress; + } /** * Get the minimum amount of time for this BaseNode to backoff after a bad gRPC status is received. * * @return The minimum amount of time for this BaseNode to backoff after a bad gRPC status is received. */ - [[nodiscard]] inline std::chrono::duration getMinNodeBackoff() const { return mMinNodeBackoff; } + [[nodiscard]] inline std::chrono::duration getMinNodeBackoff() const + { + std::unique_lock lock(*mMutex); + return mMinNodeBackoff; + } /** * Get the maximum amount of time for this BaseNode to backoff after a bad gRPC status is received. * * @return The maximum amount of time for this BaseNode to backoff after a bad gRPC status is received. */ - [[nodiscard]] inline std::chrono::duration getMaxNodeBackoff() const { return mMaxNodeBackoff; } + [[nodiscard]] inline std::chrono::duration getMaxNodeBackoff() const + { + std::unique_lock lock(*mMutex); + return mMaxNodeBackoff; + } /** * Get the number of times this BaseNode has received a bad gRPC status when attempting to submit a request. * * @return The number of times this BaseNode has received a bad gRPC status. */ - [[nodiscard]] inline unsigned int getBadGrpcStatusCount() const { return mBadGrpcStatusCount; } + [[nodiscard]] inline unsigned int getBadGrpcStatusCount() const + { + std::unique_lock lock(*mMutex); + return mBadGrpcStatusCount; + } /** * Get the time at which this BaseNode will be considered "healthy". * * @return The time at which this BaseNode will be considered "healthy". */ - [[nodiscard]] inline std::chrono::system_clock::time_point getReadmitTime() const { return mReadmitTime; } + [[nodiscard]] inline std::chrono::system_clock::time_point getReadmitTime() const + { + std::unique_lock lock(*mMutex); + return mReadmitTime; + } + + /** + * Get this BaseNode's mutex. + * + * @return This BaseNode's mutex. + */ + [[nodiscard]] inline std::shared_ptr getLock() const { return mMutex; } protected: ~BaseNode() = default; @@ -172,11 +200,9 @@ class BaseNode [[nodiscard]] virtual std::shared_ptr getTlsChannelCredentials() const; /** - * Initialize the stubs in this derived BaseNode with a gRPC channel. - * - * @param channel The gRPC channel with which to initialize the stubs. + * Initialize the stubs in this derived BaseNode with this BaseNode's gRPC channel. */ - virtual void initializeStubs([[maybe_unused]] const std::shared_ptr& channel) + virtual void initializeStubs() { // Intentionally unimplemented, derived BaseNodes that don't use stubs require no functionality. } @@ -194,6 +220,11 @@ class BaseNode */ [[nodiscard]] virtual inline std::string getAuthority() const { return "127.0.0.1"; } + /** + * Close this BaseNode's channel and any stubs using that channel. + */ + void closeChannel(); + /** * The address of this BaseNode. */ @@ -234,6 +265,11 @@ class BaseNode * Is the gRPC channel being utilized by this BaseNode to communicate with its remote node initialized? */ bool mIsConnected = false; + + /** + * The mutex for this BaseNode, kept inside a std::shared_ptr to keep BaseNetwork copyable/movable. + */ + std::shared_ptr mMutex = std::make_shared(); }; } // namespace Hedera::internal diff --git a/sdk/main/include/impl/MirrorNode.h b/sdk/main/include/impl/MirrorNode.h index a6c3f0eff..90fc9940a 100644 --- a/sdk/main/include/impl/MirrorNode.h +++ b/sdk/main/include/impl/MirrorNode.h @@ -91,11 +91,9 @@ class MirrorNode : public BaseNode [[nodiscard]] inline std::string getAuthority() const override { return {}; } /** - * Derived from BaseNode. Initialize the stubs in this MirrorNode with a gRPC channel. - * - * @param channel The gRPC channel with which to initialize the stubs. + * Derived from BaseNode. Initialize the stubs in this MirrorNode with this MirrorNode's gRPC channel. */ - void initializeStubs(const std::shared_ptr& channel) override; + void initializeStubs() override; /** * Derived from BaseNode. Close the stubs in this MirrorNode. diff --git a/sdk/main/include/impl/Network.h b/sdk/main/include/impl/Network.h index a3d469713..bb64c0db3 100644 --- a/sdk/main/include/impl/Network.h +++ b/sdk/main/include/impl/Network.h @@ -21,9 +21,6 @@ #define HEDERA_SDK_CPP_IMPL_NETWORK_H_ #include "BaseNetwork.h" -#include "Node.h" -#include "NodeAddressBook.h" -#include "TLSBehavior.h" #include #include @@ -32,7 +29,16 @@ namespace Hedera { +namespace internal +{ +class Node; +enum class TLSBehavior; +} + class AccountId; +class LedgerId; +class NodeAddress; +class NodeAddressBook; } namespace Hedera::internal @@ -69,6 +75,17 @@ class Network : public BaseNetwork */ [[nodiscard]] static Network forNetwork(const std::unordered_map& network); + /** + * Construct a network map from a NodeAddressBook with a specific port for the endpoints. + * + * @param addressBook The NodeAddressBook from which to construct the network map. + * @param port The desired port. + * @return A network map that contains the nodes in the input NodeAddressBook. + */ + [[nodiscard]] static std::unordered_map getNetworkFromAddressBook( + const NodeAddressBook& addressBook, + unsigned int port); + /** * Derived from BaseNetwork. Set the ledger ID of this Network. * @@ -141,7 +158,7 @@ class Network : public BaseNetwork * @return The map of node addresses and AccountIds of the Nodes that exist on the network represented by the input * LedgerId. */ - [[nodiscard]] static std::unordered_map getAddressBookForLedgerId(const LedgerId& ledgerId); + [[nodiscard]] static NodeAddressBook getAddressBookForLedgerId(const LedgerId& ledgerId); /** * Derived from BaseNetwork. Create a Node for this Network based on a network entry. @@ -158,9 +175,9 @@ class Network : public BaseNetwork * contained in the input map. * * @param ledgerId The new LedgerId of the Network. - * @param addressBook The address book with which to update this Network's Node's address book entry. + * @param addressBook The NodeAddressBook with which to update this Network's Node's address book entry. */ - Network& setLedgerIdInternal(const LedgerId& ledgerId, const std::unordered_map& addressBook); + Network& setLedgerIdInternal(const LedgerId& ledgerId, const NodeAddressBook& addressBook); /** * The maximum number of nodes to be returned for each request. diff --git a/sdk/main/include/impl/Node.h b/sdk/main/include/impl/Node.h index 0880664a2..f08f93e8b 100644 --- a/sdk/main/include/impl/Node.h +++ b/sdk/main/include/impl/Node.h @@ -136,21 +136,33 @@ class Node : public BaseNode * * @return The AccountId of this Node. */ - [[nodiscard]] inline AccountId getAccountId() const { return mAccountId; }; + [[nodiscard]] inline AccountId getAccountId() const + { + std::unique_lock lock(*getLock()); + return mAccountId; + }; /** * Get the node certificate hash of this Node. * * @return The node certificate hash of this Node. */ - [[nodiscard]] inline std::vector getNodeCertificateHash() const { return mNodeCertificateHash; } + [[nodiscard]] inline std::vector getNodeCertificateHash() const + { + std::unique_lock lock(*getLock()); + return mNodeCertificateHash; + } /** * Get the certificate verification policy of this Node. * * @return \c TRUE if this Node is currently configured to verify certificates, otherwise \c FALSE. */ - [[nodiscard]] inline bool getVerifyCertificates() const { return mVerifyCertificates; } + [[nodiscard]] inline bool getVerifyCertificates() const + { + std::unique_lock lock(*getLock()); + return mVerifyCertificates; + } private: /** @@ -169,11 +181,9 @@ class Node : public BaseNode [[nodiscard]] std::shared_ptr getTlsChannelCredentials() const override; /** - * Derived from BaseNode. Initialize the stubs in this Node with a gRPC channel. - * - * @param channel The gRPC channel with which to initialize the stubs. + * Derived from BaseNode. Initialize the stubs in this Node with this Node's gRPC channel. */ - void initializeStubs(const std::shared_ptr& channel) override; + void initializeStubs() override; /** * Derived from BaseNode. Close the stubs in this Node. diff --git a/sdk/main/src/Client.cc b/sdk/main/src/Client.cc index 6371ebf27..7be5995fa 100644 --- a/sdk/main/src/Client.cc +++ b/sdk/main/src/Client.cc @@ -22,10 +22,13 @@ #include "AddressBookQuery.h" #include "Defaults.h" #include "Hbar.h" +#include "NodeAddressBook.h" #include "PrivateKey.h" #include "PublicKey.h" +#include "impl/BaseNodeAddress.h" #include "impl/MirrorNetwork.h" #include "impl/Network.h" +#include "impl/TLSBehavior.h" #include "impl/ValuePtr.h" #include @@ -378,37 +381,42 @@ void Client::startNetworkUpdateThread(const std::chrono::duration& perio { std::unique_lock lock(mImpl->mMutex); mImpl->mStartNetworkUpdateWaitTime = std::chrono::system_clock::now(); - mImpl->mNetworkUpdateThread = std::make_unique(&Client::scheduleNetworkUpdate, this, period); + mImpl->mNetworkUpdatePeriod = period; + mImpl->mNetworkUpdateThread = std::make_unique(&Client::scheduleNetworkUpdate, this); } //----- -void Client::scheduleNetworkUpdate(const std::chrono::duration& period) +void Client::scheduleNetworkUpdate() { - // Wait for the period of time to pass and update the network. If the network update is being cancelled, do nothing. - if (std::unique_lock lock(mImpl->mMutex); - !mImpl->mConditionVariable.wait_for(lock, period, [this]() { return mImpl->mCancelUpdate; })) + // Network updates should keep occurring until they're cancelled. + while (true) { - // Get the address book. - const NodeAddressBook nodeAddressBook = AddressBookQuery().setFileId(FileId::ADDRESS_BOOK).execute(*this); - - // Set the network based on the address book. - std::unordered_map addressBookMap; - for (const auto& nodeAddress : nodeAddressBook.getNodeAddresses()) + if (std::unique_lock lock(mImpl->mMutex); !mImpl->mConditionVariable.wait_for( + lock, mImpl->mNetworkUpdatePeriod, [this]() { return mImpl->mCancelUpdate; })) { - addressBookMap[nodeAddress.toString()] = nodeAddress.getAccountId(); + // Get the address book and set the network based on the address book. + mImpl->mNetwork->setNetwork(internal::Network::getNetworkFromAddressBook( + AddressBookQuery().setFileId(FileId::ADDRESS_BOOK).execute(*this), + mImpl->mNetwork->isTransportSecurity() == internal::TLSBehavior::REQUIRE + ? internal::BaseNodeAddress::PORT_NODE_TLS + : internal::BaseNodeAddress::PORT_NODE_PLAIN)); + + // Adjust the network update period if this is the initial update. + if (!mImpl->mMadeInitialNetworkUpdate) + { + mImpl->mNetworkUpdatePeriod = DEFAULT_NETWORK_UPDATE_PERIOD; + mImpl->mMadeInitialNetworkUpdate = true; + } + + // Schedule the next network update. + mImpl->mStartNetworkUpdateWaitTime = std::chrono::system_clock::now(); } - mImpl->mNetwork->setNetwork(addressBookMap); - // Adjust the network update period if this is the initial update. - if (!mImpl->mMadeInitialNetworkUpdate) + // The network update was cancelled, stop looping. + else { - mImpl->mNetworkUpdatePeriod = DEFAULT_NETWORK_UPDATE_PERIOD; - mImpl->mMadeInitialNetworkUpdate = true; + break; } - - // Schedule the next network update. - mImpl->mStartNetworkUpdateWaitTime = std::chrono::system_clock::now(); - return scheduleNetworkUpdate(mImpl->mNetworkUpdatePeriod); } } @@ -423,7 +431,7 @@ void Client::cancelScheduledNetworkUpdate() // Signal the thread to stop and wait for it to stop. mImpl->mCancelUpdate = true; - mImpl->mConditionVariable.notify_one(); + mImpl->mConditionVariable.notify_all(); if (mImpl->mNetworkUpdateThread->joinable()) { mImpl->mNetworkUpdateThread->join(); diff --git a/sdk/main/src/Executable.cc b/sdk/main/src/Executable.cc index a55676e8d..8d136ea3f 100644 --- a/sdk/main/src/Executable.cc +++ b/sdk/main/src/Executable.cc @@ -132,12 +132,15 @@ SdkResponseType Executable(timeout); + // Keep track of the responses from each node. + std::unordered_map, Status> nodeResponses; + for (unsigned int attempt = 0U;; ++attempt) { if (attempt >= mCurrentMaxAttempts) { - throw MaxAttemptsExceededException("Max number of attempts made (max attempts allowed: " + - std::to_string(mCurrentMaxAttempts)); + throw MaxAttemptsExceededException( + "Max number of attempts made (max attempts allowed: " + std::to_string(mCurrentMaxAttempts) + ')'); } const unsigned int nodeIndex = getNodeIndexForExecute(nodes, attempt); @@ -179,12 +182,26 @@ SdkResponseType ExecutabledecreaseBackoff(); + // Grab and save the response status, and determine what to do next. const Status responseStatus = mapResponseStatus(response); + nodeResponses[node] = responseStatus; switch (determineStatus(responseStatus, client, response)) { case ExecutionStatus::SERVER_ERROR: { - continue; + // If all nodes have returned a BUSY signal, backoff (just fallthrough to ExecutionStatus::RETRY case). + // Otherwise, try the next node. + if (nodeResponses.size() != nodes.size() || + !std::all_of(nodeResponses.cbegin(), + nodeResponses.cend(), + [](const auto& nodeAndStatus) { return nodeAndStatus.second == Status::BUSY; })) + { + continue; + } + + // If all nodes have returned BUSY, clear the responses. + nodeResponses.clear(); + [[fallthrough]]; } // Response isn't ready yet from the network case ExecutionStatus::RETRY: diff --git a/sdk/main/src/Query.cc b/sdk/main/src/Query.cc index 93fef51ef..08ea9ac2c 100644 --- a/sdk/main/src/Query.cc +++ b/sdk/main/src/Query.cc @@ -60,6 +60,7 @@ #include #include #include +#include namespace Hedera { @@ -110,8 +111,11 @@ Hbar Query::getCost(const Client& client) template Hbar Query::getCost(const Client& client, const std::chrono::duration& timeout) { + // Configure this Query to get the cost. mImpl->mGetCost = true; Executable::execute(client, timeout); + + // Reset this Query to not get the cost. mImpl->mGetCost = false; return mImpl->mCost; @@ -241,7 +245,7 @@ void Query::onExecute(const Client& client) throw UninitializedException("Client has not been initialized with a valid network"); } - // Have the Client's network generate the node account IDs to which to send this Transaction. + // Have the Client's network generate the node account IDs to which to send this Query. Executable::setNodeAccountIds( client.getNetwork()->getNodeAccountIdsForExecute()); } @@ -256,6 +260,9 @@ void Query::onExecute(const Client& client) mImpl->mClient = &client; // Get the cost and make sure it's willing to be paid. + std::this_thread::sleep_for( + std::chrono::seconds(2)); // It's not really clear why this needs to be here, but if it isn't the incorrect + // transaction fee is fetched. This should be investigated at a later point. mImpl->mCost = getCost(client); if (mImpl->mCost.toTinybars() > ((mImpl->mMaxPayment.has_value()) ? mImpl->mMaxPayment->toTinybars() : ((client.getMaxQueryPayment().has_value()) diff --git a/sdk/main/src/Transaction.cc b/sdk/main/src/Transaction.cc index 9fbb368fc..899cd63b2 100644 --- a/sdk/main/src/Transaction.cc +++ b/sdk/main/src/Transaction.cc @@ -78,6 +78,7 @@ #include #include #include +#include #include namespace Hedera diff --git a/sdk/main/src/impl/BaseNetwork.cc b/sdk/main/src/impl/BaseNetwork.cc index cc5ac2b45..417d40a9b 100644 --- a/sdk/main/src/impl/BaseNetwork.cc +++ b/sdk/main/src/impl/BaseNetwork.cc @@ -26,6 +26,7 @@ #include "impl/Node.h" #include "impl/Utilities.h" +#include #include namespace Hedera::internal @@ -35,6 +36,8 @@ template NetworkType& BaseNetwork::setNetwork( const std::unordered_map& network) { + std::unique_lock lock(*mMutex); + // New containers to hold new network. std::unordered_map>> newNetwork; std::unordered_set> newNodes; @@ -43,21 +46,40 @@ NetworkType& BaseNetwork::setNetwork( // the newNodes list, or create a new NodeType for that entry. for (const auto& [address, key] : network) { + // Addresses can be added with the same IP addresses, but different ports. Since the different ports just represent + // a TLS connection or not, they shouldn't be treated as different NodeTypes. Grab just the IP address and use that + // to compare to the current NodeTypes. + const std::string ipAddress = BaseNodeAddress::fromString(address).getAddress(); + // Determine if this entry already has a NodeType. bool entryAlreadyExists = false; for (const auto& node : mNodes) { - if (node->getAddress().toString() == address && node->getKey() == key) + if (node->getAddress().getAddress() == ipAddress && node->getKey() == key) { // Move this node to the newNodes set and go to the next entry. - mNodes.erase(node); - newNodes.insert(node); - newNetwork[key].insert(node); + const std::shared_ptr extractedNode = mNodes.extract(node).value(); + newNodes.insert(extractedNode); + newNetwork[key].insert(extractedNode); entryAlreadyExists = true; break; } } + // If the entry wasn't found, verify that the address doesn't match a node that has already been added to newNodes. + // Addresses can be repeated with different ports, so this makes sure duplicates don't get added. + if (!entryAlreadyExists) + { + for (const std::shared_ptr& node : newNodes) + { + if (node->getAddress().getAddress() == ipAddress && node->getKey() == key) + { + entryAlreadyExists = true; + break; + } + } + } + // If the entry was found, go to the next entry. if (entryAlreadyExists) { @@ -82,6 +104,8 @@ NetworkType& BaseNetwork::setNetwork( mNetwork = newNetwork; mHealthyNodes.clear(); + // Try to readmit all nodes. + mEarliestReadmitTime = std::chrono::system_clock::now(); readmitNodes(); return static_cast(*this); @@ -91,6 +115,7 @@ NetworkType& BaseNetwork::setNetwork( template void BaseNetwork::increaseBackoff(const std::shared_ptr& node) { + std::unique_lock lock(*mMutex); node->increaseBackoff(); mHealthyNodes.erase(node); } @@ -99,6 +124,7 @@ void BaseNetwork::increaseBackoff(const std::sha template void BaseNetwork::decreaseBackoff(const std::shared_ptr& node) const { + std::unique_lock lock(*mMutex); node->decreaseBackoff(); } @@ -106,6 +132,7 @@ void BaseNetwork::decreaseBackoff(const std::sha template std::vector> BaseNetwork::getNodeProxies(const KeyType& key) { + std::unique_lock lock(*mMutex); readmitNodes(); return { mNetwork[key].cbegin(), mNetwork[key].cend() }; } @@ -114,6 +141,7 @@ std::vector> BaseNetwork void BaseNetwork::close() const { + std::unique_lock lock(*mMutex); for (const std::shared_ptr& node : mNodes) { node->close(); @@ -124,6 +152,7 @@ void BaseNetwork::close() const template NetworkType& BaseNetwork::setMaxNodeAttempts(unsigned int attempts) { + std::unique_lock lock(*mMutex); mMaxNodeAttempts = attempts; return static_cast(*this); } @@ -133,6 +162,7 @@ template NetworkType& BaseNetwork::setMinNodeBackoff( const std::chrono::duration& backoff) { + std::unique_lock lock(*mMutex); mMinNodeBackoff = backoff; return static_cast(*this); } @@ -142,6 +172,7 @@ template NetworkType& BaseNetwork::setMaxNodeBackoff( const std::chrono::duration& backoff) { + std::unique_lock lock(*mMutex); mMaxNodeBackoff = backoff; return static_cast(*this); } @@ -151,6 +182,7 @@ template NetworkType& BaseNetwork::setMinNodeReadmitTime( const std::chrono::duration& time) { + std::unique_lock lock(*mMutex); mMinNodeReadmitTime = time; return static_cast(*this); } @@ -160,6 +192,7 @@ template NetworkType& BaseNetwork::setMaxNodeReadmitTime( const std::chrono::duration& time) { + std::unique_lock lock(*mMutex); mMaxNodeReadmitTime = time; return static_cast(*this); } @@ -168,6 +201,7 @@ NetworkType& BaseNetwork::setMaxNodeReadmitTime( template NetworkType& BaseNetwork::setCloseTimeout(const std::chrono::duration& timeout) { + std::unique_lock lock(*mMutex); mCloseTimeout = timeout; return static_cast(*this); } @@ -176,6 +210,7 @@ NetworkType& BaseNetwork::setCloseTimeout(const template NetworkType& BaseNetwork::setLedgerId(const LedgerId& ledgerId) { + std::unique_lock lock(*mMutex); mLedgerId = ledgerId; return static_cast(*this); } diff --git a/sdk/main/src/impl/BaseNode.cc b/sdk/main/src/impl/BaseNode.cc index c13f06a56..16556113d 100644 --- a/sdk/main/src/impl/BaseNode.cc +++ b/sdk/main/src/impl/BaseNode.cc @@ -34,16 +34,15 @@ namespace Hedera::internal template void BaseNode::close() { - closeStubs(); - - // The connection is closed automatically upon destruction of the channel. - mChannel = nullptr; + std::unique_lock lock(*mMutex); + closeChannel(); } //----- template void BaseNode::increaseBackoff() { + std::unique_lock lock(*mMutex); ++mBadGrpcStatusCount; mReadmitTime = std::chrono::system_clock::now() + std::chrono::duration_cast(mCurrentBackoff); @@ -60,6 +59,7 @@ void BaseNode::increaseBackoff() template void BaseNode::decreaseBackoff() { + std::unique_lock lock(*mMutex); mCurrentBackoff /= 2.0; // Make sure the current backoff doesn't go below the min backoff. @@ -73,6 +73,7 @@ void BaseNode::decreaseBackoff() template bool BaseNode::isHealthy() const { + std::unique_lock lock(*mMutex); return mReadmitTime < std::chrono::system_clock::now(); } @@ -80,6 +81,7 @@ bool BaseNode::isHealthy() const template bool BaseNode::channelFailedToConnect() { + std::unique_lock lock(*mMutex); if (mIsConnected) { return false; @@ -95,6 +97,7 @@ bool BaseNode::channelFailedToConnect() template std::chrono::duration BaseNode::getRemainingTimeForBackoff() const { + std::unique_lock lock(*mMutex); return mReadmitTime - std::chrono::system_clock::now(); } @@ -102,6 +105,7 @@ std::chrono::duration BaseNode::getRemainingTimeForBa template NodeType& BaseNode::setMinNodeBackoff(const std::chrono::duration& backoff) { + std::unique_lock lock(*mMutex); if (mCurrentBackoff == mMinNodeBackoff) { mCurrentBackoff = backoff; @@ -115,6 +119,7 @@ NodeType& BaseNode::setMinNodeBackoff(const std::chrono::dura template NodeType& BaseNode::setMaxNodeBackoff(const std::chrono::duration& backoff) { + std::unique_lock lock(*mMutex); mMaxNodeBackoff = backoff; return static_cast(*this); } @@ -131,7 +136,7 @@ template NodeType& BaseNode::setAddress(const BaseNodeAddress& address) { // Close the connection since the address is changing. - close(); + closeChannel(); mAddress = address; return static_cast(*this); @@ -157,7 +162,7 @@ std::shared_ptr BaseNode::getChannel() mAddress.isTransportSecurity() ? getTlsChannelCredentials() : grpc::InsecureChannelCredentials(), channelArguments); - initializeStubs(mChannel); + initializeStubs(); } return mChannel; @@ -170,6 +175,16 @@ std::shared_ptr BaseNode::getTlsCha return grpc::experimental::TlsCredentials(grpc::experimental::TlsChannelCredentialsOptions()); } +//----- +template +void BaseNode::closeChannel() +{ + closeStubs(); + + // The connection is closed automatically upon destruction of the channel. + mChannel = nullptr; +} + /** * Explicit template instantiations. */ diff --git a/sdk/main/src/impl/BaseNodeAddress.cc b/sdk/main/src/impl/BaseNodeAddress.cc index 0a0fd9104..0761a501e 100644 --- a/sdk/main/src/impl/BaseNodeAddress.cc +++ b/sdk/main/src/impl/BaseNodeAddress.cc @@ -20,6 +20,7 @@ #include "impl/BaseNodeAddress.h" #include +#include #include namespace Hedera::internal diff --git a/sdk/main/src/impl/MirrorNetwork.cc b/sdk/main/src/impl/MirrorNetwork.cc index 56ce2d55e..d3204982a 100644 --- a/sdk/main/src/impl/MirrorNetwork.cc +++ b/sdk/main/src/impl/MirrorNetwork.cc @@ -71,6 +71,8 @@ MirrorNetwork& MirrorNetwork::setNetwork(const std::vector& network //----- std::vector MirrorNetwork::getNetwork() const { + std::unique_lock lock(*getLock()); + std::vector network; for (const auto& [address, nodes] : BaseNetwork::getNetworkInternal()) { @@ -83,6 +85,8 @@ std::vector MirrorNetwork::getNetwork() const //----- std::shared_ptr MirrorNetwork::getNextMirrorNode() const { + std::unique_lock lock(*getLock()); + if (getNodes().empty()) { return nullptr; diff --git a/sdk/main/src/impl/MirrorNode.cc b/sdk/main/src/impl/MirrorNode.cc index eca23f770..7bb0459b7 100644 --- a/sdk/main/src/impl/MirrorNode.cc +++ b/sdk/main/src/impl/MirrorNode.cc @@ -35,16 +35,16 @@ MirrorNode::MirrorNode(std::string_view address) } //----- -void MirrorNode::initializeStubs(const std::shared_ptr& channel) +void MirrorNode::initializeStubs() { if (!mConsensusStub) { - mConsensusStub = com::hedera::mirror::api::proto::ConsensusService::NewStub(channel); + mConsensusStub = com::hedera::mirror::api::proto::ConsensusService::NewStub(getChannel()); } if (!mNetworkStub) { - mNetworkStub = com::hedera::mirror::api::proto::NetworkService::NewStub(channel); + mNetworkStub = com::hedera::mirror::api::proto::NetworkService::NewStub(getChannel()); } } diff --git a/sdk/main/src/impl/Network.cc b/sdk/main/src/impl/Network.cc index 8244c612e..73bebe27a 100644 --- a/sdk/main/src/impl/Network.cc +++ b/sdk/main/src/impl/Network.cc @@ -21,6 +21,8 @@ #include "AccountId.h" #include "Endpoint.h" #include "NodeAddress.h" +#include "NodeAddressBook.h" +#include "impl/Node.h" #include #include @@ -52,6 +54,25 @@ Network Network::forNetwork(const std::unordered_map& ne return Network(network); } +//----- +std::unordered_map Network::getNetworkFromAddressBook(const NodeAddressBook& addressBook, + unsigned int port) +{ + std::unordered_map network; + for (const auto& nodeAddress : addressBook.getNodeAddresses()) + { + for (const auto& endpoint : nodeAddress.getEndpoints()) + { + if (endpoint.getPort() == port) + { + network[endpoint.toString()] = nodeAddress.getAccountId(); + } + } + } + + return network; +} + //----- Network& Network::setLedgerId(const LedgerId& ledgerId) { @@ -61,6 +82,7 @@ Network& Network::setLedgerId(const LedgerId& ledgerId) //----- Network& Network::setVerifyCertificates(bool verify) { + std::unique_lock lock(*getLock()); mVerifyCertificates = verify; // Set the new certificate verification policy for all Nodes on this Network. @@ -74,6 +96,7 @@ Network& Network::setVerifyCertificates(bool verify) //----- Network& Network::setMaxNodesPerRequest(unsigned int max) { + std::unique_lock lock(*getLock()); mMaxNodesPerRequest = max; return *this; } @@ -81,6 +104,7 @@ Network& Network::setMaxNodesPerRequest(unsigned int max) //----- Network& Network::setTransportSecurity(TLSBehavior tls) { + std::unique_lock lock(*getLock()); if (isTransportSecurity() != tls) { for (const std::shared_ptr& node : getNodes()) @@ -113,13 +137,18 @@ Network& Network::setTransportSecurity(TLSBehavior tls) //----- std::vector Network::getNodeAccountIdsForExecute() { + std::unique_lock lock(*getLock()); + + // Get either the 1/3 most healthy nodes, or the number of most healthy nodes specified by mMaxNodesPerRequest. + const std::vector> nodes = getNumberOfMostHealthyNodes( + mMaxNodesPerRequest > 0U ? std::min(mMaxNodesPerRequest, static_cast(getNodes().size())) + : static_cast(std::ceil(static_cast(getNodes().size()) / 3.0))); + std::vector accountIds; - for (const std::shared_ptr& node : getNumberOfMostHealthyNodes( - mMaxNodesPerRequest > 0U ? std::min(mMaxNodesPerRequest, static_cast(getNodes().size())) - : static_cast(std::ceil(static_cast(getNodes().size()) / 3.0)))) - { - accountIds.push_back(node->getAccountId()); - } + accountIds.reserve(nodes.size()); + std::for_each(nodes.cbegin(), + nodes.cend(), + [&accountIds](const std::shared_ptr& node) { accountIds.push_back(node->getAccountId()); }); return accountIds; } @@ -127,11 +156,12 @@ std::vector Network::getNodeAccountIdsForExecute() //----- std::unordered_map Network::getNetwork() const { + std::unique_lock lock(*getLock()); std::unordered_map network; - for (const std::shared_ptr& node : getNodes()) - { - network[node->getAddress().toString()] = node->getAccountId(); - } + std::for_each(getNodes().cbegin(), + getNodes().cend(), + [&network](const std::shared_ptr& node) + { network[node->getAddress().toString()] = node->getAccountId(); }); return network; } @@ -145,25 +175,13 @@ Network::Network(const std::unordered_map& network) //----- Network Network::getNetworkForLedgerId(const LedgerId& ledgerId) { - const std::unordered_map addressBook = getAddressBookForLedgerId(ledgerId); - - std::unordered_map network; - for (const auto& [accountId, nodeAddress] : addressBook) - { - for (const auto& endpoint : nodeAddress.getEndpoints()) - { - if (endpoint.getPort() == BaseNodeAddress::PORT_NODE_PLAIN) - { - network[endpoint.toString()] = accountId; - } - } - } - - return Network(network).setLedgerIdInternal(ledgerId, addressBook); + const NodeAddressBook addressBook = getAddressBookForLedgerId(ledgerId); + return Network(Network::getNetworkFromAddressBook(addressBook, BaseNodeAddress::PORT_NODE_PLAIN)) + .setLedgerIdInternal(ledgerId, addressBook); } //----- -std::unordered_map Network::getAddressBookForLedgerId(const LedgerId& ledgerId) +NodeAddressBook Network::getAddressBookForLedgerId(const LedgerId& ledgerId) { // The address book can only be fetched for known Hedera networks. if (!ledgerId.isKnownNetwork()) @@ -172,16 +190,7 @@ std::unordered_map Network::getAddressBookForLedgerId(co } std::ifstream infile(ledgerId.toString() + ".pb", std::ios_base::binary); - const NodeAddressBook addressBook = - NodeAddressBook::fromBytes({ std::istreambuf_iterator(infile), std::istreambuf_iterator() }); - - std::unordered_map addresses; - for (const auto& nodeAddress : addressBook.getNodeAddresses()) - { - addresses[nodeAddress.getAccountId()] = nodeAddress; - } - - return addresses; + return NodeAddressBook::fromBytes({ std::istreambuf_iterator(infile), std::istreambuf_iterator() }); } //----- @@ -193,21 +202,34 @@ std::shared_ptr Network::createNodeFromNetworkEntry(std::string_view addre } //----- -Network& Network::setLedgerIdInternal(const LedgerId& ledgerId, - const std::unordered_map& addressBook) +Network& Network::setLedgerIdInternal(const LedgerId& ledgerId, const NodeAddressBook& addressBook) { - // Set the ledger ID. + // Set the ledger ID. Don't lock here because setLedgerId locks. BaseNetwork::setLedgerId(ledgerId); - // Update the node certificate hash of each node. - std::for_each(getNodes().cbegin(), - getNodes().cend(), - [&addressBook](const std::shared_ptr& node) - { - node->setNodeCertificateHash(addressBook.empty() - ? std::vector() - : addressBook.at(node->getAccountId()).getCertHash()); - }); + // Reset the node certificate hash of each node if the address book is empty. + std::unique_lock lock(*getLock()); + if (addressBook.getNodeAddresses().empty()) + { + std::for_each(getNodes().cbegin(), + getNodes().cend(), + [](const std::shared_ptr& node) { node->setNodeCertificateHash({}); }); + } + else + { + std::for_each(getNodes().cbegin(), + getNodes().cend(), + [&addressBook](const std::shared_ptr& node) + { + for (const NodeAddress& address : addressBook.getNodeAddresses()) + { + if (node->getAccountId() == address.getAccountId()) + { + node->setNodeCertificateHash(address.getCertHash()); + } + } + }); + } return *this; } diff --git a/sdk/main/src/impl/Node.cc b/sdk/main/src/impl/Node.cc index 68ad0d627..b60b87fa5 100644 --- a/sdk/main/src/impl/Node.cc +++ b/sdk/main/src/impl/Node.cc @@ -18,7 +18,6 @@ * */ #include "impl/Node.h" -#include "exceptions/UninitializedException.h" #include "impl/BaseNodeAddress.h" #include "impl/HederaCertificateVerifier.h" @@ -46,6 +45,8 @@ grpc::Status Node::submitQuery(proto::Query::QueryCase funcEnum, const std::chrono::system_clock::time_point& deadline, proto::Response* response) { + std::unique_lock lock(*getLock()); + grpc::ClientContext context; context.set_deadline(deadline); @@ -97,6 +98,8 @@ grpc::Status Node::submitTransaction(proto::TransactionBody::DataCase funcEnum, const std::chrono::system_clock::time_point& deadline, proto::TransactionResponse* response) { + std::unique_lock lock(*getLock()); + grpc::ClientContext context; context.set_deadline(deadline); @@ -195,18 +198,25 @@ grpc::Status Node::submitTransaction(proto::TransactionBody::DataCase funcEnum, //----- Node& Node::toInsecure() { - return BaseNode::setAddress(getAddress().toInsecure()); + const BaseNodeAddress address = getAddress().toInsecure(); + + std::unique_lock lock(*getLock()); + return BaseNode::setAddress(address); } //----- Node& Node::toSecure() { - return BaseNode::setAddress(getAddress().toSecure()); + const BaseNodeAddress address = getAddress().toSecure(); + + std::unique_lock lock(*getLock()); + return BaseNode::setAddress(address); } //----- Node& Node::setNodeCertificateHash(const std::vector& hash) { + std::unique_lock lock(*getLock()); mNodeCertificateHash = hash; return *this; } @@ -214,6 +224,7 @@ Node& Node::setNodeCertificateHash(const std::vector& hash) //----- Node& Node::setVerifyCertificates(bool verify) { + std::unique_lock lock(*getLock()); mVerifyCertificates = verify; return *this; } @@ -251,17 +262,17 @@ std::shared_ptr Node::getTlsChannelCredentials() const } //----- -void Node::initializeStubs(const std::shared_ptr& channel) +void Node::initializeStubs() { // clang-format off - if (!mConsensusStub) mConsensusStub = proto::ConsensusService::NewStub(channel); - if (!mCryptoStub) mCryptoStub = proto::CryptoService::NewStub(channel); - if (!mFileStub) mFileStub = proto::FileService::NewStub(channel); - if (!mFreezeStub) mFreezeStub = proto::FreezeService::NewStub(channel); - if (!mNetworkStub) mNetworkStub = proto::NetworkService::NewStub(channel); - if (!mScheduleStub) mScheduleStub = proto::ScheduleService::NewStub(channel); - if (!mSmartContractStub) mSmartContractStub = proto::SmartContractService::NewStub(channel); - if (!mTokenStub) mTokenStub = proto::TokenService::NewStub(channel); + if (!mConsensusStub) mConsensusStub = proto::ConsensusService::NewStub(getChannel()); + if (!mCryptoStub) mCryptoStub = proto::CryptoService::NewStub(getChannel()); + if (!mFileStub) mFileStub = proto::FileService::NewStub(getChannel()); + if (!mFreezeStub) mFreezeStub = proto::FreezeService::NewStub(getChannel()); + if (!mNetworkStub) mNetworkStub = proto::NetworkService::NewStub(getChannel()); + if (!mScheduleStub) mScheduleStub = proto::ScheduleService::NewStub(getChannel()); + if (!mSmartContractStub) mSmartContractStub = proto::SmartContractService::NewStub(getChannel()); + if (!mTokenStub) mTokenStub = proto::TokenService::NewStub(getChannel()); // clang-format on }