From 0d06320bda68e87af04e784ee7f607939729639d Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 4 Dec 2024 10:44:29 +0100 Subject: [PATCH 1/6] fix(iroh, iroh-relay)!: Bypass magicsock::Actor for datagrams from the relay (#2986) ## Description Datagrams received via the relay used to be sent to the `magicsock::Actor` for processing (handling DISCO packets and adding RecvMeta). After processing they were put on a channel from which `AsyncUdpSocket::poll_recv` would read them. The major problem was that the relay actor did not handle the backpressure well: the channel it was placing the processed datagrams on is smaller than its inbox and it did not want to drop packets (which is good, dropping packets at this point is not helpful). So the entire actor blocked, which has negative effects all around. This change connects a channel directly between the `ConnectedRelay` actor which receives the packets and the `AsyncUdpSocket::poll_recv`. The processing which was happening in the `magicsock::Actor` now happens inside `poll_recv`. This is fine because: - The processing needs access to the `MagicSock` inners, like the `NodeMap`. This required accessing the same mutexes that `AsyncUdpSocket::poll_recv` was already accessing anyway. So this path is not slowed down. - The alternative was moving the processing to the `ConnectedRelay` actor, which would have added potentially more contention on the `MagicSock` mutexed inners. To support this there are a few additional changes: - The channel between the `AsyncUdpSocket::poll_recv(_relay)` and the `ConnectedRelay` actor has been abstracted into the `RelayRecvChannel`. This takes care of the wakers, which now do not need to be separate fields on the `MagicSock`, reducing the `MagicSock` state and risk of using the wakers wrongly. - The `ActiveRelay` actor is renamed to `ConnectedRelay`. This improves readability. 
- The `PacketSplitIter` is moved into the `relay_actor` module, this is where the corresponding `PacketizeIter` lives and makes the two much better logically linked. - ReceivedMessage::ReceivedPacket::source is renamed to remote_node_id. ## Breaking Changes ### iroh-relay - `ReceivedMessage::ReceivedPacket::source` is renamed to `ReceivedMessage::ReceivedPacket::remote_node_id`. ## Notes & open questions Closes #2982. I'd be happy to separate out some more changes from this: - PacketSplitIter move can easily be its own independent PR. - ActiveRelay -> ConnectedRelay rename can easily be independent too. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- iroh-relay/src/client/conn.rs | 4 +- iroh-relay/src/server.rs | 48 +++- iroh-relay/src/server/http_server.rs | 48 +++- iroh/src/magicsock.rs | 346 ++++++++++++++------------- iroh/src/magicsock/relay_actor.rs | 269 ++++++++++++++------- 5 files changed, 425 insertions(+), 290 deletions(-) diff --git a/iroh-relay/src/client/conn.rs b/iroh-relay/src/client/conn.rs index 020727743b..8230f79c2f 100644 --- a/iroh-relay/src/client/conn.rs +++ b/iroh-relay/src/client/conn.rs @@ -179,7 +179,7 @@ fn process_incoming_frame(frame: Frame) -> Result { Frame::NodeGone { node_id } => Ok(ReceivedMessage::NodeGone(node_id)), Frame::RecvPacket { src_key, content } => { let packet = ReceivedMessage::ReceivedPacket { - source: src_key, + remote_node_id: src_key, data: content, }; Ok(packet) @@ -451,7 +451,7 @@ pub enum ReceivedMessage { /// Represents an incoming packet. ReceivedPacket { /// The [`NodeId`] of the packet sender. - source: NodeId, + remote_node_id: NodeId, /// The received packet bytes. 
#[debug(skip)] data: Bytes, // TODO: ref diff --git a/iroh-relay/src/server.rs b/iroh-relay/src/server.rs index 671782d93c..7d14b6befb 100644 --- a/iroh-relay/src/server.rs +++ b/iroh-relay/src/server.rs @@ -914,8 +914,12 @@ mod tests { client_a.send(b_key, msg.clone()).await.unwrap(); let res = client_b_receiver.recv().await.unwrap().unwrap(); - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(a_key, source); + if let ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } = res + { + assert_eq!(a_key, remote_node_id); assert_eq!(msg, data); } else { panic!("client_b received unexpected message {res:?}"); @@ -926,8 +930,12 @@ mod tests { client_b.send(a_key, msg.clone()).await.unwrap(); let res = client_a_receiver.recv().await.unwrap().unwrap(); - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(b_key, source); + if let ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } = res + { + assert_eq!(b_key, remote_node_id); assert_eq!(msg, data); } else { panic!("client_a received unexpected message {res:?}"); @@ -982,8 +990,12 @@ mod tests { client_a.send(b_key, msg.clone()).await.unwrap(); let res = client_b_receiver.recv().await.unwrap().unwrap(); - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(a_key, source); + if let ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } = res + { + assert_eq!(a_key, remote_node_id); assert_eq!(msg, data); } else { panic!("client_b received unexpected message {res:?}"); @@ -994,8 +1006,12 @@ mod tests { client_b.send(a_key, msg.clone()).await.unwrap(); let res = client_a_receiver.recv().await.unwrap().unwrap(); - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(b_key, source); + if let ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } = res + { + assert_eq!(b_key, remote_node_id); assert_eq!(msg, data); } else { panic!("client_a received unexpected message {res:?}"); @@ -1049,8 
+1065,12 @@ mod tests { client_a.send(b_key, msg.clone()).await.unwrap(); let res = client_b_receiver.recv().await.unwrap().unwrap(); - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(a_key, source); + if let ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } = res + { + assert_eq!(a_key, remote_node_id); assert_eq!(msg, data); } else { panic!("client_b received unexpected message {res:?}"); @@ -1061,8 +1081,12 @@ mod tests { client_b.send(a_key, msg.clone()).await.unwrap(); let res = client_a_receiver.recv().await.unwrap().unwrap(); - if let ReceivedMessage::ReceivedPacket { source, data } = res { - assert_eq!(b_key, source); + if let ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } = res + { + assert_eq!(b_key, remote_node_id); assert_eq!(msg, data); } else { panic!("client_a received unexpected message {res:?}"); diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs index 884b83a2f6..767f5bb975 100644 --- a/iroh-relay/src/server/http_server.rs +++ b/iroh-relay/src/server/http_server.rs @@ -799,7 +799,11 @@ mod tests { } Some(Ok(msg)) => { info!("got message on {:?}: {msg:?}", key.public()); - if let ReceivedMessage::ReceivedPacket { source, data } = msg { + if let ReceivedMessage::ReceivedPacket { + remote_node_id: source, + data, + } = msg + { received_msg_s .send((source, data)) .await @@ -947,8 +951,11 @@ mod tests { let msg = Bytes::from_static(b"hello client b!!"); client_a.send(public_key_b, msg.clone()).await?; match client_receiver_b.recv().await? 
{ - ReceivedMessage::ReceivedPacket { source, data } => { - assert_eq!(public_key_a, source); + ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } => { + assert_eq!(public_key_a, remote_node_id); assert_eq!(&msg[..], data); } msg => { @@ -960,8 +967,11 @@ mod tests { let msg = Bytes::from_static(b"nice to meet you client a!!"); client_b.send(public_key_a, msg.clone()).await?; match client_receiver_a.recv().await? { - ReceivedMessage::ReceivedPacket { source, data } => { - assert_eq!(public_key_b, source); + ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } => { + assert_eq!(public_key_b, remote_node_id); assert_eq!(&msg[..], data); } msg => { @@ -1027,8 +1037,11 @@ mod tests { let msg = Bytes::from_static(b"hello client b!!"); client_a.send(public_key_b, msg.clone()).await?; match client_receiver_b.recv().await? { - ReceivedMessage::ReceivedPacket { source, data } => { - assert_eq!(public_key_a, source); + ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } => { + assert_eq!(public_key_a, remote_node_id); assert_eq!(&msg[..], data); } msg => { @@ -1040,8 +1053,11 @@ mod tests { let msg = Bytes::from_static(b"nice to meet you client a!!"); client_b.send(public_key_a, msg.clone()).await?; match client_receiver_a.recv().await? { - ReceivedMessage::ReceivedPacket { source, data } => { - assert_eq!(public_key_b, source); + ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } => { + assert_eq!(public_key_b, remote_node_id); assert_eq!(&msg[..], data); } msg => { @@ -1065,8 +1081,11 @@ mod tests { let msg = Bytes::from_static(b"are you still there, b?!"); client_a.send(public_key_b, msg.clone()).await?; match new_client_receiver_b.recv().await? 
{ - ReceivedMessage::ReceivedPacket { source, data } => { - assert_eq!(public_key_a, source); + ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } => { + assert_eq!(public_key_a, remote_node_id); assert_eq!(&msg[..], data); } msg => { @@ -1078,8 +1097,11 @@ mod tests { let msg = Bytes::from_static(b"just had a spot of trouble but I'm back now,a!!"); new_client_b.send(public_key_a, msg.clone()).await?; match client_receiver_a.recv().await? { - ReceivedMessage::ReceivedPacket { source, data } => { - assert_eq!(public_key_b, source); + ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } => { + assert_eq!(public_key_b, remote_node_id); assert_eq!(&msg[..], data); } msg => { diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 07c2d5e9d2..c117d2cf77 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -56,7 +56,7 @@ use watchable::Watchable; use self::{ metrics::Metrics as MagicsockMetrics, node_map::{NodeMap, PingAction, PingRole, SendPing}, - relay_actor::{RelayActor, RelayActorMessage, RelayReadResult}, + relay_actor::{RelayActor, RelayActorMessage, RelayRecvDatagram}, udp_conn::UdpConn, }; use crate::{ @@ -180,11 +180,13 @@ pub(crate) struct MagicSock { me: String, /// Proxy proxy_url: Option, + /// Channel to receive datagrams from relays for [`AsyncUdpSocket::poll_recv`]. + /// + /// QUIC datagrams received by relays are put on this channel and consumed by + /// [`AsyncUdpSocket`]. This channel takes care of the wakers needed by + /// [`AsyncUdpSocket::poll_recv`]. + relay_recv_channel: RelayRecvReceiver, - /// Used for receiving relay messages. - relay_recv_receiver: parking_lot::Mutex>, - /// Stores wakers, to be called when relay_recv_ch receives new data. - network_recv_wakers: parking_lot::Mutex>, network_send_wakers: Arc>>, /// Counter for ordering of [`MagicSock::poll_recv`] polling order. 
poll_recv_counter: AtomicUsize, @@ -850,29 +852,46 @@ impl MagicSock { metas: &mut [quinn_udp::RecvMeta], ) -> Poll> { let mut num_msgs = 0; - for (buf_out, meta_out) in bufs.iter_mut().zip(metas.iter_mut()) { + 'outer: for (buf_out, meta_out) in bufs.iter_mut().zip(metas.iter_mut()) { if self.is_closed() { break; } - let mut relay_recv_receiver = self.relay_recv_receiver.lock(); - match relay_recv_receiver.try_recv() { - Err(mpsc::error::TryRecvError::Empty) => { - self.network_recv_wakers.lock().replace(cx.waker().clone()); - break; - } - Err(mpsc::error::TryRecvError::Disconnected) => { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::NotConnected, - "connection closed", - ))); - } - Ok(Err(err)) => return Poll::Ready(Err(err)), - Ok(Ok((node_id, meta, bytes))) => { - inc_by!(MagicsockMetrics, recv_data_relay, bytes.len() as _); - trace!(src = %meta.addr, node = %node_id.fmt_short(), count = meta.len / meta.stride, len = meta.len, "recv quic packets from relay"); - buf_out[..bytes.len()].copy_from_slice(&bytes); - *meta_out = meta; - num_msgs += 1; + + // For each output buffer keep polling the datagrams from the relay until one is + // a QUIC datagram to be placed into the output buffer. Or the channel is empty. + loop { + let recv = match self.relay_recv_channel.poll_recv(cx) { + Poll::Ready(Ok(recv)) => recv, + Poll::Ready(Err(err)) => { + error!("relay_recv_channel closed: {err:#}"); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::NotConnected, + "connection closed", + ))); + } + Poll::Pending => { + break 'outer; + } + }; + match self.process_relay_read_result(recv) { + None => { + // Received a DISCO or STUN datagram that was handled internally. 
+ continue; + } + Some((node_id, meta, buf)) => { + inc_by!(MagicsockMetrics, recv_data_relay, buf.len() as _); + trace!( + src = %meta.addr, + node = %node_id.fmt_short(), + count = meta.len / meta.stride, + len = meta.len, + "recv quic packets from relay", + ); + buf_out[..buf.len()].copy_from_slice(&buf); + *meta_out = meta; + num_msgs += 1; + break; + } } } } @@ -886,6 +905,74 @@ impl MagicSock { } } + /// Process datagrams received from the relay server into incoming Quinn datagrams. + /// + /// This will transform datagrams received from the relay server into Quinn datagrams to + /// receive, adding the [`quinn_udp::RecvMeta`]. + /// + /// If the incoming datagram is a DISCO packet it will be handled internally and `None` + /// is returned. + fn process_relay_read_result( + &self, + dm: RelayRecvDatagram, + ) -> Option<(NodeId, quinn_udp::RecvMeta, Bytes)> { + trace!("process_relay_read {} bytes", dm.buf.len()); + if dm.buf.is_empty() { + warn!("received empty relay packet"); + return None; + } + + if self.handle_relay_disco_message(&dm.buf, &dm.url, dm.src) { + // DISCO messages are handled internally in the MagicSock, do not pass to Quinn. + return None; + } + + let quic_mapped_addr = self.node_map.receive_relay(&dm.url, dm.src); + + // Normalize local_ip + #[cfg(not(windows))] + let dst_ip = self.normalized_local_addr().ok().map(|addr| addr.ip()); + // Reasoning for this here: + // https://github.com/n0-computer/iroh/pull/2595#issuecomment-2290947319 + #[cfg(windows)] + let dst_ip = None; + + let meta = quinn_udp::RecvMeta { + len: dm.buf.len(), + stride: dm.buf.len(), + addr: quic_mapped_addr.0, + dst_ip, + ecn: None, + }; + Some((dm.src, meta, dm.buf)) + } + + fn handle_relay_disco_message( + &self, + msg: &[u8], + url: &RelayUrl, + relay_node_src: PublicKey, + ) -> bool { + match disco::source_and_box(msg) { + Some((source, sealed_box)) => { + if relay_node_src != source { + // TODO: return here? 
+ warn!("Received relay disco message from connection for {}, but with message from {}", relay_node_src.fmt_short(), source.fmt_short()); + } + self.handle_disco_message( + source, + sealed_box, + DiscoMessageSource::Relay { + url: url.clone(), + key: relay_node_src, + }, + ); + true + } + None => false, + } + } + /// Handles a discovery message. #[instrument("disco_in", skip_all, fields(node = %sender.fmt_short(), %src))] fn handle_disco_message(&self, sender: PublicKey, sealed_box: &[u8], src: DiscoMessageSource) { @@ -1423,7 +1510,7 @@ impl Handle { insecure_skip_relay_cert_verify, } = opts; - let (relay_recv_sender, relay_recv_receiver) = mpsc::channel(128); + let (relay_recv_tx, relay_recv_rx) = relay_recv_channel(); let (pconn4, pconn6) = bind(addr_v4, addr_v6)?; let port = pconn4.port(); @@ -1460,8 +1547,7 @@ impl Handle { local_addrs: std::sync::RwLock::new((ipv4_addr, ipv6_addr)), closing: AtomicBool::new(false), closed: AtomicBool::new(false), - relay_recv_receiver: parking_lot::Mutex::new(relay_recv_receiver), - network_recv_wakers: parking_lot::Mutex::new(None), + relay_recv_channel: relay_recv_rx, network_send_wakers: Arc::new(parking_lot::Mutex::new(None)), poll_recv_counter: AtomicUsize::new(0), actor_sender: actor_sender.clone(), @@ -1486,7 +1572,7 @@ impl Handle { let mut actor_tasks = JoinSet::default(); - let relay_actor = RelayActor::new(inner.clone(), actor_sender.clone()); + let relay_actor = RelayActor::new(inner.clone(), relay_recv_tx); let relay_actor_cancel_token = relay_actor.cancel_token(); actor_tasks.spawn( async move { @@ -1514,7 +1600,6 @@ impl Handle { relay_actor_sender, relay_actor_cancel_token, msock: inner2, - relay_recv_sender, periodic_re_stun_timer: new_re_stun_timer(false), net_info_last: None, port_mapper, @@ -1627,7 +1712,67 @@ enum DiscoBoxError { Parse(anyhow::Error), } -type RelayRecvResult = Result<(PublicKey, quinn_udp::RecvMeta, Bytes), io::Error>; +/// Channel for [`MagicSock::poll_recv_relay`] to receive datagrams 
from relays. +/// +/// The sender and receiver will take care of the required wakers needed for +/// [`AsyncUdpSocket::poll_recv`]. +// TODO: This channel should possibly be implemented with concurrent-queue and atomic-waker. +// Or maybe async-channel. +fn relay_recv_channel() -> (RelayRecvSender, RelayRecvReceiver) { + let (tx, rx) = mpsc::channel(128); + let waker = Arc::new(parking_lot::Mutex::new(None)); + let sender = RelayRecvSender { + sender: tx, + waker: waker.clone(), + }; + let receiver = RelayRecvReceiver { + receiver: parking_lot::Mutex::new(rx), + waker, + }; + (sender, receiver) +} + +#[derive(Debug, Clone)] +struct RelayRecvSender { + sender: mpsc::Sender, + waker: Arc>>, +} + +impl RelayRecvSender { + fn try_send( + &self, + item: RelayRecvDatagram, + ) -> Result<(), mpsc::error::TrySendError> { + self.sender.try_send(item).inspect(|_| { + if let Some(waker) = self.waker.lock().take() { + waker.wake(); + } + }) + } +} + +#[derive(Debug)] +struct RelayRecvReceiver { + receiver: parking_lot::Mutex>, + waker: Arc>>, +} + +impl RelayRecvReceiver { + fn poll_recv(&self, cx: &mut Context) -> Poll> { + let mut receiver = self.receiver.lock(); + self.waker.lock().replace(cx.waker().clone()); + match receiver.try_recv() { + Ok(item) => { + self.waker.lock().take(); + Poll::Ready(Ok(item)) + } + Err(mpsc::error::TryRecvError::Empty) => Poll::Pending, + Err(mpsc::error::TryRecvError::Disconnected) => { + Poll::Ready(Err(anyhow!("All RelayRecvSenders disconnected"))) + } + } + } +} impl AsyncUdpSocket for Handle { fn create_io_poller(self: Arc) -> Pin> { @@ -1735,7 +1880,6 @@ impl quinn::UdpPoller for IoPoller { #[derive(Debug)] enum ActorMessage { Shutdown, - ReceiveRelay(RelayReadResult), EndpointPingExpired(usize, stun_rs::TransactionId), NetReport(Result>>, &'static str), NetworkChange, @@ -1749,8 +1893,6 @@ struct Actor { msg_sender: mpsc::Sender, relay_actor_sender: mpsc::Sender, relay_actor_cancel_token: CancellationToken, - /// Channel to send 
received relay messages on, for processing. - relay_recv_sender: mpsc::Sender, /// When set, is an AfterFunc timer that will call MagicSock::do_periodic_stun. periodic_re_stun_timer: time::Interval, /// The `NetInfo` provided in the last call to `net_info_func`. It's used to deduplicate calls to netInfoFunc. @@ -1941,19 +2083,6 @@ impl Actor { debug!("shutdown complete"); return true; } - ActorMessage::ReceiveRelay(read_result) => { - let passthroughs = self.process_relay_read_result(read_result); - for passthrough in passthroughs { - self.relay_recv_sender - .send(passthrough) - .await - .expect("missing recv sender"); - let mut wakers = self.msock.network_recv_wakers.lock(); - if let Some(waker) = wakers.take() { - waker.wake(); - } - } - } ActorMessage::EndpointPingExpired(id, txid) => { self.msock.node_map.notify_ping_timeout(id, txid); } @@ -1983,59 +2112,6 @@ impl Actor { false } - #[cfg_attr(windows, allow(dead_code))] - fn normalized_local_addr(&self) -> io::Result { - self.msock.normalized_local_addr() - } - - fn process_relay_read_result(&mut self, dm: RelayReadResult) -> Vec { - trace!("process_relay_read {} bytes", dm.buf.len()); - if dm.buf.is_empty() { - warn!("received empty relay packet"); - return Vec::new(); - } - let url = &dm.url; - - let quic_mapped_addr = self.msock.node_map.receive_relay(url, dm.src); - - // the relay packet is made up of multiple udp packets, prefixed by a u16 be length prefix - // - // split the packet into these parts - let parts = PacketSplitIter::new(dm.buf); - // Normalize local_ip - #[cfg(not(windows))] - let dst_ip = self.normalized_local_addr().ok().map(|addr| addr.ip()); - // Reasoning for this here: https://github.com/n0-computer/iroh/pull/2595#issuecomment-2290947319 - #[cfg(windows)] - let dst_ip = None; - - let mut out = Vec::new(); - for part in parts { - match part { - Ok(part) => { - if self.handle_relay_disco_message(&part, url, dm.src) { - // Message was internal, do not bubble up. 
- continue; - } - - let meta = quinn_udp::RecvMeta { - len: part.len(), - stride: part.len(), - addr: quic_mapped_addr.0, - dst_ip, - ecn: None, - }; - out.push(Ok((dm.src, meta, part))); - } - Err(e) => { - out.push(Err(e)); - } - } - } - - out - } - /// Refreshes knowledge about our direct addresses. /// /// In other words, this triggers a net_report run. @@ -2412,32 +2488,6 @@ impl Actor { } } } - - fn handle_relay_disco_message( - &mut self, - msg: &[u8], - url: &RelayUrl, - relay_node_src: PublicKey, - ) -> bool { - match disco::source_and_box(msg) { - Some((source, sealed_box)) => { - if relay_node_src != source { - // TODO: return here? - warn!("Received relay disco message from connection for {}, but with message from {}", relay_node_src.fmt_short(), source.fmt_short()); - } - self.msock.handle_disco_message( - source, - sealed_box, - DiscoMessageSource::Relay { - url: url.clone(), - key: relay_node_src, - }, - ); - true - } - None => false, - } - } } fn new_re_stun_timer(initial_delay: bool) -> time::Interval { @@ -2612,50 +2662,6 @@ fn split_packets(transmit: &quinn_udp::Transmit) -> RelayContents { res } -/// Splits a packet into its component items. -#[derive(Debug)] -struct PacketSplitIter { - bytes: Bytes, -} - -impl PacketSplitIter { - /// Create a new PacketSplitIter from a packet. - /// - /// Returns an error if the packet is too big. 
- fn new(bytes: Bytes) -> Self { - Self { bytes } - } - - fn fail(&mut self) -> Option> { - self.bytes.clear(); - Some(Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "", - ))) - } -} - -impl Iterator for PacketSplitIter { - type Item = std::io::Result; - - fn next(&mut self) -> Option { - use bytes::Buf; - if self.bytes.has_remaining() { - if self.bytes.remaining() < 2 { - return self.fail(); - } - let len = self.bytes.get_u16_le() as usize; - if self.bytes.remaining() < len { - return self.fail(); - } - let item = self.bytes.split_to(len); - Some(Ok(item)) - } else { - None - } - } -} - /// The fake address used by the QUIC layer to address a node. /// /// You can consider this as nothing more than a lookup key for a node the [`MagicSock`] knows diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index b1f56a7686..2794bbf342 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -1,3 +1,8 @@ +//! The relay actor. +//! +//! The [`RelayActor`] handles all the relay connections. It is helped by the +//! [`ConnectedRelayActor`] which handles a single relay connection. + use std::{ collections::{BTreeMap, BTreeSet}, future::Future, @@ -17,10 +22,12 @@ use tokio::{ time, }; use tokio_util::sync::CancellationToken; -use tracing::{debug, info, info_span, trace, warn, Instrument}; +use tracing::{debug, error, info, info_span, trace, warn, Instrument}; -use super::{ActorMessage, MagicSock, Metrics as MagicsockMetrics, RelayContents}; -use crate::key::{NodeId, PUBLIC_KEY_LENGTH}; +use crate::{ + key::{NodeId, PUBLIC_KEY_LENGTH}, + magicsock::{MagicSock, Metrics as MagicsockMetrics, RelayContents, RelayRecvSender}, +}; /// How long a non-home relay connection needs to be idle (last written to) before we close it. const RELAY_INACTIVE_CLEANUP_TIME: Duration = Duration::from_secs(60); @@ -40,13 +47,14 @@ pub(super) enum RelayActorMessage { }, } -/// Contains fields for an active relay connection. 
+/// An actor which handles a single relay connection. #[derive(Debug)] -struct ActiveRelay { +struct ConnectedRelayActor { /// The time of the last request for its write /// channel (currently even if there was no write). last_write: Instant, - msg_sender: mpsc::Sender, + /// Channel to send received QUIC datagrams on. + relay_recv_channel: RelayRecvSender, url: RelayUrl, relay_client: relay::client::Client, relay_client_receiver: relay::client::ClientReceiver, @@ -62,7 +70,7 @@ struct ActiveRelay { #[derive(Debug)] #[allow(clippy::large_enum_variant)] -enum ActiveRelayMessage { +enum ConnectedRelayMessage { GetLastWrite(oneshot::Sender), Ping(oneshot::Sender>), GetLocalAddr(oneshot::Sender>), @@ -72,16 +80,16 @@ enum ActiveRelayMessage { Shutdown, } -impl ActiveRelay { +impl ConnectedRelayActor { fn new( url: RelayUrl, relay_client: relay::client::Client, relay_client_receiver: relay::client::ClientReceiver, - msg_sender: mpsc::Sender, + relay_recv_channel: RelayRecvSender, ) -> Self { - ActiveRelay { + ConnectedRelayActor { last_write: Instant::now(), - msg_sender, + relay_recv_channel, url, node_present: BTreeSet::new(), backoff: backoff::exponential::ExponentialBackoffBuilder::new() @@ -95,7 +103,7 @@ impl ActiveRelay { } } - async fn run(mut self, mut inbox: mpsc::Receiver) -> anyhow::Result<()> { + async fn run(mut self, mut inbox: mpsc::Receiver) -> anyhow::Result<()> { debug!("initial dial {}", self.url); self.relay_client .connect() @@ -119,23 +127,23 @@ impl ActiveRelay { trace!("tick: inbox: {:?}", msg); match msg { - ActiveRelayMessage::GetLastWrite(r) => { + ConnectedRelayMessage::GetLastWrite(r) => { r.send(self.last_write).ok(); } - ActiveRelayMessage::Ping(r) => { + ConnectedRelayMessage::Ping(r) => { r.send(self.relay_client.ping().await).ok(); } - ActiveRelayMessage::GetLocalAddr(r) => { + ConnectedRelayMessage::GetLocalAddr(r) => { r.send(self.relay_client.local_addr().await).ok(); } - ActiveRelayMessage::GetClient(r) => { + 
ConnectedRelayMessage::GetClient(r) => { self.last_write = Instant::now(); r.send(self.relay_client.clone()).ok(); } - ActiveRelayMessage::NotePreferred(is_preferred) => { + ConnectedRelayMessage::NotePreferred(is_preferred) => { self.relay_client.note_preferred(is_preferred).await; } - ActiveRelayMessage::GetNodeRoute(peer, r) => { + ConnectedRelayMessage::GetNodeRoute(peer, r) => { let client = if self.node_present.contains(&peer) { Some(self.relay_client.clone()) } else { @@ -143,7 +151,7 @@ impl ActiveRelay { }; r.send(client).ok(); } - ActiveRelayMessage::Shutdown => { + ConnectedRelayMessage::Shutdown => { debug!("shutdown"); break; } @@ -211,29 +219,37 @@ impl ActiveRelay { } match msg { - ReceivedMessage::ReceivedPacket { source, data } => { + ReceivedMessage::ReceivedPacket { + remote_node_id, + data, + } => { trace!(len=%data.len(), "received msg"); // If this is a new sender we hadn't seen before, remember it and // register a route for this peer. if self .last_packet_src .as_ref() - .map(|p| *p != source) + .map(|p| *p != remote_node_id) .unwrap_or(true) { // avoid map lookup w/ high throughput single peer - self.last_packet_src = Some(source); - self.node_present.insert(source); + self.last_packet_src = Some(remote_node_id); + self.node_present.insert(remote_node_id); } - let res = RelayReadResult { - url: self.url.clone(), - src: source, - buf: data, - }; - if let Err(err) = self.msg_sender.try_send(ActorMessage::ReceiveRelay(res)) - { - warn!("dropping received relay packet: {:?}", err); + for datagram in PacketSplitIter::new(data) { + let Ok(datagram) = datagram else { + error!("Invalid packet split"); + break; + }; + let res = RelayRecvDatagram { + url: self.url.clone(), + src: remote_node_id, + buf: datagram, + }; + if let Err(err) = self.relay_recv_channel.try_send(res) { + warn!("dropping received relay packet: {err:#}"); + } } ReadResult::Continue @@ -241,6 +257,7 @@ impl ActiveRelay { ReceivedMessage::Ping(data) => { // Best effort reply to 
the ping. let dc = self.relay_client.clone(); + // TODO: Unbounded tasks/channel tokio::task::spawn(async move { if let Err(err) = dc.send_pong(data).await { warn!("pong error: {:?}", err); @@ -266,20 +283,20 @@ impl ActiveRelay { pub(super) struct RelayActor { msock: Arc, + relay_recv_channel: RelayRecvSender, /// relay Url -> connection to the node - active_relay: BTreeMap, JoinHandle<()>)>, - msg_sender: mpsc::Sender, + connected_relays: BTreeMap, JoinHandle<()>)>, ping_tasks: JoinSet<(RelayUrl, bool)>, cancel_token: CancellationToken, } impl RelayActor { - pub(super) fn new(msock: Arc, msg_sender: mpsc::Sender) -> Self { + pub(super) fn new(msock: Arc, recv_channel: RelayRecvSender) -> Self { let cancel_token = CancellationToken::new(); Self { msock, - active_relay: Default::default(), - msg_sender, + relay_recv_channel: recv_channel, + connected_relays: Default::default(), ping_tasks: Default::default(), cancel_token, } @@ -327,8 +344,8 @@ impl RelayActor { trace!("shutting down relay recv loop"); break; }; - - with_cancel(self.cancel_token.child_token(), self.handle_msg(msg)).await; + let cancel_token = self.cancel_token.child_token(); + cancel_token.run_until_cancelled(self.handle_msg(msg)).await; } _ = cleanup_timer.tick() => { trace!("tick: cleanup"); @@ -361,12 +378,16 @@ impl RelayActor { } async fn note_preferred(&self, my_url: &RelayUrl) { - futures_buffered::join_all(self.active_relay.iter().map(|(url, (s, _))| async move { - let is_preferred = url == my_url; - s.send(ActiveRelayMessage::NotePreferred(is_preferred)) - .await - .ok() - })) + futures_buffered::join_all( + self.connected_relays + .iter() + .map(|(url, (s, _))| async move { + let is_preferred = url == my_url; + s.send(ConnectedRelayMessage::NotePreferred(is_preferred)) + .await + .ok() + }), + ) .await; } @@ -386,10 +407,10 @@ impl RelayActor { const PAYLAOD_SIZE: usize = MAX_PACKET_SIZE - PUBLIC_KEY_LENGTH; - // Split into multiple packets if needed. 
- // In almost all cases this will be a single packet. - // But we have no guarantee that the total size of the contents including - // length prefix will be smaller than the payload size. + // When Quinn sends a GSO Transmit magicsock::split_packets will make us receive + // more than one packet to send in a single call. We join all packets back together + // and prefix them with a u16 packet size. They then get sent as a single DISCO + // frame. for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) { match relay_client.send(remote_node, packet).await { Ok(_) => { @@ -410,8 +431,12 @@ impl RelayActor { } /// Returns `true`if the message was sent successfully. - async fn send_to_active(&mut self, url: &RelayUrl, msg: ActiveRelayMessage) -> bool { - let res = self.active_relay.get(url); + async fn send_to_connected_relay( + &mut self, + url: &RelayUrl, + msg: ConnectedRelayMessage, + ) -> bool { + let res = self.connected_relays.get(url); match res { Some((s, _)) => match s.send(msg).await { Ok(_) => true, @@ -424,7 +449,10 @@ impl RelayActor { } } - /// Connect to the given relay node. + /// Returns a relay client to a given relay. + /// + /// If a connection to the relay already exists it is used, otherwise a new one is + /// created. async fn connect_relay( &mut self, url: &RelayUrl, @@ -438,7 +466,7 @@ impl RelayActor { { let (os, or) = oneshot::channel(); if self - .send_to_active(url, ActiveRelayMessage::GetClient(os)) + .send_to_connected_relay(url, ConnectedRelayMessage::GetClient(os)) .await { if let Ok(client) = or.await { @@ -455,7 +483,7 @@ impl RelayActor { // SF connection rather than dialing Frankfurt. 
if let Some(node) = remote_node { for url in self - .active_relay + .connected_relays .keys() .cloned() .collect::>() @@ -463,7 +491,7 @@ impl RelayActor { { let (os, or) = oneshot::channel(); if self - .send_to_active(&url, ActiveRelayMessage::GetNodeRoute(*node, os)) + .send_to_connected_relay(&url, ConnectedRelayMessage::GetNodeRoute(*node, os)) .await { if let Ok(Some(client)) = or.await { @@ -482,11 +510,10 @@ impl RelayActor { let my_relay = self.msock.my_relay(); let ipv6_reported = self.msock.ipv6_reported.clone(); - let url = url.clone(); - let url1 = url.clone(); - // building a client dials the relay - let mut builder = relay::client::ClientBuilder::new(url1.clone()); + // The relay client itself is an actor which will maintain the connection to the + // relay server. + let mut builder = relay::client::ClientBuilder::new(url.clone()); if let Some(url) = self.msock.proxy_url() { builder = builder.proxy_url(url.clone()); } @@ -496,40 +523,41 @@ impl RelayActor { Box::pin(async move { ipv6_reported.load(Ordering::Relaxed) }) }) .can_ack_pings(true) - .is_preferred(my_relay.as_ref() == Some(&url1)); + .is_preferred(my_relay.as_ref() == Some(url)); #[cfg(any(test, feature = "test-utils"))] let builder = builder.insecure_skip_cert_verify(self.msock.insecure_skip_relay_cert_verify); - let (dc, dc_receiver) = builder.build( + let (relay_client, relay_receiver) = builder.build( self.msock.secret_key.clone(), self.msock.dns_resolver.clone(), ); - - let (s, r) = mpsc::channel(64); - - let c = dc.clone(); - let msg_sender = self.msg_sender.clone(); - let url1 = url.clone(); - let handle = tokio::task::spawn( + let (conn_actor_inbox_tx, conn_actor_inbox_rx) = mpsc::channel(64); + let handle = tokio::task::spawn({ + let url = url.clone(); + let relay_client = relay_client.clone(); + let relay_recv_channel = self.relay_recv_channel.clone(); + let span = info_span!("conn-relay-actor", %url); async move { - let ad = ActiveRelay::new(url1, c, dc_receiver, msg_sender); + 
let conn_actor = + ConnectedRelayActor::new(url, relay_client, relay_receiver, relay_recv_channel); - if let Err(err) = ad.run(r).await { + if let Err(err) = conn_actor.run(conn_actor_inbox_rx).await { warn!("connection error: {:?}", err); } } - .instrument(info_span!("active-relay", %url)), - ); + .instrument(span) + }); // Insert, to make sure we do not attempt to double connect. - self.active_relay.insert(url.clone(), (s, handle)); + self.connected_relays + .insert(url.clone(), (conn_actor_inbox_tx, handle)); inc!(MagicsockMetrics, num_relay_conns_added); self.log_active_relay(); - dc + relay_client } /// Closes the relay connections not originating from a local IP address. @@ -540,7 +568,7 @@ impl RelayActor { async fn maybe_close_relays_on_rebind(&mut self, okay_local_ips: &[IpAddr]) { let mut tasks = Vec::new(); for url in self - .active_relay + .connected_relays .keys() .cloned() .collect::>() @@ -548,7 +576,7 @@ impl RelayActor { { let (os, or) = oneshot::channel(); let la = if self - .send_to_active(&url, ActiveRelayMessage::GetLocalAddr(os)) + .send_to_connected_relay(&url, ConnectedRelayMessage::GetLocalAddr(os)) .await { match or.await { @@ -570,7 +598,7 @@ impl RelayActor { let (os, or) = oneshot::channel(); let ping_sent = self - .send_to_active(&url, ActiveRelayMessage::Ping(os)) + .send_to_connected_relay(&url, ConnectedRelayMessage::Ping(os)) .await; self.ping_tasks.spawn(async move { @@ -605,16 +633,19 @@ impl RelayActor { } async fn clean_stale_relay(&mut self) { - trace!("checking {} relays for staleness", self.active_relay.len()); + trace!( + "checking {} relays for staleness", + self.connected_relays.len() + ); let now = Instant::now(); let mut to_close = Vec::new(); - for (i, (s, _)) in &self.active_relay { + for (i, (s, _)) in &self.connected_relays { if Some(i) == self.msock.my_relay().as_ref() { continue; } let (os, or) = oneshot::channel(); - match s.send(ActiveRelayMessage::GetLastWrite(os)).await { + match 
s.send(ConnectedRelayMessage::GetLastWrite(os)).await { Ok(_) => match or.await { Ok(last_write) => { if last_write.duration_since(now) > RELAY_INACTIVE_CLEANUP_TIME { @@ -635,7 +666,7 @@ impl RelayActor { trace!( "closing {} of {} relays", to_close.len(), - self.active_relay.len() + self.connected_relays.len() ); for i in to_close { self.close_relay(&i, "idle").await; @@ -646,11 +677,11 @@ impl RelayActor { } async fn close_all_relay(&mut self, why: &'static str) { - if self.active_relay.is_empty() { + if self.connected_relays.is_empty() { return; } // Need to collect to avoid double borrow - let urls: Vec<_> = self.active_relay.keys().cloned().collect(); + let urls: Vec<_> = self.connected_relays.keys().cloned().collect(); for url in urls { self.close_relay(&url, why).await; } @@ -658,10 +689,10 @@ impl RelayActor { } async fn close_relay(&mut self, url: &RelayUrl, why: &'static str) { - if let Some((s, t)) = self.active_relay.remove(url) { + if let Some((s, t)) = self.connected_relays.remove(url) { debug!(%url, "closing connection: {}", why); - s.send(ActiveRelayMessage::Shutdown).await.ok(); + s.send(ConnectedRelayMessage::Shutdown).await.ok(); t.abort(); // ensure the task is shutdown inc!(MagicsockMetrics, num_relay_conns_removed); @@ -669,9 +700,9 @@ impl RelayActor { } fn log_active_relay(&self) { - debug!("{} active relay conns{}", self.active_relay.len(), { + debug!("{} active relay conns{}", self.connected_relays.len(), { let mut s = String::new(); - if !self.active_relay.is_empty() { + if !self.connected_relays.is_empty() { s += ":"; for node in self.active_relay_sorted() { s += &format!(" relay-{}", node,); @@ -682,19 +713,20 @@ impl RelayActor { } fn active_relay_sorted(&self) -> impl Iterator { - let mut ids: Vec<_> = self.active_relay.keys().cloned().collect(); + let mut ids: Vec<_> = self.connected_relays.keys().cloned().collect(); ids.sort(); ids.into_iter() } } -#[derive(derive_more::Debug)] -pub(super) struct RelayReadResult { +/// A single 
datagram received from a relay server. +/// +/// This could be either a QUIC or DISCO packet. +#[derive(Debug)] +pub(super) struct RelayRecvDatagram { pub(super) url: RelayUrl, pub(super) src: NodeId, - /// packet data - #[debug(skip)] pub(super) buf: Bytes, } @@ -704,9 +736,15 @@ pub(super) enum ReadResult { Continue, } -/// Combines blobs into packets of at most MAX_PACKET_SIZE. +/// Combines datagrams into a single DISCO frame of at most MAX_PACKET_SIZE. +/// +/// The disco `iroh_relay::protos::Frame::SendPacket` frame can contain more then a single +/// datagram. Each datagram in this frame is prefixed with a little-endian 2-byte length +/// prefix. This occurs when Quinn sends a GSO transmit containing more than one datagram, +/// which are split using [`crate::magicsock::split_packets`]. /// -/// Each item in a packet has a little-endian 2-byte length prefix. +/// The [`PacketSplitIter`] does the inverse and splits such packets back into individual +/// datagrams. pub(super) struct PacketizeIter { iter: std::iter::Peekable, buffer: BytesMut, @@ -750,6 +788,51 @@ where } } +/// Splits a single [`ReceivedMessage::ReceivedPacket`] frame into datagrams. +/// +/// This splits packets joined by [`PacketizeIter`] back into individual datagrams. See +/// that struct for more details. +#[derive(Debug)] +struct PacketSplitIter { + bytes: Bytes, +} + +impl PacketSplitIter { + /// Create a new PacketSplitIter from a packet. 
+ fn new(bytes: Bytes) -> Self { + Self { bytes } + } + + fn fail(&mut self) -> Option> { + self.bytes.clear(); + Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "", + ))) + } +} + +impl Iterator for PacketSplitIter { + type Item = std::io::Result; + + fn next(&mut self) -> Option { + use bytes::Buf; + if self.bytes.has_remaining() { + if self.bytes.remaining() < 2 { + return self.fail(); + } + let len = self.bytes.get_u16_le() as usize; + if self.bytes.remaining() < len { + return self.fail(); + } + let item = self.bytes.split_to(len); + Some(Ok(item)) + } else { + None + } + } +} + async fn with_cancel(token: CancellationToken, f: F) where F: Future, From f75a04b0cacdda9947d0b38e7179c8693708f9d1 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 4 Dec 2024 11:08:36 +0100 Subject: [PATCH 2/6] refactor(iroh): Remove with_cancel, use run_until_cancelled (#3000) ## Description We manually created a function that was already provided by the CancellationToken. Use the one from the library for consistency. ## Breaking Changes ## Notes & open questions ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. 
--- iroh/src/magicsock/relay_actor.rs | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 2794bbf342..74ed653862 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -5,7 +5,6 @@ use std::{ collections::{BTreeMap, BTreeSet}, - future::Future, net::{IpAddr, SocketAddr}, sync::{atomic::Ordering, Arc}, time::{Duration, Instant}, @@ -326,8 +325,8 @@ impl RelayActor { match task_result { Ok((url, ping_success)) => { if !ping_success { - with_cancel( - self.cancel_token.child_token(), + let token = self.cancel_token.child_token(); + token.run_until_cancelled( self.close_or_reconnect_relay(&url, "rebind-ping-fail") ).await; } @@ -349,7 +348,8 @@ impl RelayActor { } _ = cleanup_timer.tick() => { trace!("tick: cleanup"); - with_cancel(self.cancel_token.child_token(), self.clean_stale_relay()).await; + let cancel_token = self.cancel_token.child_token(); + cancel_token.run_until_cancelled(self.clean_stale_relay()).await; } } } @@ -833,19 +833,6 @@ impl Iterator for PacketSplitIter { } } -async fn with_cancel(token: CancellationToken, f: F) -where - F: Future, -{ - tokio::select! 
{ - _ = token.cancelled_owned() => { - // abort - } - _ = f => { - } - } -} - #[cfg(test)] mod tests { use super::*; From 566d7eb5797ab173028deacc075635110bdf221e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 4 Dec 2024 14:05:07 +0100 Subject: [PATCH 3/6] chore(iroh, iroh-relay): Avoid a duplicate tungstenite dependency (#3006) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description Before we used to depend on both tungstenite version 0.21 as well as 0.24, because: ``` tungstenite v0.21.0 └── tokio-tungstenite v0.21.0 └── tokio-tungstenite-wasm v0.3.1 ├── iroh v0.29.0 (/home/philipp/program/work/iroh/iroh) └── iroh-relay v0.29.0 (/home/philipp/program/work/iroh/iroh-relay) ├── iroh v0.29.0 (/home/philipp/program/work/iroh/iroh) └── iroh-net-report v0.29.0 (/home/philipp/program/work/iroh/iroh-net-report) └── iroh v0.29.0 (/home/philipp/program/work/iroh/iroh) tungstenite v0.24.0 └── tokio-tungstenite v0.24.0 ├── iroh v0.29.0 (/home/philipp/program/work/iroh/iroh) └── iroh-relay v0.29.0 (/home/philipp/program/work/iroh/iroh-relay) ├── iroh v0.29.0 (/home/philipp/program/work/iroh/iroh) └── iroh-net-report v0.29.0 (/home/philipp/program/work/iroh/iroh-net-report) └── iroh v0.29.0 (/home/philipp/program/work/iroh/iroh) ``` Basically, `tokio-tungstenite-wasm` pulls in `0.21` and there's no newer version of it yet. But we updated all our dependencies including `tungstenite`, duplicating it. ## Notes & open questions I want this to be temporary until we can finally switch to `fasterwebsockets` entirely once it implements [`poll`-based methods](https://github.com/denoland/fastwebsockets/pull/78) (but I worry the project's maintenance is ... unclear). I checked the [tungstenite changelog](https://github.com/snapview/tungstenite-rs/blob/master/CHANGELOG.md), and it doesn't look like there's anything critical in there. 
The `rustls` update doesn't affect us - we don't duplicate rustls versions after this rollback. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - ~~[ ] Tests if relevant.~~ - [x] All breaking changes documented. --- Cargo.lock | 38 ++++---------------------------------- iroh-relay/Cargo.toml | 2 +- iroh/Cargo.toml | 2 +- 3 files changed, 6 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 77046bc729..65cb06c5d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2257,7 +2257,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", - "tokio-tungstenite 0.24.0", + "tokio-tungstenite", "tokio-tungstenite-wasm", "tokio-util", "tracing", @@ -2538,7 +2538,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-rustls-acme", - "tokio-tungstenite 0.24.0", + "tokio-tungstenite", "tokio-tungstenite-wasm", "tokio-util", "toml", @@ -5054,19 +5054,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite 0.21.0", -] - -[[package]] -name = "tokio-tungstenite" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite 0.24.0", + "tungstenite", ] [[package]] @@ -5082,7 +5070,7 @@ dependencies = [ "js-sys", "thiserror 1.0.69", "tokio", - "tokio-tungstenite 0.21.0", + "tokio-tungstenite", "wasm-bindgen", "web-sys", ] @@ -5308,24 +5296,6 @@ dependencies = [ "utf-8", ] -[[package]] -name = "tungstenite" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http 1.1.0", - "httparse", - "log", - "rand", - "sha1", - "thiserror 1.0.69", - 
"utf-8", -] - [[package]] name = "typenum" version = "1.17.0" diff --git a/iroh-relay/Cargo.toml b/iroh-relay/Cargo.toml index 7c21715f98..4bbf92d8a6 100644 --- a/iroh-relay/Cargo.toml +++ b/iroh-relay/Cargo.toml @@ -83,7 +83,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = [ "ring", ] } tokio-rustls-acme = { version = "0.6", optional = true } -tokio-tungstenite = "0.24" +tokio-tungstenite = "0.21" # avoid duplicating this dependency as long as tokio-tungstenite-wasm isn't updated tokio-tungstenite-wasm = "0.3" tokio-util = { version = "0.7", features = ["io-util", "io", "codec", "rt"] } toml = { version = "0.8", optional = true } diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 15dafae9b4..d79fde0bad 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -97,7 +97,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = [ "ring", ] } tokio-stream = { version = "0.1.15" } -tokio-tungstenite = "0.24" +tokio-tungstenite = "0.21" # avoid duplicating this dependency as long as tokio-tungstenite-wasm isn't updated tokio-tungstenite-wasm = "0.3" tokio-util = { version = "0.7", features = ["io-util", "io", "codec", "rt"] } tracing = "0.1" From b2b070fefeeb59da438f81bef451eb3f6fd14524 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 4 Dec 2024 14:35:52 +0100 Subject: [PATCH 4/6] refactor(iroh): Remove unused rate limiter (#3007) ## Description This was a rate limiter which seems like the intention was to slow down how fast it sends to a particular relay server. This is the wrong place to rate-limit. If the server limits that it should read slower from the TCP stream (it does this now!) and that blocks up the TCP-stream and in turn slows down the relay client. In no world does a socket internally rate-limit to one of it's destinations, that's not how sockets work. And this client exists for the MagicSock. ## Breaking Changes ## Notes & open questions ## Change checklist - [x] Self-review. 
- [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- iroh-relay/src/client/conn.rs | 52 +++------------------------- iroh-relay/src/server/actor.rs | 3 +- iroh-relay/src/server/client_conn.rs | 6 ++-- 3 files changed, 9 insertions(+), 52 deletions(-) diff --git a/iroh-relay/src/client/conn.rs b/iroh-relay/src/client/conn.rs index 8230f79c2f..4c9fbc64c4 100644 --- a/iroh-relay/src/client/conn.rs +++ b/iroh-relay/src/client/conn.rs @@ -4,14 +4,13 @@ use std::{ net::SocketAddr, - num::NonZeroU32, pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration, }; -use anyhow::{anyhow, bail, ensure, Context as _, Result}; +use anyhow::{anyhow, bail, ensure, Result}; use bytes::Bytes; use futures_lite::Stream; use futures_sink::Sink; @@ -229,7 +228,6 @@ enum ConnWriterMessage { struct ConnWriterTasks { recv_msgs: mpsc::Receiver, writer: ConnWriter, - rate_limiter: Option, } impl ConnWriterTasks { @@ -237,7 +235,7 @@ impl ConnWriterTasks { while let Some(msg) = self.recv_msgs.recv().await { match msg { ConnWriterMessage::Packet((key, bytes)) => { - send_packet(&mut self.writer, &self.rate_limiter, key, bytes).await?; + send_packet(&mut self.writer, key, bytes).await?; } ConnWriterMessage::Pong(data) => { write_frame(&mut self.writer, Frame::Pong { data }, None).await?; @@ -360,7 +358,7 @@ impl ConnBuilder { } } - async fn server_handshake(&mut self) -> Result> { + async fn server_handshake(&mut self) -> Result<()> { debug!("server_handshake: started"); let client_info = ClientInfo { version: PROTOCOL_VERSION, @@ -369,22 +367,18 @@ impl ConnBuilder { crate::protos::relay::send_client_key(&mut self.writer, &self.secret_key, &client_info) .await?; - // TODO: add some actual configuration - let rate_limiter = RateLimiter::new(0, 0)?; - debug!("server_handshake: done"); - 
Ok(rate_limiter) + Ok(()) } pub async fn build(mut self) -> Result<(Conn, ConnReceiver)> { // exchange information with the server - let rate_limiter = self.server_handshake().await?; + self.server_handshake().await?; // create task to handle writing to the server let (writer_sender, writer_recv) = mpsc::channel(PER_CLIENT_SEND_QUEUE_DEPTH); let writer_task = tokio::task::spawn( ConnWriterTasks { - rate_limiter, writer: self.writer, recv_msgs: writer_recv, } @@ -494,7 +488,6 @@ pub enum ReceivedMessage { pub(crate) async fn send_packet + Unpin>( mut writer: S, - rate_limiter: &Option, dst: NodeId, packet: Bytes, ) -> Result<()> { @@ -508,43 +501,8 @@ pub(crate) async fn send_packet + Unpin>( dst_key: dst, packet, }; - if let Some(rate_limiter) = rate_limiter { - if rate_limiter.check_n(frame.len()).is_err() { - tracing::debug!("dropping send: rate limit reached"); - return Ok(()); - } - } writer.send(frame).await?; writer.flush().await?; Ok(()) } - -pub(crate) struct RateLimiter { - inner: governor::DefaultDirectRateLimiter, -} - -impl RateLimiter { - pub(crate) fn new(bytes_per_second: usize, bytes_burst: usize) -> Result> { - if bytes_per_second == 0 || bytes_burst == 0 { - return Ok(None); - } - let bytes_per_second = NonZeroU32::new(u32::try_from(bytes_per_second)?) 
- .context("bytes_per_second not non-zero")?; - let bytes_burst = - NonZeroU32::new(u32::try_from(bytes_burst)?).context("bytes_burst not non-zero")?; - Ok(Some(Self { - inner: governor::RateLimiter::direct( - governor::Quota::per_second(bytes_per_second).allow_burst(bytes_burst), - ), - })) - } - - pub(crate) fn check_n(&self, n: usize) -> Result<()> { - let n = NonZeroU32::new(u32::try_from(n)?).context("n not non-zero")?; - match self.inner.check_n(n) { - Ok(_) => Ok(()), - Err(_) => bail!("batch cannot go through"), - } - } -} diff --git a/iroh-relay/src/server/actor.rs b/iroh-relay/src/server/actor.rs index 9ef38a0964..40970f3e21 100644 --- a/iroh-relay/src/server/actor.rs +++ b/iroh-relay/src/server/actor.rs @@ -308,8 +308,7 @@ mod tests { // write message from b to a let msg = b"hello world!"; - crate::client::conn::send_packet(&mut b_io, &None, node_id_a, Bytes::from_static(msg)) - .await?; + crate::client::conn::send_packet(&mut b_io, node_id_a, Bytes::from_static(msg)).await?; // get message on a's reader let frame = recv_frame(FrameType::RecvPacket, &mut a_io).await?; diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index afb4ed1efd..b40e0baae1 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -617,7 +617,7 @@ mod tests { // send packet println!(" send packet"); let data = b"hello world!"; - conn::send_packet(&mut io_rw, &None, target, Bytes::from_static(data)).await?; + conn::send_packet(&mut io_rw, target, Bytes::from_static(data)).await?; let msg = server_channel_r.recv().await.unwrap(); match msg { actor::Message::SendPacket { @@ -640,7 +640,7 @@ mod tests { let mut disco_data = disco::MAGIC.as_bytes().to_vec(); disco_data.extend_from_slice(target.as_bytes()); disco_data.extend_from_slice(data); - conn::send_packet(&mut io_rw, &None, target, disco_data.clone().into()).await?; + conn::send_packet(&mut io_rw, target, disco_data.clone().into()).await?; let msg = 
server_channel_r.recv().await.unwrap(); match msg { actor::Message::SendDiscoPacket { @@ -698,7 +698,7 @@ mod tests { let data = b"hello world!"; let target = SecretKey::generate().public(); - conn::send_packet(&mut io_rw, &None, target, Bytes::from_static(data)).await?; + conn::send_packet(&mut io_rw, target, Bytes::from_static(data)).await?; let msg = server_channel_r.recv().await.unwrap(); match msg { actor::Message::SendPacket { From a3f0497ba8c3a1a478280681b476ccc6fd8d7eb0 Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Wed, 4 Dec 2024 14:58:33 +0100 Subject: [PATCH 5/6] chore: bump netsim setup (#3004) ## Description The sims on `netsim` have been promoted and are replacing the old ones. PRs older than 2 weeks from main no longer work with netsim. A simple rebase should fix it where necessary. I'll leave this as is for another 2 weeks and then do the cleanup pass. This has the neat benefit of also dropping old fixture generation scripts which should drop netsim execution time by ~1 min. ## Breaking Changes ## Notes & open questions ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. 
--- .github/workflows/ci.yml | 2 +- .github/workflows/netsim.yml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 29145aaa32..dfb4c4e006 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -297,7 +297,7 @@ jobs: branch: ${{ github.ref }} max_workers: 4 netsim_branch: "main" - sim_paths: "sims/iroh_v2/iroh.json,sims/integration_v2" + sim_paths: "sims/iroh/iroh.json,sims/integration" pr_number: ${{ github.event.pull_request.number || '' }} codespell: diff --git a/.github/workflows/netsim.yml b/.github/workflows/netsim.yml index cbd08149e4..b7a79cd1d1 100644 --- a/.github/workflows/netsim.yml +++ b/.github/workflows/netsim.yml @@ -39,7 +39,7 @@ jobs: branch: "main" max_workers: 1 netsim_branch: "main" - sim_paths: "sims/iroh_v2,sims/integration_v2" + sim_paths: "sims/iroh,sims/integration" pr_number: "" publish_metrics: true build_profile: "optimized-release" @@ -53,7 +53,7 @@ jobs: branch: ${{inputs.branch}} max_workers: 1 netsim_branch: ${{inputs.netsim_branch}} - sim_paths: "sims/iroh_v2" + sim_paths: "sims/iroh" pr_number: ${{inputs.pr_number}} publish_metrics: false build_profile: "optimized-release" From e575af2a7fcb54fabcafd16e1b7edf29b3ef784b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 4 Dec 2024 15:30:55 +0100 Subject: [PATCH 6/6] tests(iroh): Packet loss is expected with socket rebinding (#3001) ## Description This allows packet loss when rebinding sockets. ## Breaking Changes ## Notes & open questions I think it would also be reasonable to use e.g.: ``` rust enum ExpectedLoss { Low, Medium, } ``` And then accept like 50 lost packets in Medium. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. 
--- iroh/src/magicsock.rs | 90 +++++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 29 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index c117d2cf77..0e37df5bf8 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -2995,7 +2995,7 @@ mod tests { } #[instrument(skip_all, fields(me = %ep.endpoint.node_id().fmt_short()))] - async fn echo_receiver(ep: MagicStack) -> Result<()> { + async fn echo_receiver(ep: MagicStack, loss: ExpectedLoss) -> Result<()> { info!("accepting conn"); let conn = ep.endpoint.accept().await.expect("no conn"); @@ -3026,10 +3026,12 @@ mod tests { let stats = conn.stats(); info!("stats: {:#?}", stats); // TODO: ensure panics in this function are reported ok - assert!( - stats.path.lost_packets < 10, - "[receiver] should not loose many packets", - ); + if matches!(loss, ExpectedLoss::AlmostNone) { + assert!( + stats.path.lost_packets < 10, + "[receiver] should not loose many packets", + ); + } info!("close"); conn.close(0u32.into(), b"done"); @@ -3040,7 +3042,12 @@ mod tests { } #[instrument(skip_all, fields(me = %ep.endpoint.node_id().fmt_short()))] - async fn echo_sender(ep: MagicStack, dest_id: PublicKey, msg: &[u8]) -> Result<()> { + async fn echo_sender( + ep: MagicStack, + dest_id: PublicKey, + msg: &[u8], + loss: ExpectedLoss, + ) -> Result<()> { info!("connecting to {}", dest_id.fmt_short()); let dest = NodeAddr::new(dest_id); let conn = ep @@ -3071,10 +3078,12 @@ mod tests { let stats = conn.stats(); info!("stats: {:#?}", stats); - assert!( - stats.path.lost_packets < 10, - "[sender] should not loose many packets", - ); + if matches!(loss, ExpectedLoss::AlmostNone) { + assert!( + stats.path.lost_packets < 10, + "[sender] should not loose many packets", + ); + } info!("close"); conn.close(0u32.into(), b"done"); @@ -3083,14 +3092,25 @@ mod tests { Ok(()) } + #[derive(Debug, Copy, Clone)] + enum ExpectedLoss { + AlmostNone, + YeahSure, + } + /// Runs a roundtrip between the 
[`echo_sender`] and [`echo_receiver`]. - async fn run_roundtrip(sender: MagicStack, receiver: MagicStack, payload: &[u8]) { + async fn run_roundtrip( + sender: MagicStack, + receiver: MagicStack, + payload: &[u8], + loss: ExpectedLoss, + ) { let send_node_id = sender.endpoint.node_id(); let recv_node_id = receiver.endpoint.node_id(); info!("\nroundtrip: {send_node_id:#} -> {recv_node_id:#}"); - let receiver_task = tokio::spawn(echo_receiver(receiver)); - let sender_res = echo_sender(sender, recv_node_id, payload).await; + let receiver_task = tokio::spawn(echo_receiver(receiver, loss)); + let sender_res = echo_sender(sender, recv_node_id, payload, loss).await; let sender_is_err = match sender_res { Ok(()) => false, Err(err) => { @@ -3129,14 +3149,26 @@ mod tests { for i in 0..5 { info!("\n-- round {i}"); - run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await; - run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await; + run_roundtrip( + m1.clone(), + m2.clone(), + b"hello m1", + ExpectedLoss::AlmostNone, + ) + .await; + run_roundtrip( + m2.clone(), + m1.clone(), + b"hello m2", + ExpectedLoss::AlmostNone, + ) + .await; info!("\n-- larger data"); let mut data = vec![0u8; 10 * 1024]; rand::thread_rng().fill_bytes(&mut data); - run_roundtrip(m1.clone(), m2.clone(), &data).await; - run_roundtrip(m2.clone(), m1.clone(), &data).await; + run_roundtrip(m1.clone(), m2.clone(), &data, ExpectedLoss::AlmostNone).await; + run_roundtrip(m2.clone(), m1.clone(), &data, ExpectedLoss::AlmostNone).await; } Ok(()) @@ -3223,14 +3255,14 @@ mod tests { for i in 0..rounds { println!("-- [m1 changes] round {}", i + 1); - run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await; - run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await; + run_roundtrip(m1.clone(), m2.clone(), b"hello m1", ExpectedLoss::YeahSure).await; + run_roundtrip(m2.clone(), m1.clone(), b"hello m2", ExpectedLoss::YeahSure).await; println!("-- [m1 changes] larger data"); let mut data = vec![0u8; 10 * 1024]; 
rand::thread_rng().fill_bytes(&mut data); - run_roundtrip(m1.clone(), m2.clone(), &data).await; - run_roundtrip(m2.clone(), m1.clone(), &data).await; + run_roundtrip(m1.clone(), m2.clone(), &data, ExpectedLoss::YeahSure).await; + run_roundtrip(m2.clone(), m1.clone(), &data, ExpectedLoss::YeahSure).await; } std::mem::drop(m1_network_change_guard); @@ -3252,14 +3284,14 @@ mod tests { for i in 0..rounds { println!("-- [m2 changes] round {}", i + 1); - run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await; - run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await; + run_roundtrip(m1.clone(), m2.clone(), b"hello m1", ExpectedLoss::YeahSure).await; + run_roundtrip(m2.clone(), m1.clone(), b"hello m2", ExpectedLoss::YeahSure).await; println!("-- [m2 changes] larger data"); let mut data = vec![0u8; 10 * 1024]; rand::thread_rng().fill_bytes(&mut data); - run_roundtrip(m1.clone(), m2.clone(), &data).await; - run_roundtrip(m2.clone(), m1.clone(), &data).await; + run_roundtrip(m1.clone(), m2.clone(), &data, ExpectedLoss::YeahSure).await; + run_roundtrip(m2.clone(), m1.clone(), &data, ExpectedLoss::YeahSure).await; } std::mem::drop(m2_network_change_guard); @@ -3282,14 +3314,14 @@ mod tests { for i in 0..rounds { println!("-- [m1 & m2 changes] round {}", i + 1); - run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await; - run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await; + run_roundtrip(m1.clone(), m2.clone(), b"hello m1", ExpectedLoss::YeahSure).await; + run_roundtrip(m2.clone(), m1.clone(), b"hello m2", ExpectedLoss::YeahSure).await; println!("-- [m1 & m2 changes] larger data"); let mut data = vec![0u8; 10 * 1024]; rand::thread_rng().fill_bytes(&mut data); - run_roundtrip(m1.clone(), m2.clone(), &data).await; - run_roundtrip(m2.clone(), m1.clone(), &data).await; + run_roundtrip(m1.clone(), m2.clone(), &data, ExpectedLoss::YeahSure).await; + run_roundtrip(m2.clone(), m1.clone(), &data, ExpectedLoss::YeahSure).await; } std::mem::drop(m1_m2_network_change_guard);