Skip to content

Commit

Permalink
feat(census): add peer scoring (#1579)
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev authored Nov 2, 2024
1 parent 38e383e commit 8314365
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 61 deletions.
12 changes: 4 additions & 8 deletions portal-bridge/src/bridge/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,16 @@ impl StateBridge {
Ok(())
}

// request enrs interested in the content key from Census
fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result<Vec<Enr>> {
Ok(self
.census
.get_interested_enrs(Subnetwork::State, &content_key.content_id())?)
}

// spawn individual offer tasks of the content key for each interested enr found in Census
async fn spawn_offer_tasks(
&self,
content_key: StateContentKey,
content_value: StateContentValue,
) {
let Ok(enrs) = self.request_enrs(&content_key) else {
let Ok(enrs) = self
.census
.select_peers(Subnetwork::State, &content_key.content_id())
else {
error!("Failed to request enrs for content key, skipping offer: {content_key:?}");
return;
};
Expand Down
11 changes: 6 additions & 5 deletions portal-bridge/src/census/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManage
mod network;
mod peer;
mod peers;
mod scoring;

/// The error that occured in [Census].
#[derive(Error, Debug)]
Expand Down Expand Up @@ -59,16 +60,16 @@ impl Census {
}
}

/// Returns ENRs interested in provided content id.
pub fn get_interested_enrs(
/// Selects peers to receive content.
pub fn select_peers(
&self,
subnetwork: Subnetwork,
content_id: &[u8; 32],
) -> Result<Vec<Enr>, CensusError> {
match subnetwork {
Subnetwork::History => self.history.get_interested_enrs(content_id),
Subnetwork::State => self.state.get_interested_enrs(content_id),
Subnetwork::Beacon => self.beacon.get_interested_enrs(content_id),
Subnetwork::History => self.history.select_peers(content_id),
Subnetwork::State => self.state.select_peers(content_id),
Subnetwork::Beacon => self.beacon.select_peers(content_id),
_ => Err(CensusError::UnsupportedSubnetwork(subnetwork)),
}
}
Expand Down
22 changes: 12 additions & 10 deletions portal-bridge/src/census/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use crate::{
cli::{BridgeConfig, ClientType},
};

use super::peers::Peers;
use super::{
peers::Peers,
scoring::{AdditiveWeight, PeerSelector},
};

/// The result of the liveness check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -69,11 +72,10 @@ impl Default for NetworkInitializationConfig {
/// should be used in a background task to keep it up-to-date.
#[derive(Clone)]
pub(super) struct Network {
peers: Peers,
peers: Peers<AdditiveWeight>,
client: HttpClient,
subnetwork: Subnetwork,
filter_clients: Vec<ClientType>,
enr_offer_limit: usize,
}

impl Network {
Expand All @@ -86,30 +88,30 @@ impl Network {
}

Self {
peers: Peers::new(),
peers: Peers::new(PeerSelector::new(
AdditiveWeight::default(),
bridge_config.enr_offer_limit,
)),
client,
subnetwork,
filter_clients: bridge_config.filter_clients.to_vec(),
enr_offer_limit: bridge_config.enr_offer_limit,
}
}

pub fn create_manager(&self) -> NetworkManager {
NetworkManager::new(self.clone())
}

/// Look up interested enrs for a given content id
pub fn get_interested_enrs(&self, content_id: &[u8; 32]) -> Result<Vec<Enr>, CensusError> {
/// Selects peers to receive content.
pub fn select_peers(&self, content_id: &[u8; 32]) -> Result<Vec<Enr>, CensusError> {
if self.peers.is_empty() {
error!(
subnetwork = %self.subnetwork,
"No known peers, unable to look up interested enrs",
);
return Err(CensusError::NoPeers);
}
Ok(self
.peers
.get_interested_enrs(content_id, self.enr_offer_limit))
Ok(self.peers.select_peers(content_id))
}

/// Records the status of the most recent `Offer` request to one of the peers.
Expand Down
35 changes: 17 additions & 18 deletions portal-bridge/src/census/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ use tracing::error;

#[derive(Debug, Clone)]
pub struct LivenessCheck {
success: bool,
#[allow(dead_code)]
timestamp: Instant,
pub success: bool,
pub timestamp: Instant,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct OfferEvent {
success: bool,
timestamp: Instant,
content_value_size: usize,
duration: Duration,
pub success: bool,
pub timestamp: Instant,
#[allow(dead_code)]
pub content_value_size: usize,
#[allow(dead_code)]
pub duration: Duration,
}

#[derive(Debug)]
Expand Down Expand Up @@ -57,17 +57,8 @@ impl Peer {
self.enr.clone()
}

/// Returns true if latest liveness check was successful and content is within radius.
/// Returns true if content is within radius.
pub fn is_interested_in_content(&self, content_id: &[u8; 32]) -> bool {
// check that most recent liveness check was successful
if !self
.liveness_checks
.front()
.is_some_and(|liveness_check| liveness_check.success)
{
return false;
}

let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id);
distance <= self.radius
}
Expand Down Expand Up @@ -138,4 +129,12 @@ impl Peer {
self.offer_events.drain(Self::MAX_OFFER_EVENTS..);
}
}

pub fn iter_liveness_checks(&self) -> impl Iterator<Item = &LivenessCheck> {
self.liveness_checks.iter()
}

pub fn iter_offer_events(&self) -> impl Iterator<Item = &OfferEvent> {
self.offer_events.iter()
}
}
34 changes: 14 additions & 20 deletions portal-bridge/src/census/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ use ethportal_api::{
Enr,
};
use futures::Stream;
use rand::seq::IteratorRandom;
use tokio::time::Instant;
use tracing::error;

use super::peer::Peer;
use super::{
peer::Peer,
scoring::{PeerSelector, Weight},
};

/// How frequently liveness check should be done.
///
Expand All @@ -40,23 +42,19 @@ struct PeersWithLivenessChecks {
/// It provides thread safe access to peers and is responsible for deciding when they should be
/// pinged for liveness.
#[derive(Clone, Debug)]
pub(super) struct Peers {
pub(super) struct Peers<W: Weight> {
peers: Arc<RwLock<PeersWithLivenessChecks>>,
selector: PeerSelector<W>,
}

impl Default for Peers {
fn default() -> Self {
Self::new()
}
}

impl Peers {
pub fn new() -> Self {
impl<W: Weight> Peers<W> {
pub fn new(selector: PeerSelector<W>) -> Self {
Self {
peers: Arc::new(RwLock::new(PeersWithLivenessChecks {
peers: HashMap::new(),
liveness_checks: HashSetDelay::new(LIVENESS_CHECK_DELAY),
})),
selector,
}
}

Expand Down Expand Up @@ -121,14 +119,10 @@ impl Peers {
}
}

/// Selects random `limit` peers that should be interested in content.
pub fn get_interested_enrs(&self, content_id: &[u8; 32], limit: usize) -> Vec<Enr> {
self.read()
.peers
.values()
.filter(|peer| peer.is_interested_in_content(content_id))
.map(Peer::enr)
.choose_multiple(&mut rand::thread_rng(), limit)
/// Selects peers to receive content.
pub fn select_peers(&self, content_id: &[u8; 32]) -> Vec<Enr> {
self.selector
.select_peers(content_id, self.read().peers.values())
}

fn read(&self) -> RwLockReadGuard<'_, PeersWithLivenessChecks> {
Expand All @@ -140,7 +134,7 @@ impl Peers {
}
}

impl Stream for Peers {
impl<W: Weight> Stream for Peers<W> {
type Item = Enr;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
115 changes: 115 additions & 0 deletions portal-bridge/src/census/scoring.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::time::Duration;

use ethportal_api::Enr;
use itertools::Itertools;
use rand::{seq::SliceRandom, thread_rng};

use super::peer::Peer;

/// A trait for calculating peer's weight.
pub trait Weight: Send + Sync {
fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32;

fn weight_all<'a>(
&self,
content_id: &[u8; 32],
peers: impl IntoIterator<Item = &'a Peer>,
) -> impl Iterator<Item = (&'a Peer, u32)> {
peers
.into_iter()
.map(|peer| (peer, self.weight(content_id, peer)))
}
}

/// Calculates peer's weight by adding/subtracting weights of recent events.
///
/// Weight is calculated using following rules:
/// 1. If peer is not interested in content, `0` is returned
/// 2. Weight's starting value is `starting_weight`
/// 3. All recent events (based on `timeframe` parameter) are scored separately:
/// - successful events increase weight by `success_weight`
/// - failed events decrease weight by `failure_weight`
/// 4. Final weight is restricted to `[0, maximum_weight]` range.
#[derive(Debug, Clone)]
pub struct AdditiveWeight {
pub timeframe: Duration,
pub starting_weight: u32,
pub maximum_weight: u32,
pub success_weight: i32,
pub failure_weight: i32,
}

impl Default for AdditiveWeight {
fn default() -> Self {
Self {
timeframe: Duration::from_secs(15 * 60), // 15 min
starting_weight: 200,
maximum_weight: 400,
success_weight: 5,
failure_weight: -10,
}
}
}

impl Weight for AdditiveWeight {
fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32 {
if !peer.is_interested_in_content(content_id) {
return 0;
}
let weight = self.starting_weight as i32
+ Iterator::chain(
peer.iter_liveness_checks()
.map(|liveness_check| (liveness_check.success, liveness_check.timestamp)),
peer.iter_offer_events()
.map(|offer_event| (offer_event.success, offer_event.timestamp)),
)
.map(|(success, timestamp)| {
if timestamp.elapsed() > self.timeframe {
return 0;
}
if success {
self.success_weight
} else {
self.failure_weight
}
})
.sum::<i32>();
weight.clamp(0, self.maximum_weight as i32) as u32
}
}

/// Selects peers based on their weight provided by [Weight] trait.
///
/// Selection is done using [SliceRandom::choose_multiple_weighted]. Peers are ranked so that
/// probability of peer A being ranked higher than peer B is proportional to their weights.
/// The top ranked peers are then selected and returned.
#[derive(Debug, Clone)]
pub struct PeerSelector<W: Weight> {
weight: W,
/// The maximum number of peers to select
limit: usize,
}

impl<W: Weight> PeerSelector<W> {
pub fn new(rank: W, limit: usize) -> Self {
Self {
weight: rank,
limit,
}
}

/// Selects up to `self.limit` peers based on their weights.
pub fn select_peers<'a>(
&self,
content_id: &[u8; 32],
peers: impl IntoIterator<Item = &'a Peer>,
) -> Vec<Enr> {
let weighted_peers = self.weight.weight_all(content_id, peers).collect_vec();

weighted_peers
.choose_multiple_weighted(&mut thread_rng(), self.limit, |(_peer, weight)| *weight)
.expect("choosing random sample shouldn't fail")
.map(|(peer, _weight)| peer.enr())
.collect()
}
}

0 comments on commit 8314365

Please sign in to comment.