From 8314365de364be8f04bb4ae7c0e68fc89996394e Mon Sep 17 00:00:00 2001 From: Milos Stankovic <82043364+morph-dev@users.noreply.github.com> Date: Sat, 2 Nov 2024 11:29:01 +0200 Subject: [PATCH] feat(census): add peer scoring (#1579) --- portal-bridge/src/bridge/state.rs | 12 +-- portal-bridge/src/census/mod.rs | 11 +-- portal-bridge/src/census/network.rs | 22 +++--- portal-bridge/src/census/peer.rs | 35 ++++----- portal-bridge/src/census/peers.rs | 34 ++++---- portal-bridge/src/census/scoring.rs | 115 ++++++++++++++++++++++++++++ 6 files changed, 168 insertions(+), 61 deletions(-) create mode 100644 portal-bridge/src/census/scoring.rs diff --git a/portal-bridge/src/bridge/state.rs b/portal-bridge/src/bridge/state.rs index e7c95cb09..54087a60b 100644 --- a/portal-bridge/src/bridge/state.rs +++ b/portal-bridge/src/bridge/state.rs @@ -291,20 +291,16 @@ impl StateBridge { Ok(()) } - // request enrs interested in the content key from Census - fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result> { - Ok(self - .census - .get_interested_enrs(Subnetwork::State, &content_key.content_id())?) - } - // spawn individual offer tasks of the content key for each interested enr found in Census async fn spawn_offer_tasks( &self, content_key: StateContentKey, content_value: StateContentValue, ) { - let Ok(enrs) = self.request_enrs(&content_key) else { + let Ok(enrs) = self + .census + .select_peers(Subnetwork::State, &content_key.content_id()) + else { error!("Failed to request enrs for content key, skipping offer: {content_key:?}"); return; }; diff --git a/portal-bridge/src/census/mod.rs b/portal-bridge/src/census/mod.rs index 7b125f238..c37eb0b0e 100644 --- a/portal-bridge/src/census/mod.rs +++ b/portal-bridge/src/census/mod.rs @@ -16,6 +16,7 @@ use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManage mod network; mod peer; mod peers; +mod scoring; /// The error that occured in [Census]. #[derive(Error, Debug)] @@ -59,16 +60,16 @@ impl Census { } } - /// Returns ENRs interested in provided content id. - pub fn get_interested_enrs( + /// Selects peers to receive content. + pub fn select_peers( &self, subnetwork: Subnetwork, content_id: &[u8; 32], ) -> Result, CensusError> { match subnetwork { - Subnetwork::History => self.history.get_interested_enrs(content_id), - Subnetwork::State => self.state.get_interested_enrs(content_id), - Subnetwork::Beacon => self.beacon.get_interested_enrs(content_id), + Subnetwork::History => self.history.select_peers(content_id), + Subnetwork::State => self.state.select_peers(content_id), + Subnetwork::Beacon => self.beacon.select_peers(content_id), _ => Err(CensusError::UnsupportedSubnetwork(subnetwork)), } } diff --git a/portal-bridge/src/census/network.rs b/portal-bridge/src/census/network.rs index 57c94d0b7..82d27299a 100644 --- a/portal-bridge/src/census/network.rs +++ b/portal-bridge/src/census/network.rs @@ -21,7 +21,10 @@ use crate::{ cli::{BridgeConfig, ClientType}, }; -use super::peers::Peers; +use super::{ + peers::Peers, + scoring::{AdditiveWeight, PeerSelector}, +}; /// The result of the liveness check. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -69,11 +72,10 @@ impl Default for NetworkInitializationConfig { /// should be used in a background task to keep it up-to-date. #[derive(Clone)] pub(super) struct Network { - peers: Peers, + peers: Peers, client: HttpClient, subnetwork: Subnetwork, filter_clients: Vec, - enr_offer_limit: usize, } impl Network { @@ -86,11 +88,13 @@ impl Network { } Self { - peers: Peers::new(), + peers: Peers::new(PeerSelector::new( + AdditiveWeight::default(), + bridge_config.enr_offer_limit, + )), client, subnetwork, filter_clients: bridge_config.filter_clients.to_vec(), - enr_offer_limit: bridge_config.enr_offer_limit, } } @@ -98,8 +102,8 @@ impl Network { NetworkManager::new(self.clone()) } - /// Look up interested enrs for a given content id - pub fn get_interested_enrs(&self, content_id: &[u8; 32]) -> Result, CensusError> { + /// Selects peers to receive content. + pub fn select_peers(&self, content_id: &[u8; 32]) -> Result, CensusError> { if self.peers.is_empty() { error!( subnetwork = %self.subnetwork, @@ -107,9 +111,7 @@ impl Network { ); return Err(CensusError::NoPeers); } - Ok(self - .peers - .get_interested_enrs(content_id, self.enr_offer_limit)) + Ok(self.peers.select_peers(content_id)) } /// Records the status of the most recent `Offer` request to one of the peers. diff --git a/portal-bridge/src/census/peer.rs b/portal-bridge/src/census/peer.rs index b7a82752e..a7015a57d 100644 --- a/portal-bridge/src/census/peer.rs +++ b/portal-bridge/src/census/peer.rs @@ -9,18 +9,18 @@ use tracing::error; #[derive(Debug, Clone)] pub struct LivenessCheck { - success: bool, - #[allow(dead_code)] - timestamp: Instant, + pub success: bool, + pub timestamp: Instant, } -#[allow(dead_code)] #[derive(Debug, Clone)] pub struct OfferEvent { - success: bool, - timestamp: Instant, - content_value_size: usize, - duration: Duration, + pub success: bool, + pub timestamp: Instant, + #[allow(dead_code)] + pub content_value_size: usize, + #[allow(dead_code)] + pub duration: Duration, } #[derive(Debug)] @@ -57,17 +57,8 @@ impl Peer { self.enr.clone() } - /// Returns true if latest liveness check was successful and content is within radius. + /// Returns true if content is within radius. pub fn is_interested_in_content(&self, content_id: &[u8; 32]) -> bool { - // check that most recent liveness check was successful - if !self - .liveness_checks - .front() - .is_some_and(|liveness_check| liveness_check.success) - { - return false; - } - let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id); distance <= self.radius } @@ -138,4 +129,12 @@ impl Peer { self.offer_events.drain(Self::MAX_OFFER_EVENTS..); } } + + pub fn iter_liveness_checks(&self) -> impl Iterator { + self.liveness_checks.iter() + } + + pub fn iter_offer_events(&self) -> impl Iterator { + self.offer_events.iter() + } } diff --git a/portal-bridge/src/census/peers.rs b/portal-bridge/src/census/peers.rs index 392bef291..971ce3965 100644 --- a/portal-bridge/src/census/peers.rs +++ b/portal-bridge/src/census/peers.rs @@ -13,11 +13,13 @@ use ethportal_api::{ Enr, }; use futures::Stream; -use rand::seq::IteratorRandom; use tokio::time::Instant; use tracing::error; -use super::peer::Peer; +use super::{ + peer::Peer, + scoring::{PeerSelector, Weight}, +}; /// How frequently liveness check should be done. /// @@ -40,23 +42,19 @@ struct PeersWithLivenessChecks { /// It provides thread safe access to peers and is responsible for deciding when they should be /// pinged for liveness. #[derive(Clone, Debug)] -pub(super) struct Peers { +pub(super) struct Peers { peers: Arc>, + selector: PeerSelector, } -impl Default for Peers { - fn default() -> Self { - Self::new() - } -} - -impl Peers { - pub fn new() -> Self { +impl Peers { + pub fn new(selector: PeerSelector) -> Self { Self { peers: Arc::new(RwLock::new(PeersWithLivenessChecks { peers: HashMap::new(), liveness_checks: HashSetDelay::new(LIVENESS_CHECK_DELAY), })), + selector, } } @@ -121,14 +119,10 @@ impl Peers { } } - /// Selects random `limit` peers that should be interested in content. - pub fn get_interested_enrs(&self, content_id: &[u8; 32], limit: usize) -> Vec { - self.read() - .peers - .values() - .filter(|peer| peer.is_interested_in_content(content_id)) - .map(Peer::enr) - .choose_multiple(&mut rand::thread_rng(), limit) + /// Selects peers to receive content. + pub fn select_peers(&self, content_id: &[u8; 32]) -> Vec { + self.selector + .select_peers(content_id, self.read().peers.values()) } fn read(&self) -> RwLockReadGuard<'_, PeersWithLivenessChecks> { @@ -140,7 +134,7 @@ impl Peers { } } -impl Stream for Peers { +impl Stream for Peers { type Item = Enr; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/portal-bridge/src/census/scoring.rs b/portal-bridge/src/census/scoring.rs new file mode 100644 index 000000000..124d769a6 --- /dev/null +++ b/portal-bridge/src/census/scoring.rs @@ -0,0 +1,115 @@ +use std::time::Duration; + +use ethportal_api::Enr; +use itertools::Itertools; +use rand::{seq::SliceRandom, thread_rng}; + +use super::peer::Peer; + +/// A trait for calculating peer's weight. +pub trait Weight: Send + Sync { + fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32; + + fn weight_all<'a>( + &self, + content_id: &[u8; 32], + peers: impl IntoIterator, + ) -> impl Iterator { + peers + .into_iter() + .map(|peer| (peer, self.weight(content_id, peer))) + } +} + +/// Calculates peer's weight by adding/subtracting weights of recent events. +/// +/// Weight is calculated using following rules: +/// 1. If peer is not interested in content, `0` is returned +/// 2. Weight's starting value is `starting_weight` +/// 3. All recent events (based on `timeframe` parameter) are scored separately: +/// - successful events increase weight by `success_weight` +/// - failed events decrease weight by `failure_weight` +/// 4. Final weight is restricted to `[0, maximum_weight]` range. +#[derive(Debug, Clone)] +pub struct AdditiveWeight { + pub timeframe: Duration, + pub starting_weight: u32, + pub maximum_weight: u32, + pub success_weight: i32, + pub failure_weight: i32, +} + +impl Default for AdditiveWeight { + fn default() -> Self { + Self { + timeframe: Duration::from_secs(15 * 60), // 15 min + starting_weight: 200, + maximum_weight: 400, + success_weight: 5, + failure_weight: -10, + } + } +} + +impl Weight for AdditiveWeight { + fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32 { + if !peer.is_interested_in_content(content_id) { + return 0; + } + let weight = self.starting_weight as i32 + + Iterator::chain( + peer.iter_liveness_checks() + .map(|liveness_check| (liveness_check.success, liveness_check.timestamp)), + peer.iter_offer_events() + .map(|offer_event| (offer_event.success, offer_event.timestamp)), + ) + .map(|(success, timestamp)| { + if timestamp.elapsed() > self.timeframe { + return 0; + } + if success { + self.success_weight + } else { + self.failure_weight + } + }) + .sum::(); + weight.clamp(0, self.maximum_weight as i32) as u32 + } +} + +/// Selects peers based on their weight provided by [Weight] trait. +/// +/// Selection is done using [SliceRandom::choose_multiple_weighted]. Peers are ranked so that +/// probability of peer A being ranked higher than peer B is proportional to their weights. +/// The top ranked peers are then selected and returned. +#[derive(Debug, Clone)] +pub struct PeerSelector { + weight: W, + /// The maximum number of peers to select + limit: usize, +} + +impl PeerSelector { + pub fn new(rank: W, limit: usize) -> Self { + Self { + weight: rank, + limit, + } + } + + /// Selects up to `self.limit` peers based on their weights. + pub fn select_peers<'a>( + &self, + content_id: &[u8; 32], + peers: impl IntoIterator, + ) -> Vec { + let weighted_peers = self.weight.weight_all(content_id, peers).collect_vec(); + + weighted_peers + .choose_multiple_weighted(&mut thread_rng(), self.limit, |(_peer, weight)| *weight) + .expect("choosing random sample shouldn't fail") + .map(|(peer, _weight)| peer.enr()) + .collect() + } +}