From f6c3bd381d442db33adda66cc9f68b42d312bd05 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 6 Jan 2024 10:59:29 -0500 Subject: [PATCH] client: add subscribe ping example (#269) --- examples/rust/src/bin/subscribe-ping.rs | 84 +++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 examples/rust/src/bin/subscribe-ping.rs diff --git a/examples/rust/src/bin/subscribe-ping.rs b/examples/rust/src/bin/subscribe-ping.rs new file mode 100644 index 00000000..7e8a1449 --- /dev/null +++ b/examples/rust/src/bin/subscribe-ping.rs @@ -0,0 +1,84 @@ +use { + clap::Parser, + futures::{sink::SinkExt, stream::StreamExt}, + log::info, + std::env, + tokio::time::{interval, Duration}, + yellowstone_grpc_client::GeyserGrpcClient, + yellowstone_grpc_proto::prelude::{ + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, + SubscribeRequestFilterSlots, SubscribeRequestPing, SubscribeUpdatePong, + SubscribeUpdateSlot, + }, +}; + +#[derive(Debug, Clone, Parser)] +#[clap(author, version, about)] +struct Args { + /// Service endpoint + #[clap(short, long, default_value_t = String::from("http://127.0.0.1:10000"))] + endpoint: String, + + #[clap(long)] + x_token: Option, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env::set_var( + env_logger::DEFAULT_FILTER_ENV, + env::var_os(env_logger::DEFAULT_FILTER_ENV).unwrap_or_else(|| "info".into()), + ); + env_logger::init(); + + let args = Args::parse(); + + let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None)?; + let (mut subscribe_tx, mut stream) = client.subscribe().await?; + + futures::try_join!( + async move { + subscribe_tx + .send(SubscribeRequest { + slots: maplit::hashmap! { "".to_owned() => SubscribeRequestFilterSlots { filter_by_commitment: Some(true) } }, + commitment: Some(CommitmentLevel::Processed as i32), + ..Default::default() + }) + .await?; + + let mut timer = interval(Duration::from_secs(3)); + let mut id = 0; + loop { + timer.tick().await; + id += 1; + subscribe_tx + .send(SubscribeRequest { + ping: Some(SubscribeRequestPing { id }), + ..Default::default() + }) + .await?; + } + #[allow(unreachable_code)] + Ok::<(), anyhow::Error>(()) + }, + async move { + while let Some(message) = stream.next().await { + match message?.update_oneof.expect("valid message") { + UpdateOneof::Slot(SubscribeUpdateSlot { slot, .. }) => { + info!("slot received: {slot}"); + } + UpdateOneof::Ping(_msg) => { + info!("ping received"); + } + UpdateOneof::Pong(SubscribeUpdatePong { id }) => { + info!("pong received: id#{id}"); + } + msg => anyhow::bail!("received unexpected message: {msg:?}"), + } + } + Ok::<(), anyhow::Error>(()) + } + )?; + + Ok(()) +}