From e44ed0cfe5df825f2fe264b68be382b559ebb609 Mon Sep 17 00:00:00 2001 From: Milos Milosevic Date: Fri, 10 Feb 2023 14:50:23 +0000 Subject: [PATCH 1/8] #421 fix double in consensus of nft_lottery_token_purchase --- .../chain/include/graphene/chain/protocol/nft_lottery.hpp | 4 ++-- libraries/chain/nft_lottery_evaluator.cpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libraries/chain/include/graphene/chain/protocol/nft_lottery.hpp b/libraries/chain/include/graphene/chain/protocol/nft_lottery.hpp index 0c8ea8559..00ea31b08 100644 --- a/libraries/chain/include/graphene/chain/protocol/nft_lottery.hpp +++ b/libraries/chain/include/graphene/chain/protocol/nft_lottery.hpp @@ -18,7 +18,7 @@ namespace graphene // Buyer purchasing lottery tickets account_id_type buyer; // count of tickets to buy - uint64_t tickets_to_buy; + share_type tickets_to_buy; // amount that can spent asset amount; @@ -83,4 +83,4 @@ FC_REFLECT(graphene::chain::nft_lottery_reward_operation::fee_parameters_type, ( FC_REFLECT(graphene::chain::nft_lottery_end_operation::fee_parameters_type, (fee)) FC_REFLECT(graphene::chain::nft_lottery_token_purchase_operation, (fee)(lottery_id)(buyer)(tickets_to_buy)(amount)(extensions)) FC_REFLECT(graphene::chain::nft_lottery_reward_operation, (fee)(lottery_id)(winner)(amount)(win_percentage)(is_benefactor_reward)(winner_ticket_id)(extensions)) -FC_REFLECT(graphene::chain::nft_lottery_end_operation, (fee)(lottery_id)(extensions)) \ No newline at end of file +FC_REFLECT(graphene::chain::nft_lottery_end_operation, (fee)(lottery_id)(extensions)) diff --git a/libraries/chain/nft_lottery_evaluator.cpp b/libraries/chain/nft_lottery_evaluator.cpp index 794616cf3..2d9fe7f09 100644 --- a/libraries/chain/nft_lottery_evaluator.cpp +++ b/libraries/chain/nft_lottery_evaluator.cpp @@ -30,7 +30,7 @@ namespace graphene auto lottery_options = lottery_md_obj.lottery_data->lottery_options; FC_ASSERT(lottery_options.ticket_price.asset_id == op.amount.asset_id); - FC_ASSERT((double)op.amount.amount.value / lottery_options.ticket_price.amount.value == (double)op.tickets_to_buy); + FC_ASSERT(op.tickets_to_buy * lottery_options.ticket_price.amount.value == op.amount.amount.value); return void_result(); } FC_CAPTURE_AND_RETHROW((op)) @@ -142,4 +142,4 @@ namespace graphene FC_CAPTURE_AND_RETHROW((op)) } } // namespace chain -} // namespace graphene \ No newline at end of file +} // namespace graphene From f477af67714c79b044580008ca4d7e2d2537c41a Mon Sep 17 00:00:00 2001 From: Vlad Dobromyslov Date: Thu, 16 Feb 2023 12:45:03 +0200 Subject: [PATCH 2/8] #509 - fix hive withdrawal processing --- .../plugins/peerplays_sidechain/sidechain_net_handler.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler.cpp index 490cfe0be..a4fe12528 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler.cpp @@ -679,7 +679,8 @@ void sidechain_net_handler::on_applied_block(const signed_block &b) { const bool is_tracked_asset = ((sidechain == sidechain_type::bitcoin) && (transfer_op.amount.asset_id == gpo.parameters.btc_asset())) || ((sidechain == sidechain_type::ethereum) && (transfer_op.amount.asset_id == gpo.parameters.eth_asset())) || - (sidechain == sidechain_type::ethereum) || + ((sidechain == sidechain_type::ethereum) && (transfer_op.amount.asset_id != gpo.parameters.btc_asset()) + && (transfer_op.amount.asset_id != gpo.parameters.hbd_asset()) && (transfer_op.amount.asset_id != gpo.parameters.hive_asset())) || ((sidechain == sidechain_type::hive) && (transfer_op.amount.asset_id == gpo.parameters.hbd_asset())) || ((sidechain == sidechain_type::hive) && (transfer_op.amount.asset_id == gpo.parameters.hive_asset())); From 4e2850f82625b910a9d46f58a86d16aa00a274b2 Mon Sep 17 00:00:00 2001 From: serkixenos Date: Fri, 17 Feb 2023 05:45:22 +0100 Subject: [PATCH 3/8] Fix libbitcoin build in docker and related README instructions --- Dockerfile | 1 + Dockerfile.18.04 | 1 + README.md | 1 + 3 files changed, 3 insertions(+) diff --git a/Dockerfile b/Dockerfile index d49bbe08b..7a229aa74 100644 --- a/Dockerfile +++ b/Dockerfile @@ -136,6 +136,7 @@ RUN \ RUN \ git clone https://github.com/libbitcoin/libbitcoin-build.git && \ cd libbitcoin-build && \ + git reset --hard 92c215fc1ffa272bab4d485d369d0306db52d69d && \ ./generate3.sh && \ cd ../libbitcoin-explorer && \ ./install.sh && \ diff --git a/Dockerfile.18.04 b/Dockerfile.18.04 index d09e01b21..9531d986d 100644 --- a/Dockerfile.18.04 +++ b/Dockerfile.18.04 @@ -136,6 +136,7 @@ RUN \ RUN \ git clone https://github.com/libbitcoin/libbitcoin-build.git && \ cd libbitcoin-build && \ + git reset --hard 92c215fc1ffa272bab4d485d369d0306db52d69d && \ ./generate3.sh && \ cd ../libbitcoin-explorer && \ ./install.sh && \ diff --git a/README.md b/README.md index c3533d29e..00b2fff33 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ libbitcoin-explorer setup: ``` git clone https://github.com/libbitcoin/libbitcoin-build.git cd libbitcoin-build +git reset --hard 92c215fc1ffa272bab4d485d369d0306db52d69d ./generate3.sh cd ../libbitcoin-explorer sudo ./install.sh From 2788281062205a0bc9904b31aeae4ef30f7e7643 Mon Sep 17 00:00:00 2001 From: Vlad Dobromyslov Date: Thu, 23 Feb 2023 17:55:49 +0200 Subject: [PATCH 4/8] #501 - concurrent_unordered_set for connection --- libraries/net/node.cpp | 826 ++++++++++++++++++++++++++--------------- 1 file changed, 527 insertions(+), 299 deletions(-) diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 85e8c6762..c3198d1de 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -128,6 +128,124 @@ namespace graphene { namespace net { namespace detail { namespace bmi = boost::multi_index; + + /******* + * A class to wrap std::unordered_set for multithreading + */ + template , class Pred = std::equal_to > + class concurrent_unordered_set : private std::unordered_set + { + private: + mutable fc::mutex mux; + + public: + /// Iterations require a lock. This exposes the mutex. Use with care (i.e. lock_guard) + fc::mutex& get_mutex()const { return mux; } + + /// Insertion + /// @{ + std::pair< typename std::unordered_set::iterator, bool> emplace( Key key) + { + fc::scoped_lock lock(mux); + return std::unordered_set::emplace( key ); + } + std::pair< typename std::unordered_set::iterator, bool> insert (const Key& val) + { + fc::scoped_lock lock(mux); + return std::unordered_set::insert( val ); + } + /// @} + /// Size + /// @{ + size_t size() const + { + fc::scoped_lock lock(mux); + return std::unordered_set::size(); + } + bool empty() const noexcept + { + fc::scoped_lock lock(mux); + return std::unordered_set::empty(); + } + /// @} + /// Removal + /// @{ + void clear() noexcept + { + fc::scoped_lock lock(mux); + std::unordered_set::clear(); + } + typename std::unordered_set::iterator erase( + typename std::unordered_set::const_iterator itr) + { + fc::scoped_lock lock(mux); + return std::unordered_set::erase( itr); + } + size_t erase( const Key& key) + { + fc::scoped_lock lock(mux); + return std::unordered_set::erase( key ); + } + /// @} + /// Swap + /// @{ + void swap( typename std::unordered_set& other ) noexcept + { + fc::scoped_lock lock(mux); + std::unordered_set::swap( other ); + } + /// @} + /// Iteration + /// @{ + typename std::unordered_set::iterator begin() noexcept + { + fc::scoped_lock lock(mux); + return std::unordered_set::begin(); + } + typename std::unordered_set::const_iterator begin() const noexcept + { + fc::scoped_lock lock(mux); + return std::unordered_set::begin(); + } + typename std::unordered_set::local_iterator begin(size_t n) + { + fc::scoped_lock lock(mux); + return std::unordered_set::begin(n); + } + typename std::unordered_set::const_local_iterator begin(size_t n) const + { + fc::scoped_lock lock(mux); + return std::unordered_set::begin(n); + } + typename std::unordered_set::iterator end() noexcept + { + fc::scoped_lock lock(mux); + return std::unordered_set::end(); + } + typename std::unordered_set::const_iterator end() const noexcept + { + fc::scoped_lock lock(mux); + return std::unordered_set::end(); + } + typename std::unordered_set::local_iterator end(size_t n) + { + fc::scoped_lock lock(mux); + return std::unordered_set::end(n); + } + typename std::unordered_set::const_local_iterator end(size_t n) const + { + fc::scoped_lock lock(mux); + return std::unordered_set::end(n); + } + /// @} + /// Search + typename std::unordered_set::const_iterator find(Key key) + { + fc::scoped_lock lock(mux); + return std::unordered_set::find(key); + } + }; + class blockchain_tied_message_cache { private: @@ -481,9 +599,9 @@ namespace graphene { namespace net { namespace detail { /// used by the task that advertises inventory during normal operation // @{ - fc::promise::ptr _retrigger_advertise_inventory_loop_promise; - fc::future _advertise_inventory_loop_done; - std::unordered_set _new_inventory; /// list of items we have received but not yet advertised to our peers + fc::promise::ptr _retrigger_advertise_inventory_loop_promise; + fc::future _advertise_inventory_loop_done; + concurrent_unordered_set _new_inventory; /// list of items we have received but not yet advertised to our peers // @} fc::future _terminate_inactive_connections_loop_done; @@ -519,13 +637,13 @@ namespace graphene { namespace net { namespace detail { /** Stores all connections which have not yet finished key exchange or are still sending initial handshaking messages * back and forth (not yet ready to initiate syncing) */ - std::unordered_set _handshaking_connections; + concurrent_unordered_set _handshaking_connections; /** stores fully established connections we're either syncing with or in normal operation with */ - std::unordered_set _active_connections; + concurrent_unordered_set _active_connections; /** stores connections we've closed (sent closing message, not actually closed), but are still waiting for the remote end to close before we delete them */ - std::unordered_set _closing_connections; + concurrent_unordered_set _closing_connections; /** stores connections we've closed, but are still waiting for the OS to notify us that the socket is really closed */ - std::unordered_set _terminating_connections; + concurrent_unordered_set _terminating_connections; boost::circular_buffer _most_recent_blocks_accepted; // the /n/ most recent blocks we've accepted (currently tuned to the max number of connections) @@ -854,16 +972,19 @@ namespace graphene { namespace net { namespace detail { ilog( "cleaning up node" ); _node_is_shutting_down.store(true); - for (const peer_connection_ptr& active_peer : _active_connections) { - fc::optional inbound_endpoint = active_peer->get_endpoint_for_connecting(); - if (inbound_endpoint) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& active_peer : _active_connections) { - fc::optional updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint); - if (updated_peer_record) + fc::optional inbound_endpoint = active_peer->get_endpoint_for_connecting(); + if (inbound_endpoint) { - updated_peer_record->last_seen_time = fc::time_point::now(); - _potential_peer_db.update_entry(*updated_peer_record); + fc::optional updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint); + if (updated_peer_record) + { + updated_peer_record->last_seen_time = fc::time_point::now(); + _potential_peer_db.update_entry(*updated_peer_record); + } } } } @@ -1061,6 +1182,7 @@ namespace graphene { namespace net { namespace detail { std::set sync_items_to_request; // for each idle peer that we're syncing with + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { if( peer->we_need_sync_items_from_peer && @@ -1119,6 +1241,7 @@ namespace graphene { namespace net { namespace detail { bool node_impl::is_item_in_any_peers_inventory(const item_id& item) const { + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() ) @@ -1158,9 +1281,13 @@ namespace graphene { namespace net { namespace detail { fetch_messages_to_send_set items_by_peer; // initialize the fetch_messages_to_send with an empty set of items for all idle peers - for (const peer_connection_ptr& peer : _active_connections) - if (peer->idle()) - items_by_peer.insert(peer_and_items_to_fetch(peer)); + { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { + if (peer->idle()) + items_by_peer.insert(peer_and_items_to_fetch(peer)); + } + } // now loop over all items we want to fetch for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();) @@ -1262,56 +1389,59 @@ namespace graphene { namespace net { namespace detail { dlog("beginning an iteration of advertise inventory"); // swap inventory into local variable, clearing the node's copy std::unordered_set inventory_to_advertise; - inventory_to_advertise.swap(_new_inventory); + _new_inventory.swap(inventory_to_advertise); // process all inventory to advertise and construct the inventory messages we'll send // first, then send them all in a batch (to avoid any fiber interruption points while // we're computing the messages) std::list > inventory_messages_to_send; - for (const peer_connection_ptr& peer : _active_connections) { - // only advertise to peers who are in sync with us - idump((peer->peer_needs_sync_items_from_us)); - if( !peer->peer_needs_sync_items_from_us ) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - std::map > items_to_advertise_by_type; - // don't send the peer anything we've already advertised to it - // or anything it has advertised to us - // group the items we need to send by type, because we'll need to send one inventory message per type - unsigned total_items_to_send_to_this_peer = 0; - idump((inventory_to_advertise)); - for (const item_id& item_to_advertise : inventory_to_advertise) + // only advertise to peers who are in sync with us + idump((peer->peer_needs_sync_items_from_us)); + if( !peer->peer_needs_sync_items_from_us ) { - auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise); - auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise); - - if (adv_to_peer == peer->inventory_advertised_to_peer.end() && - adv_to_us == peer->inventory_peer_advertised_to_us.end()) - { - items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash); - peer->inventory_advertised_to_peer.insert(peer_connection::timestamped_item_id(item_to_advertise, fc::time_point::now())); - ++total_items_to_send_to_this_peer; - if (item_to_advertise.item_type == trx_message_type) - testnetlog("advertising transaction ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint())); - dlog("advertising item ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint())); - } - else + std::map > items_to_advertise_by_type; + // don't send the peer anything we've already advertised to it + // or anything it has advertised to us + // group the items we need to send by type, because we'll need to send one inventory message per type + unsigned total_items_to_send_to_this_peer = 0; + idump((inventory_to_advertise)); + for (const item_id& item_to_advertise : inventory_to_advertise) { - if (adv_to_peer != peer->inventory_advertised_to_peer.end() ) + auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise); + auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise); + + if (adv_to_peer == peer->inventory_advertised_to_peer.end() && + adv_to_us == peer->inventory_peer_advertised_to_us.end()) + { + items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash); + peer->inventory_advertised_to_peer.insert(peer_connection::timestamped_item_id(item_to_advertise, fc::time_point::now())); + ++total_items_to_send_to_this_peer; + if (item_to_advertise.item_type == trx_message_type) + testnetlog("advertising transaction ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint())); + dlog("advertising item ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint())); + } + else + { + if (adv_to_peer != peer->inventory_advertised_to_peer.end() ) idump( (*adv_to_peer) ); - if (adv_to_us != peer->inventory_peer_advertised_to_us.end() ) + if (adv_to_us != peer->inventory_peer_advertised_to_us.end() ) idump( (*adv_to_us) ); + } } - } dlog("advertising ${count} new item(s) of ${types} type(s) to peer ${endpoint}", ("count", total_items_to_send_to_this_peer) - ("types", items_to_advertise_by_type.size()) - ("endpoint", peer->get_remote_endpoint())); - for (auto items_group : items_to_advertise_by_type) - inventory_messages_to_send.push_back(std::make_pair(peer, item_ids_inventory_message(items_group.first, items_group.second))); + ("types", items_to_advertise_by_type.size()) + ("endpoint", peer->get_remote_endpoint())); + for (auto items_group : items_to_advertise_by_type) + inventory_messages_to_send.push_back(std::make_pair(peer, item_ids_inventory_message(items_group.first, items_group.second))); + } + peer->clear_old_inventory(); } - peer->clear_old_inventory(); } for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter) @@ -1360,25 +1490,30 @@ namespace graphene { namespace net { namespace detail { uint32_t handshaking_timeout = _peer_inactivity_timeout; fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout); - for( const peer_connection_ptr handshaking_peer : _handshaking_connections ) - if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold && - handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && - handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold ) + { + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr handshaking_peer : _handshaking_connections ) { - wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds", - ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) ); - wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", - ("status", handshaking_peer->negotiation_status) - ("sent", handshaking_peer->get_total_bytes_sent()) - ("received", handshaking_peer->get_total_bytes_received())); - handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn, "Terminating handshaking connection due to inactivity of ${timeout} seconds. Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", - ("peer", handshaking_peer->get_remote_endpoint()) - ("timeout", handshaking_timeout) - ("status", handshaking_peer->negotiation_status) - ("sent", handshaking_peer->get_total_bytes_sent()) - ("received", handshaking_peer->get_total_bytes_received()))); - peers_to_disconnect_forcibly.push_back( handshaking_peer ); + if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold && + handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && + handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold ) + { + wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds", + ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) ); + wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", + ("status", handshaking_peer->negotiation_status) + ("sent", handshaking_peer->get_total_bytes_sent()) + ("received", handshaking_peer->get_total_bytes_received())); + handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn, "Terminating handshaking connection due to inactivity of ${timeout} seconds. Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", + ("peer", handshaking_peer->get_remote_endpoint()) + ("timeout", handshaking_timeout) + ("status", handshaking_peer->negotiation_status) + ("sent", handshaking_peer->get_total_bytes_sent()) + ("received", handshaking_peer->get_total_bytes_received()))); + peers_to_disconnect_forcibly.push_back( handshaking_peer ); + } } + } // timeout for any active peers is two block intervals uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds; @@ -1398,94 +1533,103 @@ namespace graphene { namespace net { namespace detail { fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout); fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout); fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout; - for( const peer_connection_ptr& active_peer : _active_connections ) { - if( active_peer->connection_initiation_time < active_disconnect_threshold && - active_peer->get_last_message_received_time() < active_disconnect_threshold ) + fc::scoped_lock lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& active_peer : _active_connections ) { - wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds", - ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) ); - peers_to_disconnect_gently.push_back( active_peer ); - } - else - { - bool disconnect_due_to_request_timeout = false; - for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->sync_items_requested_from_peer) - if (item_and_time.second < active_ignored_request_threshold) - { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ${id}", - ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); - disconnect_due_to_request_timeout = true; - break; - } - if (!disconnect_due_to_request_timeout && - active_peer->item_ids_requested_from_peer && - active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold) - { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}", - ("peer", active_peer->get_remote_endpoint()) - ("synopsis", active_peer->item_ids_requested_from_peer->get<0>())); - disconnect_due_to_request_timeout = true; - } - if (!disconnect_due_to_request_timeout) - for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer) + if( active_peer->connection_initiation_time < active_disconnect_threshold && + active_peer->get_last_message_received_time() < active_disconnect_threshold ) + { + wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds", + ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) ); + peers_to_disconnect_gently.push_back( active_peer ); + } + else + { + bool disconnect_due_to_request_timeout = false; + for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->sync_items_requested_from_peer) if (item_and_time.second < active_ignored_request_threshold) { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}", - ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); + wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ${id}", + ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); disconnect_due_to_request_timeout = true; break; } - if (disconnect_due_to_request_timeout) - { - // we should probably disconnect nicely and give them a reason, but right now the logic - // for rescheduling the requests only executes when the connection is fully closed, - // and we want to get those requests rescheduled as soon as possible - peers_to_disconnect_forcibly.push_back(active_peer); - } - else if (active_peer->connection_initiation_time < active_send_keepalive_threshold && - active_peer->get_last_message_received_time() < active_send_keepalive_threshold) - { - wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", - ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); - peers_to_send_keep_alive.push_back(active_peer); - } - else if (active_peer->we_need_sync_items_from_peer && - !active_peer->is_currently_handling_message() && - !active_peer->item_ids_requested_from_peer && - active_peer->ids_of_items_to_get.empty()) - { - // This is a state we should never get into in the first place, but if we do, we should disconnect the peer - // to re-establish the connection. - fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", - ("peer", active_peer->get_remote_endpoint())); - wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", - ("peer", active_peer->get_remote_endpoint())); - peers_to_disconnect_forcibly.push_back(active_peer); + if (!disconnect_due_to_request_timeout && + active_peer->item_ids_requested_from_peer && + active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold) + { + wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}", + ("peer", active_peer->get_remote_endpoint()) + ("synopsis", active_peer->item_ids_requested_from_peer->get<0>())); + disconnect_due_to_request_timeout = true; + } + if (!disconnect_due_to_request_timeout) + for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer) + if (item_and_time.second < active_ignored_request_threshold) + { + wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}", + ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); + disconnect_due_to_request_timeout = true; + break; + } + if (disconnect_due_to_request_timeout) + { + // we should probably disconnect nicely and give them a reason, but right now the logic + // for rescheduling the requests only executes when the connection is fully closed, + // and we want to get those requests rescheduled as soon as possible + peers_to_disconnect_forcibly.push_back(active_peer); + } + else if (active_peer->connection_initiation_time < active_send_keepalive_threshold && + active_peer->get_last_message_received_time() < active_send_keepalive_threshold) + { + wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", + ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); + peers_to_send_keep_alive.push_back(active_peer); + } + else if (active_peer->we_need_sync_items_from_peer && + !active_peer->is_currently_handling_message() && + !active_peer->item_ids_requested_from_peer && + active_peer->ids_of_items_to_get.empty()) + { + // This is a state we should never get into in the first place, but if we do, we should disconnect the peer + // to re-establish the connection. + fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + peers_to_disconnect_forcibly.push_back(active_peer); + } } } } fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT); - for( const peer_connection_ptr& closing_peer : _closing_connections ) - if( closing_peer->connection_closed_time < closing_disconnect_threshold ) - { - // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT - // seconds ago, but they haven't done it yet. Terminate the connection now - wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner", - ( "peer", closing_peer->get_remote_endpoint() ) ); - peers_to_disconnect_forcibly.push_back( closing_peer ); + { + fc::scoped_lock lock(_closing_connections.get_mutex()); + for( const peer_connection_ptr& closing_peer : _closing_connections ) { + if (closing_peer->connection_closed_time < closing_disconnect_threshold) { + // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT + // seconds ago, but they haven't done it yet. Terminate the connection now + wlog("Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner", + ("peer", closing_peer->get_remote_endpoint())); + peers_to_disconnect_forcibly.push_back(closing_peer); + } } + } uint32_t failed_terminate_timeout_seconds = 120; fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds); - for (const peer_connection_ptr& peer : _terminating_connections ) - if (peer->get_connection_terminated_time() != fc::time_point::min() && - peer->get_connection_terminated_time() < failed_terminate_threshold) - { - wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint())); - peers_to_terminate.push_back(peer); + { + fc::scoped_lock lock(_terminating_connections.get_mutex()); + for (const peer_connection_ptr& peer : _terminating_connections ) { + if (peer->get_connection_terminated_time() != fc::time_point::min() && + peer->get_connection_terminated_time() < failed_terminate_threshold) { + wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint())); + peers_to_terminate.push_back(peer); + } } + } // That's the end of the sorting step; now all peers that require further processing are now in one of the // lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate @@ -1493,11 +1637,14 @@ namespace graphene { namespace net { namespace detail { // if we've decided to delete any peers, do it now; in its current implementation this doesn't yield, // and once we start yielding, we may find that we've moved that peer to another list (closed or active) // and that triggers assertions, maybe even errors - for (const peer_connection_ptr& peer : peers_to_terminate ) { - assert(_terminating_connections.find(peer) != _terminating_connections.end()); - _terminating_connections.erase(peer); - schedule_peer_for_deletion(peer); + fc::scoped_lock lock(_terminating_connections.get_mutex()); + for (const peer_connection_ptr& peer : peers_to_terminate ) + { + assert(_terminating_connections.find(peer) != _terminating_connections.end()); + _terminating_connections.erase(peer); + schedule_peer_for_deletion(peer); + } } peers_to_terminate.clear(); @@ -1516,6 +1663,7 @@ namespace graphene { namespace net { namespace detail { // disconnect reason, so it may yield) for( const peer_connection_ptr& peer : peers_to_disconnect_gently ) { + fc::scoped_lock lock(_active_connections.get_mutex()); fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity", ( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) ( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) @@ -1539,6 +1687,7 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); + fc::scoped_lock lock(_active_connections.get_mutex()); std::list original_active_peers(_active_connections.begin(), _active_connections.end()); for( const peer_connection_ptr& active_peer : original_active_peers ) { @@ -1710,12 +1859,19 @@ namespace graphene { namespace net { namespace detail { peer_connection_ptr node_impl::get_peer_by_node_id(const node_id_t& node_id) { - for (const peer_connection_ptr& active_peer : _active_connections) - if (node_id == active_peer->node_id) - return active_peer; - for (const peer_connection_ptr& handshaking_peer : _handshaking_connections) - if (node_id == handshaking_peer->node_id) - return handshaking_peer; + { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& active_peer : _active_connections) + if (node_id == active_peer->node_id) + return active_peer; + } + { + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr& handshaking_peer : _handshaking_connections) + if (node_id == handshaking_peer->node_id) + return handshaking_peer; + } + return peer_connection_ptr(); } @@ -1727,18 +1883,25 @@ namespace graphene { namespace net { namespace detail { dlog("is_already_connected_to_id returning true because the peer is us"); return true; } - for (const peer_connection_ptr active_peer : _active_connections) - if (node_id == active_peer->node_id) - { - dlog("is_already_connected_to_id returning true because the peer is already in our active list"); - return true; + { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr active_peer : _active_connections) { + if (node_id == active_peer->node_id) { + dlog("is_already_connected_to_id returning true because the peer is already in our active list"); + return true; + } } - for (const peer_connection_ptr handshaking_peer : _handshaking_connections) - if (node_id == handshaking_peer->node_id) - { - dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list"); - return true; + } + { + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr handshaking_peer : _handshaking_connections) { + if (node_id == handshaking_peer->node_id) { + dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list"); + return true; + } } + } + return false; } @@ -1770,19 +1933,25 @@ namespace graphene { namespace net { namespace detail { ("max", _maximum_number_of_connections)); dlog(" my id is ${id}", ("id", _node_id)); - for (const peer_connection_ptr& active_connection : _active_connections) { - dlog(" active: ${endpoint} with ${id} [${direction}]", - ("endpoint", active_connection->get_remote_endpoint()) - ("id", active_connection->node_id) - ("direction", active_connection->direction)); + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& active_connection : _active_connections) + { + dlog(" active: ${endpoint} with ${id} [${direction}]", + ("endpoint", active_connection->get_remote_endpoint()) + ("id", active_connection->node_id) + ("direction", active_connection->direction)); + } } - for (const peer_connection_ptr& handshaking_connection : _handshaking_connections) { - dlog(" handshaking: ${endpoint} with ${id} [${direction}]", - ("endpoint", handshaking_connection->get_remote_endpoint()) - ("id", handshaking_connection->node_id) - ("direction", handshaking_connection->direction)); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr& handshaking_connection : _handshaking_connections) + { + dlog(" handshaking: ${endpoint} with ${id} [${direction}]", + ("endpoint", handshaking_connection->get_remote_endpoint()) + ("id", handshaking_connection->node_id) + ("direction", handshaking_connection->direction)); + } } } @@ -2229,6 +2398,7 @@ namespace graphene { namespace net { namespace detail { if (!_peer_advertising_disabled) { reply.addresses.reserve(_active_connections.size()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& active_peer : _active_connections) { fc::optional updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint()); @@ -2414,11 +2584,14 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); uint32_t max_number_of_unfetched_items = 0; - for( const peer_connection_ptr& peer : _active_connections ) { - uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids; - max_number_of_unfetched_items = std::max(max_number_of_unfetched_items, - this_peer_number_of_unfetched_items); + fc::scoped_lock lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& peer : _active_connections ) + { + uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids; + max_number_of_unfetched_items = std::max(max_number_of_unfetched_items, + this_peer_number_of_unfetched_items); + } } return max_number_of_unfetched_items; } @@ -2633,17 +2806,19 @@ namespace graphene { namespace net { namespace detail { originating_peer->ids_of_items_to_get.empty()) { bool is_first_item_for_other_peer = false; - for (const peer_connection_ptr& peer : _active_connections) - if (peer != originating_peer->shared_from_this() && - !peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - { - dlog("The item ${newitem} is the first item for peer ${peer}", - ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - ("peer", peer->get_remote_endpoint())); - is_first_item_for_other_peer = true; - break; + { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { + if (peer != originating_peer->shared_from_this() && + !peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) { + dlog("The item ${newitem} is the first item for peer ${peer}", + ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())("peer", peer->get_remote_endpoint())); + is_first_item_for_other_peer = true; + break; + } } + } dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}", ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); if (!is_first_item_for_other_peer) @@ -2933,15 +3108,18 @@ namespace graphene { namespace net { namespace detail { item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash); bool we_advertised_this_item_to_a_peer = false; bool we_requested_this_item_from_a_peer = false; - for (const peer_connection_ptr peer : _active_connections) { - if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end()) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr peer : _active_connections) { - we_advertised_this_item_to_a_peer = true; - break; + if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end()) + { + we_advertised_this_item_to_a_peer = true; + break; + } + if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end()) + we_requested_this_item_from_a_peer = true; } - if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end()) - we_requested_this_item_from_a_peer = true; } // if we have already advertised it to a peer, we must have it, no need to do anything else @@ -3172,6 +3350,7 @@ namespace graphene { namespace net { namespace detail { }; bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -3254,6 +3433,7 @@ namespace graphene { namespace net { namespace detail { else { // invalid message received + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -3356,15 +3536,18 @@ namespace graphene { namespace net { namespace detail { // find out if this block is the next block on the active chain or one of the forks bool potential_first_block = false; - for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - if (!peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == received_block_iter->block_id) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - potential_first_block = true; - peer->ids_of_items_to_get.pop_front(); - peer->ids_of_items_being_processed.insert(received_block_iter->block_id); + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections + if (!peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == received_block_iter->block_id) + { + potential_first_block = true; + peer->ids_of_items_to_get.pop_front(); + peer->ids_of_items_being_processed.insert(received_block_iter->block_id); + } } } @@ -3392,6 +3575,7 @@ namespace graphene { namespace net { namespace detail { { dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted"); std::vector< peer_connection_ptr > peers_needing_next_batch; + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id); @@ -3527,55 +3711,62 @@ namespace graphene { namespace net { namespace detail { fc::time_point_sec block_time = block_message_to_process.block.timestamp; bool disconnect_this_peer = false; - for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - - auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id); - if (iter != peer->inventory_peer_advertised_to_us.end()) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - // this peer offered us the item. It will eventually expire from the peer's - // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes). - // For now, it will remain there, which will prevent us from offering the peer this - // block back when we rebroadcast the block below - peer->last_block_delegate_has_seen = block_message_to_process.block_id; - peer->last_block_time_delegate_has_seen = block_time; + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections + + auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id); + if (iter != peer->inventory_peer_advertised_to_us.end()) + { + // this peer offered us the item. It will eventually expire from the peer's + // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes). + // For now, it will remain there, which will prevent us from offering the peer this + // block back when we rebroadcast the block below + peer->last_block_delegate_has_seen = block_message_to_process.block_id; + peer->last_block_time_delegate_has_seen = block_time; + } + peer->clear_old_inventory(); } - peer->clear_old_inventory(); } + message_propagation_data propagation_data{message_receive_time, message_validated_time, originating_peer->node_id}; broadcast( block_message_to_process, propagation_data ); _message_cache.block_accepted(); - for (const peer_connection_ptr& peer : _active_connections) { - if (is_hard_fork_block(block_number) ) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - if (peer->last_known_fork_block_number != 0) + if (is_hard_fork_block(block_number) ) { - uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number); - if (next_fork_block_number != 0 && - next_fork_block_number <= block_number) + if (peer->last_known_fork_block_number != 0) { - disconnect_this_peer = true; + uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number); + if (next_fork_block_number != 0 && + next_fork_block_number <= block_number) + { + disconnect_this_peer = true; + } } } - } - if(peer->last_known_hardfork_time < _delegate->get_last_known_hardfork_time()) - { - if(block_message_to_process.block.timestamp.sec_since_epoch() >= _delegate->get_last_known_hardfork_time().sec_since_epoch()) + if(peer->last_known_hardfork_time < _delegate->get_last_known_hardfork_time()) { - disconnect_this_peer = true; + if(block_message_to_process.block.timestamp.sec_since_epoch() >= _delegate->get_last_known_hardfork_time().sec_since_epoch()) + { + disconnect_this_peer = true; + } } - } - if( disconnect_this_peer ) - { - peers_to_disconnect.insert(peer); + if( disconnect_this_peer ) + { + peers_to_disconnect.insert(peer); #ifdef ENABLE_DEBUG_ULOGS - ulog("Disconnecting from peer because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp)); + ulog("Disconnecting from peer because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp)); #endif + } } } @@ -3614,9 +3805,11 @@ namespace graphene { namespace net { namespace detail { disconnect_reason = "You offered me a block that I have deemed to be invalid"; peers_to_disconnect.insert( originating_peer->shared_from_this() ); - for (const peer_connection_ptr& peer : _active_connections) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id) peers_to_disconnect.insert(peer); + } } if (restart_sync_exception) @@ -3737,25 +3930,29 @@ namespace graphene { namespace net { namespace detail { void node_impl::forward_firewall_check_to_next_available_peer(firewall_check_state_data* firewall_check_state) { - for (const peer_connection_ptr& peer : _active_connections) { - if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test - !peer->firewall_check_state && // the peer isn't already performing a check for another node - firewall_check_state->nodes_already_tested.find(peer->node_id) == firewall_check_state->nodes_already_tested.end() && - peer->core_protocol_version >= 106) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - wlog("forwarding firewall check for node ${to_check} to peer ${checker}", - ("to_check", firewall_check_state->endpoint_to_test) - ("checker", peer->get_remote_endpoint())); - firewall_check_state->nodes_already_tested.insert(peer->node_id); - peer->firewall_check_state = firewall_check_state; - check_firewall_message check_request; - check_request.endpoint_to_check = firewall_check_state->endpoint_to_test; - check_request.node_id = firewall_check_state->expected_node_id; - peer->send_message(check_request); - return; + if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test + !peer->firewall_check_state && // the peer isn't already performing a check for another node + firewall_check_state->nodes_already_tested.find(peer->node_id) == firewall_check_state->nodes_already_tested.end() && + peer->core_protocol_version >= 106) + { + wlog("forwarding firewall check for node ${to_check} to peer ${checker}", + ("to_check", firewall_check_state->endpoint_to_test) + ("checker", peer->get_remote_endpoint())); + firewall_check_state->nodes_already_tested.insert(peer->node_id); + peer->firewall_check_state = firewall_check_state; + check_firewall_message check_request; + check_request.endpoint_to_check = firewall_check_state->endpoint_to_test; + check_request.node_id = firewall_check_state->expected_node_id; + peer->send_message(check_request); + return; + } } } + wlog("Unable to forward firewall check for node ${to_check} to any other peers, returning 'unable'", ("to_check", firewall_check_state->endpoint_to_test)); @@ -3928,41 +4125,45 @@ namespace graphene { namespace net { namespace detail { } fc::time_point now = fc::time_point::now(); - for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - - current_connection_data data_for_this_peer; - data_for_this_peer.connection_duration = now.sec_since_epoch() - peer->connection_initiation_time.sec_since_epoch(); - if (peer->get_remote_endpoint()) // should always be set for anyone we're actively connected to - data_for_this_peer.remote_endpoint = *peer->get_remote_endpoint(); - data_for_this_peer.clock_offset = peer->clock_offset; - data_for_this_peer.round_trip_delay = peer->round_trip_delay; - data_for_this_peer.node_id = peer->node_id; - data_for_this_peer.connection_direction = peer->direction; - data_for_this_peer.firewalled = peer->is_firewalled; - fc::mutable_variant_object user_data; - if (peer->graphene_git_revision_sha) - user_data["graphene_git_revision_sha"] = *peer->graphene_git_revision_sha; - if (peer->graphene_git_revision_unix_timestamp) - user_data["graphene_git_revision_unix_timestamp"] = *peer->graphene_git_revision_unix_timestamp; - if (peer->fc_git_revision_sha) - user_data["fc_git_revision_sha"] = *peer->fc_git_revision_sha; - if (peer->fc_git_revision_unix_timestamp) - user_data["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp; - if (peer->platform) - user_data["platform"] = *peer->platform; - if (peer->bitness) - user_data["bitness"] = *peer->bitness; - user_data["user_agent"] = peer->user_agent; - - user_data["last_known_block_hash"] = fc::variant( peer->last_block_delegate_has_seen, 1 ); - user_data["last_known_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen); - user_data["last_known_block_time"] = peer->last_block_time_delegate_has_seen; + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) + { + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - data_for_this_peer.user_data = user_data; - reply.current_connections.emplace_back(data_for_this_peer); + current_connection_data data_for_this_peer; + data_for_this_peer.connection_duration = now.sec_since_epoch() - peer->connection_initiation_time.sec_since_epoch(); + if (peer->get_remote_endpoint()) // should always be set for anyone we're actively connected to + data_for_this_peer.remote_endpoint = *peer->get_remote_endpoint(); + data_for_this_peer.clock_offset = peer->clock_offset; + data_for_this_peer.round_trip_delay = peer->round_trip_delay; + data_for_this_peer.node_id = peer->node_id; + data_for_this_peer.connection_direction = peer->direction; + data_for_this_peer.firewalled = peer->is_firewalled; + fc::mutable_variant_object user_data; + if (peer->graphene_git_revision_sha) + user_data["graphene_git_revision_sha"] = *peer->graphene_git_revision_sha; + if (peer->graphene_git_revision_unix_timestamp) + user_data["graphene_git_revision_unix_timestamp"] = *peer->graphene_git_revision_unix_timestamp; + if (peer->fc_git_revision_sha) + user_data["fc_git_revision_sha"] = *peer->fc_git_revision_sha; + if (peer->fc_git_revision_unix_timestamp) + user_data["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp; + if (peer->platform) + user_data["platform"] = *peer->platform; + if (peer->bitness) + user_data["bitness"] = *peer->bitness; + user_data["user_agent"] = peer->user_agent; + + user_data["last_known_block_hash"] = fc::variant( peer->last_block_delegate_has_seen, 1 ); + user_data["last_known_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen); + user_data["last_known_block_time"] = peer->last_block_time_delegate_has_seen; + + data_for_this_peer.user_data = user_data; + reply.current_connections.emplace_back(data_for_this_peer); + } } + originating_peer->send_message(reply); } @@ -4047,6 +4248,7 @@ namespace graphene { namespace net { namespace detail { void node_impl::start_synchronizing() { + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) start_synchronizing_with_peer( peer ); } @@ -4253,9 +4455,19 @@ namespace graphene { namespace net { namespace detail { // the read loop before it gets an EOF). // operate off copies of the lists in case they change during iteration std::list all_peers; - boost::push_back(all_peers, _active_connections); - boost::push_back(all_peers, _handshaking_connections); - boost::push_back(all_peers, _closing_connections); + auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); }; + { + fc::scoped_lock lock(_active_connections.get_mutex()); + std::for_each(_active_connections.begin(), _active_connections.end(), p_back); + } + { + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back); + } + { + fc::scoped_lock lock(_closing_connections.get_mutex()); + std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back); + } for (const peer_connection_ptr& peer : all_peers) { @@ -4521,9 +4733,7 @@ namespace graphene { namespace net { namespace detail { // whether the peer is firewalled, we want to disconnect now. _handshaking_connections.erase(new_peer); _terminating_connections.erase(new_peer); - assert(_active_connections.find(new_peer) == _active_connections.end()); _active_connections.erase(new_peer); - assert(_closing_connections.find(new_peer) == _closing_connections.end()); _closing_connections.erase(new_peer); display_current_connections(); @@ -4867,18 +5077,25 @@ namespace graphene { namespace net { namespace detail { peer_connection_ptr node_impl::get_connection_to_endpoint( const fc::ip::endpoint& remote_endpoint ) { VERIFY_CORRECT_THREAD(); - for( const peer_connection_ptr& active_peer : _active_connections ) { - fc::optional endpoint_for_this_peer( active_peer->get_remote_endpoint() ); - if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) - return active_peer; + fc::scoped_lock lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& active_peer : _active_connections ) + { + fc::optional endpoint_for_this_peer( active_peer->get_remote_endpoint() ); + if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) + return active_peer; + } } - for( const peer_connection_ptr& handshaking_peer : _handshaking_connections ) { - fc::optional endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() ); - if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) - return handshaking_peer; + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr& handshaking_peer : _handshaking_connections ) + { + fc::optional endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() ); + if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) + return handshaking_peer; + } } + return peer_connection_ptr(); } @@ -4922,21 +5139,27 @@ namespace graphene { namespace net { namespace detail { ilog( " number of peers: ${active} active, ${handshaking}, ${closing} closing. attempting to maintain ${desired} - ${maximum} peers", ( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )("closing",_closing_connections.size() ) ( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) ); - for( const peer_connection_ptr& peer : _active_connections ) { - ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}", - ( "endpoint", peer->get_remote_endpoint() ) - ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )("in_sync_with_them", !peer->we_need_sync_items_from_peer ) ); - if( peer->we_need_sync_items_from_peer ) - ilog( " above peer has ${count} sync items we might need", ("count", peer->ids_of_items_to_get.size() ) ); - if (peer->inhibit_fetching_sync_blocks) - ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" ); + fc::scoped_lock lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& peer : _active_connections ) + { + ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}", + ( "endpoint", peer->get_remote_endpoint() ) + ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )("in_sync_with_them", !peer->we_need_sync_items_from_peer ) ); + if( peer->we_need_sync_items_from_peer ) + ilog( " above peer has ${count} sync items we might need", ("count", peer->ids_of_items_to_get.size() ) ); + if (peer->inhibit_fetching_sync_blocks) + ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" ); + } } - for( const peer_connection_ptr& peer : _handshaking_connections ) { - ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})", - ( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) ); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr& peer : _handshaking_connections ) + { + ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})", + ( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) ); + } } ilog( "--------- MEMORY USAGE ------------" ); @@ -4946,6 +5169,7 @@ namespace graphene { namespace net { namespace detail { ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) ); ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) ); ilog( "node._message_cache size: ${size}", ("size", _message_cache.size() ) ); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) ); @@ -5043,6 +5267,7 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); std::vector statuses; + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -5233,10 +5458,13 @@ namespace graphene { namespace net { namespace detail { _allowed_peers.clear(); _allowed_peers.insert(allowed_peers.begin(), allowed_peers.end()); std::list peers_to_disconnect; - if (!_allowed_peers.empty()) - for (const peer_connection_ptr& peer : _active_connections) + if (!_allowed_peers.empty()) { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr &peer : _active_connections) { if (_allowed_peers.find(peer->node_id) == _allowed_peers.end()) peers_to_disconnect.push_back(peer); + } + } for (const peer_connection_ptr& peer : peers_to_disconnect) disconnect_from_peer(peer.get(), "My allowed_peers list has changed, and you're no longer allowed. Bye."); #endif // ENABLE_P2P_DEBUGGING_API From 7af3d037b520aafde294e0353c366da2e28bf68f Mon Sep 17 00:00:00 2001 From: Vlad Dobromyslov Date: Fri, 24 Feb 2023 09:31:51 +0000 Subject: [PATCH 5/8] #495 hive wallet update --- .../sidechain_net_handler_bitcoin.cpp | 8 +++++-- .../sidechain_net_handler_ethereum.cpp | 3 ++- .../sidechain_net_handler_hive.cpp | 21 ++++++++++++++++--- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp index f37352bb5..c528d8a45 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp @@ -755,7 +755,7 @@ sidechain_net_handler_bitcoin::~sidechain_net_handler_bitcoin() { bool sidechain_net_handler_bitcoin::process_proposal(const proposal_object &po) { - // ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain))); + ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain))); bool should_approve = false; @@ -832,7 +832,7 @@ bool sidechain_net_handler_bitcoin::process_proposal(const proposal_object &po) std::string op_tx_str = op_obj_idx_1.get().transaction; const auto &st_idx = database.get_index_type().indices().get(); - const auto st = st_idx.find(obj_id); + const auto st = st_idx.find(object_id); if (st == st_idx.end()) { std::string tx_str = ""; @@ -1052,6 +1052,10 @@ void sidechain_net_handler_bitcoin::process_primary_wallet() { return; } + if (!plugin.can_son_participate(sidechain, chain::operation::tag::value, op_id)) { + return; + } + const chain::global_property_object &gpo = database.get_global_properties(); const auto &active_sons = gpo.active_sons.at(sidechain); diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_ethereum.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_ethereum.cpp index a3be86496..bb6a9821c 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_ethereum.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_ethereum.cpp @@ -205,6 +205,7 @@ sidechain_net_handler_ethereum::~sidechain_net_handler_ethereum() { } bool sidechain_net_handler_ethereum::process_proposal(const proposal_object &po) { + ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain))); bool should_approve = false; @@ -263,7 +264,7 @@ bool sidechain_net_handler_ethereum::process_proposal(const proposal_object &po) const std::string op_tx_str = op_obj_idx_1.get().transaction; const auto &st_idx = database.get_index_type().indices().get(); - const auto st = st_idx.find(obj_id); + const auto st = st_idx.find(object_id); if (st == st_idx.end()) { std::string tx_str = ""; diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_hive.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_hive.cpp index 6ad958503..6a54b7f57 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_hive.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_hive.cpp @@ -180,7 +180,8 @@ sidechain_net_handler_hive::~sidechain_net_handler_hive() { } bool sidechain_net_handler_hive::process_proposal(const proposal_object &po) { - //ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain))); + + ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain))); bool should_approve = false; @@ -238,7 +239,7 @@ bool sidechain_net_handler_hive::process_proposal(const proposal_object &po) { std::string op_tx_str = op_obj_idx_1.get().transaction; const auto &st_idx = database.get_index_type().indices().get(); - const auto st = st_idx.find(obj_id); + const auto st = st_idx.find(object_id); if (st == st_idx.end()) { std::string tx_str = ""; @@ -499,6 +500,10 @@ void sidechain_net_handler_hive::process_primary_wallet() { return; } + if (!plugin.can_son_participate(sidechain, chain::operation::tag::value, op_id)) { + return; + } + const chain::global_property_object &gpo = database.get_global_properties(); const auto &active_sons = gpo.active_sons.at(sidechain); @@ -577,7 +582,7 @@ void sidechain_net_handler_hive::process_primary_wallet() { stc_op.object_id = op_id; stc_op.sidechain = sidechain; stc_op.transaction = tx_str; - for (const auto &signer : gpo.active_sons.at(sidechain)) { + for (const auto &signer : signers) { son_info si; si.son_id = signer.son_id; si.weight = signer.weight; @@ -639,6 +644,11 @@ void sidechain_net_handler_hive::process_sidechain_addresses() { } bool sidechain_net_handler_hive::process_deposit(const son_wallet_deposit_object &swdo) { + + if (proposal_exists(chain::operation::tag::value, swdo.id)) { + return false; + } + const chain::global_property_object &gpo = database.get_global_properties(); price asset_price; @@ -685,6 +695,11 @@ bool sidechain_net_handler_hive::process_deposit(const son_wallet_deposit_object } bool sidechain_net_handler_hive::process_withdrawal(const son_wallet_withdraw_object &swwo) { + + if (proposal_exists(chain::operation::tag::value, swwo.id)) { + return false; + } + const chain::global_property_object &gpo = database.get_global_properties(); //===== From bfb961c7be078abbf61c7dff047f5b55dfaa9ee4 Mon Sep 17 00:00:00 2001 From: Milo M Date: Fri, 24 Feb 2023 09:36:10 +0000 Subject: [PATCH 6/8] automated test for nft_lottery --- tests/tests/nft_lottery_tests.cpp | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/tests/nft_lottery_tests.cpp b/tests/tests/nft_lottery_tests.cpp index 02d8bdc18..e0ecf5bbc 100644 --- a/tests/tests/nft_lottery_tests.cpp +++ b/tests/tests/nft_lottery_tests.cpp @@ -193,6 +193,34 @@ BOOST_AUTO_TEST_CASE(tickets_purchase_fail_test) } } +BOOST_AUTO_TEST_CASE(tickets_purchase_overflow) +{ + try + { + nft_metadata_id_type test_nft_md_id = db.get_index().get_next_id(); + INVOKE(create_lottery_nft_md_test); + auto &test_nft_md_obj = test_nft_md_id(db); + + nft_lottery_token_purchase_operation tpo; + tpo.fee = asset(); + tpo.buyer = account_id_type(); + tpo.lottery_id = test_nft_md_obj.id; + tpo.tickets_to_buy = 9223372036854775800; // Large number so that the overall amount overflows + trx.operations.push_back(tpo); + BOOST_REQUIRE_THROW(PUSH_TX(db, trx, ~0), fc::overflow_exception); + trx.operations.clear(); + + tpo.tickets_to_buy = -2; // Negative value should also be rejected + trx.operations.push_back(tpo); + BOOST_REQUIRE_THROW(PUSH_TX(db, trx, ~0), fc::exception); + } + catch (fc::exception &e) + { + edump((e.to_detail_string())); + throw; + } +} + BOOST_AUTO_TEST_CASE(lottery_end_by_stage_test) { try From 79974280c0a627e82fd6d07b55ed4b01ac533f1f Mon Sep 17 00:00:00 2001 From: timur <12267899-timur.5@users.noreply.gitlab.com> Date: Wed, 1 Mar 2023 06:01:52 +0000 Subject: [PATCH 7/8] SON connection pool --- .../peerplays_sidechain/common/rpc_client.cpp | 163 ++++++++++++++++-- .../peerplays_sidechain/common/rpc_client.hpp | 50 +++--- .../sidechain_net_handler.hpp | 6 +- .../sidechain_net_handler_bitcoin.hpp | 9 +- .../sidechain_net_handler_ethereum.hpp | 12 +- .../sidechain_net_handler_hive.hpp | 9 +- .../peerplays_sidechain_plugin.cpp | 10 +- .../sidechain_net_handler.cpp | 3 +- .../sidechain_net_handler_bitcoin.cpp | 51 ++++-- .../sidechain_net_handler_ethereum.cpp | 34 +++- .../sidechain_net_handler_hive.cpp | 37 +++- .../sidechain_net_handler_peerplays.cpp | 3 +- 12 files changed, 301 insertions(+), 86 deletions(-) diff --git a/libraries/plugins/peerplays_sidechain/common/rpc_client.cpp b/libraries/plugins/peerplays_sidechain/common/rpc_client.cpp index ac5267156..be050544c 100644 --- a/libraries/plugins/peerplays_sidechain/common/rpc_client.cpp +++ b/libraries/plugins/peerplays_sidechain/common/rpc_client.cpp @@ -18,10 +18,40 @@ namespace graphene { namespace peerplays_sidechain { -rpc_client::rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls) : - url(_url), - user(_user), - password(_password), +struct rpc_reply { + uint16_t status; + std::string body; +}; + +class rpc_connection { +public: + rpc_connection(const rpc_credentials &_credentials, bool _debug_rpc_calls); + + std::string send_post_request(std::string method, std::string params, bool show_log); + std::string get_url() const; + +protected: + rpc_credentials credentials; + bool debug_rpc_calls; + + std::string protocol; + std::string host; + std::string port; + std::string target; + std::string authorization; + + uint32_t request_id; + +private: + rpc_reply send_post_request(std::string body, bool show_log); + + boost::beast::net::io_context ioc; + boost::beast::net::ip::tcp::resolver resolver; + boost::asio::ip::basic_resolver_results results; +}; + +rpc_connection::rpc_connection(const rpc_credentials &_credentials, bool _debug_rpc_calls) : + credentials(_credentials), debug_rpc_calls(_debug_rpc_calls), request_id(0), resolver(ioc) { @@ -31,7 +61,7 @@ rpc_client::rpc_client(std::string _url, std::string _user, std::string _passwor boost::xpressive::smatch sm; - if (boost::xpressive::regex_search(url, sm, sr)) { + if (boost::xpressive::regex_search(credentials.url, sm, sr)) { protocol = sm["Protocol"]; if (protocol.empty()) { protocol = "http"; @@ -52,15 +82,19 @@ rpc_client::rpc_client(std::string _url, std::string _user, std::string _passwor target = "/"; } - authorization = "Basic " + base64_encode(user + ":" + password); + authorization = "Basic " + base64_encode(credentials.user + ":" + credentials.password); results = resolver.resolve(host, port); } else { - elog("Invalid URL: ${url}", ("url", url)); + elog("Invalid URL: ${url}", ("url", credentials.url)); } } +std::string rpc_connection::get_url() const { + return credentials.url; +} + std::string rpc_client::retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx) { if (reply_str.empty()) { wlog("RPC call ${function}, empty reply string", ("function", __FUNCTION__)); @@ -125,7 +159,7 @@ std::string rpc_client::retrieve_value_from_reply(std::string reply_str, std::st return ""; } -std::string rpc_client::send_post_request(std::string method, std::string params, bool show_log) { +std::string rpc_connection::send_post_request(std::string method, std::string params, bool show_log) { std::stringstream body; request_id = request_id + 1; @@ -164,7 +198,7 @@ std::string rpc_client::send_post_request(std::string method, std::string params return ""; } -rpc_reply rpc_client::send_post_request(std::string body, bool show_log) { +rpc_reply rpc_connection::send_post_request(std::string body, bool show_log) { // These object is used as a context for ssl connection boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client); @@ -239,7 +273,7 @@ rpc_reply rpc_client::send_post_request(std::string body, bool show_log) { reply.body = rbody; if (show_log) { - ilog("### Request URL: ${url}", ("url", url)); + ilog("### Request URL: ${url}", ("url", credentials.url)); ilog("### Request: ${body}", ("body", body)); ilog("### Response: ${rbody}", ("rbody", rbody)); } @@ -247,4 +281,113 @@ rpc_reply rpc_client::send_post_request(std::string body, bool show_log) { return reply; } +rpc_client::rpc_client(sidechain_type _sidechain, const std::vector &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection) : + sidechain(_sidechain), + debug_rpc_calls(_debug_rpc_calls), + simulate_connection_reselection(_simulate_connection_reselection) { + FC_ASSERT(_credentials.size()); + for (size_t i = 0; i < _credentials.size(); i++) + connections.push_back(new rpc_connection(_credentials[i], _debug_rpc_calls)); + n_active_conn = 0; + if (connections.size() > 1) + schedule_connection_selection(); +} + +void rpc_client::schedule_connection_selection() { + fc::time_point now = fc::time_point::now(); + static const int64_t time_to_next_conn_selection = 10 * 1000 * 1000; // 10 sec + fc::time_point next_wakeup = now + fc::microseconds(time_to_next_conn_selection); + connection_selection_task = fc::schedule([this] { + select_connection(); + }, + next_wakeup, "SON RPC connection selection"); +} + +void rpc_client::select_connection() { + FC_ASSERT(connections.size() > 1); + + const std::lock_guard lock(conn_mutex); + + static const int t_limit = 5 * 1000 * 1000, // 5 sec + quality_diff_threshold = 10 * 1000; // 10 ms + + int best_n = -1; + int best_quality = -1; + + std::vector head_block_numbers; + head_block_numbers.resize(connections.size()); + + std::vector qualities; + qualities.resize(connections.size()); + + for (size_t n = 0; n < connections.size(); n++) { + rpc_connection &conn = *connections[n]; + int quality = 0; + head_block_numbers[n] = std::numeric_limits::max(); + + // ping n'th node + if (debug_rpc_calls) + ilog("### Ping ${sidechain} node #${n}, ${url}", ("sidechain", fc::reflector::to_string(sidechain))("n", n)("url", conn.get_url())); + fc::time_point t_sent = fc::time_point::now(); + uint64_t head_block_number = ping(conn); + fc::time_point t_received = fc::time_point::now(); + int t = (t_received - t_sent).count(); + + // evaluate n'th node reply quality and switch to it if it's better + if (head_block_number != std::numeric_limits::max()) { + if (simulate_connection_reselection) + t += rand() % 10; + FC_ASSERT(t != -1); + head_block_numbers[n] = head_block_number; + if (t < t_limit) + quality = t_limit - t; // the less time, the higher quality + + // look for the best quality + if (quality > best_quality) { + best_n = n; + best_quality = quality; + } + } + qualities[n] = quality; + } + + FC_ASSERT(best_n != -1 && best_quality != -1); + if (best_n != n_active_conn) { // if the best client is not the current one, ... + uint64_t active_head_block_number = head_block_numbers[n_active_conn]; + if ((active_head_block_number == std::numeric_limits::max() // ...and the current one has no known head block... + || head_block_numbers[best_n] >= active_head_block_number) // ...or the best client's head is more recent than the current, ... + && best_quality > qualities[n_active_conn] + quality_diff_threshold) { // ...and the new client's quality exceeds current more than by threshold + n_active_conn = best_n; // ...then select new one + if (debug_rpc_calls) + ilog("### Reselected ${sidechain} node to #${n}, ${url}", ("sidechain", fc::reflector::to_string(sidechain))("n", n_active_conn)("url", connections[n_active_conn]->get_url())); + } + } + + schedule_connection_selection(); +} + +rpc_connection &rpc_client::get_active_connection() const { + return *connections[n_active_conn]; +} + +std::string rpc_client::send_post_request(std::string method, std::string params, bool show_log) { + const std::lock_guard lock(conn_mutex); + return send_post_request(get_active_connection(), method, params, show_log); +} + +std::string rpc_client::send_post_request(rpc_connection &conn, std::string method, std::string params, bool show_log) { + return conn.send_post_request(method, params, show_log); +} + +rpc_client::~rpc_client() { + try { + if (connection_selection_task.valid()) + connection_selection_task.cancel_and_wait(__FUNCTION__); + } catch (fc::canceled_exception &) { + //Expected exception. Move along. + } catch (fc::exception &e) { + edump((e.to_detail_string())); + } +} + }} // namespace graphene::peerplays_sidechain diff --git a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/common/rpc_client.hpp b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/common/rpc_client.hpp index eb8eac0c1..bdec60d1c 100644 --- a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/common/rpc_client.hpp +++ b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/common/rpc_client.hpp @@ -3,44 +3,52 @@ #include #include +#include +#include + #include #include +#include + namespace graphene { namespace peerplays_sidechain { -struct rpc_reply { - uint16_t status; - std::string body; +class rpc_connection; + +struct rpc_credentials { + std::string url; + std::string user; + std::string password; }; class rpc_client { public: - rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls); + const sidechain_type sidechain; -protected: - std::string retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx); - std::string retrieve_value_from_reply(std::string reply_str, std::string value_path); - std::string send_post_request(std::string method, std::string params, bool show_log); + rpc_client(sidechain_type _sidechain, const std::vector &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection); + ~rpc_client(); - std::string url; - std::string user; - std::string password; +protected: bool debug_rpc_calls; + bool simulate_connection_reselection; + std::string send_post_request(std::string method, std::string params, bool show_log); - std::string protocol; - std::string host; - std::string port; - std::string target; - std::string authorization; + static std::string send_post_request(rpc_connection &conn, std::string method, std::string params, bool show_log); - uint32_t request_id; + static std::string retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx); + static std::string retrieve_value_from_reply(std::string reply_str, std::string value_path); private: - rpc_reply send_post_request(std::string body, bool show_log); + std::vector connections; + int n_active_conn; + fc::future connection_selection_task; + std::mutex conn_mutex; + + rpc_connection &get_active_connection() const; - boost::beast::net::io_context ioc; - boost::beast::net::ip::tcp::resolver resolver; - boost::asio::ip::basic_resolver_results results; + void select_connection(); + void schedule_connection_selection(); + virtual uint64_t ping(rpc_connection &conn) const = 0; }; }} // namespace graphene::peerplays_sidechain diff --git a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler.hpp b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler.hpp index 836a1f054..85dbc1f62 100644 --- a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler.hpp +++ b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler.hpp @@ -16,8 +16,10 @@ namespace graphene { namespace peerplays_sidechain { class sidechain_net_handler { +protected: + sidechain_net_handler(sidechain_type _sidechain, peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options); + public: - sidechain_net_handler(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options); virtual ~sidechain_net_handler(); sidechain_type get_sidechain() const; @@ -54,9 +56,9 @@ class sidechain_net_handler { virtual optional estimate_withdrawal_transaction_fee() const = 0; protected: + const sidechain_type sidechain; peerplays_sidechain_plugin &plugin; graphene::chain::database &database; - sidechain_type sidechain; bool debug_rpc_calls; bool use_bitcoind_client; diff --git a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp index bfb938058..96e8ec9c5 100644 --- a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp +++ b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp @@ -98,7 +98,7 @@ class bitcoin_client_base { class bitcoin_rpc_client : public bitcoin_client_base, public rpc_client { public: public: - bitcoin_rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls); + bitcoin_rpc_client(const std::vector &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection); uint64_t estimatesmartfee(uint16_t conf_target = 1); std::vector getblock(const block_data &block, int32_t verbosity = 2); @@ -113,6 +113,8 @@ class bitcoin_rpc_client : public bitcoin_client_base, public rpc_client { std::string walletlock(); bool walletpassphrase(const std::string &passphrase, uint32_t timeout = 60); + virtual uint64_t ping(rpc_connection &conn) const override; + private: std::string ip; std::string user; @@ -213,14 +215,11 @@ class sidechain_net_handler_bitcoin : public sidechain_net_handler { virtual optional estimate_withdrawal_transaction_fee() const override; private: - std::string bitcoin_node_ip; + std::vector _rpc_credentials; std::string libbitcoin_server_ip; uint32_t libbitcoin_block_zmq_port; uint32_t libbitcoin_trx_zmq_port; uint32_t bitcoin_node_zmq_port; - uint32_t rpc_port; - std::string rpc_user; - std::string rpc_password; std::string wallet_name; std::string wallet_password; diff --git a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_ethereum.hpp b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_ethereum.hpp index 0cd22f96e..fcabccf56 100644 --- a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_ethereum.hpp +++ b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_ethereum.hpp @@ -14,7 +14,7 @@ namespace graphene { namespace peerplays_sidechain { class ethereum_rpc_client : public rpc_client { public: - ethereum_rpc_client(const std::string &url, const std::string &user_name, const std::string &password, bool debug_rpc_calls); + ethereum_rpc_client(const std::vector &credentials, bool debug_rpc_calls, bool simulate_connection_reselection); std::string eth_blockNumber(); std::string eth_get_block_by_number(std::string block_number, bool full_block); @@ -36,6 +36,8 @@ class ethereum_rpc_client : public rpc_client { std::string eth_send_raw_transaction(const std::string ¶ms); std::string eth_get_transaction_receipt(const std::string ¶ms); std::string eth_get_transaction_by_hash(const std::string ¶ms); + + virtual uint64_t ping(rpc_connection &conn) const override; }; class sidechain_net_handler_ethereum : public sidechain_net_handler { @@ -54,13 +56,9 @@ class sidechain_net_handler_ethereum : public sidechain_net_handler { virtual optional estimate_withdrawal_transaction_fee() const override; private: - using bimap_type = boost::bimap; - -private: - std::string rpc_url; - std::string rpc_user; - std::string rpc_password; + std::vector _rpc_credentials; std::string wallet_contract_address; + using bimap_type = boost::bimap; bimap_type erc20_addresses; ethereum_rpc_client *rpc_client; diff --git a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_hive.hpp b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_hive.hpp index 440a2520f..baa95bbfe 100644 --- a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_hive.hpp +++ b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_hive.hpp @@ -13,7 +13,7 @@ namespace graphene { namespace peerplays_sidechain { class hive_rpc_client : public rpc_client { public: - hive_rpc_client(const std::string &url, const std::string &user_name, const std::string &password, bool debug_rpc_calls); + hive_rpc_client(const std::vector &credentials, bool debug_rpc_calls, bool simulate_connection_reselection); std::string account_history_api_get_transaction(std::string transaction_id); std::string block_api_get_block(uint32_t block_number); @@ -30,6 +30,8 @@ class hive_rpc_client : public rpc_client { std::string get_head_block_time(); std::string get_is_test_net(); std::string get_last_irreversible_block_num(); + + virtual uint64_t ping(rpc_connection &conn) const override; }; class sidechain_net_handler_hive : public sidechain_net_handler { @@ -48,9 +50,8 @@ class sidechain_net_handler_hive : public sidechain_net_handler { virtual optional estimate_withdrawal_transaction_fee() const override; private: - std::string rpc_url; - std::string rpc_user; - std::string rpc_password; + std::vector _rpc_credentials; + std::string wallet_account_name; hive_rpc_client *rpc_client; diff --git a/libraries/plugins/peerplays_sidechain/peerplays_sidechain_plugin.cpp b/libraries/plugins/peerplays_sidechain/peerplays_sidechain_plugin.cpp index e4a2b8ea2..6f6747dbe 100644 --- a/libraries/plugins/peerplays_sidechain/peerplays_sidechain_plugin.cpp +++ b/libraries/plugins/peerplays_sidechain/peerplays_sidechain_plugin.cpp @@ -175,13 +175,14 @@ void peerplays_sidechain_plugin_impl::plugin_set_program_options( cli.add_options()("sidechain-retry-threshold", bpo::value()->default_value(150), "Sidechain retry throttling threshold"); cli.add_options()("debug-rpc-calls", bpo::value()->default_value(false), "Outputs RPC calls to console"); + cli.add_options()("simulate-rpc-connection-reselection", bpo::value()->default_value(false), "Simulate RPC connection reselection by altering their response times by a random value"); cli.add_options()("bitcoin-sidechain-enabled", bpo::value()->default_value(false), "Bitcoin sidechain handler enabled"); + cli.add_options()("bitcoin-node-ip", bpo::value>()->composing()->multitoken()->DEFAULT_VALUE_VECTOR("127.0.0.1"), "IP address of Bitcoin node"); cli.add_options()("use-bitcoind-client", bpo::value()->default_value(false), "Use bitcoind client instead of libbitcoin client"); cli.add_options()("libbitcoin-server-ip", bpo::value()->default_value("127.0.0.1"), "Libbitcoin server IP address"); cli.add_options()("libbitcoin-server-block-zmq-port", bpo::value()->default_value(9093), "Block ZMQ port of libbitcoin server"); cli.add_options()("libbitcoin-server-trx-zmq-port", bpo::value()->default_value(9094), "Trx ZMQ port of libbitcoin server"); - cli.add_options()("bitcoin-node-ip", bpo::value()->default_value("127.0.0.1"), "IP address of Bitcoin node"); cli.add_options()("bitcoin-node-zmq-port", bpo::value()->default_value(11111), "ZMQ port of Bitcoin node"); cli.add_options()("bitcoin-node-rpc-port", bpo::value()->default_value(8332), "RPC port of Bitcoin node"); cli.add_options()("bitcoin-node-rpc-user", bpo::value()->default_value("1"), "Bitcoin RPC user"); @@ -192,7 +193,7 @@ void peerplays_sidechain_plugin_impl::plugin_set_program_options( "Tuple of [Bitcoin public key, Bitcoin private key] (may specify multiple times)"); cli.add_options()("ethereum-sidechain-enabled", bpo::value()->default_value(false), "Ethereum sidechain handler enabled"); - cli.add_options()("ethereum-node-rpc-url", bpo::value()->default_value("127.0.0.1:8545"), "Ethereum node RPC URL [http[s]://]host[:port]"); + cli.add_options()("ethereum-node-rpc-url", bpo::value>()->composing()->multitoken()->DEFAULT_VALUE_VECTOR("127.0.0.1:8545"), "Ethereum node RPC URL [http[s]://]host[:port]"); cli.add_options()("ethereum-node-rpc-user", bpo::value(), "Ethereum RPC user"); cli.add_options()("ethereum-node-rpc-password", bpo::value(), "Ethereum RPC password"); cli.add_options()("ethereum-wallet-contract-address", bpo::value(), "Ethereum wallet contract address"); @@ -202,7 +203,7 @@ void peerplays_sidechain_plugin_impl::plugin_set_program_options( "Tuple of [Ethereum public key, Ethereum private key] (may specify multiple times)"); cli.add_options()("hive-sidechain-enabled", bpo::value()->default_value(false), "Hive sidechain handler enabled"); - cli.add_options()("hive-node-rpc-url", bpo::value()->default_value("127.0.0.1:28090"), "Hive node RPC URL [http[s]://]host[:port]"); + cli.add_options()("hive-node-rpc-url", bpo::value>()->composing()->multitoken()->DEFAULT_VALUE_VECTOR("127.0.0.1:28090"), "Hive node RPC URL [http[s]://]host[:port]"); cli.add_options()("hive-node-rpc-user", bpo::value(), "Hive node RPC user"); cli.add_options()("hive-node-rpc-password", bpo::value(), "Hive node RPC password"); cli.add_options()("hive-wallet-account-name", bpo::value(), "Hive wallet account name"); @@ -291,6 +292,9 @@ void peerplays_sidechain_plugin_impl::plugin_initialize(const boost::program_opt if (sidechain_enabled_peerplays && !config_ready_peerplays) { wlog("Haven't set up Peerplays sidechain parameters"); } + + if (options.at("simulate-rpc-connection-reselection").as()) + ilog("### RPC connection reselection will be simulated"); } void peerplays_sidechain_plugin_impl::plugin_startup() { diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler.cpp index a4fe12528..f762da493 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler.cpp @@ -9,7 +9,8 @@ namespace graphene { namespace peerplays_sidechain { -sidechain_net_handler::sidechain_net_handler(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) : +sidechain_net_handler::sidechain_net_handler(sidechain_type _sidechain, peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) : + sidechain(_sidechain), plugin(_plugin), database(_plugin.database()) { diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp index c528d8a45..76e23ab03 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp @@ -25,8 +25,8 @@ namespace graphene { namespace peerplays_sidechain { // ============================================================================= -bitcoin_rpc_client::bitcoin_rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls) : - rpc_client(_url, _user, _password, _debug_rpc_calls) { +bitcoin_rpc_client::bitcoin_rpc_client(const std::vector &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection) : + rpc_client(sidechain_type::bitcoin, _credentials, _debug_rpc_calls, _simulate_connection_reselection) { } uint64_t bitcoin_rpc_client::estimatesmartfee(uint16_t conf_target) { @@ -498,6 +498,13 @@ std::string bitcoin_libbitcoin_client::sendrawtransaction(const std::string &tx_ return res; } +uint64_t bitcoin_rpc_client::ping(rpc_connection &conn) const { + std::string str = send_post_request(conn, "getblockcount", "[]", debug_rpc_calls); + if (str.length() > 0) + return std::stoll(str); + return std::numeric_limits::max(); +} + // ============================================================================= zmq_listener::zmq_listener(std::string _ip, uint32_t _zmq_block_port, uint32_t _zmq_trx_port) : @@ -655,13 +662,19 @@ void zmq_listener_libbitcoin::handle_block() { // ============================================================================= sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) : - sidechain_net_handler(_plugin, options) { - sidechain = sidechain_type::bitcoin; + sidechain_net_handler(sidechain_type::bitcoin, _plugin, options) { if (options.count("debug-rpc-calls")) { debug_rpc_calls = options.at("debug-rpc-calls").as(); } + bool simulate_connection_reselection = options.at("simulate-rpc-connection-reselection").as(); + std::vector ips = options.at("bitcoin-node-ip").as>(); + bitcoin_node_zmq_port = options.at("bitcoin-node-zmq-port").as(); + uint32_t rpc_port = options.at("bitcoin-node-rpc-port").as(); + std::string rpc_user = options.at("bitcoin-node-rpc-user").as(); + std::string rpc_password = options.at("bitcoin-node-rpc-password").as(); + if (options.count("use-bitcoind-client")) { use_bitcoind_client = options.at("use-bitcoind-client").as(); } @@ -670,11 +683,6 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain libbitcoin_block_zmq_port = options.at("libbitcoin-server-block-zmq-port").as(); libbitcoin_trx_zmq_port = options.at("libbitcoin-server-trx-zmq-port").as(); - bitcoin_node_ip = options.at("bitcoin-node-ip").as(); - bitcoin_node_zmq_port = options.at("bitcoin-node-zmq-port").as(); - rpc_port = options.at("bitcoin-node-rpc-port").as(); - rpc_user = options.at("bitcoin-node-rpc-user").as(); - rpc_password = options.at("bitcoin-node-rpc-password").as(); wallet_name = ""; if (options.count("bitcoin-wallet-name")) { wallet_name = options.at("bitcoin-wallet-name").as(); @@ -697,17 +705,27 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain } if (use_bitcoind_client) { - std::string url = bitcoin_node_ip + ":" + std::to_string(rpc_port); - if (!wallet_name.empty()) { - url = url + "/wallet/" + wallet_name; + + for (size_t i = 0; i < ips.size(); i++) { + std::string ip = ips[i]; + std::string url = ip + ":" + std::to_string(rpc_port); + if (!wallet_name.empty()) { + url = url + "/wallet/" + wallet_name; + } + rpc_credentials creds; + creds.url = url; + creds.user = rpc_user; + creds.password = rpc_password; + _rpc_credentials.push_back(creds); } - bitcoin_client = std::unique_ptr(new bitcoin_rpc_client(url, rpc_user, rpc_password, debug_rpc_calls)); + FC_ASSERT(!_rpc_credentials.empty()); + + bitcoin_client = std::unique_ptr(new bitcoin_rpc_client(_rpc_credentials, debug_rpc_calls, simulate_connection_reselection)); if (!wallet_name.empty()) { bitcoin_client->loadwallet(wallet_name); } - listener = std::unique_ptr(new zmq_listener(bitcoin_node_ip, bitcoin_node_zmq_port)); - + listener = std::unique_ptr(new zmq_listener(ips[0], bitcoin_node_zmq_port)); } else { bitcoin_client = std::unique_ptr(new bitcoin_libbitcoin_client(libbitcoin_server_ip)); @@ -727,7 +745,6 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain bitcoin_client->getnetworkinfo(); - listener->start(); listener->block_event_received.connect([this](const block_data &block_event_data) { std::thread(&sidechain_net_handler_bitcoin::block_handle_event, this, block_event_data).detach(); }); @@ -736,6 +753,8 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain std::thread(&sidechain_net_handler_bitcoin::trx_handle_event, this, trx_event_data).detach(); }); + listener->start(); + database.changed_objects.connect([this](const vector &ids, const flat_set &accounts) { on_changed_objects(ids, accounts); }); diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_ethereum.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_ethereum.cpp index bb6a9821c..c4db3266b 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_ethereum.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_ethereum.cpp @@ -25,8 +25,8 @@ namespace graphene { namespace peerplays_sidechain { -ethereum_rpc_client::ethereum_rpc_client(const std::string &url, const std::string &user_name, const std::string &password, bool debug_rpc_calls) : - rpc_client(url, user_name, password, debug_rpc_calls) { +ethereum_rpc_client::ethereum_rpc_client(const std::vector &credentials, bool debug_rpc_calls, bool simulate_connection_reselection) : + rpc_client(sidechain_type::ethereum, credentials, debug_rpc_calls, simulate_connection_reselection) { } std::string ethereum_rpc_client::eth_blockNumber() { @@ -126,20 +126,29 @@ std::string ethereum_rpc_client::eth_get_transaction_by_hash(const std::string & return send_post_request("eth_getTransactionByHash", "[\"" + params + "\"]", debug_rpc_calls); } +uint64_t ethereum_rpc_client::ping(rpc_connection &conn) const { + std::string reply = send_post_request(conn, "eth_blockNumber", "", debug_rpc_calls); + if (!reply.empty()) + return ethereum::from_hex(retrieve_value_from_reply(reply, "")); + return std::numeric_limits::max(); +} + sidechain_net_handler_ethereum::sidechain_net_handler_ethereum(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) : - sidechain_net_handler(_plugin, options) { - sidechain = sidechain_type::ethereum; + sidechain_net_handler(sidechain_type::ethereum, _plugin, options) { if (options.count("debug-rpc-calls")) { debug_rpc_calls = options.at("debug-rpc-calls").as(); } + bool simulate_connection_reselection = options.at("simulate-rpc-connection-reselection").as(); - rpc_url = options.at("ethereum-node-rpc-url").as(); + std::vector rpc_urls = options.at("ethereum-node-rpc-url").as>(); + std::string rpc_user; if (options.count("ethereum-node-rpc-user")) { rpc_user = options.at("ethereum-node-rpc-user").as(); } else { rpc_user = ""; } + std::string rpc_password; if (options.count("ethereum-node-rpc-password")) { rpc_password = options.at("ethereum-node-rpc-password").as(); } else { @@ -175,18 +184,27 @@ sidechain_net_handler_ethereum::sidechain_net_handler_ethereum(peerplays_sidecha } } - rpc_client = new ethereum_rpc_client(rpc_url, rpc_user, rpc_password, debug_rpc_calls); + for (size_t i = 0; i < rpc_urls.size(); i++) { + rpc_credentials creds; + creds.url = rpc_urls[i]; + creds.user = rpc_user; + creds.password = rpc_password; + _rpc_credentials.push_back(creds); + } + FC_ASSERT(!_rpc_credentials.empty()); + + rpc_client = new ethereum_rpc_client(_rpc_credentials, debug_rpc_calls, simulate_connection_reselection); const std::string chain_id_str = rpc_client->get_chain_id(); if (chain_id_str.empty()) { - elog("No Ethereum node running at ${url}", ("url", rpc_url)); + elog("No Ethereum node running at ${url}", ("url", _rpc_credentials[0].url)); FC_ASSERT(false); } chain_id = std::stoll(chain_id_str); const std::string network_id_str = rpc_client->get_network_id(); if (network_id_str.empty()) { - elog("No Ethereum node running at ${url}", ("url", rpc_url)); + elog("No Ethereum node running at ${url}", ("url", _rpc_credentials[0].url)); FC_ASSERT(false); } network_id = std::stoll(network_id_str); diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_hive.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_hive.cpp index 6a54b7f57..eb2f332f8 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_hive.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_hive.cpp @@ -30,8 +30,8 @@ namespace graphene { namespace peerplays_sidechain { -hive_rpc_client::hive_rpc_client(const std::string &url, const std::string &user_name, const std::string &password, bool debug_rpc_calls) : - rpc_client(url, user_name, password, debug_rpc_calls) { +hive_rpc_client::hive_rpc_client(const std::vector &credentials, bool debug_rpc_calls, bool simulate_connection_reselection) : + rpc_client(sidechain_type::hive, credentials, debug_rpc_calls, simulate_connection_reselection) { } std::string hive_rpc_client::account_history_api_get_transaction(std::string transaction_id) { @@ -112,20 +112,34 @@ std::string hive_rpc_client::get_last_irreversible_block_num() { return retrieve_value_from_reply(reply_str, "last_irreversible_block_num"); } +uint64_t hive_rpc_client::ping(rpc_connection &conn) const { + const std::string reply = send_post_request(conn, "database_api.get_dynamic_global_properties", "", debug_rpc_calls); + if (!reply.empty()) { + std::stringstream ss(reply); + boost::property_tree::ptree json; + boost::property_tree::read_json(ss, json); + if (json.count("result")) + return json.get("result.head_block_number"); + } + return std::numeric_limits::max(); +} + sidechain_net_handler_hive::sidechain_net_handler_hive(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) : - sidechain_net_handler(_plugin, options) { - sidechain = sidechain_type::hive; + sidechain_net_handler(sidechain_type::hive, _plugin, options) { if (options.count("debug-rpc-calls")) { debug_rpc_calls = options.at("debug-rpc-calls").as(); } + bool simulate_connection_reselection = options.at("simulate-rpc-connection-reselection").as(); - rpc_url = options.at("hive-node-rpc-url").as(); + std::vector rpc_urls = options.at("hive-node-rpc-url").as>(); + std::string rpc_user; if (options.count("hive-rpc-user")) { rpc_user = options.at("hive-rpc-user").as(); } else { rpc_user = ""; } + std::string rpc_password; if (options.count("hive-rpc-password")) { rpc_password = options.at("hive-rpc-password").as(); } else { @@ -146,11 +160,20 @@ sidechain_net_handler_hive::sidechain_net_handler_hive(peerplays_sidechain_plugi } } - rpc_client = new hive_rpc_client(rpc_url, rpc_user, rpc_password, debug_rpc_calls); + for (size_t i = 0; i < rpc_urls.size(); i++) { + rpc_credentials creds; + creds.url = rpc_urls[i]; + creds.user = rpc_user; + creds.password = rpc_password; + _rpc_credentials.push_back(creds); + } + FC_ASSERT(!_rpc_credentials.empty()); + + rpc_client = new hive_rpc_client(_rpc_credentials, debug_rpc_calls, simulate_connection_reselection); const std::string chain_id_str = rpc_client->get_chain_id(); if (chain_id_str.empty()) { - elog("No Hive node running at ${url}", ("url", rpc_url)); + elog("No Hive node running at ${url}", ("url", _rpc_credentials[0].url)); FC_ASSERT(false); } chain_id = chain_id_type(chain_id_str); diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_peerplays.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_peerplays.cpp index 0f6d06ebc..edf92e808 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_peerplays.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_peerplays.cpp @@ -23,8 +23,7 @@ namespace graphene { namespace peerplays_sidechain { sidechain_net_handler_peerplays::sidechain_net_handler_peerplays(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) : - sidechain_net_handler(_plugin, options) { - sidechain = sidechain_type::peerplays; + sidechain_net_handler(sidechain_type::peerplays, _plugin, options) { //const auto &assets_by_symbol = database.get_index_type().indices().get(); //const auto get_asset_id = [&assets_by_symbol](const string &symbol) { // auto asset_itr = assets_by_symbol.find(symbol); From 54ff842db10f667f91e81fa17b1e66f44e6d8b6c Mon Sep 17 00:00:00 2001 From: Milo M Date: Mon, 6 Mar 2023 15:50:17 +0000 Subject: [PATCH 8/8] num_son no overwriting --- libraries/chain/account_evaluator.cpp | 118 ++++++++++++++++-- .../graphene/chain/protocol/account.hpp | 15 ++- tests/cli/son.cpp | 13 ++ 3 files changed, 132 insertions(+), 14 deletions(-) diff --git a/libraries/chain/account_evaluator.cpp b/libraries/chain/account_evaluator.cpp index 4ecb3307a..884bba2d0 100644 --- a/libraries/chain/account_evaluator.cpp +++ b/libraries/chain/account_evaluator.cpp @@ -53,7 +53,54 @@ void verify_authority_accounts( const database& db, const authority& a ) } } -void verify_account_votes( const database& db, const account_options& options ) +// Overwrites the num_son values from the origin to the destination for those sidechains which are found in the origin. +// Keeps the values of num_son for the sidechains which are found in the destination, but not in the origin. +// Returns false if an error is detected. +bool merge_num_sons( flat_map& destination, + const flat_map& origin, + fc::optional head_block_time = {}) +{ + const auto active_sidechains = head_block_time.valid() ? active_sidechain_types(*head_block_time) : all_sidechain_types; + bool success = true; + + for (const auto &ns : origin) + { + destination[ns.first] = ns.second; + if (active_sidechains.find(ns.first) == active_sidechains.end()) + { + success = false; + } + } + + return success; +} + +flat_map count_SON_votes_per_sidechain( const flat_set& votes ) +{ + flat_map SON_votes_per_sidechain = account_options::ext::empty_num_son(); + + for (const auto &vote : votes) + { + switch (vote.type()) + { + case vote_id_type::son_bitcoin: + SON_votes_per_sidechain[sidechain_type::bitcoin]++; + break; + case vote_id_type::son_hive: + SON_votes_per_sidechain[sidechain_type::hive]++; + break; + case vote_id_type::son_ethereum: + SON_votes_per_sidechain[sidechain_type::ethereum]++; + break; + default: + break; + } + } + + return SON_votes_per_sidechain; +} + +void verify_account_votes( const database& db, const account_options& options, fc::optional account = {} ) { // ensure account's votes satisfy requirements // NB only the part of vote checking that requires chain state is here, @@ -69,14 +116,40 @@ void verify_account_votes( const database& db, const account_options& options ) FC_ASSERT( options.num_committee <= chain_params.maximum_committee_count, "Voted for more committee members than currently allowed (${c})", ("c", chain_params.maximum_committee_count) ); FC_ASSERT( chain_params.extensions.value.maximum_son_count.valid() , "Invalid maximum son count" ); + + flat_map merged_num_sons = account_options::ext::empty_num_son(); + + // Merge with existing account if exists + if ( account.valid() && account->options.extensions.value.num_son.valid()) + { + merge_num_sons( merged_num_sons, *account->options.extensions.value.num_son, db.head_block_time() ); + } + + // Apply update operation on top if ( options.extensions.value.num_son.valid() ) { - for(const auto& num_sons : *options.extensions.value.num_son) - { - FC_ASSERT( num_sons.second <= *chain_params.extensions.value.maximum_son_count, - "Voted for more sons than currently allowed (${c})", ("c", *chain_params.extensions.value.maximum_son_count) ); - } + merge_num_sons( merged_num_sons, *options.extensions.value.num_son, db.head_block_time() ); + } + + for(const auto& num_sons : merged_num_sons) + { + FC_ASSERT( num_sons.second <= *chain_params.extensions.value.maximum_son_count, + "Voted for more sons than currently allowed (${c})", ("c", *chain_params.extensions.value.maximum_son_count) ); } + + // Count the votes for SONs and confirm that the account did not vote for less SONs than num_son + flat_map SON_votes_per_sidechain = count_SON_votes_per_sidechain(options.votes); + + for (const auto& number_of_votes : SON_votes_per_sidechain) + { + // Number of votes of account_options are also checked in account_options::do_evaluate, + // but there we are checking the value before merging num_sons, so the values should be checked again + const auto sidechain = number_of_votes.first; + FC_ASSERT( number_of_votes.second >= merged_num_sons[sidechain], + "Voted for less sons than specified in num_son (votes ${v} < num_son ${ns}) for sidechain ${s}", + ("v", number_of_votes.second) ("ns", merged_num_sons[sidechain]) ("s", sidechain) ); + } + FC_ASSERT( db.find_object(options.voting_account), "Invalid proxy account specified." ); uint32_t max_vote_id = gpo.next_available_vote_id; @@ -191,9 +264,10 @@ object_id_type account_create_evaluator::do_apply( const account_create_operatio obj.active = o.active; obj.options = o.options; - if (!obj.options.extensions.value.num_son.valid()) + obj.options.extensions.value.num_son = account_options::ext::empty_num_son(); + if ( o.options.extensions.value.num_son.valid() ) { - obj.options.extensions.value = account_options::ext(); + merge_num_sons( *obj.options.extensions.value.num_son, *o.options.extensions.value.num_son ); } obj.statistics = d.create([&obj](account_statistics_object& s){ @@ -295,7 +369,7 @@ void_result account_update_evaluator::do_evaluate( const account_update_operatio acnt = &o.account(d); if( o.new_options.valid() ) - verify_account_votes( d, *o.new_options ); + verify_account_votes( d, *o.new_options, *acnt ); return void_result(); } FC_CAPTURE_AND_RETHROW( (o) ) } @@ -334,7 +408,31 @@ void_result account_update_evaluator::do_apply( const account_update_operation& a.active = *o.active; a.top_n_control_flags = 0; } - if( o.new_options ) a.options = *o.new_options; + + // New num_son structure initialized to 0 + flat_map new_num_son = account_options::ext::empty_num_son(); + + // If num_son of existing object is valid, we should merge the existing data + if ( a.options.extensions.value.num_son.valid() ) + { + merge_num_sons( new_num_son, *a.options.extensions.value.num_son ); + } + + // If num_son of the operation are valid, they should merge the existing data + if ( o.new_options ) + { + const auto new_options = *o.new_options; + + if ( new_options.extensions.value.num_son.valid() ) + { + merge_num_sons( new_num_son, *new_options.extensions.value.num_son ); + } + + a.options = *o.new_options; + } + + a.options.extensions.value.num_son = new_num_son; + if( o.extensions.value.owner_special_authority.valid() ) { a.owner_special_authority = *(o.extensions.value.owner_special_authority); diff --git a/libraries/chain/include/graphene/chain/protocol/account.hpp b/libraries/chain/include/graphene/chain/protocol/account.hpp index c80748854..c46caac5e 100644 --- a/libraries/chain/include/graphene/chain/protocol/account.hpp +++ b/libraries/chain/include/graphene/chain/protocol/account.hpp @@ -36,21 +36,28 @@ namespace graphene { namespace chain { bool is_cheap_name( const string& n ); /// These are the fields which can be updated by the active authority. - struct account_options + struct account_options { struct ext { /// The number of active son members this account votes the blockchain should appoint /// Must not exceed the actual number of son members voted for in @ref votes - optional< flat_map > num_son = []{ + optional< flat_map > num_son; + + /// Returns and empty num_son map with all sidechains + static flat_map empty_num_son() + { flat_map num_son; - for(const auto& active_sidechain_type : all_sidechain_types){ + for(const auto& active_sidechain_type : all_sidechain_types) + { num_son[active_sidechain_type] = 0; } + return num_son; - }(); + } }; + /// The memo key is the key this account will typically use to encrypt/sign transaction memos and other non- /// validated account activities. This field is here to prevent confusion if the active authority has zero or /// multiple keys in it. diff --git a/tests/cli/son.cpp b/tests/cli/son.cpp index 4662c57c3..e37585578 100644 --- a/tests/cli/son.cpp +++ b/tests/cli/son.cpp @@ -740,6 +740,19 @@ BOOST_AUTO_TEST_CASE( update_son_votes_test ) sidechain_type::ethereum, 0, true); BOOST_CHECK(generate_maintenance_block()); + // Vote for less SONs than num_son (2 votes, but num_son is 3) + accepted.clear(); + rejected.clear(); + accepted.push_back("son1account"); + accepted.push_back("son2account"); + BOOST_CHECK_THROW(update_votes_tx = con.wallet_api_ptr->update_son_votes("nathan", accepted, rejected, + sidechain_type::bitcoin, 3, true), fc::exception); + BOOST_CHECK_THROW(update_votes_tx = con.wallet_api_ptr->update_son_votes("nathan", accepted, rejected, + sidechain_type::hive, 3, true), fc::exception); + BOOST_CHECK_THROW(update_votes_tx = con.wallet_api_ptr->update_son_votes("nathan", accepted, rejected, + sidechain_type::ethereum, 3, true), fc::exception); + generate_block(); + // Verify the votes son1_obj = con.wallet_api_ptr->get_son("son1account"); son1_end_votes = son1_obj.total_votes;