Skip to content

Commit

Permalink
fix(iroh-net): avoid double connections to relays (#2148)
Browse files Browse the repository at this point in the history
The `self.connect` call in `Actor::recv_detail` was racing to create a
connection with an incoming actor message to do so.

This simplifies `recv_detail` and only attempts to do so if there is
already a connection, which is the correct logic in the first place.

Closes #2144

---------

Co-authored-by: Kasey <[email protected]>
Co-authored-by: Kasey <[email protected]>
  • Loading branch information
3 people authored Apr 4, 2024
1 parent c85bf3d commit aa1cf66
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 43 deletions.
15 changes: 8 additions & 7 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,11 @@ impl Inner {
/// Sets the relay node with the best latency.
///
/// If we are not connected to any relay nodes, set this to `None`.
fn set_my_relay(&self, my_relay: Option<RelayUrl>) {
*self.my_relay.write().expect("not poisoned") = my_relay;
fn set_my_relay(&self, my_relay: Option<RelayUrl>) -> Option<RelayUrl> {
let mut lock = self.my_relay.write().expect("not poisoned");
let old = lock.take();
*lock = my_relay;
old
}

fn is_closing(&self) -> bool {
Expand Down Expand Up @@ -2188,20 +2191,18 @@ impl Actor {
// No change.
return true;
}
self.inner.set_my_relay(relay_url.clone());
let old_relay = self.inner.set_my_relay(relay_url.clone());

if let Some(ref relay_url) = relay_url {
inc!(MagicsockMetrics, relay_home_change);

// On change, notify all currently connected relay servers and
// start connecting to our home relay if we are not already.
info!("home is now relay {}", relay_url);
info!("home is now relay {}, was {:?}", relay_url, old_relay);
self.inner.publish_my_addr();

self.send_relay_actor(RelayActorMessage::NotePreferred(relay_url.clone()));
self.send_relay_actor(RelayActorMessage::Connect {
self.send_relay_actor(RelayActorMessage::SetHome {
url: relay_url.clone(),
peer: None,
});
}

Expand Down
23 changes: 11 additions & 12 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ pub(super) enum RelayActorMessage {
contents: RelayContents,
peer: PublicKey,
},
Connect {
MaybeCloseRelaysOnRebind(Vec<IpAddr>),
SetHome {
url: RelayUrl,
peer: Option<PublicKey>,
},
NotePreferred(RelayUrl),
MaybeCloseRelaysOnRebind(Vec<IpAddr>),
}

/// Contains fields for an active relay connection.
Expand Down Expand Up @@ -107,6 +105,7 @@ impl ActiveRelay {
}

async fn run(mut self, mut inbox: mpsc::Receiver<ActiveRelayMessage>) -> anyhow::Result<()> {
debug!("initial dial {}", self.url);
self.relay_client
.connect()
.await
Expand Down Expand Up @@ -349,11 +348,9 @@ impl RelayActor {
} => {
self.send_relay(&url, contents, peer).await;
}
RelayActorMessage::Connect { url, peer } => {
self.connect_relay(&url, peer.as_ref()).await;
}
RelayActorMessage::NotePreferred(my_relay) => {
self.note_preferred(&my_relay).await;
RelayActorMessage::SetHome { url } => {
self.note_preferred(&url).await;
self.connect_relay(&url, None).await;
}
RelayActorMessage::MaybeCloseRelaysOnRebind(ifs) => {
self.maybe_close_relays_on_rebind(&ifs).await;
Expand Down Expand Up @@ -407,7 +404,8 @@ impl RelayActor {

/// Returns `true`if the message was sent successfully.
async fn send_to_active(&mut self, url: &RelayUrl, msg: ActiveRelayMessage) -> bool {
match self.active_relay.get(url) {
let res = self.active_relay.get(url);
match res {
Some((s, _)) => match s.send(msg).await {
Ok(_) => true,
Err(mpsc::error::SendError(_)) => {
Expand All @@ -425,6 +423,7 @@ impl RelayActor {
url: &RelayUrl,
peer: Option<&PublicKey>,
) -> relay::http::Client {
debug!("connect relay {} for peer {:?}", url, peer);
// See if we have a connection open to that relay node ID first. If so, might as
// well use it. (It's a little arbitrary whether we use this one vs. the reverse route
// below when we have both.)
Expand Down Expand Up @@ -472,14 +471,14 @@ impl RelayActor {
} else {
"home-keep-alive".to_string()
};
info!("adding connection to relay-{url} for {why}");
info!("adding connection to relay: {url} for {why}");

let my_relay = self.conn.my_relay();
let ipv6_reported = self.conn.ipv6_reported.clone();
let url = url.clone();
let url1 = url.clone();

// building a client does not dial
// building a client dials the relay
let (dc, dc_receiver) = relay::http::ClientBuilder::new(url1.clone())
.address_family_selector(move || {
let ipv6_reported = ipv6_reported.clone();
Expand Down
64 changes: 40 additions & 24 deletions iroh-net/src/relay/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,33 @@ impl Actor {
mut inbox: mpsc::Receiver<ActorMessage>,
msg_sender: mpsc::Sender<Result<(ReceivedMessage, usize), ClientError>>,
) {
// Add an initial connection attempt.
if let Err(err) = self.connect("initial connect").await {
msg_sender.send(Err(err)).await.ok();
}

loop {
tokio::select! {
res = self.recv_detail() => {
if let Ok((ReceivedMessage::Pong(ping), _)) = res {
match self.pings.unregister(ping, "pong") {
Some(chan) => {
if chan.send(()).is_err() {
warn!("pong received for ping {ping:?}, but the receiving channel was closed");
}
}
None => {
warn!("pong received for ping {ping:?}, but not registered");
}
}
continue;
}
msg_sender.send(res).await.ok();
}
Some(msg) = inbox.recv() => {
match msg {
ActorMessage::Connect(s) => {
let res = self.connect().await.map(|(client, _, count)| (client, count));
let res = self.connect("actor msg").await.map(|(client, _, count)| (client, count));
s.send(res).ok();
},
ActorMessage::NotePreferred(is_preferred) => {
Expand Down Expand Up @@ -493,7 +511,14 @@ impl Actor {

async fn connect(
&mut self,
why: &'static str,
) -> Result<(RelayClient, &'_ mut RelayClientReceiver, usize), ClientError> {
debug!(
"connect: {}, current client {}",
why,
self.relay_client.is_some()
);

if self.is_closed {
return Err(ClientError::Closed);
}
Expand All @@ -505,17 +530,18 @@ impl Actor {
.await
.map_err(|_| ClientError::ConnectTimeout)??;

self.relay_client = Some((relay_client, receiver));
self.relay_client = Some((relay_client.clone(), receiver));
self.next_conn();
} else {
trace!("already had connection");
}

let count = self.current_conn();
let (relay_client, receiver) = self
.relay_client
.as_mut()
.map(|(c, r)| (c.clone(), r))
.expect("just inserted");
trace!("already had connection");
.expect("just checked");

Ok((relay_client, receiver, count))
}
.instrument(info_span!("connect"))
Expand Down Expand Up @@ -650,7 +676,7 @@ impl Actor {
}

async fn ping(&mut self, s: oneshot::Sender<Result<Duration, ClientError>>) {
let connect_res = self.connect().await.map(|(c, _, _)| c);
let connect_res = self.connect("ping").await.map(|(c, _, _)| c);
let (ping, recv) = self.pings.register();
trace!("ping: {}", hex::encode(ping));

Expand All @@ -677,7 +703,7 @@ impl Actor {

async fn send(&mut self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> {
trace!(dst = %dst_key.fmt_short(), len = b.len(), "send");
let (client, _, _) = self.connect().await?;
let (client, _, _) = self.connect("send").await?;
if client.send(dst_key, b).await.is_err() {
self.close_for_reconnect().await;
return Err(ClientError::Send);
Expand All @@ -688,7 +714,7 @@ impl Actor {
async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> {
debug!("send_pong");
if self.can_ack_pings {
let (client, _, _) = self.connect().await?;
let (client, _, _) = self.connect("send_pong").await?;
if client.send_pong(data).await.is_err() {
self.close_for_reconnect().await;
return Err(ClientError::Send);
Expand Down Expand Up @@ -785,25 +811,12 @@ impl Actor {
}

async fn recv_detail(&mut self) -> Result<(ReceivedMessage, usize), ClientError> {
loop {
if let Some((_client, client_receiver)) = self.relay_client.as_mut() {
trace!("recv_detail tick");
let (_client, client_receiver, conn_gen) = self.connect().await?;
match client_receiver.recv().await {
Ok(msg) => {
if let ReceivedMessage::Pong(ping) = msg {
match self.pings.unregister(ping, "pong") {
Some(chan) => {
if chan.send(()).is_err() {
warn!("pong received for ping {ping:?}, but the receiving channel was closed");
}
}
None => {
warn!("pong received for ping {ping:?}, but not registered");
}
}
continue;
}
return Ok((msg, conn_gen));
let current_gen = self.current_conn();
return Ok((msg, current_gen));
}
Err(e) => {
self.close_for_reconnect().await;
Expand All @@ -815,6 +828,7 @@ impl Actor {
}
}
}
std::future::pending().await
}

/// Close the underlying relay connection. The next time the client takes some action that
Expand Down Expand Up @@ -918,6 +932,8 @@ mod tests {

#[tokio::test]
async fn test_recv_detail_connect_error() -> Result<()> {
let _guard = iroh_test::logging::setup();

let key = SecretKey::generate();
let bad_url: Url = "https://bad.url".parse().unwrap();
let dns_resolver = default_resolver();
Expand Down

0 comments on commit aa1cf66

Please sign in to comment.