diff --git a/iroh-net/src/bin/iroh-relay.rs b/iroh-net/src/bin/iroh-relay.rs index 13a9a773b3..3a022783ee 100644 --- a/iroh-net/src/bin/iroh-relay.rs +++ b/iroh-net/src/bin/iroh-relay.rs @@ -1,9 +1,10 @@ -//! A simple relay server. +//! A simple relay server for iroh-net. //! -//! Based on /tailscale/cmd/derper +//! Based on /tailscale/cmd/derper. use std::{ borrow::Cow, + fmt, future::Future, net::{IpAddr, Ipv6Addr, SocketAddr}, path::{Path, PathBuf}, @@ -23,11 +24,14 @@ use iroh_net::key::SecretKey; use iroh_net::relay::http::{ ServerBuilder as RelayServerBuilder, TlsAcceptor, TlsConfig as RelayTlsConfig, }; -use iroh_net::relay::{self}; +use iroh_net::relay::{self, iroh_relay}; use iroh_net::stun; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use tokio::net::{TcpListener, UdpSocket}; +use tokio::{ + net::{TcpListener, UdpSocket}, + task::JoinHandle, +}; use tokio_rustls_acme::{caches::DirCache, AcmeConfig}; use tracing::{debug, debug_span, error, info, info_span, trace, warn, Instrument}; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -38,12 +42,24 @@ type BytesBody = http_body_util::Full; type HyperError = Box; type HyperResult = std::result::Result; +/// The default port for `http_bind_addr`. +const DEFAULT_HTTP_PORT: u16 = 80; + +/// The default port for `https_bind_addr`. +const DEFAULT_HTTPS_PORT: u16 = 443; + +/// The default port for the metrics server. +const DEFAULT_METRICS_PORT: u16 = 9090; + +/// The default `http_bind_port` when using `--dev`. +const DEV_MODE_HTTP_PORT: u16 = 3340; + /// Creates a new [`BytesBody`] with no content. fn body_empty() -> BytesBody { http_body_util::Full::new(hyper::body::Bytes::new()) } -/// A simple relay server. +/// A relay server for iroh-net. #[derive(Parser, Debug, Clone)] #[clap(version, about, long_about = None)] struct Cli { @@ -167,72 +183,218 @@ fn load_secret_key(filename: impl AsRef) -> Result { ); } +/// Configuration for the relay-server. 
+/// +/// This is (de)serialised to/from a TOML config file. #[serde_as] -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct Config { - /// [`SecretKey`] for this relay server. + /// The iroh [`SecretKey`] for this relay server. + /// + /// If not specified, a new key will be generated and the config file will be re-written + /// using it. #[serde_as(as = "DisplayFromStr")] #[serde(default = "SecretKey::generate")] secret_key: SecretKey, - /// Server listen address. + /// Whether to enable the Relay server. + /// + /// Defaults to `true`. + /// + /// Disabling will leave only the STUN server. The `http_bind_addr` and `tls` + /// configuration options will be ignored. + #[serde(default = "cfg_defaults::enable_relay")] + enable_relay: bool, + /// The socket address to bind the Relay HTTP server on. /// - /// Defaults to `[::]:443`. + /// Defaults to `[::]:80`. /// - /// If the port address is 443, the relay server will issue a warning if it is started - /// without a `tls` config. - addr: SocketAddr, - - /// The UDP port on which to serve STUN. The listener is bound to the same IP (if any) as - /// specified in the `addr` field. Defaults to [`DEFAULT_RELAY_STUN_PORT`]. - stun_port: u16, - /// Certificate hostname. Defaults to [`NA_RELAY_HOSTNAME`]. - hostname: String, + /// When running with `--dev` defaults to `[::]:3340`. If specified, overrides these + /// defaults. + /// + /// The Relay server always starts an HTTP server; this specifies the socket this will + /// be bound on. If there is no `tls` configuration set, all the HTTP relay services + /// will be bound on this socket. Otherwise most Relay HTTP services will run on the + /// `https_bind_addr` of the `tls` configuration section and only the captive portal + /// will be served from the HTTP socket. + http_bind_addr: Option, + /// TLS specific configuration. + /// + /// TLS is disabled if not present and the Relay server will serve all services over + /// plain HTTP.
+ /// + /// If disabled all services will run on plain HTTP. The `--dev` option disables this, + /// regardless of what is in the configuration file. + tls: Option, /// Whether to run a STUN server. It will bind to the same IP as the `addr` field. /// /// Defaults to `true`. + #[serde(default = "cfg_defaults::enable_stun")] enable_stun: bool, - /// Whether to run a relay server. The only reason to set this false is if you're decommissioning a - /// server but want to keep its bootstrap DNS functionality still running. + /// The socket address to bind the STUN server on. /// - /// Defaults to `true` - enable_relay: bool, - /// TLS specific configuration - tls: Option, - /// Rate limiting configuration + /// Defaults to using the `http_bind_addr` with the port set to + /// [`DEFAULT_RELAY_STUN_PORT`]. + stun_bind_addr: Option, + /// Rate limiting configuration. + /// + /// Disabled if not present. limits: Option, - #[cfg(feature = "metrics")] - /// Metrics serve address. If not set, metrics are not served. - metrics_addr: Option, + /// Whether to run the metrics server. + /// + /// Defaults to `true`, when the metrics feature is enabled. + #[serde(default = "cfg_defaults::enable_metrics")] + enable_metrics: bool, + /// Metrics serve address. + /// + /// Defaults to `http_bind_addr` with the port set to [`DEFAULT_METRICS_PORT`] + /// (`[::]:9090` when `http_bind_addr` is set to the default). 
+ metrics_bind_addr: Option, +} + +impl Config { + fn http_bind_addr(&self) -> SocketAddr { + self.http_bind_addr + .unwrap_or((Ipv6Addr::UNSPECIFIED, DEFAULT_HTTP_PORT).into()) + } + + fn stun_bind_addr(&self) -> SocketAddr { + self.stun_bind_addr + .unwrap_or_else(|| SocketAddr::new(self.http_bind_addr().ip(), DEFAULT_RELAY_STUN_PORT)) + } + + fn metrics_bind_addr(&self) -> SocketAddr { + self.metrics_bind_addr + .unwrap_or_else(|| SocketAddr::new(self.http_bind_addr().ip(), DEFAULT_METRICS_PORT)) + } +} + +impl Default for Config { + fn default() -> Self { + Self { + secret_key: SecretKey::generate(), + enable_relay: true, + http_bind_addr: None, + tls: None, + enable_stun: true, + stun_bind_addr: None, + limits: None, + enable_metrics: true, + metrics_bind_addr: None, + } + } +} + +/// Defaults for fields from [`Config`]. +/// +/// These are the defaults that serde will fill in. Other defaults depend on each other +/// and cannot immediately be substituted by serde. +mod cfg_defaults { + use super::*; + + pub(crate) fn enable_relay() -> bool { + true + } + + pub(crate) fn enable_stun() -> bool { + true + } + + pub(crate) fn enable_metrics() -> bool { + true + } + + pub(crate) mod tls_config { + use super::*; + + pub(crate) fn hostname() -> String { + NA_RELAY_HOSTNAME.to_string() + } + + pub(crate) fn prod_tls() -> bool { + true + } + } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct TlsConfig { - /// Mode for getting a cert. possible options: 'Manual', 'LetsEncrypt' - /// When using manual mode, a certificate will be read from `.crt` and a secret key from - /// `.key`, with the `` being the escaped hostname. + /// The socket address to bind the Relay HTTPS server on. + /// + /// Defaults to the `http_bind_addr` with the port set to `443`. + https_bind_addr: Option, + /// Certificate hostname. + /// + /// Defaults to [`NA_RELAY_HOSTNAME`].
+ #[serde(default = "cfg_defaults::tls_config::hostname")] + hostname: String, + /// Mode for getting a cert. + /// + /// Possible options: 'Manual', 'LetsEncrypt'. cert_mode: CertMode, + /// Directory to store LetsEncrypt certs or read manual certificates from. + /// + /// Defaults to the server's current working directory. + cert_dir: Option, + /// Path of where to read the certificate from for the `Manual` `cert_mode`. + /// + /// Defaults to `<cert_dir>/<hostname>.crt`, with `<hostname>` being the escaped + /// hostname. + /// + /// Only used when `cert_mode` is `Manual`. + manual_cert_path: Option, + /// Path of where to read the private key from for the `Manual` `cert_mode`. + /// + /// Defaults to `<cert_dir>/<hostname>.key`, with `<hostname>` being the escaped + /// hostname. + /// + /// Only used when `cert_mode` is `Manual`. + manual_key_path: Option, /// Whether to use the LetsEncrypt production or staging server. /// - /// While in development, LetsEncrypt prefers you to use the staging server. However, the staging server seems to - /// only use `ECDSA` keys. In their current set up, you can only get intermediate certificates - /// for `ECDSA` keys if you are on their "allowlist". The production server uses `RSA` keys, - /// which allow for issuing intermediate certificates in all normal circumstances. - /// So, to have valid certificates, we must use the LetsEncrypt production server. - /// Read more here: - /// Default is true. This field is ignored if we are not using `cert_mode: CertMode::LetsEncrypt`. + /// Default is `true`. + /// + /// Only used when `cert_mode` is `LetsEncrypt`. + /// + /// While in development, LetsEncrypt prefers you to use the staging server. However, + /// the staging server seems to only use `ECDSA` keys. In their current set up, you can + /// only get intermediate certificates for `ECDSA` keys if you are on their + /// "allowlist". The production server uses `RSA` keys, which allow for issuing + /// intermediate certificates in all normal circumstances.
So, to have valid + /// certificates, we must use the LetsEncrypt production server. Read more here: + /// . + #[serde(default = "cfg_defaults::tls_config::prod_tls")] prod_tls: bool, /// The contact email for the tls certificate. - contact: String, - /// Directory to store LetsEncrypt certs or read certificates from, if TLS is used. - cert_dir: Option, - /// The port on which to serve a response for the captive portal probe over HTTP. /// - /// The listener is bound to the same IP as specified in the `addr` field. Defaults to 80. - /// This field is only read in we are serving the relay server over HTTPS. In that case, we must listen for requests for the `/generate_204` over a non-TLS connection. - captive_portal_port: Option, + /// Used when `cert_mode` is `LetsEncrypt`. + contact: String, +} + +impl TlsConfig { + fn https_bind_addr(&self, cfg: &Config) -> SocketAddr { + self.https_bind_addr + .unwrap_or_else(|| SocketAddr::new(cfg.http_bind_addr().ip(), DEFAULT_HTTPS_PORT)) + } + + fn cert_dir(&self) -> PathBuf { + self.cert_dir + .as_ref() + .map(|d| d.clone()) + .unwrap_or_else(|| PathBuf::from(".")) + } + + fn cert_path(&self) -> PathBuf { + let name = escape_hostname(&self.hostname); + self.cert_dir().join(format!("{name}.crt")) + } + + fn key_path(&self) -> PathBuf { + let name = escape_hostname(&self.hostname); + self.cert_dir().join(format!("{name}.key")) + } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] struct Limits { /// Rate limit for accepting new connection. Unlimited if not set. 
accept_conn_limit: Option, @@ -240,23 +402,6 @@ struct Limits { accept_conn_burst: Option, } -impl Default for Config { - fn default() -> Self { - Self { - secret_key: SecretKey::generate(), - addr: (Ipv6Addr::UNSPECIFIED, 443).into(), - stun_port: DEFAULT_RELAY_STUN_PORT, - hostname: NA_RELAY_HOSTNAME.into(), - enable_stun: true, - enable_relay: true, - tls: None, - limits: None, - #[cfg(feature = "metrics")] - metrics_addr: None, - } - } -} - impl Config { async fn load(opts: &Cli) -> Result { let config_path = if let Some(config_path) = &opts.config_path { @@ -277,12 +422,12 @@ impl Config { async fn read_from_file(path: impl AsRef) -> Result { if !path.as_ref().is_file() { - bail!("config-path must be a valid toml file"); + bail!("config-path must be a valid file"); } let config_ser = tokio::fs::read_to_string(&path) .await .context("unable to read config")?; - let config: Self = toml::from_str(&config_ser).context("unable to decode config")?; + let config: Self = toml::from_str(&config_ser).context("config file must be valid toml")?; if !config_ser.contains("secret_key") { info!("generating new secret key and updating config file"); config.write_to_file(path).await?; @@ -310,36 +455,6 @@ impl Config { } } -#[cfg(feature = "metrics")] -pub fn init_metrics_collection( - metrics_addr: Option, -) -> Option> { - use iroh_metrics::core::Metric; - - let rt = tokio::runtime::Handle::current(); - - // doesn't start the server if the address is None - if let Some(metrics_addr) = metrics_addr { - iroh_metrics::core::Core::init(|reg, metrics| { - metrics.insert(iroh_net::metrics::RelayMetrics::new(reg)); - metrics.insert(StunMetrics::new(reg)); - }); - - return Some(rt.spawn(async move { - if let Err(e) = iroh_metrics::metrics::start_metrics_server(metrics_addr).await { - eprintln!("Failed to start metrics server: {e}"); - } - })); - } - tracing::info!("Metrics server not started, no address provided"); - None -} - -/// Only used when in `dev` mode & the given port is 
`443` -const DEV_PORT: u16 = 3340; -/// Only used when tls is enabled & a captive protal port is not given -const DEFAULT_CAPTIVE_PORTAL_PORT: u16 = 80; - #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::registry() @@ -348,470 +463,542 @@ async fn main() -> Result<()> { .init(); let cli = Cli::parse(); - let cfg = Config::load(&cli).await?; - - #[cfg(feature = "metrics")] - let metrics_fut = init_metrics_collection(cfg.metrics_addr); - - let r = run(cli.dev, cfg, None).await; - - #[cfg(feature = "metrics")] - if let Some(metrics_fut) = metrics_fut { - metrics_fut.abort(); - drop(metrics_fut); - } - r -} - -async fn run( - dev_mode: bool, - cfg: Config, - addr_sender: Option>, -) -> Result<()> { - let (addr, tls_config) = if dev_mode { - let port = if cfg.addr.port() != 443 { - cfg.addr.port() - } else { - DEV_PORT - }; - - let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port); - info!(%addr, "Running in dev mode."); - (addr, None) - } else { - (cfg.addr, cfg.tls) - }; - - if let Some(tls_config) = &tls_config { - if let Some(captive_portal_port) = tls_config.captive_portal_port { - if addr.port() == captive_portal_port { - bail!("The main listening address {addr:?} and the `captive_portal_port` have the same port number."); - } - } - } else if addr.port() == 443 { - // no tls config, but the port is 443 - warn!("The address port is 443, which is typically the expected tls port, but you have not supplied any tls configuration.\nIf you meant to run the relay server with tls enabled, adjust the config file to include tls configuration."); - } - - // set up relay configuration details - let secret_key = if cfg.enable_relay { - Some(cfg.secret_key) - } else { - None - }; - - // run stun - let stun_task = if cfg.enable_stun { - Some(tokio::task::spawn(async move { - serve_stun(addr.ip(), cfg.stun_port).await - })) - } else { - None - }; - - // set up tls configuration details - let (tls_config, headers, captive_portal_port) = if let 
Some(tls_config) = tls_config { - let contact = tls_config.contact; - let is_production = tls_config.prod_tls; - let (config, acceptor) = tls_config - .cert_mode - .gen_server_config( - cfg.hostname.clone(), - contact, - is_production, - tls_config.cert_dir.unwrap_or_else(|| PathBuf::from(".")), - ) - .await?; - let mut headers = HeaderMap::new(); - for (name, value) in TLS_HEADERS.iter() { - headers.insert(*name, value.parse()?); + let mut cfg = Config::load(&cli).await?; + if cli.dev { + cfg.tls = None; + if cfg.http_bind_addr.is_none() { + cfg.http_bind_addr = Some((Ipv6Addr::UNSPECIFIED, DEV_MODE_HTTP_PORT).into()); } - ( - Some(RelayTlsConfig { config, acceptor }), - headers, - tls_config - .captive_portal_port - .unwrap_or(DEFAULT_CAPTIVE_PORTAL_PORT), - ) - } else { - (None, HeaderMap::new(), 0) - }; - - let mut builder = RelayServerBuilder::new(addr) - .secret_key(secret_key.map(Into::into)) - .headers(headers) - .tls_config(tls_config.clone()) - .relay_override(Box::new(relay_disabled_handler)) - .request_handler(Method::GET, "/", Box::new(root_handler)) - .request_handler(Method::GET, "/index.html", Box::new(root_handler)) - .request_handler(Method::GET, "/derp/probe", Box::new(probe_handler)) - .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler)); - // if tls is enabled, we need to serve this endpoint from a non-tls connection - // which we check for below - if tls_config.is_none() { - builder = builder.request_handler( - Method::GET, - "/generate_204", - Box::new(serve_no_content_handler), - ); } - let relay_server = builder.spawn().await?; - - // captive portal detections must be served over HTTP - let captive_portal_task = if tls_config.is_some() { - let http_addr = SocketAddr::new(addr.ip(), captive_portal_port); - let task = serve_captive_portal_service(http_addr).await?; - Some(task) - } else { - None - }; + let relay_config = build_relay_config(cfg).await?; + debug!("{relay_config:#?}"); - if let Some(addr_sender) = addr_sender { 
- if let Err(e) = addr_sender.send(relay_server.addr()) { - bail!("Unable to send the local SocketAddr, the Sender was dropped - {e:?}"); - } - } + let mut relay = iroh_relay::Server::spawn(relay_config).await?; - tokio::signal::ctrl_c().await?; - // Shutdown all tasks - if let Some(task) = stun_task { - task.abort(); + tokio::select! { + biased; + _ = tokio::signal::ctrl_c() => (), + _ = relay.task_handle() => (), } - if let Some(task) = captive_portal_task { - task.abort() - } - relay_server.shutdown().await; - Ok(()) + relay.shutdown().await } -const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Tailscale-Challenge"; -const NO_CONTENT_RESPONSE_HEADER: &str = "X-Tailscale-Response"; - -const NOTFOUND: &[u8] = b"Not Found"; -const RELAY_DISABLED: &[u8] = b"relay server disabled"; -const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n"; -const INDEX: &[u8] = br#" -

RELAY

-

- This is an - Iroh Relay - server. -

-"#; - -const TLS_HEADERS: [(&str, &str); 2] = [ - ("Strict-Transport-Security", "max-age=63072000; includeSubDomains"), - ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'") -]; - -async fn serve_captive_portal_service(addr: SocketAddr) -> Result> { - let http_listener = TcpListener::bind(&addr) - .await - .context("failed to bind http")?; - let http_addr = http_listener.local_addr()?; - info!("[CaptivePortalService]: serving on {}", http_addr); - - let task = tokio::spawn( - async move { - loop { - match http_listener.accept().await { - Ok((stream, peer_addr)) => { - debug!( - "[CaptivePortalService] Connection opened from {}", - peer_addr - ); - let handler = CaptivePortalService; - - tokio::task::spawn(async move { - let stream = relay::MaybeTlsStreamServer::Plain(stream); - let stream = hyper_util::rt::TokioIo::new(stream); - if let Err(err) = hyper::server::conn::http1::Builder::new() - .serve_connection(stream, handler) - .with_upgrades() - .await - { - error!( - "[CaptivePortalService] Failed to serve connection: {:?}", - err - ); - } - }); - } - Err(err) => { - error!( - "[CaptivePortalService] failed to accept connection: {:#?}", - err - ); - } +/// Convert the TOML-loaded config to the [`iroh_relay::RelayConfig`] format. +async fn build_relay_config(cfg: Config) -> Result> { + let tls = match cfg.tls { + Some(ref tls) => { + let cert_config = match tls.cert_mode { + CertMode::Manual => { + let cert_path = tls.cert_path(); + let key_path = tls.key_path(); + // Could probably just do this blocking, we're only starting up. 
+ let (private_key, certs) = tokio::task::spawn_blocking(move || { + let key = load_secret_key(key_path)?; + let certs = load_certs(cert_path)?; + anyhow::Ok((key, certs)) + }) + .await??; + iroh_relay::CertConfig::Manual { private_key, certs } } - } - } - .instrument(info_span!("captive-portal.service")), - ); - Ok(task) -} - -#[derive(Clone)] -struct CaptivePortalService; - -impl hyper::service::Service> for CaptivePortalService { - type Response = Response; - type Error = HyperError; - type Future = Pin> + Send>>; - - fn call(&self, req: Request) -> Self::Future { - match (req.method(), req.uri().path()) { - // Captive Portal checker - (&Method::GET, "/generate_204") => { - Box::pin(async move { serve_no_content_handler(req, Response::builder()) }) - } - _ => { - // Return 404 not found response. - let r = Response::builder() - .status(StatusCode::NOT_FOUND) - .body(NOTFOUND.into()) - .map_err(|err| Box::new(err) as HyperError); - Box::pin(async move { r }) - } - } - } -} - -fn relay_disabled_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::NOT_FOUND) - .body(RELAY_DISABLED.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn root_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .header("Content-Type", "text/html; charset=utf-8") - .body(INDEX.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -/// HTTP latency queries -fn probe_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .header("Access-Control-Allow-Origin", "*") - .body(body_empty()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn robots_handler( - _r: Request, - response: ResponseBuilder, -) -> HyperResult> { - response - .status(StatusCode::OK) - .body(ROBOTS_TXT.into()) - .map_err(|err| Box::new(err) as HyperError) -} - -/// For captive portal detection. 
-fn serve_no_content_handler( - r: Request, - mut response: ResponseBuilder, -) -> HyperResult> { - if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) { - if !challenge.is_empty() - && challenge.len() < 64 - && challenge - .as_bytes() - .iter() - .all(|c| is_challenge_char(*c as char)) - { - response = response.header( - NO_CONTENT_RESPONSE_HEADER, - format!("response {}", challenge.to_str()?), - ); + CertMode::LetsEncrypt => { + let config = AcmeConfig::new(vec![tls.hostname.clone()]) + .contact([format!("mailto:{}", tls.contact)]) + .cache_option(Some(DirCache::new(tls.cert_dir()))) + .directory_lets_encrypt(tls.prod_tls); + iroh_relay::CertConfig::LetsEncrypt { config } + } + }; + Some(iroh_relay::TlsConfig { + https_bind_addr: tls.https_bind_addr(&cfg), + hostname: tls.hostname.clone(), + cert: cert_config, + }) } - } - - response - .status(StatusCode::NO_CONTENT) - .body(body_empty()) - .map_err(|err| Box::new(err) as HyperError) -} - -fn is_challenge_char(c: char) -> bool { - // Semi-randomly chosen as a limited set of valid characters - c.is_ascii_lowercase() - || c.is_ascii_uppercase() - || c.is_ascii_digit() - || c == '.' 
- || c == '-' - || c == '_' + None => None, + }; + let limits = iroh_relay::Limits { + accept_conn_limit: cfg + .limits + .as_ref() + .map(|l| l.accept_conn_limit) + .unwrap_or_default(), + accept_conn_burst: cfg + .limits + .as_ref() + .map(|l| l.accept_conn_burst) + .unwrap_or_default(), + }; + let relay_config = iroh_relay::RelayConfig { + secret_key: cfg.secret_key.clone(), + http_bind_addr: cfg.http_bind_addr(), + tls, + limits, + }; + let stun_config = iroh_relay::StunConfig { + bind_addr: cfg.stun_bind_addr(), + }; + Ok(iroh_relay::ServerConfig { + relay: Some(relay_config), + stun: Some(stun_config), + #[cfg(feature = "metrics")] + metrics_addr: if cfg.enable_metrics { + Some(cfg.metrics_bind_addr()) + } else { + None + }, + }) } -async fn serve_stun(host: IpAddr, port: u16) { - match UdpSocket::bind((host, port)).await { - Ok(sock) => { - let addr = sock.local_addr().expect("socket just bound"); - info!(%addr, "running STUN server"); - server_stun_listener(sock) - .instrument(debug_span!("stun_server", %addr)) - .await; - } - Err(err) => { - error!( - "failed to open STUN listener at host {host} and port {port}: {:#?}", - err - ); - } - } -} +// async fn run( +// dev_mode: bool, +// cfg: Config, +// addr_sender: Option>, +// ) -> Result<()> { +// let (addr, tls_config) = if dev_mode { +// let port = if cfg.addr.port() != 443 { +// cfg.addr.port() +// } else { +// DEV_PORT +// }; + +// let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port); +// info!(%addr, "Running in dev mode."); +// (addr, None) +// } else { +// (cfg.addr, cfg.tls) +// }; + +// if let Some(tls_config) = &tls_config { +// if let Some(captive_portal_port) = tls_config.captive_portal_port { +// if addr.port() == captive_portal_port { +// bail!("The main listening address {addr:?} and the `captive_portal_port` have the same port number."); +// } +// } +// } else if addr.port() == 443 { +// // no tls config, but the port is 443 +// warn!("The address port is 443, which is 
typically the expected tls port, but you have not supplied any tls configuration.\nIf you meant to run the relay server with tls enabled, adjust the config file to include tls configuration."); +// } + +// // set up relay configuration details +// let secret_key = if cfg.enable_relay { +// Some(cfg.secret_key) +// } else { +// None +// }; + +// // run stun +// let stun_task = if cfg.enable_stun { +// Some(tokio::task::spawn(async move { +// serve_stun(addr.ip(), cfg.stun_port).await +// })) +// } else { +// None +// }; + +// // set up tls configuration details +// let (tls_config, headers, captive_portal_port) = if let Some(tls_config) = tls_config { +// let contact = tls_config.contact; +// let is_production = tls_config.prod_tls; +// let (config, acceptor) = tls_config +// .cert_mode +// .gen_server_config( +// cfg.hostname.clone(), +// contact, +// is_production, +// tls_config.cert_dir.unwrap_or_else(|| PathBuf::from(".")), +// ) +// .await?; +// let mut headers = HeaderMap::new(); +// for (name, value) in TLS_HEADERS.iter() { +// headers.insert(*name, value.parse()?); +// } +// ( +// Some(RelayTlsConfig { config, acceptor }), +// headers, +// tls_config +// .captive_portal_port +// .unwrap_or(DEFAULT_CAPTIVE_PORTAL_PORT), +// ) +// } else { +// (None, HeaderMap::new(), 0) +// }; + +// let mut builder = RelayServerBuilder::new(addr) +// .secret_key(secret_key.map(Into::into)) +// .headers(headers) +// .tls_config(tls_config.clone()) +// .relay_override(Box::new(relay_disabled_handler)) +// .request_handler(Method::GET, "/", Box::new(root_handler)) +// .request_handler(Method::GET, "/index.html", Box::new(root_handler)) +// .request_handler(Method::GET, "/derp/probe", Box::new(probe_handler)) +// .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler)); +// // if tls is enabled, we need to serve this endpoint from a non-tls connection +// // which we check for below +// if tls_config.is_none() { +// builder = builder.request_handler( +// 
Method::GET, +// "/generate_204", +// Box::new(serve_no_content_handler), +// ); +// } +// let relay_server = builder.spawn().await?; + +// // captive portal detections must be served over HTTP +// let captive_portal_task = if tls_config.is_some() { +// let http_addr = SocketAddr::new(addr.ip(), captive_portal_port); +// let task = serve_captive_portal_service(http_addr).await?; +// Some(task) +// } else { +// None +// }; + +// if let Some(addr_sender) = addr_sender { +// if let Err(e) = addr_sender.send(relay_server.addr()) { +// bail!("Unable to send the local SocketAddr, the Sender was dropped - {e:?}"); +// } +// } + +// tokio::signal::ctrl_c().await?; +// // Shutdown all tasks +// if let Some(task) = stun_task { +// task.abort(); +// } +// if let Some(task) = captive_portal_task { +// task.abort() +// } +// relay_server.shutdown().await; + +// Ok(()) +// } -async fn server_stun_listener(sock: UdpSocket) { - let sock = Arc::new(sock); - let mut buffer = vec![0u8; 64 << 10]; - loop { - match sock.recv_from(&mut buffer).await { - Ok((n, src_addr)) => { - inc!(StunMetrics, requests); - let pkt = buffer[..n].to_vec(); - let sock = sock.clone(); - tokio::task::spawn(async move { - if !stun::is(&pkt) { - debug!(%src_addr, "STUN: ignoring non stun packet"); - inc!(StunMetrics, bad_requests); - return; - } - match tokio::task::spawn_blocking(move || stun::parse_binding_request(&pkt)) - .await - { - Ok(Ok(txid)) => { - debug!(%src_addr, %txid, "STUN: received binding request"); - let res = match tokio::task::spawn_blocking(move || { - stun::response(txid, src_addr) - }) - .await - { - Ok(res) => res, - Err(err) => { - error!("JoinError: {err:#}"); - return; - } - }; - match sock.send_to(&res, src_addr).await { - Ok(len) => { - if len != res.len() { - warn!(%src_addr, %txid, "STUN: failed to write response sent: {}, but expected {}", len, res.len()); - } - match src_addr { - SocketAddr::V4(_) => { - inc!(StunMetrics, ipv4_success); - } - SocketAddr::V6(_) => { - 
inc!(StunMetrics, ipv6_success); - } - } - trace!(%src_addr, %txid, "STUN: sent {} bytes", len); - } - Err(err) => { - inc!(StunMetrics, failures); - warn!(%src_addr, %txid, "STUN: failed to write response: {:?}", err); - } - } - } - Ok(Err(err)) => { - inc!(StunMetrics, bad_requests); - warn!(%src_addr, "STUN: invalid binding request: {:?}", err); - } - Err(err) => error!("JoinError parsing STUN binding: {err:#}"), - } - }); - } - Err(err) => { - inc!(StunMetrics, failures); - warn!("STUN: failed to recv: {:?}", err); - } - } - } -} +// const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Tailscale-Challenge"; +// const NO_CONTENT_RESPONSE_HEADER: &str = "X-Tailscale-Response"; + +// const NOTFOUND: &[u8] = b"Not Found"; +// const RELAY_DISABLED: &[u8] = b"relay server disabled"; +// const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n"; +// const INDEX: &[u8] = br#" +//

RELAY

+//

+// This is an +// Iroh Relay +// server. +//

+// "#; + +// const TLS_HEADERS: [(&str, &str); 2] = [ +// ("Strict-Transport-Security", "max-age=63072000; includeSubDomains"), +// ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'") +// ]; + +// async fn serve_captive_portal_service(addr: SocketAddr) -> Result> { +// let http_listener = TcpListener::bind(&addr) +// .await +// .context("failed to bind http")?; +// let http_addr = http_listener.local_addr()?; +// info!("[CaptivePortalService]: serving on {}", http_addr); + +// let task = tokio::spawn( +// async move { +// loop { +// match http_listener.accept().await { +// Ok((stream, peer_addr)) => { +// debug!( +// "[CaptivePortalService] Connection opened from {}", +// peer_addr +// ); +// let handler = CaptivePortalService; + +// tokio::task::spawn(async move { +// let stream = relay::MaybeTlsStreamServer::Plain(stream); +// let stream = hyper_util::rt::TokioIo::new(stream); +// if let Err(err) = hyper::server::conn::http1::Builder::new() +// .serve_connection(stream, handler) +// .with_upgrades() +// .await +// { +// error!( +// "[CaptivePortalService] Failed to serve connection: {:?}", +// err +// ); +// } +// }); +// } +// Err(err) => { +// error!( +// "[CaptivePortalService] failed to accept connection: {:#?}", +// err +// ); +// } +// } +// } +// } +// .instrument(info_span!("captive-portal.service")), +// ); +// Ok(task) +// } -// var validProdHostname = regexp.MustCompile(`^relay([^.]*)\.tailscale\.com\.?$`) +// #[derive(Clone)] +// struct CaptivePortalService; + +// impl hyper::service::Service> for CaptivePortalService { +// type Response = Response; +// type Error = HyperError; +// type Future = Pin> + Send>>; + +// fn call(&self, req: Request) -> Self::Future { +// match (req.method(), req.uri().path()) { +// // Captive Portal checker +// (&Method::GET, "/generate_204") => { +// Box::pin(async move { serve_no_content_handler(req, 
Response::builder()) }) +// } +// _ => { +// // Return 404 not found response. +// let r = Response::builder() +// .status(StatusCode::NOT_FOUND) +// .body(NOTFOUND.into()) +// .map_err(|err| Box::new(err) as HyperError); +// Box::pin(async move { r }) +// } +// } +// } +// } -// func prodAutocertHostPolicy(_ context.Context, host string) error { -// if validProdHostname.MatchString(host) { -// return nil -// } -// return errors.New("invalid hostname") +// fn relay_disabled_handler( +// _r: Request, +// response: ResponseBuilder, +// ) -> HyperResult> { +// response +// .status(StatusCode::NOT_FOUND) +// .body(RELAY_DISABLED.into()) +// .map_err(|err| Box::new(err) as HyperError) // } -// func rateLimitedListenAndServeTLS(srv *http.Server) error { -// addr := srv.Addr -// if addr == "" { -// addr = ":https" -// } -// ln, err := net.Listen("tcp", addr) -// if err != nil { -// return err -// } -// rln := newRateLimitedListener(ln, rate.Limit(*acceptConnLimit), *acceptConnBurst) -// expvar.Publish("tls_listener", rln.ExpVar()) -// defer rln.Close() -// return srv.ServeTLS(rln, "", "") +// fn root_handler( +// _r: Request, +// response: ResponseBuilder, +// ) -> HyperResult> { +// response +// .status(StatusCode::OK) +// .header("Content-Type", "text/html; charset=utf-8") +// .body(INDEX.into()) +// .map_err(|err| Box::new(err) as HyperError) // } -// type rateLimitedListener struct { -// // These are at the start of the struct to ensure 64-bit alignment -// // on 32-bit architecture regardless of what other fields may exist -// // in this package. 
-// numAccepts expvar.Int // does not include number of rejects -// numRejects expvar.Int +// /// HTTP latency queries +// fn probe_handler( +// _r: Request, +// response: ResponseBuilder, +// ) -> HyperResult> { +// response +// .status(StatusCode::OK) +// .header("Access-Control-Allow-Origin", "*") +// .body(body_empty()) +// .map_err(|err| Box::new(err) as HyperError) +// } -// net.Listener +// fn robots_handler( +// _r: Request, +// response: ResponseBuilder, +// ) -> HyperResult> { +// response +// .status(StatusCode::OK) +// .body(ROBOTS_TXT.into()) +// .map_err(|err| Box::new(err) as HyperError) +// } -// lim *rate.Limiter +// /// For captive portal detection. +// fn serve_no_content_handler( +// r: Request, +// mut response: ResponseBuilder, +// ) -> HyperResult> { +// if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) { +// if !challenge.is_empty() +// && challenge.len() < 64 +// && challenge +// .as_bytes() +// .iter() +// .all(|c| is_challenge_char(*c as char)) +// { +// response = response.header( +// NO_CONTENT_RESPONSE_HEADER, +// format!("response {}", challenge.to_str()?), +// ); +// } +// } + +// response +// .status(StatusCode::NO_CONTENT) +// .body(body_empty()) +// .map_err(|err| Box::new(err) as HyperError) // } -// func newRateLimitedListener(ln net.Listener, limit rate.Limit, burst int) *rateLimitedListener { -// return &rateLimitedListener{Listener: ln, lim: rate.NewLimiter(limit, burst)} +// fn is_challenge_char(c: char) -> bool { +// // Semi-randomly chosen as a limited set of valid characters +// c.is_ascii_lowercase() +// || c.is_ascii_uppercase() +// || c.is_ascii_digit() +// || c == '.' 
+// || c == '-' +// || c == '_' // } -// func (l *rateLimitedListener) ExpVar() expvar.Var { -// m := new(metrics.Set) -// m.Set("counter_accepted_connections", &l.numAccepts) -// m.Set("counter_rejected_connections", &l.numRejects) -// return m +// async fn serve_stun(host: IpAddr, port: u16) { +// match UdpSocket::bind((host, port)).await { +// Ok(sock) => { +// let addr = sock.local_addr().expect("socket just bound"); +// info!(%addr, "running STUN server"); +// server_stun_listener(sock) +// .instrument(debug_span!("stun_server", %addr)) +// .await; +// } +// Err(err) => { +// error!( +// "failed to open STUN listener at host {host} and port {port}: {:#?}", +// err +// ); +// } +// } // } -// var errLimitedConn = errors.New("cannot accept connection; rate limited") - -// func (l *rateLimitedListener) Accept() (net.Conn, error) { -// // Even under a rate limited situation, we accept the connection immediately -// // and close it, rather than being slow at accepting new connections. -// // This provides two benefits: 1) it signals to the client that something -// // is going on on the server, and 2) it prevents new connections from -// // piling up and occupying resources in the OS kernel. -// // The client will retry as needing (with backoffs in place). 
-// cn, err := l.Listener.Accept() -// if err != nil { -// return nil, err -// } -// if !l.lim.Allow() { -// l.numRejects.Add(1) -// cn.Close() -// return nil, errLimitedConn -// } -// l.numAccepts.Add(1) -// return cn, nil +// async fn server_stun_listener(sock: UdpSocket) { +// let sock = Arc::new(sock); +// let mut buffer = vec![0u8; 64 << 10]; +// loop { +// match sock.recv_from(&mut buffer).await { +// Ok((n, src_addr)) => { +// inc!(StunMetrics, requests); +// let pkt = buffer[..n].to_vec(); +// let sock = sock.clone(); +// tokio::task::spawn(async move { +// if !stun::is(&pkt) { +// debug!(%src_addr, "STUN: ignoring non stun packet"); +// inc!(StunMetrics, bad_requests); +// return; +// } +// match tokio::task::spawn_blocking(move || stun::parse_binding_request(&pkt)) +// .await +// { +// Ok(Ok(txid)) => { +// debug!(%src_addr, %txid, "STUN: received binding request"); +// let res = match tokio::task::spawn_blocking(move || { +// stun::response(txid, src_addr) +// }) +// .await +// { +// Ok(res) => res, +// Err(err) => { +// error!("JoinError: {err:#}"); +// return; +// } +// }; +// match sock.send_to(&res, src_addr).await { +// Ok(len) => { +// if len != res.len() { +// warn!(%src_addr, %txid, "STUN: failed to write response sent: {}, but expected {}", len, res.len()); +// } +// match src_addr { +// SocketAddr::V4(_) => { +// inc!(StunMetrics, ipv4_success); +// } +// SocketAddr::V6(_) => { +// inc!(StunMetrics, ipv6_success); +// } +// } +// trace!(%src_addr, %txid, "STUN: sent {} bytes", len); +// } +// Err(err) => { +// inc!(StunMetrics, failures); +// warn!(%src_addr, %txid, "STUN: failed to write response: {:?}", err); +// } +// } +// } +// Ok(Err(err)) => { +// inc!(StunMetrics, bad_requests); +// warn!(%src_addr, "STUN: invalid binding request: {:?}", err); +// } +// Err(err) => error!("JoinError parsing STUN binding: {err:#}"), +// } +// }); +// } +// Err(err) => { +// inc!(StunMetrics, failures); +// warn!("STUN: failed to recv: {:?}", err); +// } 
+// } +// } // } -// + +// // var validProdHostname = regexp.MustCompile(`^relay([^.]*)\.tailscale\.com\.?$`) + +// // func prodAutocertHostPolicy(_ context.Context, host string) error { +// // if validProdHostname.MatchString(host) { +// // return nil +// // } +// // return errors.New("invalid hostname") +// // } + +// // func rateLimitedListenAndServeTLS(srv *http.Server) error { +// // addr := srv.Addr +// // if addr == "" { +// // addr = ":https" +// // } +// // ln, err := net.Listen("tcp", addr) +// // if err != nil { +// // return err +// // } +// // rln := newRateLimitedListener(ln, rate.Limit(*acceptConnLimit), *acceptConnBurst) +// // expvar.Publish("tls_listener", rln.ExpVar()) +// // defer rln.Close() +// // return srv.ServeTLS(rln, "", "") +// // } + +// // type rateLimitedListener struct { +// // // These are at the start of the struct to ensure 64-bit alignment +// // // on 32-bit architecture regardless of what other fields may exist +// // // in this package. +// // numAccepts expvar.Int // does not include number of rejects +// // numRejects expvar.Int + +// // net.Listener + +// // lim *rate.Limiter +// // } + +// // func newRateLimitedListener(ln net.Listener, limit rate.Limit, burst int) *rateLimitedListener { +// // return &rateLimitedListener{Listener: ln, lim: rate.NewLimiter(limit, burst)} +// // } + +// // func (l *rateLimitedListener) ExpVar() expvar.Var { +// // m := new(metrics.Set) +// // m.Set("counter_accepted_connections", &l.numAccepts) +// // m.Set("counter_rejected_connections", &l.numRejects) +// // return m +// // } + +// // var errLimitedConn = errors.New("cannot accept connection; rate limited") + +// // func (l *rateLimitedListener) Accept() (net.Conn, error) { +// // // Even under a rate limited situation, we accept the connection immediately +// // // and close it, rather than being slow at accepting new connections. 
+// // // This provides two benefits: 1) it signals to the client that something +// // // is going on on the server, and 2) it prevents new connections from +// // // piling up and occupying resources in the OS kernel. +// // // The client will retry as needing (with backoffs in place). +// // cn, err := l.Listener.Accept() +// // if err != nil { +// // return nil, err +// // } +// // if !l.lim.Allow() { +// // l.numRejects.Add(1) +// // cn.Close() +// // return nil, errLimitedConn +// // } +// // l.numAccepts.Add(1) +// // return cn, nil +// // } +// // mod metrics { use iroh_metrics::{ core::{Counter, Metric}, @@ -860,205 +1047,205 @@ mod metrics { } } -#[cfg(test)] -mod tests { - use super::*; - - use std::net::Ipv4Addr; - use std::time::Duration; - - use bytes::Bytes; - use http_body_util::BodyExt; - use iroh_base::node_addr::RelayUrl; - use iroh_net::relay::http::ClientBuilder; - use iroh_net::relay::ReceivedMessage; - use tokio::task::JoinHandle; - - #[tokio::test] - async fn test_serve_no_content_handler() { - let challenge = "123az__."; - let req = Request::builder() - .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .body(body_empty()) - .unwrap(); - - let res = serve_no_content_handler(req, Response::builder()).unwrap(); - assert_eq!(res.status(), StatusCode::NO_CONTENT); - - let header = res - .headers() - .get(NO_CONTENT_RESPONSE_HEADER) - .unwrap() - .to_str() - .unwrap(); - assert_eq!(header, format!("response {challenge}")); - assert!(res - .into_body() - .collect() - .await - .unwrap() - .to_bytes() - .is_empty()); - } - - #[test] - fn test_escape_hostname() { - assert_eq!( - escape_hostname("hello.host.name_foo-bar%baz"), - "hello.host.namefoo-barbaz" - ); - } - - struct DropServer { - server_task: JoinHandle<()>, - } - - impl Drop for DropServer { - fn drop(&mut self) { - self.server_task.abort(); - } - } - - #[tokio::test] - async fn test_relay_server_basic() -> Result<()> { - tracing_subscriber::registry() - 
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); - // Binding to LOCALHOST to satisfy issues when binding to UNSPECIFIED in Windows for tests - // Binding to Ipv4 because, when binding to `IPv6::UNSPECIFIED`, it will also listen for - // IPv4 connections, but will not automatically do the same for `LOCALHOST`. In order to - // test STUN, which only listens on Ipv4, we must bind the whole relay server to Ipv4::LOCALHOST. - let cfg = Config { - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), - ..Default::default() - }; - let (addr_send, addr_recv) = tokio::sync::oneshot::channel(); - let relay_server_task = tokio::spawn( - async move { - // dev mode will bind to IPv6::UNSPECIFIED, so setting it `false` - let res = run(false, cfg, Some(addr_send)).await; - if let Err(e) = res { - eprintln!("error starting relay server {e}"); - } - } - .instrument(debug_span!("relay server")), - ); - let _drop_server = DropServer { - server_task: relay_server_task, - }; - - let relay_server_addr = addr_recv.await?; - let relay_server_str_url = format!("http://{}", relay_server_addr); - let relay_server_url: RelayUrl = relay_server_str_url.parse().unwrap(); - - // set up clients - let a_secret_key = SecretKey::generate(); - let a_key = a_secret_key.public(); - let resolver = iroh_net::dns::default_resolver().clone(); - let (client_a, mut client_a_receiver) = - ClientBuilder::new(relay_server_url.clone()).build(a_secret_key, resolver); - let connect_client = client_a.clone(); - - // give the relay server some time to set up - if let Err(e) = tokio::time::timeout(Duration::from_secs(10), async move { - loop { - match connect_client.connect().await { - Ok(_) => break, - Err(e) => { - tracing::warn!("client a unable to connect to relay server: {e:?}. 
Attempting to dial again in 10ms"); - tokio::time::sleep(Duration::from_millis(100)).await - } - } - } - }) - .await - { - bail!("error connecting client a to relay server: {e:?}"); - } - - let b_secret_key = SecretKey::generate(); - let b_key = b_secret_key.public(); - let resolver = iroh_net::dns::default_resolver().clone(); - let (client_b, mut client_b_receiver) = - ClientBuilder::new(relay_server_url.clone()).build(b_secret_key, resolver); - client_b.connect().await?; - - let msg = Bytes::from("hello, b"); - client_a.send(b_key, msg.clone()).await?; - - let (res, _) = client_b_receiver.recv().await.unwrap()?; - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(a_key, source); - assert_eq!(msg, data); - } else { - bail!("client_b received unexpected message {res:?}"); - } - - let msg = Bytes::from("howdy, a"); - client_b.send(a_key, msg.clone()).await?; - - let (res, _) = client_a_receiver.recv().await.unwrap()?; - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(b_key, source); - assert_eq!(msg, data); - } else { - bail!("client_a received unexpected message {res:?}"); - } - - // run stun check - let stun_addr: SocketAddr = - SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 3478); - - let txid = stun::TransactionId::default(); - let req = stun::request(txid); - let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap()); - - let server_socket = socket.clone(); - let server_task = tokio::task::spawn(async move { - let mut buf = vec![0u8; 64000]; - let len = server_socket.recv(&mut buf).await.unwrap(); - dbg!(len); - buf.truncate(len); - buf - }); - - tracing::info!("sending stun request to {stun_addr}"); - if let Err(e) = socket.send_to(&req, stun_addr).await { - bail!("socket.send_to error: {e:?}"); - } - - let response = server_task.await.unwrap(); - let (txid_back, response_addr) = stun::parse_response(&response).unwrap(); - assert_eq!(txid, txid_back); - tracing::info!("got 
{response_addr}"); - - // get 200 home page response - tracing::info!("send request for homepage"); - let res = reqwest::get(relay_server_str_url).await?; - assert!(res.status().is_success()); - tracing::info!("got OK"); - - // test captive portal - tracing::info!("test captive portal response"); - - let url = relay_server_url.join("/generate_204")?; - let challenge = "123az__."; - let client = reqwest::Client::new(); - let res = client - .get(url) - .header(NO_CONTENT_CHALLENGE_HEADER, challenge) - .send() - .await?; - assert_eq!(StatusCode::NO_CONTENT.as_u16(), res.status().as_u16()); - let header = res.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); - assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); - let body = res.bytes().await?; - assert!(body.is_empty()); - - tracing::info!("got successful captive portal response"); - - Ok(()) - } -} +// #[cfg(test)] +// mod tests { +// use super::*; + +// use std::net::Ipv4Addr; +// use std::time::Duration; + +// use bytes::Bytes; +// use http_body_util::BodyExt; +// use iroh_base::node_addr::RelayUrl; +// use iroh_net::relay::http::ClientBuilder; +// use iroh_net::relay::ReceivedMessage; +// use tokio::task::JoinHandle; + +// #[tokio::test] +// async fn test_serve_no_content_handler() { +// let challenge = "123az__."; +// let req = Request::builder() +// .header(NO_CONTENT_CHALLENGE_HEADER, challenge) +// .body(body_empty()) +// .unwrap(); + +// let res = serve_no_content_handler(req, Response::builder()).unwrap(); +// assert_eq!(res.status(), StatusCode::NO_CONTENT); + +// let header = res +// .headers() +// .get(NO_CONTENT_RESPONSE_HEADER) +// .unwrap() +// .to_str() +// .unwrap(); +// assert_eq!(header, format!("response {challenge}")); +// assert!(res +// .into_body() +// .collect() +// .await +// .unwrap() +// .to_bytes() +// .is_empty()); +// } + +// #[test] +// fn test_escape_hostname() { +// assert_eq!( +// escape_hostname("hello.host.name_foo-bar%baz"), +// "hello.host.namefoo-barbaz" +// 
); +// } + +// struct DropServer { +// server_task: JoinHandle<()>, +// } + +// impl Drop for DropServer { +// fn drop(&mut self) { +// self.server_task.abort(); +// } +// } + +// #[tokio::test] +// async fn test_relay_server_basic() -> Result<()> { +// tracing_subscriber::registry() +// .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) +// .with(EnvFilter::from_default_env()) +// .try_init() +// .ok(); +// // Binding to LOCALHOST to satisfy issues when binding to UNSPECIFIED in Windows for tests +// // Binding to Ipv4 because, when binding to `IPv6::UNSPECIFIED`, it will also listen for +// // IPv4 connections, but will not automatically do the same for `LOCALHOST`. In order to +// // test STUN, which only listens on Ipv4, we must bind the whole relay server to Ipv4::LOCALHOST. +// let cfg = Config { +// addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), +// ..Default::default() +// }; +// let (addr_send, addr_recv) = tokio::sync::oneshot::channel(); +// let relay_server_task = tokio::spawn( +// async move { +// // dev mode will bind to IPv6::UNSPECIFIED, so setting it `false` +// let res = run(false, cfg, Some(addr_send)).await; +// if let Err(e) = res { +// eprintln!("error starting relay server {e}"); +// } +// } +// .instrument(debug_span!("relay server")), +// ); +// let _drop_server = DropServer { +// server_task: relay_server_task, +// }; + +// let relay_server_addr = addr_recv.await?; +// let relay_server_str_url = format!("http://{}", relay_server_addr); +// let relay_server_url: RelayUrl = relay_server_str_url.parse().unwrap(); + +// // set up clients +// let a_secret_key = SecretKey::generate(); +// let a_key = a_secret_key.public(); +// let resolver = iroh_net::dns::default_resolver().clone(); +// let (client_a, mut client_a_receiver) = +// ClientBuilder::new(relay_server_url.clone()).build(a_secret_key, resolver); +// let connect_client = client_a.clone(); + +// // give the relay server some time to set up +// if let Err(e) 
= tokio::time::timeout(Duration::from_secs(10), async move { +// loop { +// match connect_client.connect().await { +// Ok(_) => break, +// Err(e) => { +// tracing::warn!("client a unable to connect to relay server: {e:?}. Attempting to dial again in 10ms"); +// tokio::time::sleep(Duration::from_millis(100)).await +// } +// } +// } +// }) +// .await +// { +// bail!("error connecting client a to relay server: {e:?}"); +// } + +// let b_secret_key = SecretKey::generate(); +// let b_key = b_secret_key.public(); +// let resolver = iroh_net::dns::default_resolver().clone(); +// let (client_b, mut client_b_receiver) = +// ClientBuilder::new(relay_server_url.clone()).build(b_secret_key, resolver); +// client_b.connect().await?; + +// let msg = Bytes::from("hello, b"); +// client_a.send(b_key, msg.clone()).await?; + +// let (res, _) = client_b_receiver.recv().await.unwrap()?; +// if let ReceivedMessage::ReceivedPacket { source, data } = res { +// assert_eq!(a_key, source); +// assert_eq!(msg, data); +// } else { +// bail!("client_b received unexpected message {res:?}"); +// } + +// let msg = Bytes::from("howdy, a"); +// client_b.send(a_key, msg.clone()).await?; + +// let (res, _) = client_a_receiver.recv().await.unwrap()?; +// if let ReceivedMessage::ReceivedPacket { source, data } = res { +// assert_eq!(b_key, source); +// assert_eq!(msg, data); +// } else { +// bail!("client_a received unexpected message {res:?}"); +// } + +// // run stun check +// let stun_addr: SocketAddr = +// SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 3478); + +// let txid = stun::TransactionId::default(); +// let req = stun::request(txid); +// let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap()); + +// let server_socket = socket.clone(); +// let server_task = tokio::task::spawn(async move { +// let mut buf = vec![0u8; 64000]; +// let len = server_socket.recv(&mut buf).await.unwrap(); +// dbg!(len); +// buf.truncate(len); +// buf +// }); + +// tracing::info!("sending stun 
request to {stun_addr}"); +// if let Err(e) = socket.send_to(&req, stun_addr).await { +// bail!("socket.send_to error: {e:?}"); +// } + +// let response = server_task.await.unwrap(); +// let (txid_back, response_addr) = stun::parse_response(&response).unwrap(); +// assert_eq!(txid, txid_back); +// tracing::info!("got {response_addr}"); + +// // get 200 home page response +// tracing::info!("send request for homepage"); +// let res = reqwest::get(relay_server_str_url).await?; +// assert!(res.status().is_success()); +// tracing::info!("got OK"); + +// // test captive portal +// tracing::info!("test captive portal response"); + +// let url = relay_server_url.join("/generate_204")?; +// let challenge = "123az__."; +// let client = reqwest::Client::new(); +// let res = client +// .get(url) +// .header(NO_CONTENT_CHALLENGE_HEADER, challenge) +// .send() +// .await?; +// assert_eq!(StatusCode::NO_CONTENT.as_u16(), res.status().as_u16()); +// let header = res.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap(); +// assert_eq!(header.to_str().unwrap(), format!("response {challenge}")); +// let body = res.bytes().await?; +// assert!(body.is_empty()); + +// tracing::info!("got successful captive portal response"); + +// Ok(()) +// } +// } diff --git a/iroh-net/src/relay/iroh_relay.rs b/iroh-net/src/relay/iroh_relay.rs index e08e870236..128a37ec10 100644 --- a/iroh-net/src/relay/iroh_relay.rs +++ b/iroh-net/src/relay/iroh_relay.rs @@ -1,7 +1,8 @@ //! A full-fledged iroh-relay server. //! //! This module provides an API to create a full fledged iroh-relay server. It is primarily -//! used by the `iroh-relay` binary in this crate. +//! used by the `iroh-relay` binary in this crate. It can be used to run a relay server in +//! other locations however. 
use std::fmt; use std::future::Future; @@ -21,7 +22,7 @@ use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument use crate::key::SecretKey; use crate::relay; -use crate::relay::http::{ServerBuilder as DerpServerBuilder, TlsAcceptor}; +use crate::relay::http::{ServerBuilder as RelayServerBuilder, TlsAcceptor}; use crate::stun; use crate::util::AbortingJoinHandle; @@ -34,11 +35,9 @@ const NOTFOUND: &[u8] = b"Not Found"; const RELAY_DISABLED: &[u8] = b"derp server disabled"; const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n"; const INDEX: &[u8] = br#" -

DERP

+

Iroh Relay

- This is an - Iroh DERP - server. + This is an Iroh Relay server.

"#; const TLS_HEADERS: [(&str, &str); 2] = [ @@ -57,13 +56,13 @@ fn body_empty() -> BytesBody { /// Configuration for the full Relay & STUN server. #[derive(Debug)] -#[non_exhaustive] pub struct ServerConfig { /// Configuration for the DERP server, disabled if `None`. pub relay: Option>, /// Configuration for the STUN server, disabled if `None`. pub stun: Option, /// Socket to serve metrics on. + #[cfg(feature = "metrics")] pub metrics_addr: Option, } @@ -87,7 +86,7 @@ impl ServerConfig { } if let Some(derp) = &self.relay { if let Some(tls) = &derp.tls { - if derp.bind_addr == tls.http_bind_addr { + if derp.http_bind_addr == tls.https_bind_addr { bail!("derp port conflicts with captive portal port"); } } @@ -96,29 +95,33 @@ impl ServerConfig { } } -/// Configuration for the Relay server. +/// Configuration for the Relay HTTP and HTTPS server. /// -/// This includes the HTTP services hosted by the Relay server, the Relay HTTP endpoint is -/// only one of the services served. +/// This includes the HTTP services hosted by the Relay server, the Relay `/derp` HTTP +/// endpoint is only one of the services served. #[derive(Debug)] -#[non_exhaustive] pub struct RelayConfig { - /// The socket address on which the relay server should bind. - /// - /// Normally you'd choose port `80` if configured without TLS and port `443` when - /// configured with TLS since the DERP server is an HTTP server. - pub bind_addr: SocketAddr, /// The iroh secret key of the Relay server. pub secret_key: SecretKey, - /// TLS configuration, no TLS is used if `None`. + /// The socket address on which the Relay HTTP server should bind. + /// + /// Normally you'd choose port `80`. The bind address for the HTTPS server is + /// configured in [`RelayConfig::tls`]. + /// + /// If [`RelayConfig::tls`] is `None` then this serves all the HTTP services without + /// TLS. + pub http_bind_addr: SocketAddr, + /// TLS configuration for the HTTPS servier. 
+ /// + /// If *None* all the HTTP services that would be served here are served from + /// [`RelayConfig::bind_addr`]. pub tls: Option>, - /// Rate limits, if enabled. - pub limits: Option, + /// Rate limits. + pub limits: Limits, } /// Configuration for the STUN server. #[derive(Debug)] -#[non_exhaustive] pub struct StunConfig { /// The socket address on which the STUN server should bind. /// @@ -126,29 +129,28 @@ pub struct StunConfig { pub bind_addr: SocketAddr, } -/// TLS configuration for DERP server. +/// TLS configuration for Relay server. /// -/// Normally the DERP server accepts connections on HTTPS. +/// Normally the Relay server accepts connections on both HTTPS and HTTP. #[derive(Debug)] -#[non_exhaustive] pub struct TlsConfig { - /// Mode for getting a cert. - pub cert: CertConfig, - /// Hostname to use for the certificate, must match the certificate. - pub hostname: String, - /// The socket address on which to serve plain text HTTP requests. + /// The socket address on which to serve the HTTPS server. /// /// Since the captive portal probe has to run over plain text HTTP and TLS is used for /// the main relay server this has to be on a different port. When TLS is not enabled /// this is served on the [`RelayConfig::bind_addr`] socket address. /// /// Normally you'd choose port `80`. - pub http_bind_addr: SocketAddr, + pub https_bind_addr: SocketAddr, + /// Mode for getting a cert. + pub cert: CertConfig, + /// Hostname to use for the certificate, must match the certificate. + // TODO: do we use this field? + pub hostname: String, } /// Rate limits. -#[derive(Debug)] -#[non_exhaustive] +#[derive(Debug, Default)] pub struct Limits { /// Rate limit for accepting new connection. Unlimited if not set. pub accept_conn_limit: Option, @@ -158,7 +160,6 @@ pub struct Limits { /// TLS certificate configuration. #[derive(derive_more::Debug)] -#[non_exhaustive] pub enum CertConfig { /// Use Let's Encrypt. 
LetsEncrypt { @@ -194,7 +195,7 @@ pub struct Server { /// Handle to the relay server. relay_handle: Option, /// The main task running the server. - _supervisor: AbortingJoinHandle>, + supervisor: AbortingJoinHandle>, } impl Server { @@ -207,6 +208,20 @@ impl Server { config.validate()?; let mut tasks = JoinSet::new(); + #[cfg(feature = "metrics")] + if let Some(addr) = config.metrics_addr { + use iroh_metrics::core::Metric; + + iroh_metrics::core::Core::init(|reg, metrics| { + metrics.insert(crate::metrics::RelayMetrics::new(reg)); + metrics.insert(StunMetrics::new(reg)); + }); + tasks.spawn( + iroh_metrics::metrics::start_metrics_server(addr) + .instrument(info_span!("metrics-server")), + ); + } + // Start the STUN server. let stun_addr = match config.stun { Some(stun) => match UdpSocket::bind(stun.bind_addr).await { @@ -229,7 +244,11 @@ impl Server { for (name, value) in TLS_HEADERS.iter() { headers.insert(*name, value.parse()?); } - let mut builder = DerpServerBuilder::new(relay_config.bind_addr) + let relay_bind_addr = match relay_config.tls { + Some(ref tls) => tls.https_bind_addr, + None => relay_config.http_bind_addr, + }; + let mut builder = RelayServerBuilder::new(relay_bind_addr) .secret_key(Some(relay_config.secret_key)) .headers(headers) .relay_override(Box::new(relay_disabled_handler)) @@ -282,7 +301,7 @@ impl Server { // Some services always need to be served over HTTP without TLS. Run // these standalone. - let http_listener = TcpListener::bind(&tls_config.http_bind_addr) + let http_listener = TcpListener::bind(&relay_config.http_bind_addr) .await .context("failed to bind http")?; let http_addr = http_listener.local_addr()?; @@ -293,8 +312,8 @@ impl Server { Some(http_addr) } None => { - // If running DERP without TLS add the plain HTTP server directly to the - // DERP server. + // If running DERP without TLS add the plain HTTP server directly to + // the DERP server. 
builder = builder.request_handler( Method::GET, "/generate_204", @@ -319,16 +338,28 @@ impl Server { stun_addr, https_addr: http_addr.and_then(|_| relay_addr), relay_handle, - _supervisor: AbortingJoinHandle::from(task), + supervisor: AbortingJoinHandle::from(task), }) } - /// Graceful shutdown. - pub async fn shutdown(self) { - // Only the Relay server needs shutting down, all other services only abort on drop. + /// Requests graceful shutdown. + /// + /// Returns once all server tasks have stopped. + pub async fn shutdown(self) -> Result<()> { + // Only the Relay server needs shutting down, the supervisor will abort the tasks in + // the JoinSet when the server terminates. if let Some(handle) = self.relay_handle { handle.shutdown(); } + self.supervisor.await? + } + + /// Returns the handle for the task. + /// + /// This allows waiting for the server's supervisor task to finish. Can be useful in + /// case there is an error in the server before it is shut down. + pub fn task_handle(&mut self) -> &mut AbortingJoinHandle> { + &mut self.supervisor } /// The socket address the HTTPS server is listening on. @@ -363,6 +394,7 @@ impl Drop for RelayHttpServerGuard { /// Supervisor for the relay server tasks. /// /// As soon as one of the tasks exits, all other tasks are stopped and the server stops. +/// The supervisor finishes once all tasks are finished. #[instrument(skip_all)] async fn relay_supervisor( mut tasks: JoinSet>, @@ -399,6 +431,8 @@ async fn relay_supervisor( // already shut down. The JoinSet is aborted on drop. 
relay_http_server.map(|server| server.0.shutdown()); + tasks.shutdown().await; + ret } diff --git a/iroh-net/src/test_utils.rs b/iroh-net/src/test_utils.rs index f8abcd7e0e..45b73d963c 100644 --- a/iroh-net/src/test_utils.rs +++ b/iroh-net/src/test_utils.rs @@ -38,7 +38,7 @@ pub async fn run_relay_server() -> Result<(RelayMap, RelayUrl, crate::relay::iro let config = ServerConfig { relay: Some(RelayConfig { - bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), secret_key, tls: Some(TlsConfig { cert: CertConfig::<(), ()>::Manual { @@ -46,9 +46,9 @@ pub async fn run_relay_server() -> Result<(RelayMap, RelayUrl, crate::relay::iro certs: vec![rustls_cert], }, hostname: "localhost".to_string(), - http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + https_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), }), - limits: None, + limits: Default::default(), }), stun: Some(StunConfig { bind_addr: (Ipv4Addr::LOCALHOST, 0).into(),