diff --git a/Cargo.lock b/Cargo.lock
index 15dd2108..a0ca17e9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -209,9 +209,9 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"

 [[package]]
 name = "arc-swap"
-version = "1.6.0"
+version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
+checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"

 [[package]]
 name = "ark-bn254"
@@ -2570,6 +2570,7 @@ dependencies = [
 name = "jito-relayer"
 version = "0.1.14"
 dependencies = [
+ "arc-swap",
  "chrono",
  "crossbeam-channel",
  "dashmap 5.5.3",
diff --git a/Cargo.toml b/Cargo.toml
index 879e55e0..009c961b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ license = "Apache-2.0"
 edition = "2021"

 [workspace.dependencies]
+arc-swap = "1.7.1"
 axum = "0.5.17"
 bincode = "1.3.3"
 bytes = "1.4.0"
diff --git a/block_engine/src/block_engine.rs b/block_engine/src/block_engine.rs
index 699cf11c..83dbf946 100644
--- a/block_engine/src/block_engine.rs
+++ b/block_engine/src/block_engine.rs
@@ -384,7 +384,7 @@ impl BlockEngineRelayerHandler {
         let mut heartbeat_interval = interval(Duration::from_millis(500));
         let mut auth_refresh_interval = interval(Duration::from_secs(60));
-        let mut metrics_interval = interval(Duration::from_secs(1));
+        let mut metrics_interval = interval(Duration::from_secs(10));
         let mut heartbeat_count = 0;

         while !exit.load(Ordering::Relaxed) {
diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml
index 0f2c25a7..5f17b8dc 100644
--- a/relayer/Cargo.toml
+++ b/relayer/Cargo.toml
@@ -9,6 +9,7 @@ license = { workspace = true }
 publish = false

 [dependencies]
+arc-swap = { workspace = true }
 chrono = { workspace = true }
 crossbeam-channel = { workspace = true }
 dashmap = { workspace = true }
diff --git a/relayer/src/health_manager.rs b/relayer/src/health_manager.rs
index 5bba767b..7e310333 100644
--- a/relayer/src/health_manager.rs
+++ b/relayer/src/health_manager.rs
@@ -8,7 +8,7 @@ use std::{
     time::{Duration, Instant},
 };

-use crossbeam_channel::{select, tick, Receiver, Sender};
+use crossbeam_channel::{select, tick, Receiver};
 use solana_metrics::datapoint_info;
 use solana_sdk::clock::Slot;

@@ -28,7 +28,6 @@ pub struct HealthManager {
 impl HealthManager {
     pub fn new(
         slot_receiver: Receiver<Slot>,
-        slot_sender: Sender<Slot>,
         missing_slot_unhealthy_threshold: Duration,
         exit: Arc<AtomicBool>,
     ) -> HealthManager {
@@ -39,8 +38,6 @@ impl HealthManager {
             .name("health_manager".to_string())
             .spawn(move || {
                 let mut last_update = Instant::now();
-                let mut slot_sender_max_len = 0usize;
-                let channel_len_tick = tick(Duration::from_secs(5));
                 let check_and_metrics_tick = tick(missing_slot_unhealthy_threshold / 2);

                 while !exit.load(Ordering::Relaxed) {
@@ -54,24 +51,14 @@ impl HealthManager {
                             *health_state.write().unwrap() = new_health_state;
                             datapoint_info!(
                                 "relayer-health-state",
-                                ("health_state", new_health_state, i64)
+                                ("health_state", new_health_state, i64),
+                                ("slot_receiver_len", slot_receiver.len(), i64),
                             );
                         }
-                        recv(slot_receiver) -> maybe_slot => {
-                            let slot = maybe_slot.expect("error receiving slot, exiting");
-                            slot_sender.send(slot).expect("error forwarding slot, exiting");
+                        recv(slot_receiver) -> _maybe_slot => {
                             last_update = Instant::now();
                         }
-                        recv(channel_len_tick) -> _ => {
-                            datapoint_info!(
-                                "health_manager-channel_stats",
-                                ("slot_sender_len", slot_sender_max_len, i64),
-                                ("slot_sender_capacity", slot_sender.capacity().unwrap(), i64),
-                            );
-                            slot_sender_max_len = 0;
-                        }
                     }
-                    slot_sender_max_len = std::cmp::max(slot_sender_max_len, slot_sender.len());
                 }
             })
             .unwrap(),
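The health_manager.rs hunk reduces the slot channel to a pure liveness signal: the loop no longer forwards slots downstream, it only bumps `last_update` on receipt and lets a half-threshold tick derive the health state, folding the channel-depth metric into the same datapoint. A minimal, self-contained sketch of that watchdog pattern, with hypothetical thresholds and a `u64` standing in for `Slot`:

```rust
use std::time::{Duration, Instant};

use crossbeam_channel::{select, tick, unbounded};

fn main() {
    let (slot_sender, slot_receiver) = unbounded::<u64>();
    let threshold = Duration::from_secs(2); // stand-in for missing_slot_unhealthy_threshold
    let check_tick = tick(threshold / 2);
    let mut last_update = Instant::now();

    // stand-in for the websocket slot subscription threads
    std::thread::spawn(move || {
        for slot in 0..3u64 {
            slot_sender.send(slot).unwrap();
            std::thread::sleep(Duration::from_millis(300));
        }
    });

    for _ in 0..8 {
        select! {
            recv(check_tick) -> _ => {
                // health is derived from recency; channel depth is reported alongside
                let healthy = last_update.elapsed() <= threshold;
                println!("healthy: {healthy}, slot_receiver_len: {}", slot_receiver.len());
            }
            recv(slot_receiver) -> maybe_slot => {
                // the slot value is no longer forwarded anywhere; receipt alone proves liveness
                if maybe_slot.is_ok() {
                    last_update = Instant::now();
                }
            }
        }
    }
}
```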
("slot_receiver_len", self.slot_receiver_max_len, i64), - ("slot_receiver_capacity", self.slot_receiver_capacity, i64), ( "subscription_receiver_len", self.subscription_receiver_max_len, @@ -415,7 +377,7 @@ impl RelayerImpl { #[allow(clippy::too_many_arguments)] pub fn new( - slot_receiver: Receiver, + highest_slot: Arc, delay_packet_receiver: Receiver, leader_schedule_cache: LeaderScheduleUpdatingHandle, public_ip: IpAddr, @@ -434,7 +396,7 @@ impl RelayerImpl { let (subscription_sender, subscription_receiver) = bounded(LoadBalancer::SLOT_QUEUE_CAPACITY); - let packet_subscriptions = Arc::new(RwLock::new(HashMap::default())); + let packet_subscriptions = Arc::new(RwLock::new(HashMap::with_capacity(1_000))); let thread = { let health_state = health_state.clone(); @@ -443,7 +405,7 @@ impl RelayerImpl { .name("relayer_impl-event_loop_thread".to_string()) .spawn(move || { let res = Self::run_event_loop( - slot_receiver, + highest_slot, subscription_receiver, delay_packet_receiver, leader_schedule_cache, @@ -479,7 +441,7 @@ impl RelayerImpl { #[allow(clippy::too_many_arguments)] fn run_event_loop( - slot_receiver: Receiver, + highest_slot: Arc, subscription_receiver: Receiver, delay_packet_receiver: Receiver, leader_schedule_cache: LeaderScheduleUpdatingHandle, @@ -492,36 +454,24 @@ impl RelayerImpl { validator_packet_batch_size: usize, forward_all: bool, ) -> RelayerResult<()> { - let mut highest_slot = Slot::default(); - let heartbeat_tick = crossbeam_channel::tick(Duration::from_millis(500)); - let metrics_tick = crossbeam_channel::tick(Duration::from_millis(1000)); + let metrics_tick = crossbeam_channel::tick(Duration::from_secs(10)); let mut relayer_metrics = RelayerMetrics::new( - slot_receiver.capacity().unwrap(), subscription_receiver.capacity().unwrap(), delay_packet_receiver.capacity().unwrap(), ); - - let mut slot_leaders = HashSet::new(); + let mut last_observed_slot = highest_slot.load(Ordering::Relaxed); + let mut senders: Vec<( + Pubkey, + TokioSender>, + )> = vec![]; while !exit.load(Ordering::Relaxed) { crossbeam_channel::select! 
{ - recv(slot_receiver) -> maybe_slot => { - let start = Instant::now(); - - Self::update_highest_slot(maybe_slot, &mut highest_slot, &mut relayer_metrics)?; - - let slots: Vec<_> = (highest_slot - ..highest_slot + leader_lookahead * NUM_CONSECUTIVE_LEADER_SLOTS) - .collect(); - slot_leaders = leader_schedule_cache.leaders_for_slots(&slots); - - let _ = relayer_metrics.crossbeam_slot_receiver_processing_us.increment(start.elapsed().as_micros() as u64); - }, recv(delay_packet_receiver) -> maybe_packet_batches => { let start = Instant::now(); - let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size, forward_all)?; + let failed_forwards = Self::forward_packets(maybe_packet_batches, &senders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size)?; Self::drop_connections(failed_forwards, packet_subscriptions, &mut relayer_metrics); let _ = relayer_metrics.crossbeam_delay_packet_receiver_processing_us.increment(start.elapsed().as_micros() as u64); }, @@ -544,7 +494,7 @@ impl RelayerImpl { &mut relayer_metrics, ) }, - HealthState::Unhealthy => packet_subscriptions.read().unwrap().keys().cloned().collect(), + HealthState::Unhealthy => packet_subscriptions.read().unwrap().keys().copied().collect(), }; Self::drop_connections(pubkeys_to_drop, packet_subscriptions, &mut relayer_metrics); let _ = relayer_metrics.crossbeam_heartbeat_tick_processing_us.increment(start.elapsed().as_micros() as u64); @@ -563,18 +513,36 @@ impl RelayerImpl { relayer_metrics.report(); relayer_metrics = RelayerMetrics::new( - slot_receiver.capacity().unwrap(), subscription_receiver.capacity().unwrap(), delay_packet_receiver.capacity().unwrap(), ); } } - relayer_metrics.update_max_len( - slot_receiver.len(), - subscription_receiver.len(), - delay_packet_receiver.len(), - ); + // update senders every new slot + let new_slot = highest_slot.load(Ordering::Relaxed); + if last_observed_slot != new_slot { + last_observed_slot = new_slot; + let packet_subscriptions = packet_subscriptions.read().unwrap(); + if forward_all { + senders = packet_subscriptions + .iter() + .map(|(pk, sender)| (*pk, sender.clone())) + .collect() + } else { + let slot_leaders = + new_slot..new_slot + leader_lookahead * NUM_CONSECUTIVE_LEADER_SLOTS; + let schedule = leader_schedule_cache.get_schedule().load(); + senders = slot_leaders + .filter_map(|s| schedule.get(&s)) + .filter_map(|pubkey| { + Some((*pubkey, packet_subscriptions.get(pubkey)?.clone())) + }) + .collect() + } + } + relayer_metrics + .update_max_len(subscription_receiver.len(), delay_packet_receiver.len()); } Ok(()) } @@ -633,13 +601,14 @@ impl RelayerImpl { /// Returns pubkeys of subscribers that failed to send fn forward_packets( maybe_packet_batches: Result, - subscriptions: &PacketSubscriptions, - slot_leaders: &HashSet, + senders: &Vec<( + Pubkey, + TokioSender>, + )>, relayer_metrics: &mut RelayerMetrics, ofac_addresses: &HashSet, address_lookup_table_cache: &Arc>, validator_packet_batch_size: usize, - forward_all: bool, ) -> RelayerResult> { let packet_batches = maybe_packet_batches?; @@ -652,26 +621,22 @@ impl RelayerImpl { .banking_packet_batch .0 .iter() - .flat_map(|batch| { - batch - .iter() - .filter(|p| !p.meta().discard()) - .filter_map(|packet| { - if !ofac_addresses.is_empty() { - let tx: VersionedTransaction = packet.deserialize_slice(..).ok()?; - if !is_tx_ofac_related(&tx, ofac_addresses, 
address_lookup_table_cache) - { - Some(packet) - } else { - None - } - } else { - Some(packet) - } - }) - .filter_map(packet_to_proto_packet) + .flat_map(|batch| batch.iter().filter(|p| !p.meta().discard())) + .filter_map(|packet| { + if ofac_addresses.is_empty() { + return Some(packet); + } + let tx: VersionedTransaction = packet.deserialize_slice(..).ok()?; + if is_tx_ofac_related(&tx, ofac_addresses, address_lookup_table_cache) { + return None; + } + Some(packet) }) + .filter_map(packet_to_proto_packet) .collect(); + if packets.is_empty() { + return Ok(vec![]); + } let mut proto_packet_batches = Vec::with_capacity(packets.len() / validator_packet_batch_size); @@ -681,33 +646,19 @@ impl RelayerImpl { }); } - let l_subscriptions = subscriptions.read().unwrap(); - - let senders = if forward_all { - l_subscriptions.iter().collect::>, - )>>() - } else { - slot_leaders - .iter() - .filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender))) - .collect() - }; - let mut failed_forwards = Vec::new(); for batch in &proto_packet_batches { // NOTE: this is important to avoid divide-by-0 inside the validator if packets - // get routed to sigverify under the assumption theres > 0 packets in the batch + // get routed to sigverify under the assumption there's > 0 packets in the batch if batch.packets.is_empty() { continue; } - - for (pubkey, sender) in &senders { + let now = Timestamp::from(SystemTime::now()); + for (pubkey, sender) in senders { // try send because it's a bounded channel and we don't want to block if the channel is full match sender.try_send(Ok(SubscribePacketsResponse { header: Some(Header { - ts: Some(Timestamp::from(SystemTime::now())), + ts: Some(now.clone()), }), msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())), })) { @@ -722,7 +673,7 @@ impl RelayerImpl { } Err(TrySendError::Closed(_)) => { error!("channel is closed for pubkey: {:?}", pubkey); - failed_forwards.push(**pubkey); + failed_forwards.push(*pubkey); break; } } @@ -762,17 +713,6 @@ impl RelayerImpl { Ok(()) } - fn update_highest_slot( - maybe_slot: Result, - highest_slot: &mut Slot, - relayer_metrics: &mut RelayerMetrics, - ) -> RelayerResult<()> { - *highest_slot = maybe_slot?; - datapoint_info!("relayer-highest_slot", ("slot", *highest_slot as i64, i64)); - relayer_metrics.highest_slot = *highest_slot; - Ok(()) - } - /// Prevent validators from authenticating if the relayer is unhealthy fn check_health(health_state: &Arc>) -> Result<(), Status> { if *health_state.read().unwrap() != HealthState::Healthy { diff --git a/relayer/src/schedule_cache.rs b/relayer/src/schedule_cache.rs index 157af2d5..b04b826c 100644 --- a/relayer/src/schedule_cache.rs +++ b/relayer/src/schedule_cache.rs @@ -1,15 +1,17 @@ use std::{ collections::{HashMap, HashSet}, + ops::Range, str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, + Arc, }, thread, thread::{sleep, Builder, JoinHandle}, time::Duration, }; +use arc_swap::ArcSwap; use jito_rpc::load_balancer::LoadBalancer; use log::{debug, error}; use solana_metrics::datapoint_info; @@ -20,7 +22,7 @@ use solana_sdk::{ pub struct LeaderScheduleCacheUpdater { /// Maps slots to scheduled pubkey - schedules: Arc>>, + schedules: Arc>>, /// Refreshes leader schedule refresh_thread: JoinHandle<()>, @@ -28,34 +30,34 @@ pub struct LeaderScheduleCacheUpdater { #[derive(Clone)] pub struct LeaderScheduleUpdatingHandle { - schedule: Arc>>, + schedule: Arc>>, } /// Access handle to a constantly updating leader schedule impl LeaderScheduleUpdatingHandle { - 
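Two ideas carry the relayer.rs rewrite: leader resolution moves off the packet path (the `senders` list is rebuilt only when the shared slot advances), and fan-out keeps using `try_send` so one slow subscriber can never stall the loop. A hedged sketch of both halves with stand-in types (`u32` pubkeys, `u64` payloads, not the relayer's real structs):

```rust
use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use tokio::sync::mpsc::{channel, error::TrySendError, Sender};

type Pubkey = u32;

fn forward(batch: u64, senders: &[(Pubkey, Sender<u64>)]) -> Vec<Pubkey> {
    let mut failed = Vec::new();
    for (pubkey, sender) in senders {
        match sender.try_send(batch) {
            Ok(()) => {}
            // full channel: drop this batch for that subscriber, keep going
            Err(TrySendError::Full(_)) => {}
            // closed channel: mark the subscriber for disconnection
            Err(TrySendError::Closed(_)) => failed.push(*pubkey),
        }
    }
    failed
}

fn main() {
    let highest_slot = Arc::new(AtomicU64::new(0));
    let schedule: HashMap<u64, Pubkey> = HashMap::from([(42, 7)]);
    let (tx, mut rx) = channel::<u64>(8);
    let subscriptions: HashMap<Pubkey, Sender<u64>> = HashMap::from([(7, tx)]);

    let mut last_observed_slot = highest_slot.load(Ordering::Relaxed);
    let mut senders: Vec<(Pubkey, Sender<u64>)> = vec![];

    highest_slot.store(42, Ordering::Relaxed); // normally set by the slot threads

    // polled once per select! iteration in the real loop
    let new_slot = highest_slot.load(Ordering::Relaxed);
    if new_slot != last_observed_slot {
        last_observed_slot = new_slot;
        senders = (new_slot..new_slot + 4) // leader lookahead window
            .filter_map(|s| schedule.get(&s))
            .filter_map(|pk| Some((*pk, subscriptions.get(pk)?.clone())))
            .collect();
    }

    assert!(forward(new_slot, &senders).is_empty());
    assert_eq!(rx.try_recv().unwrap(), 42);
    let _ = last_observed_slot;
}
```

Since the rebuild body only runs when the observed slot changes, the read lock on the subscription map is taken roughly once per slot instead of once per packet batch.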
diff --git a/relayer/src/schedule_cache.rs b/relayer/src/schedule_cache.rs
index 157af2d5..b04b826c 100644
--- a/relayer/src/schedule_cache.rs
+++ b/relayer/src/schedule_cache.rs
@@ -1,15 +1,17 @@
 use std::{
     collections::{HashMap, HashSet},
+    ops::Range,
     str::FromStr,
     sync::{
         atomic::{AtomicBool, Ordering},
-        Arc, RwLock,
+        Arc,
     },
     thread,
     thread::{sleep, Builder, JoinHandle},
     time::Duration,
 };

+use arc_swap::ArcSwap;
 use jito_rpc::load_balancer::LoadBalancer;
 use log::{debug, error};
 use solana_metrics::datapoint_info;
@@ -20,7 +22,7 @@ use solana_sdk::{

 pub struct LeaderScheduleCacheUpdater {
     /// Maps slots to scheduled pubkey
-    schedules: Arc<RwLock<HashMap<Slot, Pubkey>>>,
+    schedules: Arc<ArcSwap<HashMap<Slot, Pubkey>>>,

     /// Refreshes leader schedule
     refresh_thread: JoinHandle<()>,
@@ -28,34 +30,34 @@

 #[derive(Clone)]
 pub struct LeaderScheduleUpdatingHandle {
-    schedule: Arc<RwLock<HashMap<Slot, Pubkey>>>,
+    schedule: Arc<ArcSwap<HashMap<Slot, Pubkey>>>,
 }

 /// Access handle to a constantly updating leader schedule
 impl LeaderScheduleUpdatingHandle {
-    pub fn new(schedule: Arc<RwLock<HashMap<Slot, Pubkey>>>) -> LeaderScheduleUpdatingHandle {
+    pub fn new(schedule: Arc<ArcSwap<HashMap<Slot, Pubkey>>>) -> LeaderScheduleUpdatingHandle {
         LeaderScheduleUpdatingHandle { schedule }
     }

     pub fn leader_for_slot(&self, slot: &Slot) -> Option<Pubkey> {
-        self.schedule.read().unwrap().get(slot).cloned()
+        self.schedule.load().get(slot).copied()
     }

-    pub fn leaders_for_slots(&self, slots: &[Slot]) -> HashSet<Pubkey> {
-        let schedule = self.schedule.read().unwrap();
-        slots
-            .iter()
-            .filter_map(|s| schedule.get(s).cloned())
-            .collect()
+    pub fn leaders_for_slots(&self, slots: Range<Slot>) -> HashSet<Pubkey> {
+        let schedule = self.schedule.load();
+        slots.filter_map(|s| schedule.get(&s)).copied().collect()
     }

     pub fn is_scheduled_validator(&self, pubkey: &Pubkey) -> bool {
         self.schedule
-            .read()
-            .unwrap()
+            .load()
             .iter()
             .any(|(_, scheduled_pubkey)| scheduled_pubkey == pubkey)
     }
+
+    pub fn get_schedule(&self) -> &Arc<ArcSwap<HashMap<Slot, Pubkey>>> {
+        &self.schedule
+    }
 }

 impl LeaderScheduleCacheUpdater {
@@ -63,7 +65,7 @@ impl LeaderScheduleCacheUpdater {
         load_balancer: &Arc<LoadBalancer>,
         exit: &Arc<AtomicBool>,
     ) -> LeaderScheduleCacheUpdater {
-        let schedules = Arc::new(RwLock::new(HashMap::new()));
+        let schedules = Arc::new(ArcSwap::from_pointee(HashMap::with_capacity(10_000)));
         let refresh_thread = Self::refresh_thread(schedules.clone(), load_balancer.clone(), exit);
         LeaderScheduleCacheUpdater {
             schedules,
@@ -81,7 +83,7 @@ impl LeaderScheduleCacheUpdater {
     }

     fn refresh_thread(
-        schedule: Arc<RwLock<HashMap<Slot, Pubkey>>>,
+        schedule: Arc<ArcSwap<HashMap<Slot, Pubkey>>>,
         load_balancer: Arc<LoadBalancer>,
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
@@ -92,14 +94,16 @@ impl LeaderScheduleCacheUpdater {
             while !exit.load(Ordering::Relaxed) {
                 let mut update_ok_count = 0;
                 let mut update_fail_count = 0;
+                let mut slots_in_schedule = 0;

                 match Self::update_leader_cache(&load_balancer, &schedule) {
-                    true => update_ok_count += 1,
-                    false => update_fail_count += 1,
+                    Some(count) => {
+                        update_ok_count = 1;
+                        slots_in_schedule = count
+                    }
+                    None => update_fail_count = 1,
                 }

-                let slots_in_schedule = schedule.read().unwrap().len();
-
                 datapoint_info!(
                     "schedule-cache-update",
                     ("update_ok_count", update_ok_count, i64),
@@ -115,33 +119,33 @@

     pub fn update_leader_cache(
         load_balancer: &Arc<LoadBalancer>,
-        schedule: &Arc<RwLock<HashMap<Slot, Pubkey>>>,
-    ) -> bool {
+        schedule: &Arc<ArcSwap<HashMap<Slot, Pubkey>>>,
+    ) -> Option<usize> {
         let rpc_client = load_balancer.rpc_client();
-        if let Ok(epoch_info) = rpc_client.get_epoch_info() {
-            if let Ok(Some(leader_schedule)) = rpc_client.get_leader_schedule(None) {
-                let epoch_offset = epoch_info.absolute_slot - epoch_info.slot_index;
+        let Ok(epoch_info) = rpc_client.get_epoch_info() else {
+            error!("Couldn't Get Epoch Info from RPC!!!");
+            return None;
+        };
+        let Ok(Some(leader_schedule)) = rpc_client.get_leader_schedule(None) else {
+            error!("Couldn't Get Leader Schedule Update from RPC!!!");
+            return None;
+        };

-                debug!("read leader schedule of length: {}", leader_schedule.len());
+        let epoch_offset = epoch_info.absolute_slot - epoch_info.slot_index;
+        debug!("read leader schedule of length: {}", leader_schedule.len());

-                let mut new_schedule = HashMap::with_capacity(DEFAULT_SLOTS_PER_EPOCH as usize);
-                for (pk_str, slots) in leader_schedule.iter() {
-                    for slot in slots.iter() {
-                        if let Ok(pubkey) = Pubkey::from_str(pk_str) {
-                            new_schedule.insert(*slot as u64 + epoch_offset, pubkey);
-                        }
-                    }
+        let mut new_schedule = HashMap::with_capacity(DEFAULT_SLOTS_PER_EPOCH as usize);
+        for (pk_str, slots) in leader_schedule.iter() {
+            for slot in slots.iter() {
+                if let Ok(pubkey) = Pubkey::from_str(pk_str) {
+                    new_schedule.insert(*slot as u64 + epoch_offset, pubkey);
                 }
-                *schedule.write().unwrap() = new_schedule;
-
-                return true;
-            } else {
-                error!("Couldn't Get Leader Schedule Update from RPC!!!")
-            };
-        } else {
-            error!("Couldn't Get Epoch Info from RPC!!!")
-        };
-        false
+            }
+        }
+        let schedule_size = new_schedule.len();
+        schedule.store(Arc::new(new_schedule));
+
+        Some(schedule_size)
     }
 }
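The schedule_cache.rs change swaps `RwLock<HashMap<..>>` for `arc_swap::ArcSwap`, so readers take a lock-free snapshot and the refresh thread publishes a whole replacement map in one atomic pointer swap. A minimal sketch of the pattern with toy key/value types (requires the `arc-swap` dependency added above):

```rust
use std::{collections::HashMap, sync::Arc};

use arc_swap::ArcSwap;

fn main() {
    let schedule: Arc<ArcSwap<HashMap<u64, &'static str>>> =
        Arc::new(ArcSwap::from_pointee(HashMap::new()));

    // reader: `load` returns a cheap guard; no lock is held while it's used
    let snapshot = schedule.load();
    assert!(snapshot.get(&1).is_none());

    // writer: build the replacement off to the side, then publish it atomically;
    // in-flight readers keep their old snapshot until they drop it
    let mut next = HashMap::new();
    next.insert(1u64, "leader");
    schedule.store(Arc::new(next));

    assert_eq!(schedule.load().get(&1), Some(&"leader"));
}
```

This also explains why `update_leader_cache` now returns the schedule size: the count is captured while the new map is still owned, rather than re-reading the shared state after publishing it.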
diff --git a/rpc/src/load_balancer.rs b/rpc/src/load_balancer.rs
index d97ced6a..76542fe4 100644
--- a/rpc/src/load_balancer.rs
+++ b/rpc/src/load_balancer.rs
@@ -33,7 +33,7 @@ impl LoadBalancer {
     pub fn new(
         servers: &[(String, String)], /* http rpc url, ws url */
         exit: &Arc<AtomicBool>,
-    ) -> (LoadBalancer, Receiver<Slot>) {
+    ) -> (LoadBalancer, Receiver<Slot>, Arc<AtomicU64>) {
         let server_to_slot = Arc::new(DashMap::from_iter(
             servers.iter().map(|(_, ws)| (ws.clone(), 0)),
         ));
@@ -50,14 +50,20 @@
             if let Err(e) = rpc_client.get_slot() {
                 error!("error warming up rpc: {rpc_url}. error: {e}");
             }
-            // store as ws instead of http so we can lookup by furthest ahead ws subscription
+            // store as ws instead of http, so we can look up by furthest ahead ws subscription
             (ws.clone(), rpc_client)
         }));

-        // sender tracked as health_manager-channel_stats.slot_sender_len
+        let highest_slot = Arc::new(AtomicU64::default());
+        // size tracked as relayer-health-state.slot_receiver_len
         let (slot_sender, slot_receiver) = crossbeam_channel::bounded(Self::SLOT_QUEUE_CAPACITY);
-        let subscription_threads =
-            Self::start_subscription_threads(servers, server_to_slot.clone(), slot_sender, exit);
+        let subscription_threads = Self::start_subscription_threads(
+            servers,
+            server_to_slot.clone(),
+            slot_sender,
+            highest_slot.clone(),
+            exit,
+        );
         (
             LoadBalancer {
                 server_to_slot,
@@ -65,6 +71,7 @@ impl LoadBalancer {
                 subscription_threads,
             },
             slot_receiver,
+            highest_slot,
         )
     }

@@ -72,10 +79,9 @@ impl LoadBalancer {
         servers: &[(String, String)],
         server_to_slot: Arc<DashMap<String, Slot>>,
         slot_sender: Sender<Slot>,
+        highest_slot: Arc<AtomicU64>,
         exit: &Arc<AtomicBool>,
     ) -> Vec<JoinHandle<()>> {
-        let highest_slot = Arc::new(AtomicU64::default());
-
         servers
             .iter()
             .map(|(_, websocket_url)| {
@@ -104,7 +110,6 @@ impl LoadBalancer {
                             {
                                 Ok(slot) => {
                                     last_slot_update = Instant::now();
-
                                     server_to_slot
                                         .insert(websocket_url.clone(), slot.slot);
                                     datapoint_info!(
@@ -113,15 +118,13 @@
                                         ("slot", slot.slot, i64)
                                     );

-                                    {
-                                        let old_slot = highest_slot.fetch_max(slot.slot, Ordering::Relaxed);
-                                        if slot.slot > old_slot {
-                                            if let Err(e) = slot_sender.send(slot.slot)
-                                            {
-                                                error!("error sending slot: {e}");
-                                                break;
-                                            }
+                                    let old_slot = highest_slot.fetch_max(slot.slot, Ordering::Relaxed);
+                                    if slot.slot > old_slot {
+                                        if let Err(e) = slot_sender.send(slot.slot) {
+                                            error!("error sending slot: {e}");
+                                            break;
                                         }
+                                        datapoint_info!("relayer-highest_slot", ("slot", slot.slot, i64));
                                     }
                                 }
                                 Err(RecvTimeoutError::Timeout) => {
diff --git a/transaction-relayer/src/forwarder.rs b/transaction-relayer/src/forwarder.rs
index 12b1d9b0..9af530d3 100644
--- a/transaction-relayer/src/forwarder.rs
+++ b/transaction-relayer/src/forwarder.rs
@@ -44,7 +44,7 @@ pub fn start_forward_and_delay_thread(
                 let mut buffered_packet_batches: VecDeque<RelayerPacketBatches> =
                     VecDeque::with_capacity(100_000);

-                let metrics_interval = Duration::from_secs(1);
+                let metrics_interval = Duration::from_secs(10);
                 let mut forwarder_metrics = ForwarderMetrics::new(
                     buffered_packet_batches.capacity(),
                     verified_receiver.capacity().unwrap_or_default(), // TODO (LB): unbounded channel now, remove metric
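In load_balancer.rs the `AtomicU64` is now created by the caller and shared outward, and `fetch_max` does double duty: it updates the shared high-water mark and, via the previous value it returns, tells each subscription thread whether its slot was actually a new high. A small sketch of that idiom:

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;

fn main() {
    let highest_slot = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = [5u64, 9, 7, 9]
        .into_iter()
        .map(|slot| {
            let highest_slot = highest_slot.clone();
            thread::spawn(move || {
                // fetch_max returns the previous value: only a strictly
                // higher slot should trigger downstream work (send/metrics)
                let old_slot = highest_slot.fetch_max(slot, Ordering::Relaxed);
                slot > old_slot
            })
        })
        .collect();

    let new_highs = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&was_new_high| was_new_high)
        .count();
    assert!(new_highs >= 1); // at least one thread observed a new high
    assert_eq!(highest_slot.load(Ordering::Relaxed), 9);
}
```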
diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs
index b9fcc32d..c55ad588 100644
--- a/transaction-relayer/src/main.rs
+++ b/transaction-relayer/src/main.rs
@@ -410,7 +410,7 @@ fn main() {
         .unwrap_or_default();
     info!("ofac addresses: {:?}", ofac_addresses);

-    let (rpc_load_balancer, slot_receiver) = LoadBalancer::new(&servers, &exit);
+    let (rpc_load_balancer, slot_receiver, highest_slot) = LoadBalancer::new(&servers, &exit);
     let rpc_load_balancer = Arc::new(rpc_load_balancer);

     // Lookup table refresher
@@ -426,14 +426,10 @@ fn main() {
     let staked_nodes_overrides = match args.staked_nodes_overrides {
         None => StakedNodesOverrides::default(),
         Some(p) => {
-            let file = fs::File::open(&p).expect(&format!(
-                "Failed to open staked nodes overrides file: {:?}",
-                &p
-            ));
-            serde_yaml::from_reader(file).expect(&format!(
-                "Failed to read staked nodes overrides file: {:?}",
-                &p,
-            ))
+            let file = fs::File::open(&p)
+                .unwrap_or_else(|_| panic!("Failed to open staked nodes overrides file: {p:?}"));
+            serde_yaml::from_reader(file)
+                .unwrap_or_else(|_| panic!("Failed to read staked nodes overrides file: {p:?}"))
         }
     };
     let (tpu, verified_receiver) = Tpu::new(
@@ -494,26 +490,20 @@ fn main() {
         ofac_addresses.clone(),
     );

-    // receiver tracked as relayer_metrics.slot_receiver_len
-    // downstream channel gets data that was duplicated by HealthManager
-    let (downstream_slot_sender, downstream_slot_receiver) =
-        crossbeam_channel::bounded(LoadBalancer::SLOT_QUEUE_CAPACITY);
     let health_manager = HealthManager::new(
         slot_receiver,
-        downstream_slot_sender,
         Duration::from_secs(args.missing_slot_unhealthy_secs),
         exit.clone(),
     );

     let server_addr = SocketAddr::new(args.grpc_bind_ip, args.grpc_bind_port);
     let relayer_svc = RelayerImpl::new(
-        downstream_slot_receiver,
+        highest_slot,
         delay_packet_receiver,
         leader_cache.handle(),
         public_ip,
-        (args.tpu_quic_port..args.tpu_quic_port + args.num_tpu_quic_servers as u16).collect(),
-        (args.tpu_quic_fwd_port..args.tpu_quic_fwd_port + args.num_tpu_fwd_quic_servers as u16)
-            .collect(),
+        (args.tpu_quic_port..args.tpu_quic_port + args.num_tpu_quic_servers).collect(),
+        (args.tpu_quic_fwd_port..args.tpu_quic_fwd_port + args.num_tpu_fwd_quic_servers).collect(),
         health_manager.handle(),
         exit.clone(),
         ofac_addresses,
@@ -557,9 +547,8 @@ fn main() {
     let rt = Builder::new_multi_thread().enable_all().build().unwrap();
     rt.spawn({
-        let relayer_state = relayer_state.clone();
         start_relayer_web_server(
-            relayer_state,
+            relayer_state.clone(),
             args.webserver_bind_addr,
             MAX_BUFFERED_REQUESTS,
             REQUESTS_PER_SECOND,
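The `expect(&format!(..))` to `unwrap_or_else(|_| panic!(..))` change in main.rs is the standard fix for clippy's `expect_fun_call` lint: the `format!` argument is built even when the `Result` is `Ok`, while the closure defers message formatting to the failure path. A tiny illustration with a stand-in `Result` and a hypothetical path:

```rust
fn main() {
    let value: Result<u32, &str> = Ok(7);
    let p = "staked_nodes.yaml"; // hypothetical path

    // eager: the message String is allocated even though `value` is Ok
    let a = value.expect(&format!("Failed to open staked nodes overrides file: {p:?}"));

    // lazy: the message is only formatted if `value` is Err
    let b = value.unwrap_or_else(|_| panic!("Failed to open staked nodes overrides file: {p:?}"));

    assert_eq!(a, b);
}
```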