Skip to content

Commit

Permalink
fix(iroh): keep rpc tasks seperate from protocol tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Jun 24, 2024
1 parent 38e8ce0 commit 6362cfb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
26 changes: 24 additions & 2 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
Ok(())
});

let mut rpc_join_set = JoinSet::default();

loop {
tokio::select! {
biased;
Expand All @@ -274,7 +276,10 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
request = external_rpc.accept() => {
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
let this = self.clone();
rpc_join_set.spawn(async move {
rpc::Handler::spawn_rpc_request(this, msg, chan).await
});
}
Err(e) => {
info!("rpc request error: {:?}", e);
Expand All @@ -285,7 +290,10 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
request = internal_rpc.accept() => {
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
let this = self.clone();
rpc_join_set.spawn(async move {
rpc::Handler::spawn_rpc_request(this, msg, chan).await
});
}
Err(e) => {
info!("internal rpc request error: {:?}", e);
Expand All @@ -307,10 +315,24 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
break;
}
},
res = rpc_join_set.join_next(), if !rpc_join_set.is_empty() => match res {
None => {},
Some(Ok(Ok(()))) => {},
Some(Ok(Err(err))) => {
warn!("RPC request failed: {err:?}");
}
Some(Err(err)) => {
error!("RPC task failed: {err:?}");
break;
}
},
else => break,
}
}

// Abort rpc tasks.
rpc_join_set.shutdown().await;

self.shutdown(protocols).await;

// Abort remaining tasks.
Expand Down
16 changes: 5 additions & 11 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ use quic_rpc::{
server::{RpcChannel, RpcServerError},
ServiceEndpoint,
};
use tokio::task::JoinSet;
use tokio_util::{either::Either, task::LocalPoolHandle};
use tracing::{debug, info, warn};
use tracing::{debug, info};

use crate::client::{
blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
Expand Down Expand Up @@ -106,19 +105,14 @@ impl<D: BaoStore> Handler<D> {
}
}

pub(crate) fn spawn_rpc_request<E: ServiceEndpoint<RpcService>>(
pub(crate) async fn spawn_rpc_request<E: ServiceEndpoint<RpcService>>(
inner: Arc<NodeInner<D>>,
join_set: &mut JoinSet<anyhow::Result<()>>,
msg: Request,
chan: RpcChannel<RpcService, E>,
) {
) -> anyhow::Result<()> {
let handler = Self::new(inner);
join_set.spawn(async move {
if let Err(err) = handler.handle_rpc_request(msg, chan).await {
warn!("rpc request handler error: {err:?}");
}
Ok(())
});
handler.handle_rpc_request(msg, chan).await?;
Ok(())
}

pub(crate) async fn handle_rpc_request<E: ServiceEndpoint<RpcService>>(
Expand Down

0 comments on commit 6362cfb

Please sign in to comment.