Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-net)!: Improve initial connection latency #2234

Merged
merged 30 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
892c4a5
Modify doctor to show something about RTTs
flub Apr 10, 2024
cf3fa48
Better rtts
flub Apr 11, 2024
277d5bb
Merge branch 'main' into flub/messing-around-with-rtt
flub Apr 18, 2024
d900242
hack up resetting rtt. and some other dead ends
flub Apr 19, 2024
66a025c
Merge branch 'main' into flub/messing-around-with-rtt
flub Apr 24, 2024
b923c31
remove unused methods
flub Apr 24, 2024
bf02260
Make this look a litte real
flub Apr 25, 2024
794f022
missing file
flub Apr 25, 2024
b03d496
Merge branch 'main' into flub/messing-around-with-rtt
flub Apr 26, 2024
c215a73
A bunch of logging and clarifications
flub Apr 30, 2024
cf53f28
use newer quinn branch
flub Apr 30, 2024
6a9961b
Merge branch 'main' into flub/messing-around-with-rtt
flub Apr 30, 2024
ba1ffbb
Throw in some changes that are not yet released, but will work
flub May 3, 2024
193d3d4
Use a newer version of quic-rpc, using iroh-quinn
flub May 6, 2024
be315d8
Drop the quinn patches, use released versions
flub May 6, 2024
008d470
bunch of cleanups, remove doctor changes
flub May 6, 2024
3518b49
Merge branch 'main' into flub/messing-around-with-rtt
flub May 6, 2024
c380941
Merge branch 'main' into flub/messing-around-with-rtt
flub May 6, 2024
5b81453
Merge branch 'main' into flub/messing-around-with-rtt
flub May 7, 2024
e55e290
explain why we ignore the connection type switched to
flub May 7, 2024
4b80cb0
Use a helper function
flub May 7, 2024
59ba489
Provide the Connection stuct directly.
flub May 8, 2024
996b8c4
do double reference
flub May 8, 2024
924754f
avoid pulling in quinn itself
flub May 8, 2024
42ea2f3
move to right location
flub May 8, 2024
4379e51
cleanup imports
flub May 8, 2024
0703621
Use new re-export
flub May 8, 2024
6a89fb9
describe why this field is here
flub May 8, 2024
edb0055
require iroh-net now
flub May 8, 2024
bca8016
fix cargo.toml
flub May 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 210 additions & 132 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion iroh-blobs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ iroh-net = { version = "0.15.0", path = "../iroh-net", optional = true }
num_cpus = "1.15.0"
parking_lot = { version = "0.12.1", optional = true }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quinn = "0.10"
quinn = { package = "iroh-quinn", version = "0.10" }
rand = "0.8"
range-collections = "0.4.0"
redb = { version = "2.0.0", optional = true }
Expand Down
11 changes: 10 additions & 1 deletion iroh-blobs/examples/provide-bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! To provide a collection (multiple blobs)
use anyhow::Result;
use tokio_util::task::LocalPoolHandle;
use tracing::warn;
use tracing_subscriber::{prelude::*, EnvFilter};

use iroh_blobs::{format::collection::Collection, Hash};
Expand Down Expand Up @@ -84,14 +85,22 @@ async fn main() -> Result<()> {
let lp = LocalPoolHandle::new(1);

let accept_task = tokio::spawn(async move {
while let Some(conn) = endpoint.accept().await {
while let Some(incoming) = endpoint.accept().await {
println!("connection incoming");

let db = db.clone();
let lp = lp.clone();

// spawn a task to handle the connection
tokio::spawn(async move {
let remote_addr = incoming.remote_address();
let conn = match incoming.await {
Ok(conn) => conn,
Err(err) => {
warn!(%remote_addr, "Error connecting: {err:#}");
return;
}
};
iroh_blobs::provider::handle_connection(conn, db, MockEventSender, lp).await
});
}
Expand Down
12 changes: 3 additions & 9 deletions iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use iroh_io::stats::{
SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter,
};
use iroh_io::{AsyncSliceReader, AsyncStreamWriter, TokioStreamWriter};
use iroh_net::magic_endpoint;
use serde::{Deserialize, Serialize};
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, debug_span, info, trace, warn};
Expand Down Expand Up @@ -280,19 +281,12 @@ pub trait EventSender: Clone + Sync + Send + 'static {

/// Handle a single connection.
pub async fn handle_connection<D: Map, E: EventSender>(
connecting: quinn::Connecting,
connection: magic_endpoint::Connection,
db: D,
events: E,
rt: LocalPoolHandle,
) {
let remote_addr = connecting.remote_address();
let connection = match connecting.await {
Ok(conn) => conn,
Err(err) => {
warn!(%remote_addr, "Error connecting: {err:#}");
return;
}
};
let remote_addr = connection.remote_address();
let connection_id = connection.stable_id() as u64;
let span = debug_span!("connection", connection_id, %remote_addr);
async move {
Expand Down
4 changes: 2 additions & 2 deletions iroh-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ parking_lot = "0.12.1"
pkarr = { version = "1.1.5", default-features = false }
portable-atomic = "1"
postcard = "1.0.8"
quic-rpc = { version = "0.8.0", features = ["flume-transport", "quinn-transport"] }
quinn = "0.10.2"
quic-rpc = { version = "0.9.0", features = ["flume-transport", "quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.10.2"}
rand = "0.8.5"
rustyline = "12.0.0"
serde = { version = "1.0.197", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion iroh-docs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ tempfile = { version = "3.4" }
iroh-net = { version = "0.15.0", optional = true, path = "../iroh-net" }
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
quinn = { version = "0.10", optional = true }
quinn = { package = "iroh-quinn", version = "0.10", optional = true }
futures-util = { version = "0.3.25", optional = true }
lru = "0.12"
self_cell = "1.0.3"
Expand Down
2 changes: 1 addition & 1 deletion iroh-docs/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub enum AcceptOutcome {
/// Handle an iroh-docs connection and sync all shared documents in the replica store.
pub async fn handle_connection<F, Fut>(
sync: SyncHandle,
connecting: quinn::Connecting,
connecting: iroh_net::magic_endpoint::Connecting,
accept_cb: F,
) -> Result<SyncFinished, AcceptError>
where
Expand Down
2 changes: 1 addition & 1 deletion iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ iroh-base = { version = "0.15.0", path = "../iroh-base" }
# net dependencies (optional)
futures-lite = { version = "2.3", optional = true }
iroh-net = { path = "../iroh-net", version = "0.15.0", optional = true, default-features = false }
quinn = { version = "0.10", optional = true }
quinn = { package = "iroh-quinn", version = "0.10", optional = true }
tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] }
tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }
Expand Down
10 changes: 7 additions & 3 deletions iroh-gossip/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use iroh_gossip::{
};
use iroh_net::{
key::{PublicKey, SecretKey},
magic_endpoint::accept_conn,
relay::{RelayMap, RelayMode, RelayUrl},
MagicEndpoint, NodeAddr,
};
Expand Down Expand Up @@ -200,8 +199,13 @@ async fn endpoint_loop(endpoint: MagicEndpoint, gossip: Gossip) {
});
}
}
async fn handle_connection(conn: quinn::Connecting, gossip: Gossip) -> anyhow::Result<()> {
let (peer_id, alpn, conn) = accept_conn(conn).await?;
async fn handle_connection(
mut conn: iroh_net::magic_endpoint::Connecting,
gossip: Gossip,
) -> anyhow::Result<()> {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let peer_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?;
match alpn.as_bytes() {
GOSSIP_ALPN => gossip
.handle_connection(conn)
Expand Down
8 changes: 5 additions & 3 deletions iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ der = { version = "0.7", features = ["alloc", "derive"] }
derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "deref"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-concurrency = "7.6.0"
futures-lite = "2.3"
futures-sink = "0.3.25"
futures-util = "0.3.25"
Expand All @@ -44,11 +45,12 @@ libc = "0.2.139"
num_enum = "0.7"
once_cell = "1.18.0"
parking_lot = "0.12.1"
pin-project = "1"
pkarr = { version = "1.1.4", default-features = false, features = ["async", "relay"] }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quinn = "0.10"
quinn-proto = "0.10.5"
quinn-udp = "0.4"
quinn = { package = "iroh-quinn", version = "0.10.4" }
quinn-proto = { package = "iroh-quinn-proto", version = "0.10.7" }
quinn-udp = { package = "iroh-quinn-udp", version = "0.4" }
rand = "0.8"
rand_core = "0.6.4"
rcgen = "0.11"
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ anyhow = "1.0.22"
bytes = "1"
hdrhistogram = { version = "7.2", default-features = false }
iroh-net = { path = ".." }
quinn = "0.10"
quinn = { package = "iroh-quinn", version = "0.10"}
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.0.1", features = ["rt", "sync"] }
tracing = "0.1"
Expand Down
7 changes: 4 additions & 3 deletions iroh-net/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ async fn main() -> anyhow::Result<()> {
);
// accept incoming connections, returns a normal QUIC connection

while let Some(conn) = endpoint.accept().await {
// accept the connection and extract the `node_id` and ALPN
let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?;
while let Some(mut conn) = endpoint.accept().await {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let node_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?;
info!(
"new (unreliable) connection from {node_id} with ALPN {alpn} (coming from {})",
conn.remote_address()
Expand Down
7 changes: 4 additions & 3 deletions iroh-net/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ async fn main() -> anyhow::Result<()> {
"\tcargo run --example connect -- --node-id {me} --addrs \"{local_addrs}\" --relay-url {relay_url}\n"
);
// accept incoming connections, returns a normal QUIC connection
while let Some(conn) = endpoint.accept().await {
// accept the connection and extract the `node_id` and ALPN
let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?;
while let Some(mut conn) = endpoint.accept().await {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let node_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?;
dignifiedquire marked this conversation as resolved.
Show resolved Hide resolved
info!(
"new connection from {node_id} with ALPN {alpn} (coming from {})",
conn.remote_address()
Expand Down
15 changes: 10 additions & 5 deletions iroh-net/src/disco.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ pub struct Ping {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Pong {
pub tx_id: stun::TransactionId,
/// The observed address off the ping sender.
///
/// 18 bytes (16+2) on the wire; v4-mapped ipv6 for IPv4.
pub src: SendAddr,
pub ping_observed_addr: SendAddr,
}

/// Addresses to which we can send. This is either a UDP or a relay address.
Expand Down Expand Up @@ -280,15 +282,18 @@ impl Pong {
let tx_id = stun::TransactionId::from(tx_id);
let src = send_addr_from_bytes(&p[TX_LEN..])?;

Ok(Pong { tx_id, src })
Ok(Pong {
tx_id,
ping_observed_addr: src,
})
}

fn as_bytes(&self) -> Vec<u8> {
let header = msg_header(MessageType::Pong, V0);
let mut out = header.to_vec();
out.extend_from_slice(&self.tx_id);

let src_bytes = send_addr_to_vec(&self.src);
let src_bytes = send_addr_to_vec(&self.ping_observed_addr);
out.extend(src_bytes);
out
}
Expand Down Expand Up @@ -412,15 +417,15 @@ mod tests {
name: "pong",
m: Message::Pong(Pong{
tx_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12].into(),
src: SendAddr::Udp("2.3.4.5:1234".parse().unwrap()),
ping_observed_addr: SendAddr::Udp("2.3.4.5:1234".parse().unwrap()),
}),
want: "02 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 00 00 00 00 00 00 00 00 00 00 00 ff ff 02 03 04 05 d2 04",
},
Test {
name: "pongv6",
m: Message::Pong(Pong {
tx_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12].into(),
src: SendAddr::Udp("[fed0::12]:6666".parse().unwrap()),
ping_observed_addr: SendAddr::Udp("[fed0::12]:6666".parse().unwrap()),
}),
want: "02 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 00 fe d0 00 00 00 00 00 00 00 00 00 00 00 00 00 12 0a 1a",
},
Expand Down
Loading
Loading