Skip to content

Commit

Permalink
Merge pull request #88 from carver/improve-log-on-idle-timeout
Browse files Browse the repository at this point in the history
Close without error, when only FIN-ACK is missing on timeout, and related logs
  • Loading branch information
carver authored Aug 9, 2023
2 parents 25f2acc + b6d9fef commit 1262566
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 7 deletions.
87 changes: 85 additions & 2 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,25 @@ enum State<const N: usize> {
},
}

impl<const N: usize> fmt::Debug for State<N> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Connecting(_) => write!(f, "State::Connecting"),
Self::Established { .. } => write!(f, "State::Established"),
Self::Closing {
local_fin,
remote_fin,
..
} => f
.debug_struct("State::Closing")
.field("local_fin", local_fin)
.field("remote_fin", remote_fin)
.finish(),
Self::Closed { err } => f.debug_struct("State::Closed").field("err", err).finish(),
}
}
}

pub type Write = (Vec<u8>, oneshot::Sender<io::Result<usize>>);
pub type Read = (usize, oneshot::Sender<io::Result<Vec<u8>>>);

Expand Down Expand Up @@ -298,10 +317,27 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
}
() = &mut idle_timeout => {
if !std::matches!(self.state, State::Closed { .. }) {
// Warn that we are quitting the connection, due to a lack of activity.

// If both endpoints have exchanged FINs, and our FIN is the only
// unacknowledged packet, then do not emit a warning. It's not a big deal
// that their STATE for our FIN got dropped: we handled their FIN anyway.

let unacked: Vec<u16> = self.unacked.keys().copied().collect();
tracing::warn!(?unacked, "idle timeout expired, closing...");
let finished = match self.state {
State::Closing { local_fin, remote_fin, .. } => unacked.len() == 1 && local_fin.is_some() && &local_fin.unwrap() == unacked.last().unwrap() && remote_fin.is_some(),
_ => false,
};

self.state = State::Closed { err: Some(Error::TimedOut) };
let err = if finished {
tracing::debug!(?self.state, ?unacked, "idle timeout expired, but only missing ACK for local FIN");
None
} else {
tracing::warn!(?self.state, ?unacked, "closing, idle for too long...");
Some(Error::TimedOut)
};

self.state = State::Closed { err };
}
}
_ = &mut shutdown, if !shutting_down => {
Expand Down Expand Up @@ -1399,4 +1435,51 @@ mod test {
conn.on_reset();
assert!(std::matches!(conn.state, State::Closed { err: None }));
}

#[test]
fn state_debug_format() {
// Test that the state enum can be formatted with debug formatting.

// Test Connecting
let state: State<BUF> = State::Connecting(None);
let expected_format = "State::Connecting";
let actual_format = format!("{:?}", state);
assert_eq!(actual_format, expected_format);

// Test Established
let state: State<BUF> = State::Established {
sent_packets: SentPackets::new(
0,
congestion::Controller::new(congestion::Config::default()),
),
send_buf: SendBuffer::new(),
recv_buf: ReceiveBuffer::new(0),
};
let expected_format = "State::Established";
let actual_format = format!("{:?}", state);
assert_eq!(actual_format, expected_format);

// Test Closing
let state: State<BUF> = State::Closing {
local_fin: Some(12),
remote_fin: Some(34),
sent_packets: SentPackets::new(
0,
congestion::Controller::new(congestion::Config::default()),
),
send_buf: SendBuffer::new(),
recv_buf: ReceiveBuffer::new(0),
};
let expected_format = "State::Closing { local_fin: Some(12), remote_fin: Some(34) }";
let actual_format = format!("{:?}", state);
assert_eq!(actual_format, expected_format);

// Test Closed
let state: State<BUF> = State::Closed {
err: Some(Error::Reset),
};
let expected_format = "State::Closed { err: Some(Reset) }";
let actual_format = format!("{:?}", state);
assert_eq!(actual_format, expected_format);
}
}
108 changes: 103 additions & 5 deletions tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use utp_rs::socket::UtpSocket;

use utp_rs::testutils;

// How long should tests expect the connection to wait before timing out due to inactivity?
const EXPECTED_IDLE_TIMEOUT: Duration = Duration::from_secs(10);

// Test that close() returns successful, after transfer is complete
#[tokio::test]
async fn close_is_successful_when_write_completes() {
Expand Down Expand Up @@ -143,18 +146,113 @@ async fn close_errors_if_all_packets_dropped() {
let mut send_stream = send_stream_handle.await.unwrap();

// Close stream, which will fail because network is disabled.
match timeout(Duration::from_secs(20), send_stream.close()).await {
match timeout(EXPECTED_IDLE_TIMEOUT * 2, send_stream.close()).await {
Ok(Ok(_)) => panic!("Stream closed successfully, but should have timed out"),
Ok(Err(e)) => {
// The stream must time out when waiting to close, if the network is disabled.
assert_eq!(e.kind(), ErrorKind::TimedOut);
}
Err(e) => panic!(
"The stream did not timeout on close() fast enough, giving up after: {:?}",
e
),
Err(e) => {
panic!("The stream did not timeout on close() fast enough, giving up after: {e:?}")
}
};

// Wait to confirm that the read will time out, also.
recv_stream_handle.await.unwrap();
}

// Test that close() succeeds, if the connection is only missing the FIN-ACK
#[tokio::test(start_paused = true)]
async fn close_succeeds_if_only_fin_ack_dropped() {
let conn_config = ConnectionConfig::default();

let ((send_link, send_cid), (mut recv_link, recv_cid)) = testutils::build_connected_pair();
let rx_link_up = recv_link.up_status();

let recv = UtpSocket::with_socket(recv_link);
let recv = Arc::new(recv);

let send = UtpSocket::with_socket(send_link);
let send = Arc::new(send);

let recv_one = Arc::clone(&recv);
let recv_one_handle = tokio::spawn(async move {
recv_one
.accept_with_cid(recv_cid, conn_config)
.await
.unwrap()
});

// Keep a clone of the socket so that it doesn't drop when moved into the task.
// Dropping it causes all connections to exit.
let send_one = Arc::clone(&send);
let send_one_handle = tokio::spawn(async move {
send_one
.connect_with_cid(send_cid, conn_config)
.await
.unwrap()
});

let (tx_one, rx_one) = tokio::join!(send_one_handle, recv_one_handle,);
let mut send_stream = tx_one.unwrap();
let mut recv_stream = rx_one.unwrap();

// data to send
const DATA_LEN: usize = 100;
let data = [0xa5; DATA_LEN];

// send data
let send_stream_handle = tokio::spawn(async move {
match send_stream.write(&data).await {
Ok(written_len) => assert_eq!(written_len, DATA_LEN),
Err(e) => panic!("Error sending data: {:?}", e),
};
send_stream
});

// recv data
let recv_stream_handle = tokio::spawn(async move {
let mut read_buf = vec![];
let _ = recv_stream.read_to_eof(&mut read_buf).await.unwrap();
assert_eq!(read_buf, data.to_vec());
recv_stream
});

// Wait for send to start
let mut send_stream = send_stream_handle.await.unwrap();

// Wait for the full data to be sent before dropping the link
// This is a timeless sleep, because tokio time is paused
tokio::time::sleep(EXPECTED_IDLE_TIMEOUT / 2).await;

// ******* DISABLE NETWORK LINK ********
// This only drops the connection from the recipient to the sender, leading to the following
// scenario:
// - Sender sends FIN
// - Recipient receives FIN, sends FIN-ACK and its own FIN
// - Sender receives nothing, because link is down
// - Recipient is only missing its inbound FIN-ACK and closes with success
// - Sender is missing the recipient's FIN and times out with failure
rx_link_up.store(false, Ordering::SeqCst);

match timeout(EXPECTED_IDLE_TIMEOUT * 2, send_stream.close()).await {
Ok(Ok(_)) => panic!("Send stream closed successfully, but should have timed out"),
Ok(Err(e)) => {
// The stream must time out when waiting to close, because recipient's FIN is missing
assert_eq!(e.kind(), ErrorKind::TimedOut);
}
Err(e) => {
panic!("The send stream did not timeout on close() fast enough, giving up after: {e:?}")
}
};

let mut recv_stream = recv_stream_handle.await.unwrap();

match timeout(EXPECTED_IDLE_TIMEOUT * 2, recv_stream.close()).await {
Ok(Ok(_)) => {} // The receive stream should close successfully: only FIN-ACK is missing
Ok(Err(e)) => panic!("Error closing receive stream: {:?}", e),
Err(e) => {
panic!("The recv stream did not timeout on close() fast enough, giving up after: {e:?}")
}
};
}

0 comments on commit 1262566

Please sign in to comment.