From d0662c2d980b9fe28c669f2e6262c446d08bf7bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Wed, 5 Jun 2024 13:10:03 +0300 Subject: [PATCH] refactor(iroh-blobs)!: make TempTag non-Clone (#2338) ## Description refactor(iroh-blobs): Make TempTag non-Clone I found that in most places it is not needed, and if somebody needs them to be clone they can always wrap them in an Arc. Also, this complicates extending the concept of temp tags via the rpc boundary. With this change if will be possible to use the same TempTag type both in the low level blobs store API and in the higher level blobs API of iroh. ## Breaking Changes Changes the signature of TempTag::new to take a `Weak` instead of an Arc. This will affect projects that write their own store impl, so none that I am aware of. Nevertheless it is a breaking change. The high level rpc API is not affected, since it does not know temp tags (yet). ## Notes & open questions Note: this is just part 1/x of changes needed to extend temp tags to the rpc api. On its own it does not provide much value. Note2: you could be even more radical and give a TempTag a lifetime and a reference to its store. But I am pretty sure that this would not play well with creating bindings and might even be inconvenient from pure rust. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- iroh-blobs/src/store.rs | 6 +++- iroh-blobs/src/store/fs.rs | 26 ++++++++-------- iroh-blobs/src/store/mem.rs | 20 +++++++------ iroh-blobs/src/util.rs | 59 +++++++++++++++++++++++-------------- 4 files changed, 65 insertions(+), 46 deletions(-) diff --git a/iroh-blobs/src/store.rs b/iroh-blobs/src/store.rs index 0e8f35d301..3030d55b3f 100644 --- a/iroh-blobs/src/store.rs +++ b/iroh-blobs/src/store.rs @@ -11,6 +11,7 @@ pub mod readonly_mem; pub mod fs; mod traits; +use tracing::warn; pub use traits::*; /// Create a 16 byte unique ID. @@ -66,7 +67,10 @@ impl TempCounterMap { fn dec(&mut self, value: &HashAndFormat) { let HashAndFormat { hash, format } = value; - let counters = self.0.get_mut(hash).unwrap(); + let Some(counters) = self.0.get_mut(hash) else { + warn!("Decrementing non-existent temp tag"); + return; + }; counters.dec(*format); if counters.is_empty() { self.0.remove(hash); diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs index e0a4d192f0..5febe54457 100644 --- a/iroh-blobs/src/store/fs.rs +++ b/iroh-blobs/src/store/fs.rs @@ -111,7 +111,7 @@ use crate::{ BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError, ProgressSender, }, - raw_outboard_size, LivenessTracker, MemOrFile, + raw_outboard_size, MemOrFile, TagCounter, TagDrop, }, Tag, TempTag, IROH_BLOCK_SIZE, }; @@ -779,16 +779,18 @@ struct StoreInner { path_options: Arc, } -impl LivenessTracker for RwLock { - fn on_clone(&self, content: &HashAndFormat) { - self.write().unwrap().inc(content); - } - +impl TagDrop for RwLock { fn on_drop(&self, content: &HashAndFormat) { self.write().unwrap().dec(content); } } +impl TagCounter for RwLock { + fn on_create(&self, content: &HashAndFormat) { + self.write().unwrap().inc(content); + } +} + impl StoreInner { fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result { tracing::trace!( @@ -981,7 +983,7 @@ impl StoreInner { )) })?; std::fs::create_dir_all(parent)?; - let temp_tag = self.temp_tag(HashAndFormat::raw(hash)); + let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash)); let (tx, rx) = oneshot::channel(); self.tx .send_async(ActorMessage::Export { @@ -1048,10 +1050,6 @@ impl StoreInner { Ok(rx.await?) } - fn temp_tag(&self, content: HashAndFormat) -> TempTag { - TempTag::new(content, Some(self.temp.clone())) - } - fn import_file_sync( &self, path: PathBuf, @@ -1141,7 +1139,7 @@ impl StoreInner { }; progress.blocking_send(ImportProgress::OutboardDone { id, hash })?; // from here on, everything related to the hash is protected by the temp tag - let tag = self.temp_tag(HashAndFormat { hash, format }); + let tag = self.temp.temp_tag(HashAndFormat { hash, format }); let hash = *tag.hash(); // blocking send for the import let (tx, rx) = flume::bounded(1); @@ -1423,7 +1421,7 @@ impl super::Store for Store { } fn temp_tag(&self, value: HashAndFormat) -> TempTag { - self.0.temp_tag(value) + self.0.temp.temp_tag(value) } async fn shutdown(&self) { @@ -1717,7 +1715,7 @@ impl ActorState { let inline_outboard = outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0; // from here on, everything related to the hash is protected by the temp tag - let tag = TempTag::new(content_id, Some(self.temp.clone())); + let tag = self.temp.temp_tag(content_id); let hash = *tag.hash(); self.protected.insert(hash); // move the data file into place, or create a reference to it diff --git a/iroh-blobs/src/store/mem.rs b/iroh-blobs/src/store/mem.rs index 7b14b2a14b..e10849e2b7 100644 --- a/iroh-blobs/src/store/mem.rs +++ b/iroh-blobs/src/store/mem.rs @@ -23,7 +23,7 @@ use crate::{ }, util::{ progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender}, - LivenessTracker, + TagCounter, TagDrop, }, Tag, TempTag, IROH_BLOCK_SIZE, }; @@ -43,13 +43,7 @@ pub struct Store { #[derive(Debug, Default)] struct StoreInner(RwLock); -impl LivenessTracker for StoreInner { - fn on_clone(&self, inner: &HashAndFormat) { - tracing::trace!("temp tagging: {:?}", inner); - let mut state = self.0.write().unwrap(); - state.temp.inc(inner); - } - +impl TagDrop for StoreInner { fn on_drop(&self, inner: &HashAndFormat) { tracing::trace!("temp tag drop: {:?}", inner); let mut state = self.0.write().unwrap(); @@ -57,6 +51,14 @@ impl LivenessTracker for StoreInner { } } +impl TagCounter for StoreInner { + fn on_create(&self, inner: &HashAndFormat) { + tracing::trace!("temp tagging: {:?}", inner); + let mut state = self.0.write().unwrap(); + state.temp.inc(inner); + } +} + impl Store { /// Create a new in memory store pub fn new() -> Self { @@ -217,7 +219,7 @@ impl super::Store for Store { } fn temp_tag(&self, tag: HashAndFormat) -> TempTag { - TempTag::new(tag, Some(self.inner.clone())) + self.inner.temp_tag(tag) } async fn gc_start(&self) -> io::Result<()> { diff --git a/iroh-blobs/src/util.rs b/iroh-blobs/src/util.rs index b540b88562..751886492c 100644 --- a/iroh-blobs/src/util.rs +++ b/iroh-blobs/src/util.rs @@ -4,7 +4,12 @@ use bytes::Bytes; use derive_more::{Debug, Display, From, Into}; use range_collections::range_set::RangeSetRange; use serde::{Deserialize, Serialize}; -use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime}; +use std::{ + borrow::Borrow, + fmt, + sync::{Arc, Weak}, + time::SystemTime, +}; use crate::{store::Store, BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE}; @@ -179,6 +184,13 @@ pub enum SetTagOption { Named(Tag), } +/// Trait used from temp tags to notify an abstract store that a temp tag is +/// being dropped. +pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static { + /// Called on drop + fn on_drop(&self, inner: &HashAndFormat); +} + /// A trait for things that can track liveness of blobs and collections. /// /// This trait works together with [TempTag] to keep track of the liveness of a @@ -187,11 +199,21 @@ pub enum SetTagOption { /// It is important to include the format in the liveness tracking, since /// protecting a collection means protecting the blob and all its children, /// whereas protecting a raw blob only protects the blob itself. -pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static { - /// Called on clone - fn on_clone(&self, inner: &HashAndFormat); - /// Called on drop - fn on_drop(&self, inner: &HashAndFormat); +pub trait TagCounter: TagDrop + Sized { + /// Called on creation of a temp tag + fn on_create(&self, inner: &HashAndFormat); + + /// Get this as a weak reference for use in temp tags + fn as_weak(self: &Arc) -> Weak { + let on_drop: Arc = self.clone(); + Arc::downgrade(&on_drop) + } + + /// Create a new temp tag for the given hash and format + fn temp_tag(self: &Arc, inner: HashAndFormat) -> TempTag { + self.on_create(&inner); + TempTag::new(inner, Some(self.as_weak())) + } } /// A hash and format pair that is protected from garbage collection. @@ -202,8 +224,8 @@ pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static { pub struct TempTag { /// The hash and format we are pinning inner: HashAndFormat, - /// liveness tracker - liveness: Option>, + /// optional callback to call on drop + on_drop: Option>, } impl TempTag { @@ -214,11 +236,8 @@ impl TempTag { /// The caller is responsible for increasing the refcount on creation and to /// make sure that temp tags that are created between a mark phase and a sweep /// phase are protected. - pub fn new(inner: HashAndFormat, liveness: Option>) -> Self { - if let Some(liveness) = liveness.as_ref() { - liveness.on_clone(&inner); - } - Self { inner, liveness } + pub fn new(inner: HashAndFormat, on_drop: Option>) -> Self { + Self { inner, on_drop } } /// The hash of the pinned item @@ -241,20 +260,16 @@ impl TempTag { // set the liveness tracker to None, so that the refcount is not decreased // during drop. This means that the refcount will never reach 0 and the // item will not be gced until the end of the process. - self.liveness = None; - } -} - -impl Clone for TempTag { - fn clone(&self) -> Self { - Self::new(self.inner, self.liveness.clone()) + self.on_drop = None; } } impl Drop for TempTag { fn drop(&mut self) { - if let Some(liveness) = self.liveness.as_ref() { - liveness.on_drop(&self.inner); + if let Some(on_drop) = self.on_drop.take() { + if let Some(on_drop) = on_drop.upgrade() { + on_drop.on_drop(&self.inner); + } } } }