Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iroh-net)!: improve magicsock's shutdown story #2227

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,23 @@ impl MagicEndpoint {
///
/// Returns an error if closing the magic socket failed.
/// TODO: Document error cases.
pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> Result<()> {
self.cancel_token.cancel();
self.endpoint.close(error_code, reason);
self.msock.close().await?;
pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> {
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
let MagicEndpoint {
msock,
endpoint,
cancel_token,
..
} = self;
cancel_token.cancel();
tracing::debug!("Closing connections");
endpoint.close(error_code, reason);
endpoint.wait_idle().await;
// In case this is the last clone of `MagicEndpoint`, dropping the `quinn::Endpoint` will
// make it more likely that the underlying socket is not polled by quinn anymore after this
drop(endpoint);
tracing::debug!("Connections closed");

msock.close().await?;
Ok(())
}

Expand All @@ -582,8 +595,8 @@ impl MagicEndpoint {
}

#[cfg(test)]
pub(crate) fn magic_sock(&self) -> &MagicSock {
&self.msock
pub(crate) fn magic_sock(&self) -> MagicSock {
self.msock.clone()
}
#[cfg(test)]
pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
Expand Down
25 changes: 15 additions & 10 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ impl Inner {
let bytes_total: usize = transmits.iter().map(|t| t.contents.len()).sum();
inc_by!(MagicsockMetrics, send_data, bytes_total as _);

let mut n = 0;
if transmits.is_empty() {
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
tracing::trace!(is_closed=?self.is_closed(), "poll_send without any quinn_udp::Transmit");
return Poll::Ready(Ok(n));
}

if self.is_closed() {
inc_by!(MagicsockMetrics, send_data_network_down, bytes_total as _);
return Poll::Ready(Err(io::Error::new(
Expand All @@ -295,10 +301,6 @@ impl Inner {
)));
}

let mut n = 0;
if transmits.is_empty() {
return Poll::Ready(Ok(n));
}
trace!(
"sending:\n{}",
transmits.iter().fold(
Expand Down Expand Up @@ -484,6 +486,7 @@ impl Inner {
Ok(sock)
}

/// NOTE: Receiving on a [`Self::closed`] socket will return [`Poll::Pending`] indefinitely.
#[instrument(skip_all, fields(me = %self.me))]
fn poll_recv(
&self,
Expand All @@ -494,10 +497,7 @@ impl Inner {
// FIXME: currently ipv4 load results in ipv6 traffic being ignored
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection closed",
)));
return Poll::Pending;
}

// order of polling is: UDPv4, UDPv6, relay
Expand Down Expand Up @@ -1411,6 +1411,8 @@ impl MagicSock {
/// Closes the connection.
///
/// Only the first close does anything. Any later closes return nil.
/// Polling the socket ([`AsyncUdpSocket::poll_recv`]) will return [`Poll::Pending`]
/// indefinitely after this call.
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn close(&self) -> Result<()> {
if self.inner.is_closed() {
Expand Down Expand Up @@ -1596,6 +1598,7 @@ impl AsyncUdpSocket for MagicSock {
self.inner.poll_send(cx, transmits)
}

/// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely.
fn poll_recv(
&self,
cx: &mut Context,
Expand Down Expand Up @@ -2982,11 +2985,13 @@ pub(crate) mod tests {
let _guard = mesh_stacks(vec![m1.clone(), m2.clone()], url.clone()).await?;

println!("closing endpoints");
let msock1 = m1.endpoint.magic_sock();
let msock2 = m2.endpoint.magic_sock();
m1.endpoint.close(0u32.into(), b"done").await?;
m2.endpoint.close(0u32.into(), b"done").await?;

assert!(m1.endpoint.magic_sock().inner.is_closed());
assert!(m2.endpoint.magic_sock().inner.is_closed());
assert!(msock1.inner.is_closed());
assert!(msock2.inner.is_closed());
}
Ok(())
}
Expand Down
Loading