From a566cb66fdffd877c951b85ace3232bc5c3af8ba Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 20 Sep 2023 12:22:17 -0400 Subject: [PATCH 1/3] remove transactions from consensus struct --- crates/hotshot/src/lib.rs | 21 -------------- crates/hotshot/src/tasks/mod.rs | 4 ++- crates/task-impls/src/transactions.rs | 41 +++++++++++++++++---------- crates/types/src/consensus.rs | 14 --------- 4 files changed, 29 insertions(+), 51 deletions(-) diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index b09ea6235d..38c05e19be 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -133,11 +133,6 @@ pub struct SystemContextInner> { /// a reference to the metrics that the implementor is using. _metrics: Box, - /// Transactions - /// (this is shared btwn hotshot and `Consensus`) - transactions: - Arc, TYPES::Transaction>>>, - /// The hotstuff implementation consensus: Arc>>, @@ -219,8 +214,6 @@ impl> SystemContext { state_map, cur_view: start_view, last_decided_view: anchored_leaf.get_view_number(), - transactions: Arc::default(), - seen_transactions: HashSet::new(), saved_leaves, saved_blocks, // TODO this is incorrect @@ -231,17 +224,14 @@ impl> SystemContext { invalid_qc: 0, }; let consensus = Arc::new(RwLock::new(consensus)); - let txns = consensus.read().await.get_transactions(); let inner: Arc> = Arc::new(SystemContextInner { id: nonce, channel_maps: I::new_channel_maps(start_view), consensus, - transactions: txns, public_key, private_key, config, - // networking, storage, exchanges: Arc::new(exchanges), event_sender: RwLock::default(), @@ -546,11 +536,6 @@ impl> SystemContext { /// [`HotShot`] implementations that depend on [`TYPES::ConsensusType`]. #[async_trait] pub trait HotShotType> { - /// Get the [`transactions`] field of [`HotShot`]. - fn transactions( - &self, - ) -> &Arc, TYPES::Transaction>>>; - /// Get the [`hotstuff`] field of [`HotShot`]. fn consensus(&self) -> &Arc>>; @@ -684,12 +669,6 @@ where Membership = MEMBERSHIP, > + 'static, { - fn transactions( - &self, - ) -> &Arc, TYPES::Transaction>>> { - &self.inner.transactions - } - fn consensus(&self) -> &Arc>> { &self.inner.consensus } diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index 619fdb6d59..a2e68528f4 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -43,7 +43,7 @@ use hotshot_types::{ vote::{ViewSyncData, VoteType}, }; use serde::Serialize; -use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration}; +use std::{collections::{HashMap, HashSet}, marker::PhantomData, sync::Arc, time::Duration}; /// event for global event stream #[derive(Clone, Debug)] @@ -441,6 +441,8 @@ where registry: registry.clone(), api: c_api.clone(), consensus: handle.hotshot.get_consensus(), + transactions: Arc::default(), + seen_transactions: HashSet::new(), cur_view: TYPES::Time::new(0), committee_exchange: committee_exchange.into(), event_stream: event_stream.clone(), diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs index 8609708653..b41604376d 100644 --- a/crates/task-impls/src/transactions.rs +++ b/crates/task-impls/src/transactions.rs @@ -1,9 +1,11 @@ use crate::events::SequencingHotShotEvent; +use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock; use async_compatibility_layer::{ art::async_timeout, async_primitives::subscribable_rwlock::ReadView, }; use async_lock::RwLock; use bincode::config::Options; +use commit::Commitment; use commit::Committable; use either::{Either, Left, Right}; use hotshot_task::{ @@ -26,9 +28,16 @@ use hotshot_types::{ }; use hotshot_utils::bincode::bincode_opts; use snafu::Snafu; -use std::{collections::HashSet, sync::Arc, time::Instant}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Instant, +}; use tracing::{debug, error, instrument, warn}; +/// A type alias for `HashMap, T>` +type CommitmentMap = HashMap, T>; + #[derive(Snafu, Debug)] /// Error type for consensus tasks pub struct ConsensusTaskError {} @@ -61,6 +70,12 @@ pub struct TransactionTaskState< /// Reference to consensus. Leader will require a read lock on this. pub consensus: Arc>>>, + /// A list of undecided transactions + pub transactions: Arc>>, + + /// A list of transactions we've seen decided, but didn't receive + pub seen_transactions: HashSet>, + /// the committee exchange pub committee_exchange: Arc>, @@ -97,15 +112,14 @@ where ) -> Option { match event { SequencingHotShotEvent::TransactionsRecv(transactions) => { - let mut consensus = self.consensus.write().await; - consensus - .get_transactions() + let consensus = self.consensus.read().await; + self.transactions .modify(|txns| { for transaction in transactions { let size = bincode_opts().serialized_size(&transaction).unwrap_or(0); // If we didn't already know about this transaction, update our mempool metrics. - if !consensus.seen_transactions.remove(&transaction.commit()) + if !self.seen_transactions.remove(&transaction.commit()) && txns.insert(transaction.commit(), transaction).is_none() { consensus.metrics.outstanding_transactions.update(1); @@ -138,17 +152,16 @@ where Right(_) => {} } } - let mut consensus = self.consensus.write().await; - let txns = consensus.transactions.cloned().await; + let consensus = self.consensus.read().await; + let txns = self.transactions.cloned().await; let _ = included_txns.iter().map(|hash| { if !txns.contains_key(hash) { - consensus.seen_transactions.insert(*hash); + self.seen_transactions.insert(*hash); } }); drop(txns); - consensus - .transactions + self.transactions .modify(|txns| { *txns = txns .drain() @@ -255,12 +268,10 @@ where Either::Right(_commitment) => HashSet::new(), }; - let consensus = self.consensus.read().await; - - let receiver = consensus.transactions.subscribe().await; + let receiver = self.transactions.subscribe().await; loop { - let all_txns = consensus.transactions.cloned().await; + let all_txns = self.transactions.cloned().await; debug!("Size of transactions: {}", all_txns.len()); let unclaimed_txns: Vec<_> = all_txns .iter() @@ -290,7 +301,7 @@ where } break; } - let all_txns = consensus.transactions.cloned().await; + let all_txns = self.transactions.cloned().await; let txns: Vec = all_txns .iter() .filter_map(|(txn_hash, txn)| { diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index a29c840056..ce27d7a547 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -2,8 +2,6 @@ pub use crate::traits::node_implementation::ViewQueue; pub use crate::utils::{View, ViewInner}; -use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock; -use std::collections::HashSet; use crate::utils::Terminator; use crate::{ @@ -41,12 +39,6 @@ pub struct Consensus> { /// last view had a successful decide event pub last_decided_view: TYPES::Time, - /// A list of undecided transactions - pub transactions: Arc>>, - - /// A list of transactions we've seen decided, but didn't receive - pub seen_transactions: HashSet>, - /// Map of leaf hash -> leaf /// - contains undecided leaves /// - includes the MOST RECENT decided leaf @@ -264,12 +256,6 @@ impl> Consensus { self.state_map = self.state_map.split_off(&new_anchor_view); } - /// return a clone of the internal storage of unclaimed transactions - #[must_use] - pub fn get_transactions(&self) -> Arc>> { - self.transactions.clone() - } - /// Gets the last decided state /// # Panics /// if the last decided view's state does not exist in the state map From fb690fb9e36b8453d5a75370849defc30725fb71 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 20 Sep 2023 12:44:33 -0400 Subject: [PATCH 2/3] linting --- crates/hotshot/src/lib.rs | 6 +++--- crates/hotshot/src/tasks/mod.rs | 7 ++++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index e543f22901..0f317c9410 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -42,12 +42,12 @@ use crate::{ }; use async_compatibility_layer::{ art::{async_spawn, async_spawn_local}, - async_primitives::{broadcast::BroadcastSender, subscribable_rwlock::SubscribableRwLock}, + async_primitives::broadcast::BroadcastSender, channel::UnboundedSender, }; use async_lock::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}; use async_trait::async_trait; -use commit::{Commitment, Committable}; +use commit::Committable; use custom_debug::Debug; use hotshot_task::{ event_stream::{ChannelStream, EventStream}, @@ -83,7 +83,7 @@ use hotshot_types::{ }; use snafu::ResultExt; use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashMap}, marker::PhantomData, num::NonZeroUsize, sync::Arc, diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index a2e68528f4..82ea383353 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -43,7 +43,12 @@ use hotshot_types::{ vote::{ViewSyncData, VoteType}, }; use serde::Serialize; -use std::{collections::{HashMap, HashSet}, marker::PhantomData, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + marker::PhantomData, + sync::Arc, + time::Duration, +}; /// event for global event stream #[derive(Clone, Debug)] From 6d8f64d1b349e21aa4d627e9c3af46c306b6f333 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Wed, 20 Sep 2023 15:10:51 -0400 Subject: [PATCH 3/3] allow more failures in web catchup test --- crates/testing/tests/catchup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/testing/tests/catchup.rs b/crates/testing/tests/catchup.rs index 9c6bd0bb38..01610f38f2 100644 --- a/crates/testing/tests/catchup.rs +++ b/crates/testing/tests/catchup.rs @@ -164,7 +164,7 @@ async fn test_catchup_one_node() { ..Default::default() }; // only alow for the view which the catchup node hasn't started to fail - metadata.overall_safety_properties.num_failed_views = 1; + metadata.overall_safety_properties.num_failed_views = 5; metadata .gen_launcher::()