From d281f3283563cd33ab116b3b3d1ed257eb7554a2 Mon Sep 17 00:00:00 2001 From: Milos Stankovic <82043364+morph-dev@users.noreply.github.com> Date: Thu, 31 Oct 2024 11:51:28 +0200 Subject: [PATCH 1/2] feat(census): keep track of liveness checks --- portal-bridge/src/census/mod.rs | 2 + portal-bridge/src/census/network.rs | 86 ++++++++--------- portal-bridge/src/census/peer.rs | 109 ++++++++++++++++++++++ portal-bridge/src/census/peers.rs | 140 ++++++++++++++++++---------- 4 files changed, 241 insertions(+), 96 deletions(-) create mode 100644 portal-bridge/src/census/peer.rs diff --git a/portal-bridge/src/census/mod.rs b/portal-bridge/src/census/mod.rs index dcef95f93..82c338c97 100644 --- a/portal-bridge/src/census/mod.rs +++ b/portal-bridge/src/census/mod.rs @@ -9,6 +9,7 @@ use crate::cli::BridgeConfig; use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManager}; mod network; +mod peer; mod peers; /// The error that occured in [Census]. @@ -32,6 +33,7 @@ pub const ENR_OFFER_LIMIT: usize = 4; /// The census is responsible for maintaining a list of known peers in the network, /// checking their liveness, updating their data radius, iterating through their /// rfn to find new peers, and providing interested enrs for a given content id. +#[derive(Clone)] pub struct Census { history: Network, state: Network, diff --git a/portal-bridge/src/census/network.rs b/portal-bridge/src/census/network.rs index 1958a8c76..3057fd6f0 100644 --- a/portal-bridge/src/census/network.rs +++ b/portal-bridge/src/census/network.rs @@ -23,15 +23,15 @@ use crate::{ use super::peers::Peers; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] /// The result of the liveness check. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum LivenessResult { /// We pinged the peer successfully Pass, /// We failed to ping peer Fail, - /// Peer is already registered and not expired (we didn't try to ping the peer) - AlreadyRegistered, + /// Peer is already known and doesn't need liveness check + Fresh, } #[derive(Debug, Clone)] @@ -98,7 +98,7 @@ impl Network { NetworkManager::new(self.clone()) } - // Look up all known interested enrs for a given content id + /// Look up interested enrs for a given content id pub fn get_interested_enrs(&self, content_id: &[u8; 32]) -> Result, CensusError> { if self.peers.is_empty() { error!( @@ -174,8 +174,8 @@ impl Network { .collect_vec(); // Concurrent execution of liveness check - let new_peers = enrs - .iter() + let starting_peers_count = self.peers.len(); + enrs.iter() .map(|enr| async { if let Ok(_permit) = semaphore.acquire().await { self.liveness_check(enr.clone()).await @@ -188,12 +188,10 @@ impl Network { } }) .collect::>() - .await - .into_iter() - .filter(|liveness_result| liveness_result == &LivenessResult::Pass) - .count(); + .await; let total_peers = self.peers.len(); + let new_peers = total_peers - starting_peers_count; debug!( subnetwork = %self.subnetwork, @@ -225,13 +223,15 @@ impl Network { /// Performs liveness check. /// /// Liveness check will pass if peer respond to a Ping request. It returns - /// `LivenessResult::AlreadyRegistered` if peer is already registered and not expired. + /// `LivenessResult::Fresh` if peer is already known and doesn't need liveness check. 
async fn liveness_check(&self, enr: Enr) -> LivenessResult { - // if enr is already registered, check if delay map deadline has expired - if let Some(deadline) = self.peers.deadline(&enr) { - if Instant::now() < deadline { - return LivenessResult::AlreadyRegistered; - } + // check if peer needs liveness check + if self + .peers + .next_liveness_check(&enr) + .is_some_and(|next_liveness_check| Instant::now() < next_liveness_check) + { + return LivenessResult::Fresh; } let Ok(pong_info) = self.ping(&enr).await else { @@ -259,7 +259,7 @@ impl Network { enr }; - self.peers.record_successful_liveness_check(enr, radius); + self.peers.record_successful_liveness_check(&enr, radius); LivenessResult::Pass } @@ -367,29 +367,21 @@ impl NetworkManager { /// Returns next action that should be executed. pub async fn next_action(&mut self) -> NetworkAction { - loop { - tokio::select! { - _ = self.peer_discovery_interval.tick() => { - return NetworkAction::PeerDiscovery; - } - peer = self.network.peers.next() => { - match peer { - Some(Ok(enr)) => { - return NetworkAction::LivenessCheck(enr); - } - Some(Err(err)) => { - error!( - subnetwork = %self.network.subnetwork, - "next-action: error getting peer - err: {err}", - ); - } - None => { - warn!( - subnetwork = %self.network.subnetwork, - "next-action: no pending peers - re-initializing", - ); - return NetworkAction::ReInitialization; - } + tokio::select! 
{ + _ = self.peer_discovery_interval.tick() => { + NetworkAction::PeerDiscovery + } + peer = self.network.peers.next() => { + match peer { + Some(enr) => { + NetworkAction::LivenessCheck(enr) + } + None => { + warn!( + subnetwork = %self.network.subnetwork, + "next-action: no pending peers - re-initializing", + ); + NetworkAction::ReInitialization } } } @@ -415,7 +407,7 @@ impl NetworkManager { } NetworkAction::PeerDiscovery => self.peer_discovery().await, NetworkAction::LivenessCheck(enr) => { - if self.network.liveness_check(enr).await == LivenessResult::AlreadyRegistered { + if self.network.liveness_check(enr).await == LivenessResult::Fresh { warn!( subnetwork = %self.network.subnetwork, "execute-action: liveness check on already registered peer", @@ -438,17 +430,15 @@ impl NetworkManager { } }; - let mut new_peers = 0; + let starting_peers = self.network.peers.len(); for enr in enrs { - if self.network.liveness_check(enr).await == LivenessResult::Pass { - new_peers += 1; - } + self.network.liveness_check(enr).await; } - - let total_peers = self.network.peers.len(); + let ending_peers = self.network.peers.len(); + let new_peers = ending_peers - starting_peers; info!( subnetwork = %self.network.subnetwork, - "peer-discovery: finished - discovered {new_peers} / {total_peers} peers", + "peer-discovery: finished - discovered {new_peers} / {ending_peers} peers", ); } } diff --git a/portal-bridge/src/census/peer.rs b/portal-bridge/src/census/peer.rs new file mode 100644 index 000000000..c6ad65a12 --- /dev/null +++ b/portal-bridge/src/census/peer.rs @@ -0,0 +1,109 @@ +use std::{ + collections::VecDeque, + time::{Duration, Instant}, +}; + +use discv5::Enr; +use ethportal_api::types::distance::{Distance, Metric, XorMetric}; +use tracing::error; + +#[derive(Debug, Clone)] +pub struct LivenessCheck { + success: bool, + #[allow(dead_code)] + timestamp: Instant, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct OfferEvent { + success: bool, + timestamp: Instant, 
+ content_value_size: usize, + duration: Duration, +} + +#[derive(Debug)] +/// Stores information about peer and its most recent interactions. +pub struct Peer { + enr: Enr, + radius: Distance, + /// Liveness checks, ordered from most recent (index `0`), to the earliest. + /// + /// Contains at most [Self::MAX_LIVENESS_CHECKS] entries. + liveness_checks: VecDeque, +} + +impl Peer { + /// The maximum number of liveness checks that we store. Value chosen arbitrarily. + const MAX_LIVENESS_CHECKS: usize = 10; + + pub fn new(enr: Enr) -> Self { + Self { + enr, + radius: Distance::ZERO, + liveness_checks: VecDeque::with_capacity(Self::MAX_LIVENESS_CHECKS + 1), + } + } + + pub fn enr(&self) -> Enr { + self.enr.clone() + } + + /// Returns true if latest liveness check was successful and content is within radius. + pub fn is_interested_in_content(&self, content_id: &[u8; 32]) -> bool { + // check that most recent liveness check was successful + if !self + .liveness_checks + .front() + .is_some_and(|liveness_check| liveness_check.success) + { + return false; + } + + let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id); + distance <= self.radius + } + + /// Returns true if all latest [Self::MAX_LIVENESS_CHECKS] liveness checks failed. 
+ pub fn is_obsolete(&self) -> bool { + if self.liveness_checks.len() < Self::MAX_LIVENESS_CHECKS { + return false; + } + self.liveness_checks + .iter() + .all(|liveness_check| !liveness_check.success) + } + + pub fn record_successful_liveness_check(&mut self, enr: &Enr, radius: Distance) { + if self.enr.seq() > enr.seq() { + error!( + "successful_liveness_check: received outdated enr: {enr} (existing enr: {})", + self.enr.seq() + ); + } else { + self.enr = enr.clone(); + } + self.radius = radius; + self.liveness_checks.push_front(LivenessCheck { + success: true, + timestamp: Instant::now(), + }); + self.purge(); + } + + pub fn record_failed_liveness_check(&mut self) { + self.liveness_checks.push_front(LivenessCheck { + success: false, + timestamp: Instant::now(), + }); + self.purge(); + } + + /// Removes oldest liveness checks and offer events, if we exceeded capacity. + fn purge(&mut self) { + if self.liveness_checks.len() > Self::MAX_LIVENESS_CHECKS { + self.liveness_checks.drain(Self::MAX_LIVENESS_CHECKS..); + } + } +} diff --git a/portal-bridge/src/census/peers.rs b/portal-bridge/src/census/peers.rs index 44d4f1bcc..341d3a793 100644 --- a/portal-bridge/src/census/peers.rs +++ b/portal-bridge/src/census/peers.rs @@ -1,26 +1,36 @@ use std::{ + collections::HashMap, pin::Pin, - sync::{Arc, RwLock}, + sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, task::{Context, Poll}, time::Duration, }; -use delay_map::HashMapDelay; -use ethportal_api::{ - types::distance::{Distance, Metric, XorMetric}, - Enr, -}; +use delay_map::HashSetDelay; +use discv5::enr::NodeId; +use ethportal_api::{types::distance::Distance, Enr}; use futures::Stream; use rand::seq::IteratorRandom; use tokio::time::Instant; -use tracing::warn; +use tracing::error; + +use super::peer::Peer; /// How frequently liveness check should be done. /// /// Five minutes is chosen arbitrarily. 
const LIVENESS_CHECK_DELAY: Duration = Duration::from_secs(300); -type PeersHashMapDelay = HashMapDelay<[u8; 32], (Enr, Distance)>; +/// Stores peers and when they should be checked for liveness. +/// +/// Convenient structure for holding both objects behind a single [RwLock]. +#[derive(Debug)] +struct PeersWithLivenessChecks { + /// Stores peers and their info + peers: HashMap, + /// Stores when peers should be checked for liveness using [HashSetDelay]. + liveness_checks: HashSetDelay, +} /// Contains all discovered peers on the network. /// @@ -28,77 +38,111 @@ type PeersHashMapDelay = HashMapDelay<[u8; 32], (Enr, Distance)>; /// pinged for liveness. #[derive(Clone, Debug)] pub(super) struct Peers { - peers: Arc>, + peers: Arc>, +} + +impl Default for Peers { + fn default() -> Self { + Self::new() + } } impl Peers { pub fn new() -> Self { Self { - peers: Arc::new(RwLock::new(HashMapDelay::new(LIVENESS_CHECK_DELAY))), + peers: Arc::new(RwLock::new(PeersWithLivenessChecks { + peers: HashMap::new(), + liveness_checks: HashSetDelay::new(LIVENESS_CHECK_DELAY), + })), } } pub fn is_empty(&self) -> bool { - self.peers.read().expect("to get peers lock").is_empty() + self.read().peers.is_empty() } pub fn len(&self) -> usize { - self.peers.read().expect("to get peers lock").len() + self.read().peers.len() } - pub fn deadline(&self, enr: &Enr) -> Option { - self.peers - .read() - .expect("to get peers lock") - .deadline(&enr.node_id().raw()) + pub fn next_liveness_check(&self, enr: &Enr) -> Option { + self.read().liveness_checks.deadline(&enr.node_id()) } - pub fn record_successful_liveness_check(&self, enr: Enr, radius: Distance) { - self.peers - .write() - .expect("to get peers lock") - .insert(enr.node_id().raw(), (enr, radius)); + pub fn record_successful_liveness_check(&self, enr: &Enr, radius: Distance) { + let node_id = enr.node_id(); + let mut guard = self.write(); + guard + .peers + .entry(node_id) + .or_insert_with(|| Peer::new(enr.clone())) + 
.record_successful_liveness_check(enr, radius); + guard.liveness_checks.insert(node_id); } pub fn record_failed_liveness_check(&self, enr: &Enr) { - let mut peers = self.peers.write().expect("to get peers lock"); - if peers.remove(&enr.node_id().raw()).is_some() { - warn!("liveness check failed, peer removed: {}", enr.node_id()); + let node_id = enr.node_id(); + + let mut guard = self.write(); + + let Some(peer) = guard.peers.get_mut(&node_id) else { + error!("record_failed_liveness_check: unknown peer: {node_id}"); + guard.liveness_checks.remove(&node_id); + return; + }; + + peer.record_failed_liveness_check(); + + if peer.is_obsolete() { + guard.peers.remove(&node_id); + guard.liveness_checks.remove(&node_id); + } else { + guard.liveness_checks.insert(node_id); } } /// Selects random `limit` peers that should be interested in content. pub fn get_interested_enrs(&self, content_id: &[u8; 32], limit: usize) -> Vec { - self.peers - .read() - .expect("to get peers lock") - .iter() - .filter_map(|(node_id, (enr, data_radius))| { - let distance = XorMetric::distance(node_id, content_id); - if data_radius >= &distance { - Some(enr.clone()) - } else { - None - } - }) + self.read() + .peers + .values() + .filter(|peer| peer.is_interested_in_content(content_id)) + .map(Peer::enr) .choose_multiple(&mut rand::thread_rng(), limit) } + + fn read(&self) -> RwLockReadGuard<'_, PeersWithLivenessChecks> { + self.peers.read().expect("to get peers lock") + } + + fn write(&self) -> RwLockWriteGuard<'_, PeersWithLivenessChecks> { + self.peers.write().expect("to get peers lock") + } } impl Stream for Peers { - type Item = Result; + type Item = Enr; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.peers - .write() - .expect("to get peers lock") - .poll_expired(cx) - .map_ok(|(_node_id, (enr, _distance))| enr) - } -} + let mut guard = self.write(); -impl Default for Peers { - fn default() -> Self { - Self::new() + // Poll expired until non-error is returned. 
+ // Error can happen only if there is some race condition, which shouldn't happen because + // of the RwLock. + loop { + match guard.liveness_checks.poll_expired(cx) { + Poll::Ready(Some(Ok(node_id))) => match guard.peers.get(&node_id) { + Some(peer) => break Poll::Ready(Some(peer.enr())), + None => { + error!("poll_next: unknown peer: {node_id}"); + } + }, + Poll::Ready(Some(Err(err))) => { + error!("poll_next: error getting peer - err: {err}"); + } + Poll::Ready(None) => break Poll::Ready(None), + Poll::Pending => break Poll::Pending, + } + } } } From 1fd487ef636cdb8a4bfe27324c132133324c8ebe Mon Sep 17 00:00:00 2001 From: Milos Stankovic <82043364+morph-dev@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:51:26 +0200 Subject: [PATCH 2/2] fix: pr comments --- portal-bridge/src/census/network.rs | 22 ++++++++++------------ portal-bridge/src/census/peer.rs | 11 +++++++++-- portal-bridge/src/census/peers.rs | 8 ++++---- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/portal-bridge/src/census/network.rs b/portal-bridge/src/census/network.rs index 3057fd6f0..c43bf8c5b 100644 --- a/portal-bridge/src/census/network.rs +++ b/portal-bridge/src/census/network.rs @@ -174,7 +174,7 @@ impl Network { .collect_vec(); // Concurrent execution of liveness check - let starting_peers_count = self.peers.len(); + let starting_peers = self.peers.len() as f64; enrs.iter() .map(|enr| async { if let Ok(_permit) = semaphore.acquire().await { @@ -189,17 +189,16 @@ impl Network { }) .collect::>() .await; - - let total_peers = self.peers.len(); - let new_peers = total_peers - starting_peers_count; + let ending_peers = self.peers.len() as f64; + let new_peers = ending_peers - starting_peers; debug!( subnetwork = %self.subnetwork, - "init: added {new_peers} / {total_peers} peers", + "init: added {new_peers} / {ending_peers} peers", ); // Stop if number of new peers is less than a threshold fraction of all peers - if (new_peers as f64) < (total_peers as f64) * 
config.stop_fraction_threshold { + if new_peers < ending_peers * config.stop_fraction_threshold { break; } } @@ -228,14 +227,14 @@ impl Network { // check if peer needs liveness check if self .peers - .next_liveness_check(&enr) + .next_liveness_check(&enr.node_id()) .is_some_and(|next_liveness_check| Instant::now() < next_liveness_check) { return LivenessResult::Fresh; } let Ok(pong_info) = self.ping(&enr).await else { - self.peers.record_failed_liveness_check(&enr); + self.peers.record_failed_liveness_check(enr); return LivenessResult::Fail; }; @@ -244,7 +243,7 @@ impl Network { // If ENR seq is not the latest one, fetch fresh ENR let enr = if enr.seq() < pong_info.enr_seq { let Ok(enr) = self.fetch_enr(&enr).await else { - self.peers.record_failed_liveness_check(&enr); + self.peers.record_failed_liveness_check(enr); return LivenessResult::Fail; }; enr @@ -259,7 +258,7 @@ impl Network { enr }; - self.peers.record_successful_liveness_check(&enr, radius); + self.peers.record_successful_liveness_check(enr, radius); LivenessResult::Pass } @@ -435,10 +434,9 @@ impl NetworkManager { self.network.liveness_check(enr).await; } let ending_peers = self.network.peers.len(); - let new_peers = ending_peers - starting_peers; info!( subnetwork = %self.network.subnetwork, - "peer-discovery: finished - discovered {new_peers} / {ending_peers} peers", + "peer-discovery: finished - peers: {starting_peers} -> {ending_peers}", ); } } diff --git a/portal-bridge/src/census/peer.rs b/portal-bridge/src/census/peer.rs index c6ad65a12..b6f391586 100644 --- a/portal-bridge/src/census/peer.rs +++ b/portal-bridge/src/census/peer.rs @@ -75,14 +75,21 @@ impl Peer { .all(|liveness_check| !liveness_check.success) } - pub fn record_successful_liveness_check(&mut self, enr: &Enr, radius: Distance) { + pub fn record_successful_liveness_check(&mut self, enr: Enr, radius: Distance) { + assert_eq!( + self.enr.node_id(), + enr.node_id(), + "Received enr for different peer. 
Expected node-id: {}, received enr: {enr}", + self.enr.node_id(), + ); + if self.enr.seq() > enr.seq() { error!( "successful_liveness_check: received outdated enr: {enr} (existing enr: {})", self.enr.seq() ); } else { - self.enr = enr.clone(); + self.enr = enr; } self.radius = radius; self.liveness_checks.push_front(LivenessCheck { diff --git a/portal-bridge/src/census/peers.rs b/portal-bridge/src/census/peers.rs index 341d3a793..d7636d2d8 100644 --- a/portal-bridge/src/census/peers.rs +++ b/portal-bridge/src/census/peers.rs @@ -65,11 +65,11 @@ impl Peers { self.read().peers.len() } - pub fn next_liveness_check(&self, enr: &Enr) -> Option { - self.read().liveness_checks.deadline(&enr.node_id()) + pub fn next_liveness_check(&self, node_id: &NodeId) -> Option { + self.read().liveness_checks.deadline(node_id) } - pub fn record_successful_liveness_check(&self, enr: &Enr, radius: Distance) { + pub fn record_successful_liveness_check(&self, enr: Enr, radius: Distance) { let node_id = enr.node_id(); let mut guard = self.write(); guard @@ -80,7 +80,7 @@ impl Peers { guard.liveness_checks.insert(node_id); } - pub fn record_failed_liveness_check(&self, enr: &Enr) { + pub fn record_failed_liveness_check(&self, enr: Enr) { let node_id = enr.node_id(); let mut guard = self.write();