From 4ef9583bc467ee5d6018d4fdaa00b3f2892a7624 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Mon, 15 May 2023 15:01:05 -0500 Subject: [PATCH] Update thread retry logic to avoid unnecessary retries (#253) * Increase the timeout window for threads * Adjust backoff values * Update logic related to thread retries --- plugin/src/executors/tx.rs | 50 +++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/plugin/src/executors/tx.rs b/plugin/src/executors/tx.rs index 5898989ad..4f54e7812 100644 --- a/plugin/src/executors/tx.rs +++ b/plugin/src/executors/tx.rs @@ -34,10 +34,10 @@ use crate::{config::PluginConfig, pool_position::PoolPosition, utils::read_or_ne use super::AccountGet; /// Number of slots to wait before checking for a confirmed transaction. -static TRANSACTION_CONFIRMATION_PERIOD: u64 = 10; +static TRANSACTION_CONFIRMATION_PERIOD: u64 = 24; /// Number of slots to wait before trying to execute a thread while not in the pool. -static THREAD_TIMEOUT_WINDOW: u64 = 8; +static THREAD_TIMEOUT_WINDOW: u64 = 24; /// Number of times to retry a thread simulation. static MAX_THREAD_SIMULATION_FAILURES: u32 = 5; @@ -46,7 +46,7 @@ static MAX_THREAD_SIMULATION_FAILURES: u32 = 5; static EXPONENTIAL_BACKOFF_CONSTANT: u32 = 2; /// The number of slots to wait since the last rotation attempt. -static ROTATION_CONFIRMATION_PERIOD: u64 = 9; +static ROTATION_CONFIRMATION_PERIOD: u64 = 16; /// TxExecutor pub struct TxExecutor { @@ -66,7 +66,8 @@ pub struct ExecutableThreadMetadata { #[derive(Debug)] pub struct TransactionMetadata { - pub slot_sent: u64, + pub due_slot: u64, + pub sent_slot: u64, pub signature: Signature, } @@ -163,14 +164,16 @@ impl TxExecutor { ) -> PluginResult<()> { // Get transaction signatures and corresponding threads to check. struct CheckableTransaction { + due_slot: u64, thread_pubkey: Pubkey, signature: Signature, } let r_transaction_history = self.transaction_history.read().await; let checkable_transactions = r_transaction_history .iter() - .filter(|(_, metadata)| slot > metadata.slot_sent + TRANSACTION_CONFIRMATION_PERIOD) + .filter(|(_, metadata)| slot > metadata.sent_slot + TRANSACTION_CONFIRMATION_PERIOD) .map(|(pubkey, metadata)| CheckableTransaction { + due_slot: metadata.due_slot, thread_pubkey: *pubkey, signature: metadata.signature, }) @@ -179,7 +182,7 @@ impl TxExecutor { // Lookup transaction statuses and track which threads are successful / retriable. let mut failed_threads: HashSet = HashSet::new(); - let mut retriable_threads: HashSet = HashSet::new(); + let mut retriable_threads: HashSet<(Pubkey, u64)> = HashSet::new(); let mut successful_threads: HashSet = HashSet::new(); for data in checkable_transactions { match client @@ -196,7 +199,7 @@ impl TxExecutor { "Retrying thread: {:?} missing_signature: {:?}", data.thread_pubkey, data.signature ); - retriable_threads.insert(data.thread_pubkey); + retriable_threads.insert((data.thread_pubkey, data.due_slot)); } Some(status) => match status { Err(err) => { @@ -223,12 +226,12 @@ impl TxExecutor { for pubkey in failed_threads { w_transaction_history.remove(&pubkey); } - for pubkey in retriable_threads { + for (pubkey, due_slot) in retriable_threads { w_transaction_history.remove(&pubkey); w_executable_threads.insert( pubkey, ExecutableThreadMetadata { - due_slot: slot, + due_slot, simulation_failures: 0, }, ); @@ -251,7 +254,7 @@ impl TxExecutor { let rotation_history = r_rotation_history.as_ref().unwrap(); if slot > rotation_history - .slot_sent + .sent_slot .checked_add(ROTATION_CONFIRMATION_PERIOD) .unwrap() { @@ -297,7 +300,8 @@ impl TxExecutor { self.clone().submit_tx(&tx).await?; let mut w_rotation_history = self.rotation_history.write().await; *w_rotation_history = Some(TransactionMetadata { - slot_sent: slot, + due_slot: slot, + sent_slot: slot, signature: tx.signatures[0], }); drop(w_rotation_history); @@ -364,7 +368,7 @@ impl TxExecutor { )) }) .collect(); - let mut executed_threads: HashMap = HashMap::new(); + let mut executed_threads: HashMap = HashMap::new(); // Serialize to wire transactions. let wire_txs = futures::future::join_all(tasks) @@ -374,8 +378,8 @@ impl TxExecutor { Err(_err) => None, Ok(res) => match res { None => None, - Some((pubkey, tx)) => { - executed_threads.insert(*pubkey, tx.signatures[0]); + Some((pubkey, tx, due_slot)) => { + executed_threads.insert(*pubkey, (tx.signatures[0], *due_slot)); Some(tx) } }, @@ -399,12 +403,13 @@ impl TxExecutor { Ok(()) => { let mut w_executable_threads = self.executable_threads.write().await; let mut w_transaction_history = self.transaction_history.write().await; - for (pubkey, signature) in executed_threads { + for (pubkey, (signature, due_slot)) in executed_threads { w_executable_threads.remove(&pubkey); w_transaction_history.insert( pubkey, TransactionMetadata { - slot_sent: observed_slot, + due_slot, + sent_slot: observed_slot, signature, }, ); @@ -423,7 +428,7 @@ impl TxExecutor { observed_slot: u64, due_slot: u64, thread_pubkey: Pubkey, - ) -> Option<(Pubkey, Transaction)> { + ) -> Option<(Pubkey, Transaction, u64)> { let thread = match client.clone().get::(&thread_pubkey).await { Err(_err) => { self.increment_simulation_failure(thread_pubkey).await; @@ -432,6 +437,13 @@ impl TxExecutor { Ok(thread) => thread, }; + // Exit early if the thread has been executed after it became due. + if let Some(exec_context) = thread.exec_context() { + if exec_context.last_exec_at.gt(&due_slot) { + return None; + } + } + if let Ok(tx) = crate::builders::build_thread_exec_tx( client.clone(), &self.keypair, @@ -449,7 +461,7 @@ impl TxExecutor { .await .is_ok() { - Some((thread_pubkey, tx)) + Some((thread_pubkey, tx, due_slot)) } else { None } @@ -478,7 +490,7 @@ impl TxExecutor { ) -> PluginResult<()> { let r_transaction_history = self.transaction_history.read().await; if let Some(metadata) = r_transaction_history.get(&thread_pubkey) { - if metadata.signature.eq(&tx.signatures[0]) && metadata.slot_sent.le(&slot) { + if metadata.signature.eq(&tx.signatures[0]) && metadata.sent_slot.le(&slot) { return Err(GeyserPluginError::Custom(format!("Transaction signature is a duplicate of a previously submitted transaction").into())); } }