From 64ff7200643217ce900020bb20045035796bf866 Mon Sep 17 00:00:00 2001 From: 0x009922 <43530070+0x009922@users.noreply.github.com> Date: Fri, 25 Oct 2024 11:31:39 +0900 Subject: [PATCH] feat: implement p2p TCP relay Signed-off-by: 0x009922 <43530070+0x009922@users.noreply.github.com> --- Cargo.lock | 7 + crates/iroha/Cargo.toml | 1 + crates/iroha/tests/faulty_peers.rs | 363 +++++++++++++++++++++++++++ crates/iroha/tests/permissions.rs | 3 +- crates/iroha_test_network/src/lib.rs | 86 +++---- 5 files changed, 417 insertions(+), 43 deletions(-) create mode 100644 crates/iroha/tests/faulty_peers.rs diff --git a/Cargo.lock b/Cargo.lock index 850a3a1bb87..3622ce10d68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,6 +305,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "ascii_table" +version = "4.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed8a80a95ab122e7cc43bfde1d51949c89ff67e0c76eb795dc045003418473e2" + [[package]] name = "assert_matches" version = "1.5.0" @@ -2904,6 +2910,7 @@ checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" name = "iroha" version = "2.0.0-rc.1.0" dependencies = [ + "ascii_table", "assert_matches", "assertables", "attohttpc", diff --git a/crates/iroha/Cargo.toml b/crates/iroha/Cargo.toml index c729b7433dc..e5a0a1bc463 100644 --- a/crates/iroha/Cargo.toml +++ b/crates/iroha/Cargo.toml @@ -95,3 +95,4 @@ hex = { workspace = true } assertables = { workspace = true } trybuild = { workspace = true } assert_matches = "1.5.0" +ascii_table = "4.0.4" diff --git a/crates/iroha/tests/faulty_peers.rs b/crates/iroha/tests/faulty_peers.rs new file mode 100644 index 00000000000..083b3f778ec --- /dev/null +++ b/crates/iroha/tests/faulty_peers.rs @@ -0,0 +1,363 @@ +use std::time::Duration; + +use eyre::Result; +use futures_util::{stream::FuturesUnordered, StreamExt}; +use iroha_config_base::toml::WriteExt; +use iroha_test_network::{ + genesis_factory, once_blocks_sync, Network, NetworkBuilder, PeerLifecycleEvent, +}; +use relay::P2pRelay; +use tokio::{self, time::timeout}; + +mod relay { + use std::{ + collections::HashMap, + iter::once, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + }; + + use futures_util::{stream::FuturesUnordered, StreamExt}; + use iroha_data_model::peer::PeerId; + use iroha_primitives::{ + addr::{socket_addr, SocketAddr}, + unique_vec::UniqueVec, + }; + use iroha_test_network::fslock_ports::AllocatedPort; + use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + select, + sync::Notify, + task::JoinSet, + }; + + #[derive(Debug)] + pub struct P2pRelay { + peers: HashMap, + tasks: JoinSet<()>, + } + + #[derive(Debug)] + struct RelayPeer { + real_addr: SocketAddr, + mock_outgoing: HashMap, + suspend: Suspend, + } + + impl P2pRelay { + pub fn new(real_topology: &UniqueVec) -> Self { + let peers: HashMap<_, _> = real_topology + .iter() + .map(|peer_id| { + let real_addr = peer_id.address().clone(); + let mock_outgoing = real_topology + .iter() + .filter(|x| *x != peer_id) + .map(|other_id| { + let mock_port = AllocatedPort::new(); + let mock_addr = socket_addr!(127.0.0.1:*mock_port); + (other_id.clone(), (mock_addr, mock_port)) + }) + .collect(); + let peer = RelayPeer { + real_addr, + mock_outgoing, + suspend: Suspend::new(), + }; + (peer_id.clone(), peer) + }) + .collect(); + + let mut table = ascii_table::AsciiTable::default(); + table.set_max_width(30 * (1 + real_topology.len())); + table.column(0).set_header("From"); + for (i, id) in real_topology.iter().enumerate() { + table + .column(i + 1) + .set_header(format!("To {}", id.address())); + } + table.print(real_topology.iter().map(|id| { + once(format!("{}", id.address())) + .chain(real_topology.iter().map(|peer_id| { + if *peer_id == *id { + "".to_string() + } else { + let (mock_addr, _) = + peers.get(id).unwrap().mock_outgoing.get(peer_id).unwrap(); + format!("{mock_addr}") + } + })) + .collect::>() + })); + + Self { + peers, + tasks: <_>::default(), + } + } + + pub fn topology_for(&self, peer: &PeerId) -> UniqueVec { + self.peers + .get(peer) + .expect("existing peer must be supplied") + .mock_outgoing + .iter() + .map(|(other, (addr, _port))| PeerId::new(addr.clone(), other.public_key().clone())) + .collect() + } + + pub fn start(&mut self) { + for (_peer_id, peer) in self.peers.iter() { + for (other_id, (other_mock_addr, _)) in peer.mock_outgoing.iter() { + let other_peer = self.peers.get(other_id).expect("must be present"); + let suspend = + SuspendIfAny(vec![peer.suspend.clone(), other_peer.suspend.clone()]); + + P2pRelay::run_proxy( + &mut self.tasks, + other_mock_addr.clone(), + other_peer.real_addr.clone(), + suspend, + ); + } + } + } + + fn run_proxy( + tasks: &mut JoinSet<()>, + from: SocketAddr, + to: SocketAddr, + suspend: SuspendIfAny, + ) { + eprintln!("proxy: {from} → {to}"); + let mut proxy = Proxy::new(from, to, suspend); + + tasks.spawn(async move { + if let Err(err) = proxy.run().await { + eprintln!("proxy at {} exited with an error: {err}", proxy.from); + } else { + eprintln!("proxy exited normally"); + } + }); + } + + pub fn suspend(&self, peer: &PeerId) -> Suspend { + self.peers + .get(peer) + .expect("must be present") + .suspend + .clone() + } + } + + #[derive(Clone, Debug, Default)] + pub struct Suspend { + active: Arc, + notify: Arc, + } + + impl Suspend { + fn new() -> Self { + Self::default() + } + + pub fn activate(&self) { + self.active.store(true, Ordering::Release); + } + + pub fn deactivate(&self) { + self.active.store(false, Ordering::Release); + self.notify.notify_waiters(); + } + } + + #[derive(Clone, Debug)] + struct SuspendIfAny(Vec); + + impl SuspendIfAny { + async fn is_not_active(&self) { + loop { + let waited_for = self + .0 + .iter() + .filter_map(|x| { + x.active + .load(Ordering::Acquire) + .then_some(x.notify.notified()) + }) + .collect::>() + .collect::>() + .await + .len(); + if waited_for == 0 { + break; + } + } + } + } + + struct Proxy { + from: SocketAddr, + to: SocketAddr, + suspend: SuspendIfAny, + } + + impl Proxy { + fn new(from: SocketAddr, to: SocketAddr, suspend: SuspendIfAny) -> Self { + Self { from, to, suspend } + } + + async fn run(&mut self) -> eyre::Result<()> { + let listener = TcpListener::bind(self.from.to_string()).await?; + loop { + let (client, _) = listener.accept().await?; + let server = TcpStream::connect(self.to.to_string()).await?; + + let (mut eread, mut ewrite) = client.into_split(); + let (mut oread, mut owrite) = server.into_split(); + + let suspend = self.suspend.clone(); + let e2o = + tokio::spawn( + async move { Proxy::copy(&suspend, &mut eread, &mut owrite).await }, + ); + let suspend = self.suspend.clone(); + let o2e = + tokio::spawn( + async move { Proxy::copy(&suspend, &mut oread, &mut ewrite).await }, + ); + + select! { + _ = e2o => { + // eprintln!("{} → {}: client-to-server closed ×", self.from, self.to); + }, + _ = o2e => { + // eprintln!("{} → {}: server-to-client closed ×", self.from, self.to); + }, + } + } + } + + async fn copy( + suspend: &SuspendIfAny, + mut reader: R, + mut writer: W, + ) -> std::io::Result<()> + where + R: AsyncRead + Unpin, + W: AsyncWrite + Unpin, + { + // NOTE: stack overflow happens without the box + let mut buf = Box::new([0u8; 2usize.pow(20)]); + + loop { + suspend.is_not_active().await; + + let n = reader.read(&mut *buf).await?; + if n == 0 { + break; + } + + writer.write_all(&buf[..n]).await?; + } + + Ok(()) + } + } +} + +async fn start_network_with_relay(network: &Network) -> Result { + let relay = P2pRelay::new(&network.peers().iter().map(|peer| peer.id()).collect()); + + timeout( + network.peer_startup_timeout(), + network + .peers() + .iter() + .enumerate() + .map(|(i, peer)| { + let topology = relay.topology_for(&peer.id()); + let config = network + .config() + .write(["sumeragi", "trusted_peers"], &topology); + let genesis = genesis_factory(network.genesis_isi().clone(), topology); + async move { + // FIXME: parallel + peer.start(config, (i == 0).then_some(&genesis)).await; + peer.once(|e| matches!(e, PeerLifecycleEvent::ServerStarted)) + .await; + } + }) + .collect::>() + .collect::>(), + ) + .await?; + + Ok(relay) +} + +#[tokio::test] +async fn network_starts_with_relay() -> Result<()> { + let network = NetworkBuilder::new().with_peers(4).build(); + let mut relay = start_network_with_relay(&network).await?; + + relay.start(); + network.ensure_blocks(1).await?; + + Ok(()) +} + +#[tokio::test] +async fn network_doesnt_start_without_relay() -> Result<()> { + let network = NetworkBuilder::new().with_peers(4).build(); + let _relay = start_network_with_relay(&network).await?; + + if let Ok(_) = timeout( + Duration::from_secs(3), + once_blocks_sync(network.peers().iter(), 1), + ) + .await + { + panic!("network must not start!") + }; + + Ok(()) +} + +#[tokio::test] +async fn suspending_works() -> Result<()> { + const SYNC: Duration = Duration::from_secs(3); + const N_PEERS: usize = 4; + const { assert!(N_PEERS > 0) }; + + let network = NetworkBuilder::new().with_peers(N_PEERS).build(); + let mut relay = start_network_with_relay(&network).await?; + // we will plug/unplug the last peer who doesn't have the genesis + let last_peer = network + .peers() + .last() + .expect("there are more than 0 of them"); + let suspend = relay.suspend(&last_peer.id()); + + suspend.activate(); + relay.start(); + + // all peers except the last one should get the genesis + timeout( + SYNC, + once_blocks_sync(network.peers().iter().take(N_PEERS - 1), 1), + ) + .await?; + let Err(_) = timeout(SYNC, last_peer.once_block(1)).await else { + panic!("should not get block within timeout!") + }; + + // unsuspend, the last peer should get the block too + suspend.deactivate(); + timeout(SYNC, last_peer.once_block(1)).await?; + + Ok(()) +} diff --git a/crates/iroha/tests/permissions.rs b/crates/iroha/tests/permissions.rs index 13559c9c5fd..23f03f4b6f1 100644 --- a/crates/iroha/tests/permissions.rs +++ b/crates/iroha/tests/permissions.rs @@ -27,12 +27,13 @@ async fn genesis_transactions_are_validated_by_executor() { let network = NetworkBuilder::new() .with_genesis_instruction(invalid_instruction) .build(); + let genesis = genesis_factory(network.genesis_isi().clone(), network.topology()); let peer = network.peer(); timeout(Duration::from_secs(3), async { join!( // Peer should start... - peer.start(network.config(), Some(network.genesis())), + peer.start(network.config(), Some(&genesis)), peer.once(|event| matches!(event, PeerLifecycleEvent::ServerStarted)), // ...but it should shortly exit with an error peer.once(|event| match event { diff --git a/crates/iroha_test_network/src/lib.rs b/crates/iroha_test_network/src/lib.rs index 70190e9f660..4b3daf3773e 100644 --- a/crates/iroha_test_network/src/lib.rs +++ b/crates/iroha_test_network/src/lib.rs @@ -1,7 +1,7 @@ //! Puppeteer for `irohad`, to create test networks mod config; -mod fslock_ports; +pub mod fslock_ports; use core::{fmt::Debug, time::Duration}; use std::{ @@ -49,11 +49,13 @@ use tokio::{ }; use toml::Table; +pub use crate::config::genesis as genesis_factory; + const INSTANT_PIPELINE_TIME: Duration = Duration::from_millis(10); const DEFAULT_BLOCK_SYNC: Duration = Duration::from_millis(150); const PEER_START_TIMEOUT: Duration = Duration::from_secs(30); const PEER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); -const SYNC_TIMEOUT: Duration = Duration::from_secs(30); +const SYNC_TIMEOUT: Duration = Duration::from_secs(5); fn iroha_bin() -> impl AsRef { static PATH: OnceLock = OnceLock::new(); @@ -87,7 +89,7 @@ fn tempdir_in() -> Option> { pub struct Network { peers: Vec, - genesis: GenesisBlock, + genesis_isi: Vec, block_time: Duration, commit_time: Duration, @@ -137,7 +139,7 @@ impl Network { }; let start = async move { - peer.start(self.config(), (i == 0).then_some(&self.genesis)) + peer.start(self.config(), (i == 0).then_some(&self.genesis())) .await; peer.once_block(1).await; }; @@ -191,8 +193,16 @@ impl Network { } /// Network genesis block. - pub fn genesis(&self) -> &GenesisBlock { - &self.genesis + /// + /// It uses the basic [`genesis_factory`] with [`Self::genesis_isi`] + + /// topology of the network peers. + pub fn genesis(&self) -> GenesisBlock { + genesis_factory(self.genesis_isi.clone(), self.topology()) + } + + /// Base network instructions included into the genesis block. + pub fn genesis_isi(&self) -> &Vec { + &self.genesis_isi } /// Shutdown running peers @@ -217,12 +227,7 @@ impl Network { pub async fn ensure_blocks(&self, height: u64) -> Result<&Self> { timeout( self.sync_timeout(), - self.peers - .iter() - .filter(|x| x.is_running()) - .map(|x| x.once_block(height)) - .collect::>() - .collect::>(), + once_blocks_sync(self.peers.iter().filter(|x| x.is_running()), height), ) .await .wrap_err_with(|| { @@ -240,7 +245,7 @@ pub struct NetworkBuilder { n_peers: usize, config: Table, pipeline_time: Option, - extra_isi: Vec, + genesis_isi: Vec, } impl Default for NetworkBuilder { @@ -257,7 +262,7 @@ impl NetworkBuilder { n_peers: 1, config: config::base_iroha_config(), pipeline_time: Some(INSTANT_PIPELINE_TIME), - extra_isi: vec![], + genesis_isi: vec![], } } @@ -309,7 +314,7 @@ impl NetworkBuilder { /// Append an instruction to genesis. pub fn with_genesis_instruction(mut self, isi: impl Into) -> Self { - self.extra_isi.push(isi.into()); + self.genesis_isi.push(isi.into()); self } @@ -317,46 +322,33 @@ impl NetworkBuilder { pub fn build(self) -> Network { let peers: Vec<_> = (0..self.n_peers).map(|_| NetworkPeer::generate()).collect(); - let topology: UniqueVec<_> = peers.iter().map(|peer| peer.id.clone()).collect(); - let block_sync_gossip_period = DEFAULT_BLOCK_SYNC; - let mut extra_isi = vec![]; let block_time; let commit_time; if let Some(duration) = self.pipeline_time { block_time = duration / 3; commit_time = duration / 2; - extra_isi.extend([ - InstructionBox::SetParameter(SetParameter(Parameter::Sumeragi( - SumeragiParameter::BlockTimeMs(block_time.as_millis() as u64), - ))), - InstructionBox::SetParameter(SetParameter(Parameter::Sumeragi( - SumeragiParameter::CommitTimeMs(commit_time.as_millis() as u64), - ))), - ]); } else { block_time = SumeragiParameters::default().block_time(); commit_time = SumeragiParameters::default().commit_time(); } - let genesis = config::genesis( - [ - InstructionBox::SetParameter(SetParameter(Parameter::Sumeragi( - SumeragiParameter::BlockTimeMs(block_time.as_millis() as u64), - ))), - InstructionBox::SetParameter(SetParameter(Parameter::Sumeragi( - SumeragiParameter::CommitTimeMs(commit_time.as_millis() as u64), - ))), - ] - .into_iter() - .chain(self.extra_isi), - topology, - ); + let genesis_isi = [ + InstructionBox::SetParameter(SetParameter(Parameter::Sumeragi( + SumeragiParameter::BlockTimeMs(block_time.as_millis() as u64), + ))), + InstructionBox::SetParameter(SetParameter(Parameter::Sumeragi( + SumeragiParameter::CommitTimeMs(commit_time.as_millis() as u64), + ))), + ] + .into_iter() + .chain(self.genesis_isi) + .collect(); Network { peers, - genesis, + genesis_isi, block_time, commit_time, config: self.config.write( @@ -508,9 +500,10 @@ impl NetworkPeer { }; eprintln!( - "{} generated peer, dir: {}", + "{} generated peer\n dir: {}\n public key: {}", result.log_prefix(), - result.dir.path().display() + result.dir.path().display(), + result.key_pair.public_key(), ); result @@ -906,6 +899,15 @@ impl PeerExit { } } +/// Wait until [`NetworkPeer::once_block`] resolves for all the peers. +pub async fn once_blocks_sync(peers: impl Iterator, height: u64) { + peers + .map(|x| x.once_block(height)) + .collect::>() + .collect::>() + .await; +} + #[cfg(test)] mod tests { use super::*;