Skip to content

Commit

Permalink
update plugin, client
Browse files Browse the repository at this point in the history
  • Loading branch information
thewuhxyz committed Apr 27, 2023
1 parent 9092151 commit 1890d56
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 16 deletions.
87 changes: 86 additions & 1 deletion client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use solana_sdk::{
signature::{Keypair, Signature, Signer},
signers::Signers,
system_instruction,
transaction::Transaction,
transaction::{Transaction, VersionedTransaction}, message::{VersionedMessage, v0}, address_lookup_table_account::AddressLookupTableAccount,
};
use std::{
fmt::Debug,
Expand Down Expand Up @@ -111,6 +111,19 @@ impl Client {
tx.sign(signers, self.latest_blockhash()?);
Ok(self.send_transaction(&tx)?)
}

pub fn send_versioned_tx<T: Signers>(&self, ixs: &[Instruction], signers: &T, address_lookup_tables: &[AddressLookupTableAccount]) -> ClientResult<Signature> {
let tx = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&self.payer_pubkey(),
&ixs,
&address_lookup_tables,
self.latest_blockhash()?,
).expect("error compiling to v0 message")),
signers,
).expect("error creating new versioned transaction");
Ok(self.send_transaction(&tx)?)
}

pub fn send_with_config<T: Signers>(
&self,
Expand All @@ -121,6 +134,25 @@ impl Client {
let tx = self.transaction(ixs, signers)?;
Ok(self.client.send_transaction_with_config(&tx, config)?)
}

pub fn send_versioned_tx_with_config<T: Signers>(
&self,
ixs: &[Instruction],
signers: &T,
address_lookup_tables: &[AddressLookupTableAccount],
config: RpcSendTransactionConfig,
) -> ClientResult<Signature> {
let tx = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&self.payer_pubkey(),
&ixs,
&address_lookup_tables,
self.latest_blockhash()?,
).expect("error compiling to v0 message")),
signers,
).expect("error creating new versioned transaction");
Ok(self.client.send_transaction_with_config(&tx, config)?)
}

pub fn send_and_confirm<T: Signers>(
&self,
Expand All @@ -131,6 +163,24 @@ impl Client {
Ok(self.send_and_confirm_transaction(&tx)?)
}

pub fn send_and_confirm_versioned_tx<T: Signers>(
&self,
ixs: &[Instruction],
signers: &T,
address_lookup_tables: &[AddressLookupTableAccount]
) -> ClientResult<Signature> {
let tx = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&self.payer_pubkey(),
&ixs,
&address_lookup_tables,
self.latest_blockhash()?,
).expect("error compiling to v0 message")),
signers,
).expect("error creating new versioned transaction");
Ok(self.send_and_confirm_transaction(&tx)?)
}

pub fn simulate_transaction<T: Signers>(
&self,
ixs: &[Instruction],
Expand All @@ -144,6 +194,21 @@ impl Client {
Ok(result.value)
}
}

pub fn simulate_versioned_transaction<T: Signers>(
&self,
ixs: &[Instruction],
signers: &T,
address_lookup_tables: &[AddressLookupTableAccount]
) -> ClientResult<RpcSimulateTransactionResult> {
let tx = self.versioned_transaction(ixs, signers, address_lookup_tables)?;
let result = self.client.simulate_transaction(&tx)?;
if result.value.err.is_some() {
Err(ClientError::DeserializationError)
} else {
Ok(result.value)
}
}

fn transaction<T: Signers>(
&self,
Expand All @@ -154,6 +219,26 @@ impl Client {
tx.sign(signers, self.latest_blockhash()?);
Ok(tx)
}

fn versioned_transaction<T: Signers>(
&self,
ixs: &[Instruction],
signers: &T,
address_lookup_tables: &[AddressLookupTableAccount]
) -> ClientResult<VersionedTransaction> {
// let mut tx = Transaction::new_with_payer(ixs, Some(&self.payer_pubkey()));
// tx.sign(signers, self.latest_blockhash()?);
let tx = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&self.payer_pubkey(),
&ixs,
&address_lookup_tables,
self.latest_blockhash()?,
).expect("error compiling to v0 message")),
signers,
).expect("error creating new versioned transaction");
Ok(tx)
}
}

impl Debug for Client {
Expand Down
90 changes: 86 additions & 4 deletions plugin/src/builders/pool_rotation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::sync::Arc;
use clockwork_client::network::state::{Pool, Registry, Snapshot, SnapshotFrame, Worker};
use log::info;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{signature::Keypair, signer::Signer, transaction::Transaction};
use solana_program::message::{VersionedMessage, v0};
use solana_sdk::{signature::Keypair, signer::Signer, transaction::{VersionedTransaction}};

use crate::pool_position::PoolPosition;

Expand All @@ -15,7 +16,7 @@ pub async fn build_pool_rotation_tx<'a>(
snapshot: Snapshot,
snapshot_frame: SnapshotFrame,
worker_id: u64,
) -> Option<Transaction> {
) -> Option<VersionedTransaction> {
info!("nonce: {:?} total_stake: {:?} current_position: {:?} stake_offset: {:?} stake_amount: {:?}",
registry.nonce.checked_rem(snapshot.total_stake),
snapshot.total_stake,
Expand Down Expand Up @@ -70,8 +71,89 @@ pub async fn build_pool_rotation_tx<'a>(
Worker::pubkey(worker_id),
);

// let mut tx = Transaction::new_with_payer(&[ix.clone()], Some(&keypair.pubkey()));
// tx.sign(&[keypair], client.get_latest_blockhash().await.unwrap());

// Build and sign tx.
let mut tx = Transaction::new_with_payer(&[ix.clone()], Some(&keypair.pubkey()));
tx.sign(&[keypair], client.get_latest_blockhash().await.unwrap());
let blockhash = client.get_latest_blockhash().await.unwrap();

let tx = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&keypair.pubkey(),
&[ix.clone()],
&[],
blockhash,
).expect("error compiling to v0 message")),
&[keypair],
).expect("error creating new versioned transaction");
return Some(tx);
}

// pub async fn build_pool_rotation_tx<'a>(
// client: Arc<RpcClient>,
// keypair: &Keypair,
// pool_position: PoolPosition,
// registry: Registry,
// snapshot: Snapshot,
// snapshot_frame: SnapshotFrame,
// worker_id: u64,
// ) -> Option<Transaction> {
// info!("nonce: {:?} total_stake: {:?} current_position: {:?} stake_offset: {:?} stake_amount: {:?}",
// registry.nonce.checked_rem(snapshot.total_stake),
// snapshot.total_stake,
// pool_position.current_position,
// snapshot_frame.stake_offset,
// snapshot_frame.stake_amount,
// );

// // Exit early if the rotator is not intialized
// if registry.nonce == 0 {
// return None;
// }

// // Exit early the snapshot has no stake
// if snapshot.total_stake == 0 {
// return None;
// }

// // Exit early if the worker is already in the pool.
// if pool_position.current_position.is_some() {
// return None;
// }

// // Exit early if the snapshot frame is none or the worker has no delegated stake.
// if snapshot_frame.stake_amount.eq(&0) {
// return None;
// }

// // Check if the rotation window is open for this worker.
// let is_rotation_window_open = match registry.nonce.checked_rem(snapshot.total_stake) {
// None => false,
// Some(sample) => {
// sample >= snapshot_frame.stake_offset
// && sample
// < snapshot_frame
// .stake_offset
// .checked_add(snapshot_frame.stake_amount)
// .unwrap()
// }
// };
// if !is_rotation_window_open {
// return None;
// }

// // Build rotation instruction to rotate the worker into pool 0.
// let snapshot_pubkey = Snapshot::pubkey(snapshot.id);
// let ix = clockwork_client::network::instruction::pool_rotate(
// Pool::pubkey(0),
// keypair.pubkey(),
// snapshot_pubkey,
// SnapshotFrame::pubkey(snapshot_pubkey, worker_id),
// Worker::pubkey(worker_id),
// );

// // Build and sign tx.
// let mut tx = Transaction::new_with_payer(&[ix.clone()], Some(&keypair.pubkey()));
// tx.sign(&[keypair], client.get_latest_blockhash().await.unwrap());
// return Some(tx);
// }
35 changes: 26 additions & 9 deletions plugin/src/executors/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use solana_program::pubkey::Pubkey;
use solana_sdk::{
commitment_config::CommitmentConfig,
signature::{Keypair, Signature},
transaction::Transaction,
transaction::{VersionedTransaction},
};
use tokio::{runtime::Runtime, sync::RwLock};

Expand Down Expand Up @@ -378,7 +378,7 @@ impl TxExecutor {
observed_slot: u64,
due_slot: u64,
thread_pubkey: Pubkey,
) -> Option<(Pubkey, Transaction)> {
) -> Option<(Pubkey, VersionedTransaction)> {
let thread = match client.clone().get::<VersionedThread>(&thread_pubkey).await {
Err(_err) => {
self.increment_simulation_failure(thread_pubkey).await;
Expand All @@ -388,8 +388,9 @@ impl TxExecutor {
};

let lookup_tables_key = LookupTables::pubkey(thread.authority(), thread.pubkey());

let address_lookup_tables = match client.clone().get_lookup_tables(&lookup_tables_key).await {

let address_lookup_tables = match client.clone().get_lookup_tables(&lookup_tables_key).await
{
Err(_err) => {
return None;
}
Expand All @@ -403,7 +404,7 @@ impl TxExecutor {
thread,
thread_pubkey,
self.config.worker_id,
address_lookup_tables
address_lookup_tables,
)
.await
{
Expand Down Expand Up @@ -439,7 +440,7 @@ impl TxExecutor {
self: Arc<Self>,
slot: u64,
thread_pubkey: Pubkey,
tx: &Transaction,
tx: &VersionedTransaction,
) -> PluginResult<()> {
let r_transaction_history = self.transaction_history.read().await;
if let Some(metadata) = r_transaction_history.get(&thread_pubkey) {
Expand All @@ -451,7 +452,10 @@ impl TxExecutor {
Ok(())
}

async fn simulate_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
async fn simulate_tx(
self: Arc<Self>,
tx: &VersionedTransaction,
) -> PluginResult<VersionedTransaction> {
TPU_CLIENT
.get()
.await
Expand Down Expand Up @@ -480,14 +484,27 @@ impl TxExecutor {
})?
}

async fn submit_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
if !TPU_CLIENT.get().await.send_transaction(tx).await {
async fn submit_tx(
self: Arc<Self>,
tx: &VersionedTransaction,
) -> PluginResult<VersionedTransaction> {
let serialized_tx = serialize(&tx).unwrap();

if !TPU_CLIENT.get().await.send_wire_transaction(serialized_tx).await {
return Err(GeyserPluginError::Custom(
"Failed to send transaction".into(),
));
}
Ok(tx.clone())
}
// async fn submit_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
// if !TPU_CLIENT.get().await.send_transaction(tx).await {
// return Err(GeyserPluginError::Custom(
// "Failed to send transaction".into(),
// ));
// }
// Ok(tx.clone())
// }
}

impl Debug for TxExecutor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anchor_lang::{
use crate::state::*;


/// Accounts required by the `thread_create` instruction.
/// Accounts required by the `thread_lookup_tables_create` instruction.
#[derive(Accounts)]
#[instruction(address_lookup_tables: Vec<Pubkey>)]
pub struct LookupTablesCreate<'info> {
Expand Down Expand Up @@ -36,7 +36,7 @@ pub struct LookupTablesCreate<'info> {
)]
pub thread: Account<'info, Thread>,

/// The thread to be created.
/// The lookup_tables account to be created.
#[account(
init,
seeds = [
Expand Down

0 comments on commit 1890d56

Please sign in to comment.