diff --git a/.github/workflows/netsim.yml b/.github/workflows/netsim.yml
index 3e81092a1d..159db21cd5 100644
--- a/.github/workflows/netsim.yml
+++ b/.github/workflows/netsim.yml
@@ -120,6 +120,7 @@ jobs:
           cd ../chuck/netsim
           sudo kill -9 $(pgrep ovs) || true
           sudo mn --clean || true
+          export RUST_LOG=debug,iroh_net=warn,iroh::node=trace
           c='${{ steps.detect_comment_config.outputs.NETSIM_CONFIG }}'
           if [ -z "${c}" ]; then
diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs
index 6e4c0aeba2..63b9578f5b 100644
--- a/iroh-cli/src/commands/blob.rs
+++ b/iroh-cli/src/commands/blob.rs
@@ -35,6 +35,7 @@ use iroh::{
     net::{key::PublicKey, relay::RelayUrl, NodeAddr},
 };
 use tokio::io::AsyncWriteExt;
+use tracing::debug;

 #[allow(clippy::large_enum_variant)]
 #[derive(Subcommand, Debug, Clone)]
@@ -277,8 +278,10 @@ impl BlobCommands {
             Some(OutputTarget::Stdout) => {
                 // we asserted above that `OutputTarget::Stdout` is only permitted if getting a
                 // single hash and not a hashseq.
+                debug!("cli: start blob read to stdout");
                 let mut blob_read = iroh.blobs().read(hash).await?;
                 tokio::io::copy(&mut blob_read, &mut tokio::io::stdout()).await?;
+                debug!("cli: blob finished");
             }
             Some(OutputTarget::Path(path)) => {
                 let absolute = std::env::current_dir()?.join(&path);
diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs
index 694e484424..e0e3e5f4b5 100644
--- a/iroh-cli/src/commands/start.rs
+++ b/iroh-cli/src/commands/start.rs
@@ -9,7 +9,7 @@ use iroh::{
     net::relay::{RelayMap, RelayMode},
     node::RpcStatus,
 };
-use tracing::{info_span, Instrument};
+use tracing::{debug, info_span, Instrument};

 /// Whether to stop the node after running a command or run forever until stopped.
 #[derive(Debug, Copy, Clone, Eq, PartialEq)]
@@ -106,7 +106,10 @@ where
         }
         // abort if the command task finishes (will run forever if not in single-command mode)
         res = &mut command_task => {
+            debug!("cli: command complete: {res:?}");
+            debug!("cli: node shutdown start");
             let _ = node.shutdown().await;
+            debug!("cli: node shutdown complete");
             res??;
         }
     }
diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs
index e199e0f8d1..92d491453e 100644
--- a/iroh/src/client/blobs.rs
+++ b/iroh/src/client/blobs.rs
@@ -29,7 +29,7 @@ use ref_cast::RefCast;
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
 use tokio_util::io::{ReaderStream, StreamReader};
-use tracing::warn;
+use tracing::{debug, warn};

 use crate::rpc_protocol::{
     BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
@@ -790,17 +790,20 @@ impl Reader {
         offset: u64,
         len: Option<usize>,
     ) -> anyhow::Result<Self> {
+        debug!("sending server_streaming BlobReadAtRequest");
         let stream = rpc
             .server_streaming(BlobReadAtRequest { hash, offset, len })
             .await?;
         let mut stream = flatten(stream);

+        debug!("client: start blob_read_at");
         let (size, is_complete) = match stream.next().await {
             Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
             Some(Err(err)) => return Err(err),
             Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
             None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
         };
+        debug!("client: header received");

         let stream = stream.map(|item| match item {
             Ok(BlobReadAtResponse::Data { chunk }) => Ok(chunk),
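For context on the `RUST_LOG` value exported above: `tracing-subscriber`'s `EnvFilter` parses a comma-separated list of directives, where a bare level (`debug`) sets the default and `target=level` pairs override it per crate or module. This run therefore logs everything at debug, quiets `iroh_net` down to warn, and opens `iroh::node` (the event loop instrumented below) up to trace. A minimal sketch of how a binary honors such a variable, assuming `tracing-subscriber` with the `env-filter` feature enabled; this is generic setup code, not iroh-cli's actual initialization:

```rust
use tracing::{debug, trace, warn};
use tracing_subscriber::EnvFilter;

fn main() {
    // Fall back to the workflow's directives when RUST_LOG is unset: `debug` is
    // the default level, `iroh_net` is capped at warn, `iroh::node` gets trace.
    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("debug,iroh_net=warn,iroh::node=trace"));
    tracing_subscriber::fmt().with_env_filter(filter).init();

    debug!("visible: default level is debug");
    trace!("filtered out: trace is below the debug default");
    warn!(target: "iroh_net", "visible: warn meets the iroh_net=warn cap");
}
```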
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 91e6febc61..c0e80705db 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -21,7 +21,7 @@ use quic_rpc::{RpcServer, ServiceEndpoint};
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
 use tokio_util::task::LocalPoolHandle;
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info, trace, warn};

 use crate::{
     client::RpcService,
@@ -164,6 +164,7 @@ impl Node {
         // Wait for the main task to terminate.
         self.task.await.map_err(|err| anyhow!(err))?;
+        debug!("node shutdown complete");

         Ok(())
     }
@@ -264,14 +265,17 @@ impl NodeInner {
         });

         loop {
+            trace!("wait for tick");
             tokio::select! {
                 biased;
                 _ = self.cancel_token.cancelled() => {
+                    trace!("tick: cancel");
                     break;
                 },
                 // handle rpc requests. This will do nothing if rpc is not configured, since
                 // accept is just a pending future.
                 request = external_rpc.accept() => {
+                    trace!("tick: external_rpc");
                     match request {
                         Ok((msg, chan)) => {
                             rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
@@ -283,6 +287,7 @@ impl NodeInner {
                 },
                 // handle internal rpc requests.
                 request = internal_rpc.accept() => {
+                    trace!("tick: internal_rpc");
                     match request {
                         Ok((msg, chan)) => {
                             rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
@@ -294,6 +299,7 @@ impl NodeInner {
                 },
                 // handle incoming p2p connections.
                 Some(connecting) = self.endpoint.accept() => {
+                    trace!("tick: endpoint.accept");
                     let protocols = protocols.clone();
                     join_set.spawn(async move {
                         handle_connection(connecting, protocols).await;
@@ -302,12 +308,16 @@ impl NodeInner {
                 },
                 // handle task terminations and quit on panics.
                 res = join_set.join_next(), if !join_set.is_empty() => {
+                    trace!("tick: join_set.join_next");
                     if let Some(Err(err)) = res {
                         error!("Task failed: {err:?}");
                         break;
                     }
                 },
-                else => break,
+                else => {
+                    trace!("tick: else, break");
+                    break;
+                }
             }
         }
@@ -319,6 +329,7 @@ impl NodeInner {

     /// Shutdown the different parts of the node concurrently.
     async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
+        debug!("node shutdown services: start");
         let error_code = Closed::ProviderTerminating;

         // Shutdown future for the docs engine, if enabled.
@@ -349,6 +360,7 @@ impl NodeInner {
             // Shutdown protocol handlers.
             protocols.shutdown(),
         );
+        debug!("node shutdown services: done");
     }

     async fn run_gc_loop(
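The `trace!("tick: ...")` lines added to the node's event loop follow a standard pattern for debugging `tokio::select!` loops: one log line per iteration plus one per branch, so a stalled loop reveals which arm fired last and a busy-spin shows up as a flood of ticks. A self-contained sketch of the same pattern, with an illustrative mpsc channel standing in for the node's RPC and connection sources:

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::trace;

async fn event_loop(cancel: CancellationToken, mut events: mpsc::Receiver<u32>) {
    loop {
        trace!("wait for tick");
        tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                trace!("tick: cancel");
                break;
            }
            event = events.recv() => {
                trace!("tick: event");
                match event {
                    Some(n) => trace!(n, "handled"),
                    None => break, // all senders dropped
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();
    let (tx, rx) = mpsc::channel(8);
    let cancel = CancellationToken::new();
    let task = tokio::spawn(event_loop(cancel.clone(), rx));
    tx.send(1).await.unwrap();
    drop(tx); // closing the channel ends the loop via the `None` arm
    task.await.unwrap();
}
```

As in the diff, `biased;` makes the cancellation arm win over pending work on every tick, so shutdown cannot be starved by a busy channel.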
diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs
index 2c9b49fe38..36e4ff15bd 100644
--- a/iroh/src/node/rpc.rs
+++ b/iroh/src/node/rpc.rs
@@ -31,7 +31,7 @@ use quic_rpc::{
 };
 use tokio::task::JoinSet;
 use tokio_util::{either::Either, task::LocalPoolHandle};
-use tracing::{debug, info, warn};
+use tracing::{debug, error_span, info, warn, Instrument};

 use crate::client::{
     blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
@@ -909,10 +909,16 @@ impl Handler {
     ) -> impl Stream<Item = RpcResult<BlobReadAtResponse>> + Send + 'static {
         let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
         let db = self.inner.db.clone();
-        self.inner.rt.spawn_pinned(move || async move {
-            if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
-                tx.send_async(RpcResult::Err(err.into())).await.ok();
+        self.inner.rt.spawn_pinned(move || {
+            async move {
+                if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
+                    debug!("blob_read_at failed: {err:?}");
+                    tx.send_async(RpcResult::Err(err.into())).await.ok();
+                } else {
+                    debug!("blob_read_at complete");
+                }
             }
+            .instrument(error_span!("blob_read_at"))
         });

         async fn read_loop<D: Map>(
             req: BlobReadAtRequest,
             db: D,
@@ -921,7 +927,9 @@
             tx: flume::Sender<RpcResult<BlobReadAtResponse>>,
             max_chunk_size: usize,
         ) -> anyhow::Result<()> {
+            debug!("loop start");
             let entry = db.get(&req.hash).await?;
+            debug!(is_some = entry.is_some(), "loop got entry");
             let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?;
             let size = entry.size();
             tx.send_async(Ok(BlobReadAtResponse::Entry {
                 size,
                 is_complete: entry.is_complete(),
             }))
             .await?;
+            debug!("header sent");
             let mut reader = entry.data_reader().await?;
             let len = req.len.unwrap_or((size.value() - req.offset) as usize);
@@ -960,6 +969,7 @@ impl Handler {
                 read += chunk_len as u64;
             }
         }
+        debug!("loop complete");
         Ok(())
     }
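The `spawn_pinned` rewrite does more than add log lines: by building the future first and wrapping it with `.instrument(error_span!(...))`, every event the detached task emits carries the span name, which is what ties the server-side `blob_read_at` logs to one request in mixed output. A minimal sketch of the same technique, using plain `tokio::spawn` in place of iroh's `LocalPoolHandle::spawn_pinned`:

```rust
use tracing::{debug, error_span, Instrument};

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // Build the future, attach the span, then hand it to the executor. Events
    // inside the task inherit the span even though the task runs detached.
    let task = tokio::spawn(
        async {
            debug!("loop start");    // printed as: blob_read_at: loop start
            debug!("loop complete"); // printed as: blob_read_at: loop complete
        }
        .instrument(error_span!("blob_read_at")),
    );
    task.await.unwrap();
}
```

Using `error_span!` rather than `debug_span!` keeps the span enabled even under restrictive level filters, which is presumably why the diff chooses it; the events inside are still gated at their own levels.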