Skip to content

Commit

Permalink
#3933 DrbComputations helper for quorum_vote (#3941)
Browse files Browse the repository at this point in the history
* 3933 DrbComputations helper for quorum_vote

* 3933 modify logic of drb computation to start at epoch transition

* 3933 check for if we should store the seed based on when we're expecting to be in the committee

* 3933 always start the task for drb

* 3933 fix clippy lints

* 3933 avoid unwrapping leaf_views.last()

* 3933 additional fixes

* 3933 correct nits, use proposal block number to calculate epoch number

* 3933 break drb_calculation_start and drb_calculation_seed into separate functions

* 3933 fix comment
  • Loading branch information
pls148 authored Dec 4, 2024
1 parent 9eb759c commit 2292e5d
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 66 deletions.
17 changes: 12 additions & 5 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ use std::{
use async_trait::async_trait;
use chrono::Utc;
use hotshot_task_impls::{
builder::BuilderClient, consensus::ConsensusTaskState, da::DaTaskState,
quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState,
quorum_vote::QuorumVoteTaskState, request::NetworkRequestState, rewind::RewindTaskState,
transactions::TransactionTaskState, upgrade::UpgradeTaskState, vid::VidTaskState,
builder::BuilderClient,
consensus::ConsensusTaskState,
da::DaTaskState,
quorum_proposal::QuorumProposalTaskState,
quorum_proposal_recv::QuorumProposalRecvTaskState,
quorum_vote::{drb_computations::DrbComputations, QuorumVoteTaskState},
request::NetworkRequestState,
rewind::RewindTaskState,
transactions::TransactionTaskState,
upgrade::UpgradeTaskState,
vid::VidTaskState,
view_sync::ViewSyncTaskState,
};
use hotshot_types::{
Expand Down Expand Up @@ -234,7 +241,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
vote_dependencies: BTreeMap::new(),
network: Arc::clone(&handle.hotshot.network),
membership: (*handle.hotshot.memberships).clone().into(),
drb_computations: BTreeMap::new(),
drb_computations: DrbComputations::new(),
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
id: handle.hotshot.id,
storage: Arc::clone(&handle.storage),
Expand Down
126 changes: 126 additions & 0 deletions crates/task-impls/src/quorum_vote/drb_computations.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::collections::{btree_map, BTreeMap};

use hotshot_types::{
drb::{compute_drb_result, DrbResult, DrbSeedInput},
traits::node_implementation::{ConsensusTime, NodeType},
};
use tokio::{spawn, task::JoinHandle};

/// Number of previous results and seeds to keep
pub const KEEP_PREVIOUS_RESULT_COUNT: u64 = 8;

/// Helper struct to track state of DRB computations
pub struct DrbComputations<TYPES: NodeType> {
/// Stored results from computations
results: BTreeMap<TYPES::Epoch, DrbResult>,

/// Currently live computation
task: Option<(TYPES::Epoch, JoinHandle<DrbResult>)>,

/// Stored inputs to computations
seeds: BTreeMap<TYPES::Epoch, DrbSeedInput>,
}

impl<TYPES: NodeType> DrbComputations<TYPES> {
#[must_use]
/// Create a new DrbComputations
pub fn new() -> Self {
Self {
results: BTreeMap::new(),
task: None,
seeds: BTreeMap::new(),
}
}

/// If a task is currently live AND has finished, join it and save the result.
/// If the epoch for the calculation was the same as the provided epoch, return true
/// If a task is currently live and NOT finished, abort it UNLESS the task epoch is the same as
/// cur_epoch, in which case keep letting it run and return true.
/// Return false if a task should be spawned for the given epoch.
async fn join_or_abort_old_task(&mut self, epoch: TYPES::Epoch) -> bool {
if let Some((task_epoch, join_handle)) = &mut self.task {
if join_handle.is_finished() {
match join_handle.await {
Ok(result) => {
self.results.insert(*task_epoch, result);
let result = *task_epoch == epoch;
self.task = None;
result
}
Err(e) => {
tracing::error!("error joining DRB computation task: {e:?}");
false
}
}
} else if *task_epoch == epoch {
true
} else {
join_handle.abort();
self.task = None;
false
}
} else {
false
}
}

/// Stores a seed for a particular epoch for later use by start_task_if_not_running, called from handle_quorum_proposal_validated_drb_calculation_start
pub fn store_seed(&mut self, epoch: TYPES::Epoch, drb_seed_input: DrbSeedInput) {
self.seeds.insert(epoch, drb_seed_input);
}

/// Starts a new task. Cancels a current task if that task is not for the provided epoch. Allows a task to continue
/// running if it was already started for the given epoch. Avoids running the task if we already have a result for
/// the epoch.
pub async fn start_task_if_not_running(&mut self, epoch: TYPES::Epoch) {
// If join_or_abort_task returns true, then we either just completed a task for this epoch, or we currently
// have a running task for the epoch.
if self.join_or_abort_old_task(epoch).await {
return;
}

// In case we somehow ended up processing this epoch already, don't start it again
if self.results.contains_key(&epoch) {
return;
}

if let btree_map::Entry::Occupied(entry) = self.seeds.entry(epoch) {
let drb_seed_input = *entry.get();
let new_drb_task = spawn(async move { compute_drb_result::<TYPES>(drb_seed_input) });
self.task = Some((epoch, new_drb_task));
entry.remove();
}
}

/// Retrieves the result for a given epoch
pub fn get_result(&self, epoch: TYPES::Epoch) -> Option<DrbResult> {
self.results.get(&epoch).copied()
}

/// Retrieves the seed for a given epoch
pub fn get_seed(&self, epoch: TYPES::Epoch) -> Option<DrbSeedInput> {
self.seeds.get(&epoch).copied()
}

/// Garbage collects internal data structures
pub fn garbage_collect(&mut self, epoch: TYPES::Epoch) {
if epoch.u64() < KEEP_PREVIOUS_RESULT_COUNT {
return;
}

let retain_epoch = epoch - KEEP_PREVIOUS_RESULT_COUNT;
// N.B. x.split_off(y) returns the part of the map where key >= y

// Remove result entries older than EPOCH
self.results = self.results.split_off(&retain_epoch);

// Remove result entries older than EPOCH+1
self.seeds = self.seeds.split_off(&(retain_epoch + 1));
}
}

impl<TYPES: NodeType> Default for DrbComputations<TYPES> {
fn default() -> Self {
Self::new()
}
}
159 changes: 100 additions & 59 deletions crates/task-impls/src/quorum_vote/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use committable::Committable;
use hotshot_types::{
consensus::OuterConsensus,
data::{Leaf2, QuorumProposal2, VidDisperseShare},
drb::compute_drb_result,
event::{Event, EventType},
event::{Event, EventType, LeafInfo},
message::{Proposal, UpgradeLock},
simple_vote::{QuorumData2, QuorumVote2},
traits::{
Expand All @@ -28,7 +27,6 @@ use hotshot_types::{
utils::epoch_from_block_number,
vote::HasViewNumber,
};
use tokio::spawn;
use tracing::instrument;
use utils::anytrace::*;
use vbs::version::StaticVersionType;
Expand All @@ -43,6 +41,96 @@ use crate::{
quorum_vote::Versions,
};

/// Handles starting the DRB calculation. Uses the seed previously stored in
/// handle_quorum_proposal_validated_drb_calculation_seed
async fn handle_quorum_proposal_validated_drb_calculation_start<
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
>(
proposal: &QuorumProposal2<TYPES>,
task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
) {
let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
proposal.block_header.block_number(),
task_state.epoch_height,
));

// Start the new task if we're in the committee for this epoch
if task_state
.membership
.has_stake(&task_state.public_key, current_epoch_number)
{
task_state
.drb_computations
.start_task_if_not_running(current_epoch_number + 1)
.await;
}
}

/// Handles storing the seed for an upcoming DRB calculation.
///
/// We store the DRB computation seed 2 epochs in advance, if the decided block is the last but
/// third block in the current epoch and we are in the quorum committee of the next epoch.
///
/// Special cases:
/// * Epoch 0: No DRB computation since we'll transition to epoch 1 immediately.
/// * Epoch 1 and 2: Use `[0u8; 32]` as the DRB result since when we first start the
/// computation in epoch 1, the result is for epoch 3.
///
/// We don't need to handle the special cases explicitly here, because the first proposal
/// with which we'll start the DRB computation is for epoch 3.
fn handle_quorum_proposal_validated_drb_calculation_seed<
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
>(
proposal: &QuorumProposal2<TYPES>,
task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
leaf_views: &[LeafInfo<TYPES>],
) -> Result<()> {
// This is never none if we've reached a new decide, so this is safe to unwrap.
let decided_block_number = leaf_views
.last()
.unwrap()
.leaf
.block_header()
.block_number();

// Skip if this is not the expected block.
if task_state.epoch_height != 0 && (decided_block_number + 3) % task_state.epoch_height == 0 {
// Cancel old DRB computation tasks.
let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
decided_block_number,
task_state.epoch_height,
));

task_state
.drb_computations
.garbage_collect(current_epoch_number);

// Skip if we are not in the committee of the next epoch.
if task_state
.membership
.has_stake(&task_state.public_key, current_epoch_number + 1)
{
let new_epoch_number = current_epoch_number + 2;
let Ok(drb_seed_input_vec) = bincode::serialize(&proposal.justify_qc.signatures) else {
bail!("Failed to serialize the QC signature.");
};
let Ok(drb_seed_input) = drb_seed_input_vec.try_into() else {
bail!("Failed to convert the serialized QC signature into a DRB seed input.");
};

// Store the drb seed input for the next calculation
task_state
.drb_computations
.store_seed(new_epoch_number, drb_seed_input);
}
}
Ok(())
}

/// Handles the `QuorumProposalValidated` event.
#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number))]
pub(crate) async fn handle_quorum_proposal_validated<
Expand All @@ -58,6 +146,10 @@ pub(crate) async fn handle_quorum_proposal_validated<
.version(proposal.view_number())
.await?;

if version >= V::Epochs::VERSION {
handle_quorum_proposal_validated_drb_calculation_start(proposal, task_state).await;
}

let LeafChainTraversalOutcome {
new_locked_view_number,
new_decided_view_number,
Expand Down Expand Up @@ -155,63 +247,12 @@ pub(crate) async fn handle_quorum_proposal_validated<
.await;
tracing::debug!("Successfully sent decide event");

// Start the DRB computation two epochs in advance, if the decided block is the last but
// third block in the current epoch and we are in the quorum committee of the next epoch.
//
// Special cases:
// * Epoch 0: No DRB computation since we'll transition to epoch 1 immediately.
// * Epoch 1 and 2: Use `[0u8; 32]` as the DRB result since when we first start the
// computation in epoch 1, the result is for epoch 3.
//
// We don't need to handle the special cases explicitly here, because the first proposal
// with which we'll start the DRB computation is for epoch 3.
if version >= V::Epochs::VERSION {
// This is never none if we've reached a new decide, so this is safe to unwrap.
let decided_block_number = leaf_views
.last()
.unwrap()
.leaf
.block_header()
.block_number();

// Skip if this is not the expected block.
if task_state.epoch_height != 0
&& (decided_block_number + 3) % task_state.epoch_height == 0
{
// Cancel old DRB computation tasks.
let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
decided_block_number,
task_state.epoch_height,
));
let current_tasks = task_state.drb_computations.split_off(&current_epoch_number);
while let Some((_, task)) = task_state.drb_computations.pop_last() {
task.abort();
}
task_state.drb_computations = current_tasks;

// Skip if we are not in the committee of the next epoch.
if task_state
.membership
.has_stake(&task_state.public_key, current_epoch_number + 1)
{
let new_epoch_number = current_epoch_number + 2;
let Ok(drb_seed_input_vec) =
bincode::serialize(&proposal.justify_qc.signatures)
else {
bail!("Failed to serialize the QC signature.");
};
let Ok(drb_seed_input) = drb_seed_input_vec.try_into() else {
bail!(
"Failed to convert the serialized QC signature into a DRB seed input."
);
};
let new_drb_task =
spawn(async move { compute_drb_result::<TYPES>(drb_seed_input) });
task_state
.drb_computations
.insert(new_epoch_number, new_drb_task);
}
}
handle_quorum_proposal_validated_drb_calculation_seed(
proposal,
task_state,
&leaf_views,
)?;
}
}

Expand Down
8 changes: 6 additions & 2 deletions crates/task-impls/src/quorum_vote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use committable::Committable;
use drb_computations::DrbComputations;
use hotshot_task::{
dependency::{AndDependency, EventDependency},
dependency_task::{DependencyTask, HandleDepOutput},
Expand All @@ -18,7 +19,6 @@ use hotshot_task::{
use hotshot_types::{
consensus::{ConsensusMetricsValue, OuterConsensus},
data::{Leaf2, QuorumProposal2},
drb::DrbResult,
event::Event,
message::{Proposal, UpgradeLock},
traits::{
Expand All @@ -44,6 +44,9 @@ use crate::{
quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
};

/// Helper for DRB Computations
pub mod drb_computations;

/// Event handlers for `QuorumProposalValidated`.
mod handlers;

Expand Down Expand Up @@ -276,7 +279,8 @@ pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
pub membership: Arc<TYPES::Membership>,

/// Table for the in-progress DRB computation tasks.
pub drb_computations: BTreeMap<TYPES::Epoch, JoinHandle<DrbResult>>,
//pub drb_computations: BTreeMap<TYPES::Epoch, JoinHandle<DrbResult>>,
pub drb_computations: DrbComputations<TYPES>,

/// Output events to application
pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
Expand Down

0 comments on commit 2292e5d

Please sign in to comment.