From 26e4564441358a083787542b013a4d4e4b797ba1 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 23 Apr 2024 18:58:08 +0200 Subject: [PATCH] refactor(iroh-net)!: Rename endpoint for nodes to node_state (#2222) ## Description We still have too many things named "endpoint", this targets cleaning up the naming of the state for each node that is stored in the NodeMap. This is now called `NodeState` instead of `Endpoint`. All related APIs now talk about nodes and node states instead of endpoints. Another minor cleanup is in the `NodeState` we had private fields with accessor functions, except for one field which was directly accessible. This is migrated to accessor functions for consistency as well. Finally it marks the visibility of some functions more explicitly, the visibility of those was already as such. This makes it easier to work and realise the impact changes have however. ## Breaking Changes * `MagicSock::tracked_endpoints` -> `MagicSock::connection_infos` * `MagicSock::tracked_endpoint` -> `MagicSock::connection_info` * `magicsock::EndpointInfo` -> `magicsock::ConnectionInfo` ## Notes & open questions Finally had to courage to change this, it's much less worse than I feared and to me it really helps calling this `NodeState`. I find it a noticeable improvement. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- iroh-cli/src/commands/doctor.rs | 4 +- iroh-net/src/magic_endpoint.rs | 6 +- iroh-net/src/magicsock.rs | 32 +-- iroh-net/src/magicsock/node_map.rs | 234 +++++++++--------- .../node_map/{endpoint.rs => node_state.rs} | 108 ++++---- 5 files changed, 199 insertions(+), 185 deletions(-) rename iroh-net/src/magicsock/node_map/{endpoint.rs => node_state.rs} (96%) diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 5fb0d22e17..02dca5f53d 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -30,7 +30,7 @@ use iroh::{ dns::default_resolver, key::{PublicKey, SecretKey}, magic_endpoint, - magicsock::EndpointInfo, + magicsock::ConnectionInfo, netcheck, portmapper, relay::{RelayMap, RelayMode, RelayUrl}, util::AbortingJoinHandle, @@ -390,7 +390,7 @@ impl Gui { .unwrap_or_else(|| "unknown".to_string()) }; let msg = match endpoint.connection_info(*node_id) { - Some(EndpointInfo { + Some(ConnectionInfo { relay_url, conn_type, latency, diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 6f71cffe01..6109b7b594 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -20,7 +20,7 @@ use crate::{ tls, NodeId, }; -pub use super::magicsock::{EndpointInfo as ConnectionInfo, LocalEndpointsStream}; +pub use super::magicsock::{ConnectionInfo, LocalEndpointsStream}; pub use iroh_base::node_addr::{AddrInfo, NodeAddr}; @@ -376,7 +376,7 @@ impl MagicEndpoint { /// to the internal addressbook through [`MagicEndpoint::add_node_addr`]), so these connections /// are not necessarily active connections. pub fn connection_infos(&self) -> Vec { - self.msock.tracked_endpoints() + self.msock.connection_infos() } /// Get connection information about a specific node. @@ -385,7 +385,7 @@ impl MagicEndpoint { /// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are /// currently communicating with that node over a `Direct` (UDP) or `Relay` (relay) connection. pub fn connection_info(&self, node_id: PublicKey) -> Option { - self.msock.tracked_endpoint(node_id) + self.msock.connection_info(node_id) } pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> { diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index bc9de2f1b3..be1f48e351 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -81,7 +81,7 @@ pub use crate::net::UdpSocket; pub use self::metrics::Metrics; pub use self::node_map::{ - ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, EndpointInfo, + ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, NodeInfo as ConnectionInfo, }; pub use self::timer::Timer; @@ -153,7 +153,7 @@ pub(crate) type RelayContents = SmallVec<[Bytes; 1]>; /// This is responsible for routing packets to nodes based on node IDs, it will initially /// route packets via a relay and transparently try and establish a node-to-node /// connection and upgrade to it. It will also keep looking for better connections as the -/// network details of both endpoints change. +/// network details of both nodes change. /// /// It is usually only necessary to use a single [`MagicSock`] instance in an application, it /// means any QUIC endpoints on top will be sharing as much information about nodes as @@ -342,7 +342,7 @@ impl Inner { let mut transmits_sent = 0; match self .node_map - .get_send_addrs_for_quic_mapped_addr(&dest, self.ipv6_reported.load(Ordering::Relaxed)) + .get_send_addrs(&dest, self.ipv6_reported.load(Ordering::Relaxed)) { Some((public_key, udp_addr, relay_url, mut msgs)) => { let mut pings_sent = false; @@ -443,10 +443,10 @@ impl Inner { Poll::Ready(Ok(transmits_sent)) } None => { - error!(dst=%dest, "no endpoint for mapped address"); + error!(dst=%dest, "no node_state for mapped address"); Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, - "trying to send to unknown endpoint", + "trying to send to unknown node", ))) } } @@ -720,15 +720,15 @@ impl Inner { let handled = self.node_map.handle_ping(*sender, addr.clone(), dm.tx_id); match handled.role { PingRole::Duplicate => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: endpoint already confirmed, skip"); + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path already confirmed, skip"); return; } PingRole::LikelyHeartbeat => {} - PingRole::NewEndpoint => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: new endpoint"); + PingRole::NewPath => { + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: new path"); } PingRole::Reactivate => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: endpoint active"); + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path active"); } } @@ -1310,13 +1310,13 @@ impl MagicSock { } /// Retrieve connection information about nodes in the network. - pub fn tracked_endpoints(&self) -> Vec { - self.inner.node_map.endpoint_infos(Instant::now()) + pub fn connection_infos(&self) -> Vec { + self.inner.node_map.node_infos(Instant::now()) } /// Retrieve connection information about a node in the network. - pub fn tracked_endpoint(&self, node_key: PublicKey) -> Option { - self.inner.node_map.endpoint_info(&node_key) + pub fn connection_info(&self, node_key: PublicKey) -> Option { + self.inner.node_map.node_info(&node_key) } /// Returns the local endpoints as a stream. @@ -1719,7 +1719,7 @@ impl Actor { // TODO: this might trigger too many packets at once, pace this self.inner.node_map.prune_inactive(); - let msgs = self.inner.node_map.endpoints_stayin_alive(); + let msgs = self.inner.node_map.nodes_stayin_alive(); self.handle_ping_actions(msgs).await; } _ = endpoints_update_receiver.changed() => { @@ -2281,7 +2281,7 @@ impl Actor { /// This is called when connectivity changes enough that we no longer trust the old routes. #[instrument(skip_all, fields(me = %self.inner.me))] fn reset_endpoint_states(&mut self) { - self.inner.node_map.reset_endpoint_states() + self.inner.node_map.reset_node_states() } /// Tells the relay actor to close stale relay connections. @@ -2605,7 +2605,7 @@ pub(crate) mod tests { fn tracked_endpoints(&self) -> Vec { self.endpoint .magic_sock() - .tracked_endpoints() + .connection_infos() .into_iter() .map(|ep| ep.node_id) .collect() diff --git a/iroh-net/src/magicsock/node_map.rs b/iroh-net/src/magicsock/node_map.rs index 99fdf52e2c..18e0ba5df9 100644 --- a/iroh-net/src/magicsock/node_map.rs +++ b/iroh-net/src/magicsock/node_map.rs @@ -10,13 +10,14 @@ use std::{ use anyhow::{ensure, Context as _}; use futures::Stream; +use iroh_base::key::NodeId; use iroh_metrics::inc; use parking_lot::Mutex; use stun_rs::TransactionId; use tokio::io::AsyncWriteExt; use tracing::{debug, info, instrument, trace, warn}; -use self::endpoint::{Endpoint, Options, PingHandled}; +use self::node_state::{NodeState, Options, PingHandled}; use super::{ metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, QuicMappedAddr, }; @@ -28,20 +29,16 @@ use crate::{ }; mod best_addr; -mod endpoint; +mod node_state; -pub use endpoint::{ConnectionType, ControlMsg, DirectAddrInfo, EndpointInfo}; -pub(super) use endpoint::{DiscoPingPurpose, PingAction, PingRole, SendPing}; +pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, NodeInfo}; +pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing}; /// Number of nodes that are inactive for which we keep info about. This limit is enforced /// periodically via [`NodeMap::prune_inactive`]. const MAX_INACTIVE_NODES: usize = 30; -/// Map of the [`Endpoint`] information for all the known nodes. -/// -/// Each endpoint is also known as a "Node" in the "(iroh) network", but this is a bit of a -/// looser term. It is where "NodeMap" comes from however. -/// +/// Map of the [`NodeState`] information for all the known nodes. /// /// The nodes can be looked up by: /// @@ -65,24 +62,28 @@ pub(super) struct NodeMap { #[derive(Default, Debug)] pub(super) struct NodeMapInner { - by_node_key: HashMap, + by_node_key: HashMap, by_ip_port: HashMap, by_quic_mapped_addr: HashMap, - by_id: HashMap, + by_id: HashMap, next_id: usize, } +/// Identifier to look up a [`NodeState`] in the [`NodeMap`]. +/// +/// You can look up entries in [`NodeMap`] with various keys, depending on the context you +/// have for the node. These are all the keys the [`NodeMap`] can use. #[derive(Clone)] -enum EndpointId<'a> { - Id(&'a usize), - NodeKey(&'a PublicKey), +enum NodeStateKey<'a> { + Idx(&'a usize), + NodeId(&'a NodeId), QuicMappedAddr(&'a QuicMappedAddr), IpPort(&'a IpPort), } impl NodeMap { /// Create a new [`NodeMap`] from data stored in `path`. - pub fn load_from_file(path: impl AsRef) -> anyhow::Result { + pub(super) fn load_from_file(path: impl AsRef) -> anyhow::Result { Ok(Self::from_inner(NodeMapInner::load_from_file(path)?)) } @@ -95,29 +96,29 @@ impl NodeMap { /// Get the known node addresses stored in the map. Nodes with empty addressing information are /// filtered out. #[cfg(test)] - pub fn known_node_addresses(&self) -> Vec { + pub(super) fn known_node_addresses(&self) -> Vec { self.inner.lock().known_node_addresses().collect() } /// Add the contact information for a node. - pub fn add_node_addr(&self, node_addr: NodeAddr) { + pub(super) fn add_node_addr(&self, node_addr: NodeAddr) { self.inner.lock().add_node_addr(node_addr) } /// Number of nodes currently listed. - pub fn node_count(&self) -> usize { + pub(super) fn node_count(&self) -> usize { self.inner.lock().node_count() } - pub fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { + pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { self.inner.lock().receive_udp(udp_addr) } - pub fn receive_relay(&self, relay_url: &RelayUrl, src: PublicKey) -> QuicMappedAddr { + pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: PublicKey) -> QuicMappedAddr { self.inner.lock().receive_relay(relay_url, &src) } - pub fn notify_ping_sent( + pub(super) fn notify_ping_sent( &self, id: usize, dst: SendAddr, @@ -125,30 +126,30 @@ impl NodeMap { purpose: DiscoPingPurpose, msg_sender: tokio::sync::mpsc::Sender, ) { - if let Some(ep) = self.inner.lock().get_mut(EndpointId::Id(&id)) { + if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) { ep.ping_sent(dst, tx_id, purpose, msg_sender); } } - pub fn notify_ping_timeout(&self, id: usize, tx_id: stun::TransactionId) { - if let Some(ep) = self.inner.lock().get_mut(EndpointId::Id(&id)) { + pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun::TransactionId) { + if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) { ep.ping_timeout(tx_id); } } - pub fn get_quic_mapped_addr_for_node_key( + pub(super) fn get_quic_mapped_addr_for_node_key( &self, node_key: &PublicKey, ) -> Option { self.inner .lock() - .get(EndpointId::NodeKey(node_key)) + .get(NodeStateKey::NodeId(node_key)) .map(|ep| *ep.quic_mapped_addr()) } /// Insert a received ping into the node map, and return whether a ping with this tx_id was already /// received. - pub fn handle_ping( + pub(super) fn handle_ping( &self, sender: PublicKey, src: SendAddr, @@ -157,17 +158,21 @@ impl NodeMap { self.inner.lock().handle_ping(sender, src, tx_id) } - pub fn handle_pong(&self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { + pub(super) fn handle_pong(&self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { self.inner.lock().handle_pong(sender, src, pong) } #[must_use = "actions must be handled"] - pub fn handle_call_me_maybe(&self, sender: PublicKey, cm: CallMeMaybe) -> Vec { + pub(super) fn handle_call_me_maybe( + &self, + sender: PublicKey, + cm: CallMeMaybe, + ) -> Vec { self.inner.lock().handle_call_me_maybe(sender, cm) } #[allow(clippy::type_complexity)] - pub fn get_send_addrs_for_quic_mapped_addr( + pub(super) fn get_send_addrs( &self, addr: &QuicMappedAddr, have_ipv6: bool, @@ -178,38 +183,37 @@ impl NodeMap { Vec, )> { let mut inner = self.inner.lock(); - let ep = inner.get_mut(EndpointId::QuicMappedAddr(addr))?; + let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?; let public_key = *ep.public_key(); let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6); Some((public_key, udp_addr, relay_url, msgs)) } - pub fn notify_shutdown(&self) { + pub(super) fn notify_shutdown(&self) { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { + for (_, ep) in inner.node_states_mut() { ep.reset(); } } - pub fn reset_endpoint_states(&self) { + pub(super) fn reset_node_states(&self) { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { + for (_, ep) in inner.node_states_mut() { ep.note_connectivity_change(); } } - pub fn endpoints_stayin_alive(&self) -> Vec { - let mut msgs = Vec::new(); + pub(super) fn nodes_stayin_alive(&self) -> Vec { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { - msgs.extend(ep.stayin_alive()); - } - msgs + inner + .node_states_mut() + .flat_map(|(_idx, node_state)| node_state.stayin_alive()) + .collect() } - /// Get the [`EndpointInfo`]s for each endpoint - pub fn endpoint_infos(&self, now: Instant) -> Vec { - self.inner.lock().endpoint_infos(now) + /// Gets the [`NodeInfo`]s for each endpoint + pub(super) fn node_infos(&self, now: Instant) -> Vec { + self.inner.lock().node_infos(now) } /// Returns a stream of [`ConnectionType`]. @@ -221,17 +225,20 @@ impl NodeMap { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` - pub fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result { + pub(super) fn conn_type_stream( + &self, + public_key: &PublicKey, + ) -> anyhow::Result { self.inner.lock().conn_type_stream(public_key) } - /// Get the [`EndpointInfo`]s for each endpoint - pub fn endpoint_info(&self, public_key: &PublicKey) -> Option { - self.inner.lock().endpoint_info(public_key) + /// Get the [`NodeInfo`]s for each endpoint + pub(super) fn node_info(&self, public_key: &PublicKey) -> Option { + self.inner.lock().node_info(public_key) } /// Saves the known node info to the given path, returning the number of nodes persisted. - pub async fn save_to_file(&self, path: &Path) -> anyhow::Result { + pub(super) async fn save_to_file(&self, path: &Path) -> anyhow::Result { ensure!(!path.is_dir(), "{} must be a file", path.display()); // So, not sure what to do here. @@ -282,7 +289,7 @@ impl NodeMap { } /// Prunes nodes without recent activity so that at most [`MAX_INACTIVE_NODES`] are kept. - pub fn prune_inactive(&self) { + pub(super) fn prune_inactive(&self) { self.inner.lock().prune_inactive(); } } @@ -318,40 +325,44 @@ impl NodeMapInner { fn add_node_addr(&mut self, node_addr: NodeAddr) { let NodeAddr { node_id, info } = node_addr; - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(&node_id), || Options { - public_key: node_id, + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&node_id), || Options { + node_id, relay_url: info.relay_url.clone(), active: false, }); - endpoint.update_from_node_addr(&info); - let id = endpoint.id(); - for endpoint in &info.direct_addresses { - self.set_endpoint_for_ip_port(*endpoint, id); + node_state.update_from_node_addr(&info); + let id = node_state.id(); + for addr in &info.direct_addresses { + self.set_node_state_for_ip_port(*addr, id); } } - fn get_id(&self, id: EndpointId) -> Option { + fn get_id(&self, id: NodeStateKey) -> Option { match id { - EndpointId::Id(id) => Some(*id), - EndpointId::NodeKey(node_key) => self.by_node_key.get(node_key).copied(), - EndpointId::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(addr).copied(), - EndpointId::IpPort(ipp) => self.by_ip_port.get(ipp).copied(), + NodeStateKey::Idx(id) => Some(*id), + NodeStateKey::NodeId(node_key) => self.by_node_key.get(node_key).copied(), + NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(addr).copied(), + NodeStateKey::IpPort(ipp) => self.by_ip_port.get(ipp).copied(), } } - fn get_mut(&mut self, id: EndpointId) -> Option<&mut Endpoint> { + fn get_mut(&mut self, id: NodeStateKey) -> Option<&mut NodeState> { self.get_id(id).and_then(|id| self.by_id.get_mut(&id)) } - fn get(&self, id: EndpointId) -> Option<&Endpoint> { + fn get(&self, id: NodeStateKey) -> Option<&NodeState> { self.get_id(id).and_then(|id| self.by_id.get(&id)) } - fn get_or_insert_with(&mut self, id: EndpointId, f: impl FnOnce() -> Options) -> &mut Endpoint { + fn get_or_insert_with( + &mut self, + id: NodeStateKey, + f: impl FnOnce() -> Options, + ) -> &mut NodeState { let id = self.get_id(id); match id { - None => self.insert_endpoint(f()), + None => self.insert_node(f()), Some(id) => self.by_id.get_mut(&id).expect("is not empty"), } } @@ -361,47 +372,47 @@ impl NodeMapInner { self.by_id.len() } - /// Marks the node we believe to be at `ipp` as recently used, returning the [`Endpoint`] if found. - fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { + /// Marks the node we believe to be at `ipp` as recently used. + fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> { let ip_port: IpPort = udp_addr.into(); - let Some(endpoint) = self.get_mut(EndpointId::IpPort(&ip_port)) else { - info!(src=%udp_addr, "receive_udp: no node_map state found for addr, ignore"); + let Some(node_state) = self.get_mut(NodeStateKey::IpPort(&ip_port)) else { + info!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore"); return None; }; - endpoint.receive_udp(ip_port, Instant::now()); - Some((*endpoint.public_key(), *endpoint.quic_mapped_addr())) + node_state.receive_udp(ip_port, Instant::now()); + Some((*node_state.public_key(), *node_state.quic_mapped_addr())) } #[instrument(skip_all, fields(src = %src.fmt_short()))] fn receive_relay(&mut self, relay_url: &RelayUrl, src: &PublicKey) -> QuicMappedAddr { - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(src), || { + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || { trace!("packets from unknown node, insert into node map"); Options { - public_key: *src, + node_id: *src, relay_url: Some(relay_url.clone()), active: true, } }); - endpoint.receive_relay(relay_url, src, Instant::now()); - *endpoint.quic_mapped_addr() + node_state.receive_relay(relay_url, src, Instant::now()); + *node_state.quic_mapped_addr() } - fn endpoints(&self) -> impl Iterator { + fn node_states(&self) -> impl Iterator { self.by_id.iter() } - fn endpoints_mut(&mut self) -> impl Iterator { + fn node_states_mut(&mut self) -> impl Iterator { self.by_id.iter_mut() } - /// Get the [`EndpointInfo`]s for each endpoint - fn endpoint_infos(&self, now: Instant) -> Vec { - self.endpoints().map(|(_, ep)| ep.info(now)).collect() + /// Get the [`NodeInfo`]s for each endpoint + fn node_infos(&self, now: Instant) -> Vec { + self.node_states().map(|(_, ep)| ep.info(now)).collect() } - /// Get the [`EndpointInfo`]s for each endpoint - fn endpoint_info(&self, public_key: &PublicKey) -> Option { - self.get(EndpointId::NodeKey(public_key)) + /// Get the [`NodeInfo`]s for each endpoint + fn node_info(&self, public_key: &PublicKey) -> Option { + self.get(NodeStateKey::NodeId(public_key)) .map(|ep| ep.info(Instant::now())) } @@ -415,18 +426,18 @@ impl NodeMapInner { /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result { - match self.get(EndpointId::NodeKey(public_key)) { + match self.get(NodeStateKey::NodeId(public_key)) { Some(ep) => Ok(ConnectionTypeStream { - initial: Some(ep.conn_type.get()), - inner: ep.conn_type.watch().into_stream(), + initial: Some(ep.conn_type()), + inner: ep.conn_type_stream(), }), None => anyhow::bail!("No endpoint for {public_key:?} found"), } } fn handle_pong(&mut self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { - if let Some(ep) = self.get_mut(EndpointId::NodeKey(&sender)).as_mut() { - let insert = ep.handle_pong(&pong, src.into()); + if let Some(ns) = self.get_mut(NodeStateKey::NodeId(&sender)).as_mut() { + let insert = ns.handle_pong(&pong, src.into()); if let Some((src, key)) = insert { self.set_node_key_for_ip_port(src, &key); } @@ -438,23 +449,23 @@ impl NodeMapInner { #[must_use = "actions must be handled"] fn handle_call_me_maybe(&mut self, sender: PublicKey, cm: CallMeMaybe) -> Vec { - let ep_id = EndpointId::NodeKey(&sender); - if let Some(id) = self.get_id(ep_id.clone()) { + let ns_id = NodeStateKey::NodeId(&sender); + if let Some(id) = self.get_id(ns_id.clone()) { for number in &cm.my_numbers { // ensure the new addrs are known - self.set_endpoint_for_ip_port(*number, id); + self.set_node_state_for_ip_port(*number, id); } } - match self.get_mut(ep_id) { + match self.get_mut(ns_id) { None => { inc!(MagicsockMetrics, recv_disco_call_me_maybe_bad_disco); debug!("received call-me-maybe: ignore, node is unknown"); vec![] } - Some(ep) => { + Some(ns) => { debug!(endpoints = ?cm.my_numbers, "received call-me-maybe"); - ep.handle_call_me_maybe(cm) + ns.handle_call_me_maybe(cm) } } } @@ -465,40 +476,41 @@ impl NodeMapInner { src: SendAddr, tx_id: TransactionId, ) -> PingHandled { - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(&sender), || { + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&sender), || { debug!("received ping: node unknown, add to node map"); Options { - public_key: sender, + node_id: sender, relay_url: src.relay_url(), active: true, } }); - let handled = endpoint.handle_ping(src.clone(), tx_id); + let handled = node_state.handle_ping(src.clone(), tx_id); if let SendAddr::Udp(ref addr) = src { - if matches!(handled.role, PingRole::NewEndpoint) { + if matches!(handled.role, PingRole::NewPath) { self.set_node_key_for_ip_port(*addr, &sender); } } handled } - /// Inserts a new endpoint into the [`NodeMap`]. - fn insert_endpoint(&mut self, options: Options) -> &mut Endpoint { + /// Inserts a new node into the [`NodeMap`]. + fn insert_node(&mut self, options: Options) -> &mut NodeState { info!( - node = %options.public_key.fmt_short(), + node = %options.node_id.fmt_short(), relay_url = ?options.relay_url, - "inserting new node endpoint in NodeMap", + "inserting new node in NodeMap", ); let id = self.next_id; self.next_id = self.next_id.wrapping_add(1); - let ep = Endpoint::new(id, options); + let node_state = NodeState::new(id, options); // update indices - self.by_quic_mapped_addr.insert(*ep.quic_mapped_addr(), id); - self.by_node_key.insert(*ep.public_key(), id); + self.by_quic_mapped_addr + .insert(*node_state.quic_mapped_addr(), id); + self.by_node_key.insert(*node_state.public_key(), id); - self.by_id.insert(id, ep); + self.by_id.insert(id, node_state); self.by_id.get_mut(&id).expect("just inserted") } @@ -521,7 +533,7 @@ impl NodeMapInner { } } - fn set_endpoint_for_ip_port(&mut self, ipp: impl Into, id: usize) { + fn set_node_state_for_ip_port(&mut self, ipp: impl Into, id: usize) { let ipp = ipp.into(); trace!(?ipp, ?id, "set endpoint for ip:port"); self.by_ip_port.insert(ipp, id); @@ -629,7 +641,7 @@ impl IpPort { #[cfg(test)] mod tests { - use super::endpoint::MAX_INACTIVE_DIRECT_ADDRESSES; + use super::node_state::MAX_INACTIVE_DIRECT_ADDRESSES; use super::*; use crate::{key::SecretKey, magic_endpoint::AddrInfo}; use std::net::Ipv4Addr; @@ -697,8 +709,8 @@ mod tests { let id = node_map .inner .lock() - .insert_endpoint(Options { - public_key, + .insert_node(Options { + node_id: public_key, relay_url: None, active: false, }) @@ -778,7 +790,7 @@ mod tests { node_map .inner .lock() - .get(EndpointId::NodeKey(&active_node)) + .get(NodeStateKey::NodeId(&active_node)) .expect("should not be pruned"); } } diff --git a/iroh-net/src/magicsock/node_map/endpoint.rs b/iroh-net/src/magicsock/node_map/node_state.rs similarity index 96% rename from iroh-net/src/magicsock/node_map/endpoint.rs rename to iroh-net/src/magicsock/node_map/node_state.rs index 391352bf8a..f468423d3e 100644 --- a/iroh-net/src/magicsock/node_map/endpoint.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -10,7 +10,7 @@ use rand::seq::IteratorRandom; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tracing::{debug, info, instrument, trace, warn}; -use watchable::Watchable; +use watchable::{Watchable, WatcherStream}; use crate::{ disco::{self, SendAddr}, @@ -31,7 +31,7 @@ use super::IpPort; /// Number of addresses that are not active that we keep around per node. /// -/// See [`Endpoint::prune_direct_addresses`]. +/// See [`NodeState::prune_direct_addresses`]. pub(super) const MAX_INACTIVE_DIRECT_ADDRESSES: usize = 20; /// How long since an endpoint path was last active before it might be pruned. @@ -75,14 +75,14 @@ pub(in crate::magicsock) struct SendPing { pub purpose: DiscoPingPurpose, } -/// Indicating an [`Endpoint`] has handled a ping. +/// Indicating an [`NodeState`] has handled a ping. #[derive(Debug)] pub struct PingHandled { - /// What this ping did to the [`Endpoint`]. + /// What this ping did to the [`NodeState`]. pub role: PingRole, /// Whether the sender path should also be pinged. /// - /// This is the case if an [`Endpoint`] does not yet have a direct path, i.e. it has no + /// This is the case if an [`NodeState`] does not yet have a direct path, i.e. it has no /// best_addr. In this case we want to ping right back to open the direct path in this /// direction as well. pub needs_ping_back: Option, @@ -91,24 +91,18 @@ pub struct PingHandled { #[derive(Debug)] pub enum PingRole { Duplicate, - // TODO: Clean up this naming, this is a new path to an endpoint. - NewEndpoint, + NewPath, LikelyHeartbeat, Reactivate, } -/// An endpoint, think [`MagicEndpoint`], which we can have connections with. -/// -/// Each endpoint is also known as a "Node" in the "(iroh) network", but this is a bit of a -/// looser term. +/// An iroh node, which we can have connections with. /// /// The whole point of the magicsock is that we can have multiple **paths** to a particular -/// endpoint. One of these paths is via the endpoint's home relay node but as we establish a +/// node. One of these paths is via the endpoint's home relay node but as we establish a /// connection we'll hopefully discover more direct paths. -/// -/// [`MagicEndpoint`]: crate::MagicEndpoint #[derive(Debug)] -pub(super) struct Endpoint { +pub(super) struct NodeState { /// The ID used as index in the [`NodeMap`]. /// /// [`NodeMap`]: super::NodeMap @@ -139,22 +133,22 @@ pub(super) struct Endpoint { /// do a full ping + call-me-maybe. Usually each side only needs to send one /// call-me-maybe to the other for holes to be punched in both directions however. So /// we only try and send one per [`HEARTBEAT_INTERVAL`]. Each [`HEARTBEAT_INTERVAL`] - /// the [`Endpoint::stayin_alive`] function is called, which will trigger new + /// the [`NodeState::stayin_alive`] function is called, which will trigger new /// call-me-maybe messages as backup. last_call_me_maybe: Option, /// The type of connection we have to the node, either direct, relay, mixed, or none. - pub conn_type: Watchable, + conn_type: Watchable, } #[derive(Debug)] pub(super) struct Options { - pub(super) public_key: PublicKey, + pub(super) node_id: NodeId, pub(super) relay_url: Option, /// Is this endpoint currently active (sending data)? pub(super) active: bool, } -impl Endpoint { +impl NodeState { pub(super) fn new(id: usize, options: Options) -> Self { let quic_mapped_addr = QuicMappedAddr::generate(); @@ -163,10 +157,10 @@ impl Endpoint { inc!(MagicsockMetrics, num_relay_conns_added); } - Endpoint { + NodeState { id, quic_mapped_addr, - node_id: options.public_key, + node_id: options.node_id, last_full_ping: None, relay_url: options.relay_url.map(|url| (url, PathState::default())), best_addr: Default::default(), @@ -190,8 +184,16 @@ impl Endpoint { self.id } + pub(super) fn conn_type(&self) -> ConnectionType { + self.conn_type.get() + } + + pub(super) fn conn_type_stream(&self) -> WatcherStream { + self.conn_type.watch().into_stream() + } + /// Returns info about this endpoint - pub(super) fn info(&self, now: Instant) -> EndpointInfo { + pub(super) fn info(&self, now: Instant) -> NodeInfo { let conn_type = self.conn_type.get(); let latency = match conn_type { ConnectionType::Direct(addr) => self @@ -231,7 +233,7 @@ impl Endpoint { }) .collect(); - EndpointInfo { + NodeInfo { id: self.id, node_id: self.node_id, relay_url: self.relay_url(), @@ -367,7 +369,7 @@ impl Endpoint { /// we only have a relay path, or our path is expired. /// /// When a call-me-maybe message is sent we also need to send pings to all known paths - /// of the endpoint. The [`Endpoint::send_call_me_maybe`] function takes care of this. + /// of the endpoint. The [`NodeState::send_call_me_maybe`] function takes care of this. #[instrument("want_call_me_maybe", skip_all)] fn want_call_me_maybe(&self, now: &Instant) -> bool { trace!("full ping: wanted?"); @@ -406,8 +408,8 @@ impl Endpoint { debug!(tx = %hex::encode(txid), addr = %sp.to, "pong not received in timeout"); match sp.to { SendAddr::Udp(addr) => { - if let Some(ep_state) = self.direct_addr_state.get_mut(&addr.into()) { - ep_state.last_ping = None; + if let Some(path_state) = self.direct_addr_state.get_mut(&addr.into()) { + path_state.last_ping = None; } // If we fail to ping our current best addr, it is not that good anymore. @@ -592,7 +594,7 @@ impl Endpoint { debug!( %ping_dsts, dst = %self.node_id.fmt_short(), - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "sending pings to endpoint", ); self.last_full_ping.replace(now); @@ -627,7 +629,7 @@ impl Endpoint { //TODOFRZ self.direct_addr_state.entry(addr.into()).or_default(); } - let paths = summarize_endpoint_paths(&self.direct_addr_state); + let paths = summarize_node_paths(&self.direct_addr_state); debug!(new = ?n.direct_addresses , %paths, "added new direct paths for endpoint"); } @@ -665,7 +667,7 @@ impl Endpoint { Entry::Vacant(vacant) => { info!(%addr, "new direct addr for node"); vacant.insert(PathState::with_ping(tx_id, now)); - PingRole::NewEndpoint + PingRole::NewPath } }, SendAddr::Relay(ref url) => { @@ -675,19 +677,19 @@ impl Endpoint { // node. In both cases, trust the new confirmed url info!(%url, "new relay addr for node"); self.relay_url = Some((url.clone(), PathState::with_ping(tx_id, now))); - PingRole::NewEndpoint + PingRole::NewPath } Some((_home_url, state)) => state.handle_ping(tx_id, now), None => { info!(%url, "new relay addr for node"); self.relay_url = Some((url.clone(), PathState::with_ping(tx_id, now))); - PingRole::NewEndpoint + PingRole::NewPath } } } }; - if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewEndpoint) { + if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewPath) { self.prune_direct_addresses(); } @@ -710,7 +712,7 @@ impl Endpoint { debug!( ?role, needs_ping_back = ?needs_ping_back.is_some(), - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "endpoint handled ping", ); PingHandled { @@ -741,7 +743,7 @@ impl Endpoint { if prune_count == 0 { // nothing to do, within limits debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "prune addresses: {prune_count} pruned", ); return; @@ -766,7 +768,7 @@ impl Endpoint { ); } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "prune addresses: {prune_count} pruned", ); } @@ -841,7 +843,7 @@ impl Endpoint { } } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "handled pong", ); } @@ -938,7 +940,7 @@ impl Endpoint { } } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "updated endpoint paths from call-me-maybe", ); self.send_pings(now) @@ -1089,7 +1091,7 @@ impl Endpoint { } } -/// State about a particular path to another [`Endpoint`]. +/// State about a particular path to another [`NodeState`]. /// /// This state is used for both the relay path and any direct UDP paths. #[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] @@ -1278,7 +1280,7 @@ impl PathState { } // TODO: Make an `EndpointPaths` struct and do things nicely. -fn summarize_endpoint_paths(paths: &BTreeMap) -> String { +fn summarize_node_paths(paths: &BTreeMap) -> String { use std::fmt::Write; let mut w = String::new(); @@ -1360,9 +1362,9 @@ pub struct DirectAddrInfo { pub last_payload: Option, } -/// Details about an Endpoint. +/// Details about an iroh node which is known to this node. #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct EndpointInfo { +pub struct NodeInfo { /// The id in the node_map pub id: usize, /// The public key of the endpoint. @@ -1380,7 +1382,7 @@ pub struct EndpointInfo { pub last_used: Option, } -impl EndpointInfo { +impl NodeInfo { /// Get the duration since the last activity we received from this endpoint /// on any of its direct addresses. pub fn last_received(&self) -> Option { @@ -1449,7 +1451,7 @@ mod tests { )]); let key = SecretKey::generate(); ( - Endpoint { + NodeState { id: 0, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1480,7 +1482,7 @@ mod tests { pong_src: pong_src.clone(), }); let key = SecretKey::generate(); - Endpoint { + NodeState { id: 1, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1500,7 +1502,7 @@ mod tests { // let socket_addr = "0.0.0.0:8".parse().unwrap(); let endpoint_state = BTreeMap::new(); let key = SecretKey::generate(); - Endpoint { + NodeState { id: 2, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1536,7 +1538,7 @@ mod tests { }); let key = SecretKey::generate(); ( - Endpoint { + NodeState { id: 3, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1561,7 +1563,7 @@ mod tests { ) }; let expect = Vec::from([ - EndpointInfo { + NodeInfo { id: a_endpoint.id, node_id: a_endpoint.node_id, relay_url: a_endpoint.relay_url(), @@ -1575,7 +1577,7 @@ mod tests { latency: Some(latency), last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: b_endpoint.id, node_id: b_endpoint.node_id, relay_url: b_endpoint.relay_url(), @@ -1584,7 +1586,7 @@ mod tests { latency: Some(latency), last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: c_endpoint.id, node_id: c_endpoint.node_id, relay_url: c_endpoint.relay_url(), @@ -1593,7 +1595,7 @@ mod tests { latency: None, last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: d_endpoint.id, node_id: d_endpoint.node_id, relay_url: d_endpoint.relay_url(), @@ -1634,7 +1636,7 @@ mod tests { ]), next_id: 5, }); - let mut got = node_map.endpoint_infos(later); + let mut got = node_map.node_infos(later); got.sort_by_key(|p| p.id); assert_eq!(expect, got); } @@ -1646,11 +1648,11 @@ mod tests { let key = SecretKey::generate(); let opts = Options { - public_key: key.public(), + node_id: key.public(), relay_url: None, active: true, }; - let mut ep = Endpoint::new(0, opts); + let mut ep = NodeState::new(0, opts); let my_numbers_count: u16 = (MAX_INACTIVE_DIRECT_ADDRESSES + 5).try_into().unwrap(); let my_numbers = (0u16..my_numbers_count)