From e92240a540330d882f911893dcb57a3de125698e Mon Sep 17 00:00:00 2001 From: Jason Carver Date: Fri, 4 Aug 2023 11:10:25 -0700 Subject: [PATCH] Close without error if just missing FIN-ACK Close errors will be used more, when #98 lands, which returns the close error to clients who wait for the stream to exit. --- src/conn.rs | 9 +++-- tests/stream.rs | 103 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 82c30d7..5d276ae 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -328,13 +328,16 @@ impl Connection { State::Closing { local_fin, remote_fin, .. } => unacked.len() == 1 && local_fin.is_some() && &local_fin.unwrap() == unacked.last().unwrap() && remote_fin.is_some(), _ => false, }; - if finished { + + let err = if finished { tracing::debug!(?self.state, ?unacked, "idle timeout expired, but only missing ACK for local FIN"); + None } else { tracing::warn!(?self.state, ?unacked, "closing, idle for too long..."); - } + Some(Error::TimedOut) + }; - self.state = State::Closed { err: Some(Error::TimedOut) }; + self.state = State::Closed { err }; } } _ = &mut shutdown, if !shutting_down => { diff --git a/tests/stream.rs b/tests/stream.rs index 10c9102..d6aeb4e 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -149,12 +149,107 @@ async fn close_errors_if_all_packets_dropped() { // The stream must time out when waiting to close, if the network is disabled. assert_eq!(e.kind(), ErrorKind::TimedOut); } - Err(e) => panic!( - "The stream did not timeout on close() fast enough, giving up after: {:?}", - e - ), + Err(e) => { + panic!("The stream did not timeout on close() fast enough, giving up after: {e:?}") + } }; // Wait to confirm that the read will time out, also. recv_stream_handle.await.unwrap(); } + +// Test that close() succeeds, if the connection is only missing the FIN-ACK +#[tokio::test(start_paused = true)] +async fn close_succeeds_if_only_fin_ack_dropped() { + let conn_config = ConnectionConfig::default(); + + let ((send_link, send_cid), (mut recv_link, recv_cid)) = testutils::build_connected_pair(); + let rx_link_up = recv_link.up_status(); + + let recv = UtpSocket::with_socket(recv_link); + let recv = Arc::new(recv); + + let send = UtpSocket::with_socket(send_link); + let send = Arc::new(send); + + let recv_one = Arc::clone(&recv); + let recv_one_handle = tokio::spawn(async move { + recv_one + .accept_with_cid(recv_cid, conn_config) + .await + .unwrap() + }); + + // Keep a clone of the socket so that it doesn't drop when moved into the task. + // Dropping it causes all connections to exit. + let send_one = Arc::clone(&send); + let send_one_handle = tokio::spawn(async move { + send_one + .connect_with_cid(send_cid, conn_config) + .await + .unwrap() + }); + + let (tx_one, rx_one) = tokio::join!(send_one_handle, recv_one_handle,); + let mut send_stream = tx_one.unwrap(); + let mut recv_stream = rx_one.unwrap(); + + // data to send + const DATA_LEN: usize = 100; + let data = [0xa5; DATA_LEN]; + + // send data + let send_stream_handle = tokio::spawn(async move { + match send_stream.write(&data).await { + Ok(written_len) => assert_eq!(written_len, DATA_LEN), + Err(e) => panic!("Error sending data: {:?}", e), + }; + send_stream + }); + + // recv data + let recv_stream_handle = tokio::spawn(async move { + let mut read_buf = vec![]; + let _ = recv_stream.read_to_eof(&mut read_buf).await.unwrap(); + assert_eq!(read_buf, data.to_vec()); + recv_stream + }); + + // Wait for send to start + let mut send_stream = send_stream_handle.await.unwrap(); + + // Wait for the full data to be sent before dropping the link + // This is a timeless sleep, because tokio time is paused + tokio::time::sleep(Duration::from_secs(5)).await; + + // ******* DISABLE NETWORK LINK ******** + // This only drops the connection from the recipient to the sender, leading to the following + // scenario: + // - Sender sends FIN + // - Recipient receives FIN, sends FIN-ACK and its own FIN + // - Sender receives nothing, because link is down + // - Recipient is only missing its inbound FIN-ACK and closes with success + // - Sender is missing the recipient's FIN and times out with failure + rx_link_up.store(false, Ordering::SeqCst); + + match timeout(Duration::from_secs(20), send_stream.close()).await { + Ok(Ok(_)) => panic!("Send stream closed successfully, but should have timed out"), + Ok(Err(e)) => { + // The stream must time out when waiting to close, because recipient's FIN is missing + assert_eq!(e.kind(), ErrorKind::TimedOut); + } + Err(e) => { + panic!("The send stream did not timeout on close() fast enough, giving up after: {e:?}") + } + }; + + let mut recv_stream = recv_stream_handle.await.unwrap(); + + match timeout(Duration::from_secs(20), recv_stream.close()).await { + Ok(Ok(_)) => {} // The receive stream should close successfully: only FIN-ACK is missing + Ok(Err(e)) => panic!("Error closing receive stream: {:?}", e), + Err(e) => { + panic!("The recv stream did not timeout on close() fast enough, giving up after: {e:?}") + } + }; +}