From 4aedc1a442549a9423520236026bb4e89863e7bb Mon Sep 17 00:00:00 2001 From: Diva M Date: Tue, 23 Apr 2024 16:11:51 -0500 Subject: [PATCH 1/4] improve magicsock's shutdown story --- iroh-net/src/magic_endpoint.rs | 44 ++++++++++++++++++++++++++++------ iroh-net/src/magicsock.rs | 21 ++++++++-------- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 6f71cffe01..6a90670428 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -4,7 +4,7 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use anyhow::{anyhow, bail, ensure, Context, Result}; use derive_more::Debug; -use futures::StreamExt; +use futures::{future, StreamExt}; use quinn_proto::VarInt; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use tracing::{debug, trace}; @@ -560,10 +560,40 @@ impl MagicEndpoint { /// /// Returns an error if closing the magic socket failed. /// TODO: Document error cases. - pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> Result<()> { - self.cancel_token.cancel(); - self.endpoint.close(error_code, reason); - self.msock.close().await?; + pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> { + let MagicEndpoint { + msock, + endpoint, + cancel_token, + .. + } = self; + cancel_token.cancel(); + endpoint.close(error_code, reason); + + { + // waiting for connections to close can be slow: this timeout decide whethers we should + // inform the user of a slow operation + let logging_timeout = tokio::time::sleep(Duration::from_millis(500)); + futures::pin_mut!(logging_timeout); + + let wait_idle = endpoint.wait_idle(); + futures::pin_mut!(wait_idle); + + // race the timeout and closing. Nothing else needs to be done if closing finishes + // first. If the timeouts is done first, give the user some logs + if let future::Either::Right(((), wait_idle_fut)) = + future::select(wait_idle, logging_timeout).await + { + tracing::info!("Closing connections"); + wait_idle_fut.await; + tracing::info!("Connections closed"); + } + } + // In case this is the last clone of `MagicEndpoint`, dropping the `quinn::Endpoint` will + // make it more likely that the underlying socket is not polled by quinn anymore after this + drop(endpoint); + + msock.close().await?; Ok(()) } @@ -582,8 +612,8 @@ impl MagicEndpoint { } #[cfg(test)] - pub(crate) fn magic_sock(&self) -> &MagicSock { - &self.msock + pub(crate) fn magic_sock(&self) -> MagicSock { + self.msock.clone() } #[cfg(test)] pub(crate) fn endpoint(&self) -> &quinn::Endpoint { diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index bc9de2f1b3..eda5e1b8a1 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -287,6 +287,11 @@ impl Inner { let bytes_total: usize = transmits.iter().map(|t| t.contents.len()).sum(); inc_by!(MagicsockMetrics, send_data, bytes_total as _); + let mut n = 0; + if transmits.is_empty() { + return Poll::Ready(Ok(n)); + } + if self.is_closed() { inc_by!(MagicsockMetrics, send_data_network_down, bytes_total as _); return Poll::Ready(Err(io::Error::new( @@ -295,10 +300,6 @@ impl Inner { ))); } - let mut n = 0; - if transmits.is_empty() { - return Poll::Ready(Ok(n)); - } trace!( "sending:\n{}", transmits.iter().fold( @@ -484,6 +485,7 @@ impl Inner { Ok(sock) } + // NOTE: Receiving on a [`Self::closed`] socket will return [`Poll::Pending`] undefinitely. #[instrument(skip_all, fields(me = %self.me))] fn poll_recv( &self, @@ -494,10 +496,7 @@ impl Inner { // FIXME: currently ipv4 load results in ipv6 traffic being ignored debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas"); if self.is_closed() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::NotConnected, - "connection closed", - ))); + return Poll::Pending; } // order of polling is: UDPv4, UDPv6, relay @@ -2982,11 +2981,13 @@ pub(crate) mod tests { let _guard = mesh_stacks(vec![m1.clone(), m2.clone()], url.clone()).await?; println!("closing endpoints"); + let msock1 = m1.endpoint.magic_sock(); + let msock2 = m2.endpoint.magic_sock(); m1.endpoint.close(0u32.into(), b"done").await?; m2.endpoint.close(0u32.into(), b"done").await?; - assert!(m1.endpoint.magic_sock().inner.is_closed()); - assert!(m2.endpoint.magic_sock().inner.is_closed()); + assert!(msock1.inner.is_closed()); + assert!(msock2.inner.is_closed()); } Ok(()) } From c1238e3393390823894b66438e00e43065bedbad Mon Sep 17 00:00:00 2001 From: Diva M Date: Wed, 24 Apr 2024 11:52:20 -0500 Subject: [PATCH 2/4] adjust log statements --- iroh-net/src/magic_endpoint.rs | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 68a40d819e..558b0d49fe 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -4,7 +4,7 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use anyhow::{anyhow, bail, ensure, Context, Result}; use derive_more::Debug; -use futures::{future, StreamExt}; +use futures::StreamExt; use quinn_proto::VarInt; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use tracing::{debug, trace}; @@ -568,30 +568,13 @@ impl MagicEndpoint { .. } = self; cancel_token.cancel(); + tracing::debug!("Closing connections"); endpoint.close(error_code, reason); - - { - // waiting for connections to close can be slow: this timeout decide whethers we should - // inform the user of a slow operation - let logging_timeout = tokio::time::sleep(Duration::from_millis(500)); - futures::pin_mut!(logging_timeout); - - let wait_idle = endpoint.wait_idle(); - futures::pin_mut!(wait_idle); - - // race the timeout and closing. Nothing else needs to be done if closing finishes - // first. If the timeouts is done first, give the user some logs - if let future::Either::Right(((), wait_idle_fut)) = - future::select(wait_idle, logging_timeout).await - { - tracing::info!("Closing connections"); - wait_idle_fut.await; - tracing::info!("Connections closed"); - } - } + endpoint.wait_idle().await; // In case this is the last clone of `MagicEndpoint`, dropping the `quinn::Endpoint` will // make it more likely that the underlying socket is not polled by quinn anymore after this drop(endpoint); + tracing::debug!("Connections closed"); msock.close().await?; Ok(()) From c8fa0de34da01a888ce595802e08db419a5ee476 Mon Sep 17 00:00:00 2001 From: Diva M Date: Wed, 24 Apr 2024 11:58:32 -0500 Subject: [PATCH 3/4] better docs --- iroh-net/src/magicsock.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index c1c0841a43..0b7a575446 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -485,7 +485,7 @@ impl Inner { Ok(sock) } - // NOTE: Receiving on a [`Self::closed`] socket will return [`Poll::Pending`] undefinitely. + /// NOTE: Receiving on a [`Self::closed`] socket will return [`Poll::Pending`] indefinitely. #[instrument(skip_all, fields(me = %self.me))] fn poll_recv( &self, @@ -1410,6 +1410,8 @@ impl MagicSock { /// Closes the connection. /// /// Only the first close does anything. Any later closes return nil. + /// Polling the socket ([`AsyncUdpSocket::poll_recv`]) will return [`Poll::Pending`] + /// indefinitely after this call. #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn close(&self) -> Result<()> { if self.inner.is_closed() { @@ -1595,6 +1597,7 @@ impl AsyncUdpSocket for MagicSock { self.inner.poll_send(cx, transmits) } + /// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely. fn poll_recv( &self, cx: &mut Context, From 914661ebee7ad9bc98410c8330fbae2808ac80b2 Mon Sep 17 00:00:00 2001 From: Diva M Date: Wed, 24 Apr 2024 12:02:31 -0500 Subject: [PATCH 4/4] some extra logs for weird scenarios --- iroh-net/src/magicsock.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 0b7a575446..1b50ec98d0 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -289,6 +289,7 @@ impl Inner { let mut n = 0; if transmits.is_empty() { + tracing::trace!(is_closed=?self.is_closed(), "poll_send without any quinn_udp::Transmit"); return Poll::Ready(Ok(n)); }