From a62737573f7842815ba093a32d2ea207788a809c Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 08:41:28 -0500 Subject: [PATCH 01/13] Implement new task execution logic --- .gitignore | 1 + plugin/src/plugin.rs | 86 ++++++++++++++++++++++---------------------- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index 9dba716fa..dfde1f0fa 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ docker-target **/tsconfig.tsbuildinfo *.diff **/keypair.json +**/*.log \ No newline at end of file diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 181a47af6..936b529a5 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -51,9 +51,9 @@ impl GeyserPlugin for CronosPlugin { .max_blocking_threads(10) // TODO add to config .build() .unwrap(), - due_queues: DashSet::new(), - upcoming_queues: DashMap::new(), - unix_timestamps: DashMap::new(), + actionable_queues: DashSet::new(), + pending_queues: DashMap::new(), + timestamps: DashMap::new(), })); Ok(()) } @@ -81,7 +81,7 @@ impl GeyserPlugin for CronosPlugin { CronosAccountUpdate::Clock { clock } => this.handle_clock_update(clock), CronosAccountUpdate::Queue { queue } => { info!( - "Queue {:#?} updated. Is startup: {}", + "Caching queue {:#?}. Is startup: {}", account_pubkey, is_startup ); this.handle_queue_update(queue, account_pubkey) @@ -142,57 +142,57 @@ impl GeyserPlugin for CronosPlugin { pub struct Inner { pub client: Client, pub runtime: Runtime, - pub due_queues: DashSet, - pub upcoming_queues: DashMap>, - pub unix_timestamps: DashMap, + pub actionable_queues: DashSet, // The set of queues that can be processed + pub pending_queues: DashMap>, // Map of exec_at timestamps to the list of queues actionable at that moment + pub timestamps: DashMap, // Map of slot numbers to sysvar clock unix_timestamps } impl Inner { fn handle_confirmed_slot(self: Arc, slot: u64) -> PluginResult<()> { info!("Confirmed slot: {}", slot); - info!("Upcoming queues: {:#?}", self.upcoming_queues); - info!("Due queues: {:#?}", self.due_queues); - - // Use the confirmed slot number to move queue pubkeys from the indexed set of - // upcoming_queues to the combined set of due_queues. - let mut slots_to_delete = vec![]; - for unix_timestamp_entry in self.unix_timestamps.iter() { - let slot_number = *unix_timestamp_entry.key(); - info!("Unix ts entry for slot {}", slot_number); - if slot > slot_number { - let unix_timestamp = unix_timestamp_entry.value().clone(); - match self.upcoming_queues.get(&unix_timestamp) { - Some(queue_pubkeys) => { - info!( - "Pushing queue pubkeys into due set for ts {}", - unix_timestamp - ); - for queue_pubkey in queue_pubkeys.value().iter() { - self.due_queues.insert(queue_pubkey.clone()); + info!("Upcoming queues: {:#?}", self.pending_queues); + info!("Due queues: {:#?}", self.actionable_queues); + + // Look for the latest confirmed sysvar unix timestamp + let mut confirmed_unix_timestamp = None; + self.timestamps.retain(|slot_i, unix_timestamp_i| { + if *slot_i == slot { + confirmed_unix_timestamp = Some(unix_timestamp_i.clone()); + return true; + } + *slot_i > slot + }); + + // Move all pending queues that are due to the set of actionable queues. + // TODO By maintaining a sorted list of the pending_queue's keys, + // this operation can possibly be made much cheaper. By iterating + // through the sorted list up to the confirmed unix timestamp, we + // save compute cycles by not iterating over future exec_at timestamps. + // However before doing this, consider if retain() can be processed in parallel... + match confirmed_unix_timestamp { + Some(confirmed_unix_timestamp) => { + self.pending_queues.retain(|exec_at_i, queue_pubkeys_i| { + if *exec_at_i <= confirmed_unix_timestamp { + for queue_pubkey in queue_pubkeys_i.iter() { + self.actionable_queues.insert(queue_pubkey.clone()); } + return false; } - None => (), - } - self.upcoming_queues.remove(&unix_timestamp); - slots_to_delete.push(slot_number); + true + }); } - } - - // Delete all older slots and unix timestamps - for slot in slots_to_delete { - self.unix_timestamps.remove(&slot); + None => (), } // Process queues self.clone() - .spawn(|this| async move { this.process_due_queues() })?; + .spawn(|this| async move { this.process_actionable_queues() })?; Ok(()) } fn handle_clock_update(self: Arc, clock: Clock) -> PluginResult<()> { - self.unix_timestamps - .insert(clock.slot, clock.unix_timestamp); + self.timestamps.insert(clock.slot, clock.unix_timestamp); Ok(()) } @@ -203,7 +203,7 @@ impl Inner { ) -> PluginResult<()> { match queue.exec_at { Some(exec_at) => { - self.upcoming_queues + self.pending_queues .entry(exec_at) .and_modify(|v| { v.insert(queue_pubkey); @@ -219,9 +219,9 @@ impl Inner { Ok(()) } - fn process_due_queues(self: Arc) -> PluginResult<()> { - let due_queues = self.due_queues.iter(); - for queue_pubkey_ref in due_queues { + fn process_actionable_queues(self: Arc) -> PluginResult<()> { + let actionable_queues = self.actionable_queues.iter(); + for queue_pubkey_ref in actionable_queues { let queue_pubkey = queue_pubkey_ref.clone(); self.clone() .spawn(|this| async move { this.process_queue(queue_pubkey) })?; @@ -300,7 +300,7 @@ impl Inner { { Ok(sig) => { info!("✅ {}", sig); - self.due_queues.remove(&queue_pubkey); + self.actionable_queues.remove(&queue_pubkey); } Err(err) => info!("❌ {:#?}", err), } From 028d57fdfd682f22189888275e03a741c611b632 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 09:17:03 -0500 Subject: [PATCH 02/13] Error out early if the node is not healthy --- plugin/src/plugin.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 936b529a5..5d5aa3926 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -220,6 +220,13 @@ impl Inner { } fn process_actionable_queues(self: Arc) -> PluginResult<()> { + // Error early if the node is not healthy + self.client.get_health().map_err(|_| { + info!("Node is not healthy"); + return GeyserPluginError::Custom("Node is not healthy".into()); + })?; + + // Async process each queue let actionable_queues = self.actionable_queues.iter(); for queue_pubkey_ref in actionable_queues { let queue_pubkey = queue_pubkey_ref.clone(); From 5222153c05f56e79d0522a027d532a1388a79c25 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 09:44:34 -0500 Subject: [PATCH 03/13] Implement rudimentary locking mechanism to prevent spamming identical tx requests per queue --- plugin/src/plugin.rs | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 5d5aa3926..6e0951e1c 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -51,7 +51,7 @@ impl GeyserPlugin for CronosPlugin { .max_blocking_threads(10) // TODO add to config .build() .unwrap(), - actionable_queues: DashSet::new(), + actionable_queues: DashMap::new(), pending_queues: DashMap::new(), timestamps: DashMap::new(), })); @@ -142,7 +142,7 @@ impl GeyserPlugin for CronosPlugin { pub struct Inner { pub client: Client, pub runtime: Runtime, - pub actionable_queues: DashSet, // The set of queues that can be processed + pub actionable_queues: DashMap, // The set of queues that can be processed pub pending_queues: DashMap>, // Map of exec_at timestamps to the list of queues actionable at that moment pub timestamps: DashMap, // Map of slot numbers to sysvar clock unix_timestamps } @@ -174,7 +174,7 @@ impl Inner { self.pending_queues.retain(|exec_at_i, queue_pubkeys_i| { if *exec_at_i <= confirmed_unix_timestamp { for queue_pubkey in queue_pubkeys_i.iter() { - self.actionable_queues.insert(queue_pubkey.clone()); + self.actionable_queues.insert(queue_pubkey.clone(), false); } return false; } @@ -229,7 +229,7 @@ impl Inner { // Async process each queue let actionable_queues = self.actionable_queues.iter(); for queue_pubkey_ref in actionable_queues { - let queue_pubkey = queue_pubkey_ref.clone(); + let queue_pubkey = *queue_pubkey_ref.key(); self.clone() .spawn(|this| async move { this.process_queue(queue_pubkey) })?; } @@ -239,6 +239,18 @@ impl Inner { fn process_queue(self: Arc, queue_pubkey: Pubkey) -> PluginResult<()> { info!("Processing queue {}", queue_pubkey); + // Check basic locking mechanism to prevent + match self.actionable_queues.get(&queue_pubkey) { + Some(lock) => { + if *lock.value() { + return Ok(()); + } else { + self.actionable_queues.insert(queue_pubkey, true); + } + } + None => return Ok(()), + } + // Get the queue let queue = self.client.get::(&queue_pubkey).unwrap(); @@ -309,7 +321,12 @@ impl Inner { info!("✅ {}", sig); self.actionable_queues.remove(&queue_pubkey); } - Err(err) => info!("❌ {:#?}", err), + Err(err) => { + info!("❌ {:#?}", err); + self.actionable_queues.insert(queue_pubkey, false); + + // TODO Track failed attempts and purge queue if it fails too many times + } } Ok(()) From fc9b0810220c4e227892f4ed6856483479ea10a7 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 11:04:00 -0500 Subject: [PATCH 04/13] Add debug log statements --- plugin/src/plugin.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 6e0951e1c..8b36f49ba 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -242,6 +242,7 @@ impl Inner { // Check basic locking mechanism to prevent match self.actionable_queues.get(&queue_pubkey) { Some(lock) => { + info!("Lock value {} {}", *lock.value(), queue_pubkey); if *lock.value() { return Ok(()); } else { @@ -267,11 +268,11 @@ impl Inner { // Build an ix for each task for i in 0..queue.task_count { - // Get the action account + // Get the task account let task_pubkey = Task::pda(queue_pubkey, i).0; let task = self.client.get::(&task_pubkey).unwrap(); - // Build ix + // Build task_exec ix let mut task_exec_ix = cronos_sdk::scheduler::instruction::task_exec( delegate_pubkey, queue.manager, @@ -308,7 +309,7 @@ impl Inner { } } - // Add to the list + // Add ix to the list ixs.push(task_exec_ix) } @@ -325,7 +326,8 @@ impl Inner { info!("❌ {:#?}", err); self.actionable_queues.insert(queue_pubkey, false); - // TODO Track failed attempts and purge queue if it fails too many times + // TODO Track failed attempts and purge the queue if + // it fails too many times. } } From 22ae801f16bc51da08d6c21b36d64c53d42a974b Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 11:10:25 -0500 Subject: [PATCH 05/13] Switch back to Dashset --- plugin/src/plugin.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 8b36f49ba..d15dec447 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -51,7 +51,7 @@ impl GeyserPlugin for CronosPlugin { .max_blocking_threads(10) // TODO add to config .build() .unwrap(), - actionable_queues: DashMap::new(), + actionable_queues: DashSet::new(), pending_queues: DashMap::new(), timestamps: DashMap::new(), })); @@ -142,7 +142,7 @@ impl GeyserPlugin for CronosPlugin { pub struct Inner { pub client: Client, pub runtime: Runtime, - pub actionable_queues: DashMap, // The set of queues that can be processed + pub actionable_queues: DashSet, // The set of queues that can be processed pub pending_queues: DashMap>, // Map of exec_at timestamps to the list of queues actionable at that moment pub timestamps: DashMap, // Map of slot numbers to sysvar clock unix_timestamps } @@ -174,7 +174,7 @@ impl Inner { self.pending_queues.retain(|exec_at_i, queue_pubkeys_i| { if *exec_at_i <= confirmed_unix_timestamp { for queue_pubkey in queue_pubkeys_i.iter() { - self.actionable_queues.insert(queue_pubkey.clone(), false); + self.actionable_queues.insert(queue_pubkey.clone()); } return false; } @@ -240,17 +240,17 @@ impl Inner { info!("Processing queue {}", queue_pubkey); // Check basic locking mechanism to prevent - match self.actionable_queues.get(&queue_pubkey) { - Some(lock) => { - info!("Lock value {} {}", *lock.value(), queue_pubkey); - if *lock.value() { - return Ok(()); - } else { - self.actionable_queues.insert(queue_pubkey, true); - } - } - None => return Ok(()), - } + // match self.actionable_queues.get(&queue_pubkey) { + // Some(lock) => { + // info!("Lock value {} {}", *lock.value(), queue_pubkey); + // if *lock.value() { + // return Ok(()); + // } else { + // self.actionable_queues.insert(queue_pubkey, true); + // } + // } + // None => return Ok(()), + // } // Get the queue let queue = self.client.get::(&queue_pubkey).unwrap(); @@ -324,8 +324,7 @@ impl Inner { } Err(err) => { info!("❌ {:#?}", err); - self.actionable_queues.insert(queue_pubkey, false); - + // self.actionable_queues.insert(queue_pubkey, false); // TODO Track failed attempts and purge the queue if // it fails too many times. } From b86ac87306b04457da9d1790386881e547d58f14 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 11:20:39 -0500 Subject: [PATCH 06/13] Send txs without confirming to unblock queue processing --- cli/src/processor/config.rs | 2 +- cli/src/processor/initialize.rs | 2 +- cli/src/processor/manager.rs | 2 +- cli/src/processor/node.rs | 4 ++-- cli/src/processor/queue.rs | 2 +- plugin/src/plugin.rs | 18 +++++++++++------- sdk/src/client.rs | 8 +++++++- stress/src/processor/benchmark.rs | 8 +++++--- 8 files changed, 29 insertions(+), 17 deletions(-) diff --git a/cli/src/processor/config.rs b/cli/src/processor/config.rs index 5c404b319..1017c6977 100644 --- a/cli/src/processor/config.rs +++ b/cli/src/processor/config.rs @@ -76,7 +76,7 @@ pub fn set( settings, ); - client.sign_and_submit(&[ix], &[client.payer()]).unwrap(); + client.send_and_confirm(&[ix], &[client.payer()]).unwrap(); Ok(()) } diff --git a/cli/src/processor/initialize.rs b/cli/src/processor/initialize.rs index 9af9fbbbf..718ce200f 100644 --- a/cli/src/processor/initialize.rs +++ b/cli/src/processor/initialize.rs @@ -22,7 +22,7 @@ pub fn initialize(client: &Client, mint: Pubkey) -> Result<(), CliError> { // Submit tx client - .sign_and_submit(&[ix_a, ix_b, ix_c, ix_d, ix_e, ix_f], &[client.payer()]) + .send_and_confirm(&[ix_a, ix_b, ix_c, ix_d, ix_e, ix_f], &[client.payer()]) .unwrap(); Ok(()) } diff --git a/cli/src/processor/manager.rs b/cli/src/processor/manager.rs index f3b4a33ae..b034c77ec 100644 --- a/cli/src/processor/manager.rs +++ b/cli/src/processor/manager.rs @@ -7,7 +7,7 @@ pub fn create(client: &Client) -> Result<(), CliError> { let authority = client.payer_pubkey(); let manager_pubkey = cronos_sdk::scheduler::state::Manager::pda(authority).0; let ix = cronos_sdk::scheduler::instruction::manager_new(authority, authority, manager_pubkey); - client.sign_and_submit(&[ix], &[client.payer()]).unwrap(); + client.send_and_confirm(&[ix], &[client.payer()]).unwrap(); get(client, &manager_pubkey) } diff --git a/cli/src/processor/node.rs b/cli/src/processor/node.rs index 159f9aa69..410a44fe3 100644 --- a/cli/src/processor/node.rs +++ b/cli/src/processor/node.rs @@ -70,7 +70,7 @@ pub fn register(client: &Client, delegate: Keypair) -> Result<(), CliError> { snapshot_queue_pubkey, snapshot_task_pubkey, ); - client.sign_and_submit(&[ix], &[owner, &delegate]).unwrap(); + client.send_and_confirm(&[ix], &[owner, &delegate]).unwrap(); get(client, &node_pubkey) } @@ -95,6 +95,6 @@ pub fn stake(client: &Client, amount: u64, delegate: Pubkey) -> Result<(), CliEr signer.pubkey(), ); - client.sign_and_submit(&[ix], &[client.payer()]).unwrap(); + client.send_and_confirm(&[ix], &[client.payer()]).unwrap(); get(client, &node_pubkey) } diff --git a/cli/src/processor/queue.rs b/cli/src/processor/queue.rs index c6d500f33..dee30bea8 100644 --- a/cli/src/processor/queue.rs +++ b/cli/src/processor/queue.rs @@ -29,7 +29,7 @@ pub fn create(client: &Client, schedule: String) -> Result<(), CliError> { // Sign and submit client - .sign_and_submit(&[queue_ix], &[client.payer()]) + .send_and_confirm(&[queue_ix], &[client.payer()]) .unwrap(); get(client, &queue_pubkey) } diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index d15dec447..f89f2e0c6 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -1,3 +1,5 @@ +use solana_sdk::signature::Signature; + use { crate::{config::Config as PluginConfig, filter::CronosAccountUpdate}, cronos_sdk::{ @@ -53,6 +55,7 @@ impl GeyserPlugin for CronosPlugin { .unwrap(), actionable_queues: DashSet::new(), pending_queues: DashMap::new(), + signatures: DashSet::new(), timestamps: DashMap::new(), })); Ok(()) @@ -144,7 +147,8 @@ pub struct Inner { pub runtime: Runtime, pub actionable_queues: DashSet, // The set of queues that can be processed pub pending_queues: DashMap>, // Map of exec_at timestamps to the list of queues actionable at that moment - pub timestamps: DashMap, // Map of slot numbers to sysvar clock unix_timestamps + pub signatures: DashSet, // The set of unconfirmed transaction signatures + pub timestamps: DashMap, // Map of slot numbers to sysvar clock unix_timestamps } impl Inner { @@ -188,6 +192,8 @@ impl Inner { self.clone() .spawn(|this| async move { this.process_actionable_queues() })?; + // TODO confirm signatures + Ok(()) } @@ -314,13 +320,11 @@ impl Inner { } // Pack all ixs into a single tx - match self - .client - .sign_and_submit(ixs.as_slice(), &[self.client.payer()]) - { - Ok(sig) => { - info!("✅ {}", sig); + match self.client.send(ixs.as_slice(), &[self.client.payer()]) { + Ok(signature) => { + info!("✅ {}", signature); self.actionable_queues.remove(&queue_pubkey); + self.signatures.insert(signature); } Err(err) => { info!("❌ {:#?}", err); diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 1c1463882..74395b606 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -77,7 +77,7 @@ impl Client { Ok(signature) } - pub fn sign_and_submit( + pub fn send_and_confirm( &self, ixs: &[Instruction], signers: &T, @@ -86,6 +86,12 @@ impl Client { tx.sign(signers, self.latest_blockhash()?); Ok(self.send_and_confirm_transaction(&tx)?) } + + pub fn send(&self, ixs: &[Instruction], signers: &T) -> ClientResult { + let mut tx = Transaction::new_with_payer(ixs, Some(&self.payer_pubkey())); + tx.sign(signers, self.latest_blockhash()?); + Ok(self.send_transaction(&tx)?) + } } impl Debug for Client { diff --git a/stress/src/processor/benchmark.rs b/stress/src/processor/benchmark.rs index 88d420f9b..ef7a5f0b6 100644 --- a/stress/src/processor/benchmark.rs +++ b/stress/src/processor/benchmark.rs @@ -39,7 +39,7 @@ pub fn run(count: u32, parallelism: f32, recurrence: u32) -> Result<(), CliError client .airdrop(&authority.pubkey(), LAMPORTS_PER_SOL) .unwrap(); - client.sign_and_submit(&[ix], &[authority]).unwrap(); + client.send_and_confirm(&[ix], &[authority]).unwrap(); // TODO Schedule tasks asynchronously @@ -47,7 +47,9 @@ pub fn run(count: u32, parallelism: f32, recurrence: u32) -> Result<(), CliError for i in 0..num_tasks_parallel { let ix_a = create_queue_ix(&authority, recurrence, &mut expected_exec_ats, i.into()); let ix_b = create_task_ix(&authority, i.into(), 0); - client.sign_and_submit(&[ix_a, ix_b], &[authority]).unwrap(); + client + .send_and_confirm(&[ix_a, ix_b], &[authority]) + .unwrap(); } // Create a queue for the serial tasks @@ -68,7 +70,7 @@ pub fn run(count: u32, parallelism: f32, recurrence: u32) -> Result<(), CliError i.into(), )); } - client.sign_and_submit(ixs, &[authority]).unwrap(); + client.send_and_confirm(ixs, &[authority]).unwrap(); } // Collect and report test results From cff652ce1e27bb080d70f5bc4721b41b67085dc2 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 11:36:57 -0500 Subject: [PATCH 07/13] Add flow to confirm tx signatures --- plugin/src/plugin.rs | 47 ++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index f89f2e0c6..bb3bba893 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -192,7 +192,9 @@ impl Inner { self.clone() .spawn(|this| async move { this.process_actionable_queues() })?; - // TODO confirm signatures + // Confirm signatures + self.clone() + .spawn(|this| async move { this.confirm_signatures() })?; Ok(()) } @@ -245,19 +247,6 @@ impl Inner { fn process_queue(self: Arc, queue_pubkey: Pubkey) -> PluginResult<()> { info!("Processing queue {}", queue_pubkey); - // Check basic locking mechanism to prevent - // match self.actionable_queues.get(&queue_pubkey) { - // Some(lock) => { - // info!("Lock value {} {}", *lock.value(), queue_pubkey); - // if *lock.value() { - // return Ok(()); - // } else { - // self.actionable_queues.insert(queue_pubkey, true); - // } - // } - // None => return Ok(()), - // } - // Get the queue let queue = self.client.get::(&queue_pubkey).unwrap(); @@ -328,7 +317,7 @@ impl Inner { } Err(err) => { info!("❌ {:#?}", err); - // self.actionable_queues.insert(queue_pubkey, false); + // TODO Track failed attempts and purge the queue if // it fails too many times. } @@ -337,6 +326,34 @@ impl Inner { Ok(()) } + fn confirm_signatures(self: Arc) -> PluginResult<()> { + for signature_ref in self.signatures.iter() { + let signature = signature_ref.clone(); + self.clone() + .spawn(|this| async move { this.confirm_signature(signature) })? + } + Ok(()) + } + + fn confirm_signature(self: Arc, signature: Signature) -> PluginResult<()> { + info!("Confirming signature {}", signature); + match self + .client + .confirm_transaction(&signature) + .map_err(|_| GeyserPluginError::Custom("Failed to get confirmation status".into()))? + { + true => { + // This signature doesn't need to be checked again + self.signatures.remove(&signature); + } + false => { + // Noop for now + } + } + + Ok(()) + } + fn spawn> + Send + 'static>( self: &Arc, f: impl FnOnce(Arc) -> F, From 2a8707a324abd06b3cb69d2c6d05879236d1192c Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 12:51:17 -0500 Subject: [PATCH 08/13] Adjust how transaction signatures are confirmed --- plugin/src/plugin.rs | 103 ++++++++++++++++++++++++++++++------------- 1 file changed, 73 insertions(+), 30 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index bb3bba893..6108fc688 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -55,7 +55,7 @@ impl GeyserPlugin for CronosPlugin { .unwrap(), actionable_queues: DashSet::new(), pending_queues: DashMap::new(), - signatures: DashSet::new(), + signatures: DashMap::new(), timestamps: DashMap::new(), })); Ok(()) @@ -146,25 +146,25 @@ pub struct Inner { pub client: Client, pub runtime: Runtime, pub actionable_queues: DashSet, // The set of queues that can be processed - pub pending_queues: DashMap>, // Map of exec_at timestamps to the list of queues actionable at that moment - pub signatures: DashSet, // The set of unconfirmed transaction signatures - pub timestamps: DashMap, // Map of slot numbers to sysvar clock unix_timestamps + pub pending_queues: DashMap>, // Map from exec_at timestamps to the list of queues actionable at that moment + pub signatures: DashMap, // Map from tx signatures to a (queue pubkey, slot) tuple. The slot represents when the tx was sent. + pub timestamps: DashMap, // Map of slot numbers to sysvar clock unix_timestamps } impl Inner { - fn handle_confirmed_slot(self: Arc, slot: u64) -> PluginResult<()> { - info!("Confirmed slot: {}", slot); + fn handle_confirmed_slot(self: Arc, confirmed_slot: u64) -> PluginResult<()> { + info!("Confirmed slot: {}", confirmed_slot); info!("Upcoming queues: {:#?}", self.pending_queues); info!("Due queues: {:#?}", self.actionable_queues); // Look for the latest confirmed sysvar unix timestamp let mut confirmed_unix_timestamp = None; - self.timestamps.retain(|slot_i, unix_timestamp_i| { - if *slot_i == slot { - confirmed_unix_timestamp = Some(unix_timestamp_i.clone()); + self.timestamps.retain(|slot, unix_timestamp| { + if *slot == confirmed_slot { + confirmed_unix_timestamp = Some(unix_timestamp.clone()); return true; } - *slot_i > slot + *slot > confirmed_slot }); // Move all pending queues that are due to the set of actionable queues. @@ -175,9 +175,9 @@ impl Inner { // However before doing this, consider if retain() can be processed in parallel... match confirmed_unix_timestamp { Some(confirmed_unix_timestamp) => { - self.pending_queues.retain(|exec_at_i, queue_pubkeys_i| { - if *exec_at_i <= confirmed_unix_timestamp { - for queue_pubkey in queue_pubkeys_i.iter() { + self.pending_queues.retain(|exec_at, queue_pubkeys| { + if *exec_at <= confirmed_unix_timestamp { + for queue_pubkey in queue_pubkeys.iter() { self.actionable_queues.insert(queue_pubkey.clone()); } return false; @@ -190,11 +190,11 @@ impl Inner { // Process queues self.clone() - .spawn(|this| async move { this.process_actionable_queues() })?; + .spawn(|this| async move { this.process_actionable_queues(confirmed_slot) })?; // Confirm signatures self.clone() - .spawn(|this| async move { this.confirm_signatures() })?; + .spawn(|this| async move { this.confirm_signatures(confirmed_slot) })?; Ok(()) } @@ -227,7 +227,7 @@ impl Inner { Ok(()) } - fn process_actionable_queues(self: Arc) -> PluginResult<()> { + fn process_actionable_queues(self: Arc, confirmed_slot: u64) -> PluginResult<()> { // Error early if the node is not healthy self.client.get_health().map_err(|_| { info!("Node is not healthy"); @@ -239,12 +239,16 @@ impl Inner { for queue_pubkey_ref in actionable_queues { let queue_pubkey = *queue_pubkey_ref.key(); self.clone() - .spawn(|this| async move { this.process_queue(queue_pubkey) })?; + .spawn(|this| async move { this.process_queue(queue_pubkey, confirmed_slot) })?; } Ok(()) } - fn process_queue(self: Arc, queue_pubkey: Pubkey) -> PluginResult<()> { + fn process_queue( + self: Arc, + queue_pubkey: Pubkey, + confirmed_slot: u64, + ) -> PluginResult<()> { info!("Processing queue {}", queue_pubkey); // Get the queue @@ -313,7 +317,8 @@ impl Inner { Ok(signature) => { info!("✅ {}", signature); self.actionable_queues.remove(&queue_pubkey); - self.signatures.insert(signature); + self.signatures + .insert(signature, (queue_pubkey, confirmed_slot)); } Err(err) => { info!("❌ {:#?}", err); @@ -326,31 +331,69 @@ impl Inner { Ok(()) } - fn confirm_signatures(self: Arc) -> PluginResult<()> { + fn confirm_signatures(self: Arc, confirmed_slot: u64) -> PluginResult<()> { for signature_ref in self.signatures.iter() { - let signature = signature_ref.clone(); - self.clone() - .spawn(|this| async move { this.confirm_signature(signature) })? + let signature = *signature_ref.key(); + let tuple = signature_ref.value(); + let queue_pubkey = tuple.0; + let attempted_slot = tuple.1; + self.clone().spawn(|this| async move { + this.confirm_signature(signature, queue_pubkey, confirmed_slot, attempted_slot) + })? } Ok(()) } - fn confirm_signature(self: Arc, signature: Signature) -> PluginResult<()> { + fn confirm_signature( + self: Arc, + signature: Signature, + _queue_pubkey: Pubkey, + _confirmed_slot: u64, + _attempted_slot: u64, + ) -> PluginResult<()> { info!("Confirming signature {}", signature); + match self .client - .confirm_transaction(&signature) + .get_signature_status(&signature) .map_err(|_| GeyserPluginError::Custom("Failed to get confirmation status".into()))? { - true => { - // This signature doesn't need to be checked again - self.signatures.remove(&signature); + Some(res) => { + match res { + Ok(()) => { + // This signature doesn't need to be checked again + self.signatures.remove(&signature); + } + Err(err) => { + // TODO + info!("Transaction {} failed with error {}", signature, err); + } + } } - false => { - // Noop for now + None => { + info!("Transaction {} has not been confirmed", signature); } } + // match self + // .client + // .confirm_transaction(&signature) + // .map_err(|_| GeyserPluginError::Custom("Failed to get confirmation status".into()))? + // { + // true => { + // // This signature doesn't need to be checked again + // self.signatures.remove(&signature); + // } + // false => { + // // Noop for now + // // TODO If this signature hasn't been confirmed for x slots, then it should be retried. + // // let SLOT_TIMEOUT_THRESHOLD = 50; + // // if confirmed_slot > attempted_slot + SLOT_TIMEOUT_THRESHOLD { + // // self.signatures.remove(signa) + // // } + // } + // } + Ok(()) } From 9e661a3f731b6425d53a66958ce8a9a6becf76af Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 13:11:18 -0500 Subject: [PATCH 09/13] Pay fees from the manager account rather than the queue --- programs/scheduler/src/state/task.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/programs/scheduler/src/state/task.rs b/programs/scheduler/src/state/task.rs index 162c7fd05..c9bafd19d 100644 --- a/programs/scheduler/src/state/task.rs +++ b/programs/scheduler/src/state/task.rs @@ -205,7 +205,7 @@ impl TaskAccount for Account<'_, Task> { .delegate_fee .checked_add(delegate_reimbursement) .unwrap(); - **queue.to_account_info().try_borrow_mut_lamports()? = queue + **manager.to_account_info().try_borrow_mut_lamports()? = manager .to_account_info() .lamports() .checked_sub(total_delegate_fee) @@ -217,7 +217,7 @@ impl TaskAccount for Account<'_, Task> { .unwrap(); // Pay program fees - **queue.to_account_info().try_borrow_mut_lamports()? = queue + **manager.to_account_info().try_borrow_mut_lamports()? = manager .to_account_info() .lamports() .checked_sub(config.program_fee) From 8173e1f0b190af65d241263bef5c052b09a61b04 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 13:50:37 -0500 Subject: [PATCH 10/13] Move queue back into actionable set if not confirmed within 150 slots --- plugin/src/plugin.rs | 42 +++++++++++++----------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 6108fc688..6dd48fc79 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -1,3 +1,4 @@ +use cronos_sdk::ClientError; use solana_sdk::signature::Signature; use { @@ -154,8 +155,6 @@ pub struct Inner { impl Inner { fn handle_confirmed_slot(self: Arc, confirmed_slot: u64) -> PluginResult<()> { info!("Confirmed slot: {}", confirmed_slot); - info!("Upcoming queues: {:#?}", self.pending_queues); - info!("Due queues: {:#?}", self.actionable_queues); // Look for the latest confirmed sysvar unix timestamp let mut confirmed_unix_timestamp = None; @@ -322,6 +321,7 @@ impl Inner { } Err(err) => { info!("❌ {:#?}", err); + self.actionable_queues.remove(&queue_pubkey); // TODO Track failed attempts and purge the queue if // it fails too many times. @@ -347,12 +347,10 @@ impl Inner { fn confirm_signature( self: Arc, signature: Signature, - _queue_pubkey: Pubkey, - _confirmed_slot: u64, - _attempted_slot: u64, + queue_pubkey: Pubkey, + confirmed_slot: u64, + attempted_slot: u64, ) -> PluginResult<()> { - info!("Confirming signature {}", signature); - match self .client .get_signature_status(&signature) @@ -365,35 +363,21 @@ impl Inner { self.signatures.remove(&signature); } Err(err) => { - // TODO + // TODO Check the error. Should this be retried? info!("Transaction {} failed with error {}", signature, err); } } } None => { - info!("Transaction {} has not been confirmed", signature); + // If many slots have passed since the tx was sent, then assume failure + // and move the pubkey back into the set of actionable queues. + let timeout_threshold = 150; + if confirmed_slot > attempted_slot + timeout_threshold { + self.signatures.remove(&signature); + self.actionable_queues.insert(queue_pubkey); + } } } - - // match self - // .client - // .confirm_transaction(&signature) - // .map_err(|_| GeyserPluginError::Custom("Failed to get confirmation status".into()))? - // { - // true => { - // // This signature doesn't need to be checked again - // self.signatures.remove(&signature); - // } - // false => { - // // Noop for now - // // TODO If this signature hasn't been confirmed for x slots, then it should be retried. - // // let SLOT_TIMEOUT_THRESHOLD = 50; - // // if confirmed_slot > attempted_slot + SLOT_TIMEOUT_THRESHOLD { - // // self.signatures.remove(signa) - // // } - // } - // } - Ok(()) } From 7241bd79670901403f9707080fd0c909bab1dfbd Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 13:51:31 -0500 Subject: [PATCH 11/13] Cleanup imports --- plugin/src/plugin.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 6dd48fc79..2f36338dd 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -1,6 +1,3 @@ -use cronos_sdk::ClientError; -use solana_sdk::signature::Signature; - use { crate::{config::Config as PluginConfig, filter::CronosAccountUpdate}, cronos_sdk::{ @@ -18,6 +15,7 @@ use { instruction::{AccountMeta, Instruction}, pubkey::Pubkey, }, + solana_sdk::signature::Signature, std::{collections::HashSet, fmt::Debug, sync::Arc}, tokio::runtime::{Builder, Runtime}, }; From 5651c7a676736b4c578d17c011f4daeff23f0584 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 14:15:27 -0500 Subject: [PATCH 12/13] Update comments and function signatures --- plugin/src/plugin.rs | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 2f36338dd..725a3c47a 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -68,7 +68,7 @@ impl GeyserPlugin for CronosPlugin { &mut self, account: ReplicaAccountInfoVersions, _slot: u64, - is_startup: bool, + _is_startup: bool, ) -> PluginResult<()> { let account_info = match account { ReplicaAccountInfoVersions::V0_0_1(account_info) => account_info.clone(), @@ -82,10 +82,7 @@ impl GeyserPlugin for CronosPlugin { match account_update { CronosAccountUpdate::Clock { clock } => this.handle_clock_update(clock), CronosAccountUpdate::Queue { queue } => { - info!( - "Caching queue {:#?}. Is startup: {}", - account_pubkey, is_startup - ); + info!("Caching queue {:#?}", account_pubkey); this.handle_queue_update(queue, account_pubkey) } } @@ -144,10 +141,20 @@ impl GeyserPlugin for CronosPlugin { pub struct Inner { pub client: Client, pub runtime: Runtime, - pub actionable_queues: DashSet, // The set of queues that can be processed - pub pending_queues: DashMap>, // Map from exec_at timestamps to the list of queues actionable at that moment - pub signatures: DashMap, // Map from tx signatures to a (queue pubkey, slot) tuple. The slot represents when the tx was sent. - pub timestamps: DashMap, // Map of slot numbers to sysvar clock unix_timestamps + + // The set of queues that can be processed + pub actionable_queues: DashSet, + + // Map from exec_at timestamps to the list of queues scheduled + // for that moment. + pub pending_queues: DashMap>, + + // Map from tx signatures to a (queue pubkey, slot) tuple. The slot + // represents the latest confirmed slot at the time the tx was sent. + pub signatures: DashMap, + + // Map from slot numbers to sysvar clock unix_timestamps + pub timestamps: DashMap, } impl Inner { @@ -321,8 +328,7 @@ impl Inner { info!("❌ {:#?}", err); self.actionable_queues.remove(&queue_pubkey); - // TODO Track failed attempts and purge the queue if - // it fails too many times. + // TODO Track failed attempts and purge the queue if it fails too many times. } } @@ -336,7 +342,7 @@ impl Inner { let queue_pubkey = tuple.0; let attempted_slot = tuple.1; self.clone().spawn(|this| async move { - this.confirm_signature(signature, queue_pubkey, confirmed_slot, attempted_slot) + this.confirm_signature(attempted_slot, confirmed_slot, queue_pubkey, signature) })? } Ok(()) @@ -344,10 +350,10 @@ impl Inner { fn confirm_signature( self: Arc, - signature: Signature, - queue_pubkey: Pubkey, - confirmed_slot: u64, attempted_slot: u64, + confirmed_slot: u64, + queue_pubkey: Pubkey, + signature: Signature, ) -> PluginResult<()> { match self .client @@ -361,7 +367,7 @@ impl Inner { self.signatures.remove(&signature); } Err(err) => { - // TODO Check the error. Should this be retried? + // TODO Check the error. Should this request be retried? info!("Transaction {} failed with error {}", signature, err); } } From cf579dadef08b8a320d31b587d8e72d7420c9c90 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Wed, 1 Jun 2022 14:21:20 -0500 Subject: [PATCH 13/13] Remove comment --- plugin/src/plugin.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 725a3c47a..b5ab35522 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -327,8 +327,6 @@ impl Inner { Err(err) => { info!("❌ {:#?}", err); self.actionable_queues.remove(&queue_pubkey); - - // TODO Track failed attempts and purge the queue if it fails too many times. } }