Improve handling of large responses in nasty client
Responses to some queries, such as payload queries and range
queries, can be up to a few megabytes. This change prevents distracting
spurious warnings and errors for such responses by
* increasing the HTTP client timeout to allow plenty of time to stream
  the response over the network
* counting a query as slow only if it takes longer than 1s to receive
  the response headers -- the time it takes to stream the body does not
  count toward slow warnings, since that time does not reflect
  computational load on the server (a sketch of this timing split follows)
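As a rough illustration of the timing split described above (a sketch only, not code from this commit; the function, client, and URL names are hypothetical), the warning window covers only the wait for response headers, while body streaming is timed separately:

```rust
use std::time::{Duration, Instant};

// Sketch: only the header wait counts toward the slow-request warning.
async fn timed_get(client: &reqwest::Client, url: reqwest::Url) -> anyhow::Result<()> {
    let start = Instant::now();
    // `send()` resolves once the response headers have arrived.
    let res = client.get(url).send().await?;
    let header_latency = start.elapsed();

    // Streaming the (possibly multi-megabyte) body is timed separately, so a
    // large payload cannot trip the warning threshold on its own.
    let body_start = Instant::now();
    let body = res.bytes().await?;
    let body_elapsed = body_start.elapsed();

    if header_latency > Duration::from_secs(1) {
        eprintln!("slow request: headers took {header_latency:?}");
    }
    println!("{} bytes streamed in {body_elapsed:?}", body.len());
    Ok(())
}
```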
jbearer committed Nov 22, 2024
1 parent 9ca9918 commit 8774a41
Showing 3 changed files with 69 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

9 changes: 9 additions & 0 deletions sequencer/Cargo.toml
@@ -16,11 +16,16 @@ testing = [
     "hotshot-query-service/testing",
 ]
 benchmarking = []
+nasty-client = ["reqwest", "bytesize"]
 
 [[bin]]
 name = "espresso-dev-node"
 required-features = ["testing"]
 
+[[bin]]
+name = "nasty-client"
+required-features = ["nasty-client"]
+
 [dev-dependencies]
 escargot = "0.5.10"
 espresso-macros = { git = "https://github.com/EspressoSystems/espresso-macros.git", tag = "0.1.0" }
@@ -125,6 +130,10 @@ url = { workspace = true }
 vbs = { workspace = true }
 vec1 = { workspace = true }
 
+# Dependencies for nasty-client
+bytesize = { workspace = true, optional = true }
+reqwest = { workspace = true, optional = true }
+
 [package.metadata.cargo-udeps.ignore]
 normal = ["hotshot-testing"]
 
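Because the new `[[bin]]` target declares `required-features = ["nasty-client"]`, the binary is only compiled when that feature is enabled, e.g. `cargo build --bin nasty-client --features nasty-client` (the exact invocation is standard Cargo behavior, not taken from this commit).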
77 changes: 59 additions & 18 deletions sequencer/src/bin/nasty-client.rs
@@ -14,6 +14,7 @@
 use anyhow::{bail, ensure, Context};
 use async_lock::RwLock;
+use bytesize::ByteSize;
 use clap::Parser;
 use committable::Committable;
 use derivative::Derivative;
@@ -38,6 +39,7 @@ use jf_merkle_tree::{
     ForgetableMerkleTreeScheme, MerkleCommitment, MerkleTreeScheme, UniversalMerkleTreeScheme,
 };
 use rand::{seq::SliceRandom, RngCore};
+use reqwest::header::ACCEPT;
 use sequencer::{api::endpoints::NamespaceProofQueryData, SequencerApiVersion};
 use sequencer_utils::logging;
 use serde::de::DeserializeOwned;
@@ -50,13 +52,13 @@ use std::{
     time::{Duration, Instant},
 };
 use strum::{EnumDiscriminants, VariantArray};
-use surf_disco::{error::ClientError, socket, Error, StatusCode, Url};
+use surf_disco::{error::ClientError, socket, StatusCode, Url};
 use tide_disco::{error::ServerError, App};
 use time::OffsetDateTime;
 use tokio::{task::spawn, time::sleep};
 use toml::toml;
 use tracing::info_span;
-use vbs::version::StaticVersionType;
+use vbs::{version::StaticVersionType, BinarySerializer, Serializer};
 
 /// An adversarial stress test for sequencer APIs.
 #[derive(Clone, Debug, Parser)]
@@ -90,18 +92,22 @@ struct ClientConfig {
     ///
     /// Requests that take longer than this will fail, causing an error log and an increment of the
     /// `failed_actions` metric.
+    ///
+    /// Note that this time includes the time taken to stream the response body, so it should be set
+    /// somewhat conservatively to allow time to stream large (few MB) responses.
     #[clap(
         long,
         env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_ERROR",
-        default_value = "5s",
+        default_value = "60s",
         value_parser = parse_duration,
     )]
     http_timeout_error: Duration,
 
     /// Timeout for issuing a warning due to slow HTTP requests.
     ///
-    /// Requests that take longer than this but shorter than HTTP_TIMEOUT_ERROR will not generate an
-    /// error but will output a warning and increment a counter of slow HTTP requests.
+    /// Requests that take longer than this but shorter than HTTP_TIMEOUT_ERROR to return at least a
+    /// response header will not generate an error but will output a warning and increment a counter
+    /// of slow HTTP requests.
     #[clap(
         long,
         env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_WARNING",
@@ -262,6 +268,8 @@ struct Metrics {
     query_fee_state_actions: Box<dyn Counter>,
     slow_requests: Box<dyn Counter>,
     request_latency: Box<dyn Histogram>,
+    request_bandwidth: Box<dyn Histogram>,
+    response_size: Box<dyn Histogram>,
 }
 
 impl Metrics {
@@ -350,6 +358,12 @@ impl Metrics {
             request_latency: registry
                 .subgroup("http".into())
                 .create_histogram("latency".into(), Some("s".into())),
+            request_bandwidth: registry
+                .subgroup("http".into())
+                .create_histogram("bandwidth".into(), Some("MiB/s".into())),
+            response_size: registry
+                .subgroup("http".into())
+                .create_histogram("response_size".into(), Some("mb".into())),
         }
     }
 }
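For reference, the points fed into these two histograms amount to simple unit conversions; a minimal sketch (helper names are illustrative, not from this commit), using `as_secs_f64` so sub-second transfers keep their precision:

```rust
use bytesize::ByteSize;
use std::time::Duration;

// Sketch: convert a measured body size and streaming time into the units the
// histograms above expect.
fn bandwidth_mib_per_sec(body_size: ByteSize, body_elapsed: Duration) -> f64 {
    // MiB transferred divided by fractional seconds; `as_secs_f64` avoids
    // truncating sub-second durations to zero.
    (body_size.0 as f64) / (ByteSize::mib(1).0 as f64) / body_elapsed.as_secs_f64()
}

fn size_in_kb(body_size: ByteSize) -> f64 {
    (body_size.0 as f64) / (ByteSize::kb(1).0 as f64)
}
```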
@@ -462,7 +476,9 @@ struct Subscription<T: Queryable> {
 
 #[derive(Debug)]
 struct ResourceManager<T: Queryable> {
-    client: surf_disco::Client<ClientError, SequencerApiVersion>,
+    stream_client: surf_disco::Client<ClientError, SequencerApiVersion>,
+    get_client: reqwest::Client,
+    base_url: Url,
     open_streams: BTreeMap<u64, Subscription<T>>,
     next_stream_id: u64,
     metrics: Arc<Metrics>,
@@ -472,9 +488,12 @@ impl<T: Queryable> ResourceManager<T> {
 impl<T: Queryable> ResourceManager<T> {
     fn new(opt: &Options, metrics: Arc<Metrics>) -> Self {
         Self {
-            client: surf_disco::Client::builder(opt.url.clone())
-                .set_timeout(Some(opt.client_config.http_timeout_error))
-                .build(),
+            stream_client: surf_disco::Client::builder(opt.url.clone()).build(),
+            get_client: reqwest::Client::builder()
+                .timeout(opt.client_config.http_timeout_error)
+                .build()
+                .unwrap(),
+            base_url: opt.url.clone(),
             open_streams: BTreeMap::new(),
             next_stream_id: 0,
             metrics,
@@ -551,14 +570,25 @@ impl<T: Queryable> ResourceManager<T> {
         tracing::debug!("-> GET {path}");
 
         let start = Instant::now();
-        let res = self.client.get::<R>(&path).send().await;
+        let res = self
+            .get_client
+            .get(self.base_url.join(&path)?)
+            .header(ACCEPT, "application/octet-stream")
+            .send()
+            .await
+            .context(format!("error sending request {path}"))?;
         let elapsed = start.elapsed();
+        let status = res.status();
 
-        let status = match &res {
-            Ok(_) => StatusCode::OK,
-            Err(err) => err.status(),
-        };
-        tracing::debug!("<- GET {path} {} ({elapsed:?})", u16::from(status));
+        // Time the body separately; we don't want to penalize the server for time spent streaming a
+        // large response over the network; we're more interested in computation time on the server
+        // itself to generate the response.
+        let body_start = Instant::now();
+        let body = res.bytes().await.context("error streaming response body")?;
+        let body_elapsed = body_start.elapsed();
+        let body_size = ByteSize::b(body.len() as u64);
+
+        tracing::debug!("<- GET {path} {status} ({elapsed:?}) ({body_size} in {body_elapsed:?})",);
 
         self.metrics
             .request_latency
@@ -568,7 +598,18 @@
             tracing::warn!(%path, ?elapsed, "slow request");
         }
 
-        res.context(format!("GET {path}"))
+        self.metrics.request_bandwidth.add_point(
+            ((body_size.0 as f64) / (ByteSize::mib(1).0 as f64)) / body_elapsed.as_secs_f64(),
+        );
+        self.metrics
+            .response_size
+            .add_point((body_size.0 as f64) / (ByteSize::kb(1).0 as f64));
+
+        ensure!(
+            status == StatusCode::OK,
+            "{path}: error from server: {status}",
+        );
+        Serializer::<SequencerApiVersion>::deserialize(&body).context("decoding response body")
     }
 
     async fn query(&self, at: u64) -> anyhow::Result<()> {
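The final line above relies on vbs (versioned binary serialization) to decode the octet-stream body. A small sketch of the round-trip, assuming the `serialize`/`deserialize` pair from vbs's `BinarySerializer` trait and using `u64` as a stand-in payload type:

```rust
use sequencer::SequencerApiVersion;
use vbs::{BinarySerializer, Serializer};

// Sketch: encode and decode a value with the same version-tagged serializer
// the nasty client uses for response bodies.
fn roundtrip(height: u64) -> anyhow::Result<u64> {
    let bytes = Serializer::<SequencerApiVersion>::serialize(&height)?;
    let decoded: u64 = Serializer::<SequencerApiVersion>::deserialize(&bytes)?;
    Ok(decoded)
}
```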
@@ -701,7 +742,7 @@ impl<T: Queryable> ResourceManager<T> {
 
         let from = self.adjust_index(from).await?;
         let stream = self
-            .client
+            .stream_client
             .socket(&format!("availability/stream/{}/{from}", Self::plural()))
             .subscribe()
             .await
@@ -804,7 +845,7 @@ impl<T: Queryable> ResourceManager<T> {
                 // but refresh the connection and try again.
                 tracing::warn!("error in old connection, refreshing connection: {err:#}");
                 let conn = self
-                    .client
+                    .stream_client
                     .socket(&format!("availability/stream/{}/{pos}", Self::plural()))
                     .subscribe()
                     .await
