From 9540bddc0191d93392dcdbe05b4328fc6e736d76 Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Fri, 19 Jul 2024 09:36:08 -0400 Subject: [PATCH] test: gossip propagation (#1342) * test: add test for gossip propagation * feat: return error for unpopulated offer if content not available locally --- .../src/scenarios/offer_accept.rs | 119 +++++++++++++++++- portalnet/src/overlay/protocol.rs | 26 ++++ rpc/src/beacon_rpc.rs | 3 + rpc/src/history_rpc.rs | 3 + rpc/src/state_rpc.rs | 3 + tests/self_peertest.rs | 21 ++++ 6 files changed, 172 insertions(+), 3 deletions(-) diff --git a/ethportal-peertest/src/scenarios/offer_accept.rs b/ethportal-peertest/src/scenarios/offer_accept.rs index 578fc2dbc..fe52aa240 100644 --- a/ethportal-peertest/src/scenarios/offer_accept.rs +++ b/ethportal-peertest/src/scenarios/offer_accept.rs @@ -1,5 +1,9 @@ -use std::str::FromStr; +use std::{ + net::{IpAddr, Ipv4Addr}, + str::FromStr, +}; +use tokio::time::{sleep, Duration}; use tracing::info; use crate::{ @@ -7,8 +11,10 @@ use crate::{ Peertest, }; use ethportal_api::{ - jsonrpsee::async_client::Client, types::enr::Enr, utils::bytes::hex_encode, - HistoryNetworkApiClient, + jsonrpsee::async_client::Client, + types::{cli::TrinConfig, enr::Enr}, + utils::bytes::hex_encode, + Discv5ApiClient, HistoryNetworkApiClient, }; pub async fn test_unpopulated_offer(peertest: &Peertest, target: &Client) { @@ -45,6 +51,32 @@ pub async fn test_unpopulated_offer(peertest: &Peertest, target: &Client) { ); } +pub async fn test_unpopulated_offer_fails_with_missing_content( + peertest: &Peertest, + target: &Client, +) { + info!("Testing Unpopulated OFFER/ACCEPT flow with missing content"); + + let (content_key, _content_value) = fixture_header_with_proof(); + + // validate that unpopulated offer fails if content not available locally + match target + .offer( + Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(), + content_key.clone(), + None, + ) + .await + { + Ok(_) => panic!("Unpopulated offer should have failed"), + Err(e) => { + assert!(e + .to_string() + .contains("Content key not found in local store")); + } + } +} + pub async fn test_populated_offer(peertest: &Peertest, target: &Client) { info!("Testing Populated Offer/ACCEPT flow"); @@ -69,3 +101,84 @@ pub async fn test_populated_offer(peertest: &Peertest, target: &Client) { "The received content {received_content_value:?}, must match the expected {content_value:?}", ); } + +pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client) { + info!("Testing populated offer propagates gossip"); + + // connect target to network + let _ = target.ping(peertest.bootnode.enr.clone()).await.unwrap(); + + // Spin up a fresh client, not connected to existing peertest + let test_ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + // Use an uncommon port for the peertest to avoid clashes. + let test_discovery_port = 8889; + let external_addr = format!("{test_ip_addr}:{test_discovery_port}"); + let fresh_ipc_path = format!("/tmp/trin-jsonrpc-{test_discovery_port}.ipc"); + let trin_config = TrinConfig::new_from( + [ + "trin", + "--portal-subnetworks", + "history", + "--external-address", + external_addr.as_str(), + "--mb", + "10", + "--web3-ipc-path", + fresh_ipc_path.as_str(), + "--ephemeral", + "--discovery-port", + test_discovery_port.to_string().as_ref(), + "--bootnodes", + "none", + ] + .iter(), + ) + .unwrap(); + let _test_client_rpc_handle = trin::run_trin(trin_config).await.unwrap(); + let fresh_target = reth_ipc::client::IpcClientBuilder::default() + .build(fresh_ipc_path) + .await + .unwrap(); + let fresh_enr = fresh_target.node_info().await.unwrap().enr; + + // connect target to network + let _ = target.ping(fresh_enr.clone()).await.unwrap(); + + let (content_key, content_value) = fixture_header_with_proof(); + // use populated offer which means content will *not* be stored in the target's local db + target + .offer( + fresh_enr.clone(), + content_key.clone(), + Some(content_value.clone()), + ) + .await + .unwrap(); + + // sleep to let gossip propagate + sleep(Duration::from_secs(1)).await; + + // validate that every node in the network now has a local copy of the header + assert!( + HistoryNetworkApiClient::local_content(target, content_key.clone()) + .await + .is_ok() + ); + assert!( + HistoryNetworkApiClient::local_content(&fresh_target, content_key.clone()) + .await + .is_ok() + ); + assert!(HistoryNetworkApiClient::local_content( + &peertest.nodes[0].ipc_client, + content_key.clone() + ) + .await + .is_ok()); + assert!(HistoryNetworkApiClient::local_content( + &peertest.bootnode.ipc_client, + content_key.clone() + ) + .await + .is_ok()); +} diff --git a/portalnet/src/overlay/protocol.rs b/portalnet/src/overlay/protocol.rs index 20edd1620..993e3c7f6 100644 --- a/portalnet/src/overlay/protocol.rs +++ b/portalnet/src/overlay/protocol.rs @@ -531,6 +531,32 @@ where destination: enr.clone(), }; + // Validate that the content keys are available in the local store, before sending the + // offer + for content_key in content_keys.into_iter() { + let content_key = TContentKey::try_from(content_key.clone()).map_err(|err| { + OverlayRequestError::ContentNotFound { + message: format!( + "Error decoding content key for content key: {content_key:02X?} - {err}" + ), + utp: false, + trace: None, + } + })?; + match self.store.read().get(&content_key) { + Ok(Some(_)) => {} + _ => { + return Err(OverlayRequestError::ContentNotFound { + message: format!( + "Content key not found in local store: {content_key:02X?}" + ), + utp: false, + trace: None, + }); + } + } + } + // Send the request and wait on the response. match self .send_overlay_request(Request::Offer(request), direction) diff --git a/rpc/src/beacon_rpc.rs b/rpc/src/beacon_rpc.rs index 3bf567631..6aec3797f 100644 --- a/rpc/src/beacon_rpc.rs +++ b/rpc/src/beacon_rpc.rs @@ -230,6 +230,9 @@ impl BeaconNetworkApiServer for BeaconNetworkApi { } /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. + /// If the content value is provided, a "populated" offer is used, which will not store the + /// content locally. Otherwise a regular offer is sent, after validating that the content is + /// available locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( diff --git a/rpc/src/history_rpc.rs b/rpc/src/history_rpc.rs index a99086c87..bd4082821 100644 --- a/rpc/src/history_rpc.rs +++ b/rpc/src/history_rpc.rs @@ -170,6 +170,9 @@ impl HistoryNetworkApiServer for HistoryNetworkApi { } /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. + /// If the content value is provided, a "populated" offer is used, which will not store the + /// content locally. Otherwise a regular offer is sent, after validating that the content is + /// available locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( diff --git a/rpc/src/state_rpc.rs b/rpc/src/state_rpc.rs index bf0de771b..9e0e60368 100644 --- a/rpc/src/state_rpc.rs +++ b/rpc/src/state_rpc.rs @@ -190,6 +190,9 @@ impl StateNetworkApiServer for StateNetworkApi { } /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. + /// If the content value is provided, a "populated" offer is used, which will not store the + /// content locally. Otherwise a regular offer is sent, after validating that the content is + /// available locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( diff --git a/tests/self_peertest.rs b/tests/self_peertest.rs index 3b05d45da..2081b201d 100644 --- a/tests/self_peertest.rs +++ b/tests/self_peertest.rs @@ -74,6 +74,18 @@ async fn peertest_unpopulated_offer() { handle.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn peertest_unpopulated_offer_fails_with_missing_content() { + let (peertest, target, handle) = setup_peertest("mainnet").await; + peertest::scenarios::offer_accept::test_unpopulated_offer_fails_with_missing_content( + &peertest, &target, + ) + .await; + peertest.exit_all_nodes(); + handle.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread")] #[serial] async fn peertest_gossip_with_trace() { @@ -212,6 +224,15 @@ async fn peertest_state_recursive_gossip() { handle.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn peertest_history_offer_propagates_gossip() { + let (peertest, target, handle) = setup_peertest("mainnet").await; + peertest::scenarios::offer_accept::test_offer_propagates_gossip(&peertest, &target).await; + peertest.exit_all_nodes(); + handle.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread")] #[serial] async fn peertest_ping_cross_discv5_protocol_id() {