Skip to content

Commit

Permalink
Merge pull request #1 from clockwork-xyz/main
Browse files Browse the repository at this point in the history
Sync latest changes
  • Loading branch information
thewuhxyz authored Apr 27, 2023
2 parents 1890d56 + 59dae86 commit 87d4a38
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
51 changes: 48 additions & 3 deletions plugin/src/executors/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ static MAX_THREAD_SIMULATION_FAILURES: u32 = 5;
/// The constant of the exponential backoff function.
static EXPONENTIAL_BACKOFF_CONSTANT: u32 = 2;

/// The number of slots to wait since the last rotation attempt.
static ROTATION_CONFIRMATION_PERIOD: u64 = 9;

/// TxExecutor
pub struct TxExecutor {
pub config: PluginConfig,
pub executable_threads: RwLock<HashMap<Pubkey, ExecutableThreadMetadata>>,
pub transaction_history: RwLock<HashMap<Pubkey, TransactionMetadata>>,
pub rotation_history: RwLock<Option<TransactionMetadata>>,
pub dropped_threads: AtomicU64,
pub keypair: Keypair,
}
Expand All @@ -72,6 +76,7 @@ impl TxExecutor {
config: config.clone(),
executable_threads: RwLock::new(HashMap::new()),
transaction_history: RwLock::new(HashMap::new()),
rotation_history: RwLock::new(None),
dropped_threads: AtomicU64::new(0),
keypair: read_or_new_keypair(config.keypath),
}
Expand Down Expand Up @@ -120,7 +125,7 @@ impl TxExecutor {

// Get self worker's position in the delegate pool.
let worker_pubkey = Worker::pubkey(self.config.worker_id);
if let Ok(pool_position) = client.get::<Pool>(&Pool::pubkey(0)).await.map(|pool| {
if let Ok(pool_position) = (client.get::<Pool>(&Pool::pubkey(0)).await).map(|pool| {
let workers = &mut pool.workers.clone();
PoolPosition {
current_position: pool
Expand All @@ -131,6 +136,8 @@ impl TxExecutor {
workers: workers.make_contiguous().to_vec().clone(),
}
}) {
info!("pool_position: {:?}", pool_position);

// Rotate into the worker pool.
if pool_position.current_position.is_none() {
self.clone()
Expand Down Expand Up @@ -235,9 +242,41 @@ impl TxExecutor {
async fn execute_pool_rotate_txs(
self: Arc<Self>,
client: Arc<RpcClient>,
_slot: u64,
slot: u64,
pool_position: PoolPosition,
) -> PluginResult<()> {
let r_rotation_history = self.rotation_history.read().await;
log::info!("Rotation history {:?}", r_rotation_history);
let should_attempt = if r_rotation_history.is_some() {
let rotation_history = r_rotation_history.as_ref().unwrap();
if slot
> rotation_history
.slot_sent
.checked_add(ROTATION_CONFIRMATION_PERIOD)
.unwrap()
{
if let Ok(sig_status) = client
.get_signature_status(&rotation_history.signature)
.await
{
if let Some(status) = sig_status {
status.is_err()
} else {
true
}
} else {
true
}
} else {
false
}
} else {
true
};
drop(r_rotation_history);
if !should_attempt {
return Ok(());
}
let registry = client.get::<Registry>(&Registry::pubkey()).await.unwrap();
let snapshot_pubkey = Snapshot::pubkey(registry.current_epoch);
let snapshot_frame_pubkey = SnapshotFrame::pubkey(snapshot_pubkey, self.config.worker_id);
Expand All @@ -256,6 +295,12 @@ impl TxExecutor {
{
self.clone().simulate_tx(&tx).await?;
self.clone().submit_tx(&tx).await?;
let mut w_rotation_history = self.rotation_history.write().await;
*w_rotation_history = Some(TransactionMetadata {
slot_sent: slot,
signature: tx.signatures[0],
});
drop(w_rotation_history);
}
}
}
Expand Down Expand Up @@ -529,7 +574,7 @@ lazy_static! {
let tpu_client = TpuClient::new(
rpc_client,
LOCAL_WEBSOCKET_URL.into(),
TpuClientConfig::default(),
TpuClientConfig { fanout_slots: 24 },
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion plugin/src/executors/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl WebhookExecutor {
let url = "http://127.0.0.1:8000/relay";
let client = reqwest::Client::new();
// for request_pubkey in requests {
let res = dbg!(
let _res = dbg!(
client
.post(url)
.header(CONTENT_TYPE, "application/json")
Expand Down
6 changes: 5 additions & 1 deletion scripts/build-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ EOF
}

# Set build flags
maybeRustVersion=
maybeRustVersion=+1.60.0
installDir=
buildVariant=debug
maybeReleaseFlag=
Expand Down Expand Up @@ -47,6 +47,10 @@ else
exit 1
fi

# Install 1.60 if not installed
rustup install "${maybeRustVersion:1}"


# Check the install directory is provided
if [[ -z "$installDir" ]]; then
usage "Install directory not specified"
Expand Down

0 comments on commit 87d4a38

Please sign in to comment.