Skip to content

Commit

Permalink
cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Dec 16, 2024
1 parent 5dbaa49 commit 4956e00
Showing 1 changed file with 71 additions and 81 deletions.
152 changes: 71 additions & 81 deletions iroh/src/discovery/pkarr/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
time::Duration,
};

use anyhow::Result;
use futures_lite::{
stream::{Boxed, StreamExt},
FutureExt,
Expand Down Expand Up @@ -91,6 +92,73 @@ struct Inner {
republish_delay: Duration,
}

impl Inner {
async fn resolve_relay(&self, key: pkarr::PublicKey) -> Option<Result<DiscoveryItem>> {
tracing::info!("resolving {} from relay {:?}", key.to_z32(), self.relay_url);

let maybe_packet = self
.pkarr_relay
.as_ref()
.expect("checked")
.resolve(&key)
.await;
match maybe_packet {
Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
Ok(node_info) => {
let node_addr: NodeAddr = node_info.into();

tracing::info!("discovered node info from relay {:?}", node_addr);
Some(Ok(DiscoveryItem {
node_addr,
provenance: "relay",
last_updated: None,
}))
}
Err(_err) => {
tracing::debug!("failed to parse signed packet as node info");
None
}
},
Ok(None) => {
tracing::debug!("no signed packet found in relay");
None
}
Err(err) => {
tracing::debug!("failed to get signed packet from relay: {}", err);
Some(Err(err.into()))
}
}
}
async fn resolve_dht(&self, key: pkarr::PublicKey) -> Option<Result<DiscoveryItem>> {
tracing::info!("resolving {} from DHT", key.to_z32());

let maybe_packet = self.pkarr.resolve(&key).await;
match maybe_packet {
Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
Ok(node_info) => {
let node_addr: NodeAddr = node_info.into();
tracing::info!("discovered node info from DHT {:?}", node_addr);
Some(Ok(DiscoveryItem {
node_addr,
provenance: "mainline",
last_updated: None,
}))
}
Err(_err) => {
tracing::debug!("failed to parse signed packet as node info");
None
}
},
Ok(None) => {
// nothing to do
tracing::debug!("no signed packet found in DHT");
None
}
Err(err) => Some(Err(err.into())),
}
}
}

/// Builder for [`DhtDiscovery`].
///
/// By default, publishing to the DHT is enabled, and relay publishing is disabled.
Expand Down Expand Up @@ -181,7 +249,7 @@ impl Builder {
}

/// Builds the discovery mechanism.
pub fn build(self) -> anyhow::Result<DhtDiscovery> {
pub fn build(self) -> Result<DhtDiscovery> {
let pkarr = match self.client {
Some(client) => client,
None => PkarrClient::new(Default::default())?,
Expand Down Expand Up @@ -319,93 +387,15 @@ impl Discovery for DhtDiscovery {

let mut stream = futures_buffered::FuturesUnorderedBounded::new(2);
if self.0.pkarr_relay.is_some() {
tracing::info!(
"resolving {} from relay {:?}",
pkarr_public_key.to_z32(),
self.0.relay_url
);
let key = pkarr_public_key.clone();
let discovery = self.0.clone();
stream.push(
async move {
let maybe_packet = discovery
.pkarr_relay
.as_ref()
.expect("checked")
.resolve(&key)
.await;
match maybe_packet {
Ok(Some(signed_packet)) => {
match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
Ok(node_info) => {
let node_addr: NodeAddr = node_info.into();

tracing::info!(
"discovered node info from relay {:?}",
node_addr
);
Some(Ok(DiscoveryItem {
node_addr,
provenance: "relay",
last_updated: None,
}))
}
Err(_err) => {
tracing::debug!("failed to parse signed packet as node info");
None
}
}
}
Ok(None) => {
tracing::debug!("no signed packet found in relay");
None
}
Err(err) => {
tracing::debug!("failed to get signed packet from relay: {}", err);
Some(Err(err.into()))
}
}
}
.boxed(),
);
stream.push(async move { discovery.resolve_relay(key).await }.boxed());
}

if self.0.dht {
tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32());

let key = pkarr_public_key.clone();
let discovery = self.0.clone();
stream.push(
async move {
let maybe_packet = discovery.pkarr.resolve(&key).await;
match maybe_packet {
Ok(Some(signed_packet)) => {
match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
Ok(node_info) => {
let node_addr: NodeAddr = node_info.into();
tracing::info!("discovered node info from DHT {:?}", node_addr);
Some(Ok(DiscoveryItem {
node_addr,
provenance: "mainline",
last_updated: None,
}))
}
Err(_err) => {
tracing::debug!("failed to parse signed packet as node info");
None
}
}
}
Ok(None) => {
// nothing to do
tracing::debug!("no signed packet found in DHT");
None
}
Err(err) => Some(Err(err.into())),
}
}
.boxed(),
);
stream.push(async move { discovery.resolve_dht(key).await }.boxed());
}

Some(stream.filter_map(|t| t).boxed())
Expand Down

0 comments on commit 4956e00

Please sign in to comment.