From 9a050a954bcd3f3baedfa148b33e6df356a0c0f0 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 13 May 2024 13:55:57 +0200 Subject: [PATCH] feat(doctor): Report connection type changes in rolling fashion (#2251) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description The doctor already shows the current connection type, this changes to show changes in a rolling fashion. It also unambiguously logs each connection type change now. This now prints something like: `Connection with gzjlsppiplwpp7pn changed: direct (after 1.548µs)` on each connection change. ## Breaking Changes ## Notes & open questions Despite using the conn_type_stream this stream events are still lost, especially very early on in the connection lifetime. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- iroh-cli/src/commands/doctor.rs | 43 ++++++++++++++----- iroh-net/src/magicsock/node_map/node_state.rs | 24 ++++------- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 492806fcb3..0bb72d8664 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -32,11 +32,11 @@ use iroh::{ }, dns::default_resolver, key::{PublicKey, SecretKey}, - magic_endpoint::{self, Connection, RecvStream, SendStream}, + magic_endpoint::{self, Connection, ConnectionTypeStream, RecvStream, SendStream}, netcheck, portmapper, relay::{RelayMap, RelayMode, RelayUrl}, ticket::NodeTicket, - util::AbortingJoinHandle, + util::CancelOnDrop, MagicEndpoint, NodeAddr, NodeId, }, util::{path::IrohPaths, progress::ProgressWriter}, @@ -347,7 +347,7 @@ struct Gui { recv_pb: ProgressBar, echo_pb: ProgressBar, #[allow(dead_code)] - counter_task: Option>, + counter_task: Option, } impl Gui { @@ -373,13 +373,13 @@ impl Gui { .template("{spinner:.green} [{bar:80.cyan/blue}] {msg} {bytes}/{total_bytes} ({bytes_per_sec})").unwrap() .progress_chars("█▉▊▋▌▍▎▏ ")); let counters2 = counters.clone(); - let counter_task = AbortingJoinHandle::from(tokio::spawn(async move { + let counter_task = tokio::spawn(async move { loop { Self::update_counters(&counters2); Self::update_connection_info(&conn_info, &endpoint, &node_id); tokio::time::sleep(Duration::from_millis(100)).await; } - })); + }); Self { mp, pb, @@ -387,7 +387,10 @@ impl Gui { send_pb, recv_pb, echo_pb, - counter_task: Some(counter_task), + counter_task: Some(CancelOnDrop::new( + "counter_task", + counter_task.abort_handle(), + )), } } @@ -587,9 +590,7 @@ async fn recv_test( } /// Passive side that just accepts connections and answers requests (echo, drain or send) -async fn passive_side(endpoint: MagicEndpoint, connection: Connection) -> anyhow::Result<()> { - let remote_peer_id = magic_endpoint::get_remote_node_id(&connection)?; - let gui = Gui::new(endpoint, remote_peer_id); +async fn passive_side(gui: Gui, connection: Connection) -> anyhow::Result<()> { loop { match connection.accept_bi().await { Ok((send, recv)) => { @@ -667,7 +668,13 @@ async fn connect( let conn = endpoint.connect(node_addr, &DR_RELAY_ALPN).await; match conn { Ok(connection) => { - if let Err(cause) = passive_side(endpoint.clone(), connection).await { + let maybe_stream = endpoint.conn_type_stream(&node_id); + let gui = Gui::new(endpoint, node_id); + if let Ok(stream) = maybe_stream { + log_connection_changes(gui.mp.clone(), node_id, stream); + } + + if let Err(cause) = passive_side(gui, connection).await { eprintln!("error handling connection: {cause}"); } } @@ -740,6 +747,9 @@ async fn accept( println!("Accepted connection from {}", remote_peer_id); let t0 = Instant::now(); let gui = Gui::new(endpoint.clone(), remote_peer_id); + if let Ok(stream) = endpoint.conn_type_stream(&remote_peer_id) { + log_connection_changes(gui.mp.clone(), remote_peer_id, stream); + } let res = active_side(connection, &config, Some(&gui)).await; gui.clear(); let dt = t0.elapsed().as_secs_f64(); @@ -764,6 +774,19 @@ async fn accept( Ok(()) } +fn log_connection_changes(pb: MultiProgress, node_id: NodeId, mut stream: ConnectionTypeStream) { + tokio::spawn(async move { + let start = Instant::now(); + while let Some(conn_type) = stream.next().await { + pb.println(format!( + "Connection with {node_id:#} changed: {conn_type} (after {:?})", + start.elapsed() + )) + .ok(); + } + }); +} + async fn port_map(protocol: &str, local_port: NonZeroU16, timeout: Duration) -> anyhow::Result<()> { // create the config that enables exclusively the required protocol let mut enable_upnp = false; diff --git a/iroh-net/src/magicsock/node_map/node_state.rs b/iroh-net/src/magicsock/node_map/node_state.rs index dfdf1ebe07..336d325bfa 100644 --- a/iroh-net/src/magicsock/node_map/node_state.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -296,21 +296,15 @@ impl NodeState { (addr, self.relay_url()) } }; - match (best_addr, relay_url.clone()) { - (Some(best_addr), Some(relay_url)) => { - let _ = self - .conn_type - .update(ConnectionType::Mixed(best_addr, relay_url)); - } - (Some(best_addr), None) => { - let _ = self.conn_type.update(ConnectionType::Direct(best_addr)); - } - (None, Some(relay_url)) => { - let _ = self.conn_type.update(ConnectionType::Relay(relay_url)); - } - (None, None) => { - let _ = self.conn_type.update(ConnectionType::None); - } + let typ = match (best_addr, relay_url.clone()) { + (Some(best_addr), Some(relay_url)) => ConnectionType::Mixed(best_addr, relay_url), + (Some(best_addr), None) => ConnectionType::Direct(best_addr), + (None, Some(relay_url)) => ConnectionType::Relay(relay_url), + (None, None) => ConnectionType::None, + }; + if self.conn_type.update(typ).is_ok() { + let typ = self.conn_type.get(); + info!(%typ, "new connection type"); } (best_addr, relay_url) }