diff --git a/src/congestion.rs b/src/congestion.rs index db2ca9c..d692802 100644 --- a/src/congestion.rs +++ b/src/congestion.rs @@ -5,6 +5,7 @@ use std::time::{Duration, Instant}; pub(crate) const DEFAULT_TARGET_MICROS: u32 = 100_000; pub(crate) const DEFAULT_INITIAL_TIMEOUT: Duration = Duration::from_secs(1); pub(crate) const DEFAULT_MIN_TIMEOUT: Duration = Duration::from_millis(500); +pub(crate) const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60); pub(crate) const DEFAULT_MAX_PACKET_SIZE_BYTES: u32 = 1024; const DEFAULT_GAIN: f32 = 1.0; const DEFAULT_DELAY_WINDOW: Duration = Duration::from_secs(120); @@ -41,6 +42,7 @@ pub struct Config { pub target_delay_micros: u32, pub initial_timeout: Duration, pub min_timeout: Duration, + pub max_timeout: Duration, pub max_packet_size_bytes: u32, pub max_window_size_inc_bytes: u32, pub gain: f32, @@ -53,6 +55,7 @@ impl Default for Config { target_delay_micros: DEFAULT_TARGET_MICROS, initial_timeout: DEFAULT_INITIAL_TIMEOUT, min_timeout: DEFAULT_MIN_TIMEOUT, + max_timeout: DEFAULT_MAX_TIMEOUT, max_packet_size_bytes: DEFAULT_MAX_PACKET_SIZE_BYTES, max_window_size_inc_bytes: DEFAULT_MAX_PACKET_SIZE_BYTES, gain: DEFAULT_GAIN, @@ -66,6 +69,7 @@ pub struct Controller { target_delay_micros: u32, timeout: Duration, min_timeout: Duration, + max_timeout: Duration, window_size_bytes: u32, max_window_size_bytes: u32, min_window_size_bytes: u32, @@ -84,6 +88,7 @@ impl Controller { target_delay_micros: config.target_delay_micros, timeout: config.initial_timeout, min_timeout: config.min_timeout, + max_timeout: config.max_timeout, window_size_bytes: 0, max_window_size_bytes: 2 * config.max_packet_size_bytes, min_window_size_bytes: 2 * config.max_packet_size_bytes, @@ -260,7 +265,7 @@ impl Controller { /// Registers a timeout with the controller. pub fn on_timeout(&mut self) { self.max_window_size_bytes = self.min_window_size_bytes; - self.timeout *= 2; + self.timeout = cmp::min(self.timeout * 2, self.max_timeout); } /// Adjusts the maximum window (i.e. congestion window) by `adjustment`, keeping the size of @@ -288,10 +293,14 @@ impl Controller { /// /// The congestion timeout cannot fall below the configured minimum. fn apply_timeout_adjustment(&mut self) { + // Do not let timeout go below minimum. self.timeout = cmp::max( self.rtt + Duration::from_micros(self.rtt_variance_micros * 4), self.min_timeout, ); + + // Do not let timeout go above maximum. + self.timeout = cmp::min(self.timeout, self.max_timeout) } } @@ -675,6 +684,22 @@ mod tests { assert_eq!(ctrl.max_window_size_bytes, ctrl.min_window_size_bytes); assert_eq!(ctrl.timeout, initial_timeout * 2); } + + #[test] + fn on_timeout_not_exceed_max() { + const MAX_TIMEOUT: Duration = Duration::from_secs(3); + let config = Config { + initial_timeout: Duration::from_secs(2), + max_timeout: MAX_TIMEOUT, + ..Default::default() + }; + + let mut ctrl = Controller::new(config); + + // Register a timeout. + ctrl.on_timeout(); + assert_eq!(ctrl.timeout, MAX_TIMEOUT); + } } mod delay_accumulator { diff --git a/src/conn.rs b/src/conn.rs index a90fd8e..f8f8149 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -95,17 +95,20 @@ pub struct ConnectionConfig { pub max_idle_timeout: Duration, pub initial_timeout: Duration, pub min_timeout: Duration, + pub max_timeout: Duration, pub target_delay: Duration, } impl Default for ConnectionConfig { fn default() -> Self { + let max_idle_timeout = Duration::from_secs(10); Self { max_conn_attempts: 3, - max_idle_timeout: Duration::from_secs(10), + max_idle_timeout, max_packet_size: congestion::DEFAULT_MAX_PACKET_SIZE_BYTES as u16, initial_timeout: congestion::DEFAULT_INITIAL_TIMEOUT, min_timeout: congestion::DEFAULT_MIN_TIMEOUT, + max_timeout: max_idle_timeout, target_delay: Duration::from_micros(congestion::DEFAULT_TARGET_MICROS.into()), } } @@ -117,6 +120,7 @@ impl From for congestion::Config { max_packet_size_bytes: u32::from(value.max_packet_size), initial_timeout: value.initial_timeout, min_timeout: value.min_timeout, + max_timeout: value.max_timeout, target_delay_micros: value.target_delay.as_micros() as u32, ..Default::default() } @@ -136,6 +140,7 @@ pub struct Connection { readable: Notify, pending_writes: VecDeque, writable: Notify, + latest_timeout: Option, } impl Connection { @@ -176,6 +181,7 @@ impl Connection { readable: Notify::new(), pending_writes: VecDeque::new(), writable: Notify::new(), + latest_timeout: None, } } @@ -642,7 +648,21 @@ impl Connection { return; } - sent_packets.on_timeout(); + // To prevent timeout amplification in the event that a batch of packets sent near + // the same time all timeout, we only register a new timeout if the time elapsed + // since the latest timeout is greater than the existing timeout. + // + // For example, if the current congestion control timeout is 1s, then we only + // register a new timeout if the time elapsed since the latest registered timeout + // is greater than 1s. + let timeout = match self.latest_timeout { + Some(latest) => latest.elapsed() > sent_packets.timeout(), + None => true, + }; + if timeout { + sent_packets.on_timeout(); + self.latest_timeout = Some(Instant::now()); + } // TODO: Limit number of retransmissions. let recv_window = recv_buf.available() as u32; @@ -1125,6 +1145,7 @@ mod test { readable: Notify::new(), pending_writes: VecDeque::new(), writable: Notify::new(), + latest_timeout: None, } }