diff --git a/Cargo.lock b/Cargo.lock index 8f5cdf2390..f4d214ce5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1405,37 +1405,6 @@ dependencies = [ "slab", ] -[[package]] -name = "genawaiter" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" -dependencies = [ - "futures-core", - "genawaiter-macro", - "genawaiter-proc-macro", - "proc-macro-hack", -] - -[[package]] -name = "genawaiter-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" - -[[package]] -name = "genawaiter-proc-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" -dependencies = [ - "proc-macro-error", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "generator" version = "0.7.5" @@ -2168,7 +2137,6 @@ dependencies = [ "futures-lite 2.5.0", "futures-sink", "futures-util", - "genawaiter", "governor 0.7.0", "hex", "hickory-resolver", @@ -3608,38 +3576,6 @@ dependencies = [ "toml_edit", ] -[[package]] -name = "proc-macro-error" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", - "syn-mid", - "version_check", -] - -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.92" @@ -4789,17 +4725,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn-mid" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "sync_wrapper" version = "1.0.2" diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 418d88103c..e6e226a921 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -115,9 +115,6 @@ iroh-metrics = { version = "0.29", default-features = false } data-encoding = { version = "2.2", optional = true } swarm-discovery = { version = "0.3.0-alpha.1", optional = true } -# dht_discovery -genawaiter = { version = "0.99", features = ["futures03"], optional = true } - # Examples clap = { version = "4", features = ["derive"], optional = true } tracing-subscriber = { version = "0.3", features = [ @@ -179,7 +176,7 @@ default = ["metrics", "discovery-pkarr-dht"] metrics = ["iroh-metrics/metrics", "iroh-relay/metrics", "net-report/metrics", "portmapper/metrics"] test-utils = ["iroh-relay/test-utils", "iroh-relay/server", "dep:axum"] discovery-local-network = ["dep:data-encoding", "dep:swarm-discovery"] -discovery-pkarr-dht = ["pkarr/dht", "dep:genawaiter"] +discovery-pkarr-dht = ["pkarr/dht"] examples = [ "dep:clap", "dep:tracing-subscriber", diff --git a/iroh/src/discovery/pkarr/dht.rs b/iroh/src/discovery/pkarr/dht.rs index 63ca19a0cb..1de962344c 100644 --- a/iroh/src/discovery/pkarr/dht.rs +++ b/iroh/src/discovery/pkarr/dht.rs @@ -12,12 +12,14 @@ use std::{ time::Duration, }; -use futures_lite::{stream::Boxed, StreamExt}; -use genawaiter::sync::{Co, Gen}; +use futures_lite::{ + stream::{Boxed, StreamExt}, + FutureExt, +}; use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey}; use pkarr::{ - PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, PublicKey, - RelaySettings, SignedPacket, + PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, RelaySettings, + SignedPacket, }; use tokio_util::task::AbortOnDropHandle; use url::Url; @@ -278,90 +280,6 @@ impl DhtDiscovery { tokio::time::sleep(this.0.republish_delay).await; } } - - async fn resolve_relay( - &self, - pkarr_public_key: PublicKey, - co: &Co>, - ) { - let Some(relay) = &self.0.pkarr_relay else { - return; - }; - tracing::info!( - "resolving {} from relay {:?}", - pkarr_public_key.to_z32(), - self.0.relay_url - ); - let response = relay.resolve(&pkarr_public_key).await; - match response { - Ok(Some(signed_packet)) => { - if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) { - let node_addr: NodeAddr = node_info.into(); - - tracing::info!("discovered node info from relay {:?}", node_addr); - co.yield_(Ok(DiscoveryItem { - node_addr, - provenance: "relay", - last_updated: None, - })) - .await; - } else { - tracing::debug!("failed to parse signed packet as node info"); - } - } - Ok(None) => { - tracing::debug!("no signed packet found in relay"); - } - Err(e) => { - tracing::debug!("failed to get signed packet from relay: {}", e); - co.yield_(Err(e.into())).await; - } - } - } - - /// Resolves a node id from the DHT. - async fn resolve_dht( - &self, - pkarr_public_key: PublicKey, - co: &Co>, - ) { - if !self.0.dht { - return; - }; - tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32()); - let response = match self.0.pkarr.resolve(&pkarr_public_key).await { - Ok(r) => r, - Err(e) => { - co.yield_(Err(e.into())).await; - return; - } - }; - let Some(signed_packet) = response else { - tracing::debug!("no signed packet found in DHT"); - return; - }; - if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) { - let node_addr: NodeAddr = node_info.into(); - tracing::info!("discovered node info from DHT {:?}", node_addr); - co.yield_(Ok(DiscoveryItem { - node_addr, - provenance: "mainline", - last_updated: None, - })) - .await; - } else { - tracing::debug!("failed to parse signed packet as node info"); - } - } - - async fn gen_resolve(self, node_id: NodeId, co: Co>) { - let pkarr_public_key = - pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key"); - tokio::join!( - self.resolve_dht(pkarr_public_key.clone(), &co), - self.resolve_relay(pkarr_public_key, &co) - ); - } } impl Discovery for DhtDiscovery { @@ -395,11 +313,106 @@ impl Discovery for DhtDiscovery { _endpoint: Endpoint, node_id: NodeId, ) -> Option>> { - let this = self.clone(); let pkarr_public_key = pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key"); tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32()); - Some(Gen::new(|co| async move { this.gen_resolve(node_id, co).await }).boxed()) + + let mut stream = futures_buffered::FuturesUnorderedBounded::new(2); + if self.0.pkarr_relay.is_some() { + tracing::info!( + "resolving {} from relay {:?}", + pkarr_public_key.to_z32(), + self.0.relay_url + ); + let key = pkarr_public_key.clone(); + let discovery = self.0.clone(); + stream.push( + async move { + let maybe_packet = discovery + .pkarr_relay + .as_ref() + .expect("checked") + .resolve(&key) + .await; + match maybe_packet { + Ok(Some(signed_packet)) => { + match NodeInfo::from_pkarr_signed_packet(&signed_packet) { + Ok(node_info) => { + let node_addr: NodeAddr = node_info.into(); + + tracing::info!( + "discovered node info from relay {:?}", + node_addr + ); + Some(anyhow::Ok(DiscoveryItem { + node_addr, + provenance: "relay", + last_updated: None, + })) + } + Err(_err) => { + tracing::debug!("failed to parse signed packet as node info"); + None + } + } + } + Ok(None) => { + tracing::debug!("no signed packet found in relay"); + None + } + Err(err) => { + tracing::debug!("failed to get signed packet from relay: {}", err); + Some(Err(err.into())) + } + } + } + .boxed(), + ); + } + + if self.0.dht { + tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32()); + + let key = pkarr_public_key.clone(); + let discovery = self.0.clone(); + stream.push( + async move { + let maybe_packet = discovery.pkarr.resolve(&key).await; + match maybe_packet { + Ok(Some(signed_packet)) => { + match NodeInfo::from_pkarr_signed_packet(&signed_packet) { + Ok(node_info) => { + let node_addr: NodeAddr = node_info.into(); + tracing::info!("discovered node info from DHT {:?}", node_addr); + Some(Ok(DiscoveryItem { + node_addr, + provenance: "mainline", + last_updated: None, + })) + } + Err(_err) => { + tracing::debug!("failed to parse signed packet as node info"); + None + } + } + } + Ok(None) => { + // nothing to do + tracing::debug!("no signed packet found in DHT"); + None + } + Err(err) => Some(Err(err.into())), + } + } + .boxed(), + ); + } + + Some(stream.filter_map(|t| t).boxed()) + // Some(Box::pin(ResolveStream { + // dht_resolve, + // relay_resolve, + // })) } }