From 295414b9279c20afcdda490164c28d87a430a11f Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sat, 8 Jun 2024 13:33:31 +0100 Subject: [PATCH] Add tests, some small fixes and cleanup --- iroh-cli/src/commands/doctor.rs | 2 +- iroh-net/src/bin/iroh-relay.rs | 762 +----------------------------- iroh-net/src/netcheck.rs | 19 +- iroh-net/src/relay/http/server.rs | 2 +- iroh-net/src/relay/iroh_relay.rs | 392 +++++++++------ iroh-net/src/stun.rs | 21 +- 6 files changed, 275 insertions(+), 923 deletions(-) diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 7c7fa374f6..1383f79447 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -93,7 +93,7 @@ pub enum Commands { #[clap(long)] stun_host: Option, /// The port of the STUN server. - #[clap(long, default_value_t = DEFAULT_RELAY_STUN_PORT)] + #[clap(long, default_value_t = DEFAULT_STUN_PORT)] stun_port: u16, }, /// Wait for incoming requests from iroh doctor connect diff --git a/iroh-net/src/bin/iroh-relay.rs b/iroh-net/src/bin/iroh-relay.rs index b0c5cd9696..f9a79d9f8e 100644 --- a/iroh-net/src/bin/iroh-relay.rs +++ b/iroh-net/src/bin/iroh-relay.rs @@ -2,57 +2,27 @@ //! //! Based on /tailscale/cmd/derper. -use std::{ - borrow::Cow, - fmt, - future::Future, - net::{IpAddr, Ipv6Addr, SocketAddr}, - path::{Path, PathBuf}, - pin::Pin, - sync::Arc, -}; +use std::borrow::Cow; +use std::net::{Ipv6Addr, SocketAddr}; +use std::path::{Path, PathBuf}; use anyhow::{anyhow, bail, Context as _, Result}; use clap::Parser; -use futures_lite::StreamExt; -use http::{response::Builder as ResponseBuilder, HeaderMap}; -use hyper::body::Incoming; -use hyper::{Method, Request, Response, StatusCode}; -use iroh_metrics::inc; use iroh_net::defaults::{ DEFAULT_HTTPS_PORT, DEFAULT_HTTP_PORT, DEFAULT_METRICS_PORT, DEFAULT_STUN_PORT, NA_RELAY_HOSTNAME, }; use iroh_net::key::SecretKey; -use iroh_net::relay::http::{ - ServerBuilder as RelayServerBuilder, TlsAcceptor, TlsConfig as RelayTlsConfig, -}; -use iroh_net::relay::{self, iroh_relay}; -use iroh_net::stun; +use iroh_net::relay::iroh_relay; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use tokio::{ - net::{TcpListener, UdpSocket}, - task::JoinHandle, -}; use tokio_rustls_acme::{caches::DirCache, AcmeConfig}; -use tracing::{debug, debug_span, error, info, info_span, trace, warn, Instrument}; +use tracing::{debug, info}; use tracing_subscriber::{prelude::*, EnvFilter}; -use metrics::StunMetrics; - -type BytesBody = http_body_util::Full; -type HyperError = Box; -type HyperResult = std::result::Result; - /// The default `http_bind_port` when using `--dev`. const DEV_MODE_HTTP_PORT: u16 = 3340; -/// Creates a new [`BytesBody`] with no content. -fn body_empty() -> BytesBody { - http_body_util::Full::new(hyper::body::Bytes::new()) -} - /// A relay server for iroh-net. #[derive(Parser, Debug, Clone)] #[clap(version, about, long_about = None)] @@ -78,67 +48,6 @@ enum CertMode { LetsEncrypt, } -impl CertMode { - async fn gen_server_config( - &self, - hostname: String, - contact: String, - is_production: bool, - dir: PathBuf, - ) -> Result<(Arc, TlsAcceptor)> { - let config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth(); - - match self { - CertMode::LetsEncrypt => { - let mut state = AcmeConfig::new(vec![hostname]) - .contact([format!("mailto:{contact}")]) - .cache_option(Some(DirCache::new(dir))) - .directory_lets_encrypt(is_production) - .state(); - - let config = config.with_cert_resolver(state.resolver()); - let acceptor = state.acceptor(); - - tokio::spawn( - async move { - while let Some(event) = state.next().await { - match event { - Ok(ok) => debug!("acme event: {:?}", ok), - Err(err) => error!("error: {:?}", err), - } - } - debug!("event stream finished"); - } - .instrument(info_span!("acme")), - ); - - Ok((Arc::new(config), TlsAcceptor::LetsEncrypt(acceptor))) - } - CertMode::Manual => { - // load certificates manually - let keyname = escape_hostname(&hostname); - let cert_path = dir.join(format!("{keyname}.crt")); - let key_path = dir.join(format!("{keyname}.key")); - - let (certs, secret_key) = tokio::task::spawn_blocking(move || { - let certs = load_certs(cert_path)?; - let key = load_secret_key(key_path)?; - anyhow::Ok((certs, key)) - }) - .await??; - - let config = config.with_single_cert(certs, secret_key)?; - let config = Arc::new(config); - let acceptor = tokio_rustls::TlsAcceptor::from(config.clone()); - - Ok((config, TlsAcceptor::Manual(acceptor))) - } - } - } -} - fn escape_hostname(hostname: &str) -> Cow<'_, str> { let unsafe_hostname_characters = regex::Regex::new(r"[^a-zA-Z0-9-\.]").expect("regex manually checked"); @@ -544,455 +453,6 @@ async fn build_relay_config(cfg: Config) -> Result>, -// ) -> Result<()> { -// let (addr, tls_config) = if dev_mode { -// let port = if cfg.addr.port() != 443 { -// cfg.addr.port() -// } else { -// DEV_PORT -// }; - -// let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port); -// info!(%addr, "Running in dev mode."); -// (addr, None) -// } else { -// (cfg.addr, cfg.tls) -// }; - -// if let Some(tls_config) = &tls_config { -// if let Some(captive_portal_port) = tls_config.captive_portal_port { -// if addr.port() == captive_portal_port { -// bail!("The main listening address {addr:?} and the `captive_portal_port` have the same port number."); -// } -// } -// } else if addr.port() == 443 { -// // no tls config, but the port is 443 -// warn!("The address port is 443, which is typically the expected tls port, but you have not supplied any tls configuration.\nIf you meant to run the relay server with tls enabled, adjust the config file to include tls configuration."); -// } - -// // set up relay configuration details -// let secret_key = if cfg.enable_relay { -// Some(cfg.secret_key) -// } else { -// None -// }; - -// // run stun -// let stun_task = if cfg.enable_stun { -// Some(tokio::task::spawn(async move { -// serve_stun(addr.ip(), cfg.stun_port).await -// })) -// } else { -// None -// }; - -// // set up tls configuration details -// let (tls_config, headers, captive_portal_port) = if let Some(tls_config) = tls_config { -// let contact = tls_config.contact; -// let is_production = tls_config.prod_tls; -// let (config, acceptor) = tls_config -// .cert_mode -// .gen_server_config( -// cfg.hostname.clone(), -// contact, -// is_production, -// tls_config.cert_dir.unwrap_or_else(|| PathBuf::from(".")), -// ) -// .await?; -// let mut headers = HeaderMap::new(); -// for (name, value) in TLS_HEADERS.iter() { -// headers.insert(*name, value.parse()?); -// } -// ( -// Some(RelayTlsConfig { config, acceptor }), -// headers, -// tls_config -// .captive_portal_port -// .unwrap_or(DEFAULT_CAPTIVE_PORTAL_PORT), -// ) -// } else { -// (None, HeaderMap::new(), 0) -// }; - -// let mut builder = RelayServerBuilder::new(addr) -// .secret_key(secret_key.map(Into::into)) -// .headers(headers) -// .tls_config(tls_config.clone()) -// .relay_override(Box::new(relay_disabled_handler)) -// .request_handler(Method::GET, "/", Box::new(root_handler)) -// .request_handler(Method::GET, "/index.html", Box::new(root_handler)) -// .request_handler(Method::GET, "/derp/probe", Box::new(probe_handler)) -// .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler)); -// // if tls is enabled, we need to serve this endpoint from a non-tls connection -// // which we check for below -// if tls_config.is_none() { -// builder = builder.request_handler( -// Method::GET, -// "/generate_204", -// Box::new(serve_no_content_handler), -// ); -// } -// let relay_server = builder.spawn().await?; - -// // captive portal detections must be served over HTTP -// let captive_portal_task = if tls_config.is_some() { -// let http_addr = SocketAddr::new(addr.ip(), captive_portal_port); -// let task = serve_captive_portal_service(http_addr).await?; -// Some(task) -// } else { -// None -// }; - -// if let Some(addr_sender) = addr_sender { -// if let Err(e) = addr_sender.send(relay_server.addr()) { -// bail!("Unable to send the local SocketAddr, the Sender was dropped - {e:?}"); -// } -// } - -// tokio::signal::ctrl_c().await?; -// // Shutdown all tasks -// if let Some(task) = stun_task { -// task.abort(); -// } -// if let Some(task) = captive_portal_task { -// task.abort() -// } -// relay_server.shutdown().await; - -// Ok(()) -// } - -// const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Tailscale-Challenge"; -// const NO_CONTENT_RESPONSE_HEADER: &str = "X-Tailscale-Response"; - -// const NOTFOUND: &[u8] = b"Not Found"; -// const RELAY_DISABLED: &[u8] = b"relay server disabled"; -// const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n"; -// const INDEX: &[u8] = br#" -//

RELAY

-//

-// This is an -// Iroh Relay -// server. -//

-// "#; - -// const TLS_HEADERS: [(&str, &str); 2] = [ -// ("Strict-Transport-Security", "max-age=63072000; includeSubDomains"), -// ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'") -// ]; - -// async fn serve_captive_portal_service(addr: SocketAddr) -> Result> { -// let http_listener = TcpListener::bind(&addr) -// .await -// .context("failed to bind http")?; -// let http_addr = http_listener.local_addr()?; -// info!("[CaptivePortalService]: serving on {}", http_addr); - -// let task = tokio::spawn( -// async move { -// loop { -// match http_listener.accept().await { -// Ok((stream, peer_addr)) => { -// debug!( -// "[CaptivePortalService] Connection opened from {}", -// peer_addr -// ); -// let handler = CaptivePortalService; - -// tokio::task::spawn(async move { -// let stream = relay::MaybeTlsStreamServer::Plain(stream); -// let stream = hyper_util::rt::TokioIo::new(stream); -// if let Err(err) = hyper::server::conn::http1::Builder::new() -// .serve_connection(stream, handler) -// .with_upgrades() -// .await -// { -// error!( -// "[CaptivePortalService] Failed to serve connection: {:?}", -// err -// ); -// } -// }); -// } -// Err(err) => { -// error!( -// "[CaptivePortalService] failed to accept connection: {:#?}", -// err -// ); -// } -// } -// } -// } -// .instrument(info_span!("captive-portal.service")), -// ); -// Ok(task) -// } - -// #[derive(Clone)] -// struct CaptivePortalService; - -// impl hyper::service::Service> for CaptivePortalService { -// type Response = Response; -// type Error = HyperError; -// type Future = Pin> + Send>>; - -// fn call(&self, req: Request) -> Self::Future { -// match (req.method(), req.uri().path()) { -// // Captive Portal checker -// (&Method::GET, "/generate_204") => { -// Box::pin(async move { serve_no_content_handler(req, Response::builder()) }) -// } -// _ => { -// // Return 404 not found response. -// let r = Response::builder() -// .status(StatusCode::NOT_FOUND) -// .body(NOTFOUND.into()) -// .map_err(|err| Box::new(err) as HyperError); -// Box::pin(async move { r }) -// } -// } -// } -// } - -// fn relay_disabled_handler( -// _r: Request, -// response: ResponseBuilder, -// ) -> HyperResult> { -// response -// .status(StatusCode::NOT_FOUND) -// .body(RELAY_DISABLED.into()) -// .map_err(|err| Box::new(err) as HyperError) -// } - -// fn root_handler( -// _r: Request, -// response: ResponseBuilder, -// ) -> HyperResult> { -// response -// .status(StatusCode::OK) -// .header("Content-Type", "text/html; charset=utf-8") -// .body(INDEX.into()) -// .map_err(|err| Box::new(err) as HyperError) -// } - -// /// HTTP latency queries -// fn probe_handler( -// _r: Request, -// response: ResponseBuilder, -// ) -> HyperResult> { -// response -// .status(StatusCode::OK) -// .header("Access-Control-Allow-Origin", "*") -// .body(body_empty()) -// .map_err(|err| Box::new(err) as HyperError) -// } - -// fn robots_handler( -// _r: Request, -// response: ResponseBuilder, -// ) -> HyperResult> { -// response -// .status(StatusCode::OK) -// .body(ROBOTS_TXT.into()) -// .map_err(|err| Box::new(err) as HyperError) -// } - -// /// For captive portal detection. -// fn serve_no_content_handler( -// r: Request, -// mut response: ResponseBuilder, -// ) -> HyperResult> { -// if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) { -// if !challenge.is_empty() -// && challenge.len() < 64 -// && challenge -// .as_bytes() -// .iter() -// .all(|c| is_challenge_char(*c as char)) -// { -// response = response.header( -// NO_CONTENT_RESPONSE_HEADER, -// format!("response {}", challenge.to_str()?), -// ); -// } -// } - -// response -// .status(StatusCode::NO_CONTENT) -// .body(body_empty()) -// .map_err(|err| Box::new(err) as HyperError) -// } - -// fn is_challenge_char(c: char) -> bool { -// // Semi-randomly chosen as a limited set of valid characters -// c.is_ascii_lowercase() -// || c.is_ascii_uppercase() -// || c.is_ascii_digit() -// || c == '.' -// || c == '-' -// || c == '_' -// } - -// async fn serve_stun(host: IpAddr, port: u16) { -// match UdpSocket::bind((host, port)).await { -// Ok(sock) => { -// let addr = sock.local_addr().expect("socket just bound"); -// info!(%addr, "running STUN server"); -// server_stun_listener(sock) -// .instrument(debug_span!("stun_server", %addr)) -// .await; -// } -// Err(err) => { -// error!( -// "failed to open STUN listener at host {host} and port {port}: {:#?}", -// err -// ); -// } -// } -// } - -// async fn server_stun_listener(sock: UdpSocket) { -// let sock = Arc::new(sock); -// let mut buffer = vec![0u8; 64 << 10]; -// loop { -// match sock.recv_from(&mut buffer).await { -// Ok((n, src_addr)) => { -// inc!(StunMetrics, requests); -// let pkt = buffer[..n].to_vec(); -// let sock = sock.clone(); -// tokio::task::spawn(async move { -// if !stun::is(&pkt) { -// debug!(%src_addr, "STUN: ignoring non stun packet"); -// inc!(StunMetrics, bad_requests); -// return; -// } -// match tokio::task::spawn_blocking(move || stun::parse_binding_request(&pkt)) -// .await -// { -// Ok(Ok(txid)) => { -// debug!(%src_addr, %txid, "STUN: received binding request"); -// let res = match tokio::task::spawn_blocking(move || { -// stun::response(txid, src_addr) -// }) -// .await -// { -// Ok(res) => res, -// Err(err) => { -// error!("JoinError: {err:#}"); -// return; -// } -// }; -// match sock.send_to(&res, src_addr).await { -// Ok(len) => { -// if len != res.len() { -// warn!(%src_addr, %txid, "STUN: failed to write response sent: {}, but expected {}", len, res.len()); -// } -// match src_addr { -// SocketAddr::V4(_) => { -// inc!(StunMetrics, ipv4_success); -// } -// SocketAddr::V6(_) => { -// inc!(StunMetrics, ipv6_success); -// } -// } -// trace!(%src_addr, %txid, "STUN: sent {} bytes", len); -// } -// Err(err) => { -// inc!(StunMetrics, failures); -// warn!(%src_addr, %txid, "STUN: failed to write response: {:?}", err); -// } -// } -// } -// Ok(Err(err)) => { -// inc!(StunMetrics, bad_requests); -// warn!(%src_addr, "STUN: invalid binding request: {:?}", err); -// } -// Err(err) => error!("JoinError parsing STUN binding: {err:#}"), -// } -// }); -// } -// Err(err) => { -// inc!(StunMetrics, failures); -// warn!("STUN: failed to recv: {:?}", err); -// } -// } -// } -// } - -// // var validProdHostname = regexp.MustCompile(`^relay([^.]*)\.tailscale\.com\.?$`) - -// // func prodAutocertHostPolicy(_ context.Context, host string) error { -// // if validProdHostname.MatchString(host) { -// // return nil -// // } -// // return errors.New("invalid hostname") -// // } - -// // func rateLimitedListenAndServeTLS(srv *http.Server) error { -// // addr := srv.Addr -// // if addr == "" { -// // addr = ":https" -// // } -// // ln, err := net.Listen("tcp", addr) -// // if err != nil { -// // return err -// // } -// // rln := newRateLimitedListener(ln, rate.Limit(*acceptConnLimit), *acceptConnBurst) -// // expvar.Publish("tls_listener", rln.ExpVar()) -// // defer rln.Close() -// // return srv.ServeTLS(rln, "", "") -// // } - -// // type rateLimitedListener struct { -// // // These are at the start of the struct to ensure 64-bit alignment -// // // on 32-bit architecture regardless of what other fields may exist -// // // in this package. -// // numAccepts expvar.Int // does not include number of rejects -// // numRejects expvar.Int - -// // net.Listener - -// // lim *rate.Limiter -// // } - -// // func newRateLimitedListener(ln net.Listener, limit rate.Limit, burst int) *rateLimitedListener { -// // return &rateLimitedListener{Listener: ln, lim: rate.NewLimiter(limit, burst)} -// // } - -// // func (l *rateLimitedListener) ExpVar() expvar.Var { -// // m := new(metrics.Set) -// // m.Set("counter_accepted_connections", &l.numAccepts) -// // m.Set("counter_rejected_connections", &l.numRejects) -// // return m -// // } - -// // var errLimitedConn = errors.New("cannot accept connection; rate limited") - -// // func (l *rateLimitedListener) Accept() (net.Conn, error) { -// // // Even under a rate limited situation, we accept the connection immediately -// // // and close it, rather than being slow at accepting new connections. -// // // This provides two benefits: 1) it signals to the client that something -// // // is going on on the server, and 2) it prevents new connections from -// // // piling up and occupying resources in the OS kernel. -// // // The client will retry as needing (with backoffs in place). -// // cn, err := l.Listener.Accept() -// // if err != nil { -// // return nil, err -// // } -// // if !l.lim.Allow() { -// // l.numRejects.Add(1) -// // cn.Close() -// // return nil, errLimitedConn -// // } -// // l.numAccepts.Add(1) -// // return cn, nil -// // } -// // mod metrics { use iroh_metrics::{ core::{Counter, Metric}, @@ -1041,205 +501,15 @@ mod metrics { } } -// #[cfg(test)] -// mod tests { -// use super::*; - -// use std::net::Ipv4Addr; -// use std::time::Duration; - -// use bytes::Bytes; -// use http_body_util::BodyExt; -// use iroh_base::node_addr::RelayUrl; -// use iroh_net::relay::http::ClientBuilder; -// use iroh_net::relay::ReceivedMessage; -// use tokio::task::JoinHandle; - -// #[tokio::test] -// async fn test_serve_no_content_handler() { -// let challenge = "123az__."; -// let req = Request::builder() -// .header(NO_CONTENT_CHALLENGE_HEADER, challenge) -// .body(body_empty()) -// .unwrap(); - -// let res = serve_no_content_handler(req, Response::builder()).unwrap(); -// assert_eq!(res.status(), StatusCode::NO_CONTENT); - -// let header = res -// .headers() -// .get(NO_CONTENT_RESPONSE_HEADER) -// .unwrap() -// .to_str() -// .unwrap(); -// assert_eq!(header, format!("response {challenge}")); -// assert!(res -// .into_body() -// .collect() -// .await -// .unwrap() -// .to_bytes() -// .is_empty()); -// } - -// #[test] -// fn test_escape_hostname() { -// assert_eq!( -// escape_hostname("hello.host.name_foo-bar%baz"), -// "hello.host.namefoo-barbaz" -// ); -// } - -// struct DropServer { -// server_task: JoinHandle<()>, -// } - -// impl Drop for DropServer { -// fn drop(&mut self) { -// self.server_task.abort(); -// } -// } - -// #[tokio::test] -// async fn test_relay_server_basic() -> Result<()> { -// tracing_subscriber::registry() -// .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) -// .with(EnvFilter::from_default_env()) -// .try_init() -// .ok(); -// // Binding to LOCALHOST to satisfy issues when binding to UNSPECIFIED in Windows for tests -// // Binding to Ipv4 because, when binding to `IPv6::UNSPECIFIED`, it will also listen for -// // IPv4 connections, but will not automatically do the same for `LOCALHOST`. In order to -// // test STUN, which only listens on Ipv4, we must bind the whole relay server to Ipv4::LOCALHOST. -// let cfg = Config { -// addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), -// ..Default::default() -// }; -// let (addr_send, addr_recv) = tokio::sync::oneshot::channel(); -// let relay_server_task = tokio::spawn( -// async move { -// // dev mode will bind to IPv6::UNSPECIFIED, so setting it `false` -// let res = run(false, cfg, Some(addr_send)).await; -// if let Err(e) = res { -// eprintln!("error starting relay server {e}"); -// } -// } -// .instrument(debug_span!("relay server")), -// ); -// let _drop_server = DropServer { -// server_task: relay_server_task, -// }; - -// let relay_server_addr = addr_recv.await?; -// let relay_server_str_url = format!("http://{}", relay_server_addr); -// let relay_server_url: RelayUrl = relay_server_str_url.parse().unwrap(); - -// // set up clients -// let a_secret_key = SecretKey::generate(); -// let a_key = a_secret_key.public(); -// let resolver = iroh_net::dns::default_resolver().clone(); -// let (client_a, mut client_a_receiver) = -// ClientBuilder::new(relay_server_url.clone()).build(a_secret_key, resolver); -// let connect_client = client_a.clone(); - -// // give the relay server some time to set up -// if let Err(e) = tokio::time::timeout(Duration::from_secs(10), async move { -// loop { -// match connect_client.connect().await { -// Ok(_) => break, -// Err(e) => { -// tracing::warn!("client a unable to connect to relay server: {e:?}. Attempting to dial again in 10ms"); -// tokio::time::sleep(Duration::from_millis(100)).await -// } -// } -// } -// }) -// .await -// { -// bail!("error connecting client a to relay server: {e:?}"); -// } - -// let b_secret_key = SecretKey::generate(); -// let b_key = b_secret_key.public(); -// let resolver = iroh_net::dns::default_resolver().clone(); -// let (client_b, mut client_b_receiver) = -// ClientBuilder::new(relay_server_url.clone()).build(b_secret_key, resolver); -// client_b.connect().await?; - -// let msg = Bytes::from("hello, b"); -// client_a.send(b_key, msg.clone()).await?; - -// let (res, _) = client_b_receiver.recv().await.unwrap()?; -// if let ReceivedMessage::ReceivedPacket { source, data } = res { -// assert_eq!(a_key, source); -// assert_eq!(msg, data); -// } else { -// bail!("client_b received unexpected message {res:?}"); -// } - -// let msg = Bytes::from("howdy, a"); -// client_b.send(a_key, msg.clone()).await?; - -// let (res, _) = client_a_receiver.recv().await.unwrap()?; -// if let ReceivedMessage::ReceivedPacket { source, data } = res { -// assert_eq!(b_key, source); -// assert_eq!(msg, data); -// } else { -// bail!("client_a received unexpected message {res:?}"); -// } - -// // run stun check -// let stun_addr: SocketAddr = -// SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 3478); - -// let txid = stun::TransactionId::default(); -// let req = stun::request(txid); -// let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap()); - -// let server_socket = socket.clone(); -// let server_task = tokio::task::spawn(async move { -// let mut buf = vec![0u8; 64000]; -// let len = server_socket.recv(&mut buf).await.unwrap(); -// dbg!(len); -// buf.truncate(len); -// buf -// }); - -// tracing::info!("sending stun request to {stun_addr}"); -// if let Err(e) = socket.send_to(&req, stun_addr).await { -// bail!("socket.send_to error: {e:?}"); -// } - -// let response = server_task.await.unwrap(); -// let (txid_back, response_addr) = stun::parse_response(&response).unwrap(); -// assert_eq!(txid, txid_back); -// tracing::info!("got {response_addr}"); - -// // get 200 home page response -// tracing::info!("send request for homepage"); -// let res = reqwest::get(relay_server_str_url).await?; -// assert!(res.status().is_success()); -// tracing::info!("got OK"); - -// // test captive portal -// tracing::info!("test captive portal response"); - -// let url = relay_server_url.join("/generate_204")?; -// let challenge = "123az__."; -// let client = reqwest::Client::new(); -// let res = client -// .get(url) -// .header(NO_CONTENT_CHALLENGE_HEADER, challenge) -// .send() -// .await?; -// assert_eq!(StatusCode::NO_CONTENT.as_u16(), res.status().as_u16()); -// let header = res.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); -// assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); -// let body = res.bytes().await?; -// assert!(body.is_empty()); - -// tracing::info!("got successful captive portal response"); +#[cfg(test)] +mod tests { + use super::*; -// Ok(()) -// } -// } + #[test] + fn test_escape_hostname() { + assert_eq!( + escape_hostname("hello.host.name_foo-bar%baz"), + "hello.host.namefoo-barbaz" + ); + } +} diff --git a/iroh-net/src/netcheck.rs b/iroh-net/src/netcheck.rs index 574115c55c..391e174202 100644 --- a/iroh-net/src/netcheck.rs +++ b/iroh-net/src/netcheck.rs @@ -787,7 +787,6 @@ mod tests { use crate::defaults::{DEFAULT_STUN_PORT, EU_RELAY_HOSTNAME}; use crate::ping::Pinger; - use crate::relay::iroh_relay; use crate::relay::RelayNode; use super::*; @@ -795,18 +794,12 @@ mod tests { #[tokio::test] async fn test_basic() -> Result<()> { let _guard = iroh_test::logging::setup(); - let stun_server = iroh_relay::Server::new(iroh_relay::ServerConfig { - relay: None, - stun: iroh_relay::StunConfig { - bind_addr: (Ipv4Addr::LOCALHOST, DEFAULT_STUN_PORT).into(), - }, - metrics_addr: None, - }); - let stun_addr = stun_server.stun_addr(); + let (stun_addr, stun_stats, _cleanup_guard) = + stun::tests::serve("127.0.0.1".parse().unwrap()).await?; let resolver = crate::dns::default_resolver(); let mut client = Client::new(None, resolver.clone())?; - let dm = stun::test::relay_map_of([stun_addr].into_iter()); + let dm = stun::tests::relay_map_of([stun_addr].into_iter()); // Note that the ProbePlan will change with each iteration. for i in 0..5 { @@ -897,7 +890,7 @@ mod tests { // the STUN server being blocked will look like from the client's perspective. let blackhole = tokio::net::UdpSocket::bind("127.0.0.1:0").await?; let stun_addr = blackhole.local_addr()?; - let dm = stun::test::relay_map_of_opts([(stun_addr, false)].into_iter()); + let dm = stun::tests::relay_map_of_opts([(stun_addr, false)].into_iter()); // Now create a client and generate a report. let resolver = crate::dns::default_resolver().clone(); @@ -1134,8 +1127,8 @@ mod tests { // can easily use to identify the packet. // Setup STUN server and create relay_map. - let (stun_addr, _stun_stats, _done) = stun::test::serve_v4().await?; - let dm = stun::test::relay_map_of([stun_addr].into_iter()); + let (stun_addr, _stun_stats, _done) = stun::tests::serve_v4().await?; + let dm = stun::tests::relay_map_of([stun_addr].into_iter()); dbg!(&dm); let resolver = crate::dns::default_resolver().clone(); diff --git a/iroh-net/src/relay/http/server.rs b/iroh-net/src/relay/http/server.rs index 0be768a13a..eaf6ffd70a 100644 --- a/iroh-net/src/relay/http/server.rs +++ b/iroh-net/src/relay/http/server.rs @@ -334,7 +334,7 @@ impl ServerState { } = self; let listener = TcpListener::bind(&addr) .await - .context("failed to bind https")?; + .context("failed to bind server socket")?; // we will use this cancel token to stop the infinite loop in the `listener.accept() task` let cancel_server_loop = CancellationToken::new(); let addr = listener.local_addr()?; diff --git a/iroh-net/src/relay/iroh_relay.rs b/iroh-net/src/relay/iroh_relay.rs index b3b25a45ad..62bc93bf72 100644 --- a/iroh-net/src/relay/iroh_relay.rs +++ b/iroh-net/src/relay/iroh_relay.rs @@ -55,6 +55,9 @@ fn body_empty() -> BytesBody { } /// Configuration for the full Relay & STUN server. +/// +/// Be aware the generic parameters are for when using the Let's Encrypt TLS configuration. +/// If not used dummy ones need to be provided, e.g. `ServerConfig::<(), ()>::default()`. #[derive(Debug, Default)] pub struct ServerConfig { /// Configuration for the DERP server, disabled if `None`. @@ -180,6 +183,7 @@ impl Server { #[cfg(feature = "metrics")] if let Some(addr) = config.metrics_addr { + debug!("Starting metrics server"); use iroh_metrics::core::Metric; iroh_metrics::core::Core::init(|reg, metrics| { @@ -194,22 +198,27 @@ impl Server { // Start the STUN server. let stun_addr = match config.stun { - Some(stun) => match UdpSocket::bind(stun.bind_addr).await { - Ok(sock) => { - let addr = sock.local_addr()?; - tasks.spawn( - server_stun_listener(sock).instrument(info_span!("stun-server", %addr)), - ); - Some(addr) + Some(stun) => { + debug!("Starting STUN server"); + match UdpSocket::bind(stun.bind_addr).await { + Ok(sock) => { + let addr = sock.local_addr()?; + info!("STUN server bound on {addr}"); + tasks.spawn( + server_stun_listener(sock).instrument(info_span!("stun-server", %addr)), + ); + Some(addr) + } + Err(err) => bail!("failed to bind STUN listener: {err:#?}"), } - Err(err) => bail!("failed to bind STUN listener: {err:#?}"), - }, + } None => None, }; // Start the Relay server. let (relay_server, http_addr) = match config.relay { Some(relay_config) => { + debug!("Starting Relay server"); let mut headers = HeaderMap::new(); for (name, value) in TLS_HEADERS.iter() { headers.insert(*name, value.parse()?); @@ -370,13 +379,20 @@ async fn relay_supervisor( mut tasks: JoinSet>, mut relay_http_server: Option, ) -> Result<()> { - let res = tokio::select! { - biased; - Some(ret) = tasks.join_next() => ret, - ret = relay_http_server.as_mut().expect("protected by if branch").0.task_handle(), - if relay_http_server.is_some() - => ret.map(|res| Ok(res)), - else => Ok(Err(anyhow!("Empty JoinSet"))), + let res = match (relay_http_server.as_mut(), tasks.len()) { + (None, _) => tasks + .join_next() + .await + .unwrap_or_else(|| Ok(Err(anyhow!("Nothing to supervise")))), + (Some(relay), 0) => relay.0.task_handle().await.and_then(|r| Ok(anyhow::Ok(r))), + (Some(relay), _) => { + tokio::select! { + biased; + Some(ret) = tasks.join_next() => ret, + ret = relay.0.task_handle() => ret.and_then(|r| Ok(anyhow::Ok(r))), + else => Ok(Err(anyhow!("Empty JoinSet (unreachable)"))), + } + } }; let ret = match res { Ok(Ok(())) => { @@ -406,141 +422,11 @@ async fn relay_supervisor( ret } -// /// Supervisor for a set of fallible tasks. -// /// -// /// As soon as one of the tasks fails, the supervisor will exit with a failure. Thus -// /// dropping the [`JoinSet`] and aborting all remaining tasks. -// async fn supervisor(mut tasks: JoinSet>) -> Result<()> { -// while let Some(res) = tasks.join_next().await { -// match res { -// Ok(_) => continue, -// Err(err) => bail!("Task failed: {err:#}"), -// } -// } -// Ok(()) -// } - -// /// An actor which supervises other tasks, with no restarting and one-for-all strategy. -// /// -// /// The supervisor itself does no restarting of tasks. It only terminates all other tasks -// /// when one fails. It is essentially a one-for-all supervisor strategy with a max-restarts -// /// count of 0. -// #[derive(Debug)] -// struct TaskSupervisor { -// addr_tx: mpsc::Sender, -// addr_rx: mpsc::Receiver, -// tasks: FuturesUnordered>>, -// } - -// impl TaskSupervisor { -// fn new() -> Self { -// let (addr_tx, addr_rx) = mpsc::channel(16); -// Self { -// addr_tx, -// addr_rx, -// tasks: FuturesUnordered::new(), -// } -// } - -// async fn run(&mut self) { -// // Note this can never fail! -// loop { -// tokio::select! { -// biased; -// res = self.addr_rx.recv() => { -// match res { -// Some(msg) => self.handle_msg(msg), -// None => { -// error!("All senders closed, impossible"); -// break; -// } -// } -// } -// item = self.tasks.next() => { -// match item { -// Some(res) => { -// self.handle_task_finished(res); -// if self.tasks.is_terminated() { -// break; -// } -// } -// None => break, -// } -// } -// } -// } -// debug!("Supervisor finished"); -// } - -// fn handle_msg(&mut self, msg: SupervisorMessage) { -// match msg { -// SupervisorMessage::AddTask(task) => { -// self.tasks.push(task); -// } -// SupervisorMessage::Abort => { -// for task in self.tasks.iter() { -// task.abort(); -// } -// } -// } -// } - -// fn handle_task_finished(&mut self, res: Result, JoinError>) { -// match res { -// Ok(Ok(())) => info!("Supervised task gracefully finished, aborting others"), -// Ok(Err(err)) => error!("Supervised task failed, aborting others. err: {err}"), -// Err(err) => { -// if err.is_cancelled() { -// info!("Supervised task cancelled, aborting others"); -// } -// if err.is_panic() { -// // TODO: We just swallow the panic. Unfortunately we can only resume -// // it, which is not (yet?) what we want? Or maybe it is. -// error!("Supervised task paniced, aborting others"); -// } -// } -// } -// for task in self.tasks.iter() { -// task.abort(); -// } -// } - -// fn addr(&self) -> SupervisorAddr { -// SupervisorAddr { -// tx: self.addr_tx.clone(), -// } -// } -// } - -// #[derive(Debug)] -// enum SupervisorMessage { -// AddTask(JoinHandle>), -// Abort, -// } - -// #[derive(Debug)] -// struct SupervisorAddr { -// tx: mpsc::Sender, -// } - -// impl SupervisorAddr { -// fn add_task( -// &self, -// task: JoinHandle>, -// ) -> Result<(), mpsc::error::TrySendError> { -// self.tx.try_send(SupervisorMessage::AddTask(task)) -// } - -// fn shutdown(&self) -> Result<(), mpsc::error::TrySendError> { -// self.tx.try_send(SupervisorMessage::Abort) -// } -// } - /// Runs a STUN server. /// /// When the future is dropped, the server stops. async fn server_stun_listener(sock: UdpSocket) -> Result<()> { - info!("running STUN server"); + info!(addr = ?sock.local_addr().ok(), "running STUN server"); // TODO: re-write this as structured-concurrency and returning errors // let mut buffer = vec![0u8; 64 << 10]; @@ -828,3 +714,215 @@ mod metrics { } } } + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + use std::time::Duration; + + use bytes::Bytes; + use iroh_base::node_addr::RelayUrl; + + use crate::relay::http::ClientBuilder; + + use self::relay::ReceivedMessage; + + use super::*; + + #[tokio::test] + async fn test_no_services() { + let _guard = iroh_test::logging::setup(); + let mut server = Server::spawn(ServerConfig::<(), ()>::default()) + .await + .unwrap(); + let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle()) + .await + .expect("timeout, server not finished") + .expect("server task JoinError"); + assert!(res.is_err()); + } + + #[tokio::test] + async fn test_conflicting_bind() { + let _guard = iroh_test::logging::setup(); + let mut server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 1234).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: Some((Ipv4Addr::LOCALHOST, 1234).into()), + }) + .await + .unwrap(); + let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle()) + .await + .expect("timeout, server not finished") + .expect("server task JoinError"); + assert!(res.is_err()); // AddrInUse + } + + #[tokio::test] + async fn test_root_handler() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let url = format!("http://{}", server.http_addr().unwrap()); + + let response = reqwest::get(&url).await.unwrap(); + assert_eq!(response.status(), 200); + let body = response.text().await.unwrap(); + assert!(body.contains("iroh.computer")); + } + + #[tokio::test] + async fn test_captive_portal_service() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let url = format!("http://{}/generate_204", server.http_addr().unwrap()); + let challenge = "123az__."; + + let client = reqwest::Client::new(); + let response = client + .get(&url) + .header(NO_CONTENT_CHALLENGE_HEADER, challenge) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + let header = response.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); + assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); + let body = response.text().await.unwrap(); + assert!(body.is_empty()); + } + + #[tokio::test] + async fn test_relay_clients() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let relay_url = format!("http://{}", server.http_addr().unwrap()); + let relay_url: RelayUrl = relay_url.parse().unwrap(); + + // set up client a + let a_secret_key = SecretKey::generate(); + let a_key = a_secret_key.public(); + let resolver = crate::dns::default_resolver().clone(); + let (client_a, mut client_a_receiver) = + ClientBuilder::new(relay_url.clone()).build(a_secret_key, resolver); + let connect_client = client_a.clone(); + + // give the relay server some time to accept connections + if let Err(err) = tokio::time::timeout(Duration::from_secs(10), async move { + loop { + match connect_client.connect().await { + Ok(_) => break, + Err(err) => { + warn!("client unable to connect to relay server: {err:#}"); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + }) + .await + { + panic!("error connecting to relay server: {err:#}"); + } + + // set up client b + let b_secret_key = SecretKey::generate(); + let b_key = b_secret_key.public(); + let resolver = crate::dns::default_resolver().clone(); + let (client_b, mut client_b_receiver) = + ClientBuilder::new(relay_url.clone()).build(b_secret_key, resolver); + client_b.connect().await.unwrap(); + + // send message from a to b + let msg = Bytes::from("hello, b"); + client_a.send(b_key, msg.clone()).await.unwrap(); + + let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + if let ReceivedMessage::ReceivedPacket { source, data } = res { + assert_eq!(a_key, source); + assert_eq!(msg, data); + } else { + panic!("client_b received unexpected message {res:?}"); + } + + // send message from b to a + let msg = Bytes::from("howdy, a"); + client_b.send(a_key, msg.clone()).await.unwrap(); + + let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + if let ReceivedMessage::ReceivedPacket { source, data } = res { + assert_eq!(b_key, source); + assert_eq!(msg, data); + } else { + panic!("client_a received unexpected message {res:?}"); + } + } + + #[tokio::test] + async fn test_stun() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: None, + stun: Some(StunConfig { + bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + }), + metrics_addr: None, + }) + .await + .unwrap(); + + let txid = stun::TransactionId::default(); + let req = stun::request(txid); + let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + socket + .send_to(&req, server.stun_addr().unwrap()) + .await + .unwrap(); + + // get response + let mut buf = vec![0u8; 64000]; + let (len, addr) = socket.recv_from(&mut buf).await.unwrap(); + assert_eq!(addr, server.stun_addr().unwrap()); + buf.truncate(len); + let (txid_back, response_addr) = stun::parse_response(&buf).unwrap(); + assert_eq!(txid, txid_back); + assert_eq!(response_addr, socket.local_addr().unwrap()); + } +} diff --git a/iroh-net/src/stun.rs b/iroh-net/src/stun.rs index 2ec1dad5f2..e0ed936782 100644 --- a/iroh-net/src/stun.rs +++ b/iroh-net/src/stun.rs @@ -149,9 +149,10 @@ pub fn parse_response(b: &[u8]) -> Result<(TransactionId, SocketAddr), Error> { Err(Error::MalformedAttrs) } -#[cfg(any(test, feature = "test-utils"))] -pub(crate) mod test { - use std::{net::IpAddr, sync::Arc}; +#[cfg(test)] +pub(crate) mod tests { + use std::net::{IpAddr, Ipv4Addr}; + use std::sync::Arc; use anyhow::Result; use tokio::{ @@ -160,30 +161,28 @@ pub(crate) mod test { }; use tracing::{debug, trace}; - #[cfg(test)] use crate::relay::{RelayMap, RelayNode, RelayUrl}; use crate::test_utils::CleanupDropGuard; use super::*; + // TODO: make all this private + // (read_ipv4, read_ipv5) #[derive(Debug, Default, Clone)] pub struct StunStats(Arc>); impl StunStats { - #[cfg(test)] pub async fn total(&self) -> usize { let s = self.0.lock().await; s.0 + s.1 } } - #[cfg(test)] pub fn relay_map_of(stun: impl Iterator) -> RelayMap { relay_map_of_opts(stun.map(|addr| (addr, true))) } - #[cfg(test)] pub fn relay_map_of_opts(stun: impl Iterator) -> RelayMap { let nodes = stun.map(|(addr, stun_only)| { let host = addr.ip(); @@ -202,7 +201,6 @@ pub(crate) mod test { /// Sets up a simple STUN server binding to `0.0.0.0:0`. /// /// See [`serve`] for more details. - #[cfg(test)] pub(crate) async fn serve_v4() -> Result<(SocketAddr, StunStats, CleanupDropGuard)> { serve(std::net::Ipv4Addr::UNSPECIFIED.into()).await } @@ -272,13 +270,6 @@ pub(crate) mod test { } } } -} - -#[cfg(test)] -mod tests { - use std::net::{IpAddr, Ipv4Addr}; - - use super::*; // Test to check if an existing stun server works // #[tokio::test]