Skip to content

Commit

Permalink
example: add connection options to Rust client (#478)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Nov 30, 2024
1 parent 625874f commit c4ae964
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The minor version will be incremented upon a breaking change and the patch versi

- proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474))
- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475))
- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478))

### Breaking

Expand Down
93 changes: 84 additions & 9 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,58 @@ struct Args {
#[clap(long)]
x_token: Option<String>,

/// Commitment level: processed, confirmed or finalized
/// Apply a timeout to connecting to the uri.
#[clap(long)]
commitment: Option<ArgsCommitment>,
connect_timeout_ms: Option<u64>,

/// Sets the tower service default internal buffer size, default is 1024
#[clap(long)]
buffer_size: Option<usize>,

/// Sets whether to use an adaptive flow control. Uses hyper’s default otherwise.
#[clap(long)]
http2_adaptive_window: Option<bool>,

/// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise.
#[clap(long)]
http2_keep_alive_interval_ms: Option<u64>,

/// Sets the max connection-level flow control for HTTP2, default is 65,535
#[clap(long)]
initial_connection_window_size: Option<u32>,

///Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control, default is 65,535
#[clap(long)]
initial_stream_window_size: Option<u32>,

///Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise.
#[clap(long)]
keep_alive_timeout_ms: Option<u64>,

/// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper’s default otherwise.
#[clap(long)]
keep_alive_while_idle: Option<bool>,

/// Set whether TCP keepalive messages are enabled on accepted connections.
#[clap(long)]
tcp_keepalive_ms: Option<u64>,

/// Set the value of TCP_NODELAY option for accepted connections. Enabled by default.
#[clap(long)]
tcp_nodelay: Option<bool>,

/// Apply a timeout to each request.
#[clap(long)]
timeout_ms: Option<u64>,

/// Max message size before decoding, full blocks can be super large, default is 1GiB
#[clap(long, default_value_t = 1024 * 1024 * 1024)]
max_decoding_message_size: usize,

/// Commitment level: processed, confirmed or finalized
#[clap(long)]
commitment: Option<ArgsCommitment>,

#[command(subcommand)]
action: Action,
}
Expand All @@ -73,15 +117,46 @@ impl Args {
}

async fn connect(&self) -> anyhow::Result<GeyserGrpcClient<impl Interceptor>> {
GeyserGrpcClient::build_from_shared(self.endpoint.clone())?
let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())?
.x_token(self.x_token.clone())?
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
.tls_config(ClientTlsConfig::new().with_native_roots())?
.max_decoding_message_size(self.max_decoding_message_size)
.connect()
.await
.map_err(Into::into)
.max_decoding_message_size(self.max_decoding_message_size);

if let Some(duration) = self.connect_timeout_ms {
builder = builder.connect_timeout(Duration::from_millis(duration));
}
if let Some(sz) = self.buffer_size {
builder = builder.buffer_size(sz);
}
if let Some(enabled) = self.http2_adaptive_window {
builder = builder.http2_adaptive_window(enabled);
}
if let Some(duration) = self.http2_keep_alive_interval_ms {
builder = builder.http2_keep_alive_interval(Duration::from_millis(duration));
}
if let Some(sz) = self.initial_connection_window_size {
builder = builder.initial_connection_window_size(sz);
}
if let Some(sz) = self.initial_stream_window_size {
builder = builder.initial_stream_window_size(sz);
}
if let Some(duration) = self.keep_alive_timeout_ms {
builder = builder.keep_alive_timeout(Duration::from_millis(duration));
}
if let Some(enabled) = self.keep_alive_while_idle {
builder = builder.keep_alive_while_idle(enabled);
}
if let Some(duration) = self.tcp_keepalive_ms {
builder = builder.tcp_keepalive(Some(Duration::from_millis(duration)));
}
if let Some(enabled) = self.tcp_nodelay {
builder = builder.tcp_nodelay(enabled);
}
if let Some(duration) = self.timeout_ms {
builder = builder.timeout(Duration::from_millis(duration));
}

builder.connect().await.map_err(Into::into)
}
}

Expand Down
52 changes: 18 additions & 34 deletions yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,8 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
pub enum GeyserGrpcBuilderError {
#[error("Failed to parse x-token: {0}")]
MetadataValueError(#[from] InvalidMetadataValue),
#[error("Invalid X-Token length: {0}, expected 28")]
InvalidXTokenLength(usize),
#[error("gRPC transport error: {0}")]
TonicError(#[from] tonic::transport::Error),
#[error("tonic::transport::Channel should be created, use `connect` or `connect_lazy` first")]
EmptyChannel,
}

pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
Expand Down Expand Up @@ -290,16 +286,7 @@ impl GeyserGrpcBuilder {
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
Ok(Self {
x_token: match x_token {
Some(x_token) => {
let x_token = x_token.try_into()?;
if x_token.is_empty() {
return Err(GeyserGrpcBuilderError::InvalidXTokenLength(x_token.len()));
}
Some(x_token)
}
None => None,
},
x_token: x_token.map(|x_token| x_token.try_into()).transpose()?,
..self
})
}
Expand All @@ -320,20 +307,6 @@ impl GeyserGrpcBuilder {
}
}

pub fn timeout(self, dur: Duration) -> Self {
Self {
endpoint: self.endpoint.timeout(dur),
..self
}
}

pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult<Self> {
Ok(Self {
endpoint: self.endpoint.tls_config(tls_config)?,
..self
})
}

pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
Self {
endpoint: self.endpoint.buffer_size(sz),
Expand Down Expand Up @@ -397,6 +370,20 @@ impl GeyserGrpcBuilder {
}
}

pub fn timeout(self, dur: Duration) -> Self {
Self {
endpoint: self.endpoint.timeout(dur),
..self
}
}

pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult<Self> {
Ok(Self {
endpoint: self.endpoint.tls_config(tls_config)?,
..self
})
}

// Geyser options
pub fn send_compressed(self, encoding: CompressionEncoding) -> Self {
Self {
Expand Down Expand Up @@ -429,7 +416,7 @@ impl GeyserGrpcBuilder {

#[cfg(test)]
mod tests {
use super::{GeyserGrpcBuilderError, GeyserGrpcClient};
use super::GeyserGrpcClient;

#[tokio::test]
async fn test_channel_https_success() {
Expand Down Expand Up @@ -462,18 +449,15 @@ mod tests {
}

#[tokio::test]
async fn test_channel_invalid_token_some() {
async fn test_channel_empty_token_some() {
let endpoint = "http://127.0.0.1:10000";
let x_token = "";

let res = GeyserGrpcClient::build_from_shared(endpoint);
assert!(res.is_ok());

let res = res.unwrap().x_token(Some(x_token));
assert!(matches!(
res,
Err(GeyserGrpcBuilderError::InvalidXTokenLength(_))
));
assert!(res.is_ok());
}

#[tokio::test]
Expand Down

0 comments on commit c4ae964

Please sign in to comment.