Skip to content

Commit

Permalink
fix: allow optional x-token in rust client (#72)
Browse files Browse the repository at this point in the history
* fix: allow optional x-token in rust client

---------

Co-authored-by: Kirill Fomichev <[email protected]>
  • Loading branch information
shuimuliang and fanatid authored Mar 3, 2023
1 parent 1aa1884 commit c4487d8
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 20 deletions.
118 changes: 113 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ path = "client/client.rs"
anyhow = "1.0.62"
backoff = { version = "0.4.0", features = ["tokio"] }
clap = { version = "3.2.22", features = ["cargo", "derive"] }
env_logger = { version = "0.10.0" }
futures = "0.3.24"
log = { version = "0.4.14", features = ["std"] }
solana-geyser-grpc = { path = "../solana-geyser-grpc" }
thiserror = "1.0"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time"] }
Expand Down
37 changes: 22 additions & 15 deletions rust/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use {
backoff::{future::retry, ExponentialBackoff},
clap::Parser,
env_logger,
futures::stream::{once, StreamExt},
log::{error, info, warn},
solana_geyser_grpc::proto::{
geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta,
Expand Down Expand Up @@ -114,20 +116,23 @@ impl RetryChannel {
}
}
Some(token_str) => return Err(Error::XToken(token_str)),
None => return Err(Error::XToken("".to_owned())),
None => {
x_token = None;
warn!("x_token is None");
}
}

let res = Channel::from_shared(endpoint_str.clone());
match res {
Err(e) => {
println!("{}", e);
error!("{}", e);
return Err(Error::InvalidUri(endpoint_str));
}
Ok(_endpoint) => {
if _endpoint.uri().scheme_str() == Some("https") {
match _endpoint.tls_config(ClientTlsConfig::new()) {
Err(e) => {
println!("{}", e);
error!("{}", e);
return Err(Error::InvalidUri(endpoint_str));
}
Ok(e) => endpoint = e,
Expand Down Expand Up @@ -170,7 +175,7 @@ impl RetryChannel {
// [500ms, 750ms, 1.125s, 1.6875s, 2.53125s, 3.796875s, 5.6953125s,
// 8.5s, 12.8s, 19.2s, 28.8s, 43.2s, 64.8s, 97s, ... ]
retry(ExponentialBackoff::default(), move || async {
println!("Retry to connect to the server");
info!("Retry to connect to the server");
let mut client = self.client();
client
.subscribe(slots, accounts, transactions, blocks, blocks_meta)
Expand Down Expand Up @@ -209,26 +214,29 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> RetryClient<F> {
blocks: blocks.clone(),
blocks_meta: blocks_meta.clone(),
};
println!("Going to send request: {:?}", request);
info!("Going to send request: {:?}", request);

let response: Response<Streaming<SubscribeUpdate>> =
self.client.subscribe(once(async move { request })).await?;
let mut stream: Streaming<SubscribeUpdate> = response.into_inner();

println!("stream opened");
info!("stream opened");
while let Some(message) = stream.next().await {
match message {
Ok(message) => println!("new message: {:?}", message),
Err(error) => eprintln!("error: {:?}", error),
Ok(message) => info!("new message: {:?}", message),
Err(error) => error!("error: {:?}", error),
}
}
println!("stream closed");
info!("stream closed");
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<(), Error> {
::std::env::set_var("RUST_LOG", "info");
env_logger::init();

let args = Args::parse();

let mut accounts = HashMap::new();
Expand Down Expand Up @@ -275,7 +283,7 @@ async fn main() -> Result<(), Error> {
let res: Result<RetryChannel, Error> =
RetryChannel::new(args.endpoint.clone(), args.x_token.clone());
if let Err(e) = res {
eprintln!("Error: {}", e);
error!("Error: {}", e);
return Err(e);
}

Expand All @@ -284,7 +292,7 @@ async fn main() -> Result<(), Error> {
.subscribe_retry(&slots, &accounts, &transactions, &blocks, &blocks_meta)
.await;
if let Err(e) = res {
println!("Error: {}", e);
error!("Error: {}", e);
return Err(Error::RetrySubscribe(e));
}

Expand Down Expand Up @@ -329,10 +337,9 @@ mod tests {
async fn test_channel_invalid_token_none() {
let endpoint = "http://127.0.0.1:10000".to_owned();
let x_token = None;
assert!(matches!(
RetryChannel::new(endpoint, x_token),
Err(Error::XToken(_))
));
// only show warning in log
let res: Result<RetryChannel, Error> = RetryChannel::new(endpoint, x_token);
assert!(res.is_ok());
}

#[tokio::test]
Expand Down

0 comments on commit c4487d8

Please sign in to comment.