diff --git a/.gitignore b/.gitignore
index 9dba716fa..dfde1f0fa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ docker-target
 **/tsconfig.tsbuildinfo
 *.diff
 **/keypair.json
+**/*.log
\ No newline at end of file
diff --git a/cli/src/processor/config.rs b/cli/src/processor/config.rs
index 5c404b319..1017c6977 100644
--- a/cli/src/processor/config.rs
+++ b/cli/src/processor/config.rs
@@ -76,7 +76,7 @@ pub fn set(
         settings,
     );
 
-    client.sign_and_submit(&[ix], &[client.payer()]).unwrap();
+    client.send_and_confirm(&[ix], &[client.payer()]).unwrap();
     Ok(())
 }
diff --git a/cli/src/processor/initialize.rs b/cli/src/processor/initialize.rs
index 9af9fbbbf..718ce200f 100644
--- a/cli/src/processor/initialize.rs
+++ b/cli/src/processor/initialize.rs
@@ -22,7 +22,7 @@ pub fn initialize(client: &Client, mint: Pubkey) -> Result<(), CliError> {
 
     // Submit tx
     client
-        .sign_and_submit(&[ix_a, ix_b, ix_c, ix_d, ix_e, ix_f], &[client.payer()])
+        .send_and_confirm(&[ix_a, ix_b, ix_c, ix_d, ix_e, ix_f], &[client.payer()])
         .unwrap();
     Ok(())
 }
diff --git a/cli/src/processor/manager.rs b/cli/src/processor/manager.rs
index f3b4a33ae..b034c77ec 100644
--- a/cli/src/processor/manager.rs
+++ b/cli/src/processor/manager.rs
@@ -7,7 +7,7 @@ pub fn create(client: &Client) -> Result<(), CliError> {
     let authority = client.payer_pubkey();
     let manager_pubkey = cronos_sdk::scheduler::state::Manager::pda(authority).0;
     let ix = cronos_sdk::scheduler::instruction::manager_new(authority, authority, manager_pubkey);
-    client.sign_and_submit(&[ix], &[client.payer()]).unwrap();
+    client.send_and_confirm(&[ix], &[client.payer()]).unwrap();
     get(client, &manager_pubkey)
 }
diff --git a/cli/src/processor/node.rs b/cli/src/processor/node.rs
index 159f9aa69..410a44fe3 100644
--- a/cli/src/processor/node.rs
+++ b/cli/src/processor/node.rs
@@ -70,7 +70,7 @@ pub fn register(client: &Client, delegate: Keypair) -> Result<(), CliError> {
         snapshot_queue_pubkey,
         snapshot_task_pubkey,
     );
-    client.sign_and_submit(&[ix], &[owner, &delegate]).unwrap();
+    client.send_and_confirm(&[ix], &[owner, &delegate]).unwrap();
     get(client, &node_pubkey)
 }
 
@@ -95,6 +95,6 @@ pub fn stake(client: &Client, amount: u64, delegate: Pubkey) -> Result<(), CliError> {
         signer.pubkey(),
     );
 
-    client.sign_and_submit(&[ix], &[client.payer()]).unwrap();
+    client.send_and_confirm(&[ix], &[client.payer()]).unwrap();
     get(client, &node_pubkey)
 }
diff --git a/cli/src/processor/queue.rs b/cli/src/processor/queue.rs
index c6d500f33..dee30bea8 100644
--- a/cli/src/processor/queue.rs
+++ b/cli/src/processor/queue.rs
@@ -29,7 +29,7 @@ pub fn create(client: &Client, schedule: String) -> Result<(), CliError> {
 
     // Sign and submit
     client
-        .sign_and_submit(&[queue_ix], &[client.payer()])
+        .send_and_confirm(&[queue_ix], &[client.payer()])
         .unwrap();
     get(client, &queue_pubkey)
 }
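Note on the CLI hunks above: sign_and_submit is renamed to send_and_confirm at every call site, making the blocking semantics explicit. A minimal sketch of the two submit paths, written against the stock solana_client RpcClient rather than the SDK wrapper (the demo function and its names are illustrative, not part of this diff):

    use solana_client::rpc_client::RpcClient;
    use solana_sdk::{
        signature::{Keypair, Signature, Signer},
        system_instruction,
        transaction::Transaction,
    };

    fn demo(rpc: &RpcClient, payer: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
        let ix = system_instruction::transfer(&payer.pubkey(), &payer.pubkey(), 1);
        let blockhash = rpc.get_latest_blockhash()?;
        let tx =
            Transaction::new_signed_with_payer(&[ix], Some(&payer.pubkey()), &[payer], blockhash);

        // Blocking: polls until the cluster confirms the tx. This is the CLI's
        // path, where a human is waiting on a definite result.
        let _confirmed: Signature = rpc.send_and_confirm_transaction(&tx)?;

        // Fire-and-forget: returns as soon as the RPC node accepts the tx.
        // The plugin uses this path and checks signature status itself later.
        let _pending: Signature = rpc.send_transaction(&tx)?;

        Ok(())
    }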
diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs
index 181a47af6..b5ab35522 100644
--- a/plugin/src/plugin.rs
+++ b/plugin/src/plugin.rs
@@ -15,6 +15,7 @@ use {
         instruction::{AccountMeta, Instruction},
         pubkey::Pubkey,
     },
+    solana_sdk::signature::Signature,
     std::{collections::HashSet, fmt::Debug, sync::Arc},
     tokio::runtime::{Builder, Runtime},
 };
@@ -51,9 +52,10 @@ impl GeyserPlugin for CronosPlugin {
                 .max_blocking_threads(10) // TODO add to config
                 .build()
                 .unwrap(),
-            due_queues: DashSet::new(),
-            upcoming_queues: DashMap::new(),
-            unix_timestamps: DashMap::new(),
+            actionable_queues: DashSet::new(),
+            pending_queues: DashMap::new(),
+            signatures: DashMap::new(),
+            timestamps: DashMap::new(),
         }));
         Ok(())
     }
@@ -66,7 +68,7 @@ impl GeyserPlugin for CronosPlugin {
         &mut self,
         account: ReplicaAccountInfoVersions,
         _slot: u64,
-        is_startup: bool,
+        _is_startup: bool,
     ) -> PluginResult<()> {
         let account_info = match account {
             ReplicaAccountInfoVersions::V0_0_1(account_info) => account_info.clone(),
@@ -80,10 +82,7 @@ impl GeyserPlugin for CronosPlugin {
             match account_update {
                 CronosAccountUpdate::Clock { clock } => this.handle_clock_update(clock),
                 CronosAccountUpdate::Queue { queue } => {
-                    info!(
-                        "Queue {:#?} updated. Is startup: {}",
-                        account_pubkey, is_startup
-                    );
+                    info!("Caching queue {:#?}", account_pubkey);
                     this.handle_queue_update(queue, account_pubkey)
                 }
             }
@@ -142,57 +141,70 @@ impl GeyserPlugin for CronosPlugin {
 pub struct Inner {
     pub client: Client,
     pub runtime: Runtime,
-    pub due_queues: DashSet<Pubkey>,
-    pub upcoming_queues: DashMap<i64, DashSet<Pubkey>>,
-    pub unix_timestamps: DashMap<u64, i64>,
+
+    // The set of queues that can be processed
+    pub actionable_queues: DashSet<Pubkey>,
+
+    // Map from exec_at timestamps to the list of queues scheduled
+    // for that moment.
+    pub pending_queues: DashMap<i64, DashSet<Pubkey>>,
+
+    // Map from tx signatures to a (queue pubkey, slot) tuple. The slot
+    // represents the latest confirmed slot at the time the tx was sent.
+    pub signatures: DashMap<Signature, (Pubkey, u64)>,
+
+    // Map from slot numbers to sysvar clock unix_timestamps
+    pub timestamps: DashMap<u64, i64>,
 }
 
 impl Inner {
-    fn handle_confirmed_slot(self: Arc<Self>, slot: u64) -> PluginResult<()> {
-        info!("Confirmed slot: {}", slot);
-        info!("Upcoming queues: {:#?}", self.upcoming_queues);
-        info!("Due queues: {:#?}", self.due_queues);
-
-        // Use the confirmed slot number to move queue pubkeys from the indexed set of
-        // upcoming_queues to the combined set of due_queues.
-        let mut slots_to_delete = vec![];
-        for unix_timestamp_entry in self.unix_timestamps.iter() {
-            let slot_number = *unix_timestamp_entry.key();
-            info!("Unix ts entry for slot {}", slot_number);
-            if slot > slot_number {
-                let unix_timestamp = unix_timestamp_entry.value().clone();
-                match self.upcoming_queues.get(&unix_timestamp) {
-                    Some(queue_pubkeys) => {
-                        info!(
-                            "Pushing queue pubkeys into due set for ts {}",
-                            unix_timestamp
-                        );
-                        for queue_pubkey in queue_pubkeys.value().iter() {
-                            self.due_queues.insert(queue_pubkey.clone());
+    fn handle_confirmed_slot(self: Arc<Self>, confirmed_slot: u64) -> PluginResult<()> {
+        info!("Confirmed slot: {}", confirmed_slot);
+
+        // Look for the latest confirmed sysvar unix timestamp
+        let mut confirmed_unix_timestamp = None;
+        self.timestamps.retain(|slot, unix_timestamp| {
+            if *slot == confirmed_slot {
+                confirmed_unix_timestamp = Some(unix_timestamp.clone());
+                return true;
+            }
+            *slot > confirmed_slot
+        });
+
+        // Move all pending queues that are due to the set of actionable queues.
+        // TODO By maintaining a sorted list of the pending_queue's keys,
+        //      this operation can possibly be made much cheaper. By iterating
+        //      through the sorted list up to the confirmed unix timestamp, we
+        //      save compute cycles by not iterating over future exec_at timestamps.
+        //      However before doing this, consider if retain() can be processed in parallel...
+        match confirmed_unix_timestamp {
+            Some(confirmed_unix_timestamp) => {
+                self.pending_queues.retain(|exec_at, queue_pubkeys| {
+                    if *exec_at <= confirmed_unix_timestamp {
+                        for queue_pubkey in queue_pubkeys.iter() {
+                            self.actionable_queues.insert(queue_pubkey.clone());
                         }
+                        return false;
                     }
-                    None => (),
-                }
-                self.upcoming_queues.remove(&unix_timestamp);
-                slots_to_delete.push(slot_number);
+                    true
+                });
             }
-        }
-
-        // Delete all older slots and unix timestamps
-        for slot in slots_to_delete {
-            self.unix_timestamps.remove(&slot);
+            None => (),
         }
 
         // Process queues
         self.clone()
-            .spawn(|this| async move { this.process_due_queues() })?;
+            .spawn(|this| async move { this.process_actionable_queues(confirmed_slot) })?;
+
+        // Confirm signatures
+        self.clone()
+            .spawn(|this| async move { this.confirm_signatures(confirmed_slot) })?;
 
         Ok(())
     }
 
     fn handle_clock_update(self: Arc<Self>, clock: Clock) -> PluginResult<()> {
-        self.unix_timestamps
-            .insert(clock.slot, clock.unix_timestamp);
+        self.timestamps.insert(clock.slot, clock.unix_timestamp);
         Ok(())
     }
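Note on the hunk above: the promotion step is the heart of the new bookkeeping. Each slot confirmation prunes stale clock entries, then moves every pending bucket whose exec_at has passed into the actionable set. A self-contained sketch of that pattern (the type parameters follow the field comments above; the function name is illustrative):

    use dashmap::{DashMap, DashSet};
    use solana_sdk::pubkey::Pubkey;

    // Promote every queue whose exec_at is at or before the confirmed clock
    // time, deleting the promoted buckets as we go. retain() visits each
    // entry once and keeps only those for which the closure returns true.
    fn promote_due_queues(
        pending: &DashMap<i64, DashSet<Pubkey>>,
        actionable: &DashSet<Pubkey>,
        confirmed_unix_timestamp: i64,
    ) {
        pending.retain(|exec_at, queue_pubkeys| {
            if *exec_at <= confirmed_unix_timestamp {
                for queue_pubkey in queue_pubkeys.iter() {
                    actionable.insert(*queue_pubkey.key());
                }
                return false; // drop this bucket: its queues are now actionable
            }
            true // keep future buckets untouched
        });
    }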
@@ -203,7 +215,7 @@ impl Inner {
     ) -> PluginResult<()> {
         match queue.exec_at {
             Some(exec_at) => {
-                self.upcoming_queues
+                self.pending_queues
                     .entry(exec_at)
                     .and_modify(|v| {
                         v.insert(queue_pubkey);
@@ -219,17 +231,28 @@ impl Inner {
         Ok(())
     }
 
-    fn process_due_queues(self: Arc<Self>) -> PluginResult<()> {
-        let due_queues = self.due_queues.iter();
-        for queue_pubkey_ref in due_queues {
-            let queue_pubkey = queue_pubkey_ref.clone();
+    fn process_actionable_queues(self: Arc<Self>, confirmed_slot: u64) -> PluginResult<()> {
+        // Error early if the node is not healthy
+        self.client.get_health().map_err(|_| {
+            info!("Node is not healthy");
+            return GeyserPluginError::Custom("Node is not healthy".into());
+        })?;
+
+        // Async process each queue
+        let actionable_queues = self.actionable_queues.iter();
+        for queue_pubkey_ref in actionable_queues {
+            let queue_pubkey = *queue_pubkey_ref.key();
             self.clone()
-                .spawn(|this| async move { this.process_queue(queue_pubkey) })?;
+                .spawn(|this| async move { this.process_queue(queue_pubkey, confirmed_slot) })?;
         }
         Ok(())
     }
 
-    fn process_queue(self: Arc<Self>, queue_pubkey: Pubkey) -> PluginResult<()> {
+    fn process_queue(
+        self: Arc<Self>,
+        queue_pubkey: Pubkey,
+        confirmed_slot: u64,
+    ) -> PluginResult<()> {
         info!("Processing queue {}", queue_pubkey);
 
         // Get the queue
@@ -248,11 +271,11 @@ impl Inner {
 
         // Build an ix for each task
         for i in 0..queue.task_count {
-            // Get the action account
+            // Get the task account
            let task_pubkey = Task::pda(queue_pubkey, i).0;
            let task = self.client.get::<Task>(&task_pubkey).unwrap();
 
-            // Build ix
+            // Build task_exec ix
             let mut task_exec_ix = cronos_sdk::scheduler::instruction::task_exec(
                 delegate_pubkey,
                 queue.manager,
@@ -289,22 +312,74 @@ impl Inner {
                 }
             }
 
-            // Add to the list
+            // Add ix to the list
             ixs.push(task_exec_ix)
         }
 
         // Pack all ixs into a single tx
+        match self.client.send(ixs.as_slice(), &[self.client.payer()]) {
+            Ok(signature) => {
+                info!("✅ {}", signature);
+                self.actionable_queues.remove(&queue_pubkey);
+                self.signatures
+                    .insert(signature, (queue_pubkey, confirmed_slot));
+            }
+            Err(err) => {
+                info!("❌ {:#?}", err);
+                self.actionable_queues.remove(&queue_pubkey);
+            }
+        }
+
+        Ok(())
+    }
+
+    fn confirm_signatures(self: Arc<Self>, confirmed_slot: u64) -> PluginResult<()> {
+        for signature_ref in self.signatures.iter() {
+            let signature = *signature_ref.key();
+            let tuple = signature_ref.value();
+            let queue_pubkey = tuple.0;
+            let attempted_slot = tuple.1;
+            self.clone().spawn(|this| async move {
+                this.confirm_signature(attempted_slot, confirmed_slot, queue_pubkey, signature)
+            })?
+        }
+        Ok(())
+    }
+
+    fn confirm_signature(
+        self: Arc<Self>,
+        attempted_slot: u64,
+        confirmed_slot: u64,
+        queue_pubkey: Pubkey,
+        signature: Signature,
+    ) -> PluginResult<()> {
         match self
             .client
-            .sign_and_submit(ixs.as_slice(), &[self.client.payer()])
+            .get_signature_status(&signature)
+            .map_err(|_| GeyserPluginError::Custom("Failed to get confirmation status".into()))?
         {
-            Ok(sig) => {
-                info!("✅ {}", sig);
-                self.due_queues.remove(&queue_pubkey);
+            Some(res) => {
+                match res {
+                    Ok(()) => {
+                        // This signature doesn't need to be checked again
+                        self.signatures.remove(&signature);
+                    }
+                    Err(err) => {
+                        // TODO Check the error. Should this request be retried?
+                        info!("Transaction {} failed with error {}", signature, err);
+                    }
+                }
+            }
+            None => {
+                // If many slots have passed since the tx was sent, then assume failure
+                // and move the pubkey back into the set of actionable queues.
+                let timeout_threshold = 150;
+                if confirmed_slot > attempted_slot + timeout_threshold {
+                    self.signatures.remove(&signature);
+                    self.actionable_queues.insert(queue_pubkey);
+                }
             }
-            Err(err) => info!("❌ {:#?}", err),
         }
-        Ok(())
     }
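Note on confirm_signature above: the status check resolves to one of four outcomes. A condensed sketch of that decision table (the enum and function are illustrative; get_signature_status is the actual solana_client call, and ~150 slots is roughly how long a recent blockhash stays valid, so an older unconfirmed tx can safely be assumed dropped):

    use solana_client::rpc_client::RpcClient;
    use solana_sdk::signature::Signature;

    const TIMEOUT_THRESHOLD: u64 = 150;

    enum TxStatus {
        Confirmed,      // landed and succeeded: stop tracking
        Failed,         // landed but errored: stop tracking, maybe retry later
        AssumedDropped, // never landed: re-queue the pubkey as actionable
        Pending,        // still in flight: check again on the next slot
    }

    fn classify(
        rpc: &RpcClient,
        signature: &Signature,
        attempted_slot: u64,
        confirmed_slot: u64,
    ) -> TxStatus {
        match rpc.get_signature_status(signature) {
            Ok(Some(Ok(()))) => TxStatus::Confirmed,
            Ok(Some(Err(_))) => TxStatus::Failed,
            Ok(None) if confirmed_slot > attempted_slot + TIMEOUT_THRESHOLD => {
                TxStatus::AssumedDropped
            }
            _ => TxStatus::Pending,
        }
    }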
diff --git a/programs/scheduler/src/state/task.rs b/programs/scheduler/src/state/task.rs
index 162c7fd05..c9bafd19d 100644
--- a/programs/scheduler/src/state/task.rs
+++ b/programs/scheduler/src/state/task.rs
@@ -205,7 +205,7 @@ impl TaskAccount for Account<'_, Task> {
             .delegate_fee
             .checked_add(delegate_reimbursement)
             .unwrap();
-        **queue.to_account_info().try_borrow_mut_lamports()? = queue
+        **manager.to_account_info().try_borrow_mut_lamports()? = manager
             .to_account_info()
             .lamports()
             .checked_sub(total_delegate_fee)
@@ -217,7 +217,7 @@ impl TaskAccount for Account<'_, Task> {
             .unwrap();
 
         // Pay program fees
-        **queue.to_account_info().try_borrow_mut_lamports()? = queue
+        **manager.to_account_info().try_borrow_mut_lamports()? = manager
             .to_account_info()
             .lamports()
             .checked_sub(config.program_fee)
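Note on the two hunks above: they move the debit side of the delegate and program fee payments from the queue account to the manager account. The underlying pattern is a manual lamport transfer between program-owned accounts; a hedged standalone sketch (the helper and its names are illustrative, not from the program):

    use solana_program::{account_info::AccountInfo, program_error::ProgramError};

    // Debit one program-owned account and credit another by mutating lamports
    // directly. Both sides must change within the same instruction, or the
    // runtime rejects the transaction for not conserving lamports.
    fn transfer_lamports(
        from: &AccountInfo,
        to: &AccountInfo,
        amount: u64,
    ) -> Result<(), ProgramError> {
        **from.try_borrow_mut_lamports()? = from.lamports().checked_sub(amount).unwrap();
        **to.try_borrow_mut_lamports()? = to.lamports().checked_add(amount).unwrap();
        Ok(())
    }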
diff --git a/sdk/src/client.rs b/sdk/src/client.rs
index 1c1463882..74395b606 100644
--- a/sdk/src/client.rs
+++ b/sdk/src/client.rs
@@ -77,7 +77,7 @@ impl Client {
         Ok(signature)
     }
 
-    pub fn sign_and_submit<T: Signers>(
+    pub fn send_and_confirm<T: Signers>(
         &self,
         ixs: &[Instruction],
         signers: &T,
     ) -> ClientResult<Signature> {
@@ -86,6 +86,12 @@ impl Client {
         tx.sign(signers, self.latest_blockhash()?);
         Ok(self.send_and_confirm_transaction(&tx)?)
     }
+
+    pub fn send<T: Signers>(&self, ixs: &[Instruction], signers: &T) -> ClientResult<Signature> {
+        let mut tx = Transaction::new_with_payer(ixs, Some(&self.payer_pubkey()));
+        tx.sign(signers, self.latest_blockhash()?);
+        Ok(self.send_transaction(&tx)?)
+    }
 }
 
 impl Debug for Client {
diff --git a/stress/src/processor/benchmark.rs b/stress/src/processor/benchmark.rs
index 88d420f9b..ef7a5f0b6 100644
--- a/stress/src/processor/benchmark.rs
+++ b/stress/src/processor/benchmark.rs
@@ -39,7 +39,7 @@ pub fn run(count: u32, parallelism: f32, recurrence: u32) -> Result<(), CliError> {
     client
         .airdrop(&authority.pubkey(), LAMPORTS_PER_SOL)
         .unwrap();
-    client.sign_and_submit(&[ix], &[authority]).unwrap();
+    client.send_and_confirm(&[ix], &[authority]).unwrap();
 
     // TODO Schedule tasks asynchronously
 
@@ -47,7 +47,9 @@ pub fn run(count: u32, parallelism: f32, recurrence: u32) -> Result<(), CliError> {
     for i in 0..num_tasks_parallel {
         let ix_a = create_queue_ix(&authority, recurrence, &mut expected_exec_ats, i.into());
         let ix_b = create_task_ix(&authority, i.into(), 0);
-        client.sign_and_submit(&[ix_a, ix_b], &[authority]).unwrap();
+        client
+            .send_and_confirm(&[ix_a, ix_b], &[authority])
+            .unwrap();
     }
 
     // Create a queue for the serial tasks
@@ -68,7 +70,7 @@ pub fn run(count: u32, parallelism: f32, recurrence: u32) -> Result<(), CliError> {
                 i.into(),
             ));
         }
-        client.sign_and_submit(ixs, &[authority]).unwrap();
+        client.send_and_confirm(ixs, &[authority]).unwrap();
     }
 
     // Collect and report test results
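Note on sdk/src/client.rs above: the method bodies call send_transaction and send_and_confirm_transaction directly on self, which implies a wrapper that owns a payer keypair and derefs to an inner RpcClient. A sketch of that shape; every field and helper name here is an assumption, not the SDK's actual code:

    use std::ops::Deref;

    use solana_client::rpc_client::RpcClient;
    use solana_sdk::{
        pubkey::Pubkey,
        signature::{Keypair, Signer},
    };

    pub struct Client {
        pub client: RpcClient,
        pub payer: Keypair,
    }

    impl Client {
        pub fn payer(&self) -> &Keypair {
            &self.payer
        }

        pub fn payer_pubkey(&self) -> Pubkey {
            self.payer.pubkey()
        }
    }

    // Deref makes every RpcClient method (send_transaction,
    // send_and_confirm_transaction, get_signature_status, ...) callable
    // directly on Client, which the method bodies in the hunk rely on.
    impl Deref for Client {
        type Target = RpcClient;
        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }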