From 8774a4154696f2816f6431efa3faecb979c9132c Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Thu, 21 Nov 2024 16:10:54 -0800 Subject: [PATCH] Improve handling of large responses in nasty client Some of the responses to queries like payload queries and range queries can be up to a few megabytes. This change prevents distracting spurious warnings and errors for such responses by * increasing the HTTP client timeout to allow plenty of time to stream the response over the network * only counting queries as slow if it takes longer than 1s to receive a response header -- the time it takes to stream the response does not count toward slow warnings, as this time is not taking computational resources on the server --- Cargo.lock | 1 + sequencer/Cargo.toml | 9 ++++ sequencer/src/bin/nasty-client.rs | 77 +++++++++++++++++++++++-------- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9d6ccbb79..4294a866c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8660,6 +8660,7 @@ dependencies = [ "async-once-cell", "async-trait", "bincode", + "bytesize", "cdn-broker 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.5)", "cdn-marshal 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.5)", "clap", diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index eb1789656..73fc59ce0 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -16,11 +16,16 @@ testing = [ "hotshot-query-service/testing", ] benchmarking = [] +nasty-client = ["reqwest", "bytesize"] [[bin]] name = "espresso-dev-node" required-features = ["testing"] +[[bin]] +name = "nasty-client" +required-features = ["nasty-client"] + [dev-dependencies] escargot = "0.5.10" espresso-macros = { git = "https://github.com/EspressoSystems/espresso-macros.git", tag = "0.1.0" } @@ -125,6 +130,10 @@ url = { workspace = true } vbs = { workspace = true } vec1 = { workspace = true } +# Dependencies for nasty-client +bytesize = { workspace = true, optional = true } +reqwest = { workspace = true, optional = true } + [package.metadata.cargo-udeps.ignore] normal = ["hotshot-testing"] diff --git a/sequencer/src/bin/nasty-client.rs b/sequencer/src/bin/nasty-client.rs index 7fff64412..f44bb8074 100644 --- a/sequencer/src/bin/nasty-client.rs +++ b/sequencer/src/bin/nasty-client.rs @@ -14,6 +14,7 @@ use anyhow::{bail, ensure, Context}; use async_lock::RwLock; +use bytesize::ByteSize; use clap::Parser; use committable::Committable; use derivative::Derivative; @@ -38,6 +39,7 @@ use jf_merkle_tree::{ ForgetableMerkleTreeScheme, MerkleCommitment, MerkleTreeScheme, UniversalMerkleTreeScheme, }; use rand::{seq::SliceRandom, RngCore}; +use reqwest::header::ACCEPT; use sequencer::{api::endpoints::NamespaceProofQueryData, SequencerApiVersion}; use sequencer_utils::logging; use serde::de::DeserializeOwned; @@ -50,13 +52,13 @@ use std::{ time::{Duration, Instant}, }; use strum::{EnumDiscriminants, VariantArray}; -use surf_disco::{error::ClientError, socket, Error, StatusCode, Url}; +use surf_disco::{error::ClientError, socket, StatusCode, Url}; use tide_disco::{error::ServerError, App}; use time::OffsetDateTime; use tokio::{task::spawn, time::sleep}; use toml::toml; use tracing::info_span; -use vbs::version::StaticVersionType; +use vbs::{version::StaticVersionType, BinarySerializer, Serializer}; /// An adversarial stress test for sequencer APIs. #[derive(Clone, Debug, Parser)] @@ -90,18 +92,22 @@ struct ClientConfig { /// /// Requests that take longer than this will fail, causing an error log and an increment of the /// `failed_actions` metric. + /// + /// Note that this time includes the time taken to stream the response body, so it should be set + /// somewhat conservatively to allow time to stream large (few MB) responses. #[clap( long, env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_ERROR", - default_value = "5s", + default_value = "60s", value_parser = parse_duration, )] http_timeout_error: Duration, /// Timeout for issuing a warning due to slow HTTP requests. /// - /// Requests that take longer than this but shorter than HTTP_TIMEOUT_ERROR will not generate an - /// error but will output a warning and increment a counter of slow HTTP requests. + /// Requests that take longer than this but shorter than HTTP_TIMEOUT_ERROR to return at least a + /// response header will not generate an error but will output a warning and increment a counter + /// of slow HTTP requests. #[clap( long, env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_WARNING", @@ -262,6 +268,8 @@ struct Metrics { query_fee_state_actions: Box, slow_requests: Box, request_latency: Box, + request_bandwidth: Box, + response_size: Box, } impl Metrics { @@ -350,6 +358,12 @@ impl Metrics { request_latency: registry .subgroup("http".into()) .create_histogram("latency".into(), Some("s".into())), + request_bandwidth: registry + .subgroup("http".into()) + .create_histogram("bandwidth".into(), Some("MiB/s".into())), + response_size: registry + .subgroup("http".into()) + .create_histogram("response_size".into(), Some("mb".into())), } } } @@ -462,7 +476,9 @@ struct Subscription { #[derive(Debug)] struct ResourceManager { - client: surf_disco::Client, + stream_client: surf_disco::Client, + get_client: reqwest::Client, + base_url: Url, open_streams: BTreeMap>, next_stream_id: u64, metrics: Arc, @@ -472,9 +488,12 @@ struct ResourceManager { impl ResourceManager { fn new(opt: &Options, metrics: Arc) -> Self { Self { - client: surf_disco::Client::builder(opt.url.clone()) - .set_timeout(Some(opt.client_config.http_timeout_error)) - .build(), + stream_client: surf_disco::Client::builder(opt.url.clone()).build(), + get_client: reqwest::Client::builder() + .timeout(opt.client_config.http_timeout_error) + .build() + .unwrap(), + base_url: opt.url.clone(), open_streams: BTreeMap::new(), next_stream_id: 0, metrics, @@ -551,14 +570,25 @@ impl ResourceManager { tracing::debug!("-> GET {path}"); let start = Instant::now(); - let res = self.client.get::(&path).send().await; + let res = self + .get_client + .get(self.base_url.join(&path)?) + .header(ACCEPT, "application/octet-stream") + .send() + .await + .context(format!("error sending request {path}"))?; let elapsed = start.elapsed(); + let status = res.status(); - let status = match &res { - Ok(_) => StatusCode::OK, - Err(err) => err.status(), - }; - tracing::debug!("<- GET {path} {} ({elapsed:?})", u16::from(status)); + // Time the body separately; we don't want to penalize the server for time spent streaming a + // large response over the network; we're more interested in computation time on the server + // itself to generate the response. + let body_start = Instant::now(); + let body = res.bytes().await.context("error streaming response body")?; + let body_elapsed = body_start.elapsed(); + let body_size = ByteSize::b(body.len() as u64); + + tracing::debug!("<- GET {path} {status} ({elapsed:?}) ({body_size} in {body_elapsed:?})",); self.metrics .request_latency @@ -568,7 +598,18 @@ impl ResourceManager { tracing::warn!(%path, ?elapsed, "slow request"); } - res.context(format!("GET {path}")) + self.metrics.request_bandwidth.add_point( + ((body_size.0 as f64) / (ByteSize::mib(1).0 as f64)) / (body_elapsed.as_secs() as f64), + ); + self.metrics + .response_size + .add_point((body_size.0 as f64) / (ByteSize::kb(1).0 as f64)); + + ensure!( + status == StatusCode::OK, + "{path}: error from server: {status}", + ); + Serializer::::deserialize(&body).context("decoding response body") } async fn query(&self, at: u64) -> anyhow::Result<()> { @@ -701,7 +742,7 @@ impl ResourceManager { let from = self.adjust_index(from).await?; let stream = self - .client + .stream_client .socket(&format!("availability/stream/{}/{from}", Self::plural())) .subscribe() .await @@ -804,7 +845,7 @@ impl ResourceManager { // but refresh the connection and try again. tracing::warn!("error in old connection, refreshing connection: {err:#}"); let conn = self - .client + .stream_client .socket(&format!("availability/stream/{}/{pos}", Self::plural())) .subscribe() .await