diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs index 8b852b03c7..c3289d03e6 100644 --- a/iroh/tests/batch.rs +++ b/iroh/tests/batch.rs @@ -2,28 +2,36 @@ use std::{io, time::Duration}; use bao_tree::blake3; use bytes::Bytes; +use futures_lite::StreamExt; use iroh::{ client::blobs::{AddDirOpts, WrapOption}, node::GcPolicy, }; use iroh_blobs::store::mem::Store; -async fn create_node() -> anyhow::Result> { - iroh::node::Node::memory() +async fn create_node() -> anyhow::Result<(iroh::node::Node, async_channel::Receiver<()>)> { + let (gc_send, gc_recv) = async_channel::unbounded(); + let node = iroh::node::Node::memory() .gc_policy(GcPolicy::Interval(Duration::from_millis(10))) + .register_gc_done_cb(Box::new(move || { + gc_send.send_blocking(()).ok(); + })) .spawn() - .await + .await?; + Ok((node, gc_recv)) } -async fn wait_for_gc() { - // wait for multiple gc cycles to ensure that the data is actually gone - tokio::time::sleep(Duration::from_millis(50)).await; +async fn wait_for_gc(chan: &mut async_channel::Receiver<()>) { + let _ = chan.drain(); + for _ in 0..5 { + chan.recv().await.unwrap(); + } } /// Test that add_bytes adds the right data #[tokio::test] async fn add_bytes() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; @@ -38,7 +46,7 @@ async fn add_bytes() -> anyhow::Result<()> { /// Test that add_bytes adds the right data #[tokio::test] async fn add_stream() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; @@ -54,7 +62,7 @@ async fn add_stream() -> anyhow::Result<()> { /// Test that add_file adds the right data #[tokio::test] async fn add_file() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let dir = tempfile::tempdir()?; @@ -71,7 +79,7 @@ async fn add_file() -> anyhow::Result<()> { /// Tests that add_dir adds the right data #[tokio::test] async fn add_dir() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let dir = tempfile::tempdir()?; @@ -93,7 +101,7 @@ async fn add_dir() -> anyhow::Result<()> { /// Tests that add_dir adds the right data #[tokio::test] async fn add_dir_single_file() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let dir = tempfile::tempdir()?; @@ -118,18 +126,18 @@ async fn add_dir_single_file() -> anyhow::Result<()> { #[tokio::test] async fn batch_drop() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, mut gc) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; let tag = batch.add_bytes(data).await?; let hash = *tag.hash(); // Check that the store has the data and that it is protected from gc - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(client.has(hash).await?); drop(batch); // Check that the store drops the data when the temp tag gets dropped - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(!client.has(hash).await?); Ok(()) } @@ -140,18 +148,18 @@ async fn batch_drop() -> anyhow::Result<()> { /// once the batch is dropped. #[tokio::test] async fn tag_drop_raw() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, mut gc) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; let tag = batch.add_bytes(data).await?; let hash = *tag.hash(); // Check that the store has the data and that it is protected from gc - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(client.has(hash).await?); drop(tag); // Check that the store drops the data when the temp tag gets dropped - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(!client.has(hash).await?); Ok(()) } @@ -160,24 +168,24 @@ async fn tag_drop_raw() -> anyhow::Result<()> { /// before the first temp tag is dropped. #[tokio::test] async fn temp_tag_copy() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, mut gc) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; let tag = batch.add_bytes(data).await?; let hash = *tag.hash(); // Check that the store has the data and that it is protected from gc - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(client.has(hash).await?); // Create an additional temp tag for the same data let tag2 = batch.temp_tag(tag.hash_and_format()).await?; drop(tag); // Check that the data is still present - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(client.has(hash).await?); drop(tag2); // Check that the data is gone since both temp tags are dropped - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(!client.has(hash).await?); Ok(()) } @@ -189,7 +197,7 @@ async fn temp_tag_copy() -> anyhow::Result<()> { /// once the batch is dropped. #[tokio::test] async fn tag_drop_hashseq() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, mut gc) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let dir = tempfile::tempdir()?; @@ -212,11 +220,11 @@ async fn tag_drop_hashseq() -> anyhow::Result<()> { // Check that the store has the data immediately after adding it check_present(&true).await?; // Check that it is protected from gc - wait_for_gc().await; + wait_for_gc(&mut gc).await; check_present(&true).await?; drop(tag); // Check that the store drops the data when the temp tag gets dropped - wait_for_gc().await; + wait_for_gc(&mut gc).await; check_present(&false).await?; Ok(()) } @@ -227,7 +235,7 @@ async fn tag_drop_hashseq() -> anyhow::Result<()> { /// once the batch is dropped. #[tokio::test] async fn wrong_batch() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test";