From a9e2fa0a5d42719f372da602776cfed49c10f309 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 22 Aug 2024 22:39:27 -0500 Subject: [PATCH] geyser: handle `x-request-snapshot` on client request (#411) --- CHANGELOG.md | 6 +- Cargo.lock | 2 +- Cargo.toml | 4 +- yellowstone-grpc-client/Cargo.toml | 2 +- yellowstone-grpc-client/src/lib.rs | 29 ++++-- yellowstone-grpc-geyser/src/grpc.rs | 140 +++++++++++++++++----------- 6 files changed, 113 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e358c6f..7eeaefe8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ The minor version will be incremented upon a breaking change and the patch versi ## 2024-08-22 -- yellowstone-grpc-client-1.16.0+solana.2.0.5 +- yellowstone-grpc-client-1.16.1+solana.2.0.5 - yellowstone-grpc-geyser-1.16.1+solana.2.0.5 - yellowstone-grpc-proto-1.15.0+solana.2.0.5 - yellowstone-grpc-tools-1.0.0-rc.12+solana.2.0.5 @@ -28,6 +28,10 @@ The minor version will be incremented upon a breaking change and the patch versi - example: fix tls root issue in rust example ([#404](https://github.com/rpcpool/yellowstone-grpc/pull/404)) - geyser: fix filter update loop on snapshot ([#410](https://github.com/rpcpool/yellowstone-grpc/pull/410)) +### Features + +- geyser: handle `x-request-snapshot` on client request ([#411](https://github.com/rpcpool/yellowstone-grpc/pull/411)) + ## 2024-08-09 - yellowstone-grpc-client-1.16.0+solana.2.0.5 diff --git a/Cargo.lock b/Cargo.lock index 7edcf176..b007f757 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5162,7 +5162,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.16.0+solana.2.0.5" +version = "1.16.1+solana.2.0.5" dependencies = [ "bytes", "futures", diff --git a/Cargo.toml b/Cargo.toml index 9722e4a2..f88de695 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ resolver = "2" members = [ "examples/rust", # 1.14.1+solana.2.0.5 - "yellowstone-grpc-client", # 1.16.0+solana.2.0.5 + "yellowstone-grpc-client", # 1.16.1+solana.2.0.5 "yellowstone-grpc-geyser", # 1.16.1+solana.2.0.5 "yellowstone-grpc-proto", # 1.15.0+solana.2.0.5 "yellowstone-grpc-tools", # 1.0.0-rc.12+solana.2.0.5 @@ -71,7 +71,7 @@ tracing = "0.1.37" tracing-subscriber = "0.3.17" uuid = "1.8.0" vergen = "9.0.0" -yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.16.0+solana.2.0.5" } +yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.16.1+solana.2.0.5" } yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.15.0+solana.2.0.5", default-features = false } [profile.release] diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index bfcca12a..186cf086 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "1.16.0+solana.2.0.5" +version = "1.16.1+solana.2.0.5" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Simple Client" diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index 8d57d30a..c3b903ea 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -9,7 +9,7 @@ use { std::time::Duration, tonic::{ codec::{CompressionEncoding, Streaming}, - metadata::{errors::InvalidMetadataValue, AsciiMetadataValue}, + metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue}, service::interceptor::InterceptedService, transport::channel::{Channel, ClientTlsConfig, Endpoint}, Request, Response, Status, @@ -27,12 +27,7 @@ use { #[derive(Debug, Clone)] pub struct InterceptorXToken { pub x_token: Option, -} - -impl From> for InterceptorXToken { - fn from(x_token: Option) -> Self { - Self { x_token } - } + pub x_request_snapshot: bool, } impl Interceptor for InterceptorXToken { @@ -40,6 +35,11 @@ impl Interceptor for InterceptorXToken { if let Some(x_token) = self.x_token.clone() { request.metadata_mut().insert("x-token", x_token); } + if self.x_request_snapshot { + request + .metadata_mut() + .insert("x-request-snapshot", MetadataValue::from_static("true")); + } Ok(request) } } @@ -215,6 +215,7 @@ pub type GeyserGrpcBuilderResult = Result; pub struct GeyserGrpcBuilder { pub endpoint: Endpoint, pub x_token: Option, + pub x_request_snapshot: bool, pub send_compressed: Option, pub accept_compressed: Option, pub max_decoding_message_size: Option, @@ -227,6 +228,7 @@ impl GeyserGrpcBuilder { Self { endpoint, x_token: None, + x_request_snapshot: false, send_compressed: None, accept_compressed: None, max_decoding_message_size: None, @@ -247,7 +249,10 @@ impl GeyserGrpcBuilder { self, channel: Channel, ) -> GeyserGrpcBuilderResult> { - let interceptor: InterceptorXToken = self.x_token.into(); + let interceptor = InterceptorXToken { + x_token: self.x_token, + x_request_snapshot: self.x_request_snapshot, + }; let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone()); if let Some(encoding) = self.send_compressed { @@ -299,6 +304,14 @@ impl GeyserGrpcBuilder { }) } + // Include `x-request-snapshot` + pub fn set_x_request_snapshot(self, value: bool) -> GeyserGrpcBuilderResult { + Ok(Self { + x_request_snapshot: value, + ..self + }) + } + // Endpoint options pub fn connect_timeout(self, dur: Duration) -> Self { Self { diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index e7de9fa3..6f1a5721 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1122,6 +1122,7 @@ impl GrpcService { async fn client_loop( id: usize, endpoint: String, + x_request_snapshot: bool, config_filters: Arc, stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, @@ -1156,63 +1157,18 @@ impl GrpcService { info!("client #{id}: new"); let mut is_alive = true; - if let Some(snapshot_rx) = snapshot_rx.take() { - info!("client #{id}: going to receive snapshot data"); - - // we start with default filter, for snapshot we need wait actual filter first - while is_alive { - match client_rx.recv().await { - Some(Some(filter_new)) => { - if let Some(msg) = filter_new.get_pong_msg() { - if stream_tx.send(Ok(msg)).await.is_err() { - error!("client #{id}: stream closed"); - is_alive = false; - break; - } - continue; - } - - metrics::update_subscriptions(&endpoint, Some(&filter), Some(&filter_new)); - filter = filter_new; - info!("client #{id}: filter updated"); - break; - } - Some(None) => { - is_alive = false; - } - None => { - is_alive = false; - } - }; - } - - while is_alive { - let message = match snapshot_rx.try_recv() { - Ok(message) => { - MESSAGE_QUEUE_SIZE.dec(); - match message { - Some(message) => message, - None => break, - } - } - Err(crossbeam_channel::TryRecvError::Empty) => { - sleep(Duration::from_millis(1)).await; - continue; - } - Err(crossbeam_channel::TryRecvError::Disconnected) => { - error!("client #{id}: snapshot channel disconnected"); - is_alive = false; - break; - } - }; - - for message in filter.get_update(&message, None) { - if stream_tx.send(Ok(message)).await.is_err() { - error!("client #{id}: stream closed"); - is_alive = false; - break; - } - } + if x_request_snapshot { + if let Some(snapshot_rx) = snapshot_rx.take() { + Self::client_loop_snapshot( + id, + &endpoint, + &stream_tx, + &mut client_rx, + snapshot_rx, + &mut is_alive, + &mut filter, + ) + .await; } } @@ -1297,6 +1253,74 @@ impl GrpcService { info!("client #{id}: removed"); drop_client(); } + + async fn client_loop_snapshot( + id: usize, + endpoint: &str, + stream_tx: &mpsc::Sender>, + client_rx: &mut mpsc::UnboundedReceiver>, + snapshot_rx: crossbeam_channel::Receiver>, + is_alive: &mut bool, + filter: &mut Filter, + ) { + info!("client #{id}: going to receive snapshot data"); + + // we start with default filter, for snapshot we need wait actual filter first + while *is_alive { + match client_rx.recv().await { + Some(Some(filter_new)) => { + if let Some(msg) = filter_new.get_pong_msg() { + if stream_tx.send(Ok(msg)).await.is_err() { + error!("client #{id}: stream closed"); + *is_alive = false; + break; + } + continue; + } + + metrics::update_subscriptions(endpoint, Some(filter), Some(&filter_new)); + *filter = filter_new; + info!("client #{id}: filter updated"); + break; + } + Some(None) => { + *is_alive = false; + } + None => { + *is_alive = false; + } + }; + } + + while *is_alive { + let message = match snapshot_rx.try_recv() { + Ok(message) => { + MESSAGE_QUEUE_SIZE.dec(); + match message { + Some(message) => message, + None => break, + } + } + Err(crossbeam_channel::TryRecvError::Empty) => { + sleep(Duration::from_millis(1)).await; + continue; + } + Err(crossbeam_channel::TryRecvError::Disconnected) => { + error!("client #{id}: snapshot channel disconnected"); + *is_alive = false; + break; + } + }; + + for message in filter.get_update(&message, None) { + if stream_tx.send(Ok(message)).await.is_err() { + error!("client #{id}: stream closed"); + *is_alive = false; + break; + } + } + } + } } #[tonic::async_trait] @@ -1354,6 +1378,7 @@ impl Geyser for GrpcService { .get("x-endpoint") .and_then(|h| h.to_str().ok().map(|s| s.to_string())) .unwrap_or_else(|| "".to_owned()); + let x_request_snapshot = request.metadata().contains_key("x-request-snapshot"); let config_filters = Arc::clone(&self.config_filters); let incoming_stream_tx = stream_tx.clone(); @@ -1400,6 +1425,7 @@ impl Geyser for GrpcService { tokio::spawn(Self::client_loop( id, endpoint, + x_request_snapshot, Arc::clone(&self.config_filters), stream_tx, client_rx,