From 08ccd3179b5838b2c69be3922bf13e855a90cb89 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 3 Jun 2024 19:08:12 +0300 Subject: [PATCH] WIP add more rich tags api --- iroh-blobs/src/provider.rs | 2 +- iroh/src/client/blobs.rs | 6 ++--- iroh/src/client/tags.rs | 31 ++++++++++++++++++++--- iroh/src/node/rpc.rs | 43 ++++++++++++++++++------------- iroh/src/rpc_protocol.rs | 25 ++++++++++++++---- iroh/tests/batch.rs | 52 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 128 insertions(+), 31 deletions(-) diff --git a/iroh-blobs/src/provider.rs b/iroh-blobs/src/provider.rs index 7e481f4c2b..fd9ce37acf 100644 --- a/iroh-blobs/src/provider.rs +++ b/iroh-blobs/src/provider.rs @@ -155,7 +155,7 @@ pub enum AddProgress { /// Progress updates for the batch add operation. #[derive(Debug, Serialize, Deserialize)] -pub enum BatchAddProgress { +pub enum BatchAddPathProgress { /// An item was found with the given size Found { /// The size of the entry in bytes. diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 6631add2e9..33be81a1cf 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -18,7 +18,7 @@ use iroh_blobs::{ export::ExportProgress as BytesExportProgress, format::collection::Collection, get::db::DownloadProgress as BytesDownloadProgress, - provider::BatchAddProgress, + provider::BatchAddPathProgress, store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, util::TagDrop, BlobFormat, Hash, HashAndFormat, Tag, TempTag, @@ -504,10 +504,10 @@ impl> Batch { let mut res = None; while let Some(item) = stream.next().await { match item?.0 { - BatchAddProgress::Abort(cause) => { + BatchAddPathProgress::Abort(cause) => { Err(cause)?; } - BatchAddProgress::Done { hash } => { + BatchAddPathProgress::Done { hash } => { res = Some(hash); } _ => {} diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index c2d4309977..926834d783 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -2,11 +2,11 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; -use iroh_blobs::{BlobFormat, Hash, Tag}; +use iroh_blobs::{BlobFormat, Hash, HashAndFormat, Tag}; use quic_rpc::{RpcClient, ServiceConnection}; use serde::{Deserialize, Serialize}; -use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest, RpcService}; +use crate::rpc_protocol::{CreateTagRequest, ListTagsRequest, RpcService, SetTagRequest}; /// Iroh tags client. #[derive(Debug, Clone)] @@ -24,10 +24,33 @@ where Ok(stream.map(|res| res.map_err(anyhow::Error::from))) } + /// Create a tag, where the name is automatically generated. + /// + /// Use this method if you want a new tag with a unique name. + pub async fn create(&self, value: HashAndFormat) -> Result { + Ok(self.rpc.rpc(CreateTagRequest { value }).await??) + } + + /// Set a tag to a value, overwriting any existing value. + /// + /// Setting the value to `None` deletes the tag. Setting the value to `Some` creates or updates the tag. + pub async fn set_opt(&self, name: Tag, value: Option) -> Result<()> { + self.rpc.rpc(SetTagRequest { name, value }).await??; + Ok(()) + } + + /// Set a tag to a value, overwriting any existing value. + /// + /// This is a convenience wrapper around `set_opt`. + pub async fn set(&self, name: Tag, value: HashAndFormat) -> Result<()> { + self.set_opt(name, Some(value)).await + } + /// Delete a tag. + /// + /// This is a convenience wrapper around `set_opt`. pub async fn delete(&self, name: Tag) -> Result<()> { - self.rpc.rpc(DeleteTagRequest { name }).await??; - Ok(()) + self.set_opt(name, None).await } } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index e07de2165a..e6093abec0 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -13,10 +13,9 @@ use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; use iroh_blobs::get::db::DownloadProgress; use iroh_blobs::get::Stats; -use iroh_blobs::provider::BatchAddProgress; +use iroh_blobs::provider::BatchAddPathProgress; use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry}; use iroh_blobs::util::progress::ProgressSender; -use iroh_blobs::BlobFormat; use iroh_blobs::{ hashseq::parse_hash_seq, provider::AddProgress, @@ -24,6 +23,7 @@ use iroh_blobs::{ util::progress::FlumeProgressSender, HashAndFormat, }; +use iroh_blobs::{BlobFormat, Tag}; use iroh_io::AsyncSliceReader; use iroh_net::relay::RelayUrl; use iroh_net::{Endpoint, NodeAddr, NodeId}; @@ -47,12 +47,12 @@ use crate::rpc_protocol::{ BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, - DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, + CreateTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, NodeWatchResponse, - Request, RpcService, SetTagOption, + Request, RpcService, SetTagOption, SetTagRequest, }; use super::NodeInner; @@ -112,10 +112,11 @@ impl Handler { .await } ListTags(msg) => { - chan.server_streaming(msg, handler, Self::blob_list_tags) + chan.server_streaming(msg, handler, Self::tags_list_tags) .await } - DeleteTag(msg) => chan.rpc(msg, handler, Self::blob_delete_tag).await, + SetTag(msg) => chan.rpc(msg, handler, Self::tags_set_tag).await, + CreateTag(msg) => chan.rpc(msg, handler, Self::tags_create_tag).await, BlobDeleteBlob(msg) => chan.rpc(msg, handler, Self::blob_delete_blob).await, BlobAddPath(msg) => { chan.server_streaming(msg, handler, Self::blob_add_from_path) @@ -426,18 +427,22 @@ impl Handler { }) } - async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> { - self.inner.db.set_tag(msg.name, None).await?; + async fn blob_delete_blob(self, msg: BlobDeleteBlobRequest) -> RpcResult<()> { + self.inner.db.delete(vec![msg.hash]).await?; Ok(()) } - async fn blob_delete_blob(self, msg: BlobDeleteBlobRequest) -> RpcResult<()> { - self.inner.db.delete(vec![msg.hash]).await?; + async fn tags_set_tag(self, msg: SetTagRequest) -> RpcResult<()> { + self.inner.db.set_tag(msg.name, None).await?; Ok(()) } - fn blob_list_tags(self, _msg: ListTagsRequest) -> impl Stream + Send + 'static { - tracing::info!("blob_list_tags"); + async fn tags_create_tag(self, msg: CreateTagRequest) -> RpcResult { + let tag = self.inner.db.create_tag(msg.value).await?; + Ok(tag) + } + + fn tags_list_tags(self, _msg: ListTagsRequest) -> impl Stream + Send + 'static { Gen::new(|co| async move { let tags = self.inner.db.tags().await.unwrap(); #[allow(clippy::manual_flatten)] @@ -793,18 +798,18 @@ impl Handler { async fn batch_add_from_path0( self, msg: BatchAddPathRequest, - progress: flume::Sender, + progress: flume::Sender, ) -> anyhow::Result<()> { use iroh_blobs::store::ImportMode; let progress = FlumeProgressSender::new(progress); // convert import progress to provide progress let import_progress = progress.clone().with_filter_map(move |x| match x { - ImportProgress::Size { size, .. } => Some(BatchAddProgress::Found { size }), + ImportProgress::Size { size, .. } => Some(BatchAddPathProgress::Found { size }), ImportProgress::OutboardProgress { offset, .. } => { - Some(BatchAddProgress::Progress { offset }) + Some(BatchAddPathProgress::Progress { offset }) } - ImportProgress::OutboardDone { hash, .. } => Some(BatchAddProgress::Done { hash }), + ImportProgress::OutboardDone { hash, .. } => Some(BatchAddPathProgress::Done { hash }), _ => None, }); let BatchAddPathRequest { @@ -834,7 +839,7 @@ impl Handler { let hash = *tag.hash(); self.inner.blob_scopes.lock().unwrap().store(scope, tag); - progress.send(BatchAddProgress::Done { hash }).await?; + progress.send(BatchAddPathProgress::Done { hash }).await?; Ok(()) } @@ -956,7 +961,9 @@ impl Handler { let tx2 = tx.clone(); self.rt().spawn_pinned(|| async move { if let Err(e) = self.batch_add_from_path0(msg, tx).await { - tx2.send_async(BatchAddProgress::Abort(e.into())).await.ok(); + tx2.send_async(BatchAddPathProgress::Abort(e.into())) + .await + .ok(); } }); rx.into_stream().map(BatchAddPathResponse) diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 76ae87551a..939c37f4d6 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -15,7 +15,7 @@ use iroh_base::node_addr::AddrInfoOptions; pub use iroh_blobs::{export::ExportProgress, get::db::DownloadProgress, BlobFormat, Hash}; use iroh_blobs::{ format::collection::Collection, - provider::BatchAddProgress, + provider::BatchAddPathProgress, store::{BaoBlobSize, ConsistencyCheckProgress}, util::Tag, HashAndFormat, @@ -273,15 +273,28 @@ impl RpcMsg for BlobDeleteBlobRequest { /// Delete a tag #[derive(Debug, Serialize, Deserialize)] -pub struct DeleteTagRequest { +pub struct SetTagRequest { /// Name of the tag pub name: Tag, + /// Value of the tag, None to delete + pub value: Option, } -impl RpcMsg for DeleteTagRequest { +impl RpcMsg for SetTagRequest { type Response = RpcResult<()>; } +/// Create a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateTagRequest { + /// Value of the tag + pub value: HashAndFormat, +} + +impl RpcMsg for CreateTagRequest { + type Response = RpcResult; +} + /// Get a collection #[derive(Debug, Serialize, Deserialize)] pub struct BlobGetCollectionRequest { @@ -1093,7 +1106,7 @@ pub struct BatchAddPathRequest { /// Response to a batch add path request #[derive(Serialize, Deserialize, Debug)] -pub struct BatchAddPathResponse(pub BatchAddProgress); +pub struct BatchAddPathResponse(pub BatchAddPathProgress); impl Msg for BatchAddPathRequest { type Pattern = ServerStreaming; @@ -1166,7 +1179,8 @@ pub enum Request { BatchAddStreamUpdate(BatchAddStreamUpdate), BatchAddPath(BatchAddPathRequest), - DeleteTag(DeleteTagRequest), + SetTag(SetTagRequest), + CreateTag(CreateTagRequest), ListTags(ListTagsRequest), DocOpen(DocOpenRequest), @@ -1234,6 +1248,7 @@ pub enum Response { ListTags(TagInfo), DeleteTag(RpcResult<()>), + CreateTag(RpcResult), DocOpen(RpcResult), DocClose(RpcResult), diff --git a/iroh/tests/batch.rs b/iroh/tests/batch.rs index e36e0c31de..8d48565a57 100644 --- a/iroh/tests/batch.rs +++ b/iroh/tests/batch.rs @@ -52,3 +52,55 @@ async fn test_batch_create_2() -> anyhow::Result<()> { assert!(client.read_to_bytes(hash).await.is_err()); Ok(()) } + +#[tokio::test] +async fn test_batch_create_from_path_1() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs; + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let expected_data: &[u8] = b"test"; + let expected_hash = blake3::hash(expected_data).into(); + let temp_path = dir.path().join("test"); + std::fs::write(&temp_path, expected_data)?; + let tag = batch + .add_from_path(temp_path, false, BlobFormat::Raw) + .await?; + let hash = *tag.hash(); + assert_eq!(hash, expected_hash); + // Check that the store has the data and that it is protected from gc + tokio::time::sleep(Duration::from_millis(50)).await; + let data = client.read_to_bytes(hash).await?; + assert_eq!(data.as_ref(), expected_data); + drop(tag); + // Check that the store drops the data when the temp tag gets dropped + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(client.read_to_bytes(hash).await.is_err()); + Ok(()) +} + +#[tokio::test] +async fn test_batch_create_from_path_2() -> anyhow::Result<()> { + let node = create_node().await?; + let client = &node.client().blobs; + let batch = client.batch().await?; + let dir = tempfile::tempdir()?; + let expected_data: &[u8] = b"test"; + let expected_hash = blake3::hash(expected_data).into(); + let temp_path = dir.path().join("test"); + std::fs::write(&temp_path, expected_data)?; + let tag = batch + .add_from_path(temp_path, false, BlobFormat::Raw) + .await?; + let hash = *tag.hash(); + assert_eq!(hash, expected_hash); + // Check that the store has the data and that it is protected from gc + tokio::time::sleep(Duration::from_millis(50)).await; + let data = client.read_to_bytes(hash).await?; + assert_eq!(data.as_ref(), expected_data); + drop(batch); + // Check that the store drops the data when the temp tag gets dropped + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(client.read_to_bytes(hash).await.is_err()); + Ok(()) +}