feat(iroh-bytes): Batch blob api #2339

Closed
wants to merge 23 commits into from
24 changes: 24 additions & 0 deletions iroh-blobs/src/provider.rs
@@ -153,6 +153,30 @@ pub enum AddProgress {
Abort(RpcError),
}

/// Progress updates for the batch add operation.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchAddPathProgress {
/// An item was found with the given size
Found {
/// The size of the entry in bytes.
size: u64,
},
/// We got progress ingesting the item.
Progress {
/// The offset of the progress, in bytes.
offset: u64,
},
/// We are done, and the hash is `hash`.
Done {
/// The hash of the entry.
hash: Hash,
},
/// We got an error and need to abort.
///
/// This will be the last message in the stream.
Abort(RpcError),
}
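A minimal sketch of how a consumer might fold a `BatchAddPathProgress` stream into a final result. The enum variants are reproduced from this PR; the `Hash` and `RpcError` stand-ins and the `finish` helper are illustrative stubs, not part of the iroh API.

```rust
// Stand-ins for the real iroh types, for a self-contained sketch.
type Hash = [u8; 32];
type RpcError = String;

enum BatchAddPathProgress {
    Found { size: u64 },
    Progress { offset: u64 },
    Done { hash: Hash },
    Abort(RpcError),
}

/// Drive a sequence of progress events to completion, returning the hash.
fn finish(events: impl IntoIterator<Item = BatchAddPathProgress>) -> Result<Hash, RpcError> {
    let mut total = None;
    for event in events {
        match event {
            BatchAddPathProgress::Found { size } => total = Some(size),
            BatchAddPathProgress::Progress { offset } => {
                if let Some(size) = total {
                    eprintln!("ingested {offset}/{size} bytes");
                }
            }
            BatchAddPathProgress::Done { hash } => return Ok(hash),
            // Abort is documented as the last message in the stream.
            BatchAddPathProgress::Abort(err) => return Err(err),
        }
    }
    Err("stream ended without Done or Abort".into())
}
```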

/// Read the request from the getter.
///
/// Will fail if there is an error while reading, if the reader
4 changes: 4 additions & 0 deletions iroh-blobs/src/store/fs.rs
@@ -1424,6 +1424,10 @@ impl super::Store for Store {
self.0.temp.temp_tag(value)
}

fn tag_drop(&self) -> Option<&dyn TagDrop> {
Some(self.0.temp.as_ref())
}

async fn shutdown(&self) {
self.0.shutdown().await;
}
4 changes: 4 additions & 0 deletions iroh-blobs/src/store/mem.rs
@@ -222,6 +222,10 @@ impl super::Store for Store {
self.inner.temp_tag(tag)
}

fn tag_drop(&self) -> Option<&dyn TagDrop> {
Some(self.inner.as_ref())
}

async fn gc_start(&self) -> io::Result<()> {
Ok(())
}
6 changes: 5 additions & 1 deletion iroh-blobs/src/store/readonly_mem.rs
@@ -15,7 +15,7 @@ use crate::{
},
util::{
progress::{BoxedProgressSender, IdGenerator, ProgressSender},
Tag,
Tag, TagDrop,
},
BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};
@@ -324,6 +324,10 @@ impl super::Store for Store {
TempTag::new(inner, None)
}

fn tag_drop(&self) -> Option<&dyn TagDrop> {
None
}

async fn gc_start(&self) -> io::Result<()> {
Ok(())
}
10 changes: 8 additions & 2 deletions iroh-blobs/src/store/traits.rs
@@ -19,7 +19,7 @@ use crate::{
protocol::RangeSpec,
util::{
progress::{BoxedProgressSender, IdGenerator, ProgressSender},
Tag,
Tag, TagDrop,
},
BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};
@@ -356,6 +356,12 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
/// Create a temporary pin for this store
fn temp_tag(&self, value: HashAndFormat) -> TempTag;

/// Handle to use to drop tags.
///
/// Returns `None` for stores that don't keep track of tags, such as
/// read-only stores.
fn tag_drop(&self) -> Option<&dyn TagDrop>;

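The new `tag_drop` method is an instance of the optional-capability pattern: a store exposes `Option<&dyn Trait>` so callers can detect at runtime whether the capability exists (here, read-only stores return `None`). A minimal sketch with stub types, not the actual iroh traits:

```rust
// Stub capability trait: something that can react to a tag being dropped.
trait TagDrop {
    fn on_drop(&self, tag: u64);
}

struct Tracker;
impl TagDrop for Tracker {
    fn on_drop(&self, tag: u64) {
        println!("dropping tag {tag}");
    }
}

struct MemStore {
    tracker: Tracker,
}
struct ReadOnlyStore;

trait Store {
    /// Returns `None` for stores that don't keep track of tags.
    fn tag_drop(&self) -> Option<&dyn TagDrop>;
}

impl Store for MemStore {
    fn tag_drop(&self) -> Option<&dyn TagDrop> {
        Some(&self.tracker)
    }
}

impl Store for ReadOnlyStore {
    fn tag_drop(&self) -> Option<&dyn TagDrop> {
        // A read-only store never tracks tags, so there is nothing to drop.
        None
    }
}
```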
/// Notify the store that a new gc phase is about to start.
///
/// This should not fail unless the store is shut down or otherwise in a
@@ -700,7 +706,7 @@ pub enum ImportProgress
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `Reference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
Author comment:
I reexport this from the blobs api since I don't want to newtype it...

/// This mode will copy the file into the database before hashing.
///
8 changes: 7 additions & 1 deletion iroh-blobs/src/util.rs
@@ -206,11 +206,17 @@ impl TempTag {
self.inner.format
}

/// The hash and format of the pinned item
pub fn hash_and_format(&self) -> HashAndFormat {
self.inner
}

/// Keep the item alive until the end of the process
pub fn leak(mut self) {
// set the liveness tracker to None, so that the refcount is not decreased
// during drop. This means that the refcount will never reach 0 and the
// item will not be gced until the end of the process.
// item will not be gced until the end of the process, unless you manually
// invoke on_drop.
Reviewer comment:

this reads like it should be moved from a comment to documentation

self.on_drop = None;
}
}
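The `leak` change above hinges on the liveness tracker: dropping a temp tag normally decrements a refcount, and `leak` disables that decrement so the count never reaches zero. A self-contained sketch of that mechanism, with a stub `Pin` in place of iroh's `TempTag`:

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Refcount-based pin: dropping decrements the count, `leak` disables it.
struct Pin {
    count: Option<Rc<Cell<usize>>>,
}

impl Pin {
    fn new(count: &Rc<Cell<usize>>) -> Self {
        count.set(count.get() + 1);
        Pin { count: Some(count.clone()) }
    }

    /// Keep the item alive until the end of the process: clear the liveness
    /// tracker so the refcount is never decreased on drop.
    fn leak(mut self) {
        self.count = None;
    }
}

impl Drop for Pin {
    fn drop(&mut self) {
        // If the tracker was cleared by `leak`, this is a no-op.
        if let Some(count) = self.count.take() {
            count.set(count.get() - 1);
        }
    }
}
```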
9 changes: 7 additions & 2 deletions iroh-cli/src/commands/blob.rs
@@ -370,10 +370,15 @@ impl BlobCommands {

let (blob_status, size) = match (status, format) {
(BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => {
("incomplete blob", size.value())
}
(BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size),
(BlobStatus::Partial { size }, BlobFormat::HashSeq) => {
("incomplete collection", size)
("incomplete collection", size.value())
}
(BlobStatus::NotFound, _) => {
return Err(anyhow!("blob is missing"));
}
};
println!(
6 changes: 4 additions & 2 deletions iroh/src/client.rs
@@ -29,8 +29,10 @@ pub mod tags;
mod node;

/// Iroh rpc client - boxed so that we can have a concrete type.
pub(crate) type RpcClient =
quic_rpc::RpcClient<RpcService, quic_rpc::transport::boxed::Connection<RpcService>>;
pub(crate) type RpcConnection = quic_rpc::transport::boxed::Connection<RpcService>;

/// Iroh rpc client - boxed so that we can have a concrete type.
Reviewer comment:

not seeing the boxed here from just looking at the type, maybe the docs could be more clear in that regard?

pub(crate) type RpcClient = quic_rpc::RpcClient<RpcService, RpcConnection>;

/// Iroh client.
#[derive(Debug, Clone)]
117 changes: 74 additions & 43 deletions iroh/src/client/blobs.rs
@@ -9,7 +9,8 @@ use std::{
task::{Context, Poll},
};

use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context as _, Result};
use batch::Batch;
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
@@ -19,7 +20,7 @@ use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
BlobFormat, Hash, Tag,
};
use iroh_net::NodeAddr;
@@ -30,12 +31,14 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
mod batch;
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts};

use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, SetTagOption,
BatchCreateRequest, BatchCreateResponse, BlobAddPathRequest, BlobAddStreamRequest,
BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest,
BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
BlobReadAtResponse, BlobStatusRequest, BlobValidateRequest, NodeStatusRequest, SetTagOption,
};

use super::{flatten, tags, Iroh, RpcClient};
@@ -54,6 +57,38 @@ impl<'a> From<&'a Iroh> for &'a RpcClient {
}

impl Client {
/// Get the status of a blob on the node.
///
/// Returns whether the blob is missing, partially stored, or completely
/// stored, along with its size.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
Ok(status.0)
}

/// Check if a blob is completely stored on the node.
///
/// This is just a convenience wrapper around `status` that returns a boolean.
pub async fn has(&self, hash: Hash) -> Result<bool> {
match self.status(hash).await {
Ok(BlobStatus::Complete { .. }) => Ok(true),
Ok(_) => Ok(false),
Err(err) => Err(err),
}
}

/// Create a new batch for adding data.
///
/// A batch is a context in which temp tags are created and data is added to the node. Temp tags
/// are automatically deleted when the batch is dropped, leading to the data being garbage collected
/// unless a permanent tag is created for it.
pub async fn batch(&self) -> Result<Batch> {
let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
let rpc = self.rpc.clone();
Ok(Batch::new(batch, rpc, updates, 1024))
}
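The batch lifecycle described above can be sketched with plain data structures: tags created inside a batch pin data only until the batch is dropped, and only data upgraded to a permanent tag survives. All types here are illustrative stubs, not the iroh API:

```rust
use std::collections::HashSet;

/// Toy store distinguishing batch-scoped pins from permanent tags.
struct Store {
    permanent: HashSet<&'static str>,
    batch_pins: HashSet<&'static str>,
}

impl Store {
    /// Add data under a batch-scoped temp tag.
    fn add(&mut self, hash: &'static str) {
        self.batch_pins.insert(hash);
    }

    /// Upgrade a temp tag to a permanent tag.
    fn upgrade(&mut self, hash: &'static str) {
        self.batch_pins.remove(hash);
        self.permanent.insert(hash);
    }

    /// Drop the batch: everything not upgraded becomes garbage.
    fn drop_batch(&mut self) -> Vec<&'static str> {
        self.batch_pins.drain().collect()
    }
}
```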

/// Stream the contents of a single blob.
///
/// Returns a [`Reader`], which can report the size of the blob before reading it.
@@ -127,17 +162,19 @@ impl Client {
pub async fn create_collection(
&self,
collection: Collection,
tag: SetTagOption,
opts: SetTagOption,
tags_to_delete: Vec<Tag>,
) -> anyhow::Result<(Hash, Tag)> {
let CreateCollectionResponse { hash, tag } = self
.rpc
.rpc(CreateCollectionRequest {
collection,
tag,
tags_to_delete,
})
.await??;
let batch = self.batch().await?;
let temp_tag = batch.add_collection(collection).await?;
let hash = *temp_tag.hash();
let tag = batch.upgrade_with_opts(temp_tag, opts).await?;
if !tags_to_delete.is_empty() {
let tags = self.tags_client();
for tag in tags_to_delete {
tags.delete(tag).await?;
}
}
Ok((hash, tag))
}

@@ -372,17 +409,6 @@ impl Client {
Ok(ticket)
}

/// Get the status of a blob.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
// TODO: this could be implemented more efficiently
let reader = self.read(hash).await?;
if reader.is_complete {
Ok(BlobStatus::Complete { size: reader.size })
} else {
Ok(BlobStatus::Partial { size: reader.size })
}
}

fn tags_client(&self) -> tags::Client {
tags::Client {
rpc: self.rpc.clone(),
@@ -397,9 +423,10 @@ impl SimpleStore for Client {
}

/// Whether to wrap the added data in a collection.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub enum WrapOption {
/// Do not wrap the file or directory.
#[default]
NoWrap,
/// Wrap the file or directory in a collection.
Wrap {
Expand All @@ -408,21 +435,6 @@ pub enum WrapOption {
},
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlobStatus {
/// The blob is only stored partially.
Partial {
/// The size of the currently stored partial blob.
size: u64,
},
/// The blob is stored completely.
Complete {
/// The size of the blob.
size: u64,
},
}

/// Outcome of a blob add operation.
#[derive(Debug, Clone)]
pub struct AddOutcome {
@@ -887,11 +899,30 @@ pub enum DownloadMode {
Queued,
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
/// The blob is not stored on the node.
NotFound,
/// The blob is only stored partially.
Partial {
/// The size of the currently stored partial blob.
///
/// This can be either a verified size if the last chunk was received,
/// or an unverified size if the last chunk was not yet received.
size: BaoBlobSize,
},
/// The blob is stored completely.
Complete {
/// The size of the blob. For a complete blob the size is always known.
size: u64,
},
}

#[cfg(test)]
mod tests {
use super::*;

use anyhow::Context as _;
use rand::RngCore;
use tokio::io::AsyncWriteExt;
