From e3bd31c06775ea46c192e1be3e32b035d88548e3 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 5 Dec 2023 17:12:14 +0000 Subject: [PATCH] v1.17: exits send_requests_task if the connection is closed (backport of #33837) (#34324) exits send_requests_task if the connection is closed (#33837) receiver.recv() can unnecessarily block when the connection is already closed. The commit exits send_requests_task if the connection is closed. (cherry picked from commit 20966266339044d320299efca6f81dfcc95cd5e0) Co-authored-by: behzad nouri --- core/src/repair/quic_endpoint.rs | 43 ++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index c6f2e00df53a26..89f9de78491101 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -408,11 +408,16 @@ async fn handle_connection( )); match futures::future::try_join(send_requests_task, recv_requests_task).await { Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"), - Ok(((), Err(err))) => { - debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); - record_error(&err, &stats); + Ok(out) => { + if let (Err(ref err), _) = out { + debug!("send_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(err, &stats); + } + if let (_, Err(ref err)) = out { + debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(err, &stats); + } } - Ok(((), Ok(()))) => (), } drop_connection(remote_pubkey, &connection, &cache).await; if let Entry::Occupied(entry) = router.write().await.entry(remote_address) { @@ -513,15 +518,27 @@ async fn send_requests_task( connection: Connection, mut receiver: AsyncReceiver, stats: Arc, -) { - while let Some(request) = receiver.recv().await { - tokio::task::spawn(send_request_task( - endpoint.clone(), - remote_address, - connection.clone(), - request, - stats.clone(), - )); +) -> Result<(), Error> { + tokio::pin! { + let connection_closed = connection.closed(); + } + loop { + tokio::select! { + biased; + request = receiver.recv() => { + match request { + None => return Ok(()), + Some(request) => tokio::task::spawn(send_request_task( + endpoint.clone(), + remote_address, + connection.clone(), + request, + stats.clone(), + )), + }; + } + err = &mut connection_closed => return Err(Error::from(err)), + } } }