diff --git a/iroh-blobs/src/provider.rs b/iroh-blobs/src/provider.rs index 54b25151583..9e698ee9085 100644 --- a/iroh-blobs/src/provider.rs +++ b/iroh-blobs/src/provider.rs @@ -153,6 +153,30 @@ pub enum AddProgress { Abort(RpcError), } +/// Progress updates for the batch add operation. +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchAddPathProgress { + /// An item was found with the given size + Found { + /// The size of the entry in bytes. + size: u64, + }, + /// We got progress ingesting the item. + Progress { + /// The offset of the progress, in bytes. + offset: u64, + }, + /// We are done, and the hash is `hash`. + Done { + /// The hash of the entry. + hash: Hash, + }, + /// We got an error and need to abort. + /// + /// This will be the last message in the stream. + Abort(RpcError), +} + /// Read the request from the getter. /// /// Will fail if there is an error while reading, if the reader diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs index 4d5162ac04e..487b22b78d4 100644 --- a/iroh-blobs/src/store/traits.rs +++ b/iroh-blobs/src/store/traits.rs @@ -703,7 +703,7 @@ pub enum ImportProgress { /// does not make any sense. E.g. an in memory implementation will always have /// to copy the file into memory. Also, a disk based implementation might choose /// to copy small files even if the mode is `Reference`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] pub enum ImportMode { /// This mode will copy the file into the database before hashing. /// diff --git a/iroh-blobs/src/util.rs b/iroh-blobs/src/util.rs index 6b70d24c9a5..42f28cb5cfa 100644 --- a/iroh-blobs/src/util.rs +++ b/iroh-blobs/src/util.rs @@ -207,6 +207,11 @@ impl TempTag { self.inner.format } + /// The hash and format of the pinned item + pub fn hash_and_format(&self) -> HashAndFormat { + self.inner + } + /// Keep the item alive until the end of the process pub fn leak(mut self) { // set the liveness tracker to None, so that the refcount is not decreased diff --git a/iroh-cli/src/commands/blobs.rs b/iroh-cli/src/commands/blobs.rs index 6e4c0aeba2e..416d2ec0038 100644 --- a/iroh-cli/src/commands/blobs.rs +++ b/iroh-cli/src/commands/blobs.rs @@ -370,10 +370,15 @@ impl BlobCommands { let (blob_status, size) = match (status, format) { (BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size), - (BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size), + (BlobStatus::Partial { size }, BlobFormat::Raw) => { + ("incomplete blob", size.value()) + } (BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size), (BlobStatus::Partial { size }, BlobFormat::HashSeq) => { - ("incomplete collection", size) + ("incomplete collection", size.value()) + } + (BlobStatus::NotFound, _) => { + return Err(anyhow!("blob is missing")); } }; println!( diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 3f6ae5055ab..e2830fce6f3 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -21,9 +21,11 @@ pub mod gossip; pub mod node; pub mod tags; -/// Iroh rpc client - boxed so that we can have a concrete type. -pub(crate) type RpcClient = - quic_rpc::RpcClient>; +/// Iroh rpc connection - boxed so that we can have a concrete type. +pub(crate) type RpcConnection = quic_rpc::transport::boxed::Connection; + +/// Iroh rpc client. +pub(crate) type RpcClient = quic_rpc::RpcClient; /// Iroh client. #[derive(Debug, Clone)] diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index a9a48539b35..94273082023 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -53,7 +53,8 @@ use std::{ task::{Context, Poll}, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context as _, Result}; +use batch::Batch; use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; @@ -63,7 +64,7 @@ use iroh_blobs::{ export::ExportProgress as BytesExportProgress, format::collection::{Collection, SimpleStore}, get::db::DownloadProgress as BytesDownloadProgress, - store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, + store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, util::SetTagOption, BlobFormat, Hash, Tag, }; @@ -75,12 +76,14 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio_util::io::{ReaderStream, StreamReader}; use tracing::warn; +mod batch; +pub use batch::AddDirOpts; use crate::rpc_protocol::blobs::{ - AddPathRequest, AddStreamRequest, AddStreamUpdate, ConsistencyCheckRequest, - CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadRequest, - ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse, - ValidateRequest, + AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse, + BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, + DeleteRequest, DownloadRequest, ExportRequest, ListIncompleteRequest, ListRequest, + ReadAtRequest, ReadAtResponse, ValidateRequest, }; use crate::rpc_protocol::node::StatusRequest; @@ -100,6 +103,38 @@ impl<'a> From<&'a Iroh> for &'a RpcClient { } impl Client { + /// Check if a blob is completely stored on the node. + /// + /// Note that this will return false for blobs that are partially stored on + /// the node. + pub async fn status(&self, hash: Hash) -> Result { + let status = self.rpc.rpc(BlobStatusRequest { hash }).await??; + Ok(status.0) + } + + /// Check if a blob is completely stored on the node. + /// + /// This is just a convenience wrapper around `status` that returns a boolean. + pub async fn has(&self, hash: Hash) -> Result { + match self.status(hash).await { + Ok(BlobStatus::Complete { .. }) => Ok(true), + Ok(_) => Ok(false), + Err(err) => Err(err), + } + } + + /// Create a new batch for adding data. + /// + /// A batch is a context in which temp tags are created and data is added to the node. Temp tags + /// are automatically deleted when the batch is dropped, leading to the data being garbage collected + /// unless a permanent tag is created for it. + pub async fn batch(&self) -> Result { + let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?; + let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??; + let rpc = self.rpc.clone(); + Ok(Batch::new(batch, rpc, updates, 1024)) + } + /// Stream the contents of a a single blob. /// /// Returns a [`Reader`], which can report the size of the blob before reading it. @@ -422,17 +457,6 @@ impl Client { Ok(ticket) } - /// Get the status of a blob. - pub async fn status(&self, hash: Hash) -> Result { - // TODO: this could be implemented more efficiently - let reader = self.read(hash).await?; - if reader.is_complete { - Ok(BlobStatus::Complete { size: reader.size }) - } else { - Ok(BlobStatus::Partial { size: reader.size }) - } - } - fn tags_client(&self) -> tags::Client { tags::Client { rpc: self.rpc.clone(), @@ -447,9 +471,10 @@ impl SimpleStore for Client { } /// Whether to wrap the added data in a collection. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub enum WrapOption { /// Do not wrap the file or directory. + #[default] NoWrap, /// Wrap the file or directory in a collection. Wrap { @@ -459,12 +484,14 @@ pub enum WrapOption { } /// Status information about a blob. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum BlobStatus { + /// The blob is not stored at all. + NotFound, /// The blob is only stored partially. Partial { /// The size of the currently stored partial blob. - size: u64, + size: BaoBlobSize, }, /// The blob is stored completely. Complete { @@ -941,7 +968,6 @@ pub enum DownloadMode { mod tests { use super::*; - use anyhow::Context as _; use rand::RngCore; use tokio::io::AsyncWriteExt; diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs new file mode 100644 index 00000000000..58dd4c1e2bc --- /dev/null +++ b/iroh/src/client/blobs/batch.rs @@ -0,0 +1,458 @@ +use std::{ + io, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +use anyhow::{anyhow, Context, Result}; +use bytes::Bytes; +use futures_buffered::BufferedStreamExt; +use futures_lite::StreamExt; +use futures_util::{sink::Buffer, FutureExt, SinkExt, Stream}; +use iroh_blobs::{ + format::collection::Collection, + provider::BatchAddPathProgress, + store::ImportMode, + util::{SetTagOption, TagDrop}, + BlobFormat, HashAndFormat, Tag, TempTag, +}; +use quic_rpc::client::UpdateSink; +use tokio::io::AsyncRead; +use tokio_util::io::ReaderStream; +use tracing::{debug, warn}; + +use crate::{ + client::{RpcClient, RpcConnection, RpcService}, + rpc_protocol::{ + blobs::{ + BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, + BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchId, BatchUpdate, + }, + tags, + }, +}; + +use super::WrapOption; + +/// A scope in which blobs can be added. +#[derive(derive_more::Debug)] +struct BatchInner { + /// The id of the scope. + batch: BatchId, + /// The rpc client. + rpc: RpcClient, + /// The stream to send drop + #[debug(skip)] + updates: Mutex, BatchUpdate>>, +} + +/// A batch for write operations. +/// +/// This serves mostly as a scope for temporary tags. +/// +/// It is not a transaction, so things in a batch are not atomic. Also, there is +/// no isolation between batches. +#[derive(derive_more::Debug)] +pub struct Batch(Arc); + +impl TagDrop for BatchInner { + fn on_drop(&self, content: &HashAndFormat) { + let mut updates = self.updates.lock().unwrap(); + // make a spirited attempt to notify the server that we are dropping the content + // + // this will occasionally fail, but that's acceptable. The temp tags for the batch + // will be cleaned up as soon as the entire batch is dropped. + // + // E.g. a typical scenario is that you create a large array of temp tags, and then + // store them in a hash sequence and then drop the array. You will get many drops + // at the same time, and might get a send failure here. + // + // But that just means that the server will clean up the temp tags when the batch is + // dropped. + updates.feed(BatchUpdate::Drop(*content)).now_or_never(); + updates.flush().now_or_never(); + } +} + +/// Options for adding a file as a blob +#[derive(Debug, Clone, Copy, Default)] +pub struct AddFileOpts { + /// The import mode + pub import_mode: ImportMode, + /// The format of the blob + pub format: BlobFormat, +} + +/// Options for adding a directory as a collection +#[derive(Debug, Clone)] +pub struct AddDirOpts { + /// The import mode + pub import_mode: ImportMode, + /// Whether to preserve the directory name + pub wrap: WrapOption, + /// Io parallelism + pub io_parallelism: usize, +} + +impl Default for AddDirOpts { + fn default() -> Self { + Self { + import_mode: ImportMode::TryReference, + wrap: WrapOption::NoWrap, + io_parallelism: 4, + } + } +} + +/// Options for adding a directory as a collection +#[derive(Debug, Clone)] +pub struct AddReaderOpts { + /// The format of the blob + pub format: BlobFormat, + /// Size of the chunks to send + pub chunk_size: usize, +} + +impl Default for AddReaderOpts { + fn default() -> Self { + Self { + format: BlobFormat::Raw, + chunk_size: 1024 * 64, + } + } +} + +impl Batch { + pub(super) fn new( + batch: BatchId, + rpc: RpcClient, + updates: UpdateSink, + buffer_size: usize, + ) -> Self { + let updates = updates.buffer(buffer_size); + Self(Arc::new(BatchInner { + batch, + rpc, + updates: updates.into(), + })) + } + + /// Write a blob by passing bytes. + pub async fn add_bytes(&self, bytes: impl Into) -> Result { + self.add_bytes_with_opts(bytes, Default::default()).await + } + + /// Import a blob from a filesystem path, using the default options. + /// + /// For more control, use [`Self::add_file_with_opts`]. + pub async fn add_file(&self, path: PathBuf) -> Result<(TempTag, u64)> { + self.add_file_with_opts(path, AddFileOpts::default()).await + } + + /// Add a directory as a hashseq in iroh collection format + pub async fn add_dir(&self, root: PathBuf) -> Result { + self.add_dir_with_opts(root, Default::default()).await + } + + /// Write a blob by passing an async reader. + /// + /// This will use a default chunk size of 64KB, and a format of [BlobFormat::Raw]. + pub async fn add_reader( + &self, + reader: impl AsyncRead + Unpin + Send + 'static, + ) -> anyhow::Result { + self.add_reader_with_opts(reader, Default::default()).await + } + + /// Write a blob by passing a stream of bytes. + pub async fn add_stream( + &self, + input: impl Stream> + Send + Unpin + 'static, + ) -> Result { + self.add_stream_with_opts(input, Default::default()).await + } + + /// Create a temp tag to protect some content (blob or hashseq) from being deleted. + /// + /// A typical use case is that you are downloading some data and want to protect it + /// from deletion while the download is ongoing, but don't want to protect it permanently + /// until the download is completed. + pub async fn temp_tag(&self, content: HashAndFormat) -> Result { + // Notify the server that we want one temp tag for the given content + self.0 + .rpc + .rpc(BatchCreateTempTagRequest { + batch: self.0.batch, + content, + }) + .await??; + // Only after success of the above call, we can create the corresponding local temp tag + Ok(self.local_temp_tag(content, None)) + } + + /// Write a blob by passing an async reader. + /// + /// This produces a stream from the reader with a hardcoded buffer size of 64KB. + pub async fn add_reader_with_opts( + &self, + reader: impl AsyncRead + Unpin + Send + 'static, + opts: AddReaderOpts, + ) -> anyhow::Result { + let AddReaderOpts { format, chunk_size } = opts; + let input = ReaderStream::with_capacity(reader, chunk_size); + self.add_stream_with_opts(input, format).await + } + + /// Write a blob by passing bytes. + pub async fn add_bytes_with_opts( + &self, + bytes: impl Into, + format: BlobFormat, + ) -> Result { + let input = futures_lite::stream::once(Ok(bytes.into())); + self.add_stream_with_opts(input, format).await + } + + /// Import a blob from a filesystem path. + /// + /// `path` should be an absolute path valid for the file system on which + /// the node runs, which refers to a file. + /// + /// If you use [ImportMode::TryReference], Iroh will assume that the data will not + /// change and will share it in place without copying to the Iroh data directory + /// if appropriate. However, for tiny files, Iroh will copy the data. + /// + /// If you use [ImportMode::Copy], Iroh will always copy the data. + /// + /// Will return a temp tag for the added blob, as well as the size of the file. + pub async fn add_file_with_opts( + &self, + path: PathBuf, + opts: AddFileOpts, + ) -> Result<(TempTag, u64)> { + let AddFileOpts { + import_mode, + format, + } = opts; + anyhow::ensure!( + path.is_absolute(), + "Path must be absolute, but got: {:?}", + path + ); + anyhow::ensure!(path.is_file(), "Path does not refer to a file: {:?}", path); + let mut stream = self + .0 + .rpc + .server_streaming(BatchAddPathRequest { + path, + import_mode, + format, + batch: self.0.batch, + }) + .await?; + let mut res_hash = None; + let mut res_size = None; + while let Some(item) = stream.next().await { + match item?.0 { + BatchAddPathProgress::Abort(cause) => { + Err(cause)?; + } + BatchAddPathProgress::Done { hash } => { + res_hash = Some(hash); + } + BatchAddPathProgress::Found { size } => { + res_size = Some(size); + } + _ => {} + } + } + let hash = res_hash.context("Missing hash")?; + let size = res_size.context("Missing size")?; + Ok(( + self.local_temp_tag(HashAndFormat { hash, format }, Some(size)), + size, + )) + } + + /// Add a directory as a hashseq in iroh collection format + /// + /// This can also be used to add a single file as a collection, if + /// wrap is set to [WrapOption::Wrap]. + /// + /// However, if you want to add a single file as a raw blob, use add_file instead. + pub async fn add_dir_with_opts(&self, root: PathBuf, opts: AddDirOpts) -> Result { + let AddDirOpts { + import_mode, + wrap, + io_parallelism, + } = opts; + anyhow::ensure!(root.is_absolute(), "Path must be absolute"); + + // let (send, recv) = flume::bounded(32); + // let import_progress = FlumeProgressSender::new(send); + + // import all files below root recursively + let data_sources = crate::util::fs::scan_path(root, wrap)?; + let opts = AddFileOpts { + import_mode, + format: BlobFormat::Raw, + }; + let result: Vec<_> = futures_lite::stream::iter(data_sources) + .map(|source| { + // let import_progress = import_progress.clone(); + async move { + let name = source.name().to_string(); + let (tag, size) = self + .add_file_with_opts(source.path().to_owned(), opts) + .await?; + let hash = *tag.hash(); + anyhow::Ok((name, hash, size, tag)) + } + }) + .buffered_ordered(io_parallelism) + .try_collect() + .await?; + + // create a collection + let (collection, child_tags): (Collection, Vec<_>) = result + .into_iter() + .map(|(name, hash, _, tag)| ((name, hash), tag)) + .unzip(); + + let tag = self.add_collection(collection).await?; + drop(child_tags); + Ok(tag) + } + + /// Write a blob by passing a stream of bytes. + /// + /// For convenient interop with common sources of data, this function takes a stream of `io::Result`. + /// If you have raw bytes, you need to wrap them in `io::Result::Ok`. + pub async fn add_stream_with_opts( + &self, + mut input: impl Stream> + Send + Unpin + 'static, + format: BlobFormat, + ) -> Result { + let (mut sink, mut stream) = self + .0 + .rpc + .bidi(BatchAddStreamRequest { + batch: self.0.batch, + format, + }) + .await?; + let mut size = 0u64; + while let Some(item) = input.next().await { + match item { + Ok(chunk) => { + size += chunk.len() as u64; + sink.send(BatchAddStreamUpdate::Chunk(chunk)) + .await + .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; + } + Err(err) => { + warn!("Abort send, reason: failed to read from source stream: {err:?}"); + sink.send(BatchAddStreamUpdate::Abort) + .await + .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; + break; + } + } + } + // this is needed for the remote to notice that the stream is closed + drop(sink); + let mut res = None; + while let Some(item) = stream.next().await { + match item? { + BatchAddStreamResponse::Abort(cause) => { + Err(cause)?; + } + BatchAddStreamResponse::Result { hash } => { + res = Some(hash); + } + _ => {} + } + } + let hash = res.context("Missing answer")?; + Ok(self.local_temp_tag(HashAndFormat { hash, format }, Some(size))) + } + + /// Add a collection + /// + /// This is a convenience function that converts the collection into two blobs + /// (the metadata and the hash sequence) and adds them, returning a temp tag for + /// the hash sequence. + /// + /// Note that this does not guarantee that the data that the collection refers to + /// actually exists. It will just create 2 blobs, the metadata and the hash sequence + /// itself. + pub async fn add_collection(&self, collection: Collection) -> Result { + self.add_blob_seq(collection.to_blobs()).await + } + + /// Add a sequence of blobs, where the last is a hash sequence. + /// + /// It is a common pattern in iroh to have a hash sequence with one or more + /// blobs of metadata, and the remaining blobs being the actual data. E.g. + /// a collection is a hash sequence where the first child is the metadata. + pub async fn add_blob_seq(&self, iter: impl Iterator) -> Result { + let mut blobs = iter.peekable(); + // put the tags somewhere + let mut tags = vec![]; + loop { + let blob = blobs.next().context("Failed to get next blob")?; + if blobs.peek().is_none() { + return self.add_bytes_with_opts(blob, BlobFormat::HashSeq).await; + } else { + tags.push(self.add_bytes(blob).await?); + } + } + } + + /// Upgrade a temp tag to a persistent tag. + pub async fn upgrade(&self, tt: TempTag) -> Result { + let tag = self + .0 + .rpc + .rpc(tags::CreateRequest { + value: tt.hash_and_format(), + batch: Some(self.0.batch), + }) + .await??; + Ok(tag) + } + + /// Upgrade a temp tag to a persistent tag with a specific name. + pub async fn upgrade_to(&self, tt: TempTag, tag: Tag) -> Result<()> { + self.0 + .rpc + .rpc(tags::SetRequest { + name: tag, + value: Some(tt.hash_and_format()), + batch: Some(self.0.batch), + }) + .await??; + Ok(()) + } + + /// Upgrade a temp tag to a persistent tag with either a specific name or + /// an automatically generated name. + pub async fn upgrade_with_opts(&self, tt: TempTag, opts: SetTagOption) -> Result { + match opts { + SetTagOption::Auto => self.upgrade(tt).await, + SetTagOption::Named(tag) => { + self.upgrade_to(tt, tag.clone()).await?; + Ok(tag) + } + } + } + + /// Creates a temp tag for the given hash and format, without notifying the server. + /// + /// Caution: only do this for data for which you know the server side has created a temp tag. + fn local_temp_tag(&self, inner: HashAndFormat, _size: Option) -> TempTag { + let on_drop: Arc = self.0.clone(); + let on_drop = Some(Arc::downgrade(&on_drop)); + TempTag::new(inner, on_drop) + } +} diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 182f1dcc501..8f1f92c092c 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -35,6 +35,7 @@ //! well, without going through [`client`](crate::client::Iroh)) //! //! To shut down the node, call [`Node::shutdown`]. +use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::{collections::BTreeSet, net::SocketAddr}; @@ -46,6 +47,7 @@ use iroh_base::key::PublicKey; use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore}; use iroh_blobs::util::local_pool::{LocalPool, LocalPoolHandle}; use iroh_blobs::{downloader::Downloader, protocol::Closed}; +use iroh_blobs::{HashAndFormat, TempTag}; use iroh_gossip::dispatcher::GossipDispatcher; use iroh_gossip::net::Gossip; use iroh_net::key::SecretKey; @@ -62,6 +64,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use crate::node::nodes_storage::store_node_addrs; use crate::node::{docs::DocsEngine, protocol::ProtocolMap}; +use crate::rpc_protocol::blobs::BatchId; mod builder; mod docs; @@ -117,9 +120,65 @@ struct NodeInner { client: crate::client::Iroh, downloader: Downloader, gossip_dispatcher: GossipDispatcher, + blob_batches: tokio::sync::Mutex, local_pool_handle: LocalPoolHandle, } +/// Keeps track of all the currently active batch operations of the blobs api. +#[derive(Debug, Default)] +struct BlobBatches { + /// Currently active batches + batches: BTreeMap, + /// Used to generate new batch ids. + max: u64, +} + +/// A single batch of blob operations +#[derive(Debug, Default)] +struct BlobBatch { + /// The tags in this batch. + tags: BTreeMap>, +} + +impl BlobBatches { + /// Create a new unique batch id. + fn create(&mut self) -> BatchId { + let id = self.max; + self.max += 1; + BatchId(id) + } + + /// Store a temp tag in a batch identified by a batch id. + fn store(&mut self, batch: BatchId, tt: TempTag) { + let entry = self.batches.entry(batch).or_default(); + entry + .tags + .entry(tt.hash_and_format()) + .or_default() + .push(tt); + } + + /// Remove a tag from a batch. + fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat) -> Result<()> { + if let Some(batch) = self.batches.get_mut(&batch) { + if let Some(tags) = batch.tags.get_mut(content) { + tags.pop(); + if tags.is_empty() { + batch.tags.remove(content); + } + return Ok(()); + } + } + // this can happen if we try to upgrade a tag from an expired batch + anyhow::bail!("tag not found in batch"); + } + + /// Remove an entire batch. + fn remove(&mut self, batch: BatchId) { + self.batches.remove(&batch); + } +} + /// In memory node. pub type MemNode = Node; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 4623261724d..69ecfc8ce85 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -579,6 +579,7 @@ where gossip, gossip_dispatcher, local_pool_handle: lp.handle().clone(), + blob_batches: Default::default(), }); let protocol_builder = ProtocolBuilder { diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 0796a0d86eb..41b0641a56c 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::{anyhow, ensure, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; +use futures_util::FutureExt; use genawaiter::sync::{Co, Gen}; use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::downloader::{DownloadRequest, Downloader}; @@ -13,17 +14,18 @@ use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; use iroh_blobs::get::db::DownloadProgress; use iroh_blobs::get::Stats; +use iroh_blobs::provider::BatchAddPathProgress; use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry}; use iroh_blobs::util::local_pool::LocalPoolHandle; use iroh_blobs::util::progress::ProgressSender; use iroh_blobs::util::SetTagOption; -use iroh_blobs::BlobFormat; use iroh_blobs::{ provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, util::progress::FlumeProgressSender, HashAndFormat, }; +use iroh_blobs::{BlobFormat, Tag}; use iroh_io::AsyncSliceReader; use iroh_net::relay::RelayUrl; use iroh_net::{Endpoint, NodeAddr, NodeId}; @@ -32,12 +34,18 @@ use tokio::task::JoinSet; use tokio_util::either::Either; use tracing::{debug, info, warn}; +use crate::client::blobs::BlobStatus; use crate::client::{ blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, tags::TagInfo, NodeStatus, }; use crate::node::{docs::DocsEngine, NodeInner}; +use crate::rpc_protocol::blobs::{ + BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse, + BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, + BatchUpdate, BlobStatusRequest, BlobStatusResponse, +}; use crate::rpc_protocol::{ authors, blobs, blobs::{ @@ -186,6 +194,16 @@ impl Handler { ReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, AddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, AddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + BlobStatus(msg) => chan.rpc(msg, self, Self::blob_status).await, + BatchCreate(msg) => chan.bidi_streaming(msg, self, Self::batch_create).await, + BatchUpdate(_) => Err(RpcServerError::UnexpectedStartMessage), + BatchAddStream(msg) => chan.bidi_streaming(msg, self, Self::batch_add_stream).await, + BatchAddStreamUpdate(_) => Err(RpcServerError::UnexpectedStartMessage), + BatchAddPath(msg) => { + chan.server_streaming(msg, self, Self::batch_add_from_path) + .await + } + BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await, } } @@ -198,6 +216,8 @@ impl Handler { match msg { ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await, DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await, + Create(msg) => chan.rpc(msg, self, Self::tags_create_tag).await, + Set(msg) => chan.rpc(msg, self, Self::tags_set_tag).await, } } @@ -433,6 +453,22 @@ impl Handler { self.inner.local_pool_handle.clone() } + async fn blob_status(self, msg: BlobStatusRequest) -> RpcResult { + let entry = self.inner.db.get(&msg.hash).await?; + Ok(BlobStatusResponse(match entry { + Some(entry) => { + if entry.is_complete() { + BlobStatus::Complete { + size: entry.size().value(), + } + } else { + BlobStatus::Partial { size: entry.size() } + } + } + None => BlobStatus::NotFound, + })) + } + async fn blob_list_impl(self, co: &Co>) -> io::Result<()> { use bao_tree::io::fsm::Outboard; @@ -906,6 +942,31 @@ impl Handler { } } + async fn tags_set_tag(self, msg: tags::SetRequest) -> RpcResult<()> { + if let Some(batch) = msg.batch { + if let Some(content) = msg.value.as_ref() { + self.inner + .blob_batches + .lock() + .await + .remove_one(batch, content)?; + } + } + self.inner.db.set_tag(msg.name, msg.value).await?; + Ok(()) + } + + async fn tags_create_tag(self, msg: tags::CreateRequest) -> RpcResult { + if let Some(batch) = msg.batch { + self.inner + .blob_batches + .lock() + .await + .remove_one(batch, &msg.value)?; + } + Ok(self.inner.db.create_tag(msg.value).await?) + } + fn node_watch(self, _: NodeWatchRequest) -> impl Stream { futures_lite::stream::unfold((), |()| async move { tokio::time::sleep(HEALTH_POLL_WAIT).await; @@ -918,6 +979,126 @@ impl Handler { }) } + #[allow(clippy::unused_async)] + async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> { + let tag = self.inner.db.temp_tag(msg.content); + self.inner.blob_batches.lock().await.store(msg.batch, tag); + Ok(()) + } + + fn batch_add_stream( + self, + msg: BatchAddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + let (tx, rx) = flume::bounded(32); + let this = self.clone(); + + self.local_pool_handle().spawn_detached(|| async move { + if let Err(err) = this.batch_add_stream0(msg, stream, tx.clone()).await { + tx.send_async(BatchAddStreamResponse::Abort(err.into())) + .await + .ok(); + } + }); + rx.into_stream() + } + + fn batch_add_from_path( + self, + msg: BatchAddPathRequest, + ) -> impl Stream { + // provide a little buffer so that we don't slow down the sender + let (tx, rx) = flume::bounded(32); + let tx2 = tx.clone(); + self.local_pool_handle().spawn_detached(|| async move { + if let Err(e) = self.batch_add_from_path0(msg, tx).await { + tx2.send_async(BatchAddPathProgress::Abort(e.into())) + .await + .ok(); + } + }); + rx.into_stream().map(BatchAddPathResponse) + } + + async fn batch_add_stream0( + self, + msg: BatchAddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + progress: flume::Sender, + ) -> anyhow::Result<()> { + let progress = FlumeProgressSender::new(progress); + + let stream = stream.map(|item| match item { + BatchAddStreamUpdate::Chunk(chunk) => Ok(chunk), + BatchAddStreamUpdate::Abort => { + Err(io::Error::new(io::ErrorKind::Interrupted, "Remote abort")) + } + }); + + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::OutboardProgress { offset, .. } => { + Some(BatchAddStreamResponse::OutboardProgress { offset }) + } + _ => None, + }); + let (temp_tag, _len) = self + .inner + .db + .import_stream(stream, msg.format, import_progress) + .await?; + let hash = temp_tag.inner().hash; + self.inner + .blob_batches + .lock() + .await + .store(msg.batch, temp_tag); + progress + .send(BatchAddStreamResponse::Result { hash }) + .await?; + Ok(()) + } + + async fn batch_add_from_path0( + self, + msg: BatchAddPathRequest, + progress: flume::Sender, + ) -> anyhow::Result<()> { + let progress = FlumeProgressSender::new(progress); + // convert import progress to provide progress + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::Size { size, .. } => Some(BatchAddPathProgress::Found { size }), + ImportProgress::OutboardProgress { offset, .. } => { + Some(BatchAddPathProgress::Progress { offset }) + } + ImportProgress::OutboardDone { hash, .. } => Some(BatchAddPathProgress::Done { hash }), + _ => None, + }); + let BatchAddPathRequest { + path: root, + import_mode, + format, + batch, + } = msg; + // Check that the path is absolute and exists. + anyhow::ensure!(root.is_absolute(), "path must be absolute"); + anyhow::ensure!( + root.exists(), + "trying to add missing path: {}", + root.display() + ); + let (tag, _) = self + .inner + .db + .import_file(root, import_mode, format, import_progress) + .await?; + let hash = *tag.hash(); + self.inner.blob_batches.lock().await.store(batch, tag); + + progress.send(BatchAddPathProgress::Done { hash }).await?; + Ok(()) + } + fn blob_add_stream( self, msg: AddStreamRequest, @@ -1051,6 +1232,36 @@ impl Handler { rx.into_stream() } + fn batch_create( + self, + _: BatchCreateRequest, + mut updates: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + async move { + let batch = self.inner.blob_batches.lock().await.create(); + tokio::spawn(async move { + while let Some(item) = updates.next().await { + match item { + BatchUpdate::Drop(content) => { + // this can not fail, since we keep the batch alive. + // therefore it is safe to ignore the result. + let _ = self + .inner + .blob_batches + .lock() + .await + .remove_one(batch, &content); + } + BatchUpdate::Ping => {} + } + } + self.inner.blob_batches.lock().await.remove(batch); + }); + BatchCreateResponse::Id(batch) + } + .into_stream() + } + fn node_connections( self, _: ConnectionsRequest, diff --git a/iroh/src/rpc_protocol/blobs.rs b/iroh/src/rpc_protocol/blobs.rs index a24523a066f..477832d23f3 100644 --- a/iroh/src/rpc_protocol/blobs.rs +++ b/iroh/src/rpc_protocol/blobs.rs @@ -1,23 +1,27 @@ use std::path::PathBuf; use bytes::Bytes; -use iroh_base::hash::Hash; use iroh_base::rpc::RpcResult; +use iroh_base::{hash::Hash, rpc::RpcError}; +use iroh_blobs::provider::BatchAddPathProgress; use iroh_blobs::{ export::ExportProgress, format::collection::Collection, get::db::DownloadProgress, provider::AddProgress, - store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, + store::{ + BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode, + ValidateProgress, + }, util::SetTagOption, - BlobFormat, Tag, + BlobFormat, HashAndFormat, Tag, }; use iroh_net::NodeAddr; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; -use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; +use crate::client::blobs::{BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, WrapOption}; use super::RpcService; @@ -49,6 +53,19 @@ pub enum Request { Fsck(ConsistencyCheckRequest), #[rpc(response = RpcResult)] CreateCollection(CreateCollectionRequest), + #[rpc(response = RpcResult)] + BlobStatus(BlobStatusRequest), + + #[bidi_streaming(update = BatchUpdate, response = BatchCreateResponse)] + BatchCreate(BatchCreateRequest), + BatchUpdate(BatchUpdate), + #[bidi_streaming(update = BatchAddStreamUpdate, response = BatchAddStreamResponse)] + BatchAddStream(BatchAddStreamRequest), + BatchAddStreamUpdate(BatchAddStreamUpdate), + #[server_streaming(response = BatchAddPathResponse)] + BatchAddPath(BatchAddPathRequest), + #[rpc(response = RpcResult<()>)] + BatchCreateTempTag(BatchCreateTempTagRequest), } #[allow(missing_docs)] @@ -65,6 +82,10 @@ pub enum Response { Export(ExportResponse), Validate(ValidateProgress), CreateCollection(RpcResult), + BlobStatus(RpcResult), + BatchCreate(BatchCreateResponse), + BatchAddStream(BatchAddStreamResponse), + BatchAddPath(BatchAddPathResponse), } /// A request to the node to provide the data at the given path @@ -235,3 +256,88 @@ pub struct CreateCollectionResponse { /// The resulting tag. pub tag: Tag, } + +#[derive(Debug, Serialize, Deserialize)] +pub struct BlobStatusRequest { + /// The hash of the blob + pub hash: Hash, +} + +/// The response to a status request +#[derive(Debug, Serialize, Deserialize, derive_more::From, derive_more::Into)] +pub struct BlobStatusResponse(pub BlobStatus); + +/// Request to create a new scope for temp tags +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchCreateRequest; + +/// Update to a temp tag scope +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchUpdate { + /// Drop of a remote temp tag + Drop(HashAndFormat), + /// Message to check that the connection is still alive + Ping, +} + +/// Response to a temp tag scope request +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchCreateResponse { + /// We got the id of the scope + Id(BatchId), +} + +/// Create a temp tag with a given hash and format +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchCreateTempTagRequest { + /// Content to protect + pub content: HashAndFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddStreamRequest { + /// What format to use for the blob + pub format: BlobFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub enum BatchAddStreamUpdate { + /// A chunk of stream data + Chunk(Bytes), + /// Abort the request due to an error on the client side + Abort, +} + +/// Wrapper around [`AddProgress`]. +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchAddStreamResponse { + Abort(RpcError), + OutboardProgress { offset: u64 }, + Result { hash: Hash }, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathRequest { + /// The path to the data to provide. + pub path: PathBuf, + /// Add the data in place + pub import_mode: ImportMode, + /// What format to use for the blob + pub format: BlobFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Response to a batch add path request +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathResponse(pub BatchAddPathProgress); + +#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)] +pub struct BatchId(pub(crate) u64); diff --git a/iroh/src/rpc_protocol/tags.rs b/iroh/src/rpc_protocol/tags.rs index bd7f1a016e7..9750a72bf63 100644 --- a/iroh/src/rpc_protocol/tags.rs +++ b/iroh/src/rpc_protocol/tags.rs @@ -1,18 +1,22 @@ use iroh_base::rpc::RpcResult; -use iroh_blobs::Tag; +use iroh_blobs::{HashAndFormat, Tag}; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use crate::client::tags::TagInfo; -use super::RpcService; +use super::{blobs::BatchId, RpcService}; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] #[enum_conversions(super::Request)] #[rpc_requests(RpcService)] pub enum Request { + #[rpc(response = RpcResult)] + Create(CreateRequest), + #[rpc(response = RpcResult<()>)] + Set(SetRequest), #[rpc(response = RpcResult<()>)] DeleteTag(DeleteRequest), #[server_streaming(response = TagInfo)] @@ -23,10 +27,31 @@ pub enum Request { #[derive(strum::Display, Debug, Serialize, Deserialize)] #[enum_conversions(super::Response)] pub enum Response { + Create(RpcResult), ListTags(TagInfo), DeleteTag(RpcResult<()>), } +/// Create a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateRequest { + /// Value of the tag + pub value: HashAndFormat, + /// Batch to use, none for global + pub batch: Option, +} + +/// Set or delete a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct SetRequest { + /// Name of the tag + pub name: Tag, + /// Value of the tag, None to delete + pub value: Option, + /// Batch to use, none for global + pub batch: Option, +} + /// List all collections /// /// Lists all collections that have been explicitly added to the database. diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs new file mode 100644 index 00000000000..aaf9fbe5423 --- /dev/null +++ b/iroh/tests/batch.rs @@ -0,0 +1,239 @@ +use std::{io, time::Duration}; + +use bao_tree::blake3; +use bytes::Bytes; +use iroh::{ + client::blobs::{AddDirOpts, WrapOption}, + node::GcPolicy, +}; +use iroh_blobs::store::mem::Store; + +async fn create_node() -> anyhow::Result> { + iroh::node::Node::memory() + .gc_policy(GcPolicy::Interval(Duration::from_millis(10))) + .spawn() + .await +} + +async fn wait_for_gc() { + // wait for multiple gc cycles to ensure that the data is actually gone + tokio::time::sleep(Duration::from_millis(50)).await; +} + +/// Test that add_bytes adds the right data +#[tokio::test] +async fn add_bytes() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(data).into()); + assert_eq!(actual.as_ref(), data); + Ok(()) +} + +/// Test that add_bytes adds the right data +#[tokio::test] +async fn add_stream() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let data_stream = futures_lite::stream::iter([io::Result::Ok(Bytes::copy_from_slice(data))]); + let tag = batch.add_stream(data_stream).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(data).into()); + assert_eq!(actual.as_ref(), data); + Ok(()) +} + +/// Test that add_file adds the right data +#[tokio::test] +async fn add_file() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let temp_path = dir.path().join("test"); + std::fs::write(&temp_path, b"test")?; + let (tag, _) = batch.add_file(temp_path).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(b"test").into()); + assert_eq!(actual.as_ref(), b"test"); + Ok(()) +} + +/// Tests that add_dir adds the right data +#[tokio::test] +async fn add_dir() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")]; + for (name, content) in &data { + let temp_path = dir.path().join(name); + std::fs::write(&temp_path, content)?; + } + let tag = batch.add_dir(dir.path().to_owned()).await?; + assert!(client.has(*tag.hash()).await?); + for (_, content) in &data { + let hash = blake3::hash(content).into(); + let data = client.read_to_bytes(hash).await?; + assert_eq!(data.as_ref(), *content); + } + Ok(()) +} + +/// Tests that add_dir adds the right data +#[tokio::test] +async fn add_dir_single_file() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let temp_path = dir.path().join("test"); + let data: &[u8] = b"test"; + std::fs::write(&temp_path, data)?; + let tag = batch + .add_dir_with_opts( + temp_path, + AddDirOpts { + wrap: WrapOption::Wrap { name: None }, + ..Default::default() + }, + ) + .await?; + assert!(client.read_to_bytes(*tag.hash()).await.is_ok()); + let hash = blake3::hash(data).into(); + let actual_data = client.read_to_bytes(hash).await?; + assert_eq!(actual_data.as_ref(), data); + Ok(()) +} + +#[tokio::test] +async fn batch_drop() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + // Check that the store has the data and that it is protected from gc + wait_for_gc().await; + assert!(client.has(hash).await?); + drop(batch); + // Check that the store drops the data when the temp tag gets dropped + wait_for_gc().await; + assert!(!client.has(hash).await?); + Ok(()) +} + +/// This checks that dropping a tag makes the data eligible for garbage collection. +/// +/// Note that we might change this behavior in the future and only drop the data +/// once the batch is dropped. +#[tokio::test] +async fn tag_drop_raw() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + // Check that the store has the data and that it is protected from gc + wait_for_gc().await; + assert!(client.has(hash).await?); + drop(tag); + // Check that the store drops the data when the temp tag gets dropped + wait_for_gc().await; + assert!(!client.has(hash).await?); + Ok(()) +} + +/// Tests that data is preserved if a second temp tag is created for it +/// before the first temp tag is dropped. +#[tokio::test] +async fn temp_tag_copy() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + // Check that the store has the data and that it is protected from gc + wait_for_gc().await; + assert!(client.has(hash).await?); + // Create an additional temp tag for the same data + let tag2 = batch.temp_tag(tag.hash_and_format()).await?; + drop(tag); + // Check that the data is still present + wait_for_gc().await; + assert!(client.has(hash).await?); + drop(tag2); + // Check that the data is gone since both temp tags are dropped + wait_for_gc().await; + assert!(!client.has(hash).await?); + Ok(()) +} + +/// Tests that temp tags work properly for hash sequences, using add_dir +/// to add the data. +/// +/// Note that we might change this behavior in the future and only drop the data +/// once the batch is dropped. +#[tokio::test] +async fn tag_drop_hashseq() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")]; + for (name, content) in &data { + let temp_path = dir.path().join(name); + std::fs::write(&temp_path, content)?; + } + let tag = batch.add_dir(dir.path().to_owned()).await?; + let hash = *tag.hash(); + // weird signature to avoid async move issues + let check_present = |present: &'static bool| async { + assert!(client.has(hash).await? == *present); + for (_, content) in &data { + let hash = blake3::hash(content).into(); + assert!(client.has(hash).await? == *present); + } + anyhow::Ok(()) + }; + // Check that the store has the data immediately after adding it + check_present(&true).await?; + // Check that it is protected from gc + wait_for_gc().await; + check_present(&true).await?; + drop(tag); + // Check that the store drops the data when the temp tag gets dropped + wait_for_gc().await; + check_present(&false).await?; + Ok(()) +} + +/// This checks that dropping a tag makes the data eligible for garbage collection. +/// +/// Note that we might change this behavior in the future and only drop the data +/// once the batch is dropped. +#[tokio::test] +async fn wrong_batch() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + drop(batch); + let batch = client.batch().await?; + assert!(batch.upgrade(tag).await.is_err()); + Ok(()) +}