Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove transactions from Consensus struct #1786

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 3 additions & 24 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ use crate::{
};
use async_compatibility_layer::{
art::{async_spawn, async_spawn_local},
async_primitives::{broadcast::BroadcastSender, subscribable_rwlock::SubscribableRwLock},
async_primitives::broadcast::BroadcastSender,
channel::UnboundedSender,
};
use async_lock::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
use async_trait::async_trait;
use commit::{Commitment, Committable};
use commit::Committable;
use custom_debug::Debug;
use hotshot_task::{
event_stream::{ChannelStream, EventStream},
Expand Down Expand Up @@ -83,7 +83,7 @@ use hotshot_types::{
};
use snafu::ResultExt;
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap},
marker::PhantomData,
num::NonZeroUsize,
sync::Arc,
Expand Down Expand Up @@ -132,11 +132,6 @@ pub struct SystemContextInner<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// a reference to the metrics that the implementor is using.
_metrics: Box<dyn Metrics>,

/// Transactions
/// (this is shared btwn hotshot and `Consensus`)
transactions:
Arc<SubscribableRwLock<HashMap<Commitment<TYPES::Transaction>, TYPES::Transaction>>>,

/// The hotstuff implementation
consensus: Arc<RwLock<Consensus<TYPES, I::Leaf>>>,

Expand Down Expand Up @@ -218,8 +213,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
state_map,
cur_view: start_view,
last_decided_view: anchored_leaf.get_view_number(),
transactions: Arc::default(),
seen_transactions: HashSet::new(),
saved_leaves,
saved_blocks,
// TODO this is incorrect
Expand All @@ -230,17 +223,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
invalid_qc: 0,
};
let consensus = Arc::new(RwLock::new(consensus));
let txns = consensus.read().await.get_transactions();

let inner: Arc<SystemContextInner<TYPES, I>> = Arc::new(SystemContextInner {
id: nonce,
channel_maps: I::new_channel_maps(start_view),
consensus,
transactions: txns,
public_key,
private_key,
config,
// networking,
storage,
exchanges: Arc::new(exchanges),
event_sender: RwLock::default(),
Expand Down Expand Up @@ -545,11 +535,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
/// [`HotShot`] implementations that depend on [`TYPES::ConsensusType`].
#[async_trait]
pub trait HotShotType<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// Get the [`transactions`] field of [`HotShot`].
fn transactions(
&self,
) -> &Arc<SubscribableRwLock<HashMap<Commitment<TYPES::Transaction>, TYPES::Transaction>>>;

/// Get the [`hotstuff`] field of [`HotShot`].
fn consensus(&self) -> &Arc<RwLock<Consensus<TYPES, I::Leaf>>>;

Expand Down Expand Up @@ -683,12 +668,6 @@ where
Membership = MEMBERSHIP,
> + 'static,
{
fn transactions(
&self,
) -> &Arc<SubscribableRwLock<HashMap<Commitment<TYPES::Transaction>, TYPES::Transaction>>> {
&self.inner.transactions
}

fn consensus(&self) -> &Arc<RwLock<Consensus<TYPES, I::Leaf>>> {
&self.inner.consensus
}
Expand Down
9 changes: 8 additions & 1 deletion crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ use hotshot_types::{
vote::{ViewSyncData, VoteType},
};
use serde::Serialize;
use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
marker::PhantomData,
sync::Arc,
time::Duration,
};

/// event for global event stream
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -441,6 +446,8 @@ where
registry: registry.clone(),
api: c_api.clone(),
consensus: handle.hotshot.get_consensus(),
transactions: Arc::default(),
seen_transactions: HashSet::new(),
cur_view: TYPES::Time::new(0),
committee_exchange: committee_exchange.into(),
event_stream: event_stream.clone(),
Expand Down
41 changes: 26 additions & 15 deletions crates/task-impls/src/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::events::SequencingHotShotEvent;
use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock;
use async_compatibility_layer::{
art::async_timeout, async_primitives::subscribable_rwlock::ReadView,
};
use async_lock::RwLock;
use bincode::config::Options;
use commit::Commitment;
use commit::Committable;
use either::{Either, Left, Right};
use hotshot_task::{
Expand All @@ -26,9 +28,16 @@ use hotshot_types::{
};
use hotshot_utils::bincode::bincode_opts;
use snafu::Snafu;
use std::{collections::HashSet, sync::Arc, time::Instant};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Instant,
};
use tracing::{debug, error, instrument, warn};

/// A type alias for `HashMap<Commitment<T>, T>`
type CommitmentMap<T> = HashMap<Commitment<T>, T>;

#[derive(Snafu, Debug)]
/// Error type for consensus tasks
pub struct ConsensusTaskError {}
Expand Down Expand Up @@ -61,6 +70,12 @@ pub struct TransactionTaskState<
/// Reference to consensus. Leader will require a read lock on this.
pub consensus: Arc<RwLock<Consensus<TYPES, SequencingLeaf<TYPES>>>>,

/// A list of undecided transactions
pub transactions: Arc<SubscribableRwLock<CommitmentMap<TYPES::Transaction>>>,

/// A list of transactions we've seen decided, but didn't receive
pub seen_transactions: HashSet<Commitment<TYPES::Transaction>>,

/// the committee exchange
pub committee_exchange: Arc<CommitteeEx<TYPES, I>>,

Expand Down Expand Up @@ -97,15 +112,14 @@ where
) -> Option<HotShotTaskCompleted> {
match event {
SequencingHotShotEvent::TransactionsRecv(transactions) => {
let mut consensus = self.consensus.write().await;
consensus
.get_transactions()
let consensus = self.consensus.read().await;
self.transactions
.modify(|txns| {
for transaction in transactions {
let size = bincode_opts().serialized_size(&transaction).unwrap_or(0);

// If we didn't already know about this transaction, update our mempool metrics.
if !consensus.seen_transactions.remove(&transaction.commit())
if !self.seen_transactions.remove(&transaction.commit())
&& txns.insert(transaction.commit(), transaction).is_none()
{
consensus.metrics.outstanding_transactions.update(1);
Expand Down Expand Up @@ -138,17 +152,16 @@ where
Right(_) => {}
}
}
let mut consensus = self.consensus.write().await;
let txns = consensus.transactions.cloned().await;
let consensus = self.consensus.read().await;
let txns = self.transactions.cloned().await;

let _ = included_txns.iter().map(|hash| {
if !txns.contains_key(hash) {
consensus.seen_transactions.insert(*hash);
self.seen_transactions.insert(*hash);
}
});
drop(txns);
consensus
.transactions
self.transactions
.modify(|txns| {
*txns = txns
.drain()
Expand Down Expand Up @@ -255,12 +268,10 @@ where
Either::Right(_commitment) => HashSet::new(),
};

let consensus = self.consensus.read().await;

let receiver = consensus.transactions.subscribe().await;
let receiver = self.transactions.subscribe().await;

loop {
let all_txns = consensus.transactions.cloned().await;
let all_txns = self.transactions.cloned().await;
debug!("Size of transactions: {}", all_txns.len());
let unclaimed_txns: Vec<_> = all_txns
.iter()
Expand Down Expand Up @@ -290,7 +301,7 @@ where
}
break;
}
let all_txns = consensus.transactions.cloned().await;
let all_txns = self.transactions.cloned().await;
let txns: Vec<TYPES::Transaction> = all_txns
.iter()
.filter_map(|(txn_hash, txn)| {
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/tests/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn test_catchup_one_node() {
..Default::default()
};
// only alow for the view which the catchup node hasn't started to fail
metadata.overall_safety_properties.num_failed_views = 1;
metadata.overall_safety_properties.num_failed_views = 5;

metadata
.gen_launcher::<SequencingTestTypes, SequencingMemoryImpl>()
Expand Down
14 changes: 0 additions & 14 deletions crates/types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

pub use crate::traits::node_implementation::ViewQueue;
pub use crate::utils::{View, ViewInner};
use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock;
use std::collections::HashSet;

use crate::utils::Terminator;
use crate::{
Expand Down Expand Up @@ -41,12 +39,6 @@ pub struct Consensus<TYPES: NodeType, LEAF: LeafType<NodeType = TYPES>> {
/// last view had a successful decide event
pub last_decided_view: TYPES::Time,

/// A list of undecided transactions
pub transactions: Arc<SubscribableRwLock<CommitmentMap<TYPES::Transaction>>>,

/// A list of transactions we've seen decided, but didn't receive
pub seen_transactions: HashSet<Commitment<TYPES::Transaction>>,

/// Map of leaf hash -> leaf
/// - contains undecided leaves
/// - includes the MOST RECENT decided leaf
Expand Down Expand Up @@ -264,12 +256,6 @@ impl<TYPES: NodeType, LEAF: LeafType<NodeType = TYPES>> Consensus<TYPES, LEAF> {
self.state_map = self.state_map.split_off(&new_anchor_view);
}

/// return a clone of the internal storage of unclaimed transactions
#[must_use]
pub fn get_transactions(&self) -> Arc<SubscribableRwLock<CommitmentMap<TYPES::Transaction>>> {
self.transactions.clone()
}

/// Gets the last decided state
/// # Panics
/// if the last decided view's state does not exist in the state map
Expand Down