From 47fdf1eb09d37057e93f72e2e718683190a227cd Mon Sep 17 00:00:00 2001 From: Milos Stankovic <82043364+morph-dev@users.noreply.github.com> Date: Mon, 28 Oct 2024 23:42:22 +0200 Subject: [PATCH] fix: census should update ENR on liveness check (#1560) --- portal-bridge/src/census/network.rs | 93 ++++++++++++++++++++++++----- portal-bridge/src/census/peers.rs | 27 ++++----- 2 files changed, 89 insertions(+), 31 deletions(-) diff --git a/portal-bridge/src/census/network.rs b/portal-bridge/src/census/network.rs index 6e743dfcd..1958a8c76 100644 --- a/portal-bridge/src/census/network.rs +++ b/portal-bridge/src/census/network.rs @@ -1,9 +1,11 @@ use std::time::Duration; -use anyhow::{anyhow, bail}; +use anyhow::{anyhow, bail, ensure}; use discv5::enr::NodeId; use ethportal_api::{ - generate_random_node_ids, jsonrpsee::http_client::HttpClient, types::network::Subnetwork, + generate_random_node_ids, + jsonrpsee::http_client::HttpClient, + types::{distance::Distance, network::Subnetwork, portal::PongInfo}, BeaconNetworkApiClient, Enr, HistoryNetworkApiClient, StateNetworkApiClient, }; use futures::{future::JoinAll, StreamExt}; @@ -232,26 +234,82 @@ impl Network { } } - if self.ping(enr).await { - LivenessResult::Pass + let Ok(pong_info) = self.ping(&enr).await else { + self.peers.record_failed_liveness_check(&enr); + return LivenessResult::Fail; + }; + + let radius = Distance::from(pong_info.data_radius); + + // If ENR seq is not the latest one, fetch fresh ENR + let enr = if enr.seq() < pong_info.enr_seq { + let Ok(enr) = self.fetch_enr(&enr).await else { + self.peers.record_failed_liveness_check(&enr); + return LivenessResult::Fail; + }; + enr } else { - LivenessResult::Fail - } + if enr.seq() > pong_info.enr_seq { + warn!( + subnetwork = %self.subnetwork, + "liveness_check: enr seq from pong ({}) is older than the one we know: {enr}", + pong_info.enr_seq + ); + } + enr + }; + + self.peers.record_successful_liveness_check(enr, radius); + LivenessResult::Pass } - async fn ping(&self, enr: Enr) -> bool { - if !self.is_eligible(&enr) { - return false; - } + async fn ping(&self, enr: &Enr) -> anyhow::Result { + ensure!(self.is_eligible(enr), "ping: peer is filtered out"); - let future_response = match self.subnetwork { + match self.subnetwork { Subnetwork::History => HistoryNetworkApiClient::ping(&self.client, enr.clone()), Subnetwork::State => StateNetworkApiClient::ping(&self.client, enr.clone()), Subnetwork::Beacon => BeaconNetworkApiClient::ping(&self.client, enr.clone()), - _ => unreachable!("Unsupported subnetwork: {}", self.subnetwork), - }; - let response = future_response.await.map_err(|e| anyhow!(e)); - self.peers.process_ping_response(enr, response) + _ => unreachable!("ping: unsupported subnetwork: {}", self.subnetwork), + } + .await + .map_err(|err| anyhow!(err)) + } + + /// Fetches node's ENR. + /// + /// Should be used when ENR sequence returned by Ping request is higher than the one we know. + async fn fetch_enr(&self, enr: &Enr) -> anyhow::Result { + ensure!(self.is_eligible(enr), "fetch_enr: peer is filtered out"); + + let enrs = self.find_nodes(enr, /* distances= */ vec![0]).await?; + if enrs.len() != 1 { + warn!( + subnetwork = %self.subnetwork, + "fetch_enr: expected 1 enr, received: {}", + enrs.len() + ); + } + enrs.into_iter() + .find(|response_enr| response_enr.node_id() == enr.node_id()) + .ok_or_else(|| anyhow!("fetch_enr: response doesn't contain requested NodeId")) + } + + async fn find_nodes(&self, enr: &Enr, distances: Vec) -> anyhow::Result> { + match self.subnetwork { + Subnetwork::History => { + HistoryNetworkApiClient::find_nodes(&self.client, enr.clone(), distances) + } + Subnetwork::State => { + StateNetworkApiClient::find_nodes(&self.client, enr.clone(), distances) + } + Subnetwork::Beacon => { + BeaconNetworkApiClient::find_nodes(&self.client, enr.clone(), distances) + } + _ => unreachable!("find_nodes: unsupported subnetwork: {}", self.subnetwork), + } + .await + .map_err(|err| anyhow!(err)) } async fn recursive_find_nodes(&self, node_id: NodeId) -> anyhow::Result> { @@ -265,7 +323,10 @@ impl Network { Subnetwork::Beacon => { BeaconNetworkApiClient::recursive_find_nodes(&self.client, node_id).await? } - _ => unreachable!("Unsupported subnetwork: {}", self.subnetwork), + _ => unreachable!( + "recursive_find_nodes: unsupported subnetwork: {}", + self.subnetwork + ), }; Ok(enrs .into_iter() diff --git a/portal-bridge/src/census/peers.rs b/portal-bridge/src/census/peers.rs index f13b56010..44d4f1bcc 100644 --- a/portal-bridge/src/census/peers.rs +++ b/portal-bridge/src/census/peers.rs @@ -7,15 +7,13 @@ use std::{ use delay_map::HashMapDelay; use ethportal_api::{ - types::{ - distance::{Distance, Metric, XorMetric}, - portal::PongInfo, - }, + types::distance::{Distance, Metric, XorMetric}, Enr, }; use futures::Stream; use rand::seq::IteratorRandom; use tokio::time::Instant; +use tracing::warn; /// How frequently liveness check should be done. /// @@ -55,18 +53,17 @@ impl Peers { .deadline(&enr.node_id().raw()) } - pub fn process_ping_response(&self, enr: Enr, ping_response: anyhow::Result) -> bool { + pub fn record_successful_liveness_check(&self, enr: Enr, radius: Distance) { + self.peers + .write() + .expect("to get peers lock") + .insert(enr.node_id().raw(), (enr, radius)); + } + + pub fn record_failed_liveness_check(&self, enr: &Enr) { let mut peers = self.peers.write().expect("to get peers lock"); - match ping_response { - Ok(pong_info) => { - let data_radius = Distance::from(pong_info.data_radius); - peers.insert(enr.node_id().raw(), (enr, data_radius)); - true - } - Err(_) => { - peers.remove(&enr.node_id().raw()); - false - } + if peers.remove(&enr.node_id().raw()).is_some() { + warn!("liveness check failed, peer removed: {}", enr.node_id()); } }