Skip to content

Commit

Permalink
tests(iroh-gossip): fix net smoke test (#2314)
Browse files Browse the repository at this point in the history
## Description

* Use shared test setup code from `iroh_net`
* Fix stupid typo

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
Frando authored May 21, 2024
1 parent 370075c commit 8ad6ff1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 131 deletions.
2 changes: 1 addition & 1 deletion iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ iroh-base = { version = "0.16.0", path = "../iroh-base" }

# net dependencies (optional)
futures-lite = { version = "2.3", optional = true }
iroh-net = { path = "../iroh-net", version = "0.16.0", optional = true, default-features = false }
iroh-net = { path = "../iroh-net", version = "0.16.0", optional = true, default-features = false, features = ["test-utils"] }
tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] }
tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }
Expand Down
146 changes: 16 additions & 130 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ fn decode_peer_data(peer_data: &PeerData) -> anyhow::Result<AddrInfo> {
mod test {
use std::time::Duration;

use iroh_net::key::SecretKey;
use iroh_net::relay::{RelayMap, RelayMode};
use tokio::spawn;
use tokio::time::timeout;
Expand All @@ -661,10 +662,15 @@ mod test {

use super::*;

async fn create_endpoint(relay_map: RelayMap) -> anyhow::Result<Endpoint> {
async fn create_endpoint(
rng: &mut rand_chacha::ChaCha12Rng,
relay_map: RelayMap,
) -> anyhow::Result<Endpoint> {
Endpoint::builder()
.secret_key(SecretKey::generate_with_rng(rng))
.alpns(vec![GOSSIP_ALPN.to_vec()])
.relay_mode(RelayMode::Custom(relay_map))
.insecure_skip_relay_cert_verify(true)
.bind(0)
.await
}
Expand All @@ -688,16 +694,15 @@ mod test {
}

#[tokio::test]
#[ignore = "flaky"]
async fn gossip_net_smoke() {
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
let _guard = iroh_test::logging::setup();
let (relay_map, relay_url, cleanup) = util::run_relay_and_stun([127, 0, 0, 1].into())
.await
.unwrap();
let (relay_map, relay_url, _guard) =
iroh_net::test_utils::run_relay_server().await.unwrap();

let ep1 = create_endpoint(relay_map.clone()).await.unwrap();
let ep2 = create_endpoint(relay_map.clone()).await.unwrap();
let ep3 = create_endpoint(relay_map.clone()).await.unwrap();
let ep1 = create_endpoint(&mut rng, relay_map.clone()).await.unwrap();
let ep2 = create_endpoint(&mut rng, relay_map.clone()).await.unwrap();
let ep3 = create_endpoint(&mut rng, relay_map.clone()).await.unwrap();
let addr1 = AddrInfo {
relay_url: Some(relay_url.clone()),
direct_addresses: Default::default(),
Expand All @@ -722,8 +727,8 @@ mod test {
let cancel = CancellationToken::new();
let tasks = [
spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
spawn(endpoint_loop(ep2.clone(), go3.clone(), cancel.clone())),
spawn(endpoint_loop(ep3.clone(), go2.clone(), cancel.clone())),
spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
];

debug!("----- adding peers ----- ");
Expand All @@ -739,7 +744,7 @@ mod test {
go2.join(topic, vec![pi1]).await.unwrap().await.unwrap();
go3.join(topic, vec![pi1]).await.unwrap().await.unwrap();

let len = 10;
let len = 2;

// subscribe nodes 2 and 3 to the topic
let mut stream2 = go2.subscribe(topic).await.unwrap();
Expand Down Expand Up @@ -814,124 +819,5 @@ mod test {
.unwrap()
.unwrap();
}
drop(cleanup);
}

// This is copied from iroh-net/src/hp/magicsock/conn.rs
// TODO: Move into a public test_utils module in iroh-net?
mod util {
use std::net::{IpAddr, SocketAddr};

use anyhow::Result;
use iroh_net::{
key::SecretKey,
relay::{RelayMap, RelayUrl},
stun::{is, parse_binding_request, response},
};
use tokio::sync::oneshot;
use tracing::{debug, info, trace};

/// A drop guard to clean up test infrastructure.
///
/// After dropping the test infrastructure will asynchronously shutdown and release its
/// resources.
// Nightly sees the sender as dead code currently, but we only rely on Drop of the
// sender.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct CleanupDropGuard(pub(crate) oneshot::Sender<()>);

/// Runs a relay server with STUN enabled suitable for tests.
///
/// The returned `Url` is the url of the relay server in the returned [`RelayMap`], it
/// is always `Some` as that is how the [`Endpoint::connect`] API expects it.
///
/// [`Endpoint::connect`]: crate::endpoint::Endpoint
pub(crate) async fn run_relay_and_stun(
stun_ip: IpAddr,
) -> Result<(RelayMap, RelayUrl, CleanupDropGuard)> {
let server_key = SecretKey::generate();
let server = iroh_net::relay::http::ServerBuilder::new("127.0.0.1:0".parse().unwrap())
.secret_key(Some(server_key))
.tls_config(None)
.spawn()
.await?;

let http_addr = server.addr();
info!("relay listening on {:?}", http_addr);

let (stun_addr, stun_drop_guard) = serve(stun_ip).await?;
let relay_url: RelayUrl = format!("http://localhost:{}", http_addr.port())
.parse()
.unwrap();
let m = RelayMap::default_from_node(relay_url.clone(), stun_addr.port());

let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let _stun_cleanup = stun_drop_guard; // move into this closure

// Wait until we're dropped or receive a message.
rx.await.ok();
server.shutdown().await;
});

Ok((m, relay_url, CleanupDropGuard(tx)))
}

/// Sets up a simple STUN server.
async fn serve(ip: IpAddr) -> Result<(SocketAddr, CleanupDropGuard)> {
let pc = tokio::net::UdpSocket::bind((ip, 0)).await?;
let mut addr = pc.local_addr()?;
match addr.ip() {
IpAddr::V4(ip) => {
if ip.octets() == [0, 0, 0, 0] {
addr.set_ip("127.0.0.1".parse().unwrap());
}
}
_ => unreachable!("using ipv4"),
}

info!("STUN listening on {}", addr);
let (s, r) = oneshot::channel();
tokio::task::spawn(async move {
run_stun(pc, r).await;
});

Ok((addr, CleanupDropGuard(s)))
}

async fn run_stun(pc: tokio::net::UdpSocket, mut done: oneshot::Receiver<()>) {
let mut buf = vec![0u8; 64 << 10];
loop {
trace!("read loop");
tokio::select! {
_ = &mut done => {
debug!("shutting down");
break;
}
res = pc.recv_from(&mut buf) => match res {
Ok((n, addr)) => {
trace!("read packet {}bytes from {}", n, addr);
let pkt = &buf[..n];
if !is(pkt) {
debug!("received non STUN pkt");
continue;
}
if let Ok(txid) = parse_binding_request(pkt) {
debug!("received binding request");

let res = response(txid, addr);
if let Err(err) = pc.send_to(&res, addr).await {
eprintln!("STUN server write failed: {:?}", err);
}
}
}
Err(err) => {
eprintln!("failed to read: {:?}", err);
}
}
}
}
}
}
}

0 comments on commit 8ad6ff1

Please sign in to comment.