Skip to content

Commit

Permalink
use register_gc_done_cb for gc tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Aug 15, 2024
1 parent 8ce97a3 commit 483e731
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions iroh/tests/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,36 @@ use std::{io, time::Duration};

use bao_tree::blake3;
use bytes::Bytes;
use futures_lite::StreamExt;
use iroh::{
client::blobs::{AddDirOpts, WrapOption},
node::GcPolicy,
};
use iroh_blobs::store::mem::Store;

async fn create_node() -> anyhow::Result<iroh::node::Node<Store>> {
iroh::node::Node::memory()
async fn create_node() -> anyhow::Result<(iroh::node::Node<Store>, async_channel::Receiver<()>)> {
let (gc_send, gc_recv) = async_channel::unbounded();
let node = iroh::node::Node::memory()
.gc_policy(GcPolicy::Interval(Duration::from_millis(10)))
.register_gc_done_cb(Box::new(move || {
gc_send.send_blocking(()).ok();
}))
.spawn()
.await
.await?;
Ok((node, gc_recv))
}

async fn wait_for_gc() {
// wait for multiple gc cycles to ensure that the data is actually gone
tokio::time::sleep(Duration::from_millis(50)).await;
async fn wait_for_gc(chan: &mut async_channel::Receiver<()>) {
let _ = chan.drain();
for _ in 0..5 {
chan.recv().await.unwrap();
}
}

/// Test that add_bytes adds the right data
#[tokio::test]
async fn add_bytes() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, _) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let data: &[u8] = b"test";
Expand All @@ -38,7 +46,7 @@ async fn add_bytes() -> anyhow::Result<()> {
/// Test that add_bytes adds the right data
#[tokio::test]
async fn add_stream() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, _) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let data: &[u8] = b"test";
Expand All @@ -54,7 +62,7 @@ async fn add_stream() -> anyhow::Result<()> {
/// Test that add_file adds the right data
#[tokio::test]
async fn add_file() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, _) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let dir = tempfile::tempdir()?;
Expand All @@ -71,7 +79,7 @@ async fn add_file() -> anyhow::Result<()> {
/// Tests that add_dir adds the right data
#[tokio::test]
async fn add_dir() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, _) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let dir = tempfile::tempdir()?;
Expand All @@ -93,7 +101,7 @@ async fn add_dir() -> anyhow::Result<()> {
/// Tests that add_dir adds the right data
#[tokio::test]
async fn add_dir_single_file() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, _) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let dir = tempfile::tempdir()?;
Expand All @@ -118,18 +126,18 @@ async fn add_dir_single_file() -> anyhow::Result<()> {

#[tokio::test]
async fn batch_drop() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, mut gc) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let data: &[u8] = b"test";
let tag = batch.add_bytes(data).await?;
let hash = *tag.hash();
// Check that the store has the data and that it is protected from gc
wait_for_gc().await;
wait_for_gc(&mut gc).await;
assert!(client.has(hash).await?);
drop(batch);
// Check that the store drops the data when the temp tag gets dropped
wait_for_gc().await;
wait_for_gc(&mut gc).await;
assert!(!client.has(hash).await?);
Ok(())
}
Expand All @@ -140,18 +148,18 @@ async fn batch_drop() -> anyhow::Result<()> {
/// once the batch is dropped.
#[tokio::test]
async fn tag_drop_raw() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, mut gc) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let data: &[u8] = b"test";
let tag = batch.add_bytes(data).await?;
let hash = *tag.hash();
// Check that the store has the data and that it is protected from gc
wait_for_gc().await;
wait_for_gc(&mut gc).await;
assert!(client.has(hash).await?);
drop(tag);
// Check that the store drops the data when the temp tag gets dropped
wait_for_gc().await;
wait_for_gc(&mut gc).await;
assert!(!client.has(hash).await?);
Ok(())
}
Expand All @@ -160,24 +168,24 @@ async fn tag_drop_raw() -> anyhow::Result<()> {
/// before the first temp tag is dropped.
#[tokio::test]
async fn temp_tag_copy() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, mut gc) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let data: &[u8] = b"test";
let tag = batch.add_bytes(data).await?;
let hash = *tag.hash();
// Check that the store has the data and that it is protected from gc
wait_for_gc().await;
wait_for_gc(&mut gc).await;
assert!(client.has(hash).await?);
// Create an additional temp tag for the same data
let tag2 = batch.temp_tag(tag.hash_and_format()).await?;
drop(tag);
// Check that the data is still present
wait_for_gc().await;
wait_for_gc(&mut gc).await;
assert!(client.has(hash).await?);
drop(tag2);
// Check that the data is gone since both temp tags are dropped
wait_for_gc().await;
wait_for_gc(&mut gc).await;
assert!(!client.has(hash).await?);
Ok(())
}
Expand All @@ -189,7 +197,7 @@ async fn temp_tag_copy() -> anyhow::Result<()> {
/// once the batch is dropped.
#[tokio::test]
async fn tag_drop_hashseq() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, mut gc) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let dir = tempfile::tempdir()?;
Expand All @@ -212,11 +220,11 @@ async fn tag_drop_hashseq() -> anyhow::Result<()> {
// Check that the store has the data immediately after adding it
check_present(&true).await?;
// Check that it is protected from gc
wait_for_gc().await;
wait_for_gc(&mut gc).await;
check_present(&true).await?;
drop(tag);
// Check that the store drops the data when the temp tag gets dropped
wait_for_gc().await;
wait_for_gc(&mut gc).await;
check_present(&false).await?;
Ok(())
}
Expand All @@ -227,7 +235,7 @@ async fn tag_drop_hashseq() -> anyhow::Result<()> {
/// once the batch is dropped.
#[tokio::test]
async fn wrong_batch() -> anyhow::Result<()> {
let node = create_node().await?;
let (node, _) = create_node().await?;
let client = &node.client().blobs();
let batch = client.batch().await?;
let data: &[u8] = b"test";
Expand Down

0 comments on commit 483e731

Please sign in to comment.