diff --git a/src/grpc_geyser.rs b/src/grpc_geyser.rs
index 021ab28..fec4d95 100644
--- a/src/grpc_geyser.rs
+++ b/src/grpc_geyser.rs
@@ -10,32 +10,30 @@ use rand::distributions::Alphanumeric;
 use rand::Rng;
 use solana_sdk::clock::UnixTimestamp;
 use solana_sdk::signature::Signature;
-use tokio::{sync::RwLock, time::sleep};
+use tokio::time::sleep;
 use tonic::async_trait;
-use tracing::{error, info};
+use tracing::error;
 use yellowstone_grpc_client::GeyserGrpcClient;
 use yellowstone_grpc_proto::geyser::SubscribeRequestFilterBlocks;
-use yellowstone_grpc_proto::{
-    geyser::{
-        subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
-        SubscribeRequestFilterSlots, SubscribeRequestPing,
-    },
-    tonic::service::Interceptor,
+use yellowstone_grpc_proto::geyser::{
+    subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, SubscribeRequestFilterSlots,
+    SubscribeRequestPing,
 };

 use crate::solana_rpc::SolanaRpc;
-use crate::utils::unix_to_time;

-pub struct GrpcGeyserImpl<T> {
-    grpc_client: Arc<RwLock<GeyserGrpcClient<T>>>,
+pub struct GrpcGeyserImpl {
+    endpoint: String,
+    auth_header: Option<String>,
     cur_slot: Arc<AtomicU64>,
     signature_cache: Arc<DashMap<String, (UnixTimestamp, Instant)>>,
 }

-impl<T: Interceptor + Send + Sync + 'static> GrpcGeyserImpl<T> {
-    pub fn new(grpc_client: Arc<RwLock<GeyserGrpcClient<T>>>) -> Self {
+impl GrpcGeyserImpl {
+    pub fn new(endpoint: String, auth_header: Option<String>) -> Self {
         let grpc_geyser = Self {
-            grpc_client,
+            endpoint,
+            auth_header,
             cur_slot: Arc::new(AtomicU64::new(0)),
             signature_cache: Arc::new(DashMap::new()),
         };
@@ -59,15 +57,27 @@ impl GrpcGeyserImpl {
     }

     fn poll_blocks(&self) {
-        let grpc_client = self.grpc_client.clone();
+        let endpoint = self.endpoint.clone();
+        let auth_header = self.auth_header.clone();
         let signature_cache = self.signature_cache.clone();
         tokio::spawn(async move {
             loop {
                 let mut grpc_tx;
                 let mut grpc_rx;
                 {
-                    let mut grpc_client = grpc_client.write().await;
+                    let mut grpc_client = GeyserGrpcClient::connect::<String, String>(
+                        endpoint.clone(),
+                        auth_header.clone(),
+                        None,
+                    );
+                    if let Err(e) = grpc_client {
+                        error!("Error connecting to gRPC, waiting one second then retrying connect: {}", e);
+                        statsd_count!("grpc_connect_error", 1);
+                        sleep(Duration::from_secs(1)).await;
+                        continue;
+                    }
                     let subscription = grpc_client
+                        .unwrap()
                         .subscribe_with_request(Some(get_block_subscribe_request()))
                         .await;
                     if let Err(e) = subscription {
@@ -105,17 +115,22 @@ impl GrpcGeyserImpl {
                             }
                         },
                         Err(error) => {
-                            error!("error in txn subscribe, resubscribing in 1 second: {error:?}");
-                            sleep(Duration::from_secs(1)).await;
+                            error!(
+                                "error in block subscribe, resubscribing in 1 second: {error:?}"
+                            );
+                            statsd_count!("grpc_resubscribe", 1);
+                            break;
                         }
                     }
                 }
+                sleep(Duration::from_secs(1)).await;
             }
         });
     }

     fn poll_slots(&self) {
-        let grpc_client = self.grpc_client.clone();
+        let endpoint = self.endpoint.clone();
+        let auth_header = self.auth_header.clone();
         let cur_slot = self.cur_slot.clone();
         // let grpc_tx = self.grpc_tx.clone();
         tokio::spawn(async move {
@@ -123,8 +138,18 @@
                 let mut grpc_tx;
                 let mut grpc_rx;
                 {
-                    let mut grpc_client = grpc_client.write().await;
-                    let subscription = grpc_client.subscribe().await;
+                    let mut grpc_client = GeyserGrpcClient::connect::<String, String>(
+                        endpoint.clone(),
+                        auth_header.clone(),
+                        None,
+                    );
+                    if let Err(e) = grpc_client {
+                        error!("Error connecting to gRPC, waiting one second then retrying connect: {}", e);
+                        statsd_count!("grpc_connect_error", 1);
+                        sleep(Duration::from_secs(1)).await;
+                        continue;
+                    }
+                    let subscription = grpc_client.unwrap().subscribe().await;
                     if let Err(e) = subscription {
                        error!("Error subscribing to gRPC stream, waiting one second then retrying connect: {}", e);
                        statsd_count!("grpc_subscribe_error", 1);
@@ -158,7 +183,8 @@ impl GrpcGeyserImpl {
                             }
                         }
                         Err(error) => {
-                            error!("error: {error:?}");
+                            error!("error in slot subscribe, resubscribing in 1 second: {error:?}");
+                            statsd_count!("grpc_resubscribe", 1);
                             break;
                         }
                     }
@@ -170,7 +196,7 @@ impl GrpcGeyserImpl {
 }

 #[async_trait]
-impl<T: Interceptor + Send + Sync + 'static> SolanaRpc for GrpcGeyserImpl<T> {
+impl SolanaRpc for GrpcGeyserImpl {
     async fn confirm_transaction(&self, signature: String) -> Option<UnixTimestamp> {
         let start = Instant::now();
         // in practice if a tx doesn't land in less than 60 seconds it's probably not going to land
@@ -178,7 +204,7 @@ impl SolanaRpc for GrpcGeyserImpl {
             if let Some(block_time) = self.signature_cache.get(&signature) {
                 return Some(block_time.0.clone());
             }
-            sleep(Duration::from_millis(200)).await;
+            sleep(Duration::from_millis(10)).await;
         }
         return None;
     }
diff --git a/src/main.rs b/src/main.rs
index 1432918..4cae900 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -43,6 +43,7 @@ struct AtlasTxnSenderEnv {
     txn_sender_threads: Option<usize>,
     max_txn_send_retries: Option<usize>,
     txn_send_retry_interval: Option<usize>,
+    max_retry_queue_size: Option<usize>,
 }

 // Defualt on RPC is 4
@@ -104,12 +105,11 @@ async fn main() -> anyhow::Result<()> {
         ));
     }

-    let client = Arc::new(RwLock::new(
-        GeyserGrpcClient::connect::<String, String>(env.grpc_url.unwrap(), env.x_token, None)
-            .unwrap(),
-    ));
     let transaction_store = Arc::new(TransactionStoreImpl::new());
-    let solana_rpc = Arc::new(GrpcGeyserImpl::new(client));
+    let solana_rpc = Arc::new(GrpcGeyserImpl::new(
+        env.grpc_url.clone().unwrap(),
+        env.x_token.clone(),
+    ));
     let rpc_client = Arc::new(RpcClient::new(env.rpc_url.unwrap()));
     let num_leaders = env.num_leaders.unwrap_or(2);
     let leader_offset = env.leader_offset.unwrap_or(0);
@@ -127,6 +127,7 @@ async fn main() -> anyhow::Result<()> {
         solana_rpc,
         env.txn_sender_threads.unwrap_or(4),
         txn_send_retry_interval_seconds,
+        env.max_retry_queue_size,
     ));
     let max_txn_send_retries = env.max_txn_send_retries.unwrap_or(5);
     let atlas_txn_sender =
diff --git a/src/txn_sender.rs b/src/txn_sender.rs
index 6fc23aa..ab5b153 100644
--- a/src/txn_sender.rs
+++ b/src/txn_sender.rs
@@ -42,6 +42,7 @@ pub struct TxnSenderImpl {
     solana_rpc: Arc<dyn SolanaRpc>,
     txn_sender_runtime: Arc<Runtime>,
     txn_send_retry_interval_seconds: usize,
+    max_retry_queue_size: Option<usize>,
 }

 impl TxnSenderImpl {
@@ -52,6 +53,7 @@ impl TxnSenderImpl {
         solana_rpc: Arc<dyn SolanaRpc>,
         txn_sender_threads: usize,
         txn_send_retry_interval_seconds: usize,
+        max_retry_queue_size: Option<usize>,
     ) -> Self {
         let txn_sender_runtime = Builder::new_multi_thread()
             .worker_threads(txn_sender_threads)
@@ -65,6 +67,7 @@ impl TxnSenderImpl {
             solana_rpc,
             txn_sender_runtime: Arc::new(txn_sender_runtime),
             txn_send_retry_interval_seconds,
+            max_retry_queue_size,
         };
         txn_sender.retry_transactions();
         txn_sender
@@ -76,14 +79,39 @@ impl TxnSenderImpl {
         let connection_cache = self.connection_cache.clone();
         let txn_sender_runtime = self.txn_sender_runtime.clone();
         let txn_send_retry_interval_seconds = self.txn_send_retry_interval_seconds.clone();
+        let max_retry_queue_size = self.max_retry_queue_size.clone();
         tokio::spawn(async move {
             loop {
                 let mut transactions_reached_max_retries = vec![];
-                let transactions = transaction_store.get_transactions();
-                statsd_gauge!("transaction_retry_queue_length", transactions.len() as u64);
+                let transaction_map = transaction_store.get_transactions();
+                let queue_length = transaction_map.len();
+                statsd_gauge!("transaction_retry_queue_length", queue_length as u64);
+
+                // Shed transactions by retry_count, if necessary.
+                if let Some(max_size) = max_retry_queue_size {
+                    if queue_length > max_size {
+                        warn!(
+                            "Transaction retry queue length is over the limit of {}: {}. Load shedding transactions with highest retry count.",
+                            max_size,
+                            queue_length
+                        );
+                        let mut transactions: Vec<(String, TransactionData)> = transaction_map
+                            .iter()
+                            .map(|x| (x.key().to_owned(), x.value().to_owned()))
+                            .collect();
+                        transactions.sort_by(|(_, a), (_, b)| a.retry_count.cmp(&b.retry_count));
+                        let transactions_to_remove = transactions[(max_size + 1)..].to_vec();
+                        for (signature, _) in transactions_to_remove {
+                            transaction_store.remove_transaction(signature.clone());
+                            transaction_map.remove(&signature);
+                        }
+                        let records_dropped = queue_length - max_size;
+                        statsd_gauge!("transactions_retry_queue_dropped", records_dropped as u64);
+                    }
+                }
+
                 let mut wire_transactions = vec![];
-                for mut transaction_data in transactions.iter_mut() {
+                for mut transaction_data in transaction_map.iter_mut() {
                     wire_transactions.push(transaction_data.wire_transaction.clone());
                     if transaction_data.retry_count >= transaction_data.max_retries {
                         transactions_reached_max_retries
@@ -92,48 +120,49 @@ impl TxnSenderImpl {
                         transaction_data.retry_count += 1;
                     }
                 }
-                let mut leader_num = 0;
-                for leader in leader_tracker.get_leaders() {
-                    if leader.tpu_quic.is_none() {
-                        error!("leader {:?} has no tpu_quic", leader);
-                        continue;
-                    }
-                    let connection_cache = connection_cache.clone();
-                    let sent_at = Instant::now();
-                    let leader = Arc::new(leader.clone());
-                    let wire_transactions = wire_transactions.clone();
-                    txn_sender_runtime.spawn(async move {
-                        // retry unless its a timeout
-                        for i in 0..SEND_TXN_RETRIES {
-                            let conn = connection_cache
-                                .get_nonblocking_connection(&leader.tpu_quic.unwrap());
-                            if let Ok(result) = timeout(MAX_TIMEOUT_SEND_DATA_BATCH, conn.send_data_batch(&wire_transactions)).await {
-                                if let Err(e) = result {
-                                    if i == SEND_TXN_RETRIES-1 {
-                                        error!(
-                                            retry = "true",
-                                            "Failed to send transaction batch to {:?}: {}",
-                                            leader, e
-                                        );
-                                    } else {
-                                        warn!(
-                                            retry = "true",
-                                            "Retrying to send transaction batch to {:?}: {}",
-                                            leader, e
-                                        );
-                                    }
-                                } else {
-                                    let leader_num_str = leader_num.to_string();
-                                    statsd_time!(
-                                        "transaction_received_by_leader",
-                                        sent_at.elapsed(), "leader_num" => &leader_num_str, "api_key" => "not_applicable", "retry" => "true");
-                                    return;
-                                }
-                            }
-                            statsd_count!("transaction_send_error", 1);
-                        }
-                    });
-                    leader_num += 1;
-                }
+                for wire_transaction in wire_transactions.iter() {
+                    let mut leader_num = 0;
+                    for leader in leader_tracker.get_leaders() {
+                        if leader.tpu_quic.is_none() {
+                            error!("leader {:?} has no tpu_quic", leader);
+                            continue;
+                        }
+                        let connection_cache = connection_cache.clone();
+                        let sent_at = Instant::now();
+                        let leader = Arc::new(leader.clone());
+                        let wire_transaction = wire_transaction.clone();
+                        txn_sender_runtime.spawn(async move {
+                            // retry unless its a timeout
+                            for i in 0..SEND_TXN_RETRIES {
+                                let conn = connection_cache
+                                    .get_nonblocking_connection(&leader.tpu_quic.unwrap());
+                                if let Ok(result) = timeout(MAX_TIMEOUT_SEND_DATA_BATCH, conn.send_data(&wire_transaction)).await {
+                                    if let Err(e) = result {
+                                        if i == SEND_TXN_RETRIES-1 {
+                                            error!(
+                                                retry = "true",
+                                                "Failed to send transaction batch to {:?}: {}",
+                                                leader, e
+                                            );
+                                            statsd_count!("transaction_send_error", 1, "retry" => "true", "last_attempt" => "true");
+                                        } else {
+                                            statsd_count!("transaction_send_error", 1, "retry" => "true", "last_attempt" => "false");
+                                        }
+                                    } else {
+                                        let leader_num_str = leader_num.to_string();
+                                        statsd_time!(
+                                            "transaction_received_by_leader",
+                                            sent_at.elapsed(), "leader_num" => &leader_num_str, "api_key" => "not_applicable", "retry" => "true");
+                                        return;
+                                    }
+                                } else {
+                                    // Note: This is far too frequent to log. It will fill the disks on the host and cost too much on DD.
+                                    statsd_count!("transaction_send_timeout", 1);
+                                }
+                            }
+                        });
+                        leader_num += 1;
+                    }
+                }
                 // remove transactions that reached max retries
                 for signature in transactions_reached_max_retries {
@@ -144,6 +173,7 @@ impl TxnSenderImpl {
             }
         });
     }
+
     fn track_transaction(&self, transaction_data: &TransactionData) {
         let sent_at = transaction_data.sent_at.clone();
         let signature = get_signature(transaction_data);
@@ -268,20 +298,17 @@ impl TxnSender for TxnSenderImpl {
                 let conn = connection_cache.get_nonblocking_connection(&leader.tpu_quic.unwrap());
                 if let Ok(result) = timeout(MAX_TIMEOUT_SEND_DATA, conn.send_data(&wire_transaction)).await {
-                    if let Err(e) = result {
-                        if i == SEND_TXN_RETRIES-1 {
-                            error!(
-                                retry = "false",
-                                "Failed to send transaction to {:?}: {}",
-                                leader, e
-                            );
-                        } else {
-                            warn!(
-                                retry = "false",
-                                "Retrying to send transaction to {:?}: {}",
-                                leader, e
-                            );
-                        }
+                    if let Err(e) = result {
+                        if i == SEND_TXN_RETRIES-1 {
+                            error!(
+                                retry = "false",
+                                "Failed to send transaction to {:?}: {}",
+                                leader, e
+                            );
+                            statsd_count!("transaction_send_error", 1, "retry" => "false", "last_attempt" => "true");
+                        } else {
+                            statsd_count!("transaction_send_error", 1, "retry" => "false", "last_attempt" => "false");
+                        }
                     } else {
                         let leader_num_str = leader_num.to_string();
                         statsd_time!(
@@ -289,8 +316,10 @@ impl TxnSender for TxnSenderImpl {
                             transaction_data.sent_at.elapsed(), "leader_num" => &leader_num_str, "api_key" => &api_key, "retry" => "false");
                         return;
                     }
+                } else {
+                    // Note: This is far too frequent to log. It will fill the disks on the host and cost too much on DD.
+                    statsd_count!("transaction_send_timeout", 1);
                 }
-                statsd_count!("transaction_send_error", 1);
             }
         });
         leader_num += 1;