diff --git a/iroh/src/discovery/pkarr/dht.rs b/iroh/src/discovery/pkarr/dht.rs index 9da6985337..2729a57270 100644 --- a/iroh/src/discovery/pkarr/dht.rs +++ b/iroh/src/discovery/pkarr/dht.rs @@ -12,6 +12,7 @@ use std::{ time::Duration, }; +use anyhow::Result; use futures_lite::{ stream::{Boxed, StreamExt}, FutureExt, @@ -91,6 +92,73 @@ struct Inner { republish_delay: Duration, } +impl Inner { + async fn resolve_relay(&self, key: pkarr::PublicKey) -> Option> { + tracing::info!("resolving {} from relay {:?}", key.to_z32(), self.relay_url); + + let maybe_packet = self + .pkarr_relay + .as_ref() + .expect("checked") + .resolve(&key) + .await; + match maybe_packet { + Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) { + Ok(node_info) => { + let node_addr: NodeAddr = node_info.into(); + + tracing::info!("discovered node info from relay {:?}", node_addr); + Some(Ok(DiscoveryItem { + node_addr, + provenance: "relay", + last_updated: None, + })) + } + Err(_err) => { + tracing::debug!("failed to parse signed packet as node info"); + None + } + }, + Ok(None) => { + tracing::debug!("no signed packet found in relay"); + None + } + Err(err) => { + tracing::debug!("failed to get signed packet from relay: {}", err); + Some(Err(err.into())) + } + } + } + async fn resolve_dht(&self, key: pkarr::PublicKey) -> Option> { + tracing::info!("resolving {} from DHT", key.to_z32()); + + let maybe_packet = self.pkarr.resolve(&key).await; + match maybe_packet { + Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) { + Ok(node_info) => { + let node_addr: NodeAddr = node_info.into(); + tracing::info!("discovered node info from DHT {:?}", node_addr); + Some(Ok(DiscoveryItem { + node_addr, + provenance: "mainline", + last_updated: None, + })) + } + Err(_err) => { + tracing::debug!("failed to parse signed packet as node info"); + None + } + }, + Ok(None) => { + // nothing to do + tracing::debug!("no signed packet found in DHT"); + None + } + Err(err) => Some(Err(err.into())), + } + } +} + /// Builder for [`DhtDiscovery`]. /// /// By default, publishing to the DHT is enabled, and relay publishing is disabled. @@ -181,7 +249,7 @@ impl Builder { } /// Builds the discovery mechanism. - pub fn build(self) -> anyhow::Result { + pub fn build(self) -> Result { let pkarr = match self.client { Some(client) => client, None => PkarrClient::new(Default::default())?, @@ -319,93 +387,15 @@ impl Discovery for DhtDiscovery { let mut stream = futures_buffered::FuturesUnorderedBounded::new(2); if self.0.pkarr_relay.is_some() { - tracing::info!( - "resolving {} from relay {:?}", - pkarr_public_key.to_z32(), - self.0.relay_url - ); let key = pkarr_public_key.clone(); let discovery = self.0.clone(); - stream.push( - async move { - let maybe_packet = discovery - .pkarr_relay - .as_ref() - .expect("checked") - .resolve(&key) - .await; - match maybe_packet { - Ok(Some(signed_packet)) => { - match NodeInfo::from_pkarr_signed_packet(&signed_packet) { - Ok(node_info) => { - let node_addr: NodeAddr = node_info.into(); - - tracing::info!( - "discovered node info from relay {:?}", - node_addr - ); - Some(Ok(DiscoveryItem { - node_addr, - provenance: "relay", - last_updated: None, - })) - } - Err(_err) => { - tracing::debug!("failed to parse signed packet as node info"); - None - } - } - } - Ok(None) => { - tracing::debug!("no signed packet found in relay"); - None - } - Err(err) => { - tracing::debug!("failed to get signed packet from relay: {}", err); - Some(Err(err.into())) - } - } - } - .boxed(), - ); + stream.push(async move { discovery.resolve_relay(key).await }.boxed()); } if self.0.dht { - tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32()); - let key = pkarr_public_key.clone(); let discovery = self.0.clone(); - stream.push( - async move { - let maybe_packet = discovery.pkarr.resolve(&key).await; - match maybe_packet { - Ok(Some(signed_packet)) => { - match NodeInfo::from_pkarr_signed_packet(&signed_packet) { - Ok(node_info) => { - let node_addr: NodeAddr = node_info.into(); - tracing::info!("discovered node info from DHT {:?}", node_addr); - Some(Ok(DiscoveryItem { - node_addr, - provenance: "mainline", - last_updated: None, - })) - } - Err(_err) => { - tracing::debug!("failed to parse signed packet as node info"); - None - } - } - } - Ok(None) => { - // nothing to do - tracing::debug!("no signed packet found in DHT"); - None - } - Err(err) => Some(Err(err.into())), - } - } - .boxed(), - ); + stream.push(async move { discovery.resolve_dht(key).await }.boxed()); } Some(stream.filter_map(|t| t).boxed())