Skip to content

Commit

Permalink
test: gossip propagation (#1342)
Browse files Browse the repository at this point in the history
* test: add test for gossip propagation

* feat: return error for unpopulated offer if content not available locally
  • Loading branch information
njgheorghita authored Jul 19, 2024
1 parent 5f32eb9 commit 9540bdd
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 3 deletions.
119 changes: 116 additions & 3 deletions ethportal-peertest/src/scenarios/offer_accept.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use std::str::FromStr;
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
};

use tokio::time::{sleep, Duration};
use tracing::info;

use crate::{
utils::{fixture_header_with_proof, wait_for_history_content},
Peertest,
};
use ethportal_api::{
jsonrpsee::async_client::Client, types::enr::Enr, utils::bytes::hex_encode,
HistoryNetworkApiClient,
jsonrpsee::async_client::Client,
types::{cli::TrinConfig, enr::Enr},
utils::bytes::hex_encode,
Discv5ApiClient, HistoryNetworkApiClient,
};

pub async fn test_unpopulated_offer(peertest: &Peertest, target: &Client) {
Expand Down Expand Up @@ -45,6 +51,32 @@ pub async fn test_unpopulated_offer(peertest: &Peertest, target: &Client) {
);
}

pub async fn test_unpopulated_offer_fails_with_missing_content(
peertest: &Peertest,
target: &Client,
) {
info!("Testing Unpopulated OFFER/ACCEPT flow with missing content");

let (content_key, _content_value) = fixture_header_with_proof();

// validate that unpopulated offer fails if content not available locally
match target
.offer(
Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(),
content_key.clone(),
None,
)
.await
{
Ok(_) => panic!("Unpopulated offer should have failed"),
Err(e) => {
assert!(e
.to_string()
.contains("Content key not found in local store"));
}
}
}

pub async fn test_populated_offer(peertest: &Peertest, target: &Client) {
info!("Testing Populated Offer/ACCEPT flow");

Expand All @@ -69,3 +101,84 @@ pub async fn test_populated_offer(peertest: &Peertest, target: &Client) {
"The received content {received_content_value:?}, must match the expected {content_value:?}",
);
}

pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client) {
info!("Testing populated offer propagates gossip");

// connect target to network
let _ = target.ping(peertest.bootnode.enr.clone()).await.unwrap();

// Spin up a fresh client, not connected to existing peertest
let test_ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
// Use an uncommon port for the peertest to avoid clashes.
let test_discovery_port = 8889;
let external_addr = format!("{test_ip_addr}:{test_discovery_port}");
let fresh_ipc_path = format!("/tmp/trin-jsonrpc-{test_discovery_port}.ipc");
let trin_config = TrinConfig::new_from(
[
"trin",
"--portal-subnetworks",
"history",
"--external-address",
external_addr.as_str(),
"--mb",
"10",
"--web3-ipc-path",
fresh_ipc_path.as_str(),
"--ephemeral",
"--discovery-port",
test_discovery_port.to_string().as_ref(),
"--bootnodes",
"none",
]
.iter(),
)
.unwrap();
let _test_client_rpc_handle = trin::run_trin(trin_config).await.unwrap();
let fresh_target = reth_ipc::client::IpcClientBuilder::default()
.build(fresh_ipc_path)
.await
.unwrap();
let fresh_enr = fresh_target.node_info().await.unwrap().enr;

// connect target to network
let _ = target.ping(fresh_enr.clone()).await.unwrap();

let (content_key, content_value) = fixture_header_with_proof();
// use populated offer which means content will *not* be stored in the target's local db
target
.offer(
fresh_enr.clone(),
content_key.clone(),
Some(content_value.clone()),
)
.await
.unwrap();

// sleep to let gossip propagate
sleep(Duration::from_secs(1)).await;

// validate that every node in the network now has a local copy of the header
assert!(
HistoryNetworkApiClient::local_content(target, content_key.clone())
.await
.is_ok()
);
assert!(
HistoryNetworkApiClient::local_content(&fresh_target, content_key.clone())
.await
.is_ok()
);
assert!(HistoryNetworkApiClient::local_content(
&peertest.nodes[0].ipc_client,
content_key.clone()
)
.await
.is_ok());
assert!(HistoryNetworkApiClient::local_content(
&peertest.bootnode.ipc_client,
content_key.clone()
)
.await
.is_ok());
}
26 changes: 26 additions & 0 deletions portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,32 @@ where
destination: enr.clone(),
};

// Validate that the content keys are available in the local store, before sending the
// offer
for content_key in content_keys.into_iter() {
let content_key = TContentKey::try_from(content_key.clone()).map_err(|err| {
OverlayRequestError::ContentNotFound {
message: format!(
"Error decoding content key for content key: {content_key:02X?} - {err}"
),
utp: false,
trace: None,
}
})?;
match self.store.read().get(&content_key) {
Ok(Some(_)) => {}
_ => {
return Err(OverlayRequestError::ContentNotFound {
message: format!(
"Content key not found in local store: {content_key:02X?}"
),
utp: false,
trace: None,
});
}
}
}

// Send the request and wait on the response.
match self
.send_overlay_request(Request::Offer(request), direction)
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/beacon_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ impl BeaconNetworkApiServer for BeaconNetworkApi {
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// If the content value is provided, a "populated" offer is used, which will not store the
/// content locally. Otherwise a regular offer is sent, after validating that the content is
/// available locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/history_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// If the content value is provided, a "populated" offer is used, which will not store the
/// content locally. Otherwise a regular offer is sent, after validating that the content is
/// available locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/state_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ impl StateNetworkApiServer for StateNetworkApi {
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// If the content value is provided, a "populated" offer is used, which will not store the
/// content locally. Otherwise a regular offer is sent, after validating that the content is
/// available locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
Expand Down
21 changes: 21 additions & 0 deletions tests/self_peertest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ async fn peertest_unpopulated_offer() {
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_unpopulated_offer_fails_with_missing_content() {
let (peertest, target, handle) = setup_peertest("mainnet").await;
peertest::scenarios::offer_accept::test_unpopulated_offer_fails_with_missing_content(
&peertest, &target,
)
.await;
peertest.exit_all_nodes();
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_gossip_with_trace() {
Expand Down Expand Up @@ -212,6 +224,15 @@ async fn peertest_state_recursive_gossip() {
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_history_offer_propagates_gossip() {
let (peertest, target, handle) = setup_peertest("mainnet").await;
peertest::scenarios::offer_accept::test_offer_propagates_gossip(&peertest, &target).await;
peertest.exit_all_nodes();
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_ping_cross_discv5_protocol_id() {
Expand Down

0 comments on commit 9540bdd

Please sign in to comment.