Skip to content

Commit

Permalink
Add dialled peers to list and send new IDs to subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
caolan committed Oct 7, 2024
1 parent 9f3afb7 commit 16cf598
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
56 changes: 42 additions & 14 deletions mutinyd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Server {
inbox_subscribers: HashMap<i64, HashMap<usize, mpsc::Sender<ResponseBody>>>,
client_request_receiver: mpsc::Receiver<ClientRequest>,
client_request_sender: mpsc::Sender<ClientRequest>,
peers: HashSet<(PeerId, Multiaddr)>,
peers: HashMap<PeerId, HashSet<Multiaddr>>,
peer_id: libp2p::PeerId,
delivery_attempts: HashMap<OutboundRequestId, i64>,
store: Store,
Expand All @@ -38,7 +38,7 @@ impl Server {
swarm: swarm::start(config.keypair).await?,
client_request_receiver: rx,
client_request_sender: tx,
peers: HashSet::new(),
peers: HashMap::new(),
peer_id: libp2p::identity::PeerId::from_public_key(pubkey),
delivery_attempts: HashMap::new(),
store: Store::new(config.db_connection),
Expand Down Expand Up @@ -199,25 +199,49 @@ impl Server {
}
}

async fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
match self.peers.entry(peer_id) {
std::collections::hash_map::Entry::Occupied(entry) => {
entry.into_mut().insert(addr);
},
std::collections::hash_map::Entry::Vacant(entry) => {
let mut addrs = HashSet::new();
addrs.insert(addr);
entry.insert(addrs);
self.peer_subscribers_send(ResponseBody::PeerDiscovered {
peer_id: peer_id.to_base58(),
}).await;
},
}
}

async fn remove_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
let mut expired = false;
if let Some(addrs) = self.peers.get_mut(&peer_id) {
addrs.remove(&addr);
expired = addrs.len() == 0;
}
if expired {
self.peers.remove(&peer_id);
self.peer_subscribers_send(ResponseBody::PeerExpired {
peer_id: peer_id.to_base58(),
}).await;
}
}

async fn swarm_event(&mut self, event: SwarmEvent<MutinyBehaviourEvent>) -> Result<(), Box<dyn Error>> {
match event {
SwarmEvent::Behaviour(swarm::MutinyBehaviourEvent::Mdns(ev)) => match ev {
mdns::Event::Discovered(list) => {
for (peer_id, multiaddr) in list {
for (peer_id, addr) in list {
println!("mDNS discovered a new peer: {peer_id}");
self.peers.insert((peer_id, multiaddr));
self.peer_subscribers_send(ResponseBody::PeerDiscovered {
peer_id: peer_id.to_base58(),
}).await;
self.add_peer_address(peer_id, addr).await
}
},
mdns::Event::Expired(list) => {
for (peer_id, multiaddr) in list {
for (peer_id, addr) in list {
println!("mDNS discover peer has expired: {peer_id}");
self.peers.remove(&(peer_id, multiaddr));
self.peer_subscribers_send(ResponseBody::PeerExpired {
peer_id: peer_id.to_base58(),
}).await;
self.remove_peer_address(peer_id, addr).await;
}
},
},
Expand Down Expand Up @@ -262,7 +286,11 @@ impl Server {
},
// Identification information has been received from a peer.
libp2p::identify::Event::Received { info, .. } => {
println!("Received identify info {info:?}")
println!("Received identify info {info:?}");
let peer_id = libp2p::identity::PeerId::from_public_key(&info.public_key);
for addr in info.listen_addrs {
self.add_peer_address(peer_id, addr).await
}
},
// Identification information of the local node has been actively pushed to a peer.
libp2p::identify::Event::Pushed { peer_id, .. } => {
Expand Down Expand Up @@ -381,7 +409,7 @@ impl Server {
},
RequestBody::Peers => {
let mut peers: Vec<String> = Vec::new();
for (id, _addr) in self.peers.iter() {
for (id, _addrs) in self.peers.iter() {
peers.push(id.to_base58());
}
let _ = request.response.send(ResponseBody::Peers {peers}).await;
Expand Down
1 change: 0 additions & 1 deletion mutinyd/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,3 @@ pub async fn start(keypair: Keypair) -> Result<
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Ok(swarm)
}

0 comments on commit 16cf598

Please sign in to comment.