diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index ba24fc4bd3..f97905b6a4 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -12,10 +12,17 @@ use std::{ use async_trait::async_trait; use chrono::Utc; use hotshot_task_impls::{ - builder::BuilderClient, consensus::ConsensusTaskState, da::DaTaskState, - quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState, - quorum_vote::QuorumVoteTaskState, request::NetworkRequestState, rewind::RewindTaskState, - transactions::TransactionTaskState, upgrade::UpgradeTaskState, vid::VidTaskState, + builder::BuilderClient, + consensus::ConsensusTaskState, + da::DaTaskState, + quorum_proposal::QuorumProposalTaskState, + quorum_proposal_recv::QuorumProposalRecvTaskState, + quorum_vote::{drb_computations::DrbComputations, QuorumVoteTaskState}, + request::NetworkRequestState, + rewind::RewindTaskState, + transactions::TransactionTaskState, + upgrade::UpgradeTaskState, + vid::VidTaskState, view_sync::ViewSyncTaskState, }; use hotshot_types::{ @@ -234,7 +241,7 @@ impl, V: Versions> CreateTaskState vote_dependencies: BTreeMap::new(), network: Arc::clone(&handle.hotshot.network), membership: (*handle.hotshot.memberships).clone().into(), - drb_computations: BTreeMap::new(), + drb_computations: DrbComputations::new(), output_event_stream: handle.hotshot.external_event_stream.0.clone(), id: handle.hotshot.id, storage: Arc::clone(&handle.storage), diff --git a/crates/task-impls/src/quorum_vote/drb_computations.rs b/crates/task-impls/src/quorum_vote/drb_computations.rs new file mode 100644 index 0000000000..fc5483f8dd --- /dev/null +++ b/crates/task-impls/src/quorum_vote/drb_computations.rs @@ -0,0 +1,126 @@ +use std::collections::{btree_map, BTreeMap}; + +use hotshot_types::{ + drb::{compute_drb_result, DrbResult, DrbSeedInput}, + traits::node_implementation::{ConsensusTime, NodeType}, +}; +use tokio::{spawn, task::JoinHandle}; + +/// Number of previous results and seeds to keep +pub const KEEP_PREVIOUS_RESULT_COUNT: u64 = 8; + +/// Helper struct to track state of DRB computations +pub struct DrbComputations { + /// Stored results from computations + results: BTreeMap, + + /// Currently live computation + task: Option<(TYPES::Epoch, JoinHandle)>, + + /// Stored inputs to computations + seeds: BTreeMap, +} + +impl DrbComputations { + #[must_use] + /// Create a new DrbComputations + pub fn new() -> Self { + Self { + results: BTreeMap::new(), + task: None, + seeds: BTreeMap::new(), + } + } + + /// If a task is currently live AND has finished, join it and save the result. + /// If the epoch for the calculation was the same as the provided epoch, return true + /// If a task is currently live and NOT finished, abort it UNLESS the task epoch is the same as + /// cur_epoch, in which case keep letting it run and return true. + /// Return false if a task should be spawned for the given epoch. + async fn join_or_abort_old_task(&mut self, epoch: TYPES::Epoch) -> bool { + if let Some((task_epoch, join_handle)) = &mut self.task { + if join_handle.is_finished() { + match join_handle.await { + Ok(result) => { + self.results.insert(*task_epoch, result); + let result = *task_epoch == epoch; + self.task = None; + result + } + Err(e) => { + tracing::error!("error joining DRB computation task: {e:?}"); + false + } + } + } else if *task_epoch == epoch { + true + } else { + join_handle.abort(); + self.task = None; + false + } + } else { + false + } + } + + /// Stores a seed for a particular epoch for later use by start_task_if_not_running, called from handle_quorum_proposal_validated_drb_calculation_start + pub fn store_seed(&mut self, epoch: TYPES::Epoch, drb_seed_input: DrbSeedInput) { + self.seeds.insert(epoch, drb_seed_input); + } + + /// Starts a new task. Cancels a current task if that task is not for the provided epoch. Allows a task to continue + /// running if it was already started for the given epoch. Avoids running the task if we already have a result for + /// the epoch. + pub async fn start_task_if_not_running(&mut self, epoch: TYPES::Epoch) { + // If join_or_abort_task returns true, then we either just completed a task for this epoch, or we currently + // have a running task for the epoch. + if self.join_or_abort_old_task(epoch).await { + return; + } + + // In case we somehow ended up processing this epoch already, don't start it again + if self.results.contains_key(&epoch) { + return; + } + + if let btree_map::Entry::Occupied(entry) = self.seeds.entry(epoch) { + let drb_seed_input = *entry.get(); + let new_drb_task = spawn(async move { compute_drb_result::(drb_seed_input) }); + self.task = Some((epoch, new_drb_task)); + entry.remove(); + } + } + + /// Retrieves the result for a given epoch + pub fn get_result(&self, epoch: TYPES::Epoch) -> Option { + self.results.get(&epoch).copied() + } + + /// Retrieves the seed for a given epoch + pub fn get_seed(&self, epoch: TYPES::Epoch) -> Option { + self.seeds.get(&epoch).copied() + } + + /// Garbage collects internal data structures + pub fn garbage_collect(&mut self, epoch: TYPES::Epoch) { + if epoch.u64() < KEEP_PREVIOUS_RESULT_COUNT { + return; + } + + let retain_epoch = epoch - KEEP_PREVIOUS_RESULT_COUNT; + // N.B. x.split_off(y) returns the part of the map where key >= y + + // Remove result entries older than EPOCH + self.results = self.results.split_off(&retain_epoch); + + // Remove result entries older than EPOCH+1 + self.seeds = self.seeds.split_off(&(retain_epoch + 1)); + } +} + +impl Default for DrbComputations { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/task-impls/src/quorum_vote/handlers.rs b/crates/task-impls/src/quorum_vote/handlers.rs index 56804f1d52..24629116ec 100644 --- a/crates/task-impls/src/quorum_vote/handlers.rs +++ b/crates/task-impls/src/quorum_vote/handlers.rs @@ -13,8 +13,7 @@ use committable::Committable; use hotshot_types::{ consensus::OuterConsensus, data::{Leaf2, QuorumProposal2, VidDisperseShare}, - drb::compute_drb_result, - event::{Event, EventType}, + event::{Event, EventType, LeafInfo}, message::{Proposal, UpgradeLock}, simple_vote::{QuorumData2, QuorumVote2}, traits::{ @@ -28,7 +27,6 @@ use hotshot_types::{ utils::epoch_from_block_number, vote::HasViewNumber, }; -use tokio::spawn; use tracing::instrument; use utils::anytrace::*; use vbs::version::StaticVersionType; @@ -43,6 +41,96 @@ use crate::{ quorum_vote::Versions, }; +/// Handles starting the DRB calculation. Uses the seed previously stored in +/// handle_quorum_proposal_validated_drb_calculation_seed +async fn handle_quorum_proposal_validated_drb_calculation_start< + TYPES: NodeType, + I: NodeImplementation, + V: Versions, +>( + proposal: &QuorumProposal2, + task_state: &mut QuorumVoteTaskState, +) { + let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number( + proposal.block_header.block_number(), + task_state.epoch_height, + )); + + // Start the new task if we're in the committee for this epoch + if task_state + .membership + .has_stake(&task_state.public_key, current_epoch_number) + { + task_state + .drb_computations + .start_task_if_not_running(current_epoch_number + 1) + .await; + } +} + +/// Handles storing the seed for an upcoming DRB calculation. +/// +/// We store the DRB computation seed 2 epochs in advance, if the decided block is the last but +/// third block in the current epoch and we are in the quorum committee of the next epoch. +/// +/// Special cases: +/// * Epoch 0: No DRB computation since we'll transition to epoch 1 immediately. +/// * Epoch 1 and 2: Use `[0u8; 32]` as the DRB result since when we first start the +/// computation in epoch 1, the result is for epoch 3. +/// +/// We don't need to handle the special cases explicitly here, because the first proposal +/// with which we'll start the DRB computation is for epoch 3. +fn handle_quorum_proposal_validated_drb_calculation_seed< + TYPES: NodeType, + I: NodeImplementation, + V: Versions, +>( + proposal: &QuorumProposal2, + task_state: &mut QuorumVoteTaskState, + leaf_views: &[LeafInfo], +) -> Result<()> { + // This is never none if we've reached a new decide, so this is safe to unwrap. + let decided_block_number = leaf_views + .last() + .unwrap() + .leaf + .block_header() + .block_number(); + + // Skip if this is not the expected block. + if task_state.epoch_height != 0 && (decided_block_number + 3) % task_state.epoch_height == 0 { + // Cancel old DRB computation tasks. + let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number( + decided_block_number, + task_state.epoch_height, + )); + + task_state + .drb_computations + .garbage_collect(current_epoch_number); + + // Skip if we are not in the committee of the next epoch. + if task_state + .membership + .has_stake(&task_state.public_key, current_epoch_number + 1) + { + let new_epoch_number = current_epoch_number + 2; + let Ok(drb_seed_input_vec) = bincode::serialize(&proposal.justify_qc.signatures) else { + bail!("Failed to serialize the QC signature."); + }; + let Ok(drb_seed_input) = drb_seed_input_vec.try_into() else { + bail!("Failed to convert the serialized QC signature into a DRB seed input."); + }; + + // Store the drb seed input for the next calculation + task_state + .drb_computations + .store_seed(new_epoch_number, drb_seed_input); + } + } + Ok(()) +} + /// Handles the `QuorumProposalValidated` event. #[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number))] pub(crate) async fn handle_quorum_proposal_validated< @@ -58,6 +146,10 @@ pub(crate) async fn handle_quorum_proposal_validated< .version(proposal.view_number()) .await?; + if version >= V::Epochs::VERSION { + handle_quorum_proposal_validated_drb_calculation_start(proposal, task_state).await; + } + let LeafChainTraversalOutcome { new_locked_view_number, new_decided_view_number, @@ -155,63 +247,12 @@ pub(crate) async fn handle_quorum_proposal_validated< .await; tracing::debug!("Successfully sent decide event"); - // Start the DRB computation two epochs in advance, if the decided block is the last but - // third block in the current epoch and we are in the quorum committee of the next epoch. - // - // Special cases: - // * Epoch 0: No DRB computation since we'll transition to epoch 1 immediately. - // * Epoch 1 and 2: Use `[0u8; 32]` as the DRB result since when we first start the - // computation in epoch 1, the result is for epoch 3. - // - // We don't need to handle the special cases explicitly here, because the first proposal - // with which we'll start the DRB computation is for epoch 3. if version >= V::Epochs::VERSION { - // This is never none if we've reached a new decide, so this is safe to unwrap. - let decided_block_number = leaf_views - .last() - .unwrap() - .leaf - .block_header() - .block_number(); - - // Skip if this is not the expected block. - if task_state.epoch_height != 0 - && (decided_block_number + 3) % task_state.epoch_height == 0 - { - // Cancel old DRB computation tasks. - let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number( - decided_block_number, - task_state.epoch_height, - )); - let current_tasks = task_state.drb_computations.split_off(¤t_epoch_number); - while let Some((_, task)) = task_state.drb_computations.pop_last() { - task.abort(); - } - task_state.drb_computations = current_tasks; - - // Skip if we are not in the committee of the next epoch. - if task_state - .membership - .has_stake(&task_state.public_key, current_epoch_number + 1) - { - let new_epoch_number = current_epoch_number + 2; - let Ok(drb_seed_input_vec) = - bincode::serialize(&proposal.justify_qc.signatures) - else { - bail!("Failed to serialize the QC signature."); - }; - let Ok(drb_seed_input) = drb_seed_input_vec.try_into() else { - bail!( - "Failed to convert the serialized QC signature into a DRB seed input." - ); - }; - let new_drb_task = - spawn(async move { compute_drb_result::(drb_seed_input) }); - task_state - .drb_computations - .insert(new_epoch_number, new_drb_task); - } - } + handle_quorum_proposal_validated_drb_calculation_seed( + proposal, + task_state, + &leaf_views, + )?; } } diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs index 4162969397..cf8181d771 100644 --- a/crates/task-impls/src/quorum_vote/mod.rs +++ b/crates/task-impls/src/quorum_vote/mod.rs @@ -10,6 +10,7 @@ use async_broadcast::{InactiveReceiver, Receiver, Sender}; use async_lock::RwLock; use async_trait::async_trait; use committable::Committable; +use drb_computations::DrbComputations; use hotshot_task::{ dependency::{AndDependency, EventDependency}, dependency_task::{DependencyTask, HandleDepOutput}, @@ -18,7 +19,6 @@ use hotshot_task::{ use hotshot_types::{ consensus::{ConsensusMetricsValue, OuterConsensus}, data::{Leaf2, QuorumProposal2}, - drb::DrbResult, event::Event, message::{Proposal, UpgradeLock}, traits::{ @@ -44,6 +44,9 @@ use crate::{ quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state}, }; +/// Helper for DRB Computations +pub mod drb_computations; + /// Event handlers for `QuorumProposalValidated`. mod handlers; @@ -276,7 +279,8 @@ pub struct QuorumVoteTaskState, V: pub membership: Arc, /// Table for the in-progress DRB computation tasks. - pub drb_computations: BTreeMap>, + //pub drb_computations: BTreeMap>, + pub drb_computations: DrbComputations, /// Output events to application pub output_event_stream: async_broadcast::Sender>,