diff --git a/iroh-blobs/src/format/collection.rs b/iroh-blobs/src/format/collection.rs
index ab13572cc1..2e4966308f 100644
--- a/iroh-blobs/src/format/collection.rs
+++ b/iroh-blobs/src/format/collection.rs
@@ -84,7 +84,7 @@ impl Collection {
     ///
     /// To persist the collection, write all the blobs to storage, and use the
    /// hash of the last blob as the collection hash.
-    pub fn to_blobs(&self) -> impl Iterator<Item = Bytes> {
+    pub fn to_blobs(&self) -> impl DoubleEndedIterator<Item = Bytes> {
        let meta = CollectionMeta {
            header: *Self::HEADER,
            names: self.names(),
diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs
index 49d0a43abd..9d1e42fc33 100644
--- a/iroh-blobs/src/store/traits.rs
+++ b/iroh-blobs/src/store/traits.rs
@@ -703,7 +703,7 @@ pub enum ImportProgress {
 /// does not make any sense. E.g. an in memory implementation will always have
 /// to copy the file into memory. Also, a disk based implementation might choose
 /// to copy small files even if the mode is `Reference`.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
 pub enum ImportMode {
     /// This mode will copy the file into the database before hashing.
     ///
diff --git a/iroh-net/src/net/interfaces/bsd.rs b/iroh-net/src/net/interfaces/bsd.rs
index dd6ca7e3ca..7ef0cd1eb0 100644
--- a/iroh-net/src/net/interfaces/bsd.rs
+++ b/iroh-net/src/net/interfaces/bsd.rs
@@ -300,7 +300,7 @@ impl WireFormat {
                Ok(Some(WireMessage::Route(m)))
            }

-            #[cfg(any(target_os = "openbsd",))]
+            #[cfg(target_os = "openbsd")]
            MessageType::Route => {
                if data.len() < self.body_off {
                    return Err(RouteError::MessageTooShort);
diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs
index 33be81a1cf..2eea0479c2 100644
--- a/iroh/src/client/blobs.rs
+++ b/iroh/src/client/blobs.rs
@@ -11,6 +11,7 @@ use std::{

 use anyhow::{anyhow, Context as _, Result};
 use bytes::Bytes;
+use futures_buffered::BufferedStreamExt;
 use futures_lite::{Stream, StreamExt};
 use futures_util::{FutureExt, SinkExt};
 use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
@@ -36,16 +37,18 @@ use tracing::warn;

 use crate::rpc_protocol::{
     BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate,
-    BatchCreateRequest, BatchCreateResponse, BatchUpdate, BlobAddPathRequest, BlobAddStreamRequest,
-    BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest,
-    BlobExportRequest, BlobGetCollectionRequest, BlobGetCollectionResponse,
-    BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
-    BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
-    NodeStatusRequest, RpcService, SetTagOption,
+    BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, BatchUpdate,
+    BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
+    BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest,
+    BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
+    BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
+    CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption,
 };

 use super::{flatten, Iroh};

+pub use iroh_blobs::store::ImportMode;
+
 /// Iroh blobs client.
 #[derive(Debug, Clone)]
 pub struct Client<C> {
@@ -399,8 +402,12 @@ struct BatchInner<C: ServiceConnection<RpcService>> {
     updates: Mutex<UpdateSink<RpcService, C, BatchUpdate>>,
 }

+/// A batch for write operations.
 ///
-
+/// This serves mostly as a scope for temporary tags.
+///
+/// It is not a transaction: operations in a batch are not atomic, and there
+/// is no isolation between batches.
 #[derive(derive_more::Debug)]
 pub struct Batch<C: ServiceConnection<RpcService>>(Arc<BatchInner<C>>);

@@ -412,6 +419,120 @@ impl<C: ServiceConnection<RpcService>> TagDrop for BatchInner<C> {
 }

 impl<C: ServiceConnection<RpcService>> Batch<C> {
+    /// Write a blob by passing bytes.
+    pub async fn add_bytes(&self, bytes: impl Into<Bytes>, format: BlobFormat) -> Result<TempTag> {
+        let input = futures_lite::stream::once(Ok(bytes.into()));
+        self.add_stream(input, format).await
+    }
+
+    /// Import a blob from a filesystem path.
+    ///
+    /// `path` must be an absolute path, valid for the file system on which
+    /// the node runs, that refers to a file.
+    ///
+    /// If `import_mode` is `TryReference`, Iroh will assume that the data will not
+    /// change and will share it in place without copying to the Iroh data directory
+    /// if appropriate. However, for tiny files, Iroh will copy the data.
+    ///
+    /// If `import_mode` is `Copy`, Iroh will always copy the data.
+    ///
+    /// Returns a temp tag for the added blob, as well as the size of the file.
+    pub async fn add_file(
+        &self,
+        path: PathBuf,
+        import_mode: ImportMode,
+        format: BlobFormat,
+    ) -> Result<(TempTag, u64)> {
+        anyhow::ensure!(
+            path.is_absolute(),
+            "Path must be absolute, but got: {:?}",
+            path
+        );
+        anyhow::ensure!(path.is_file(), "Path does not refer to a file: {:?}", path);
+        let mut stream = self
+            .0
+            .rpc
+            .server_streaming(BatchAddPathRequest {
+                path,
+                import_mode,
+                format,
+                scope: self.0.scope,
+            })
+            .await?;
+        let mut res_hash = None;
+        let mut res_size = None;
+        while let Some(item) = stream.next().await {
+            match item?.0 {
+                BatchAddPathProgress::Abort(cause) => {
+                    Err(cause)?;
+                }
+                BatchAddPathProgress::Done { hash } => {
+                    res_hash = Some(hash);
+                }
+                BatchAddPathProgress::Found { size } => {
+                    res_size = Some(size);
+                }
+                _ => {}
+            }
+        }
+        let hash = res_hash.context("Missing hash")?;
+        let size = res_size.context("Missing size")?;
+        Ok((self.local_temp_tag(HashAndFormat { hash, format }), size))
+    }
+
+    /// Add a directory as a hash sequence in collection format.
+    pub async fn add_dir(
+        &self,
+        root: PathBuf,
+        import_mode: ImportMode,
+        wrap: WrapOption,
+    ) -> Result<TempTag> {
+        anyhow::ensure!(root.is_absolute(), "Path must be absolute");
+        anyhow::ensure!(root.is_dir(), "Path must be a directory");
+
+        // Note: progress reporting is not wired up for add_dir yet, so the
+        // per-file import progress events are currently dropped.
+
+        // import all files below root recursively
+        let data_sources = crate::util::fs::scan_path(root, wrap)?;
+        const IO_PARALLELISM: usize = 4;
+        let result: Vec<_> = futures_lite::stream::iter(data_sources)
+            .map(|source| {
+                // each file is added as a raw blob, kept alive by its temp tag
+                async move {
+                    let name = source.name().to_string();
+                    let (tag, size) = self
+                        .add_file(source.path().to_owned(), import_mode, BlobFormat::Raw)
+                        .await?;
+                    let hash = *tag.hash();
+                    anyhow::Ok((name, hash, size, tag))
+                }
+            })
+            .buffered_ordered(IO_PARALLELISM)
+            .try_collect()
+            .await?;
+        tracing::trace!("imported {} files", result.len());
+
+        // create a collection
+        let (collection, child_tags): (Collection, Vec<_>) = result
+            .into_iter()
+            .map(|(name, hash, _, tag)| ((name, hash), tag))
+            .unzip();
+
+        let tag = self.add_collection(collection).await?;
+        drop(child_tags);
+        Ok(tag)
+    }
+
+    /// Add a collection.
+    ///
+    /// This is a convenience function that converts the collection into two blobs
+    /// (the metadata and the hash sequence) and adds them, returning a temp tag for
+    /// the hash sequence.
+    pub async fn add_collection(&self, collection: Collection) -> Result<TempTag> {
+        self.add_blob_seq(collection.to_blobs()).await
+    }
+
     /// Write a blob by passing an async reader.
     pub async fn add_reader(
         &self,
@@ -423,12 +544,6 @@ impl<C: ServiceConnection<RpcService>> Batch<C> {
         self.add_stream(input, format).await
     }

-    /// Write a blob by passing bytes.
-    pub async fn add_bytes(&self, bytes: impl Into<Bytes>, format: BlobFormat) -> Result<TempTag> {
-        let input = futures_lite::stream::once(Ok(bytes.into()));
-        self.add_stream(input, format).await
-    }
-
     /// Write a blob by passing a stream of bytes.
     pub async fn add_stream(
         &self,
@@ -473,51 +588,59 @@ impl<C: ServiceConnection<RpcService>> Batch<C> {
                BatchAddStreamResponse::Result { hash } => {
                    res = Some(hash);
                }
+                _ => {}
            }
        }
        let hash = res.context("Missing answer")?;
-        Ok(self.temp_tag(HashAndFormat { hash, format }))
+        tracing::trace!(
+            "creating temp tag with hash {:?} and format {}",
+            hash, format
+        );
+        Ok(self.local_temp_tag(HashAndFormat { hash, format }))
     }

-    /// Import a blob from a filesystem path.
+    /// Add a sequence of blobs, where the last is a hash sequence.
     ///
-    /// `path` should be an absolute path valid for the file system on which
-    /// the node runs.
-    /// If `in_place` is true, Iroh will assume that the data will not change and will share it in
-    /// place without copying to the Iroh data directory.
-    pub async fn add_from_path(
-        &self,
-        path: PathBuf,
-        in_place: bool,
-        format: BlobFormat,
-    ) -> Result<TempTag> {
-        let mut stream = self
-            .0
+    /// A common pattern in iroh is a hash sequence with one or more blobs of
+    /// metadata followed by the actual data blobs. E.g. a collection is a
+    /// hash sequence where the first child is the metadata.
+    pub async fn add_blob_seq(&self, iter: impl Iterator<Item = Bytes>) -> Result<TempTag> {
+        let mut blobs = iter.peekable();
+        let mut child_tags = vec![];
+        let hash_seq_tag = loop {
+            let blob = blobs.next().context("iterator must yield at least one blob")?;
+            if blobs.peek().is_none() {
+                // the last blob is the hash sequence itself
+                break self.add_bytes(blob, BlobFormat::HashSeq).await?;
+            } else {
+                child_tags.push(self.add_bytes(blob, BlobFormat::Raw).await?);
+            }
+        };
+        Ok(hash_seq_tag)
+    }
+
+    /// Create a temp tag to protect some content (blob or hashseq) from being deleted.
+    ///
+    /// A typical use case is that you are downloading some data and want to protect it
+    /// from deletion while the download is ongoing, but don't want to protect it permanently
+    /// until the download has completed.
+    pub async fn temp_tag(&self, content: HashAndFormat) -> Result<TempTag> {
+        // Notify the server that we want one temp tag for the given content
+        self.0
            .rpc
-            .server_streaming(BatchAddPathRequest {
-                path,
-                in_place,
-                format,
+            .rpc(BatchCreateTempTagRequest {
                scope: self.0.scope,
+                content,
            })
-            .await?;
-        let mut res = None;
-        while let Some(item) = stream.next().await {
-            match item?.0 {
-                BatchAddPathProgress::Abort(cause) => {
-                    Err(cause)?;
-                }
-                BatchAddPathProgress::Done { hash } => {
-                    res = Some(hash);
-                }
-                _ => {}
-            }
-        }
-        let hash = res.context("Missing answer")?;
-        Ok(self.temp_tag(HashAndFormat { hash, format }))
+            .await??;
+        // Only after the above call succeeds can we create the corresponding local temp tag
+        Ok(self.local_temp_tag(content))
     }

-    fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
+    /// Creates a temp tag for the given hash and format, without notifying the server.
+    ///
+    /// Caution: only do this for data for which you know the server side has created a temp tag.
+    fn local_temp_tag(&self, inner: HashAndFormat) -> TempTag {
         let on_drop: Arc<dyn TagDrop> = self.0.clone();
         let on_drop = Some(Arc::downgrade(&on_drop));
         TempTag::new(inner, on_drop)
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 112a7868ca..9c96fccf24 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -91,12 +91,19 @@ impl BlobScopes {
     fn store(&mut self, scope: u64, tt: TempTag) {
         let entry = self.scopes.entry(scope).or_default();
         let count = entry.tags.entry(tt.hash_and_format()).or_default();
+        tracing::trace!(
+            "storing tag {:?} {} in scope {}",
+            tt.hash(),
+            tt.format(),
+            scope
+        );
         tt.leak();
         *count += 1;
     }

     /// Remove a tag from a scope.
     fn remove_one(&mut self, scope: u64, content: &HashAndFormat, u: Option<&dyn TagDrop>) {
+        tracing::trace!("removing tag {:?} from scope {}", content, scope);
         if let Some(scope) = self.scopes.get_mut(&scope) {
             if let Some(counter) = scope.tags.get_mut(content) {
                 *counter -= 1;
diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs
index e6093abec0..e958bf5ac8 100644
--- a/iroh/src/node/rpc.rs
+++ b/iroh/src/node/rpc.rs
@@ -41,18 +41,18 @@ use crate::client::tags::TagInfo;
 use crate::client::NodeStatus;
 use crate::rpc_protocol::{
     BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
-    BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchUpdate, BlobAddPathRequest,
-    BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate,
-    BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse,
-    BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse,
-    BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
-    BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
-    CreateTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest,
-    DocImportFileResponse, DocSetHashRequest, ListTagsRequest, NodeAddrRequest,
-    NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
-    NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest,
-    NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, NodeWatchResponse,
-    Request, RpcService, SetTagOption, SetTagRequest,
+    BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest,
+    BatchUpdate, BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest,
+    BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest,
+    BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest, BlobExportResponse,
+    BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest,
+    BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse,
+    BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, CreateTagRequest,
+    DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, DocImportFileResponse,
+    DocSetHashRequest, ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest,
+    NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest,
+    NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest,
+    NodeWatchRequest, NodeWatchResponse, Request, RpcService, SetTagOption, SetTagRequest,
 };

 use super::NodeInner;
@@ -102,7 +102,10 @@ impl Handler {
            }
            CreateCollection(msg) => chan.rpc(msg, handler, Self::create_collection).await,
            BlobGetCollection(msg) =>
                chan.rpc(msg, handler, Self::blob_get_collection).await,
-            BatchAddStreamRequest(msg) => {
+            BatchCreateTempTag(msg) => {
+                chan.rpc(msg, handler, Self::batch_create_temp_tag).await
+            }
+            BatchAddStream(msg) => {
                chan.bidi_streaming(msg, handler, Self::batch_add_stream)
                    .await
            }
@@ -800,8 +803,6 @@ impl Handler {
         msg: BatchAddPathRequest,
         progress: flume::Sender<BatchAddPathProgress>,
     ) -> anyhow::Result<()> {
-        use iroh_blobs::store::ImportMode;
-
         let progress = FlumeProgressSender::new(progress);
         // convert import progress to provide progress
         let import_progress = progress.clone().with_filter_map(move |x| match x {
@@ -814,7 +815,7 @@ });
         let BatchAddPathRequest {
             path: root,
-            in_place,
+            import_mode,
             format,
             scope,
         } = msg;
@@ -825,12 +826,6 @@
            "trying to add missing path: {}",
            root.display()
        );
-
-        let import_mode = match in_place {
-            true => ImportMode::TryReference,
-            false => ImportMode::Copy,
-        };
-
        let (tag, _) = self
            .inner
            .db
@@ -934,6 +929,13 @@ impl Handler {
         futures_lite::stream::once(BatchCreateResponse::Id(scope_id))
     }

+    #[allow(clippy::unused_async)]
+    async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> {
+        let tag = self.inner.db.temp_tag(msg.content);
+        self.inner.blob_scopes.lock().unwrap().store(msg.scope, tag);
+        Ok(())
+    }
+
     fn batch_add_stream(
         self,
         msg: BatchAddStreamRequest,
@@ -985,12 +987,15 @@ impl Handler {
        });

        let import_progress = progress.clone().with_filter_map(move |x| match x {
+            ImportProgress::OutboardProgress { offset, .. } => {
+                Some(BatchAddStreamResponse::OutboardProgress { offset })
+            }
            _ => None,
        });
        let (temp_tag, _len) = self
            .inner
            .db
-            .import_stream(stream, BlobFormat::Raw, import_progress)
+            .import_stream(stream, msg.format, import_progress)
            .await?;
        let hash = temp_tag.inner().hash;
        self.inner
diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs
index 939c37f4d6..da4dba5f7e 100644
--- a/iroh/src/rpc_protocol.rs
+++ b/iroh/src/rpc_protocol.rs
@@ -16,7 +16,7 @@ pub use iroh_blobs::{export::ExportProgress, get::db::DownloadProgress, BlobForm
 use iroh_blobs::{
     format::collection::Collection,
     provider::BatchAddPathProgress,
-    store::{BaoBlobSize, ConsistencyCheckProgress},
+    store::{BaoBlobSize, ConsistencyCheckProgress, ImportMode},
     util::Tag,
     HashAndFormat,
 };
@@ -1057,6 +1057,19 @@ impl BidiStreamingMsg for BlobAddStreamRequest {
 #[derive(Debug, Serialize, Deserialize, derive_more::Into)]
 pub struct BlobAddStreamResponse(pub AddProgress);

+/// Create a temp tag with a given hash and format
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BatchCreateTempTagRequest {
+    /// Content to protect
+    pub content: HashAndFormat,
+    /// Scope to create the temp tag in
+    pub scope: u64,
+}
+
+impl RpcMsg for BatchCreateTempTagRequest {
+    type Response = RpcResult<()>;
+}
+
 /// Write a blob from a byte stream
 #[derive(Serialize, Deserialize, Debug)]
 pub struct BatchAddStreamRequest {
@@ -1088,6 +1101,7 @@ impl BidiStreamingMsg for BatchAddStreamRequest {
 #[derive(Debug, Serialize, Deserialize)]
 pub enum BatchAddStreamResponse {
     Abort(RpcError),
+    OutboardProgress { offset: u64 },
     Result { hash: Hash },
 }

@@ -1097,7 +1111,7 @@ pub struct BatchAddPathRequest {
     /// The path to the data to provide.
     pub path: PathBuf,
-    /// Add the data in place
-    pub in_place: bool,
+    /// How the data should be imported: copied, or referenced in place
+    pub import_mode: ImportMode,
     /// What format to use for the blob
     pub format: BlobFormat,
     /// Scope to create the temp tag in
@@ -1175,7 +1189,8 @@ pub enum Request {

     BatchCreate(BatchCreateRequest),
     BatchUpdate(BatchUpdate),
-    BatchAddStreamRequest(BatchAddStreamRequest),
+    BatchCreateTempTag(BatchCreateTempTagRequest),
+    BatchAddStream(BatchAddStreamRequest),
     BatchAddStreamUpdate(BatchAddStreamUpdate),
     BatchAddPath(BatchAddPathRequest),

@@ -1241,8 +1256,7 @@ pub enum Response {
     CreateCollection(RpcResult<CreateCollectionResponse>),
     BlobGetCollection(RpcResult<BlobGetCollectionResponse>),

-    BatchCreateResponse(BatchCreateResponse),
-    BatchRequest(BatchCreateRequest),
+    BatchCreate(BatchCreateResponse),
     BatchAddStream(BatchAddStreamResponse),
     BatchAddPath(BatchAddPathResponse),

diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs
index 8d48565a57..240338d123 100644
--- a/iroh/tests/batch.rs
+++ b/iroh/tests/batch.rs
@@ -1,6 +1,7 @@
 use std::time::Duration;

 use bao_tree::blake3;
+use iroh::client::blobs::{ImportMode, WrapOption};
 use iroh::node::GcPolicy;
 use iroh_blobs::{store::mem::Store, BlobFormat};

@@ -11,6 +12,10 @@ async fn create_node() -> anyhow::Result<iroh::node::Node<Store>> {
        .await
 }

+async fn wait_for_gc() {
+    tokio::time::sleep(Duration::from_millis(50)).await;
+}
+
 #[tokio::test]
 async fn test_batch_create_1() -> anyhow::Result<()> {
     let node = create_node().await?;
@@ -22,12 +27,12 @@ async fn test_batch_create_1() -> anyhow::Result<()> {
     let hash = *tag.hash();
     assert_eq!(hash, expected_hash);
     // Check that the store has the data and that it is protected from gc
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    wait_for_gc().await;
     let data = client.read_to_bytes(hash).await?;
     assert_eq!(data.as_ref(), expected_data);
     drop(tag);
     // Check that the store drops the data when the temp tag gets dropped
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    wait_for_gc().await;
     assert!(client.read_to_bytes(hash).await.is_err());
     Ok(())
 }

@@ -43,18 +48,44 @@ async fn test_batch_create_2() -> anyhow::Result<()> {
     let hash = *tag.hash();
     assert_eq!(hash, expected_hash);
     // Check that the store has the data and that it is protected from gc
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    wait_for_gc().await;
     let data = client.read_to_bytes(hash).await?;
     assert_eq!(data.as_ref(), expected_data);
     drop(batch);
     // Check that the store drops the data when the temp tag gets dropped
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    wait_for_gc().await;
     assert!(client.read_to_bytes(hash).await.is_err());
     Ok(())
 }

 #[tokio::test]
-async fn test_batch_create_from_path_1() -> anyhow::Result<()> {
+async fn test_batch_create_3() -> anyhow::Result<()> {
+    let node = create_node().await?;
+    let client = &node.client().blobs;
+    let batch = client.batch().await?;
+    let expected_data: &[u8] = b"test";
+    let expected_hash = blake3::hash(expected_data).into();
+    let tag = batch.add_bytes(expected_data, BlobFormat::Raw).await?;
+    let hash = *tag.hash();
+    assert_eq!(hash, expected_hash);
+    // Check that the store has the data and that it is protected from gc
+    wait_for_gc().await;
+    assert!(client.read_to_bytes(hash).await.is_ok());
+    // Create an additional temp tag for the same data
+    let tag2 = batch.temp_tag(tag.hash_and_format()).await?;
+    drop(tag);
+    // Check that the data is still present
+    wait_for_gc().await;
+    assert!(client.read_to_bytes(hash).await.is_ok());
+    drop(tag2);
+    // Check that the data is gone since both temp tags are dropped
+    wait_for_gc().await;
+    assert!(client.read_to_bytes(hash).await.is_err());
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_batch_add_file_1() -> anyhow::Result<()> {
     let node = create_node().await?;
     let client = &node.client().blobs;
     let batch = client.batch().await?;
@@ -63,24 +94,24 @@ async fn test_batch_create_from_path_1() -> anyhow::Result<()> {
     let expected_hash = blake3::hash(expected_data).into();
     let temp_path = dir.path().join("test");
     std::fs::write(&temp_path, expected_data)?;
-    let tag = batch
-        .add_from_path(temp_path, false, BlobFormat::Raw)
+    let (tag, _) = batch
+        .add_file(temp_path, ImportMode::Copy, BlobFormat::Raw)
        .await?;
     let hash = *tag.hash();
     assert_eq!(hash, expected_hash);
     // Check that the store has the data and that it is protected from gc
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    wait_for_gc().await;
     let data = client.read_to_bytes(hash).await?;
     assert_eq!(data.as_ref(), expected_data);
     drop(tag);
     // Check that the store drops the data when the temp tag gets dropped
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    wait_for_gc().await;
     assert!(client.read_to_bytes(hash).await.is_err());
     Ok(())
 }

 #[tokio::test]
-async fn test_batch_create_from_path_2() -> anyhow::Result<()> {
+async fn test_batch_add_file_2() -> anyhow::Result<()> {
     let node = create_node().await?;
     let client = &node.client().blobs;
     let batch = client.batch().await?;
@@ -89,18 +120,56 @@ async fn test_batch_create_from_path_2() -> anyhow::Result<()> {
     let expected_hash = blake3::hash(expected_data).into();
     let temp_path = dir.path().join("test");
     std::fs::write(&temp_path, expected_data)?;
-    let tag = batch
-        .add_from_path(temp_path, false, BlobFormat::Raw)
+    let (tag, _) = batch
+        .add_file(temp_path, ImportMode::Copy, BlobFormat::Raw)
        .await?;
     let hash = *tag.hash();
     assert_eq!(hash, expected_hash);
     // Check that the store has the data and that it is protected from gc
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    wait_for_gc().await;
     let data = client.read_to_bytes(hash).await?;
     assert_eq!(data.as_ref(), expected_data);
     drop(batch);
     // Check that the store drops the data when the temp tag gets dropped
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    wait_for_gc().await;
     assert!(client.read_to_bytes(hash).await.is_err());
     Ok(())
 }
+
+#[tokio::test]
+async fn test_batch_add_dir_1() -> anyhow::Result<()> {
+    let node = create_node().await?;
+    let client = &node.client().blobs;
+    let batch = client.batch().await?;
+    let dir = tempfile::tempdir()?;
+    let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")];
+    for (name, content) in &data {
+        let temp_path = dir.path().join(name);
+        std::fs::write(&temp_path, content)?;
+    }
+    let tag = batch
+        .add_dir(dir.path().to_owned(), ImportMode::Copy, WrapOption::NoWrap)
+        .await?;
+    let check_present = || async {
+        assert!(client.read_to_bytes(*tag.hash()).await.is_ok());
+        for (_, content) in &data {
+            let hash = blake3::hash(content).into();
+            let data = client.read_to_bytes(hash).await?;
+            assert_eq!(data.as_ref(), *content);
+        }
+        anyhow::Ok(())
+    };
+    // Check that the store has the data immediately
+    check_present().await?;
+    // Check that the store has the data and that it is protected from gc
+    wait_for_gc().await;
+    check_present().await?;
+    drop(tag);
+    // Check that the store drops the data when the temp tag gets dropped
+    wait_for_gc().await;
+    for (_, content) in &data {
+        let hash = blake3::hash(content).into();
+        assert!(client.read_to_bytes(hash).await.is_err());
+    }
+    Ok(())
+}
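
For orientation, a minimal usage sketch of the batch API introduced by this diff, written against the client surface shown above. It is illustrative only, not part of the change: the in-memory node setup mirrors create_node() from the tests (gc policy omitted), and the directory path is hypothetical.

use iroh::client::blobs::{ImportMode, WrapOption};
use iroh_blobs::BlobFormat;

async fn batch_usage_sketch() -> anyhow::Result<()> {
    // Spawn an in-memory node, as the tests above do.
    let node = iroh::node::Node::memory().spawn().await?;
    let client = &node.client().blobs;

    // A batch is a scope for temp tags: content stays protected from gc
    // while the returned TempTag (or the whole batch) is alive.
    let batch = client.batch().await?;

    // Add raw bytes; the returned TempTag protects the blob.
    let tag = batch.add_bytes(&b"hello"[..], BlobFormat::Raw).await?;

    // Protect the same content with a second, independent temp tag.
    // The data survives until *both* tags are dropped (see test_batch_create_3).
    let tag2 = batch.temp_tag(tag.hash_and_format()).await?;
    drop(tag);

    // Add a whole directory as a collection. The path is hypothetical and,
    // as add_dir requires, absolute.
    let dir_tag = batch
        .add_dir("/data/photos".into(), ImportMode::TryReference, WrapOption::NoWrap)
        .await?;

    // Dropping the remaining tags (or the batch itself) releases the content to gc.
    drop((tag2, dir_tag, batch));
    Ok(())
}

Note the two-step protocol behind Batch::temp_tag: the server is asked to create its tag first, and only on success is the local tag created, so the content is never left unprotected in between. local_temp_tag alone is only safe when the server side is already known to hold a tag, which is why it is private.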