Skip to content

Commit

Permalink
add another function
Browse files Browse the repository at this point in the history
  • Loading branch information
jparr721 committed Mar 27, 2024
1 parent 1779b77 commit 26ba29d
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 30 deletions.
3 changes: 3 additions & 0 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
consensus,
timeout_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
timeout_task: None,
public_key: handle.public_key().clone(),
timeout: handle.hotshot.config.next_view_timeout,
id: handle.hotshot.id,
}
}
Expand Down
198 changes: 168 additions & 30 deletions crates/task-impls/src/quorum_proposal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{collections::HashMap, sync::Arc};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use std::{collections::HashMap, sync::Arc, time::Duration};

use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use async_lock::{RwLock, RwLockUpgradableReadGuard};
use hotshot_task::{
dependency::{AndDependency, EventDependency},
dependency_task::{DependencyTask, HandleDepOutput},
Expand Down Expand Up @@ -155,6 +156,15 @@ pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>
/// Membership for Quorum Certs/votes
pub quorum_membership: Arc<TYPES::Membership>,

/// Timeout task handle
pub timeout_task: Option<JoinHandle<()>>,

/// Our public key
pub public_key: TYPES::SignatureKey,

/// View timeout from config.
pub timeout: u64,

/// The node's id
pub id: u64,
}
Expand Down Expand Up @@ -288,12 +298,157 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalTaskState<TYPE
false
}

/// Advances this task to `new_view` if, and only if, it is strictly newer than
/// `latest_proposed_view`.
///
/// Must only update the view and GC if the view actually changes. On an actual change this:
///
/// 1. cancels the previous timeout task, if any;
/// 2. records `new_view` as `latest_proposed_view`;
/// 3. injects polling intents into the quorum network (future leader, proposal, DAC, and
///    votes when this node is the leader of the following view);
/// 4. broadcasts `HotShotEvent::ViewChange(new_view)` on `event_stream`;
/// 5. spawns a timeout task that broadcasts `HotShotEvent::Timeout` for the following view
///    after `self.timeout` milliseconds;
/// 6. updates the view metrics and calls `update_view` on the shared consensus state.
///
/// Returns `true` if the view was updated and `false` if `new_view` was not newer.
///
/// # Panics
///
/// Panics if a view number does not fit into `usize` while updating the metrics.
#[instrument(skip_all, fields(id = self.id, view = *self.latest_proposed_view), name = "Consensus update view", level = "error")]
async fn update_view(
    &mut self,
    new_view: TYPES::Time,
    event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> bool {
    // Guard clause: stale or repeated views must not trigger any side effect.
    if *self.latest_proposed_view >= *new_view {
        return false;
    }

    debug!(
        "Updating view from {} to {} in consensus task",
        *self.latest_proposed_view, *new_view
    );

    // Only report progress when crossing a multiple-of-100 boundary to keep the log readable.
    if *self.latest_proposed_view / 100 != *new_view / 100 {
        // TODO (https://github.com/EspressoSystems/HotShot/issues/2296):
        // switch to info! when INFO logs become less cluttered
        error!("Progress: entered view {:>6}", *new_view);
    }

    // cancel the old timeout task
    if let Some(timeout_task) = self.timeout_task.take() {
        cancel_task(timeout_task).await;
    }
    self.latest_proposed_view = new_view;

    // Poll the future leader for lookahead. The leader is looked up once and reused for both
    // the comparison and the event payload.
    let lookahead_view = new_view + LOOK_AHEAD;
    let lookahead_leader = self.quorum_membership.get_leader(lookahead_view);
    if lookahead_leader != self.public_key {
        self.quorum_network
            .inject_consensus_info(ConsensusIntentEvent::PollFutureLeader(
                *lookahead_view,
                lookahead_leader,
            ))
            .await;
    }

    // Start polling for proposals for the new view
    self.quorum_network
        .inject_consensus_info(ConsensusIntentEvent::PollForProposal(
            *self.latest_proposed_view + 1,
        ))
        .await;

    self.quorum_network
        .inject_consensus_info(ConsensusIntentEvent::PollForDAC(
            *self.latest_proposed_view + 1,
        ))
        .await;

    // If we lead the following view, we collect the votes cast in this one.
    if self
        .quorum_membership
        .get_leader(self.latest_proposed_view + 1)
        == self.public_key
    {
        debug!(
            "Polling for quorum votes for view {}",
            *self.latest_proposed_view
        );
        self.quorum_network
            .inject_consensus_info(ConsensusIntentEvent::PollForVotes(
                *self.latest_proposed_view,
            ))
            .await;
    }

    broadcast_event(Arc::new(HotShotEvent::ViewChange(new_view)), event_stream).await;

    // Spawn a timeout task if we did actually update view
    let timeout = self.timeout;
    self.timeout_task = Some(async_spawn({
        let stream = event_stream.clone();
        // Nuance: We timeout on the view + 1 here because that means that we have
        // not seen evidence to transition to this new view
        let view_number = self.latest_proposed_view + 1;
        async move {
            async_sleep(Duration::from_millis(timeout)).await;
            broadcast_event(
                Arc::new(HotShotEvent::Timeout(TYPES::Time::new(*view_number))),
                &stream,
            )
            .await;
        }
    }));

    // Take an upgradable read lock: metrics only need shared access, and the guard is upgraded
    // to a write lock below without releasing it in between.
    let consensus = self.consensus.upgradable_read().await;

    // Convert each view number exactly once and reuse the result.
    let current_view = usize::try_from(self.latest_proposed_view.get_u64())
        .expect("current view number must fit in usize");
    consensus.metrics.current_view.set(current_view);

    let last_decided_view = usize::try_from(consensus.last_decided_view.get_u64())
        .expect("last decided view number must fit in usize");
    // Do the comparison before the subtraction to avoid potential overflow, since
    // `last_decided_view` may be greater than `latest_proposed_view` if the node is catching up.
    if current_view > last_decided_view {
        consensus
            .metrics
            .number_of_views_since_last_decide
            .set(current_view - last_decided_view);
    }

    let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await;
    consensus.update_view(new_view);
    drop(consensus);

    true
}

/// Checks the view change evidence that accompanies a proposal for `view`.
///
/// * A timeout certificate must be for the view immediately preceding `view` and must be a
///   valid certificate with respect to the timeout membership.
/// * A view sync certificate must carry exactly `view` as its view number and must be a
///   valid certificate with respect to the quorum membership.
///
/// The first failing check is logged and yields `false`; otherwise the result is `true`.
fn validate_view_change_evidence(
    &self,
    view: TYPES::Time,
    evidence: ViewChangeEvidence<TYPES>,
) -> bool {
    match evidence {
        ViewChangeEvidence::Timeout(cert) => {
            if cert.get_data().view != view - 1 {
                tracing::warn!("Timeout certificate for view {} was not for the immediately preceding view", *view);
                false
            } else if !cert.is_valid_cert(self.timeout_membership.as_ref()) {
                tracing::warn!("Timeout certificate for view {} was invalid", *view);
                false
            } else {
                true
            }
        }
        ViewChangeEvidence::ViewSync(cert) => {
            if cert.view_number != view {
                debug!(
                    "Cert view number {:?} does not match proposal view number {:?}",
                    cert.view_number, view
                );
                false
            } else if !cert.is_valid_cert(self.quorum_membership.as_ref()) {
                // View sync certs must also be valid.
                debug!("Invalid ViewSyncFinalize cert provided");
                false
            } else {
                true
            }
        }
    }
}

/// Validate a quorum proposal.
async fn validate_quorum_proposal(
&mut self,
view: TYPES::Time,
quorum_proposal: Proposal<TYPES, QuorumProposal<TYPES>>,
sender: TYPES::SignatureKey,
event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
) -> bool {
let view_leader_key = self.quorum_membership.get_leader(view);
if view_leader_key != sender {
Expand All @@ -304,33 +459,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalTaskState<TYPE
if let Some(received_quorum_proposal_cert) =
quorum_proposal.data.proposal_certificate.clone()
{
match received_quorum_proposal_cert {
ViewChangeEvidence::Timeout(timeout_cert) => {
if timeout_cert.get_data().view != view - 1 {
tracing::warn!("Timeout certificate for view {} was not for the immediately preceding view", *view);
return false;
}

if !timeout_cert.is_valid_cert(self.timeout_membership.as_ref()) {
tracing::warn!("Timeout certificate for view {} was invalid", *view);
return false;
}
}
ViewChangeEvidence::ViewSync(view_sync_cert) => {
if view_sync_cert.view_number != view {
debug!(
"Cert view number {:?} does not match proposal view number {:?}",
view_sync_cert.view_number, view
);
return false;
}

// View sync certs must also be valid.
if !view_sync_cert.is_valid_cert(self.quorum_membership.as_ref()) {
debug!("Invalid ViewSyncFinalize cert provided");
return false;
}
}
if !self.validate_view_change_evidence(view, received_quorum_proposal_cert) {
debug!("Failed to validate view change evidence");
return false;
}
} else {
tracing::warn!(
Expand Down Expand Up @@ -376,6 +507,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalTaskState<TYPE
}
}

self.update_view(view, &event_stream).await;

true
}

Expand Down Expand Up @@ -499,7 +632,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalTaskState<TYPE
.await;

if !self
.validate_quorum_proposal(view, proposal.clone(), sender.clone())
.validate_quorum_proposal(
view,
proposal.clone(),
sender.clone(),
event_sender.clone(),
)
.await
{
return;
Expand Down

0 comments on commit 26ba29d

Please sign in to comment.