diff --git a/Cargo.lock b/Cargo.lock index acc94c379c..a97145b4fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,9 +175,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" +checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" [[package]] name = "arbitrary" @@ -1374,9 +1374,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.21" +version = "4.5.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" +checksum = "69371e34337c4c984bbe322360c2547210bf632eb2814bbe78a6e87a2935bd2b" dependencies = [ "clap_builder", "clap_derive", @@ -1384,9 +1384,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.21" +version = "4.5.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" +checksum = "6e24c1b4099818523236a8ca881d2b45db98dadfb4625cf6608c12069fcbbde1" dependencies = [ "anstream", "anstyle", @@ -3666,18 +3666,18 @@ dependencies = [ [[package]] name = "impl-codec" -version = "0.7.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67aa010c1e3da95bf151bd8b4c059b2ed7e75387cdb969b4f8f2723a43f9941" +checksum = "ba6a270039626615617f3f36d15fc827041df3b78c439da2cadfa47455a77f2f" dependencies = [ "parity-scale-codec", ] [[package]] name = "impl-serde" -version = "0.5.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a143eada6a1ec4aefa5049037a26a6d597bfd64f8c026d07b77133e02b7dd0b" +checksum = "ebc88fc67028ae3db0c853baa36269d398d5f45b6982f95549ff5def78c935cd" dependencies = [ "serde", ] @@ -4104,7 +4104,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -4339,7 +4339,7 @@ dependencies = [ "smallvec", "thiserror 1.0.68", "tracing", - "uint 0.9.5", + "uint", "void", ] @@ -5530,14 +5530,14 @@ dependencies = [ [[package]] name = "primitive-types" -version = "0.13.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d15600a7d856470b7d278b3fe0e311fe28c2526348549f8ef2ff7db3299c87f5" +checksum = "0b34d9fd68ae0b74a41b21c03c2f62847aa0ffea044eee893b4c140b37e244e2" dependencies = [ "fixed-hash", "impl-codec", "impl-serde", - "uint 0.10.0", + "uint", ] [[package]] @@ -5771,7 +5771,7 @@ dependencies = [ "once_cell", "socket2 0.5.7", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -7963,18 +7963,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "uint" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "909988d098b2f738727b161a106cfc7cab00c539c2687a8836f8e565976fb53e" -dependencies = [ - "byteorder", - "crunchy", - "hex", - "static_assertions", -] - [[package]] name = "unicase" version = "2.8.0" diff --git a/Cargo.toml b/Cargo.toml index 62b93b4d14..4ebda6b06d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ derive_more = { version = "1.0" } digest = "0.10" either = "1.13" espresso-systems-common = { git = "https://github.com/espressosystems/espresso-systems-common", tag = "0.4.1" } -primitive-types = { version = "0.13.1", default-features = false, features = [ +primitive-types = { version = "0.12.2", default-features = false, features = [ "serde", ] } futures = { version = "0.3", default-features = false } diff --git a/README.md b/README.md index c641255b0f..dc1b35ed97 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ RUST_LOG=$ERROR_LOG_LEVEL 
RUST_LOG_FORMAT=$ERROR_LOG_FORMAT just run_test test_s ## Careful -To double check for UB: +To double-check for UB: ```bash nix develop .#correctnessShell diff --git a/crates/examples/push-cdn/README.md b/crates/examples/push-cdn/README.md index c460beb89a..a20e46f4ce 100644 --- a/crates/examples/push-cdn/README.md +++ b/crates/examples/push-cdn/README.md @@ -51,7 +51,7 @@ sleep 1m just example_fixed_leader multi-validator-push-cdn -- 9 http://127.0.0.1:4444 ``` -Where ones using `example_gpuvid_leader` could be the leader and should be running on a nvidia GPU, and other validators using `example_fixed_leader` will never be a leader. In practice, these url should be changed to the corresponding ip and port. +Where the ones using `example_gpuvid_leader` could be the leader and should be running on an NVIDIA GPU, and other validators using `example_fixed_leader` will never be a leader. In practice, these URLs should be changed to the corresponding IP and port. If you don't have a gpu but want to test out fixed leader, you can run: @@ -65,4 +65,4 @@ sleep 1m just example_fixed_leader multi-validator-push-cdn -- 9 http://127.0.0.1:4444 ``` -Remember, you have to run leaders first, then other validators, so that leaders will have lower index. \ No newline at end of file +Remember, you have to run leaders first, then other validators, so that leaders will have lower indices. diff --git a/crates/hotshot-stake-table/src/utils.rs b/crates/hotshot-stake-table/src/utils.rs index b295cdeb28..07a3d261b3 100644 --- a/crates/hotshot-stake-table/src/utils.rs +++ b/crates/hotshot-stake-table/src/utils.rs @@ -21,6 +21,6 @@ pub trait ToFields { /// convert a U256 to a field element. 
pub(crate) fn u256_to_field(v: &U256) -> F { let mut bytes = vec![0u8; 32]; - v.write_as_little_endian(&mut bytes); + v.to_little_endian(&mut bytes); F::from_le_bytes_mod_order(&bytes) } diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index 0c580c5414..2c8d8607a4 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -12,10 +12,17 @@ use std::{ use async_trait::async_trait; use chrono::Utc; use hotshot_task_impls::{ - builder::BuilderClient, consensus::ConsensusTaskState, da::DaTaskState, - quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState, - quorum_vote::QuorumVoteTaskState, request::NetworkRequestState, rewind::RewindTaskState, - transactions::TransactionTaskState, upgrade::UpgradeTaskState, vid::VidTaskState, + builder::BuilderClient, + consensus::ConsensusTaskState, + da::DaTaskState, + quorum_proposal::QuorumProposalTaskState, + quorum_proposal_recv::QuorumProposalRecvTaskState, + quorum_vote::{drb_computations::DrbComputations, QuorumVoteTaskState}, + request::NetworkRequestState, + rewind::RewindTaskState, + transactions::TransactionTaskState, + upgrade::UpgradeTaskState, + vid::VidTaskState, view_sync::ViewSyncTaskState, }; use hotshot_types::{ @@ -234,7 +241,7 @@ impl, V: Versions> CreateTaskState vote_dependencies: BTreeMap::new(), network: Arc::clone(&handle.hotshot.network), membership: (*handle.hotshot.memberships).clone().into(), - drb_computations: BTreeMap::new(), + drb_computations: DrbComputations::new(), output_event_stream: handle.hotshot.external_event_stream.0.clone(), id: handle.hotshot.id, storage: Arc::clone(&handle.storage), diff --git a/crates/task-impls/src/quorum_vote/drb_computations.rs b/crates/task-impls/src/quorum_vote/drb_computations.rs new file mode 100644 index 0000000000..fc5483f8dd --- /dev/null +++ b/crates/task-impls/src/quorum_vote/drb_computations.rs @@ -0,0 +1,126 @@ +use 
std::collections::{btree_map, BTreeMap}; + +use hotshot_types::{ + drb::{compute_drb_result, DrbResult, DrbSeedInput}, + traits::node_implementation::{ConsensusTime, NodeType}, +}; +use tokio::{spawn, task::JoinHandle}; + +/// Number of previous epochs for which results and seeds are kept by `garbage_collect` +pub const KEEP_PREVIOUS_RESULT_COUNT: u64 = 8; + +/// Helper struct to track state of DRB computations: finished results, the live task, and pending seeds +pub struct DrbComputations { + /// Stored results from finished computations, keyed by epoch + results: BTreeMap, + + /// Currently live computation (at most one), together with the epoch it was started for + task: Option<(TYPES::Epoch, JoinHandle)>, + + /// Stored inputs to computations, keyed by epoch; an entry is removed once its task is spawned + seeds: BTreeMap, +} + +impl DrbComputations { + #[must_use] + /// Create a new, empty `DrbComputations` (no results, no seeds, no live task) + pub fn new() -> Self { + Self { + results: BTreeMap::new(), + task: None, + seeds: BTreeMap::new(), + } + } + + /// If a task is currently live AND has finished, join it and save the result. + /// If the epoch for the calculation was the same as the provided epoch, return true. + /// If a task is currently live and NOT finished, abort it UNLESS the task epoch is the same as + /// `epoch`, in which case keep letting it run and return true. + /// Return false in every other case, i.e. when a task may be spawned for the given epoch. 
+ async fn join_or_abort_old_task(&mut self, epoch: TYPES::Epoch) -> bool { + if let Some((task_epoch, join_handle)) = &mut self.task { + if join_handle.is_finished() { + match join_handle.await { + Ok(result) => { + self.results.insert(*task_epoch, result); + let result = *task_epoch == epoch; + self.task = None; + result + } + Err(e) => { + tracing::error!("error joining DRB computation task: {e:?}"); + false + } + } + } else if *task_epoch == epoch { + true + } else { + join_handle.abort(); + self.task = None; + false + } + } else { + false + } + } + + /// Stores a seed for a particular epoch for later use by start_task_if_not_running, called from handle_quorum_proposal_validated_drb_calculation_start + pub fn store_seed(&mut self, epoch: TYPES::Epoch, drb_seed_input: DrbSeedInput) { + self.seeds.insert(epoch, drb_seed_input); + } + + /// Starts a new task. Cancels a current task if that task is not for the provided epoch. Allows a task to continue + /// running if it was already started for the given epoch. Avoids running the task if we already have a result for + /// the epoch. + pub async fn start_task_if_not_running(&mut self, epoch: TYPES::Epoch) { + // If join_or_abort_task returns true, then we either just completed a task for this epoch, or we currently + // have a running task for the epoch. 
+ if self.join_or_abort_old_task(epoch).await { + return; + } + + // In case we somehow ended up processing this epoch already, don't start it again + if self.results.contains_key(&epoch) { + return; + } + + if let btree_map::Entry::Occupied(entry) = self.seeds.entry(epoch) { + let drb_seed_input = *entry.get(); + let new_drb_task = spawn(async move { compute_drb_result::(drb_seed_input) }); + self.task = Some((epoch, new_drb_task)); + entry.remove(); + } + } + + /// Retrieves the stored result for a given epoch, if any + pub fn get_result(&self, epoch: TYPES::Epoch) -> Option { + self.results.get(&epoch).copied() + } + + /// Retrieves the stored seed for a given epoch, if any (a seed is removed once its task has been spawned) + pub fn get_seed(&self, epoch: TYPES::Epoch) -> Option { + self.seeds.get(&epoch).copied() + } + + /// Garbage collects stored results and seeds; does nothing while `epoch` is below KEEP_PREVIOUS_RESULT_COUNT + pub fn garbage_collect(&mut self, epoch: TYPES::Epoch) { + if epoch.u64() < KEEP_PREVIOUS_RESULT_COUNT { + return; + } + + let retain_epoch = epoch - KEEP_PREVIOUS_RESULT_COUNT; + // N.B. x.split_off(y) returns the part of the map where key >= y + + // Remove result entries older than retain_epoch + self.results = self.results.split_off(&retain_epoch); + + // Remove seed entries older than retain_epoch + 1 + self.seeds = self.seeds.split_off(&(retain_epoch + 1)); + } +} + +impl Default for DrbComputations { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/task-impls/src/quorum_vote/handlers.rs b/crates/task-impls/src/quorum_vote/handlers.rs index cd04b7f430..9bb73f6cb0 100644 --- a/crates/task-impls/src/quorum_vote/handlers.rs +++ b/crates/task-impls/src/quorum_vote/handlers.rs @@ -13,8 +13,7 @@ use committable::Committable; use hotshot_types::{ consensus::OuterConsensus, data::{Leaf2, QuorumProposal2, VidDisperseShare}, - drb::compute_drb_result, - event::{Event, EventType}, + event::{Event, EventType, LeafInfo}, message::{Proposal, UpgradeLock}, simple_vote::{QuorumData2, QuorumVote2}, traits::{ @@ -28,7 +27,6 @@ use hotshot_types::{ utils::epoch_from_block_number, 
vote::HasViewNumber, }; -use tokio::spawn; use tracing::instrument; use utils::anytrace::*; use vbs::version::StaticVersionType; @@ -43,6 +41,96 @@ use crate::{ quorum_vote::Versions, }; +/// Handles starting the DRB calculation for the epoch after the proposal's epoch. Uses the seed previously stored by +/// `handle_quorum_proposal_validated_drb_calculation_seed`. +async fn handle_quorum_proposal_validated_drb_calculation_start< + TYPES: NodeType, + I: NodeImplementation, + V: Versions, +>( + proposal: &QuorumProposal2, + task_state: &mut QuorumVoteTaskState, +) { + let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number( + proposal.block_header.block_number(), + task_state.epoch_height, + )); + + // Start the task for the next epoch, but only if we have stake in the current epoch + if task_state + .membership + .has_stake(&task_state.public_key, current_epoch_number) + { + task_state + .drb_computations + .start_task_if_not_running(current_epoch_number + 1) + .await; + } +} + +/// Handles storing the seed for an upcoming DRB calculation. +/// +/// We store the DRB computation seed 2 epochs in advance, if the decided block is the last but +/// third block in the current epoch and we are in the quorum committee of the next epoch. +/// +/// Special cases: +/// * Epoch 0: No DRB computation since we'll transition to epoch 1 immediately. +/// * Epoch 1 and 2: Use `[0u8; 32]` as the DRB result since when we first start the +/// computation in epoch 1, the result is for epoch 3. +/// +/// We don't need to handle the special cases explicitly here, because the first proposal +/// with which we'll start the DRB computation is for epoch 3. +fn handle_quorum_proposal_validated_drb_calculation_seed< + TYPES: NodeType, + I: NodeImplementation, + V: Versions, +>( + proposal: &QuorumProposal2, + task_state: &mut QuorumVoteTaskState, + leaf_views: &[LeafInfo], +) -> Result<()> { + // `leaf_views.last()` is never `None` if we've reached a new decide, so this is safe to unwrap. 
+ let decided_block_number = leaf_views + .last() + .unwrap() + .leaf + .block_header() + .block_number(); + + // Skip if this is not the expected block. + if task_state.epoch_height != 0 && (decided_block_number + 3) % task_state.epoch_height == 0 { + // Garbage collect stored DRB results and seeds that are too old to be needed. + let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number( + decided_block_number, + task_state.epoch_height, + )); + + task_state + .drb_computations + .garbage_collect(current_epoch_number); + + // Skip if we are not in the committee of the next epoch. + if task_state + .membership + .has_stake(&task_state.public_key, current_epoch_number + 1) + { + let new_epoch_number = current_epoch_number + 2; + let Ok(drb_seed_input_vec) = bincode::serialize(&proposal.justify_qc.signatures) else { + bail!("Failed to serialize the QC signature."); + }; + let Ok(drb_seed_input) = drb_seed_input_vec.try_into() else { + bail!("Failed to convert the serialized QC signature into a DRB seed input."); + }; + + // Store the DRB seed input for the calculation of `new_epoch_number` + task_state + .drb_computations + .store_seed(new_epoch_number, drb_seed_input); + } + } + Ok(()) +} + /// Handles the `QuorumProposalValidated` event. #[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number))] pub(crate) async fn handle_quorum_proposal_validated< @@ -58,6 +146,10 @@ pub(crate) async fn handle_quorum_proposal_validated< .version(proposal.view_number()) .await?; + if version >= V::Epochs::VERSION { + handle_quorum_proposal_validated_drb_calculation_start(proposal, task_state).await; + } + let LeafChainTraversalOutcome { new_locked_view_number, new_decided_view_number, @@ -155,63 +247,12 @@ pub(crate) async fn handle_quorum_proposal_validated< .await; tracing::debug!("Successfully sent decide event"); - // Start the DRB computation two epochs in advance, if the decided block is the last but - // third block in the current epoch and we are in the quorum committee of the next epoch. 
- // - // Special cases: - // * Epoch 0: No DRB computation since we'll transition to epoch 1 immediately. - // * Epoch 1 and 2: Use `[0u8; 32]` as the DRB result since when we first start the - // computation in epoch 1, the result is for epoch 3. - // - // We don't need to handle the special cases explicitly here, because the first proposal - // with which we'll start the DRB computation is for epoch 3. if version >= V::Epochs::VERSION { - // This is never none if we've reached a new decide, so this is safe to unwrap. - let decided_block_number = leaf_views - .last() - .unwrap() - .leaf - .block_header() - .block_number(); - - // Skip if this is not the expected block. - if task_state.epoch_height != 0 - && (decided_block_number + 3) % task_state.epoch_height == 0 - { - // Cancel old DRB computation tasks. - let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number( - decided_block_number, - task_state.epoch_height, - )); - let current_tasks = task_state.drb_computations.split_off(¤t_epoch_number); - while let Some((_, task)) = task_state.drb_computations.pop_last() { - task.abort(); - } - task_state.drb_computations = current_tasks; - - // Skip if we are not in the committee of the next epoch. - if task_state - .membership - .has_stake(&task_state.public_key, current_epoch_number + 1) - { - let new_epoch_number = current_epoch_number + 2; - let Ok(drb_seed_input_vec) = - bincode::serialize(&proposal.justify_qc.signatures) - else { - bail!("Failed to serialize the QC signature."); - }; - let Ok(drb_seed_input) = drb_seed_input_vec.try_into() else { - bail!( - "Failed to convert the serialized QC signature into a DRB seed input." 
- ); - }; - let new_drb_task = - spawn(async move { compute_drb_result::(drb_seed_input) }); - task_state - .drb_computations - .insert(new_epoch_number, new_drb_task); - } - } + handle_quorum_proposal_validated_drb_calculation_seed( + proposal, + task_state, + &leaf_views, + )?; } } diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs index 5952e64c87..4daaf78619 100644 --- a/crates/task-impls/src/quorum_vote/mod.rs +++ b/crates/task-impls/src/quorum_vote/mod.rs @@ -10,6 +10,7 @@ use async_broadcast::{InactiveReceiver, Receiver, Sender}; use async_lock::RwLock; use async_trait::async_trait; use committable::Committable; +use drb_computations::DrbComputations; use hotshot_task::{ dependency::{AndDependency, EventDependency}, dependency_task::{DependencyTask, HandleDepOutput}, @@ -18,7 +19,6 @@ use hotshot_task::{ use hotshot_types::{ consensus::{ConsensusMetricsValue, OuterConsensus}, data::{Leaf2, QuorumProposal2}, - drb::DrbResult, event::Event, message::{Proposal, UpgradeLock}, simple_vote::HasEpoch, @@ -45,6 +45,9 @@ use crate::{ quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state}, }; +/// Helper for DRB Computations +pub mod drb_computations; + /// Event handlers for `QuorumProposalValidated`. mod handlers; @@ -278,7 +281,8 @@ pub struct QuorumVoteTaskState, V: pub membership: Arc, /// Table for the in-progress DRB computation tasks. - pub drb_computations: BTreeMap>, + //pub drb_computations: BTreeMap>, + pub drb_computations: DrbComputations, /// Output events to application pub output_event_stream: async_broadcast::Sender>,