diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index df5102fe35..3c65f17386 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -1092,9 +1092,14 @@ impl, A: ConsensusApi + debug!("VID disperse data is not more than one view older."); let payload_commitment = disperse.data.payload_commitment; - // Check whether the data comes from the right leader for this view + // Check whether the data comes from the right leader for this view or + // the data was calculated and signed by the current node let view_leader_key = self.quorum_membership.get_leader(view); - if !view_leader_key.validate(&disperse.signature, payload_commitment.as_ref()) { + if !view_leader_key.validate(&disperse.signature, payload_commitment.as_ref()) + && !self + .public_key + .validate(&disperse.signature, payload_commitment.as_ref()) + { warn!("Could not verify VID dispersal/share sig."); return; } diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index d2d4452067..852c0567a6 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -17,7 +17,6 @@ use hotshot_types::{ simple_certificate::DACertificate, simple_vote::{DAData, DAVote}, traits::{ - block_contents::vid_commitment, consensus_api::ConsensusApi, election::Membership, network::{ConnectedNetwork, ConsensusIntentEvent}, @@ -31,6 +30,7 @@ use hotshot_types::{ use sha2::{Digest, Sha256}; use crate::vote_collection::HandleVoteEvent; +use hotshot_types::traits::block_contents::vid_commitment; use std::{marker::PhantomData, sync::Arc}; #[cfg(async_executor_impl = "tokio")] use tokio::task::spawn_blocking; @@ -120,13 +120,6 @@ impl, A: ConsensusApi + return None; } - let txns = proposal.data.encoded_transactions.clone(); - let num_nodes = self.quorum_membership.total_nodes(); - let payload_commitment = - spawn_blocking(move || vid_commitment(&txns, num_nodes)).await; - #[cfg(async_executor_impl = "tokio")] - let payload_commitment = payload_commitment.unwrap(); - let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions); // ED Is this the right leader? @@ -141,6 +134,13 @@ impl, A: ConsensusApi + return None; } + broadcast_event( + Arc::new(HotShotEvent::DAProposalValidated(proposal.clone(), sender)), + &event_stream, + ) + .await; + } + HotShotEvent::DAProposalValidated(proposal, sender) => { // Proposal is fresh and valid, notify the application layer self.api .send_event(Event { @@ -167,7 +167,14 @@ impl, A: ConsensusApi + ); return None; } + let txns = proposal.data.encoded_transactions.clone(); + let num_nodes = self.quorum_membership.total_nodes(); + let payload_commitment = + spawn_blocking(move || vid_commitment(&txns, num_nodes)).await; + #[cfg(async_executor_impl = "tokio")] + let payload_commitment = payload_commitment.unwrap(); + let view = proposal.data.get_view_number(); // Generate and send vote let Ok(vote) = DAVote::create_signed_vote( DAData { @@ -363,6 +370,7 @@ impl, A: ConsensusApi + | HotShotEvent::TransactionsSequenced(_, _, _) | HotShotEvent::Timeout(_) | HotShotEvent::ViewChange(_) + | HotShotEvent::DAProposalValidated(_, _) ) } diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index 9f21adf391..f22c8428bc 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -36,6 +36,8 @@ pub enum HotShotEvent { TimeoutVoteSend(TimeoutVote), /// A DA proposal has been received from the network; handled by the DA task DAProposalRecv(Proposal>, TYPES::SignatureKey), + /// A DA proposal has been validated; handled by the DA task and VID task + DAProposalValidated(Proposal>, TYPES::SignatureKey), /// A DA vote has been received by the network; handled by the DA task DAVoteRecv(DAVote), /// A Data Availability Certificate (DAC) has been recieved by the network; handled by the consensus task diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index 3e23e68ea8..7b394a7bb4 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -5,10 +5,11 @@ use async_lock::RwLock; #[cfg(async_executor_impl = "async-std")] use async_std::task::spawn_blocking; +use futures::future::join_all; use hotshot_task::task::{Task, TaskState}; use hotshot_types::{ consensus::Consensus, - data::VidDisperse, + data::{VidDisperse, VidDisperseShare}, message::Proposal, traits::{ consensus_api::ConsensusApi, @@ -154,6 +155,37 @@ impl, A: ConsensusApi + return None; } + HotShotEvent::DAProposalValidated(proposal, _sender) => { + let txns = proposal.data.encoded_transactions.clone(); + let num_nodes = self.membership.total_nodes(); + let vid_disperse = spawn_blocking(move || { + #[allow(clippy::panic)] + vid_scheme(num_nodes).disperse(&txns).unwrap_or_else(|err|panic!("VID disperse failure:\n\t(num_storage nodes,payload_byte_len)=({num_nodes},{})\n\terror: : {err}", txns.len())) + }) + .await; + #[cfg(async_executor_impl = "tokio")] + let vid_disperse = vid_disperse.unwrap(); + + let vid_disperse = VidDisperse::from_membership( + proposal.data.view_number, + vid_disperse, + &self.membership, + ); + + let vid_disperse_tasks = VidDisperseShare::from_vid_disperse(vid_disperse) + .into_iter() + .filter_map(|vid_share| { + Some(broadcast_event( + Arc::new(HotShotEvent::VidDisperseRecv( + vid_share.to_proposal(&self.private_key)?, + )), + &event_stream, + )) + }); + + join_all(vid_disperse_tasks).await; + } + HotShotEvent::Shutdown => { return Some(HotShotTaskCompleted); } @@ -188,6 +220,7 @@ impl, A: ConsensusApi + | HotShotEvent::TransactionsSequenced(_, _, _) | HotShotEvent::BlockReady(_, _) | HotShotEvent::ViewChange(_) + | HotShotEvent::DAProposalValidated(_, _) ) } fn should_shutdown(event: &Self::Event) -> bool { diff --git a/crates/testing/src/predicates.rs b/crates/testing/src/predicates.rs index e78fc698fb..3b2b1c4fbf 100644 --- a/crates/testing/src/predicates.rs +++ b/crates/testing/src/predicates.rs @@ -33,6 +33,25 @@ where } } +pub fn multi_exact( + events: Vec>, +) -> Vec>>> +where + TYPES: NodeType, +{ + events + .into_iter() + .map(|event| { + let event = Arc::new(event); + let info = format!("{:?}", event); + Predicate { + function: Box::new(move |e| e == &event), + info, + } + }) + .collect() +} + pub fn leaf_decided() -> Predicate>> where TYPES: NodeType, diff --git a/crates/testing/src/task_helpers.rs b/crates/testing/src/task_helpers.rs index 67273199c7..f582c7d2e3 100644 --- a/crates/testing/src/task_helpers.rs +++ b/crates/testing/src/task_helpers.rs @@ -398,7 +398,7 @@ pub fn build_vid_proposal( view_number: ViewNumber, transactions: Vec, private_key: &::PrivateKey, -) -> Proposal> { +) -> Vec>> { let mut vid = vid_scheme_from_view_number::(quorum_membership, view_number); let encoded_transactions = TestTransaction::encode(transactions.clone()).unwrap(); @@ -409,9 +409,13 @@ pub fn build_vid_proposal( ); VidDisperseShare::from_vid_disperse(vid_disperse) - .swap_remove(0) - .to_proposal(private_key) - .expect("Failed to sign payload commitment") + .into_iter() + .map(|vid_disperse| { + vid_disperse + .to_proposal(private_key) + .expect("Failed to sign payload commitment") + }) + .collect() } pub fn build_da_certificate( diff --git a/crates/testing/src/view_generator.rs b/crates/testing/src/view_generator.rs index 2e9c904c28..49dfd39c9b 100644 --- a/crates/testing/src/view_generator.rs +++ b/crates/testing/src/view_generator.rs @@ -42,7 +42,7 @@ pub struct TestView { pub view_number: ViewNumber, pub quorum_membership: ::Membership, pub vid_proposal: ( - Proposal>, + Vec>>, ::SignatureKey, ), pub leader_public_key: ::SignatureKey, diff --git a/crates/testing/tests/consensus_task.rs b/crates/testing/tests/consensus_task.rs index 9d83cc9247..45de005266 100644 --- a/crates/testing/tests/consensus_task.rs +++ b/crates/testing/tests/consensus_task.rs @@ -56,7 +56,7 @@ async fn test_consensus_task() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -135,7 +135,7 @@ async fn test_consensus_vote() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), QuorumVoteRecv(votes[0].clone()), ], outputs: vec![ @@ -187,7 +187,7 @@ async fn test_vote_with_specific_order(input_permutation: Vec) { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -199,7 +199,7 @@ async fn test_vote_with_specific_order(input_permutation: Vec) { let inputs = vec![ // We need a VID share for view 2 otherwise we cannot vote at view 2 (as node 2). - VidDisperseRecv(vids[1].0.clone()), + VidDisperseRecv(vids[1].0[0].clone()), DACRecv(dacs[1].clone()), QuorumProposalRecv(proposals[1].clone(), leaders[1]), ]; @@ -299,7 +299,7 @@ async fn test_view_sync_finalize_propose() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -422,7 +422,7 @@ async fn test_view_sync_finalize_vote() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -527,7 +527,7 @@ async fn test_view_sync_finalize_vote_fail_view_number() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -623,7 +623,7 @@ async fn test_vid_disperse_storage_failure() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), diff --git a/crates/testing/tests/da_task.rs b/crates/testing/tests/da_task.rs index d3b3f8de16..a660945c6d 100644 --- a/crates/testing/tests/da_task.rs +++ b/crates/testing/tests/da_task.rs @@ -64,15 +64,22 @@ async fn test_da_task() { asserts: vec![], }; - // Run view 2 and propose. + // Run view 2 and validate proposal. let view_2 = TestScriptStage { inputs: vec![DAProposalRecv(proposals[1].clone(), leaders[1])], + outputs: vec![exact(DAProposalValidated(proposals[1].clone(), leaders[1]))], + asserts: vec![], + }; + + // Run view 3 and vote + let view_3 = TestScriptStage { + inputs: vec![DAProposalValidated(proposals[1].clone(), leaders[1])], outputs: vec![exact(DAVoteSend(votes[1].clone()))], asserts: vec![], }; let da_state = DATaskState::>::create_from(&handle).await; - let stages = vec![view_1, view_2]; + let stages = vec![view_1, view_2, view_3]; run_test_script(stages, da_state).await; } @@ -124,9 +131,16 @@ async fn test_da_task_storage_failure() { asserts: vec![], }; - // Run view 2 and propose. + // Run view 2 and validate proposal. let view_2 = TestScriptStage { inputs: vec![DAProposalRecv(proposals[1].clone(), leaders[1])], + outputs: vec![exact(DAProposalValidated(proposals[1].clone(), leaders[1]))], + asserts: vec![], + }; + + // Run view 3 and propose. + let view_3 = TestScriptStage { + inputs: vec![DAProposalValidated(proposals[1].clone(), leaders[1])], outputs: vec![ /* No vote was sent due to the storage failure */ ], @@ -134,7 +148,7 @@ async fn test_da_task_storage_failure() { }; let da_state = DATaskState::>::create_from(&handle).await; - let stages = vec![view_1, view_2]; + let stages = vec![view_1, view_2, view_3]; run_test_script(stages, da_state).await; } diff --git a/crates/testing/tests/proposal_ordering.rs b/crates/testing/tests/proposal_ordering.rs index a96b16a9d2..833a63900d 100644 --- a/crates/testing/tests/proposal_ordering.rs +++ b/crates/testing/tests/proposal_ordering.rs @@ -52,7 +52,7 @@ async fn test_ordering_with_specific_order(input_permutation: Vec) { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), diff --git a/crates/testing/tests/upgrade_task.rs b/crates/testing/tests/upgrade_task.rs index 4f9f10358e..ecd4e88cdd 100644 --- a/crates/testing/tests/upgrade_task.rs +++ b/crates/testing/tests/upgrade_task.rs @@ -67,7 +67,7 @@ async fn test_upgrade_task() { let view_1 = TestScriptStage { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), DACRecv(dacs[0].clone()), ], outputs: vec![ @@ -80,7 +80,7 @@ async fn test_upgrade_task() { let view_2 = TestScriptStage { inputs: vec![ - VidDisperseRecv(vids[1].0.clone()), + VidDisperseRecv(vids[1].0[0].clone()), QuorumProposalRecv(proposals[1].clone(), leaders[1]), DACRecv(dacs[1].clone()), ], @@ -96,7 +96,7 @@ async fn test_upgrade_task() { inputs: vec![ QuorumProposalRecv(proposals[2].clone(), leaders[2]), DACRecv(dacs[2].clone()), - VidDisperseRecv(vids[2].0.clone()), + VidDisperseRecv(vids[2].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(3))), @@ -111,7 +111,7 @@ async fn test_upgrade_task() { inputs: vec![ QuorumProposalRecv(proposals[3].clone(), leaders[3]), DACRecv(dacs[3].clone()), - VidDisperseRecv(vids[3].0.clone()), + VidDisperseRecv(vids[3].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(4))), @@ -229,7 +229,7 @@ async fn test_upgrade_and_consensus_task() { let inputs = vec![ vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), DACRecv(dacs[0].clone()), ], upgrade_vote_recvs, @@ -237,7 +237,7 @@ async fn test_upgrade_and_consensus_task() { vec![ DACRecv(dacs[1].clone()), SendPayloadCommitmentAndMetadata( - vids[2].0.data.payload_commitment, + vids[2].0[0].data.payload_commitment, (), ViewNumber::new(2), ), diff --git a/crates/testing/tests/vid_task.rs b/crates/testing/tests/vid_task.rs index d084b0002e..ab9e0689e4 100644 --- a/crates/testing/tests/vid_task.rs +++ b/crates/testing/tests/vid_task.rs @@ -46,7 +46,7 @@ async fn test_vid_task() { view_number: ViewNumber::new(2), }; let message = Proposal { - data: proposal, + data: proposal.clone(), signature, _pd: PhantomData, }; @@ -62,10 +62,19 @@ async fn test_vid_task() { signature: message.signature.clone(), _pd: PhantomData, }; - let vid_share_proposal = VidDisperseShare::from_vid_disperse(vid_disperse.clone()) - .swap_remove(0) - .to_proposal(handle.private_key()) - .expect("Failed to sign block payload!"); + let vid_share_proposals: Vec<_> = VidDisperseShare::from_vid_disperse(vid_disperse.clone()) + .into_iter() + .map(|vid_disperse_share| { + vid_disperse_share + .to_proposal(handle.private_key()) + .expect("Failed to sign block payload!") + }) + .collect(); + let vid_share_proposal = vid_share_proposals[0].clone(); + let disperse_receives: Vec<_> = vid_share_proposals + .into_iter() + .map(HotShotEvent::VidDisperseRecv) + .collect(); let mut input = Vec::new(); let mut output = HashMap::new(); @@ -82,6 +91,10 @@ async fn test_vid_task() { vid_disperse.clone(), ViewNumber::new(2), )); + input.push(HotShotEvent::DAProposalValidated( + message, + *handle.public_key(), + )); input.push(HotShotEvent::VidDisperseSend(vid_proposal.clone(), pub_key)); input.push(HotShotEvent::VidDisperseRecv(vid_share_proposal.clone())); input.push(HotShotEvent::Shutdown); @@ -99,6 +112,9 @@ async fn test_vid_task() { HotShotEvent::VidDisperseSend(vid_proposal.clone(), pub_key), 1, ); + for disperse_receive in disperse_receives { + output.insert(disperse_receive, 1); + } let vid_state = VIDTaskState { api: handle.clone(),