Skip to content

Commit

Permalink
Merge pull request #853 from AntelopeIO/GH-525-p2p-resolve
Browse files Browse the repository at this point in the history
[1.0.2] P2P: Resolve on reconnect
  • Loading branch information
heifner authored Oct 2, 2024
2 parents 35a9bae + 6b2ea49 commit eb40e5e
Showing 1 changed file with 98 additions and 103 deletions.
201 changes: 98 additions & 103 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,6 @@ namespace eosio {
struct connection_detail {
std::string host;
connection_ptr c;
tcp::endpoint active_ip;
tcp::resolver::results_type ips;
};

using connection_details_index = multi_index_container<
Expand Down Expand Up @@ -404,6 +402,8 @@ namespace eosio {
boost::asio::steady_timer::duration conn_period,
uint32_t maximum_client_count);

std::chrono::milliseconds get_heartbeat_timeout() const { return heartbeat_timeout; }

uint32_t get_max_client_count() const { return max_client_count; }

fc::microseconds get_connector_period() const;
Expand All @@ -421,8 +421,6 @@ namespace eosio {
void add(connection_ptr c);
string connect(const string& host, const string& p2p_address);
string resolve_and_connect(const string& host, const string& p2p_address);
void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint);
void connect(const connection_ptr& c);
string disconnect(const string& host);
void close_all();

Expand Down Expand Up @@ -1015,7 +1013,7 @@ namespace eosio {

bool populate_handshake( handshake_message& hello ) const;

bool reconnect();
bool resolve_and_connect();
void connect( const tcp::resolver::results_type& endpoints );
void start_read_message();

Expand Down Expand Up @@ -1192,16 +1190,21 @@ namespace eosio {
};


std::tuple<std::string, std::string, std::string> split_host_port_type(const std::string& peer_add) {
std::tuple<std::string, std::string, std::string> split_host_port_type(const std::string& peer_add, bool incoming) {
// host:port:[<trx>|<blk>]
if (peer_add.empty()) return {};

string::size_type p = peer_add[0] == '[' ? peer_add.find(']') : 0;
if (p == string::npos) {
fc_wlog( logger, "Invalid peer address: ${peer}", ("peer", peer_add) );
string::size_type colon = p != string::npos ? peer_add.find(':', p) : string::npos;
if (colon == std::string::npos || colon == 0) {
// if incoming then not an error this peer can do anything about
if (incoming) {
fc_dlog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_add) );
} else {
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_add) );
}
return {};
}
string::size_type colon = peer_add.find(':', p);
string::size_type colon2 = peer_add.find(':', colon + 1);
string::size_type end = colon2 == string::npos
? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex
Expand Down Expand Up @@ -1278,8 +1281,8 @@ namespace eosio {
last_handshake_sent(),
p2p_address( endpoint )
{
set_connection_type( peer_address() );
my_impl->mark_bp_connection(this);
update_endpoints();
fc_ilog( logger, "created connection - ${c} to ${n}", ("c", connection_id)("n", endpoint) );
}

Expand All @@ -1294,7 +1297,6 @@ namespace eosio {
last_handshake_recv(),
last_handshake_sent()
{
update_endpoints();
fc_dlog( logger, "new connection - ${c} object created for peer ${address}:${port} from listener ${addr}",
("c", connection_id)("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) );
}
Expand Down Expand Up @@ -1327,7 +1329,7 @@ namespace eosio {

// called from connection strand
void connection::set_connection_type( const std::string& peer_add ) {
auto [host, port, type] = split_host_port_type(peer_add);
auto [host, port, type] = split_host_port_type(peer_add, false);
if( type.empty() ) {
fc_dlog( logger, "Setting connection - ${c} type for: ${peer} to both transactions and blocks", ("c", connection_id)("peer", peer_add) );
connection_type = both;
Expand Down Expand Up @@ -1388,6 +1390,7 @@ namespace eosio {
bool connection::start_session() {
verify_strand_in_this_thread( strand, __func__, __LINE__ );

update_endpoints();
boost::asio::ip::tcp::no_delay nodelay( true );
boost::system::error_code ec;
socket->set_option( nodelay, ec );
Expand Down Expand Up @@ -2810,31 +2813,6 @@ namespace eosio {

//------------------------------------------------------------------------

bool connection::reconnect() {
switch ( no_retry ) {
case no_reason:
case wrong_version:
case benign_other:
case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
break;
default:
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
}
if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
}
}
connection_ptr c = shared_from_this();
strand.post([c]() {
my_impl->connections.connect(c);
});
return true;
}

// called from connection strand
void connection::connect( const tcp::resolver::results_type& endpoints ) {
set_state(connection_state::connecting);
Expand All @@ -2844,7 +2822,6 @@ namespace eosio {
boost::asio::bind_executor( strand,
[c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
if( !err && socket->is_open() && socket == c->socket ) {
my_impl->connections.update_connection_endpoint(c, endpoint);
c->update_endpoints(endpoint);
if( c->start_session() ) {
c->send_handshake();
Expand Down Expand Up @@ -2894,7 +2871,7 @@ namespace eosio {
fc_ilog(logger, "Accepted new connection: " + paddr_str);

connections.any_of_supplied_peers([&listen_address, &paddr_str, &paddr_desc, &limit](const string& peer_addr) {
auto [host, port, type] = split_host_port_type(peer_addr);
auto [host, port, type] = split_host_port_type(peer_addr, false);
if (host == paddr_str) {
if (limit > 0) {
fc_dlog(logger, "Connection inbound to ${la} from ${a} is a configured p2p-peer-address and will not be throttled", ("la", listen_address)("a", paddr_desc));
Expand Down Expand Up @@ -3390,9 +3367,9 @@ namespace eosio {
}

if( incoming() ) {
auto [host, port, type] = split_host_port_type(msg.p2p_address);
auto [host, port, type] = split_host_port_type(msg.p2p_address, true);
if (host.size())
set_connection_type( msg.p2p_address );
set_connection_type( msg.p2p_address);

peer_dlog( this, "checking for duplicate" );
auto is_duplicate = [&](const connection_ptr& check) {
Expand Down Expand Up @@ -4607,7 +4584,7 @@ namespace eosio {
//----------------------------------------------------------------------------

size_t connections_manager::number_connections() const {
std::lock_guard g(connections_mtx);
std::shared_lock g(connections_mtx);
return connections.size();
}

Expand Down Expand Up @@ -4636,8 +4613,9 @@ namespace eosio {
update_p2p_connection_metrics = std::move(fun);
}

// can be called from any thread
void connections_manager::connect_supplied_peers(const string& p2p_address) {
std::unique_lock g(connections_mtx);
std::shared_lock g(connections_mtx);
chain::flat_set<string> peers = supplied_peers;
g.unlock();
for (const auto& peer : peers) {
Expand All @@ -4647,12 +4625,9 @@ namespace eosio {

void connections_manager::add( connection_ptr c ) {
std::lock_guard g( connections_mtx );
boost::system::error_code ec;
auto endpoint = c->socket->remote_endpoint(ec);
connections.insert( connection_detail{
.host = c->peer_address(),
.c = std::move(c),
.active_ip = endpoint} );
.c = std::move(c)} );
}

// called by API
Expand All @@ -4664,62 +4639,72 @@ namespace eosio {
}

string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) {
string::size_type colon = peer_address.find(':');
if (colon == std::string::npos || colon == 0) {
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address) );
auto [host, port, type] = split_host_port_type(peer_address, false);
if (host.empty()) {
return "invalid peer address";
}

std::lock_guard g( connections_mtx );
if( find_connection_i( peer_address ) )
return "already connected";

auto [host, port, type] = split_host_port_type(peer_address);

auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );

resolver->async_resolve(host, port,
[resolver, host = host, port = port, peer_address = peer_address, listen_address = listen_address, this]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
c->set_heartbeat_timeout( heartbeat_timeout );
std::lock_guard g( connections_mtx );
auto [it, inserted] = connections.emplace( connection_detail{
.host = peer_address,
.c = std::move(c),
.ips = results
});
if( !err ) {
it->c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
it->c->set_state(connection::connection_state::closed);
++(it->c->consecutive_immediate_connection_close);
}
} );
{
std::shared_lock g( connections_mtx );
if( find_connection_i( peer_address ) )
return "already connected";
}

return "added connection";
}
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
if (c->resolve_and_connect()) {
add(std::move(c));

void connections_manager::update_connection_endpoint(connection_ptr c,
const tcp::endpoint& endpoint) {
std::unique_lock g( connections_mtx );
auto& index = connections.get<by_connection>();
const auto& it = index.find(c);
if( it != index.end() ) {
index.modify(it, [endpoint](connection_detail& cd) {
cd.active_ip = endpoint;
});
return "added connection";
}

return "connection failed";
}

void connections_manager::connect(const connection_ptr& c) {
std::lock_guard g( connections_mtx );
const auto& index = connections.get<by_connection>();
const auto& it = index.find(c);
if( it != index.end() ) {
it->c->connect( it->ips );
// called from any thread
bool connection::resolve_and_connect() {
switch ( no_retry ) {
case no_reason:
case wrong_version:
case benign_other:
case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
break;
default:
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
}

auto [host, port, type] = split_host_port_type(peer_address(), false);
if (host.empty())
return false;

connection_ptr c = shared_from_this();

if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
}
}

strand.post([c, host, port]() {
auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );
resolver->async_resolve(host, port,
[resolver, c, host, port]
( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
c->set_heartbeat_timeout( my_impl->connections.get_heartbeat_timeout() );
if( !err ) {
c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
c->set_state(connection::connection_state::closed);
++c->consecutive_immediate_connection_close;
}
} );
} );

return true;
}

// called by API
Expand Down Expand Up @@ -4748,21 +4733,31 @@ namespace eosio {
}

std::optional<connection_status> connections_manager::status( const string& host )const {
std::shared_lock g( connections_mtx );
auto con = find_connection_i( host );
connection_ptr con;
{
std::shared_lock g( connections_mtx );
con = find_connection_i( host );
}
if( con ) {
return con->get_status();
}
return {};
}

vector<connection_status> connections_manager::connection_statuses()const {
vector<connection_ptr> conns;
vector<connection_status> result;
std::shared_lock g( connections_mtx );
auto& index = connections.get<by_connection>();
result.reserve( index.size() );
for( const connection_detail& cd : index ) {
result.emplace_back( cd.c->get_status() );
{
std::shared_lock g( connections_mtx );
auto& index = connections.get<by_connection>();
result.reserve( index.size() );
conns.reserve( index.size() );
for( const connection_detail& cd : index ) {
conns.emplace_back( cd.c );
}
}
for (const auto& c : conns) {
result.push_back( c->get_status() );
}
return result;
}
Expand Down Expand Up @@ -4824,7 +4819,7 @@ namespace eosio {
auto cleanup = [&num_peers, &num_rm, this](vector<connection_ptr>&& reconnecting,
vector<connection_ptr>&& removing) {
for( auto& c : reconnecting ) {
if (!c->reconnect()) {
if (!c->resolve_and_connect()) {
--num_peers;
++num_rm;
removing.push_back(c);
Expand Down Expand Up @@ -4890,7 +4885,7 @@ namespace eosio {
assert(update_p2p_connection_metrics);
auto from = from_connection.lock();
std::shared_lock g(connections_mtx);
auto& index = connections.get<by_connection>();
const auto& index = connections.get<by_connection>();
size_t num_clients = 0, num_peers = 0, num_bp_peers = 0;
net_plugin::p2p_per_connection_metrics per_connection(index.size());
for (auto it = index.begin(); it != index.end(); ++it) {
Expand Down

0 comments on commit eb40e5e

Please sign in to comment.