Skip to content

Commit

Permalink
feat: add jsonrpc endpoint for offer with trace (#1394)
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita authored Aug 23, 2024
1 parent d2f974f commit 3fc0ed8
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 3 deletions.
12 changes: 12 additions & 0 deletions ethportal-api/src/beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ pub trait BeaconNetworkApi {
content_value: BeaconContentValue,
) -> RpcResult<AcceptInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Does not store the content locally.
/// Returns true if the content was accepted and successfully transferred,
/// returns false if the content was not accepted or the transfer failed.
#[method(name = "beaconTraceOffer")]
async fn trace_offer(
&self,
enr: Enr,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
) -> RpcResult<bool>;

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
Expand Down
12 changes: 12 additions & 0 deletions ethportal-api/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ pub trait HistoryNetworkApi {
content_value: HistoryContentValue,
) -> RpcResult<AcceptInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Does not store the content locally.
/// Returns true if the content was accepted and successfully transferred,
/// returns false if the content was not accepted or the transfer failed.
#[method(name = "historyTraceOffer")]
async fn trace_offer(
&self,
enr: Enr,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
) -> RpcResult<bool>;

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
Expand Down
12 changes: 12 additions & 0 deletions ethportal-api/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ pub trait StateNetworkApi {
content_value: StateContentValue,
) -> RpcResult<AcceptInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Does not store the content locally.
/// Returns true if the content was accepted and successfully transferred,
/// returns false if the content was not accepted or the transfer failed.
#[method(name = "stateTraceOffer")]
async fn trace_offer(
&self,
enr: Enr,
content_key: StateContentKey,
content_value: StateContentValue,
) -> RpcResult<bool>;

/// Store content key with a content data to the local database.
#[method(name = "stateStore")]
async fn store(
Expand Down
6 changes: 6 additions & 0 deletions ethportal-api/src/types/jsonrpc/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub enum StateEndpoint {
/// params: [enr, content_key, content_value]
Offer(Enr, StateContentKey, StateContentValue),
/// params: [enr, content_key, content_value]
TraceOffer(Enr, StateContentKey, StateContentValue),
/// params: [enr, content_key, content_value]
Gossip(StateContentKey, StateContentValue),
/// params: [content_key, content_value]
TraceGossip(StateContentKey, StateContentValue),
Expand Down Expand Up @@ -79,6 +81,8 @@ pub enum HistoryEndpoint {
TraceGossip(HistoryContentKey, HistoryContentValue),
/// params: [enr, content_key, content_value]
Offer(Enr, HistoryContentKey, HistoryContentValue),
/// params: [enr, content_key, content_value]
TraceOffer(Enr, HistoryContentKey, HistoryContentValue),
/// params: [enr, [content_key]]
WireOffer(Enr, Vec<HistoryContentKey>),
/// params: [enr]
Expand Down Expand Up @@ -127,6 +131,8 @@ pub enum BeaconEndpoint {
TraceGossip(BeaconContentKey, BeaconContentValue),
/// params: [enr, content_key, content_value]
Offer(Enr, BeaconContentKey, BeaconContentValue),
/// params: [enr, content_key, content_value]
TraceOffer(Enr, BeaconContentKey, BeaconContentValue),
/// params: [enr, [content_key]]
WireOffer(Enr, Vec<BeaconContentKey>),
/// params: enr
Expand Down
34 changes: 34 additions & 0 deletions ethportal-peertest/src/scenarios/offer_accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,40 @@ pub async fn test_populated_offer(peertest: &Peertest, target: &Client) {
);
}

pub async fn test_populated_offer_with_trace(peertest: &Peertest, target: &Client) {
info!("Testing Populated Offer/ACCEPT flow with trace");

// store header for validation
let (content_key, content_value) = fixture_header_with_proof();
let store_result = peertest
.bootnode
.ipc_client
.store(content_key.clone(), content_value.clone())
.await
.unwrap();
assert!(store_result);

// use block body to test transfer of large content over utp
let (content_key, content_value) = fixture_block_body();
let result = target
.trace_offer(
Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(),
content_key.clone(),
content_value.clone(),
)
.await
.unwrap();

// check that the result of the offer is true for a valid transfer
assert!(result);

// Check if the stored content value in bootnode's DB matches the offered
assert_eq!(
content_value,
wait_for_history_content(&peertest.bootnode.ipc_client, content_key).await,
);
}

pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client) {
info!("Testing populated offer propagates gossip");

Expand Down
39 changes: 38 additions & 1 deletion portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use ethportal_api::{
enr::Enr,
portal_wire::{
Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, Ping, Pong,
PopulatedOffer, ProtocolId, Request, Response,
PopulatedOffer, PopulatedOfferWithResult, ProtocolId, Request, Response,
},
},
utils::bytes::hex_encode,
Expand Down Expand Up @@ -576,6 +576,43 @@ where
}
}

/// Send Offer request with trace, without storing the content into db
pub async fn send_offer_trace(
&self,
enr: Enr,
content_key: RawContentKey,
content_value: Vec<u8>,
) -> Result<bool, OverlayRequestError> {
// Construct the request.
let (result_tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let request = Request::PopulatedOfferWithResult(PopulatedOfferWithResult {
content_item: (content_key, content_value),
result_tx,
});

let direction = RequestDirection::Outgoing {
destination: enr.clone(),
};

// Send the offer request and wait on the response.
// Ignore the accept message, since we only care about the trace.
self.send_overlay_request(request, direction).await?;

// Wait for the trace response.
match rx.recv().await {
Some(accept) => Ok(accept),
None => {
warn!(
protocol = %self.protocol,
"Error receiving TraceOffer query response"
);
Err(OverlayRequestError::ChannelFailure(
"Error receiving TraceOffer query response".to_string(),
))
}
}
}

pub async fn lookup_node(&self, target: NodeId) -> Vec<Enr> {
if target == self.local_enr().node_id() {
return vec![self.local_enr()];
Expand Down
14 changes: 14 additions & 0 deletions rpc/src/beacon_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ impl BeaconNetworkApiServer for BeaconNetworkApi {
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Does not store the content locally.
/// Returns true if the content was accepted and successfully transferred,
/// returns false if the content was not accepted or the transfer failed.
async fn trace_offer(
&self,
enr: Enr,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
) -> RpcResult<bool> {
let endpoint = BeaconEndpoint::TraceOffer(enr, content_key, content_value);
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
Expand Down
14 changes: 14 additions & 0 deletions rpc/src/history_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,20 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Does not store the content locally.
/// Returns true if the content was accepted and successfully transferred,
/// returns false if the content was not accepted or the transfer failed.
async fn trace_offer(
&self,
enr: Enr,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
) -> RpcResult<bool> {
let endpoint = HistoryEndpoint::TraceOffer(enr, content_key, content_value);
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
Expand Down
14 changes: 14 additions & 0 deletions rpc/src/state_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ impl StateNetworkApiServer for StateNetworkApi {
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Send an OFFER request with given ContentKey, to the designated peer.
/// Does not store the content locally.
/// Returns true if the content was accepted and successfully transferred,
/// returns false if the content was not accepted or the transfer failed.
async fn trace_offer(
&self,
enr: Enr,
content_key: StateContentKey,
content_value: StateContentValue,
) -> RpcResult<bool> {
let endpoint = StateEndpoint::TraceOffer(enr, content_key, content_value);
Ok(proxy_to_subnet(&self.network, endpoint).await?)
}

/// Store content key with a content data to the local database.
async fn store(
&self,
Expand Down
9 changes: 9 additions & 0 deletions tests/self_peertest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ async fn peertest_populated_offer() {
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_populated_offer_with_trace() {
let (peertest, target, handle) = setup_peertest("mainnet", &[HISTORY_NETWORK]).await;
peertest::scenarios::offer_accept::test_populated_offer_with_trace(&peertest, &target).await;
peertest.exit_all_nodes();
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_unpopulated_offer() {
Expand Down
20 changes: 20 additions & 0 deletions trin-beacon/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ async fn complete_request(network: Arc<BeaconNetwork>, request: BeaconJsonRpcReq
BeaconEndpoint::WireOffer(enr, content_keys) => {
wire_offer(network, enr, content_keys).await
}
BeaconEndpoint::TraceOffer(enr, content_key, content_value) => {
trace_offer(network, enr, content_key, content_value).await
}
BeaconEndpoint::Ping(enr) => ping(network, enr).await,
BeaconEndpoint::RoutingTableInfo => {
serde_json::to_value(network.overlay.routing_table_info())
Expand Down Expand Up @@ -361,6 +364,23 @@ async fn offer(
}
}

/// Constructs a JSON call for the Offer method with trace.
async fn trace_offer(
network: Arc<BeaconNetwork>,
enr: discv5::enr::Enr<discv5::enr::CombinedKey>,
content_key: BeaconContentKey,
content_value: BeaconContentValue,
) -> Result<Value, String> {
match network
.overlay
.send_offer_trace(enr, content_key.into(), content_value.encode())
.await
{
Ok(accept) => Ok(json!(accept)),
Err(msg) => Err(format!("Offer request timeout: {msg:?}")),
}
}

/// Constructs a JSON call for the WireOffer method.
async fn wire_offer(
network: Arc<BeaconNetwork>,
Expand Down
24 changes: 22 additions & 2 deletions trin-history/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ async fn complete_request(network: Arc<HistoryNetwork>, request: HistoryJsonRpcR
gossip(network, content_key, content_value).await
}
HistoryEndpoint::TraceGossip(content_key, content_value) => {
gossip_trace(network, content_key, content_value).await
trace_gossip(network, content_key, content_value).await
}
HistoryEndpoint::LookupEnr(node_id) => lookup_enr(network, node_id).await,
HistoryEndpoint::Offer(enr, content_key, content_value) => {
offer(network, enr, content_key, content_value).await
}
HistoryEndpoint::TraceOffer(enr, content_key, content_value) => {
trace_offer(network, enr, content_key, content_value).await
}
HistoryEndpoint::WireOffer(enr, content_keys) => {
wire_offer(network, enr, content_keys).await
}
Expand Down Expand Up @@ -314,7 +317,7 @@ async fn gossip(
}

/// Constructs a JSON call for the Gossip method, with tracing enabled.
async fn gossip_trace(
async fn trace_gossip(
network: Arc<HistoryNetwork>,
content_key: HistoryContentKey,
content_value: ethportal_api::HistoryContentValue,
Expand Down Expand Up @@ -347,6 +350,23 @@ async fn offer(
}
}

/// Constructs a JSON call for the Offer method with trace.
async fn trace_offer(
network: Arc<HistoryNetwork>,
enr: discv5::enr::Enr<discv5::enr::CombinedKey>,
content_key: HistoryContentKey,
content_value: HistoryContentValue,
) -> Result<Value, String> {
match network
.overlay
.send_offer_trace(enr, content_key.into(), content_value.encode())
.await
{
Ok(accept) => Ok(json!(accept)),
Err(msg) => Err(format!("Offer request timeout: {msg:?}")),
}
}

/// Constructs a JSON call for the WireOffer method.
async fn wire_offer(
network: Arc<HistoryNetwork>,
Expand Down
18 changes: 18 additions & 0 deletions trin-state/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ impl StateRequestHandler {
StateEndpoint::Offer(enr, content_key, content_value) => {
offer(network, enr, content_key, content_value).await
}
StateEndpoint::TraceOffer(enr, content_key, content_value) => {
trace_offer(network, enr, content_key, content_value).await
}
StateEndpoint::Gossip(content_key, content_value) => {
gossip(
network,
Expand Down Expand Up @@ -318,6 +321,21 @@ async fn offer(
)
}

async fn trace_offer(
network: Arc<StateNetwork>,
enr: Enr,
content_key: StateContentKey,
content_value: StateContentValue,
) -> Result<Value, String> {
to_json_result(
"TraceOffer",
network
.overlay
.send_offer_trace(enr, content_key.into(), content_value.encode())
.await,
)
}

async fn gossip(
network: Arc<StateNetwork>,
content_key: StateContentKey,
Expand Down

0 comments on commit 3fc0ed8

Please sign in to comment.