Skip to content

Commit

Permalink
update plugin, client
Browse files Browse the repository at this point in the history
  • Loading branch information
thewuhxyz committed May 7, 2023
1 parent 220fb99 commit ef16082
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 15 deletions.
90 changes: 86 additions & 4 deletions plugin/src/builders/pool_rotation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use anchor_lang::{
use clockwork_network_program::state::{Config, Pool, Registry, Snapshot, SnapshotFrame, Worker};
use log::info;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{signature::Keypair, signer::Signer, transaction::Transaction};
use solana_program::message::{VersionedMessage, v0};
use solana_sdk::{signature::Keypair, signer::Signer, transaction::{VersionedTransaction}};

use crate::pool_position::PoolPosition;

Expand All @@ -19,7 +20,7 @@ pub async fn build_pool_rotation_tx<'a>(
snapshot: Snapshot,
snapshot_frame: SnapshotFrame,
worker_id: u64,
) -> Option<Transaction> {
) -> Option<VersionedTransaction> {
info!("nonce: {:?} total_stake: {:?} current_position: {:?} stake_offset: {:?} stake_amount: {:?}",
registry.nonce.checked_rem(snapshot.total_stake),
snapshot.total_stake,
Expand Down Expand Up @@ -80,8 +81,89 @@ pub async fn build_pool_rotation_tx<'a>(
data: clockwork_network_program::instruction::PoolRotate {}.data(),
};

// let mut tx = Transaction::new_with_payer(&[ix.clone()], Some(&keypair.pubkey()));
// tx.sign(&[keypair], client.get_latest_blockhash().await.unwrap());

// Build and sign tx.
let mut tx = Transaction::new_with_payer(&[ix.clone()], Some(&keypair.pubkey()));
tx.sign(&[keypair], client.get_latest_blockhash().await.unwrap());
let blockhash = client.get_latest_blockhash().await.unwrap();

let tx = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&keypair.pubkey(),
&[ix.clone()],
&[],
blockhash,
).expect("error compiling to v0 message")),
&[keypair],
).expect("error creating new versioned transaction");
return Some(tx);
}

// pub async fn build_pool_rotation_tx<'a>(
// client: Arc<RpcClient>,
// keypair: &Keypair,
// pool_position: PoolPosition,
// registry: Registry,
// snapshot: Snapshot,
// snapshot_frame: SnapshotFrame,
// worker_id: u64,
// ) -> Option<Transaction> {
// info!("nonce: {:?} total_stake: {:?} current_position: {:?} stake_offset: {:?} stake_amount: {:?}",
// registry.nonce.checked_rem(snapshot.total_stake),
// snapshot.total_stake,
// pool_position.current_position,
// snapshot_frame.stake_offset,
// snapshot_frame.stake_amount,
// );

// // Exit early if the rotator is not intialized
// if registry.nonce == 0 {
// return None;
// }

// // Exit early the snapshot has no stake
// if snapshot.total_stake == 0 {
// return None;
// }

// // Exit early if the worker is already in the pool.
// if pool_position.current_position.is_some() {
// return None;
// }

// // Exit early if the snapshot frame is none or the worker has no delegated stake.
// if snapshot_frame.stake_amount.eq(&0) {
// return None;
// }

// // Check if the rotation window is open for this worker.
// let is_rotation_window_open = match registry.nonce.checked_rem(snapshot.total_stake) {
// None => false,
// Some(sample) => {
// sample >= snapshot_frame.stake_offset
// && sample
// < snapshot_frame
// .stake_offset
// .checked_add(snapshot_frame.stake_amount)
// .unwrap()
// }
// };
// if !is_rotation_window_open {
// return None;
// }

// // Build rotation instruction to rotate the worker into pool 0.
// let snapshot_pubkey = Snapshot::pubkey(snapshot.id);
// let ix = clockwork_client::network::instruction::pool_rotate(
// Pool::pubkey(0),
// keypair.pubkey(),
// snapshot_pubkey,
// SnapshotFrame::pubkey(snapshot_pubkey, worker_id),
// Worker::pubkey(worker_id),
// );

// // Build and sign tx.
// let mut tx = Transaction::new_with_payer(&[ix.clone()], Some(&keypair.pubkey()));
// tx.sign(&[keypair], client.get_latest_blockhash().await.unwrap());
// return Some(tx);
// }
35 changes: 26 additions & 9 deletions plugin/src/executors/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use solana_program::pubkey::Pubkey;
use solana_sdk::{
commitment_config::CommitmentConfig,
signature::{Keypair, Signature},
transaction::Transaction,
transaction::{VersionedTransaction},
};
use tokio::{runtime::Runtime, sync::RwLock};

Expand Down Expand Up @@ -423,7 +423,7 @@ impl TxExecutor {
observed_slot: u64,
due_slot: u64,
thread_pubkey: Pubkey,
) -> Option<(Pubkey, Transaction)> {
) -> Option<(Pubkey, VersionedTransaction)> {
let thread = match client.clone().get::<VersionedThread>(&thread_pubkey).await {
Err(_err) => {
self.increment_simulation_failure(thread_pubkey).await;
Expand All @@ -433,8 +433,9 @@ impl TxExecutor {
};

let lookup_tables_key = LookupTables::pubkey(thread.authority(), thread.pubkey());

let address_lookup_tables = match client.clone().get_lookup_tables(&lookup_tables_key).await {

let address_lookup_tables = match client.clone().get_lookup_tables(&lookup_tables_key).await
{
Err(_err) => {
return None;
}
Expand All @@ -448,7 +449,7 @@ impl TxExecutor {
thread,
thread_pubkey,
self.config.worker_id,
address_lookup_tables
address_lookup_tables,
)
.await
{
Expand Down Expand Up @@ -484,7 +485,7 @@ impl TxExecutor {
self: Arc<Self>,
slot: u64,
thread_pubkey: Pubkey,
tx: &Transaction,
tx: &VersionedTransaction,
) -> PluginResult<()> {
let r_transaction_history = self.transaction_history.read().await;
if let Some(metadata) = r_transaction_history.get(&thread_pubkey) {
Expand All @@ -496,7 +497,10 @@ impl TxExecutor {
Ok(())
}

async fn simulate_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
async fn simulate_tx(
self: Arc<Self>,
tx: &VersionedTransaction,
) -> PluginResult<VersionedTransaction> {
TPU_CLIENT
.get()
.await
Expand Down Expand Up @@ -525,14 +529,27 @@ impl TxExecutor {
})?
}

async fn submit_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
if !TPU_CLIENT.get().await.send_transaction(tx).await {
async fn submit_tx(
self: Arc<Self>,
tx: &VersionedTransaction,
) -> PluginResult<VersionedTransaction> {
let serialized_tx = serialize(&tx).unwrap();

if !TPU_CLIENT.get().await.send_wire_transaction(serialized_tx).await {
return Err(GeyserPluginError::Custom(
"Failed to send transaction".into(),
));
}
Ok(tx.clone())
}
// async fn submit_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
// if !TPU_CLIENT.get().await.send_transaction(tx).await {
// return Err(GeyserPluginError::Custom(
// "Failed to send transaction".into(),
// ));
// }
// Ok(tx.clone())
// }
}

impl Debug for TxExecutor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anchor_lang::{
use crate::state::*;


/// Accounts required by the `thread_create` instruction.
/// Accounts required by the `thread_lookup_tables_create` instruction.
#[derive(Accounts)]
#[instruction(address_lookup_tables: Vec<Pubkey>)]
pub struct LookupTablesCreate<'info> {
Expand Down Expand Up @@ -36,7 +36,7 @@ pub struct LookupTablesCreate<'info> {
)]
pub thread: Account<'info, Thread>,

/// The thread to be created.
/// The lookup_tables account to be created.
#[account(
init,
seeds = [
Expand Down

0 comments on commit ef16082

Please sign in to comment.