Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jun 4, 2024
1 parent 08ccd31 commit 978587a
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 92 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/src/format/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Collection {
///
/// To persist the collection, write all the blobs to storage, and use the
/// hash of the last blob as the collection hash.
pub fn to_blobs(&self) -> impl Iterator<Item = Bytes> {
pub fn to_blobs(&self) -> impl DoubleEndedIterator<Item = Bytes> {
let meta = CollectionMeta {
header: *Self::HEADER,
names: self.names(),
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ pub enum ImportProgress {
/// does not make any sense. E.g. an in-memory implementation will always have
/// to copy the file into memory. Also, a disk-based implementation might choose
/// to copy small files even if the mode is `Reference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
/// This mode will copy the file into the database before hashing.
///
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/net/interfaces/bsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl WireFormat {

Ok(Some(WireMessage::Route(m)))
}
#[cfg(any(target_os = "openbsd",))]
#[cfg(target_os = "openbsd")]
MessageType::Route => {
if data.len() < self.body_off {
return Err(RouteError::MessageTooShort);
Expand Down
217 changes: 170 additions & 47 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{

use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_buffered::BufferedStreamExt;
use futures_lite::{Stream, StreamExt};
use futures_util::{FutureExt, SinkExt};
use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
Expand All @@ -36,16 +37,18 @@ use tracing::warn;

use crate::rpc_protocol::{
BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate,
BatchCreateRequest, BatchCreateResponse, BatchUpdate, BlobAddPathRequest, BlobAddStreamRequest,
BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest,
BlobExportRequest, BlobGetCollectionRequest, BlobGetCollectionResponse,
BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
NodeStatusRequest, RpcService, SetTagOption,
BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, BatchUpdate,
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption,
};

use super::{flatten, Iroh};

pub use iroh_blobs::store::ImportMode;

/// Iroh blobs client.
#[derive(Debug, Clone)]
pub struct Client<C> {
Expand Down Expand Up @@ -399,8 +402,12 @@ struct BatchInner<C: ServiceConnection<RpcService>> {
updates: Mutex<UpdateSink<RpcService, C, BatchUpdate>>,
}

/// A batch for write operations.
///
/// This serves mostly as a scope for temporary tags.
///
/// It is not a transaction, so things in a batch are not atomic. Also, there is
/// no isolation between batches.
#[derive(derive_more::Debug)]
pub struct Batch<C: ServiceConnection<RpcService>>(Arc<BatchInner<C>>);

Expand All @@ -412,6 +419,120 @@ impl<C: ServiceConnection<RpcService>> TagDrop for BatchInner<C> {
}

impl<C: ServiceConnection<RpcService>> Batch<C> {
/// Write a blob from an in-memory buffer.
///
/// The bytes are wrapped in a single-item stream and handed to
/// [`Self::add_stream`]; the returned temp tag is whatever that call yields.
pub async fn add_bytes(&self, bytes: impl Into<Bytes>, format: BlobFormat) -> Result<TempTag> {
    let chunk: Bytes = bytes.into();
    self.add_stream(futures_lite::stream::once(Ok(chunk)), format)
        .await
}

/// Import a single file from the filesystem as a blob.
///
/// `path` must be absolute and must refer to a regular file; it should be
/// valid for the file system on which the node runs.
///
/// If `import_mode` is TryReference, Iroh will assume that the data will not
/// change and will share it in place without copying to the Iroh data directory
/// if appropriate. However, for tiny files, Iroh will copy the data.
///
/// If `import_mode` is Copy, Iroh will always copy the data.
///
/// # Returns
///
/// A temp tag for the added blob, together with the size of the file.
///
/// # Errors
///
/// Fails if `path` is not absolute or not a file, if the RPC fails, if the
/// node aborts the import, or if the progress stream ends without reporting
/// both a hash and a size.
pub async fn add_file(
    &self,
    path: PathBuf,
    import_mode: ImportMode,
    format: BlobFormat,
) -> Result<(TempTag, u64)> {
    // NOTE(review): both checks run against the caller's filesystem before the
    // request is sent - confirm this is intended when the node is remote.
    anyhow::ensure!(
        path.is_absolute(),
        "Path must be absolute, but got: {:?}",
        path
    );
    anyhow::ensure!(path.is_file(), "Path does not refer to a file: {:?}", path);

    let request = BatchAddPathRequest {
        path,
        import_mode,
        format,
        scope: self.0.scope,
    };
    let mut progress = self.0.rpc.server_streaming(request).await?;

    // The size and the hash arrive as separate progress events; remember
    // whichever ones show up and validate once the stream is exhausted.
    let (mut found_hash, mut found_size) = (None, None);
    while let Some(update) = progress.next().await {
        match update?.0 {
            BatchAddPathProgress::Abort(cause) => return Err(cause.into()),
            BatchAddPathProgress::Done { hash } => found_hash = Some(hash),
            BatchAddPathProgress::Found { size } => found_size = Some(size),
            // Remaining progress events carry nothing needed here.
            _ => {}
        }
    }

    let hash = found_hash.context("Missing hash")?;
    let size = found_size.context("Missing size")?;
    let tag = self.local_temp_tag(HashAndFormat { hash, format });
    Ok((tag, size))
}

/// Add a directory as a hashseq in collection format.
///
/// All files below `root` are discovered with `scan_path` and imported as
/// raw blobs via [`Self::add_file`], with at most `IO_PARALLELISM` imports in
/// flight at a time. The resulting `(name, hash)` pairs are assembled into a
/// [`Collection`], which is then added via [`Self::add_collection`].
///
/// # Returns
///
/// The temp tag returned by [`Self::add_collection`] for the collection.
///
/// # Errors
///
/// Fails if `root` is not an absolute path to a directory, if scanning the
/// directory fails, or if importing any file or adding the collection fails.
pub async fn add_dir(
    &self,
    root: PathBuf,
    import_mode: ImportMode,
    wrap: WrapOption,
) -> Result<TempTag> {
    /// Maximum number of file imports that are in flight at the same time.
    const IO_PARALLELISM: usize = 4;

    anyhow::ensure!(root.is_absolute(), "Path must be absolute",);
    anyhow::ensure!(root.is_dir(), "Path must be a directory",);

    // import all files below root recursively
    let data_sources = crate::util::fs::scan_path(root, wrap)?;
    let imported: Vec<_> = futures_lite::stream::iter(data_sources)
        .map(|source| async move {
            let name = source.name().to_string();
            let (tag, _size) = self
                .add_file(source.path().to_owned(), import_mode, BlobFormat::Raw)
                .await?;
            let hash = *tag.hash();
            anyhow::Ok((name, hash, tag))
        })
        .buffered_ordered(IO_PARALLELISM)
        .try_collect()
        .await?;

    // Split into the collection entries and the temp tags of the children.
    let (collection, child_tags): (Collection, Vec<_>) = imported
        .into_iter()
        .map(|(name, hash, tag)| ((name, hash), tag))
        .unzip();

    let tag = self.add_collection(collection).await?;
    // Hold the per-file temp tags until the collection has its own temp tag;
    // only then release them.
    drop(child_tags);
    Ok(tag)
}

/// Add a collection.
///
/// Convenience wrapper: the collection is turned into its blobs with
/// `Collection::to_blobs` (the metadata and the hash sequence), which are then
/// added through [`Self::add_blob_seq`]. The returned temp tag is the one for
/// the hash sequence.
pub async fn add_collection(&self, collection: Collection) -> Result<TempTag> {
    let blobs = collection.to_blobs();
    self.add_blob_seq(blobs).await
}

/// Write a blob by passing an async reader.
pub async fn add_reader(
&self,
Expand All @@ -423,12 +544,6 @@ impl<C: ServiceConnection<RpcService>> Batch<C> {
self.add_stream(input, format).await
}

/// Write a blob by passing bytes.
pub async fn add_bytes(&self, bytes: impl Into<Bytes>, format: BlobFormat) -> Result<TempTag> {
let input = futures_lite::stream::once(Ok(bytes.into()));
self.add_stream(input, format).await
}

/// Write a blob by passing a stream of bytes.
pub async fn add_stream(
&self,
Expand Down Expand Up @@ -473,51 +588,59 @@ impl<C: ServiceConnection<RpcService>> Batch<C> {
BatchAddStreamResponse::Result { hash } => {
res = Some(hash);
}
_ => {}
}
}
let hash = res.context("Missing answer")?;
Ok(self.temp_tag(HashAndFormat { hash, format }))
println!(
"creating temp tag with hash {:?} and format {}",
hash, format
);
Ok(self.local_temp_tag(HashAndFormat { hash, format }))
}

/// Import a blob from a filesystem path.
/// Add a sequence of blobs, where the last is a hash sequence.
///
/// `path` should be an absolute path valid for the file system on which
/// the node runs.
/// If `in_place` is true, Iroh will assume that the data will not change and will share it in
/// place without copying to the Iroh data directory.
pub async fn add_from_path(
&self,
path: PathBuf,
in_place: bool,
format: BlobFormat,
) -> Result<TempTag> {
let mut stream = self
.0
/// It is a common pattern in iroh to have a hash sequence with one or more
/// blobs of metadata, and the remaining blobs being the actual data. E.g.
/// a collection is a hash sequence where the first child is the metadata.
pub async fn add_blob_seq(&self, iter: impl Iterator<Item = Bytes>) -> Result<TempTag> {
let mut blobs = iter.peekable();
let mut res = vec![];
let res = loop {
let blob = blobs.next().context("Failed to get next blob")?;
if blobs.peek().is_none() {
println!("last blob");
break self.add_bytes(blob, BlobFormat::HashSeq).await?;
} else {
res.push(self.add_bytes(blob, BlobFormat::Raw).await?);
}
};
Ok(res)
}

/// Create a temp tag to protect some content (blob or hashseq) from being deleted.
///
/// A typical use case is that you are downloading some data and want to protect it
/// from deletion while the download is ongoing, but don't want to protect it permanently
/// until the download is completed.
pub async fn temp_tag(&self, content: HashAndFormat) -> Result<TempTag> {
// Notify the server that we want one temp tag for the given content
self.0
.rpc
.server_streaming(BatchAddPathRequest {
path,
in_place,
format,
.rpc(BatchCreateTempTagRequest {
scope: self.0.scope,
content,
})
.await?;
let mut res = None;
while let Some(item) = stream.next().await {
match item?.0 {
BatchAddPathProgress::Abort(cause) => {
Err(cause)?;
}
BatchAddPathProgress::Done { hash } => {
res = Some(hash);
}
_ => {}
}
}
let hash = res.context("Missing answer")?;
Ok(self.temp_tag(HashAndFormat { hash, format }))
.await??;
// Only after success of the above call, we can create the corresponding local temp tag
Ok(self.local_temp_tag(content))
}

fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
/// Creates a temp tag for the given hash and format, without notifying the server.
///
/// Caution: only do this for data for which you know the server side has created a temp tag.
fn local_temp_tag(&self, inner: HashAndFormat) -> TempTag {
let on_drop: Arc<dyn TagDrop> = self.0.clone();
let on_drop = Some(Arc::downgrade(&on_drop));
TempTag::new(inner, on_drop)
Expand Down
7 changes: 7 additions & 0 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,19 @@ impl BlobScopes {
fn store(&mut self, scope: u64, tt: TempTag) {
let entry = self.scopes.entry(scope).or_default();
let count = entry.tags.entry(tt.hash_and_format()).or_default();
println!(
"storing tag {:?} {} in scope {}",
tt.hash(),
tt.format(),
scope
);
tt.leak();
*count += 1;
}

/// Remove a tag from a scope.
fn remove_one(&mut self, scope: u64, content: &HashAndFormat, u: Option<&dyn TagDrop>) {
println!("removing tag {:?} from scope {}", content, scope);
if let Some(scope) = self.scopes.get_mut(&scope) {
if let Some(counter) = scope.tags.get_mut(content) {
*counter -= 1;
Expand Down
Loading

0 comments on commit 978587a

Please sign in to comment.