diff --git a/Cargo.lock b/Cargo.lock
index 59931ea318..a3cdfa9db8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2952,6 +2952,7 @@ name = "hotshot-examples"
 version = "0.5.79"
 dependencies = [
  "anyhow",
+ "async-lock 3.4.0",
  "async-trait",
  "cdn-broker",
  "cdn-marshal",
@@ -4385,6 +4386,7 @@ name = "libp2p-networking"
 version = "0.5.79"
 dependencies = [
  "anyhow",
+ "async-lock 3.4.0",
  "async-trait",
  "bincode",
  "blake3",
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index 09e13b8e7d..3136acdae4 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -78,6 +78,7 @@ name = "whitelist-push-cdn"
 path = "push-cdn/whitelist-adapter.rs"
 
 [dependencies]
+async-lock = { workspace = true }
 async-trait = { workspace = true }
 cdn-broker = { workspace = true, features = ["global-permits"] }
diff --git a/crates/examples/infra/mod.rs b/crates/examples/infra/mod.rs
index 88a2ff900b..63dd75d203 100755
--- a/crates/examples/infra/mod.rs
+++ b/crates/examples/infra/mod.rs
@@ -15,6 +15,7 @@ use std::{
     time::Instant,
 };
 
+use async_lock::RwLock;
 use async_trait::async_trait;
 use cdn_broker::reexports::crypto::signature::KeyPair;
 use chrono::Utc;
@@ -350,13 +351,17 @@ pub trait RunDa<
         config: NetworkConfig<TYPES::SignatureKey>,
         validator_config: ValidatorConfig<TYPES::SignatureKey>,
         libp2p_advertise_address: Option<String>,
+        membership: &Arc<RwLock<TYPES::Membership>>,
     ) -> Self;
 
     /// Initializes the genesis state and HotShot instance; does not start HotShot consensus
     /// # Panics if it cannot generate a genesis block, fails to initialize HotShot, or cannot
     /// get the anchored view
     /// Note: sequencing leaf does not have state, so does not return state
-    async fn initialize_state_and_hotshot(&self) -> SystemContextHandle<TYPES, NODE, V> {
+    async fn initialize_state_and_hotshot(
+        &self,
+        membership: Arc<RwLock<TYPES::Membership>>,
+    ) -> SystemContextHandle<TYPES, NODE, V> {
         let initializer =
             hotshot::HotShotInitializer::<TYPES>::from_genesis::<V>(TestInstanceState::default())
                 .await
@@ -371,20 +376,6 @@
 
         let network = self.network();
 
-        let all_nodes = if cfg!(feature = "fixed-leader-election") {
-            let mut vec = config.config.known_nodes_with_stake.clone();
-            vec.truncate(config.config.fixed_leader_for_gpuvid);
-            vec
-        } else {
-            config.config.known_nodes_with_stake.clone()
-        };
-
-        let da_nodes = config.config.known_da_nodes.clone();
-
-        // Create the quorum membership from all nodes, specifying the committee
-        // as the known da nodes
-        let memberships = <TYPES as NodeType>::Membership::new(all_nodes, da_nodes);
-
         let marketplace_config = MarketplaceConfig {
             auction_results_provider: TestAuctionResultsProvider::<TYPES>::default().into(),
             // TODO: we need to pass a valid fallback builder url here somehow
@@ -396,7 +387,7 @@
             sk,
             config.node_index,
             config.config,
-            memberships,
+            membership,
             Arc::from(network),
             initializer,
             ConsensusMetricsValue::default(),
@@ -526,13 +517,15 @@
                 }
             }
         }
-        let consensus_lock = context.hotshot.consensus();
-        let consensus = consensus_lock.read().await;
         let num_eligible_leaders = context
             .hotshot
             .memberships
+            .read()
+            .await
             .committee_leaders(TYPES::View::genesis(), TYPES::Epoch::genesis())
             .len();
+        let consensus_lock = context.hotshot.consensus();
+        let consensus = consensus_lock.read().await;
         let total_num_views = usize::try_from(consensus.locked_view().u64()).unwrap();
         // `failed_num_views` could include uncommitted views
         let failed_num_views = total_num_views - num_successful_commits;
@@ -622,6 +615,7 @@
         config: NetworkConfig<TYPES::SignatureKey>,
         validator_config: ValidatorConfig<TYPES::SignatureKey>,
         _libp2p_advertise_address: Option<String>,
+        _membership: &Arc<RwLock<TYPES::Membership>>,
     ) -> PushCdnDaRun<TYPES> {
         // Convert to the Push-CDN-compatible type
        let keypair = KeyPair {
@@ -708,6 +702,7 @@
         config: NetworkConfig<TYPES::SignatureKey>,
         validator_config: ValidatorConfig<TYPES::SignatureKey>,
         libp2p_advertise_address: Option<String>,
+        membership: &Arc<RwLock<TYPES::Membership>>,
     ) -> Libp2pDaRun<TYPES> {
         // Extrapolate keys for ease of use
         let public_key = &validator_config.public_key;
@@ -736,11 +731,6 @@
                 .to_string()
         };
 
-        // Create the quorum membership from the list of known nodes
-        let all_nodes = config.config.known_nodes_with_stake.clone();
-        let da_nodes = config.config.known_da_nodes.clone();
-        let quorum_membership = TYPES::Membership::new(all_nodes, da_nodes);
-
         // Derive the bind address
         let bind_address =
             derive_libp2p_multiaddr(&bind_address).expect("failed to derive bind address");
@@ -748,7 +738,7 @@
         // Create the Libp2p network
         let libp2p_network = Libp2pNetwork::from_config(
             config.clone(),
-            quorum_membership,
+            Arc::clone(membership),
             GossipConfig::default(),
             RequestResponseConfig::default(),
             bind_address,
@@ -820,6 +810,7 @@
         config: NetworkConfig<TYPES::SignatureKey>,
         validator_config: ValidatorConfig<TYPES::SignatureKey>,
         libp2p_advertise_address: Option<String>,
+        membership: &Arc<RwLock<TYPES::Membership>>,
     ) -> CombinedDaRun<TYPES> {
         // Initialize our Libp2p network
         let libp2p_network: Libp2pDaRun<TYPES> = <Libp2pDaRun<TYPES> as RunDa<
@@ -831,6 +822,7 @@
             config.clone(),
             validator_config.clone(),
             libp2p_advertise_address.clone(),
+            membership,
         )
         .await;
 
@@ -844,6 +836,7 @@
             config.clone(),
             validator_config.clone(),
             libp2p_advertise_address,
+            membership,
         )
         .await;
 
@@ -878,6 +871,7 @@
     }
 }
 
+#[allow(clippy::too_many_lines)]
 /// Main entry point for validators
 /// # Panics
 /// if unable to get the local ip address
@@ -974,11 +968,27 @@ pub async fn main_entry_point<
             .join(",")
     );
 
+    let all_nodes = if cfg!(feature = "fixed-leader-election") {
+        let mut vec = run_config.config.known_nodes_with_stake.clone();
+        vec.truncate(run_config.config.fixed_leader_for_gpuvid);
+        vec
+    } else {
+        run_config.config.known_nodes_with_stake.clone()
+    };
+    let membership = Arc::new(RwLock::new(<TYPES as NodeType>::Membership::new(
+        all_nodes,
+        run_config.config.known_da_nodes.clone(),
+    )));
+
     info!("Initializing networking");
-    let run =
-        RUNDA::initialize_networking(run_config.clone(), validator_config, args.advertise_address)
-            .await;
-    let hotshot = run.initialize_state_and_hotshot().await;
+    let run = RUNDA::initialize_networking(
+        run_config.clone(),
+        validator_config,
+        args.advertise_address,
+        &membership,
+    )
+    .await;
+    let hotshot = run.initialize_state_and_hotshot(membership).await;
 
     if let Some(task) = builder_task {
         task.start(Box::new(hotshot.event_stream()));
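The core of the change so far: instead of each subsystem building its own `Membership` from the node config, `main_entry_point` now builds one membership and everything shares it behind an async lock. A minimal, self-contained sketch of that sharing pattern (a toy `Membership` type stands in for `TYPES::Membership`; assumes the `async-lock` and `tokio` crates, not any HotShot API):

    use std::sync::Arc;
    use async_lock::RwLock;

    // Toy stand-in for TYPES::Membership.
    struct Membership {
        nodes: Vec<u64>,
    }

    impl Membership {
        fn leader(&self, view: u64) -> u64 {
            self.nodes[view as usize % self.nodes.len()]
        }
    }

    #[tokio::main]
    async fn main() {
        // Built once, up front, from the node config...
        let membership = Arc::new(RwLock::new(Membership { nodes: vec![10, 20, 30] }));

        // ...then shared: networking borrows the Arc, consensus clones it.
        let for_consensus = Arc::clone(&membership);

        // All reads now go through the async lock.
        assert_eq!(for_consensus.read().await.leader(7), 20);
    }

This mirrors the `initialize_networking(..., &membership)` / `initialize_state_and_hotshot(membership)` split above: networking only needs to clone the handle, while HotShot takes shared ownership of the same lock.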
diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs
index b76b3cc7c9..8055ae595c 100644
--- a/crates/hotshot/src/lib.rs
+++ b/crates/hotshot/src/lib.rs
@@ -108,7 +108,7 @@ pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     pub network: Arc<I::Network>,
 
     /// Memberships used by consensus
-    pub memberships: Arc<TYPES::Membership>,
+    pub memberships: Arc<RwLock<TYPES::Membership>>,
 
     /// the metrics that the implementor is using.
     metrics: Arc<ConsensusMetricsValue>,
@@ -199,7 +199,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
         private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
         nonce: u64,
         config: HotShotConfig<TYPES::SignatureKey>,
-        memberships: TYPES::Membership,
+        memberships: Arc<RwLock<TYPES::Membership>>,
         network: Arc<I::Network>,
         initializer: HotShotInitializer<TYPES>,
         metrics: ConsensusMetricsValue,
@@ -252,7 +252,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
         private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
         nonce: u64,
         config: HotShotConfig<TYPES::SignatureKey>,
-        memberships: TYPES::Membership,
+        memberships: Arc<RwLock<TYPES::Membership>>,
         network: Arc<I::Network>,
         initializer: HotShotInitializer<TYPES>,
         metrics: ConsensusMetricsValue,
@@ -365,7 +365,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
         private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
         node_id: u64,
         config: HotShotConfig<TYPES::SignatureKey>,
-        memberships: TYPES::Membership,
+        memberships: Arc<RwLock<TYPES::Membership>>,
         network: Arc<I::Network>,
         initializer: HotShotInitializer<TYPES>,
         metrics: ConsensusMetricsValue,
@@ -771,7 +780,7 @@ where
     private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
     nonce: u64,
     config: HotShotConfig<TYPES::SignatureKey>,
-    memberships: TYPES::Membership,
+    memberships: Arc<RwLock<TYPES::Membership>>,
     network: Arc<I::Network>,
     initializer: HotShotInitializer<TYPES>,
     metrics: ConsensusMetricsValue,
@@ -787,7 +796,7 @@ where
             private_key.clone(),
             nonce,
             config.clone(),
-            memberships.clone(),
+            Arc::clone(&memberships),
             Arc::clone(&network),
             initializer.clone(),
             metrics.clone(),
diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs
index 6b30d5b535..d32a671b94 100644
--- a/crates/hotshot/src/tasks/mod.rs
+++ b/crates/hotshot/src/tasks/mod.rs
@@ -192,7 +192,7 @@ pub fn add_network_event_task<
 >(
     handle: &mut SystemContextHandle<TYPES, I, V>,
     network: Arc<NET>,
-    membership: TYPES::Membership,
+    membership: Arc<RwLock<TYPES::Membership>>,
 ) {
     let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
         network,
@@ -323,7 +323,7 @@ where
     private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
     nonce: u64,
     config: HotShotConfig<TYPES::SignatureKey>,
-    memberships: TYPES::Membership,
+    memberships: Arc<RwLock<TYPES::Membership>>,
     network: Arc<I::Network>,
     initializer: HotShotInitializer<TYPES>,
     metrics: ConsensusMetricsValue,
@@ -518,8 +518,9 @@ where
     /// Adds the `NetworkEventTaskState` tasks possibly modifying them as well.
     fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
         let network = Arc::clone(&handle.network);
+        let memberships = Arc::clone(&handle.memberships);
 
-        self.add_network_event_task(handle, Arc::clone(&network), (*handle.memberships).clone());
+        self.add_network_event_task(handle, network, memberships);
     }
 
     /// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
@@ -527,7 +528,7 @@ where
         &self,
         handle: &mut SystemContextHandle<TYPES, I, V>,
         channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
-        membership: TYPES::Membership,
+        membership: Arc<RwLock<TYPES::Membership>>,
     ) {
         add_network_event_task(handle, channel, membership);
     }
@@ -565,6 +566,6 @@ pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
     add_network_event_task(
         handle,
         Arc::clone(&handle.network),
-        (*handle.memberships).clone(),
+        Arc::clone(&handle.memberships),
     );
 }
diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs
index 78ba2ad8f1..9b0230c990 100644
--- a/crates/hotshot/src/tasks/task_state.rs
+++ b/crates/hotshot/src/tasks/task_state.rs
@@ -58,7 +58,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             consensus: OuterConsensus::new(handle.hotshot.consensus()),
             view: handle.cur_view().await,
             delay: handle.hotshot.config.data_request_delay,
-            membership: (*handle.hotshot.memberships).clone(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             public_key: handle.public_key().clone(),
             private_key: handle.private_key().clone(),
             id: handle.hotshot.id,
@@ -78,7 +78,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             output_event_stream: handle.hotshot.external_event_stream.0.clone(),
             cur_view: handle.cur_view().await,
             cur_epoch: handle.cur_epoch().await,
-            quorum_membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             vote_collectors: BTreeMap::default(),
             public_key: handle.public_key().clone(),
             private_key: handle.private_key().clone(),
@@ -99,7 +99,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             output_event_stream: handle.hotshot.external_event_stream.0.clone(),
             cur_view: handle.cur_view().await,
             cur_epoch: handle.cur_epoch().await,
-            membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             network: Arc::clone(&handle.hotshot.network),
             vote_collector: None.into(),
             public_key: handle.public_key().clone(),
@@ -128,7 +128,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             cur_view: handle.cur_view().await,
             cur_epoch: handle.cur_epoch().await,
             network: Arc::clone(&handle.hotshot.network),
-            membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             public_key: handle.public_key().clone(),
             private_key: handle.private_key().clone(),
             id: handle.hotshot.id,
@@ -145,7 +145,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
         Self {
             consensus: OuterConsensus::new(handle.hotshot.consensus()),
             output_event_stream: handle.hotshot.external_event_stream.0.clone(),
-            membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             network: Arc::clone(&handle.hotshot.network),
             cur_view: handle.cur_view().await,
             cur_epoch: handle.cur_epoch().await,
@@ -170,7 +170,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             cur_view,
             next_view: cur_view,
             cur_epoch: handle.cur_epoch().await,
-            membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             public_key: handle.public_key().clone(),
             private_key: handle.private_key().clone(),
             num_timeouts_tracked: 0,
@@ -197,7 +197,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             consensus: OuterConsensus::new(handle.hotshot.consensus()),
             cur_view: handle.cur_view().await,
             cur_epoch: handle.cur_epoch().await,
-            membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             public_key: handle.public_key().clone(),
             private_key: handle.private_key().clone(),
             instance_state: handle.hotshot.instance_state(),
@@ -242,7 +242,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             latest_voted_view: handle.cur_view().await,
             vote_dependencies: BTreeMap::new(),
             network: Arc::clone(&handle.hotshot.network),
-            membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             drb_computations: DrbComputations::new(),
             output_event_stream: handle.hotshot.external_event_stream.0.clone(),
             id: handle.hotshot.id,
@@ -267,7 +267,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             proposal_dependencies: BTreeMap::new(),
             consensus: OuterConsensus::new(consensus),
             instance_state: handle.hotshot.instance_state(),
-            quorum_membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             public_key: handle.public_key().clone(),
             private_key: handle.private_key().clone(),
             storage: Arc::clone(&handle.storage),
@@ -294,7 +294,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             consensus: OuterConsensus::new(consensus),
             cur_view: handle.cur_view().await,
             cur_epoch: handle.cur_epoch().await,
-            quorum_membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             timeout: handle.hotshot.config.next_view_timeout,
             output_event_stream: handle.hotshot.external_event_stream.0.clone(),
             storage: Arc::clone(&handle.storage),
@@ -318,7 +318,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             private_key: handle.private_key().clone(),
             instance_state: handle.hotshot.instance_state(),
             network: Arc::clone(&handle.hotshot.network),
-            membership: (*handle.hotshot.memberships).clone().into(),
+            membership: Arc::clone(&handle.hotshot.memberships),
             vote_collectors: BTreeMap::default(),
             next_epoch_vote_collectors: BTreeMap::default(),
             timeout_vote_collectors: BTreeMap::default(),
diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs
index 6c891fa6f7..7021c3e892 100644
--- a/crates/hotshot/src/traits/networking/combined_network.rs
+++ b/crates/hotshot/src/traits/networking/combined_network.rs
@@ -475,8 +475,12 @@ impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
         self.secondary().queue_node_lookup(view_number, pk)
     }
 
-    async fn update_view<'a, T>(&'a self, view: u64, epoch: u64, membership: &T::Membership)
-    where
+    async fn update_view<'a, T>(
+        &'a self,
+        view: u64,
+        epoch: u64,
+        membership: Arc<RwLock<T::Membership>>,
+    ) where
         T: NodeType + 'a,
     {
         let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs
index 0f1fbfc858..39c61e638f 100644
--- a/crates/hotshot/src/traits/networking/libp2p_network.rs
+++ b/crates/hotshot/src/traits/networking/libp2p_network.rs
@@ -390,7 +390,7 @@ impl<T: NodeType> Libp2pNetwork<T> {
     #[allow(clippy::too_many_arguments)]
     pub async fn from_config(
         mut config: NetworkConfig<T::SignatureKey>,
-        quorum_membership: T::Membership,
+        membership: Arc<RwLock<T::Membership>>,
         gossip_config: GossipConfig,
         request_response_config: RequestResponseConfig,
         bind_address: Multiaddr,
@@ -421,7 +421,7 @@ impl<T: NodeType> Libp2pNetwork<T> {
 
         // Set the auth message and stake table
         config_builder
-            .stake_table(Some(quorum_membership))
+            .stake_table(Some(membership))
            .auth_message(Some(auth_message));
 
         // The replication factor is the minimum of [the default and 2/3 the number of nodes]
@@ -978,13 +978,18 @@ impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Libp2pNetwork<T> {
     /// So the logic with libp2p is to prefetch upcoming leaders libp2p address to
     /// save time when we later need to direct message the leader our vote. Hence the
     /// use of the future view and leader to queue the lookups.
-    async fn update_view<'a, TYPES>(&'a self, view: u64, epoch: u64, membership: &TYPES::Membership)
-    where
+    async fn update_view<'a, TYPES>(
+        &'a self,
+        view: u64,
+        epoch: u64,
+        membership: Arc<RwLock<TYPES::Membership>>,
+    ) where
         TYPES: NodeType + 'a,
     {
         let future_view = <TYPES as NodeType>::View::new(view) + LOOK_AHEAD;
         let epoch = <TYPES as NodeType>::Epoch::new(epoch);
-        let future_leader = match membership.leader(future_view, epoch) {
+
+        let future_leader = match membership.read().await.leader(future_view, epoch) {
             Ok(l) => l,
             Err(e) => {
                 return tracing::info!(
diff --git a/crates/hotshot/src/types/handle.rs b/crates/hotshot/src/types/handle.rs
index e3024e876b..556ff0e1c5 100644
--- a/crates/hotshot/src/types/handle.rs
+++ b/crates/hotshot/src/types/handle.rs
@@ -69,7 +69,7 @@ pub struct SystemContextHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     pub network: Arc<I::Network>,
 
     /// Memberships used by consensus
-    pub memberships: Arc<TYPES::Membership>,
+    pub memberships: Arc<RwLock<TYPES::Membership>>,
 
     /// Number of blocks in an epoch, zero means there are no epochs
     pub epoch_height: u64,
@@ -156,7 +156,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> SystemContextHandle<TYPES, I, V> {
             signed_proposal_request.commit().as_ref(),
         )?;
 
-        let mem = (*self.memberships).clone();
+        let mem = Arc::clone(&self.memberships);
         let receiver = self.internal_event_stream.1.activate_cloned();
         let sender = self.internal_event_stream.0.clone();
         let epoch_height = self.epoch_height;
@@ -187,10 +187,13 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> SystemContextHandle<TYPES, I, V> {
                     if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) =
                         hs_event.as_ref()
                     {
                         // Make sure that the quorum_proposal is valid
-                        if let Err(err) = quorum_proposal.validate_signature(&mem, epoch_height) {
+                        let mem_reader = mem.read().await;
+                        if let Err(err) = quorum_proposal.validate_signature(&mem_reader, epoch_height)
+                        {
                             tracing::warn!("Invalid Proposal Received after Request.  Err {:?}", err);
                             continue;
                         }
+                        drop(mem_reader);
                         let proposed_leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                         let commit = proposed_leaf.commit();
                         if commit == leaf_commitment {
@@ -326,6 +329,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> SystemContextHandle<TYPES, I, V> {
     ) -> Result<TYPES::SignatureKey> {
         self.hotshot
             .memberships
+            .read()
+            .await
             .leader(view_number, epoch_number)
             .context("Failed to lookup leader")
     }
diff --git a/crates/libp2p-networking/Cargo.toml b/crates/libp2p-networking/Cargo.toml
index 98f1449508..83856af400 100644
--- a/crates/libp2p-networking/Cargo.toml
+++ b/crates/libp2p-networking/Cargo.toml
@@ -15,6 +15,7 @@ hotshot-example-types = { path = "../example-types" }
 
 [dependencies]
 anyhow = { workspace = true }
+async-lock = { workspace = true }
 async-trait = { workspace = true }
 bincode = { workspace = true }
 blake3 = { workspace = true }
diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs
index 21a2811bb8..eeb654997e 100644
--- a/crates/libp2p-networking/src/network/mod.rs
+++ b/crates/libp2p-networking/src/network/mod.rs
@@ -16,8 +16,9 @@ pub mod transport;
 /// Forked `cbor` codec with altered request/response sizes
 pub mod cbor;
 
-use std::{collections::HashSet, fmt::Debug};
+use std::{collections::HashSet, fmt::Debug, sync::Arc};
 
+use async_lock::RwLock;
 use futures::channel::oneshot::Sender;
 use hotshot_types::traits::{network::NetworkError, node_implementation::NodeType};
 use libp2p::{
@@ -159,7 +160,7 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
 #[instrument(skip(identity))]
 pub async fn gen_transport<T: NodeType>(
     identity: Keypair,
-    stake_table: Option<T::Membership>,
+    stake_table: Option<Arc<RwLock<T::Membership>>>,
     auth_message: Option<Vec<u8>>,
 ) -> Result<BoxedTransport, NetworkError> {
     // Create the initial `Quic` transport
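As in `SystemContextHandle::leader` and `update_view` above, point reads borrow the guard only for the length of one expression. A self-contained illustration of that idiom (toy types; `async-lock` and `futures` assumed, `futures` used only for `block_on`):

    use std::sync::Arc;
    use async_lock::RwLock;

    struct Membership {
        nodes: Vec<u64>,
    }

    impl Membership {
        fn leader(&self, view: u64) -> Result<u64, String> {
            self.nodes
                .get(view as usize % self.nodes.len())
                .copied()
                .ok_or_else(|| "no leader".to_string())
        }
    }

    async fn leader(membership: &Arc<RwLock<Membership>>, view: u64) -> Result<u64, String> {
        // The temporary read guard lives only for this expression, so the
        // lock is released before the caller does anything else.
        membership.read().await.leader(view)
    }

    fn main() {
        let m = Arc::new(RwLock::new(Membership { nodes: vec![1, 2, 3] }));
        assert_eq!(futures::executor::block_on(leader(&m, 4)), Ok(2));
    }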
diff --git a/crates/libp2p-networking/src/network/node/config.rs b/crates/libp2p-networking/src/network/node/config.rs
index 1f5422e321..3958d17f6d 100644
--- a/crates/libp2p-networking/src/network/node/config.rs
+++ b/crates/libp2p-networking/src/network/node/config.rs
@@ -4,8 +4,9 @@
 // You should have received a copy of the MIT License
 // along with the HotShot repository. If not, see <https://mit-license.org/>.
 
-use std::{collections::HashSet, num::NonZeroUsize, time::Duration};
+use std::{collections::HashSet, num::NonZeroUsize, sync::Arc, time::Duration};
 
+use async_lock::RwLock;
 use hotshot_types::traits::node_implementation::NodeType;
 use libp2p::{identity::Keypair, Multiaddr};
 use libp2p_identity::PeerId;
@@ -16,15 +17,17 @@ use super::MAX_GOSSIP_MSG_SIZE;
 pub const DEFAULT_REPLICATION_FACTOR: Option<NonZeroUsize> = NonZeroUsize::new(10);
 
 /// describe the configuration of the network
-#[derive(Clone, Default, derive_builder::Builder, derive_more::Debug)]
+#[derive(Default, derive_builder::Builder, derive_more::Debug)]
 pub struct NetworkNodeConfig<T: NodeType> {
     /// The keypair for the node
     #[builder(setter(into, strip_option), default)]
     #[debug(skip)]
     pub keypair: Option<Keypair>,
+
     /// The address to bind to
     #[builder(default)]
     pub bind_address: Option<Multiaddr>,
+
     /// Replication factor for entries in the DHT
     #[builder(setter(into, strip_option), default = "DEFAULT_REPLICATION_FACTOR")]
     pub replication_factor: Option<NonZeroUsize>,
@@ -39,9 +42,11 @@ pub struct NetworkNodeConfig<T: NodeType> {
 
     /// list of addresses to connect to at initialization
     pub to_connect_addrs: HashSet<(PeerId, Multiaddr)>,
+
     /// republication interval in DHT, must be much less than `ttl`
     #[builder(default)]
     pub republication_interval: Option<Duration>,
+
     /// expiry for records in DHT
     #[builder(default)]
     pub ttl: Option<Duration>,
@@ -49,7 +54,7 @@ pub struct NetworkNodeConfig<T: NodeType> {
     /// The stake table. Used for authenticating other nodes. If not supplied
     /// we will not check other nodes against the stake table
     #[builder(default)]
-    pub stake_table: Option<T::Membership>,
+    pub stake_table: Option<Arc<RwLock<T::Membership>>>,
 
     /// The path to the file to save the DHT to
     #[builder(default)]
@@ -65,6 +70,25 @@ pub struct NetworkNodeConfig<T: NodeType> {
     pub dht_timeout: Option<Duration>,
 }
 
+impl<T: NodeType> Clone for NetworkNodeConfig<T> {
+    fn clone(&self) -> Self {
+        Self {
+            keypair: self.keypair.clone(),
+            bind_address: self.bind_address.clone(),
+            replication_factor: self.replication_factor,
+            gossip_config: self.gossip_config.clone(),
+            request_response_config: self.request_response_config.clone(),
+            to_connect_addrs: self.to_connect_addrs.clone(),
+            republication_interval: self.republication_interval,
+            ttl: self.ttl,
+            stake_table: self.stake_table.as_ref().map(Arc::clone),
+            dht_file_path: self.dht_file_path.clone(),
+            auth_message: self.auth_message.clone(),
+            dht_timeout: self.dht_timeout,
+        }
+    }
+}
+
 /// Configuration for Libp2p's Gossipsub
 #[derive(Clone, Debug)]
 #[allow(missing_docs)]
diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs
index aac8a66b60..458b7f032b 100644
--- a/crates/libp2p-networking/src/network/transport.rs
+++ b/crates/libp2p-networking/src/network/transport.rs
@@ -7,6 +7,7 @@ use std::{
 };
 
 use anyhow::{ensure, Context, Result as AnyhowResult};
+use async_lock::RwLock;
 use futures::{future::poll_fn, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use hotshot_types::traits::{
     election::Membership,
@@ -46,7 +47,7 @@ pub struct StakeTableAuthentication<T: Transport, Types: NodeType, C: StreamMuxer + Unpin> {
     /// The stake table to check against
-    pub stake_table: Arc<Option<Types::Membership>>,
+    pub stake_table: Arc<Option<Arc<RwLock<Types::Membership>>>>,
 
     /// A pre-signed message that we send to the remote peer for authentication
     pub auth_message: Arc<Option<Vec<u8>>>,
@@ -64,7 +65,7 @@ impl<T: Transport, Types: NodeType, C: StreamMuxer + Unpin> StakeTableAuthentication<T, Types, C> {
     /// and authenticates connections against the stake table.
     pub fn new(
         inner: T,
-        stake_table: Option<Types::Membership>,
+        stake_table: Option<Arc<RwLock<Types::Membership>>>,
         auth_message: Option<Vec<u8>>,
     ) -> Self {
         Self {
@@ -109,7 +110,7 @@ impl<T: Transport, Types: NodeType, C: StreamMuxer + Unpin> StakeTableAuthentication<T, Types, C> {
     /// - The signature is invalid
     pub async fn verify_peer_authentication<R: AsyncRead + Unpin>(
         stream: &mut R,
-        stake_table: Arc<Option<Types::Membership>>,
+        stake_table: Arc<Option<Arc<RwLock<Types::Membership>>>>,
         required_peer_id: &PeerId,
     ) -> AnyhowResult<()> {
         // If we have a stake table, check if the remote peer is in it
@@ -136,7 +137,11 @@ impl<T: Transport, Types: NodeType, C: StreamMuxer + Unpin> StakeTableAuthentication<T, Types, C> {
             }
 
             // Check if the public key is in the stake table
-            if !stake_table.has_stake(&public_key, Types::Epoch::new(0)) {
+            if !stake_table
+                .read()
+                .await
+                .has_stake(&public_key, Types::Epoch::new(0))
+            {
                 return Err(anyhow::anyhow!("Peer not in stake table"));
             }
         }
@@ -151,7 +156,7 @@ impl<T: Transport, Types: NodeType, C: StreamMuxer + Unpin> StakeTableAuthentication<T, Types, C> {
     fn gen_handshake<F: Future<Output = Result<T::Output, T::Error>> + Send + 'static>(
         original_future: F,
         outgoing: bool,
-        stake_table: Arc<Option<Types::Membership>>,
+        stake_table: Arc<Option<Arc<RwLock<Types::Membership>>>>,
         auth_message: Arc<Option<Vec<u8>>>,
     ) -> UpgradeFuture<T>
     where
@@ -624,7 +629,7 @@ mod test {
         // Verify the authentication message
         let result = MockStakeTableAuth::verify_peer_authentication(
             &mut stream,
-            Arc::new(Some(stake_table)),
+            Arc::new(Some(Arc::new(RwLock::new(stake_table)))),
             &peer_id,
         )
         .await;
@@ -644,7 +649,10 @@ mod test {
         let mut stream = cursor_from!(auth_message);
 
         // Create an empty stake table
-        let stake_table = <TestTypes as NodeType>::Membership::new(vec![], vec![]);
+        let stake_table = Arc::new(RwLock::new(<TestTypes as NodeType>::Membership::new(
+            vec![],
+            vec![],
+        )));
 
         // Verify the authentication message
         let result = MockStakeTableAuth::verify_peer_authentication(
@@ -680,8 +688,10 @@ mod test {
             stake_table_entry: keypair.0.stake_table_entry(1),
             state_ver_key: StateVerKey::default(),
         };
-        let stake_table =
-            <TestTypes as NodeType>::Membership::new(vec![peer_config.clone()], vec![peer_config]);
+        let stake_table = Arc::new(RwLock::new(<TestTypes as NodeType>::Membership::new(
+            vec![peer_config.clone()],
+            vec![peer_config],
+        )));
 
         // Check against the malicious peer ID
         let result = MockStakeTableAuth::verify_peer_authentication(
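The transport keeps the same outer `Arc<Option<…>>` shape (authentication stays optional) while the table inside becomes the shared handle. Roughly what the `has_stake` check amounts to, with a toy stake table standing in for `Types::Membership` (assumes the `anyhow`, `async-lock`, and `futures` crates):

    use std::sync::Arc;
    use anyhow::bail;
    use async_lock::RwLock;

    struct StakeTable {
        members: Vec<u64>,
    }

    impl StakeTable {
        fn has_stake(&self, key: &u64) -> bool {
            self.members.contains(key)
        }
    }

    async fn verify_peer(
        stake_table: &Arc<Option<Arc<RwLock<StakeTable>>>>,
        public_key: u64,
    ) -> anyhow::Result<()> {
        // No table configured means authentication is disabled.
        if let Some(table) = stake_table.as_ref() {
            if !table.read().await.has_stake(&public_key) {
                bail!("Peer not in stake table");
            }
        }
        Ok(())
    }

    fn main() -> anyhow::Result<()> {
        let table = Arc::new(Some(Arc::new(RwLock::new(StakeTable { members: vec![42] }))));
        futures::executor::block_on(verify_peer(&table, 42))
    }

A consequence worth noting: because the transport holds the same lock as consensus, a stake-table update made by consensus is immediately visible to connection authentication.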
diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs
index 3a9f44c34f..437e066ea0 100644
--- a/crates/task-impls/src/consensus/handlers.rs
+++ b/crates/task-impls/src/consensus/handlers.rs
@@ -47,6 +47,8 @@ pub(crate) async fn handle_quorum_vote_recv<
         .is_high_qc_for_last_block();
     let we_are_leader = task_state
         .membership
+        .read()
+        .await
         .leader(vote.view_number() + 1, vote.data.epoch)?
         == task_state.public_key;
     ensure!(
@@ -77,10 +79,12 @@ pub(crate) async fn handle_quorum_vote_recv<
     .await?;
 
     // If the vote sender belongs to the next epoch, collect it separately to form the second QC
-    if task_state
+    let has_stake = task_state
         .membership
-        .has_stake(&vote.signing_key(), vote.epoch() + 1)
-    {
+        .read()
+        .await
+        .has_stake(&vote.signing_key(), vote.epoch() + 1);
+    if has_stake {
         handle_vote(
             &mut task_state.next_epoch_vote_collectors,
             &vote.clone().into(),
@@ -114,6 +118,8 @@ pub(crate) async fn handle_timeout_vote_recv<
     ensure!(
         task_state
            .membership
+            .read()
+            .await
            .leader(vote.view_number() + 1, task_state.cur_epoch)?
             == task_state.public_key,
         info!(
@@ -158,6 +164,8 @@ pub async fn send_high_qc<TYPES: NodeType, V: Versions, I: NodeImplementation<TYPES>>(
     ensure!(
         task_state
             .membership
+            .read()
+            .await
             .has_stake(&task_state.public_key, epoch),
         debug!(
             "We were not chosen for the consensus committee for view {:?}",
@@ -365,25 +377,16 @@ pub(crate) async fn handle_timeout<TYPES: NodeType, V: Versions, I: NodeImplementation<TYPES>>(
     )
     .await;
 
-    task_state
-        .consensus
+    let leader = task_state
+        .membership
         .read()
         .await
-        .metrics
-        .number_of_timeouts
-        .add(1);
-    if task_state
-        .membership
-        .leader(view_number, task_state.cur_epoch)?
-        == task_state.public_key
-    {
-        task_state
-            .consensus
-            .read()
-            .await
-            .metrics
-            .number_of_timeouts_as_leader
-            .add(1);
+        .leader(view_number, task_state.cur_epoch);
+
+    let consensus_reader = task_state.consensus.read().await;
+    consensus_reader.metrics.number_of_timeouts.add(1);
+    if leader? == task_state.public_key {
+        consensus_reader.metrics.number_of_timeouts_as_leader.add(1);
     }
 
     Ok(())
diff --git a/crates/task-impls/src/consensus/mod.rs b/crates/task-impls/src/consensus/mod.rs
index e6897a2fe5..fe25a9ec2e 100644
--- a/crates/task-impls/src/consensus/mod.rs
+++ b/crates/task-impls/src/consensus/mod.rs
@@ -7,6 +7,7 @@
 use std::sync::Arc;
 
 use async_broadcast::{Receiver, Sender};
+use async_lock::RwLock;
 use async_trait::async_trait;
 use either::Either;
 use hotshot_task::task::TaskState;
@@ -50,7 +51,7 @@ pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     pub network: Arc<I::Network>,
 
     /// Membership for Quorum Certs/votes
-    pub membership: Arc<TYPES::Membership>,
+    pub membership: Arc<RwLock<TYPES::Membership>>,
 
     /// A map of `QuorumVote` collector tasks.
     pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V>,
@@ -97,6 +98,7 @@ pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     /// Number of blocks in an epoch, zero means there are no epochs
     pub epoch_height: u64,
 }
+
 impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskState<TYPES, I, V> {
     /// Handles a consensus event received on the event stream
     #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = *self.cur_epoch), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs
index b292814e24..c4c7f0ab46 100644
--- a/crates/task-impls/src/da.rs
+++ b/crates/task-impls/src/da.rs
@@ -56,7 +56,7 @@ pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     /// Membership for the DA committee and quorum committee.
     /// We need the latter only for calculating the proper VID scheme
     /// from the number of nodes in the quorum.
-    pub membership: Arc<TYPES::Membership>,
+    pub membership: Arc<RwLock<TYPES::Membership>>,
 
     /// The underlying network
     pub network: Arc<I::Network>,
@@ -115,7 +115,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYPES, I, V> {
diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs
--- a/crates/task-impls/src/helpers.rs
+++ b/crates/task-impls/src/helpers.rs
     view_number: TYPES::View,
     event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
     event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
-    quorum_membership: Arc<TYPES::Membership>,
+    membership: Arc<RwLock<TYPES::Membership>>,
     consensus: OuterConsensus<TYPES>,
     sender_public_key: TYPES::SignatureKey,
     sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
@@ -73,7 +73,7 @@ pub(crate) async fn fetch_proposal(
     )
     .await;
 
-    let mem = Arc::clone(&quorum_membership);
+    let mem = Arc::clone(&membership);
     // Make a background task to await the arrival of the event data.
     let Ok(Some(proposal)) =
         // We want to explicitly timeout here so we aren't waiting around for the data.
@@ -105,7 +105,8 @@ pub(crate) async fn fetch_proposal(
             hs_event.as_ref()
         {
             // Make sure that the quorum_proposal is valid
-            if quorum_proposal.validate_signature(&mem, epoch_height).is_ok() {
+            let mem_reader = mem.read().await;
+            if quorum_proposal.validate_signature(&mem_reader, epoch_height).is_ok() {
                 proposal = Some(quorum_proposal.clone());
             }
 
@@ -126,10 +127,16 @@ pub(crate) async fn fetch_proposal(
     let justify_qc = proposal.data.justify_qc.clone();
 
     let justify_qc_epoch = justify_qc.data.epoch();
+
+    let membership_reader = membership.read().await;
+    let membership_stake_table = membership_reader.stake_table(justify_qc_epoch);
+    let membership_success_threshold = membership_reader.success_threshold(justify_qc_epoch);
+    drop(membership_reader);
+
     if !justify_qc
         .is_valid_cert(
-            quorum_membership.stake_table(justify_qc_epoch),
-            quorum_membership.success_threshold(justify_qc_epoch),
+            membership_stake_table,
+            membership_success_threshold,
             upgrade_lock,
         )
         .await
@@ -430,7 +437,7 @@ pub async fn decide_from_proposal(
 pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
     event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
     event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
-    quorum_membership: Arc<TYPES::Membership>,
+    membership: Arc<RwLock<TYPES::Membership>>,
     public_key: TYPES::SignatureKey,
     private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
     consensus: OuterConsensus<TYPES>,
@@ -449,7 +456,7 @@ pub(crate) async fn parent_leaf_and_state(
         parent_view_number,
         event_sender.clone(),
         event_receiver.clone(),
-        quorum_membership,
+        membership,
         consensus.clone(),
         public_key.clone(),
         private_key.clone(),
@@ -535,7 +542,7 @@ pub async fn validate_proposal_safety_and_liveness<
 
     UpgradeCertificate::validate(
         &proposal.data.upgrade_certificate,
-        &validation_info.quorum_membership,
+        &validation_info.membership,
         TYPES::Epoch::new(proposal_epoch),
         &validation_info.upgrade_lock,
     )
@@ -660,10 +667,9 @@ pub(crate) async fn validate_proposal_view_and_certs<
     );
 
     // Validate the proposal's signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment
-    proposal.validate_signature(
-        &validation_info.quorum_membership,
-        validation_info.epoch_height,
-    )?;
+    let membership_reader = validation_info.membership.read().await;
+    proposal.validate_signature(&membership_reader, validation_info.epoch_height)?;
+    drop(membership_reader);
 
     // Verify a timeout certificate OR a view sync certificate exists and is valid.
     if proposal.data.justify_qc.view_number() != view_number - 1 {
@@ -681,15 +687,18 @@ pub(crate) async fn validate_proposal_view_and_certs<
                     *view_number
                 );
                 let timeout_cert_epoch = timeout_cert.data().epoch();
+
+                let membership_reader = validation_info.membership.read().await;
+                let membership_stake_table = membership_reader.stake_table(timeout_cert_epoch);
+                let membership_success_threshold =
+                    membership_reader.success_threshold(timeout_cert_epoch);
+                drop(membership_reader);
+
                 ensure!(
                     timeout_cert
                         .is_valid_cert(
-                            validation_info
-                                .quorum_membership
-                                .stake_table(timeout_cert_epoch),
-                            validation_info
-                                .quorum_membership
-                                .success_threshold(timeout_cert_epoch),
+                            membership_stake_table,
+                            membership_success_threshold,
                             &validation_info.upgrade_lock
                         )
                         .await,
@@ -706,16 +715,19 @@ pub(crate) async fn validate_proposal_view_and_certs<
                 );
 
                 let view_sync_cert_epoch = view_sync_cert.data().epoch();
+
+                let membership_reader = validation_info.membership.read().await;
+                let membership_stake_table = membership_reader.stake_table(view_sync_cert_epoch);
+                let membership_success_threshold =
+                    membership_reader.success_threshold(view_sync_cert_epoch);
+                drop(membership_reader);
+
                 // View sync certs must also be valid.
                 ensure!(
                     view_sync_cert
                         .is_valid_cert(
-                            validation_info
-                                .quorum_membership
-                                .stake_table(view_sync_cert_epoch),
-                            validation_info
-                                .quorum_membership
-                                .success_threshold(view_sync_cert_epoch),
+                            membership_stake_table,
+                            membership_success_threshold,
                             &validation_info.upgrade_lock
                         )
                         .await,
@@ -734,7 +746,7 @@ pub(crate) async fn validate_proposal_view_and_certs<
     ));
     UpgradeCertificate::validate(
         &proposal.data.upgrade_certificate,
-        &validation_info.quorum_membership,
+        &validation_info.membership,
         epoch,
         &validation_info.upgrade_lock,
     )
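Several call sites above follow the same discipline: copy `stake_table` and `success_threshold` out of the read guard and drop the guard before awaiting `is_valid_cert`, so the membership lock is never held across an await point. In miniature (toy types; assumes the `async-lock` and `futures` crates):

    use std::sync::Arc;
    use async_lock::RwLock;

    struct Membership {
        stake: Vec<u64>,
        threshold: u64,
    }

    impl Membership {
        fn stake_table(&self) -> Vec<u64> {
            self.stake.clone()
        }
        fn success_threshold(&self) -> u64 {
            self.threshold
        }
    }

    // Stand-in for the real async certificate check.
    async fn is_valid_cert(stake_table: Vec<u64>, threshold: u64) -> bool {
        stake_table.len() as u64 >= threshold
    }

    async fn validate(membership: Arc<RwLock<Membership>>) -> bool {
        // Copy what we need out of the lock...
        let reader = membership.read().await;
        let stake_table = reader.stake_table();
        let threshold = reader.success_threshold();
        drop(reader);

        // ...so no guard is held across this await point.
        is_valid_cert(stake_table, threshold).await
    }

    fn main() {
        let membership = Arc::new(RwLock::new(Membership { stake: vec![1, 2, 3], threshold: 2 }));
        assert!(futures::executor::block_on(validate(membership)));
    }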
diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs
index 139269ae4b..7d6664f39c 100644
--- a/crates/task-impls/src/network.rs
+++ b/crates/task-impls/src/network.rs
@@ -10,18 +10,13 @@ use std::{
     sync::Arc,
 };
 
-use crate::{
-    events::{HotShotEvent, HotShotTaskCompleted},
-    helpers::broadcast_event,
-};
 use async_broadcast::{Receiver, Sender};
 use async_lock::RwLock;
 use async_trait::async_trait;
 use hotshot_task::task::TaskState;
-use hotshot_types::data::{VidDisperseShare, VidDisperseShare2};
 use hotshot_types::{
     consensus::OuterConsensus,
-    data::VidDisperse,
+    data::{VidDisperse, VidDisperseShare, VidDisperseShare2},
     event::{Event, EventType, HotShotAction},
     message::{
         convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
@@ -44,6 +39,11 @@ use tracing::instrument;
 use utils::anytrace::*;
 use vbs::version::StaticVersionType;
 
+use crate::{
+    events::{HotShotEvent, HotShotTaskCompleted},
+    helpers::broadcast_event,
+};
+
 /// the network message task state
 #[derive(Clone)]
 pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
@@ -255,18 +255,25 @@ pub struct NetworkEventTaskState<
 > {
     /// comm network
     pub network: Arc<NET>,
+
     /// view number
     pub view: TYPES::View,
+
     /// epoch number
     pub epoch: TYPES::Epoch,
+
     /// network memberships
-    pub membership: TYPES::Membership,
+    pub membership: Arc<RwLock<TYPES::Membership>>,
+
     /// Storage to store actionable events
     pub storage: Arc<RwLock<S>>,
+
     /// Shared consensus state
     pub consensus: OuterConsensus<TYPES>,
+
     /// Lock for a decided upgrade
     pub upgrade_lock: UpgradeLock<TYPES, V>,
+
     /// map view number to transmit tasks
     pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
 }
@@ -311,7 +318,8 @@ impl<
         if let Some((sender, message_kind, transmit)) =
             self.parse_event(event, &mut maybe_action).await
         {
-            self.spawn_transmit_task(message_kind, maybe_action, transmit, sender);
+            self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
+                .await;
         };
     }
@@ -468,7 +476,12 @@ impl<
             HotShotEvent::QuorumVoteSend(vote) => {
                 *maybe_action = Some(HotShotAction::Vote);
                 let view_number = vote.view_number() + 1;
-                let leader = match self.membership.leader(view_number, vote.epoch()) {
+                let leader = match self
+                    .membership
+                    .read()
+                    .await
+                    .leader(view_number, vote.epoch())
+                {
                     Ok(l) => l,
                     Err(e) => {
                         tracing::warn!(
@@ -573,7 +586,7 @@ impl<
                 *maybe_action = Some(HotShotAction::DaVote);
                 let view_number = vote.view_number();
                 let epoch = vote.data.epoch;
-                let leader = match self.membership.leader(view_number, epoch) {
+                let leader = match self.membership.read().await.leader(view_number, epoch) {
                     Ok(l) => l,
                     Err(e) => {
                         tracing::warn!(
@@ -620,7 +633,7 @@ impl<
             }
             HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
                 let view_number = vote.view_number() + vote.date().relay;
-                let leader = match self.membership.leader(view_number, self.epoch) {
+                let leader = match self.membership.read().await.leader(view_number, self.epoch) {
                     Ok(l) => l,
                     Err(e) => {
                         tracing::warn!(
@@ -651,7 +664,7 @@ impl<
             HotShotEvent::ViewSyncCommitVoteSend(vote) => {
                 *maybe_action = Some(HotShotAction::ViewSyncVote);
                 let view_number = vote.view_number() + vote.date().relay;
-                let leader = match self.membership.leader(view_number, self.epoch) {
+                let leader = match self.membership.read().await.leader(view_number, self.epoch) {
                     Ok(l) => l,
                     Err(e) => {
                         tracing::warn!(
@@ -682,7 +695,7 @@ impl<
             HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
                 *maybe_action = Some(HotShotAction::ViewSyncVote);
                 let view_number = vote.view_number() + vote.date().relay;
-                let leader = match self.membership.leader(view_number, self.epoch) {
+                let leader = match self.membership.read().await.leader(view_number, self.epoch) {
                     Ok(l) => l,
                     Err(e) => {
                         tracing::warn!(
@@ -761,7 +774,7 @@ impl<
             HotShotEvent::TimeoutVoteSend(vote) => {
                 *maybe_action = Some(HotShotAction::Vote);
                 let view_number = vote.view_number() + 1;
-                let leader = match self.membership.leader(view_number, self.epoch) {
+                let leader = match self.membership.read().await.leader(view_number, self.epoch) {
                     Ok(l) => l,
                     Err(e) => {
                         tracing::warn!(
@@ -799,7 +812,7 @@ impl<
             HotShotEvent::UpgradeVoteSend(vote) => {
                 tracing::error!("Sending upgrade vote!");
                 let view_number = vote.view_number();
-                let leader = match self.membership.leader(view_number, self.epoch) {
+                let leader = match self.membership.read().await.leader(view_number, self.epoch) {
                     Ok(l) => l,
                     Err(e) => {
                         tracing::warn!(
@@ -827,9 +840,9 @@ impl<
                 self.cancel_tasks(keep_view);
                 let net = Arc::clone(&self.network);
                 let epoch = self.epoch.u64();
-                let mem = self.membership.clone();
+                let mem = Arc::clone(&self.membership);
                 spawn(async move {
-                    net.update_view::<TYPES>(*keep_view, epoch, &mem).await;
+                    net.update_view::<TYPES>(*keep_view, epoch, mem).await;
                 });
                 None
             }
@@ -869,7 +882,7 @@ impl<
     }
 
     /// Creates a network message and spawns a task that transmits it on the wire.
-    fn spawn_transmit_task(
+    async fn spawn_transmit_task(
         &mut self,
         message_kind: MessageKind<TYPES>,
         maybe_action: Option<HotShotAction>,
@@ -891,6 +904,8 @@ impl<
         let committee_topic = Topic::Global;
         let da_committee = self
             .membership
+            .read()
+            .await
             .da_committee_members(view_number, self.epoch);
         let network = Arc::clone(&self.network);
         let storage = Arc::clone(&self.storage);
@@ -1012,13 +1027,17 @@ pub mod test {
                 self.parse_event(event, &mut maybe_action).await
             {
                 // Modify the values acquired by parsing the event.
+                let membership_reader = self.membership.read().await;
                 (self.modifier)(
                     &mut sender,
                     &mut message_kind,
                     &mut transmit,
-                    &self.membership,
+                    &membership_reader,
                );
-                self.spawn_transmit_task(message_kind, maybe_action, transmit, sender);
+                drop(membership_reader);
+
+                self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
+                    .await;
             }
         }
     }
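The `ViewChange` arm above shows why the handle has to be an `Arc`: the prefetch runs in a detached task that may outlive the event handler. A minimal sketch of that spawn-with-clone shape (toy membership; assumes the `async-lock` and `tokio` crates):

    use std::sync::Arc;
    use async_lock::RwLock;

    struct Membership {
        nodes: Vec<u64>,
    }

    impl Membership {
        fn leader(&self, view: u64) -> u64 {
            self.nodes[view as usize % self.nodes.len()]
        }
    }

    // The spawned lookup owns its own Arc clone, so it can outlive the
    // event handler without borrowing `self`.
    fn prefetch_leader(
        membership: &Arc<RwLock<Membership>>,
        view: u64,
    ) -> tokio::task::JoinHandle<u64> {
        let mem = Arc::clone(membership);
        tokio::spawn(async move { mem.read().await.leader(view) })
    }

    #[tokio::main]
    async fn main() {
        let m = Arc::new(RwLock::new(Membership { nodes: vec![5, 6] }));
        assert_eq!(prefetch_leader(&m, 3).await.unwrap(), 6);
    }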
diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs
index 7dbedb9107..b9d696f01b 100644
--- a/crates/task-impls/src/quorum_proposal/handlers.rs
+++ b/crates/task-impls/src/quorum_proposal/handlers.rs
@@ -87,7 +87,7 @@ pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
     pub instance_state: Arc<TYPES::InstanceState>,
 
     /// Membership for Quorum Certs/votes
-    pub quorum_membership: Arc<TYPES::Membership>,
+    pub membership: Arc<RwLock<TYPES::Membership>>,
 
     /// Our public key
     pub public_key: TYPES::SignatureKey,
@@ -100,6 +100,7 @@ pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
 
     /// View timeout from config.
     pub timeout: u64,
+
     /// The most recent upgrade certificate this node formed.
     /// Note: this is ONLY for certificates that have been formed internally,
     /// so that we can propose with them.
@@ -132,10 +133,16 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
     ) -> Option<QuorumCertificate2<TYPES>> {
         while let Ok(event) = rx.recv_direct().await {
             if let HotShotEvent::HighQcRecv(qc, _sender) = event.as_ref() {
+                let membership_reader = self.membership.read().await;
+                let membership_stake_table = membership_reader.stake_table(qc.data.epoch);
+                let membership_success_threshold =
+                    membership_reader.success_threshold(qc.data.epoch);
+                drop(membership_reader);
+
                 if qc
                     .is_valid_cert(
-                        self.quorum_membership.stake_table(qc.data.epoch),
-                        self.quorum_membership.success_threshold(qc.data.epoch),
+                        membership_stake_table,
+                        membership_success_threshold,
                         &self.upgrade_lock,
                     )
                     .await
@@ -267,7 +274,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
         let (parent_leaf, state) = parent_leaf_and_state(
             &self.sender,
             &self.receiver,
-            Arc::clone(&self.quorum_membership),
+            Arc::clone(&self.membership),
             self.public_key.clone(),
             self.private_key.clone(),
             OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
@@ -370,7 +377,13 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
         ));
         // Make sure we are the leader for the view and epoch.
         // We might have ended up here because we were in the epoch transition.
-        if self.quorum_membership.leader(self.view_number, epoch)? != self.public_key {
+        if self
+            .membership
+            .read()
+            .await
+            .leader(self.view_number, epoch)?
+            != self.public_key
+        {
            tracing::debug!(
                 "We are not the leader in the epoch for which we are about to propose. Do not send the quorum proposal."
             );
diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs
index 77bfef856c..9eb75188a3 100644
--- a/crates/task-impls/src/quorum_proposal/mod.rs
+++ b/crates/task-impls/src/quorum_proposal/mod.rs
@@ -52,7 +52,7 @@ pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     pub instance_state: Arc<TYPES::InstanceState>,
 
     /// Membership for Quorum Certs/votes
-    pub quorum_membership: Arc<TYPES::Membership>,
+    pub membership: Arc<RwLock<TYPES::Membership>>,
 
     /// Our public key
     pub public_key: TYPES::SignatureKey,
@@ -271,7 +271,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
     /// without losing the data that it received, as the dependency task would otherwise have no
     /// ability to receive the event and, thus, would never propose.
     #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
-    fn create_dependency_task_if_new(
+    async fn create_dependency_task_if_new(
         &mut self,
         view_number: TYPES::View,
         epoch_number: TYPES::Epoch,
@@ -280,17 +280,18 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
         event: Arc<HotShotEvent<TYPES>>,
         epoch_transition_indicator: EpochTransitionIndicator,
     ) -> Result<()> {
+        let membership_reader = self.membership.read().await;
         let leader_in_current_epoch =
-            self.quorum_membership.leader(view_number, epoch_number)? == self.public_key;
+            membership_reader.leader(view_number, epoch_number)? == self.public_key;
 
         // If we are in the epoch transition and we are the leader in the next epoch,
         // we might want to start collecting dependencies for our next epoch proposal.
         let leader_in_next_epoch = matches!(
             epoch_transition_indicator,
             EpochTransitionIndicator::InTransition
-        ) && self
-            .quorum_membership
-            .leader(view_number, epoch_number + 1)?
+        ) && membership_reader.leader(view_number, epoch_number + 1)?
             == self.public_key;
+        drop(membership_reader);
+
         // Don't even bother making the task if we are not entitled to propose anyway.
         ensure!(
             leader_in_current_epoch || leader_in_next_epoch,
@@ -322,7 +323,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
             view_number,
             sender: event_sender,
             receiver: event_receiver,
-            quorum_membership: Arc::clone(&self.quorum_membership),
+            membership: Arc::clone(&self.membership),
             public_key: self.public_key.clone(),
             private_key: self.private_key.clone(),
             instance_state: Arc::clone(&self.instance_state),
@@ -405,7 +406,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
                     event_sender,
                     Arc::clone(&event),
                     epoch_transition_indicator,
-                )?;
+                )
+                .await?;
             }
             either::Left(qc) => {
                 // Only update if the qc is from a newer view
@@ -439,7 +441,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
                     event_sender,
                     Arc::clone(&event),
                     epoch_transition_indicator,
-                )?;
+                )
+                .await?;
             }
         },
         HotShotEvent::SendPayloadCommitmentAndMetadata(
@@ -459,16 +462,23 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
                     event_sender,
                     Arc::clone(&event),
                     EpochTransitionIndicator::NotInTransition,
-                )?;
+                )
+                .await?;
             }
             HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
                 let epoch_number = certificate.data.epoch;
 
+                let membership_reader = self.membership.read().await;
+                let membership_stake_table = membership_reader.stake_table(epoch_number);
+                let membership_success_threshold =
+                    membership_reader.success_threshold(epoch_number);
+                drop(membership_reader);
+
                 ensure!(
                     certificate
                         .is_valid_cert(
-                            self.quorum_membership.stake_table(epoch_number),
-                            self.quorum_membership.success_threshold(epoch_number),
+                            membership_stake_table,
+                            membership_success_threshold,
                             &self.upgrade_lock
                         )
                         .await,
@@ -487,7 +497,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
                     event_sender,
                     event,
                     EpochTransitionIndicator::NotInTransition,
-                )?;
+                )
+                .await?;
             }
             HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
                 let view_number = proposal.data.view_number();
@@ -503,7 +514,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
                     event_sender,
                     Arc::clone(&event),
                     epoch_transition_indicator,
-                )?;
+                )
+                .await?;
             }
             HotShotEvent::QuorumProposalSend(proposal, _) => {
                 let view = proposal.data.view_number();
@@ -522,7 +534,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
                     event_sender,
                     Arc::clone(&event),
                     EpochTransitionIndicator::NotInTransition,
-                )?;
+                )
+                .await?;
             }
             HotShotEvent::ViewChange(view, epoch) => {
                 if epoch > &self.cur_epoch {
@@ -538,10 +551,17 @@
             HotShotEvent::HighQcSend(qc, ..) => {
                 ensure!(qc.view_number() > self.highest_qc.view_number());
                 let cert_epoch_number = qc.data.epoch;
+
+                let membership_reader = self.membership.read().await;
+                let membership_stake_table = membership_reader.stake_table(cert_epoch_number);
+                let membership_success_threshold =
+                    membership_reader.success_threshold(cert_epoch_number);
+                drop(membership_reader);
+
                 ensure!(
                     qc.is_valid_cert(
-                        self.quorum_membership.stake_table(cert_epoch_number),
-                        self.quorum_membership.success_threshold(cert_epoch_number),
+                        membership_stake_table,
+                        membership_success_threshold,
                         &self.upgrade_lock
                     )
                     .await,
diff --git a/crates/task-impls/src/quorum_proposal_recv/handlers.rs b/crates/task-impls/src/quorum_proposal_recv/handlers.rs
index d6a768c2cd..b08064205c 100644
--- a/crates/task-impls/src/quorum_proposal_recv/handlers.rs
+++ b/crates/task-impls/src/quorum_proposal_recv/handlers.rs
@@ -9,7 +9,7 @@
 use std::sync::Arc;
 
 use async_broadcast::{broadcast, Receiver, Sender};
-use async_lock::RwLockUpgradableReadGuard;
+use async_lock::{RwLock, RwLockUpgradableReadGuard};
 use committable::Committable;
 use hotshot_types::{
     consensus::OuterConsensus,
@@ -100,7 +100,7 @@ fn spawn_fetch_proposal<TYPES: NodeType, V: Versions>(
     view: TYPES::View,
     event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
     event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
-    membership: Arc<TYPES::Membership>,
+    membership: Arc<RwLock<TYPES::Membership>>,
     consensus: OuterConsensus<TYPES>,
     sender_public_key: TYPES::SignatureKey,
     sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
@@ -162,14 +162,15 @@ pub(crate) async fn handle_quorum_proposal_recv<
         validation_info.epoch_height,
     ));
 
+    let membership_reader = validation_info.membership.read().await;
+    let membership_stake_table = membership_reader.stake_table(justify_qc.data.epoch);
+    let membership_success_threshold = membership_reader.success_threshold(justify_qc.data.epoch);
+    drop(membership_reader);
+
     if !justify_qc
         .is_valid_cert(
-            validation_info
-                .quorum_membership
-                .stake_table(justify_qc.data.epoch),
-            validation_info
-                .quorum_membership
-                .success_threshold(justify_qc.data.epoch),
+            membership_stake_table,
+            membership_success_threshold,
             &validation_info.upgrade_lock,
         )
         .await
@@ -187,15 +188,18 @@ pub(crate) async fn handle_quorum_proposal_recv<
         {
             bail!("Next epoch justify qc exists but it's not equal with justify qc.");
         }
+
+        let membership_reader = validation_info.membership.read().await;
+        let membership_next_stake_table = membership_reader.stake_table(justify_qc.data.epoch + 1);
+        let membership_next_success_threshold =
+            membership_reader.success_threshold(justify_qc.data.epoch + 1);
+        drop(membership_reader);
+
         // Validate the next epoch justify qc as well
         if !next_epoch_justify_qc
             .is_valid_cert(
-                validation_info
-                    .quorum_membership
-                    .stake_table(justify_qc.data.epoch + 1),
-                validation_info
-                    .quorum_membership
-                    .success_threshold(justify_qc.data.epoch + 1),
+                membership_next_stake_table,
+                membership_next_success_threshold,
                 &validation_info.upgrade_lock,
             )
             .await
@@ -229,7 +233,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
             justify_qc.view_number(),
             event_sender.clone(),
             event_receiver.clone(),
-            Arc::clone(&validation_info.quorum_membership),
+            Arc::clone(&validation_info.membership),
             OuterConsensus::new(Arc::clone(&validation_info.consensus.inner_consensus)),
             // Note that we explicitly use the node key here instead of the provided key in the signature.
             // This is because the key that we receive is for the prior leader, so the payload would be routed
diff --git a/crates/task-impls/src/quorum_proposal_recv/mod.rs b/crates/task-impls/src/quorum_proposal_recv/mod.rs
index a30e965d69..f6ed129a6e 100644
--- a/crates/task-impls/src/quorum_proposal_recv/mod.rs
+++ b/crates/task-impls/src/quorum_proposal_recv/mod.rs
@@ -58,7 +58,7 @@ pub struct QuorumProposalRecvTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     /// Membership for Quorum Certs/votes
-    pub quorum_membership: Arc<TYPES::Membership>,
+    pub membership: Arc<RwLock<TYPES::Membership>>,
 
     /// View timeout from config.
     pub timeout: u64,
@@ -88,20 +88,28 @@ pub struct ValidationInfo<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     /// The node's id
     pub id: u64,
+
     /// Our public key
     pub(crate) public_key: TYPES::SignatureKey,
+
     /// Our Private Key
     pub(crate) private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
+
     /// Reference to consensus. The replica will require a write lock on this.
     pub(crate) consensus: OuterConsensus<TYPES>,
+
     /// Membership for Quorum Certs/votes
-    pub quorum_membership: Arc<TYPES::Membership>,
+    pub membership: Arc<RwLock<TYPES::Membership>>,
+
     /// Output events to application
     pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
+
     /// This node's storage ref
     pub(crate) storage: Arc<RwLock<I::Storage>>,
+
     /// Lock for a decided upgrade
     pub(crate) upgrade_lock: UpgradeLock<TYPES, V>,
+
     /// Number of blocks in an epoch, zero means there are no epochs
     pub epoch_height: u64,
 }
@@ -142,7 +150,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
                 public_key: self.public_key.clone(),
                 private_key: self.private_key.clone(),
                 consensus: self.consensus.clone(),
-                quorum_membership: Arc::clone(&self.quorum_membership),
+                membership: Arc::clone(&self.membership),
                 output_event_stream: self.output_event_stream.clone(),
                 storage: Arc::clone(&self.storage),
                 upgrade_lock: self.upgrade_lock.clone(),
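The epoch-transition logic in `create_dependency_task_if_new` is also the reason that function had to become `async`: both leader lookups now sit behind the lock. Its shape, reduced to a toy (assumes the `async-lock` and `futures` crates):

    use std::sync::Arc;
    use async_lock::RwLock;

    struct Membership;

    impl Membership {
        fn leader(&self, view: u64, epoch: u64) -> u64 {
            (view + epoch) % 4
        }
    }

    // One read guard covers both epoch lookups and is dropped before
    // anything else awaits.
    async fn entitled_to_propose(
        membership: &Arc<RwLock<Membership>>,
        me: u64,
        view: u64,
        epoch: u64,
        in_transition: bool,
    ) -> bool {
        let reader = membership.read().await;
        let leader_now = reader.leader(view, epoch) == me;
        let leader_next = in_transition && reader.leader(view, epoch + 1) == me;
        drop(reader);
        leader_now || leader_next
    }

    fn main() {
        let m = Arc::new(RwLock::new(Membership));
        // view 1, epoch 1 -> leader 2; epoch 2 -> leader 3, so node 3 may
        // propose only because it leads the *next* epoch mid-transition.
        assert!(futures::executor::block_on(entitled_to_propose(&m, 3, 1, 1, true)));
    }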
if task_state .membership + .read() + .await .has_stake(&task_state.public_key, current_epoch_number + 1) { let new_epoch_number = current_epoch_number + 2; @@ -252,7 +256,8 @@ pub(crate) async fn handle_quorum_proposal_validated< proposal, task_state, &leaf_views, - )?; + ) + .await?; } } @@ -270,7 +275,7 @@ pub(crate) async fn update_shared_state< consensus: OuterConsensus, sender: Sender>>, receiver: InactiveReceiver>>, - quorum_membership: Arc, + membership: Arc>, public_key: TYPES::SignatureKey, private_key: ::PrivateKey, upgrade_lock: UpgradeLock, @@ -309,7 +314,7 @@ pub(crate) async fn update_shared_state< justify_qc.view_number(), sender.clone(), receiver.activate_cloned(), - Arc::clone(&quorum_membership), + Arc::clone(&membership), OuterConsensus::new(Arc::clone(&consensus.inner_consensus)), public_key.clone(), private_key.clone(), @@ -396,7 +401,7 @@ pub(crate) async fn update_shared_state< #[allow(clippy::too_many_arguments)] pub(crate) async fn submit_vote, V: Versions>( sender: Sender>>, - quorum_membership: Arc, + membership: Arc>, public_key: TYPES::SignatureKey, private_key: ::PrivateKey, upgrade_lock: UpgradeLock, @@ -411,11 +416,13 @@ pub(crate) async fn submit_vote, V TYPES::EPOCH_HEIGHT, )); - let committee_member_in_current_epoch = quorum_membership.has_stake(&public_key, epoch_number); + let membership_reader = membership.read().await; + let committee_member_in_current_epoch = membership_reader.has_stake(&public_key, epoch_number); // If the proposed leaf is for the last block in the epoch and the node is part of the quorum committee // in the next epoch, the node should vote to achieve the double quorum. let committee_member_in_next_epoch = is_last_block_in_epoch(leaf.height(), TYPES::EPOCH_HEIGHT) - && quorum_membership.has_stake(&public_key, epoch_number + 1); + && membership_reader.has_stake(&public_key, epoch_number + 1); + drop(membership_reader); ensure!( committee_member_in_current_epoch || committee_member_in_next_epoch, diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs index 74f2801c2d..2b30bab7a9 100644 --- a/crates/task-impls/src/quorum_vote/mod.rs +++ b/crates/task-impls/src/quorum_vote/mod.rs @@ -65,28 +65,40 @@ enum VoteDependency { pub struct VoteDependencyHandle, V: Versions> { /// Public key. pub public_key: TYPES::SignatureKey, + /// Private Key. pub private_key: ::PrivateKey, + /// Reference to consensus. The replica will require a write lock on this. pub consensus: OuterConsensus, + /// Immutable instance state pub instance_state: Arc, + /// Membership for Quorum certs/votes. - pub quorum_membership: Arc, + pub membership: Arc>, + /// Reference to the storage. pub storage: Arc>, + /// View number to vote on. pub view_number: TYPES::View, + /// Event sender. pub sender: Sender>>, + /// Event receiver. 
pub receiver: InactiveReceiver>>, + /// Lock for a decided upgrade pub upgrade_lock: UpgradeLock, + /// The consensus metrics pub consensus_metrics: Arc, + /// The node's id pub id: u64, + /// Number of blocks in an epoch, zero means there are no epochs pub epoch_height: u64, } @@ -203,7 +215,7 @@ impl + 'static, V: Versions> Handl OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)), self.sender.clone(), self.receiver.clone(), - Arc::clone(&self.quorum_membership), + Arc::clone(&self.membership), self.public_key.clone(), self.private_key.clone(), self.upgrade_lock.clone(), @@ -239,7 +251,7 @@ impl + 'static, V: Versions> Handl if let Err(e) = submit_vote::( self.sender.clone(), - Arc::clone(&self.quorum_membership), + Arc::clone(&self.membership), self.public_key.clone(), self.private_key.clone(), self.upgrade_lock.clone(), @@ -282,7 +294,7 @@ pub struct QuorumVoteTaskState, V: pub network: Arc, /// Membership for Quorum certs/votes and DA committee certs/votes. - pub membership: Arc, + pub membership: Arc>, /// Table for the in-progress DRB computation tasks. //pub drb_computations: BTreeMap>, @@ -401,7 +413,7 @@ impl, V: Versions> QuorumVoteTaskS private_key: self.private_key.clone(), consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)), instance_state: Arc::clone(&self.instance_state), - quorum_membership: Arc::clone(&self.membership), + membership: Arc::clone(&self.membership), storage: Arc::clone(&self.storage), view_number, sender: event_sender.clone(), @@ -512,11 +524,17 @@ impl, V: Versions> QuorumVoteTaskS let cert_epoch = cert.data.epoch; + let membership_reader = self.membership.read().await; + let membership_da_stake_table = membership_reader.da_stake_table(cert_epoch); + let membership_da_success_threshold = + membership_reader.da_success_threshold(cert_epoch); + drop(membership_reader); + // Validate the DAC. ensure!( cert.is_valid_cert( - self.membership.da_stake_table(cert_epoch), - self.membership.da_success_threshold(cert_epoch), + membership_da_stake_table, + membership_da_success_threshold, &self.upgrade_lock ) .await, @@ -560,18 +578,22 @@ impl, V: Versions> QuorumVoteTaskS let vid_epoch = disperse.data.epoch; let target_epoch = disperse.data.target_epoch; + let membership_reader = self.membership.read().await; // ensure that the VID share was sent by a DA member OR the view leader ensure!( - self.membership + membership_reader .da_committee_members(view, vid_epoch) .contains(sender) - || *sender == self.membership.leader(view, vid_epoch)?, + || *sender == membership_reader.leader(view, vid_epoch)?, "VID share was not sent by a DA member or the view leader." 
); + let membership_total_nodes = membership_reader.total_nodes(target_epoch); + drop(membership_reader); + // NOTE: `verify_share` returns a nested `Result`, so we must check both the inner // and outer results - match vid_scheme(self.membership.total_nodes(target_epoch)).verify_share( + match vid_scheme(membership_total_nodes).verify_share( &disperse.data.share, &disperse.data.common, payload_commitment, diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs index 62ba74edb7..c3bb9a2fb8 100644 --- a/crates/task-impls/src/request.rs +++ b/crates/task-impls/src/request.rs @@ -14,6 +14,7 @@ use std::{ }; use async_broadcast::{Receiver, Sender}; +use async_lock::RwLock; use async_trait::async_trait; use hotshot_task::{ dependency::{Dependency, EventDependency}, @@ -54,23 +55,32 @@ pub struct NetworkRequestState> { /// The underlying network to send requests over pub network: Arc, + /// Consensus shared state so we can check if we've gotten the information /// before sending a request pub consensus: OuterConsensus, + /// Last seen view; we won't request proposals older than this view pub view: TYPES::View, + /// Delay before requesting peers pub delay: Duration, - /// Membership (Here containing only DA) - pub membership: TYPES::Membership, + + /// Membership (Used here only for DA) + pub membership: Arc>, + /// This node's public key pub public_key: TYPES::SignatureKey, + /// This node's private/signing key, used to sign requests. pub private_key: ::PrivateKey, + /// The node's id pub id: u64, + /// A flag indicating that `HotShotEvent::Shutdown` has been received pub shutdown_flag: Arc, + /// The request tasks that have been spawned, indexed by view pub spawned_tasks: BTreeMap>>, } @@ -113,7 +123,8 @@ impl> TaskState for NetworkRequest .vid_shares() .contains_key(&prop_view) { - self.spawn_requests(prop_view, prop_epoch, sender, receiver); + self.spawn_requests(prop_view, prop_epoch, sender, receiver) + .await; } Ok(()) } @@ -145,7 +156,7 @@ impl> TaskState for NetworkRequest impl> NetworkRequestState { /// Creates and signs the payload, then creates a request task - fn spawn_requests( + async fn spawn_requests( &mut self, view: TYPES::View, epoch: TYPES::Epoch, @@ -163,13 +174,14 @@ impl> NetworkRequestState, signature: Signature, @@ -185,17 +197,19 @@ impl> NetworkRequestState = self - .membership + let mut recipients: Vec = membership_reader .da_committee_members(view, epoch) .into_iter() .collect(); + drop(membership_reader); + // Randomize the recipients so all replicas don't overload the same recipient // and so we don't implicitly rely on the same replica all the time.
recipients.shuffle(&mut thread_rng()); diff --git a/crates/task-impls/src/response.rs b/crates/task-impls/src/response.rs index 27983cdffa..86543a5db9 100644 --- a/crates/task-impls/src/response.rs +++ b/crates/task-impls/src/response.rs @@ -7,6 +7,7 @@ use std::{sync::Arc, time::Duration}; use async_broadcast::{Receiver, Sender}; +use async_lock::RwLock; use committable::Committable; use hotshot_types::{ consensus::{Consensus, LockedConsensusState, OuterConsensus}, @@ -31,12 +32,16 @@ const TXNS_TIMEOUT: Duration = Duration::from_millis(100); pub struct NetworkResponseState { /// Locked consensus state consensus: LockedConsensusState, + /// Quorum membership for checking if requesters have state - quorum: Arc, + membership: Arc>, + /// This replica's public key pub_key: TYPES::SignatureKey, + /// This replica's private key private_key: ::PrivateKey, + /// The node's id id: u64, } @@ -45,14 +50,14 @@ impl NetworkResponseState { /// Create the network response state with the info it needs pub fn new( consensus: LockedConsensusState, - quorum: Arc, + membership: Arc>, pub_key: TYPES::SignatureKey, private_key: ::PrivateKey, id: u64, ) -> Self { Self { consensus, - quorum, + membership, pub_key, private_key, id, } } @@ -71,8 +76,9 @@ impl NetworkResponseState { // break loop when false, this means shutdown received match event.as_ref() { HotShotEvent::VidRequestRecv(request, sender) => { + let cur_epoch = self.consensus.read().await.cur_epoch(); // Verify request is valid - if !self.valid_sender(sender, self.consensus.read().await.cur_epoch()) + if !self.valid_sender(sender, cur_epoch).await || !valid_signature::(request, sender) { continue; } @@ -99,17 +105,18 @@ impl NetworkResponseState { return; } - if let Some(quorum_proposal) = self + let quorum_proposal_result = self .consensus .read() .await .last_proposals() .get(&req.view_number) - { + .cloned(); + if let Some(quorum_proposal) = quorum_proposal_result { broadcast_event( HotShotEvent::QuorumProposalResponseSend( req.key.clone(), - quorum_proposal.clone(), + quorum_proposal, ) .into(), &event_sender, @@ -151,7 +158,7 @@ impl NetworkResponseState { if Consensus::calculate_and_update_vid( OuterConsensus::new(Arc::clone(&self.consensus)), view, - Arc::clone(&self.quorum), + Arc::clone(&self.membership), &self.private_key, ) .await @@ -162,7 +169,7 @@ impl NetworkResponseState { Consensus::calculate_and_update_vid( OuterConsensus::new(Arc::clone(&self.consensus)), view, - Arc::clone(&self.quorum), + Arc::clone(&self.membership), &self.private_key, ) .await?; @@ -178,8 +185,8 @@ impl NetworkResponseState { } /// Makes sure the sender is allowed to send a request in the given epoch.
- fn valid_sender(&self, sender: &TYPES::SignatureKey, epoch: TYPES::Epoch) -> bool { - self.quorum.has_stake(sender, epoch) + async fn valid_sender(&self, sender: &TYPES::SignatureKey, epoch: TYPES::Epoch) -> bool { + self.membership.read().await.has_stake(sender, epoch) } } diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs index c635b6ab57..cb911e8522 100644 --- a/crates/task-impls/src/transactions.rs +++ b/crates/task-impls/src/transactions.rs @@ -10,6 +10,7 @@ use std::{ }; use async_broadcast::{Receiver, Sender}; +use async_lock::RwLock; use async_trait::async_trait; use futures::{future::join_all, stream::FuturesUnordered, StreamExt}; use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo; @@ -94,7 +95,7 @@ pub struct TransactionTaskState, V pub consensus: OuterConsensus, /// Membership for the quorum - pub membership: Arc, + pub membership: Arc>, /// Builder 0.1 API clients pub builder_clients: Vec>, @@ -216,12 +217,10 @@ impl, V: Versions> TransactionTask .number_of_empty_blocks_proposed .add(1); - let membership_total_nodes = self.membership.total_nodes(self.cur_epoch); - let Some(null_fee) = null_block::builder_fee::( - self.membership.total_nodes(self.cur_epoch), - version, - *block_view, - ) else { + let membership_total_nodes = self.membership.read().await.total_nodes(self.cur_epoch); + let Some(null_fee) = + null_block::builder_fee::(membership_total_nodes, version, *block_view) + else { tracing::error!("Failed to get null fee"); return None; }; @@ -359,18 +358,16 @@ impl, V: Versions> TransactionTask } /// Produce a null block - pub fn null_block( + pub async fn null_block( &self, block_view: TYPES::View, block_epoch: TYPES::Epoch, version: Version, ) -> Option> { - let membership_total_nodes = self.membership.total_nodes(self.cur_epoch); - let Some(null_fee) = null_block::builder_fee::( - self.membership.total_nodes(self.cur_epoch), - version, - *block_view, - ) else { + let membership_total_nodes = self.membership.read().await.total_nodes(self.cur_epoch); + let Some(null_fee) = + null_block::builder_fee::(membership_total_nodes, version, *block_view) + else { tracing::error!("Failed to calculate null block fee."); return None; }; @@ -421,7 +418,7 @@ impl, V: Versions> TransactionTask e ); - let null_block = self.null_block(block_view, block_epoch, version)?; + let null_block = self.null_block(block_view, block_epoch, version).await?; // Increment the metric for number of empty blocks proposed self.consensus @@ -499,7 +496,8 @@ impl, V: Versions> TransactionTask self.cur_view = view; self.cur_epoch = epoch; - if self.membership.leader(view, epoch)? == self.public_key { + let leader = self.membership.read().await.leader(view, epoch)?; + if leader == self.public_key { self.handle_view_change(&event_stream, view, epoch).await; return Ok(()); } @@ -763,8 +761,9 @@ impl, V: Versions> TransactionTask // If epochs are supported, provide the latest `num_nodes` information to the // builder for VID computation. let (block, header_input) = if version >= V::Epochs::VERSION { + let total_nodes = self.membership.read().await.total_nodes(self.cur_epoch); futures::join! 
{ - client.claim_block_with_num_nodes(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature, self.membership.total_nodes(self.cur_epoch)) , + client.claim_block_with_num_nodes(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature, total_nodes), client.claim_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature) } } else { diff --git a/crates/task-impls/src/upgrade.rs b/crates/task-impls/src/upgrade.rs index 16f8c7e555..3ff8e073a8 100644 --- a/crates/task-impls/src/upgrade.rs +++ b/crates/task-impls/src/upgrade.rs @@ -7,6 +7,7 @@ use std::{marker::PhantomData, sync::Arc, time::SystemTime}; use async_broadcast::{Receiver, Sender}; +use async_lock::RwLock; use async_trait::async_trait; use committable::Committable; use hotshot_task::task::TaskState; @@ -50,7 +51,7 @@ pub struct UpgradeTaskState { pub cur_epoch: TYPES::Epoch, /// Membership for Quorum Certs/votes - pub quorum_membership: Arc, + pub membership: Arc>, /// A map of `UpgradeVote` collector tasks pub vote_collectors: VoteCollectorsMap, UpgradeCertificate, V>, @@ -180,7 +181,7 @@ impl UpgradeTaskState { ); // We then validate that the proposal was issued by the leader for the view. - let view_leader_key = self.quorum_membership.leader(view, self.cur_epoch)?; + let view_leader_key = self.membership.read().await.leader(view, self.cur_epoch)?; ensure!( view_leader_key == *sender, info!( @@ -223,13 +224,13 @@ impl UpgradeTaskState { // Check if we are the leader. { let view = vote.view_number(); + let membership_reader = self.membership.read().await; ensure!( - self.quorum_membership.leader(view, self.cur_epoch)? == self.public_key, + membership_reader.leader(view, self.cur_epoch)? == self.public_key, debug!( "We are not the leader for view {} are we leader for next view? {}", *view, - self.quorum_membership.leader(view + 1, self.cur_epoch)? - == self.public_key + membership_reader.leader(view + 1, self.cur_epoch)? == self.public_key ) ); } @@ -238,7 +239,7 @@ impl UpgradeTaskState { &mut self.vote_collectors, vote, self.public_key.clone(), - &self.quorum_membership, + &self.membership, self.cur_epoch, self.id, &event, @@ -265,16 +266,18 @@ impl UpgradeTaskState { ))? .as_secs(); + let leader = self.membership.read().await.leader( + TYPES::View::new(view + UPGRADE_PROPOSE_OFFSET), + self.cur_epoch, + )?; + // We try to form a certificate 5 views before we're leader. if view >= self.start_proposing_view && view < self.stop_proposing_view && time >= self.start_proposing_time && time < self.stop_proposing_time && !self.upgraded().await - && self.quorum_membership.leader( - TYPES::View::new(view + UPGRADE_PROPOSE_OFFSET), - self.cur_epoch, - )? 
== self.public_key + && leader == self.public_key { let upgrade_proposal_data = UpgradeProposalData { old_version: V::Base::VERSION, diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index 8b651ef540..3754e2a01d 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -7,6 +7,7 @@ use std::{marker::PhantomData, sync::Arc}; use async_broadcast::{Receiver, Sender}; +use async_lock::RwLock; use async_trait::async_trait; use hotshot_task::task::TaskState; use hotshot_types::{ @@ -45,7 +46,7 @@ pub struct VidTaskState> { pub network: Arc, /// Membership for the quorum - pub membership: Arc, + pub membership: Arc>, /// This Nodes Public Key pub public_key: TYPES::SignatureKey, @@ -83,7 +84,14 @@ impl> VidTaskState { ::BlockPayload::from_bytes(encoded_transactions, metadata); let builder_commitment = payload.builder_commitment(metadata); let epoch = self.cur_epoch; - if self.membership.leader(*view_number, epoch).ok()? != self.public_key { + if self + .membership + .read() + .await + .leader(*view_number, epoch) + .ok()? + != self.public_key + { tracing::debug!( "We are not the leader in the current epoch. Do not send the VID dispersal." ); diff --git a/crates/task-impls/src/view_sync.rs b/crates/task-impls/src/view_sync.rs index 65e1ad7d6b..4f40516b3e 100644 --- a/crates/task-impls/src/view_sync.rs +++ b/crates/task-impls/src/view_sync.rs @@ -74,7 +74,7 @@ pub struct ViewSyncTaskState { pub cur_epoch: TYPES::Epoch, /// Membership for the quorum - pub membership: Arc, + pub membership: Arc>, /// This Nodes Public Key pub public_key: TYPES::SignatureKey, @@ -161,7 +161,7 @@ pub struct ViewSyncReplicaTaskState { pub id: u64, /// Membership for the quorum - pub membership: Arc, + pub membership: Arc>, /// This Nodes Public Key pub public_key: TYPES::SignatureKey, @@ -311,7 +311,11 @@ impl ViewSyncTaskState { // We do not have a relay task already running, so start one ensure!( - self.membership.leader(vote_view + relay, self.cur_epoch)? == self.public_key, + self.membership + .read() + .await + .leader(vote_view + relay, self.cur_epoch)? + == self.public_key, "View sync vote sent to wrong leader" ); @@ -356,7 +360,11 @@ impl ViewSyncTaskState { // We do not have a relay task already running, so start one ensure!( - self.membership.leader(vote_view + relay, self.cur_epoch)? == self.public_key, + self.membership + .read() + .await + .leader(vote_view + relay, self.cur_epoch)? + == self.public_key, debug!("View sync vote sent to wrong leader") ); @@ -401,7 +409,11 @@ impl ViewSyncTaskState { // We do not have a relay task already running, so start one ensure!( - self.membership.leader(vote_view + relay, self.cur_epoch)? == self.public_key, + self.membership + .read() + .await + .leader(vote_view + relay, self.cur_epoch)? 
+ == self.public_key, debug!("View sync vote sent to wrong leader") ); @@ -474,7 +486,11 @@ impl ViewSyncTaskState { ); self.num_timeouts_tracked += 1; - let leader = self.membership.leader(view_number, self.cur_epoch)?; + let leader = self + .membership + .read() + .await + .leader(view_number, self.cur_epoch)?; tracing::warn!( %leader, leader_mnemonic = hotshot_types::utils::mnemonic(&leader), @@ -532,11 +548,17 @@ impl ViewSyncReplicaTaskState { return None; } + let membership_reader = self.membership.read().await; + let membership_stake_table = membership_reader.stake_table(self.cur_epoch); + let membership_failure_threshold = + membership_reader.failure_threshold(self.cur_epoch); + drop(membership_reader); + // If certificate is not valid, return current state if !certificate .is_valid_cert( - self.membership.stake_table(self.cur_epoch), - self.membership.failure_threshold(self.cur_epoch), + membership_stake_table, + membership_failure_threshold, &self.upgrade_lock, ) .await @@ -616,11 +638,17 @@ impl ViewSyncReplicaTaskState { return None; } + let membership_reader = self.membership.read().await; + let membership_stake_table = membership_reader.stake_table(self.cur_epoch); + let membership_success_threshold = + membership_reader.success_threshold(self.cur_epoch); + drop(membership_reader); + // If certificate is not valid, return current state if !certificate .is_valid_cert( - self.membership.stake_table(self.cur_epoch), - self.membership.success_threshold(self.cur_epoch), + membership_stake_table, + membership_success_threshold, &self.upgrade_lock, ) .await @@ -711,11 +739,17 @@ impl ViewSyncReplicaTaskState { return None; } + let membership_reader = self.membership.read().await; + let membership_stake_table = membership_reader.stake_table(self.cur_epoch); + let membership_success_threshold = + membership_reader.success_threshold(self.cur_epoch); + drop(membership_reader); + // If certificate is not valid, return current state if !certificate .is_valid_cert( - self.membership.stake_table(self.cur_epoch), - self.membership.success_threshold(self.cur_epoch), + membership_stake_table, + membership_success_threshold, &self.upgrade_lock, ) .await diff --git a/crates/task-impls/src/vote_collection.rs b/crates/task-impls/src/vote_collection.rs index cc2ec6c7c9..902630794a 100644 --- a/crates/task-impls/src/vote_collection.rs +++ b/crates/task-impls/src/vote_collection.rs @@ -12,6 +12,7 @@ use std::{ }; use async_broadcast::Sender; +use async_lock::RwLock; use async_trait::async_trait; use either::Either::{self, Left, Right}; use hotshot_types::{ @@ -51,7 +52,7 @@ pub struct VoteCollectionTaskState< pub public_key: TYPES::SignatureKey, /// Membership for voting - pub membership: Arc, + pub membership: Arc>, /// accumulator handles aggregating the votes pub accumulator: Option>, @@ -113,7 +114,7 @@ impl< matches!( self.transition_indicator, EpochTransitionIndicator::InTransition - ) || vote.leader(&self.membership, self.epoch)? == self.public_key, + ) || vote.leader(&*self.membership.read().await, self.epoch)? 
== self.public_key, info!("Received vote for a view in which we were not the leader.") ); @@ -177,12 +178,16 @@ where pub struct AccumulatorInfo { /// This node's public key pub public_key: TYPES::SignatureKey, + /// Membership we are accumulating votes for - pub membership: Arc, + pub membership: Arc>, + /// View of the votes we are collecting pub view: TYPES::View, + /// Epoch of the votes we are collecting pub epoch: TYPES::Epoch, + /// This node's id pub id: u64, } @@ -256,7 +261,7 @@ pub async fn handle_vote< collectors: &mut VoteCollectorsMap, vote: &VOTE, public_key: TYPES::SignatureKey, - membership: &Arc, + membership: &Arc>, epoch: TYPES::Epoch, id: u64, event: &Arc>, diff --git a/crates/testing/src/byzantine/byzantine_behaviour.rs b/crates/testing/src/byzantine/byzantine_behaviour.rs index 449629d5ca..3c0790b595 100644 --- a/crates/testing/src/byzantine/byzantine_behaviour.rs +++ b/crates/testing/src/byzantine/byzantine_behaviour.rs @@ -339,7 +339,7 @@ impl + std::fmt::Debug, V: Version &self, handle: &mut SystemContextHandle, network: Arc<>::Network>, - membership: TYPES::Membership, + membership: Arc>, ) { let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState { network, diff --git a/crates/testing/src/helpers.rs b/crates/testing/src/helpers.rs index 39d31c86a7..6d945894f3 100644 --- a/crates/testing/src/helpers.rs +++ b/crates/testing/src/helpers.rs @@ -8,6 +8,7 @@ use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; use async_broadcast::{Receiver, Sender}; +use async_lock::RwLock; use bitvec::bitvec; use committable::Committable; use hotshot::{ @@ -109,10 +110,10 @@ pub async fn build_system_handle_from_launcher< let private_key = validator_config.private_key.clone(); let public_key = validator_config.public_key.clone(); - let memberships = TYPES::Membership::new( + let memberships = Arc::new(RwLock::new(TYPES::Membership::new( config.known_nodes_with_stake.clone(), config.known_da_nodes.clone(), - ); + ))); SystemContext::init( public_key, @@ -141,7 +142,7 @@ pub async fn build_cert< CERT: Certificate, >( data: DATAType, - da_membership: &TYPES::Membership, + membership: &Arc>, view: TYPES::View, epoch: TYPES::Epoch, public_key: &TYPES::SignatureKey, @@ -150,7 +151,7 @@ pub async fn build_cert< ) -> CERT { let real_qc_sig = build_assembled_sig::( &data, - da_membership, + membership, view, epoch, upgrade_lock, ) @@ -207,17 +208,20 @@ pub async fn build_assembled_sig< DATAType: Committable + Clone + Eq + Hash + Serialize + Debug + 'static, >( data: &DATAType, - membership: &TYPES::Membership, + membership: &Arc>, view: TYPES::View, epoch: TYPES::Epoch, upgrade_lock: &UpgradeLock, ) -> ::QcType { - let stake_table = CERT::stake_table(membership, epoch); + let membership_reader = membership.read().await; + let stake_table = CERT::stake_table(&*membership_reader, epoch); let real_qc_pp: ::QcParams = ::public_parameter( stake_table.clone(), - U256::from(CERT::threshold(membership, epoch)), + U256::from(CERT::threshold(&*membership_reader, epoch)), ); + drop(membership_reader); + let total_nodes = stake_table.len(); let signers = bitvec![1; total_nodes]; let mut sig_lists = Vec::new(); @@ -265,33 +269,34 @@ pub fn key_pair_for_id( /// # Panics /// if unable to create a [`VidSchemeType`] #[must_use] -pub fn vid_scheme_from_view_number( - membership: &TYPES::Membership, +pub async fn vid_scheme_from_view_number( - membership: &TYPES::Membership, + membership: &Arc>, view_number: TYPES::View, epoch_number: TYPES::Epoch, ) -> VidSchemeType { let num_storage_nodes = membership + .read() +
.await .committee_members(view_number, epoch_number) .len(); vid_scheme(num_storage_nodes) } -pub fn vid_payload_commitment( - quorum_membership: &::Membership, +pub async fn vid_payload_commitment( + membership: &Arc::Membership>>, view_number: TYPES::View, epoch_number: TYPES::Epoch, transactions: Vec, ) -> VidCommitment { - let mut vid = - vid_scheme_from_view_number::(quorum_membership, view_number, epoch_number); + let mut vid = vid_scheme_from_view_number::(membership, view_number, epoch_number).await; let encoded_transactions = TestTransaction::encode(&transactions); let vid_disperse = vid.disperse(&encoded_transactions).unwrap(); vid_disperse.commit } -pub fn da_payload_commitment( - quorum_membership: &::Membership, +pub async fn da_payload_commitment( + membership: &Arc::Membership>>, transactions: Vec, epoch_number: TYPES::Epoch, ) -> VidCommitment { @@ -299,42 +304,42 @@ pub fn da_payload_commitment( vid_commitment( &encoded_transactions, - quorum_membership.total_nodes(epoch_number), + membership.read().await.total_nodes(epoch_number), ) } -pub fn build_payload_commitment( - membership: &::Membership, +pub async fn build_payload_commitment( + membership: &Arc::Membership>>, view: TYPES::View, epoch: TYPES::Epoch, ) -> ::Commit { // Make some empty encoded transactions, we just care about having a commitment handy for the // later calls. We need the VID commitment to be able to propose later. - let mut vid = vid_scheme_from_view_number::(membership, view, epoch); + let mut vid = vid_scheme_from_view_number::(membership, view, epoch).await; let encoded_transactions = Vec::new(); vid.commit_only(&encoded_transactions).unwrap() } /// TODO: -pub fn build_vid_proposal( - quorum_membership: &::Membership, +pub async fn build_vid_proposal( + membership: &Arc::Membership>>, view_number: TYPES::View, epoch_number: TYPES::Epoch, transactions: Vec, private_key: &::PrivateKey, ) -> VidProposal { - let mut vid = - vid_scheme_from_view_number::(quorum_membership, view_number, epoch_number); + let mut vid = vid_scheme_from_view_number::(membership, view_number, epoch_number).await; let encoded_transactions = TestTransaction::encode(&transactions); let vid_disperse = VidDisperse::from_membership( view_number, vid.disperse(&encoded_transactions).unwrap(), - quorum_membership, + membership, epoch_number, epoch_number, None, - ); + ) + .await; let signature = TYPES::SignatureKey::sign(private_key, vid_disperse.payload_commitment.as_ref()) @@ -360,7 +365,7 @@ pub fn build_vid_proposal( #[allow(clippy::too_many_arguments)] pub async fn build_da_certificate( - membership: &::Membership, + membership: &Arc::Membership>>, view_number: TYPES::View, epoch_number: TYPES::Epoch, transactions: Vec, @@ -370,8 +375,10 @@ pub async fn build_da_certificate( ) -> DaCertificate2 { let encoded_transactions = TestTransaction::encode(&transactions); - let da_payload_commitment = - vid_commitment(&encoded_transactions, membership.total_nodes(epoch_number)); + let da_payload_commitment = vid_commitment( + &encoded_transactions, + membership.read().await.total_nodes(epoch_number), + ); let da_data = DaData2 { payload_commit: da_payload_commitment, diff --git a/crates/testing/src/overall_safety_task.rs b/crates/testing/src/overall_safety_task.rs index caf92a87f6..4aee625c08 100644 --- a/crates/testing/src/overall_safety_task.rs +++ b/crates/testing/src/overall_safety_task.rs @@ -192,27 +192,23 @@ impl, V: Versions> TestTas } let epoch = TYPES::Epoch::new(self.ctx.latest_epoch); - let len = self - .handles - .read() - 
.await - .first() - .unwrap() - .handle - .memberships - .total_nodes(epoch); + let memberships_arc = Arc::clone( + &self + .handles + .read() + .await + .first() + .unwrap() + .handle + .memberships, + ); + let memberships_reader = memberships_arc.read().await; + let len = memberships_reader.total_nodes(epoch); // update view count - let threshold = self - .handles - .read() - .await - .first() - .unwrap() - .handle - .memberships - .success_threshold(epoch) - .get() as usize; + let threshold = memberships_reader.success_threshold(epoch).get() as usize; + drop(memberships_reader); + drop(memberships_arc); let view = self.ctx.round_results.get_mut(&view_number).unwrap(); if let Some(key) = key { diff --git a/crates/testing/src/spinning_task.rs b/crates/testing/src/spinning_task.rs index e9dd819802..66438d701d 100644 --- a/crates/testing/src/spinning_task.rs +++ b/crates/testing/src/spinning_task.rs @@ -232,7 +232,7 @@ where }; let storage = node.handle.storage().clone(); - let memberships = node.handle.memberships.clone(); + let memberships = Arc::clone(&node.handle.memberships); let config = node.handle.hotshot.config.clone(); let marketplace_config = node.handle.hotshot.marketplace_config.clone(); @@ -270,7 +270,7 @@ where TestRunner::::add_node_with_config_and_channels( node_id, generated_network.clone(), - (*memberships).clone(), + memberships, initializer, config, validator_config, diff --git a/crates/testing/src/test_builder.rs b/crates/testing/src/test_builder.rs index 2e2329b1b6..a4083f461c 100644 --- a/crates/testing/src/test_builder.rs +++ b/crates/testing/src/test_builder.rs @@ -4,18 +4,12 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -use super::{ - completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, - overall_safety_task::OverallSafetyPropertiesDescription, - txn_task::TxnTaskDescription, -}; -use crate::{ - spinning_task::SpinningTaskDescription, - test_launcher::{Network, ResourceGenerators, TestLauncher}, - test_task::TestTaskStateSeed, - view_sync_task::ViewSyncTaskDescription, +use std::{ + any::TypeId, collections::HashMap, num::NonZeroUsize, rc::Rc, sync::Arc, time::Duration, }; + use anyhow::{ensure, Result}; +use async_lock::RwLock; use hotshot::{ tasks::EventTransformerState, traits::{NetworkReliability, NodeImplementation, TestableNodeImplementation}, @@ -31,11 +25,21 @@ use hotshot_types::{ traits::node_implementation::{NodeType, Versions}, HotShotConfig, ValidatorConfig, }; -use std::any::TypeId; -use std::{collections::HashMap, num::NonZeroUsize, rc::Rc, sync::Arc, time::Duration}; use tide_disco::Url; use vec1::Vec1; +use super::{ + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + overall_safety_task::OverallSafetyPropertiesDescription, + txn_task::TxnTaskDescription, +}; +use crate::{ + spinning_task::SpinningTaskDescription, + test_launcher::{Network, ResourceGenerators, TestLauncher}, + test_task::TestTaskStateSeed, + view_sync_task::ViewSyncTaskDescription, +}; + pub type TransactionValidator = Arc) -> Result<()> + Send + Sync>; /// data describing how a round should be timed. 
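Aside: the test hunks above (overall_safety_task.rs, spinning_task.rs) replace value clones of the membership with `Arc::clone`, since the membership is now a single shared instance behind `async_lock::RwLock`. The following is a minimal standalone sketch of the difference, using illustrative types rather than HotShot's real `Membership` trait; it assumes only the `async-lock` and `futures` crates, both of which this patch already depends on.

use std::sync::Arc;

use async_lock::RwLock;

fn main() {
    // With the data behind Arc<RwLock<_>>, cloning the Arc shares one
    // instance instead of deep-copying the contents, so a write made through
    // one handle is observed by every other holder. The Vec here is an
    // illustrative stand-in for a stake table.
    let shared = Arc::new(RwLock::new(vec![1u64, 2, 3]));
    let alias = Arc::clone(&shared); // pointer copy, not a data copy

    futures::executor::block_on(async {
        shared.write().await.push(4);
        // `alias` sees the update: both Arcs point at the same lock.
        assert_eq!(alias.read().await.len(), 4);
    });
}

This is why the old `(*memberships).clone()` calls had to go: a deep copy would leave each task with its own divergent membership rather than one shared, updatable view.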
@@ -172,7 +176,7 @@ pub async fn create_test_handle< metadata: TestDescription, node_id: u64, network: Network, - memberships: TYPES::Membership, + memberships: Arc>, config: HotShotConfig, storage: I::Storage, marketplace_config: MarketplaceConfig, diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index ec03e3770a..7d0c1cc6bb 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -425,10 +425,11 @@ where self.next_node_id += 1; tracing::debug!("launch node {}", i); - let memberships = ::Membership::new( - config.known_nodes_with_stake.clone(), - config.known_da_nodes.clone(), - ); + //let memberships =Arc::new(RwLock::new(::Membership::new( + //config.known_nodes_with_stake.clone(), + //config.known_da_nodes.clone(), + //))); + config.builder_urls = builder_urls .clone() .try_into() @@ -465,7 +466,10 @@ where context: LateNodeContext::UninitializedContext( LateNodeContextParameters { storage, - memberships, + memberships: ::Membership::new( + config.known_nodes_with_stake.clone(), + config.known_da_nodes.clone(), + ), config, marketplace_config, }, @@ -489,7 +493,10 @@ where let hotshot = Self::add_node_with_config( node_id, network.clone(), - memberships, + ::Membership::new( + config.known_nodes_with_stake.clone(), + config.known_da_nodes.clone(), + ), initializer, config, validator_config, @@ -509,7 +516,10 @@ where uninitialized_nodes.push(( node_id, network, - memberships, + ::Membership::new( + config.known_nodes_with_stake.clone(), + config.known_da_nodes.clone(), + ), config, storage, marketplace_config, @@ -544,7 +554,7 @@ where self.launcher.metadata.clone(), node_id, network.clone(), - memberships, + Arc::new(RwLock::new(memberships)), config.clone(), storage, marketplace_config, @@ -599,7 +609,7 @@ where private_key, node_id, config, - memberships, + Arc::new(RwLock::new(memberships)), network, initializer, ConsensusMetricsValue::default(), @@ -616,7 +626,7 @@ where pub async fn add_node_with_config_and_channels( node_id: u64, network: Network, - memberships: TYPES::Membership, + memberships: Arc>, initializer: HotShotInitializer, config: HotShotConfig, validator_config: ValidatorConfig, diff --git a/crates/testing/src/view_generator.rs b/crates/testing/src/view_generator.rs index a7390f98ed..81da7fffb4 100644 --- a/crates/testing/src/view_generator.rs +++ b/crates/testing/src/view_generator.rs @@ -12,6 +12,7 @@ use std::{ task::{Context, Poll}, }; +use async_lock::RwLock; use committable::Committable; use futures::{FutureExt, Stream}; use hotshot::types::{BLSPubKey, SignatureKey, SystemContextHandle}; @@ -55,7 +56,7 @@ pub struct TestView { pub leaf: Leaf2, pub view_number: ViewNumber, pub epoch_number: EpochNumber, - pub membership: ::Membership, + pub membership: Arc::Membership>>, pub vid_disperse: Proposal>, pub vid_proposal: ( Vec>>, @@ -72,7 +73,7 @@ pub struct TestView { } impl TestView { - pub async fn genesis(membership: &::Membership) -> Self { + pub async fn genesis(membership: &Arc::Membership>>) -> Self { let genesis_view = ViewNumber::new(1); let genesis_epoch = EpochNumber::new(0); let upgrade_lock = UpgradeLock::new(); @@ -98,7 +99,8 @@ impl TestView { let leader_public_key = public_key; let payload_commitment = - da_payload_commitment::(membership, transactions.clone(), genesis_epoch); + da_payload_commitment::(membership, transactions.clone(), genesis_epoch) + .await; let (vid_disperse, vid_proposal) = build_vid_proposal( membership, @@ -106,7 +108,8 @@ impl TestView { genesis_epoch, 
transactions.clone(), &private_key, - ); + ) + .await; let da_certificate = build_da_certificate( membership, @@ -241,7 +244,8 @@ impl TestView { ); let payload_commitment = - da_payload_commitment::(membership, transactions.clone(), self.epoch_number); + da_payload_commitment::(membership, transactions.clone(), self.epoch_number) + .await; let (vid_disperse, vid_proposal) = build_vid_proposal( membership, @@ -249,7 +253,8 @@ impl TestView { self.epoch_number, transactions.clone(), &private_key, - ); + ) + .await; let da_certificate = build_da_certificate::( membership, @@ -492,11 +497,11 @@ impl TestView { pub struct TestViewGenerator { pub current_view: Option, - pub membership: ::Membership, + pub membership: Arc::Membership>>, } impl TestViewGenerator { - pub fn generate(membership: ::Membership) -> Self { + pub fn generate(membership: Arc::Membership>>) -> Self { TestViewGenerator { current_view: None, membership, @@ -576,13 +581,13 @@ impl Stream for TestViewGenerator { type Item = TestView; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mem = &self.membership.clone(); + let mem = Arc::clone(&self.membership); let curr_view = &self.current_view.clone(); let mut fut = if let Some(ref view) = curr_view { async move { TestView::next_view(view).await }.boxed() } else { - async move { TestView::genesis(mem).await }.boxed() + async move { TestView::genesis(&mem).await }.boxed() }; match fut.as_mut().poll(cx) { diff --git a/crates/testing/tests/tests_1/da_task.rs b/crates/testing/tests/tests_1/da_task.rs index c7af796395..4e594bb4a2 100644 --- a/crates/testing/tests/tests_1/da_task.rs +++ b/crates/testing/tests/tests_1/da_task.rs @@ -40,7 +40,7 @@ async fn test_da_task() { .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); // Make some empty encoded transactions, we just care about having a commitment handy for the // later calls. We need the VID commitment to be able to propose later. @@ -48,7 +48,12 @@ async fn test_da_task() { let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); let (payload_commit, precompute) = precompute_vid_commitment( &encoded_transactions, - handle.hotshot.memberships.total_nodes(EpochNumber::new(0)), + handle + .hotshot + .memberships + .read() + .await + .total_nodes(EpochNumber::new(0)), ); let mut generator = TestViewGenerator::generate(membership.clone()); @@ -107,7 +112,7 @@ async fn test_da_task() { ViewNumber::new(2), EpochNumber::new(0), vec1::vec1![null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(0)), + membership.read().await.total_nodes(EpochNumber::new(0)), ::Base::VERSION, *ViewNumber::new(2), ) @@ -148,7 +153,7 @@ async fn test_da_task_storage_failure() { // Set the error flag here for the system handle. This causes it to emit an error on append. handle.storage().write().await.should_return_err = true; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); // Make some empty encoded transactions, we just care about having a commitment handy for the // later calls. We need the VID commitment to be able to propose later. 
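Aside: the test hunks around here switch call sites from `membership.total_nodes(...)` to `membership.read().await.total_nodes(...)`. A minimal sketch of why no explicit `drop` appears in these one-shot lookups, with an illustrative stand-in type (not HotShot's API):

use std::sync::Arc;

use async_lock::RwLock;

// Illustrative stand-in for a membership; `nodes` plays the role of the
// stake table behind `total_nodes`.
struct Membership {
    nodes: usize,
}

async fn committee_size(membership: &Arc<RwLock<Membership>>) -> usize {
    // For a single lookup the read guard can stay a temporary: it is
    // produced by `.read().await` and dropped at the end of the statement,
    // unlike the multi-lookup hunks elsewhere in this patch, which bind a
    // `membership_reader` guard and drop it explicitly.
    membership.read().await.nodes
}

fn main() {
    let membership = Arc::new(RwLock::new(Membership { nodes: 4 }));
    assert_eq!(futures::executor::block_on(committee_size(&membership)), 4);
}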
@@ -156,10 +161,15 @@ async fn test_da_task_storage_failure() { let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); let (payload_commit, precompute) = precompute_vid_commitment( &encoded_transactions, - handle.hotshot.memberships.total_nodes(EpochNumber::new(0)), + handle + .hotshot + .memberships + .read() + .await + .total_nodes(EpochNumber::new(0)), ); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(Arc::clone(&membership)); let mut proposals = Vec::new(); let mut leaders = Vec::new(); @@ -215,7 +225,7 @@ async fn test_da_task_storage_failure() { ViewNumber::new(2), EpochNumber::new(0), vec1::vec1![null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(0)), + membership.read().await.total_nodes(EpochNumber::new(0)), ::Base::VERSION, *ViewNumber::new(2), ) diff --git a/crates/testing/tests/tests_1/message.rs b/crates/testing/tests/tests_1/message.rs index e19cce630b..456a1321bd 100644 --- a/crates/testing/tests/tests_1/message.rs +++ b/crates/testing/tests/tests_1/message.rs @@ -6,6 +6,7 @@ #[cfg(test)] use std::marker::PhantomData; +use std::sync::Arc; use committable::Committable; use hotshot_example_types::node_types::TestTypes; @@ -78,9 +79,9 @@ async fn test_certificate2_validity() { let handle = build_system_handle::(node_id) .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(Arc::clone(&membership)); let mut proposals = Vec::new(); let mut leaders = Vec::new(); @@ -103,10 +104,15 @@ async fn test_certificate2_validity() { let qc2 = proposal.data.justify_qc.clone(); let qc = qc2.clone().to_qc(); + let membership_reader = membership.read().await; + let membership_stake_table = membership_reader.stake_table(EpochNumber::new(0)); + let membership_success_threshold = membership_reader.success_threshold(EpochNumber::new(0)); + drop(membership_reader); + assert!( qc.is_valid_cert( - membership.stake_table(EpochNumber::new(0)), - membership.success_threshold(EpochNumber::new(0)), + membership_stake_table.clone(), + membership_success_threshold, &handle.hotshot.upgrade_lock ) .await @@ -114,8 +120,8 @@ async fn test_certificate2_validity() { assert!( qc2.is_valid_cert( - membership.stake_table(EpochNumber::new(0)), - membership.success_threshold(EpochNumber::new(0)), + membership_stake_table, + membership_success_threshold, &handle.hotshot.upgrade_lock ) .await diff --git a/crates/testing/tests/tests_1/network_task.rs b/crates/testing/tests/tests_1/network_task.rs index a4e1c29a03..a23ae64cd5 100644 --- a/crates/testing/tests/tests_1/network_task.rs +++ b/crates/testing/tests/tests_1/network_task.rs @@ -58,13 +58,16 @@ async fn test_network_task() { let all_nodes = config.known_nodes_with_stake.clone(); - let membership = ::Membership::new(all_nodes.clone(), all_nodes); + let membership = Arc::new(RwLock::new(::Membership::new( + all_nodes.clone(), + all_nodes, + ))); let network_state: NetworkEventTaskState, _> = NetworkEventTaskState { network: network.clone(), view: ViewNumber::new(0), epoch: EpochNumber::new(0), - membership: membership.clone(), + membership: Arc::clone(&membership), upgrade_lock: upgrade_lock.clone(), storage, consensus, @@ -227,13 +230,16 @@ async fn test_network_storage_fail() { let all_nodes = config.known_nodes_with_stake.clone(); let upgrade_lock = 
UpgradeLock::::new(); - let membership = ::Membership::new(all_nodes.clone(), all_nodes); + let membership = Arc::new(RwLock::new(::Membership::new( + all_nodes.clone(), + all_nodes, + ))); let network_state: NetworkEventTaskState, _> = NetworkEventTaskState { network: network.clone(), view: ViewNumber::new(0), epoch: EpochNumber::new(0), - membership: membership.clone(), + membership: Arc::clone(&membership), upgrade_lock: upgrade_lock.clone(), storage, consensus, diff --git a/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs b/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs index 97742d0deb..97d39cc9ca 100644 --- a/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs +++ b/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs @@ -54,11 +54,11 @@ async fn test_quorum_proposal_recv_task() { let handle = build_system_handle::(2) .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let consensus = handle.hotshot.consensus(); let mut consensus_writer = consensus.write().await; - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(membership); let mut proposals = Vec::new(); let mut leaders = Vec::new(); let mut votes = Vec::new(); @@ -129,7 +129,7 @@ async fn test_quorum_proposal_recv_task_liveness_check() { let handle = build_system_handle::(4) .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let consensus = handle.hotshot.consensus(); let mut consensus_writer = consensus.write().await; diff --git a/crates/testing/tests/tests_1/quorum_proposal_task.rs b/crates/testing/tests/tests_1/quorum_proposal_task.rs index 0935aef8f8..caf464ac51 100644 --- a/crates/testing/tests/tests_1/quorum_proposal_task.rs +++ b/crates/testing/tests/tests_1/quorum_proposal_task.rs @@ -51,15 +51,16 @@ async fn test_quorum_proposal_task_quorum_proposal_view_1() { .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let payload_commitment = build_payload_commitment::( &membership, ViewNumber::new(node_id), EpochNumber::new(1), - ); + ) + .await; - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(Arc::clone(&membership)); let mut proposals = Vec::new(); let mut leaders = Vec::new(); @@ -90,7 +91,7 @@ async fn test_quorum_proposal_task_quorum_proposal_view_1() { let genesis_cert = proposals[0].data.justify_qc.clone(); let builder_commitment = BuilderCommitment::from_raw_digest(sha2::Sha256::new().finalize()); let builder_fee = null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(1)), + membership.read().await.total_nodes(EpochNumber::new(1)), ::Base::VERSION, *ViewNumber::new(1), ) @@ -145,7 +146,7 @@ async fn test_quorum_proposal_task_quorum_proposal_view_gt_1() { .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let mut generator = TestViewGenerator::generate(membership.clone()); @@ -182,7 +183,7 @@ async fn test_quorum_proposal_task_quorum_proposal_view_gt_1() { let builder_commitment = BuilderCommitment::from_raw_digest(sha2::Sha256::new().finalize()); let builder_fee = null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(1)), + membership.read().await.total_nodes(EpochNumber::new(1)), ::Base::VERSION, 
*ViewNumber::new(1), ) @@ -196,7 +197,8 @@ async fn test_quorum_proposal_task_quorum_proposal_view_gt_1() { &membership, ViewNumber::new(1), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), TestMetadata { num_transactions: 0 @@ -215,7 +217,8 @@ async fn test_quorum_proposal_task_quorum_proposal_view_gt_1() { &membership, ViewNumber::new(2), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), proposals[0].data.block_header.metadata, ViewNumber::new(2), @@ -232,7 +235,8 @@ async fn test_quorum_proposal_task_quorum_proposal_view_gt_1() { &membership, ViewNumber::new(3), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), proposals[1].data.block_header.metadata, ViewNumber::new(3), @@ -249,7 +253,8 @@ async fn test_quorum_proposal_task_quorum_proposal_view_gt_1() { &membership, ViewNumber::new(4), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), proposals[2].data.block_header.metadata, ViewNumber::new(4), @@ -266,7 +271,8 @@ async fn test_quorum_proposal_task_quorum_proposal_view_gt_1() { &membership, ViewNumber::new(5), EpochNumber::new(1) - ), + ) + .await, builder_commitment, proposals[3].data.block_header.metadata, ViewNumber::new(5), @@ -308,16 +314,17 @@ async fn test_quorum_proposal_task_qc_timeout() { let handle = build_system_handle::(node_id) .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let payload_commitment = build_payload_commitment::( &membership, ViewNumber::new(node_id), EpochNumber::new(1), - ); + ) + .await; let builder_commitment = BuilderCommitment::from_raw_digest(sha2::Sha256::new().finalize()); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(Arc::clone(&membership)); let mut proposals = Vec::new(); let mut leaders = Vec::new(); @@ -360,7 +367,7 @@ async fn test_quorum_proposal_task_qc_timeout() { }, ViewNumber::new(3), vec1![null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(1)), + membership.read().await.total_nodes(EpochNumber::new(1)), ::Base::VERSION, *ViewNumber::new(3), ) @@ -397,16 +404,17 @@ async fn test_quorum_proposal_task_view_sync() { .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let payload_commitment = build_payload_commitment::( &membership, ViewNumber::new(node_id), EpochNumber::new(1), - ); + ) + .await; let builder_commitment = BuilderCommitment::from_raw_digest(sha2::Sha256::new().finalize()); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(Arc::clone(&membership)); let mut proposals = Vec::new(); let mut leaders = Vec::new(); @@ -451,7 +459,7 @@ async fn test_quorum_proposal_task_view_sync() { }, ViewNumber::new(2), vec1![null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(1)), + membership.read().await.total_nodes(EpochNumber::new(1)), ::Base::VERSION, *ViewNumber::new(2), ) @@ -486,9 +494,9 @@ async fn test_quorum_proposal_task_liveness_check() { .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(Arc::clone(&membership)); let mut proposals = Vec::new(); let mut leaders = Vec::new(); @@ -518,7 +526,7 @@ async fn 
test_quorum_proposal_task_liveness_check() { let builder_commitment = BuilderCommitment::from_raw_digest(sha2::Sha256::new().finalize()); let builder_fee = null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(1)), + membership.read().await.total_nodes(EpochNumber::new(1)), ::Base::VERSION, *ViewNumber::new(1), ) @@ -536,7 +544,8 @@ async fn test_quorum_proposal_task_liveness_check() { &membership, ViewNumber::new(1), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), TestMetadata { num_transactions: 0 @@ -555,7 +564,8 @@ async fn test_quorum_proposal_task_liveness_check() { &membership, ViewNumber::new(2), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), proposals[0].data.block_header.metadata, ViewNumber::new(2), @@ -572,7 +582,8 @@ async fn test_quorum_proposal_task_liveness_check() { &membership, ViewNumber::new(3), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), proposals[1].data.block_header.metadata, ViewNumber::new(3), @@ -589,7 +600,8 @@ async fn test_quorum_proposal_task_liveness_check() { &membership, ViewNumber::new(4), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), proposals[2].data.block_header.metadata, ViewNumber::new(4), @@ -606,7 +618,8 @@ async fn test_quorum_proposal_task_liveness_check() { &membership, ViewNumber::new(5), EpochNumber::new(1) - ), + ) + .await, builder_commitment, proposals[3].data.block_header.metadata, ViewNumber::new(5), @@ -644,7 +657,7 @@ async fn test_quorum_proposal_task_with_incomplete_events() { let handle = build_system_handle::(2) .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let mut generator = TestViewGenerator::generate(membership); diff --git a/crates/testing/tests/tests_1/quorum_vote_task.rs b/crates/testing/tests/tests_1/quorum_vote_task.rs index b5d079f56c..4d6c018fbc 100644 --- a/crates/testing/tests/tests_1/quorum_vote_task.rs +++ b/crates/testing/tests/tests_1/quorum_vote_task.rs @@ -45,9 +45,9 @@ async fn test_quorum_vote_task_success() { .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(membership); let mut proposals = Vec::new(); let mut leaves = Vec::new(); @@ -112,9 +112,9 @@ async fn test_quorum_vote_task_miss_dependency() { .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(membership); let mut proposals = Vec::new(); let mut leaders = Vec::new(); @@ -196,7 +196,7 @@ async fn test_quorum_vote_task_incorrect_dependency() { .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let mut generator = TestViewGenerator::generate(membership); diff --git a/crates/testing/tests/tests_1/transaction_task.rs b/crates/testing/tests/tests_1/transaction_task.rs index e4ed70be64..491400d28d 100644 --- a/crates/testing/tests/tests_1/transaction_task.rs +++ b/crates/testing/tests/tests_1/transaction_task.rs @@ -42,7 +42,12 @@ async fn test_transaction_task_leader_two_views_in_a_row() { let (_, precompute_data) = precompute_vid_commitment( &[], - handle.hotshot.memberships.total_nodes(EpochNumber::new(0)), + 
handle + .hotshot + .memberships + .read() + .await + .total_nodes(EpochNumber::new(0)), ); // current view @@ -55,7 +60,12 @@ async fn test_transaction_task_leader_two_views_in_a_row() { EpochNumber::new(1), vec1::vec1![ null_block::builder_fee::( - handle.hotshot.memberships.total_nodes(EpochNumber::new(0)), + handle + .hotshot + .memberships + .read() + .await + .total_nodes(EpochNumber::new(0)), ::Base::VERSION, *ViewNumber::new(4), ) diff --git a/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs b/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs index 44833d7727..7dd4324426 100644 --- a/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs +++ b/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs @@ -83,9 +83,9 @@ async fn test_upgrade_task_with_proposal() { let consensus = handle.hotshot.consensus(); let mut consensus_writer = consensus.write().await; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(Arc::clone(&membership)); for view in (&mut generator).take(1).collect::>().await { proposals.push(view.quorum_proposal.clone()); @@ -126,7 +126,7 @@ async fn test_upgrade_task_with_proposal() { let genesis_cert = proposals[0].data.justify_qc.clone(); let builder_commitment = BuilderCommitment::from_raw_digest(sha2::Sha256::new().finalize()); let builder_fee = null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(1)), + membership.read().await.total_nodes(EpochNumber::new(1)), ::Base::VERSION, *ViewNumber::new(1), ) @@ -156,7 +156,8 @@ async fn test_upgrade_task_with_proposal() { &membership, ViewNumber::new(1), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), TestMetadata { num_transactions: 0 @@ -175,7 +176,8 @@ async fn test_upgrade_task_with_proposal() { &membership, ViewNumber::new(2), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), proposals[0].data.block_header.metadata, ViewNumber::new(2), @@ -193,7 +195,8 @@ async fn test_upgrade_task_with_proposal() { &membership, ViewNumber::new(3), EpochNumber::new(1) - ), + ) + .await, builder_commitment.clone(), proposals[1].data.block_header.metadata, ViewNumber::new(3), diff --git a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs index 5390f56e03..7e21efe163 100644 --- a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs +++ b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs @@ -70,7 +70,7 @@ async fn test_upgrade_task_with_vote() { let consensus = handle.hotshot.consensus().clone(); let mut consensus_writer = consensus.write().await; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let mut generator = TestViewGenerator::generate(membership); for view in (&mut generator).take(2).collect::>().await { diff --git a/crates/testing/tests/tests_1/vid_task.rs b/crates/testing/tests/tests_1/vid_task.rs index a8e219e7ad..adafcbfbf7 100644 --- a/crates/testing/tests/tests_1/vid_task.rs +++ b/crates/testing/tests/tests_1/vid_task.rs @@ -45,13 +45,14 @@ async fn test_vid_task() { .0; let pub_key = handle.public_key(); - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); let mut vid = vid_scheme_from_view_number::( &membership, ViewNumber::new(0), EpochNumber::new(0), - ); + ) + 
.await; let transactions = vec![TestTransaction::new(vec![0])]; let (payload, metadata) = >::from_transactions( @@ -94,7 +95,8 @@ async fn test_vid_task() { EpochNumber::new(0), EpochNumber::new(0), None, - ); + ) + .await; let vid_proposal = Proposal { data: vid_disperse.clone(), @@ -113,7 +115,7 @@ async fn test_vid_task() { ViewNumber::new(2), EpochNumber::new(0), vec1::vec1![null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(0)), + membership.read().await.total_nodes(EpochNumber::new(0)), ::Base::VERSION, *ViewNumber::new(2), ) @@ -135,7 +137,7 @@ async fn test_vid_task() { }, ViewNumber::new(2), vec1![null_block::builder_fee::( - membership.total_nodes(EpochNumber::new(0)), + membership.read().await.total_nodes(EpochNumber::new(0)), ::Base::VERSION, *ViewNumber::new(2), ) diff --git a/crates/testing/tests/tests_1/vote_dependency_handle.rs b/crates/testing/tests/tests_1/vote_dependency_handle.rs index 1b12e0b0f0..51c91750fb 100644 --- a/crates/testing/tests/tests_1/vote_dependency_handle.rs +++ b/crates/testing/tests/tests_1/vote_dependency_handle.rs @@ -36,9 +36,9 @@ async fn test_vote_dependency_handle() { let handle = build_system_handle::(node_id) .await .0; - let membership = (*handle.hotshot.memberships).clone(); + let membership = Arc::clone(&handle.hotshot.memberships); - let mut generator = TestViewGenerator::generate(membership.clone()); + let mut generator = TestViewGenerator::generate(membership); // Generate our state for the test let mut proposals = Vec::new(); @@ -90,7 +90,7 @@ async fn test_vote_dependency_handle() { consensus: OuterConsensus::new(consensus.clone()), consensus_metrics: Arc::clone(&consensus.read().await.metrics), instance_state: handle.hotshot.instance_state(), - quorum_membership: (*handle.hotshot.memberships).clone().into(), + membership: Arc::clone(&handle.hotshot.memberships), storage: Arc::clone(&handle.storage()), view_number, sender: event_sender.clone(), diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 383be130cd..2530417f90 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -938,7 +938,7 @@ impl Consensus { pub async fn calculate_and_update_vid( consensus: OuterConsensus, view: ::View, - membership: Arc, + membership: Arc>, private_key: &::PrivateKey, ) -> Option<()> { let txns = Arc::clone(consensus.read().await.saved_payloads().get(&view)?); diff --git a/crates/types/src/data.rs b/crates/types/src/data.rs index c0dbea802a..0d6fcfff7a 100644 --- a/crates/types/src/data.rs +++ b/crates/types/src/data.rs @@ -228,15 +228,17 @@ impl VidDisperse { /// Create VID dispersal from a specified membership for the target epoch. 
/// Uses the specified function to calculate share dispersal /// Allows for more complex stake table functionality - pub fn from_membership( + pub async fn from_membership( view_number: TYPES::View, mut vid_disperse: JfVidDisperse, - membership: &TYPES::Membership, + membership: &Arc>, target_epoch: TYPES::Epoch, data_epoch: TYPES::Epoch, data_epoch_payload_commitment: Option, ) -> Self { let shares = membership + .read() + .await .committee_members(view_number, target_epoch) .iter() .map(|node| (node.clone(), vid_disperse.shares.remove(0))) @@ -262,13 +264,13 @@ impl VidDisperse { #[allow(clippy::panic)] pub async fn calculate_vid_disperse( txns: Arc<[u8]>, - membership: &Arc, + membership: &Arc>, view: TYPES::View, target_epoch: TYPES::Epoch, data_epoch: TYPES::Epoch, precompute_data: Option, ) -> Self { - let num_nodes = membership.total_nodes(target_epoch); + let num_nodes = membership.read().await.total_nodes(target_epoch); let txns_clone = Arc::clone(&txns); let vid_disperse = spawn_blocking(move || { @@ -282,7 +284,7 @@ impl VidDisperse { let data_epoch_payload_commitment = if target_epoch == data_epoch { None } else { - let data_epoch_num_nodes = membership.total_nodes(data_epoch); + let data_epoch_num_nodes = membership.read().await.total_nodes(data_epoch); Some(spawn_blocking(move || { vid_scheme(data_epoch_num_nodes).commit_only(&txns) .unwrap_or_else(|err| panic!("VID commit_only failure:(num_storage nodes,payload_byte_len)=({num_nodes},{}) error: {err}", txns.len())) @@ -296,11 +298,12 @@ impl VidDisperse { Self::from_membership( view, vid_disperse, - membership.as_ref(), + membership, target_epoch, data_epoch, data_epoch_payload_commitment, ) + .await } } diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index 0f36bae2de..877cdd6282 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -432,7 +432,7 @@ where /// Returns an error when the proposal signature is invalid. pub async fn validate_signature( &self, - quorum_membership: &TYPES::Membership, + membership: &TYPES::Membership, epoch_height: u64, upgrade_lock: &UpgradeLock, ) -> Result<()> { @@ -441,7 +441,7 @@ where self.data.block_header.block_number(), epoch_height, )); - let view_leader_key = quorum_membership.leader(view_number, proposal_epoch)?; + let view_leader_key = membership.leader(view_number, proposal_epoch)?; let proposed_leaf = Leaf::from_quorum_proposal(&self.data); ensure!( @@ -465,7 +465,7 @@ where /// Returns an error when the proposal signature is invalid. pub fn validate_signature( &self, - quorum_membership: &TYPES::Membership, + membership: &TYPES::Membership, epoch_height: u64, ) -> Result<()> { let view_number = self.data.view_number(); @@ -473,7 +473,7 @@ where self.data.block_header.block_number(), epoch_height, )); - let view_leader_key = quorum_membership.leader(view_number, proposal_epoch)?; + let view_leader_key = membership.leader(view_number, proposal_epoch)?; let proposed_leaf = Leaf2::from_quorum_proposal(&self.data); ensure!( diff --git a/crates/types/src/simple_certificate.rs b/crates/types/src/simple_certificate.rs index 5ea857ed21..0973f8805a 100644 --- a/crates/types/src/simple_certificate.rs +++ b/crates/types/src/simple_certificate.rs @@ -451,15 +451,20 @@ impl UpgradeCertificate { /// Returns an error when the upgrade certificate is invalid. 
diff --git a/crates/types/src/simple_certificate.rs b/crates/types/src/simple_certificate.rs
index 5ea857ed21..0973f8805a 100644
--- a/crates/types/src/simple_certificate.rs
+++ b/crates/types/src/simple_certificate.rs
@@ -451,15 +451,20 @@ impl<TYPES: NodeType> UpgradeCertificate<TYPES> {
     /// Returns an error when the upgrade certificate is invalid.
     pub async fn validate<V: Versions>(
         upgrade_certificate: &Option<Self>,
-        quorum_membership: &TYPES::Membership,
+        membership: &RwLock<TYPES::Membership>,
         epoch: TYPES::Epoch,
         upgrade_lock: &UpgradeLock<TYPES, V>,
     ) -> Result<()> {
         if let Some(ref cert) = upgrade_certificate {
+            let membership_reader = membership.read().await;
+            let membership_stake_table = membership_reader.stake_table(epoch);
+            let membership_upgrade_threshold = membership_reader.upgrade_threshold(epoch);
+            drop(membership_reader);
+
             ensure!(
                 cert.is_valid_cert(
-                    quorum_membership.stake_table(epoch),
-                    quorum_membership.upgrade_threshold(epoch),
+                    membership_stake_table,
+                    membership_upgrade_threshold,
                     upgrade_lock
                 )
                 .await,
diff --git a/crates/types/src/traits/election.rs b/crates/types/src/traits/election.rs
index 5b72ea4f84..0509918574 100644
--- a/crates/types/src/traits/election.rs
+++ b/crates/types/src/traits/election.rs
@@ -13,7 +13,7 @@ use super::node_implementation::NodeType;
 use crate::{traits::signature_key::SignatureKey, PeerConfig};

 /// A protocol for determining membership in and participating in a committee.
-pub trait Membership<TYPES: NodeType>: Clone + Debug + Send + Sync {
+pub trait Membership<TYPES: NodeType>: Debug + Send + Sync {
     /// The error type returned by methods like `lookup_leader`.
     type Error: std::fmt::Display;
diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs
index 5d6670407f..f85036dfb9 100644
--- a/crates/types/src/traits/network.rs
+++ b/crates/types/src/traits/network.rs
@@ -17,6 +17,7 @@ use std::{
     time::Duration,
 };

+use async_lock::RwLock;
 use async_trait::async_trait;
 use dyn_clone::DynClone;
 use futures::{future::join_all, Future};
@@ -262,7 +263,7 @@ pub trait ConnectedNetwork<K: SignatureKey + 'static>: Clone + Send + Sync + 'st
         &'a self,
         _view: u64,
         _epoch: u64,
-        _membership: &TYPES::Membership,
+        _membership: Arc<RwLock<TYPES::Membership>>,
     ) where
         TYPES: NodeType + 'a,
     {
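`UpgradeCertificate::validate` above, like `accumulate` in the `vote.rs` hunks below, uses a read-once pattern: take the guard, copy out every value the rest of the function needs, and `drop` it explicitly so the lock is never held across the subsequent `.await`. A self-contained sketch of that pattern, with stand-in types in place of HotShot's stake table and certificate machinery:

```rust
use async_lock::RwLock;

struct Membership {
    stake_table: Vec<u64>,
    upgrade_threshold: u64,
}

// Placeholder for the real async signature/certificate check.
async fn is_valid_cert(stake_table: Vec<u64>, threshold: u64) -> bool {
    stake_table.iter().sum::<u64>() >= threshold
}

async fn validate(membership: &RwLock<Membership>) -> bool {
    // Read once and copy out everything needed downstream...
    let membership_reader = membership.read().await;
    let stake_table = membership_reader.stake_table.clone();
    let threshold = membership_reader.upgrade_threshold;
    // ...then release the guard *before* the await below.
    drop(membership_reader);

    is_valid_cert(stake_table, threshold).await
}

fn main() {
    let membership = RwLock::new(Membership {
        stake_table: vec![10, 20, 30],
        upgrade_threshold: 40,
    });
    assert!(futures::executor::block_on(validate(&membership)));
}
```

Extracting the values up front also means the lock is taken exactly once per call, rather than once per `stake_table`/`threshold` lookup as the old code did implicitly.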
diff --git a/crates/types/src/vote.rs b/crates/types/src/vote.rs
index 103c470e6d..20d36e67ee 100644
--- a/crates/types/src/vote.rs
+++ b/crates/types/src/vote.rs
@@ -10,8 +10,10 @@ use std::{
     collections::{BTreeMap, HashMap},
     marker::PhantomData,
     num::NonZeroU64,
+    sync::Arc,
 };

+use async_lock::RwLock;
 use bitvec::{bitvec, vec::BitVec};
 use committable::{Commitment, Committable};
 use either::Either;
@@ -162,7 +164,7 @@ impl<
     pub async fn accumulate(
         &mut self,
         vote: &VOTE,
-        membership: &TYPES::Membership,
+        membership: &Arc<RwLock<TYPES::Membership>>,
         epoch: TYPES::Epoch,
     ) -> Either<(), CERT> {
         let key = vote.signing_key();
@@ -186,10 +188,16 @@ impl<
             return Either::Left(());
         }

-        let Some(stake_table_entry) = CERT::stake_table_entry(membership, &key, epoch) else {
+        let membership_reader = membership.read().await;
+        let Some(stake_table_entry) = CERT::stake_table_entry(&*membership_reader, &key, epoch)
+        else {
             return Either::Left(());
         };
-        let stake_table = CERT::stake_table(membership, epoch);
+        let stake_table = CERT::stake_table(&*membership_reader, epoch);
+        let total_nodes = CERT::total_nodes(&*membership_reader, epoch);
+        let threshold = CERT::threshold(&*membership_reader, epoch);
+        drop(membership_reader);
+
         let Some(vote_node_id) = stake_table
             .iter()
             .position(|x| *x == stake_table_entry.clone())
@@ -212,7 +220,7 @@ impl<
         let (signers, sig_list) = self
             .signers
             .entry(vote_commitment)
-            .or_insert((bitvec![0; CERT::total_nodes(membership, epoch)], Vec::new()));
+            .or_insert((bitvec![0; total_nodes], Vec::new()));
         if signers.get(vote_node_id).as_deref() == Some(&true) {
             error!("Node id is already in signers list");
             return Either::Left(());
         }

         *total_stake_casted += stake_table_entry.stake();
         total_vote_map.insert(key, (vote.signature(), vote_commitment));

-        if *total_stake_casted >= CERT::threshold(membership, epoch).into() {
+        if *total_stake_casted >= threshold.into() {
             // Assemble QC
             let real_qc_pp: <<TYPES as NodeType>::SignatureKey as SignatureKey>::QcParams =
                 <TYPES::SignatureKey as SignatureKey>::public_parameter(
                     stake_table,
-                    U256::from(CERT::threshold(membership, epoch)),
+                    U256::from(threshold),
                 );

             let real_qc_sig = <TYPES::SignatureKey as SignatureKey>::assemble(