Skip to content

Commit

Permalink
chore: better debug logs for read_at failure
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 24, 2024
1 parent 38e8ce0 commit 2b32ae5
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 6 deletions.
3 changes: 3 additions & 0 deletions iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use iroh::{
net::{key::PublicKey, relay::RelayUrl, NodeAddr},
};
use tokio::io::AsyncWriteExt;
use tracing::debug;

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand, Debug, Clone)]
Expand Down Expand Up @@ -277,8 +278,10 @@ impl BlobCommands {
Some(OutputTarget::Stdout) => {
// we asserted above that `OutputTarget::Stdout` is only permitted if getting a
// single hash and not a hashseq.
debug!("cli: start blob read to stdout");
let mut blob_read = iroh.blobs().read(hash).await?;
tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
debug!("cli: blob finished");
}
Some(OutputTarget::Path(path)) => {
let absolute = std::env::current_dir()?.join(&path);
Expand Down
5 changes: 4 additions & 1 deletion iroh-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use iroh::{
net::relay::{RelayMap, RelayMode},
node::RpcStatus,
};
use tracing::{info_span, Instrument};
use tracing::{debug, info_span, Instrument};

/// Whether to stop the node after running a command or run forever until stopped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -106,7 +106,10 @@ where
}
// abort if the command task finishes (will run forever if not in single-command mode)
res = &mut command_task => {
debug!("cli: command complete: {res:?}");
debug!("cli: node shutdown start");
let _ = node.shutdown().await;
debug!("cli: node shutdown complete");
res??;
}
}
Expand Down
4 changes: 3 additions & 1 deletion iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use ref_cast::RefCast;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
use tracing::{debug, warn};

use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
Expand Down Expand Up @@ -795,12 +795,14 @@ impl Reader {
.await?;
let mut stream = flatten(stream);

debug!("client: start blob_read_at");
let (size, is_complete) = match stream.next().await {
Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
Some(Err(err)) => return Err(err),
Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
};
debug!("client: header received");

let stream = stream.map(|item| match item {
Ok(BlobReadAtResponse::Data { chunk }) => Ok(chunk),
Expand Down
3 changes: 3 additions & 0 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ impl<D: BaoStore> Node<D> {

// Wait for the main task to terminate.
self.task.await.map_err(|err| anyhow!(err))?;
debug!("node shutdown complete");

Ok(())
}
Expand Down Expand Up @@ -319,6 +320,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {

/// Shutdown the different parts of the node concurrently.
async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
debug!("node shutdown services: start");
let error_code = Closed::ProviderTerminating;

// Shutdown future for the docs engine, if enabled.
Expand Down Expand Up @@ -349,6 +351,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
// Shutdown protocol handlers.
protocols.shutdown(),
);
debug!("node shutdown services: done");
}

async fn run_gc_loop(
Expand Down
18 changes: 14 additions & 4 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use quic_rpc::{
};
use tokio::task::JoinSet;
use tokio_util::{either::Either, task::LocalPoolHandle};
use tracing::{debug, info, warn};
use tracing::{debug, error_span, info, warn, Instrument};

use crate::client::{
blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
Expand Down Expand Up @@ -909,10 +909,16 @@ impl<D: BaoStore> Handler<D> {
) -> impl Stream<Item = RpcResult<BlobReadAtResponse>> + Send + 'static {
let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
let db = self.inner.db.clone();
self.inner.rt.spawn_pinned(move || async move {
if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
tx.send_async(RpcResult::Err(err.into())).await.ok();
self.inner.rt.spawn_pinned(move || {
async move {
if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
debug!("blob_read_at failed: {err:?}");
tx.send_async(RpcResult::Err(err.into())).await.ok();
} else {
debug!("blob_read_at complete");
}
}
.instrument(error_span!("blob_read_at"))
});

async fn read_loop<D: iroh_blobs::store::Store>(
Expand All @@ -921,14 +927,17 @@ impl<D: BaoStore> Handler<D> {
tx: flume::Sender<RpcResult<BlobReadAtResponse>>,
max_chunk_size: usize,
) -> anyhow::Result<()> {
debug!("loop start");
let entry = db.get(&req.hash).await?;
debug!(is_some = entry.is_some(), "loop got entry");
let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?;
let size = entry.size();
tx.send_async(Ok(BlobReadAtResponse::Entry {
size,
is_complete: entry.is_complete(),
}))
.await?;
debug!("header sent");
let mut reader = entry.data_reader().await?;

let len = req.len.unwrap_or((size.value() - req.offset) as usize);
Expand Down Expand Up @@ -960,6 +969,7 @@ impl<D: BaoStore> Handler<D> {
read += chunk_len as u64;
}
}
debug!("loop complete");
Ok(())
}

Expand Down

0 comments on commit 2b32ae5

Please sign in to comment.