From b76500d15c77fd7d154542194395305fe47aea8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 4 Dec 2024 16:58:36 +0100 Subject: [PATCH] feat: Implement `RelayDatagramsQueue` (#2998) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Based on #2986 ## Description Replaces `RelayRecvReceiver` and `RelayRecvSender` with the (clonable & shared) `RelayDatagramsQueue`. This queue contains a `ConcurrentQueue` (from the smol library `concurrent-queue`) and an `AtomicWaker`. It should only be polled from one task. If polled from multiple tasks, then tasks will overwrite each other's wakers. Unfortunately, we can't make it use `&mut Self` in `poll_recv` because `quinn` expects the `AsyncUdpSocket`'s `poll_recv` interface to be `&self`. This (un)fortunately doesn't have an effect on performance for me. (The benchmark is completely broken half the time for some reason, but when it runs it produces normal numbers:) ```sh $ DEV_RELAY_ONLY=true cargo run -p iroh-net-bench --release --features=local-relay -- iroh --with-relay --download-size=100M │ Throughput │ Duration ──────┼───────────────┼────────── AVG │ 55.05 MiB/s │ 1.82s P0 │ 55.03 MiB/s │ 1.82s P10 │ 55.06 MiB/s │ 1.82s P50 │ 55.06 MiB/s │ 1.82s P90 │ 55.06 MiB/s │ 1.82s P100 │ 55.06 MiB/s │ 1.82s ``` And basically exactly the same times for the PR this is based on. ## Breaking Changes ## Notes & open questions ## Todo - [x] Rename variables to e.g. `relay_datagrams_queue` instead of `relay_recv_sender` or `relay_recv_channel` etc. - [x] Add documentation about multiple tasks polling, etc. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. 
--- Cargo.lock | 1 + iroh/Cargo.toml | 1 + iroh/src/magicsock.rs | 179 ++++++++++++++++++++---------- iroh/src/magicsock/relay_actor.rs | 31 ++++-- 4 files changed, 144 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65cb06c5d5..d7437b25e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2188,6 +2188,7 @@ dependencies = [ "base64", "bytes", "clap", + "concurrent-queue", "criterion", "crypto_box", "der", diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index d79fde0bad..32a7dc8b69 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -17,6 +17,7 @@ workspace = true [dependencies] anyhow = { version = "1" } +concurrent-queue = "2.5" axum = { version = "0.7", optional = true } backoff = "0.4.0" base64 = "0.22.1" diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 0e37df5bf8..1b08302de9 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -31,8 +31,9 @@ use std::{ use anyhow::{anyhow, Context as _, Result}; use bytes::Bytes; +use concurrent_queue::ConcurrentQueue; use futures_lite::{FutureExt, Stream, StreamExt}; -use futures_util::stream::BoxStream; +use futures_util::{stream::BoxStream, task::AtomicWaker}; use iroh_base::key::NodeId; use iroh_metrics::{inc, inc_by}; use iroh_relay::protos::stun; @@ -180,12 +181,12 @@ pub(crate) struct MagicSock { me: String, /// Proxy proxy_url: Option, - /// Channel to receive datagrams from relays for [`AsyncUdpSocket::poll_recv`]. + /// Queue to receive datagrams from relays for [`AsyncUdpSocket::poll_recv`]. /// - /// QUIC datagrams received by relays are put on this channel and consumed by - /// [`AsyncUdpSocket`]. This channel takes care of the wakers needed by + /// Relay datagrams received by relays are put into this queue and consumed by + /// [`AsyncUdpSocket`]. This queue takes care of the wakers needed by /// [`AsyncUdpSocket::poll_recv`]. 
- relay_recv_channel: RelayRecvReceiver, + relay_datagrams_queue: Arc, network_send_wakers: Arc>>, /// Counter for ordering of [`MagicSock::poll_recv`] polling order. @@ -860,7 +861,7 @@ impl MagicSock { // For each output buffer keep polling the datagrams from the relay until one is // a QUIC datagram to be placed into the output buffer. Or the channel is empty. loop { - let recv = match self.relay_recv_channel.poll_recv(cx) { + let recv = match self.relay_datagrams_queue.poll_recv(cx) { Poll::Ready(Ok(recv)) => recv, Poll::Ready(Err(err)) => { error!("relay_recv_channel closed: {err:#}"); @@ -1510,7 +1511,7 @@ impl Handle { insecure_skip_relay_cert_verify, } = opts; - let (relay_recv_tx, relay_recv_rx) = relay_recv_channel(); + let relay_datagrams_queue = Arc::new(RelayDatagramsQueue::new()); let (pconn4, pconn6) = bind(addr_v4, addr_v6)?; let port = pconn4.port(); @@ -1547,7 +1548,7 @@ impl Handle { local_addrs: std::sync::RwLock::new((ipv4_addr, ipv6_addr)), closing: AtomicBool::new(false), closed: AtomicBool::new(false), - relay_recv_channel: relay_recv_rx, + relay_datagrams_queue: relay_datagrams_queue.clone(), network_send_wakers: Arc::new(parking_lot::Mutex::new(None)), poll_recv_counter: AtomicUsize::new(0), actor_sender: actor_sender.clone(), @@ -1572,7 +1573,7 @@ impl Handle { let mut actor_tasks = JoinSet::default(); - let relay_actor = RelayActor::new(inner.clone(), relay_recv_tx); + let relay_actor = RelayActor::new(inner.clone(), relay_datagrams_queue); let relay_actor_cancel_token = relay_actor.cancel_token(); actor_tasks.spawn( async move { @@ -1712,64 +1713,74 @@ enum DiscoBoxError { Parse(anyhow::Error), } -/// Channel for [`MagicSock::poll_recv_relay`] to receive datagrams from relays. +/// A queue holding [`RelayRecvDatagram`]s that can be polled in async +/// contexts, and wakes up tasks when something adds items using [`try_send`]. 
/// -/// The sender and receiver will take care of the required wakers needed for -/// [`AsyncUdpSocket::poll_recv`]. -// TODO: This channel should possibly be implemented with concurrent-queue and atomic-waker. -// Or maybe async-channel. -fn relay_recv_channel() -> (RelayRecvSender, RelayRecvReceiver) { - let (tx, rx) = mpsc::channel(128); - let waker = Arc::new(parking_lot::Mutex::new(None)); - let sender = RelayRecvSender { - sender: tx, - waker: waker.clone(), - }; - let receiver = RelayRecvReceiver { - receiver: parking_lot::Mutex::new(rx), - waker, - }; - (sender, receiver) +/// This is used to transfer relay datagrams between the [`RelayActor`] +/// and [`MagicSock`]. +/// +/// [`try_send`]: Self::try_send +/// [`RelayActor`]: crate::magicsock::RelayActor +/// [`MagicSock`]: crate::magicsock::MagicSock +#[derive(Debug)] +struct RelayDatagramsQueue { + queue: ConcurrentQueue, + waker: AtomicWaker, } -#[derive(Debug, Clone)] -struct RelayRecvSender { - sender: mpsc::Sender, - waker: Arc>>, -} +impl RelayDatagramsQueue { + /// Creates a new, empty queue with a fixed size bound of 128 items. + fn new() -> Self { + Self { + queue: ConcurrentQueue::bounded(128), + waker: AtomicWaker::new(), + } + } -impl RelayRecvSender { + /// Sends an item into this queue and wakes a potential task + /// that's registered its waker with a [`poll_recv`] call. + /// + /// [`poll_recv`]: Self::poll_recv fn try_send( &self, item: RelayRecvDatagram, - ) -> Result<(), mpsc::error::TrySendError> { - self.sender.try_send(item).inspect(|_| { - if let Some(waker) = self.waker.lock().take() { - waker.wake(); - } + ) -> Result<(), concurrent_queue::PushError> { + self.queue.push(item).inspect(|_| { + self.waker.wake(); }) } -} -#[derive(Debug)] -struct RelayRecvReceiver { - receiver: parking_lot::Mutex>, - waker: Arc>>, -} - -impl RelayRecvReceiver { + /// Polls for new items in the queue. 
+ /// + /// Although this method is available from `&self`, it must not be + /// polled concurrently between tasks. + /// + /// Calling this will replace the current waker used. So if another task + /// waits for this, that task's waker will be replaced and it won't be + /// woken up for new items. + /// + /// The reason this method is made available as `&self` is because + /// the interface for quinn's [`AsyncUdpSocket::poll_recv`] requires us + /// to be able to poll from `&self`. fn poll_recv(&self, cx: &mut Context) -> Poll> { - let mut receiver = self.receiver.lock(); - self.waker.lock().replace(cx.waker().clone()); - match receiver.try_recv() { - Ok(item) => { - self.waker.lock().take(); - Poll::Ready(Ok(item)) - } - Err(mpsc::error::TryRecvError::Empty) => Poll::Pending, - Err(mpsc::error::TryRecvError::Disconnected) => { - Poll::Ready(Err(anyhow!("All RelayRecvSenders disconnected"))) + match self.queue.pop() { + Ok(value) => Poll::Ready(Ok(value)), + Err(concurrent_queue::PopError::Empty) => { + self.waker.register(cx.waker()); + + match self.queue.pop() { + Ok(value) => { + self.waker.take(); + Poll::Ready(Ok(value)) + } + Err(concurrent_queue::PopError::Empty) => Poll::Pending, + Err(concurrent_queue::PopError::Closed) => { + self.waker.take(); + Poll::Ready(Err(anyhow!("Queue closed"))) + } + } } + Err(concurrent_queue::PopError::Closed) => Poll::Ready(Err(anyhow!("Queue closed"))), } } } @@ -2857,7 +2868,10 @@ mod tests { use tokio_util::task::AbortOnDropHandle; use super::*; - use crate::{defaults::staging::EU_RELAY_HOSTNAME, tls, Endpoint, RelayMode}; + use crate::{ + defaults::staging::{self, EU_RELAY_HOSTNAME}, + tls, Endpoint, RelayMode, + }; const ALPN: &[u8] = b"n0/test/1"; @@ -4020,4 +4034,57 @@ mod tests { // TODO: could remove the addresses again, send, add it back and see it recover. // But we don't have that much private access to the NodeMap. This will do for now. 
} + + #[tokio::test(flavor = "multi_thread")] + async fn test_relay_datagram_queue() { + let queue = Arc::new(RelayDatagramsQueue::new()); + let url = staging::default_na_relay_node().url; + let capacity = queue.queue.capacity().unwrap(); + + let mut tasks = JoinSet::new(); + + tasks.spawn({ + let queue = queue.clone(); + async move { + let mut expected_msgs = vec![false; capacity]; + + while let Ok(datagram) = tokio::time::timeout( + Duration::from_millis(100), + futures_lite::future::poll_fn(|cx| { + queue.poll_recv(cx).map(|result| result.unwrap()) + }), + ) + .await + { + let msg_num = usize::from_le_bytes(datagram.buf.as_ref().try_into().unwrap()); + + if expected_msgs[msg_num] { + panic!("Received message number {msg_num} more than once (duplicated)"); + } + + expected_msgs[msg_num] = true; + } + + assert!(expected_msgs.into_iter().all(|is_set| is_set)); + } + }); + + for i in 0..capacity { + tasks.spawn({ + let queue = queue.clone(); + let url = url.clone(); + async move { + queue + .try_send(RelayRecvDatagram { + url, + src: PublicKey::from_bytes(&[0u8; 32]).unwrap(), + buf: Bytes::copy_from_slice(&i.to_le_bytes()), + }) + .unwrap(); + } + }); + } + + tasks.join_all().await; + } } diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 74ed653862..887e3b5403 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -25,7 +25,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use crate::{ key::{NodeId, PUBLIC_KEY_LENGTH}, - magicsock::{MagicSock, Metrics as MagicsockMetrics, RelayContents, RelayRecvSender}, + magicsock::{MagicSock, Metrics as MagicsockMetrics, RelayContents, RelayDatagramsQueue}, }; /// How long a non-home relay connection needs to be idle (last written to) before we close it. @@ -52,8 +52,8 @@ struct ConnectedRelayActor { /// The time of the last request for its write /// channel (currently even if there was no write). 
last_write: Instant, - /// Channel to send received QUIC datagrams on. - relay_recv_channel: RelayRecvSender, + /// Queue to send received relay datagrams on. + relay_datagrams_queue: Arc, url: RelayUrl, relay_client: relay::client::Client, relay_client_receiver: relay::client::ClientReceiver, @@ -84,11 +84,11 @@ impl ConnectedRelayActor { url: RelayUrl, relay_client: relay::client::Client, relay_client_receiver: relay::client::ClientReceiver, - relay_recv_channel: RelayRecvSender, + relay_datagrams_queue: Arc, ) -> Self { ConnectedRelayActor { last_write: Instant::now(), - relay_recv_channel, + relay_datagrams_queue, url, node_present: BTreeSet::new(), backoff: backoff::exponential::ExponentialBackoffBuilder::new() @@ -246,7 +246,7 @@ impl ConnectedRelayActor { src: remote_node_id, buf: datagram, }; - if let Err(err) = self.relay_recv_channel.try_send(res) { + if let Err(err) = self.relay_datagrams_queue.try_send(res) { warn!("dropping received relay packet: {err:#}"); } } @@ -282,7 +282,7 @@ impl ConnectedRelayActor { pub(super) struct RelayActor { msock: Arc, - relay_recv_channel: RelayRecvSender, + relay_datagrams_queue: Arc, /// relay Url -> connection to the node connected_relays: BTreeMap, JoinHandle<()>)>, ping_tasks: JoinSet<(RelayUrl, bool)>, @@ -290,11 +290,14 @@ pub(super) struct RelayActor { } impl RelayActor { - pub(super) fn new(msock: Arc, recv_channel: RelayRecvSender) -> Self { + pub(super) fn new( + msock: Arc, + relay_datagrams_queue: Arc, + ) -> Self { let cancel_token = CancellationToken::new(); Self { msock, - relay_recv_channel: recv_channel, + relay_datagrams_queue, connected_relays: Default::default(), ping_tasks: Default::default(), cancel_token, @@ -536,11 +539,15 @@ impl RelayActor { let handle = tokio::task::spawn({ let url = url.clone(); let relay_client = relay_client.clone(); - let relay_recv_channel = self.relay_recv_channel.clone(); + let relay_datagrams_queue = self.relay_datagrams_queue.clone(); let span = 
info_span!("conn-relay-actor", %url); async move { - let conn_actor = - ConnectedRelayActor::new(url, relay_client, relay_receiver, relay_recv_channel); + let conn_actor = ConnectedRelayActor::new( + url, + relay_client, + relay_receiver, + relay_datagrams_queue, + ); if let Err(err) = conn_actor.run(conn_actor_inbox_rx).await { warn!("connection error: {:?}", err);