Skip to content

Commit

Permalink
feat(iroh-net): watch relay changes
Browse files Browse the repository at this point in the history
  • Loading branch information
divagant-martian committed May 14, 2024
1 parent acd859b commit c1fd2e6
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
10 changes: 9 additions & 1 deletion iroh-net/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Duration;

use anyhow::{anyhow, bail, ensure, Context, Result};
use derive_more::Debug;
use futures_lite::StreamExt;
use futures_lite::{Stream, StreamExt};
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
use tracing::{debug, info_span, trace, warn};

Expand Down Expand Up @@ -401,6 +401,14 @@ impl Endpoint {
Ok(NodeAddr::from_parts(self.node_id(), relay, addrs))
}

/// Watch for changes to the home relay.
///
/// Note that this can be used to wait for the initial home relay to be known. If the home
/// relay is known at this point, it will be the first item in the stream.
pub async fn watch_home_relay(&self) -> impl Stream<Item = RelayUrl> {
self.msock.watch_home_relay()
}

/// Get information on all the nodes we have connection information about.
///
/// Includes the node's [`PublicKey`], potential relay Url, its addresses with any known
Expand Down
55 changes: 47 additions & 8 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{

use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{FutureExt, Stream};
use futures_lite::{FutureExt, Stream, StreamExt};
use iroh_metrics::{inc, inc_by};
use quinn::AsyncUdpSocket;
use rand::{seq::SliceRandom, Rng, SeedableRng};
Expand Down Expand Up @@ -198,7 +198,7 @@ pub(super) struct MagicSock {
/// None (or zero nodes) means relay is disabled.
relay_map: RelayMap,
/// Nearest relay node ID; 0 means none/unknown.
my_relay: std::sync::RwLock<Option<RelayUrl>>,
my_relay: Watchable<Option<RelayUrl>>,
/// Tracks the networkmap node entity for each node discovery key.
node_map: NodeMap,
/// UDP IPv4 socket
Expand Down Expand Up @@ -246,17 +246,14 @@ impl MagicSock {
///
/// If `None`, then we are not connected to any relay nodes.
pub fn my_relay(&self) -> Option<RelayUrl> {
self.my_relay.read().expect("not poisoned").clone()
self.my_relay.get()
}

/// Sets the relay node with the best latency.
///
/// If we are not connected to any relay nodes, set this to `None`.
fn set_my_relay(&self, my_relay: Option<RelayUrl>) -> Option<RelayUrl> {
let mut lock = self.my_relay.write().expect("not poisoned");
let old = lock.take();
*lock = my_relay;
old
self.my_relay.replace(my_relay)
}

fn is_closing(&self) -> bool {
Expand Down Expand Up @@ -313,6 +310,20 @@ impl MagicSock {
}
}

/// Watch for changes to the home relay.
///
/// Note that this can be used to wait for the initial home relay to be known. If the home
/// relay is known at this point, it will be the first item in the stream.
pub fn watch_home_relay(&self) -> impl Stream<Item = RelayUrl> {
let current = futures_lite::stream::iter(self.my_relay());
let changes = self
.my_relay
.watch()
.into_stream()
.filter_map(|maybe_relay| maybe_relay);
current.chain(changes)
}

/// Returns a stream that reports the [`ConnectionType`] we have to the
/// given `node_id`.
///
Expand Down Expand Up @@ -2555,7 +2566,7 @@ pub(crate) mod tests {
use iroh_test::CallOnDrop;
use rand::RngCore;

use crate::{relay::RelayMode, tls, Endpoint};
use crate::{defaults::EU_RELAY_HOSTNAME, relay::RelayMode, tls, Endpoint};

use super::*;

Expand Down Expand Up @@ -3320,4 +3331,32 @@ pub(crate) mod tests {
println!("{eps1:?}");
assert_eq!(eps0, eps1);
}

#[tokio::test]
async fn test_watch_home_relay() {
let mut ops = Options::default();
// use an empty relay map to get full control of the changes during the test
ops.relay_map = RelayMap::empty();
let msock = MagicSock::spawn(ops).await.unwrap();
let mut relay_stream = msock.watch_home_relay();

// no relay, nothing to report
assert_eq!(
futures_lite::future::poll_once(relay_stream.next()).await,
None
);

let url: RelayUrl = format!("https://{}", EU_RELAY_HOSTNAME).parse().unwrap();
msock.set_my_relay(Some(url.clone()));

assert_eq!(relay_stream.next().await, Some(url.clone()));

// drop the stream and query it again, the result should be immediately available

let mut relay_stream = msock.watch_home_relay();
assert_eq!(
futures_lite::future::poll_once(relay_stream.next()).await,
Some(Some(url))
);
}
}

0 comments on commit c1fd2e6

Please sign in to comment.