From 05395ff37b4ae601c3ddf095920ac25c9959a6ed Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 22 Sep 2024 13:58:07 +0000 Subject: [PATCH 01/14] merge bitcoin#22817: Avoid race after connect_nodes Due to stricter checks, we can no longer start masternodes in parallel, as entities used to process `to_connection` checks are reused before the previous check is completed, resulting in an exception. Since we're now validating the establishment of a two-way connection, we have to do it one at a time. --- .../test_framework/test_framework.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 0eeed8ba3a71f..00b5c74463508 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -695,18 +695,19 @@ def connect_nodes(self, a, b): if (a == b): return - def connect_nodes_helper(from_connection, node_num): - ip_port = "127.0.0.1:" + str(p2p_port(node_num)) - from_connection.addnode(ip_port, "onetry") - # poll until version handshake complete to avoid race conditions - # with transaction relaying - # See comments in net_processing: - # * Must have a version message before anything else - # * Must have a verack message before anything else - wait_until_helper(lambda: all(peer['version'] != 0 for peer in from_connection.getpeerinfo())) - wait_until_helper(lambda: all(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo())) - - connect_nodes_helper(self.nodes[a], b) + from_connection = self.nodes[a] + to_connection = self.nodes[b] + ip_port = "127.0.0.1:" + str(p2p_port(b)) + from_connection.addnode(ip_port, "onetry") + # poll until version handshake complete to avoid race conditions + # with transaction relaying + # See comments in net_processing: + # * Must have a version message before anything else + # * Must have a verack message before anything else + wait_until_helper(lambda: all(peer['version'] != 0 for peer in from_connection.getpeerinfo())) + wait_until_helper(lambda: all(peer['version'] != 0 for peer in to_connection.getpeerinfo())) + wait_until_helper(lambda: all(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo())) + wait_until_helper(lambda: all(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in to_connection.getpeerinfo())) def disconnect_nodes(self, a, b): # A node cannot disconnect from itself, bail out early From 7229eb0ae231a1275e8ee484173990ef0e01ebc8 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 30 Sep 2024 11:22:33 +0000 Subject: [PATCH 02/14] merge bitcoin#23042: Avoid logging AlreadyHaveTx when disconnecting misbehaving peer --- src/net_processing.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index f78889857ddab..8bfbdefa14faf 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -3831,6 +3831,12 @@ void PeerManagerImpl::ProcessMessage( best_block = &inv.hash; } } else { + if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) { + LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId()); + pfrom.fDisconnect = true; + return; + } + const bool fAlreadyHave = AlreadyHave(inv); LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); ::g_stats_client->inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f); @@ -3840,11 +3846,7 @@ void PeerManagerImpl::ProcessMessage( }; AddKnownInv(*peer, inv.hash); - if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) { - LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId()); - pfrom.fDisconnect = true; - return; - } else if (!fAlreadyHave) { + if (!fAlreadyHave) { if (fBlocksOnly && inv.type == MSG_ISDLOCK) { if (pfrom.GetCommonVersion() <= ADDRV2_PROTO_VERSION) { // It's ok to receive these invs, we just ignore them From 03544175d9e9cd544a178574c3e042e058e9b717 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Wed, 2 Oct 2024 08:31:15 +0000 Subject: [PATCH 03/14] merge bitcoin#22777: don't request tx relay on feeler connections --- src/net.cpp | 4 +++- src/net.h | 4 ++-- src/net_processing.cpp | 2 +- src/rpc/net.cpp | 4 +++- test/functional/p2p_add_connections.py | 17 +++++++++++++++++ test/functional/test_framework/test_node.py | 15 ++++++++++----- 6 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index a49132c8dfc77..2df0b3ecb5311 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1998,7 +1998,6 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ switch (conn_type) { case ConnectionType::INBOUND: case ConnectionType::MANUAL: - case ConnectionType::FEELER: return false; case ConnectionType::OUTBOUND_FULL_RELAY: max_connections = m_max_outbound_full_relay; @@ -2009,6 +2008,9 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ // no limit for ADDR_FETCH because -seednode has no limit either case ConnectionType::ADDR_FETCH: break; + // no limit for FEELER connections since they're short-lived + case ConnectionType::FEELER: + break; } // no default case, so the compiler can warn about missing cases // Count existing connections diff --git a/src/net.h b/src/net.h index ab17ca77587d9..23988dd1d2c38 100644 --- a/src/net.h +++ b/src/net.h @@ -1433,8 +1433,8 @@ friend class CNode; * Attempts to open a connection. Currently only used from tests. * * @param[in] address Address of node to try connecting to - * @param[in] conn_type ConnectionType::OUTBOUND or ConnectionType::BLOCK_RELAY - * or ConnectionType::ADDR_FETCH + * @param[in] conn_type ConnectionType::OUTBOUND, ConnectionType::BLOCK_RELAY, + * ConnectionType::ADDR_FETCH or ConnectionType::FEELER * @return bool Returns false if there are no available * slots for this connection: * - conn_type not a supported ConnectionType diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 8bfbdefa14faf..2005644b64a91 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1384,7 +1384,7 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) nProtocolVersion = gArgs.GetArg("-pushversion", PROTOCOL_VERSION); } - const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn(); + const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn() && !pnode.IsFeelerConn(); m_connman.PushMessage(&pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, nProtocolVersion, my_services, nTime, your_services, addr_you, // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime) my_services, CService(), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime) diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index a5ae324644c0d..da1534ecada09 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -352,7 +352,7 @@ static RPCHelpMan addconnection() "\nOpen an outbound connection to a specified node. This RPC is for testing only.\n", { {"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The IP address and port to attempt connecting to."}, - {"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\" or \"addr-fetch\")."}, + {"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\", \"addr-fetch\" or \"feeler\")."}, }, RPCResult{ RPCResult::Type::OBJ, "", "", @@ -380,6 +380,8 @@ static RPCHelpMan addconnection() conn_type = ConnectionType::BLOCK_RELAY; } else if (conn_type_in == "addr-fetch") { conn_type = ConnectionType::ADDR_FETCH; + } else if (conn_type_in == "feeler") { + conn_type = ConnectionType::FEELER; } else { throw JSONRPCError(RPC_INVALID_PARAMETER, self.ToString()); } diff --git a/test/functional/p2p_add_connections.py b/test/functional/p2p_add_connections.py index 8b7ea12d91f6c..3b11b8c5f8ff2 100755 --- a/test/functional/p2p_add_connections.py +++ b/test/functional/p2p_add_connections.py @@ -8,6 +8,13 @@ from test_framework.test_framework import BitcoinTestFramework from test_framework.util import check_node_connections +class P2PFeelerReceiver(P2PInterface): + def on_version(self, message): + # The bitcoind node closes feeler connections as soon as a version + # message is received from the test framework. Don't send any responses + # to the node's version message since the connection will already be + # closed. + pass class P2PAddConnections(BitcoinTestFramework): def set_test_params(self): @@ -86,6 +93,16 @@ def run_test(self): check_node_connections(node=self.nodes[1], num_in=5, num_out=10) + self.log.info("Add 1 feeler connection to node 0") + feeler_conn = self.nodes[0].add_outbound_p2p_connection(P2PFeelerReceiver(), p2p_idx=6, connection_type="feeler") + + # Feeler connection is closed + assert not feeler_conn.is_connected + + # Verify version message received + assert_equal(feeler_conn.message_count["version"], 1) + # Feeler connections do not request tx relay + assert_equal(feeler_conn.last_message["version"].relay, 0) if __name__ == '__main__': P2PAddConnections().main() diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index dbdeb255debfe..5a7a54eb97118 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -574,7 +574,7 @@ def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, **kwargs): def add_outbound_p2p_connection(self, p2p_conn, *, p2p_idx, connection_type="outbound-full-relay", **kwargs): """Add an outbound p2p connection from node. Must be an - "outbound-full-relay", "block-relay-only" or "addr-fetch" connection. + "outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection. This method adds the p2p connection to the self.p2ps list and returns the connection to the caller. @@ -586,11 +586,16 @@ def addconnection_callback(address, port): p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, **kwargs)() - p2p_conn.wait_for_connect() - self.p2ps.append(p2p_conn) + if connection_type == "feeler": + # feeler connections are closed as soon as the node receives a `version` message + p2p_conn.wait_until(lambda: p2p_conn.message_count["version"] == 1, check_connected=False) + p2p_conn.wait_until(lambda: not p2p_conn.is_connected, check_connected=False) + else: + p2p_conn.wait_for_connect() + self.p2ps.append(p2p_conn) - p2p_conn.wait_for_verack() - p2p_conn.sync_with_ping() + p2p_conn.wait_for_verack() + p2p_conn.sync_with_ping() return p2p_conn From 85c4aef9cbe24ad34f92fdaadf3a6e2ab45fd7fe Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Tue, 14 Dec 2021 12:28:25 -0500 Subject: [PATCH 04/14] merge bitcoin#23774: Add missing assert_equal import to p2p_add_connections.py --- test/functional/p2p_add_connections.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/functional/p2p_add_connections.py b/test/functional/p2p_add_connections.py index 3b11b8c5f8ff2..c9b3bc335a53d 100755 --- a/test/functional/p2p_add_connections.py +++ b/test/functional/p2p_add_connections.py @@ -6,7 +6,10 @@ from test_framework.p2p import P2PInterface from test_framework.test_framework import BitcoinTestFramework -from test_framework.util import check_node_connections +from test_framework.util import ( + assert_equal, + check_node_connections, +) class P2PFeelerReceiver(P2PInterface): def on_version(self, message): From 60b5392d92ed2ee0d658e8c4270793fb87e7cb29 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 16 Aug 2021 13:49:50 +0100 Subject: [PATCH 05/14] partial bitcoin#22778: Reduce resource usage for inbound block-relay-only connections includes: - 290a8dab0288344fa5731ec2ffd09478e9420a2f --- src/net_processing.cpp | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 2005644b64a91..34f7df8eba030 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -272,25 +272,34 @@ struct Peer { struct TxRelay { mutable RecursiveMutex m_bloom_filter_mutex; - // We use m_relay_txs for two purposes - - // a) it allows us to not relay tx invs before receiving the peer's version message - // b) the peer may tell us in its version message that we should not relay tx invs - // unless it loads a bloom filter. + /** Whether the peer wishes to receive transaction announcements. + * + * This is initially set based on the fRelay flag in the received + * `version` message. If initially set to false, it can only be flipped + * to true if we have offered the peer NODE_BLOOM services and it sends + * us a `filterload` or `filterclear` message. See BIP37. */ bool m_relay_txs GUARDED_BY(m_bloom_filter_mutex){false}; + /** A bloom filter for which transactions to announce to the peer. See BIP37. */ std::unique_ptr m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr}; mutable RecursiveMutex m_tx_inventory_mutex; - // inventory based relay + /** A filter of all the txids that the peer has announced to + * us or we have announced to the peer. We use this to avoid announcing + * the same txid to a peer that already has the transaction. */ CRollingBloomFilter m_tx_inventory_known_filter GUARDED_BY(m_tx_inventory_mutex){50000, 0.000001}; - // Set of transaction ids we still have to announce. - // They are sorted by the mempool before relay, so the order is not important. + /** Set of transaction ids we still have to announce. We use the + * mempool to sort transactions in dependency order before relay, so + * this does not have to be sorted. */ std::set m_tx_inventory_to_send GUARDED_BY(m_tx_inventory_mutex); - // List of non-tx/non-block inventory items + /** List of non-tx/non-block inventory items */ std::vector vInventoryOtherToSend GUARDED_BY(m_tx_inventory_mutex); - // Used for BIP35 mempool sending, also protected by m_tx_inventory_mutex + /** Whether the peer has requested us to send our complete mempool. Only + * permitted if the peer has NetPermissionFlags::Mempool. See BIP35. */ bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false}; - // Last time a "MEMPOOL" request was serviced. + /** The last time a BIP35 `mempool` request was serviced. */ std::atomic m_last_mempool_req{0s}; + /** The next time after which we will send an `inv` message containing + * transaction announcements to this peer. */ std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; }; From d6ce037814dbbe3d9a5c22427a9c0195a5a572b7 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 22 Sep 2024 12:40:48 +0000 Subject: [PATCH 06/14] merge bitcoin#25443: Fail if connect_nodes fails --- test/functional/test_framework/test_framework.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 00b5c74463508..86d818b04567c 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -697,6 +697,8 @@ def connect_nodes(self, a, b): from_connection = self.nodes[a] to_connection = self.nodes[b] + from_num_peers = 1 + len(from_connection.getpeerinfo()) + to_num_peers = 1 + len(to_connection.getpeerinfo()) ip_port = "127.0.0.1:" + str(p2p_port(b)) from_connection.addnode(ip_port, "onetry") # poll until version handshake complete to avoid race conditions @@ -704,10 +706,10 @@ def connect_nodes(self, a, b): # See comments in net_processing: # * Must have a version message before anything else # * Must have a verack message before anything else - wait_until_helper(lambda: all(peer['version'] != 0 for peer in from_connection.getpeerinfo())) - wait_until_helper(lambda: all(peer['version'] != 0 for peer in to_connection.getpeerinfo())) - wait_until_helper(lambda: all(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo())) - wait_until_helper(lambda: all(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in to_connection.getpeerinfo())) + self.wait_until(lambda: sum(peer['version'] != 0 for peer in from_connection.getpeerinfo()) == from_num_peers) + self.wait_until(lambda: sum(peer['version'] != 0 for peer in to_connection.getpeerinfo()) == to_num_peers) + self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo()) == from_num_peers) + self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in to_connection.getpeerinfo()) == to_num_peers) def disconnect_nodes(self, a, b): # A node cannot disconnect from itself, bail out early @@ -740,7 +742,7 @@ def get_peer_ids(): raise # wait to disconnect - wait_until_helper(lambda: not get_peer_ids(), timeout=5) + self.wait_until(lambda: not get_peer_ids(), timeout=5) disconnect_nodes_helper(self.nodes[a], b) From 892e329ada7e51278f7bb548eae6a450d8eb1bb9 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Tue, 20 Sep 2022 16:00:43 +0200 Subject: [PATCH 07/14] merge bitcoin#26138: Avoid race in disconnect_nodes helper --- .../functional/test_framework/test_framework.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 86d818b04567c..3d3b970cbd54b 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -716,24 +716,24 @@ def disconnect_nodes(self, a, b): if (a == b): return - def disconnect_nodes_helper(from_connection, node_num): - def get_peer_ids(): + def disconnect_nodes_helper(node_a, node_b): + def get_peer_ids(from_connection, node_num): result = [] for peer in from_connection.getpeerinfo(): if "testnode{}".format(node_num) in peer['subver']: result.append(peer['id']) return result - peer_ids = get_peer_ids() + peer_ids = get_peer_ids(node_a, node_b.index) if not peer_ids: self.log.warning("disconnect_nodes: {} and {} were not connected".format( - from_connection.index, - node_num, + node_a.index, + node_b.index, )) return for peer_id in peer_ids: try: - from_connection.disconnectnode(nodeid=peer_id) + node_a.disconnectnode(nodeid=peer_id) except JSONRPCException as e: # If this node is disconnected between calculating the peer id # and issuing the disconnect, don't worry about it. @@ -742,9 +742,10 @@ def get_peer_ids(): raise # wait to disconnect - self.wait_until(lambda: not get_peer_ids(), timeout=5) + self.wait_until(lambda: not get_peer_ids(node_a, node_b.index), timeout=5) + self.wait_until(lambda: not get_peer_ids(node_b, node_a.index), timeout=5) - disconnect_nodes_helper(self.nodes[a], b) + disconnect_nodes_helper(self.nodes[a], self.nodes[b]) def isolate_node(self, node_num, timeout=5): self.nodes[node_num].setnetworkactive(False) From d4b0faeae1e7cf9185e77c33cc0b1bdcaded3035 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Thu, 19 Sep 2024 07:48:07 +0000 Subject: [PATCH 08/14] merge bitcoin#26854: Fix intermittent timeout in p2p_permissions.py --- test/functional/test_framework/test_framework.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 3d3b970cbd54b..54776e5dcb1e8 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -710,6 +710,10 @@ def connect_nodes(self, a, b): self.wait_until(lambda: sum(peer['version'] != 0 for peer in to_connection.getpeerinfo()) == to_num_peers) self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo()) == from_num_peers) self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in to_connection.getpeerinfo()) == to_num_peers) + # The message bytes are counted before processing the message, so make + # sure it was fully processed by waiting for a ping. + self.wait_until(lambda: sum(peer["bytesrecv_per_msg"].pop("pong", 0) >= 32 for peer in from_connection.getpeerinfo()) == from_num_peers) + self.wait_until(lambda: sum(peer["bytesrecv_per_msg"].pop("pong", 0) >= 32 for peer in to_connection.getpeerinfo()) == to_num_peers) def disconnect_nodes(self, a, b): # A node cannot disconnect from itself, bail out early From 2854a6aa5a8dafc25496dc41267a969f6205e223 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 20 Feb 2023 10:35:20 -0300 Subject: [PATCH 09/14] merge bitcoin#27128: fix intermittent issue in `p2p_disconnect_ban` --- test/functional/p2p_disconnect_ban.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/functional/p2p_disconnect_ban.py b/test/functional/p2p_disconnect_ban.py index eb9314cd82840..e51b22b2864ac 100755 --- a/test/functional/p2p_disconnect_ban.py +++ b/test/functional/p2p_disconnect_ban.py @@ -91,7 +91,7 @@ def run_test(self): self.log.info("disconnectnode: successfully disconnect node by address") address1 = self.nodes[0].getpeerinfo()[0]['addr'] self.nodes[0].disconnectnode(address=address1) - self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 1, timeout=10) + self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10) assert not [node for node in self.nodes[0].getpeerinfo() if node['addr'] == address1] self.log.info("disconnectnode: successfully reconnect node") @@ -102,7 +102,7 @@ def run_test(self): self.log.info("disconnectnode: successfully disconnect node by node id") id1 = self.nodes[0].getpeerinfo()[0]['id'] self.nodes[0].disconnectnode(nodeid=id1) - self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 1, timeout=10) + self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10) assert not [node for node in self.nodes[0].getpeerinfo() if node['id'] == id1] if __name__ == '__main__': From 1adb9a232ce929cff05f7a662ac2c7963ad42278 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Thu, 25 May 2023 15:06:54 -0400 Subject: [PATCH 10/14] merge bitcoin#27761: Log addresses of stalling peers --- src/net_processing.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 34f7df8eba030..c521338e142e8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -5900,7 +5900,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // should only happen during initial block download. - LogPrintf("Peer=%d is stalling block download, disconnecting\n", pto->GetId()); + LogPrintf("Peer=%d%s is stalling block download, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); pto->fDisconnect = true; // Increase timeout for the next peer so that we don't disconnect multiple peers if our own // bandwidth is insufficient. @@ -5919,7 +5919,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1; if (current_time > state.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { - LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId()); + LogPrintf("Timeout downloading block %s from peer=%d%s, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); pto->fDisconnect = true; return true; } @@ -5935,11 +5935,11 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // disconnect our sync peer for stalling; we have bigger // problems if we can't get any outbound peers. if (!pto->HasPermission(NetPermissionFlags::NoBan)) { - LogPrintf("Timeout downloading headers from peer=%d, disconnecting\n", pto->GetId()); + LogPrintf("Timeout downloading headers from peer=%d%s, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); pto->fDisconnect = true; return true; } else { - LogPrintf("Timeout downloading headers from noban peer=%d, not disconnecting\n", pto->GetId()); + LogPrintf("Timeout downloading headers from noban peer=%d%s, not disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); // Reset the headers sync state so that we have a // chance to try downloading from a different peer. // Note: this will also result in at least one more From 19e7bf64c8fd15fc60f93bfe37c297a8f9972905 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 10 Jun 2023 08:05:10 -0300 Subject: [PATCH 11/14] merge bitcoin#27863: do not break when addr is not from a distinct network group --- src/net.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net.cpp b/src/net.cpp index 2df0b3ecb5311..e39b0896454d0 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3303,7 +3303,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe // Require outbound IPv4/IPv6 connections, other than feelers, to be to distinct network groups if (!fFeeler && outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) { - break; + continue; } // if we selected an invalid address, restart From d1fce0b7cabc2f73b6085f31532f655d827f4e53 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Wed, 18 Sep 2024 23:12:00 +0000 Subject: [PATCH 12/14] fix: ensure that deadlocks are actually resolved The fix introduced in bitcoin#27981 (8c986d6b, dash#6067) is not validated to work until bitcoin#28287, an upcoming backport. As the latter has identified the former backport didn't pass validation, let's fix it so that the latter tests succeed when they're backported in an upcoming commit. --- src/net.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index e39b0896454d0..39b7f5a9046d1 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2255,6 +2255,7 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, for (CNode* pnode : nodes) { bool select_recv = !pnode->fHasRecvData; bool select_send = !pnode->fCanSendData; + if (!select_recv && !select_send) continue; LOCK(pnode->m_sock_mutex); if (!pnode->m_sock) { @@ -2625,9 +2626,7 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, // receiving data. This means properly utilizing TCP flow control signalling. // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try // receiving data (which should succeed as the socket signalled as receivable). - const auto& [to_send, more, _msg_type] = it->second->m_transport->GetBytesToSend(it->second->nSendMsgSize != 0); - const bool queue_is_empty{to_send.empty() && !more}; - if (!it->second->fPauseRecv && !it->second->fDisconnect && queue_is_empty) { + if (!it->second->fPauseRecv && !it->second->fDisconnect && it->second->nSendMsgSize == 0) { it->second->AddRef(); vReceivableNodes.emplace(it->second); } From ac94de23ae0385940c081b18057ee6ef2bdc6e4a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Thu, 19 Sep 2024 08:51:45 +0000 Subject: [PATCH 13/14] merge bitcoin#28287: add `sendmsgtopeer` rpc and a test for net-level deadlock situation `random_bytes()` is introduced in bitcoin#25625 but the function def alone doesn't warrant a full backport, so we'll only implement the section relevant to this PR. --- src/rpc/client.cpp | 1 + src/rpc/net.cpp | 48 ++++++++++++++++++++++++++ src/test/fuzz/rpc.cpp | 1 + test/functional/p2p_net_deadlock.py | 37 ++++++++++++++++++++ test/functional/rpc_net.py | 33 ++++++++++++++++++ test/functional/test_framework/util.py | 6 ++++ test/functional/test_runner.py | 1 + 7 files changed, 127 insertions(+) create mode 100755 test/functional/p2p_net_deadlock.py diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 8c3e96a22c733..80fc983824576 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -230,6 +230,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "getnodeaddresses", 0, "count"}, { "addpeeraddress", 1, "port"}, { "addpeeraddress", 2, "tried"}, + { "sendmsgtopeer", 0, "peer_id" }, { "stop", 0, "wait" }, { "verifychainlock", 2, "blockHeight" }, { "verifyislock", 3, "maxHeight" }, diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index da1534ecada09..7d5418c829b8e 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -1021,6 +1021,53 @@ static RPCHelpMan addpeeraddress() }; } +static RPCHelpMan sendmsgtopeer() +{ + return RPCHelpMan{ + "sendmsgtopeer", + "Send a p2p message to a peer specified by id.\n" + "The message type and body must be provided, the message header will be generated.\n" + "This RPC is for testing only.", + { + {"peer_id", RPCArg::Type::NUM, RPCArg::Optional::NO, "The peer to send the message to."}, + {"msg_type", RPCArg::Type::STR, RPCArg::Optional::NO, strprintf("The message type (maximum length %i)", CMessageHeader::COMMAND_SIZE)}, + {"msg", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The serialized message body to send, in hex, without a message header"}, + }, + RPCResult{RPCResult::Type::NONE, "", ""}, + RPCExamples{ + HelpExampleCli("sendmsgtopeer", "0 \"addr\" \"ffffff\"") + HelpExampleRpc("sendmsgtopeer", "0 \"addr\" \"ffffff\"")}, + [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue { + const NodeId peer_id{request.params[0].get_int()}; + const std::string& msg_type{request.params[1].get_str()}; + if (msg_type.size() > CMessageHeader::COMMAND_SIZE) { + throw JSONRPCError(RPC_INVALID_PARAMETER, strprintf("Error: msg_type too long, max length is %i", CMessageHeader::COMMAND_SIZE)); + } + const std::string& msg{request.params[2].get_str()}; + if (!msg.empty() && !IsHex(msg)) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Error parsing input for msg"); + } + + NodeContext& node = EnsureAnyNodeContext(request.context); + CConnman& connman = EnsureConnman(node); + + CSerializedNetMsg msg_ser; + msg_ser.data = ParseHex(msg); + msg_ser.m_type = msg_type; + + bool success = connman.ForNode(peer_id, [&](CNode* node) { + connman.PushMessage(node, std::move(msg_ser)); + return true; + }); + + if (!success) { + throw JSONRPCError(RPC_MISC_ERROR, "Error: Could not send message to peer"); + } + + return NullUniValue; + }, + }; +} + static RPCHelpMan setmnthreadactive() { return RPCHelpMan{"setmnthreadactive", @@ -1070,6 +1117,7 @@ static const CRPCCommand commands[] = { "hidden", &addconnection, }, { "hidden", &addpeeraddress, }, + { "hidden", &sendmsgtopeer }, { "hidden", &setmnthreadactive }, }; // clang-format on diff --git a/src/test/fuzz/rpc.cpp b/src/test/fuzz/rpc.cpp index 7a2e052649ffc..435ebc1b91b09 100644 --- a/src/test/fuzz/rpc.cpp +++ b/src/test/fuzz/rpc.cpp @@ -149,6 +149,7 @@ const std::vector RPC_COMMANDS_SAFE_FOR_FUZZING{ "pruneblockchain", "reconsiderblock", "scantxoutset", + "sendmsgtopeer", // when no peers are connected, no p2p message is sent "sendrawtransaction", "setmnthreadactive", "setmocktime", diff --git a/test/functional/p2p_net_deadlock.py b/test/functional/p2p_net_deadlock.py new file mode 100755 index 0000000000000..cf62d1331096e --- /dev/null +++ b/test/functional/p2p_net_deadlock.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +import threading +from test_framework.messages import MAX_PROTOCOL_MESSAGE_LENGTH +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import random_bytes + +class NetDeadlockTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 2 + + def run_test(self): + node0 = self.nodes[0] + node1 = self.nodes[1] + + self.log.info("Simultaneously send a large message on both sides") + rand_msg = random_bytes(MAX_PROTOCOL_MESSAGE_LENGTH).hex() + + thread0 = threading.Thread(target=node0.sendmsgtopeer, args=(0, "unknown", rand_msg)) + thread1 = threading.Thread(target=node1.sendmsgtopeer, args=(0, "unknown", rand_msg)) + + thread0.start() + thread1.start() + thread0.join() + thread1.join() + + self.log.info("Check whether a deadlock happened") + self.nodes[0].generate(1) + self.sync_blocks() + + +if __name__ == '__main__': + NetDeadlockTest().main() diff --git a/test/functional/rpc_net.py b/test/functional/rpc_net.py index 1b7c6311bf7e1..30b626ea2397d 100755 --- a/test/functional/rpc_net.py +++ b/test/functional/rpc_net.py @@ -10,6 +10,7 @@ from test_framework.p2p import P2PInterface import test_framework.messages from test_framework.messages import ( + MAX_PROTOCOL_MESSAGE_LENGTH, NODE_NETWORK, ) @@ -66,6 +67,7 @@ def run_test(self): self.test_service_flags() self.test_getnodeaddresses() self.test_addpeeraddress() + self.test_sendmsgtopeer() def test_connection_count(self): self.log.info("Test getconnectioncount") @@ -341,6 +343,37 @@ def test_addpeeraddress(self): addrs = node.getnodeaddresses(count=0) # getnodeaddresses re-runs the addrman checks assert_equal(len(addrs), 2) + def test_sendmsgtopeer(self): + node = self.nodes[0] + + self.restart_node(0) + self.connect_nodes(0, 1) + + self.log.info("Test sendmsgtopeer") + self.log.debug("Send a valid message") + with self.nodes[1].assert_debug_log(expected_msgs=["received: addr"]): + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg="FFFFFF") + + self.log.debug("Test error for sending to non-existing peer") + assert_raises_rpc_error(-1, "Error: Could not send message to peer", node.sendmsgtopeer, peer_id=100, msg_type="addr", msg="FF") + + self.log.debug("Test that zero-length msg_type is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg="") + + self.log.debug("Test error for msg_type that is too long") + assert_raises_rpc_error(-8, "Error: msg_type too long, max length is 12", node.sendmsgtopeer, peer_id=0, msg_type="long_msg_type", msg="FF") + + self.log.debug("Test that unknown msg_type is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="unknown", msg="FF") + + self.log.debug("Test that empty msg is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg="FF") + + self.log.debug("Test that oversized messages are allowed, but get us disconnected") + zero_byte_string = b'\x00' * int(MAX_PROTOCOL_MESSAGE_LENGTH + 1) + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg=zero_byte_string.hex()) + self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0, timeout=10) + if __name__ == '__main__': NetTest().main() diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py index 3577e98a2013a..d014fd2dda6c9 100644 --- a/test/functional/test_framework/util.py +++ b/test/functional/test_framework/util.py @@ -13,6 +13,7 @@ import json import logging import os +import random import shutil import re import time @@ -274,6 +275,11 @@ def sha256sum_file(filename): d = f.read(4096) return h.digest() +# TODO: Remove and use random.randbytes(n) instead, available in Python 3.9 +def random_bytes(n): + """Return a random bytes object of length n.""" + return bytes(random.getrandbits(8) for i in range(n)) + # RPC/P2P connection constants and functions ############################################ diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index d973f61e22507..1ad6cc54eb6d7 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -259,6 +259,7 @@ 'p2p_leak_tx.py', 'p2p_eviction.py', 'p2p_ibd_stalling.py', + 'p2p_net_deadlock.py', 'rpc_signmessage.py', 'rpc_generateblock.py', 'rpc_generate.py', From e458adb61ce4ca557f45bc5b713f976676ee4f78 Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Thu, 23 May 2024 10:00:00 -0400 Subject: [PATCH 14/14] merge bitcoin#30118: improve robustness of connect_nodes() --- .../test_framework/test_framework.py | 28 +++++++++++++------ test/functional/test_framework/test_node.py | 2 +- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 54776e5dcb1e8..bbd3ee06c9173 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -697,23 +697,35 @@ def connect_nodes(self, a, b): from_connection = self.nodes[a] to_connection = self.nodes[b] - from_num_peers = 1 + len(from_connection.getpeerinfo()) - to_num_peers = 1 + len(to_connection.getpeerinfo()) ip_port = "127.0.0.1:" + str(p2p_port(b)) from_connection.addnode(ip_port, "onetry") + + # Use subversion as peer id. Test nodes have their node number appended to the user agent string + from_connection_subver = from_connection.getnetworkinfo()['subversion'] + to_connection_subver = to_connection.getnetworkinfo()['subversion'] + + def find_conn(node, peer_subversion, inbound): + return next(filter(lambda peer: peer['subver'] == peer_subversion and peer['inbound'] == inbound, node.getpeerinfo()), None) + # poll until version handshake complete to avoid race conditions # with transaction relaying # See comments in net_processing: # * Must have a version message before anything else # * Must have a verack message before anything else - self.wait_until(lambda: sum(peer['version'] != 0 for peer in from_connection.getpeerinfo()) == from_num_peers) - self.wait_until(lambda: sum(peer['version'] != 0 for peer in to_connection.getpeerinfo()) == to_num_peers) - self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo()) == from_num_peers) - self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in to_connection.getpeerinfo()) == to_num_peers) + self.wait_until(lambda: find_conn(from_connection, to_connection_subver, inbound=False) is not None) + self.wait_until(lambda: find_conn(to_connection, from_connection_subver, inbound=True) is not None) + + def check_bytesrecv(peer, msg_type, min_bytes_recv): + assert peer is not None, "Error: peer disconnected" + return peer['bytesrecv_per_msg'].pop(msg_type, 0) >= min_bytes_recv + + self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'verack', 24)) + self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'verack', 24)) + # The message bytes are counted before processing the message, so make # sure it was fully processed by waiting for a ping. - self.wait_until(lambda: sum(peer["bytesrecv_per_msg"].pop("pong", 0) >= 32 for peer in from_connection.getpeerinfo()) == from_num_peers) - self.wait_until(lambda: sum(peer["bytesrecv_per_msg"].pop("pong", 0) >= 32 for peer in to_connection.getpeerinfo()) == to_num_peers) + self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'pong', 32)) + self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'pong', 32)) def disconnect_nodes(self, a, b): # A node cannot disconnect from itself, bail out early diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index 5a7a54eb97118..49c96cd779114 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -103,7 +103,7 @@ def __init__(self, i, datadir, extra_args_from_options, *, chain, rpchost, timew "-debug", "-debugexclude=libevent", "-debugexclude=leveldb", - "-uacomment=testnode%d" % i, + "-uacomment=testnode%d" % i, # required for subversion uniqueness across peers ] if self.mocktime != 0: self.args.append(f"-mocktime={mocktime}")