Skip to content

Commit

Permalink
remove JoinSet usage from rpc and incoming connection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jun 25, 2024
1 parent 2caf1d5 commit 07ba4e9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 15 deletions.
21 changes: 10 additions & 11 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use quic_rpc::{RpcServer, ServiceEndpoint};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};

use crate::{
client::RpcService,
Expand Down Expand Up @@ -274,7 +274,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
request = external_rpc.accept() => {
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
rpc::Handler::spawn_rpc_request(self.clone(), msg, chan);
}
Err(e) => {
info!("rpc request error: {:?}", e);
Expand All @@ -285,7 +285,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
request = internal_rpc.accept() => {
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
rpc::Handler::spawn_rpc_request(self.clone(), msg, chan);
}
Err(e) => {
info!("internal rpc request error: {:?}", e);
Expand All @@ -295,18 +295,17 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
// handle incoming p2p connections.
Some(connecting) = self.endpoint.accept() => {
let protocols = protocols.clone();
join_set.spawn(async move {
tokio::spawn(async move {
handle_connection(connecting, protocols).await;
Ok(())
});
},
// handle task terminations and quit on panics.
// res = join_set.join_next(), if !join_set.is_empty() => {
// if let Some(Err(err)) = res {
// error!("Task failed: {err:?}");
// break;
// }
// },
res = join_set.join_next(), if !join_set.is_empty() => {
if let Some(Err(err)) = res {
error!("Task failed: {err:?}");
break;
}
},
else => break,
}
}
Expand Down
5 changes: 1 addition & 4 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use quic_rpc::{
server::{RpcChannel, RpcServerError},
ServiceEndpoint,
};
use tokio::task::JoinSet;
use tokio_util::{either::Either, task::LocalPoolHandle};
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -108,16 +107,14 @@ impl<D: BaoStore> Handler<D> {

pub(crate) fn spawn_rpc_request<E: ServiceEndpoint<RpcService>>(
inner: Arc<NodeInner<D>>,
join_set: &mut JoinSet<anyhow::Result<()>>,
msg: Request,
chan: RpcChannel<RpcService, E>,
) {
let handler = Self::new(inner);
join_set.spawn(async move {
tokio::spawn(async move {
if let Err(err) = handler.handle_rpc_request(msg, chan).await {
warn!("rpc request handler error: {err:?}");
}
Ok(())
});
}

Expand Down

0 comments on commit 07ba4e9

Please sign in to comment.