From 9da51af916d1a8a27768e707b4dea501a9ab70cd Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Thu, 13 Oct 2022 09:57:01 -0500 Subject: [PATCH] Refactor queue kickoff out into its own instruction --- cli/src/cli.rs | 9 ++ cli/src/parser.rs | 3 +- cli/src/processor/process.rs | 6 +- cli/src/processor/queue.rs | 18 ++- client/src/queue/instruction/mod.rs | 2 + client/src/queue/instruction/queue_crank.rs | 9 +- client/src/queue/instruction/queue_kickoff.rs | 24 ++++ plugin/src/builders/queue_crank.rs | 64 +++++++---- programs/queue/src/instructions/mod.rs | 2 + .../queue/src/instructions/queue_crank.rs | 104 +++++------------- .../queue/src/instructions/queue_kickoff.rs | 45 ++++++++ programs/queue/src/lib.rs | 9 +- programs/queue/src/objects/queue.rs | 99 ++++++++++++++--- utils/src/lib.rs | 2 + 14 files changed, 265 insertions(+), 131 deletions(-) create mode 100644 client/src/queue/instruction/queue_kickoff.rs create mode 100644 programs/queue/src/instructions/queue_kickoff.rs diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 7b8823315..ada6ace77 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -64,6 +64,7 @@ pub enum CliCommand { QueueUpdate { id: String, rate_limit: Option, + schedule: Option, }, // Registry @@ -323,6 +324,14 @@ pub fn app() -> Command<'static> { .help( "The maximum number of cranks allowed per slot for this queue", ), + ) + .arg( + Arg::new("schedule") + .long("schedule") + .short('s') + .takes_value(true) + .required(false) + .help("The cron schedule of the queue"), ), ), ) diff --git a/cli/src/parser.rs b/cli/src/parser.rs index b21ab12b3..430e1db3b 100644 --- a/cli/src/parser.rs +++ b/cli/src/parser.rs @@ -147,7 +147,8 @@ fn parse_queue_command(matches: &ArgMatches) -> Result { }), Some(("update", matches)) => Ok(CliCommand::QueueUpdate { id: parse_string("id", matches)?, - rate_limit: parse_u64("rate_limit", matches).map_or(None, |v| Some(v)), + rate_limit: parse_u64("rate_limit", matches).ok(), + schedule: parse_string("schedule", matches).ok(), }), _ => Err(CliError::CommandNotRecognized( matches.subcommand().unwrap().0.into(), diff --git a/cli/src/processor/process.rs b/cli/src/processor/process.rs index bf168e76e..767b79ea4 100644 --- a/cli/src/processor/process.rs +++ b/cli/src/processor/process.rs @@ -55,7 +55,11 @@ pub fn process(matches: &ArgMatches) -> Result<(), CliError> { } => super::queue::create(&client, id, kickoff_instruction, trigger), CliCommand::QueueDelete { id } => super::queue::delete(&client, id), CliCommand::QueueGet { id } => super::queue::get(&client, id), - CliCommand::QueueUpdate { id, rate_limit } => super::queue::update(&client, id, rate_limit), + CliCommand::QueueUpdate { + id, + rate_limit, + schedule, + } => super::queue::update(&client, id, rate_limit, schedule), CliCommand::RegistryGet => super::registry::get(&client), CliCommand::WebhookRequestNew { api, diff --git a/cli/src/processor/queue.rs b/cli/src/processor/queue.rs index 16d157ff1..51c30a189 100644 --- a/cli/src/processor/queue.rs +++ b/cli/src/processor/queue.rs @@ -47,13 +47,27 @@ pub fn get(client: &Client, id: String) -> Result<(), CliError> { Ok(()) } -pub fn update(client: &Client, id: String, rate_limit: Option) -> Result<(), CliError> { +pub fn update( + client: &Client, + id: String, + rate_limit: Option, + schedule: Option, +) -> Result<(), CliError> { let queue_pubkey = Queue::pubkey(client.payer_pubkey(), id); + + let trigger = if let Some(schedule) = schedule { + Some(Trigger::Cron { + schedule, + skippable: true, + }) + } else { + None + }; let settings = QueueSettings { fee: None, kickoff_instruction: None, rate_limit, - trigger: None, + trigger, }; let ix = clockwork_client::queue::instruction::queue_update( client.payer_pubkey(), diff --git a/client/src/queue/instruction/mod.rs b/client/src/queue/instruction/mod.rs index 5bdefe2ff..589859a25 100644 --- a/client/src/queue/instruction/mod.rs +++ b/client/src/queue/instruction/mod.rs @@ -1,9 +1,11 @@ mod queue_crank; mod queue_create; mod queue_delete; +mod queue_kickoff; mod queue_update; pub use queue_crank::*; pub use queue_create::*; pub use queue_delete::*; +pub use queue_kickoff::*; pub use queue_update::*; diff --git a/client/src/queue/instruction/queue_crank.rs b/client/src/queue/instruction/queue_crank.rs index b7a150ee6..cd9aaab41 100644 --- a/client/src/queue/instruction/queue_crank.rs +++ b/client/src/queue/instruction/queue_crank.rs @@ -9,12 +9,7 @@ use { clockwork_network_program::objects::{Fee, Penalty, Pool}, }; -pub fn queue_crank( - data_hash: Option, - queue: Pubkey, - signatory: Pubkey, - worker: Pubkey, -) -> Instruction { +pub fn queue_crank(queue: Pubkey, signatory: Pubkey, worker: Pubkey) -> Instruction { Instruction { program_id: clockwork_queue_program::ID, accounts: vec![ @@ -25,6 +20,6 @@ pub fn queue_crank( AccountMeta::new(signatory, true), AccountMeta::new_readonly(worker, false), ], - data: clockwork_queue_program::instruction::QueueCrank { data_hash }.data(), + data: clockwork_queue_program::instruction::QueueCrank {}.data(), } } diff --git a/client/src/queue/instruction/queue_kickoff.rs b/client/src/queue/instruction/queue_kickoff.rs new file mode 100644 index 000000000..a20463499 --- /dev/null +++ b/client/src/queue/instruction/queue_kickoff.rs @@ -0,0 +1,24 @@ +use anchor_lang::{ + solana_program::{ + instruction::{AccountMeta, Instruction}, + pubkey::Pubkey, + }, + InstructionData, +}; + +pub fn queue_kickoff( + data_hash: Option, + queue: Pubkey, + signatory: Pubkey, + worker: Pubkey, +) -> Instruction { + Instruction { + program_id: clockwork_queue_program::ID, + accounts: vec![ + AccountMeta::new(queue, false), + AccountMeta::new(signatory, true), + AccountMeta::new_readonly(worker, false), + ], + data: clockwork_queue_program::instruction::QueueKickoff { data_hash }.data(), + } +} diff --git a/plugin/src/builders/queue_crank.rs b/plugin/src/builders/queue_crank.rs index 20acab82b..a06529ab7 100644 --- a/plugin/src/builders/queue_crank.rs +++ b/plugin/src/builders/queue_crank.rs @@ -54,7 +54,7 @@ fn build_crank_tx( let signatory_pubkey = client.payer_pubkey(); // Pre-simulate crank ixs and pack into tx - let mut ixs: Vec = vec![build_crank_ix( + let mut ixs: Vec = vec![build_kickoff_ix( client.clone(), queue, signatory_pubkey, @@ -143,7 +143,7 @@ fn build_crank_tx( Some(tx) } -fn build_crank_ix( +fn build_kickoff_ix( client: Arc, queue: Queue, signatory_pubkey: Pubkey, @@ -188,11 +188,7 @@ fn build_crank_ix( // Build the instruction. let queue_pubkey = Queue::pubkey(queue.authority, queue.id); - let inner_ix = queue - .next_instruction - .clone() - .unwrap_or(queue.kickoff_instruction); - let mut crank_ix = clockwork_client::queue::instruction::queue_crank( + let mut kickoff_ix = clockwork_client::queue::instruction::queue_kickoff( data_hash, queue_pubkey, signatory_pubkey, @@ -202,29 +198,49 @@ fn build_crank_ix( // Inject the trigger account. match trigger_account_pubkey { None => {} - Some(pubkey) => crank_ix.accounts.push(AccountMeta { + Some(pubkey) => kickoff_ix.accounts.push(AccountMeta { pubkey, is_signer: false, is_writable: false, }), } - // Inject the target program account to the ix. - crank_ix - .accounts - .push(AccountMeta::new_readonly(inner_ix.program_id, false)); - - // Inject the worker pubkey as the Clockwork "payer" account - for acc in inner_ix.clone().accounts { - let acc_pubkey = if acc.pubkey == clockwork_utils::PAYER_PUBKEY { - signatory_pubkey - } else { - acc.pubkey - }; - crank_ix.accounts.push(match acc.is_writable { - true => AccountMeta::new(acc_pubkey, false), - false => AccountMeta::new_readonly(acc_pubkey, false), - }) + kickoff_ix +} + +fn build_crank_ix( + _client: Arc, + queue: Queue, + signatory_pubkey: Pubkey, + worker_id: u64, +) -> Instruction { + // Build the instruction. + let queue_pubkey = Queue::pubkey(queue.authority, queue.id); + let mut crank_ix = clockwork_client::queue::instruction::queue_crank( + queue_pubkey, + signatory_pubkey, + Worker::pubkey(worker_id), + ); + + if let Some(next_instruction) = queue.next_instruction { + // Inject the target program account to the ix. + crank_ix.accounts.push(AccountMeta::new_readonly( + next_instruction.program_id, + false, + )); + + // Inject the worker pubkey as the Clockwork "payer" account + for acc in next_instruction.clone().accounts { + let acc_pubkey = if acc.pubkey == clockwork_utils::PAYER_PUBKEY { + signatory_pubkey + } else { + acc.pubkey + }; + crank_ix.accounts.push(match acc.is_writable { + true => AccountMeta::new(acc_pubkey, false), + false => AccountMeta::new_readonly(acc_pubkey, false), + }) + } } crank_ix diff --git a/programs/queue/src/instructions/mod.rs b/programs/queue/src/instructions/mod.rs index 6f14ac9ec..25f550ba6 100644 --- a/programs/queue/src/instructions/mod.rs +++ b/programs/queue/src/instructions/mod.rs @@ -1,6 +1,7 @@ pub mod queue_crank; pub mod queue_create; pub mod queue_delete; +pub mod queue_kickoff; pub mod queue_pause; pub mod queue_resume; pub mod queue_update; @@ -9,6 +10,7 @@ pub mod queue_withdraw; pub use queue_crank::*; pub use queue_create::*; pub use queue_delete::*; +pub use queue_kickoff::*; pub use queue_pause::*; pub use queue_resume::*; pub use queue_update::*; diff --git a/programs/queue/src/instructions/queue_crank.rs b/programs/queue/src/instructions/queue_crank.rs index f56f1722f..5191ad92a 100644 --- a/programs/queue/src/instructions/queue_crank.rs +++ b/programs/queue/src/instructions/queue_crank.rs @@ -7,12 +7,8 @@ use { /// The ID of the pool workers must be a member of to collect fees. const POOL_ID: u64 = 0; -/// Number of lamports to reimburse the worker with after they've submitted a transaction's worth of cranks. -const TRANSACTION_BASE_FEE_REIMBURSEMENT: u64 = 5_000; - /// Accounts required by the `queue_crank` instruction. #[derive(Accounts)] -#[instruction(data_hash: Option)] pub struct QueueCrank<'info> { /// The worker's fee account. #[account( @@ -53,7 +49,8 @@ pub struct QueueCrank<'info> { queue.id.as_bytes(), ], bump, - constraint = !queue.paused @ ClockworkError::QueuePaused + constraint = !queue.paused @ ClockworkError::QueuePaused, + constraint = queue.next_instruction.is_some() )] pub queue: Box>, @@ -69,89 +66,38 @@ pub struct QueueCrank<'info> { pub worker: Account<'info, Worker>, } -pub fn handler(ctx: Context, data_hash: Option) -> Result<()> { +pub fn handler(ctx: Context) -> Result<()> { // Get accounts let fee = &mut ctx.accounts.fee; - let penalty = &ctx.accounts.penalty; + let penalty = &mut ctx.accounts.penalty; let pool = &ctx.accounts.pool; let queue = &mut ctx.accounts.queue; - let signatory = &ctx.accounts.signatory; + let signatory = &mut ctx.accounts.signatory; let worker = &ctx.accounts.worker; - if queue.next_instruction.is_none() { - // If this queue does not have a next_instruction, verify the queue's trigger condition is active. - queue.verify_trigger(data_hash, ctx.remaining_accounts)?; - } else { - // If the rate limit has been met, exit early. - match queue.exec_context { - None => return Err(ClockworkError::InvalidQueueState.into()), - Some(exec_context) => { - if exec_context.last_crank_at == Clock::get().unwrap().slot - && exec_context.cranks_since_slot >= queue.rate_limit - { - return Err(ClockworkError::RateLimitExeceeded.into()); - } - } - } - - // Crank the queue - let bump = ctx.bumps.get("queue").unwrap(); - queue.crank(ctx.remaining_accounts, *bump, signatory)?; - - // Debit the crank fee from the queue account. - **queue.to_account_info().try_borrow_mut_lamports()? = queue - .to_account_info() - .lamports() - .checked_sub(queue.fee) - .unwrap(); - - // If the worker is in the pool, pay fee to the worker's fee account. - // Otherwise, pay fee to the worker's penalty account. - if pool.clone().into_inner().workers.contains(&worker.key()) { - **fee.to_account_info().try_borrow_mut_lamports()? = fee - .to_account_info() - .lamports() - .checked_add(queue.fee) - .unwrap(); - } else { - **penalty.to_account_info().try_borrow_mut_lamports()? = penalty - .to_account_info() - .lamports() - .checked_add(queue.fee) - .unwrap(); - } - - // If the queue has no more work or the number of cranks since the last payout has reached the rate limit, - // reimburse the worker for the transaction base fee. - match queue.exec_context { - None => { - return Err(ClockworkError::InvalidQueueState.into()); - } - Some(exec_context) => { - if queue.next_instruction.is_none() - || exec_context.cranks_since_reimbursement >= queue.rate_limit - { - // Pay reimbursment for base transaction fee - **queue.to_account_info().try_borrow_mut_lamports()? = queue - .to_account_info() - .lamports() - .checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT) - .unwrap(); - **signatory.to_account_info().try_borrow_mut_lamports()? = signatory - .to_account_info() - .lamports() - .checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT) - .unwrap(); - - // Update the exec context to mark that a reimbursement happened this slot. - queue.exec_context = Some(ExecContext { - cranks_since_reimbursement: 0, - ..exec_context - }); - } + // If the rate limit has been met, exit early. + match queue.exec_context { + None => return Err(ClockworkError::InvalidQueueState.into()), + Some(exec_context) => { + if exec_context.last_crank_at == Clock::get().unwrap().slot + && exec_context.cranks_since_slot >= queue.rate_limit + { + return Err(ClockworkError::RateLimitExeceeded.into()); } } } + // Crank the queue + let bump = ctx.bumps.get("queue").unwrap(); + queue.crank( + ctx.remaining_accounts, + *bump, + fee, + penalty, + pool, + signatory, + worker, + )?; + Ok(()) } diff --git a/programs/queue/src/instructions/queue_kickoff.rs b/programs/queue/src/instructions/queue_kickoff.rs new file mode 100644 index 000000000..54672b53d --- /dev/null +++ b/programs/queue/src/instructions/queue_kickoff.rs @@ -0,0 +1,45 @@ +use { + crate::{errors::*, objects::*}, + anchor_lang::prelude::*, + clockwork_network_program::objects::{Worker, WorkerAccount}, +}; + +/// Accounts required by the `queue_crank` instruction. +#[derive(Accounts)] +#[instruction(data_hash: Option)] +pub struct QueueKickoff<'info> { + /// The queue to crank. + #[account( + mut, + seeds = [ + SEED_QUEUE, + queue.authority.as_ref(), + queue.id.as_bytes(), + ], + bump, + constraint = !queue.paused @ ClockworkError::QueuePaused, + constraint = queue.next_instruction.is_none() + )] + pub queue: Box>, + + /// The signatory. + #[account(mut)] + pub signatory: Signer<'info>, + + /// The worker. + #[account( + address = worker.pubkey(), + has_one = signatory + )] + pub worker: Account<'info, Worker>, +} + +pub fn handler(ctx: Context, data_hash: Option) -> Result<()> { + // Get accounts + let queue = &mut ctx.accounts.queue; + + // If this queue does not have a next_instruction, verify the queue's trigger condition is active. + queue.kickoff(data_hash, ctx.remaining_accounts)?; + + Ok(()) +} diff --git a/programs/queue/src/lib.rs b/programs/queue/src/lib.rs index 3194d18a8..a4ed6577d 100644 --- a/programs/queue/src/lib.rs +++ b/programs/queue/src/lib.rs @@ -20,8 +20,8 @@ pub mod queue_program { use super::*; /// Cranks a transaction queue. - pub fn queue_crank(ctx: Context, data_hash: Option) -> Result<()> { - queue_crank::handler(ctx, data_hash) + pub fn queue_crank(ctx: Context) -> Result<()> { + queue_crank::handler(ctx) } /// Creates a new transaction queue. @@ -39,6 +39,11 @@ pub mod queue_program { queue_delete::handler(ctx) } + /// Kicks off a queue if its trigger condition is active. + pub fn queue_kickoff(ctx: Context, data_hash: Option) -> Result<()> { + queue_kickoff::handler(ctx, data_hash) + } + /// Pauses an active queue. pub fn queue_pause(ctx: Context) -> Result<()> { queue_pause::handler(ctx) diff --git a/programs/queue/src/objects/queue.rs b/programs/queue/src/objects/queue.rs index 8285d7938..f1f5c29be 100644 --- a/programs/queue/src/objects/queue.rs +++ b/programs/queue/src/objects/queue.rs @@ -10,6 +10,7 @@ use { }, chrono::{DateTime, NaiveDateTime, Utc}, clockwork_cron::Schedule, + clockwork_network_program::objects::{Fee, Penalty, Pool, Worker}, clockwork_utils::*, std::{ collections::hash_map::DefaultHasher, @@ -19,20 +20,20 @@ use { }, }; -// TODO Add support for lookup tables. -// If the value is set, then use that lookup table when building the transaction. -// Add a property to CrankResponse to allow updating the lookup table. -// I believe Transaction.v0 only supports one lookup table at a time. So if this value changes between cranks, -// workers will need to stop packing the transaction and submit. - pub const SEED_QUEUE: &[u8] = b"queue"; +/// The default rate limit to initialize queues with const DEFAULT_RATE_LIMIT: u64 = 10; +/// The maximum rate limit which may be set on queue. const MAX_RATE_LIMIT: u64 = 32; +/// The Minimum crank fee that may be set on a queue. const MINIMUM_FEE: u64 = 1000; +/// The Number of lamports to reimburse the worker with after they've submitted a transaction's worth of cranks. +const TRANSACTION_BASE_FEE_REIMBURSEMENT: u64 = 5_000; + /// Tracks the current state of a transaction thread on Solana. #[account] #[derive(Debug)] @@ -112,18 +113,24 @@ pub trait QueueAccount { ) -> Result<()>; /// Crank the queue. Call out to the target program and parse the response for a next instruction. - fn crank(&mut self, account_infos: &[AccountInfo], bump: u8, worker: &Signer) -> Result<()>; + fn crank( + &mut self, + account_infos: &[AccountInfo], + bump: u8, + fee: &mut Account, + penalty: &mut Account, + pool: &Account, + signatory: &mut Signer, + worker: &Account, + ) -> Result<()>; + + fn kickoff(&mut self, data_hash: Option, remaining_accounts: &[AccountInfo]) + -> Result<()>; /// Reallocate the memory allocation for the account. fn realloc(&mut self) -> Result<()>; fn update(&mut self, settings: QueueSettings) -> Result<()>; - - fn verify_trigger( - &mut self, - data_hash: Option, - remaining_accounts: &[AccountInfo], - ) -> Result<()>; } impl QueueAccount for Account<'_, Queue> { @@ -151,7 +158,16 @@ impl QueueAccount for Account<'_, Queue> { Ok(()) } - fn crank(&mut self, account_infos: &[AccountInfo], bump: u8, signatory: &Signer) -> Result<()> { + fn crank( + &mut self, + account_infos: &[AccountInfo], + bump: u8, + fee: &mut Account, + penalty: &mut Account, + pool: &Account, + signatory: &mut Signer, + worker: &Account, + ) -> Result<()> { // Record the worker's lamports before invoking inner ixs let signatory_lamports_pre = signatory.lamports(); @@ -256,6 +272,59 @@ impl QueueAccount for Account<'_, Queue> { .checked_add(signatory_reimbursement) .unwrap(); + // Debit the crank fee from the queue account. + // If the worker is in the pool, pay fee to the worker's fee account. + // Otherwise, pay fee to the worker's penalty account. + **self.to_account_info().try_borrow_mut_lamports()? = self + .to_account_info() + .lamports() + .checked_sub(self.fee) + .unwrap(); + if pool.clone().into_inner().workers.contains(&worker.key()) { + **fee.to_account_info().try_borrow_mut_lamports()? = fee + .to_account_info() + .lamports() + .checked_add(self.fee) + .unwrap(); + } else { + **penalty.to_account_info().try_borrow_mut_lamports()? = penalty + .to_account_info() + .lamports() + .checked_add(self.fee) + .unwrap(); + } + + // If the self has no more work or the number of cranks since the last payout has reached the rate limit, + // reimburse the worker for the transaction base fee. + match self.exec_context { + None => { + return Err(ClockworkError::InvalidQueueState.into()); + } + Some(exec_context) => { + if self.next_instruction.is_none() + || exec_context.cranks_since_reimbursement >= self.rate_limit + { + // Pay reimbursment for base transaction fee + **self.to_account_info().try_borrow_mut_lamports()? = self + .to_account_info() + .lamports() + .checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT) + .unwrap(); + **signatory.to_account_info().try_borrow_mut_lamports()? = signatory + .to_account_info() + .lamports() + .checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT) + .unwrap(); + + // Update the exec context to mark that a reimbursement happened this slot. + self.exec_context = Some(ExecContext { + cranks_since_reimbursement: 0, + ..exec_context + }); + } + } + } + Ok(()) } @@ -295,7 +364,7 @@ impl QueueAccount for Account<'_, Queue> { Ok(()) } - fn verify_trigger( + fn kickoff( &mut self, data_hash: Option, remaining_accounts: &[AccountInfo], diff --git a/utils/src/lib.rs b/utils/src/lib.rs index cab25072f..0d62a27f1 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -13,6 +13,8 @@ use { std::{convert::TryFrom, hash::Hash}, }; +// TODO Add support for lookup tables. + /// The stand-in pubkey for delegating a payer address to a worker. All workers are re-imbursed by the user for lamports spent during this delegation. pub static PAYER_PUBKEY: Pubkey = static_pubkey!("C1ockworkPayer11111111111111111111111111111");