From 46ad91e28a7905641808f7e89c8b00d6856887d5 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Mon, 2 Sep 2024 10:44:14 -0500 Subject: [PATCH] geyser: wrap message into `Box` in snapshot channel (#418) --- CHANGELOG.md | 8 ++++++++ Cargo.lock | 2 +- Cargo.toml | 2 +- yellowstone-grpc-geyser/Cargo.toml | 2 +- yellowstone-grpc-geyser/src/grpc.rs | 16 ++++++---------- yellowstone-grpc-geyser/src/plugin.rs | 20 ++++++++------------ 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17a07bed..9bfbb1d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +## 2024-09-02 + +- yellowstone-grpc-geyser-1.15.3+solana.1.18.22 + +### Features + +- geyser: wrap message into `Box` in snapshot channel ([#418](https://github.com/rpcpool/yellowstone-grpc/pull/418)) + ## 2024-08-23 - yellowstone-grpc-client-1.15.4+solana.1.18.22 diff --git a/Cargo.lock b/Cargo.lock index 5fe7437d..627e3a54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5289,7 +5289,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "1.15.2+solana.1.18.22" +version = "1.15.3+solana.1.18.22" dependencies = [ "anyhow", "base64 0.21.7", diff --git a/Cargo.toml b/Cargo.toml index af3c0fe4..ac8c0e83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = [ "examples/rust", # 1.13.0+solana.1.18.22 "yellowstone-grpc-client", # 1.15.4+solana.1.18.22 - "yellowstone-grpc-geyser", # 1.15.2+solana.1.18.22 + "yellowstone-grpc-geyser", # 1.15.3+solana.1.18.22 "yellowstone-grpc-proto", # 1.14.2+solana.1.18.22 "yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.22 ] diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 091f3a88..92eb203f 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "1.15.2+solana.1.18.22" +version = "1.15.3+solana.1.18.22" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Plugin" diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index dfda0b1c..360215b5 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -716,7 +716,7 @@ pub struct GrpcService { config_filters: Arc, blocks_meta: Option, subscribe_id: AtomicUsize, - snapshot_rx: Mutex>>>, + snapshot_rx: Mutex>>>, broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>>)>, debug_clients_tx: Option>, } @@ -729,7 +729,7 @@ impl GrpcService { debug_clients_tx: Option>, is_reload: bool, ) -> anyhow::Result<( - Option>>, + Option>>, mpsc::UnboundedSender>, Arc, )> { @@ -1113,7 +1113,7 @@ impl GrpcService { config_filters: Arc, stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, - mut snapshot_rx: Option>>, + mut snapshot_rx: Option>>, mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>>)>, debug_client_tx: Option>, drop_client: impl FnOnce(), @@ -1244,7 +1244,7 @@ impl GrpcService { endpoint: &str, stream_tx: &mpsc::Sender>, client_rx: &mut mpsc::UnboundedReceiver>, - snapshot_rx: crossbeam_channel::Receiver>, + snapshot_rx: crossbeam_channel::Receiver>, is_alive: &mut bool, filter: &mut Filter, ) { @@ -1280,18 +1280,14 @@ impl GrpcService { let message = match snapshot_rx.try_recv() { Ok(message) => { MESSAGE_QUEUE_SIZE.dec(); - match message { - Some(message) => message, - None => break, - } + message } Err(crossbeam_channel::TryRecvError::Empty) => { sleep(Duration::from_millis(1)).await; continue; } Err(crossbeam_channel::TryRecvError::Disconnected) => { - error!("client #{id}: snapshot channel disconnected"); - *is_alive = false; + info!("client #{id}: end of startup"); break; } }; diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 54973969..14854b59 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -13,7 +13,7 @@ use { concat, env, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, time::Duration, }, @@ -26,7 +26,7 @@ use { #[derive(Debug)] pub struct PluginInner { runtime: Runtime, - snapshot_channel: Option>>, + snapshot_channel: Mutex>>>, snapshot_channel_closed: AtomicBool, grpc_channel: mpsc::UnboundedSender>, grpc_shutdown: Arc, @@ -100,7 +100,7 @@ impl GeyserPlugin for Plugin { self.inner = Some(PluginInner { runtime, - snapshot_channel, + snapshot_channel: Mutex::new(snapshot_channel), snapshot_channel_closed: AtomicBool::new(false), grpc_channel, grpc_shutdown, @@ -136,10 +136,10 @@ impl GeyserPlugin for Plugin { ReplicaAccountInfoVersions::V0_0_3(info) => info, }; - let message = Message::Account((account, slot, is_startup).into()); if is_startup { - if let Some(channel) = &inner.snapshot_channel { - match channel.send(Some(message)) { + if let Some(channel) = inner.snapshot_channel.lock().unwrap().as_ref() { + let message = Message::Account((account, slot, is_startup).into()); + match channel.send(Box::new(message)) { Ok(()) => MESSAGE_QUEUE_SIZE.inc(), Err(_) => { if !inner.snapshot_channel_closed.swap(true, Ordering::Relaxed) { @@ -151,6 +151,7 @@ impl GeyserPlugin for Plugin { } } } else { + let message = Message::Account((account, slot, is_startup).into()); inner.send_message(message); } @@ -160,12 +161,7 @@ impl GeyserPlugin for Plugin { fn notify_end_of_startup(&self) -> PluginResult<()> { self.with_inner(|inner| { - if let Some(channel) = &inner.snapshot_channel { - match channel.send(None) { - Ok(()) => MESSAGE_QUEUE_SIZE.inc(), - Err(_) => panic!("failed to send message to startup queue: channel closed"), - } - } + let _snapshot_channel = inner.snapshot_channel.lock().unwrap().take(); Ok(()) }) }