Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-bytes): Batch blob api #2339

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh-blobs/src/format/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Collection {
///
/// To persist the collection, write all the blobs to storage, and use the
/// hash of the last blob as the collection hash.
pub fn to_blobs(&self) -> impl Iterator<Item = Bytes> {
pub fn to_blobs(&self) -> impl DoubleEndedIterator<Item = Bytes> {
let meta = CollectionMeta {
header: *Self::HEADER,
names: self.names(),
Expand Down
24 changes: 24 additions & 0 deletions iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,30 @@ pub enum AddProgress {
Abort(RpcError),
}

/// Progress updates for the batch add operation.
///
/// None of the variants carries an item id.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchAddPathProgress {
/// An item was found with the given size
Found {
/// The size of the entry in bytes.
size: u64,
},
/// We got progress ingesting the item.
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
Progress {
/// The offset of the progress, in bytes.
offset: u64,
},
/// We are done ingesting the item, and its hash is `hash`.
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
Done {
/// The hash of the entry.
hash: Hash,
},
/// We got an error and need to abort.
///
/// This will be the last message in the stream.
Abort(RpcError),
}

/// Read the request from the getter.
///
/// Will fail if there is an error while reading, if the reader
Expand Down
4 changes: 3 additions & 1 deletion iroh-blobs/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ impl TempCounterMap {

fn dec(&mut self, value: &HashAndFormat) {
let HashAndFormat { hash, format } = value;
let counters = self.0.get_mut(hash).unwrap();
let Some(counters) = self.0.get_mut(hash) else {
return;
};
counters.dec(*format);
if counters.is_empty() {
self.0.remove(hash);
Expand Down
30 changes: 16 additions & 14 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use crate::{
BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
ProgressSender,
},
raw_outboard_size, LivenessTracker, MemOrFile,
raw_outboard_size, MemOrFile, TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
Expand Down Expand Up @@ -779,16 +779,18 @@ struct StoreInner {
path_options: Arc<PathOptions>,
}

impl LivenessTracker for RwLock<TempCounterMap> {
fn on_clone(&self, content: &HashAndFormat) {
self.write().unwrap().inc(content);
}

impl TagDrop for RwLock<TempCounterMap> {
    /// Decrement the temp tag counter for `content`.
    ///
    /// Takes the write lock for the duration of the update; the `unwrap`
    /// panics if acquiring the lock fails.
    fn on_drop(&self, content: &HashAndFormat) {
        let mut counters = self.write().unwrap();
        counters.dec(content);
    }
}

impl TagCounter for RwLock<TempCounterMap> {
    /// Increment the temp tag counter for `content`.
    ///
    /// Takes the write lock for the duration of the update; the `unwrap`
    /// panics if acquiring the lock fails.
    fn on_create(&self, content: &HashAndFormat) {
        let mut counters = self.write().unwrap();
        counters.inc(content);
    }
}

impl StoreInner {
fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
tracing::trace!(
Expand Down Expand Up @@ -981,7 +983,7 @@ impl StoreInner {
))
})?;
std::fs::create_dir_all(parent)?;
let temp_tag = self.temp_tag(HashAndFormat::raw(hash));
let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
let (tx, rx) = oneshot::channel();
self.tx
.send_async(ActorMessage::Export {
Expand Down Expand Up @@ -1048,10 +1050,6 @@ impl StoreInner {
Ok(rx.await?)
}

fn temp_tag(&self, content: HashAndFormat) -> TempTag {
TempTag::new(content, Some(self.temp.clone()))
}

fn import_file_sync(
&self,
path: PathBuf,
Expand Down Expand Up @@ -1141,7 +1139,7 @@ impl StoreInner {
};
progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
// from here on, everything related to the hash is protected by the temp tag
let tag = self.temp_tag(HashAndFormat { hash, format });
let tag = self.temp.temp_tag(HashAndFormat { hash, format });
let hash = *tag.hash();
// blocking send for the import
let (tx, rx) = flume::bounded(1);
Expand Down Expand Up @@ -1423,7 +1421,11 @@ impl super::Store for Store {
}

fn temp_tag(&self, value: HashAndFormat) -> TempTag {
self.0.temp_tag(value)
self.0.temp.temp_tag(value)
}

/// Exposes the store's `temp` field as a [`TagDrop`] handle.
///
/// Always returns `Some` for this store.
fn tag_drop(&self) -> Option<&dyn TagDrop> {
Some(self.0.temp.as_ref())
}

async fn shutdown(&self) {
Expand Down Expand Up @@ -1717,7 +1719,7 @@ impl ActorState {
let inline_outboard =
outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
// from here on, everything related to the hash is protected by the temp tag
let tag = TempTag::new(content_id, Some(self.temp.clone()));
let tag = self.temp.temp_tag(content_id);
let hash = *tag.hash();
self.protected.insert(hash);
// move the data file into place, or create a reference to it
Expand Down
24 changes: 15 additions & 9 deletions iroh-blobs/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
},
util::{
progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
LivenessTracker,
TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
Expand All @@ -43,20 +43,22 @@ pub struct Store {
#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);

impl LivenessTracker for StoreInner {
fn on_clone(&self, inner: &HashAndFormat) {
tracing::trace!("temp tagging: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.inc(inner);
}

impl TagDrop for StoreInner {
    /// Log the drop at trace level, then decrement the temp counter for
    /// `inner` under the state write lock.
    fn on_drop(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tag drop: {:?}", inner);
        self.0.write().unwrap().temp.dec(inner);
    }
}

impl TagCounter for StoreInner {
    /// Log the creation at trace level, then increment the temp counter for
    /// `inner` under the state write lock.
    fn on_create(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tagging: {:?}", inner);
        self.0.write().unwrap().temp.inc(inner);
    }
}

impl Store {
/// Create a new in memory store
pub fn new() -> Self {
Expand Down Expand Up @@ -217,7 +219,11 @@ impl super::Store for Store {
}

fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
TempTag::new(tag, Some(self.inner.clone()))
self.inner.temp_tag(tag)
}

/// Exposes the store's `inner` state as a [`TagDrop`] handle.
///
/// Always returns `Some` for this store.
fn tag_drop(&self) -> Option<&dyn TagDrop> {
Some(self.inner.as_ref())
}

async fn gc_start(&self) -> io::Result<()> {
Expand Down
6 changes: 5 additions & 1 deletion iroh-blobs/src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
},
util::{
progress::{BoxedProgressSender, IdGenerator, ProgressSender},
Tag,
Tag, TagDrop,
},
BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};
Expand Down Expand Up @@ -324,6 +324,10 @@ impl super::Store for Store {
TempTag::new(inner, None)
}

/// This store provides no [`TagDrop`] handle; always returns `None`.
fn tag_drop(&self) -> Option<&dyn TagDrop> {
None
}

/// No-op for this store: always returns `Ok(())`.
async fn gc_start(&self) -> io::Result<()> {
Ok(())
}
Expand Down
7 changes: 5 additions & 2 deletions iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
protocol::RangeSpec,
util::{
progress::{BoxedProgressSender, IdGenerator, ProgressSender},
Tag,
Tag, TagDrop,
},
BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};
Expand Down Expand Up @@ -356,6 +356,9 @@ pub trait Store: ReadableStore + MapMut {
/// Create a temporary pin for this store
fn temp_tag(&self, value: HashAndFormat) -> TempTag;

/// Handle to use to drop tags.
///
/// Returns the store's [`TagDrop`] implementation, or `None` if the store
/// does not provide one.
fn tag_drop(&self) -> Option<&dyn TagDrop>;
rklaehn marked this conversation as resolved.
Show resolved Hide resolved

/// Notify the store that a new gc phase is about to start.
///
/// This should not fail unless the store is shut down or otherwise in a
Expand Down Expand Up @@ -700,7 +703,7 @@ pub enum ImportProgress {
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `Reference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-export this from the blobs API since I don't want to wrap it in a newtype...

/// This mode will copy the file into the database before hashing.
///
Expand Down
67 changes: 44 additions & 23 deletions iroh-blobs/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ use bytes::Bytes;
use derive_more::{Debug, Display, From, Into};
use range_collections::range_set::RangeSetRange;
use serde::{Deserialize, Serialize};
use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime};
use std::{
borrow::Borrow,
fmt,
sync::{Arc, Weak},
time::SystemTime,
};

use crate::{store::Store, BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};

Expand Down Expand Up @@ -179,6 +184,13 @@ pub enum SetTagOption {
Named(Tag),
}

/// Trait used from temp tags to notify an abstract store that a temp tag is
/// being dropped.
///
/// Implementors must be `Debug`, `Send`, `Sync` and `'static`.
pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
/// Called on drop
///
/// `inner` is the hash and format that the dropped temp tag was pinning.
fn on_drop(&self, inner: &HashAndFormat);
}

/// A trait for things that can track liveness of blobs and collections.
///
/// This trait works together with [TempTag] to keep track of the liveness of a
Expand All @@ -187,11 +199,21 @@ pub enum SetTagOption {
/// It is important to include the format in the liveness tracking, since
/// protecting a collection means protecting the blob and all its children,
/// whereas protecting a raw blob only protects the blob itself.
pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
/// Called on clone
fn on_clone(&self, inner: &HashAndFormat);
/// Called on drop
fn on_drop(&self, inner: &HashAndFormat);
pub trait TagCounter: TagDrop + Sized {
    /// Invoked whenever a temp tag for `inner` is created.
    fn on_create(&self, inner: &HashAndFormat);

    /// Produce a type-erased weak [`TagDrop`] handle to this counter, in the
    /// form that temp tags store.
    fn as_weak(self: &Arc<Self>) -> Weak<dyn TagDrop> {
        // Clone with the concrete type first, then erase it in a separate
        // step; the strong clone is released when this function returns.
        let strong: Arc<Self> = Arc::clone(self);
        let erased: Arc<dyn TagDrop> = strong;
        Arc::downgrade(&erased)
    }

    /// Register a new temp tag for `inner` and return it.
    ///
    /// `on_create` is called before the tag is built; the tag holds only a
    /// weak handle back to this counter.
    fn temp_tag(self: &Arc<Self>, inner: HashAndFormat) -> TempTag {
        self.on_create(&inner);
        let drop_handle = self.as_weak();
        TempTag::new(inner, Some(drop_handle))
    }
}

/// A hash and format pair that is protected from garbage collection.
Expand All @@ -202,8 +224,8 @@ pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
pub struct TempTag {
/// The hash and format we are pinning
inner: HashAndFormat,
/// liveness tracker
liveness: Option<Arc<dyn LivenessTracker>>,
/// optional callback to call on drop
on_drop: Option<Weak<dyn TagDrop>>,
}

impl TempTag {
Expand All @@ -214,11 +236,8 @@ impl TempTag {
/// The caller is responsible for increasing the refcount on creation and to
/// make sure that temp tags that are created between a mark phase and a sweep
/// phase are protected.
pub fn new(inner: HashAndFormat, liveness: Option<Arc<dyn LivenessTracker>>) -> Self {
if let Some(liveness) = liveness.as_ref() {
liveness.on_clone(&inner);
}
Self { inner, liveness }
pub fn new(inner: HashAndFormat, on_drop: Option<Weak<dyn TagDrop>>) -> Self {
Self { inner, on_drop }
}

/// The hash of the pinned item
Expand All @@ -236,25 +255,27 @@ impl TempTag {
self.inner.format
}

/// The hash and format of the pinned item
///
/// Returns the pinned [`HashAndFormat`] by value.
pub fn hash_and_format(&self) -> HashAndFormat {
self.inner
}

/// Keep the item alive until the end of the process
pub fn leak(mut self) {
// set the on_drop handle to None, so that the refcount is not decreased
// during drop. This means that the refcount will never reach 0 and the
// item will not be gced until the end of the process.
self.liveness = None;
}
}

impl Clone for TempTag {
fn clone(&self) -> Self {
Self::new(self.inner, self.liveness.clone())
// item will not be gced until the end of the process, unless you manually
// invoke on_drop.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this reads like it should be moved from a comment to documentation

self.on_drop = None;
}
}

impl Drop for TempTag {
fn drop(&mut self) {
if let Some(liveness) = self.liveness.as_ref() {
liveness.on_drop(&self.inner);
if let Some(on_drop) = self.on_drop.take() {
if let Some(on_drop) = on_drop.upgrade() {
on_drop.on_drop(&self.inner);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/net/interfaces/bsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl WireFormat {

Ok(Some(WireMessage::Route(m)))
}
#[cfg(any(target_os = "openbsd",))]
#[cfg(target_os = "openbsd")]
MessageType::Route => {
if data.len() < self.body_off {
return Err(RouteError::MessageTooShort);
Expand Down
Loading
Loading