Skip to content

Commit

Permalink
revert messing with test
Browse files Browse the repository at this point in the history
  • Loading branch information
divagant-martian committed Dec 11, 2024
1 parent 933f9e6 commit a609e99
Showing 1 changed file with 44 additions and 65 deletions.
109 changes: 44 additions & 65 deletions src/peer_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,6 @@ async fn subscribe_loop(

#[cfg(test)]
mod tests {

use super::*;
use crate::{
chat::send_msg,
Expand Down Expand Up @@ -735,34 +734,18 @@ mod tests {
assert!(alice.iroh.read().await.is_none());
}

trait Timeout<O, F: std::future::IntoFuture<Output = O>> {
fn t(self) -> impl std::future::Future<Output = anyhow::Result<O>>;
}

const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

impl<O, F: std::future::IntoFuture<Output = O>> Timeout<O, F> for F {
async fn t(self) -> anyhow::Result<O> {
let out = tokio::time::timeout(TIMEOUT, self).await?;
anyhow::Ok(out)
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_can_reconnect() -> testresult::TestResult {
tracing_subscriber::fmt::init();
async fn test_can_reconnect() {
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().t().await?;
let bob = &mut tcm.bob().t().await?;
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;

assert!(
alice
.get_config_bool(Config::WebxdcRealtimeEnabled)
.t()
.await??
);
assert!(alice
.get_config_bool(Config::WebxdcRealtimeEnabled)
.await
.unwrap());
// Alice sends webxdc to bob
let alice_chat = alice.create_chat(bob).t().await?;
let alice_chat = alice.create_chat(bob).await;
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
Expand All @@ -771,32 +754,30 @@ mod tests {
include_bytes!("../test-data/webxdc/minimal.xdc"),
None,
)
.t()
.await??;
.await
.unwrap();

send_msg(alice, alice_chat.id, &mut instance).t().await??;
let alice_webxdc = alice.get_last_msg().t().await?;
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
let alice_webxdc = alice.get_last_msg().await;
assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

let webxdc = alice.pop_sent_msg().t().await?;
let webxdc = alice.pop_sent_msg().await;
let bob_webxdc = bob.recv_msg(&webxdc).await;
assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

bob_webxdc.chat_id.accept(bob).t().await??;
bob_webxdc.chat_id.accept(bob).await.unwrap();

// Alice advertises herself.
send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
.t()
.await??;
.await
.unwrap();

bob.recv_msg_trash(&alice.pop_sent_msg().t().await?)
.t()
.await?;
bob.recv_msg_trash(&alice.pop_sent_msg().await).await;

// Bob adds alice to gossip peers.
let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
.t()
.await??
.await
.unwrap()
.into_iter()
.map(|addr| addr.node_id)
.collect::<Vec<_>>();
Expand All @@ -806,36 +787,36 @@ mod tests {
vec![
alice
.get_or_try_init_peer_channel()
.t()
.await??
.await
.unwrap()
.get_node_addr()
.t()
.await??
.await
.unwrap()
.node_id
]
);

bob.get_or_try_init_peer_channel()
.t()
.await??
.await
.unwrap()
.join_and_subscribe_gossip(bob, bob_webxdc.id)
.t()
.await??
.await
.unwrap()
.t()
.await??;
.unwrap()
.await
.unwrap();

// Alice sends ephemeral message
alice
.get_or_try_init_peer_channel()
.t()
.await??
.await
.unwrap()
.send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
.t()
.await??;
.await
.unwrap();

loop {
let event = bob.evtracker.recv().t().await?.unwrap();
let event = bob.evtracker.recv().await.unwrap();
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
if data == "alice -> bob".as_bytes() {
break;
Expand All @@ -849,8 +830,8 @@ mod tests {
}

let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
.t()
.await??
.await
.unwrap()
.unwrap();
let bob_sequence_number = bob
.iroh
Expand All @@ -877,14 +858,14 @@ mod tests {
assert_eq!(bob_sequence_number, bob_sequence_number_after);

bob.get_or_try_init_peer_channel()
.t()
.await??
.await
.unwrap()
.join_and_subscribe_gossip(bob, bob_webxdc.id)
.t()
.await??
.await
.unwrap()
.unwrap()
.t()
.await??;
.await
.unwrap();

bob.get_or_try_init_peer_channel()
.await
Expand Down Expand Up @@ -925,8 +906,8 @@ mod tests {
);
leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
.t()
.await??
.await
.unwrap()
.unwrap();
assert!(alice
.iroh
Expand All @@ -939,12 +920,10 @@ mod tests {
.await
.get(&topic)
.is_none());
testresult::TestResult::Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_parallel_connect() {
eprintln!("START-----");
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
Expand Down

0 comments on commit a609e99

Please sign in to comment.