Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iroh-net): reverse ip-port mapping stores only direct addresses in the peermap #1606

65 changes: 24 additions & 41 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1229,10 +1229,7 @@ impl Actor {
/// Returns `true` if the message should be processed.
fn receive_ip(&mut self, bytes: &Bytes, meta: &mut quinn_udp::RecvMeta) -> bool {
debug!("received data {} from {}", meta.len, meta.addr);
match self
.peer_map
.endpoint_for_ip_port(&SendAddr::Udp(meta.addr))
{
match self.peer_map.endpoint_for_ip_port(meta.addr) {
None => {
warn!(peer=?meta.addr, "no peer_map state found for peer, skipping");
return false;
Expand Down Expand Up @@ -1298,7 +1295,6 @@ impl Actor {
derp_region: Some(region_id),
active: true,
});
self.peer_map.set_endpoint_for_ip_port(&ipp, id);
let ep = self.peer_map.by_id_mut(&id).expect("inserted");
ep.quic_mapped_addr
}
Expand Down Expand Up @@ -2137,13 +2133,17 @@ impl Actor {
}

let mut unknown_sender = false;
if self.peer_map.endpoint_for_node_key(&sender).is_none()
&& self.peer_map.endpoint_for_ip_port_mut(&src).is_none()
{
if self.peer_map.endpoint_for_node_key(&sender).is_none() {
unknown_sender = match src {
SendAddr::Udp(addr) => self.peer_map.endpoint_for_ip_port_mut(addr).is_none(),
SendAddr::Derp(_) => true,
};
}

if unknown_sender {
// Disco Ping from unseen endpoint. We will have to add the
// endpoint later if the message is a ping
info!("disco: unknown sender {:?} - {}", sender, src);
unknown_sender = true;
}

// We're now reasonably sure we're expecting communication from
Expand Down Expand Up @@ -2209,11 +2209,11 @@ impl Actor {
disco::Message::Pong(pong) => {
inc!(MagicsockMetrics, recv_disco_pong);
if let Some(ep) = self.peer_map.endpoint_for_node_key_mut(&sender) {
let (_, insert) = ep
let insert = ep
.handle_pong_conn(&self.inner.public_key(), &pong, di, src)
.await;
if let Some((src, key)) = insert {
self.peer_map.set_node_key_for_ip_port(&src, &key);
self.peer_map.set_node_key_for_ip_port(src, &key);
}
}
true
Expand Down Expand Up @@ -2268,48 +2268,31 @@ impl Actor {
.unwrap_or_default();
di.last_ping_from.replace(src);
di.last_ping_time.replace(Instant::now());
let is_derp = src.is_derp();

// If we got a ping over DERP, then derp_node_src is non-zero and we reply
// over DERP (in which case ip_dst is also a DERP address).
// But if the ping was over UDP (ip_dst is not a DERP address), then dst_key
// will be zero here, but that's fine: send_disco_message only requires
// a dstKey if the dst ip:port is DERP.

if is_derp {
assert!(derp_node_src.is_some());
} else {
assert!(derp_node_src.is_none());
}

let (dst_key, insert) = match derp_node_src {
let dst_key = match derp_node_src {
Some(dst_key) => {
// From Derp
if let Some(ep) = self.peer_map.endpoint_for_node_key_mut(&dst_key) {
if ep.add_candidate_endpoint(src, dm.tx_id) {
debug!("disco: ping got duplicate endpoint {} - {}", src, dm.tx_id);
return;
}
(dst_key, true)
} else {
(dst_key, false)
}
assert!(src.is_derp());
dst_key
}
None => {
if let Some(ep) = self.peer_map.endpoint_for_node_key_mut(&di.node_key) {
if ep.add_candidate_endpoint(src, dm.tx_id) {
debug!("disco: ping got duplicate endpoint {} - {}", src, dm.tx_id);
return;
}
(di.node_key, true)
} else {
(di.node_key, false)
}
assert!(!src.is_derp());
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
di.node_key
}
};

if insert {
self.peer_map.set_node_key_for_ip_port(&src, &dst_key);
if let Some(ep) = self.peer_map.endpoint_for_node_key_mut(&dst_key) {
if ep.add_candidate_endpoint(src, dm.tx_id) {
debug!("disco: ping got duplicate endpoint {} - {}", src, dm.tx_id);
return;
}
if let SendAddr::Udp(addr) = src {
self.peer_map.set_node_key_for_ip_port(addr, &dst_key);
}
}

if !likely_heart_beat {
Expand Down
74 changes: 52 additions & 22 deletions iroh-net/src/magicsock/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,14 +640,14 @@ impl Endpoint {

/// Handles a Pong message (a reply to an earlier ping).
///
/// It reports whether m.tx_id corresponds to a ping that this endpoint sent.
/// It reports the address and key that should be inserted for the endpoint if any.
pub(super) async fn handle_pong_conn(
&mut self,
conn_disco_public: &PublicKey,
m: &disco::Pong,
_di: &mut DiscoInfo,
src: SendAddr,
) -> (bool, Option<(SendAddr, PublicKey)>) {
) -> Option<(SocketAddr, PublicKey)> {
let is_derp = src.is_derp();

info!(
Expand All @@ -661,27 +661,26 @@ impl Endpoint {
"disco: received unexpected pong {:?} from {:?}",
m.tx_id, src,
);
(false, None)
None
}
Some(sp) => {
sp.timer.stop().await;

let known_tx_id = true;
let mut peer_map_insert = None;

let now = Instant::now();
let latency = now - sp.at;

if !is_derp {
if let SendAddr::Udp(addr) = src {
let key = self.public_key;
match self.endpoint_state.get_mut(&sp.to) {
None => {
info!("disco: ignoring pong: {}", sp.to);
// This is no longer an endpoint we care about.
return (known_tx_id, peer_map_insert);
return peer_map_insert;
}
Some(st) => {
peer_map_insert = Some((src, key));
peer_map_insert = Some((addr, key));
st.add_pong_reply(PongReply {
latency,
pong_at: now,
Expand Down Expand Up @@ -762,7 +761,7 @@ impl Endpoint {
}
}

(known_tx_id, peer_map_insert)
peer_map_insert
}
}
}
Expand Down Expand Up @@ -964,6 +963,33 @@ pub struct AddrLatency {
pub latency: Option<Duration>,
}

/// An (Ip, Port) pair.
///
/// NOTE: storing an [`IpPort`] is safer than storing a [`SocketAddr`] because for IPv6 socket
/// addresses include fields that can't be assumed consistent even within a single connection.
#[derive(Debug, derive_more::Display, Clone, Copy, Hash, PartialEq, Eq)]
#[display("{}", SocketAddr::from(*self))]
pub struct IpPort {
ip: IpAddr,
port: u16,
}

impl From<SocketAddr> for IpPort {
fn from(socket_addr: SocketAddr) -> Self {
Self {
ip: socket_addr.ip(),
port: socket_addr.port(),
}
}
}

impl From<IpPort> for SocketAddr {
fn from(ip_port: IpPort) -> Self {
let IpPort { ip, port } = ip_port;
(ip, port).into()
}
}

/// Map of the [`Endpoint`] information for all the known peers.
///
/// The peers can be looked up by:
Expand All @@ -985,7 +1011,7 @@ pub struct AddrLatency {
#[derive(Default, Debug)]
pub(super) struct PeerMap {
by_node_key: HashMap<PublicKey, usize>,
by_ip_port: HashMap<SendAddr, usize>,
by_ip_port: HashMap<IpPort, usize>,
by_quic_mapped_addr: HashMap<QuicMappedAddr, usize>,
by_id: HashMap<usize, Endpoint>,
next_id: usize,
Expand Down Expand Up @@ -1032,7 +1058,7 @@ impl PeerMap {
ep.update_from_node_addr(&info);
let id = ep.id;
for endpoint in &info.direct_addresses {
self.set_endpoint_for_ip_port(&SendAddr::Udp(*endpoint), id);
self.set_endpoint_for_ip_port(*endpoint, id);
}
}
}
Expand Down Expand Up @@ -1062,13 +1088,15 @@ impl PeerMap {
}

/// Returns the endpoint for the peer we believe to be at ipp, or nil if we don't know of any such peer.
pub(super) fn endpoint_for_ip_port(&self, ipp: &SendAddr) -> Option<&Endpoint> {
self.by_ip_port.get(ipp).and_then(|id| self.by_id(id))
pub(super) fn endpoint_for_ip_port(&self, ipp: impl Into<IpPort>) -> Option<&Endpoint> {
self.by_ip_port
.get(&ipp.into())
.and_then(|id| self.by_id(id))
}

pub fn endpoint_for_ip_port_mut(&mut self, ipp: &SendAddr) -> Option<&mut Endpoint> {
pub fn endpoint_for_ip_port_mut(&mut self, ipp: impl Into<IpPort>) -> Option<&mut Endpoint> {
self.by_ip_port
.get(ipp)
.get(&ipp.into())
.and_then(|id| self.by_id.get_mut(id))
}

Expand Down Expand Up @@ -1118,22 +1146,24 @@ impl PeerMap {
/// This should only be called with a fully verified mapping of ipp to
/// nk, because calling this function defines the endpoint we hand to
/// WireGuard for packets received from ipp.
pub(super) fn set_node_key_for_ip_port(&mut self, ipp: &SendAddr, nk: &PublicKey) {
if let Some(id) = self.by_ip_port.get(ipp) {
pub(super) fn set_node_key_for_ip_port(&mut self, ipp: impl Into<IpPort>, nk: &PublicKey) {
let ipp = ipp.into();
if let Some(id) = self.by_ip_port.get(&ipp) {
if !self.by_node_key.contains_key(nk) {
self.by_node_key.insert(*nk, *id);
}
self.by_ip_port.remove(ipp);
self.by_ip_port.remove(&ipp);
}
if let Some(id) = self.by_node_key.get(nk) {
trace!("insert ip -> id: {:?} -> {}", ipp, id);
self.by_ip_port.insert(*ipp, *id);
self.by_ip_port.insert(ipp, *id);
}
}

pub(super) fn set_endpoint_for_ip_port(&mut self, ipp: &SendAddr, id: usize) {
pub(super) fn set_endpoint_for_ip_port(&mut self, ipp: impl Into<IpPort>, id: usize) {
let ipp = ipp.into();
trace!("insert ip -> id: {:?} -> {}", ipp, id);
self.by_ip_port.insert(*ipp, id);
self.by_ip_port.insert(ipp, id);
}

/// Saves the known peer info to the given path, returning the number of peers persisted.
Expand Down Expand Up @@ -1553,8 +1583,8 @@ mod tests {
(d_endpoint.public_key, d_endpoint.id),
]),
by_ip_port: HashMap::from([
(SendAddr::Udp(a_socket_addr), a_endpoint.id),
(SendAddr::Udp(d_socket_addr), d_endpoint.id),
(a_socket_addr.into(), a_endpoint.id),
(d_socket_addr.into(), d_endpoint.id),
]),
by_quic_mapped_addr: HashMap::from([
(a_endpoint.quic_mapped_addr, a_endpoint.id),
Expand Down
Loading