Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: better debug logs for read_at failure #2403

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/netsim.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ jobs:
cd ../chuck/netsim
sudo kill -9 $(pgrep ovs) || true
sudo mn --clean || true
export RUST_LOG=debug,iroh_net=warn,iroh::node=trace
c='${{ steps.detect_comment_config.outputs.NETSIM_CONFIG }}'
if [ -z "${c}" ];
then
Expand Down
3 changes: 3 additions & 0 deletions iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use iroh::{
net::{key::PublicKey, relay::RelayUrl, NodeAddr},
};
use tokio::io::AsyncWriteExt;
use tracing::debug;

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand, Debug, Clone)]
Expand Down Expand Up @@ -277,8 +278,10 @@ impl BlobCommands {
Some(OutputTarget::Stdout) => {
// we asserted above that `OutputTarget::Stdout` is only permitted if getting a
// single hash and not a hashseq.
debug!("cli: start blob read to stdout");
let mut blob_read = iroh.blobs().read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
debug!("cli: blob finished");
}
Some(OutputTarget::Path(path)) => {
let absolute = std::env::current_dir()?.join(&path);
Expand Down
5 changes: 4 additions & 1 deletion iroh-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use iroh::{
net::relay::{RelayMap, RelayMode},
node::RpcStatus,
};
use tracing::{info_span, Instrument};
use tracing::{debug, info_span, Instrument};

/// Whether to stop the node after running a command or run forever until stopped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -106,7 +106,10 @@ where
}
// abort if the command task finishes (will run forever if not in single-command mode)
res = &mut command_task => {
debug!("cli: command complete: {res:?}");
debug!("cli: node shutdown start");
let _ = node.shutdown().await;
debug!("cli: node shutdown complete");
res??;
}
}
Expand Down
5 changes: 4 additions & 1 deletion iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use ref_cast::RefCast;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
use tracing::{debug, warn};

use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
Expand Down Expand Up @@ -790,17 +790,20 @@ impl Reader {
offset: u64,
len: Option<usize>,
) -> anyhow::Result<Self> {
debug!("sending server_streaming BlobReadAtRequest");
let stream = rpc
.server_streaming(BlobReadAtRequest { hash, offset, len })
.await?;
let mut stream = flatten(stream);

debug!("client: start blob_read_at");
let (size, is_complete) = match stream.next().await {
Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
Some(Err(err)) => return Err(err),
Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
};
debug!("client: header received");

let stream = stream.map(|item| match item {
Ok(BlobReadAtResponse::Data { chunk }) => Ok(chunk),
Expand Down
16 changes: 14 additions & 2 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use quic_rpc::{RpcServer, ServiceEndpoint};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};

use crate::{
client::RpcService,
Expand Down Expand Up @@ -164,6 +164,7 @@ impl<D: BaoStore> Node<D> {

// Wait for the main task to terminate.
self.task.await.map_err(|err| anyhow!(err))?;
debug!("node shutdown complete");

Ok(())
}
Expand Down Expand Up @@ -264,14 +265,17 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
});

loop {
trace!("wait for tick");
tokio::select! {
biased;
_ = self.cancel_token.cancelled() => {
trace!("tick: cancel");
break;
},
// handle rpc requests. This will do nothing if rpc is not configured, since
// accept is just a pending future.
request = external_rpc.accept() => {
trace!("tick: external_rpc");
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
Expand All @@ -283,6 +287,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
},
// handle internal rpc requests.
request = internal_rpc.accept() => {
trace!("tick: internal_rpc");
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
Expand All @@ -294,6 +299,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
},
// handle incoming p2p connections.
Some(connecting) = self.endpoint.accept() => {
trace!("tick: endpoint.accept");
let protocols = protocols.clone();
join_set.spawn(async move {
handle_connection(connecting, protocols).await;
Expand All @@ -302,12 +308,16 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
},
// handle task terminations and quit on panics.
res = join_set.join_next(), if !join_set.is_empty() => {
trace!("tick: join_set.join_next");
if let Some(Err(err)) = res {
error!("Task failed: {err:?}");
break;
}
},
else => break,
else => {
trace!("tick: else, break");
break;
}
}
}

Expand All @@ -319,6 +329,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {

/// Shutdown the different parts of the node concurrently.
async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
debug!("node shutdown services: start");
let error_code = Closed::ProviderTerminating;

// Shutdown future for the docs engine, if enabled.
Expand Down Expand Up @@ -349,6 +360,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
// Shutdown protocol handlers.
protocols.shutdown(),
);
debug!("node shutdown services: done");
}

async fn run_gc_loop(
Expand Down
18 changes: 14 additions & 4 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use quic_rpc::{
};
use tokio::task::JoinSet;
use tokio_util::{either::Either, task::LocalPoolHandle};
use tracing::{debug, info, warn};
use tracing::{debug, error_span, info, warn, Instrument};

use crate::client::{
blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
Expand Down Expand Up @@ -909,10 +909,16 @@ impl<D: BaoStore> Handler<D> {
) -> impl Stream<Item = RpcResult<BlobReadAtResponse>> + Send + 'static {
let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
let db = self.inner.db.clone();
self.inner.rt.spawn_pinned(move || async move {
if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
tx.send_async(RpcResult::Err(err.into())).await.ok();
self.inner.rt.spawn_pinned(move || {
async move {
if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
debug!("blob_read_at failed: {err:?}");
tx.send_async(RpcResult::Err(err.into())).await.ok();
} else {
debug!("blob_read_at complete");
}
}
.instrument(error_span!("blob_read_at"))
});

async fn read_loop<D: iroh_blobs::store::Store>(
Expand All @@ -921,14 +927,17 @@ impl<D: BaoStore> Handler<D> {
tx: flume::Sender<RpcResult<BlobReadAtResponse>>,
max_chunk_size: usize,
) -> anyhow::Result<()> {
debug!("loop start");
let entry = db.get(&req.hash).await?;
debug!(is_some = entry.is_some(), "loop got entry");
let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?;
let size = entry.size();
tx.send_async(Ok(BlobReadAtResponse::Entry {
size,
is_complete: entry.is_complete(),
}))
.await?;
debug!("header sent");
let mut reader = entry.data_reader().await?;

let len = req.len.unwrap_or((size.value() - req.offset) as usize);
Expand Down Expand Up @@ -960,6 +969,7 @@ impl<D: BaoStore> Handler<D> {
read += chunk_len as u64;
}
}
debug!("loop complete");
Ok(())
}

Expand Down
Loading