Skip to content

Commit

Permalink
refactor(iroh-net)!: Do not use &NodeId in APIs as this is Copy (n0-c…
Browse files Browse the repository at this point in the history
…omputer#2363)

## Description

Some of our APIs take NodeId by reference, some by value.  NodeId
itself however is Copy and takes 32 bytes.  I think it is more
consistent and rusty to pass this by value and use the Copy semantics.

Additionally this renames a few more types from PublicKey to NodeId to
keep in line with our convention of using NodeId when used as
identifier rather than cryptography.  I believe rust-analyser might be
inserting PublicKey by itself which is unfortunate.

QuicMappedAddr and IpPort are also a Copy types and get the same
treatment.

## Breaking Changes

- Endpoint::conn_type_stream takes NodeId by value instead of by
reference.

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- ~~[ ] Documentation updates if relevant.~~
- ~~[ ] Tests if relevant.~~
- [x] All breaking changes documented.
  • Loading branch information
flub authored and ppodolsky committed Jun 22, 2024
1 parent 3d1ed38 commit f8d91fa
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 58 deletions.
4 changes: 2 additions & 2 deletions iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ async fn connect(
let conn = endpoint.connect(node_addr, &DR_RELAY_ALPN).await;
match conn {
Ok(connection) => {
let maybe_stream = endpoint.conn_type_stream(&node_id);
let maybe_stream = endpoint.conn_type_stream(node_id);
let gui = Gui::new(endpoint, node_id);
if let Ok(stream) = maybe_stream {
log_connection_changes(gui.mp.clone(), node_id, stream);
Expand Down Expand Up @@ -770,7 +770,7 @@ async fn accept(
println!("Accepted connection from {}", remote_peer_id);
let t0 = Instant::now();
let gui = Gui::new(endpoint.clone(), remote_peer_id);
if let Ok(stream) = endpoint.conn_type_stream(&remote_peer_id) {
if let Ok(stream) = endpoint.conn_type_stream(remote_peer_id) {
log_connection_changes(gui.mp.clone(), remote_peer_id, stream);
}
let res = active_side(connection, &config, Some(&gui)).await;
Expand Down
12 changes: 6 additions & 6 deletions iroh-net/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ impl Endpoint {

let rtt_msg = RttMessage::NewConnection {
connection: connection.weak_handle(),
conn_type_changes: self.conn_type_stream(node_id)?,
conn_type_changes: self.conn_type_stream(*node_id)?,
node_id: *node_id,
};
if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await {
Expand Down Expand Up @@ -700,7 +700,7 @@ impl Endpoint {
/// # Errors
///
/// Will error if we do not have any address information for the given `node_id`.
pub fn conn_type_stream(&self, node_id: &NodeId) -> Result<ConnectionTypeStream> {
pub fn conn_type_stream(&self, node_id: NodeId) -> Result<ConnectionTypeStream> {
self.msock.conn_type_stream(node_id)
}

Expand Down Expand Up @@ -794,7 +794,7 @@ impl Endpoint {
// Only return a mapped addr if we have some way of dialing this node, in other
// words, we have either a relay URL or at least one direct address.
let addr = if self.msock.has_send_address(node_id) {
self.msock.get_mapping_addr(&node_id)
self.msock.get_mapping_addr(node_id)
} else {
None
};
Expand Down Expand Up @@ -822,7 +822,7 @@ impl Endpoint {
let mut discovery = DiscoveryTask::start(self.clone(), node_id)?;
discovery.first_arrived().await?;
if self.msock.has_send_address(node_id) {
let addr = self.msock.get_mapping_addr(&node_id).expect("checked");
let addr = self.msock.get_mapping_addr(node_id).expect("checked");
Ok((addr, Some(discovery)))
} else {
bail!("Failed to retrieve the mapped address from the magic socket. Unable to dial node {node_id:?}");
Expand Down Expand Up @@ -967,7 +967,7 @@ fn try_send_rtt_msg(conn: &quinn::Connection, magic_ep: &Endpoint) {
warn!(?conn, "failed to get remote node id");
return;
};
let Ok(conn_type_changes) = magic_ep.conn_type_stream(&peer_id) else {
let Ok(conn_type_changes) = magic_ep.conn_type_stream(peer_id) else {
warn!(?conn, "failed to create conn_type_stream");
return;
};
Expand Down Expand Up @@ -1411,7 +1411,7 @@ mod tests {
async fn handle_direct_conn(ep: Endpoint, node_id: PublicKey) -> Result<()> {
let node_addr = NodeAddr::new(node_id);
ep.add_node_addr(node_addr)?;
let stream = ep.conn_type_stream(&node_id)?;
let stream = ep.conn_type_stream(node_id)?;
async fn get_direct_event(
src: &PublicKey,
dst: &PublicKey,
Expand Down
13 changes: 7 additions & 6 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::{
use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{FutureExt, Stream, StreamExt};
use iroh_base::key::NodeId;
use iroh_metrics::{inc, inc_by};
use quinn::AsyncUdpSocket;
use rand::{seq::SliceRandom, Rng, SeedableRng};
Expand Down Expand Up @@ -299,8 +300,8 @@ impl MagicSock {
}

/// Retrieve connection information about a node in the network.
pub fn connection_info(&self, node_key: PublicKey) -> Option<ConnectionInfo> {
self.node_map.node_info(&node_key)
pub fn connection_info(&self, node_id: NodeId) -> Option<ConnectionInfo> {
self.node_map.node_info(node_id)
}

/// Returns the local endpoints as a stream.
Expand Down Expand Up @@ -350,17 +351,17 @@ impl MagicSock {
///
/// Will return an error if there is no address information known about the
/// given `node_id`.
pub fn conn_type_stream(&self, node_id: &PublicKey) -> Result<ConnectionTypeStream> {
pub fn conn_type_stream(&self, node_id: NodeId) -> Result<ConnectionTypeStream> {
self.node_map.conn_type_stream(node_id)
}

/// Returns the [`SocketAddr`] which can be used by the QUIC layer to dial this node.
///
/// Note this is a user-facing API and does not wrap the [`SocketAddr`] in a
/// [`QuicMappedAddr`] as we do internally.
pub fn get_mapping_addr(&self, node_key: &PublicKey) -> Option<SocketAddr> {
pub fn get_mapping_addr(&self, node_id: NodeId) -> Option<SocketAddr> {
self.node_map
.get_quic_mapped_addr_for_node_key(node_key)
.get_quic_mapped_addr_for_node_key(node_id)
.map(|a| a.0)
}

Expand Down Expand Up @@ -468,7 +469,7 @@ impl MagicSock {
let mut transmits_sent = 0;
match self
.node_map
.get_send_addrs(&dest, self.ipv6_reported.load(Ordering::Relaxed))
.get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed))
{
Some((public_key, udp_addr, relay_url, mut msgs)) => {
let mut pings_sent = false;
Expand Down
78 changes: 35 additions & 43 deletions iroh-net/src/magicsock/node_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ pub(super) struct NodeMapInner {
/// You can look up entries in [`NodeMap`] with various keys, depending on the context you
/// have for the node. These are all the keys the [`NodeMap`] can use.
#[derive(Clone)]
enum NodeStateKey<'a> {
Idx(&'a usize),
NodeId(&'a NodeId),
QuicMappedAddr(&'a QuicMappedAddr),
IpPort(&'a IpPort),
enum NodeStateKey {
Idx(usize),
NodeId(NodeId),
QuicMappedAddr(QuicMappedAddr),
IpPort(IpPort),
}

impl NodeMap {
Expand Down Expand Up @@ -112,8 +112,8 @@ impl NodeMap {
self.inner.lock().receive_udp(udp_addr)
}

pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: PublicKey) -> QuicMappedAddr {
self.inner.lock().receive_relay(relay_url, &src)
pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr {
self.inner.lock().receive_relay(relay_url, src)
}

pub(super) fn notify_ping_sent(
Expand All @@ -124,20 +124,20 @@ impl NodeMap {
purpose: DiscoPingPurpose,
msg_sender: tokio::sync::mpsc::Sender<ActorMessage>,
) {
if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) {
if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(id)) {
ep.ping_sent(dst, tx_id, purpose, msg_sender);
}
}

pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun::TransactionId) {
if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(&id)) {
if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(id)) {
ep.ping_timeout(tx_id);
}
}

pub(super) fn get_quic_mapped_addr_for_node_key(
&self,
node_key: &PublicKey,
node_key: NodeId,
) -> Option<QuicMappedAddr> {
self.inner
.lock()
Expand Down Expand Up @@ -172,7 +172,7 @@ impl NodeMap {
#[allow(clippy::type_complexity)]
pub(super) fn get_send_addrs(
&self,
addr: &QuicMappedAddr,
addr: QuicMappedAddr,
have_ipv6: bool,
) -> Option<(
PublicKey,
Expand Down Expand Up @@ -223,16 +223,13 @@ impl NodeMap {
///
/// Will return an error if there is not an entry in the [`NodeMap`] for
/// the `public_key`
pub(super) fn conn_type_stream(
&self,
public_key: &PublicKey,
) -> anyhow::Result<ConnectionTypeStream> {
self.inner.lock().conn_type_stream(public_key)
pub(super) fn conn_type_stream(&self, node_id: NodeId) -> anyhow::Result<ConnectionTypeStream> {
self.inner.lock().conn_type_stream(node_id)
}

/// Get the [`NodeInfo`]s for each endpoint
pub(super) fn node_info(&self, public_key: &PublicKey) -> Option<NodeInfo> {
self.inner.lock().node_info(public_key)
pub(super) fn node_info(&self, node_id: NodeId) -> Option<NodeInfo> {
self.inner.lock().node_info(node_id)
}

/// Saves the known node info to the given path, returning the number of nodes persisted.
Expand Down Expand Up @@ -323,7 +320,7 @@ impl NodeMapInner {
fn add_node_addr(&mut self, node_addr: NodeAddr) {
let NodeAddr { node_id, info } = node_addr;

let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&node_id), || Options {
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(node_id), || Options {
node_id,
relay_url: info.relay_url.clone(),
active: false,
Expand All @@ -338,10 +335,10 @@ impl NodeMapInner {

fn get_id(&self, id: NodeStateKey) -> Option<usize> {
match id {
NodeStateKey::Idx(id) => Some(*id),
NodeStateKey::NodeId(node_key) => self.by_node_key.get(node_key).copied(),
NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(addr).copied(),
NodeStateKey::IpPort(ipp) => self.by_ip_port.get(ipp).copied(),
NodeStateKey::Idx(id) => Some(id),
NodeStateKey::NodeId(node_key) => self.by_node_key.get(&node_key).copied(),
NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(),
NodeStateKey::IpPort(ipp) => self.by_ip_port.get(&ipp).copied(),
}
}

Expand Down Expand Up @@ -373,7 +370,7 @@ impl NodeMapInner {
/// Marks the node we believe to be at `ipp` as recently used.
fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> {
let ip_port: IpPort = udp_addr.into();
let Some(node_state) = self.get_mut(NodeStateKey::IpPort(&ip_port)) else {
let Some(node_state) = self.get_mut(NodeStateKey::IpPort(ip_port)) else {
info!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore");
return None;
};
Expand All @@ -382,11 +379,11 @@ impl NodeMapInner {
}

#[instrument(skip_all, fields(src = %src.fmt_short()))]
fn receive_relay(&mut self, relay_url: &RelayUrl, src: &PublicKey) -> QuicMappedAddr {
fn receive_relay(&mut self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr {
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || {
trace!("packets from unknown node, insert into node map");
Options {
node_id: *src,
node_id: src,
relay_url: Some(relay_url.clone()),
active: true,
}
Expand All @@ -409,8 +406,8 @@ impl NodeMapInner {
}

/// Get the [`NodeInfo`]s for each endpoint
fn node_info(&self, public_key: &PublicKey) -> Option<NodeInfo> {
self.get(NodeStateKey::NodeId(public_key))
fn node_info(&self, node_id: NodeId) -> Option<NodeInfo> {
self.get(NodeStateKey::NodeId(node_id))
.map(|ep| ep.info(Instant::now()))
}

Expand All @@ -423,18 +420,18 @@ impl NodeMapInner {
///
/// Will return an error if there is not an entry in the [`NodeMap`] for
/// the `public_key`
fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result<ConnectionTypeStream> {
match self.get(NodeStateKey::NodeId(public_key)) {
fn conn_type_stream(&self, node_id: NodeId) -> anyhow::Result<ConnectionTypeStream> {
match self.get(NodeStateKey::NodeId(node_id)) {
Some(ep) => Ok(ConnectionTypeStream {
initial: Some(ep.conn_type()),
inner: ep.conn_type_stream(),
}),
None => anyhow::bail!("No endpoint for {public_key:?} found"),
None => anyhow::bail!("No endpoint for {node_id:?} found"),
}
}

fn handle_pong(&mut self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) {
if let Some(ns) = self.get_mut(NodeStateKey::NodeId(&sender)).as_mut() {
fn handle_pong(&mut self, sender: NodeId, src: &DiscoMessageSource, pong: Pong) {
if let Some(ns) = self.get_mut(NodeStateKey::NodeId(sender)).as_mut() {
let insert = ns.handle_pong(&pong, src.into());
if let Some((src, key)) = insert {
self.set_node_key_for_ip_port(src, &key);
Expand All @@ -446,8 +443,8 @@ impl NodeMapInner {
}

#[must_use = "actions must be handled"]
fn handle_call_me_maybe(&mut self, sender: PublicKey, cm: CallMeMaybe) -> Vec<PingAction> {
let ns_id = NodeStateKey::NodeId(&sender);
fn handle_call_me_maybe(&mut self, sender: NodeId, cm: CallMeMaybe) -> Vec<PingAction> {
let ns_id = NodeStateKey::NodeId(sender);
if let Some(id) = self.get_id(ns_id.clone()) {
for number in &cm.my_numbers {
// ensure the new addrs are known
Expand All @@ -468,13 +465,8 @@ impl NodeMapInner {
}
}

fn handle_ping(
&mut self,
sender: PublicKey,
src: SendAddr,
tx_id: TransactionId,
) -> PingHandled {
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(&sender), || {
fn handle_ping(&mut self, sender: NodeId, src: SendAddr, tx_id: TransactionId) -> PingHandled {
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(sender), || {
debug!("received ping: node unknown, add to node map");
Options {
node_id: sender,
Expand Down Expand Up @@ -815,7 +807,7 @@ mod tests {
node_map
.inner
.lock()
.get(NodeStateKey::NodeId(&active_node))
.get(NodeStateKey::NodeId(active_node))
.expect("should not be pruned");
}
}
2 changes: 1 addition & 1 deletion iroh-net/src/magicsock/node_map/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ impl NodeState {
.reconfirm_if_used(addr.into(), Source::Udp, now);
}

pub(super) fn receive_relay(&mut self, url: &RelayUrl, _src: &PublicKey, now: Instant) {
pub(super) fn receive_relay(&mut self, url: &RelayUrl, _src: NodeId, now: Instant) {
match self.relay_url.as_mut() {
Some((current_home, state)) if current_home == url => {
// We received on the expected url. update state.
Expand Down

0 comments on commit f8d91fa

Please sign in to comment.