diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 6109b7b594..558b0d49fe 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -560,10 +560,23 @@ impl MagicEndpoint { /// /// Returns an error if closing the magic socket failed. /// TODO: Document error cases. - pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> Result<()> { - self.cancel_token.cancel(); - self.endpoint.close(error_code, reason); - self.msock.close().await?; + pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> { + let MagicEndpoint { + msock, + endpoint, + cancel_token, + .. + } = self; + cancel_token.cancel(); + tracing::debug!("Closing connections"); + endpoint.close(error_code, reason); + endpoint.wait_idle().await; + // In case this is the last clone of `MagicEndpoint`, dropping the `quinn::Endpoint` will + // make it more likely that the underlying socket is not polled by quinn anymore after this + drop(endpoint); + tracing::debug!("Connections closed"); + + msock.close().await?; Ok(()) } @@ -582,8 +595,8 @@ impl MagicEndpoint { } #[cfg(test)] - pub(crate) fn magic_sock(&self) -> &MagicSock { - &self.msock + pub(crate) fn magic_sock(&self) -> MagicSock { + self.msock.clone() } #[cfg(test)] pub(crate) fn endpoint(&self) -> &quinn::Endpoint { diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index be1f48e351..1b50ec98d0 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -287,6 +287,12 @@ impl Inner { let bytes_total: usize = transmits.iter().map(|t| t.contents.len()).sum(); inc_by!(MagicsockMetrics, send_data, bytes_total as _); + let mut n = 0; + if transmits.is_empty() { + tracing::trace!(is_closed=?self.is_closed(), "poll_send without any quinn_udp::Transmit"); + return Poll::Ready(Ok(n)); + } + if self.is_closed() { inc_by!(MagicsockMetrics, send_data_network_down, bytes_total as _); return Poll::Ready(Err(io::Error::new( @@ -295,10 +301,6 @@ impl Inner { ))); } - let mut n = 0; - if transmits.is_empty() { - return Poll::Ready(Ok(n)); - } trace!( "sending:\n{}", transmits.iter().fold( @@ -484,6 +486,7 @@ impl Inner { Ok(sock) } + /// NOTE: Receiving on a [`Self::closed`] socket will return [`Poll::Pending`] indefinitely. #[instrument(skip_all, fields(me = %self.me))] fn poll_recv( &self, @@ -494,10 +497,7 @@ impl Inner { // FIXME: currently ipv4 load results in ipv6 traffic being ignored debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas"); if self.is_closed() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::NotConnected, - "connection closed", - ))); + return Poll::Pending; } // order of polling is: UDPv4, UDPv6, relay @@ -1411,6 +1411,8 @@ impl MagicSock { /// Closes the connection. /// /// Only the first close does anything. Any later closes return nil. + /// Polling the socket ([`AsyncUdpSocket::poll_recv`]) will return [`Poll::Pending`] + /// indefinitely after this call. #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn close(&self) -> Result<()> { if self.inner.is_closed() { @@ -1596,6 +1598,7 @@ impl AsyncUdpSocket for MagicSock { self.inner.poll_send(cx, transmits) } + /// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely. fn poll_recv( &self, cx: &mut Context, @@ -2982,11 +2985,13 @@ pub(crate) mod tests { let _guard = mesh_stacks(vec![m1.clone(), m2.clone()], url.clone()).await?; println!("closing endpoints"); + let msock1 = m1.endpoint.magic_sock(); + let msock2 = m2.endpoint.magic_sock(); m1.endpoint.close(0u32.into(), b"done").await?; m2.endpoint.close(0u32.into(), b"done").await?; - assert!(m1.endpoint.magic_sock().inner.is_closed()); - assert!(m2.endpoint.magic_sock().inner.is_closed()); + assert!(msock1.inner.is_closed()); + assert!(msock2.inner.is_closed()); } Ok(()) }