debug/fix: internal rpc cancel safety #2407

Closed · wants to merge 6 commits
1 change: 1 addition & 0 deletions .github/workflows/netsim.yml
@@ -120,6 +120,7 @@ jobs:
cd ../chuck/netsim
sudo kill -9 $(pgrep ovs) || true
sudo mn --clean || true
export RUST_LOG=debug,iroh_net=warn,iroh::node=trace
c='${{ steps.detect_comment_config.outputs.NETSIM_CONFIG }}'
if [ -z "${c}" ];
then
3 changes: 3 additions & 0 deletions iroh-cli/src/commands/blob.rs
@@ -35,6 +35,7 @@ use iroh::{
net::{key::PublicKey, relay::RelayUrl, NodeAddr},
};
use tokio::io::AsyncWriteExt;
use tracing::debug;

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand, Debug, Clone)]
@@ -277,8 +278,10 @@ impl BlobCommands {
Some(OutputTarget::Stdout) => {
// we asserted above that `OutputTarget::Stdout` is only permitted if getting a
// single hash and not a hashseq.
debug!("cli: start blob read to stdout");
let mut blob_read = iroh.blobs().read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
debug!("cli: blob finished");
}
Some(OutputTarget::Path(path)) => {
let absolute = std::env::current_dir()?.join(&path);
5 changes: 4 additions & 1 deletion iroh-cli/src/commands/start.rs
@@ -9,7 +9,7 @@ use iroh::{
net::relay::{RelayMap, RelayMode},
node::RpcStatus,
};
use tracing::{info_span, Instrument};
use tracing::{debug, info_span, Instrument};

/// Whether to stop the node after running a command or run forever until stopped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
@@ -106,7 +106,10 @@ where
}
// abort if the command task finishes (will run forever if not in single-command mode)
res = &mut command_task => {
debug!("cli: command complete: {res:?}");
debug!("cli: node shutdown start");
let _ = node.shutdown().await;
debug!("cli: node shutdown complete");
res??;
}
}
5 changes: 4 additions & 1 deletion iroh/src/client/blobs.rs
@@ -29,7 +29,7 @@ use ref_cast::RefCast;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
use tracing::{debug, warn};

use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
@@ -790,17 +790,20 @@ impl Reader {
offset: u64,
len: Option<usize>,
) -> anyhow::Result<Self> {
debug!("sending server_streaming BlobReadAtRequest");
let stream = rpc
.server_streaming(BlobReadAtRequest { hash, offset, len })
.await?;
let mut stream = flatten(stream);

debug!("client: start blob_read_at");
let (size, is_complete) = match stream.next().await {
Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
Some(Err(err)) => return Err(err),
Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
};
debug!("client: header received");

let stream = stream.map(|item| match item {
Ok(BlobReadAtResponse::Data { chunk }) => Ok(chunk),
67 changes: 54 additions & 13 deletions iroh/src/node.rs
@@ -21,7 +21,7 @@ use quic_rpc::{RpcServer, ServiceEndpoint};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};

use crate::{
client::RpcService,
@@ -164,6 +164,7 @@ impl<D: BaoStore> Node<D> {

// Wait for the main task to terminate.
self.task.await.map_err(|err| anyhow!(err))?;
debug!("node shutdown complete");

Ok(())
}
@@ -263,15 +264,55 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
Ok(())
});

// Spawn a task for the internal RPC.
// TODO: Find out if this is really needed? If flume is cancel-safe, it is *not* needed.
let internal_rpc_task = tokio::task::spawn({
let cancel_token = self.cancel_token.clone();
let inner = self.clone();
let mut join_set = JoinSet::new();
async move {
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => {
trace!("tick: cancel");
break;
},
// handle internal rpc requests.
request = internal_rpc.accept() => {
trace!("tick: internal_rpc");
// We cannot poll this in the tokio::select! because on completion an
// internal_rpc.accept() future could be dropped. But we need to clear
// finished tasks from the join_set so they do not pile up.
if let Some(Err(err)) = join_set.try_join_next() {
Review comment (Contributor): Wouldn't this have to be a loop? We pass the join_set into spawn_rpc_request, so we don't really know how many tasks there are per request.

Reply (Contributor): This is just done so we clean them up eventually; the current impl is also guaranteed to be 1-1, but we can make that more explicit.

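For reference, a minimal sketch (not part of this PR's diff) of the loop-based cleanup discussed above, assuming a tokio JoinSet<()> named join_set and the tracing warn! macro: try_join_next returns None once no finished task remains, so the loop drains every completed task without blocking.

// Drain every task that has already finished; try_join_next never blocks,
// it returns None as soon as no completed task is left in the set.
while let Some(res) = join_set.try_join_next() {
    if let Err(err) = res {
        warn!("internal RPC task panicked: {err:?}");
    }
}
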
warn!("internal RPC task paniced: {err:?}");
}
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(inner.clone(), &mut join_set, msg, chan);
}
Err(e) => {
info!("internal rpc request error: {:?}", e);
}
}
},
}
}
}
});

loop {
trace!("wait for tick");
tokio::select! {
biased;
_ = self.cancel_token.cancelled() => {
trace!("tick: cancel");
break;
},
// handle rpc requests. This will do nothing if rpc is not configured, since
// accept is just a pending future.
request = external_rpc.accept() => {
trace!("tick: external_rpc");
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
@@ -281,19 +322,9 @@
}
}
},
// handle internal rpc requests.
request = internal_rpc.accept() => {
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
}
Err(e) => {
info!("internal rpc request error: {:?}", e);
}
}
},
// handle incoming p2p connections.
Some(connecting) = self.endpoint.accept() => {
trace!("tick: endpoint.accept");
let protocols = protocols.clone();
join_set.spawn(async move {
handle_connection(connecting, protocols).await;
Expand All @@ -302,15 +333,23 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
},
// handle task terminations and quit on panics.
res = join_set.join_next(), if !join_set.is_empty() => {
trace!("tick: join_set.join_next");
if let Some(Err(err)) = res {
error!("Task failed: {err:?}");
break;
}
},
else => break,
else => {
trace!("tick: else, break");
break;
}
}
}

if let Err(err) = internal_rpc_task.await {
warn!("internal rpc task panicked: {err:?}");
}

self.shutdown(protocols).await;

// Abort remaining tasks.
@@ -319,6 +358,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {

/// Shutdown the different parts of the node concurrently.
async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
debug!("node shutdown services: start");
let error_code = Closed::ProviderTerminating;

// Shutdown future for the docs engine, if enabled.
Expand Down Expand Up @@ -349,6 +389,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
// Shutdown protocol handlers.
protocols.shutdown(),
);
debug!("node shutdown services: done");
}

async fn run_gc_loop(
18 changes: 14 additions & 4 deletions iroh/src/node/rpc.rs
@@ -31,7 +31,7 @@ use quic_rpc::{
};
use tokio::task::JoinSet;
use tokio_util::{either::Either, task::LocalPoolHandle};
use tracing::{debug, info, warn};
use tracing::{debug, error_span, info, warn, Instrument};

use crate::client::{
blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
@@ -909,10 +909,16 @@ impl<D: BaoStore> Handler<D> {
) -> impl Stream<Item = RpcResult<BlobReadAtResponse>> + Send + 'static {
let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
let db = self.inner.db.clone();
self.inner.rt.spawn_pinned(move || async move {
if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
tx.send_async(RpcResult::Err(err.into())).await.ok();
self.inner.rt.spawn_pinned(move || {
async move {
if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
debug!("blob_read_at failed: {err:?}");
tx.send_async(RpcResult::Err(err.into())).await.ok();
} else {
debug!("blob_read_at complete");
}
}
.instrument(error_span!("blob_read_at"))
});

async fn read_loop<D: iroh_blobs::store::Store>(
@@ -921,14 +927,17 @@
tx: flume::Sender<RpcResult<BlobReadAtResponse>>,
max_chunk_size: usize,
) -> anyhow::Result<()> {
debug!("loop start");
let entry = db.get(&req.hash).await?;
debug!(is_some = entry.is_some(), "loop got entry");
let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?;
let size = entry.size();
tx.send_async(Ok(BlobReadAtResponse::Entry {
size,
is_complete: entry.is_complete(),
}))
.await?;
debug!("header sent");
let mut reader = entry.data_reader().await?;

let len = req.len.unwrap_or((size.value() - req.offset) as usize);
Expand Down Expand Up @@ -960,6 +969,7 @@ impl<D: BaoStore> Handler<D> {
read += chunk_len as u64;
}
}
debug!("loop complete");
Ok(())
}
