diff --git a/iroh-blobs/src/store.rs b/iroh-blobs/src/store.rs index 0e8f35d301..3030d55b3f 100644 --- a/iroh-blobs/src/store.rs +++ b/iroh-blobs/src/store.rs @@ -11,6 +11,7 @@ pub mod readonly_mem; pub mod fs; mod traits; +use tracing::warn; pub use traits::*; /// Create a 16 byte unique ID. @@ -66,7 +67,10 @@ impl TempCounterMap { fn dec(&mut self, value: &HashAndFormat) { let HashAndFormat { hash, format } = value; - let counters = self.0.get_mut(hash).unwrap(); + let Some(counters) = self.0.get_mut(hash) else { + warn!("Decrementing non-existent temp tag"); + return; + }; counters.dec(*format); if counters.is_empty() { self.0.remove(hash); diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs index e0a4d192f0..5febe54457 100644 --- a/iroh-blobs/src/store/fs.rs +++ b/iroh-blobs/src/store/fs.rs @@ -111,7 +111,7 @@ use crate::{ BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError, ProgressSender, }, - raw_outboard_size, LivenessTracker, MemOrFile, + raw_outboard_size, MemOrFile, TagCounter, TagDrop, }, Tag, TempTag, IROH_BLOCK_SIZE, }; @@ -779,16 +779,18 @@ struct StoreInner { path_options: Arc, } -impl LivenessTracker for RwLock { - fn on_clone(&self, content: &HashAndFormat) { - self.write().unwrap().inc(content); - } - +impl TagDrop for RwLock { fn on_drop(&self, content: &HashAndFormat) { self.write().unwrap().dec(content); } } +impl TagCounter for RwLock { + fn on_create(&self, content: &HashAndFormat) { + self.write().unwrap().inc(content); + } +} + impl StoreInner { fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result { tracing::trace!( @@ -981,7 +983,7 @@ impl StoreInner { )) })?; std::fs::create_dir_all(parent)?; - let temp_tag = self.temp_tag(HashAndFormat::raw(hash)); + let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash)); let (tx, rx) = oneshot::channel(); self.tx .send_async(ActorMessage::Export { @@ -1048,10 +1050,6 @@ impl StoreInner { Ok(rx.await?) } - fn temp_tag(&self, content: HashAndFormat) -> TempTag { - TempTag::new(content, Some(self.temp.clone())) - } - fn import_file_sync( &self, path: PathBuf, @@ -1141,7 +1139,7 @@ impl StoreInner { }; progress.blocking_send(ImportProgress::OutboardDone { id, hash })?; // from here on, everything related to the hash is protected by the temp tag - let tag = self.temp_tag(HashAndFormat { hash, format }); + let tag = self.temp.temp_tag(HashAndFormat { hash, format }); let hash = *tag.hash(); // blocking send for the import let (tx, rx) = flume::bounded(1); @@ -1423,7 +1421,7 @@ impl super::Store for Store { } fn temp_tag(&self, value: HashAndFormat) -> TempTag { - self.0.temp_tag(value) + self.0.temp.temp_tag(value) } async fn shutdown(&self) { @@ -1717,7 +1715,7 @@ impl ActorState { let inline_outboard = outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0; // from here on, everything related to the hash is protected by the temp tag - let tag = TempTag::new(content_id, Some(self.temp.clone())); + let tag = self.temp.temp_tag(content_id); let hash = *tag.hash(); self.protected.insert(hash); // move the data file into place, or create a reference to it diff --git a/iroh-blobs/src/store/mem.rs b/iroh-blobs/src/store/mem.rs index 7b14b2a14b..e10849e2b7 100644 --- a/iroh-blobs/src/store/mem.rs +++ b/iroh-blobs/src/store/mem.rs @@ -23,7 +23,7 @@ use crate::{ }, util::{ progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender}, - LivenessTracker, + TagCounter, TagDrop, }, Tag, TempTag, IROH_BLOCK_SIZE, }; @@ -43,13 +43,7 @@ pub struct Store { #[derive(Debug, Default)] struct StoreInner(RwLock); -impl LivenessTracker for StoreInner { - fn on_clone(&self, inner: &HashAndFormat) { - tracing::trace!("temp tagging: {:?}", inner); - let mut state = self.0.write().unwrap(); - state.temp.inc(inner); - } - +impl TagDrop for StoreInner { fn on_drop(&self, inner: &HashAndFormat) { tracing::trace!("temp tag drop: {:?}", inner); let mut state = self.0.write().unwrap(); @@ -57,6 +51,14 @@ impl LivenessTracker for StoreInner { } } +impl TagCounter for StoreInner { + fn on_create(&self, inner: &HashAndFormat) { + tracing::trace!("temp tagging: {:?}", inner); + let mut state = self.0.write().unwrap(); + state.temp.inc(inner); + } +} + impl Store { /// Create a new in memory store pub fn new() -> Self { @@ -217,7 +219,7 @@ impl super::Store for Store { } fn temp_tag(&self, tag: HashAndFormat) -> TempTag { - TempTag::new(tag, Some(self.inner.clone())) + self.inner.temp_tag(tag) } async fn gc_start(&self) -> io::Result<()> { diff --git a/iroh-blobs/src/util.rs b/iroh-blobs/src/util.rs index b540b88562..751886492c 100644 --- a/iroh-blobs/src/util.rs +++ b/iroh-blobs/src/util.rs @@ -4,7 +4,12 @@ use bytes::Bytes; use derive_more::{Debug, Display, From, Into}; use range_collections::range_set::RangeSetRange; use serde::{Deserialize, Serialize}; -use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime}; +use std::{ + borrow::Borrow, + fmt, + sync::{Arc, Weak}, + time::SystemTime, +}; use crate::{store::Store, BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE}; @@ -179,6 +184,13 @@ pub enum SetTagOption { Named(Tag), } +/// Trait used from temp tags to notify an abstract store that a temp tag is +/// being dropped. +pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static { + /// Called on drop + fn on_drop(&self, inner: &HashAndFormat); +} + /// A trait for things that can track liveness of blobs and collections. /// /// This trait works together with [TempTag] to keep track of the liveness of a @@ -187,11 +199,21 @@ pub enum SetTagOption { /// It is important to include the format in the liveness tracking, since /// protecting a collection means protecting the blob and all its children, /// whereas protecting a raw blob only protects the blob itself. -pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static { - /// Called on clone - fn on_clone(&self, inner: &HashAndFormat); - /// Called on drop - fn on_drop(&self, inner: &HashAndFormat); +pub trait TagCounter: TagDrop + Sized { + /// Called on creation of a temp tag + fn on_create(&self, inner: &HashAndFormat); + + /// Get this as a weak reference for use in temp tags + fn as_weak(self: &Arc) -> Weak { + let on_drop: Arc = self.clone(); + Arc::downgrade(&on_drop) + } + + /// Create a new temp tag for the given hash and format + fn temp_tag(self: &Arc, inner: HashAndFormat) -> TempTag { + self.on_create(&inner); + TempTag::new(inner, Some(self.as_weak())) + } } /// A hash and format pair that is protected from garbage collection. @@ -202,8 +224,8 @@ pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static { pub struct TempTag { /// The hash and format we are pinning inner: HashAndFormat, - /// liveness tracker - liveness: Option>, + /// optional callback to call on drop + on_drop: Option>, } impl TempTag { @@ -214,11 +236,8 @@ impl TempTag { /// The caller is responsible for increasing the refcount on creation and to /// make sure that temp tags that are created between a mark phase and a sweep /// phase are protected. - pub fn new(inner: HashAndFormat, liveness: Option>) -> Self { - if let Some(liveness) = liveness.as_ref() { - liveness.on_clone(&inner); - } - Self { inner, liveness } + pub fn new(inner: HashAndFormat, on_drop: Option>) -> Self { + Self { inner, on_drop } } /// The hash of the pinned item @@ -241,20 +260,16 @@ impl TempTag { // set the liveness tracker to None, so that the refcount is not decreased // during drop. This means that the refcount will never reach 0 and the // item will not be gced until the end of the process. - self.liveness = None; - } -} - -impl Clone for TempTag { - fn clone(&self) -> Self { - Self::new(self.inner, self.liveness.clone()) + self.on_drop = None; } } impl Drop for TempTag { fn drop(&mut self) { - if let Some(liveness) = self.liveness.as_ref() { - liveness.on_drop(&self.inner); + if let Some(on_drop) = self.on_drop.take() { + if let Some(on_drop) = on_drop.upgrade() { + on_drop.on_drop(&self.inner); + } } } }