Skip to content

Commit

Permalink
refactor(iroh-blobs)!: implement some collection related things on th…
Browse files Browse the repository at this point in the history
…e client side (#2349)

## Description

A collection is just one particular way to use a hashseq, so it feels a
bit weird to have it baked in to the iroh node. With this we can move
some of it into the client.

This is a part of #2272 . We can
make more similar changes once we have the batch API
#2339 .

## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

Note: I closed #2272 because half of the changes in that PR are here,
the other half will be part of the batch PR, and moving collections into
iroh I am not convinced of yet...

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
rklaehn authored Jun 6, 2024
1 parent 98914ee commit b047b28
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 131 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn export_collection<D: BaoStore>(
progress: impl ProgressSender<Msg = ExportProgress> + IdGenerator,
) -> anyhow::Result<()> {
tokio::fs::create_dir_all(&outpath).await?;
let collection = Collection::load(db, &hash).await?;
let collection = Collection::load_db(db, &hash).await?;
for (name, hash) in collection.into_iter() {
#[allow(clippy::needless_borrow)]
let path = outpath.join(pathbuf_from_name(&name));
Expand Down
26 changes: 23 additions & 3 deletions iroh-blobs/src/format/collection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! The collection type used by iroh
use std::collections::BTreeMap;
use std::{collections::BTreeMap, future::Future};

use anyhow::Context;
use bao_tree::blake3;
Expand Down Expand Up @@ -64,6 +64,12 @@ impl IntoIterator for Collection {
}
}

/// A simple store trait for loading blobs
pub trait SimpleStore {
/// Load a blob from the store
fn load(&self, hash: Hash) -> impl Future<Output = anyhow::Result<Bytes>> + Send + '_;
}

/// Metadata for a collection
///
/// This is the wire format for the metadata blob.
Expand All @@ -84,7 +90,7 @@ impl Collection {
///
/// To persist the collection, write all the blobs to storage, and use the
/// hash of the last blob as the collection hash.
pub fn to_blobs(&self) -> impl Iterator<Item = Bytes> {
pub fn to_blobs(&self) -> impl DoubleEndedIterator<Item = Bytes> {
let meta = CollectionMeta {
header: *Self::HEADER,
names: self.names(),
Expand Down Expand Up @@ -160,11 +166,25 @@ impl Collection {
Ok((collection, res, stats))
}

/// Create a new collection from a hash sequence and metadata.
pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result<Self> {
let hs = store.load(root).await?;
let hs = HashSeq::try_from(hs)?;
let meta_hash = hs.iter().next().context("empty hash seq")?;
let meta = store.load(meta_hash).await?;
let meta: CollectionMeta = postcard::from_bytes(&meta)?;
anyhow::ensure!(
meta.names.len() + 1 == hs.len(),
"names and links length mismatch"
);
Ok(Self::from_parts(hs.into_iter(), meta))
}

/// Load a collection from a store given a root hash
///
/// This assumes that both the links and the metadata of the collection is stored in the store.
/// It does not require that all child blobs are stored in the store.
pub async fn load<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
pub async fn load_db<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
where
D: crate::store::Map,
{
Expand Down
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ impl ListCommands {
}
}
Self::Collections => {
let mut response = iroh.blobs.list_collections().await?;
let mut response = iroh.blobs.list_collections()?;
while let Some(item) = response.next().await {
let CollectionInfo {
tag,
Expand Down
60 changes: 46 additions & 14 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
use genawaiter::sync::{Co, Gen};
use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::Collection,
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
BlobFormat, Hash, Tag,
Expand All @@ -31,13 +32,12 @@ use tracing::warn;

use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption,
};

use super::{flatten, Iroh};
use super::{flatten, tags, Iroh};

/// Iroh blobs client.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -322,18 +322,35 @@ where

/// Read the content of a collection.
pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
let BlobGetCollectionResponse { collection } =
self.rpc.rpc(BlobGetCollectionRequest { hash }).await??;
Ok(collection)
Collection::load(hash, self).await
}

/// List all collections.
pub async fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
let stream = self
.rpc
.server_streaming(BlobListCollectionsRequest)
.await?;
Ok(flatten(stream))
pub fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
let this = self.clone();
Ok(Gen::new(|co| async move {
if let Err(cause) = this.list_collections_impl(&co).await {
co.yield_(Err(cause)).await;
}
}))
}

async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
let tags = self.tags_client();
let mut tags = tags.list_hash_seq().await?;
while let Some(tag) = tags.next().await {
let tag = tag?;
if let Ok(collection) = self.get_collection(tag.hash).await {
let info = CollectionInfo {
tag: tag.name,
hash: tag.hash,
total_blobs_count: Some(collection.len() as u64 + 1),
total_blobs_size: Some(0),
};
co.yield_(Ok(info)).await;
}
}
Ok(())
}

/// Delete a blob.
Expand Down Expand Up @@ -366,6 +383,21 @@ where
Ok(BlobStatus::Partial { size: reader.size })
}
}

fn tags_client(&self) -> tags::Client<C> {
tags::Client {
rpc: self.rpc.clone(),
}
}
}

impl<C> SimpleStore for Client<C>
where
C: ServiceConnection<RpcService>,
{
async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
self.read_to_bytes(hash).await
}
}

/// Whether to wrap the added data in a collection.
Expand Down Expand Up @@ -929,7 +961,7 @@ mod tests {
.create_collection(collection, SetTagOption::Auto, tags)
.await?;

let collections: Vec<_> = client.blobs.list_collections().await?.try_collect().await?;
let collections: Vec<_> = client.blobs.list_collections()?.try_collect().await?;

assert_eq!(collections.len(), 1);
{
Expand Down
11 changes: 10 additions & 1 deletion iroh/src/client/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ where
{
/// List all tags.
pub async fn list(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
let stream = self.rpc.server_streaming(ListTagsRequest).await?;
let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?;
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
}

/// List all tags with a hash_seq format.
pub async fn list_hash_seq(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
let stream = self
.rpc
.server_streaming(ListTagsRequest::hash_seq())
.await?;
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
}

Expand Down
79 changes: 6 additions & 73 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress,
use iroh_blobs::util::progress::ProgressSender;
use iroh_blobs::BlobFormat;
use iroh_blobs::{
hashseq::parse_hash_seq,
provider::AddProgress,
store::{Store as BaoStore, ValidateProgress},
util::progress::FlumeProgressSender,
Expand All @@ -33,16 +32,13 @@ use quic_rpc::{
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, info};

use crate::client::blobs::{
BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption,
};
use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption};
use crate::client::tags::TagInfo;
use crate::client::NodeStatus;
use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse,
BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest,
BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest,
Expand Down Expand Up @@ -95,12 +91,7 @@ impl<D: BaoStore> Handler<D> {
chan.server_streaming(msg, handler, Self::blob_list_incomplete)
.await
}
BlobListCollections(msg) => {
chan.server_streaming(msg, handler, Self::blob_list_collections)
.await
}
CreateCollection(msg) => chan.rpc(msg, handler, Self::create_collection).await,
BlobGetCollection(msg) => chan.rpc(msg, handler, Self::blob_get_collection).await,
ListTags(msg) => {
chan.server_streaming(msg, handler, Self::blob_list_tags)
.await
Expand Down Expand Up @@ -348,39 +339,6 @@ impl<D: BaoStore> Handler<D> {
Ok(())
}

async fn blob_list_collections_impl(
self,
co: &Co<RpcResult<CollectionInfo>>,
) -> anyhow::Result<()> {
let db = self.inner.db.clone();
let local = self.inner.rt.clone();
let tags = db.tags().await.unwrap();
for item in tags {
let (name, HashAndFormat { hash, format }) = item?;
if !format.is_hash_seq() {
continue;
}
let Some(entry) = db.get(&hash).await? else {
continue;
};
let count = local
.spawn_pinned(|| async move {
let reader = entry.data_reader().await?;
let (_collection, count) = parse_hash_seq(reader).await?;
anyhow::Ok(count)
})
.await??;
co.yield_(Ok(CollectionInfo {
tag: name,
hash,
total_blobs_count: Some(count),
total_blobs_size: None,
}))
.await;
}
Ok(())
}

fn blob_list(
self,
_msg: BlobListRequest,
Expand All @@ -403,17 +361,6 @@ impl<D: BaoStore> Handler<D> {
})
}

fn blob_list_collections(
self,
_msg: BlobListCollectionsRequest,
) -> impl Stream<Item = RpcResult<CollectionInfo>> + Send + 'static {
Gen::new(move |co| async move {
if let Err(e) = self.blob_list_collections_impl(&co).await {
co.yield_(Err(e.into())).await;
}
})
}

async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> {
self.inner.db.set_tag(msg.name, None).await?;
Ok(())
Expand All @@ -424,15 +371,16 @@ impl<D: BaoStore> Handler<D> {
Ok(())
}

fn blob_list_tags(self, _msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
fn blob_list_tags(self, msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
tracing::info!("blob_list_tags");
Gen::new(|co| async move {
let tags = self.inner.db.tags().await.unwrap();
#[allow(clippy::manual_flatten)]
for item in tags {
if let Ok((name, HashAndFormat { hash, format })) = item {
tracing::info!("{:?} {} {:?}", name, hash, format);
co.yield_(TagInfo { name, hash, format }).await;
if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) {
co.yield_(TagInfo { name, hash, format }).await;
}
}
}
})
Expand Down Expand Up @@ -1044,21 +992,6 @@ impl<D: BaoStore> Handler<D> {

Ok(CreateCollectionResponse { hash, tag })
}

async fn blob_get_collection(
self,
req: BlobGetCollectionRequest,
) -> RpcResult<BlobGetCollectionResponse> {
let hash = req.hash;
let db = self.inner.db.clone();
let collection = self
.rt()
.spawn_pinned(move || async move { Collection::load(&db, &hash).await })
.await
.map_err(|_| anyhow!("join failed"))??;

Ok(BlobGetCollectionResponse { collection })
}
}

async fn download<D>(
Expand Down
Loading

0 comments on commit b047b28

Please sign in to comment.