Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
niks3089 committed Apr 10, 2024
1 parent 41e9838 commit 7190884
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 113 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ prometheus = "0.13.2"
prost = "0.12.1"
protobuf-src = "1.1.0"
rdkafka = "0.34.0"
scopeguard = "1.2.0"
serde = "1.0.145"
serde_json = "1.0.86"
serde_yaml = "0.9.25"
Expand Down
1 change: 0 additions & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ hyper = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
prometheus = { workspace = true }
scopeguard = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
Expand Down
144 changes: 52 additions & 92 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ use {
crate::{
config::{ConfigBlockFailAction, ConfigGrpc},
filters::{Filter, FilterAccountsDataSlice},
prom::{
self, BLOCK_UPDATE_HISTOGRAM, CONNECTIONS_TOTAL, GEYSER_LOOP_HISTOGRAM,
MESSAGE_QUEUE_SIZE, REMOVE_OUTDATED_HISTOGRAM, SNAPSHOT_MESSAGE_QUEUE_SIZE,
UPDATE_RECONSTRUCTION_HISTOGRAM,
},
prom::{self, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE, SNAPSHOT_MESSAGE_QUEUE_SIZE},
version::GrpcVersionInfo,
},
log::{error, info},
Expand Down Expand Up @@ -767,16 +763,16 @@ impl GrpcService {
.max_decoding_message_size(max_decoding_message_size);

// Run geyser message loop
// let geyser_service = self.clone(); // Clone the GrpcService or otherwise capture the needed context
let geyser_blocks_meta_tx = blocks_meta_tx.clone();
let geyser_broadcast_tx = broadcast_tx.clone();
let geyser_block_fail_action = block_fail_action;
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
thread::spawn(move || {
let rt = Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.expect("Failed to create a new Tokio runtime");
.expect("Failed to create a new runtime for geyser loop");

rt.block_on(async {
Self::geyser_loop(
Expand All @@ -789,13 +785,6 @@ impl GrpcService {
});
});

// tokio::spawn(Self::geyser_loop(
// messages_rx,
// blocks_meta_tx,
// broadcast_tx,
// block_fail_action,
// ));

// Run Server
let shutdown = Arc::new(Notify::new());
let shutdown_grpc = Arc::clone(&shutdown);
Expand Down Expand Up @@ -833,103 +822,75 @@ impl GrpcService {
loop {
tokio::select! {
Some(message) = messages_rx.recv() => {
let start_time = Instant::now();
let _guard = scopeguard::guard((), |_| {
let elapsed = start_time.elapsed().as_nanos();
GEYSER_LOOP_HISTOGRAM.observe(elapsed as f64 / 1_000_000_000.0);
});
MESSAGE_QUEUE_SIZE.dec();

{
let block_update_start = Instant::now();
let _block_update_guard = scopeguard::guard((), |_| {
let elapsed = block_update_start.elapsed().as_nanos();
BLOCK_UPDATE_HISTOGRAM.observe(elapsed as f64 / 1_000_000_000.0);
});

// Update blocks info
if let Some(blocks_meta_tx) = &blocks_meta_tx {
if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.clone());
}
// Update blocks info
if let Some(blocks_meta_tx) = &blocks_meta_tx {
if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.clone());
}
}

// Remove outdated block reconstruction info
{
let remove_outdated_start = Instant::now();
let _remove_outdated_guard = scopeguard::guard((), |_| {
let elapsed = remove_outdated_start.elapsed().as_nanos();
REMOVE_OUTDATED_HISTOGRAM.observe(elapsed as f64 / 1_000_000_000.0);
});

match &message {
// On startup we can receive few Confirmed/Finalized slots without BlockMeta message
// With saved first Processed slot we can ignore errors caused by startup process
Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => {
processed_first_slot = Some(msg.slot);
}
Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => {
// keep extra 10 slots
if let Some(msg_slot) = msg.slot.checked_sub(10) {
loop {
match messages.keys().next().cloned() {
Some(slot) if slot < msg_slot => {
if let Some(slot_messages) = messages.remove(&slot) {
match processed_first_slot {
Some(processed_first) if slot <= processed_first => continue,
None => continue,
_ => {}
match &message {
// On startup we can receive few Confirmed/Finalized slots without BlockMeta message
// With saved first Processed slot we can ignore errors caused by startup process
Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => {
processed_first_slot = Some(msg.slot);
}
Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => {
// keep extra 10 slots
if let Some(msg_slot) = msg.slot.checked_sub(10) {
loop {
match messages.keys().next().cloned() {
Some(slot) if slot < msg_slot => {
if let Some(slot_messages) = messages.remove(&slot) {
match processed_first_slot {
Some(processed_first) if slot <= processed_first => continue,
None => continue,
_ => {}
}

if !slot_messages.sealed && slot_messages.finalized_at.is_some() {
let mut reasons = vec![];
if let Some(block_meta) = slot_messages.block_meta {
let block_txn_count = block_meta.executed_transaction_count as usize;
let msg_txn_count = slot_messages.transactions.len();
if block_txn_count != msg_txn_count {
reasons.push("InvalidTxnCount");
error!("failed to reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}");
}
let block_entries_count = block_meta.entries_count as usize;
let msg_entries_count = slot_messages.entries.len();
if block_entries_count != msg_entries_count {
reasons.push("InvalidEntriesCount");
error!("failed to reconstruct #{slot} -- entries count: {block_entries_count} vs {msg_entries_count}");
}
} else {
reasons.push("NoBlockMeta");
}
let reason = reasons.join(",");

if !slot_messages.sealed && slot_messages.finalized_at.is_some() {
let mut reasons = vec![];
if let Some(block_meta) = slot_messages.block_meta {
let block_txn_count = block_meta.executed_transaction_count as usize;
let msg_txn_count = slot_messages.transactions.len();
if block_txn_count != msg_txn_count {
reasons.push("InvalidTxnCount");
error!("failed to reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}");
}
let block_entries_count = block_meta.entries_count as usize;
let msg_entries_count = slot_messages.entries.len();
if block_entries_count != msg_entries_count {
reasons.push("InvalidEntriesCount");
error!("failed to reconstruct #{slot} -- entries count: {block_entries_count} vs {msg_entries_count}");
}
} else {
reasons.push("NoBlockMeta");
prom::update_invalid_blocks(format!("failed reconstruct {reason}"));
match block_fail_action {
ConfigBlockFailAction::Log => {
error!("failed reconstruct #{slot} {reason}");
}
let reason = reasons.join(",");

prom::update_invalid_blocks(format!("failed reconstruct {reason}"));
match block_fail_action {
ConfigBlockFailAction::Log => {
error!("failed reconstruct #{slot} {reason}");
}
ConfigBlockFailAction::Panic => {
panic!("failed reconstruct #{slot} {reason}");
}
ConfigBlockFailAction::Panic => {
panic!("failed reconstruct #{slot} {reason}");
}
}
}
}
_ => break,
}
_ => break,
}
}
}
_ => {}
}
_ => {}
}

{
let update_reconstruction_start = Instant::now();
let _update_reconstruction_guard = scopeguard::guard((), |_| {
let elapsed = update_reconstruction_start.elapsed().as_nanos();
UPDATE_RECONSTRUCTION_HISTOGRAM.observe(elapsed as f64 / 1_000_000_000.0);
});

// Update block reconstruction info
let slot_messages = messages.entry(message.get_slot()).or_default();
if !matches!(message, Message::Slot(_)) {
Expand Down Expand Up @@ -1091,7 +1052,6 @@ impl GrpcService {
}
}
}
}
() = &mut processed_sleep => {
if !processed_messages.is_empty() {
let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
Expand Down
18 changes: 0 additions & 18 deletions yellowstone-grpc-geyser/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,9 @@ lazy_static::lazy_static! {
"snapshot_message_queue_size", "Size of snapshot message queue"
).unwrap();

pub static ref GEYSER_LOOP_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("geyser_loop_histogram", "Processing loop time")
).unwrap();

pub static ref CONNECTIONS_TOTAL: IntGauge = IntGauge::new(
"connections_total", "Total number of connections to GRPC service"
).unwrap();

pub static ref BLOCK_UPDATE_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("block_update_histogram", "block loop time")
).unwrap();
pub static ref REMOVE_OUTDATED_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("remove_outdated_histogram", "outdated loop time")
).unwrap();
pub static ref UPDATE_RECONSTRUCTION_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("update_reconstruction_histogram", "reconstruction loop time")
).unwrap();
}

#[derive(Debug)]
Expand All @@ -88,10 +74,6 @@ impl PrometheusService {
register!(CONNECTIONS_TOTAL);
register!(INCOMING_MESSAGES_COUNTER);
register!(SNAPSHOT_MESSAGE_QUEUE_SIZE);
register!(GEYSER_LOOP_HISTOGRAM);
register!(BLOCK_UPDATE_HISTOGRAM);
register!(REMOVE_OUTDATED_HISTOGRAM);
register!(UPDATE_RECONSTRUCTION_HISTOGRAM);

VERSION
.with_label_values(&[
Expand Down

0 comments on commit 7190884

Please sign in to comment.