Gc tests (#1585)
## Description

Add a number of basic tests as well as a stress test for gc.
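
The kind of property these tests check looks roughly like the sketch below. This is an illustrative outline against the `baomap::Store` trait touched in this diff, not the actual test code from the PR; `import_bytes`, `clear_live`, `gc_sweep`, and `blobs` are the trait methods the diff relies on, and the mark phase is elided:

```rust
use bytes::Bytes;
use futures::StreamExt;
use iroh_bytes::baomap::Store;
use iroh_bytes::util::BlobFormat;

// An untagged blob should be gone after a GC pass, while a blob
// protected by a live temp tag must survive.
async fn gc_collects_untagged_blobs(store: impl Store) -> anyhow::Result<()> {
    let keep = store
        .import_bytes(Bytes::from_static(b"keep me"), BlobFormat::RAW)
        .await?;
    let doomed = store
        .import_bytes(Bytes::from_static(b"collect me"), BlobFormat::RAW)
        .await?;
    let doomed_hash = *doomed.hash();
    // dropping the temp tag is what makes the blob eligible for collection
    drop(doomed);

    // mark phase: declare that nothing external is live
    // (a full test would also drive the mark stream; elided here)
    store.clear_live();
    // sweep phase: drive the sweep stream to completion
    let mut sweep = store.gc_sweep();
    while let Some(event) = sweep.next().await {
        tracing::debug!("sweep event: {:?}", event);
    }

    assert!(store.blobs().all(|h| h != doomed_hash));
    assert!(store.blobs().any(|h| h == *keep.hash()));
    Ok(())
}
```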

## Questions

I saw consistent Windows failures on Friday, but now everything passes on Windows. Not sure what was going on; was Windows CI just broken on Friday?

## Missing (this or next PR):

- [ ] test that docs are protected
- [ ] actually change gc default to enabled

## Change checklist

- [x] Self-review.
- [x] ~Documentation updates if relevant.~
- [x] Tests if relevant.
rklaehn authored Oct 10, 2023
1 parent d7a3dd3 commit 079bb9e
Showing 6 changed files with 573 additions and 67 deletions.
11 changes: 10 additions & 1 deletion iroh-bytes/src/baomap.rs
@@ -251,7 +251,7 @@ pub trait Store: ReadableStore + PartialMap {
     ///
     /// Sweeping might take long, but it can safely be done in the background.
     fn gc_sweep(&self) -> LocalBoxStream<'_, GcSweepEvent> {
-        let blobs = self.blobs();
+        let blobs = self.blobs().chain(self.partial_blobs());
         Gen::new(|co| async move {
             let mut count = 0;
             for hash in blobs {
@@ -607,3 +607,12 @@ pub enum ValidateProgress {
     /// We got an error and need to abort.
     Abort(RpcError),
 }
+
+/// Database events
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Event {
+    /// A GC was started
+    GcStarted,
+    /// A GC was completed
+    GcCompleted,
+}
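
These events make a GC cycle observable from outside the store. A minimal sketch of a consumer (not part of this diff):

```rust
use iroh_bytes::baomap::Event;

fn on_db_event(event: &Event) {
    match event {
        Event::GcStarted => tracing::debug!("GC cycle started"),
        Event::GcCompleted => tracing::debug!("GC cycle completed"),
    }
}
```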
18 changes: 18 additions & 0 deletions iroh-bytes/src/util.rs
@@ -77,6 +77,12 @@ impl From<String> for Tag {
     }
 }
 
+impl From<&str> for Tag {
+    fn from(value: &str) -> Self {
+        Self(Bytes::from(value.to_owned()))
+    }
+}
+
 impl Display for Tag {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let bytes = self.0.as_ref();
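
The new impl mirrors the existing `From<String>` conversion, so tests can build tags from literals. A small usage sketch, assuming `Tag` derives equality:

```rust
use iroh_bytes::util::Tag;

fn main() {
    let tag: Tag = "gc-test".into();
    let same = Tag::from("gc-test".to_string());
    // both conversions wrap the same UTF-8 bytes
    assert_eq!(tag, same);
}
```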
@@ -124,6 +130,18 @@ pub enum SetTagOption {
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
 pub struct HashAndFormat(pub Hash, pub BlobFormat);
 
+impl HashAndFormat {
+    /// Create a new hash and format pair, using the default (raw) format.
+    pub fn raw(hash: Hash) -> Self {
+        Self(hash, BlobFormat::RAW)
+    }
+
+    /// Create a new hash and format pair, using the collection format.
+    pub fn hash_seq(hash: Hash) -> Self {
+        Self(hash, BlobFormat::HASHSEQ)
+    }
+}
+
 /// Hash type used throughout.
 #[derive(PartialEq, Eq, Copy, Clone, Hash)]
 pub struct Hash(blake3::Hash);
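
The constructors read better than bare tuple-struct syntax at call sites. For instance (the zero hash is a placeholder, and the `From<[u8; 32]>` conversion for `Hash` is assumed here):

```rust
use iroh_bytes::util::{BlobFormat, HashAndFormat};
use iroh_bytes::Hash;

fn main() {
    let hash = Hash::from([0u8; 32]); // placeholder hash for illustration
    assert_eq!(HashAndFormat::raw(hash), HashAndFormat(hash, BlobFormat::RAW));
    assert_eq!(HashAndFormat::hash_seq(hash), HashAndFormat(hash, BlobFormat::HASHSEQ));
}
```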
85 changes: 42 additions & 43 deletions iroh/src/baomap/flat.rs
@@ -143,7 +143,7 @@ use iroh_bytes::baomap::{
     self, EntryStatus, ExportMode, ImportMode, ImportProgress, LivenessTracker, Map, MapEntry,
     PartialMap, PartialMapEntry, ReadableStore, TempTag, ValidateProgress,
 };
-use iroh_bytes::util::progress::{IdGenerator, ProgressSender};
+use iroh_bytes::util::progress::{IdGenerator, IgnoreProgressSender, ProgressSender};
 use iroh_bytes::util::{BlobFormat, HashAndFormat, Tag};
 use iroh_bytes::{Hash, IROH_BLOCK_SIZE};
 use iroh_io::{AsyncSliceReader, AsyncSliceWriter, File};
@@ -895,42 +895,65 @@ impl Store {
         Ok((tag, size))
     }
 
+    fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> io::Result<TempTag> {
+        let temp_data_path = self.temp_path();
+        std::fs::write(&temp_data_path, &data)?;
+        let id = 0;
+        let file = ImportFile::TempFile(temp_data_path);
+        let progress = IgnoreProgressSender::default();
+        let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
+        // we have the data in memory, so we can just insert it right now
+        if data.len() < self.0.options.inline_threshold as usize {
+            let mut state = self.0.state.write().unwrap();
+            state.data.insert(*tag.hash(), data);
+        }
+        Ok(tag)
+    }
+
     fn finalize_import_sync(
         &self,
         file: ImportFile,
         format: BlobFormat,
         id: u64,
         progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
     ) -> io::Result<(TempTag, u64)> {
-        let complete_io_guard = self.0.complete_io_mutex.lock().unwrap();
         let size = file.path().metadata()?.len();
         progress.blocking_send(ImportProgress::Size { id, size })?;
         let progress2 = progress.clone();
         let (hash, outboard) = compute_outboard(file.path(), size, move |offset| {
             Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
         })?;
         progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
-        let (tag, new, outboard) = match file {
-            ImportFile::External(path) => {
-                use baomap::Store;
-                let tag = self.temp_tag(HashAndFormat(hash, format));
-                (tag, CompleteEntry::new_external(size, path), outboard)
-            }
-            ImportFile::TempFile(path) => {
+        use baomap::Store;
+        // from here on, everything related to the hash is protected by the temp tag
+        let tag = self.temp_tag(HashAndFormat(hash, format));
+        let hash = *tag.hash();
+        let temp_outboard_path = if let Some(outboard) = outboard.as_ref() {
+            let uuid = new_uuid();
+            // we write the outboard to a temp file first, since while it is being written it is not complete.
+            // it is protected from deletion by the temp tag.
+            let temp_outboard_path = self.0.options.partial_outboard_path(hash, &uuid);
+            std::fs::write(&temp_outboard_path, outboard)?;
+            Some(temp_outboard_path)
+        } else {
+            None
+        };
+        // before here we did not touch the complete files at all.
+        // all writes here are protected by the temp tag
+        let complete_io_guard = self.0.complete_io_mutex.lock().unwrap();
+        // move the data file into place, or create a reference to it
+        let new = match file {
+            ImportFile::External(path) => CompleteEntry::new_external(size, path),
+            ImportFile::TempFile(temp_data_path) => {
                 let data_path = self.owned_data_path(&hash);
-                use baomap::Store;
-                // the blob must be pinned before we move the file, otherwise there is a race condition
-                // where it might be deleted here.
-                let tag = self.temp_tag(HashAndFormat(hash, BlobFormat::RAW));
-                std::fs::rename(path, data_path)?;
-                (tag, CompleteEntry::new_default(size), outboard)
+                std::fs::rename(temp_data_path, data_path)?;
+                CompleteEntry::new_default(size)
             }
         };
-        // all writes here are protected by the temp tag
-        let hash = *tag.hash();
-        if let Some(outboard) = outboard.as_ref() {
+        // move the outboard file into place if we have one
+        if let Some(temp_outboard_path) = temp_outboard_path {
             let outboard_path = self.owned_outboard_path(&hash);
-            std::fs::write(outboard_path, outboard)?;
+            std::fs::rename(temp_outboard_path, outboard_path)?;
         }
         let size = new.size;
         let mut state = self.0.state.write().unwrap();
@@ -996,30 +1019,6 @@ impl Store {
         Ok(tag)
     }
 
-    fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> io::Result<TempTag> {
-        let complete_io_guard = self.0.complete_io_mutex.lock().unwrap();
-        let (outboard, hash) = bao_tree::io::outboard(&data, IROH_BLOCK_SIZE);
-        let hash = hash.into();
-        use baomap::Store;
-        let tag = self.temp_tag(HashAndFormat(hash, format));
-        let data_path = self.owned_data_path(&hash);
-        std::fs::write(data_path, &data)?;
-        if outboard.len() > 8 {
-            let outboard_path = self.owned_outboard_path(&hash);
-            std::fs::write(outboard_path, &outboard)?;
-        }
-        let size = data.len() as u64;
-        let mut state = self.0.state.write().unwrap();
-        let entry = state.complete.entry(hash).or_default();
-        entry.union_with(CompleteEntry::new_default(size))?;
-        state.outboard.insert(hash, outboard.into());
-        if size < self.0.options.inline_threshold {
-            state.data.insert(hash, data.to_vec().into());
-        }
-        drop(complete_io_guard);
-        Ok(tag)
-    }
-
     fn delete_sync(&self, hash: Hash) -> io::Result<()> {
         let mut data = None;
         let mut outboard = None;
38 changes: 23 additions & 15 deletions iroh/src/node.rs
@@ -328,20 +328,21 @@
             downloader,
         );
 
+        let callbacks = Callbacks::default();
         let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy {
-            tracing::info!("Starting GC task with interval {}s", gc_period.as_secs());
+            tracing::info!("Starting GC task with interval {:?}", gc_period);
             let db = self.db.clone();
+            let callbacks = callbacks.clone();
             let task = rt
                 .local_pool()
-                .spawn_pinned(move || Self::gc_loop(db, ds, gc_period));
+                .spawn_pinned(move || Self::gc_loop(db, ds, gc_period, callbacks));
             Some(AbortingJoinHandle(task))
         } else {
             None
         };
         let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
         let rt2 = rt.clone();
         let rt3 = rt.clone();
-        let callbacks = Callbacks::default();
         let inner = Arc::new(NodeInner {
             db: self.db,
             endpoint: endpoint.clone(),
@@ -505,10 +506,15 @@
             .ok();
     }
 
-    async fn gc_loop(db: D, ds: S, gc_period: Duration) {
+    async fn gc_loop(db: D, ds: S, gc_period: Duration, callbacks: Callbacks) {
         tracing::debug!("GC loop starting {:?}", gc_period);
         'outer: loop {
+            // do delay before the two phases of GC
             tokio::time::sleep(gc_period).await;
+            tracing::debug!("Starting GC");
+            callbacks
+                .send(Event::Db(iroh_bytes::baomap::Event::GcStarted))
+                .await;
             db.clear_live();
             let doc_hashes = match ds.content_hashes() {
                 Ok(hashes) => hashes,
@@ -518,16 +524,13 @@
                 }
             };
             let mut doc_db_error = false;
-            let doc_hashes = doc_hashes.filter_map(|e| {
-                let hash = match e {
-                    Ok(e) => e,
-                    Err(err) => {
-                        tracing::error!("Error getting doc hash: {}", err);
-                        doc_db_error = true;
-                        return None;
-                    }
-                };
-                Some(hash)
+            let doc_hashes = doc_hashes.filter_map(|e| match e {
+                Ok(hash) => Some(hash),
+                Err(err) => {
+                    tracing::error!("Error getting doc hash: {}", err);
+                    doc_db_error = true;
+                    None
+                }
             });
             db.add_live(doc_hashes);
             if doc_db_error {
@@ -551,6 +554,7 @@
                     }
                 }
             }
+
             tracing::info!("Starting GC sweep phase");
             let mut stream = db.gc_sweep();
             while let Some(item) = stream.next().await {
@@ -567,6 +571,9 @@
                     }
                 }
             }
+            callbacks
+                .send(Event::Db(iroh_bytes::baomap::Event::GcCompleted))
+                .await;
         }
     }
 }
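
The start/completed callbacks are what make GC testable without sleeps: a test can subscribe and wait for a full mark-and-sweep cycle instead of guessing at timing. A sketch of that pattern, assuming a `Node::subscribe` method that registers the callback fed through `cb_sender` below (names and error handling are illustrative):

```rust
use std::sync::Arc;
use tokio::sync::Notify;

// inside an async test, with `node` already spawned:
let gc_done = Arc::new(Notify::new());
let gc_done2 = gc_done.clone();
node.subscribe(move |event| {
    let gc_done = gc_done2.clone();
    Box::pin(async move {
        if let Event::Db(iroh_bytes::baomap::Event::GcCompleted) = event {
            // notify_one stores a permit, so a cycle that completes
            // before we start waiting is not missed
            gc_done.notify_one();
        }
    })
})?;
// block until one full GC cycle has run
gc_done.notified().await;
```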
@@ -655,7 +662,6 @@ struct NodeInner<D, S: DocStore> {
     controller: FlumeConnection<ProviderResponse, ProviderRequest>,
     #[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
     cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<'static, ()> + Send + Sync + 'static>>,
-    #[allow(dead_code)]
     callbacks: Callbacks,
     #[allow(dead_code)]
     gc_task: Option<AbortingJoinHandle<()>>,
@@ -668,6 +674,8 @@
 pub enum Event {
     /// Events from the iroh-bytes transfer protocol.
     ByteProvide(iroh_bytes::provider::Event),
+    /// Events from database
+    Db(iroh_bytes::baomap::Event),
 }
 
 impl<D: ReadableStore, S: DocStore> Node<D, S> {