diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index 470ec75b03..9a7d3586a0 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -11,18 +11,18 @@ use futures_lite::StreamExt; use futures_util::{FutureExt, SinkExt, Stream}; use iroh_blobs::{ format::collection::Collection, provider::BatchAddPathProgress, store::ImportMode, - util::TagDrop, BlobFormat, HashAndFormat, TempTag, + util::TagDrop, BlobFormat, HashAndFormat, Tag, TempTag, }; use quic_rpc::{client::UpdateSink, RpcClient, ServiceConnection}; use tokio::io::AsyncRead; use tokio_util::io::ReaderStream; -use tracing::warn; +use tracing::{debug, warn}; use crate::{ client::RpcService, rpc_protocol::{ BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, - BatchCreateTempTagRequest, BatchId, BatchUpdate, + BatchCreateTempTagRequest, BatchId, BatchUpdate, CreateTagRequest, SetTagRequest, }, }; @@ -52,7 +52,23 @@ pub struct Batch>(Arc>); impl> TagDrop for BatchInner { fn on_drop(&self, content: &HashAndFormat) { let mut updates = self.updates.lock().unwrap(); - updates.send(BatchUpdate::Drop(*content)).now_or_never(); + // send a drop to the server + // this will occasionally fail, but that's acceptable. The temp tags for the batch + // will be cleaned up as soon as the entire batch is dropped. + // + // E.g. a typical scenario is that you create a large array of temp tags, and then + // store them in a hash sequence and then drop the array. You will get many drops + // at the same time, and might get a send failure here. + // + // But that just means that the server will clean up the temp tags when the batch is + // dropped. + if updates + .send(BatchUpdate::Drop(*content)) + .now_or_never() + .is_none() + { + debug!("Failed to send drop to server"); + } } } @@ -384,6 +400,32 @@ impl> Batch { } } + /// Upgrade a temp tag to a persistent tag. + pub async fn upgrade(&self, tt: TempTag) -> Result { + let tag = self + .0 + .rpc + .rpc(CreateTagRequest { + value: tt.hash_and_format(), + batch: Some(self.0.batch), + }) + .await??; + Ok(tag) + } + + /// Upgrade a temp tag to a persistent tag with a specific name. + pub async fn upgrade_to(&self, tt: TempTag, tag: Tag) -> Result<()> { + self.0 + .rpc + .rpc(SetTagRequest { + name: tag, + value: Some(tt.hash_and_format()), + batch: Some(self.0.batch), + }) + .await??; + Ok(()) + } + /// Creates a temp tag for the given hash and format, without notifying the server. /// /// Caution: only do this for data for which you know the server side has created a temp tag. diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index 926834d783..1e12a55676 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -28,29 +28,38 @@ where /// /// Use this method if you want a new tag with a unique name. pub async fn create(&self, value: HashAndFormat) -> Result { - Ok(self.rpc.rpc(CreateTagRequest { value }).await??) - } - - /// Set a tag to a value, overwriting any existing value. - /// - /// Setting the value to `None` deletes the tag. Setting the value to `Some` creates or updates the tag. - pub async fn set_opt(&self, name: Tag, value: Option) -> Result<()> { - self.rpc.rpc(SetTagRequest { name, value }).await??; - Ok(()) + Ok(self + .rpc + .rpc(CreateTagRequest { value, batch: None }) + .await??) } /// Set a tag to a value, overwriting any existing value. /// /// This is a convenience wrapper around `set_opt`. pub async fn set(&self, name: Tag, value: HashAndFormat) -> Result<()> { - self.set_opt(name, Some(value)).await + self.set_with_opts(name, Some(value)).await } /// Delete a tag. /// /// This is a convenience wrapper around `set_opt`. pub async fn delete(&self, name: Tag) -> Result<()> { - self.set_opt(name, None).await + self.set_with_opts(name, None).await + } + + /// Set a tag to a value, overwriting any existing value. + /// + /// Setting the value to `None` deletes the tag. Setting the value to `Some` creates or updates the tag. + pub async fn set_with_opts(&self, name: Tag, value: Option) -> Result<()> { + self.rpc + .rpc(SetTagRequest { + name, + value, + batch: None, + }) + .await??; + Ok(()) } } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 7e5bd12406..1154763220 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -103,27 +103,35 @@ impl BlobBatches { } /// Remove a tag from a batch. - fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat, u: Option<&dyn TagDrop>) { + fn remove_one( + &mut self, + batch: BatchId, + content: &HashAndFormat, + tag_drop: Option<&dyn TagDrop>, + ) -> Result<()> { if let Some(scope) = self.batches.get_mut(&batch) { if let Some(counter) = scope.tags.get_mut(content) { *counter -= 1; - if let Some(u) = u { - u.on_drop(content); + if let Some(tag_drop) = tag_drop { + tag_drop.on_drop(content); } if *counter == 0 { scope.tags.remove(content); } } + } else { + anyhow::bail!("batch not found"); } + Ok(()) } /// Remove an entire batch. - fn remove(&mut self, batch: BatchId, u: Option<&dyn TagDrop>) { + fn remove(&mut self, batch: BatchId, tag_drop: Option<&dyn TagDrop>) { if let Some(scope) = self.batches.remove(&batch) { for (content, count) in scope.tags { - if let Some(u) = u { + if let Some(tag_drop) = tag_drop { for _ in 0..count { - u.on_drop(&content); + tag_drop.on_drop(&content); } } } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index d4042dd86e..2d5e08d1c1 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -456,13 +456,28 @@ impl Handler { } async fn tags_set_tag(self, msg: SetTagRequest) -> RpcResult<()> { - self.inner.db.set_tag(msg.name, None).await?; + if let Some(batch) = msg.batch { + if let Some(content) = msg.value.as_ref() { + self.inner.blob_batches.lock().unwrap().remove_one( + batch, + content, + self.inner.db.tag_drop(), + )?; + } + } + self.inner.db.set_tag(msg.name, msg.value).await?; Ok(()) } async fn tags_create_tag(self, msg: CreateTagRequest) -> RpcResult { - let tag = self.inner.db.create_tag(msg.value).await?; - Ok(tag) + if let Some(batch) = msg.batch { + self.inner.blob_batches.lock().unwrap().remove_one( + batch, + &msg.value, + self.inner.db.tag_drop(), + )?; + } + Ok(self.inner.db.create_tag(msg.value).await?) } fn tags_list_tags(self, _msg: ListTagsRequest) -> impl Stream + Send + 'static { @@ -932,12 +947,15 @@ impl Handler { while let Some(item) = updates.next().await { match item { BatchUpdate::Drop(content) => { - self.inner.blob_batches.lock().unwrap().remove_one( + // this can not fail, since we keep the batch alive. + // therefore it is safe to ignore the result. + let _ = self.inner.blob_batches.lock().unwrap().remove_one( batch, &content, self.inner.db.tag_drop(), ); } + BatchUpdate::Ping => {} } } self.inner diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 3894bb2d2c..ec70024fd8 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -62,6 +62,8 @@ pub struct BatchCreateRequest; pub enum BatchUpdate { /// Drop of a remote temp tag Drop(HashAndFormat), + /// Message to check that the connection is still alive + Ping, } /// Response to a temp tag scope request @@ -311,6 +313,8 @@ pub struct SetTagRequest { pub name: Tag, /// Value of the tag, None to delete pub value: Option, + /// Batch to use, none for global + pub batch: Option, } impl RpcMsg for SetTagRequest { @@ -322,6 +326,8 @@ impl RpcMsg for SetTagRequest { pub struct CreateTagRequest { /// Value of the tag pub value: HashAndFormat, + /// Batch to use, none for global + pub batch: Option, } impl RpcMsg for CreateTagRequest {