diff --git a/src/Makefile.am b/src/Makefile.am index 99d3c99d864a6..0c2e16a6db2bd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -314,6 +314,7 @@ BITCOIN_CORE_H = \ streams.h \ statsd_client.h \ support/allocators/mt_pooled_secure.h \ + support/allocators/pool.h \ support/allocators/pooled_secure.h \ support/allocators/secure.h \ support/allocators/zeroafterfree.h \ @@ -328,6 +329,7 @@ BITCOIN_CORE_H = \ torcontrol.h \ txdb.h \ txmempool.h \ + txorphanage.h \ undo.h \ unordered_lru_cache.h \ util/bip32.h \ @@ -527,6 +529,7 @@ libbitcoin_server_a_SOURCES = \ torcontrol.cpp \ txdb.cpp \ txmempool.cpp \ + txorphanage.cpp \ validation.cpp \ validationinterface.cpp \ versionbits.cpp \ diff --git a/src/Makefile.bench.include b/src/Makefile.bench.include index 5da7d544a9c87..133bc282ef741 100644 --- a/src/Makefile.bench.include +++ b/src/Makefile.bench.include @@ -41,6 +41,7 @@ bench_bench_dash_SOURCES = \ bench/nanobench.h \ bench/nanobench.cpp \ bench/peer_eviction.cpp \ + bench/pool.cpp \ bench/rpc_blockchain.cpp \ bench/rpc_mempool.cpp \ bench/util_time.cpp \ diff --git a/src/Makefile.test.include b/src/Makefile.test.include index d7ccc53c5d2b5..471c38be44ac8 100644 --- a/src/Makefile.test.include +++ b/src/Makefile.test.include @@ -136,6 +136,7 @@ BITCOIN_TESTS =\ test/netbase_tests.cpp \ test/pmt_tests.cpp \ test/policyestimator_tests.cpp \ + test/pool_tests.cpp \ test/pow_tests.cpp \ test/prevector_tests.cpp \ test/raii_event_tests.cpp \ @@ -298,6 +299,7 @@ test_fuzz_fuzz_SOURCES = \ test/fuzz/parse_univalue.cpp \ test/fuzz/policy_estimator.cpp \ test/fuzz/policy_estimator_io.cpp \ + test/fuzz/poolresource.cpp \ test/fuzz/pow.cpp \ test/fuzz/prevector.cpp \ test/fuzz/primitives_transaction.cpp \ diff --git a/src/Makefile.test_util.include b/src/Makefile.test_util.include index ee349bbc6f248..5e17e398b1be9 100644 --- a/src/Makefile.test_util.include +++ b/src/Makefile.test_util.include @@ -14,6 +14,7 @@ TEST_UTIL_H = \ test/util/logging.h \ test/util/mining.h \ test/util/net.h \ + test/util/poolresourcetester.h \ test/util/script.h \ test/util/setup_common.h \ test/util/str.h \ diff --git a/src/bench/pool.cpp b/src/bench/pool.cpp new file mode 100644 index 0000000000000..0bf2b18514808 --- /dev/null +++ b/src/bench/pool.cpp @@ -0,0 +1,50 @@ +// Copyright (c) 2022 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include + +#include + +template +void BenchFillClearMap(benchmark::Bench& bench, Map& map) +{ + size_t batch_size = 5000; + + // make sure each iteration of the benchmark contains exactly 5000 inserts and one clear. + // do this at least 10 times so we get reasonable accurate results + + bench.batch(batch_size).minEpochIterations(10).run([&] { + auto rng = ankerl::nanobench::Rng(1234); + for (size_t i = 0; i < batch_size; ++i) { + map[rng()]; + } + map.clear(); + }); +} + +static void PoolAllocator_StdUnorderedMap(benchmark::Bench& bench) +{ + auto map = std::unordered_map(); + BenchFillClearMap(bench, map); +} + +static void PoolAllocator_StdUnorderedMapWithPoolResource(benchmark::Bench& bench) +{ + using Map = std::unordered_map, + std::equal_to, + PoolAllocator, + sizeof(std::pair) + 4 * sizeof(void*), + alignof(void*)>>; + + // make sure the resource supports large enough pools to hold the node. We do this by adding the size of a few pointers to it. 
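The template arguments of the pool-backed map above are not visible in this rendering of the patch (the angle-bracket contents were dropped). Judging from the benchmark body, which fills the map via map[rng()] with 64-bit values, and from the sizing comment just before this point, the alias likely reads approximately as sketched below. The same shape is used for CCoinsMap later in the patch, with COutPoint / CCoinsCacheEntry / SaltedOutpointHasher in place of the uint64_t parameters.

#include <support/allocators/pool.h> // the PoolAllocator header added by this patch

#include <cstdint>
#include <functional>
#include <unordered_map>
#include <utility>

// Reconstruction sketch, not verbatim from the patch: MAX_BLOCK_SIZE_BYTES adds four
// pointers' worth of slack for the implementation-defined per-node overhead
// (list pointers and, in some implementations, a cached hash value).
using Map = std::unordered_map<uint64_t,
                               uint64_t,
                               std::hash<uint64_t>,
                               std::equal_to<uint64_t>,
                               PoolAllocator<std::pair<const uint64_t, uint64_t>,
                                             sizeof(std::pair<const uint64_t, uint64_t>) + 4 * sizeof(void*),
                                             alignof(void*)>>;

// The memory resource must outlive the map and is handed to the map's constructor,
// as the benchmark below does:
//   auto pool_resource = Map::allocator_type::ResourceType();
//   auto map = Map{0, std::hash<uint64_t>{}, std::equal_to<uint64_t>{}, &pool_resource};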
+ auto pool_resource = Map::allocator_type::ResourceType(); + auto map = Map{0, std::hash{}, std::equal_to{}, &pool_resource}; + BenchFillClearMap(bench, map); +} + +BENCHMARK(PoolAllocator_StdUnorderedMap); +BENCHMARK(PoolAllocator_StdUnorderedMapWithPoolResource); diff --git a/src/coins.cpp b/src/coins.cpp index 431d7223d8898..4a1cccd0abe64 100644 --- a/src/coins.cpp +++ b/src/coins.cpp @@ -33,7 +33,7 @@ size_t CCoinsViewBacked::EstimateSize() const { return base->EstimateSize(); } CCoinsViewCache::CCoinsViewCache(CCoinsView* baseIn, bool deterministic) : CCoinsViewBacked(baseIn), m_deterministic(deterministic), - cacheCoins(0, SaltedOutpointHasher(/*deterministic=*/deterministic)) + cacheCoins(0, SaltedOutpointHasher(/*deterministic=*/deterministic), CCoinsMap::key_equal{}, &m_cache_coins_memory_resource) {} size_t CCoinsViewCache::DynamicMemoryUsage() const { @@ -240,9 +240,12 @@ bool CCoinsViewCache::BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlockIn bool CCoinsViewCache::Flush() { bool fOk = base->BatchWrite(cacheCoins, hashBlock, /*erase=*/true); - if (fOk && !cacheCoins.empty()) { - /* BatchWrite must erase all cacheCoins elements when erase=true. */ - throw std::logic_error("Not all cached coins were erased"); + if (fOk) { + if (!cacheCoins.empty()) { + /* BatchWrite must erase all cacheCoins elements when erase=true. */ + throw std::logic_error("Not all cached coins were erased"); + } + ReallocateCache(); } cachedCoinsUsage = 0; return fOk; @@ -295,7 +298,9 @@ void CCoinsViewCache::ReallocateCache() // Cache should be empty when we're calling this. assert(cacheCoins.size() == 0); cacheCoins.~CCoinsMap(); - ::new (&cacheCoins) CCoinsMap(0, SaltedOutpointHasher(/*deterministic=*/m_deterministic)); + m_cache_coins_memory_resource.~CCoinsMapMemoryResource(); + ::new (&m_cache_coins_memory_resource) CCoinsMapMemoryResource{}; + ::new (&cacheCoins) CCoinsMap{0, SaltedOutpointHasher{/*deterministic=*/m_deterministic}, CCoinsMap::key_equal{}, &m_cache_coins_memory_resource}; } void CCoinsViewCache::SanityCheck() const diff --git a/src/coins.h b/src/coins.h index 3dbe23aea5820..fde5f688cc12a 100644 --- a/src/coins.h +++ b/src/coins.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -131,7 +132,23 @@ struct CCoinsCacheEntry CCoinsCacheEntry(Coin&& coin_, unsigned char flag) : coin(std::move(coin_)), flags(flag) {} }; -typedef std::unordered_map CCoinsMap; +/** + * PoolAllocator's MAX_BLOCK_SIZE_BYTES parameter here uses sizeof the data, and adds the size + * of 4 pointers. We do not know the exact node size used in the std::unordered_node implementation + * because it is implementation defined. Most implementations have an overhead of 1 or 2 pointers, + * so nodes can be connected in a linked list, and in some cases the hash value is stored as well. + * Using an additional sizeof(void*)*4 for MAX_BLOCK_SIZE_BYTES should thus be sufficient so that + * all implementations can allocate the nodes from the PoolAllocator. + */ +using CCoinsMap = std::unordered_map, + PoolAllocator, + sizeof(std::pair) + sizeof(void*) * 4, + alignof(void*)>>; + +using CCoinsMapMemoryResource = CCoinsMap::allocator_type::ResourceType; /** Cursor for iterating over CoinsView state */ class CCoinsViewCursor @@ -221,6 +238,7 @@ class CCoinsViewCache : public CCoinsViewBacked * declared as "const". */ mutable uint256 hashBlock; + mutable CCoinsMapMemoryResource m_cache_coins_memory_resource{}; mutable CCoinsMap cacheCoins; /* Cached dynamic memory usage for the inner Coin objects. 
*/ diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 89336190ed720..defdfd6f64c90 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -157,10 +157,7 @@ PeerMsgRet CGovernanceManager::ProcessMessage(CNode& peer, CConnman& connman, Pe uint256 nHash = govobj.GetHash(); - { - LOCK(cs_main); - EraseObjectRequest(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT, nHash)); - } + WITH_LOCK(::cs_main, peerman.EraseObjectRequest(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT, nHash))); if (!m_mn_sync->IsBlockchainSynced()) { LogPrint(BCLog::GOBJECT, "MNGOVERNANCEOBJECT -- masternode list not synced\n"); @@ -223,11 +220,7 @@ PeerMsgRet CGovernanceManager::ProcessMessage(CNode& peer, CConnman& connman, Pe vRecv >> vote; uint256 nHash = vote.GetHash(); - - { - LOCK(cs_main); - EraseObjectRequest(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT_VOTE, nHash)); - } + WITH_LOCK(::cs_main, peerman.EraseObjectRequest(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT_VOTE, nHash))); // Ignore such messages until masternode list is synced if (!m_mn_sync->IsBlockchainSynced()) { @@ -1222,13 +1215,14 @@ void CGovernanceManager::RequestGovernanceObject(CNode* pfrom, const uint256& nH connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::MNGOVERNANCESYNC, nHash, filter)); } -int CGovernanceManager::RequestGovernanceObjectVotes(CNode& peer, CConnman& connman) const +int CGovernanceManager::RequestGovernanceObjectVotes(CNode& peer, CConnman& connman, const PeerManager& peerman) const { const std::vector vNodeCopy{&peer}; - return RequestGovernanceObjectVotes(vNodeCopy, connman); + return RequestGovernanceObjectVotes(vNodeCopy, connman, peerman); } -int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& vNodesCopy, CConnman& connman) const +int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& vNodesCopy, CConnman& connman, + const PeerManager& peerman) const { static std::map > mapAskedRecently; @@ -1304,7 +1298,7 @@ int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& // stop early to prevent setAskFor overflow { LOCK(cs_main); - size_t nProjectedSize = GetRequestedObjectCount(pnode->GetId()) + nProjectedVotes; + size_t nProjectedSize = peerman.GetRequestedObjectCount(pnode->GetId()) + nProjectedVotes; if (nProjectedSize > MAX_INV_SZ) continue; // to early to ask the same node if (mapAskedRecently[nHashGovobj].count(pnode->addr)) continue; diff --git a/src/governance/governance.h b/src/governance/governance.h index 9e38f411fa226..aa1dec1306845 100644 --- a/src/governance/governance.h +++ b/src/governance/governance.h @@ -357,8 +357,9 @@ class CGovernanceManager : public GovernanceStore void InitOnLoad(); - int RequestGovernanceObjectVotes(CNode& peer, CConnman& connman) const; - int RequestGovernanceObjectVotes(const std::vector& vNodesCopy, CConnman& connman) const; + int RequestGovernanceObjectVotes(CNode& peer, CConnman& connman, const PeerManager& peerman) const; + int RequestGovernanceObjectVotes(const std::vector& vNodesCopy, CConnman& connman, + const PeerManager& peerman) const; /* * Trigger Management (formerly CGovernanceTriggerManager) diff --git a/src/init.cpp b/src/init.cpp index e52ab7a913cd9..4e1383a683d27 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -56,6 +56,7 @@ #include #include #include +#include #include #include #include @@ -578,7 +579,7 @@ void SetupServerArgs(ArgsManager& argsman) argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), 
ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxconnections=", strprintf("Maintain at most connections to peers (temporary service connections excluded) (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxreceivebuffer=", strprintf("Maximum per-connection receive buffer, *1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); - argsman.AddArg("-maxsendbuffer=", strprintf("Maximum per-connection send buffer, *1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); + argsman.AddArg("-maxsendbuffer=", strprintf("Maximum per-connection memory usage for the send buffer, *1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxtimeadjustment", strprintf("Maximum allowed median peer time offset adjustment. Local perspective of time may be influenced by peers forward or backward by this amount. (default: %u seconds)", DEFAULT_MAX_TIME_ADJUSTMENT), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxuploadtarget=", strprintf("Tries to keep outbound traffic under the given target (in MiB per 24h). Limit does not apply to peers with 'download' permission. 0 = no limit (default: %d)", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-onion=", "Use separate SOCKS5 proxy to reach peers via Tor onion services, set -noonion to disable (default: -proxy)", ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); @@ -2219,7 +2220,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // ********************************************************* Step 10a: schedule Dash-specific tasks node.scheduler->scheduleEvery(std::bind(&CNetFulfilledRequestManager::DoMaintenance, std::ref(*node.netfulfilledman)), std::chrono::minutes{1}); - node.scheduler->scheduleEvery(std::bind(&CMasternodeSync::DoMaintenance, std::ref(*node.mn_sync)), std::chrono::seconds{1}); + node.scheduler->scheduleEvery(std::bind(&CMasternodeSync::DoMaintenance, std::ref(*node.mn_sync), std::cref(*node.peerman)), std::chrono::seconds{1}); node.scheduler->scheduleEvery(std::bind(&CMasternodeUtils::DoMaintenance, std::ref(*node.connman), std::ref(*node.dmnman), std::ref(*node.mn_sync), std::ref(*node.cj_ctx)), std::chrono::minutes{1}); node.scheduler->scheduleEvery(std::bind(&CDeterministicMNManager::DoMaintenance, std::ref(*node.dmnman)), std::chrono::seconds{10}); diff --git a/src/llmq/blockprocessor.cpp b/src/llmq/blockprocessor.cpp index 98c827ef6c36b..eec397ec5151b 100644 --- a/src/llmq/blockprocessor.cpp +++ b/src/llmq/blockprocessor.cpp @@ -60,7 +60,8 @@ PeerMsgRet CQuorumBlockProcessor::ProcessMessage(const CNode& peer, std::string_ CFinalCommitment qc; vRecv >> qc; - WITH_LOCK(cs_main, EraseObjectRequest(peer.GetId(), CInv(MSG_QUORUM_FINAL_COMMITMENT, ::SerializeHash(qc)))); + WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(peer.GetId(), + CInv(MSG_QUORUM_FINAL_COMMITMENT, ::SerializeHash(qc)))); if (qc.IsNull()) { LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- null commitment from peer=%d\n", __func__, peer.GetId()); diff --git a/src/llmq/chainlocks.cpp b/src/llmq/chainlocks.cpp index 39130b5a07ecf..cfd2938935dc0 100644 
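The removed blocks above show the older pattern, an explicit scope holding LOCK(cs_main) around a free-standing EraseObjectRequest(), being folded into a single WITH_LOCK expression that calls the new PeerManager method. A minimal, self-contained illustration of how WITH_LOCK and Assert behave, assuming the project's sync.h and util/check.h primitives; the names g_example_mutex / g_example_counter are made up for this sketch:

#include <sync.h>       // Mutex, LOCK, WITH_LOCK (project header, assumed)
#include <util/check.h> // Assert (project header, assumed)

Mutex g_example_mutex;
int g_example_counter GUARDED_BY(g_example_mutex){0};

int GetExampleCounter()
{
    // Equivalent to: { LOCK(g_example_mutex); return g_example_counter; }
    // WITH_LOCK scopes the lock to a single expression and yields its value.
    return WITH_LOCK(g_example_mutex, return g_example_counter);
}

void BumpViaPointer(int* maybe_null)
{
    // Assert(x) aborts if x is null and otherwise returns x, which is why the patch can
    // write Assert(m_peerman)->EraseObjectRequest(...) safely in a single expression.
    *Assert(maybe_null) += 1;
}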
--- a/src/llmq/chainlocks.cpp +++ b/src/llmq/chainlocks.cpp @@ -115,8 +115,7 @@ PeerMsgRet CChainLocksHandler::ProcessNewChainLock(const NodeId from, const llmq CInv clsigInv(MSG_CLSIG, hash); if (from != -1) { - LOCK(cs_main); - EraseObjectRequest(from, clsigInv); + WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(from, clsigInv)); } { diff --git a/src/llmq/dkgsessionhandler.cpp b/src/llmq/dkgsessionhandler.cpp index e6f301cfa02f6..7b4b8c4ab29ea 100644 --- a/src/llmq/dkgsessionhandler.cpp +++ b/src/llmq/dkgsessionhandler.cpp @@ -72,8 +72,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman, uint256 hash = hw.GetHash(); if (from != -1) { - LOCK(cs_main); - EraseObjectRequest(from, CInv(invType, hash)); + WITH_LOCK(::cs_main, Assert(m_peerman.load())->EraseObjectRequest(from, CInv(invType, hash))); } LOCK(cs_messages); diff --git a/src/llmq/instantsend.cpp b/src/llmq/instantsend.cpp index 037dd2f1f661c..2d3b9883323e0 100644 --- a/src/llmq/instantsend.cpp +++ b/src/llmq/instantsend.cpp @@ -762,7 +762,7 @@ PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom { auto hash = ::SerializeHash(*islock); - WITH_LOCK(cs_main, EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash))); + WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash))); if (!islock->TriviallyValid()) { return tl::unexpected{100}; @@ -1446,7 +1446,8 @@ void CInstantSendManager::RemoveConflictingLock(const uint256& islockHash, const } } -void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnman& connman, const PeerManager& peerman, bool is_masternode) +void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnman& connman, PeerManager& peerman, + bool is_masternode) { std::vector nodesToAskFor; nodesToAskFor.reserve(4); @@ -1476,7 +1477,8 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnma txid.ToString(), pnode->GetId()); CInv inv(MSG_TX, txid); - RequestObject(pnode->GetId(), inv, GetTime(), is_masternode, /* fForce = */ true); + peerman.RequestObject(pnode->GetId(), inv, GetTime(), is_masternode, + /* fForce = */ true); } } for (CNode* pnode : nodesToAskFor) { diff --git a/src/llmq/instantsend.h b/src/llmq/instantsend.h index 733d2afd7a4f9..196d7ab25b345 100644 --- a/src/llmq/instantsend.h +++ b/src/llmq/instantsend.h @@ -315,8 +315,7 @@ class CInstantSendManager : public CRecoveredSigsListener EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry); void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); - static void AskNodesForLockedTx(const uint256& txid, const CConnman& connman, const PeerManager& peerman, - bool is_masternode); + static void AskNodesForLockedTx(const uint256& txid, const CConnman& connman, PeerManager& peerman, bool is_masternode); void ProcessPendingRetryLockTxs() EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry); diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 8200146d320f1..e04f924ba4de2 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -604,10 +604,8 @@ static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CR PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig) { - { - LOCK(cs_main); - EraseObjectRequest(pfrom.GetId(), 
CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash())); - } + WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(), + CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()))); bool ban = false; if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) { diff --git a/src/masternode/sync.cpp b/src/masternode/sync.cpp index 9e3f53a0c2215..b4fc5440bf8c9 100644 --- a/src/masternode/sync.cpp +++ b/src/masternode/sync.cpp @@ -115,7 +115,7 @@ void CMasternodeSync::ProcessMessage(const CNode& peer, std::string_view msg_typ LogPrint(BCLog::MNSYNC, "SYNCSTATUSCOUNT -- got inventory count: nItemID=%d nCount=%d peer=%d\n", nItemID, nCount, peer.GetId()); } -void CMasternodeSync::ProcessTick() +void CMasternodeSync::ProcessTick(const PeerManager& peerman) { assert(m_netfulfilledman.IsValid()); @@ -144,7 +144,7 @@ void CMasternodeSync::ProcessTick() // gradually request the rest of the votes after sync finished if(IsSynced()) { - m_govman.RequestGovernanceObjectVotes(snap.Nodes(), connman); + m_govman.RequestGovernanceObjectVotes(snap.Nodes(), connman, peerman); return; } @@ -264,7 +264,7 @@ void CMasternodeSync::ProcessTick() if(!m_netfulfilledman.HasFulfilledRequest(pnode->addr, "governance-sync")) { continue; // to early for this node } - int nObjsLeftToAsk = m_govman.RequestGovernanceObjectVotes(*pnode, connman); + int nObjsLeftToAsk = m_govman.RequestGovernanceObjectVotes(*pnode, connman, peerman); // check for data if(nObjsLeftToAsk == 0) { static int64_t nTimeNoObjectsLeft = 0; @@ -368,9 +368,9 @@ void CMasternodeSync::UpdatedBlockTip(const CBlockIndex *pindexTip, const CBlock pindexNew->nHeight, pindexTip->nHeight, fInitialDownload, fReachedBestHeader); } -void CMasternodeSync::DoMaintenance() +void CMasternodeSync::DoMaintenance(const PeerManager& peerman) { if (ShutdownRequested()) return; - ProcessTick(); + ProcessTick(peerman); } diff --git a/src/masternode/sync.h b/src/masternode/sync.h index 2692cd17368e4..d61ef070ba91f 100644 --- a/src/masternode/sync.h +++ b/src/masternode/sync.h @@ -15,6 +15,7 @@ class CGovernanceManager; class CMasternodeSync; class CNetFulfilledRequestManager; class CNode; +class PeerManager; static constexpr int MASTERNODE_SYNC_BLOCKCHAIN = 1; static constexpr int MASTERNODE_SYNC_GOVERNANCE = 4; @@ -71,13 +72,13 @@ class CMasternodeSync void SwitchToNextAsset(); void ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv) const; - void ProcessTick(); + void ProcessTick(const PeerManager& peerman); void AcceptedBlockHeader(const CBlockIndex *pindexNew); void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload); void UpdatedBlockTip(const CBlockIndex *pindexTip, const CBlockIndex *pindexNew, bool fInitialDownload); - void DoMaintenance(); + void DoMaintenance(const PeerManager& peerman); }; #endif // BITCOIN_MASTERNODE_SYNC_H diff --git a/src/memusage.h b/src/memusage.h index a6e894129aa29..5fffe4ec07b62 100644 --- a/src/memusage.h +++ b/src/memusage.h @@ -7,6 +7,7 @@ #include #include +#include #include @@ -167,6 +168,25 @@ static inline size_t DynamicUsage(const std::unordered_map& m) return MallocUsage(sizeof(unordered_node >)) * m.size() + MallocUsage(sizeof(void*) * m.bucket_count()); } +template +static inline size_t DynamicUsage(const std::unordered_map, + MAX_BLOCK_SIZE_BYTES, + ALIGN_BYTES>>& m) +{ + auto* pool_resource = m.get_allocator().resource(); + + // The allocated chunks are stored in a std::list. Size per node should + // therefore be 3 pointers: next, previous, and a pointer to the chunk. 
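The template-parameter list of this DynamicUsage overload is likewise missing from the text above. Based on the body that follows and on the PoolAllocator/CCoinsMap definitions elsewhere in the patch, the declaration likely reads approximately as below; the parameter names X/Y/Z follow the existing DynamicUsage overloads in memusage.h, and the actual body (the estimate computation) continues in the patch right after this point.

// Reconstruction sketch of the overload's signature only:
template <class X, class Y, class Z, std::size_t MAX_BLOCK_SIZE_BYTES, std::size_t ALIGN_BYTES>
static inline size_t DynamicUsage(const std::unordered_map<X,
                                                           Y,
                                                           Z,
                                                           PoolAllocator<std::pair<const X, Y>,
                                                                         MAX_BLOCK_SIZE_BYTES,
                                                                         ALIGN_BYTES>>& m);

// In words: usage is roughly
//   NumAllocatedChunks() * (MallocUsage(3 * sizeof(void*)) + MallocUsage(ChunkSizeBytes()))
//   + MallocUsage(sizeof(void*) * m.bucket_count())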
+ size_t estimated_list_node_size = MallocUsage(sizeof(void*) * 3); + size_t usage_resource = estimated_list_node_size * pool_resource->NumAllocatedChunks(); + size_t usage_chunks = MallocUsage(pool_resource->ChunkSizeBytes()) * pool_resource->NumAllocatedChunks(); + return usage_resource + usage_chunks + MallocUsage(sizeof(void*) * m.bucket_count()); } +} // namespace memusage + #endif // BITCOIN_MEMUSAGE_H diff --git a/src/net.cpp b/src/net.cpp index 60d3d551adc78..0da4e01dd9311 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -142,6 +143,14 @@ std::map mapLocalHost GUARDED_BY(g_maplocalhost_mute static bool vfLimited[NET_MAX] GUARDED_BY(g_maplocalhost_mutex) = {}; std::string strSubVersion; +size_t CSerializedNetMsg::GetMemoryUsage() const noexcept +{ + // Don't count the dynamic memory used for the m_type string, by assuming it fits in the + // "small string" optimization area (which stores data inside the object itself, up to some + // size; 15 bytes in modern libstdc++). + return sizeof(*this) + memusage::DynamicUsage(data); +} + void CConnman::AddAddrFetch(const std::string& strDest) { LOCK(m_addr_fetches_mutex); @@ -783,16 +792,15 @@ bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete) nRecvBytes += msg_bytes.size(); while (msg_bytes.size() > 0) { // absorb network data - int handled = m_deserializer->Read(msg_bytes); - if (handled < 0) { - // Serious header problem, disconnect from the peer. + if (!m_transport->ReceivedBytes(msg_bytes)) { + // Serious transport problem, disconnect from the peer. return false; } - if (m_deserializer->Complete()) { + if (m_transport->ReceivedMessageComplete()) { // decompose a transport agnostic CNetMessage from the deserializer bool reject_message{false}; - CNetMessage msg = m_deserializer->GetMessage(time, reject_message); + CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message); if (reject_message) { // Message deserialization failed. Drop the message but don't disconnect the peer. 
// store the size of the corrupt message @@ -820,8 +828,18 @@ bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete) return true; } -int V1TransportDeserializer::readHeader(Span msg_bytes) +V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept : + m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) { + assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes)); + std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), m_magic_bytes); + LOCK(m_recv_mutex); + Reset(); +} + +int V1Transport::readHeader(Span msg_bytes) +{ + AssertLockHeld(m_recv_mutex); // copy data to temporary parsing buffer unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos; unsigned int nCopy = std::min(nRemaining, msg_bytes.size()); @@ -843,7 +861,7 @@ int V1TransportDeserializer::readHeader(Span msg_bytes) } // Check start string, network magic - if (memcmp(hdr.pchMessageStart, m_chain_params.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { + if (memcmp(hdr.pchMessageStart, m_magic_bytes, CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id); return -1; } @@ -860,8 +878,9 @@ int V1TransportDeserializer::readHeader(Span msg_bytes) return nCopy; } -int V1TransportDeserializer::readData(Span msg_bytes) +int V1Transport::readData(Span msg_bytes) { + AssertLockHeld(m_recv_mutex); unsigned int nRemaining = hdr.nMessageSize - nDataPos; unsigned int nCopy = std::min(nRemaining, msg_bytes.size()); @@ -877,19 +896,22 @@ int V1TransportDeserializer::readData(Span msg_bytes) return nCopy; } -const uint256& V1TransportDeserializer::GetMessageHash() const +const uint256& V1Transport::GetMessageHash() const { - assert(Complete()); + AssertLockHeld(m_recv_mutex); + assert(CompleteInternal()); if (data_hash.IsNull()) hasher.Finalize(data_hash); return data_hash; } -CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds time, bool& reject_message) +CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message) { + AssertLockNotHeld(m_recv_mutex); // Initialize out parameter reject_message = false; // decompose a single CNetMessage from the TransportDeserializer + LOCK(m_recv_mutex); CNetMessage msg(std::move(vRecv)); // store message type string, time, and sizes @@ -922,47 +944,122 @@ CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds return msg; } -void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vector& header) const +bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept { + AssertLockNotHeld(m_send_mutex); + // Determine whether a new message can be set. 
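For orientation, this is roughly how a caller is meant to drive the new send-side interface (SocketSendData further down is the real driver): hand a queued message to the transport when it is idle, ask it for the next bytes to write (header first, then payload), and report back how many were actually sent. FlushOnce and SendOnWire are placeholders invented for this sketch; locking, MSG_MORE handling and statistics are omitted.

#include <deque>
#include <net.h>  // Transport, CSerializedNetMsg (project header, assumed)
#include <span.h> // Span (project header, assumed)

size_t SendOnWire(Span<const uint8_t> data); // placeholder for m_sock->Send(...)

size_t FlushOnce(Transport& transport, std::deque<CSerializedNetMsg>& queue)
{
    // Hand the next queued message to the transport; this returns false while a previous
    // message is still being sent, in which case the message simply stays in the queue.
    if (!queue.empty() && transport.SetMessageToSend(queue.front())) {
        queue.pop_front();
    }

    // `more` would drive the MSG_MORE flag; `msg_type` feeds per-message-type statistics.
    const auto& [to_send, more, msg_type] = transport.GetBytesToSend();
    if (to_send.empty()) return 0;

    const size_t sent = SendOnWire(to_send);
    transport.MarkBytesSent(sent); // advances the header -> payload state machine
    return sent;
}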
+ LOCK(m_send_mutex); + if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false; + // create dbl-sha256 checksum uint256 hash = Hash(msg.data); // create header - CMessageHeader hdr(Params().MessageStart(), msg.m_type.c_str(), msg.data.size()); + CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size()); memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE); // serialize header - header.reserve(CMessageHeader::HEADER_SIZE); - CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr}; + m_header_to_send.clear(); + CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr}; + + // update state + m_message_to_send = std::move(msg); + m_sending_header = true; + m_bytes_sent = 0; + return true; +} + +Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + if (m_sending_header) { + return {Span{m_header_to_send}.subspan(m_bytes_sent), + // We have more to send after the header if the message has payload. + !m_message_to_send.data.empty(), + m_message_to_send.m_type + }; + } else { + return {Span{m_message_to_send.data}.subspan(m_bytes_sent), + // We never have more to send after this message's payload. + false, + m_message_to_send.m_type + }; + } +} + +void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + m_bytes_sent += bytes_sent; + if (m_sending_header && m_bytes_sent == m_header_to_send.size()) { + // We're done sending a message's header. Switch to sending its data bytes. + m_sending_header = false; + m_bytes_sent = 0; + } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) { + // We're done sending a message's data. Wipe the data vector to reduce memory consumption. + m_message_to_send.data.clear(); + m_message_to_send.data.shrink_to_fit(); + m_bytes_sent = 0; + } +} + +size_t V1Transport::GetSendMemoryUsage() const noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + // Don't count sending-side fields besides m_message_to_send, as they're all small and bounded. + return m_message_to_send.GetMemoryUsage(); } -size_t CConnman::SocketSendData(CNode& node) +std::pair CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); size_t nSentSize = 0; - - while (it != node.vSendMsg.end()) { - const auto& data = *it; - assert(data.size() > node.nSendOffset); + bool data_left{false}; //!< second return value (whether unsent data remains) + + while (true) { + if (it != node.vSendMsg.end()) { + // If possible, move one message from the send queue to the transport. This fails when + // there is an existing message still being sent. + size_t memusage = it->GetMemoryUsage(); + if (node.m_transport->SetMessageToSend(*it)) { + // Update memory usage of send buffer (as *it will be deleted). + node.m_send_memusage -= memusage; + ++it; + } + } + const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(); + data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent int nBytes = 0; - { + if (!data.empty()) { LOCK(node.m_sock_mutex); + // There is no socket in case we've already disconnected, or in test cases without + // real connections. In these cases, we bail out immediately and just leave things + // in the send queue and transport. 
if (!node.m_sock) { break; } - nBytes = node.m_sock->Send(reinterpret_cast(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + int flags = MSG_NOSIGNAL | MSG_DONTWAIT; +#ifdef MSG_MORE + // We have more to send if either the transport itself has more, or if we have more + // messages to send. + if (more || it != node.vSendMsg.end()) { + flags |= MSG_MORE; + } +#endif + nBytes = node.m_sock->Send(reinterpret_cast(data.data()), data.size(), flags); } if (nBytes > 0) { node.m_last_send = GetTime(); node.nSendBytes += nBytes; - node.nSendOffset += nBytes; + // Notify transport that bytes have been processed. + node.m_transport->MarkBytesSent(nBytes); + // Update statistics per message type. + node.mapSendBytesPerMsgType[msg_type] += nBytes; nSentSize += nBytes; - if (node.nSendOffset == data.size()) { - node.nSendOffset = 0; - node.nSendSize -= data.size(); - node.fPauseSend = node.nSendSize > nSendBufferMaxSize; - it++; - } else { + if ((size_t)nBytes != data.size()) { // could not send full message; stop sending more node.fCanSendData = false; break; @@ -976,19 +1073,18 @@ size_t CConnman::SocketSendData(CNode& node) node.fDisconnect = true; } } - // couldn't send anything at all - node.fCanSendData = false; break; } } + node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize; + if (it == node.vSendMsg.end()) { - assert(node.nSendOffset == 0); - assert(node.nSendSize == 0); + assert(node.m_send_memusage == 0); } node.vSendMsg.erase(node.vSendMsg.begin(), it); node.nSendMsgSize = node.vSendMsg.size(); - return nSentSize; + return {nSentSize, data_left}; } static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate& a, const NodeEvictionCandidate& b) @@ -1513,7 +1609,9 @@ void CConnman::DisconnectNodes() } if (GetTimeMillis() < pnode->nDisconnectLingerTime) { // everything flushed to the kernel? 
- if (!pnode->fSocketShutdown && pnode->nSendMsgSize == 0) { + const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + const bool queue_is_empty{to_send.empty() && pnode->nSendMsgSize == 0}; + if (!pnode->fSocketShutdown && queue_is_empty) { LOCK(pnode->m_sock_mutex); if (pnode->m_sock) { // Give the other side a chance to detect the disconnect as early as possible (recv() will return 0) @@ -1705,8 +1803,7 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, recv_set.insert(hListenSocket.sock->Get()); } - for (CNode* pnode : nodes) - { + for (CNode* pnode : nodes) { bool select_recv = !pnode->fHasRecvData; bool select_send = !pnode->fCanSendData; @@ -2021,9 +2118,9 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, if (interruptNet) return; - std::vector vErrorNodes; - std::vector vReceivableNodes; - std::vector vSendableNodes; + std::set vErrorNodes; + std::set vReceivableNodes; + std::set vSendableNodes; { LOCK(cs_mapSocketToNode); for (auto hSocket : error_set) { @@ -2032,7 +2129,7 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, continue; } it->second->AddRef(); - vErrorNodes.emplace_back(it->second); + vErrorNodes.emplace(it->second); } for (auto hSocket : recv_set) { if (error_set.count(hSocket)) { @@ -2067,7 +2164,6 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, { LOCK(cs_sendable_receivable_nodes); - vReceivableNodes.reserve(mapReceivableNodes.size()); for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) { if (!it->second->fHasRecvData) { it = mapReceivableNodes.erase(it); @@ -2080,9 +2176,11 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, // receiving data. This means properly utilizing TCP flow control signalling. // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try // receiving data (which should succeed as the socket signalled as receivable). 
- if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) { + const auto& [to_send, _more, _msg_type] = it->second->m_transport->GetBytesToSend(); + const bool queue_is_empty{to_send.empty() && it->second->nSendMsgSize == 0}; + if (!it->second->fPauseRecv && !it->second->fDisconnect && queue_is_empty) { it->second->AddRef(); - vReceivableNodes.emplace_back(it->second); + vReceivableNodes.emplace(it->second); } ++it; } @@ -2093,22 +2191,45 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, // also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration // but don't have any in this iteration LOCK(cs_mapNodesWithDataToSend); - vSendableNodes.reserve(mapNodesWithDataToSend.size()); for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) { - if (it->second->nSendMsgSize == 0) { + const auto& [to_send, _more, _msg_type] = it->second->m_transport->GetBytesToSend(); + if (to_send.empty() && it->second->nSendMsgSize == 0) { // See comment in PushMessage it->second->Release(); it = mapNodesWithDataToSend.erase(it); } else { if (it->second->fCanSendData) { it->second->AddRef(); - vSendableNodes.emplace_back(it->second); + vSendableNodes.emplace(it->second); } ++it; } } } + for (CNode* pnode : vSendableNodes) { + if (interruptNet) { + break; + } + + // Send data + auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode)); + if (bytes_sent) { + RecordBytesSent(bytes_sent); + + // If both receiving and (non-optimistic) sending were possible, we first attempt + // sending. If that succeeds, but does not fully drain the send queue, do not + // attempt to receive. This avoids needlessly queueing data if the remote peer + // is slow at receiving data, by means of TCP flow control. We only do this when + // sending actually succeeded to make sure progress is always made; otherwise a + // deadlock would be possible when both sides have data to send, but neither is + // receiving. + if (data_left && vReceivableNodes.erase(pnode)) { + pnode->Release(); + } + } + } + for (CNode* pnode : vErrorNodes) { if (interruptNet) { @@ -2130,16 +2251,6 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, SocketRecvData(pnode); } - for (CNode* pnode : vSendableNodes) { - if (interruptNet) { - break; - } - - // Send data - size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode)); - if (bytes_sent) RecordBytesSent(bytes_sent); - } - for (auto& node : vErrorNodes) { node->Release(); } @@ -2497,8 +2608,8 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe auto start = GetTime(); // Minimum time before next feeler connection (in microseconds). 
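The feeler and extra-block-relay timers below switch from the removed PoissonNextSend helper to GetExponentialRand; both draw an exponentially distributed delay, so that the resulting connection attempts form a Poisson process (hence the old name). A self-contained sketch of the idea using only the standard library follows; NextEvent and the std::mt19937_64 engine are illustrative, while the real helper uses the project's RNG, as the removed PoissonNextSend body later in this patch shows.

#include <chrono>
#include <random>

std::chrono::microseconds NextEvent(std::chrono::microseconds now,
                                    std::chrono::seconds average_interval)
{
    static std::mt19937_64 rng{std::random_device{}()};
    // Draw from Exp(1) (mean 1.0); scaling by average_interval gives a delay with the
    // requested mean, and memorylessness makes the schedule hard for peers to predict.
    std::exponential_distribution<double> unit_exponential(1.0);
    const double unscaled = unit_exponential(rng);
    return now + std::chrono::duration_cast<std::chrono::microseconds>(
                     unscaled * std::chrono::duration<double>(average_interval));
}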
- auto next_feeler = PoissonNextSend(start, FEELER_INTERVAL); - auto next_extra_block_relay = PoissonNextSend(start, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL); + auto next_feeler = GetExponentialRand(start, FEELER_INTERVAL); + auto next_extra_block_relay = GetExponentialRand(start, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL); const bool dnsseed = gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED); bool add_fixed_seeds = gArgs.GetBoolArg("-fixedseeds", DEFAULT_FIXEDSEEDS); @@ -2632,7 +2743,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe // // This is similar to the logic for trying extra outbound (full-relay) // peers, except: - // - we do this all the time on a poisson timer, rather than just when + // - we do this all the time on an exponential timer, rather than just when // our tip is stale // - we potentially disconnect our next-youngest block-relay-only peer, if our // newest block-relay-only peer delivers a block more recently. @@ -2641,10 +2752,10 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe // Because we can promote these connections to block-relay-only // connections, they do not get their own ConnectionType enum // (similar to how we deal with extra outbound peers). - next_extra_block_relay = PoissonNextSend(now, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL); + next_extra_block_relay = GetExponentialRand(now, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL); conn_type = ConnectionType::BLOCK_RELAY; } else if (now > next_feeler) { - next_feeler = PoissonNextSend(now, FEELER_INTERVAL); + next_feeler = GetExponentialRand(now, FEELER_INTERVAL); conn_type = ConnectionType::FEELER; fFeeler = true; } else if (nOutboundOnionRelay < m_max_outbound_onion && IsReachable(Network::NET_ONION)) { @@ -3142,8 +3253,12 @@ void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, MasternodeP OpenNetworkConnection(addrConnect, false, nullptr, nullptr, ConnectionType::OUTBOUND_FULL_RELAY, MasternodeConn::IsConnection, probe); } +Mutex NetEventsInterface::g_msgproc_mutex; + void CConnman::ThreadMessageHandler() { + LOCK(NetEventsInterface::g_msgproc_mutex); + int64_t nLastSendMessagesTimeMasternodes = 0; FastRandomContext rng; @@ -3173,7 +3288,6 @@ void CConnman::ThreadMessageHandler() return; // Send messages if (!fSkipSendMessagesForMasternodes || !pnode->m_masternode_connection) { - LOCK(pnode->cs_sendProcessing); m_msgproc->SendMessages(pnode); } @@ -4123,8 +4237,7 @@ CNode::CNode(NodeId idIn, ConnectionType conn_type_in, bool inbound_onion, std::unique_ptr&& i2p_sam_session) - : m_deserializer{std::make_unique(V1TransportDeserializer(Params(), idIn, SER_NETWORK, INIT_PROTO_VERSION))}, - m_serializer{std::make_unique(V1TransportSerializer())}, + : m_transport{std::make_unique(idIn, SER_NETWORK, INIT_PROTO_VERSION)}, m_sock{sock}, m_connected{GetTime()}, addr{addrIn}, @@ -4163,26 +4276,19 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) if (gArgs.GetBoolArg("-capturemessages", false)) { CaptureMessage(pnode->addr, msg.m_type, msg.data, /* incoming */ false); } - - // make sure we use the appropriate network transport format - std::vector serializedHeader; - pnode->m_serializer->prepareForTransport(msg, serializedHeader); - - size_t nTotalSize = nMessageSize + serializedHeader.size(); - statsClient.count("bandwidth.message." + SanitizeString(msg.m_type.c_str()) + ".bytesSent", nTotalSize, 1.0f); - statsClient.inc("message.sent." 
+ SanitizeString(msg.m_type.c_str()), 1.0f); + statsClient.count(strprintf("bandwidth.message.%s.bytesSent", msg.m_type), nMessageSize, 1.0f); + statsClient.inc(strprintf("message.sent.%s", msg.m_type), 1.0f); { LOCK(pnode->cs_vSend); - bool hasPendingData = !pnode->vSendMsg.empty(); - - //log total amount of bytes per message type - pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize; - pnode->nSendSize += nTotalSize; - - if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true; - pnode->vSendMsg.push_back(std::move(serializedHeader)); - if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); + const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()}; + + // Update memory usage of send buffer. + pnode->m_send_memusage += msg.GetMemoryUsage(); + if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true; + // Move message to vSendMsg queue. + pnode->vSendMsg.push_back(std::move(msg)); pnode->nSendMsgSize = pnode->vSendMsg.size(); { @@ -4196,9 +4302,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) } } - // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) - if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load())) - m_wakeup_pipe->Write(); + // Wake up select() call in case there was no pending data before (so it was not selecting + // this socket for sending) + if (queue_was_empty) { + if (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()) { + m_wakeup_pipe->Write(); + } + } } } @@ -4234,23 +4344,6 @@ bool CConnman::IsMasternodeOrDisconnectRequested(const CService& addr) { }); } -std::chrono::microseconds CConnman::PoissonNextSendInbound(std::chrono::microseconds now, std::chrono::seconds average_interval) -{ - if (m_next_send_inv_to_incoming.load() < now) { - // If this function were called from multiple threads simultaneously - // it would possible that both update the next send variable, and return a different result to their caller. - // This is not possible in practice as only the net processing thread invokes this function. - m_next_send_inv_to_incoming = PoissonNextSend(now, average_interval); - } - return m_next_send_inv_to_incoming; -} - -std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now, std::chrono::seconds average_interval) -{ - double unscaled = -log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */); - return now + std::chrono::duration_cast(unscaled * average_interval + 0.5us); -} - CConnman::NodesSnapshot::NodesSnapshot(const CConnman& connman, std::function filter, bool shuffle) { diff --git a/src/net.h b/src/net.h index e51d1a1e30b1b..a4d6a7dfcd202 100644 --- a/src/net.h +++ b/src/net.h @@ -151,6 +151,9 @@ struct CSerializedNetMsg { std::vector data; std::string m_type; + + /** Compute total memory usage of this object (own memory + any dynamic memory). */ + size_t GetMemoryUsage() const noexcept; }; /** Different types of connections to a peer. This enum encapsulates the @@ -350,42 +353,105 @@ class CNetMessage { } }; -/** The TransportDeserializer takes care of holding and deserializing the - * network receive buffer. 
It can deserialize the network buffer into a - * transport protocol agnostic CNetMessage (message type & payload) - */ -class TransportDeserializer { +/** The Transport converts one connection's sent messages to wire bytes, and received bytes back. */ +class Transport { public: - // returns true if the current deserialization is complete - virtual bool Complete() const = 0; - // set the serialization context version - virtual void SetVersion(int version) = 0; - /** read and deserialize data, advances msg_bytes data pointer */ - virtual int Read(Span& msg_bytes) = 0; - // decomposes a message from the context - virtual CNetMessage GetMessage(std::chrono::microseconds time, bool& reject_message) = 0; - virtual ~TransportDeserializer() {} + virtual ~Transport() {} + + // 1. Receiver side functions, for decoding bytes received on the wire into transport protocol + // agnostic CNetMessage (message type & payload) objects. + + /** Returns true if the current message is complete (so GetReceivedMessage can be called). */ + virtual bool ReceivedMessageComplete() const = 0; + /** Set the deserialization context version for objects returned by GetReceivedMessage. */ + virtual void SetReceiveVersion(int version) = 0; + + /** Feed wire bytes to the transport. + * + * @return false if some bytes were invalid, in which case the transport can't be used anymore. + * + * Consumed bytes are chopped off the front of msg_bytes. + */ + virtual bool ReceivedBytes(Span& msg_bytes) = 0; + + /** Retrieve a completed message from transport. + * + * This can only be called when ReceivedMessageComplete() is true. + * + * If reject_message=true is returned the message itself is invalid, but (other than false + * returned by ReceivedBytes) the transport is not in an inconsistent state. + */ + virtual CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) = 0; + + // 2. Sending side functions, for converting messages into bytes to be sent over the wire. + + /** Set the next message to send. + * + * If no message can currently be set (perhaps because the previous one is not yet done being + * sent), returns false, and msg will be unmodified. Otherwise msg is enqueued (and + * possibly moved-from) and true is returned. + */ + virtual bool SetMessageToSend(CSerializedNetMsg& msg) noexcept = 0; + + /** Return type for GetBytesToSend, consisting of: + * - Span to_send: span of bytes to be sent over the wire (possibly empty). + * - bool more: whether there will be more bytes to be sent after the ones in to_send are + * all sent (as signaled by MarkBytesSent()). + * - const std::string& m_type: message type on behalf of which this is being sent. + */ + using BytesToSend = std::tuple< + Span /*to_send*/, + bool /*more*/, + const std::string& /*m_type*/ + >; + + /** Get bytes to send on the wire. + * + * As a const function, it does not modify the transport's observable state, and is thus safe + * to be called multiple times. + * + * The bytes returned by this function act as a stream which can only be appended to. This + * means that with the exception of MarkBytesSent, operations on the transport can only append + * to what is being returned. + * + * Note that m_type and to_send refer to data that is internal to the transport, and calling + * any non-const function on this object may invalidate them. + */ + virtual BytesToSend GetBytesToSend() const noexcept = 0; + + /** Report how many bytes returned by the last GetBytesToSend() have been sent. 
+ * + * bytes_sent cannot exceed to_send.size() of the last GetBytesToSend() result. + * + * If bytes_sent=0, this call has no effect. + */ + virtual void MarkBytesSent(size_t bytes_sent) noexcept = 0; + + /** Return the memory usage of this transport attributable to buffered data to send. */ + virtual size_t GetSendMemoryUsage() const noexcept = 0; }; -class V1TransportDeserializer final : public TransportDeserializer +class V1Transport final : public Transport { private: - const CChainParams& m_chain_params; + CMessageHeader::MessageStartChars m_magic_bytes; const NodeId m_node_id; // Only for logging - mutable CHash256 hasher; - mutable uint256 data_hash; - bool in_data; // parsing header (false) or data (true) - CDataStream hdrbuf; // partially received header - CMessageHeader hdr; // complete header - CDataStream vRecv; // received message data - unsigned int nHdrPos; - unsigned int nDataPos; - - const uint256& GetMessageHash() const; - int readHeader(Span msg_bytes); - int readData(Span msg_bytes); - - void Reset() { + mutable Mutex m_recv_mutex; //!< Lock for receive state + mutable CHash256 hasher GUARDED_BY(m_recv_mutex); + mutable uint256 data_hash GUARDED_BY(m_recv_mutex); + bool in_data GUARDED_BY(m_recv_mutex); // parsing header (false) or data (true) + CDataStream hdrbuf GUARDED_BY(m_recv_mutex); // partially received header + CMessageHeader hdr GUARDED_BY(m_recv_mutex); // complete header + CDataStream vRecv GUARDED_BY(m_recv_mutex); // received message data + unsigned int nHdrPos GUARDED_BY(m_recv_mutex); + unsigned int nDataPos GUARDED_BY(m_recv_mutex); + + const uint256& GetMessageHash() const EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex); + int readHeader(Span msg_bytes) EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex); + int readData(Span msg_bytes) EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex); + + void Reset() EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex) { + AssertLockHeld(m_recv_mutex); vRecv.clear(); hdrbuf.clear(); hdrbuf.resize(24); @@ -396,52 +462,60 @@ class V1TransportDeserializer final : public TransportDeserializer hasher.Reset(); } -public: - V1TransportDeserializer(const CChainParams& chain_params, const NodeId node_id, int nTypeIn, int nVersionIn) - : m_chain_params(chain_params), - m_node_id(node_id), - hdrbuf(nTypeIn, nVersionIn), - vRecv(nTypeIn, nVersionIn) + bool CompleteInternal() const noexcept EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex) { - Reset(); + AssertLockHeld(m_recv_mutex); + if (!in_data) return false; + return hdr.nMessageSize == nDataPos; } - bool Complete() const override + /** Lock for sending state. */ + mutable Mutex m_send_mutex; + /** The header of the message currently being sent. */ + std::vector m_header_to_send GUARDED_BY(m_send_mutex); + /** The data of the message currently being sent. */ + CSerializedNetMsg m_message_to_send GUARDED_BY(m_send_mutex); + /** Whether we're currently sending header bytes or message bytes. */ + bool m_sending_header GUARDED_BY(m_send_mutex) {false}; + /** How many bytes have been sent so far (from m_header_to_send, or from m_message_to_send.data). 
*/ + size_t m_bytes_sent GUARDED_BY(m_send_mutex) {0}; + +public: + V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept; + + bool ReceivedMessageComplete() const override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex) { - if (!in_data) - return false; - return (hdr.nMessageSize == nDataPos); + AssertLockNotHeld(m_recv_mutex); + return WITH_LOCK(m_recv_mutex, return CompleteInternal()); } - void SetVersion(int nVersionIn) override + + void SetReceiveVersion(int nVersionIn) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex) { + AssertLockNotHeld(m_recv_mutex); + LOCK(m_recv_mutex); hdrbuf.SetVersion(nVersionIn); vRecv.SetVersion(nVersionIn); } - int Read(Span& msg_bytes) override + + bool ReceivedBytes(Span& msg_bytes) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex) { + AssertLockNotHeld(m_recv_mutex); + LOCK(m_recv_mutex); int ret = in_data ? readData(msg_bytes) : readHeader(msg_bytes); if (ret < 0) { Reset(); } else { msg_bytes = msg_bytes.subspan(ret); } - return ret; + return ret >= 0; } - CNetMessage GetMessage(std::chrono::microseconds time, bool& reject_message) override; -}; -/** The TransportSerializer prepares messages for the network transport - */ -class TransportSerializer { -public: - // prepare message for transport (header construction, error-correction computation, payload encryption, etc.) - virtual void prepareForTransport(CSerializedNetMsg& msg, std::vector& header) const = 0; - virtual ~TransportSerializer() {} -}; + CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex); -class V1TransportSerializer : public TransportSerializer { -public: - void prepareForTransport(CSerializedNetMsg& msg, std::vector& header) const override; + bool SetMessageToSend(CSerializedNetMsg& msg) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); + BytesToSend GetBytesToSend() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); + void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); + size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); }; /** Information about a peer */ @@ -451,8 +525,9 @@ class CNode friend struct ConnmanTestMsg; public: - const std::unique_ptr m_deserializer; // Used only by SocketHandler thread - const std::unique_ptr m_serializer; + /** Transport serializer/deserializer. The receive side functions are only called under cs_vRecv, while + * the sending side functions are only called under cs_vSend. */ + const std::unique_ptr m_transport; NetPermissionFlags m_permissionFlags{NetPermissionFlags::None}; // treated as const outside of fuzz tester @@ -466,12 +541,12 @@ class CNode */ std::shared_ptr m_sock GUARDED_BY(m_sock_mutex); - /** Total size of all vSendMsg entries */ - size_t nSendSize GUARDED_BY(cs_vSend){0}; - /** Offset inside the first vSendMsg already sent */ - size_t nSendOffset GUARDED_BY(cs_vSend){0}; + /** Sum of GetMemoryUsage of all vSendMsg entries. */ + size_t m_send_memusage GUARDED_BY(cs_vSend){0}; + /** Total number of bytes sent on the wire to this peer. */ uint64_t nSendBytes GUARDED_BY(cs_vSend){0}; - std::list> vSendMsg GUARDED_BY(cs_vSend); + /** Messages still to be fed to m_transport->SetMessageToSend. 
*/ + std::deque vSendMsg GUARDED_BY(cs_vSend); std::atomic nSendMsgSize{0}; Mutex cs_vSend; Mutex m_sock_mutex; @@ -481,8 +556,6 @@ class CNode std::list vProcessMsg GUARDED_BY(cs_vProcessMsg); size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0}; - RecursiveMutex cs_sendProcessing; - uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0}; std::atomic m_last_send{0s}; @@ -816,6 +889,9 @@ class CNode class NetEventsInterface { public: + /** Mutex for anything that is only accessed via the msg processing thread */ + static Mutex g_msgproc_mutex; + /** Initialize a peer (setup state, queue any initial messages) */ virtual void InitializeNode(CNode& node, ServiceFlags our_services) = 0; @@ -829,7 +905,7 @@ class NetEventsInterface * @param[in] interrupt Interrupt condition for processing threads * @return True if there is more work to be done */ - virtual bool ProcessMessages(CNode* pnode, std::atomic& interrupt) = 0; + virtual bool ProcessMessages(CNode* pnode, std::atomic& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; /** * Send queued protocol messages to a given node. @@ -837,7 +913,7 @@ class NetEventsInterface * @param[in] pnode The node which we are sending messages to. * @return True if there is more work to be done */ - virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) = 0; + virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; protected: @@ -1205,12 +1281,6 @@ friend class CNode; void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); - /** Attempts to obfuscate tx time through exponentially distributed emitting. - Works assuming that a single interval is used. - Variable intervals will result in privacy decrease. - */ - std::chrono::microseconds PoissonNextSendInbound(std::chrono::microseconds now, std::chrono::seconds average_interval); - /** Return true if we should disconnect the peer for failing an inactivity check. */ bool ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const; @@ -1392,8 +1462,11 @@ friend class CNode; NodeId GetNewNodeId(); - size_t SocketSendData(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend); + /** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */ + std::pair SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend); + size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); + void DumpAddresses(); // Network stats @@ -1584,8 +1657,6 @@ friend class CNode; */ std::atomic_bool m_start_extra_block_relay_peers{false}; - std::atomic m_next_send_inv_to_incoming{0us}; - /** * A vector of -bind=
:=onion arguments each of which is * an address and port that are designated for incoming Tor connections. @@ -1616,9 +1687,6 @@ friend class CNode; friend struct ConnmanTestMsg; }; -/** Return a timestamp in the future (in microseconds) for exponentially distributed events. */ -std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now, std::chrono::seconds average_interval); - /** Dump binary message to file, with timestamp */ void CaptureMessageToFile(const CAddress& addr, const std::string& msg_type, @@ -1665,10 +1733,6 @@ class CExplicitNetCleanup extern RecursiveMutex cs_main; -void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main); -void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool is_masternode, bool fForce=false) EXCLUSIVE_LOCKS_REQUIRED(cs_main); -size_t GetRequestedObjectCount(NodeId nodeId) EXCLUSIVE_LOCKS_REQUIRED(cs_main); - /** Protect desirable or disadvantaged inbound peers from eviction by ratio. * * This function protects half of the peers which have been connected the diff --git a/src/net_processing.cpp b/src/net_processing.cpp index db6d9c70ff53c..8747950df7154 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include // For NDEBUG compile time check #include #include @@ -86,10 +87,6 @@ static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ static const unsigned int MAX_GETDATA_SZ = 1000; -/** Expiration time for orphan transactions in seconds */ -static constexpr int64_t ORPHAN_TX_EXPIRE_TIME = 20 * 60; -/** Minimum time between orphan transactions expire time checks in seconds */ -static constexpr int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60; /** How long to cache transactions in mapRelay for normal relay */ static constexpr auto RELAY_TX_CACHE_TIME = 15min; /** How long a transaction has to be in the mempool before it can unconditionally be relayed (even when not in mapRelay). */ @@ -191,24 +188,6 @@ static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND}; /** The compactblocks version we support. See BIP 152. */ static constexpr uint64_t CMPCTBLOCKS_VERSION{1}; -struct COrphanTx { - // When modifying, adapt the copy of this definition in tests/DoS_tests. - CTransactionRef tx; - NodeId fromPeer; - int64_t nTimeExpire; - size_t list_pos; - size_t nTxSize; -}; - -/** Guards orphan transactions and extra txs for compact blocks */ -RecursiveMutex g_cs_orphans; -/** Map from txid to orphan transaction record. Limited by - * -maxorphantx/DEFAULT_MAX_ORPHAN_TRANSACTIONS */ -std::map mapOrphanTransactions GUARDED_BY(g_cs_orphans); - -size_t nMapOrphanTransactionsSize = 0; -void EraseOrphansFor(NodeId peer); - // Internal stuff namespace { /** Blocks that are in flight, and that are in the queue to be downloaded. */ @@ -307,7 +286,7 @@ struct Peer { bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false}; // Last time a "MEMPOOL" request was serviced. std::atomic m_last_mempool_req{0s}; - std::chrono::microseconds m_next_inv_send_time{0}; + std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; }; // in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer @@ -316,7 +295,7 @@ struct Peer { std::unique_ptr m_tx_relay{std::make_unique()}; /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. 
*/ - std::vector m_addrs_to_send; + std::vector m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex); /** Probabilistic filter to track recent addr messages relayed with this * peer. Used to avoid relaying redundant addresses to this peer. * @@ -326,7 +305,7 @@ struct Peer { * * Presence of this filter must correlate with m_addr_relay_enabled. **/ - std::unique_ptr m_addr_known; + std::unique_ptr m_addr_known GUARDED_BY(NetEventsInterface::g_msgproc_mutex); /** Whether we are participating in address relay with this connection. * * We set this bool to true for outbound peers (other than @@ -345,7 +324,7 @@ struct Peer { /** Whether a Peer can only be relayed blocks */ const bool m_block_relay_only{false}; /** Whether a getaddr request to this peer is outstanding. */ - bool m_getaddr_sent{false}; + bool m_getaddr_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Guards address sending timers. */ mutable Mutex m_addr_send_times_mutex; /** Time point to send the next ADDR message to this peer. */ @@ -356,12 +335,12 @@ struct Peer { * messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */ std::atomic_bool m_wants_addrv2{false}; /** Whether this peer has already sent us a getaddr message. */ - bool m_getaddr_recvd{false}; + bool m_getaddr_recvd GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Number of addresses that can be processed from this peer. Start at 1 to * permit self-announcement. */ - double m_addr_token_bucket{1.0}; + double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0}; /** When m_addr_token_bucket was last updated */ - std::chrono::microseconds m_addr_token_timestamp{GetTime()}; + std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime()}; /** Total number of addresses that were dropped due to rate limiting. */ std::atomic m_addr_rate_limited{0}; /** Total number of addresses that were processed (excludes rate-limited ones). */ @@ -371,7 +350,7 @@ struct Peer { std::set m_orphan_work_set GUARDED_BY(g_cs_orphans); /** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */ - bool m_inv_triggered_getheaders_before_sync{false}; + bool m_inv_triggered_getheaders_before_sync GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Protects m_getdata_requests **/ Mutex m_getdata_requests_mutex; @@ -379,7 +358,7 @@ struct Peer { std::deque m_getdata_requests GUARDED_BY(m_getdata_requests_mutex); /** Time of the last getheaders message to this peer */ - std::atomic m_last_getheaders_timestamp{0s}; + std::atomic m_last_getheaders_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0s}; explicit Peer(NodeId id, ServiceFlags our_services, bool block_relay_only) : m_id(id) @@ -391,6 +370,157 @@ struct Peer { using PeerRef = std::shared_ptr; +/** + * Maintain validation-specific state about nodes, protected by cs_main, instead + * by CNode's own locks. This simplifies asynchronous operation, where + * processing of incoming data is done after the ProcessMessage call returns, + * and we're no longer holding the node's locks. + */ +struct CNodeState { + //! The best known block we know this peer has announced. + const CBlockIndex* pindexBestKnownBlock{nullptr}; + //! The hash of the last unknown block this peer has announced. + uint256 hashLastUnknownBlock{}; + //! The last full block we both have. + const CBlockIndex* pindexLastCommonBlock{nullptr}; + //! The best header we have sent our peer. 
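// --- Illustrative aside (not part of this patch) -----------------------------
// A minimal sketch of the annotation pattern introduced above: members that are
// only touched from the message-processing thread get
// GUARDED_BY(NetEventsInterface::g_msgproc_mutex), and every function that
// touches them is tagged EXCLUSIVE_LOCKS_REQUIRED(...), so clang's
// -Wthread-safety can check the access pattern at compile time. The struct and
// function names below are hypothetical; the macros are the usual ones from
// sync.h / threadsafety.h.
struct ExamplePeerState {
    bool m_example_flag GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
};

void ExampleMsgProcStep(ExamplePeerState& s) EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex)
{
    AssertLockHeld(NetEventsInterface::g_msgproc_mutex); // runtime check matching the static annotation
    s.m_example_flag = true; // fine: the caller holds g_msgproc_mutex
}
// ------------------------------------------------------------------------------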
+ const CBlockIndex* pindexBestHeaderSent{nullptr}; + //! Length of current-streak of unconnecting headers announcements + int nUnconnectingHeaders{0}; + //! Whether we've started headers synchronization with this peer. + bool fSyncStarted{false}; + //! When to potentially disconnect peer for stalling headers download + std::chrono::microseconds m_headers_sync_timeout{0us}; + //! Since when we're stalling block download progress (in microseconds), or 0. + std::chrono::microseconds m_stalling_since{0us}; + std::list vBlocksInFlight; + //! When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty. + std::chrono::microseconds m_downloading_since{0us}; + int nBlocksInFlight{0}; + int nBlocksInFlightValidHeaders{0}; + //! Whether we consider this a preferred download peer. + bool fPreferredDownload{false}; + //! Whether this peer wants invs or headers (when possible) for block announcements. + bool fPreferHeaders{false}; + //! Whether this peer wants invs or compressed headers (when possible) for block announcements. + bool fPreferHeadersCompressed{false}; + /** Whether this peer wants invs or cmpctblocks (when possible) for block announcements. */ + bool m_requested_hb_cmpctblocks{false}; + /** Whether this peer will send us cmpctblocks if we request them. */ + bool m_provides_cmpctblocks{false}; + + /** State used to enforce CHAIN_SYNC_TIMEOUT and EXTRA_PEER_CHECK_INTERVAL logic. + * + * Both are only in effect for outbound, non-manual, non-protected connections. + * Any peer protected (m_protect = true) is not chosen for eviction. A peer is + * marked as protected if all of these are true: + * - its connection type is IsBlockOnlyConn() == false + * - it gave us a valid connecting header + * - we haven't reached MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT yet + * - its chain tip has at least as much work as ours + * + * CHAIN_SYNC_TIMEOUT: if a peer's best known block has less work than our tip, + * set a timeout CHAIN_SYNC_TIMEOUT seconds in the future: + * - If at timeout their best known block now has more work than our tip + * when the timeout was set, then either reset the timeout or clear it + * (after comparing against our current tip's work) + * - If at timeout their best known block still has less work than our + * tip did when the timeout was set, then send a getheaders message, + * and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in future. + * If their best known block is still behind when that new timeout is + * reached, disconnect. + * + * EXTRA_PEER_CHECK_INTERVAL: after each interval, if we have too many outbound peers, + * drop the outbound one that least recently announced us a new block. + */ + struct ChainSyncTimeoutState { + //! A timeout used for checking whether our peer has sufficiently synced + std::chrono::seconds m_timeout{0s}; + //! A header with the work we require on our peer's chain + const CBlockIndex* m_work_header{nullptr}; + //! After timeout is reached, set to true after sending getheaders + bool m_sent_getheaders{false}; + //! Whether this peer is protected from disconnection due to a bad/slow chain + bool m_protect{false}; + }; + + ChainSyncTimeoutState m_chain_sync; + + //! Time of last new block announcement + int64_t m_last_block_announcement{0}; + + /* + * State associated with objects download. 
+ * + * Tx download algorithm: + * + * When inv comes in, queue up (process_time, inv) inside the peer's + * CNodeState (m_object_process_time) as long as m_object_announced for the peer + * isn't too big (MAX_PEER_OBJECT_ANNOUNCEMENTS). + * + * The process_time for a objects is set to nNow for outbound peers, + * nNow + 2 seconds for inbound peers. This is the time at which we'll + * consider trying to request the objects from the peer in + * SendMessages(). The delay for inbound peers is to allow outbound peers + * a chance to announce before we request from inbound peers, to prevent + * an adversary from using inbound connections to blind us to a + * objects (InvBlock). + * + * When we call SendMessages() for a given peer, + * we will loop over the objects in m_object_process_time, looking + * at the objects whose process_time <= nNow. We'll request each + * such objects that we don't have already and that hasn't been + * requested from another peer recently, up until we hit the + * MAX_PEER_OBJECT_IN_FLIGHT limit for the peer. Then we'll update + * g_already_asked_for for each requested inv, storing the time of the + * GETDATA request. We use g_already_asked_for to coordinate objects + * requests amongst our peers. + * + * For objects that we still need but we have already recently + * requested from some other peer, we'll reinsert (process_time, inv) + * back into the peer's m_object_process_time at the point in the future at + * which the most recent GETDATA request would time out (ie + * GetObjectInterval + the request time stored in g_already_asked_for). + * We add an additional delay for inbound peers, again to prefer + * attempting download from outbound peers first. + * We also add an extra small random delay up to 2 seconds + * to avoid biasing some peers over others. (e.g., due to fixed ordering + * of peer processing in ThreadMessageHandler). + * + * When we receive a objects from a peer, we remove the inv from the + * peer's m_object_in_flight set and from their recently announced set + * (m_object_announced). We also clear g_already_asked_for for that entry, so + * that if somehow the objects is not accepted but also not added to + * the reject filter, then we will eventually redownload from other + * peers. + */ + struct ObjectDownloadState { + /* Track when to attempt download of announced objects (process + * time in micros -> inv) + */ + std::multimap m_object_process_time; + + //! Store all the objects a peer has recently announced + std::set m_object_announced; + + //! Store objects which were requested by us, with timestamp + std::map m_object_in_flight; + + //! Periodically check for stuck getdata requests + std::chrono::microseconds m_check_expiry_timer{0}; + }; + + ObjectDownloadState m_object_download; + + //! Whether this peer is an inbound connection + const bool m_is_inbound; + + //! A rolling bloom filter of all announced tx CInvs to this peer. 
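// --- Illustrative aside (not part of this patch) -----------------------------
// A minimal sketch of the scheduling described in the comment above: announced
// invs wait in the time-ordered m_object_process_time multimap and only turn
// into GETDATA requests once their process_time has passed; entries that were
// recently requested from another peer are pushed back with a retry delay.
// Helper names and the 60s retry delay are hypothetical; CInv is the type from
// protocol.h.
#include <chrono>
#include <cstddef>
#include <functional>
#include <map>

void SketchRequestScheduling(std::multimap<std::chrono::microseconds, CInv>& process_time,
                             std::chrono::microseconds now,
                             std::size_t& in_flight, std::size_t max_in_flight,
                             const std::function<bool(const CInv&)>& requested_elsewhere_recently,
                             const std::function<void(const CInv&)>& send_getdata)
{
    while (!process_time.empty() && process_time.begin()->first <= now && in_flight < max_in_flight) {
        const CInv inv = process_time.begin()->second;
        process_time.erase(process_time.begin());
        if (requested_elsewhere_recently(inv)) {
            // Re-queue for the point at which the other peer's GETDATA would time out.
            process_time.emplace(now + std::chrono::seconds{60}, inv); // hypothetical timeout
            continue;
        }
        send_getdata(inv); // the real code also records the request in g_already_asked_for
        ++in_flight;
    }
}
// ------------------------------------------------------------------------------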
+ CRollingBloomFilter m_recently_announced_invs = CRollingBloomFilter{INVENTORY_MAX_RECENT_RELAY, 0.000001}; + + CNodeState(bool is_inbound) : m_is_inbound(is_inbound) {} +}; + class PeerManagerImpl final : public PeerManager { public: @@ -419,9 +549,9 @@ class PeerManagerImpl final : public PeerManager void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); bool ProcessMessages(CNode* pfrom, std::atomic& interrupt) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex); - bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex); + bool SendMessages(CNode* pto) override + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex); /** Implement PeerManager */ void StartScheduledTasks(CScheduler& scheduler) override; @@ -440,8 +570,13 @@ class PeerManagerImpl final : public PeerManager void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex); + void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override; bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex); + void EraseObjectRequest(NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + void RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time, + bool is_masternode, bool fForce = false) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + size_t GetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(::cs_main); bool IsInvInFilter(NodeId nodeid, const uint256& hash) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); private: @@ -449,7 +584,7 @@ class PeerManagerImpl final : public PeerManager void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ - void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex); /** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */ void EvictExtraOutboundPeers(std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main); @@ -503,20 +638,21 @@ class PeerManagerImpl final : public PeerManager void ProcessHeadersMessage(CNode& pfrom, Peer& peer, const std::vector& headers, bool via_compact_block) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex); /** Various helpers for headers processing, invoked by ProcessHeadersMessage() */ /** Deal with state tracking and headers sync for peers that send the * 
occasional non-connecting header (this can happen due to BIP 130 headers * announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */ void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector& headers) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex); /** Return true if the headers connect to each other, false otherwise */ bool CheckHeadersAreContinuous(const std::vector& headers) const; /** Request further headers from this peer with a given locator. * We don't issue a getheaders message if we have a recent one outstanding. * This returns true if a getheaders is actually sent, and false otherwise. */ - bool MaybeSendGetHeaders(CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer); + bool MaybeSendGetHeaders(CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer) + EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Potentially fetch blocks from this peer upon receipt of a new headers tip */ void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast); /** Update peer state based on received headers message */ @@ -535,7 +671,8 @@ class PeerManagerImpl final : public PeerManager void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now); /** Send `addr` messages on a regular schedule. */ - void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time); + void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time) + EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Relay (gossip) an address to a few randomly chosen nodes. * @@ -544,7 +681,8 @@ class PeerManagerImpl final : public PeerManager * @param[in] fReachable Whether the address' network is reachable. We relay unreachable * addresses less. */ - void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex); const CChainParams& m_chainparams; CConnman& m_connman; @@ -586,6 +724,16 @@ class PeerManagerImpl final : public PeerManager */ std::map m_peer_map GUARDED_BY(m_peer_mutex); + /** Map maintaining per-node state. */ + std::map m_node_states GUARDED_BY(cs_main); + + /** Get a pointer to a const CNodeState, used when not mutating the CNodeState object. */ + const CNodeState* State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Get a pointer to a mutable CNodeState. */ + CNodeState* State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + + std::atomic m_next_inv_to_inbounds{0us}; + /** Check whether the last unknown block a peer advertised is not yet known. */ void ProcessBlockAvailability(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(cs_main); /** Update tracking information about which blocks a peer is assumed to have. 
*/ @@ -662,13 +810,16 @@ class PeerManagerImpl final : public PeerManager * @return True if address relay is enabled with peer * False if address relay is disallowed */ - bool SetupAddressRelay(const CNode& node, Peer& peer); + bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); + + void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); + void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Number of nodes with fSyncStarted. */ int nSyncStarted GUARDED_BY(cs_main) = 0; /** Hash of the last block we received via INV */ - uint256 m_last_block_inv_triggering_headers_sync{}; + uint256 m_last_block_inv_triggering_headers_sync GUARDED_BY(g_msgproc_mutex){}; /** * Sources of received blocks, saved to be able punish them when processing @@ -681,6 +832,9 @@ class PeerManagerImpl final : public PeerManager /** Number of outbound peers with m_chain_sync.m_protect. */ int m_outbound_peers_with_protect_from_disconnect GUARDED_BY(cs_main) = 0; + /** Number of preferable block download peers. */ + int m_num_preferred_download_peers GUARDED_BY(cs_main){0}; + bool AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_recent_confirmed_transactions_mutex); @@ -724,6 +878,25 @@ class PeerManagerImpl final : public PeerManager Mutex m_recent_confirmed_transactions_mutex; CRollingBloomFilter m_recent_confirmed_transactions GUARDED_BY(m_recent_confirmed_transactions_mutex){48'000, 0.000'001}; + /** + * For sending `inv`s to inbound peers, we use a single (exponentially + * distributed) timer for all peers. If we used a separate timer for each + * peer, a spy node could make multiple inbound connections to us to + * accurately determine when we received the transaction (and potentially + * determine the transaction's origin). */ + std::chrono::microseconds NextInvToInbounds(std::chrono::microseconds now, + std::chrono::seconds average_interval); + + + // All of the following cache a recent block, and are protected by m_most_recent_block_mutex + RecursiveMutex m_most_recent_block_mutex; + std::shared_ptr m_most_recent_block GUARDED_BY(m_most_recent_block_mutex); + std::shared_ptr m_most_recent_compact_block GUARDED_BY(m_most_recent_block_mutex); + uint256 m_most_recent_block_hash GUARDED_BY(m_most_recent_block_mutex); + + /** Height of the highest block announced using BIP 152 high-bandwidth mode. */ + int m_highest_fast_announce GUARDED_BY(::cs_main){0}; + /* Returns a bool indicating whether we requested this block. * Also used if a block was /not/ received and timed out or started with another peer */ @@ -773,203 +946,37 @@ class PeerManagerImpl final : public PeerManager /** Number of peers from which we're downloading blocks. */ int nPeersWithValidatedDownloads GUARDED_BY(cs_main) = 0; -}; -} // namespace -namespace { - - /** Number of preferable block download peers. */ - int nPreferredDownload GUARDED_BY(cs_main) = 0; - - struct IteratorComparator - { - template - bool operator()(const I& a, const I& b) const - { - return &(*a) < &(*b); - } - }; + /** Storage for orphan information */ + TxOrphanage m_orphanage; - /** Index from the parents' COutPoint into the mapOrphanTransactions. 
Used - * to remove orphan transactions from the mapOrphanTransactions */ - std::map::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(g_cs_orphans); - /** Orphan transactions in vector for quick random eviction */ - std::vector::iterator> g_orphan_list GUARDED_BY(g_cs_orphans); + void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans); /** Orphan/conflicted/etc transactions that are kept for compact block reconstruction. * The last -blockreconstructionextratxn/DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN of * these are kept in a ring buffer */ - static std::vector> vExtraTxnForCompact GUARDED_BY(g_cs_orphans); + std::vector> vExtraTxnForCompact GUARDED_BY(g_cs_orphans); /** Offset into vExtraTxnForCompact to insert the next tx */ - static size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0; -} // namespace - -namespace { -/** - * Maintain validation-specific state about nodes, protected by cs_main, instead - * by CNode's own locks. This simplifies asynchronous operation, where - * processing of incoming data is done after the ProcessMessage call returns, - * and we're no longer holding the node's locks. - */ -struct CNodeState { - //! The best known block we know this peer has announced. - const CBlockIndex* pindexBestKnownBlock{nullptr}; - //! The hash of the last unknown block this peer has announced. - uint256 hashLastUnknownBlock{}; - //! The last full block we both have. - const CBlockIndex* pindexLastCommonBlock{nullptr}; - //! The best header we have sent our peer. - const CBlockIndex* pindexBestHeaderSent{nullptr}; - //! Length of current-streak of unconnecting headers announcements - int nUnconnectingHeaders{0}; - //! Whether we've started headers synchronization with this peer. - bool fSyncStarted{false}; - //! When to potentially disconnect peer for stalling headers download - std::chrono::microseconds m_headers_sync_timeout{0us}; - //! Since when we're stalling block download progress (in microseconds), or 0. - std::chrono::microseconds m_stalling_since{0us}; - std::list vBlocksInFlight; - //! When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty. - std::chrono::microseconds m_downloading_since{0us}; - int nBlocksInFlight{0}; - int nBlocksInFlightValidHeaders{0}; - //! Whether we consider this a preferred download peer. - bool fPreferredDownload{false}; - //! Whether this peer wants invs or headers (when possible) for block announcements. - bool fPreferHeaders{false}; - //! Whether this peer wants invs or compressed headers (when possible) for block announcements. - bool fPreferHeadersCompressed{false}; - /** Whether this peer wants invs or cmpctblocks (when possible) for block announcements. */ - bool m_requested_hb_cmpctblocks{false}; - /** Whether this peer will send us cmpctblocks if we request them. */ - bool m_provides_cmpctblocks{false}; - - /** State used to enforce CHAIN_SYNC_TIMEOUT and EXTRA_PEER_CHECK_INTERVAL logic. - * - * Both are only in effect for outbound, non-manual, non-protected connections. - * Any peer protected (m_protect = true) is not chosen for eviction. 
A peer is - * marked as protected if all of these are true: - * - its connection type is IsBlockOnlyConn() == false - * - it gave us a valid connecting header - * - we haven't reached MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT yet - * - its chain tip has at least as much work as ours - * - * CHAIN_SYNC_TIMEOUT: if a peer's best known block has less work than our tip, - * set a timeout CHAIN_SYNC_TIMEOUT seconds in the future: - * - If at timeout their best known block now has more work than our tip - * when the timeout was set, then either reset the timeout or clear it - * (after comparing against our current tip's work) - * - If at timeout their best known block still has less work than our - * tip did when the timeout was set, then send a getheaders message, - * and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in future. - * If their best known block is still behind when that new timeout is - * reached, disconnect. - * - * EXTRA_PEER_CHECK_INTERVAL: after each interval, if we have too many outbound peers, - * drop the outbound one that least recently announced us a new block. - */ - struct ChainSyncTimeoutState { - //! A timeout used for checking whether our peer has sufficiently synced - std::chrono::seconds m_timeout{0s}; - //! A header with the work we require on our peer's chain - const CBlockIndex* m_work_header{nullptr}; - //! After timeout is reached, set to true after sending getheaders - bool m_sent_getheaders{false}; - //! Whether this peer is protected from disconnection due to a bad/slow chain - bool m_protect{false}; - }; - - ChainSyncTimeoutState m_chain_sync; - - //! Time of last new block announcement - int64_t m_last_block_announcement{0}; - - /* - * State associated with objects download. - * - * Tx download algorithm: - * - * When inv comes in, queue up (process_time, inv) inside the peer's - * CNodeState (m_object_process_time) as long as m_object_announced for the peer - * isn't too big (MAX_PEER_OBJECT_ANNOUNCEMENTS). - * - * The process_time for a objects is set to nNow for outbound peers, - * nNow + 2 seconds for inbound peers. This is the time at which we'll - * consider trying to request the objects from the peer in - * SendMessages(). The delay for inbound peers is to allow outbound peers - * a chance to announce before we request from inbound peers, to prevent - * an adversary from using inbound connections to blind us to a - * objects (InvBlock). - * - * When we call SendMessages() for a given peer, - * we will loop over the objects in m_object_process_time, looking - * at the objects whose process_time <= nNow. We'll request each - * such objects that we don't have already and that hasn't been - * requested from another peer recently, up until we hit the - * MAX_PEER_OBJECT_IN_FLIGHT limit for the peer. Then we'll update - * g_already_asked_for for each requested inv, storing the time of the - * GETDATA request. We use g_already_asked_for to coordinate objects - * requests amongst our peers. - * - * For objects that we still need but we have already recently - * requested from some other peer, we'll reinsert (process_time, inv) - * back into the peer's m_object_process_time at the point in the future at - * which the most recent GETDATA request would time out (ie - * GetObjectInterval + the request time stored in g_already_asked_for). - * We add an additional delay for inbound peers, again to prefer - * attempting download from outbound peers first. 
- * We also add an extra small random delay up to 2 seconds - * to avoid biasing some peers over others. (e.g., due to fixed ordering - * of peer processing in ThreadMessageHandler). - * - * When we receive a objects from a peer, we remove the inv from the - * peer's m_object_in_flight set and from their recently announced set - * (m_object_announced). We also clear g_already_asked_for for that entry, so - * that if somehow the objects is not accepted but also not added to - * the reject filter, then we will eventually redownload from other - * peers. - */ - struct ObjectDownloadState { - /* Track when to attempt download of announced objects (process - * time in micros -> inv) - */ - std::multimap m_object_process_time; - - //! Store all the objects a peer has recently announced - std::set m_object_announced; - - //! Store objects which were requested by us, with timestamp - std::map m_object_in_flight; - - //! Periodically check for stuck getdata requests - std::chrono::microseconds m_check_expiry_timer{0}; - }; - - ObjectDownloadState m_object_download; - - //! Whether this peer is an inbound connection - const bool m_is_inbound; - - //! A rolling bloom filter of all announced tx CInvs to this peer. - CRollingBloomFilter m_recently_announced_invs = CRollingBloomFilter{INVENTORY_MAX_RECENT_RELAY, 0.000001}; - - CNodeState(bool is_inbound) : m_is_inbound(is_inbound) {} + size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0; }; // Keeps track of the time (in microseconds) when transactions were requested last time unordered_limitedmap g_already_asked_for(MAX_INV_SZ, MAX_INV_SZ * 2); unordered_limitedmap g_erased_object_requests(MAX_INV_SZ, MAX_INV_SZ * 2); -/** Map maintaining per-node state. */ -static std::map mapNodeState GUARDED_BY(cs_main); - -static CNodeState *State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - std::map::iterator it = mapNodeState.find(pnode); - if (it == mapNodeState.end()) +const CNodeState* PeerManagerImpl::State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + std::map::const_iterator it = m_node_states.find(pnode); + if (it == m_node_states.end()) return nullptr; return &it->second; } +CNodeState* PeerManagerImpl::State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + return const_cast(std::as_const(*this).State(pnode)); +} + /** * Whether the peer supports the address. For example, a peer that does not * implement BIP155 cannot receive Tor v3 addresses because it requires @@ -980,13 +987,13 @@ static bool IsAddrCompatible(const Peer& peer, const CAddress& addr) return peer.m_wants_addrv2 || addr.IsAddrV1Compatible(); } -static void AddAddressKnown(Peer& peer, const CAddress& addr) +void PeerManagerImpl::AddAddressKnown(Peer& peer, const CAddress& addr) { assert(peer.m_addr_known); peer.m_addr_known->insert(addr.GetKey()); } -static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) +void PeerManagerImpl::PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) { // Known checking here is only to save space from duplicates. 
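// --- Illustrative aside (not part of this patch) -----------------------------
// The const/non-const State() pair above uses a common idiom: implement the
// lookup once in the const overload, then have the non-const overload call it
// through std::as_const() and cast the constness away. The cast is safe because
// *this is known to be non-const inside the non-const overload. A generic
// sketch with hypothetical names:
#include <map>
#include <string>
#include <utility>

class SketchRegistry
{
    std::map<int, std::string> m_items;

public:
    const std::string* Find(int key) const
    {
        auto it = m_items.find(key);
        return it == m_items.end() ? nullptr : &it->second;
    }
    std::string* Find(int key)
    {
        return const_cast<std::string*>(std::as_const(*this).Find(key));
    }
};
// ------------------------------------------------------------------------------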
// Before sending, we'll filter it again for known addresses that were @@ -1054,14 +1061,16 @@ static void PushInv(Peer& peer, const CInv& inv) peer.m_tx_relay->vInventoryOtherToSend.push_back(inv); } -static void UpdatePreferredDownload(const CNode& node, const Peer& peer, CNodeState* state) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::microseconds now, + std::chrono::seconds average_interval) { - nPreferredDownload -= state->fPreferredDownload; - - // Whether this node should be marked as a preferred download node. - state->fPreferredDownload = (!node.IsInboundConn() || node.HasPermission(NetPermissionFlags::NoBan)) && !node.IsAddrFetchConn() && CanServeBlocks(peer); - - nPreferredDownload += state->fPreferredDownload; + if (m_next_inv_to_inbounds.load() < now) { + // If this function were called from multiple threads simultaneously + // it would possible that both update the next send variable, and return a different result to their caller. + // This is not possible in practice as only the net processing thread invokes this function. + m_next_inv_to_inbounds = GetExponentialRand(now, average_interval); + } + return m_next_inv_to_inbounds; } bool PeerManagerImpl::MarkBlockAsReceived(const uint256& hash) @@ -1359,27 +1368,20 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) } } -void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void PeerManagerImpl::EraseObjectRequest(NodeId nodeid, const CInv& inv) { AssertLockHeld(cs_main); + + CNodeState* state = State(nodeid); + if (state == nullptr) + return; + LogPrint(BCLog::NET, "%s -- inv=(%s)\n", __func__, inv.ToString()); g_already_asked_for.erase(inv.hash); g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime())); - if (nodestate) { - nodestate->m_object_download.m_object_announced.erase(inv); - nodestate->m_object_download.m_object_in_flight.erase(inv); - } -} - -void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - AssertLockHeld(cs_main); - auto* state = State(nodeId); - if (!state) { - return; - } - EraseObjectRequest(state, inv); + state->m_object_download.m_object_announced.erase(inv); + state->m_object_download.m_object_in_flight.erase(inv); } std::chrono::microseconds GetObjectRequestTime(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -1452,9 +1454,15 @@ std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chron return process_time; } -void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool is_masternode, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void PeerManagerImpl::RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time, + bool is_masternode, bool fForce) { AssertLockHeld(cs_main); + + CNodeState* state = State(nodeid); + if (state == nullptr) + return; + CNodeState::ObjectDownloadState& peer_download_state = state->m_object_download; if (peer_download_state.m_object_announced.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || peer_download_state.m_object_process_time.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || @@ -1480,29 +1488,18 @@ void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds LogPrint(BCLog::NET, "%s -- inv=(%s), current_time=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), current_time.count(), process_time.count(), (process_time - current_time).count()); } -void RequestObject(NodeId nodeId, 
const CInv& inv, std::chrono::microseconds current_time, bool is_masternode, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +size_t PeerManagerImpl::GetRequestedObjectCount(NodeId nodeid) const { AssertLockHeld(cs_main); - auto* state = State(nodeId); - if (!state) { - return; - } - RequestObject(state, inv, current_time, is_masternode, fForce); -} -size_t GetRequestedObjectCount(NodeId nodeId) -{ - AssertLockHeld(cs_main); - auto* state = State(nodeId); - if (!state) { + const CNodeState* state = State(nodeid); + if (state == nullptr) return 0; - } + return state->m_object_download.m_object_process_time.size(); } -// This function is used for testing the stale tip eviction logic, see -// denialofservice_tests.cpp -void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) +void PeerManagerImpl::UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) { LOCK(cs_main); CNodeState *state = State(node); @@ -1513,7 +1510,7 @@ void PeerManagerImpl::InitializeNode(CNode& node, ServiceFlags our_services) { NodeId nodeid = node.GetId(); { LOCK(cs_main); - mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(node.IsInboundConn())); + m_node_states.emplace_hint(m_node_states.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(node.IsInboundConn())); } PeerRef peer = std::make_shared(nodeid, our_services, /* block_relay_only = */ node.IsBlockOnlyConn()); { @@ -1569,19 +1566,19 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) { for (const QueuedBlock& entry : state->vBlocksInFlight) { mapBlocksInFlight.erase(entry.hash); } - EraseOrphansFor(nodeid); - nPreferredDownload -= state->fPreferredDownload; + WITH_LOCK(g_cs_orphans, m_orphanage.EraseForPeer(nodeid)); + m_num_preferred_download_peers -= state->fPreferredDownload; nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); assert(nPeersWithValidatedDownloads >= 0); m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; assert(m_outbound_peers_with_protect_from_disconnect >= 0); - mapNodeState.erase(nodeid); + m_node_states.erase(nodeid); - if (mapNodeState.empty()) { + if (m_node_states.empty()) { // Do a consistency check after the last peer is removed. assert(mapBlocksInFlight.empty()); - assert(nPreferredDownload == 0); + assert(m_num_preferred_download_peers == 0); assert(nPeersWithValidatedDownloads == 0); assert(m_outbound_peers_with_protect_from_disconnect == 0); } @@ -1620,7 +1617,7 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c { { LOCK(cs_main); - CNodeState* state = State(nodeid); + const CNodeState* state = State(nodeid); if (state == nullptr) return false; stats.nSyncHeight = state->pindexBestKnownBlock ? 
state->pindexBestKnownBlock->nHeight : -1; @@ -1661,12 +1658,7 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c return true; } -////////////////////////////////////////////////////////////////////////////// -// -// mapOrphanTransactions -// - -static void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) +void PeerManagerImpl::AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) { size_t max_extra_txn = gArgs.GetArg("-blockreconstructionextratxn", DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN); if (max_extra_txn <= 0) @@ -1677,131 +1669,6 @@ static void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_L vExtraTxnForCompactIt = (vExtraTxnForCompactIt + 1) % max_extra_txn; } -bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) -{ - const uint256& hash = tx->GetHash(); - if (mapOrphanTransactions.count(hash)) - return false; - - // Ignore big transactions, to avoid a - // send-big-orphans memory exhaustion attack. If a peer has a legitimate - // large transaction with a missing parent then we assume - // it will rebroadcast it later, after the parent transaction(s) - // have been mined or received. - // 100 orphans, each of which is at most 99,999 bytes big is - // at most 10 megabytes of orphans and somewhat more byprev index (in the worst case): - unsigned int sz = GetSerializeSize(*tx, CTransaction::CURRENT_VERSION); - if (sz > MAX_STANDARD_TX_SIZE) - { - LogPrint(BCLog::MEMPOOL, "ignoring large orphan tx (size: %u, hash: %s)\n", sz, hash.ToString()); - return false; - } - - auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME, g_orphan_list.size(), sz}); - assert(ret.second); - g_orphan_list.push_back(ret.first); - for (const CTxIn& txin : tx->vin) { - mapOrphanTransactionsByPrev[txin.prevout].insert(ret.first); - } - - AddToCompactExtraTransactions(tx); - - nMapOrphanTransactionsSize += sz; - - LogPrint(BCLog::MEMPOOL, "stored orphan tx %s (mapsz %u outsz %u)\n", hash.ToString(), - mapOrphanTransactions.size(), mapOrphanTransactionsByPrev.size()); - statsClient.inc("transactions.orphans.add", 1.0f); - statsClient.gauge("transactions.orphans", mapOrphanTransactions.size()); - return true; -} - -int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) -{ - std::map::iterator it = mapOrphanTransactions.find(hash); - if (it == mapOrphanTransactions.end()) - return 0; - for (const CTxIn& txin : it->second.tx->vin) - { - auto itPrev = mapOrphanTransactionsByPrev.find(txin.prevout); - if (itPrev == mapOrphanTransactionsByPrev.end()) - continue; - itPrev->second.erase(it); - if (itPrev->second.empty()) - mapOrphanTransactionsByPrev.erase(itPrev); - } - - size_t old_pos = it->second.list_pos; - assert(g_orphan_list[old_pos] == it); - if (old_pos + 1 != g_orphan_list.size()) { - // Unless we're deleting the last entry in g_orphan_list, move the last - // entry to the position we're deleting. 
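// --- Illustrative aside (not part of this patch) -----------------------------
// The "move the last entry to the position we're deleting" step described above
// is the classic O(1) swap-and-pop erase for a vector whose order does not
// matter (here the vector is only used for uniform random eviction). A generic
// sketch; the real code additionally updates the moved entry's stored list_pos
// index so it can be found again later.
#include <cstddef>
#include <utility>
#include <vector>

template <typename T>
void UnorderedErase(std::vector<T>& v, std::size_t pos)
{
    if (pos + 1 != v.size()) {
        v[pos] = std::move(v.back()); // overwrite the erased slot with the last element
    }
    v.pop_back();
}
// ------------------------------------------------------------------------------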
- auto it_last = g_orphan_list.back(); - g_orphan_list[old_pos] = it_last; - it_last->second.list_pos = old_pos; - } - g_orphan_list.pop_back(); - - assert(nMapOrphanTransactionsSize >= it->second.nTxSize); - nMapOrphanTransactionsSize -= it->second.nTxSize; - mapOrphanTransactions.erase(it); - statsClient.inc("transactions.orphans.remove", 1.0f); - statsClient.gauge("transactions.orphans", mapOrphanTransactions.size()); - return 1; -} - -void EraseOrphansFor(NodeId peer) -{ - LOCK(g_cs_orphans); - int nErased = 0; - std::map::iterator iter = mapOrphanTransactions.begin(); - while (iter != mapOrphanTransactions.end()) - { - std::map::iterator maybeErase = iter++; // increment to avoid iterator becoming invalid - if (maybeErase->second.fromPeer == peer) - { - nErased += EraseOrphanTx(maybeErase->second.tx->GetHash()); - } - } - if (nErased > 0) LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx from peer=%d\n", nErased, peer); -} - - -unsigned int LimitOrphanTxSize(unsigned int nMaxOrphansSize) -{ - LOCK(g_cs_orphans); - - unsigned int nEvicted = 0; - static int64_t nNextSweep; - int64_t nNow = GetTime(); - if (nNextSweep <= nNow) { - // Sweep out expired orphan pool entries: - int nErased = 0; - int64_t nMinExpTime = nNow + ORPHAN_TX_EXPIRE_TIME - ORPHAN_TX_EXPIRE_INTERVAL; - std::map::iterator iter = mapOrphanTransactions.begin(); - while (iter != mapOrphanTransactions.end()) - { - std::map::iterator maybeErase = iter++; - if (maybeErase->second.nTimeExpire <= nNow) { - nErased += EraseOrphanTx(maybeErase->second.tx->GetHash()); - } else { - nMinExpTime = std::min(maybeErase->second.nTimeExpire, nMinExpTime); - } - } - // Sweep again 5 minutes after the next entry that expires in order to batch the linear scan. - nNextSweep = nMinExpTime + ORPHAN_TX_EXPIRE_INTERVAL; - if (nErased > 0) LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx due to expiration\n", nErased); - } - FastRandomContext rng; - while (!mapOrphanTransactions.empty() && nMapOrphanTransactionsSize > nMaxOrphansSize) - { - // Evict a random orphan: - size_t randompos = rng.randrange(g_orphan_list.size()); - EraseOrphanTx(g_orphan_list[randompos]->first); - ++nEvicted; - } - return nEvicted; -} - void PeerManagerImpl::Misbehaving(const NodeId pnode, const int howmuch, const std::string& message) { assert(howmuch > 0); @@ -2022,52 +1889,17 @@ void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler) */ void PeerManagerImpl::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) { - { - LOCK2(cs_main, g_cs_orphans); - - std::vector vOrphanErase; - std::set orphanWorkSet; - - for (const CTransactionRef& ptx : pblock->vtx) { - const CTransaction& tx = *ptx; - - // Which orphan pool entries we should reprocess and potentially try to accept into mempool again? - for (size_t i = 0; i < tx.vin.size(); i++) { - auto itByPrev = mapOrphanTransactionsByPrev.find(COutPoint(tx.GetHash(), (uint32_t)i)); - if (itByPrev == mapOrphanTransactionsByPrev.end()) continue; - for (const auto& elem : itByPrev->second) { - orphanWorkSet.insert(elem->first); - } - } + LOCK2(::cs_main, g_cs_orphans); - // Which orphan pool entries must we evict? 
- for (const auto& txin : tx.vin) { - auto itByPrev = mapOrphanTransactionsByPrev.find(txin.prevout); - if (itByPrev == mapOrphanTransactionsByPrev.end()) continue; - for (auto mi = itByPrev->second.begin(); mi != itByPrev->second.end(); ++mi) { - const CTransaction& orphanTx = *(*mi)->second.tx; - const uint256& orphanHash = orphanTx.GetHash(); - vOrphanErase.push_back(orphanHash); - } - } - } - - // Erase orphan transactions included or precluded by this block - if (vOrphanErase.size()) { - int nErased = 0; - for (const uint256& orphanHash : vOrphanErase) { - nErased += EraseOrphanTx(orphanHash); - } - LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx included or conflicted by block\n", nErased); - } + auto orphanWorkSet = m_orphanage.GetCandidatesForBlock(*pblock); + while (!orphanWorkSet.empty()) { + LogPrint(BCLog::MEMPOOL, "Trying to process %d orphans\n", orphanWorkSet.size()); + ProcessOrphanTx(orphanWorkSet); + } - while (!orphanWorkSet.empty()) { - LogPrint(BCLog::MEMPOOL, "Trying to process %d orphans\n", orphanWorkSet.size()); - ProcessOrphanTx(orphanWorkSet); - } + m_orphanage.EraseForBlock(*pblock); + m_last_tip_update = GetTime(); - m_last_tip_update = GetTime(); - } { LOCK(m_recent_confirmed_transactions_mutex); for (const auto& ptx : pblock->vtx) { @@ -2090,12 +1922,6 @@ void PeerManagerImpl::BlockDisconnected(const std::shared_ptr &blo m_recent_confirmed_transactions.reset(); } -// All of the following cache a recent block, and are protected by cs_most_recent_block -static RecursiveMutex cs_most_recent_block; -static std::shared_ptr most_recent_block GUARDED_BY(cs_most_recent_block); -static std::shared_ptr most_recent_compact_block GUARDED_BY(cs_most_recent_block); -static uint256 most_recent_block_hash GUARDED_BY(cs_most_recent_block); - /** * Maintain state about the best-seen block and fast-announce a compact block * to compatible peers. 
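// --- Illustrative aside (not part of this patch) -----------------------------
// Rough shape of the orphan handling after the move to TxOrphanage, pieced
// together from the call sites in this diff (AddTx, LimitOrphans,
// AddChildrenToWorkSet, EraseTx, EraseForPeer, EraseForBlock). The flow below
// compresses steps that really happen at different times into one function and
// assumes the usual txorphanage.h / primitives / sync headers; see
// txorphanage.h for the actual interface.
void SketchOrphanLifecycle(TxOrphanage& orphanage, std::set<uint256>& work_set, unsigned int max_orphan_bytes,
                           const CTransactionRef& orphan_tx, NodeId orphan_from_peer,
                           const CTransaction& accepted_parent)
{
    LOCK(g_cs_orphans);

    // 1) A tx with missing parents arrives: store it and keep the pool bounded
    //    (the CVE-2012-3789 style DoS protection kept from the old code).
    if (orphanage.AddTx(orphan_tx, orphan_from_peer)) {
        orphanage.LimitOrphans(max_orphan_bytes);
    }

    // 2) Some parent is later accepted to the mempool (or included in a block):
    //    queue every stored orphan that spends one of its outputs for retry.
    orphanage.AddChildrenToWorkSet(accepted_parent, work_set);

    // 3) Once an orphan from the work set has been accepted or definitively
    //    rejected, drop it from the pool.
    orphanage.EraseTx(orphan_tx->GetHash());
}
// ------------------------------------------------------------------------------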
@@ -2106,20 +1932,19 @@ void PeerManagerImpl::NewPoWValidBlock(const CBlockIndex *pindex, const std::sha LOCK(cs_main); - static int nHighestFastAnnounce = 0; - if (pindex->nHeight <= nHighestFastAnnounce) + if (pindex->nHeight <= m_highest_fast_announce) return; - nHighestFastAnnounce = pindex->nHeight; + m_highest_fast_announce = pindex->nHeight; uint256 hashBlock(pblock->GetHash()); const std::shared_future lazy_ser{ std::async(std::launch::deferred, [&] { return msgMaker.Make(NetMsgType::CMPCTBLOCK, *pcmpctblock); })}; { - LOCK(cs_most_recent_block); - most_recent_block_hash = hashBlock; - most_recent_block = pblock; - most_recent_compact_block = pcmpctblock; + LOCK(m_most_recent_block_mutex); + m_most_recent_block_hash = hashBlock; + m_most_recent_block = pblock; + m_most_recent_compact_block = pcmpctblock; } m_connman.ForEachNode([this, pindex, &lazy_ser, &hashBlock](CNode* pnode) { @@ -2241,10 +2066,7 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) m_recent_rejects.reset(); } - { - LOCK(g_cs_orphans); - if (mapOrphanTransactions.count(inv.hash)) return true; - } + if (m_orphanage.HaveTx(inv.hash)) return true; { LOCK(m_recent_confirmed_transactions_mutex); @@ -2428,8 +2250,8 @@ void PeerManagerImpl::RelayAddress(NodeId originator, // Relay to a limited number of other nodes // Use deterministic randomness to send to the same nodes for 24 hours // at a time so the m_addr_knowns of the chosen nodes prevent repeats - uint64_t hashAddr = addr.GetHash(); - const CSipHasher hasher = m_connman.GetDeterministicRandomizer(RANDOMIZER_ID_ADDRESS_RELAY).Write(hashAddr << 32).Write((GetTime() + hashAddr) / (24 * 60 * 60)); + const uint64_t hashAddr{addr.GetHash()}; + const CSipHasher hasher{m_connman.GetDeterministicRandomizer(RANDOMIZER_ID_ADDRESS_RELAY).Write(hashAddr).Write((GetTime() + hashAddr) / (24 * 60 * 60))}; FastRandomContext insecure_rand; // Relay reachable addresses to 2 peers. Unreachable addresses are relayed randomly to 1 or 2 peers. 
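// --- Illustrative aside (not part of this patch) -----------------------------
// The RelayAddress() change above keeps the existing idea: hash (addr, day
// bucket) with a node-keyed CSipHasher so the same address maps to the same
// couple of relay targets for roughly 24 hours, letting their m_addr_known
// filters suppress repeats while different addresses still spread across peers.
// A sketch of how such a keyed hash can pick the targets; the helper name and
// candidate selection are hypothetical.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

std::vector<NodeId> SketchPickAddrRelayTargets(const CSipHasher& hasher, // already seeded with addr + day bucket
                                               const std::vector<NodeId>& candidates,
                                               std::size_t n_targets)
{
    std::vector<std::pair<uint64_t, NodeId>> scored;
    scored.reserve(candidates.size());
    for (NodeId id : candidates) {
        // Copy the seeded hasher, mix in the peer id, and use the result as a score.
        scored.emplace_back(CSipHasher(hasher).Write(id).Finalize(), id);
    }
    std::sort(scored.begin(), scored.end());

    std::vector<NodeId> out;
    for (std::size_t i = 0; i < scored.size() && i < n_targets; ++i) out.push_back(scored[i].second);
    return out;
}
// ------------------------------------------------------------------------------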
@@ -2463,9 +2285,9 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv& std::shared_ptr a_recent_block; std::shared_ptr a_recent_compact_block; { - LOCK(cs_most_recent_block); - a_recent_block = most_recent_block; - a_recent_compact_block = most_recent_compact_block; + LOCK(m_most_recent_block_mutex); + a_recent_block = m_most_recent_block; + a_recent_compact_block = m_most_recent_compact_block; } bool need_activate_chain = false; @@ -3144,40 +2966,32 @@ void PeerManagerImpl::ProcessOrphanTx(std::set& orphan_work_set) const uint256 orphanHash = *orphan_work_set.begin(); orphan_work_set.erase(orphan_work_set.begin()); - auto orphan_it = mapOrphanTransactions.find(orphanHash); - if (orphan_it == mapOrphanTransactions.end()) continue; + const auto [porphanTx, from_peer] = m_orphanage.GetTx(orphanHash); + if (porphanTx == nullptr) continue; - const CTransactionRef porphanTx = orphan_it->second.tx; const MempoolAcceptResult result = AcceptToMemoryPool(m_chainman.ActiveChainstate(), m_mempool, porphanTx, false /* bypass_limits */); const TxValidationState& state = result.m_state; if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); RelayTransaction(porphanTx->GetHash()); - for (unsigned int i = 0; i < porphanTx->vout.size(); i++) { - auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i)); - if (it_by_prev != mapOrphanTransactionsByPrev.end()) { - for (const auto& elem : it_by_prev->second) { - orphan_work_set.insert(elem->first); - } - } - } - EraseOrphanTx(orphanHash); + m_orphanage.AddChildrenToWorkSet(*porphanTx, orphan_work_set); + m_orphanage.EraseTx(orphanHash); break; } else if (state.GetResult() != TxValidationResult::TX_MISSING_INPUTS) { if (state.IsInvalid()) { LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s from peer=%d. %s\n", orphanHash.ToString(), - orphan_it->second.fromPeer, + from_peer, state.ToString()); // Maybe punish peer that gave us an invalid orphan tx - MaybePunishNodeForTx(orphan_it->second.fromPeer, state); + MaybePunishNodeForTx(from_peer, state); } // Has inputs but not accepted to mempool // Probably non-standard or insufficient fee LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); m_recent_rejects.insert(orphanHash); - EraseOrphanTx(orphanHash); + m_orphanage.EraseTx(orphanHash); break; } } @@ -3442,6 +3256,8 @@ void PeerManagerImpl::ProcessMessage( const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) { + AssertLockHeld(g_msgproc_mutex); + LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId()); statsClient.inc("message.received." + SanitizeString(msg_type), 1.0f); @@ -3593,7 +3409,9 @@ void PeerManagerImpl::ProcessMessage( // Potentially mark this peer as a preferred download peer. 
{ LOCK(cs_main); - UpdatePreferredDownload(pfrom, *peer, State(pfrom.GetId())); + CNodeState* state = State(pfrom.GetId()); + state->fPreferredDownload = (!pfrom.IsInboundConn() || pfrom.HasPermission(NetPermissionFlags::NoBan)) && !pfrom.IsAddrFetchConn() && CanServeBlocks(*peer); + m_num_preferred_download_peers += state->fPreferredDownload; } // Self advertisement & GETADDR logic @@ -3980,7 +3798,7 @@ void PeerManagerImpl::ProcessMessage( } bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type); if (allowWhileInIBD || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) { - RequestObject(State(pfrom.GetId()), inv, current_time, is_masternode); + RequestObject(pfrom.GetId(), inv, current_time, is_masternode); } } } @@ -4062,8 +3880,8 @@ void PeerManagerImpl::ProcessMessage( { std::shared_ptr a_recent_block; { - LOCK(cs_most_recent_block); - a_recent_block = most_recent_block; + LOCK(m_most_recent_block_mutex); + a_recent_block = m_most_recent_block; } BlockValidationState state; if (!m_chainman.ActiveChainstate().ActivateBestChain(state, a_recent_block)) { @@ -4116,10 +3934,10 @@ void PeerManagerImpl::ProcessMessage( std::shared_ptr recent_block; { - LOCK(cs_most_recent_block); - if (most_recent_block_hash == req.blockhash) - recent_block = most_recent_block; - // Unlock cs_most_recent_block to avoid cs_main lock inversion + LOCK(m_most_recent_block_mutex); + if (m_most_recent_block_hash == req.blockhash) + recent_block = m_most_recent_block; + // Unlock m_most_recent_block_mutex to avoid cs_main lock inversion } if (recent_block) { SendBlockTransactions(pfrom, *recent_block, req); @@ -4261,6 +4079,11 @@ void PeerManagerImpl::ProcessMessage( } if (msg_type == NetMsgType::TX || msg_type == NetMsgType::DSTX) { + // Stop processing the transaction early if we are still in IBD since we don't + // have enough information to validate it yet. Sending unsolicited transactions + // is not considered a protocol violation, so don't punish the peer. 
+ if (m_chainman.ActiveChainstate().IsInitialBlockDownload()) return; + CTransactionRef ptx; CCoinJoinBroadcastTx dstx; int nInvType = MSG_TX; @@ -4325,15 +4148,7 @@ void PeerManagerImpl::ProcessMessage( m_mempool.check(m_chainman.ActiveChainstate()); RelayTransaction(tx.GetHash()); - - for (unsigned int i = 0; i < tx.vout.size(); i++) { - auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i)); - if (it_by_prev != mapOrphanTransactionsByPrev.end()) { - for (const auto& elem : it_by_prev->second) { - peer->m_orphan_work_set.insert(elem->first); - } - } - } + m_orphanage.AddChildrenToWorkSet(tx, peer->m_orphan_work_set); pfrom.m_last_tx_time = GetTime(); @@ -4371,19 +4186,22 @@ void PeerManagerImpl::ProcessMessage( for (const uint256& parent_txid : unique_parents) { CInv _inv(MSG_TX, parent_txid); AddKnownInv(*peer, _inv.hash); - if (!AlreadyHave(_inv)) RequestObject(State(pfrom.GetId()), _inv, current_time, is_masternode); + if (!AlreadyHave(_inv)) RequestObject(pfrom.GetId(), _inv, current_time, is_masternode); // We don't know if the previous tx was a regular or a mixing one, try both CInv _inv2(MSG_DSTX, parent_txid); AddKnownInv(*peer, _inv2.hash); - if (!AlreadyHave(_inv2)) RequestObject(State(pfrom.GetId()), _inv2, current_time, is_masternode); + if (!AlreadyHave(_inv2)) RequestObject(pfrom.GetId(), _inv2, current_time, is_masternode); } - AddOrphanTx(ptx, pfrom.GetId()); - // DoS prevention: do not allow mapOrphanTransactions to grow unbounded (see CVE-2012-3789) + if (m_orphanage.AddTx(ptx, pfrom.GetId())) { + AddToCompactExtraTransactions(ptx); + } + + // DoS prevention: do not allow m_orphans to grow unbounded (see CVE-2012-3789) unsigned int nMaxOrphanTxSize = (unsigned int)std::max((int64_t)0, gArgs.GetArg("-maxorphantxsize", DEFAULT_MAX_ORPHAN_TRANSACTIONS_SIZE)) * 1000000; - unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTxSize); + unsigned int nEvicted = m_orphanage.LimitOrphans(nMaxOrphanTxSize); if (nEvicted > 0) { - LogPrint(BCLog::MEMPOOL, "mapOrphan overflow, removed %u tx\n", nEvicted); + LogPrint(BCLog::MEMPOOL, "orphanage overflow, removed %u tx\n", nEvicted); } } else { LogPrint(BCLog::MEMPOOL, "not keeping orphan with rejected parents %s\n",tx.GetHash().ToString()); @@ -5161,6 +4979,8 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interruptMsgProc) { + AssertLockHeld(g_msgproc_mutex); + bool fMoreWork = false; PeerRef peer = GetPeerRef(pfrom->GetId()); @@ -5487,13 +5307,13 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros FastRandomContext insecure_rand; PushAddress(peer, local_addr, insecure_rand); } - peer.m_next_local_addr_send = PoissonNextSend(current_time, AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL); + peer.m_next_local_addr_send = GetExponentialRand(current_time, AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL); } // We sent an `addr` message to this peer recently. Nothing more to do. 
if (current_time <= peer.m_next_addr_send) return; - peer.m_next_addr_send = PoissonNextSend(current_time, AVG_ADDRESS_BROADCAST_INTERVAL); + peer.m_next_addr_send = GetExponentialRand(current_time, AVG_ADDRESS_BROADCAST_INTERVAL); if (!Assume(peer.m_addrs_to_send.size() <= MAX_ADDR_TO_SEND)) { // Should be impossible since we always check size before adding to @@ -5503,7 +5323,7 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros // Remove addr records that the peer already knows about, and add new // addrs to the m_addr_known filter on the same pass. - auto addr_already_known = [&peer](const CAddress& addr) { + auto addr_already_known = [&peer](const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) { bool ret = peer.m_addr_known->contains(addr.GetKey()); if (!ret) peer.m_addr_known->insert(addr.GetKey()); return ret; @@ -5569,6 +5389,8 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer) bool PeerManagerImpl::SendMessages(CNode* pto) { + AssertLockHeld(g_msgproc_mutex); + assert(m_llmq_ctx); const bool is_masternode = m_mn_activeman != nullptr; @@ -5629,7 +5451,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // the latest blocks is from an inbound peer, we have to be sure to // eventually download it (and not just wait indefinitely for an // outbound peer to have it). - if (nPreferredDownload == 0 || mapBlocksInFlight.empty()) { + if (m_num_preferred_download_peers == 0 || mapBlocksInFlight.empty()) { sync_blocks_and_headers_from_peer = true; } } @@ -5748,9 +5570,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto) bool fGotBlockFromCache = false; { - LOCK(cs_most_recent_block); - if (most_recent_block_hash == pBestIndex->GetBlockHash()) { - m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::CMPCTBLOCK, *most_recent_compact_block)); + LOCK(m_most_recent_block_mutex); + if (m_most_recent_block_hash == pBestIndex->GetBlockHash()) { + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::CMPCTBLOCK, *m_most_recent_compact_block)); fGotBlockFromCache = true; } } @@ -5868,12 +5690,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (peer->m_tx_relay->m_next_inv_send_time < current_time) { fSendTrickle = true; if (pto->IsInboundConn()) { - peer->m_tx_relay->m_next_inv_send_time = m_connman.PoissonNextSendInbound(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL); + peer->m_tx_relay->m_next_inv_send_time = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL); } else { // Use half the delay for Masternode outbound peers, as there is less privacy concern for them. peer->m_tx_relay->m_next_inv_send_time = pto->GetVerifiedProRegTxHash().IsNull() ? 
- PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) : - PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2); + GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) : + GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2); } } @@ -6023,7 +5845,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (state.fSyncStarted && state.m_headers_sync_timeout < std::chrono::microseconds::max()) { // Detect whether this is a stalling initial-headers-sync peer if (m_chainman.m_best_header->GetBlockTime() <= GetAdjustedTime() - nMaxTipAge) { - if (current_time > state.m_headers_sync_timeout && nSyncStarted == 1 && (nPreferredDownload - state.fPreferredDownload >= 1)) { + if (current_time > state.m_headers_sync_timeout && nSyncStarted == 1 && (m_num_preferred_download_peers - state.fPreferredDownload >= 1)) { // Disconnect a peer (without NetPermissionFlags::NoBan permission) if it is our only sync peer, // and we have others we could be using instead. // Note: If all our peers are inbound, then we won't @@ -6152,17 +5974,4 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } } // release cs_main return true; -} - -class CNetProcessingCleanup -{ -public: - CNetProcessingCleanup() {} - ~CNetProcessingCleanup() { - // orphan transactions - mapOrphanTransactions.clear(); - mapOrphanTransactionsByPrev.clear(); - nMapOrphanTransactionsSize = 0; - } -}; -static CNetProcessingCleanup instance_of_cnetprocessingcleanup; +} \ No newline at end of file diff --git a/src/net_processing.h b/src/net_processing.h index b8fd7090b6d0b..295fcc1d6e0cd 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -29,7 +29,6 @@ struct CJContext; struct LLMQContext; extern RecursiveMutex cs_main; -extern RecursiveMutex g_cs_orphans; /** Default for -maxorphantxsize, maximum size in megabytes the orphan map can grow before entries are removed */ static const unsigned int DEFAULT_MAX_ORPHAN_TRANSACTIONS_SIZE = 10; // this allows around 100 TXs of max size (and many more of normal size) @@ -126,9 +125,17 @@ class PeerManager : public CValidationInterface, public NetEventsInterface /** Process a single message from a peer. 
Public for fuzz testing */
     virtual void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
-                                const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) = 0;
+                                const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
+
+    /** This function is used for testing the stale tip eviction logic, see denialofservice_tests.cpp */
+    virtual void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) = 0;
 
     virtual bool IsBanned(NodeId pnode) = 0;
+
+    virtual void EraseObjectRequest(NodeId nodeid, const CInv& inv) = 0;
+    virtual void RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time,
+                               bool is_masternode, bool fForce = false) = 0;
+    virtual size_t GetRequestedObjectCount(NodeId nodeid) const = 0;
 };
 
 #endif // BITCOIN_NET_PROCESSING_H
diff --git a/src/random.cpp b/src/random.cpp
index 614ddeb11cb19..3ccbe82c4a1de 100644
--- a/src/random.cpp
+++ b/src/random.cpp
@@ -22,6 +22,7 @@
 #include // for GetTimeMicros()
 #include
+#include <cmath>
 #include
 #include
@@ -724,3 +725,9 @@ void RandomInit()
     ReportHardwareRand();
 }
+
+std::chrono::microseconds GetExponentialRand(std::chrono::microseconds now, std::chrono::seconds average_interval)
+{
+    double unscaled = -std::log1p(GetRand(uint64_t{1} << 48) * -0.0000000000000035527136788 /* -1/2^48 */);
+    return now + std::chrono::duration_cast<std::chrono::microseconds>(unscaled * average_interval + 0.5us);
+}
diff --git a/src/random.h b/src/random.h
index d461318e6cdaa..71cdbf4a11469 100644
--- a/src/random.h
+++ b/src/random.h
@@ -85,6 +85,18 @@ D GetRandomDuration(typename std::common_type<D>::type max) noexcept
 };
 constexpr auto GetRandMicros = GetRandomDuration<std::chrono::microseconds>;
 constexpr auto GetRandMillis = GetRandomDuration<std::chrono::milliseconds>;
+
+/**
+ * Return a timestamp in the future sampled from an exponential distribution
+ * (https://en.wikipedia.org/wiki/Exponential_distribution). This distribution
+ * is memoryless and should be used for repeated network events (e.g. sending a
+ * certain type of message) to minimize leaking information to observers.
+ *
+ * The probability of an event occurring before time x is 1 - e^-(x/a) where a
+ * is the average interval between events.
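The new helper is an inverse-CDF sample: GetRand(2^48) * -1/2^48 maps 48 uniform bits into (-1, 0], -log1p of that value is an Exp(1) draw, and scaling by average_interval produces an exponential delay with that mean. The following standalone sketch (not project code; std::mt19937_64 stands in for the project RNG and the 5 second interval is an arbitrary choice for the demo) re-derives the delay the same way and checks that the sample mean converges to the requested average:

    // Standalone sketch: exponential delay from 48 uniform bits via inverse CDF,
    // mirroring the arithmetic in GetExponentialRand() above.
    #include <cmath>
    #include <cstdint>
    #include <iostream>
    #include <random>

    int main()
    {
        std::mt19937_64 rng{42};                 // stand-in for the project's RNG
        const double average_interval_s = 5.0;   // arbitrary interval for the demo
        const int n = 1'000'000;
        double sum = 0.0;
        for (int i = 0; i < n; ++i) {
            const uint64_t r = rng() >> 16;                                // 48 random bits, like GetRand(2^48)
            const double unscaled = -std::log1p(r * -1.0 / (1ULL << 48));  // Exp(1) draw, numerically stable near 0
            sum += unscaled * average_interval_s;                          // scale to the desired mean
        }
        std::cout << "sample mean: " << sum / n << " s, expected ~" << average_interval_s << " s\n";
    }

MaybeSendAddr() and the inventory trickle consume these draws through m_next_addr_send and m_next_inv_send_time: while the clock has not passed the stored timestamp the send is skipped, and once it fires a fresh exponentially distributed deadline is drawn, which is what keeps the send times memoryless to an observer.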
+ * */ +std::chrono::microseconds GetExponentialRand(std::chrono::microseconds now, std::chrono::seconds average_interval); + int GetRandInt(int nMax) noexcept; uint256 GetRandHash() noexcept; diff --git a/src/spork.cpp b/src/spork.cpp index 6fd7a6388b2e7..2640989efcd2d 100644 --- a/src/spork.cpp +++ b/src/spork.cpp @@ -145,12 +145,9 @@ PeerMsgRet CSporkManager::ProcessSpork(const CNode& peer, PeerManager& peerman, uint256 hash = spork.GetHash(); - std::string strLogMsg; - { - LOCK(cs_main); - EraseObjectRequest(peer.GetId(), CInv(MSG_SPORK, hash)); - strLogMsg = strprintf("SPORK -- hash: %s id: %d value: %10d peer=%d", hash.ToString(), spork.nSporkID, spork.nValue, peer.GetId()); - } + WITH_LOCK(::cs_main, peerman.EraseObjectRequest(peer.GetId(), CInv(MSG_SPORK, hash))); + std::string strLogMsg{strprintf("SPORK -- hash: %s id: %d value: %10d peer=%d", hash.ToString(), spork.nSporkID, + spork.nValue, peer.GetId())}; if (spork.nTimeSigned > GetAdjustedTime() + 2 * 60 * 60) { LogPrint(BCLog::SPORK, "CSporkManager::ProcessSpork -- ERROR: too far into the future\n"); diff --git a/src/support/allocators/pool.h b/src/support/allocators/pool.h new file mode 100644 index 0000000000000..c8e70ebacff6c --- /dev/null +++ b/src/support/allocators/pool.h @@ -0,0 +1,349 @@ +// Copyright (c) 2022 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_SUPPORT_ALLOCATORS_POOL_H +#define BITCOIN_SUPPORT_ALLOCATORS_POOL_H + +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * A memory resource similar to std::pmr::unsynchronized_pool_resource, but + * optimized for node-based containers. It has the following properties: + * + * * Owns the allocated memory and frees it on destruction, even when deallocate + * has not been called on the allocated blocks. + * + * * Consists of a number of pools, each one for a different block size. + * Each pool holds blocks of uniform size in a freelist. + * + * * Exhausting memory in a freelist causes a new allocation of a fixed size chunk. + * This chunk is used to carve out blocks. + * + * * Block sizes or alignments that can not be served by the pools are allocated + * and deallocated by operator new(). + * + * PoolResource is not thread-safe. It is intended to be used by PoolAllocator. + * + * @tparam MAX_BLOCK_SIZE_BYTES Maximum size to allocate with the pool. If larger + * sizes are requested, allocation falls back to new(). + * + * @tparam ALIGN_BYTES Required alignment for the allocations. + * + * An example: If you create a PoolResource<128, 8>(262144) and perform a bunch of + * allocations and deallocate 2 blocks with size 8 bytes, and 3 blocks with size 16, + * the members will look like this: + * + * m_free_lists m_allocated_chunks + * ┌───┐ ┌───┐ ┌────────────-------──────┐ + * │ │ blocks │ ├─►│ 262144 B │ + * │ │ ┌─────┐ ┌─────┐ └─┬─┘ └────────────-------──────┘ + * │ 1 ├─►│ 8 B ├─►│ 8 B │ │ + * │ │ └─────┘ └─────┘ : + * │ │ │ + * │ │ ┌─────┐ ┌─────┐ ┌─────┐ ▼ + * │ 2 ├─►│16 B ├─►│16 B ├─►│16 B │ ┌───┐ ┌─────────────────────────┐ + * │ │ └─────┘ └─────┘ └─────┘ │ ├─►│ ▲ │ ▲ + * │ │ └───┘ └──────────┬──────────────┘ │ + * │ . │ │ m_available_memory_end + * │ . │ m_available_memory_it + * │ . │ + * │ │ + * │ │ + * │16 │ + * └───┘ + * + * Here m_free_lists[1] holds the 2 blocks of size 8 bytes, and m_free_lists[2] + * holds the 3 blocks of size 16. 
The blocks came from the data stored in the
+ * m_allocated_chunks list. Each chunk is 262144 bytes. The last chunk still has
+ * some memory available for the blocks, and when m_available_memory_it is at the
+ * end, a new chunk will be allocated and added to the list.
+ */
+template <std::size_t MAX_BLOCK_SIZE_BYTES, std::size_t ALIGN_BYTES>
+class PoolResource final
+{
+    static_assert(ALIGN_BYTES > 0, "ALIGN_BYTES must be nonzero");
+    static_assert((ALIGN_BYTES & (ALIGN_BYTES - 1)) == 0, "ALIGN_BYTES must be a power of two");
+
+    /**
+     * In-place linked list of the allocations, used for the freelist.
+     */
+    struct ListNode {
+        ListNode* m_next;
+
+        explicit ListNode(ListNode* next) : m_next(next) {}
+    };
+    static_assert(std::is_trivially_destructible_v<ListNode>, "Make sure we don't need to manually call a destructor");
+
+    /**
+     * Internal alignment value. The larger of the requested ALIGN_BYTES and alignof(ListNode).
+     */
+    static constexpr std::size_t ELEM_ALIGN_BYTES = std::max(alignof(ListNode), ALIGN_BYTES);
+    static_assert((ELEM_ALIGN_BYTES & (ELEM_ALIGN_BYTES - 1)) == 0, "ELEM_ALIGN_BYTES must be a power of two");
+    static_assert(sizeof(ListNode) <= ELEM_ALIGN_BYTES, "Units of size ELEM_ALIGN_BYTES need to be able to store a ListNode");
+    static_assert((MAX_BLOCK_SIZE_BYTES & (ELEM_ALIGN_BYTES - 1)) == 0, "MAX_BLOCK_SIZE_BYTES needs to be a multiple of the alignment.");
+
+    /**
+     * Size in bytes to allocate per chunk
+     */
+    const size_t m_chunk_size_bytes;
+
+    /**
+     * Contains all allocated pools of memory, used to free the data in the destructor.
+     */
+    std::list<std::byte*> m_allocated_chunks{};
+
+    /**
+     * Singly linked lists of all data that came from deallocating.
+     * m_free_lists[n] will serve blocks of size n*ELEM_ALIGN_BYTES.
+     */
+    std::array<ListNode*, MAX_BLOCK_SIZE_BYTES / ELEM_ALIGN_BYTES + 1> m_free_lists{};
+
+    /**
+     * Points to the beginning of available memory for carving out allocations.
+     */
+    std::byte* m_available_memory_it = nullptr;
+
+    /**
+     * Points to the end of available memory for carving out allocations.
+     *
+     * This member is redundant: it is always equal to `m_allocated_chunks.back() + m_chunk_size_bytes`
+     * whenever it is accessed, but `m_available_memory_end` caches this for clarity and efficiency.
+     */
+    std::byte* m_available_memory_end = nullptr;
+
+    /**
+     * How many multiples of ELEM_ALIGN_BYTES are necessary to fit bytes. We use that result directly as an index
+     * into m_free_lists. Round up for the special case when bytes==0.
+     */
+    [[nodiscard]] static constexpr std::size_t NumElemAlignBytes(std::size_t bytes)
+    {
+        return (bytes + ELEM_ALIGN_BYTES - 1) / ELEM_ALIGN_BYTES + (bytes == 0);
+    }
+
+    /**
+     * True when it is possible to make use of the freelist
+     */
+    [[nodiscard]] static constexpr bool IsFreeListUsable(std::size_t bytes, std::size_t alignment)
+    {
+        return alignment <= ELEM_ALIGN_BYTES && bytes <= MAX_BLOCK_SIZE_BYTES;
+    }
+
+    /**
+     * Replaces node with a placement-constructed ListNode that points to the previous node
+     */
+    void PlacementAddToList(void* p, ListNode*& node)
+    {
+        node = new (p) ListNode{node};
+    }
+
+    /**
+     * Allocate one full memory chunk which will be used to carve out allocations.
+     * Also puts any leftover bytes into the freelist.
+     *
+     * Precondition: leftover bytes are either 0 or few enough to fit into a place in the freelist
+     */
+    void AllocateChunk()
+    {
+        // if there is still any available memory left, put it into the freelist.
+        size_t remaining_available_bytes = std::distance(m_available_memory_it, m_available_memory_end);
+        if (0 != remaining_available_bytes) {
+            PlacementAddToList(m_available_memory_it, m_free_lists[remaining_available_bytes / ELEM_ALIGN_BYTES]);
+        }
+
+        void* storage = ::operator new (m_chunk_size_bytes, std::align_val_t{ELEM_ALIGN_BYTES});
+        m_available_memory_it = new (storage) std::byte[m_chunk_size_bytes];
+        m_available_memory_end = m_available_memory_it + m_chunk_size_bytes;
+        m_allocated_chunks.emplace_back(m_available_memory_it);
+    }
+
+    /**
+     * Access to internals for testing purposes only
+     */
+    friend class PoolResourceTester;
+
+public:
+    /**
+     * Construct a new PoolResource object which allocates the first chunk.
+     * chunk_size_bytes will be rounded up to the next multiple of ELEM_ALIGN_BYTES.
+     */
+    explicit PoolResource(std::size_t chunk_size_bytes)
+        : m_chunk_size_bytes(NumElemAlignBytes(chunk_size_bytes) * ELEM_ALIGN_BYTES)
+    {
+        assert(m_chunk_size_bytes >= MAX_BLOCK_SIZE_BYTES);
+        AllocateChunk();
+    }
+
+    /**
+     * Construct a new PoolResource object, defaults to 2^18=262144 chunk size.
+     */
+    PoolResource() : PoolResource(262144) {}
+
+    /**
+     * Disable copy & move semantics, these are not supported for the resource.
+     */
+    PoolResource(const PoolResource&) = delete;
+    PoolResource& operator=(const PoolResource&) = delete;
+    PoolResource(PoolResource&&) = delete;
+    PoolResource& operator=(PoolResource&&) = delete;
+
+    /**
+     * Deallocates all memory associated with the memory resource.
+     */
+    ~PoolResource()
+    {
+        for (std::byte* chunk : m_allocated_chunks) {
+            std::destroy(chunk, chunk + m_chunk_size_bytes);
+            ::operator delete ((void*)chunk, std::align_val_t{ELEM_ALIGN_BYTES});
+        }
+    }
+
+    /**
+     * Allocates a block of bytes. If possible the freelist is used, otherwise allocation
+     * is forwarded to ::operator new().
+     */
+    void* Allocate(std::size_t bytes, std::size_t alignment)
+    {
+        if (IsFreeListUsable(bytes, alignment)) {
+            const std::size_t num_alignments = NumElemAlignBytes(bytes);
+            if (nullptr != m_free_lists[num_alignments]) {
+                // we've already got data in the pool's freelist, unlink one element and return the pointer
+                // to the unlinked memory. Since ListNode is trivially destructible we can just treat it as
+                // uninitialized memory.
+                return std::exchange(m_free_lists[num_alignments], m_free_lists[num_alignments]->m_next);
+            }
+
+            // freelist is empty: get one allocation from allocated chunk memory.
+            const std::ptrdiff_t round_bytes = static_cast<std::ptrdiff_t>(num_alignments * ELEM_ALIGN_BYTES);
+            if (round_bytes > m_available_memory_end - m_available_memory_it) {
+                // slow path, only happens when a new chunk needs to be allocated
+                AllocateChunk();
+            }
+
+            // Make sure we use the right amount of bytes for that freelist (might be rounded up).
+            return std::exchange(m_available_memory_it, m_available_memory_it + round_bytes);
+        }
+
+        // Can't use the pool => use operator new()
+        return ::operator new (bytes, std::align_val_t{alignment});
+    }
+
+    /**
+     * Returns a block to the freelists, or deletes the block when it did not come from the chunks.
+     */
+    void Deallocate(void* p, std::size_t bytes, std::size_t alignment) noexcept
+    {
+        if (IsFreeListUsable(bytes, alignment)) {
+            const std::size_t num_alignments = NumElemAlignBytes(bytes);
+            // put the memory block into the linked list. We can placement-construct the ListNode
+            // into the memory since we can be sure the alignment is correct.
+            PlacementAddToList(p, m_free_lists[num_alignments]);
+        } else {
+            // Can't use the pool => forward deallocation to ::operator delete().
+            ::operator delete (p, std::align_val_t{alignment});
+        }
+    }
+
+    /**
+     * Number of allocated chunks
+     */
+    [[nodiscard]] std::size_t NumAllocatedChunks() const
+    {
+        return m_allocated_chunks.size();
+    }
+
+    /**
+     * Size in bytes to allocate per chunk, currently hardcoded to a fixed size.
+     */
+    [[nodiscard]] size_t ChunkSizeBytes() const
+    {
+        return m_chunk_size_bytes;
+    }
+};
+
+
+/**
+ * Forwards all allocations/deallocations to the PoolResource.
+ */
+template <class T, std::size_t MAX_BLOCK_SIZE_BYTES, std::size_t ALIGN_BYTES = alignof(T)>
+class PoolAllocator
+{
+    PoolResource<MAX_BLOCK_SIZE_BYTES, ALIGN_BYTES>* m_resource;
+
+    template <typename U, std::size_t M, std::size_t A>
+    friend class PoolAllocator;
+
+public:
+    using value_type = T;
+    using ResourceType = PoolResource<MAX_BLOCK_SIZE_BYTES, ALIGN_BYTES>;
+
+    /**
+     * Not explicit so we can easily construct it with the correct resource
+     */
+    PoolAllocator(ResourceType* resource) noexcept
+        : m_resource(resource)
+    {
+    }
+
+    PoolAllocator(const PoolAllocator& other) noexcept = default;
+    PoolAllocator& operator=(const PoolAllocator& other) noexcept = default;
+
+    template <class U>
+    PoolAllocator(const PoolAllocator<U, MAX_BLOCK_SIZE_BYTES, ALIGN_BYTES>& other) noexcept
+        : m_resource(other.resource())
+    {
+    }
+
+    /**
+     * The rebind struct here is mandatory because we use non-type template arguments for
+     * PoolAllocator. See https://en.cppreference.com/w/cpp/named_req/Allocator#cite_note-2
+     */
+    template <typename U>
+    struct rebind {
+        using other = PoolAllocator<U, MAX_BLOCK_SIZE_BYTES, ALIGN_BYTES>;
+    };
+
+    /**
+     * Forwards each call to the resource.
+     */
+    T* allocate(size_t n)
+    {
+        return static_cast<T*>(m_resource->Allocate(n * sizeof(T), alignof(T)));
+    }
+
+    /**
+     * Forwards each call to the resource.
+     */
+    void deallocate(T* p, size_t n) noexcept
+    {
+        m_resource->Deallocate(p, n * sizeof(T), alignof(T));
+    }
+
+    ResourceType* resource() const noexcept
+    {
+        return m_resource;
+    }
+};
+
+template <class T1, class T2, std::size_t MAX_BLOCK_SIZE_BYTES, std::size_t ALIGN_BYTES>
+bool operator==(const PoolAllocator<T1, MAX_BLOCK_SIZE_BYTES, ALIGN_BYTES>& a,
+                const PoolAllocator<T2, MAX_BLOCK_SIZE_BYTES, ALIGN_BYTES>& b) noexcept
+{
+    return a.resource() == b.resource();
+}
+
+template <class T1, class T2, std::size_t MAX_BLOCK_SIZE_BYTES, std::size_t ALIGN_BYTES>
+bool operator!=(const PoolAllocator<T1, MAX_BLOCK_SIZE_BYTES, ALIGN_BYTES>& a,
+                const PoolAllocator<T2, MAX_BLOCK_SIZE_BYTES, ALIGN_BYTES>& b) noexcept
+{
+    return !(a == b);
+}
+
+#endif // BITCOIN_SUPPORT_ALLOCATORS_POOL_H
diff --git a/src/test/coins_tests.cpp b/src/test/coins_tests.cpp
index 2c8e0b10133db..75b25143e2884 100644
--- a/src/test/coins_tests.cpp
+++ b/src/test/coins_tests.cpp
@@ -6,6 +6,7 @@
 #include
 #include
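For reviewers who want to see the allocator exercised outside the coins map, here is a minimal usage sketch, not part of the diff. The in-tree include path, the choice of std::list, and the padding of MAX_BLOCK_SIZE_BYTES to sizeof(void*) * 8 are illustrative assumptions; as with the CCoinsMap typedef, the node overhead of the standard containers is implementation defined, so the block size is padded generously.

    // Hypothetical usage sketch: one PoolResource shared by two std::list
    // instances through PoolAllocator. Clearing the first list returns its nodes
    // to the resource's freelists, and the second list reuses them.
    #include <support/allocators/pool.h>   // assumed in-tree include path

    #include <cassert>
    #include <list>

    int main()
    {
        // sizeof(void*) * 8 = 64 bytes: a multiple of the alignment and comfortably
        // larger than a std::list node holding an int on common implementations.
        using Alloc = PoolAllocator<int, sizeof(void*) * 8, alignof(void*)>;

        Alloc::ResourceType resource{};    // allocates the first 262144-byte chunk up front
        Alloc alloc{&resource};

        std::list<int, Alloc> a{alloc};
        std::list<int, Alloc> b{alloc};

        for (int i = 0; i < 1000; ++i) a.push_back(i);
        a.clear();                                      // nodes go back to the freelists
        for (int i = 0; i < 1000; ++i) b.push_back(i);  // and are carved out again here

        // Allocators compare equal exactly when they point at the same resource.
        assert(a.get_allocator() == b.get_allocator());
        assert(resource.NumAllocatedChunks() == 1);     // everything fits into the first chunk
        return 0;
    }

The same sharing pattern is what CCoinsViewCache relies on: the map and its memory resource live side by side, and ReallocateCache() destroys and re-creates both so the chunks are actually returned to the system.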