Skip to content

Commit

Permalink
feat(iroh): add blobs.create_collection api endpoint
Browse files Browse the repository at this point in the history
Closes #1950
  • Loading branch information
dignifiedquire committed Jan 17, 2024
1 parent 0084b5f commit 73f2f34
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 17 deletions.
5 changes: 5 additions & 0 deletions iroh-bytes/src/format/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ impl Collection {
pub fn is_empty(&self) -> bool {
self.blobs.is_empty()
}

/// Add the given blob to the collection.
pub fn push(&mut self, name: String, hash: Hash) {
self.blobs.push((name, hash));
}
}

#[cfg(test)]
Expand Down
103 changes: 94 additions & 9 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use anyhow::{anyhow, Context as AnyhowContext, Result};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use iroh_bytes::format::collection::Collection;
use iroh_bytes::provider::AddProgress;
use iroh_bytes::store::ValidateProgress;
// use iroh_bytes::util::progress::FlumeProgressSender;
Expand All @@ -37,15 +38,15 @@ use crate::rpc_protocol::{
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
CounterStats, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest,
DocDelResponse, DocDropRequest, DocExportFileRequest, DocExportProgress,
DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest,
DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest,
DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
CounterStats, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest,
DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest,
DocExportFileRequest, DocExportProgress, DocGetDownloadPolicyRequest, DocGetExactRequest,
DocGetManyRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest,
DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest,
DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket,
DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest,
NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
};
use crate::sync_engine::SyncEvent;

Expand Down Expand Up @@ -279,6 +280,19 @@ where
Ok(BlobAddProgress::new(stream))
}

/// Create a collection from already existing blobs.
pub async fn create_collection(
&self,
collection: Collection,
tag: SetTagOption,
) -> anyhow::Result<(Hash, Tag)> {
let CreateCollectionResponse { hash, tag } = self
.rpc
.rpc(CreateCollectionRequest { collection, tag })
.await??;
Ok((hash, tag))
}

/// Write a blob by passing an async reader.
pub async fn add_reader(
&self,
Expand Down Expand Up @@ -1219,4 +1233,75 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_blob_create_collection() -> Result<()> {
let _guard = iroh_test::logging::setup();

let doc_store = iroh_sync::store::memory::Store::default();
let db = iroh_bytes::store::mem::Store::new();
let node = crate::node::Node::builder(db, doc_store).spawn().await?;

// create temp file
let temp_dir = tempfile::tempdir().context("tempdir")?;

let in_root = temp_dir.path().join("in");
tokio::fs::create_dir_all(in_root.clone())
.await
.context("create dir all")?;

let mut paths = Vec::new();
for i in 0..5 {
let path = in_root.join(format!("test-{i}"));
let size = 100;
let mut buf = vec![0u8; size];
rand::thread_rng().fill_bytes(&mut buf);
let mut file = tokio::fs::File::create(path.clone())
.await
.context("create file")?;
file.write_all(&buf.clone()).await.context("write_all")?;
file.flush().await.context("flush")?;
paths.push(path);
}

let client = node.client();

let mut collection = Collection::default();
// import files
for path in &paths {
let import_outcome = client
.blobs
.add_from_path(
path.to_path_buf(),
false,
SetTagOption::Auto,
WrapOption::NoWrap,
)
.await
.context("import file")?
.finish()
.await
.context("import finish")?;

collection.push(
path.file_name().unwrap().to_str().unwrap().to_string(),
import_outcome.hash,
);
}

let (hash, tag) = client
.blobs
.create_collection(collection, SetTagOption::Auto)
.await?;

let collections: Vec<_> = client.blobs.list_collections().await?.try_collect().await?;

assert_eq!(collections.len(), 1);
assert_eq!(collections[0].tag, tag);
assert_eq!(collections[0].hash, hash);
// 5 blobs + 1 meta
assert_eq!(collections[0].total_blobs_count, Some(5 + 1));

Ok(())
}
}
38 changes: 31 additions & 7 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ use crate::rpc_protocol::{
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocExportProgress,
DocImportFileRequest, DocImportFileResponse, DocImportProgress, DocSetHashRequest,
DownloadLocation, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse,
NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest,
NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest, ProviderResponse,
ProviderService, SetTagOption,
CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
DocExportFileResponse, DocExportProgress, DocImportFileRequest, DocImportFileResponse,
DocImportProgress, DocSetHashRequest, DownloadLocation, ListTagsRequest, ListTagsResponse,
NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
NodeConnectionsResponse, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse,
NodeStatusRequest, NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest,
ProviderResponse, ProviderService, SetTagOption,
};
use crate::sync_engine::{SyncEngine, SYNC_ALPN};
use crate::ticket::BlobTicket;
Expand Down Expand Up @@ -1512,6 +1512,29 @@ impl<D: BaoStore> RpcHandler<D> {
let conn_info = self.inner.endpoint.connection_info(node_id).await?;
Ok(NodeConnectionInfoResponse { conn_info })
}

async fn create_collection(
self,
req: CreateCollectionRequest,
) -> RpcResult<CreateCollectionResponse> {
let CreateCollectionRequest { collection, tag } = req;

let temp_tag = collection.store(&self.inner.db).await?;
let hash_and_format = temp_tag.inner();
let HashAndFormat { hash, .. } = *hash_and_format;
let tag = match tag {
SetTagOption::Named(tag) => {
self.inner
.db
.set_tag(tag.clone(), Some(*hash_and_format))
.await?;
tag
}
SetTagOption::Auto => self.inner.db.create_tag(*hash_and_format).await?,
};

Ok(CreateCollectionResponse { hash, tag })
}
}

fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
Expand Down Expand Up @@ -1551,6 +1574,7 @@ fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
chan.server_streaming(msg, handler, RpcHandler::blob_list_collections)
.await
}
CreateCollection(msg) => chan.rpc(msg, handler, RpcHandler::create_collection).await,
ListTags(msg) => {
chan.server_streaming(msg, handler, RpcHandler::blob_list_tags)
.await
Expand Down
26 changes: 25 additions & 1 deletion iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf};

use bytes::Bytes;
use derive_more::{From, TryInto};
use iroh_bytes::util::Tag;
use iroh_bytes::{format::collection::Collection, util::Tag};
pub use iroh_bytes::{get::db::DownloadProgress, BlobFormat, Hash};
use iroh_net::{
key::PublicKey,
Expand Down Expand Up @@ -274,6 +274,28 @@ impl RpcMsg<ProviderService> for DeleteTagRequest {
type Response = RpcResult<()>;
}

/// Create a collection.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateCollectionRequest {
/// The collection
pub collection: Collection,
/// Tag option.
pub tag: SetTagOption,
}

/// A response to a create collection request
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateCollectionResponse {
/// The resulting hash.
pub hash: Hash,
/// The resulting tag.
pub tag: Tag,
}

impl RpcMsg<ProviderService> for CreateCollectionRequest {
type Response = RpcResult<CreateCollectionResponse>;
}

/// List connection information about all the nodes we know about
///
/// These can be nodes that we have explicitly connected to or nodes
Expand Down Expand Up @@ -1021,6 +1043,7 @@ pub enum ProviderRequest {
BlobListCollections(BlobListCollectionsRequest),
BlobDeleteBlob(BlobDeleteBlobRequest),
BlobValidate(BlobValidateRequest),
CreateCollection(CreateCollectionRequest),

DeleteTag(DeleteTagRequest),
ListTags(ListTagsRequest),
Expand Down Expand Up @@ -1070,6 +1093,7 @@ pub enum ProviderResponse {
BlobListIncomplete(BlobListIncompleteResponse),
BlobListCollections(BlobListCollectionsResponse),
BlobValidate(ValidateProgress),
CreateCollection(RpcResult<CreateCollectionResponse>),

ListTags(ListTagsResponse),
DeleteTag(RpcResult<()>),
Expand Down

0 comments on commit 73f2f34

Please sign in to comment.