diff --git a/iroh-blobs/src/downloader.rs b/iroh-blobs/src/downloader.rs index 2f8e3bc7db..7d0eedd10b 100644 --- a/iroh-blobs/src/downloader.rs +++ b/iroh-blobs/src/downloader.rs @@ -79,7 +79,7 @@ pub trait Dialer: Stream)> + Un /// Get the number of dialing nodes. fn pending_count(&self) -> usize; /// Check if a node is being dialed. - fn is_pending(&self, node: &NodeId) -> bool; + fn is_pending(&self, node: NodeId) -> bool; } /// Signals what should be done with the request when it fails. @@ -996,7 +996,7 @@ impl, D: Dialer> Service { } else { best_connected = Some(match best_connected.take() { Some(old) if old.1 <= active_requests => old, - _ => (*node, active_requests), + _ => (node, active_requests), }); } } @@ -1036,7 +1036,7 @@ impl, D: Dialer> Service { // All slots are free: We can dial our candidate. if !at_connections_capacity && !at_dial_capacity { - NextStep::Dial(*node) + NextStep::Dial(node) } // The hash has free dial capacity, but the global connection capacity is reached. // But if we have idle nodes, we will disconnect the longest idling node, and then dial our @@ -1046,7 +1046,7 @@ impl, D: Dialer> Service { && !self.goodbye_nodes_queue.is_empty() { let key = self.goodbye_nodes_queue.peek().expect("just checked"); - NextStep::DialQueuedDisconnect(*node, key) + NextStep::DialQueuedDisconnect(node, key) } // No dial capacity, and no idling nodes: We have to wait until capacity is freed up. else { @@ -1147,13 +1147,13 @@ impl, D: Dialer> Service { } } - fn node_state<'a>(&'a self, node: &NodeId) -> NodeState<'a, D::Connection> { - if let Some(info) = self.connected_nodes.get(node) { + fn node_state(&self, node: NodeId) -> NodeState<'_, D::Connection> { + if let Some(info) = self.connected_nodes.get(&node) { NodeState::Connected(info) } else if self.dialer.is_pending(node) { NodeState::Dialing } else { - match self.retry_node_state.get(node) { + match self.retry_node_state.get(&node) { Some(state) if state.retry_is_queued => NodeState::WaitForRetry, _ => NodeState::Disconnected, } @@ -1221,12 +1221,13 @@ struct ProviderMap { impl ProviderMap { /// Get candidates to download this hash. - pub fn get_candidates(&self, hash: &Hash) -> impl Iterator { + pub fn get_candidates<'a>(&'a self, hash: &Hash) -> impl Iterator + 'a { self.hash_node .get(hash) .map(|nodes| nodes.iter()) .into_iter() .flatten() + .copied() } /// Whether we have any candidates to download this hash. @@ -1420,7 +1421,7 @@ impl Dialer for iroh_net::dialer::Dialer { self.pending_count() } - fn is_pending(&self, node: &NodeId) -> bool { + fn is_pending(&self, node: NodeId) -> bool { self.is_pending(node) } } diff --git a/iroh-blobs/src/downloader/test/dialer.rs b/iroh-blobs/src/downloader/test/dialer.rs index 4d087145fb..d099552a11 100644 --- a/iroh-blobs/src/downloader/test/dialer.rs +++ b/iroh-blobs/src/downloader/test/dialer.rs @@ -52,8 +52,8 @@ impl Dialer for TestingDialer { self.0.read().dialing.len() } - fn is_pending(&self, node: &NodeId) -> bool { - self.0.read().dialing.contains(node) + fn is_pending(&self, node: NodeId) -> bool { + self.0.read().dialing.contains(&node) } } diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 13d5940703..290866fef3 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -436,7 +436,7 @@ impl Actor { match msg { ToActor::ConnIncoming(peer_id, origin, conn) => { self.conns.insert(peer_id, conn.clone()); - self.dialer.abort_dial(&peer_id); + self.dialer.abort_dial(peer_id); let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP); self.conn_send_tx.insert(peer_id, send_tx.clone()); @@ -573,7 +573,7 @@ impl Actor { } self.conn_send_tx.remove(&peer); self.pending_sends.remove(&peer); - self.dialer.abort_dial(&peer); + self.dialer.abort_dial(peer); } OutEvent::PeerData(node_id, data) => match decode_peer_data(&data) { Err(err) => warn!("Failed to decode {data:?} from {node_id}: {err}"), diff --git a/iroh-net/src/dialer.rs b/iroh-net/src/dialer.rs index 0dc21179d2..7a7685d97b 100644 --- a/iroh-net/src/dialer.rs +++ b/iroh-net/src/dialer.rs @@ -1,23 +1,26 @@ -//! A dialer to dial nodes +//! A dialer to conveniently dial many nodes. use std::{collections::HashMap, pin::Pin, task::Poll}; -use crate::{key::PublicKey, Endpoint, NodeAddr, NodeId}; use anyhow::anyhow; -use futures_lite::future::Boxed as BoxFuture; +use futures_lite::Stream; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::error; -/// Dial nodes and maintain a queue of pending dials +use crate::{Endpoint, NodeId}; + +/// Dials nodes and maintains a queue of pending dials. +/// +/// The [`Dialer`] wraps an [`Endpoint`], connects to nodes through the endpoint, stores the +/// pending connect futures and emits finished connect results. /// -/// This wraps a [`Endpoint`], connects to nodes through the endpoint, stores -/// the pending connect futures and emits finished connect results. +/// The [`Dialer`] also implements [`Stream`] to retrieve the dialled connections. #[derive(Debug)] pub struct Dialer { endpoint: Endpoint, - pending: JoinSet<(PublicKey, anyhow::Result)>, - pending_dials: HashMap, + pending: JoinSet<(NodeId, anyhow::Result)>, + pending_dials: HashMap, } impl Dialer { @@ -30,12 +33,15 @@ impl Dialer { } } - /// Start to dial a node. + /// Starts to dial a node by [`NodeId`]. /// - /// Note that the node's addresses and/or relay url must be added to the endpoint's - /// addressbook for a dial to succeed, see [`Endpoint::add_node_addr`]. + /// Since this dials by [`NodeId`] the [`Endpoint`] must know how to contact the node by + /// [`NodeId`] only. This relies on addressing information being provided by either the + /// [discovery service] or manually by calling [`Endpoint::add_node_addr`]. + /// + /// [discovery service]: crate::discovery::Discovery pub fn queue_dial(&mut self, node_id: NodeId, alpn: &'static [u8]) { - if self.is_pending(&node_id) { + if self.is_pending(node_id) { return; } let cancel = CancellationToken::new(); @@ -45,26 +51,26 @@ impl Dialer { let res = tokio::select! { biased; _ = cancel.cancelled() => Err(anyhow!("Cancelled")), - res = endpoint.connect(NodeAddr::new(node_id), alpn) => res + res = endpoint.connect_by_node_id(node_id, alpn) => res }; (node_id, res) }); } - /// Abort a pending dial - pub fn abort_dial(&mut self, node_id: &NodeId) { - if let Some(cancel) = self.pending_dials.remove(node_id) { + /// Aborts a pending dial. + pub fn abort_dial(&mut self, node_id: NodeId) { + if let Some(cancel) = self.pending_dials.remove(&node_id) { cancel.cancel(); } } - /// Check if a node is currently being dialed - pub fn is_pending(&self, node: &NodeId) -> bool { - self.pending_dials.contains_key(node) + /// Checks if a node is currently being dialed. + pub fn is_pending(&self, node: NodeId) -> bool { + self.pending_dials.contains_key(&node) } - /// Wait for the next dial operation to complete - pub async fn next_conn(&mut self) -> (PublicKey, anyhow::Result) { + /// Waits for the next dial operation to complete. + pub async fn next_conn(&mut self) -> (NodeId, anyhow::Result) { match self.pending_dials.is_empty() { false => { let (node_id, res) = loop { @@ -95,8 +101,8 @@ impl Dialer { } } -impl futures_lite::Stream for Dialer { - type Item = (PublicKey, anyhow::Result); +impl Stream for Dialer { + type Item = (NodeId, anyhow::Result); fn poll_next( mut self: Pin<&mut Self>, @@ -115,6 +121,3 @@ impl futures_lite::Stream for Dialer { } } } - -/// Future for a pending dial operation -pub type DialFuture = BoxFuture<(PublicKey, anyhow::Result)>; diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index 076ffa2323..997a184bf1 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -1,4 +1,43 @@ -//! Trait and utils for the node discovery mechanism. +//! Node address discovery. +//! +//! To connect to an iroh-net node a [`NodeAddr`] is needed, which needs to contain either a +//! [`RelayUrl`] or one or more *direct addresses*. However it is often more desirable to +//! be able to connect with only the [`NodeId`], as [`Endpoint::connect_by_node_id`] does. +//! +//! For connecting by [`NodeId`] to work however, the endpoint has to get the addressing +//! information by other means. This can be done by manually calling +//! [`Endpoint::add_node_addr`], but that still requires knowing the other addressing +//! information. +//! +//! Node discovery is an automated system for an [`Endpoint`] to retrieve this addressing +//! information. Each iroh-net node will automatically publish their own addressing +//! information. Usually this means publishing which [`RelayUrl`] to use for their +//! [`NodeId`], but they could also publish direct addresses. +//! +//! The [`Discovery`] trait is used to define node discovery. This allows multiple +//! implementations to co-exist because there are many possible ways to implement this. +//! Each [`Endpoint`] can use the discovery mechanisms most suitable to the application. +//! The [`Builder::discovery`] method is used to add a discovery mechanism to an +//! [`Endpoint`]. +//! +//! Some generally useful discovery implementations are provided: +//! +//! - The [`DnsDiscovery`] which supports publishing to a special DNS server and performs +//! lookups via the standard DNS systems. [Number 0] runs a public instance of this which +//! is globally available and a reliable default choice. +//! +//! - The [`PkarrResolver`] which can perform lookups from designated [pkarr relay servers] +//! using HTTP. +//! +//! To use multiple discovery systems simultaneously use [`ConcurrentDiscovery`] which will +//! perform lookups to all discovery systems at the same time. +//! +//! [`RelayUrl`]: crate::relay::RelayUrl +//! [`Builder::discovery`]: crate::endpoint::Builder::discovery +//! [`DnsDiscovery`]: dns::DnsDiscovery +//! [Number 0]: https://n0.computer +//! [`PkarrResolver`]: pkarr::PkarrResolver +//! [pkarr relay servers]: https://pkarr.org/#servers use std::time::Duration; @@ -21,25 +60,31 @@ const SOURCE_NAME: &str = "discovery"; /// Node discovery for [`super::Endpoint`]. /// -/// The purpose of this trait is to hook up a node discovery mechanism that -/// allows finding information such as the relay URL and direct addresses -/// of a node given its [`NodeId`]. +/// This trait defines publishing and resolving addressing information for a [`NodeId`]. +/// This enables connecting to other nodes with only knowing the [`NodeId`], by using this +/// [`Discovery`] system to look up the actual addressing information. It is common for +/// implementations to require each node to publish their own information before it can be +/// looked up by other nodes. +/// +/// The published addressing information can include both a [`RelayUrl`] and/or direct +/// addresses. /// /// To allow for discovery, the [`super::Endpoint`] will call `publish` whenever /// discovery information changes. If a discovery mechanism requires a periodic /// refresh, it should start its own task. +/// +/// [`RelayUrl`]: crate::relay::RelayUrl pub trait Discovery: std::fmt::Debug + Send + Sync { - /// Publish the given [`AddrInfo`] to the discovery mechanisms. + /// Publishes the given [`AddrInfo`] to the discovery mechanism. /// - /// This is fire and forget, since the magicsock can not wait for successful - /// publishing. If publishing is async, the implementation should start it's - /// own task. + /// This is fire and forget, since the [`Endpoint`] can not wait for successful + /// publishing. If publishing is async, the implementation should start it's own task. /// /// This will be called from a tokio task, so it is safe to spawn new tasks. /// These tasks will be run on the runtime of the [`super::Endpoint`]. fn publish(&self, _info: &AddrInfo) {} - /// Resolve the [`AddrInfo`] for the given [`NodeId`]. + /// Resolves the [`AddrInfo`] for the given [`NodeId`]. /// /// Once the returned [`BoxStream`] is dropped, the service should stop any pending /// work. @@ -77,17 +122,17 @@ pub struct ConcurrentDiscovery { } impl ConcurrentDiscovery { - /// Create a empty [`ConcurrentDiscovery`]. + /// Creates an empty [`ConcurrentDiscovery`]. pub fn empty() -> Self { Self::default() } - /// Create a new [`ConcurrentDiscovery`]. + /// Creates a new [`ConcurrentDiscovery`]. pub fn from_services(services: Vec>) -> Self { Self { services } } - /// Add a [`Discovery`] service. + /// Adds a [`Discovery`] service. pub fn add(&mut self, service: impl Discovery + 'static) { self.services.push(Box::new(service)); } @@ -136,8 +181,8 @@ pub(super) struct DiscoveryTask { } impl DiscoveryTask { - /// Start a discovery task. - pub fn start(ep: Endpoint, node_id: NodeId) -> Result { + /// Starts a discovery task. + pub(super) fn start(ep: Endpoint, node_id: NodeId) -> Result { ensure!(ep.discovery().is_some(), "No discovery services configured"); let (on_first_tx, on_first_rx) = oneshot::channel(); let me = ep.node_id(); @@ -149,7 +194,7 @@ impl DiscoveryTask { Ok(Self { task, on_first_rx }) } - /// Start a discovery task after a delay and only if no path to the node was recently active. + /// Starts a discovery task after a delay and only if no path to the node was recently active. /// /// This returns `None` if we received data or control messages from the remote endpoint /// recently enough. If not it returns a [`DiscoveryTask`]. @@ -157,7 +202,7 @@ impl DiscoveryTask { /// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again /// if we recently received messages from remote endpoint. If true, the task will abort. /// Otherwise, or if no `delay` is set, the discovery will be started. - pub fn maybe_start_after_delay( + pub(super) fn maybe_start_after_delay( ep: &Endpoint, node_id: NodeId, delay: Option, @@ -190,15 +235,15 @@ impl DiscoveryTask { Ok(Some(Self { task, on_first_rx })) } - /// Wait until the discovery task produced at least one result. - pub async fn first_arrived(&mut self) -> Result<()> { + /// Waits until the discovery task produced at least one result. + pub(super) async fn first_arrived(&mut self) -> Result<()> { let fut = &mut self.on_first_rx; fut.await??; Ok(()) } - /// Cancel the discovery task. - pub fn cancel(&self) { + /// Cancels the discovery task. + pub(super) fn cancel(&self) { self.task.abort(); } diff --git a/iroh-net/src/endpoint.rs b/iroh-net/src/endpoint.rs index 8cddbd2513..9f46390581 100644 --- a/iroh-net/src/endpoint.rs +++ b/iroh-net/src/endpoint.rs @@ -483,7 +483,7 @@ impl Endpoint { // Start connecting via quinn. This will time out after 10 seconds if no reachable address // is available. - let conn = self.connect_quinn(&node_id, alpn, addr).await; + let conn = self.connect_quinn(node_id, alpn, addr).await; // Cancel the node discovery task (if still running). if let Some(discovery) = discovery { @@ -501,16 +501,16 @@ impl Endpoint { /// uses the discovery service to establish a connection to a remote node. pub async fn connect_by_node_id( &self, - node_id: &NodeId, + node_id: NodeId, alpn: &[u8], ) -> Result { - let addr = NodeAddr::new(*node_id); + let addr = NodeAddr::new(node_id); self.connect(addr, alpn).await } async fn connect_quinn( &self, - node_id: &PublicKey, + node_id: NodeId, alpn: &[u8], addr: SocketAddr, ) -> Result { @@ -518,7 +518,7 @@ impl Endpoint { let alpn_protocols = vec![alpn.to_vec()]; let tls_client_config = tls::make_client_config( &self.static_config.secret_key, - Some(*node_id), + Some(node_id), alpn_protocols, self.static_config.keylog, )?; @@ -538,8 +538,8 @@ impl Endpoint { let rtt_msg = RttMessage::NewConnection { connection: connection.weak_handle(), - conn_type_changes: self.conn_type_stream(*node_id)?, - node_id: *node_id, + conn_type_changes: self.conn_type_stream(node_id)?, + node_id, }; if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await { // If this actor is dead, that's not great but we can still function. diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index 40d4a533c1..02ad0ca0e1 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -190,7 +190,7 @@ impl BlobSearch { // Establish a connection to our node. // We use the default node discovery in iroh, so we can connect by node id without // providing further information. - let conn = self.endpoint.connect_by_node_id(&node_id, ALPN).await?; + let conn = self.endpoint.connect_by_node_id(node_id, ALPN).await?; // Open a bi-directional in our connection. let (mut send, mut recv) = conn.open_bi().await?;