From 4b542b6fb1f37d9a1ffabadc687966c8a719c80e Mon Sep 17 00:00:00 2001 From: Lukasz Rzasik Date: Mon, 25 Mar 2024 16:58:34 +0100 Subject: [PATCH 1/4] Calculate VID shared locally --- crates/task-impls/src/consensus.rs | 9 ++++++-- crates/task-impls/src/da.rs | 36 +++++++++++++++++++++++++----- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index a553b70d3b..c94cec0993 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -1051,9 +1051,14 @@ impl, A: ConsensusApi + debug!("VID disperse data is not more than one view older."); let payload_commitment = disperse.data.payload_commitment; - // Check whether the data comes from the right leader for this view + // Check whether the data comes from the right leader for this view or + // the data was calculated and signed by the current node let view_leader_key = self.quorum_membership.get_leader(view); - if !view_leader_key.validate(&disperse.signature, payload_commitment.as_ref()) { + if !view_leader_key.validate(&disperse.signature, payload_commitment.as_ref()) + && !self + .public_key + .validate(&disperse.signature, payload_commitment.as_ref()) + { warn!("Could not verify VID dispersal/share sig."); return; } diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index d2d4452067..5ed2579ffd 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -17,7 +17,6 @@ use hotshot_types::{ simple_certificate::DACertificate, simple_vote::{DAData, DAVote}, traits::{ - block_contents::vid_commitment, consensus_api::ConsensusApi, election::Membership, network::{ConnectedNetwork, ConsensusIntentEvent}, @@ -31,6 +30,10 @@ use hotshot_types::{ use sha2::{Digest, Sha256}; use crate::vote_collection::HandleVoteEvent; +use futures::future::join_all; +use hotshot_types::data::{VidDisperse, VidDisperseShare}; +use hotshot_types::vid::vid_scheme; +use jf_primitives::vid::VidScheme; use std::{marker::PhantomData, sync::Arc}; #[cfg(async_executor_impl = "tokio")] use tokio::task::spawn_blocking; @@ -122,10 +125,29 @@ impl, A: ConsensusApi + let txns = proposal.data.encoded_transactions.clone(); let num_nodes = self.quorum_membership.total_nodes(); - let payload_commitment = - spawn_blocking(move || vid_commitment(&txns, num_nodes)).await; - #[cfg(async_executor_impl = "tokio")] - let payload_commitment = payload_commitment.unwrap(); + let vid_disperse = spawn_blocking(move || { + #[allow(clippy::panic)] + vid_scheme(num_nodes).disperse(&txns).unwrap_or_else(|err|panic!("VID disperse failure:\n\t(num_storage nodes,payload_byte_len)=({num_nodes},{})\n\terror: : {err}", txns.len())) + }) + .await; + + let vid_disperse = VidDisperse::from_membership( + proposal.data.view_number, + vid_disperse, + &self.quorum_membership, + ); + let payload_commitment = vid_disperse.payload_commitment; + let vid_disperse_tasks: Vec<_> = VidDisperseShare::from_vid_disperse(vid_disperse) + .into_iter() + .map(|vid_share| { + Some(broadcast_event( + Arc::new(HotShotEvent::VidDisperseRecv( + vid_share.to_proposal(&self.private_key)?, + )), + &event_stream, + )) + }) + .collect(); let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions); @@ -189,6 +211,10 @@ impl, A: ConsensusApi + broadcast_event(Arc::new(HotShotEvent::DAVoteSend(vote)), &event_stream).await; let mut consensus = self.consensus.write().await; + if vid_disperse_tasks.iter().all(Option::is_some) { + join_all(vid_disperse_tasks.into_iter().flatten()).await; + } + // Ensure this view is in the view map for garbage collection, but do not overwrite if // there is already a view there: the replica task may have inserted a `Leaf` view which // contains strictly more information. From 3e137fdd6bdbfb62c131b80029762078d5c172c7 Mon Sep 17 00:00:00 2001 From: Lukasz Rzasik Date: Mon, 25 Mar 2024 21:03:31 +0100 Subject: [PATCH 2/4] Adjust to tokio and adjust tests --- crates/task-impls/src/da.rs | 2 ++ crates/testing/src/predicates.rs | 19 +++++++++++++++++++ crates/testing/src/task_helpers.rs | 12 ++++++++---- crates/testing/src/view_generator.rs | 2 +- crates/testing/tests/consensus_task.rs | 16 ++++++++-------- crates/testing/tests/da_task.rs | 8 ++++++-- crates/testing/tests/proposal_ordering.rs | 2 +- crates/testing/tests/upgrade_task.rs | 12 ++++++------ 8 files changed, 51 insertions(+), 22 deletions(-) diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index 5ed2579ffd..70496c5e1a 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -130,6 +130,8 @@ impl, A: ConsensusApi + vid_scheme(num_nodes).disperse(&txns).unwrap_or_else(|err|panic!("VID disperse failure:\n\t(num_storage nodes,payload_byte_len)=({num_nodes},{})\n\terror: : {err}", txns.len())) }) .await; + #[cfg(async_executor_impl = "tokio")] + let vid_disperse = vid_disperse.unwrap(); let vid_disperse = VidDisperse::from_membership( proposal.data.view_number, diff --git a/crates/testing/src/predicates.rs b/crates/testing/src/predicates.rs index e78fc698fb..3b2b1c4fbf 100644 --- a/crates/testing/src/predicates.rs +++ b/crates/testing/src/predicates.rs @@ -33,6 +33,25 @@ where } } +pub fn multi_exact( + events: Vec>, +) -> Vec>>> +where + TYPES: NodeType, +{ + events + .into_iter() + .map(|event| { + let event = Arc::new(event); + let info = format!("{:?}", event); + Predicate { + function: Box::new(move |e| e == &event), + info, + } + }) + .collect() +} + pub fn leaf_decided() -> Predicate>> where TYPES: NodeType, diff --git a/crates/testing/src/task_helpers.rs b/crates/testing/src/task_helpers.rs index 67273199c7..f582c7d2e3 100644 --- a/crates/testing/src/task_helpers.rs +++ b/crates/testing/src/task_helpers.rs @@ -398,7 +398,7 @@ pub fn build_vid_proposal( view_number: ViewNumber, transactions: Vec, private_key: &::PrivateKey, -) -> Proposal> { +) -> Vec>> { let mut vid = vid_scheme_from_view_number::(quorum_membership, view_number); let encoded_transactions = TestTransaction::encode(transactions.clone()).unwrap(); @@ -409,9 +409,13 @@ pub fn build_vid_proposal( ); VidDisperseShare::from_vid_disperse(vid_disperse) - .swap_remove(0) - .to_proposal(private_key) - .expect("Failed to sign payload commitment") + .into_iter() + .map(|vid_disperse| { + vid_disperse + .to_proposal(private_key) + .expect("Failed to sign payload commitment") + }) + .collect() } pub fn build_da_certificate( diff --git a/crates/testing/src/view_generator.rs b/crates/testing/src/view_generator.rs index 2e9c904c28..49dfd39c9b 100644 --- a/crates/testing/src/view_generator.rs +++ b/crates/testing/src/view_generator.rs @@ -42,7 +42,7 @@ pub struct TestView { pub view_number: ViewNumber, pub quorum_membership: ::Membership, pub vid_proposal: ( - Proposal>, + Vec>>, ::SignatureKey, ), pub leader_public_key: ::SignatureKey, diff --git a/crates/testing/tests/consensus_task.rs b/crates/testing/tests/consensus_task.rs index 9d83cc9247..45de005266 100644 --- a/crates/testing/tests/consensus_task.rs +++ b/crates/testing/tests/consensus_task.rs @@ -56,7 +56,7 @@ async fn test_consensus_task() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -135,7 +135,7 @@ async fn test_consensus_vote() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), QuorumVoteRecv(votes[0].clone()), ], outputs: vec![ @@ -187,7 +187,7 @@ async fn test_vote_with_specific_order(input_permutation: Vec) { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -199,7 +199,7 @@ async fn test_vote_with_specific_order(input_permutation: Vec) { let inputs = vec![ // We need a VID share for view 2 otherwise we cannot vote at view 2 (as node 2). - VidDisperseRecv(vids[1].0.clone()), + VidDisperseRecv(vids[1].0[0].clone()), DACRecv(dacs[1].clone()), QuorumProposalRecv(proposals[1].clone(), leaders[1]), ]; @@ -299,7 +299,7 @@ async fn test_view_sync_finalize_propose() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -422,7 +422,7 @@ async fn test_view_sync_finalize_vote() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -527,7 +527,7 @@ async fn test_view_sync_finalize_vote_fail_view_number() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), @@ -623,7 +623,7 @@ async fn test_vid_disperse_storage_failure() { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), diff --git a/crates/testing/tests/da_task.rs b/crates/testing/tests/da_task.rs index d3b3f8de16..84efc79346 100644 --- a/crates/testing/tests/da_task.rs +++ b/crates/testing/tests/da_task.rs @@ -7,7 +7,7 @@ use hotshot_example_types::{ use hotshot_task_impls::da::DATaskState; use hotshot_task_impls::events::HotShotEvent::*; use hotshot_testing::{ - predicates::exact, + predicates::{exact, multi_exact}, script::{run_test_script, TestScriptStage}, task_helpers::build_system_handle, view_generator::TestViewGenerator, @@ -65,9 +65,13 @@ async fn test_da_task() { }; // Run view 2 and propose. + let disperse_receives: Vec<_> = vids[1].0.clone().into_iter().map(VidDisperseRecv).collect(); + let mut multi_predicate = multi_exact(disperse_receives); + let mut output_predicates = vec![exact(DAVoteSend(votes[1].clone()))]; + output_predicates.append(&mut multi_predicate); let view_2 = TestScriptStage { inputs: vec![DAProposalRecv(proposals[1].clone(), leaders[1])], - outputs: vec![exact(DAVoteSend(votes[1].clone()))], + outputs: output_predicates, asserts: vec![], }; diff --git a/crates/testing/tests/proposal_ordering.rs b/crates/testing/tests/proposal_ordering.rs index a96b16a9d2..833a63900d 100644 --- a/crates/testing/tests/proposal_ordering.rs +++ b/crates/testing/tests/proposal_ordering.rs @@ -52,7 +52,7 @@ async fn test_ordering_with_specific_order(input_permutation: Vec) { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), DACRecv(dacs[0].clone()), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(1))), diff --git a/crates/testing/tests/upgrade_task.rs b/crates/testing/tests/upgrade_task.rs index 4f9f10358e..ecd4e88cdd 100644 --- a/crates/testing/tests/upgrade_task.rs +++ b/crates/testing/tests/upgrade_task.rs @@ -67,7 +67,7 @@ async fn test_upgrade_task() { let view_1 = TestScriptStage { inputs: vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), DACRecv(dacs[0].clone()), ], outputs: vec![ @@ -80,7 +80,7 @@ async fn test_upgrade_task() { let view_2 = TestScriptStage { inputs: vec![ - VidDisperseRecv(vids[1].0.clone()), + VidDisperseRecv(vids[1].0[0].clone()), QuorumProposalRecv(proposals[1].clone(), leaders[1]), DACRecv(dacs[1].clone()), ], @@ -96,7 +96,7 @@ async fn test_upgrade_task() { inputs: vec![ QuorumProposalRecv(proposals[2].clone(), leaders[2]), DACRecv(dacs[2].clone()), - VidDisperseRecv(vids[2].0.clone()), + VidDisperseRecv(vids[2].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(3))), @@ -111,7 +111,7 @@ async fn test_upgrade_task() { inputs: vec![ QuorumProposalRecv(proposals[3].clone(), leaders[3]), DACRecv(dacs[3].clone()), - VidDisperseRecv(vids[3].0.clone()), + VidDisperseRecv(vids[3].0[0].clone()), ], outputs: vec![ exact(ViewChange(ViewNumber::new(4))), @@ -229,7 +229,7 @@ async fn test_upgrade_and_consensus_task() { let inputs = vec![ vec![ QuorumProposalRecv(proposals[0].clone(), leaders[0]), - VidDisperseRecv(vids[0].0.clone()), + VidDisperseRecv(vids[0].0[0].clone()), DACRecv(dacs[0].clone()), ], upgrade_vote_recvs, @@ -237,7 +237,7 @@ async fn test_upgrade_and_consensus_task() { vec![ DACRecv(dacs[1].clone()), SendPayloadCommitmentAndMetadata( - vids[2].0.data.payload_commitment, + vids[2].0[0].data.payload_commitment, (), ViewNumber::new(2), ), From 97c4fd96a67ff92671e591525f9cce3da841c871 Mon Sep 17 00:00:00 2001 From: Lukasz Rzasik Date: Tue, 26 Mar 2024 17:04:10 +0100 Subject: [PATCH 3/4] Split DAProsal validation and voting --- crates/task-impls/src/da.rs | 52 ++++++++++---------------------- crates/task-impls/src/events.rs | 2 ++ crates/task-impls/src/vid.rs | 38 ++++++++++++++++++++++- crates/testing/tests/da_task.rs | 30 ++++++++++++------ crates/testing/tests/vid_task.rs | 26 +++++++++++++--- 5 files changed, 96 insertions(+), 52 deletions(-) diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index 70496c5e1a..852c0567a6 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -30,10 +30,7 @@ use hotshot_types::{ use sha2::{Digest, Sha256}; use crate::vote_collection::HandleVoteEvent; -use futures::future::join_all; -use hotshot_types::data::{VidDisperse, VidDisperseShare}; -use hotshot_types::vid::vid_scheme; -use jf_primitives::vid::VidScheme; +use hotshot_types::traits::block_contents::vid_commitment; use std::{marker::PhantomData, sync::Arc}; #[cfg(async_executor_impl = "tokio")] use tokio::task::spawn_blocking; @@ -123,34 +120,6 @@ impl, A: ConsensusApi + return None; } - let txns = proposal.data.encoded_transactions.clone(); - let num_nodes = self.quorum_membership.total_nodes(); - let vid_disperse = spawn_blocking(move || { - #[allow(clippy::panic)] - vid_scheme(num_nodes).disperse(&txns).unwrap_or_else(|err|panic!("VID disperse failure:\n\t(num_storage nodes,payload_byte_len)=({num_nodes},{})\n\terror: : {err}", txns.len())) - }) - .await; - #[cfg(async_executor_impl = "tokio")] - let vid_disperse = vid_disperse.unwrap(); - - let vid_disperse = VidDisperse::from_membership( - proposal.data.view_number, - vid_disperse, - &self.quorum_membership, - ); - let payload_commitment = vid_disperse.payload_commitment; - let vid_disperse_tasks: Vec<_> = VidDisperseShare::from_vid_disperse(vid_disperse) - .into_iter() - .map(|vid_share| { - Some(broadcast_event( - Arc::new(HotShotEvent::VidDisperseRecv( - vid_share.to_proposal(&self.private_key)?, - )), - &event_stream, - )) - }) - .collect(); - let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions); // ED Is this the right leader? @@ -165,6 +134,13 @@ impl, A: ConsensusApi + return None; } + broadcast_event( + Arc::new(HotShotEvent::DAProposalValidated(proposal.clone(), sender)), + &event_stream, + ) + .await; + } + HotShotEvent::DAProposalValidated(proposal, sender) => { // Proposal is fresh and valid, notify the application layer self.api .send_event(Event { @@ -191,7 +167,14 @@ impl, A: ConsensusApi + ); return None; } + let txns = proposal.data.encoded_transactions.clone(); + let num_nodes = self.quorum_membership.total_nodes(); + let payload_commitment = + spawn_blocking(move || vid_commitment(&txns, num_nodes)).await; + #[cfg(async_executor_impl = "tokio")] + let payload_commitment = payload_commitment.unwrap(); + let view = proposal.data.get_view_number(); // Generate and send vote let Ok(vote) = DAVote::create_signed_vote( DAData { @@ -213,10 +196,6 @@ impl, A: ConsensusApi + broadcast_event(Arc::new(HotShotEvent::DAVoteSend(vote)), &event_stream).await; let mut consensus = self.consensus.write().await; - if vid_disperse_tasks.iter().all(Option::is_some) { - join_all(vid_disperse_tasks.into_iter().flatten()).await; - } - // Ensure this view is in the view map for garbage collection, but do not overwrite if // there is already a view there: the replica task may have inserted a `Leaf` view which // contains strictly more information. @@ -391,6 +370,7 @@ impl, A: ConsensusApi + | HotShotEvent::TransactionsSequenced(_, _, _) | HotShotEvent::Timeout(_) | HotShotEvent::ViewChange(_) + | HotShotEvent::DAProposalValidated(_, _) ) } diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index 9f21adf391..f22c8428bc 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -36,6 +36,8 @@ pub enum HotShotEvent { TimeoutVoteSend(TimeoutVote), /// A DA proposal has been received from the network; handled by the DA task DAProposalRecv(Proposal>, TYPES::SignatureKey), + /// A DA proposal has been validated; handled by the DA task and VID task + DAProposalValidated(Proposal>, TYPES::SignatureKey), /// A DA vote has been received by the network; handled by the DA task DAVoteRecv(DAVote), /// A Data Availability Certificate (DAC) has been recieved by the network; handled by the consensus task diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index 3e23e68ea8..41ce95256f 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -5,10 +5,11 @@ use async_lock::RwLock; #[cfg(async_executor_impl = "async-std")] use async_std::task::spawn_blocking; +use futures::future::join_all; use hotshot_task::task::{Task, TaskState}; use hotshot_types::{ consensus::Consensus, - data::VidDisperse, + data::{VidDisperse, VidDisperseShare}, message::Proposal, traits::{ consensus_api::ConsensusApi, @@ -154,6 +155,40 @@ impl, A: ConsensusApi + return None; } + HotShotEvent::DAProposalValidated(proposal, _sender) => { + let txns = proposal.data.encoded_transactions.clone(); + let num_nodes = self.membership.total_nodes(); + let vid_disperse = spawn_blocking(move || { + #[allow(clippy::panic)] + vid_scheme(num_nodes).disperse(&txns).unwrap_or_else(|err|panic!("VID disperse failure:\n\t(num_storage nodes,payload_byte_len)=({num_nodes},{})\n\terror: : {err}", txns.len())) + }) + .await; + #[cfg(async_executor_impl = "tokio")] + let vid_disperse = vid_disperse.unwrap(); + + let vid_disperse = VidDisperse::from_membership( + proposal.data.view_number, + vid_disperse, + &self.membership, + ); + + let vid_disperse_tasks: Vec<_> = VidDisperseShare::from_vid_disperse(vid_disperse) + .into_iter() + .map(|vid_share| { + Some(broadcast_event( + Arc::new(HotShotEvent::VidDisperseRecv( + vid_share.to_proposal(&self.private_key)?, + )), + &event_stream, + )) + }) + .collect(); + + if vid_disperse_tasks.iter().all(Option::is_some) { + join_all(vid_disperse_tasks.into_iter().flatten()).await; + } + } + HotShotEvent::Shutdown => { return Some(HotShotTaskCompleted); } @@ -188,6 +223,7 @@ impl, A: ConsensusApi + | HotShotEvent::TransactionsSequenced(_, _, _) | HotShotEvent::BlockReady(_, _) | HotShotEvent::ViewChange(_) + | HotShotEvent::DAProposalValidated(_, _) ) } fn should_shutdown(event: &Self::Event) -> bool { diff --git a/crates/testing/tests/da_task.rs b/crates/testing/tests/da_task.rs index 84efc79346..a660945c6d 100644 --- a/crates/testing/tests/da_task.rs +++ b/crates/testing/tests/da_task.rs @@ -7,7 +7,7 @@ use hotshot_example_types::{ use hotshot_task_impls::da::DATaskState; use hotshot_task_impls::events::HotShotEvent::*; use hotshot_testing::{ - predicates::{exact, multi_exact}, + predicates::exact, script::{run_test_script, TestScriptStage}, task_helpers::build_system_handle, view_generator::TestViewGenerator, @@ -64,19 +64,22 @@ async fn test_da_task() { asserts: vec![], }; - // Run view 2 and propose. - let disperse_receives: Vec<_> = vids[1].0.clone().into_iter().map(VidDisperseRecv).collect(); - let mut multi_predicate = multi_exact(disperse_receives); - let mut output_predicates = vec![exact(DAVoteSend(votes[1].clone()))]; - output_predicates.append(&mut multi_predicate); + // Run view 2 and validate proposal. let view_2 = TestScriptStage { inputs: vec![DAProposalRecv(proposals[1].clone(), leaders[1])], - outputs: output_predicates, + outputs: vec![exact(DAProposalValidated(proposals[1].clone(), leaders[1]))], + asserts: vec![], + }; + + // Run view 3 and vote + let view_3 = TestScriptStage { + inputs: vec![DAProposalValidated(proposals[1].clone(), leaders[1])], + outputs: vec![exact(DAVoteSend(votes[1].clone()))], asserts: vec![], }; let da_state = DATaskState::>::create_from(&handle).await; - let stages = vec![view_1, view_2]; + let stages = vec![view_1, view_2, view_3]; run_test_script(stages, da_state).await; } @@ -128,9 +131,16 @@ async fn test_da_task_storage_failure() { asserts: vec![], }; - // Run view 2 and propose. + // Run view 2 and validate proposal. let view_2 = TestScriptStage { inputs: vec![DAProposalRecv(proposals[1].clone(), leaders[1])], + outputs: vec![exact(DAProposalValidated(proposals[1].clone(), leaders[1]))], + asserts: vec![], + }; + + // Run view 3 and propose. + let view_3 = TestScriptStage { + inputs: vec![DAProposalValidated(proposals[1].clone(), leaders[1])], outputs: vec![ /* No vote was sent due to the storage failure */ ], @@ -138,7 +148,7 @@ async fn test_da_task_storage_failure() { }; let da_state = DATaskState::>::create_from(&handle).await; - let stages = vec![view_1, view_2]; + let stages = vec![view_1, view_2, view_3]; run_test_script(stages, da_state).await; } diff --git a/crates/testing/tests/vid_task.rs b/crates/testing/tests/vid_task.rs index d084b0002e..ab9e0689e4 100644 --- a/crates/testing/tests/vid_task.rs +++ b/crates/testing/tests/vid_task.rs @@ -46,7 +46,7 @@ async fn test_vid_task() { view_number: ViewNumber::new(2), }; let message = Proposal { - data: proposal, + data: proposal.clone(), signature, _pd: PhantomData, }; @@ -62,10 +62,19 @@ async fn test_vid_task() { signature: message.signature.clone(), _pd: PhantomData, }; - let vid_share_proposal = VidDisperseShare::from_vid_disperse(vid_disperse.clone()) - .swap_remove(0) - .to_proposal(handle.private_key()) - .expect("Failed to sign block payload!"); + let vid_share_proposals: Vec<_> = VidDisperseShare::from_vid_disperse(vid_disperse.clone()) + .into_iter() + .map(|vid_disperse_share| { + vid_disperse_share + .to_proposal(handle.private_key()) + .expect("Failed to sign block payload!") + }) + .collect(); + let vid_share_proposal = vid_share_proposals[0].clone(); + let disperse_receives: Vec<_> = vid_share_proposals + .into_iter() + .map(HotShotEvent::VidDisperseRecv) + .collect(); let mut input = Vec::new(); let mut output = HashMap::new(); @@ -82,6 +91,10 @@ async fn test_vid_task() { vid_disperse.clone(), ViewNumber::new(2), )); + input.push(HotShotEvent::DAProposalValidated( + message, + *handle.public_key(), + )); input.push(HotShotEvent::VidDisperseSend(vid_proposal.clone(), pub_key)); input.push(HotShotEvent::VidDisperseRecv(vid_share_proposal.clone())); input.push(HotShotEvent::Shutdown); @@ -99,6 +112,9 @@ async fn test_vid_task() { HotShotEvent::VidDisperseSend(vid_proposal.clone(), pub_key), 1, ); + for disperse_receive in disperse_receives { + output.insert(disperse_receive, 1); + } let vid_state = VIDTaskState { api: handle.clone(), From 28852cc3163ce09c25306652f23ddab3f55dbf1b Mon Sep 17 00:00:00 2001 From: Lukasz Rzasik Date: Wed, 27 Mar 2024 17:57:44 +0100 Subject: [PATCH 4/4] Store only successfully signed VID shares --- crates/task-impls/src/vid.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index 41ce95256f..7b394a7bb4 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -172,21 +172,18 @@ impl, A: ConsensusApi + &self.membership, ); - let vid_disperse_tasks: Vec<_> = VidDisperseShare::from_vid_disperse(vid_disperse) + let vid_disperse_tasks = VidDisperseShare::from_vid_disperse(vid_disperse) .into_iter() - .map(|vid_share| { + .filter_map(|vid_share| { Some(broadcast_event( Arc::new(HotShotEvent::VidDisperseRecv( vid_share.to_proposal(&self.private_key)?, )), &event_stream, )) - }) - .collect(); + }); - if vid_disperse_tasks.iter().all(Option::is_some) { - join_all(vid_disperse_tasks.into_iter().flatten()).await; - } + join_all(vid_disperse_tasks).await; } HotShotEvent::Shutdown => {