Skip to content

Commit

Permalink
update plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
thewuhxyz committed Apr 26, 2023
1 parent b660b61 commit 9092151
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
simple-error = "0.2.3"
solana-account-decoder = "=1.14.16"
solana-address-lookup-table-program = "1.10.1"
solana-client = "=1.14.16"
solana-geyser-plugin-interface = "=1.14.16"
solana-logger = "=1.14.16"
Expand Down
226 changes: 216 additions & 10 deletions plugin/src/builders/thread_exec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use anchor_lang::{InstructionData, ToAccountMetas};
use bincode::serialize;
use clockwork_client::{network::state::Worker, thread::state::Trigger};
use clockwork_thread_program::state::VersionedThread;
use clockwork_utils::thread::PAYER_PUBKEY;
Expand All @@ -16,12 +17,16 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::{
};
use solana_program::{
instruction::{AccountMeta, Instruction},
pubkey::Pubkey,
pubkey::Pubkey, address_lookup_table_account::AddressLookupTableAccount,
};
use solana_sdk::{
account::Account, commitment_config::CommitmentConfig,
compute_budget::ComputeBudgetInstruction, signature::Keypair, signer::Signer,
transaction::Transaction,
account::Account,
commitment_config::CommitmentConfig,
compute_budget::ComputeBudgetInstruction,
message::{v0, VersionedMessage},
signature::Keypair,
signer::Signer,
transaction::{VersionedTransaction},
};

/// Max byte size of a serialized transaction.
Expand All @@ -40,7 +45,8 @@ pub async fn build_thread_exec_tx(
thread: VersionedThread,
thread_pubkey: Pubkey,
worker_id: u64,
) -> PluginResult<Option<Transaction>> {
address_lookup_tables: Vec<AddressLookupTableAccount>
) -> PluginResult<Option<VersionedTransaction>> {
// Grab the thread and relevant data.
let now = std::time::Instant::now();
let blockhash = client.get_latest_blockhash().await.unwrap();
Expand Down Expand Up @@ -73,11 +79,21 @@ pub async fn build_thread_exec_tx(
let mut successful_ixs: Vec<Instruction> = vec![];
let mut units_consumed: Option<u64> = None;
loop {
let mut sim_tx = Transaction::new_with_payer(&ixs, Some(&signatory_pubkey));
sim_tx.sign(&[payer], blockhash);
// let mut sim_tx = Transaction::new_with_payer(&ixs, Some(&signatory_pubkey));
// sim_tx.sign(&[payer], blockhash);

let sim_tx = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&signatory_pubkey,
&ixs,
&address_lookup_tables,
blockhash,
).expect("error compiling to v0 message")),
&[payer],
).expect("error creating new versioned transaction");

// Exit early if the transaction exceeds the size limit.
if sim_tx.message_data().len() > TRANSACTION_MESSAGE_SIZE_LIMIT {
if serialize(&sim_tx).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT {
break;
}

Expand Down Expand Up @@ -198,9 +214,19 @@ pub async fn build_thread_exec_tx(
);
}

// let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey));
// tx.sign(&[payer], blockhash);

// Build and return the signed transaction.
let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey));
tx.sign(&[payer], blockhash);
let tx = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&signatory_pubkey,
&ixs,
&address_lookup_tables,
blockhash,
).expect("error compiling to v0 message")),
&[payer],
).expect("error creating new versioned transaction");
info!(
"slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}",
slot,
Expand All @@ -212,6 +238,186 @@ pub async fn build_thread_exec_tx(
);
Ok(Some(tx))
}
// pub async fn build_thread_exec_tx(
// client: Arc<RpcClient>,
// payer: &Keypair,
// slot: u64,
// thread: VersionedThread,
// thread_pubkey: Pubkey,
// worker_id: u64,
// ) -> PluginResult<Option<Transaction>> {
// // Grab the thread and relevant data.
// let now = std::time::Instant::now();
// let blockhash = client.get_latest_blockhash().await.unwrap();
// let signatory_pubkey = payer.pubkey();
// let worker_pubkey = Worker::pubkey(worker_id);

// // Build the first instruction of the transaction.
// let first_instruction = if thread.next_instruction().is_some() {
// build_exec_ix(
// thread.clone(),
// thread_pubkey,
// signatory_pubkey,
// worker_pubkey,
// )
// } else {
// build_kickoff_ix(
// thread.clone(),
// thread_pubkey,
// signatory_pubkey,
// worker_pubkey,
// )
// };

// // Simulate the transaction and pack as many instructions as possible until we hit mem/cpu limits.
// // TODO Migrate to versioned transactions.
// let mut ixs: Vec<Instruction> = vec![
// ComputeBudgetInstruction::set_compute_unit_limit(TRANSACTION_COMPUTE_UNIT_LIMIT),
// first_instruction,
// ];
// let mut successful_ixs: Vec<Instruction> = vec![];
// let mut units_consumed: Option<u64> = None;
// loop {

// let mut sim_tx = Transaction::new_with_payer(&ixs, Some(&signatory_pubkey));
// sim_tx.sign(&[payer], blockhash);

// // Exit early if the transaction exceeds the size limit.
// if sim_tx.message_data().len() > TRANSACTION_MESSAGE_SIZE_LIMIT {
// break;
// }

// // Run the simulation.
// match client
// .simulate_transaction_with_config(
// &sim_tx,
// RpcSimulateTransactionConfig {
// sig_verify: false,
// replace_recent_blockhash: true,
// commitment: Some(CommitmentConfig::processed()),
// accounts: Some(RpcSimulateTransactionAccountsConfig {
// encoding: Some(UiAccountEncoding::Base64Zstd),
// addresses: vec![thread_pubkey.to_string()],
// }),
// min_context_slot: Some(slot),
// ..RpcSimulateTransactionConfig::default()
// },
// )
// .await
// {
// // If there was a simulation error, stop packing and exit now.
// Err(err) => {
// match err.kind {
// solana_client::client_error::ClientErrorKind::RpcError(rpc_err) => {
// match rpc_err {
// solana_client::rpc_request::RpcError::RpcResponseError {
// code,
// message: _,
// data: _,
// } => {
// if code.eq(&JSON_RPC_SERVER_ERROR_MIN_CONTEXT_SLOT_NOT_REACHED) {
// return Err(GeyserPluginError::Custom(
// format!("RPC client has not reached min context slot")
// .into(),
// ));
// }
// }
// _ => {}
// }
// }
// _ => {}
// }
// break;
// }

// // If the simulation was successful, pack the ix into the tx.
// Ok(response) => {
// if response.value.err.is_some() {
// if successful_ixs.is_empty() {
// info!(
// "slot: {} thread: {} simulation_error: \"{}\" logs: {:?}",
// slot,
// thread_pubkey,
// response.value.err.unwrap(),
// response.value.logs.unwrap_or(vec![]),
// );
// }
// break;
// }

// // Update flag tracking if at least one instruction succeed.
// successful_ixs = ixs.clone();

// // Record the compute units consumed by the simulation.
// if response.value.units_consumed.is_some() {
// units_consumed = response.value.units_consumed;
// }

// // Parse the resulting thread account for the next instruction to simulate.
// if let Some(ui_accounts) = response.value.accounts {
// if let Some(Some(ui_account)) = ui_accounts.get(0) {
// if let Some(account) = ui_account.decode::<Account>() {
// if let Ok(sim_thread) = VersionedThread::try_from(account.data) {
// if sim_thread.next_instruction().is_some() {
// if let Some(exec_context) = sim_thread.exec_context() {
// if exec_context
// .execs_since_slot
// .lt(&sim_thread.rate_limit())
// {
// ixs.push(build_exec_ix(
// sim_thread,
// thread_pubkey,
// signatory_pubkey,
// worker_pubkey,
// ));
// } else {
// // Exit early if the thread has reached its rate limit.
// break;
// }
// }
// } else {
// break;
// }
// }
// }
// }
// }
// }
// }
// }

// // If there were no successful instructions, then exit early. There is nothing to do.
// // Alternatively, exit early if only the kickoff instruction (and no execs) succeeded.
// if successful_ixs.is_empty() {
// return Ok(None);
// }

// // Set the transaction's compute unit limit to be exactly the amount that was used in simulation.
// if let Some(units_consumed) = units_consumed {
// let units_committed = std::cmp::min(
// (units_consumed as u32) + TRANSACTION_COMPUTE_UNIT_BUFFER,
// TRANSACTION_COMPUTE_UNIT_LIMIT,
// );
// _ = std::mem::replace(
// &mut successful_ixs[0],
// ComputeBudgetInstruction::set_compute_unit_limit(units_committed),
// );
// }

// // Build and return the signed transaction.
// let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey));
// tx.sign(&[payer], blockhash);
// info!(
// "slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}",
// slot,
// thread_pubkey,
// now.elapsed(),
// successful_ixs.len(),
// units_consumed,
// tx.signatures[0]
// );
// Ok(Some(tx))
// }

fn build_kickoff_ix(
thread: VersionedThread,
Expand Down
60 changes: 60 additions & 0 deletions plugin/src/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ use std::{

use anchor_lang::{prelude::Pubkey, AccountDeserialize};
use async_trait::async_trait;
use clockwork_thread_program::state::LookupTables;
use log::info;
use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_client::{
client_error::{ClientError, ClientErrorKind, Result as ClientResult},
nonblocking::rpc_client::RpcClient,
};
use solana_geyser_plugin_interface::geyser_plugin_interface::Result as PluginResult;
use solana_program::address_lookup_table_account::AddressLookupTableAccount;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::runtime::Runtime;
use tx::TxExecutor;
Expand Down Expand Up @@ -137,3 +140,60 @@ impl AccountGet for RpcClient {
})
}
}

#[async_trait]
pub trait LookupTablesGet {
async fn get_lookup_tables(
&self,
pubkey: &Pubkey,
) -> ClientResult<Vec<AddressLookupTableAccount>>;
}

#[async_trait]
impl LookupTablesGet for RpcClient {
async fn get_lookup_tables(
&self,
pubkey: &Pubkey,
) -> ClientResult<Vec<AddressLookupTableAccount>> {
let lookup_account = self
.get_account_with_commitment(pubkey, self.commitment()) // returns Ok(None) if lookup account is not initialized
.await
.expect("error getting lookup account")
.value;
match lookup_account {
// return empty vec if lookup account has not been initialized
None => Ok(vec![]),

// get lookup tables in lookup accounts if account has been initialized
Some(lookup) => {
let lookup_keys = LookupTables::try_deserialize(&mut lookup.data.as_slice())
.map_err(|_| {
ClientError::from(ClientErrorKind::Custom(format!(
"Failed to deserialize account data"
)))
})
.expect("Failed to deserialize lookup data")
.lookup_tables;

let mut lookup_tables: Vec<AddressLookupTableAccount> = vec![];

for key in lookup_keys {
let raw_account = self
.get_account(&key)
.await
.expect("Could not fetch Address Lookup Table account");
let address_lookup_table = AddressLookupTable::deserialize(&raw_account.data)
.expect("Could not deserialise Address Lookup Table");
let address_lookup_table_account = AddressLookupTableAccount {
key,
addresses: address_lookup_table.addresses.to_vec(),
};

lookup_tables.push(address_lookup_table_account)
}

return Ok(lookup_tables);
}
}
}
}
Loading

0 comments on commit 9092151

Please sign in to comment.