Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: iroh-perf #2186

Merged
merged 7 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ jobs:
# uses: obi1kenobi/cargo-semver-checks-action@v2
uses: n0-computer/cargo-semver-checks-action@feat-baseline
with:
package: iroh, iroh-base, iroh-blobs, iroh-cli, iroh-dns-server, iroh-gossip, iroh-metrics, iroh-net, iroh-docs
package: iroh, iroh-base, iroh-blobs, iroh-cli, iroh-dns-server, iroh-gossip, iroh-metrics, iroh-net, iroh-net-bench, iroh-docs
baseline-rev: ${{ env.HEAD_COMMIT_SHA }}
use-cache: false

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ env:
RUSTFLAGS: -Dwarnings
RUSTDOCFLAGS: -Dwarnings
SCCACHE_CACHE_SIZE: "50G"
CRATES_LIST: "iroh,iroh-blobs,iroh-gossip,iroh-metrics,iroh-net,iroh-docs,iroh-test,iroh-cli,iroh-dns-server"
CRATES_LIST: "iroh,iroh-blobs,iroh-gossip,iroh-metrics,iroh-net,iroh-net-bench,iroh-docs,iroh-test,iroh-cli,iroh-dns-server"

jobs:
build_and_test_nix:
Expand Down
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions iroh-net/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ anyhow = "1.0.22"
bytes = "1"
hdrhistogram = { version = "7.2", default-features = false }
iroh-net = { path = ".." }
quinn = "0.10"
rcgen = "0.11.1"
rustls = { version = "0.21.0", default-features = false, features = ["quic"] }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.0.1", features = ["rt", "sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "ansi", "time", "local-time"] }
socket2 = "0.5"
218 changes: 61 additions & 157 deletions iroh-net/bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
use std::{
sync::{Arc, Mutex},
time::Instant,
};

use anyhow::{Context, Result};
use anyhow::Result;
use clap::Parser;
use iroh_net::{
endpoint::{self, Connection},
Endpoint, NodeAddr,
};
use tokio::sync::Semaphore;
use tracing::{info, trace};

use iroh_net_bench::{
configure_tracing_subscriber, connect_client, drain_stream, rt, send_data_on_stream,
server_endpoint,
stats::{Stats, TransferResult},
Opt,
};

use iroh_net_bench::{configure_tracing_subscriber, iroh, quinn, rt, s2n, Commands, Opt};

fn main() {
let opt = Opt::parse();
let cmd = Commands::parse();
configure_tracing_subscriber();

match cmd {
Commands::Iroh(opt) => {
if let Err(e) = run_iroh(opt) {
eprintln!("failed: {e:#}");
}
}
Commands::Quinn(opt) => {
if let Err(e) = run_quinn(opt) {
eprintln!("failed: {e:#}");
}
}
Commands::S2n(opt) => {
if let Err(e) = run_s2n(opt) {
eprintln!("failed: {e:#}");
}
}
}
}

pub fn run_iroh(opt: Opt) -> Result<()> {
let server_span = tracing::error_span!("server");
let runtime = rt();
let (server_addr, endpoint) = {
let _guard = server_span.enter();
server_endpoint(&runtime, &opt)
iroh::server_endpoint(&runtime, &opt)
};

let server_thread = std::thread::spawn(move || {
let _guard = server_span.entered();
if let Err(e) = runtime.block_on(server(endpoint, opt)) {
if let Err(e) = runtime.block_on(iroh::server(endpoint, opt)) {
eprintln!("server failed: {e:#}");
}
});
Expand All @@ -43,7 +47,7 @@ fn main() {
handles.push(std::thread::spawn(move || {
let _guard = tracing::error_span!("client", id).entered();
let runtime = rt();
match runtime.block_on(client(server_addr, opt)) {
match runtime.block_on(iroh::client(server_addr, opt)) {
Ok(stats) => Ok(stats),
Err(e) => {
eprintln!("client failed: {e:#}");
Expand All @@ -62,153 +66,53 @@ fn main() {
}

server_thread.join().expect("server thread");
}

async fn server(endpoint: Endpoint, opt: Opt) -> Result<()> {
let mut server_tasks = Vec::new();

// Handle only the expected amount of clients
for _ in 0..opt.clients {
let handshake = endpoint.accept().await.unwrap();
let connection = handshake.await.context("handshake failed")?;

server_tasks.push(tokio::spawn(async move {
loop {
let (mut send_stream, mut recv_stream) = match connection.accept_bi().await {
Err(endpoint::ConnectionError::ApplicationClosed(_)) => break,
Err(e) => {
eprintln!("accepting stream failed: {e:?}");
break;
}
Ok(stream) => stream,
};
trace!("stream established");

tokio::spawn(async move {
drain_stream(&mut recv_stream, opt.read_unordered).await?;
send_data_on_stream(&mut send_stream, opt.download_size).await?;
Ok::<_, anyhow::Error>(())
});
}

if opt.stats {
println!("\nServer connection stats:\n{:#?}", connection.stats());
}
}));
}

// Await all the tasks. We have to do this to prevent the runtime getting dropped
// and all server tasks to be cancelled
for handle in server_tasks {
if let Err(e) = handle.await {
eprintln!("Server task error: {e:?}");
};
}

Ok(())
}

async fn client(server_addr: NodeAddr, opt: Opt) -> Result<ClientStats> {
let (endpoint, connection) = connect_client(server_addr, opt).await?;

let start = Instant::now();

let connection = Arc::new(connection);

let mut stats = ClientStats::default();
let mut first_error = None;

let sem = Arc::new(Semaphore::new(opt.max_streams));
let results = Arc::new(Mutex::new(Vec::new()));
for _ in 0..opt.streams {
let permit = sem.clone().acquire_owned().await.unwrap();
let results = results.clone();
let connection = connection.clone();
tokio::spawn(async move {
let result =
handle_client_stream(connection, opt.upload_size, opt.read_unordered).await;
info!("stream finished: {:?}", result);
results.lock().unwrap().push(result);
drop(permit);
});
}
pub fn run_quinn(opt: Opt) -> Result<()> {
let server_span = tracing::error_span!("server");
let runtime = rt();
let (server_addr, endpoint) = {
let _guard = server_span.enter();
quinn::server_endpoint(&runtime, &opt)
};

// Wait for remaining streams to finish
let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap();
let server_thread = std::thread::spawn(move || {
let _guard = server_span.entered();
if let Err(e) = runtime.block_on(quinn::server(endpoint, opt)) {
eprintln!("server failed: {e:#}");
}
});

for result in results.lock().unwrap().drain(..) {
match result {
Ok((upload_result, download_result)) => {
stats.upload_stats.stream_finished(upload_result);
stats.download_stats.stream_finished(download_result);
}
Err(e) => {
if first_error.is_none() {
first_error = Some(e);
let mut handles = Vec::new();
for id in 0..opt.clients {
handles.push(std::thread::spawn(move || {
let _guard = tracing::error_span!("client", id).entered();
let runtime = rt();
match runtime.block_on(quinn::client(server_addr, opt)) {
Ok(stats) => Ok(stats),
Err(e) => {
eprintln!("client failed: {e:#}");
Err(e)
}
}
}
}

stats.upload_stats.total_duration = start.elapsed();
stats.download_stats.total_duration = start.elapsed();

// Explicit close of the connection, since handles can still be around due
// to `Arc`ing them
connection.close(0u32.into(), b"Benchmark done");

endpoint.close(0u32.into(), b"").await?;

if opt.stats {
println!("\nClient connection stats:\n{:#?}", connection.stats());
}));
}

match first_error {
None => Ok(stats),
Some(e) => Err(e),
for (id, handle) in handles.into_iter().enumerate() {
// We print all stats at the end of the test sequentially to avoid
// them being garbled due to being printed concurrently
if let Ok(stats) = handle.join().expect("client thread") {
stats.print(id);
}
}
}

async fn handle_client_stream(
connection: Arc<Connection>,
upload_size: u64,
read_unordered: bool,
) -> Result<(TransferResult, TransferResult)> {
let start = Instant::now();

let (mut send_stream, mut recv_stream) = connection
.open_bi()
.await
.context("failed to open stream")?;

send_data_on_stream(&mut send_stream, upload_size).await?;

let upload_result = TransferResult::new(start.elapsed(), upload_size);

let start = Instant::now();
let size = drain_stream(&mut recv_stream, read_unordered).await?;
let download_result = TransferResult::new(start.elapsed(), size as u64);

Ok((upload_result, download_result))
}
server_thread.join().expect("server thread");

#[derive(Default)]
struct ClientStats {
upload_stats: Stats,
download_stats: Stats,
Ok(())
}

impl ClientStats {
pub fn print(&self, client_id: usize) {
println!();
println!("Client {client_id} stats:");

if self.upload_stats.total_size != 0 {
self.upload_stats.print("upload");
}

if self.download_stats.total_size != 0 {
self.download_stats.print("download");
}
}
pub fn run_s2n(_opt: s2n::Opt) -> Result<()> {
unimplemented!()
}
Loading
Loading