Skip to content

Commit

Permalink
feat(census): keep track of offer requests and their status
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev committed Nov 1, 2024
1 parent 40f5b7b commit e3d987e
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 8 deletions.
22 changes: 19 additions & 3 deletions portal-bridge/src/bridge/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::{Arc, Mutex};
use std::{
sync::{Arc, Mutex},
time::Instant,
};

use alloy::rlp::Decodable;
use eth_trie::{decode_node, node::Node, RootWithTrieDiff};
Expand Down Expand Up @@ -309,24 +312,29 @@ impl StateBridge {
content_key.clone(),
enrs.len(),
)));
let encoded_content_value = content_value.encode();
for enr in enrs.clone() {
let permit = self.acquire_offer_permit().await;
let census = self.census.clone();
let portal_client = self.portal_client.clone();
let content_key = content_key.clone();
let content_value = content_value.clone();
let encoded_content_value = encoded_content_value.clone();
let offer_report = offer_report.clone();
let metrics = self.metrics.clone();
let global_offer_report = self.global_offer_report.clone();
tokio::spawn(async move {
let timer = metrics.start_process_timer("spawn_offer_state_proof");

let start_time = Instant::now();
let content_value_size = encoded_content_value.len();

let result = timeout(
SERVE_BLOCK_TIMEOUT,
StateNetworkApiClient::trace_offer(
&portal_client,
enr.clone(),
content_key.clone(),
content_value.encode(),
encoded_content_value,
),
)
.await;
Expand All @@ -348,6 +356,14 @@ impl StateBridge {
}
};

census.record_offer_result(
Subnetwork::State,
enr.node_id(),
content_value_size,
start_time.elapsed(),
offer_trace,
);

// Update report and metrics
global_offer_report
.lock()
Expand Down
31 changes: 28 additions & 3 deletions portal-bridge/src/census/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use std::collections::HashSet;

use ethportal_api::{jsonrpsee::http_client::HttpClient, types::network::Subnetwork, Enr};
use std::{collections::HashSet, time::Duration};

use discv5::enr::NodeId;
use ethportal_api::{
jsonrpsee::http_client::HttpClient,
types::{network::Subnetwork, portal_wire::OfferTrace},
Enr,
};
use thiserror::Error;
use tokio::task::JoinHandle;
use tracing::{error, info, Instrument};
Expand Down Expand Up @@ -68,6 +73,26 @@ impl Census {
}
}

pub fn record_offer_result(
&self,
subnetwork: Subnetwork,
node_id: NodeId,
content_value_size: usize,
duration: Duration,
offer_trace: &OfferTrace,
) {
let network = match subnetwork {
Subnetwork::History => &self.history,
Subnetwork::State => &self.state,
Subnetwork::Beacon => &self.beacon,
_ => {
error!("record_offer_result: subnetwork {subnetwork} is not supported");
return;
}
};
network.record_offer_result(node_id, content_value_size, duration, offer_trace);
}

/// Initialize subnetworks and starts background service that will keep our view of the network
/// up to date.
///
Expand Down
14 changes: 13 additions & 1 deletion portal-bridge/src/census/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use discv5::enr::NodeId;
use ethportal_api::{
generate_random_node_ids,
jsonrpsee::http_client::HttpClient,
types::{distance::Distance, network::Subnetwork, portal::PongInfo},
types::{distance::Distance, network::Subnetwork, portal::PongInfo, portal_wire::OfferTrace},
BeaconNetworkApiClient, Enr, HistoryNetworkApiClient, StateNetworkApiClient,
};
use futures::{future::JoinAll, StreamExt};
Expand Down Expand Up @@ -112,6 +112,18 @@ impl Network {
.get_interested_enrs(content_id, self.enr_offer_limit))
}

/// Records the status of the most recent `Offer` request to one of the peers.
pub fn record_offer_result(
&self,
node_id: NodeId,
content_value_size: usize,
duration: Duration,
offer_trace: &OfferTrace,
) {
self.peers
.record_offer_result(node_id, content_value_size, duration, offer_trace);
}

/// Returns whether `enr` represents eligible peer.
///
/// Currently this only filters out peers based on client type (using `filter_client` field).
Expand Down
25 changes: 25 additions & 0 deletions portal-bridge/src/census/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,24 @@ pub struct Peer {
///
/// Contains at most [Self::MAX_LIVENESS_CHECKS] entries.
liveness_checks: VecDeque<LivenessCheck>,
/// Offer events, ordered from most recent (index `0`), to the earliest.
///
/// Contains at most [Self::MAX_OFFER_EVENTS] entries.
offer_events: VecDeque<OfferEvent>,
}

impl Peer {
/// The maximum number of liveness checks that we store. Value chosen arbitrarily.
const MAX_LIVENESS_CHECKS: usize = 10;
/// The maximum number of events that we store. Value chosen arbitrarily.
const MAX_OFFER_EVENTS: usize = 255;

pub fn new(enr: Enr) -> Self {
Self {
enr,
radius: Distance::ZERO,
liveness_checks: VecDeque::with_capacity(Self::MAX_LIVENESS_CHECKS + 1),
offer_events: VecDeque::with_capacity(Self::MAX_OFFER_EVENTS + 1),
}
}

Expand Down Expand Up @@ -107,10 +114,28 @@ impl Peer {
self.purge();
}

pub fn record_offer_result(
&mut self,
success: bool,
content_value_size: usize,
duration: Duration,
) {
self.offer_events.push_front(OfferEvent {
success,
timestamp: Instant::now(),
content_value_size,
duration,
});
self.purge();
}

/// Removes oldest liveness checks and offer events, if we exceeded capacity.
fn purge(&mut self) {
if self.liveness_checks.len() > Self::MAX_LIVENESS_CHECKS {
self.liveness_checks.drain(Self::MAX_LIVENESS_CHECKS..);
}
if self.offer_events.len() > Self::MAX_OFFER_EVENTS {
self.offer_events.drain(Self::MAX_OFFER_EVENTS..);
}
}
}
22 changes: 21 additions & 1 deletion portal-bridge/src/census/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use std::{

use delay_map::HashSetDelay;
use discv5::enr::NodeId;
use ethportal_api::{types::distance::Distance, Enr};
use ethportal_api::{
types::{distance::Distance, portal_wire::OfferTrace},
Enr,
};
use futures::Stream;
use rand::seq::IteratorRandom;
use tokio::time::Instant;
Expand Down Expand Up @@ -101,6 +104,23 @@ impl Peers {
}
}

pub fn record_offer_result(
&self,
node_id: NodeId,
content_value_size: usize,
duration: Duration,
offer_trace: &OfferTrace,
) {
let success = match offer_trace {
OfferTrace::Success(_) | OfferTrace::Declined => true,
OfferTrace::Failed => false,
};
match self.write().peers.get_mut(&node_id) {
Some(peer) => peer.record_offer_result(success, content_value_size, duration),
None => error!("record_offer_result: unknown peer: {node_id}"),
}
}

/// Selects random `limit` peers that should be interested in content.
pub fn get_interested_enrs(&self, content_id: &[u8; 32], limit: usize) -> Vec<Enr> {
self.read()
Expand Down

0 comments on commit e3d987e

Please sign in to comment.