From 5bf757051daaeaee60cdb55f1972ceb3c615b868 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Fri, 13 Dec 2024 14:24:40 -0500 Subject: [PATCH] proto: add `from_slot` (#477) --- CHANGELOG.md | 16 +- Cargo.lock | 8 +- Cargo.toml | 12 +- examples/rust/Cargo.toml | 2 +- examples/rust/src/bin/client.rs | 6 + examples/rust/src/bin/tx-blocktime.rs | 1 + .../solana-encoding-wasm/Cargo.lock | 2 +- .../solana-encoding-wasm/Cargo.toml | 2 +- yellowstone-grpc-client/Cargo.toml | 2 +- yellowstone-grpc-geyser/Cargo.toml | 2 +- yellowstone-grpc-geyser/config.json | 5 +- yellowstone-grpc-geyser/src/config.rs | 7 + yellowstone-grpc-geyser/src/grpc.rs | 187 +++++++++++++++--- yellowstone-grpc-proto/Cargo.toml | 2 +- yellowstone-grpc-proto/proto/geyser.proto | 1 + .../src/plugin/filter/filter.rs | 9 + 16 files changed, 212 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfe3f64f..d13b7a45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,17 +12,29 @@ The minor version will be incremented upon a breaking change and the patch versi ### Fixes +### Features + +### Breaking + +## 2024-12-13 + +- yellowstone-grpc-client-simple-4.2.0 +- yellowstone-grpc-client-4.1.0 +- yellowstone-grpc-geyser-4.2.0 +- yellowstone-grpc-proto-4.1.0 + +### Fixes + - nodejs: fix connector for custom port ([#488](https://github.com/rpcpool/yellowstone-grpc/pull/488)) - nodejs: fix connector for host/hostname ([#491](https://github.com/rpcpool/yellowstone-grpc/pull/491)) ### Features - proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474)) +- proto: add `from_slot` ([#477](https://github.com/rpcpool/yellowstone-grpc/pull/477)) - proto: add field `created_at` to update message ([#479](https://github.com/rpcpool/yellowstone-grpc/pull/479)) - nodejs: add parse err function ([#483](https://github.com/rpcpool/yellowstone-grpc/pull/483)) -### Breaking - ## 2024-12-01 - yellowstone-grpc-client-simple-4.1.0 diff --git a/Cargo.lock b/Cargo.lock index e224ec44..73d6b69d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5960,7 +5960,7 @@ checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" [[package]] name = "yellowstone-grpc-client" -version = "4.0.0" +version = "4.1.0" dependencies = [ "bytes", "futures", @@ -5973,7 +5973,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client-simple" -version = "4.1.0" +version = "4.2.0" dependencies = [ "anyhow", "backoff", @@ -5998,7 +5998,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "4.1.0" +version = "4.2.0" dependencies = [ "affinity", "agave-geyser-plugin-interface", @@ -6038,7 +6038,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "4.0.0" +version = "4.1.0" dependencies = [ "agave-geyser-plugin-interface", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 44b9280f..31dd9497 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] resolver = "2" members = [ - "examples/rust", # 4.1.0 - "yellowstone-grpc-client", # 4.0.0 - "yellowstone-grpc-geyser", # 4.1.0 - "yellowstone-grpc-proto", # 4.0.0 + "examples/rust", # 4.2.0 + "yellowstone-grpc-client", # 4.1.0 + "yellowstone-grpc-geyser", # 4.2.0 + "yellowstone-grpc-proto", # 4.1.0 ] exclude = [ "yellowstone-grpc-client-nodejs/solana-encoding-wasm", # 3.0.0 @@ -69,8 +69,8 @@ tonic = "0.12.1" tonic-build = "0.12.1" tonic-health = "0.12.1" vergen = "9.0.0" -yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "4.0.0" } -yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "4.0.0", default-features = false } +yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "4.1.0" } +yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "4.1.0", default-features = false } [workspace.lints.clippy] clone_on_ref_ptr = "deny" diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index ffa89eca..a42d47d6 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client-simple" -version = "4.1.0" +version = "4.2.0" authors = { workspace = true } edition = { workspace = true } homepage = { workspace = true } diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index fcd536b6..fbca240b 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -329,6 +329,10 @@ struct ActionSubscribe { #[clap(long)] blocks_meta: bool, + /// Re-send message from slot + #[clap(long)] + from_slot: Option, + /// Send ping in subscribe request #[clap(long)] ping: Option, @@ -522,6 +526,7 @@ impl Action { commitment: commitment.map(|x| x as i32), accounts_data_slice, ping, + from_slot: args.from_slot, }, args.resub.unwrap_or(0), args.stats, @@ -887,6 +892,7 @@ async fn geyser_subscribe( commitment: None, accounts_data_slice: Vec::default(), ping: None, + from_slot: None, }) .await .map_err(GeyserGrpcClientError::SubscribeSendError)?; diff --git a/examples/rust/src/bin/tx-blocktime.rs b/examples/rust/src/bin/tx-blocktime.rs index 4162e36b..b61ca1c0 100644 --- a/examples/rust/src/bin/tx-blocktime.rs +++ b/examples/rust/src/bin/tx-blocktime.rs @@ -111,6 +111,7 @@ async fn main() -> anyhow::Result<()> { commitment: Some(commitment as i32), accounts_data_slice: vec![], ping: None, + from_slot: None, }) .await?; diff --git a/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock b/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock index f9fa86ff..03193c23 100644 --- a/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock +++ b/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock @@ -3259,7 +3259,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "4.0.0" +version = "4.1.0" dependencies = [ "anyhow", "bincode", diff --git a/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.toml b/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.toml index ada23c27..f24589d2 100644 --- a/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.toml +++ b/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.toml @@ -16,7 +16,7 @@ crate-type = ["cdylib"] serde_json = "1.0.86" solana-transaction-status = "~2.1.1" wasm-bindgen = "0.2.95" -yellowstone-grpc-proto = { path = "../../yellowstone-grpc-proto", version = "4.0.0", default-features = false, features = ["convert"] } +yellowstone-grpc-proto = { path = "../../yellowstone-grpc-proto", version = "4.1.0", default-features = false, features = ["convert"] } [workspace.lints.clippy] clone_on_ref_ptr = "deny" diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index 168cbaa5..467c439a 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "4.0.0" +version = "4.1.0" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Simple Client" diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 9f35ae93..742246bf 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "4.1.0" +version = "4.2.0" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Plugin" diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index 180c913a..7c7373c2 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -24,8 +24,9 @@ "unary_concurrency_limit": 100, "unary_disabled": false, "x_token": null, - "filter_name_size_limit": 32, - "filter_names_size_limit": 1024, + "replay_stored_slots": 0, + "filter_name_size_limit": 128, + "filter_names_size_limit": 4096, "filter_names_cleanup_interval": "1s", "filter_limits": { "accounts": { diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index 0d6b2ec0..0c42f7e3 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -189,6 +189,9 @@ pub struct ConfigGrpc { with = "humantime_serde" )] pub filter_names_cleanup_interval: Duration, + /// Number of slots stored for re-broadcast (replay) + #[serde(default = "ConfigGrpc::default_replay_stored_slots")] + pub replay_stored_slots: u64, } impl ConfigGrpc { @@ -223,6 +226,10 @@ impl ConfigGrpc { const fn default_filter_names_cleanup_interval() -> Duration { Duration::from_secs(1) } + + const fn default_replay_stored_slots() -> u64 { + 0 + } } #[derive(Debug, Clone, Deserialize)] diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 987a546c..07dff175 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -22,7 +22,7 @@ use { tokio::{ fs, runtime::Builder, - sync::{broadcast, mpsc, Mutex, Notify, RwLock, Semaphore}, + sync::{broadcast, mpsc, oneshot, Mutex, Notify, RwLock, Semaphore}, task::spawn_blocking, time::{sleep, Duration, Instant}, }, @@ -251,9 +251,22 @@ impl BlockMetaStorage { } } +#[derive(Debug, Default)] +struct MessageId { + id: u64, +} + +impl MessageId { + fn next(&mut self) -> u64 { + self.id = self.id.checked_add(1).expect("message id overflow"); + self.id + } +} + #[derive(Debug, Default)] struct SlotMessages { - messages: Vec>, // Option is used for accounts with low write_version + messages: Vec>, // Option is used for accounts with low write_version + messages_slots: Vec<(u64, Message)>, block_meta: Option>, transactions: Vec>, accounts_dedup: HashMap, // (write_version, message_index) @@ -268,7 +281,7 @@ struct SlotMessages { } impl SlotMessages { - pub fn try_seal(&mut self) -> Option { + pub fn try_seal(&mut self, msgid_gen: &mut MessageId) -> Option<(u64, Message)> { if !self.sealed { if let Some(block_meta) = &self.block_meta { let executed_transaction_count = block_meta.executed_transaction_count as usize; @@ -287,17 +300,18 @@ impl SlotMessages { let mut accounts = Vec::with_capacity(self.messages.len()); for item in self.messages.iter().flatten() { - if let Message::Account(account) = item { + if let (_msgid, Message::Account(account)) = item { accounts.push(Arc::clone(&account.account)); } } - let message = Message::Block(Arc::new(MessageBlock::new( + let message_block = Message::Block(Arc::new(MessageBlock::new( Arc::clone(block_meta), transactions, accounts, entries, ))); + let message = (msgid_gen.next(), message_block); self.messages.push(Some(message.clone())); self.sealed = true; @@ -311,6 +325,15 @@ impl SlotMessages { } } +type BroadcastedMessage = (CommitmentLevel, Arc>); + +enum ReplayedResponse { + Messages(Vec<(u64, Message)>), + Lagged(Slot), +} + +type ReplayStoredSlotsRequest = (CommitmentLevel, Slot, oneshot::Sender); + #[derive(Debug)] pub struct GrpcService { config_snapshot_client_channel_capacity: usize, @@ -319,7 +342,8 @@ pub struct GrpcService { blocks_meta: Option, subscribe_id: AtomicUsize, snapshot_rx: Mutex>>>, - broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, + broadcast_tx: broadcast::Sender, + replay_stored_slots_tx: mpsc::Sender, debug_clients_tx: Option>, filter_names: Arc>, } @@ -364,6 +388,13 @@ impl GrpcService { // Messages to clients combined by commitment let (broadcast_tx, _) = broadcast::channel(config.channel_capacity); + // attempt to prevent spam of geyser loop with capacity eq 1 + let (replay_stored_slots_tx, replay_stored_slots_rx) = + mpsc::channel(if config.replay_stored_slots == 0 { + 0 + } else { + 1 + }); // gRPC server builder with optional TLS let mut server_builder = Server::builder(); @@ -394,6 +425,7 @@ impl GrpcService { subscribe_id: AtomicUsize::new(0), snapshot_rx: Mutex::new(snapshot_rx), broadcast_tx: broadcast_tx.clone(), + replay_stored_slots_tx, debug_clients_tx, filter_names, }) @@ -422,7 +454,13 @@ impl GrpcService { .enable_all() .build() .expect("Failed to create a new runtime for geyser loop") - .block_on(Self::geyser_loop(messages_rx, blocks_meta_tx, broadcast_tx)); + .block_on(Self::geyser_loop( + messages_rx, + blocks_meta_tx, + broadcast_tx, + replay_stored_slots_rx, + config.replay_stored_slots, + )); }); // Run Server @@ -457,11 +495,14 @@ impl GrpcService { async fn geyser_loop( mut messages_rx: mpsc::UnboundedReceiver, blocks_meta_tx: Option>, - broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, + broadcast_tx: broadcast::Sender, + mut replay_stored_slots_rx: mpsc::Receiver, + replay_stored_slots: u64, ) { const PROCESSED_MESSAGES_MAX: usize = 31; const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10); + let mut msgid_gen = MessageId::default(); let mut messages: BTreeMap = Default::default(); let mut processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX); let mut processed_first_slot = None; @@ -472,6 +513,7 @@ impl GrpcService { tokio::select! { Some(message) = messages_rx.recv() => { metrics::message_queue_size_dec(); + let msgid = msgid_gen.next(); // Update metrics if let Message::Slot(slot_message) = &message { @@ -493,8 +535,8 @@ impl GrpcService { processed_first_slot = Some(msg.slot); } Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => { - // keep extra 10 slots - if let Some(msg_slot) = msg.slot.checked_sub(10) { + // keep extra 10 slots + slots for replay + if let Some(msg_slot) = msg.slot.checked_sub(10 + replay_stored_slots) { loop { match messages.keys().next().cloned() { Some(slot) if slot < msg_slot => { @@ -553,8 +595,10 @@ impl GrpcService { _ => {} } } - if !matches!(&message, Message::Slot(_)) { - slot_messages.messages.push(Some(message.clone())); + if matches!(&message, Message::Slot(_)) { + slot_messages.messages_slots.push((msgid, message.clone())); + } else { + slot_messages.messages.push(Some((msgid, message.clone()))); // If we already build Block message, new message will be a problem if slot_messages.sealed && !(matches!(&message, Message::Entry(_)) && slot_messages.entries_count == 0) { @@ -576,11 +620,11 @@ impl GrpcService { metrics::update_invalid_blocks("unexpected message: BlockMeta (duplicate)"); } slot_messages.block_meta = Some(Arc::clone(msg)); - sealed_block_msg = slot_messages.try_seal(); + sealed_block_msg = slot_messages.try_seal(&mut msgid_gen); } Message::Transaction(msg) => { slot_messages.transactions.push(Arc::clone(&msg.transaction)); - sealed_block_msg = slot_messages.try_seal(); + sealed_block_msg = slot_messages.try_seal(&mut msgid_gen); } // Dedup accounts by max write_version Message::Account(msg) => { @@ -598,7 +642,7 @@ impl GrpcService { } Message::Entry(msg) => { slot_messages.entries.push(Arc::clone(msg)); - sealed_block_msg = slot_messages.try_seal(); + sealed_block_msg = slot_messages.try_seal(&mut msgid_gen); } _ => {} } @@ -613,7 +657,7 @@ impl GrpcService { } else { None }; - messages_vec.push(message); + messages_vec.push((msgid, message)); // sometimes we do not receive all statuses if let Some((slot, status)) = slot_status { @@ -634,20 +678,21 @@ impl GrpcService { } slots.push(parent); - messages_vec.push(Message::Slot(MessageSlot { + let message_slot = Message::Slot(MessageSlot { slot: parent, parent: entry.parent_slot, status, dead_error: None, created_at: Timestamp::from(SystemTime::now()) - })); + }); + messages_vec.push((msgid_gen.next(), message_slot)); metrics::missed_status_message_inc(status); } } } for message in messages_vec.into_iter().rev() { - if let Message::Slot(slot) = &message { + if let Message::Slot(slot) = &message.1 { let (mut confirmed_messages, mut finalized_messages) = match slot.status { CommitmentLevel::Processed | CommitmentLevel::FirstShredReceived | CommitmentLevel::Completed | CommitmentLevel::CreatedBank | CommitmentLevel::Dead => { (Vec::with_capacity(1), Vec::with_capacity(1)) @@ -701,8 +746,8 @@ impl GrpcService { } else { let mut confirmed_messages = vec![]; let mut finalized_messages = vec![]; - if matches!(&message, Message::Block(_)) { - if let Some(slot_messages) = messages.get(&message.get_slot()) { + if matches!(&message.1, Message::Block(_)) { + if let Some(slot_messages) = messages.get(&message.1.get_slot()) { if let Some(confirmed_at) = slot_messages.confirmed_at { confirmed_messages.extend( slot_messages.messages.as_slice()[confirmed_at..].iter().filter_map(|x| x.clone()) @@ -748,6 +793,28 @@ impl GrpcService { } processed_sleep.as_mut().reset(Instant::now() + PROCESSED_MESSAGES_SLEEP); } + Some((commitment, replay_slot, tx)) = replay_stored_slots_rx.recv() => { + if let Some((slot, _)) = messages.first_key_value() { + if replay_slot < *slot { + let _ = tx.send(ReplayedResponse::Lagged(*slot)); + continue; + } + } + + let mut replayed_messages = Vec::with_capacity(32_768); + for (slot, messages) in messages.iter() { + if *slot >= replay_slot { + replayed_messages.extend_from_slice(&messages.messages_slots); + if commitment == CommitmentLevel::Processed + || (commitment == CommitmentLevel::Finalized && messages.finalized) + || (commitment == CommitmentLevel::Confirmed && messages.confirmed) + { + replayed_messages.extend(messages.messages.iter().filter_map(|v| v.clone())); + } + } + } + let _ = tx.send(ReplayedResponse::Messages(replayed_messages)); + } else => break, } } @@ -758,9 +825,10 @@ impl GrpcService { id: usize, endpoint: String, stream_tx: mpsc::Sender>, - mut client_rx: mpsc::UnboundedReceiver>, + mut client_rx: mpsc::UnboundedReceiver, Filter)>>, mut snapshot_rx: Option>>, - mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>)>, + mut messages_rx: broadcast::Receiver, + replay_stored_slots_tx: mpsc::Sender, debug_client_tx: Option>, drop_client: impl FnOnce(), ) { @@ -807,7 +875,7 @@ impl GrpcService { } match message { - Some(Some(filter_new)) => { + Some(Some((from_slot, filter_new))) => { if let Some(msg) = filter_new.get_pong_msg() { if stream_tx.send(Ok(msg)).await.is_err() { error!("client #{id}: stream closed"); @@ -820,6 +888,60 @@ impl GrpcService { filter = filter_new; DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter { id, filter: Box::new(filter.clone()) }); info!("client #{id}: filter updated"); + + if let Some(from_slot) = from_slot { + if replay_stored_slots_tx.max_capacity() == 0 { + info!("client #{id}: from_slot is not supported"); + tokio::spawn(async move { + let _ = stream_tx.send(Err(Status::internal("from_slot is not supported"))).await; + }); + break 'outer; + } + + let (tx, rx) = oneshot::channel(); + let commitment = filter.get_commitment_level(); + if let Err(_error) = replay_stored_slots_tx.send((commitment, from_slot, tx)).await { + error!("client #{id}: failed to send from_slot request"); + tokio::spawn(async move { + let _ = stream_tx.send(Err(Status::internal("failed to send from_slot request"))).await; + }); + break 'outer; + } + + let mut messages = match rx.await { + Ok(ReplayedResponse::Messages(messages)) => messages, + Ok(ReplayedResponse::Lagged(slot)) => { + info!("client #{id}: broadcast from {from_slot} is not available"); + tokio::spawn(async move { + let message = format!( + "broadcast from {from_slot} is not available, last available: {slot}" + ); + let _ = stream_tx.send(Err(Status::internal(message))).await; + }); + break 'outer; + }, + Err(_error) => { + error!("client #{id}: failed to get replay response"); + tokio::spawn(async move { + let _ = stream_tx.send(Err(Status::internal("failed to get replay response"))).await; + }); + break 'outer; + } + }; + + messages.sort_by_key(|msg| msg.0); + for (_msgid, message) in messages.iter() { + for message in filter.get_updates(message, Some(commitment)) { + match stream_tx.send(Ok(message)).await { + Ok(()) => {} + Err(mpsc::error::SendError(_)) => { + error!("client #{id}: stream closed"); + break 'outer; + } + } + } + } + } } Some(None) => { break 'outer; @@ -838,21 +960,21 @@ impl GrpcService { Err(broadcast::error::RecvError::Lagged(_)) => { info!("client #{id}: lagged to receive geyser messages"); tokio::spawn(async move { - let _ = stream_tx.send(Err(Status::internal("lagged"))).await; + let _ = stream_tx.send(Err(Status::internal("lagged to receive geyser messages"))).await; }); break 'outer; } }; if commitment == filter.get_commitment_level() { - for message in messages.iter() { + for (_msgid, message) in messages.iter() { for message in filter.get_updates(message, Some(commitment)) { match stream_tx.try_send(Ok(message)) { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => { - error!("client #{id}: lagged to send update"); + error!("client #{id}: lagged to send an update"); tokio::spawn(async move { - let _ = stream_tx.send(Err(Status::internal("lagged"))).await; + let _ = stream_tx.send(Err(Status::internal("lagged to send an update"))).await; }); break 'outer; } @@ -867,7 +989,7 @@ impl GrpcService { if commitment == CommitmentLevel::Processed && debug_client_tx.is_some() { for message in messages.iter() { - if let Message::Slot(slot_message) = &message { + if let Message::Slot(slot_message) = &message.1 { DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateSlot { id, slot: slot_message.slot }); } } @@ -888,7 +1010,7 @@ impl GrpcService { id: usize, endpoint: &str, stream_tx: &mpsc::Sender>, - client_rx: &mut mpsc::UnboundedReceiver>, + client_rx: &mut mpsc::UnboundedReceiver, Filter)>>, snapshot_rx: crossbeam_channel::Receiver>, is_alive: &mut bool, filter: &mut Filter, @@ -898,7 +1020,7 @@ impl GrpcService { // we start with default filter, for snapshot we need wait actual filter first while *is_alive { match client_rx.recv().await { - Some(Some(filter_new)) => { + Some(Some((_from_slot, filter_new))) => { if let Some(msg) = filter_new.get_pong_msg() { if stream_tx.send(Ok(msg)).await.is_err() { error!("client #{id}: stream closed"); @@ -1026,7 +1148,7 @@ impl Geyser for GrpcService { filter_names.try_clean(); if let Err(error) = match Filter::new(&request, &config_filter_limits, &mut filter_names) { - Ok(filter) => match incoming_client_tx.send(Some(filter)) { + Ok(filter) => match incoming_client_tx.send(Some((request.from_slot ,filter))) { Ok(()) => Ok(()), Err(error) => Err(error.to_string()), }, @@ -1059,6 +1181,7 @@ impl Geyser for GrpcService { client_rx, snapshot_rx, self.broadcast_tx.subscribe(), + self.replay_stored_slots_tx.clone(), self.debug_clients_tx.clone(), move || { notify_exit1.notify_one(); diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 9ddf7e56..1bbac116 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-proto" -version = "4.0.0" +version = "4.1.0" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Protobuf Definitions" diff --git a/yellowstone-grpc-proto/proto/geyser.proto b/yellowstone-grpc-proto/proto/geyser.proto index 94f3113b..523c02e3 100644 --- a/yellowstone-grpc-proto/proto/geyser.proto +++ b/yellowstone-grpc-proto/proto/geyser.proto @@ -38,6 +38,7 @@ message SubscribeRequest { optional CommitmentLevel commitment = 6; repeated SubscribeRequestAccountsDataSlice accounts_data_slice = 7; optional SubscribeRequestPing ping = 9; + optional uint64 from_slot = 11; } message SubscribeRequestFilterAccounts { diff --git a/yellowstone-grpc-proto/src/plugin/filter/filter.rs b/yellowstone-grpc-proto/src/plugin/filter/filter.rs index a2bbd428..e81c7c15 100644 --- a/yellowstone-grpc-proto/src/plugin/filter/filter.rs +++ b/yellowstone-grpc-proto/src/plugin/filter/filter.rs @@ -1185,6 +1185,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let limit = FilterLimits::default(); let filter = Filter::new(&config, &limit, &mut create_filter_names()); @@ -1216,6 +1217,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let mut limit = FilterLimits::default(); limit.accounts.any = false; @@ -1251,6 +1253,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let mut limit = FilterLimits::default(); limit.transactions.any = false; @@ -1285,6 +1288,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let mut limit = FilterLimits::default(); limit.transactions.any = false; @@ -1325,6 +1329,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let limit = FilterLimits::default(); let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); @@ -1389,6 +1394,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let limit = FilterLimits::default(); let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); @@ -1453,6 +1459,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let limit = FilterLimits::default(); let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); @@ -1503,6 +1510,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let limit = FilterLimits::default(); let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); @@ -1575,6 +1583,7 @@ mod tests { commitment: None, accounts_data_slice: Vec::new(), ping: None, + from_slot: None, }; let limit = FilterLimits::default(); let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap();