diff --git a/iroh-dns-server/examples/resolve.rs b/iroh-dns-server/examples/resolve.rs index b9464ab38c..e660c6335c 100644 --- a/iroh-dns-server/examples/resolve.rs +++ b/iroh-dns-server/examples/resolve.rs @@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> { TxtAttrs::::lookup_by_id(&resolver, &node_id, origin).await? } Command::Domain { domain } => { - TxtAttrs::::lookup_by_domain(&resolver, &domain).await? + TxtAttrs::::lookup_by_name(&resolver, &domain).await? } }; println!("resolved node {}", resolved.node_id()); diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index 95e09bfc1e..117b2c6d9c 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -22,10 +22,7 @@ mod tests { }; use iroh_net::{ discovery::pkarr_publish::PkarrRelayClient, - dns::{ - node_info::{lookup_by_id, NodeInfo}, - DnsResolver, - }, + dns::{node_info::NodeInfo, DnsResolver, ResolverExt}, key::SecretKey, }; use pkarr::{PkarrClient, SignedPacket}; @@ -168,7 +165,7 @@ mod tests { pkarr.publish(&signed_packet).await?; let resolver = test_resolver(nameserver); - let res = lookup_by_id(&resolver, &node_id, origin).await?; + let res = resolver.lookup_by_id(&node_id, origin).await?; assert_eq!(res.node_id, node_id); assert_eq!(res.info.relay_url.map(Url::from), Some(relay_url)); @@ -204,7 +201,7 @@ mod tests { // resolve via DNS from our server, which will lookup from our DHT let resolver = test_resolver(nameserver); - let res = lookup_by_id(&resolver, &node_id, origin).await?; + let res = resolver.lookup_by_id(&node_id, origin).await?; assert_eq!(res.node_id, node_id); assert_eq!(res.info.relay_url.map(Url::from), Some(relay_url)); diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index 0fba47ca29..0b935621c2 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -562,7 +562,7 @@ mod test_dns_pkarr { use crate::{ discovery::pkarr_publish::PkarrPublisher, - dns::node_info::{lookup_by_id, NodeInfo}, + dns::{node_info::NodeInfo, ResolverExt}, relay::{RelayMap, RelayMode}, test_utils::{ dns_server::{create_dns_resolver, run_dns_server}, @@ -590,7 +590,7 @@ mod test_dns_pkarr { state.upsert(signed_packet)?; let resolver = create_dns_resolver(nameserver)?; - let resolved = lookup_by_id(&resolver, &node_info.node_id, &origin).await?; + let resolved = resolver.lookup_by_id(&node_info.node_id, &origin).await?; assert_eq!(resolved, node_info.into()); @@ -620,7 +620,7 @@ mod test_dns_pkarr { publisher.update_addr_info(&addr_info); // wait until our shared state received the update from pkarr publishing dns_pkarr_server.on_node(&node_id, timeout).await?; - let resolved = lookup_by_id(&resolver, &node_id, &origin).await?; + let resolved = resolver.lookup_by_id(&node_id, &origin).await?; let expected = NodeAddr { info: addr_info, diff --git a/iroh-net/src/discovery/dns.rs b/iroh-net/src/discovery/dns.rs index 76f487456f..13706deb5d 100644 --- a/iroh-net/src/discovery/dns.rs +++ b/iroh-net/src/discovery/dns.rs @@ -5,11 +5,13 @@ use futures_lite::stream::Boxed as BoxStream; use crate::{ discovery::{Discovery, DiscoveryItem}, - dns, Endpoint, NodeId, + dns::ResolverExt, + Endpoint, NodeId, }; /// The n0 testing DNS node origin pub const N0_DNS_NODE_ORIGIN: &str = "dns.iroh.link"; +const DNS_STAGGERING_MS: &[u64] = &[200, 300]; /// DNS node discovery /// @@ -53,8 +55,9 @@ impl Discovery for DnsDiscovery { let resolver = ep.dns_resolver().clone(); let origin_domain = self.origin_domain.clone(); let fut = async move { - let node_addr = - dns::node_info::lookup_by_id(&resolver, &node_id, &origin_domain).await?; + let node_addr = resolver + .lookup_by_id_staggered(&node_id, &origin_domain, DNS_STAGGERING_MS) + .await?; Ok(DiscoveryItem { provenance: "dns", last_updated: None, diff --git a/iroh-net/src/dns.rs b/iroh-net/src/dns.rs index 068166bb54..1ac64c2f7f 100644 --- a/iroh-net/src/dns.rs +++ b/iroh-net/src/dns.rs @@ -1,12 +1,19 @@ //! This module exports a DNS resolver, which is also the default resolver used in the //! [`crate::Endpoint`] if no custom resolver is configured. +//! +//! It also exports [`ResolverExt`]: A extension trait over [`DnsResolver`] to perform DNS queries +//! by ipv4, ipv6, name and node_id. See the [`node_info`] module documentation for details on how +//! iroh node records are structured. +use std::fmt::Write; use std::net::{IpAddr, Ipv6Addr}; use std::time::Duration; use anyhow::Result; -use futures_lite::Future; +use futures_lite::{Future, StreamExt}; use hickory_resolver::{AsyncResolver, IntoName, TokioAsyncResolver}; +use iroh_base::key::NodeId; +use iroh_base::node_addr::NodeAddr; use once_cell::sync::Lazy; pub mod node_info; @@ -78,14 +85,14 @@ fn create_default_resolver() -> Result { /// Extension trait to [`DnsResolver`]. pub trait ResolverExt { /// Perform an ipv4 lookup with a timeout. - fn lookup_ipv4( + fn lookup_ipv4( &self, host: N, timeout: Duration, ) -> impl Future>>; /// Perform an ipv6 lookup with a timeout. - fn lookup_ipv6( + fn lookup_ipv6( &self, host: N, timeout: Duration, @@ -97,10 +104,85 @@ pub trait ResolverExt { host: N, timeout: Duration, ) -> impl Future>>; + + /// Looks up node info by DNS name. + fn lookup_by_name(&self, name: &str) -> impl Future>; + + /// Looks up node info by [`NodeId`] and origin domain name. + fn lookup_by_id( + &self, + node_id: &NodeId, + origin: &str, + ) -> impl Future>; + + /// Perform an ipv4 lookup with a timeout in a staggered fashion. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The `timeout` is applied to each call individually. The + /// result of the first successful call is returned, or a summary of all errors otherwise. + fn lookup_ipv4_staggered( + &self, + host: N, + timeout: Duration, + delays_ms: &[u64], + ) -> impl Future>>; + + /// Perform an ipv6 lookup with a timeout in a staggered fashion. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The `timeout` is applied to each call individually. The + /// result of the first successful call is returned, or a summary of all errors otherwise. + fn lookup_ipv6_staggered( + &self, + host: N, + timeout: Duration, + delays_ms: &[u64], + ) -> impl Future>>; + + /// Race an ipv4 and ipv6 lookup with a timeout in a staggered fashion. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The `timeout` is applied as stated in + /// [`Self::lookup_ipv4_ipv6`]. The result of the first successful call is returned, or a + /// summary of all errors otherwise. + fn lookup_ipv4_ipv6_staggered( + &self, + host: N, + timeout: Duration, + delays_ms: &[u64], + ) -> impl Future>>; + + /// Looks up node info by DNS name in a staggered fashion. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The result of the first successful call is returned, or a + /// summary of all errors otherwise. + fn lookup_by_name_staggered( + &self, + name: &str, + delays_ms: &[u64], + ) -> impl Future>; + + /// Looks up node info by [`NodeId`] and origin domain name. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The result of the first successful call is returned, or a + /// summary of all errors otherwise. + fn lookup_by_id_staggered( + &self, + node_id: &NodeId, + origin: &str, + delays_ms: &[u64], + ) -> impl Future>; } impl ResolverExt for DnsResolver { - async fn lookup_ipv4( + async fn lookup_ipv4( &self, host: N, timeout: Duration, @@ -109,7 +191,7 @@ impl ResolverExt for DnsResolver { Ok(addrs.into_iter().map(|ip| IpAddr::V4(ip.0))) } - async fn lookup_ipv6( + async fn lookup_ipv6( &self, host: N, timeout: Duration, @@ -142,9 +224,103 @@ impl ResolverExt for DnsResolver { } } } + + /// Looks up node info by DNS name. + /// + /// The resource records returned for `name` must either contain an [`node_info::IROH_TXT_NAME`] TXT + /// record or be a CNAME record that leads to an [`node_info::IROH_TXT_NAME`] TXT record. + async fn lookup_by_name(&self, name: &str) -> Result { + let attrs = node_info::TxtAttrs::::lookup_by_name(self, name).await?; + let info: node_info::NodeInfo = attrs.into(); + Ok(info.into()) + } + + /// Looks up node info by [`NodeId`] and origin domain name. + async fn lookup_by_id(&self, node_id: &NodeId, origin: &str) -> Result { + let attrs = + node_info::TxtAttrs::::lookup_by_id(self, node_id, origin).await?; + let info: node_info::NodeInfo = attrs.into(); + Ok(info.into()) + } + + /// Perform an ipv4 lookup with a timeout in a staggered fashion. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The `timeout` is applied to each call individually. The + /// result of the first successful call is returned, or a summary of all errors otherwise. + async fn lookup_ipv4_staggered( + &self, + host: N, + timeout: Duration, + delays_ms: &[u64], + ) -> Result> { + let f = || self.lookup_ipv4(host.clone(), timeout); + stagger_call(f, delays_ms).await + } + + /// Perform an ipv6 lookup with a timeout in a staggered fashion. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The `timeout` is applied to each call individually. The + /// result of the first successful call is returned, or a summary of all errors otherwise. + async fn lookup_ipv6_staggered( + &self, + host: N, + timeout: Duration, + delays_ms: &[u64], + ) -> Result> { + let f = || self.lookup_ipv6(host.clone(), timeout); + stagger_call(f, delays_ms).await + } + + /// Race an ipv4 and ipv6 lookup with a timeout in a staggered fashion. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The `timeout` is applied as stated in + /// [`Self::lookup_ipv4_ipv6`]. The result of the first successful call is returned, or a + /// summary of all errors otherwise. + async fn lookup_ipv4_ipv6_staggered( + &self, + host: N, + timeout: Duration, + delays_ms: &[u64], + ) -> Result> { + let f = || self.lookup_ipv4_ipv6(host.clone(), timeout); + stagger_call(f, delays_ms).await + } + + /// Looks up node info by DNS name in a staggered fashion. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The result of the first successful call is returned, or a + /// summary of all errors otherwise. + async fn lookup_by_name_staggered(&self, name: &str, delays_ms: &[u64]) -> Result { + let f = || self.lookup_by_name(name); + stagger_call(f, delays_ms).await + } + + /// Looks up node info by [`NodeId`] and origin domain name. + /// + /// From the moment this function is called, each lookup is scheduled after the delays in + /// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls + /// at T+0ms, T+200ms and T+300ms. The result of the first successful call is returned, or a + /// summary of all errors otherwise. + async fn lookup_by_id_staggered( + &self, + node_id: &NodeId, + origin: &str, + delays_ms: &[u64], + ) -> Result { + let f = || self.lookup_by_id(node_id, origin); + stagger_call(f, delays_ms).await + } } -/// Helper enum to give a unified type to the iterators of [`ResolverExt::lookup_ipv4_ipv6`] +/// Helper enum to give a unified type to the iterators of [`ResolverExt::lookup_ipv4_ipv6`]. enum LookupIter { Ipv4(A), Ipv6(B), @@ -163,11 +339,53 @@ impl, B: Iterator> Iterator for Lookup } } +/// Staggers calls to the future F with the given delays. +/// +/// The first call is performed immediately. The first call to succeed generates an Ok result +/// ignoring any previous error. If all calls fail, an error sumarizing all errors is returned. +async fn stagger_call Fut, Fut: Future>>( + f: F, + delays_ms: &[u64], +) -> Result { + let mut calls = futures_buffered::FuturesUnorderedBounded::new(delays_ms.len() + 1); + // NOTE: we add the 0 delay here to have a uniform set of futures. This is more performant than + // using alternatives that allow futures of different types. + for delay in std::iter::once(&0u64).chain(delays_ms) { + let delay = std::time::Duration::from_millis(*delay); + let fut = f(); + let staggered_fut = async move { + tokio::time::sleep(delay).await; + fut.await + }; + calls.push(staggered_fut) + } + + let mut errors = vec![]; + while let Some(call_result) = calls.next().await { + match call_result { + Ok(t) => return Ok(t), + Err(e) => errors.push(e), + } + } + + anyhow::bail!( + "no calls succeed: [ {}]", + errors.into_iter().fold(String::new(), |mut summary, e| { + write!(summary, "{e} ").expect("infallible"); + summary + }) + ) +} + #[cfg(test)] pub(crate) mod tests { + use std::sync::atomic::AtomicUsize; + use crate::defaults::NA_RELAY_HOSTNAME; use super::*; + const TIMEOUT: Duration = Duration::from_secs(5); + const STAGGERING_DELAYS: &[u64] = &[200, 300]; #[tokio::test] #[cfg_attr(target_os = "windows", ignore = "flaky")] @@ -181,16 +399,33 @@ pub(crate) mod tests { } #[tokio::test] - #[cfg_attr(target_os = "windows", ignore = "flaky")] async fn test_dns_lookup_ipv4_ipv6() { let _logging = iroh_test::logging::setup(); let resolver = default_resolver(); let res: Vec<_> = resolver - .lookup_ipv4_ipv6(NA_RELAY_HOSTNAME, Duration::from_secs(5)) + .lookup_ipv4_ipv6_staggered(NA_RELAY_HOSTNAME, TIMEOUT, STAGGERING_DELAYS) .await .unwrap() .collect(); assert!(!res.is_empty()); dbg!(res); } + + #[tokio::test] + async fn stagger_basic() { + let _logging = iroh_test::logging::setup(); + const CALL_RESULTS: &[Result] = &[Err(2), Ok(3), Ok(5), Ok(7)]; + static DONE_CALL: AtomicUsize = AtomicUsize::new(0); + let f = || { + let r_pos = DONE_CALL.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + async move { + tracing::info!(r_pos, "call"); + CALL_RESULTS[r_pos].map_err(|e| anyhow::anyhow!("{e}")) + } + }; + + let delays = [1000, 15]; + let result = stagger_call(f, &delays).await.unwrap(); + assert_eq!(result, 5) + } } diff --git a/iroh-net/src/dns/node_info.rs b/iroh-net/src/dns/node_info.rs index 4e34107674..16bf532cf4 100644 --- a/iroh-net/src/dns/node_info.rs +++ b/iroh-net/src/dns/node_info.rs @@ -62,27 +62,6 @@ pub enum IrohAttr { Addr, } -/// Looks up node info by DNS name. -/// -/// The resource records returned for `name` must either contain an [`IROH_TXT_NAME`] TXT -/// record or be a CNAME record that leads to an [`IROH_TXT_NAME`] TXT record. -pub async fn lookup_by_domain(resolver: &TokioAsyncResolver, name: &str) -> Result { - let attrs = TxtAttrs::::lookup_by_domain(resolver, name).await?; - let info: NodeInfo = attrs.into(); - Ok(info.into()) -} - -/// Looks up node info by [`NodeId`] and origin domain name. -pub async fn lookup_by_id( - resolver: &TokioAsyncResolver, - node_id: &NodeId, - origin: &str, -) -> Result { - let attrs = TxtAttrs::::lookup_by_id(resolver, node_id, origin).await?; - let info: NodeInfo = attrs.into(); - Ok(info.into()) -} - /// Encodes a [`NodeId`] in [`z-base-32`] encoding. /// /// [z-base-32]: https://philzimmermann.com/docs/human-oriented-base-32-encoding.txt @@ -300,7 +279,7 @@ impl TxtAttrs { } /// Looks up attributes by DNS name. - pub async fn lookup_by_domain(resolver: &TokioAsyncResolver, name: &str) -> Result { + pub async fn lookup_by_name(resolver: &TokioAsyncResolver, name: &str) -> Result { let name = Name::from_str(name)?; TxtAttrs::lookup(resolver, name).await } diff --git a/iroh-net/src/netcheck/reportgen.rs b/iroh-net/src/netcheck/reportgen.rs index 2085610bde..665ea66e54 100644 --- a/iroh-net/src/netcheck/reportgen.rs +++ b/iroh-net/src/netcheck/reportgen.rs @@ -73,6 +73,9 @@ const ENOUGH_NODES: usize = 3; const DNS_TIMEOUT: Duration = Duration::from_secs(3); +/// Delay used to perform staggered dns queries. +const DNS_STAGGERING_MS: &[u64] = &[200, 300]; + /// Holds the state for a single invocation of [`netcheck::Client::get_report`]. /// /// Dropping this will cancel the actor and stop the report generation. @@ -192,7 +195,7 @@ struct Actor { /// /// This is essentially the summary of all the work the [`Actor`] is doing. outstanding_tasks: OutstandingTasks, - /// The DNS resolver to use for probes that need to resolve DNS records + /// The DNS resolver to use for probes that need to resolve DNS records. dns_resolver: DnsResolver, } @@ -945,7 +948,10 @@ async fn get_relay_addr( ProbeProto::StunIpv4 | ProbeProto::IcmpV4 => match relay_node.url.host() { Some(url::Host::Domain(hostname)) => { debug!(?proto, %hostname, "Performing DNS A lookup for relay addr"); - match dns_resolver.lookup_ipv4(hostname, DNS_TIMEOUT).await { + match dns_resolver + .lookup_ipv4_staggered(hostname, DNS_TIMEOUT, DNS_STAGGERING_MS) + .await + { Ok(mut addrs) => addrs .next() .map(ip::to_canonical) @@ -962,7 +968,10 @@ async fn get_relay_addr( ProbeProto::StunIpv6 | ProbeProto::IcmpV6 => match relay_node.url.host() { Some(url::Host::Domain(hostname)) => { debug!(?proto, %hostname, "Performing DNS AAAA lookup for relay addr"); - match dns_resolver.lookup_ipv6(hostname, DNS_TIMEOUT).await { + match dns_resolver + .lookup_ipv6_staggered(hostname, DNS_TIMEOUT, DNS_STAGGERING_MS) + .await + { Ok(mut addrs) => addrs .next() .map(ip::to_canonical) @@ -1316,7 +1325,6 @@ mod tests { // // TODO: Not sure what about IPv6 pings using sysctl. #[tokio::test] - #[cfg_attr(target_os = "windows", ignore = "flaky")] async fn test_icmpk_probe_eu_relayer() { let _logging_guard = iroh_test::logging::setup(); let pinger = Pinger::new();