Skip to content

Commit

Permalink
feat!: Gossip peer addresses
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Murzin <[email protected]>
  • Loading branch information
dima74 committed Oct 21, 2024
1 parent 8f7da1c commit de34371
Show file tree
Hide file tree
Showing 23 changed files with 390 additions and 246 deletions.
26 changes: 5 additions & 21 deletions crates/iroha_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use erased_serde::Serialize;
use error_stack::{fmt::ColorMode, IntoReportCompat, ResultExt};
use eyre::{eyre, Error, Result, WrapErr};
use iroha::{client::Client, config::Config, data_model::prelude::*};
use iroha_primitives::{addr::SocketAddr, json::Json};
use iroha_primitives::json::Json;
use thiserror::Error;

/// Re-usable clap `--metadata <PATH>` (`-m`) argument.
Expand Down Expand Up @@ -1039,9 +1039,6 @@ mod peer {
/// Register subcommand of peer
#[derive(clap::Args, Debug)]
pub struct Register {
/// P2P address of the peer e.g. `127.0.0.1:1337`
#[arg(short, long)]
pub address: SocketAddr,
/// Public key of the peer
#[arg(short, long)]
pub key: PublicKey,
Expand All @@ -1051,23 +1048,15 @@ mod peer {

impl RunArgs for Register {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
let Self {
address,
key,
metadata,
} = self;
let register_peer =
iroha::data_model::isi::Register::peer(Peer::new(PeerId::new(address, key)));
let Self { key, metadata } = self;
let register_peer = iroha::data_model::isi::Register::peer(key.into());
submit([register_peer], metadata.load()?, context).wrap_err("Failed to register peer")
}
}

/// Unregister subcommand of peer
#[derive(clap::Args, Debug)]
pub struct Unregister {
/// P2P address of the peer e.g. `127.0.0.1:1337`
#[arg(short, long)]
pub address: SocketAddr,
/// Public key of the peer
#[arg(short, long)]
pub key: PublicKey,
Expand All @@ -1077,13 +1066,8 @@ mod peer {

impl RunArgs for Unregister {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
let Self {
address,
key,
metadata,
} = self;
let unregister_peer =
iroha::data_model::isi::Unregister::peer(PeerId::new(address, key));
let Self { key, metadata } = self;
let unregister_peer = iroha::data_model::isi::Unregister::peer(key.into());
submit([unregister_peer], metadata.load()?, context)
.wrap_err("Failed to unregister peer")
}
Expand Down
1 change: 1 addition & 0 deletions crates/iroha_config/iroha_test_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ private_key = "802620282ED9F3CF92811C3818DBC4AE594ED59DC1A2F78E4241E31924E101D6B

[network]
address = "127.0.0.1:1337"
external_port = 1337

[genesis]
public_key = "ed01204164BF554923ECE1FD412D241036D863A6AE430476C898248B8237D77534CFC4"
Expand Down
1 change: 1 addition & 0 deletions crates/iroha_config/src/parameters/actual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct Common {
#[derive(Debug, Clone)]
pub struct Network {
pub address: WithOrigin<SocketAddr>,
pub external_port: WithOrigin<u16>,
pub idle_timeout: Duration,
}

Expand Down
4 changes: 4 additions & 0 deletions crates/iroha_config/src/parameters/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ pub struct Network {
/// Peer-to-peer address
#[config(env = "P2P_ADDRESS")]
pub address: WithOrigin<SocketAddr>,
#[config(env = "P2P_EXTERNAL_PORT")]
pub external_port: WithOrigin<u16>,
#[config(default = "defaults::network::BLOCK_GOSSIP_SIZE")]
pub block_gossip_size: NonZeroU32,
#[config(default = "defaults::network::BLOCK_GOSSIP_PERIOD.into()")]
Expand All @@ -296,6 +298,7 @@ impl Network {
) {
let Self {
address,
external_port,
block_gossip_size,
block_gossip_period_ms: block_gossip_period,
transaction_gossip_size,
Expand All @@ -306,6 +309,7 @@ impl Network {
(
actual::Network {
address,
external_port,
idle_timeout: idle_timeout.get(),
},
actual::BlockSync {
Expand Down
7 changes: 7 additions & 0 deletions crates/iroha_config/tests/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ fn minimal_config_snapshot() {
path: "tests/fixtures/base.toml",
},
},
external_port: WithOrigin {
value: 1337,
origin: File {
id: ParameterId(network.external_port),
path: "tests/fixtures/base.toml",
},
},
idle_timeout: 60s,
},
genesis: Genesis {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extends = ["base.toml", "base_trusted_peers.toml"]

[network]
address = "127.0.0.1:8080"
external_port = 8080

[torii]
address = "127.0.0.1:8080"
1 change: 1 addition & 0 deletions crates/iroha_config/tests/fixtures/base.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ private_key = "8026208F4C15E5D664DA3F13778801D23D4E89B76E94C1B94B389544168B6CB89

[network]
address = "127.0.0.1:1337"
external_port = 1337

[genesis]
public_key = "ed01208BA62848CF767D72E7F7F4B9D2D7BA07FEE33760F79ABE5597A51520E292A0CB"
Expand Down
1 change: 1 addition & 0 deletions crates/iroha_config/tests/fixtures/full.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CHAIN=0-0
PUBLIC_KEY=ed01208BA62848CF767D72E7F7F4B9D2D7BA07FEE33760F79ABE5597A51520E292A0CB
PRIVATE_KEY=8026208F4C15E5D664DA3F13778801D23D4E89B76E94C1B94B389544168B6CB894F84F
P2P_ADDRESS=127.0.0.1:5432
P2P_EXTERNAL_PORT=5432
GENESIS_PUBLIC_KEY=ed01208BA62848CF767D72E7F7F4B9D2D7BA07FEE33760F79ABE5597A51520E292A0CB
GENESIS=./genesis.signed.scale
API_ADDRESS=127.0.0.1:8080
Expand Down
1 change: 1 addition & 0 deletions crates/iroha_config/tests/fixtures/full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ file = "genesis.signed.scale"

[network]
address = "localhost:3840"
external_port = 3840
block_gossip_period_ms = 10_000
block_gossip_size = 4
transaction_gossip_period_ms = 1_000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ private_key = "8026208F4C15E5D664DA3F13778801D23D4E89B76E94C1B94B389544168B6CB89

[network]
address = "127.0.0.1:1337"
external_port = 1337

[genesis]
public_key = "ed01208BA62848CF767D72E7F7F4B9D2D7BA07FEE33760F79ABE5597A51520E292A0CB"
Expand Down
4 changes: 4 additions & 0 deletions crates/iroha_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod gossiper;
pub mod kiso;
pub mod kura;
pub mod metrics;
pub mod peers_gossiper;
pub mod query;
pub mod queue;
pub mod smartcontracts;
Expand All @@ -25,6 +26,7 @@ use tokio::sync::broadcast;

use crate::{
block_sync::message::Message as BlockSyncMessage,
peers_gossiper::PeersGossip,
prelude::*,
sumeragi::message::{BlockMessage, ControlFlowMessage},
};
Expand Down Expand Up @@ -52,6 +54,8 @@ pub enum NetworkMessage {
BlockSync(Box<BlockSyncMessage>),
/// Transaction gossiper message
TransactionGossiper(Box<TransactionGossip>),
/// Peers addresses gossiper message
PeersGossiper(Box<PeersGossip>),
/// Health check message
Health,
}
Expand Down
151 changes: 151 additions & 0 deletions crates/iroha_core/src/peers_gossiper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//! Peers gossiper is actor which is responsible for gossiping addresses of peers.
//!
//! E.g. peer A changes address, connects to peer B,
//! and then peer B will broadcast address of peer A to other peers.
use std::{
collections::{HashMap, HashSet},
time::Duration,
};

use iroha_config::parameters::actual::TrustedPeers;
use iroha_data_model::peer::{Peer, PeerId};
use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal};
use iroha_p2p::{Broadcast, UpdatePeers};
use iroha_primitives::{addr::SocketAddr, unique_vec::UniqueVec};
use iroha_version::{Decode, Encode};
use tokio::sync::mpsc;

use crate::{IrohaNetwork, NetworkMessage};

/// [`Gossiper`] actor handle.
#[derive(Clone)]
pub struct PeersGossiperHandle {
message_sender: mpsc::Sender<PeersGossip>,
}

impl PeersGossiperHandle {
/// Send [`PeersGossip`] to actor
pub async fn gossip(&self, gossip: PeersGossip) {
self.message_sender
.send(gossip)
.await
.expect("Gossiper must handle messages until there is at least one handle to it")
}
}

/// Actor which gossips peers addresses.
pub struct PeersGossiper {
/// Peers provided at startup
initial_peers: HashMap<PeerId, SocketAddr>,
/// Peers received via gossiping from other peers
gossip_peers: HashMap<PeerId, SocketAddr>,
network: IrohaNetwork,
}

/// Terminology:
/// * Topology - public keys of current network derived from blockchain (Register/Unregister Peer Isi)
/// * Peers addresses - currently known addresses for peers in topology. Might be unknown for some peer.
///
/// There are three sources of peers addresses:
/// 1. Provided at iroha startup (`TRUSTED_PEERS` env var)
/// 2. Currently connected online peers.
/// Some peer might change address and connect to our peer,
/// such connection will be accepted if peer public key is in topology.
/// 3. Received via gossiping from other peers.
impl PeersGossiper {
/// Start actor.
pub fn start(
trusted_peers: TrustedPeers,
network: IrohaNetwork,
shutdown_signal: ShutdownSignal,
) -> (PeersGossiperHandle, Child) {
let initial_peers = trusted_peers
.others
.into_iter()
.map(|peer| (peer.id, peer.address))
.collect();
let gossiper = Self {
initial_peers,
gossip_peers: HashMap::new(),
network,
};
gossiper.network_update_peers_addresses();

let (message_sender, message_receiver) = mpsc::channel(1);
(
PeersGossiperHandle { message_sender },
Child::new(
tokio::task::spawn(gossiper.run(message_receiver, shutdown_signal)),
OnShutdown::Abort,
),
)
}

async fn run(
mut self,
mut message_receiver: mpsc::Receiver<PeersGossip>,
shutdown_signal: ShutdownSignal,
) {
let mut gossip_period = tokio::time::interval(Duration::from_secs(60));
loop {
tokio::select! {
_ = gossip_period.tick() => {
self.gossip_peers()
}
_ = self.network.wait_online_peers_update(|_| ()) => {
self.gossip_peers();
}
Some(peers_gossip) = message_receiver.recv() => {
self.handle_peers_gossip(peers_gossip);
}
() = shutdown_signal.receive() => {
iroha_logger::debug!("Shutting down peers gossiper");
break;
},
}
tokio::task::yield_now().await;
}
}

fn gossip_peers(&self) {
let online_peers = self.network.online_peers(Clone::clone);
let online_peers = UniqueVec::from_iter(online_peers);
let data = NetworkMessage::PeersGossiper(Box::new(PeersGossip(online_peers)));
self.network.broadcast(Broadcast { data });
}

fn handle_peers_gossip(&mut self, PeersGossip(peers): PeersGossip) {
for peer in peers {
self.gossip_peers.insert(peer.id, peer.address);
}
self.network_update_peers_addresses();
}

fn network_update_peers_addresses(&self) {
let online_peers = self.network.online_peers(Clone::clone);
let online_peers_ids = online_peers
.into_iter()
.map(|peer| peer.id)
.collect::<HashSet<_>>();

let mut peers = Vec::new();
for (id, address) in &self.initial_peers {
if !online_peers_ids.contains(id) {
peers.push((id.clone(), address.clone()));
}
}
for (id, address) in &self.gossip_peers {
if !online_peers_ids.contains(id) {
peers.push((id.clone(), address.clone()));
}
}

let update = UpdatePeers(peers);
self.network.update_peers_addresses(update);
}
}

/// Message for gossiping peers addresses.
#[derive(Decode, Encode, Debug, Clone)]
pub struct PeersGossip(UniqueVec<Peer>);
Loading

0 comments on commit de34371

Please sign in to comment.