Skip to content

Commit

Permalink
Add batch option to CreateTagRequest and SetTagRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jun 5, 2024
1 parent be8209c commit aa7f73d
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 25 deletions.
50 changes: 46 additions & 4 deletions iroh/src/client/blobs/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ use futures_lite::StreamExt;
use futures_util::{FutureExt, SinkExt, Stream};
use iroh_blobs::{
format::collection::Collection, provider::BatchAddPathProgress, store::ImportMode,
util::TagDrop, BlobFormat, HashAndFormat, TempTag,
util::TagDrop, BlobFormat, HashAndFormat, Tag, TempTag,
};
use quic_rpc::{client::UpdateSink, RpcClient, ServiceConnection};
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use tracing::warn;
use tracing::{debug, warn};

use crate::{
client::RpcService,
rpc_protocol::{
BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate,
BatchCreateTempTagRequest, BatchId, BatchUpdate,
BatchCreateTempTagRequest, BatchId, BatchUpdate, CreateTagRequest, SetTagRequest,
},
};

Expand Down Expand Up @@ -52,7 +52,23 @@ pub struct Batch<C: ServiceConnection<RpcService>>(Arc<BatchInner<C>>);
impl<C: ServiceConnection<RpcService>> TagDrop for BatchInner<C> {
fn on_drop(&self, content: &HashAndFormat) {
let mut updates = self.updates.lock().unwrap();
updates.send(BatchUpdate::Drop(*content)).now_or_never();
// send a drop to the server
// this will occasionally fail, but that's acceptable. The temp tags for the batch
// will be cleaned up as soon as the entire batch is dropped.
//
// E.g. a typical scenario is that you create a large array of temp tags, and then
// store them in a hash sequence and then drop the array. You will get many drops
// at the same time, and might get a send failure here.
//
// But that just means that the server will clean up the temp tags when the batch is
// dropped.
if updates
.send(BatchUpdate::Drop(*content))
.now_or_never()
.is_none()
{
debug!("Failed to send drop to server");
}
}
}

Expand Down Expand Up @@ -384,6 +400,32 @@ impl<C: ServiceConnection<RpcService>> Batch<C> {
}
}

/// Upgrade a temp tag to a persistent tag.
pub async fn upgrade(&self, tt: TempTag) -> Result<Tag> {
let tag = self
.0
.rpc
.rpc(CreateTagRequest {
value: tt.hash_and_format(),
batch: Some(self.0.batch),
})
.await??;
Ok(tag)
}

/// Upgrade a temp tag to a persistent tag with a specific name.
pub async fn upgrade_to(&self, tt: TempTag, tag: Tag) -> Result<()> {
self.0
.rpc
.rpc(SetTagRequest {
name: tag,
value: Some(tt.hash_and_format()),
batch: Some(self.0.batch),
})
.await??;
Ok(())
}

/// Creates a temp tag for the given hash and format, without notifying the server.
///
/// Caution: only do this for data for which you know the server side has created a temp tag.
Expand Down
31 changes: 20 additions & 11 deletions iroh/src/client/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,38 @@ where
///
/// Use this method if you want a new tag with a unique name.
pub async fn create(&self, value: HashAndFormat) -> Result<Tag> {
Ok(self.rpc.rpc(CreateTagRequest { value }).await??)
}

/// Set a tag to a value, overwriting any existing value.
///
/// Setting the value to `None` deletes the tag. Setting the value to `Some` creates or updates the tag.
pub async fn set_opt(&self, name: Tag, value: Option<HashAndFormat>) -> Result<()> {
self.rpc.rpc(SetTagRequest { name, value }).await??;
Ok(())
Ok(self
.rpc
.rpc(CreateTagRequest { value, batch: None })
.await??)
}

/// Set a tag to a value, overwriting any existing value.
///
/// This is a convenience wrapper around `set_opt`.
pub async fn set(&self, name: Tag, value: HashAndFormat) -> Result<()> {
self.set_opt(name, Some(value)).await
self.set_with_opts(name, Some(value)).await
}

/// Delete a tag.
///
/// This is a convenience wrapper around `set_opt`.
pub async fn delete(&self, name: Tag) -> Result<()> {
self.set_opt(name, None).await
self.set_with_opts(name, None).await
}

/// Set a tag to a value, overwriting any existing value.
///
/// Setting the value to `None` deletes the tag. Setting the value to `Some` creates or updates the tag.
pub async fn set_with_opts(&self, name: Tag, value: Option<HashAndFormat>) -> Result<()> {
self.rpc
.rpc(SetTagRequest {
name,
value,
batch: None,
})
.await??;
Ok(())
}
}

Expand Down
20 changes: 14 additions & 6 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,27 +103,35 @@ impl BlobBatches {
}

/// Remove a tag from a batch.
fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat, u: Option<&dyn TagDrop>) {
fn remove_one(
&mut self,
batch: BatchId,
content: &HashAndFormat,
tag_drop: Option<&dyn TagDrop>,
) -> Result<()> {
if let Some(scope) = self.batches.get_mut(&batch) {
if let Some(counter) = scope.tags.get_mut(content) {
*counter -= 1;
if let Some(u) = u {
u.on_drop(content);
if let Some(tag_drop) = tag_drop {
tag_drop.on_drop(content);
}
if *counter == 0 {
scope.tags.remove(content);
}
}
} else {
anyhow::bail!("batch not found");
}
Ok(())
}

/// Remove an entire batch.
fn remove(&mut self, batch: BatchId, u: Option<&dyn TagDrop>) {
fn remove(&mut self, batch: BatchId, tag_drop: Option<&dyn TagDrop>) {
if let Some(scope) = self.batches.remove(&batch) {
for (content, count) in scope.tags {
if let Some(u) = u {
if let Some(tag_drop) = tag_drop {
for _ in 0..count {
u.on_drop(&content);
tag_drop.on_drop(&content);
}
}
}
Expand Down
26 changes: 22 additions & 4 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,28 @@ impl<D: BaoStore> Handler<D> {
}

async fn tags_set_tag(self, msg: SetTagRequest) -> RpcResult<()> {
self.inner.db.set_tag(msg.name, None).await?;
if let Some(batch) = msg.batch {
if let Some(content) = msg.value.as_ref() {
self.inner.blob_batches.lock().unwrap().remove_one(
batch,
content,
self.inner.db.tag_drop(),
)?;
}
}
self.inner.db.set_tag(msg.name, msg.value).await?;
Ok(())
}

async fn tags_create_tag(self, msg: CreateTagRequest) -> RpcResult<Tag> {
let tag = self.inner.db.create_tag(msg.value).await?;
Ok(tag)
if let Some(batch) = msg.batch {
self.inner.blob_batches.lock().unwrap().remove_one(
batch,
&msg.value,
self.inner.db.tag_drop(),
)?;
}
Ok(self.inner.db.create_tag(msg.value).await?)
}

fn tags_list_tags(self, _msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
Expand Down Expand Up @@ -932,12 +947,15 @@ impl<D: BaoStore> Handler<D> {
while let Some(item) = updates.next().await {
match item {
BatchUpdate::Drop(content) => {
self.inner.blob_batches.lock().unwrap().remove_one(
// this can not fail, since we keep the batch alive.
// therefore it is safe to ignore the result.
let _ = self.inner.blob_batches.lock().unwrap().remove_one(
batch,
&content,
self.inner.db.tag_drop(),
);
}
BatchUpdate::Ping => {}
}
}
self.inner
Expand Down
6 changes: 6 additions & 0 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub struct BatchCreateRequest;
pub enum BatchUpdate {
/// Drop of a remote temp tag
Drop(HashAndFormat),
/// Message to check that the connection is still alive
Ping,
}

/// Response to a temp tag scope request
Expand Down Expand Up @@ -311,6 +313,8 @@ pub struct SetTagRequest {
pub name: Tag,
/// Value of the tag, None to delete
pub value: Option<HashAndFormat>,
/// Batch to use, none for global
pub batch: Option<BatchId>,
}

impl RpcMsg<RpcService> for SetTagRequest {
Expand All @@ -322,6 +326,8 @@ impl RpcMsg<RpcService> for SetTagRequest {
pub struct CreateTagRequest {
/// Value of the tag
pub value: HashAndFormat,
/// Batch to use, none for global
pub batch: Option<BatchId>,
}

impl RpcMsg<RpcService> for CreateTagRequest {
Expand Down

0 comments on commit aa7f73d

Please sign in to comment.