From 6362cfb67a1b692ce035cdb7b8b66febbcabbb12 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 24 Jun 2024 14:44:31 +0200 Subject: [PATCH] fix(iroh): keep rpc tasks seperate from protocol tasks --- iroh/src/node.rs | 26 ++++++++++++++++++++++++-- iroh/src/node/rpc.rs | 16 +++++----------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 91e6febc61..13b038ae51 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -263,6 +263,8 @@ impl NodeInner { Ok(()) }); + let mut rpc_join_set = JoinSet::default(); + loop { tokio::select! { biased; @@ -274,7 +276,10 @@ impl NodeInner { request = external_rpc.accept() => { match request { Ok((msg, chan)) => { - rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan); + let this = self.clone(); + rpc_join_set.spawn(async move { + rpc::Handler::spawn_rpc_request(this, msg, chan).await + }); } Err(e) => { info!("rpc request error: {:?}", e); @@ -285,7 +290,10 @@ impl NodeInner { request = internal_rpc.accept() => { match request { Ok((msg, chan)) => { - rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan); + let this = self.clone(); + rpc_join_set.spawn(async move { + rpc::Handler::spawn_rpc_request(this, msg, chan).await + }); } Err(e) => { info!("internal rpc request error: {:?}", e); @@ -307,10 +315,24 @@ impl NodeInner { break; } }, + res = rpc_join_set.join_next(), if !rpc_join_set.is_empty() => match res { + None => {}, + Some(Ok(Ok(()))) => {}, + Some(Ok(Err(err))) => { + warn!("RPC request failed: {err:?}"); + } + Some(Err(err)) => { + error!("RPC task failed: {err:?}"); + break; + } + }, else => break, } } + // Abort rpc tasks. + rpc_join_set.shutdown().await; + self.shutdown(protocols).await; // Abort remaining tasks. diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 2c9b49fe38..b8d6159df5 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -29,9 +29,8 @@ use quic_rpc::{ server::{RpcChannel, RpcServerError}, ServiceEndpoint, }; -use tokio::task::JoinSet; use tokio_util::{either::Either, task::LocalPoolHandle}; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; use crate::client::{ blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, @@ -106,19 +105,14 @@ impl Handler { } } - pub(crate) fn spawn_rpc_request>( + pub(crate) async fn spawn_rpc_request>( inner: Arc>, - join_set: &mut JoinSet>, msg: Request, chan: RpcChannel, - ) { + ) -> anyhow::Result<()> { let handler = Self::new(inner); - join_set.spawn(async move { - if let Err(err) = handler.handle_rpc_request(msg, chan).await { - warn!("rpc request handler error: {err:?}"); - } - Ok(()) - }); + handler.handle_rpc_request(msg, chan).await?; + Ok(()) } pub(crate) async fn handle_rpc_request>(