diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs
index 6e4c0aeba2..63b9578f5b 100644
--- a/iroh-cli/src/commands/blob.rs
+++ b/iroh-cli/src/commands/blob.rs
@@ -35,6 +35,7 @@ use iroh::{
     net::{key::PublicKey, relay::RelayUrl, NodeAddr},
 };
 use tokio::io::AsyncWriteExt;
+use tracing::debug;
 
 #[allow(clippy::large_enum_variant)]
 #[derive(Subcommand, Debug, Clone)]
@@ -277,8 +278,10 @@ impl BlobCommands {
             Some(OutputTarget::Stdout) => {
                 // we asserted above that `OutputTarget::Stdout` is only permitted if getting a
                 // single hash and not a hashseq.
+                debug!("cli: start blob read to stdout");
                 let mut blob_read = iroh.blobs().read(hash).await?;
                 tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
+                debug!("cli: blob finished");
             }
             Some(OutputTarget::Path(path)) => {
                 let absolute = std::env::current_dir()?.join(&path);
diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs
index 694e484424..e0e3e5f4b5 100644
--- a/iroh-cli/src/commands/start.rs
+++ b/iroh-cli/src/commands/start.rs
@@ -9,7 +9,7 @@ use iroh::{
     net::relay::{RelayMap, RelayMode},
     node::RpcStatus,
 };
-use tracing::{info_span, Instrument};
+use tracing::{debug, info_span, Instrument};
 
 /// Whether to stop the node after running a command or run forever until stopped.
 #[derive(Debug, Copy, Clone, Eq, PartialEq)]
@@ -106,7 +106,10 @@ where
         }
         // abort if the command task finishes (will run forever if not in single-command mode)
         res = &mut command_task => {
+            debug!("cli: command complete: {res:?}");
+            debug!("cli: node shutdown start");
             let _ = node.shutdown().await;
+            debug!("cli: node shutdown complete");
             res??;
         }
     }
diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs
index e199e0f8d1..038e4203af 100644
--- a/iroh/src/client/blobs.rs
+++ b/iroh/src/client/blobs.rs
@@ -29,7 +29,7 @@ use ref_cast::RefCast;
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
 use tokio_util::io::{ReaderStream, StreamReader};
-use tracing::warn;
+use tracing::{debug, warn};
 
 use crate::rpc_protocol::{
     BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
@@ -795,12 +795,14 @@ impl Reader {
             .await?;
         let mut stream = flatten(stream);
+        debug!("client: start blob_read_at");
 
         let (size, is_complete) = match stream.next().await {
             Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
             Some(Err(err)) => return Err(err),
             Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
             None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
         };
+        debug!("client: header received");
 
         let stream = stream.map(|item| match item {
             Ok(BlobReadAtResponse::Data { chunk }) => Ok(chunk),
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 91e6febc61..2563d1ab79 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -164,6 +164,7 @@ impl Node {
 
         // Wait for the main task to terminate.
         self.task.await.map_err(|err| anyhow!(err))?;
+        debug!("node shutdown complete");
 
         Ok(())
     }
@@ -319,6 +320,7 @@ impl NodeInner {
 
     /// Shutdown the different parts of the node concurrently.
     async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
+        debug!("node shutdown services: start");
         let error_code = Closed::ProviderTerminating;
 
         // Shutdown future for the docs engine, if enabled.
@@ -349,6 +351,7 @@
             // Shutdown protocol handlers.
             protocols.shutdown(),
         );
+        debug!("node shutdown services: done");
     }
 
     async fn run_gc_loop(
diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs
index 2c9b49fe38..36e4ff15bd 100644
--- a/iroh/src/node/rpc.rs
+++ b/iroh/src/node/rpc.rs
@@ -31,7 +31,7 @@ use quic_rpc::{
 };
 use tokio::task::JoinSet;
 use tokio_util::{either::Either, task::LocalPoolHandle};
-use tracing::{debug, info, warn};
+use tracing::{debug, error_span, info, warn, Instrument};
 
 use crate::client::{
     blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
@@ -909,10 +909,16 @@ impl Handler {
     ) -> impl Stream<Item = RpcResult<BlobReadAtResponse>> + Send + 'static {
         let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
         let db = self.inner.db.clone();
-        self.inner.rt.spawn_pinned(move || async move {
-            if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
-                tx.send_async(RpcResult::Err(err.into())).await.ok();
+        self.inner.rt.spawn_pinned(move || {
+            async move {
+                if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
+                    debug!("blob_read_at failed: {err:?}");
+                    tx.send_async(RpcResult::Err(err.into())).await.ok();
+                } else {
+                    debug!("blob_read_at complete");
+                }
             }
+            .instrument(error_span!("blob_read_at"))
         });
 
         async fn read_loop(
@@ -921,7 +927,9 @@ impl Handler {
             tx: flume::Sender<RpcResult<BlobReadAtResponse>>,
             max_chunk_size: usize,
         ) -> anyhow::Result<()> {
+            debug!("loop start");
             let entry = db.get(&req.hash).await?;
+            debug!(is_some = entry.is_some(), "loop got entry");
             let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?;
             let size = entry.size();
             tx.send_async(Ok(BlobReadAtResponse::Entry {
                 size,
                 is_complete: entry.is_complete(),
             }))
             .await?;
+            debug!("header sent");
             let mut reader = entry.data_reader().await?;
             let len = req.len.unwrap_or((size.value() - req.offset) as usize);
@@ -960,6 +969,7 @@ impl Handler {
                     read += chunk_len as u64;
                 }
             }
+            debug!("loop complete");
             Ok(())
         }
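
Not part of the patch: a minimal, hypothetical sketch of the kind of `tracing` subscriber setup under which the `debug!` lines above become visible. iroh-cli has its own logging bootstrap, which may differ; the writer choice and the `RUST_LOG` targets here are illustrative assumptions only.

```rust
// Assumes the `tracing` and `tracing-subscriber` (with the "env-filter"
// feature) crates; this is not the setup iroh-cli actually ships.
use tracing_subscriber::EnvFilter;

fn main() {
    tracing_subscriber::fmt()
        // Honor RUST_LOG, e.g. RUST_LOG=iroh=debug,iroh_cli=debug (example targets).
        .with_env_filter(EnvFilter::from_default_env())
        // Keep stdout clean for the blob bytes themselves; send logs to stderr.
        .with_writer(std::io::stderr)
        .init();

    tracing::debug!("tracing initialized");
}
```

Logging to stderr matters for the `OutputTarget::Stdout` path in the first hunk, since the blob contents are copied directly to stdout.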