Skip to content

Commit

Permalink
Merge pull request #41 from cronos-so/nick/anti-fragile-task-execution
Browse files Browse the repository at this point in the history
Rewrite plugin task execution algorithm to be less fragile
  • Loading branch information
nickgarfield authored Jun 1, 2022
2 parents 4f10e37 + cf579da commit 3ceeba5
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 72 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ docker-target
**/tsconfig.tsbuildinfo
*.diff
**/keypair.json
**/*.log
2 changes: 1 addition & 1 deletion cli/src/processor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub fn set(
settings,
);

client.sign_and_submit(&[ix], &[client.payer()]).unwrap();
client.send_and_confirm(&[ix], &[client.payer()]).unwrap();

Ok(())
}
2 changes: 1 addition & 1 deletion cli/src/processor/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn initialize(client: &Client, mint: Pubkey) -> Result<(), CliError> {

// Submit tx
client
.sign_and_submit(&[ix_a, ix_b, ix_c, ix_d, ix_e, ix_f], &[client.payer()])
.send_and_confirm(&[ix_a, ix_b, ix_c, ix_d, ix_e, ix_f], &[client.payer()])
.unwrap();
Ok(())
}
2 changes: 1 addition & 1 deletion cli/src/processor/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub fn create(client: &Client) -> Result<(), CliError> {
let authority = client.payer_pubkey();
let manager_pubkey = cronos_sdk::scheduler::state::Manager::pda(authority).0;
let ix = cronos_sdk::scheduler::instruction::manager_new(authority, authority, manager_pubkey);
client.sign_and_submit(&[ix], &[client.payer()]).unwrap();
client.send_and_confirm(&[ix], &[client.payer()]).unwrap();
get(client, &manager_pubkey)
}

Expand Down
4 changes: 2 additions & 2 deletions cli/src/processor/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub fn register(client: &Client, delegate: Keypair) -> Result<(), CliError> {
snapshot_queue_pubkey,
snapshot_task_pubkey,
);
client.sign_and_submit(&[ix], &[owner, &delegate]).unwrap();
client.send_and_confirm(&[ix], &[owner, &delegate]).unwrap();
get(client, &node_pubkey)
}

Expand All @@ -95,6 +95,6 @@ pub fn stake(client: &Client, amount: u64, delegate: Pubkey) -> Result<(), CliEr
signer.pubkey(),
);

client.sign_and_submit(&[ix], &[client.payer()]).unwrap();
client.send_and_confirm(&[ix], &[client.payer()]).unwrap();
get(client, &node_pubkey)
}
2 changes: 1 addition & 1 deletion cli/src/processor/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn create(client: &Client, schedule: String) -> Result<(), CliError> {

// Sign and submit
client
.sign_and_submit(&[queue_ix], &[client.payer()])
.send_and_confirm(&[queue_ix], &[client.payer()])
.unwrap();
get(client, &queue_pubkey)
}
Expand Down
195 changes: 135 additions & 60 deletions plugin/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use {
instruction::{AccountMeta, Instruction},
pubkey::Pubkey,
},
solana_sdk::signature::Signature,
std::{collections::HashSet, fmt::Debug, sync::Arc},
tokio::runtime::{Builder, Runtime},
};
Expand Down Expand Up @@ -51,9 +52,10 @@ impl GeyserPlugin for CronosPlugin {
.max_blocking_threads(10) // TODO add to config
.build()
.unwrap(),
due_queues: DashSet::new(),
upcoming_queues: DashMap::new(),
unix_timestamps: DashMap::new(),
actionable_queues: DashSet::new(),
pending_queues: DashMap::new(),
signatures: DashMap::new(),
timestamps: DashMap::new(),
}));
Ok(())
}
Expand All @@ -66,7 +68,7 @@ impl GeyserPlugin for CronosPlugin {
&mut self,
account: ReplicaAccountInfoVersions,
_slot: u64,
is_startup: bool,
_is_startup: bool,
) -> PluginResult<()> {
let account_info = match account {
ReplicaAccountInfoVersions::V0_0_1(account_info) => account_info.clone(),
Expand All @@ -80,10 +82,7 @@ impl GeyserPlugin for CronosPlugin {
match account_update {
CronosAccountUpdate::Clock { clock } => this.handle_clock_update(clock),
CronosAccountUpdate::Queue { queue } => {
info!(
"Queue {:#?} updated. Is startup: {}",
account_pubkey, is_startup
);
info!("Caching queue {:#?}", account_pubkey);
this.handle_queue_update(queue, account_pubkey)
}
}
Expand Down Expand Up @@ -142,57 +141,70 @@ impl GeyserPlugin for CronosPlugin {
pub struct Inner {
pub client: Client,
pub runtime: Runtime,
pub due_queues: DashSet<Pubkey>,
pub upcoming_queues: DashMap<i64, DashSet<Pubkey>>,
pub unix_timestamps: DashMap<u64, i64>,

// The set of queues that can be processed
pub actionable_queues: DashSet<Pubkey>,

// Map from exec_at timestamps to the list of queues scheduled
// for that moment.
pub pending_queues: DashMap<i64, DashSet<Pubkey>>,

// Map from tx signatures to a (queue pubkey, slot) tuple. The slot
// represents the latest confirmed slot at the time the tx was sent.
pub signatures: DashMap<Signature, (Pubkey, u64)>,

// Map from slot numbers to sysvar clock unix_timestamps
pub timestamps: DashMap<u64, i64>,
}

impl Inner {
fn handle_confirmed_slot(self: Arc<Self>, slot: u64) -> PluginResult<()> {
info!("Confirmed slot: {}", slot);
info!("Upcoming queues: {:#?}", self.upcoming_queues);
info!("Due queues: {:#?}", self.due_queues);

// Use the confirmed slot number to move queue pubkeys from the indexed set of
// upcoming_queues to the combined set of due_queues.
let mut slots_to_delete = vec![];
for unix_timestamp_entry in self.unix_timestamps.iter() {
let slot_number = *unix_timestamp_entry.key();
info!("Unix ts entry for slot {}", slot_number);
if slot > slot_number {
let unix_timestamp = unix_timestamp_entry.value().clone();
match self.upcoming_queues.get(&unix_timestamp) {
Some(queue_pubkeys) => {
info!(
"Pushing queue pubkeys into due set for ts {}",
unix_timestamp
);
for queue_pubkey in queue_pubkeys.value().iter() {
self.due_queues.insert(queue_pubkey.clone());
fn handle_confirmed_slot(self: Arc<Self>, confirmed_slot: u64) -> PluginResult<()> {
info!("Confirmed slot: {}", confirmed_slot);

// Look for the latest confirmed sysvar unix timestamp
let mut confirmed_unix_timestamp = None;
self.timestamps.retain(|slot, unix_timestamp| {
if *slot == confirmed_slot {
confirmed_unix_timestamp = Some(unix_timestamp.clone());
return true;
}
*slot > confirmed_slot
});

// Move all pending queues that are due to the set of actionable queues.
// TODO By maintaining a sorted list of the pending_queue's keys,
// this operation can possibly be made much cheaper. By iterating
// through the sorted list up to the confirmed unix timestamp, we
// save compute cycles by not iterating over future exec_at timestamps.
// However before doing this, consider if retain() can be processed in parallel...
match confirmed_unix_timestamp {
Some(confirmed_unix_timestamp) => {
self.pending_queues.retain(|exec_at, queue_pubkeys| {
if *exec_at <= confirmed_unix_timestamp {
for queue_pubkey in queue_pubkeys.iter() {
self.actionable_queues.insert(queue_pubkey.clone());
}
return false;
}
None => (),
}
self.upcoming_queues.remove(&unix_timestamp);
slots_to_delete.push(slot_number);
true
});
}
}

// Delete all older slots and unix timestamps
for slot in slots_to_delete {
self.unix_timestamps.remove(&slot);
None => (),
}

// Process queues
self.clone()
.spawn(|this| async move { this.process_due_queues() })?;
.spawn(|this| async move { this.process_actionable_queues(confirmed_slot) })?;

// Confirm signatures
self.clone()
.spawn(|this| async move { this.confirm_signatures(confirmed_slot) })?;

Ok(())
}

fn handle_clock_update(self: Arc<Self>, clock: Clock) -> PluginResult<()> {
self.unix_timestamps
.insert(clock.slot, clock.unix_timestamp);
self.timestamps.insert(clock.slot, clock.unix_timestamp);
Ok(())
}

Expand All @@ -203,7 +215,7 @@ impl Inner {
) -> PluginResult<()> {
match queue.exec_at {
Some(exec_at) => {
self.upcoming_queues
self.pending_queues
.entry(exec_at)
.and_modify(|v| {
v.insert(queue_pubkey);
Expand All @@ -219,17 +231,28 @@ impl Inner {
Ok(())
}

fn process_due_queues(self: Arc<Self>) -> PluginResult<()> {
let due_queues = self.due_queues.iter();
for queue_pubkey_ref in due_queues {
let queue_pubkey = queue_pubkey_ref.clone();
fn process_actionable_queues(self: Arc<Self>, confirmed_slot: u64) -> PluginResult<()> {
// Error early if the node is not healthy
self.client.get_health().map_err(|_| {
info!("Node is not healthy");
return GeyserPluginError::Custom("Node is not healthy".into());
})?;

// Async process each queue
let actionable_queues = self.actionable_queues.iter();
for queue_pubkey_ref in actionable_queues {
let queue_pubkey = *queue_pubkey_ref.key();
self.clone()
.spawn(|this| async move { this.process_queue(queue_pubkey) })?;
.spawn(|this| async move { this.process_queue(queue_pubkey, confirmed_slot) })?;
}
Ok(())
}

fn process_queue(self: Arc<Self>, queue_pubkey: Pubkey) -> PluginResult<()> {
fn process_queue(
self: Arc<Self>,
queue_pubkey: Pubkey,
confirmed_slot: u64,
) -> PluginResult<()> {
info!("Processing queue {}", queue_pubkey);

// Get the queue
Expand All @@ -248,11 +271,11 @@ impl Inner {

// Build an ix for each task
for i in 0..queue.task_count {
// Get the action account
// Get the task account
let task_pubkey = Task::pda(queue_pubkey, i).0;
let task = self.client.get::<Task>(&task_pubkey).unwrap();

// Build ix
// Build task_exec ix
let mut task_exec_ix = cronos_sdk::scheduler::instruction::task_exec(
delegate_pubkey,
queue.manager,
Expand Down Expand Up @@ -289,22 +312,74 @@ impl Inner {
}
}

// Add to the list
// Add ix to the list
ixs.push(task_exec_ix)
}

// Pack all ixs into a single tx
match self.client.send(ixs.as_slice(), &[self.client.payer()]) {
Ok(signature) => {
info!("✅ {}", signature);
self.actionable_queues.remove(&queue_pubkey);
self.signatures
.insert(signature, (queue_pubkey, confirmed_slot));
}
Err(err) => {
info!("❌ {:#?}", err);
self.actionable_queues.remove(&queue_pubkey);
}
}

Ok(())
}

fn confirm_signatures(self: Arc<Self>, confirmed_slot: u64) -> PluginResult<()> {
for signature_ref in self.signatures.iter() {
let signature = *signature_ref.key();
let tuple = signature_ref.value();
let queue_pubkey = tuple.0;
let attempted_slot = tuple.1;
self.clone().spawn(|this| async move {
this.confirm_signature(attempted_slot, confirmed_slot, queue_pubkey, signature)
})?
}
Ok(())
}

fn confirm_signature(
self: Arc<Self>,
attempted_slot: u64,
confirmed_slot: u64,
queue_pubkey: Pubkey,
signature: Signature,
) -> PluginResult<()> {
match self
.client
.sign_and_submit(ixs.as_slice(), &[self.client.payer()])
.get_signature_status(&signature)
.map_err(|_| GeyserPluginError::Custom("Failed to get confirmation status".into()))?
{
Ok(sig) => {
info!("✅ {}", sig);
self.due_queues.remove(&queue_pubkey);
Some(res) => {
match res {
Ok(()) => {
// This signature doesn't need to be checked again
self.signatures.remove(&signature);
}
Err(err) => {
// TODO Check the error. Should this request be retried?
info!("Transaction {} failed with error {}", signature, err);
}
}
}
None => {
// If many slots have passed since the tx was sent, then assume failure
// and move the pubkey back into the set of actionable queues.
let timeout_threshold = 150;
if confirmed_slot > attempted_slot + timeout_threshold {
self.signatures.remove(&signature);
self.actionable_queues.insert(queue_pubkey);
}
}
Err(err) => info!("❌ {:#?}", err),
}

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions programs/scheduler/src/state/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl TaskAccount for Account<'_, Task> {
.delegate_fee
.checked_add(delegate_reimbursement)
.unwrap();
**queue.to_account_info().try_borrow_mut_lamports()? = queue
**manager.to_account_info().try_borrow_mut_lamports()? = manager
.to_account_info()
.lamports()
.checked_sub(total_delegate_fee)
Expand All @@ -217,7 +217,7 @@ impl TaskAccount for Account<'_, Task> {
.unwrap();

// Pay program fees
**queue.to_account_info().try_borrow_mut_lamports()? = queue
**manager.to_account_info().try_borrow_mut_lamports()? = manager
.to_account_info()
.lamports()
.checked_sub(config.program_fee)
Expand Down
8 changes: 7 additions & 1 deletion sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Client {
Ok(signature)
}

pub fn sign_and_submit<T: Signers>(
pub fn send_and_confirm<T: Signers>(
&self,
ixs: &[Instruction],
signers: &T,
Expand All @@ -86,6 +86,12 @@ impl Client {
tx.sign(signers, self.latest_blockhash()?);
Ok(self.send_and_confirm_transaction(&tx)?)
}

pub fn send<T: Signers>(&self, ixs: &[Instruction], signers: &T) -> ClientResult<Signature> {
let mut tx = Transaction::new_with_payer(ixs, Some(&self.payer_pubkey()));
tx.sign(signers, self.latest_blockhash()?);
Ok(self.send_transaction(&tx)?)
}
}

impl Debug for Client {
Expand Down
Loading

0 comments on commit 3ceeba5

Please sign in to comment.