Skip to content

Commit

Permalink
move run_dns_and_pkarr_servers to iroh::net::test_utils
Browse files Browse the repository at this point in the history
to properly test that we have fixed the discovery bug, we need to run the test dns and pkarr relay servers, so they should be moved to `test_utils` so the code can be shared
  • Loading branch information
ramfox committed Apr 25, 2024
1 parent 8a35dd4 commit 8b483b8
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 247 deletions.
229 changes: 11 additions & 218 deletions iroh-net/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl DiscoveryTask {
};
match next {
Some(Ok(r)) => {
if r.addr_info.is_empty() {
debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: empty address found");
continue;
}
debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: new address found");
let addr = NodeAddr {
info: r.addr_info,
Expand Down Expand Up @@ -551,31 +555,28 @@ mod test_dns_pkarr {

use anyhow::Result;
use iroh_base::key::SecretKey;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use url::Url;

use crate::{
discovery::{dns::DnsDiscovery, pkarr_publish::PkarrPublisher, ConcurrentDiscovery},
dns::node_info::{lookup_by_id, NodeInfo},
relay::{RelayMap, RelayMode},
test_utils::{
dns_and_pkarr_servers::run_dns_and_pkarr_servers,
dns_server::{create_dns_resolver, run_dns_server},
pkarr_dns_state::State,
run_relay_server,
},
AddrInfo, MagicEndpoint, NodeAddr,
};

use self::{pkarr_relay::run_pkarr_relay, state::State};

#[tokio::test]
async fn dns_resolve() -> Result<()> {
let _logging_guard = iroh_test::logging::setup();

let cancel = CancellationToken::new();
let origin = "testdns.example".to_string();
let state = State::new(origin.clone());
let (nameserver, dns_task) = run_dns_server(state.clone(), cancel.clone()).await?;
let (nameserver, _dns_drop_guard) = run_dns_server(state.clone()).await?;

let secret_key = SecretKey::generate();
let node_info = NodeInfo::new(
Expand All @@ -590,8 +591,6 @@ mod test_dns_pkarr {

assert_eq!(resolved, node_info.into());

cancel.cancel();
dns_task.await??;
Ok(())
}

Expand All @@ -600,11 +599,10 @@ mod test_dns_pkarr {
let _logging_guard = iroh_test::logging::setup();

let origin = "testdns.example".to_string();
let cancel = CancellationToken::new();
let timeout = Duration::from_secs(2);

let (nameserver, pkarr_url, state, task) =
run_dns_and_pkarr_servers(origin.clone(), cancel.clone()).await?;
let (nameserver, pkarr_url, state, _dns_drop_guard, _pkarr_drop_guard) =
run_dns_and_pkarr_servers(origin.clone()).await?;

let secret_key = SecretKey::generate();
let node_id = secret_key.public();
Expand All @@ -628,9 +626,6 @@ mod test_dns_pkarr {
};

assert_eq!(resolved, expected);

cancel.cancel();
task.await??;
Ok(())
}

Expand All @@ -641,11 +636,10 @@ mod test_dns_pkarr {
let _logging_guard = iroh_test::logging::setup();

let origin = "testdns.example".to_string();
let cancel = CancellationToken::new();
let timeout = Duration::from_secs(2);

let (nameserver, pkarr_url, state, task) =
run_dns_and_pkarr_servers(&origin, cancel.clone()).await?;
let (nameserver, pkarr_url, state, _dns_drop_guard, _pkarr_drop_guard) =
run_dns_and_pkarr_servers(&origin).await?;
let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;

let ep1 = ep_with_discovery(relay_map.clone(), nameserver, &origin, &pkarr_url).await?;
Expand All @@ -657,8 +651,6 @@ mod test_dns_pkarr {
// we connect only by node id!
let res = ep2.connect(ep1.node_id().into(), TEST_ALPN).await;
assert!(res.is_ok(), "connection established");
cancel.cancel();
task.await??;
Ok(())
}

Expand All @@ -685,203 +677,4 @@ mod test_dns_pkarr {
.await?;
Ok(ep)
}

async fn run_dns_and_pkarr_servers(
origin: impl ToString,
cancel: CancellationToken,
) -> Result<(SocketAddr, Url, State, JoinHandle<Result<()>>)> {
let state = State::new(origin.to_string());
let (nameserver, dns_task) = run_dns_server(state.clone(), cancel.clone()).await?;
let (pkarr_url, pkarr_task) = run_pkarr_relay(state.clone(), cancel.clone()).await?;
let join_handle = tokio::task::spawn(async move {
dns_task.await??;
pkarr_task.await??;
Ok(())
});
Ok((nameserver, pkarr_url, state, join_handle))
}

mod state {
use anyhow::{bail, Result};
use parking_lot::{Mutex, MutexGuard};
use pkarr::SignedPacket;
use std::{
collections::{hash_map, HashMap},
future::Future,
ops::Deref,
sync::Arc,
time::Duration,
};

use crate::dns::node_info::{node_id_from_hickory_name, NodeInfo};
use crate::test_utils::dns_server::QueryHandler;
use crate::NodeId;

#[derive(Debug, Clone)]
pub struct State {
packets: Arc<Mutex<HashMap<NodeId, SignedPacket>>>,
origin: String,
notify: Arc<tokio::sync::Notify>,
}

impl State {
pub fn new(origin: String) -> Self {
Self {
packets: Default::default(),
origin,
notify: Arc::new(tokio::sync::Notify::new()),
}
}

pub fn on_update(&self) -> tokio::sync::futures::Notified<'_> {
self.notify.notified()
}

pub async fn on_node(&self, node: &NodeId, timeout: Duration) -> Result<()> {
let timeout = tokio::time::sleep(timeout);
tokio::pin!(timeout);
while self.get(node).is_none() {
tokio::select! {
_ = &mut timeout => bail!("timeout"),
_ = self.on_update() => {}
}
}
Ok(())
}

pub fn upsert(&self, signed_packet: SignedPacket) -> anyhow::Result<bool> {
let node_id = NodeId::from_bytes(&signed_packet.public_key().to_bytes())?;
let mut map = self.packets.lock();
let updated = match map.entry(node_id) {
hash_map::Entry::Vacant(e) => {
e.insert(signed_packet);
true
}
hash_map::Entry::Occupied(mut e) => {
if signed_packet.more_recent_than(e.get()) {
e.insert(signed_packet);
true
} else {
false
}
}
};
if updated {
self.notify.notify_waiters();
}
Ok(updated)
}

/// Returns a mutex guard, do not hold over await points
pub fn get(&self, node_id: &NodeId) -> Option<impl Deref<Target = SignedPacket> + '_> {
let map = self.packets.lock();
if map.contains_key(node_id) {
let guard = MutexGuard::map(map, |state| state.get_mut(node_id).unwrap());
Some(guard)
} else {
None
}
}

pub fn resolve_dns(
&self,
query: &hickory_proto::op::Message,
reply: &mut hickory_proto::op::Message,
ttl: u32,
) -> Result<()> {
for query in query.queries() {
let Some(node_id) = node_id_from_hickory_name(query.name()) else {
continue;
};
let packet = self.get(&node_id);
let Some(packet) = packet.as_ref() else {
continue;
};
let node_info = NodeInfo::from_pkarr_signed_packet(packet)?;
for record in node_info.to_hickory_records(&self.origin, ttl)? {
reply.add_answer(record);
}
}
Ok(())
}
}

impl QueryHandler for State {
fn resolve(
&self,
query: &hickory_proto::op::Message,
reply: &mut hickory_proto::op::Message,
) -> impl Future<Output = Result<()>> + Send {
const TTL: u32 = 30;
let res = self.resolve_dns(query, reply, TTL);
futures::future::ready(res)
}
}
}

mod pkarr_relay {
use std::net::{Ipv4Addr, SocketAddr};

use anyhow::Result;
use axum::{
extract::{Path, State},
response::IntoResponse,
routing::put,
Router,
};
use bytes::Bytes;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use url::Url;

use super::State as AppState;

pub async fn run_pkarr_relay(
state: AppState,
cancel: CancellationToken,
) -> Result<(Url, JoinHandle<Result<()>>)> {
let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
let app = Router::new()
.route("/pkarr/:key", put(pkarr_put))
.with_state(state);
let listener = tokio::net::TcpListener::bind(bind_addr).await?;
let bound_addr = listener.local_addr()?;
let url: Url = format!("http://{bound_addr}/pkarr")
.parse()
.expect("valid url");
let join_handle = tokio::task::spawn(async move {
let serve = axum::serve(listener, app);
let serve = serve.with_graceful_shutdown(cancel.cancelled_owned());
serve.await?;
Ok(())
});
Ok((url, join_handle))
}

async fn pkarr_put(
State(state): State<AppState>,
Path(key): Path<String>,
body: Bytes,
) -> Result<impl IntoResponse, AppError> {
let key = pkarr::PublicKey::try_from(key.as_str())?;
let signed_packet = pkarr::SignedPacket::from_relay_response(key, body)?;
let _updated = state.upsert(signed_packet)?;
Ok(http::StatusCode::NO_CONTENT)
}

#[derive(Debug)]
struct AppError(anyhow::Error);
impl<T: Into<anyhow::Error>> From<T> for AppError {
fn from(value: T) -> Self {
Self(value.into())
}
}
impl IntoResponse for AppError {
fn into_response(self) -> axum::response::Response {
warn!(err = ?self, "request failed");
(http::StatusCode::INTERNAL_SERVER_ERROR, self.0.to_string()).into_response()
}
}
}
}
39 changes: 36 additions & 3 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,8 @@ impl MagicEndpoint {
/// Note: updating the magic socket's *netmap* will also prune any connections that are *not*
/// present in the netmap.
///
/// If no UDP addresses are added, and `relay_url` is `None`, it will error.
/// If no UDP addresses are added, and the given `relay_url` cannot be dialed, it will error.
// TODO: This is infallible, stop returning a result.
/// # Errors
/// Will return an error if we attempt to add our own [`PublicKey`] to the node map.
pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<()> {
// Connecting to ourselves is not supported.
if node_addr.node_id == self.node_id() {
Expand Down Expand Up @@ -1095,4 +1094,38 @@ mod tests {
res_ep1.await.unwrap().unwrap();
res_ep2.await.unwrap().unwrap();
}

#[tokio::test]
async fn test_dial_via_node_id_only() -> Result<()> {
// set up pkarr dns server
// start two magic endpoints w/ discovery
// dial one

// let _logging_guard = iroh_test::logging::setup();
// let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();

// pkarr dns server
// let (nameserver, pkarr_url, state, pkarr_task) = iroh::net::discovery::test::run_dns_and_pkarr_servers();

// let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
// let ep1_secret_key = SecretKey::generate_with_rng(&mut rng);
// let ep2_secret_key = SecretKey::generate_with_rng(&mut rng);
// let ep1 = MagicEndpoint::builder()
// .secret_key(ep1_secret_key)
// .insecure_skip_relay_cert_verify(true)
// .alpns(vec![TEST_ALPN.to_vec()])
// .relay_mode(RelayMode::Custom(relay_map.clone()))
// .bind(0)
// .await
// .unwrap();
// let ep2 = MagicEndpoint::builder()
// .secret_key(ep2_secret_key)
// .insecure_skip_relay_cert_verify(true)
// .alpns(vec![TEST_ALPN.to_vec()])
// .relay_mode(RelayMode::Custom(relay_map))
// .bind(0)
// .await
// .unwrap();
todo!();
}
}
2 changes: 1 addition & 1 deletion iroh-net/src/magicsock/node_map/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ impl NodeInfo {
.min()
}

/// Returns `true` if we this info contains either a relay URL or at least one direct address.
/// Returns `true` if this info contains either a relay URL or at least one direct address.
pub fn has_send_address(&self) -> bool {
self.relay_url.is_some() || !self.addrs.is_empty()
}
Expand Down
Loading

0 comments on commit 8b483b8

Please sign in to comment.