Skip to content

Commit

Permalink
refactor(iroh-blobs)!: make TempTag non-Clone (#2338)
Browse files Browse the repository at this point in the history
## Description

refactor(iroh-blobs): Make TempTag non-Clone

I found that in most places it is not needed, and if somebody needs them
to be clone they can always wrap them in an Arc.

Also, this complicates extending the concept of temp tags via the rpc
boundary.

With this change if will be possible to use the same TempTag type both
in the low level blobs store API and in the higher level blobs API of
iroh.

## Breaking Changes

Changes the signature of TempTag::new to take a `Weak<dyn TagDrop>`
instead
of an Arc<dyn LivenessTracker>. This will affect projects that write
their own store
impl, so none that I am aware of. Nevertheless it is a breaking change.

The high level rpc API is not affected, since it does not know temp tags
(yet).

## Notes & open questions

Note: this is just part 1/x of changes needed to extend temp tags to the
rpc api. On its own it does not provide much value.

Note2: you could be even more radical and give a TempTag a lifetime and
a reference to its store. But I am pretty sure that this would not play
well with creating bindings and might even be inconvenient from pure
rust.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
rklaehn authored Jun 5, 2024
1 parent 3772889 commit d0662c2
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 46 deletions.
6 changes: 5 additions & 1 deletion iroh-blobs/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod readonly_mem;
pub mod fs;

mod traits;
use tracing::warn;
pub use traits::*;

/// Create a 16 byte unique ID.
Expand Down Expand Up @@ -66,7 +67,10 @@ impl TempCounterMap {

fn dec(&mut self, value: &HashAndFormat) {
let HashAndFormat { hash, format } = value;
let counters = self.0.get_mut(hash).unwrap();
let Some(counters) = self.0.get_mut(hash) else {
warn!("Decrementing non-existent temp tag");
return;
};
counters.dec(*format);
if counters.is_empty() {
self.0.remove(hash);
Expand Down
26 changes: 12 additions & 14 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use crate::{
BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
ProgressSender,
},
raw_outboard_size, LivenessTracker, MemOrFile,
raw_outboard_size, MemOrFile, TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
Expand Down Expand Up @@ -779,16 +779,18 @@ struct StoreInner {
path_options: Arc<PathOptions>,
}

impl LivenessTracker for RwLock<TempCounterMap> {
fn on_clone(&self, content: &HashAndFormat) {
self.write().unwrap().inc(content);
}

impl TagDrop for RwLock<TempCounterMap> {
fn on_drop(&self, content: &HashAndFormat) {
self.write().unwrap().dec(content);
}
}

impl TagCounter for RwLock<TempCounterMap> {
fn on_create(&self, content: &HashAndFormat) {
self.write().unwrap().inc(content);
}
}

impl StoreInner {
fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
tracing::trace!(
Expand Down Expand Up @@ -981,7 +983,7 @@ impl StoreInner {
))
})?;
std::fs::create_dir_all(parent)?;
let temp_tag = self.temp_tag(HashAndFormat::raw(hash));
let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
let (tx, rx) = oneshot::channel();
self.tx
.send_async(ActorMessage::Export {
Expand Down Expand Up @@ -1048,10 +1050,6 @@ impl StoreInner {
Ok(rx.await?)
}

fn temp_tag(&self, content: HashAndFormat) -> TempTag {
TempTag::new(content, Some(self.temp.clone()))
}

fn import_file_sync(
&self,
path: PathBuf,
Expand Down Expand Up @@ -1141,7 +1139,7 @@ impl StoreInner {
};
progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
// from here on, everything related to the hash is protected by the temp tag
let tag = self.temp_tag(HashAndFormat { hash, format });
let tag = self.temp.temp_tag(HashAndFormat { hash, format });
let hash = *tag.hash();
// blocking send for the import
let (tx, rx) = flume::bounded(1);
Expand Down Expand Up @@ -1423,7 +1421,7 @@ impl super::Store for Store {
}

fn temp_tag(&self, value: HashAndFormat) -> TempTag {
self.0.temp_tag(value)
self.0.temp.temp_tag(value)
}

async fn shutdown(&self) {
Expand Down Expand Up @@ -1717,7 +1715,7 @@ impl ActorState {
let inline_outboard =
outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
// from here on, everything related to the hash is protected by the temp tag
let tag = TempTag::new(content_id, Some(self.temp.clone()));
let tag = self.temp.temp_tag(content_id);
let hash = *tag.hash();
self.protected.insert(hash);
// move the data file into place, or create a reference to it
Expand Down
20 changes: 11 additions & 9 deletions iroh-blobs/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
},
util::{
progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
LivenessTracker,
TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
Expand All @@ -43,20 +43,22 @@ pub struct Store {
#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);

impl LivenessTracker for StoreInner {
fn on_clone(&self, inner: &HashAndFormat) {
tracing::trace!("temp tagging: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.inc(inner);
}

impl TagDrop for StoreInner {
fn on_drop(&self, inner: &HashAndFormat) {
tracing::trace!("temp tag drop: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.dec(inner);
}
}

impl TagCounter for StoreInner {
fn on_create(&self, inner: &HashAndFormat) {
tracing::trace!("temp tagging: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.inc(inner);
}
}

impl Store {
/// Create a new in memory store
pub fn new() -> Self {
Expand Down Expand Up @@ -217,7 +219,7 @@ impl super::Store for Store {
}

fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
TempTag::new(tag, Some(self.inner.clone()))
self.inner.temp_tag(tag)
}

async fn gc_start(&self) -> io::Result<()> {
Expand Down
59 changes: 37 additions & 22 deletions iroh-blobs/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ use bytes::Bytes;
use derive_more::{Debug, Display, From, Into};
use range_collections::range_set::RangeSetRange;
use serde::{Deserialize, Serialize};
use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime};
use std::{
borrow::Borrow,
fmt,
sync::{Arc, Weak},
time::SystemTime,
};

use crate::{store::Store, BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};

Expand Down Expand Up @@ -179,6 +184,13 @@ pub enum SetTagOption {
Named(Tag),
}

/// Trait used from temp tags to notify an abstract store that a temp tag is
/// being dropped.
pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
/// Called on drop
fn on_drop(&self, inner: &HashAndFormat);
}

/// A trait for things that can track liveness of blobs and collections.
///
/// This trait works together with [TempTag] to keep track of the liveness of a
Expand All @@ -187,11 +199,21 @@ pub enum SetTagOption {
/// It is important to include the format in the liveness tracking, since
/// protecting a collection means protecting the blob and all its children,
/// whereas protecting a raw blob only protects the blob itself.
pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
/// Called on clone
fn on_clone(&self, inner: &HashAndFormat);
/// Called on drop
fn on_drop(&self, inner: &HashAndFormat);
pub trait TagCounter: TagDrop + Sized {
/// Called on creation of a temp tag
fn on_create(&self, inner: &HashAndFormat);

/// Get this as a weak reference for use in temp tags
fn as_weak(self: &Arc<Self>) -> Weak<dyn TagDrop> {
let on_drop: Arc<dyn TagDrop> = self.clone();
Arc::downgrade(&on_drop)
}

/// Create a new temp tag for the given hash and format
fn temp_tag(self: &Arc<Self>, inner: HashAndFormat) -> TempTag {
self.on_create(&inner);
TempTag::new(inner, Some(self.as_weak()))
}
}

/// A hash and format pair that is protected from garbage collection.
Expand All @@ -202,8 +224,8 @@ pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
pub struct TempTag {
/// The hash and format we are pinning
inner: HashAndFormat,
/// liveness tracker
liveness: Option<Arc<dyn LivenessTracker>>,
/// optional callback to call on drop
on_drop: Option<Weak<dyn TagDrop>>,
}

impl TempTag {
Expand All @@ -214,11 +236,8 @@ impl TempTag {
/// The caller is responsible for increasing the refcount on creation and to
/// make sure that temp tags that are created between a mark phase and a sweep
/// phase are protected.
pub fn new(inner: HashAndFormat, liveness: Option<Arc<dyn LivenessTracker>>) -> Self {
if let Some(liveness) = liveness.as_ref() {
liveness.on_clone(&inner);
}
Self { inner, liveness }
pub fn new(inner: HashAndFormat, on_drop: Option<Weak<dyn TagDrop>>) -> Self {
Self { inner, on_drop }
}

/// The hash of the pinned item
Expand All @@ -241,20 +260,16 @@ impl TempTag {
// set the liveness tracker to None, so that the refcount is not decreased
// during drop. This means that the refcount will never reach 0 and the
// item will not be gced until the end of the process.
self.liveness = None;
}
}

impl Clone for TempTag {
fn clone(&self) -> Self {
Self::new(self.inner, self.liveness.clone())
self.on_drop = None;
}
}

impl Drop for TempTag {
fn drop(&mut self) {
if let Some(liveness) = self.liveness.as_ref() {
liveness.on_drop(&self.inner);
if let Some(on_drop) = self.on_drop.take() {
if let Some(on_drop) = on_drop.upgrade() {
on_drop.on_drop(&self.inner);
}
}
}
}
Expand Down

0 comments on commit d0662c2

Please sign in to comment.