diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a94074f..dd94afd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The minor version will be incremented upon a breaking change and the patch versi - geyser: add `transactions_status` filter ([#310](https://github.com/rpcpool/yellowstone-grpc/pull/310)) - geyser: add metric `slot_status_plugin` ([#312](https://github.com/rpcpool/yellowstone-grpc/pull/312)) - geyser: wrap `geyser_loop` with `unconstrained` ([#313](https://github.com/rpcpool/yellowstone-grpc/pull/313)) +- geyser: wrap messages to `Arc` ([#315](https://github.com/rpcpool/yellowstone-grpc/pull/315)) ### Breaking diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 8cba4e9e..9bcf7f2b 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -655,7 +655,7 @@ impl BlockMetaStorage { #[derive(Debug, Default)] struct SlotMessages { - messages: Vec>, // Option is used for accounts with low write_version + messages: Vec>>, // Option is used for accounts with low write_version block_meta: Option, transactions: Vec, accounts_dedup: HashMap, // (write_version, message_index) @@ -667,7 +667,7 @@ struct SlotMessages { } impl SlotMessages { - pub fn try_seal(&mut self) -> Option { + pub fn try_seal(&mut self) -> Option> { if !self.sealed { if let Some(block_meta) = &self.block_meta { let executed_transaction_count = block_meta.executed_transaction_count as usize; @@ -686,15 +686,15 @@ impl SlotMessages { let mut accounts = Vec::with_capacity(self.messages.len()); for item in self.messages.iter().flatten() { - if let Message::Account(account) = item { + if let Message::Account(account) = item.as_ref() { accounts.push(account.account.clone()); } } - let message = Message::Block( + let message = Arc::new(Message::Block( (block_meta.clone(), transactions, accounts, entries).into(), - ); - self.messages.push(Some(message.clone())); + )); + self.messages.push(Some(Arc::clone(&message))); self.sealed = true; self.entries_count = entries_count; @@ -713,7 +713,7 @@ pub struct GrpcService { blocks_meta: Option, subscribe_id: AtomicUsize, snapshot_rx: Mutex>>>, - broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, + broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>>)>, } impl GrpcService { @@ -724,7 +724,7 @@ impl GrpcService { is_reload: bool, ) -> anyhow::Result<( Option>>, - mpsc::UnboundedSender, + mpsc::UnboundedSender>, Arc, )> { // Bind service address @@ -811,9 +811,9 @@ impl GrpcService { } async fn geyser_loop( - mut messages_rx: mpsc::UnboundedReceiver, + mut messages_rx: mpsc::UnboundedReceiver>, blocks_meta_tx: Option>, - broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, + broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>>)>, block_fail_action: ConfigBlockFailAction, ) { const PROCESSED_MESSAGES_MAX: usize = 31; @@ -831,19 +831,19 @@ impl GrpcService { MESSAGE_QUEUE_SIZE.dec(); // Update metrics - if let Message::Slot(slot_message) = &message { + if let Message::Slot(slot_message) = message.as_ref() { prom::update_slot_plugin_status(slot_message.status, slot_message.slot); } // Update blocks info if let Some(blocks_meta_tx) = &blocks_meta_tx { - if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) { - let _ = blocks_meta_tx.send(message.clone()); + if matches!(message.as_ref(), Message::Slot(_) | Message::BlockMeta(_)) { + let _ = blocks_meta_tx.send(message.as_ref().clone()); } } // Remove outdated block reconstruction info - match &message { + match message.as_ref() { // On startup we can receive few Confirmed/Finalized slots without BlockMeta message // With saved first Processed slot we can ignore errors caused by startup process Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => { @@ -904,11 +904,11 @@ impl GrpcService { // Update block reconstruction info let slot_messages = messages.entry(message.get_slot()).or_default(); - if !matches!(message, Message::Slot(_)) { - slot_messages.messages.push(Some(message.clone())); + if !matches!(message.as_ref(), Message::Slot(_)) { + slot_messages.messages.push(Some(Arc::clone(&message))); // If we already build Block message, new message will be a problem - if slot_messages.sealed && !(matches!(message, Message::Entry(_)) && slot_messages.entries_count == 0) { + if slot_messages.sealed && !(matches!(message.as_ref(), Message::Entry(_)) && slot_messages.entries_count == 0) { prom::update_invalid_blocks(format!("unexpected message {}", message.kind())); match block_fail_action { ConfigBlockFailAction::Log => { @@ -921,7 +921,7 @@ impl GrpcService { } } let mut sealed_block_msg = None; - match &message { + match message.as_ref() { Message::BlockMeta(msg) => { if slot_messages.block_meta.is_some() { prom::update_invalid_blocks("unexpected message: BlockMeta (duplicate)"); @@ -969,7 +969,7 @@ impl GrpcService { } for message in messages_vec { - if let Message::Slot(slot) = message { + if let Message::Slot(slot) = message.as_ref() { let (mut confirmed_messages, mut finalized_messages) = match slot.status { CommitmentLevel::Processed => { (Vec::with_capacity(1), Vec::with_capacity(1)) @@ -1003,7 +1003,7 @@ impl GrpcService { }; // processed - processed_messages.push(message.clone()); + processed_messages.push(Arc::clone(&message)); let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into())); processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX); @@ -1012,7 +1012,7 @@ impl GrpcService { .reset(Instant::now() + PROCESSED_MESSAGES_SLEEP); // confirmed - confirmed_messages.push(message.clone()); + confirmed_messages.push(Arc::clone(&message)); let _ = broadcast_tx.send((CommitmentLevel::Confirmed, confirmed_messages.into())); @@ -1023,7 +1023,7 @@ impl GrpcService { } else { let mut confirmed_messages = vec![]; let mut finalized_messages = vec![]; - if matches!(message, Message::Block(_)) { + if matches!(message.as_ref(), Message::Block(_)) { if let Some(slot_messages) = messages.get(&message.get_slot()) { if let Some(confirmed_at) = slot_messages.confirmed_at { confirmed_messages.extend( @@ -1081,7 +1081,7 @@ impl GrpcService { stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, mut snapshot_rx: Option>>, - mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>)>, + mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>>)>, drop_client: impl FnOnce(), ) { CONNECTIONS_TOTAL.inc(); diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 7415cb7a..a5359b25 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -27,14 +27,14 @@ use { pub struct PluginInner { runtime: Runtime, snapshot_channel: Option>>, - grpc_channel: mpsc::UnboundedSender, + grpc_channel: mpsc::UnboundedSender>, grpc_shutdown: Arc, prometheus: PrometheusService, } impl PluginInner { fn send_message(&self, message: Message) { - if self.grpc_channel.send(message).is_ok() { + if self.grpc_channel.send(Arc::new(message)).is_ok() { MESSAGE_QUEUE_SIZE.inc(); } }