From 16cf59883219c7b32fe18213c2eed91912f6a2be Mon Sep 17 00:00:00 2001 From: Caolan McMahon Date: Mon, 7 Oct 2024 15:09:13 +0100 Subject: [PATCH] Add dialled peers to list and send new IDs to subscribers --- mutinyd/src/server.rs | 56 ++++++++++++++++++++++++++++++++----------- mutinyd/src/swarm.rs | 1 - 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/mutinyd/src/server.rs b/mutinyd/src/server.rs index a1c0335..baf1918 100644 --- a/mutinyd/src/server.rs +++ b/mutinyd/src/server.rs @@ -20,7 +20,7 @@ pub struct Server { inbox_subscribers: HashMap>>, client_request_receiver: mpsc::Receiver, client_request_sender: mpsc::Sender, - peers: HashSet<(PeerId, Multiaddr)>, + peers: HashMap>, peer_id: libp2p::PeerId, delivery_attempts: HashMap, store: Store, @@ -38,7 +38,7 @@ impl Server { swarm: swarm::start(config.keypair).await?, client_request_receiver: rx, client_request_sender: tx, - peers: HashSet::new(), + peers: HashMap::new(), peer_id: libp2p::identity::PeerId::from_public_key(pubkey), delivery_attempts: HashMap::new(), store: Store::new(config.db_connection), @@ -199,25 +199,49 @@ impl Server { } } + async fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) { + match self.peers.entry(peer_id) { + std::collections::hash_map::Entry::Occupied(entry) => { + entry.into_mut().insert(addr); + }, + std::collections::hash_map::Entry::Vacant(entry) => { + let mut addrs = HashSet::new(); + addrs.insert(addr); + entry.insert(addrs); + self.peer_subscribers_send(ResponseBody::PeerDiscovered { + peer_id: peer_id.to_base58(), + }).await; + }, + } + } + + async fn remove_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) { + let mut expired = false; + if let Some(addrs) = self.peers.get_mut(&peer_id) { + addrs.remove(&addr); + expired = addrs.len() == 0; + } + if expired { + self.peers.remove(&peer_id); + self.peer_subscribers_send(ResponseBody::PeerExpired { + peer_id: peer_id.to_base58(), + }).await; + } + } + async fn swarm_event(&mut self, event: SwarmEvent) -> Result<(), Box> { match event { SwarmEvent::Behaviour(swarm::MutinyBehaviourEvent::Mdns(ev)) => match ev { mdns::Event::Discovered(list) => { - for (peer_id, multiaddr) in list { + for (peer_id, addr) in list { println!("mDNS discovered a new peer: {peer_id}"); - self.peers.insert((peer_id, multiaddr)); - self.peer_subscribers_send(ResponseBody::PeerDiscovered { - peer_id: peer_id.to_base58(), - }).await; + self.add_peer_address(peer_id, addr).await } }, mdns::Event::Expired(list) => { - for (peer_id, multiaddr) in list { + for (peer_id, addr) in list { println!("mDNS discover peer has expired: {peer_id}"); - self.peers.remove(&(peer_id, multiaddr)); - self.peer_subscribers_send(ResponseBody::PeerExpired { - peer_id: peer_id.to_base58(), - }).await; + self.remove_peer_address(peer_id, addr).await; } }, }, @@ -262,7 +286,11 @@ impl Server { }, // Identification information has been received from a peer. libp2p::identify::Event::Received { info, .. } => { - println!("Received identify info {info:?}") + println!("Received identify info {info:?}"); + let peer_id = libp2p::identity::PeerId::from_public_key(&info.public_key); + for addr in info.listen_addrs { + self.add_peer_address(peer_id, addr).await + } }, // Identification information of the local node has been actively pushed to a peer. libp2p::identify::Event::Pushed { peer_id, .. } => { @@ -381,7 +409,7 @@ impl Server { }, RequestBody::Peers => { let mut peers: Vec = Vec::new(); - for (id, _addr) in self.peers.iter() { + for (id, _addrs) in self.peers.iter() { peers.push(id.to_base58()); } let _ = request.response.send(ResponseBody::Peers {peers}).await; diff --git a/mutinyd/src/swarm.rs b/mutinyd/src/swarm.rs index c51a733..cd14672 100644 --- a/mutinyd/src/swarm.rs +++ b/mutinyd/src/swarm.rs @@ -66,4 +66,3 @@ pub async fn start(keypair: Keypair) -> Result< swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(swarm) } -