Skip to content

Commit

Permalink
Add ability to add from path in batch api
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jun 3, 2024
1 parent 23c90d3 commit bf0baea
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 17 deletions.
24 changes: 24 additions & 0 deletions iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,30 @@ pub enum AddProgress {
Abort(RpcError),
}

/// Progress updates for the batch add operation.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchAddProgress {
/// An item was found with the given size
Found {
/// The size of the entry in bytes.
size: u64,
},
/// We got progress ingesting item `id`.
Progress {
/// The offset of the progress, in bytes.
offset: u64,
},
/// We are done with `id`, and the hash is `hash`.
Done {
/// The hash of the entry.
hash: Hash,
},
/// We got an error and need to abort.
///
/// This will be the last message in the stream.
Abort(RpcError),
}

/// Read the request from the getter.
///
/// Will fail if there is an error while reading, if the reader
Expand Down
78 changes: 67 additions & 11 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::Collection,
get::db::DownloadProgress as BytesDownloadProgress,
provider::BatchAddProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
util::TagDrop,
BlobFormat, Hash, HashAndFormat, Tag, TempTag,
Expand All @@ -34,8 +35,8 @@ use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;

use crate::rpc_protocol::{
BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, BatchCreateRequest,
BatchCreateResponse, BatchUpdate, BlobAddPathRequest, BlobAddStreamRequest,
BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate,
BatchCreateRequest, BatchCreateResponse, BatchUpdate, BlobAddPathRequest, BlobAddStreamRequest,
BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest,
BlobExportRequest, BlobGetCollectionRequest, BlobGetCollectionResponse,
BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
Expand Down Expand Up @@ -65,9 +66,13 @@ where
pub async fn batch(&self) -> Result<Batch<C>> {
let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
let updates = Mutex::new(updates);
let BatchCreateResponse::Id(id) = stream.next().await.context("expected scope id")??;
let BatchCreateResponse::Id(scope) = stream.next().await.context("expected scope id")??;
let rpc = self.rpc.clone();
Ok(Batch(Arc::new(BatchInner { id, rpc, updates })))
Ok(Batch(Arc::new(BatchInner {
scope,
rpc,
updates,
})))
}
/// Stream the contents of a a single blob.
///
Expand Down Expand Up @@ -386,7 +391,7 @@ where
#[derive(derive_more::Debug)]
struct BatchInner<C: ServiceConnection<RpcService>> {
/// The id of the scope.
id: u64,
scope: u64,
/// The rpc client.
rpc: RpcClient<RpcService, C>,
/// The stream to send drop
Expand All @@ -407,6 +412,17 @@ impl<C: ServiceConnection<RpcService>> TagDrop for BatchInner<C> {
}

impl<C: ServiceConnection<RpcService>> Batch<C> {
/// Write a blob by passing an async reader.
pub async fn add_reader(
&self,
reader: impl AsyncRead + Unpin + Send + 'static,
format: BlobFormat,
) -> anyhow::Result<TempTag> {
const CAP: usize = 1024 * 64; // send 64KB per request by default
let input = ReaderStream::with_capacity(reader, CAP);
self.add_stream(input, format).await
}

/// Write a blob by passing bytes.
pub async fn add_bytes(&self, bytes: impl Into<Bytes>, format: BlobFormat) -> Result<TempTag> {
let input = futures_lite::stream::once(Ok(bytes.into()));
Expand All @@ -423,7 +439,7 @@ impl<C: ServiceConnection<RpcService>> Batch<C> {
.0
.rpc
.bidi(BatchAddStreamRequest {
scope: self.0.id,
scope: self.0.scope,
format,
})
.await?;
Expand Down Expand Up @@ -460,11 +476,51 @@ impl<C: ServiceConnection<RpcService>> Batch<C> {
}
}
let hash = res.context("Missing answer")?;
let t: Arc<dyn TagDrop> = self.0.clone();
Ok(TempTag::new(
HashAndFormat { hash, format },
Some(Arc::downgrade(&t)),
))
Ok(self.temp_tag(HashAndFormat { hash, format }))
}

/// Import a blob from a filesystem path.
///
/// `path` should be an absolute path valid for the file system on which
/// the node runs.
/// If `in_place` is true, Iroh will assume that the data will not change and will share it in
/// place without copying to the Iroh data directory.
pub async fn add_from_path(
&self,
path: PathBuf,
in_place: bool,
format: BlobFormat,
) -> Result<TempTag> {
let mut stream = self
.0
.rpc
.server_streaming(BatchAddPathRequest {
path,
in_place,
format,
scope: self.0.scope,
})
.await?;
let mut res = None;
while let Some(item) = stream.next().await {
match item?.0 {
BatchAddProgress::Abort(cause) => {
Err(cause)?;
}
BatchAddProgress::Done { hash } => {
res = Some(hash);
}
_ => {}
}
}
let hash = res.context("Missing answer")?;
Ok(self.temp_tag(HashAndFormat { hash, format }))
}

fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
let on_drop: Arc<dyn TagDrop> = self.0.clone();
let on_drop = Some(Arc::downgrade(&on_drop));
TempTag::new(inner, on_drop)
}
}

Expand Down
79 changes: 73 additions & 6 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use iroh_blobs::export::ExportProgress;
use iroh_blobs::format::collection::Collection;
use iroh_blobs::get::db::DownloadProgress;
use iroh_blobs::get::Stats;
use iroh_blobs::provider::BatchAddProgress;
use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry};
use iroh_blobs::util::progress::ProgressSender;
use iroh_blobs::BlobFormat;
Expand All @@ -39,11 +40,11 @@ use crate::client::blobs::{
use crate::client::tags::TagInfo;
use crate::client::NodeStatus;
use crate::rpc_protocol::{
BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, BatchCreateRequest,
BatchCreateResponse, BatchUpdate, BlobAddPathRequest, BlobAddPathResponse,
BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest,
BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse,
BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchUpdate, BlobAddPathRequest,
BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate,
BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse,
BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse,
BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest,
Expand Down Expand Up @@ -106,6 +107,10 @@ impl<D: BaoStore> Handler<D> {
.await
}
BatchAddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage),
BatchAddPath(msg) => {
chan.server_streaming(msg, handler, Self::batch_add_from_path)
.await
}
ListTags(msg) => {
chan.server_streaming(msg, handler, Self::blob_list_tags)
.await
Expand Down Expand Up @@ -785,6 +790,54 @@ impl<D: BaoStore> Handler<D> {
Ok(())
}

async fn batch_add_from_path0(
self,
msg: BatchAddPathRequest,
progress: flume::Sender<BatchAddProgress>,
) -> anyhow::Result<()> {
use iroh_blobs::store::ImportMode;

let progress = FlumeProgressSender::new(progress);
// convert import progress to provide progress
let import_progress = progress.clone().with_filter_map(move |x| match x {
ImportProgress::Size { size, .. } => Some(BatchAddProgress::Found { size }),
ImportProgress::OutboardProgress { offset, .. } => {
Some(BatchAddProgress::Progress { offset })
}
ImportProgress::OutboardDone { hash, .. } => Some(BatchAddProgress::Done { hash }),
_ => None,
});
let BatchAddPathRequest {
path: root,
in_place,
format,
scope,
} = msg;
// Check that the path is absolute and exists.
anyhow::ensure!(root.is_absolute(), "path must be absolute");
anyhow::ensure!(
root.exists(),
"trying to add missing path: {}",
root.display()
);

let import_mode = match in_place {
true => ImportMode::TryReference,
false => ImportMode::Copy,
};

let (tag, _) = self
.inner
.db
.import_file(root, import_mode, format, import_progress)
.await?;
let hash = *tag.hash();
self.inner.blob_scopes.lock().unwrap().store(scope, tag);

progress.send(BatchAddProgress::Done { hash }).await?;
Ok(())
}

#[allow(clippy::unused_async)]
async fn node_stats(self, _req: NodeStatsRequest) -> RpcResult<NodeStatsResponse> {
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -867,7 +920,6 @@ impl<D: BaoStore> Handler<D> {
}
}
}
println!("dropping scope {}", scope_id);
self.inner
.blob_scopes
.lock()
Expand Down Expand Up @@ -895,6 +947,21 @@ impl<D: BaoStore> Handler<D> {
rx.into_stream()
}

fn batch_add_from_path(
self,
msg: BatchAddPathRequest,
) -> impl Stream<Item = BatchAddPathResponse> {
// provide a little buffer so that we don't slow down the sender
let (tx, rx) = flume::bounded(32);
let tx2 = tx.clone();
self.rt().spawn_pinned(|| async move {
if let Err(e) = self.batch_add_from_path0(msg, tx).await {
tx2.send_async(BatchAddProgress::Abort(e.into())).await.ok();
}
});
rx.into_stream().map(BatchAddPathResponse)
}

async fn batch_add_stream0(
self,
msg: BatchAddStreamRequest,
Expand Down
28 changes: 28 additions & 0 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use iroh_base::node_addr::AddrInfoOptions;
pub use iroh_blobs::{export::ExportProgress, get::db::DownloadProgress, BlobFormat, Hash};
use iroh_blobs::{
format::collection::Collection,
provider::BatchAddProgress,
store::{BaoBlobSize, ConsistencyCheckProgress},
util::Tag,
HashAndFormat,
Expand Down Expand Up @@ -1077,6 +1078,31 @@ pub enum BatchAddStreamResponse {
Result { hash: Hash },
}

/// Write a blob from a byte stream
#[derive(Serialize, Deserialize, Debug)]
pub struct BatchAddPathRequest {
/// The path to the data to provide.
pub path: PathBuf,
/// Add the data in place
pub in_place: bool,
/// What format to use for the blob
pub format: BlobFormat,
/// Scope to create the temp tag in
pub scope: u64,
}

/// Response to a batch add path request
#[derive(Serialize, Deserialize, Debug)]
pub struct BatchAddPathResponse(pub BatchAddProgress);

impl Msg<RpcService> for BatchAddPathRequest {
type Pattern = ServerStreaming;
}

impl ServerStreamingMsg<RpcService> for BatchAddPathRequest {
type Response = BatchAddPathResponse;
}

/// Get stats for the running Iroh node
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeStatsRequest {}
Expand Down Expand Up @@ -1138,6 +1164,7 @@ pub enum Request {
BatchUpdate(BatchUpdate),
BatchAddStreamRequest(BatchAddStreamRequest),
BatchAddStreamUpdate(BatchAddStreamUpdate),
BatchAddPath(BatchAddPathRequest),

DeleteTag(DeleteTagRequest),
ListTags(ListTagsRequest),
Expand Down Expand Up @@ -1203,6 +1230,7 @@ pub enum Response {
BatchCreateResponse(BatchCreateResponse),
BatchRequest(BatchCreateRequest),
BatchAddStream(BatchAddStreamResponse),
BatchAddPath(BatchAddPathResponse),

ListTags(TagInfo),
DeleteTag(RpcResult<()>),
Expand Down

0 comments on commit bf0baea

Please sign in to comment.