Skip to content

Commit

Permalink
Refactor queue kickoff out into its own instruction
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Garfield committed Oct 13, 2022
1 parent eaf3339 commit 9da51af
Show file tree
Hide file tree
Showing 14 changed files with 265 additions and 131 deletions.
9 changes: 9 additions & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub enum CliCommand {
QueueUpdate {
id: String,
rate_limit: Option<u64>,
schedule: Option<String>,
},

// Registry
Expand Down Expand Up @@ -323,6 +324,14 @@ pub fn app() -> Command<'static> {
.help(
"The maximum number of cranks allowed per slot for this queue",
),
)
.arg(
Arg::new("schedule")
.long("schedule")
.short('s')
.takes_value(true)
.required(false)
.help("The cron schedule of the queue"),
),
),
)
Expand Down
3 changes: 2 additions & 1 deletion cli/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ fn parse_queue_command(matches: &ArgMatches) -> Result<CliCommand, CliError> {
}),
Some(("update", matches)) => Ok(CliCommand::QueueUpdate {
id: parse_string("id", matches)?,
rate_limit: parse_u64("rate_limit", matches).map_or(None, |v| Some(v)),
rate_limit: parse_u64("rate_limit", matches).ok(),
schedule: parse_string("schedule", matches).ok(),
}),
_ => Err(CliError::CommandNotRecognized(
matches.subcommand().unwrap().0.into(),
Expand Down
6 changes: 5 additions & 1 deletion cli/src/processor/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ pub fn process(matches: &ArgMatches) -> Result<(), CliError> {
} => super::queue::create(&client, id, kickoff_instruction, trigger),
CliCommand::QueueDelete { id } => super::queue::delete(&client, id),
CliCommand::QueueGet { id } => super::queue::get(&client, id),
CliCommand::QueueUpdate { id, rate_limit } => super::queue::update(&client, id, rate_limit),
CliCommand::QueueUpdate {
id,
rate_limit,
schedule,
} => super::queue::update(&client, id, rate_limit, schedule),
CliCommand::RegistryGet => super::registry::get(&client),
CliCommand::WebhookRequestNew {
api,
Expand Down
18 changes: 16 additions & 2 deletions cli/src/processor/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,27 @@ pub fn get(client: &Client, id: String) -> Result<(), CliError> {
Ok(())
}

pub fn update(client: &Client, id: String, rate_limit: Option<u64>) -> Result<(), CliError> {
pub fn update(
client: &Client,
id: String,
rate_limit: Option<u64>,
schedule: Option<String>,
) -> Result<(), CliError> {
let queue_pubkey = Queue::pubkey(client.payer_pubkey(), id);

let trigger = if let Some(schedule) = schedule {
Some(Trigger::Cron {
schedule,
skippable: true,
})
} else {
None
};
let settings = QueueSettings {
fee: None,
kickoff_instruction: None,
rate_limit,
trigger: None,
trigger,
};
let ix = clockwork_client::queue::instruction::queue_update(
client.payer_pubkey(),
Expand Down
2 changes: 2 additions & 0 deletions client/src/queue/instruction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod queue_crank;
mod queue_create;
mod queue_delete;
mod queue_kickoff;
mod queue_update;

pub use queue_crank::*;
pub use queue_create::*;
pub use queue_delete::*;
pub use queue_kickoff::*;
pub use queue_update::*;
9 changes: 2 additions & 7 deletions client/src/queue/instruction/queue_crank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ use {
clockwork_network_program::objects::{Fee, Penalty, Pool},
};

pub fn queue_crank(
data_hash: Option<u64>,
queue: Pubkey,
signatory: Pubkey,
worker: Pubkey,
) -> Instruction {
pub fn queue_crank(queue: Pubkey, signatory: Pubkey, worker: Pubkey) -> Instruction {
Instruction {
program_id: clockwork_queue_program::ID,
accounts: vec![
Expand All @@ -25,6 +20,6 @@ pub fn queue_crank(
AccountMeta::new(signatory, true),
AccountMeta::new_readonly(worker, false),
],
data: clockwork_queue_program::instruction::QueueCrank { data_hash }.data(),
data: clockwork_queue_program::instruction::QueueCrank {}.data(),
}
}
24 changes: 24 additions & 0 deletions client/src/queue/instruction/queue_kickoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use anchor_lang::{
solana_program::{
instruction::{AccountMeta, Instruction},
pubkey::Pubkey,
},
InstructionData,
};

pub fn queue_kickoff(
data_hash: Option<u64>,
queue: Pubkey,
signatory: Pubkey,
worker: Pubkey,
) -> Instruction {
Instruction {
program_id: clockwork_queue_program::ID,
accounts: vec![
AccountMeta::new(queue, false),
AccountMeta::new(signatory, true),
AccountMeta::new_readonly(worker, false),
],
data: clockwork_queue_program::instruction::QueueKickoff { data_hash }.data(),
}
}
64 changes: 40 additions & 24 deletions plugin/src/builders/queue_crank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn build_crank_tx(
let signatory_pubkey = client.payer_pubkey();

// Pre-simulate crank ixs and pack into tx
let mut ixs: Vec<Instruction> = vec![build_crank_ix(
let mut ixs: Vec<Instruction> = vec![build_kickoff_ix(
client.clone(),
queue,
signatory_pubkey,
Expand Down Expand Up @@ -143,7 +143,7 @@ fn build_crank_tx(
Some(tx)
}

fn build_crank_ix(
fn build_kickoff_ix(
client: Arc<ClockworkClient>,
queue: Queue,
signatory_pubkey: Pubkey,
Expand Down Expand Up @@ -188,11 +188,7 @@ fn build_crank_ix(

// Build the instruction.
let queue_pubkey = Queue::pubkey(queue.authority, queue.id);
let inner_ix = queue
.next_instruction
.clone()
.unwrap_or(queue.kickoff_instruction);
let mut crank_ix = clockwork_client::queue::instruction::queue_crank(
let mut kickoff_ix = clockwork_client::queue::instruction::queue_kickoff(
data_hash,
queue_pubkey,
signatory_pubkey,
Expand All @@ -202,29 +198,49 @@ fn build_crank_ix(
// Inject the trigger account.
match trigger_account_pubkey {
None => {}
Some(pubkey) => crank_ix.accounts.push(AccountMeta {
Some(pubkey) => kickoff_ix.accounts.push(AccountMeta {
pubkey,
is_signer: false,
is_writable: false,
}),
}

// Inject the target program account to the ix.
crank_ix
.accounts
.push(AccountMeta::new_readonly(inner_ix.program_id, false));

// Inject the worker pubkey as the Clockwork "payer" account
for acc in inner_ix.clone().accounts {
let acc_pubkey = if acc.pubkey == clockwork_utils::PAYER_PUBKEY {
signatory_pubkey
} else {
acc.pubkey
};
crank_ix.accounts.push(match acc.is_writable {
true => AccountMeta::new(acc_pubkey, false),
false => AccountMeta::new_readonly(acc_pubkey, false),
})
kickoff_ix
}

fn build_crank_ix(
_client: Arc<ClockworkClient>,
queue: Queue,
signatory_pubkey: Pubkey,
worker_id: u64,
) -> Instruction {
// Build the instruction.
let queue_pubkey = Queue::pubkey(queue.authority, queue.id);
let mut crank_ix = clockwork_client::queue::instruction::queue_crank(
queue_pubkey,
signatory_pubkey,
Worker::pubkey(worker_id),
);

if let Some(next_instruction) = queue.next_instruction {
// Inject the target program account to the ix.
crank_ix.accounts.push(AccountMeta::new_readonly(
next_instruction.program_id,
false,
));

// Inject the worker pubkey as the Clockwork "payer" account
for acc in next_instruction.clone().accounts {
let acc_pubkey = if acc.pubkey == clockwork_utils::PAYER_PUBKEY {
signatory_pubkey
} else {
acc.pubkey
};
crank_ix.accounts.push(match acc.is_writable {
true => AccountMeta::new(acc_pubkey, false),
false => AccountMeta::new_readonly(acc_pubkey, false),
})
}
}

crank_ix
Expand Down
2 changes: 2 additions & 0 deletions programs/queue/src/instructions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod queue_crank;
pub mod queue_create;
pub mod queue_delete;
pub mod queue_kickoff;
pub mod queue_pause;
pub mod queue_resume;
pub mod queue_update;
Expand All @@ -9,6 +10,7 @@ pub mod queue_withdraw;
pub use queue_crank::*;
pub use queue_create::*;
pub use queue_delete::*;
pub use queue_kickoff::*;
pub use queue_pause::*;
pub use queue_resume::*;
pub use queue_update::*;
Expand Down
104 changes: 25 additions & 79 deletions programs/queue/src/instructions/queue_crank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ use {
/// The ID of the pool workers must be a member of to collect fees.
const POOL_ID: u64 = 0;

/// Number of lamports to reimburse the worker with after they've submitted a transaction's worth of cranks.
const TRANSACTION_BASE_FEE_REIMBURSEMENT: u64 = 5_000;

/// Accounts required by the `queue_crank` instruction.
#[derive(Accounts)]
#[instruction(data_hash: Option<u64>)]
pub struct QueueCrank<'info> {
/// The worker's fee account.
#[account(
Expand Down Expand Up @@ -53,7 +49,8 @@ pub struct QueueCrank<'info> {
queue.id.as_bytes(),
],
bump,
constraint = !queue.paused @ ClockworkError::QueuePaused
constraint = !queue.paused @ ClockworkError::QueuePaused,
constraint = queue.next_instruction.is_some()
)]
pub queue: Box<Account<'info, Queue>>,

Expand All @@ -69,89 +66,38 @@ pub struct QueueCrank<'info> {
pub worker: Account<'info, Worker>,
}

pub fn handler(ctx: Context<QueueCrank>, data_hash: Option<u64>) -> Result<()> {
pub fn handler(ctx: Context<QueueCrank>) -> Result<()> {
// Get accounts
let fee = &mut ctx.accounts.fee;
let penalty = &ctx.accounts.penalty;
let penalty = &mut ctx.accounts.penalty;
let pool = &ctx.accounts.pool;
let queue = &mut ctx.accounts.queue;
let signatory = &ctx.accounts.signatory;
let signatory = &mut ctx.accounts.signatory;
let worker = &ctx.accounts.worker;

if queue.next_instruction.is_none() {
// If this queue does not have a next_instruction, verify the queue's trigger condition is active.
queue.verify_trigger(data_hash, ctx.remaining_accounts)?;
} else {
// If the rate limit has been met, exit early.
match queue.exec_context {
None => return Err(ClockworkError::InvalidQueueState.into()),
Some(exec_context) => {
if exec_context.last_crank_at == Clock::get().unwrap().slot
&& exec_context.cranks_since_slot >= queue.rate_limit
{
return Err(ClockworkError::RateLimitExeceeded.into());
}
}
}

// Crank the queue
let bump = ctx.bumps.get("queue").unwrap();
queue.crank(ctx.remaining_accounts, *bump, signatory)?;

// Debit the crank fee from the queue account.
**queue.to_account_info().try_borrow_mut_lamports()? = queue
.to_account_info()
.lamports()
.checked_sub(queue.fee)
.unwrap();

// If the worker is in the pool, pay fee to the worker's fee account.
// Otherwise, pay fee to the worker's penalty account.
if pool.clone().into_inner().workers.contains(&worker.key()) {
**fee.to_account_info().try_borrow_mut_lamports()? = fee
.to_account_info()
.lamports()
.checked_add(queue.fee)
.unwrap();
} else {
**penalty.to_account_info().try_borrow_mut_lamports()? = penalty
.to_account_info()
.lamports()
.checked_add(queue.fee)
.unwrap();
}

// If the queue has no more work or the number of cranks since the last payout has reached the rate limit,
// reimburse the worker for the transaction base fee.
match queue.exec_context {
None => {
return Err(ClockworkError::InvalidQueueState.into());
}
Some(exec_context) => {
if queue.next_instruction.is_none()
|| exec_context.cranks_since_reimbursement >= queue.rate_limit
{
// Pay reimbursment for base transaction fee
**queue.to_account_info().try_borrow_mut_lamports()? = queue
.to_account_info()
.lamports()
.checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT)
.unwrap();
**signatory.to_account_info().try_borrow_mut_lamports()? = signatory
.to_account_info()
.lamports()
.checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT)
.unwrap();

// Update the exec context to mark that a reimbursement happened this slot.
queue.exec_context = Some(ExecContext {
cranks_since_reimbursement: 0,
..exec_context
});
}
// If the rate limit has been met, exit early.
match queue.exec_context {
None => return Err(ClockworkError::InvalidQueueState.into()),
Some(exec_context) => {
if exec_context.last_crank_at == Clock::get().unwrap().slot
&& exec_context.cranks_since_slot >= queue.rate_limit
{
return Err(ClockworkError::RateLimitExeceeded.into());
}
}
}

// Crank the queue
let bump = ctx.bumps.get("queue").unwrap();
queue.crank(
ctx.remaining_accounts,
*bump,
fee,
penalty,
pool,
signatory,
worker,
)?;

Ok(())
}
Loading

0 comments on commit 9da51af

Please sign in to comment.