Skip to content

Commit

Permalink
feat(iroh-net)!: add subscribe method to Discovery trait (#2656)
Browse files Browse the repository at this point in the history
## Description

Adds a `subscribe` method to the `Discovery` trait that returns an
`Option<BoxStream>`. The `subscribe` method will send `DiscoveryItems`
each time the discovery service discovers a remote node.

The magicsock is now subscribed to the discovery service and updates the
internal address book each time it receives a `DiscoveryItem`. The
source is marked as `Source::Discovery{ service: String }` and the
`Instant` that the magicsock received the information.

Users can now filter their list of `RemoteInfo`s for sources.

## Breaking Changes

- struct `RemoteInfo` now has field `sources`, which is a `Vec` of
`(iroh::net::endpoint::Source, Duration)`. The `Source` is how we heard
about the remote, and the `Duration` is how long ago we heard about it.
The `sources` field is ordered from the first time we heard about the
remote node to the most recent time we learned about the remote node, in
this session.

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.

---------

Co-authored-by: Kasey Huizinga <[email protected]>
Co-authored-by: Divma <[email protected]>
Co-authored-by: Floris Bruynooghe <[email protected]>
  • Loading branch information
4 people authored Sep 27, 2024
1 parent 25c8305 commit 06c0844
Show file tree
Hide file tree
Showing 16 changed files with 553 additions and 106 deletions.
12 changes: 9 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion iroh-cli/src/commands/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ fn fmt_info(info: RemoteInfo) -> String {
let general_info = table.to_string();

let addrs_info = fmt_addrs(addrs);
format!("{general_info}\n\n{addrs_info}",)

format!("{general_info}\n\n{addrs_info}")
}

/// Formats the [`DirectAddrInfo`] into a [`Table`].
Expand All @@ -175,6 +176,7 @@ fn direct_addr_row(info: DirectAddrInfo) -> comfy_table::Row {
last_control,
last_payload,
last_alive,
..
} = info;

let last_control = match last_control {
Expand Down
10 changes: 7 additions & 3 deletions iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bytes = "1.7"
netdev = "0.30.0"
der = { version = "0.7", features = ["alloc", "derive"] }
derive_more = { version = "1.0.0", features = ["debug", "display", "from", "try_into", "deref"] }
futures-buffered = "0.2.4"
futures-buffered = "0.2.8"
futures-concurrency = "7.6.0"
futures-lite = "2.3"
futures-sink = "0.3.25"
Expand Down Expand Up @@ -66,6 +66,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["loggin
tokio-tungstenite = "0.21"
tokio-tungstenite-wasm = "0.3"
tokio-util = { version = "0.7.12", features = ["io-util", "io", "codec", "rt"] }
tokio-stream = { version = "0.1.15" }
tracing = "0.1"
tungstenite = "0.21"
url = { version = "2.4", features = ["serde"] }
Expand All @@ -91,7 +92,6 @@ strum = { version = "0.26.2", features = ["derive"] }

# local-swarm-discovery
swarm-discovery = { version = "0.2.1", optional = true }
tokio-stream = { version = "0.1.15", optional = true }

# dht_discovery
genawaiter = { version = "0.99.1", features = ["futures03"], optional = true }
Expand Down Expand Up @@ -145,7 +145,7 @@ iroh-relay = [
]
metrics = ["iroh-metrics/metrics"]
test-utils = ["iroh-relay"]
discovery-local-network = ["dep:swarm-discovery", "dep:tokio-stream"]
discovery-local-network = ["dep:swarm-discovery"]
discovery-pkarr-dht = ["pkarr/dht", "dep:genawaiter"]

[[bin]]
Expand All @@ -170,3 +170,7 @@ name = "connect-unreliable"
[[example]]
name = "dht_discovery"
required-features = ["discovery-pkarr-dht"]

[[example]]
name = "locally-discovered-nodes"
required-features = ["discovery-local-network"]
69 changes: 69 additions & 0 deletions iroh-net/examples/locally-discovered-nodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//! A small example showing how to get a list of nodes that were discovered via [`iroh_net::discovery::LocalSwarmDiscovery`]. LocalSwarmDiscovery uses [`swarm-discovery`](https://crates.io/crates/swarm-discovery) to discover other nodes in the local network ala mDNS.
//!
//! This example creates an iroh endpoint, a few additional iroh endpoints to discover, waits a few seconds, and reports all of the iroh NodeIds (also called `[iroh_net::key::PublicKey]`s) it has discovered.
//!
//! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`LocalSwarmDiscovery`] enabled, it may discover those nodes as well.
use iroh_net::{
discovery::local_swarm_discovery::LocalSwarmDiscovery, endpoint::Source, key::SecretKey,
Endpoint,
};
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
println!("locally discovered nodes example!\n");
let key = SecretKey::generate();
let id = key.public();
println!("creating endpoint {id:?}\n");
let ep = Endpoint::builder()
.secret_key(key)
.discovery(Box::new(LocalSwarmDiscovery::new(id)?))
.bind()
.await?;

let node_count = 5;
println!("creating {node_count} additional endpoints to discover locally:");
let mut discoverable_eps = Vec::with_capacity(node_count);
for _ in 0..node_count {
let key = SecretKey::generate();
let id = key.public();
println!("\t{id:?}");
let ep = Endpoint::builder()
.secret_key(key)
.discovery(Box::new(LocalSwarmDiscovery::new(id)?))
.bind()
.await?;
discoverable_eps.push(ep);
}

let duration = Duration::from_secs(3);
println!("\nwaiting {duration:?} to allow discovery to occur...\n");
tokio::time::sleep(duration).await;

// get an iterator of all the remote nodes this endpoint knows about
let remotes = ep.remote_info_iter();
// filter that list down to the nodes that have a `Source::Discovery` with
// the `service` name [`iroh_net::discovery::local_swarm_discovery::NAME`]
// If you have a long running node and want to only get the nodes that were
// discovered recently, you can also filter on the `Duration` of the source,
// which indicates how long ago we got information from that source.
let locally_discovered: Vec<_> = remotes
.filter(|remote| {
remote.sources().iter().any(|(source, _duration)| {
if let Source::Discovery { name } = source {
name == iroh_net::discovery::local_swarm_discovery::NAME
} else {
false
}
})
})
.map(|remote| remote.node_id)
.collect();

println!("found:");
for id in locally_discovered {
println!("\t{id:?}");
}
Ok(())
}
49 changes: 42 additions & 7 deletions iroh-net/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,19 @@
use std::time::Duration;

use crate::{AddrInfo, Endpoint, NodeId};
use anyhow::{anyhow, ensure, Result};
use futures_lite::stream::{Boxed as BoxStream, StreamExt};
use iroh_base::node_addr::NodeAddr;
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::{debug, error_span, warn, Instrument};

use crate::{AddrInfo, Endpoint, NodeId};

pub mod dns;

#[cfg(feature = "discovery-local-network")]
pub mod local_swarm_discovery;
pub mod pkarr;

/// Name used for logging when new node addresses are added from discovery.
const SOURCE_NAME: &str = "discovery";

/// Node discovery for [`super::Endpoint`].
///
/// This trait defines publishing and resolving addressing information for a [`NodeId`].
Expand Down Expand Up @@ -95,11 +91,37 @@ pub trait Discovery: std::fmt::Debug + Send + Sync {
) -> Option<BoxStream<Result<DiscoveryItem>>> {
None
}

/// Subscribe to all addresses that get *passively* discovered.
///
/// An implementation may choose to defer emitting passively discovered nodes
/// until the stream is actually polled. To avoid missing discovered nodes,
/// poll the stream as soon as possible.
///
/// Any discovery systems that only discover when explicitly resolving a
/// specific [`NodeId`] do not need to implement this method. Any nodes or
/// addresses that are discovered by calling `resolve` should NOT be added
/// to the `subscribe` stream.
///
/// Discovery systems that are capable of receiving information about [`NodeId`]s
/// and their [`AddrInfo`]s without explicitly calling `resolve`, i.e.,
/// systems that do "passive" discovery, should implement this method. If
/// `subscribe` is called multiple times, the passively discovered addresses
/// should be sent on all streams.
///
/// The [`crate::endpoint::Endpoint`] will `subscribe` to the discovery system
/// and add the discovered addresses to the internal address book as they arrive
/// on this stream.
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
None
}
}

/// The results returned from [`Discovery::resolve`].
#[derive(Debug, Clone)]
pub struct DiscoveryItem {
/// The [`NodeId`] whose address we have discovered
pub node_id: NodeId,
/// A static string to identify the discovery source.
///
/// Should be uniform per discovery service.
Expand Down Expand Up @@ -165,7 +187,19 @@ impl Discovery for ConcurrentDiscovery {
.iter()
.filter_map(|service| service.resolve(endpoint.clone(), node_id));

let streams = futures_buffered::Merge::from_iter(streams);
let streams = futures_buffered::MergeBounded::from_iter(streams);
Some(Box::pin(streams))
}

fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
let mut streams = vec![];
for service in self.services.iter() {
if let Some(stream) = service.subscribe() {
streams.push(stream)
}
}

let streams = futures_buffered::MergeBounded::from_iter(streams);
Some(Box::pin(streams))
}
}
Expand Down Expand Up @@ -307,7 +341,7 @@ impl DiscoveryTask {
info: r.addr_info,
node_id,
};
ep.add_node_addr_with_source(addr, SOURCE_NAME).ok();
ep.add_node_addr_with_source(addr, r.provenance).ok();
if let Some(tx) = on_first_tx.take() {
tx.send(Ok(())).ok();
}
Expand Down Expand Up @@ -417,6 +451,7 @@ mod tests {
let stream = match addr_info {
Some((addr_info, ts)) => {
let item = DiscoveryItem {
node_id,
provenance: "test-disco",
last_updated: Some(ts),
addr_info,
Expand Down
1 change: 1 addition & 0 deletions iroh-net/src/discovery/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl Discovery for DnsDiscovery {
.lookup_by_id_staggered(&node_id, &origin_domain, DNS_STAGGERING_MS)
.await?;
Ok(DiscoveryItem {
node_id,
provenance: "dns",
last_updated: None,
addr_info: node_addr.info,
Expand Down
Loading

0 comments on commit 06c0844

Please sign in to comment.