Skip to content

Commit

Permalink
feat(bridge): add CLI flag for total request timeout value (#1499)
Browse files Browse the repository at this point in the history
  • Loading branch information
ogenev authored Oct 2, 2024
1 parent 9787399 commit a935c86
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 30 deletions.
22 changes: 15 additions & 7 deletions ethportal-peertest/src/scenarios/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ethportal_api::{
use portal_bridge::{
api::{consensus::ConsensusApi, execution::ExecutionApi},
bridge::{beacon::BeaconBridge, history::HistoryBridge},
constants::DEFAULT_GOSSIP_LIMIT,
constants::{DEFAULT_GOSSIP_LIMIT, DEFAULT_TOTAL_REQUEST_TIMEOUT},
types::mode::BridgeMode,
};
use serde::Deserialize;
Expand All @@ -24,9 +24,13 @@ pub async fn test_history_bridge(peertest: &Peertest, portal_client: &HttpClient
let mode = BridgeMode::Test("./test_assets/portalnet/bridge_data.json".into());
// url doesn't matter, we're not making any requests
let client_url = Url::parse("http://www.null.com").unwrap();
let execution_api = ExecutionApi::new(client_url.clone(), client_url)
.await
.unwrap();
let execution_api = ExecutionApi::new(
client_url.clone(),
client_url,
DEFAULT_TOTAL_REQUEST_TIMEOUT,
)
.await
.unwrap();
// Wait for bootnode to start
sleep(Duration::from_secs(1)).await;
let bridge = HistoryBridge::new(
Expand Down Expand Up @@ -54,9 +58,13 @@ pub async fn test_beacon_bridge(peertest: &Peertest, portal_client: &HttpClient)
sleep(Duration::from_secs(1)).await;
// url doesn't matter, we're not making any requests
let client_url = Url::parse("http://www.null.com").unwrap();
let consensus_api = ConsensusApi::new(client_url.clone(), client_url)
.await
.unwrap();
let consensus_api = ConsensusApi::new(
client_url.clone(),
client_url,
DEFAULT_TOTAL_REQUEST_TIMEOUT,
)
.await
.unwrap();
let bridge = BeaconBridge::new(consensus_api, mode, portal_client.clone());
bridge.launch().await;

Expand Down
26 changes: 19 additions & 7 deletions portal-bridge/src/api/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,29 @@ use crate::{
pub struct ConsensusApi {
primary: Url,
fallback: Url,
request_timeout: u64,
}

impl ConsensusApi {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, reqwest_middleware::Error> {
pub async fn new(
primary: Url,
fallback: Url,
request_timeout: u64,
) -> Result<Self, reqwest_middleware::Error> {
debug!(
"Starting ConsensusApi with primary provider: {primary} and fallback provider: {fallback}",
);
let client = url_to_client(primary.clone()).map_err(|err| {
let client = url_to_client(primary.clone(), request_timeout).map_err(|err| {
anyhow!("Unable to create primary client for consensus data provider: {err:?}")
})?;
if let Err(err) = check_provider(&client).await {
warn!("Primary consensus data provider may be offline: {err:?}");
}
Ok(Self { primary, fallback })
Ok(Self {
primary,
fallback,
request_timeout,
})
}

/// Requests the `BeaconState` structure corresponding to the current head of the beacon chain.
Expand Down Expand Up @@ -88,17 +97,20 @@ impl ConsensusApi {

/// Make a request to the cl provider.
async fn request(&self, endpoint: String) -> anyhow::Result<String> {
let client = url_to_client(self.primary.clone()).map_err(|err| {
let client = url_to_client(self.primary.clone(), self.request_timeout).map_err(|err| {
anyhow!("Unable to create client for primary consensus data provider: {err:?}")
})?;
match client.get(&endpoint)?.send().await?.text().await {
Ok(response) => Ok(response),
Err(err) => {
warn!("Error requesting consensus data from provider, retrying with fallback provider: {err:?}");
sleep(FALLBACK_RETRY_AFTER).await;
let client = url_to_client(self.fallback.clone()).map_err(|err| {
anyhow!("Unable to create client for fallback consensus data provider: {err:?}")
})?;
let client =
url_to_client(self.fallback.clone(), self.request_timeout).map_err(|err| {
anyhow!(
"Unable to create client for fallback consensus data provider: {err:?}"
)
})?;
client
.get(endpoint)?
.send()
Expand Down
32 changes: 22 additions & 10 deletions portal-bridge/src/api/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ pub struct ExecutionApi {
pub primary: Url,
pub fallback: Url,
pub header_validator: HeaderValidator,
pub request_timeout: u64,
}

impl ExecutionApi {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, reqwest_middleware::Error> {
pub async fn new(
primary: Url,
fallback: Url,
request_timeout: u64,
) -> Result<Self, reqwest_middleware::Error> {
// Only check that provider is connected & available if not using a test provider.
debug!(
"Starting ExecutionApi with primary provider: {primary} and fallback provider: {fallback}",
);
let client = url_to_client(primary.clone()).map_err(|err| {
let client = url_to_client(primary.clone(), request_timeout).map_err(|err| {
anyhow!("Unable to create primary client for execution data provider: {err:?}")
})?;
if let Err(err) = check_provider(&client).await {
Expand All @@ -65,6 +70,7 @@ impl ExecutionApi {
primary,
fallback,
header_validator,
request_timeout,
})
}

Expand Down Expand Up @@ -303,17 +309,20 @@ impl ExecutionApi {
"Attempting to send requests outnumbering provider request limit of {BATCH_LIMIT}."
)
}
let client = url_to_client(self.primary.clone()).map_err(|err| {
let client = url_to_client(self.primary.clone(), self.request_timeout).map_err(|err| {
anyhow!("Unable to create primary client for execution data provider: {err:?}")
})?;
match Self::send_batch_request(&client, &requests).await {
Ok(response) => Ok(response),
Err(msg) => {
warn!("Failed to send batch request to primary provider: {msg}");
sleep(FALLBACK_RETRY_AFTER).await;
let client = url_to_client(self.fallback.clone()).map_err(|err| {
anyhow!("Unable to create fallback client for execution data provider: {err:?}")
})?;
let client =
url_to_client(self.fallback.clone(), self.request_timeout).map_err(|err| {
anyhow!(
"Unable to create fallback client for execution data provider: {err:?}"
)
})?;
Self::send_batch_request(&client, &requests)
.await
.map_err(|err| {
Expand All @@ -340,17 +349,20 @@ impl ExecutionApi {
}

async fn try_request(&self, request: JsonRequest) -> anyhow::Result<Value> {
let client = url_to_client(self.primary.clone()).map_err(|err| {
let client = url_to_client(self.primary.clone(), self.request_timeout).map_err(|err| {
anyhow!("Unable to create primary client for execution data provider: {err:?}")
})?;
match Self::send_request(&client, &request).await {
Ok(response) => Ok(response),
Err(msg) => {
warn!("Failed to send request to primary provider, retrying with fallback provider: {msg}");
sleep(FALLBACK_RETRY_AFTER).await;
let client = url_to_client(self.fallback.clone()).map_err(|err| {
anyhow!("Unable to create fallback client for execution data provider: {err:?}")
})?;
let client =
url_to_client(self.fallback.clone(), self.request_timeout).map_err(|err| {
anyhow!(
"Unable to create fallback client for execution data provider: {err:?}"
)
})?;
Self::send_request(&client, &request)
.await
.map_err(|err| anyhow!("Failed to send request to fallback provider: {err:?}"))
Expand Down
14 changes: 11 additions & 3 deletions portal-bridge/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ use reqwest::{
};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, RequestBuilder};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::time::Duration;
use tracing::error;
use url::Url;

use crate::{
census::ENR_OFFER_LIMIT,
constants::{DEFAULT_GOSSIP_LIMIT, DEFAULT_OFFER_LIMIT, HTTP_REQUEST_TIMEOUT},
constants::{DEFAULT_GOSSIP_LIMIT, DEFAULT_OFFER_LIMIT, DEFAULT_TOTAL_REQUEST_TIMEOUT},
types::mode::BridgeMode,
DEFAULT_BASE_CL_ENDPOINT, DEFAULT_BASE_EL_ENDPOINT, FALLBACK_BASE_CL_ENDPOINT,
FALLBACK_BASE_EL_ENDPOINT,
Expand Down Expand Up @@ -173,6 +174,13 @@ pub struct BridgeConfig {
value_delimiter = ','
)]
pub filter_clients: Vec<ClientType>,

#[arg(
default_value_t = DEFAULT_TOTAL_REQUEST_TIMEOUT,
long = "request-timeout",
help = "The timeout in seconds is applied from when the request starts connecting until the response body has finished. Also considered a total deadline.",
)]
pub request_timeout: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -222,7 +230,7 @@ impl From<&Enr> for ClientType {
}
}

pub fn url_to_client(url: Url) -> Result<ClientWithBaseUrl, String> {
pub fn url_to_client(url: Url, request_timeout: u64) -> Result<ClientWithBaseUrl, String> {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

Expand Down Expand Up @@ -256,7 +264,7 @@ pub fn url_to_client(url: Url) -> Result<ClientWithBaseUrl, String> {
// Add retry middleware
let reqwest_client = Client::builder()
.default_headers(headers)
.timeout(HTTP_REQUEST_TIMEOUT)
.timeout(Duration::from_secs(request_timeout))
.build()
.map_err(|_| "Failed to build HTTP client")?;
let client = ClientBuilder::new(reqwest_client)
Expand Down
6 changes: 3 additions & 3 deletions portal-bridge/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ pub const HEADER_WITH_PROOF_CONTENT_VALUE: &str =
// Beacon chain mainnet genesis time: Tue Dec 01 2020 12:00:23 GMT+0000
pub const BEACON_GENESIS_TIME: u64 = 1606824023;

// This is a very conservative timeout if a provider takes longer than even 1 second it is probably
// overloaded and not performing well
pub const HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
/// The timeout in seconds is applied from when the request starts connecting until the response
/// body has finished. Also considered a total deadline.
pub const DEFAULT_TOTAL_REQUEST_TIMEOUT: u64 = 20;

// The maximum number of active blocks being gossiped. Note that this doesn't
// exactly mean the number of concurrent gossip jsonrpc requests, as the gossip
Expand Down
2 changes: 2 additions & 0 deletions portal-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let consensus_api = ConsensusApi::new(
bridge_config.cl_provider,
bridge_config.cl_provider_fallback,
bridge_config.request_timeout,
)
.await?;
let portal_client_clone = portal_client.clone();
Expand All @@ -110,6 +111,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let execution_api = ExecutionApi::new(
bridge_config.el_provider,
bridge_config.el_provider_fallback,
bridge_config.request_timeout,
)
.await?;
match bridge_config.mode {
Expand Down
1 change: 1 addition & 0 deletions src/bin/test_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub async fn main() -> Result<()> {
primary: client_url.clone(),
fallback: client_url,
header_validator: HeaderValidator::default(),
request_timeout: 20,
};
for gossip_range in all_ranges.iter_mut() {
debug!("Testing range: {gossip_range:?}");
Expand Down

0 comments on commit a935c86

Please sign in to comment.