From 9ee8a24d8f74135986812cb7f984042a159c7891 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 23 Apr 2024 12:29:10 +0200 Subject: [PATCH] refactor(iroh-net): Rename endpoint for nodes to node_state We still have many things named "endpoint", this targets cleaning up the naming of the state for each node that is stored in the NodeMap. This is now called NodeState instead of Endpoint. All related APIs now talk about nodes and node states instead of endpoints. Another minor cleanup is in the NodeState we had private fields with accessor functions, except for one field which was directly accessible. This is migrated to accessor functions for consistency as well. --- iroh-cli/src/commands/doctor.rs | 4 +- iroh-net/src/magic_endpoint.rs | 6 +- iroh-net/src/magicsock.rs | 32 +-- iroh-net/src/magicsock/node_map.rs | 187 +++++++++--------- .../node_map/{endpoint.rs => node_state.rs} | 94 ++++----- 5 files changed, 165 insertions(+), 158 deletions(-) rename iroh-net/src/magicsock/node_map/{endpoint.rs => node_state.rs} (96%) diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 5fb0d22e17..33690a8e96 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -30,7 +30,7 @@ use iroh::{ dns::default_resolver, key::{PublicKey, SecretKey}, magic_endpoint, - magicsock::EndpointInfo, + magicsock::NodeInfo, netcheck, portmapper, relay::{RelayMap, RelayMode, RelayUrl}, util::AbortingJoinHandle, @@ -390,7 +390,7 @@ impl Gui { .unwrap_or_else(|| "unknown".to_string()) }; let msg = match endpoint.connection_info(*node_id) { - Some(EndpointInfo { + Some(NodeInfo { relay_url, conn_type, latency, diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 6f71cffe01..78f8137a02 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -20,7 +20,7 @@ use crate::{ tls, NodeId, }; -pub use super::magicsock::{EndpointInfo as ConnectionInfo, LocalEndpointsStream}; +pub use super::magicsock::{LocalEndpointsStream, NodeInfo as ConnectionInfo}; pub use iroh_base::node_addr::{AddrInfo, NodeAddr}; @@ -376,7 +376,7 @@ impl MagicEndpoint { /// to the internal addressbook through [`MagicEndpoint::add_node_addr`]), so these connections /// are not necessarily active connections. pub fn connection_infos(&self) -> Vec { - self.msock.tracked_endpoints() + self.msock.tracked_nodes() } /// Get connection information about a specific node. @@ -385,7 +385,7 @@ impl MagicEndpoint { /// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are /// currently communicating with that node over a `Direct` (UDP) or `Relay` (relay) connection. pub fn connection_info(&self, node_id: PublicKey) -> Option { - self.msock.tracked_endpoint(node_id) + self.msock.tracked_node(node_id) } pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> { diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index bc9de2f1b3..5cf47df161 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -81,7 +81,7 @@ pub use crate::net::UdpSocket; pub use self::metrics::Metrics; pub use self::node_map::{ - ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, EndpointInfo, + ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, NodeInfo, }; pub use self::timer::Timer; @@ -153,7 +153,7 @@ pub(crate) type RelayContents = SmallVec<[Bytes; 1]>; /// This is responsible for routing packets to nodes based on node IDs, it will initially /// route packets via a relay and transparently try and establish a node-to-node /// connection and upgrade to it. It will also keep looking for better connections as the -/// network details of both endpoints change. +/// network details of both nodes change. /// /// It is usually only necessary to use a single [`MagicSock`] instance in an application, it /// means any QUIC endpoints on top will be sharing as much information about nodes as @@ -342,7 +342,7 @@ impl Inner { let mut transmits_sent = 0; match self .node_map - .get_send_addrs_for_quic_mapped_addr(&dest, self.ipv6_reported.load(Ordering::Relaxed)) + .get_send_addrs(&dest, self.ipv6_reported.load(Ordering::Relaxed)) { Some((public_key, udp_addr, relay_url, mut msgs)) => { let mut pings_sent = false; @@ -443,10 +443,10 @@ impl Inner { Poll::Ready(Ok(transmits_sent)) } None => { - error!(dst=%dest, "no endpoint for mapped address"); + error!(dst=%dest, "no node_state for mapped address"); Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, - "trying to send to unknown endpoint", + "trying to send to unknown node", ))) } } @@ -720,15 +720,15 @@ impl Inner { let handled = self.node_map.handle_ping(*sender, addr.clone(), dm.tx_id); match handled.role { PingRole::Duplicate => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: endpoint already confirmed, skip"); + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path already confirmed, skip"); return; } PingRole::LikelyHeartbeat => {} - PingRole::NewEndpoint => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: new endpoint"); + PingRole::NewPath => { + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: new path"); } PingRole::Reactivate => { - debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: endpoint active"); + debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path active"); } } @@ -1310,13 +1310,13 @@ impl MagicSock { } /// Retrieve connection information about nodes in the network. - pub fn tracked_endpoints(&self) -> Vec { - self.inner.node_map.endpoint_infos(Instant::now()) + pub fn tracked_nodes(&self) -> Vec { + self.inner.node_map.node_infos(Instant::now()) } /// Retrieve connection information about a node in the network. - pub fn tracked_endpoint(&self, node_key: PublicKey) -> Option { - self.inner.node_map.endpoint_info(&node_key) + pub fn tracked_node(&self, node_key: PublicKey) -> Option { + self.inner.node_map.node_info(&node_key) } /// Returns the local endpoints as a stream. @@ -1719,7 +1719,7 @@ impl Actor { // TODO: this might trigger too many packets at once, pace this self.inner.node_map.prune_inactive(); - let msgs = self.inner.node_map.endpoints_stayin_alive(); + let msgs = self.inner.node_map.nodes_stayin_alive(); self.handle_ping_actions(msgs).await; } _ = endpoints_update_receiver.changed() => { @@ -2281,7 +2281,7 @@ impl Actor { /// This is called when connectivity changes enough that we no longer trust the old routes. #[instrument(skip_all, fields(me = %self.inner.me))] fn reset_endpoint_states(&mut self) { - self.inner.node_map.reset_endpoint_states() + self.inner.node_map.reset_node_states() } /// Tells the relay actor to close stale relay connections. @@ -2605,7 +2605,7 @@ pub(crate) mod tests { fn tracked_endpoints(&self) -> Vec { self.endpoint .magic_sock() - .tracked_endpoints() + .tracked_nodes() .into_iter() .map(|ep| ep.node_id) .collect() diff --git a/iroh-net/src/magicsock/node_map.rs b/iroh-net/src/magicsock/node_map.rs index 99fdf52e2c..361e4bc8a7 100644 --- a/iroh-net/src/magicsock/node_map.rs +++ b/iroh-net/src/magicsock/node_map.rs @@ -10,13 +10,14 @@ use std::{ use anyhow::{ensure, Context as _}; use futures::Stream; +use iroh_base::key::NodeId; use iroh_metrics::inc; use parking_lot::Mutex; use stun_rs::TransactionId; use tokio::io::AsyncWriteExt; use tracing::{debug, info, instrument, trace, warn}; -use self::endpoint::{Endpoint, Options, PingHandled}; +use self::node_state::{NodeState, Options, PingHandled}; use super::{ metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, QuicMappedAddr, }; @@ -28,20 +29,16 @@ use crate::{ }; mod best_addr; -mod endpoint; +mod node_state; -pub use endpoint::{ConnectionType, ControlMsg, DirectAddrInfo, EndpointInfo}; -pub(super) use endpoint::{DiscoPingPurpose, PingAction, PingRole, SendPing}; +pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, NodeInfo}; +pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing}; /// Number of nodes that are inactive for which we keep info about. This limit is enforced /// periodically via [`NodeMap::prune_inactive`]. const MAX_INACTIVE_NODES: usize = 30; -/// Map of the [`Endpoint`] information for all the known nodes. -/// -/// Each endpoint is also known as a "Node" in the "(iroh) network", but this is a bit of a -/// looser term. It is where "NodeMap" comes from however. -/// +/// Map of the [`NodeState`] information for all the known nodes. /// /// The nodes can be looked up by: /// @@ -65,17 +62,21 @@ pub(super) struct NodeMap { #[derive(Default, Debug)] pub(super) struct NodeMapInner { - by_node_key: HashMap, + by_node_key: HashMap, by_ip_port: HashMap, by_quic_mapped_addr: HashMap, - by_id: HashMap, + by_id: HashMap, next_id: usize, } +/// Identifier to look up a [`NodeState`] in the [`NodeMap`]. +/// +/// You can look up entries in [`NodeMap`] with various keys, depending on the context you +/// have for the node. These are all the keys the [`NodeMap`] can use. #[derive(Clone)] -enum EndpointId<'a> { - Id(&'a usize), - NodeKey(&'a PublicKey), +enum NodeStateKey<'a> { + Idx(&'a usize), + NodeId(&'a NodeId), QuicMappedAddr(&'a QuicMappedAddr), IpPort(&'a IpPort), } @@ -125,13 +126,13 @@ impl NodeMap { purpose: DiscoPingPurpose, msg_sender: tokio::sync::mpsc::Sender, ) { - if let Some(ep) = self.inner.lock().get_mut(EndpointId::Id(&id)) { + if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) { ep.ping_sent(dst, tx_id, purpose, msg_sender); } } pub fn notify_ping_timeout(&self, id: usize, tx_id: stun::TransactionId) { - if let Some(ep) = self.inner.lock().get_mut(EndpointId::Id(&id)) { + if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) { ep.ping_timeout(tx_id); } } @@ -142,7 +143,7 @@ impl NodeMap { ) -> Option { self.inner .lock() - .get(EndpointId::NodeKey(node_key)) + .get(NodeStateKey::NodeId(node_key)) .map(|ep| *ep.quic_mapped_addr()) } @@ -167,7 +168,7 @@ impl NodeMap { } #[allow(clippy::type_complexity)] - pub fn get_send_addrs_for_quic_mapped_addr( + pub fn get_send_addrs( &self, addr: &QuicMappedAddr, have_ipv6: bool, @@ -178,7 +179,7 @@ impl NodeMap { Vec, )> { let mut inner = self.inner.lock(); - let ep = inner.get_mut(EndpointId::QuicMappedAddr(addr))?; + let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?; let public_key = *ep.public_key(); let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6); Some((public_key, udp_addr, relay_url, msgs)) @@ -186,30 +187,29 @@ impl NodeMap { pub fn notify_shutdown(&self) { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { + for (_, ep) in inner.node_states_mut() { ep.reset(); } } - pub fn reset_endpoint_states(&self) { + pub fn reset_node_states(&self) { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { + for (_, ep) in inner.node_states_mut() { ep.note_connectivity_change(); } } - pub fn endpoints_stayin_alive(&self) -> Vec { - let mut msgs = Vec::new(); + pub fn nodes_stayin_alive(&self) -> Vec { let mut inner = self.inner.lock(); - for (_, ep) in inner.endpoints_mut() { - msgs.extend(ep.stayin_alive()); - } - msgs + inner + .node_states_mut() + .flat_map(|(_idx, node_state)| node_state.stayin_alive()) + .collect() } /// Get the [`EndpointInfo`]s for each endpoint - pub fn endpoint_infos(&self, now: Instant) -> Vec { - self.inner.lock().endpoint_infos(now) + pub fn node_infos(&self, now: Instant) -> Vec { + self.inner.lock().node_infos(now) } /// Returns a stream of [`ConnectionType`]. @@ -226,8 +226,8 @@ impl NodeMap { } /// Get the [`EndpointInfo`]s for each endpoint - pub fn endpoint_info(&self, public_key: &PublicKey) -> Option { - self.inner.lock().endpoint_info(public_key) + pub fn node_info(&self, public_key: &PublicKey) -> Option { + self.inner.lock().node_info(public_key) } /// Saves the known node info to the given path, returning the number of nodes persisted. @@ -318,40 +318,44 @@ impl NodeMapInner { fn add_node_addr(&mut self, node_addr: NodeAddr) { let NodeAddr { node_id, info } = node_addr; - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(&node_id), || Options { - public_key: node_id, + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&node_id), || Options { + node_id, relay_url: info.relay_url.clone(), active: false, }); - endpoint.update_from_node_addr(&info); - let id = endpoint.id(); - for endpoint in &info.direct_addresses { - self.set_endpoint_for_ip_port(*endpoint, id); + node_state.update_from_node_addr(&info); + let id = node_state.id(); + for addr in &info.direct_addresses { + self.set_node_state_for_ip_port(*addr, id); } } - fn get_id(&self, id: EndpointId) -> Option { + fn get_id(&self, id: NodeStateKey) -> Option { match id { - EndpointId::Id(id) => Some(*id), - EndpointId::NodeKey(node_key) => self.by_node_key.get(node_key).copied(), - EndpointId::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(addr).copied(), - EndpointId::IpPort(ipp) => self.by_ip_port.get(ipp).copied(), + NodeStateKey::Idx(id) => Some(*id), + NodeStateKey::NodeId(node_key) => self.by_node_key.get(node_key).copied(), + NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(addr).copied(), + NodeStateKey::IpPort(ipp) => self.by_ip_port.get(ipp).copied(), } } - fn get_mut(&mut self, id: EndpointId) -> Option<&mut Endpoint> { + fn get_mut(&mut self, id: NodeStateKey) -> Option<&mut NodeState> { self.get_id(id).and_then(|id| self.by_id.get_mut(&id)) } - fn get(&self, id: EndpointId) -> Option<&Endpoint> { + fn get(&self, id: NodeStateKey) -> Option<&NodeState> { self.get_id(id).and_then(|id| self.by_id.get(&id)) } - fn get_or_insert_with(&mut self, id: EndpointId, f: impl FnOnce() -> Options) -> &mut Endpoint { + fn get_or_insert_with( + &mut self, + id: NodeStateKey, + f: impl FnOnce() -> Options, + ) -> &mut NodeState { let id = self.get_id(id); match id { - None => self.insert_endpoint(f()), + None => self.insert_node(f()), Some(id) => self.by_id.get_mut(&id).expect("is not empty"), } } @@ -361,47 +365,47 @@ impl NodeMapInner { self.by_id.len() } - /// Marks the node we believe to be at `ipp` as recently used, returning the [`Endpoint`] if found. - fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { + /// Marks the node we believe to be at `ipp` as recently used. + fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> { let ip_port: IpPort = udp_addr.into(); - let Some(endpoint) = self.get_mut(EndpointId::IpPort(&ip_port)) else { - info!(src=%udp_addr, "receive_udp: no node_map state found for addr, ignore"); + let Some(node_state) = self.get_mut(NodeStateKey::IpPort(&ip_port)) else { + info!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore"); return None; }; - endpoint.receive_udp(ip_port, Instant::now()); - Some((*endpoint.public_key(), *endpoint.quic_mapped_addr())) + node_state.receive_udp(ip_port, Instant::now()); + Some((*node_state.public_key(), *node_state.quic_mapped_addr())) } #[instrument(skip_all, fields(src = %src.fmt_short()))] fn receive_relay(&mut self, relay_url: &RelayUrl, src: &PublicKey) -> QuicMappedAddr { - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(src), || { + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || { trace!("packets from unknown node, insert into node map"); Options { - public_key: *src, + node_id: *src, relay_url: Some(relay_url.clone()), active: true, } }); - endpoint.receive_relay(relay_url, src, Instant::now()); - *endpoint.quic_mapped_addr() + node_state.receive_relay(relay_url, src, Instant::now()); + *node_state.quic_mapped_addr() } - fn endpoints(&self) -> impl Iterator { + fn node_states(&self) -> impl Iterator { self.by_id.iter() } - fn endpoints_mut(&mut self) -> impl Iterator { + fn node_states_mut(&mut self) -> impl Iterator { self.by_id.iter_mut() } /// Get the [`EndpointInfo`]s for each endpoint - fn endpoint_infos(&self, now: Instant) -> Vec { - self.endpoints().map(|(_, ep)| ep.info(now)).collect() + fn node_infos(&self, now: Instant) -> Vec { + self.node_states().map(|(_, ep)| ep.info(now)).collect() } /// Get the [`EndpointInfo`]s for each endpoint - fn endpoint_info(&self, public_key: &PublicKey) -> Option { - self.get(EndpointId::NodeKey(public_key)) + fn node_info(&self, public_key: &PublicKey) -> Option { + self.get(NodeStateKey::NodeId(public_key)) .map(|ep| ep.info(Instant::now())) } @@ -415,18 +419,18 @@ impl NodeMapInner { /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result { - match self.get(EndpointId::NodeKey(public_key)) { + match self.get(NodeStateKey::NodeId(public_key)) { Some(ep) => Ok(ConnectionTypeStream { - initial: Some(ep.conn_type.get()), - inner: ep.conn_type.watch().into_stream(), + initial: Some(ep.conn_type()), + inner: ep.conn_type_stream(), }), None => anyhow::bail!("No endpoint for {public_key:?} found"), } } fn handle_pong(&mut self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { - if let Some(ep) = self.get_mut(EndpointId::NodeKey(&sender)).as_mut() { - let insert = ep.handle_pong(&pong, src.into()); + if let Some(ns) = self.get_mut(NodeStateKey::NodeId(&sender)).as_mut() { + let insert = ns.handle_pong(&pong, src.into()); if let Some((src, key)) = insert { self.set_node_key_for_ip_port(src, &key); } @@ -438,23 +442,23 @@ impl NodeMapInner { #[must_use = "actions must be handled"] fn handle_call_me_maybe(&mut self, sender: PublicKey, cm: CallMeMaybe) -> Vec { - let ep_id = EndpointId::NodeKey(&sender); - if let Some(id) = self.get_id(ep_id.clone()) { + let ns_id = NodeStateKey::NodeId(&sender); + if let Some(id) = self.get_id(ns_id.clone()) { for number in &cm.my_numbers { // ensure the new addrs are known - self.set_endpoint_for_ip_port(*number, id); + self.set_node_state_for_ip_port(*number, id); } } - match self.get_mut(ep_id) { + match self.get_mut(ns_id) { None => { inc!(MagicsockMetrics, recv_disco_call_me_maybe_bad_disco); debug!("received call-me-maybe: ignore, node is unknown"); vec![] } - Some(ep) => { + Some(ns) => { debug!(endpoints = ?cm.my_numbers, "received call-me-maybe"); - ep.handle_call_me_maybe(cm) + ns.handle_call_me_maybe(cm) } } } @@ -465,40 +469,41 @@ impl NodeMapInner { src: SendAddr, tx_id: TransactionId, ) -> PingHandled { - let endpoint = self.get_or_insert_with(EndpointId::NodeKey(&sender), || { + let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&sender), || { debug!("received ping: node unknown, add to node map"); Options { - public_key: sender, + node_id: sender, relay_url: src.relay_url(), active: true, } }); - let handled = endpoint.handle_ping(src.clone(), tx_id); + let handled = node_state.handle_ping(src.clone(), tx_id); if let SendAddr::Udp(ref addr) = src { - if matches!(handled.role, PingRole::NewEndpoint) { + if matches!(handled.role, PingRole::NewPath) { self.set_node_key_for_ip_port(*addr, &sender); } } handled } - /// Inserts a new endpoint into the [`NodeMap`]. - fn insert_endpoint(&mut self, options: Options) -> &mut Endpoint { + /// Inserts a new node into the [`NodeMap`]. + fn insert_node(&mut self, options: Options) -> &mut NodeState { info!( - node = %options.public_key.fmt_short(), + node = %options.node_id.fmt_short(), relay_url = ?options.relay_url, - "inserting new node endpoint in NodeMap", + "inserting new node in NodeMap", ); let id = self.next_id; self.next_id = self.next_id.wrapping_add(1); - let ep = Endpoint::new(id, options); + let node_state = NodeState::new(id, options); // update indices - self.by_quic_mapped_addr.insert(*ep.quic_mapped_addr(), id); - self.by_node_key.insert(*ep.public_key(), id); + self.by_quic_mapped_addr + .insert(*node_state.quic_mapped_addr(), id); + self.by_node_key.insert(*node_state.public_key(), id); - self.by_id.insert(id, ep); + self.by_id.insert(id, node_state); self.by_id.get_mut(&id).expect("just inserted") } @@ -521,7 +526,7 @@ impl NodeMapInner { } } - fn set_endpoint_for_ip_port(&mut self, ipp: impl Into, id: usize) { + fn set_node_state_for_ip_port(&mut self, ipp: impl Into, id: usize) { let ipp = ipp.into(); trace!(?ipp, ?id, "set endpoint for ip:port"); self.by_ip_port.insert(ipp, id); @@ -629,7 +634,7 @@ impl IpPort { #[cfg(test)] mod tests { - use super::endpoint::MAX_INACTIVE_DIRECT_ADDRESSES; + use super::node_state::MAX_INACTIVE_DIRECT_ADDRESSES; use super::*; use crate::{key::SecretKey, magic_endpoint::AddrInfo}; use std::net::Ipv4Addr; @@ -697,8 +702,8 @@ mod tests { let id = node_map .inner .lock() - .insert_endpoint(Options { - public_key, + .insert_node(Options { + node_id: public_key, relay_url: None, active: false, }) @@ -778,7 +783,7 @@ mod tests { node_map .inner .lock() - .get(EndpointId::NodeKey(&active_node)) + .get(NodeStateKey::NodeId(&active_node)) .expect("should not be pruned"); } } diff --git a/iroh-net/src/magicsock/node_map/endpoint.rs b/iroh-net/src/magicsock/node_map/node_state.rs similarity index 96% rename from iroh-net/src/magicsock/node_map/endpoint.rs rename to iroh-net/src/magicsock/node_map/node_state.rs index 391352bf8a..3835ddd30c 100644 --- a/iroh-net/src/magicsock/node_map/endpoint.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -10,7 +10,7 @@ use rand::seq::IteratorRandom; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tracing::{debug, info, instrument, trace, warn}; -use watchable::Watchable; +use watchable::{Watchable, WatcherStream}; use crate::{ disco::{self, SendAddr}, @@ -91,24 +91,18 @@ pub struct PingHandled { #[derive(Debug)] pub enum PingRole { Duplicate, - // TODO: Clean up this naming, this is a new path to an endpoint. - NewEndpoint, + NewPath, LikelyHeartbeat, Reactivate, } -/// An endpoint, think [`MagicEndpoint`], which we can have connections with. -/// -/// Each endpoint is also known as a "Node" in the "(iroh) network", but this is a bit of a -/// looser term. +/// An iroh node, which we can have connections with. /// /// The whole point of the magicsock is that we can have multiple **paths** to a particular -/// endpoint. One of these paths is via the endpoint's home relay node but as we establish a +/// node. One of these paths is via the endpoint's home relay node but as we establish a /// connection we'll hopefully discover more direct paths. -/// -/// [`MagicEndpoint`]: crate::MagicEndpoint #[derive(Debug)] -pub(super) struct Endpoint { +pub(super) struct NodeState { /// The ID used as index in the [`NodeMap`]. /// /// [`NodeMap`]: super::NodeMap @@ -143,18 +137,18 @@ pub(super) struct Endpoint { /// call-me-maybe messages as backup. last_call_me_maybe: Option, /// The type of connection we have to the node, either direct, relay, mixed, or none. - pub conn_type: Watchable, + conn_type: Watchable, } #[derive(Debug)] pub(super) struct Options { - pub(super) public_key: PublicKey, + pub(super) node_id: NodeId, pub(super) relay_url: Option, /// Is this endpoint currently active (sending data)? pub(super) active: bool, } -impl Endpoint { +impl NodeState { pub(super) fn new(id: usize, options: Options) -> Self { let quic_mapped_addr = QuicMappedAddr::generate(); @@ -163,10 +157,10 @@ impl Endpoint { inc!(MagicsockMetrics, num_relay_conns_added); } - Endpoint { + NodeState { id, quic_mapped_addr, - node_id: options.public_key, + node_id: options.node_id, last_full_ping: None, relay_url: options.relay_url.map(|url| (url, PathState::default())), best_addr: Default::default(), @@ -190,8 +184,16 @@ impl Endpoint { self.id } + pub(super) fn conn_type(&self) -> ConnectionType { + self.conn_type.get() + } + + pub(super) fn conn_type_stream(&self) -> WatcherStream { + self.conn_type.watch().into_stream() + } + /// Returns info about this endpoint - pub(super) fn info(&self, now: Instant) -> EndpointInfo { + pub(super) fn info(&self, now: Instant) -> NodeInfo { let conn_type = self.conn_type.get(); let latency = match conn_type { ConnectionType::Direct(addr) => self @@ -231,7 +233,7 @@ impl Endpoint { }) .collect(); - EndpointInfo { + NodeInfo { id: self.id, node_id: self.node_id, relay_url: self.relay_url(), @@ -406,8 +408,8 @@ impl Endpoint { debug!(tx = %hex::encode(txid), addr = %sp.to, "pong not received in timeout"); match sp.to { SendAddr::Udp(addr) => { - if let Some(ep_state) = self.direct_addr_state.get_mut(&addr.into()) { - ep_state.last_ping = None; + if let Some(path_state) = self.direct_addr_state.get_mut(&addr.into()) { + path_state.last_ping = None; } // If we fail to ping our current best addr, it is not that good anymore. @@ -592,7 +594,7 @@ impl Endpoint { debug!( %ping_dsts, dst = %self.node_id.fmt_short(), - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "sending pings to endpoint", ); self.last_full_ping.replace(now); @@ -627,7 +629,7 @@ impl Endpoint { //TODOFRZ self.direct_addr_state.entry(addr.into()).or_default(); } - let paths = summarize_endpoint_paths(&self.direct_addr_state); + let paths = summarize_node_paths(&self.direct_addr_state); debug!(new = ?n.direct_addresses , %paths, "added new direct paths for endpoint"); } @@ -665,7 +667,7 @@ impl Endpoint { Entry::Vacant(vacant) => { info!(%addr, "new direct addr for node"); vacant.insert(PathState::with_ping(tx_id, now)); - PingRole::NewEndpoint + PingRole::NewPath } }, SendAddr::Relay(ref url) => { @@ -675,19 +677,19 @@ impl Endpoint { // node. In both cases, trust the new confirmed url info!(%url, "new relay addr for node"); self.relay_url = Some((url.clone(), PathState::with_ping(tx_id, now))); - PingRole::NewEndpoint + PingRole::NewPath } Some((_home_url, state)) => state.handle_ping(tx_id, now), None => { info!(%url, "new relay addr for node"); self.relay_url = Some((url.clone(), PathState::with_ping(tx_id, now))); - PingRole::NewEndpoint + PingRole::NewPath } } } }; - if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewEndpoint) { + if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewPath) { self.prune_direct_addresses(); } @@ -710,7 +712,7 @@ impl Endpoint { debug!( ?role, needs_ping_back = ?needs_ping_back.is_some(), - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "endpoint handled ping", ); PingHandled { @@ -741,7 +743,7 @@ impl Endpoint { if prune_count == 0 { // nothing to do, within limits debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "prune addresses: {prune_count} pruned", ); return; @@ -766,7 +768,7 @@ impl Endpoint { ); } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "prune addresses: {prune_count} pruned", ); } @@ -841,7 +843,7 @@ impl Endpoint { } } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "handled pong", ); } @@ -938,7 +940,7 @@ impl Endpoint { } } debug!( - paths = %summarize_endpoint_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.direct_addr_state), "updated endpoint paths from call-me-maybe", ); self.send_pings(now) @@ -1278,7 +1280,7 @@ impl PathState { } // TODO: Make an `EndpointPaths` struct and do things nicely. -fn summarize_endpoint_paths(paths: &BTreeMap) -> String { +fn summarize_node_paths(paths: &BTreeMap) -> String { use std::fmt::Write; let mut w = String::new(); @@ -1360,9 +1362,9 @@ pub struct DirectAddrInfo { pub last_payload: Option, } -/// Details about an Endpoint. +/// Details about an iroh node which is known to this node. #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct EndpointInfo { +pub struct NodeInfo { /// The id in the node_map pub id: usize, /// The public key of the endpoint. @@ -1380,7 +1382,7 @@ pub struct EndpointInfo { pub last_used: Option, } -impl EndpointInfo { +impl NodeInfo { /// Get the duration since the last activity we received from this endpoint /// on any of its direct addresses. pub fn last_received(&self) -> Option { @@ -1449,7 +1451,7 @@ mod tests { )]); let key = SecretKey::generate(); ( - Endpoint { + NodeState { id: 0, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1480,7 +1482,7 @@ mod tests { pong_src: pong_src.clone(), }); let key = SecretKey::generate(); - Endpoint { + NodeState { id: 1, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1500,7 +1502,7 @@ mod tests { // let socket_addr = "0.0.0.0:8".parse().unwrap(); let endpoint_state = BTreeMap::new(); let key = SecretKey::generate(); - Endpoint { + NodeState { id: 2, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1536,7 +1538,7 @@ mod tests { }); let key = SecretKey::generate(); ( - Endpoint { + NodeState { id: 3, quic_mapped_addr: QuicMappedAddr::generate(), node_id: key.public(), @@ -1561,7 +1563,7 @@ mod tests { ) }; let expect = Vec::from([ - EndpointInfo { + NodeInfo { id: a_endpoint.id, node_id: a_endpoint.node_id, relay_url: a_endpoint.relay_url(), @@ -1575,7 +1577,7 @@ mod tests { latency: Some(latency), last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: b_endpoint.id, node_id: b_endpoint.node_id, relay_url: b_endpoint.relay_url(), @@ -1584,7 +1586,7 @@ mod tests { latency: Some(latency), last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: c_endpoint.id, node_id: c_endpoint.node_id, relay_url: c_endpoint.relay_url(), @@ -1593,7 +1595,7 @@ mod tests { latency: None, last_used: Some(elapsed), }, - EndpointInfo { + NodeInfo { id: d_endpoint.id, node_id: d_endpoint.node_id, relay_url: d_endpoint.relay_url(), @@ -1634,7 +1636,7 @@ mod tests { ]), next_id: 5, }); - let mut got = node_map.endpoint_infos(later); + let mut got = node_map.node_infos(later); got.sort_by_key(|p| p.id); assert_eq!(expect, got); } @@ -1646,11 +1648,11 @@ mod tests { let key = SecretKey::generate(); let opts = Options { - public_key: key.public(), + node_id: key.public(), relay_url: None, active: true, }; - let mut ep = Endpoint::new(0, opts); + let mut ep = NodeState::new(0, opts); let my_numbers_count: u16 = (MAX_INACTIVE_DIRECT_ADDRESSES + 5).try_into().unwrap(); let my_numbers = (0u16..my_numbers_count)