Skip to content

Commit

Permalink
client: add timeout options to rust (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Oct 1, 2023
1 parent 8a2c898 commit c7a2f1d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features

- geyser: add optional TLS to gRPC server config ([#183](https://github.com/rpcpool/yellowstone-grpc/pull/183)).
- client: add timeout options to rust ([#187](https://github.com/rpcpool/yellowstone-grpc/pull/187)).

### Fixes

Expand Down
14 changes: 12 additions & 2 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
collections::HashMap,
env,
sync::{Arc, Mutex},
time::Duration,
},
yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError},
yellowstone_grpc_proto::{
Expand Down Expand Up @@ -379,8 +380,17 @@ async fn main() -> anyhow::Result<()> {
}

let commitment = args.get_commitment();
let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None)
.map_err(|e| backoff::Error::transient(anyhow::Error::new(e)))?;
let mut client = GeyserGrpcClient::connect_with_timeout(
args.endpoint,
args.x_token,
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
false,
)
.await
.map_err(|e| backoff::Error::transient(anyhow::Error::new(e)))?;
info!("Connected");

match &args.action {
Action::HealthCheck => client
Expand Down
59 changes: 52 additions & 7 deletions yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use {
stream::Stream,
},
http::uri::InvalidUri,
std::collections::HashMap,
std::{collections::HashMap, time::Duration},
tonic::{
codec::Streaming,
metadata::{errors::InvalidMetadataValue, AsciiMetadataValue},
service::{interceptor::InterceptedService, Interceptor},
transport::channel::{Channel, ClientTlsConfig},
transport::channel::{Channel, ClientTlsConfig, Endpoint},
Request, Response, Status,
},
tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse},
Expand Down Expand Up @@ -65,23 +65,21 @@ pub struct GeyserGrpcClient<F> {
}

impl GeyserGrpcClient<()> {
pub fn connect<E, T>(
fn connect2<E, T>(
endpoint: E,
x_token: Option<T>,
tls_config: Option<ClientTlsConfig>,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
x_token: Option<T>,
) -> GeyserGrpcClientResult<(Endpoint, InterceptorFn)>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
let mut endpoint = Channel::from_shared(endpoint)?;

if let Some(tls_config) = tls_config {
endpoint = endpoint.tls_config(tls_config)?;
} else if endpoint.uri().scheme_str() == Some("https") {
endpoint = endpoint.tls_config(ClientTlsConfig::new())?;
}
let channel = endpoint.connect_lazy();

let x_token: Option<AsciiMetadataValue> = match x_token {
Some(x_token) => Some(x_token.try_into()?),
Expand All @@ -95,6 +93,53 @@ impl GeyserGrpcClient<()> {
}
let interceptor = InterceptorFn { x_token };

Ok((endpoint, interceptor))
}

pub fn connect<E, T>(
endpoint: E,
x_token: Option<T>,
tls_config: Option<ClientTlsConfig>,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
let (endpoint, interceptor) = Self::connect2(endpoint, tls_config, x_token)?;
let channel = endpoint.connect_lazy();
Ok(GeyserGrpcClient {
health: HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
geyser: GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(64 * 1024 * 1024), // 64 MiB
})
}

pub async fn connect_with_timeout<E, T>(
endpoint: E,
x_token: Option<T>,
tls_config: Option<ClientTlsConfig>,
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
connect_lazy: bool,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
let (mut endpoint, interceptor) = Self::connect2(endpoint, tls_config, x_token)?;

if let Some(timeout) = connect_timeout {
endpoint = endpoint.connect_timeout(timeout);
}
if let Some(timeout) = request_timeout {
endpoint = endpoint.timeout(timeout);
}
let channel = if connect_lazy {
endpoint.connect_lazy()
} else {
endpoint.connect().await?
};

Ok(GeyserGrpcClient {
health: HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
geyser: GeyserClient::with_interceptor(channel, interceptor)
Expand Down

0 comments on commit c7a2f1d

Please sign in to comment.