From ede3c55efc0b614b7829423af518a81ec0a81057 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 25 Jul 2024 11:08:09 +0300 Subject: [PATCH 01/15] Add back code from the batch PR --- iroh-blobs/src/provider.rs | 24 ++ iroh-blobs/src/store/traits.rs | 2 +- iroh-blobs/src/util.rs | 5 + iroh-cli/src/commands/blobs.rs | 9 +- iroh/src/client.rs | 8 +- iroh/src/client/blobs.rs | 68 +++-- iroh/src/client/blobs/batch.rs | 458 +++++++++++++++++++++++++++++++++ iroh/src/node.rs | 55 ++++ iroh/src/node/builder.rs | 1 + iroh/src/node/rpc.rs | 213 ++++++++++++++- iroh/src/rpc_protocol/blobs.rs | 114 +++++++- iroh/src/rpc_protocol/tags.rs | 29 ++- iroh/tests/batch.rs | 239 +++++++++++++++++ 13 files changed, 1191 insertions(+), 34 deletions(-) create mode 100644 iroh/src/client/blobs/batch.rs create mode 100644 iroh/tests/batch.rs diff --git a/iroh-blobs/src/provider.rs b/iroh-blobs/src/provider.rs index 54b2515158..9e698ee908 100644 --- a/iroh-blobs/src/provider.rs +++ b/iroh-blobs/src/provider.rs @@ -153,6 +153,30 @@ pub enum AddProgress { Abort(RpcError), } +/// Progress updates for the batch add operation. +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchAddPathProgress { + /// An item was found with the given size + Found { + /// The size of the entry in bytes. + size: u64, + }, + /// We got progress ingesting the item. + Progress { + /// The offset of the progress, in bytes. + offset: u64, + }, + /// We are done, and the hash is `hash`. + Done { + /// The hash of the entry. + hash: Hash, + }, + /// We got an error and need to abort. + /// + /// This will be the last message in the stream. + Abort(RpcError), +} + /// Read the request from the getter. /// /// Will fail if there is an error while reading, if the reader diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs index 4d5162ac04..487b22b78d 100644 --- a/iroh-blobs/src/store/traits.rs +++ b/iroh-blobs/src/store/traits.rs @@ -703,7 +703,7 @@ pub enum ImportProgress { /// does not make any sense. E.g. an in memory implementation will always have /// to copy the file into memory. Also, a disk based implementation might choose /// to copy small files even if the mode is `Reference`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] pub enum ImportMode { /// This mode will copy the file into the database before hashing. 
/// diff --git a/iroh-blobs/src/util.rs b/iroh-blobs/src/util.rs index 6b70d24c9a..42f28cb5cf 100644 --- a/iroh-blobs/src/util.rs +++ b/iroh-blobs/src/util.rs @@ -207,6 +207,11 @@ impl TempTag { self.inner.format } + /// The hash and format of the pinned item + pub fn hash_and_format(&self) -> HashAndFormat { + self.inner + } + /// Keep the item alive until the end of the process pub fn leak(mut self) { // set the liveness tracker to None, so that the refcount is not decreased diff --git a/iroh-cli/src/commands/blobs.rs b/iroh-cli/src/commands/blobs.rs index 6e4c0aeba2..416d2ec003 100644 --- a/iroh-cli/src/commands/blobs.rs +++ b/iroh-cli/src/commands/blobs.rs @@ -370,10 +370,15 @@ impl BlobCommands { let (blob_status, size) = match (status, format) { (BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size), - (BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size), + (BlobStatus::Partial { size }, BlobFormat::Raw) => { + ("incomplete blob", size.value()) + } (BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size), (BlobStatus::Partial { size }, BlobFormat::HashSeq) => { - ("incomplete collection", size) + ("incomplete collection", size.value()) + } + (BlobStatus::NotFound, _) => { + return Err(anyhow!("blob is missing")); } }; println!( diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 3f6ae5055a..e2830fce6f 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -21,9 +21,11 @@ pub mod gossip; pub mod node; pub mod tags; -/// Iroh rpc client - boxed so that we can have a concrete type. -pub(crate) type RpcClient = - quic_rpc::RpcClient>; +/// Iroh rpc connection - boxed so that we can have a concrete type. +pub(crate) type RpcConnection = quic_rpc::transport::boxed::Connection; + +/// Iroh rpc client. +pub(crate) type RpcClient = quic_rpc::RpcClient; /// Iroh client. 
#[derive(Debug, Clone)] diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index a9a48539b3..9427308202 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -53,7 +53,8 @@ use std::{ task::{Context, Poll}, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context as _, Result}; +use batch::Batch; use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; @@ -63,7 +64,7 @@ use iroh_blobs::{ export::ExportProgress as BytesExportProgress, format::collection::{Collection, SimpleStore}, get::db::DownloadProgress as BytesDownloadProgress, - store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, + store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, util::SetTagOption, BlobFormat, Hash, Tag, }; @@ -75,12 +76,14 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio_util::io::{ReaderStream, StreamReader}; use tracing::warn; +mod batch; +pub use batch::AddDirOpts; use crate::rpc_protocol::blobs::{ - AddPathRequest, AddStreamRequest, AddStreamUpdate, ConsistencyCheckRequest, - CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadRequest, - ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse, - ValidateRequest, + AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse, + BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, + DeleteRequest, DownloadRequest, ExportRequest, ListIncompleteRequest, ListRequest, + ReadAtRequest, ReadAtResponse, ValidateRequest, }; use crate::rpc_protocol::node::StatusRequest; @@ -100,6 +103,38 @@ impl<'a> From<&'a Iroh> for &'a RpcClient { } impl Client { + /// Check if a blob is completely stored on the node. + /// + /// Note that this will return false for blobs that are partially stored on + /// the node. + pub async fn status(&self, hash: Hash) -> Result { + let status = self.rpc.rpc(BlobStatusRequest { hash }).await??; + Ok(status.0) + } + + /// Check if a blob is completely stored on the node. + /// + /// This is just a convenience wrapper around `status` that returns a boolean. + pub async fn has(&self, hash: Hash) -> Result { + match self.status(hash).await { + Ok(BlobStatus::Complete { .. }) => Ok(true), + Ok(_) => Ok(false), + Err(err) => Err(err), + } + } + + /// Create a new batch for adding data. + /// + /// A batch is a context in which temp tags are created and data is added to the node. Temp tags + /// are automatically deleted when the batch is dropped, leading to the data being garbage collected + /// unless a permanent tag is created for it. + pub async fn batch(&self) -> Result { + let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?; + let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??; + let rpc = self.rpc.clone(); + Ok(Batch::new(batch, rpc, updates, 1024)) + } + /// Stream the contents of a a single blob. /// /// Returns a [`Reader`], which can report the size of the blob before reading it. @@ -422,17 +457,6 @@ impl Client { Ok(ticket) } - /// Get the status of a blob. 
- pub async fn status(&self, hash: Hash) -> Result { - // TODO: this could be implemented more efficiently - let reader = self.read(hash).await?; - if reader.is_complete { - Ok(BlobStatus::Complete { size: reader.size }) - } else { - Ok(BlobStatus::Partial { size: reader.size }) - } - } - fn tags_client(&self) -> tags::Client { tags::Client { rpc: self.rpc.clone(), @@ -447,9 +471,10 @@ impl SimpleStore for Client { } /// Whether to wrap the added data in a collection. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub enum WrapOption { /// Do not wrap the file or directory. + #[default] NoWrap, /// Wrap the file or directory in a collection. Wrap { @@ -459,12 +484,14 @@ pub enum WrapOption { } /// Status information about a blob. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum BlobStatus { + /// The blob is not stored at all. + NotFound, /// The blob is only stored partially. Partial { /// The size of the currently stored partial blob. - size: u64, + size: BaoBlobSize, }, /// The blob is stored completely. Complete { @@ -941,7 +968,6 @@ pub enum DownloadMode { mod tests { use super::*; - use anyhow::Context as _; use rand::RngCore; use tokio::io::AsyncWriteExt; diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs new file mode 100644 index 0000000000..58dd4c1e2b --- /dev/null +++ b/iroh/src/client/blobs/batch.rs @@ -0,0 +1,458 @@ +use std::{ + io, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +use anyhow::{anyhow, Context, Result}; +use bytes::Bytes; +use futures_buffered::BufferedStreamExt; +use futures_lite::StreamExt; +use futures_util::{sink::Buffer, FutureExt, SinkExt, Stream}; +use iroh_blobs::{ + format::collection::Collection, + provider::BatchAddPathProgress, + store::ImportMode, + util::{SetTagOption, TagDrop}, + BlobFormat, HashAndFormat, Tag, TempTag, +}; +use quic_rpc::client::UpdateSink; +use tokio::io::AsyncRead; +use tokio_util::io::ReaderStream; +use tracing::{debug, warn}; + +use crate::{ + client::{RpcClient, RpcConnection, RpcService}, + rpc_protocol::{ + blobs::{ + BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, + BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchId, BatchUpdate, + }, + tags, + }, +}; + +use super::WrapOption; + +/// A scope in which blobs can be added. +#[derive(derive_more::Debug)] +struct BatchInner { + /// The id of the scope. + batch: BatchId, + /// The rpc client. + rpc: RpcClient, + /// The stream to send drop + #[debug(skip)] + updates: Mutex, BatchUpdate>>, +} + +/// A batch for write operations. +/// +/// This serves mostly as a scope for temporary tags. +/// +/// It is not a transaction, so things in a batch are not atomic. Also, there is +/// no isolation between batches. +#[derive(derive_more::Debug)] +pub struct Batch(Arc); + +impl TagDrop for BatchInner { + fn on_drop(&self, content: &HashAndFormat) { + let mut updates = self.updates.lock().unwrap(); + // make a spirited attempt to notify the server that we are dropping the content + // + // this will occasionally fail, but that's acceptable. The temp tags for the batch + // will be cleaned up as soon as the entire batch is dropped. + // + // E.g. a typical scenario is that you create a large array of temp tags, and then + // store them in a hash sequence and then drop the array. You will get many drops + // at the same time, and might get a send failure here. 
+ // + // But that just means that the server will clean up the temp tags when the batch is + // dropped. + updates.feed(BatchUpdate::Drop(*content)).now_or_never(); + updates.flush().now_or_never(); + } +} + +/// Options for adding a file as a blob +#[derive(Debug, Clone, Copy, Default)] +pub struct AddFileOpts { + /// The import mode + pub import_mode: ImportMode, + /// The format of the blob + pub format: BlobFormat, +} + +/// Options for adding a directory as a collection +#[derive(Debug, Clone)] +pub struct AddDirOpts { + /// The import mode + pub import_mode: ImportMode, + /// Whether to preserve the directory name + pub wrap: WrapOption, + /// Io parallelism + pub io_parallelism: usize, +} + +impl Default for AddDirOpts { + fn default() -> Self { + Self { + import_mode: ImportMode::TryReference, + wrap: WrapOption::NoWrap, + io_parallelism: 4, + } + } +} + +/// Options for adding a directory as a collection +#[derive(Debug, Clone)] +pub struct AddReaderOpts { + /// The format of the blob + pub format: BlobFormat, + /// Size of the chunks to send + pub chunk_size: usize, +} + +impl Default for AddReaderOpts { + fn default() -> Self { + Self { + format: BlobFormat::Raw, + chunk_size: 1024 * 64, + } + } +} + +impl Batch { + pub(super) fn new( + batch: BatchId, + rpc: RpcClient, + updates: UpdateSink, + buffer_size: usize, + ) -> Self { + let updates = updates.buffer(buffer_size); + Self(Arc::new(BatchInner { + batch, + rpc, + updates: updates.into(), + })) + } + + /// Write a blob by passing bytes. + pub async fn add_bytes(&self, bytes: impl Into) -> Result { + self.add_bytes_with_opts(bytes, Default::default()).await + } + + /// Import a blob from a filesystem path, using the default options. + /// + /// For more control, use [`Self::add_file_with_opts`]. + pub async fn add_file(&self, path: PathBuf) -> Result<(TempTag, u64)> { + self.add_file_with_opts(path, AddFileOpts::default()).await + } + + /// Add a directory as a hashseq in iroh collection format + pub async fn add_dir(&self, root: PathBuf) -> Result { + self.add_dir_with_opts(root, Default::default()).await + } + + /// Write a blob by passing an async reader. + /// + /// This will use a default chunk size of 64KB, and a format of [BlobFormat::Raw]. + pub async fn add_reader( + &self, + reader: impl AsyncRead + Unpin + Send + 'static, + ) -> anyhow::Result { + self.add_reader_with_opts(reader, Default::default()).await + } + + /// Write a blob by passing a stream of bytes. + pub async fn add_stream( + &self, + input: impl Stream> + Send + Unpin + 'static, + ) -> Result { + self.add_stream_with_opts(input, Default::default()).await + } + + /// Create a temp tag to protect some content (blob or hashseq) from being deleted. + /// + /// A typical use case is that you are downloading some data and want to protect it + /// from deletion while the download is ongoing, but don't want to protect it permanently + /// until the download is completed. + pub async fn temp_tag(&self, content: HashAndFormat) -> Result { + // Notify the server that we want one temp tag for the given content + self.0 + .rpc + .rpc(BatchCreateTempTagRequest { + batch: self.0.batch, + content, + }) + .await??; + // Only after success of the above call, we can create the corresponding local temp tag + Ok(self.local_temp_tag(content, None)) + } + + /// Write a blob by passing an async reader. + /// + /// This produces a stream from the reader with a hardcoded buffer size of 64KB. 
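+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (the path is illustrative), assuming an open `batch`
+    /// and a `tokio::fs::File` as the reader:
+    ///
+    /// ```ignore
+    /// let file = tokio::fs::File::open("/tmp/example.bin").await?;
+    /// let opts = AddReaderOpts { format: BlobFormat::Raw, chunk_size: 1024 * 16 };
+    /// let tag = batch.add_reader_with_opts(file, opts).await?;
+    /// println!("added {}", tag.hash());
+    /// ```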
+ pub async fn add_reader_with_opts( + &self, + reader: impl AsyncRead + Unpin + Send + 'static, + opts: AddReaderOpts, + ) -> anyhow::Result { + let AddReaderOpts { format, chunk_size } = opts; + let input = ReaderStream::with_capacity(reader, chunk_size); + self.add_stream_with_opts(input, format).await + } + + /// Write a blob by passing bytes. + pub async fn add_bytes_with_opts( + &self, + bytes: impl Into, + format: BlobFormat, + ) -> Result { + let input = futures_lite::stream::once(Ok(bytes.into())); + self.add_stream_with_opts(input, format).await + } + + /// Import a blob from a filesystem path. + /// + /// `path` should be an absolute path valid for the file system on which + /// the node runs, which refers to a file. + /// + /// If you use [ImportMode::TryReference], Iroh will assume that the data will not + /// change and will share it in place without copying to the Iroh data directory + /// if appropriate. However, for tiny files, Iroh will copy the data. + /// + /// If you use [ImportMode::Copy], Iroh will always copy the data. + /// + /// Will return a temp tag for the added blob, as well as the size of the file. + pub async fn add_file_with_opts( + &self, + path: PathBuf, + opts: AddFileOpts, + ) -> Result<(TempTag, u64)> { + let AddFileOpts { + import_mode, + format, + } = opts; + anyhow::ensure!( + path.is_absolute(), + "Path must be absolute, but got: {:?}", + path + ); + anyhow::ensure!(path.is_file(), "Path does not refer to a file: {:?}", path); + let mut stream = self + .0 + .rpc + .server_streaming(BatchAddPathRequest { + path, + import_mode, + format, + batch: self.0.batch, + }) + .await?; + let mut res_hash = None; + let mut res_size = None; + while let Some(item) = stream.next().await { + match item?.0 { + BatchAddPathProgress::Abort(cause) => { + Err(cause)?; + } + BatchAddPathProgress::Done { hash } => { + res_hash = Some(hash); + } + BatchAddPathProgress::Found { size } => { + res_size = Some(size); + } + _ => {} + } + } + let hash = res_hash.context("Missing hash")?; + let size = res_size.context("Missing size")?; + Ok(( + self.local_temp_tag(HashAndFormat { hash, format }, Some(size)), + size, + )) + } + + /// Add a directory as a hashseq in iroh collection format + /// + /// This can also be used to add a single file as a collection, if + /// wrap is set to [WrapOption::Wrap]. + /// + /// However, if you want to add a single file as a raw blob, use add_file instead. 
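+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, assuming an open `batch` and an existing absolute
+    /// directory (the path is illustrative):
+    ///
+    /// ```ignore
+    /// let opts = AddDirOpts { import_mode: ImportMode::Copy, ..Default::default() };
+    /// let tag = batch.add_dir_with_opts("/data/photos".into(), opts).await?;
+    /// ```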
+ pub async fn add_dir_with_opts(&self, root: PathBuf, opts: AddDirOpts) -> Result { + let AddDirOpts { + import_mode, + wrap, + io_parallelism, + } = opts; + anyhow::ensure!(root.is_absolute(), "Path must be absolute"); + + // let (send, recv) = flume::bounded(32); + // let import_progress = FlumeProgressSender::new(send); + + // import all files below root recursively + let data_sources = crate::util::fs::scan_path(root, wrap)?; + let opts = AddFileOpts { + import_mode, + format: BlobFormat::Raw, + }; + let result: Vec<_> = futures_lite::stream::iter(data_sources) + .map(|source| { + // let import_progress = import_progress.clone(); + async move { + let name = source.name().to_string(); + let (tag, size) = self + .add_file_with_opts(source.path().to_owned(), opts) + .await?; + let hash = *tag.hash(); + anyhow::Ok((name, hash, size, tag)) + } + }) + .buffered_ordered(io_parallelism) + .try_collect() + .await?; + + // create a collection + let (collection, child_tags): (Collection, Vec<_>) = result + .into_iter() + .map(|(name, hash, _, tag)| ((name, hash), tag)) + .unzip(); + + let tag = self.add_collection(collection).await?; + drop(child_tags); + Ok(tag) + } + + /// Write a blob by passing a stream of bytes. + /// + /// For convenient interop with common sources of data, this function takes a stream of `io::Result`. + /// If you have raw bytes, you need to wrap them in `io::Result::Ok`. + pub async fn add_stream_with_opts( + &self, + mut input: impl Stream> + Send + Unpin + 'static, + format: BlobFormat, + ) -> Result { + let (mut sink, mut stream) = self + .0 + .rpc + .bidi(BatchAddStreamRequest { + batch: self.0.batch, + format, + }) + .await?; + let mut size = 0u64; + while let Some(item) = input.next().await { + match item { + Ok(chunk) => { + size += chunk.len() as u64; + sink.send(BatchAddStreamUpdate::Chunk(chunk)) + .await + .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; + } + Err(err) => { + warn!("Abort send, reason: failed to read from source stream: {err:?}"); + sink.send(BatchAddStreamUpdate::Abort) + .await + .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; + break; + } + } + } + // this is needed for the remote to notice that the stream is closed + drop(sink); + let mut res = None; + while let Some(item) = stream.next().await { + match item? { + BatchAddStreamResponse::Abort(cause) => { + Err(cause)?; + } + BatchAddStreamResponse::Result { hash } => { + res = Some(hash); + } + _ => {} + } + } + let hash = res.context("Missing answer")?; + Ok(self.local_temp_tag(HashAndFormat { hash, format }, Some(size))) + } + + /// Add a collection + /// + /// This is a convenience function that converts the collection into two blobs + /// (the metadata and the hash sequence) and adds them, returning a temp tag for + /// the hash sequence. + /// + /// Note that this does not guarantee that the data that the collection refers to + /// actually exists. It will just create 2 blobs, the metadata and the hash sequence + /// itself. + pub async fn add_collection(&self, collection: Collection) -> Result { + self.add_blob_seq(collection.to_blobs()).await + } + + /// Add a sequence of blobs, where the last is a hash sequence. + /// + /// It is a common pattern in iroh to have a hash sequence with one or more + /// blobs of metadata, and the remaining blobs being the actual data. E.g. + /// a collection is a hash sequence where the first child is the metadata. 
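+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, assuming a `collection: Collection` built elsewhere;
+    /// this mirrors what `add_collection` above does, since `to_blobs` yields
+    /// the metadata blob followed by the hash sequence blob:
+    ///
+    /// ```ignore
+    /// let tag = batch.add_blob_seq(collection.to_blobs()).await?;
+    /// ```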
+ pub async fn add_blob_seq(&self, iter: impl Iterator) -> Result { + let mut blobs = iter.peekable(); + // put the tags somewhere + let mut tags = vec![]; + loop { + let blob = blobs.next().context("Failed to get next blob")?; + if blobs.peek().is_none() { + return self.add_bytes_with_opts(blob, BlobFormat::HashSeq).await; + } else { + tags.push(self.add_bytes(blob).await?); + } + } + } + + /// Upgrade a temp tag to a persistent tag. + pub async fn upgrade(&self, tt: TempTag) -> Result { + let tag = self + .0 + .rpc + .rpc(tags::CreateRequest { + value: tt.hash_and_format(), + batch: Some(self.0.batch), + }) + .await??; + Ok(tag) + } + + /// Upgrade a temp tag to a persistent tag with a specific name. + pub async fn upgrade_to(&self, tt: TempTag, tag: Tag) -> Result<()> { + self.0 + .rpc + .rpc(tags::SetRequest { + name: tag, + value: Some(tt.hash_and_format()), + batch: Some(self.0.batch), + }) + .await??; + Ok(()) + } + + /// Upgrade a temp tag to a persistent tag with either a specific name or + /// an automatically generated name. + pub async fn upgrade_with_opts(&self, tt: TempTag, opts: SetTagOption) -> Result { + match opts { + SetTagOption::Auto => self.upgrade(tt).await, + SetTagOption::Named(tag) => { + self.upgrade_to(tt, tag.clone()).await?; + Ok(tag) + } + } + } + + /// Creates a temp tag for the given hash and format, without notifying the server. + /// + /// Caution: only do this for data for which you know the server side has created a temp tag. + fn local_temp_tag(&self, inner: HashAndFormat, _size: Option) -> TempTag { + let on_drop: Arc = self.0.clone(); + let on_drop = Some(Arc::downgrade(&on_drop)); + TempTag::new(inner, on_drop) + } +} diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 182f1dcc50..2b4b0d5ca3 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -35,6 +35,7 @@ //! well, without going through [`client`](crate::client::Iroh)) //! //! To shut down the node, call [`Node::shutdown`]. +use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::{collections::BTreeSet, net::SocketAddr}; @@ -46,6 +47,7 @@ use iroh_base::key::PublicKey; use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore}; use iroh_blobs::util::local_pool::{LocalPool, LocalPoolHandle}; use iroh_blobs::{downloader::Downloader, protocol::Closed}; +use iroh_blobs::{HashAndFormat, TempTag}; use iroh_gossip::dispatcher::GossipDispatcher; use iroh_gossip::net::Gossip; use iroh_net::key::SecretKey; @@ -62,6 +64,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use crate::node::nodes_storage::store_node_addrs; use crate::node::{docs::DocsEngine, protocol::ProtocolMap}; +use crate::rpc_protocol::blobs::BatchId; mod builder; mod docs; @@ -117,9 +120,61 @@ struct NodeInner { client: crate::client::Iroh, downloader: Downloader, gossip_dispatcher: GossipDispatcher, + blob_batches: tokio::sync::Mutex, local_pool_handle: LocalPoolHandle, } +/// Keeps track of all the currently active batch operations of the blobs api. +#[derive(Debug, Default)] +struct BlobBatches { + /// Currently active batches + batches: BTreeMap, + /// Used to generate new batch ids. + max: u64, +} + +/// A single batch of blob operations +#[derive(Debug, Default)] +struct BlobBatch { + /// The tags in this batch. + tags: BTreeMap>, +} + +impl BlobBatches { + /// Create a new unique batch id. + fn create(&mut self) -> BatchId { + let id = self.max; + self.max += 1; + BatchId(id) + } + + /// Store a temp tag in a batch identified by a batch id. 
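+    ///
+    /// The same content can be stored under several temp tags; they are kept
+    /// in a list so that `remove_one` can release exactly one server-side tag
+    /// per client-side drop without touching the remaining protections.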
+ fn store(&mut self, batch: BatchId, tt: TempTag) { + let entry = self.batches.entry(batch).or_default(); + entry.tags.entry(tt.hash_and_format()).or_default().push(tt); + } + + /// Remove a tag from a batch. + fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat) -> Result<()> { + if let Some(batch) = self.batches.get_mut(&batch) { + if let Some(tags) = batch.tags.get_mut(content) { + tags.pop(); + if tags.is_empty() { + batch.tags.remove(content); + } + return Ok(()); + } + } + // this can happen if we try to upgrade a tag from an expired batch + anyhow::bail!("tag not found in batch"); + } + + /// Remove an entire batch. + fn remove(&mut self, batch: BatchId) { + self.batches.remove(&batch); + } +} + /// In memory node. pub type MemNode = Node; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 4623261724..69ecfc8ce8 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -579,6 +579,7 @@ where gossip, gossip_dispatcher, local_pool_handle: lp.handle().clone(), + blob_batches: Default::default(), }); let protocol_builder = ProtocolBuilder { diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 0796a0d86e..41b0641a56 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::{anyhow, ensure, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; +use futures_util::FutureExt; use genawaiter::sync::{Co, Gen}; use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::downloader::{DownloadRequest, Downloader}; @@ -13,17 +14,18 @@ use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; use iroh_blobs::get::db::DownloadProgress; use iroh_blobs::get::Stats; +use iroh_blobs::provider::BatchAddPathProgress; use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry}; use iroh_blobs::util::local_pool::LocalPoolHandle; use iroh_blobs::util::progress::ProgressSender; use iroh_blobs::util::SetTagOption; -use iroh_blobs::BlobFormat; use iroh_blobs::{ provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, util::progress::FlumeProgressSender, HashAndFormat, }; +use iroh_blobs::{BlobFormat, Tag}; use iroh_io::AsyncSliceReader; use iroh_net::relay::RelayUrl; use iroh_net::{Endpoint, NodeAddr, NodeId}; @@ -32,12 +34,18 @@ use tokio::task::JoinSet; use tokio_util::either::Either; use tracing::{debug, info, warn}; +use crate::client::blobs::BlobStatus; use crate::client::{ blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, tags::TagInfo, NodeStatus, }; use crate::node::{docs::DocsEngine, NodeInner}; +use crate::rpc_protocol::blobs::{ + BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse, + BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, + BatchUpdate, BlobStatusRequest, BlobStatusResponse, +}; use crate::rpc_protocol::{ authors, blobs, blobs::{ @@ -186,6 +194,16 @@ impl Handler { ReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, AddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, AddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + BlobStatus(msg) => chan.rpc(msg, self, Self::blob_status).await, + BatchCreate(msg) => chan.bidi_streaming(msg, self, Self::batch_create).await, + BatchUpdate(_) => Err(RpcServerError::UnexpectedStartMessage), + BatchAddStream(msg) => chan.bidi_streaming(msg, self, Self::batch_add_stream).await, + 
BatchAddStreamUpdate(_) => Err(RpcServerError::UnexpectedStartMessage), + BatchAddPath(msg) => { + chan.server_streaming(msg, self, Self::batch_add_from_path) + .await + } + BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await, } } @@ -198,6 +216,8 @@ impl Handler { match msg { ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await, DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await, + Create(msg) => chan.rpc(msg, self, Self::tags_create_tag).await, + Set(msg) => chan.rpc(msg, self, Self::tags_set_tag).await, } } @@ -433,6 +453,22 @@ impl Handler { self.inner.local_pool_handle.clone() } + async fn blob_status(self, msg: BlobStatusRequest) -> RpcResult { + let entry = self.inner.db.get(&msg.hash).await?; + Ok(BlobStatusResponse(match entry { + Some(entry) => { + if entry.is_complete() { + BlobStatus::Complete { + size: entry.size().value(), + } + } else { + BlobStatus::Partial { size: entry.size() } + } + } + None => BlobStatus::NotFound, + })) + } + async fn blob_list_impl(self, co: &Co>) -> io::Result<()> { use bao_tree::io::fsm::Outboard; @@ -906,6 +942,31 @@ impl Handler { } } + async fn tags_set_tag(self, msg: tags::SetRequest) -> RpcResult<()> { + if let Some(batch) = msg.batch { + if let Some(content) = msg.value.as_ref() { + self.inner + .blob_batches + .lock() + .await + .remove_one(batch, content)?; + } + } + self.inner.db.set_tag(msg.name, msg.value).await?; + Ok(()) + } + + async fn tags_create_tag(self, msg: tags::CreateRequest) -> RpcResult { + if let Some(batch) = msg.batch { + self.inner + .blob_batches + .lock() + .await + .remove_one(batch, &msg.value)?; + } + Ok(self.inner.db.create_tag(msg.value).await?) + } + fn node_watch(self, _: NodeWatchRequest) -> impl Stream { futures_lite::stream::unfold((), |()| async move { tokio::time::sleep(HEALTH_POLL_WAIT).await; @@ -918,6 +979,126 @@ impl Handler { }) } + #[allow(clippy::unused_async)] + async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> { + let tag = self.inner.db.temp_tag(msg.content); + self.inner.blob_batches.lock().await.store(msg.batch, tag); + Ok(()) + } + + fn batch_add_stream( + self, + msg: BatchAddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + let (tx, rx) = flume::bounded(32); + let this = self.clone(); + + self.local_pool_handle().spawn_detached(|| async move { + if let Err(err) = this.batch_add_stream0(msg, stream, tx.clone()).await { + tx.send_async(BatchAddStreamResponse::Abort(err.into())) + .await + .ok(); + } + }); + rx.into_stream() + } + + fn batch_add_from_path( + self, + msg: BatchAddPathRequest, + ) -> impl Stream { + // provide a little buffer so that we don't slow down the sender + let (tx, rx) = flume::bounded(32); + let tx2 = tx.clone(); + self.local_pool_handle().spawn_detached(|| async move { + if let Err(e) = self.batch_add_from_path0(msg, tx).await { + tx2.send_async(BatchAddPathProgress::Abort(e.into())) + .await + .ok(); + } + }); + rx.into_stream().map(BatchAddPathResponse) + } + + async fn batch_add_stream0( + self, + msg: BatchAddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + progress: flume::Sender, + ) -> anyhow::Result<()> { + let progress = FlumeProgressSender::new(progress); + + let stream = stream.map(|item| match item { + BatchAddStreamUpdate::Chunk(chunk) => Ok(chunk), + BatchAddStreamUpdate::Abort => { + Err(io::Error::new(io::ErrorKind::Interrupted, "Remote abort")) + } + }); + + let import_progress = 
progress.clone().with_filter_map(move |x| match x { + ImportProgress::OutboardProgress { offset, .. } => { + Some(BatchAddStreamResponse::OutboardProgress { offset }) + } + _ => None, + }); + let (temp_tag, _len) = self + .inner + .db + .import_stream(stream, msg.format, import_progress) + .await?; + let hash = temp_tag.inner().hash; + self.inner + .blob_batches + .lock() + .await + .store(msg.batch, temp_tag); + progress + .send(BatchAddStreamResponse::Result { hash }) + .await?; + Ok(()) + } + + async fn batch_add_from_path0( + self, + msg: BatchAddPathRequest, + progress: flume::Sender, + ) -> anyhow::Result<()> { + let progress = FlumeProgressSender::new(progress); + // convert import progress to provide progress + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::Size { size, .. } => Some(BatchAddPathProgress::Found { size }), + ImportProgress::OutboardProgress { offset, .. } => { + Some(BatchAddPathProgress::Progress { offset }) + } + ImportProgress::OutboardDone { hash, .. } => Some(BatchAddPathProgress::Done { hash }), + _ => None, + }); + let BatchAddPathRequest { + path: root, + import_mode, + format, + batch, + } = msg; + // Check that the path is absolute and exists. + anyhow::ensure!(root.is_absolute(), "path must be absolute"); + anyhow::ensure!( + root.exists(), + "trying to add missing path: {}", + root.display() + ); + let (tag, _) = self + .inner + .db + .import_file(root, import_mode, format, import_progress) + .await?; + let hash = *tag.hash(); + self.inner.blob_batches.lock().await.store(batch, tag); + + progress.send(BatchAddPathProgress::Done { hash }).await?; + Ok(()) + } + fn blob_add_stream( self, msg: AddStreamRequest, @@ -1051,6 +1232,36 @@ impl Handler { rx.into_stream() } + fn batch_create( + self, + _: BatchCreateRequest, + mut updates: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + async move { + let batch = self.inner.blob_batches.lock().await.create(); + tokio::spawn(async move { + while let Some(item) = updates.next().await { + match item { + BatchUpdate::Drop(content) => { + // this can not fail, since we keep the batch alive. + // therefore it is safe to ignore the result. 
+ let _ = self + .inner + .blob_batches + .lock() + .await + .remove_one(batch, &content); + } + BatchUpdate::Ping => {} + } + } + self.inner.blob_batches.lock().await.remove(batch); + }); + BatchCreateResponse::Id(batch) + } + .into_stream() + } + fn node_connections( self, _: ConnectionsRequest, diff --git a/iroh/src/rpc_protocol/blobs.rs b/iroh/src/rpc_protocol/blobs.rs index a24523a066..477832d23f 100644 --- a/iroh/src/rpc_protocol/blobs.rs +++ b/iroh/src/rpc_protocol/blobs.rs @@ -1,23 +1,27 @@ use std::path::PathBuf; use bytes::Bytes; -use iroh_base::hash::Hash; use iroh_base::rpc::RpcResult; +use iroh_base::{hash::Hash, rpc::RpcError}; +use iroh_blobs::provider::BatchAddPathProgress; use iroh_blobs::{ export::ExportProgress, format::collection::Collection, get::db::DownloadProgress, provider::AddProgress, - store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, + store::{ + BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode, + ValidateProgress, + }, util::SetTagOption, - BlobFormat, Tag, + BlobFormat, HashAndFormat, Tag, }; use iroh_net::NodeAddr; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; -use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; +use crate::client::blobs::{BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, WrapOption}; use super::RpcService; @@ -49,6 +53,19 @@ pub enum Request { Fsck(ConsistencyCheckRequest), #[rpc(response = RpcResult)] CreateCollection(CreateCollectionRequest), + #[rpc(response = RpcResult)] + BlobStatus(BlobStatusRequest), + + #[bidi_streaming(update = BatchUpdate, response = BatchCreateResponse)] + BatchCreate(BatchCreateRequest), + BatchUpdate(BatchUpdate), + #[bidi_streaming(update = BatchAddStreamUpdate, response = BatchAddStreamResponse)] + BatchAddStream(BatchAddStreamRequest), + BatchAddStreamUpdate(BatchAddStreamUpdate), + #[server_streaming(response = BatchAddPathResponse)] + BatchAddPath(BatchAddPathRequest), + #[rpc(response = RpcResult<()>)] + BatchCreateTempTag(BatchCreateTempTagRequest), } #[allow(missing_docs)] @@ -65,6 +82,10 @@ pub enum Response { Export(ExportResponse), Validate(ValidateProgress), CreateCollection(RpcResult), + BlobStatus(RpcResult), + BatchCreate(BatchCreateResponse), + BatchAddStream(BatchAddStreamResponse), + BatchAddPath(BatchAddPathResponse), } /// A request to the node to provide the data at the given path @@ -235,3 +256,88 @@ pub struct CreateCollectionResponse { /// The resulting tag. 
pub tag: Tag, } + +#[derive(Debug, Serialize, Deserialize)] +pub struct BlobStatusRequest { + /// The hash of the blob + pub hash: Hash, +} + +/// The response to a status request +#[derive(Debug, Serialize, Deserialize, derive_more::From, derive_more::Into)] +pub struct BlobStatusResponse(pub BlobStatus); + +/// Request to create a new scope for temp tags +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchCreateRequest; + +/// Update to a temp tag scope +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchUpdate { + /// Drop of a remote temp tag + Drop(HashAndFormat), + /// Message to check that the connection is still alive + Ping, +} + +/// Response to a temp tag scope request +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchCreateResponse { + /// We got the id of the scope + Id(BatchId), +} + +/// Create a temp tag with a given hash and format +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchCreateTempTagRequest { + /// Content to protect + pub content: HashAndFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddStreamRequest { + /// What format to use for the blob + pub format: BlobFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub enum BatchAddStreamUpdate { + /// A chunk of stream data + Chunk(Bytes), + /// Abort the request due to an error on the client side + Abort, +} + +/// Wrapper around [`AddProgress`]. +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchAddStreamResponse { + Abort(RpcError), + OutboardProgress { offset: u64 }, + Result { hash: Hash }, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathRequest { + /// The path to the data to provide. 
+ pub path: PathBuf, + /// Add the data in place + pub import_mode: ImportMode, + /// What format to use for the blob + pub format: BlobFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Response to a batch add path request +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathResponse(pub BatchAddPathProgress); + +#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)] +pub struct BatchId(pub(crate) u64); diff --git a/iroh/src/rpc_protocol/tags.rs b/iroh/src/rpc_protocol/tags.rs index bd7f1a016e..9750a72bf6 100644 --- a/iroh/src/rpc_protocol/tags.rs +++ b/iroh/src/rpc_protocol/tags.rs @@ -1,18 +1,22 @@ use iroh_base::rpc::RpcResult; -use iroh_blobs::Tag; +use iroh_blobs::{HashAndFormat, Tag}; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use crate::client::tags::TagInfo; -use super::RpcService; +use super::{blobs::BatchId, RpcService}; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] #[enum_conversions(super::Request)] #[rpc_requests(RpcService)] pub enum Request { + #[rpc(response = RpcResult)] + Create(CreateRequest), + #[rpc(response = RpcResult<()>)] + Set(SetRequest), #[rpc(response = RpcResult<()>)] DeleteTag(DeleteRequest), #[server_streaming(response = TagInfo)] @@ -23,10 +27,31 @@ pub enum Request { #[derive(strum::Display, Debug, Serialize, Deserialize)] #[enum_conversions(super::Response)] pub enum Response { + Create(RpcResult), ListTags(TagInfo), DeleteTag(RpcResult<()>), } +/// Create a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateRequest { + /// Value of the tag + pub value: HashAndFormat, + /// Batch to use, none for global + pub batch: Option, +} + +/// Set or delete a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct SetRequest { + /// Name of the tag + pub name: Tag, + /// Value of the tag, None to delete + pub value: Option, + /// Batch to use, none for global + pub batch: Option, +} + /// List all collections /// /// Lists all collections that have been explicitly added to the database. 
diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs new file mode 100644 index 0000000000..aaf9fbe542 --- /dev/null +++ b/iroh/tests/batch.rs @@ -0,0 +1,239 @@ +use std::{io, time::Duration}; + +use bao_tree::blake3; +use bytes::Bytes; +use iroh::{ + client::blobs::{AddDirOpts, WrapOption}, + node::GcPolicy, +}; +use iroh_blobs::store::mem::Store; + +async fn create_node() -> anyhow::Result> { + iroh::node::Node::memory() + .gc_policy(GcPolicy::Interval(Duration::from_millis(10))) + .spawn() + .await +} + +async fn wait_for_gc() { + // wait for multiple gc cycles to ensure that the data is actually gone + tokio::time::sleep(Duration::from_millis(50)).await; +} + +/// Test that add_bytes adds the right data +#[tokio::test] +async fn add_bytes() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(data).into()); + assert_eq!(actual.as_ref(), data); + Ok(()) +} + +/// Test that add_bytes adds the right data +#[tokio::test] +async fn add_stream() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let data_stream = futures_lite::stream::iter([io::Result::Ok(Bytes::copy_from_slice(data))]); + let tag = batch.add_stream(data_stream).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(data).into()); + assert_eq!(actual.as_ref(), data); + Ok(()) +} + +/// Test that add_file adds the right data +#[tokio::test] +async fn add_file() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let temp_path = dir.path().join("test"); + std::fs::write(&temp_path, b"test")?; + let (tag, _) = batch.add_file(temp_path).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(b"test").into()); + assert_eq!(actual.as_ref(), b"test"); + Ok(()) +} + +/// Tests that add_dir adds the right data +#[tokio::test] +async fn add_dir() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")]; + for (name, content) in &data { + let temp_path = dir.path().join(name); + std::fs::write(&temp_path, content)?; + } + let tag = batch.add_dir(dir.path().to_owned()).await?; + assert!(client.has(*tag.hash()).await?); + for (_, content) in &data { + let hash = blake3::hash(content).into(); + let data = client.read_to_bytes(hash).await?; + assert_eq!(data.as_ref(), *content); + } + Ok(()) +} + +/// Tests that add_dir adds the right data +#[tokio::test] +async fn add_dir_single_file() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let temp_path = dir.path().join("test"); + let data: &[u8] = b"test"; + std::fs::write(&temp_path, data)?; + let tag = batch + .add_dir_with_opts( + temp_path, + AddDirOpts { + wrap: WrapOption::Wrap { name: None }, + ..Default::default() + }, + ) + .await?; + 
assert!(client.read_to_bytes(*tag.hash()).await.is_ok()); + let hash = blake3::hash(data).into(); + let actual_data = client.read_to_bytes(hash).await?; + assert_eq!(actual_data.as_ref(), data); + Ok(()) +} + +#[tokio::test] +async fn batch_drop() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + // Check that the store has the data and that it is protected from gc + wait_for_gc().await; + assert!(client.has(hash).await?); + drop(batch); + // Check that the store drops the data when the temp tag gets dropped + wait_for_gc().await; + assert!(!client.has(hash).await?); + Ok(()) +} + +/// This checks that dropping a tag makes the data eligible for garbage collection. +/// +/// Note that we might change this behavior in the future and only drop the data +/// once the batch is dropped. +#[tokio::test] +async fn tag_drop_raw() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + // Check that the store has the data and that it is protected from gc + wait_for_gc().await; + assert!(client.has(hash).await?); + drop(tag); + // Check that the store drops the data when the temp tag gets dropped + wait_for_gc().await; + assert!(!client.has(hash).await?); + Ok(()) +} + +/// Tests that data is preserved if a second temp tag is created for it +/// before the first temp tag is dropped. +#[tokio::test] +async fn temp_tag_copy() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + // Check that the store has the data and that it is protected from gc + wait_for_gc().await; + assert!(client.has(hash).await?); + // Create an additional temp tag for the same data + let tag2 = batch.temp_tag(tag.hash_and_format()).await?; + drop(tag); + // Check that the data is still present + wait_for_gc().await; + assert!(client.has(hash).await?); + drop(tag2); + // Check that the data is gone since both temp tags are dropped + wait_for_gc().await; + assert!(!client.has(hash).await?); + Ok(()) +} + +/// Tests that temp tags work properly for hash sequences, using add_dir +/// to add the data. +/// +/// Note that we might change this behavior in the future and only drop the data +/// once the batch is dropped. +#[tokio::test] +async fn tag_drop_hashseq() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")]; + for (name, content) in &data { + let temp_path = dir.path().join(name); + std::fs::write(&temp_path, content)?; + } + let tag = batch.add_dir(dir.path().to_owned()).await?; + let hash = *tag.hash(); + // weird signature to avoid async move issues + let check_present = |present: &'static bool| async { + assert!(client.has(hash).await? == *present); + for (_, content) in &data { + let hash = blake3::hash(content).into(); + assert!(client.has(hash).await? 
== *present); + } + anyhow::Ok(()) + }; + // Check that the store has the data immediately after adding it + check_present(&true).await?; + // Check that it is protected from gc + wait_for_gc().await; + check_present(&true).await?; + drop(tag); + // Check that the store drops the data when the temp tag gets dropped + wait_for_gc().await; + check_present(&false).await?; + Ok(()) +} + +/// This checks that dropping a tag makes the data eligible for garbage collection. +/// +/// Note that we might change this behavior in the future and only drop the data +/// once the batch is dropped. +#[tokio::test] +async fn wrong_batch() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + drop(batch); + let batch = client.batch().await?; + assert!(batch.upgrade(tag).await.is_err()); + Ok(()) +} From 481370afdf59e7ac3f39c50ad6c1069b620d5967 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 25 Jul 2024 13:02:16 +0300 Subject: [PATCH 02/15] Make sure the required structs are publicly exported also fix docs --- iroh/src/client/blobs.rs | 3 +-- iroh/src/lib.rs | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 9427308202..64b4c15cab 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -54,7 +54,6 @@ use std::{ }; use anyhow::{anyhow, Context as _, Result}; -use batch::Batch; use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; @@ -77,7 +76,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio_util::io::{ReaderStream, StreamReader}; use tracing::warn; mod batch; -pub use batch::AddDirOpts; +pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch}; use crate::rpc_protocol::blobs::{ AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse, diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index d5140f1849..3e0a5c3ad6 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -77,10 +77,10 @@ //! ## Reexports //! //! The iroh crate re-exports the following crates: -//! - [iroh_base](iroh_base) as [`base`] -//! - [iroh_blobs](iroh_blobs) as [`blobs`] -//! - [iroh_docs](iroh_docs) as [`docs`] -//! - [iroh_net](iroh_net) as [`net`] +//! - [iroh_base] as [`base`] +//! - [iroh_blobs] as [`blobs`] +//! - [iroh_docs] as [`docs`] +//! - [iroh_net] as [`net`] //! //! ## Feature Flags //! From d5c31da6ea0d4969d1c2e52278af69fea6139c27 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 25 Jul 2024 15:28:14 +0300 Subject: [PATCH 03/15] Add optional (default on) sync to all tag operations --- iroh-blobs/src/store/fs.rs | 11 ++++------- iroh-blobs/src/store/mem.rs | 4 ++++ iroh-blobs/src/store/readonly_mem.rs | 4 ++++ iroh-blobs/src/store/traits.rs | 3 +++ iroh/src/client/blobs/batch.rs | 4 +++- iroh/src/node/rpc.rs | 20 ++++++++++++++------ iroh/src/rpc_protocol/tags.rs | 14 ++++++++++++++ 7 files changed, 46 insertions(+), 14 deletions(-) diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs index e9e113a603..46d68e6171 100644 --- a/iroh-blobs/src/store/fs.rs +++ b/iroh-blobs/src/store/fs.rs @@ -758,13 +758,6 @@ impl Store { Ok(self.0.dump().await?) } - /// Ensure that all operations before the sync are processed and persisted. - /// - /// This is done by closing any open write transaction. - pub async fn sync(&self) -> io::Result<()> { - Ok(self.0.sync().await?) 
- } - /// Import from a v0 or v1 flat store, for backwards compatibility. pub async fn import_flat_store(&self, paths: FlatStorePaths) -> io::Result { Ok(self.0.import_flat_store(paths).await?) @@ -1424,6 +1417,10 @@ impl super::Store for Store { self.0.temp.temp_tag(value) } + async fn sync(&self) -> io::Result<()> { + Ok(self.0.sync().await?) + } + async fn shutdown(&self) { self.0.shutdown().await; } diff --git a/iroh-blobs/src/store/mem.rs b/iroh-blobs/src/store/mem.rs index e10849e2b7..6f43b35b0d 100644 --- a/iroh-blobs/src/store/mem.rs +++ b/iroh-blobs/src/store/mem.rs @@ -237,6 +237,10 @@ impl super::Store for Store { } async fn shutdown(&self) {} + + async fn sync(&self) -> io::Result<()> { + Ok(()) + } } #[derive(Debug, Default)] diff --git a/iroh-blobs/src/store/readonly_mem.rs b/iroh-blobs/src/store/readonly_mem.rs index 4b77698313..059a7fe01f 100644 --- a/iroh-blobs/src/store/readonly_mem.rs +++ b/iroh-blobs/src/store/readonly_mem.rs @@ -333,4 +333,8 @@ impl super::Store for Store { } async fn shutdown(&self) {} + + async fn sync(&self) -> io::Result<()> { + Ok(()) + } } diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs index 487b22b78d..901546a93e 100644 --- a/iroh-blobs/src/store/traits.rs +++ b/iroh-blobs/src/store/traits.rs @@ -400,6 +400,9 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug { /// Shutdown the store. fn shutdown(&self) -> impl Future + Send; + /// Sync the store. + fn sync(&self) -> impl Future> + Send; + /// Validate the database /// /// This will check that the file and outboard content is correct for all complete diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index 58dd4c1e2b..5a0eb0086e 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -28,7 +28,7 @@ use crate::{ BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchId, BatchUpdate, }, - tags, + tags::{self, SyncMode}, }, }; @@ -417,6 +417,7 @@ impl Batch { .rpc(tags::CreateRequest { value: tt.hash_and_format(), batch: Some(self.0.batch), + sync: SyncMode::Full, }) .await??; Ok(tag) @@ -430,6 +431,7 @@ impl Batch { name: tag, value: Some(tt.hash_and_format()), batch: Some(self.0.batch), + sync: SyncMode::Full, }) .await??; Ok(()) diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 41b0641a56..e37488f9e6 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -46,6 +46,7 @@ use crate::rpc_protocol::blobs::{ BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, BatchUpdate, BlobStatusRequest, BlobStatusResponse, }; +use crate::rpc_protocol::tags::SyncMode; use crate::rpc_protocol::{ authors, blobs, blobs::{ @@ -216,8 +217,8 @@ impl Handler { match msg { ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await, DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await, - Create(msg) => chan.rpc(msg, self, Self::tags_create_tag).await, - Set(msg) => chan.rpc(msg, self, Self::tags_set_tag).await, + Create(msg) => chan.rpc(msg, self, Self::tags_create).await, + Set(msg) => chan.rpc(msg, self, Self::tags_set).await, } } @@ -942,7 +943,11 @@ impl Handler { } } - async fn tags_set_tag(self, msg: tags::SetRequest) -> RpcResult<()> { + async fn tags_set(self, msg: tags::SetRequest) -> RpcResult<()> { + self.inner.db.set_tag(msg.name, msg.value).await?; + if let SyncMode::Full = msg.sync { + self.inner.db.sync().await?; + } if let Some(batch) = msg.batch { if let 
Some(content) = msg.value.as_ref() { self.inner @@ -952,11 +957,14 @@ impl Handler { .remove_one(batch, content)?; } } - self.inner.db.set_tag(msg.name, msg.value).await?; Ok(()) } - async fn tags_create_tag(self, msg: tags::CreateRequest) -> RpcResult { + async fn tags_create(self, msg: tags::CreateRequest) -> RpcResult { + let tag = self.inner.db.create_tag(msg.value).await?; + if let SyncMode::Full = msg.sync { + self.inner.db.sync().await?; + } if let Some(batch) = msg.batch { self.inner .blob_batches @@ -964,7 +972,7 @@ impl Handler { .await .remove_one(batch, &msg.value)?; } - Ok(self.inner.db.create_tag(msg.value).await?) + Ok(tag) } fn node_watch(self, _: NodeWatchRequest) -> impl Stream { diff --git a/iroh/src/rpc_protocol/tags.rs b/iroh/src/rpc_protocol/tags.rs index 9750a72bf6..62124523e4 100644 --- a/iroh/src/rpc_protocol/tags.rs +++ b/iroh/src/rpc_protocol/tags.rs @@ -32,6 +32,16 @@ pub enum Response { DeleteTag(RpcResult<()>), } +/// Determine how to sync the db after a modification operation +#[derive(Debug, Serialize, Deserialize, Default)] +pub enum SyncMode { + /// Fully sync the db + #[default] + Full, + /// Do not sync the db + None, +} + /// Create a tag #[derive(Debug, Serialize, Deserialize)] pub struct CreateRequest { @@ -39,6 +49,8 @@ pub struct CreateRequest { pub value: HashAndFormat, /// Batch to use, none for global pub batch: Option, + /// Sync mode + pub sync: SyncMode, } /// Set or delete a tag @@ -50,6 +62,8 @@ pub struct SetRequest { pub value: Option, /// Batch to use, none for global pub batch: Option, + /// Sync mode + pub sync: SyncMode, } /// List all collections From 6e945389acd741a62b74f42a4dc6167918ce8d05 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 14 Aug 2024 17:44:39 +0300 Subject: [PATCH 04/15] rename upgrade to persist --- iroh/src/client/blobs.rs | 1 - iroh/src/client/blobs/batch.rs | 8 ++++---- iroh/tests/batch.rs | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 8d9c358979..1b12df08b3 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -969,7 +969,6 @@ pub enum DownloadMode { mod tests { use super::*; - use anyhow::Context as _; use iroh_blobs::hashseq::HashSeq; use iroh_net::NodeId; use rand::RngCore; diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index 5a0eb0086e..eb8df9938c 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -410,7 +410,7 @@ impl Batch { } /// Upgrade a temp tag to a persistent tag. - pub async fn upgrade(&self, tt: TempTag) -> Result { + pub async fn persist(&self, tt: TempTag) -> Result { let tag = self .0 .rpc @@ -424,7 +424,7 @@ impl Batch { } /// Upgrade a temp tag to a persistent tag with a specific name. - pub async fn upgrade_to(&self, tt: TempTag, tag: Tag) -> Result<()> { + pub async fn persist_to(&self, tt: TempTag, tag: Tag) -> Result<()> { self.0 .rpc .rpc(tags::SetRequest { @@ -441,9 +441,9 @@ impl Batch { /// an automatically generated name. 
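    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an open `batch` and a `tt: TempTag` returned
    /// by an earlier add operation:
    ///
    /// ```ignore
    /// let tag = batch.upgrade_with_opts(tt, SetTagOption::Auto).await?;
    /// ```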
pub async fn upgrade_with_opts(&self, tt: TempTag, opts: SetTagOption) -> Result<Tag> { match opts { - SetTagOption::Auto => self.upgrade(tt).await, + SetTagOption::Auto => self.persist(tt).await, SetTagOption::Named(tag) => { - self.upgrade_to(tt, tag.clone()).await?; + self.persist_to(tt, tag.clone()).await?; Ok(tag) } } diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs index aaf9fbe542..8b852b03c7 100644 --- a/iroh/tests/batch.rs +++ b/iroh/tests/batch.rs @@ -234,6 +234,6 @@ async fn wrong_batch() -> anyhow::Result<()> { let tag = batch.add_bytes(data).await?; drop(batch); let batch = client.batch().await?; - assert!(batch.upgrade(tag).await.is_err()); + assert!(batch.persist(tag).await.is_err()); Ok(()) } From 16da1b27b6577db67e01500230451071af4f6499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:02:23 +0300 Subject: [PATCH 05/15] Update iroh/src/client/blobs/batch.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/client/blobs/batch.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index eb8df9938c..4ee1048fde 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -172,11 +172,12 @@ impl Batch { self.add_stream_with_opts(input, Default::default()).await } - /// Create a temp tag to protect some content (blob or hashseq) from being deleted. + /// Creates a temp tag to protect some content (blob or hashseq) from being deleted. /// - /// A typical use case is that you are downloading some data and want to protect it - /// from deletion while the download is ongoing, but don't want to protect it permanently - /// until the download is completed. + /// This is a lower-level API. The other functions in [`Batch`] already create [`TempTag`]s automatically. + /// + /// [`TempTag`]s allow you to protect some data from deletion while a download is ongoing, + /// even if you don't want to protect it permanently. pub async fn temp_tag(&self, content: HashAndFormat) -> Result<TempTag> { // Notify the server that we want one temp tag for the given content self.0 From faafa8f36de4e500fc86d2f9cd6efbc7222f53d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:02:32 +0300 Subject: [PATCH 06/15] Update iroh/src/client/blobs/batch.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/client/blobs/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index 4ee1048fde..d160416f76 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -193,7 +193,7 @@ impl Batch { /// Write a blob by passing an async reader. /// - /// This produces a stream from the reader with a hardcoded buffer size of 64KB. + /// This consumes the stream in chunks using `opts.chunk_size`. A good default is 64KB. 
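///
/// A sketch of the call shape (hypothetical, not compile-tested; `opts`
/// stands in for this method's options argument):
/// ```ignore
/// let file = tokio::fs::File::open("payload.bin").await?;
/// let tt = batch.add_reader_with_opts(file, opts).await?;
/// ```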
pub async fn add_reader_with_opts( &self, reader: impl AsyncRead + Unpin + Send + 'static, From cebafe36ade539cb40c49449b3b96860ebc6d325 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:02:43 +0300 Subject: [PATCH 07/15] Update iroh/src/client/blobs/batch.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/client/blobs/batch.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index d160416f76..79e2c2e8a9 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -156,7 +156,9 @@ impl Batch { /// Write a blob by passing an async reader. /// - /// This will use a default chunk size of 64KB, and a format of [BlobFormat::Raw]. + /// This will consume the stream in 64KB chunks, and use a format of [BlobFormat::Raw]. + /// + /// For more options, see [`add_dir_with_opts`]. pub async fn add_reader( &self, reader: impl AsyncRead + Unpin + Send + 'static, From 22d5f5930074af2c9dbaf6bfd6d112fcedb726d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:02:54 +0300 Subject: [PATCH 08/15] Update iroh/src/client/blobs/batch.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/client/blobs/batch.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index 79e2c2e8a9..f502702305 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -221,11 +221,11 @@ impl Batch { /// `path` should be an absolute path valid for the file system on which /// the node runs, which refers to a file. /// - /// If you use [ImportMode::TryReference], Iroh will assume that the data will not + /// If you use [`ImportMode::TryReference`], Iroh will assume that the data will not /// change and will share it in place without copying to the Iroh data directory /// if appropriate. However, for tiny files, Iroh will copy the data. /// - /// If you use [ImportMode::Copy], Iroh will always copy the data. + /// If you use [`ImportMode::Copy`], Iroh will always copy the data. /// /// Will return a temp tag for the added blob, as well as the size of the file. pub async fn add_file_with_opts( From 1afb0f732edc55d35c54c9e14ef4dd69cb033bfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:03:03 +0300 Subject: [PATCH 09/15] Update iroh/src/client/blobs/batch.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/client/blobs/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index f502702305..296e55d925 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -380,7 +380,7 @@ impl Batch { Ok(self.local_temp_tag(HashAndFormat { hash, format }, Some(size))) } - /// Add a collection + /// Add a collection. 
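/// (A collection is a set of named blobs: it maps human-readable names to
/// blob hashes.)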
/// /// This is a convenience function that converts the collection into two blobs /// (the metadata and the hash sequence) and adds them, returning a temp tag for From 41560ee09ac02aa0e41088bd1e6262f7ece13864 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:03:11 +0300 Subject: [PATCH 10/15] Update iroh/src/client/blobs/batch.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/client/blobs/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index 296e55d925..d390344c25 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -412,7 +412,7 @@ impl Batch { } } - /// Upgrade a temp tag to a persistent tag. + /// Upgrades a temp tag to a persistent tag. pub async fn persist(&self, tt: TempTag) -> Result<Tag> { let tag = self .0 From 65b5f8fa758715f5afe0fc592a7095140b4e9b0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:03:26 +0300 Subject: [PATCH 11/15] Update iroh/src/client/blobs/batch.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/client/blobs/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index d390344c25..faa0280483 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -426,7 +426,7 @@ impl Batch { Ok(tag) } - /// Upgrade a temp tag to a persistent tag with a specific name. + /// Upgrades a temp tag to a persistent tag with a specific name. pub async fn persist_to(&self, tt: TempTag, tag: Tag) -> Result<()> { self.0 .rpc From 9e9b807e10e1fae0810e2dbfcf78057f0bb3b600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:03:32 +0300 Subject: [PATCH 12/15] Update iroh/src/client/blobs/batch.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/client/blobs/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index faa0280483..dfe7877174 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -440,7 +440,7 @@ impl Batch { Ok(()) } - /// Upgrade a temp tag to a persistent tag with either a specific name or + /// Upgrades a temp tag to a persistent tag with either a specific name or /// an automatically generated name. 
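///
/// (Dispatches to [`Self::persist`] or [`Self::persist_to`] depending on the
/// [`SetTagOption`].)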
pub async fn upgrade_with_opts(&self, tt: TempTag, opts: SetTagOption) -> Result<Tag> { match opts { From 724185f7b0f3fb8e2f97746d6f6e0e616aebd457 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 15 Aug 2024 11:03:43 +0300 Subject: [PATCH 13/15] Update iroh/src/node/rpc.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philipp Krüger --- iroh/src/node/rpc.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 0133420fdd..e6df2bf71c 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -990,7 +990,6 @@ impl Handler { }) } - #[allow(clippy::unused_async)] async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> { let tag = self.inner.db.temp_tag(msg.content); self.inner.blob_batches.lock().await.store(msg.batch, tag); From 8ce97a3eff2088752ea6398e9cd85661167a51ad Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 15 Aug 2024 11:43:35 +0300 Subject: [PATCH 14/15] fix docs --- iroh/src/client/blobs/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index dfe7877174..d6e70d090d 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -158,7 +158,7 @@ impl Batch { /// /// This will consume the stream in 64KB chunks, and use a format of [BlobFormat::Raw]. /// - /// For more options, see [`add_dir_with_opts`]. + /// For more options, see [`Self::add_reader_with_opts`]. pub async fn add_reader( &self, reader: impl AsyncRead + Unpin + Send + 'static, From 483e731a499bc6e4b8e179217358b944bba23e8a Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 15 Aug 2024 12:03:03 +0300 Subject: [PATCH 15/15] use register_gc_done_cb for gc tests --- iroh/tests/batch.rs | 58 ++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs index 8b852b03c7..c3289d03e6 100644 --- a/iroh/tests/batch.rs +++ b/iroh/tests/batch.rs @@ -2,28 +2,36 @@ use std::{io, time::Duration}; use bao_tree::blake3; use bytes::Bytes; +use futures_lite::StreamExt; use iroh::{ client::blobs::{AddDirOpts, WrapOption}, node::GcPolicy, }; use iroh_blobs::store::mem::Store; -async fn create_node() -> anyhow::Result<iroh::node::Node<Store>> { - iroh::node::Node::memory() +async fn create_node() -> anyhow::Result<(iroh::node::Node<Store>, async_channel::Receiver<()>)> { + let (gc_send, gc_recv) = async_channel::unbounded(); + let node = iroh::node::Node::memory() .gc_policy(GcPolicy::Interval(Duration::from_millis(10))) + .register_gc_done_cb(Box::new(move || { + gc_send.send_blocking(()).ok(); + })) .spawn() - .await + .await?; + Ok((node, gc_recv)) } -async fn wait_for_gc() { - // wait for multiple gc cycles to ensure that the data is actually gone - tokio::time::sleep(Duration::from_millis(50)).await; +async fn wait_for_gc(chan: &mut async_channel::Receiver<()>) { + let _ = chan.drain(); + for _ in 0..5 { + chan.recv().await.unwrap(); + } } /// Test that add_bytes adds the right data #[tokio::test] async fn add_bytes() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; @@ -38,7 +46,7 @@ async fn add_bytes() -> anyhow::Result<()> { /// Test that add_stream adds the right data #[tokio::test] async fn add_stream() -> anyhow::Result<()> { - let node = 
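// Tests that never observe gc ignore the receiver half of the gc-done
// channel, hence the `(node, _)` pattern below.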
create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; @@ -54,7 +62,7 @@ async fn add_stream() -> anyhow::Result<()> { /// Test that add_file adds the right data #[tokio::test] async fn add_file() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let dir = tempfile::tempdir()?; @@ -71,7 +79,7 @@ async fn add_file() -> anyhow::Result<()> { /// Tests that add_dir adds the right data #[tokio::test] async fn add_dir() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let dir = tempfile::tempdir()?; @@ -93,7 +101,7 @@ async fn add_dir() -> anyhow::Result<()> { /// Tests that add_dir adds the right data for a single file #[tokio::test] async fn add_dir_single_file() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let dir = tempfile::tempdir()?; @@ -118,18 +126,18 @@ async fn add_dir_single_file() -> anyhow::Result<()> { #[tokio::test] async fn batch_drop() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, mut gc) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; let tag = batch.add_bytes(data).await?; let hash = *tag.hash(); // Check that the store has the data and that it is protected from gc - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(client.has(hash).await?); drop(batch); // Check that the store drops the data when the temp tag gets dropped - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(!client.has(hash).await?); Ok(()) } @@ -140,18 +148,18 @@ async fn batch_drop() -> anyhow::Result<()> { /// once the batch is dropped. #[tokio::test] async fn tag_drop_raw() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, mut gc) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; let tag = batch.add_bytes(data).await?; let hash = *tag.hash(); // Check that the store has the data and that it is protected from gc - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(client.has(hash).await?); drop(tag); // Check that the store drops the data when the temp tag gets dropped - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(!client.has(hash).await?); Ok(()) } @@ -160,24 +168,24 @@ async fn tag_drop_raw() -> anyhow::Result<()> { /// before the first temp tag is dropped. 
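/// Temp tags for the same content are reference-counted on the node, so the
/// data must stay protected until the last one is dropped.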
#[tokio::test] async fn temp_tag_copy() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, mut gc) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test"; let tag = batch.add_bytes(data).await?; let hash = *tag.hash(); // Check that the store has the data and that it is protected from gc - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(client.has(hash).await?); // Create an additional temp tag for the same data let tag2 = batch.temp_tag(tag.hash_and_format()).await?; drop(tag); // Check that the data is still present - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(client.has(hash).await?); drop(tag2); // Check that the data is gone since both temp tags are dropped - wait_for_gc().await; + wait_for_gc(&mut gc).await; assert!(!client.has(hash).await?); Ok(()) } @@ -189,7 +197,7 @@ async fn temp_tag_copy() -> anyhow::Result<()> { /// once the batch is dropped. #[tokio::test] async fn tag_drop_hashseq() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, mut gc) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let dir = tempfile::tempdir()?; @@ -212,11 +220,11 @@ async fn tag_drop_hashseq() -> anyhow::Result<()> { // Check that the store has the data immediately after adding it check_present(&true).await?; // Check that it is protected from gc - wait_for_gc().await; + wait_for_gc(&mut gc).await; check_present(&true).await?; drop(tag); // Check that the store drops the data when the temp tag gets dropped - wait_for_gc().await; + wait_for_gc(&mut gc).await; check_present(&false).await?; Ok(()) } @@ -227,7 +235,7 @@ async fn tag_drop_hashseq() -> anyhow::Result<()> { /// once the batch is dropped. #[tokio::test] async fn wrong_batch() -> anyhow::Result<()> { - let node = create_node().await?; + let (node, _) = create_node().await?; let client = &node.client().blobs(); let batch = client.batch().await?; let data: &[u8] = b"test";