Skip to content

Commit

Permalink
Fix clippy warnings
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Murzin <[email protected]>
  • Loading branch information
dima74 committed Oct 30, 2024
1 parent b1697ff commit bcd2b69
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 45 deletions.
37 changes: 17 additions & 20 deletions crates/iroha_core/src/peers_gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! and then peer B will broadcast address of peer A to other peers.
use std::{
collections::{HashMap, HashSet},
collections::{BTreeMap, BTreeSet},
time::Duration,
};

Expand Down Expand Up @@ -46,12 +46,12 @@ impl PeersGossiperHandle {
/// Actor which gossips peers addresses.
pub struct PeersGossiper {
/// Peers provided at startup
initial_peers: HashMap<PeerId, SocketAddr>,
initial_peers: BTreeMap<PeerId, SocketAddr>,
/// Peers received via gossiping from other peers
/// First-level key corresponds to SocketAddr
/// Second-level key - peer from which such SocketAddr was received
gossip_peers: HashMap<PeerId, HashMap<PeerId, SocketAddr>>,
current_topology: HashSet<PeerId>,
/// First-level key corresponds to `SocketAddr`
/// Second-level key - peer from which such `SocketAddr` was received
gossip_peers: BTreeMap<PeerId, BTreeMap<PeerId, SocketAddr>>,
current_topology: BTreeSet<PeerId>,
network: IrohaNetwork,
}

Expand Down Expand Up @@ -79,8 +79,8 @@ impl PeersGossiper {
.collect();
let gossiper = Self {
initial_peers,
gossip_peers: HashMap::new(),
current_topology: HashSet::new(),
gossip_peers: BTreeMap::new(),
current_topology: BTreeSet::new(),
network,
};
gossiper.network_update_peers_addresses();
Expand Down Expand Up @@ -118,11 +118,11 @@ impl PeersGossiper {
_ = gossip_period.tick() => {
self.gossip_peers()
}
_ = self.network.wait_online_peers_update(|_| ()) => {
() = self.network.wait_online_peers_update(|_| ()) => {
self.gossip_peers();
}
Some((peers_gossip, peer)) = message_receiver.recv() => {
self.handle_peers_gossip(peers_gossip, peer);
self.handle_peers_gossip(peers_gossip, &peer);
}
() = shutdown_signal.receive() => {
iroha_logger::debug!("Shutting down peers gossiper");
Expand All @@ -143,7 +143,7 @@ impl PeersGossiper {
!map.is_empty()
});

self.current_topology = topology;
self.current_topology = topology.into_iter().collect();
}

fn gossip_peers(&self) {
Expand All @@ -153,16 +153,13 @@ impl PeersGossiper {
self.network.broadcast(Broadcast { data });
}

fn handle_peers_gossip(&mut self, PeersGossip(peers): PeersGossip, from_peer: Peer) {
fn handle_peers_gossip(&mut self, PeersGossip(peers): PeersGossip, from_peer: &Peer) {
if !self.current_topology.contains(&from_peer.id) {
return;
}
for peer in peers {
if self.current_topology.contains(&peer.id) {
let map = self
.gossip_peers
.entry(peer.id)
.or_insert_with(HashMap::new);
let map = self.gossip_peers.entry(peer.id).or_default();
map.insert(from_peer.id.clone(), peer.address);
}
}
Expand All @@ -174,7 +171,7 @@ impl PeersGossiper {
let online_peers_ids = online_peers
.into_iter()
.map(|peer| peer.id)
.collect::<HashSet<_>>();
.collect::<BTreeSet<_>>();

let mut peers = Vec::new();
for (id, address) in &self.initial_peers {
Expand All @@ -193,16 +190,16 @@ impl PeersGossiper {
}
}

fn choose_address_majority_rule(addresses: &HashMap<PeerId, SocketAddr>) -> SocketAddr {
let mut count_map = HashMap::new();
fn choose_address_majority_rule(addresses: &BTreeMap<PeerId, SocketAddr>) -> SocketAddr {
let mut count_map = BTreeMap::new();
for address in addresses.values() {
*count_map.entry(address).or_insert(0) += 1;
}
count_map
.into_iter()
.max_by_key(|(_, count)| *count)
.map(|(address, _)| address)
.expect("There must be no empty inner HashMap in addresses")
.expect("There must be no empty inner map in addresses")
.clone()
}

Expand Down
12 changes: 4 additions & 8 deletions crates/iroha_core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
//! The main event loop that powers sumeragi.
use std::{
collections::{BTreeSet, HashSet},
ops::Deref,
sync::mpsc,
};
use std::{collections::BTreeSet, ops::Deref, sync::mpsc};

use iroha_crypto::{HashOf, KeyPair};
use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId};
Expand Down Expand Up @@ -121,9 +117,9 @@ impl Sumeragi {

/// Connect or disconnect peers according to the current network topology.
fn connect_peers(&self, topology: &Topology) {
let peers = topology.iter().cloned().collect::<HashSet<_>>();
self.network.update_topology(UpdateTopology(peers.clone()));
self.peers_gossiper.update_topology(UpdateTopology(peers));
let update = UpdateTopology(topology.iter().cloned().collect());
self.network.update_topology(update.clone());
self.peers_gossiper.update_topology(update);
}

fn send_event(&self, event: impl Into<EventBox>) {
Expand Down
15 changes: 8 additions & 7 deletions crates/iroha_p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ struct NetworkBase<T: Pload, K: Kex, E: Enc> {
/// Can have two addresses for same `PeerId`.
/// * One initially provided via config
/// * Second received from other peers via gossiping
///
/// Will try to establish connection via both addresses.
current_peers_addresses: Vec<(PeerId, SocketAddr)>,
/// Duration after which terminate connection with idle peer
Expand Down Expand Up @@ -431,12 +432,12 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.connecting_peers.remove(&connection_id);

if !self.current_topology.contains(peer.id()) {
iroha_logger::warn!(%peer.id, topology=?self.current_topology, "Peer not present in topology is trying to connect");
iroha_logger::warn!(peer=%peer.id(), topology=?self.current_topology, "Peer not present in topology is trying to connect");
return;
}

// Insert peer if peer not in peers yet or replace peer if it's disambiguator value is smaller than new one (simultaneous connections resolution rule)
match self.peers.get(&peer.id()) {
match self.peers.get(peer.id()) {
Some(peer) if peer.disambiguator > disambiguator => {
iroha_logger::debug!(
"Peer is disconnected due to simultaneous connection resolution policy"
Expand All @@ -458,18 +459,18 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
disambiguator,
};
let _ = peer_message_sender.send(self.peer_message_sender.clone());
self.peers.insert(peer.id.clone(), ref_peer);
self.peers.insert(peer.id().clone(), ref_peer);
Self::add_online_peer(&self.online_peers_sender, peer);
}

fn peer_terminated(&mut self, Terminated { peer, conn_id }: Terminated) {
self.connecting_peers.remove(&conn_id);
if let Some(peer) = peer {
if let Some(ref_peer) = self.peers.get(&peer.id()) {
if let Some(ref_peer) = self.peers.get(peer.id()) {
if ref_peer.conn_id == conn_id {
iroha_logger::debug!(conn_id, peer=%peer, "Peer terminated");
self.peers.remove(&peer.id());
Self::remove_online_peer(&self.online_peers_sender, &peer.id);
self.peers.remove(peer.id());
Self::remove_online_peer(&self.online_peers_sender, peer.id());
}
}
}
Expand Down Expand Up @@ -504,7 +505,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
if ref_peer.handle.post(data.clone()).is_err() {
let peer = Peer::new(ref_peer.p2p_addr.clone(), public_key.clone());
iroha_logger::error!(peer=%peer, "Failed to send message to peer");
Self::remove_online_peer(online_peers_sender, &peer.id);
Self::remove_online_peer(online_peers_sender, peer.id());
false
} else {
true
Expand Down
13 changes: 6 additions & 7 deletions crates/iroha_p2p/tests/integration/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::{prelude::*, stream::FuturesUnordered, task::AtomicWaker};
use iroha_config::parameters::actual::Network as Config;
use iroha_config_base::WithOrigin;
use iroha_crypto::KeyPair;
use iroha_data_model::prelude::Peer;
use iroha_data_model::{prelude::Peer, Identifiable};
use iroha_futures::supervisor::ShutdownSignal;
use iroha_logger::{prelude::*, test_logger};
use iroha_p2p::{network::message::*, peer::message::PeerMessage, NetworkHandle};
Expand All @@ -21,7 +21,6 @@ use tokio::{
sync::{mpsc, Barrier},
time::Duration,
};
use iroha_data_model::Identifiable;

#[derive(Clone, Debug, Decode, Encode)]
struct TestMessage(String);
Expand Down Expand Up @@ -55,7 +54,7 @@ async fn network_create() {

info!("Connecting to peer...");
let peer1 = Peer::new(address.clone(), public_key.clone());
update_topology_and_peers_addresses(&network, vec![peer1.clone()]);
update_topology_and_peers_addresses(&network, &[peer1.clone()]);
tokio::time::sleep(delay).await;

info!("Posting message...");
Expand Down Expand Up @@ -184,8 +183,8 @@ async fn two_networks() {
let peer1 = Peer::new(address1.clone(), public_key1);
let peer2 = Peer::new(address2.clone(), public_key2);
// Connect peers with each other
update_topology_and_peers_addresses(&network1, vec![peer2.clone()]);
update_topology_and_peers_addresses(&network2, vec![peer1.clone()]);
update_topology_and_peers_addresses(&network1, &[peer2.clone()]);
update_topology_and_peers_addresses(&network2, &[peer1.clone()]);

tokio::time::timeout(Duration::from_millis(2000), async {
let mut connections = network1.wait_online_peers_update(HashSet::len).await;
Expand Down Expand Up @@ -315,7 +314,7 @@ async fn start_network(
let _ = barrier.wait().await;
let peers = peers.into_iter().filter(|p| p != &peer).collect::<Vec<_>>();
let conn_count = peers.len();
update_topology_and_peers_addresses(&network, peers);
update_topology_and_peers_addresses(&network, &peers);

let _ = barrier.wait().await;
tokio::time::timeout(Duration::from_millis(10_000), async {
Expand All @@ -340,7 +339,7 @@ async fn start_network(
(peer, network)
}

fn update_topology_and_peers_addresses(network: &NetworkHandle<TestMessage>, peers: Vec<Peer>) {
fn update_topology_and_peers_addresses(network: &NetworkHandle<TestMessage>, peers: &[Peer]) {
let topology = peers.iter().map(|peer| peer.id().clone()).collect();
network.update_topology(UpdateTopology(topology));

Expand Down
9 changes: 6 additions & 3 deletions crates/iroha_swarm/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<'a> PeerEnv<'a> {
) -> Self {
let p2p_public_address = topology
.iter()
.find(|&peer| peer.id.public_key() == public_key)
.find(|&peer| peer.id().public_key() == public_key)
.unwrap()
.address
.clone();
Expand All @@ -175,7 +175,7 @@ impl<'a> PeerEnv<'a> {
genesis_public_key,
trusted_peers: topology
.iter()
.filter(|&peer| peer.id.public_key() != public_key)
.filter(|&peer| peer.id().public_key() != public_key)
.collect(),
}
}
Expand Down Expand Up @@ -205,7 +205,10 @@ impl<'a> GenesisEnv<'a> {
base: PeerEnv::new(key_pair, ports, chain, genesis_public_key, topology),
genesis_private_key,
genesis: CONTAINER_SIGNED_GENESIS,
topology: topology.iter().map(|peer| peer.id()).collect(),
topology: topology
.iter()
.map(iroha_data_model::prelude::Peer::id)
.collect(),
}
}
}
Expand Down

0 comments on commit bcd2b69

Please sign in to comment.