From bf0baead088b73d4587658e9ba7aa8906f8a4be2 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 3 Jun 2024 17:04:16 +0300 Subject: [PATCH] Add ability to add from path in batch api --- iroh-blobs/src/provider.rs | 24 ++++++++++++ iroh/src/client/blobs.rs | 78 +++++++++++++++++++++++++++++++------ iroh/src/node/rpc.rs | 79 +++++++++++++++++++++++++++++++++++--- iroh/src/rpc_protocol.rs | 28 ++++++++++++++ 4 files changed, 192 insertions(+), 17 deletions(-) diff --git a/iroh-blobs/src/provider.rs b/iroh-blobs/src/provider.rs index 7fe4e13004..7e481f4c2b 100644 --- a/iroh-blobs/src/provider.rs +++ b/iroh-blobs/src/provider.rs @@ -153,6 +153,30 @@ pub enum AddProgress { Abort(RpcError), } +/// Progress updates for the batch add operation. +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchAddProgress { + /// An item was found with the given size + Found { + /// The size of the entry in bytes. + size: u64, + }, + /// We got progress ingesting item `id`. + Progress { + /// The offset of the progress, in bytes. + offset: u64, + }, + /// We are done with `id`, and the hash is `hash`. + Done { + /// The hash of the entry. + hash: Hash, + }, + /// We got an error and need to abort. + /// + /// This will be the last message in the stream. + Abort(RpcError), +} + /// Read the request from the getter. /// /// Will fail if there is an error while reading, if the reader diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 2a88d9787c..6631add2e9 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -18,6 +18,7 @@ use iroh_blobs::{ export::ExportProgress as BytesExportProgress, format::collection::Collection, get::db::DownloadProgress as BytesDownloadProgress, + provider::BatchAddProgress, store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, util::TagDrop, BlobFormat, Hash, HashAndFormat, Tag, TempTag, @@ -34,8 +35,8 @@ use tokio_util::io::{ReaderStream, StreamReader}; use tracing::warn; use crate::rpc_protocol::{ - BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, BatchCreateRequest, - BatchCreateResponse, BatchUpdate, BlobAddPathRequest, BlobAddStreamRequest, + BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, + BatchCreateRequest, BatchCreateResponse, BatchUpdate, BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, @@ -65,9 +66,13 @@ where pub async fn batch(&self) -> Result> { let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?; let updates = Mutex::new(updates); - let BatchCreateResponse::Id(id) = stream.next().await.context("expected scope id")??; + let BatchCreateResponse::Id(scope) = stream.next().await.context("expected scope id")??; let rpc = self.rpc.clone(); - Ok(Batch(Arc::new(BatchInner { id, rpc, updates }))) + Ok(Batch(Arc::new(BatchInner { + scope, + rpc, + updates, + }))) } /// Stream the contents of a a single blob. /// @@ -386,7 +391,7 @@ where #[derive(derive_more::Debug)] struct BatchInner> { /// The id of the scope. - id: u64, + scope: u64, /// The rpc client. rpc: RpcClient, /// The stream to send drop @@ -407,6 +412,17 @@ impl> TagDrop for BatchInner { } impl> Batch { + /// Write a blob by passing an async reader. + pub async fn add_reader( + &self, + reader: impl AsyncRead + Unpin + Send + 'static, + format: BlobFormat, + ) -> anyhow::Result { + const CAP: usize = 1024 * 64; // send 64KB per request by default + let input = ReaderStream::with_capacity(reader, CAP); + self.add_stream(input, format).await + } + /// Write a blob by passing bytes. pub async fn add_bytes(&self, bytes: impl Into, format: BlobFormat) -> Result { let input = futures_lite::stream::once(Ok(bytes.into())); @@ -423,7 +439,7 @@ impl> Batch { .0 .rpc .bidi(BatchAddStreamRequest { - scope: self.0.id, + scope: self.0.scope, format, }) .await?; @@ -460,11 +476,51 @@ impl> Batch { } } let hash = res.context("Missing answer")?; - let t: Arc = self.0.clone(); - Ok(TempTag::new( - HashAndFormat { hash, format }, - Some(Arc::downgrade(&t)), - )) + Ok(self.temp_tag(HashAndFormat { hash, format })) + } + + /// Import a blob from a filesystem path. + /// + /// `path` should be an absolute path valid for the file system on which + /// the node runs. + /// If `in_place` is true, Iroh will assume that the data will not change and will share it in + /// place without copying to the Iroh data directory. + pub async fn add_from_path( + &self, + path: PathBuf, + in_place: bool, + format: BlobFormat, + ) -> Result { + let mut stream = self + .0 + .rpc + .server_streaming(BatchAddPathRequest { + path, + in_place, + format, + scope: self.0.scope, + }) + .await?; + let mut res = None; + while let Some(item) = stream.next().await { + match item?.0 { + BatchAddProgress::Abort(cause) => { + Err(cause)?; + } + BatchAddProgress::Done { hash } => { + res = Some(hash); + } + _ => {} + } + } + let hash = res.context("Missing answer")?; + Ok(self.temp_tag(HashAndFormat { hash, format })) + } + + fn temp_tag(&self, inner: HashAndFormat) -> TempTag { + let on_drop: Arc = self.0.clone(); + let on_drop = Some(Arc::downgrade(&on_drop)); + TempTag::new(inner, on_drop) } } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 4aa4cb600d..e07de2165a 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -13,6 +13,7 @@ use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; use iroh_blobs::get::db::DownloadProgress; use iroh_blobs::get::Stats; +use iroh_blobs::provider::BatchAddProgress; use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry}; use iroh_blobs::util::progress::ProgressSender; use iroh_blobs::BlobFormat; @@ -39,11 +40,11 @@ use crate::client::blobs::{ use crate::client::tags::TagInfo; use crate::client::NodeStatus; use crate::rpc_protocol::{ - BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, BatchCreateRequest, - BatchCreateResponse, BatchUpdate, BlobAddPathRequest, BlobAddPathResponse, - BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, - BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest, - BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse, + BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse, + BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchUpdate, BlobAddPathRequest, + BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, + BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse, + BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, @@ -106,6 +107,10 @@ impl Handler { .await } BatchAddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + BatchAddPath(msg) => { + chan.server_streaming(msg, handler, Self::batch_add_from_path) + .await + } ListTags(msg) => { chan.server_streaming(msg, handler, Self::blob_list_tags) .await @@ -785,6 +790,54 @@ impl Handler { Ok(()) } + async fn batch_add_from_path0( + self, + msg: BatchAddPathRequest, + progress: flume::Sender, + ) -> anyhow::Result<()> { + use iroh_blobs::store::ImportMode; + + let progress = FlumeProgressSender::new(progress); + // convert import progress to provide progress + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::Size { size, .. } => Some(BatchAddProgress::Found { size }), + ImportProgress::OutboardProgress { offset, .. } => { + Some(BatchAddProgress::Progress { offset }) + } + ImportProgress::OutboardDone { hash, .. } => Some(BatchAddProgress::Done { hash }), + _ => None, + }); + let BatchAddPathRequest { + path: root, + in_place, + format, + scope, + } = msg; + // Check that the path is absolute and exists. + anyhow::ensure!(root.is_absolute(), "path must be absolute"); + anyhow::ensure!( + root.exists(), + "trying to add missing path: {}", + root.display() + ); + + let import_mode = match in_place { + true => ImportMode::TryReference, + false => ImportMode::Copy, + }; + + let (tag, _) = self + .inner + .db + .import_file(root, import_mode, format, import_progress) + .await?; + let hash = *tag.hash(); + self.inner.blob_scopes.lock().unwrap().store(scope, tag); + + progress.send(BatchAddProgress::Done { hash }).await?; + Ok(()) + } + #[allow(clippy::unused_async)] async fn node_stats(self, _req: NodeStatsRequest) -> RpcResult { #[cfg(feature = "metrics")] @@ -867,7 +920,6 @@ impl Handler { } } } - println!("dropping scope {}", scope_id); self.inner .blob_scopes .lock() @@ -895,6 +947,21 @@ impl Handler { rx.into_stream() } + fn batch_add_from_path( + self, + msg: BatchAddPathRequest, + ) -> impl Stream { + // provide a little buffer so that we don't slow down the sender + let (tx, rx) = flume::bounded(32); + let tx2 = tx.clone(); + self.rt().spawn_pinned(|| async move { + if let Err(e) = self.batch_add_from_path0(msg, tx).await { + tx2.send_async(BatchAddProgress::Abort(e.into())).await.ok(); + } + }); + rx.into_stream().map(BatchAddPathResponse) + } + async fn batch_add_stream0( self, msg: BatchAddStreamRequest, diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 1117773e0f..76ae87551a 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -15,6 +15,7 @@ use iroh_base::node_addr::AddrInfoOptions; pub use iroh_blobs::{export::ExportProgress, get::db::DownloadProgress, BlobFormat, Hash}; use iroh_blobs::{ format::collection::Collection, + provider::BatchAddProgress, store::{BaoBlobSize, ConsistencyCheckProgress}, util::Tag, HashAndFormat, @@ -1077,6 +1078,31 @@ pub enum BatchAddStreamResponse { Result { hash: Hash }, } +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathRequest { + /// The path to the data to provide. + pub path: PathBuf, + /// Add the data in place + pub in_place: bool, + /// What format to use for the blob + pub format: BlobFormat, + /// Scope to create the temp tag in + pub scope: u64, +} + +/// Response to a batch add path request +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathResponse(pub BatchAddProgress); + +impl Msg for BatchAddPathRequest { + type Pattern = ServerStreaming; +} + +impl ServerStreamingMsg for BatchAddPathRequest { + type Response = BatchAddPathResponse; +} + /// Get stats for the running Iroh node #[derive(Serialize, Deserialize, Debug)] pub struct NodeStatsRequest {} @@ -1138,6 +1164,7 @@ pub enum Request { BatchUpdate(BatchUpdate), BatchAddStreamRequest(BatchAddStreamRequest), BatchAddStreamUpdate(BatchAddStreamUpdate), + BatchAddPath(BatchAddPathRequest), DeleteTag(DeleteTagRequest), ListTags(ListTagsRequest), @@ -1203,6 +1230,7 @@ pub enum Response { BatchCreateResponse(BatchCreateResponse), BatchRequest(BatchCreateRequest), BatchAddStream(BatchAddStreamResponse), + BatchAddPath(BatchAddPathResponse), ListTags(TagInfo), DeleteTag(RpcResult<()>),