diff --git a/crates/hotshot/examples/infra/modDA.rs b/crates/hotshot/examples/infra/modDA.rs index a06f023159..5e50c1d44f 100644 --- a/crates/hotshot/examples/infra/modDA.rs +++ b/crates/hotshot/examples/infra/modDA.rs @@ -21,6 +21,7 @@ use hotshot_orchestrator::{ config::{NetworkConfig, WebServerConfig}, }; use hotshot_task::task::FilterEvent; +use hotshot_types::traits::election::VIDExchange; use hotshot_types::{ block_impl::{VIDBlockPayload, VIDTransaction}, certificate::ViewSyncCertificate, @@ -85,6 +86,7 @@ pub async fn run_orchestrator_da< DANETWORK: CommunicationChannel, MEMBERSHIP> + Debug, QUORUMNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, VIEWSYNCNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, + VIDNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, NODE: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -107,6 +109,7 @@ pub async fn run_orchestrator_da< VIEWSYNCNETWORK, Message, >, + VIDExchange>, >, Storage = MemoryStorage>, ConsensusMessage = SequencingMessage, @@ -148,6 +151,7 @@ pub trait RunDA< DANETWORK: CommunicationChannel, MEMBERSHIP> + Debug, QUORUMNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, VIEWSYNCNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, + VIDNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, NODE: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -170,6 +174,7 @@ pub trait RunDA< VIEWSYNCNETWORK, Message, >, + VIDExchange>, >, Storage = MemoryStorage>, ConsensusMessage = SequencingMessage, @@ -213,6 +218,7 @@ pub trait RunDA< let da_network = self.get_da_network(); let quorum_network = self.get_quorum_network(); let view_sync_network = self.get_view_sync_network(); + let vid_network = self.get_vid_network(); // Since we do not currently pass the election config type in the NetworkConfig, this will always be the default election config let quorum_election_config = config.config.election_config.clone().unwrap_or_else(|| { @@ -236,6 +242,7 @@ pub trait RunDA< quorum_network.clone(), da_network.clone(), view_sync_network.clone(), + vid_network.clone(), ), pk.clone(), entry.clone(), @@ -367,6 +374,9 @@ pub trait RunDA< ///Returns view sync network for this run fn get_view_sync_network(&self) -> VIEWSYNCNETWORK; + ///Returns VID network for this run + fn get_vid_network(&self) -> VIDNETWORK; + /// Returns the config for this run fn get_config( &self, @@ -393,6 +403,7 @@ pub struct WebServerDARun< quorum_network: WebCommChannel, da_network: WebCommChannel, view_sync_network: WebCommChannel, + vid_network: WebCommChannel, } #[async_trait] @@ -426,6 +437,12 @@ impl< WebCommChannel, Message, >, + VIDExchange< + TYPES, + MEMBERSHIP, + WebCommChannel, + Message, + >, >, Storage = MemoryStorage>, ConsensusMessage = SequencingMessage, @@ -437,6 +454,7 @@ impl< WebCommChannel, WebCommChannel, WebCommChannel, + WebCommChannel, NODE, > for WebServerDARun where @@ -489,6 +507,17 @@ where // Each node runs the DA network so that leaders have access to transactions and DA votes let da_network: WebCommChannel = WebCommChannel::new( + WebServerNetwork::create( + &host.to_string(), + port, + wait_between_polls, + pub_key.clone(), + true, + ) + .into(), + ); + + let vid_network: WebCommChannel = WebCommChannel::new( WebServerNetwork::create(&host.to_string(), port, wait_between_polls, pub_key, true) .into(), ); @@ -498,6 +527,7 @@ where quorum_network, da_network, view_sync_network, + vid_network, } } @@ -513,6 +543,10 @@ where self.view_sync_network.clone() } + fn get_vid_network(&self) -> WebCommChannel { + self.vid_network.clone() + } + fn get_config( &self, ) -> NetworkConfig< @@ -537,6 +571,7 @@ pub struct Libp2pDARun, MEMBERSHIP quorum_network: Libp2pCommChannel, da_network: Libp2pCommChannel, view_sync_network: Libp2pCommChannel, + vid_network: Libp2pCommChannel, } #[async_trait] @@ -570,6 +605,12 @@ impl< Libp2pCommChannel, Message, >, + VIDExchange< + TYPES, + MEMBERSHIP, + Libp2pCommChannel, + Message, + >, >, Storage = MemoryStorage>, ConsensusMessage = SequencingMessage, @@ -581,6 +622,7 @@ impl< Libp2pCommChannel, Libp2pCommChannel, Libp2pCommChannel, + Libp2pCommChannel, NODE, > for Libp2pDARun where @@ -722,11 +764,15 @@ where let da_network: Libp2pCommChannel = Libp2pCommChannel::new(underlying_quorum_network.clone().into()); + let vid_network: Libp2pCommChannel = + Libp2pCommChannel::new(underlying_quorum_network.clone().into()); + Libp2pDARun { config, quorum_network, da_network, view_sync_network, + vid_network, } } @@ -742,6 +788,10 @@ where self.view_sync_network.clone() } + fn get_vid_network(&self) -> Libp2pCommChannel { + self.vid_network.clone() + } + fn get_config( &self, ) -> NetworkConfig< @@ -760,6 +810,7 @@ pub async fn main_entry_point< DANETWORK: CommunicationChannel, MEMBERSHIP> + Debug, QUORUMNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, VIEWSYNCNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, + VIDNETWORK: CommunicationChannel, MEMBERSHIP> + Debug, NODE: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -782,11 +833,12 @@ pub async fn main_entry_point< VIEWSYNCNETWORK, Message, >, + VIDExchange>, >, Storage = MemoryStorage>, ConsensusMessage = SequencingMessage, >, - RUNDA: RunDA, + RUNDA: RunDA, >( args: ValidatorArgs, ) where diff --git a/crates/hotshot/examples/libp2p/multi-validator.rs b/crates/hotshot/examples/libp2p/multi-validator.rs index 3ed46fa979..b45c706304 100644 --- a/crates/hotshot/examples/libp2p/multi-validator.rs +++ b/crates/hotshot/examples/libp2p/multi-validator.rs @@ -7,6 +7,7 @@ use hotshot::demo::DemoTypes; use hotshot_orchestrator::client::ValidatorArgs; use std::net::IpAddr; use tracing::instrument; +use types::VIDNetwork; use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisMembership, ThisRun, ViewSyncNetwork}; @@ -54,6 +55,7 @@ async fn main() { DANetwork, QuorumNetwork, ViewSyncNetwork, + VIDNetwork, NodeImpl, ThisRun, >(ValidatorArgs { diff --git a/crates/hotshot/examples/libp2p/orchestrator.rs b/crates/hotshot/examples/libp2p/orchestrator.rs index 594d004a93..79e0b01560 100644 --- a/crates/hotshot/examples/libp2p/orchestrator.rs +++ b/crates/hotshot/examples/libp2p/orchestrator.rs @@ -9,7 +9,7 @@ use types::ThisMembership; use crate::{ infra::OrchestratorArgs, infra_da::run_orchestrator_da, - types::{DANetwork, NodeImpl, QuorumNetwork, ViewSyncNetwork}, + types::{DANetwork, NodeImpl, QuorumNetwork, VIDNetwork, ViewSyncNetwork}, }; #[path = "../infra/mod.rs"] @@ -34,6 +34,7 @@ async fn main() { DANetwork, QuorumNetwork, ViewSyncNetwork, + VIDNetwork, NodeImpl, >(args) .await; diff --git a/crates/hotshot/examples/libp2p/types.rs b/crates/hotshot/examples/libp2p/types.rs index 79b1ea1419..d42acfacdd 100644 --- a/crates/hotshot/examples/libp2p/types.rs +++ b/crates/hotshot/examples/libp2p/types.rs @@ -11,7 +11,7 @@ use hotshot_types::{ data::{DAProposal, QuorumProposal, SequencingLeaf}, message::{Message, SequencingMessage}, traits::{ - election::{CommitteeExchange, QuorumExchange, ViewSyncExchange}, + election::{CommitteeExchange, QuorumExchange, VIDExchange, ViewSyncExchange}, node_implementation::{ChannelMaps, NodeImplementation, NodeType, SequencingExchanges}, }, vote::{DAVote, QuorumVote, ViewSyncVote}, @@ -26,6 +26,7 @@ pub type ThisLeaf = SequencingLeaf; pub type ThisMembership = GeneralStaticCommittee::SignatureKey>; pub type DANetwork = Libp2pCommChannel; +pub type VIDNetwork = Libp2pCommChannel; pub type QuorumNetwork = Libp2pCommChannel; pub type ViewSyncNetwork = Libp2pCommChannel; @@ -60,6 +61,7 @@ impl NodeImplementation for NodeImpl { ViewSyncNetwork, Message, >, + VIDExchange>, >; type ConsensusMessage = SequencingMessage; diff --git a/crates/hotshot/examples/libp2p/validator.rs b/crates/hotshot/examples/libp2p/validator.rs index ab44e02991..8f6084ddd5 100644 --- a/crates/hotshot/examples/libp2p/validator.rs +++ b/crates/hotshot/examples/libp2p/validator.rs @@ -2,6 +2,7 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot::demo::DemoTypes; use tracing::{info, instrument}; +use types::VIDNetwork; use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisMembership, ThisRun, ViewSyncNetwork}; @@ -34,6 +35,7 @@ async fn main() { DANetwork, QuorumNetwork, ViewSyncNetwork, + VIDNetwork, NodeImpl, ThisRun, >(args) diff --git a/crates/hotshot/examples/web-server-da/multi-validator.rs b/crates/hotshot/examples/web-server-da/multi-validator.rs index 3ed46fa979..b45c706304 100644 --- a/crates/hotshot/examples/web-server-da/multi-validator.rs +++ b/crates/hotshot/examples/web-server-da/multi-validator.rs @@ -7,6 +7,7 @@ use hotshot::demo::DemoTypes; use hotshot_orchestrator::client::ValidatorArgs; use std::net::IpAddr; use tracing::instrument; +use types::VIDNetwork; use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisMembership, ThisRun, ViewSyncNetwork}; @@ -54,6 +55,7 @@ async fn main() { DANetwork, QuorumNetwork, ViewSyncNetwork, + VIDNetwork, NodeImpl, ThisRun, >(ValidatorArgs { diff --git a/crates/hotshot/examples/web-server-da/orchestrator.rs b/crates/hotshot/examples/web-server-da/orchestrator.rs index 594d004a93..0853caaef5 100644 --- a/crates/hotshot/examples/web-server-da/orchestrator.rs +++ b/crates/hotshot/examples/web-server-da/orchestrator.rs @@ -4,7 +4,7 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot::demo::DemoTypes; use tracing::instrument; -use types::ThisMembership; +use types::{ThisMembership, VIDNetwork}; use crate::{ infra::OrchestratorArgs, @@ -34,6 +34,7 @@ async fn main() { DANetwork, QuorumNetwork, ViewSyncNetwork, + VIDNetwork, NodeImpl, >(args) .await; diff --git a/crates/hotshot/examples/web-server-da/types.rs b/crates/hotshot/examples/web-server-da/types.rs index 017784b354..3de7a1531f 100644 --- a/crates/hotshot/examples/web-server-da/types.rs +++ b/crates/hotshot/examples/web-server-da/types.rs @@ -11,7 +11,7 @@ use hotshot_types::{ data::{DAProposal, QuorumProposal, SequencingLeaf}, message::{Message, SequencingMessage}, traits::{ - election::{CommitteeExchange, QuorumExchange, ViewSyncExchange}, + election::{CommitteeExchange, QuorumExchange, VIDExchange, ViewSyncExchange}, node_implementation::{ChannelMaps, NodeImplementation, NodeType, SequencingExchanges}, }, vote::{DAVote, QuorumVote, ViewSyncVote}, @@ -26,6 +26,7 @@ pub type ThisLeaf = SequencingLeaf; pub type ThisMembership = GeneralStaticCommittee::SignatureKey>; pub type DANetwork = WebCommChannel; +pub type VIDNetwork = WebCommChannel; pub type QuorumNetwork = WebCommChannel; pub type ViewSyncNetwork = WebCommChannel; @@ -60,6 +61,7 @@ impl NodeImplementation for NodeImpl { ViewSyncNetwork, Message, >, + VIDExchange>, >; type ConsensusMessage = SequencingMessage; diff --git a/crates/hotshot/examples/web-server-da/validator.rs b/crates/hotshot/examples/web-server-da/validator.rs index ab44e02991..8f6084ddd5 100644 --- a/crates/hotshot/examples/web-server-da/validator.rs +++ b/crates/hotshot/examples/web-server-da/validator.rs @@ -2,6 +2,7 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot::demo::DemoTypes; use tracing::{info, instrument}; +use types::VIDNetwork; use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisMembership, ThisRun, ViewSyncNetwork}; @@ -34,6 +35,7 @@ async fn main() { DANetwork, QuorumNetwork, ViewSyncNetwork, + VIDNetwork, NodeImpl, ThisRun, >(args) diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index dcf6670dc7..d105b9ca68 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -54,7 +54,9 @@ use hotshot_task::{ }; use hotshot_task_impls::{events::SequencingHotShotEvent, network::NetworkTaskKind}; use hotshot_types::{ - certificate::TimeoutCertificate, traits::node_implementation::SequencingTimeoutEx, + certificate::{TimeoutCertificate, VIDCertificate}, + data::VidDisperse, + traits::node_implementation::SequencingTimeoutEx, }; use hotshot_types::{ @@ -73,7 +75,7 @@ use hotshot_types::{ network::{CommunicationChannel, NetworkError}, node_implementation::{ ChannelMaps, CommitteeEx, ExchangesType, NodeType, SendToTasks, SequencingQuorumEx, - ViewSyncEx, + VIDEx, ViewSyncEx, }, signature_key::SignatureKey, state::ConsensusTime, @@ -91,6 +93,7 @@ use std::{ sync::Arc, time::Duration, }; +use tasks::add_vid_task; use tracing::{debug, error, info, instrument, trace, warn}; // -- Rexports // External @@ -658,6 +661,14 @@ where Commitment = Commitment>, Membership = MEMBERSHIP, > + 'static, + VIDEx: ConsensusExchange< + TYPES, + Message, + Proposal = VidDisperse, + Certificate = VIDCertificate, + Commitment = Commitment, + Membership = MEMBERSHIP, + > + 'static, SequencingTimeoutEx: ConsensusExchange< TYPES, Message, @@ -671,6 +682,7 @@ where &self.inner.consensus } + #[allow(clippy::too_many_lines)] async fn run_tasks(self) -> SystemContextHandle { // ED Need to set first first number to 1, or properly trigger the change upon start let task_runner = TaskRunner::new(); @@ -682,6 +694,7 @@ where let quorum_exchange = self.inner.exchanges.quorum_exchange().clone(); let committee_exchange = self.inner.exchanges.committee_exchange().clone(); let view_sync_exchange = self.inner.exchanges.view_sync_exchange().clone(); + let vid_exchange = self.inner.exchanges.vid_exchange().clone(); let handle = SystemContextHandle { registry, @@ -709,6 +722,12 @@ where view_sync_exchange.clone(), ) .await; + let task_runner = add_network_message_task( + task_runner, + internal_event_stream.clone(), + vid_exchange.clone(), + ) + .await; let task_runner = add_network_event_task( task_runner, internal_event_stream.clone(), @@ -730,6 +749,13 @@ where NetworkTaskKind::ViewSync, ) .await; + let task_runner = add_network_event_task( + task_runner, + internal_event_stream.clone(), + vid_exchange.clone(), + NetworkTaskKind::VID, + ) + .await; let task_runner = add_consensus_task( task_runner, internal_event_stream.clone(), @@ -744,6 +770,13 @@ where handle.clone(), ) .await; + let task_runner = add_vid_task( + task_runner, + internal_event_stream.clone(), + vid_exchange.clone(), + handle.clone(), + ) + .await; let task_runner = add_transaction_task( task_runner, internal_event_stream.clone(), @@ -761,7 +794,6 @@ where task_runner.launch().await; info!("Task runner exited!"); }); - handle } } diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index b1ee195e63..7207a273a9 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -24,11 +24,12 @@ use hotshot_task_impls::{ NetworkMessageTaskTypes, NetworkTaskKind, }, transactions::{TransactionTaskState, TransactionsTaskTypes}, + vid::{VIDTaskState, VIDTaskTypes}, view_sync::{ViewSyncTaskState, ViewSyncTaskStateTypes}, }; use hotshot_types::{ block_impl::{VIDBlockPayload, VIDTransaction}, - certificate::{TimeoutCertificate, ViewSyncCertificate}, + certificate::{TimeoutCertificate, VIDCertificate, ViewSyncCertificate}, data::{ProposalType, QuorumProposal, SequencingLeaf}, event::Event, message::{Message, Messages, SequencingMessage}, @@ -37,7 +38,7 @@ use hotshot_types::{ network::{CommunicationChannel, ConsensusIntentEvent, TransmitType}, node_implementation::{ CommitteeEx, ExchangesType, NodeImplementation, NodeType, QuorumEx, - SequencingTimeoutEx, ViewSyncEx, + SequencingTimeoutEx, VIDEx, ViewSyncEx, }, state::ConsensusTime, }, @@ -306,7 +307,8 @@ where timeout_task: async_spawn(async move {}), event_stream: event_stream.clone(), output_event_stream: output_stream, - certs: HashMap::new(), + da_certs: HashMap::new(), + vid_certs: HashMap::new(), current_proposal: None, id: handle.hotshot.inner.id, qc: None, @@ -362,6 +364,75 @@ where ) } +/// add the VID task +/// # Panics +/// Is unable to panic. This section here is just to satisfy clippy +pub async fn add_vid_task< + TYPES: NodeType, + I: NodeImplementation< + TYPES, + Leaf = SequencingLeaf, + ConsensusMessage = SequencingMessage, + >, +>( + task_runner: TaskRunner, + event_stream: ChannelStream>, + vid_exchange: VIDEx, + handle: SystemContextHandle, +) -> TaskRunner +where + VIDEx: ConsensusExchange< + TYPES, + Message, + Certificate = VIDCertificate, + Commitment = Commitment, + >, +{ + // build the vid task + let c_api: HotShotSequencingConsensusApi = HotShotSequencingConsensusApi { + inner: handle.hotshot.inner.clone(), + }; + let registry = task_runner.registry.clone(); + let vid_state = VIDTaskState { + registry: registry.clone(), + api: c_api.clone(), + consensus: handle.hotshot.get_consensus(), + cur_view: TYPES::Time::new(0), + vid_exchange: vid_exchange.into(), + vote_collector: None, + event_stream: event_stream.clone(), + id: handle.hotshot.inner.id, + }; + let vid_event_handler = HandleEvent(Arc::new( + move |event, mut state: VIDTaskState>| { + async move { + let completion_status = state.handle_event(event).await; + (completion_status, state) + } + .boxed() + }, + )); + let vid_name = "VID Task"; + let vid_event_filter = FilterEvent(Arc::new( + VIDTaskState::>::filter, + )); + + let vid_task_builder = TaskBuilder::< + VIDTaskTypes>, + >::new(vid_name.to_string()) + .register_event_stream(event_stream.clone(), vid_event_filter) + .await + .register_registry(&mut registry.clone()) + .await + .register_state(vid_state) + .register_event_handler(vid_event_handler); + // impossible for unwrap to fail + // we *just* registered + let vid_task_id = vid_task_builder.get_task_id().unwrap(); + let vid_task = VIDTaskTypes::build(vid_task_builder).launch(); + task_runner.add_task(vid_task_id, vid_name.to_string(), vid_task) +} + /// add the Data Availability task /// # Panics /// Is unable to panic. This section here is just to satisfy clippy diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index 748e6e9f1d..13c1390b3e 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -16,7 +16,7 @@ use hotshot_task::{ task_impls::{HSTWithEvent, TaskBuilder}, }; use hotshot_types::{ - certificate::{DACertificate, QuorumCertificate, TimeoutCertificate}, + certificate::{DACertificate, QuorumCertificate, TimeoutCertificate, VIDCertificate}, consensus::{Consensus, View}, data::{LeafType, ProposalType, QuorumProposal, SequencingLeaf}, event::{Event, EventType}, @@ -128,7 +128,10 @@ pub struct SequencingConsensusTaskState< pub output_event_stream: ChannelStream>, /// All the DA certs we've received for current and future views. - pub certs: HashMap>, + pub da_certs: HashMap>, + + /// All the VID certs we've received for current and future views. + pub vid_certs: HashMap>, /// The most recent proposal we have, will correspond to the current view if Some() /// Will be none if the view advanced through timeout/view_sync @@ -486,7 +489,7 @@ where // Only vote if you have the DA cert // ED Need to update the view number this is stored under? - if let Some(cert) = self.certs.get(&(proposal.get_view_number())) { + if let Some(cert) = self.da_certs.get(&(proposal.get_view_number())) { let view = cert.view_number; let vote_token = self.quorum_exchange.make_vote_token(view); // TODO: do some of this logic without the vote token check, only do that when voting. @@ -589,7 +592,7 @@ where // Remove old certs, we won't vote on past views for view in *self.cur_view..*new_view - 1 { let v = TYPES::Time::new(view); - self.certs.remove(&v); + self.da_certs.remove(&v); } self.cur_view = new_view; @@ -989,7 +992,7 @@ where for v in (*self.cur_view)..=(*view) { let time = TYPES::Time::new(v); - self.certs.remove(&time); + self.da_certs.remove(&time); } } SequencingHotShotEvent::QuorumVoteRecv(vote) => { @@ -1247,7 +1250,7 @@ where debug!("DAC Recved for view ! {}", *cert.view_number); let view = cert.view_number; - self.certs.insert(view, cert); + self.da_certs.insert(view, cert); if self.vote_if_able().await { self.current_proposal = None; @@ -1257,7 +1260,7 @@ where debug!("VID cert received for view ! {}", *cert.view_number); let view = cert.view_number; - self.certs.insert(view, cert); // TODO new cert type for VID https://github.com/EspressoSystems/HotShot/issues/1701 + self.vid_certs.insert(view, cert); // TODO Make sure we aren't voting for an arbitrarily old round for no reason if self.vote_if_able().await { diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index b7d5ea78ad..9492f3fc6e 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -12,6 +12,7 @@ use hotshot_task::{ task::{FilterEvent, HandleEvent, HotShotTaskCompleted, HotShotTaskTypes, TS}, task_impls::{HSTWithEvent, TaskBuilder}, }; +use hotshot_types::vote::DAVoteAccumulator; use hotshot_types::{ certificate::DACertificate, consensus::{Consensus, View}, @@ -27,7 +28,6 @@ use hotshot_types::{ BlockPayload, }, utils::ViewInner, - vote::{DAVoteAccumulator, VoteType}, }; use snafu::Snafu; @@ -186,47 +186,6 @@ where } } } - SequencingHotShotEvent::VidVoteRecv(vote) => { - // TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690 - - debug!("VID vote recv, collection task {:?}", vote.get_view()); - // panic!("Vote handle received DA vote for view {}", *vote.current_view); - - let accumulator = state.accumulator.left().unwrap(); - - match state.committee_exchange.accumulate_vote( - accumulator, - &vote, - &vote.block_commitment, - ) { - Left(new_accumulator) => { - state.accumulator = either::Left(new_accumulator); - } - - Right(vid_cert) => { - debug!("Sending VID cert! {:?}", vid_cert.view_number); - state - .event_stream - .publish(SequencingHotShotEvent::VidCertSend( - vid_cert.clone(), - state.committee_exchange.public_key().clone(), - )) - .await; - - state.accumulator = Right(vid_cert.clone()); - state - .committee_exchange - .network() - .inject_consensus_info(ConsensusIntentEvent::CancelPollForVotes( - *vid_cert.view_number, - )) - .await; - - // Return completed at this point - return (Some(HotShotTaskCompleted::ShutDown), state); - } - } - } SequencingHotShotEvent::Shutdown => return (Some(HotShotTaskCompleted::ShutDown), state), _ => { error!("unexpected event {:?}", event); @@ -412,165 +371,6 @@ where .await; }; } - SequencingHotShotEvent::VidVoteRecv(vote) => { - // TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690 - - // warn!( - // "VID vote recv, Main Task {:?}, key: {:?}", - // vote.current_view, - // self.committee_exchange.public_key() - // ); - // Check if we are the leader and the vote is from the sender. - let view = vote.current_view; - if !self.committee_exchange.is_leader(view) { - error!( - "We are not the VID leader for view {} are we leader for next view? {}", - *view, - self.committee_exchange.is_leader(view + 1) - ); - return None; - } - - let handle_event = HandleEvent(Arc::new(move |event, state| { - async move { vote_handle(state, event).await }.boxed() - })); - let collection_view = - if let Some((collection_view, collection_id, _)) = &self.vote_collector { - // TODO: Is this correct for consecutive leaders? - if view > *collection_view { - // warn!("shutting down for view {:?}", collection_view); - self.registry.shutdown_task(*collection_id).await; - } - *collection_view - } else { - TYPES::Time::new(0) - }; - - let new_accumulator = DAVoteAccumulator { - da_vote_outcomes: HashMap::new(), - success_threshold: self.committee_exchange.success_threshold(), - sig_lists: Vec::new(), - signers: bitvec![0; self.committee_exchange.total_nodes()], - phantom: PhantomData, - }; - - let accumulator = self.committee_exchange.accumulate_vote( - new_accumulator, - &vote, - &vote.clone().block_commitment, - ); - - if view > collection_view { - let state = DAVoteCollectionTaskState { - committee_exchange: self.committee_exchange.clone(), - - accumulator, - cur_view: view, - event_stream: self.event_stream.clone(), - id: self.id, - }; - let name = "VID Vote Collection"; - let filter = FilterEvent(Arc::new(|event| { - matches!(event, SequencingHotShotEvent::VidVoteRecv(_)) - })); - let builder = - TaskBuilder::>::new(name.to_string()) - .register_event_stream(self.event_stream.clone(), filter) - .await - .register_registry(&mut self.registry.clone()) - .await - .register_state(state) - .register_event_handler(handle_event); - let id = builder.get_task_id().unwrap(); - let stream_id = builder.get_stream_id().unwrap(); - let _task = - async_spawn( - async move { DAVoteCollectionTypes::build(builder).launch().await }, - ); - self.vote_collector = Some((view, id, stream_id)); - } else if let Some((_, _, stream_id)) = self.vote_collector { - self.event_stream - .direct_message(stream_id, SequencingHotShotEvent::VidVoteRecv(vote)) - .await; - }; - } - SequencingHotShotEvent::VidDisperseRecv(disperse, sender) => { - // TODO copy-pasted from DAProposalRecv https://github.com/EspressoSystems/HotShot/issues/1690 - debug!( - "VID disperse received for view: {:?}", - disperse.data.get_view_number() - ); - - // ED NOTE: Assuming that the next view leader is the one who sends DA proposal for this view - let view = disperse.data.get_view_number(); - - // Allow a DA proposal that is one view older, in case we have voted on a quorum - // proposal and updated the view. - // `self.cur_view` should be at least 1 since there is a view change before getting - // the `DAProposalRecv` event. Otherewise, the view number subtraction below will - // cause an overflow error. - // TODO ED Revisit this - - if self.cur_view != TYPES::Time::genesis() && view < self.cur_view - 1 { - warn!("Throwing away VID disperse data that is more than one view older"); - return None; - } - - debug!("VID disperse data is fresh."); - let block_commitment = disperse.data.commitment; - - // ED Is this the right leader? - let view_leader_key = self.committee_exchange.get_leader(view); - if view_leader_key != sender { - error!("VID proposal doesn't have expected leader key for view {} \n DA proposal is: [N/A for VID]", *view); - return None; - } - - if !view_leader_key.validate(&disperse.signature, block_commitment.as_ref()) { - error!("Could not verify VID proposal sig."); - return None; - } - - let vote_token = self.committee_exchange.make_vote_token(view); - match vote_token { - Err(e) => { - error!("Failed to generate vote token for {:?} {:?}", view, e); - } - Ok(None) => { - debug!("We were not chosen for VID quorum on {:?}", view); - } - Ok(Some(vote_token)) => { - // Generate and send vote - let vote = self.committee_exchange.create_vid_message( - block_commitment, - view, - vote_token, - ); - - // ED Don't think this is necessary? - // self.cur_view = view; - - debug!("Sending vote to the VID leader {:?}", vote.current_view); - self.event_stream - .publish(SequencingHotShotEvent::VidVoteSend(vote)) - .await; - let mut consensus = self.consensus.write().await; - - // Ensure this view is in the view map for garbage collection, but do not overwrite if - // there is already a view there: the replica task may have inserted a `Leaf` view which - // contains strictly more information. - consensus.state_map.entry(view).or_insert(View { - view_inner: ViewInner::DA { - block: block_commitment, - }, - }); - - // Record the block we have promised to make available. - // TODO https://github.com/EspressoSystems/HotShot/issues/1692 - // consensus.saved_blocks.insert(proposal.data.deltas); - } - } - } SequencingHotShotEvent::ViewChange(view) => { if *self.cur_view >= *view { return None; @@ -676,8 +476,6 @@ where | SequencingHotShotEvent::Shutdown | SequencingHotShotEvent::BlockReady(_, _) | SequencingHotShotEvent::Timeout(_) - | SequencingHotShotEvent::VidDisperseRecv(_, _) - | SequencingHotShotEvent::VidVoteRecv(_) | SequencingHotShotEvent::ViewChange(_) ) } diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index 4672e791ff..9ccdee9883 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -2,13 +2,13 @@ use crate::view_sync::ViewSyncPhase; use commit::Commitment; use either::Either; use hotshot_types::{ - certificate::{DACertificate, QuorumCertificate, TimeoutCertificate}, + certificate::{DACertificate, QuorumCertificate, TimeoutCertificate, VIDCertificate}, data::{DAProposal, VidDisperse}, message::Proposal, traits::node_implementation::{ NodeImplementation, NodeType, QuorumProposalType, ViewSyncProposalType, }, - vote::{DAVote, QuorumVote, TimeoutVote, ViewSyncVote}, + vote::{DAVote, QuorumVote, TimeoutVote, VIDVote, ViewSyncVote}, }; /// All of the possible events that can be passed between Sequecning `HotShot` tasks @@ -82,17 +82,17 @@ pub enum SequencingHotShotEvent> { /// Send a VID vote to the VID leader; emitted by VID storage nodes in the DA task after seeing a valid VID dispersal /// /// Like [`DAVoteSend`] - VidVoteSend(DAVote), + VidVoteSend(VIDVote), /// A VID vote has been received by the network; handled by the DA task /// /// Like [`DAVoteRecv`] - VidVoteRecv(DAVote), + VidVoteRecv(VIDVote), /// The VID leader has collected enough votes to form a VID cert; emitted by the VID leader in the DA task; sent to the entire network via the networking task /// /// Like [`DACSend`] - VidCertSend(DACertificate, TYPES::SignatureKey), + VidCertSend(VIDCertificate, TYPES::SignatureKey), /// A VID cert has been recieved by the network; handled by the consensus task /// /// Like [`DACRecv`] - VidCertRecv(DACertificate), + VidCertRecv(VIDCertificate), } diff --git a/crates/task-impls/src/lib.rs b/crates/task-impls/src/lib.rs index 5e7492d84a..01dcd1da6f 100644 --- a/crates/task-impls/src/lib.rs +++ b/crates/task-impls/src/lib.rs @@ -31,3 +31,6 @@ pub mod harness; /// The task which implements view synchronization pub mod view_sync; + +/// The task which implements verifiable information dispersal +pub mod vid; diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 62c67757d8..106805cf3e 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -33,6 +33,8 @@ pub enum NetworkTaskKind { Committee, /// view sync ViewSync, + /// vid + VID, } /// the network message task state @@ -337,6 +339,7 @@ impl< NetworkTaskKind::Quorum => FilterEvent(Arc::new(Self::quorum_filter)), NetworkTaskKind::Committee => FilterEvent(Arc::new(Self::committee_filter)), NetworkTaskKind::ViewSync => FilterEvent(Arc::new(Self::view_sync_filter)), + NetworkTaskKind::VID => FilterEvent(Arc::new(Self::vid_filter)), } } @@ -367,6 +370,17 @@ impl< ) } + /// vid filter + fn vid_filter(event: &SequencingHotShotEvent) -> bool { + matches!( + event, + SequencingHotShotEvent::Shutdown + | SequencingHotShotEvent::VidDisperseSend(_, _) + | SequencingHotShotEvent::VidVoteSend(_) + | SequencingHotShotEvent::ViewChange(_) + ) + } + /// view sync filter fn view_sync_filter(event: &SequencingHotShotEvent) -> bool { matches!( diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs new file mode 100644 index 0000000000..bde44d7781 --- /dev/null +++ b/crates/task-impls/src/vid.rs @@ -0,0 +1,437 @@ +use crate::events::SequencingHotShotEvent; +use async_compatibility_layer::art::async_spawn; +use async_lock::RwLock; + +use bitvec::prelude::*; +use commit::Commitment; +use either::{Either, Left, Right}; +use futures::FutureExt; +use hotshot_task::{ + event_stream::{ChannelStream, EventStream}, + global_registry::GlobalRegistry, + task::{FilterEvent, HandleEvent, HotShotTaskCompleted, HotShotTaskTypes, TS}, + task_impls::{HSTWithEvent, TaskBuilder}, +}; +use hotshot_types::vote::VoteType; +use hotshot_types::{ + certificate::VIDCertificate, traits::election::SignedCertificate, vote::VIDVoteAccumulator, +}; +use hotshot_types::{ + consensus::{Consensus, View}, + data::{ProposalType, SequencingLeaf}, + message::{Message, SequencingMessage}, + traits::{ + consensus_api::SequencingConsensusApi, + election::{ConsensusExchange, VIDExchangeType}, + node_implementation::{NodeImplementation, NodeType, VIDEx}, + signature_key::SignatureKey, + state::ConsensusTime, + }, + utils::ViewInner, +}; + +use snafu::Snafu; +use std::marker::PhantomData; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, error, instrument, warn}; + +#[derive(Snafu, Debug)] +/// Error type for consensus tasks +pub struct ConsensusTaskError {} + +/// Tracks state of a DA task +pub struct VIDTaskState< + TYPES: NodeType, + I: NodeImplementation< + TYPES, + Leaf = SequencingLeaf, + ConsensusMessage = SequencingMessage, + >, + A: SequencingConsensusApi, I> + 'static, +> where + VIDEx: ConsensusExchange< + TYPES, + Message, + Certificate = VIDCertificate, + Commitment = Commitment, + >, +{ + /// The state's api + pub api: A, + /// Global registry task for the state + pub registry: GlobalRegistry, + + /// View number this view is executing in. + pub cur_view: TYPES::Time, + + /// Reference to consensus. Leader will require a read lock on this. + pub consensus: Arc>>>, + + /// the VID exchange + pub vid_exchange: Arc>, + + /// The view and ID of the current vote collection task, if there is one. + pub vote_collector: Option<(TYPES::Time, usize, usize)>, + + /// Global events stream to publish events + pub event_stream: ChannelStream>, + + /// This state's ID + pub id: u64, +} + +/// Struct to maintain DA Vote Collection task state +pub struct VIDVoteCollectionTaskState< + TYPES: NodeType, + I: NodeImplementation>, +> where + VIDEx: ConsensusExchange< + TYPES, + Message, + Certificate = VIDCertificate, + Commitment = Commitment, + >, +{ + /// the vid exchange + pub vid_exchange: Arc>, + #[allow(clippy::type_complexity)] + /// Accumulates VID votes + pub accumulator: Either< + as SignedCertificate< + TYPES, + TYPES::Time, + TYPES::VoteTokenType, + Commitment, + >>::VoteAccumulator, + VIDCertificate, + >, + /// the current view + pub cur_view: TYPES::Time, + /// event stream for channel events + pub event_stream: ChannelStream>, + /// the id of this task state + pub id: u64, +} + +impl>> TS + for VIDVoteCollectionTaskState +where + VIDEx: ConsensusExchange< + TYPES, + Message, + Certificate = VIDCertificate, + Commitment = Commitment, + >, +{ +} + +#[instrument(skip_all, fields(id = state.id, view = *state.cur_view), name = "VID Vote Collection Task", level = "error")] +async fn vote_handle( + mut state: VIDVoteCollectionTaskState, + event: SequencingHotShotEvent, +) -> ( + Option, + VIDVoteCollectionTaskState, +) +where + TYPES: NodeType, + I: NodeImplementation>, + VIDEx: ConsensusExchange< + TYPES, + Message, + Certificate = VIDCertificate, + Commitment = Commitment, + >, +{ + match event { + SequencingHotShotEvent::VidVoteRecv(vote) => { + // TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690 + + debug!("VID vote recv, collection task {:?}", vote.get_view()); + // panic!("Vote handle received DA vote for view {}", *vote.current_view); + + let accumulator = state.accumulator.left().unwrap(); + + match state + .vid_exchange + .accumulate_vote(accumulator, &vote, &vote.block_commitment) + { + Left(new_accumulator) => { + state.accumulator = either::Left(new_accumulator); + } + + Right(vid_cert) => { + debug!("Sending VID cert! {:?}", vid_cert.view_number); + state + .event_stream + .publish(SequencingHotShotEvent::VidCertSend( + vid_cert.clone(), + state.vid_exchange.public_key().clone(), + )) + .await; + + state.accumulator = Right(vid_cert.clone()); + + // Return completed at this point + return (Some(HotShotTaskCompleted::ShutDown), state); + } + } + } + SequencingHotShotEvent::Shutdown => return (Some(HotShotTaskCompleted::ShutDown), state), + _ => { + error!("unexpected event {:?}", event); + } + } + (None, state) +} + +impl< + TYPES: NodeType, + I: NodeImplementation< + TYPES, + Leaf = SequencingLeaf, + ConsensusMessage = SequencingMessage, + >, + A: SequencingConsensusApi, I> + 'static, + > VIDTaskState +where + VIDEx: ConsensusExchange< + TYPES, + Message, + Certificate = VIDCertificate, + Commitment = Commitment, + >, +{ + /// main task event handler + #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "DA Main Task", level = "error")] + pub async fn handle_event( + &mut self, + event: SequencingHotShotEvent, + ) -> Option { + match event { + SequencingHotShotEvent::VidVoteRecv(vote) => { + // TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690 + + // warn!( + // "VID vote recv, Main Task {:?}, key: {:?}", + // vote.current_view, + // self.vid_exchange.public_key() + // ); + // Check if we are the leader and the vote is from the sender. + let view = vote.current_view; + if !self.vid_exchange.is_leader(view) { + error!( + "We are not the VID leader for view {} are we leader for next view? {}", + *view, + self.vid_exchange.is_leader(view + 1) + ); + return None; + } + + let handle_event = HandleEvent(Arc::new(move |event, state| { + async move { vote_handle(state, event).await }.boxed() + })); + let collection_view = + if let Some((collection_view, collection_id, _)) = &self.vote_collector { + // TODO: Is this correct for consecutive leaders? + if view > *collection_view { + // warn!("shutting down for view {:?}", collection_view); + self.registry.shutdown_task(*collection_id).await; + } + *collection_view + } else { + TYPES::Time::new(0) + }; + + let new_accumulator = VIDVoteAccumulator { + vid_vote_outcomes: HashMap::new(), + success_threshold: self.vid_exchange.success_threshold(), + sig_lists: Vec::new(), + signers: bitvec![0; self.vid_exchange.total_nodes()], + phantom: PhantomData, + }; + + let accumulator = self.vid_exchange.accumulate_vote( + new_accumulator, + &vote, + &vote.clone().block_commitment, + ); + + if view > collection_view { + let state = VIDVoteCollectionTaskState { + vid_exchange: self.vid_exchange.clone(), + + accumulator, + cur_view: view, + event_stream: self.event_stream.clone(), + id: self.id, + }; + let name = "VID Vote Collection"; + let filter = FilterEvent(Arc::new(|event| { + matches!(event, SequencingHotShotEvent::VidVoteRecv(_)) + })); + let builder = + TaskBuilder::>::new(name.to_string()) + .register_event_stream(self.event_stream.clone(), filter) + .await + .register_registry(&mut self.registry.clone()) + .await + .register_state(state) + .register_event_handler(handle_event); + let id = builder.get_task_id().unwrap(); + let stream_id = builder.get_stream_id().unwrap(); + let _task = async_spawn(async move { + VIDVoteCollectionTypes::build(builder).launch().await + }); + self.vote_collector = Some((view, id, stream_id)); + } else if let Some((_, _, stream_id)) = self.vote_collector { + self.event_stream + .direct_message(stream_id, SequencingHotShotEvent::VidVoteRecv(vote)) + .await; + }; + } + SequencingHotShotEvent::VidDisperseRecv(disperse, sender) => { + // TODO copy-pasted from DAProposalRecv https://github.com/EspressoSystems/HotShot/issues/1690 + debug!( + "VID disperse received for view: {:?}", + disperse.data.get_view_number() + ); + + // ED NOTE: Assuming that the next view leader is the one who sends DA proposal for this view + let view = disperse.data.get_view_number(); + + // Allow a DA proposal that is one view older, in case we have voted on a quorum + // proposal and updated the view. + // `self.cur_view` should be at least 1 since there is a view change before getting + // the `DAProposalRecv` event. Otherewise, the view number subtraction below will + // cause an overflow error. + if view < self.cur_view - 1 { + warn!("Throwing away VID disperse data that is more than one view older"); + return None; + } + + debug!("VID disperse data is fresh."); + let block_commitment = disperse.data.commitment; + + // ED Is this the right leader? + let view_leader_key = self.vid_exchange.get_leader(view); + if view_leader_key != sender { + error!("VID proposal doesn't have expected leader key for view {} \n DA proposal is: [N/A for VID]", *view); + return None; + } + + if !view_leader_key.validate(&disperse.signature, block_commitment.as_ref()) { + error!("Could not verify VID proposal sig."); + return None; + } + + let vote_token = self.vid_exchange.make_vote_token(view); + match vote_token { + Err(e) => { + error!("Failed to generate vote token for {:?} {:?}", view, e); + } + Ok(None) => { + debug!("We were not chosen for VID quorum on {:?}", view); + } + Ok(Some(vote_token)) => { + // Generate and send vote + let vote = self.vid_exchange.create_vid_message( + block_commitment, + view, + vote_token, + ); + + // ED Don't think this is necessary? + // self.cur_view = view; + + debug!("Sending vote to the VID leader {:?}", vote.current_view); + self.event_stream + .publish(SequencingHotShotEvent::VidVoteSend(vote)) + .await; + let mut consensus = self.consensus.write().await; + + // Ensure this view is in the view map for garbage collection, but do not overwrite if + // there is already a view there: the replica task may have inserted a `Leaf` view which + // contains strictly more information. + consensus.state_map.entry(view).or_insert(View { + view_inner: ViewInner::DA { + block: block_commitment, + }, + }); + + // Record the block we have promised to make available. + // TODO https://github.com/EspressoSystems/HotShot/issues/1692 + // consensus.saved_blocks.insert(proposal.data.deltas); + } + } + } + SequencingHotShotEvent::ViewChange(view) => { + if *self.cur_view >= *view { + return None; + } + + if *view - *self.cur_view > 1 { + error!("View changed by more than 1 going to view {:?}", view); + } + self.cur_view = view; + + return None; + } + + SequencingHotShotEvent::Shutdown => { + return Some(HotShotTaskCompleted::ShutDown); + } + _ => { + error!("unexpected event {:?}", event); + } + } + None + } + + /// Filter the DA event. + pub fn filter(event: &SequencingHotShotEvent) -> bool { + matches!( + event, + SequencingHotShotEvent::Shutdown + | SequencingHotShotEvent::VidDisperseRecv(_, _) + | SequencingHotShotEvent::VidVoteRecv(_) + | SequencingHotShotEvent::ViewChange(_) + ) + } +} + +/// task state implementation for VID Task +impl< + TYPES: NodeType, + I: NodeImplementation< + TYPES, + Leaf = SequencingLeaf, + ConsensusMessage = SequencingMessage, + >, + A: SequencingConsensusApi, I> + 'static, + > TS for VIDTaskState +where + VIDEx: ConsensusExchange< + TYPES, + Message, + Certificate = VIDCertificate, + Commitment = Commitment, + >, +{ +} + +/// Type alias for VID Vote Collection Types +pub type VIDVoteCollectionTypes = HSTWithEvent< + ConsensusTaskError, + SequencingHotShotEvent, + ChannelStream>, + VIDVoteCollectionTaskState, +>; + +/// Type alias for VID Task Types +pub type VIDTaskTypes = HSTWithEvent< + ConsensusTaskError, + SequencingHotShotEvent, + ChannelStream>, + VIDTaskState, +>; diff --git a/crates/testing/src/node_types.rs b/crates/testing/src/node_types.rs index 7e5308b35f..1b8e4df7e5 100644 --- a/crates/testing/src/node_types.rs +++ b/crates/testing/src/node_types.rs @@ -19,7 +19,7 @@ use hotshot_types::{ data::{QuorumProposal, SequencingLeaf, ViewNumber}, message::{Message, SequencingMessage}, traits::{ - election::{CommitteeExchange, QuorumExchange, ViewSyncExchange}, + election::{CommitteeExchange, QuorumExchange, VIDExchange, ViewSyncExchange}, network::{TestableChannelImplementation, TestableNetworkingImplementation}, node_implementation::{ChannelMaps, NodeType, SequencingExchanges, TestableExchange}, }, @@ -99,6 +99,17 @@ type StaticWebViewSyncComm = type StaticCombinedViewSyncComm = CombinedCommChannel; +pub type StaticMemoryVIDComm = + MemoryCommChannel; + +type StaticLibp2pVIDComm = + Libp2pCommChannel; + +type StaticWebVIDComm = WebCommChannel; + +type StaticCombinedVIDComm = + CombinedCommChannel; + pub type SequencingLibp2pExchange = SequencingExchanges< SequencingTestTypes, Message, @@ -123,6 +134,12 @@ pub type SequencingLibp2pExchange = SequencingExchanges< StaticLibp2pViewSyncComm, Message, >, + VIDExchange< + SequencingTestTypes, + StaticMembership, + StaticLibp2pVIDComm, + Message, + >, >; impl NodeImplementation for SequencingLibp2pImpl { @@ -172,6 +189,10 @@ impl SequencingTestTypes, Message, >>::Networking, + , + >>::Networking, ) + 'static, > { let network_generator = Arc::new(, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( + )(network.clone()); + let vid_chan = + <, + >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( )(network); - (quorum_chan, committee_chan, view_sync_chan) + (quorum_chan, committee_chan, view_sync_chan, vid_chan) }) } } @@ -238,6 +265,12 @@ pub type SequencingMemoryExchange = SequencingExchanges< StaticMemoryViewSyncComm, Message, >, + VIDExchange< + SequencingTestTypes, + StaticMembership, + StaticMemoryVIDComm, + Message, + >, >; impl @@ -268,6 +301,10 @@ impl SequencingTestTypes, Message, >>::Networking, + , + >>::Networking, ) + 'static, > { let network_generator = Arc::new(, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( - )(network_da); + )(network_da.clone()); let view_sync_chan = <, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( + )(network_da); + let vid_chan = + <, + >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( )(network); - (quorum_chan, committee_chan, view_sync_chan) + (quorum_chan, committee_chan, view_sync_chan, vid_chan) }) } } @@ -371,6 +414,12 @@ pub type SequencingWebExchanges = SequencingExchanges< StaticWebViewSyncComm, Message, >, + VIDExchange< + SequencingTestTypes, + StaticMembership, + StaticWebVIDComm, + Message, + >, >; impl @@ -401,6 +450,10 @@ impl SequencingTestTypes, Message, >>::Networking, + , + >>::Networking, ) + 'static, > { let network_generator = Arc::new(, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( - )(network_da); + )(network_da.clone()); let view_sync_chan = <, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( )(network); + let vid_chan = + <, + >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( + )(network_da); - (quorum_chan, committee_chan, view_sync_chan) + (quorum_chan, committee_chan, view_sync_chan, vid_chan) }) } } @@ -501,6 +560,12 @@ pub type SequencingCombinedExchange = SequencingExchanges< StaticCombinedViewSyncComm, Message, >, + VIDExchange< + SequencingTestTypes, + StaticMembership, + StaticCombinedVIDComm, + Message, + >, >; impl NodeImplementation for SequencingCombinedImpl { @@ -550,6 +615,10 @@ impl SequencingTestTypes, Message, >>::Networking, + , + >>::Networking, ) + 'static, > { let web_server_network_generator = Arc::new(, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( - )(network_da); + )(network_da.clone()); let view_sync_chan = <, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( )(network); - (quorum_chan, committee_chan, view_sync_chan) + + let vid_chan = + <, + >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( + )(network_da); + (quorum_chan, committee_chan, view_sync_chan, vid_chan) }) } } diff --git a/crates/testing/src/test_builder.rs b/crates/testing/src/test_builder.rs index 3515f04368..1bf5ace197 100644 --- a/crates/testing/src/test_builder.rs +++ b/crates/testing/src/test_builder.rs @@ -260,11 +260,19 @@ impl TestMetadata { let overall_safety_task_generator = overall_safety_properties.build(); let spinning_task_generator = spinning_properties.build(); TestLauncher { - resource_generator: ResourceGenerators { - channel_generator: <>::Exchanges as TestableExchange<_, _, _>>::gen_comm_channels(total_nodes, num_bootstrap_nodes, da_committee_size), - storage: Box::new(|_| I::construct_tmp_storage().unwrap()), - config, - }, + resource_generator: + ResourceGenerators { + channel_generator: + <>::Exchanges as TestableExchange< + _, + _, + _, + >>::gen_comm_channels( + total_nodes, num_bootstrap_nodes, da_committee_size + ), + storage: Box::new(|_| I::construct_tmp_storage().unwrap()), + config, + }, metadata: self, txn_task_generator, overall_safety_task_generator, diff --git a/crates/testing/src/test_launcher.rs b/crates/testing/src/test_launcher.rs index 4fb230b315..25c6b14459 100644 --- a/crates/testing/src/test_launcher.rs +++ b/crates/testing/src/test_launcher.rs @@ -44,6 +44,11 @@ pub type Networks = ( >::Leaf, Message, >>::ViewSyncExchange as ConsensusExchange>>::Networking, + <<>::Exchanges as ExchangesType< + TYPES, + >::Leaf, + Message, + >>::VIDExchange as ConsensusExchange>>::Networking, ); /// Wrapper for a function that takes a `node_id` and returns an instance of `T`. diff --git a/crates/testing/tests/da_task.rs b/crates/testing/tests/da_task.rs index e31c34e6aa..0c32e724cc 100644 --- a/crates/testing/tests/da_task.rs +++ b/crates/testing/tests/da_task.rs @@ -7,7 +7,7 @@ use hotshot_testing::{ }; use hotshot_types::{ block_impl::VIDTransaction, - data::{DAProposal, VidDisperse, VidSchemeTrait, ViewNumber}, + data::{DAProposal, VidSchemeTrait, ViewNumber}, traits::{ consensus_api::ConsensusSharedApi, election::ConsensusExchange, node_implementation::ExchangesType, state::ConsensusTime, @@ -57,15 +57,7 @@ async fn test_da_task() { data: proposal, signature, }; - let vid_proposal = Proposal { - data: VidDisperse { - view_number: message.data.view_number, - commitment: block.commit(), - shares: vid_disperse.shares, - common: vid_disperse.common, - }, - signature: message.signature.clone(), - }; + // TODO for now reuse the same block commitment and signature as DA committee // https://github.com/EspressoSystems/jellyfish/issues/369 @@ -84,10 +76,7 @@ async fn test_da_task() { message.clone(), pub_key, )); - input.push(SequencingHotShotEvent::VidDisperseRecv( - vid_proposal.clone(), - pub_key, - )); + input.push(SequencingHotShotEvent::Shutdown); output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)), 1); @@ -108,19 +97,8 @@ async fn test_da_task() { committee_exchange.create_da_message(block.commit(), ViewNumber::new(2), vote_token); output.insert(SequencingHotShotEvent::DAVoteSend(da_vote), 1); - let vote_token = committee_exchange - .make_vote_token(ViewNumber::new(2)) - .unwrap() - .unwrap(); - let vid_vote = - committee_exchange.create_vid_message(block.commit(), ViewNumber::new(2), vote_token); - output.insert(SequencingHotShotEvent::VidVoteSend(vid_vote), 1); - output.insert(SequencingHotShotEvent::DAProposalRecv(message, pub_key), 1); - output.insert( - SequencingHotShotEvent::VidDisperseRecv(vid_proposal, pub_key), - 1, - ); + output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(2)), 1); output.insert(SequencingHotShotEvent::Shutdown, 1); diff --git a/crates/testing/tests/memory_network.rs b/crates/testing/tests/memory_network.rs index a4ee4282e7..3ad84cf6e6 100644 --- a/crates/testing/tests/memory_network.rs +++ b/crates/testing/tests/memory_network.rs @@ -17,7 +17,9 @@ use hotshot_types::block_impl::{VIDBlockPayload, VIDTransaction}; use hotshot_types::certificate::ViewSyncCertificate; use hotshot_types::data::{DAProposal, QuorumProposal, SequencingLeaf}; use hotshot_types::message::{Message, SequencingMessage}; -use hotshot_types::traits::election::{CommitteeExchange, QuorumExchange, ViewSyncExchange}; +use hotshot_types::traits::election::{ + CommitteeExchange, QuorumExchange, VIDExchange, ViewSyncExchange, +}; use hotshot_types::traits::network::TestableNetworkingImplementation; use hotshot_types::traits::network::{ConnectedNetwork, TransmitType}; use hotshot_types::traits::node_implementation::{ChannelMaps, NodeType, SequencingExchanges}; @@ -67,6 +69,7 @@ pub type ThisMembership = GeneralStaticCommittee; pub type QuorumNetwork = MemoryCommChannel; pub type ViewSyncNetwork = MemoryCommChannel; +pub type VIDNetwork = MemoryCommChannel; pub type ThisDAProposal = DAProposal; pub type ThisDAVote = DAVote; @@ -99,6 +102,7 @@ impl NodeImplementation for TestImpl { ViewSyncNetwork, Message, >, + VIDExchange>, >; type ConsensusMessage = SequencingMessage; diff --git a/crates/testing/tests/vid_task.rs b/crates/testing/tests/vid_task.rs new file mode 100644 index 0000000000..cf19740e1d --- /dev/null +++ b/crates/testing/tests/vid_task.rs @@ -0,0 +1,111 @@ +use commit::Committable; +use hotshot::{tasks::add_vid_task, HotShotSequencingConsensusApi}; +use hotshot_task_impls::events::SequencingHotShotEvent; +use hotshot_testing::{ + node_types::{SequencingMemoryImpl, SequencingTestTypes}, + task_helpers::vid_init, +}; +use hotshot_types::traits::election::VIDExchangeType; +use hotshot_types::{ + block_impl::VIDTransaction, + data::{DAProposal, VidDisperse, VidSchemeTrait, ViewNumber}, + traits::{ + consensus_api::ConsensusSharedApi, election::ConsensusExchange, + node_implementation::ExchangesType, state::ConsensusTime, + }, +}; +use std::collections::HashMap; + +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_vid_task() { + use hotshot_task_impls::harness::run_harness; + use hotshot_testing::task_helpers::build_system_handle; + use hotshot_types::{block_impl::VIDBlockPayload, message::Proposal}; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Build the API for node 2. + let handle = build_system_handle(2).await.0; + let api: HotShotSequencingConsensusApi = + HotShotSequencingConsensusApi { + inner: handle.hotshot.inner.clone(), + }; + let vid_exchange = api.inner.exchanges.vid_exchange().clone(); + let pub_key = *api.public_key(); + + let vid = vid_init(); + let txn = vec![0u8]; + let vid_disperse = vid.disperse(&txn).unwrap(); + let block_commitment = vid_disperse.commit; + let block = VIDBlockPayload { + transactions: vec![VIDTransaction(txn)], + commitment: block_commitment, + }; + + let signature = vid_exchange.sign_vid_proposal(&block.commit()); + let proposal: DAProposal = DAProposal { + deltas: block.clone(), + view_number: ViewNumber::new(2), + }; + let message = Proposal { + data: proposal, + signature, + }; + let vid_proposal = Proposal { + data: VidDisperse { + view_number: message.data.view_number, + commitment: block.commit(), + shares: vid_disperse.shares, + common: vid_disperse.common, + }, + signature: message.signature.clone(), + }; + + // Every event input is seen on the event stream in the output. + let mut input = Vec::new(); + let mut output = HashMap::new(); + + // In view 1, node 2 is the next leader. + input.push(SequencingHotShotEvent::ViewChange(ViewNumber::new(1))); + input.push(SequencingHotShotEvent::ViewChange(ViewNumber::new(2))); + input.push(SequencingHotShotEvent::BlockReady( + block.clone(), + ViewNumber::new(2), + )); + + input.push(SequencingHotShotEvent::VidDisperseRecv( + vid_proposal.clone(), + pub_key, + )); + input.push(SequencingHotShotEvent::Shutdown); + + output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)), 1); + output.insert( + SequencingHotShotEvent::BlockReady(block.clone(), ViewNumber::new(2)), + 1, + ); + + let vote_token = vid_exchange + .make_vote_token(ViewNumber::new(2)) + .unwrap() + .unwrap(); + let vid_vote = vid_exchange.create_vid_message(block.commit(), ViewNumber::new(2), vote_token); + output.insert(SequencingHotShotEvent::VidVoteSend(vid_vote), 1); + + output.insert( + SequencingHotShotEvent::VidDisperseRecv(vid_proposal, pub_key), + 1, + ); + output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(2)), 1); + output.insert(SequencingHotShotEvent::Shutdown, 1); + + let build_fn = + |task_runner, event_stream| add_vid_task(task_runner, event_stream, vid_exchange, handle); + + run_harness(input, output, None, build_fn).await; +} diff --git a/crates/types/src/certificate.rs b/crates/types/src/certificate.rs index bf02d2f871..8995e948a2 100644 --- a/crates/types/src/certificate.rs +++ b/crates/types/src/certificate.rs @@ -8,7 +8,8 @@ use crate::{ }, vote::{ DAVote, DAVoteAccumulator, QuorumVote, QuorumVoteAccumulator, TimeoutVote, - TimeoutVoteAccumulator, ViewSyncData, ViewSyncVote, ViewSyncVoteAccumulator, VoteType, + TimeoutVoteAccumulator, VIDVote, VIDVoteAccumulator, ViewSyncData, ViewSyncVote, + ViewSyncVoteAccumulator, VoteType, }, }; use bincode::Options; @@ -41,6 +42,21 @@ pub struct DACertificate { pub signatures: AssembledSignature, } +/// A `VIDCertificate` is a threshold signature that some data is available. +/// It is signed by the whole quorum. +#[derive(Clone, PartialEq, custom_debug::Debug, serde::Serialize, serde::Deserialize, Hash)] +#[serde(bound(deserialize = ""))] +pub struct VIDCertificate { + /// The view number this VID certificate was generated during + pub view_number: TYPES::Time, + + /// committment to the block + pub block_commitment: Commitment, + + /// Assembled signature for certificate aggregation + pub signatures: AssembledSignature, +} + /// The type used for Quorum Certificates /// /// A Quorum Certificate is a threshold signature of the `Leaf` being proposed, as well as some @@ -161,6 +177,8 @@ pub enum AssembledSignature { No(::QCType), /// These signatures are for a 'DA' certificate DA(::QCType), + /// These signatures are for a 'VID' certificate + VID(::QCType), /// These signatures are for a `Timeout` certificate Timeout(::QCType), /// These signatures are for genesis certificate @@ -279,8 +297,48 @@ impl } } +impl + SignedCertificate> + for VIDCertificate +{ + type Vote = VIDVote; + type VoteAccumulator = VIDVoteAccumulator, Self::Vote>; + + fn create_certificate(signatures: AssembledSignature, vote: Self::Vote) -> Self { + VIDCertificate { + view_number: vote.get_view(), + signatures, + block_commitment: vote.block_commitment, + } + } + + fn view_number(&self) -> TYPES::Time { + self.view_number + } + + fn signatures(&self) -> AssembledSignature { + self.signatures.clone() + } + + fn leaf_commitment(&self) -> Commitment { + self.block_commitment + } + + fn is_genesis(&self) -> bool { + // This function is only useful for QC. Will be removed after we have separated cert traits. + false + } + + fn genesis() -> Self { + // This function is only useful for QC. Will be removed after we have separated cert traits. + unimplemented!() + } +} + impl Eq for DACertificate {} +impl Eq for VIDCertificate {} + impl Committable for ViewSyncCertificate { fn commit(&self) -> Commitment { let signatures_bytes = serialize_signature(&self.signatures()); diff --git a/crates/types/src/data.rs b/crates/types/src/data.rs index e20a7da9de..356b7a60fa 100644 --- a/crates/types/src/data.rs +++ b/crates/types/src/data.rs @@ -880,6 +880,10 @@ pub fn serialize_signature(signature: &AssembledSignature { + signatures_bytes.extend("VID".as_bytes()); + Some(signatures.clone()) + } AssembledSignature::Yes(signatures) => { signatures_bytes.extend("Yes".as_bytes()); Some(signatures.clone()) diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index a44722d8ed..e85c9c51df 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -4,7 +4,7 @@ //! `HotShot` nodes can send among themselves. use crate::{ - certificate::DACertificate, + certificate::{DACertificate, VIDCertificate}, data::{DAProposal, ProposalType, VidDisperse}, traits::{ network::{NetworkMsg, ViewMessage}, @@ -13,7 +13,7 @@ use crate::{ }, signature_key::EncodedSignature, }, - vote::{DAVote, QuorumVote, TimeoutVote, ViewSyncVote, VoteType}, + vote::{DAVote, QuorumVote, TimeoutVote, VIDVote, ViewSyncVote, VoteType}, }; use commit::Commitment; use derivative::Derivative; @@ -218,9 +218,9 @@ pub enum ProcessedCommitteeConsensusMessage { /// VID dispersal data. Like [`DAProposal`] VidDisperseMsg(Proposal>, TYPES::SignatureKey), /// Vote from VID storage node. Like [`DAVote`] - VidVote(DAVote, TYPES::SignatureKey), + VidVote(VIDVote, TYPES::SignatureKey), /// Certificate for VID. Like [`DACertificate`] - VidCertificate(DACertificate, TYPES::SignatureKey), + VidCertificate(VIDCertificate, TYPES::SignatureKey), } impl From> @@ -352,13 +352,11 @@ pub enum CommitteeConsensusMessage { /// Vote for VID disperse data /// /// Like [`DAVote`]. - /// TODO currently re-using [`DAVote`]; do we need a separate VID vote? - VidVote(DAVote), + VidVote(VIDVote), /// VID certificate data is available /// /// Like [`DACertificate`] - /// TODO currently re-using [`DACertificate`]; do we need a separate VID cert? - VidCertificate(DACertificate), + VidCertificate(VIDCertificate), } /// Messages related to the consensus protocol. @@ -430,8 +428,8 @@ impl< p.data.get_view_number() } CommitteeConsensusMessage::DAVote(vote_message) => vote_message.get_view(), - CommitteeConsensusMessage::DACertificate(cert) - | CommitteeConsensusMessage::VidCertificate(cert) => cert.view_number, + CommitteeConsensusMessage::DACertificate(cert) => cert.view_number, + CommitteeConsensusMessage::VidCertificate(cert) => cert.view_number, CommitteeConsensusMessage::VidDisperseMsg(disperse) => { disperse.data.get_view_number() } diff --git a/crates/types/src/traits/election.rs b/crates/types/src/traits/election.rs index 0b660a4221..fa191d32b5 100644 --- a/crates/types/src/traits/election.rs +++ b/crates/types/src/traits/election.rs @@ -9,11 +9,11 @@ use super::{ }; use crate::{ certificate::{ - AssembledSignature, DACertificate, QuorumCertificate, TimeoutCertificate, + AssembledSignature, DACertificate, QuorumCertificate, TimeoutCertificate, VIDCertificate, ViewSyncCertificate, }, - data::{DAProposal, ProposalType}, - vote::TimeoutVote, + data::{DAProposal, ProposalType, VidDisperse}, + vote::{TimeoutVote, VIDVote}, }; use crate::{ @@ -84,6 +84,8 @@ where No(COMMITMENT), /// Vote to time out and proceed to the next view. Timeout(COMMITMENT), + /// Vote for VID proposal + VID(COMMITMENT), /// Vote to pre-commit the view sync. ViewSyncPreCommit(COMMITMENT), /// Vote to commit the view sync. @@ -100,6 +102,7 @@ where fn commit(&self) -> Commitment { let (tag, commit) = match self { VoteData::DA(c) => ("DA BlockPayload Commit", c), + VoteData::VID(c) => ("VID Proposal Commit", c), VoteData::Yes(c) => ("Yes Vote Commit", c), VoteData::No(c) => ("No Vote Commit", c), VoteData::Timeout(c) => ("Timeout View Number Commit", c), @@ -345,6 +348,14 @@ pub trait ConsensusExchange: Send + Sync { ); ::check(&real_qc_pp, real_commit.as_ref(), &qc) } + AssembledSignature::VID(qc) => { + let real_commit = VoteData::VID(leaf_commitment).commit(); + let real_qc_pp = ::get_public_parameter( + self.membership().get_committee_qc_stake_table(), + U256::from(self.membership().success_threshold().get()), + ); + ::check(&real_qc_pp, real_commit.as_ref(), &qc) + } AssembledSignature::Yes(qc) => { let real_commit = VoteData::Yes(leaf_commitment).commit(); let real_qc_pp = ::get_public_parameter( @@ -497,22 +508,6 @@ pub trait CommitteeExchangeType: current_view: TYPES::Time, vote_token: TYPES::VoteTokenType, ) -> DAVote; - - // TODO temporary vid methods, move to quorum https://github.com/EspressoSystems/HotShot/issues/1696 - - /// Create a message with a vote on VID disperse data. - fn create_vid_message( - &self, - block_commitment: Commitment, - current_view: TYPES::Time, - vote_token: TYPES::VoteTokenType, - ) -> DAVote; - - /// Sign a vote on VID proposal. - fn sign_vid_vote( - &self, - block_commitment: Commitment, - ) -> (EncodedPublicKey, EncodedSignature); } /// Standard implementation of [`CommitteeExchangeType`] utilizing a DA committee. @@ -584,15 +579,125 @@ impl< vote_data: VoteData::DA(block_commitment), } } +} +impl< + TYPES: NodeType, + MEMBERSHIP: Membership, + NETWORK: CommunicationChannel, + M: NetworkMsg, + > ConsensusExchange for CommitteeExchange +{ + type Proposal = DAProposal; + type Vote = DAVote; + type Certificate = DACertificate; + type Membership = MEMBERSHIP; + type Networking = NETWORK; + type Commitment = Commitment; + + fn create( + entries: Vec<::StakeTableEntry>, + config: TYPES::ElectionConfigType, + network: Self::Networking, + pk: TYPES::SignatureKey, + entry: ::StakeTableEntry, + sk: ::PrivateKey, + ) -> Self { + let membership = + >::Membership::create_election(entries, config); + Self { + network, + membership, + public_key: pk, + entry, + private_key: sk, + _pd: PhantomData, + } + } + fn network(&self) -> &NETWORK { + &self.network + } + fn make_vote_token( + &self, + view_number: TYPES::Time, + ) -> std::result::Result, ElectionError> { + self.membership + .make_vote_token(view_number, &self.private_key) + } + + fn membership(&self) -> &Self::Membership { + &self.membership + } + fn public_key(&self) -> &TYPES::SignatureKey { + &self.public_key + } + fn private_key(&self) -> &<::SignatureKey as SignatureKey>::PrivateKey { + &self.private_key + } +} + +/// A [`ConsensusExchange`] where participants vote to provide availability for blobs of data. +pub trait VIDExchangeType: ConsensusExchange { + /// Create a message with a vote on VID disperse data. + fn create_vid_message( + &self, + block_commitment: Commitment, + current_view: TYPES::Time, + vote_token: TYPES::VoteTokenType, + ) -> VIDVote; + + /// Sign a vote on VID proposal. + fn sign_vid_vote( + &self, + block_commitment: Commitment, + ) -> (EncodedPublicKey, EncodedSignature); + + /// Sign a VID proposal. + fn sign_vid_proposal( + &self, + block_commitment: &Commitment, + ) -> EncodedSignature; +} + +/// Standard implementation of [`VIDExchangeType`] +#[derive(Derivative)] +#[derivative(Clone, Debug)] +pub struct VIDExchange< + TYPES: NodeType, + MEMBERSHIP: Membership, + NETWORK: CommunicationChannel, + M: NetworkMsg, +> { + /// The network being used by this exchange. + network: NETWORK, + /// The committee which votes on proposals. + membership: MEMBERSHIP, + /// This participant's public key. + public_key: TYPES::SignatureKey, + /// Entry with public key and staking value for certificate aggregation + entry: ::StakeTableEntry, + /// This participant's private key. + #[derivative(Debug = "ignore")] + private_key: ::PrivateKey, + #[doc(hidden)] + _pd: PhantomData<(TYPES, MEMBERSHIP, M)>, +} + +impl< + TYPES: NodeType, + MEMBERSHIP: Membership, + NETWORK: CommunicationChannel, + M: NetworkMsg, + > VIDExchangeType for VIDExchange +{ fn create_vid_message( &self, block_commitment: Commitment, current_view: ::Time, vote_token: ::VoteTokenType, - ) -> DAVote { + ) -> VIDVote { let signature = self.sign_vid_vote(block_commitment); - DAVote { + VIDVote { signature, block_commitment, current_view, @@ -611,6 +716,15 @@ impl< ); (self.public_key.to_bytes(), signature) } + + /// Sign a VID proposal. + fn sign_vid_proposal( + &self, + block_commitment: &Commitment, + ) -> EncodedSignature { + let signature = TYPES::SignatureKey::sign(&self.private_key, block_commitment.as_ref()); + signature + } } impl< @@ -618,11 +732,11 @@ impl< MEMBERSHIP: Membership, NETWORK: CommunicationChannel, M: NetworkMsg, - > ConsensusExchange for CommitteeExchange + > ConsensusExchange for VIDExchange { - type Proposal = DAProposal; - type Vote = DAVote; - type Certificate = DACertificate; + type Proposal = VidDisperse; + type Vote = VIDVote; + type Certificate = VIDCertificate; type Membership = MEMBERSHIP; type Networking = NETWORK; type Commitment = Commitment; diff --git a/crates/types/src/traits/node_implementation.rs b/crates/types/src/traits/node_implementation.rs index 6d27d70103..90d7912539 100644 --- a/crates/types/src/traits/node_implementation.rs +++ b/crates/types/src/traits/node_implementation.rs @@ -7,7 +7,7 @@ use super::{ block_contents::Transaction, election::{ CommitteeExchangeType, ConsensusExchange, ElectionConfig, QuorumExchangeType, - TimeoutExchange, TimeoutExchangeType, ViewSyncExchangeType, VoteToken, + TimeoutExchange, TimeoutExchangeType, VIDExchangeType, ViewSyncExchangeType, VoteToken, }, network::{CommunicationChannel, NetworkMsg, TestableNetworkingImplementation}, state::{ConsensusTime, TestableBlock, TestableState}, @@ -153,6 +153,9 @@ pub trait ExchangesType, MESSA /// Protocol for exchanging data availability proposals and votes. type CommitteeExchange: CommitteeExchangeType + Clone + Debug; + /// Protocol for exchanging VID proposals and votes + type VIDExchange: VIDExchangeType + Clone + Debug; + /// Get the committee exchange fn committee_exchange(&self) -> &Self::CommitteeExchange; @@ -179,6 +182,7 @@ pub trait ExchangesType, MESSA >::Networking, >::Networking, >::Networking, + >::Networking, ), pk: TYPES::SignatureKey, entry: ::StakeTableEntry, @@ -191,6 +195,9 @@ pub trait ExchangesType, MESSA /// Get the view sync exchange. fn view_sync_exchange(&self) -> &Self::ViewSyncExchange; + /// Get the VID exchange + fn vid_exchange(&self) -> &Self::VIDExchange; + /// BlockPayload the underlying networking interfaces until node is successfully initialized into the /// networks. async fn wait_for_networks_ready(&self); @@ -216,6 +223,7 @@ pub trait TestableExchange, ME >::Networking, >::Networking, >::Networking, + >::Networking, ) + 'static, >; } @@ -228,6 +236,7 @@ pub struct SequencingExchanges< QUORUMEXCHANGE: QuorumExchangeType, MESSAGE> + Clone + Debug, COMMITTEEEXCHANGE: CommitteeExchangeType + Clone + Debug, VIEWSYNCEXCHANGE: ViewSyncExchangeType + Clone + Debug, + VIDEXCHANGE: VIDExchangeType + Clone + Debug, > { /// Quorum exchange. quorum_exchange: QUORUMEXCHANGE, @@ -238,34 +247,52 @@ pub struct SequencingExchanges< /// Committee exchange. committee_exchange: COMMITTEEEXCHANGE, + /// VID exchange + vid_exchange: VIDEXCHANGE, + /// Timeout exchange // This type can be simplified once we rework the exchanges trait // It is here to avoid needing to instantiate it where all the other exchanges are instantiated // https://github.com/EspressoSystems/HotShot/issues/1799 #[allow(clippy::type_complexity)] - pub timeout_exchange: TimeoutExchange as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Proposal, < as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Membership, >::Networking, MESSAGE>, + pub timeout_exchange: TimeoutExchange + as ExchangesType, MESSAGE>>::QuorumExchange + as ConsensusExchange>::Proposal, < + as ExchangesType, MESSAGE>>::QuorumExchange + as ConsensusExchange>::Membership, >::Networking, MESSAGE>, /// Phantom data _phantom: PhantomData<(TYPES, MESSAGE)>, } #[async_trait] -impl +impl ExchangesType, MESSAGE> - for SequencingExchanges + for SequencingExchanges< + TYPES, + MESSAGE, + QUORUMEXCHANGE, + COMMITTEEEXCHANGE, + VIEWSYNCEXCHANGE, + VIDEXCHANGE, + > where TYPES: NodeType, MESSAGE: NetworkMsg, QUORUMEXCHANGE: QuorumExchangeType, MESSAGE> + Clone + Debug, COMMITTEEEXCHANGE: CommitteeExchangeType + Clone + Debug, VIEWSYNCEXCHANGE: ViewSyncExchangeType + Clone + Debug, + VIDEXCHANGE: VIDExchangeType + Clone + Debug, { type CommitteeExchange = COMMITTEEEXCHANGE; type QuorumExchange = QUORUMEXCHANGE; type ViewSyncExchange = VIEWSYNCEXCHANGE; + type VIDExchange = VIDEXCHANGE; #[allow(clippy::type_complexity)] - type TimeoutExchange = TimeoutExchange as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Proposal, < as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Membership, >::Networking, MESSAGE>; + type TimeoutExchange = TimeoutExchange as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Proposal, < as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Membership, >::Networking, MESSAGE>; + type ElectionConfigs = (TYPES::ElectionConfigType, TYPES::ElectionConfigType); fn committee_exchange(&self) -> &COMMITTEEEXCHANGE { @@ -283,6 +310,7 @@ where >::Networking, >::Networking, >::Networking, + >::Networking, ), pk: TYPES::SignatureKey, entry: ::StakeTableEntry, @@ -297,7 +325,7 @@ where sk.clone(), ); #[allow(clippy::type_complexity)] - let timeout_exchange: TimeoutExchange as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Proposal, < as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Membership, >::Networking, MESSAGE> = TimeoutExchange::create( + let timeout_exchange: TimeoutExchange as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Proposal, < as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Membership, >::Networking, MESSAGE> = TimeoutExchange::create( entries.clone(), configs.0.clone(), networks.0, @@ -314,13 +342,25 @@ where entry.clone(), sk.clone(), ); - let committee_exchange = - COMMITTEEEXCHANGE::create(entries, configs.1, networks.1, pk, entry, sk); + + let committee_exchange = COMMITTEEEXCHANGE::create( + entries.clone(), + configs.1.clone(), + networks.1, + pk.clone(), + entry.clone(), + sk.clone(), + ); + + // RM TODO: figure out if this is the proper config + // issue: https://github.com/EspressoSystems/HotShot/issues/1918 + let vid_exchange = VIDEXCHANGE::create(entries, configs.1, networks.3, pk, entry, sk); Self { quorum_exchange, committee_exchange, view_sync_exchange, + vid_exchange, timeout_exchange, _phantom: PhantomData, } @@ -334,6 +374,10 @@ where &self.view_sync_exchange } + fn vid_exchange(&self) -> &Self::VIDExchange { + &self.vid_exchange + } + async fn wait_for_networks_ready(&self) { self.quorum_exchange.network().wait_for_ready().await; self.committee_exchange.network().wait_for_ready().await; @@ -375,6 +419,13 @@ pub type CommitteeEx = <>::Exchanges as Message, >>::CommitteeExchange; +/// Alias for the [`VIDExchange`] type. +pub type VIDEx = <>::Exchanges as ExchangesType< + TYPES, + >::Leaf, + Message, +>>::VIDExchange; + /// Alias for the [`ViewSyncExchange`] type. pub type ViewSyncEx = <>::Exchanges as ExchangesType< TYPES, diff --git a/crates/types/src/vote.rs b/crates/types/src/vote.rs index bb4542ebe3..e5e3e15ba5 100644 --- a/crates/types/src/vote.rs +++ b/crates/types/src/vote.rs @@ -59,6 +59,22 @@ pub struct DAVote { pub vote_data: VoteData>, } +/// A vote on VID proposal. +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)] +#[serde(bound(deserialize = ""))] +pub struct VIDVote { + /// The signature share associated with this vote + pub signature: (EncodedPublicKey, EncodedSignature), + /// The block commitment being voted on. + pub block_commitment: Commitment, + /// The view this vote was cast for + pub current_view: TYPES::Time, + /// The vote token generated by this replica + pub vote_token: TYPES::VoteTokenType, + /// The vote data this vote is signed over + pub vote_data: VoteData>, +} + /// A positive or negative vote on validating or commitment proposal. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] #[serde(bound(deserialize = ""))] @@ -239,6 +255,33 @@ impl DAVote { } } +impl VoteType> for VIDVote { + fn get_view(&self) -> TYPES::Time { + self.current_view + } + fn get_key(&self) -> ::SignatureKey { + self.signature_key() + } + fn get_signature(&self) -> EncodedSignature { + self.signature.1.clone() + } + fn get_data(&self) -> VoteData> { + self.vote_data.clone() + } + fn get_vote_token(&self) -> ::VoteTokenType { + self.vote_token.clone() + } +} + +impl VIDVote { + /// Get the signature key. + /// # Panics + /// If the deserialization fails. + pub fn signature_key(&self) -> TYPES::SignatureKey { + ::from_bytes(&self.signature.0).unwrap() + } +} + impl VoteType for QuorumVote { @@ -499,6 +542,78 @@ impl< } } +impl< + TYPES: NodeType, + COMMITMENT: CommitmentBounds + Clone + Copy + PartialEq + Eq + Hash, + VOTE: VoteType, + > Accumulator for VIDVoteAccumulator +{ + fn append( + mut self, + vote: VOTE, + vote_node_id: usize, + stake_table_entries: Vec<::StakeTableEntry>, + ) -> Either> { + let VoteData::VID(vote_commitment) = vote.get_data() else { + return Either::Left(self); + }; + + let encoded_key = vote.get_key().to_bytes(); + + // Deserialize the signature so that it can be assembeld into a QC + // TODO ED Update this once we've gotten rid of EncodedSignature + let original_signature: ::PureAssembledSignatureType = + bincode_opts() + .deserialize(&vote.get_signature().0) + .expect("Deserialization on the signature shouldn't be able to fail."); + + let (vid_stake_casted, vid_vote_map) = self + .vid_vote_outcomes + .entry(vote_commitment) + .or_insert_with(|| (0, BTreeMap::new())); + + // Check for duplicate vote + // TODO ED Re-encoding signature key to bytes until we get rid of EncodedKey + // Have to do this because SignatureKey is not hashable + if vid_vote_map.contains_key(&encoded_key) { + return Either::Left(self); + } + + if self.signers.get(vote_node_id).as_deref() == Some(&true) { + error!("Node id is already in signers list"); + return Either::Left(self); + } + self.signers.set(vote_node_id, true); + self.sig_lists.push(original_signature); + + // Already checked that vote data was for a VID vote above + *vid_stake_casted += u64::from(vote.get_vote_token().vote_count()); + vid_vote_map.insert( + encoded_key, + (vote.get_signature(), vote.get_data(), vote.get_vote_token()), + ); + + if *vid_stake_casted >= u64::from(self.success_threshold) { + // Assemble QC + let real_qc_pp = ::get_public_parameter( + stake_table_entries.clone(), + U256::from(self.success_threshold.get()), + ); + + let real_qc_sig = ::assemble( + &real_qc_pp, + self.signers.as_bitslice(), + &self.sig_lists[..], + ); + + self.vid_vote_outcomes.remove(&vote_commitment); + + return Either::Right(AssembledSignature::VID(real_qc_sig)); + } + Either::Left(self) + } +} + /// Accumulates DA votes pub struct DAVoteAccumulator< TYPES: NodeType, @@ -517,6 +632,24 @@ pub struct DAVoteAccumulator< pub phantom: PhantomData, } +/// Accumulates VID votes +pub struct VIDVoteAccumulator< + TYPES: NodeType, + COMMITMENT: CommitmentBounds + Clone, + VOTE: VoteType, +> { + /// Map of all VID signatures accumlated so far + pub vid_vote_outcomes: VoteMap, + /// A quorum's worth of stake, generally 2f + 1 + pub success_threshold: NonZeroU64, + /// A list of valid signatures for certificate aggregation + pub sig_lists: Vec<::PureAssembledSignatureType>, + /// A bitvec to indicate which node is active and send out a valid signature for certificate aggregation, this automatically do uniqueness check + pub signers: BitVec, + /// Phantom data to specify the vote this accumulator is for + pub phantom: PhantomData, +} + /// Accumulate quorum votes pub struct QuorumVoteAccumulator< TYPES: NodeType,