Skip to content

Commit

Permalink
Update thread retry logic to avoid unnecessary retries (#253)
Browse files Browse the repository at this point in the history
* Increase the timeout window for threads

* Adjust backoff values

* Update logic related to thread retries
  • Loading branch information
nickgarfield authored May 15, 2023
1 parent 2faa059 commit 4ef9583
Showing 1 changed file with 31 additions and 19 deletions.
50 changes: 31 additions & 19 deletions plugin/src/executors/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ use crate::{config::PluginConfig, pool_position::PoolPosition, utils::read_or_ne
use super::AccountGet;

/// Number of slots to wait before checking for a confirmed transaction.
static TRANSACTION_CONFIRMATION_PERIOD: u64 = 10;
static TRANSACTION_CONFIRMATION_PERIOD: u64 = 24;

/// Number of slots to wait before trying to execute a thread while not in the pool.
static THREAD_TIMEOUT_WINDOW: u64 = 8;
static THREAD_TIMEOUT_WINDOW: u64 = 24;

/// Number of times to retry a thread simulation.
static MAX_THREAD_SIMULATION_FAILURES: u32 = 5;
Expand All @@ -46,7 +46,7 @@ static MAX_THREAD_SIMULATION_FAILURES: u32 = 5;
static EXPONENTIAL_BACKOFF_CONSTANT: u32 = 2;

/// The number of slots to wait since the last rotation attempt.
static ROTATION_CONFIRMATION_PERIOD: u64 = 9;
static ROTATION_CONFIRMATION_PERIOD: u64 = 16;

/// TxExecutor
pub struct TxExecutor {
Expand All @@ -66,7 +66,8 @@ pub struct ExecutableThreadMetadata {

#[derive(Debug)]
pub struct TransactionMetadata {
pub slot_sent: u64,
pub due_slot: u64,
pub sent_slot: u64,
pub signature: Signature,
}

Expand Down Expand Up @@ -163,14 +164,16 @@ impl TxExecutor {
) -> PluginResult<()> {
// Get transaction signatures and corresponding threads to check.
struct CheckableTransaction {
due_slot: u64,
thread_pubkey: Pubkey,
signature: Signature,
}
let r_transaction_history = self.transaction_history.read().await;
let checkable_transactions = r_transaction_history
.iter()
.filter(|(_, metadata)| slot > metadata.slot_sent + TRANSACTION_CONFIRMATION_PERIOD)
.filter(|(_, metadata)| slot > metadata.sent_slot + TRANSACTION_CONFIRMATION_PERIOD)
.map(|(pubkey, metadata)| CheckableTransaction {
due_slot: metadata.due_slot,
thread_pubkey: *pubkey,
signature: metadata.signature,
})
Expand All @@ -179,7 +182,7 @@ impl TxExecutor {

// Lookup transaction statuses and track which threads are successful / retriable.
let mut failed_threads: HashSet<Pubkey> = HashSet::new();
let mut retriable_threads: HashSet<Pubkey> = HashSet::new();
let mut retriable_threads: HashSet<(Pubkey, u64)> = HashSet::new();
let mut successful_threads: HashSet<Pubkey> = HashSet::new();
for data in checkable_transactions {
match client
Expand All @@ -196,7 +199,7 @@ impl TxExecutor {
"Retrying thread: {:?} missing_signature: {:?}",
data.thread_pubkey, data.signature
);
retriable_threads.insert(data.thread_pubkey);
retriable_threads.insert((data.thread_pubkey, data.due_slot));
}
Some(status) => match status {
Err(err) => {
Expand All @@ -223,12 +226,12 @@ impl TxExecutor {
for pubkey in failed_threads {
w_transaction_history.remove(&pubkey);
}
for pubkey in retriable_threads {
for (pubkey, due_slot) in retriable_threads {
w_transaction_history.remove(&pubkey);
w_executable_threads.insert(
pubkey,
ExecutableThreadMetadata {
due_slot: slot,
due_slot,
simulation_failures: 0,
},
);
Expand All @@ -251,7 +254,7 @@ impl TxExecutor {
let rotation_history = r_rotation_history.as_ref().unwrap();
if slot
> rotation_history
.slot_sent
.sent_slot
.checked_add(ROTATION_CONFIRMATION_PERIOD)
.unwrap()
{
Expand Down Expand Up @@ -297,7 +300,8 @@ impl TxExecutor {
self.clone().submit_tx(&tx).await?;
let mut w_rotation_history = self.rotation_history.write().await;
*w_rotation_history = Some(TransactionMetadata {
slot_sent: slot,
due_slot: slot,
sent_slot: slot,
signature: tx.signatures[0],
});
drop(w_rotation_history);
Expand Down Expand Up @@ -364,7 +368,7 @@ impl TxExecutor {
))
})
.collect();
let mut executed_threads: HashMap<Pubkey, Signature> = HashMap::new();
let mut executed_threads: HashMap<Pubkey, (Signature, u64)> = HashMap::new();

// Serialize to wire transactions.
let wire_txs = futures::future::join_all(tasks)
Expand All @@ -374,8 +378,8 @@ impl TxExecutor {
Err(_err) => None,
Ok(res) => match res {
None => None,
Some((pubkey, tx)) => {
executed_threads.insert(*pubkey, tx.signatures[0]);
Some((pubkey, tx, due_slot)) => {
executed_threads.insert(*pubkey, (tx.signatures[0], *due_slot));
Some(tx)
}
},
Expand All @@ -399,12 +403,13 @@ impl TxExecutor {
Ok(()) => {
let mut w_executable_threads = self.executable_threads.write().await;
let mut w_transaction_history = self.transaction_history.write().await;
for (pubkey, signature) in executed_threads {
for (pubkey, (signature, due_slot)) in executed_threads {
w_executable_threads.remove(&pubkey);
w_transaction_history.insert(
pubkey,
TransactionMetadata {
slot_sent: observed_slot,
due_slot,
sent_slot: observed_slot,
signature,
},
);
Expand All @@ -423,7 +428,7 @@ impl TxExecutor {
observed_slot: u64,
due_slot: u64,
thread_pubkey: Pubkey,
) -> Option<(Pubkey, Transaction)> {
) -> Option<(Pubkey, Transaction, u64)> {
let thread = match client.clone().get::<VersionedThread>(&thread_pubkey).await {
Err(_err) => {
self.increment_simulation_failure(thread_pubkey).await;
Expand All @@ -432,6 +437,13 @@ impl TxExecutor {
Ok(thread) => thread,
};

// Exit early if the thread has been executed after it became due.
if let Some(exec_context) = thread.exec_context() {
if exec_context.last_exec_at.gt(&due_slot) {
return None;
}
}

if let Ok(tx) = crate::builders::build_thread_exec_tx(
client.clone(),
&self.keypair,
Expand All @@ -449,7 +461,7 @@ impl TxExecutor {
.await
.is_ok()
{
Some((thread_pubkey, tx))
Some((thread_pubkey, tx, due_slot))
} else {
None
}
Expand Down Expand Up @@ -478,7 +490,7 @@ impl TxExecutor {
) -> PluginResult<()> {
let r_transaction_history = self.transaction_history.read().await;
if let Some(metadata) = r_transaction_history.get(&thread_pubkey) {
if metadata.signature.eq(&tx.signatures[0]) && metadata.slot_sent.le(&slot) {
if metadata.signature.eq(&tx.signatures[0]) && metadata.sent_slot.le(&slot) {
return Err(GeyserPluginError::Custom(format!("Transaction signature is a duplicate of a previously submitted transaction").into()));
}
}
Expand Down

0 comments on commit 4ef9583

Please sign in to comment.