Skip to content

Commit

Permalink
Add back code from the batch PR
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jul 25, 2024
1 parent 2964569 commit a372bd6
Show file tree
Hide file tree
Showing 13 changed files with 1,195 additions and 34 deletions.
24 changes: 24 additions & 0 deletions iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,30 @@ pub enum AddProgress {
Abort(RpcError),
}

/// Progress updates for the batch add operation.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchAddPathProgress {
/// An item was found with the given size
Found {
/// The size of the entry in bytes.
size: u64,
},
/// We got progress ingesting the item.
Progress {
/// The offset of the progress, in bytes.
offset: u64,
},
/// We are done, and the hash is `hash`.
Done {
/// The hash of the entry.
hash: Hash,
},
/// We got an error and need to abort.
///
/// This will be the last message in the stream.
Abort(RpcError),
}

/// Read the request from the getter.
///
/// Will fail if there is an error while reading, if the reader
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ pub enum ImportProgress {
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `Reference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
/// This mode will copy the file into the database before hashing.
///
Expand Down
5 changes: 5 additions & 0 deletions iroh-blobs/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ impl TempTag {
self.inner.format
}

/// The hash and format of the pinned item
pub fn hash_and_format(&self) -> HashAndFormat {
self.inner
}

/// Keep the item alive until the end of the process
pub fn leak(mut self) {
// set the liveness tracker to None, so that the refcount is not decreased
Expand Down
9 changes: 7 additions & 2 deletions iroh-cli/src/commands/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,15 @@ impl BlobCommands {

let (blob_status, size) = match (status, format) {
(BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => {
("incomplete blob", size.value())
}
(BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size),
(BlobStatus::Partial { size }, BlobFormat::HashSeq) => {
("incomplete collection", size)
("incomplete collection", size.value())
}
(BlobStatus::NotFound, _) => {
return Err(anyhow!("blob is missing"));
}
};
println!(
Expand Down
8 changes: 5 additions & 3 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ pub mod gossip;
pub mod node;
pub mod tags;

/// Iroh rpc client - boxed so that we can have a concrete type.
pub(crate) type RpcClient =
quic_rpc::RpcClient<RpcService, quic_rpc::transport::boxed::Connection<RpcService>>;
/// Iroh rpc connection - boxed so that we can have a concrete type.
pub(crate) type RpcConnection = quic_rpc::transport::boxed::Connection<RpcService>;

/// Iroh rpc client.
pub(crate) type RpcClient = quic_rpc::RpcClient<RpcService, RpcConnection>;

/// Iroh client.
#[derive(Debug, Clone)]
Expand Down
68 changes: 47 additions & 21 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ use std::{
task::{Context, Poll},
};

use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context as _, Result};
use batch::Batch;
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
Expand All @@ -63,7 +64,7 @@ use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
util::SetTagOption,
BlobFormat, Hash, Tag,
};
Expand All @@ -75,12 +76,14 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
mod batch;
pub use batch::AddDirOpts;

use crate::rpc_protocol::blobs::{
AddPathRequest, AddStreamRequest, AddStreamUpdate, ConsistencyCheckRequest,
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadRequest,
ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse,
ValidateRequest,
AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
DeleteRequest, DownloadRequest, ExportRequest, ListIncompleteRequest, ListRequest,
ReadAtRequest, ReadAtResponse, ValidateRequest,
};
use crate::rpc_protocol::node::StatusRequest;

Expand All @@ -100,6 +103,38 @@ impl<'a> From<&'a Iroh> for &'a RpcClient {
}

impl Client {
/// Check if a blob is completely stored on the node.
///
/// Note that this will return false for blobs that are partially stored on
/// the node.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
Ok(status.0)
}

/// Check if a blob is completely stored on the node.
///
/// This is just a convenience wrapper around `status` that returns a boolean.
pub async fn has(&self, hash: Hash) -> Result<bool> {
match self.status(hash).await {
Ok(BlobStatus::Complete { .. }) => Ok(true),
Ok(_) => Ok(false),
Err(err) => Err(err),
}
}

/// Create a new batch for adding data.
///
/// A batch is a context in which temp tags are created and data is added to the node. Temp tags
/// are automatically deleted when the batch is dropped, leading to the data being garbage collected
/// unless a permanent tag is created for it.
pub async fn batch(&self) -> Result<Batch> {
let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
let rpc = self.rpc.clone();
Ok(Batch::new(batch, rpc, updates, 1024))
}

/// Stream the contents of a a single blob.
///
/// Returns a [`Reader`], which can report the size of the blob before reading it.
Expand Down Expand Up @@ -422,17 +457,6 @@ impl Client {
Ok(ticket)
}

/// Get the status of a blob.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
// TODO: this could be implemented more efficiently
let reader = self.read(hash).await?;
if reader.is_complete {
Ok(BlobStatus::Complete { size: reader.size })
} else {
Ok(BlobStatus::Partial { size: reader.size })
}
}

fn tags_client(&self) -> tags::Client {
tags::Client {
rpc: self.rpc.clone(),
Expand All @@ -447,9 +471,10 @@ impl SimpleStore for Client {
}

/// Whether to wrap the added data in a collection.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub enum WrapOption {
/// Do not wrap the file or directory.
#[default]
NoWrap,
/// Wrap the file or directory in a collection.
Wrap {
Expand All @@ -459,12 +484,14 @@ pub enum WrapOption {
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
/// The blob is not stored at all.
NotFound,
/// The blob is only stored partially.
Partial {
/// The size of the currently stored partial blob.
size: u64,
size: BaoBlobSize,
},
/// The blob is stored completely.
Complete {
Expand Down Expand Up @@ -941,7 +968,6 @@ pub enum DownloadMode {
mod tests {
use super::*;

use anyhow::Context as _;
use rand::RngCore;
use tokio::io::AsyncWriteExt;

Expand Down
Loading

0 comments on commit a372bd6

Please sign in to comment.