Skip to content

Commit

Permalink
Rename Endpoint::local_endpoints to direct_addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
flub committed Jun 17, 2024
1 parent 53dfed1 commit 1d5450e
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 34 deletions.
4 changes: 2 additions & 2 deletions iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ async fn make_endpoint(
};
let endpoint = endpoint.bind(0).await?;

tokio::time::timeout(Duration::from_secs(10), endpoint.local_endpoints().next())
tokio::time::timeout(Duration::from_secs(10), endpoint.direct_addresses().next())
.await
.context("wait for relay connection")?
.context("no endpoints")?;
Expand Down Expand Up @@ -727,7 +727,7 @@ async fn accept(
) -> anyhow::Result<()> {
let endpoint = make_endpoint(secret_key.clone(), relay_map, discovery).await?;
let endpoints = endpoint
.local_endpoints()
.direct_addresses()
.next()
.await
.context("no endpoints")?;
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/examples/connect-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> {
println!("node id: {me}");
println!("node listening addresses:");
for local_endpoint in endpoint
.local_endpoints()
.direct_addresses()
.next()
.await
.context("no endpoints")?
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> {
println!("node id: {me}");
println!("node listening addresses:");
for local_endpoint in endpoint
.local_endpoints()
.direct_addresses()
.next()
.await
.context("no endpoints")?
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn main() -> anyhow::Result<()> {
println!("node listening addresses:");

let local_addrs = endpoint
.local_endpoints()
.direct_addresses()
.next()
.await
.context("no endpoints")?
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn main() -> anyhow::Result<()> {
println!("node listening addresses:");

let local_addrs = endpoint
.local_endpoints()
.direct_addresses()
.next()
.await
.context("no endpoints")?
Expand Down
10 changes: 5 additions & 5 deletions iroh-net/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub use quinn::{

pub use super::magicsock::{
ConnectionInfo, ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo,
LocalEndpointsStream,
DirectAdressesStream,
};

pub use iroh_base::node_addr::{AddrInfo, NodeAddr};
Expand Down Expand Up @@ -570,7 +570,7 @@ impl Endpoint {
/// [`Endpoint::local_endpoints`].
pub async fn node_addr(&self) -> Result<NodeAddr> {
let addrs = self
.local_endpoints()
.direct_addresses()
.next()
.await
.ok_or(anyhow!("No IP endpoints found"))?;
Expand Down Expand Up @@ -637,13 +637,13 @@ impl Endpoint {
/// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
/// # rt.block_on(async move {
/// let mep = Endpoint::builder().bind(0).await.unwrap();
/// let _endpoints = mep.local_endpoints().next().await;
/// let _addrs = mep.direct_addresses().next().await;
/// # });
/// ```
///
/// [STUN]: https://en.wikipedia.org/wiki/STUN
pub fn local_endpoints(&self) -> LocalEndpointsStream {
self.msock.local_endpoints()
pub fn direct_addresses(&self) -> DirectAdressesStream {
self.msock.direct_addresses()
}

/// Returns the local socket addresses on which the underlying sockets are bound.
Expand Down
33 changes: 17 additions & 16 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,21 +304,22 @@ impl MagicSock {
self.node_map.node_info(node_id)
}

/// Returns the local endpoints as a stream.
/// Returns the direct addresses as a stream.
///
/// The [`MagicSock`] continuously monitors the local endpoints, the network addresses
/// it can listen on, for changes. Whenever changes are detected this stream will yield
/// a new list of endpoints.
/// The [`MagicSock`] continuously monitors the direct addresses, the network addresses
/// it might be able to be contacted on, for changes. Whenever changes are detected
/// this stream will yield a new list of addresses.
///
/// Upon the first creation on the [`MagicSock`] it may not yet have completed a first
/// local endpoint discovery, in this case the first item of the stream will not be
/// immediately available. Once this first set of local endpoints are discovered the
/// stream will always return the first set of endpoints immediately, which are the most
/// recently discovered endpoints.
/// direct addresses discovery, in this case the first item of the stream will not be
/// immediately available. Once this first set of direct addresses are discovered the
/// stream will always return the first set of addresses immediately, which are the most
/// recently discovered addresses.
///
/// To get the current endpoints, drop the stream after the first item was received.
pub fn local_endpoints(&self) -> LocalEndpointsStream {
LocalEndpointsStream {
/// To get the current direct addresses, drop the stream after the first item was
/// received.
pub fn direct_addresses(&self) -> DirectAdressesStream {
DirectAdressesStream {
initial: Some(self.endpoints.get()),
inner: self.endpoints.watch().into_stream(),
}
Expand Down Expand Up @@ -1493,12 +1494,12 @@ impl Handle {

/// Stream returning local endpoints as they change.
#[derive(Debug)]
pub struct LocalEndpointsStream {
pub struct DirectAdressesStream {
initial: Option<DiscoveredEndpoints>,
inner: watchable::WatcherStream<DiscoveredEndpoints>,
}

impl Stream for LocalEndpointsStream {
impl Stream for DirectAdressesStream {
type Item = Vec<config::Endpoint>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -2684,7 +2685,7 @@ pub(crate) mod tests {
let stacks = stacks.clone();
tasks.spawn(async move {
let me = m.endpoint.node_id().fmt_short();
let mut stream = m.endpoint.local_endpoints();
let mut stream = m.endpoint.direct_addresses();
while let Some(new_eps) = stream.next().await {
info!(%me, "conn{} endpoints update: {:?}", my_idx + 1, new_eps);
update_eps(&stacks, my_idx, new_eps);
Expand Down Expand Up @@ -3353,13 +3354,13 @@ pub(crate) mod tests {
let ms = Handle::new(Default::default()).await.unwrap();

// See if we can get endpoints.
let mut eps0 = ms.local_endpoints().next().await.unwrap();
let mut eps0 = ms.direct_addresses().next().await.unwrap();
eps0.sort();
println!("{eps0:?}");
assert!(!eps0.is_empty());

// Getting the endpoints again immediately should give the same results.
let mut eps1 = ms.local_endpoints().next().await.unwrap();
let mut eps1 = ms.direct_addresses().next().await.unwrap();
eps1.sort();
println!("{eps1:?}");
assert_eq!(eps0, eps1);
Expand Down
8 changes: 4 additions & 4 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::Store as BaoStore;
use iroh_docs::engine::Engine;
use iroh_net::util::AbortingJoinHandle;
use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint};
use iroh_net::{endpoint::DirectAdressesStream, key::SecretKey, Endpoint};
use quic_rpc::transport::flume::FlumeConnection;
use quic_rpc::RpcClient;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -116,8 +116,8 @@ impl<D: BaoStore> Node<D> {
}

/// Lists the local endpoint of this node.
pub fn local_endpoints(&self) -> LocalEndpointsStream {
self.inner.endpoint.local_endpoints()
pub fn local_endpoints(&self) -> DirectAdressesStream {
self.inner.endpoint.direct_addresses()
}

/// Convenience method to get just the addr part of [`Node::local_endpoints`].
Expand Down Expand Up @@ -185,7 +185,7 @@ impl<D> NodeInner<D> {
async fn local_endpoint_addresses(&self) -> Result<Vec<SocketAddr>> {
let endpoints = self
.endpoint
.local_endpoints()
.direct_addresses()
.next()
.await
.ok_or(anyhow!("no endpoints found"))?;
Expand Down
6 changes: 3 additions & 3 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ where

// spawn a task that updates the gossip endpoints.
// TODO: track task
let mut stream = endpoint.local_endpoints();
let mut stream = endpoint.direct_addresses();
tokio::task::spawn(async move {
while let Some(eps) = stream.next().await {
if let Err(err) = gossip.update_endpoints(&eps) {
Expand All @@ -534,7 +534,7 @@ where

// Wait for a single endpoint update, to make sure
// we found some endpoints
tokio::time::timeout(ENDPOINT_WAIT, endpoint.local_endpoints().next())
tokio::time::timeout(ENDPOINT_WAIT, endpoint.direct_addresses().next())
.await
.context("waiting for endpoint")?
.context("no endpoints")?;
Expand Down Expand Up @@ -564,7 +564,7 @@ where
// forward our initial endpoints to the gossip protocol
// it may happen the the first endpoint update callback is missed because the gossip cell
// is only initialized once the endpoint is fully bound
if let Some(local_endpoints) = server.local_endpoints().next().await {
if let Some(local_endpoints) = server.direct_addresses().next().await {
debug!(me = ?server.node_id(), "gossip initial update: {local_endpoints:?}");
gossip.update_endpoints(&local_endpoints).ok();
}
Expand Down

0 comments on commit 1d5450e

Please sign in to comment.