From 5494c8250d25f4aa3eb024eb4061616803aa6afb Mon Sep 17 00:00:00 2001 From: chizor iwuh Date: Wed, 31 May 2023 14:33:25 +0100 Subject: [PATCH 1/4] migrate plugin to versioned tx --- plugin/src/builders/pool_rotation.rs | 18 ++++++++++--- plugin/src/builders/thread_exec.rs | 40 +++++++++++++++++++++------- plugin/src/executors/tx.rs | 29 +++++++++++++++----- 3 files changed, 67 insertions(+), 20 deletions(-) diff --git a/plugin/src/builders/pool_rotation.rs b/plugin/src/builders/pool_rotation.rs index 85c2f504b..02f5bf514 100644 --- a/plugin/src/builders/pool_rotation.rs +++ b/plugin/src/builders/pool_rotation.rs @@ -7,7 +7,8 @@ use anchor_lang::{ use clockwork_network_program::state::{Config, Pool, Registry, Snapshot, SnapshotFrame, Worker}; use log::info; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::{signature::Keypair, signer::Signer, transaction::Transaction}; +use solana_program::message::{VersionedMessage, v0}; +use solana_sdk::{signature::Keypair, signer::Signer, transaction::VersionedTransaction}; use crate::pool_position::PoolPosition; @@ -19,7 +20,7 @@ pub async fn build_pool_rotation_tx<'a>( snapshot: Snapshot, snapshot_frame: SnapshotFrame, worker_id: u64, -) -> Option { +) -> Option { info!("nonce: {:?} total_stake: {:?} current_position: {:?} stake_offset: {:?} stake_amount: {:?}", registry.nonce.checked_rem(snapshot.total_stake), snapshot.total_stake, @@ -81,7 +82,16 @@ pub async fn build_pool_rotation_tx<'a>( }; // Build and sign tx. - let mut tx = Transaction::new_with_payer(&[ix.clone()], Some(&keypair.pubkey())); - tx.sign(&[keypair], client.get_latest_blockhash().await.unwrap()); + let blockhash = client.get_latest_blockhash().await.unwrap(); + + let tx = VersionedTransaction::try_new( + VersionedMessage::V0(v0::Message::try_compile( + &keypair.pubkey(), + &[ix.clone()], + &[], + blockhash, + ).expect("error compiling to v0 message")), + &[keypair], + ).expect("error creating new versioned transaction"); return Some(tx); } diff --git a/plugin/src/builders/thread_exec.rs b/plugin/src/builders/thread_exec.rs index e0ac02df7..b03efdac5 100644 --- a/plugin/src/builders/thread_exec.rs +++ b/plugin/src/builders/thread_exec.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anchor_lang::{InstructionData, ToAccountMetas}; use clockwork_thread_program::state::{VersionedThread, Trigger}; use clockwork_network_program::state::Worker; +use bincode::serialize; use clockwork_utils::thread::PAYER_PUBKEY; use log::info; use solana_account_decoder::UiAccountEncoding; @@ -16,12 +17,16 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::{ }; use solana_program::{ instruction::{AccountMeta, Instruction}, - pubkey::Pubkey, + pubkey::Pubkey, address_lookup_table_account::AddressLookupTableAccount, }; use solana_sdk::{ - account::Account, commitment_config::CommitmentConfig, - compute_budget::ComputeBudgetInstruction, signature::Keypair, signer::Signer, - transaction::Transaction, + account::Account, + commitment_config::CommitmentConfig, + compute_budget::ComputeBudgetInstruction, + message::{v0, VersionedMessage}, + signature::Keypair, + signer::Signer, + transaction::{VersionedTransaction}, }; /// Max byte size of a serialized transaction. @@ -40,7 +45,8 @@ pub async fn build_thread_exec_tx( thread: VersionedThread, thread_pubkey: Pubkey, worker_id: u64, -) -> PluginResult> { + address_lookup_tables: Vec +) -> PluginResult> { // Grab the thread and relevant data. let now = std::time::Instant::now(); let blockhash = client.get_latest_blockhash().await.unwrap(); @@ -73,11 +79,18 @@ pub async fn build_thread_exec_tx( let mut successful_ixs: Vec = vec![]; let mut units_consumed: Option = None; loop { - let mut sim_tx = Transaction::new_with_payer(&ixs, Some(&signatory_pubkey)); - sim_tx.sign(&[payer], blockhash); + let sim_tx = VersionedTransaction::try_new( + VersionedMessage::V0(v0::Message::try_compile( + &signatory_pubkey, + &ixs, + &address_lookup_tables, + blockhash, + ).expect("error compiling to v0 message")), + &[payer], + ).expect("error creating new versioned transaction"); // Exit early if the transaction exceeds the size limit. - if sim_tx.message_data().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { + if serialize(&sim_tx).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { break; } @@ -199,8 +212,15 @@ pub async fn build_thread_exec_tx( } // Build and return the signed transaction. - let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey)); - tx.sign(&[payer], blockhash); + let tx = VersionedTransaction::try_new( + VersionedMessage::V0(v0::Message::try_compile( + &signatory_pubkey, + &ixs, + &address_lookup_tables, + blockhash, + ).expect("error compiling to v0 message")), + &[payer], + ).expect("error creating new versioned transaction"); info!( "slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}", slot, diff --git a/plugin/src/executors/tx.rs b/plugin/src/executors/tx.rs index 8ff4d0511..175fc25ab 100644 --- a/plugin/src/executors/tx.rs +++ b/plugin/src/executors/tx.rs @@ -25,7 +25,7 @@ use solana_program::pubkey::Pubkey; use solana_sdk::{ commitment_config::CommitmentConfig, signature::{Keypair, Signature}, - transaction::Transaction, + transaction::{VersionedTransaction}, }; use tokio::{runtime::Runtime, sync::RwLock}; @@ -428,7 +428,7 @@ impl TxExecutor { observed_slot: u64, due_slot: u64, thread_pubkey: Pubkey, - ) -> Option<(Pubkey, Transaction, u64)> { + ) -> Option<(Pubkey, VersionedTransaction, u64)> { let thread = match client.clone().get::(&thread_pubkey).await { Err(_err) => { self.increment_simulation_failure(thread_pubkey).await; @@ -455,6 +455,7 @@ impl TxExecutor { thread, thread_pubkey, self.config.worker_id, + vec![], ) .await { @@ -490,7 +491,7 @@ impl TxExecutor { self: Arc, slot: u64, thread_pubkey: Pubkey, - tx: &Transaction, + tx: &VersionedTransaction, ) -> PluginResult<()> { let r_transaction_history = self.transaction_history.read().await; if let Some(metadata) = r_transaction_history.get(&thread_pubkey) { @@ -502,7 +503,10 @@ impl TxExecutor { Ok(()) } - async fn simulate_tx(self: Arc, tx: &Transaction) -> PluginResult { + async fn simulate_tx( + self: Arc, + tx: &VersionedTransaction, + ) -> PluginResult { TPU_CLIENT .get() .await @@ -531,14 +535,27 @@ impl TxExecutor { })? } - async fn submit_tx(self: Arc, tx: &Transaction) -> PluginResult { - if !TPU_CLIENT.get().await.send_transaction(tx).await { + async fn submit_tx( + self: Arc, + tx: &VersionedTransaction, + ) -> PluginResult { + let serialized_tx = serialize(tx).unwrap(); + + if !TPU_CLIENT.get().await.send_wire_transaction(serialized_tx).await { return Err(GeyserPluginError::Custom( "Failed to send transaction".into(), )); } Ok(tx.clone()) } + // async fn submit_tx(self: Arc, tx: &Transaction) -> PluginResult { + // if !TPU_CLIENT.get().await.send_transaction(tx).await { + // return Err(GeyserPluginError::Custom( + // "Failed to send transaction".into(), + // )); + // } + // Ok(tx.clone()) + // } } impl Debug for TxExecutor { From 12640d88a7ad60c9f0aa383009cac77cb61b4173 Mon Sep 17 00:00:00 2001 From: chizor iwuh Date: Thu, 1 Jun 2023 13:57:04 +0100 Subject: [PATCH 2/4] cleanup --- Cargo.lock | 4 ++-- plugin/src/builders/pool_rotation.rs | 9 +++------ plugin/src/builders/thread_exec.rs | 10 +++++----- plugin/src/executors/tx.rs | 10 +--------- 4 files changed, 11 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 657fa0005..e07f54e78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -489,9 +489,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.70" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4" +checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" [[package]] name = "arrayref" diff --git a/plugin/src/builders/pool_rotation.rs b/plugin/src/builders/pool_rotation.rs index 02f5bf514..005d28121 100644 --- a/plugin/src/builders/pool_rotation.rs +++ b/plugin/src/builders/pool_rotation.rs @@ -1,9 +1,6 @@ use std::sync::Arc; -use anchor_lang::{ - solana_program::instruction::Instruction, - InstructionData, ToAccountMetas -}; +use anchor_lang::{solana_program::instruction::Instruction, InstructionData, ToAccountMetas}; use clockwork_network_program::state::{Config, Pool, Registry, Snapshot, SnapshotFrame, Worker}; use log::info; use solana_client::nonblocking::rpc_client::RpcClient; @@ -90,8 +87,8 @@ pub async fn build_pool_rotation_tx<'a>( &[ix.clone()], &[], blockhash, - ).expect("error compiling to v0 message")), + ).unwrap()), &[keypair], - ).expect("error creating new versioned transaction"); + ).unwrap(); return Some(tx); } diff --git a/plugin/src/builders/thread_exec.rs b/plugin/src/builders/thread_exec.rs index b03efdac5..a31639c87 100644 --- a/plugin/src/builders/thread_exec.rs +++ b/plugin/src/builders/thread_exec.rs @@ -26,7 +26,7 @@ use solana_sdk::{ message::{v0, VersionedMessage}, signature::Keypair, signer::Signer, - transaction::{VersionedTransaction}, + transaction::VersionedTransaction, }; /// Max byte size of a serialized transaction. @@ -85,9 +85,9 @@ pub async fn build_thread_exec_tx( &ixs, &address_lookup_tables, blockhash, - ).expect("error compiling to v0 message")), + ).unwrap()), &[payer], - ).expect("error creating new versioned transaction"); + ).unwrap(); // Exit early if the transaction exceeds the size limit. if serialize(&sim_tx).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { @@ -218,9 +218,9 @@ pub async fn build_thread_exec_tx( &ixs, &address_lookup_tables, blockhash, - ).expect("error compiling to v0 message")), + ).unwrap()), &[payer], - ).expect("error creating new versioned transaction"); + ).unwrap(); info!( "slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}", slot, diff --git a/plugin/src/executors/tx.rs b/plugin/src/executors/tx.rs index 175fc25ab..bdd928d94 100644 --- a/plugin/src/executors/tx.rs +++ b/plugin/src/executors/tx.rs @@ -25,7 +25,7 @@ use solana_program::pubkey::Pubkey; use solana_sdk::{ commitment_config::CommitmentConfig, signature::{Keypair, Signature}, - transaction::{VersionedTransaction}, + transaction::VersionedTransaction, }; use tokio::{runtime::Runtime, sync::RwLock}; @@ -548,14 +548,6 @@ impl TxExecutor { } Ok(tx.clone()) } - // async fn submit_tx(self: Arc, tx: &Transaction) -> PluginResult { - // if !TPU_CLIENT.get().await.send_transaction(tx).await { - // return Err(GeyserPluginError::Custom( - // "Failed to send transaction".into(), - // )); - // } - // Ok(tx.clone()) - // } } impl Debug for TxExecutor { From f6c382c4e1439014887dc984fd5a05e819c2f088 Mon Sep 17 00:00:00 2001 From: chizor iwuh Date: Fri, 2 Jun 2023 10:37:17 +0100 Subject: [PATCH 3/4] handle tx err --- plugin/src/builders/pool_rotation.rs | 20 ++++++++++----- plugin/src/builders/thread_exec.rs | 37 +++++++++++++++++++--------- 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/plugin/src/builders/pool_rotation.rs b/plugin/src/builders/pool_rotation.rs index 005d28121..4085e0d37 100644 --- a/plugin/src/builders/pool_rotation.rs +++ b/plugin/src/builders/pool_rotation.rs @@ -4,6 +4,7 @@ use anchor_lang::{solana_program::instruction::Instruction, InstructionData, ToA use clockwork_network_program::state::{Config, Pool, Registry, Snapshot, SnapshotFrame, Worker}; use log::info; use solana_client::nonblocking::rpc_client::RpcClient; +use solana_geyser_plugin_interface::geyser_plugin_interface::{GeyserPlugin, GeyserPluginError}; use solana_program::message::{VersionedMessage, v0}; use solana_sdk::{signature::Keypair, signer::Signer, transaction::VersionedTransaction}; @@ -81,14 +82,21 @@ pub async fn build_pool_rotation_tx<'a>( // Build and sign tx. let blockhash = client.get_latest_blockhash().await.unwrap(); - let tx = VersionedTransaction::try_new( - VersionedMessage::V0(v0::Message::try_compile( + let tx = match v0::Message::try_compile( &keypair.pubkey(), &[ix.clone()], &[], blockhash, - ).unwrap()), - &[keypair], - ).unwrap(); - return Some(tx); + ) { + Err(_) => Err(GeyserPluginError::Custom(format!("Failed to compile to v0 message ").into())), + Ok(message) => match VersionedTransaction::try_new( + VersionedMessage::V0(message), + &[keypair] + ) { + Err(_) => Err(GeyserPluginError::Custom(format!("Failed to create versioned transaction ").into())), + Ok(tx) => Ok(tx) + } + + }; + return tx.ok(); } diff --git a/plugin/src/builders/thread_exec.rs b/plugin/src/builders/thread_exec.rs index a31639c87..b93fda351 100644 --- a/plugin/src/builders/thread_exec.rs +++ b/plugin/src/builders/thread_exec.rs @@ -79,18 +79,25 @@ pub async fn build_thread_exec_tx( let mut successful_ixs: Vec = vec![]; let mut units_consumed: Option = None; loop { - let sim_tx = VersionedTransaction::try_new( - VersionedMessage::V0(v0::Message::try_compile( + let sim_tx = match v0::Message::try_compile( &signatory_pubkey, &ixs, &address_lookup_tables, blockhash, - ).unwrap()), - &[payer], - ).unwrap(); + ) { + Err(_) => Err(GeyserPluginError::Custom(format!("Failed to compile to v0 message ").into())), + Ok(message) => match VersionedTransaction::try_new( + VersionedMessage::V0(message), + &[payer] + ) { + Err(_) => Err(GeyserPluginError::Custom(format!("Failed to create versioned transaction ").into())), + Ok(tx) => Ok(tx) + } + + }; // Exit early if the transaction exceeds the size limit. - if serialize(&sim_tx).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { + if serialize(&sim_tx?).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { break; } @@ -212,15 +219,23 @@ pub async fn build_thread_exec_tx( } // Build and return the signed transaction. - let tx = VersionedTransaction::try_new( - VersionedMessage::V0(v0::Message::try_compile( + let tx = match v0::Message::try_compile( &signatory_pubkey, &ixs, &address_lookup_tables, blockhash, - ).unwrap()), - &[payer], - ).unwrap(); + ) { + Err(_) => Err(GeyserPluginError::Custom(format!("Failed to compile to v0 message ").into())), + Ok(message) => match VersionedTransaction::try_new( + VersionedMessage::V0(message), + &[payer] + ) { + Err(_) => Err(GeyserPluginError::Custom(format!("Failed to create versioned transaction ").into())), + Ok(tx) => Ok(tx) + } + + }; + info!( "slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}", slot, From da9a44bf99237cbb88cff22a3bc88d3889301e27 Mon Sep 17 00:00:00 2001 From: chizor iwuh Date: Fri, 2 Jun 2023 11:02:36 +0100 Subject: [PATCH 4/4] cleanup --- plugin/src/builders/pool_rotation.rs | 2 +- plugin/src/builders/thread_exec.rs | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/plugin/src/builders/pool_rotation.rs b/plugin/src/builders/pool_rotation.rs index 4085e0d37..3376a5e19 100644 --- a/plugin/src/builders/pool_rotation.rs +++ b/plugin/src/builders/pool_rotation.rs @@ -4,7 +4,7 @@ use anchor_lang::{solana_program::instruction::Instruction, InstructionData, ToA use clockwork_network_program::state::{Config, Pool, Registry, Snapshot, SnapshotFrame, Worker}; use log::info; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_geyser_plugin_interface::geyser_plugin_interface::{GeyserPlugin, GeyserPluginError}; +use solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError; use solana_program::message::{VersionedMessage, v0}; use solana_sdk::{signature::Keypair, signer::Signer, transaction::VersionedTransaction}; diff --git a/plugin/src/builders/thread_exec.rs b/plugin/src/builders/thread_exec.rs index b93fda351..46c339fa0 100644 --- a/plugin/src/builders/thread_exec.rs +++ b/plugin/src/builders/thread_exec.rs @@ -93,11 +93,10 @@ pub async fn build_thread_exec_tx( Err(_) => Err(GeyserPluginError::Custom(format!("Failed to create versioned transaction ").into())), Ok(tx) => Ok(tx) } - - }; + }?; // Exit early if the transaction exceeds the size limit. - if serialize(&sim_tx?).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { + if serialize(&sim_tx).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { break; } @@ -234,7 +233,7 @@ pub async fn build_thread_exec_tx( Ok(tx) => Ok(tx) } - }; + }?; info!( "slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}",