Commit 89ff040

refactor: remove unmaintained surf for reqwest (#1448)
KolbyML authored Sep 16, 2024
1 parent 3b4576b commit 89ff040
Showing 20 changed files with 674 additions and 1,169 deletions.
1,362 changes: 412 additions & 950 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -32,11 +32,11 @@ portalnet.workspace = true
 prometheus_exporter.workspace = true
 rand.workspace = true
 regex = "1.10.2"
+reqwest.workspace = true
 reth-ipc.workspace = true
 rpc.workspace = true
 serde_json = { workspace = true, features = ["preserve_order"]}
 sha3.workspace = true
-surf.workspace = true
 tempfile.workspace = true
 tokio.workspace = true
 tracing.workspace = true
@@ -111,6 +111,7 @@ quickcheck = "1.0.3"
 r2d2 = "0.8.9"
 r2d2_sqlite = "0.24.0"
 rand = "0.8.5"
+reqwest = { version = "0.12.7", default-features = false, features = ["rustls-tls", "json"] }
 reth-ipc = { tag = "v0.2.0-beta.5", git = "https://github.com/paradigmxyz/reth.git"}
 reth-rpc-types = { tag = "v1.0.6", git = "https://github.com/paradigmxyz/reth.git"}
 revm = { version = "14.0.1", default-features = false, features = ["std", "secp256k1", "serde-json"] }
@@ -128,7 +129,6 @@ sha3 = "0.9.1"
 snap = "1.1.1"
 ssz_types = "0.8.0"
 strum = { version = "0.26.1", features = ["derive"] }
-surf = { version = "2.3.2", default-features = false, features = ["h1-client-rustls", "middleware-logger", "encoding"] } # we use rustls because OpenSSL cause issues compiling on aarch64
 tempfile = "3.3.0"
 test-log = { version = "0.2.11", features = ["trace"] }
 thiserror = "1.0.57"
2 changes: 1 addition & 1 deletion e2store/Cargo.toml
@@ -19,9 +19,9 @@ ethereum_ssz.workspace = true
 ethereum_ssz_derive.workspace = true
 ethportal-api.workspace = true
 rand.workspace = true
+reqwest.workspace = true
 scraper.workspace = true
 snap.workspace = true
-surf.workspace = true

 [dev-dependencies]
 rstest.workspace = true
10 changes: 3 additions & 7 deletions e2store/src/utils.rs
@@ -1,9 +1,9 @@
 use std::{collections::HashMap, io};

-use anyhow::{anyhow, ensure, Error};
+use anyhow::{ensure, Error};
 use rand::{seq::SliceRandom, thread_rng};
+use reqwest::Client;
 use scraper::{Html, Selector};
-use surf::Client;

 const ERA_DIR_URL: &str = "https://mainnet.era.nimbus.team/";
 const ERA1_DIR_URL: &str = "https://era1.ethportal.net/";
@@ -22,11 +22,7 @@ pub async fn download_era_links(
     http_client: &Client,
     url: &str,
 ) -> anyhow::Result<HashMap<u64, String>> {
-    let index_html = http_client
-        .get(url)
-        .recv_string()
-        .await
-        .map_err(|e| anyhow!("{e}"))?;
+    let index_html = http_client.get(url).send().await?.text().await?;
     let index_html = Html::parse_document(&index_html);
     let selector = Selector::parse("a[href*='mainnet-']").expect("to be able to parse selector");
     let era_files: HashMap<u64, String> = index_html
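
The change above is the heart of the migration: surf's one-shot recv_string() becomes reqwest's two-step send() (connection errors) followed by text() (body-read errors). A minimal sketch of the new fetch path, assuming only a reqwest::Client and a reachable URL:

use reqwest::Client;

// Both error types convert into anyhow::Error via `?`, replacing the manual
// map_err(|e| anyhow!("{e}")) that surf's error type required.
async fn fetch_index_html(client: &Client, url: &str) -> anyhow::Result<String> {
    Ok(client.get(url).send().await?.text().await?)
}
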
6 changes: 2 additions & 4 deletions ethportal-api/src/types/execution/block_body.rs
@@ -263,9 +263,7 @@ impl ssz::Decode for BlockBodyLegacy {
             ))
         })?;
         let uncles: Vec<Header> = Decodable::decode(&mut uncles.as_slice()).map_err(|e| {
-            ssz::DecodeError::BytesInvalid(
-                format!("Legacy block body contains invalid txs: {e:?}",),
-            )
+            ssz::DecodeError::BytesInvalid(format!("Legacy block body contains invalid txs: {e:?}"))
         })?;
         Ok(Self { txs, uncles })
     }
@@ -350,7 +348,7 @@ impl ssz::Decode for BlockBodyMerge {
             ))
         })?;
         let uncles: Vec<Header> = Decodable::decode(&mut uncles.as_slice()).map_err(|e| {
-            ssz::DecodeError::BytesInvalid(format!("Merge block body contains invalid txs: {e:?}",))
+            ssz::DecodeError::BytesInvalid(format!("Merge block body contains invalid txs: {e:?}"))
         })?;
         if !uncles.is_empty() {
             return Err(ssz::DecodeError::BytesInvalid(
1 change: 0 additions & 1 deletion ethportal-peertest/Cargo.toml
@@ -28,7 +28,6 @@ rpc.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 serde_yaml.workspace = true
-surf.workspace = true
 tempfile.workspace = true
 tokio.workspace = true
 tracing.workspace = true
2 changes: 1 addition & 1 deletion light-client/Cargo.toml
@@ -22,7 +22,7 @@ jsonrpsee = { workspace = true, features = ["full"] }
 log = "0.4.17"
 milagro_bls = { package="snowbridge-milagro-bls", git = "https://github.com/Snowfork/milagro_bls" }
 portalnet.workspace = true
-reqwest = { version = "0.11.13", default-features = false, features = ["json", "rustls-tls"] }
+reqwest.workspace = true
 serde.workspace = true
 serde-this-or-that.workspace = true
 serde_json.workspace = true
4 changes: 3 additions & 1 deletion portal-bridge/Cargo.toml
@@ -34,14 +34,16 @@ jsonrpsee = { workspace = true, features = [
 portalnet.workspace = true
 prometheus_exporter.workspace = true
 rand.workspace = true
+reqwest.workspace = true
+reqwest-middleware = { version = "0.3.3", features = ["json"] }
+reqwest-retry = "0.6.1"
 revm.workspace = true
 revm-primitives.workspace = true
 scraper.workspace = true
 serde = { workspace = true, features = ["rc"] }
 serde_json.workspace = true
 serde_yaml.workspace = true
 ssz_types.workspace = true
-surf.workspace = true
 tokio.workspace = true
 tracing.workspace = true
 trin-execution.workspace = true
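
portal-bridge newly pulls in reqwest-middleware and reqwest-retry, which wrap a reqwest::Client so transient failures can be retried automatically. A sketch of the typical wiring for these two crates; the retry policy shown is illustrative, and the actual construction lives in portal-bridge's cli.rs, which this page does not show:

use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};

// Build a client whose requests are retried with exponential backoff on
// transient errors (timeouts, 5xx responses, connection resets).
fn build_client() -> ClientWithMiddleware {
    let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
    ClientBuilder::new(reqwest::Client::new())
        .with(RetryTransientMiddleware::new_with_policy(retry_policy))
        .build()
}
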
29 changes: 18 additions & 11 deletions portal-bridge/src/api/consensus.rs
@@ -1,12 +1,14 @@
 use std::fmt::Display;

 use anyhow::anyhow;
-use surf::Client;
 use tokio::time::sleep;
 use tracing::{debug, warn};
 use url::Url;

-use crate::{cli::url_to_client, constants::FALLBACK_RETRY_AFTER};
+use crate::{
+    cli::{url_to_client, ClientWithBaseUrl},
+    constants::FALLBACK_RETRY_AFTER,
+};

 /// Implements endpoints from the Beacon API to access data from the consensus layer.
 #[derive(Clone, Debug)]
@@ -16,10 +18,9 @@ pub struct ConsensusApi {
 }

 impl ConsensusApi {
-    pub async fn new(primary: Url, fallback: Url) -> Result<Self, surf::Error> {
+    pub async fn new(primary: Url, fallback: Url) -> Result<Self, reqwest_middleware::Error> {
         debug!(
-            "Starting ConsensusApi with primary provider: {} and fallback provider: {}",
-            primary, fallback
+            "Starting ConsensusApi with primary provider: {primary} and fallback provider: {fallback}",
         );
         let client = url_to_client(primary.clone()).map_err(|err| {
             anyhow!("Unable to create primary client for consensus data provider: {err:?}")
@@ -90,26 +91,32 @@ impl ConsensusApi {
         let client = url_to_client(self.primary.clone()).map_err(|err| {
             anyhow!("Unable to create client for primary consensus data provider: {err:?}")
         })?;
-        match client.get(&endpoint).recv_string().await {
+        match client.get(&endpoint)?.send().await?.text().await {
             Ok(response) => Ok(response),
             Err(err) => {
                 warn!("Error requesting consensus data from provider, retrying with fallback provider: {err:?}");
                 sleep(FALLBACK_RETRY_AFTER).await;
                 let client = url_to_client(self.fallback.clone()).map_err(|err| {
                     anyhow!("Unable to create client for fallback consensus data provider: {err:?}")
                 })?;
-                client.get(endpoint).recv_string().await.map_err(|err| {
-                    anyhow!("Unable to request consensus data from fallback provider: {err:?}")
-                })
+                client
+                    .get(endpoint)?
+                    .send()
+                    .await?
+                    .text()
+                    .await
+                    .map_err(|err| {
+                        anyhow!("Unable to request consensus data from fallback provider: {err:?}")
+                    })
             }
         }
     }
 }

 /// Check that provider is valid and accessible.
-async fn check_provider(client: &Client) -> anyhow::Result<()> {
+async fn check_provider(client: &ClientWithBaseUrl) -> anyhow::Result<()> {
     let endpoint = "/eth/v1/node/version".to_string();
-    match client.get(endpoint).recv_string().await {
+    match client.get(endpoint)?.send().await?.text().await {
         Ok(_) => Ok(()),
         Err(err) => Err(anyhow!(
             "Unable to request consensus data from provider: {err:?}"
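
Note the new `?` after client.get(&endpoint): unlike surf's Client::get, the wrapper's get is fallible, evidently because it joins the request path onto a stored base URL. A hypothetical sketch of what such a wrapper could look like; the real ClientWithBaseUrl lives in portal-bridge's cli.rs, which this page does not show, so names and fields here are assumptions:

use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use url::Url;

pub struct ClientWithBaseUrl {
    client: ClientWithMiddleware,
    base_url: Url,
}

impl ClientWithBaseUrl {
    // Fallible because joining a malformed path onto the base URL can fail,
    // which would explain the `?` at every get()/post() call site above.
    pub fn get(&self, path: impl AsRef<str>) -> anyhow::Result<RequestBuilder> {
        let url = self.base_url.join(path.as_ref())?;
        Ok(self.client.get(url))
    }
}
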
105 changes: 60 additions & 45 deletions portal-bridge/src/api/execution.rs
@@ -2,18 +2,6 @@ use std::sync::Arc;

 use alloy_primitives::B256;
 use anyhow::{anyhow, bail};
-use futures::future::join_all;
-use serde_json::{json, Value};
-use surf::Client;
-use tokio::time::sleep;
-use tracing::{debug, error, warn};
-use url::Url;
-
-use crate::{
-    cli::url_to_client,
-    constants::{FALLBACK_RETRY_AFTER, GET_RECEIPTS_RETRY_AFTER},
-    types::full_header::FullHeader,
-};
 use ethportal_api::{
     types::{
         execution::{
@@ -32,10 +20,21 @@ use ethportal_api::{
     BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, Header, HistoryContentKey, HistoryContentValue,
     Receipts,
 };
+use futures::future::join_all;
+use serde_json::{json, Value};
+use tokio::time::sleep;
+use tracing::{debug, error, warn};
 use trin_validation::{
     accumulator::PreMergeAccumulator, constants::MERGE_BLOCK_NUMBER,
     header_validator::HeaderValidator,
 };
+use url::Url;

+use crate::{
+    cli::{url_to_client, ClientWithBaseUrl},
+    constants::{FALLBACK_RETRY_AFTER, GET_RECEIPTS_RETRY_AFTER},
+    types::full_header::FullHeader,
+};
+
 /// Limit the number of requests in a single batch to avoid exceeding the
 /// provider's batch size limit configuration of 100.
@@ -51,7 +50,7 @@ pub struct ExecutionApi {
 }

 impl ExecutionApi {
-    pub async fn new(primary: Url, fallback: Url) -> Result<Self, surf::Error> {
+    pub async fn new(primary: Url, fallback: Url) -> Result<Self, reqwest_middleware::Error> {
         // Only check that provider is connected & available if not using a test provider.
         debug!(
             "Starting ExecutionApi with primary provider: {primary} and fallback provider: {fallback}",
@@ -273,9 +272,6 @@ impl ExecutionApi {
         Ok(header.number)
     }

-    /// Used the "surf" library here instead of "ureq" since "surf" is much more capable of handling
-    /// multiple async requests. Using "ureq" consistently resulted in errors as soon as the number
-    /// of concurrent tasks increased significantly.
     async fn batch_requests(&self, obj: Vec<JsonRequest>) -> anyhow::Result<String> {
         let batched_request_futures = obj
             .chunks(BATCH_LIMIT)
@@ -320,20 +316,19 @@ impl ExecutionApi {
     }

     async fn send_batch_request(
-        client: &Client,
+        client: &ClientWithBaseUrl,
         requests: &Vec<JsonRequest>,
     ) -> anyhow::Result<Vec<Value>> {
-        let result = client
-            .post("")
-            .body_json(&json!(requests))
-            .map_err(|e| anyhow!("Unable to construct json post for batched requests: {e:?}"))?;
-        let response = result
-            .recv_string()
+        let response = client
+            .post("")?
+            .json(&requests)
+            .send()
             .await
             .map_err(|err| anyhow!("Unable to request execution batch from provider: {err:?}"))?;
-        serde_json::from_str::<Vec<Value>>(&response).map_err(|err| {
-            anyhow!("Unable to parse execution batch from provider: {err:?} response: {response:?}")
-        })
+        response
+            .json::<Vec<Value>>()
+            .await
+            .map_err(|err| anyhow!("Unable to parse execution batch from provider: {err:?}"))
     }

     async fn try_request(&self, request: JsonRequest) -> anyhow::Result<Value> {
Expand All @@ -355,19 +350,27 @@ impl ExecutionApi {
}
}

async fn send_request(client: &Client, request: &JsonRequest) -> anyhow::Result<Value> {
let result = client
.post("")
.body_json(&request)
.map_err(|e| anyhow!("Unable to construct json post for single request: {e:?}"))?;
let response = result
.recv_string()
async fn send_request(
client: &ClientWithBaseUrl,
request: &JsonRequest,
) -> anyhow::Result<Value> {
let request = client
.post("")?
.json(&request)
.build()
.map_err(|e| anyhow!("Unable to construct JSON POST for single request: {e:?}"))?;
let response = client
.execute(request)
.await
.map_err(|err| anyhow!("Unable to request execution payload from provider: {err:?}"))?;
serde_json::from_str::<Value>(&response).map_err(|err| {
let response_text = response
.text()
.await
.map_err(|e| anyhow!("Unable to read response body: {e:?}"))?;
serde_json::from_str::<Value>(&response_text).map_err(|err| {
anyhow!(
"Unable to parse execution response from provider: {err:?} response: {response:?}"
)
"Unable to parse execution response from provider: {err:?} response: {response_text:?}",
)
})
}
}
@@ -383,19 +386,31 @@ pub async fn construct_proof(
 }

 /// Check that provider is valid and accessible.
-async fn check_provider(client: &Client) -> anyhow::Result<()> {
+async fn check_provider(client: &ClientWithBaseUrl) -> anyhow::Result<()> {
     let request = client
-        .post("")
-        .body_json(
-            &json!({"jsonrpc": "2.0", "method": "web3_clientVersion", "params": [], "id": 1}),
-        )
+        .post("")?
+        .json(&json!({
+            "jsonrpc": "2.0",
+            "method": "web3_clientVersion",
+            "params": [],
+            "id": 1,
+        }))
+        .build()
         .map_err(|e| anyhow!("Unable to construct json post request: {e:?}"))?;
-    let response = request
-        .recv_string()
+    let response = client
+        .execute(request)
         .await
         .map_err(|err| anyhow!("Unable to request execution batch from provider: {err:?}"))?;
-    let response: Value = serde_json::from_str(&response).map_err(|err| anyhow!("Unable to parse json response from execution provider, it's likely unavailable/configured incorrectly: {err:?} response: {response:?}"))?;
-    if response["result"].as_str().is_none() {
+    let response_text = response
+        .text()
+        .await
+        .map_err(|e| anyhow!("Unable to read response body: {e:?}"))?;
+    let response_json: Value = serde_json::from_str(&response_text).map_err(|err| {
+        anyhow!(
+            "Unable to parse json response from execution provider, it's likely unavailable/configured incorrectly: {err:?} response: {response_text:?}",
+        )
+    })?;
+    if response_json["result"].as_str().is_none() {
         bail!("Invalid response from execution provider check, it's likely unavailable/configured incorrectly");
     }
     Ok(())
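
For reference, the provider check boils down to one JSON-RPC round trip. A standalone sketch of the same web3_clientVersion handshake using a bare reqwest::Client, assuming `endpoint` is a full provider URL rather than a path routed through ClientWithBaseUrl:

use anyhow::ensure;
use reqwest::Client;
use serde_json::{json, Value};

async fn provider_is_healthy(endpoint: &str) -> anyhow::Result<()> {
    let response: Value = Client::new()
        .post(endpoint)
        .json(&json!({"jsonrpc": "2.0", "method": "web3_clientVersion", "params": [], "id": 1}))
        .send()
        .await?
        // Parse the body directly as JSON instead of reading text first.
        .json()
        .await?;
    ensure!(
        response["result"].as_str().is_some(),
        "execution provider returned an invalid web3_clientVersion response"
    );
    Ok(())
}
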
(Diffs for the remaining 10 changed files did not load and are not shown.)
