Skip to content

Commit

Permalink
docs(iroh-net): Update discovery and dialing docs, signatures (#2472)
Browse files Browse the repository at this point in the history
## Description

This updates the documentationfor the dialing and discovery modules.
It also fixes up some signatures to be more consistent with the rest
of iroh-net:

- NodeId is used to identify an iroh-net node that is communicated
  with.  PublicKey is only used in contexts where
  encryption/decryption happens

- NodeId is Copy and we pass it by value normally.  Not by reference.

Some more changes:

- DialFuture is entirely unused, probably left over from a refactor.
  Removed.

- Use pub(super) explicitly, helps clarity of intention as well as
  dead-code detection.

## Breaking Changes

The following APIs now take NodeId by value rather than by reference:
- iroh_blobs::downloader::Dialer::is_pending
- iroh_net::dialer::Dialer::abort_dial
- iroh_net::dialer::Dialer::is_pending
- iroh_net::Endpoint::connect_by_node_id

Other changes:
- iroh_blobs::downloader::ProviderMap::get_candidates yields NodeId
rather than &NodeId

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- ~~[ ] Tests if relevant.~~
- [x] All breaking changes documented.

---------

Co-authored-by: Philipp Krüger <[email protected]>
Co-authored-by: Friedel Ziegelmayer <[email protected]>
  • Loading branch information
3 people authored Jul 8, 2024
1 parent 3866b6f commit e53714c
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 67 deletions.
19 changes: 10 additions & 9 deletions iroh-blobs/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub trait Dialer: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Un
/// Get the number of dialing nodes.
fn pending_count(&self) -> usize;
/// Check if a node is being dialed.
fn is_pending(&self, node: &NodeId) -> bool;
fn is_pending(&self, node: NodeId) -> bool;
}

/// Signals what should be done with the request when it fails.
Expand Down Expand Up @@ -996,7 +996,7 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
} else {
best_connected = Some(match best_connected.take() {
Some(old) if old.1 <= active_requests => old,
_ => (*node, active_requests),
_ => (node, active_requests),
});
}
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {

// All slots are free: We can dial our candidate.
if !at_connections_capacity && !at_dial_capacity {
NextStep::Dial(*node)
NextStep::Dial(node)
}
// The hash has free dial capacity, but the global connection capacity is reached.
// But if we have idle nodes, we will disconnect the longest idling node, and then dial our
Expand All @@ -1046,7 +1046,7 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
&& !self.goodbye_nodes_queue.is_empty()
{
let key = self.goodbye_nodes_queue.peek().expect("just checked");
NextStep::DialQueuedDisconnect(*node, key)
NextStep::DialQueuedDisconnect(node, key)
}
// No dial capacity, and no idling nodes: We have to wait until capacity is freed up.
else {
Expand Down Expand Up @@ -1147,13 +1147,13 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
}
}

fn node_state<'a>(&'a self, node: &NodeId) -> NodeState<'a, D::Connection> {
if let Some(info) = self.connected_nodes.get(node) {
fn node_state(&self, node: NodeId) -> NodeState<'_, D::Connection> {
if let Some(info) = self.connected_nodes.get(&node) {
NodeState::Connected(info)
} else if self.dialer.is_pending(node) {
NodeState::Dialing
} else {
match self.retry_node_state.get(node) {
match self.retry_node_state.get(&node) {
Some(state) if state.retry_is_queued => NodeState::WaitForRetry,
_ => NodeState::Disconnected,
}
Expand Down Expand Up @@ -1221,12 +1221,13 @@ struct ProviderMap {

impl ProviderMap {
/// Get candidates to download this hash.
pub fn get_candidates(&self, hash: &Hash) -> impl Iterator<Item = &NodeId> {
pub fn get_candidates<'a>(&'a self, hash: &Hash) -> impl Iterator<Item = NodeId> + 'a {
self.hash_node
.get(hash)
.map(|nodes| nodes.iter())
.into_iter()
.flatten()
.copied()
}

/// Whether we have any candidates to download this hash.
Expand Down Expand Up @@ -1420,7 +1421,7 @@ impl Dialer for iroh_net::dialer::Dialer {
self.pending_count()
}

fn is_pending(&self, node: &NodeId) -> bool {
fn is_pending(&self, node: NodeId) -> bool {
self.is_pending(node)
}
}
4 changes: 2 additions & 2 deletions iroh-blobs/src/downloader/test/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl Dialer for TestingDialer {
self.0.read().dialing.len()
}

fn is_pending(&self, node: &NodeId) -> bool {
self.0.read().dialing.contains(node)
fn is_pending(&self, node: NodeId) -> bool {
self.0.read().dialing.contains(&node)
}
}

Expand Down
4 changes: 2 additions & 2 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl Actor {
match msg {
ToActor::ConnIncoming(peer_id, origin, conn) => {
self.conns.insert(peer_id, conn.clone());
self.dialer.abort_dial(&peer_id);
self.dialer.abort_dial(peer_id);
let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
self.conn_send_tx.insert(peer_id, send_tx.clone());

Expand Down Expand Up @@ -573,7 +573,7 @@ impl Actor {
}
self.conn_send_tx.remove(&peer);
self.pending_sends.remove(&peer);
self.dialer.abort_dial(&peer);
self.dialer.abort_dial(peer);
}
OutEvent::PeerData(node_id, data) => match decode_peer_data(&data) {
Err(err) => warn!("Failed to decode {data:?} from {node_id}: {err}"),
Expand Down
55 changes: 29 additions & 26 deletions iroh-net/src/dialer.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
//! A dialer to dial nodes
//! A dialer to conveniently dial many nodes.
use std::{collections::HashMap, pin::Pin, task::Poll};

use crate::{key::PublicKey, Endpoint, NodeAddr, NodeId};
use anyhow::anyhow;
use futures_lite::future::Boxed as BoxFuture;
use futures_lite::Stream;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::error;

/// Dial nodes and maintain a queue of pending dials
use crate::{Endpoint, NodeId};

/// Dials nodes and maintains a queue of pending dials.
///
/// The [`Dialer`] wraps an [`Endpoint`], connects to nodes through the endpoint, stores the
/// pending connect futures and emits finished connect results.
///
/// This wraps a [`Endpoint`], connects to nodes through the endpoint, stores
/// the pending connect futures and emits finished connect results.
/// The [`Dialer`] also implements [`Stream`] to retrieve the dialled connections.
#[derive(Debug)]
pub struct Dialer {
endpoint: Endpoint,
pending: JoinSet<(PublicKey, anyhow::Result<quinn::Connection>)>,
pending_dials: HashMap<PublicKey, CancellationToken>,
pending: JoinSet<(NodeId, anyhow::Result<quinn::Connection>)>,
pending_dials: HashMap<NodeId, CancellationToken>,
}

impl Dialer {
Expand All @@ -30,12 +33,15 @@ impl Dialer {
}
}

/// Start to dial a node.
/// Starts to dial a node by [`NodeId`].
///
/// Note that the node's addresses and/or relay url must be added to the endpoint's
/// addressbook for a dial to succeed, see [`Endpoint::add_node_addr`].
/// Since this dials by [`NodeId`] the [`Endpoint`] must know how to contact the node by
/// [`NodeId`] only. This relies on addressing information being provided by either the
/// [discovery service] or manually by calling [`Endpoint::add_node_addr`].
///
/// [discovery service]: crate::discovery::Discovery
pub fn queue_dial(&mut self, node_id: NodeId, alpn: &'static [u8]) {
if self.is_pending(&node_id) {
if self.is_pending(node_id) {
return;
}
let cancel = CancellationToken::new();
Expand All @@ -45,26 +51,26 @@ impl Dialer {
let res = tokio::select! {
biased;
_ = cancel.cancelled() => Err(anyhow!("Cancelled")),
res = endpoint.connect(NodeAddr::new(node_id), alpn) => res
res = endpoint.connect_by_node_id(node_id, alpn) => res
};
(node_id, res)
});
}

/// Abort a pending dial
pub fn abort_dial(&mut self, node_id: &NodeId) {
if let Some(cancel) = self.pending_dials.remove(node_id) {
/// Aborts a pending dial.
pub fn abort_dial(&mut self, node_id: NodeId) {
if let Some(cancel) = self.pending_dials.remove(&node_id) {
cancel.cancel();
}
}

/// Check if a node is currently being dialed
pub fn is_pending(&self, node: &NodeId) -> bool {
self.pending_dials.contains_key(node)
/// Checks if a node is currently being dialed.
pub fn is_pending(&self, node: NodeId) -> bool {
self.pending_dials.contains_key(&node)
}

/// Wait for the next dial operation to complete
pub async fn next_conn(&mut self) -> (PublicKey, anyhow::Result<quinn::Connection>) {
/// Waits for the next dial operation to complete.
pub async fn next_conn(&mut self) -> (NodeId, anyhow::Result<quinn::Connection>) {
match self.pending_dials.is_empty() {
false => {
let (node_id, res) = loop {
Expand Down Expand Up @@ -95,8 +101,8 @@ impl Dialer {
}
}

impl futures_lite::Stream for Dialer {
type Item = (PublicKey, anyhow::Result<quinn::Connection>);
impl Stream for Dialer {
type Item = (NodeId, anyhow::Result<quinn::Connection>);

fn poll_next(
mut self: Pin<&mut Self>,
Expand All @@ -115,6 +121,3 @@ impl futures_lite::Stream for Dialer {
}
}
}

/// Future for a pending dial operation
pub type DialFuture = BoxFuture<(PublicKey, anyhow::Result<quinn::Connection>)>;
85 changes: 65 additions & 20 deletions iroh-net/src/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,43 @@
//! Trait and utils for the node discovery mechanism.
//! Node address discovery.
//!
//! To connect to an iroh-net node a [`NodeAddr`] is needed, which needs to contain either a
//! [`RelayUrl`] or one or more *direct addresses*. However it is often more desirable to
//! be able to connect with only the [`NodeId`], as [`Endpoint::connect_by_node_id`] does.
//!
//! For connecting by [`NodeId`] to work however, the endpoint has to get the addressing
//! information by other means. This can be done by manually calling
//! [`Endpoint::add_node_addr`], but that still requires knowing the other addressing
//! information.
//!
//! Node discovery is an automated system for an [`Endpoint`] to retrieve this addressing
//! information. Each iroh-net node will automatically publish their own addressing
//! information. Usually this means publishing which [`RelayUrl`] to use for their
//! [`NodeId`], but they could also publish direct addresses.
//!
//! The [`Discovery`] trait is used to define node discovery. This allows multiple
//! implementations to co-exist because there are many possible ways to implement this.
//! Each [`Endpoint`] can use the discovery mechanisms most suitable to the application.
//! The [`Builder::discovery`] method is used to add a discovery mechanism to an
//! [`Endpoint`].
//!
//! Some generally useful discovery implementations are provided:
//!
//! - The [`DnsDiscovery`] which supports publishing to a special DNS server and performs
//! lookups via the standard DNS systems. [Number 0] runs a public instance of this which
//! is globally available and a reliable default choice.
//!
//! - The [`PkarrResolver`] which can perform lookups from designated [pkarr relay servers]
//! using HTTP.
//!
//! To use multiple discovery systems simultaneously use [`ConcurrentDiscovery`] which will
//! perform lookups to all discovery systems at the same time.
//!
//! [`RelayUrl`]: crate::relay::RelayUrl
//! [`Builder::discovery`]: crate::endpoint::Builder::discovery
//! [`DnsDiscovery`]: dns::DnsDiscovery
//! [Number 0]: https://n0.computer
//! [`PkarrResolver`]: pkarr::PkarrResolver
//! [pkarr relay servers]: https://pkarr.org/#servers
use std::time::Duration;

Expand All @@ -21,25 +60,31 @@ const SOURCE_NAME: &str = "discovery";

/// Node discovery for [`super::Endpoint`].
///
/// The purpose of this trait is to hook up a node discovery mechanism that
/// allows finding information such as the relay URL and direct addresses
/// of a node given its [`NodeId`].
/// This trait defines publishing and resolving addressing information for a [`NodeId`].
/// This enables connecting to other nodes with only knowing the [`NodeId`], by using this
/// [`Discovery`] system to look up the actual addressing information. It is common for
/// implementations to require each node to publish their own information before it can be
/// looked up by other nodes.
///
/// The published addressing information can include both a [`RelayUrl`] and/or direct
/// addresses.
///
/// To allow for discovery, the [`super::Endpoint`] will call `publish` whenever
/// discovery information changes. If a discovery mechanism requires a periodic
/// refresh, it should start its own task.
///
/// [`RelayUrl`]: crate::relay::RelayUrl
pub trait Discovery: std::fmt::Debug + Send + Sync {
/// Publish the given [`AddrInfo`] to the discovery mechanisms.
/// Publishes the given [`AddrInfo`] to the discovery mechanism.
///
/// This is fire and forget, since the magicsock can not wait for successful
/// publishing. If publishing is async, the implementation should start it's
/// own task.
/// This is fire and forget, since the [`Endpoint`] can not wait for successful
/// publishing. If publishing is async, the implementation should start it's own task.
///
/// This will be called from a tokio task, so it is safe to spawn new tasks.
/// These tasks will be run on the runtime of the [`super::Endpoint`].
fn publish(&self, _info: &AddrInfo) {}

/// Resolve the [`AddrInfo`] for the given [`NodeId`].
/// Resolves the [`AddrInfo`] for the given [`NodeId`].
///
/// Once the returned [`BoxStream`] is dropped, the service should stop any pending
/// work.
Expand Down Expand Up @@ -77,17 +122,17 @@ pub struct ConcurrentDiscovery {
}

impl ConcurrentDiscovery {
/// Create a empty [`ConcurrentDiscovery`].
/// Creates an empty [`ConcurrentDiscovery`].
pub fn empty() -> Self {
Self::default()
}

/// Create a new [`ConcurrentDiscovery`].
/// Creates a new [`ConcurrentDiscovery`].
pub fn from_services(services: Vec<Box<dyn Discovery>>) -> Self {
Self { services }
}

/// Add a [`Discovery`] service.
/// Adds a [`Discovery`] service.
pub fn add(&mut self, service: impl Discovery + 'static) {
self.services.push(Box::new(service));
}
Expand Down Expand Up @@ -136,8 +181,8 @@ pub(super) struct DiscoveryTask {
}

impl DiscoveryTask {
/// Start a discovery task.
pub fn start(ep: Endpoint, node_id: NodeId) -> Result<Self> {
/// Starts a discovery task.
pub(super) fn start(ep: Endpoint, node_id: NodeId) -> Result<Self> {
ensure!(ep.discovery().is_some(), "No discovery services configured");
let (on_first_tx, on_first_rx) = oneshot::channel();
let me = ep.node_id();
Expand All @@ -149,15 +194,15 @@ impl DiscoveryTask {
Ok(Self { task, on_first_rx })
}

/// Start a discovery task after a delay and only if no path to the node was recently active.
/// Starts a discovery task after a delay and only if no path to the node was recently active.
///
/// This returns `None` if we received data or control messages from the remote endpoint
/// recently enough. If not it returns a [`DiscoveryTask`].
///
/// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again
/// if we recently received messages from remote endpoint. If true, the task will abort.
/// Otherwise, or if no `delay` is set, the discovery will be started.
pub fn maybe_start_after_delay(
pub(super) fn maybe_start_after_delay(
ep: &Endpoint,
node_id: NodeId,
delay: Option<Duration>,
Expand Down Expand Up @@ -190,15 +235,15 @@ impl DiscoveryTask {
Ok(Some(Self { task, on_first_rx }))
}

/// Wait until the discovery task produced at least one result.
pub async fn first_arrived(&mut self) -> Result<()> {
/// Waits until the discovery task produced at least one result.
pub(super) async fn first_arrived(&mut self) -> Result<()> {
let fut = &mut self.on_first_rx;
fut.await??;
Ok(())
}

/// Cancel the discovery task.
pub fn cancel(&self) {
/// Cancels the discovery task.
pub(super) fn cancel(&self) {
self.task.abort();
}

Expand Down
Loading

0 comments on commit e53714c

Please sign in to comment.