diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 51f7bbbdf7..976186c030 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -574,359 +574,3 @@ impl, MEMBERSHIP: Membership; - // type Transaction = VDemoTransaction; - // type ElectionConfigType = StaticElectionConfig; - // type StateType = VDemoState; - // } - // - // type TestMembership = GeneralStaticCommittee; - // type TestNetwork = MemoryCommChannel; - // - // impl NodeImplementation for TestImpl { - // type ConsensusMessage = ValidatingMessage; - // type Exchanges = ValidatingExchanges< - // Test, - // Message, - // QuorumExchange< - // Test, - // TestLeaf, - // TestProposal, - // TestMembership, - // TestNetwork, - // Message, - // >, - // ViewSyncExchange>, - // >; - // type Leaf = TestLeaf; - // type Storage = MemoryStorage; - // - // fn new_channel_maps( - // start_view: ViewNumber, - // ) -> (ChannelMaps, Option>) { - // (ChannelMaps::new(start_view), None) - // } - // } - // - // type TestLeaf = ValidatingLeaf; - // type TestVote = QuorumVote; - // type TestProposal = ValidatingProposal; - // - // /// fake Eq - // /// we can't compare the votetokentype for equality, so we can't - // /// derive EQ on `VoteType` and thereby message - // /// we are only sending data messages, though so we compare key and - // /// data message - // fn fake_message_eq(message_1: Message, message_2: Message) { - // assert_eq!(message_1.sender, message_2.sender); - // if let MessageKind::Data(DataMessage::SubmitTransaction(d_1, _)) = message_1.kind { - // if let MessageKind::Data(DataMessage::SubmitTransaction(d_2, _)) = message_2.kind { - // assert_eq!(d_1, d_2); - // } - // } else { - // panic!("Got unexpected message type in memory test!"); - // } - // } - // - // #[instrument] - // fn get_pubkey() -> Ed25519Pub { - // let priv_key = Ed25519Priv::generate(); - // Ed25519Pub::from_private(&priv_key) - // } - // - // /// create a message - // fn gen_messages(num_messages: u64, seed: u64, pk: Ed25519Pub) -> Vec> { - // let mut messages = Vec::new(); - // for i in 0..num_messages { - // let message = Message { - // sender: pk, - // kind: MessageKind::Data(DataMessage::SubmitTransaction( - // VDemoTransaction { - // add: Addition { - // account: "A".to_string(), - // amount: 50 + i + seed, - // }, - // sub: Subtraction { - // account: "B".to_string(), - // amount: 50 + i + seed, - // }, - // nonce: seed + i, - // padding: vec![50; 0], - // }, - // ::new(0), - // )), - // _phantom: PhantomData, - // }; - // messages.push(message); - // } - // messages - // } - // - // // Spawning a single MemoryNetwork should produce no errors - // #[cfg_attr( - // feature = "tokio-executor", - // tokio::test(flavor = "multi_thread", worker_threads = 2) - // )] - // #[cfg_attr(feature = "async-std-executor", async_std::test)] - // #[instrument] - // async fn spawn_single() { - // setup_logging(); - // let group: Arc, ::SignatureKey>> = - // MasterMap::new(); - // trace!(?group); - // let pub_key = get_pubkey(); - // let _network = MemoryNetwork::new(pub_key, NoMetrics::boxed(), group, Option::None); - // } - // - // // // Spawning a two MemoryNetworks and connecting them should produce no errors - // #[cfg_attr( - // feature = "tokio-executor", - // tokio::test(flavor = "multi_thread", worker_threads = 2) - // )] - // #[cfg_attr(feature = "async-std-executor", async_std::test)] - // #[instrument] - // async fn spawn_double() { - // setup_logging(); - // let group: Arc, ::SignatureKey>> = - // MasterMap::new(); - // trace!(?group); - // let pub_key_1 = get_pubkey(); - // let _network_1 = - // MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); - // let pub_key_2 = get_pubkey(); - // let _network_2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); - // } - // - // // Check to make sure direct queue works - // #[cfg_attr( - // feature = "tokio-executor", - // tokio::test(flavor = "multi_thread", worker_threads = 2) - // )] - // #[cfg_attr(feature = "async-std-executor", async_std::test)] - // #[allow(deprecated)] - // #[instrument] - // async fn direct_queue() { - // setup_logging(); - // // Create some dummy messages - // - // // Make and connect the networking instances - // let group: Arc, ::SignatureKey>> = - // MasterMap::new(); - // trace!(?group); - // let pub_key_1 = get_pubkey(); - // let network1 = - // MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); - // let pub_key_2 = get_pubkey(); - // let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); - // - // let first_messages: Vec> = gen_messages(5, 100, pub_key_1); - // - // // Test 1 -> 2 - // // Send messages - // for sent_message in first_messages { - // network1 - // .direct_message(sent_message.clone(), pub_key_2) - // .await - // .expect("Failed to message node"); - // let mut recv_messages = network2 - // .recv_msgs(TransmitType::Direct) - // .await - // .expect("Failed to receive message"); - // let recv_message = recv_messages.pop().unwrap(); - // assert!(recv_messages.is_empty()); - // fake_message_eq(sent_message, recv_message); - // } - // - // let second_messages: Vec> = gen_messages(5, 200, pub_key_2); - // - // // Test 2 -> 1 - // // Send messages - // for sent_message in second_messages { - // network2 - // .direct_message(sent_message.clone(), pub_key_1) - // .await - // .expect("Failed to message node"); - // let mut recv_messages = network1 - // .recv_msgs(TransmitType::Direct) - // .await - // .expect("Failed to receive message"); - // let recv_message = recv_messages.pop().unwrap(); - // assert!(recv_messages.is_empty()); - // fake_message_eq(sent_message, recv_message); - // } - // } - // - // // Check to make sure direct queue works - // #[cfg_attr( - // feature = "tokio-executor", - // tokio::test(flavor = "multi_thread", worker_threads = 2) - // )] - // #[cfg_attr(feature = "async-std-executor", async_std::test)] - // #[allow(deprecated)] - // #[instrument] - // async fn broadcast_queue() { - // setup_logging(); - // // Make and connect the networking instances - // let group: Arc, ::SignatureKey>> = - // MasterMap::new(); - // trace!(?group); - // let pub_key_1 = get_pubkey(); - // let network1 = - // MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); - // let pub_key_2 = get_pubkey(); - // let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); - // - // let first_messages: Vec> = gen_messages(5, 100, pub_key_1); - // - // // Test 1 -> 2 - // // Send messages - // for sent_message in first_messages { - // network1 - // .broadcast_message( - // sent_message.clone(), - // vec![pub_key_2].into_iter().collect::>(), - // ) - // .await - // .expect("Failed to message node"); - // let mut recv_messages = network2 - // .recv_msgs(TransmitType::Broadcast) - // .await - // .expect("Failed to receive message"); - // let recv_message = recv_messages.pop().unwrap(); - // assert!(recv_messages.is_empty()); - // fake_message_eq(sent_message, recv_message); - // } - // - // let second_messages: Vec> = gen_messages(5, 200, pub_key_2); - // - // // Test 2 -> 1 - // // Send messages - // for sent_message in second_messages { - // network2 - // .broadcast_message( - // sent_message.clone(), - // vec![pub_key_1].into_iter().collect::>(), - // ) - // .await - // .expect("Failed to message node"); - // let mut recv_messages = network1 - // .recv_msgs(TransmitType::Broadcast) - // .await - // .expect("Failed to receive message"); - // let recv_message = recv_messages.pop().unwrap(); - // assert!(recv_messages.is_empty()); - // fake_message_eq(sent_message, recv_message); - // } - // } - // - // #[cfg_attr( - // feature = "tokio-executor", - // tokio::test(flavor = "multi_thread", worker_threads = 2) - // )] - // #[cfg_attr(feature = "async-std-executor", async_std::test)] - // #[instrument] - // #[allow(deprecated)] - // async fn test_in_flight_message_count() { - // // setup_logging(); - // - // // let group: Arc, ::SignatureKey>> = - // // MasterMap::new(); - // // trace!(?group); - // // let pub_key_1 = get_pubkey(); - // // let network1 = - // // MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); - // // let pub_key_2 = get_pubkey(); - // // let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); - // - // // // Create some dummy messages - // // let messages: Vec> = gen_messages(5, 100, pub_key_1); - // - // // // assert_eq!(network1.in_flight_message_count(), Some(0)); - // // // assert_eq!(network2.in_flight_message_count(), Some(0)); - // - // // for (_count, message) in messages.iter().enumerate() { - // // network1 - // // .direct_message(message.clone(), pub_key_2) - // // .await - // // .unwrap(); - // // // network 2 has received `count` broadcast messages and `count + 1` direct messages - // // // assert_eq!(network2.in_flight_message_count(), Some(count + count + 1)); - // - // // // network2.broadcast_message(message.clone()).await.unwrap(); - // // // network 1 has received `count` broadcast messages - // // // assert_eq!(network1.in_flight_message_count(), Some(count + 1)); - // - // // // network 2 has received `count + 1` broadcast messages and `count + 1` direct messages - // // // assert_eq!(network2.in_flight_message_count(), Some((count + 1) * 2)); - // // } - // - // // for _count in (0..messages.len()).rev() { - // // network1.recv_msgs(TransmitType::Broadcast).await.unwrap(); - // // // assert_eq!(network1.in_flight_message_count(), Some(count)); - // - // // network2.recv_msgs(TransmitType::Broadcast).await.unwrap(); - // // network2.recv_msgs(TransmitType::Direct).await.unwrap(); - // // // assert_eq!(network2.in_flight_message_count(), Some(count * 2)); - // // } - // - // // // assert_eq!(network1.in_flight_message_count(), Some(0)); - // // // assert_eq!(network2.in_flight_message_count(), Some(0)); - // } -} diff --git a/crates/testing/tests/memory_network.rs b/crates/testing/tests/memory_network.rs new file mode 100644 index 0000000000..d1a231a2c0 --- /dev/null +++ b/crates/testing/tests/memory_network.rs @@ -0,0 +1,368 @@ +use std::collections::BTreeSet; +use std::marker::PhantomData; +use std::sync::Arc; + +use async_compatibility_layer::logging::setup_logging; +use hotshot::demo::SDemoState; +use hotshot::traits::election::static_committee::{ + GeneralStaticCommittee, StaticElectionConfig, StaticVoteToken, +}; +use hotshot::traits::implementations::{ + MasterMap, MemoryCommChannel, MemoryNetwork, MemoryStorage, +}; +use hotshot::traits::NodeImplementation; +use hotshot::types::bn254::{BLSPrivKey, BLSPubKey}; +use hotshot::types::SignatureKey; +use hotshot_types::block_impl::{VIDBlockPayload, VIDTransaction}; +use hotshot_types::certificate::ViewSyncCertificate; +use hotshot_types::data::{DAProposal, QuorumProposal, SequencingLeaf}; +use hotshot_types::message::{Message, SequencingMessage}; +use hotshot_types::traits::election::{CommitteeExchange, QuorumExchange, ViewSyncExchange}; +use hotshot_types::traits::metrics::NoMetrics; +use hotshot_types::traits::network::TestableNetworkingImplementation; +use hotshot_types::traits::network::{ConnectedNetwork, TransmitType}; +use hotshot_types::traits::node_implementation::{ChannelMaps, NodeType, SequencingExchanges}; +use hotshot_types::vote::{DAVote, ViewSyncVote}; +use hotshot_types::{ + data::ViewNumber, + message::{DataMessage, MessageKind}, + traits::state::ConsensusTime, + vote::QuorumVote, +}; +use rand::rngs::StdRng; +use rand::{RngCore, SeedableRng}; +use serde::{Deserialize, Serialize}; +use tracing::instrument; +use tracing::trace; + +#[derive( + Copy, + Clone, + Debug, + Default, + Hash, + PartialEq, + Eq, + PartialOrd, + Ord, + serde::Serialize, + serde::Deserialize, +)] +pub struct Test; + +impl NodeType for Test { + type Time = ViewNumber; + type BlockType = VIDBlockPayload; + type SignatureKey = BLSPubKey; + type VoteTokenType = StaticVoteToken; + type Transaction = VIDTransaction; + type ElectionConfigType = StaticElectionConfig; + type StateType = SDemoState; +} + +#[derive(Clone, Debug, Deserialize, Serialize, Hash, PartialEq, Eq)] +pub struct TestImpl {} + +pub type ThisLeaf = SequencingLeaf; +pub type ThisMembership = GeneralStaticCommittee::SignatureKey>; +pub type DANetwork = MemoryCommChannel; +pub type QuorumNetwork = MemoryCommChannel; +pub type ViewSyncNetwork = MemoryCommChannel; + +pub type ThisDAProposal = DAProposal; +pub type ThisDAVote = DAVote; + +pub type ThisQuorumProposal = QuorumProposal; +pub type ThisQuorumVote = QuorumVote; + +pub type ThisViewSyncProposal = ViewSyncCertificate; +pub type ThisViewSyncVote = ViewSyncVote; + +impl NodeImplementation for TestImpl { + type Storage = MemoryStorage; + type Leaf = SequencingLeaf; + type Exchanges = SequencingExchanges< + Test, + Message, + QuorumExchange< + Test, + Self::Leaf, + ThisQuorumProposal, + ThisMembership, + QuorumNetwork, + Message, + >, + CommitteeExchange>, + ViewSyncExchange< + Test, + ThisViewSyncProposal, + ThisMembership, + ViewSyncNetwork, + Message, + >, + >; + type ConsensusMessage = SequencingMessage; + + fn new_channel_maps( + start_view: ::Time, + ) -> (ChannelMaps, Option>) { + (ChannelMaps::new(start_view), None) + } +} + +/// fake Eq +/// we can't compare the votetokentype for equality, so we can't +/// derive EQ on `VoteType` and thereby message +/// we are only sending data messages, though so we compare key and +/// data message +fn fake_message_eq(message_1: Message, message_2: Message) { + assert_eq!(message_1.sender, message_2.sender); + if let MessageKind::Data(DataMessage::SubmitTransaction(d_1, _)) = message_1.kind { + if let MessageKind::Data(DataMessage::SubmitTransaction(d_2, _)) = message_2.kind { + assert_eq!(d_1, d_2); + } + } else { + panic!("Got unexpected message type in memory test!"); + } +} + +#[instrument] +fn get_pubkey() -> BLSPubKey { + // random 32 bytes + let mut bytes = [0; 32]; + rand::thread_rng().fill_bytes(&mut bytes); + BLSPubKey::from_private(&BLSPrivKey::generate_from_seed(bytes)) +} + +/// create a message +fn gen_messages(num_messages: u64, seed: u64, pk: BLSPubKey) -> Vec> { + let mut messages = Vec::new(); + for _ in 0..num_messages { + // create a random transaction from seed + let mut bytes = [0u8; 8]; + let mut rng = StdRng::seed_from_u64(seed); + rng.fill_bytes(&mut bytes); + + let message = Message { + sender: pk, + kind: MessageKind::Data(DataMessage::SubmitTransaction( + VIDTransaction(bytes.to_vec()), + ::new(0), + )), + _phantom: PhantomData, + }; + messages.push(message); + } + messages +} + +// Spawning a single MemoryNetwork should produce no errors +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[instrument] +async fn memory_network_spawn_single() { + setup_logging(); + let group: Arc, ::SignatureKey>> = + MasterMap::new(); + trace!(?group); + let pub_key = get_pubkey(); + let _network = MemoryNetwork::new(pub_key, NoMetrics::boxed(), group, Option::None); +} + +// // Spawning a two MemoryNetworks and connecting them should produce no errors +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[instrument] +async fn memory_network_spawn_double() { + setup_logging(); + let group: Arc, ::SignatureKey>> = + MasterMap::new(); + trace!(?group); + let pub_key_1 = get_pubkey(); + let _network_1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let pub_key_2 = get_pubkey(); + let _network_2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); +} + +// Check to make sure direct queue works +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[instrument] +async fn memory_network_direct_queue() { + setup_logging(); + // Create some dummy messages + + // Make and connect the networking instances + let group: Arc, ::SignatureKey>> = + MasterMap::new(); + trace!(?group); + + let pub_key_1 = get_pubkey(); + let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + + let pub_key_2 = get_pubkey(); + let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + + let first_messages: Vec> = gen_messages(5, 100, pub_key_1); + + // Test 1 -> 2 + // Send messages + for sent_message in first_messages { + network1 + .direct_message(sent_message.clone(), pub_key_2) + .await + .expect("Failed to message node"); + let mut recv_messages = network2 + .recv_msgs(TransmitType::Direct) + .await + .expect("Failed to receive message"); + let recv_message = recv_messages.pop().unwrap(); + assert!(recv_messages.is_empty()); + fake_message_eq(sent_message, recv_message); + } + + let second_messages: Vec> = gen_messages(5, 200, pub_key_2); + + // Test 2 -> 1 + // Send messages + for sent_message in second_messages { + network2 + .direct_message(sent_message.clone(), pub_key_1) + .await + .expect("Failed to message node"); + let mut recv_messages = network1 + .recv_msgs(TransmitType::Direct) + .await + .expect("Failed to receive message"); + let recv_message = recv_messages.pop().unwrap(); + assert!(recv_messages.is_empty()); + fake_message_eq(sent_message, recv_message); + } +} + +// Check to make sure direct queue works +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[instrument] +async fn memory_network_broadcast_queue() { + setup_logging(); + // Make and connect the networking instances + let group: Arc, ::SignatureKey>> = + MasterMap::new(); + trace!(?group); + let pub_key_1 = get_pubkey(); + let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let pub_key_2 = get_pubkey(); + let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + + let first_messages: Vec> = gen_messages(5, 100, pub_key_1); + + // Test 1 -> 2 + // Send messages + for sent_message in first_messages { + network1 + .broadcast_message( + sent_message.clone(), + vec![pub_key_2].into_iter().collect::>(), + ) + .await + .expect("Failed to message node"); + let mut recv_messages = network2 + .recv_msgs(TransmitType::Broadcast) + .await + .expect("Failed to receive message"); + let recv_message = recv_messages.pop().unwrap(); + assert!(recv_messages.is_empty()); + fake_message_eq(sent_message, recv_message); + } + + let second_messages: Vec> = gen_messages(5, 200, pub_key_2); + + // Test 2 -> 1 + // Send messages + for sent_message in second_messages { + network2 + .broadcast_message( + sent_message.clone(), + vec![pub_key_1].into_iter().collect::>(), + ) + .await + .expect("Failed to message node"); + let mut recv_messages = network1 + .recv_msgs(TransmitType::Broadcast) + .await + .expect("Failed to receive message"); + let recv_message = recv_messages.pop().unwrap(); + assert!(recv_messages.is_empty()); + fake_message_eq(sent_message, recv_message); + } +} + +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[instrument] +#[allow(deprecated)] +async fn memory_network_test_in_flight_message_count() { + setup_logging(); + + let group: Arc, ::SignatureKey>> = + MasterMap::new(); + trace!(?group); + let pub_key_1 = get_pubkey(); + let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let pub_key_2 = get_pubkey(); + let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + + // Create some dummy messages + let messages: Vec> = gen_messages(5, 100, pub_key_1); + let broadcast_recipients = BTreeSet::from([pub_key_1, pub_key_2]); + + assert_eq!(network1.in_flight_message_count(), Some(0)); + assert_eq!(network2.in_flight_message_count(), Some(0)); + + for (count, message) in messages.iter().enumerate() { + network1 + .direct_message(message.clone(), pub_key_2) + .await + .unwrap(); + // network 2 has received `count` broadcast messages and `count + 1` direct messages + assert_eq!(network2.in_flight_message_count(), Some(count + count + 1)); + + network2 + .broadcast_message(message.clone(), broadcast_recipients.clone()) + .await + .unwrap(); + // network 1 has received `count` broadcast messages + assert_eq!(network1.in_flight_message_count(), Some(count + 1)); + + // network 2 has received `count + 1` broadcast messages and `count + 1` direct messages + assert_eq!(network2.in_flight_message_count(), Some((count + 1) * 2)); + } + + while network1.in_flight_message_count().unwrap() > 0 { + network1.recv_msgs(TransmitType::Broadcast).await.unwrap(); + } + + while network2.in_flight_message_count().unwrap() > 0 { + network2.recv_msgs(TransmitType::Broadcast).await.unwrap(); + network2.recv_msgs(TransmitType::Direct).await.unwrap(); + } + + assert_eq!(network1.in_flight_message_count(), Some(0)); + assert_eq!(network2.in_flight_message_count(), Some(0)); +}