Skip to content

Commit

Permalink
timeouts everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
divagant-martian committed Dec 11, 2024
1 parent 4d49858 commit 8593c70
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 50 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pretty_assertions = "1.4.1"
proptest = { version = "1", default-features = false, features = ["std"] }
tempfile = { workspace = true }
testdir = "0.9.0"
testresult = "0.4.1"
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing-subscriber.workspace = true

Expand Down
111 changes: 61 additions & 50 deletions src/peer_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ async fn subscribe_loop(

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::{
Expand Down Expand Up @@ -741,25 +740,34 @@ mod tests {
assert!(alice.iroh.read().await.is_none());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_can_reconnect() {
tokio::time::timeout(Duration::from_secs(20), test_can_reconnect_impl())
.await
.unwrap();
trait Timeout<O, F: std::future::IntoFuture<Output = O>> {
fn t(self) -> impl std::future::Future<Output = anyhow::Result<O>>;
}

const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

impl<O, F: std::future::IntoFuture<Output = O>> Timeout<O, F> for F {
async fn t(self) -> anyhow::Result<O> {
let out = tokio::time::timeout(TIMEOUT, self).await?;
anyhow::Ok(out)
}
}

async fn test_can_reconnect_impl() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_can_reconnect() -> testresult::TestResult {
tracing_subscriber::fmt::init();
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
let alice = &mut tcm.alice().t().await?;
let bob = &mut tcm.bob().t().await?;

assert!(alice
.get_config_bool(Config::WebxdcRealtimeEnabled)
.await
.unwrap());
assert!(
alice
.get_config_bool(Config::WebxdcRealtimeEnabled)
.t()
.await??
);
// Alice sends webxdc to bob
let alice_chat = alice.create_chat(bob).await;
let alice_chat = alice.create_chat(bob).t().await?;
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
Expand All @@ -768,30 +776,32 @@ mod tests {
include_bytes!("../test-data/webxdc/minimal.xdc"),
None,
)
.await
.unwrap();
.t()
.await??;

send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
let alice_webxdc = alice.get_last_msg().await;
send_msg(alice, alice_chat.id, &mut instance).t().await??;
let alice_webxdc = alice.get_last_msg().t().await?;
assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

let webxdc = alice.pop_sent_msg().await;
let webxdc = alice.pop_sent_msg().t().await?;
let bob_webxdc = bob.recv_msg(&webxdc).await;
assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

bob_webxdc.chat_id.accept(bob).await.unwrap();
bob_webxdc.chat_id.accept(bob).t().await??;

// Alice advertises herself.
send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
.await
.unwrap();
.t()
.await??;

bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
bob.recv_msg_trash(&alice.pop_sent_msg().t().await?)
.t()
.await?;

// Bob adds alice to gossip peers.
let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
.await
.unwrap()
.t()
.await??
.into_iter()
.map(|addr| addr.node_id)
.collect::<Vec<_>>();
Expand All @@ -801,36 +811,36 @@ mod tests {
vec![
alice
.get_or_try_init_peer_channel()
.await
.unwrap()
.t()
.await??
.get_node_addr()
.await
.unwrap()
.t()
.await??
.node_id
]
);

bob.get_or_try_init_peer_channel()
.await
.unwrap()
.t()
.await??
.join_and_subscribe_gossip(bob, bob_webxdc.id)
.await
.unwrap()
.t()
.await??
.unwrap()
.await
.unwrap();
.t()
.await??;

// Alice sends ephemeral message
alice
.get_or_try_init_peer_channel()
.await
.unwrap()
.t()
.await??
.send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
.await
.unwrap();
.t()
.await??;

loop {
let event = bob.evtracker.recv().await.unwrap();
let event = bob.evtracker.recv().t().await?.unwrap();
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
if data == "alice -> bob".as_bytes() {
break;
Expand All @@ -844,8 +854,8 @@ mod tests {
}

let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
.await
.unwrap()
.t()
.await??
.unwrap();
let bob_sequence_number = bob
.iroh
Expand All @@ -872,14 +882,14 @@ mod tests {
assert_eq!(bob_sequence_number, bob_sequence_number_after);

bob.get_or_try_init_peer_channel()
.await
.unwrap()
.t()
.await??
.join_and_subscribe_gossip(bob, bob_webxdc.id)
.await
.unwrap()
.t()
.await??
.unwrap()
.await
.unwrap();
.t()
.await??;

bob.get_or_try_init_peer_channel()
.await
Expand Down Expand Up @@ -920,8 +930,8 @@ mod tests {
);
leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
.await
.unwrap()
.t()
.await??
.unwrap();
assert!(alice
.iroh
Expand All @@ -934,6 +944,7 @@ mod tests {
.await
.get(&topic)
.is_none());
testresult::TestResult::Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down

0 comments on commit 8593c70

Please sign in to comment.