Skip to content

Commit

Permalink
Merge #6276: backport: merge bitcoin#22817, bitcoin#23042, bitcoin#22777
Browse files Browse the repository at this point in the history
, bitcoin#23774, bitcoin#25443, bitcoin#26138, bitcoin#26854, bitcoin#27128, bitcoin#27761, bitcoin#27863, bitcoin#28287, bitcoin#30118, partial bitcoin#22778 (auxiliary backports: part 16)

e458adb merge bitcoin#30118: improve robustness of connect_nodes() (UdjinM6)
ac94de2 merge bitcoin#28287: add `sendmsgtopeer` rpc and a test for net-level deadlock situation (Kittywhiskers Van Gogh)
d1fce0b fix: ensure that deadlocks are actually resolved (Kittywhiskers Van Gogh)
19e7bf6 merge bitcoin#27863: do not break when addr is not from a distinct network group (Kittywhiskers Van Gogh)
1adb9a2 merge bitcoin#27761: Log addresses of stalling peers (Kittywhiskers Van Gogh)
2854a6a merge bitcoin#27128: fix intermittent issue in `p2p_disconnect_ban` (Kittywhiskers Van Gogh)
d4b0fae merge bitcoin#26854: Fix intermittent timeout in p2p_permissions.py (Kittywhiskers Van Gogh)
892e329 merge bitcoin#26138: Avoid race in disconnect_nodes helper (Kittywhiskers Van Gogh)
d6ce037 merge bitcoin#25443: Fail if connect_nodes fails (Kittywhiskers Van Gogh)
60b5392 partial bitcoin#22778: Reduce resource usage for inbound block-relay-only connections (Kittywhiskers Van Gogh)
85c4aef merge bitcoin#23774: Add missing assert_equal import to p2p_add_connections.py (Kittywhiskers Van Gogh)
0354417 merge bitcoin#22777: don't request tx relay on feeler connections (Kittywhiskers Van Gogh)
7229eb0 merge bitcoin#23042: Avoid logging AlreadyHaveTx when disconnecting misbehaving peer (Kittywhiskers Van Gogh)
05395ff merge bitcoin#22817: Avoid race after connect_nodes (Kittywhiskers Van Gogh)

Pull request description:

  ## Additional Information

  * Depends on #6286

  * Depends on #6287

  * Depends on #6289

  * When backporting [bitcoin#28287](bitcoin#28287), `p2p_net_deadlock.py` relies on the function, `random_bytes()`, that is introduced in [bitcoin#25625](bitcoin#25625). Backporting [bitcoin#25625](bitcoin#25625) would attract changes outside the scope of this PR.

    In the interest of brevity, the changes that introduce `random_bytes()` have been included in [bitcoin#28287](bitcoin#28287) instead.

  ## Breaking Changes

  None expected.

  ## Checklist:

  - [x] I have performed a self-review of my own code
  - [x] I have commented my code, particularly in hard-to-understand areas **(note: N/A)**
  - [x] I have added or updated relevant unit/integration/functional/e2e tests
  - [x] I have made corresponding changes to the documentation **(note: N/A)**
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

ACKs for top commit:
  UdjinM6:
    utACK e458adb
  PastaPastaPasta:
    utACK e458adb

Tree-SHA512: 48494004dddecb31c53f5e19ab0114b92ed7b4381c7977800fd49b7403222badbfdcfe46241e854f5b086c6f54a35f6483f91c6f047b7ac9b1e88e35bb32ad02
  • Loading branch information
PastaPastaPasta committed Oct 4, 2024
2 parents 251f16b + e458adb commit 8cef87d
Show file tree
Hide file tree
Showing 14 changed files with 244 additions and 58 deletions.
11 changes: 6 additions & 5 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1998,7 +1998,6 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
switch (conn_type) {
case ConnectionType::INBOUND:
case ConnectionType::MANUAL:
case ConnectionType::FEELER:
return false;
case ConnectionType::OUTBOUND_FULL_RELAY:
max_connections = m_max_outbound_full_relay;
Expand All @@ -2009,6 +2008,9 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
// no limit for ADDR_FETCH because -seednode has no limit either
case ConnectionType::ADDR_FETCH:
break;
// no limit for FEELER connections since they're short-lived
case ConnectionType::FEELER:
break;
} // no default case, so the compiler can warn about missing cases

// Count existing connections
Expand Down Expand Up @@ -2253,6 +2255,7 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
for (CNode* pnode : nodes) {
bool select_recv = !pnode->fHasRecvData;
bool select_send = !pnode->fCanSendData;
if (!select_recv && !select_send) continue;

LOCK(pnode->m_sock_mutex);
if (!pnode->m_sock) {
Expand Down Expand Up @@ -2623,9 +2626,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
// receiving data (which should succeed as the socket signalled as receivable).
const auto& [to_send, more, _msg_type] = it->second->m_transport->GetBytesToSend(it->second->nSendMsgSize != 0);
const bool queue_is_empty{to_send.empty() && !more};
if (!it->second->fPauseRecv && !it->second->fDisconnect && queue_is_empty) {
if (!it->second->fPauseRecv && !it->second->fDisconnect && it->second->nSendMsgSize == 0) {
it->second->AddRef();
vReceivableNodes.emplace(it->second);
}
Expand Down Expand Up @@ -3301,7 +3302,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe

// Require outbound IPv4/IPv6 connections, other than feelers, to be to distinct network groups
if (!fFeeler && outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) {
break;
continue;
}

// if we selected an invalid address, restart
Expand Down
4 changes: 2 additions & 2 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1433,8 +1433,8 @@ friend class CNode;
* Attempts to open a connection. Currently only used from tests.
*
* @param[in] address Address of node to try connecting to
* @param[in] conn_type ConnectionType::OUTBOUND or ConnectionType::BLOCK_RELAY
* or ConnectionType::ADDR_FETCH
* @param[in] conn_type ConnectionType::OUTBOUND, ConnectionType::BLOCK_RELAY,
* ConnectionType::ADDR_FETCH or ConnectionType::FEELER
* @return bool Returns false if there are no available
* slots for this connection:
* - conn_type not a supported ConnectionType
Expand Down
51 changes: 31 additions & 20 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,25 +272,34 @@ struct Peer {

struct TxRelay {
mutable RecursiveMutex m_bloom_filter_mutex;
// We use m_relay_txs for two purposes -
// a) it allows us to not relay tx invs before receiving the peer's version message
// b) the peer may tell us in its version message that we should not relay tx invs
// unless it loads a bloom filter.
/** Whether the peer wishes to receive transaction announcements.
*
* This is initially set based on the fRelay flag in the received
* `version` message. If initially set to false, it can only be flipped
* to true if we have offered the peer NODE_BLOOM services and it sends
* us a `filterload` or `filterclear` message. See BIP37. */
bool m_relay_txs GUARDED_BY(m_bloom_filter_mutex){false};
/** A bloom filter for which transactions to announce to the peer. See BIP37. */
std::unique_ptr<CBloomFilter> m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr};

mutable RecursiveMutex m_tx_inventory_mutex;
// inventory based relay
/** A filter of all the txids that the peer has announced to
* us or we have announced to the peer. We use this to avoid announcing
* the same txid to a peer that already has the transaction. */
CRollingBloomFilter m_tx_inventory_known_filter GUARDED_BY(m_tx_inventory_mutex){50000, 0.000001};
// Set of transaction ids we still have to announce.
// They are sorted by the mempool before relay, so the order is not important.
/** Set of transaction ids we still have to announce. We use the
* mempool to sort transactions in dependency order before relay, so
* this does not have to be sorted. */
std::set<uint256> m_tx_inventory_to_send GUARDED_BY(m_tx_inventory_mutex);
// List of non-tx/non-block inventory items
/** List of non-tx/non-block inventory items */
std::vector<CInv> vInventoryOtherToSend GUARDED_BY(m_tx_inventory_mutex);
// Used for BIP35 mempool sending, also protected by m_tx_inventory_mutex
/** Whether the peer has requested us to send our complete mempool. Only
* permitted if the peer has NetPermissionFlags::Mempool. See BIP35. */
bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false};
// Last time a "MEMPOOL" request was serviced.
/** The last time a BIP35 `mempool` request was serviced. */
std::atomic<std::chrono::seconds> m_last_mempool_req{0s};
/** The next time after which we will send an `inv` message containing
* transaction announcements to this peer. */
std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
};

Expand Down Expand Up @@ -1384,7 +1393,7 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer)
nProtocolVersion = gArgs.GetArg("-pushversion", PROTOCOL_VERSION);
}

const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn();
const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn() && !pnode.IsFeelerConn();
m_connman.PushMessage(&pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, nProtocolVersion, my_services, nTime,
your_services, addr_you, // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime)
my_services, CService(), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime)
Expand Down Expand Up @@ -3831,6 +3840,12 @@ void PeerManagerImpl::ProcessMessage(
best_block = &inv.hash;
}
} else {
if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) {
LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId());
pfrom.fDisconnect = true;
return;
}

const bool fAlreadyHave = AlreadyHave(inv);
LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId());
::g_stats_client->inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f);
Expand All @@ -3840,11 +3855,7 @@ void PeerManagerImpl::ProcessMessage(
};

AddKnownInv(*peer, inv.hash);
if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) {
LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId());
pfrom.fDisconnect = true;
return;
} else if (!fAlreadyHave) {
if (!fAlreadyHave) {
if (fBlocksOnly && inv.type == MSG_ISDLOCK) {
if (pfrom.GetCommonVersion() <= ADDRV2_PROTO_VERSION) {
// It's ok to receive these invs, we just ignore them
Expand Down Expand Up @@ -5889,7 +5900,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// Stalling only triggers when the block download window cannot move. During normal steady state,
// the download window should be much larger than the to-be-downloaded set of blocks, so disconnection
// should only happen during initial block download.
LogPrintf("Peer=%d is stalling block download, disconnecting\n", pto->GetId());
LogPrintf("Peer=%d%s is stalling block download, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : "");
pto->fDisconnect = true;
// Increase timeout for the next peer so that we don't disconnect multiple peers if our own
// bandwidth is insufficient.
Expand All @@ -5908,7 +5919,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
QueuedBlock &queuedBlock = state.vBlocksInFlight.front();
int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1;
if (current_time > state.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) {
LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId());
LogPrintf("Timeout downloading block %s from peer=%d%s, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : "");
pto->fDisconnect = true;
return true;
}
Expand All @@ -5924,11 +5935,11 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// disconnect our sync peer for stalling; we have bigger
// problems if we can't get any outbound peers.
if (!pto->HasPermission(NetPermissionFlags::NoBan)) {
LogPrintf("Timeout downloading headers from peer=%d, disconnecting\n", pto->GetId());
LogPrintf("Timeout downloading headers from peer=%d%s, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : "");
pto->fDisconnect = true;
return true;
} else {
LogPrintf("Timeout downloading headers from noban peer=%d, not disconnecting\n", pto->GetId());
LogPrintf("Timeout downloading headers from noban peer=%d%s, not disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : "");
// Reset the headers sync state so that we have a
// chance to try downloading from a different peer.
// Note: this will also result in at least one more
Expand Down
1 change: 1 addition & 0 deletions src/rpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "getnodeaddresses", 0, "count"},
{ "addpeeraddress", 1, "port"},
{ "addpeeraddress", 2, "tried"},
{ "sendmsgtopeer", 0, "peer_id" },
{ "stop", 0, "wait" },
{ "verifychainlock", 2, "blockHeight" },
{ "verifyislock", 3, "maxHeight" },
Expand Down
52 changes: 51 additions & 1 deletion src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ static RPCHelpMan addconnection()
"\nOpen an outbound connection to a specified node. This RPC is for testing only.\n",
{
{"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The IP address and port to attempt connecting to."},
{"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\" or \"addr-fetch\")."},
{"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\", \"addr-fetch\" or \"feeler\")."},
},
RPCResult{
RPCResult::Type::OBJ, "", "",
Expand Down Expand Up @@ -380,6 +380,8 @@ static RPCHelpMan addconnection()
conn_type = ConnectionType::BLOCK_RELAY;
} else if (conn_type_in == "addr-fetch") {
conn_type = ConnectionType::ADDR_FETCH;
} else if (conn_type_in == "feeler") {
conn_type = ConnectionType::FEELER;
} else {
throw JSONRPCError(RPC_INVALID_PARAMETER, self.ToString());
}
Expand Down Expand Up @@ -1019,6 +1021,53 @@ static RPCHelpMan addpeeraddress()
};
}

static RPCHelpMan sendmsgtopeer()
{
return RPCHelpMan{
"sendmsgtopeer",
"Send a p2p message to a peer specified by id.\n"
"The message type and body must be provided, the message header will be generated.\n"
"This RPC is for testing only.",
{
{"peer_id", RPCArg::Type::NUM, RPCArg::Optional::NO, "The peer to send the message to."},
{"msg_type", RPCArg::Type::STR, RPCArg::Optional::NO, strprintf("The message type (maximum length %i)", CMessageHeader::COMMAND_SIZE)},
{"msg", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The serialized message body to send, in hex, without a message header"},
},
RPCResult{RPCResult::Type::NONE, "", ""},
RPCExamples{
HelpExampleCli("sendmsgtopeer", "0 \"addr\" \"ffffff\"") + HelpExampleRpc("sendmsgtopeer", "0 \"addr\" \"ffffff\"")},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue {
const NodeId peer_id{request.params[0].get_int()};
const std::string& msg_type{request.params[1].get_str()};
if (msg_type.size() > CMessageHeader::COMMAND_SIZE) {
throw JSONRPCError(RPC_INVALID_PARAMETER, strprintf("Error: msg_type too long, max length is %i", CMessageHeader::COMMAND_SIZE));
}
const std::string& msg{request.params[2].get_str()};
if (!msg.empty() && !IsHex(msg)) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Error parsing input for msg");
}

NodeContext& node = EnsureAnyNodeContext(request.context);
CConnman& connman = EnsureConnman(node);

CSerializedNetMsg msg_ser;
msg_ser.data = ParseHex(msg);
msg_ser.m_type = msg_type;

bool success = connman.ForNode(peer_id, [&](CNode* node) {
connman.PushMessage(node, std::move(msg_ser));
return true;
});

if (!success) {
throw JSONRPCError(RPC_MISC_ERROR, "Error: Could not send message to peer");
}

return NullUniValue;
},
};
}

static RPCHelpMan setmnthreadactive()
{
return RPCHelpMan{"setmnthreadactive",
Expand Down Expand Up @@ -1068,6 +1117,7 @@ static const CRPCCommand commands[] =

{ "hidden", &addconnection, },
{ "hidden", &addpeeraddress, },
{ "hidden", &sendmsgtopeer },
{ "hidden", &setmnthreadactive },
};
// clang-format on
Expand Down
1 change: 1 addition & 0 deletions src/test/fuzz/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ const std::vector<std::string> RPC_COMMANDS_SAFE_FOR_FUZZING{
"pruneblockchain",
"reconsiderblock",
"scantxoutset",
"sendmsgtopeer", // when no peers are connected, no p2p message is sent
"sendrawtransaction",
"setmnthreadactive",
"setmocktime",
Expand Down
24 changes: 22 additions & 2 deletions test/functional/p2p_add_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@

from test_framework.p2p import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import check_node_connections

from test_framework.util import (
assert_equal,
check_node_connections,
)

class P2PFeelerReceiver(P2PInterface):
def on_version(self, message):
# The bitcoind node closes feeler connections as soon as a version
# message is received from the test framework. Don't send any responses
# to the node's version message since the connection will already be
# closed.
pass

class P2PAddConnections(BitcoinTestFramework):
def set_test_params(self):
Expand Down Expand Up @@ -86,6 +96,16 @@ def run_test(self):

check_node_connections(node=self.nodes[1], num_in=5, num_out=10)

self.log.info("Add 1 feeler connection to node 0")
feeler_conn = self.nodes[0].add_outbound_p2p_connection(P2PFeelerReceiver(), p2p_idx=6, connection_type="feeler")

# Feeler connection is closed
assert not feeler_conn.is_connected

# Verify version message received
assert_equal(feeler_conn.message_count["version"], 1)
# Feeler connections do not request tx relay
assert_equal(feeler_conn.last_message["version"].relay, 0)

if __name__ == '__main__':
P2PAddConnections().main()
4 changes: 2 additions & 2 deletions test/functional/p2p_disconnect_ban.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def run_test(self):
self.log.info("disconnectnode: successfully disconnect node by address")
address1 = self.nodes[0].getpeerinfo()[0]['addr']
self.nodes[0].disconnectnode(address=address1)
self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 1, timeout=10)
self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10)
assert not [node for node in self.nodes[0].getpeerinfo() if node['addr'] == address1]

self.log.info("disconnectnode: successfully reconnect node")
Expand All @@ -102,7 +102,7 @@ def run_test(self):
self.log.info("disconnectnode: successfully disconnect node by node id")
id1 = self.nodes[0].getpeerinfo()[0]['id']
self.nodes[0].disconnectnode(nodeid=id1)
self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 1, timeout=10)
self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10)
assert not [node for node in self.nodes[0].getpeerinfo() if node['id'] == id1]

if __name__ == '__main__':
Expand Down
37 changes: 37 additions & 0 deletions test/functional/p2p_net_deadlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python3
# Copyright (c) 2023-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

import threading
from test_framework.messages import MAX_PROTOCOL_MESSAGE_LENGTH
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import random_bytes

class NetDeadlockTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2

def run_test(self):
node0 = self.nodes[0]
node1 = self.nodes[1]

self.log.info("Simultaneously send a large message on both sides")
rand_msg = random_bytes(MAX_PROTOCOL_MESSAGE_LENGTH).hex()

thread0 = threading.Thread(target=node0.sendmsgtopeer, args=(0, "unknown", rand_msg))
thread1 = threading.Thread(target=node1.sendmsgtopeer, args=(0, "unknown", rand_msg))

thread0.start()
thread1.start()
thread0.join()
thread1.join()

self.log.info("Check whether a deadlock happened")
self.nodes[0].generate(1)
self.sync_blocks()


if __name__ == '__main__':
NetDeadlockTest().main()
Loading

0 comments on commit 8cef87d

Please sign in to comment.