From 90921515e82e6089cf517873239311097498d851 Mon Sep 17 00:00:00 2001 From: chizor iwuh Date: Wed, 26 Apr 2023 22:18:50 +0100 Subject: [PATCH] update plugin --- Cargo.lock | 1 + plugin/Cargo.toml | 1 + plugin/src/builders/thread_exec.rs | 226 +++++++++++++++++++++++++++-- plugin/src/executors/mod.rs | 60 ++++++++ plugin/src/executors/tx.rs | 14 +- rust-toolchain.toml | 2 +- 6 files changed, 291 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a2060610..e055675a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1230,6 +1230,7 @@ dependencies = [ "serde_json", "simple-error", "solana-account-decoder", + "solana-address-lookup-table-program", "solana-client", "solana-geyser-plugin-interface", "solana-logger", diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index 478e04df6..746e79ae5 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -39,6 +39,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" simple-error = "0.2.3" solana-account-decoder = "=1.14.16" +solana-address-lookup-table-program = "1.10.1" solana-client = "=1.14.16" solana-geyser-plugin-interface = "=1.14.16" solana-logger = "=1.14.16" diff --git a/plugin/src/builders/thread_exec.rs b/plugin/src/builders/thread_exec.rs index c6fe35be8..9a5461a6c 100644 --- a/plugin/src/builders/thread_exec.rs +++ b/plugin/src/builders/thread_exec.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anchor_lang::{InstructionData, ToAccountMetas}; +use bincode::serialize; use clockwork_client::{network::state::Worker, thread::state::Trigger}; use clockwork_thread_program::state::VersionedThread; use clockwork_utils::thread::PAYER_PUBKEY; @@ -16,12 +17,16 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::{ }; use solana_program::{ instruction::{AccountMeta, Instruction}, - pubkey::Pubkey, + pubkey::Pubkey, address_lookup_table_account::AddressLookupTableAccount, }; use solana_sdk::{ - account::Account, commitment_config::CommitmentConfig, - compute_budget::ComputeBudgetInstruction, signature::Keypair, signer::Signer, - transaction::Transaction, + account::Account, + commitment_config::CommitmentConfig, + compute_budget::ComputeBudgetInstruction, + message::{v0, VersionedMessage}, + signature::Keypair, + signer::Signer, + transaction::{VersionedTransaction}, }; /// Max byte size of a serialized transaction. @@ -40,7 +45,8 @@ pub async fn build_thread_exec_tx( thread: VersionedThread, thread_pubkey: Pubkey, worker_id: u64, -) -> PluginResult> { + address_lookup_tables: Vec +) -> PluginResult> { // Grab the thread and relevant data. let now = std::time::Instant::now(); let blockhash = client.get_latest_blockhash().await.unwrap(); @@ -73,11 +79,21 @@ pub async fn build_thread_exec_tx( let mut successful_ixs: Vec = vec![]; let mut units_consumed: Option = None; loop { - let mut sim_tx = Transaction::new_with_payer(&ixs, Some(&signatory_pubkey)); - sim_tx.sign(&[payer], blockhash); + // let mut sim_tx = Transaction::new_with_payer(&ixs, Some(&signatory_pubkey)); + // sim_tx.sign(&[payer], blockhash); + + let sim_tx = VersionedTransaction::try_new( + VersionedMessage::V0(v0::Message::try_compile( + &signatory_pubkey, + &ixs, + &address_lookup_tables, + blockhash, + ).expect("error compiling to v0 message")), + &[payer], + ).expect("error creating new versioned transaction"); // Exit early if the transaction exceeds the size limit. - if sim_tx.message_data().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { + if serialize(&sim_tx).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { break; } @@ -198,9 +214,19 @@ pub async fn build_thread_exec_tx( ); } + // let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey)); + // tx.sign(&[payer], blockhash); + // Build and return the signed transaction. - let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey)); - tx.sign(&[payer], blockhash); + let tx = VersionedTransaction::try_new( + VersionedMessage::V0(v0::Message::try_compile( + &signatory_pubkey, + &ixs, + &address_lookup_tables, + blockhash, + ).expect("error compiling to v0 message")), + &[payer], + ).expect("error creating new versioned transaction"); info!( "slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}", slot, @@ -212,6 +238,186 @@ pub async fn build_thread_exec_tx( ); Ok(Some(tx)) } +// pub async fn build_thread_exec_tx( +// client: Arc, +// payer: &Keypair, +// slot: u64, +// thread: VersionedThread, +// thread_pubkey: Pubkey, +// worker_id: u64, +// ) -> PluginResult> { +// // Grab the thread and relevant data. +// let now = std::time::Instant::now(); +// let blockhash = client.get_latest_blockhash().await.unwrap(); +// let signatory_pubkey = payer.pubkey(); +// let worker_pubkey = Worker::pubkey(worker_id); + +// // Build the first instruction of the transaction. +// let first_instruction = if thread.next_instruction().is_some() { +// build_exec_ix( +// thread.clone(), +// thread_pubkey, +// signatory_pubkey, +// worker_pubkey, +// ) +// } else { +// build_kickoff_ix( +// thread.clone(), +// thread_pubkey, +// signatory_pubkey, +// worker_pubkey, +// ) +// }; + +// // Simulate the transaction and pack as many instructions as possible until we hit mem/cpu limits. +// // TODO Migrate to versioned transactions. +// let mut ixs: Vec = vec![ +// ComputeBudgetInstruction::set_compute_unit_limit(TRANSACTION_COMPUTE_UNIT_LIMIT), +// first_instruction, +// ]; +// let mut successful_ixs: Vec = vec![]; +// let mut units_consumed: Option = None; +// loop { + +// let mut sim_tx = Transaction::new_with_payer(&ixs, Some(&signatory_pubkey)); +// sim_tx.sign(&[payer], blockhash); + +// // Exit early if the transaction exceeds the size limit. +// if sim_tx.message_data().len() > TRANSACTION_MESSAGE_SIZE_LIMIT { +// break; +// } + +// // Run the simulation. +// match client +// .simulate_transaction_with_config( +// &sim_tx, +// RpcSimulateTransactionConfig { +// sig_verify: false, +// replace_recent_blockhash: true, +// commitment: Some(CommitmentConfig::processed()), +// accounts: Some(RpcSimulateTransactionAccountsConfig { +// encoding: Some(UiAccountEncoding::Base64Zstd), +// addresses: vec![thread_pubkey.to_string()], +// }), +// min_context_slot: Some(slot), +// ..RpcSimulateTransactionConfig::default() +// }, +// ) +// .await +// { +// // If there was a simulation error, stop packing and exit now. +// Err(err) => { +// match err.kind { +// solana_client::client_error::ClientErrorKind::RpcError(rpc_err) => { +// match rpc_err { +// solana_client::rpc_request::RpcError::RpcResponseError { +// code, +// message: _, +// data: _, +// } => { +// if code.eq(&JSON_RPC_SERVER_ERROR_MIN_CONTEXT_SLOT_NOT_REACHED) { +// return Err(GeyserPluginError::Custom( +// format!("RPC client has not reached min context slot") +// .into(), +// )); +// } +// } +// _ => {} +// } +// } +// _ => {} +// } +// break; +// } + +// // If the simulation was successful, pack the ix into the tx. +// Ok(response) => { +// if response.value.err.is_some() { +// if successful_ixs.is_empty() { +// info!( +// "slot: {} thread: {} simulation_error: \"{}\" logs: {:?}", +// slot, +// thread_pubkey, +// response.value.err.unwrap(), +// response.value.logs.unwrap_or(vec![]), +// ); +// } +// break; +// } + +// // Update flag tracking if at least one instruction succeed. +// successful_ixs = ixs.clone(); + +// // Record the compute units consumed by the simulation. +// if response.value.units_consumed.is_some() { +// units_consumed = response.value.units_consumed; +// } + +// // Parse the resulting thread account for the next instruction to simulate. +// if let Some(ui_accounts) = response.value.accounts { +// if let Some(Some(ui_account)) = ui_accounts.get(0) { +// if let Some(account) = ui_account.decode::() { +// if let Ok(sim_thread) = VersionedThread::try_from(account.data) { +// if sim_thread.next_instruction().is_some() { +// if let Some(exec_context) = sim_thread.exec_context() { +// if exec_context +// .execs_since_slot +// .lt(&sim_thread.rate_limit()) +// { +// ixs.push(build_exec_ix( +// sim_thread, +// thread_pubkey, +// signatory_pubkey, +// worker_pubkey, +// )); +// } else { +// // Exit early if the thread has reached its rate limit. +// break; +// } +// } +// } else { +// break; +// } +// } +// } +// } +// } +// } +// } +// } + +// // If there were no successful instructions, then exit early. There is nothing to do. +// // Alternatively, exit early if only the kickoff instruction (and no execs) succeeded. +// if successful_ixs.is_empty() { +// return Ok(None); +// } + +// // Set the transaction's compute unit limit to be exactly the amount that was used in simulation. +// if let Some(units_consumed) = units_consumed { +// let units_committed = std::cmp::min( +// (units_consumed as u32) + TRANSACTION_COMPUTE_UNIT_BUFFER, +// TRANSACTION_COMPUTE_UNIT_LIMIT, +// ); +// _ = std::mem::replace( +// &mut successful_ixs[0], +// ComputeBudgetInstruction::set_compute_unit_limit(units_committed), +// ); +// } + +// // Build and return the signed transaction. +// let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey)); +// tx.sign(&[payer], blockhash); +// info!( +// "slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}", +// slot, +// thread_pubkey, +// now.elapsed(), +// successful_ixs.len(), +// units_consumed, +// tx.signatures[0] +// ); +// Ok(Some(tx)) +// } fn build_kickoff_ix( thread: VersionedThread, diff --git a/plugin/src/executors/mod.rs b/plugin/src/executors/mod.rs index f47c3464a..8baa28465 100644 --- a/plugin/src/executors/mod.rs +++ b/plugin/src/executors/mod.rs @@ -11,12 +11,15 @@ use std::{ use anchor_lang::{prelude::Pubkey, AccountDeserialize}; use async_trait::async_trait; +use clockwork_thread_program::state::LookupTables; use log::info; +use solana_address_lookup_table_program::state::AddressLookupTable; use solana_client::{ client_error::{ClientError, ClientErrorKind, Result as ClientResult}, nonblocking::rpc_client::RpcClient, }; use solana_geyser_plugin_interface::geyser_plugin_interface::Result as PluginResult; +use solana_program::address_lookup_table_account::AddressLookupTableAccount; use solana_sdk::commitment_config::CommitmentConfig; use tokio::runtime::Runtime; use tx::TxExecutor; @@ -137,3 +140,60 @@ impl AccountGet for RpcClient { }) } } + +#[async_trait] +pub trait LookupTablesGet { + async fn get_lookup_tables( + &self, + pubkey: &Pubkey, + ) -> ClientResult>; +} + +#[async_trait] +impl LookupTablesGet for RpcClient { + async fn get_lookup_tables( + &self, + pubkey: &Pubkey, + ) -> ClientResult> { + let lookup_account = self + .get_account_with_commitment(pubkey, self.commitment()) // returns Ok(None) if lookup account is not initialized + .await + .expect("error getting lookup account") + .value; + match lookup_account { + // return empty vec if lookup account has not been initialized + None => Ok(vec![]), + + // get lookup tables in lookup accounts if account has been initialized + Some(lookup) => { + let lookup_keys = LookupTables::try_deserialize(&mut lookup.data.as_slice()) + .map_err(|_| { + ClientError::from(ClientErrorKind::Custom(format!( + "Failed to deserialize account data" + ))) + }) + .expect("Failed to deserialize lookup data") + .lookup_tables; + + let mut lookup_tables: Vec = vec![]; + + for key in lookup_keys { + let raw_account = self + .get_account(&key) + .await + .expect("Could not fetch Address Lookup Table account"); + let address_lookup_table = AddressLookupTable::deserialize(&raw_account.data) + .expect("Could not deserialise Address Lookup Table"); + let address_lookup_table_account = AddressLookupTableAccount { + key, + addresses: address_lookup_table.addresses.to_vec(), + }; + + lookup_tables.push(address_lookup_table_account) + } + + return Ok(lookup_tables); + } + } + } +} diff --git a/plugin/src/executors/tx.rs b/plugin/src/executors/tx.rs index baed69d24..0e340c6d2 100644 --- a/plugin/src/executors/tx.rs +++ b/plugin/src/executors/tx.rs @@ -10,7 +10,7 @@ use std::{ use async_once::AsyncOnce; use bincode::serialize; use clockwork_client::network::state::{Pool, Registry, Snapshot, SnapshotFrame, Worker}; -use clockwork_thread_program::state::VersionedThread; +use clockwork_thread_program::state::{LookupTables, VersionedThread}; use lazy_static::lazy_static; use log::info; use solana_client::{ @@ -31,7 +31,7 @@ use tokio::{runtime::Runtime, sync::RwLock}; use crate::{config::PluginConfig, pool_position::PoolPosition, utils::read_or_new_keypair}; -use super::AccountGet; +use super::{AccountGet, LookupTablesGet}; /// Number of slots to wait before checking for a confirmed transaction. static TRANSACTION_CONFIRMATION_PERIOD: u64 = 10; @@ -387,6 +387,15 @@ impl TxExecutor { Ok(thread) => thread, }; + let lookup_tables_key = LookupTables::pubkey(thread.authority(), thread.pubkey()); + + let address_lookup_tables = match client.clone().get_lookup_tables(&lookup_tables_key).await { + Err(_err) => { + return None; + } + Ok(address_lookup_tables) => address_lookup_tables, + }; + if let Ok(tx) = crate::builders::build_thread_exec_tx( client.clone(), &self.keypair, @@ -394,6 +403,7 @@ impl TxExecutor { thread, thread_pubkey, self.config.worker_id, + address_lookup_tables ) .await { diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 20abe1ae7..61c017840 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "1.65.0" +channel = "1.60.0" components = [ "rustfmt" ] profile = "minimal"