diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 5fb0d22e17..02dca5f53d 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -30,7 +30,7 @@ use iroh::{ dns::default_resolver, key::{PublicKey, SecretKey}, magic_endpoint, - magicsock::EndpointInfo, + magicsock::ConnectionInfo, netcheck, portmapper, relay::{RelayMap, RelayMode, RelayUrl}, util::AbortingJoinHandle, @@ -390,7 +390,7 @@ impl Gui { .unwrap_or_else(|| "unknown".to_string()) }; let msg = match endpoint.connection_info(*node_id) { - Some(EndpointInfo { + Some(ConnectionInfo { relay_url, conn_type, latency, diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 6f71cffe01..6109b7b594 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -20,7 +20,7 @@ use crate::{ tls, NodeId, }; -pub use super::magicsock::{EndpointInfo as ConnectionInfo, LocalEndpointsStream}; +pub use super::magicsock::{ConnectionInfo, LocalEndpointsStream}; pub use iroh_base::node_addr::{AddrInfo, NodeAddr}; @@ -376,7 +376,7 @@ impl MagicEndpoint { /// to the internal addressbook through [`MagicEndpoint::add_node_addr`]), so these connections /// are not necessarily active connections. pub fn connection_infos(&self) -> Vec { - self.msock.tracked_endpoints() + self.msock.connection_infos() } /// Get connection information about a specific node. @@ -385,7 +385,7 @@ impl MagicEndpoint { /// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are /// currently communicating with that node over a `Direct` (UDP) or `Relay` (relay) connection. pub fn connection_info(&self, node_id: PublicKey) -> Option { - self.msock.tracked_endpoint(node_id) + self.msock.connection_info(node_id) } pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> { diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index bc9de2f1b3..be1f48e351 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -81,7 +81,7 @@ pub use crate::net::UdpSocket; pub use self::metrics::Metrics; pub use self::node_map::{ - ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, EndpointInfo, + ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, NodeInfo as ConnectionInfo, }; pub use self::timer::Timer; @@ -153,7 +153,7 @@ pub(crate) type RelayContents = SmallVec<[Bytes; 1]>; /// This is responsible for routing packets to nodes based on node IDs, it will initially /// route packets via a relay and transparently try and establish a node-to-node /// connection and upgrade to it. It will also keep looking for better connections as the -/// network details of both endpoints change. +/// network details of both nodes change. /// /// It is usually only necessary to use a single [`MagicSock`] instance in an application, it /// means any QUIC endpoints on top will be sharing as much information about nodes as @@ -342,7 +342,7 @@ impl Inner { let mut transmits_sent = 0; match self .node_map - .get_send_addrs_for_quic_mapped_addr(&dest, self.ipv6_reported.load(Ordering::Relaxed)) + .get_send_addrs(&dest, self.ipv6_reported.load(Ordering::Relaxed)) { Some((public_key, udp_addr, relay_url, mut msgs)) => { let mut pings_sent = false; @@ -443,10 +443,10 @@ impl Inner { Poll::Ready(Ok(transmits_sent)) } None => { - error!(dst=%dest, "no endpoint for mapped address"); + error!(dst=%dest, "no node_state for mapped address"); Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, - "trying to send to unknown endpoint", + "trying to send to unknown node", ))) } } @@ -720,15 +720,15 @@ impl Inner { let handled = self.node_map.handle_ping(*sender, addr.clone(), dm.tx_id); match handled.role { PingRole::Duplicate => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: endpoint already confirmed, skip"); + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path already confirmed, skip"); return; } PingRole::LikelyHeartbeat => {} - PingRole::NewEndpoint => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: new endpoint"); + PingRole::NewPath => { + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: new path"); } PingRole::Reactivate => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: endpoint active"); + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path active"); } } @@ -1310,13 +1310,13 @@ impl MagicSock { } /// Retrieve connection information about nodes in the network. - pub fn tracked_endpoints(&self) -> Vec { - self.inner.node_map.endpoint_infos(Instant::now()) + pub fn connection_infos(&self) -> Vec { + self.inner.node_map.node_infos(Instant::now()) } /// Retrieve connection information about a node in the network. - pub fn tracked_endpoint(&self, node_key: PublicKey) -> Option { - self.inner.node_map.endpoint_info(&node_key) + pub fn connection_info(&self, node_key: PublicKey) -> Option { + self.inner.node_map.node_info(&node_key) } /// Returns the local endpoints as a stream. @@ -1719,7 +1719,7 @@ impl Actor { // TODO: this might trigger too many packets at once, pace this self.inner.node_map.prune_inactive(); - let msgs = self.inner.node_map.endpoints_stayin_alive(); + let msgs = self.inner.node_map.nodes_stayin_alive(); self.handle_ping_actions(msgs).await; } _ = endpoints_update_receiver.changed() => { @@ -2281,7 +2281,7 @@ impl Actor { /// This is called when connectivity changes enough that we no longer trust the old routes. #[instrument(skip_all, fields(me = %self.inner.me))] fn reset_endpoint_states(&mut self) { - self.inner.node_map.reset_endpoint_states() + self.inner.node_map.reset_node_states() } /// Tells the relay actor to close stale relay connections. @@ -2605,7 +2605,7 @@ pub(crate) mod tests { fn tracked_endpoints(&self) -> Vec { self.endpoint .magic_sock() - .tracked_endpoints() + .connection_infos() .into_iter() .map(|ep| ep.node_id) .collect() diff --git a/iroh-net/src/magicsock/node_map.rs b/iroh-net/src/magicsock/node_map.rs index 99fdf52e2c..18e0ba5df9 100644 --- a/iroh-net/src/magicsock/node_map.rs +++ b/iroh-net/src/magicsock/node_map.rs @@ -10,13 +10,14 @@ use std::{ use anyhow::{ensure, Context as _}; use futures::Stream; +use iroh_base::key::NodeId; use iroh_metrics::inc; use parking_lot::Mutex; use stun_rs::TransactionId; use tokio::io::AsyncWriteExt; use tracing::{debug, info, instrument, trace, warn}; -use self::endpoint::{Endpoint, Options, PingHandled}; +use self::node_state::{NodeState, Options, PingHandled}; use super::{ metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, QuicMappedAddr, }; @@ -28,20 +29,16 @@ use crate::{ }; mod best_addr; -mod endpoint; +mod node_state; -pub use endpoint::{ConnectionType, ControlMsg, DirectAddrInfo, EndpointInfo}; -pub(super) use endpoint::{DiscoPingPurpose, PingAction, PingRole, SendPing}; +pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, NodeInfo}; +pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing}; /// Number of nodes that are inactive for which we keep info about. This limit is enforced /// periodically via [`NodeMap::prune_inactive`]. const MAX_INACTIVE_NODES: usize = 30; -/// Map of the [`Endpoint`] information for all the known nodes. -/// -/// Each endpoint is also known as a "Node" in the "(iroh) network", but this is a bit of a -/// looser term. It is where "NodeMap" comes from however. -/// +/// Map of the [`NodeState`] information for all the known nodes. /// /// The nodes can be looked up by: /// @@ -65,24 +62,28 @@ pub(super) struct NodeMap { #[derive(Default, Debug)] pub(super) struct NodeMapInner { - by_node_key: HashMap, + by_node_key: HashMap, by_ip_port: HashMap, by_quic_mapped_addr: HashMap, - by_id: HashMap, + by_id: HashMap, next_id: usize, } +/// Identifier to look up a [`NodeState`] in the [`NodeMap`]. +/// +/// You can look up entries in [`NodeMap`] with various keys, depending on the context you +/// have for the node. These are all the keys the [`NodeMap`] can use. #[derive(Clone)] -enum EndpointId<'a> { - Id(&'a usize), - NodeKey(&'a PublicKey), +enum NodeStateKey<'a> { + Idx(&'a usize), + NodeId(&'a NodeId), QuicMappedAddr(&'a QuicMappedAddr), IpPort(&'a IpPort), } impl NodeMap { /// Create a new [`NodeMap`] from data stored in `path`. - pub fn load_from_file(path: impl AsRef) -> anyhow::Result { + pub(super) fn load_from_file(path: impl AsRef) -> anyhow::Result { Ok(Self::from_inner(NodeMapInner::load_from_file(path)?)) } @@ -95,29 +96,29 @@ impl NodeMap { /// Get the known node addresses stored in the map. Nodes with empty addressing information are /// filtered out. #[cfg(test)] - pub fn known_node_addresses(&self) -> Vec { + pub(super) fn known_node_addresses(&self) -> Vec { self.inner.lock().known_node_addresses().collect() } /// Add the contact information for a node. - pub fn add_node_addr(&self, node_addr: NodeAddr) { + pub(super) fn add_node_addr(&self, node_addr: NodeAddr) { self.inner.lock().add_node_addr(node_addr) } /// Number of nodes currently listed. - pub fn node_count(&self) -> usize { + pub(super) fn node_count(&self) -> usize { self.inner.lock().node_count() } - pub fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { + pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { self.inner.lock().receive_udp(udp_addr) } - pub fn receive_relay(&self, relay_url: &RelayUrl, src: PublicKey) -> QuicMappedAddr { + pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: PublicKey) -> QuicMappedAddr { self.inner.lock().receive_relay(relay_url, &src) } - pub fn notify_ping_sent( + pub(super) fn notify_ping_sent( &self, id: usize, dst: SendAddr, @@ -125,30 +126,30 @@ impl NodeMap { purpose: DiscoPingPurpose, msg_sender: tokio::sync::mpsc::Sender, ) { - if let Some(ep) = self.inner.lock().get_mut(EndpointId::Id(&id)) { + if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) { ep.ping_sent(dst, tx_id, purpose, msg_sender); } } - pub fn notify_ping_timeout(&self, id: usize, tx_id: stun::TransactionId) { - if let Some(ep) = self.inner.lock().get_mut(EndpointId::Id(&id)) { + pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun::TransactionId) { + if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) { ep.ping_timeout(tx_id); } } - pub fn get_quic_mapped_addr_for_node_key( + pub(super) fn get_quic_mapped_addr_for_node_key( &self, node_key: &PublicKey, ) -> Option { self.inner .lock() - .get(EndpointId::NodeKey(node_key)) + .get(NodeStateKey::NodeId(node_key)) .map(|ep| *ep.quic_mapped_addr()) } /// Insert a received ping into the node map, and return whether a ping with this tx_id was already /// received. - pub fn handle_ping( + pub(super) fn handle_ping( &self, sender: PublicKey, src: SendAddr, @@ -157,17 +158,21 @@ impl NodeMap { self.inner.lock().handle_ping(sender, src, tx_id) } - pub fn handle_pong(&self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { + pub(super) fn handle_pong(&self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { self.inner.lock().handle_pong(sender, src, pong) } #[must_use = "actions must be handled"] - pub fn handle_call_me_maybe(&self, sender: PublicKey, cm: CallMeMaybe) -> Vec { + pub(super) fn handle_call_me_maybe( + &self, + sender: PublicKey, + cm: CallMeMaybe, + ) -> Vec { self.inner.lock().handle_call_me_maybe(sender, cm) } #[allow(clippy::type_complexity)] - pub fn get_send_addrs_for_quic_mapped_addr( + pub(super) fn get_send_addrs( &self, addr: &QuicMappedAddr, have_ipv6: bool, @@ -178,38 +183,37 @@ impl NodeMap { Vec, )> { let mut inner = self.inner.lock(); - let ep = inner.get_mut(EndpointId::QuicMappedAddr(addr))?; + let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?; let public_key = *ep.public_key(); let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6); Some((public_key, udp_addr, relay_url, msgs)) } - pub fn notify_shutdown(&self) { + pub(super) fn notify_shutdown(&self) { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { + for (_, ep) in inner.node_states_mut() { ep.reset(); } } - pub fn reset_endpoint_states(&self) { + pub(super) fn reset_node_states(&self) { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { + for (_, ep) in inner.node_states_mut() { ep.note_connectivity_change(); } } - pub fn endpoints_stayin_alive(&self) -> Vec { - let mut msgs = Vec::new(); + pub(super) fn nodes_stayin_alive(&self) -> Vec { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { - msgs.extend(ep.stayin_alive()); - } - msgs + inner + .node_states_mut() + .flat_map(|(_idx, node_state)| node_state.stayin_alive()) + .collect() } - /// Get the [`EndpointInfo`]s for each endpoint - pub fn endpoint_infos(&self, now: Instant) -> Vec { - self.inner.lock().endpoint_infos(now) + /// Gets the [`NodeInfo`]s for each endpoint + pub(super) fn node_infos(&self, now: Instant) -> Vec { + self.inner.lock().node_infos(now) } /// Returns a stream of [`ConnectionType`]. @@ -221,17 +225,20 @@ impl NodeMap { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` - pub fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result { + pub(super) fn conn_type_stream( + &self, + public_key: &PublicKey, + ) -> anyhow::Result { self.inner.lock().conn_type_stream(public_key) } - /// Get the [`EndpointInfo`]s for each endpoint - pub fn endpoint_info(&self, public_key: &PublicKey) -> Option { - self.inner.lock().endpoint_info(public_key) + /// Get the [`NodeInfo`]s for each endpoint + pub(super) fn node_info(&self, public_key: &PublicKey) -> Option { + self.inner.lock().node_info(public_key) } /// Saves the known node info to the given path, returning the number of nodes persisted. - pub async fn save_to_file(&self, path: &Path) -> anyhow::Result { + pub(super) async fn save_to_file(&self, path: &Path) -> anyhow::Result { ensure!(!path.is_dir(), "{} must be a file", path.display()); // So, not sure what to do here. @@ -282,7 +289,7 @@ impl NodeMap { } /// Prunes nodes without recent activity so that at most [`MAX_INACTIVE_NODES`] are kept. - pub fn prune_inactive(&self) { + pub(super) fn prune_inactive(&self) { self.inner.lock().prune_inactive(); } } @@ -318,40 +325,44 @@ impl NodeMapInner { fn add_node_addr(&mut self, node_addr: NodeAddr) { let NodeAddr { node_id, info } = node_addr; - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(&node_id), || Options { - public_key: node_id, + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&node_id), || Options { + node_id, relay_url: info.relay_url.clone(), active: false, }); - endpoint.update_from_node_addr(&info); - let id = endpoint.id(); - for endpoint in &info.direct_addresses { - self.set_endpoint_for_ip_port(*endpoint, id); + node_state.update_from_node_addr(&info); + let id = node_state.id(); + for addr in &info.direct_addresses { + self.set_node_state_for_ip_port(*addr, id); } } - fn get_id(&self, id: EndpointId) -> Option { + fn get_id(&self, id: NodeStateKey) -> Option { match id { - EndpointId::Id(id) => Some(*id), - EndpointId::NodeKey(node_key) => self.by_node_key.get(node_key).copied(), - EndpointId::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(addr).copied(), - EndpointId::IpPort(ipp) => self.by_ip_port.get(ipp).copied(), + NodeStateKey::Idx(id) => Some(*id), + NodeStateKey::NodeId(node_key) => self.by_node_key.get(node_key).copied(), + NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(addr).copied(), + NodeStateKey::IpPort(ipp) => self.by_ip_port.get(ipp).copied(), } } - fn get_mut(&mut self, id: EndpointId) -> Option<&mut Endpoint> { + fn get_mut(&mut self, id: NodeStateKey) -> Option<&mut NodeState> { self.get_id(id).and_then(|id| self.by_id.get_mut(&id)) } - fn get(&self, id: EndpointId) -> Option<&Endpoint> { + fn get(&self, id: NodeStateKey) -> Option<&NodeState> { self.get_id(id).and_then(|id| self.by_id.get(&id)) } - fn get_or_insert_with(&mut self, id: EndpointId, f: impl FnOnce() -> Options) -> &mut Endpoint { + fn get_or_insert_with( + &mut self, + id: NodeStateKey, + f: impl FnOnce() -> Options, + ) -> &mut NodeState { let id = self.get_id(id); match id { - None => self.insert_endpoint(f()), + None => self.insert_node(f()), Some(id) => self.by_id.get_mut(&id).expect("is not empty"), } } @@ -361,47 +372,47 @@ impl NodeMapInner { self.by_id.len() } - /// Marks the node we believe to be at `ipp` as recently used, returning the [`Endpoint`] if found. - fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { + /// Marks the node we believe to be at `ipp` as recently used. + fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> { let ip_port: IpPort = udp_addr.into(); - let Some(endpoint) = self.get_mut(EndpointId::IpPort(&ip_port)) else { - info!(src=%udp_addr, "receive_udp: no node_map state found for addr, ignore"); + let Some(node_state) = self.get_mut(NodeStateKey::IpPort(&ip_port)) else { + info!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore"); return None; }; - endpoint.receive_udp(ip_port, Instant::now()); - Some((*endpoint.public_key(), *endpoint.quic_mapped_addr())) + node_state.receive_udp(ip_port, Instant::now()); + Some((*node_state.public_key(), *node_state.quic_mapped_addr())) } #[instrument(skip_all, fields(src = %src.fmt_short()))] fn receive_relay(&mut self, relay_url: &RelayUrl, src: &PublicKey) -> QuicMappedAddr { - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(src), || { + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || { trace!("packets from unknown node, insert into node map"); Options { - public_key: *src, + node_id: *src, relay_url: Some(relay_url.clone()), active: true, } }); - endpoint.receive_relay(relay_url, src, Instant::now()); - *endpoint.quic_mapped_addr() + node_state.receive_relay(relay_url, src, Instant::now()); + *node_state.quic_mapped_addr() } - fn endpoints(&self) -> impl Iterator { + fn node_states(&self) -> impl Iterator { self.by_id.iter() } - fn endpoints_mut(&mut self) -> impl Iterator { + fn node_states_mut(&mut self) -> impl Iterator { self.by_id.iter_mut() } - /// Get the [`EndpointInfo`]s for each endpoint - fn endpoint_infos(&self, now: Instant) -> Vec { - self.endpoints().map(|(_, ep)| ep.info(now)).collect() + /// Get the [`NodeInfo`]s for each endpoint + fn node_infos(&self, now: Instant) -> Vec { + self.node_states().map(|(_, ep)| ep.info(now)).collect() } - /// Get the [`EndpointInfo`]s for each endpoint - fn endpoint_info(&self, public_key: &PublicKey) -> Option { - self.get(EndpointId::NodeKey(public_key)) + /// Get the [`NodeInfo`]s for each endpoint + fn node_info(&self, public_key: &PublicKey) -> Option { + self.get(NodeStateKey::NodeId(public_key)) .map(|ep| ep.info(Instant::now())) } @@ -415,18 +426,18 @@ impl NodeMapInner { /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result { - match self.get(EndpointId::NodeKey(public_key)) { + match self.get(NodeStateKey::NodeId(public_key)) { Some(ep) => Ok(ConnectionTypeStream { - initial: Some(ep.conn_type.get()), - inner: ep.conn_type.watch().into_stream(), + initial: Some(ep.conn_type()), + inner: ep.conn_type_stream(), }), None => anyhow::bail!("No endpoint for {public_key:?} found"), } } fn handle_pong(&mut self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { - if let Some(ep) = self.get_mut(EndpointId::NodeKey(&sender)).as_mut() { - let insert = ep.handle_pong(&pong, src.into()); + if let Some(ns) = self.get_mut(NodeStateKey::NodeId(&sender)).as_mut() { + let insert = ns.handle_pong(&pong, src.into()); if let Some((src, key)) = insert { self.set_node_key_for_ip_port(src, &key); } @@ -438,23 +449,23 @@ impl NodeMapInner { #[must_use = "actions must be handled"] fn handle_call_me_maybe(&mut self, sender: PublicKey, cm: CallMeMaybe) -> Vec { - let ep_id = EndpointId::NodeKey(&sender); - if let Some(id) = self.get_id(ep_id.clone()) { + let ns_id = NodeStateKey::NodeId(&sender); + if let Some(id) = self.get_id(ns_id.clone()) { for number in &cm.my_numbers { // ensure the new addrs are known - self.set_endpoint_for_ip_port(*number, id); + self.set_node_state_for_ip_port(*number, id); } } - match self.get_mut(ep_id) { + match self.get_mut(ns_id) { None => { inc!(MagicsockMetrics, recv_disco_call_me_maybe_bad_disco); debug!("received call-me-maybe: ignore, node is unknown"); vec![] } - Some(ep) => { + Some(ns) => { debug!(endpoints = ?cm.my_numbers, "received call-me-maybe"); - ep.handle_call_me_maybe(cm) + ns.handle_call_me_maybe(cm) } } } @@ -465,40 +476,41 @@ impl NodeMapInner { src: SendAddr, tx_id: TransactionId, ) -> PingHandled { - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(&sender), || { + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&sender), || { debug!("received ping: node unknown, add to node map"); Options { - public_key: sender, + node_id: sender, relay_url: src.relay_url(), active: true, } }); - let handled = endpoint.handle_ping(src.clone(), tx_id); + let handled = node_state.handle_ping(src.clone(), tx_id); if let SendAddr::Udp(ref addr) = src { - if matches!(handled.role, PingRole::NewEndpoint) { + if matches!(handled.role, PingRole::NewPath) { self.set_node_key_for_ip_port(*addr, &sender); } } handled } - /// Inserts a new endpoint into the [`NodeMap`]. - fn insert_endpoint(&mut self, options: Options) -> &mut Endpoint { + /// Inserts a new node into the [`NodeMap`]. + fn insert_node(&mut self, options: Options) -> &mut NodeState { info!( - node = %options.public_key.fmt_short(), + node = %options.node_id.fmt_short(), relay_url = ?options.relay_url, - "inserting new node endpoint in NodeMap", + "inserting new node in NodeMap", ); let id = self.next_id; self.next_id = self.next_id.wrapping_add(1); - let ep = Endpoint::new(id, options); + let node_state = NodeState::new(id, options); // update indices - self.by_quic_mapped_addr.insert(*ep.quic_mapped_addr(), id); - self.by_node_key.insert(*ep.public_key(), id); + self.by_quic_mapped_addr + .insert(*node_state.quic_mapped_addr(), id); + self.by_node_key.insert(*node_state.public_key(), id); - self.by_id.insert(id, ep); + self.by_id.insert(id, node_state); self.by_id.get_mut(&id).expect("just inserted") } @@ -521,7 +533,7 @@ impl NodeMapInner { } } - fn set_endpoint_for_ip_port(&mut self, ipp: impl Into, id: usize) { + fn set_node_state_for_ip_port(&mut self, ipp: impl Into, id: usize) { let ipp = ipp.into(); trace!(?ipp, ?id, "set endpoint for ip:port"); self.by_ip_port.insert(ipp, id); @@ -629,7 +641,7 @@ impl IpPort { #[cfg(test)] mod tests { - use super::endpoint::MAX_INACTIVE_DIRECT_ADDRESSES; + use super::node_state::MAX_INACTIVE_DIRECT_ADDRESSES; use super::*; use crate::{key::SecretKey, magic_endpoint::AddrInfo}; use std::net::Ipv4Addr; @@ -697,8 +709,8 @@ mod tests { let id = node_map .inner .lock() - .insert_endpoint(Options { - public_key, + .insert_node(Options { + node_id: public_key, relay_url: None, active: false, }) @@ -778,7 +790,7 @@ mod tests { node_map .inner .lock() - .get(EndpointId::NodeKey(&active_node)) + .get(NodeStateKey::NodeId(&active_node)) .expect("should not be pruned"); } } diff --git a/iroh-net/src/magicsock/node_map/endpoint.rs b/iroh-net/src/magicsock/node_map/node_state.rs similarity index 96% rename from iroh-net/src/magicsock/node_map/endpoint.rs rename to iroh-net/src/magicsock/node_map/node_state.rs index 391352bf8a..f468423d3e 100644 --- a/iroh-net/src/magicsock/node_map/endpoint.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -10,7 +10,7 @@ use rand::seq::IteratorRandom; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tracing::{debug, info, instrument, trace, warn}; -use watchable::Watchable; +use watchable::{Watchable, WatcherStream}; use crate::{ disco::{self, SendAddr}, @@ -31,7 +31,7 @@ use super::IpPort; /// Number of addresses that are not active that we keep around per node. /// -/// See [`Endpoint::prune_direct_addresses`]. +/// See [`NodeState::prune_direct_addresses`]. pub(super) const MAX_INACTIVE_DIRECT_ADDRESSES: usize = 20; /// How long since an endpoint path was last active before it might be pruned. @@ -75,14 +75,14 @@ pub(in crate::magicsock) struct SendPing { pub purpose: DiscoPingPurpose, } -/// Indicating an [`Endpoint`] has handled a ping. +/// Indicating an [`NodeState`] has handled a ping. #[derive(Debug)] pub struct PingHandled { - /// What this ping did to the [`Endpoint`]. + /// What this ping did to the [`NodeState`]. pub role: PingRole, /// Whether the sender path should also be pinged. /// - /// This is the case if an [`Endpoint`] does not yet have a direct path, i.e. it has no + /// This is the case if an [`NodeState`] does not yet have a direct path, i.e. it has no /// best_addr. In this case we want to ping right back to open the direct path in this /// direction as well. pub needs_ping_back: Option, @@ -91,24 +91,18 @@ pub struct PingHandled { #[derive(Debug)] pub enum PingRole { Duplicate, - // TODO: Clean up this naming, this is a new path to an endpoint. - NewEndpoint, + NewPath, LikelyHeartbeat, Reactivate, } -/// An endpoint, think [`MagicEndpoint`], which we can have connections with. -/// -/// Each endpoint is also known as a "Node" in the "(iroh) network", but this is a bit of a -/// looser term. +/// An iroh node, which we can have connections with. /// /// The whole point of the magicsock is that we can have multiple **paths** to a particular -/// endpoint. One of these paths is via the endpoint's home relay node but as we establish a +/// node. One of these paths is via the endpoint's home relay node but as we establish a /// connection we'll hopefully discover more direct paths. -/// -/// [`MagicEndpoint`]: crate::MagicEndpoint #[derive(Debug)] -pub(super) struct Endpoint { +pub(super) struct NodeState { /// The ID used as index in the [`NodeMap`]. /// /// [`NodeMap`]: super::NodeMap @@ -139,22 +133,22 @@ pub(super) struct Endpoint { /// do a full ping + call-me-maybe. Usually each side only needs to send one /// call-me-maybe to the other for holes to be punched in both directions however. So /// we only try and send one per [`HEARTBEAT_INTERVAL`]. Each [`HEARTBEAT_INTERVAL`] - /// the [`Endpoint::stayin_alive`] function is called, which will trigger new + /// the [`NodeState::stayin_alive`] function is called, which will trigger new /// call-me-maybe messages as backup. last_call_me_maybe: Option, /// The type of connection we have to the node, either direct, relay, mixed, or none. - pub conn_type: Watchable, + conn_type: Watchable, } #[derive(Debug)] pub(super) struct Options { - pub(super) public_key: PublicKey, + pub(super) node_id: NodeId, pub(super) relay_url: Option, /// Is this endpoint currently active (sending data)? pub(super) active: bool, } -impl Endpoint { +impl NodeState { pub(super) fn new(id: usize, options: Options) -> Self { let quic_mapped_addr = QuicMappedAddr::generate(); @@ -163,10 +157,10 @@ impl Endpoint { inc!(MagicsockMetrics, num_relay_conns_added); } - Endpoint { + NodeState { id, quic_mapped_addr, - node_id: options.public_key, + node_id: options.node_id, last_full_ping: None, relay_url: options.relay_url.map(|url| (url, PathState::default())), best_addr: Default::default(), @@ -190,8 +184,16 @@ impl Endpoint { self.id } + pub(super) fn conn_type(&self) -> ConnectionType { + self.conn_type.get() + } + + pub(super) fn conn_type_stream(&self) -> WatcherStream { + self.conn_type.watch().into_stream() + } + /// Returns info about this endpoint - pub(super) fn info(&self, now: Instant) -> EndpointInfo { + pub(super) fn info(&self, now: Instant) -> NodeInfo { let conn_type = self.conn_type.get(); let latency = match conn_type { ConnectionType::Direct(addr) => self @@ -231,7 +233,7 @@ impl Endpoint { }) .collect(); - EndpointInfo { + NodeInfo { id: self.id, node_id: self.node_id, relay_url: self.relay_url(), @@ -367,7 +369,7 @@ impl Endpoint { /// we only have a relay path, or our path is expired. /// /// When a call-me-maybe message is sent we also need to send pings to all known paths - /// of the endpoint. The [`Endpoint::send_call_me_maybe`] function takes care of this. + /// of the endpoint. The [`NodeState::send_call_me_maybe`] function takes care of this. #[instrument("want_call_me_maybe", skip_all)] fn want_call_me_maybe(&self, now: &Instant) -> bool { trace!("full ping: wanted?"); @@ -406,8 +408,8 @@ impl Endpoint { debug!(tx = %hex::encode(txid), addr = %sp.to, "pong not received in timeout"); match sp.to { SendAddr::Udp(addr) => { - if let Some(ep_state) = self.direct_addr_state.get_mut(&addr.into()) { - ep_state.last_ping = None; + if let Some(path_state) = self.direct_addr_state.get_mut(&addr.into()) { + path_state.last_ping = None; } // If we fail to ping our current best addr, it is not that good anymore. @@ -592,7 +594,7 @@ impl Endpoint { debug!( %ping_dsts, dst = %self.node_id.fmt_short(), - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "sending pings to endpoint", ); self.last_full_ping.replace(now); @@ -627,7 +629,7 @@ impl Endpoint { //TODOFRZ self.direct_addr_state.entry(addr.into()).or_default(); } - let paths = summarize_endpoint_paths(&self.direct_addr_state); + let paths = summarize_node_paths(&self.direct_addr_state); debug!(new = ?n.direct_addresses , %paths, "added new direct paths for endpoint"); } @@ -665,7 +667,7 @@ impl Endpoint { Entry::Vacant(vacant) => { info!(%addr, "new direct addr for node"); vacant.insert(PathState::with_ping(tx_id, now)); - PingRole::NewEndpoint + PingRole::NewPath } }, SendAddr::Relay(ref url) => { @@ -675,19 +677,19 @@ impl Endpoint { // node. In both cases, trust the new confirmed url info!(%url, "new relay addr for node"); self.relay_url = Some((url.clone(), PathState::with_ping(tx_id, now))); - PingRole::NewEndpoint + PingRole::NewPath } Some((_home_url, state)) => state.handle_ping(tx_id, now), None => { info!(%url, "new relay addr for node"); self.relay_url = Some((url.clone(), PathState::with_ping(tx_id, now))); - PingRole::NewEndpoint + PingRole::NewPath } } } }; - if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewEndpoint) { + if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewPath) { self.prune_direct_addresses(); } @@ -710,7 +712,7 @@ impl Endpoint { debug!( ?role, needs_ping_back = ?needs_ping_back.is_some(), - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "endpoint handled ping", ); PingHandled { @@ -741,7 +743,7 @@ impl Endpoint { if prune_count == 0 { // nothing to do, within limits debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "prune addresses: {prune_count} pruned", ); return; @@ -766,7 +768,7 @@ impl Endpoint { ); } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "prune addresses: {prune_count} pruned", ); } @@ -841,7 +843,7 @@ impl Endpoint { } } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "handled pong", ); } @@ -938,7 +940,7 @@ impl Endpoint { } } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "updated endpoint paths from call-me-maybe", ); self.send_pings(now) @@ -1089,7 +1091,7 @@ impl Endpoint { } } -/// State about a particular path to another [`Endpoint`]. +/// State about a particular path to another [`NodeState`]. /// /// This state is used for both the relay path and any direct UDP paths. #[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] @@ -1278,7 +1280,7 @@ impl PathState { } // TODO: Make an `EndpointPaths` struct and do things nicely. -fn summarize_endpoint_paths(paths: &BTreeMap) -> String { +fn summarize_node_paths(paths: &BTreeMap) -> String { use std::fmt::Write; let mut w = String::new(); @@ -1360,9 +1362,9 @@ pub struct DirectAddrInfo { pub last_payload: Option, } -/// Details about an Endpoint. +/// Details about an iroh node which is known to this node. #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct EndpointInfo { +pub struct NodeInfo { /// The id in the node_map pub id: usize, /// The public key of the endpoint. @@ -1380,7 +1382,7 @@ pub struct EndpointInfo { pub last_used: Option, } -impl EndpointInfo { +impl NodeInfo { /// Get the duration since the last activity we received from this endpoint /// on any of its direct addresses. pub fn last_received(&self) -> Option { @@ -1449,7 +1451,7 @@ mod tests { )]); let key = SecretKey::generate(); ( - Endpoint { + NodeState { id: 0, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1480,7 +1482,7 @@ mod tests { pong_src: pong_src.clone(), }); let key = SecretKey::generate(); - Endpoint { + NodeState { id: 1, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1500,7 +1502,7 @@ mod tests { // let socket_addr = "0.0.0.0:8".parse().unwrap(); let endpoint_state = BTreeMap::new(); let key = SecretKey::generate(); - Endpoint { + NodeState { id: 2, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1536,7 +1538,7 @@ mod tests { }); let key = SecretKey::generate(); ( - Endpoint { + NodeState { id: 3, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1561,7 +1563,7 @@ mod tests { ) }; let expect = Vec::from([ - EndpointInfo { + NodeInfo { id: a_endpoint.id, node_id: a_endpoint.node_id, relay_url: a_endpoint.relay_url(), @@ -1575,7 +1577,7 @@ mod tests { latency: Some(latency), last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: b_endpoint.id, node_id: b_endpoint.node_id, relay_url: b_endpoint.relay_url(), @@ -1584,7 +1586,7 @@ mod tests { latency: Some(latency), last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: c_endpoint.id, node_id: c_endpoint.node_id, relay_url: c_endpoint.relay_url(), @@ -1593,7 +1595,7 @@ mod tests { latency: None, last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: d_endpoint.id, node_id: d_endpoint.node_id, relay_url: d_endpoint.relay_url(), @@ -1634,7 +1636,7 @@ mod tests { ]), next_id: 5, }); - let mut got = node_map.endpoint_infos(later); + let mut got = node_map.node_infos(later); got.sort_by_key(|p| p.id); assert_eq!(expect, got); } @@ -1646,11 +1648,11 @@ mod tests { let key = SecretKey::generate(); let opts = Options { - public_key: key.public(), + node_id: key.public(), relay_url: None, active: true, }; - let mut ep = Endpoint::new(0, opts); + let mut ep = NodeState::new(0, opts); let my_numbers_count: u16 = (MAX_INACTIVE_DIRECT_ADDRESSES + 5).try_into().unwrap(); let my_numbers = (0u16..my_numbers_count)