Skip to content

Commit

Permalink
fix: census should update ENR on liveness check (#1560)
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev authored Oct 28, 2024
1 parent 12da3d1 commit 47fdf1e
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 31 deletions.
93 changes: 77 additions & 16 deletions portal-bridge/src/census/network.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::time::Duration;

use anyhow::{anyhow, bail};
use anyhow::{anyhow, bail, ensure};
use discv5::enr::NodeId;
use ethportal_api::{
generate_random_node_ids, jsonrpsee::http_client::HttpClient, types::network::Subnetwork,
generate_random_node_ids,
jsonrpsee::http_client::HttpClient,
types::{distance::Distance, network::Subnetwork, portal::PongInfo},
BeaconNetworkApiClient, Enr, HistoryNetworkApiClient, StateNetworkApiClient,
};
use futures::{future::JoinAll, StreamExt};
Expand Down Expand Up @@ -232,26 +234,82 @@ impl Network {
}
}

if self.ping(enr).await {
LivenessResult::Pass
let Ok(pong_info) = self.ping(&enr).await else {
self.peers.record_failed_liveness_check(&enr);
return LivenessResult::Fail;
};

let radius = Distance::from(pong_info.data_radius);

// If ENR seq is not the latest one, fetch fresh ENR
let enr = if enr.seq() < pong_info.enr_seq {
let Ok(enr) = self.fetch_enr(&enr).await else {
self.peers.record_failed_liveness_check(&enr);
return LivenessResult::Fail;
};
enr
} else {
LivenessResult::Fail
}
if enr.seq() > pong_info.enr_seq {
warn!(
subnetwork = %self.subnetwork,
"liveness_check: enr seq from pong ({}) is older than the one we know: {enr}",
pong_info.enr_seq
);
}
enr
};

self.peers.record_successful_liveness_check(enr, radius);
LivenessResult::Pass
}

async fn ping(&self, enr: Enr) -> bool {
if !self.is_eligible(&enr) {
return false;
}
async fn ping(&self, enr: &Enr) -> anyhow::Result<PongInfo> {
ensure!(self.is_eligible(enr), "ping: peer is filtered out");

let future_response = match self.subnetwork {
match self.subnetwork {
Subnetwork::History => HistoryNetworkApiClient::ping(&self.client, enr.clone()),
Subnetwork::State => StateNetworkApiClient::ping(&self.client, enr.clone()),
Subnetwork::Beacon => BeaconNetworkApiClient::ping(&self.client, enr.clone()),
_ => unreachable!("Unsupported subnetwork: {}", self.subnetwork),
};
let response = future_response.await.map_err(|e| anyhow!(e));
self.peers.process_ping_response(enr, response)
_ => unreachable!("ping: unsupported subnetwork: {}", self.subnetwork),
}
.await
.map_err(|err| anyhow!(err))
}

/// Fetches node's ENR.
///
/// Should be used when ENR sequence returned by Ping request is higher than the one we know.
async fn fetch_enr(&self, enr: &Enr) -> anyhow::Result<Enr> {
ensure!(self.is_eligible(enr), "fetch_enr: peer is filtered out");

let enrs = self.find_nodes(enr, /* distances= */ vec![0]).await?;
if enrs.len() != 1 {
warn!(
subnetwork = %self.subnetwork,
"fetch_enr: expected 1 enr, received: {}",
enrs.len()
);
}
enrs.into_iter()
.find(|response_enr| response_enr.node_id() == enr.node_id())
.ok_or_else(|| anyhow!("fetch_enr: response doesn't contain requested NodeId"))
}

async fn find_nodes(&self, enr: &Enr, distances: Vec<u16>) -> anyhow::Result<Vec<Enr>> {
match self.subnetwork {
Subnetwork::History => {
HistoryNetworkApiClient::find_nodes(&self.client, enr.clone(), distances)
}
Subnetwork::State => {
StateNetworkApiClient::find_nodes(&self.client, enr.clone(), distances)
}
Subnetwork::Beacon => {
BeaconNetworkApiClient::find_nodes(&self.client, enr.clone(), distances)
}
_ => unreachable!("find_nodes: unsupported subnetwork: {}", self.subnetwork),
}
.await
.map_err(|err| anyhow!(err))
}

async fn recursive_find_nodes(&self, node_id: NodeId) -> anyhow::Result<Vec<Enr>> {
Expand All @@ -265,7 +323,10 @@ impl Network {
Subnetwork::Beacon => {
BeaconNetworkApiClient::recursive_find_nodes(&self.client, node_id).await?
}
_ => unreachable!("Unsupported subnetwork: {}", self.subnetwork),
_ => unreachable!(
"recursive_find_nodes: unsupported subnetwork: {}",
self.subnetwork
),
};
Ok(enrs
.into_iter()
Expand Down
27 changes: 12 additions & 15 deletions portal-bridge/src/census/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ use std::{

use delay_map::HashMapDelay;
use ethportal_api::{
types::{
distance::{Distance, Metric, XorMetric},
portal::PongInfo,
},
types::distance::{Distance, Metric, XorMetric},
Enr,
};
use futures::Stream;
use rand::seq::IteratorRandom;
use tokio::time::Instant;
use tracing::warn;

/// How frequently liveness check should be done.
///
Expand Down Expand Up @@ -55,18 +53,17 @@ impl Peers {
.deadline(&enr.node_id().raw())
}

pub fn process_ping_response(&self, enr: Enr, ping_response: anyhow::Result<PongInfo>) -> bool {
pub fn record_successful_liveness_check(&self, enr: Enr, radius: Distance) {
self.peers
.write()
.expect("to get peers lock")
.insert(enr.node_id().raw(), (enr, radius));
}

pub fn record_failed_liveness_check(&self, enr: &Enr) {
let mut peers = self.peers.write().expect("to get peers lock");
match ping_response {
Ok(pong_info) => {
let data_radius = Distance::from(pong_info.data_radius);
peers.insert(enr.node_id().raw(), (enr, data_radius));
true
}
Err(_) => {
peers.remove(&enr.node_id().raw());
false
}
if peers.remove(&enr.node_id().raw()).is_some() {
warn!("liveness check failed, peer removed: {}", enr.node_id());
}
}

Expand Down

0 comments on commit 47fdf1e

Please sign in to comment.