feat(census): keep track of liveness checks #1575

Merged: 2 commits, Nov 1, 2024
2 changes: 2 additions & 0 deletions portal-bridge/src/census/mod.rs
@@ -9,6 +9,7 @@ use crate::cli::BridgeConfig;
use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManager};

mod network;
mod peer;
mod peers;

/// The error that occurred in [Census].
@@ -32,6 +33,7 @@ pub const ENR_OFFER_LIMIT: usize = 4;
/// The census is responsible for maintaining a list of known peers in the network,
/// checking their liveness, updating their data radius, iterating through their
/// rfn to find new peers, and providing interested enrs for a given content id.
#[derive(Clone)]
pub struct Census {
history: Network,
state: Network,
86 changes: 38 additions & 48 deletions portal-bridge/src/census/network.rs
@@ -23,15 +23,15 @@ use crate::{

use super::peers::Peers;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// The result of the liveness check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LivenessResult {
/// We pinged the peer successfully
Pass,
/// We failed to ping peer
Fail,
/// Peer is already registered and not expired (we didn't try to ping the peer)
AlreadyRegistered,
/// Peer is already known and doesn't need liveness check
Fresh,
}

#[derive(Debug, Clone)]
@@ -98,7 +98,7 @@ impl Network {
NetworkManager::new(self.clone())
}

// Look up all known interested enrs for a given content id
/// Look up interested enrs for a given content id
pub fn get_interested_enrs(&self, content_id: &[u8; 32]) -> Result<Vec<Enr>, CensusError> {
if self.peers.is_empty() {
error!(
@@ -174,8 +174,8 @@ impl Network {
.collect_vec();

// Concurrent execution of liveness check
let new_peers = enrs
.iter()
let starting_peers_count = self.peers.len();
enrs.iter()
.map(|enr| async {
if let Ok(_permit) = semaphore.acquire().await {
self.liveness_check(enr.clone()).await
@@ -188,12 +188,10 @@
}
})
.collect::<JoinAll<_>>()
.await
.into_iter()
.filter(|liveness_result| liveness_result == &LivenessResult::Pass)
.count();
.await;

let total_peers = self.peers.len();
let new_peers = total_peers - starting_peers_count;

debug!(
subnetwork = %self.subnetwork,
@@ -225,13 +223,15 @@
/// Performs liveness check.
///
/// Liveness check will pass if peer responds to a Ping request. It returns
/// `LivenessResult::AlreadyRegistered` if peer is already registered and not expired.
/// `LivenessResult::Fresh` if peer is already known and doesn't need liveness check.
async fn liveness_check(&self, enr: Enr) -> LivenessResult {
// if enr is already registered, check if delay map deadline has expired
if let Some(deadline) = self.peers.deadline(&enr) {
if Instant::now() < deadline {
return LivenessResult::AlreadyRegistered;
}
// check if peer needs liveness check
if self
.peers
.next_liveness_check(&enr)
.is_some_and(|next_liveness_check| Instant::now() < next_liveness_check)
{
return LivenessResult::Fresh;
}

let Ok(pong_info) = self.ping(&enr).await else {
Expand Down Expand Up @@ -259,7 +259,7 @@ impl Network {
enr
};

self.peers.record_successful_liveness_check(enr, radius);
self.peers.record_successful_liveness_check(&enr, radius);
LivenessResult::Pass
}

@@ -367,29 +367,21 @@ impl NetworkManager {

/// Returns next action that should be executed.
pub async fn next_action(&mut self) -> NetworkAction {
loop {
tokio::select! {
_ = self.peer_discovery_interval.tick() => {
return NetworkAction::PeerDiscovery;
}
peer = self.network.peers.next() => {
match peer {
Some(Ok(enr)) => {
return NetworkAction::LivenessCheck(enr);
}
Some(Err(err)) => {
error!(
subnetwork = %self.network.subnetwork,
"next-action: error getting peer - err: {err}",
);
}
None => {
warn!(
subnetwork = %self.network.subnetwork,
"next-action: no pending peers - re-initializing",
);
return NetworkAction::ReInitialization;
}
tokio::select! {
_ = self.peer_discovery_interval.tick() => {
NetworkAction::PeerDiscovery
}
peer = self.network.peers.next() => {
match peer {
Some(enr) => {
NetworkAction::LivenessCheck(enr)
}
None => {
warn!(
subnetwork = %self.network.subnetwork,
"next-action: no pending peers - re-initializing",
);
NetworkAction::ReInitialization
}
}
}
@@ -415,7 +407,7 @@
}
NetworkAction::PeerDiscovery => self.peer_discovery().await,
NetworkAction::LivenessCheck(enr) => {
if self.network.liveness_check(enr).await == LivenessResult::AlreadyRegistered {
if self.network.liveness_check(enr).await == LivenessResult::Fresh {
warn!(
subnetwork = %self.network.subnetwork,
"execute-action: liveness check on already registered peer",
@@ -438,17 +430,15 @@
}
};

let mut new_peers = 0;
let starting_peers = self.network.peers.len();
for enr in enrs {
if self.network.liveness_check(enr).await == LivenessResult::Pass {
new_peers += 1;
}
self.network.liveness_check(enr).await;
}

let total_peers = self.network.peers.len();
let ending_peers = self.network.peers.len();
let new_peers = ending_peers - starting_peers;
@KolbyML (Member) commented on Oct 31, 2024:

I am assuming the possibility that new_peers can be negative is a feature to show that we lost peers? Because isn't there a case where ending_peers can be less than starting_peers, i.e. if more peers were removed than added?

This doesn't really look like an issue, just wanted to point it out.

Collaborator (Author) replied:

I was thinking about it and concluded that it shouldn't happen, unless there is some weird race condition and/or Census is performing very badly.

But you are right, it's technically possible. It's also possible that we change the logic in the future and forget to update this, and it would panic (or the difference would no longer mean that these peers were added).

So I updated the code to not panic and changed the message.
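
Not the PR's final code (this view shows only the first commit, where the plain subtraction above is still present): a minimal standalone sketch of the kind of non-panicking counting the reply describes, assuming saturating arithmetic is the chosen fix; the variable names mirror the diff.

```rust
// Hypothetical sketch, not from the PR: `usize` subtraction panics in debug
// builds when ending_peers < starting_peers (i.e. more peers were removed
// than added), so a saturating difference keeps the peer-discovery log safe.
fn main() {
    let starting_peers: usize = 10;
    let ending_peers: usize = 7;
    let new_peers = ending_peers.saturating_sub(starting_peers); // 0 instead of a panic
    let lost_peers = starting_peers.saturating_sub(ending_peers); // 3
    println!("peer-discovery: finished - discovered {new_peers}, lost {lost_peers} peers");
}
```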

info!(
subnetwork = %self.network.subnetwork,
"peer-discovery: finished - discovered {new_peers} / {total_peers} peers",
"peer-discovery: finished - discovered {new_peers} / {ending_peers} peers",
);
}
}
109 changes: 109 additions & 0 deletions portal-bridge/src/census/peer.rs
@@ -0,0 +1,109 @@
use std::{
collections::VecDeque,
time::{Duration, Instant},
};

use discv5::Enr;
use ethportal_api::types::distance::{Distance, Metric, XorMetric};
use tracing::error;

#[derive(Debug, Clone)]
pub struct LivenessCheck {
success: bool,
#[allow(dead_code)]
timestamp: Instant,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct OfferEvent {
success: bool,
timestamp: Instant,
content_value_size: usize,
duration: Duration,
}

#[derive(Debug)]
/// Stores information about peer and its most recent interactions.
pub struct Peer {
enr: Enr,
radius: Distance,
/// Liveness checks, ordered from most recent (index `0`), to the earliest.
///
/// Contains at most [Self::MAX_LIVENESS_CHECKS] entries.
liveness_checks: VecDeque<LivenessCheck>,
}

impl Peer {
/// The maximum number of liveness checks that we store. Value chosen arbitrarily.
const MAX_LIVENESS_CHECKS: usize = 10;

pub fn new(enr: Enr) -> Self {
Self {
enr,
radius: Distance::ZERO,
liveness_checks: VecDeque::with_capacity(Self::MAX_LIVENESS_CHECKS + 1),
}
}

pub fn enr(&self) -> Enr {
self.enr.clone()
}

/// Returns true if latest liveness check was successful and content is within radius.
pub fn is_interested_in_content(&self, content_id: &[u8; 32]) -> bool {
// check that most recent liveness check was successful
if !self
.liveness_checks
.front()
.is_some_and(|liveness_check| liveness_check.success)
{
return false;
}

let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id);
distance <= self.radius
}

/// Returns true if all latest [Self::MAX_LIVENESS_CHECKS] liveness checks failed.
pub fn is_obsolete(&self) -> bool {
if self.liveness_checks.len() < Self::MAX_LIVENESS_CHECKS {
return false;
}
self.liveness_checks
.iter()
.all(|liveness_check| !liveness_check.success)
}

pub fn record_successful_liveness_check(&mut self, enr: &Enr, radius: Distance) {
if self.enr.seq() > enr.seq() {
error!(
"successful_liveness_check: received outdated enr: {enr} (existing enr: {})",
self.enr.seq()
);
} else {
self.enr = enr.clone();
}
Member commented:

Suggested change
} else {
self.enr = enr.clone();
}
} else if self.enr.seq() < enr.seq() {
self.enr = enr.clone();
}

It probably doesn't matter, but wouldn't we only need to do the clone if an updated enr was given? That should be fairly infrequent, so we might as well avoid the clone.

Collaborator (Author) replied:

I changed the function signature so it accepts Enr, in which case we never clone it.
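
For context, a hedged sketch of the by-value signature mentioned in the reply; the diff in this view (first commit) still takes `&Enr`, so this is an assumption about the follow-up commit rather than the merged code. It would sit inside the `impl Peer` block above.

```rust
// Hypothetical sketch of the by-value variant described above; everything
// else in `Peer` is assumed unchanged from the diff.
pub fn record_successful_liveness_check(&mut self, enr: Enr, radius: Distance) {
    if enr.seq() >= self.enr.seq() {
        // Taking `Enr` by value lets the caller move it in, so no clone is needed.
        self.enr = enr;
    } else {
        error!(
            "successful_liveness_check: received outdated enr: {enr} (existing seq: {})",
            self.enr.seq()
        );
    }
    self.radius = radius;
    self.liveness_checks.push_front(LivenessCheck {
        success: true,
        timestamp: Instant::now(),
    });
    self.purge();
}
```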

self.radius = radius;
self.liveness_checks.push_front(LivenessCheck {
success: true,
timestamp: Instant::now(),
});
self.purge();
}

pub fn record_failed_liveness_check(&mut self) {
self.liveness_checks.push_front(LivenessCheck {
success: false,
timestamp: Instant::now(),
});
self.purge();
}

/// Removes oldest liveness checks and offer events, if we exceeded capacity.
fn purge(&mut self) {
if self.liveness_checks.len() > Self::MAX_LIVENESS_CHECKS {
self.liveness_checks.drain(Self::MAX_LIVENESS_CHECKS..);
}
}
}
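
As a standalone illustration (not code from the PR) of the bounded-history pattern that `record_*_liveness_check` and `purge` implement above; the `record` helper and the plain `bool` history are simplifications:

```rust
use std::collections::VecDeque;

// Keep only the most recent MAX entries, newest at the front, mirroring
// how `Peer` bounds its `liveness_checks` above.
const MAX: usize = 10;

fn record(history: &mut VecDeque<bool>, success: bool) {
    history.push_front(success);
    if history.len() > MAX {
        history.drain(MAX..);
    }
}

fn main() {
    let mut history = VecDeque::with_capacity(MAX + 1);
    for i in 0..15 {
        record(&mut history, i % 3 == 0);
    }
    assert_eq!(history.len(), MAX);
    // `is_obsolete` in the PR returns true only when all MAX entries are failures.
    let all_failed = history.iter().all(|&ok| !ok);
    println!("all_failed = {all_failed}");
}
```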