From a2306c27a090faa63d127bb09be7c17e9c4a7e10 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Mon, 11 Nov 2024 15:49:42 -0500 Subject: [PATCH 01/18] removing some unused stuff and adding HighQC message + event --- crates/task-impls/src/events.rs | 7 ++++ crates/task-impls/src/network.rs | 5 +-- crates/types/src/message.rs | 56 +++++------------------------- crates/types/src/traits/network.rs | 9 +---- 4 files changed, 19 insertions(+), 58 deletions(-) diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index b78a688b61..9597b7cd45 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -252,6 +252,9 @@ pub enum HotShotEvent { TYPES::SignatureKey, Proposal>, ), + + /// A replica send us a High QC + HighQcRecv(QuorumCertificate), } impl HotShotEvent { @@ -333,6 +336,7 @@ impl HotShotEvent { | HotShotEvent::VidRequestRecv(request, _) => Some(request.view), HotShotEvent::VidResponseSend(_, _, proposal) | HotShotEvent::VidResponseRecv(_, proposal) => Some(proposal.data.view_number), + HotShotEvent::HighQcRecv(qc) => Some(qc.view_number()), } } } @@ -607,6 +611,9 @@ impl Display for HotShotEvent { proposal.data.view_number ) } + HotShotEvent::HighQcRecv(qc) => { + write!(f, "HighQcRecv(view_number={:?}", qc.view_number()) + } } } } diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 49ae29e964..fb6ada9362 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -75,7 +75,7 @@ impl NetworkMessageTaskState { GeneralConsensusMessage::ProposalRequested(req, sig) => { HotShotEvent::QuorumProposalRequestRecv(req, sig) } - GeneralConsensusMessage::LeaderProposalAvailable(proposal) => { + GeneralConsensusMessage::ProposalResponse(proposal) => { HotShotEvent::QuorumProposalResponseRecv(proposal) } GeneralConsensusMessage::Vote(vote) => { @@ -112,6 +112,7 @@ impl NetworkMessageTaskState { tracing::error!("Received upgrade vote!"); HotShotEvent::UpgradeVoteRecv(message) } + GeneralConsensusMessage::HighQC(qc) => HotShotEvent::HighQcRecv(qc), }, SequencingMessage::Da(da_message) => match da_message { DaConsensusMessage::DaProposal(proposal) => { @@ -410,7 +411,7 @@ impl< HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => Some(( sender_key.clone(), MessageKind::::from_consensus_message(SequencingMessage::General( - GeneralConsensusMessage::LeaderProposalAvailable(proposal), + GeneralConsensusMessage::ProposalResponse(proposal), )), TransmitType::Direct(sender_key), )), diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index 217b5d7578..bf31005c9a 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -29,7 +29,7 @@ use crate::{ data::{DaProposal, Leaf, QuorumProposal, UpgradeProposal, VidDisperseShare}, request_response::ProposalRequestPayload, simple_certificate::{ - DaCertificate, UpgradeCertificate, ViewSyncCommitCertificate2, + DaCertificate, QuorumCertificate, UpgradeCertificate, ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2, }, simple_vote::{ @@ -159,14 +159,6 @@ impl ViewMessage for MessageKind { MessageKind::External(_) => TYPES::View::new(1), } } - - fn purpose(&self) -> MessagePurpose { - match &self { - MessageKind::Consensus(message) => message.purpose(), - MessageKind::Data(_) => MessagePurpose::Data, - MessageKind::External(_) => MessagePurpose::External, - } - } } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] @@ -212,8 +204,11 @@ pub enum GeneralConsensusMessage { ::PureAssembledSignatureType, ), - /// The leader has responded with a valid proposal. - LeaderProposalAvailable(Proposal>), + /// A replica has responded with a valid proposal. + ProposalResponse(Proposal>), + + /// Message for the next leader containing our highest QC + HighQC(QuorumCertificate), } #[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)] @@ -258,7 +253,7 @@ impl SequencingMessage { p.data.view_number() } GeneralConsensusMessage::ProposalRequested(req, _) => req.view_number, - GeneralConsensusMessage::LeaderProposalAvailable(proposal) => { + GeneralConsensusMessage::ProposalResponse(proposal) => { proposal.data.view_number() } GeneralConsensusMessage::Vote(vote_message) => vote_message.view_number(), @@ -279,6 +274,7 @@ impl SequencingMessage { } GeneralConsensusMessage::UpgradeProposal(message) => message.data.view_number(), GeneralConsensusMessage::UpgradeVote(message) => message.view_number(), + GeneralConsensusMessage::HighQC(qc) => qc.view_number(), } } SequencingMessage::Da(da_message) => { @@ -295,42 +291,6 @@ impl SequencingMessage { } } } - - // TODO: Disable panic after the `ViewSync` case is implemented. - /// Get the message purpos - #[allow(clippy::panic)] - fn purpose(&self) -> MessagePurpose { - match &self { - SequencingMessage::General(general_message) => match general_message { - GeneralConsensusMessage::Proposal(_) => MessagePurpose::Proposal, - GeneralConsensusMessage::ProposalRequested(_, _) - | GeneralConsensusMessage::LeaderProposalAvailable(_) => { - MessagePurpose::LatestProposal - } - GeneralConsensusMessage::Vote(_) | GeneralConsensusMessage::TimeoutVote(_) => { - MessagePurpose::Vote - } - GeneralConsensusMessage::ViewSyncPreCommitVote(_) - | GeneralConsensusMessage::ViewSyncCommitVote(_) - | GeneralConsensusMessage::ViewSyncFinalizeVote(_) => MessagePurpose::ViewSyncVote, - - GeneralConsensusMessage::ViewSyncPreCommitCertificate(_) - | GeneralConsensusMessage::ViewSyncCommitCertificate(_) - | GeneralConsensusMessage::ViewSyncFinalizeCertificate(_) => { - MessagePurpose::ViewSyncCertificate - } - - GeneralConsensusMessage::UpgradeProposal(_) => MessagePurpose::UpgradeProposal, - GeneralConsensusMessage::UpgradeVote(_) => MessagePurpose::UpgradeVote, - }, - SequencingMessage::Da(da_message) => match da_message { - DaConsensusMessage::DaProposal(_) => MessagePurpose::Proposal, - DaConsensusMessage::DaVote(_) => MessagePurpose::Vote, - DaConsensusMessage::DaCertificate(_) => MessagePurpose::DaCertificate, - DaConsensusMessage::VidDisperseMsg(_) => MessagePurpose::VidDisperse, - }, - } - } } #[derive(Serialize, Deserialize, Derivative, Clone, Debug, PartialEq, Eq, Hash)] diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index 7d03f17594..f40489f990 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -32,11 +32,7 @@ use rand::{ use serde::{Deserialize, Serialize}; use super::{node_implementation::NodeType, signature_key::SignatureKey}; -use crate::{ - data::ViewNumber, - message::{MessagePurpose, SequencingMessage}, - BoxSyncFuture, -}; +use crate::{data::ViewNumber, message::SequencingMessage, BoxSyncFuture}; /// Centralized server specific errors #[derive(Debug, Error, Serialize, Deserialize)] @@ -124,9 +120,6 @@ pub trait Id: Eq + PartialEq + Hash {} pub trait ViewMessage { /// get the view out of the message fn view_number(&self) -> TYPES::View; - // TODO move out of this trait. - /// get the purpose of the message - fn purpose(&self) -> MessagePurpose; } /// A request for some data that the consensus layer is asking for. From 6fd19b9ce3e741edc8d0ad51581b820adcc79ba2 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Mon, 11 Nov 2024 16:13:16 -0500 Subject: [PATCH 02/18] Update high qc events, add helper to send --- crates/task-impls/src/events.rs | 14 ++++++++-- crates/task-impls/src/helpers.rs | 28 +++++++++++++++++++ crates/task-impls/src/network.rs | 2 +- .../src/quorum_proposal_recv/mod.rs | 2 +- 4 files changed, 41 insertions(+), 5 deletions(-) diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index 9597b7cd45..fee7f5d9f9 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -254,7 +254,10 @@ pub enum HotShotEvent { ), /// A replica send us a High QC - HighQcRecv(QuorumCertificate), + HighQcRecv(QuorumCertificate, TYPES::SignatureKey), + + /// Send our HighQC to the next leader, should go to the same leader as our vote + HighQcSend(QuorumCertificate, TYPES::SignatureKey), } impl HotShotEvent { @@ -336,7 +339,9 @@ impl HotShotEvent { | HotShotEvent::VidRequestRecv(request, _) => Some(request.view), HotShotEvent::VidResponseSend(_, _, proposal) | HotShotEvent::VidResponseRecv(_, proposal) => Some(proposal.data.view_number), - HotShotEvent::HighQcRecv(qc) => Some(qc.view_number()), + HotShotEvent::HighQcRecv(qc, _) | HotShotEvent::HighQcSend(qc, _) => { + Some(qc.view_number()) + } } } } @@ -611,9 +616,12 @@ impl Display for HotShotEvent { proposal.data.view_number ) } - HotShotEvent::HighQcRecv(qc) => { + HotShotEvent::HighQcRecv(qc, _) => { write!(f, "HighQcRecv(view_number={:?}", qc.view_number()) } + HotShotEvent::HighQcSend(qc, _) => { + write!(f, "HighQcSend(view_number={:?}", qc.view_number()) + } } } } diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 8fbfd6251d..9edd7c91dd 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -33,6 +33,7 @@ use hotshot_types::{ use tokio::{task::JoinHandle, time::timeout}; use tracing::instrument; use utils::anytrace::*; +use vbs::version::StaticVersionType; use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT}; @@ -342,6 +343,33 @@ pub async fn decide_from_proposal( res } +/// Send an event to the next leader containing the highest QC we have +/// This is a necessary part of HotStuff 2 but not the original HotStuff +/// +/// #Errors +/// Returns and error if we can't get the version or the version doesn't +/// yet support HS 2 +pub async fn send_high_qc( + event_sender: &Sender>>, + leader: TYPES::SignatureKey, + view: TYPES::View, + consensus: OuterConsensus, + upgrade_lock: &UpgradeLock, +) -> Result<()> { + let version = upgrade_lock.version(view).await?; + ensure!( + version >= V::Epochs::VERSION, + debug!("HotStuff 2 updgrade not yet in effect") + ); + let high_qc = consensus.read().await.high_qc().clone(); + broadcast_event( + Arc::new(HotShotEvent::HighQcSend(high_qc, leader)), + event_sender, + ) + .await; + Ok(()) +} + /// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if not. #[instrument(skip_all)] #[allow(clippy::too_many_arguments)] diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index fb6ada9362..8d0d19e20e 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -112,7 +112,7 @@ impl NetworkMessageTaskState { tracing::error!("Received upgrade vote!"); HotShotEvent::UpgradeVoteRecv(message) } - GeneralConsensusMessage::HighQC(qc) => HotShotEvent::HighQcRecv(qc), + GeneralConsensusMessage::HighQC(qc) => HotShotEvent::HighQcRecv(qc, sender), }, SequencingMessage::Da(da_message) => match da_message { DaConsensusMessage::DaProposal(proposal) => { diff --git a/crates/task-impls/src/quorum_proposal_recv/mod.rs b/crates/task-impls/src/quorum_proposal_recv/mod.rs index f228edbc87..0077e62376 100644 --- a/crates/task-impls/src/quorum_proposal_recv/mod.rs +++ b/crates/task-impls/src/quorum_proposal_recv/mod.rs @@ -33,7 +33,7 @@ use vbs::version::Version; use self::handlers::handle_quorum_proposal_recv; use crate::{ events::{HotShotEvent, ProposalMissing}, - helpers::{broadcast_event, cancel_task, parent_leaf_and_state}, + helpers::{broadcast_event, cancel_task}, }; /// Event handlers for this task. mod handlers; From 3121fc58eb9aa758e3fd772db34334ce0f5346b4 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Mon, 11 Nov 2024 16:39:17 -0500 Subject: [PATCH 03/18] Send high Qc on view change --- crates/task-impls/src/consensus/handlers.rs | 34 +++++++++++++++++++++ crates/task-impls/src/helpers.rs | 28 ----------------- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs index 41db9a2e13..0e860cd2d4 100644 --- a/crates/task-impls/src/consensus/handlers.rs +++ b/crates/task-impls/src/consensus/handlers.rs @@ -28,6 +28,7 @@ use crate::{ helpers::{broadcast_event, cancel_task}, vote_collection::handle_vote, }; +use vbs::version::StaticVersionType; /// Handle a `QuorumVoteRecv` event. pub(crate) async fn handle_quorum_vote_recv< @@ -107,6 +108,30 @@ pub(crate) async fn handle_timeout_vote_recv< Ok(()) } +/// Send an event to the next leader containing the highest QC we have +/// This is a necessary part of HotStuff 2 but not the original HotStuff +/// +/// #Errors +/// Returns and error if we can't get the version or the version doesn't +/// yet support HS 2 +pub async fn send_high_qc>( + new_view_number: TYPES::View, + sender: &Sender>>, + task_state: &mut ConsensusTaskState, +) -> Result<()> { + let version = task_state.upgrade_lock.version(new_view_number).await?; + ensure!( + version >= V::Epochs::VERSION, + debug!("HotStuff 2 updgrade not yet in effect") + ); + let high_qc = task_state.consensus.read().await.high_qc().clone(); + let leader = task_state + .quorum_membership + .leader(new_view_number, TYPES::Epoch::new(0))?; + broadcast_event(Arc::new(HotShotEvent::HighQcSend(high_qc, leader)), sender).await; + Ok(()) +} + /// Handle a `ViewChange` event. #[instrument(skip_all)] pub(crate) async fn handle_view_change< @@ -129,6 +154,15 @@ pub(crate) async fn handle_view_change< if *old_view_number / 100 != *new_view_number / 100 { tracing::info!("Progress: entered view {:>6}", *new_view_number); } + + // Send our high qc to the next leader immediately upon finishing a view. + // Part of HotStuff 2 + let _ = send_high_qc(new_view_number, sender, task_state) + .await + .inspect_err(|e| { + tracing::debug!("High QC sending failed with error: {:?}", e); + }); + // Move this node to the next view task_state.cur_view = new_view_number; diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 9edd7c91dd..8fbfd6251d 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -33,7 +33,6 @@ use hotshot_types::{ use tokio::{task::JoinHandle, time::timeout}; use tracing::instrument; use utils::anytrace::*; -use vbs::version::StaticVersionType; use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT}; @@ -343,33 +342,6 @@ pub async fn decide_from_proposal( res } -/// Send an event to the next leader containing the highest QC we have -/// This is a necessary part of HotStuff 2 but not the original HotStuff -/// -/// #Errors -/// Returns and error if we can't get the version or the version doesn't -/// yet support HS 2 -pub async fn send_high_qc( - event_sender: &Sender>>, - leader: TYPES::SignatureKey, - view: TYPES::View, - consensus: OuterConsensus, - upgrade_lock: &UpgradeLock, -) -> Result<()> { - let version = upgrade_lock.version(view).await?; - ensure!( - version >= V::Epochs::VERSION, - debug!("HotStuff 2 updgrade not yet in effect") - ); - let high_qc = consensus.read().await.high_qc().clone(); - broadcast_event( - Arc::new(HotShotEvent::HighQcSend(high_qc, leader)), - event_sender, - ) - .await; - Ok(()) -} - /// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if not. #[instrument(skip_all)] #[allow(clippy::too_many_arguments)] From fa293eb5223c0067884ba38472cf93c24bad2347 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Mon, 11 Nov 2024 17:24:26 -0500 Subject: [PATCH 04/18] wait for highest qc in the proposal task --- .../src/quorum_proposal/handlers.rs | 92 +++++++++++++++---- crates/task-impls/src/quorum_proposal/mod.rs | 3 +- 2 files changed, 75 insertions(+), 20 deletions(-) diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index 1add742a7a..27483813f1 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -7,12 +7,22 @@ //! This module holds the dependency task for the QuorumProposalTask. It is spawned whenever an event that could //! initiate a proposal occurs. -use std::{marker::PhantomData, sync::Arc}; +use std::{ + marker::PhantomData, + sync::Arc, + time::{Duration, Instant}, +}; +use crate::{ + events::HotShotEvent, + helpers::{broadcast_event, parent_leaf_and_state}, + quorum_proposal::{UpgradeLock, Versions}, +}; use anyhow::{ensure, Context, Result}; -use async_broadcast::{InactiveReceiver, Sender}; +use async_broadcast::{InactiveReceiver, Receiver, Sender}; use async_lock::RwLock; use hotshot_task::dependency_task::HandleDepOutput; +use hotshot_types::simple_certificate::QuorumCertificate; use hotshot_types::{ consensus::{CommitmentAndMetadata, OuterConsensus}, data::{Leaf, QuorumProposal, VidDisperse, ViewChangeEvidence}, @@ -27,12 +37,6 @@ use tracing::instrument; use utils::anytrace::*; use vbs::version::StaticVersionType; -use crate::{ - events::HotShotEvent, - helpers::{broadcast_event, parent_leaf_and_state}, - quorum_proposal::{UpgradeLock, Versions}, -}; - /// Proposal dependency types. These types represent events that precipitate a proposal. #[derive(PartialEq, Debug)] pub(crate) enum ProposalDependency { @@ -97,9 +101,62 @@ pub struct ProposalDependencyHandle { /// The node's id pub id: u64, + + /// The time this view started + pub view_start_time: Instant, } impl ProposalDependencyHandle { + /// Return the next HighQC we get from the event stream + async fn wait_for_qc_event( + &self, + rx: &mut Receiver>>, + ) -> Option> { + while let Ok(event) = rx.recv_direct().await { + if let HotShotEvent::HighQcRecv(qc, _sender) = event.as_ref() { + return Some(qc.clone()); + } + } + None + } + /// Waits for the ocnfigured timeout for nodes to send HighQC messages to us. We'll + /// then propose with the higest QC from among these proposals. + async fn wait_for_highest_qc(&mut self) -> QuorumCertificate { + let mut highest_qc = self.consensus.read().await.high_qc().clone(); + // If we haven't upgraded to Hotstuff 2 just return the high qc right away + if self + .upgrade_lock + .version(self.view_number) + .await + .is_ok_and(|version| version < V::Epochs::VERSION) + { + return highest_qc; + } + let mut rx = self.receiver.activate_cloned(); + + // TODO configure timeout + while self.view_start_time.elapsed() < Duration::from_secs(1) { + let Some(time_spent) = self.view_start_time.checked_duration_since(Instant::now()) + else { + return highest_qc; + }; + let Some(time_left) = Duration::from_secs(1).checked_sub(time_spent) else { + return highest_qc; + }; + let Ok(maybe_qc) = + tokio::time::timeout(time_left, self.wait_for_qc_event(&mut rx)).await + else { + return highest_qc; + }; + let Some(qc) = maybe_qc else { + continue; + }; + if qc.view_number() > highest_qc.view_number() { + highest_qc = qc; + } + } + highest_qc + } /// Publishes a proposal given the [`CommitmentAndMetadata`], [`VidDisperse`] /// and high qc [`hotshot_types::simple_certificate::QuorumCertificate`], /// with optional [`ViewChangeEvidence`]. @@ -111,7 +168,7 @@ impl ProposalDependencyHandle { view_change_evidence: Option>, formed_upgrade_certificate: Option>, decided_upgrade_certificate: Arc>>>, - parent_view_number: TYPES::View, + parent_qc: QuorumCertificate, ) -> Result<()> { let (parent_leaf, state) = parent_leaf_and_state( self.view_number, @@ -122,7 +179,7 @@ impl ProposalDependencyHandle { self.private_key.clone(), OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)), &self.upgrade_lock, - parent_view_number, + parent_qc.view_number(), ) .await?; @@ -166,8 +223,6 @@ impl ProposalDependencyHandle { let version = self.upgrade_lock.version(self.view_number).await?; - let high_qc = self.consensus.read().await.high_qc().clone(); - let builder_commitment = commitment_and_metadata.builder_commitment.clone(); let metadata = commitment_and_metadata.metadata.clone(); @@ -217,7 +272,7 @@ impl ProposalDependencyHandle { let proposal = QuorumProposal { block_header, view_number: self.view_number, - justify_qc: high_qc, + justify_qc: parent_qc, upgrade_certificate, proposal_certificate, }; @@ -262,12 +317,12 @@ impl HandleDepOutput for ProposalDependencyHandle< type Output = Vec>>>>; #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)] - async fn handle_dep_result(self, res: Self::Output) { + async fn handle_dep_result(mut self, res: Self::Output) { let mut commit_and_metadata: Option> = None; let mut timeout_certificate = None; let mut view_sync_finalize_cert = None; let mut vid_share = None; - let mut parent_view_number = None; + let mut parent_qc = None; for event in res.iter().flatten().flatten() { match event.as_ref() { HotShotEvent::SendPayloadCommitmentAndMetadata( @@ -293,7 +348,7 @@ impl HandleDepOutput for ProposalDependencyHandle< } either::Left(qc) => { // Handled by the UpdateHighQc event. - parent_view_number = Some(qc.view_number()); + parent_qc = Some(qc.clone()); } }, HotShotEvent::ViewSyncFinalizeCertificate2Recv(cert) => { @@ -306,8 +361,7 @@ impl HandleDepOutput for ProposalDependencyHandle< } } - let parent_view_number = - parent_view_number.unwrap_or(self.consensus.read().await.high_qc().view_number()); + let parent_qc = parent_qc.unwrap_or(self.wait_for_highest_qc().await); if commit_and_metadata.is_none() { tracing::error!( @@ -334,7 +388,7 @@ impl HandleDepOutput for ProposalDependencyHandle< proposal_cert, self.formed_upgrade_certificate.clone(), Arc::clone(&self.upgrade_lock.decided_upgrade_certificate), - parent_view_number, + parent_qc, ) .await { diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs index 1cc857f335..f06014c05b 100644 --- a/crates/task-impls/src/quorum_proposal/mod.rs +++ b/crates/task-impls/src/quorum_proposal/mod.rs @@ -4,7 +4,7 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc, time::Instant}; use async_broadcast::{Receiver, Sender}; use async_lock::RwLock; @@ -328,6 +328,7 @@ impl, V: Versions> formed_upgrade_certificate: self.formed_upgrade_certificate.clone(), upgrade_lock: self.upgrade_lock.clone(), id: self.id, + view_start_time: Instant::now(), }, ); self.proposal_dependencies From 7655ca999a80099019d9045255b1364390e883bd Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Tue, 12 Nov 2024 16:27:51 -0500 Subject: [PATCH 05/18] fn stub --- crates/task-impls/src/helpers.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 8fbfd6251d..fcf080a71d 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -195,6 +195,14 @@ impl Default for LeafChainTraversalOutcome { } } +pub async fn decide_from_proposal_2( + proposal: &QuorumProposal, + consensus: OuterConsensus, + existing_upgrade_cert: Arc>>>, + public_key: &TYPES::SignatureKey, +) -> LeafChainTraversalOutcome { +} + /// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin /// by obtaining the parent view, and if we are in a chain (i.e. the next view from the parent is /// one view newer), then we begin attempting to form the chain. This is a direct impl from From 09f6f2f955662a0c9147b5abc6e9b066cbdc0045 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Tue, 12 Nov 2024 21:14:27 -0500 Subject: [PATCH 06/18] working on fn --- crates/task-impls/src/helpers.rs | 10 ++++++++- crates/types/src/consensus.rs | 37 ++++++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index fab0f53de4..ac99c04005 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -14,7 +14,7 @@ use async_lock::RwLock; use committable::{Commitment, Committable}; use hotshot_task::dependency::{Dependency, EventDependency}; use hotshot_types::{ - consensus::OuterConsensus, + consensus::{self, OuterConsensus}, data::{Leaf, QuorumProposal, ViewChangeEvidence}, event::{Event, EventType, LeafInfo}, message::{Proposal, UpgradeLock}, @@ -201,6 +201,14 @@ pub async fn decide_from_proposal_2( existing_upgrade_cert: Arc>>>, public_key: &TYPES::SignatureKey, ) -> LeafChainTraversalOutcome { + let mut res = LeafChainTraversalOutcome::default(); + let consensus_reader = consensus.read().await; + let proposed_leaf = Leaf::from_quorum_proposal(proposal); + let Some(parent_info) = consensus_reader.parent_leaf_info(&proposed_leaf, public_key) else { + return res; + }; + + res } /// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 72964d3705..a83c2a63b4 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -23,7 +23,7 @@ pub use crate::utils::{View, ViewInner}; use crate::{ data::{Leaf, QuorumProposal, VidDisperse, VidDisperseShare}, error::HotShotError, - event::HotShotAction, + event::{HotShotAction, LeafInfo}, message::{Proposal, UpgradeLock}, simple_certificate::{DaCertificate, QuorumCertificate}, traits::{ @@ -33,7 +33,10 @@ use crate::{ signature_key::SignatureKey, BlockPayload, ValidatedState, }, - utils::{BuilderCommitment, StateAndDelta, Terminator, Terminator::Inclusive}, + utils::{ + BuilderCommitment, StateAndDelta, + Terminator::{self, Inclusive}, + }, vid::VidCommitment, vote::{Certificate, HasViewNumber}, }; @@ -495,6 +498,36 @@ impl Consensus { Ok(()) } + /// Get the parent Leaf Info from a given leaf and our public key. + /// Returns None if we don't have the data in out state + pub fn parent_leaf_info( + &self, + leaf: &Leaf, + public_key: &TYPES::SignatureKey, + ) -> Option> { + let parent_view_number = leaf.justify_qc().view_number(); + let parent_leaf = self + .saved_leaves + .get(&leaf.justify_qc().data().leaf_commit)?; + let parent_state_and_delta = self.state_and_delta(parent_view_number); + let (Some(state), delta) = parent_state_and_delta else { + return None; + }; + let parent_vid = self + .vid_shares() + .get(&parent_view_number)? + .get(public_key) + .cloned() + .map(|prop| prop.data); + + Some(LeafInfo { + leaf: parent_leaf.clone(), + state, + delta, + vid_share: parent_vid, + }) + } + /// Update the current epoch. /// # Errors /// Can return an error when the new epoch_number is not higher than the existing epoch number. From 80d33d8598f5b56658659173a6e150547cc33281 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Tue, 12 Nov 2024 22:12:40 -0500 Subject: [PATCH 07/18] new decide rule and locked qc rule for hs2 --- crates/task-impls/src/helpers.rs | 69 ++++++++++++++++++- .../src/quorum_proposal_recv/handlers.rs | 12 +++- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 2fa60e86f5..3d44afdb1c 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -14,7 +14,7 @@ use async_lock::RwLock; use committable::{Commitment, Committable}; use hotshot_task::dependency::{Dependency, EventDependency}; use hotshot_types::{ - consensus::{self, OuterConsensus}, + consensus::OuterConsensus, data::{Leaf, QuorumProposal, ViewChangeEvidence}, event::{Event, EventType, LeafInfo}, message::{Proposal, UpgradeLock}, @@ -191,6 +191,10 @@ impl Default for LeafChainTraversalOutcome { } } +/// calculate the new decided leaf chain based on the rules of hostuff 2 +/// +/// # Panics +/// Can't actually panic pub async fn decide_from_proposal_2( proposal: &QuorumProposal, consensus: OuterConsensus, @@ -200,9 +204,72 @@ pub async fn decide_from_proposal_2( let mut res = LeafChainTraversalOutcome::default(); let consensus_reader = consensus.read().await; let proposed_leaf = Leaf::from_quorum_proposal(proposal); + res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number()); + + // If we don't have the proposals parent return early let Some(parent_info) = consensus_reader.parent_leaf_info(&proposed_leaf, public_key) else { return res; }; + // Get the parents parent and check if it's consecutive in view to the parent, if so we can decided + // the grandparents view. If not we're done. + let Some(grand_parent_info) = consensus_reader.parent_leaf_info(&parent_info.leaf, public_key) + else { + return res; + }; + if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() { + return res; + } + res.new_decide_qc = Some(parent_info.leaf.justify_qc().clone()); + let decided_view_number = grand_parent_info.leaf.view_number(); + res.new_decided_view_number = Some(decided_view_number); + // We've reached decide, now get the leaf chain all the way back to the last decided view, not including it. + let old_anchor_view = consensus_reader.last_decided_view(); + let mut current_leaf_info = Some(grand_parent_info); + let existing_upgrade_cert_reader = existing_upgrade_cert.read().await; + let mut txns = HashSet::new(); + while current_leaf_info + .as_ref() + .is_some_and(|info| info.leaf.view_number() > old_anchor_view) + { + // unwrap is safe, we just checked that he option is some + let info = &mut current_leaf_info.unwrap(); + // Check if there's a new upgrade certificate available. + if let Some(cert) = info.leaf.upgrade_certificate() { + if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader { + if cert.data.decide_by < decided_view_number { + tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring."); + } else { + tracing::info!("Reached decide on upgrade certificate: {:?}", cert); + res.decided_upgrade_cert = Some(cert.clone()); + } + } + } + + res.leaf_views.push(info.clone()); + // If the block payload is available for this leaf, include it in + // the leaf chain that we send to the client. + if let Some(encoded_txns) = consensus_reader + .saved_payloads() + .get(&info.leaf.view_number()) + { + let payload = + BlockPayload::from_bytes(encoded_txns, info.leaf.block_header().metadata()); + + info.leaf.fill_block_payload_unchecked(payload); + } + + if let Some(ref payload) = info.leaf.block_payload() { + for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) { + txns.insert(txn); + } + } + + current_leaf_info = consensus_reader.parent_leaf_info(&info.leaf, public_key); + } + + if !txns.is_empty() { + res.included_txns = Some(txns); + } res } diff --git a/crates/task-impls/src/quorum_proposal_recv/handlers.rs b/crates/task-impls/src/quorum_proposal_recv/handlers.rs index 1a9479e1ad..615215c322 100644 --- a/crates/task-impls/src/quorum_proposal_recv/handlers.rs +++ b/crates/task-impls/src/quorum_proposal_recv/handlers.rs @@ -39,7 +39,7 @@ use crate::{ }, quorum_proposal_recv::{UpgradeLock, Versions}, }; - +use vbs::version::StaticVersionType; /// Update states in the event that the parent state is not found for a given `proposal`. #[instrument(skip_all)] async fn validate_proposal_liveness, V: Versions>( @@ -76,6 +76,16 @@ async fn validate_proposal_liveness consensus_writer.locked_view(); + // if we are using HS2 we update our locked view for any QC from a leader greater than our current lock + if liveness_check + && validation_info + .upgrade_lock + .version(leaf.view_number()) + .await + .is_ok_and(|v| v >= V::Epochs::VERSION) + { + consensus_writer.update_locked_view(proposal.data.justify_qc.clone().view_number())?; + } drop(consensus_writer); From 4f6c364e061c434237eb9f30cbef953de3e2e3b3 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 13 Nov 2024 14:46:30 -0500 Subject: [PATCH 08/18] Merge main --- crates/example-types/src/storage_types.rs | 5 + crates/hotshot/src/lib.rs | 15 ++- crates/hotshot/src/tasks/task_state.rs | 3 + crates/task-impls/src/consensus/handlers.rs | 7 +- crates/task-impls/src/consensus/mod.rs | 11 +- crates/task-impls/src/da.rs | 9 +- crates/task-impls/src/events.rs | 11 +- crates/task-impls/src/network.rs | 5 +- crates/task-impls/src/quorum_proposal/mod.rs | 2 +- .../src/quorum_proposal_recv/handlers.rs | 30 ++++- .../src/quorum_proposal_recv/mod.rs | 28 +++-- crates/task-impls/src/quorum_vote/mod.rs | 119 +++++++++--------- crates/task-impls/src/request.rs | 2 +- crates/task-impls/src/transactions.rs | 7 +- crates/task-impls/src/upgrade.rs | 7 +- crates/task-impls/src/vid.rs | 7 +- crates/task-impls/src/view_sync.rs | 17 ++- crates/testing/src/predicates/event.rs | 2 +- crates/testing/src/spinning_task.rs | 2 + crates/testing/tests/tests_1/da_task.rs | 8 +- .../tests_1/quorum_proposal_recv_task.rs | 5 +- .../testing/tests/tests_1/quorum_vote_task.rs | 4 +- .../testing/tests/tests_1/transaction_task.rs | 7 +- .../tests/tests_1/upgrade_task_with_vote.rs | 10 +- crates/testing/tests/tests_1/vid_task.rs | 4 +- .../testing/tests/tests_1/view_sync_task.rs | 6 +- .../tests/tests_1/vote_dependency_handle.rs | 7 +- crates/types/src/consensus.rs | 118 ++++++++--------- crates/types/src/utils.rs | 14 ++- 29 files changed, 289 insertions(+), 183 deletions(-) diff --git a/crates/example-types/src/storage_types.rs b/crates/example-types/src/storage_types.rs index e93a9b3e72..c4be058fe4 100644 --- a/crates/example-types/src/storage_types.rs +++ b/crates/example-types/src/storage_types.rs @@ -42,6 +42,7 @@ pub struct TestStorageState { proposals: BTreeMap>>, high_qc: Option>, action: TYPES::View, + epoch: TYPES::Epoch, } impl Default for TestStorageState { @@ -52,6 +53,7 @@ impl Default for TestStorageState { proposals: BTreeMap::new(), high_qc: None, action: TYPES::View::genesis(), + epoch: TYPES::Epoch::genesis(), } } } @@ -101,6 +103,9 @@ impl TestStorage { pub async fn last_actioned_view(&self) -> TYPES::View { self.inner.read().await.action } + pub async fn last_actioned_epoch(&self) -> TYPES::Epoch { + self.inner.read().await.epoch + } } #[async_trait] diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index b507a0a129..05ccea0a0c 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -128,6 +128,9 @@ pub struct SystemContext, V: Versi /// The view to enter when first starting consensus start_view: TYPES::View, + /// The epoch to enter when first starting consensus + start_epoch: TYPES::Epoch, + /// Access to the output event stream. output_event_stream: (Sender>, InactiveReceiver>), @@ -171,6 +174,7 @@ impl, V: Versions> Clone consensus: self.consensus.clone(), instance_state: Arc::clone(&self.instance_state), start_view: self.start_view, + start_epoch: self.start_epoch, output_event_stream: self.output_event_stream.clone(), external_event_stream: self.external_event_stream.clone(), anchored_leaf: self.anchored_leaf.clone(), @@ -345,6 +349,7 @@ impl, V: Versions> SystemContext, V: Versions> SystemContext { /// Starting view number that should be equivelant to the view the node shut down with last. start_view: TYPES::View, + /// Starting epoch number that should be equivelant to the epoch the node shut down with last. + start_epoch: TYPES::Epoch, /// The view we last performed an action in. An action is Proposing or voting for /// Either the quorum or DA. actioned_view: TYPES::View, @@ -1000,6 +1010,7 @@ impl HotShotInitializer { validated_state: Some(Arc::new(validated_state)), state_delta: Some(Arc::new(state_delta)), start_view: TYPES::View::new(0), + start_epoch: TYPES::Epoch::new(0), actioned_view: TYPES::View::new(0), saved_proposals: BTreeMap::new(), high_qc, @@ -1023,6 +1034,7 @@ impl HotShotInitializer { instance_state: TYPES::InstanceState, validated_state: Option>, start_view: TYPES::View, + start_epoch: TYPES::Epoch, actioned_view: TYPES::View, saved_proposals: BTreeMap>>, high_qc: QuorumCertificate, @@ -1036,6 +1048,7 @@ impl HotShotInitializer { validated_state, state_delta: None, start_view, + start_epoch, actioned_view, saved_proposals, high_qc, diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index de7e0b8739..09bc3b0eb4 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -241,6 +241,7 @@ impl, V: Versions> CreateTaskState id: handle.hotshot.id, storage: Arc::clone(&handle.storage), upgrade_lock: handle.hotshot.upgrade_lock.clone(), + epoch_height: handle.hotshot.config.epoch_height, } } } @@ -293,6 +294,7 @@ impl, V: Versions> CreateTaskState spawned_tasks: BTreeMap::new(), id: handle.hotshot.id, upgrade_lock: handle.hotshot.upgrade_lock.clone(), + epoch_height: handle.hotshot.config.epoch_height, } } } @@ -324,6 +326,7 @@ impl, V: Versions> CreateTaskState consensus: OuterConsensus::new(consensus), id: handle.hotshot.id, upgrade_lock: handle.hotshot.upgrade_lock.clone(), + epoch_height: handle.hotshot.config.epoch_height, } } } diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs index c0bde143ac..0bb75d800b 100644 --- a/crates/task-impls/src/consensus/handlers.rs +++ b/crates/task-impls/src/consensus/handlers.rs @@ -138,9 +138,15 @@ pub(crate) async fn handle_view_change< V: Versions, >( new_view_number: TYPES::View, + epoch_number: TYPES::Epoch, sender: &Sender>>, task_state: &mut ConsensusTaskState, ) -> Result<()> { + if epoch_number > task_state.cur_epoch { + task_state.cur_epoch = epoch_number; + tracing::info!("Progress: entered epoch {:>6}", *epoch_number); + } + ensure!( new_view_number > task_state.cur_view, "New view is not larger than the current view" @@ -163,7 +169,6 @@ pub(crate) async fn handle_view_change< // Move this node to the next view task_state.cur_view = new_view_number; - task_state .consensus .write() diff --git a/crates/task-impls/src/consensus/mod.rs b/crates/task-impls/src/consensus/mod.rs index e4d75057ac..15d6bc6ec8 100644 --- a/crates/task-impls/src/consensus/mod.rs +++ b/crates/task-impls/src/consensus/mod.rs @@ -92,10 +92,13 @@ pub struct ConsensusTaskState, V: /// Lock for a decided upgrade pub upgrade_lock: UpgradeLock, + + /// Number of blocks in an epoch, zero means there are no epochs + pub epoch_height: u64, } impl, V: Versions> ConsensusTaskState { /// Handles a consensus event received on the event stream - #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")] + #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = *self.cur_epoch), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")] pub async fn handle( &mut self, event: Arc>, @@ -116,8 +119,10 @@ impl, V: Versions> ConsensusTaskSt tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}"); } } - HotShotEvent::ViewChange(new_view_number) => { - if let Err(e) = handle_view_change(*new_view_number, &sender, self).await { + HotShotEvent::ViewChange(new_view_number, epoch_number) => { + if let Err(e) = + handle_view_change(*new_view_number, *epoch_number, &sender, self).await + { tracing::trace!("Failed to handle ViewChange event; error = {e}"); } } diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index 33e36c04f0..d0182460f2 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -84,7 +84,7 @@ pub struct DaTaskState, V: Version impl, V: Versions> DaTaskState { /// main task event handler - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "DA Main Task", level = "error", target = "DaTaskState")] + #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "DA Main Task", level = "error", target = "DaTaskState")] pub async fn handle( &mut self, event: Arc>, @@ -285,9 +285,12 @@ impl, V: Versions> DaTaskState { - let view = *view; + HotShotEvent::ViewChange(view, epoch) => { + if *epoch > self.cur_epoch { + self.cur_epoch = *epoch; + } + let view = *view; ensure!( *self.cur_view < *view, info!("Received a view change to an older view.") diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index 98ecd05851..80b5b34c61 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -122,7 +122,7 @@ pub enum HotShotEvent { /// The DA leader has collected enough votes to form a DAC; emitted by the DA leader in the DA task; sent to the entire network via the networking task DacSend(DaCertificate, TYPES::SignatureKey), /// The current view has changed; emitted by the replica in the consensus task or replica in the view sync task; received by almost all other tasks - ViewChange(TYPES::View), + ViewChange(TYPES::View, TYPES::Epoch), /// Timeout for the view sync protocol; emitted by a replica in the view sync task ViewSyncTimeout(TYPES::View, u64, ViewSyncPhase), @@ -300,7 +300,7 @@ impl HotShotEvent { } HotShotEvent::QuorumProposalRequestSend(req, _) | HotShotEvent::QuorumProposalRequestRecv(req, _) => Some(req.view_number), - HotShotEvent::ViewChange(view_number) + HotShotEvent::ViewChange(view_number, _) | HotShotEvent::ViewSyncTimeout(view_number, _, _) | HotShotEvent::ViewSyncTrigger(view_number) | HotShotEvent::Timeout(view_number) => Some(*view_number), @@ -388,8 +388,11 @@ impl Display for HotShotEvent { HotShotEvent::DacSend(cert, _) => { write!(f, "DacSend(view_number={:?})", cert.view_number()) } - HotShotEvent::ViewChange(view_number) => { - write!(f, "ViewChange(view_number={view_number:?})") + HotShotEvent::ViewChange(view_number, epoch_number) => { + write!( + f, + "ViewChange(view_number={view_number:?}, epoch_number={epoch_number:?})" + ) } HotShotEvent::ViewSyncTimeout(view_number, _, _) => { write!(f, "ViewSyncTimeout(view_number={view_number:?})") diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 99e64f63b7..79922d44f4 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -610,8 +610,11 @@ impl< TransmitType::Direct(leader), )) } - HotShotEvent::ViewChange(view) => { + HotShotEvent::ViewChange(view, epoch) => { self.view = view; + if epoch > self.epoch { + self.epoch = epoch; + } self.cancel_tasks(view); let net = Arc::clone(&self.network); let epoch = self.epoch.u64(); diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs index 5252dd9927..5790e359fb 100644 --- a/crates/task-impls/src/quorum_proposal/mod.rs +++ b/crates/task-impls/src/quorum_proposal/mod.rs @@ -504,7 +504,7 @@ impl, V: Versions> Arc::clone(&event), )?; } - HotShotEvent::ViewChange(view) | HotShotEvent::Timeout(view) => { + HotShotEvent::ViewChange(view, _) | HotShotEvent::Timeout(view) => { self.cancel_tasks(*view); } _ => {} diff --git a/crates/task-impls/src/quorum_proposal_recv/handlers.rs b/crates/task-impls/src/quorum_proposal_recv/handlers.rs index 615215c322..d7ce8aefc0 100644 --- a/crates/task-impls/src/quorum_proposal_recv/handlers.rs +++ b/crates/task-impls/src/quorum_proposal_recv/handlers.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use async_broadcast::{broadcast, Receiver, Sender}; use async_lock::RwLockUpgradableReadGuard; use committable::Committable; +use hotshot_types::traits::block_contents::BlockHeader; use hotshot_types::{ consensus::OuterConsensus, data::{Leaf, QuorumProposal}, @@ -18,12 +19,12 @@ use hotshot_types::{ simple_certificate::QuorumCertificate, traits::{ election::Membership, - node_implementation::{NodeImplementation, NodeType}, + node_implementation::{ConsensusTime, NodeImplementation, NodeType}, signature_key::SignatureKey, storage::Storage, ValidatedState, }, - utils::{View, ViewInner}, + utils::{epoch_from_block_number, View, ViewInner}, vote::{Certificate, HasViewNumber}, }; use tokio::spawn; @@ -237,8 +238,18 @@ pub(crate) async fn handle_quorum_proposal_recv< justify_qc.data.leaf_commit ); validate_proposal_liveness(proposal, &validation_info).await?; + let block_number = proposal.data.block_header.block_number(); + let epoch = TYPES::Epoch::new(epoch_from_block_number( + block_number, + validation_info.epoch_height, + )); + tracing::trace!( + "Sending ViewChange for view {} and epoch {}", + view_number, + *epoch + ); broadcast_event( - Arc::new(HotShotEvent::ViewChange(view_number)), + Arc::new(HotShotEvent::ViewChange(view_number, epoch)), event_sender, ) .await; @@ -254,8 +265,19 @@ pub(crate) async fn handle_quorum_proposal_recv< quorum_proposal_sender_key, ) .await?; + + let epoch_number = TYPES::Epoch::new(epoch_from_block_number( + proposal.data.block_header.block_number(), + validation_info.epoch_height, + )); + + tracing::trace!( + "Sending ViewChange for view {} and epoch {}", + view_number, + *epoch_number + ); broadcast_event( - Arc::new(HotShotEvent::ViewChange(view_number)), + Arc::new(HotShotEvent::ViewChange(view_number, epoch_number)), event_sender, ) .await; diff --git a/crates/task-impls/src/quorum_proposal_recv/mod.rs b/crates/task-impls/src/quorum_proposal_recv/mod.rs index de584403d9..332ecaf241 100644 --- a/crates/task-impls/src/quorum_proposal_recv/mod.rs +++ b/crates/task-impls/src/quorum_proposal_recv/mod.rs @@ -8,14 +8,20 @@ use std::{collections::BTreeMap, sync::Arc}; +use self::handlers::handle_quorum_proposal_recv; +use crate::{ + events::{HotShotEvent, ProposalMissing}, + helpers::{broadcast_event, fetch_proposal}, +}; use async_broadcast::{broadcast, Receiver, Sender}; use async_lock::RwLock; use async_trait::async_trait; -use futures::future::join_all; +use either::Either; +use futures::future::{err, join_all}; use hotshot_task::task::{Task, TaskState}; use hotshot_types::{ consensus::{Consensus, OuterConsensus}, - data::{Leaf, ViewChangeEvidence}, + data::{EpochNumber, Leaf, ViewChangeEvidence}, event::Event, message::UpgradeLock, simple_certificate::UpgradeCertificate, @@ -23,15 +29,12 @@ use hotshot_types::{ node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions}, signature_key::SignatureKey, }, - vote::HasViewNumber, + vote::{Certificate, HasViewNumber}, }; use tokio::task::JoinHandle; use tracing::{debug, error, info, instrument, warn}; use utils::anytrace::{bail, Result}; use vbs::version::Version; - -use self::handlers::handle_quorum_proposal_recv; -use crate::events::{HotShotEvent, ProposalMissing}; /// Event handlers for this task. mod handlers; @@ -74,6 +77,9 @@ pub struct QuorumProposalRecvTaskState, + + /// Number of blocks in an epoch, zero means there are no epochs + pub epoch_height: u64, } /// all the info we need to validate a proposal. This makes it easy to spawn an effemeral task to @@ -97,6 +103,8 @@ pub(crate) struct ValidationInfo, pub(crate) storage: Arc>, /// Lock for a decided upgrade pub(crate) upgrade_lock: UpgradeLock, + /// Number of blocks in an epoch, zero means there are no epochs + pub epoch_height: u64, } impl, V: Versions> @@ -114,7 +122,7 @@ impl, V: Versions> } /// Handles all consensus events relating to propose and vote-enabling events. - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Consensus replica task", level = "error")] + #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "Consensus replica task", level = "error")] #[allow(unused_variables)] pub async fn handle( &mut self, @@ -140,6 +148,7 @@ impl, V: Versions> output_event_stream: self.output_event_stream.clone(), storage: Arc::clone(&self.storage), upgrade_lock: self.upgrade_lock.clone(), + epoch_height: self.epoch_height, }; match handle_quorum_proposal_recv( proposal, @@ -154,7 +163,10 @@ impl, V: Versions> Err(e) => debug!(?e, "Failed to validate the proposal"), } } - HotShotEvent::ViewChange(view) => { + HotShotEvent::ViewChange(view, epoch) => { + if *epoch > self.cur_epoch { + self.cur_epoch = *epoch; + } if self.cur_view >= *view { return; } diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs index 48d4c5997d..053ef851f9 100644 --- a/crates/task-impls/src/quorum_vote/mod.rs +++ b/crates/task-impls/src/quorum_vote/mod.rs @@ -6,6 +6,11 @@ use std::{collections::BTreeMap, sync::Arc}; +use crate::{ + events::HotShotEvent, + helpers::broadcast_event, + quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state}, +}; use async_broadcast::{InactiveReceiver, Receiver, Sender}; use async_lock::RwLock; use async_trait::async_trait; @@ -26,6 +31,7 @@ use hotshot_types::{ signature_key::SignatureKey, storage::Storage, }, + utils::epoch_from_block_number, vid::vid_scheme, vote::{Certificate, HasViewNumber}, }; @@ -35,12 +41,6 @@ use tracing::instrument; use utils::anytrace::*; use vbs::version::StaticVersionType; -use crate::{ - events::HotShotEvent, - helpers::broadcast_event, - quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state}, -}; - /// Event handlers for `QuorumProposalValidated`. mod handlers; @@ -71,8 +71,6 @@ pub struct VoteDependencyHandle, V pub storage: Arc>, /// View number to vote on. pub view_number: TYPES::View, - /// Epoch number to vote on. - pub epoch_number: TYPES::Epoch, /// Event sender. pub sender: Sender>>, /// Event receiver. @@ -81,6 +79,8 @@ pub struct VoteDependencyHandle, V pub upgrade_lock: UpgradeLock, /// The node's id pub id: u64, + /// Number of blocks in an epoch, zero means there are no epochs + pub epoch_height: u64, } impl + 'static, V: Versions> HandleDepOutput @@ -115,7 +115,7 @@ impl + 'static, V: Versions> Handl .consensus .read() .await - .is_qc_forming_eqc(&proposal.data.justify_qc) + .is_leaf_forming_eqc(proposal.data.justify_qc.data.leaf_commit) { tracing::debug!("Do not vote here. Voting for this case is handled in QuorumVoteTaskState"); return; @@ -168,11 +168,6 @@ impl + 'static, V: Versions> Handl } } - broadcast_event( - Arc::new(HotShotEvent::ViewChange(self.view_number + 1)), - &self.sender, - ) - .await; let Some(vid_share) = vid_share else { tracing::error!( "We don't have the VID share for this view {:?}, but we should, because the vote dependencies have completed.", @@ -211,6 +206,22 @@ impl + 'static, V: Versions> Handl return; } + let current_epoch = + TYPES::Epoch::new(epoch_from_block_number(leaf.height(), self.epoch_height)); + tracing::trace!( + "Sending ViewChange for view {} and epoch {}", + self.view_number + 1, + *current_epoch + ); + broadcast_event( + Arc::new(HotShotEvent::ViewChange( + self.view_number + 1, + current_epoch, + )), + &self.sender, + ) + .await; + if let Err(e) = submit_vote::( self.sender.clone(), Arc::clone(&self.quorum_membership), @@ -218,7 +229,7 @@ impl + 'static, V: Versions> Handl self.private_key.clone(), self.upgrade_lock.clone(), self.view_number, - self.epoch_number, + current_epoch, Arc::clone(&self.storage), leaf, vid_share, @@ -272,6 +283,9 @@ pub struct QuorumVoteTaskState, V: /// Lock for a decided upgrade pub upgrade_lock: UpgradeLock, + + /// Number of blocks in an epoch, zero means there are no epochs + pub epoch_height: u64, } impl, V: Versions> QuorumVoteTaskState { @@ -325,7 +339,6 @@ impl, V: Versions> QuorumVoteTaskS fn create_dependency_task_if_new( &mut self, view_number: TYPES::View, - epoch_number: TYPES::Epoch, event_receiver: Receiver>>, event_sender: &Sender>>, event: Option>>, @@ -364,11 +377,11 @@ impl, V: Versions> QuorumVoteTaskS quorum_membership: Arc::clone(&self.quorum_membership), storage: Arc::clone(&self.storage), view_number, - epoch_number, sender: event_sender.clone(), receiver: event_receiver.clone().deactivate(), upgrade_lock: self.upgrade_lock.clone(), id: self.id, + epoch_height: self.epoch_height, }, ); self.vote_dependencies @@ -432,25 +445,18 @@ impl, V: Versions> QuorumVoteTaskS .version(proposal.data.view_number()) .await?; - let consensus_reader = self.consensus.read().await; - let cur_epoch = consensus_reader.cur_epoch(); - let is_qc_forming_eqc = - consensus_reader.is_qc_forming_eqc(&proposal.data.justify_qc); - drop(consensus_reader); - - if version >= V::Epochs::VERSION && is_qc_forming_eqc { - self.handle_eqc_voting( - proposal, - parent_leaf, - event_sender, - event_receiver, - cur_epoch, - ) - .await; + let is_justify_qc_forming_eqc = self + .consensus + .read() + .await + .is_leaf_forming_eqc(proposal.data.justify_qc.data.leaf_commit); + + if version >= V::Epochs::VERSION && is_justify_qc_forming_eqc { + self.handle_eqc_voting(proposal, parent_leaf, event_sender, event_receiver) + .await; } else { self.create_dependency_task_if_new( proposal.data.view_number, - cur_epoch, event_receiver, &event_sender, Some(Arc::clone(&event)), @@ -486,13 +492,7 @@ impl, V: Versions> QuorumVoteTaskS &event_sender.clone(), ) .await; - self.create_dependency_task_if_new( - view, - cur_epoch, - event_receiver, - &event_sender, - None, - ); + self.create_dependency_task_if_new(view, event_receiver, &event_sender, None); } HotShotEvent::VidShareRecv(sender, disperse) => { let view = disperse.data.view_number(); @@ -550,13 +550,7 @@ impl, V: Versions> QuorumVoteTaskS &event_sender.clone(), ) .await; - self.create_dependency_task_if_new( - view, - cur_epoch, - event_receiver, - &event_sender, - None, - ); + self.create_dependency_task_if_new(view, event_receiver, &event_sender, None); } HotShotEvent::Timeout(view) => { let view = TYPES::View::new(view.saturating_sub(1)); @@ -567,7 +561,7 @@ impl, V: Versions> QuorumVoteTaskS } self.vote_dependencies = current_tasks; } - HotShotEvent::ViewChange(mut view) => { + HotShotEvent::ViewChange(mut view, _) => { view = TYPES::View::new(view.saturating_sub(1)); if !self.update_latest_voted_view(view).await { tracing::debug!("view not updated"); @@ -585,13 +579,13 @@ impl, V: Versions> QuorumVoteTaskS } /// Handles voting for the last block in the epoch to form the Extended QC. + #[allow(clippy::too_many_lines)] async fn handle_eqc_voting( &self, proposal: &Proposal>, parent_leaf: &Leaf, event_sender: Sender>>, event_receiver: Receiver>>, - epoch_number: TYPES::Epoch, ) { tracing::info!("Reached end of epoch. Justify QC is for the last block in the epoch."); let proposed_leaf = Leaf::from_quorum_proposal(&proposal.data); @@ -639,12 +633,6 @@ impl, V: Versions> QuorumVoteTaskS return; } - broadcast_event( - Arc::new(HotShotEvent::ViewChange(proposal.data.view_number() + 1)), - &event_sender, - ) - .await; - // Update internal state if let Err(e) = update_shared_state::( OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)), @@ -667,6 +655,25 @@ impl, V: Versions> QuorumVoteTaskS return; } + let current_block_number = proposed_leaf.height(); + let current_epoch = TYPES::Epoch::new(epoch_from_block_number( + current_block_number, + self.epoch_height, + )); + tracing::trace!( + "Sending ViewChange for view {} and epoch {}", + proposal.data.view_number() + 1, + *current_epoch + ); + broadcast_event( + Arc::new(HotShotEvent::ViewChange( + proposal.data.view_number() + 1, + current_epoch, + )), + &event_sender, + ) + .await; + if let Err(e) = submit_vote::( event_sender.clone(), Arc::clone(&self.quorum_membership), @@ -674,7 +681,7 @@ impl, V: Versions> QuorumVoteTaskS self.private_key.clone(), self.upgrade_lock.clone(), proposal.data.view_number(), - epoch_number, + current_epoch, Arc::clone(&self.storage), proposed_leaf, updated_vid, diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs index 23646c447e..1a565a85de 100644 --- a/crates/task-impls/src/request.rs +++ b/crates/task-impls/src/request.rs @@ -112,7 +112,7 @@ impl> TaskState for NetworkRequest } Ok(()) } - HotShotEvent::ViewChange(view) => { + HotShotEvent::ViewChange(view, _) => { let view = *view; if view > self.view { self.view = view; diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs index 5b286bb095..43cdef8bfc 100644 --- a/crates/task-impls/src/transactions.rs +++ b/crates/task-impls/src/transactions.rs @@ -452,7 +452,7 @@ impl, V: Versions> TransactionTask } /// main task event handler - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")] + #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "Transaction task", level = "error", target = "TransactionTaskState")] pub async fn handle( &mut self, event: Arc>, @@ -471,7 +471,10 @@ impl, V: Versions> TransactionTask ) .await; } - HotShotEvent::ViewChange(view) => { + HotShotEvent::ViewChange(view, epoch) => { + if *epoch > self.cur_epoch { + self.cur_epoch = *epoch; + } let view = TYPES::View::new(std::cmp::max(1, **view)); ensure!( *view > *self.cur_view, diff --git a/crates/task-impls/src/upgrade.rs b/crates/task-impls/src/upgrade.rs index 1eb60cf98f..71cd0a7ef0 100644 --- a/crates/task-impls/src/upgrade.rs +++ b/crates/task-impls/src/upgrade.rs @@ -105,7 +105,7 @@ impl, V: Versions> UpgradeTaskStat } /// main task event handler - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Upgrade Task", level = "error")] + #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "Upgrade Task", level = "error")] pub async fn handle( &mut self, event: Arc>, @@ -249,7 +249,10 @@ impl, V: Versions> UpgradeTaskStat ) .await?; } - HotShotEvent::ViewChange(new_view) => { + HotShotEvent::ViewChange(new_view, epoch_number) => { + if *epoch_number > self.cur_epoch { + self.cur_epoch = *epoch_number; + } ensure!(self.cur_view < *new_view || *self.cur_view == 0); self.cur_view = *new_view; diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index ec59451f42..3795577bbb 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -59,7 +59,7 @@ pub struct VidTaskState> { impl> VidTaskState { /// main task event handler - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "VID Main Task", level = "error", target = "VidTaskState")] + #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "VID Main Task", level = "error", target = "VidTaskState")] pub async fn handle( &mut self, event: Arc>, @@ -134,7 +134,7 @@ impl> VidTaskState { .await; } - HotShotEvent::ViewChange(view) => { + HotShotEvent::ViewChange(view, epoch) => { let view = *view; if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view { return None; @@ -144,6 +144,9 @@ impl> VidTaskState { info!("View changed by more than 1 going to view {:?}", view); } self.cur_view = view; + if *epoch > self.cur_epoch { + self.cur_epoch = *epoch; + } return None; } diff --git a/crates/task-impls/src/view_sync.rs b/crates/task-impls/src/view_sync.rs index 26720d28eb..4b661a660a 100644 --- a/crates/task-impls/src/view_sync.rs +++ b/crates/task-impls/src/view_sync.rs @@ -265,7 +265,7 @@ impl, V: Versions> ViewSyncTaskSta task_map.insert(view, replica_state); } - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")] + #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "View Sync Main Task", level = "error")] #[allow(clippy::type_complexity)] /// Handles incoming events for the main view sync task pub async fn handle( @@ -420,7 +420,10 @@ impl, V: Versions> ViewSyncTaskSta } } - &HotShotEvent::ViewChange(new_view) => { + &HotShotEvent::ViewChange(new_view, epoch) => { + if epoch > self.cur_epoch { + self.cur_epoch = epoch; + } let new_view = TYPES::View::new(*new_view); if self.cur_view < new_view { tracing::debug!( @@ -492,7 +495,7 @@ impl, V: Versions> ViewSyncTaskSta // If this is the first timeout we've seen advance to the next view self.cur_view = view_number + 1; broadcast_event( - Arc::new(HotShotEvent::ViewChange(TYPES::View::new(*self.cur_view))), + Arc::new(HotShotEvent::ViewChange(self.cur_view, self.cur_epoch)), &event_stream, ) .await; @@ -508,7 +511,7 @@ impl, V: Versions> ViewSyncTaskSta impl, V: Versions> ViewSyncReplicaTaskState { - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Replica Task", level = "error")] + #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "View Sync Replica Task", level = "error")] /// Handle incoming events for the view sync replica task pub async fn handle( &mut self, @@ -658,8 +661,9 @@ impl, V: Versions> *self.next_view ); + // TODO: Figure out the correct way to view sync across epochs if needed broadcast_event( - Arc::new(HotShotEvent::ViewChange(self.next_view)), + Arc::new(HotShotEvent::ViewChange(self.next_view, self.cur_epoch)), &event_stream, ) .await; @@ -724,8 +728,9 @@ impl, V: Versions> timeout_task.abort(); } + // TODO: Figure out the correct way to view sync across epochs if needed broadcast_event( - Arc::new(HotShotEvent::ViewChange(self.next_view)), + Arc::new(HotShotEvent::ViewChange(self.next_view, self.cur_epoch)), &event_stream, ) .await; diff --git a/crates/testing/src/predicates/event.rs b/crates/testing/src/predicates/event.rs index 6af7275aed..b188338839 100644 --- a/crates/testing/src/predicates/event.rs +++ b/crates/testing/src/predicates/event.rs @@ -153,7 +153,7 @@ where { let info = "ViewChange".to_string(); let check: EventCallback = - Arc::new(move |e: Arc>| matches!(e.as_ref(), ViewChange(_))); + Arc::new(move |e: Arc>| matches!(e.as_ref(), ViewChange(_, _))); Box::new(EventPredicate { check, info }) } diff --git a/crates/testing/src/spinning_task.rs b/crates/testing/src/spinning_task.rs index 5593a4336e..e2a299a171 100644 --- a/crates/testing/src/spinning_task.rs +++ b/crates/testing/src/spinning_task.rs @@ -156,6 +156,7 @@ where TestInstanceState::new(self.async_delay_config.clone()), None, TYPES::View::genesis(), + TYPES::Epoch::genesis(), TYPES::View::genesis(), BTreeMap::new(), self.high_qc.clone(), @@ -238,6 +239,7 @@ where TestInstanceState::new(self.async_delay_config.clone()), None, read_storage.last_actioned_view().await, + read_storage.last_actioned_epoch().await, read_storage.last_actioned_view().await, read_storage.proposals_cloned().await, read_storage.high_qc_cloned().await.unwrap_or( diff --git a/crates/testing/tests/tests_1/da_task.rs b/crates/testing/tests/tests_1/da_task.rs index b89b141108..96a7de771d 100644 --- a/crates/testing/tests/tests_1/da_task.rs +++ b/crates/testing/tests/tests_1/da_task.rs @@ -89,8 +89,8 @@ async fn test_da_task() { let inputs = vec![ serial![ - ViewChange(ViewNumber::new(1)), - ViewChange(ViewNumber::new(2)), + ViewChange(ViewNumber::new(1), EpochNumber::new(1)), + ViewChange(ViewNumber::new(2), EpochNumber::new(1)), BlockRecv(PackedBundle::new( encoded_transactions.clone(), TestMetadata { @@ -189,8 +189,8 @@ async fn test_da_task_storage_failure() { let inputs = vec![ serial![ - ViewChange(ViewNumber::new(1)), - ViewChange(ViewNumber::new(2)), + ViewChange(ViewNumber::new(1), EpochNumber::new(1)), + ViewChange(ViewNumber::new(2), EpochNumber::new(1)), BlockRecv(PackedBundle::new( encoded_transactions.clone(), TestMetadata { diff --git a/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs b/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs index bb96a52b91..dc42dfc723 100644 --- a/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs +++ b/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs @@ -27,6 +27,7 @@ use hotshot_testing::{ serial, view_generator::TestViewGenerator, }; +use hotshot_types::data::EpochNumber; use hotshot_types::{ data::{Leaf, ViewNumber}, request_response::ProposalRequestPayload, @@ -98,7 +99,7 @@ async fn test_quorum_proposal_recv_task() { proposals[1].clone(), leaves[0].clone(), )), - exact(ViewChange(ViewNumber::new(2))), + exact(ViewChange(ViewNumber::new(2), EpochNumber::new(0))), ])]; let state = @@ -192,7 +193,7 @@ async fn test_quorum_proposal_recv_task_liveness_check() { let expectations = vec![Expectations::from_outputs(all_predicates![ exact(QuorumProposalPreliminarilyValidated(proposals[2].clone())), - exact(ViewChange(ViewNumber::new(3))), + exact(ViewChange(ViewNumber::new(3), EpochNumber::new(0))), exact(QuorumProposalRequestSend(req, signature)), ])]; diff --git a/crates/testing/tests/tests_1/quorum_vote_task.rs b/crates/testing/tests/tests_1/quorum_vote_task.rs index 4bc81b829a..b4350abe3c 100644 --- a/crates/testing/tests/tests_1/quorum_vote_task.rs +++ b/crates/testing/tests/tests_1/quorum_vote_task.rs @@ -23,7 +23,7 @@ use hotshot_testing::{ script::{Expectations, InputOrder, TaskScript}, }; use hotshot_types::{ - data::{Leaf, ViewNumber}, + data::{EpochNumber, Leaf, ViewNumber}, traits::node_implementation::ConsensusTime, }; @@ -85,7 +85,7 @@ async fn test_quorum_vote_task_success() { let expectations = vec![Expectations::from_outputs(all_predicates![ exact(DaCertificateValidated(dacs[1].clone())), exact(VidShareValidated(vids[1].0[0].clone())), - exact(ViewChange(ViewNumber::new(3))), + exact(ViewChange(ViewNumber::new(3), EpochNumber::new(0))), quorum_vote_send(), ])]; diff --git a/crates/testing/tests/tests_1/transaction_task.rs b/crates/testing/tests/tests_1/transaction_task.rs index 7205628c87..0a4877fcc1 100644 --- a/crates/testing/tests/tests_1/transaction_task.rs +++ b/crates/testing/tests/tests_1/transaction_task.rs @@ -33,8 +33,11 @@ async fn test_transaction_task_leader_two_views_in_a_row() { let mut output = Vec::new(); let current_view = ViewNumber::new(4); - input.push(HotShotEvent::ViewChange(current_view)); - input.push(HotShotEvent::ViewChange(current_view + 1)); + input.push(HotShotEvent::ViewChange(current_view, EpochNumber::new(1))); + input.push(HotShotEvent::ViewChange( + current_view + 1, + EpochNumber::new(1), + )); input.push(HotShotEvent::Shutdown); let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); diff --git a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs index bc0a17617b..72790bec4d 100644 --- a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs +++ b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs @@ -29,7 +29,7 @@ use hotshot_testing::{ view_generator::TestViewGenerator, }; use hotshot_types::{ - data::{null_block, Leaf, ViewNumber}, + data::{null_block, EpochNumber, Leaf, ViewNumber}, simple_vote::UpgradeProposalData, traits::{election::Membership, node_implementation::ConsensusTime}, vote::HasViewNumber, @@ -134,14 +134,14 @@ async fn test_upgrade_task_with_vote() { Expectations::from_outputs(all_predicates![ exact(DaCertificateValidated(dacs[1].clone())), exact(VidShareValidated(vids[1].0[0].clone())), - exact(ViewChange(ViewNumber::new(3))), + exact(ViewChange(ViewNumber::new(3), EpochNumber::new(0))), quorum_vote_send(), ]), Expectations::from_outputs_and_task_states( all_predicates![ exact(DaCertificateValidated(dacs[2].clone())), exact(VidShareValidated(vids[2].0[0].clone())), - exact(ViewChange(ViewNumber::new(4))), + exact(ViewChange(ViewNumber::new(4), EpochNumber::new(0))), quorum_vote_send(), ], vec![no_decided_upgrade_certificate()], @@ -150,7 +150,7 @@ async fn test_upgrade_task_with_vote() { all_predicates![ exact(DaCertificateValidated(dacs[3].clone())), exact(VidShareValidated(vids[3].0[0].clone())), - exact(ViewChange(ViewNumber::new(5))), + exact(ViewChange(ViewNumber::new(5), EpochNumber::new(0))), quorum_vote_send(), ], vec![no_decided_upgrade_certificate()], @@ -159,7 +159,7 @@ async fn test_upgrade_task_with_vote() { all_predicates![ exact(DaCertificateValidated(dacs[4].clone())), exact(VidShareValidated(vids[4].0[0].clone())), - exact(ViewChange(ViewNumber::new(6))), + exact(ViewChange(ViewNumber::new(6), EpochNumber::new(0))), quorum_vote_send(), ], vec![no_decided_upgrade_certificate()], diff --git a/crates/testing/tests/tests_1/vid_task.rs b/crates/testing/tests/tests_1/vid_task.rs index d8328b98cf..8d8daf4c5e 100644 --- a/crates/testing/tests/tests_1/vid_task.rs +++ b/crates/testing/tests/tests_1/vid_task.rs @@ -100,9 +100,9 @@ async fn test_vid_task() { _pd: PhantomData, }; let inputs = vec![ - serial![ViewChange(ViewNumber::new(1))], + serial![ViewChange(ViewNumber::new(1), EpochNumber::new(1))], serial![ - ViewChange(ViewNumber::new(2)), + ViewChange(ViewNumber::new(2), EpochNumber::new(1)), BlockRecv(PackedBundle::new( encoded_transactions.clone(), TestMetadata { diff --git a/crates/testing/tests/tests_1/view_sync_task.rs b/crates/testing/tests/tests_1/view_sync_task.rs index 74a63cee82..85827dbbdb 100644 --- a/crates/testing/tests/tests_1/view_sync_task.rs +++ b/crates/testing/tests/tests_1/view_sync_task.rs @@ -10,6 +10,7 @@ use hotshot_task_impls::{ events::HotShotEvent, harness::run_harness, view_sync::ViewSyncTaskState, }; use hotshot_testing::helpers::build_system_handle; +use hotshot_types::data::EpochNumber; use hotshot_types::{ data::ViewNumber, simple_vote::ViewSyncPreCommitData, traits::node_implementation::ConsensusTime, @@ -49,7 +50,10 @@ async fn test_view_sync_task() { input.push(HotShotEvent::Shutdown); - output.push(HotShotEvent::ViewChange(ViewNumber::new(3))); + output.push(HotShotEvent::ViewChange( + ViewNumber::new(3), + EpochNumber::new(0), + )); output.push(HotShotEvent::ViewSyncPreCommitVoteSend(vote.clone())); let view_sync_state = diff --git a/crates/testing/tests/tests_1/vote_dependency_handle.rs b/crates/testing/tests/tests_1/vote_dependency_handle.rs index 7839b0d1c0..9a58e4046c 100644 --- a/crates/testing/tests/tests_1/vote_dependency_handle.rs +++ b/crates/testing/tests/tests_1/vote_dependency_handle.rs @@ -78,7 +78,10 @@ async fn test_vote_dependency_handle() { // For each permutation... for inputs in all_inputs.into_iter() { // The outputs are static here, but we re-make them since we use `into_iter` below - let outputs = vec![exact(ViewChange(ViewNumber::new(3))), quorum_vote_send()]; + let outputs = vec![ + exact(ViewChange(ViewNumber::new(3), EpochNumber::new(0))), + quorum_vote_send(), + ]; let (event_sender, mut event_receiver) = broadcast(1024); let view_number = ViewNumber::new(node_id); @@ -92,11 +95,11 @@ async fn test_vote_dependency_handle() { quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(), storage: Arc::clone(&handle.storage()), view_number, - epoch_number: EpochNumber::new(1), sender: event_sender.clone(), receiver: event_receiver.clone().deactivate(), upgrade_lock: handle.hotshot.upgrade_lock.clone(), id: handle.hotshot.id, + epoch_height: handle.hotshot.config.epoch_height, }; vote_dependency_handle_state diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index a83c2a63b4..bb62bb0e10 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -27,16 +27,13 @@ use crate::{ message::{Proposal, UpgradeLock}, simple_certificate::{DaCertificate, QuorumCertificate}, traits::{ - block_contents::{BlockHeader, BuilderFee}, + block_contents::BuilderFee, metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics}, node_implementation::{ConsensusTime, NodeType, Versions}, signature_key::SignatureKey, BlockPayload, ValidatedState, }, - utils::{ - BuilderCommitment, StateAndDelta, - Terminator::{self, Inclusive}, - }, + utils::{BuilderCommitment, LeafCommitment, StateAndDelta, Terminator}, vid::VidCommitment, vote::{Certificate, HasViewNumber}, }; @@ -536,6 +533,7 @@ impl Consensus { epoch_number > self.cur_epoch, debug!("New epoch isn't newer than the current epoch.") ); + tracing::trace!("Updating epoch from {} to {}", self.cur_epoch, epoch_number); self.cur_epoch = epoch_number; Ok(()) } @@ -915,100 +913,88 @@ impl Consensus { Some(()) } - /// Returns true if the current high qc is for the last block in the epoch - pub fn is_high_qc_for_last_block(&self) -> bool { - let high_qc = self.high_qc(); - self.is_qc_for_last_block(high_qc) - } - - /// Returns true if the given qc is for the last block in the epoch - pub fn is_qc_for_last_block(&self, cert: &QuorumCertificate) -> bool { - let Some(leaf) = self.saved_leaves.get(&cert.data().leaf_commit) else { - return false; - }; - let block_height = leaf.height(); - if block_height == 0 || self.epoch_height == 0 { - false - } else { - block_height % self.epoch_height == 0 + /// Return true if the high QC takes part in forming an eQC, i.e. + /// it is one of the 3-chain certificates but not the eQC itself + pub fn is_high_qc_forming_eqc(&self) -> bool { + let high_qc_leaf_commit = self.high_qc().data.leaf_commit; + let is_high_qc_extended = self.is_leaf_extended(high_qc_leaf_commit); + if is_high_qc_extended { + tracing::debug!("We have formed an eQC!"); } + self.is_leaf_for_last_block(high_qc_leaf_commit) && !is_high_qc_extended } - /// Returns true if the current high qc is an extended Quorum Certificate - /// The Extended Quorum Certificate (eQC) is the third Quorum Certificate formed in three - /// consecutive views for the last block in the epoch. - pub fn is_high_qc_extended(&self) -> bool { - let high_qc = self.high_qc(); - let ret = self.is_qc_extended(high_qc); - if ret { - tracing::debug!("We have formed an eQC!"); - }; - ret + /// Return true if the given leaf takes part in forming an eQC, i.e. + /// it is one of the 3-chain leaves but not the eQC leaf itself + pub fn is_leaf_forming_eqc(&self, leaf_commit: LeafCommitment) -> bool { + self.is_leaf_for_last_block(leaf_commit) && !self.is_leaf_extended(leaf_commit) } - /// Returns true if the given qc is an extended Quorum Certificate + /// Returns true if the given leaf can form an extended Quorum Certificate /// The Extended Quorum Certificate (eQC) is the third Quorum Certificate formed in three /// consecutive views for the last block in the epoch. - pub fn is_qc_extended(&self, cert: &QuorumCertificate) -> bool { - if !self.is_qc_for_last_block(cert) { - tracing::debug!("High QC is not for the last block in the epoch."); + pub fn is_leaf_extended(&self, leaf_commit: LeafCommitment) -> bool { + if !self.is_leaf_for_last_block(leaf_commit) { + tracing::debug!("The given leaf is not for the last block in the epoch."); return false; } - let qc_view = cert.view_number(); - let high_qc_block_number = - if let Some(leaf) = self.saved_leaves.get(&cert.data().leaf_commit) { - leaf.block_header().block_number() - } else { - return false; - }; - - let mut last_visited_view_number = qc_view; - let mut is_qc_extended = true; - if let Err(e) = - self.visit_leaf_ancestors(qc_view, Inclusive(qc_view - 2), true, |leaf, _, _| { + let Some(leaf) = self.saved_leaves.get(&leaf_commit) else { + return false; + }; + let leaf_view = leaf.view_number(); + let leaf_block_number = leaf.height(); + + let mut last_visited_view_number = leaf_view; + let mut is_leaf_extended = true; + if let Err(e) = self.visit_leaf_ancestors( + leaf_view, + Terminator::Inclusive(leaf_view - 2), + true, + |leaf, _, _| { tracing::trace!( "last_visited_view_number = {}, leaf.view_number = {}", *last_visited_view_number, *leaf.view_number() ); - if leaf.view_number() == qc_view { + if leaf.view_number() == leaf_view { return true; } if last_visited_view_number - 1 != leaf.view_number() { tracing::trace!("The chain is broken. Non consecutive views."); - is_qc_extended = false; + is_leaf_extended = false; return false; } - if high_qc_block_number != leaf.height() { + if leaf_block_number != leaf.height() { tracing::trace!("The chain is broken. Block numbers do not match."); - is_qc_extended = false; + is_leaf_extended = false; return false; } last_visited_view_number = leaf.view_number(); true - }) - { - is_qc_extended = false; + }, + ) { + is_leaf_extended = false; tracing::trace!("The chain is broken. Leaf ascension failed."); tracing::debug!("Leaf ascension failed; error={e}"); } - tracing::trace!("Is the given QC an eQC? {}", is_qc_extended); - is_qc_extended + tracing::trace!("Can the given leaf form an eQC? {}", is_leaf_extended); + is_leaf_extended } - /// Return true if the given Quorum Certificate takes part in forming an eQC, i.e. - /// it is one of the 3-chain certificates but not the eQC itself - pub fn is_qc_forming_eqc(&self, cert: &QuorumCertificate) -> bool { - self.is_qc_for_last_block(cert) && !self.is_qc_extended(cert) - } - - /// Return true if the high QC takes part in forming an eQC, i.e. - /// it is one of the 3-chain certificates but not the eQC itself - pub fn is_high_qc_forming_eqc(&self) -> bool { - self.is_high_qc_for_last_block() && !self.is_high_qc_extended() + /// Returns true if a given leaf is for the last block in the epoch + pub fn is_leaf_for_last_block(&self, leaf_commit: LeafCommitment) -> bool { + let Some(leaf) = self.saved_leaves.get(&leaf_commit) else { + return false; + }; + let block_height = leaf.height(); + if block_height == 0 || self.epoch_height == 0 { + false + } else { + block_height % self.epoch_height == 0 + } } } diff --git a/crates/types/src/utils.rs b/crates/types/src/utils.rs index e3d19a8286..c3f8780575 100644 --- a/crates/types/src/utils.rs +++ b/crates/types/src/utils.rs @@ -70,7 +70,7 @@ impl Clone for ViewInner { } } /// The hash of a leaf. -type LeafCommitment = Commitment>; +pub type LeafCommitment = Commitment>; /// Optional validated state and state delta. pub type StateAndDelta = ( @@ -210,3 +210,15 @@ pub fn bincode_opts() -> WithOtherTrailing< .with_fixint_encoding() .reject_trailing_bytes() } + +/// Returns an epoch number given a block number and an epoch height +#[must_use] +pub fn epoch_from_block_number(block_number: u64, epoch_height: u64) -> u64 { + if epoch_height == 0 { + 0 + } else if block_number % epoch_height == 0 { + block_number / epoch_height + } else { + block_number / epoch_height + 1 + } +} From 39c80f91dec91cfdc6f045040115ecb49a34aead Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 13 Nov 2024 14:56:02 -0500 Subject: [PATCH 09/18] add failures test --- .../tests_3/test_with_failures_half_f.rs | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/crates/testing/tests/tests_3/test_with_failures_half_f.rs b/crates/testing/tests/tests_3/test_with_failures_half_f.rs index 797aa77cab..a8a2dbb14b 100644 --- a/crates/testing/tests/tests_3/test_with_failures_half_f.rs +++ b/crates/testing/tests/tests_3/test_with_failures_half_f.rs @@ -5,7 +5,7 @@ // along with the HotShot repository. If not, see . use hotshot_example_types::{ - node_types::{Libp2pImpl, MemoryImpl, PushCdnImpl, TestVersions}, + node_types::{EpochsTestVersions, Libp2pImpl, MemoryImpl, PushCdnImpl, TestVersions}, state_types::TestTypes, }; use hotshot_macros::cross_tests; @@ -52,3 +52,40 @@ cross_tests!( metadata } ); +cross_tests!( + TestName: test_with_failures_half_f_epochs, + Impls: [MemoryImpl, Libp2pImpl, PushCdnImpl], + Types: [TestTypes], + Versions: [EpochsTestVersions], + Ignore: false, + Metadata: { + let mut metadata = TestDescription::default_more_nodes(); + metadata.num_bootstrap_nodes = 17; + // The first 14 (i.e., 20 - f) nodes are in the DA committee and we may shutdown the + // remaining 6 (i.e., f) nodes. We could remove this restriction after fixing the + // following issue. + let dead_nodes = vec![ + ChangeNode { + idx: 17, + updown: NodeAction::Down, + }, + ChangeNode { + idx: 18, + updown: NodeAction::Down, + }, + ChangeNode { + idx: 19, + updown: NodeAction::Down, + }, + ]; + + metadata.spinning_properties = SpinningTaskDescription { + node_changes: vec![(5, dead_nodes)] + }; + + metadata.overall_safety_properties.num_failed_views = 3; + // Make sure we keep committing rounds after the bad leaders, but not the full 50 because of the numerous timeouts + metadata.overall_safety_properties.num_successful_views = 22; + metadata + } +); From 9945ecaf46a879159d00a513171adc898bf6882a Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 13 Nov 2024 15:00:33 -0500 Subject: [PATCH 10/18] actually use new decide rule --- crates/task-impls/src/quorum_vote/handlers.rs | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/crates/task-impls/src/quorum_vote/handlers.rs b/crates/task-impls/src/quorum_vote/handlers.rs index 2f78773f88..20ef1c6a9d 100644 --- a/crates/task-impls/src/quorum_vote/handlers.rs +++ b/crates/task-impls/src/quorum_vote/handlers.rs @@ -26,11 +26,15 @@ use hotshot_types::{ }; use tracing::instrument; use utils::anytrace::*; +use vbs::version::StaticVersionType; use super::QuorumVoteTaskState; use crate::{ events::HotShotEvent, - helpers::{broadcast_event, decide_from_proposal, fetch_proposal, LeafChainTraversalOutcome}, + helpers::{ + broadcast_event, decide_from_proposal, decide_from_proposal_2, fetch_proposal, + LeafChainTraversalOutcome, + }, quorum_vote::Versions, }; @@ -44,6 +48,11 @@ pub(crate) async fn handle_quorum_proposal_validated< proposal: &QuorumProposal, task_state: &mut QuorumVoteTaskState, ) -> Result<()> { + let version = task_state + .upgrade_lock + .version(proposal.view_number()) + .await?; + let LeafChainTraversalOutcome { new_locked_view_number, new_decided_view_number, @@ -51,13 +60,23 @@ pub(crate) async fn handle_quorum_proposal_validated< leaf_views, included_txns, decided_upgrade_cert, - } = decide_from_proposal( - proposal, - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate), - &task_state.public_key, - ) - .await; + } = if version >= V::Epochs::VERSION { + decide_from_proposal_2( + proposal, + OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), + Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate), + &task_state.public_key, + ) + .await + } else { + decide_from_proposal( + proposal, + OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), + Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate), + &task_state.public_key, + ) + .await + }; if let Some(cert) = decided_upgrade_cert.clone() { let mut decided_certificate_lock = task_state From eb1607508d74a6054973e72d86a11534885086b1 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 13 Nov 2024 15:49:03 -0500 Subject: [PATCH 11/18] add test verifying the new decide rule is one shorter --- crates/testing/tests/tests_1/test_success.rs | 36 ++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/crates/testing/tests/tests_1/test_success.rs b/crates/testing/tests/tests_1/test_success.rs index 588c718a83..e81060aedb 100644 --- a/crates/testing/tests/tests_1/test_success.rs +++ b/crates/testing/tests/tests_1/test_success.rs @@ -17,6 +17,7 @@ use hotshot_macros::cross_tests; use hotshot_testing::{ block_builder::SimpleBuilderImplementation, completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + spinning_task::{ChangeNode, NodeAction, SpinningTaskDescription}, test_builder::TestDescription, view_sync_task::ViewSyncTaskDescription, }; @@ -151,3 +152,38 @@ cross_tests!( } }, ); + +// Test to make sure we can decide in just 3 views +// This test fails with the old decide rule +cross_tests!( + TestName: test_shorter_decide, + Impls: [MemoryImpl], + Types: [TestTypes], + Versions: [EpochsTestVersions], + Ignore: false, + Metadata: { + let mut metadata = TestDescription { + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_millis(100000), + }, + ), + ..TestDescription::default() + }; + // after the first 3 leaders the next leader is down. It's a hack to make sure we decide in + // 3 views or else we get a timeout + let dead_nodes = vec![ + ChangeNode { + idx: 4, + updown: NodeAction::Down, + }, + + ]; + metadata.spinning_properties = SpinningTaskDescription { + node_changes: vec![(1, dead_nodes)] + }; + metadata.overall_safety_properties.num_successful_views = 1; + metadata.overall_safety_properties.num_failed_views = 0; + metadata + }, +); From 0dab02bcccc361100d9823de83618419a534792a Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 13 Nov 2024 16:00:00 -0500 Subject: [PATCH 12/18] reduce epoch chain len by 1 --- crates/types/src/consensus.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index bb62bb0e10..831def9148 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -949,7 +949,7 @@ impl Consensus { let mut is_leaf_extended = true; if let Err(e) = self.visit_leaf_ancestors( leaf_view, - Terminator::Inclusive(leaf_view - 2), + Terminator::Inclusive(leaf_view - 1), true, |leaf, _, _| { tracing::trace!( From 365ded5ae22084e10f547c32ee046adc659ba10e Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 13 Nov 2024 22:40:17 -0500 Subject: [PATCH 13/18] fix waiting and validate QC --- crates/task-impls/src/helpers.rs | 6 ++-- .../src/quorum_proposal/handlers.rs | 34 ++++++++++++++----- crates/task-impls/src/quorum_proposal/mod.rs | 2 +- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 3d44afdb1c..bd88c45c6c 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -9,7 +9,7 @@ use std::{ sync::Arc, }; -use async_broadcast::{InactiveReceiver, Receiver, SendError, Sender}; +use async_broadcast::{Receiver, SendError, Sender}; use async_lock::RwLock; use committable::{Commitment, Committable}; use hotshot_task::dependency::{Dependency, EventDependency}; @@ -426,7 +426,7 @@ pub async fn decide_from_proposal( pub(crate) async fn parent_leaf_and_state( next_proposal_view_number: TYPES::View, event_sender: &Sender>>, - event_receiver: &InactiveReceiver>>, + event_receiver: &Receiver>>, quorum_membership: Arc, public_key: TYPES::SignatureKey, private_key: ::PrivateKey, @@ -452,7 +452,7 @@ pub(crate) async fn parent_leaf_and_state( let _ = fetch_proposal( parent_view_number, event_sender.clone(), - event_receiver.activate_cloned(), + event_receiver.clone(), quorum_membership, consensus.clone(), public_key.clone(), diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index 03c6a732ef..27646e888c 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -19,20 +19,22 @@ use crate::{ quorum_proposal::{UpgradeLock, Versions}, }; use anyhow::{ensure, Context, Result}; -use async_broadcast::{InactiveReceiver, Receiver, Sender}; +use async_broadcast::{Receiver, Sender}; use async_lock::RwLock; use hotshot_task::dependency_task::HandleDepOutput; -use hotshot_types::simple_certificate::QuorumCertificate; use hotshot_types::{ consensus::{CommitmentAndMetadata, OuterConsensus}, data::{Leaf, QuorumProposal, VidDisperse, ViewChangeEvidence}, message::Proposal, simple_certificate::UpgradeCertificate, traits::{ - block_contents::BlockHeader, node_implementation::NodeType, signature_key::SignatureKey, + block_contents::BlockHeader, + node_implementation::{ConsensusTime, NodeType}, + signature_key::SignatureKey, }, vote::HasViewNumber, }; +use hotshot_types::{simple_certificate::QuorumCertificate, vote::Certificate}; use tracing::instrument; use utils::anytrace::*; use vbs::version::StaticVersionType; @@ -71,7 +73,7 @@ pub struct ProposalDependencyHandle { pub sender: Sender>>, /// The event receiver. - pub receiver: InactiveReceiver>>, + pub receiver: Receiver>>, /// Immutable instance state pub instance_state: Arc, @@ -114,7 +116,16 @@ impl ProposalDependencyHandle { ) -> Option> { while let Ok(event) = rx.recv_direct().await { if let HotShotEvent::HighQcRecv(qc, _sender) = event.as_ref() { - return Some(qc.clone()); + if qc + .is_valid_cert( + self.quorum_membership.as_ref(), + TYPES::Epoch::new(0), + &self.upgrade_lock, + ) + .await + { + return Some(qc.clone()); + } } } None @@ -132,20 +143,25 @@ impl ProposalDependencyHandle { { return highest_qc; } - let mut rx = self.receiver.activate_cloned(); // TODO configure timeout while self.view_start_time.elapsed() < Duration::from_secs(1) { - let Some(time_spent) = self.view_start_time.checked_duration_since(Instant::now()) + let Some(time_spent) = Instant::now().checked_duration_since(self.view_start_time) else { + // Shouldn't be possible, now must be after the start return highest_qc; }; let Some(time_left) = Duration::from_secs(1).checked_sub(time_spent) else { + // No time left return highest_qc; }; - let Ok(maybe_qc) = - tokio::time::timeout(time_left, self.wait_for_qc_event(&mut rx)).await + let Ok(maybe_qc) = tokio::time::timeout( + time_left, + self.wait_for_qc_event(&mut self.receiver.clone()), + ) + .await else { + // we timeout out, don't wait any longer return highest_qc; }; let Some(qc) = maybe_qc else { diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs index 5790e359fb..f7cd4ab38c 100644 --- a/crates/task-impls/src/quorum_proposal/mod.rs +++ b/crates/task-impls/src/quorum_proposal/mod.rs @@ -312,7 +312,7 @@ impl, V: Versions> latest_proposed_view: self.latest_proposed_view, view_number, sender: event_sender, - receiver: event_receiver.deactivate(), + receiver: event_receiver, quorum_membership: Arc::clone(&self.quorum_membership), public_key: self.public_key.clone(), private_key: self.private_key.clone(), From 29ec44890e9e3fca8419b51d26a96540f8af7d3e Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 13 Nov 2024 22:46:08 -0500 Subject: [PATCH 14/18] use proper timeout --- crates/task-impls/src/quorum_proposal/handlers.rs | 7 +++++-- crates/task-impls/src/quorum_proposal/mod.rs | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index 27646e888c..4a757120fd 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -90,6 +90,8 @@ pub struct ProposalDependencyHandle { /// Shared consensus task state pub consensus: OuterConsensus, + /// View timeout from config. + pub timeout: u64, /// The most recent upgrade certificate this node formed. /// Note: this is ONLY for certificates that have been formed internally, /// so that we can propose with them. @@ -143,15 +145,16 @@ impl ProposalDependencyHandle { { return highest_qc; } + let wait_duration = Duration::from_millis(self.timeout / 2); // TODO configure timeout - while self.view_start_time.elapsed() < Duration::from_secs(1) { + while self.view_start_time.elapsed() < wait_duration { let Some(time_spent) = Instant::now().checked_duration_since(self.view_start_time) else { // Shouldn't be possible, now must be after the start return highest_qc; }; - let Some(time_left) = Duration::from_secs(1).checked_sub(time_spent) else { + let Some(time_left) = wait_duration.checked_sub(time_spent) else { // No time left return highest_qc; }; diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs index f7cd4ab38c..e4e80d6629 100644 --- a/crates/task-impls/src/quorum_proposal/mod.rs +++ b/crates/task-impls/src/quorum_proposal/mod.rs @@ -318,6 +318,7 @@ impl, V: Versions> private_key: self.private_key.clone(), instance_state: Arc::clone(&self.instance_state), consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)), + timeout: self.timeout, formed_upgrade_certificate: self.formed_upgrade_certificate.clone(), upgrade_lock: self.upgrade_lock.clone(), id: self.id, From ac6f36e601ecbd3283b00b6b54b660b5ecb5b9a2 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 13 Nov 2024 23:01:48 -0500 Subject: [PATCH 15/18] more accurate high qc waiting logic --- crates/hotshot/src/tasks/task_state.rs | 1 + .../src/quorum_proposal/handlers.rs | 31 +++++++++++-------- crates/task-impls/src/quorum_proposal/mod.rs | 20 +++++++++++- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index 09bc3b0eb4..536e6239a6 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -270,6 +270,7 @@ impl, V: Versions> CreateTaskState formed_upgrade_certificate: None, upgrade_lock: handle.hotshot.upgrade_lock.clone(), epoch_height: handle.hotshot.config.epoch_height, + highest_qc: handle.hotshot.consensus.read().await.high_qc().clone(), } } } diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index 4a757120fd..6543e382f0 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -26,15 +26,14 @@ use hotshot_types::{ consensus::{CommitmentAndMetadata, OuterConsensus}, data::{Leaf, QuorumProposal, VidDisperse, ViewChangeEvidence}, message::Proposal, - simple_certificate::UpgradeCertificate, + simple_certificate::{QuorumCertificate, UpgradeCertificate}, traits::{ block_contents::BlockHeader, node_implementation::{ConsensusTime, NodeType}, signature_key::SignatureKey, }, - vote::HasViewNumber, + vote::{Certificate, HasViewNumber}, }; -use hotshot_types::{simple_certificate::QuorumCertificate, vote::Certificate}; use tracing::instrument; use utils::anytrace::*; use vbs::version::StaticVersionType; @@ -108,6 +107,9 @@ pub struct ProposalDependencyHandle { /// The time this view started pub view_start_time: Instant, + + /// The higest_qc we've seen at the start of this task + pub highest_qc: QuorumCertificate, } impl ProposalDependencyHandle { @@ -134,8 +136,7 @@ impl ProposalDependencyHandle { } /// Waits for the ocnfigured timeout for nodes to send HighQC messages to us. We'll /// then propose with the higest QC from among these proposals. - async fn wait_for_highest_qc(&mut self) -> QuorumCertificate { - let mut highest_qc = self.consensus.read().await.high_qc().clone(); + async fn wait_for_highest_qc(&mut self) { // If we haven't upgraded to Hotstuff 2 just return the high qc right away if self .upgrade_lock @@ -143,7 +144,7 @@ impl ProposalDependencyHandle { .await .is_ok_and(|version| version < V::Epochs::VERSION) { - return highest_qc; + return; } let wait_duration = Duration::from_millis(self.timeout / 2); @@ -152,11 +153,11 @@ impl ProposalDependencyHandle { let Some(time_spent) = Instant::now().checked_duration_since(self.view_start_time) else { // Shouldn't be possible, now must be after the start - return highest_qc; + return; }; let Some(time_left) = wait_duration.checked_sub(time_spent) else { // No time left - return highest_qc; + return; }; let Ok(maybe_qc) = tokio::time::timeout( time_left, @@ -165,16 +166,15 @@ impl ProposalDependencyHandle { .await else { // we timeout out, don't wait any longer - return highest_qc; + return; }; let Some(qc) = maybe_qc else { continue; }; - if qc.view_number() > highest_qc.view_number() { - highest_qc = qc; + if qc.view_number() > self.highest_qc.view_number() { + self.highest_qc = qc; } } - highest_qc } /// Publishes a proposal given the [`CommitmentAndMetadata`], [`VidDisperse`] /// and high qc [`hotshot_types::simple_certificate::QuorumCertificate`], @@ -380,7 +380,12 @@ impl HandleDepOutput for ProposalDependencyHandle< } } - let parent_qc = parent_qc.unwrap_or(self.wait_for_highest_qc().await); + let parent_qc = if let Some(qc) = parent_qc { + qc + } else { + self.wait_for_highest_qc().await; + self.highest_qc.clone() + }; if commit_and_metadata.is_none() { tracing::error!( diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs index e4e80d6629..a6b641e602 100644 --- a/crates/task-impls/src/quorum_proposal/mod.rs +++ b/crates/task-impls/src/quorum_proposal/mod.rs @@ -19,7 +19,7 @@ use hotshot_types::{ consensus::OuterConsensus, event::Event, message::UpgradeLock, - simple_certificate::UpgradeCertificate, + simple_certificate::{QuorumCertificate, UpgradeCertificate}, traits::{ election::Membership, node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions}, @@ -91,6 +91,9 @@ pub struct QuorumProposalTaskState /// Number of blocks in an epoch, zero means there are no epochs pub epoch_height: u64, + + /// The higest_qc we've seen at the start of this task + pub highest_qc: QuorumCertificate, } impl, V: Versions> @@ -323,6 +326,7 @@ impl, V: Versions> upgrade_lock: self.upgrade_lock.clone(), id: self.id, view_start_time: Instant::now(), + highest_qc: self.highest_qc.clone(), }, ); self.proposal_dependencies @@ -508,6 +512,20 @@ impl, V: Versions> HotShotEvent::ViewChange(view, _) | HotShotEvent::Timeout(view) => { self.cancel_tasks(*view); } + HotShotEvent::HighQcSend(qc, _sender) => { + ensure!(qc.view_number() > self.highest_qc.view_number()); + let epoch_number = self.consensus.read().await.cur_epoch(); + ensure!( + qc.is_valid_cert( + self.quorum_membership.as_ref(), + epoch_number, + &self.upgrade_lock + ) + .await, + warn!("Qurom certificate {:?} was invalid", qc.data()) + ); + self.highest_qc = qc.clone(); + } _ => {} } Ok(()) From e4f597695e0e24770153644921619198e4db771a Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Thu, 14 Nov 2024 10:10:12 -0500 Subject: [PATCH 16/18] gate by versios before waiting for QC --- crates/task-impls/src/quorum_proposal/handlers.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index 6543e382f0..1a928ebfa7 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -137,6 +137,7 @@ impl ProposalDependencyHandle { /// Waits for the ocnfigured timeout for nodes to send HighQC messages to us. We'll /// then propose with the higest QC from among these proposals. async fn wait_for_highest_qc(&mut self) { + tracing::error!("waiting for QC"); // If we haven't upgraded to Hotstuff 2 just return the high qc right away if self .upgrade_lock @@ -382,6 +383,13 @@ impl HandleDepOutput for ProposalDependencyHandle< let parent_qc = if let Some(qc) = parent_qc { qc + } else if self + .upgrade_lock + .version(self.view_number) + .await + .is_ok_and(|version| version < V::Epochs::VERSION) + { + self.consensus.read().await.high_qc().clone() } else { self.wait_for_highest_qc().await; self.highest_qc.clone() From d65b2339fe201217fa38264aea3a5ac13912bbe5 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Fri, 15 Nov 2024 10:06:29 -0500 Subject: [PATCH 17/18] Fix GC for epochs --- .../src/quorum_proposal/handlers.rs | 2 +- crates/types/src/consensus.rs | 27 +++++++++++-------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index 1a928ebfa7..f514555621 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -247,7 +247,7 @@ impl ProposalDependencyHandle { let metadata = commitment_and_metadata.metadata.clone(); let block_header = if version >= V::Epochs::VERSION - && self.consensus.read().await.is_high_qc_forming_eqc() + && self.consensus.read().await.is_qc_forming_eqc(&parent_qc) { tracing::info!("Reached end of epoch. Proposing the same block again to form an eQC."); let block_header = parent_leaf.block_header().clone(); diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 4e1912996d..499430c0ca 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -818,13 +818,14 @@ impl Consensus { /// # Panics /// On inconsistent stored entries pub fn collect_garbage(&mut self, old_anchor_view: TYPES::View, new_anchor_view: TYPES::View) { + let gc_view = TYPES::View::new(new_anchor_view.saturating_sub(1)); // state check let anchor_entry = self .validated_state_map .iter() .next() .expect("INCONSISTENT STATE: anchor leaf not in state map!"); - if *anchor_entry.0 != old_anchor_view { + if **anchor_entry.0 != old_anchor_view.saturating_sub(1) { tracing::error!( "Something about GC has failed. Older leaf exists than the previous anchor leaf." ); @@ -833,15 +834,15 @@ impl Consensus { self.saved_da_certs .retain(|view_number, _| *view_number >= old_anchor_view); self.validated_state_map - .range(old_anchor_view..new_anchor_view) + .range(old_anchor_view..gc_view) .filter_map(|(_view_number, view)| view.leaf_commitment()) .for_each(|leaf| { self.saved_leaves.remove(&leaf); }); - self.validated_state_map = self.validated_state_map.split_off(&new_anchor_view); - self.saved_payloads = self.saved_payloads.split_off(&new_anchor_view); - self.vid_shares = self.vid_shares.split_off(&new_anchor_view); - self.last_proposals = self.last_proposals.split_off(&new_anchor_view); + self.validated_state_map = self.validated_state_map.split_off(&gc_view); + self.saved_payloads = self.saved_payloads.split_off(&gc_view); + self.vid_shares = self.vid_shares.split_off(&gc_view); + self.last_proposals = self.last_proposals.split_off(&gc_view); } /// Gets the last decided leaf. @@ -915,10 +916,10 @@ impl Consensus { Some(()) } - /// Return true if the high QC takes part in forming an eQC, i.e. + /// Return true if the QC takes part in forming an eQC, i.e. /// it is one of the 3-chain certificates but not the eQC itself - pub fn is_high_qc_forming_eqc(&self) -> bool { - let high_qc_leaf_commit = self.high_qc().data.leaf_commit; + pub fn is_qc_forming_eqc(&self, qc: &QuorumCertificate) -> bool { + let high_qc_leaf_commit = qc.data.leaf_commit; let is_high_qc_extended = self.is_leaf_extended(high_qc_leaf_commit); if is_high_qc_extended { tracing::debug!("We have formed an eQC!"); @@ -926,6 +927,11 @@ impl Consensus { self.is_leaf_for_last_block(high_qc_leaf_commit) && !is_high_qc_extended } + /// Returns true if our high qc is forming an eQC + pub fn is_high_qc_forming_eqc(&self) -> bool { + self.is_qc_forming_eqc(self.high_qc()) + } + /// Return true if the given leaf takes part in forming an eQC, i.e. /// it is one of the 3-chain leaves but not the eQC leaf itself pub fn is_leaf_forming_eqc(&self, leaf_commit: LeafCommitment) -> bool { @@ -952,7 +958,7 @@ impl Consensus { let mut is_leaf_extended = true; if let Err(e) = self.visit_leaf_ancestors( leaf_view, - Terminator::Inclusive(leaf_view - 1), + Terminator::Inclusive(leaf_view - 2), true, |leaf, _, _| { tracing::trace!( @@ -980,7 +986,6 @@ impl Consensus { }, ) { is_leaf_extended = false; - tracing::trace!("The chain is broken. Leaf ascension failed."); tracing::debug!("Leaf ascension failed; error={e}"); } tracing::trace!("Can the given leaf form an eQC? {}", is_leaf_extended); From a63ddaa7ec996895ab104adb4654d8051867cf15 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Fri, 15 Nov 2024 10:10:35 -0500 Subject: [PATCH 18/18] fix version gating --- crates/task-impls/src/quorum_proposal/handlers.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index f514555621..cfc46f4201 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -381,14 +381,16 @@ impl HandleDepOutput for ProposalDependencyHandle< } } + let Ok(version) = self.upgrade_lock.version(self.view_number).await else { + tracing::error!( + "Failed to get version for view {:?}, not proposing", + self.view_number + ); + return; + }; let parent_qc = if let Some(qc) = parent_qc { qc - } else if self - .upgrade_lock - .version(self.view_number) - .await - .is_ok_and(|version| version < V::Epochs::VERSION) - { + } else if version < V::Epochs::VERSION { self.consensus.read().await.high_qc().clone() } else { self.wait_for_highest_qc().await;