add geyser reconnect logic and retry queue load shedding (#15)
NicolasPennie authored Apr 6, 2024
1 parent 97398f1 commit f42452e
Showing 3 changed files with 141 additions and 85 deletions.
74 changes: 50 additions & 24 deletions src/grpc_geyser.rs
@@ -10,32 +10,30 @@ use rand::distributions::Alphanumeric;
use rand::Rng;
use solana_sdk::clock::UnixTimestamp;
use solana_sdk::signature::Signature;
use tokio::{sync::RwLock, time::sleep};
use tokio::time::sleep;
use tonic::async_trait;
use tracing::{error, info};
use tracing::error;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::SubscribeRequestFilterBlocks;
use yellowstone_grpc_proto::{
geyser::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
SubscribeRequestFilterSlots, SubscribeRequestPing,
},
tonic::service::Interceptor,
use yellowstone_grpc_proto::geyser::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, SubscribeRequestFilterSlots,
SubscribeRequestPing,
};

use crate::solana_rpc::SolanaRpc;
use crate::utils::unix_to_time;

pub struct GrpcGeyserImpl<T> {
grpc_client: Arc<RwLock<GeyserGrpcClient<T>>>,
pub struct GrpcGeyserImpl {
endpoint: String,
auth_header: Option<String>,
cur_slot: Arc<AtomicU64>,
signature_cache: Arc<DashMap<String, (UnixTimestamp, Instant)>>,
}

impl<T: Interceptor + Send + Sync + 'static> GrpcGeyserImpl<T> {
pub fn new(grpc_client: Arc<RwLock<GeyserGrpcClient<T>>>) -> Self {
impl GrpcGeyserImpl {
pub fn new(endpoint: String, auth_header: Option<String>) -> Self {
let grpc_geyser = Self {
grpc_client,
endpoint,
auth_header,
cur_slot: Arc::new(AtomicU64::new(0)),
signature_cache: Arc::new(DashMap::new()),
};
@@ -59,15 +57,27 @@ impl<T: Interceptor + Send + Sync + 'static> GrpcGeyserImpl<T> {
}

fn poll_blocks(&self) {
let grpc_client = self.grpc_client.clone();
let endpoint = self.endpoint.clone();
let auth_header = self.auth_header.clone();
let signature_cache = self.signature_cache.clone();
tokio::spawn(async move {
loop {
let mut grpc_tx;
let mut grpc_rx;
{
let mut grpc_client = grpc_client.write().await;
let mut grpc_client = GeyserGrpcClient::connect::<String, String>(
endpoint.clone(),
auth_header.clone(),
None,
);
if let Err(e) = grpc_client {
error!("Error connecting to gRPC, waiting one second then retrying connect: {}", e);
statsd_count!("grpc_connect_error", 1);
sleep(Duration::from_secs(1)).await;
continue;
}
let subscription = grpc_client
.unwrap()
.subscribe_with_request(Some(get_block_subscribe_request()))
.await;
if let Err(e) = subscription {
@@ -105,26 +115,41 @@ impl<T: Interceptor + Send + Sync + 'static> GrpcGeyserImpl<T> {
}
},
Err(error) => {
error!("error in txn subscribe, resubscribing in 1 second: {error:?}");
sleep(Duration::from_secs(1)).await;
error!(
"error in block subscribe, resubscribing in 1 second: {error:?}"
);
statsd_count!("grpc_resubscribe", 1);
break;
}
}
}
sleep(Duration::from_secs(1)).await;
}
});
}

fn poll_slots(&self) {
let grpc_client = self.grpc_client.clone();
let endpoint = self.endpoint.clone();
let auth_header = self.auth_header.clone();
let cur_slot = self.cur_slot.clone();
// let grpc_tx = self.grpc_tx.clone();
tokio::spawn(async move {
loop {
let mut grpc_tx;
let mut grpc_rx;
{
let mut grpc_client = grpc_client.write().await;
let subscription = grpc_client.subscribe().await;
let mut grpc_client = GeyserGrpcClient::connect::<String, String>(
endpoint.clone(),
auth_header.clone(),
None,
);
if let Err(e) = grpc_client {
error!("Error connecting to gRPC, waiting one second then retrying connect: {}", e);
statsd_count!("grpc_connect_error", 1);
sleep(Duration::from_secs(1)).await;
continue;
}
let subscription = grpc_client.unwrap().subscribe().await;
if let Err(e) = subscription {
error!("Error subscribing to gRPC stream, waiting one second then retrying connect: {}", e);
statsd_count!("grpc_subscribe_error", 1);
@@ -158,7 +183,8 @@ impl<T: Interceptor + Send + Sync + 'static> GrpcGeyserImpl<T> {
}
}
Err(error) => {
error!("error: {error:?}");
error!("error in slot subscribe, resubscribing in 1 second: {error:?}");
statsd_count!("grpc_resubscribe", 1);
break;
}
}
@@ -170,15 +196,15 @@ impl<T: Interceptor + Send + Sync + 'static> GrpcGeyserImpl<T> {
}

#[async_trait]
impl<T: Interceptor + Send + Sync> SolanaRpc for GrpcGeyserImpl<T> {
impl SolanaRpc for GrpcGeyserImpl {
async fn confirm_transaction(&self, signature: String) -> Option<UnixTimestamp> {
let start = Instant::now();
// in practice if a tx doesn't land in less than 60 seconds it's probably not going to land
while start.elapsed() < Duration::from_secs(60) {
if let Some(block_time) = self.signature_cache.get(&signature) {
return Some(block_time.0.clone());
}
sleep(Duration::from_millis(200)).await;
sleep(Duration::from_millis(10)).await;
}
return None;
}
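The reconnect change above drops the shared, lock-guarded client and instead builds a fresh connection on every loop iteration, backing off for one second whenever connect or subscribe fails. A minimal standalone sketch of that shape, assuming the same yellowstone_grpc_client API shown in the diff; the helper name and log messages are illustrative, not part of the commit:

use std::time::Duration;
use tokio::time::sleep;
use tracing::error;
use yellowstone_grpc_client::GeyserGrpcClient;

// Hypothetical helper mirroring the connect-then-back-off shape of
// poll_blocks/poll_slots in this commit.
async fn subscribe_with_reconnect(endpoint: String, auth_header: Option<String>) {
    loop {
        // Build a brand-new client on every pass so a dead connection is
        // simply replaced rather than held behind a shared RwLock.
        let mut client = match GeyserGrpcClient::connect::<String, String>(
            endpoint.clone(),
            auth_header.clone(),
            None,
        ) {
            Ok(client) => client,
            Err(e) => {
                error!("connect failed, retrying in 1s: {e}");
                sleep(Duration::from_secs(1)).await;
                continue;
            }
        };
        // Subscribe; an error here also falls through to a reconnect.
        match client.subscribe().await {
            Ok((_grpc_tx, _grpc_rx)) => {
                // ... consume updates until the stream errors out ...
            }
            Err(e) => error!("subscribe failed, reconnecting: {e}"),
        }
        sleep(Duration::from_secs(1)).await;
    }
}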
11 changes: 6 additions & 5 deletions src/main.rs
@@ -43,6 +43,7 @@ struct AtlasTxnSenderEnv {
txn_sender_threads: Option<usize>,
max_txn_send_retries: Option<usize>,
txn_send_retry_interval: Option<usize>,
max_retry_queue_size: Option<usize>,
}

// Default on RPC is 4
@@ -104,12 +105,11 @@ async fn main() -> anyhow::Result<()> {
));
}

let client = Arc::new(RwLock::new(
GeyserGrpcClient::connect::<String, String>(env.grpc_url.unwrap(), env.x_token, None)
.unwrap(),
));
let transaction_store = Arc::new(TransactionStoreImpl::new());
let solana_rpc = Arc::new(GrpcGeyserImpl::new(client));
let solana_rpc = Arc::new(GrpcGeyserImpl::new(
env.grpc_url.clone().unwrap(),
env.x_token.clone(),
));
let rpc_client = Arc::new(RpcClient::new(env.rpc_url.unwrap()));
let num_leaders = env.num_leaders.unwrap_or(2);
let leader_offset = env.leader_offset.unwrap_or(0);
@@ -127,6 +127,7 @@ async fn main() -> anyhow::Result<()> {
solana_rpc,
env.txn_sender_threads.unwrap_or(4),
txn_send_retry_interval_seconds,
env.max_retry_queue_size,
));
let max_txn_send_retries = env.max_txn_send_retries.unwrap_or(5);
let atlas_txn_sender =
141 changes: 85 additions & 56 deletions src/txn_sender.rs
@@ -42,6 +42,7 @@ pub struct TxnSenderImpl {
solana_rpc: Arc<dyn SolanaRpc>,
txn_sender_runtime: Arc<Runtime>,
txn_send_retry_interval_seconds: usize,
max_retry_queue_size: Option<usize>,
}

impl TxnSenderImpl {
@@ -52,6 +53,7 @@ impl TxnSenderImpl {
solana_rpc: Arc<dyn SolanaRpc>,
txn_sender_threads: usize,
txn_send_retry_interval_seconds: usize,
max_retry_queue_size: Option<usize>,
) -> Self {
let txn_sender_runtime = Builder::new_multi_thread()
.worker_threads(txn_sender_threads)
@@ -65,6 +67,7 @@ impl TxnSenderImpl {
solana_rpc,
txn_sender_runtime: Arc::new(txn_sender_runtime),
txn_send_retry_interval_seconds,
max_retry_queue_size,
};
txn_sender.retry_transactions();
txn_sender
@@ -76,14 +79,39 @@ impl TxnSenderImpl {
let connection_cache = self.connection_cache.clone();
let txn_sender_runtime = self.txn_sender_runtime.clone();
let txn_send_retry_interval_seconds = self.txn_send_retry_interval_seconds.clone();
let max_retry_queue_size = self.max_retry_queue_size.clone();
tokio::spawn(async move {
loop {
let mut transactions_reached_max_retries = vec![];
let transactions = transaction_store.get_transactions();
statsd_gauge!("transaction_retry_queue_length", transactions.len() as u64);
let transaction_map = transaction_store.get_transactions();
let queue_length = transaction_map.len();
statsd_gauge!("transaction_retry_queue_length", queue_length as u64);

// Shed transactions by retry_count, if necessary.
if let Some(max_size) = max_retry_queue_size {
if queue_length > max_size {
warn!(
"Transaction retry queue length is over the limit of {}: {}. Load shedding transactions with highest retry count.",
max_size,
queue_length
);
let mut transactions: Vec<(String, TransactionData)> = transaction_map
.iter()
.map(|x| (x.key().to_owned(), x.value().to_owned()))
.collect();
transactions.sort_by(|(_, a), (_, b)| a.retry_count.cmp(&b.retry_count));
let transactions_to_remove = transactions[(max_size + 1)..].to_vec();
for (signature, _) in transactions_to_remove {
transaction_store.remove_transaction(signature.clone());
transaction_map.remove(&signature);
}
let records_dropped = queue_length - max_size;
statsd_gauge!("transactions_retry_queue_dropped", records_dropped as u64);
}
}

let mut wire_transactions = vec![];
for mut transaction_data in transactions.iter_mut() {
for mut transaction_data in transaction_map.iter_mut() {
wire_transactions.push(transaction_data.wire_transaction.clone());
if transaction_data.retry_count >= transaction_data.max_retries {
transactions_reached_max_retries
@@ -92,48 +120,49 @@ impl TxnSenderImpl {
transaction_data.retry_count += 1;
}
}
let mut leader_num = 0;
for leader in leader_tracker.get_leaders() {
if leader.tpu_quic.is_none() {
error!("leader {:?} has no tpu_quic", leader);
continue;
}
let connection_cache = connection_cache.clone();
let sent_at = Instant::now();
let leader = Arc::new(leader.clone());
let wire_transactions = wire_transactions.clone();
txn_sender_runtime.spawn(async move {
// retry unless it's a timeout
for i in 0..SEND_TXN_RETRIES {
let conn = connection_cache
.get_nonblocking_connection(&leader.tpu_quic.unwrap());
if let Ok(result) = timeout(MAX_TIMEOUT_SEND_DATA_BATCH, conn.send_data_batch(&wire_transactions)).await {
if let Err(e) = result {
if i == SEND_TXN_RETRIES-1 {
error!(
retry = "true",
"Failed to send transaction batch to {:?}: {}",
leader, e
);
} else {
warn!(
retry = "true",
"Retrying to send transaction batch to {:?}: {}",
leader, e
);
}
for wire_transaction in wire_transactions.iter() {
let mut leader_num = 0;
for leader in leader_tracker.get_leaders() {
if leader.tpu_quic.is_none() {
error!("leader {:?} has no tpu_quic", leader);
continue;
}
let connection_cache = connection_cache.clone();
let sent_at = Instant::now();
let leader = Arc::new(leader.clone());
let wire_transaction = wire_transaction.clone();
txn_sender_runtime.spawn(async move {
// retry unless it's a timeout
for i in 0..SEND_TXN_RETRIES {
let conn = connection_cache
.get_nonblocking_connection(&leader.tpu_quic.unwrap());
if let Ok(result) = timeout(MAX_TIMEOUT_SEND_DATA_BATCH, conn.send_data(&wire_transaction)).await {
if let Err(e) = result {
if i == SEND_TXN_RETRIES-1 {
error!(
retry = "true",
"Failed to send transaction batch to {:?}: {}",
leader, e
);
statsd_count!("transaction_send_error", 1, "retry" => "true", "last_attempt" => "true");
} else {
let leader_num_str = leader_num.to_string();
statsd_time!(
"transaction_received_by_leader",
sent_at.elapsed(), "leader_num" => &leader_num_str, "api_key" => "not_applicable", "retry" => "true");
return;
statsd_count!("transaction_send_error", 1, "retry" => "true", "last_attempt" => "false");
}
} else {
let leader_num_str = leader_num.to_string();
statsd_time!(
"transaction_received_by_leader",
sent_at.elapsed(), "leader_num" => &leader_num_str, "api_key" => "not_applicable", "retry" => "true");
return;
}
statsd_count!("transaction_send_error", 1);
} else {
// Note: This is far too frequent to log. It will fill the disks on the host and cost too much on DD.
statsd_count!("transaction_send_timeout", 1);
}
});
leader_num += 1;
}
});
leader_num += 1;
}
}
// remove transactions that reached max retries
for signature in transactions_reached_max_retries {
@@ -144,6 +173,7 @@ impl TxnSenderImpl {
}
});
}

fn track_transaction(&self, transaction_data: &TransactionData) {
let sent_at = transaction_data.sent_at.clone();
let signature = get_signature(transaction_data);
@@ -268,29 +298,28 @@ impl TxnSender for TxnSenderImpl {
let conn =
connection_cache.get_nonblocking_connection(&leader.tpu_quic.unwrap());
if let Ok(result) = timeout(MAX_TIMEOUT_SEND_DATA, conn.send_data(&wire_transaction)).await {
if let Err(e) = result {
if i == SEND_TXN_RETRIES-1 {
error!(
retry = "false",
"Failed to send transaction to {:?}: {}",
leader, e
);
} else {
warn!(
retry = "false",
"Retrying to send transaction to {:?}: {}",
leader, e
);
}
if let Err(e) = result {
if i == SEND_TXN_RETRIES-1 {
error!(
retry = "false",
"Failed to send transaction to {:?}: {}",
leader, e
);
statsd_count!("transaction_send_error", 1, "retry" => "false", "last_attempt" => "true");
} else {
statsd_count!("transaction_send_error", 1, "retry" => "false", "last_attempt" => "false");
}
} else {
let leader_num_str = leader_num.to_string();
statsd_time!(
"transaction_received_by_leader",
transaction_data.sent_at.elapsed(), "leader_num" => &leader_num_str, "api_key" => &api_key, "retry" => "false");
return;
}
} else {
// Note: This is far too frequent to log. It will fill the disks on the host and cost too much on DD.
statsd_count!("transaction_send_timeout", 1);
}
statsd_count!("transaction_send_error", 1);
}
});
leader_num += 1;
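For reference, the shedding policy added to the retry loop above, reduced to a self-contained sketch: when the retry queue grows past max_retry_queue_size, sort by retry_count and keep only the entries that have been retried the least, matching the warn message in the diff. The RetryEntry type and shed_load name are illustrative assumptions; the real code operates on the DashMap returned by transaction_store.get_transactions() rather than a Vec.

// Illustrative stand-in for TransactionData; not the commit's type.
struct RetryEntry {
    signature: String,
    retry_count: usize,
}

// Keep the max_size entries with the lowest retry_count and drop the rest,
// so the most heavily retried transactions are shed first.
fn shed_load(mut queue: Vec<RetryEntry>, max_size: usize) -> Vec<RetryEntry> {
    if queue.len() <= max_size {
        return queue;
    }
    queue.sort_by(|a, b| a.retry_count.cmp(&b.retry_count));
    queue.truncate(max_size);
    queue
}

fn main() {
    let queue = vec![
        RetryEntry { signature: "a".into(), retry_count: 5 },
        RetryEntry { signature: "b".into(), retry_count: 1 },
        RetryEntry { signature: "c".into(), retry_count: 3 },
    ];
    let kept = shed_load(queue, 2);
    assert_eq!(kept.len(), 2);
    // The most-retried transaction ("a") is the one shed.
    assert!(kept.iter().all(|t| t.signature != "a"));
}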
