From ab11e0f998cf41f62e4f3601eb21f134c6f4994a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 6 Sep 2024 08:27:39 +0000 Subject: [PATCH] merge bitcoin#27324: bitcoin#27257 follow-ups --- src/net.cpp | 31 +++++++++++++++++-------------- src/net.h | 24 ++++++++++++++---------- src/net_processing.cpp | 2 +- src/test/fuzz/connman.cpp | 1 - src/test/util/net.cpp | 2 +- 5 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 9c9e8a8d38e86..f09ed6268621b 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -617,7 +617,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo pszDest ? pszDest : "", conn_type, /*inbound_onion=*/false, - CNodeOptions{ .i2p_sam_session = std::move(i2p_transient_session) }); + CNodeOptions{ + .i2p_sam_session = std::move(i2p_transient_session), + .recv_flood_size = nReceiveFloodSize, + }); pnode->AddRef(); ::g_stats_client->inc("peers.connect", 1.0f); @@ -1115,7 +1118,7 @@ bool CConnman::AttemptToEvictConnection() .m_is_local = node->addr.IsLocal(), .m_network = node->ConnectedThroughNetwork(), .m_noban = node->HasPermission(NetPermissionFlags::NoBan), - .m_conn_type = node->GetConnectionType(), + .m_conn_type = node->m_conn_type, }; vEvictionCandidates.push_back(candidate); } @@ -1279,8 +1282,9 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, ConnectionType::INBOUND, inbound_onion, CNodeOptions{ - .permission_flags = permission_flags, - .prefer_evict = discouraged, + .permission_flags = permission_flags, + .prefer_evict = discouraged, + .recv_flood_size = nReceiveFloodSize, }); pnode->AddRef(); // If this flag is present, the user probably expect that RPC and QT report it as whitelisted (backward compatibility) @@ -1339,7 +1343,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ // Count existing connections int existing_connections = WITH_LOCK(m_nodes_mutex, - return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->GetConnectionType() == conn_type; });); + return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; });); // Max connections of specified type already exist if (max_connections != std::nullopt && existing_connections >= max_connections) return false; @@ -2088,7 +2092,7 @@ size_t CConnman::SocketRecvData(CNode *pnode) } RecordBytesRecv(nBytes); if (notify) { - pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize); + pnode->MarkReceivedMsgsForProcessing(); WakeMessageHandler(); } } @@ -2464,7 +2468,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe if (pnode->IsFullOutboundConn() && pnode->ConnectedThroughNetwork() == Network::NET_ONION) nOutboundOnionRelay++; // Make sure our persistent outbound slots belong to different netgroups. - switch (pnode->GetConnectionType()) { + switch (pnode->m_conn_type) { // We currently don't take inbound connections into account. Since they are // free to make, an attacker could make them to prevent us from connecting to // certain peers. @@ -4015,8 +4019,6 @@ ServiceFlags CConnman::GetLocalServices() const return nLocalServices; } -unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; } - CNode::CNode(NodeId idIn, std::shared_ptr sock, const CAddress& addrIn, @@ -4037,9 +4039,10 @@ CNode::CNode(NodeId idIn, m_inbound_onion{inbound_onion}, m_prefer_evict{node_opts.prefer_evict}, nKeyedNetGroup{nKeyedNetGroupIn}, + m_conn_type{conn_type_in}, id{idIn}, nLocalHostNonce{nLocalHostNonceIn}, - m_conn_type{conn_type_in}, + m_recv_flood_size{node_opts.recv_flood_size}, m_i2p_sam_session{std::move(node_opts.i2p_sam_session)} { if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND); @@ -4055,7 +4058,7 @@ CNode::CNode(NodeId idIn, } } -void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) +void CNode::MarkReceivedMsgsForProcessing() { AssertLockNotHeld(m_msg_process_queue_mutex); @@ -4069,10 +4072,10 @@ void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) LOCK(m_msg_process_queue_mutex); m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg); m_msg_process_queue_size += nSizeAdded; - fPauseRecv = m_msg_process_queue_size > recv_flood_size; + fPauseRecv = m_msg_process_queue_size > m_recv_flood_size; } -std::optional> CNode::PollMessage(size_t recv_flood_size) +std::optional> CNode::PollMessage() { LOCK(m_msg_process_queue_mutex); if (m_msg_process_queue.empty()) return std::nullopt; @@ -4081,7 +4084,7 @@ std::optional> CNode::PollMessage(size_t recv_flood // Just take one message msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin()); m_msg_process_queue_size -= msgs.front().m_raw_message_size; - fPauseRecv = m_msg_process_queue_size > recv_flood_size; + fPauseRecv = m_msg_process_queue_size > m_recv_flood_size; return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty()); } diff --git a/src/net.h b/src/net.h index e48aef8702519..0c8c545946b29 100644 --- a/src/net.h +++ b/src/net.h @@ -277,6 +277,14 @@ class CNetMessage { std::string m_type; CNetMessage(CDataStream&& recv_in) : m_recv(std::move(recv_in)) {} + // Only one CNetMessage object will exist for the same message on either + // the receive or processing queue. For performance reasons we therefore + // delete the copy constructor and assignment operator to avoid the + // possibility of copying CNetMessage objects. + CNetMessage(CNetMessage&&) = default; + CNetMessage(const CNetMessage&) = delete; + CNetMessage& operator=(CNetMessage&&) = default; + CNetMessage& operator=(const CNetMessage&) = delete; void SetVersion(int nVersionIn) { @@ -454,6 +462,7 @@ struct CNodeOptions NetPermissionFlags permission_flags = NetPermissionFlags::None; std::unique_ptr i2p_sam_session = nullptr; bool prefer_evict = false; + size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000}; }; /** Information about a peer */ @@ -546,13 +555,10 @@ class CNode std::atomic_bool fHasRecvData{false}; std::atomic_bool fCanSendData{false}; - const ConnectionType& GetConnectionType() const - { - return m_conn_type; - } + const ConnectionType m_conn_type; /** Move all messages from the received queue to the processing queue. */ - void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) + void MarkReceivedMsgsForProcessing() EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); /** Poll the next message from the processing queue of this connection. @@ -560,7 +566,7 @@ class CNode * Returns std::nullopt if the processing queue is empty, or a pair * consisting of the message and a bool that indicates if the processing * queue has more entries. */ - std::optional> PollMessage(size_t recv_flood_size) + std::optional> PollMessage() EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); /** Account for the total size of a sent message in the per msg type connection stats. */ @@ -825,10 +831,10 @@ class CNode private: const NodeId id; const uint64_t nLocalHostNonce; - const ConnectionType m_conn_type; std::atomic m_greatest_common_version{INIT_PROTO_VERSION}; - std::list vRecvMsg; // Used only by SocketHandler thread + const size_t m_recv_flood_size; + std::list vRecvMsg; // Used only by SocketHandler thread Mutex m_msg_process_queue_mutex; std::list m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex); @@ -1257,8 +1263,6 @@ friend class CNode; /** Get a unique deterministic randomizer. */ CSipHasher GetDeterministicRandomizer(uint64_t id) const; - unsigned int GetReceiveFloodSize() const; - void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); /** Return true if we should disconnect the peer for failing an inactivity check. */ diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3213904ac19fd..c69ec237c124a 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -5048,7 +5048,7 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) return false; - auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())}; + auto poll_result{pfrom->PollMessage()}; if (!poll_result) { // No message to process return false; diff --git a/src/test/fuzz/connman.cpp b/src/test/fuzz/connman.cpp index 95f1b6516e20d..ead124552bcd3 100644 --- a/src/test/fuzz/connman.cpp +++ b/src/test/fuzz/connman.cpp @@ -126,7 +126,6 @@ FUZZ_TARGET_INIT(connman, initialize_connman) std::vector stats; connman.GetNodeStats(stats); (void)connman.GetOutboundTargetBytesLeft(); - (void)connman.GetReceiveFloodSize(); (void)connman.GetTotalBytesRecv(); (void)connman.GetTotalBytesSent(); (void)connman.GetTryNewOutboundPeer(); diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index 1b8680750b40c..e9f43e69cf73f 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -68,7 +68,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span msg_by { assert(node.ReceiveMsgBytes(msg_bytes, complete)); if (complete) { - node.MarkReceivedMsgsForProcessing(nReceiveFloodSize); + node.MarkReceivedMsgsForProcessing(); } }