Skip to content

Commit

Permalink
WIP add more rich tags api
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jun 3, 2024
1 parent bf0baea commit 08ccd31
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 31 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub enum AddProgress {

/// Progress updates for the batch add operation.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchAddProgress {
pub enum BatchAddPathProgress {
/// An item was found with the given size
Found {
/// The size of the entry in bytes.
Expand Down
6 changes: 3 additions & 3 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::Collection,
get::db::DownloadProgress as BytesDownloadProgress,
provider::BatchAddProgress,
provider::BatchAddPathProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
util::TagDrop,
BlobFormat, Hash, HashAndFormat, Tag, TempTag,
Expand Down Expand Up @@ -504,10 +504,10 @@ impl<C: ServiceConnection<RpcService>> Batch<C> {
let mut res = None;
while let Some(item) = stream.next().await {
match item?.0 {
BatchAddProgress::Abort(cause) => {
BatchAddPathProgress::Abort(cause) => {
Err(cause)?;
}
BatchAddProgress::Done { hash } => {
BatchAddPathProgress::Done { hash } => {
res = Some(hash);
}
_ => {}
Expand Down
31 changes: 27 additions & 4 deletions iroh/src/client/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
use anyhow::Result;
use futures_lite::{Stream, StreamExt};
use iroh_blobs::{BlobFormat, Hash, Tag};
use iroh_blobs::{BlobFormat, Hash, HashAndFormat, Tag};
use quic_rpc::{RpcClient, ServiceConnection};
use serde::{Deserialize, Serialize};

use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest, RpcService};
use crate::rpc_protocol::{CreateTagRequest, ListTagsRequest, RpcService, SetTagRequest};

/// Iroh tags client.
#[derive(Debug, Clone)]
Expand All @@ -24,10 +24,33 @@ where
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
}

/// Create a tag, where the name is automatically generated.
///
/// Use this method if you want a new tag with a unique name.
pub async fn create(&self, value: HashAndFormat) -> Result<Tag> {
Ok(self.rpc.rpc(CreateTagRequest { value }).await??)
}

/// Set a tag to a value, overwriting any existing value.
///
/// Setting the value to `None` deletes the tag. Setting the value to `Some` creates or updates the tag.
pub async fn set_opt(&self, name: Tag, value: Option<HashAndFormat>) -> Result<()> {
self.rpc.rpc(SetTagRequest { name, value }).await??;
Ok(())
}

/// Set a tag to a value, overwriting any existing value.
///
/// This is a convenience wrapper around `set_opt`.
pub async fn set(&self, name: Tag, value: HashAndFormat) -> Result<()> {
self.set_opt(name, Some(value)).await
}

/// Delete a tag.
///
/// This is a convenience wrapper around `set_opt`.
pub async fn delete(&self, name: Tag) -> Result<()> {
self.rpc.rpc(DeleteTagRequest { name }).await??;
Ok(())
self.set_opt(name, None).await
}
}

Expand Down
43 changes: 25 additions & 18 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ use iroh_blobs::export::ExportProgress;
use iroh_blobs::format::collection::Collection;
use iroh_blobs::get::db::DownloadProgress;
use iroh_blobs::get::Stats;
use iroh_blobs::provider::BatchAddProgress;
use iroh_blobs::provider::BatchAddPathProgress;
use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry};
use iroh_blobs::util::progress::ProgressSender;
use iroh_blobs::BlobFormat;
use iroh_blobs::{
hashseq::parse_hash_seq,
provider::AddProgress,
store::{Store as BaoStore, ValidateProgress},
util::progress::FlumeProgressSender,
HashAndFormat,
};
use iroh_blobs::{BlobFormat, Tag};
use iroh_io::AsyncSliceReader;
use iroh_net::relay::RelayUrl;
use iroh_net::{Endpoint, NodeAddr, NodeId};
Expand All @@ -47,12 +47,12 @@ use crate::rpc_protocol::{
BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse,
BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest,
CreateTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest,
DocImportFileResponse, DocSetHashRequest, ListTagsRequest, NodeAddrRequest,
NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest,
NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, NodeWatchResponse,
Request, RpcService, SetTagOption,
Request, RpcService, SetTagOption, SetTagRequest,
};

use super::NodeInner;
Expand Down Expand Up @@ -112,10 +112,11 @@ impl<D: BaoStore> Handler<D> {
.await
}
ListTags(msg) => {
chan.server_streaming(msg, handler, Self::blob_list_tags)
chan.server_streaming(msg, handler, Self::tags_list_tags)
.await
}
DeleteTag(msg) => chan.rpc(msg, handler, Self::blob_delete_tag).await,
SetTag(msg) => chan.rpc(msg, handler, Self::tags_set_tag).await,
CreateTag(msg) => chan.rpc(msg, handler, Self::tags_create_tag).await,
BlobDeleteBlob(msg) => chan.rpc(msg, handler, Self::blob_delete_blob).await,
BlobAddPath(msg) => {
chan.server_streaming(msg, handler, Self::blob_add_from_path)
Expand Down Expand Up @@ -426,18 +427,22 @@ impl<D: BaoStore> Handler<D> {
})
}

async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> {
self.inner.db.set_tag(msg.name, None).await?;
async fn blob_delete_blob(self, msg: BlobDeleteBlobRequest) -> RpcResult<()> {
self.inner.db.delete(vec![msg.hash]).await?;
Ok(())
}

async fn blob_delete_blob(self, msg: BlobDeleteBlobRequest) -> RpcResult<()> {
self.inner.db.delete(vec![msg.hash]).await?;
async fn tags_set_tag(self, msg: SetTagRequest) -> RpcResult<()> {
self.inner.db.set_tag(msg.name, None).await?;
Ok(())
}

fn blob_list_tags(self, _msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
tracing::info!("blob_list_tags");
async fn tags_create_tag(self, msg: CreateTagRequest) -> RpcResult<Tag> {
let tag = self.inner.db.create_tag(msg.value).await?;
Ok(tag)
}

fn tags_list_tags(self, _msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
Gen::new(|co| async move {
let tags = self.inner.db.tags().await.unwrap();
#[allow(clippy::manual_flatten)]
Expand Down Expand Up @@ -793,18 +798,18 @@ impl<D: BaoStore> Handler<D> {
async fn batch_add_from_path0(
self,
msg: BatchAddPathRequest,
progress: flume::Sender<BatchAddProgress>,
progress: flume::Sender<BatchAddPathProgress>,
) -> anyhow::Result<()> {
use iroh_blobs::store::ImportMode;

let progress = FlumeProgressSender::new(progress);
// convert import progress to provide progress
let import_progress = progress.clone().with_filter_map(move |x| match x {
ImportProgress::Size { size, .. } => Some(BatchAddProgress::Found { size }),
ImportProgress::Size { size, .. } => Some(BatchAddPathProgress::Found { size }),
ImportProgress::OutboardProgress { offset, .. } => {
Some(BatchAddProgress::Progress { offset })
Some(BatchAddPathProgress::Progress { offset })
}
ImportProgress::OutboardDone { hash, .. } => Some(BatchAddProgress::Done { hash }),
ImportProgress::OutboardDone { hash, .. } => Some(BatchAddPathProgress::Done { hash }),
_ => None,
});
let BatchAddPathRequest {
Expand Down Expand Up @@ -834,7 +839,7 @@ impl<D: BaoStore> Handler<D> {
let hash = *tag.hash();
self.inner.blob_scopes.lock().unwrap().store(scope, tag);

progress.send(BatchAddProgress::Done { hash }).await?;
progress.send(BatchAddPathProgress::Done { hash }).await?;
Ok(())
}

Expand Down Expand Up @@ -956,7 +961,9 @@ impl<D: BaoStore> Handler<D> {
let tx2 = tx.clone();
self.rt().spawn_pinned(|| async move {
if let Err(e) = self.batch_add_from_path0(msg, tx).await {
tx2.send_async(BatchAddProgress::Abort(e.into())).await.ok();
tx2.send_async(BatchAddPathProgress::Abort(e.into()))
.await
.ok();
}
});
rx.into_stream().map(BatchAddPathResponse)
Expand Down
25 changes: 20 additions & 5 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use iroh_base::node_addr::AddrInfoOptions;
pub use iroh_blobs::{export::ExportProgress, get::db::DownloadProgress, BlobFormat, Hash};
use iroh_blobs::{
format::collection::Collection,
provider::BatchAddProgress,
provider::BatchAddPathProgress,
store::{BaoBlobSize, ConsistencyCheckProgress},
util::Tag,
HashAndFormat,
Expand Down Expand Up @@ -273,15 +273,28 @@ impl RpcMsg<RpcService> for BlobDeleteBlobRequest {

/// Delete a tag
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleteTagRequest {
pub struct SetTagRequest {
/// Name of the tag
pub name: Tag,
/// Value of the tag, None to delete
pub value: Option<HashAndFormat>,
}

impl RpcMsg<RpcService> for DeleteTagRequest {
impl RpcMsg<RpcService> for SetTagRequest {
type Response = RpcResult<()>;
}

/// Create a tag
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTagRequest {
/// Value of the tag
pub value: HashAndFormat,
}

impl RpcMsg<RpcService> for CreateTagRequest {
type Response = RpcResult<Tag>;
}

/// Get a collection
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobGetCollectionRequest {
Expand Down Expand Up @@ -1093,7 +1106,7 @@ pub struct BatchAddPathRequest {

/// Response to a batch add path request
#[derive(Serialize, Deserialize, Debug)]
pub struct BatchAddPathResponse(pub BatchAddProgress);
pub struct BatchAddPathResponse(pub BatchAddPathProgress);

impl Msg<RpcService> for BatchAddPathRequest {
type Pattern = ServerStreaming;
Expand Down Expand Up @@ -1166,7 +1179,8 @@ pub enum Request {
BatchAddStreamUpdate(BatchAddStreamUpdate),
BatchAddPath(BatchAddPathRequest),

DeleteTag(DeleteTagRequest),
SetTag(SetTagRequest),
CreateTag(CreateTagRequest),
ListTags(ListTagsRequest),

DocOpen(DocOpenRequest),
Expand Down Expand Up @@ -1234,6 +1248,7 @@ pub enum Response {

ListTags(TagInfo),
DeleteTag(RpcResult<()>),
CreateTag(RpcResult<Tag>),

DocOpen(RpcResult<DocOpenResponse>),
DocClose(RpcResult<DocCloseResponse>),
Expand Down
52 changes: 52 additions & 0 deletions iroh/tests/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,55 @@ async fn test_batch_create_2() -> anyhow::Result<()> {
assert!(client.read_to_bytes(hash).await.is_err());
Ok(())
}

#[tokio::test]
async fn test_batch_create_from_path_1() -> anyhow::Result<()> {
let node = create_node().await?;
let client = &node.client().blobs;
let batch = client.batch().await?;
let dir = tempfile::tempdir()?;
let expected_data: &[u8] = b"test";
let expected_hash = blake3::hash(expected_data).into();
let temp_path = dir.path().join("test");
std::fs::write(&temp_path, expected_data)?;
let tag = batch
.add_from_path(temp_path, false, BlobFormat::Raw)
.await?;
let hash = *tag.hash();
assert_eq!(hash, expected_hash);
// Check that the store has the data and that it is protected from gc
tokio::time::sleep(Duration::from_millis(50)).await;
let data = client.read_to_bytes(hash).await?;
assert_eq!(data.as_ref(), expected_data);
drop(tag);
// Check that the store drops the data when the temp tag gets dropped
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(client.read_to_bytes(hash).await.is_err());
Ok(())
}

#[tokio::test]
async fn test_batch_create_from_path_2() -> anyhow::Result<()> {
let node = create_node().await?;
let client = &node.client().blobs;
let batch = client.batch().await?;
let dir = tempfile::tempdir()?;
let expected_data: &[u8] = b"test";
let expected_hash = blake3::hash(expected_data).into();
let temp_path = dir.path().join("test");
std::fs::write(&temp_path, expected_data)?;
let tag = batch
.add_from_path(temp_path, false, BlobFormat::Raw)
.await?;
let hash = *tag.hash();
assert_eq!(hash, expected_hash);
// Check that the store has the data and that it is protected from gc
tokio::time::sleep(Duration::from_millis(50)).await;
let data = client.read_to_bytes(hash).await?;
assert_eq!(data.as_ref(), expected_data);
drop(batch);
// Check that the store drops the data when the temp tag gets dropped
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(client.read_to_bytes(hash).await.is_err());
Ok(())
}

0 comments on commit 08ccd31

Please sign in to comment.