From cb1e6a7053186d3a9c1eba4b8c5aa39a2bb8f344 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 22 Aug 2024 13:05:08 -0500 Subject: [PATCH 1/2] geyser: handle `x-request-snapshot` on client request --- CHANGELOG.md | 1 + yellowstone-grpc-geyser/src/grpc.rs | 140 +++++++++++++++++----------- 2 files changed, 84 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8e0c8ac..51de5dde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features - solana: bump proto/client version for publishing crates +- geyser: handle `x-request-snapshot` on client request ([#409](https://github.com/rpcpool/yellowstone-grpc/pull/409)) ## 2024-08-09 diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index f6d50b84..97922d42 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1110,6 +1110,7 @@ impl GrpcService { async fn client_loop( id: usize, endpoint: String, + x_request_snapshot: bool, config_filters: Arc, stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, @@ -1144,63 +1145,18 @@ impl GrpcService { info!("client #{id}: new"); let mut is_alive = true; - if let Some(snapshot_rx) = snapshot_rx.take() { - info!("client #{id}: going to receive snapshot data"); - - // we start with default filter, for snapshot we need wait actual filter first - while is_alive { - match client_rx.recv().await { - Some(Some(filter_new)) => { - if let Some(msg) = filter_new.get_pong_msg() { - if stream_tx.send(Ok(msg)).await.is_err() { - error!("client #{id}: stream closed"); - is_alive = false; - break; - } - continue; - } - - prom::update_subscriptions(&endpoint, Some(&filter), Some(&filter_new)); - filter = filter_new; - info!("client #{id}: filter updated"); - break; - } - Some(None) => { - is_alive = false; - } - None => { - is_alive = false; - } - }; - } - - while is_alive { - let message = match snapshot_rx.try_recv() { - Ok(message) => { - MESSAGE_QUEUE_SIZE.dec(); - match message { - Some(message) => message, - None => break, - } - } - Err(crossbeam_channel::TryRecvError::Empty) => { - sleep(Duration::from_millis(1)).await; - continue; - } - Err(crossbeam_channel::TryRecvError::Disconnected) => { - error!("client #{id}: snapshot channel disconnected"); - is_alive = false; - break; - } - }; - - for message in filter.get_update(&message, None) { - if stream_tx.send(Ok(message)).await.is_err() { - error!("client #{id}: stream closed"); - is_alive = false; - break; - } - } + if x_request_snapshot { + if let Some(snapshot_rx) = snapshot_rx.take() { + Self::client_loop_snapshot( + id, + &endpoint, + &stream_tx, + &mut client_rx, + snapshot_rx, + &mut is_alive, + &mut filter, + ) + .await; } } @@ -1285,6 +1241,74 @@ impl GrpcService { info!("client #{id}: removed"); drop_client(); } + + async fn client_loop_snapshot( + id: usize, + endpoint: &str, + stream_tx: &mpsc::Sender>, + client_rx: &mut mpsc::UnboundedReceiver>, + snapshot_rx: crossbeam_channel::Receiver>, + is_alive: &mut bool, + filter: &mut Filter, + ) { + info!("client #{id}: going to receive snapshot data"); + + // we start with default filter, for snapshot we need wait actual filter first + while *is_alive { + match client_rx.recv().await { + Some(Some(filter_new)) => { + if let Some(msg) = filter_new.get_pong_msg() { + if stream_tx.send(Ok(msg)).await.is_err() { + error!("client #{id}: stream closed"); + *is_alive = false; + break; + } + continue; + } + + prom::update_subscriptions(endpoint, Some(filter), Some(&filter_new)); + *filter = filter_new; + info!("client #{id}: filter updated"); + break; + } + Some(None) => { + *is_alive = false; + } + None => { + *is_alive = false; + } + }; + } + + while *is_alive { + let message = match snapshot_rx.try_recv() { + Ok(message) => { + MESSAGE_QUEUE_SIZE.dec(); + match message { + Some(message) => message, + None => break, + } + } + Err(crossbeam_channel::TryRecvError::Empty) => { + sleep(Duration::from_millis(1)).await; + continue; + } + Err(crossbeam_channel::TryRecvError::Disconnected) => { + error!("client #{id}: snapshot channel disconnected"); + *is_alive = false; + break; + } + }; + + for message in filter.get_update(&message, None) { + if stream_tx.send(Ok(message)).await.is_err() { + error!("client #{id}: stream closed"); + *is_alive = false; + break; + } + } + } + } } #[tonic::async_trait] @@ -1342,6 +1366,7 @@ impl Geyser for GrpcService { .get("x-endpoint") .and_then(|h| h.to_str().ok().map(|s| s.to_string())) .unwrap_or_else(|| "".to_owned()); + let x_request_snapshot = request.metadata().contains_key("x-request-snapshot"); let config_filters = Arc::clone(&self.config_filters); let incoming_stream_tx = stream_tx.clone(); @@ -1388,6 +1413,7 @@ impl Geyser for GrpcService { tokio::spawn(Self::client_loop( id, endpoint, + x_request_snapshot, Arc::clone(&self.config_filters), stream_tx, client_rx, From 5570f675ae2d2d330519d6d268cd19a00f2ade16 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 22 Aug 2024 13:13:03 -0500 Subject: [PATCH 2/2] client --- CHANGELOG.md | 2 +- Cargo.lock | 2 +- Cargo.toml | 4 ++-- yellowstone-grpc-client/Cargo.toml | 2 +- yellowstone-grpc-client/src/lib.rs | 29 +++++++++++++++++++++-------- 5 files changed, 26 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51de5dde..bc101367 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ The minor version will be incremented upon a breaking change and the patch versi ## 2024-08-22 -- yellowstone-grpc-client-1.15.2+solana.1.18.22 +- yellowstone-grpc-client-1.15.3+solana.1.18.22 - yellowstone-grpc-geyser-1.15.1+solana.1.18.22 - yellowstone-grpc-proto-1.14.2+solana.1.18.22 - yellowstone-grpc-tools-1.0.0-rc.11+solana.1.18.22 diff --git a/Cargo.lock b/Cargo.lock index 1f952edc..5614f5cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5253,7 +5253,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.15.2+solana.1.18.22" +version = "1.15.3+solana.1.18.22" dependencies = [ "bytes", "futures", diff --git a/Cargo.toml b/Cargo.toml index 273c0e18..fdba4aae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ resolver = "2" members = [ "examples/rust", # 1.13.0+solana.1.18.22 - "yellowstone-grpc-client", # 1.15.2+solana.1.18.22 + "yellowstone-grpc-client", # 1.15.3+solana.1.18.22 "yellowstone-grpc-geyser", # 1.15.1+solana.1.18.22 "yellowstone-grpc-proto", # 1.14.2+solana.1.18.22 "yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.22 @@ -69,7 +69,7 @@ tracing = "0.1.37" tracing-subscriber = "0.3.17" uuid = "1.8.0" vergen = "8.2.1" -yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.2+solana.1.18.22" } +yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.3+solana.1.18.22" } yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.2+solana.1.18.22" } [profile.release] diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index 3dca65b2..c1f0de68 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "1.15.2+solana.1.18.22" +version = "1.15.3+solana.1.18.22" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Simple Client" diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index 8d57d30a..c3b903ea 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -9,7 +9,7 @@ use { std::time::Duration, tonic::{ codec::{CompressionEncoding, Streaming}, - metadata::{errors::InvalidMetadataValue, AsciiMetadataValue}, + metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue}, service::interceptor::InterceptedService, transport::channel::{Channel, ClientTlsConfig, Endpoint}, Request, Response, Status, @@ -27,12 +27,7 @@ use { #[derive(Debug, Clone)] pub struct InterceptorXToken { pub x_token: Option, -} - -impl From> for InterceptorXToken { - fn from(x_token: Option) -> Self { - Self { x_token } - } + pub x_request_snapshot: bool, } impl Interceptor for InterceptorXToken { @@ -40,6 +35,11 @@ impl Interceptor for InterceptorXToken { if let Some(x_token) = self.x_token.clone() { request.metadata_mut().insert("x-token", x_token); } + if self.x_request_snapshot { + request + .metadata_mut() + .insert("x-request-snapshot", MetadataValue::from_static("true")); + } Ok(request) } } @@ -215,6 +215,7 @@ pub type GeyserGrpcBuilderResult = Result; pub struct GeyserGrpcBuilder { pub endpoint: Endpoint, pub x_token: Option, + pub x_request_snapshot: bool, pub send_compressed: Option, pub accept_compressed: Option, pub max_decoding_message_size: Option, @@ -227,6 +228,7 @@ impl GeyserGrpcBuilder { Self { endpoint, x_token: None, + x_request_snapshot: false, send_compressed: None, accept_compressed: None, max_decoding_message_size: None, @@ -247,7 +249,10 @@ impl GeyserGrpcBuilder { self, channel: Channel, ) -> GeyserGrpcBuilderResult> { - let interceptor: InterceptorXToken = self.x_token.into(); + let interceptor = InterceptorXToken { + x_token: self.x_token, + x_request_snapshot: self.x_request_snapshot, + }; let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone()); if let Some(encoding) = self.send_compressed { @@ -299,6 +304,14 @@ impl GeyserGrpcBuilder { }) } + // Include `x-request-snapshot` + pub fn set_x_request_snapshot(self, value: bool) -> GeyserGrpcBuilderResult { + Ok(Self { + x_request_snapshot: value, + ..self + }) + } + // Endpoint options pub fn connect_timeout(self, dur: Duration) -> Self { Self {