From aa1cf66decf69f1f4b6d9e464c45d4229a065c65 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Thu, 4 Apr 2024 20:25:19 +0200 Subject: [PATCH] fix(iroh-net): avoid double connections to relays (#2148) The `self.connect` call in `Actor::recv_detail` was racing to create a connection with an incoming actor message to do so. This simplifies `recv_detail` and only attempts to do so if there is already a connection, which is the correct logic in the first place. Closes #2144 --------- Co-authored-by: Kasey Co-authored-by: Kasey --- iroh-net/src/magicsock.rs | 15 ++++--- iroh-net/src/magicsock/relay_actor.rs | 23 +++++----- iroh-net/src/relay/http/client.rs | 64 +++++++++++++++++---------- 3 files changed, 59 insertions(+), 43 deletions(-) diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 05a250eac7..7025348d67 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -233,8 +233,11 @@ impl Inner { /// Sets the relay node with the best latency. /// /// If we are not connected to any relay nodes, set this to `None`. - fn set_my_relay(&self, my_relay: Option) { - *self.my_relay.write().expect("not poisoned") = my_relay; + fn set_my_relay(&self, my_relay: Option) -> Option { + let mut lock = self.my_relay.write().expect("not poisoned"); + let old = lock.take(); + *lock = my_relay; + old } fn is_closing(&self) -> bool { @@ -2188,20 +2191,18 @@ impl Actor { // No change. return true; } - self.inner.set_my_relay(relay_url.clone()); + let old_relay = self.inner.set_my_relay(relay_url.clone()); if let Some(ref relay_url) = relay_url { inc!(MagicsockMetrics, relay_home_change); // On change, notify all currently connected relay servers and // start connecting to our home relay if we are not already. - info!("home is now relay {}", relay_url); + info!("home is now relay {}, was {:?}", relay_url, old_relay); self.inner.publish_my_addr(); - self.send_relay_actor(RelayActorMessage::NotePreferred(relay_url.clone())); - self.send_relay_actor(RelayActorMessage::Connect { + self.send_relay_actor(RelayActorMessage::SetHome { url: relay_url.clone(), - peer: None, }); } diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index eb25b66644..f40daf2de8 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -38,12 +38,10 @@ pub(super) enum RelayActorMessage { contents: RelayContents, peer: PublicKey, }, - Connect { + MaybeCloseRelaysOnRebind(Vec), + SetHome { url: RelayUrl, - peer: Option, }, - NotePreferred(RelayUrl), - MaybeCloseRelaysOnRebind(Vec), } /// Contains fields for an active relay connection. @@ -107,6 +105,7 @@ impl ActiveRelay { } async fn run(mut self, mut inbox: mpsc::Receiver) -> anyhow::Result<()> { + debug!("initial dial {}", self.url); self.relay_client .connect() .await @@ -349,11 +348,9 @@ impl RelayActor { } => { self.send_relay(&url, contents, peer).await; } - RelayActorMessage::Connect { url, peer } => { - self.connect_relay(&url, peer.as_ref()).await; - } - RelayActorMessage::NotePreferred(my_relay) => { - self.note_preferred(&my_relay).await; + RelayActorMessage::SetHome { url } => { + self.note_preferred(&url).await; + self.connect_relay(&url, None).await; } RelayActorMessage::MaybeCloseRelaysOnRebind(ifs) => { self.maybe_close_relays_on_rebind(&ifs).await; @@ -407,7 +404,8 @@ impl RelayActor { /// Returns `true`if the message was sent successfully. async fn send_to_active(&mut self, url: &RelayUrl, msg: ActiveRelayMessage) -> bool { - match self.active_relay.get(url) { + let res = self.active_relay.get(url); + match res { Some((s, _)) => match s.send(msg).await { Ok(_) => true, Err(mpsc::error::SendError(_)) => { @@ -425,6 +423,7 @@ impl RelayActor { url: &RelayUrl, peer: Option<&PublicKey>, ) -> relay::http::Client { + debug!("connect relay {} for peer {:?}", url, peer); // See if we have a connection open to that relay node ID first. If so, might as // well use it. (It's a little arbitrary whether we use this one vs. the reverse route // below when we have both.) @@ -472,14 +471,14 @@ impl RelayActor { } else { "home-keep-alive".to_string() }; - info!("adding connection to relay-{url} for {why}"); + info!("adding connection to relay: {url} for {why}"); let my_relay = self.conn.my_relay(); let ipv6_reported = self.conn.ipv6_reported.clone(); let url = url.clone(); let url1 = url.clone(); - // building a client does not dial + // building a client dials the relay let (dc, dc_receiver) = relay::http::ClientBuilder::new(url1.clone()) .address_family_selector(move || { let ipv6_reported = ipv6_reported.clone(); diff --git a/iroh-net/src/relay/http/client.rs b/iroh-net/src/relay/http/client.rs index e7c3346b72..7c92b69850 100644 --- a/iroh-net/src/relay/http/client.rs +++ b/iroh-net/src/relay/http/client.rs @@ -437,15 +437,33 @@ impl Actor { mut inbox: mpsc::Receiver, msg_sender: mpsc::Sender>, ) { + // Add an initial connection attempt. + if let Err(err) = self.connect("initial connect").await { + msg_sender.send(Err(err)).await.ok(); + } + loop { tokio::select! { res = self.recv_detail() => { + if let Ok((ReceivedMessage::Pong(ping), _)) = res { + match self.pings.unregister(ping, "pong") { + Some(chan) => { + if chan.send(()).is_err() { + warn!("pong received for ping {ping:?}, but the receiving channel was closed"); + } + } + None => { + warn!("pong received for ping {ping:?}, but not registered"); + } + } + continue; + } msg_sender.send(res).await.ok(); } Some(msg) = inbox.recv() => { match msg { ActorMessage::Connect(s) => { - let res = self.connect().await.map(|(client, _, count)| (client, count)); + let res = self.connect("actor msg").await.map(|(client, _, count)| (client, count)); s.send(res).ok(); }, ActorMessage::NotePreferred(is_preferred) => { @@ -493,7 +511,14 @@ impl Actor { async fn connect( &mut self, + why: &'static str, ) -> Result<(RelayClient, &'_ mut RelayClientReceiver, usize), ClientError> { + debug!( + "connect: {}, current client {}", + why, + self.relay_client.is_some() + ); + if self.is_closed { return Err(ClientError::Closed); } @@ -505,17 +530,18 @@ impl Actor { .await .map_err(|_| ClientError::ConnectTimeout)??; - self.relay_client = Some((relay_client, receiver)); + self.relay_client = Some((relay_client.clone(), receiver)); self.next_conn(); + } else { + trace!("already had connection"); } - let count = self.current_conn(); let (relay_client, receiver) = self .relay_client .as_mut() .map(|(c, r)| (c.clone(), r)) - .expect("just inserted"); - trace!("already had connection"); + .expect("just checked"); + Ok((relay_client, receiver, count)) } .instrument(info_span!("connect")) @@ -650,7 +676,7 @@ impl Actor { } async fn ping(&mut self, s: oneshot::Sender>) { - let connect_res = self.connect().await.map(|(c, _, _)| c); + let connect_res = self.connect("ping").await.map(|(c, _, _)| c); let (ping, recv) = self.pings.register(); trace!("ping: {}", hex::encode(ping)); @@ -677,7 +703,7 @@ impl Actor { async fn send(&mut self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> { trace!(dst = %dst_key.fmt_short(), len = b.len(), "send"); - let (client, _, _) = self.connect().await?; + let (client, _, _) = self.connect("send").await?; if client.send(dst_key, b).await.is_err() { self.close_for_reconnect().await; return Err(ClientError::Send); @@ -688,7 +714,7 @@ impl Actor { async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> { debug!("send_pong"); if self.can_ack_pings { - let (client, _, _) = self.connect().await?; + let (client, _, _) = self.connect("send_pong").await?; if client.send_pong(data).await.is_err() { self.close_for_reconnect().await; return Err(ClientError::Send); @@ -785,25 +811,12 @@ impl Actor { } async fn recv_detail(&mut self) -> Result<(ReceivedMessage, usize), ClientError> { - loop { + if let Some((_client, client_receiver)) = self.relay_client.as_mut() { trace!("recv_detail tick"); - let (_client, client_receiver, conn_gen) = self.connect().await?; match client_receiver.recv().await { Ok(msg) => { - if let ReceivedMessage::Pong(ping) = msg { - match self.pings.unregister(ping, "pong") { - Some(chan) => { - if chan.send(()).is_err() { - warn!("pong received for ping {ping:?}, but the receiving channel was closed"); - } - } - None => { - warn!("pong received for ping {ping:?}, but not registered"); - } - } - continue; - } - return Ok((msg, conn_gen)); + let current_gen = self.current_conn(); + return Ok((msg, current_gen)); } Err(e) => { self.close_for_reconnect().await; @@ -815,6 +828,7 @@ impl Actor { } } } + std::future::pending().await } /// Close the underlying relay connection. The next time the client takes some action that @@ -918,6 +932,8 @@ mod tests { #[tokio::test] async fn test_recv_detail_connect_error() -> Result<()> { + let _guard = iroh_test::logging::setup(); + let key = SecretKey::generate(); let bad_url: Url = "https://bad.url".parse().unwrap(); let dns_resolver = default_resolver();