From cef0d2d3b1b09b645511cfe59ea2046529c33765 Mon Sep 17 00:00:00 2001 From: mwrites <20499416+mwrites@users.noreply.github.com> Date: Mon, 24 Apr 2023 16:47:57 +0700 Subject: [PATCH 1/2] fixed build-script --- scripts/build-all.sh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/scripts/build-all.sh b/scripts/build-all.sh index 36c23eb06..46c897560 100755 --- a/scripts/build-all.sh +++ b/scripts/build-all.sh @@ -15,7 +15,7 @@ EOF } # Set build flags -maybeRustVersion= +maybeRustVersion=+1.60.0 installDir= buildVariant=debug maybeReleaseFlag= @@ -47,6 +47,10 @@ else exit 1 fi +# Install 1.60 if not installed +rustup install "${maybeRustVersion:1}" + + # Check the install directory is provided if [[ -z "$installDir" ]]; then usage "Install directory not specified" From 59dae868283a43bdfb0510d62260135c1781acf4 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Tue, 25 Apr 2023 17:44:21 -0500 Subject: [PATCH 2/2] Debug failed rotation issues (#243) * Experiment with new min_context_slot rather than confirmation level * Use prior slot for min_context_slot * Add debug logs * Use confirmed commitment * Move to observer based interface for pools * Add rotation history to avoid spamming failed txs * Cleanup pr * More cleanup * Final cleanup --- plugin/src/executors/tx.rs | 51 +++++++++++++++++++++++++++++++-- plugin/src/executors/webhook.rs | 2 +- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/plugin/src/executors/tx.rs b/plugin/src/executors/tx.rs index baed69d24..a29cdbfc6 100644 --- a/plugin/src/executors/tx.rs +++ b/plugin/src/executors/tx.rs @@ -45,11 +45,15 @@ static MAX_THREAD_SIMULATION_FAILURES: u32 = 5; /// The constant of the exponential backoff function. static EXPONENTIAL_BACKOFF_CONSTANT: u32 = 2; +/// The number of slots to wait since the last rotation attempt. +static ROTATION_CONFIRMATION_PERIOD: u64 = 9; + /// TxExecutor pub struct TxExecutor { pub config: PluginConfig, pub executable_threads: RwLock>, pub transaction_history: RwLock>, + pub rotation_history: RwLock>, pub dropped_threads: AtomicU64, pub keypair: Keypair, } @@ -72,6 +76,7 @@ impl TxExecutor { config: config.clone(), executable_threads: RwLock::new(HashMap::new()), transaction_history: RwLock::new(HashMap::new()), + rotation_history: RwLock::new(None), dropped_threads: AtomicU64::new(0), keypair: read_or_new_keypair(config.keypath), } @@ -120,7 +125,7 @@ impl TxExecutor { // Get self worker's position in the delegate pool. let worker_pubkey = Worker::pubkey(self.config.worker_id); - if let Ok(pool_position) = client.get::(&Pool::pubkey(0)).await.map(|pool| { + if let Ok(pool_position) = (client.get::(&Pool::pubkey(0)).await).map(|pool| { let workers = &mut pool.workers.clone(); PoolPosition { current_position: pool @@ -131,6 +136,8 @@ impl TxExecutor { workers: workers.make_contiguous().to_vec().clone(), } }) { + info!("pool_position: {:?}", pool_position); + // Rotate into the worker pool. if pool_position.current_position.is_none() { self.clone() @@ -235,9 +242,41 @@ impl TxExecutor { async fn execute_pool_rotate_txs( self: Arc, client: Arc, - _slot: u64, + slot: u64, pool_position: PoolPosition, ) -> PluginResult<()> { + let r_rotation_history = self.rotation_history.read().await; + log::info!("Rotation history {:?}", r_rotation_history); + let should_attempt = if r_rotation_history.is_some() { + let rotation_history = r_rotation_history.as_ref().unwrap(); + if slot + > rotation_history + .slot_sent + .checked_add(ROTATION_CONFIRMATION_PERIOD) + .unwrap() + { + if let Ok(sig_status) = client + .get_signature_status(&rotation_history.signature) + .await + { + if let Some(status) = sig_status { + status.is_err() + } else { + true + } + } else { + true + } + } else { + false + } + } else { + true + }; + drop(r_rotation_history); + if !should_attempt { + return Ok(()); + } let registry = client.get::(&Registry::pubkey()).await.unwrap(); let snapshot_pubkey = Snapshot::pubkey(registry.current_epoch); let snapshot_frame_pubkey = SnapshotFrame::pubkey(snapshot_pubkey, self.config.worker_id); @@ -256,6 +295,12 @@ impl TxExecutor { { self.clone().simulate_tx(&tx).await?; self.clone().submit_tx(&tx).await?; + let mut w_rotation_history = self.rotation_history.write().await; + *w_rotation_history = Some(TransactionMetadata { + slot_sent: slot, + signature: tx.signatures[0], + }); + drop(w_rotation_history); } } } @@ -502,7 +547,7 @@ lazy_static! { let tpu_client = TpuClient::new( rpc_client, LOCAL_WEBSOCKET_URL.into(), - TpuClientConfig::default(), + TpuClientConfig { fanout_slots: 24 }, ) .await .unwrap(); diff --git a/plugin/src/executors/webhook.rs b/plugin/src/executors/webhook.rs index 641729f1c..07cf8d444 100644 --- a/plugin/src/executors/webhook.rs +++ b/plugin/src/executors/webhook.rs @@ -60,7 +60,7 @@ impl WebhookExecutor { let url = "http://127.0.0.1:8000/relay"; let client = reqwest::Client::new(); // for request_pubkey in requests { - let res = dbg!( + let _res = dbg!( client .post(url) .header(CONTENT_TYPE, "application/json")