diff --git a/backend/src/connection.rs b/backend/src/connection.rs index cd4cbe3..c52100b 100644 --- a/backend/src/connection.rs +++ b/backend/src/connection.rs @@ -32,7 +32,7 @@ const NACK_CALCULATION_INTERVAL: Duration = Duration::from_millis(20); // tick interval ever decreases. const ACK_CALCULATION_INTERVAL: Duration = Duration::from_millis(100); -const RTCP_REPORT_INTERVAL: Duration = Duration::from_secs(5); +pub const RTCP_REPORT_INTERVAL: Duration = Duration::from_secs(5); pub type PacketToSend = Vec; @@ -571,6 +571,9 @@ impl Connection { packets_to_send: &mut Vec<(PacketToSend, SocketLocator)>, now: Instant, ) { + // allow the client some time to queue/pace the RTX + const RTT_GRACE_MULTIPLIER: f64 = 1.1; + if let Some(nacks_sent) = self.rtp.nacks_sent { if now < nacks_sent + NACK_CALCULATION_INTERVAL { // We sent NACKs recently. Wait to resend/recalculate them. @@ -579,7 +582,7 @@ impl Connection { } if let Some(outgoing_addr) = self.outgoing_addr { - let rtt = self.rtt(); + let rtt = self.rtt(now).mul_f64(RTT_GRACE_MULTIPLIER); let rtp_endpoint = &mut self.rtp.endpoint; let mut bytes = 0; @@ -623,7 +626,7 @@ impl Connection { self.congestion_control.pacer.queue_delay(now) } - pub fn rtp_endpoint_stats(&mut self, now: Instant) -> rtp::EndpointStats { + pub fn rtp_endpoint_stats(&mut self, now: Instant) -> &rtp::EndpointStats { self.rtp.endpoint.update_stats(now) } @@ -642,8 +645,18 @@ impl Connection { } } - pub fn rtt(&self) -> Duration { - self.congestion_control.controller.rtt() + pub fn rtt(&mut self, now: Instant) -> Duration { + // Congestion Controller RTT tends to be higher and more reactive, switch when delta is large + const RTCP_RTT_LAG_THRESHOLD: Duration = Duration::from_millis(250); + + let cc_rtt = self.congestion_control.controller.rtt(); + if let Some(rtcp_rtt) = self.rtp.endpoint.get_or_update_stats(now).rtt_estimate { + if cc_rtt.abs_diff(rtcp_rtt) < RTCP_RTT_LAG_THRESHOLD { + return rtcp_rtt; + } + } + + cc_rtt } pub fn region_relation(&self) -> RegionRelation { diff --git a/backend/src/rtp.rs b/backend/src/rtp.rs index 4ba659a..aad3751 100644 --- a/backend/src/rtp.rs +++ b/backend/src/rtp.rs @@ -161,6 +161,10 @@ pub struct Endpoint { // For RTX rtx_sender: RtxSender, + + // For Endpoint stats + last_stats_calculated_time: Instant, + last_stats: Option, } struct IncomingSsrcState { @@ -180,6 +184,18 @@ pub struct EndpointStats { pub rtt_estimate: Option, } +impl AsRef for EndpointStats { + fn as_ref(&self) -> &EndpointStats { + self + } +} + +impl AsRef for Endpoint { + fn as_ref(&self) -> &Endpoint { + self + } +} + impl Endpoint { pub fn new( decrypt: KeysAndSalts, @@ -202,6 +218,9 @@ impl Endpoint { max_received_tcc_seqnum: 0, rtx_sender: RtxSender::new(PACKET_LIFETIME), + + last_stats: None, + last_stats_calculated_time: now, } } @@ -640,15 +659,30 @@ impl Endpoint { Some(encrypted) } - pub fn update_stats(&mut self, now: Instant) -> EndpointStats { + pub fn get_or_update_stats(&mut self, now: Instant) -> &EndpointStats { + // destructuring here causes wrong lifetimes (https://github.com/rust-lang/rust/issues/54663) + if self.last_stats.is_some() + && now.saturating_duration_since(self.last_stats_calculated_time) + < RTT_ESTIMATE_AGE_LIMIT + { + return self.last_stats.as_ref().unwrap(); + } + + self.update_stats(now) + } + + pub fn update_stats(&mut self, now: Instant) -> &EndpointStats { let (remembered_packet_count, remembered_packet_bytes) = self.rtx_sender.remembered_packet_stats(); - EndpointStats { + self.last_stats = Some(EndpointStats { remembered_packet_count, remembered_packet_bytes, rtt_estimate: self.calculate_rtt(now), - } + }); + self.last_stats_calculated_time = now; + + self.last_stats.as_ref().unwrap() } /// Average RTT estimates across all SSRCS that are not too old. diff --git a/backend/src/rtp/rtcp.rs b/backend/src/rtp/rtcp.rs index 82616e0..fd1744e 100644 --- a/backend/src/rtp/rtcp.rs +++ b/backend/src/rtp/rtcp.rs @@ -5,11 +5,7 @@ use super::{Packet, OPUS_PAYLOAD_TYPE, VP8_PAYLOAD_TYPE}; -use std::{ - convert::TryFrom, - ops::{Range, RangeInclusive}, -}; - +use crate::transportcc as tcc; use aes::cipher::{generic_array::GenericArray, KeyInit}; use aes_gcm::{AeadInPlace, Aes128Gcm}; use calling_common::{ @@ -17,8 +13,10 @@ use calling_common::{ Instant, Writable, Writer, U24, }; use log::*; - -use crate::transportcc as tcc; +use std::{ + convert::TryFrom, + ops::{Range, RangeInclusive}, +}; use super::{ nack::{parse_nack, Nack}, @@ -48,8 +46,9 @@ pub const RTCP_FORMAT_NACK: u8 = 1; pub const RTCP_FORMAT_TRANSPORT_CC: u8 = 15; pub const RTCP_TYPE_SPECIFIC_FEEDBACK: u8 = 206; pub const RTCP_FORMAT_PLI: u8 = 1; -pub const RTT_ESTIMATE_AGE_LIMIT: Duration = Duration::from_secs(15); -const RTT_ESTIMATE_AGE_LIMIT_SECS_F64: f64 = 15.0; +// 2 * RTCP_REPORT_INTERVAL + 1 second +pub const RTT_ESTIMATE_AGE_LIMIT: Duration = Duration::from_secs(11); +const RTT_ESTIMATE_AGE_LIMIT_SECS_F64: f64 = 11.0; #[derive(Default, Debug)] pub struct ControlPacket<'packet> { diff --git a/backend/src/sfu.rs b/backend/src/sfu.rs index cb7ae64..588f7aa 100644 --- a/backend/src/sfu.rs +++ b/backend/src/sfu.rs @@ -356,7 +356,6 @@ impl Sfu { let tags = CONNECTION_TAG_VALUES.get(&(call_size_bucket, region_relation)); let stats = connection.rtp_endpoint_stats(now); - let rates = connection.current_rates(now); if let Some(rtt_estimate) = stats.rtt_estimate { client_rtt_ms .entry(tags) @@ -365,6 +364,7 @@ impl Sfu { } remembered_packet_count.push(stats.remembered_packet_count); remembered_packet_bytes.push(stats.remembered_packet_bytes); + let rates = connection.current_rates(now); if let Some(addr_type) = connection.outgoing_addr_type() { match addr_type { AddressType::UdpV4 => udp_v4_connections += 1, @@ -940,7 +940,7 @@ impl Sfu { let rtt = if let Some(connection) = self.get_connection_from_id(&connection_id) { - connection.lock().rtt().as_millis() + connection.lock().rtt(now).as_millis() } else { 0 }; diff --git a/common/src/time.rs b/common/src/time.rs index 8f6e2a8..4f91163 100644 --- a/common/src/time.rs +++ b/common/src/time.rs @@ -209,6 +209,14 @@ impl Duration { self.0.as_nanos() } + pub fn mul_f64(&self, rhs: f64) -> Duration { + self.0.mul_f64(rhs).into() + } + + pub fn abs_diff(&self, other: Duration) -> Duration { + self.0.abs_diff(other.0).into() + } + pub const fn subsec_nanos(&self) -> u32 { self.0.subsec_nanos() }