diff --git a/crates/iroha_cli/src/main.rs b/crates/iroha_cli/src/main.rs index 6afb8ee7033..c24c45aba5a 100644 --- a/crates/iroha_cli/src/main.rs +++ b/crates/iroha_cli/src/main.rs @@ -11,7 +11,7 @@ use erased_serde::Serialize; use error_stack::{fmt::ColorMode, IntoReportCompat, ResultExt}; use eyre::{eyre, Error, Result, WrapErr}; use iroha::{client::Client, config::Config, data_model::prelude::*}; -use iroha_primitives::{addr::SocketAddr, json::Json}; +use iroha_primitives::json::Json; use thiserror::Error; /// Re-usable clap `--metadata ` (`-m`) argument. @@ -1039,9 +1039,6 @@ mod peer { /// Register subcommand of peer #[derive(clap::Args, Debug)] pub struct Register { - /// P2P address of the peer e.g. `127.0.0.1:1337` - #[arg(short, long)] - pub address: SocketAddr, /// Public key of the peer #[arg(short, long)] pub key: PublicKey, @@ -1051,13 +1048,8 @@ mod peer { impl RunArgs for Register { fn run(self, context: &mut dyn RunContext) -> Result<()> { - let Self { - address, - key, - metadata, - } = self; - let register_peer = - iroha::data_model::isi::Register::peer(Peer::new(PeerId::new(address, key))); + let Self { key, metadata } = self; + let register_peer = iroha::data_model::isi::Register::peer(key.into()); submit([register_peer], metadata.load()?, context).wrap_err("Failed to register peer") } } @@ -1065,9 +1057,6 @@ mod peer { /// Unregister subcommand of peer #[derive(clap::Args, Debug)] pub struct Unregister { - /// P2P address of the peer e.g. `127.0.0.1:1337` - #[arg(short, long)] - pub address: SocketAddr, /// Public key of the peer #[arg(short, long)] pub key: PublicKey, @@ -1077,13 +1066,8 @@ mod peer { impl RunArgs for Unregister { fn run(self, context: &mut dyn RunContext) -> Result<()> { - let Self { - address, - key, - metadata, - } = self; - let unregister_peer = - iroha::data_model::isi::Unregister::peer(PeerId::new(address, key)); + let Self { key, metadata } = self; + let unregister_peer = iroha::data_model::isi::Unregister::peer(key.into()); submit([unregister_peer], metadata.load()?, context) .wrap_err("Failed to unregister peer") } diff --git a/crates/iroha_config/iroha_test_config.toml b/crates/iroha_config/iroha_test_config.toml index ac65c79a9c3..73175f81a47 100644 --- a/crates/iroha_config/iroha_test_config.toml +++ b/crates/iroha_config/iroha_test_config.toml @@ -4,6 +4,7 @@ private_key = "802620282ED9F3CF92811C3818DBC4AE594ED59DC1A2F78E4241E31924E101D6B [network] address = "127.0.0.1:1337" +external_port = 1337 [genesis] public_key = "ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4" diff --git a/crates/iroha_config/src/parameters/actual.rs b/crates/iroha_config/src/parameters/actual.rs index d752ce31a65..1ce133ebfd8 100644 --- a/crates/iroha_config/src/parameters/actual.rs +++ b/crates/iroha_config/src/parameters/actual.rs @@ -77,6 +77,7 @@ pub struct Common { #[derive(Debug, Clone)] pub struct Network { pub address: WithOrigin, + pub external_port: WithOrigin, pub idle_timeout: Duration, } diff --git a/crates/iroha_config/src/parameters/user.rs b/crates/iroha_config/src/parameters/user.rs index fbd804c1937..93403c0ac99 100644 --- a/crates/iroha_config/src/parameters/user.rs +++ b/crates/iroha_config/src/parameters/user.rs @@ -273,6 +273,8 @@ pub struct Network { /// Peer-to-peer address #[config(env = "P2P_ADDRESS")] pub address: WithOrigin, + #[config(env = "P2P_EXTERNAL_PORT")] + pub external_port: WithOrigin, #[config(default = "defaults::network::BLOCK_GOSSIP_SIZE")] pub block_gossip_size: NonZeroU32, #[config(default = "defaults::network::BLOCK_GOSSIP_PERIOD.into()")] @@ -296,6 +298,7 @@ impl Network { ) { let Self { address, + external_port, block_gossip_size, block_gossip_period_ms: block_gossip_period, transaction_gossip_size, @@ -306,6 +309,7 @@ impl Network { ( actual::Network { address, + external_port, idle_timeout: idle_timeout.get(), }, actual::BlockSync { diff --git a/crates/iroha_config/tests/fixtures.rs b/crates/iroha_config/tests/fixtures.rs index af8cd26c83b..d67ab21a85d 100644 --- a/crates/iroha_config/tests/fixtures.rs +++ b/crates/iroha_config/tests/fixtures.rs @@ -99,6 +99,13 @@ fn minimal_config_snapshot() { path: "tests/fixtures/base.toml", }, }, + external_port: WithOrigin { + value: 1337, + origin: File { + id: ParameterId(network.external_port), + path: "tests/fixtures/base.toml", + }, + }, idle_timeout: 60s, }, genesis: Genesis { diff --git a/crates/iroha_config/tests/fixtures/bad.torii_addr_eq_p2p_addr.toml b/crates/iroha_config/tests/fixtures/bad.torii_addr_eq_p2p_addr.toml index 79f9c324cee..22a4823d27b 100644 --- a/crates/iroha_config/tests/fixtures/bad.torii_addr_eq_p2p_addr.toml +++ b/crates/iroha_config/tests/fixtures/bad.torii_addr_eq_p2p_addr.toml @@ -2,6 +2,7 @@ extends = ["base.toml", "base_trusted_peers.toml"] [network] address = "127.0.0.1:8080" +external_port = 8080 [torii] address = "127.0.0.1:8080" diff --git a/crates/iroha_config/tests/fixtures/base.toml b/crates/iroha_config/tests/fixtures/base.toml index c0d1355a1f6..3d55a4d92ab 100644 --- a/crates/iroha_config/tests/fixtures/base.toml +++ b/crates/iroha_config/tests/fixtures/base.toml @@ -4,6 +4,7 @@ private_key = "8026208F4C15E5D664DA3F13778801D23D4E89B76E94C1B94B389544168B6CB89 [network] address = "127.0.0.1:1337" +external_port = 1337 [genesis] public_key = "ed01208BA62848CF767D72E7F7F4B9D2D7BA07FEE33760F79ABE5597A51520E292A0CB" diff --git a/crates/iroha_config/tests/fixtures/full.env b/crates/iroha_config/tests/fixtures/full.env index d7b916be708..7c45f3417c6 100644 --- a/crates/iroha_config/tests/fixtures/full.env +++ b/crates/iroha_config/tests/fixtures/full.env @@ -2,6 +2,7 @@ CHAIN=0-0 PUBLIC_KEY=ed01208BA62848CF767D72E7F7F4B9D2D7BA07FEE33760F79ABE5597A51520E292A0CB PRIVATE_KEY=8026208F4C15E5D664DA3F13778801D23D4E89B76E94C1B94B389544168B6CB894F84F P2P_ADDRESS=127.0.0.1:5432 +P2P_EXTERNAL_PORT=5432 GENESIS_PUBLIC_KEY=ed01208BA62848CF767D72E7F7F4B9D2D7BA07FEE33760F79ABE5597A51520E292A0CB GENESIS=./genesis.signed.scale API_ADDRESS=127.0.0.1:8080 diff --git a/crates/iroha_config/tests/fixtures/full.toml b/crates/iroha_config/tests/fixtures/full.toml index 298d5582a86..bf9bb61857f 100644 --- a/crates/iroha_config/tests/fixtures/full.toml +++ b/crates/iroha_config/tests/fixtures/full.toml @@ -10,6 +10,7 @@ file = "genesis.signed.scale" [network] address = "localhost:3840" +external_port = 3840 block_gossip_period_ms = 10_000 block_gossip_size = 4 transaction_gossip_period_ms = 1_000 diff --git a/crates/iroha_config/tests/fixtures/minimal_file_and_env.toml b/crates/iroha_config/tests/fixtures/minimal_file_and_env.toml index 10e7d77dac3..e5334d50dd3 100644 --- a/crates/iroha_config/tests/fixtures/minimal_file_and_env.toml +++ b/crates/iroha_config/tests/fixtures/minimal_file_and_env.toml @@ -6,6 +6,7 @@ private_key = "8026208F4C15E5D664DA3F13778801D23D4E89B76E94C1B94B389544168B6CB89 [network] address = "127.0.0.1:1337" +external_port = 1337 [genesis] public_key = "ed01208BA62848CF767D72E7F7F4B9D2D7BA07FEE33760F79ABE5597A51520E292A0CB" diff --git a/crates/iroha_core/src/lib.rs b/crates/iroha_core/src/lib.rs index 63456ff5c2f..29ba08689ff 100644 --- a/crates/iroha_core/src/lib.rs +++ b/crates/iroha_core/src/lib.rs @@ -7,6 +7,7 @@ pub mod gossiper; pub mod kiso; pub mod kura; pub mod metrics; +pub mod peers_gossiper; pub mod query; pub mod queue; pub mod smartcontracts; @@ -25,6 +26,7 @@ use tokio::sync::broadcast; use crate::{ block_sync::message::Message as BlockSyncMessage, + peers_gossiper::PeersGossip, prelude::*, sumeragi::message::{BlockMessage, ControlFlowMessage}, }; @@ -52,6 +54,8 @@ pub enum NetworkMessage { BlockSync(Box), /// Transaction gossiper message TransactionGossiper(Box), + /// Peers addresses gossiper message + PeersGossiper(Box), /// Health check message Health, } diff --git a/crates/iroha_core/src/peers_gossiper.rs b/crates/iroha_core/src/peers_gossiper.rs new file mode 100644 index 00000000000..4d653fb010b --- /dev/null +++ b/crates/iroha_core/src/peers_gossiper.rs @@ -0,0 +1,151 @@ +//! Peers gossiper is actor which is responsible for gossiping addresses of peers. +//! +//! E.g. peer A changes address, connects to peer B, +//! and then peer B will broadcast address of peer A to other peers. + +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; + +use iroha_config::parameters::actual::TrustedPeers; +use iroha_data_model::peer::{Peer, PeerId}; +use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal}; +use iroha_p2p::{Broadcast, UpdatePeers}; +use iroha_primitives::{addr::SocketAddr, unique_vec::UniqueVec}; +use iroha_version::{Decode, Encode}; +use tokio::sync::mpsc; + +use crate::{IrohaNetwork, NetworkMessage}; + +/// [`Gossiper`] actor handle. +#[derive(Clone)] +pub struct PeersGossiperHandle { + message_sender: mpsc::Sender, +} + +impl PeersGossiperHandle { + /// Send [`PeersGossip`] to actor + pub async fn gossip(&self, gossip: PeersGossip) { + self.message_sender + .send(gossip) + .await + .expect("Gossiper must handle messages until there is at least one handle to it") + } +} + +/// Actor which gossips peers addresses. +pub struct PeersGossiper { + /// Peers provided at startup + initial_peers: HashMap, + /// Peers received via gossiping from other peers + gossip_peers: HashMap, + network: IrohaNetwork, +} + +/// Terminology: +/// * Topology - public keys of current network derived from blockchain (Register/Unregister Peer Isi) +/// * Peers addresses - currently known addresses for peers in topology. Might be unknown for some peer. +/// +/// There are three sources of peers addresses: +/// 1. Provided at iroha startup (`TRUSTED_PEERS` env var) +/// 2. Currently connected online peers. +/// Some peer might change address and connect to our peer, +/// such connection will be accepted if peer public key is in topology. +/// 3. Received via gossiping from other peers. +impl PeersGossiper { + /// Start actor. + pub fn start( + trusted_peers: TrustedPeers, + network: IrohaNetwork, + shutdown_signal: ShutdownSignal, + ) -> (PeersGossiperHandle, Child) { + let initial_peers = trusted_peers + .others + .into_iter() + .map(|peer| (peer.id, peer.address)) + .collect(); + let gossiper = Self { + initial_peers, + gossip_peers: HashMap::new(), + network, + }; + gossiper.network_update_peers_addresses(); + + let (message_sender, message_receiver) = mpsc::channel(1); + ( + PeersGossiperHandle { message_sender }, + Child::new( + tokio::task::spawn(gossiper.run(message_receiver, shutdown_signal)), + OnShutdown::Abort, + ), + ) + } + + async fn run( + mut self, + mut message_receiver: mpsc::Receiver, + shutdown_signal: ShutdownSignal, + ) { + let mut gossip_period = tokio::time::interval(Duration::from_secs(60)); + loop { + tokio::select! { + _ = gossip_period.tick() => { + self.gossip_peers() + } + _ = self.network.wait_online_peers_update(|_| ()) => { + self.gossip_peers(); + } + Some(peers_gossip) = message_receiver.recv() => { + self.handle_peers_gossip(peers_gossip); + } + () = shutdown_signal.receive() => { + iroha_logger::debug!("Shutting down peers gossiper"); + break; + }, + } + tokio::task::yield_now().await; + } + } + + fn gossip_peers(&self) { + let online_peers = self.network.online_peers(Clone::clone); + let online_peers = UniqueVec::from_iter(online_peers); + let data = NetworkMessage::PeersGossiper(Box::new(PeersGossip(online_peers))); + self.network.broadcast(Broadcast { data }); + } + + fn handle_peers_gossip(&mut self, PeersGossip(peers): PeersGossip) { + for peer in peers { + self.gossip_peers.insert(peer.id, peer.address); + } + self.network_update_peers_addresses(); + } + + fn network_update_peers_addresses(&self) { + let online_peers = self.network.online_peers(Clone::clone); + let online_peers_ids = online_peers + .into_iter() + .map(|peer| peer.id) + .collect::>(); + + let mut peers = Vec::new(); + for (id, address) in &self.initial_peers { + if !online_peers_ids.contains(id) { + peers.push((id.clone(), address.clone())); + } + } + for (id, address) in &self.gossip_peers { + if !online_peers_ids.contains(id) { + peers.push((id.clone(), address.clone())); + } + } + + let update = UpdatePeers(peers); + self.network.update_peers_addresses(update); + } +} + +/// Message for gossiping peers addresses. +#[derive(Decode, Encode, Debug, Clone)] +pub struct PeersGossip(UniqueVec); diff --git a/crates/iroha_data_model/src/peer.rs b/crates/iroha_data_model/src/peer.rs index b11c3d469fc..5cb7c4103cc 100644 --- a/crates/iroha_data_model/src/peer.rs +++ b/crates/iroha_data_model/src/peer.rs @@ -2,11 +2,7 @@ #[cfg(not(feature = "std"))] use alloc::{format, string::String, vec::Vec}; -use core::{ - borrow::Borrow, - cmp::Ordering, - hash::{Hash, Hasher}, -}; +use core::{borrow::Borrow, hash::Hash}; use derive_more::Display; use iroha_data_model_derive::model; @@ -30,87 +26,95 @@ mod model { /// Equality is tested by `public_key` field only. /// Each peer should have a unique public key. #[derive( - Debug, Display, Clone, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema, Getters, + Debug, + Display, + Clone, + Ord, + PartialOrd, + Eq, + PartialEq, + Hash, + Decode, + Encode, + Deserialize, + Serialize, + IntoSchema, + Getters, )] - #[display(fmt = "{public_key}@@{address}")] + #[display(fmt = "{public_key}")] #[getset(get = "pub")] - #[ffi_type] + #[serde(transparent)] + #[repr(transparent)] + // TODO: Make it transparent in FFI? + #[ffi_type(opaque)] pub struct PeerId { - /// Address of the [`Peer`]'s entrypoint. - pub address: SocketAddr, /// Public Key of the [`Peer`]. pub public_key: PublicKey, } /// Representation of other Iroha Peer instances running in separate processes. #[derive( - Debug, Display, Clone, IdEqOrdHash, Decode, Encode, Deserialize, Serialize, IntoSchema, + Debug, + Display, + Clone, + IdEqOrdHash, + Decode, + Encode, + Deserialize, + Serialize, + IntoSchema, + Getters, )] - #[display(fmt = "@@{}", "id.address")] - #[serde(transparent)] - #[repr(transparent)] - // TODO: Make it transparent in FFI? - #[ffi_type(opaque)] + #[display(fmt = "{id}@@{address}")] + #[getset(get = "pub")] + #[ffi_type] pub struct Peer { + /// Address of the [`Peer`]'s entrypoint. + pub address: SocketAddr, + #[serde(rename = "public_key")] /// Peer Identification. pub id: PeerId, } } impl PeerId { - /// Construct [`PeerId`] given `public_key` and `address`. - #[inline] - pub fn new(address: SocketAddr, public_key: PublicKey) -> Self { - Self { - address, - public_key, - } - } -} - -impl Peer { - /// Construct `Peer` given `id`. + /// Construct [`PeerId`] given `public_key`. #[inline] - pub const fn new(id: PeerId) -> ::With { - Self { id } - } -} - -impl PartialEq for PeerId { - fn eq(&self, other: &Self) -> bool { - // Comparison is done by public key only. - // It is a system invariant that each peer has a unique public key. - // Also it helps to handle peer id comparison without domain name resolution. - self.public_key == other.public_key + pub fn new(public_key: PublicKey) -> Self { + Self { public_key } } } -impl PartialOrd for PeerId { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) +impl From for PeerId { + fn from(public_key: PublicKey) -> Self { + Self { public_key } } } -impl Ord for PeerId { - fn cmp(&self, other: &Self) -> Ordering { - self.public_key.cmp(&other.public_key) +impl Peer { + /// Construct `Peer` given `id` and `address`. + #[inline] + pub fn new(address: SocketAddr, id: impl Into) -> Self { + Self { + address, + id: id.into(), + } } -} -impl Hash for PeerId { - fn hash(&self, state: &mut H) { - self.public_key.hash(state); + /// Get peer public key + pub fn public_key(&self) -> &PublicKey { + &self.id.public_key } } -impl Borrow for PeerId { - fn borrow(&self) -> &PublicKey { - &self.public_key +impl Borrow for Peer { + fn borrow(&self) -> &PeerId { + &self.id } } impl Registered for Peer { - type With = Self; + type With = PeerId; } /// The prelude re-exports most commonly used traits, structs and macros from this crate. diff --git a/crates/iroha_p2p/src/network.rs b/crates/iroha_p2p/src/network.rs index 22b1386c7cb..dc47df5388e 100644 --- a/crates/iroha_p2p/src/network.rs +++ b/crates/iroha_p2p/src/network.rs @@ -8,26 +8,25 @@ use std::{ use futures::{stream::FuturesUnordered, StreamExt}; use iroha_config::parameters::actual::Network as Config; -use iroha_crypto::{KeyPair, PublicKey}; -use iroha_data_model::prelude::PeerId; +use iroha_crypto::KeyPair; +use iroha_data_model::prelude::{Peer, PeerId}; use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal}; use iroha_logger::prelude::*; use iroha_primitives::addr::SocketAddr; -use parity_scale_codec::Encode as _; use tokio::{ net::{TcpListener, TcpStream}, sync::{mpsc, watch}, }; use crate::{ - blake2b_hash, boilerplate::*, peer::{ handles::{connected_from, connecting, PeerHandle}, message::*, Connection, ConnectionId, }, - unbounded_with_len, Broadcast, Error, NetworkMessage, OnlinePeers, Post, UpdateTopology, + unbounded_with_len, Broadcast, Error, NetworkMessage, OnlinePeers, Post, UpdatePeers, + UpdateTopology, }; /// [`NetworkBase`] actor handle. @@ -43,6 +42,8 @@ pub struct NetworkBaseHandle { online_peers_receiver: watch::Receiver, /// [`UpdateTopology`] message sender update_topology_sender: mpsc::UnboundedSender, + /// [`UpdatePeers`] message sender + update_peers_sender: mpsc::UnboundedSender, /// Sender of [`NetworkMessage`] message network_message_sender: unbounded_with_len::Sender>, /// Key exchange used by network @@ -57,6 +58,7 @@ impl Clone for NetworkBaseHandle { subscribe_to_peers_messages_sender: self.subscribe_to_peers_messages_sender.clone(), online_peers_receiver: self.online_peers_receiver.clone(), update_topology_sender: self.update_topology_sender.clone(), + update_peers_sender: self.update_peers_sender.clone(), network_message_sender: self.network_message_sender.clone(), _key_exchange: core::marker::PhantomData::, _encryptor: core::marker::PhantomData::, @@ -74,6 +76,7 @@ impl NetworkBaseHandle { key_pair: KeyPair, Config { address: listen_addr, + external_port, idle_timeout, }: Config, shutdown_signal: ShutdownSignal, @@ -85,12 +88,14 @@ impl NetworkBaseHandle { let (subscribe_to_peers_messages_sender, subscribe_to_peers_messages_receiver) = mpsc::unbounded_channel(); let (update_topology_sender, update_topology_receiver) = mpsc::unbounded_channel(); + let (update_peers_sender, update_peers_receiver) = mpsc::unbounded_channel(); let (network_message_sender, network_message_receiver) = unbounded_with_len::unbounded_channel(); let (peer_message_sender, peer_message_receiver) = mpsc::channel(1); let (service_message_sender, service_message_receiver) = mpsc::channel(1); let network = NetworkBase { listen_addr: listen_addr.into_value(), + external_port: external_port.into_value(), listener, peers: HashMap::new(), connecting_peers: HashMap::new(), @@ -99,13 +104,15 @@ impl NetworkBaseHandle { subscribe_to_peers_messages_receiver, online_peers_sender, update_topology_receiver, + update_peers_receiver, network_message_receiver, peer_message_receiver, peer_message_sender, service_message_receiver, service_message_sender, current_conn_id: 0, - current_topology: HashMap::new(), + current_topology: HashSet::new(), + current_peers_addresses: Vec::new(), idle_timeout, _key_exchange: core::marker::PhantomData::, _encryptor: core::marker::PhantomData::, @@ -119,6 +126,7 @@ impl NetworkBaseHandle { subscribe_to_peers_messages_sender, online_peers_receiver, update_topology_sender, + update_peers_sender, network_message_sender, _key_exchange: core::marker::PhantomData, _encryptor: core::marker::PhantomData, @@ -157,6 +165,13 @@ impl NetworkBaseHandle { .expect("NetworkBase must accept messages until there is at least one handle to it") } + /// Send [`UpdatePeers`] message on network actor. + pub fn update_peers_addresses(&self, peers: UpdatePeers) { + self.update_peers_sender + .send(peers) + .expect("NetworkBase must accept messages until there is at least one handle to it") + } + /// Receive latest update of [`OnlinePeers`] pub fn online_peers

(&self, f: impl FnOnce(&OnlinePeers) -> P) -> P { f(&self.online_peers_receiver.borrow()) @@ -179,10 +194,12 @@ impl NetworkBaseHandle { struct NetworkBase { /// Listening address for incoming connections. Must parse into [`std::net::SocketAddr`] listen_addr: SocketAddr, + /// Might be different from port of `listen_addr` in case when peer is inside docker or behind reverse proxy. + external_port: u16, /// Current [`Peer`]s in [`Peer::Ready`] state. - peers: HashMap>, + peers: HashMap>, /// [`Peer`]s in process of being connected. - connecting_peers: HashMap, + connecting_peers: HashMap, /// [`TcpListener`] that is accepting [`Peer`]s' connections listener: TcpListener, /// Our app-level key pair @@ -195,6 +212,8 @@ struct NetworkBase { online_peers_sender: watch::Sender, /// [`UpdateTopology`] message receiver update_topology_receiver: mpsc::UnboundedReceiver, + /// [`UpdatePeers`] message receiver + update_peers_receiver: mpsc::UnboundedReceiver, /// Receiver of [`Post`] message network_message_receiver: unbounded_with_len::Receiver>, /// Channel to gather messages from all peers @@ -208,8 +227,12 @@ struct NetworkBase { /// Current available connection id current_conn_id: ConnectionId, /// Current topology - /// Bool determines who is responsible for initiating connection - current_topology: HashMap, + current_topology: HashSet, + /// Can have two addresses for same `PeerId`. + /// * One initially provided via config + /// * Second received from other peers via gossiping + /// Will try to establish connection via both addresses. + current_peers_addresses: Vec<(PeerId, SocketAddr)>, /// Duration after which terminate connection with idle peer idle_timeout: Duration, /// Key exchange used by network @@ -236,6 +259,9 @@ impl NetworkBase { Some(update_topology) = self.update_topology_receiver.recv() => { self.set_current_topology(update_topology); } + Some(update_peers) = self.update_peers_receiver.recv() => { + self.set_current_peers_addresses(update_peers); + } // Frequency of update is relatively low, so it won't block other tasks from execution _ = update_topology_interval.tick() => { self.update_topology() @@ -302,6 +328,7 @@ impl NetworkBase { let service_message_sender = self.service_message_sender.clone(); connected_from::( addr.clone(), + self.external_port, self.key_pair.clone(), Connection::new(conn_id, stream), service_message_sender, @@ -311,43 +338,41 @@ impl NetworkBase { fn set_current_topology(&mut self, UpdateTopology(topology): UpdateTopology) { iroha_logger::debug!(?topology, "Network receive new topology"); - let self_public_key_hash = blake2b_hash(self.key_pair.public_key().encode()); let topology = topology .into_iter() .filter(|peer_id| peer_id.public_key() != self.key_pair.public_key()) - .map(|peer_id| { - // Determine who is responsible for connecting - let peer_public_key_hash = blake2b_hash(peer_id.public_key().encode()); - let is_active = self_public_key_hash >= peer_public_key_hash; - (peer_id, is_active) - }) .collect(); self.current_topology = topology; self.update_topology() } + fn set_current_peers_addresses(&mut self, UpdatePeers(peers): UpdatePeers) { + debug!(?peers, "Network receive new peers addresses"); + self.current_peers_addresses = peers; + self.update_topology() + } + fn update_topology(&mut self) { let to_connect = self - .current_topology + .current_peers_addresses .iter() // Peer is not connected but should - .filter_map(|(peer, is_active)| { - (!self.peers.contains_key(&peer.public_key) + .filter(|(id, address)| { + self.current_topology.contains(id) + && !self.peers.contains_key(id) && !self .connecting_peers .values() - .any(|public_key| peer.public_key() == public_key) - && *is_active) - .then_some(peer) + .any(|peer| (&peer.id, &peer.address) == (id, address)) }) - .cloned() + .map(|(id, address)| Peer::new(address.clone(), id.clone())) .collect::>(); let to_disconnect = self .peers .keys() // Peer is connected but shouldn't - .filter(|public_key| !self.current_topology.contains_key(*public_key)) + .filter(|&peer_id| !self.current_topology.contains(peer_id)) .cloned() .collect::>(); @@ -360,19 +385,19 @@ impl NetworkBase { } } - fn connect_peer(&mut self, peer: &PeerId) { + fn connect_peer(&mut self, peer: &Peer) { iroha_logger::trace!( listen_addr = %self.listen_addr, peer.id.address = %peer.address, "Creating new peer actor", ); let conn_id = self.get_conn_id(); - self.connecting_peers - .insert(conn_id, peer.public_key().clone()); + self.connecting_peers.insert(conn_id, peer.clone()); let service_message_sender = self.service_message_sender.clone(); connecting::( // NOTE: we intentionally use peer's address and our public key, it's used during handshake peer.address.clone(), + self.external_port, self.key_pair.clone(), conn_id, service_message_sender, @@ -380,15 +405,14 @@ impl NetworkBase { ); } - fn disconnect_peer(&mut self, public_key: &PublicKey) { - let peer = match self.peers.remove(public_key) { + fn disconnect_peer(&mut self, peer_id: &PeerId) { + let peer = match self.peers.remove(peer_id) { Some(peer) => peer, - _ => return iroha_logger::warn!(?public_key, "Not found peer to disconnect"), + _ => return iroha_logger::warn!(?peer_id, "Not found peer to disconnect"), }; iroha_logger::debug!(listen_addr = %self.listen_addr, %peer.conn_id, "Disconnecting peer"); - let peer_id = PeerId::new(peer.p2p_addr, public_key.clone()); - Self::remove_online_peer(&self.online_peers_sender, &peer_id); + Self::remove_online_peer(&self.online_peers_sender, peer_id); } #[log(skip_all, fields(peer=%peer, conn_id=connection_id, disambiguator=disambiguator))] @@ -404,7 +428,7 @@ impl NetworkBase { ) { self.connecting_peers.remove(&connection_id); - if !self.current_topology.contains_key(&peer.id) { + if !self.current_topology.contains(&peer.id) { iroha_logger::warn!(%peer.id, topology=?self.current_topology, "Peer not present in topology is trying to connect"); return; } @@ -535,14 +559,20 @@ impl NetworkBase { pub mod message { //! Module for network messages + use iroha_data_model::peer::Peer; + use super::*; /// Current online network peers - pub type OnlinePeers = HashSet; + pub type OnlinePeers = HashSet; /// The message that is sent to [`NetworkBase`] to update p2p topology of the network. #[derive(Clone, Debug)] - pub struct UpdateTopology(pub OnlinePeers); + pub struct UpdateTopology(pub HashSet); + + /// The message that is sent to [`NetworkBase`] to update peers addresses of the network. + #[derive(Clone, Debug)] + pub struct UpdatePeers(pub Vec<(PeerId, SocketAddr)>); /// The message to be sent to the other [`Peer`]. #[derive(Clone, Debug)] diff --git a/crates/iroha_p2p/src/peer.rs b/crates/iroha_p2p/src/peer.rs index 08b7afdace1..3d35aafa7df 100644 --- a/crates/iroha_p2p/src/peer.rs +++ b/crates/iroha_p2p/src/peer.rs @@ -34,6 +34,7 @@ pub mod handles { /// Start Peer in [`state::Connecting`] state pub fn connecting( peer_addr: SocketAddr, + our_external_port: u16, key_pair: KeyPair, connection_id: ConnectionId, service_message_sender: mpsc::Sender>, @@ -41,6 +42,7 @@ pub mod handles { ) { let peer = state::Connecting { peer_addr, + our_external_port, key_pair, connection_id, }; @@ -55,6 +57,7 @@ pub mod handles { /// Start Peer in [`state::ConnectedFrom`] state pub fn connected_from( peer_addr: SocketAddr, + our_external_port: u16, key_pair: KeyPair, connection: Connection, service_message_sender: mpsc::Sender>, @@ -62,6 +65,7 @@ pub mod handles { ) { let peer = state::ConnectedFrom { peer_addr, + our_external_port, key_pair, connection, }; @@ -454,6 +458,7 @@ mod state { //! Module for peer stages. use iroha_crypto::{KeyGenOption, KeyPair, PublicKey, Signature}; + use iroha_data_model::peer::Peer; use iroha_primitives::addr::SocketAddr; use super::{cryptographer::Cryptographer, *}; @@ -462,6 +467,7 @@ mod state { /// outgoing peer. pub(super) struct Connecting { pub peer_addr: SocketAddr, + pub our_external_port: u16, pub key_pair: KeyPair, pub connection_id: ConnectionId, } @@ -470,6 +476,7 @@ mod state { pub(super) async fn connect_to( Self { peer_addr, + our_external_port, key_pair, connection_id, }: Self, @@ -478,6 +485,7 @@ mod state { let connection = Connection::new(connection_id, stream); Ok(ConnectedTo { peer_addr, + our_external_port, key_pair, connection, }) @@ -487,6 +495,7 @@ mod state { /// Peer that is being connected to. pub(super) struct ConnectedTo { peer_addr: SocketAddr, + our_external_port: u16, key_pair: KeyPair, connection: Connection, } @@ -496,6 +505,7 @@ mod state { pub(super) async fn send_client_hello( Self { peer_addr, + our_external_port, key_pair, mut connection, }: Self, @@ -518,6 +528,7 @@ mod state { let cryptographer = Cryptographer::new(&shared_key); Ok(SendKey { peer_addr, + our_external_port, key_pair, kx_local_pk, kx_remote_pk, @@ -530,6 +541,7 @@ mod state { /// Peer that is being connected from pub(super) struct ConnectedFrom { pub peer_addr: SocketAddr, + pub our_external_port: u16, pub key_pair: KeyPair, pub connection: Connection, } @@ -539,6 +551,7 @@ mod state { pub(super) async fn read_client_hello( Self { peer_addr, + our_external_port, key_pair, mut connection, .. @@ -560,6 +573,7 @@ mod state { let cryptographer = Cryptographer::new(&shared_key); Ok(SendKey { peer_addr, + our_external_port, key_pair, kx_local_pk, kx_remote_pk, @@ -572,6 +586,7 @@ mod state { /// Peer that needs to send key. pub(super) struct SendKey { peer_addr: SocketAddr, + our_external_port: u16, key_pair: KeyPair, kx_local_pk: K::PublicKey, kx_remote_pk: K::PublicKey, @@ -583,6 +598,7 @@ mod state { pub(super) async fn send_our_public_key( Self { peer_addr, + our_external_port, key_pair, kx_local_pk, kx_remote_pk, @@ -594,7 +610,7 @@ mod state { let payload = create_payload::(&kx_local_pk, &kx_remote_pk); let signature = Signature::new(key_pair.private_key(), &payload); - let data = (key_pair.public_key(), signature).encode(); + let data = (key_pair.public_key(), signature, our_external_port).encode(); let data = &cryptographer.encrypt(data.as_slice())?; @@ -627,7 +643,7 @@ mod state { /// Read the peer's public key pub(super) async fn read_their_public_key( Self { - peer_addr, + mut peer_addr, mut connection, kx_local_pk, kx_remote_pk, @@ -642,17 +658,18 @@ mod state { let data = cryptographer.decrypt(data.as_slice())?; - let (remote_pub_key, signature): (PublicKey, Signature) = + let (remote_pub_key, signature, remote_external_port): (PublicKey, Signature, u16) = DecodeAll::decode_all(&mut data.as_slice())?; // Swap order of keys since we are verifying for other peer order remote/local keys is reversed let payload = create_payload::(&kx_remote_pk, &kx_local_pk); signature.verify(&remote_pub_key, &payload)?; - let peer_id = PeerId::new(peer_addr, remote_pub_key); + peer_addr.set_port(remote_external_port); + let peer = Peer::new(peer_addr, remote_pub_key); Ok(Ready { - peer_id, + peer, connection, cryptographer, }) diff --git a/crates/iroha_p2p/tests/integration/p2p.rs b/crates/iroha_p2p/tests/integration/p2p.rs index cd33e3843ea..8fc1c89fa21 100644 --- a/crates/iroha_p2p/tests/integration/p2p.rs +++ b/crates/iroha_p2p/tests/integration/p2p.rs @@ -11,7 +11,7 @@ use futures::{prelude::*, stream::FuturesUnordered, task::AtomicWaker}; use iroha_config::parameters::actual::Network as Config; use iroha_config_base::WithOrigin; use iroha_crypto::KeyPair; -use iroha_data_model::prelude::PeerId; +use iroha_data_model::prelude::Peer; use iroha_futures::supervisor::ShutdownSignal; use iroha_logger::{prelude::*, test_logger}; use iroha_p2p::{network::message::*, NetworkHandle}; @@ -44,6 +44,7 @@ async fn network_create() { let idle_timeout = Duration::from_secs(60); let config = Config { address: WithOrigin::inline(address.clone()), + external_port: WithOrigin::inline(address.port()), idle_timeout, }; let (network, _) = NetworkHandle::start(key_pair, config, ShutdownSignal::new()) @@ -52,15 +53,14 @@ async fn network_create() { tokio::time::sleep(delay).await; info!("Connecting to peer..."); - let peer1 = PeerId::new(address.clone(), public_key.clone()); - let topology = HashSet::from([peer1.clone()]); - network.update_topology(UpdateTopology(topology)); + let peer1 = Peer::new(address.clone(), public_key.clone()); + update_topology_and_peers_addresses(&network, vec![peer1.clone()]); tokio::time::sleep(delay).await; info!("Posting message..."); network.post(Post { data: TestMessage("Some data to send to peer".to_owned()), - peer_id: peer1, + peer_id: peer1.id, }); tokio::time::sleep(delay).await; @@ -157,6 +157,7 @@ async fn two_networks() { let address1 = socket_addr!(127.0.0.1:12_005); let config1 = Config { address: WithOrigin::inline(address1.clone()), + external_port: WithOrigin::inline(address1.port()), idle_timeout, }; let (mut network1, _) = NetworkHandle::start(key_pair1, config1, ShutdownSignal::new()) @@ -167,6 +168,7 @@ async fn two_networks() { let address2 = socket_addr!(127.0.0.1:12_010); let config2 = Config { address: WithOrigin::inline(address2.clone()), + external_port: WithOrigin::inline(address2.port()), idle_timeout, }; let (network2, _) = NetworkHandle::start(key_pair2, config2, ShutdownSignal::new()) @@ -178,13 +180,11 @@ async fn two_networks() { network2.subscribe_to_peers_messages(actor2); info!("Connecting peers..."); - let peer1 = PeerId::new(address1.clone(), public_key1); - let peer2 = PeerId::new(address2.clone(), public_key2); - let topology1 = HashSet::from([peer2.clone()]); - let topology2 = HashSet::from([peer1.clone()]); + let peer1 = Peer::new(address1.clone(), public_key1); + let peer2 = Peer::new(address2.clone(), public_key2); // Connect peers with each other - network1.update_topology(UpdateTopology(topology1.clone())); - network2.update_topology(UpdateTopology(topology2)); + update_topology_and_peers_addresses(&network1, vec![peer2.clone()]); + update_topology_and_peers_addresses(&network2, vec![peer1.clone()]); tokio::time::timeout(Duration::from_millis(2000), async { let mut connections = network1.wait_online_peers_update(HashSet::len).await; @@ -198,7 +198,7 @@ async fn two_networks() { info!("Posting message..."); network1.post(Post { data: TestMessage("Some data to send to peer".to_owned()), - peer_id: peer2, + peer_id: peer2.id, }); tokio::time::timeout(delay, &mut messages2) @@ -215,13 +215,6 @@ async fn two_networks() { let connected_peers2 = network2.online_peers(HashSet::len); assert_eq!(connected_peers2, 1); - - // Connecting to the same peer from network1 - network1.update_topology(UpdateTopology(topology1)); - tokio::time::sleep(delay).await; - - let connected_peers = network1.online_peers(HashSet::len); - assert_eq!(connected_peers, 1); } #[tokio::test(flavor = "multi_thread", worker_threads = 8)] @@ -235,7 +228,7 @@ async fn multiple_networks() { let address = socket_addr!(127.0.0.1: 12_015 + ( i * 5)); let key_pair = KeyPair::random(); let public_key = key_pair.public_key().clone(); - peers.push(PeerId::new(address, public_key)); + peers.push(Peer::new(address, public_key)); key_pairs.push(key_pair); } @@ -274,7 +267,7 @@ async fn multiple_networks() { for id in &peer_ids { let post = Post { data: TestMessage(String::from("Some data to send to peer")), - peer_id: id.clone(), + peer_id: id.id.clone(), }; network.post(post); } @@ -294,22 +287,23 @@ async fn multiple_networks() { } async fn start_network( - peer: PeerId, + peer: Peer, key_pair: KeyPair, - peers: Vec, + peers: Vec, messages: WaitForN, barrier: Arc, shutdown_signal: ShutdownSignal, -) -> (PeerId, NetworkHandle) { +) -> (Peer, NetworkHandle) { info!(peer_addr = %peer.address, "Starting network"); // This actor will get the messages from other peers and increment the counter let actor = TestActor::start(messages); - let PeerId { address, .. } = peer.clone(); + let Peer { address, .. } = peer.clone(); let idle_timeout = Duration::from_secs(60); let config = Config { - address: WithOrigin::inline(address), + address: WithOrigin::inline(address.clone()), + external_port: WithOrigin::inline(address.port()), idle_timeout, }; let (mut network, _) = NetworkHandle::start(key_pair, config, shutdown_signal) @@ -318,12 +312,9 @@ async fn start_network( network.subscribe_to_peers_messages(actor); let _ = barrier.wait().await; - let topology = peers - .into_iter() - .filter(|p| p != &peer) - .collect::>(); - let conn_count = topology.len(); - network.update_topology(UpdateTopology(topology)); + let peers = peers.into_iter().filter(|p| p != &peer).collect::>(); + let conn_count = peers.len(); + update_topology_and_peers_addresses(&network, peers); let _ = barrier.wait().await; tokio::time::timeout(Duration::from_millis(10_000), async { @@ -341,6 +332,17 @@ async fn start_network( (peer, network) } +fn update_topology_and_peers_addresses(network: &NetworkHandle, peers: Vec) { + let topology = peers.iter().map(|peer| peer.id.clone()).collect(); + network.update_topology(UpdateTopology(topology)); + + let addresses = peers + .iter() + .map(|peer| (peer.id.clone(), peer.address.clone())) + .collect(); + network.update_peers_addresses(UpdatePeers(addresses)); +} + #[test] fn test_encryption() { use iroha_crypto::encryption::{ChaCha20Poly1305, SymmetricEncryptor}; diff --git a/crates/iroha_swarm/src/lib.rs b/crates/iroha_swarm/src/lib.rs index 185d5b1e6e5..fe92faa1c64 100644 --- a/crates/iroha_swarm/src/lib.rs +++ b/crates/iroha_swarm/src/lib.rs @@ -192,7 +192,7 @@ mod tests { GENESIS_PUBLIC_KEY: ed0120F9F92758E815121F637C9704DFDA54842BA937AA721C0603018E208D6E25787E GENESIS_PRIVATE_KEY: 802620FB8B867188E4952F1E83534B9B2E0A12D5122BD6F417CBC79D50D8A8C9C917B0 GENESIS: /tmp/genesis.signed.scale - TOPOLOGY: '[{"address":"irohad0:1337","public_key":"ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA"}]' + TOPOLOGY: '["ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA"]' ports: - 1337:1337 - 8080:8080 @@ -246,7 +246,7 @@ mod tests { GENESIS_PUBLIC_KEY: ed0120F9F92758E815121F637C9704DFDA54842BA937AA721C0603018E208D6E25787E GENESIS_PRIVATE_KEY: 802620FB8B867188E4952F1E83534B9B2E0A12D5122BD6F417CBC79D50D8A8C9C917B0 GENESIS: /tmp/genesis.signed.scale - TOPOLOGY: '[{"address":"irohad0:1337","public_key":"ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA"}]' + TOPOLOGY: '["ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA"]' ports: - 1337:1337 - 8080:8080 @@ -300,7 +300,7 @@ mod tests { TRUSTED_PEERS: '[{"address":"irohad3:1340","public_key":"ed012063ED3DFEDEBD8A86B4941CC4379D2EF0B74BDFE61F033FC0C89867D57C882A26"},{"address":"irohad1:1338","public_key":"ed012064BD9B25BF8477144D03B26FC8CF5D8A354B2F780DA310EE69933DC1E86FBCE2"},{"address":"irohad2:1339","public_key":"ed01208EA177921AF051CD12FC07E3416419320908883A1104B31401B650EEB820A300"}]' GENESIS_PRIVATE_KEY: 802620FB8B867188E4952F1E83534B9B2E0A12D5122BD6F417CBC79D50D8A8C9C917B0 GENESIS: /tmp/genesis.signed.scale - TOPOLOGY: '[{"address":"irohad3:1340","public_key":"ed012063ED3DFEDEBD8A86B4941CC4379D2EF0B74BDFE61F033FC0C89867D57C882A26"},{"address":"irohad1:1338","public_key":"ed012064BD9B25BF8477144D03B26FC8CF5D8A354B2F780DA310EE69933DC1E86FBCE2"},{"address":"irohad0:1337","public_key":"ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA"},{"address":"irohad2:1339","public_key":"ed01208EA177921AF051CD12FC07E3416419320908883A1104B31401B650EEB820A300"}]' + TOPOLOGY: '["ed012063ED3DFEDEBD8A86B4941CC4379D2EF0B74BDFE61F033FC0C89867D57C882A26","ed012064BD9B25BF8477144D03B26FC8CF5D8A354B2F780DA310EE69933DC1E86FBCE2","ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA","ed01208EA177921AF051CD12FC07E3416419320908883A1104B31401B650EEB820A300"]' ports: - 1337:1337 - 8080:8080 @@ -412,7 +412,7 @@ mod tests { GENESIS_PUBLIC_KEY: ed0120F9F92758E815121F637C9704DFDA54842BA937AA721C0603018E208D6E25787E GENESIS_PRIVATE_KEY: 802620FB8B867188E4952F1E83534B9B2E0A12D5122BD6F417CBC79D50D8A8C9C917B0 GENESIS: /tmp/genesis.signed.scale - TOPOLOGY: '[{"address":"irohad0:1337","public_key":"ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA"}]' + TOPOLOGY: '["ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA"]' ports: - 1337:1337 - 8080:8080 @@ -469,7 +469,7 @@ mod tests { TRUSTED_PEERS: '[{"address":"irohad3:1340","public_key":"ed012063ED3DFEDEBD8A86B4941CC4379D2EF0B74BDFE61F033FC0C89867D57C882A26"},{"address":"irohad1:1338","public_key":"ed012064BD9B25BF8477144D03B26FC8CF5D8A354B2F780DA310EE69933DC1E86FBCE2"},{"address":"irohad2:1339","public_key":"ed01208EA177921AF051CD12FC07E3416419320908883A1104B31401B650EEB820A300"}]' GENESIS_PRIVATE_KEY: 802620FB8B867188E4952F1E83534B9B2E0A12D5122BD6F417CBC79D50D8A8C9C917B0 GENESIS: /tmp/genesis.signed.scale - TOPOLOGY: '[{"address":"irohad3:1340","public_key":"ed012063ED3DFEDEBD8A86B4941CC4379D2EF0B74BDFE61F033FC0C89867D57C882A26"},{"address":"irohad1:1338","public_key":"ed012064BD9B25BF8477144D03B26FC8CF5D8A354B2F780DA310EE69933DC1E86FBCE2"},{"address":"irohad0:1337","public_key":"ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA"},{"address":"irohad2:1339","public_key":"ed01208EA177921AF051CD12FC07E3416419320908883A1104B31401B650EEB820A300"}]' + TOPOLOGY: '["ed012063ED3DFEDEBD8A86B4941CC4379D2EF0B74BDFE61F033FC0C89867D57C882A26","ed012064BD9B25BF8477144D03B26FC8CF5D8A354B2F780DA310EE69933DC1E86FBCE2","ed012087FDCACF58B891947600B0C37795CADB5A2AE6DE612338FDA9489AB21CE427BA","ed01208EA177921AF051CD12FC07E3416419320908883A1104B31401B650EEB820A300"]' ports: - 1337:1337 - 8080:8080 diff --git a/crates/iroha_swarm/src/schema.rs b/crates/iroha_swarm/src/schema.rs index 3ddb61bf48d..a24ceaf9167 100644 --- a/crates/iroha_swarm/src/schema.rs +++ b/crates/iroha_swarm/src/schema.rs @@ -182,7 +182,7 @@ struct GenesisEnv<'a> { genesis_private_key: &'a iroha_crypto::ExposedPrivateKey, genesis: ContainerFile<'a>, #[serde_as(as = "serde_with::json::JsonString")] - topology: std::collections::BTreeSet<&'a iroha_data_model::peer::Peer>, + topology: std::collections::BTreeSet<&'a iroha_data_model::peer::PeerId>, } impl<'a> GenesisEnv<'a> { @@ -197,7 +197,7 @@ impl<'a> GenesisEnv<'a> { base: PeerEnv::new(key_pair, ports, chain, genesis_public_key, topology), genesis_private_key, genesis: CONTAINER_SIGNED_GENESIS, - topology: topology.iter().collect(), + topology: topology.iter().map(|peer| peer.id()).collect(), } } } diff --git a/crates/irohad/src/main.rs b/crates/irohad/src/main.rs index 45478a26e4e..ff1961ef513 100644 --- a/crates/irohad/src/main.rs +++ b/crates/irohad/src/main.rs @@ -19,6 +19,7 @@ use iroha_core::{ gossiper::{TransactionGossiper, TransactionGossiperHandle}, kiso::KisoHandle, kura::Kura, + peers_gossiper::{PeersGossiper, PeersGossiperHandle}, query::store::LiveQueryStore, queue::Queue, smartcontracts::isi::Registrable as _, @@ -132,6 +133,7 @@ struct NetworkRelay { sumeragi: SumeragiHandle, block_sync: BlockSynchronizerHandle, tx_gossiper: TransactionGossiperHandle, + peers_gossiper: PeersGossiperHandle, network: IrohaNetwork, } @@ -157,6 +159,7 @@ impl NetworkRelay { } BlockSync(data) => self.block_sync.message(*data).await, TransactionGossiper(data) => self.tx_gossiper.gossip(*data).await, + PeersGossiper(data) => self.peers_gossiper.gossip(*data).await, Health => {} } } @@ -299,11 +302,19 @@ impl Iroha { .start(supervisor.shutdown_signal()); supervisor.monitor(child); + let (peers_gossiper, child) = PeersGossiper::start( + config.sumeragi.trusted_peers.value().clone(), + network.clone(), + supervisor.shutdown_signal(), + ); + supervisor.monitor(child); + supervisor.monitor(task::spawn( NetworkRelay { sumeragi, block_sync, tx_gossiper, + peers_gossiper, network, } .run(), @@ -725,6 +736,7 @@ mod tests { .write("public_key", pubkey) .write("private_key", ExposedPrivateKey(privkey)) .write(["network", "address"], socket_addr!(127.0.0.1:1337)) + .write(["network", "external_port"], 1337) .write(["torii", "address"], socket_addr!(127.0.0.1:8080)) .write(["genesis", "public_key"], genesis_public_key); table diff --git a/defaults/docker-compose.local.yml b/defaults/docker-compose.local.yml index b75e03448c8..7f1284f6b1a 100644 --- a/defaults/docker-compose.local.yml +++ b/defaults/docker-compose.local.yml @@ -11,13 +11,14 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D PRIVATE_KEY: 802620A4DFC16789FBF9A588525E4AC7F791AC51B12AEE8919EACC03EB2FC31D32C692 + P2P_EXTERNAL_PORT: 1337 P2P_ADDRESS: 0.0.0.0:1337 API_ADDRESS: 0.0.0.0:8080 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 TRUSTED_PEERS: '[{"address":"irohad2:1339","public_key":"ed01204EE2FCD53E1730AF142D1E23951198678295047F9314B4006B0CB61850B1DB10"},{"address":"irohad1:1338","public_key":"ed01209897952D14BDFAEA780087C38FF3EB800CB20B882748FC95A575ADB9CD2CB21D"},{"address":"irohad3:1340","public_key":"ed0120CACF3A84B8DC8710CE9D6B968EE95EC7EE4C93C85858F026F3B4417F569592CE"}]' GENESIS_PRIVATE_KEY: 80262082B3BDE54AEBECA4146257DA0DE8D59D8E46D5FE34887DCD8072866792FCB3AD GENESIS: /tmp/genesis.signed.scale - TOPOLOGY: '[{"address":"irohad2:1339","public_key":"ed01204EE2FCD53E1730AF142D1E23951198678295047F9314B4006B0CB61850B1DB10"},{"address":"irohad1:1338","public_key":"ed01209897952D14BDFAEA780087C38FF3EB800CB20B882748FC95A575ADB9CD2CB21D"},{"address":"irohad0:1337","public_key":"ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D"},{"address":"irohad3:1340","public_key":"ed0120CACF3A84B8DC8710CE9D6B968EE95EC7EE4C93C85858F026F3B4417F569592CE"}]' + TOPOLOGY: '["ed01204EE2FCD53E1730AF142D1E23951198678295047F9314B4006B0CB61850B1DB10","ed01209897952D14BDFAEA780087C38FF3EB800CB20B882748FC95A575ADB9CD2CB21D","ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D","ed0120CACF3A84B8DC8710CE9D6B968EE95EC7EE4C93C85858F026F3B4417F569592CE"]' ports: - 1337:1337 - 8080:8080 @@ -56,6 +57,7 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed01209897952D14BDFAEA780087C38FF3EB800CB20B882748FC95A575ADB9CD2CB21D PRIVATE_KEY: 8026203ECA64ADC23DC106C9D703233375EA6AC345AD7299FF3AD45F355DE6CD1B5510 + P2P_EXTERNAL_PORT: 1338 P2P_ADDRESS: 0.0.0.0:1338 API_ADDRESS: 0.0.0.0:8081 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 @@ -82,6 +84,7 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed01204EE2FCD53E1730AF142D1E23951198678295047F9314B4006B0CB61850B1DB10 PRIVATE_KEY: 8026207B1C78F733EDAFD6AF9BAC3A0D6C5A494557DD031609A4FDD9796EEF471D928C + P2P_EXTERNAL_PORT: 1339 P2P_ADDRESS: 0.0.0.0:1339 API_ADDRESS: 0.0.0.0:8082 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 @@ -108,6 +111,7 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed0120CACF3A84B8DC8710CE9D6B968EE95EC7EE4C93C85858F026F3B4417F569592CE PRIVATE_KEY: 8026206C7FF4CA09D395C7B7332C654099406E929C6238942E3CE85155CC1A5E2CF519 + P2P_EXTERNAL_PORT: 1340 P2P_ADDRESS: 0.0.0.0:1340 API_ADDRESS: 0.0.0.0:8083 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 diff --git a/defaults/docker-compose.single.yml b/defaults/docker-compose.single.yml index a38642f9f23..85d706e2101 100644 --- a/defaults/docker-compose.single.yml +++ b/defaults/docker-compose.single.yml @@ -11,12 +11,13 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D PRIVATE_KEY: 802620A4DFC16789FBF9A588525E4AC7F791AC51B12AEE8919EACC03EB2FC31D32C692 + P2P_EXTERNAL_PORT: 1337 P2P_ADDRESS: 0.0.0.0:1337 API_ADDRESS: 0.0.0.0:8080 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 GENESIS_PRIVATE_KEY: 80262082B3BDE54AEBECA4146257DA0DE8D59D8E46D5FE34887DCD8072866792FCB3AD GENESIS: /tmp/genesis.signed.scale - TOPOLOGY: '[{"address":"irohad0:1337","public_key":"ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D"}]' + TOPOLOGY: '["ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D"]' ports: - 1337:1337 - 8080:8080 diff --git a/defaults/docker-compose.yml b/defaults/docker-compose.yml index 960f34fb990..76a735186e5 100644 --- a/defaults/docker-compose.yml +++ b/defaults/docker-compose.yml @@ -9,13 +9,14 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D PRIVATE_KEY: 802620A4DFC16789FBF9A588525E4AC7F791AC51B12AEE8919EACC03EB2FC31D32C692 + P2P_EXTERNAL_PORT: 1337 P2P_ADDRESS: 0.0.0.0:1337 API_ADDRESS: 0.0.0.0:8080 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 TRUSTED_PEERS: '[{"address":"irohad2:1339","public_key":"ed01204EE2FCD53E1730AF142D1E23951198678295047F9314B4006B0CB61850B1DB10"},{"address":"irohad1:1338","public_key":"ed01209897952D14BDFAEA780087C38FF3EB800CB20B882748FC95A575ADB9CD2CB21D"},{"address":"irohad3:1340","public_key":"ed0120CACF3A84B8DC8710CE9D6B968EE95EC7EE4C93C85858F026F3B4417F569592CE"}]' GENESIS_PRIVATE_KEY: 80262082B3BDE54AEBECA4146257DA0DE8D59D8E46D5FE34887DCD8072866792FCB3AD GENESIS: /tmp/genesis.signed.scale - TOPOLOGY: '[{"address":"irohad2:1339","public_key":"ed01204EE2FCD53E1730AF142D1E23951198678295047F9314B4006B0CB61850B1DB10"},{"address":"irohad1:1338","public_key":"ed01209897952D14BDFAEA780087C38FF3EB800CB20B882748FC95A575ADB9CD2CB21D"},{"address":"irohad0:1337","public_key":"ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D"},{"address":"irohad3:1340","public_key":"ed0120CACF3A84B8DC8710CE9D6B968EE95EC7EE4C93C85858F026F3B4417F569592CE"}]' + TOPOLOGY: '["ed01204EE2FCD53E1730AF142D1E23951198678295047F9314B4006B0CB61850B1DB10","ed01209897952D14BDFAEA780087C38FF3EB800CB20B882748FC95A575ADB9CD2CB21D","ed0120A98BAFB0663CE08D75EBD506FEC38A84E576A7C9B0897693ED4B04FD9EF2D18D","ed0120CACF3A84B8DC8710CE9D6B968EE95EC7EE4C93C85858F026F3B4417F569592CE"]' ports: - 1337:1337 - 8080:8080 @@ -51,6 +52,7 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed01209897952D14BDFAEA780087C38FF3EB800CB20B882748FC95A575ADB9CD2CB21D PRIVATE_KEY: 8026203ECA64ADC23DC106C9D703233375EA6AC345AD7299FF3AD45F355DE6CD1B5510 + P2P_EXTERNAL_PORT: 1338 P2P_ADDRESS: 0.0.0.0:1338 API_ADDRESS: 0.0.0.0:8081 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 @@ -74,6 +76,7 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed01204EE2FCD53E1730AF142D1E23951198678295047F9314B4006B0CB61850B1DB10 PRIVATE_KEY: 8026207B1C78F733EDAFD6AF9BAC3A0D6C5A494557DD031609A4FDD9796EEF471D928C + P2P_EXTERNAL_PORT: 1339 P2P_ADDRESS: 0.0.0.0:1339 API_ADDRESS: 0.0.0.0:8082 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 @@ -97,6 +100,7 @@ services: CHAIN: 00000000-0000-0000-0000-000000000000 PUBLIC_KEY: ed0120CACF3A84B8DC8710CE9D6B968EE95EC7EE4C93C85858F026F3B4417F569592CE PRIVATE_KEY: 8026206C7FF4CA09D395C7B7332C654099406E929C6238942E3CE85155CC1A5E2CF519 + P2P_EXTERNAL_PORT: 1340 P2P_ADDRESS: 0.0.0.0:1340 API_ADDRESS: 0.0.0.0:8083 GENESIS_PUBLIC_KEY: ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4 diff --git a/docs/source/references/schema.json b/docs/source/references/schema.json index ecd4b45257b..333f6ebe1dc 100644 --- a/docs/source/references/schema.json +++ b/docs/source/references/schema.json @@ -228,24 +228,12 @@ } ] }, - "Array": { - "Array": { - "type": "u16", - "len": 8 - } - }, "Array": { "Array": { "type": "u8", "len": 32 } }, - "Array": { - "Array": { - "type": "u8", - "len": 4 - } - }, "Asset": { "Struct": [ { @@ -2334,8 +2322,6 @@ ] }, "IpfsPath": "String", - "Ipv4Addr": "Array", - "Ipv6Addr": "Array", "Json": "String", "Level": { "Enum": [ @@ -2810,14 +2796,6 @@ } ] }, - "Peer": { - "Struct": [ - { - "name": "id", - "type": "PeerId" - } - ] - }, "PeerEvent": { "Enum": [ { @@ -2861,10 +2839,6 @@ }, "PeerId": { "Struct": [ - { - "name": "address", - "type": "SocketAddr" - }, { "name": "public_key", "type": "PublicKey" @@ -3116,7 +3090,7 @@ { "tag": "Peer", "discriminant": 8, - "type": "Vec" + "type": "Vec" }, { "tag": "RoleId", @@ -3459,7 +3433,7 @@ "Struct": [ { "name": "object", - "type": "Peer" + "type": "PeerId" } ] }, @@ -4129,61 +4103,6 @@ } ] }, - "SocketAddr": { - "Enum": [ - { - "tag": "Ipv4", - "discriminant": 0, - "type": "SocketAddrV4" - }, - { - "tag": "Ipv6", - "discriminant": 1, - "type": "SocketAddrV6" - }, - { - "tag": "Host", - "discriminant": 2, - "type": "SocketAddrHost" - } - ] - }, - "SocketAddrHost": { - "Struct": [ - { - "name": "host", - "type": "String" - }, - { - "name": "port", - "type": "u16" - } - ] - }, - "SocketAddrV4": { - "Struct": [ - { - "name": "ip", - "type": "Ipv4Addr" - }, - { - "name": "port", - "type": "u16" - } - ] - }, - "SocketAddrV6": { - "Struct": [ - { - "name": "ip", - "type": "Ipv6Addr" - }, - { - "name": "port", - "type": "u16" - } - ] - }, "SortedMap": { "Map": { "key": "CustomParameterId", @@ -4960,9 +4879,6 @@ "Vec": { "Vec": "Parameter" }, - "Vec": { - "Vec": "Peer" - }, "Vec": { "Vec": "PeerId" }, @@ -4999,9 +4915,6 @@ ] }, "WasmSmartContract": "Vec", - "u16": { - "Int": "FixedWidth" - }, "u32": { "Int": "FixedWidth" },