From eaf33394a201963cf3b5cd7c606b71f12f006384 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 12 Oct 2022 17:25:23 -0500 Subject: [PATCH] If trigger condition is activated, set next_instruction as the kickoff_instruction --- plugin/src/builders/queue_crank.rs | 10 - .../queue/src/instructions/queue_crank.rs | 290 ++++-------------- programs/queue/src/objects/queue.rs | 161 ++++++++++ 3 files changed, 223 insertions(+), 238 deletions(-) diff --git a/plugin/src/builders/queue_crank.rs b/plugin/src/builders/queue_crank.rs index 09bdc28f1..20acab82b 100644 --- a/plugin/src/builders/queue_crank.rs +++ b/plugin/src/builders/queue_crank.rs @@ -22,7 +22,6 @@ use { }, }; -static COMPUTE_BUDGET_LIMIT: u64 = 1_400_000; // Max number of compute units per transaction static TRANSACTION_SIZE_LIMIT: usize = 1_232; // Max byte size of a serialized transaction pub async fn build_crank_txs( @@ -105,15 +104,6 @@ fn build_crank_tx( break; } - // If the compute budget limit was exceeded, then stop packing. - if response - .value - .units_consumed - .ge(&Some(COMPUTE_BUDGET_LIMIT)) - { - break; - } - // Save the simulated tx. It is okay to submit. tx = sim_tx; diff --git a/programs/queue/src/instructions/queue_crank.rs b/programs/queue/src/instructions/queue_crank.rs index 76386f62e..f56f1722f 100644 --- a/programs/queue/src/instructions/queue_crank.rs +++ b/programs/queue/src/instructions/queue_crank.rs @@ -1,24 +1,15 @@ -use anchor_lang::solana_program::program::set_return_data; - use { crate::{errors::*, objects::*}, anchor_lang::prelude::*, - chrono::{DateTime, NaiveDateTime, Utc}, - clockwork_cron::Schedule, clockwork_network_program::objects::{Fee, Penalty, Pool, Worker, WorkerAccount}, - std::{ - collections::hash_map::DefaultHasher, - hash::{Hash, Hasher}, - str::FromStr, - }, }; -/// Number of lamports to reimburse the worker with after they've submitted a transaction's worth of cranks. -const TRANSACTION_BASE_FEE_REIMBURSEMENT: u64 = 5000; - /// The ID of the pool workers must be a member of to collect fees. const POOL_ID: u64 = 0; +/// Number of lamports to reimburse the worker with after they've submitted a transaction's worth of cranks. +const TRANSACTION_BASE_FEE_REIMBURSEMENT: u64 = 5_000; + /// Accounts required by the `queue_crank` instruction. #[derive(Accounts)] #[instruction(data_hash: Option)] @@ -87,237 +78,80 @@ pub fn handler(ctx: Context, data_hash: Option) -> Result<()> { let signatory = &ctx.accounts.signatory; let worker = &ctx.accounts.worker; - // If this queue does not have a next_instruction, verify the queue's trigger has been met and a new exec_context can be created. - let current_slot = Clock::get().unwrap().slot; if queue.next_instruction.is_none() { - match queue.trigger.clone() { - Trigger::Account { pubkey } => { - // Require the provided data hash is non-null. - let data_hash = match data_hash { - None => return Err(ClockworkError::DataHashNotPresent.into()), - Some(data_hash) => data_hash, - }; - - // Verify proof that account data has been updated. - match ctx.remaining_accounts.first() { - None => {} - Some(account_info) => { - // Verify the remaining account is the account this queue is listening for. - require!( - pubkey.eq(account_info.key), - ClockworkError::TriggerNotActive - ); - - // Begin computing the data hash of this account. - let mut hasher = DefaultHasher::new(); - let data = &account_info.try_borrow_data().unwrap(); - data.to_vec().hash(&mut hasher); - - // Check the exec context for the prior data hash. - let expected_data_hash = match queue.exec_context.clone() { - None => { - // This queue has not begun executing yet. - // There is no prior data hash to include in our hash. - hasher.finish() - } - Some(exec_context) => { - match exec_context.trigger_context { - TriggerContext::Account { - data_hash: prior_data_hash, - } => { - // Inject the prior data hash as a seed. - prior_data_hash.hash(&mut hasher); - hasher.finish() - } - _ => return Err(ClockworkError::InvalidQueueState.into()), - } - } - }; - - // Verify the data hash provided by the worker is equal to the expected data hash. - // This proves the account has been updated since the last crank and the worker has seen the new data. - require!( - data_hash.eq(&expected_data_hash), - ClockworkError::TriggerNotActive - ); - - // Set a new exec context with the new data hash and slot number. - queue.exec_context = Some(ExecContext { - cranks_since_reimbursement: 0, - cranks_since_slot: 0, - last_crank_at: current_slot, - trigger_context: TriggerContext::Account { data_hash }, - }) - } + // If this queue does not have a next_instruction, verify the queue's trigger condition is active. + queue.verify_trigger(data_hash, ctx.remaining_accounts)?; + } else { + // If the rate limit has been met, exit early. + match queue.exec_context { + None => return Err(ClockworkError::InvalidQueueState.into()), + Some(exec_context) => { + if exec_context.last_crank_at == Clock::get().unwrap().slot + && exec_context.cranks_since_slot >= queue.rate_limit + { + return Err(ClockworkError::RateLimitExeceeded.into()); } } - Trigger::Cron { - schedule, - skippable, - } => { - // Get the reference timestamp for calculating the queue's scheduled target timestamp. - let reference_timestamp = match queue.exec_context.clone() { - None => queue.created_at.unix_timestamp, - Some(exec_context) => match exec_context.trigger_context { - TriggerContext::Cron { started_at } => started_at, - _ => return Err(ClockworkError::InvalidQueueState.into()), - }, - }; - - // Verify the current timestamp is greater than or equal to the threshold timestamp. - let current_timestamp = Clock::get().unwrap().unix_timestamp; - let threshold_timestamp = next_timestamp(reference_timestamp, schedule.clone()) - .ok_or(ClockworkError::TriggerNotActive)?; - require!( - current_timestamp >= threshold_timestamp, - ClockworkError::TriggerNotActive - ); - - // If the schedule is marked as skippable, set the started_at of the exec context - // to be the threshold moment just before the current timestamp. - let started_at = if skippable && current_timestamp > threshold_timestamp { - prev_timestamp(current_timestamp, schedule) - .ok_or(ClockworkError::TriggerNotActive)? - } else { - threshold_timestamp - }; - - // Set the exec context. - queue.exec_context = Some(ExecContext { - cranks_since_reimbursement: 0, - cranks_since_slot: 0, - last_crank_at: current_slot, - trigger_context: TriggerContext::Cron { started_at }, - }); - } - Trigger::Immediate => { - // Set the exec context. - require!( - queue.exec_context.is_none(), - ClockworkError::InvalidQueueState - ); - queue.exec_context = Some(ExecContext { - cranks_since_reimbursement: 0, - cranks_since_slot: 0, - last_crank_at: current_slot, - trigger_context: TriggerContext::Immediate, - }); - } } - } - - // If the rate limit has been met, exit early. - match queue.exec_context { - None => return Err(ClockworkError::InvalidQueueState.into()), - Some(exec_context) => { - if exec_context.last_crank_at == Clock::get().unwrap().slot - && exec_context.cranks_since_slot >= queue.rate_limit - { - return Err(ClockworkError::RateLimitExeceeded.into()); - } - } - } - msg!("A"); + // Crank the queue + let bump = ctx.bumps.get("queue").unwrap(); + queue.crank(ctx.remaining_accounts, *bump, signatory)?; - // Crank the queue - let bump = ctx.bumps.get("queue").unwrap(); - queue.crank(ctx.remaining_accounts, *bump, signatory)?; - - msg!("B1"); - - set_return_data(&[]); - - msg!("B2"); - - // set_return_data(data) - - // Debit the crank fee from the queue account. - **queue.to_account_info().try_borrow_mut_lamports()? = queue - .to_account_info() - .lamports() - .checked_sub(queue.fee) - .unwrap(); - - msg!("C"); - - // If the worker is in the pool, pay fee to the worker's fee account. - // Otherwise, pay fee to the worker's penalty account. - if pool.clone().into_inner().workers.contains(&worker.key()) { - msg!("D"); - **fee.to_account_info().try_borrow_mut_lamports()? = fee - .to_account_info() - .lamports() - .checked_add(queue.fee) - .unwrap(); - } else { - msg!("E"); - **penalty.to_account_info().try_borrow_mut_lamports()? = penalty + // Debit the crank fee from the queue account. + **queue.to_account_info().try_borrow_mut_lamports()? = queue .to_account_info() .lamports() - .checked_add(queue.fee) + .checked_sub(queue.fee) .unwrap(); - } - // If the queue has no more work or the number of cranks since the last payout has reached the rate limit, - // reimburse the worker for the transaction base fee. - match queue.exec_context { - None => { - msg!("F"); - return Err(ClockworkError::InvalidQueueState.into()); + // If the worker is in the pool, pay fee to the worker's fee account. + // Otherwise, pay fee to the worker's penalty account. + if pool.clone().into_inner().workers.contains(&worker.key()) { + **fee.to_account_info().try_borrow_mut_lamports()? = fee + .to_account_info() + .lamports() + .checked_add(queue.fee) + .unwrap(); + } else { + **penalty.to_account_info().try_borrow_mut_lamports()? = penalty + .to_account_info() + .lamports() + .checked_add(queue.fee) + .unwrap(); } - Some(exec_context) => { - msg!("G"); - if queue.next_instruction.is_none() - || exec_context.cranks_since_reimbursement >= queue.rate_limit - { - msg!("H"); - // Pay reimbursment for base transaction fee - **queue.to_account_info().try_borrow_mut_lamports()? = queue - .to_account_info() - .lamports() - .checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT) - .unwrap(); - **signatory.to_account_info().try_borrow_mut_lamports()? = signatory - .to_account_info() - .lamports() - .checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT) - .unwrap(); - - msg!("I"); - // Update the exec context to mark that a reimbursement happened this slot. - queue.exec_context = Some(ExecContext { - cranks_since_reimbursement: 0, - ..exec_context - }); + // If the queue has no more work or the number of cranks since the last payout has reached the rate limit, + // reimburse the worker for the transaction base fee. + match queue.exec_context { + None => { + return Err(ClockworkError::InvalidQueueState.into()); + } + Some(exec_context) => { + if queue.next_instruction.is_none() + || exec_context.cranks_since_reimbursement >= queue.rate_limit + { + // Pay reimbursment for base transaction fee + **queue.to_account_info().try_borrow_mut_lamports()? = queue + .to_account_info() + .lamports() + .checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT) + .unwrap(); + **signatory.to_account_info().try_borrow_mut_lamports()? = signatory + .to_account_info() + .lamports() + .checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT) + .unwrap(); + + // Update the exec context to mark that a reimbursement happened this slot. + queue.exec_context = Some(ExecContext { + cranks_since_reimbursement: 0, + ..exec_context + }); + } } } } - msg!("J"); - Ok(()) } - -fn next_timestamp(after: i64, schedule: String) -> Option { - Schedule::from_str(&schedule) - .unwrap() - .next_after(&DateTime::::from_utc( - NaiveDateTime::from_timestamp(after, 0), - Utc, - )) - .take() - .map(|datetime| datetime.timestamp()) -} - -fn prev_timestamp(before: i64, schedule: String) -> Option { - Schedule::from_str(&schedule) - .unwrap() - .prev_before(&DateTime::::from_utc( - NaiveDateTime::from_timestamp(before, 0), - Utc, - )) - .take() - .map(|datetime| datetime.timestamp()) -} diff --git a/programs/queue/src/objects/queue.rs b/programs/queue/src/objects/queue.rs index 6e4d67d65..8285d7938 100644 --- a/programs/queue/src/objects/queue.rs +++ b/programs/queue/src/objects/queue.rs @@ -8,10 +8,14 @@ use { }, AnchorDeserialize, AnchorSerialize, }, + chrono::{DateTime, NaiveDateTime, Utc}, + clockwork_cron::Schedule, clockwork_utils::*, std::{ + collections::hash_map::DefaultHasher, convert::TryFrom, hash::{Hash, Hasher}, + str::FromStr, }, }; @@ -114,6 +118,12 @@ pub trait QueueAccount { fn realloc(&mut self) -> Result<()>; fn update(&mut self, settings: QueueSettings) -> Result<()>; + + fn verify_trigger( + &mut self, + data_hash: Option, + remaining_accounts: &[AccountInfo], + ) -> Result<()>; } impl QueueAccount for Account<'_, Queue> { @@ -284,6 +294,135 @@ impl QueueAccount for Account<'_, Queue> { Ok(()) } + + fn verify_trigger( + &mut self, + data_hash: Option, + remaining_accounts: &[AccountInfo], + ) -> Result<()> { + let clock = Clock::get().unwrap(); + match self.trigger.clone() { + Trigger::Account { pubkey } => { + // Require the provided data hash is non-null. + let data_hash = match data_hash { + None => return Err(ClockworkError::DataHashNotPresent.into()), + Some(data_hash) => data_hash, + }; + + // Verify proof that account data has been updated. + match remaining_accounts.first() { + None => {} + Some(account_info) => { + // Verify the remaining account is the account this queue is listening for. + require!( + pubkey.eq(account_info.key), + ClockworkError::TriggerNotActive + ); + + // Begin computing the data hash of this account. + let mut hasher = DefaultHasher::new(); + let data = &account_info.try_borrow_data().unwrap(); + data.to_vec().hash(&mut hasher); + + // Check the exec context for the prior data hash. + let expected_data_hash = match self.exec_context.clone() { + None => { + // This queue has not begun executing yet. + // There is no prior data hash to include in our hash. + hasher.finish() + } + Some(exec_context) => { + match exec_context.trigger_context { + TriggerContext::Account { + data_hash: prior_data_hash, + } => { + // Inject the prior data hash as a seed. + prior_data_hash.hash(&mut hasher); + hasher.finish() + } + _ => return Err(ClockworkError::InvalidQueueState.into()), + } + } + }; + + // Verify the data hash provided by the worker is equal to the expected data hash. + // This proves the account has been updated since the last crank and the worker has seen the new data. + require!( + data_hash.eq(&expected_data_hash), + ClockworkError::TriggerNotActive + ); + + // Set a new exec context with the new data hash and slot number. + self.exec_context = Some(ExecContext { + cranks_since_reimbursement: 0, + cranks_since_slot: 0, + last_crank_at: clock.slot, + trigger_context: TriggerContext::Account { data_hash }, + }) + } + } + } + Trigger::Cron { + schedule, + skippable, + } => { + // Get the reference timestamp for calculating the queue's scheduled target timestamp. + let reference_timestamp = match self.exec_context.clone() { + None => self.created_at.unix_timestamp, + Some(exec_context) => match exec_context.trigger_context { + TriggerContext::Cron { started_at } => started_at, + _ => return Err(ClockworkError::InvalidQueueState.into()), + }, + }; + + // Verify the current timestamp is greater than or equal to the threshold timestamp. + let threshold_timestamp = next_timestamp(reference_timestamp, schedule.clone()) + .ok_or(ClockworkError::TriggerNotActive)?; + require!( + clock.unix_timestamp >= threshold_timestamp, + ClockworkError::TriggerNotActive + ); + + // If the schedule is marked as skippable, set the started_at of the exec context + // to be the threshold moment just before the current timestamp. + let started_at = if skippable && clock.unix_timestamp > threshold_timestamp { + prev_timestamp(clock.unix_timestamp, schedule) + .ok_or(ClockworkError::TriggerNotActive)? + } else { + threshold_timestamp + }; + + // Set the exec context. + self.exec_context = Some(ExecContext { + cranks_since_reimbursement: 0, + cranks_since_slot: 0, + last_crank_at: clock.slot, + trigger_context: TriggerContext::Cron { started_at }, + }); + } + Trigger::Immediate => { + // Set the exec context. + require!( + self.exec_context.is_none(), + ClockworkError::InvalidQueueState + ); + self.exec_context = Some(ExecContext { + cranks_since_reimbursement: 0, + cranks_since_slot: 0, + last_crank_at: clock.slot, + trigger_context: TriggerContext::Immediate, + }); + } + } + + // If we make it here, the trigger is active. Update the next instruction and be done. + self.next_instruction = Some(self.kickoff_instruction.clone()); + + // Realloc the queue account + self.realloc()?; + + Ok(()) + } } /// The triggering conditions of a queue. @@ -343,3 +482,25 @@ pub enum TriggerContext { /// The immediate trigger context. Immediate, } + +fn next_timestamp(after: i64, schedule: String) -> Option { + Schedule::from_str(&schedule) + .unwrap() + .next_after(&DateTime::::from_utc( + NaiveDateTime::from_timestamp(after, 0), + Utc, + )) + .take() + .map(|datetime| datetime.timestamp()) +} + +fn prev_timestamp(before: i64, schedule: String) -> Option { + Schedule::from_str(&schedule) + .unwrap() + .prev_before(&DateTime::::from_utc( + NaiveDateTime::from_timestamp(before, 0), + Utc, + )) + .take() + .map(|datetime| datetime.timestamp()) +}