diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index 117ab689b3..8e5653f479 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -237,6 +237,10 @@ impl DiscoveryTask { }; match next { Some(Ok(r)) => { + if r.addr_info.is_empty() { + debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: empty address found"); + continue; + } debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: new address found"); let addr = NodeAddr { info: r.addr_info, @@ -551,8 +555,6 @@ mod test_dns_pkarr { use anyhow::Result; use iroh_base::key::SecretKey; - use tokio::task::JoinHandle; - use tokio_util::sync::CancellationToken; use url::Url; use crate::{ @@ -560,22 +562,21 @@ mod test_dns_pkarr { dns::node_info::{lookup_by_id, NodeInfo}, relay::{RelayMap, RelayMode}, test_utils::{ + dns_and_pkarr_servers::run_dns_and_pkarr_servers, dns_server::{create_dns_resolver, run_dns_server}, + pkarr_dns_state::State, run_relay_server, }, AddrInfo, MagicEndpoint, NodeAddr, }; - use self::{pkarr_relay::run_pkarr_relay, state::State}; - #[tokio::test] async fn dns_resolve() -> Result<()> { let _logging_guard = iroh_test::logging::setup(); - let cancel = CancellationToken::new(); let origin = "testdns.example".to_string(); let state = State::new(origin.clone()); - let (nameserver, dns_task) = run_dns_server(state.clone(), cancel.clone()).await?; + let (nameserver, _dns_drop_guard) = run_dns_server(state.clone()).await?; let secret_key = SecretKey::generate(); let node_info = NodeInfo::new( @@ -590,8 +591,6 @@ mod test_dns_pkarr { assert_eq!(resolved, node_info.into()); - cancel.cancel(); - dns_task.await??; Ok(()) } @@ -600,11 +599,10 @@ mod test_dns_pkarr { let _logging_guard = iroh_test::logging::setup(); let origin = "testdns.example".to_string(); - let cancel = CancellationToken::new(); let timeout = Duration::from_secs(2); - let (nameserver, pkarr_url, state, task) = - run_dns_and_pkarr_servers(origin.clone(), cancel.clone()).await?; + let (nameserver, pkarr_url, state, _dns_drop_guard, _pkarr_drop_guard) = + run_dns_and_pkarr_servers(origin.clone()).await?; let secret_key = SecretKey::generate(); let node_id = secret_key.public(); @@ -628,9 +626,6 @@ mod test_dns_pkarr { }; assert_eq!(resolved, expected); - - cancel.cancel(); - task.await??; Ok(()) } @@ -641,11 +636,10 @@ mod test_dns_pkarr { let _logging_guard = iroh_test::logging::setup(); let origin = "testdns.example".to_string(); - let cancel = CancellationToken::new(); let timeout = Duration::from_secs(2); - let (nameserver, pkarr_url, state, task) = - run_dns_and_pkarr_servers(&origin, cancel.clone()).await?; + let (nameserver, pkarr_url, state, _dns_drop_guard, _pkarr_drop_guard) = + run_dns_and_pkarr_servers(&origin).await?; let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?; let ep1 = ep_with_discovery(relay_map.clone(), nameserver, &origin, &pkarr_url).await?; @@ -657,8 +651,6 @@ mod test_dns_pkarr { // we connect only by node id! let res = ep2.connect(ep1.node_id().into(), TEST_ALPN).await; assert!(res.is_ok(), "connection established"); - cancel.cancel(); - task.await??; Ok(()) } @@ -685,203 +677,4 @@ mod test_dns_pkarr { .await?; Ok(ep) } - - async fn run_dns_and_pkarr_servers( - origin: impl ToString, - cancel: CancellationToken, - ) -> Result<(SocketAddr, Url, State, JoinHandle>)> { - let state = State::new(origin.to_string()); - let (nameserver, dns_task) = run_dns_server(state.clone(), cancel.clone()).await?; - let (pkarr_url, pkarr_task) = run_pkarr_relay(state.clone(), cancel.clone()).await?; - let join_handle = tokio::task::spawn(async move { - dns_task.await??; - pkarr_task.await??; - Ok(()) - }); - Ok((nameserver, pkarr_url, state, join_handle)) - } - - mod state { - use anyhow::{bail, Result}; - use parking_lot::{Mutex, MutexGuard}; - use pkarr::SignedPacket; - use std::{ - collections::{hash_map, HashMap}, - future::Future, - ops::Deref, - sync::Arc, - time::Duration, - }; - - use crate::dns::node_info::{node_id_from_hickory_name, NodeInfo}; - use crate::test_utils::dns_server::QueryHandler; - use crate::NodeId; - - #[derive(Debug, Clone)] - pub struct State { - packets: Arc>>, - origin: String, - notify: Arc, - } - - impl State { - pub fn new(origin: String) -> Self { - Self { - packets: Default::default(), - origin, - notify: Arc::new(tokio::sync::Notify::new()), - } - } - - pub fn on_update(&self) -> tokio::sync::futures::Notified<'_> { - self.notify.notified() - } - - pub async fn on_node(&self, node: &NodeId, timeout: Duration) -> Result<()> { - let timeout = tokio::time::sleep(timeout); - tokio::pin!(timeout); - while self.get(node).is_none() { - tokio::select! { - _ = &mut timeout => bail!("timeout"), - _ = self.on_update() => {} - } - } - Ok(()) - } - - pub fn upsert(&self, signed_packet: SignedPacket) -> anyhow::Result { - let node_id = NodeId::from_bytes(&signed_packet.public_key().to_bytes())?; - let mut map = self.packets.lock(); - let updated = match map.entry(node_id) { - hash_map::Entry::Vacant(e) => { - e.insert(signed_packet); - true - } - hash_map::Entry::Occupied(mut e) => { - if signed_packet.more_recent_than(e.get()) { - e.insert(signed_packet); - true - } else { - false - } - } - }; - if updated { - self.notify.notify_waiters(); - } - Ok(updated) - } - - /// Returns a mutex guard, do not hold over await points - pub fn get(&self, node_id: &NodeId) -> Option + '_> { - let map = self.packets.lock(); - if map.contains_key(node_id) { - let guard = MutexGuard::map(map, |state| state.get_mut(node_id).unwrap()); - Some(guard) - } else { - None - } - } - - pub fn resolve_dns( - &self, - query: &hickory_proto::op::Message, - reply: &mut hickory_proto::op::Message, - ttl: u32, - ) -> Result<()> { - for query in query.queries() { - let Some(node_id) = node_id_from_hickory_name(query.name()) else { - continue; - }; - let packet = self.get(&node_id); - let Some(packet) = packet.as_ref() else { - continue; - }; - let node_info = NodeInfo::from_pkarr_signed_packet(packet)?; - for record in node_info.to_hickory_records(&self.origin, ttl)? { - reply.add_answer(record); - } - } - Ok(()) - } - } - - impl QueryHandler for State { - fn resolve( - &self, - query: &hickory_proto::op::Message, - reply: &mut hickory_proto::op::Message, - ) -> impl Future> + Send { - const TTL: u32 = 30; - let res = self.resolve_dns(query, reply, TTL); - futures::future::ready(res) - } - } - } - - mod pkarr_relay { - use std::net::{Ipv4Addr, SocketAddr}; - - use anyhow::Result; - use axum::{ - extract::{Path, State}, - response::IntoResponse, - routing::put, - Router, - }; - use bytes::Bytes; - use tokio::task::JoinHandle; - use tokio_util::sync::CancellationToken; - use tracing::warn; - use url::Url; - - use super::State as AppState; - - pub async fn run_pkarr_relay( - state: AppState, - cancel: CancellationToken, - ) -> Result<(Url, JoinHandle>)> { - let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); - let app = Router::new() - .route("/pkarr/:key", put(pkarr_put)) - .with_state(state); - let listener = tokio::net::TcpListener::bind(bind_addr).await?; - let bound_addr = listener.local_addr()?; - let url: Url = format!("http://{bound_addr}/pkarr") - .parse() - .expect("valid url"); - let join_handle = tokio::task::spawn(async move { - let serve = axum::serve(listener, app); - let serve = serve.with_graceful_shutdown(cancel.cancelled_owned()); - serve.await?; - Ok(()) - }); - Ok((url, join_handle)) - } - - async fn pkarr_put( - State(state): State, - Path(key): Path, - body: Bytes, - ) -> Result { - let key = pkarr::PublicKey::try_from(key.as_str())?; - let signed_packet = pkarr::SignedPacket::from_relay_response(key, body)?; - let _updated = state.upsert(signed_packet)?; - Ok(http::StatusCode::NO_CONTENT) - } - - #[derive(Debug)] - struct AppError(anyhow::Error); - impl> From for AppError { - fn from(value: T) -> Self { - Self(value.into()) - } - } - impl IntoResponse for AppError { - fn into_response(self) -> axum::response::Response { - warn!(err = ?self, "request failed"); - (http::StatusCode::INTERNAL_SERVER_ERROR, self.0.to_string()).into_response() - } - } - } } diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index f6a88a9343..f16f49acad 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -530,9 +530,8 @@ impl MagicEndpoint { /// Note: updating the magic socket's *netmap* will also prune any connections that are *not* /// present in the netmap. /// - /// If no UDP addresses are added, and `relay_url` is `None`, it will error. - /// If no UDP addresses are added, and the given `relay_url` cannot be dialed, it will error. - // TODO: This is infallible, stop returning a result. + /// # Errors + /// Will return an error if we attempt to add our own [`PublicKey`] to the node map. pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<()> { // Connecting to ourselves is not supported. if node_addr.node_id == self.node_id() { @@ -1095,4 +1094,38 @@ mod tests { res_ep1.await.unwrap().unwrap(); res_ep2.await.unwrap().unwrap(); } + + #[tokio::test] + async fn test_dial_via_node_id_only() -> Result<()> { + // set up pkarr dns server + // start two magic endpoints w/ discovery + // dial one + + // let _logging_guard = iroh_test::logging::setup(); + // let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap(); + + // pkarr dns server + // let (nameserver, pkarr_url, state, pkarr_task) = iroh::net::discovery::test::run_dns_and_pkarr_servers(); + + // let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42); + // let ep1_secret_key = SecretKey::generate_with_rng(&mut rng); + // let ep2_secret_key = SecretKey::generate_with_rng(&mut rng); + // let ep1 = MagicEndpoint::builder() + // .secret_key(ep1_secret_key) + // .insecure_skip_relay_cert_verify(true) + // .alpns(vec![TEST_ALPN.to_vec()]) + // .relay_mode(RelayMode::Custom(relay_map.clone())) + // .bind(0) + // .await + // .unwrap(); + // let ep2 = MagicEndpoint::builder() + // .secret_key(ep2_secret_key) + // .insecure_skip_relay_cert_verify(true) + // .alpns(vec![TEST_ALPN.to_vec()]) + // .relay_mode(RelayMode::Custom(relay_map)) + // .bind(0) + // .await + // .unwrap(); + todo!(); + } } diff --git a/iroh-net/src/magicsock/node_map/node_state.rs b/iroh-net/src/magicsock/node_map/node_state.rs index 1dd5f33784..b3aa0a7b0f 100644 --- a/iroh-net/src/magicsock/node_map/node_state.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -1392,7 +1392,7 @@ impl NodeInfo { .min() } - /// Returns `true` if we this info contains either a relay URL or at least one direct address. + /// Returns `true` if this info contains either a relay URL or at least one direct address. pub fn has_send_address(&self) -> bool { self.relay_url.is_some() || !self.addrs.is_empty() } diff --git a/iroh-net/src/test_utils.rs b/iroh-net/src/test_utils.rs index 652d81a09d..88ac5b11f9 100644 --- a/iroh-net/src/test_utils.rs +++ b/iroh-net/src/test_utils.rs @@ -4,8 +4,10 @@ use anyhow::Result; use tokio::sync::oneshot; use tracing::{error_span, info_span, Instrument}; -use crate::key::SecretKey; -use crate::relay::{RelayMap, RelayNode, RelayUrl}; +use crate::{ + key::SecretKey, + relay::{RelayMap, RelayNode, RelayUrl}, +}; /// A drop guard to clean up test infrastructure. /// @@ -63,6 +65,34 @@ pub async fn run_relay_server() -> Result<(RelayMap, RelayUrl, CleanupDropGuard) Ok((m, url, CleanupDropGuard(tx))) } +#[cfg(test)] +pub(crate) mod dns_and_pkarr_servers { + use anyhow::Result; + use std::net::SocketAddr; + use url::Url; + + use super::CleanupDropGuard; + + use crate::test_utils::{ + dns_server::run_dns_server, pkarr_dns_state::State, pkarr_relay::run_pkarr_relay, + }; + + pub async fn run_dns_and_pkarr_servers( + origin: impl ToString, + ) -> Result<(SocketAddr, Url, State, CleanupDropGuard, CleanupDropGuard)> { + let state = State::new(origin.to_string()); + let (nameserver, dns_drop_guard) = run_dns_server(state.clone()).await?; + let (pkarr_url, pkarr_drop_guard) = run_pkarr_relay(state.clone()).await?; + Ok(( + nameserver, + pkarr_url, + state, + dns_drop_guard, + pkarr_drop_guard, + )) + } +} + #[cfg(test)] pub(crate) mod dns_server { use std::net::{Ipv4Addr, SocketAddr}; @@ -74,9 +104,10 @@ pub(crate) mod dns_server { serialize::binary::BinDecodable, }; use hickory_resolver::{config::NameServerConfig, TokioAsyncResolver}; - use tokio::{net::UdpSocket, task::JoinHandle}; - use tokio_util::sync::CancellationToken; - use tracing::{debug, warn}; + use tokio::{net::UdpSocket, sync::oneshot}; + use tracing::{debug, error, warn}; + + use super::CleanupDropGuard; /// Trait used by [`run_dns_server`] for answering DNS queries. pub trait QueryHandler: Send + Sync + 'static { @@ -105,18 +136,25 @@ pub(crate) mod dns_server { /// Must pass a [`QueryHandler`] that answers queries. Can be a [`ResolveCallback`] or a struct. pub async fn run_dns_server( resolver: impl QueryHandler, - cancel: CancellationToken, - ) -> Result<(SocketAddr, JoinHandle>)> { + ) -> Result<(SocketAddr, CleanupDropGuard)> { let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); let socket = UdpSocket::bind(bind_addr).await?; let bound_addr = socket.local_addr()?; - let s = TestDnsServer { - socket, - cancel, - resolver, - }; - let join_handle = tokio::task::spawn(async move { s.run().await }); - Ok((bound_addr, join_handle)) + let s = TestDnsServer { socket, resolver }; + let (tx, mut rx) = oneshot::channel(); + tokio::task::spawn(async move { + tokio::select! { + _ = &mut rx => { + debug!("shutting down dns server"); + } + res = s.run() => { + if let Err(e) = res { + error!("error running dns server {e:?}"); + } + } + } + }); + Ok((bound_addr, CleanupDropGuard(tx))) } /// Create a DNS resolver with a single nameserver. @@ -132,24 +170,18 @@ pub(crate) mod dns_server { struct TestDnsServer { resolver: R, socket: UdpSocket, - cancel: CancellationToken, } impl TestDnsServer { async fn run(self) -> Result<()> { let mut buf = [0; 1450]; loop { - tokio::select! { - _ = self.cancel.cancelled() => break, - res = self.socket.recv_from(&mut buf) => { - let (len, from) = res?; - if let Err(err) = self.handle_datagram(from, &buf[..len]).await { - warn!(?err, %from, "failed to handle incoming datagram"); - } - } - }; + let res = self.socket.recv_from(&mut buf).await; + let (len, from) = res?; + if let Err(err) = self.handle_datagram(from, &buf[..len]).await { + warn!(?err, %from, "failed to handle incoming datagram"); + } } - Ok(()) } async fn handle_datagram(&self, from: SocketAddr, buf: &[u8]) -> Result<()> { @@ -166,3 +198,197 @@ pub(crate) mod dns_server { } } } + +#[cfg(test)] +pub(crate) mod pkarr_relay { + use std::future::IntoFuture; + use std::net::{Ipv4Addr, SocketAddr}; + + use anyhow::Result; + use axum::{ + extract::{Path, State}, + response::IntoResponse, + routing::put, + Router, + }; + use bytes::Bytes; + use tokio::sync::oneshot; + use tracing::{debug, error, warn}; + use url::Url; + + use crate::test_utils::pkarr_dns_state::State as AppState; + + use super::CleanupDropGuard; + + pub async fn run_pkarr_relay(state: AppState) -> Result<(Url, CleanupDropGuard)> { + let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + let app = Router::new() + .route("/pkarr/:key", put(pkarr_put)) + .with_state(state); + let listener = tokio::net::TcpListener::bind(bind_addr).await?; + let bound_addr = listener.local_addr()?; + let url: Url = format!("http://{bound_addr}/pkarr") + .parse() + .expect("valid url"); + + let (tx, mut rx) = oneshot::channel(); + tokio::spawn(async move { + let serve = axum::serve(listener, app); + tokio::select! { + _ = &mut rx => { + debug!("shutting down pkarr server"); + } + res = serve.into_future() => { + if let Err(e) = res { + error!("pkarr server error: {e:?}"); + } + } + } + }); + Ok((url, CleanupDropGuard(tx))) + } + + async fn pkarr_put( + State(state): State, + Path(key): Path, + body: Bytes, + ) -> Result { + let key = pkarr::PublicKey::try_from(key.as_str())?; + let signed_packet = pkarr::SignedPacket::from_relay_response(key, body)?; + let _updated = state.upsert(signed_packet)?; + Ok(http::StatusCode::NO_CONTENT) + } + + #[derive(Debug)] + struct AppError(anyhow::Error); + impl> From for AppError { + fn from(value: T) -> Self { + Self(value.into()) + } + } + impl IntoResponse for AppError { + fn into_response(self) -> axum::response::Response { + warn!(err = ?self, "request failed"); + (http::StatusCode::INTERNAL_SERVER_ERROR, self.0.to_string()).into_response() + } + } +} + +#[cfg(test)] +pub(crate) mod pkarr_dns_state { + use anyhow::{bail, Result}; + use parking_lot::{Mutex, MutexGuard}; + use pkarr::SignedPacket; + use std::{ + collections::{hash_map, HashMap}, + future::Future, + ops::Deref, + sync::Arc, + time::Duration, + }; + + use crate::dns::node_info::{node_id_from_hickory_name, NodeInfo}; + use crate::test_utils::dns_server::QueryHandler; + use crate::NodeId; + + #[derive(Debug, Clone)] + pub struct State { + packets: Arc>>, + origin: String, + notify: Arc, + } + + impl State { + pub fn new(origin: String) -> Self { + Self { + packets: Default::default(), + origin, + notify: Arc::new(tokio::sync::Notify::new()), + } + } + + pub fn on_update(&self) -> tokio::sync::futures::Notified<'_> { + self.notify.notified() + } + + pub async fn on_node(&self, node: &NodeId, timeout: Duration) -> Result<()> { + let timeout = tokio::time::sleep(timeout); + tokio::pin!(timeout); + while self.get(node).is_none() { + tokio::select! { + _ = &mut timeout => bail!("timeout"), + _ = self.on_update() => {} + } + } + Ok(()) + } + + pub fn upsert(&self, signed_packet: SignedPacket) -> anyhow::Result { + let node_id = NodeId::from_bytes(&signed_packet.public_key().to_bytes())?; + let mut map = self.packets.lock(); + let updated = match map.entry(node_id) { + hash_map::Entry::Vacant(e) => { + e.insert(signed_packet); + true + } + hash_map::Entry::Occupied(mut e) => { + if signed_packet.more_recent_than(e.get()) { + e.insert(signed_packet); + true + } else { + false + } + } + }; + if updated { + self.notify.notify_waiters(); + } + Ok(updated) + } + + /// Returns a mutex guard, do not hold over await points + pub fn get(&self, node_id: &NodeId) -> Option + '_> { + let map = self.packets.lock(); + if map.contains_key(node_id) { + let guard = MutexGuard::map(map, |state| state.get_mut(node_id).unwrap()); + Some(guard) + } else { + None + } + } + + pub fn resolve_dns( + &self, + query: &hickory_proto::op::Message, + reply: &mut hickory_proto::op::Message, + ttl: u32, + ) -> Result<()> { + for query in query.queries() { + let Some(node_id) = node_id_from_hickory_name(query.name()) else { + continue; + }; + let packet = self.get(&node_id); + let Some(packet) = packet.as_ref() else { + continue; + }; + let node_info = NodeInfo::from_pkarr_signed_packet(packet)?; + for record in node_info.to_hickory_records(&self.origin, ttl)? { + reply.add_answer(record); + } + } + Ok(()) + } + } + + impl QueryHandler for State { + fn resolve( + &self, + query: &hickory_proto::op::Message, + reply: &mut hickory_proto::op::Message, + ) -> impl Future> + Send { + const TTL: u32 = 30; + let res = self.resolve_dns(query, reply, TTL); + futures::future::ready(res) + } + } +}