Lr/epoch tests (#3952)
* Initial commit

* WIP: adding epoch to proposal and vote data, not compiling yet

* Make it compile

* Adjust tests

* Add a test type for two stake tables for even and odd epochs

* Debugging

* Fix extended voting

* Try "in epoch transition" approach

* Continue debugging

* Use correct epoch with Membership

* Adjust tests and lints

* Adapt to variable stake table after merge

* Fix accidentally pulled bug in eQC rule

* Commit includes epoch for vote and proposal data types

* Prune dependencies (#3787)

* add new message types and gate outgoing messages

* Use the proper message for the proposal response

* Modify commit for `Leaf2` and `QuorumData2`

* Adjust tests

* Clean up debug traces

* Initial commit for double quorum

* Add TODO

* Next epoch nodes vote during epoch transition

* Form the second QC at the end of an epoch

* Allow early payload save but check that it's the same

* Attach next epoch justify qc to proposals

* Validate the next epoch justify qc

* Test with more network types

* Fix fmt in tests

* Use real threshold in the tests based on an epoch

* Membership thresholds depend on an epoch

* Make sure epoch transition proposals include the next epoch QC

* Use epoch from vote and add more tests

* Adjust marketplace ver number

* Epochs without Marketplace and adjust tests

* Add epoch to test_success

* Add debug traces

* try

* Adjust view change logic in transactions task

* Cleanup debug traces

* Don't change view when voting for eQC

* Add a lot of traces to find a deadlock

* Keep the task for view one less than the current. We might still be transmitting

* Clean debug traces

* An epoch should not increment without view being incremented as well

* Use saturating_sub consistently

* Fix compiler error

* fix merge

* Add a new test

* Fixes after merge

* Fix vid share handling

* Submit transactions to the correct epoch

* Address review comments

* Fix compiler error

* Adjust test

* test_with_failures_2_with_epochs test uses only TestTwoStakeTablesTypes

* Modify test_epoch_end to use an uneven number of nodes; it now fails

* VID share requires the target epoch as well

* VID share needs old epoch payload commitment

* Use odd number of nodes in tests

* Remove debug trace

---------

Co-authored-by: Artemii Gerasimovich <[email protected]>
Co-authored-by: ss-es <[email protected]>
Co-authored-by: Rob <[email protected]>
4 people authored Dec 19, 2024
1 parent 71cf05d commit 3d705c6
Showing 15 changed files with 336 additions and 85 deletions.
6 changes: 4 additions & 2 deletions crates/hotshot/src/tasks/mod.rs
@@ -82,7 +82,7 @@ pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
) {
let state = NetworkResponseState::<TYPES>::new(
handle.hotshot.consensus(),
Arc::clone(&handle.hotshot.memberships),
Arc::clone(&handle.memberships),
handle.public_key().clone(),
handle.private_key().clone(),
handle.hotshot.id,
@@ -156,7 +156,9 @@ pub fn add_network_message_task<
message = network.recv_message().fuse() => {
// Make sure the message did not fail
let message = match message {
Ok(message) => message,
Ok(message) => {
message
}
Err(e) => {
tracing::error!("Failed to receive message: {:?}", e);
continue;
1 change: 1 addition & 0 deletions crates/hotshot/src/tasks/task_state.rs
@@ -219,6 +219,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
.marketplace_config
.fallback_builder_url
.clone(),
epoch_height: handle.epoch_height,
}
}
}
28 changes: 24 additions & 4 deletions crates/hotshot/src/traits/networking/push_cdn_network.rs
@@ -6,7 +6,7 @@

#[cfg(feature = "hotshot-testing")]
use std::sync::atomic::{AtomicBool, Ordering};
use std::{marker::PhantomData, sync::Arc};
use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
#[cfg(feature = "hotshot-testing")]
use std::{path::Path, time::Duration};

@@ -46,6 +46,7 @@ use hotshot_types::{
BoxSyncFuture,
};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use parking_lot::Mutex;
#[cfg(feature = "hotshot-testing")]
use rand::{rngs::StdRng, RngCore, SeedableRng};
use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
@@ -191,6 +192,10 @@ pub struct PushCdnNetwork<K: SignatureKey + 'static> {
client: Client<ClientDef<K>>,
/// The CDN-specific metrics
metrics: Arc<CdnMetricsValue>,
/// The internal queue for messages to ourselves
internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
/// The public key of this node
public_key: K,
/// Whether or not the underlying network is supposed to be paused
#[cfg(feature = "hotshot-testing")]
is_paused: Arc<AtomicBool>,
@@ -229,7 +234,7 @@ impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
let config = ClientConfig {
endpoint: marshal_endpoint,
subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
keypair,
keypair: keypair.clone(),
use_local_authority: true,
};

@@ -239,6 +244,8 @@ impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
Ok(Self {
client,
metrics: Arc::from(metrics),
internal_queue: Arc::new(Mutex::new(VecDeque::new())),
public_key: keypair.public_key.0,
// Start unpaused
#[cfg(feature = "hotshot-testing")]
is_paused: Arc::from(AtomicBool::new(false)),
@@ -422,7 +429,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> =
ClientConfig {
keypair: KeyPair {
public_key: WrappedSignatureKey(public_key),
public_key: WrappedSignatureKey(public_key.clone()),
private_key,
},
subscribed_topics: topics,
@@ -434,6 +441,8 @@
Arc::new(PushCdnNetwork {
client: Client::new(client_config),
metrics: Arc::new(CdnMetricsValue::default()),
internal_queue: Arc::new(Mutex::new(VecDeque::new())),
public_key,
#[cfg(feature = "hotshot-testing")]
is_paused: Arc::from(AtomicBool::new(false)),
})
@@ -533,6 +542,12 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
return Ok(());
}

// If the message is to ourselves, just add it to the internal queue
if recipient == self.public_key {
self.internal_queue.lock().push_back(message);
return Ok(());
}

// Send the message
if let Err(e) = self
.client
@@ -554,7 +569,12 @@
/// # Errors
/// - If we fail to receive messages. Will trigger a retry automatically.
async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
// Receive a message
// If we have a message in the internal queue, return it
if let Some(message) = self.internal_queue.lock().pop_front() {
return Ok(message);
}

// Receive a message from the network
let message = self.client.receive_message().await;

// If we're paused, receive but don't process messages
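The push_cdn_network.rs change above short-circuits messages addressed to our own public key through an in-process queue instead of sending them over the CDN, and drains that queue before polling the network on receive. A minimal sketch of the pattern, assuming a simplified network handle (the names here are illustrative, not the crate's actual API):

```rust
use std::collections::VecDeque;
use std::sync::Arc;

use parking_lot::Mutex;

/// Simplified stand-in for a network handle with a loopback queue.
struct LoopbackNetwork<K: PartialEq> {
    /// Our own public key; messages addressed to it never hit the wire.
    public_key: K,
    /// Messages we sent to ourselves, drained before polling the network.
    internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}

impl<K: PartialEq> LoopbackNetwork<K> {
    fn send(&self, recipient: &K, message: Vec<u8>) {
        if *recipient == self.public_key {
            // Self-addressed: enqueue locally and skip the external send.
            self.internal_queue.lock().push_back(message);
            return;
        }
        // ... otherwise hand the message to the real transport ...
    }

    fn try_recv(&self) -> Option<Vec<u8>> {
        // Local messages take priority over anything from the network;
        // the caller falls back to the real transport on `None`.
        self.internal_queue.lock().pop_front()
    }
}
```

This mirrors the ordering in the diff: self-addressed sends go straight to the internal queue, and `recv_message` checks that queue before awaiting a message from the CDN client.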
1 change: 0 additions & 1 deletion crates/task-impls/src/network.rs
@@ -404,7 +404,6 @@ impl<
) -> std::result::Result<(), ()> {
if let Some(mut action) = maybe_action {
if !consensus.write().await.update_action(action, view) {
tracing::warn!("Already actioned {:?} in view {:?}", action, view);
return Err(());
}
// If the action was view sync record it as a vote, but we don't
77 changes: 50 additions & 27 deletions crates/task-impls/src/quorum_vote/mod.rs
@@ -173,7 +173,13 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
}
}
HotShotEvent::VidShareValidated(share) => {
let vid_payload_commitment = &share.data.payload_commitment;
let vid_payload_commitment = if let Some(ref data_epoch_payload_commitment) =
share.data.data_epoch_payload_commitment
{
data_epoch_payload_commitment
} else {
&share.data.payload_commitment
};
vid_share = Some(share.clone());
if let Some(ref comm) = payload_commitment {
if vid_payload_commitment != comm {
@@ -372,8 +378,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
view_number: TYPES::View,
event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
event: Option<Arc<HotShotEvent<TYPES>>>,
event: Arc<HotShotEvent<TYPES>>,
) {
tracing::debug!(
"Attempting to make dependency task for view {view_number:?} and event {event:?}"
);

if self.vote_dependencies.contains_key(&view_number) {
return;
}
@@ -388,10 +398,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
let vid_dependency =
self.create_event_dependency(VoteDependency::Vid, view_number, event_receiver.clone());
// If we have an event provided to us
if let Some(event) = event {
if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
quorum_proposal_dependency.mark_as_completed(event);
}
if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
quorum_proposal_dependency.mark_as_completed(event);
}

let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
@@ -500,7 +508,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
proposal.data.view_number,
event_receiver,
&event_sender,
Some(Arc::clone(&event)),
Arc::clone(&event),
);
}
}
@@ -544,7 +552,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
&event_sender.clone(),
)
.await;
self.create_dependency_task_if_new(view, event_receiver, &event_sender, None);
self.create_dependency_task_if_new(
view,
event_receiver,
&event_sender,
Arc::clone(&event),
);
}
HotShotEvent::VidShareRecv(sender, disperse) => {
let view = disperse.data.view_number();
@@ -557,25 +570,25 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS

// Validate the VID share.
let payload_commitment = &disperse.data.payload_commitment;
let disperse_epoch = disperse.data.epoch;

// Check that the signature is valid
ensure!(
sender.validate(&disperse.signature, payload_commitment.as_ref()),
"VID share signature is invalid"
);

let vid_epoch = disperse.data.epoch;
let target_epoch = disperse.data.target_epoch;
let membership_reader = self.membership.read().await;
// ensure that the VID share was sent by a DA member OR the view leader
ensure!(
membership_reader
.da_committee_members(view, disperse_epoch)
.da_committee_members(view, vid_epoch)
.contains(sender)
|| *sender == membership_reader.leader(view, disperse_epoch)?,
|| *sender == membership_reader.leader(view, vid_epoch)?,
"VID share was not sent by a DA member or the view leader."
);

let membership_total_nodes = membership_reader.total_nodes(disperse_epoch);
let membership_total_nodes = membership_reader.total_nodes(target_epoch);
drop(membership_reader);

// NOTE: `verify_share` returns a nested `Result`, so we must check both the inner
@@ -606,7 +619,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
&event_sender.clone(),
)
.await;
self.create_dependency_task_if_new(view, event_receiver, &event_sender, None);
self.create_dependency_task_if_new(
view,
event_receiver,
&event_sender,
Arc::clone(&event),
);
}
HotShotEvent::Timeout(view, ..) => {
let view = TYPES::View::new(view.saturating_sub(1));
@@ -717,25 +735,30 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
current_block_number,
self.epoch_height,
));
tracing::trace!(
"Sending ViewChange for view {} and epoch {}",
proposal.data.view_number() + 1,
*current_epoch
);
broadcast_event(
Arc::new(HotShotEvent::ViewChange(
proposal.data.view_number() + 1,
current_epoch,
)),
&event_sender,
)
.await;

let is_vote_leaf_extended = self
.consensus
.read()
.await
.is_leaf_extended(proposed_leaf.commit());
if !is_vote_leaf_extended {
// We're voting for the proposal that will probably form the eQC. We don't want to change
// the view here because we will probably change it when we form the eQC.
// The main reason is to handle view change event only once in the transaction task.
tracing::trace!(
"Sending ViewChange for view {} and epoch {}",
proposal.data.view_number() + 1,
*current_epoch
);
broadcast_event(
Arc::new(HotShotEvent::ViewChange(
proposal.data.view_number() + 1,
current_epoch,
)),
&event_sender,
)
.await;
}
if let Err(e) = submit_vote::<TYPES, I, V>(
event_sender.clone(),
Arc::clone(&self.membership),
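The VidShareValidated handling above prefers the share's data-epoch payload commitment when one is present and falls back to the regular payload commitment otherwise. A small sketch of that selection, using a hypothetical trimmed-down share struct (the real share type carries more fields):

```rust
/// Hypothetical, trimmed-down view of a VID share, for illustration only.
struct VidShare {
    /// Commitment over the payload addressed to the share's target epoch.
    payload_commitment: [u8; 32],
    /// Commitment over the payload in the epoch the data actually belongs to;
    /// only present around an epoch transition.
    data_epoch_payload_commitment: Option<[u8; 32]>,
}

/// Pick the commitment the vote task compares against the DAC and proposal.
fn vote_payload_commitment(share: &VidShare) -> &[u8; 32] {
    share
        .data_epoch_payload_commitment
        .as_ref()
        .unwrap_or(&share.payload_commitment)
}
```

The same file also stops broadcasting `ViewChange` when the leaf being voted on is extended (`is_leaf_extended`), so the view change for an eQC vote is handled only once, when the eQC itself forms.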
6 changes: 4 additions & 2 deletions crates/task-impls/src/request.rs
@@ -220,6 +220,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I
view,
signature,
};
let my_id = self.id;
let handle: JoinHandle<()> = spawn(async move {
// Do the delay only if primary is up and then start sending
if !network.is_primary_down() {
@@ -261,8 +262,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I
} else {
// This shouldnt be possible `recipients_it.next()` should clone original and start over if `None`
tracing::warn!(
"Sent VID request to all available DA members and got no response for view: {:?}",
view
"Sent VID request to all available DA members and got no response for view: {:?}, my id: {:?}",
view,
my_id,
);
return;
}
17 changes: 12 additions & 5 deletions crates/task-impls/src/transactions.rs
@@ -120,6 +120,9 @@ pub struct TransactionTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V

/// fallback builder url
pub fallback_builder_url: Url,

/// Number of blocks in an epoch, zero means there are no epochs
pub epoch_height: u64,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TransactionTaskState<TYPES, I, V> {
@@ -477,21 +480,25 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TransactionTask
}
HotShotEvent::ViewChange(view, epoch) => {
let view = TYPES::View::new(std::cmp::max(1, **view));

let epoch = if self.epoch_height != 0 {
TYPES::Epoch::new(std::cmp::max(1, **epoch))
} else {
*epoch
};
ensure!(
*view > *self.cur_view || *epoch > self.cur_epoch,
*view > *self.cur_view && *epoch >= *self.cur_epoch,
debug!(
"Received a view change to an older view and epoch: tried to change view to {:?}\
and epoch {:?} though we are at view {:?} and epoch {:?}",
view, epoch, self.cur_view, self.cur_epoch
)
);
self.cur_view = view;
self.cur_epoch = *epoch;
self.cur_epoch = epoch;

let leader = self.membership.read().await.leader(view, *epoch)?;
let leader = self.membership.read().await.leader(view, epoch)?;
if leader == self.public_key {
self.handle_view_change(&event_stream, view, *epoch).await;
self.handle_view_change(&event_stream, view, epoch).await;
return Ok(());
}
}
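The transactions task's `ViewChange` handling above clamps the epoch to at least 1 when epochs are enabled (`epoch_height != 0`) and now requires the view to move strictly forward while the epoch never moves backwards. A hedged sketch of just that guard, with plain `u64`s standing in for the `View`/`Epoch` newtypes:

```rust
/// Apply a ViewChange to the task's (view, epoch) cursor, rejecting stale events.
fn apply_view_change(
    cur_view: &mut u64,
    cur_epoch: &mut u64,
    new_view: u64,
    new_epoch: u64,
    epoch_height: u64,
) -> Result<(), String> {
    let view = new_view.max(1);
    // With epochs disabled (epoch_height == 0) the epoch passes through untouched.
    let epoch = if epoch_height != 0 { new_epoch.max(1) } else { new_epoch };

    // The view must advance and the epoch must not regress.
    if !(view > *cur_view && epoch >= *cur_epoch) {
        return Err(format!(
            "stale view change to view {view}, epoch {epoch} (currently at view {cur_view}, epoch {cur_epoch})"
        ));
    }

    *cur_view = view;
    *cur_epoch = epoch;
    Ok(())
}
```

Note the guard changed from `||` (either a newer view or a newer epoch) to `&&` with `>=` on the epoch, so a view change within the current epoch is still accepted but an older epoch is not.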
4 changes: 2 additions & 2 deletions crates/task-impls/src/vid.rs
@@ -102,7 +102,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> VidTaskState<TYPES, I> {
&Arc::clone(&self.membership),
*view_number,
epoch,
None,
epoch,
vid_precompute.clone(),
)
.await;
@@ -208,7 +208,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> VidTaskState<TYPES, I> {
&Arc::clone(&self.membership),
proposal_view_number,
target_epoch,
Some(sender_epoch),
sender_epoch,
None,
)
.await;
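The vid.rs changes above pass a concrete data (sender) epoch together with the target epoch instead of an `Option`, reflecting that a disperse computed during an epoch transition covers the old epoch's payload but is addressed to the next epoch's membership. A purely hypothetical helper sketching that split (the real disperse calculation has a different signature):

```rust
/// Hypothetical pairing of the two epochs a VID disperse touches.
struct DisperseEpochs {
    /// Epoch whose payload the shares are computed over.
    data_epoch: u64,
    /// Epoch whose stake table decides who receives shares.
    target_epoch: u64,
}

impl DisperseEpochs {
    /// Outside a transition both epochs coincide; across a boundary the data
    /// stays in the old epoch while the recipients come from the new one.
    fn new(data_epoch: u64, in_transition: bool) -> Self {
        let target_epoch = if in_transition { data_epoch + 1 } else { data_epoch };
        Self { data_epoch, target_epoch }
    }
}
```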
1 change: 1 addition & 0 deletions crates/testing/src/helpers.rs
@@ -336,6 +336,7 @@ pub async fn build_vid_proposal<TYPES: NodeType>(
vid.disperse(&encoded_transactions).unwrap(),
membership,
epoch_number,
epoch_number,
None,
)
.await;