Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh-net)!: Do not use &NodeId in APIs as this is Copy #2363

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ async fn connect(
let conn = endpoint.connect(node_addr, &DR_RELAY_ALPN).await;
match conn {
Ok(connection) => {
let maybe_stream = endpoint.conn_type_stream(&node_id);
let maybe_stream = endpoint.conn_type_stream(node_id);
let gui = Gui::new(endpoint, node_id);
if let Ok(stream) = maybe_stream {
log_connection_changes(gui.mp.clone(), node_id, stream);
Expand Down Expand Up @@ -770,7 +770,7 @@ async fn accept(
println!("Accepted connection from {}", remote_peer_id);
let t0 = Instant::now();
let gui = Gui::new(endpoint.clone(), remote_peer_id);
if let Ok(stream) = endpoint.conn_type_stream(&remote_peer_id) {
if let Ok(stream) = endpoint.conn_type_stream(remote_peer_id) {
log_connection_changes(gui.mp.clone(), remote_peer_id, stream);
}
let res = active_side(connection, &config, Some(&gui)).await;
Expand Down
12 changes: 6 additions & 6 deletions iroh-net/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ impl Endpoint {

let rtt_msg = RttMessage::NewConnection {
connection: connection.weak_handle(),
conn_type_changes: self.conn_type_stream(node_id)?,
conn_type_changes: self.conn_type_stream(*node_id)?,
node_id: *node_id,
};
if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await {
Expand Down Expand Up @@ -700,7 +700,7 @@ impl Endpoint {
/// # Errors
///
/// Will error if we do not have any address information for the given `node_id`.
pub fn conn_type_stream(&self, node_id: &NodeId) -> Result<ConnectionTypeStream> {
pub fn conn_type_stream(&self, node_id: NodeId) -> Result<ConnectionTypeStream> {
self.msock.conn_type_stream(node_id)
}

Expand Down Expand Up @@ -794,7 +794,7 @@ impl Endpoint {
// Only return a mapped addr if we have some way of dialing this node, in other
// words, we have either a relay URL or at least one direct address.
let addr = if self.msock.has_send_address(node_id) {
self.msock.get_mapping_addr(&node_id)
self.msock.get_mapping_addr(node_id)
} else {
None
};
Expand Down Expand Up @@ -822,7 +822,7 @@ impl Endpoint {
let mut discovery = DiscoveryTask::start(self.clone(), node_id)?;
discovery.first_arrived().await?;
if self.msock.has_send_address(node_id) {
let addr = self.msock.get_mapping_addr(&node_id).expect("checked");
let addr = self.msock.get_mapping_addr(node_id).expect("checked");
Ok((addr, Some(discovery)))
} else {
bail!("Failed to retrieve the mapped address from the magic socket. Unable to dial node {node_id:?}");
Expand Down Expand Up @@ -967,7 +967,7 @@ fn try_send_rtt_msg(conn: &quinn::Connection, magic_ep: &Endpoint) {
warn!(?conn, "failed to get remote node id");
return;
};
let Ok(conn_type_changes) = magic_ep.conn_type_stream(&peer_id) else {
let Ok(conn_type_changes) = magic_ep.conn_type_stream(peer_id) else {
warn!(?conn, "failed to create conn_type_stream");
return;
};
Expand Down Expand Up @@ -1411,7 +1411,7 @@ mod tests {
async fn handle_direct_conn(ep: Endpoint, node_id: PublicKey) -> Result<()> {
let node_addr = NodeAddr::new(node_id);
ep.add_node_addr(node_addr)?;
let stream = ep.conn_type_stream(&node_id)?;
let stream = ep.conn_type_stream(node_id)?;
async fn get_direct_event(
src: &PublicKey,
dst: &PublicKey,
Expand Down
13 changes: 7 additions & 6 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::{
use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{FutureExt, Stream, StreamExt};
use iroh_base::key::NodeId;
use iroh_metrics::{inc, inc_by};
use quinn::AsyncUdpSocket;
use rand::{seq::SliceRandom, Rng, SeedableRng};
Expand Down Expand Up @@ -299,8 +300,8 @@ impl MagicSock {
}

/// Retrieve connection information about a node in the network.
pub fn connection_info(&self, node_key: PublicKey) -> Option<ConnectionInfo> {
self.node_map.node_info(&node_key)
pub fn connection_info(&self, node_id: NodeId) -> Option<ConnectionInfo> {
self.node_map.node_info(node_id)
}

/// Returns the local endpoints as a stream.
Expand Down Expand Up @@ -350,17 +351,17 @@ impl MagicSock {
///
/// Will return an error if there is no address information known about the
/// given `node_id`.
pub fn conn_type_stream(&self, node_id: &PublicKey) -> Result<ConnectionTypeStream> {
pub fn conn_type_stream(&self, node_id: NodeId) -> Result<ConnectionTypeStream> {
self.node_map.conn_type_stream(node_id)
}

/// Returns the [`SocketAddr`] which can be used by the QUIC layer to dial this node.
///
/// Note this is a user-facing API and does not wrap the [`SocketAddr`] in a
/// [`QuicMappedAddr`] as we do internally.
pub fn get_mapping_addr(&self, node_key: &PublicKey) -> Option<SocketAddr> {
pub fn get_mapping_addr(&self, node_id: NodeId) -> Option<SocketAddr> {
self.node_map
.get_quic_mapped_addr_for_node_key(node_key)
.get_quic_mapped_addr_for_node_key(node_id)
.map(|a| a.0)
}

Expand Down Expand Up @@ -468,7 +469,7 @@ impl MagicSock {
let mut transmits_sent = 0;
match self
.node_map
.get_send_addrs(&dest, self.ipv6_reported.load(Ordering::Relaxed))
.get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed))
{
Some((public_key, udp_addr, relay_url, mut msgs)) => {
let mut pings_sent = false;
Expand Down
78 changes: 35 additions & 43 deletions iroh-net/src/magicsock/node_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ pub(super) struct NodeMapInner {
/// You can look up entries in [`NodeMap`] with various keys, depending on the context you
/// have for the node. These are all the keys the [`NodeMap`] can use.
#[derive(Clone)]
enum NodeStateKey<'a> {
Idx(&'a usize),
NodeId(&'a NodeId),
QuicMappedAddr(&'a QuicMappedAddr),
IpPort(&'a IpPort),
enum NodeStateKey {
Idx(usize),
NodeId(NodeId),
QuicMappedAddr(QuicMappedAddr),
IpPort(IpPort),
}

impl NodeMap {
Expand Down Expand Up @@ -112,8 +112,8 @@ impl NodeMap {
self.inner.lock().receive_udp(udp_addr)
}

pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: PublicKey) -> QuicMappedAddr {
self.inner.lock().receive_relay(relay_url, &src)
pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr {
self.inner.lock().receive_relay(relay_url, src)
}

pub(super) fn notify_ping_sent(
Expand All @@ -124,20 +124,20 @@ impl NodeMap {
purpose: DiscoPingPurpose,
msg_sender: tokio::sync::mpsc::Sender<ActorMessage>,
) {
if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) {
if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(id)) {
ep.ping_sent(dst, tx_id, purpose, msg_sender);
}
}

pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun::TransactionId) {
if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) {
if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(id)) {
ep.ping_timeout(tx_id);
}
}

pub(super) fn get_quic_mapped_addr_for_node_key(
&self,
node_key: &PublicKey,
node_key: NodeId,
) -> Option<QuicMappedAddr> {
self.inner
.lock()
Expand Down Expand Up @@ -172,7 +172,7 @@ impl NodeMap {
#[allow(clippy::type_complexity)]
pub(super) fn get_send_addrs(
&self,
addr: &QuicMappedAddr,
addr: QuicMappedAddr,
have_ipv6: bool,
) -> Option<(
PublicKey,
Expand Down Expand Up @@ -223,16 +223,13 @@ impl NodeMap {
///
/// Will return an error if there is not an entry in the [`NodeMap`] for
/// the `public_key`
pub(super) fn conn_type_stream(
&self,
public_key: &PublicKey,
) -> anyhow::Result<ConnectionTypeStream> {
self.inner.lock().conn_type_stream(public_key)
pub(super) fn conn_type_stream(&self, node_id: NodeId) -> anyhow::Result<ConnectionTypeStream> {
self.inner.lock().conn_type_stream(node_id)
}

/// Get the [`NodeInfo`]s for each endpoint
pub(super) fn node_info(&self, public_key: &PublicKey) -> Option<NodeInfo> {
self.inner.lock().node_info(public_key)
pub(super) fn node_info(&self, node_id: NodeId) -> Option<NodeInfo> {
self.inner.lock().node_info(node_id)
}

/// Saves the known node info to the given path, returning the number of nodes persisted.
Expand Down Expand Up @@ -323,7 +320,7 @@ impl NodeMapInner {
fn add_node_addr(&mut self, node_addr: NodeAddr) {
let NodeAddr { node_id, info } = node_addr;

let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&node_id), || Options {
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(node_id), || Options {
node_id,
relay_url: info.relay_url.clone(),
active: false,
Expand All @@ -338,10 +335,10 @@ impl NodeMapInner {

fn get_id(&self, id: NodeStateKey) -> Option<usize> {
match id {
NodeStateKey::Idx(id) => Some(*id),
NodeStateKey::NodeId(node_key) => self.by_node_key.get(node_key).copied(),
NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(addr).copied(),
NodeStateKey::IpPort(ipp) => self.by_ip_port.get(ipp).copied(),
NodeStateKey::Idx(id) => Some(id),
NodeStateKey::NodeId(node_key) => self.by_node_key.get(&node_key).copied(),
NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(),
NodeStateKey::IpPort(ipp) => self.by_ip_port.get(&ipp).copied(),
}
}

Expand Down Expand Up @@ -373,7 +370,7 @@ impl NodeMapInner {
/// Marks the node we believe to be at `ipp` as recently used.
fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> {
let ip_port: IpPort = udp_addr.into();
let Some(node_state) = self.get_mut(NodeStateKey::IpPort(&ip_port)) else {
let Some(node_state) = self.get_mut(NodeStateKey::IpPort(ip_port)) else {
info!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore");
return None;
};
Expand All @@ -382,11 +379,11 @@ impl NodeMapInner {
}

#[instrument(skip_all, fields(src = %src.fmt_short()))]
fn receive_relay(&mut self, relay_url: &RelayUrl, src: &PublicKey) -> QuicMappedAddr {
fn receive_relay(&mut self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr {
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || {
trace!("packets from unknown node, insert into node map");
Options {
node_id: *src,
node_id: src,
relay_url: Some(relay_url.clone()),
active: true,
}
Expand All @@ -409,8 +406,8 @@ impl NodeMapInner {
}

/// Get the [`NodeInfo`]s for each endpoint
fn node_info(&self, public_key: &PublicKey) -> Option<NodeInfo> {
self.get(NodeStateKey::NodeId(public_key))
fn node_info(&self, node_id: NodeId) -> Option<NodeInfo> {
self.get(NodeStateKey::NodeId(node_id))
.map(|ep| ep.info(Instant::now()))
}

Expand All @@ -423,18 +420,18 @@ impl NodeMapInner {
///
/// Will return an error if there is not an entry in the [`NodeMap`] for
/// the `public_key`
fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result<ConnectionTypeStream> {
match self.get(NodeStateKey::NodeId(public_key)) {
fn conn_type_stream(&self, node_id: NodeId) -> anyhow::Result<ConnectionTypeStream> {
match self.get(NodeStateKey::NodeId(node_id)) {
Some(ep) => Ok(ConnectionTypeStream {
initial: Some(ep.conn_type()),
inner: ep.conn_type_stream(),
}),
None => anyhow::bail!("No endpoint for {public_key:?} found"),
None => anyhow::bail!("No endpoint for {node_id:?} found"),
}
}

fn handle_pong(&mut self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) {
if let Some(ns) = self.get_mut(NodeStateKey::NodeId(&sender)).as_mut() {
fn handle_pong(&mut self, sender: NodeId, src: &DiscoMessageSource, pong: Pong) {
if let Some(ns) = self.get_mut(NodeStateKey::NodeId(sender)).as_mut() {
let insert = ns.handle_pong(&pong, src.into());
if let Some((src, key)) = insert {
self.set_node_key_for_ip_port(src, &key);
Expand All @@ -446,8 +443,8 @@ impl NodeMapInner {
}

#[must_use = "actions must be handled"]
fn handle_call_me_maybe(&mut self, sender: PublicKey, cm: CallMeMaybe) -> Vec<PingAction> {
let ns_id = NodeStateKey::NodeId(&sender);
fn handle_call_me_maybe(&mut self, sender: NodeId, cm: CallMeMaybe) -> Vec<PingAction> {
let ns_id = NodeStateKey::NodeId(sender);
if let Some(id) = self.get_id(ns_id.clone()) {
for number in &cm.my_numbers {
// ensure the new addrs are known
Expand All @@ -468,13 +465,8 @@ impl NodeMapInner {
}
}

fn handle_ping(
&mut self,
sender: PublicKey,
src: SendAddr,
tx_id: TransactionId,
) -> PingHandled {
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&sender), || {
fn handle_ping(&mut self, sender: NodeId, src: SendAddr, tx_id: TransactionId) -> PingHandled {
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(sender), || {
debug!("received ping: node unknown, add to node map");
Options {
node_id: sender,
Expand Down Expand Up @@ -815,7 +807,7 @@ mod tests {
node_map
.inner
.lock()
.get(NodeStateKey::NodeId(&active_node))
.get(NodeStateKey::NodeId(active_node))
.expect("should not be pruned");
}
}
2 changes: 1 addition & 1 deletion iroh-net/src/magicsock/node_map/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ impl NodeState {
.reconfirm_if_used(addr.into(), Source::Udp, now);
}

pub(super) fn receive_relay(&mut self, url: &RelayUrl, _src: &PublicKey, now: Instant) {
pub(super) fn receive_relay(&mut self, url: &RelayUrl, _src: NodeId, now: Instant) {
match self.relay_url.as_mut() {
Some((current_home, state)) if current_home == url => {
// We received on the expected url. update state.
Expand Down
Loading