diff --git a/CHANGELOG.md b/CHANGELOG.md index e4b50dac..93baa6bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features - geyser: add optional TLS to gRPC server config ([#183](https://github.com/rpcpool/yellowstone-grpc/pull/183)). +- client: add timeout options to rust ([#187](https://github.com/rpcpool/yellowstone-grpc/pull/187)). ### Fixes diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index 11da5763..109b2fcd 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -8,6 +8,7 @@ use { collections::HashMap, env, sync::{Arc, Mutex}, + time::Duration, }, yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}, yellowstone_grpc_proto::{ @@ -379,8 +380,17 @@ async fn main() -> anyhow::Result<()> { } let commitment = args.get_commitment(); - let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None) - .map_err(|e| backoff::Error::transient(anyhow::Error::new(e)))?; + let mut client = GeyserGrpcClient::connect_with_timeout( + args.endpoint, + args.x_token, + None, + Some(Duration::from_secs(10)), + Some(Duration::from_secs(10)), + false, + ) + .await + .map_err(|e| backoff::Error::transient(anyhow::Error::new(e)))?; + info!("Connected"); match &args.action { Action::HealthCheck => client diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index b44ef7a8..4517f4c3 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -6,12 +6,12 @@ use { stream::Stream, }, http::uri::InvalidUri, - std::collections::HashMap, + std::{collections::HashMap, time::Duration}, tonic::{ codec::Streaming, metadata::{errors::InvalidMetadataValue, AsciiMetadataValue}, service::{interceptor::InterceptedService, Interceptor}, - transport::channel::{Channel, ClientTlsConfig}, + transport::channel::{Channel, ClientTlsConfig, Endpoint}, Request, Response, Status, }, tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse}, @@ -65,23 +65,21 @@ pub struct GeyserGrpcClient { } impl GeyserGrpcClient<()> { - pub fn connect( + fn connect2( endpoint: E, - x_token: Option, tls_config: Option, - ) -> GeyserGrpcClientResult> + x_token: Option, + ) -> GeyserGrpcClientResult<(Endpoint, InterceptorFn)> where E: Into, T: TryInto, { let mut endpoint = Channel::from_shared(endpoint)?; - if let Some(tls_config) = tls_config { endpoint = endpoint.tls_config(tls_config)?; } else if endpoint.uri().scheme_str() == Some("https") { endpoint = endpoint.tls_config(ClientTlsConfig::new())?; } - let channel = endpoint.connect_lazy(); let x_token: Option = match x_token { Some(x_token) => Some(x_token.try_into()?), @@ -95,6 +93,53 @@ impl GeyserGrpcClient<()> { } let interceptor = InterceptorFn { x_token }; + Ok((endpoint, interceptor)) + } + + pub fn connect( + endpoint: E, + x_token: Option, + tls_config: Option, + ) -> GeyserGrpcClientResult> + where + E: Into, + T: TryInto, + { + let (endpoint, interceptor) = Self::connect2(endpoint, tls_config, x_token)?; + let channel = endpoint.connect_lazy(); + Ok(GeyserGrpcClient { + health: HealthClient::with_interceptor(channel.clone(), interceptor.clone()), + geyser: GeyserClient::with_interceptor(channel, interceptor) + .max_decoding_message_size(64 * 1024 * 1024), // 64 MiB + }) + } + + pub async fn connect_with_timeout( + endpoint: E, + x_token: Option, + tls_config: Option, + connect_timeout: Option, + request_timeout: Option, + connect_lazy: bool, + ) -> GeyserGrpcClientResult> + where + E: Into, + T: TryInto, + { + let (mut endpoint, interceptor) = Self::connect2(endpoint, tls_config, x_token)?; + + if let Some(timeout) = connect_timeout { + endpoint = endpoint.connect_timeout(timeout); + } + if let Some(timeout) = request_timeout { + endpoint = endpoint.timeout(timeout); + } + let channel = if connect_lazy { + endpoint.connect_lazy() + } else { + endpoint.connect().await? + }; + Ok(GeyserGrpcClient { health: HealthClient::with_interceptor(channel.clone(), interceptor.clone()), geyser: GeyserClient::with_interceptor(channel, interceptor)