From 71908843304c74b706384b8a15c30d1d91761da4 Mon Sep 17 00:00:00 2001 From: niks3089 Date: Wed, 10 Apr 2024 11:05:14 +0530 Subject: [PATCH] Cleanup code --- Cargo.lock | 1 - Cargo.toml | 1 - yellowstone-grpc-geyser/Cargo.toml | 1 - yellowstone-grpc-geyser/src/grpc.rs | 144 ++++++++++------------------ yellowstone-grpc-geyser/src/prom.rs | 18 ---- 5 files changed, 52 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 702c90a8..5a54aa8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5227,7 +5227,6 @@ dependencies = [ "lazy_static", "log", "prometheus", - "scopeguard", "serde", "serde_json", "solana-geyser-plugin-interface", diff --git a/Cargo.toml b/Cargo.toml index 920c6937..b853f010 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,6 @@ prometheus = "0.13.2" prost = "0.12.1" protobuf-src = "1.1.0" rdkafka = "0.34.0" -scopeguard = "1.2.0" serde = "1.0.145" serde_json = "1.0.86" serde_yaml = "0.9.25" diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 234c6cd7..8cac576e 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -29,7 +29,6 @@ hyper = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } prometheus = { workspace = true } -scopeguard = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } solana-geyser-plugin-interface = { workspace = true } diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 56851050..ea6155c2 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -2,11 +2,7 @@ use { crate::{ config::{ConfigBlockFailAction, ConfigGrpc}, filters::{Filter, FilterAccountsDataSlice}, - prom::{ - self, BLOCK_UPDATE_HISTOGRAM, CONNECTIONS_TOTAL, GEYSER_LOOP_HISTOGRAM, - MESSAGE_QUEUE_SIZE, REMOVE_OUTDATED_HISTOGRAM, SNAPSHOT_MESSAGE_QUEUE_SIZE, - UPDATE_RECONSTRUCTION_HISTOGRAM, - }, + prom::{self, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE, SNAPSHOT_MESSAGE_QUEUE_SIZE}, version::GrpcVersionInfo, }, log::{error, info}, @@ -767,16 +763,16 @@ impl GrpcService { .max_decoding_message_size(max_decoding_message_size); // Run geyser message loop - // let geyser_service = self.clone(); // Clone the GrpcService or otherwise capture the needed context let geyser_blocks_meta_tx = blocks_meta_tx.clone(); let geyser_broadcast_tx = broadcast_tx.clone(); let geyser_block_fail_action = block_fail_action; let (messages_tx, messages_rx) = mpsc::unbounded_channel(); thread::spawn(move || { let rt = Builder::new_multi_thread() + .worker_threads(4) .enable_all() .build() - .expect("Failed to create a new Tokio runtime"); + .expect("Failed to create a new runtime for geyser loop"); rt.block_on(async { Self::geyser_loop( @@ -789,13 +785,6 @@ impl GrpcService { }); }); - // tokio::spawn(Self::geyser_loop( - // messages_rx, - // blocks_meta_tx, - // broadcast_tx, - // block_fail_action, - // )); - // Run Server let shutdown = Arc::new(Notify::new()); let shutdown_grpc = Arc::clone(&shutdown); @@ -833,103 +822,75 @@ impl GrpcService { loop { tokio::select! { Some(message) = messages_rx.recv() => { - let start_time = Instant::now(); - let _guard = scopeguard::guard((), |_| { - let elapsed = start_time.elapsed().as_nanos(); - GEYSER_LOOP_HISTOGRAM.observe(elapsed as f64 / 1_000_000_000.0); - }); MESSAGE_QUEUE_SIZE.dec(); - { - let block_update_start = Instant::now(); - let _block_update_guard = scopeguard::guard((), |_| { - let elapsed = block_update_start.elapsed().as_nanos(); - BLOCK_UPDATE_HISTOGRAM.observe(elapsed as f64 / 1_000_000_000.0); - }); - - // Update blocks info - if let Some(blocks_meta_tx) = &blocks_meta_tx { - if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) { - let _ = blocks_meta_tx.send(message.clone()); - } + // Update blocks info + if let Some(blocks_meta_tx) = &blocks_meta_tx { + if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) { + let _ = blocks_meta_tx.send(message.clone()); } } // Remove outdated block reconstruction info - { - let remove_outdated_start = Instant::now(); - let _remove_outdated_guard = scopeguard::guard((), |_| { - let elapsed = remove_outdated_start.elapsed().as_nanos(); - REMOVE_OUTDATED_HISTOGRAM.observe(elapsed as f64 / 1_000_000_000.0); - }); - - match &message { - // On startup we can receive few Confirmed/Finalized slots without BlockMeta message - // With saved first Processed slot we can ignore errors caused by startup process - Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => { - processed_first_slot = Some(msg.slot); - } - Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => { - // keep extra 10 slots - if let Some(msg_slot) = msg.slot.checked_sub(10) { - loop { - match messages.keys().next().cloned() { - Some(slot) if slot < msg_slot => { - if let Some(slot_messages) = messages.remove(&slot) { - match processed_first_slot { - Some(processed_first) if slot <= processed_first => continue, - None => continue, - _ => {} + match &message { + // On startup we can receive few Confirmed/Finalized slots without BlockMeta message + // With saved first Processed slot we can ignore errors caused by startup process + Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => { + processed_first_slot = Some(msg.slot); + } + Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => { + // keep extra 10 slots + if let Some(msg_slot) = msg.slot.checked_sub(10) { + loop { + match messages.keys().next().cloned() { + Some(slot) if slot < msg_slot => { + if let Some(slot_messages) = messages.remove(&slot) { + match processed_first_slot { + Some(processed_first) if slot <= processed_first => continue, + None => continue, + _ => {} + } + + if !slot_messages.sealed && slot_messages.finalized_at.is_some() { + let mut reasons = vec![]; + if let Some(block_meta) = slot_messages.block_meta { + let block_txn_count = block_meta.executed_transaction_count as usize; + let msg_txn_count = slot_messages.transactions.len(); + if block_txn_count != msg_txn_count { + reasons.push("InvalidTxnCount"); + error!("failed to reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}"); + } + let block_entries_count = block_meta.entries_count as usize; + let msg_entries_count = slot_messages.entries.len(); + if block_entries_count != msg_entries_count { + reasons.push("InvalidEntriesCount"); + error!("failed to reconstruct #{slot} -- entries count: {block_entries_count} vs {msg_entries_count}"); + } + } else { + reasons.push("NoBlockMeta"); } + let reason = reasons.join(","); - if !slot_messages.sealed && slot_messages.finalized_at.is_some() { - let mut reasons = vec![]; - if let Some(block_meta) = slot_messages.block_meta { - let block_txn_count = block_meta.executed_transaction_count as usize; - let msg_txn_count = slot_messages.transactions.len(); - if block_txn_count != msg_txn_count { - reasons.push("InvalidTxnCount"); - error!("failed to reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}"); - } - let block_entries_count = block_meta.entries_count as usize; - let msg_entries_count = slot_messages.entries.len(); - if block_entries_count != msg_entries_count { - reasons.push("InvalidEntriesCount"); - error!("failed to reconstruct #{slot} -- entries count: {block_entries_count} vs {msg_entries_count}"); - } - } else { - reasons.push("NoBlockMeta"); + prom::update_invalid_blocks(format!("failed reconstruct {reason}")); + match block_fail_action { + ConfigBlockFailAction::Log => { + error!("failed reconstruct #{slot} {reason}"); } - let reason = reasons.join(","); - - prom::update_invalid_blocks(format!("failed reconstruct {reason}")); - match block_fail_action { - ConfigBlockFailAction::Log => { - error!("failed reconstruct #{slot} {reason}"); - } - ConfigBlockFailAction::Panic => { - panic!("failed reconstruct #{slot} {reason}"); - } + ConfigBlockFailAction::Panic => { + panic!("failed reconstruct #{slot} {reason}"); } } } } - _ => break, } + _ => break, } } } - _ => {} } + _ => {} } - { - let update_reconstruction_start = Instant::now(); - let _update_reconstruction_guard = scopeguard::guard((), |_| { - let elapsed = update_reconstruction_start.elapsed().as_nanos(); - UPDATE_RECONSTRUCTION_HISTOGRAM.observe(elapsed as f64 / 1_000_000_000.0); - }); - // Update block reconstruction info let slot_messages = messages.entry(message.get_slot()).or_default(); if !matches!(message, Message::Slot(_)) { @@ -1091,7 +1052,6 @@ impl GrpcService { } } } - } () = &mut processed_sleep => { if !processed_messages.is_empty() { let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into())); diff --git a/yellowstone-grpc-geyser/src/prom.rs b/yellowstone-grpc-geyser/src/prom.rs index aaa65fe0..1d329df1 100644 --- a/yellowstone-grpc-geyser/src/prom.rs +++ b/yellowstone-grpc-geyser/src/prom.rs @@ -46,23 +46,9 @@ lazy_static::lazy_static! { "snapshot_message_queue_size", "Size of snapshot message queue" ).unwrap(); - pub static ref GEYSER_LOOP_HISTOGRAM: Histogram = Histogram::with_opts( - HistogramOpts::new("geyser_loop_histogram", "Processing loop time") - ).unwrap(); - pub static ref CONNECTIONS_TOTAL: IntGauge = IntGauge::new( "connections_total", "Total number of connections to GRPC service" ).unwrap(); - - pub static ref BLOCK_UPDATE_HISTOGRAM: Histogram = Histogram::with_opts( - HistogramOpts::new("block_update_histogram", "block loop time") - ).unwrap(); - pub static ref REMOVE_OUTDATED_HISTOGRAM: Histogram = Histogram::with_opts( - HistogramOpts::new("remove_outdated_histogram", "outdated loop time") - ).unwrap(); - pub static ref UPDATE_RECONSTRUCTION_HISTOGRAM: Histogram = Histogram::with_opts( - HistogramOpts::new("update_reconstruction_histogram", "reconstruction loop time") - ).unwrap(); } #[derive(Debug)] @@ -88,10 +74,6 @@ impl PrometheusService { register!(CONNECTIONS_TOTAL); register!(INCOMING_MESSAGES_COUNTER); register!(SNAPSHOT_MESSAGE_QUEUE_SIZE); - register!(GEYSER_LOOP_HISTOGRAM); - register!(BLOCK_UPDATE_HISTOGRAM); - register!(REMOVE_OUTDATED_HISTOGRAM); - register!(UPDATE_RECONSTRUCTION_HISTOGRAM); VERSION .with_label_values(&[