
Commit

If trigger condition is activated, set next_instruction as the kickoff_instruction
Nick Garfield committed Oct 12, 2022
1 parent d83b7f7 commit eaf3339
Showing 3 changed files with 223 additions and 238 deletions.
10 changes: 0 additions & 10 deletions plugin/src/builders/queue_crank.rs
@@ -22,7 +22,6 @@ use {
},
};

static COMPUTE_BUDGET_LIMIT: u64 = 1_400_000; // Max number of compute units per transaction
static TRANSACTION_SIZE_LIMIT: usize = 1_232; // Max byte size of a serialized transaction

pub async fn build_crank_txs(
@@ -105,15 +104,6 @@ fn build_crank_tx(
break;
}

// If the compute budget limit was exceeded, then stop packing.
if response
.value
.units_consumed
.ge(&Some(COMPUTE_BUDGET_LIMIT))
{
break;
}

// Save the simulated tx. It is okay to submit.
tx = sim_tx;

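With COMPUTE_BUDGET_LIMIT and the units_consumed check deleted from build_crank_tx, the builder stops packing only when the serialized transaction would exceed TRANSACTION_SIZE_LIMIT or when simulation fails. Below is a minimal sketch of that kind of simulation-driven packing loop, assuming solana_client's nonblocking RpcClient and bincode for sizing; apart from TRANSACTION_SIZE_LIMIT, every name is illustrative rather than the plugin's actual code.

// Hedged sketch only: not the plugin's implementation. Stops on the size limit
// or a failed simulation instead of an explicit compute-unit check.
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
    instruction::Instruction, signature::Keypair, signer::Signer, transaction::Transaction,
};

static TRANSACTION_SIZE_LIMIT: usize = 1_232; // Max byte size of a serialized transaction

async fn pack_crank_tx(
    client: &RpcClient,
    payer: &Keypair,
    candidate_ixs: Vec<Instruction>,
) -> Transaction {
    let blockhash = client.get_latest_blockhash().await.unwrap();
    let mut packed_ixs: Vec<Instruction> = vec![];
    let mut tx =
        Transaction::new_signed_with_payer(&packed_ixs, Some(&payer.pubkey()), &[payer], blockhash);

    for ix in candidate_ixs {
        // Tentatively append the next crank instruction and re-sign.
        let mut sim_ixs = packed_ixs.clone();
        sim_ixs.push(ix);
        let sim_tx =
            Transaction::new_signed_with_payer(&sim_ixs, Some(&payer.pubkey()), &[payer], blockhash);

        // If the serialized transaction would exceed the size limit, stop packing.
        if bincode::serialize(&sim_tx).unwrap().len() > TRANSACTION_SIZE_LIMIT {
            break;
        }

        // If simulation errors (including blowing the compute budget), stop packing.
        let response = client.simulate_transaction(&sim_tx).await.unwrap();
        if response.value.err.is_some() {
            break;
        }

        // Save the simulated tx. It is okay to submit.
        packed_ixs = sim_ixs;
        tx = sim_tx;
    }
    tx
}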
290 changes: 62 additions & 228 deletions programs/queue/src/instructions/queue_crank.rs
@@ -1,24 +1,15 @@
use anchor_lang::solana_program::program::set_return_data;

use {
crate::{errors::*, objects::*},
anchor_lang::prelude::*,
chrono::{DateTime, NaiveDateTime, Utc},
clockwork_cron::Schedule,
clockwork_network_program::objects::{Fee, Penalty, Pool, Worker, WorkerAccount},
std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
str::FromStr,
},
};

/// Number of lamports to reimburse the worker with after they've submitted a transaction's worth of cranks.
const TRANSACTION_BASE_FEE_REIMBURSEMENT: u64 = 5000;

/// The ID of the pool workers must be a member of to collect fees.
const POOL_ID: u64 = 0;

/// Number of lamports to reimburse the worker with after they've submitted a transaction's worth of cranks.
const TRANSACTION_BASE_FEE_REIMBURSEMENT: u64 = 5_000;

/// Accounts required by the `queue_crank` instruction.
#[derive(Accounts)]
#[instruction(data_hash: Option<u64>)]
@@ -87,237 +78,80 @@ pub fn handler(ctx: Context<QueueCrank>, data_hash: Option<u64>) -> Result<()> {
let signatory = &ctx.accounts.signatory;
let worker = &ctx.accounts.worker;

// If this queue does not have a next_instruction, verify the queue's trigger has been met and a new exec_context can be created.
let current_slot = Clock::get().unwrap().slot;
if queue.next_instruction.is_none() {
match queue.trigger.clone() {
Trigger::Account { pubkey } => {
// Require the provided data hash is non-null.
let data_hash = match data_hash {
None => return Err(ClockworkError::DataHashNotPresent.into()),
Some(data_hash) => data_hash,
};

// Verify proof that account data has been updated.
match ctx.remaining_accounts.first() {
None => {}
Some(account_info) => {
// Verify the remaining account is the account this queue is listening for.
require!(
pubkey.eq(account_info.key),
ClockworkError::TriggerNotActive
);

// Begin computing the data hash of this account.
let mut hasher = DefaultHasher::new();
let data = &account_info.try_borrow_data().unwrap();
data.to_vec().hash(&mut hasher);

// Check the exec context for the prior data hash.
let expected_data_hash = match queue.exec_context.clone() {
None => {
// This queue has not begun executing yet.
// There is no prior data hash to include in our hash.
hasher.finish()
}
Some(exec_context) => {
match exec_context.trigger_context {
TriggerContext::Account {
data_hash: prior_data_hash,
} => {
// Inject the prior data hash as a seed.
prior_data_hash.hash(&mut hasher);
hasher.finish()
}
_ => return Err(ClockworkError::InvalidQueueState.into()),
}
}
};

// Verify the data hash provided by the worker is equal to the expected data hash.
// This proves the account has been updated since the last crank and the worker has seen the new data.
require!(
data_hash.eq(&expected_data_hash),
ClockworkError::TriggerNotActive
);

// Set a new exec context with the new data hash and slot number.
queue.exec_context = Some(ExecContext {
cranks_since_reimbursement: 0,
cranks_since_slot: 0,
last_crank_at: current_slot,
trigger_context: TriggerContext::Account { data_hash },
})
}
// If this queue does not have a next_instruction, verify the queue's trigger condition is active.
queue.verify_trigger(data_hash, ctx.remaining_accounts)?;
} else {
// If the rate limit has been met, exit early.
match queue.exec_context {
None => return Err(ClockworkError::InvalidQueueState.into()),
Some(exec_context) => {
if exec_context.last_crank_at == Clock::get().unwrap().slot
&& exec_context.cranks_since_slot >= queue.rate_limit
{
return Err(ClockworkError::RateLimitExeceeded.into());
}
}
Trigger::Cron {
schedule,
skippable,
} => {
// Get the reference timestamp for calculating the queue's scheduled target timestamp.
let reference_timestamp = match queue.exec_context.clone() {
None => queue.created_at.unix_timestamp,
Some(exec_context) => match exec_context.trigger_context {
TriggerContext::Cron { started_at } => started_at,
_ => return Err(ClockworkError::InvalidQueueState.into()),
},
};

// Verify the current timestamp is greater than or equal to the threshold timestamp.
let current_timestamp = Clock::get().unwrap().unix_timestamp;
let threshold_timestamp = next_timestamp(reference_timestamp, schedule.clone())
.ok_or(ClockworkError::TriggerNotActive)?;
require!(
current_timestamp >= threshold_timestamp,
ClockworkError::TriggerNotActive
);

// If the schedule is marked as skippable, set the started_at of the exec context
// to be the threshold moment just before the current timestamp.
let started_at = if skippable && current_timestamp > threshold_timestamp {
prev_timestamp(current_timestamp, schedule)
.ok_or(ClockworkError::TriggerNotActive)?
} else {
threshold_timestamp
};

// Set the exec context.
queue.exec_context = Some(ExecContext {
cranks_since_reimbursement: 0,
cranks_since_slot: 0,
last_crank_at: current_slot,
trigger_context: TriggerContext::Cron { started_at },
});
}
Trigger::Immediate => {
// Set the exec context.
require!(
queue.exec_context.is_none(),
ClockworkError::InvalidQueueState
);
queue.exec_context = Some(ExecContext {
cranks_since_reimbursement: 0,
cranks_since_slot: 0,
last_crank_at: current_slot,
trigger_context: TriggerContext::Immediate,
});
}
}
}

// If the rate limit has been met, exit early.
match queue.exec_context {
None => return Err(ClockworkError::InvalidQueueState.into()),
Some(exec_context) => {
if exec_context.last_crank_at == Clock::get().unwrap().slot
&& exec_context.cranks_since_slot >= queue.rate_limit
{
return Err(ClockworkError::RateLimitExeceeded.into());
}
}
}

msg!("A");
// Crank the queue
let bump = ctx.bumps.get("queue").unwrap();
queue.crank(ctx.remaining_accounts, *bump, signatory)?;

// Crank the queue
let bump = ctx.bumps.get("queue").unwrap();
queue.crank(ctx.remaining_accounts, *bump, signatory)?;

msg!("B1");

set_return_data(&[]);

msg!("B2");

// set_return_data(data)

// Debit the crank fee from the queue account.
**queue.to_account_info().try_borrow_mut_lamports()? = queue
.to_account_info()
.lamports()
.checked_sub(queue.fee)
.unwrap();

msg!("C");

// If the worker is in the pool, pay fee to the worker's fee account.
// Otherwise, pay fee to the worker's penalty account.
if pool.clone().into_inner().workers.contains(&worker.key()) {
msg!("D");
**fee.to_account_info().try_borrow_mut_lamports()? = fee
.to_account_info()
.lamports()
.checked_add(queue.fee)
.unwrap();
} else {
msg!("E");
**penalty.to_account_info().try_borrow_mut_lamports()? = penalty
// Debit the crank fee from the queue account.
**queue.to_account_info().try_borrow_mut_lamports()? = queue
.to_account_info()
.lamports()
.checked_add(queue.fee)
.checked_sub(queue.fee)
.unwrap();
}

// If the queue has no more work or the number of cranks since the last payout has reached the rate limit,
// reimburse the worker for the transaction base fee.
match queue.exec_context {
None => {
msg!("F");
return Err(ClockworkError::InvalidQueueState.into());
// If the worker is in the pool, pay fee to the worker's fee account.
// Otherwise, pay fee to the worker's penalty account.
if pool.clone().into_inner().workers.contains(&worker.key()) {
**fee.to_account_info().try_borrow_mut_lamports()? = fee
.to_account_info()
.lamports()
.checked_add(queue.fee)
.unwrap();
} else {
**penalty.to_account_info().try_borrow_mut_lamports()? = penalty
.to_account_info()
.lamports()
.checked_add(queue.fee)
.unwrap();
}
Some(exec_context) => {
msg!("G");
if queue.next_instruction.is_none()
|| exec_context.cranks_since_reimbursement >= queue.rate_limit
{
msg!("H");
// Pay reimbursment for base transaction fee
**queue.to_account_info().try_borrow_mut_lamports()? = queue
.to_account_info()
.lamports()
.checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT)
.unwrap();
**signatory.to_account_info().try_borrow_mut_lamports()? = signatory
.to_account_info()
.lamports()
.checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT)
.unwrap();

msg!("I");

// Update the exec context to mark that a reimbursement happened this slot.
queue.exec_context = Some(ExecContext {
cranks_since_reimbursement: 0,
..exec_context
});
// If the queue has no more work or the number of cranks since the last payout has reached the rate limit,
// reimburse the worker for the transaction base fee.
match queue.exec_context {
None => {
return Err(ClockworkError::InvalidQueueState.into());
}
Some(exec_context) => {
if queue.next_instruction.is_none()
|| exec_context.cranks_since_reimbursement >= queue.rate_limit
{
// Pay reimbursment for base transaction fee
**queue.to_account_info().try_borrow_mut_lamports()? = queue
.to_account_info()
.lamports()
.checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT)
.unwrap();
**signatory.to_account_info().try_borrow_mut_lamports()? = signatory
.to_account_info()
.lamports()
.checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT)
.unwrap();

// Update the exec context to mark that a reimbursement happened this slot.
queue.exec_context = Some(ExecContext {
cranks_since_reimbursement: 0,
..exec_context
});
}
}
}
}

msg!("J");

Ok(())
}

fn next_timestamp(after: i64, schedule: String) -> Option<i64> {
Schedule::from_str(&schedule)
.unwrap()
.next_after(&DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(after, 0),
Utc,
))
.take()
.map(|datetime| datetime.timestamp())
}

fn prev_timestamp(before: i64, schedule: String) -> Option<i64> {
Schedule::from_str(&schedule)
.unwrap()
.prev_before(&DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(before, 0),
Utc,
))
.take()
.map(|datetime| datetime.timestamp())
}
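
The inline trigger checks deleted above are consolidated behind the new queue.verify_trigger(data_hash, ctx.remaining_accounts) call, whose implementation presumably lives in the third changed file, which is not rendered in this view. The following is a minimal sketch of what that method plausibly looks like, reconstructed from the removed inline logic: the method name comes from the call site, while the body, imports, and reliance on the next_timestamp / prev_timestamp helpers are assumptions rather than the commit's actual code.

// Hedged sketch reconstructed from the removed inline logic; not the commit's actual implementation.
// next_timestamp / prev_timestamp are the cron helpers from the handler file above (assumed visible here).
use {
    crate::{errors::*, objects::*},
    anchor_lang::prelude::*,
    std::{collections::hash_map::DefaultHasher, hash::{Hash, Hasher}},
};

impl Queue {
    /// Verify the queue's trigger condition is active and open a fresh exec context.
    pub fn verify_trigger(
        &mut self,
        data_hash: Option<u64>,
        remaining_accounts: &[AccountInfo],
    ) -> Result<()> {
        let current_slot = Clock::get().unwrap().slot;
        match self.trigger.clone() {
            Trigger::Account { pubkey } => {
                // Require the provided data hash is non-null.
                let data_hash = data_hash.ok_or(ClockworkError::DataHashNotPresent)?;

                if let Some(account_info) = remaining_accounts.first() {
                    // Verify the remaining account is the account this queue is listening for.
                    require!(pubkey.eq(account_info.key), ClockworkError::TriggerNotActive);

                    // Hash the account's current data, seeded with the prior data hash if one exists.
                    let mut hasher = DefaultHasher::new();
                    account_info.try_borrow_data().unwrap().to_vec().hash(&mut hasher);
                    if let Some(exec_context) = self.exec_context.clone() {
                        match exec_context.trigger_context {
                            TriggerContext::Account { data_hash: prior } => prior.hash(&mut hasher),
                            _ => return Err(ClockworkError::InvalidQueueState.into()),
                        }
                    }
                    let expected_data_hash = hasher.finish();

                    // The worker's hash must match the expected hash, proving the account
                    // has been updated since the last crank.
                    require!(data_hash.eq(&expected_data_hash), ClockworkError::TriggerNotActive);

                    self.exec_context = Some(ExecContext {
                        cranks_since_reimbursement: 0,
                        cranks_since_slot: 0,
                        last_crank_at: current_slot,
                        trigger_context: TriggerContext::Account { data_hash },
                    });
                }
            }
            Trigger::Cron { schedule, skippable } => {
                // The threshold is the next cron tick after the last recorded reference timestamp.
                let reference_timestamp = match self.exec_context.clone() {
                    None => self.created_at.unix_timestamp,
                    Some(exec_context) => match exec_context.trigger_context {
                        TriggerContext::Cron { started_at } => started_at,
                        _ => return Err(ClockworkError::InvalidQueueState.into()),
                    },
                };
                let current_timestamp = Clock::get().unwrap().unix_timestamp;
                let threshold_timestamp = next_timestamp(reference_timestamp, schedule.clone())
                    .ok_or(ClockworkError::TriggerNotActive)?;
                require!(current_timestamp >= threshold_timestamp, ClockworkError::TriggerNotActive);

                // Skippable schedules fast-forward to the tick just before the current timestamp.
                let started_at = if skippable && current_timestamp > threshold_timestamp {
                    prev_timestamp(current_timestamp, schedule).ok_or(ClockworkError::TriggerNotActive)?
                } else {
                    threshold_timestamp
                };
                self.exec_context = Some(ExecContext {
                    cranks_since_reimbursement: 0,
                    cranks_since_slot: 0,
                    last_crank_at: current_slot,
                    trigger_context: TriggerContext::Cron { started_at },
                });
            }
            Trigger::Immediate => {
                // An immediate queue may only be kicked off once.
                require!(self.exec_context.is_none(), ClockworkError::InvalidQueueState);
                self.exec_context = Some(ExecContext {
                    cranks_since_reimbursement: 0,
                    cranks_since_slot: 0,
                    last_crank_at: current_slot,
                    trigger_context: TriggerContext::Immediate,
                });
            }
        }
        Ok(())
    }
}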
