From 8593c70843f1f4d7f8b61977ad1a20c1d0b73959 Mon Sep 17 00:00:00 2001 From: Diva M Date: Tue, 10 Dec 2024 13:45:43 -0500 Subject: [PATCH] timeouts everywhere --- Cargo.lock | 7 +++ Cargo.toml | 1 + src/peer_channels.rs | 111 ++++++++++++++++++++++++------------------- 3 files changed, 69 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0aae3a3b8b..ad063b26a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1384,6 +1384,7 @@ dependencies = [ "tagger", "tempfile", "testdir", + "testresult", "textwrap", "thiserror 1.0.69", "tokio", @@ -6284,6 +6285,12 @@ dependencies = [ "whoami", ] +[[package]] +name = "testresult" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "614b328ff036a4ef882c61570f72918f7e9c5bee1da33f8e7f91e01daee7e56c" + [[package]] name = "textwrap" version = "0.16.1" diff --git a/Cargo.toml b/Cargo.toml index d3cd141327..c421e6e621 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,6 +122,7 @@ pretty_assertions = "1.4.1" proptest = { version = "1", default-features = false, features = ["std"] } tempfile = { workspace = true } testdir = "0.9.0" +testresult = "0.4.1" tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tracing-subscriber.workspace = true diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 3397dd950e..4b56fefb62 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -563,7 +563,6 @@ async fn subscribe_loop( #[cfg(test)] mod tests { - use std::time::Duration; use super::*; use crate::{ @@ -741,25 +740,34 @@ mod tests { assert!(alice.iroh.read().await.is_none()); } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_can_reconnect() { - tokio::time::timeout(Duration::from_secs(20), test_can_reconnect_impl()) - .await - .unwrap(); + trait Timeout> { + fn t(self) -> impl std::future::Future>; + } + + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + + impl> Timeout for F { + async fn t(self) -> anyhow::Result { + let out = tokio::time::timeout(TIMEOUT, self).await?; + anyhow::Ok(out) + } } - async fn test_can_reconnect_impl() { + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_can_reconnect() -> testresult::TestResult { tracing_subscriber::fmt::init(); let mut tcm = TestContextManager::new(); - let alice = &mut tcm.alice().await; - let bob = &mut tcm.bob().await; + let alice = &mut tcm.alice().t().await?; + let bob = &mut tcm.bob().t().await?; - assert!(alice - .get_config_bool(Config::WebxdcRealtimeEnabled) - .await - .unwrap()); + assert!( + alice + .get_config_bool(Config::WebxdcRealtimeEnabled) + .t() + .await?? + ); // Alice sends webxdc to bob - let alice_chat = alice.create_chat(bob).await; + let alice_chat = alice.create_chat(bob).t().await?; let mut instance = Message::new(Viewtype::File); instance .set_file_from_bytes( @@ -768,30 +776,32 @@ mod tests { include_bytes!("../test-data/webxdc/minimal.xdc"), None, ) - .await - .unwrap(); + .t() + .await??; - send_msg(alice, alice_chat.id, &mut instance).await.unwrap(); - let alice_webxdc = alice.get_last_msg().await; + send_msg(alice, alice_chat.id, &mut instance).t().await??; + let alice_webxdc = alice.get_last_msg().t().await?; assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc); - let webxdc = alice.pop_sent_msg().await; + let webxdc = alice.pop_sent_msg().t().await?; let bob_webxdc = bob.recv_msg(&webxdc).await; assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc); - bob_webxdc.chat_id.accept(bob).await.unwrap(); + bob_webxdc.chat_id.accept(bob).t().await??; // Alice advertises herself. send_webxdc_realtime_advertisement(alice, alice_webxdc.id) - .await - .unwrap(); + .t() + .await??; - bob.recv_msg_trash(&alice.pop_sent_msg().await).await; + bob.recv_msg_trash(&alice.pop_sent_msg().t().await?) + .t() + .await?; // Bob adds alice to gossip peers. let members = get_iroh_gossip_peers(bob, bob_webxdc.id) - .await - .unwrap() + .t() + .await?? .into_iter() .map(|addr| addr.node_id) .collect::>(); @@ -801,36 +811,36 @@ mod tests { vec![ alice .get_or_try_init_peer_channel() - .await - .unwrap() + .t() + .await?? .get_node_addr() - .await - .unwrap() + .t() + .await?? .node_id ] ); bob.get_or_try_init_peer_channel() - .await - .unwrap() + .t() + .await?? .join_and_subscribe_gossip(bob, bob_webxdc.id) - .await - .unwrap() + .t() + .await?? .unwrap() - .await - .unwrap(); + .t() + .await??; // Alice sends ephemeral message alice .get_or_try_init_peer_channel() - .await - .unwrap() + .t() + .await?? .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec()) - .await - .unwrap(); + .t() + .await??; loop { - let event = bob.evtracker.recv().await.unwrap(); + let event = bob.evtracker.recv().t().await?.unwrap(); if let EventType::WebxdcRealtimeData { data, .. } = event.typ { if data == "alice -> bob".as_bytes() { break; @@ -844,8 +854,8 @@ mod tests { } let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id) - .await - .unwrap() + .t() + .await?? .unwrap(); let bob_sequence_number = bob .iroh @@ -872,14 +882,14 @@ mod tests { assert_eq!(bob_sequence_number, bob_sequence_number_after); bob.get_or_try_init_peer_channel() - .await - .unwrap() + .t() + .await?? .join_and_subscribe_gossip(bob, bob_webxdc.id) - .await - .unwrap() + .t() + .await?? .unwrap() - .await - .unwrap(); + .t() + .await??; bob.get_or_try_init_peer_channel() .await @@ -920,8 +930,8 @@ mod tests { ); leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap(); let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id) - .await - .unwrap() + .t() + .await?? .unwrap(); assert!(alice .iroh @@ -934,6 +944,7 @@ mod tests { .await .get(&topic) .is_none()); + testresult::TestResult::Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)]