From 3fc0ed81fc165961f8c587a95fb5de66a9df7b7e Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Fri, 23 Aug 2024 12:48:48 -0400 Subject: [PATCH] feat: add jsonrpc endpoint for offer with trace (#1394) --- ethportal-api/src/beacon.rs | 12 ++++++ ethportal-api/src/history.rs | 12 ++++++ ethportal-api/src/state.rs | 12 ++++++ ethportal-api/src/types/jsonrpc/endpoints.rs | 6 +++ .../src/scenarios/offer_accept.rs | 34 ++++++++++++++++ portalnet/src/overlay/protocol.rs | 39 ++++++++++++++++++- rpc/src/beacon_rpc.rs | 14 +++++++ rpc/src/history_rpc.rs | 14 +++++++ rpc/src/state_rpc.rs | 14 +++++++ tests/self_peertest.rs | 9 +++++ trin-beacon/src/jsonrpc.rs | 20 ++++++++++ trin-history/src/jsonrpc.rs | 24 +++++++++++- trin-state/src/jsonrpc.rs | 18 +++++++++ 13 files changed, 225 insertions(+), 3 deletions(-) diff --git a/ethportal-api/src/beacon.rs b/ethportal-api/src/beacon.rs index dda88748b..8e07b6a81 100644 --- a/ethportal-api/src/beacon.rs +++ b/ethportal-api/src/beacon.rs @@ -114,6 +114,18 @@ pub trait BeaconNetworkApi { content_value: BeaconContentValue, ) -> RpcResult; + /// Send an OFFER request with given ContentKey, to the designated peer. + /// Does not store the content locally. + /// Returns true if the content was accepted and successfully transferred, + /// returns false if the content was not accepted or the transfer failed. + #[method(name = "beaconTraceOffer")] + async fn trace_offer( + &self, + enr: Enr, + content_key: BeaconContentKey, + content_value: BeaconContentValue, + ) -> RpcResult; + /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a /// response. Requires the content keys to be stored locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist diff --git a/ethportal-api/src/history.rs b/ethportal-api/src/history.rs index e09fbbdd5..50fe42915 100644 --- a/ethportal-api/src/history.rs +++ b/ethportal-api/src/history.rs @@ -110,6 +110,18 @@ pub trait HistoryNetworkApi { content_value: HistoryContentValue, ) -> RpcResult; + /// Send an OFFER request with given ContentKey, to the designated peer. + /// Does not store the content locally. + /// Returns true if the content was accepted and successfully transferred, + /// returns false if the content was not accepted or the transfer failed. + #[method(name = "historyTraceOffer")] + async fn trace_offer( + &self, + enr: Enr, + content_key: HistoryContentKey, + content_value: HistoryContentValue, + ) -> RpcResult; + /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a /// response. Requires the content keys to be stored locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist diff --git a/ethportal-api/src/state.rs b/ethportal-api/src/state.rs index 2be2c01f8..7943b41c3 100644 --- a/ethportal-api/src/state.rs +++ b/ethportal-api/src/state.rs @@ -103,6 +103,18 @@ pub trait StateNetworkApi { content_value: StateContentValue, ) -> RpcResult; + /// Send an OFFER request with given ContentKey, to the designated peer. + /// Does not store the content locally. + /// Returns true if the content was accepted and successfully transferred, + /// returns false if the content was not accepted or the transfer failed. + #[method(name = "stateTraceOffer")] + async fn trace_offer( + &self, + enr: Enr, + content_key: StateContentKey, + content_value: StateContentValue, + ) -> RpcResult; + /// Store content key with a content data to the local database. #[method(name = "stateStore")] async fn store( diff --git a/ethportal-api/src/types/jsonrpc/endpoints.rs b/ethportal-api/src/types/jsonrpc/endpoints.rs index 57d84d3a8..d2d28dd05 100644 --- a/ethportal-api/src/types/jsonrpc/endpoints.rs +++ b/ethportal-api/src/types/jsonrpc/endpoints.rs @@ -47,6 +47,8 @@ pub enum StateEndpoint { /// params: [enr, content_key, content_value] Offer(Enr, StateContentKey, StateContentValue), /// params: [enr, content_key, content_value] + TraceOffer(Enr, StateContentKey, StateContentValue), + /// params: [enr, content_key, content_value] Gossip(StateContentKey, StateContentValue), /// params: [content_key, content_value] TraceGossip(StateContentKey, StateContentValue), @@ -79,6 +81,8 @@ pub enum HistoryEndpoint { TraceGossip(HistoryContentKey, HistoryContentValue), /// params: [enr, content_key, content_value] Offer(Enr, HistoryContentKey, HistoryContentValue), + /// params: [enr, content_key, content_value] + TraceOffer(Enr, HistoryContentKey, HistoryContentValue), /// params: [enr, [content_key]] WireOffer(Enr, Vec), /// params: [enr] @@ -127,6 +131,8 @@ pub enum BeaconEndpoint { TraceGossip(BeaconContentKey, BeaconContentValue), /// params: [enr, content_key, content_value] Offer(Enr, BeaconContentKey, BeaconContentValue), + /// params: [enr, content_key, content_value] + TraceOffer(Enr, BeaconContentKey, BeaconContentValue), /// params: [enr, [content_key]] WireOffer(Enr, Vec), /// params: enr diff --git a/ethportal-peertest/src/scenarios/offer_accept.rs b/ethportal-peertest/src/scenarios/offer_accept.rs index 30780c3d3..53e7361b9 100644 --- a/ethportal-peertest/src/scenarios/offer_accept.rs +++ b/ethportal-peertest/src/scenarios/offer_accept.rs @@ -93,6 +93,40 @@ pub async fn test_populated_offer(peertest: &Peertest, target: &Client) { ); } +pub async fn test_populated_offer_with_trace(peertest: &Peertest, target: &Client) { + info!("Testing Populated Offer/ACCEPT flow with trace"); + + // store header for validation + let (content_key, content_value) = fixture_header_with_proof(); + let store_result = peertest + .bootnode + .ipc_client + .store(content_key.clone(), content_value.clone()) + .await + .unwrap(); + assert!(store_result); + + // use block body to test transfer of large content over utp + let (content_key, content_value) = fixture_block_body(); + let result = target + .trace_offer( + Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(), + content_key.clone(), + content_value.clone(), + ) + .await + .unwrap(); + + // check that the result of the offer is true for a valid transfer + assert!(result); + + // Check if the stored content value in bootnode's DB matches the offered + assert_eq!( + content_value, + wait_for_history_content(&peertest.bootnode.ipc_client, content_key).await, + ); +} + pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client) { info!("Testing populated offer propagates gossip"); diff --git a/portalnet/src/overlay/protocol.rs b/portalnet/src/overlay/protocol.rs index 84b5e11fe..b555f9188 100644 --- a/portalnet/src/overlay/protocol.rs +++ b/portalnet/src/overlay/protocol.rs @@ -43,7 +43,7 @@ use ethportal_api::{ enr::Enr, portal_wire::{ Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, Ping, Pong, - PopulatedOffer, ProtocolId, Request, Response, + PopulatedOffer, PopulatedOfferWithResult, ProtocolId, Request, Response, }, }, utils::bytes::hex_encode, @@ -576,6 +576,43 @@ where } } + /// Send Offer request with trace, without storing the content into db + pub async fn send_offer_trace( + &self, + enr: Enr, + content_key: RawContentKey, + content_value: Vec, + ) -> Result { + // Construct the request. + let (result_tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let request = Request::PopulatedOfferWithResult(PopulatedOfferWithResult { + content_item: (content_key, content_value), + result_tx, + }); + + let direction = RequestDirection::Outgoing { + destination: enr.clone(), + }; + + // Send the offer request and wait on the response. + // Ignore the accept message, since we only care about the trace. + self.send_overlay_request(request, direction).await?; + + // Wait for the trace response. + match rx.recv().await { + Some(accept) => Ok(accept), + None => { + warn!( + protocol = %self.protocol, + "Error receiving TraceOffer query response" + ); + Err(OverlayRequestError::ChannelFailure( + "Error receiving TraceOffer query response".to_string(), + )) + } + } + } + pub async fn lookup_node(&self, target: NodeId) -> Vec { if target == self.local_enr().node_id() { return vec![self.local_enr()]; diff --git a/rpc/src/beacon_rpc.rs b/rpc/src/beacon_rpc.rs index f3a7b9a39..23698debe 100644 --- a/rpc/src/beacon_rpc.rs +++ b/rpc/src/beacon_rpc.rs @@ -171,6 +171,20 @@ impl BeaconNetworkApiServer for BeaconNetworkApi { Ok(proxy_to_subnet(&self.network, endpoint).await?) } + /// Send an OFFER request with given ContentKey, to the designated peer. + /// Does not store the content locally. + /// Returns true if the content was accepted and successfully transferred, + /// returns false if the content was not accepted or the transfer failed. + async fn trace_offer( + &self, + enr: Enr, + content_key: BeaconContentKey, + content_value: BeaconContentValue, + ) -> RpcResult { + let endpoint = BeaconEndpoint::TraceOffer(enr, content_key, content_value); + Ok(proxy_to_subnet(&self.network, endpoint).await?) + } + /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a /// response. Requires the content keys to be stored locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist diff --git a/rpc/src/history_rpc.rs b/rpc/src/history_rpc.rs index 8f4426aa8..1e4789d01 100644 --- a/rpc/src/history_rpc.rs +++ b/rpc/src/history_rpc.rs @@ -157,6 +157,20 @@ impl HistoryNetworkApiServer for HistoryNetworkApi { Ok(proxy_to_subnet(&self.network, endpoint).await?) } + /// Send an OFFER request with given ContentKey, to the designated peer. + /// Does not store the content locally. + /// Returns true if the content was accepted and successfully transferred, + /// returns false if the content was not accepted or the transfer failed. + async fn trace_offer( + &self, + enr: Enr, + content_key: HistoryContentKey, + content_value: HistoryContentValue, + ) -> RpcResult { + let endpoint = HistoryEndpoint::TraceOffer(enr, content_key, content_value); + Ok(proxy_to_subnet(&self.network, endpoint).await?) + } + /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a /// response. Requires the content keys to be stored locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist diff --git a/rpc/src/state_rpc.rs b/rpc/src/state_rpc.rs index 0315857be..4183a2691 100644 --- a/rpc/src/state_rpc.rs +++ b/rpc/src/state_rpc.rs @@ -150,6 +150,20 @@ impl StateNetworkApiServer for StateNetworkApi { Ok(proxy_to_subnet(&self.network, endpoint).await?) } + /// Send an OFFER request with given ContentKey, to the designated peer. + /// Does not store the content locally. + /// Returns true if the content was accepted and successfully transferred, + /// returns false if the content was not accepted or the transfer failed. + async fn trace_offer( + &self, + enr: Enr, + content_key: StateContentKey, + content_value: StateContentValue, + ) -> RpcResult { + let endpoint = StateEndpoint::TraceOffer(enr, content_key, content_value); + Ok(proxy_to_subnet(&self.network, endpoint).await?) + } + /// Store content key with a content data to the local database. async fn store( &self, diff --git a/tests/self_peertest.rs b/tests/self_peertest.rs index d680f38c2..bddeb6a7b 100644 --- a/tests/self_peertest.rs +++ b/tests/self_peertest.rs @@ -69,6 +69,15 @@ async fn peertest_populated_offer() { handle.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn peertest_populated_offer_with_trace() { + let (peertest, target, handle) = setup_peertest("mainnet", &[HISTORY_NETWORK]).await; + peertest::scenarios::offer_accept::test_populated_offer_with_trace(&peertest, &target).await; + peertest.exit_all_nodes(); + handle.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread")] #[serial] async fn peertest_unpopulated_offer() { diff --git a/trin-beacon/src/jsonrpc.rs b/trin-beacon/src/jsonrpc.rs index 48c5a2734..ce17644cf 100644 --- a/trin-beacon/src/jsonrpc.rs +++ b/trin-beacon/src/jsonrpc.rs @@ -78,6 +78,9 @@ async fn complete_request(network: Arc, request: BeaconJsonRpcReq BeaconEndpoint::WireOffer(enr, content_keys) => { wire_offer(network, enr, content_keys).await } + BeaconEndpoint::TraceOffer(enr, content_key, content_value) => { + trace_offer(network, enr, content_key, content_value).await + } BeaconEndpoint::Ping(enr) => ping(network, enr).await, BeaconEndpoint::RoutingTableInfo => { serde_json::to_value(network.overlay.routing_table_info()) @@ -361,6 +364,23 @@ async fn offer( } } +/// Constructs a JSON call for the Offer method with trace. +async fn trace_offer( + network: Arc, + enr: discv5::enr::Enr, + content_key: BeaconContentKey, + content_value: BeaconContentValue, +) -> Result { + match network + .overlay + .send_offer_trace(enr, content_key.into(), content_value.encode()) + .await + { + Ok(accept) => Ok(json!(accept)), + Err(msg) => Err(format!("Offer request timeout: {msg:?}")), + } +} + /// Constructs a JSON call for the WireOffer method. async fn wire_offer( network: Arc, diff --git a/trin-history/src/jsonrpc.rs b/trin-history/src/jsonrpc.rs index f8db1e11f..50f770054 100644 --- a/trin-history/src/jsonrpc.rs +++ b/trin-history/src/jsonrpc.rs @@ -68,12 +68,15 @@ async fn complete_request(network: Arc, request: HistoryJsonRpcR gossip(network, content_key, content_value).await } HistoryEndpoint::TraceGossip(content_key, content_value) => { - gossip_trace(network, content_key, content_value).await + trace_gossip(network, content_key, content_value).await } HistoryEndpoint::LookupEnr(node_id) => lookup_enr(network, node_id).await, HistoryEndpoint::Offer(enr, content_key, content_value) => { offer(network, enr, content_key, content_value).await } + HistoryEndpoint::TraceOffer(enr, content_key, content_value) => { + trace_offer(network, enr, content_key, content_value).await + } HistoryEndpoint::WireOffer(enr, content_keys) => { wire_offer(network, enr, content_keys).await } @@ -314,7 +317,7 @@ async fn gossip( } /// Constructs a JSON call for the Gossip method, with tracing enabled. -async fn gossip_trace( +async fn trace_gossip( network: Arc, content_key: HistoryContentKey, content_value: ethportal_api::HistoryContentValue, @@ -347,6 +350,23 @@ async fn offer( } } +/// Constructs a JSON call for the Offer method with trace. +async fn trace_offer( + network: Arc, + enr: discv5::enr::Enr, + content_key: HistoryContentKey, + content_value: HistoryContentValue, +) -> Result { + match network + .overlay + .send_offer_trace(enr, content_key.into(), content_value.encode()) + .await + { + Ok(accept) => Ok(json!(accept)), + Err(msg) => Err(format!("Offer request timeout: {msg:?}")), + } +} + /// Constructs a JSON call for the WireOffer method. async fn wire_offer( network: Arc, diff --git a/trin-state/src/jsonrpc.rs b/trin-state/src/jsonrpc.rs index 8dc4f005c..03aea10ff 100644 --- a/trin-state/src/jsonrpc.rs +++ b/trin-state/src/jsonrpc.rs @@ -65,6 +65,9 @@ impl StateRequestHandler { StateEndpoint::Offer(enr, content_key, content_value) => { offer(network, enr, content_key, content_value).await } + StateEndpoint::TraceOffer(enr, content_key, content_value) => { + trace_offer(network, enr, content_key, content_value).await + } StateEndpoint::Gossip(content_key, content_value) => { gossip( network, @@ -318,6 +321,21 @@ async fn offer( ) } +async fn trace_offer( + network: Arc, + enr: Enr, + content_key: StateContentKey, + content_value: StateContentValue, +) -> Result { + to_json_result( + "TraceOffer", + network + .overlay + .send_offer_trace(enr, content_key.into(), content_value.encode()) + .await, + ) +} + async fn gossip( network: Arc, content_key: StateContentKey,