From 693a10fb72681bd6b6f252812ae8173016e37e4e Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 15 Dec 2024 12:09:18 -0500 Subject: [PATCH] geyser: fix `replay_stored_slots` (#496) --- CHANGELOG.md | 8 ++++++++ Cargo.lock | 2 +- Cargo.toml | 2 +- yellowstone-grpc-geyser/Cargo.toml | 2 +- yellowstone-grpc-geyser/config.json | 2 +- yellowstone-grpc-geyser/src/config.rs | 4 ++-- yellowstone-grpc-geyser/src/grpc.rs | 24 +++++++++++++----------- 7 files changed, 27 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dee78913..0483f285 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +## 2024-12-15 + +- yellowstone-grpc-geyser-3.2.1 + +### Fixes + +- geyser: fix `replay_stored_slots` ([#496](https://github.com/rpcpool/yellowstone-grpc/pull/496)) + ## 2024-12-13 - yellowstone-grpc-client-simple-3.2.0 diff --git a/Cargo.lock b/Cargo.lock index 664564b4..b1b2f9b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5052,7 +5052,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "3.2.0" +version = "3.2.1" dependencies = [ "affinity", "agave-geyser-plugin-interface", diff --git a/Cargo.toml b/Cargo.toml index 57cac968..7db44c01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = [ "examples/rust", # 3.2.0 "yellowstone-grpc-client", # 3.1.0 - "yellowstone-grpc-geyser", # 3.2.0 + "yellowstone-grpc-geyser", # 3.2.1 "yellowstone-grpc-proto", # 3.1.0 ] exclude = [ diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 05cbf15c..67927723 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "3.2.0" +version = "3.2.1" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Plugin" diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index 71cfa6fc..04a0569f 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -1,5 +1,5 @@ { - "libpath": "../target/debug/libyellowstone_grpc_geyser.so", + "libpath": "../target/release/libyellowstone_grpc_geyser.so", "log": { "level": "info" }, diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index c2faf724..0b5e9344 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -194,9 +194,9 @@ pub struct ConfigGrpc { pub replay_stored_slots: u64, #[serde(default)] pub server_http2_adaptive_window: Option, - #[serde(with = "humantime_serde")] + #[serde(default, with = "humantime_serde")] pub server_http2_keepalive_interval: Option, - #[serde(with = "humantime_serde")] + #[serde(default, with = "humantime_serde")] pub server_http2_keepalive_timeout: Option, #[serde(default)] pub server_initial_connection_window_size: Option, diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index fde7d139..914de4cd 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -327,7 +327,7 @@ pub struct GrpcService { subscribe_id: AtomicUsize, snapshot_rx: Mutex>>>, broadcast_tx: broadcast::Sender, - replay_stored_slots_tx: mpsc::Sender, + replay_stored_slots_tx: Option>, debug_clients_tx: Option>, filter_names: Arc>, } @@ -373,12 +373,12 @@ impl GrpcService { // Messages to clients combined by commitment let (broadcast_tx, _) = broadcast::channel(config.channel_capacity); // attempt to prevent spam of geyser loop with capacity eq 1 - let (replay_stored_slots_tx, replay_stored_slots_rx) = - mpsc::channel(if config.replay_stored_slots == 0 { - 0 - } else { - 1 - }); + let (replay_stored_slots_tx, replay_stored_slots_rx) = if config.replay_stored_slots == 0 { + (None, None) + } else { + let (tx, rx) = mpsc::channel(1); + (Some(tx), Some(rx)) + }; // gRPC server builder with optional TLS let mut server_builder = Server::builder(); @@ -495,7 +495,7 @@ impl GrpcService { mut messages_rx: mpsc::UnboundedReceiver, blocks_meta_tx: Option>, broadcast_tx: broadcast::Sender, - mut replay_stored_slots_rx: mpsc::Receiver, + replay_stored_slots_rx: Option>, replay_stored_slots: u64, ) { const PROCESSED_MESSAGES_MAX: usize = 31; @@ -507,6 +507,8 @@ impl GrpcService { let mut processed_first_slot = None; let processed_sleep = sleep(PROCESSED_MESSAGES_SLEEP); tokio::pin!(processed_sleep); + let (_tx, rx) = mpsc::channel(1); + let mut replay_stored_slots_rx = replay_stored_slots_rx.unwrap_or(rx); loop { tokio::select! { @@ -825,7 +827,7 @@ impl GrpcService { mut client_rx: mpsc::UnboundedReceiver, Filter)>>, mut snapshot_rx: Option>>, mut messages_rx: broadcast::Receiver, - replay_stored_slots_tx: mpsc::Sender, + replay_stored_slots_tx: Option>, debug_client_tx: Option>, drop_client: impl FnOnce(), ) { @@ -887,13 +889,13 @@ impl GrpcService { info!("client #{id}: filter updated"); if let Some(from_slot) = from_slot { - if replay_stored_slots_tx.max_capacity() == 0 { + let Some(replay_stored_slots_tx) = &replay_stored_slots_tx else { info!("client #{id}: from_slot is not supported"); tokio::spawn(async move { let _ = stream_tx.send(Err(Status::internal("from_slot is not supported"))).await; }); break 'outer; - } + }; let (tx, rx) = oneshot::channel(); let commitment = filter.get_commitment_level();