Skip to content

Commit

Permalink
handle receiver closing in magicsock and relay_actor
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulOlteanu committed Nov 16, 2024
1 parent 7906c79 commit fdeec9c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
42 changes: 34 additions & 8 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1767,10 +1767,22 @@ impl Actor {
discovery_events = events;
}
}

let mut receiver_closed = false;
let mut portmap_watcher_closed = false;
let mut link_change_closed = false;
loop {
inc!(Metrics, actor_tick_main);
tokio::select! {
Some(msg) = self.msg_receiver.recv() => {
msg = self.msg_receiver.recv(), if !receiver_closed => {
let Some(msg) = msg else {
trace!("tick: magicsock receiver closed");
inc!(Metrics, actor_tick_other);

receiver_closed = true;
continue;
};

trace!(?msg, "tick: msg");
inc!(Metrics, actor_tick_msg);
if self.handle_actor_message(msg).await {
Expand All @@ -1782,7 +1794,15 @@ impl Actor {
inc!(Metrics, actor_tick_re_stun);
self.msock.re_stun("periodic");
}
Ok(()) = portmap_watcher.changed() => {
change = portmap_watcher.changed(), if !portmap_watcher_closed => {
if change.is_err() {
trace!("tick: portmap watcher closed");
inc!(Metrics, actor_tick_other);

portmap_watcher_closed = true;
continue;
}

trace!("tick: portmap changed");
inc!(Metrics, actor_tick_portmap_changed);
let new_external_address = *portmap_watcher.borrow();
Expand All @@ -1809,23 +1829,29 @@ impl Actor {
self.refresh_direct_addrs(reason).await;
}
}
Some(is_major) = link_change_r.recv() => {
is_major = link_change_r.recv(), if !link_change_closed => {
let Some(is_major) = is_major else {
trace!("tick: link change receiver closed");
inc!(Metrics, actor_tick_other);

link_change_closed = true;
continue;
};

trace!("tick: link change {}", is_major);
inc!(Metrics, actor_link_change);
self.handle_network_change(is_major).await;
}
// Even if `discovery_events` yields `None`, it could begin to yield
// `Some` again in the future, so we don't want to disable this branch
// forever like we do with the other branches that yield `Option`s
Some(discovery_item) = discovery_events.next() => {
trace!("tick: discovery event, address discovered: {discovery_item:?}");
let node_addr = NodeAddr {node_id: discovery_item.node_id, info: discovery_item.addr_info};
if let Err(e) = self.msock.add_node_addr(node_addr.clone(), Source::Discovery { name: discovery_item.provenance.into() }) {
warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}");
}
}
// This case will never hit. TODO: Figure out what the intention was here
else => {
trace!("tick: other");
inc!(Metrics, actor_tick_other);
}
}
}
}
Expand Down
35 changes: 23 additions & 12 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,26 +303,37 @@ impl RelayActor {
trace!("shutting down");
break;
}
Some(Ok((url, ping_success))) = self.ping_tasks.join_next() => {
if !ping_success {
with_cancel(
self.cancel_token.child_token(),
self.close_or_reconnect_relay(&url, "rebind-ping-fail")
).await;
// `ping_tasks` being empty is a normal situation - in fact it starts empty
// until a `MaybeCloseRelaysOnRebind` message is received.
Some(task_result) = self.ping_tasks.join_next() => {
match task_result {
Ok((url, ping_success)) => {
if !ping_success {
with_cancel(
self.cancel_token.child_token(),
self.close_or_reconnect_relay(&url, "rebind-ping-fail")
).await;
}
}

Err(err) => {
warn!("ping task error: {:?}", err);
}
}
}
Some(msg) = receiver.recv() => {

msg = receiver.recv() => {
let Some(msg) = msg else {
trace!("shutting down relay recv loop");
break;
};

with_cancel(self.cancel_token.child_token(), self.handle_msg(msg)).await;
}
_ = cleanup_timer.tick() => {
trace!("tick: cleanup");
with_cancel(self.cancel_token.child_token(), self.clean_stale_relay()).await;
}
// This case will never hit. TODO: Figure out what the intention was here
else => {
trace!("shutting down relay recv loop");
break;
}
}
}

Expand Down

0 comments on commit fdeec9c

Please sign in to comment.