Skip to content

Commit

Permalink
refactor: remove unmaintained surf for reqwest
Browse files Browse the repository at this point in the history
  • Loading branch information
KolbyML committed Sep 13, 2024
1 parent e58bcb4 commit 6613627
Show file tree
Hide file tree
Showing 19 changed files with 631 additions and 1,157 deletions.
1,362 changes: 412 additions & 950 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ portalnet.workspace = true
prometheus_exporter.workspace = true
rand.workspace = true
regex = "1.10.2"
reqwest.workspace = true
reth-ipc.workspace = true
rpc.workspace = true
serde_json = { workspace = true, features = ["preserve_order"]}
sha3.workspace = true
surf.workspace = true
tempfile.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down Expand Up @@ -111,6 +111,7 @@ quickcheck = "1.0.3"
r2d2 = "0.8.9"
r2d2_sqlite = "0.24.0"
rand = "0.8.5"
reqwest = { version = "0.12.7", default-features = false, features = ["rustls-tls", "json"] }
reth-ipc = { tag = "v0.2.0-beta.5", git = "https://github.com/paradigmxyz/reth.git"}
reth-rpc-types = { tag = "v1.0.6", git = "https://github.com/paradigmxyz/reth.git"}
revm = { version = "14.0.1", features = ["std", "secp256k1", "serde-json"], default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion e2store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ ethereum_ssz.workspace = true
ethereum_ssz_derive.workspace = true
ethportal-api.workspace = true
rand.workspace = true
reqwest.workspace = true
scraper.workspace = true
snap.workspace = true
surf.workspace = true

[dev-dependencies]
rstest.workspace = true
Expand Down
10 changes: 3 additions & 7 deletions e2store/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{collections::HashMap, io};

use anyhow::{anyhow, ensure, Error};
use anyhow::{ensure, Error};
use rand::{seq::SliceRandom, thread_rng};
use reqwest::Client;
use scraper::{Html, Selector};
use surf::Client;

const ERA_DIR_URL: &str = "https://mainnet.era.nimbus.team/";
const ERA1_DIR_URL: &str = "https://era1.ethportal.net/";
Expand All @@ -22,11 +22,7 @@ pub async fn download_era_links(
http_client: &Client,
url: &str,
) -> anyhow::Result<HashMap<u64, String>> {
let index_html = http_client
.get(url)
.recv_string()
.await
.map_err(|e| anyhow!("{e}"))?;
let index_html = http_client.get(url).send().await?.text().await?;
let index_html = Html::parse_document(&index_html);
let selector = Selector::parse("a[href*='mainnet-']").expect("to be able to parse selector");
let era_files: HashMap<u64, String> = index_html
Expand Down
6 changes: 2 additions & 4 deletions ethportal-api/src/types/execution/block_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,7 @@ impl ssz::Decode for BlockBodyLegacy {
))
})?;
let uncles: Vec<Header> = Decodable::decode(&mut uncles.as_slice()).map_err(|e| {
ssz::DecodeError::BytesInvalid(
format!("Legacy block body contains invalid txs: {e:?}",),
)
ssz::DecodeError::BytesInvalid(format!("Legacy block body contains invalid txs: {e:?}"))
})?;
Ok(Self { txs, uncles })
}
Expand Down Expand Up @@ -350,7 +348,7 @@ impl ssz::Decode for BlockBodyMerge {
))
})?;
let uncles: Vec<Header> = Decodable::decode(&mut uncles.as_slice()).map_err(|e| {
ssz::DecodeError::BytesInvalid(format!("Merge block body contains invalid txs: {e:?}",))
ssz::DecodeError::BytesInvalid(format!("Merge block body contains invalid txs: {e:?}"))
})?;
if !uncles.is_empty() {
return Err(ssz::DecodeError::BytesInvalid(
Expand Down
1 change: 0 additions & 1 deletion ethportal-peertest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ rpc.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
surf.workspace = true
tempfile.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion portal-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ jsonrpsee = { workspace = true, features = [
portalnet.workspace = true
prometheus_exporter.workspace = true
rand.workspace = true
reqwest.workspace = true
reqwest-middleware = { version = "0.3.3", features = ["json"] }
reqwest-retry = "0.6.1"
revm.workspace = true
revm-primitives.workspace = true
scraper.workspace = true
serde = { workspace = true, features = ["rc"] }
serde_json.workspace = true
serde_yaml.workspace = true
ssz_types.workspace = true
surf.workspace = true
tokio.workspace = true
tracing.workspace = true
trin-execution.workspace = true
Expand Down
25 changes: 15 additions & 10 deletions portal-bridge/src/api/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::Display;

use anyhow::anyhow;
use surf::Client;
use reqwest_middleware::ClientWithMiddleware;
use tokio::time::sleep;
use tracing::{debug, warn};
use url::Url;
Expand All @@ -16,10 +16,9 @@ pub struct ConsensusApi {
}

impl ConsensusApi {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, surf::Error> {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, reqwest_middleware::Error> {
debug!(
"Starting ConsensusApi with primary provider: {} and fallback provider: {}",
primary, fallback
"Starting ConsensusApi with primary provider: {primary} and fallback provider: {fallback}",
);
let client = url_to_client(primary.clone()).map_err(|err| {
anyhow!("Unable to create primary client for consensus data provider: {err:?}")
Expand Down Expand Up @@ -90,26 +89,32 @@ impl ConsensusApi {
let client = url_to_client(self.primary.clone()).map_err(|err| {
anyhow!("Unable to create client for primary consensus data provider: {err:?}")
})?;
match client.get(&endpoint).recv_string().await {
match client.get(&endpoint).send().await?.text().await {
Ok(response) => Ok(response),
Err(err) => {
warn!("Error requesting consensus data from provider, retrying with fallback provider: {err:?}");
sleep(FALLBACK_RETRY_AFTER).await;
let client = url_to_client(self.fallback.clone()).map_err(|err| {
anyhow!("Unable to create client for fallback consensus data provider: {err:?}")
})?;
client.get(endpoint).recv_string().await.map_err(|err| {
anyhow!("Unable to request consensus data from fallback provider: {err:?}")
})
client
.get(endpoint)
.send()
.await?
.text()
.await
.map_err(|err| {
anyhow!("Unable to request consensus data from fallback provider: {err:?}")
})
}
}
}
}

/// Check that provider is valid and accessible.
async fn check_provider(client: &Client) -> anyhow::Result<()> {
async fn check_provider(client: &ClientWithMiddleware) -> anyhow::Result<()> {
let endpoint = "/eth/v1/node/version".to_string();
match client.get(endpoint).recv_string().await {
match client.get(endpoint).send().await?.text().await {
Ok(_) => Ok(()),
Err(err) => Err(anyhow!(
"Unable to request consensus data from provider: {err:?}"
Expand Down
102 changes: 63 additions & 39 deletions portal-bridge/src/api/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,6 @@ use std::sync::Arc;

use alloy_primitives::B256;
use anyhow::{anyhow, bail};
use futures::future::join_all;
use serde_json::{json, Value};
use surf::Client;
use tokio::time::sleep;
use tracing::{debug, error, warn};
use url::Url;

use crate::{
cli::url_to_client,
constants::{FALLBACK_RETRY_AFTER, GET_RECEIPTS_RETRY_AFTER},
types::full_header::FullHeader,
};
use ethportal_api::{
types::{
execution::{
Expand All @@ -32,10 +20,22 @@ use ethportal_api::{
BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, Header, HistoryContentKey, HistoryContentValue,
Receipts,
};
use futures::future::join_all;
use reqwest_middleware::ClientWithMiddleware;
use serde_json::{json, Value};
use tokio::time::sleep;
use tracing::{debug, error, warn};
use trin_validation::{
accumulator::PreMergeAccumulator, constants::MERGE_BLOCK_NUMBER,
header_validator::HeaderValidator,
};
use url::Url;

use crate::{
cli::url_to_client,
constants::{FALLBACK_RETRY_AFTER, GET_RECEIPTS_RETRY_AFTER},
types::full_header::FullHeader,
};

/// Limit the number of requests in a single batch to avoid exceeding the
/// provider's batch size limit configuration of 100.
Expand All @@ -51,7 +51,7 @@ pub struct ExecutionApi {
}

impl ExecutionApi {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, surf::Error> {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, reqwest_middleware::Error> {
// Only check that provider is connected & available if not using a test provider.
debug!(
"Starting ExecutionApi with primary provider: {primary} and fallback provider: {fallback}",
Expand Down Expand Up @@ -320,19 +320,23 @@ impl ExecutionApi {
}

async fn send_batch_request(
client: &Client,
client: &ClientWithMiddleware,
requests: &Vec<JsonRequest>,
) -> anyhow::Result<Vec<Value>> {
let result = client
.post("")
.body_json(&json!(requests))
.map_err(|e| anyhow!("Unable to construct json post for batched requests: {e:?}"))?;
let response = result
.recv_string()
let request =
client.post("").json(&requests).build().map_err(|e| {
anyhow!("Unable to construct JSON POST for batched requests: {e:?}")
})?;
let response = client
.execute(request)
.await
.map_err(|err| anyhow!("Unable to request execution batch from provider: {err:?}"))?;
serde_json::from_str::<Vec<Value>>(&response).map_err(|err| {
anyhow!("Unable to parse execution batch from provider: {err:?} response: {response:?}")
let response_text = response
.text()
.await
.map_err(|e| anyhow!("Unable to read response body: {e:?}"))?;
serde_json::from_str::<Vec<Value>>(&response_text).map_err(|err| {
anyhow!("Unable to parse execution batch from provider: {err:?} response: {response_text:?}")
})
}

Expand All @@ -355,19 +359,27 @@ impl ExecutionApi {
}
}

async fn send_request(client: &Client, request: &JsonRequest) -> anyhow::Result<Value> {
let result = client
async fn send_request(
client: &ClientWithMiddleware,
request: &JsonRequest,
) -> anyhow::Result<Value> {
let request = client
.post("")
.body_json(&request)
.map_err(|e| anyhow!("Unable to construct json post for single request: {e:?}"))?;
let response = result
.recv_string()
.json(&request)
.build()
.map_err(|e| anyhow!("Unable to construct JSON POST for single request: {e:?}"))?;
let response = client
.execute(request)
.await
.map_err(|err| anyhow!("Unable to request execution payload from provider: {err:?}"))?;
serde_json::from_str::<Value>(&response).map_err(|err| {
let response_text = response
.text()
.await
.map_err(|e| anyhow!("Unable to read response body: {e:?}"))?;
serde_json::from_str::<Value>(&response_text).map_err(|err| {
anyhow!(
"Unable to parse execution response from provider: {err:?} response: {response:?}"
)
"Unable to parse execution response from provider: {err:?} response: {response_text:?}",
)
})
}
}
Expand All @@ -383,19 +395,31 @@ pub async fn construct_proof(
}

/// Check that provider is valid and accessible.
async fn check_provider(client: &Client) -> anyhow::Result<()> {
async fn check_provider(client: &ClientWithMiddleware) -> anyhow::Result<()> {
let request = client
.post("")
.body_json(
&json!({"jsonrpc": "2.0", "method": "web3_clientVersion", "params": [], "id": 1}),
)
.json(&json!({
"jsonrpc": "2.0",
"method": "web3_clientVersion",
"params": [],
"id": 1,
}))
.build()
.map_err(|e| anyhow!("Unable to construct json post request: {e:?}"))?;
let response = request
.recv_string()
let response = client
.execute(request)
.await
.map_err(|err| anyhow!("Unable to request execution batch from provider: {err:?}"))?;
let response: Value = serde_json::from_str(&response).map_err(|err| anyhow!("Unable to parse json response from execution provider, it's likely unavailable/configured incorrectly: {err:?} response: {response:?}"))?;
if response["result"].as_str().is_none() {
let response_text = response
.text()
.await
.map_err(|e| anyhow!("Unable to read response body: {e:?}"))?;
let response_json: Value = serde_json::from_str(&response_text).map_err(|err| {
anyhow!(
"Unable to parse json response from execution provider, it's likely unavailable/configured incorrectly: {err:?} response: {response_text:?}",
)
})?;
if response_json["result"].as_str().is_none() {
bail!("Invalid response from execution provider check, it's likely unavailable/configured incorrectly");
}
Ok(())
Expand Down
19 changes: 12 additions & 7 deletions portal-bridge/src/bridge/era1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use e2store::{
};
use futures::future::join_all;
use rand::{seq::SliceRandom, thread_rng};
use surf::{Client, Config};
use reqwest::{
header::{HeaderMap, HeaderValue, CONTENT_TYPE},
Client,
};
use tokio::{
sync::{OwnedSemaphorePermit, Semaphore},
task::JoinHandle,
Expand Down Expand Up @@ -65,10 +68,9 @@ impl Era1Bridge {
gossip_limit: usize,
execution_api: ExecutionApi,
) -> anyhow::Result<Self> {
let http_client: Client = Config::new()
.add_header("Content-Type", "application/xml")
.expect("to be able to add header")
.try_into()?;
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/xml"));
let http_client: Client = Client::builder().default_headers(headers).build()?;
let era1_files = get_shuffled_era1_files(&http_client).await?;
let metrics = BridgeMetricsReporter::new("era1".to_string(), &format!("{mode:?}"));
let gossip_semaphore = Arc::new(Semaphore::new(gossip_limit));
Expand Down Expand Up @@ -235,7 +237,10 @@ impl Era1Bridge {
let raw_era1 = self
.http_client
.get(era1_path.clone())
.recv_bytes()
.send()
.await
.expect("to be able to send request")
.bytes()
.await
.unwrap_or_else(|err| {
panic!("unable to read era1 file at path: {era1_path:?} : {err}")
Expand All @@ -256,7 +261,7 @@ impl Era1Bridge {
let header_validator = Arc::new(self.header_oracle.header_validator.clone());
info!("Era1 file read successfully, gossiping block tuples for epoch: {epoch_index}");
let mut serve_block_tuple_handles = vec![];
for block_tuple in Era1::iter_tuples(raw_era1) {
for block_tuple in Era1::iter_tuples(raw_era1.to_vec()) {
let block_number = block_tuple.header.header.number;
if let Some(range) = gossip_range.clone() {
if !range.contains(&block_number) {
Expand Down
Loading

0 comments on commit 6613627

Please sign in to comment.