Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-bytes): Batch blob api #2339

Closed
wants to merge 23 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Make TempTag non-Clone
I found that in most places it is not needed, and if somebody needs them
to be clone they can always wrap them in an Arc.

Also, this complicates extending the concept of temp tags via the rpc
boundary.

With this change if will be possible to use the same TempTag type both in
the low level blobs store API and in the higher level blobs API of iroh.
  • Loading branch information
rklaehn committed Jun 3, 2024
commit 11e2d7f0f12d05baa58022e5ba4d6e424ef90654
4 changes: 3 additions & 1 deletion iroh-blobs/src/store.rs
Original file line number Diff line number Diff line change
@@ -66,7 +66,9 @@ impl TempCounterMap {

fn dec(&mut self, value: &HashAndFormat) {
let HashAndFormat { hash, format } = value;
let counters = self.0.get_mut(hash).unwrap();
let Some(counters) = self.0.get_mut(hash) else {
return;
};
counters.dec(*format);
if counters.is_empty() {
self.0.remove(hash);
27 changes: 18 additions & 9 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ use std::{
collections::{BTreeMap, BTreeSet},
io::{self, BufReader, Read},
path::{Path, PathBuf},
sync::{Arc, RwLock},
sync::{Arc, RwLock, Weak},
time::{Duration, SystemTime},
};

@@ -111,7 +111,7 @@ use crate::{
BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
ProgressSender,
},
raw_outboard_size, LivenessTracker, MemOrFile,
raw_outboard_size, MemOrFile, TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
@@ -775,20 +775,23 @@ impl Store {
struct StoreInner {
tx: flume::Sender<ActorMessage>,
temp: Arc<RwLock<TempCounterMap>>,
temp_weak: Weak<dyn TagDrop>,
handle: Option<std::thread::JoinHandle<()>>,
path_options: Arc<PathOptions>,
}

impl LivenessTracker for RwLock<TempCounterMap> {
fn on_clone(&self, content: &HashAndFormat) {
self.write().unwrap().inc(content);
}

impl TagDrop for RwLock<TempCounterMap> {
fn on_drop(&self, content: &HashAndFormat) {
self.write().unwrap().dec(content);
}
}

impl TagCounter for RwLock<TempCounterMap> {
fn on_create(&self, content: &HashAndFormat) {
self.write().unwrap().inc(content);
}
}

impl StoreInner {
fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
tracing::trace!(
@@ -807,6 +810,7 @@ impl StoreInner {
);
std::fs::create_dir_all(path.parent().unwrap())?;
let temp: Arc<RwLock<TempCounterMap>> = Default::default();
let temp_weak = Arc::downgrade(&temp);
let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt)?;
let handle = std::thread::Builder::new()
.name("redb-actor".to_string())
@@ -819,6 +823,7 @@ impl StoreInner {
Ok(Self {
tx,
temp,
temp_weak,
handle: Some(handle),
path_options: Arc::new(options.path),
})
@@ -1049,7 +1054,8 @@ impl StoreInner {
}

fn temp_tag(&self, content: HashAndFormat) -> TempTag {
TempTag::new(content, Some(self.temp.clone()))
self.temp.on_create(&content);
TempTag::new(content, Some(self.temp_weak.clone()))
}

fn import_file_sync(
@@ -1717,7 +1723,10 @@ impl ActorState {
let inline_outboard =
outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
// from here on, everything related to the hash is protected by the temp tag
let tag = TempTag::new(content_id, Some(self.temp.clone()));
self.temp.on_create(&content_id);
let temp: Arc<dyn TagDrop> = self.temp.clone();
let liveness = Arc::downgrade(&temp);
let tag = TempTag::new(content_id, Some(liveness));
let hash = *tag.hash();
self.protected.insert(hash);
// move the data file into place, or create a reference to it
23 changes: 14 additions & 9 deletions iroh-blobs/src/store/mem.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ use crate::{
},
util::{
progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
LivenessTracker,
TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
@@ -43,20 +43,22 @@ pub struct Store {
#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);

impl LivenessTracker for StoreInner {
fn on_clone(&self, inner: &HashAndFormat) {
tracing::trace!("temp tagging: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.inc(inner);
}

impl TagDrop for StoreInner {
fn on_drop(&self, inner: &HashAndFormat) {
tracing::trace!("temp tag drop: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.dec(inner);
}
}

impl TagCounter for StoreInner {
fn on_create(&self, inner: &HashAndFormat) {
tracing::trace!("temp tagging: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.inc(inner);
}
}

impl Store {
/// Create a new in memory store
pub fn new() -> Self {
@@ -217,7 +219,10 @@ impl super::Store for Store {
}

fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
TempTag::new(tag, Some(self.inner.clone()))
self.inner.on_create(&tag);
let temp: Arc<dyn TagDrop> = self.inner.clone();
let liveness = Arc::downgrade(&temp);
TempTag::new(tag, Some(liveness))
}

async fn gc_start(&self) -> io::Result<()> {
39 changes: 21 additions & 18 deletions iroh-blobs/src/util.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use bytes::Bytes;
use derive_more::{Debug, Display, From, Into};
use range_collections::range_set::RangeSetRange;
use serde::{Deserialize, Serialize};
use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime};
use std::{borrow::Borrow, fmt, sync::Weak, time::SystemTime};

use crate::{store::Store, BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};

@@ -179,6 +179,13 @@ pub enum SetTagOption {
Named(Tag),
}

/// Trait used from temp tags to notify an abstract store that a temp tag is
/// being dropped.
pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
/// Called on drop
fn on_drop(&self, inner: &HashAndFormat);
}

/// A trait for things that can track liveness of blobs and collections.
///
/// This trait works together with [TempTag] to keep track of the liveness of a
@@ -187,11 +194,9 @@ pub enum SetTagOption {
/// It is important to include the format in the liveness tracking, since
/// protecting a collection means protecting the blob and all its children,
/// whereas protecting a raw blob only protects the blob itself.
pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
/// Called on clone
fn on_clone(&self, inner: &HashAndFormat);
/// Called on drop
fn on_drop(&self, inner: &HashAndFormat);
pub trait TagCounter: TagDrop {
/// Called on creation of a temp tag
fn on_create(&self, inner: &HashAndFormat);
}

/// A hash and format pair that is protected from garbage collection.
@@ -203,7 +208,7 @@ pub struct TempTag {
/// The hash and format we are pinning
inner: HashAndFormat,
/// liveness tracker
liveness: Option<Arc<dyn LivenessTracker>>,
liveness: Option<Weak<dyn TagDrop>>,
}

impl TempTag {
@@ -214,10 +219,12 @@ impl TempTag {
/// The caller is responsible for increasing the refcount on creation and to
/// make sure that temp tags that are created between a mark phase and a sweep
/// phase are protected.
pub fn new(inner: HashAndFormat, liveness: Option<Arc<dyn LivenessTracker>>) -> Self {
if let Some(liveness) = liveness.as_ref() {
liveness.on_clone(&inner);
}
pub fn new(inner: HashAndFormat, liveness: Option<Weak<dyn TagDrop>>) -> Self {
// if let Some(liveness) = liveness.as_ref() {
// if let Some(liveness) = liveness.upgrade() {
// liveness.on_clone(&inner);
// }
// }
Self { inner, liveness }
}

@@ -245,16 +252,12 @@ impl TempTag {
}
}

impl Clone for TempTag {
fn clone(&self) -> Self {
Self::new(self.inner, self.liveness.clone())
}
}

impl Drop for TempTag {
fn drop(&mut self) {
if let Some(liveness) = self.liveness.as_ref() {
liveness.on_drop(&self.inner);
if let Some(liveness) = liveness.upgrade() {
liveness.on_drop(&self.inner);
}
}
}
}
Loading