diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index 6b06799f4c..ffce2ce6f4 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -1,12 +1,12 @@ //! Trait and utils for the node discovery mechanism. -use std::time::Duration; +use std::{collections::BTreeMap, time::Duration}; use anyhow::{anyhow, ensure, Result}; use futures_lite::stream::{Boxed as BoxStream, StreamExt}; use iroh_base::node_addr::NodeAddr; use tokio::{sync::oneshot, task::JoinHandle}; -use tracing::{debug, error_span, warn, Instrument}; +use tracing::{debug, error_span, trace, warn, Instrument}; use crate::{AddrInfo, MagicEndpoint, NodeId}; @@ -123,26 +123,13 @@ impl Discovery for ConcurrentDiscovery { const MAX_AGE: Duration = Duration::from_secs(10); /// A wrapper around a tokio task which runs a node discovery. +#[derive(derive_more::Debug)] pub(super) struct DiscoveryTask { - on_first_rx: oneshot::Receiver>, task: JoinHandle<()>, } impl DiscoveryTask { - /// Start a discovery task. - pub fn start(ep: MagicEndpoint, node_id: NodeId) -> Result { - ensure!(ep.discovery().is_some(), "No discovery services configured"); - let (on_first_tx, on_first_rx) = oneshot::channel(); - let me = ep.node_id(); - let task = tokio::task::spawn( - async move { Self::run(ep, node_id, on_first_tx).await }.instrument( - error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()), - ), - ); - Ok(Self { task, on_first_rx }) - } - - /// Start a discovery task after a delay and only if no path to the node was recently active. + /// Start a discovery task after a delay /// /// This returns `None` if we received data or control messages from the remote endpoint /// recently enough. If not it returns a [`DiscoveryTask`]. @@ -150,17 +137,12 @@ impl DiscoveryTask { /// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again /// if we recently received messages from remote endpoint. If true, the task will abort. /// Otherwise, or if no `delay` is set, the discovery will be started. - pub fn maybe_start_after_delay( + pub fn start_after_delay( ep: &MagicEndpoint, node_id: NodeId, delay: Option, - ) -> Result> { - // If discovery is not needed, don't even spawn a task. - if !Self::needs_discovery(ep, node_id) { - return Ok(None); - } - ensure!(ep.discovery().is_some(), "No discovery services configured"); - let (on_first_tx, on_first_rx) = oneshot::channel(); + on_first_tx: Option>>, + ) -> Option { let ep = ep.clone(); let me = ep.node_id(); let task = tokio::task::spawn( @@ -170,7 +152,7 @@ impl DiscoveryTask { tokio::time::sleep(delay).await; if !Self::needs_discovery(&ep, node_id) { debug!("no discovery needed, abort"); - on_first_tx.send(Ok(())).ok(); + on_first_tx.map(|tx| tx.send(Ok(())).ok()); return; } } @@ -180,14 +162,7 @@ impl DiscoveryTask { error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()), ), ); - Ok(Some(Self { task, on_first_rx })) - } - - /// Wait until the discovery task produced at least one result. - pub async fn first_arrived(&mut self) -> Result<()> { - let fut = &mut self.on_first_rx; - fut.await??; - Ok(()) + Some(Self { task }) } /// Cancel the discovery task. @@ -229,15 +204,18 @@ impl DiscoveryTask { } } - async fn run(ep: MagicEndpoint, node_id: NodeId, on_first_tx: oneshot::Sender>) { + async fn run( + ep: MagicEndpoint, + node_id: NodeId, + mut on_first_tx: Option>>, + ) { let mut stream = match Self::create_stream(&ep, node_id) { Ok(stream) => stream, Err(err) => { - on_first_tx.send(Err(err)).ok(); + on_first_tx.map(|s| s.send(Err(err)).ok()); return; } }; - let mut on_first_tx = Some(on_first_tx); debug!("discovery: start"); loop { let next = tokio::select! { @@ -280,6 +258,128 @@ impl Drop for DiscoveryTask { } } +use flume::{Receiver, Sender}; +use std::sync::Arc; + +/// Responsible for starting and cancelling Discovery requests from +/// the magicsock. +#[derive(derive_more::Debug, Clone)] +pub(super) struct DiscoveryTasks { + handle: Arc>, + sender: Sender, +} + +impl Drop for DiscoveryTasks { + fn drop(&mut self) { + self.handle.abort(); + } +} + +pub(super) type DiscoveryTasksChans = + (Sender, Receiver); + +impl DiscoveryTasks { + /// Create a new `DiscoveryTasks` worker. + /// + /// There should only ever be one `DiscoveryTask` running for each attempted connection, + pub(crate) fn new(ep: MagicEndpoint, chans: DiscoveryTasksChans) -> Result { + ensure!( + ep.discovery().is_some(), + "No discovery enabled, cannot start discovery tasks" + ); + let (sender, recv) = chans; + let handle = tokio::spawn(async move { + let mut tasks = BTreeMap::default(); + loop { + let msg = tokio::select! { + _ = ep.cancelled() => break, + msg = recv.recv_async() => { + match msg { + Err(e) => { + debug!("{e:?}"); + break; + }, + Ok(msg) => msg, + } + } + }; + match msg { + DiscoveryTaskMessage::Start{node_id, delay, on_first_tx} => { + if !DiscoveryTask::needs_discovery(&ep, node_id) { + trace!("Discovery for {node_id} requested, but the node does not need discovery."); + continue; + } + if let Some(new_task) = DiscoveryTask::start_after_delay(&ep, node_id, delay, on_first_tx) { + if let Some(old_task) = tasks.insert(node_id, new_task) { + old_task.cancel(); + } + } + } + DiscoveryTaskMessage::Cancel(node_id) => { + match tasks.remove(&node_id) { + None => trace!("Cancelled Discovery for {node_id}, but no Discovery for that id is currently running."), + Some(task) => task.cancel() + } + } + } + } + }); + Ok(DiscoveryTasks { + handle: Arc::new(handle), + sender, + }) + } + + /// Cancel a [`DiscoveryTask`] + /// + /// If the receiver is full, it drops the request. There will only ever be + /// one [`DiscoveryTask`] per node dialed, so if this happens, there is + /// something very wrong. + pub fn cancel(&self, node_id: NodeId) { + self.sender.send(DiscoveryTaskMessage::Cancel(node_id)).ok(); + } + + /// Start a [`DiscoveryTask`], if necessary. + /// + /// You can start the task on a delay by providing an optional [`Duration`]. + /// + /// If the receiver is full, it drops the request. There will only ever be + /// one [`DiscoveryTask`] per node dialed, so if this happens, there is + /// something very wrong. + pub fn start( + &self, + node_id: NodeId, + delay: Option, + on_first_tx: Option>>, + ) { + self.sender + .send(DiscoveryTaskMessage::Start { + node_id, + delay, + on_first_tx, + }) + .ok(); + } +} + +/// Messages used by the [`DiscoveryTasks`] struct to manage [`DiscoveryTask`]s. +#[derive(Debug)] +pub(super) enum DiscoveryTaskMessage { + /// Launch discovery for the given [`NodeId`] + Start { + /// The node ID for the node we are trying to discover + node_id: NodeId, + /// When `None`, start discovery immediately + /// When `Some`, start discovery after a delay. + delay: Option, + /// If it exists, send the first time a discovery message returns, + /// or send an error if the discovery was unable to occur. + on_first_tx: Option>>, + }, + /// Cancel any discovery for the given [`NodeId`] + Cancel(NodeId), +} + #[cfg(test)] mod tests { use std::{ diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index d0ff7f7d1c..738421a796 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -8,13 +8,14 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; use derive_more::Debug; use futures_lite::StreamExt; use quinn_proto::VarInt; +use tokio::sync::oneshot; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use tracing::{debug, trace}; use crate::{ config, defaults::default_relay_map, - discovery::{Discovery, DiscoveryTask}, + discovery::{Discovery, DiscoveryTasks, DiscoveryTasksChans}, dns::{default_resolver, DnsResolver}, key::{PublicKey, SecretKey}, magicsock::{self, Handle}, @@ -204,17 +205,31 @@ impl MagicEndpointBuilder { .dns_resolver .unwrap_or_else(|| default_resolver().clone()); + // Discovery should not happen that often, and only happens for + // nodes we are trying to connect to or already connected to. + // TODO: possibly make this configurable? It should only be an issue + // if you are connected to many connections and all of them suddenly need + // discovery at the same time. + let discovery_tasks_chans = self.discovery.as_ref().map(|_| flume::bounded(64)); + let msock_opts = magicsock::Options { port: bind_port, secret_key, relay_map, nodes_path: self.peers_path, discovery: self.discovery, + discovery_tasks_sender: discovery_tasks_chans.as_ref().map(|(s, _)| s.clone()), dns_resolver, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, }; - MagicEndpoint::bind(Some(server_config), msock_opts, self.keylog).await + MagicEndpoint::bind( + Some(server_config), + msock_opts, + self.keylog, + discovery_tasks_chans, + ) + .await } } @@ -248,6 +263,7 @@ pub struct MagicEndpoint { endpoint: quinn::Endpoint, keylog: bool, cancel_token: CancellationToken, + discovery_tasks: Option, } impl MagicEndpoint { @@ -264,6 +280,7 @@ impl MagicEndpoint { server_config: Option, msock_opts: magicsock::Options, keylog: bool, + discovery_tasks: Option, ) -> Result { let secret_key = msock_opts.secret_key.clone(); let msock = magicsock::MagicSock::spawn(msock_opts).await?; @@ -285,13 +302,18 @@ impl MagicEndpoint { )?; trace!("created quinn endpoint"); - Ok(Self { + let mut ep = Self { secret_key: Arc::new(secret_key), msock, endpoint, keylog, cancel_token: CancellationToken::new(), - }) + discovery_tasks: None, + }; + let discovery_tasks = + discovery_tasks.map(|chans| DiscoveryTasks::new(ep.clone(), chans).expect("checked")); + ep.discovery_tasks = discovery_tasks; + Ok(ep) } /// Accept an incoming connection on the socket. @@ -459,7 +481,7 @@ impl MagicEndpoint { // Get the mapped IPv6 address from the magic socket. Quinn will connect to this address. // Start discovery for this node if it's enabled and we have no valid or verified // address information for this node. - let (addr, discovery) = self + let addr = self .get_mapping_addr_and_maybe_start_discovery(node_addr) .await?; @@ -473,8 +495,8 @@ impl MagicEndpoint { let conn = self.connect_quinn(&node_id, alpn, addr).await; // Cancel the node discovery task (if still running). - if let Some(discovery) = discovery { - discovery.cancel(); + if let Some(ref discovery_tasks) = self.discovery_tasks { + discovery_tasks.cancel(node_id); } conn @@ -525,7 +547,7 @@ impl MagicEndpoint { async fn get_mapping_addr_and_maybe_start_discovery( &self, node_addr: NodeAddr, - ) -> Result<(SocketAddr, Option)> { + ) -> Result { let node_id = node_addr.node_id; // Only return a mapped addr if we have some way of dialing this node, in other @@ -545,10 +567,10 @@ impl MagicEndpoint { // followed by a recheck before starting the discovery, to give the magicsocket a // chance to test the newly provided addresses. let delay = (!node_addr.info.is_empty()).then_some(DISCOVERY_WAIT_PERIOD); - let discovery = DiscoveryTask::maybe_start_after_delay(self, node_id, delay) - .ok() - .flatten(); - Ok((addr, discovery)) + if let Some(ref discovery_tasks) = self.discovery_tasks { + discovery_tasks.start(node_id, delay, None); + } + Ok(addr) } None => { @@ -556,14 +578,16 @@ impl MagicEndpoint { // So, we start a discovery task and wait for the first result to arrive, and // only then continue, because otherwise we wouldn't have any // path to the remote endpoint. - let mut discovery = DiscoveryTask::start(self.clone(), node_id)?; - discovery.first_arrived().await?; - if self.msock.has_send_address(node_id) { - let addr = self.msock.get_mapping_addr(&node_id).expect("checked"); - Ok((addr, Some(discovery))) - } else { - bail!("Failed to retrieve the mapped address from the magic socket. Unable to dial node {node_id:?}"); + if let Some(ref discovery_tasks) = self.discovery_tasks { + let (first_arrived_tx, first_arrived_rx) = oneshot::channel(); + discovery_tasks.start(node_id, None, Some(first_arrived_tx)); + let _ = first_arrived_rx.await; + if self.msock.has_send_address(node_id) { + let addr = self.msock.get_mapping_addr(&node_id).expect("checked"); + return Ok(addr); + } } + bail!("Failed to retrieve the mapped address from the magic socket. Unable to dial node {node_id:?}"); } } } diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index cb62c8db53..a6db2c6955 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -51,7 +51,6 @@ use watchable::Watchable; use crate::{ config, disco::{self, SendAddr}, - discovery::Discovery, dns::DnsResolver, key::{PublicKey, SecretKey, SharedSecret}, magic_endpoint::NodeAddr, @@ -62,12 +61,14 @@ use crate::{ }; use self::{ + discovery::{Discovery, DiscoveryService, MAX_AGE}, metrics::Metrics as MagicsockMetrics, node_map::{NodeMap, PingAction, PingRole, SendPing}, relay_actor::{RelayActor, RelayActorMessage, RelayReadResult}, udp_conn::UdpConn, }; +mod discovery; mod metrics; mod node_map; mod relay_actor; @@ -109,7 +110,7 @@ pub(super) struct Options { pub nodes_path: Option, /// Optional node discovery mechanism. - pub discovery: Option>, + pub discovery: Option>, /// A DNS resolver to use for resolving relay URLs. /// @@ -217,7 +218,7 @@ pub(super) struct MagicSock { udp_disco_sender: mpsc::Sender<(SocketAddr, PublicKey, disco::Message)>, /// Optional discovery service - discovery: Option>, + discovery: Option, /// Our discovered endpoints endpoints: Watchable, @@ -352,8 +353,31 @@ impl MagicSock { } /// Reference to optional discovery service - pub fn discovery(&self) -> Option<&dyn Discovery> { - self.discovery.as_ref().map(Box::as_ref) + pub fn discovery(&self) -> Option<&DiscoveryService> { + self.discovery.as_ref() + } + + /// Check if an endpoint needs discovery. + /// + /// We need discovery if we have no paths to the node, or if the paths we do have + /// have timed out. + pub fn needs_discovery(&self, node_id: PublicKey) -> bool { + match self.connection_info(node_id) { + None => true, + + Some(info) => { + match (info.last_received(), info.last_alive_relay()) { + // No path to node -> start discovery. + (None, None) => true, + // If we haven't received on direct addresses or the relay for MAX_AGE, + // start discovery. + (Some(elapsed), Some(elapsed_relay)) => { + elapsed > MAX_AGE && elapsed_relay > MAX_AGE + } + (Some(elapsed), _) | (_, Some(elapsed)) => elapsed > MAX_AGE, + } + } + } } /// Call to notify the system of potential network changes. @@ -508,11 +532,17 @@ impl MagicSock { if udp_addr.is_none() && relay_url.is_none() { // Handle no addresses being available - warn!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::NotConnected, - "no UDP or relay address available for node", - ))); + // If discovery exists, start discovery + if let Some(ref discovery) = self.discovery { + debug!(node = %public_key.fmt_short(), "no UDP or relay addr, starting discovery"); + discovery.start(public_key); + } else { + warn!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); + } + + // TODO: should this be Poll::Pending or Poll::Ready(Ok(0))? + // we want the connection to timeout + return Poll::Pending; } if (udp_addr.is_none() || udp_pending) && (relay_url.is_none() || relay_pending) { @@ -525,14 +555,14 @@ impl MagicSock { } if !relay_sent && !udp_sent && !pings_sent { + // TODO: If some udp socket error exists, we should report it, right? + if let Some(udp_error) = udp_error { + return Poll::Ready(Err(udp_error)); + } warn!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); - let err = udp_error.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::NotConnected, - "no UDP or relay address available for node", - ) - }); - return Poll::Ready(Err(err)); + // TODO: should this be Poll::Pending or Poll::Ready(Ok(0))? + // we want the connection to timeout + return Poll::Pending; } trace!( @@ -1276,6 +1306,8 @@ impl Handle { insecure_skip_relay_cert_verify, } = opts; + let discovery = discovery.map(|d| DiscoveryService::new(d)); + let nodes_path = match nodes_path { Some(path) => { let path = path.canonicalize().unwrap_or(path); diff --git a/iroh-net/src/magicsock/discovery.rs b/iroh-net/src/magicsock/discovery.rs new file mode 100644 index 0000000000..2c9191fb27 --- /dev/null +++ b/iroh-net/src/magicsock/discovery.rs @@ -0,0 +1,344 @@ +//! Trait and utils for the node discovery mechanism. +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + +use anyhow::{anyhow, Result}; +use futures_lite::{stream::Boxed as BoxStream, StreamExt}; +use iroh_base::node_addr::NodeAddr; +use tokio::{sync::oneshot, task::JoinHandle}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error_span, trace, warn, Instrument}; + +use crate::{AddrInfo, NodeId}; + +/// Default amout of time we wait for discovery before closing the process. +pub(super) const DISCOVERY_TIMEOUT: Duration = Duration::from_secs(60); +/// Maximum duration since the last control or data message received from an endpoint to make us +/// start a discovery task. +pub(super) const MAX_AGE: Duration = Duration::from_secs(10); + +/// Node discovery for [`super::MagicSock`]. +/// +/// The purpose of this trait is to hook up a node discovery mechanism that +/// allows finding information such as the relay URL and direct addresses +/// of a node given its [`NodeId`]. +/// +/// To allow for discovery, the [`super::MagicSock`] will call `publish` whenever +/// discovery information changes. If a discovery mechanism requires a periodic +/// refresh, it should start its own task. +pub trait Discovery: std::fmt::Debug + Send + Sync { + /// Publish the given [`AddrInfo`] to the discovery mechanisms. + /// + /// This is fire and forget, since the magicsock can not wait for successful + /// publishing. If publishing is async, the implementation should start it's + /// own task. + /// + /// This will be called from a tokio task, so it is safe to spawn new tasks. + /// These tasks will be run on the runtime of the [`super::MagicSock`]. + fn publish(&self, _info: &AddrInfo) {} + + /// Resolve the [`AddrInfo`] for the given [`NodeId`]. + /// + /// Once the returned [`BoxStream`] is dropped, the service should stop any pending + /// work. + fn resolve(&self, _node_id: NodeId) -> Option>> { + None + } +} + +/// The results returned from [`Discovery::resolve`]. +#[derive(Debug, Clone)] +pub struct DiscoveryItem { + /// A static string to identify the discovery source. + /// + /// Should be uniform per discovery service. + pub provenance: &'static str, + /// Optional timestamp when this node address info was last updated. + /// + /// Must be microseconds since the unix epoch. + pub last_updated: Option, + /// The adress info for the node being resolved. + pub addr_info: AddrInfo, +} +/// A discovery service that combines multiple discovery sources. +/// +/// The discovery services will resolve concurrently. +#[derive(Debug, Default)] +pub struct ConcurrentDiscovery { + services: Vec>, +} + +impl ConcurrentDiscovery { + /// Create a empty [`ConcurrentDiscovery`]. + pub fn empty() -> Self { + Self::default() + } + + /// Create a new [`ConcurrentDiscovery`]. + pub fn from_services(services: Vec>) -> Self { + Self { services } + } + + /// Add a [`Discovery`] service. + pub fn add(&mut self, service: impl Discovery + 'static) { + self.services.push(Box::new(service)); + } +} + +impl From for ConcurrentDiscovery +where + T: IntoIterator>, +{ + fn from(iter: T) -> Self { + let services = iter.into_iter().collect::>(); + Self { services } + } +} + +impl Discovery for ConcurrentDiscovery { + fn publish(&self, info: &AddrInfo) { + for service in &self.services { + service.publish(info); + } + } + + fn resolve(&self, node_id: NodeId) -> Option>> { + let streams = self + .services + .iter() + .filter_map(|service| service.resolve(node_id)); + + let streams = futures_buffered::Merge::from_iter(streams); + Some(Box::pin(streams)) + } +} + +#[derive(Debug)] +pub(super) struct DiscoveryService { + handle: JoinHandle<()>, + sender: flume::Sender, + cancel: CancellationToken, +} + +impl DiscoveryService { + pub(super) fn new(discovery: Arc) -> Self { + let cancel = CancellationToken::new(); + let (sender, recv) = flume::bounded(64); + let handle = tokio::spawn(async move { + let mut tasks: BTreeMap = BTreeMap::default(); + loop { + let msg = tokio::select! { + _ = cancel.cancelled() => break, + msg = recv.recv_async() => { + match msg { + Err(e) => { + debug!("{e:?}"); + break; + }, + Ok(msg) => msg, + } + } + }; + match msg { + DiscoveryServiceMessage::Start{node_id, delay, on_first_tx} => { + if let Some(new_task) = DiscoveryTask::start_after_delay(discovery.clone(), node_id, delay, on_first_tx, cancel.clone()) { + if let Some(old_task) = tasks.insert(node_id, new_task) { + old_task.cancel(); + } + } + } + DiscoveryServiceMessage::Cancel(node_id) => { + match tasks.remove(&node_id) { + None => trace!("Cancelled Discovery for {node_id}, but no Discovery for that id is currently running."), + Some(task) => task.cancel() + } + } + DiscoveryServiceMessage::Publish(addr_info) => { + discovery.publish(&addr_info); + } + } + } + }); + Self { + handle, + sender, + cancel, + } + } + + pub(super) fn publish(&self, info: &AddrInfo) { + self.sender + .send(DiscoveryServiceMessage::Publish(*info)) + .ok(); + } + + pub(super) fn start(&self, node_id: NodeId) { + self.sender + .send(DiscoveryServiceMessage::Start { + node_id, + delay: None, + on_first_tx: None, + }) + .ok(); + } + + pub(super) fn start_with_delay(&self, node_id: NodeId, delay: Duration) { + self.sender + .send(DiscoveryServiceMessage::Start { + node_id, + delay: Some(delay), + on_first_tx: None, + }) + .ok(); + } + + pub(super) fn start_with_alert( + &self, + node_id: NodeId, + on_first_tx: oneshot::Sender>, + ) { + self.sender + .send(DiscoveryServiceMessage::Start { + node_id, + delay: None, + on_first_tx: Some(on_first_tx), + }) + .ok(); + } + + pub(super) fn cancel(&self, node_id: NodeId) { + self.sender + .send(DiscoveryServiceMessage::Cancel(node_id)) + .ok(); + } +} + +impl Drop for DiscoveryService { + fn drop(&mut self) { + self.cancel.cancel(); + self.handle.abort(); + } +} + +/// Messages used by the [`DiscoveryService`] struct to manage [`DiscoveryService`]s. +#[derive(Debug)] +pub(super) enum DiscoveryServiceMessage { + /// Launch discovery for the given [`NodeId`] + Start { + /// The node ID for the node we are trying to discover + node_id: NodeId, + /// When `None`, start discovery immediately + /// When `Some`, start discovery after a delay. + delay: Option, + /// If it exists, send the first address you receive, + /// or send an error if the discovery was unable to occur. + on_first_tx: Option>>, + }, + /// Cancel any discovery for the given [`NodeId`] + Cancel(NodeId), + /// Publish your address info + Publish(AddrInfo), +} + +/// A wrapper around a tokio task which runs a node discovery. +#[derive(derive_more::Debug)] +pub(super) struct DiscoveryTask { + task: JoinHandle<()>, +} + +impl DiscoveryTask { + /// Start a discovery task after a delay + /// + /// This returns `None` if we received data or control messages from the remote endpoint + /// recently enough. If not it returns a [`DiscoveryTask`]. + /// + /// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again + /// if we recently received messages from remote endpoint. If true, the task will abort. + /// Otherwise, or if no `delay` is set, the discovery will be started. + pub fn start_after_delay( + discovery: Arc, + node_id: NodeId, + delay: Option, + on_first_tx: Option>>, + cancel: CancellationToken, + ) -> Option { + let task = tokio::task::spawn( + async move { + // If delay is set, wait and recheck if discovery is needed. If not, early-exit. + if let Some(delay) = delay { + tokio::time::sleep(delay).await; + } + Self::run(discovery, node_id, on_first_tx, cancel).await + } + .instrument(error_span!("discovery", node = %node_id.fmt_short())), + ); + Some(Self { task }) + } + + /// Cancel the discovery task. + pub fn cancel(&self) { + self.task.abort(); + } + + fn create_stream( + discovery: Arc, + node_id: NodeId, + ) -> Result>> { + let stream = discovery + .resolve(node_id) + .ok_or_else(|| anyhow!("No discovery service can resolve node {node_id}",))?; + Ok(stream) + } + + async fn run( + discovery: Arc, + node_id: NodeId, + mut on_first_tx: Option>>, + cancel: CancellationToken, + ) { + let mut stream = match Self::create_stream(discovery, node_id) { + Ok(stream) => stream, + Err(err) => { + on_first_tx.map(|s| s.send(Err(err)).ok()); + return; + } + }; + debug!("discovery: start"); + loop { + let next = tokio::select! { + _ = cancel.cancelled() => break, + next = stream.next() => next + }; + match next { + Some(Ok(r)) => { + if r.addr_info.is_empty() { + debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: empty address found"); + continue; + } + debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: new address found"); + let addr = NodeAddr { + info: r.addr_info, + node_id, + }; + if let Some(tx) = on_first_tx.take() { + tx.send(Ok(addr)).ok(); + } + } + Some(Err(err)) => { + warn!(?err, "discovery service produced error"); + break; + } + None => break, + } + } + if let Some(tx) = on_first_tx.take() { + let err = anyhow!("Discovery produced no results for {}", node_id.fmt_short()); + tx.send(Err(err)).ok(); + } + } +} + +impl Drop for DiscoveryTask { + fn drop(&mut self) { + self.task.abort(); + } +}