Skip to content

Commit

Permalink
test: run 2k concurrent transfers with 1MB of data
Browse files Browse the repository at this point in the history
Now we can reliably handle the launching of many more streams, so let's
really stress-test the library.

This has helped us identify and resolve a number of bugs, in local
testing. Now that they seem to be fixed, we can merge to master to
prevent regressions.

Bonus cleanups on @njgheorghita 's original change:
- Change the high end of the range to equal the number of concurrent
  streams
- Log showing the transfer speed in socket test
- Run an optimized build of utp when testing, for more performance (and
  interesting results) in the high-concurrency tests
- Return from the test as soon as an error is found, cancelling all
  in-flight transfers
  • Loading branch information
carver committed Sep 4, 2023
1 parent 4e3be0e commit daaeb19
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 76 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,8 @@ tracing = { version = "0.1.37", features = ["std", "attributes", "log"] }

[dev-dependencies]
quickcheck = "1.0.3"
tracing-subscriber = "0.3.16"
tokio = { version = "1.25.0", features = ["test-util"] }
tracing-subscriber = "0.3.16"

[profile.test]
opt-level = 3
141 changes: 66 additions & 75 deletions tests/socket.rs
Original file line number Diff line number Diff line change
@@ -1,105 +1,96 @@
use futures::stream::{FuturesUnordered, StreamExt};
use std::net::SocketAddr;
use std::sync::Arc;

use tokio::task::JoinHandle;
use tokio::time::Instant;

use utp_rs::cid;
use utp_rs::conn::ConnectionConfig;
use utp_rs::socket::UtpSocket;

#[tokio::test(flavor = "multi_thread")]
const TEST_DATA: &[u8] = &[0xf0; 1_000_000];

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn socket() {
tracing_subscriber::fmt::init();

let conn_config = ConnectionConfig::default();

let data_one = vec![0xef; 8192 * 2 * 2];
let data_one_recv = data_one.clone();
tracing::info!("starting socket test");

let recv_addr = SocketAddr::from(([127, 0, 0, 1], 3400));
let send_addr = SocketAddr::from(([127, 0, 0, 1], 3401));

let recv = UtpSocket::bind(recv_addr).await.unwrap();
let recv = Arc::new(recv);

let send_addr = SocketAddr::from(([127, 0, 0, 1], 3401));
let send = UtpSocket::bind(send_addr).await.unwrap();
let send = Arc::new(send);
let mut handles = FuturesUnordered::new();

let start = Instant::now();
let num_transfers = 2000;
for i in 0..num_transfers {
// step up cid by two to avoid collisions
let handle =
initiate_transfer(i * 2, recv_addr, recv.clone(), send_addr, send.clone()).await;
handles.push(handle.0);
handles.push(handle.1);
}

while let Some(res) = handles.next().await {
res.unwrap();
}
let elapsed = Instant::now() - start;
let megabits_sent = num_transfers as f64 * TEST_DATA.len() as f64 * 8.0 / 1_000_000.0;
let transfer_rate = megabits_sent / elapsed.as_secs_f64();
tracing::info!("finished real udp load test of {} simultaneous transfers, in {:?}, at a rate of {:.0} Mbps", num_transfers, elapsed, transfer_rate);
}

let recv_one_cid = cid::ConnectionId {
send: 100,
recv: 101,
peer: send_addr,
};
let send_one_cid = cid::ConnectionId {
send: 101,
recv: 100,
peer: recv_addr,
};

let recv_arc = Arc::clone(&recv);
let recv_one_handle = tokio::spawn(async move {
let mut stream = recv_arc
.accept_with_cid(recv_one_cid, conn_config)
.await
.unwrap();
let mut buf = vec![];
let n = stream.read_to_eof(&mut buf).await.unwrap();
tracing::info!(cid.send = %recv_one_cid.send, cid.recv = %recv_one_cid.recv, "read {n} bytes from uTP stream");

assert_eq!(n, data_one_recv.len());
assert_eq!(buf, data_one_recv);
});

let send_arc = Arc::clone(&send);
tokio::spawn(async move {
let mut stream = send_arc
.connect_with_cid(send_one_cid, conn_config)
.await
.unwrap();
let n = stream.write(&data_one).await.unwrap();
assert_eq!(n, data_one.len());

let _ = stream.close().await;
});

let data_two = vec![0xfe; 8192 * 2 * 2];
let data_two_recv = data_two.clone();

let recv_two_cid = cid::ConnectionId {
send: 200,
recv: 201,
async fn initiate_transfer(
i: u16,
recv_addr: SocketAddr,
recv: Arc<UtpSocket<SocketAddr>>,
send_addr: SocketAddr,
send: Arc<UtpSocket<SocketAddr>>,
) -> (JoinHandle<()>, JoinHandle<()>) {
let conn_config = ConnectionConfig::default();
let initiator_cid = 100 + i;
let responder_cid = 100 + i + 1;
let recv_cid = cid::ConnectionId {
send: initiator_cid,
recv: responder_cid,
peer: send_addr,
};
let send_two_cid = cid::ConnectionId {
send: 201,
recv: 200,
let send_cid = cid::ConnectionId {
send: responder_cid,
recv: initiator_cid,
peer: recv_addr,
};

let recv_two_handle = tokio::spawn(async move {
let mut stream = recv
.accept_with_cid(recv_two_cid, conn_config)
.await
.unwrap();
let recv_handle = tokio::spawn(async move {
let mut stream = recv.accept_with_cid(recv_cid, conn_config).await.unwrap();
let mut buf = vec![];
let n = stream.read_to_eof(&mut buf).await.unwrap();
tracing::info!(cid.send = %recv_two_cid.send, cid.recv = %recv_two_cid.recv, "read {n} bytes from uTP stream");

assert_eq!(n, data_two_recv.len());
assert_eq!(buf, data_two_recv);
let n = match stream.read_to_eof(&mut buf).await {
Ok(num_bytes) => num_bytes,
Err(err) => {
let cid = stream.cid();
tracing::error!(?cid, "read to eof error: {:?}", err);
panic!("fail to read data");
}
};
tracing::info!(cid.send = %recv_cid.send, cid.recv = %recv_cid.recv, "read {n} bytes from uTP stream");

assert_eq!(n, TEST_DATA.len());
assert_eq!(buf, TEST_DATA);
});

tokio::spawn(async move {
let mut stream = send
.connect_with_cid(send_two_cid, conn_config)
.await
.unwrap();
let n = stream.write(&data_two).await.unwrap();
assert_eq!(n, data_two.len());
let send_handle = tokio::spawn(async move {
let mut stream = send.connect_with_cid(send_cid, conn_config).await.unwrap();
let n = stream.write(TEST_DATA).await.unwrap();
assert_eq!(n, TEST_DATA.len());

let _ = stream.close().await;
stream.close().await.unwrap();
});

let (one, two) = tokio::join!(recv_one_handle, recv_two_handle);
one.unwrap();
two.unwrap();
(send_handle, recv_handle)
}

// Test that a new socket has zero connections
Expand Down

0 comments on commit daaeb19

Please sign in to comment.