Skip to content

Commit

Permalink
Update RTT to use RTCP RTT for nack interval and diagnostic lines
Browse files Browse the repository at this point in the history
  • Loading branch information
adel-signal authored Oct 10, 2024
1 parent 82da50e commit 9657f12
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 19 deletions.
23 changes: 18 additions & 5 deletions backend/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const NACK_CALCULATION_INTERVAL: Duration = Duration::from_millis(20);
// tick interval ever decreases.
const ACK_CALCULATION_INTERVAL: Duration = Duration::from_millis(100);

const RTCP_REPORT_INTERVAL: Duration = Duration::from_secs(5);
pub const RTCP_REPORT_INTERVAL: Duration = Duration::from_secs(5);

pub type PacketToSend = Vec<u8>;

Expand Down Expand Up @@ -571,6 +571,9 @@ impl Connection {
packets_to_send: &mut Vec<(PacketToSend, SocketLocator)>,
now: Instant,
) {
// allow the client some time to queue/pace the RTX
const RTT_GRACE_MULTIPLIER: f64 = 1.1;

if let Some(nacks_sent) = self.rtp.nacks_sent {
if now < nacks_sent + NACK_CALCULATION_INTERVAL {
// We sent NACKs recently. Wait to resend/recalculate them.
Expand All @@ -579,7 +582,7 @@ impl Connection {
}

if let Some(outgoing_addr) = self.outgoing_addr {
let rtt = self.rtt();
let rtt = self.rtt(now).mul_f64(RTT_GRACE_MULTIPLIER);
let rtp_endpoint = &mut self.rtp.endpoint;

let mut bytes = 0;
Expand Down Expand Up @@ -623,7 +626,7 @@ impl Connection {
self.congestion_control.pacer.queue_delay(now)
}

pub fn rtp_endpoint_stats(&mut self, now: Instant) -> rtp::EndpointStats {
pub fn rtp_endpoint_stats(&mut self, now: Instant) -> &rtp::EndpointStats {
self.rtp.endpoint.update_stats(now)
}

Expand All @@ -642,8 +645,18 @@ impl Connection {
}
}

pub fn rtt(&self) -> Duration {
self.congestion_control.controller.rtt()
pub fn rtt(&mut self, now: Instant) -> Duration {
// Congestion Controller RTT tends to be higher and more reactive, switch when delta is large
const RTCP_RTT_LAG_THRESHOLD: Duration = Duration::from_millis(250);

let cc_rtt = self.congestion_control.controller.rtt();
if let Some(rtcp_rtt) = self.rtp.endpoint.get_or_update_stats(now).rtt_estimate {
if cc_rtt.abs_diff(rtcp_rtt) < RTCP_RTT_LAG_THRESHOLD {
return rtcp_rtt;
}
}

cc_rtt
}

pub fn region_relation(&self) -> RegionRelation {
Expand Down
40 changes: 37 additions & 3 deletions backend/src/rtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ pub struct Endpoint {

// For RTX
rtx_sender: RtxSender,

// For Endpoint stats
last_stats_calculated_time: Instant,
last_stats: Option<EndpointStats>,
}

struct IncomingSsrcState {
Expand All @@ -180,6 +184,18 @@ pub struct EndpointStats {
pub rtt_estimate: Option<Duration>,
}

impl AsRef<EndpointStats> for EndpointStats {
fn as_ref(&self) -> &EndpointStats {
self
}
}

impl AsRef<Endpoint> for Endpoint {
fn as_ref(&self) -> &Endpoint {
self
}
}

impl Endpoint {
pub fn new(
decrypt: KeysAndSalts,
Expand All @@ -202,6 +218,9 @@ impl Endpoint {
max_received_tcc_seqnum: 0,

rtx_sender: RtxSender::new(PACKET_LIFETIME),

last_stats: None,
last_stats_calculated_time: now,
}
}

Expand Down Expand Up @@ -640,15 +659,30 @@ impl Endpoint {
Some(encrypted)
}

pub fn update_stats(&mut self, now: Instant) -> EndpointStats {
pub fn get_or_update_stats(&mut self, now: Instant) -> &EndpointStats {
// destructuring here causes wrong lifetimes (https://github.com/rust-lang/rust/issues/54663)
if self.last_stats.is_some()
&& now.saturating_duration_since(self.last_stats_calculated_time)
< RTT_ESTIMATE_AGE_LIMIT
{
return self.last_stats.as_ref().unwrap();
}

self.update_stats(now)
}

pub fn update_stats(&mut self, now: Instant) -> &EndpointStats {
let (remembered_packet_count, remembered_packet_bytes) =
self.rtx_sender.remembered_packet_stats();

EndpointStats {
self.last_stats = Some(EndpointStats {
remembered_packet_count,
remembered_packet_bytes,
rtt_estimate: self.calculate_rtt(now),
}
});
self.last_stats_calculated_time = now;

self.last_stats.as_ref().unwrap()
}

/// Average RTT estimates across all SSRCS that are not too old.
Expand Down
17 changes: 8 additions & 9 deletions backend/src/rtp/rtcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@

use super::{Packet, OPUS_PAYLOAD_TYPE, VP8_PAYLOAD_TYPE};

use std::{
convert::TryFrom,
ops::{Range, RangeInclusive},
};

use crate::transportcc as tcc;
use aes::cipher::{generic_array::GenericArray, KeyInit};
use aes_gcm::{AeadInPlace, Aes128Gcm};
use calling_common::{
parse_u16, parse_u24, parse_u32, parse_u64, round_up_to_multiple_of, CheckedSplitAt, Duration,
Instant, Writable, Writer, U24,
};
use log::*;

use crate::transportcc as tcc;
use std::{
convert::TryFrom,
ops::{Range, RangeInclusive},
};

use super::{
nack::{parse_nack, Nack},
Expand Down Expand Up @@ -48,8 +46,9 @@ pub const RTCP_FORMAT_NACK: u8 = 1;
pub const RTCP_FORMAT_TRANSPORT_CC: u8 = 15;
pub const RTCP_TYPE_SPECIFIC_FEEDBACK: u8 = 206;
pub const RTCP_FORMAT_PLI: u8 = 1;
pub const RTT_ESTIMATE_AGE_LIMIT: Duration = Duration::from_secs(15);
const RTT_ESTIMATE_AGE_LIMIT_SECS_F64: f64 = 15.0;
// 2 * RTCP_REPORT_INTERVAL + 1 second
pub const RTT_ESTIMATE_AGE_LIMIT: Duration = Duration::from_secs(11);
const RTT_ESTIMATE_AGE_LIMIT_SECS_F64: f64 = 11.0;

#[derive(Default, Debug)]
pub struct ControlPacket<'packet> {
Expand Down
4 changes: 2 additions & 2 deletions backend/src/sfu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ impl Sfu {
let tags = CONNECTION_TAG_VALUES.get(&(call_size_bucket, region_relation));

let stats = connection.rtp_endpoint_stats(now);
let rates = connection.current_rates(now);
if let Some(rtt_estimate) = stats.rtt_estimate {
client_rtt_ms
.entry(tags)
Expand All @@ -365,6 +364,7 @@ impl Sfu {
}
remembered_packet_count.push(stats.remembered_packet_count);
remembered_packet_bytes.push(stats.remembered_packet_bytes);
let rates = connection.current_rates(now);
if let Some(addr_type) = connection.outgoing_addr_type() {
match addr_type {
AddressType::UdpV4 => udp_v4_connections += 1,
Expand Down Expand Up @@ -940,7 +940,7 @@ impl Sfu {
let rtt = if let Some(connection) =
self.get_connection_from_id(&connection_id)
{
connection.lock().rtt().as_millis()
connection.lock().rtt(now).as_millis()
} else {
0
};
Expand Down
8 changes: 8 additions & 0 deletions common/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ impl Duration {
self.0.as_nanos()
}

pub fn mul_f64(&self, rhs: f64) -> Duration {
self.0.mul_f64(rhs).into()
}

pub fn abs_diff(&self, other: Duration) -> Duration {
self.0.abs_diff(other.0).into()
}

pub const fn subsec_nanos(&self) -> u32 {
self.0.subsec_nanos()
}
Expand Down

0 comments on commit 9657f12

Please sign in to comment.