Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh)!: Blob batch PR, attempt 3 #2545

Merged
merged 20 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,30 @@ pub enum AddProgress {
Abort(RpcError),
}

/// Progress updates for the batch add operation.
///
/// Streamed to the client while an item is being ingested; `Abort` or `Done`
/// is the terminal message. NOTE(review): name suggests this covers adding a
/// filesystem path within a batch — confirm against the request handler.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchAddPathProgress {
    /// An item was found with the given size
    Found {
        /// The size of the entry in bytes.
        size: u64,
    },
    /// We got progress ingesting the item.
    Progress {
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done, and the hash is `hash`.
    Done {
        /// The hash of the entry.
        hash: Hash,
    },
    /// We got an error and need to abort.
    ///
    /// This will be the last message in the stream.
    Abort(RpcError),
}

/// Read the request from the getter.
///
/// Will fail if there is an error while reading, if the reader
Expand Down
11 changes: 4 additions & 7 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,13 +757,6 @@ impl Store {
Ok(self.0.dump().await?)
}

/// Ensure that all operations before the sync are processed and persisted.
///
/// This is done by closing any open write transaction.
pub async fn sync(&self) -> io::Result<()> {
Ok(self.0.sync().await?)
}

/// Import from a v0 or v1 flat store, for backwards compatibility.
pub async fn import_flat_store(&self, paths: FlatStorePaths) -> io::Result<bool> {
Ok(self.0.import_flat_store(paths).await?)
Expand Down Expand Up @@ -1415,6 +1408,10 @@ impl super::Store for Store {
self.0.temp.temp_tag(value)
}

/// Ensure that all operations before the sync are processed and persisted.
///
/// This is done by closing any open write transaction (delegates to the
/// inner actor's `sync`).
async fn sync(&self) -> io::Result<()> {
    Ok(self.0.sync().await?)
}

async fn shutdown(&self) {
self.0.shutdown().await;
}
Expand Down
4 changes: 4 additions & 0 deletions iroh-blobs/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ impl super::Store for Store {
}

async fn shutdown(&self) {}

/// Sync is a no-op for the in-memory store: operations are applied
/// directly to memory, so there is nothing to flush or persist.
async fn sync(&self) -> io::Result<()> {
    Ok(())
}
}

#[derive(Debug, Default)]
Expand Down
4 changes: 4 additions & 0 deletions iroh-blobs/src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,8 @@ impl super::Store for Store {
}

async fn shutdown(&self) {}

/// Sync is a no-op for the read-only in-memory store: the contents never
/// change, so there is never anything to persist.
async fn sync(&self) -> io::Result<()> {
    Ok(())
}
}
5 changes: 4 additions & 1 deletion iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
/// Shutdown the store.
fn shutdown(&self) -> impl Future<Output = ()> + Send;

/// Sync the store.
///
/// Ensures that all operations submitted before this call are processed
/// and persisted (for persistent implementations; in-memory stores may
/// treat this as a no-op).
fn sync(&self) -> impl Future<Output = io::Result<()>> + Send;

/// Validate the database
///
/// This will check that the file and outboard content is correct for all complete
Expand Down Expand Up @@ -703,7 +706,7 @@ pub enum ImportProgress {
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `Reference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
/// This mode will copy the file into the database before hashing.
///
Expand Down
5 changes: 5 additions & 0 deletions iroh-blobs/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ impl TempTag {
self.inner.format
}

/// The hash and format of the pinned item
///
/// Returns the inner value by copy.
pub fn hash_and_format(&self) -> HashAndFormat {
    self.inner
}

/// Keep the item alive until the end of the process
pub fn leak(mut self) {
// set the liveness tracker to None, so that the refcount is not decreased
Expand Down
9 changes: 7 additions & 2 deletions iroh-cli/src/commands/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,15 @@ impl BlobCommands {

let (blob_status, size) = match (status, format) {
(BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => {
("incomplete blob", size.value())
}
(BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size),
(BlobStatus::Partial { size }, BlobFormat::HashSeq) => {
("incomplete collection", size)
("incomplete collection", size.value())
}
(BlobStatus::NotFound, _) => {
return Err(anyhow!("blob is missing"));
}
};
println!(
Expand Down
3 changes: 3 additions & 0 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub mod gossip;
pub mod node;
pub mod tags;

/// Iroh rpc connection - boxed so that we can have a concrete type.
pub(crate) type RpcConnection = quic_rpc::transport::boxed::Connection<RpcService>;

// Keep this type exposed, otherwise every occurrence of `RpcClient` in the API
// will show up as `RpcClient<RpcService, Connection<RpcService>>` in the docs.
/// Iroh rpc client - boxed so that we can have a concrete type.
Expand Down
67 changes: 46 additions & 21 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use std::{
task::{Context, Poll},
};

use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
Expand All @@ -65,7 +65,7 @@ use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
util::SetTagOption,
BlobFormat, Hash, Tag,
};
Expand All @@ -77,12 +77,14 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
mod batch;
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};

use crate::rpc_protocol::blobs::{
AddPathRequest, AddStreamRequest, AddStreamUpdate, ConsistencyCheckRequest,
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadRequest,
ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse,
ValidateRequest,
AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
DeleteRequest, DownloadRequest, ExportRequest, ListIncompleteRequest, ListRequest,
ReadAtRequest, ReadAtResponse, ValidateRequest,
};
use crate::rpc_protocol::node::StatusRequest;

Expand All @@ -102,6 +104,38 @@ impl<'a> From<&'a Iroh> for &'a RpcClient {
}

impl Client {
/// Get the status of a blob on this node.
///
/// Returns [`BlobStatus::Complete`], [`BlobStatus::Partial`], or
/// [`BlobStatus::NotFound`] as reported by the node over RPC.
///
/// (The previous doc comment described the boolean `has` convenience
/// wrapper; this method returns the full status, not a bool.)
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
    // Double `?`: the outer one is the RPC transport error, the inner one
    // is the application-level error carried in the response.
    let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
    Ok(status.0)
}

/// Check if a blob is completely stored on the node.
///
/// This is just a convenience wrapper around [`Self::status`] that returns
/// a boolean. Note that this returns `false` for blobs that are only
/// partially stored on the node.
pub async fn has(&self, hash: Hash) -> Result<bool> {
    // Propagate errors with `?` instead of re-wrapping them in a match,
    // and reduce the status check to a pattern test.
    Ok(matches!(
        self.status(hash).await?,
        BlobStatus::Complete { .. }
    ))
}

/// Create a new batch for adding data.
///
/// A batch is a context in which temp tags are created and data is added to the node. Temp tags
/// are automatically deleted when the batch is dropped, leading to the data being garbage collected
/// unless a permanent tag is created for it.
pub async fn batch(&self) -> Result<Batch> {
    // Open a bidirectional RPC stream; the server's first message is the id
    // assigned to this batch.
    let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
    let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
    let rpc = self.rpc.clone();
    // NOTE(review): 1024 appears to be a buffer/budget size passed to
    // Batch::new — confirm its meaning against the Batch constructor.
    Ok(Batch::new(batch, rpc, updates, 1024))
}

/// Stream the contents of a single blob.
///
/// Returns a [`Reader`], which can report the size of the blob before reading it.
Expand Down Expand Up @@ -424,17 +458,6 @@ impl Client {
Ok(ticket)
}

/// Get the status of a blob.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
// TODO: this could be implemented more efficiently
let reader = self.read(hash).await?;
if reader.is_complete {
Ok(BlobStatus::Complete { size: reader.size })
} else {
Ok(BlobStatus::Partial { size: reader.size })
}
}

fn tags_client(&self) -> tags::Client {
tags::Client {
rpc: self.rpc.clone(),
Expand All @@ -449,9 +472,10 @@ impl SimpleStore for Client {
}

/// Whether to wrap the added data in a collection.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub enum WrapOption {
/// Do not wrap the file or directory.
#[default]
NoWrap,
/// Wrap the file or directory in a collection.
Wrap {
Expand All @@ -461,12 +485,14 @@ pub enum WrapOption {
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
/// The blob is not stored at all.
NotFound,
/// The blob is only stored partially.
Partial {
/// The size of the currently stored partial blob.
size: u64,
size: BaoBlobSize,
},
/// The blob is stored completely.
Complete {
Expand Down Expand Up @@ -943,7 +969,6 @@ pub enum DownloadMode {
mod tests {
use super::*;

use anyhow::Context as _;
use iroh_blobs::hashseq::HashSeq;
use iroh_net::NodeId;
use rand::RngCore;
Expand Down
Loading
Loading