Skip to content

Commit

Permalink
feat(doctor): Report connection type changes in rolling fashion (#2251)
Browse files Browse the repository at this point in the history
## Description

The doctor already shows the current connection type, this changes to
show changes in a rolling fashion.

It also unambiguously logs each connection type change now.

This now prints something like:
`Connection with gzjlsppiplwpp7pn changed: direct (after 1.548µs)`

on each connection change.

## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

Despite using the conn_type_stream this stream events are still lost,
especially very early on in the connection lifetime.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
flub authored May 13, 2024
1 parent b62e904 commit 9a050a9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
43 changes: 33 additions & 10 deletions iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use iroh::{
},
dns::default_resolver,
key::{PublicKey, SecretKey},
magic_endpoint::{self, Connection, RecvStream, SendStream},
magic_endpoint::{self, Connection, ConnectionTypeStream, RecvStream, SendStream},
netcheck, portmapper,
relay::{RelayMap, RelayMode, RelayUrl},
ticket::NodeTicket,
util::AbortingJoinHandle,
util::CancelOnDrop,
MagicEndpoint, NodeAddr, NodeId,
},
util::{path::IrohPaths, progress::ProgressWriter},
Expand Down Expand Up @@ -347,7 +347,7 @@ struct Gui {
recv_pb: ProgressBar,
echo_pb: ProgressBar,
#[allow(dead_code)]
counter_task: Option<AbortingJoinHandle<()>>,
counter_task: Option<CancelOnDrop>,
}

impl Gui {
Expand All @@ -373,21 +373,24 @@ impl Gui {
.template("{spinner:.green} [{bar:80.cyan/blue}] {msg} {bytes}/{total_bytes} ({bytes_per_sec})").unwrap()
.progress_chars("█▉▊▋▌▍▎▏ "));
let counters2 = counters.clone();
let counter_task = AbortingJoinHandle::from(tokio::spawn(async move {
let counter_task = tokio::spawn(async move {
loop {
Self::update_counters(&counters2);
Self::update_connection_info(&conn_info, &endpoint, &node_id);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}));
});
Self {
mp,
pb,
counters,
send_pb,
recv_pb,
echo_pb,
counter_task: Some(counter_task),
counter_task: Some(CancelOnDrop::new(
"counter_task",
counter_task.abort_handle(),
)),
}
}

Expand Down Expand Up @@ -587,9 +590,7 @@ async fn recv_test(
}

/// Passive side that just accepts connections and answers requests (echo, drain or send)
async fn passive_side(endpoint: MagicEndpoint, connection: Connection) -> anyhow::Result<()> {
let remote_peer_id = magic_endpoint::get_remote_node_id(&connection)?;
let gui = Gui::new(endpoint, remote_peer_id);
async fn passive_side(gui: Gui, connection: Connection) -> anyhow::Result<()> {
loop {
match connection.accept_bi().await {
Ok((send, recv)) => {
Expand Down Expand Up @@ -667,7 +668,13 @@ async fn connect(
let conn = endpoint.connect(node_addr, &DR_RELAY_ALPN).await;
match conn {
Ok(connection) => {
if let Err(cause) = passive_side(endpoint.clone(), connection).await {
let maybe_stream = endpoint.conn_type_stream(&node_id);
let gui = Gui::new(endpoint, node_id);
if let Ok(stream) = maybe_stream {
log_connection_changes(gui.mp.clone(), node_id, stream);
}

if let Err(cause) = passive_side(gui, connection).await {
eprintln!("error handling connection: {cause}");
}
}
Expand Down Expand Up @@ -740,6 +747,9 @@ async fn accept(
println!("Accepted connection from {}", remote_peer_id);
let t0 = Instant::now();
let gui = Gui::new(endpoint.clone(), remote_peer_id);
if let Ok(stream) = endpoint.conn_type_stream(&remote_peer_id) {
log_connection_changes(gui.mp.clone(), remote_peer_id, stream);
}
let res = active_side(connection, &config, Some(&gui)).await;
gui.clear();
let dt = t0.elapsed().as_secs_f64();
Expand All @@ -764,6 +774,19 @@ async fn accept(
Ok(())
}

fn log_connection_changes(pb: MultiProgress, node_id: NodeId, mut stream: ConnectionTypeStream) {
tokio::spawn(async move {
let start = Instant::now();
while let Some(conn_type) = stream.next().await {
pb.println(format!(
"Connection with {node_id:#} changed: {conn_type} (after {:?})",
start.elapsed()
))
.ok();
}
});
}

async fn port_map(protocol: &str, local_port: NonZeroU16, timeout: Duration) -> anyhow::Result<()> {
// create the config that enables exclusively the required protocol
let mut enable_upnp = false;
Expand Down
24 changes: 9 additions & 15 deletions iroh-net/src/magicsock/node_map/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,21 +296,15 @@ impl NodeState {
(addr, self.relay_url())
}
};
match (best_addr, relay_url.clone()) {
(Some(best_addr), Some(relay_url)) => {
let _ = self
.conn_type
.update(ConnectionType::Mixed(best_addr, relay_url));
}
(Some(best_addr), None) => {
let _ = self.conn_type.update(ConnectionType::Direct(best_addr));
}
(None, Some(relay_url)) => {
let _ = self.conn_type.update(ConnectionType::Relay(relay_url));
}
(None, None) => {
let _ = self.conn_type.update(ConnectionType::None);
}
let typ = match (best_addr, relay_url.clone()) {
(Some(best_addr), Some(relay_url)) => ConnectionType::Mixed(best_addr, relay_url),
(Some(best_addr), None) => ConnectionType::Direct(best_addr),
(None, Some(relay_url)) => ConnectionType::Relay(relay_url),
(None, None) => ConnectionType::None,
};
if self.conn_type.update(typ).is_ok() {
let typ = self.conn_type.get();
info!(%typ, "new connection type");
}
(best_addr, relay_url)
}
Expand Down

0 comments on commit 9a050a9

Please sign in to comment.