diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index e30512c4f3..a28f749cf6 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -27,7 +27,7 @@ use iroh::{ }, docs::{Capability, DocTicket}, net::{ - defaults::DEFAULT_RELAY_STUN_PORT, + defaults::DEFAULT_STUN_PORT, discovery::{ dns::DnsDiscovery, pkarr_publish::PkarrPublisher, ConcurrentDiscovery, Discovery, }, @@ -93,7 +93,7 @@ pub enum Commands { #[clap(long)] stun_host: Option, /// The port of the STUN server. - #[clap(long, default_value_t = DEFAULT_RELAY_STUN_PORT)] + #[clap(long, default_value_t = DEFAULT_STUN_PORT)] stun_port: u16, }, /// Wait for incoming requests from iroh doctor connect @@ -631,7 +631,7 @@ async fn passive_side(gui: Gui, connection: Connection) -> anyhow::Result<()> { } fn configure_local_relay_map() -> RelayMap { - let stun_port = DEFAULT_RELAY_STUN_PORT; + let stun_port = DEFAULT_STUN_PORT; let url = "http://localhost:3340".parse().unwrap(); RelayMap::default_from_node(url, stun_port) } @@ -669,7 +669,7 @@ async fn make_endpoint( }; let endpoint = endpoint.bind(0).await?; - tokio::time::timeout(Duration::from_secs(10), endpoint.local_endpoints().next()) + tokio::time::timeout(Duration::from_secs(10), endpoint.direct_addresses().next()) .await .context("wait for relay connection")? .context("no endpoints")?; @@ -727,7 +727,7 @@ async fn accept( ) -> anyhow::Result<()> { let endpoint = make_endpoint(secret_key.clone(), relay_map, discovery).await?; let endpoints = endpoint - .local_endpoints() + .direct_addresses() .next() .await .context("no endpoints")?; diff --git a/iroh-gossip/examples/chat.rs b/iroh-gossip/examples/chat.rs index cf99a6942e..f9bf38863f 100644 --- a/iroh-gossip/examples/chat.rs +++ b/iroh-gossip/examples/chat.rs @@ -206,11 +206,11 @@ async fn handle_connection( let alpn = conn.alpn().await?; let conn = conn.await?; let peer_id = iroh_net::endpoint::get_remote_node_id(&conn)?; - match alpn.as_bytes() { - GOSSIP_ALPN => gossip - .handle_connection(conn) - .await - .context(format!("connection to {peer_id} with ALPN {alpn} failed"))?, + match alpn.as_ref() { + GOSSIP_ALPN => gossip.handle_connection(conn).await.context(format!( + "connection to {peer_id} with ALPN {} failed", + String::from_utf8_lossy(&alpn) + ))?, _ => println!("> ignoring connection from {peer_id}: unsupported ALPN protocol"), } Ok(()) diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 4027f91d5d..13d5940703 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -72,7 +72,7 @@ type ProtoMessage = proto::Message; #[derive(Debug, Clone)] pub struct Gossip { to_actor_tx: mpsc::Sender, - on_endpoints_tx: mpsc::Sender>, + on_direct_addrs_tx: mpsc::Sender>, _actor_handle: Arc>>, max_message_size: usize, } @@ -101,7 +101,7 @@ impl Gossip { to_actor_rx, in_event_rx, in_event_tx, - on_endpoints_rx, + on_direct_addr_rx: on_endpoints_rx, conns: Default::default(), conn_send_tx: Default::default(), pending_sends: Default::default(), @@ -123,7 +123,7 @@ impl Gossip { ); Self { to_actor_tx, - on_endpoints_tx, + on_direct_addrs_tx: on_endpoints_tx, _actor_handle: Arc::new(actor_handle), max_message_size, } @@ -243,16 +243,19 @@ impl Gossip { Ok(()) } - /// Set info on our local endpoints. + /// Set info on our direct addresses. /// /// This will be sent to peers on Neighbor and Join requests so that they can connect directly /// to us. /// /// This is only best effort, and will drop new events if backed up. - pub fn update_endpoints(&self, endpoints: &[iroh_net::config::Endpoint]) -> anyhow::Result<()> { - let endpoints = endpoints.to_vec(); - self.on_endpoints_tx - .try_send(endpoints) + pub fn update_direct_addresses( + &self, + addrs: &[iroh_net::endpoint::DirectAddr], + ) -> anyhow::Result<()> { + let addrs = addrs.to_vec(); + self.on_direct_addrs_tx + .try_send(addrs) .map_err(|_| anyhow!("endpoints channel dropped"))?; Ok(()) } @@ -344,7 +347,7 @@ struct Actor { /// Input events to the state (emitted from the connection loops) in_event_rx: mpsc::Receiver, /// Updates of discovered endpoint addresses - on_endpoints_rx: mpsc::Receiver>, + on_direct_addr_rx: mpsc::Receiver>, /// Queued timers timers: Timers, /// Currently opened quinn connections to peers @@ -377,7 +380,7 @@ impl Actor { } } }, - new_endpoints = self.on_endpoints_rx.recv() => { + new_endpoints = self.on_direct_addr_rx.recv() => { match new_endpoints { Some(endpoints) => { let addr = NodeAddr::from_parts( diff --git a/iroh-net/examples/connect-unreliable.rs b/iroh-net/examples/connect-unreliable.rs index 5438673557..3be041353c 100644 --- a/iroh-net/examples/connect-unreliable.rs +++ b/iroh-net/examples/connect-unreliable.rs @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); for local_endpoint in endpoint - .local_endpoints() + .direct_addresses() .next() .await .context("no endpoints")? diff --git a/iroh-net/examples/connect.rs b/iroh-net/examples/connect.rs index ccaffb6e54..216a4e42eb 100644 --- a/iroh-net/examples/connect.rs +++ b/iroh-net/examples/connect.rs @@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); for local_endpoint in endpoint - .local_endpoints() + .direct_addresses() .next() .await .context("no endpoints")? diff --git a/iroh-net/examples/listen-unreliable.rs b/iroh-net/examples/listen-unreliable.rs index ded70a0f56..7dbc5e246d 100644 --- a/iroh-net/examples/listen-unreliable.rs +++ b/iroh-net/examples/listen-unreliable.rs @@ -38,7 +38,7 @@ async fn main() -> anyhow::Result<()> { println!("node listening addresses:"); let local_addrs = endpoint - .local_endpoints() + .direct_addresses() .next() .await .context("no endpoints")? @@ -67,7 +67,8 @@ async fn main() -> anyhow::Result<()> { let conn = conn.await?; let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?; info!( - "new (unreliable) connection from {node_id} with ALPN {alpn} (coming from {})", + "new (unreliable) connection from {node_id} with ALPN {} (coming from {})", + String::from_utf8_lossy(&alpn), conn.remote_address() ); // spawn a task to handle reading and writing off of the connection diff --git a/iroh-net/examples/listen.rs b/iroh-net/examples/listen.rs index 9dc38ab258..6f538534a4 100644 --- a/iroh-net/examples/listen.rs +++ b/iroh-net/examples/listen.rs @@ -38,7 +38,7 @@ async fn main() -> anyhow::Result<()> { println!("node listening addresses:"); let local_addrs = endpoint - .local_endpoints() + .direct_addresses() .next() .await .context("no endpoints")? @@ -66,7 +66,8 @@ async fn main() -> anyhow::Result<()> { let conn = conn.await?; let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?; info!( - "new connection from {node_id} with ALPN {alpn} (coming from {})", + "new connection from {node_id} with ALPN {} (coming from {})", + String::from_utf8_lossy(&alpn), conn.remote_address() ); diff --git a/iroh-net/src/bin/iroh-relay.rs b/iroh-net/src/bin/iroh-relay.rs index f9717a46a1..45e076e66c 100644 --- a/iroh-net/src/bin/iroh-relay.rs +++ b/iroh-net/src/bin/iroh-relay.rs @@ -1,49 +1,28 @@ -//! A simple relay server. +//! A simple relay server for iroh-net. //! -//! Based on /tailscale/cmd/derper +//! This handles only the CLI and config file loading, the server implementation lives in +//! [`iroh_net::relay::iroh_relay`]. -use std::{ - borrow::Cow, - future::Future, - net::{IpAddr, Ipv6Addr, SocketAddr}, - path::{Path, PathBuf}, - pin::Pin, - sync::Arc, -}; +use std::net::{Ipv6Addr, SocketAddr}; +use std::path::{Path, PathBuf}; use anyhow::{anyhow, bail, Context as _, Result}; use clap::Parser; -use futures_lite::StreamExt; -use http::{response::Builder as ResponseBuilder, HeaderMap}; -use hyper::body::Incoming; -use hyper::{Method, Request, Response, StatusCode}; -use iroh_metrics::inc; -use iroh_net::defaults::{DEFAULT_RELAY_STUN_PORT, NA_RELAY_HOSTNAME}; -use iroh_net::key::SecretKey; -use iroh_net::relay::http::{ - ServerBuilder as RelayServerBuilder, TlsAcceptor, TlsConfig as RelayTlsConfig, +use iroh_net::defaults::{ + DEFAULT_HTTPS_PORT, DEFAULT_HTTP_PORT, DEFAULT_METRICS_PORT, DEFAULT_STUN_PORT, }; -use iroh_net::relay::{self}; -use iroh_net::stun; +use iroh_net::key::SecretKey; +use iroh_net::relay::iroh_relay; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use tokio::net::{TcpListener, UdpSocket}; use tokio_rustls_acme::{caches::DirCache, AcmeConfig}; -use tracing::{debug, debug_span, error, info, info_span, trace, warn, Instrument}; +use tracing::{debug, info}; use tracing_subscriber::{prelude::*, EnvFilter}; -use metrics::StunMetrics; - -type BytesBody = http_body_util::Full; -type HyperError = Box; -type HyperResult = std::result::Result; +/// The default `http_bind_port` when using `--dev`. +const DEV_MODE_HTTP_PORT: u16 = 3340; -/// Creates a new [`BytesBody`] with no content. -fn body_empty() -> BytesBody { - http_body_util::Full::new(hyper::body::Bytes::new()) -} - -/// A simple relay server. +/// A relay server for iroh-net. #[derive(Parser, Debug, Clone)] #[clap(version, about, long_about = None)] struct Cli { @@ -54,7 +33,10 @@ struct Cli { /// Running in dev mode will ignore any config file fields pertaining to TLS. #[clap(long, default_value_t = false)] dev: bool, - /// Config file path. Generate a default configuration file by supplying a path. + /// Path to the configuration file. + /// + /// If provided and no configuration file exists the default configuration will be + /// written to the file. #[clap(long, short)] config_path: Option, } @@ -65,73 +47,6 @@ enum CertMode { LetsEncrypt, } -impl CertMode { - async fn gen_server_config( - &self, - hostname: String, - contact: String, - is_production: bool, - dir: PathBuf, - ) -> Result<(Arc, TlsAcceptor)> { - let config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth(); - - match self { - CertMode::LetsEncrypt => { - let mut state = AcmeConfig::new(vec![hostname]) - .contact([format!("mailto:{contact}")]) - .cache_option(Some(DirCache::new(dir))) - .directory_lets_encrypt(is_production) - .state(); - - let config = config.with_cert_resolver(state.resolver()); - let acceptor = state.acceptor(); - - tokio::spawn( - async move { - while let Some(event) = state.next().await { - match event { - Ok(ok) => debug!("acme event: {:?}", ok), - Err(err) => error!("error: {:?}", err), - } - } - debug!("event stream finished"); - } - .instrument(info_span!("acme")), - ); - - Ok((Arc::new(config), TlsAcceptor::LetsEncrypt(acceptor))) - } - CertMode::Manual => { - // load certificates manually - let keyname = escape_hostname(&hostname); - let cert_path = dir.join(format!("{keyname}.crt")); - let key_path = dir.join(format!("{keyname}.key")); - - let (certs, secret_key) = tokio::task::spawn_blocking(move || { - let certs = load_certs(cert_path)?; - let key = load_secret_key(key_path)?; - anyhow::Ok((certs, key)) - }) - .await??; - - let config = config.with_single_cert(certs, secret_key)?; - let config = Arc::new(config); - let acceptor = tokio_rustls::TlsAcceptor::from(config.clone()); - - Ok((config, TlsAcceptor::Manual(acceptor))) - } - } - } -} - -fn escape_hostname(hostname: &str) -> Cow<'_, str> { - let unsafe_hostname_characters = - regex::Regex::new(r"[^a-zA-Z0-9-\.]").expect("regex manually checked"); - unsafe_hostname_characters.replace_all(hostname, "") -} - fn load_certs(filename: impl AsRef) -> Result> { let certfile = std::fs::File::open(filename).context("cannot open certificate file")?; let mut reader = std::io::BufReader::new(certfile); @@ -164,72 +79,203 @@ fn load_secret_key(filename: impl AsRef) -> Result { ); } +/// Configuration for the relay-server. +/// +/// This is (de)serialised to/from a TOML config file. #[serde_as] -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct Config { - /// [`SecretKey`] for this relay server. + /// The iroh [`SecretKey`] for this relay server. + /// + /// If not specified a new key will be generated and the config file will be re-written + /// using it. #[serde_as(as = "DisplayFromStr")] #[serde(default = "SecretKey::generate")] secret_key: SecretKey, - /// Server listen address. + /// Whether to enable the Relay server. /// - /// Defaults to `[::]:443`. + /// Defaults to `true`. /// - /// If the port address is 443, the relay server will issue a warning if it is started - /// without a `tls` config. - addr: SocketAddr, - - /// The UDP port on which to serve STUN. The listener is bound to the same IP (if any) as - /// specified in the `addr` field. Defaults to [`DEFAULT_RELAY_STUN_PORT`]. - stun_port: u16, - /// Certificate hostname. Defaults to [`NA_RELAY_HOSTNAME`]. - hostname: String, + /// Disabling will leave only the STUN server. The `http_bind_addr` and `tls` + /// configuration options will be ignored. + #[serde(default = "cfg_defaults::enable_relay")] + enable_relay: bool, + /// The socket address to bind the Relay HTTP server on. + /// + /// Defaults to `[::]:80`. + /// + /// When running with `--dev` defaults to [::]:3340`. If specified overrides these + /// defaults. + /// + /// The Relay server always starts an HTTP server, this specifies the socket this will + /// be bound on. If there is no `tls` configuration set all the HTTP relay services + /// will be bound on this socket. Otherwise most Relay HTTP services will run on the + /// `https_bind_addr` of the `tls` configuration section and only the captive portal + /// will be served from the HTTP socket. + http_bind_addr: Option, + /// TLS specific configuration. + /// + /// TLS is disabled if not present and the Relay server will serve all services over + /// plain HTTP. + /// + /// If disabled all services will run on plain HTTP. The `--dev` option disables this, + /// regardless of what is in the configuration file. + tls: Option, /// Whether to run a STUN server. It will bind to the same IP as the `addr` field. /// /// Defaults to `true`. + #[serde(default = "cfg_defaults::enable_stun")] enable_stun: bool, - /// Whether to run a relay server. The only reason to set this false is if you're decommissioning a - /// server but want to keep its bootstrap DNS functionality still running. + /// The socket address to bind the STUN server on. /// - /// Defaults to `true` - enable_relay: bool, - /// TLS specific configuration - tls: Option, - /// Rate limiting configuration + /// Defaults to using the `http_bind_addr` with the port set to [`DEFAULT_STUN_PORT`]. + stun_bind_addr: Option, + /// Rate limiting configuration. + /// + /// Disabled if not present. limits: Option, - #[cfg(feature = "metrics")] - /// Metrics serve address. If not set, metrics are not served. - metrics_addr: Option, + /// Whether to run the metrics server. + /// + /// Defaults to `true`, when the metrics feature is enabled. + #[serde(default = "cfg_defaults::enable_metrics")] + enable_metrics: bool, + /// Metrics serve address. + /// + /// Defaults to `http_bind_addr` with the port set to [`DEFAULT_METRICS_PORT`] + /// (`[::]:9090` when `http_bind_addr` is set to the default). + metrics_bind_addr: Option, +} + +impl Config { + fn http_bind_addr(&self) -> SocketAddr { + self.http_bind_addr + .unwrap_or((Ipv6Addr::UNSPECIFIED, DEFAULT_HTTP_PORT).into()) + } + + fn stun_bind_addr(&self) -> SocketAddr { + self.stun_bind_addr + .unwrap_or_else(|| SocketAddr::new(self.http_bind_addr().ip(), DEFAULT_STUN_PORT)) + } + + fn metrics_bind_addr(&self) -> SocketAddr { + self.metrics_bind_addr + .unwrap_or_else(|| SocketAddr::new(self.http_bind_addr().ip(), DEFAULT_METRICS_PORT)) + } +} + +impl Default for Config { + fn default() -> Self { + Self { + secret_key: SecretKey::generate(), + enable_relay: true, + http_bind_addr: None, + tls: None, + enable_stun: true, + stun_bind_addr: None, + limits: None, + enable_metrics: true, + metrics_bind_addr: None, + } + } } -#[derive(Serialize, Deserialize)] +/// Defaults for fields from [`Config`]. +/// +/// These are the defaults that serde will fill in. Other defaults depends on each other +/// and can not immediately be substituded by serde. +mod cfg_defaults { + pub(crate) fn enable_relay() -> bool { + true + } + + pub(crate) fn enable_stun() -> bool { + true + } + + pub(crate) fn enable_metrics() -> bool { + true + } + + pub(crate) mod tls_config { + pub(crate) fn prod_tls() -> bool { + true + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] struct TlsConfig { - /// Mode for getting a cert. possible options: 'Manual', 'LetsEncrypt' - /// When using manual mode, a certificate will be read from `.crt` and a secret key from - /// `.key`, with the `` being the escaped hostname. + /// The socket address to bind the Relay HTTPS server on. + /// + /// Defaults to the `http_bind_addr` with the port set to `443`. + https_bind_addr: Option, + /// Certificate hostname when using LetsEncrypt. + hostname: Option, + /// Mode for getting a cert. + /// + /// Possible options: 'Manual', 'LetsEncrypt'. cert_mode: CertMode, + /// Directory to store LetsEncrypt certs or read manual certificates from. + /// + /// Defaults to the servers' current working directory. + cert_dir: Option, + /// Path of where to read the certificate from for the `Manual` `cert_mode`. + /// + /// Defaults to `/default.crt`. + /// + /// Only used when `cert_mode` is `Manual`. + manual_cert_path: Option, + /// Path of where to read the private key from for the `Manual` `cert_mode`. + /// + /// Defaults to `/default.key`. + /// + /// Only used when `cert_mode` is `Manual`. + manual_key_path: Option, /// Whether to use the LetsEncrypt production or staging server. /// - /// While in development, LetsEncrypt prefers you to use the staging server. However, the staging server seems to - /// only use `ECDSA` keys. In their current set up, you can only get intermediate certificates - /// for `ECDSA` keys if you are on their "allowlist". The production server uses `RSA` keys, - /// which allow for issuing intermediate certificates in all normal circumstances. - /// So, to have valid certificates, we must use the LetsEncrypt production server. - /// Read more here: - /// Default is true. This field is ignored if we are not using `cert_mode: CertMode::LetsEncrypt`. + /// Default is `true`. + /// + /// Only used when `cert_mode` is `LetsEncrypt`. + /// + /// While in development, LetsEncrypt prefers you to use the staging server. However, + /// the staging server seems to only use `ECDSA` keys. In their current set up, you can + /// only get intermediate certificates for `ECDSA` keys if you are on their + /// "allowlist". The production server uses `RSA` keys, which allow for issuing + /// intermediate certificates in all normal circumstances. So, to have valid + /// certificates, we must use the LetsEncrypt production server. Read more here: + /// . + #[serde(default = "cfg_defaults::tls_config::prod_tls")] prod_tls: bool, /// The contact email for the tls certificate. - contact: String, - /// Directory to store LetsEncrypt certs or read certificates from, if TLS is used. - cert_dir: Option, - /// The port on which to serve a response for the captive portal probe over HTTP. /// - /// The listener is bound to the same IP as specified in the `addr` field. Defaults to 80. - /// This field is only read in we are serving the relay server over HTTPS. In that case, we must listen for requests for the `/generate_204` over a non-TLS connection. - captive_portal_port: Option, + /// Used when `cert_mode` is `LetsEncrypt`. + contact: Option, } -#[derive(Serialize, Deserialize)] +impl TlsConfig { + fn https_bind_addr(&self, cfg: &Config) -> SocketAddr { + self.https_bind_addr + .unwrap_or_else(|| SocketAddr::new(cfg.http_bind_addr().ip(), DEFAULT_HTTPS_PORT)) + } + + fn cert_dir(&self) -> PathBuf { + self.cert_dir.clone().unwrap_or_else(|| PathBuf::from(".")) + } + + fn cert_path(&self) -> PathBuf { + self.manual_cert_path + .clone() + .unwrap_or_else(|| self.cert_dir().join("default.crt")) + } + + fn key_path(&self) -> PathBuf { + self.manual_key_path + .clone() + .unwrap_or_else(|| self.cert_dir().join("default.key")) + } +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] struct Limits { /// Rate limit for accepting new connection. Unlimited if not set. accept_conn_limit: Option, @@ -237,23 +283,6 @@ struct Limits { accept_conn_burst: Option, } -impl Default for Config { - fn default() -> Self { - Self { - secret_key: SecretKey::generate(), - addr: (Ipv6Addr::UNSPECIFIED, 443).into(), - stun_port: DEFAULT_RELAY_STUN_PORT, - hostname: NA_RELAY_HOSTNAME.into(), - enable_stun: true, - enable_relay: true, - tls: None, - limits: None, - #[cfg(feature = "metrics")] - metrics_addr: None, - } - } -} - impl Config { async fn load(opts: &Cli) -> Result { let config_path = if let Some(config_path) = &opts.config_path { @@ -274,12 +303,12 @@ impl Config { async fn read_from_file(path: impl AsRef) -> Result { if !path.as_ref().is_file() { - bail!("config-path must be a valid toml file"); + bail!("config-path must be a file"); } let config_ser = tokio::fs::read_to_string(&path) .await .context("unable to read config")?; - let config: Self = toml::from_str(&config_ser).context("unable to decode config")?; + let config: Self = toml::from_str(&config_ser).context("config file must be valid toml")?; if !config_ser.contains("secret_key") { info!("generating new secret key and updating config file"); config.write_to_file(path).await?; @@ -307,36 +336,6 @@ impl Config { } } -#[cfg(feature = "metrics")] -pub fn init_metrics_collection( - metrics_addr: Option, -) -> Option> { - use iroh_metrics::core::Metric; - - let rt = tokio::runtime::Handle::current(); - - // doesn't start the server if the address is None - if let Some(metrics_addr) = metrics_addr { - iroh_metrics::core::Core::init(|reg, metrics| { - metrics.insert(iroh_net::metrics::RelayMetrics::new(reg)); - metrics.insert(StunMetrics::new(reg)); - }); - - return Some(rt.spawn(async move { - if let Err(e) = iroh_metrics::metrics::start_metrics_server(metrics_addr).await { - eprintln!("Failed to start metrics server: {e}"); - } - })); - } - tracing::info!("Metrics server not started, no address provided"); - None -} - -/// Only used when in `dev` mode & the given port is `443` -const DEV_PORT: u16 = 3340; -/// Only used when tls is enabled & a captive protal port is not given -const DEFAULT_CAPTIVE_PORTAL_PORT: u16 = 80; - #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::registry() @@ -345,470 +344,100 @@ async fn main() -> Result<()> { .init(); let cli = Cli::parse(); - let cfg = Config::load(&cli).await?; - - #[cfg(feature = "metrics")] - let metrics_fut = init_metrics_collection(cfg.metrics_addr); - - let r = run(cli.dev, cfg, None).await; - - #[cfg(feature = "metrics")] - if let Some(metrics_fut) = metrics_fut { - metrics_fut.abort(); - drop(metrics_fut); - } - r -} - -async fn run( - dev_mode: bool, - cfg: Config, - addr_sender: Option>, -) -> Result<()> { - let (addr, tls_config) = if dev_mode { - let port = if cfg.addr.port() != 443 { - cfg.addr.port() - } else { - DEV_PORT - }; - - let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port); - info!(%addr, "Running in dev mode."); - (addr, None) - } else { - (cfg.addr, cfg.tls) - }; - - if let Some(tls_config) = &tls_config { - if let Some(captive_portal_port) = tls_config.captive_portal_port { - if addr.port() == captive_portal_port { - bail!("The main listening address {addr:?} and the `captive_portal_port` have the same port number."); - } + let mut cfg = Config::load(&cli).await?; + if cli.dev { + cfg.tls = None; + if cfg.http_bind_addr.is_none() { + cfg.http_bind_addr = Some((Ipv6Addr::UNSPECIFIED, DEV_MODE_HTTP_PORT).into()); } - } else if addr.port() == 443 { - // no tls config, but the port is 443 - warn!("The address port is 443, which is typically the expected tls port, but you have not supplied any tls configuration.\nIf you meant to run the relay server with tls enabled, adjust the config file to include tls configuration."); } + let relay_config = build_relay_config(cfg).await?; + debug!("{relay_config:#?}"); - // set up relay configuration details - let secret_key = if cfg.enable_relay { - Some(cfg.secret_key) - } else { - None - }; + let mut relay = iroh_relay::Server::spawn(relay_config).await?; - // run stun - let stun_task = if cfg.enable_stun { - Some(tokio::task::spawn(async move { - serve_stun(addr.ip(), cfg.stun_port).await - })) - } else { - None - }; - - // set up tls configuration details - let (tls_config, headers, captive_portal_port) = if let Some(tls_config) = tls_config { - let contact = tls_config.contact; - let is_production = tls_config.prod_tls; - let (config, acceptor) = tls_config - .cert_mode - .gen_server_config( - cfg.hostname.clone(), - contact, - is_production, - tls_config.cert_dir.unwrap_or_else(|| PathBuf::from(".")), - ) - .await?; - let mut headers = HeaderMap::new(); - for (name, value) in TLS_HEADERS.iter() { - headers.insert(*name, value.parse()?); - } - ( - Some(RelayTlsConfig { config, acceptor }), - headers, - tls_config - .captive_portal_port - .unwrap_or(DEFAULT_CAPTIVE_PORTAL_PORT), - ) - } else { - (None, HeaderMap::new(), 0) - }; - - let mut builder = RelayServerBuilder::new(addr) - .secret_key(secret_key.map(Into::into)) - .headers(headers) - .tls_config(tls_config.clone()) - .relay_override(Box::new(relay_disabled_handler)) - .request_handler(Method::GET, "/", Box::new(root_handler)) - .request_handler(Method::GET, "/index.html", Box::new(root_handler)) - .request_handler(Method::GET, "/derp/probe", Box::new(probe_handler)) - .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler)); - // if tls is enabled, we need to serve this endpoint from a non-tls connection - // which we check for below - if tls_config.is_none() { - builder = builder.request_handler( - Method::GET, - "/generate_204", - Box::new(serve_no_content_handler), - ); + tokio::select! { + biased; + _ = tokio::signal::ctrl_c() => (), + _ = relay.task_handle() => (), } - let relay_server = builder.spawn().await?; - // captive portal detections must be served over HTTP - let captive_portal_task = if tls_config.is_some() { - let http_addr = SocketAddr::new(addr.ip(), captive_portal_port); - let task = serve_captive_portal_service(http_addr).await?; - Some(task) - } else { - None - }; - - if let Some(addr_sender) = addr_sender { - if let Err(e) = addr_sender.send(relay_server.addr()) { - bail!("Unable to send the local SocketAddr, the Sender was dropped - {e:?}"); - } - } - - tokio::signal::ctrl_c().await?; - // Shutdown all tasks - if let Some(task) = stun_task { - task.abort(); - } - if let Some(task) = captive_portal_task { - task.abort() - } - relay_server.shutdown().await; - - Ok(()) + relay.shutdown().await } -const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Tailscale-Challenge"; -const NO_CONTENT_RESPONSE_HEADER: &str = "X-Tailscale-Response"; - -const NOTFOUND: &[u8] = b"Not Found"; -const RELAY_DISABLED: &[u8] = b"relay server disabled"; -const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n"; -const INDEX: &[u8] = br#" -

RELAY

-

- This is an - Iroh Relay - server. -

-"#; - -const TLS_HEADERS: [(&str, &str); 2] = [ - ("Strict-Transport-Security", "max-age=63072000; includeSubDomains"), - ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'") -]; - -async fn serve_captive_portal_service(addr: SocketAddr) -> Result> { - let http_listener = TcpListener::bind(&addr) - .await - .context("failed to bind http")?; - let http_addr = http_listener.local_addr()?; - info!("[CaptivePortalService]: serving on {}", http_addr); - - let task = tokio::spawn( - async move { - loop { - match http_listener.accept().await { - Ok((stream, peer_addr)) => { - debug!( - "[CaptivePortalService] Connection opened from {}", - peer_addr - ); - let handler = CaptivePortalService; - - tokio::task::spawn(async move { - let stream = relay::MaybeTlsStreamServer::Plain(stream); - let stream = hyper_util::rt::TokioIo::new(stream); - if let Err(err) = hyper::server::conn::http1::Builder::new() - .serve_connection(stream, handler) - .with_upgrades() - .await - { - error!( - "[CaptivePortalService] Failed to serve connection: {:?}", - err - ); - } - }); - } - Err(err) => { - error!( - "[CaptivePortalService] failed to accept connection: {:#?}", - err - ); - } +/// Convert the TOML-loaded config to the [`iroh_relay::RelayConfig`] format. +async fn build_relay_config(cfg: Config) -> Result> { + let tls = match cfg.tls { + Some(ref tls) => { + let cert_config = match tls.cert_mode { + CertMode::Manual => { + let cert_path = tls.cert_path(); + let key_path = tls.key_path(); + // Could probably just do this blocking, we're only starting up. + let (private_key, certs) = tokio::task::spawn_blocking(move || { + let key = load_secret_key(key_path)?; + let certs = load_certs(cert_path)?; + anyhow::Ok((key, certs)) + }) + .await??; + iroh_relay::CertConfig::Manual { private_key, certs } } - } - } - .instrument(info_span!("captive-portal.service")), - ); - Ok(task) -} - -#[derive(Clone)] -struct CaptivePortalService; - -impl hyper::service::Service> for CaptivePortalService { - type Response = Response; - type Error = HyperError; - type Future = Pin> + Send>>; - - fn call(&self, req: Request) -> Self::Future { - match (req.method(), req.uri().path()) { - // Captive Portal checker - (&Method::GET, "/generate_204") => { - Box::pin(async move { serve_no_content_handler(req, Response::builder()) }) - } - _ => { - // Return 404 not found response. - let r = Response::builder() - .status(StatusCode::NOT_FOUND) - .body(NOTFOUND.into()) - .map_err(|err| Box::new(err) as HyperError); - Box::pin(async move { r }) - } - } - } -} - -fn relay_disabled_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::NOT_FOUND) - .body(RELAY_DISABLED.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn root_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .header("Content-Type", "text/html; charset=utf-8") - .body(INDEX.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -/// HTTP latency queries -fn probe_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .header("Access-Control-Allow-Origin", "*") - .body(body_empty()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn robots_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .body(ROBOTS_TXT.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -/// For captive portal detection. -fn serve_no_content_handler( - r: Request, - mut response: ResponseBuilder, -) -> HyperResult> { - if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) { - if !challenge.is_empty() - && challenge.len() < 64 - && challenge - .as_bytes() - .iter() - .all(|c| is_challenge_char(*c as char)) - { - response = response.header( - NO_CONTENT_RESPONSE_HEADER, - format!("response {}", challenge.to_str()?), - ); - } - } - - response - .status(StatusCode::NO_CONTENT) - .body(body_empty()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn is_challenge_char(c: char) -> bool { - // Semi-randomly chosen as a limited set of valid characters - c.is_ascii_lowercase() - || c.is_ascii_uppercase() - || c.is_ascii_digit() - || c == '.' - || c == '-' - || c == '_' -} - -async fn serve_stun(host: IpAddr, port: u16) { - match UdpSocket::bind((host, port)).await { - Ok(sock) => { - let addr = sock.local_addr().expect("socket just bound"); - info!(%addr, "running STUN server"); - server_stun_listener(sock) - .instrument(debug_span!("stun_server", %addr)) - .await; - } - Err(err) => { - error!( - "failed to open STUN listener at host {host} and port {port}: {:#?}", - err - ); - } - } -} - -async fn server_stun_listener(sock: UdpSocket) { - let sock = Arc::new(sock); - let mut buffer = vec![0u8; 64 << 10]; - loop { - match sock.recv_from(&mut buffer).await { - Ok((n, src_addr)) => { - inc!(StunMetrics, requests); - let pkt = buffer[..n].to_vec(); - let sock = sock.clone(); - tokio::task::spawn(async move { - if !stun::is(&pkt) { - debug!(%src_addr, "STUN: ignoring non stun packet"); - inc!(StunMetrics, bad_requests); - return; - } - match tokio::task::spawn_blocking(move || stun::parse_binding_request(&pkt)) - .await - { - Ok(Ok(txid)) => { - debug!(%src_addr, %txid, "STUN: received binding request"); - let res = match tokio::task::spawn_blocking(move || { - stun::response(txid, src_addr) - }) - .await - { - Ok(res) => res, - Err(err) => { - error!("JoinError: {err:#}"); - return; - } - }; - match sock.send_to(&res, src_addr).await { - Ok(len) => { - if len != res.len() { - warn!(%src_addr, %txid, "STUN: failed to write response sent: {}, but expected {}", len, res.len()); - } - match src_addr { - SocketAddr::V4(_) => { - inc!(StunMetrics, ipv4_success); - } - SocketAddr::V6(_) => { - inc!(StunMetrics, ipv6_success); - } - } - trace!(%src_addr, %txid, "STUN: sent {} bytes", len); - } - Err(err) => { - inc!(StunMetrics, failures); - warn!(%src_addr, %txid, "STUN: failed to write response: {:?}", err); - } - } - } - Ok(Err(err)) => { - inc!(StunMetrics, bad_requests); - warn!(%src_addr, "STUN: invalid binding request: {:?}", err); - } - Err(err) => error!("JoinError parsing STUN binding: {err:#}"), - } - }); - } - Err(err) => { - inc!(StunMetrics, failures); - warn!("STUN: failed to recv: {:?}", err); - } + CertMode::LetsEncrypt => { + let hostname = tls + .hostname + .clone() + .context("LetsEncrypt needs a hostname")?; + let contact = tls + .contact + .clone() + .context("LetsEncrypt needs a contact email")?; + let config = AcmeConfig::new(vec![hostname.clone()]) + .contact([format!("mailto:{}", contact)]) + .cache_option(Some(DirCache::new(tls.cert_dir()))) + .directory_lets_encrypt(tls.prod_tls); + iroh_relay::CertConfig::LetsEncrypt { config } + } + }; + Some(iroh_relay::TlsConfig { + https_bind_addr: tls.https_bind_addr(&cfg), + cert: cert_config, + }) } - } + None => None, + }; + let limits = iroh_relay::Limits { + accept_conn_limit: cfg + .limits + .as_ref() + .map(|l| l.accept_conn_limit) + .unwrap_or_default(), + accept_conn_burst: cfg + .limits + .as_ref() + .map(|l| l.accept_conn_burst) + .unwrap_or_default(), + }; + let relay_config = iroh_relay::RelayConfig { + secret_key: cfg.secret_key.clone(), + http_bind_addr: cfg.http_bind_addr(), + tls, + limits, + }; + let stun_config = iroh_relay::StunConfig { + bind_addr: cfg.stun_bind_addr(), + }; + Ok(iroh_relay::ServerConfig { + relay: Some(relay_config), + stun: Some(stun_config), + #[cfg(feature = "metrics")] + metrics_addr: if cfg.enable_metrics { + Some(cfg.metrics_bind_addr()) + } else { + None + }, + }) } -// var validProdHostname = regexp.MustCompile(`^relay([^.]*)\.tailscale\.com\.?$`) - -// func prodAutocertHostPolicy(_ context.Context, host string) error { -// if validProdHostname.MatchString(host) { -// return nil -// } -// return errors.New("invalid hostname") -// } - -// func rateLimitedListenAndServeTLS(srv *http.Server) error { -// addr := srv.Addr -// if addr == "" { -// addr = ":https" -// } -// ln, err := net.Listen("tcp", addr) -// if err != nil { -// return err -// } -// rln := newRateLimitedListener(ln, rate.Limit(*acceptConnLimit), *acceptConnBurst) -// expvar.Publish("tls_listener", rln.ExpVar()) -// defer rln.Close() -// return srv.ServeTLS(rln, "", "") -// } - -// type rateLimitedListener struct { -// // These are at the start of the struct to ensure 64-bit alignment -// // on 32-bit architecture regardless of what other fields may exist -// // in this package. -// numAccepts expvar.Int // does not include number of rejects -// numRejects expvar.Int - -// net.Listener - -// lim *rate.Limiter -// } - -// func newRateLimitedListener(ln net.Listener, limit rate.Limit, burst int) *rateLimitedListener { -// return &rateLimitedListener{Listener: ln, lim: rate.NewLimiter(limit, burst)} -// } - -// func (l *rateLimitedListener) ExpVar() expvar.Var { -// m := new(metrics.Set) -// m.Set("counter_accepted_connections", &l.numAccepts) -// m.Set("counter_rejected_connections", &l.numRejects) -// return m -// } - -// var errLimitedConn = errors.New("cannot accept connection; rate limited") - -// func (l *rateLimitedListener) Accept() (net.Conn, error) { -// // Even under a rate limited situation, we accept the connection immediately -// // and close it, rather than being slow at accepting new connections. -// // This provides two benefits: 1) it signals to the client that something -// // is going on on the server, and 2) it prevents new connections from -// // piling up and occupying resources in the OS kernel. -// // The client will retry as needing (with backoffs in place). -// cn, err := l.Listener.Accept() -// if err != nil { -// return nil, err -// } -// if !l.lim.Allow() { -// l.numRejects.Add(1) -// cn.Close() -// return nil, errLimitedConn -// } -// l.numAccepts.Add(1) -// return cn, nil -// } -// mod metrics { use iroh_metrics::{ core::{Counter, Metric}, @@ -856,206 +485,3 @@ mod metrics { } } } - -#[cfg(test)] -mod tests { - use super::*; - - use std::net::Ipv4Addr; - use std::time::Duration; - - use bytes::Bytes; - use http_body_util::BodyExt; - use iroh_base::node_addr::RelayUrl; - use iroh_net::relay::http::ClientBuilder; - use iroh_net::relay::ReceivedMessage; - use tokio::task::JoinHandle; - - #[tokio::test] - async fn test_serve_no_content_handler() { - let challenge = "123az__."; - let req = Request::builder() - .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .body(body_empty()) - .unwrap(); - - let res = serve_no_content_handler(req, Response::builder()).unwrap(); - assert_eq!(res.status(), StatusCode::NO_CONTENT); - - let header = res - .headers() - .get(NO_CONTENT_RESPONSE_HEADER) - .unwrap() - .to_str() - .unwrap(); - assert_eq!(header, format!("response {challenge}")); - assert!(res - .into_body() - .collect() - .await - .unwrap() - .to_bytes() - .is_empty()); - } - - #[test] - fn test_escape_hostname() { - assert_eq!( - escape_hostname("hello.host.name_foo-bar%baz"), - "hello.host.namefoo-barbaz" - ); - } - - struct DropServer { - server_task: JoinHandle<()>, - } - - impl Drop for DropServer { - fn drop(&mut self) { - self.server_task.abort(); - } - } - - #[tokio::test] - async fn test_relay_server_basic() -> Result<()> { - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); - // Binding to LOCALHOST to satisfy issues when binding to UNSPECIFIED in Windows for tests - // Binding to Ipv4 because, when binding to `IPv6::UNSPECIFIED`, it will also listen for - // IPv4 connections, but will not automatically do the same for `LOCALHOST`. In order to - // test STUN, which only listens on Ipv4, we must bind the whole relay server to Ipv4::LOCALHOST. - let cfg = Config { - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), - ..Default::default() - }; - let (addr_send, addr_recv) = tokio::sync::oneshot::channel(); - let relay_server_task = tokio::spawn( - async move { - // dev mode will bind to IPv6::UNSPECIFIED, so setting it `false` - let res = run(false, cfg, Some(addr_send)).await; - if let Err(e) = res { - eprintln!("error starting relay server {e}"); - } - } - .instrument(debug_span!("relay server")), - ); - let _drop_server = DropServer { - server_task: relay_server_task, - }; - - let relay_server_addr = addr_recv.await?; - let relay_server_str_url = format!("http://{}", relay_server_addr); - let relay_server_url: RelayUrl = relay_server_str_url.parse().unwrap(); - - // set up clients - let a_secret_key = SecretKey::generate(); - let a_key = a_secret_key.public(); - let resolver = iroh_net::dns::default_resolver().clone(); - let (client_a, mut client_a_receiver) = - ClientBuilder::new(relay_server_url.clone()).build(a_secret_key, resolver); - let connect_client = client_a.clone(); - - // give the relay server some time to set up - if let Err(e) = tokio::time::timeout(Duration::from_secs(10), async move { - loop { - match connect_client.connect().await { - Ok(_) => break, - Err(e) => { - tracing::warn!("client a unable to connect to relay server: {e:?}. Attempting to dial again in 10ms"); - tokio::time::sleep(Duration::from_millis(100)).await - } - } - } - }) - .await - { - bail!("error connecting client a to relay server: {e:?}"); - } - - let b_secret_key = SecretKey::generate(); - let b_key = b_secret_key.public(); - let resolver = iroh_net::dns::default_resolver().clone(); - let (client_b, mut client_b_receiver) = - ClientBuilder::new(relay_server_url.clone()).build(b_secret_key, resolver); - client_b.connect().await?; - - let msg = Bytes::from("hello, b"); - client_a.send(b_key, msg.clone()).await?; - - let (res, _) = client_b_receiver.recv().await.unwrap()?; - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(a_key, source); - assert_eq!(msg, data); - } else { - bail!("client_b received unexpected message {res:?}"); - } - - let msg = Bytes::from("howdy, a"); - client_b.send(a_key, msg.clone()).await?; - - let (res, _) = client_a_receiver.recv().await.unwrap()?; - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(b_key, source); - assert_eq!(msg, data); - } else { - bail!("client_a received unexpected message {res:?}"); - } - - // run stun check - let stun_addr: SocketAddr = - SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 3478); - - let txid = stun::TransactionId::default(); - let req = stun::request(txid); - let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap()); - - let server_socket = socket.clone(); - let server_task = tokio::task::spawn(async move { - let mut buf = vec![0u8; 64000]; - let len = server_socket.recv(&mut buf).await.unwrap(); - dbg!(len); - buf.truncate(len); - buf - }); - - tracing::info!("sending stun request to {stun_addr}"); - if let Err(e) = socket.send_to(&req, stun_addr).await { - bail!("socket.send_to error: {e:?}"); - } - - let response = server_task.await.unwrap(); - let (txid_back, response_addr) = stun::parse_response(&response).unwrap(); - assert_eq!(txid, txid_back); - tracing::info!("got {response_addr}"); - - // get 200 home page response - tracing::info!("send request for homepage"); - let res = reqwest::get(relay_server_str_url).await?; - assert!(res.status().is_success()); - tracing::info!("got OK"); - - // test captive portal - tracing::info!("test captive portal response"); - - let url = relay_server_url.join("/generate_204")?; - let challenge = "123az__."; - let client = reqwest::Client::new(); - let res = client - .get(url) - .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .send() - .await?; - assert_eq!(StatusCode::NO_CONTENT.as_u16(), res.status().as_u16()); - let header = res.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); - assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); - let body = res.bytes().await?; - assert!(body.is_empty()); - - tracing::info!("got successful captive portal response"); - - Ok(()) - } -} diff --git a/iroh-net/src/config.rs b/iroh-net/src/config.rs deleted file mode 100644 index 8c98749810..0000000000 --- a/iroh-net/src/config.rs +++ /dev/null @@ -1,128 +0,0 @@ -//! Configuration types. - -use std::{collections::BTreeMap, fmt::Display, net::SocketAddr}; - -use crate::relay::RelayUrl; - -use super::portmapper; - -// TODO: This re-uses "Endpoint" again, a term that already means "a quic endpoint" and "a -// magicsock endpoint". this time it means "an IP address on which our local magicsock -// endpoint is listening". Name this better. -/// An endpoint IPPort and an associated type. -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct Endpoint { - /// The address of the endpoint. - pub addr: SocketAddr, - /// The kind of endpoint. - pub typ: EndpointType, -} - -/// Type of endpoint. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub enum EndpointType { - /// Endpoint kind has not been determined yet. - Unknown, - /// Endpoint is bound to a local address. - Local, - /// Endpoint has a publicly reachable address found via STUN. - Stun, - /// Endpoint uses a port mapping in the router. - Portmapped, - /// Hard NAT: STUN'ed IPv4 address + local fixed port. - Stun4LocalPort, -} - -impl Display for EndpointType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - EndpointType::Unknown => write!(f, "?"), - EndpointType::Local => write!(f, "local"), - EndpointType::Stun => write!(f, "stun"), - EndpointType::Portmapped => write!(f, "portmap"), - EndpointType::Stun4LocalPort => write!(f, "stun4localport"), - } - } -} - -/// Contains information about the host's network state. -#[derive(Debug, Clone, PartialEq)] -pub struct NetInfo { - /// Says whether the host's NAT mappings vary based on the destination IP. - pub mapping_varies_by_dest_ip: Option, - - /// If their router does hairpinning. It reports true even if there's no NAT involved. - pub hair_pinning: Option, - - /// Whether the host has IPv6 internet connectivity. - pub working_ipv6: Option, - - /// Whether the OS supports IPv6 at all, regardless of whether IPv6 internet connectivity is available. - pub os_has_ipv6: Option, - - /// Whether the host has UDP internet connectivity. - pub working_udp: Option, - - /// Whether ICMPv4 works, `None` means not checked. - pub working_icmp_v4: Option, - - /// Whether ICMPv6 works, `None` means not checked. - pub working_icmp_v6: Option, - - /// Whether we have an existing portmap open (UPnP, PMP, or PCP). - pub have_port_map: bool, - - /// Probe indicating the presence of port mapping protocols on the LAN. - pub portmap_probe: Option, - - /// This node's preferred relay server for incoming traffic. The node might be be temporarily - /// connected to multiple relay servers (to send to other nodes) - /// but PreferredRelay is the instance number that the node - /// subscribes to traffic at. Zero means disconnected or unknown. - pub preferred_relay: Option, - - /// LinkType is the current link type, if known. - pub link_type: Option, - - /// The fastest recent time to reach various relay STUN servers, in seconds. - /// - /// This should only be updated rarely, or when there's a - /// material change, as any change here also gets uploaded to the control plane. - pub relay_latency: BTreeMap, -} - -impl NetInfo { - /// reports whether `self` and `other` are basically equal, ignoring changes in relay ServerLatency & RelayLatency. - pub fn basically_equal(&self, other: &Self) -> bool { - let eq_icmp_v4 = match (self.working_icmp_v4, other.working_icmp_v4) { - (Some(slf), Some(other)) => slf == other, - _ => true, // ignore for comparison if only one report had this info - }; - let eq_icmp_v6 = match (self.working_icmp_v6, other.working_icmp_v6) { - (Some(slf), Some(other)) => slf == other, - _ => true, // ignore for comparison if only one report had this info - }; - self.mapping_varies_by_dest_ip == other.mapping_varies_by_dest_ip - && self.hair_pinning == other.hair_pinning - && self.working_ipv6 == other.working_ipv6 - && self.os_has_ipv6 == other.os_has_ipv6 - && self.working_udp == other.working_udp - && eq_icmp_v4 - && eq_icmp_v6 - && self.have_port_map == other.have_port_map - && self.portmap_probe == other.portmap_probe - && self.preferred_relay == other.preferred_relay - && self.link_type == other.link_type - } -} - -/// The type of link. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum LinkType { - /// A wired link (ethernet, fiber, etc). - Wired, - /// A WiFi link. - Wifi, - /// LTE, 4G, 3G, etc. - Mobile, -} diff --git a/iroh-net/src/defaults.rs b/iroh-net/src/defaults.rs index 22e6b1d6e9..7d0237a629 100644 --- a/iroh-net/src/defaults.rs +++ b/iroh-net/src/defaults.rs @@ -9,8 +9,20 @@ pub const NA_RELAY_HOSTNAME: &str = "use1-1.relay.iroh.network."; /// Hostname of the default EU relay. pub const EU_RELAY_HOSTNAME: &str = "euw1-1.relay.iroh.network."; -/// STUN port as defined by [RFC 8489]() -pub const DEFAULT_RELAY_STUN_PORT: u16 = 3478; +/// The default STUN port used by the Relay server. +/// +/// The STUN port as defined by [RFC +/// 8489]() +pub const DEFAULT_STUN_PORT: u16 = 3478; + +/// The default HTTP port used by the Relay server. +pub const DEFAULT_HTTP_PORT: u16 = 80; + +/// The default HTTPS port used by the Relay server. +pub const DEFAULT_HTTPS_PORT: u16 = 443; + +/// The default metrics port used by the Relay server. +pub const DEFAULT_METRICS_PORT: u16 = 9090; /// Get the default [`RelayMap`]. pub fn default_relay_map() -> RelayMap { @@ -27,7 +39,7 @@ pub fn default_na_relay_node() -> RelayNode { RelayNode { url: url.into(), stun_only: false, - stun_port: DEFAULT_RELAY_STUN_PORT, + stun_port: DEFAULT_STUN_PORT, } } @@ -40,6 +52,6 @@ pub fn default_eu_relay_node() -> RelayNode { RelayNode { url: url.into(), stun_only: false, - stun_port: DEFAULT_RELAY_STUN_PORT, + stun_port: DEFAULT_STUN_PORT, } } diff --git a/iroh-net/src/dns.rs b/iroh-net/src/dns.rs index 1ac64c2f7f..bcd5ebc15a 100644 --- a/iroh-net/src/dns.rs +++ b/iroh-net/src/dns.rs @@ -387,17 +387,6 @@ pub(crate) mod tests { const TIMEOUT: Duration = Duration::from_secs(5); const STAGGERING_DELAYS: &[u64] = &[200, 300]; - #[tokio::test] - #[cfg_attr(target_os = "windows", ignore = "flaky")] - async fn test_dns_lookup_basic() { - let _logging = iroh_test::logging::setup(); - let resolver = default_resolver(); - let res = resolver.lookup_ip(NA_RELAY_HOSTNAME).await.unwrap(); - let res: Vec<_> = res.iter().collect(); - assert!(!res.is_empty()); - dbg!(res); - } - #[tokio::test] async fn test_dns_lookup_ipv4_ipv6() { let _logging = iroh_test::logging::setup(); diff --git a/iroh-net/src/endpoint.rs b/iroh-net/src/endpoint.rs index 4dc49c3ebb..48bded5eda 100644 --- a/iroh-net/src/endpoint.rs +++ b/iroh-net/src/endpoint.rs @@ -47,8 +47,8 @@ pub use quinn::{ }; pub use super::magicsock::{ - ConnectionInfo, ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, - LocalEndpointsStream, + ConnectionInfo, ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddr, DirectAddrInfo, + DirectAddrType, DirectAddrsStream, }; pub use iroh_base::node_addr::{AddrInfo, NodeAddr}; @@ -590,10 +590,10 @@ impl Endpoint { /// /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and local IP endpoints /// as they would be returned by [`Endpoint::home_relay`] and - /// [`Endpoint::local_endpoints`]. + /// [`Endpoint::direct_addresses`]. pub async fn node_addr(&self) -> Result { let addrs = self - .local_endpoints() + .direct_addresses() .next() .await .ok_or(anyhow!("No IP endpoints found"))?; @@ -660,13 +660,13 @@ impl Endpoint { /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { /// let mep = Endpoint::builder().bind(0).await.unwrap(); - /// let _endpoints = mep.local_endpoints().next().await; + /// let _addrs = mep.direct_addresses().next().await; /// # }); /// ``` /// /// [STUN]: https://en.wikipedia.org/wiki/STUN - pub fn local_endpoints(&self) -> LocalEndpointsStream { - self.msock.local_endpoints() + pub fn direct_addresses(&self) -> DirectAddrsStream { + self.msock.direct_addresses() } /// Returns the local socket addresses on which the underlying sockets are bound. @@ -931,11 +931,11 @@ impl Connecting { /// Extracts the ALPN protocol from the peer's handshake data. // Note, we could totally provide this method to be on a Connection as well. But we'd // need to wrap Connection too. - pub async fn alpn(&mut self) -> Result { + pub async fn alpn(&mut self) -> Result> { let data = self.handshake_data().await?; match data.downcast::() { Ok(data) => match data.protocol { - Some(protocol) => std::string::String::from_utf8(protocol).map_err(Into::into), + Some(protocol) => Ok(protocol), None => bail!("no ALPN protocol available"), }, Err(_) => bail!("unknown handshake type"), @@ -1388,7 +1388,7 @@ mod tests { let conn = incoming.await.unwrap(); let node_id = get_remote_node_id(&conn).unwrap(); assert_eq!(node_id, src); - assert_eq!(alpn.as_bytes(), TEST_ALPN); + assert_eq!(alpn, TEST_ALPN); let (mut send, mut recv) = conn.accept_bi().await.unwrap(); let m = recv.read_to_end(100).await.unwrap(); assert_eq!(m, b"hello"); diff --git a/iroh-net/src/lib.rs b/iroh-net/src/lib.rs index 5cba9c3892..8e54ac70e2 100644 --- a/iroh-net/src/lib.rs +++ b/iroh-net/src/lib.rs @@ -117,7 +117,6 @@ #![recursion_limit = "256"] #![deny(missing_docs, rustdoc::broken_intra_doc_links)] -pub mod config; pub mod defaults; pub mod dialer; mod disco; diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 1f99c0b4e5..3b3a56ec78 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -16,7 +16,7 @@ //! however, read any packets that come off the UDP sockets. use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, fmt::Display, io, net::{IpAddr, Ipv6Addr, SocketAddr}, @@ -51,7 +51,6 @@ use url::Url; use watchable::Watchable; use crate::{ - config, disco::{self, SendAddr}, discovery::Discovery, dns::DnsResolver, @@ -97,37 +96,37 @@ const NETCHECK_REPORT_TIMEOUT: Duration = Duration::from_secs(10); /// Contains options for `MagicSock::listen`. #[derive(derive_more::Debug)] -pub(super) struct Options { +pub(crate) struct Options { /// The port to listen on. /// Zero means to pick one automatically. - pub port: u16, + pub(crate) port: u16, /// Secret key for this node. - pub secret_key: SecretKey, + pub(crate) secret_key: SecretKey, /// The [`RelayMap`] to use, leave empty to not use a relay server. - pub relay_map: RelayMap, + pub(crate) relay_map: RelayMap, /// Path to store known nodes. - pub nodes_path: Option, + pub(crate) nodes_path: Option, /// Optional node discovery mechanism. - pub discovery: Option>, + pub(crate) discovery: Option>, /// A DNS resolver to use for resolving relay URLs. /// /// You can use [`crate::dns::default_resolver`] for a resolver that uses the system's DNS /// configuration. - pub dns_resolver: DnsResolver, + pub(crate) dns_resolver: DnsResolver, /// Proxy configuration. - pub proxy_url: Option, + pub(crate) proxy_url: Option, /// Skip verification of SSL certificates from relay servers /// /// May only be used in tests. #[cfg(any(test, feature = "test-utils"))] - pub insecure_skip_relay_cert_verify: bool, + pub(crate) insecure_skip_relay_cert_verify: bool, } impl Default for Options { @@ -148,13 +147,13 @@ impl Default for Options { /// Contents of a relay message. Use a SmallVec to avoid allocations for the very /// common case of a single packet. -pub(super) type RelayContents = SmallVec<[Bytes; 1]>; +type RelayContents = SmallVec<[Bytes; 1]>; /// Handle for [`MagicSock`]. /// /// Dereferences to [`MagicSock`], and handles closing. #[derive(Clone, Debug, derive_more::Deref)] -pub(super) struct Handle { +pub(crate) struct Handle { #[deref(forward)] msock: Arc, // Empty when closed @@ -172,7 +171,7 @@ pub(super) struct Handle { /// means any QUIC endpoints on top will be sharing as much information about nodes as /// possible. #[derive(derive_more::Debug)] -pub(super) struct MagicSock { +pub(crate) struct MagicSock { actor_sender: mpsc::Sender, relay_actor_sender: mpsc::Sender, /// String representation of the node_id of this node. @@ -248,19 +247,19 @@ pub(super) struct MagicSock { impl MagicSock { /// Creates a magic [`MagicSock`] listening on [`Options::port`]. - pub async fn spawn(opts: Options) -> Result { + pub(crate) async fn spawn(opts: Options) -> Result { Handle::new(opts).await } /// Returns the relay node we are connected to, that has the best latency. /// /// If `None`, then we are not connected to any relay nodes. - pub fn my_relay(&self) -> Option { + pub(crate) fn my_relay(&self) -> Option { self.my_relay.get() } /// Get the current proxy configuration. - pub fn proxy_url(&self) -> Option<&Url> { + pub(crate) fn proxy_url(&self) -> Option<&Url> { self.proxy_url.as_ref() } @@ -284,42 +283,43 @@ impl MagicSock { } /// Get the cached version of the Ipv4 and Ipv6 addrs of the current connection. - pub fn local_addr(&self) -> (SocketAddr, Option) { + pub(crate) fn local_addr(&self) -> (SocketAddr, Option) { *self.local_addrs.read().expect("not poisoned") } /// Returns `true` if we have at least one candidate address where we can send packets to. - pub fn has_send_address(&self, node_key: PublicKey) -> bool { + pub(crate) fn has_send_address(&self, node_key: PublicKey) -> bool { self.connection_info(node_key) .map(|info| info.has_send_address()) .unwrap_or(false) } /// Retrieve connection information about nodes in the network. - pub fn connection_infos(&self) -> Vec { + pub(crate) fn connection_infos(&self) -> Vec { self.node_map.node_infos(Instant::now()) } /// Retrieve connection information about a node in the network. - pub fn connection_info(&self, node_id: NodeId) -> Option { + pub(crate) fn connection_info(&self, node_id: NodeId) -> Option { self.node_map.node_info(node_id) } - /// Returns the local endpoints as a stream. + /// Returns the direct addresses as a stream. /// - /// The [`MagicSock`] continuously monitors the local endpoints, the network addresses - /// it can listen on, for changes. Whenever changes are detected this stream will yield - /// a new list of endpoints. + /// The [`MagicSock`] continuously monitors the direct addresses, the network addresses + /// it might be able to be contacted on, for changes. Whenever changes are detected + /// this stream will yield a new list of addresses. /// /// Upon the first creation on the [`MagicSock`] it may not yet have completed a first - /// local endpoint discovery, in this case the first item of the stream will not be - /// immediately available. Once this first set of local endpoints are discovered the - /// stream will always return the first set of endpoints immediately, which are the most - /// recently discovered endpoints. + /// direct addresses discovery, in this case the first item of the stream will not be + /// immediately available. Once this first set of direct addresses are discovered the + /// stream will always return the first set of addresses immediately, which are the most + /// recently discovered addresses. /// - /// To get the current endpoints, drop the stream after the first item was received. - pub fn local_endpoints(&self) -> LocalEndpointsStream { - LocalEndpointsStream { + /// To get the current direct addresses, drop the stream after the first item was + /// received. + pub(crate) fn direct_addresses(&self) -> DirectAddrsStream { + DirectAddrsStream { initial: Some(self.endpoints.get()), inner: self.endpoints.watch().into_stream(), } @@ -329,7 +329,7 @@ impl MagicSock { /// /// Note that this can be used to wait for the initial home relay to be known. If the home /// relay is known at this point, it will be the first item in the stream. - pub fn watch_home_relay(&self) -> impl Stream { + pub(crate) fn watch_home_relay(&self) -> impl Stream { let current = futures_lite::stream::iter(self.my_relay()); let changes = self .my_relay @@ -352,7 +352,7 @@ impl MagicSock { /// /// Will return an error if there is no address information known about the /// given `node_id`. - pub fn conn_type_stream(&self, node_id: NodeId) -> Result { + pub(crate) fn conn_type_stream(&self, node_id: NodeId) -> Result { self.node_map.conn_type_stream(node_id) } @@ -360,7 +360,7 @@ impl MagicSock { /// /// Note this is a user-facing API and does not wrap the [`SocketAddr`] in a /// [`QuicMappedAddr`] as we do internally. - pub fn get_mapping_addr(&self, node_id: NodeId) -> Option { + pub(crate) fn get_mapping_addr(&self, node_id: NodeId) -> Option { self.node_map .get_quic_mapped_addr_for_node_key(node_id) .map(|a| a.0) @@ -390,7 +390,7 @@ impl MagicSock { /// Updates our direct addresses. /// /// On a successful update, our address is published to discovery. - pub(super) fn update_endpoints(&self, eps: Vec) { + pub(super) fn update_endpoints(&self, eps: Vec) { let updated = self.endpoints.update(DiscoveredEndpoints::new(eps)).is_ok(); if updated { let eps = self.endpoints.read(); @@ -402,17 +402,17 @@ impl MagicSock { } /// Get a reference to the DNS resolver used in this [`MagicSock`]. - pub fn dns_resolver(&self) -> &DnsResolver { + pub(crate) fn dns_resolver(&self) -> &DnsResolver { &self.dns_resolver } /// Reference to optional discovery service - pub fn discovery(&self) -> Option<&dyn Discovery> { + pub(crate) fn discovery(&self) -> Option<&dyn Discovery> { self.discovery.as_ref().map(Box::as_ref) } /// Call to notify the system of potential network changes. - pub async fn network_change(&self) { + pub(crate) async fn network_change(&self) { self.actor_sender .send(ActorMessage::NetworkChange) .await @@ -1488,7 +1488,7 @@ impl Handle { /// Polling the socket ([`AsyncUdpSocket::poll_recv`]) will return [`Poll::Pending`] /// indefinitely after this call. #[instrument(skip_all, fields(me = %self.msock.me))] - pub async fn close(&self) -> Result<()> { + pub(crate) async fn close(&self) -> Result<()> { if self.msock.is_closed() { return Ok(()); } @@ -1523,13 +1523,13 @@ impl Handle { /// Stream returning local endpoints as they change. #[derive(Debug)] -pub struct LocalEndpointsStream { +pub struct DirectAddrsStream { initial: Option, inner: watchable::WatcherStream, } -impl Stream for LocalEndpointsStream { - type Item = Vec; +impl Stream for DirectAddrsStream { + type Item = Vec; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; @@ -1612,7 +1612,7 @@ enum DiscoBoxError { type RelayRecvResult = Result<(PublicKey, quinn_udp::RecvMeta, Bytes), io::Error>; /// Reports whether x and y represent the same set of endpoints. The order doesn't matter. -fn endpoint_sets_equal(xs: &[config::Endpoint], ys: &[config::Endpoint]) -> bool { +fn endpoint_sets_equal(xs: &[DirectAddr], ys: &[DirectAddr]) -> bool { if xs.is_empty() && ys.is_empty() { return true; } @@ -1628,7 +1628,7 @@ fn endpoint_sets_equal(xs: &[config::Endpoint], ys: &[config::Endpoint]) -> bool return true; } } - let mut m: HashMap<&config::Endpoint, usize> = HashMap::new(); + let mut m: HashMap<&DirectAddr, usize> = HashMap::new(); for x in xs { *m.entry(x).or_default() |= 1; } @@ -1697,7 +1697,7 @@ struct Actor { /// When set, is an AfterFunc timer that will call MagicSock::do_periodic_stun. periodic_re_stun_timer: time::Interval, /// The `NetInfo` provided in the last call to `net_info_func`. It's used to deduplicate calls to netInfoFunc. - net_info_last: Option, + net_info_last: Option, /// Path where connection info from [`MagicSock::node_map`] is persisted. nodes_path: Option, @@ -1992,7 +1992,7 @@ impl Actor { #[allow(clippy::map_entry)] if !$already.contains_key(&$ipp) { $already.insert($ipp, $et); - $eps.push(config::Endpoint { + $eps.push(DirectAddr { addr: $ipp, typ: $et, }); @@ -2003,13 +2003,13 @@ impl Actor { let maybe_port_mapped = *portmap_watcher.borrow(); if let Some(portmap_ext) = maybe_port_mapped.map(SocketAddr::V4) { - add_addr!(already, eps, portmap_ext, config::EndpointType::Portmapped); + add_addr!(already, eps, portmap_ext, DirectAddrType::Portmapped); self.set_net_info_have_port_map().await; } if let Some(nr) = nr { if let Some(global_v4) = nr.global_v4 { - add_addr!(already, eps, global_v4.into(), config::EndpointType::Stun); + add_addr!(already, eps, global_v4.into(), DirectAddrType::Stun); // If they're behind a hard NAT and are using a fixed // port locally, assume they might've added a static @@ -2019,16 +2019,11 @@ impl Actor { if nr.mapping_varies_by_dest_ip.unwrap_or_default() && port != 0 { let mut addr = global_v4; addr.set_port(port); - add_addr!( - already, - eps, - addr.into(), - config::EndpointType::Stun4LocalPort - ); + add_addr!(already, eps, addr.into(), DirectAddrType::Stun4LocalPort); } } if let Some(global_v6) = nr.global_v6 { - add_addr!(already, eps, global_v6.into(), config::EndpointType::Stun); + add_addr!(already, eps, global_v6.into(), DirectAddrType::Stun); } } let local_addr_v4 = self.pconn4.local_addr().ok(); @@ -2086,7 +2081,7 @@ impl Actor { already, eps, SocketAddr::new(ip, port), - config::EndpointType::Local + DirectAddrType::Local ); } } @@ -2096,7 +2091,7 @@ impl Actor { already, eps, SocketAddr::new(ip, port), - config::EndpointType::Local + DirectAddrType::Local ); } } @@ -2108,7 +2103,7 @@ impl Actor { if let Some(addr) = local_addr_v4 { // Our local endpoint is bound to a particular address. // Do not offer addresses on other local interfaces. - add_addr!(already, eps, addr, config::EndpointType::Local); + add_addr!(already, eps, addr, DirectAddrType::Local); } } @@ -2116,7 +2111,7 @@ impl Actor { if let Some(addr) = local_addr_v6 { // Our local endpoint is bound to a particular address. // Do not offer addresses on other local interfaces. - add_addr!(already, eps, addr, config::EndpointType::Local); + add_addr!(already, eps, addr, DirectAddrType::Local); } } @@ -2167,7 +2162,7 @@ impl Actor { } #[instrument(level = "debug", skip_all)] - async fn call_net_info_callback(&mut self, ni: config::NetInfo) { + async fn call_net_info_callback(&mut self, ni: NetInfo) { if let Some(ref net_info_last) = self.net_info_last { if ni.basically_equal(net_info_last) { return; @@ -2242,7 +2237,7 @@ impl Actor { self.no_v4_send = !r.ipv4_can_send; let have_port_map = self.port_mapper.watch_external_address().borrow().is_some(); - let mut ni = config::NetInfo { + let mut ni = NetInfo { relay_latency: Default::default(), mapping_varies_by_dest_ip: r.mapping_varies_by_dest_ip, hair_pinning: r.hair_pinning, @@ -2254,7 +2249,6 @@ impl Actor { working_icmp_v4: r.icmpv4, working_icmp_v6: r.icmpv6, preferred_relay: r.preferred_relay.clone(), - link_type: None, }; for (rid, d) in r.relay_v4_latency.iter() { ni.relay_latency @@ -2430,7 +2424,7 @@ fn bind(port: u16) -> Result<(UdpConn, Option)> { struct DiscoveredEndpoints { /// Records the endpoints found during the previous /// endpoint discovery. It's used to avoid duplicate endpoint change notifications. - last_endpoints: Vec, + last_endpoints: Vec, /// The last time the endpoints were updated, even if there was no change. last_endpoints_time: Option, @@ -2443,18 +2437,18 @@ impl PartialEq for DiscoveredEndpoints { } impl DiscoveredEndpoints { - fn new(endpoints: Vec) -> Self { + fn new(endpoints: Vec) -> Self { Self { last_endpoints: endpoints, last_endpoints_time: Some(Instant::now()), } } - fn into_iter(self) -> impl Iterator { + fn into_iter(self) -> impl Iterator { self.last_endpoints.into_iter() } - fn iter(&self) -> impl Iterator + '_ { + fn iter(&self) -> impl Iterator + '_ { self.last_endpoints.iter() } @@ -2510,7 +2504,7 @@ fn split_packets(transmits: &[quinn_udp::Transmit]) -> RelayContents { /// Splits a packet into its component items. #[derive(Debug)] -pub(super) struct PacketSplitIter { +struct PacketSplitIter { bytes: Bytes, } @@ -2518,7 +2512,7 @@ impl PacketSplitIter { /// Create a new PacketSplitIter from a packet. /// /// Returns an error if the packet is too big. - pub fn new(bytes: Bytes) -> Self { + fn new(bytes: Bytes) -> Self { Self { bytes } } @@ -2614,8 +2608,133 @@ fn disco_message_sent(msg: &disco::Message) { } } +/// A *direct address* on which an iroh-node might be contactable. +/// +/// Direct addresses are UDP socket addresses on which an iroh-net node could potentially be +/// contacted. These can come from various sources depending on the network topology of the +/// iroh-net node, see [`DirectAddrType`] for the several kinds of sources. +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct DirectAddr { + /// The address. + pub addr: SocketAddr, + /// The origin of this direct address. + pub typ: DirectAddrType, +} + +/// The type of direct address. +/// +/// These are the various sources or origins from which an iroh-net node might have found a +/// possible [`DirectAddr`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum DirectAddrType { + /// Not yet determined.. + Unknown, + /// A locally bound socket address. + Local, + /// Public internet address discovered via STUN. + /// + /// When possible an iroh-net node will perform STUN to discover which is the address + /// from which it sends data on the public internet. This can be different from locally + /// bound addresses when the node is on a local network wich performs NAT or similar. + Stun, + /// An address assigned by the router using port mapping. + /// + /// When possible an iroh-net node will request a port mapping from the local router to + /// get a publicly routable direct address. + Portmapped, + /// Hard NAT: STUN'ed IPv4 address + local fixed port. + /// + /// It is possible to configure iroh-net to bound to a specific port and independently + /// configure the router to forward this port to the iroh-net node. This indicates a + /// situation like this, which still uses STUN to discover the public address. + Stun4LocalPort, +} + +impl Display for DirectAddrType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DirectAddrType::Unknown => write!(f, "?"), + DirectAddrType::Local => write!(f, "local"), + DirectAddrType::Stun => write!(f, "stun"), + DirectAddrType::Portmapped => write!(f, "portmap"), + DirectAddrType::Stun4LocalPort => write!(f, "stun4localport"), + } + } +} + +/// Contains information about the host's network state. +#[derive(Debug, Clone, PartialEq)] +struct NetInfo { + /// Says whether the host's NAT mappings vary based on the destination IP. + mapping_varies_by_dest_ip: Option, + + /// If their router does hairpinning. It reports true even if there's no NAT involved. + hair_pinning: Option, + + /// Whether the host has IPv6 internet connectivity. + working_ipv6: Option, + + /// Whether the OS supports IPv6 at all, regardless of whether IPv6 internet connectivity is available. + os_has_ipv6: Option, + + /// Whether the host has UDP internet connectivity. + working_udp: Option, + + /// Whether ICMPv4 works, `None` means not checked. + working_icmp_v4: Option, + + /// Whether ICMPv6 works, `None` means not checked. + working_icmp_v6: Option, + + /// Whether we have an existing portmap open (UPnP, PMP, or PCP). + have_port_map: bool, + + /// Probe indicating the presence of port mapping protocols on the LAN. + portmap_probe: Option, + + /// This node's preferred relay server for incoming traffic. + /// + /// The node might be be temporarily connected to multiple relay servers (to send to + /// other nodes) but this is the relay on which you can always contact this node. Also + /// known as home relay. + preferred_relay: Option, + + /// The fastest recent time to reach various relay STUN servers, in seconds. + /// + /// This should only be updated rarely, or when there's a + /// material change, as any change here also gets uploaded to the control plane. + relay_latency: BTreeMap, +} + +impl NetInfo { + /// Checks if this is probably still the same network as *other*. + /// + /// This tries to compare the network situation, without taking into account things + /// expected to change a little like e.g. latency to the relay server. + fn basically_equal(&self, other: &Self) -> bool { + let eq_icmp_v4 = match (self.working_icmp_v4, other.working_icmp_v4) { + (Some(slf), Some(other)) => slf == other, + _ => true, // ignore for comparison if only one report had this info + }; + let eq_icmp_v6 = match (self.working_icmp_v6, other.working_icmp_v6) { + (Some(slf), Some(other)) => slf == other, + _ => true, // ignore for comparison if only one report had this info + }; + self.mapping_varies_by_dest_ip == other.mapping_varies_by_dest_ip + && self.hair_pinning == other.hair_pinning + && self.working_ipv6 == other.working_ipv6 + && self.os_has_ipv6 == other.os_has_ipv6 + && self.working_udp == other.working_udp + && eq_icmp_v4 + && eq_icmp_v6 + && self.have_port_map == other.have_port_map + && self.portmap_probe == other.portmap_probe + && self.preferred_relay == other.preferred_relay + } +} + #[cfg(test)] -pub(crate) mod tests { +mod tests { use anyhow::Context; use futures_lite::StreamExt; use iroh_test::CallOnDrop; @@ -2688,7 +2807,7 @@ pub(crate) mod tests { #[instrument(skip_all)] async fn mesh_stacks(stacks: Vec) -> Result { /// Registers endpoint addresses of a node to all other nodes. - fn update_eps(stacks: &[MagicStack], my_idx: usize, new_eps: Vec) { + fn update_direct_addrs(stacks: &[MagicStack], my_idx: usize, new_addrs: Vec) { let me = &stacks[my_idx]; for (i, m) in stacks.iter().enumerate() { if i == my_idx { @@ -2699,7 +2818,7 @@ pub(crate) mod tests { node_id: me.public(), info: crate::AddrInfo { relay_url: None, - direct_addresses: new_eps.iter().map(|ep| ep.addr).collect(), + direct_addresses: new_addrs.iter().map(|ep| ep.addr).collect(), }, }; m.endpoint.magic_sock().add_test_addr(addr); @@ -2714,10 +2833,10 @@ pub(crate) mod tests { let stacks = stacks.clone(); tasks.spawn(async move { let me = m.endpoint.node_id().fmt_short(); - let mut stream = m.endpoint.local_endpoints(); + let mut stream = m.endpoint.direct_addresses(); while let Some(new_eps) = stream.next().await { info!(%me, "conn{} endpoints update: {:?}", my_idx + 1, new_eps); - update_eps(&stacks, my_idx, new_eps); + update_direct_addrs(&stacks, my_idx, new_eps); } }); } @@ -3383,13 +3502,13 @@ pub(crate) mod tests { let ms = Handle::new(Default::default()).await.unwrap(); // See if we can get endpoints. - let mut eps0 = ms.local_endpoints().next().await.unwrap(); + let mut eps0 = ms.direct_addresses().next().await.unwrap(); eps0.sort(); println!("{eps0:?}"); assert!(!eps0.is_empty()); // Getting the endpoints again immediately should give the same results. - let mut eps1 = ms.local_endpoints().next().await.unwrap(); + let mut eps1 = ms.direct_addresses().next().await.unwrap(); eps1.sort(); println!("{eps1:?}"); assert_eq!(eps0, eps1); diff --git a/iroh-net/src/netcheck.rs b/iroh-net/src/netcheck.rs index 062368c1c8..391e174202 100644 --- a/iroh-net/src/netcheck.rs +++ b/iroh-net/src/netcheck.rs @@ -785,7 +785,7 @@ mod tests { use tokio::time; use tracing::info; - use crate::defaults::{DEFAULT_RELAY_STUN_PORT, EU_RELAY_HOSTNAME}; + use crate::defaults::{DEFAULT_STUN_PORT, EU_RELAY_HOSTNAME}; use crate::ping::Pinger; use crate::relay::RelayNode; @@ -795,11 +795,11 @@ mod tests { async fn test_basic() -> Result<()> { let _guard = iroh_test::logging::setup(); let (stun_addr, stun_stats, _cleanup_guard) = - stun::test::serve("0.0.0.0".parse().unwrap()).await?; + stun::tests::serve("127.0.0.1".parse().unwrap()).await?; let resolver = crate::dns::default_resolver(); let mut client = Client::new(None, resolver.clone())?; - let dm = stun::test::relay_map_of([stun_addr].into_iter()); + let dm = stun::tests::relay_map_of([stun_addr].into_iter()); // Note that the ProbePlan will change with each iteration. for i in 0..5 { @@ -842,7 +842,7 @@ mod tests { let dm = RelayMap::from_nodes([RelayNode { url: url.clone(), stun_only: true, - stun_port: DEFAULT_RELAY_STUN_PORT, + stun_port: DEFAULT_STUN_PORT, }]) .expect("hardcoded"); @@ -890,7 +890,7 @@ mod tests { // the STUN server being blocked will look like from the client's perspective. let blackhole = tokio::net::UdpSocket::bind("127.0.0.1:0").await?; let stun_addr = blackhole.local_addr()?; - let dm = stun::test::relay_map_of_opts([(stun_addr, false)].into_iter()); + let dm = stun::tests::relay_map_of_opts([(stun_addr, false)].into_iter()); // Now create a client and generate a report. let resolver = crate::dns::default_resolver().clone(); @@ -1127,8 +1127,8 @@ mod tests { // can easily use to identify the packet. // Setup STUN server and create relay_map. - let (stun_addr, _stun_stats, _done) = stun::test::serve_v4().await?; - let dm = stun::test::relay_map_of([stun_addr].into_iter()); + let (stun_addr, _stun_stats, _done) = stun::tests::serve_v4().await?; + let dm = stun::tests::relay_map_of([stun_addr].into_iter()); dbg!(&dm); let resolver = crate::dns::default_resolver().clone(); diff --git a/iroh-net/src/netcheck/reportgen.rs b/iroh-net/src/netcheck/reportgen.rs index 665ea66e54..b683d500a1 100644 --- a/iroh-net/src/netcheck/reportgen.rs +++ b/iroh-net/src/netcheck/reportgen.rs @@ -31,7 +31,7 @@ use tokio::time::{self, Instant}; use tracing::{debug, debug_span, error, info_span, trace, warn, Instrument, Span}; use super::NetcheckMetrics; -use crate::defaults::DEFAULT_RELAY_STUN_PORT; +use crate::defaults::DEFAULT_STUN_PORT; use crate::dns::{DnsResolver, ResolverExt}; use crate::net::interfaces; use crate::net::ip; @@ -935,7 +935,7 @@ async fn get_relay_addr( proto: ProbeProto, ) -> Result { let port = if relay_node.stun_port == 0 { - DEFAULT_RELAY_STUN_PORT + DEFAULT_STUN_PORT } else { relay_node.stun_port }; diff --git a/iroh-net/src/relay.rs b/iroh-net/src/relay.rs index 13dc332f75..88213f0635 100644 --- a/iroh-net/src/relay.rs +++ b/iroh-net/src/relay.rs @@ -15,6 +15,7 @@ pub(crate) mod client_conn; pub(crate) mod clients; mod codec; pub mod http; +pub mod iroh_relay; mod map; mod metrics; pub(crate) mod server; diff --git a/iroh-net/src/relay/http.rs b/iroh-net/src/relay/http.rs index e73da2de73..cd3d7519bf 100644 --- a/iroh-net/src/relay/http.rs +++ b/iroh-net/src/relay/http.rs @@ -6,32 +6,10 @@ mod server; pub(crate) mod streams; pub use self::client::{Client, ClientBuilder, ClientError, ClientReceiver}; -pub use self::server::{Server, ServerBuilder, TlsAcceptor, TlsConfig}; +pub use self::server::{Server, ServerBuilder, ServerHandle, TlsAcceptor, TlsConfig}; pub(crate) const HTTP_UPGRADE_PROTOCOL: &str = "iroh derp http"; -#[cfg(any(test, feature = "test-utils"))] -pub(crate) fn make_tls_config() -> TlsConfig { - let subject_alt_names = vec!["localhost".to_string()]; - - let cert = rcgen::generate_simple_self_signed(subject_alt_names).unwrap(); - let rustls_certificate = rustls::Certificate(cert.serialize_der().unwrap()); - let rustls_key = rustls::PrivateKey(cert.get_key_pair().serialize_der()); - let config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(vec![(rustls_certificate)], rustls_key) - .unwrap(); - - let config = std::sync::Arc::new(config); - let acceptor = tokio_rustls::TlsAcceptor::from(config.clone()); - - TlsConfig { - config, - acceptor: TlsAcceptor::Manual(acceptor), - } -} - #[cfg(test)] mod tests { use super::*; @@ -47,6 +25,27 @@ mod tests { use crate::key::{PublicKey, SecretKey}; use crate::relay::ReceivedMessage; + pub(crate) fn make_tls_config() -> TlsConfig { + let subject_alt_names = vec!["localhost".to_string()]; + + let cert = rcgen::generate_simple_self_signed(subject_alt_names).unwrap(); + let rustls_certificate = rustls::Certificate(cert.serialize_der().unwrap()); + let rustls_key = rustls::PrivateKey(cert.get_key_pair().serialize_der()); + let config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(vec![(rustls_certificate)], rustls_key) + .unwrap(); + + let config = std::sync::Arc::new(config); + let acceptor = tokio_rustls::TlsAcceptor::from(config.clone()); + + TlsConfig { + config, + acceptor: TlsAcceptor::Manual(acceptor), + } + } + #[tokio::test] async fn test_http_clients_and_server() -> Result<()> { let _guard = iroh_test::logging::setup(); @@ -115,7 +114,7 @@ mod tests { client_a_task.abort(); client_b.close().await?; client_b_task.abort(); - server.shutdown().await; + server.shutdown(); Ok(()) } @@ -186,7 +185,7 @@ mod tests { let tls_config = make_tls_config(); // start server - let server = ServerBuilder::new("127.0.0.1:0".parse().unwrap()) + let mut server = ServerBuilder::new("127.0.0.1:0".parse().unwrap()) .secret_key(Some(server_key)) .tls_config(Some(tls_config)) .spawn() @@ -232,7 +231,8 @@ mod tests { assert_eq!(b_key, got_key); assert_eq!(msg, got_msg); - server.shutdown().await; + server.shutdown(); + server.task_handle().await?; client_a.close().await?; client_a_task.abort(); client_b.close().await?; diff --git a/iroh-net/src/relay/http/server.rs b/iroh-net/src/relay/http/server.rs index a102458a3e..eaf6ffd70a 100644 --- a/iroh-net/src/relay/http/server.rs +++ b/iroh-net/src/relay/http/server.rs @@ -18,7 +18,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::task::JoinHandle; use tokio_rustls_acme::AcmeAcceptor; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, info_span, warn, Instrument}; +use tracing::{debug, error, info, info_span, Instrument}; use crate::key::SecretKey; use crate::relay::http::HTTP_UPGRADE_PROTOCOL; @@ -70,30 +70,68 @@ async fn relay_connection_handler( conn_handler.accept(io).await } -/// A Relay Server handler. Created using [`ServerBuilder::spawn`], it starts a relay server -/// listening over HTTP or HTTPS. +/// The Relay HTTP server. +/// +/// A running HTTP server serving the relay endpoint and optionally a number of additional +/// HTTP services added with [`ServerBuilder::request_handler`]. If configured using +/// [`ServerBuilder::tls_config`] the server will handle TLS as well. +/// +/// Created using [`ServerBuilder::spawn`]. #[derive(Debug)] pub struct Server { addr: SocketAddr, - server: Option, http_server_task: JoinHandle<()>, cancel_server_loop: CancellationToken, } impl Server { - /// Close the underlying relay server and the HTTP(S) server task - pub async fn shutdown(self) { - if let Some(server) = self.server { - server.close().await; + /// Returns a handle for this server. + /// + /// The server runs in the background as several async tasks. This allows controlling + /// the server, in particular it allows gracefully shutting down the server. + pub fn handle(&self) -> ServerHandle { + ServerHandle { + addr: self.addr, + cancel_token: self.cancel_server_loop.clone(), } + } + /// Closes the underlying relay server and the HTTP(S) server tasks. + pub fn shutdown(&self) { self.cancel_server_loop.cancel(); - if let Err(e) = self.http_server_task.await { - warn!("Error shutting down server: {e:?}"); - } } - /// Get the local address of this server. + /// Returns the [`JoinHandle`] for the supervisor task managing the server. + /// + /// This is the root of all the tasks for the server. Aborting it will abort all the + /// other tasks for the server. Awaiting it will complete when all the server tasks are + /// completed. + pub fn task_handle(&mut self) -> &mut JoinHandle<()> { + &mut self.http_server_task + } + + /// Returns the local address of this server. + pub fn addr(&self) -> SocketAddr { + self.addr + } +} + +/// A handle for the [`Server`]. +/// +/// This does not allow access to the task but can communicate with it. +#[derive(Debug, Clone)] +pub struct ServerHandle { + addr: SocketAddr, + cancel_token: CancellationToken, +} + +impl ServerHandle { + /// Gracefully shut down the server. + pub fn shutdown(&self) { + self.cancel_token.cancel() + } + + /// Returns the address the server is bound on. pub fn addr(&self) -> SocketAddr { self.addr } @@ -108,13 +146,15 @@ pub struct TlsConfig { pub acceptor: TlsAcceptor, } -/// Build a Relay Server that communicates over HTTP or HTTPS, on a given address. +/// Builder for the Relay HTTP Server. /// -/// Defaults to handling relay requests on the "/derp" endpoint. +/// Defaults to handling relay requests on the "/derp" endpoint. Other HTTP endpoints can +/// be added using [`ServerBuilder::request_handler`]. /// -/// If no [`SecretKey`] is provided, it is assumed that you will provide a `relay_override` function -/// that handles requests to the relay endpoint. Not providing a `relay_override` in this case will -/// result in an error on `spawn`. +/// If no [`SecretKey`] is provided, it is assumed that you will provide a +/// [`ServerBuilder::relay_override`] function that handles requests to the relay +/// endpoint. Not providing a [`ServerBuilder::relay_override`] in this case will result in +/// an error on `spawn`. #[derive(derive_more::Debug)] pub struct ServerBuilder { /// The secret key for this Server. @@ -128,18 +168,21 @@ pub struct ServerBuilder { /// /// When `None`, the server will serve HTTP, otherwise it will serve HTTPS. tls_config: Option, - /// A map of request handlers to routes. Used when certain routes in your server should be made - /// available at the same port as the relay server, and so must be handled along side requests - /// to the relay endpoint. + /// A map of request handlers to routes. + /// + /// Used when certain routes in your server should be made available at the same port as + /// the relay server, and so must be handled along side requests to the relay endpoint. handlers: Handlers, /// Defaults to `GET` request at "/derp". relay_endpoint: &'static str, - /// Use a custom relay response handler. Typically used when you want to disable any relay connections. + /// Use a custom relay response handler. + /// + /// Typically used when you want to disable any relay connections. #[debug("{}", relay_override.as_ref().map_or("None", |_| "Some(Box, ResponseBuilder) -> Result + Send + Sync + 'static>)"))] relay_override: Option, - /// Headers to use for HTTP or HTTPS messages. + /// Headers to use for HTTP responses. headers: HeaderMap, - /// 404 not found response + /// 404 not found response. /// /// When `None`, a default is provided. #[debug("{}", not_found_fn.as_ref().map_or("None", |_| "Some(Box Result> + Send + Sync + 'static>)"))] @@ -147,7 +190,7 @@ pub struct ServerBuilder { } impl ServerBuilder { - /// Create a new [ServerBuilder] + /// Creates a new [ServerBuilder]. pub fn new(addr: SocketAddr) -> Self { Self { secret_key: None, @@ -161,20 +204,21 @@ impl ServerBuilder { } } - /// The [`SecretKey`] identity for this relay server. When set to `None`, the builder assumes - /// you do not want to run a relay service. + /// The [`SecretKey`] identity for this relay server. + /// + /// When set to `None`, the builder assumes you do not want to run a relay service. pub fn secret_key(mut self, secret_key: Option) -> Self { self.secret_key = secret_key; self } - /// Serve relay content using TLS. + /// Serves all requests content using TLS. pub fn tls_config(mut self, config: Option) -> Self { self.tls_config = config; self } - /// Add a custom handler for a specific Method & URI. + /// Adds a custom handler for a specific Method & URI. pub fn request_handler( mut self, method: Method, @@ -185,26 +229,29 @@ impl ServerBuilder { self } - /// Pass in a custom "404" handler. + /// Sets a custom "404" handler. pub fn not_found_handler(mut self, handler: HyperHandler) -> Self { self.not_found_fn = Some(handler); self } - /// Handle the relay endpoint in a custom way. This is required if no [`SecretKey`] was provided - /// to the builder. + /// Handles the relay endpoint in a custom way. + /// + /// This is required if no [`SecretKey`] was provided to the builder. pub fn relay_override(mut self, handler: HyperHandler) -> Self { self.relay_override = Some(handler); self } - /// Change the relay endpoint from "/derp" to `endpoint`. + /// Sets a custom endpoint for the relay handler. + /// + /// The default is `/derp`. pub fn relay_endpoint(mut self, endpoint: &'static str) -> Self { self.relay_endpoint = endpoint; self } - /// Add http headers. + /// Adds HTTP headers to responses. pub fn headers(mut self, headers: HeaderMap) -> Self { for (k, v) in headers.iter() { self.headers.insert(k.clone(), v.clone()); @@ -212,10 +259,14 @@ impl ServerBuilder { self } - /// Build and spawn an HTTP(S) relay Server + /// Builds and spawns an HTTP(S) Relay Server. pub async fn spawn(self) -> Result { - ensure!(self.secret_key.is_some() || self.relay_override.is_some(), "Must provide a `SecretKey` for the relay server OR pass in an override function for the 'relay' endpoint"); + ensure!( + self.secret_key.is_some() || self.relay_override.is_some(), + "Must provide a `SecretKey` for the relay server OR pass in an override function for the 'relay' endpoint" + ); let (relay_handler, relay_server) = if let Some(secret_key) = self.secret_key { + // spawns a server actor/task let server = crate::relay::server::Server::new(secret_key.clone()); ( RelayHandler::ConnHandler(server.client_conn_handler(self.headers.clone())), @@ -258,6 +309,7 @@ impl ServerBuilder { service, }; + // Spawns some server tasks, we only wait till all tasks are started. server_state.serve().await } } @@ -274,13 +326,19 @@ impl ServerState { // Binds a TCP listener on `addr` and handles content using HTTPS. // Returns the local [`SocketAddr`] on which the server is listening. async fn serve(self) -> Result { - let listener = TcpListener::bind(&self.addr) + let ServerState { + addr, + tls_config, + server, + service, + } = self; + let listener = TcpListener::bind(&addr) .await - .context("failed to bind https")?; + .context("failed to bind server socket")?; // we will use this cancel token to stop the infinite loop in the `listener.accept() task` let cancel_server_loop = CancellationToken::new(); let addr = listener.local_addr()?; - let http_str = self.tls_config.as_ref().map_or("HTTP", |_| "HTTPS"); + let http_str = tls_config.as_ref().map_or("HTTP", |_| "HTTPS"); info!("[{http_str}] relay: serving on {addr}"); let cancel = cancel_server_loop.clone(); let task = tokio::task::spawn(async move { @@ -295,8 +353,8 @@ impl ServerState { res = listener.accept() => match res { Ok((stream, peer_addr)) => { debug!("[{http_str}] relay: Connection opened from {peer_addr}"); - let tls_config = self.tls_config.clone(); - let service = self.service.clone(); + let tls_config = tls_config.clone(); + let service = service.clone(); // spawn a task to handle the connection set.spawn(async move { if let Err(error) = service @@ -320,13 +378,17 @@ impl ServerState { } } } + if let Some(server) = server { + // TODO: if the task this is running in is aborted this server is not shut + // down. + server.close().await; + } set.shutdown().await; debug!("[{http_str}] relay: server has been shutdown."); }.instrument(info_span!("relay-http-serve"))); Ok(Server { addr, - server: self.server, http_server_task: task, cancel_server_loop, }) diff --git a/iroh-net/src/relay/iroh_relay.rs b/iroh-net/src/relay/iroh_relay.rs new file mode 100644 index 0000000000..928cdbaa8c --- /dev/null +++ b/iroh-net/src/relay/iroh_relay.rs @@ -0,0 +1,909 @@ +//! A full-fledged iroh-relay server. +//! +//! This module provides an API to run a full fledged iroh-relay server. It is primarily +//! used by the `iroh-relay` binary in this crate. It can be used to run a relay server in +//! other locations however. +//! +//! This code is fully written in a form of structured-concurrency: every spawned task is +//! always attached to a handle and when the handle is dropped the tasks abort. So tasks +//! can not outlive their handle. It is also always possible to await for completion of a +//! task. Some tasks additionally have a method to do graceful shutdown. + +use std::fmt; +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; + +use anyhow::{anyhow, bail, Context, Result}; +use futures_lite::StreamExt; +use http::response::Builder as ResponseBuilder; +use http::{HeaderMap, Method, Request, Response, StatusCode}; +use hyper::body::Incoming; +use iroh_metrics::inc; +use tokio::net::{TcpListener, UdpSocket}; +use tokio::task::JoinSet; +use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument}; + +use crate::key::SecretKey; +use crate::relay; +use crate::relay::http::{ServerBuilder as RelayServerBuilder, TlsAcceptor}; +use crate::stun; +use crate::util::AbortingJoinHandle; + +// Module defined in this file. +use metrics::StunMetrics; + +const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Tailscale-Challenge"; +const NO_CONTENT_RESPONSE_HEADER: &str = "X-Tailscale-Response"; +const NOTFOUND: &[u8] = b"Not Found"; +const RELAY_DISABLED: &[u8] = b"relay server disabled"; +const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n"; +const INDEX: &[u8] = br#" +

Iroh Relay

+

+ This is an Iroh Relay server. +

+"#; +const TLS_HEADERS: [(&str, &str); 2] = [ + ("Strict-Transport-Security", "max-age=63072000; includeSubDomains"), + ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'") +]; + +type BytesBody = http_body_util::Full; +type HyperError = Box; +type HyperResult = std::result::Result; + +/// Creates a new [`BytesBody`] with no content. +fn body_empty() -> BytesBody { + http_body_util::Full::new(hyper::body::Bytes::new()) +} + +/// Configuration for the full Relay & STUN server. +/// +/// Be aware the generic parameters are for when using the Let's Encrypt TLS configuration. +/// If not used dummy ones need to be provided, e.g. `ServerConfig::<(), ()>::default()`. +#[derive(Debug, Default)] +pub struct ServerConfig { + /// Configuration for the Relay server, disabled if `None`. + pub relay: Option>, + /// Configuration for the STUN server, disabled if `None`. + pub stun: Option, + /// Socket to serve metrics on. + #[cfg(feature = "metrics")] + pub metrics_addr: Option, +} + +/// Configuration for the Relay HTTP and HTTPS server. +/// +/// This includes the HTTP services hosted by the Relay server, the Relay `/derp` HTTP +/// endpoint is only one of the services served. +#[derive(Debug)] +pub struct RelayConfig { + /// The iroh secret key of the Relay server. + pub secret_key: SecretKey, + /// The socket address on which the Relay HTTP server should bind. + /// + /// Normally you'd choose port `80`. The bind address for the HTTPS server is + /// configured in [`RelayConfig::tls`]. + /// + /// If [`RelayConfig::tls`] is `None` then this serves all the HTTP services without + /// TLS. + pub http_bind_addr: SocketAddr, + /// TLS configuration for the HTTPS server. + /// + /// If *None* all the HTTP services that would be served here are served from + /// [`RelayConfig::http_bind_addr`]. + pub tls: Option>, + /// Rate limits. + pub limits: Limits, +} + +/// Configuration for the STUN server. +#[derive(Debug)] +pub struct StunConfig { + /// The socket address on which the STUN server should bind. + /// + /// Normally you'd chose port `3478`, see [`crate::defaults::DEFAULT_STUN_PORT`]. + pub bind_addr: SocketAddr, +} + +/// TLS configuration for Relay server. +/// +/// Normally the Relay server accepts connections on both HTTPS and HTTP. +#[derive(Debug)] +pub struct TlsConfig { + /// The socket address on which to serve the HTTPS server. + /// + /// Since the captive portal probe has to run over plain text HTTP and TLS is used for + /// the main relay server this has to be on a different port. When TLS is not enabled + /// this is served on the [`RelayConfig::http_bind_addr`] socket address. + /// + /// Normally you'd choose port `80`. + pub https_bind_addr: SocketAddr, + /// Mode for getting a cert. + pub cert: CertConfig, +} + +/// Rate limits. +#[derive(Debug, Default)] +pub struct Limits { + /// Rate limit for accepting new connection. Unlimited if not set. + pub accept_conn_limit: Option, + /// Burst limit for accepting new connection. Unlimited if not set. + pub accept_conn_burst: Option, +} + +/// TLS certificate configuration. +#[derive(derive_more::Debug)] +pub enum CertConfig { + /// Use Let's Encrypt. + LetsEncrypt { + /// Configuration for Let's Encrypt certificates. + #[debug("AcmeConfig")] + config: tokio_rustls_acme::AcmeConfig, + }, + /// Use a static TLS key and certificate chain. + Manual { + /// The TLS private key. + private_key: rustls::PrivateKey, + /// The TLS certificate chain. + certs: Vec, + }, +} + +/// A running Relay + STUN server. +/// +/// This is a full Relay server, including STUN, Relay and various associated HTTP services. +/// +/// Dropping this will stop the server. +#[derive(Debug)] +pub struct Server { + /// The address of the HTTP server, if configured. + http_addr: Option, + /// The address of the STUN server, if configured. + stun_addr: Option, + /// The address of the HTTPS server, if the relay server is using TLS. + /// + /// If the Relay server is not using TLS then it is served from the + /// [`Server::http_addr`]. + https_addr: Option, + /// Handle to the relay server. + relay_handle: Option, + /// The main task running the server. + supervisor: AbortingJoinHandle>, +} + +impl Server { + /// Starts the server. + pub async fn spawn(config: ServerConfig) -> Result + where + EC: fmt::Debug + 'static, + EA: fmt::Debug + 'static, + { + let mut tasks = JoinSet::new(); + + #[cfg(feature = "metrics")] + if let Some(addr) = config.metrics_addr { + debug!("Starting metrics server"); + use iroh_metrics::core::Metric; + + iroh_metrics::core::Core::init(|reg, metrics| { + metrics.insert(crate::metrics::RelayMetrics::new(reg)); + metrics.insert(StunMetrics::new(reg)); + }); + tasks.spawn( + iroh_metrics::metrics::start_metrics_server(addr) + .instrument(info_span!("metrics-server")), + ); + } + + // Start the STUN server. + let stun_addr = match config.stun { + Some(stun) => { + debug!("Starting STUN server"); + match UdpSocket::bind(stun.bind_addr).await { + Ok(sock) => { + let addr = sock.local_addr()?; + info!("STUN server bound on {addr}"); + tasks.spawn( + server_stun_listener(sock).instrument(info_span!("stun-server", %addr)), + ); + Some(addr) + } + Err(err) => bail!("failed to bind STUN listener: {err:#?}"), + } + } + None => None, + }; + + // Start the Relay server. + let (relay_server, http_addr) = match config.relay { + Some(relay_config) => { + debug!("Starting Relay server"); + let mut headers = HeaderMap::new(); + for (name, value) in TLS_HEADERS.iter() { + headers.insert(*name, value.parse()?); + } + let relay_bind_addr = match relay_config.tls { + Some(ref tls) => tls.https_bind_addr, + None => relay_config.http_bind_addr, + }; + let mut builder = RelayServerBuilder::new(relay_bind_addr) + .secret_key(Some(relay_config.secret_key)) + .headers(headers) + .relay_override(Box::new(relay_disabled_handler)) + .request_handler(Method::GET, "/", Box::new(root_handler)) + .request_handler(Method::GET, "/index.html", Box::new(root_handler)) + .request_handler(Method::GET, "/derp/probe", Box::new(probe_handler)) + .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler)); + let http_addr = match relay_config.tls { + Some(tls_config) => { + let server_config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth(); + let server_tls_config = match tls_config.cert { + CertConfig::LetsEncrypt { config } => { + let mut state = config.state(); + let server_config = + server_config.with_cert_resolver(state.resolver()); + let acceptor = TlsAcceptor::LetsEncrypt(state.acceptor()); + tasks.spawn( + async move { + while let Some(event) = state.next().await { + match event { + Ok(ok) => debug!("acme event: {ok:?}"), + Err(err) => error!("error: {err:?}"), + } + } + Err(anyhow!("acme event stream finished")) + } + .instrument(info_span!("acme")), + ); + Some(relay::http::TlsConfig { + config: Arc::new(server_config), + acceptor, + }) + } + CertConfig::Manual { private_key, certs } => { + let server_config = server_config + .with_single_cert(certs.clone(), private_key.clone())?; + let server_config = Arc::new(server_config); + let acceptor = + tokio_rustls::TlsAcceptor::from(server_config.clone()); + let acceptor = TlsAcceptor::Manual(acceptor); + Some(relay::http::TlsConfig { + config: server_config, + acceptor, + }) + } + }; + builder = builder.tls_config(server_tls_config); + + // Some services always need to be served over HTTP without TLS. Run + // these standalone. + let http_listener = TcpListener::bind(&relay_config.http_bind_addr) + .await + .context("failed to bind http")?; + let http_addr = http_listener.local_addr()?; + tasks.spawn( + run_captive_portal_service(http_listener) + .instrument(info_span!("http-service", addr = %http_addr)), + ); + Some(http_addr) + } + None => { + // If running Relay without TLS add the plain HTTP server directly + // to the Relay server. + builder = builder.request_handler( + Method::GET, + "/generate_204", + Box::new(serve_no_content_handler), + ); + None + } + }; + let relay_server = builder.spawn().await?; + (Some(relay_server), http_addr) + } + None => (None, None), + }; + // If http_addr is Some then relay_server is serving HTTPS. If http_addr is None + // relay_server is serving HTTP, including the /generate_204 service. + let relay_addr = relay_server.as_ref().map(|srv| srv.addr()); + let relay_handle = relay_server.as_ref().map(|srv| srv.handle()); + let relay_server = relay_server.map(RelayHttpServerGuard); + let task = tokio::spawn(relay_supervisor(tasks, relay_server)); + Ok(Self { + http_addr: http_addr.or(relay_addr), + stun_addr, + https_addr: http_addr.and(relay_addr), + relay_handle, + supervisor: AbortingJoinHandle::from(task), + }) + } + + /// Requests graceful shutdown. + /// + /// Returns once all server tasks have stopped. + pub async fn shutdown(self) -> Result<()> { + // Only the Relay server needs shutting down, the supervisor will abort the tasks in + // the JoinSet when the server terminates. + if let Some(handle) = self.relay_handle { + handle.shutdown(); + } + self.supervisor.await? + } + + /// Returns the handle for the task. + /// + /// This allows waiting for the server's supervisor task to finish. Can be useful in + /// case there is an error in the server before it is shut down. + pub fn task_handle(&mut self) -> &mut AbortingJoinHandle> { + &mut self.supervisor + } + + /// The socket address the HTTPS server is listening on. + pub fn https_addr(&self) -> Option { + self.https_addr + } + + /// The socket address the HTTP server is listening on. + pub fn http_addr(&self) -> Option { + self.http_addr + } + + /// The socket address the STUN server is listening on. + pub fn stun_addr(&self) -> Option { + self.stun_addr + } +} + +/// Horrible hack to make [`relay::http::Server`] behave somewhat. +/// +/// We need this server to abort on drop to achieve structured concurrency. +// TODO: could consider building this directly into the relay::http::Server +#[derive(Debug)] +struct RelayHttpServerGuard(relay::http::Server); + +impl Drop for RelayHttpServerGuard { + fn drop(&mut self) { + self.0.task_handle().abort(); + } +} + +/// Supervisor for the relay server tasks. +/// +/// As soon as one of the tasks exits, all other tasks are stopped and the server stops. +/// The supervisor finishes once all tasks are finished. +#[instrument(skip_all)] +async fn relay_supervisor( + mut tasks: JoinSet>, + mut relay_http_server: Option, +) -> Result<()> { + let res = match (relay_http_server.as_mut(), tasks.len()) { + (None, _) => tasks + .join_next() + .await + .unwrap_or_else(|| Ok(Err(anyhow!("Nothing to supervise")))), + (Some(relay), 0) => relay.0.task_handle().await.map(anyhow::Ok), + (Some(relay), _) => { + tokio::select! { + biased; + Some(ret) = tasks.join_next() => ret, + ret = relay.0.task_handle() => ret.map(anyhow::Ok), + else => Ok(Err(anyhow!("Empty JoinSet (unreachable)"))), + } + } + }; + let ret = match res { + Ok(Ok(())) => { + debug!("Task exited"); + Ok(()) + } + Ok(Err(err)) => { + error!(%err, "Task failed"); + Err(err.context("task failed")) + } + Err(err) => { + if let Ok(panic) = err.try_into_panic() { + error!("Task panicked"); + std::panic::resume_unwind(panic); + } + debug!("Task cancelled"); + Err(anyhow!("task cancelled")) + } + }; + + // Ensure the HTTP server terminated, there is no harm in calling this after it is + // already shut down. The JoinSet is aborted on drop. + if let Some(server) = relay_http_server { + server.0.shutdown(); + } + + tasks.shutdown().await; + + ret +} + +/// Runs a STUN server. +/// +/// When the future is dropped, the server stops. +async fn server_stun_listener(sock: UdpSocket) -> Result<()> { + info!(addr = ?sock.local_addr().ok(), "running STUN server"); + let sock = Arc::new(sock); + let mut buffer = vec![0u8; 64 << 10]; + let mut tasks = JoinSet::new(); + loop { + tokio::select! { + biased; + _ = tasks.join_next(), if !tasks.is_empty() => (), + res = sock.recv_from(&mut buffer) => { + match res { + Ok((n, src_addr)) => { + inc!(StunMetrics, requests); + let pkt = &buffer[..n]; + if !stun::is(pkt) { + debug!(%src_addr, "STUN: ignoring non stun packet"); + inc!(StunMetrics, bad_requests); + continue; + } + let pkt = pkt.to_vec(); + tasks.spawn(handle_stun_request(src_addr, pkt, sock.clone())); + } + Err(err) => { + inc!(StunMetrics, failures); + warn!("failed to recv: {err:#}"); + } + } + } + } + } +} + +/// Handles a single STUN request, doing all logging required. +async fn handle_stun_request(src_addr: SocketAddr, pkt: Vec, sock: Arc) { + let handle = AbortingJoinHandle::from(tokio::task::spawn_blocking(move || { + match stun::parse_binding_request(&pkt) { + Ok(txid) => { + debug!(%src_addr, %txid, "STUN: received binding request"); + Some((txid, stun::response(txid, src_addr))) + } + Err(err) => { + inc!(StunMetrics, bad_requests); + warn!(%src_addr, "STUN: invalid binding request: {:?}", err); + None + } + } + })); + let (txid, response) = match handle.await { + Ok(Some(val)) => val, + Ok(None) => return, + Err(err) => { + error!("{err:#}"); + return; + } + }; + match sock.send_to(&response, src_addr).await { + Ok(len) => { + if len != response.len() { + warn!( + %src_addr, + %txid, + "failed to write response, {len}/{} bytes sent", + response.len() + ); + } else { + match src_addr { + SocketAddr::V4(_) => inc!(StunMetrics, ipv4_success), + SocketAddr::V6(_) => inc!(StunMetrics, ipv6_success), + } + } + trace!(%src_addr, %txid, "sent {len} bytes"); + } + Err(err) => { + inc!(StunMetrics, failures); + warn!(%src_addr, %txid, "failed to write response: {err:#}"); + } + } +} + +fn relay_disabled_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult> { + response + .status(StatusCode::NOT_FOUND) + .body(RELAY_DISABLED.into()) + .map_err(|err| Box::new(err) as HyperError) +} + +fn root_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult> { + response + .status(StatusCode::OK) + .header("Content-Type", "text/html; charset=utf-8") + .body(INDEX.into()) + .map_err(|err| Box::new(err) as HyperError) +} + +/// HTTP latency queries +fn probe_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult> { + response + .status(StatusCode::OK) + .header("Access-Control-Allow-Origin", "*") + .body(body_empty()) + .map_err(|err| Box::new(err) as HyperError) +} + +fn robots_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult> { + response + .status(StatusCode::OK) + .body(ROBOTS_TXT.into()) + .map_err(|err| Box::new(err) as HyperError) +} + +/// For captive portal detection. +fn serve_no_content_handler( + r: Request, + mut response: ResponseBuilder, +) -> HyperResult> { + if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) { + if !challenge.is_empty() + && challenge.len() < 64 + && challenge + .as_bytes() + .iter() + .all(|c| is_challenge_char(*c as char)) + { + response = response.header( + NO_CONTENT_RESPONSE_HEADER, + format!("response {}", challenge.to_str()?), + ); + } + } + + response + .status(StatusCode::NO_CONTENT) + .body(body_empty()) + .map_err(|err| Box::new(err) as HyperError) +} + +fn is_challenge_char(c: char) -> bool { + // Semi-randomly chosen as a limited set of valid characters + c.is_ascii_lowercase() + || c.is_ascii_uppercase() + || c.is_ascii_digit() + || c == '.' + || c == '-' + || c == '_' +} + +/// This is a future that never returns, drop it to cancel/abort. +async fn run_captive_portal_service(http_listener: TcpListener) -> Result<()> { + info!("serving"); + + // If this future is cancelled, this is dropped and all tasks are aborted. + let mut tasks = JoinSet::new(); + + loop { + match http_listener.accept().await { + Ok((stream, peer_addr)) => { + debug!(%peer_addr, "Connection opened",); + let handler = CaptivePortalService; + + tasks.spawn(async move { + let stream = relay::MaybeTlsStreamServer::Plain(stream); + let stream = hyper_util::rt::TokioIo::new(stream); + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection(stream, handler) + .with_upgrades() + .await + { + error!("Failed to serve connection: {err:?}"); + } + }); + } + Err(err) => { + error!( + "[CaptivePortalService] failed to accept connection: {:#?}", + err + ); + } + } + } +} + +#[derive(Clone)] +struct CaptivePortalService; + +impl hyper::service::Service> for CaptivePortalService { + type Response = Response; + type Error = HyperError; + type Future = Pin> + Send>>; + + fn call(&self, req: Request) -> Self::Future { + match (req.method(), req.uri().path()) { + // Captive Portal checker + (&Method::GET, "/generate_204") => { + Box::pin(async move { serve_no_content_handler(req, Response::builder()) }) + } + _ => { + // Return 404 not found response. + let r = Response::builder() + .status(StatusCode::NOT_FOUND) + .body(NOTFOUND.into()) + .map_err(|err| Box::new(err) as HyperError); + Box::pin(async move { r }) + } + } + } +} + +mod metrics { + use iroh_metrics::{ + core::{Counter, Metric}, + struct_iterable::Iterable, + }; + + /// StunMetrics tracked for the DERPER + #[allow(missing_docs)] + #[derive(Debug, Clone, Iterable)] + pub struct StunMetrics { + /* + * Metrics about STUN requests over ipv6 + */ + /// Number of stun requests made + pub requests: Counter, + /// Number of successful requests over ipv4 + pub ipv4_success: Counter, + /// Number of successful requests over ipv6 + pub ipv6_success: Counter, + + /// Number of bad requests, either non-stun packets or incorrect binding request + pub bad_requests: Counter, + /// Number of failures + pub failures: Counter, + } + + impl Default for StunMetrics { + fn default() -> Self { + Self { + /* + * Metrics about STUN requests + */ + requests: Counter::new("Number of STUN requests made to the server."), + ipv4_success: Counter::new("Number of successful ipv4 STUN requests served."), + ipv6_success: Counter::new("Number of successful ipv6 STUN requests served."), + bad_requests: Counter::new("Number of bad requests made to the STUN endpoint."), + failures: Counter::new("Number of STUN requests that end in failure."), + } + } + } + + impl Metric for StunMetrics { + fn name() -> &'static str { + "stun" + } + } +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + use std::time::Duration; + + use bytes::Bytes; + use iroh_base::node_addr::RelayUrl; + + use crate::relay::http::ClientBuilder; + + use self::relay::ReceivedMessage; + + use super::*; + + #[tokio::test] + async fn test_no_services() { + let _guard = iroh_test::logging::setup(); + let mut server = Server::spawn(ServerConfig::<(), ()>::default()) + .await + .unwrap(); + let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle()) + .await + .expect("timeout, server not finished") + .expect("server task JoinError"); + assert!(res.is_err()); + } + + #[tokio::test] + async fn test_conflicting_bind() { + let _guard = iroh_test::logging::setup(); + let mut server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 1234).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: Some((Ipv4Addr::LOCALHOST, 1234).into()), + }) + .await + .unwrap(); + let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle()) + .await + .expect("timeout, server not finished") + .expect("server task JoinError"); + assert!(res.is_err()); // AddrInUse + } + + #[tokio::test] + async fn test_root_handler() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let url = format!("http://{}", server.http_addr().unwrap()); + + let response = reqwest::get(&url).await.unwrap(); + assert_eq!(response.status(), 200); + let body = response.text().await.unwrap(); + assert!(body.contains("iroh.computer")); + } + + #[tokio::test] + async fn test_captive_portal_service() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let url = format!("http://{}/generate_204", server.http_addr().unwrap()); + let challenge = "123az__."; + + let client = reqwest::Client::new(); + let response = client + .get(&url) + .header(NO_CONTENT_CHALLENGE_HEADER, challenge) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + let header = response.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); + assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); + let body = response.text().await.unwrap(); + assert!(body.is_empty()); + } + + #[tokio::test] + async fn test_relay_clients() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let relay_url = format!("http://{}", server.http_addr().unwrap()); + let relay_url: RelayUrl = relay_url.parse().unwrap(); + + // set up client a + let a_secret_key = SecretKey::generate(); + let a_key = a_secret_key.public(); + let resolver = crate::dns::default_resolver().clone(); + let (client_a, mut client_a_receiver) = + ClientBuilder::new(relay_url.clone()).build(a_secret_key, resolver); + let connect_client = client_a.clone(); + + // give the relay server some time to accept connections + if let Err(err) = tokio::time::timeout(Duration::from_secs(10), async move { + loop { + match connect_client.connect().await { + Ok(_) => break, + Err(err) => { + warn!("client unable to connect to relay server: {err:#}"); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + }) + .await + { + panic!("error connecting to relay server: {err:#}"); + } + + // set up client b + let b_secret_key = SecretKey::generate(); + let b_key = b_secret_key.public(); + let resolver = crate::dns::default_resolver().clone(); + let (client_b, mut client_b_receiver) = + ClientBuilder::new(relay_url.clone()).build(b_secret_key, resolver); + client_b.connect().await.unwrap(); + + // send message from a to b + let msg = Bytes::from("hello, b"); + client_a.send(b_key, msg.clone()).await.unwrap(); + + let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + if let ReceivedMessage::ReceivedPacket { source, data } = res { + assert_eq!(a_key, source); + assert_eq!(msg, data); + } else { + panic!("client_b received unexpected message {res:?}"); + } + + // send message from b to a + let msg = Bytes::from("howdy, a"); + client_b.send(a_key, msg.clone()).await.unwrap(); + + let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + if let ReceivedMessage::ReceivedPacket { source, data } = res { + assert_eq!(b_key, source); + assert_eq!(msg, data); + } else { + panic!("client_a received unexpected message {res:?}"); + } + } + + #[tokio::test] + async fn test_stun() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: None, + stun: Some(StunConfig { + bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + }), + metrics_addr: None, + }) + .await + .unwrap(); + + let txid = stun::TransactionId::default(); + let req = stun::request(txid); + let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + socket + .send_to(&req, server.stun_addr().unwrap()) + .await + .unwrap(); + + // get response + let mut buf = vec![0u8; 64000]; + let (len, addr) = socket.recv_from(&mut buf).await.unwrap(); + assert_eq!(addr, server.stun_addr().unwrap()); + buf.truncate(len); + let (txid_back, response_addr) = stun::parse_response(&buf).unwrap(); + assert_eq!(txid, txid_back); + assert_eq!(response_addr, socket.local_addr().unwrap()); + } +} diff --git a/iroh-net/src/relay/map.rs b/iroh-net/src/relay/map.rs index ede590a7ae..721fd778a1 100644 --- a/iroh-net/src/relay/map.rs +++ b/iroh-net/src/relay/map.rs @@ -5,7 +5,7 @@ use std::{collections::BTreeMap, fmt, sync::Arc}; use anyhow::{ensure, Result}; use serde::{Deserialize, Serialize}; -use crate::defaults::DEFAULT_RELAY_STUN_PORT; +use crate::defaults::DEFAULT_STUN_PORT; use super::RelayUrl; @@ -91,7 +91,7 @@ impl RelayMap { /// This will use the default STUN port and IP addresses resolved from the URL's host name via DNS. /// relay nodes are specified at <../../../docs/relay_nodes.md> pub fn from_url(url: RelayUrl) -> Self { - Self::default_from_node(url, DEFAULT_RELAY_STUN_PORT) + Self::default_from_node(url, DEFAULT_STUN_PORT) } /// Constructs the [`RelayMap] from an iterator of [`RelayNode`]s. diff --git a/iroh-net/src/relay/server.rs b/iroh-net/src/relay/server.rs index 38493c4601..05dbc60ad7 100644 --- a/iroh-net/src/relay/server.rs +++ b/iroh-net/src/relay/server.rs @@ -113,6 +113,13 @@ impl Server { } } + /// Aborts the server. + /// + /// You should prefer to use [`Server::close`] for a graceful shutdown. + pub fn abort(&self) { + self.cancel.cancel(); + } + /// Whether or not the relay [Server] is closed. pub fn is_closed(&self) -> bool { self.closed diff --git a/iroh-net/src/stun.rs b/iroh-net/src/stun.rs index b9ff7e6dde..e0ed936782 100644 --- a/iroh-net/src/stun.rs +++ b/iroh-net/src/stun.rs @@ -72,8 +72,8 @@ const COOKIE: [u8; 4] = 0x2112_A442u32.to_be_bytes(); /// Reports whether b is a STUN message. pub fn is(b: &[u8]) -> bool { b.len() >= stun_rs::MESSAGE_HEADER_SIZE && - b[0]&0b11000000 == 0 && // top two bits must be zero - b[4..8] == COOKIE + b[0]&0b11000000 == 0 && // top two bits must be zero + b[4..8] == COOKIE } /// Parses a STUN binding request. @@ -149,9 +149,10 @@ pub fn parse_response(b: &[u8]) -> Result<(TransactionId, SocketAddr), Error> { Err(Error::MalformedAttrs) } -#[cfg(any(test, feature = "test-utils"))] -pub(crate) mod test { - use std::{net::IpAddr, sync::Arc}; +#[cfg(test)] +pub(crate) mod tests { + use std::net::{IpAddr, Ipv4Addr}; + use std::sync::Arc; use anyhow::Result; use tokio::{ @@ -160,30 +161,28 @@ pub(crate) mod test { }; use tracing::{debug, trace}; - #[cfg(test)] use crate::relay::{RelayMap, RelayNode, RelayUrl}; use crate::test_utils::CleanupDropGuard; use super::*; + // TODO: make all this private + // (read_ipv4, read_ipv5) #[derive(Debug, Default, Clone)] pub struct StunStats(Arc>); impl StunStats { - #[cfg(test)] pub async fn total(&self) -> usize { let s = self.0.lock().await; s.0 + s.1 } } - #[cfg(test)] pub fn relay_map_of(stun: impl Iterator) -> RelayMap { relay_map_of_opts(stun.map(|addr| (addr, true))) } - #[cfg(test)] pub fn relay_map_of_opts(stun: impl Iterator) -> RelayMap { let nodes = stun.map(|(addr, stun_only)| { let host = addr.ip(); @@ -202,7 +201,6 @@ pub(crate) mod test { /// Sets up a simple STUN server binding to `0.0.0.0:0`. /// /// See [`serve`] for more details. - #[cfg(test)] pub(crate) async fn serve_v4() -> Result<(SocketAddr, StunStats, CleanupDropGuard)> { serve(std::net::Ipv4Addr::UNSPECIFIED.into()).await } @@ -272,13 +270,6 @@ pub(crate) mod test { } } } -} - -#[cfg(test)] -mod tests { - use std::net::{IpAddr, Ipv4Addr}; - - use super::*; // Test to check if an existing stun server works // #[tokio::test] diff --git a/iroh-net/src/test_utils.rs b/iroh-net/src/test_utils.rs index 0cbf8bd857..3188a7e128 100644 --- a/iroh-net/src/test_utils.rs +++ b/iroh-net/src/test_utils.rs @@ -2,7 +2,6 @@ use anyhow::Result; use tokio::sync::oneshot; -use tracing::{error_span, info_span, Instrument}; use crate::{ key::SecretKey, @@ -24,48 +23,51 @@ pub struct CleanupDropGuard(pub(crate) oneshot::Sender<()>); /// Runs a relay server with STUN enabled suitable for tests. /// -/// The returned `Url` is the url of the relay server in the returned [`RelayMap`], it -/// is always `Some` as that is how the [`Endpoint::connect`] API expects it. +/// The returned `Url` is the url of the relay server in the returned [`RelayMap`]. +/// When dropped, the returned [`Server`] does will stop running. /// -/// [`Endpoint::connect`]: crate::endpoint::Endpoint -pub async fn run_relay_server() -> Result<(RelayMap, RelayUrl, CleanupDropGuard)> { - let server_key = SecretKey::generate(); - let me = server_key.public().fmt_short(); - let tls_config = crate::relay::http::make_tls_config(); - let server = crate::relay::http::ServerBuilder::new("127.0.0.1:0".parse().unwrap()) - .secret_key(Some(server_key)) - .tls_config(Some(tls_config)) - .spawn() - .instrument(error_span!("relay server", %me)) - .await?; - - let https_addr = server.addr(); - println!("relay listening on {:?}", https_addr); - - let (stun_addr, _, stun_drop_guard) = crate::stun::test::serve(server.addr().ip()).await?; - let url: RelayUrl = format!("https://localhost:{}", https_addr.port()) +/// [`Server`]: crate::relay::iroh_relay::Server +pub async fn run_relay_server() -> Result<(RelayMap, RelayUrl, crate::relay::iroh_relay::Server)> { + use crate::relay::iroh_relay::{CertConfig, RelayConfig, ServerConfig, StunConfig, TlsConfig}; + use std::net::Ipv4Addr; + + let secret_key = SecretKey::generate(); + let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap(); + let rustls_cert = rustls::Certificate(cert.serialize_der().unwrap()); + let private_key = rustls::PrivateKey(cert.get_key_pair().serialize_der()); + + let config = ServerConfig { + relay: Some(RelayConfig { + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + secret_key, + tls: Some(TlsConfig { + cert: CertConfig::<(), ()>::Manual { + private_key, + certs: vec![rustls_cert], + }, + https_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + }), + limits: Default::default(), + }), + stun: Some(StunConfig { + bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + }), + #[cfg(feature = "metrics")] + metrics_addr: None, + }; + let server = crate::relay::iroh_relay::Server::spawn(config) + .await + .unwrap(); + let url: RelayUrl = format!("https://localhost:{}", server.https_addr().unwrap().port()) .parse() .unwrap(); let m = RelayMap::from_nodes([RelayNode { url: url.clone(), stun_only: false, - stun_port: stun_addr.port(), + stun_port: server.stun_addr().unwrap().port(), }]) - .expect("hardcoded"); - - let (tx, rx) = oneshot::channel(); - tokio::spawn( - async move { - let _stun_cleanup = stun_drop_guard; // move into this closure - - // Wait until we're dropped or receive a message. - rx.await.ok(); - server.shutdown().await; - } - .instrument(info_span!("relay-stun-cleanup")), - ); - - Ok((m, url, CleanupDropGuard(tx))) + .unwrap(); + Ok((m, url, server)) } pub(crate) mod dns_and_pkarr_servers { diff --git a/iroh/src/node.rs b/iroh/src/node.rs index e074efa7b1..ac1bee9548 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -14,8 +14,10 @@ use iroh_base::key::PublicKey; use iroh_blobs::downloader::Downloader; use iroh_blobs::store::Store as BaoStore; use iroh_docs::engine::Engine; +use iroh_net::endpoint::DirectAddrsStream; +use iroh_net::key::SecretKey; use iroh_net::util::AbortingJoinHandle; -use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint}; +use iroh_net::Endpoint; use quic_rpc::transport::flume::FlumeConnection; use quic_rpc::RpcClient; use tokio::task::JoinHandle; @@ -116,8 +118,8 @@ impl Node { } /// Lists the local endpoint of this node. - pub fn local_endpoints(&self) -> LocalEndpointsStream { - self.inner.endpoint.local_endpoints() + pub fn local_endpoints(&self) -> DirectAddrsStream { + self.inner.endpoint.direct_addresses() } /// Convenience method to get just the addr part of [`Node::local_endpoints`]. @@ -185,7 +187,7 @@ impl NodeInner { async fn local_endpoint_addresses(&self) -> Result> { let endpoints = self .endpoint - .local_endpoints() + .direct_addresses() .next() .await .ok_or(anyhow!("no endpoints found"))?; @@ -413,6 +415,7 @@ mod tests { } #[cfg(feature = "fs-store")] + #[ignore = "flaky"] #[tokio::test] async fn test_default_author_persist() -> Result<()> { use crate::util::path::IrohPaths; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 6209883388..69a9a451b4 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -522,10 +522,10 @@ where // spawn a task that updates the gossip endpoints. // TODO: track task - let mut stream = endpoint.local_endpoints(); + let mut stream = endpoint.direct_addresses(); tokio::task::spawn(async move { while let Some(eps) = stream.next().await { - if let Err(err) = gossip.update_endpoints(&eps) { + if let Err(err) = gossip.update_direct_addresses(&eps) { warn!("Failed to update gossip endpoints: {err:?}"); } } @@ -534,7 +534,7 @@ where // Wait for a single endpoint update, to make sure // we found some endpoints - tokio::time::timeout(ENDPOINT_WAIT, endpoint.local_endpoints().next()) + tokio::time::timeout(ENDPOINT_WAIT, endpoint.direct_addresses().next()) .await .context("waiting for endpoint")? .context("no endpoints")?; @@ -564,9 +564,9 @@ where // forward our initial endpoints to the gossip protocol // it may happen the the first endpoint update callback is missed because the gossip cell // is only initialized once the endpoint is fully bound - if let Some(local_endpoints) = server.local_endpoints().next().await { + if let Some(local_endpoints) = server.direct_addresses().next().await { debug!(me = ?server.node_id(), "gossip initial update: {local_endpoints:?}"); - gossip.update_endpoints(&local_endpoints).ok(); + gossip.update_direct_addresses(&local_endpoints).ok(); } loop { tokio::select! { @@ -734,12 +734,12 @@ impl Default for GcPolicy { #[allow(clippy::too_many_arguments)] async fn handle_connection( connecting: iroh_net::endpoint::Connecting, - alpn: String, + alpn: Vec, node: Arc>, gossip: Gossip, sync: DocsEngine, ) -> Result<()> { - match alpn.as_bytes() { + match alpn.as_ref() { GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?, DOCS_ALPN => sync.handle_connection(connecting).await?, alpn if alpn == iroh_blobs::protocol::ALPN => {