Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iroh-net)!: improve magicsock's shutdown story #2227

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use anyhow::{anyhow, bail, ensure, Context, Result};
use derive_more::Debug;
use futures::StreamExt;
use futures::{future, StreamExt};
use quinn_proto::VarInt;
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
use tracing::{debug, trace};
Expand Down Expand Up @@ -560,10 +560,40 @@ impl MagicEndpoint {
///
/// Returns an error if closing the magic socket failed.
/// TODO: Document error cases.
pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> Result<()> {
self.cancel_token.cancel();
self.endpoint.close(error_code, reason);
self.msock.close().await?;
pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> {
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
let MagicEndpoint {
msock,
endpoint,
cancel_token,
..
} = self;
cancel_token.cancel();
endpoint.close(error_code, reason);

{
// waiting for connections to close can be slow: this timeout decide whethers we should
// inform the user of a slow operation
let logging_timeout = tokio::time::sleep(Duration::from_millis(500));
futures::pin_mut!(logging_timeout);
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved

let wait_idle = endpoint.wait_idle();
futures::pin_mut!(wait_idle);

// race the timeout and closing. Nothing else needs to be done if closing finishes
// first. If the timeouts is done first, give the user some logs
if let future::Either::Right(((), wait_idle_fut)) =
future::select(wait_idle, logging_timeout).await
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
{
tracing::info!("Closing connections");
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
wait_idle_fut.await;
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
tracing::info!("Connections closed");
}
}
// In case this is the last clone of `MagicEndpoint`, dropping the `quinn::Endpoint` will
// make it more likely that the underlying socket is not polled by quinn anymore after this
drop(endpoint);

msock.close().await?;
Ok(())
}

Expand All @@ -582,8 +612,8 @@ impl MagicEndpoint {
}

#[cfg(test)]
pub(crate) fn magic_sock(&self) -> &MagicSock {
&self.msock
pub(crate) fn magic_sock(&self) -> MagicSock {
self.msock.clone()
}
#[cfg(test)]
pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
Expand Down
21 changes: 11 additions & 10 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ impl Inner {
let bytes_total: usize = transmits.iter().map(|t| t.contents.len()).sum();
inc_by!(MagicsockMetrics, send_data, bytes_total as _);

let mut n = 0;
if transmits.is_empty() {
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
return Poll::Ready(Ok(n));
}

if self.is_closed() {
inc_by!(MagicsockMetrics, send_data_network_down, bytes_total as _);
return Poll::Ready(Err(io::Error::new(
Expand All @@ -295,10 +300,6 @@ impl Inner {
)));
}

let mut n = 0;
if transmits.is_empty() {
return Poll::Ready(Ok(n));
}
trace!(
"sending:\n{}",
transmits.iter().fold(
Expand Down Expand Up @@ -484,6 +485,7 @@ impl Inner {
Ok(sock)
}

// NOTE: Receiving on a [`Self::closed`] socket will return [`Poll::Pending`] undefinitely.
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
#[instrument(skip_all, fields(me = %self.me))]
fn poll_recv(
&self,
Expand All @@ -494,10 +496,7 @@ impl Inner {
// FIXME: currently ipv4 load results in ipv6 traffic being ignored
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection closed",
)));
return Poll::Pending;
}

// order of polling is: UDPv4, UDPv6, relay
Expand Down Expand Up @@ -2982,11 +2981,13 @@ pub(crate) mod tests {
let _guard = mesh_stacks(vec![m1.clone(), m2.clone()], url.clone()).await?;

println!("closing endpoints");
let msock1 = m1.endpoint.magic_sock();
let msock2 = m2.endpoint.magic_sock();
m1.endpoint.close(0u32.into(), b"done").await?;
m2.endpoint.close(0u32.into(), b"done").await?;

assert!(m1.endpoint.magic_sock().inner.is_closed());
assert!(m2.endpoint.magic_sock().inner.is_closed());
assert!(msock1.inner.is_closed());
assert!(msock2.inner.is_closed());
}
Ok(())
}
Expand Down
Loading