Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update portal_*Offer to handle multiple pieces of content #1507

Merged
merged 4 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions ethportal-api/src/beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,18 @@ pub trait BeaconNetworkApi {
content_value: RawContentValue,
) -> RpcResult<TraceGossipInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store the content locally.
/// Send an OFFER request with given ContentItems, to the designated peer and wait for a
/// response. Does not store the content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
#[method(name = "beaconOffer")]
async fn offer(
&self,
enr: Enr,
content_key: BeaconContentKey,
content_value: RawContentValue,
content_items: Vec<(BeaconContentKey, RawContentValue)>,
) -> RpcResult<AcceptInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Send an OFFER request with given ContentItems, to the designated peer.
/// Does not store the content locally.
/// Returns trace info for the offer.
#[method(name = "beaconTraceOffer")]
Expand All @@ -138,7 +137,7 @@ pub trait BeaconNetworkApi {
content_value: RawContentValue,
) -> RpcResult<OfferTrace>;

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// Send an OFFER request with given ContentItemss, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
Expand Down
11 changes: 5 additions & 6 deletions ethportal-api/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,18 @@ pub trait HistoryNetworkApi {
content_value: RawContentValue,
) -> RpcResult<TraceGossipInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store the content locally.
/// Send an OFFER request with given ContentItems, to the designated peer and wait for a
/// response. Does not store the content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
#[method(name = "historyOffer")]
async fn offer(
&self,
enr: Enr,
content_key: HistoryContentKey,
content_value: RawContentValue,
content_items: Vec<(HistoryContentKey, RawContentValue)>,
) -> RpcResult<AcceptInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Send an OFFER request with given ContentItems, to the designated peer.
/// Does not store the content locally.
/// Returns trace info for the offer.
#[method(name = "historyTraceOffer")]
Expand All @@ -124,7 +123,7 @@ pub trait HistoryNetworkApi {
content_value: RawContentValue,
) -> RpcResult<OfferTrace>;

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// Send an OFFER request with given ContentItemss, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
Expand Down
9 changes: 4 additions & 5 deletions ethportal-api/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,18 @@ pub trait StateNetworkApi {
content_value: RawContentValue,
) -> RpcResult<TraceGossipInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store the content locally.
/// Send an OFFER request with given ContentItems, to the designated peer and wait for a
/// response. Does not store the content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
#[method(name = "stateOffer")]
async fn offer(
&self,
enr: Enr,
content_key: StateContentKey,
content_value: RawContentValue,
content_items: Vec<(StateContentKey, RawContentValue)>,
) -> RpcResult<AcceptInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Send an OFFER request with given ContentItems, to the designated peer.
/// Does not store the content locally.
/// Returns trace info for offer.
#[method(name = "stateTraceOffer")]
Expand Down
12 changes: 6 additions & 6 deletions ethportal-api/src/types/jsonrpc/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub enum StateEndpoint {
Store(StateContentKey, StateContentValue),
/// WireOffer is not supported in the state network, since locally
/// stored values do not contain the proofs necessary for valid gossip.
/// params: [enr, content_key, content_value]
Offer(Enr, StateContentKey, StateContentValue),
/// params: [enr, Vec<(content_key, content_value>)]
Offer(Enr, Vec<(StateContentKey, StateContentValue)>),
/// params: [enr, content_key, content_value]
TraceOffer(Enr, StateContentKey, StateContentValue),
/// params: [enr, content_key, content_value]
Expand Down Expand Up @@ -79,8 +79,8 @@ pub enum HistoryEndpoint {
Gossip(HistoryContentKey, HistoryContentValue),
/// params: [content_key, content_value]
TraceGossip(HistoryContentKey, HistoryContentValue),
/// params: [enr, content_key, content_value]
Offer(Enr, HistoryContentKey, HistoryContentValue),
/// params: [enr, Vec<(content_key, content_value)>]
Offer(Enr, Vec<(HistoryContentKey, HistoryContentValue)>),
/// params: [enr, content_key, content_value]
TraceOffer(Enr, HistoryContentKey, HistoryContentValue),
/// params: [enr, [content_key]]
Expand Down Expand Up @@ -133,8 +133,8 @@ pub enum BeaconEndpoint {
Gossip(BeaconContentKey, BeaconContentValue),
/// params: [content_key, content_value]
TraceGossip(BeaconContentKey, BeaconContentValue),
/// params: [enr, content_key, content_value]
Offer(Enr, BeaconContentKey, BeaconContentValue),
/// params: [enr, Vec<(content_key, content_value>)]
Offer(Enr, Vec<(BeaconContentKey, BeaconContentValue)>),
/// params: [enr, content_key, content_value]
TraceOffer(Enr, BeaconContentKey, BeaconContentValue),
/// params: [enr, [content_key]]
Expand Down
8 changes: 5 additions & 3 deletions ethportal-peertest/src/scenarios/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,15 @@ pub async fn test_gossip_dropped_with_offer(peertest: &Peertest, target: &Client
target
.offer(
fresh_enr.clone(),
header_key_2.clone(),
header_value_2.encode(),
vec![(header_key_2.clone(), header_value_2.encode())],
)
.await
.unwrap();
target
.offer(fresh_enr.clone(), body_key_2.clone(), body_value_2.encode())
.offer(
fresh_enr.clone(),
vec![(body_key_2.clone(), body_value_2.encode())],
)
.await
.unwrap();

Expand Down
9 changes: 3 additions & 6 deletions ethportal-peertest/src/scenarios/offer_accept.rs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add test here as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove WireOffer then Offer will take it's place for tests which should be sufficient for test coverage

Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ pub async fn test_populated_offer(peertest: &Peertest, target: &Client) {
let result = target
.offer(
Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(),
content_key.clone(),
content_value.encode(),
vec![(content_key.clone(), content_value.encode())],
)
.await
.unwrap();
Expand Down Expand Up @@ -144,8 +143,7 @@ pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client)
target
.offer(
peertest.bootnode.enr.clone(),
content_key.clone(),
content_value.encode(),
vec![(content_key.clone(), content_value.encode())],
)
.await
.unwrap();
Expand Down Expand Up @@ -221,8 +219,7 @@ pub async fn test_offer_propagates_gossip_multiple_content_values(
target
.offer(
peertest.bootnode.enr.clone(),
header_key.clone(),
header_value.encode(),
vec![(header_key.clone(), header_value.encode())],
)
.await
.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions ethportal-peertest/src/scenarios/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ async fn test_state_offer(fixture: &StateFixture, target: &Client, peer: &Peerte
StateNetworkApiClient::offer(
target,
peer.enr.clone(),
fixture.key.clone(),
fixture.raw_offer_value.clone(),
vec![(fixture.key.clone(), fixture.raw_offer_value.clone())],
)
.await
.unwrap();
Expand Down
7 changes: 2 additions & 5 deletions portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,13 +509,10 @@ where
pub async fn send_offer(
&self,
enr: Enr,
content_key: RawContentKey,
content_value: Vec<u8>,
content_items: Vec<(RawContentKey, Vec<u8>)>,
) -> Result<Accept, OverlayRequestError> {
// Construct the request.
let request = Request::PopulatedOffer(PopulatedOffer {
content_items: vec![(content_key, content_value)],
});
let request = Request::PopulatedOffer(PopulatedOffer { content_items });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, check that number of content items is 1..=64 should happen here (in which case it should return Err(OverlayRequestError::InvalidRequest(..))).

In that case, it's probably not needed in *_rpc.rs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should move the request check to portalnet/src/overlay/protocol.rs. Because then we would be sending possibly over 64 key-value pairs through the *_rx channel which I think should be avoided. Yes this suggestion would reduce code duplication, but I don't think we should push validating length checks after a channel. We should do the range check as soon as possible to avoid that send. Why waste extra resources when it can be avoided.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I think it should be there is not because I want to avoid code duplication, but because this type of check should be at lower level. Reason being is that what if there is alternative call to this function (or is added in the future).

Now that I think about it, we might want it at even lower level (e.g. right before sending offer request over network). Because poke, and gossip on deletion (and maybe some other mechanics) wouldn't have this check.

But, regarding your point, it's fine to have it on upper level as well (and not make calls that we know would fail).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have a check at the lower level https://github.com/ethereum/trin/blob/master/portalnet/src/gossip.rs#L103-L115 ... but i'd include an update in this pr to use the MAX_AMOUNT_OF_OFFERED_CONTENT_KEYS const in that code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update the code to use that const. I think it is fine if we have a lower and higher level check


let direction = RequestDirection::Outgoing {
destination: enr.clone(),
Expand Down
30 changes: 21 additions & 9 deletions rpc/src/beacon_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,35 @@ impl BeaconNetworkApiServer for BeaconNetworkApi {
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store content locally.
/// Send an OFFER request with given ContentItems, to the designated peer and wait for a
/// response. Does not store content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
&self,
enr: Enr,
content_key: BeaconContentKey,
content_value: RawContentValue,
content_items: Vec<(BeaconContentKey, RawContentValue)>,
) -> RpcResult<AcceptInfo> {
let content_value = BeaconContentValue::decode(&content_key, &content_value)
.map_err(RpcServeError::from)?;
let endpoint = BeaconEndpoint::Offer(enr, content_key, content_value);
if !(1..=64).contains(&content_items.len()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably define 64 as a constant somewhere in the ethportal-api.

return Err(RpcServeError::Message(format!(
"Invalid amount of content items: {}",
content_items.len()
))
.into());
}
let content_items = content_items
.into_iter()
.map(|(key, value)| {
BeaconContentValue::decode(&key, &value)
.map(|value| (key, value))
.map_err(RpcServeError::from)
})
.collect::<Result<Vec<_>, _>>()?;
let endpoint = BeaconEndpoint::Offer(enr, content_items);
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Send an OFFER request with given ContentItems, to the designated peer.
/// Does not store the content locally.
/// Returns trace info from the offer.
async fn trace_offer(
Expand All @@ -210,7 +222,7 @@ impl BeaconNetworkApiServer for BeaconNetworkApi {
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// Send an OFFER request with given ContentItemss, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
Expand Down
36 changes: 24 additions & 12 deletions rpc/src/history_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,35 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store content locally.
/// Send an OFFER request with given ContentItems, to the designated peer and wait for a
/// response. Does not store content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
&self,
enr: Enr,
content_key: HistoryContentKey,
content_value: RawContentValue,
content_items: Vec<(HistoryContentKey, RawContentValue)>,
) -> RpcResult<AcceptInfo> {
let content_value = HistoryContentValue::decode(&content_key, &content_value)
.map_err(RpcServeError::from)?;
let endpoint = HistoryEndpoint::Offer(enr, content_key, content_value);
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer.
if !(1..=64).contains(&content_items.len()) {
return Err(RpcServeError::Message(format!(
"Invalid amount of content items: {}",
content_items.len()
))
.into());
}
let content_items = content_items
.into_iter()
.map(|(key, value)| {
HistoryContentValue::decode(&key, &value)
.map(|value| (key, value))
.map_err(RpcServeError::from)
})
.collect::<Result<Vec<_>, _>>()?;
let endpoint = HistoryEndpoint::Offer(enr, content_items);
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentItems, to the designated peer.
/// Does not store the content locally.
/// Returns true if the content was accepted and successfully transferred,
/// returns false if the content was not accepted or the transfer failed.
Expand All @@ -184,7 +196,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// Send an OFFER request with given ContentItemss, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
Expand Down
31 changes: 20 additions & 11 deletions rpc/src/state_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,32 @@ impl StateNetworkApiServer for StateNetworkApi {
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store content locally.
/// Send an OFFER request with given ContentItems, to the designated peer and wait for a
/// response. Does not store content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
&self,
enr: Enr,
content_key: StateContentKey,
content_value: RawContentValue,
content_items: Vec<(StateContentKey, RawContentValue)>,
) -> RpcResult<AcceptInfo> {
let content_value =
StateContentValue::decode(&content_key, &content_value).map_err(RpcServeError::from)?;
let endpoint = StateEndpoint::Offer(enr, content_key, content_value);
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer.
if !(1..=64).contains(&content_items.len()) {
return Err(RpcServeError::Message(format!(
"Invalid amount of content items: {}",
content_items.len()
))
.into());
}
let content_items = content_items
.into_iter()
.map(|(key, value)| StateContentValue::decode(&key, &value).map(|value| (key, value)))
.collect::<Result<Vec<_>, _>>()
.map_err(RpcServeError::from)?;
let endpoint = StateEndpoint::Offer(enr, content_items);
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentItems, to the designated peer.
/// Does not store the content locally.
/// Returns trace info from the offer.
async fn trace_offer(
Expand Down
17 changes: 7 additions & 10 deletions trin-beacon/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ async fn complete_request(network: Arc<BeaconNetwork>, request: BeaconJsonRpcReq
}
BeaconEndpoint::LightClientStore => light_client_store(&network).await,
BeaconEndpoint::LookupEnr(node_id) => lookup_enr(network, node_id).await,
BeaconEndpoint::Offer(enr, content_key, content_value) => {
offer(network, enr, content_key, content_value).await
}
BeaconEndpoint::Offer(enr, content_items) => offer(network, enr, content_items).await,
BeaconEndpoint::WireOffer(enr, content_keys) => {
wire_offer(network, enr, content_keys).await
}
Expand Down Expand Up @@ -363,14 +361,13 @@ async fn gossip(
async fn offer(
network: Arc<BeaconNetwork>,
enr: discv5::enr::Enr<discv5::enr::CombinedKey>,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
content_items: Vec<(BeaconContentKey, BeaconContentValue)>,
) -> Result<Value, String> {
match network
.overlay
.send_offer(enr, content_key.to_bytes(), content_value.encode().to_vec())
.await
{
let content_items = content_items
.into_iter()
.map(|(key, value)| (key.to_bytes(), value.encode().to_vec()))
.collect();
match network.overlay.send_offer(enr, content_items).await {
Ok(accept) => Ok(json!(AcceptInfo {
content_keys: accept.content_keys,
})),
Expand Down
Loading
Loading