Skip to content

Commit

Permalink
Merge pull request #1872 from EspressoSystems/ed/timeout-hotfix-merge
Browse files Browse the repository at this point in the history
Ed/timeout hotfix merge
  • Loading branch information
elliedavidson authored Oct 10, 2023
2 parents 34c9d01 + 6abf91e commit 6bd7452
Show file tree
Hide file tree
Showing 10 changed files with 634 additions and 482 deletions.
520 changes: 286 additions & 234 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ where
.expect("Failed to receive broadcast messages"),
);
if msgs.0.is_empty() {
async_sleep(Duration::new(0, 500)).await;
async_sleep(Duration::from_millis(100)).await;
} else {
break msgs;
}
Expand All @@ -120,7 +120,7 @@ where
.expect("Failed to receive direct messages"),
);
if msgs.0.is_empty() {
async_sleep(Duration::new(0, 500)).await;
async_sleep(Duration::from_millis(100)).await;
} else {
break msgs;
}
Expand Down Expand Up @@ -288,7 +288,7 @@ where
consensus,
timeout: handle.hotshot.inner.config.next_view_timeout,
cur_view: TYPES::Time::new(0),
block: VIDBlockPayload::genesis(),
block: Some(VIDBlockPayload::genesis()),
quorum_exchange: c_api.inner.exchanges.quorum_exchange().clone().into(),
api: c_api.clone(),
committee_exchange: c_api.inner.exchanges.committee_exchange().clone().into(),
Expand Down
480 changes: 275 additions & 205 deletions crates/task-impls/src/consensus.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions crates/task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use hotshot_types::{
use snafu::Snafu;
use std::{marker::PhantomData, sync::Arc};
use tracing::error;
use tracing::instrument;

/// the type of network task
#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -190,6 +191,8 @@ impl<
/// # Panics
/// Panic sif a direct message event is received with no recipient
#[allow(clippy::too_many_lines)] // TODO https://github.com/EspressoSystems/HotShot/issues/1704
#[instrument(skip_all, fields(view = *self.view), name = "Newtork Task", level = "error")]

pub async fn handle_event(
&mut self,
event: SequencingHotShotEvent<TYPES, I>,
Expand Down
71 changes: 51 additions & 20 deletions crates/task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ where
(certificate_internal, ViewSyncPhase::Finalize)
}
};
debug!(
error!(
"Received view sync cert for phase {:?}",
last_seen_certificate
);
Expand All @@ -311,19 +311,20 @@ where
}

// We do not have a replica task already running, so start one
let mut replica_state = ViewSyncReplicaTaskState {
current_view: certificate_internal.round,
next_view: certificate_internal.round,
relay: 0,
finalized: false,
sent_view_change_event: false,
phase: ViewSyncPhase::None,
exchange: self.exchange.clone(),
api: self.api.clone(),
event_stream: self.event_stream.clone(),
view_sync_timeout: self.view_sync_timeout,
id: self.id,
};
let mut replica_state: ViewSyncReplicaTaskState<TYPES, I, A> =
ViewSyncReplicaTaskState {
current_view: certificate_internal.round,
next_view: certificate_internal.round,
relay: 0,
finalized: false,
sent_view_change_event: false,
phase: ViewSyncPhase::None,
exchange: self.exchange.clone(),
api: self.api.clone(),
event_stream: self.event_stream.clone(),
view_sync_timeout: self.view_sync_timeout,
id: self.id,
};

let result = replica_state.handle_event(event.clone()).await;

Expand Down Expand Up @@ -494,14 +495,21 @@ where
}

self.num_timeouts_tracked += 1;
error!("Num timeouts tracked is {}", self.num_timeouts_tracked);
error!(
"Num timeouts tracked is {}. View {} timed out",
self.num_timeouts_tracked, *view_number
);

if self.num_timeouts_tracked > 2 {
error!("Too many timeouts! This shouldn't happen");
}

// TODO ED Make this a configurable variable
if self.num_timeouts_tracked == 2 {
error!(
"Starting view sync protocol; attempting to sync on view {}",
*view_number + 1
);
// Start polling for view sync certificates
self.exchange
.network()
Expand All @@ -518,10 +526,29 @@ where
.await;
// panic!("Starting view sync!");
// Spawn replica task
let next_view = *view_number + 1;
// Subscribe to the view after we are leader since we know we won't propose in the next view if we are leader.
let subscribe_view = if self.exchange.is_leader(TYPES::Time::new(next_view)) {
next_view + 1
} else {
next_view
};
// Subscribe to the next view just in case there is progress being made
self.exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForProposal(
subscribe_view,
))
.await;

self.exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForDAC(subscribe_view))
.await;

let mut replica_state = ViewSyncReplicaTaskState {
current_view: self.current_view,
next_view: TYPES::Time::new(*view_number + 1),
next_view: TYPES::Time::new(next_view),
relay: 0,
finalized: false,
sent_view_change_event: false,
Expand All @@ -547,7 +574,8 @@ where

let name = format!(
"View Sync Replica Task: Attempting to enter view {:?} from view {:?}",
self.next_view, self.current_view
*view_number + 1,
*view_number
);

let replica_handle_event = HandleEvent(Arc::new(
Expand Down Expand Up @@ -648,7 +676,7 @@ where

// Ignore certificate if it is for an older round
if certificate_internal.round < self.next_view {
debug!("We're already in a higher round");
error!("We're already in a higher round");

return (None, self);
}
Expand All @@ -665,9 +693,9 @@ where
// If certificate is not valid, return current state
if !self
.exchange
.is_valid_view_sync_cert(message.data, certificate_internal.round)
.is_valid_view_sync_cert(message.data.clone(), certificate_internal.round)
{
error!("Not valid view sync cert!");
error!("Not valid view sync cert! {:?}", message.data);

return (None, self);
}
Expand Down Expand Up @@ -785,6 +813,7 @@ where
let phase = self.phase.clone();
async move {
async_sleep(self.view_sync_timeout).await;
error!("Vote sending timed out in ViewSyncCertificateRecv");
stream
.publish(SequencingHotShotEvent::ViewSyncTimeout(
TYPES::Time::new(*self.next_view),
Expand Down Expand Up @@ -846,6 +875,7 @@ where
let stream = self.event_stream.clone();
async move {
async_sleep(self.view_sync_timeout).await;
error!("Vote sending timed out in ViewSyncTrigger");
stream
.publish(SequencingHotShotEvent::ViewSyncTimeout(
TYPES::Time::new(*self.next_view),
Expand Down Expand Up @@ -916,6 +946,7 @@ where
let stream = self.event_stream.clone();
async move {
async_sleep(self.view_sync_timeout).await;
error!("Vote sending timed out in ViewSyncTimeout");
stream
.publish(SequencingHotShotEvent::ViewSyncTimeout(
TYPES::Time::new(*self.next_view),
Expand Down
3 changes: 1 addition & 2 deletions crates/testing/tests/web_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ async fn web_server_network() {
num_successful_views: 35,
..Default::default()
},
// allow more time to pass in CI
completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
TimeBasedCompletionTaskDescription {
duration: Duration::from_millis(1_200_000),
duration: Duration::from_secs(20),
},
),
..TestMetadata::default()
Expand Down
9 changes: 2 additions & 7 deletions crates/types/src/traits/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,13 @@ pub trait ConsensusExchange<TYPES: NodeType, M: NetworkMsg>: Send + Sync {
/// The contents of a vote on `commit`.
fn vote_data(&self, commit: Self::Commitment) -> VoteData<Self::Commitment>;

/// Validate a QC.
fn is_valid_cert(&self, qc: &Self::Certificate, commit: Self::Commitment) -> bool {
/// Validate a certificate.
fn is_valid_cert(&self, qc: &Self::Certificate) -> bool {
if qc.is_genesis() && qc.view_number() == TYPES::Time::genesis() {
return true;
}
let leaf_commitment = qc.leaf_commitment();

if leaf_commitment != commit {
error!("Leaf commitment does not equal parent commitment");
return false;
}

match qc.signatures() {
AssembledSignature::DA(qc) => {
let real_commit = VoteData::DA(leaf_commitment).get_commit();
Expand Down
21 changes: 11 additions & 10 deletions crates/web_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,11 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
self.oldest_vote += 1;
}
}
let highest_index = self.vote_index.entry(view_number).or_insert(0);
let next_index = self.vote_index.entry(view_number).or_insert(0);
self.votes
.entry(view_number)
.and_modify(|current_votes| current_votes.push((*highest_index, vote.clone())))
.or_insert_with(|| vec![(*highest_index, vote)]);
.and_modify(|current_votes| current_votes.push((*next_index, vote.clone())))
.or_insert_with(|| vec![(*next_index, vote)]);
self.vote_index
.entry(view_number)
.and_modify(|index| *index += 1);
Expand All @@ -310,11 +310,11 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
self.oldest_view_sync_vote += 1;
}
}
let highest_index = self.view_sync_vote_index.entry(view_number).or_insert(0);
let next_index = self.view_sync_vote_index.entry(view_number).or_insert(0);
self.view_sync_votes
.entry(view_number)
.and_modify(|current_votes| current_votes.push((*highest_index, vote.clone())))
.or_insert_with(|| vec![(*highest_index, vote)]);
.and_modify(|current_votes| current_votes.push((*next_index, vote.clone())))
.or_insert_with(|| vec![(*next_index, vote)]);
self.view_sync_vote_index
.entry(view_number)
.and_modify(|index| *index += 1);
Expand Down Expand Up @@ -349,22 +349,23 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
) -> Result<(), Error> {
// Only keep proposal history for MAX_VIEWS number of view
if self.view_sync_proposals.len() >= MAX_VIEWS {
self.view_sync_proposals.remove(&self.oldest_view_sync_vote);
self.view_sync_proposals
.remove(&self.oldest_view_sync_proposal);
while !self
.view_sync_proposals
.contains_key(&self.oldest_view_sync_proposal)
{
self.oldest_view_sync_proposal += 1;
}
}
let highest_index = self
let next_index = self
.view_sync_proposal_index
.entry(view_number)
.or_insert(0);
self.view_sync_proposals
.entry(view_number)
.and_modify(|current_props| current_props.push((*highest_index, proposal.clone())))
.or_insert_with(|| vec![(*highest_index, proposal)]);
.and_modify(|current_props| current_props.push((*next_index, proposal.clone())))
.or_insert_with(|| vec![(*next_index, proposal)]);
self.view_sync_proposal_index
.entry(view_number)
.and_modify(|index| *index += 1);
Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@
openssl.out
] ++ lib.optionals stdenv.isDarwin [
darwin.apple_sdk.frameworks.Security
darwin.apple_sdk.frameworks.CoreServices
pkgs.libiconv
darwin.apple_sdk.frameworks.SystemConfiguration
];
Expand Down
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ run_ci: lint build test

@async_std target *ARGS:
echo setting executor to async-std
export RUSTDOCFLAGS='--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std" {{original_rustdocflags}}' RUSTFLAGS='--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std" {{original_rustflags}}' && just {{target}} {{ARGS}}
export RUST_MIN_STACK=4194304 RUSTDOCFLAGS='--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std" {{original_rustdocflags}}' RUSTFLAGS='--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std" {{original_rustflags}}' && just {{target}} {{ARGS}}

build:
cargo build --verbose --workspace --examples --bins --tests --lib --benches
Expand Down

0 comments on commit 6bd7452

Please sign in to comment.