diff --git a/Cargo.lock b/Cargo.lock index 5099318c6..2d01f8c3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8041,6 +8041,7 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "anyhow", + "enr", "eth2_hashing", "ethereum_ssz", "ethereum_ssz_derive", @@ -8304,6 +8305,7 @@ dependencies = [ "tracing", "tracing-subscriber 0.3.18", "trin-utils", + "trin-validation", "utp-rs", ] diff --git a/portalnet/src/discovery.rs b/portalnet/src/discovery.rs index 8c1346dd8..44276da15 100644 --- a/portalnet/src/discovery.rs +++ b/portalnet/src/discovery.rs @@ -17,8 +17,9 @@ use discv5::{ }; use lru::LruCache; use parking_lot::RwLock; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, RwLock as TokioRwLock}; use tracing::{debug, info, warn}; +use trin_validation::oracle::HeaderOracle; use utp_rs::{cid::ConnectionPeer, udp::AsyncUdpSocket}; use super::config::PortalnetConfig; @@ -346,13 +347,73 @@ impl Discovery { } pub struct Discv5UdpSocket { - talk_reqs: mpsc::UnboundedReceiver, + talk_request_receiver: mpsc::UnboundedReceiver, discv5: Arc, + enr_cache: Arc>>, + header_oracle: Arc>, } impl Discv5UdpSocket { - pub fn new(discv5: Arc, talk_reqs: mpsc::UnboundedReceiver) -> Self { - Self { discv5, talk_reqs } + pub fn new( + discv5: Arc, + talk_request_receiver: mpsc::UnboundedReceiver, + header_oracle: Arc>, + enr_cache_capacity: usize, + ) -> Self { + let enr_cache = LruCache::new(enr_cache_capacity); + let enr_cache = Arc::new(TokioRwLock::new(enr_cache)); + Self { + discv5, + talk_request_receiver, + enr_cache, + header_oracle, + } + } + + async fn find_enr(&mut self, node_id: &NodeId) -> io::Result { + if let Some(cached_enr) = self.enr_cache.write().await.get(node_id).cloned() { + return Ok(UtpEnr(cached_enr)); + } + + if let Some(enr) = self.discv5.find_enr(node_id) { + self.enr_cache.write().await.put(*node_id, enr.clone()); + return Ok(UtpEnr(enr)); + } + + if let Some(enr) = self.discv5.cached_node_addr(node_id) { + self.enr_cache.write().await.put(*node_id, enr.enr.clone()); + return Ok(UtpEnr(enr.enr)); + } + + let history_jsonrpc_tx = self.header_oracle.read().await.history_jsonrpc_tx(); + if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx { + if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await { + self.enr_cache.write().await.put(*node_id, enr.clone()); + return Ok(UtpEnr(enr)); + } + } + + let state_jsonrpc_tx = self.header_oracle.read().await.state_jsonrpc_tx(); + if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx { + if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await { + self.enr_cache.write().await.put(*node_id, enr.clone()); + return Ok(UtpEnr(enr)); + } + } + + let beacon_jsonrpc_tx = self.header_oracle.read().await.beacon_jsonrpc_tx(); + if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx { + if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await { + self.enr_cache.write().await.put(*node_id, enr.clone()); + return Ok(UtpEnr(enr)); + } + } + + debug!(node_id = %node_id, "uTP packet from unknown source"); + Err(io::Error::new( + io::ErrorKind::Other, + "ENR not found for talk req destination", + )) } } @@ -418,25 +479,10 @@ impl AsyncUdpSocket for Discv5UdpSocket { } async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, UtpEnr)> { - match self.talk_reqs.recv().await { + match self.talk_request_receiver.recv().await { Some(talk_req) => { let src_node_id = talk_req.node_id(); - let enr = match self.discv5.find_enr(src_node_id) { - Some(enr) => UtpEnr(enr), - None => { - let enr = match self.discv5.cached_node_addr(src_node_id) { - Some(node_addr) => Ok(node_addr.enr), - None => { - debug!(node_id = %src_node_id, "uTP packet from unknown source"); - Err(io::Error::new( - io::ErrorKind::Other, - "ENR not found for talk req destination", - )) - } - }?; - UtpEnr(enr) - } - }; + let enr = self.find_enr(src_node_id).await?; let packet = talk_req.body(); let n = std::cmp::min(buf.len(), packet.len()); buf[..n].copy_from_slice(&packet[..n]); diff --git a/portalnet/src/overlay/service.rs b/portalnet/src/overlay/service.rs index 9f4aa6dd7..8973e904c 100644 --- a/portalnet/src/overlay/service.rs +++ b/portalnet/src/overlay/service.rs @@ -946,12 +946,12 @@ where Ok(Content::Content(content)) } else { // Generate a connection ID for the uTP connection. - let node_addr = self.discovery.cached_node_addr(source).ok_or_else(|| { + let enr = self.find_enr(source).ok_or_else(|| { OverlayRequestError::AcceptError( "unable to find ENR for NodeId".to_string(), ) })?; - let enr = UtpEnr(node_addr.enr); + let enr = UtpEnr(enr); let cid = self.utp_controller.cid(enr, false); let cid_send = cid.send; @@ -1038,7 +1038,7 @@ where // if we're unable to find the ENR for the source node we throw an error // since the enr is required for the accept queue, and it is expected to be present - let node_addr = self.discovery.cached_node_addr(source).ok_or_else(|| { + let enr = self.find_enr(source).ok_or_else(|| { OverlayRequestError::AcceptError("unable to find ENR for NodeId".to_string()) })?; for (i, key) in content_keys.iter().enumerate() { @@ -1055,11 +1055,7 @@ where })?; if accept { // accept all keys that are successfully added to the queue - if self - .accept_queue - .write() - .add_key_to_queue(key, &node_addr.enr) - { + if self.accept_queue.write().add_key_to_queue(key, &enr) { accepted_keys.push(key.clone()); } else { accept = false; @@ -1083,10 +1079,10 @@ where // Generate a connection ID for the uTP connection if there is data we would like to // accept. - let node_addr = self.discovery.cached_node_addr(source).ok_or_else(|| { + let enr = self.find_enr(source).ok_or_else(|| { OverlayRequestError::AcceptError("unable to find ENR for NodeId".to_string()) })?; - let enr = UtpEnr(node_addr.enr); + let enr = UtpEnr(enr); let enr_str = if enabled!(Level::TRACE) { enr.0.to_base64() } else { @@ -2656,7 +2652,7 @@ mod tests { use discv5::kbucket::Entry; use rstest::*; use serial_test::serial; - use tokio::sync::mpsc::unbounded_channel; + use tokio::sync::{mpsc::unbounded_channel, RwLock as TokioRwLock}; use tokio_test::{assert_pending, assert_ready, task}; use crate::{ @@ -2674,7 +2670,7 @@ mod tests { }; use trin_metrics::portalnet::PORTALNET_METRICS; use trin_storage::{DistanceFunction, MemoryContentStore}; - use trin_validation::validator::MockValidator; + use trin_validation::{oracle::HeaderOracle, validator::MockValidator}; macro_rules! poll_command_rx { ($service:ident) => { @@ -2691,9 +2687,15 @@ mod tests { let temp_dir = setup_temp_dir().unwrap().into_path(); let discovery = Arc::new(Discovery::new(portal_config, temp_dir, MAINNET.clone()).unwrap()); + let header_oracle = HeaderOracle::default(); + let header_oracle = Arc::new(TokioRwLock::new(header_oracle)); let (_utp_talk_req_tx, utp_talk_req_rx) = unbounded_channel(); - let discv5_utp = - crate::discovery::Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx); + let discv5_utp = crate::discovery::Discv5UdpSocket::new( + Arc::clone(&discovery), + utp_talk_req_rx, + header_oracle, + 50, + ); let utp_socket = utp_rs::socket::UtpSocket::with_socket(discv5_utp); let metrics = OverlayMetricsReporter { overlay_metrics: PORTALNET_METRICS.overlay(), diff --git a/portalnet/tests/overlay.rs b/portalnet/tests/overlay.rs index 15d27921f..5d4570031 100644 --- a/portalnet/tests/overlay.rs +++ b/portalnet/tests/overlay.rs @@ -6,7 +6,7 @@ use std::{ use discv5::TalkRequest; use parking_lot::RwLock; use tokio::{ - sync::{mpsc, mpsc::unbounded_channel}, + sync::{mpsc, mpsc::unbounded_channel, RwLock as TokioRwLock}, time::{self, Duration}, }; use utp_rs::socket::UtpSocket; @@ -27,7 +27,7 @@ use portalnet::{ utils::db::setup_temp_dir, }; use trin_storage::{ContentStore, DistanceFunction, MemoryContentStore}; -use trin_validation::validator::MockValidator; +use trin_validation::{oracle::HeaderOracle, validator::MockValidator}; async fn init_overlay( discovery: Arc, @@ -39,8 +39,11 @@ async fn init_overlay( let store = MemoryContentStore::new(node_id, DistanceFunction::Xor); let store = Arc::new(RwLock::new(store)); + let header_oracle = HeaderOracle::default(); + let header_oracle = Arc::new(TokioRwLock::new(header_oracle)); let (_utp_talk_req_tx, utp_talk_req_rx) = unbounded_channel(); - let discv5_utp = Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx); + let discv5_utp = + Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx, header_oracle, 50); let utp_socket = Arc::new(UtpSocket::with_socket(discv5_utp)); let validator = Arc::new(MockValidator {}); diff --git a/src/lib.rs b/src/lib.rs index 83025955c..4d475adee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,12 +3,6 @@ use std::sync::Arc; -use rpc::{launch_jsonrpc_server, RpcServerHandle}; -use tokio::sync::{mpsc, RwLock}; -use tracing::info; -use tree_hash::TreeHash; -use utp_rs::socket::UtpSocket; - #[cfg(windows)] use ethportal_api::types::cli::Web3TransportType; use ethportal_api::{ @@ -21,12 +15,17 @@ use portalnet::{ events::PortalnetEvents, utils::db::{configure_node_data_dir, configure_trin_data_dir}, }; +use rpc::{launch_jsonrpc_server, RpcServerHandle}; +use tokio::sync::{mpsc, RwLock}; +use tracing::info; +use tree_hash::TreeHash; use trin_beacon::initialize_beacon_network; use trin_history::initialize_history_network; use trin_state::initialize_state_network; use trin_storage::PortalStorageConfig; use trin_utils::version::get_trin_version; use trin_validation::oracle::HeaderOracle; +use utp_rs::socket::UtpSocket; pub async fn run_trin( trin_config: TrinConfig, @@ -68,9 +67,27 @@ pub async fn run_trin( prometheus_exporter::start(addr)?; } + // Initialize validation oracle + let header_oracle = HeaderOracle::default(); + info!(hash_tree_root = %hex_encode(header_oracle.header_validator.pre_merge_acc.tree_hash_root().0),"Loaded + pre-merge accumulator."); + let header_oracle = Arc::new(RwLock::new(header_oracle)); + // Initialize and spawn uTP socket let (utp_talk_reqs_tx, utp_talk_reqs_rx) = mpsc::unbounded_channel(); - let discv5_utp_socket = Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_reqs_rx); + + // Set the enr_cache_capacity to the maximum uTP limit between all active networks. This is + // a trade off between memory usage and increased searches from the networks for each Enr. + // utp_transfer_limit is 2x as it would be utp_transfer_limit for incoming and + // utp_transfer_limit for outgoing + let enr_cache_capacity = + portalnet_config.utp_transfer_limit * 2 * trin_config.portal_subnetworks.len(); + let discv5_utp_socket = Discv5UdpSocket::new( + Arc::clone(&discovery), + utp_talk_reqs_rx, + header_oracle.clone(), + enr_cache_capacity, + ); let utp_socket = UtpSocket::with_socket(discv5_utp_socket); let utp_socket = Arc::new(utp_socket); @@ -80,12 +97,6 @@ pub async fn run_trin( discovery.local_enr().node_id(), )?; - // Initialize validation oracle - let header_oracle = HeaderOracle::default(); - info!(hash_tree_root = %hex_encode(header_oracle.header_validator.pre_merge_acc.tree_hash_root().0),"Loaded - pre-merge accumulator."); - let header_oracle = Arc::new(RwLock::new(header_oracle)); - // Initialize state sub-network service and event handlers, if selected let (state_handler, state_network_task, state_event_tx, state_jsonrpc_tx, state_event_stream) = if trin_config diff --git a/trin-state/src/lib.rs b/trin-state/src/lib.rs index 66aca751d..27e1b35cb 100644 --- a/trin-state/src/lib.rs +++ b/trin-state/src/lib.rs @@ -48,6 +48,7 @@ pub async fn initialize_state_network( StateEventStream, )> { let (state_jsonrpc_tx, state_jsonrpc_rx) = mpsc::unbounded_channel::(); + header_oracle.write().await.state_jsonrpc_tx = Some(state_jsonrpc_tx.clone()); let (state_event_tx, state_event_rx) = mpsc::unbounded_channel::(); let state_network = StateNetwork::new( diff --git a/trin-validation/Cargo.toml b/trin-validation/Cargo.toml index 41b80a9c3..ba460eb80 100644 --- a/trin-validation/Cargo.toml +++ b/trin-validation/Cargo.toml @@ -13,6 +13,7 @@ authors = ["https://github.com/ethereum/trin/graphs/contributors"] [dependencies] alloy-primitives = { version = "0.7.0", features = ["ssz"] } anyhow = "1.0.68" +enr = "0.10.0" eth2_hashing = "0.2.0" ethereum_ssz = "0.5.3" ethereum_ssz_derive = "0.5.3" diff --git a/trin-validation/src/oracle.rs b/trin-validation/src/oracle.rs index d695914c6..31be2d826 100644 --- a/trin-validation/src/oracle.rs +++ b/trin-validation/src/oracle.rs @@ -1,5 +1,6 @@ use alloy_primitives::B256; use anyhow::anyhow; +use enr::NodeId; use serde_json::Value; use tokio::sync::mpsc; @@ -9,11 +10,11 @@ use ethportal_api::{ execution::header_with_proof::HeaderWithProof, history::ContentInfo, jsonrpc::{ - endpoints::HistoryEndpoint, - request::{BeaconJsonRpcRequest, HistoryJsonRpcRequest}, + endpoints::{BeaconEndpoint, HistoryEndpoint, StateEndpoint}, + request::{BeaconJsonRpcRequest, HistoryJsonRpcRequest, StateJsonRpcRequest}, }, }, - BlockHeaderKey, HistoryContentKey, HistoryContentValue, + BlockHeaderKey, Enr, HistoryContentKey, HistoryContentValue, }; /// Responsible for dispatching cross-overlay-network requests @@ -25,6 +26,7 @@ pub struct HeaderOracle { // determining which subnetworks are actually available. pub history_jsonrpc_tx: Option>, pub beacon_jsonrpc_tx: Option>, + pub state_jsonrpc_tx: Option>, pub header_validator: HeaderValidator, } @@ -40,6 +42,7 @@ impl HeaderOracle { Self { history_jsonrpc_tx: None, beacon_jsonrpc_tx: None, + state_jsonrpc_tx: None, header_validator, } } @@ -100,9 +103,80 @@ impl HeaderOracle { ) -> anyhow::Result> { match self.history_jsonrpc_tx.clone() { Some(val) => Ok(val), - None => Err(anyhow!("History subnetwork is not available")), + None => Err(anyhow!("History network is not available")), } } + + pub fn beacon_jsonrpc_tx(&self) -> anyhow::Result> { + match self.beacon_jsonrpc_tx.clone() { + Some(val) => Ok(val), + None => Err(anyhow!("Beacon network is not available")), + } + } + + pub fn state_jsonrpc_tx(&self) -> anyhow::Result> { + match self.state_jsonrpc_tx.clone() { + Some(val) => Ok(val), + None => Err(anyhow!("State network is not available")), + } + } + + pub async fn history_get_enr( + node_id: &NodeId, + history_jsonrpc_tx: mpsc::UnboundedSender, + ) -> anyhow::Result { + let endpoint = HistoryEndpoint::GetEnr(*node_id); + let (resp, mut resp_rx) = mpsc::unbounded_channel::>(); + let request = HistoryJsonRpcRequest { endpoint, resp }; + history_jsonrpc_tx.send(request)?; + + let enr_value = match resp_rx.recv().await { + Some(val) => val.map_err(|err| anyhow!("History network request error: {err:?}"))?, + None => return Err(anyhow!("No response from History network")), + }; + + let enr: Enr = serde_json::from_value(enr_value)?; + + Ok(enr) + } + + pub async fn state_get_enr( + node_id: &NodeId, + state_jsonrpc_tx: mpsc::UnboundedSender, + ) -> anyhow::Result { + let endpoint = StateEndpoint::GetEnr(*node_id); + let (resp, mut resp_rx) = mpsc::unbounded_channel::>(); + let request = StateJsonRpcRequest { endpoint, resp }; + state_jsonrpc_tx.send(request)?; + + let enr_value = match resp_rx.recv().await { + Some(val) => val.map_err(|err| anyhow!("State network request error: {err:?}"))?, + None => return Err(anyhow!("No response from State network")), + }; + + let enr: Enr = serde_json::from_value(enr_value)?; + + Ok(enr) + } + + pub async fn beacon_get_enr( + node_id: &NodeId, + beacon_jsonrpc_tx: mpsc::UnboundedSender, + ) -> anyhow::Result { + let endpoint = BeaconEndpoint::GetEnr(*node_id); + let (resp, mut resp_rx) = mpsc::unbounded_channel::>(); + let request = BeaconJsonRpcRequest { endpoint, resp }; + beacon_jsonrpc_tx.send(request)?; + + let enr_value = match resp_rx.recv().await { + Some(val) => val.map_err(|err| anyhow!("Beacon network request error: {err:?}"))?, + None => return Err(anyhow!("No response from Beacon network")), + }; + + let enr: Enr = serde_json::from_value(enr_value)?; + + Ok(enr) + } } #[cfg(test)] diff --git a/utp-testing/Cargo.toml b/utp-testing/Cargo.toml index d9bc42e26..bda6d61c1 100644 --- a/utp-testing/Cargo.toml +++ b/utp-testing/Cargo.toml @@ -21,6 +21,7 @@ rand = "0.8.4" tracing = "0.1.36" tracing-subscriber = "0.3.15" trin-utils = { path = "../trin-utils" } +trin-validation = { path="../trin-validation" } tokio = {version = "1.14.0", features = ["full"]} utp-rs = { git = "https://github.com/ethereum/utp", tag = "v0.1.0-alpha.12" } diff --git a/utp-testing/src/lib.rs b/utp-testing/src/lib.rs index e68dfdfe9..b55779286 100644 --- a/utp-testing/src/lib.rs +++ b/utp-testing/src/lib.rs @@ -29,6 +29,7 @@ use tokio::sync::{ mpsc::{self, Receiver}, RwLock, }; +use trin_validation::oracle::HeaderOracle; use utp_rs::{conn::ConnectionConfig, socket::UtpSocket}; /// uTP test app @@ -175,9 +176,15 @@ pub async fn run_test_app( let enr = discovery.local_enr(); let discovery = Arc::new(discovery); + let header_oracle = HeaderOracle::default(); + let header_oracle = Arc::new(RwLock::new(header_oracle)); let (utp_talk_req_tx, utp_talk_req_rx) = mpsc::unbounded_channel(); - let discv5_utp_socket = - portalnet::discovery::Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx); + let discv5_utp_socket = portalnet::discovery::Discv5UdpSocket::new( + Arc::clone(&discovery), + utp_talk_req_rx, + header_oracle, + 50, + ); let utp_socket = utp_rs::socket::UtpSocket::with_socket(discv5_utp_socket); let utp_socket = Arc::new(utp_socket);