From bf0226087fe30968f8edcc12e4e575463252d97e Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 25 Apr 2024 18:39:40 +0200 Subject: [PATCH] Make this look a litte real --- Cargo.lock | 107 +++++++++++-- iroh-bytes/examples/provide-bytes.rs | 11 +- iroh-bytes/src/provider.rs | 18 +-- iroh-gossip/examples/chat.rs | 10 +- iroh-net/Cargo.toml | 2 + iroh-net/examples/listen-unreliable.rs | 7 +- iroh-net/examples/listen.rs | 7 +- iroh-net/src/magic_endpoint.rs | 200 ++++++++++++++++++++----- iroh-sync/src/net.rs | 2 +- iroh/src/node/builder.rs | 8 +- iroh/src/sync_engine.rs | 5 +- iroh/src/sync_engine/live.rs | 4 +- 12 files changed, 303 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79893fc6b6..af84530362 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,7 +387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1f7a89a8ee5889d2593ae422ce6e1bb03e48a0e8a16e4fa0882dfcbe7e182ef" dependencies = [ "bytes", - "futures-lite", + "futures-lite 2.3.0", "genawaiter", "iroh-blake3", "iroh-io", @@ -472,6 +472,18 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -1457,6 +1469,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.2" @@ -1527,6 +1548,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.30" @@ -1563,6 +1590,20 @@ dependencies = [ "futures-sink", ] +[[package]] +name = "futures-concurrency" +version = "7.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ee14e256b9143bfafbf2fddeede6f396650bacf95d06fc1b3f2b503df129a0" +dependencies = [ + "bitvec", + "futures-core", + "futures-lite 1.13.0", + "pin-project", + "slab", + "smallvec", +] + [[package]] name = "futures-core" version = "0.3.30" @@ -1586,13 +1627,28 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-lite" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" dependencies = [ - "fastrand", + "fastrand 2.0.2", "futures-core", "futures-io", "parking", @@ -1929,9 +1985,9 @@ dependencies = [ [[package]] name = "hickory-server" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbbb45bc4dcb456445732c705e3cfdc7393b8bcae5c36ecec36b9d76bd67cb5" +checksum = "9be0e43c556b9b3fdb6c7c71a9a32153a2275d02419e3de809e520bfcfe40c37" dependencies = [ "async-trait", "bytes", @@ -2582,7 +2638,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74d1047ad5ca29ab4ff316b6830d86e7ea52cea54325e4d4a849692e1274b498" dependencies = [ "bytes", - "futures-lite", + "futures-lite 2.3.0", "pin-project", "smallvec", "tokio", @@ -2625,6 +2681,7 @@ dependencies = [ "duct", "flume", "futures", + "futures-concurrency", "governor", "hex", "hickory-proto", @@ -2646,6 +2703,7 @@ dependencies = [ "num_enum", "once_cell", "parking_lot", + "pin-project", "pkarr", "postcard", "pretty_assertions", @@ -3852,7 +3910,7 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quinn" version = "0.10.2" -source = "git+https://github.com/flub/quinn?branch=flub/reset-rtt#66ac4e668b082a7b725d84bd9d291caeabab610f" +source = "git+https://github.com/flub/quinn?branch=flub/reset-rtt#b01b7652ba5f05e898cc004e48bd72b67f98ae78" dependencies = [ "bytes", "pin-project-lite", @@ -3868,7 +3926,7 @@ dependencies = [ [[package]] name = "quinn-proto" version = "0.10.6" -source = "git+https://github.com/flub/quinn?branch=flub/reset-rtt#66ac4e668b082a7b725d84bd9d291caeabab610f" +source = "git+https://github.com/flub/quinn?branch=flub/reset-rtt#b01b7652ba5f05e898cc004e48bd72b67f98ae78" dependencies = [ "bytes", "rand", @@ -3885,7 +3943,7 @@ dependencies = [ [[package]] name = "quinn-udp" version = "0.4.1" -source = "git+https://github.com/flub/quinn?branch=flub/reset-rtt#66ac4e668b082a7b725d84bd9d291caeabab610f" +source = "git+https://github.com/flub/quinn?branch=flub/reset-rtt#b01b7652ba5f05e898cc004e48bd72b67f98ae78" dependencies = [ "bytes", "libc", @@ -3913,6 +3971,12 @@ dependencies = [ "pest_derive", ] +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "radix_trie" version = "0.2.1" @@ -4364,9 +4428,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" +checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54" [[package]] name = "rustls-webpki" @@ -5097,6 +5161,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tempfile" version = "3.10.1" @@ -5104,7 +5174,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.2", "rustix", "windows-sys 0.52.0", ] @@ -5670,6 +5740,12 @@ dependencies = [ "libc", ] +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "walkdir" version = "2.5.0" @@ -6124,6 +6200,15 @@ dependencies = [ "windows 0.52.0", ] +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "x509-parser" version = "0.15.1" diff --git a/iroh-bytes/examples/provide-bytes.rs b/iroh-bytes/examples/provide-bytes.rs index 89777d7da4..3725249b30 100644 --- a/iroh-bytes/examples/provide-bytes.rs +++ b/iroh-bytes/examples/provide-bytes.rs @@ -11,6 +11,7 @@ //! To provide a collection (multiple blobs) use anyhow::Result; use tokio_util::task::LocalPoolHandle; +use tracing::warn; use tracing_subscriber::{prelude::*, EnvFilter}; use iroh_bytes::{format::collection::Collection, Hash}; @@ -84,7 +85,7 @@ async fn main() -> Result<()> { let lp = LocalPoolHandle::new(1); let accept_task = tokio::spawn(async move { - while let Some(conn) = endpoint.accept().await { + while let Some(incoming) = endpoint.accept().await { println!("connection incoming"); let db = db.clone(); @@ -92,6 +93,14 @@ async fn main() -> Result<()> { // spawn a task to handle the connection tokio::spawn(async move { + let remote_addr = incoming.remote_address(); + let conn = match incoming.await { + Ok(conn) => conn, + Err(err) => { + warn!(%remote_addr, "Error connecting: {err:#}"); + return; + } + }; iroh_bytes::provider::handle_connection(conn, db, MockEventSender, lp).await }); } diff --git a/iroh-bytes/src/provider.rs b/iroh-bytes/src/provider.rs index 3c23b3684b..fa39bc1c16 100644 --- a/iroh-bytes/src/provider.rs +++ b/iroh-bytes/src/provider.rs @@ -280,19 +280,19 @@ pub trait EventSender: Clone + Sync + Send + 'static { /// Handle a single connection. pub async fn handle_connection( - connecting: quinn::Connecting, + connection: quinn::Connection, db: D, events: E, rt: LocalPoolHandle, ) { - let remote_addr = connecting.remote_address(); - let connection = match connecting.await { - Ok(conn) => conn, - Err(err) => { - warn!(%remote_addr, "Error connecting: {err:#}"); - return; - } - }; + let remote_addr = connection.remote_address(); + // let connection = match connecting.await { + // Ok(conn) => conn, + // Err(err) => { + // warn!(%remote_addr, "Error connecting: {err:#}"); + // return; + // } + // }; let connection_id = connection.stable_id() as u64; let span = debug_span!("connection", connection_id, %remote_addr); async move { diff --git a/iroh-gossip/examples/chat.rs b/iroh-gossip/examples/chat.rs index 6a596b1bec..4a8eecbc4e 100644 --- a/iroh-gossip/examples/chat.rs +++ b/iroh-gossip/examples/chat.rs @@ -11,7 +11,6 @@ use iroh_gossip::{ }; use iroh_net::{ key::{PublicKey, SecretKey}, - magic_endpoint::accept_conn, relay::{RelayMap, RelayMode, RelayUrl}, MagicEndpoint, NodeAddr, }; @@ -200,8 +199,13 @@ async fn endpoint_loop(endpoint: MagicEndpoint, gossip: Gossip) { }); } } -async fn handle_connection(conn: quinn::Connecting, gossip: Gossip) -> anyhow::Result<()> { - let (peer_id, alpn, conn) = accept_conn(conn).await?; +async fn handle_connection( + mut conn: iroh_net::magic_endpoint::Connecting, + gossip: Gossip, +) -> anyhow::Result<()> { + let alpn = conn.alpn().await?; + let conn = conn.await?; + let peer_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?; match alpn.as_bytes() { GOSSIP_ALPN => gossip .handle_connection(conn) diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index f63e03dffd..5166de9c32 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -40,6 +40,7 @@ libc = "0.2.139" num_enum = "0.7" once_cell = "1.18.0" parking_lot = "0.12.1" +pin-project = "1" pkarr = { version = "1.1.3", default-features = false, features = ["async", "relay"] } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } quinn = "0.10" @@ -81,6 +82,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = tr # metrics iroh-metrics = { version = "0.14.0", path = "../iroh-metrics", default-features = false } strum = { version = "0.26.2", features = ["derive"] } +futures-concurrency = "7.6.0" [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] netlink-packet-core = "0.7.0" diff --git a/iroh-net/examples/listen-unreliable.rs b/iroh-net/examples/listen-unreliable.rs index 4ed651ff1a..cc889d64a1 100644 --- a/iroh-net/examples/listen-unreliable.rs +++ b/iroh-net/examples/listen-unreliable.rs @@ -63,9 +63,10 @@ async fn main() -> anyhow::Result<()> { ); // accept incoming connections, returns a normal QUIC connection - while let Some(conn) = endpoint.accept().await { - // accept the connection and extract the `node_id` and ALPN - let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?; + while let Some(mut conn) = endpoint.accept().await { + let alpn = conn.alpn().await?; + let conn = conn.await?; + let node_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?; info!( "new (unreliable) connection from {node_id} with ALPN {alpn} (coming from {})", conn.remote_address() diff --git a/iroh-net/examples/listen.rs b/iroh-net/examples/listen.rs index 5049ac4343..4da98624de 100644 --- a/iroh-net/examples/listen.rs +++ b/iroh-net/examples/listen.rs @@ -62,9 +62,10 @@ async fn main() -> anyhow::Result<()> { "\tcargo run --example connect -- --node-id {me} --addrs \"{local_addrs}\" --relay-url {relay_url}\n" ); // accept incoming connections, returns a normal QUIC connection - while let Some(conn) = endpoint.accept().await { - // accept the connection and extract the `node_id` and ALPN - let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?; + while let Some(mut conn) = endpoint.accept().await { + let alpn = conn.alpn().await?; + let conn = conn.await?; + let node_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?; info!( "new connection from {node_id} with ALPN {alpn} (coming from {})", conn.remote_address() diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index 959ad73359..5d251a05a0 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -1,13 +1,19 @@ //! An endpoint that leverages a [quinn::Endpoint] backed by a [magicsock::MagicSock]. -use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use std::net::{IpAddr, SocketAddr}; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use std::time::Duration; +use std::{any::Any, future::Future}; use anyhow::{anyhow, bail, ensure, Context, Result}; use derive_more::Debug; use futures::StreamExt; use quinn::VarInt; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, trace, warn}; use crate::{ config, @@ -20,6 +26,10 @@ use crate::{ tls, NodeId, }; +mod rtt_actor; + +use self::rtt_actor::RttMessage; + pub use super::magicsock::{ConnectionInfo, LocalEndpointsStream}; pub use iroh_base::node_addr::{AddrInfo, NodeAddr}; @@ -232,6 +242,7 @@ pub struct MagicEndpoint { secret_key: Arc, msock: MagicSock, endpoint: quinn::Endpoint, + rtt_actor: Arc, keylog: bool, cancel_token: CancellationToken, } @@ -275,14 +286,18 @@ impl MagicEndpoint { secret_key: Arc::new(secret_key), msock, endpoint, + rtt_actor: Arc::new(rtt_actor::RttHandle::new()), keylog, cancel_token: CancellationToken::new(), }) } /// Accept an incoming connection on the socket. - pub fn accept(&self) -> quinn::Accept<'_> { - self.endpoint.accept() + pub fn accept(&self) -> Accept<'_> { + Accept { + inner: self.endpoint.accept(), + magic_ep: self.clone(), + } } /// Get the node id of this endpoint. @@ -520,18 +535,16 @@ impl MagicEndpoint { .connect_with(client_config, addr, "localhost")?; let connection = connect.await.context("failed connecting to provider")?; - { - let connection = connection.clone(); - let mut conn_type_stream = self.conn_type_stream(node_id)?; - tokio::spawn(async move { - info!("starting conn type change loop"); - while let Some(conn_type) = conn_type_stream.next().await { - info!(%conn_type, "conn type changed"); - connection.flub_reset_rtt(); - } - warn!("stopping conn type change loop"); - }); + + let rtt_msg = RttMessage::NewConnection { + connection: connection.weak_ref(), + conn_type_changes: self.conn_type_stream(node_id)?, + }; + if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await { + // If this actor is dead, that's not great but we can still function. + warn!("rtt-actor not reachable: {err:#}"); } + Ok(connection) } @@ -604,25 +617,128 @@ impl MagicEndpoint { } } -/// Accept an incoming connection and extract the client-provided [`PublicKey`] and ALPN protocol. -pub async fn accept_conn( - mut conn: quinn::Connecting, -) -> Result<(PublicKey, String, quinn::Connection)> { - let alpn = get_alpn(&mut conn).await?; - let conn = conn.await?; - let peer_id = get_remote_node_id(&conn)?; - Ok((peer_id, alpn, conn)) +/// Future produced by [`MagicEndpoint::accept`]. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct Accept<'a> { + #[pin] + inner: quinn::Accept<'a>, + magic_ep: MagicEndpoint, } -/// Extract the ALPN protocol from the peer's TLS certificate. -pub async fn get_alpn(connecting: &mut quinn::Connecting) -> Result { - let data = connecting.handshake_data().await?; - match data.downcast::() { - Ok(data) => match data.protocol { - Some(protocol) => std::string::String::from_utf8(protocol).map_err(Into::into), - None => bail!("no ALPN protocol available"), - }, - Err(_) => bail!("unknown handshake type"), +impl<'a> Future for Accept<'a> { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(inner)) => Poll::Ready(Some(Connecting { + inner, + magic_ep: this.magic_ep.clone(), + })), + } + } +} + +/// In-progress connection attempt future +#[derive(Debug)] +#[pin_project::pin_project] +pub struct Connecting { + #[pin] + inner: quinn::Connecting, + magic_ep: MagicEndpoint, +} + +impl Connecting { + /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security. + pub fn into_0rtt(self) -> Result<(quinn::Connection, quinn::ZeroRttAccepted), Self> { + match self.inner.into_0rtt() { + Ok((conn, zrtt_accepted)) => { + // If we can't notify the rtt-actor that's not great but not critical. + let Ok(peer_id) = get_remote_node_id(&conn) else { + warn!(?conn, "failed to get remote node id"); + return Ok((conn, zrtt_accepted)); + }; + let Ok(conn_type_changes) = self.magic_ep.conn_type_stream(&peer_id) else { + warn!(?conn, "failed to create conn_type_stream"); + return Ok((conn, zrtt_accepted)); + }; + let rtt_msg = RttMessage::NewConnection { + connection: conn.weak_ref(), + conn_type_changes, + }; + if let Err(err) = self.magic_ep.rtt_actor.msg_tx.try_send(rtt_msg) { + warn!(?conn, "rtt-actor not reachable: {err:#}"); + } + Ok((conn, zrtt_accepted)) + } + Err(inner) => Err(Self { + inner, + magic_ep: self.magic_ep, + }), + } + } + + /// Parameters negotiated during the handshake + pub async fn handshake_data(&mut self) -> Result, quinn::ConnectionError> { + self.inner.handshake_data().await + } + + /// The local IP address which was used when the peer established the connection. + pub fn local_ip(&self) -> Option { + self.inner.local_ip() + } + + /// The peer's UDP address. + pub fn remote_address(&self) -> SocketAddr { + self.inner.remote_address() + } + + /// Extracts the ALPN protocol from the peer's handshake data. + // Note, we could totally provide this method to be on a Connection as well. But we'd + // need to wrap Connection too. + pub async fn alpn(&mut self) -> Result { + let data = self.handshake_data().await?; + match data.downcast::() { + Ok(data) => match data.protocol { + Some(protocol) => std::string::String::from_utf8(protocol).map_err(Into::into), + None => bail!("no ALPN protocol available"), + }, + Err(_) => bail!("unknown handshake type"), + } + } +} + +impl Future for Connecting { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Ok(conn)) => { + // If we can't notify the rtt-actor that's not great but not critical. + let Ok(peer_id) = get_remote_node_id(&conn) else { + warn!(?conn, "failed to get remote node id"); + return Poll::Ready(Ok(conn)); + }; + let Ok(conn_type_changes) = this.magic_ep.conn_type_stream(&peer_id) else { + warn!(?conn, "failed to create conn_type_stream"); + return Poll::Ready(Ok(conn)); + }; + let rtt_msg = RttMessage::NewConnection { + connection: conn.weak_ref(), + conn_type_changes, + }; + if let Err(err) = this.magic_ep.rtt_actor.msg_tx.try_send(rtt_msg) { + warn!(?conn, "rtt-actor not reachable: {err:#}"); + } + Poll::Ready(Ok(conn)) + } + } } } @@ -718,8 +834,8 @@ mod tests { .await .unwrap(); info!("accepting connection"); - let conn = ep.accept().await.unwrap(); - let (_peer_id, _alpn, conn) = accept_conn(conn).await.unwrap(); + let incoming = ep.accept().await.unwrap(); + let conn = incoming.await.unwrap(); let mut stream = conn.accept_uni().await.unwrap(); let mut buf = [0u8, 5]; stream.read_exact(&mut buf).await.unwrap(); @@ -871,7 +987,8 @@ mod tests { let now = Instant::now(); println!("[server] round {}", i + 1); let incoming = ep.accept().await.unwrap(); - let (peer_id, _alpn, conn) = accept_conn(incoming).await.unwrap(); + let conn = incoming.await.unwrap(); + let peer_id = get_remote_node_id(&conn).unwrap(); info!(%i, peer = %peer_id.fmt_short(), "accepted connection"); let (mut send, mut recv) = conn.accept_bi().await.unwrap(); let mut buf = vec![0u8; chunk_size]; @@ -978,8 +1095,10 @@ mod tests { } async fn accept_world(ep: MagicEndpoint, src: NodeId) { - let incoming = ep.accept().await.unwrap(); - let (node_id, alpn, conn) = accept_conn(incoming).await.unwrap(); + let mut incoming = ep.accept().await.unwrap(); + let alpn = incoming.alpn().await.unwrap(); + let conn = incoming.await.unwrap(); + let node_id = get_remote_node_id(&conn).unwrap(); assert_eq!(node_id, src); assert_eq!(alpn.as_bytes(), TEST_ALPN); let (mut send, mut recv) = conn.accept_bi().await.unwrap(); @@ -1073,9 +1192,10 @@ mod tests { let _ep2_guard = CallOnDrop::new(move || { ep2_abort_handle.abort(); }); - async fn accept(ep: MagicEndpoint) -> (PublicKey, String, quinn::Connection) { + async fn accept(ep: MagicEndpoint) -> NodeId { let incoming = ep.accept().await.unwrap(); - accept_conn(incoming).await.unwrap() + let conn = incoming.await.unwrap(); + get_remote_node_id(&conn).unwrap() } // create a node addr with no direct connections @@ -1089,7 +1209,7 @@ mod tests { let _conn_2 = ep2.connect(ep1_nodeaddr, TEST_ALPN).await.unwrap(); - let (got_id, _, _conn) = accept_res.await.unwrap(); + let got_id = accept_res.await.unwrap(); assert_eq!(ep2_nodeid, got_id); res_ep1.await.unwrap().unwrap(); diff --git a/iroh-sync/src/net.rs b/iroh-sync/src/net.rs index 30bcfe8c82..cbe088a560 100644 --- a/iroh-sync/src/net.rs +++ b/iroh-sync/src/net.rs @@ -106,7 +106,7 @@ pub enum AcceptOutcome { /// Handle an iroh-sync connection and sync all shared documents in the replica store. pub async fn handle_connection( sync: SyncHandle, - connecting: quinn::Connecting, + connecting: iroh_net::magic_endpoint::Connecting, accept_cb: F, ) -> Result where diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 917395c511..a9f243b29f 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -17,7 +17,6 @@ use iroh_bytes::{ use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::{ discovery::{dns::DnsDiscovery, pkarr_publish::PkarrPublisher, ConcurrentDiscovery, Discovery}, - magic_endpoint::get_alpn, relay::RelayMode, util::AbortingJoinHandle, MagicEndpoint, @@ -543,7 +542,7 @@ where }, // handle incoming p2p connections Some(mut connecting) = server.accept() => { - let alpn = match get_alpn(&mut connecting).await { + let alpn = match connecting.alpn().await { Ok(alpn) => alpn, Err(err) => { error!("invalid handshake: {:?}", err); @@ -678,7 +677,7 @@ impl Default for GcPolicy { // TODO: Restructure this code to not take all these arguments. #[allow(clippy::too_many_arguments)] async fn handle_connection( - connecting: quinn::Connecting, + connecting: iroh_net::magic_endpoint::Connecting, alpn: String, node: Arc>, gossip: Gossip, @@ -688,8 +687,9 @@ async fn handle_connection( GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?, SYNC_ALPN => sync.handle_connection(connecting).await?, alpn if alpn == iroh_bytes::protocol::ALPN => { + let connection = connecting.await?; iroh_bytes::provider::handle_connection( - connecting, + connection, node.db.clone(), node.callbacks.clone(), node.rt.clone(), diff --git a/iroh/src/sync_engine.rs b/iroh/src/sync_engine.rs index 33ce2ec1e7..ec28d8ef3c 100644 --- a/iroh/src/sync_engine.rs +++ b/iroh/src/sync_engine.rs @@ -223,7 +223,10 @@ impl SyncEngine { } /// Handle an incoming iroh-sync connection. - pub async fn handle_connection(&self, conn: quinn::Connecting) -> anyhow::Result<()> { + pub async fn handle_connection( + &self, + conn: iroh_net::magic_endpoint::Connecting, + ) -> anyhow::Result<()> { self.to_live_actor .send(ToLiveActor::HandleConnection { conn }) .await?; diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 8a1f2a5997..b1147fa304 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -81,7 +81,7 @@ pub enum ToLiveActor { reply: sync::oneshot::Sender>, }, HandleConnection { - conn: quinn::Connecting, + conn: iroh_net::magic_endpoint::Connecting, }, AcceptSyncRequest { namespace: NamespaceId, @@ -717,7 +717,7 @@ impl LiveActor { } #[instrument("accept", skip_all)] - pub async fn handle_connection(&mut self, conn: quinn::Connecting) { + pub async fn handle_connection(&mut self, conn: iroh_net::magic_endpoint::Connecting) { let to_actor_tx = self.sync_actor_tx.clone(); let accept_request_cb = move |namespace, peer| { let to_actor_tx = to_actor_tx.clone();