Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve utp Resiliance #69

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 69 additions & 21 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::{Duration, Instant};

use delay_map::HashMapDelay;
use futures::StreamExt;
use tokio::sync::{mpsc, oneshot, Notify};
use tokio::sync::{mpsc, oneshot, Notify, watch};

use crate::cid::{ConnectionId, ConnectionPeer};
use crate::congestion;
Expand Down Expand Up @@ -101,14 +101,14 @@ pub struct ConnectionConfig {

impl Default for ConnectionConfig {
fn default() -> Self {
let max_idle_timeout = Duration::from_secs(10);
let max_idle_timeout = Duration::from_secs(30);
Self {
max_conn_attempts: 3,
max_conn_attempts: 5,
max_idle_timeout,
max_packet_size: congestion::DEFAULT_MAX_PACKET_SIZE_BYTES as u16,
initial_timeout: congestion::DEFAULT_INITIAL_TIMEOUT,
min_timeout: congestion::DEFAULT_MIN_TIMEOUT,
max_timeout: max_idle_timeout,
max_timeout: max_idle_timeout / 4,
target_delay: Duration::from_micros(congestion::DEFAULT_TARGET_MICROS.into()),
}
}
Expand Down Expand Up @@ -141,6 +141,7 @@ pub struct Connection<const N: usize, P> {
pending_writes: VecDeque<Write>,
writable: Notify,
latest_timeout: Option<Instant>,
stress_tx: watch::Sender<bool>,
}

impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
Expand All @@ -150,6 +151,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
syn: Option<Packet>,
connected: oneshot::Sender<io::Result<()>>,
socket_events: mpsc::UnboundedSender<SocketEvent<P>>,
stress_tx: watch::Sender<bool>,
) -> Self {
let (endpoint, peer_ts_diff, peer_recv_window) = match syn {
Some(syn) => {
Expand Down Expand Up @@ -182,6 +184,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
pending_writes: VecDeque::new(),
writable: Notify::new(),
latest_timeout: None,
stress_tx,
}
}

Expand Down Expand Up @@ -249,9 +252,16 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
tokio::pin!(idle_timeout);
loop {
tokio::select! {
biased;
Some(event) = stream_events.recv() => {
match event {
StreamEvent::Incoming(packet) => {
StreamEvent::Incoming(packet, receive_time) => {
let queue_time = Instant::now() - receive_time;
if queue_time > Duration::from_millis(40) {
tracing::debug!(?queue_time, "incoming packet queued for too long");
} else if queue_time > Duration::from_millis(400) {
tracing::warn!(?queue_time, "incoming packet queued for way too long");
}
// Reset the idle timeout on any incoming packet.
let idle_deadline = tokio::time::Instant::now() + self.config.max_idle_timeout;
idle_timeout.as_mut().reset(idle_deadline);
Expand All @@ -263,12 +273,6 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
}
}
}
Some(Ok(timeout)) = self.unacked.next() => {
let (seq, packet) = timeout;
tracing::debug!(seq, ack = %packet.ack_num(), packet = ?packet.packet_type(), "timeout");

self.on_timeout(packet, Instant::now());
}
Some(write) = writes.recv(), if !shutting_down => {
// Reset the idle timeout on any new write.
let idle_deadline = tokio::time::Instant::now() + self.config.max_idle_timeout;
Expand All @@ -285,10 +289,27 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
_ = self.writable.notified() => {
self.process_writes(Instant::now());
}
Some(Ok(timeout)) = self.unacked.next() => {
let (_seq, packet) = timeout;
//tracing::debug!(seq, ack = %packet.ack_num(), packet = ?packet.packet_type(), "timeout");

self.on_timeout(packet, Instant::now());
}
() = &mut idle_timeout => {
if !std::matches!(self.state, State::Closed { .. }) {
let unacked: Vec<u16> = self.unacked.keys().copied().collect();
tracing::warn!(?unacked, "idle timeout expired, closing...");
match self.state {
State::Closing { local_fin, remote_fin, .. } => {
tracing::warn!(?unacked, ?local_fin, ?remote_fin, "idle timeout expired while closing...");
}
State::Established { .. } => {
tracing::warn!(?unacked, "idle timeout expired in established connection, closing...");
}
State::Connecting { .. } => {
tracing::warn!(?unacked, "idle timeout expired while connecting, closing...");
}
State::Closed { .. } => unreachable!("In an if block that excludes the closed state"),
}

self.state = State::Closed { err: Some(Error::TimedOut) };
}
Expand Down Expand Up @@ -319,6 +340,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
break;
}
}
self.stress_tx.send_replace(false);
}

fn shutdown(&mut self) {
Expand Down Expand Up @@ -501,6 +523,12 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
);
seq_num = seq_num.wrapping_add(1);
}
// Channel is no longer stressed, notify watchers
let was_stressed = *self.stress_tx.borrow();
if was_stressed {
// always fills up window, so local stress seems to be gone
self.stress_tx.send_replace(false);
}
}

fn on_write(&mut self, write: Write) {
Expand Down Expand Up @@ -620,14 +648,17 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
self.state = State::Closed { err: Some(err) };
} else {
let seq = *syn;
*attempts += 1;
let new_attempt_count = *attempts + 1;
*attempts = new_attempt_count;
let packet = self.syn_packet(seq);
let _ = self.socket_events.send(SocketEvent::Outgoing((
packet.clone(),
self.cid.peer.clone(),
)));
let timeout = self.config.initial_timeout * 2u32.pow((new_attempt_count-1).try_into().unwrap());
tracing::debug!("Timeout on SYN, retrying in {}s", timeout.as_secs());
self.unacked
.insert_at(seq, packet, self.config.initial_timeout);
.insert_at(seq, packet, timeout);
}
}
Endpoint::Acceptor(..) => {}
Expand All @@ -647,6 +678,9 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
if std::matches!(packet.packet_type(), PacketType::Syn) {
return;
}
if !*self.stress_tx.borrow() {
self.stress_tx.send_replace(true);
}

// To prevent timeout amplification in the event that a batch of packets sent near
// the same time all timeout, we only register a new timeout if the time elapsed
Expand Down Expand Up @@ -739,9 +773,12 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
match packet.packet_type() {
PacketType::Syn | PacketType::Fin | PacketType::Data => {
if let Some(state) = self.state_packet() {
self.socket_events
.send(SocketEvent::Outgoing((state, self.cid.peer.clone())))
.expect("outgoing channel should be open if connection is not closed");
let send_result = self.socket_events
.send(SocketEvent::Outgoing((state, self.cid.peer.clone())));
if send_result.is_err() {
tracing::error!("Cannot send STATE packet: socket_events closed");
return;
}
}
}
PacketType::State | PacketType::Reset => {}
Expand Down Expand Up @@ -830,9 +867,16 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
}
Endpoint::Acceptor(..) => {}
},
State::Established { sent_packets, .. } | State::Closing { sent_packets, .. } => {
State::Established { send_buf, sent_packets, .. } | State::Closing { send_buf, sent_packets, .. } => {
let range = sent_packets.seq_num_range();
if range.contains(ack_num) {
// If connection is not already marked as stressed, then
// mark it as stressed now: the full window isn't being used anymore
if !*self.stress_tx.borrow() {
let is_stressed = !send_buf.is_empty();
self.stress_tx.send_replace(is_stressed);
}

// Do not ACK if ACK num corresponds to initial packet.
if ack_num != range.start() {
sent_packets.on_ack(ack_num, selective_ack, delay, now);
Expand Down Expand Up @@ -1106,9 +1150,10 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {

sent_packets.on_transmit(packet.seq_num(), packet.packet_type(), payload, len, now);
unacked.insert_at(packet.seq_num(), packet.clone(), sent_packets.timeout());
socket_events
.send(SocketEvent::Outgoing((packet, dest.clone())))
.expect("outgoing channel should be open if connection is not closed");
let send_result = socket_events.send(SocketEvent::Outgoing((packet, dest.clone())));
if send_result.is_err() {
tracing::error!("Cannot transmit packet: socket_events closed");
}
}
}

Expand All @@ -1132,6 +1177,8 @@ mod test {
peer,
};

let (stress_tx, _) = watch::channel(true);

Connection {
state: State::Connecting(Some(connected)),
cid,
Expand All @@ -1146,6 +1193,7 @@ mod test {
pending_writes: VecDeque::new(),
writable: Notify::new(),
latest_timeout: None,
stress_tx,
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::time::Instant;
use crate::cid::ConnectionId;
use crate::packet::Packet;

#[derive(Clone, Debug)]
pub enum StreamEvent {
Incoming(Packet),
Incoming(Packet, Instant),
Shutdown,
}

Expand Down
Loading