Skip to content

Commit

Permalink
Review fixes: limit gossip_peers to peers from topology
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Murzin <[email protected]>
  • Loading branch information
dima74 committed Oct 25, 2024
1 parent e0ec32b commit 91e7ea6
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 16 deletions.
37 changes: 33 additions & 4 deletions crates/iroha_core/src/peers_gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use iroha_config::parameters::actual::TrustedPeers;
use iroha_data_model::peer::{Peer, PeerId};
use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal};
use iroha_p2p::{Broadcast, UpdatePeers};
use iroha_p2p::{Broadcast, UpdatePeers, UpdateTopology};
use iroha_primitives::{addr::SocketAddr, unique_vec::UniqueVec};
use iroha_version::{Decode, Encode};
use parity_scale_codec::{Error, Input};
Expand All @@ -23,6 +23,7 @@ use crate::{IrohaNetwork, NetworkMessage};
#[derive(Clone)]
pub struct PeersGossiperHandle {
message_sender: mpsc::Sender<PeersGossip>,
update_topology_sender: mpsc::UnboundedSender<UpdateTopology>,
}

impl PeersGossiperHandle {
Expand All @@ -33,6 +34,13 @@ impl PeersGossiperHandle {
.await
.expect("Gossiper must handle messages until there is at least one handle to it")
}

/// Send [`UpdateTopology`] message on network actor.
pub fn update_topology(&self, topology: UpdateTopology) {
self.update_topology_sender
.send(topology)
.expect("Gossiper must accept messages until there is at least one handle to it")
}
}

/// Actor which gossips peers addresses.
Expand All @@ -41,6 +49,7 @@ pub struct PeersGossiper {
initial_peers: HashMap<PeerId, SocketAddr>,
/// Peers received via gossiping from other peers
gossip_peers: HashMap<PeerId, SocketAddr>,
current_topology: HashSet<PeerId>,
network: IrohaNetwork,
}

Expand Down Expand Up @@ -69,15 +78,24 @@ impl PeersGossiper {
let gossiper = Self {
initial_peers,
gossip_peers: HashMap::new(),
current_topology: HashSet::new(),
network,
};
gossiper.network_update_peers_addresses();

let (message_sender, message_receiver) = mpsc::channel(1);
let (update_topology_sender, update_topology_receiver) = mpsc::unbounded_channel();
(
PeersGossiperHandle { message_sender },
PeersGossiperHandle {
message_sender,
update_topology_sender,
},
Child::new(
tokio::task::spawn(gossiper.run(message_receiver, shutdown_signal)),
tokio::task::spawn(gossiper.run(
message_receiver,
update_topology_receiver,
shutdown_signal,
)),
OnShutdown::Abort,
),
)
Expand All @@ -86,11 +104,15 @@ impl PeersGossiper {
async fn run(
mut self,
mut message_receiver: mpsc::Receiver<PeersGossip>,
mut update_topology_receiver: mpsc::UnboundedReceiver<UpdateTopology>,
shutdown_signal: ShutdownSignal,
) {
let mut gossip_period = tokio::time::interval(Duration::from_secs(60));
loop {
tokio::select! {
Some(update_topology) = update_topology_receiver.recv() => {
self.set_current_topology(update_topology);
}
_ = gossip_period.tick() => {
self.gossip_peers()
}
Expand All @@ -109,6 +131,11 @@ impl PeersGossiper {
}
}

fn set_current_topology(&mut self, UpdateTopology(topology): UpdateTopology) {
self.gossip_peers.retain(|peer, _| topology.contains(peer));
self.current_topology = topology;
}

fn gossip_peers(&self) {
let online_peers = self.network.online_peers(Clone::clone);
let online_peers = UniqueVec::from_iter(online_peers);
Expand All @@ -118,7 +145,9 @@ impl PeersGossiper {

fn handle_peers_gossip(&mut self, PeersGossip(peers): PeersGossip) {
for peer in peers {
self.gossip_peers.insert(peer.id, peer.address);
if self.current_topology.contains(&peer.id) {
self.gossip_peers.insert(peer.id, peer.address);
}
}
self.network_update_peers_addresses();
}
Expand Down
18 changes: 14 additions & 4 deletions crates/iroha_core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! The main event loop that powers sumeragi.
use std::{collections::BTreeSet, ops::Deref, sync::mpsc};
use std::{
collections::{BTreeSet, HashSet},
ops::Deref,
sync::mpsc,
};

use iroha_crypto::{HashOf, KeyPair};
use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId};
Expand All @@ -8,7 +12,10 @@ use iroha_p2p::UpdateTopology;
use tracing::{span, Level};

use super::{view_change::ProofBuilder, *};
use crate::{block::*, queue::TransactionGuard, sumeragi::tracing::instrument};
use crate::{
block::*, peers_gossiper::PeersGossiperHandle, queue::TransactionGuard,
sumeragi::tracing::instrument,
};

/// `Sumeragi` is the implementation of the consensus.
pub struct Sumeragi {
Expand All @@ -26,6 +33,8 @@ pub struct Sumeragi {
pub kura: Arc<Kura>,
/// [`iroha_p2p::Network`] actor address
pub network: IrohaNetwork,
/// Peers gossiper
pub peers_gossiper: PeersGossiperHandle,
/// Receiver channel, for control flow messages.
pub control_message_receiver: mpsc::Receiver<ControlFlowMessage>,
/// Receiver channel.
Expand Down Expand Up @@ -112,8 +121,9 @@ impl Sumeragi {

/// Connect or disconnect peers according to the current network topology.
fn connect_peers(&self, topology: &Topology) {
let peers = topology.iter().cloned().collect();
self.network.update_topology(UpdateTopology(peers));
let peers = topology.iter().cloned().collect::<HashSet<_>>();
self.network.update_topology(UpdateTopology(peers.clone()));
self.peers_gossiper.update_topology(UpdateTopology(peers));
}

fn send_event(&self, event: impl Into<EventBox>) {
Expand Down
8 changes: 7 additions & 1 deletion crates/iroha_core/src/sumeragi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ pub mod network_topology;
pub mod view_change;

use self::{message::*, view_change::ProofChain};
use crate::{kura::Kura, prelude::*, queue::Queue, EventsSender, IrohaNetwork, NetworkMessage};
use crate::{
kura::Kura, peers_gossiper::PeersGossiperHandle, prelude::*, queue::Queue, EventsSender,
IrohaNetwork, NetworkMessage,
};

/// Handle to `Sumeragi` actor
#[derive(Clone)]
Expand Down Expand Up @@ -144,6 +147,7 @@ impl SumeragiStartArgs {
queue,
kura,
network,
peers_gossiper,
genesis_network,
block_count: BlockCount(block_count),
#[cfg(feature = "telemetry")]
Expand Down Expand Up @@ -217,6 +221,7 @@ impl SumeragiStartArgs {
events_sender,
kura: Arc::clone(&kura),
network: network.clone(),
peers_gossiper,
control_message_receiver,
message_receiver,
debug_force_soft_fork,
Expand Down Expand Up @@ -297,6 +302,7 @@ pub struct SumeragiStartArgs {
pub queue: Arc<Queue>,
pub kura: Arc<Kura>,
pub network: IrohaNetwork,
pub peers_gossiper: PeersGossiperHandle,
pub genesis_network: GenesisWithPubKey,
pub block_count: BlockCount,
#[cfg(feature = "telemetry")]
Expand Down
15 changes: 8 additions & 7 deletions crates/irohad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ impl Iroha {
queue.clone(),
);

let (peers_gossiper, child) = PeersGossiper::start(
config.sumeragi.trusted_peers.value().clone(),
network.clone(),
supervisor.shutdown_signal(),
);
supervisor.monitor(child);

let (sumeragi, child) = SumeragiStartArgs {
sumeragi_config: config.sumeragi.clone(),
common_config: config.common.clone(),
Expand All @@ -267,6 +274,7 @@ impl Iroha {
queue: queue.clone(),
kura: kura.clone(),
network: network.clone(),
peers_gossiper: peers_gossiper.clone(),
genesis_network: GenesisWithPubKey {
genesis,
public_key: config.genesis.public_key.clone(),
Expand Down Expand Up @@ -302,13 +310,6 @@ impl Iroha {
.start(supervisor.shutdown_signal());
supervisor.monitor(child);

let (peers_gossiper, child) = PeersGossiper::start(
config.sumeragi.trusted_peers.value().clone(),
network.clone(),
supervisor.shutdown_signal(),
);
supervisor.monitor(child);

supervisor.monitor(task::spawn(
NetworkRelay {
sumeragi,
Expand Down

0 comments on commit 91e7ea6

Please sign in to comment.