Skip to content

Commit

Permalink
Merge pull request #1786 from EspressoSystems/bf/remove-consensus-txns
Browse files Browse the repository at this point in the history
Remove `transactions` from `Consensus` struct
  • Loading branch information
bfish713 authored Sep 21, 2023
2 parents 6f85c10 + 6d8f64d commit 32d9577
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 55 deletions.
27 changes: 3 additions & 24 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ use crate::{
};
use async_compatibility_layer::{
art::{async_spawn, async_spawn_local},
async_primitives::{broadcast::BroadcastSender, subscribable_rwlock::SubscribableRwLock},
async_primitives::broadcast::BroadcastSender,
channel::UnboundedSender,
};
use async_lock::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
use async_trait::async_trait;
use commit::{Commitment, Committable};
use commit::Committable;
use custom_debug::Debug;
use hotshot_task::{
event_stream::{ChannelStream, EventStream},
Expand Down Expand Up @@ -83,7 +83,7 @@ use hotshot_types::{
};
use snafu::ResultExt;
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap},
marker::PhantomData,
num::NonZeroUsize,
sync::Arc,
Expand Down Expand Up @@ -132,11 +132,6 @@ pub struct SystemContextInner<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// a reference to the metrics that the implementor is using.
_metrics: Box<dyn Metrics>,

/// Transactions
/// (this is shared btwn hotshot and `Consensus`)
transactions:
Arc<SubscribableRwLock<HashMap<Commitment<TYPES::Transaction>, TYPES::Transaction>>>,

/// The hotstuff implementation
consensus: Arc<RwLock<Consensus<TYPES, I::Leaf>>>,

Expand Down Expand Up @@ -218,8 +213,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
state_map,
cur_view: start_view,
last_decided_view: anchored_leaf.get_view_number(),
transactions: Arc::default(),
seen_transactions: HashSet::new(),
saved_leaves,
saved_blocks,
// TODO this is incorrect
Expand All @@ -230,17 +223,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
invalid_qc: 0,
};
let consensus = Arc::new(RwLock::new(consensus));
let txns = consensus.read().await.get_transactions();

let inner: Arc<SystemContextInner<TYPES, I>> = Arc::new(SystemContextInner {
id: nonce,
channel_maps: I::new_channel_maps(start_view),
consensus,
transactions: txns,
public_key,
private_key,
config,
// networking,
storage,
exchanges: Arc::new(exchanges),
event_sender: RwLock::default(),
Expand Down Expand Up @@ -545,11 +535,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
/// [`HotShot`] implementations that depend on [`TYPES::ConsensusType`].
#[async_trait]
pub trait HotShotType<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// Get the [`transactions`] field of [`HotShot`].
fn transactions(
&self,
) -> &Arc<SubscribableRwLock<HashMap<Commitment<TYPES::Transaction>, TYPES::Transaction>>>;

/// Get the [`hotstuff`] field of [`HotShot`].
fn consensus(&self) -> &Arc<RwLock<Consensus<TYPES, I::Leaf>>>;

Expand Down Expand Up @@ -683,12 +668,6 @@ where
Membership = MEMBERSHIP,
> + 'static,
{
fn transactions(
&self,
) -> &Arc<SubscribableRwLock<HashMap<Commitment<TYPES::Transaction>, TYPES::Transaction>>> {
&self.inner.transactions
}

fn consensus(&self) -> &Arc<RwLock<Consensus<TYPES, I::Leaf>>> {
&self.inner.consensus
}
Expand Down
9 changes: 8 additions & 1 deletion crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ use hotshot_types::{
vote::{ViewSyncData, VoteType},
};
use serde::Serialize;
use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
marker::PhantomData,
sync::Arc,
time::Duration,
};

/// event for global event stream
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -441,6 +446,8 @@ where
registry: registry.clone(),
api: c_api.clone(),
consensus: handle.hotshot.get_consensus(),
transactions: Arc::default(),
seen_transactions: HashSet::new(),
cur_view: TYPES::Time::new(0),
committee_exchange: committee_exchange.into(),
event_stream: event_stream.clone(),
Expand Down
41 changes: 26 additions & 15 deletions crates/task-impls/src/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::events::SequencingHotShotEvent;
use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock;
use async_compatibility_layer::{
art::async_timeout, async_primitives::subscribable_rwlock::ReadView,
};
use async_lock::RwLock;
use bincode::config::Options;
use commit::Commitment;
use commit::Committable;
use either::{Either, Left, Right};
use hotshot_task::{
Expand All @@ -26,9 +28,16 @@ use hotshot_types::{
};
use hotshot_utils::bincode::bincode_opts;
use snafu::Snafu;
use std::{collections::HashSet, sync::Arc, time::Instant};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Instant,
};
use tracing::{debug, error, instrument, warn};

/// A type alias for `HashMap<Commitment<T>, T>`
type CommitmentMap<T> = HashMap<Commitment<T>, T>;

#[derive(Snafu, Debug)]
/// Error type for consensus tasks
pub struct ConsensusTaskError {}
Expand Down Expand Up @@ -61,6 +70,12 @@ pub struct TransactionTaskState<
/// Reference to consensus. Leader will require a read lock on this.
pub consensus: Arc<RwLock<Consensus<TYPES, SequencingLeaf<TYPES>>>>,

/// A list of undecided transactions
pub transactions: Arc<SubscribableRwLock<CommitmentMap<TYPES::Transaction>>>,

/// A list of transactions we've seen decided, but didn't receive
pub seen_transactions: HashSet<Commitment<TYPES::Transaction>>,

/// the committee exchange
pub committee_exchange: Arc<CommitteeEx<TYPES, I>>,

Expand Down Expand Up @@ -97,15 +112,14 @@ where
) -> Option<HotShotTaskCompleted> {
match event {
SequencingHotShotEvent::TransactionsRecv(transactions) => {
let mut consensus = self.consensus.write().await;
consensus
.get_transactions()
let consensus = self.consensus.read().await;
self.transactions
.modify(|txns| {
for transaction in transactions {
let size = bincode_opts().serialized_size(&transaction).unwrap_or(0);

// If we didn't already know about this transaction, update our mempool metrics.
if !consensus.seen_transactions.remove(&transaction.commit())
if !self.seen_transactions.remove(&transaction.commit())
&& txns.insert(transaction.commit(), transaction).is_none()
{
consensus.metrics.outstanding_transactions.update(1);
Expand Down Expand Up @@ -138,17 +152,16 @@ where
Right(_) => {}
}
}
let mut consensus = self.consensus.write().await;
let txns = consensus.transactions.cloned().await;
let consensus = self.consensus.read().await;
let txns = self.transactions.cloned().await;

let _ = included_txns.iter().map(|hash| {
if !txns.contains_key(hash) {
consensus.seen_transactions.insert(*hash);
self.seen_transactions.insert(*hash);
}
});
drop(txns);
consensus
.transactions
self.transactions
.modify(|txns| {
*txns = txns
.drain()
Expand Down Expand Up @@ -255,12 +268,10 @@ where
Either::Right(_commitment) => HashSet::new(),
};

let consensus = self.consensus.read().await;

let receiver = consensus.transactions.subscribe().await;
let receiver = self.transactions.subscribe().await;

loop {
let all_txns = consensus.transactions.cloned().await;
let all_txns = self.transactions.cloned().await;
debug!("Size of transactions: {}", all_txns.len());
let unclaimed_txns: Vec<_> = all_txns
.iter()
Expand Down Expand Up @@ -290,7 +301,7 @@ where
}
break;
}
let all_txns = consensus.transactions.cloned().await;
let all_txns = self.transactions.cloned().await;
let txns: Vec<TYPES::Transaction> = all_txns
.iter()
.filter_map(|(txn_hash, txn)| {
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/tests/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn test_catchup_one_node() {
..Default::default()
};
// only alow for the view which the catchup node hasn't started to fail
metadata.overall_safety_properties.num_failed_views = 1;
metadata.overall_safety_properties.num_failed_views = 5;

metadata
.gen_launcher::<SequencingTestTypes, SequencingMemoryImpl>()
Expand Down
14 changes: 0 additions & 14 deletions crates/types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
pub use crate::traits::node_implementation::ViewQueue;
pub use crate::utils::{View, ViewInner};
use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock;
use std::collections::HashSet;

use crate::utils::Terminator;
use crate::{
Expand Down Expand Up @@ -41,12 +39,6 @@ pub struct Consensus<TYPES: NodeType, LEAF: LeafType<NodeType = TYPES>> {
/// last view had a successful decide event
pub last_decided_view: TYPES::Time,

/// A list of undecided transactions
pub transactions: Arc<SubscribableRwLock<CommitmentMap<TYPES::Transaction>>>,

/// A list of transactions we've seen decided, but didn't receive
pub seen_transactions: HashSet<Commitment<TYPES::Transaction>>,

/// Map of leaf hash -> leaf
/// - contains undecided leaves
/// - includes the MOST RECENT decided leaf
Expand Down Expand Up @@ -264,12 +256,6 @@ impl<TYPES: NodeType, LEAF: LeafType<NodeType = TYPES>> Consensus<TYPES, LEAF> {
self.state_map = self.state_map.split_off(&new_anchor_view);
}

/// return a clone of the internal storage of unclaimed transactions
#[must_use]
pub fn get_transactions(&self) -> Arc<SubscribableRwLock<CommitmentMap<TYPES::Transaction>>> {
self.transactions.clone()
}

/// Gets the last decided state
/// # Panics
/// if the last decided view's state does not exist in the state map
Expand Down

0 comments on commit 32d9577

Please sign in to comment.