Skip to content

Commit

Permalink
Rename "crank" to "exec" (#83)
Browse files Browse the repository at this point in the history
* Rename 'crank' to 'exec'

* Rename crank to exec

Co-authored-by: Nick Garfield <[email protected]>
  • Loading branch information
nickgarfield and Nick Garfield authored Oct 26, 2022
1 parent 8a5a8e7 commit 3b85069
Show file tree
Hide file tree
Showing 32 changed files with 189 additions and 188 deletions.
2 changes: 1 addition & 1 deletion cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ pub fn app() -> Command<'static> {
.takes_value(true)
.required(false)
.help(
"The maximum number of cranks allowed per slot for this thread",
"The maximum number of instructions this thread can execute per slot",
),
)
.arg(
Expand Down
4 changes: 2 additions & 2 deletions client/src/thread/instruction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
mod thread_crank;
mod thread_create;
mod thread_delete;
mod thread_exec;
mod thread_kickoff;
mod thread_pause;
mod thread_resume;
mod thread_stop;
mod thread_update;

pub use thread_crank::*;
pub use thread_create::*;
pub use thread_delete::*;
pub use thread_exec::*;
pub use thread_kickoff::*;
pub use thread_pause::*;
pub use thread_resume::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
clockwork_network_program::objects::{Fee, Penalty, Pool},
};

pub fn thread_crank(signatory: Pubkey, thread: Pubkey, worker: Pubkey) -> Instruction {
pub fn thread_exec(signatory: Pubkey, thread: Pubkey, worker: Pubkey) -> Instruction {
Instruction {
program_id: clockwork_thread_program::ID,
accounts: vec![
Expand All @@ -20,6 +20,6 @@ pub fn thread_crank(signatory: Pubkey, thread: Pubkey, worker: Pubkey) -> Instru
AccountMeta::new(thread, false),
AccountMeta::new_readonly(worker, false),
],
data: clockwork_thread_program::instruction::ThreadCrank {}.data(),
data: clockwork_thread_program::instruction::ThreadExec {}.data(),
}
}
4 changes: 2 additions & 2 deletions plugin/src/builders/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod pool_rotation;
mod thread_crank;
mod thread_exec;

pub use pool_rotation::*;
pub use thread_crank::*;
pub use thread_exec::*;
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,44 @@ use {

static TRANSACTION_SIZE_LIMIT: usize = 1_232; // Max byte size of a serialized transaction

pub async fn build_crank_txs(
pub async fn build_thread_exec_txs(
client: Arc<ClockworkClient>,
crankable_threads: DashSet<Pubkey>,
executable_threads: DashSet<Pubkey>,
worker_id: u64,
) -> Vec<Transaction> {
// Build the set of crank transactions
// Build the set of exec transactions
// TODO Use rayon to parallelize this operation
let txs = crankable_threads
let txs = executable_threads
.iter()
.filter_map(|thread_pubkey_ref| {
build_crank_tx(client.clone(), *thread_pubkey_ref.key(), worker_id)
build_thread_exec_tx(client.clone(), *thread_pubkey_ref.key(), worker_id)
})
.collect::<Vec<Transaction>>();
txs
}

fn build_crank_tx(
fn build_thread_exec_tx(
client: Arc<ClockworkClient>,
thread_pubkey: Pubkey,
worker_id: u64,
) -> Option<Transaction> {
// Build the first crank ix
// Build the first ix
let thread = match client.get::<Thread>(&thread_pubkey) {
Err(_err) => return None,
Ok(thread) => thread,
};
let blockhash = client.get_latest_blockhash().unwrap();
let signatory_pubkey = client.payer_pubkey();

// Pre-simulate crank ixs and pack into tx
// Pre-simulate exec ixs and pack into tx
let first_instruction = if thread.next_instruction.is_some() {
build_crank_ix(client.clone(), thread, signatory_pubkey, worker_id)
build_exec_ix(client.clone(), thread, signatory_pubkey, worker_id)
} else {
build_kickoff_ix(client.clone(), thread, signatory_pubkey, worker_id)
};
let mut ixs: Vec<Instruction> = vec![first_instruction];

// Pre-simulate crank ixs and pack as many as possible into tx.
// Pre-simulate exec ixs and pack as many as possible into tx.
let mut tx: Transaction = Transaction::new_with_payer(&vec![], Some(&signatory_pubkey));
let now = std::time::Instant::now();
loop {
Expand All @@ -87,12 +87,12 @@ fn build_crank_tx(
..RpcSimulateTransactionConfig::default()
},
) {
// If there was an error, stop packing and continue with the cranks up until this one.
// If there was an error, stop packing and continue with the ixs up until this one.
Err(_err) => {
break;
}

// If the simulation was successful, pack the crank ix into the tx.
// If the simulation was successful, pack the ix into the tx.
Ok(response) => {
// If there was an error, then stop packing.
if response.value.err.is_some() {
Expand All @@ -107,13 +107,13 @@ fn build_crank_tx(
// Save the simulated tx. It is okay to submit.
tx = sim_tx;

// Parse the resulting thread account for the next crank ix to simulate.
// Parse the resulting thread account for the next ix to simulate.
if let Some(ui_accounts) = response.value.accounts {
if let Some(Some(ui_account)) = ui_accounts.get(0) {
if let Some(account) = ui_account.decode::<Account>() {
if let Ok(sim_thread) = Thread::try_from(account.data) {
if sim_thread.next_instruction.is_some() {
ixs.push(build_crank_ix(
ixs.push(build_exec_ix(
client.clone(),
sim_thread,
signatory_pubkey,
Expand All @@ -131,7 +131,7 @@ fn build_crank_tx(
}

info!(
"Time spent packing {} cranks: {:#?}",
"Time spent packing {} instructions: {:#?}",
tx.message.instructions.len(),
now.elapsed()
);
Expand Down Expand Up @@ -214,23 +214,23 @@ fn build_kickoff_ix(
kickoff_ix
}

fn build_crank_ix(
fn build_exec_ix(
_client: Arc<ClockworkClient>,
thread: Thread,
signatory_pubkey: Pubkey,
worker_id: u64,
) -> Instruction {
// Build the instruction.
let thread_pubkey = Thread::pubkey(thread.authority, thread.id);
let mut crank_ix = clockwork_client::thread::instruction::thread_crank(
let mut exec_ix = clockwork_client::thread::instruction::thread_exec(
signatory_pubkey,
thread_pubkey,
Worker::pubkey(worker_id),
);

if let Some(next_instruction) = thread.next_instruction {
// Inject the target program account to the ix.
crank_ix.accounts.push(AccountMeta::new_readonly(
exec_ix.accounts.push(AccountMeta::new_readonly(
next_instruction.program_id,
false,
));
Expand All @@ -242,12 +242,12 @@ fn build_crank_ix(
} else {
acc.pubkey
};
crank_ix.accounts.push(match acc.is_writable {
exec_ix.accounts.push(match acc.is_writable {
true => AccountMeta::new(acc_pubkey, false),
false => AccountMeta::new_readonly(acc_pubkey, false),
})
}
}

crank_ix
exec_ix
}
12 changes: 6 additions & 6 deletions plugin/src/executors/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ impl TxExecutor {
// Rotate worker pools
this.clone().execute_pool_rotate_txs(slot).await.ok();

// Thread crank threads
this.clone().execute_thread_crank_txs(slot).await.ok();
// Thread exec txs
this.clone().execute_thread_exec_txs(slot).await.ok();

// Purge message history that is beyond the dedupe period
this.message_history
Expand Down Expand Up @@ -82,7 +82,7 @@ impl TxExecutor {
Ok(())
}

async fn execute_thread_crank_txs(self: Arc<Self>, slot: u64) -> PluginResult<()> {
async fn execute_thread_exec_txs(self: Arc<Self>, slot: u64) -> PluginResult<()> {
// Exit early if we are not in the worker pool.
let r_pool_positions = self.observers.network.pool_positions.read().await;
let thread_pool = r_pool_positions.thread_pool.clone();
Expand All @@ -93,10 +93,10 @@ impl TxExecutor {
));
}

// Execute thread_crank txs.
crate::builders::build_crank_txs(
// Execute thread_exec txs.
crate::builders::build_thread_exec_txs(
self.client.clone(),
self.observers.thread.crankable_threads.clone(),
self.observers.thread.executable_threads.clone(),
self.config.worker_id,
)
.await
Expand Down
22 changes: 11 additions & 11 deletions plugin/src/observers/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub struct ThreadObserver {
// Plugin config values.
pub config: PluginConfig,

// The set of the threads that are currently crankable (i.e. have a next_instruction)
pub crankable_threads: DashSet<Pubkey>,
// The set of the threads that are currently executable (i.e. have a next_instruction)
pub executable_threads: DashSet<Pubkey>,

// Map from unix timestamps to the list of threads scheduled for that moment.
pub cron_threads: DashMap<i64, DashSet<Pubkey>>,
Expand All @@ -37,7 +37,7 @@ impl ThreadObserver {
Self {
clocks: DashMap::new(),
config: config.clone(),
crankable_threads: DashSet::new(),
executable_threads: DashSet::new(),
cron_threads: DashMap::new(),
listener_threads: DashMap::new(),
runtime,
Expand All @@ -60,7 +60,7 @@ impl ThreadObserver {
let is_due = clock.unix_timestamp >= *target_timestamp;
if is_due {
for thread_pubkey_ref in thread_pubkeys.iter() {
this.crankable_threads.insert(*thread_pubkey_ref.key());
this.executable_threads.insert(*thread_pubkey_ref.key());
}
}
!is_due
Expand All @@ -85,11 +85,11 @@ impl ThreadObserver {
_account_replica: ReplicaAccountInfo,
) -> PluginResult<()> {
self.spawn(|this| async move {
// Move all threads listening to this account into the crankable set.
// Move all threads listening to this account into the executable set.
this.listener_threads.retain(|pubkey, thread_pubkeys| {
if account_pubkey.eq(pubkey) {
for thread_pubkey in thread_pubkeys.iter() {
this.crankable_threads.insert(*thread_pubkey.key());
this.executable_threads.insert(*thread_pubkey.key());
}
false
} else {
Expand All @@ -110,17 +110,17 @@ impl ThreadObserver {
thread_pubkey: Pubkey,
) -> PluginResult<()> {
self.spawn(|this| async move {
// Remove thread from crankable set
this.crankable_threads.remove(&thread_pubkey);
// Remove thread from executable set
this.executable_threads.remove(&thread_pubkey);

// If the thread is paused, just return without indexing
if thread.paused {
return Ok(());
}

if thread.next_instruction.is_some() {
// If the thread has a next instruction, index it as crankable.
this.crankable_threads.insert(thread_pubkey);
// If the thread has a next instruction, index it as executable.
this.executable_threads.insert(thread_pubkey);
} else {
// Otherwise, index the thread according to its trigger type.
match thread.trigger {
Expand Down Expand Up @@ -176,7 +176,7 @@ impl ThreadObserver {
}
}
Trigger::Immediate => {
this.crankable_threads.insert(thread_pubkey);
this.executable_threads.insert(thread_pubkey);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ impl GeyserPlugin for ClockworkPlugin {
SlotStatus::Processed => match &self.executors {
Some(executors) => {
info!(
"slot: {} crankable_threads: {} cron_threads: {}",
"slot: {} executable_threads: {} cron_threads: {}",
slot,
self.observers.thread.crankable_threads.len(),
self.observers.thread.executable_threads.len(),
self.observers.thread.cron_threads.len()
);
self.observers.thread.clone().observe_slot(slot)?;
Expand Down
8 changes: 4 additions & 4 deletions programs/network/src/instructions/delegation_stake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
associated_token::get_associated_token_address,
token::{transfer, Token, TokenAccount, Transfer},
},
clockwork_utils::{anchor_sighash, AccountMetaData, CrankResponse, InstructionData},
clockwork_utils::{anchor_sighash, AccountMetaData, ExecResponse, InstructionData},
};

#[derive(Accounts)]
Expand Down Expand Up @@ -53,7 +53,7 @@ pub struct DelegationStake<'info> {
pub worker_stake: Account<'info, TokenAccount>,
}

pub fn handler(ctx: Context<DelegationStake>) -> Result<CrankResponse> {
pub fn handler(ctx: Context<DelegationStake>) -> Result<ExecResponse> {
// Get accounts.
let config = &ctx.accounts.config;
let delegation = &mut ctx.accounts.delegation;
Expand Down Expand Up @@ -154,8 +154,8 @@ pub fn handler(ctx: Context<DelegationStake>) -> Result<CrankResponse> {
})
};

Ok(CrankResponse {
Ok(ExecResponse {
next_instruction,
..CrankResponse::default()
..ExecResponse::default()
})
}
8 changes: 4 additions & 4 deletions programs/network/src/instructions/fee_distribute.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::objects::*,
anchor_lang::prelude::*,
clockwork_utils::{anchor_sighash, AccountMetaData, CrankResponse, InstructionData},
clockwork_utils::{anchor_sighash, AccountMetaData, ExecResponse, InstructionData},
};

#[derive(Accounts)]
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct FeeDistribute<'info> {
pub worker: Account<'info, Worker>,
}

pub fn handler(ctx: Context<FeeDistribute>) -> Result<CrankResponse> {
pub fn handler(ctx: Context<FeeDistribute>) -> Result<ExecResponse> {
// Get accounts
let config = &ctx.accounts.config;
let delegation = &mut ctx.accounts.delegation;
Expand Down Expand Up @@ -169,8 +169,8 @@ pub fn handler(ctx: Context<FeeDistribute>) -> Result<CrankResponse> {
})
};

Ok(CrankResponse {
Ok(ExecResponse {
next_instruction,
..CrankResponse::default()
..ExecResponse::default()
})
}
8 changes: 4 additions & 4 deletions programs/network/src/instructions/registry_epoch_cutover.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use clockwork_utils::{anchor_sighash, AccountMetaData, CrankResponse, InstructionData};
use clockwork_utils::{anchor_sighash, AccountMetaData, ExecResponse, InstructionData};

use {crate::objects::*, anchor_lang::prelude::*};

Expand All @@ -18,7 +18,7 @@ pub struct RegistryEpochCutover<'info> {
pub registry: Account<'info, Registry>,
}

pub fn handler(ctx: Context<RegistryEpochCutover>) -> Result<CrankResponse> {
pub fn handler(ctx: Context<RegistryEpochCutover>) -> Result<ExecResponse> {
// Get accounts.
let config = &ctx.accounts.config;
let thread = &ctx.accounts.thread;
Expand All @@ -44,8 +44,8 @@ pub fn handler(ctx: Context<RegistryEpochCutover>) -> Result<CrankResponse> {
data: anchor_sighash("snapshot_delete").to_vec(),
});

Ok(CrankResponse {
Ok(ExecResponse {
next_instruction,
..CrankResponse::default()
..ExecResponse::default()
})
}
Loading

0 comments on commit 3b85069

Please sign in to comment.