From 4fa46176e4677d07d3d8e0839dfb4946d7ac5e24 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 18 Jun 2024 13:57:51 +0200 Subject: [PATCH] refactor(iroh-net)!: Rework relay-server binary, more configurable, reverse-proxy support (#2341) ## Description This re-architects the relay-server binary. There is now a struct with detailed configuration which runs the entire server and aborts the server on drop. This simplifies running the server in various situations, including tests. The configuration is now done using a declarative struct, which supports more control over how it runs so it can be more easily used behind a reverse proxy, without TLS etc. This is aiming to fix #2177, #2179 and #2178. ## Breaking Changes The configuration file format has changed, deployments will need to updated. For the full format see `struct Config` in `iroh-net/src/bin/iroh-relay.rs`. Here a summary: - The 3 parts of the server now have an independent enable setting: `enable_relay`, `enable_stun` and `enable_metrics`. If omitted they default to `true`. - The way to specify which addresses the server listens on has changed: `http_bind_addr` is for the relay server, `stun_bind_addr` for the STUN server, `metrics_bind_addr` is for the optional metrics server and `tls.https_bind_addr` is for when TLS is enabled. Note these are now all full socket addresses. All have sensible defaults if omitted. - There are new options in `tls.cert_path` and `tls.key_path` which allow more control over where the manual TLS keys are to be read from. - `iroh_net::defaults::DEFAULT_RELAY_STUN_PORT` has been renamed to `iroh_net::defaults::DEFAULT_STUN_PORT`. TBD: some APIs changed as well. Why are they not all private? ## Notes & open questions * The `iroh_net::relay::iroh_relay` crate name is a bit weird. But `iroh_net::relay::server` is already taken. Maybe `iroh_net::relay::bin` could work, but that would be weird when using it from code in other places. * The `ServerConfig` struct is a declarative way of controlling the new server interface. It's kind of nice to use. Bu it is a public API that will be a breaking change every time it changes, and it will change. Maybe it's worth creating a builder API for this. But maybe that's something to only tackle when it is a real demand. I feel like the `iroh_net::relay::server` builders are an attempt at doing this earlier than needed. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- iroh-cli/src/commands/doctor.rs | 6 +- iroh-net/src/bin/iroh-relay.rs | 1122 +++++++--------------------- iroh-net/src/defaults.rs | 20 +- iroh-net/src/netcheck.rs | 14 +- iroh-net/src/netcheck/reportgen.rs | 4 +- iroh-net/src/relay.rs | 1 + iroh-net/src/relay/http.rs | 52 +- iroh-net/src/relay/http/server.rs | 144 +++- iroh-net/src/relay/iroh_relay.rs | 909 ++++++++++++++++++++++ iroh-net/src/relay/map.rs | 4 +- iroh-net/src/relay/server.rs | 7 + iroh-net/src/stun.rs | 25 +- iroh-net/src/test_utils.rs | 74 +- 13 files changed, 1396 insertions(+), 986 deletions(-) create mode 100644 iroh-net/src/relay/iroh_relay.rs diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 62d1eb69b34..a28f749cf66 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -27,7 +27,7 @@ use iroh::{ }, docs::{Capability, DocTicket}, net::{ - defaults::DEFAULT_RELAY_STUN_PORT, + defaults::DEFAULT_STUN_PORT, discovery::{ dns::DnsDiscovery, pkarr_publish::PkarrPublisher, ConcurrentDiscovery, Discovery, }, @@ -93,7 +93,7 @@ pub enum Commands { #[clap(long)] stun_host: Option, /// The port of the STUN server. - #[clap(long, default_value_t = DEFAULT_RELAY_STUN_PORT)] + #[clap(long, default_value_t = DEFAULT_STUN_PORT)] stun_port: u16, }, /// Wait for incoming requests from iroh doctor connect @@ -631,7 +631,7 @@ async fn passive_side(gui: Gui, connection: Connection) -> anyhow::Result<()> { } fn configure_local_relay_map() -> RelayMap { - let stun_port = DEFAULT_RELAY_STUN_PORT; + let stun_port = DEFAULT_STUN_PORT; let url = "http://localhost:3340".parse().unwrap(); RelayMap::default_from_node(url, stun_port) } diff --git a/iroh-net/src/bin/iroh-relay.rs b/iroh-net/src/bin/iroh-relay.rs index f9717a46a14..45e076e66c9 100644 --- a/iroh-net/src/bin/iroh-relay.rs +++ b/iroh-net/src/bin/iroh-relay.rs @@ -1,49 +1,28 @@ -//! A simple relay server. +//! A simple relay server for iroh-net. //! -//! Based on /tailscale/cmd/derper +//! This handles only the CLI and config file loading, the server implementation lives in +//! [`iroh_net::relay::iroh_relay`]. -use std::{ - borrow::Cow, - future::Future, - net::{IpAddr, Ipv6Addr, SocketAddr}, - path::{Path, PathBuf}, - pin::Pin, - sync::Arc, -}; +use std::net::{Ipv6Addr, SocketAddr}; +use std::path::{Path, PathBuf}; use anyhow::{anyhow, bail, Context as _, Result}; use clap::Parser; -use futures_lite::StreamExt; -use http::{response::Builder as ResponseBuilder, HeaderMap}; -use hyper::body::Incoming; -use hyper::{Method, Request, Response, StatusCode}; -use iroh_metrics::inc; -use iroh_net::defaults::{DEFAULT_RELAY_STUN_PORT, NA_RELAY_HOSTNAME}; -use iroh_net::key::SecretKey; -use iroh_net::relay::http::{ - ServerBuilder as RelayServerBuilder, TlsAcceptor, TlsConfig as RelayTlsConfig, +use iroh_net::defaults::{ + DEFAULT_HTTPS_PORT, DEFAULT_HTTP_PORT, DEFAULT_METRICS_PORT, DEFAULT_STUN_PORT, }; -use iroh_net::relay::{self}; -use iroh_net::stun; +use iroh_net::key::SecretKey; +use iroh_net::relay::iroh_relay; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use tokio::net::{TcpListener, UdpSocket}; use tokio_rustls_acme::{caches::DirCache, AcmeConfig}; -use tracing::{debug, debug_span, error, info, info_span, trace, warn, Instrument}; +use tracing::{debug, info}; use tracing_subscriber::{prelude::*, EnvFilter}; -use metrics::StunMetrics; - -type BytesBody = http_body_util::Full; -type HyperError = Box; -type HyperResult = std::result::Result; +/// The default `http_bind_port` when using `--dev`. +const DEV_MODE_HTTP_PORT: u16 = 3340; -/// Creates a new [`BytesBody`] with no content. -fn body_empty() -> BytesBody { - http_body_util::Full::new(hyper::body::Bytes::new()) -} - -/// A simple relay server. +/// A relay server for iroh-net. #[derive(Parser, Debug, Clone)] #[clap(version, about, long_about = None)] struct Cli { @@ -54,7 +33,10 @@ struct Cli { /// Running in dev mode will ignore any config file fields pertaining to TLS. #[clap(long, default_value_t = false)] dev: bool, - /// Config file path. Generate a default configuration file by supplying a path. + /// Path to the configuration file. + /// + /// If provided and no configuration file exists the default configuration will be + /// written to the file. #[clap(long, short)] config_path: Option, } @@ -65,73 +47,6 @@ enum CertMode { LetsEncrypt, } -impl CertMode { - async fn gen_server_config( - &self, - hostname: String, - contact: String, - is_production: bool, - dir: PathBuf, - ) -> Result<(Arc, TlsAcceptor)> { - let config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth(); - - match self { - CertMode::LetsEncrypt => { - let mut state = AcmeConfig::new(vec![hostname]) - .contact([format!("mailto:{contact}")]) - .cache_option(Some(DirCache::new(dir))) - .directory_lets_encrypt(is_production) - .state(); - - let config = config.with_cert_resolver(state.resolver()); - let acceptor = state.acceptor(); - - tokio::spawn( - async move { - while let Some(event) = state.next().await { - match event { - Ok(ok) => debug!("acme event: {:?}", ok), - Err(err) => error!("error: {:?}", err), - } - } - debug!("event stream finished"); - } - .instrument(info_span!("acme")), - ); - - Ok((Arc::new(config), TlsAcceptor::LetsEncrypt(acceptor))) - } - CertMode::Manual => { - // load certificates manually - let keyname = escape_hostname(&hostname); - let cert_path = dir.join(format!("{keyname}.crt")); - let key_path = dir.join(format!("{keyname}.key")); - - let (certs, secret_key) = tokio::task::spawn_blocking(move || { - let certs = load_certs(cert_path)?; - let key = load_secret_key(key_path)?; - anyhow::Ok((certs, key)) - }) - .await??; - - let config = config.with_single_cert(certs, secret_key)?; - let config = Arc::new(config); - let acceptor = tokio_rustls::TlsAcceptor::from(config.clone()); - - Ok((config, TlsAcceptor::Manual(acceptor))) - } - } - } -} - -fn escape_hostname(hostname: &str) -> Cow<'_, str> { - let unsafe_hostname_characters = - regex::Regex::new(r"[^a-zA-Z0-9-\.]").expect("regex manually checked"); - unsafe_hostname_characters.replace_all(hostname, "") -} - fn load_certs(filename: impl AsRef) -> Result> { let certfile = std::fs::File::open(filename).context("cannot open certificate file")?; let mut reader = std::io::BufReader::new(certfile); @@ -164,72 +79,203 @@ fn load_secret_key(filename: impl AsRef) -> Result { ); } +/// Configuration for the relay-server. +/// +/// This is (de)serialised to/from a TOML config file. #[serde_as] -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct Config { - /// [`SecretKey`] for this relay server. + /// The iroh [`SecretKey`] for this relay server. + /// + /// If not specified a new key will be generated and the config file will be re-written + /// using it. #[serde_as(as = "DisplayFromStr")] #[serde(default = "SecretKey::generate")] secret_key: SecretKey, - /// Server listen address. + /// Whether to enable the Relay server. /// - /// Defaults to `[::]:443`. + /// Defaults to `true`. /// - /// If the port address is 443, the relay server will issue a warning if it is started - /// without a `tls` config. - addr: SocketAddr, - - /// The UDP port on which to serve STUN. The listener is bound to the same IP (if any) as - /// specified in the `addr` field. Defaults to [`DEFAULT_RELAY_STUN_PORT`]. - stun_port: u16, - /// Certificate hostname. Defaults to [`NA_RELAY_HOSTNAME`]. - hostname: String, + /// Disabling will leave only the STUN server. The `http_bind_addr` and `tls` + /// configuration options will be ignored. + #[serde(default = "cfg_defaults::enable_relay")] + enable_relay: bool, + /// The socket address to bind the Relay HTTP server on. + /// + /// Defaults to `[::]:80`. + /// + /// When running with `--dev` defaults to [::]:3340`. If specified overrides these + /// defaults. + /// + /// The Relay server always starts an HTTP server, this specifies the socket this will + /// be bound on. If there is no `tls` configuration set all the HTTP relay services + /// will be bound on this socket. Otherwise most Relay HTTP services will run on the + /// `https_bind_addr` of the `tls` configuration section and only the captive portal + /// will be served from the HTTP socket. + http_bind_addr: Option, + /// TLS specific configuration. + /// + /// TLS is disabled if not present and the Relay server will serve all services over + /// plain HTTP. + /// + /// If disabled all services will run on plain HTTP. The `--dev` option disables this, + /// regardless of what is in the configuration file. + tls: Option, /// Whether to run a STUN server. It will bind to the same IP as the `addr` field. /// /// Defaults to `true`. + #[serde(default = "cfg_defaults::enable_stun")] enable_stun: bool, - /// Whether to run a relay server. The only reason to set this false is if you're decommissioning a - /// server but want to keep its bootstrap DNS functionality still running. + /// The socket address to bind the STUN server on. /// - /// Defaults to `true` - enable_relay: bool, - /// TLS specific configuration - tls: Option, - /// Rate limiting configuration + /// Defaults to using the `http_bind_addr` with the port set to [`DEFAULT_STUN_PORT`]. + stun_bind_addr: Option, + /// Rate limiting configuration. + /// + /// Disabled if not present. limits: Option, - #[cfg(feature = "metrics")] - /// Metrics serve address. If not set, metrics are not served. - metrics_addr: Option, + /// Whether to run the metrics server. + /// + /// Defaults to `true`, when the metrics feature is enabled. + #[serde(default = "cfg_defaults::enable_metrics")] + enable_metrics: bool, + /// Metrics serve address. + /// + /// Defaults to `http_bind_addr` with the port set to [`DEFAULT_METRICS_PORT`] + /// (`[::]:9090` when `http_bind_addr` is set to the default). + metrics_bind_addr: Option, +} + +impl Config { + fn http_bind_addr(&self) -> SocketAddr { + self.http_bind_addr + .unwrap_or((Ipv6Addr::UNSPECIFIED, DEFAULT_HTTP_PORT).into()) + } + + fn stun_bind_addr(&self) -> SocketAddr { + self.stun_bind_addr + .unwrap_or_else(|| SocketAddr::new(self.http_bind_addr().ip(), DEFAULT_STUN_PORT)) + } + + fn metrics_bind_addr(&self) -> SocketAddr { + self.metrics_bind_addr + .unwrap_or_else(|| SocketAddr::new(self.http_bind_addr().ip(), DEFAULT_METRICS_PORT)) + } +} + +impl Default for Config { + fn default() -> Self { + Self { + secret_key: SecretKey::generate(), + enable_relay: true, + http_bind_addr: None, + tls: None, + enable_stun: true, + stun_bind_addr: None, + limits: None, + enable_metrics: true, + metrics_bind_addr: None, + } + } } -#[derive(Serialize, Deserialize)] +/// Defaults for fields from [`Config`]. +/// +/// These are the defaults that serde will fill in. Other defaults depends on each other +/// and can not immediately be substituded by serde. +mod cfg_defaults { + pub(crate) fn enable_relay() -> bool { + true + } + + pub(crate) fn enable_stun() -> bool { + true + } + + pub(crate) fn enable_metrics() -> bool { + true + } + + pub(crate) mod tls_config { + pub(crate) fn prod_tls() -> bool { + true + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] struct TlsConfig { - /// Mode for getting a cert. possible options: 'Manual', 'LetsEncrypt' - /// When using manual mode, a certificate will be read from `.crt` and a secret key from - /// `.key`, with the `` being the escaped hostname. + /// The socket address to bind the Relay HTTPS server on. + /// + /// Defaults to the `http_bind_addr` with the port set to `443`. + https_bind_addr: Option, + /// Certificate hostname when using LetsEncrypt. + hostname: Option, + /// Mode for getting a cert. + /// + /// Possible options: 'Manual', 'LetsEncrypt'. cert_mode: CertMode, + /// Directory to store LetsEncrypt certs or read manual certificates from. + /// + /// Defaults to the servers' current working directory. + cert_dir: Option, + /// Path of where to read the certificate from for the `Manual` `cert_mode`. + /// + /// Defaults to `/default.crt`. + /// + /// Only used when `cert_mode` is `Manual`. + manual_cert_path: Option, + /// Path of where to read the private key from for the `Manual` `cert_mode`. + /// + /// Defaults to `/default.key`. + /// + /// Only used when `cert_mode` is `Manual`. + manual_key_path: Option, /// Whether to use the LetsEncrypt production or staging server. /// - /// While in development, LetsEncrypt prefers you to use the staging server. However, the staging server seems to - /// only use `ECDSA` keys. In their current set up, you can only get intermediate certificates - /// for `ECDSA` keys if you are on their "allowlist". The production server uses `RSA` keys, - /// which allow for issuing intermediate certificates in all normal circumstances. - /// So, to have valid certificates, we must use the LetsEncrypt production server. - /// Read more here: - /// Default is true. This field is ignored if we are not using `cert_mode: CertMode::LetsEncrypt`. + /// Default is `true`. + /// + /// Only used when `cert_mode` is `LetsEncrypt`. + /// + /// While in development, LetsEncrypt prefers you to use the staging server. However, + /// the staging server seems to only use `ECDSA` keys. In their current set up, you can + /// only get intermediate certificates for `ECDSA` keys if you are on their + /// "allowlist". The production server uses `RSA` keys, which allow for issuing + /// intermediate certificates in all normal circumstances. So, to have valid + /// certificates, we must use the LetsEncrypt production server. Read more here: + /// . + #[serde(default = "cfg_defaults::tls_config::prod_tls")] prod_tls: bool, /// The contact email for the tls certificate. - contact: String, - /// Directory to store LetsEncrypt certs or read certificates from, if TLS is used. - cert_dir: Option, - /// The port on which to serve a response for the captive portal probe over HTTP. /// - /// The listener is bound to the same IP as specified in the `addr` field. Defaults to 80. - /// This field is only read in we are serving the relay server over HTTPS. In that case, we must listen for requests for the `/generate_204` over a non-TLS connection. - captive_portal_port: Option, + /// Used when `cert_mode` is `LetsEncrypt`. + contact: Option, } -#[derive(Serialize, Deserialize)] +impl TlsConfig { + fn https_bind_addr(&self, cfg: &Config) -> SocketAddr { + self.https_bind_addr + .unwrap_or_else(|| SocketAddr::new(cfg.http_bind_addr().ip(), DEFAULT_HTTPS_PORT)) + } + + fn cert_dir(&self) -> PathBuf { + self.cert_dir.clone().unwrap_or_else(|| PathBuf::from(".")) + } + + fn cert_path(&self) -> PathBuf { + self.manual_cert_path + .clone() + .unwrap_or_else(|| self.cert_dir().join("default.crt")) + } + + fn key_path(&self) -> PathBuf { + self.manual_key_path + .clone() + .unwrap_or_else(|| self.cert_dir().join("default.key")) + } +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] struct Limits { /// Rate limit for accepting new connection. Unlimited if not set. accept_conn_limit: Option, @@ -237,23 +283,6 @@ struct Limits { accept_conn_burst: Option, } -impl Default for Config { - fn default() -> Self { - Self { - secret_key: SecretKey::generate(), - addr: (Ipv6Addr::UNSPECIFIED, 443).into(), - stun_port: DEFAULT_RELAY_STUN_PORT, - hostname: NA_RELAY_HOSTNAME.into(), - enable_stun: true, - enable_relay: true, - tls: None, - limits: None, - #[cfg(feature = "metrics")] - metrics_addr: None, - } - } -} - impl Config { async fn load(opts: &Cli) -> Result { let config_path = if let Some(config_path) = &opts.config_path { @@ -274,12 +303,12 @@ impl Config { async fn read_from_file(path: impl AsRef) -> Result { if !path.as_ref().is_file() { - bail!("config-path must be a valid toml file"); + bail!("config-path must be a file"); } let config_ser = tokio::fs::read_to_string(&path) .await .context("unable to read config")?; - let config: Self = toml::from_str(&config_ser).context("unable to decode config")?; + let config: Self = toml::from_str(&config_ser).context("config file must be valid toml")?; if !config_ser.contains("secret_key") { info!("generating new secret key and updating config file"); config.write_to_file(path).await?; @@ -307,36 +336,6 @@ impl Config { } } -#[cfg(feature = "metrics")] -pub fn init_metrics_collection( - metrics_addr: Option, -) -> Option> { - use iroh_metrics::core::Metric; - - let rt = tokio::runtime::Handle::current(); - - // doesn't start the server if the address is None - if let Some(metrics_addr) = metrics_addr { - iroh_metrics::core::Core::init(|reg, metrics| { - metrics.insert(iroh_net::metrics::RelayMetrics::new(reg)); - metrics.insert(StunMetrics::new(reg)); - }); - - return Some(rt.spawn(async move { - if let Err(e) = iroh_metrics::metrics::start_metrics_server(metrics_addr).await { - eprintln!("Failed to start metrics server: {e}"); - } - })); - } - tracing::info!("Metrics server not started, no address provided"); - None -} - -/// Only used when in `dev` mode & the given port is `443` -const DEV_PORT: u16 = 3340; -/// Only used when tls is enabled & a captive protal port is not given -const DEFAULT_CAPTIVE_PORTAL_PORT: u16 = 80; - #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::registry() @@ -345,470 +344,100 @@ async fn main() -> Result<()> { .init(); let cli = Cli::parse(); - let cfg = Config::load(&cli).await?; - - #[cfg(feature = "metrics")] - let metrics_fut = init_metrics_collection(cfg.metrics_addr); - - let r = run(cli.dev, cfg, None).await; - - #[cfg(feature = "metrics")] - if let Some(metrics_fut) = metrics_fut { - metrics_fut.abort(); - drop(metrics_fut); - } - r -} - -async fn run( - dev_mode: bool, - cfg: Config, - addr_sender: Option>, -) -> Result<()> { - let (addr, tls_config) = if dev_mode { - let port = if cfg.addr.port() != 443 { - cfg.addr.port() - } else { - DEV_PORT - }; - - let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port); - info!(%addr, "Running in dev mode."); - (addr, None) - } else { - (cfg.addr, cfg.tls) - }; - - if let Some(tls_config) = &tls_config { - if let Some(captive_portal_port) = tls_config.captive_portal_port { - if addr.port() == captive_portal_port { - bail!("The main listening address {addr:?} and the `captive_portal_port` have the same port number."); - } + let mut cfg = Config::load(&cli).await?; + if cli.dev { + cfg.tls = None; + if cfg.http_bind_addr.is_none() { + cfg.http_bind_addr = Some((Ipv6Addr::UNSPECIFIED, DEV_MODE_HTTP_PORT).into()); } - } else if addr.port() == 443 { - // no tls config, but the port is 443 - warn!("The address port is 443, which is typically the expected tls port, but you have not supplied any tls configuration.\nIf you meant to run the relay server with tls enabled, adjust the config file to include tls configuration."); } + let relay_config = build_relay_config(cfg).await?; + debug!("{relay_config:#?}"); - // set up relay configuration details - let secret_key = if cfg.enable_relay { - Some(cfg.secret_key) - } else { - None - }; + let mut relay = iroh_relay::Server::spawn(relay_config).await?; - // run stun - let stun_task = if cfg.enable_stun { - Some(tokio::task::spawn(async move { - serve_stun(addr.ip(), cfg.stun_port).await - })) - } else { - None - }; - - // set up tls configuration details - let (tls_config, headers, captive_portal_port) = if let Some(tls_config) = tls_config { - let contact = tls_config.contact; - let is_production = tls_config.prod_tls; - let (config, acceptor) = tls_config - .cert_mode - .gen_server_config( - cfg.hostname.clone(), - contact, - is_production, - tls_config.cert_dir.unwrap_or_else(|| PathBuf::from(".")), - ) - .await?; - let mut headers = HeaderMap::new(); - for (name, value) in TLS_HEADERS.iter() { - headers.insert(*name, value.parse()?); - } - ( - Some(RelayTlsConfig { config, acceptor }), - headers, - tls_config - .captive_portal_port - .unwrap_or(DEFAULT_CAPTIVE_PORTAL_PORT), - ) - } else { - (None, HeaderMap::new(), 0) - }; - - let mut builder = RelayServerBuilder::new(addr) - .secret_key(secret_key.map(Into::into)) - .headers(headers) - .tls_config(tls_config.clone()) - .relay_override(Box::new(relay_disabled_handler)) - .request_handler(Method::GET, "/", Box::new(root_handler)) - .request_handler(Method::GET, "/index.html", Box::new(root_handler)) - .request_handler(Method::GET, "/derp/probe", Box::new(probe_handler)) - .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler)); - // if tls is enabled, we need to serve this endpoint from a non-tls connection - // which we check for below - if tls_config.is_none() { - builder = builder.request_handler( - Method::GET, - "/generate_204", - Box::new(serve_no_content_handler), - ); + tokio::select! { + biased; + _ = tokio::signal::ctrl_c() => (), + _ = relay.task_handle() => (), } - let relay_server = builder.spawn().await?; - // captive portal detections must be served over HTTP - let captive_portal_task = if tls_config.is_some() { - let http_addr = SocketAddr::new(addr.ip(), captive_portal_port); - let task = serve_captive_portal_service(http_addr).await?; - Some(task) - } else { - None - }; - - if let Some(addr_sender) = addr_sender { - if let Err(e) = addr_sender.send(relay_server.addr()) { - bail!("Unable to send the local SocketAddr, the Sender was dropped - {e:?}"); - } - } - - tokio::signal::ctrl_c().await?; - // Shutdown all tasks - if let Some(task) = stun_task { - task.abort(); - } - if let Some(task) = captive_portal_task { - task.abort() - } - relay_server.shutdown().await; - - Ok(()) + relay.shutdown().await } -const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Tailscale-Challenge"; -const NO_CONTENT_RESPONSE_HEADER: &str = "X-Tailscale-Response"; - -const NOTFOUND: &[u8] = b"Not Found"; -const RELAY_DISABLED: &[u8] = b"relay server disabled"; -const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n"; -const INDEX: &[u8] = br#" -

RELAY

-

- This is an - Iroh Relay - server. -

-"#; - -const TLS_HEADERS: [(&str, &str); 2] = [ - ("Strict-Transport-Security", "max-age=63072000; includeSubDomains"), - ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'") -]; - -async fn serve_captive_portal_service(addr: SocketAddr) -> Result> { - let http_listener = TcpListener::bind(&addr) - .await - .context("failed to bind http")?; - let http_addr = http_listener.local_addr()?; - info!("[CaptivePortalService]: serving on {}", http_addr); - - let task = tokio::spawn( - async move { - loop { - match http_listener.accept().await { - Ok((stream, peer_addr)) => { - debug!( - "[CaptivePortalService] Connection opened from {}", - peer_addr - ); - let handler = CaptivePortalService; - - tokio::task::spawn(async move { - let stream = relay::MaybeTlsStreamServer::Plain(stream); - let stream = hyper_util::rt::TokioIo::new(stream); - if let Err(err) = hyper::server::conn::http1::Builder::new() - .serve_connection(stream, handler) - .with_upgrades() - .await - { - error!( - "[CaptivePortalService] Failed to serve connection: {:?}", - err - ); - } - }); - } - Err(err) => { - error!( - "[CaptivePortalService] failed to accept connection: {:#?}", - err - ); - } +/// Convert the TOML-loaded config to the [`iroh_relay::RelayConfig`] format. +async fn build_relay_config(cfg: Config) -> Result> { + let tls = match cfg.tls { + Some(ref tls) => { + let cert_config = match tls.cert_mode { + CertMode::Manual => { + let cert_path = tls.cert_path(); + let key_path = tls.key_path(); + // Could probably just do this blocking, we're only starting up. + let (private_key, certs) = tokio::task::spawn_blocking(move || { + let key = load_secret_key(key_path)?; + let certs = load_certs(cert_path)?; + anyhow::Ok((key, certs)) + }) + .await??; + iroh_relay::CertConfig::Manual { private_key, certs } } - } - } - .instrument(info_span!("captive-portal.service")), - ); - Ok(task) -} - -#[derive(Clone)] -struct CaptivePortalService; - -impl hyper::service::Service> for CaptivePortalService { - type Response = Response; - type Error = HyperError; - type Future = Pin> + Send>>; - - fn call(&self, req: Request) -> Self::Future { - match (req.method(), req.uri().path()) { - // Captive Portal checker - (&Method::GET, "/generate_204") => { - Box::pin(async move { serve_no_content_handler(req, Response::builder()) }) - } - _ => { - // Return 404 not found response. - let r = Response::builder() - .status(StatusCode::NOT_FOUND) - .body(NOTFOUND.into()) - .map_err(|err| Box::new(err) as HyperError); - Box::pin(async move { r }) - } - } - } -} - -fn relay_disabled_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::NOT_FOUND) - .body(RELAY_DISABLED.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn root_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .header("Content-Type", "text/html; charset=utf-8") - .body(INDEX.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -/// HTTP latency queries -fn probe_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .header("Access-Control-Allow-Origin", "*") - .body(body_empty()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn robots_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .body(ROBOTS_TXT.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -/// For captive portal detection. -fn serve_no_content_handler( - r: Request, - mut response: ResponseBuilder, -) -> HyperResult> { - if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) { - if !challenge.is_empty() - && challenge.len() < 64 - && challenge - .as_bytes() - .iter() - .all(|c| is_challenge_char(*c as char)) - { - response = response.header( - NO_CONTENT_RESPONSE_HEADER, - format!("response {}", challenge.to_str()?), - ); - } - } - - response - .status(StatusCode::NO_CONTENT) - .body(body_empty()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn is_challenge_char(c: char) -> bool { - // Semi-randomly chosen as a limited set of valid characters - c.is_ascii_lowercase() - || c.is_ascii_uppercase() - || c.is_ascii_digit() - || c == '.' - || c == '-' - || c == '_' -} - -async fn serve_stun(host: IpAddr, port: u16) { - match UdpSocket::bind((host, port)).await { - Ok(sock) => { - let addr = sock.local_addr().expect("socket just bound"); - info!(%addr, "running STUN server"); - server_stun_listener(sock) - .instrument(debug_span!("stun_server", %addr)) - .await; - } - Err(err) => { - error!( - "failed to open STUN listener at host {host} and port {port}: {:#?}", - err - ); - } - } -} - -async fn server_stun_listener(sock: UdpSocket) { - let sock = Arc::new(sock); - let mut buffer = vec![0u8; 64 << 10]; - loop { - match sock.recv_from(&mut buffer).await { - Ok((n, src_addr)) => { - inc!(StunMetrics, requests); - let pkt = buffer[..n].to_vec(); - let sock = sock.clone(); - tokio::task::spawn(async move { - if !stun::is(&pkt) { - debug!(%src_addr, "STUN: ignoring non stun packet"); - inc!(StunMetrics, bad_requests); - return; - } - match tokio::task::spawn_blocking(move || stun::parse_binding_request(&pkt)) - .await - { - Ok(Ok(txid)) => { - debug!(%src_addr, %txid, "STUN: received binding request"); - let res = match tokio::task::spawn_blocking(move || { - stun::response(txid, src_addr) - }) - .await - { - Ok(res) => res, - Err(err) => { - error!("JoinError: {err:#}"); - return; - } - }; - match sock.send_to(&res, src_addr).await { - Ok(len) => { - if len != res.len() { - warn!(%src_addr, %txid, "STUN: failed to write response sent: {}, but expected {}", len, res.len()); - } - match src_addr { - SocketAddr::V4(_) => { - inc!(StunMetrics, ipv4_success); - } - SocketAddr::V6(_) => { - inc!(StunMetrics, ipv6_success); - } - } - trace!(%src_addr, %txid, "STUN: sent {} bytes", len); - } - Err(err) => { - inc!(StunMetrics, failures); - warn!(%src_addr, %txid, "STUN: failed to write response: {:?}", err); - } - } - } - Ok(Err(err)) => { - inc!(StunMetrics, bad_requests); - warn!(%src_addr, "STUN: invalid binding request: {:?}", err); - } - Err(err) => error!("JoinError parsing STUN binding: {err:#}"), - } - }); - } - Err(err) => { - inc!(StunMetrics, failures); - warn!("STUN: failed to recv: {:?}", err); - } + CertMode::LetsEncrypt => { + let hostname = tls + .hostname + .clone() + .context("LetsEncrypt needs a hostname")?; + let contact = tls + .contact + .clone() + .context("LetsEncrypt needs a contact email")?; + let config = AcmeConfig::new(vec![hostname.clone()]) + .contact([format!("mailto:{}", contact)]) + .cache_option(Some(DirCache::new(tls.cert_dir()))) + .directory_lets_encrypt(tls.prod_tls); + iroh_relay::CertConfig::LetsEncrypt { config } + } + }; + Some(iroh_relay::TlsConfig { + https_bind_addr: tls.https_bind_addr(&cfg), + cert: cert_config, + }) } - } + None => None, + }; + let limits = iroh_relay::Limits { + accept_conn_limit: cfg + .limits + .as_ref() + .map(|l| l.accept_conn_limit) + .unwrap_or_default(), + accept_conn_burst: cfg + .limits + .as_ref() + .map(|l| l.accept_conn_burst) + .unwrap_or_default(), + }; + let relay_config = iroh_relay::RelayConfig { + secret_key: cfg.secret_key.clone(), + http_bind_addr: cfg.http_bind_addr(), + tls, + limits, + }; + let stun_config = iroh_relay::StunConfig { + bind_addr: cfg.stun_bind_addr(), + }; + Ok(iroh_relay::ServerConfig { + relay: Some(relay_config), + stun: Some(stun_config), + #[cfg(feature = "metrics")] + metrics_addr: if cfg.enable_metrics { + Some(cfg.metrics_bind_addr()) + } else { + None + }, + }) } -// var validProdHostname = regexp.MustCompile(`^relay([^.]*)\.tailscale\.com\.?$`) - -// func prodAutocertHostPolicy(_ context.Context, host string) error { -// if validProdHostname.MatchString(host) { -// return nil -// } -// return errors.New("invalid hostname") -// } - -// func rateLimitedListenAndServeTLS(srv *http.Server) error { -// addr := srv.Addr -// if addr == "" { -// addr = ":https" -// } -// ln, err := net.Listen("tcp", addr) -// if err != nil { -// return err -// } -// rln := newRateLimitedListener(ln, rate.Limit(*acceptConnLimit), *acceptConnBurst) -// expvar.Publish("tls_listener", rln.ExpVar()) -// defer rln.Close() -// return srv.ServeTLS(rln, "", "") -// } - -// type rateLimitedListener struct { -// // These are at the start of the struct to ensure 64-bit alignment -// // on 32-bit architecture regardless of what other fields may exist -// // in this package. -// numAccepts expvar.Int // does not include number of rejects -// numRejects expvar.Int - -// net.Listener - -// lim *rate.Limiter -// } - -// func newRateLimitedListener(ln net.Listener, limit rate.Limit, burst int) *rateLimitedListener { -// return &rateLimitedListener{Listener: ln, lim: rate.NewLimiter(limit, burst)} -// } - -// func (l *rateLimitedListener) ExpVar() expvar.Var { -// m := new(metrics.Set) -// m.Set("counter_accepted_connections", &l.numAccepts) -// m.Set("counter_rejected_connections", &l.numRejects) -// return m -// } - -// var errLimitedConn = errors.New("cannot accept connection; rate limited") - -// func (l *rateLimitedListener) Accept() (net.Conn, error) { -// // Even under a rate limited situation, we accept the connection immediately -// // and close it, rather than being slow at accepting new connections. -// // This provides two benefits: 1) it signals to the client that something -// // is going on on the server, and 2) it prevents new connections from -// // piling up and occupying resources in the OS kernel. -// // The client will retry as needing (with backoffs in place). -// cn, err := l.Listener.Accept() -// if err != nil { -// return nil, err -// } -// if !l.lim.Allow() { -// l.numRejects.Add(1) -// cn.Close() -// return nil, errLimitedConn -// } -// l.numAccepts.Add(1) -// return cn, nil -// } -// mod metrics { use iroh_metrics::{ core::{Counter, Metric}, @@ -856,206 +485,3 @@ mod metrics { } } } - -#[cfg(test)] -mod tests { - use super::*; - - use std::net::Ipv4Addr; - use std::time::Duration; - - use bytes::Bytes; - use http_body_util::BodyExt; - use iroh_base::node_addr::RelayUrl; - use iroh_net::relay::http::ClientBuilder; - use iroh_net::relay::ReceivedMessage; - use tokio::task::JoinHandle; - - #[tokio::test] - async fn test_serve_no_content_handler() { - let challenge = "123az__."; - let req = Request::builder() - .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .body(body_empty()) - .unwrap(); - - let res = serve_no_content_handler(req, Response::builder()).unwrap(); - assert_eq!(res.status(), StatusCode::NO_CONTENT); - - let header = res - .headers() - .get(NO_CONTENT_RESPONSE_HEADER) - .unwrap() - .to_str() - .unwrap(); - assert_eq!(header, format!("response {challenge}")); - assert!(res - .into_body() - .collect() - .await - .unwrap() - .to_bytes() - .is_empty()); - } - - #[test] - fn test_escape_hostname() { - assert_eq!( - escape_hostname("hello.host.name_foo-bar%baz"), - "hello.host.namefoo-barbaz" - ); - } - - struct DropServer { - server_task: JoinHandle<()>, - } - - impl Drop for DropServer { - fn drop(&mut self) { - self.server_task.abort(); - } - } - - #[tokio::test] - async fn test_relay_server_basic() -> Result<()> { - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); - // Binding to LOCALHOST to satisfy issues when binding to UNSPECIFIED in Windows for tests - // Binding to Ipv4 because, when binding to `IPv6::UNSPECIFIED`, it will also listen for - // IPv4 connections, but will not automatically do the same for `LOCALHOST`. In order to - // test STUN, which only listens on Ipv4, we must bind the whole relay server to Ipv4::LOCALHOST. - let cfg = Config { - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), - ..Default::default() - }; - let (addr_send, addr_recv) = tokio::sync::oneshot::channel(); - let relay_server_task = tokio::spawn( - async move { - // dev mode will bind to IPv6::UNSPECIFIED, so setting it `false` - let res = run(false, cfg, Some(addr_send)).await; - if let Err(e) = res { - eprintln!("error starting relay server {e}"); - } - } - .instrument(debug_span!("relay server")), - ); - let _drop_server = DropServer { - server_task: relay_server_task, - }; - - let relay_server_addr = addr_recv.await?; - let relay_server_str_url = format!("http://{}", relay_server_addr); - let relay_server_url: RelayUrl = relay_server_str_url.parse().unwrap(); - - // set up clients - let a_secret_key = SecretKey::generate(); - let a_key = a_secret_key.public(); - let resolver = iroh_net::dns::default_resolver().clone(); - let (client_a, mut client_a_receiver) = - ClientBuilder::new(relay_server_url.clone()).build(a_secret_key, resolver); - let connect_client = client_a.clone(); - - // give the relay server some time to set up - if let Err(e) = tokio::time::timeout(Duration::from_secs(10), async move { - loop { - match connect_client.connect().await { - Ok(_) => break, - Err(e) => { - tracing::warn!("client a unable to connect to relay server: {e:?}. Attempting to dial again in 10ms"); - tokio::time::sleep(Duration::from_millis(100)).await - } - } - } - }) - .await - { - bail!("error connecting client a to relay server: {e:?}"); - } - - let b_secret_key = SecretKey::generate(); - let b_key = b_secret_key.public(); - let resolver = iroh_net::dns::default_resolver().clone(); - let (client_b, mut client_b_receiver) = - ClientBuilder::new(relay_server_url.clone()).build(b_secret_key, resolver); - client_b.connect().await?; - - let msg = Bytes::from("hello, b"); - client_a.send(b_key, msg.clone()).await?; - - let (res, _) = client_b_receiver.recv().await.unwrap()?; - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(a_key, source); - assert_eq!(msg, data); - } else { - bail!("client_b received unexpected message {res:?}"); - } - - let msg = Bytes::from("howdy, a"); - client_b.send(a_key, msg.clone()).await?; - - let (res, _) = client_a_receiver.recv().await.unwrap()?; - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(b_key, source); - assert_eq!(msg, data); - } else { - bail!("client_a received unexpected message {res:?}"); - } - - // run stun check - let stun_addr: SocketAddr = - SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 3478); - - let txid = stun::TransactionId::default(); - let req = stun::request(txid); - let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap()); - - let server_socket = socket.clone(); - let server_task = tokio::task::spawn(async move { - let mut buf = vec![0u8; 64000]; - let len = server_socket.recv(&mut buf).await.unwrap(); - dbg!(len); - buf.truncate(len); - buf - }); - - tracing::info!("sending stun request to {stun_addr}"); - if let Err(e) = socket.send_to(&req, stun_addr).await { - bail!("socket.send_to error: {e:?}"); - } - - let response = server_task.await.unwrap(); - let (txid_back, response_addr) = stun::parse_response(&response).unwrap(); - assert_eq!(txid, txid_back); - tracing::info!("got {response_addr}"); - - // get 200 home page response - tracing::info!("send request for homepage"); - let res = reqwest::get(relay_server_str_url).await?; - assert!(res.status().is_success()); - tracing::info!("got OK"); - - // test captive portal - tracing::info!("test captive portal response"); - - let url = relay_server_url.join("/generate_204")?; - let challenge = "123az__."; - let client = reqwest::Client::new(); - let res = client - .get(url) - .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .send() - .await?; - assert_eq!(StatusCode::NO_CONTENT.as_u16(), res.status().as_u16()); - let header = res.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); - assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); - let body = res.bytes().await?; - assert!(body.is_empty()); - - tracing::info!("got successful captive portal response"); - - Ok(()) - } -} diff --git a/iroh-net/src/defaults.rs b/iroh-net/src/defaults.rs index 22e6b1d6e9e..7d0237a6295 100644 --- a/iroh-net/src/defaults.rs +++ b/iroh-net/src/defaults.rs @@ -9,8 +9,20 @@ pub const NA_RELAY_HOSTNAME: &str = "use1-1.relay.iroh.network."; /// Hostname of the default EU relay. pub const EU_RELAY_HOSTNAME: &str = "euw1-1.relay.iroh.network."; -/// STUN port as defined by [RFC 8489]() -pub const DEFAULT_RELAY_STUN_PORT: u16 = 3478; +/// The default STUN port used by the Relay server. +/// +/// The STUN port as defined by [RFC +/// 8489]() +pub const DEFAULT_STUN_PORT: u16 = 3478; + +/// The default HTTP port used by the Relay server. +pub const DEFAULT_HTTP_PORT: u16 = 80; + +/// The default HTTPS port used by the Relay server. +pub const DEFAULT_HTTPS_PORT: u16 = 443; + +/// The default metrics port used by the Relay server. +pub const DEFAULT_METRICS_PORT: u16 = 9090; /// Get the default [`RelayMap`]. pub fn default_relay_map() -> RelayMap { @@ -27,7 +39,7 @@ pub fn default_na_relay_node() -> RelayNode { RelayNode { url: url.into(), stun_only: false, - stun_port: DEFAULT_RELAY_STUN_PORT, + stun_port: DEFAULT_STUN_PORT, } } @@ -40,6 +52,6 @@ pub fn default_eu_relay_node() -> RelayNode { RelayNode { url: url.into(), stun_only: false, - stun_port: DEFAULT_RELAY_STUN_PORT, + stun_port: DEFAULT_STUN_PORT, } } diff --git a/iroh-net/src/netcheck.rs b/iroh-net/src/netcheck.rs index 062368c1c81..391e174202a 100644 --- a/iroh-net/src/netcheck.rs +++ b/iroh-net/src/netcheck.rs @@ -785,7 +785,7 @@ mod tests { use tokio::time; use tracing::info; - use crate::defaults::{DEFAULT_RELAY_STUN_PORT, EU_RELAY_HOSTNAME}; + use crate::defaults::{DEFAULT_STUN_PORT, EU_RELAY_HOSTNAME}; use crate::ping::Pinger; use crate::relay::RelayNode; @@ -795,11 +795,11 @@ mod tests { async fn test_basic() -> Result<()> { let _guard = iroh_test::logging::setup(); let (stun_addr, stun_stats, _cleanup_guard) = - stun::test::serve("0.0.0.0".parse().unwrap()).await?; + stun::tests::serve("127.0.0.1".parse().unwrap()).await?; let resolver = crate::dns::default_resolver(); let mut client = Client::new(None, resolver.clone())?; - let dm = stun::test::relay_map_of([stun_addr].into_iter()); + let dm = stun::tests::relay_map_of([stun_addr].into_iter()); // Note that the ProbePlan will change with each iteration. for i in 0..5 { @@ -842,7 +842,7 @@ mod tests { let dm = RelayMap::from_nodes([RelayNode { url: url.clone(), stun_only: true, - stun_port: DEFAULT_RELAY_STUN_PORT, + stun_port: DEFAULT_STUN_PORT, }]) .expect("hardcoded"); @@ -890,7 +890,7 @@ mod tests { // the STUN server being blocked will look like from the client's perspective. let blackhole = tokio::net::UdpSocket::bind("127.0.0.1:0").await?; let stun_addr = blackhole.local_addr()?; - let dm = stun::test::relay_map_of_opts([(stun_addr, false)].into_iter()); + let dm = stun::tests::relay_map_of_opts([(stun_addr, false)].into_iter()); // Now create a client and generate a report. let resolver = crate::dns::default_resolver().clone(); @@ -1127,8 +1127,8 @@ mod tests { // can easily use to identify the packet. // Setup STUN server and create relay_map. - let (stun_addr, _stun_stats, _done) = stun::test::serve_v4().await?; - let dm = stun::test::relay_map_of([stun_addr].into_iter()); + let (stun_addr, _stun_stats, _done) = stun::tests::serve_v4().await?; + let dm = stun::tests::relay_map_of([stun_addr].into_iter()); dbg!(&dm); let resolver = crate::dns::default_resolver().clone(); diff --git a/iroh-net/src/netcheck/reportgen.rs b/iroh-net/src/netcheck/reportgen.rs index 665ea66e548..b683d500a1e 100644 --- a/iroh-net/src/netcheck/reportgen.rs +++ b/iroh-net/src/netcheck/reportgen.rs @@ -31,7 +31,7 @@ use tokio::time::{self, Instant}; use tracing::{debug, debug_span, error, info_span, trace, warn, Instrument, Span}; use super::NetcheckMetrics; -use crate::defaults::DEFAULT_RELAY_STUN_PORT; +use crate::defaults::DEFAULT_STUN_PORT; use crate::dns::{DnsResolver, ResolverExt}; use crate::net::interfaces; use crate::net::ip; @@ -935,7 +935,7 @@ async fn get_relay_addr( proto: ProbeProto, ) -> Result { let port = if relay_node.stun_port == 0 { - DEFAULT_RELAY_STUN_PORT + DEFAULT_STUN_PORT } else { relay_node.stun_port }; diff --git a/iroh-net/src/relay.rs b/iroh-net/src/relay.rs index 13dc332f752..88213f0635c 100644 --- a/iroh-net/src/relay.rs +++ b/iroh-net/src/relay.rs @@ -15,6 +15,7 @@ pub(crate) mod client_conn; pub(crate) mod clients; mod codec; pub mod http; +pub mod iroh_relay; mod map; mod metrics; pub(crate) mod server; diff --git a/iroh-net/src/relay/http.rs b/iroh-net/src/relay/http.rs index e73da2de735..cd3d7519bf8 100644 --- a/iroh-net/src/relay/http.rs +++ b/iroh-net/src/relay/http.rs @@ -6,32 +6,10 @@ mod server; pub(crate) mod streams; pub use self::client::{Client, ClientBuilder, ClientError, ClientReceiver}; -pub use self::server::{Server, ServerBuilder, TlsAcceptor, TlsConfig}; +pub use self::server::{Server, ServerBuilder, ServerHandle, TlsAcceptor, TlsConfig}; pub(crate) const HTTP_UPGRADE_PROTOCOL: &str = "iroh derp http"; -#[cfg(any(test, feature = "test-utils"))] -pub(crate) fn make_tls_config() -> TlsConfig { - let subject_alt_names = vec!["localhost".to_string()]; - - let cert = rcgen::generate_simple_self_signed(subject_alt_names).unwrap(); - let rustls_certificate = rustls::Certificate(cert.serialize_der().unwrap()); - let rustls_key = rustls::PrivateKey(cert.get_key_pair().serialize_der()); - let config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(vec![(rustls_certificate)], rustls_key) - .unwrap(); - - let config = std::sync::Arc::new(config); - let acceptor = tokio_rustls::TlsAcceptor::from(config.clone()); - - TlsConfig { - config, - acceptor: TlsAcceptor::Manual(acceptor), - } -} - #[cfg(test)] mod tests { use super::*; @@ -47,6 +25,27 @@ mod tests { use crate::key::{PublicKey, SecretKey}; use crate::relay::ReceivedMessage; + pub(crate) fn make_tls_config() -> TlsConfig { + let subject_alt_names = vec!["localhost".to_string()]; + + let cert = rcgen::generate_simple_self_signed(subject_alt_names).unwrap(); + let rustls_certificate = rustls::Certificate(cert.serialize_der().unwrap()); + let rustls_key = rustls::PrivateKey(cert.get_key_pair().serialize_der()); + let config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(vec![(rustls_certificate)], rustls_key) + .unwrap(); + + let config = std::sync::Arc::new(config); + let acceptor = tokio_rustls::TlsAcceptor::from(config.clone()); + + TlsConfig { + config, + acceptor: TlsAcceptor::Manual(acceptor), + } + } + #[tokio::test] async fn test_http_clients_and_server() -> Result<()> { let _guard = iroh_test::logging::setup(); @@ -115,7 +114,7 @@ mod tests { client_a_task.abort(); client_b.close().await?; client_b_task.abort(); - server.shutdown().await; + server.shutdown(); Ok(()) } @@ -186,7 +185,7 @@ mod tests { let tls_config = make_tls_config(); // start server - let server = ServerBuilder::new("127.0.0.1:0".parse().unwrap()) + let mut server = ServerBuilder::new("127.0.0.1:0".parse().unwrap()) .secret_key(Some(server_key)) .tls_config(Some(tls_config)) .spawn() @@ -232,7 +231,8 @@ mod tests { assert_eq!(b_key, got_key); assert_eq!(msg, got_msg); - server.shutdown().await; + server.shutdown(); + server.task_handle().await?; client_a.close().await?; client_a_task.abort(); client_b.close().await?; diff --git a/iroh-net/src/relay/http/server.rs b/iroh-net/src/relay/http/server.rs index a102458a3e6..eaf6ffd70a6 100644 --- a/iroh-net/src/relay/http/server.rs +++ b/iroh-net/src/relay/http/server.rs @@ -18,7 +18,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::task::JoinHandle; use tokio_rustls_acme::AcmeAcceptor; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, info_span, warn, Instrument}; +use tracing::{debug, error, info, info_span, Instrument}; use crate::key::SecretKey; use crate::relay::http::HTTP_UPGRADE_PROTOCOL; @@ -70,30 +70,68 @@ async fn relay_connection_handler( conn_handler.accept(io).await } -/// A Relay Server handler. Created using [`ServerBuilder::spawn`], it starts a relay server -/// listening over HTTP or HTTPS. +/// The Relay HTTP server. +/// +/// A running HTTP server serving the relay endpoint and optionally a number of additional +/// HTTP services added with [`ServerBuilder::request_handler`]. If configured using +/// [`ServerBuilder::tls_config`] the server will handle TLS as well. +/// +/// Created using [`ServerBuilder::spawn`]. #[derive(Debug)] pub struct Server { addr: SocketAddr, - server: Option, http_server_task: JoinHandle<()>, cancel_server_loop: CancellationToken, } impl Server { - /// Close the underlying relay server and the HTTP(S) server task - pub async fn shutdown(self) { - if let Some(server) = self.server { - server.close().await; + /// Returns a handle for this server. + /// + /// The server runs in the background as several async tasks. This allows controlling + /// the server, in particular it allows gracefully shutting down the server. + pub fn handle(&self) -> ServerHandle { + ServerHandle { + addr: self.addr, + cancel_token: self.cancel_server_loop.clone(), } + } + /// Closes the underlying relay server and the HTTP(S) server tasks. + pub fn shutdown(&self) { self.cancel_server_loop.cancel(); - if let Err(e) = self.http_server_task.await { - warn!("Error shutting down server: {e:?}"); - } } - /// Get the local address of this server. + /// Returns the [`JoinHandle`] for the supervisor task managing the server. + /// + /// This is the root of all the tasks for the server. Aborting it will abort all the + /// other tasks for the server. Awaiting it will complete when all the server tasks are + /// completed. + pub fn task_handle(&mut self) -> &mut JoinHandle<()> { + &mut self.http_server_task + } + + /// Returns the local address of this server. + pub fn addr(&self) -> SocketAddr { + self.addr + } +} + +/// A handle for the [`Server`]. +/// +/// This does not allow access to the task but can communicate with it. +#[derive(Debug, Clone)] +pub struct ServerHandle { + addr: SocketAddr, + cancel_token: CancellationToken, +} + +impl ServerHandle { + /// Gracefully shut down the server. + pub fn shutdown(&self) { + self.cancel_token.cancel() + } + + /// Returns the address the server is bound on. pub fn addr(&self) -> SocketAddr { self.addr } @@ -108,13 +146,15 @@ pub struct TlsConfig { pub acceptor: TlsAcceptor, } -/// Build a Relay Server that communicates over HTTP or HTTPS, on a given address. +/// Builder for the Relay HTTP Server. /// -/// Defaults to handling relay requests on the "/derp" endpoint. +/// Defaults to handling relay requests on the "/derp" endpoint. Other HTTP endpoints can +/// be added using [`ServerBuilder::request_handler`]. /// -/// If no [`SecretKey`] is provided, it is assumed that you will provide a `relay_override` function -/// that handles requests to the relay endpoint. Not providing a `relay_override` in this case will -/// result in an error on `spawn`. +/// If no [`SecretKey`] is provided, it is assumed that you will provide a +/// [`ServerBuilder::relay_override`] function that handles requests to the relay +/// endpoint. Not providing a [`ServerBuilder::relay_override`] in this case will result in +/// an error on `spawn`. #[derive(derive_more::Debug)] pub struct ServerBuilder { /// The secret key for this Server. @@ -128,18 +168,21 @@ pub struct ServerBuilder { /// /// When `None`, the server will serve HTTP, otherwise it will serve HTTPS. tls_config: Option, - /// A map of request handlers to routes. Used when certain routes in your server should be made - /// available at the same port as the relay server, and so must be handled along side requests - /// to the relay endpoint. + /// A map of request handlers to routes. + /// + /// Used when certain routes in your server should be made available at the same port as + /// the relay server, and so must be handled along side requests to the relay endpoint. handlers: Handlers, /// Defaults to `GET` request at "/derp". relay_endpoint: &'static str, - /// Use a custom relay response handler. Typically used when you want to disable any relay connections. + /// Use a custom relay response handler. + /// + /// Typically used when you want to disable any relay connections. #[debug("{}", relay_override.as_ref().map_or("None", |_| "Some(Box, ResponseBuilder) -> Result + Send + Sync + 'static>)"))] relay_override: Option, - /// Headers to use for HTTP or HTTPS messages. + /// Headers to use for HTTP responses. headers: HeaderMap, - /// 404 not found response + /// 404 not found response. /// /// When `None`, a default is provided. #[debug("{}", not_found_fn.as_ref().map_or("None", |_| "Some(Box Result> + Send + Sync + 'static>)"))] @@ -147,7 +190,7 @@ pub struct ServerBuilder { } impl ServerBuilder { - /// Create a new [ServerBuilder] + /// Creates a new [ServerBuilder]. pub fn new(addr: SocketAddr) -> Self { Self { secret_key: None, @@ -161,20 +204,21 @@ impl ServerBuilder { } } - /// The [`SecretKey`] identity for this relay server. When set to `None`, the builder assumes - /// you do not want to run a relay service. + /// The [`SecretKey`] identity for this relay server. + /// + /// When set to `None`, the builder assumes you do not want to run a relay service. pub fn secret_key(mut self, secret_key: Option) -> Self { self.secret_key = secret_key; self } - /// Serve relay content using TLS. + /// Serves all requests content using TLS. pub fn tls_config(mut self, config: Option) -> Self { self.tls_config = config; self } - /// Add a custom handler for a specific Method & URI. + /// Adds a custom handler for a specific Method & URI. pub fn request_handler( mut self, method: Method, @@ -185,26 +229,29 @@ impl ServerBuilder { self } - /// Pass in a custom "404" handler. + /// Sets a custom "404" handler. pub fn not_found_handler(mut self, handler: HyperHandler) -> Self { self.not_found_fn = Some(handler); self } - /// Handle the relay endpoint in a custom way. This is required if no [`SecretKey`] was provided - /// to the builder. + /// Handles the relay endpoint in a custom way. + /// + /// This is required if no [`SecretKey`] was provided to the builder. pub fn relay_override(mut self, handler: HyperHandler) -> Self { self.relay_override = Some(handler); self } - /// Change the relay endpoint from "/derp" to `endpoint`. + /// Sets a custom endpoint for the relay handler. + /// + /// The default is `/derp`. pub fn relay_endpoint(mut self, endpoint: &'static str) -> Self { self.relay_endpoint = endpoint; self } - /// Add http headers. + /// Adds HTTP headers to responses. pub fn headers(mut self, headers: HeaderMap) -> Self { for (k, v) in headers.iter() { self.headers.insert(k.clone(), v.clone()); @@ -212,10 +259,14 @@ impl ServerBuilder { self } - /// Build and spawn an HTTP(S) relay Server + /// Builds and spawns an HTTP(S) Relay Server. pub async fn spawn(self) -> Result { - ensure!(self.secret_key.is_some() || self.relay_override.is_some(), "Must provide a `SecretKey` for the relay server OR pass in an override function for the 'relay' endpoint"); + ensure!( + self.secret_key.is_some() || self.relay_override.is_some(), + "Must provide a `SecretKey` for the relay server OR pass in an override function for the 'relay' endpoint" + ); let (relay_handler, relay_server) = if let Some(secret_key) = self.secret_key { + // spawns a server actor/task let server = crate::relay::server::Server::new(secret_key.clone()); ( RelayHandler::ConnHandler(server.client_conn_handler(self.headers.clone())), @@ -258,6 +309,7 @@ impl ServerBuilder { service, }; + // Spawns some server tasks, we only wait till all tasks are started. server_state.serve().await } } @@ -274,13 +326,19 @@ impl ServerState { // Binds a TCP listener on `addr` and handles content using HTTPS. // Returns the local [`SocketAddr`] on which the server is listening. async fn serve(self) -> Result { - let listener = TcpListener::bind(&self.addr) + let ServerState { + addr, + tls_config, + server, + service, + } = self; + let listener = TcpListener::bind(&addr) .await - .context("failed to bind https")?; + .context("failed to bind server socket")?; // we will use this cancel token to stop the infinite loop in the `listener.accept() task` let cancel_server_loop = CancellationToken::new(); let addr = listener.local_addr()?; - let http_str = self.tls_config.as_ref().map_or("HTTP", |_| "HTTPS"); + let http_str = tls_config.as_ref().map_or("HTTP", |_| "HTTPS"); info!("[{http_str}] relay: serving on {addr}"); let cancel = cancel_server_loop.clone(); let task = tokio::task::spawn(async move { @@ -295,8 +353,8 @@ impl ServerState { res = listener.accept() => match res { Ok((stream, peer_addr)) => { debug!("[{http_str}] relay: Connection opened from {peer_addr}"); - let tls_config = self.tls_config.clone(); - let service = self.service.clone(); + let tls_config = tls_config.clone(); + let service = service.clone(); // spawn a task to handle the connection set.spawn(async move { if let Err(error) = service @@ -320,13 +378,17 @@ impl ServerState { } } } + if let Some(server) = server { + // TODO: if the task this is running in is aborted this server is not shut + // down. + server.close().await; + } set.shutdown().await; debug!("[{http_str}] relay: server has been shutdown."); }.instrument(info_span!("relay-http-serve"))); Ok(Server { addr, - server: self.server, http_server_task: task, cancel_server_loop, }) diff --git a/iroh-net/src/relay/iroh_relay.rs b/iroh-net/src/relay/iroh_relay.rs new file mode 100644 index 00000000000..928cdbaa8c5 --- /dev/null +++ b/iroh-net/src/relay/iroh_relay.rs @@ -0,0 +1,909 @@ +//! A full-fledged iroh-relay server. +//! +//! This module provides an API to run a full fledged iroh-relay server. It is primarily +//! used by the `iroh-relay` binary in this crate. It can be used to run a relay server in +//! other locations however. +//! +//! This code is fully written in a form of structured-concurrency: every spawned task is +//! always attached to a handle and when the handle is dropped the tasks abort. So tasks +//! can not outlive their handle. It is also always possible to await for completion of a +//! task. Some tasks additionally have a method to do graceful shutdown. + +use std::fmt; +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; + +use anyhow::{anyhow, bail, Context, Result}; +use futures_lite::StreamExt; +use http::response::Builder as ResponseBuilder; +use http::{HeaderMap, Method, Request, Response, StatusCode}; +use hyper::body::Incoming; +use iroh_metrics::inc; +use tokio::net::{TcpListener, UdpSocket}; +use tokio::task::JoinSet; +use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument}; + +use crate::key::SecretKey; +use crate::relay; +use crate::relay::http::{ServerBuilder as RelayServerBuilder, TlsAcceptor}; +use crate::stun; +use crate::util::AbortingJoinHandle; + +// Module defined in this file. +use metrics::StunMetrics; + +const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Tailscale-Challenge"; +const NO_CONTENT_RESPONSE_HEADER: &str = "X-Tailscale-Response"; +const NOTFOUND: &[u8] = b"Not Found"; +const RELAY_DISABLED: &[u8] = b"relay server disabled"; +const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n"; +const INDEX: &[u8] = br#" +

Iroh Relay

+

+ This is an Iroh Relay server. +

+"#; +const TLS_HEADERS: [(&str, &str); 2] = [ + ("Strict-Transport-Security", "max-age=63072000; includeSubDomains"), + ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'") +]; + +type BytesBody = http_body_util::Full; +type HyperError = Box; +type HyperResult = std::result::Result; + +/// Creates a new [`BytesBody`] with no content. +fn body_empty() -> BytesBody { + http_body_util::Full::new(hyper::body::Bytes::new()) +} + +/// Configuration for the full Relay & STUN server. +/// +/// Be aware the generic parameters are for when using the Let's Encrypt TLS configuration. +/// If not used dummy ones need to be provided, e.g. `ServerConfig::<(), ()>::default()`. +#[derive(Debug, Default)] +pub struct ServerConfig { + /// Configuration for the Relay server, disabled if `None`. + pub relay: Option>, + /// Configuration for the STUN server, disabled if `None`. + pub stun: Option, + /// Socket to serve metrics on. + #[cfg(feature = "metrics")] + pub metrics_addr: Option, +} + +/// Configuration for the Relay HTTP and HTTPS server. +/// +/// This includes the HTTP services hosted by the Relay server, the Relay `/derp` HTTP +/// endpoint is only one of the services served. +#[derive(Debug)] +pub struct RelayConfig { + /// The iroh secret key of the Relay server. + pub secret_key: SecretKey, + /// The socket address on which the Relay HTTP server should bind. + /// + /// Normally you'd choose port `80`. The bind address for the HTTPS server is + /// configured in [`RelayConfig::tls`]. + /// + /// If [`RelayConfig::tls`] is `None` then this serves all the HTTP services without + /// TLS. + pub http_bind_addr: SocketAddr, + /// TLS configuration for the HTTPS server. + /// + /// If *None* all the HTTP services that would be served here are served from + /// [`RelayConfig::http_bind_addr`]. + pub tls: Option>, + /// Rate limits. + pub limits: Limits, +} + +/// Configuration for the STUN server. +#[derive(Debug)] +pub struct StunConfig { + /// The socket address on which the STUN server should bind. + /// + /// Normally you'd chose port `3478`, see [`crate::defaults::DEFAULT_STUN_PORT`]. + pub bind_addr: SocketAddr, +} + +/// TLS configuration for Relay server. +/// +/// Normally the Relay server accepts connections on both HTTPS and HTTP. +#[derive(Debug)] +pub struct TlsConfig { + /// The socket address on which to serve the HTTPS server. + /// + /// Since the captive portal probe has to run over plain text HTTP and TLS is used for + /// the main relay server this has to be on a different port. When TLS is not enabled + /// this is served on the [`RelayConfig::http_bind_addr`] socket address. + /// + /// Normally you'd choose port `80`. + pub https_bind_addr: SocketAddr, + /// Mode for getting a cert. + pub cert: CertConfig, +} + +/// Rate limits. +#[derive(Debug, Default)] +pub struct Limits { + /// Rate limit for accepting new connection. Unlimited if not set. + pub accept_conn_limit: Option, + /// Burst limit for accepting new connection. Unlimited if not set. + pub accept_conn_burst: Option, +} + +/// TLS certificate configuration. +#[derive(derive_more::Debug)] +pub enum CertConfig { + /// Use Let's Encrypt. + LetsEncrypt { + /// Configuration for Let's Encrypt certificates. + #[debug("AcmeConfig")] + config: tokio_rustls_acme::AcmeConfig, + }, + /// Use a static TLS key and certificate chain. + Manual { + /// The TLS private key. + private_key: rustls::PrivateKey, + /// The TLS certificate chain. + certs: Vec, + }, +} + +/// A running Relay + STUN server. +/// +/// This is a full Relay server, including STUN, Relay and various associated HTTP services. +/// +/// Dropping this will stop the server. +#[derive(Debug)] +pub struct Server { + /// The address of the HTTP server, if configured. + http_addr: Option, + /// The address of the STUN server, if configured. + stun_addr: Option, + /// The address of the HTTPS server, if the relay server is using TLS. + /// + /// If the Relay server is not using TLS then it is served from the + /// [`Server::http_addr`]. + https_addr: Option, + /// Handle to the relay server. + relay_handle: Option, + /// The main task running the server. + supervisor: AbortingJoinHandle>, +} + +impl Server { + /// Starts the server. + pub async fn spawn(config: ServerConfig) -> Result + where + EC: fmt::Debug + 'static, + EA: fmt::Debug + 'static, + { + let mut tasks = JoinSet::new(); + + #[cfg(feature = "metrics")] + if let Some(addr) = config.metrics_addr { + debug!("Starting metrics server"); + use iroh_metrics::core::Metric; + + iroh_metrics::core::Core::init(|reg, metrics| { + metrics.insert(crate::metrics::RelayMetrics::new(reg)); + metrics.insert(StunMetrics::new(reg)); + }); + tasks.spawn( + iroh_metrics::metrics::start_metrics_server(addr) + .instrument(info_span!("metrics-server")), + ); + } + + // Start the STUN server. + let stun_addr = match config.stun { + Some(stun) => { + debug!("Starting STUN server"); + match UdpSocket::bind(stun.bind_addr).await { + Ok(sock) => { + let addr = sock.local_addr()?; + info!("STUN server bound on {addr}"); + tasks.spawn( + server_stun_listener(sock).instrument(info_span!("stun-server", %addr)), + ); + Some(addr) + } + Err(err) => bail!("failed to bind STUN listener: {err:#?}"), + } + } + None => None, + }; + + // Start the Relay server. + let (relay_server, http_addr) = match config.relay { + Some(relay_config) => { + debug!("Starting Relay server"); + let mut headers = HeaderMap::new(); + for (name, value) in TLS_HEADERS.iter() { + headers.insert(*name, value.parse()?); + } + let relay_bind_addr = match relay_config.tls { + Some(ref tls) => tls.https_bind_addr, + None => relay_config.http_bind_addr, + }; + let mut builder = RelayServerBuilder::new(relay_bind_addr) + .secret_key(Some(relay_config.secret_key)) + .headers(headers) + .relay_override(Box::new(relay_disabled_handler)) + .request_handler(Method::GET, "/", Box::new(root_handler)) + .request_handler(Method::GET, "/index.html", Box::new(root_handler)) + .request_handler(Method::GET, "/derp/probe", Box::new(probe_handler)) + .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler)); + let http_addr = match relay_config.tls { + Some(tls_config) => { + let server_config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth(); + let server_tls_config = match tls_config.cert { + CertConfig::LetsEncrypt { config } => { + let mut state = config.state(); + let server_config = + server_config.with_cert_resolver(state.resolver()); + let acceptor = TlsAcceptor::LetsEncrypt(state.acceptor()); + tasks.spawn( + async move { + while let Some(event) = state.next().await { + match event { + Ok(ok) => debug!("acme event: {ok:?}"), + Err(err) => error!("error: {err:?}"), + } + } + Err(anyhow!("acme event stream finished")) + } + .instrument(info_span!("acme")), + ); + Some(relay::http::TlsConfig { + config: Arc::new(server_config), + acceptor, + }) + } + CertConfig::Manual { private_key, certs } => { + let server_config = server_config + .with_single_cert(certs.clone(), private_key.clone())?; + let server_config = Arc::new(server_config); + let acceptor = + tokio_rustls::TlsAcceptor::from(server_config.clone()); + let acceptor = TlsAcceptor::Manual(acceptor); + Some(relay::http::TlsConfig { + config: server_config, + acceptor, + }) + } + }; + builder = builder.tls_config(server_tls_config); + + // Some services always need to be served over HTTP without TLS. Run + // these standalone. + let http_listener = TcpListener::bind(&relay_config.http_bind_addr) + .await + .context("failed to bind http")?; + let http_addr = http_listener.local_addr()?; + tasks.spawn( + run_captive_portal_service(http_listener) + .instrument(info_span!("http-service", addr = %http_addr)), + ); + Some(http_addr) + } + None => { + // If running Relay without TLS add the plain HTTP server directly + // to the Relay server. + builder = builder.request_handler( + Method::GET, + "/generate_204", + Box::new(serve_no_content_handler), + ); + None + } + }; + let relay_server = builder.spawn().await?; + (Some(relay_server), http_addr) + } + None => (None, None), + }; + // If http_addr is Some then relay_server is serving HTTPS. If http_addr is None + // relay_server is serving HTTP, including the /generate_204 service. + let relay_addr = relay_server.as_ref().map(|srv| srv.addr()); + let relay_handle = relay_server.as_ref().map(|srv| srv.handle()); + let relay_server = relay_server.map(RelayHttpServerGuard); + let task = tokio::spawn(relay_supervisor(tasks, relay_server)); + Ok(Self { + http_addr: http_addr.or(relay_addr), + stun_addr, + https_addr: http_addr.and(relay_addr), + relay_handle, + supervisor: AbortingJoinHandle::from(task), + }) + } + + /// Requests graceful shutdown. + /// + /// Returns once all server tasks have stopped. + pub async fn shutdown(self) -> Result<()> { + // Only the Relay server needs shutting down, the supervisor will abort the tasks in + // the JoinSet when the server terminates. + if let Some(handle) = self.relay_handle { + handle.shutdown(); + } + self.supervisor.await? + } + + /// Returns the handle for the task. + /// + /// This allows waiting for the server's supervisor task to finish. Can be useful in + /// case there is an error in the server before it is shut down. + pub fn task_handle(&mut self) -> &mut AbortingJoinHandle> { + &mut self.supervisor + } + + /// The socket address the HTTPS server is listening on. + pub fn https_addr(&self) -> Option { + self.https_addr + } + + /// The socket address the HTTP server is listening on. + pub fn http_addr(&self) -> Option { + self.http_addr + } + + /// The socket address the STUN server is listening on. + pub fn stun_addr(&self) -> Option { + self.stun_addr + } +} + +/// Horrible hack to make [`relay::http::Server`] behave somewhat. +/// +/// We need this server to abort on drop to achieve structured concurrency. +// TODO: could consider building this directly into the relay::http::Server +#[derive(Debug)] +struct RelayHttpServerGuard(relay::http::Server); + +impl Drop for RelayHttpServerGuard { + fn drop(&mut self) { + self.0.task_handle().abort(); + } +} + +/// Supervisor for the relay server tasks. +/// +/// As soon as one of the tasks exits, all other tasks are stopped and the server stops. +/// The supervisor finishes once all tasks are finished. +#[instrument(skip_all)] +async fn relay_supervisor( + mut tasks: JoinSet>, + mut relay_http_server: Option, +) -> Result<()> { + let res = match (relay_http_server.as_mut(), tasks.len()) { + (None, _) => tasks + .join_next() + .await + .unwrap_or_else(|| Ok(Err(anyhow!("Nothing to supervise")))), + (Some(relay), 0) => relay.0.task_handle().await.map(anyhow::Ok), + (Some(relay), _) => { + tokio::select! { + biased; + Some(ret) = tasks.join_next() => ret, + ret = relay.0.task_handle() => ret.map(anyhow::Ok), + else => Ok(Err(anyhow!("Empty JoinSet (unreachable)"))), + } + } + }; + let ret = match res { + Ok(Ok(())) => { + debug!("Task exited"); + Ok(()) + } + Ok(Err(err)) => { + error!(%err, "Task failed"); + Err(err.context("task failed")) + } + Err(err) => { + if let Ok(panic) = err.try_into_panic() { + error!("Task panicked"); + std::panic::resume_unwind(panic); + } + debug!("Task cancelled"); + Err(anyhow!("task cancelled")) + } + }; + + // Ensure the HTTP server terminated, there is no harm in calling this after it is + // already shut down. The JoinSet is aborted on drop. + if let Some(server) = relay_http_server { + server.0.shutdown(); + } + + tasks.shutdown().await; + + ret +} + +/// Runs a STUN server. +/// +/// When the future is dropped, the server stops. +async fn server_stun_listener(sock: UdpSocket) -> Result<()> { + info!(addr = ?sock.local_addr().ok(), "running STUN server"); + let sock = Arc::new(sock); + let mut buffer = vec![0u8; 64 << 10]; + let mut tasks = JoinSet::new(); + loop { + tokio::select! { + biased; + _ = tasks.join_next(), if !tasks.is_empty() => (), + res = sock.recv_from(&mut buffer) => { + match res { + Ok((n, src_addr)) => { + inc!(StunMetrics, requests); + let pkt = &buffer[..n]; + if !stun::is(pkt) { + debug!(%src_addr, "STUN: ignoring non stun packet"); + inc!(StunMetrics, bad_requests); + continue; + } + let pkt = pkt.to_vec(); + tasks.spawn(handle_stun_request(src_addr, pkt, sock.clone())); + } + Err(err) => { + inc!(StunMetrics, failures); + warn!("failed to recv: {err:#}"); + } + } + } + } + } +} + +/// Handles a single STUN request, doing all logging required. +async fn handle_stun_request(src_addr: SocketAddr, pkt: Vec, sock: Arc) { + let handle = AbortingJoinHandle::from(tokio::task::spawn_blocking(move || { + match stun::parse_binding_request(&pkt) { + Ok(txid) => { + debug!(%src_addr, %txid, "STUN: received binding request"); + Some((txid, stun::response(txid, src_addr))) + } + Err(err) => { + inc!(StunMetrics, bad_requests); + warn!(%src_addr, "STUN: invalid binding request: {:?}", err); + None + } + } + })); + let (txid, response) = match handle.await { + Ok(Some(val)) => val, + Ok(None) => return, + Err(err) => { + error!("{err:#}"); + return; + } + }; + match sock.send_to(&response, src_addr).await { + Ok(len) => { + if len != response.len() { + warn!( + %src_addr, + %txid, + "failed to write response, {len}/{} bytes sent", + response.len() + ); + } else { + match src_addr { + SocketAddr::V4(_) => inc!(StunMetrics, ipv4_success), + SocketAddr::V6(_) => inc!(StunMetrics, ipv6_success), + } + } + trace!(%src_addr, %txid, "sent {len} bytes"); + } + Err(err) => { + inc!(StunMetrics, failures); + warn!(%src_addr, %txid, "failed to write response: {err:#}"); + } + } +} + +fn relay_disabled_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult> { + response + .status(StatusCode::NOT_FOUND) + .body(RELAY_DISABLED.into()) + .map_err(|err| Box::new(err) as HyperError) +} + +fn root_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult> { + response + .status(StatusCode::OK) + .header("Content-Type", "text/html; charset=utf-8") + .body(INDEX.into()) + .map_err(|err| Box::new(err) as HyperError) +} + +/// HTTP latency queries +fn probe_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult> { + response + .status(StatusCode::OK) + .header("Access-Control-Allow-Origin", "*") + .body(body_empty()) + .map_err(|err| Box::new(err) as HyperError) +} + +fn robots_handler( + _r: Request, + response: ResponseBuilder, +) -> HyperResult> { + response + .status(StatusCode::OK) + .body(ROBOTS_TXT.into()) + .map_err(|err| Box::new(err) as HyperError) +} + +/// For captive portal detection. +fn serve_no_content_handler( + r: Request, + mut response: ResponseBuilder, +) -> HyperResult> { + if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) { + if !challenge.is_empty() + && challenge.len() < 64 + && challenge + .as_bytes() + .iter() + .all(|c| is_challenge_char(*c as char)) + { + response = response.header( + NO_CONTENT_RESPONSE_HEADER, + format!("response {}", challenge.to_str()?), + ); + } + } + + response + .status(StatusCode::NO_CONTENT) + .body(body_empty()) + .map_err(|err| Box::new(err) as HyperError) +} + +fn is_challenge_char(c: char) -> bool { + // Semi-randomly chosen as a limited set of valid characters + c.is_ascii_lowercase() + || c.is_ascii_uppercase() + || c.is_ascii_digit() + || c == '.' + || c == '-' + || c == '_' +} + +/// This is a future that never returns, drop it to cancel/abort. +async fn run_captive_portal_service(http_listener: TcpListener) -> Result<()> { + info!("serving"); + + // If this future is cancelled, this is dropped and all tasks are aborted. + let mut tasks = JoinSet::new(); + + loop { + match http_listener.accept().await { + Ok((stream, peer_addr)) => { + debug!(%peer_addr, "Connection opened",); + let handler = CaptivePortalService; + + tasks.spawn(async move { + let stream = relay::MaybeTlsStreamServer::Plain(stream); + let stream = hyper_util::rt::TokioIo::new(stream); + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection(stream, handler) + .with_upgrades() + .await + { + error!("Failed to serve connection: {err:?}"); + } + }); + } + Err(err) => { + error!( + "[CaptivePortalService] failed to accept connection: {:#?}", + err + ); + } + } + } +} + +#[derive(Clone)] +struct CaptivePortalService; + +impl hyper::service::Service> for CaptivePortalService { + type Response = Response; + type Error = HyperError; + type Future = Pin> + Send>>; + + fn call(&self, req: Request) -> Self::Future { + match (req.method(), req.uri().path()) { + // Captive Portal checker + (&Method::GET, "/generate_204") => { + Box::pin(async move { serve_no_content_handler(req, Response::builder()) }) + } + _ => { + // Return 404 not found response. + let r = Response::builder() + .status(StatusCode::NOT_FOUND) + .body(NOTFOUND.into()) + .map_err(|err| Box::new(err) as HyperError); + Box::pin(async move { r }) + } + } + } +} + +mod metrics { + use iroh_metrics::{ + core::{Counter, Metric}, + struct_iterable::Iterable, + }; + + /// StunMetrics tracked for the DERPER + #[allow(missing_docs)] + #[derive(Debug, Clone, Iterable)] + pub struct StunMetrics { + /* + * Metrics about STUN requests over ipv6 + */ + /// Number of stun requests made + pub requests: Counter, + /// Number of successful requests over ipv4 + pub ipv4_success: Counter, + /// Number of successful requests over ipv6 + pub ipv6_success: Counter, + + /// Number of bad requests, either non-stun packets or incorrect binding request + pub bad_requests: Counter, + /// Number of failures + pub failures: Counter, + } + + impl Default for StunMetrics { + fn default() -> Self { + Self { + /* + * Metrics about STUN requests + */ + requests: Counter::new("Number of STUN requests made to the server."), + ipv4_success: Counter::new("Number of successful ipv4 STUN requests served."), + ipv6_success: Counter::new("Number of successful ipv6 STUN requests served."), + bad_requests: Counter::new("Number of bad requests made to the STUN endpoint."), + failures: Counter::new("Number of STUN requests that end in failure."), + } + } + } + + impl Metric for StunMetrics { + fn name() -> &'static str { + "stun" + } + } +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + use std::time::Duration; + + use bytes::Bytes; + use iroh_base::node_addr::RelayUrl; + + use crate::relay::http::ClientBuilder; + + use self::relay::ReceivedMessage; + + use super::*; + + #[tokio::test] + async fn test_no_services() { + let _guard = iroh_test::logging::setup(); + let mut server = Server::spawn(ServerConfig::<(), ()>::default()) + .await + .unwrap(); + let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle()) + .await + .expect("timeout, server not finished") + .expect("server task JoinError"); + assert!(res.is_err()); + } + + #[tokio::test] + async fn test_conflicting_bind() { + let _guard = iroh_test::logging::setup(); + let mut server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 1234).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: Some((Ipv4Addr::LOCALHOST, 1234).into()), + }) + .await + .unwrap(); + let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle()) + .await + .expect("timeout, server not finished") + .expect("server task JoinError"); + assert!(res.is_err()); // AddrInUse + } + + #[tokio::test] + async fn test_root_handler() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let url = format!("http://{}", server.http_addr().unwrap()); + + let response = reqwest::get(&url).await.unwrap(); + assert_eq!(response.status(), 200); + let body = response.text().await.unwrap(); + assert!(body.contains("iroh.computer")); + } + + #[tokio::test] + async fn test_captive_portal_service() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let url = format!("http://{}/generate_204", server.http_addr().unwrap()); + let challenge = "123az__."; + + let client = reqwest::Client::new(); + let response = client + .get(&url) + .header(NO_CONTENT_CHALLENGE_HEADER, challenge) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + let header = response.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); + assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); + let body = response.text().await.unwrap(); + assert!(body.is_empty()); + } + + #[tokio::test] + async fn test_relay_clients() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: Some(RelayConfig { + secret_key: SecretKey::generate(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: None, + limits: Default::default(), + }), + stun: None, + metrics_addr: None, + }) + .await + .unwrap(); + let relay_url = format!("http://{}", server.http_addr().unwrap()); + let relay_url: RelayUrl = relay_url.parse().unwrap(); + + // set up client a + let a_secret_key = SecretKey::generate(); + let a_key = a_secret_key.public(); + let resolver = crate::dns::default_resolver().clone(); + let (client_a, mut client_a_receiver) = + ClientBuilder::new(relay_url.clone()).build(a_secret_key, resolver); + let connect_client = client_a.clone(); + + // give the relay server some time to accept connections + if let Err(err) = tokio::time::timeout(Duration::from_secs(10), async move { + loop { + match connect_client.connect().await { + Ok(_) => break, + Err(err) => { + warn!("client unable to connect to relay server: {err:#}"); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + }) + .await + { + panic!("error connecting to relay server: {err:#}"); + } + + // set up client b + let b_secret_key = SecretKey::generate(); + let b_key = b_secret_key.public(); + let resolver = crate::dns::default_resolver().clone(); + let (client_b, mut client_b_receiver) = + ClientBuilder::new(relay_url.clone()).build(b_secret_key, resolver); + client_b.connect().await.unwrap(); + + // send message from a to b + let msg = Bytes::from("hello, b"); + client_a.send(b_key, msg.clone()).await.unwrap(); + + let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + if let ReceivedMessage::ReceivedPacket { source, data } = res { + assert_eq!(a_key, source); + assert_eq!(msg, data); + } else { + panic!("client_b received unexpected message {res:?}"); + } + + // send message from b to a + let msg = Bytes::from("howdy, a"); + client_b.send(a_key, msg.clone()).await.unwrap(); + + let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + if let ReceivedMessage::ReceivedPacket { source, data } = res { + assert_eq!(b_key, source); + assert_eq!(msg, data); + } else { + panic!("client_a received unexpected message {res:?}"); + } + } + + #[tokio::test] + async fn test_stun() { + let _guard = iroh_test::logging::setup(); + let server = Server::spawn(ServerConfig::<(), ()> { + relay: None, + stun: Some(StunConfig { + bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + }), + metrics_addr: None, + }) + .await + .unwrap(); + + let txid = stun::TransactionId::default(); + let req = stun::request(txid); + let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + socket + .send_to(&req, server.stun_addr().unwrap()) + .await + .unwrap(); + + // get response + let mut buf = vec![0u8; 64000]; + let (len, addr) = socket.recv_from(&mut buf).await.unwrap(); + assert_eq!(addr, server.stun_addr().unwrap()); + buf.truncate(len); + let (txid_back, response_addr) = stun::parse_response(&buf).unwrap(); + assert_eq!(txid, txid_back); + assert_eq!(response_addr, socket.local_addr().unwrap()); + } +} diff --git a/iroh-net/src/relay/map.rs b/iroh-net/src/relay/map.rs index ede590a7aec..721fd778a16 100644 --- a/iroh-net/src/relay/map.rs +++ b/iroh-net/src/relay/map.rs @@ -5,7 +5,7 @@ use std::{collections::BTreeMap, fmt, sync::Arc}; use anyhow::{ensure, Result}; use serde::{Deserialize, Serialize}; -use crate::defaults::DEFAULT_RELAY_STUN_PORT; +use crate::defaults::DEFAULT_STUN_PORT; use super::RelayUrl; @@ -91,7 +91,7 @@ impl RelayMap { /// This will use the default STUN port and IP addresses resolved from the URL's host name via DNS. /// relay nodes are specified at <../../../docs/relay_nodes.md> pub fn from_url(url: RelayUrl) -> Self { - Self::default_from_node(url, DEFAULT_RELAY_STUN_PORT) + Self::default_from_node(url, DEFAULT_STUN_PORT) } /// Constructs the [`RelayMap] from an iterator of [`RelayNode`]s. diff --git a/iroh-net/src/relay/server.rs b/iroh-net/src/relay/server.rs index 38493c46017..05dbc60ad7a 100644 --- a/iroh-net/src/relay/server.rs +++ b/iroh-net/src/relay/server.rs @@ -113,6 +113,13 @@ impl Server { } } + /// Aborts the server. + /// + /// You should prefer to use [`Server::close`] for a graceful shutdown. + pub fn abort(&self) { + self.cancel.cancel(); + } + /// Whether or not the relay [Server] is closed. pub fn is_closed(&self) -> bool { self.closed diff --git a/iroh-net/src/stun.rs b/iroh-net/src/stun.rs index b9ff7e6ddee..e0ed9367826 100644 --- a/iroh-net/src/stun.rs +++ b/iroh-net/src/stun.rs @@ -72,8 +72,8 @@ const COOKIE: [u8; 4] = 0x2112_A442u32.to_be_bytes(); /// Reports whether b is a STUN message. pub fn is(b: &[u8]) -> bool { b.len() >= stun_rs::MESSAGE_HEADER_SIZE && - b[0]&0b11000000 == 0 && // top two bits must be zero - b[4..8] == COOKIE + b[0]&0b11000000 == 0 && // top two bits must be zero + b[4..8] == COOKIE } /// Parses a STUN binding request. @@ -149,9 +149,10 @@ pub fn parse_response(b: &[u8]) -> Result<(TransactionId, SocketAddr), Error> { Err(Error::MalformedAttrs) } -#[cfg(any(test, feature = "test-utils"))] -pub(crate) mod test { - use std::{net::IpAddr, sync::Arc}; +#[cfg(test)] +pub(crate) mod tests { + use std::net::{IpAddr, Ipv4Addr}; + use std::sync::Arc; use anyhow::Result; use tokio::{ @@ -160,30 +161,28 @@ pub(crate) mod test { }; use tracing::{debug, trace}; - #[cfg(test)] use crate::relay::{RelayMap, RelayNode, RelayUrl}; use crate::test_utils::CleanupDropGuard; use super::*; + // TODO: make all this private + // (read_ipv4, read_ipv5) #[derive(Debug, Default, Clone)] pub struct StunStats(Arc>); impl StunStats { - #[cfg(test)] pub async fn total(&self) -> usize { let s = self.0.lock().await; s.0 + s.1 } } - #[cfg(test)] pub fn relay_map_of(stun: impl Iterator) -> RelayMap { relay_map_of_opts(stun.map(|addr| (addr, true))) } - #[cfg(test)] pub fn relay_map_of_opts(stun: impl Iterator) -> RelayMap { let nodes = stun.map(|(addr, stun_only)| { let host = addr.ip(); @@ -202,7 +201,6 @@ pub(crate) mod test { /// Sets up a simple STUN server binding to `0.0.0.0:0`. /// /// See [`serve`] for more details. - #[cfg(test)] pub(crate) async fn serve_v4() -> Result<(SocketAddr, StunStats, CleanupDropGuard)> { serve(std::net::Ipv4Addr::UNSPECIFIED.into()).await } @@ -272,13 +270,6 @@ pub(crate) mod test { } } } -} - -#[cfg(test)] -mod tests { - use std::net::{IpAddr, Ipv4Addr}; - - use super::*; // Test to check if an existing stun server works // #[tokio::test] diff --git a/iroh-net/src/test_utils.rs b/iroh-net/src/test_utils.rs index 0cbf8bd857e..3188a7e1284 100644 --- a/iroh-net/src/test_utils.rs +++ b/iroh-net/src/test_utils.rs @@ -2,7 +2,6 @@ use anyhow::Result; use tokio::sync::oneshot; -use tracing::{error_span, info_span, Instrument}; use crate::{ key::SecretKey, @@ -24,48 +23,51 @@ pub struct CleanupDropGuard(pub(crate) oneshot::Sender<()>); /// Runs a relay server with STUN enabled suitable for tests. /// -/// The returned `Url` is the url of the relay server in the returned [`RelayMap`], it -/// is always `Some` as that is how the [`Endpoint::connect`] API expects it. +/// The returned `Url` is the url of the relay server in the returned [`RelayMap`]. +/// When dropped, the returned [`Server`] does will stop running. /// -/// [`Endpoint::connect`]: crate::endpoint::Endpoint -pub async fn run_relay_server() -> Result<(RelayMap, RelayUrl, CleanupDropGuard)> { - let server_key = SecretKey::generate(); - let me = server_key.public().fmt_short(); - let tls_config = crate::relay::http::make_tls_config(); - let server = crate::relay::http::ServerBuilder::new("127.0.0.1:0".parse().unwrap()) - .secret_key(Some(server_key)) - .tls_config(Some(tls_config)) - .spawn() - .instrument(error_span!("relay server", %me)) - .await?; - - let https_addr = server.addr(); - println!("relay listening on {:?}", https_addr); - - let (stun_addr, _, stun_drop_guard) = crate::stun::test::serve(server.addr().ip()).await?; - let url: RelayUrl = format!("https://localhost:{}", https_addr.port()) +/// [`Server`]: crate::relay::iroh_relay::Server +pub async fn run_relay_server() -> Result<(RelayMap, RelayUrl, crate::relay::iroh_relay::Server)> { + use crate::relay::iroh_relay::{CertConfig, RelayConfig, ServerConfig, StunConfig, TlsConfig}; + use std::net::Ipv4Addr; + + let secret_key = SecretKey::generate(); + let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap(); + let rustls_cert = rustls::Certificate(cert.serialize_der().unwrap()); + let private_key = rustls::PrivateKey(cert.get_key_pair().serialize_der()); + + let config = ServerConfig { + relay: Some(RelayConfig { + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + secret_key, + tls: Some(TlsConfig { + cert: CertConfig::<(), ()>::Manual { + private_key, + certs: vec![rustls_cert], + }, + https_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + }), + limits: Default::default(), + }), + stun: Some(StunConfig { + bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + }), + #[cfg(feature = "metrics")] + metrics_addr: None, + }; + let server = crate::relay::iroh_relay::Server::spawn(config) + .await + .unwrap(); + let url: RelayUrl = format!("https://localhost:{}", server.https_addr().unwrap().port()) .parse() .unwrap(); let m = RelayMap::from_nodes([RelayNode { url: url.clone(), stun_only: false, - stun_port: stun_addr.port(), + stun_port: server.stun_addr().unwrap().port(), }]) - .expect("hardcoded"); - - let (tx, rx) = oneshot::channel(); - tokio::spawn( - async move { - let _stun_cleanup = stun_drop_guard; // move into this closure - - // Wait until we're dropped or receive a message. - rx.await.ok(); - server.shutdown().await; - } - .instrument(info_span!("relay-stun-cleanup")), - ); - - Ok((m, url, CleanupDropGuard(tx))) + .unwrap(); + Ok((m, url, server)) } pub(crate) mod dns_and_pkarr_servers {