Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move CInstantSendManager::AskNodesForLockedTx into PeerManager #6425

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/llmq/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
isman{[&]() -> llmq::CInstantSendManager* const {
assert(llmq::quorumInstantSendManager == nullptr);
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler,
chainman.ActiveChainstate(),
connman, *qman, *sigman, *shareman,
sporkman, mempool, mn_sync, peerman,
chainman.ActiveChainstate(), *qman,
*sigman, *shareman, sporkman,
mempool, mn_sync, peerman,
is_masternode, unit_tests, wipe);
return llmq::quorumInstantSendManager.get();
}()},
Expand Down
44 changes: 2 additions & 42 deletions src/llmq/instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
// bump mempool counter to make sure newly locked txes are picked up by getblocktemplate
mempool.AddTransactionsUpdated(1);
} else {
AskNodesForLockedTx(islock->txid, connman, *m_peerman, m_is_masternode);
m_peerman->AskPeersForTransaction(islock->txid, m_is_masternode);
}
}

Expand Down Expand Up @@ -1321,7 +1321,7 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
for (const auto& p : toDelete) {
RemoveConflictedTx(*p.second);
}
AskNodesForLockedTx(islock.txid, connman, *m_peerman, m_is_masternode);
m_peerman->AskPeersForTransaction(islock.txid, m_is_masternode);
}
}

Expand Down Expand Up @@ -1426,46 +1426,6 @@ void CInstantSendManager::RemoveConflictingLock(const uint256& islockHash, const
}
}

void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnman& connman, PeerManager& peerman,
bool is_masternode)
{
std::vector<CNode*> nodesToAskFor;
nodesToAskFor.reserve(4);

auto maybe_add_to_nodesToAskFor = [&peerman, &nodesToAskFor, &txid](CNode* pnode) {
if (nodesToAskFor.size() >= 4) {
return;
}
if (peerman.IsInvInFilter(pnode->GetId(), txid)) {
pnode->AddRef();
nodesToAskFor.emplace_back(pnode);
}
};

connman.ForEachNode([&](CNode* pnode) {
// Check masternodes first
if (pnode->m_masternode_connection) maybe_add_to_nodesToAskFor(pnode);
});
connman.ForEachNode([&](CNode* pnode) {
// Check non-masternodes next
if (!pnode->m_masternode_connection) maybe_add_to_nodesToAskFor(pnode);
});
{
LOCK(cs_main);
for (const CNode* pnode : nodesToAskFor) {
LogPrintf("CInstantSendManager::%s -- txid=%s: asking other peer %d for correct TX\n", __func__,
txid.ToString(), pnode->GetId());

CInv inv(MSG_TX, txid);
peerman.RequestObject(pnode->GetId(), inv, GetTime<std::chrono::microseconds>(), is_masternode,
/* fForce = */ true);
}
}
for (CNode* pnode : nodesToAskFor) {
pnode->Release();
}
}

void CInstantSendManager::ProcessPendingRetryLockTxs()
{
const auto retryTxs = WITH_LOCK(cs_pendingRetry, return pendingRetryTxs);
Expand Down
22 changes: 14 additions & 8 deletions src/llmq/instantsend.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ class CInstantSendManager : public CRecoveredSigsListener

CChainLocksHandler& clhandler;
CChainState& m_chainstate;
CConnman& connman;
CQuorumManager& qman;
CSigningManager& sigman;
CSigSharesManager& shareman;
Expand Down Expand Up @@ -254,13 +253,21 @@ class CInstantSendManager : public CRecoveredSigsListener
std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs GUARDED_BY(cs_pendingRetry);

public:
explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CConnman& _connman,
CQuorumManager& _qman, CSigningManager& _sigman, CSigSharesManager& _shareman,
CSporkManager& sporkman, CTxMemPool& _mempool, const CMasternodeSync& mn_sync,
const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests, bool fWipe) :
explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman,
CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman,
CTxMemPool& _mempool, const CMasternodeSync& mn_sync,
const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests,
bool fWipe) :
db(unitTests, fWipe),
clhandler(_clhandler), m_chainstate(chainstate), connman(_connman), qman(_qman), sigman(_sigman),
shareman(_shareman), spork_manager(sporkman), mempool(_mempool), m_mn_sync(mn_sync), m_peerman(peerman),
clhandler(_clhandler),
m_chainstate(chainstate),
qman(_qman),
sigman(_sigman),
shareman(_shareman),
spork_manager(sporkman),
mempool(_mempool),
m_mn_sync(mn_sync),
m_peerman(peerman),
m_is_masternode{is_masternode}
{
workInterrupt.reset();
Expand Down Expand Up @@ -314,7 +321,6 @@ class CInstantSendManager : public CRecoveredSigsListener
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
static void AskNodesForLockedTx(const uint256& txid, const CConnman& connman, PeerManager& peerman, bool is_masternode);
void ProcessPendingRetryLockTxs()
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);

Expand Down
49 changes: 48 additions & 1 deletion src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ class PeerManagerImpl final : public PeerManager
bool is_masternode, bool fForce = false) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
size_t GetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
bool IsInvInFilter(NodeId nodeid, const uint256& hash) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);

void AskPeersForTransaction(const uint256& txid, bool is_masternode) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
private:
/** Helpers to process result of external handlers of message */
void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
Expand All @@ -634,6 +634,11 @@ class PeerManagerImpl final : public PeerManager
/** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);

/**
* Private implementation of IsInvInFilter which does not call GetPeerRef; to be prefered when the PeerRef is available.
*/
bool IsInvInFilter(const PeerRef& peer, const uint256& hash) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);

/** Get a shared pointer to the Peer object.
* May return an empty shared_ptr if the Peer object can't be found. */
PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
Expand Down Expand Up @@ -2242,9 +2247,51 @@ void PeerManagerImpl::SendPings()
for(auto& it : m_peer_map) it.second->m_ping_queued = true;
}

void PeerManagerImpl::AskPeersForTransaction(const uint256& txid, bool is_masternode)
{
std::vector<PeerRef> peersToAsk;
peersToAsk.reserve(4);

auto maybe_add_to_nodesToAskFor = [&](const PeerRef& peer) {
if (peersToAsk.size() >= 4) {
return false;
}
if (IsInvInFilter(peer, txid)) {
peersToAsk.emplace_back(peer);
}
return true;
};

{
LOCK(m_peer_mutex);
// TODO consider prioritizing MNs again, once that flag is moved into Peer
for (const auto& [_, peer] : m_peer_map) {
if (!maybe_add_to_nodesToAskFor(peer)) {
break;
}
}
}
Comment on lines +2255 to +2273
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe simplify it?

how soon are you planning to implement this TODO?
If it's not short-term plans like very soon, it can be simplified to:

Suggested change
auto maybe_add_to_nodesToAskFor = [&](const PeerRef& peer) {
if (peersToAsk.size() >= 4) {
return false;
}
if (IsInvInFilter(peer, txid)) {
peersToAsk.emplace_back(peer);
}
return true;
};
{
LOCK(m_peer_mutex);
// TODO consider prioritizing MNs again, once that flag is moved into Peer
for (const auto& [_, peer] : m_peer_map) {
if (!maybe_add_to_nodesToAskFor(peer)) {
break;
}
}
}
{
LOCK(m_peer_mutex);
// TODO consider prioritizing MNs again, once that flag is moved into Peer
for (const auto& [_, peer] : m_peer_map) {
if (peersToAsk.size() >= 4) {
break;
}
if (IsInvInFilter(peer, txid)) {
peersToAsk.emplace_back(peer);
}
}
}

{
CInv inv(MSG_TX, txid);
LOCK(cs_main);
for (PeerRef& peer : peersToAsk) {
LogPrintf("PeerManagerImpl::%s -- txid=%s: asking other peer %d for correct TX\n", __func__,
txid.ToString(), peer->m_id);

RequestObject(peer->m_id, inv, GetTime<std::chrono::microseconds>(), is_masternode,
/*fForce=*/true);
}
}
}

bool PeerManagerImpl::IsInvInFilter(NodeId nodeid, const uint256& hash) const
{
PeerRef peer = GetPeerRef(nodeid);
return IsInvInFilter(peer, hash);
}

bool PeerManagerImpl::IsInvInFilter(const PeerRef& peer, const uint256& hash) const
{
if (peer == nullptr)
return false;
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
Expand Down
3 changes: 3 additions & 0 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class PeerManager : public CValidationInterface, public NetEventsInterface
/** Is an inventory in the known inventory filter. Used by InstantSend. */
virtual bool IsInvInFilter(NodeId nodeid, const uint256& hash) const = 0;

/** Ask a number of our peers, which have a transaction in their inventory, for the transaction. */
virtual void AskPeersForTransaction(const uint256& txid, bool is_masternode) = 0;

/** Broadcast inventory message to a specific peer. */
virtual void PushInventory(NodeId nodeid, const CInv& inv) = 0;

Expand Down
Loading