From c4ae964588081fac527a9847a54fe73d2af4ed3d Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 30 Nov 2024 09:26:47 -0500 Subject: [PATCH] example: add connection options to Rust client (#478) --- CHANGELOG.md | 1 + examples/rust/src/bin/client.rs | 93 +++++++++++++++++++++++++++--- yellowstone-grpc-client/src/lib.rs | 52 ++++++----------- 3 files changed, 103 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 317d7c72..352bfc99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The minor version will be incremented upon a breaking change and the patch versi - proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474)) - geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475)) +- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478)) ### Breaking diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index 9813e580..eacf0919 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -55,14 +55,58 @@ struct Args { #[clap(long)] x_token: Option, - /// Commitment level: processed, confirmed or finalized + /// Apply a timeout to connecting to the uri. #[clap(long)] - commitment: Option, + connect_timeout_ms: Option, + + /// Sets the tower service default internal buffer size, default is 1024 + #[clap(long)] + buffer_size: Option, + + /// Sets whether to use an adaptive flow control. Uses hyper’s default otherwise. + #[clap(long)] + http2_adaptive_window: Option, + + /// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise. + #[clap(long)] + http2_keep_alive_interval_ms: Option, + + /// Sets the max connection-level flow control for HTTP2, default is 65,535 + #[clap(long)] + initial_connection_window_size: Option, + + ///Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control, default is 65,535 + #[clap(long)] + initial_stream_window_size: Option, + + ///Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise. + #[clap(long)] + keep_alive_timeout_ms: Option, + + /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper’s default otherwise. + #[clap(long)] + keep_alive_while_idle: Option, + + /// Set whether TCP keepalive messages are enabled on accepted connections. + #[clap(long)] + tcp_keepalive_ms: Option, + + /// Set the value of TCP_NODELAY option for accepted connections. Enabled by default. + #[clap(long)] + tcp_nodelay: Option, + + /// Apply a timeout to each request. + #[clap(long)] + timeout_ms: Option, /// Max message size before decoding, full blocks can be super large, default is 1GiB #[clap(long, default_value_t = 1024 * 1024 * 1024)] max_decoding_message_size: usize, + /// Commitment level: processed, confirmed or finalized + #[clap(long)] + commitment: Option, + #[command(subcommand)] action: Action, } @@ -73,15 +117,46 @@ impl Args { } async fn connect(&self) -> anyhow::Result> { - GeyserGrpcClient::build_from_shared(self.endpoint.clone())? + let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())? .x_token(self.x_token.clone())? - .connect_timeout(Duration::from_secs(10)) - .timeout(Duration::from_secs(10)) .tls_config(ClientTlsConfig::new().with_native_roots())? - .max_decoding_message_size(self.max_decoding_message_size) - .connect() - .await - .map_err(Into::into) + .max_decoding_message_size(self.max_decoding_message_size); + + if let Some(duration) = self.connect_timeout_ms { + builder = builder.connect_timeout(Duration::from_millis(duration)); + } + if let Some(sz) = self.buffer_size { + builder = builder.buffer_size(sz); + } + if let Some(enabled) = self.http2_adaptive_window { + builder = builder.http2_adaptive_window(enabled); + } + if let Some(duration) = self.http2_keep_alive_interval_ms { + builder = builder.http2_keep_alive_interval(Duration::from_millis(duration)); + } + if let Some(sz) = self.initial_connection_window_size { + builder = builder.initial_connection_window_size(sz); + } + if let Some(sz) = self.initial_stream_window_size { + builder = builder.initial_stream_window_size(sz); + } + if let Some(duration) = self.keep_alive_timeout_ms { + builder = builder.keep_alive_timeout(Duration::from_millis(duration)); + } + if let Some(enabled) = self.keep_alive_while_idle { + builder = builder.keep_alive_while_idle(enabled); + } + if let Some(duration) = self.tcp_keepalive_ms { + builder = builder.tcp_keepalive(Some(Duration::from_millis(duration))); + } + if let Some(enabled) = self.tcp_nodelay { + builder = builder.tcp_nodelay(enabled); + } + if let Some(duration) = self.timeout_ms { + builder = builder.timeout(Duration::from_millis(duration)); + } + + builder.connect().await.map_err(Into::into) } } diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index ce880dde..f3066f57 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -201,12 +201,8 @@ impl GeyserGrpcClient { pub enum GeyserGrpcBuilderError { #[error("Failed to parse x-token: {0}")] MetadataValueError(#[from] InvalidMetadataValue), - #[error("Invalid X-Token length: {0}, expected 28")] - InvalidXTokenLength(usize), #[error("gRPC transport error: {0}")] TonicError(#[from] tonic::transport::Error), - #[error("tonic::transport::Channel should be created, use `connect` or `connect_lazy` first")] - EmptyChannel, } pub type GeyserGrpcBuilderResult = Result; @@ -290,16 +286,7 @@ impl GeyserGrpcBuilder { T: TryInto, { Ok(Self { - x_token: match x_token { - Some(x_token) => { - let x_token = x_token.try_into()?; - if x_token.is_empty() { - return Err(GeyserGrpcBuilderError::InvalidXTokenLength(x_token.len())); - } - Some(x_token) - } - None => None, - }, + x_token: x_token.map(|x_token| x_token.try_into()).transpose()?, ..self }) } @@ -320,20 +307,6 @@ impl GeyserGrpcBuilder { } } - pub fn timeout(self, dur: Duration) -> Self { - Self { - endpoint: self.endpoint.timeout(dur), - ..self - } - } - - pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult { - Ok(Self { - endpoint: self.endpoint.tls_config(tls_config)?, - ..self - }) - } - pub fn buffer_size(self, sz: impl Into>) -> Self { Self { endpoint: self.endpoint.buffer_size(sz), @@ -397,6 +370,20 @@ impl GeyserGrpcBuilder { } } + pub fn timeout(self, dur: Duration) -> Self { + Self { + endpoint: self.endpoint.timeout(dur), + ..self + } + } + + pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult { + Ok(Self { + endpoint: self.endpoint.tls_config(tls_config)?, + ..self + }) + } + // Geyser options pub fn send_compressed(self, encoding: CompressionEncoding) -> Self { Self { @@ -429,7 +416,7 @@ impl GeyserGrpcBuilder { #[cfg(test)] mod tests { - use super::{GeyserGrpcBuilderError, GeyserGrpcClient}; + use super::GeyserGrpcClient; #[tokio::test] async fn test_channel_https_success() { @@ -462,7 +449,7 @@ mod tests { } #[tokio::test] - async fn test_channel_invalid_token_some() { + async fn test_channel_empty_token_some() { let endpoint = "http://127.0.0.1:10000"; let x_token = ""; @@ -470,10 +457,7 @@ mod tests { assert!(res.is_ok()); let res = res.unwrap().x_token(Some(x_token)); - assert!(matches!( - res, - Err(GeyserGrpcBuilderError::InvalidXTokenLength(_)) - )); + assert!(res.is_ok()); } #[tokio::test]