Skip to content

Commit

Permalink
Merge branch 'main' into flub/endpoint-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
flub committed Jun 7, 2024
2 parents 7077ec1 + 7198cd0 commit fce2064
Show file tree
Hide file tree
Showing 35 changed files with 584 additions and 403 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- name: Install sccache
uses: mozilla-actions/[email protected]

Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-blobs/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn export_collection<D: BaoStore>(
progress: impl ProgressSender<Msg = ExportProgress> + IdGenerator,
) -> anyhow::Result<()> {
tokio::fs::create_dir_all(&outpath).await?;
let collection = Collection::load(db, &hash).await?;
let collection = Collection::load_db(db, &hash).await?;
for (name, hash) in collection.into_iter() {
#[allow(clippy::needless_borrow)]
let path = outpath.join(pathbuf_from_name(&name));
Expand Down
26 changes: 23 additions & 3 deletions iroh-blobs/src/format/collection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! The collection type used by iroh
use std::collections::BTreeMap;
use std::{collections::BTreeMap, future::Future};

use anyhow::Context;
use bao_tree::blake3;
Expand Down Expand Up @@ -64,6 +64,12 @@ impl IntoIterator for Collection {
}
}

/// A simple store trait for loading blobs
pub trait SimpleStore {
/// Load a blob from the store
fn load(&self, hash: Hash) -> impl Future<Output = anyhow::Result<Bytes>> + Send + '_;
}

/// Metadata for a collection
///
/// This is the wire format for the metadata blob.
Expand All @@ -84,7 +90,7 @@ impl Collection {
///
/// To persist the collection, write all the blobs to storage, and use the
/// hash of the last blob as the collection hash.
pub fn to_blobs(&self) -> impl Iterator<Item = Bytes> {
pub fn to_blobs(&self) -> impl DoubleEndedIterator<Item = Bytes> {
let meta = CollectionMeta {
header: *Self::HEADER,
names: self.names(),
Expand Down Expand Up @@ -160,11 +166,25 @@ impl Collection {
Ok((collection, res, stats))
}

/// Create a new collection from a hash sequence and metadata.
pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result<Self> {
let hs = store.load(root).await?;
let hs = HashSeq::try_from(hs)?;
let meta_hash = hs.iter().next().context("empty hash seq")?;
let meta = store.load(meta_hash).await?;
let meta: CollectionMeta = postcard::from_bytes(&meta)?;
anyhow::ensure!(
meta.names.len() + 1 == hs.len(),
"names and links length mismatch"
);
Ok(Self::from_parts(hs.into_iter(), meta))
}

/// Load a collection from a store given a root hash
///
/// This assumes that both the links and the metadata of the collection is stored in the store.
/// It does not require that all child blobs are stored in the store.
pub async fn load<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
pub async fn load_db<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
where
D: crate::store::Map,
{
Expand Down
6 changes: 5 additions & 1 deletion iroh-blobs/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod readonly_mem;
pub mod fs;

mod traits;
use tracing::warn;
pub use traits::*;

/// Create a 16 byte unique ID.
Expand Down Expand Up @@ -66,7 +67,10 @@ impl TempCounterMap {

fn dec(&mut self, value: &HashAndFormat) {
let HashAndFormat { hash, format } = value;
let counters = self.0.get_mut(hash).unwrap();
let Some(counters) = self.0.get_mut(hash) else {
warn!("Decrementing non-existent temp tag");
return;
};
counters.dec(*format);
if counters.is_empty() {
self.0.remove(hash);
Expand Down
26 changes: 12 additions & 14 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use crate::{
BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
ProgressSender,
},
raw_outboard_size, LivenessTracker, MemOrFile,
raw_outboard_size, MemOrFile, TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
Expand Down Expand Up @@ -779,16 +779,18 @@ struct StoreInner {
path_options: Arc<PathOptions>,
}

impl LivenessTracker for RwLock<TempCounterMap> {
fn on_clone(&self, content: &HashAndFormat) {
self.write().unwrap().inc(content);
}

impl TagDrop for RwLock<TempCounterMap> {
fn on_drop(&self, content: &HashAndFormat) {
self.write().unwrap().dec(content);
}
}

impl TagCounter for RwLock<TempCounterMap> {
fn on_create(&self, content: &HashAndFormat) {
self.write().unwrap().inc(content);
}
}

impl StoreInner {
fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
tracing::trace!(
Expand Down Expand Up @@ -981,7 +983,7 @@ impl StoreInner {
))
})?;
std::fs::create_dir_all(parent)?;
let temp_tag = self.temp_tag(HashAndFormat::raw(hash));
let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
let (tx, rx) = oneshot::channel();
self.tx
.send_async(ActorMessage::Export {
Expand Down Expand Up @@ -1048,10 +1050,6 @@ impl StoreInner {
Ok(rx.await?)
}

fn temp_tag(&self, content: HashAndFormat) -> TempTag {
TempTag::new(content, Some(self.temp.clone()))
}

fn import_file_sync(
&self,
path: PathBuf,
Expand Down Expand Up @@ -1141,7 +1139,7 @@ impl StoreInner {
};
progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
// from here on, everything related to the hash is protected by the temp tag
let tag = self.temp_tag(HashAndFormat { hash, format });
let tag = self.temp.temp_tag(HashAndFormat { hash, format });
let hash = *tag.hash();
// blocking send for the import
let (tx, rx) = flume::bounded(1);
Expand Down Expand Up @@ -1423,7 +1421,7 @@ impl super::Store for Store {
}

fn temp_tag(&self, value: HashAndFormat) -> TempTag {
self.0.temp_tag(value)
self.0.temp.temp_tag(value)
}

async fn shutdown(&self) {
Expand Down Expand Up @@ -1717,7 +1715,7 @@ impl ActorState {
let inline_outboard =
outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
// from here on, everything related to the hash is protected by the temp tag
let tag = TempTag::new(content_id, Some(self.temp.clone()));
let tag = self.temp.temp_tag(content_id);
let hash = *tag.hash();
self.protected.insert(hash);
// move the data file into place, or create a reference to it
Expand Down
20 changes: 11 additions & 9 deletions iroh-blobs/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
},
util::{
progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
LivenessTracker,
TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
Expand All @@ -43,20 +43,22 @@ pub struct Store {
#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);

impl LivenessTracker for StoreInner {
fn on_clone(&self, inner: &HashAndFormat) {
tracing::trace!("temp tagging: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.inc(inner);
}

impl TagDrop for StoreInner {
fn on_drop(&self, inner: &HashAndFormat) {
tracing::trace!("temp tag drop: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.dec(inner);
}
}

impl TagCounter for StoreInner {
fn on_create(&self, inner: &HashAndFormat) {
tracing::trace!("temp tagging: {:?}", inner);
let mut state = self.0.write().unwrap();
state.temp.inc(inner);
}
}

impl Store {
/// Create a new in memory store
pub fn new() -> Self {
Expand Down Expand Up @@ -217,7 +219,7 @@ impl super::Store for Store {
}

fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
TempTag::new(tag, Some(self.inner.clone()))
self.inner.temp_tag(tag)
}

async fn gc_start(&self) -> io::Result<()> {
Expand Down
59 changes: 37 additions & 22 deletions iroh-blobs/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ use bytes::Bytes;
use derive_more::{Debug, Display, From, Into};
use range_collections::range_set::RangeSetRange;
use serde::{Deserialize, Serialize};
use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime};
use std::{
borrow::Borrow,
fmt,
sync::{Arc, Weak},
time::SystemTime,
};

use crate::{store::Store, BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};

Expand Down Expand Up @@ -179,6 +184,13 @@ pub enum SetTagOption {
Named(Tag),
}

/// Trait used from temp tags to notify an abstract store that a temp tag is
/// being dropped.
pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
/// Called on drop
fn on_drop(&self, inner: &HashAndFormat);
}

/// A trait for things that can track liveness of blobs and collections.
///
/// This trait works together with [TempTag] to keep track of the liveness of a
Expand All @@ -187,11 +199,21 @@ pub enum SetTagOption {
/// It is important to include the format in the liveness tracking, since
/// protecting a collection means protecting the blob and all its children,
/// whereas protecting a raw blob only protects the blob itself.
pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
/// Called on clone
fn on_clone(&self, inner: &HashAndFormat);
/// Called on drop
fn on_drop(&self, inner: &HashAndFormat);
pub trait TagCounter: TagDrop + Sized {
/// Called on creation of a temp tag
fn on_create(&self, inner: &HashAndFormat);

/// Get this as a weak reference for use in temp tags
fn as_weak(self: &Arc<Self>) -> Weak<dyn TagDrop> {
let on_drop: Arc<dyn TagDrop> = self.clone();
Arc::downgrade(&on_drop)
}

/// Create a new temp tag for the given hash and format
fn temp_tag(self: &Arc<Self>, inner: HashAndFormat) -> TempTag {
self.on_create(&inner);
TempTag::new(inner, Some(self.as_weak()))
}
}

/// A hash and format pair that is protected from garbage collection.
Expand All @@ -202,8 +224,8 @@ pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
pub struct TempTag {
/// The hash and format we are pinning
inner: HashAndFormat,
/// liveness tracker
liveness: Option<Arc<dyn LivenessTracker>>,
/// optional callback to call on drop
on_drop: Option<Weak<dyn TagDrop>>,
}

impl TempTag {
Expand All @@ -214,11 +236,8 @@ impl TempTag {
/// The caller is responsible for increasing the refcount on creation and to
/// make sure that temp tags that are created between a mark phase and a sweep
/// phase are protected.
pub fn new(inner: HashAndFormat, liveness: Option<Arc<dyn LivenessTracker>>) -> Self {
if let Some(liveness) = liveness.as_ref() {
liveness.on_clone(&inner);
}
Self { inner, liveness }
pub fn new(inner: HashAndFormat, on_drop: Option<Weak<dyn TagDrop>>) -> Self {
Self { inner, on_drop }
}

/// The hash of the pinned item
Expand All @@ -241,20 +260,16 @@ impl TempTag {
// set the liveness tracker to None, so that the refcount is not decreased
// during drop. This means that the refcount will never reach 0 and the
// item will not be gced until the end of the process.
self.liveness = None;
}
}

impl Clone for TempTag {
fn clone(&self) -> Self {
Self::new(self.inner, self.liveness.clone())
self.on_drop = None;
}
}

impl Drop for TempTag {
fn drop(&mut self) {
if let Some(liveness) = self.liveness.as_ref() {
liveness.on_drop(&self.inner);
if let Some(on_drop) = self.on_drop.take() {
if let Some(on_drop) = on_drop.upgrade() {
on_drop.on_drop(&self.inner);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ impl ListCommands {
}
}
Self::Collections => {
let mut response = iroh.blobs.list_collections().await?;
let mut response = iroh.blobs.list_collections()?;
while let Some(item) = response.next().await {
let CollectionInfo {
tag,
Expand Down
Loading

0 comments on commit fce2064

Please sign in to comment.