From 627189e570d5750cc0e9b009df13921b2973f99b Mon Sep 17 00:00:00 2001 From: Kolby Moroz Liebl <31669092+KolbyML@users.noreply.github.com> Date: Tue, 8 Oct 2024 15:44:12 -0700 Subject: [PATCH] feat: update `portal_*Offer` to handle multiple pieces of content (#1507) --- ethportal-api/src/beacon.rs | 20 +-- ethportal-api/src/history.rs | 20 +-- ethportal-api/src/state.rs | 9 +- ethportal-api/src/types/jsonrpc/endpoints.rs | 18 +-- ethportal-api/src/types/portal.rs | 2 + ethportal-peertest/src/scenarios/gossip.rs | 8 +- .../src/scenarios/offer_accept.rs | 141 +++--------------- ethportal-peertest/src/scenarios/state.rs | 3 +- portalnet/src/gossip.rs | 7 +- portalnet/src/overlay/protocol.rs | 42 +----- rpc/src/beacon_rpc.rs | 49 +++--- rpc/src/history_rpc.rs | 49 +++--- rpc/src/state_rpc.rs | 33 ++-- tests/self_peertest.rs | 31 +--- trin-beacon/src/jsonrpc.rs | 34 +---- trin-history/src/jsonrpc.rs | 33 +--- trin-state/src/jsonrpc.rs | 14 +- 17 files changed, 151 insertions(+), 362 deletions(-) diff --git a/ethportal-api/src/beacon.rs b/ethportal-api/src/beacon.rs index dd08a339f..2a01cea68 100644 --- a/ethportal-api/src/beacon.rs +++ b/ethportal-api/src/beacon.rs @@ -115,19 +115,18 @@ pub trait BeaconNetworkApi { content_value: RawContentValue, ) -> RpcResult; - /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// Does not store the content locally. + /// Send an OFFER request with given ContentItems, to the designated peer and wait for a + /// response. Does not store the content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. #[method(name = "beaconOffer")] async fn offer( &self, enr: Enr, - content_key: BeaconContentKey, - content_value: RawContentValue, + content_items: Vec<(BeaconContentKey, RawContentValue)>, ) -> RpcResult; - /// Send an OFFER request with given ContentKey, to the designated peer. + /// Send an OFFER request with given ContentItems, to the designated peer. /// Does not store the content locally. /// Returns trace info for the offer. #[method(name = "beaconTraceOffer")] @@ -138,17 +137,6 @@ pub trait BeaconNetworkApi { content_value: RawContentValue, ) -> RpcResult; - /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a - /// response. Requires the content keys to be stored locally. - /// Returns the content keys bitlist upon successful content transmission or empty bitlist - /// receive. - #[method(name = "beaconWireOffer")] - async fn wire_offer( - &self, - enr: Enr, - content_keys: Vec, - ) -> RpcResult; - /// Store content key with a content data to the local database. #[method(name = "beaconStore")] async fn store( diff --git a/ethportal-api/src/history.rs b/ethportal-api/src/history.rs index e91afe782..5232fcdca 100644 --- a/ethportal-api/src/history.rs +++ b/ethportal-api/src/history.rs @@ -101,19 +101,18 @@ pub trait HistoryNetworkApi { content_value: RawContentValue, ) -> RpcResult; - /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// Does not store the content locally. + /// Send an OFFER request with given ContentItems, to the designated peer and wait for a + /// response. Does not store the content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. #[method(name = "historyOffer")] async fn offer( &self, enr: Enr, - content_key: HistoryContentKey, - content_value: RawContentValue, + content_items: Vec<(HistoryContentKey, RawContentValue)>, ) -> RpcResult; - /// Send an OFFER request with given ContentKey, to the designated peer. + /// Send an OFFER request with given ContentItems, to the designated peer. /// Does not store the content locally. /// Returns trace info for the offer. #[method(name = "historyTraceOffer")] @@ -124,17 +123,6 @@ pub trait HistoryNetworkApi { content_value: RawContentValue, ) -> RpcResult; - /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a - /// response. Requires the content keys to be stored locally. - /// Returns the content keys bitlist upon successful content transmission or empty bitlist - /// receive. - #[method(name = "historyWireOffer")] - async fn wire_offer( - &self, - enr: Enr, - content_keys: Vec, - ) -> RpcResult; - /// Store content key with a content data to the local database. #[method(name = "historyStore")] async fn store( diff --git a/ethportal-api/src/state.rs b/ethportal-api/src/state.rs index 88402adc8..b10c704d9 100644 --- a/ethportal-api/src/state.rs +++ b/ethportal-api/src/state.rs @@ -94,19 +94,18 @@ pub trait StateNetworkApi { content_value: RawContentValue, ) -> RpcResult; - /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// Does not store the content locally. + /// Send an OFFER request with given ContentItems, to the designated peer and wait for a + /// response. Does not store the content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. #[method(name = "stateOffer")] async fn offer( &self, enr: Enr, - content_key: StateContentKey, - content_value: RawContentValue, + content_items: Vec<(StateContentKey, RawContentValue)>, ) -> RpcResult; - /// Send an OFFER request with given ContentKey, to the designated peer. + /// Send an OFFER request with given ContentItems, to the designated peer. /// Does not store the content locally. /// Returns trace info for offer. #[method(name = "stateTraceOffer")] diff --git a/ethportal-api/src/types/jsonrpc/endpoints.rs b/ethportal-api/src/types/jsonrpc/endpoints.rs index 6426c63d1..951f94fb3 100644 --- a/ethportal-api/src/types/jsonrpc/endpoints.rs +++ b/ethportal-api/src/types/jsonrpc/endpoints.rs @@ -42,10 +42,8 @@ pub enum StateEndpoint { TraceRecursiveFindContent(StateContentKey), /// params: [content_key, content_value] Store(StateContentKey, StateContentValue), - /// WireOffer is not supported in the state network, since locally - /// stored values do not contain the proofs necessary for valid gossip. - /// params: [enr, content_key, content_value] - Offer(Enr, StateContentKey, StateContentValue), + /// params: [enr, Vec<(content_key, content_value>)] + Offer(Enr, Vec<(StateContentKey, StateContentValue)>), /// params: [enr, content_key, content_value] TraceOffer(Enr, StateContentKey, StateContentValue), /// params: [enr, content_key, content_value] @@ -79,12 +77,10 @@ pub enum HistoryEndpoint { Gossip(HistoryContentKey, HistoryContentValue), /// params: [content_key, content_value] TraceGossip(HistoryContentKey, HistoryContentValue), - /// params: [enr, content_key, content_value] - Offer(Enr, HistoryContentKey, HistoryContentValue), + /// params: [enr, Vec<(content_key, content_value)>] + Offer(Enr, Vec<(HistoryContentKey, HistoryContentValue)>), /// params: [enr, content_key, content_value] TraceOffer(Enr, HistoryContentKey, HistoryContentValue), - /// params: [enr, [content_key]] - WireOffer(Enr, Vec), /// params: [enr] Ping(Enr), /// params: content_key @@ -133,12 +129,10 @@ pub enum BeaconEndpoint { Gossip(BeaconContentKey, BeaconContentValue), /// params: [content_key, content_value] TraceGossip(BeaconContentKey, BeaconContentValue), - /// params: [enr, content_key, content_value] - Offer(Enr, BeaconContentKey, BeaconContentValue), + /// params: [enr, Vec<(content_key, content_value>)] + Offer(Enr, Vec<(BeaconContentKey, BeaconContentValue)>), /// params: [enr, content_key, content_value] TraceOffer(Enr, BeaconContentKey, BeaconContentValue), - /// params: [enr, [content_key]] - WireOffer(Enr, Vec), /// params: enr Ping(Enr), /// params: content_key diff --git a/ethportal-api/src/types/portal.rs b/ethportal-api/src/types/portal.rs index 4db8eb077..5821e8a23 100644 --- a/ethportal-api/src/types/portal.rs +++ b/ethportal-api/src/types/portal.rs @@ -32,6 +32,8 @@ pub struct PongInfo { pub type FindNodesInfo = Vec; +pub const MAX_CONTENT_KEYS_PER_OFFER: usize = 64; + /// Response for Offer endpoint #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/ethportal-peertest/src/scenarios/gossip.rs b/ethportal-peertest/src/scenarios/gossip.rs index 18265f7cf..ae276d50a 100644 --- a/ethportal-peertest/src/scenarios/gossip.rs +++ b/ethportal-peertest/src/scenarios/gossip.rs @@ -181,13 +181,15 @@ pub async fn test_gossip_dropped_with_offer(peertest: &Peertest, target: &Client target .offer( fresh_enr.clone(), - header_key_2.clone(), - header_value_2.encode(), + vec![(header_key_2.clone(), header_value_2.encode())], ) .await .unwrap(); target - .offer(fresh_enr.clone(), body_key_2.clone(), body_value_2.encode()) + .offer( + fresh_enr.clone(), + vec![(body_key_2.clone(), body_value_2.encode())], + ) .await .unwrap(); diff --git a/ethportal-peertest/src/scenarios/offer_accept.rs b/ethportal-peertest/src/scenarios/offer_accept.rs index ff712ab97..b56e7def9 100644 --- a/ethportal-peertest/src/scenarios/offer_accept.rs +++ b/ethportal-peertest/src/scenarios/offer_accept.rs @@ -18,71 +18,14 @@ use ethportal_api::{ ContentValue, Discv5ApiClient, HistoryNetworkApiClient, }; -pub async fn test_unpopulated_offer(peertest: &Peertest, target: &Client) { - info!("Testing Unpopulated OFFER/ACCEPT flow"); - - let (content_key, content_value) = fixture_header_by_hash(); - // Store content to offer in the testnode db - let store_result = target - .store(content_key.clone(), content_value.encode()) - .await - .unwrap(); - - assert!(store_result); - - // Send wire offer request from testnode to bootnode - let result = target - .wire_offer( - Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(), - vec![content_key.clone()], - ) - .await - .unwrap(); - - // Check that ACCEPT response sent by bootnode accepted the offered content - assert_eq!(hex_encode(result.content_keys.into_bytes()), "0x03"); - - // Check if the stored content value in bootnode's DB matches the offered - assert_eq!( - content_value, - wait_for_history_content(&peertest.bootnode.ipc_client, content_key).await, - ); -} - -pub async fn test_unpopulated_offer_fails_with_missing_content( - peertest: &Peertest, - target: &Client, -) { - info!("Testing Unpopulated OFFER/ACCEPT flow with missing content"); - - let (content_key, _content_value) = fixture_header_by_hash(); - - // validate that wire offer fails if content not available locally - match target - .wire_offer( - Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(), - vec![content_key.clone()], - ) - .await - { - Ok(_) => panic!("Unpopulated offer should have failed"), - Err(e) => { - assert!(e - .to_string() - .contains("Content key not found in local store")); - } - } -} - -pub async fn test_populated_offer(peertest: &Peertest, target: &Client) { - info!("Testing Populated Offer/ACCEPT flow"); +pub async fn test_offer(peertest: &Peertest, target: &Client) { + info!("Testing Offer/ACCEPT flow"); let (content_key, content_value) = fixture_header_by_hash(); let result = target .offer( Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(), - content_key.clone(), - content_value.encode(), + vec![(content_key.clone(), content_value.encode())], ) .await .unwrap(); @@ -97,8 +40,8 @@ pub async fn test_populated_offer(peertest: &Peertest, target: &Client) { ); } -pub async fn test_populated_offer_with_trace(peertest: &Peertest, target: &Client) { - info!("Testing Populated Offer/ACCEPT flow with trace"); +pub async fn test_offer_with_trace(peertest: &Peertest, target: &Client) { + info!("Testing Offer/ACCEPT flow with trace"); // store header for validation let (content_key, content_value) = fixture_header_by_hash(); @@ -136,16 +79,15 @@ pub async fn test_populated_offer_with_trace(peertest: &Peertest, target: &Clien } pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client) { - info!("Testing populated offer propagates gossip"); + info!("Testing offer propagates gossip"); // get content values to gossip let (content_key, content_value) = fixture_header_by_hash(); - // use populated offer which means content will *not* be stored in the target's local db + // use offer which means content will *not* be stored in the target's local db target .offer( peertest.bootnode.enr.clone(), - content_key.clone(), - content_value.encode(), + vec![(content_key.clone(), content_value.encode())], ) .await .unwrap(); @@ -166,7 +108,7 @@ pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client) } pub async fn test_offer_propagates_gossip_with_large_content(peertest: &Peertest, target: &Client) { - info!("Testing populated offer propagates gossips single large content"); + info!("Testing offer propagates gossips single large content"); let (header_key, header_value) = fixture_header_by_hash_with_proof_15040708(); // 763kb block body @@ -178,15 +120,10 @@ pub async fn test_offer_propagates_gossip_with_large_content(peertest: &Peertest .await .unwrap(); assert!(store_result); - let store_result = target - .store(body_key.clone(), body_value.encode()) - .await - .unwrap(); - assert!(store_result); target - .wire_offer( + .offer( peertest.bootnode.ipc_client.node_info().await.unwrap().enr, - vec![body_key.clone()], + vec![(body_key.clone(), body_value.encode())], ) .await .unwrap(); @@ -211,7 +148,7 @@ pub async fn test_offer_propagates_gossip_multiple_content_values( peertest: &Peertest, target: &Client, ) { - info!("Testing populated offer propagates gossips multiple content values simultaneously"); + info!("Testing offer propagates gossips multiple content values simultaneously"); // get content values to gossip let (header_key, header_value) = fixture_header_by_hash_with_proof_15040708(); let (body_key, body_value) = fixture_block_body_15040708(); @@ -221,8 +158,7 @@ pub async fn test_offer_propagates_gossip_multiple_content_values( target .offer( peertest.bootnode.enr.clone(), - header_key.clone(), - header_value.encode(), + vec![(header_key.clone(), header_value.encode())], ) .await .unwrap(); @@ -241,23 +177,14 @@ pub async fn test_offer_propagates_gossip_multiple_content_values( wait_for_history_content(&peertest.nodes[0].ipc_client, header_key.clone()).await, ); - // Store content to offer in the testnode db - let store_result = target - .store(body_key.clone(), body_value.encode()) - .await - .unwrap(); - assert!(store_result); - let store_result = target - .store(receipts_key.clone(), receipts_value.encode()) - .await - .unwrap(); - assert!(store_result); - // here everythings stored in target target - .wire_offer( + .offer( peertest.bootnode.ipc_client.node_info().await.unwrap().enr, - vec![body_key.clone(), receipts_key.clone()], + vec![ + (body_key.clone(), body_value.encode()), + (receipts_key.clone(), receipts_value.encode()), + ], ) .await .unwrap(); @@ -295,7 +222,7 @@ pub async fn test_offer_propagates_gossip_multiple_large_content_values( peertest: &Peertest, target: &Client, ) { - info!("Testing populated offer propagates gossips multiple large content simultaneously"); + info!("Testing offer propagates gossips multiple large content simultaneously"); // get content values to gossip let (header_key_1, header_value_1) = fixture_header_by_hash_with_proof_15040708(); @@ -308,16 +235,6 @@ pub async fn test_offer_propagates_gossip_multiple_large_content_values( .await .unwrap(); assert!(store_result); - let store_result = target - .store(body_key_1.clone(), body_value_1.encode()) - .await - .unwrap(); - assert!(store_result); - let store_result = target - .store(receipts_key_1.clone(), receipts_value_1.encode()) - .await - .unwrap(); - assert!(store_result); let (header_key_2, header_value_2) = fixture_header_by_hash_with_proof_15040641(); let (body_key_2, body_value_2) = fixture_block_body_15040641(); @@ -329,25 +246,15 @@ pub async fn test_offer_propagates_gossip_multiple_large_content_values( .await .unwrap(); assert!(store_result); - let store_result = target - .store(body_key_2.clone(), body_value_2.encode()) - .await - .unwrap(); - assert!(store_result); - let store_result = target - .store(receipts_key_2.clone(), receipts_value_2.encode()) - .await - .unwrap(); - assert!(store_result); target - .wire_offer( + .offer( peertest.bootnode.ipc_client.node_info().await.unwrap().enr, vec![ - body_key_1.clone(), - receipts_key_1.clone(), - body_key_2.clone(), - receipts_key_2.clone(), + (body_key_1.clone(), body_value_1.encode()), + (receipts_key_1.clone(), receipts_value_1.encode()), + (body_key_2.clone(), body_value_2.encode()), + (receipts_key_2.clone(), receipts_value_2.encode()), ], ) .await diff --git a/ethportal-peertest/src/scenarios/state.rs b/ethportal-peertest/src/scenarios/state.rs index 457ef3c54..2af697855 100644 --- a/ethportal-peertest/src/scenarios/state.rs +++ b/ethportal-peertest/src/scenarios/state.rs @@ -63,8 +63,7 @@ async fn test_state_offer(fixture: &StateFixture, target: &Client, peer: &Peerte StateNetworkApiClient::offer( target, peer.enr.clone(), - fixture.key.clone(), - fixture.raw_offer_value.clone(), + vec![(fixture.key.clone(), fixture.raw_offer_value.clone())], ) .await .unwrap(); diff --git a/portalnet/src/gossip.rs b/portalnet/src/gossip.rs index ad5f0f7e6..b66cadb6d 100644 --- a/portalnet/src/gossip.rs +++ b/portalnet/src/gossip.rs @@ -18,6 +18,7 @@ use ethportal_api::{ types::{ distance::Metric, enr::Enr, + portal::MAX_CONTENT_KEYS_PER_OFFER, portal_wire::{OfferTrace, PopulatedOffer, PopulatedOfferWithResult, Request, Response}, }, utils::bytes::{hex_encode, hex_encode_compact}, @@ -100,19 +101,19 @@ pub fn propagate_gossip_cross_thread 64 { + if interested_content.len() > MAX_CONTENT_KEYS_PER_OFFER { warn!( enr = %enr, content.len = interested_content.len(), "Too many content items to offer to a single peer, dropping {}.", - interested_content.len() - 64 + interested_content.len() - MAX_CONTENT_KEYS_PER_OFFER ); // sort content keys by distance to the node interested_content.sort_by_cached_key(|(key, _)| { TMetric::distance(&key.content_id(), &enr.node_id().raw()) }); // take 64 closest content keys - interested_content.truncate(64); + interested_content.truncate(MAX_CONTENT_KEYS_PER_OFFER); } // change content keys to raw content keys let interested_content = interested_content diff --git a/portalnet/src/overlay/protocol.rs b/portalnet/src/overlay/protocol.rs index 92e043a8b..11fd258b4 100644 --- a/portalnet/src/overlay/protocol.rs +++ b/portalnet/src/overlay/protocol.rs @@ -470,52 +470,14 @@ where }) } - /// Offer is sent in order to store content to k nodes with radii that contain content-id - /// Offer is also sent to nodes after FindContent (POKE) - pub async fn send_wire_offer( - &self, - enr: Enr, - content_keys: Vec, - ) -> Result { - let content_items = content_keys - .into_iter() - .map(|key| match self.store.read().get(&key) { - Ok(Some(content)) => Ok((key.to_bytes(), content.clone())), - _ => Err(OverlayRequestError::ContentNotFound { - message: format!("Content key not found in local store: {key:02X?}"), - utp: false, - trace: None, - }), - }) - .collect::)>, OverlayRequestError>>()?; - // Construct the request. - let request = PopulatedOffer { content_items }; - let direction = RequestDirection::Outgoing { - destination: enr.clone(), - }; - - // Send the request and wait on the response. - match self - .send_overlay_request(Request::PopulatedOffer(request), direction) - .await - { - Ok(Response::Accept(accept)) => Ok(accept), - Ok(_) => Err(OverlayRequestError::InvalidResponse), - Err(error) => Err(error), - } - } - /// Send Offer request without storing the content into db pub async fn send_offer( &self, enr: Enr, - content_key: RawContentKey, - content_value: Vec, + content_items: Vec<(RawContentKey, Vec)>, ) -> Result { // Construct the request. - let request = Request::PopulatedOffer(PopulatedOffer { - content_items: vec![(content_key, content_value)], - }); + let request = Request::PopulatedOffer(PopulatedOffer { content_items }); let direction = RequestDirection::Outgoing { destination: enr.clone(), diff --git a/rpc/src/beacon_rpc.rs b/rpc/src/beacon_rpc.rs index c72e6e66c..8561600cb 100644 --- a/rpc/src/beacon_rpc.rs +++ b/rpc/src/beacon_rpc.rs @@ -15,7 +15,7 @@ use ethportal_api::{ jsonrpc::{endpoints::BeaconEndpoint, request::BeaconJsonRpcRequest}, portal::{ AcceptInfo, ContentInfo, DataRadius, FindNodesInfo, PaginateLocalContentInfo, PongInfo, - TraceContentInfo, TraceGossipInfo, + TraceContentInfo, TraceGossipInfo, MAX_CONTENT_KEYS_PER_OFFER, }, portal_wire::OfferTrace, }, @@ -179,23 +179,35 @@ impl BeaconNetworkApiServer for BeaconNetworkApi { Ok(proxy_to_subnet(&self.network, endpoint).await?) } - /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// Does not store content locally. + /// Send an OFFER request with given ContentItems, to the designated peer and wait for a + /// response. Does not store content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( &self, enr: Enr, - content_key: BeaconContentKey, - content_value: RawContentValue, + content_items: Vec<(BeaconContentKey, RawContentValue)>, ) -> RpcResult { - let content_value = BeaconContentValue::decode(&content_key, &content_value) - .map_err(RpcServeError::from)?; - let endpoint = BeaconEndpoint::Offer(enr, content_key, content_value); - Ok(proxy_to_subnet(&self.network, endpoint).await?) - } - - /// Send an OFFER request with given ContentKey, to the designated peer. + if !(1..=MAX_CONTENT_KEYS_PER_OFFER).contains(&content_items.len()) { + return Err(RpcServeError::Message(format!( + "Invalid amount of content items: {}", + content_items.len() + )) + .into()); + } + let content_items = content_items + .into_iter() + .map(|(key, value)| { + BeaconContentValue::decode(&key, &value) + .map(|value| (key, value)) + .map_err(RpcServeError::from) + }) + .collect::, _>>()?; + let endpoint = BeaconEndpoint::Offer(enr, content_items); + Ok(proxy_to_subnet(&self.network, endpoint).await?) + } + + /// Send an OFFER request with given ContentItems, to the designated peer. /// Does not store the content locally. /// Returns trace info from the offer. async fn trace_offer( @@ -210,19 +222,6 @@ impl BeaconNetworkApiServer for BeaconNetworkApi { Ok(proxy_to_subnet(&self.network, endpoint).await?) } - /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a - /// response. Requires the content keys to be stored locally. - /// Returns the content keys bitlist upon successful content transmission or empty bitlist - /// receive. - async fn wire_offer( - &self, - enr: Enr, - content_keys: Vec, - ) -> RpcResult { - let endpoint = BeaconEndpoint::WireOffer(enr, content_keys); - Ok(proxy_to_subnet(&self.network, endpoint).await?) - } - /// Store content key with a content data to the local database. async fn store( &self, diff --git a/rpc/src/history_rpc.rs b/rpc/src/history_rpc.rs index c806e0af5..0f8ee056d 100644 --- a/rpc/src/history_rpc.rs +++ b/rpc/src/history_rpc.rs @@ -7,7 +7,7 @@ use ethportal_api::{ jsonrpc::{endpoints::HistoryEndpoint, request::HistoryJsonRpcRequest}, portal::{ AcceptInfo, ContentInfo, DataRadius, FindNodesInfo, PaginateLocalContentInfo, PongInfo, - TraceContentInfo, TraceGossipInfo, + TraceContentInfo, TraceGossipInfo, MAX_CONTENT_KEYS_PER_OFFER, }, portal_wire::OfferTrace, }, @@ -152,23 +152,35 @@ impl HistoryNetworkApiServer for HistoryNetworkApi { Ok(proxy_to_subnet(&self.network, endpoint).await?) } - /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// Does not store content locally. + /// Send an OFFER request with given ContentItems, to the designated peer and wait for a + /// response. Does not store content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( &self, enr: Enr, - content_key: HistoryContentKey, - content_value: RawContentValue, + content_items: Vec<(HistoryContentKey, RawContentValue)>, ) -> RpcResult { - let content_value = HistoryContentValue::decode(&content_key, &content_value) - .map_err(RpcServeError::from)?; - let endpoint = HistoryEndpoint::Offer(enr, content_key, content_value); - Ok(proxy_to_subnet(&self.network, endpoint).await?) - } - - /// Send an OFFER request with given ContentKey, to the designated peer. + if !(1..=MAX_CONTENT_KEYS_PER_OFFER).contains(&content_items.len()) { + return Err(RpcServeError::Message(format!( + "Invalid amount of content items: {}", + content_items.len() + )) + .into()); + } + let content_items = content_items + .into_iter() + .map(|(key, value)| { + HistoryContentValue::decode(&key, &value) + .map(|value| (key, value)) + .map_err(RpcServeError::from) + }) + .collect::, _>>()?; + let endpoint = HistoryEndpoint::Offer(enr, content_items); + Ok(proxy_to_subnet(&self.network, endpoint).await?) + } + + /// Send an OFFER request with given ContentItems, to the designated peer. /// Does not store the content locally. /// Returns true if the content was accepted and successfully transferred, /// returns false if the content was not accepted or the transfer failed. @@ -184,19 +196,6 @@ impl HistoryNetworkApiServer for HistoryNetworkApi { Ok(proxy_to_subnet(&self.network, endpoint).await?) } - /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a - /// response. Requires the content keys to be stored locally. - /// Returns the content keys bitlist upon successful content transmission or empty bitlist - /// receive. - async fn wire_offer( - &self, - enr: Enr, - content_keys: Vec, - ) -> RpcResult { - let endpoint = HistoryEndpoint::WireOffer(enr, content_keys); - Ok(proxy_to_subnet(&self.network, endpoint).await?) - } - /// Store content key with a content data to the local database. async fn store( &self, diff --git a/rpc/src/state_rpc.rs b/rpc/src/state_rpc.rs index b29deaf31..658c049d0 100644 --- a/rpc/src/state_rpc.rs +++ b/rpc/src/state_rpc.rs @@ -7,7 +7,7 @@ use ethportal_api::{ jsonrpc::{endpoints::StateEndpoint, request::StateJsonRpcRequest}, portal::{ AcceptInfo, ContentInfo, DataRadius, FindNodesInfo, PaginateLocalContentInfo, PongInfo, - TraceContentInfo, TraceGossipInfo, + TraceContentInfo, TraceGossipInfo, MAX_CONTENT_KEYS_PER_OFFER, }, portal_wire::OfferTrace, }, @@ -145,23 +145,32 @@ impl StateNetworkApiServer for StateNetworkApi { Ok(proxy_to_subnet(&self.network, endpoint).await?) } - /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// Does not store content locally. + /// Send an OFFER request with given ContentItems, to the designated peer and wait for a + /// response. Does not store content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( &self, enr: Enr, - content_key: StateContentKey, - content_value: RawContentValue, + content_items: Vec<(StateContentKey, RawContentValue)>, ) -> RpcResult { - let content_value = - StateContentValue::decode(&content_key, &content_value).map_err(RpcServeError::from)?; - let endpoint = StateEndpoint::Offer(enr, content_key, content_value); - Ok(proxy_to_subnet(&self.network, endpoint).await?) - } - - /// Send an OFFER request with given ContentKey, to the designated peer. + if !(1..=MAX_CONTENT_KEYS_PER_OFFER).contains(&content_items.len()) { + return Err(RpcServeError::Message(format!( + "Invalid amount of content items: {}", + content_items.len() + )) + .into()); + } + let content_items = content_items + .into_iter() + .map(|(key, value)| StateContentValue::decode(&key, &value).map(|value| (key, value))) + .collect::, _>>() + .map_err(RpcServeError::from)?; + let endpoint = StateEndpoint::Offer(enr, content_items); + Ok(proxy_to_subnet(&self.network, endpoint).await?) + } + + /// Send an OFFER request with given ContentItems, to the designated peer. /// Does not store the content locally. /// Returns trace info from the offer. async fn trace_offer( diff --git a/tests/self_peertest.rs b/tests/self_peertest.rs index fa437dead..581530173 100644 --- a/tests/self_peertest.rs +++ b/tests/self_peertest.rs @@ -62,43 +62,20 @@ async fn peertest_stateless() { #[tokio::test(flavor = "multi_thread")] #[serial] -async fn peertest_populated_offer() { +async fn peertest_offer() { let (peertest, target, handle) = setup_peertest(&Network::Mainnet, &[Subnetwork::History]).await; - peertest::scenarios::offer_accept::test_populated_offer(&peertest, &target).await; + peertest::scenarios::offer_accept::test_offer(&peertest, &target).await; peertest.exit_all_nodes(); handle.stop().unwrap(); } #[tokio::test(flavor = "multi_thread")] #[serial] -async fn peertest_populated_offer_with_trace() { +async fn peertest_offer_with_trace() { let (peertest, target, handle) = setup_peertest(&Network::Mainnet, &[Subnetwork::History]).await; - peertest::scenarios::offer_accept::test_populated_offer_with_trace(&peertest, &target).await; - peertest.exit_all_nodes(); - handle.stop().unwrap(); -} - -#[tokio::test(flavor = "multi_thread")] -#[serial] -async fn peertest_unpopulated_offer() { - let (peertest, target, handle) = - setup_peertest(&Network::Mainnet, &[Subnetwork::History]).await; - peertest::scenarios::offer_accept::test_unpopulated_offer(&peertest, &target).await; - peertest.exit_all_nodes(); - handle.stop().unwrap(); -} - -#[tokio::test(flavor = "multi_thread")] -#[serial] -async fn peertest_unpopulated_offer_fails_with_missing_content() { - let (peertest, target, handle) = - setup_peertest(&Network::Mainnet, &[Subnetwork::History]).await; - peertest::scenarios::offer_accept::test_unpopulated_offer_fails_with_missing_content( - &peertest, &target, - ) - .await; + peertest::scenarios::offer_accept::test_offer_with_trace(&peertest, &target).await; peertest.exit_all_nodes(); handle.stop().unwrap(); } diff --git a/trin-beacon/src/jsonrpc.rs b/trin-beacon/src/jsonrpc.rs index b4dbe3fd8..57efaa012 100644 --- a/trin-beacon/src/jsonrpc.rs +++ b/trin-beacon/src/jsonrpc.rs @@ -72,12 +72,7 @@ async fn complete_request(network: Arc, request: BeaconJsonRpcReq } BeaconEndpoint::LightClientStore => light_client_store(&network).await, BeaconEndpoint::LookupEnr(node_id) => lookup_enr(network, node_id).await, - BeaconEndpoint::Offer(enr, content_key, content_value) => { - offer(network, enr, content_key, content_value).await - } - BeaconEndpoint::WireOffer(enr, content_keys) => { - wire_offer(network, enr, content_keys).await - } + BeaconEndpoint::Offer(enr, content_items) => offer(network, enr, content_items).await, BeaconEndpoint::TraceOffer(enr, content_key, content_value) => { trace_offer(network, enr, content_key, content_value).await } @@ -363,14 +358,13 @@ async fn gossip( async fn offer( network: Arc, enr: discv5::enr::Enr, - content_key: BeaconContentKey, - content_value: BeaconContentValue, + content_items: Vec<(BeaconContentKey, BeaconContentValue)>, ) -> Result { - match network - .overlay - .send_offer(enr, content_key.to_bytes(), content_value.encode().to_vec()) - .await - { + let content_items = content_items + .into_iter() + .map(|(key, value)| (key.to_bytes(), value.encode().to_vec())) + .collect(); + match network.overlay.send_offer(enr, content_items).await { Ok(accept) => Ok(json!(AcceptInfo { content_keys: accept.content_keys, })), @@ -395,20 +389,6 @@ async fn trace_offer( } } -/// Constructs a JSON call for the WireOffer method. -async fn wire_offer( - network: Arc, - enr: discv5::enr::Enr, - content_keys: Vec, -) -> Result { - match network.overlay.send_wire_offer(enr, content_keys).await { - Ok(accept) => Ok(json!(AcceptInfo { - content_keys: accept.content_keys, - })), - Err(msg) => Err(format!("WireOffer request timeout: {msg:?}")), - } -} - /// Constructs a JSON call for the Ping method. async fn ping( network: Arc, diff --git a/trin-history/src/jsonrpc.rs b/trin-history/src/jsonrpc.rs index 0ded5881f..5651309fb 100644 --- a/trin-history/src/jsonrpc.rs +++ b/trin-history/src/jsonrpc.rs @@ -70,15 +70,10 @@ async fn complete_request(network: Arc, request: HistoryJsonRpcR trace_gossip(network, content_key, content_value).await } HistoryEndpoint::LookupEnr(node_id) => lookup_enr(network, node_id).await, - HistoryEndpoint::Offer(enr, content_key, content_value) => { - offer(network, enr, content_key, content_value).await - } + HistoryEndpoint::Offer(enr, content_items) => offer(network, enr, content_items).await, HistoryEndpoint::TraceOffer(enr, content_key, content_value) => { trace_offer(network, enr, content_key, content_value).await } - HistoryEndpoint::WireOffer(enr, content_keys) => { - wire_offer(network, enr, content_keys).await - } HistoryEndpoint::Ping(enr) => ping(network, enr).await, HistoryEndpoint::RoutingTableInfo => { serde_json::to_value(network.overlay.routing_table_info()) @@ -335,14 +330,13 @@ async fn trace_gossip( async fn offer( network: Arc, enr: discv5::enr::Enr, - content_key: HistoryContentKey, - content_value: HistoryContentValue, + content_items: Vec<(HistoryContentKey, HistoryContentValue)>, ) -> Result { - match network - .overlay - .send_offer(enr, content_key.to_bytes(), content_value.encode().to_vec()) - .await - { + let content_items = content_items + .into_iter() + .map(|(key, value)| (key.to_bytes(), value.encode().to_vec())) + .collect(); + match network.overlay.send_offer(enr, content_items).await { Ok(accept) => Ok(json!(AcceptInfo { content_keys: accept.content_keys, })), @@ -367,19 +361,6 @@ async fn trace_offer( } } -/// Constructs a JSON call for the WireOffer method. -async fn wire_offer( - network: Arc, - enr: discv5::enr::Enr, - content_keys: Vec, -) -> Result { - match network.overlay.send_wire_offer(enr, content_keys).await { - Ok(accept) => Ok(json!(AcceptInfo { - content_keys: accept.content_keys, - })), - Err(msg) => Err(format!("WireOffer request timeout: {msg:?}")), - } -} /// Constructs a JSON call for the Ping method. async fn ping( network: Arc, diff --git a/trin-state/src/jsonrpc.rs b/trin-state/src/jsonrpc.rs index cbc8682b0..657f9a61a 100644 --- a/trin-state/src/jsonrpc.rs +++ b/trin-state/src/jsonrpc.rs @@ -61,9 +61,7 @@ impl StateRequestHandler { StateEndpoint::Store(content_key, content_value) => { store(network, content_key, content_value).await } - StateEndpoint::Offer(enr, content_key, content_value) => { - offer(network, enr, content_key, content_value).await - } + StateEndpoint::Offer(enr, content_items) => offer(network, enr, content_items).await, StateEndpoint::TraceOffer(enr, content_key, content_value) => { trace_offer(network, enr, content_key, content_value).await } @@ -303,14 +301,18 @@ async fn store( async fn offer( network: Arc, enr: Enr, - content_key: StateContentKey, - content_value: StateContentValue, + content_items: Vec<(StateContentKey, StateContentValue)>, ) -> Result { + let content_items = content_items + .into_iter() + .map(|(key, value)| (key.to_bytes(), value.encode().to_vec())) + .collect(); + to_json_result( "Offer", network .overlay - .send_offer(enr, content_key.to_bytes(), content_value.encode().to_vec()) + .send_offer(enr, content_items) .await .map(|accept| AcceptInfo { content_keys: accept.content_keys,