Skip to content

Commit

Permalink
Add atomic lock to transaction executor (#152)
Browse files Browse the repository at this point in the history
* Add lock to tx executor stage

* Refactor into a compare_exchange operation
  • Loading branch information
nickgarfield authored Jan 31, 2023
1 parent ee5494d commit 9b57892
Showing 1 changed file with 47 additions and 28 deletions.
75 changes: 47 additions & 28 deletions plugin/src/executors/tx.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{fmt::Debug, sync::Arc};
use std::{
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use clockwork_client::{
network::state::{Pool, Registry, Snapshot, SnapshotFrame, Worker},
Expand Down Expand Up @@ -38,6 +44,7 @@ pub struct TxExecutor {
pub runtime: Arc<Runtime>,
pub tpu_client: Arc<TpuClient>,
pub simulation_failures: DashMap<Pubkey, u32>,
pub is_locked: AtomicBool,
}

impl TxExecutor {
Expand All @@ -56,35 +63,27 @@ impl TxExecutor {
runtime,
tpu_client,
simulation_failures: DashMap::new(),
is_locked: AtomicBool::new(false),
}
}

pub fn execute_txs(self: Arc<Self>, slot: u64) -> PluginResult<()> {
self.spawn(|this| async move {
// Get this worker's position in the delegate pool.
let worker_pubkey = Worker::pubkey(this.config.worker_id);
let pool_position = this
.client
.get::<Pool>(&Pool::pubkey(0))
.map(|pool| {
let workers = &mut pool.workers.clone();
PoolPosition {
current_position: pool
.workers
.iter()
.position(|k| k.eq(&worker_pubkey))
.map(|i| i as u64),
workers: workers.make_contiguous().to_vec().clone(),
}
})
.unwrap();
// Lock until work is done.
if this
.clone()
.is_locked
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return Ok(());
}

// Drop threads that cross the simulation failure threshold.
this.clone()
.simulation_failures
.retain(|thread_pubkey, failures| {
if *failures >= MAX_THREAD_SIMULATION_FAILURES {
// this.observers.thread.drop_thread(*thread_pubkey);
this.observers
.thread
.executable_threads
Expand All @@ -100,17 +99,36 @@ impl TxExecutor {
.message_history
.retain(|_msg_hash, msg_slot| *msg_slot >= slot - MESSAGE_DEDUPE_PERIOD);

// Rotate into the worker pool.
this.clone()
.execute_pool_rotate_txs(slot, pool_position.clone())
.await
.ok();
// Get this worker's position in the delegate pool.
let worker_pubkey = Worker::pubkey(this.config.worker_id);
if let Ok(pool_position) = this.client.get::<Pool>(&Pool::pubkey(0)).map(|pool| {
let workers = &mut pool.workers.clone();
PoolPosition {
current_position: pool
.workers
.iter()
.position(|k| k.eq(&worker_pubkey))
.map(|i| i as u64),
workers: workers.make_contiguous().to_vec().clone(),
}
}) {
// Rotate into the worker pool.
this.clone()
.execute_pool_rotate_txs(slot, pool_position.clone())
.await
.ok();

// Execute thread transactions.
// Execute thread transactions.
this.clone()
.execute_thread_exec_txs(slot, pool_position)
.await
.ok();
}

// Release the lock.
this.clone()
.execute_thread_exec_txs(slot, pool_position)
.await
.ok();
.is_locked
.store(false, std::sync::atomic::Ordering::Relaxed);

Ok(())
})
Expand Down Expand Up @@ -193,6 +211,7 @@ impl TxExecutor {
self.clone().simulation_failures.remove(&thread_pubkey);
self.clone().execute_tx(slot, &tx).map_err(|err| err).ok();
});

Ok(())
}

Expand Down

0 comments on commit 9b57892

Please sign in to comment.