diff --git a/iroh-blobs/src/provider.rs b/iroh-blobs/src/provider.rs
index 7fe4e13004..fd9ce37acf 100644
--- a/iroh-blobs/src/provider.rs
+++ b/iroh-blobs/src/provider.rs
@@ -153,6 +153,30 @@ pub enum AddProgress {
     Abort(RpcError),
 }
 
+/// Progress updates for the batch add operation.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum BatchAddPathProgress {
+    /// An item was found with the given size.
+    Found {
+        /// The size of the entry in bytes.
+        size: u64,
+    },
+    /// We made progress ingesting the item.
+    Progress {
+        /// The offset of the progress, in bytes.
+        offset: u64,
+    },
+    /// We are done, and the hash is `hash`.
+    Done {
+        /// The hash of the entry.
+        hash: Hash,
+    },
+    /// We got an error and need to abort.
+    ///
+    /// This will be the last message in the stream.
+    Abort(RpcError),
+}
+
 /// Read the request from the getter.
 ///
 /// Will fail if there is an error while reading, if the reader
diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs
index e9e113a603..6f76df5be6 100644
--- a/iroh-blobs/src/store/fs.rs
+++ b/iroh-blobs/src/store/fs.rs
@@ -1424,6 +1424,10 @@ impl super::Store for Store {
         self.0.temp.temp_tag(value)
     }
 
+    fn tag_drop(&self) -> Option<&dyn TagDrop> {
+        Some(self.0.temp.as_ref())
+    }
+
     async fn shutdown(&self) {
         self.0.shutdown().await;
     }
diff --git a/iroh-blobs/src/store/mem.rs b/iroh-blobs/src/store/mem.rs
index e10849e2b7..d98af09f04 100644
--- a/iroh-blobs/src/store/mem.rs
+++ b/iroh-blobs/src/store/mem.rs
@@ -222,6 +222,10 @@ impl super::Store for Store {
         self.inner.temp_tag(tag)
     }
 
+    fn tag_drop(&self) -> Option<&dyn TagDrop> {
+        Some(self.inner.as_ref())
+    }
+
     async fn gc_start(&self) -> io::Result<()> {
         Ok(())
     }
diff --git a/iroh-blobs/src/store/readonly_mem.rs b/iroh-blobs/src/store/readonly_mem.rs
index 4b77698313..2ef0a2b89e 100644
--- a/iroh-blobs/src/store/readonly_mem.rs
+++ b/iroh-blobs/src/store/readonly_mem.rs
@@ -15,7 +15,7 @@ use crate::{
     },
     util::{
         progress::{BoxedProgressSender, IdGenerator, ProgressSender},
-        Tag,
+        Tag, TagDrop,
     },
     BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
 };
@@ -324,6 +324,10 @@ impl super::Store for Store {
         TempTag::new(inner, None)
     }
 
+    fn tag_drop(&self) -> Option<&dyn TagDrop> {
+        None
+    }
+
     async fn gc_start(&self) -> io::Result<()> {
         Ok(())
     }
diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs
index 2a91d1c0f3..048b0b6285 100644
--- a/iroh-blobs/src/store/traits.rs
+++ b/iroh-blobs/src/store/traits.rs
@@ -19,7 +19,7 @@ use crate::{
     protocol::RangeSpec,
     util::{
         progress::{BoxedProgressSender, IdGenerator, ProgressSender},
-        Tag,
+        Tag, TagDrop,
     },
     BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
 };
@@ -356,6 +356,12 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
     /// Create a temporary pin for this store
     fn temp_tag(&self, value: HashAndFormat) -> TempTag;
 
+    /// Handle to use to drop tags.
+    ///
+    /// Returns `None` for stores that don't keep track of tags, such as read-only
+    /// stores.
+    fn tag_drop(&self) -> Option<&dyn TagDrop>;
+
     /// Notify the store that a new gc phase is about to start.
     ///
     /// This should not fail unless the store is shut down or otherwise in a
@@ -700,7 +706,7 @@ pub enum ImportProgress {
 /// does not make any sense. E.g. an in memory implementation will always have
 /// to copy the file into memory. Also, a disk based implementation might choose
 /// to copy small files even if the mode is `Reference`.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
 pub enum ImportMode {
     /// This mode will copy the file into the database before hashing.
     ///
diff --git a/iroh-blobs/src/util.rs b/iroh-blobs/src/util.rs
index be43dfaaff..e9cbd85ea9 100644
--- a/iroh-blobs/src/util.rs
+++ b/iroh-blobs/src/util.rs
@@ -206,11 +206,17 @@ impl TempTag {
         self.inner.format
     }
 
+    /// The hash and format of the pinned item.
+    pub fn hash_and_format(&self) -> HashAndFormat {
+        self.inner
+    }
+
     /// Keep the item alive until the end of the process
     pub fn leak(mut self) {
         // set the liveness tracker to None, so that the refcount is not decreased
         // during drop. This means that the refcount will never reach 0 and the
-        // item will not be gced until the end of the process.
+        // item will not be garbage collected until the end of the process, unless
+        // you manually invoke on_drop.
         self.on_drop = None;
     }
 }
diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs
index 6e4c0aeba2..416d2ec003 100644
--- a/iroh-cli/src/commands/blob.rs
+++ b/iroh-cli/src/commands/blob.rs
@@ -370,10 +370,15 @@ impl BlobCommands {
 
         let (blob_status, size) = match (status, format) {
             (BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size),
-            (BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size),
+            (BlobStatus::Partial { size }, BlobFormat::Raw) => {
+                ("incomplete blob", size.value())
+            }
             (BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size),
             (BlobStatus::Partial { size }, BlobFormat::HashSeq) => {
-                ("incomplete collection", size)
+                ("incomplete collection", size.value())
+            }
+            (BlobStatus::NotFound, _) => {
+                return Err(anyhow!("blob is missing"));
             }
         };
         println!(
diff --git a/iroh/src/client.rs b/iroh/src/client.rs
index ce9647f5fd..d552379dca 100644
--- a/iroh/src/client.rs
+++ b/iroh/src/client.rs
@@ -29,8 +29,10 @@ pub mod tags;
 
 mod node;
 
-/// Iroh rpc client - boxed so that we can have a concrete type.
-pub(crate) type RpcClient =
-    quic_rpc::RpcClient<RpcService, quic_rpc::transport::boxed::Connection<RpcService>>;
+/// Iroh rpc connection - boxed so that we can have a concrete type.
+pub(crate) type RpcConnection = quic_rpc::transport::boxed::Connection<RpcService>;
+
+/// Iroh rpc client - boxed so that we can have a concrete type.
+pub(crate) type RpcClient = quic_rpc::RpcClient<RpcService, RpcConnection>;
 
 /// Iroh client.
 #[derive(Debug, Clone)]
diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs
index e199e0f8d1..9e2f8378d2 100644
--- a/iroh/src/client/blobs.rs
+++ b/iroh/src/client/blobs.rs
@@ -9,7 +9,8 @@ use std::{
     task::{Context, Poll},
 };
 
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context as _, Result};
+use batch::Batch;
 use bytes::Bytes;
 use futures_lite::{Stream, StreamExt};
 use futures_util::SinkExt;
@@ -19,7 +20,7 @@ use iroh_blobs::{
     export::ExportProgress as BytesExportProgress,
     format::collection::{Collection, SimpleStore},
     get::db::DownloadProgress as BytesDownloadProgress,
-    store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
+    store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
     BlobFormat, Hash, Tag,
 };
 use iroh_net::NodeAddr;
@@ -30,12 +31,14 @@ use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
 use tokio_util::io::{ReaderStream, StreamReader};
 use tracing::warn;
 
+mod batch;
+pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts};
+
 use crate::rpc_protocol::{
-    BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
-    BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest,
-    BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
-    CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, SetTagOption,
+    BatchCreateRequest, BatchCreateResponse, BlobAddPathRequest, BlobAddStreamRequest,
+    BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest,
+    BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
+    BlobReadAtResponse, BlobStatusRequest, BlobValidateRequest, NodeStatusRequest, SetTagOption,
 };
 
 use super::{flatten, tags, Iroh, RpcClient};
@@ -54,6 +57,38 @@ impl<'a> From<&'a Iroh> for &'a RpcClient {
 }
 
 impl Client {
+    /// Get the status of a blob.
+    ///
+    /// Note that this will return [`BlobStatus::Partial`] for blobs that are
+    /// only partially stored on the node.
+    pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
+        let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
+        Ok(status.0)
+    }
+
+    /// Check if a blob is completely stored on the node.
+    ///
+    /// This is just a convenience wrapper around `status` that returns a boolean.
+    /// It returns false for blobs that are only partially stored on the node.
+    pub async fn has(&self, hash: Hash) -> Result<bool> {
+        match self.status(hash).await {
+            Ok(BlobStatus::Complete { .. }) => Ok(true),
+            Ok(_) => Ok(false),
+            Err(err) => Err(err),
+        }
+    }
+
+    /// Create a new batch for adding data.
+    ///
+    /// A batch is a context in which temp tags are created and data is added to the node. Temp tags
+    /// are automatically deleted when the batch is dropped, leading to the data being garbage collected
+    /// unless a permanent tag is created for it.
+    pub async fn batch(&self) -> Result<Batch> {
+        let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
+        let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
+        let rpc = self.rpc.clone();
+        Ok(Batch::new(batch, rpc, updates, 1024))
+    }
+
     /// Stream the contents of a single blob.
     ///
     /// Returns a [`Reader`], which can report the size of the blob before reading it.
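Reviewer note: a minimal, hedged sketch of how the client-side batch API introduced above is intended to be used end to end. Only `batch`, `add_bytes`, `has`, and `upgrade_to` (from `client/blobs/batch.rs` further down) are taken from this diff; the tag name and the `Tag::from` conversion are assumptions for illustration.

    // Sketch only: add data in a batch, then persist it with a named tag.
    async fn batch_usage(client: &iroh::client::blobs::Client) -> anyhow::Result<()> {
        // A batch is a scope for temp tags; data added here is protected from
        // garbage collection only while the batch (or a temp tag) is alive.
        let batch = client.batch().await?;
        let tag = batch.add_bytes(b"hello".to_vec()).await?;
        let hash = *tag.hash();
        // Upgrade the temp tag to a persistent tag so the data outlives the batch.
        batch.upgrade_to(tag, iroh_blobs::Tag::from("my-data")).await?;
        drop(batch); // any remaining temp tags are released server-side
        assert!(client.has(hash).await?);
        Ok(())
    }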
@@ -127,17 +162,19 @@ impl Client { pub async fn create_collection( &self, collection: Collection, - tag: SetTagOption, + opts: SetTagOption, tags_to_delete: Vec, ) -> anyhow::Result<(Hash, Tag)> { - let CreateCollectionResponse { hash, tag } = self - .rpc - .rpc(CreateCollectionRequest { - collection, - tag, - tags_to_delete, - }) - .await??; + let batch = self.batch().await?; + let temp_tag = batch.add_collection(collection).await?; + let hash = *temp_tag.hash(); + let tag = batch.upgrade_with_opts(temp_tag, opts).await?; + if !tags_to_delete.is_empty() { + let tags = self.tags_client(); + for tag in tags_to_delete { + tags.delete(tag).await?; + } + } Ok((hash, tag)) } @@ -372,17 +409,6 @@ impl Client { Ok(ticket) } - /// Get the status of a blob. - pub async fn status(&self, hash: Hash) -> Result { - // TODO: this could be implemented more efficiently - let reader = self.read(hash).await?; - if reader.is_complete { - Ok(BlobStatus::Complete { size: reader.size }) - } else { - Ok(BlobStatus::Partial { size: reader.size }) - } - } - fn tags_client(&self) -> tags::Client { tags::Client { rpc: self.rpc.clone(), @@ -397,9 +423,10 @@ impl SimpleStore for Client { } /// Whether to wrap the added data in a collection. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub enum WrapOption { /// Do not wrap the file or directory. + #[default] NoWrap, /// Wrap the file or directory in a collection. Wrap { @@ -408,21 +435,6 @@ pub enum WrapOption { }, } -/// Status information about a blob. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum BlobStatus { - /// The blob is only stored partially. - Partial { - /// The size of the currently stored partial blob. - size: u64, - }, - /// The blob is stored completely. - Complete { - /// The size of the blob. - size: u64, - }, -} - /// Outcome of a blob add operation. #[derive(Debug, Clone)] pub struct AddOutcome { @@ -887,11 +899,30 @@ pub enum DownloadMode { Queued, } +/// Status information about a blob. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum BlobStatus { + /// The blob is not stored on the node. + NotFound, + /// The blob is only stored partially. + Partial { + /// The size of the currently stored partial blob. + /// + /// This can be either a verified size if the last chunk was received, + /// or an unverified size if the last chunk was not yet received. + size: BaoBlobSize, + }, + /// The blob is stored completely. + Complete { + /// The size of the blob. For a complete blob the size is always known. 
+        size: u64,
+    },
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    use anyhow::Context as _;
     use rand::RngCore;
     use tokio::io::AsyncWriteExt;
diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs
new file mode 100644
index 0000000000..b754a9d880
--- /dev/null
+++ b/iroh/src/client/blobs/batch.rs
@@ -0,0 +1,455 @@
+use std::{
+    io,
+    path::PathBuf,
+    sync::{Arc, Mutex},
+};
+
+use anyhow::{anyhow, Context, Result};
+use bytes::Bytes;
+use futures_buffered::BufferedStreamExt;
+use futures_lite::StreamExt;
+use futures_util::{sink::Buffer, FutureExt, SinkExt, Stream};
+use iroh_blobs::{
+    format::collection::Collection,
+    provider::BatchAddPathProgress,
+    store::ImportMode,
+    util::{SetTagOption, TagDrop},
+    BlobFormat, HashAndFormat, Tag, TempTag,
+};
+use quic_rpc::client::UpdateSink;
+use tokio::io::AsyncRead;
+use tokio_util::io::ReaderStream;
+use tracing::{debug, warn};
+
+use crate::{
+    client::{RpcClient, RpcConnection, RpcService},
+    rpc_protocol::{
+        BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate,
+        BatchCreateTempTagRequest, BatchId, BatchUpdate, CreateTagRequest, SetTagRequest,
+    },
+};
+
+use super::WrapOption;
+
+/// A scope in which blobs can be added.
+#[derive(derive_more::Debug)]
+struct BatchInner {
+    /// The id of the scope.
+    batch: BatchId,
+    /// The rpc client.
+    rpc: RpcClient,
+    /// The stream on which to send drop updates.
+    #[debug(skip)]
+    updates: Mutex<Buffer<UpdateSink<RpcService, RpcConnection, BatchUpdate>, BatchUpdate>>,
+}
+
+/// A batch for write operations.
+///
+/// This serves mostly as a scope for temporary tags.
+///
+/// It is not a transaction, so things in a batch are not atomic. Also, there is
+/// no isolation between batches.
+#[derive(derive_more::Debug)]
+pub struct Batch(Arc<BatchInner>);
+
+impl TagDrop for BatchInner {
+    fn on_drop(&self, content: &HashAndFormat) {
+        let mut updates = self.updates.lock().unwrap();
+        // make a spirited attempt to notify the server that we are dropping the content
+        //
+        // this will occasionally fail, but that's acceptable. The temp tags for the batch
+        // will be cleaned up as soon as the entire batch is dropped.
+        //
+        // E.g. a typical scenario is that you create a large array of temp tags, and then
+        // store them in a hash sequence and then drop the array. You will get many drops
+        // at the same time, and might get a send failure here.
+        //
+        // But that just means that the server will clean up the temp tags when the batch is
+        // dropped.
+        updates.feed(BatchUpdate::Drop(*content)).now_or_never();
+        updates.flush().now_or_never();
+    }
+}
+
+/// Options for adding a file as a blob
+#[derive(Debug, Clone, Copy, Default)]
+pub struct AddFileOpts {
+    /// The import mode
+    pub import_mode: ImportMode,
+    /// The format of the blob
+    pub format: BlobFormat,
+}
+
+/// Options for adding a directory as a collection
+#[derive(Debug, Clone)]
+pub struct AddDirOpts {
+    /// The import mode
+    pub import_mode: ImportMode,
+    /// Whether to preserve the directory name
+    pub wrap: WrapOption,
+    /// I/O parallelism
+    pub io_parallelism: usize,
+}
+
+impl Default for AddDirOpts {
+    fn default() -> Self {
+        Self {
+            import_mode: ImportMode::TryReference,
+            wrap: WrapOption::NoWrap,
+            io_parallelism: 4,
+        }
+    }
+}
+
+/// Options for adding a blob from an async reader
+#[derive(Debug, Clone)]
+pub struct AddReaderOpts {
+    /// The format of the blob
+    pub format: BlobFormat,
+    /// Size of the chunks to send
+    pub chunk_size: usize,
+}
+
+impl Default for AddReaderOpts {
+    fn default() -> Self {
+        Self {
+            format: BlobFormat::Raw,
+            chunk_size: 1024 * 64,
+        }
+    }
+}
+
+impl Batch {
+    pub(super) fn new(
+        batch: BatchId,
+        rpc: RpcClient,
+        updates: UpdateSink<RpcService, RpcConnection, BatchUpdate>,
+        buffer_size: usize,
+    ) -> Self {
+        let updates = updates.buffer(buffer_size);
+        Self(Arc::new(BatchInner {
+            batch,
+            rpc,
+            updates: updates.into(),
+        }))
+    }
+
+    /// Write a blob by passing bytes.
+    pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> Result<TempTag> {
+        self.add_bytes_with_opts(bytes, Default::default()).await
+    }
+
+    /// Import a blob from a filesystem path, using the default options.
+    ///
+    /// For more control, use [`Self::add_file_with_opts`].
+    pub async fn add_file(&self, path: PathBuf) -> Result<(TempTag, u64)> {
+        self.add_file_with_opts(path, AddFileOpts::default()).await
+    }
+
+    /// Add a directory as a hashseq in iroh collection format
+    pub async fn add_dir(&self, root: PathBuf) -> Result<TempTag> {
+        self.add_dir_with_opts(root, Default::default()).await
+    }
+
+    /// Write a blob by passing an async reader.
+    ///
+    /// This will use a default chunk size of 64KB, and a format of [BlobFormat::Raw].
+    pub async fn add_reader(
+        &self,
+        reader: impl AsyncRead + Unpin + Send + 'static,
+    ) -> anyhow::Result<TempTag> {
+        self.add_reader_with_opts(reader, Default::default()).await
+    }
+
+    /// Write a blob by passing a stream of bytes.
+    pub async fn add_stream(
+        &self,
+        input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
+    ) -> Result<TempTag> {
+        self.add_stream_with_opts(input, Default::default()).await
+    }
+
+    /// Create a temp tag to protect some content (blob or hashseq) from being deleted.
+    ///
+    /// A typical use case is that you are downloading some data and want to protect it
+    /// from deletion while the download is ongoing, but don't want to protect it permanently
+    /// until the download is completed.
+    pub async fn temp_tag(&self, content: HashAndFormat) -> Result<TempTag> {
+        // Notify the server that we want one temp tag for the given content
+        self.0
+            .rpc
+            .rpc(BatchCreateTempTagRequest {
+                batch: self.0.batch,
+                content,
+            })
+            .await??;
+        // Only after success of the above call, we can create the corresponding local temp tag
+        Ok(self.local_temp_tag(content, None))
+    }
+
+    /// Write a blob by passing an async reader.
+    ///
+    /// This produces a stream from the reader, chunked according to `opts.chunk_size`.
+    pub async fn add_reader_with_opts(
+        &self,
+        reader: impl AsyncRead + Unpin + Send + 'static,
+        opts: AddReaderOpts,
+    ) -> anyhow::Result<TempTag> {
+        let AddReaderOpts { format, chunk_size } = opts;
+        let input = ReaderStream::with_capacity(reader, chunk_size);
+        self.add_stream_with_opts(input, format).await
+    }
+
+    /// Write a blob by passing bytes.
+    pub async fn add_bytes_with_opts(
+        &self,
+        bytes: impl Into<Bytes>,
+        format: BlobFormat,
+    ) -> Result<TempTag> {
+        let input = futures_lite::stream::once(Ok(bytes.into()));
+        self.add_stream_with_opts(input, format).await
+    }
+
+    /// Import a blob from a filesystem path.
+    ///
+    /// `path` should be an absolute path valid for the file system on which
+    /// the node runs, which refers to a file.
+    ///
+    /// If you use [ImportMode::TryReference], Iroh will assume that the data will not
+    /// change and will share it in place without copying to the Iroh data directory
+    /// if appropriate. However, for tiny files, Iroh will copy the data.
+    ///
+    /// If you use [ImportMode::Copy], Iroh will always copy the data.
+    ///
+    /// Will return a temp tag for the added blob, as well as the size of the file.
+    pub async fn add_file_with_opts(
+        &self,
+        path: PathBuf,
+        opts: AddFileOpts,
+    ) -> Result<(TempTag, u64)> {
+        let AddFileOpts {
+            import_mode,
+            format,
+        } = opts;
+        anyhow::ensure!(
+            path.is_absolute(),
+            "Path must be absolute, but got: {:?}",
+            path
+        );
+        anyhow::ensure!(path.is_file(), "Path does not refer to a file: {:?}", path);
+        let mut stream = self
+            .0
+            .rpc
+            .server_streaming(BatchAddPathRequest {
+                path,
+                import_mode,
+                format,
+                batch: self.0.batch,
+            })
+            .await?;
+        let mut res_hash = None;
+        let mut res_size = None;
+        while let Some(item) = stream.next().await {
+            match item?.0 {
+                BatchAddPathProgress::Abort(cause) => {
+                    Err(cause)?;
+                }
+                BatchAddPathProgress::Done { hash } => {
+                    res_hash = Some(hash);
+                }
+                BatchAddPathProgress::Found { size } => {
+                    res_size = Some(size);
+                }
+                _ => {}
+            }
+        }
+        let hash = res_hash.context("Missing hash")?;
+        let size = res_size.context("Missing size")?;
+        Ok((
+            self.local_temp_tag(HashAndFormat { hash, format }, Some(size)),
+            size,
+        ))
+    }
+
+    /// Add a directory as a hashseq in iroh collection format
+    ///
+    /// This can also be used to add a single file as a collection, if
+    /// wrap is set to [WrapOption::Wrap].
+    ///
+    /// However, if you want to add a single file as a raw blob, use add_file instead.
+ pub async fn add_dir_with_opts(&self, root: PathBuf, opts: AddDirOpts) -> Result { + let AddDirOpts { + import_mode, + wrap, + io_parallelism, + } = opts; + anyhow::ensure!(root.is_absolute(), "Path must be absolute"); + + // let (send, recv) = flume::bounded(32); + // let import_progress = FlumeProgressSender::new(send); + + // import all files below root recursively + let data_sources = crate::util::fs::scan_path(root, wrap)?; + let opts = AddFileOpts { + import_mode, + format: BlobFormat::Raw, + }; + let result: Vec<_> = futures_lite::stream::iter(data_sources) + .map(|source| { + // let import_progress = import_progress.clone(); + async move { + let name = source.name().to_string(); + let (tag, size) = self + .add_file_with_opts(source.path().to_owned(), opts) + .await?; + let hash = *tag.hash(); + anyhow::Ok((name, hash, size, tag)) + } + }) + .buffered_ordered(io_parallelism) + .try_collect() + .await?; + + // create a collection + let (collection, child_tags): (Collection, Vec<_>) = result + .into_iter() + .map(|(name, hash, _, tag)| ((name, hash), tag)) + .unzip(); + + let tag = self.add_collection(collection).await?; + drop(child_tags); + Ok(tag) + } + + /// Write a blob by passing a stream of bytes. + /// + /// For convenient interop with common sources of data, this function takes a stream of `io::Result`. + /// If you have raw bytes, you need to wrap them in `io::Result::Ok`. + pub async fn add_stream_with_opts( + &self, + mut input: impl Stream> + Send + Unpin + 'static, + format: BlobFormat, + ) -> Result { + let (mut sink, mut stream) = self + .0 + .rpc + .bidi(BatchAddStreamRequest { + batch: self.0.batch, + format, + }) + .await?; + let mut size = 0u64; + while let Some(item) = input.next().await { + match item { + Ok(chunk) => { + size += chunk.len() as u64; + sink.send(BatchAddStreamUpdate::Chunk(chunk)) + .await + .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; + } + Err(err) => { + warn!("Abort send, reason: failed to read from source stream: {err:?}"); + sink.send(BatchAddStreamUpdate::Abort) + .await + .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; + break; + } + } + } + // this is needed for the remote to notice that the stream is closed + drop(sink); + let mut res = None; + while let Some(item) = stream.next().await { + match item? { + BatchAddStreamResponse::Abort(cause) => { + Err(cause)?; + } + BatchAddStreamResponse::Result { hash } => { + res = Some(hash); + } + _ => {} + } + } + let hash = res.context("Missing answer")?; + Ok(self.local_temp_tag(HashAndFormat { hash, format }, Some(size))) + } + + /// Add a collection + /// + /// This is a convenience function that converts the collection into two blobs + /// (the metadata and the hash sequence) and adds them, returning a temp tag for + /// the hash sequence. + /// + /// Note that this does not guarantee that the data that the collection refers to + /// actually exists. It will just create 2 blobs, the metadata and the hash sequence + /// itself. + pub async fn add_collection(&self, collection: Collection) -> Result { + self.add_blob_seq(collection.to_blobs()).await + } + + /// Add a sequence of blobs, where the last is a hash sequence. + /// + /// It is a common pattern in iroh to have a hash sequence with one or more + /// blobs of metadata, and the remaining blobs being the actual data. E.g. + /// a collection is a hash sequence where the first child is the metadata. 
+    pub async fn add_blob_seq(&self, iter: impl Iterator<Item = Bytes>) -> Result<TempTag> {
+        let mut blobs = iter.peekable();
+        // keep the temp tags for the children alive until the hash seq itself is added
+        let mut tags = vec![];
+        loop {
+            let blob = blobs.next().context("Failed to get next blob")?;
+            if blobs.peek().is_none() {
+                return self.add_bytes_with_opts(blob, BlobFormat::HashSeq).await;
+            } else {
+                tags.push(self.add_bytes(blob).await?);
+            }
+        }
+    }
+
+    /// Upgrade a temp tag to a persistent tag.
+    pub async fn upgrade(&self, tt: TempTag) -> Result<Tag> {
+        let tag = self
+            .0
+            .rpc
+            .rpc(CreateTagRequest {
+                value: tt.hash_and_format(),
+                batch: Some(self.0.batch),
+            })
+            .await??;
+        Ok(tag)
+    }
+
+    /// Upgrade a temp tag to a persistent tag with a specific name.
+    pub async fn upgrade_to(&self, tt: TempTag, tag: Tag) -> Result<()> {
+        self.0
+            .rpc
+            .rpc(SetTagRequest {
+                name: tag,
+                value: Some(tt.hash_and_format()),
+                batch: Some(self.0.batch),
+            })
+            .await??;
+        Ok(())
+    }
+
+    /// Upgrade a temp tag to a persistent tag with either a specific name or
+    /// an automatically generated name.
+    pub async fn upgrade_with_opts(&self, tt: TempTag, opts: SetTagOption) -> Result<Tag> {
+        match opts {
+            SetTagOption::Auto => self.upgrade(tt).await,
+            SetTagOption::Named(tag) => {
+                self.upgrade_to(tt, tag.clone()).await?;
+                Ok(tag)
+            }
+        }
+    }
+
+    /// Creates a temp tag for the given hash and format, without notifying the server.
+    ///
+    /// Caution: only do this for data for which you know the server side has created a temp tag.
+    fn local_temp_tag(&self, inner: HashAndFormat, _size: Option<u64>) -> TempTag {
+        let on_drop: Arc<dyn TagDrop> = self.0.clone();
+        let on_drop = Some(Arc::downgrade(&on_drop));
+        TempTag::new(inner, on_drop)
+    }
+}
diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs
index 06ecb08f4a..279426069b 100644
--- a/iroh/src/client/quic.rs
+++ b/iroh/src/client/quic.rs
@@ -10,7 +10,7 @@ use std::{
 use anyhow::{bail, Context};
 use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection};
 
-use super::Iroh;
+use super::{Iroh, RpcClient};
 use crate::{
     node::RpcStatus,
     rpc_protocol::{NodeStatusRequest, RpcService},
 };
@@ -20,9 +20,6 @@ use crate::{
 // TODO: Change to "/iroh-rpc/1"
 pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";
 
-/// RPC client to an iroh node running in a separate process.
-pub type RpcClient = quic_rpc::RpcClient<RpcService, BoxedConnection<RpcService>>;
-
 impl Iroh {
     /// Connect to an iroh node running on the same computer, but in a different process.
     pub async fn connect_path(root: impl AsRef<Path>) -> anyhow::Result<Self> {
diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs
index 66166a396c..df384628a1 100644
--- a/iroh/src/client/tags.rs
+++ b/iroh/src/client/tags.rs
@@ -2,12 +2,12 @@
 
 use anyhow::Result;
 use futures_lite::{Stream, StreamExt};
-use iroh_blobs::{BlobFormat, Hash, Tag};
+use iroh_blobs::{BlobFormat, Hash, HashAndFormat, Tag};
 use ref_cast::RefCast;
 use serde::{Deserialize, Serialize};
 
 use super::RpcClient;
-use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest};
+use crate::rpc_protocol::{CreateTagRequest, ListTagsRequest, SetTagRequest};
 
 /// Iroh tags client.
 #[derive(Debug, Clone, RefCast)]
@@ -32,9 +32,41 @@ impl Client {
         Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
     }
 
+    /// Create a tag, where the name is automatically generated.
+    ///
+    /// Use this method if you want a new tag with a unique name.
+    pub async fn create(&self, value: HashAndFormat) -> Result<Tag> {
+        Ok(self
+            .rpc
+            .rpc(CreateTagRequest { value, batch: None })
+            .await??)
+    }
+
+    /// Set a tag to a value, overwriting any existing value.
+    ///
+    /// This is a convenience wrapper around `set_with_opts`.
+    pub async fn set(&self, name: Tag, value: HashAndFormat) -> Result<()> {
+        self.set_with_opts(name, Some(value)).await
+    }
+
     /// Delete a tag.
+    ///
+    /// This is a convenience wrapper around `set_with_opts`.
     pub async fn delete(&self, name: Tag) -> Result<()> {
-        self.rpc.rpc(DeleteTagRequest { name }).await??;
+        self.set_with_opts(name, None).await
+    }
+
+    /// Set a tag to a value, overwriting any existing value.
+    ///
+    /// Setting the value to `None` deletes the tag. Setting the value to `Some` creates or updates the tag.
+    pub async fn set_with_opts(&self, name: Tag, value: Option<HashAndFormat>) -> Result<()> {
+        self.rpc
+            .rpc(SetTagRequest {
+                name,
+                value,
+                batch: None,
+            })
+            .await??;
         Ok(())
     }
 }
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 238577d40c..6968989830 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -3,6 +3,7 @@
 //! A node is a server that serves various protocols.
 //!
 //! To shut down the node, call [`Node::shutdown`].
+use std::collections::BTreeMap;
 use std::path::Path;
 use std::sync::Arc;
 use std::{collections::BTreeSet, net::SocketAddr};
@@ -12,7 +13,9 @@ use anyhow::{anyhow, Result};
 use futures_lite::StreamExt;
 use iroh_base::key::PublicKey;
 use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore};
+use iroh_blobs::util::TagDrop;
 use iroh_blobs::{downloader::Downloader, protocol::Closed};
+use iroh_blobs::{HashAndFormat, TempTag};
 use iroh_gossip::net::Gossip;
 use iroh_net::key::SecretKey;
 use iroh_net::Endpoint;
@@ -23,6 +26,7 @@ use tokio_util::sync::CancellationToken;
 use tokio_util::task::LocalPoolHandle;
 use tracing::{debug, error, info, warn};
 
+use crate::rpc_protocol::BatchId;
 use crate::{
     client::RpcService,
     node::{docs::DocsEngine, protocol::ProtocolMap},
@@ -68,6 +72,77 @@ struct NodeInner {
     #[debug("rt")]
     rt: LocalPoolHandle,
     downloader: Downloader,
+    blob_batches: tokio::sync::Mutex<BlobBatches>,
+}
+
+/// Keeps track of all the currently active batch operations of the blobs api.
+#[derive(Debug, Default)]
+struct BlobBatches {
+    /// Currently active batches
+    batches: BTreeMap<BatchId, BlobBatch>,
+    /// Used to generate new batch ids.
+    max: u64,
+}
+
+/// A single batch of blob operations
+#[derive(Debug, Default)]
+struct BlobBatch {
+    /// Each counter corresponds to the number of temp tags we have sent to the client
+    /// for this hash and format. Counters should never be zero.
+    tags: BTreeMap<HashAndFormat, u64>,
+}
+
+impl BlobBatches {
+    /// Create a new unique batch id.
+    fn create(&mut self) -> BatchId {
+        let id = self.max;
+        self.max += 1;
+        BatchId(id)
+    }
+
+    /// Store a temp tag in a batch identified by a batch id.
+    fn store(&mut self, batch: BatchId, tt: TempTag) {
+        let entry = self.batches.entry(batch).or_default();
+        let count = entry.tags.entry(tt.hash_and_format()).or_default();
+        tt.leak();
+        *count += 1;
+    }
+
+    /// Remove a tag from a batch.
+    fn remove_one(
+        &mut self,
+        batch: BatchId,
+        content: &HashAndFormat,
+        tag_drop: Option<&dyn TagDrop>,
+    ) -> Result<()> {
+        if let Some(scope) = self.batches.get_mut(&batch) {
+            if let Some(counter) = scope.tags.get_mut(content) {
+                *counter -= 1;
+                if let Some(tag_drop) = tag_drop {
+                    tag_drop.on_drop(content);
+                }
+                if *counter == 0 {
+                    scope.tags.remove(content);
+                }
+                return Ok(());
+            }
+        }
+        // this can happen if we try to upgrade a tag from an expired batch
+        anyhow::bail!("tag not found in batch");
+    }
+
+    /// Remove an entire batch.
+ fn remove(&mut self, batch: BatchId, tag_drop: Option<&dyn TagDrop>) { + if let Some(scope) = self.batches.remove(&batch) { + for (content, count) in scope.tags { + if let Some(tag_drop) = tag_drop { + for _ in 0..count { + tag_drop.on_drop(&content); + } + } + } + } + } } /// In memory node. diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index a8f770f0aa..6b75d0d001 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -540,7 +540,6 @@ where // so this has zero overhead. let controller = quic_rpc::transport::boxed::Connection::new(controller); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); - let inner = Arc::new(NodeInner { rpc_port: self.rpc_port, db: self.blobs_store, @@ -551,6 +550,7 @@ where cancel_token: CancellationToken::new(), rt: lp, downloader, + blob_batches: Default::default(), gossip, }); diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index f24854b139..5a0517bc22 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::{anyhow, ensure, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; +use futures_util::FutureExt; use genawaiter::sync::{Co, Gen}; use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::downloader::{DownloadRequest, Downloader}; @@ -13,15 +14,16 @@ use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; use iroh_blobs::get::db::DownloadProgress; use iroh_blobs::get::Stats; +use iroh_blobs::provider::BatchAddPathProgress; use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry}; use iroh_blobs::util::progress::ProgressSender; -use iroh_blobs::BlobFormat; use iroh_blobs::{ provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, util::progress::FlumeProgressSender, HashAndFormat, }; +use iroh_blobs::{BlobFormat, Tag}; use iroh_io::AsyncSliceReader; use iroh_net::relay::RelayUrl; use iroh_net::{Endpoint, NodeAddr, NodeId}; @@ -33,6 +35,7 @@ use tokio::task::JoinSet; use tokio_util::{either::Either, task::LocalPoolHandle}; use tracing::{debug, info, warn}; +use crate::client::blobs::BlobStatus; use crate::client::{ blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, tags::TagInfo, @@ -40,16 +43,18 @@ use crate::client::{ }; use crate::node::{docs::DocsEngine, NodeInner}; use crate::rpc_protocol::{ - BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, - BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, - BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest, - BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, - CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, - DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, - ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, - NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, - NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, - NodeWatchResponse, Request, RpcService, SetTagOption, + BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse, + BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, + BatchUpdate, BlobAddPathRequest, BlobAddPathResponse, 
BlobAddStreamRequest, + BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, + BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest, BlobExportResponse, + BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, + BlobStatusRequest, BlobStatusResponse, BlobValidateRequest, CreateTagRequest, + DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, + DocSetHashRequest, ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, + NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, + NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, + NodeWatchRequest, NodeWatchResponse, Request, RpcService, SetTagOption, SetTagRequest, }; mod docs; @@ -141,14 +146,15 @@ impl Handler { .await } NodeConnectionInfo(msg) => chan.rpc(msg, self, Self::node_connection_info).await, + BlobStatus(msg) => chan.rpc(msg, self, Self::blob_status).await, BlobList(msg) => chan.server_streaming(msg, self, Self::blob_list).await, BlobListIncomplete(msg) => { chan.server_streaming(msg, self, Self::blob_list_incomplete) .await } - CreateCollection(msg) => chan.rpc(msg, self, Self::create_collection).await, - ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await, - DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await, + SetTag(msg) => chan.rpc(msg, self, Self::tags_set_tag).await, + CreateTag(msg) => chan.rpc(msg, self, Self::tags_create_tag).await, + ListTags(msg) => chan.server_streaming(msg, self, Self::tags_list_tags).await, BlobDeleteBlob(msg) => chan.rpc(msg, self, Self::blob_delete_blob).await, BlobAddPath(msg) => { chan.server_streaming(msg, self, Self::blob_add_from_path) @@ -164,7 +170,15 @@ impl Handler { BlobReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, BlobAddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, BlobAddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), - + BatchCreate(msg) => chan.bidi_streaming(msg, self, Self::batch_create).await, + BatchUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await, + BatchAddStream(msg) => chan.bidi_streaming(msg, self, Self::batch_add_stream).await, + BatchAddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + BatchAddPath(msg) => { + chan.server_streaming(msg, self, Self::batch_add_from_path) + .await + } AuthorList(msg) => { chan.server_streaming(msg, self, |handler, req| { handler.with_docs_stream(|docs| docs.author_list(req)) @@ -381,6 +395,22 @@ impl Handler { Ok(()) } + async fn blob_status(self, msg: BlobStatusRequest) -> RpcResult { + let entry = self.inner.db.get(&msg.hash).await?; + Ok(BlobStatusResponse(match entry { + Some(entry) => { + if entry.is_complete() { + BlobStatus::Complete { + size: entry.size().value(), + } + } else { + BlobStatus::Partial { size: entry.size() } + } + } + None => BlobStatus::NotFound, + })) + } + fn blob_list( self, _msg: BlobListRequest, @@ -403,18 +433,12 @@ impl Handler { }) } - async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> { - self.inner.db.set_tag(msg.name, None).await?; - Ok(()) - } - async fn blob_delete_blob(self, msg: BlobDeleteBlobRequest) -> RpcResult<()> { self.inner.db.delete(vec![msg.hash]).await?; Ok(()) } - fn blob_list_tags(self, msg: ListTagsRequest) -> impl Stream + Send + 'static 
{ - tracing::info!("blob_list_tags"); + fn tags_list_tags(self, msg: ListTagsRequest) -> impl Stream + Send + 'static { Gen::new(|co| async move { let tags = self.inner.db.tags().await.unwrap(); #[allow(clippy::manual_flatten)] @@ -428,6 +452,31 @@ impl Handler { }) } + async fn tags_set_tag(self, msg: SetTagRequest) -> RpcResult<()> { + if let Some(batch) = msg.batch { + if let Some(content) = msg.value.as_ref() { + self.inner.blob_batches.lock().await.remove_one( + batch, + content, + self.inner.db.tag_drop(), + )?; + } + } + self.inner.db.set_tag(msg.name, msg.value).await?; + Ok(()) + } + + async fn tags_create_tag(self, msg: CreateTagRequest) -> RpcResult { + if let Some(batch) = msg.batch { + self.inner.blob_batches.lock().await.remove_one( + batch, + &msg.value, + self.inner.db.tag_drop(), + )?; + } + Ok(self.inner.db.create_tag(msg.value).await?) + } + /// Invoke validate on the database and stream out the result fn blob_validate( self, @@ -768,6 +817,46 @@ impl Handler { Ok(()) } + async fn batch_add_from_path0( + self, + msg: BatchAddPathRequest, + progress: flume::Sender, + ) -> anyhow::Result<()> { + let progress = FlumeProgressSender::new(progress); + // convert import progress to provide progress + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::Size { size, .. } => Some(BatchAddPathProgress::Found { size }), + ImportProgress::OutboardProgress { offset, .. } => { + Some(BatchAddPathProgress::Progress { offset }) + } + ImportProgress::OutboardDone { hash, .. } => Some(BatchAddPathProgress::Done { hash }), + _ => None, + }); + let BatchAddPathRequest { + path: root, + import_mode, + format, + batch, + } = msg; + // Check that the path is absolute and exists. + anyhow::ensure!(root.is_absolute(), "path must be absolute"); + anyhow::ensure!( + root.exists(), + "trying to add missing path: {}", + root.display() + ); + let (tag, _) = self + .inner + .db + .import_file(root, import_mode, format, import_progress) + .await?; + let hash = *tag.hash(); + self.inner.blob_batches.lock().await.store(batch, tag); + + progress.send(BatchAddPathProgress::Done { hash }).await?; + Ok(()) + } + #[allow(clippy::unused_async)] async fn node_stats(self, _req: NodeStatsRequest) -> RpcResult { #[cfg(feature = "metrics")] @@ -833,6 +922,119 @@ impl Handler { }) } + fn batch_create( + self, + _: BatchCreateRequest, + mut updates: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + async move { + let batch = self.inner.blob_batches.lock().await.create(); + tokio::spawn(async move { + while let Some(item) = updates.next().await { + match item { + BatchUpdate::Drop(content) => { + // this can not fail, since we keep the batch alive. + // therefore it is safe to ignore the result. 
+ let _ = self.inner.blob_batches.lock().await.remove_one( + batch, + &content, + self.inner.db.tag_drop(), + ); + } + BatchUpdate::Ping => {} + } + } + self.inner + .blob_batches + .lock() + .await + .remove(batch, self.inner.db.tag_drop()); + }); + BatchCreateResponse::Id(batch) + } + .into_stream() + } + + #[allow(clippy::unused_async)] + async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> { + let tag = self.inner.db.temp_tag(msg.content); + self.inner.blob_batches.lock().await.store(msg.batch, tag); + Ok(()) + } + + fn batch_add_stream( + self, + msg: BatchAddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + let (tx, rx) = flume::bounded(32); + let this = self.clone(); + + self.rt().spawn_pinned(|| async move { + if let Err(err) = this.batch_add_stream0(msg, stream, tx.clone()).await { + tx.send_async(BatchAddStreamResponse::Abort(err.into())) + .await + .ok(); + } + }); + rx.into_stream() + } + + fn batch_add_from_path( + self, + msg: BatchAddPathRequest, + ) -> impl Stream { + // provide a little buffer so that we don't slow down the sender + let (tx, rx) = flume::bounded(32); + let tx2 = tx.clone(); + self.rt().spawn_pinned(|| async move { + if let Err(e) = self.batch_add_from_path0(msg, tx).await { + tx2.send_async(BatchAddPathProgress::Abort(e.into())) + .await + .ok(); + } + }); + rx.into_stream().map(BatchAddPathResponse) + } + + async fn batch_add_stream0( + self, + msg: BatchAddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + progress: flume::Sender, + ) -> anyhow::Result<()> { + let progress = FlumeProgressSender::new(progress); + + let stream = stream.map(|item| match item { + BatchAddStreamUpdate::Chunk(chunk) => Ok(chunk), + BatchAddStreamUpdate::Abort => { + Err(io::Error::new(io::ErrorKind::Interrupted, "Remote abort")) + } + }); + + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::OutboardProgress { offset, .. } => { + Some(BatchAddStreamResponse::OutboardProgress { offset }) + } + _ => None, + }); + let (temp_tag, _len) = self + .inner + .db + .import_stream(stream, msg.format, import_progress) + .await?; + let hash = temp_tag.inner().hash; + self.inner + .blob_batches + .lock() + .await + .store(msg.batch, temp_tag); + progress + .send(BatchAddStreamResponse::Result { hash }) + .await?; + Ok(()) + } + fn blob_add_stream( self, msg: BlobAddStreamRequest, @@ -995,37 +1197,6 @@ impl Handler { let conn_info = self.inner.endpoint.connection_info(node_id); Ok(NodeConnectionInfoResponse { conn_info }) } - - async fn create_collection( - self, - req: CreateCollectionRequest, - ) -> RpcResult { - let CreateCollectionRequest { - collection, - tag, - tags_to_delete, - } = req; - - let temp_tag = collection.store(&self.inner.db).await?; - let hash_and_format = temp_tag.inner(); - let HashAndFormat { hash, .. 
} = *hash_and_format; - let tag = match tag { - SetTagOption::Named(tag) => { - self.inner - .db - .set_tag(tag.clone(), Some(*hash_and_format)) - .await?; - tag - } - SetTagOption::Auto => self.inner.db.create_tag(*hash_and_format).await?, - }; - - for tag in tags_to_delete { - self.inner.db.set_tag(tag, None).await?; - } - - Ok(CreateCollectionResponse { hash, tag }) - } } async fn download( diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 8334590a11..102cce82a5 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -14,9 +14,10 @@ use derive_more::{From, TryInto}; use iroh_base::node_addr::AddrInfoOptions; pub use iroh_blobs::{export::ExportProgress, get::db::DownloadProgress, BlobFormat, Hash}; use iroh_blobs::{ - format::collection::Collection, - store::{BaoBlobSize, ConsistencyCheckProgress}, + provider::BatchAddPathProgress, + store::{BaoBlobSize, ConsistencyCheckProgress, ImportMode}, util::Tag, + HashAndFormat, }; use iroh_net::{ endpoint::{ConnectionInfo, NodeAddr}, @@ -44,13 +45,42 @@ pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress}; use iroh_docs::engine::LiveEvent; use crate::client::{ - blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + blobs::{BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, WrapOption}, docs::{ImportProgress, ShareMode}, tags::TagInfo, NodeStatus, }; pub use iroh_blobs::util::SetTagOption; +/// Request to create a new scope for temp tags +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchCreateRequest; + +/// Update to a temp tag scope +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchUpdate { + /// Drop of a remote temp tag + Drop(HashAndFormat), + /// Message to check that the connection is still alive + Ping, +} + +/// Response to a temp tag scope request +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchCreateResponse { + /// We got the id of the scope + Id(BatchId), +} + +impl Msg for BatchCreateRequest { + type Pattern = BidiStreaming; +} + +impl BidiStreamingMsg for BatchCreateRequest { + type Update = BatchUpdate; + type Response = BatchCreateResponse; +} + /// A request to the node to provide the data at the given path /// /// Will produce a stream of [`AddProgress`] messages. @@ -126,7 +156,7 @@ pub struct BlobExportRequest { /// This should be an absolute path valid for the file system on which /// the node runs. pub path: PathBuf, - /// Set to [`ExportFormat::Collection`] if the `hash` refers to a [`Collection`] and you want + /// Set to [`ExportFormat::Collection`] if the `hash` refers to a collection and you want /// to export all children of the collection into individual files. pub format: ExportFormat, /// The mode of exporting. 
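Reviewer note: the `SetTagRequest`/`CreateTagRequest` pair introduced further down in this file replaces the old `DeleteTagRequest`, so one RPC now covers create, update, and delete. A hedged sketch of the resulting client-side semantics; the tag name is hypothetical and a connected `tags::Client` plus an existing `HashAndFormat` value are assumed:

    // Sketch only: tag create/update/delete through the unified request.
    async fn tag_usage(tags: &iroh::client::tags::Client, value: iroh_blobs::HashAndFormat) -> anyhow::Result<()> {
        let name = iroh_blobs::Tag::from("release-v1");
        tags.set(name.clone(), value).await?; // SetTagRequest { value: Some(..), batch: None }
        tags.delete(name).await?;             // SetTagRequest { value: None, batch: None }
        let auto = tags.create(value).await?; // CreateTagRequest with an auto-generated name
        println!("created {auto:?}");
        Ok(())
    }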
@@ -177,6 +207,21 @@ impl ServerStreamingMsg for BlobValidateRequest { type Response = ValidateProgress; } +/// Get the status of a blob +#[derive(Debug, Serialize, Deserialize)] +pub struct BlobStatusRequest { + /// The hash of the blob + pub hash: Hash, +} + +/// The response to a status request +#[derive(Debug, Serialize, Deserialize, derive_more::From, derive_more::Into)] +pub struct BlobStatusResponse(pub BlobStatus); + +impl RpcMsg for BlobStatusRequest { + type Response = RpcResult; +} + /// List all blobs, including collections #[derive(Debug, Serialize, Deserialize)] pub struct BlobListRequest; @@ -259,36 +304,30 @@ impl RpcMsg for BlobDeleteBlobRequest { /// Delete a tag #[derive(Debug, Serialize, Deserialize)] -pub struct DeleteTagRequest { +pub struct SetTagRequest { /// Name of the tag pub name: Tag, + /// Value of the tag, None to delete + pub value: Option, + /// Batch to use, none for global + pub batch: Option, } -impl RpcMsg for DeleteTagRequest { +impl RpcMsg for SetTagRequest { type Response = RpcResult<()>; } -/// Create a collection. -#[derive(Debug, Serialize, Deserialize)] -pub struct CreateCollectionRequest { - /// The collection - pub collection: Collection, - /// Tag option. - pub tag: SetTagOption, - /// Tags that should be deleted after creation. - pub tags_to_delete: Vec, -} -/// A response to a create collection request +/// Create a tag #[derive(Debug, Serialize, Deserialize)] -pub struct CreateCollectionResponse { - /// The resulting hash. - pub hash: Hash, - /// The resulting tag. - pub tag: Tag, +pub struct CreateTagRequest { + /// Value of the tag + pub value: HashAndFormat, + /// Batch to use, none for global + pub batch: Option, } -impl RpcMsg for CreateCollectionRequest { - type Response = RpcResult; +impl RpcMsg for CreateTagRequest { + type Response = RpcResult; } /// List connection information about all the nodes we know about @@ -1011,6 +1050,82 @@ impl BidiStreamingMsg for BlobAddStreamRequest { #[derive(Debug, Serialize, Deserialize, derive_more::Into)] pub struct BlobAddStreamResponse(pub AddProgress); +#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)] +pub struct BatchId(pub(crate) u64); + +/// Create a temp tag with a given hash and format +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchCreateTempTagRequest { + /// Content to protect + pub content: HashAndFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +impl RpcMsg for BatchCreateTempTagRequest { + type Response = RpcResult<()>; +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddStreamRequest { + /// What format to use for the blob + pub format: BlobFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub enum BatchAddStreamUpdate { + /// A chunk of stream data + Chunk(Bytes), + /// Abort the request due to an error on the client side + Abort, +} + +impl Msg for BatchAddStreamRequest { + type Pattern = BidiStreaming; +} + +impl BidiStreamingMsg for BatchAddStreamRequest { + type Update = BatchAddStreamUpdate; + type Response = BatchAddStreamResponse; +} + +/// Wrapper around [`AddProgress`]. 
+#[derive(Debug, Serialize, Deserialize)] +pub enum BatchAddStreamResponse { + Abort(RpcError), + OutboardProgress { offset: u64 }, + Result { hash: Hash }, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathRequest { + /// The path to the data to provide. + pub path: PathBuf, + /// Add the data in place + pub import_mode: ImportMode, + /// What format to use for the blob + pub format: BlobFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Response to a batch add path request +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathResponse(pub BatchAddPathProgress); + +impl Msg for BatchAddPathRequest { + type Pattern = ServerStreaming; +} + +impl ServerStreamingMsg for BatchAddPathRequest { + type Response = BatchAddPathResponse; +} + /// Get stats for the running Iroh node #[derive(Serialize, Deserialize, Debug)] pub struct NodeStatsRequest {} @@ -1060,13 +1175,21 @@ pub enum Request { BlobDownload(BlobDownloadRequest), BlobExport(BlobExportRequest), BlobList(BlobListRequest), + BlobStatus(BlobStatusRequest), BlobListIncomplete(BlobListIncompleteRequest), BlobDeleteBlob(BlobDeleteBlobRequest), BlobValidate(BlobValidateRequest), BlobFsck(BlobConsistencyCheckRequest), - CreateCollection(CreateCollectionRequest), - DeleteTag(DeleteTagRequest), + BatchCreate(BatchCreateRequest), + BatchUpdate(BatchUpdate), + BatchCreateTempTag(BatchCreateTempTagRequest), + BatchAddStream(BatchAddStreamRequest), + BatchAddStreamUpdate(BatchAddStreamUpdate), + BatchAddPath(BatchAddPathRequest), + + SetTag(SetTagRequest), + CreateTag(CreateTagRequest), ListTags(ListTagsRequest), DocOpen(DocOpenRequest), @@ -1118,15 +1241,20 @@ pub enum Response { BlobAddStream(BlobAddStreamResponse), BlobAddPath(BlobAddPathResponse), BlobList(RpcResult), + BlobStatus(RpcResult), BlobListIncomplete(RpcResult), BlobDownload(BlobDownloadResponse), BlobFsck(ConsistencyCheckProgress), BlobExport(BlobExportResponse), BlobValidate(ValidateProgress), - CreateCollection(RpcResult), + + BatchCreate(BatchCreateResponse), + BatchAddStream(BatchAddStreamResponse), + BatchAddPath(BatchAddPathResponse), ListTags(TagInfo), DeleteTag(RpcResult<()>), + CreateTag(RpcResult), DocOpen(RpcResult), DocClose(RpcResult), diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs new file mode 100644 index 0000000000..aaf9fbe542 --- /dev/null +++ b/iroh/tests/batch.rs @@ -0,0 +1,239 @@ +use std::{io, time::Duration}; + +use bao_tree::blake3; +use bytes::Bytes; +use iroh::{ + client::blobs::{AddDirOpts, WrapOption}, + node::GcPolicy, +}; +use iroh_blobs::store::mem::Store; + +async fn create_node() -> anyhow::Result> { + iroh::node::Node::memory() + .gc_policy(GcPolicy::Interval(Duration::from_millis(10))) + .spawn() + .await +} + +async fn wait_for_gc() { + // wait for multiple gc cycles to ensure that the data is actually gone + tokio::time::sleep(Duration::from_millis(50)).await; +} + +/// Test that add_bytes adds the right data +#[tokio::test] +async fn add_bytes() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(data).into()); + assert_eq!(actual.as_ref(), data); + Ok(()) +} + +/// Test that add_bytes adds the right data +#[tokio::test] +async fn add_stream() -> anyhow::Result<()> { + let node = 
create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let data_stream = futures_lite::stream::iter([io::Result::Ok(Bytes::copy_from_slice(data))]); + let tag = batch.add_stream(data_stream).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(data).into()); + assert_eq!(actual.as_ref(), data); + Ok(()) +} + +/// Test that add_file adds the right data +#[tokio::test] +async fn add_file() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let temp_path = dir.path().join("test"); + std::fs::write(&temp_path, b"test")?; + let (tag, _) = batch.add_file(temp_path).await?; + let hash = *tag.hash(); + let actual = client.read_to_bytes(hash).await?; + assert_eq!(hash, blake3::hash(b"test").into()); + assert_eq!(actual.as_ref(), b"test"); + Ok(()) +} + +/// Tests that add_dir adds the right data +#[tokio::test] +async fn add_dir() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")]; + for (name, content) in &data { + let temp_path = dir.path().join(name); + std::fs::write(&temp_path, content)?; + } + let tag = batch.add_dir(dir.path().to_owned()).await?; + assert!(client.has(*tag.hash()).await?); + for (_, content) in &data { + let hash = blake3::hash(content).into(); + let data = client.read_to_bytes(hash).await?; + assert_eq!(data.as_ref(), *content); + } + Ok(()) +} + +/// Tests that add_dir adds the right data +#[tokio::test] +async fn add_dir_single_file() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let temp_path = dir.path().join("test"); + let data: &[u8] = b"test"; + std::fs::write(&temp_path, data)?; + let tag = batch + .add_dir_with_opts( + temp_path, + AddDirOpts { + wrap: WrapOption::Wrap { name: None }, + ..Default::default() + }, + ) + .await?; + assert!(client.read_to_bytes(*tag.hash()).await.is_ok()); + let hash = blake3::hash(data).into(); + let actual_data = client.read_to_bytes(hash).await?; + assert_eq!(actual_data.as_ref(), data); + Ok(()) +} + +#[tokio::test] +async fn batch_drop() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs(); + let batch = client.batch().await?; + let data: &[u8] = b"test"; + let tag = batch.add_bytes(data).await?; + let hash = *tag.hash(); + // Check that the store has the data and that it is protected from gc + wait_for_gc().await; + assert!(client.has(hash).await?); + drop(batch); + // Check that the store drops the data when the temp tag gets dropped + wait_for_gc().await; + assert!(!client.has(hash).await?); + Ok(()) +} + +/// This checks that dropping a tag makes the data eligible for garbage collection. +/// +/// Note that we might change this behavior in the future and only drop the data +/// once the batch is dropped. 
+#[tokio::test]
+async fn tag_drop_raw() -> anyhow::Result<()> {
+    let node = create_node().await?;
+    let client = &node.client().blobs();
+    let batch = client.batch().await?;
+    let data: &[u8] = b"test";
+    let tag = batch.add_bytes(data).await?;
+    let hash = *tag.hash();
+    // Check that the store has the data and that it is protected from gc
+    wait_for_gc().await;
+    assert!(client.has(hash).await?);
+    drop(tag);
+    // Check that the store drops the data when the temp tag gets dropped
+    wait_for_gc().await;
+    assert!(!client.has(hash).await?);
+    Ok(())
+}
+
+/// Tests that data is preserved if a second temp tag is created for it
+/// before the first temp tag is dropped.
+#[tokio::test]
+async fn temp_tag_copy() -> anyhow::Result<()> {
+    let node = create_node().await?;
+    let client = &node.client().blobs();
+    let batch = client.batch().await?;
+    let data: &[u8] = b"test";
+    let tag = batch.add_bytes(data).await?;
+    let hash = *tag.hash();
+    // Check that the store has the data and that it is protected from gc
+    wait_for_gc().await;
+    assert!(client.has(hash).await?);
+    // Create an additional temp tag for the same data
+    let tag2 = batch.temp_tag(tag.hash_and_format()).await?;
+    drop(tag);
+    // Check that the data is still present
+    wait_for_gc().await;
+    assert!(client.has(hash).await?);
+    drop(tag2);
+    // Check that the data is gone since both temp tags are dropped
+    wait_for_gc().await;
+    assert!(!client.has(hash).await?);
+    Ok(())
+}
+
+/// Tests that temp tags work properly for hash sequences, using add_dir
+/// to add the data.
+///
+/// Note that we might change this behavior in the future and only drop the data
+/// once the batch is dropped.
+#[tokio::test]
+async fn tag_drop_hashseq() -> anyhow::Result<()> {
+    let node = create_node().await?;
+    let client = &node.client().blobs();
+    let batch = client.batch().await?;
+    let dir = tempfile::tempdir()?;
+    let data: [(&str, &[u8]); 2] = [("test1", b"test1"), ("test2", b"test2")];
+    for (name, content) in &data {
+        let temp_path = dir.path().join(name);
+        std::fs::write(&temp_path, content)?;
+    }
+    let tag = batch.add_dir(dir.path().to_owned()).await?;
+    let hash = *tag.hash();
+    // weird signature to avoid async move issues
+    let check_present = |present: &'static bool| async {
+        assert!(client.has(hash).await? == *present);
+        for (_, content) in &data {
+            let hash = blake3::hash(content).into();
+            assert!(client.has(hash).await? == *present);
+        }
+        anyhow::Ok(())
+    };
+    // Check that the store has the data immediately after adding it
+    check_present(&true).await?;
+    // Check that it is protected from gc
+    wait_for_gc().await;
+    check_present(&true).await?;
+    drop(tag);
+    // Check that the store drops the data when the temp tag gets dropped
+    wait_for_gc().await;
+    check_present(&false).await?;
+    Ok(())
+}
+
+/// Tests that a temp tag created in one batch can not be upgraded in a
+/// different batch: upgrading after the originating batch was dropped fails.
+#[tokio::test]
+async fn wrong_batch() -> anyhow::Result<()> {
+    let node = create_node().await?;
+    let client = &node.client().blobs();
+    let batch = client.batch().await?;
+    let data: &[u8] = b"test";
+    let tag = batch.add_bytes(data).await?;
+    drop(batch);
+    let batch = client.batch().await?;
+    assert!(batch.upgrade(tag).await.is_err());
+    Ok(())
+}
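One case the tests above do not cover explicitly: data surviving a batch drop because its temp tag was upgraded to a persistent tag first. A hedged sketch reusing the helpers from this file (`create_node`, `wait_for_gc`); the test name is hypothetical:

    /// Sketch only: upgrading a temp tag before dropping the batch should keep the data alive.
    #[tokio::test]
    async fn upgrade_before_drop() -> anyhow::Result<()> {
        let node = create_node().await?;
        let client = &node.client().blobs();
        let batch = client.batch().await?;
        let data: &[u8] = b"test";
        let tag = batch.add_bytes(data).await?;
        let hash = *tag.hash();
        // Upgrade to a persistent tag; the content is now pinned independently of the batch.
        let _persistent = batch.upgrade(tag).await?;
        drop(batch);
        wait_for_gc().await;
        // The data should still be present because of the persistent tag.
        assert!(client.has(hash).await?);
        Ok(())
    }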