diff --git a/crates/constants/src/lib.rs b/crates/constants/src/lib.rs index 8a4d9d5d46..0b1b769650 100644 --- a/crates/constants/src/lib.rs +++ b/crates/constants/src/lib.rs @@ -8,3 +8,12 @@ pub const LOOK_AHEAD: u64 = 5; /// the default kademlia record republication interval (in seconds) pub const KAD_DEFAULT_REPUB_INTERVAL_SEC: u64 = 28800; + +/// the number of messages to cache in the combined network +pub const COMBINED_NETWORK_CACHE_SIZE: usize = 1000; + +/// the number of messages to attempt to send over the primary network before switching to prefer the secondary network +pub const COMBINED_NETWORK_MIN_PRIMARY_FAILURES: u64 = 10; + +/// the number of messages to send over the secondary network before re-attempting the (presumed down) primary network +pub const COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL: u64 = 10; diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index a0e56cf86d..8de2e3fc40 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -13,9 +13,9 @@ pub use storage::{Result as StorageResult, Storage}; pub mod implementations { pub use super::{ networking::{ + combined_network::{CombinedCommChannel, CombinedNetworks}, libp2p_network::{Libp2pCommChannel, Libp2pNetwork, PeerInfoVec}, memory_network::{DummyReliability, MasterMap, MemoryCommChannel, MemoryNetwork}, - web_server_libp2p_fallback::{CombinedNetworks, WebServerWithFallbackCommChannel}, web_server_network::{WebCommChannel, WebServerNetwork}, }, storage::memory_storage::MemoryStorage, // atomic_storage::AtomicStorage, diff --git a/crates/hotshot/src/traits/networking.rs b/crates/hotshot/src/traits/networking.rs index e48e71d29b..357e7a4da3 100644 --- a/crates/hotshot/src/traits/networking.rs +++ b/crates/hotshot/src/traits/networking.rs @@ -5,9 +5,9 @@ //! - [`MemoryNetwork`](memory_network::MemoryNetwork), an in memory testing-only implementation //! - [`Libp2pNetwork`](libp2p_network::Libp2pNetwork), a production-ready networking impelmentation built on top of libp2p-rs. +pub mod combined_network; pub mod libp2p_network; pub mod memory_network; -pub mod web_server_libp2p_fallback; pub mod web_server_network; pub use hotshot_types::traits::network::{ diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs new file mode 100644 index 0000000000..63fdb6df88 --- /dev/null +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -0,0 +1,475 @@ +//! Networking Implementation that has a primary and a fallback newtork. If the primary +//! Errors we will use the backup to send or receive +use super::NetworkError; +use crate::{ + traits::implementations::{Libp2pNetwork, WebServerNetwork}, + NodeImplementation, +}; +use async_lock::RwLock; +use hotshot_constants::{ + COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_MIN_PRIMARY_FAILURES, + COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL, +}; +use std::{ + collections::HashSet, + hash::Hasher, + sync::atomic::{AtomicU64, Ordering}, +}; +use tracing::error; + +use async_trait::async_trait; + +use futures::join; + +use async_compatibility_layer::channel::UnboundedSendError; +use hotshot_task::{boxed_sync, BoxSyncFuture}; +use hotshot_types::{ + data::ViewNumber, + message::Message, + traits::{ + election::Membership, + network::{ + CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, + TestableChannelImplementation, TestableNetworkingImplementation, TransmitType, + ViewMessage, + }, + node_implementation::NodeType, + }, +}; +use std::{collections::hash_map::DefaultHasher, marker::PhantomData, sync::Arc}; + +use std::hash::Hash; + +/// A cache to keep track of the last n messages we've seen, avoids reprocessing duplicates +/// from multiple networks +#[derive(Clone, Debug)] +struct Cache { + /// The maximum number of items to store in the cache + capacity: usize, + /// The cache itself + cache: HashSet, + /// The hashes of the messages in the cache, in order of insertion + hashes: Vec, +} + +impl Cache { + /// Create a new cache with the given capacity + fn new(capacity: usize) -> Self { + Self { + capacity, + cache: HashSet::with_capacity(capacity), + hashes: Vec::with_capacity(capacity), + } + } + + /// Insert a hash into the cache + fn insert(&mut self, hash: u64) { + if self.cache.contains(&hash) { + return; + } + + // calculate how much we are over and remove that many elements from the cache. deal with overflow + let over = (self.hashes.len() + 1).saturating_sub(self.capacity); + if over > 0 { + for _ in 0..over { + let hash = self.hashes.remove(0); + self.cache.remove(&hash); + } + } + + self.cache.insert(hash); + self.hashes.push(hash); + } + + /// Check if the cache contains a hash + fn contains(&self, hash: u64) -> bool { + self.cache.contains(&hash) + } + + /// Get the number of items in the cache + #[cfg(test)] + fn len(&self) -> usize { + self.cache.len() + } +} + +/// Helper function to calculate a hash of a type that implements Hash +fn calculate_hash_of(t: &T) -> u64 { + let mut s = DefaultHasher::new(); + t.hash(&mut s); + s.finish() +} + +/// A communication channel with 2 networks, where we can fall back to the slower network if the +/// primary fails +#[derive(Clone, Debug)] +pub struct CombinedCommChannel< + TYPES: NodeType, + I: NodeImplementation, + MEMBERSHIP: Membership, +> { + /// The two networks we'll use for send/recv + networks: Arc>, + + /// Last n seen messages to prevent processing duplicates + message_cache: Arc>, + + /// If the primary network is down (0) or not, and for how many messages + primary_down: Arc, +} + +impl, MEMBERSHIP: Membership> + CombinedCommChannel +{ + /// Constructor + #[must_use] + pub fn new(networks: Arc>) -> Self { + Self { + networks, + message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), + primary_down: Arc::new(AtomicU64::new(0)), + } + } + + /// Get a ref to the primary network + #[must_use] + pub fn primary(&self) -> &WebServerNetwork, TYPES::SignatureKey, TYPES> { + &self.networks.0 + } + + /// Get a ref to the backup network + #[must_use] + pub fn secondary(&self) -> &Libp2pNetwork, TYPES::SignatureKey> { + &self.networks.1 + } +} + +/// Wrapper for the tuple of `WebServerNetwork` and `Libp2pNetwork` +/// We need this so we can impl `TestableNetworkingImplementation` +/// on the tuple +#[derive(Debug, Clone)] +pub struct CombinedNetworks< + TYPES: NodeType, + I: NodeImplementation, + MEMBERSHIP: Membership, +>( + pub WebServerNetwork, TYPES::SignatureKey, TYPES>, + pub Libp2pNetwork, TYPES::SignatureKey>, + pub PhantomData, +); + +impl, MEMBERSHIP: Membership> + TestableNetworkingImplementation> + for CombinedNetworks +{ + fn generator( + expected_node_count: usize, + num_bootstrap: usize, + network_id: usize, + da_committee_size: usize, + is_da: bool, + ) -> Box Self + 'static> { + let generators = ( + , + TYPES::SignatureKey, + TYPES, + > as TestableNetworkingImplementation<_, _>>::generator( + expected_node_count, + num_bootstrap, + network_id, + da_committee_size, + is_da + ), + , TYPES::SignatureKey> as TestableNetworkingImplementation<_, _>>::generator( + expected_node_count, + num_bootstrap, + network_id, + da_committee_size, + is_da + ) + ); + Box::new(move |node_id| { + CombinedNetworks(generators.0(node_id), generators.1(node_id), PhantomData) + }) + } + + /// Get the number of messages in-flight. + /// + /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`. + fn in_flight_message_count(&self) -> Option { + None + } +} + +impl, MEMBERSHIP: Membership> + TestableNetworkingImplementation> + for CombinedCommChannel +{ + fn generator( + expected_node_count: usize, + num_bootstrap: usize, + network_id: usize, + da_committee_size: usize, + is_da: bool, + ) -> Box Self + 'static> { + let generator = as TestableNetworkingImplementation<_, _>>::generator( + expected_node_count, + num_bootstrap, + network_id, + da_committee_size, + is_da + ); + Box::new(move |node_id| Self { + networks: generator(node_id).into(), + message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), + primary_down: Arc::new(AtomicU64::new(0)), + }) + } + + /// Get the number of messages in-flight. + /// + /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`. + fn in_flight_message_count(&self) -> Option { + None + } +} + +#[async_trait] +impl, MEMBERSHIP: Membership> + CommunicationChannel, MEMBERSHIP> + for CombinedCommChannel +{ + type NETWORK = CombinedNetworks; + + async fn wait_for_ready(&self) { + join!( + self.primary().wait_for_ready(), + self.secondary().wait_for_ready() + ); + } + + async fn is_ready(&self) -> bool { + self.primary().is_ready().await && self.secondary().is_ready().await + } + + fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> + where + 'a: 'b, + Self: 'b, + { + let closure = async move { + join!(self.primary().shut_down(), self.secondary().shut_down()); + }; + boxed_sync(closure) + } + + async fn broadcast_message( + &self, + message: Message, + election: &MEMBERSHIP, + ) -> Result<(), NetworkError> { + let recipients = + >::get_committee(election, message.get_view_number()); + + // broadcast optimistically on both networks, but if the primary network is down, skip it + if self.primary_down.load(Ordering::Relaxed) < COMBINED_NETWORK_MIN_PRIMARY_FAILURES + || self.primary_down.load(Ordering::Relaxed) % COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL + == 0 + { + // broadcast on the primary network as it is not down, or we are checking if it is back up + match self + .primary() + .broadcast_message(message.clone(), recipients.clone()) + .await + { + Ok(_) => { + self.primary_down.store(0, Ordering::Relaxed); + } + Err(e) => { + error!("Error on primary network: {}", e); + self.primary_down.fetch_add(1, Ordering::Relaxed); + } + }; + } + + self.secondary() + .broadcast_message(message, recipients) + .await + } + + async fn direct_message( + &self, + message: Message, + recipient: TYPES::SignatureKey, + ) -> Result<(), NetworkError> { + // DM optimistically on both networks, but if the primary network is down, skip it + if self.primary_down.load(Ordering::Relaxed) < COMBINED_NETWORK_MIN_PRIMARY_FAILURES + || self.primary_down.load(Ordering::Relaxed) % COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL + == 0 + { + // message on the primary network as it is not down, or we are checking if it is back up + match self + .primary() + .direct_message(message.clone(), recipient.clone()) + .await + { + Ok(_) => { + self.primary_down.store(0, Ordering::Relaxed); + } + Err(e) => { + error!("Error on primary network: {}", e); + self.primary_down.fetch_add(1, Ordering::Relaxed); + } + }; + } + + self.secondary().direct_message(message, recipient).await + } + + fn recv_msgs<'a, 'b>( + &'a self, + transmit_type: TransmitType, + ) -> BoxSyncFuture<'b, Result>, NetworkError>> + where + 'a: 'b, + Self: 'b, + { + // recv on both networks because nodes may be accessible only on either. discard duplicates + let closure = async move { + let mut primary_msgs = self.primary().recv_msgs(transmit_type).await?; + let mut secondary_msgs = self.secondary().recv_msgs(transmit_type).await?; + + primary_msgs.append(secondary_msgs.as_mut()); + + let mut filtered_msgs = Vec::with_capacity(primary_msgs.len()); + for msg in primary_msgs { + if !self + .message_cache + .read() + .await + .contains(calculate_hash_of(&msg)) + { + filtered_msgs.push(msg.clone()); + self.message_cache + .write() + .await + .insert(calculate_hash_of(&msg)); + } + } + + Ok(filtered_msgs) + }; + + boxed_sync(closure) + } + + async fn queue_node_lookup( + &self, + view_number: ViewNumber, + pk: TYPES::SignatureKey, + ) -> Result<(), UnboundedSendError>> { + self.primary() + .queue_node_lookup(view_number, pk.clone()) + .await?; + self.secondary().queue_node_lookup(view_number, pk).await + } + + async fn inject_consensus_info(&self, event: ConsensusIntentEvent) { + as ConnectedNetwork,TYPES::SignatureKey>>:: + inject_consensus_info(self.primary(), event.clone()).await; + + as ConnectedNetwork,TYPES::SignatureKey>>:: + inject_consensus_info(self.secondary(), event).await; + } +} + +impl, MEMBERSHIP: Membership> + TestableChannelImplementation< + TYPES, + Message, + MEMBERSHIP, + CombinedNetworks, + > for CombinedCommChannel +{ + fn generate_network() -> Box) -> Self + 'static> { + Box::new(move |network| CombinedCommChannel::new(network)) + } +} + +#[cfg(test)] +mod test { + use hotshot_types::block_impl::VIDTransaction; + + use super::*; + use tracing::instrument; + + /// cache eviction test + #[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) + )] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + #[instrument] + async fn test_cache_eviction() { + let mut cache = Cache::new(3); + cache.insert(1); + cache.insert(2); + cache.insert(3); + cache.insert(4); + assert_eq!(cache.cache.len(), 3); + assert_eq!(cache.hashes.len(), 3); + assert!(!cache.cache.contains(&1)); + assert!(cache.cache.contains(&2)); + assert!(cache.cache.contains(&3)); + assert!(cache.cache.contains(&4)); + assert!(!cache.hashes.contains(&1)); + assert!(cache.hashes.contains(&2)); + assert!(cache.hashes.contains(&3)); + assert!(cache.hashes.contains(&4)); + } + + #[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) + )] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + #[instrument] + async fn test_hash_calculation() { + let message1 = VIDTransaction(vec![0; 32]); + let message2 = VIDTransaction(vec![1; 32]); + + assert_eq!(calculate_hash_of(&message1), calculate_hash_of(&message1)); + assert_ne!(calculate_hash_of(&message1), calculate_hash_of(&message2)); + } + + #[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) + )] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + #[instrument] + async fn test_cache_integrity() { + let message1 = VIDTransaction(vec![0; 32]); + let message2 = VIDTransaction(vec![1; 32]); + + let mut cache = Cache::new(3); + + // test insertion integrity + cache.insert(calculate_hash_of(&message1)); + cache.insert(calculate_hash_of(&message2)); + + assert!(cache.contains(calculate_hash_of(&message1))); + assert!(cache.contains(calculate_hash_of(&message2))); + + // check that the cache is not modified on duplicate entries + cache.insert(calculate_hash_of(&message1)); + assert!(cache.contains(calculate_hash_of(&message1))); + assert!(cache.contains(calculate_hash_of(&message2))); + assert_eq!(cache.len(), 2); + } +} diff --git a/crates/hotshot/src/traits/networking/web_server_libp2p_fallback.rs b/crates/hotshot/src/traits/networking/web_server_libp2p_fallback.rs deleted file mode 100644 index b3324681a5..0000000000 --- a/crates/hotshot/src/traits/networking/web_server_libp2p_fallback.rs +++ /dev/null @@ -1,300 +0,0 @@ -//! Networking Implementation that has a primary and a fallback newtork. If the primary -//! Errors we will use the backup to send or receive -use super::NetworkError; -use crate::{ - traits::implementations::{Libp2pNetwork, WebServerNetwork}, - NodeImplementation, -}; - -use async_trait::async_trait; - -use futures::join; - -use async_compatibility_layer::channel::UnboundedSendError; -use hotshot_task::{boxed_sync, BoxSyncFuture}; -use hotshot_types::{ - data::ViewNumber, - message::Message, - traits::{ - election::Membership, - network::{ - CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, - TestableChannelImplementation, TestableNetworkingImplementation, TransmitType, - ViewMessage, - }, - node_implementation::NodeType, - }, -}; -use std::{marker::PhantomData, sync::Arc}; -use tracing::error; -/// A communication channel with 2 networks, where we can fall back to the slower network if the -/// primary fails -#[derive(Clone, Debug)] -pub struct WebServerWithFallbackCommChannel< - TYPES: NodeType, - I: NodeImplementation, - MEMBERSHIP: Membership, -> { - /// The two networks we'll use for send/recv - networks: Arc>, -} - -impl, MEMBERSHIP: Membership> - WebServerWithFallbackCommChannel -{ - /// Constructor - #[must_use] - pub fn new(networks: Arc>) -> Self { - Self { networks } - } - - /// Get a ref to the primary network - #[must_use] - pub fn network(&self) -> &WebServerNetwork, TYPES::SignatureKey, TYPES> { - &self.networks.0 - } - - /// Get a ref to the backup network - #[must_use] - pub fn fallback(&self) -> &Libp2pNetwork, TYPES::SignatureKey> { - &self.networks.1 - } -} - -/// Wrapper for the tuple of `WebServerNetwork` and `Libp2pNetwork` -/// We need this so we can impl `TestableNetworkingImplementation` -/// on the tuple -#[derive(Debug, Clone)] -pub struct CombinedNetworks< - TYPES: NodeType, - I: NodeImplementation, - MEMBERSHIP: Membership, ->( - pub WebServerNetwork, TYPES::SignatureKey, TYPES>, - pub Libp2pNetwork, TYPES::SignatureKey>, - pub PhantomData, -); - -impl, MEMBERSHIP: Membership> - TestableNetworkingImplementation> - for CombinedNetworks -{ - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - ) -> Box Self + 'static> { - let generators = ( - , - TYPES::SignatureKey, - TYPES, - > as TestableNetworkingImplementation<_, _>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da - ), - , TYPES::SignatureKey> as TestableNetworkingImplementation<_, _>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da - ) - ); - Box::new(move |node_id| { - CombinedNetworks(generators.0(node_id), generators.1(node_id), PhantomData) - }) - } - - /// Get the number of messages in-flight. - /// - /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`. - fn in_flight_message_count(&self) -> Option { - None - } -} - -impl, MEMBERSHIP: Membership> - TestableNetworkingImplementation> - for WebServerWithFallbackCommChannel -{ - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - ) -> Box Self + 'static> { - let generator = as TestableNetworkingImplementation<_, _>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da - ); - Box::new(move |node_id| Self { - networks: generator(node_id).into(), - }) - } - - /// Get the number of messages in-flight. - /// - /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`. - fn in_flight_message_count(&self) -> Option { - None - } -} - -#[async_trait] -impl, MEMBERSHIP: Membership> - CommunicationChannel, MEMBERSHIP> - for WebServerWithFallbackCommChannel -{ - type NETWORK = CombinedNetworks; - - async fn wait_for_ready(&self) { - join!( - self.network().wait_for_ready(), - self.fallback().wait_for_ready() - ); - } - - async fn is_ready(&self) -> bool { - self.network().is_ready().await && self.fallback().is_ready().await - } - - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - join!(self.network().shut_down(), self.fallback().shut_down()); - }; - boxed_sync(closure) - } - - async fn broadcast_message( - &self, - message: Message, - election: &MEMBERSHIP, - ) -> Result<(), NetworkError> { - let recipients = - >::get_committee(election, message.get_view_number()); - let fallback = self - .fallback() - .broadcast_message(message.clone(), recipients.clone()); - let network = self.network().broadcast_message(message, recipients); - match join!(fallback, network) { - (Err(e1), Err(e2)) => { - error!( - "Both network broadcasts failed primary error: {}, fallback error: {}", - e1, e2 - ); - Err(e1) - } - (Err(e), _) => { - error!("Failed primary broadcast with error: {}", e); - Ok(()) - } - (_, Err(e)) => { - error!("Failed backup broadcast with error: {}", e); - Ok(()) - } - _ => Ok(()), - } - } - - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError> { - match self - .network() - .direct_message(message.clone(), recipient.clone()) - .await - { - Ok(_) => Ok(()), - Err(e) => { - error!( - "Falling back on direct message, error on primary network: {}", - e - ); - self.fallback().direct_message(message, recipient).await - } - } - } - - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - match self.network().recv_msgs(transmit_type).await { - Ok(msgs) => Ok(msgs), - Err(e) => { - error!( - "Falling back on recv message, error on primary network: {}", - e - ); - self.fallback().recv_msgs(transmit_type).await - } - } - }; - boxed_sync(closure) - } - - async fn queue_node_lookup( - &self, - view_number: ViewNumber, - pk: TYPES::SignatureKey, - ) -> Result<(), UnboundedSendError>> { - self.network() - .queue_node_lookup(view_number, pk.clone()) - .await?; - self.fallback().queue_node_lookup(view_number, pk).await?; - - Ok(()) - } - - async fn inject_consensus_info(&self, event: ConsensusIntentEvent) { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::inject_consensus_info(self.network(), event.clone()) - .await; - - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::inject_consensus_info(self.fallback(), event) - .await; - } -} - -impl, MEMBERSHIP: Membership> - TestableChannelImplementation< - TYPES, - Message, - MEMBERSHIP, - CombinedNetworks, - > for WebServerWithFallbackCommChannel -{ - fn generate_network() -> Box) -> Self + 'static> { - Box::new(move |network| WebServerWithFallbackCommChannel::new(network)) - } -} diff --git a/crates/testing/src/node_types.rs b/crates/testing/src/node_types.rs index 4ffd74e2bf..7e5308b35f 100644 --- a/crates/testing/src/node_types.rs +++ b/crates/testing/src/node_types.rs @@ -6,8 +6,8 @@ use hotshot::{ traits::{ election::static_committee::{StaticCommittee, StaticElectionConfig, StaticVoteToken}, implementations::{ - Libp2pCommChannel, Libp2pNetwork, MemoryCommChannel, MemoryNetwork, MemoryStorage, - WebCommChannel, WebServerNetwork, WebServerWithFallbackCommChannel, + CombinedCommChannel, Libp2pCommChannel, Libp2pNetwork, MemoryCommChannel, + MemoryNetwork, MemoryStorage, WebCommChannel, WebServerNetwork, }, NodeImplementation, }, @@ -60,7 +60,7 @@ pub struct SequencingLibp2pImpl; pub struct SequencingWebImpl; #[derive(Clone, Debug, Deserialize, Serialize, Hash, Eq, PartialEq)] -pub struct StaticFallbackImpl; +pub struct SequencingCombinedImpl; pub type StaticMembership = StaticCommittee>; @@ -73,8 +73,8 @@ type StaticLibp2pDAComm = type StaticWebDAComm = WebCommChannel; -type StaticFallbackComm = - WebServerWithFallbackCommChannel; +type StaticCombinedDAComm = + CombinedCommChannel; pub type StaticMemoryQuorumComm = MemoryCommChannel; @@ -84,6 +84,9 @@ type StaticLibp2pQuorumComm = type StaticWebQuorumComm = WebCommChannel; +type StaticCombinedQuorumComm = + CombinedCommChannel; + pub type StaticMemoryViewSyncComm = MemoryCommChannel; @@ -93,6 +96,9 @@ type StaticLibp2pViewSyncComm = type StaticWebViewSyncComm = WebCommChannel; +type StaticCombinedViewSyncComm = + CombinedCommChannel; + pub type SequencingLibp2pExchange = SequencingExchanges< SequencingTestTypes, Message, @@ -471,38 +477,57 @@ impl NodeImplementation for SequencingWebImpl { } } -pub type SequencingFallbackExchange = SequencingExchanges< +pub type SequencingCombinedExchange = SequencingExchanges< SequencingTestTypes, - Message, + Message, QuorumExchange< SequencingTestTypes, - >::Leaf, + >::Leaf, QuorumProposal>, StaticMembership, - StaticFallbackComm, - Message, + StaticCombinedQuorumComm, + Message, >, CommitteeExchange< SequencingTestTypes, StaticMembership, - StaticFallbackComm, - Message, + StaticCombinedDAComm, + Message, >, ViewSyncExchange< SequencingTestTypes, ViewSyncCertificate, StaticMembership, - StaticFallbackComm, - Message, + StaticCombinedViewSyncComm, + Message, >, >; +impl NodeImplementation for SequencingCombinedImpl { + type Storage = MemoryStorage>; + type Leaf = SequencingLeaf; + type Exchanges = SequencingCombinedExchange; + type ConsensusMessage = SequencingMessage; + + fn new_channel_maps( + start_view: ::Time, + ) -> ( + ChannelMaps, + Option>, + ) { + ( + ChannelMaps::new(start_view), + Some(ChannelMaps::new(start_view)), + ) + } +} + impl TestableExchange< SequencingTestTypes, - >::Leaf, - Message, - > for SequencingFallbackExchange + >::Leaf, + Message, + > for SequencingCombinedExchange { #[allow(clippy::arc_with_non_send_sync)] fn gen_comm_channels( @@ -515,52 +540,54 @@ impl ) -> ( , + Message, >>::Networking, , + Message, >>::Networking, , + Message, >>::Networking, ) + 'static, > { - let libp2p_generator = Arc::new(, + let web_server_network_generator = Arc::new(, ::SignatureKey, + _, > as TestableNetworkingImplementation< SequencingTestTypes, - Message, + Message, >>::generator( expected_node_count, num_bootstrap, 0, da_committee_size, - true, + false, )); - let ws_generator = Arc::new(, + + let web_server_network_da_generator = Arc::new(, ::SignatureKey, - _, + SequencingTestTypes, > as TestableNetworkingImplementation< SequencingTestTypes, - Message, + Message, >>::generator( expected_node_count, num_bootstrap, 1, da_committee_size, - false, + true, )); - let ws_da_generator = Arc::new(, + + let libp2p_network_generator = Arc::new(, ::SignatureKey, - SequencingTestTypes, > as TestableNetworkingImplementation< SequencingTestTypes, - Message, + Message, >>::generator( expected_node_count, num_bootstrap, @@ -569,53 +596,43 @@ impl true, )); + // libp2p Box::new(move |id| { - let libp2p_network = libp2p_generator(id); - let ws = ws_generator(id); - let ws_da = ws_da_generator(id); - - // TODO make a proper constructor - let network = Arc::new(CombinedNetworks(ws, libp2p_network.clone(), PhantomData)); - let network_da = Arc::new(CombinedNetworks(ws_da, libp2p_network, PhantomData)); + let web_server_network = web_server_network_generator(id); + let web_server_network_da = web_server_network_da_generator(id); + + let libp2p_network = libp2p_network_generator(id); + + let network = Arc::new(CombinedNetworks( + web_server_network, + libp2p_network.clone(), + PhantomData, + )); + let network_da = Arc::new(CombinedNetworks( + web_server_network_da, + libp2p_network, + PhantomData, + )); let quorum_chan = <, + Message, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( )(network.clone()); let committee_chan = <, + Message, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( )(network_da); let view_sync_chan = <, + Message, >>::Networking as TestableChannelImplementation<_, _, _, _>>::generate_network( )(network); (quorum_chan, committee_chan, view_sync_chan) }) } } - -impl NodeImplementation for StaticFallbackImpl { - type Storage = MemoryStorage>; - type Leaf = SequencingLeaf; - type Exchanges = SequencingFallbackExchange; - type ConsensusMessage = SequencingMessage; - - fn new_channel_maps( - start_view: ::Time, - ) -> ( - ChannelMaps, - Option>, - ) { - ( - ChannelMaps::new(start_view), - Some(ChannelMaps::new(start_view)), - ) - } -} diff --git a/crates/testing/tests/fallback_network.rs b/crates/testing/tests/combined_network.rs similarity index 67% rename from crates/testing/tests/fallback_network.rs rename to crates/testing/tests/combined_network.rs index 7a66e653c6..30a7f5e277 100644 --- a/crates/testing/tests/fallback_network.rs +++ b/crates/testing/tests/combined_network.rs @@ -2,9 +2,9 @@ use std::time::Duration; use hotshot_testing::{ completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, - node_types::{SequencingLibp2pImpl, SequencingTestTypes}, + node_types::{SequencingCombinedImpl, SequencingTestTypes}, overall_safety_task::OverallSafetyPropertiesDescription, - test_builder::TestMetadata, + test_builder::{TestMetadata, TimingData}, }; use tracing::instrument; @@ -15,24 +15,32 @@ use tracing::instrument; )] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] #[instrument] -async fn webserver_libp2p_network() { +async fn test_combined_network() { async_compatibility_layer::logging::setup_logging(); async_compatibility_layer::logging::setup_backtrace(); - let metadata = TestMetadata { + let metadata: TestMetadata = TestMetadata { + timing_data: TimingData { + round_start_delay: 25, + next_view_timeout: 10000, + start_delay: 120000, + + ..Default::default() + }, overall_safety_properties: OverallSafetyPropertiesDescription { - check_leaf: true, + num_successful_views: 35, ..Default::default() }, + // allow more time to pass in CI completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( TimeBasedCompletionTaskDescription { - duration: Duration::new(240, 0), + duration: Duration::from_millis(1_200_000), }, ), ..TestMetadata::default_multiple_rounds() }; metadata - .gen_launcher::() + .gen_launcher::() .launch() .run_test() .await @@ -46,12 +54,12 @@ async fn webserver_libp2p_network() { #[cfg_attr(async_executor_impl = "async-std", async_std::test)] #[instrument] #[ignore] -async fn test_stress_webserver_libp2p_network() { +async fn test_stress_combined_network() { async_compatibility_layer::logging::setup_logging(); async_compatibility_layer::logging::setup_backtrace(); let metadata = TestMetadata::default_stress(); metadata - .gen_launcher::() + .gen_launcher::() .launch() .run_test() .await diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index c26de8bcef..2823c788e1 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -22,15 +22,13 @@ use serde::{Deserialize, Serialize}; use std::{fmt::Debug, marker::PhantomData}; /// Incoming message -#[derive(Serialize, Deserialize, Clone, Debug, Derivative)] +#[derive(Serialize, Deserialize, Clone, Debug, Derivative, PartialEq, Eq, Hash)] #[serde(bound(deserialize = "", serialize = ""))] -#[derivative(PartialEq)] pub struct Message> { /// The sender of this message pub sender: TYPES::SignatureKey, /// The message kind - #[derivative(PartialEq = "ignore")] pub kind: MessageKind, /// Phantom data. @@ -83,7 +81,7 @@ pub enum MessagePurpose { // TODO (da) make it more customized to the consensus layer, maybe separating the specific message // data from the kind enum. /// Enum representation of any message type -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)] #[serde(bound(deserialize = "", serialize = ""))] pub enum MessageKind> { /// Messages related to the consensus protocol @@ -132,7 +130,7 @@ impl> ViewMessage for Messa } /// Internal triggers sent by consensus messages. -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] #[serde(bound(deserialize = ""))] pub enum InternalTrigger { // May add other triggers if necessary. @@ -304,7 +302,7 @@ impl< } } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)] #[serde(bound(deserialize = "", serialize = ""))] /// Messages related to both validating and sequencing consensus. pub enum GeneralConsensusMessage> @@ -328,7 +326,7 @@ where InternalTrigger(InternalTrigger), } -#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)] #[serde(bound(deserialize = "", serialize = ""))] /// Messages related to the sequencing consensus protocol for the DA committee. pub enum CommitteeConsensusMessage { @@ -383,7 +381,7 @@ pub trait SequencingMessageType>: } /// Messages for sequencing consensus. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, Hash, PartialEq, Eq)] #[serde(bound(deserialize = "", serialize = ""))] pub struct SequencingMessage< TYPES: NodeType, @@ -455,7 +453,7 @@ impl< CommitteeConsensusMessage::VidVote(_) => MessagePurpose::VidVote, CommitteeConsensusMessage::DACertificate(_) => MessagePurpose::DAC, CommitteeConsensusMessage::VidDisperseMsg(_) => MessagePurpose::VidDisperse, - CommitteeConsensusMessage::VidCertificate(_) => todo!(), + CommitteeConsensusMessage::VidCertificate(_) => MessagePurpose::VidCert, }, } } diff --git a/crates/types/src/traits/node_implementation.rs b/crates/types/src/traits/node_implementation.rs index 49d9184bc4..f229a9af9b 100644 --- a/crates/types/src/traits/node_implementation.rs +++ b/crates/types/src/traits/node_implementation.rs @@ -128,6 +128,8 @@ pub trait NodeImplementation: + Sync + 'static + for<'a> Deserialize<'a> + + Hash + + Eq + Serialize; /// Consensus type selected exchanges.