Skip to content

Commit

Permalink
v1.17: exits send_requests_task if the connection is closed (backport…
Browse files Browse the repository at this point in the history
… of solana-labs#33837) (solana-labs#34324)

exits send_requests_task if the connection is closed (solana-labs#33837)

receiver.recv() can unnecessarily block when the connection is already closed.
The commit exits send_requests_task if the connection is closed.

(cherry picked from commit 2096626)

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
mergify[bot] and behzadnouri authored Dec 5, 2023
1 parent da2dadd commit e3bd31c
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions core/src/repair/quic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,16 @@ async fn handle_connection(
));
match futures::future::try_join(send_requests_task, recv_requests_task).await {
Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"),
Ok(((), Err(err))) => {
debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}");
record_error(&err, &stats);
Ok(out) => {
if let (Err(ref err), _) = out {
debug!("send_requests_task: {remote_pubkey}, {remote_address}, {err:?}");
record_error(err, &stats);
}
if let (_, Err(ref err)) = out {
debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}");
record_error(err, &stats);
}
}
Ok(((), Ok(()))) => (),
}
drop_connection(remote_pubkey, &connection, &cache).await;
if let Entry::Occupied(entry) = router.write().await.entry(remote_address) {
Expand Down Expand Up @@ -513,15 +518,27 @@ async fn send_requests_task(
connection: Connection,
mut receiver: AsyncReceiver<LocalRequest>,
stats: Arc<RepairQuicStats>,
) {
while let Some(request) = receiver.recv().await {
tokio::task::spawn(send_request_task(
endpoint.clone(),
remote_address,
connection.clone(),
request,
stats.clone(),
));
) -> Result<(), Error> {
tokio::pin! {
let connection_closed = connection.closed();
}
loop {
tokio::select! {
biased;
request = receiver.recv() => {
match request {
None => return Ok(()),
Some(request) => tokio::task::spawn(send_request_task(
endpoint.clone(),
remote_address,
connection.clone(),
request,
stats.clone(),
)),
};
}
err = &mut connection_closed => return Err(Error::from(err)),
}
}
}

Expand Down

0 comments on commit e3bd31c

Please sign in to comment.