From 3c08e27e3f3ad3ebc454cb397c33aa0508eb97f8 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Fri, 23 Aug 2024 13:59:45 -0500 Subject: [PATCH] geyser: fix `x-request-snapshot` handler (#413) --- CHANGELOG.md | 11 ++++++++ Cargo.lock | 4 +-- Cargo.toml | 6 ++--- yellowstone-grpc-client/Cargo.toml | 2 +- yellowstone-grpc-client/src/lib.rs | 6 ++--- yellowstone-grpc-geyser/Cargo.toml | 2 +- yellowstone-grpc-geyser/src/grpc.rs | 36 +++++++++++++-------------- yellowstone-grpc-geyser/src/plugin.rs | 19 ++++++++++++-- 8 files changed, 56 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7eeaefe8..3021de62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,17 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +## 2024-08-23 + +- yellowstone-grpc-client-1.16.2+solana.2.0.5 +- yellowstone-grpc-geyser-1.16.2+solana.2.0.5 +- yellowstone-grpc-proto-1.15.0+solana.2.0.5 +- yellowstone-grpc-tools-1.0.0-rc.12+solana.2.0.5 + +### Fixes + +- geyser: fix `x-request-snapshot` handler ([#413](https://github.com/rpcpool/yellowstone-grpc/pull/413)) + ## 2024-08-22 - yellowstone-grpc-client-1.16.1+solana.2.0.5 diff --git a/Cargo.lock b/Cargo.lock index b007f757..52f7b8ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5162,7 +5162,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.16.1+solana.2.0.5" +version = "1.16.2+solana.2.0.5" dependencies = [ "bytes", "futures", @@ -5199,7 +5199,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "1.16.1+solana.2.0.5" +version = "1.16.2+solana.2.0.5" dependencies = [ "agave-geyser-plugin-interface", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index f88de695..b6f48774 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,8 +2,8 @@ resolver = "2" members = [ "examples/rust", # 1.14.1+solana.2.0.5 - "yellowstone-grpc-client", # 1.16.1+solana.2.0.5 - "yellowstone-grpc-geyser", # 1.16.1+solana.2.0.5 + "yellowstone-grpc-client", # 1.16.2+solana.2.0.5 + "yellowstone-grpc-geyser", # 1.16.2+solana.2.0.5 "yellowstone-grpc-proto", # 1.15.0+solana.2.0.5 "yellowstone-grpc-tools", # 1.0.0-rc.12+solana.2.0.5 ] @@ -71,7 +71,7 @@ tracing = "0.1.37" tracing-subscriber = "0.3.17" uuid = "1.8.0" vergen = "9.0.0" -yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.16.1+solana.2.0.5" } +yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.16.2+solana.2.0.5" } yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.15.0+solana.2.0.5", default-features = false } [profile.release] diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index 186cf086..8cf4027b 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "1.16.1+solana.2.0.5" +version = "1.16.2+solana.2.0.5" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Simple Client" diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index c3b903ea..46f3d5b2 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -305,11 +305,11 @@ impl GeyserGrpcBuilder { } // Include `x-request-snapshot` - pub fn set_x_request_snapshot(self, value: bool) -> GeyserGrpcBuilderResult { - Ok(Self { + pub fn set_x_request_snapshot(self, value: bool) -> Self { + Self { x_request_snapshot: value, ..self - }) + } } // Endpoint options diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 0a416c84..d6da8fc4 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "1.16.1+solana.2.0.5" +version = "1.16.2+solana.2.0.5" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Plugin" diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 6f1a5721..eb0a1233 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1122,7 +1122,6 @@ impl GrpcService { async fn client_loop( id: usize, endpoint: String, - x_request_snapshot: bool, config_filters: Arc, stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, @@ -1157,19 +1156,17 @@ impl GrpcService { info!("client #{id}: new"); let mut is_alive = true; - if x_request_snapshot { - if let Some(snapshot_rx) = snapshot_rx.take() { - Self::client_loop_snapshot( - id, - &endpoint, - &stream_tx, - &mut client_rx, - snapshot_rx, - &mut is_alive, - &mut filter, - ) - .await; - } + if let Some(snapshot_rx) = snapshot_rx.take() { + Self::client_loop_snapshot( + id, + &endpoint, + &stream_tx, + &mut client_rx, + snapshot_rx, + &mut is_alive, + &mut filter, + ) + .await; } if is_alive { @@ -1273,7 +1270,6 @@ impl GrpcService { if stream_tx.send(Ok(msg)).await.is_err() { error!("client #{id}: stream closed"); *is_alive = false; - break; } continue; } @@ -1332,7 +1328,13 @@ impl Geyser for GrpcService { mut request: Request>, ) -> TonicResult> { let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed); - let snapshot_rx = self.snapshot_rx.lock().await.take(); + + let x_request_snapshot = request.metadata().contains_key("x-request-snapshot"); + let snapshot_rx = if x_request_snapshot { + self.snapshot_rx.lock().await.take() + } else { + None + }; let (stream_tx, stream_rx) = mpsc::channel(if snapshot_rx.is_some() { self.config_snapshot_client_channel_capacity } else { @@ -1378,7 +1380,6 @@ impl Geyser for GrpcService { .get("x-endpoint") .and_then(|h| h.to_str().ok().map(|s| s.to_string())) .unwrap_or_else(|| "".to_owned()); - let x_request_snapshot = request.metadata().contains_key("x-request-snapshot"); let config_filters = Arc::clone(&self.config_filters); let incoming_stream_tx = stream_tx.clone(); @@ -1425,7 +1426,6 @@ impl Geyser for GrpcService { tokio::spawn(Self::client_loop( id, endpoint, - x_request_snapshot, Arc::clone(&self.config_filters), stream_tx, client_rx, diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 4846037d..65730ef1 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -9,7 +9,14 @@ use { ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, }, - std::{concat, env, sync::Arc, time::Duration}, + std::{ + concat, env, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, + }, tokio::{ runtime::{Builder, Runtime}, sync::{mpsc, Notify}, @@ -20,6 +27,7 @@ use { pub struct PluginInner { runtime: Runtime, snapshot_channel: Option>>, + snapshot_channel_closed: AtomicBool, grpc_channel: mpsc::UnboundedSender>, grpc_shutdown: Arc, prometheus: PrometheusService, @@ -94,6 +102,7 @@ impl GeyserPlugin for Plugin { self.inner = Some(PluginInner { runtime, snapshot_channel, + snapshot_channel_closed: AtomicBool::new(false), grpc_channel, grpc_shutdown, prometheus, @@ -133,7 +142,13 @@ impl GeyserPlugin for Plugin { if let Some(channel) = &inner.snapshot_channel { match channel.send(Some(message)) { Ok(()) => MESSAGE_QUEUE_SIZE.inc(), - Err(_) => panic!("failed to send message to startup queue: channel closed"), + Err(_) => { + if !inner.snapshot_channel_closed.swap(true, Ordering::Relaxed) { + log::error!( + "failed to send message to startup queue: channel closed" + ) + } + } } } } else {