Skip to content

Commit

Permalink
client: add subscribe ping example (rpcpool#269)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Jan 6, 2024
1 parent 1c9a0cf commit f6c3bd3
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions examples/rust/src/bin/subscribe-ping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use {
clap::Parser,
futures::{sink::SinkExt, stream::StreamExt},
log::info,
std::env,
tokio::time::{interval, Duration},
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
SubscribeRequestFilterSlots, SubscribeRequestPing, SubscribeUpdatePong,
SubscribeUpdateSlot,
},
};

#[derive(Debug, Clone, Parser)]
#[clap(author, version, about)]
struct Args {
/// Service endpoint
#[clap(short, long, default_value_t = String::from("http://127.0.0.1:10000"))]
endpoint: String,

#[clap(long)]
x_token: Option<String>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env::set_var(
env_logger::DEFAULT_FILTER_ENV,
env::var_os(env_logger::DEFAULT_FILTER_ENV).unwrap_or_else(|| "info".into()),
);
env_logger::init();

let args = Args::parse();

let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None)?;
let (mut subscribe_tx, mut stream) = client.subscribe().await?;

futures::try_join!(
async move {
subscribe_tx
.send(SubscribeRequest {
slots: maplit::hashmap! { "".to_owned() => SubscribeRequestFilterSlots { filter_by_commitment: Some(true) } },
commitment: Some(CommitmentLevel::Processed as i32),
..Default::default()
})
.await?;

let mut timer = interval(Duration::from_secs(3));
let mut id = 0;
loop {
timer.tick().await;
id += 1;
subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id }),
..Default::default()
})
.await?;
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
},
async move {
while let Some(message) = stream.next().await {
match message?.update_oneof.expect("valid message") {
UpdateOneof::Slot(SubscribeUpdateSlot { slot, .. }) => {
info!("slot received: {slot}");
}
UpdateOneof::Ping(_msg) => {
info!("ping received");
}
UpdateOneof::Pong(SubscribeUpdatePong { id }) => {
info!("pong received: id#{id}");
}
msg => anyhow::bail!("received unexpected message: {msg:?}"),
}
}
Ok::<(), anyhow::Error>(())
}
)?;

Ok(())
}

0 comments on commit f6c3bd3

Please sign in to comment.