Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gc tests #1585

Merged
merged 12 commits into from
Oct 10, 2023
11 changes: 10 additions & 1 deletion iroh-bytes/src/baomap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ pub trait Store: ReadableStore + PartialMap {
///
/// Sweeping might take long, but it can safely be done in the background.
fn gc_sweep(&self) -> LocalBoxStream<'_, GcSweepEvent> {
let blobs = self.blobs();
let blobs = self.blobs().chain(self.partial_blobs());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an actual bug: I was not considering partial blobs for cleanup.

Gen::new(|co| async move {
let mut count = 0;
for hash in blobs {
Expand Down Expand Up @@ -607,3 +607,12 @@ pub enum ValidateProgress {
/// We got an error and need to abort.
Abort(RpcError),
}

/// Events emitted by the database.
///
/// The variants currently defined only report the start and the end of a
/// garbage collection (GC) run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
/// A garbage collection run was started.
GcStarted,
/// A garbage collection run was completed.
GcCompleted,
}
18 changes: 18 additions & 0 deletions iroh-bytes/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ impl From<String> for Tag {
}
}

impl From<&str> for Tag {
    /// Builds a tag from a string slice by copying its contents into an
    /// owned buffer that backs the tag's bytes.
    fn from(value: &str) -> Self {
        let owned = String::from(value);
        Self(Bytes::from(owned))
    }
}

impl Display for Tag {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let bytes = self.0.as_ref();
Expand Down Expand Up @@ -124,6 +130,18 @@ pub enum SetTagOption {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct HashAndFormat(pub Hash, pub BlobFormat);

/// Convenience constructors that pair a [`Hash`] with a fixed [`BlobFormat`].
impl HashAndFormat {
/// Create a new hash and format pair, using the default (raw) format,
/// i.e. [`BlobFormat::RAW`].
pub fn raw(hash: Hash) -> Self {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for convenience

Self(hash, BlobFormat::RAW)
}

/// Create a new hash and format pair, using the hash sequence format,
/// i.e. [`BlobFormat::HASHSEQ`].
pub fn hash_seq(hash: Hash) -> Self {
Self(hash, BlobFormat::HASHSEQ)
}
}

/// Hash type used throughout.
#[derive(PartialEq, Eq, Copy, Clone, Hash)]
pub struct Hash(blake3::Hash);
Expand Down
85 changes: 42 additions & 43 deletions iroh/src/baomap/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ use iroh_bytes::baomap::{
self, EntryStatus, ExportMode, ImportMode, ImportProgress, LivenessTracker, Map, MapEntry,
PartialMap, PartialMapEntry, ReadableStore, TempTag, ValidateProgress,
};
use iroh_bytes::util::progress::{IdGenerator, ProgressSender};
use iroh_bytes::util::progress::{IdGenerator, IgnoreProgressSender, ProgressSender};
use iroh_bytes::util::{BlobFormat, HashAndFormat, Tag};
use iroh_bytes::{Hash, IROH_BLOCK_SIZE};
use iroh_io::{AsyncSliceReader, AsyncSliceWriter, File};
Expand Down Expand Up @@ -895,42 +895,65 @@ impl Store {
Ok((tag, size))
}

/// Imports an in-memory blob into the store and returns a temp tag for it.
///
/// The bytes are first written to a fresh temp file, which is then handed to
/// `finalize_import_sync` as an `ImportFile::TempFile` together with `format`.
/// Progress reporting is discarded via `IgnoreProgressSender`.
///
/// # Errors
///
/// Returns any I/O error from writing the temp file or from
/// `finalize_import_sync`.
///
/// # Panics
///
/// The `unwrap` on the state write lock panics if acquiring the lock fails.
fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> io::Result<TempTag> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use finalize_import_sync from import_bytes_sync for DRY

let temp_data_path = self.temp_path();
std::fs::write(&temp_data_path, &data)?;
// progress is ignored below, so the id is just a fixed placeholder
let id = 0;
let file = ImportFile::TempFile(temp_data_path);
let progress = IgnoreProgressSender::default();
// the size returned by finalize_import_sync is not needed here
let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
// we already have the data in memory, so small blobs can be inserted into
// the in-memory data map right away, keyed by the hash of the temp tag
// NOTE(review): if `inline_threshold` is wider than `usize` (e.g. a u64), this
// `as usize` cast truncates on 32-bit targets — confirm the field type.
if data.len() < self.0.options.inline_threshold as usize {
let mut state = self.0.state.write().unwrap();
state.data.insert(*tag.hash(), data);
}
Ok(tag)
}

fn finalize_import_sync(
&self,
file: ImportFile,
format: BlobFormat,
id: u64,
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> io::Result<(TempTag, u64)> {
let complete_io_guard = self.0.complete_io_mutex.lock().unwrap();
let size = file.path().metadata()?.len();
progress.blocking_send(ImportProgress::Size { id, size })?;
let progress2 = progress.clone();
let (hash, outboard) = compute_outboard(file.path(), size, move |offset| {
Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
})?;
progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
let (tag, new, outboard) = match file {
ImportFile::External(path) => {
use baomap::Store;
let tag = self.temp_tag(HashAndFormat(hash, format));
(tag, CompleteEntry::new_external(size, path), outboard)
}
ImportFile::TempFile(path) => {
use baomap::Store;
// from here on, everything related to the hash is protected by the temp tag
let tag = self.temp_tag(HashAndFormat(hash, format));
let hash = *tag.hash();
let temp_outboard_path = if let Some(outboard) = outboard.as_ref() {
let uuid = new_uuid();
// we write the outboard to a temp file first, since while it is being written it is not complete.
// it is protected from deletion by the temp tag.
let temp_outboard_path = self.0.options.partial_outboard_path(hash, &uuid);
std::fs::write(&temp_outboard_path, outboard)?;
Some(temp_outboard_path)
} else {
None
};
// before here we did not touch the complete files at all.
// all writes here are protected by the temp tag
let complete_io_guard = self.0.complete_io_mutex.lock().unwrap();
// move the data file into place, or create a reference to it
let new = match file {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a bug before. You must pass through the format.

If somebody adds a hashseq using import_stream or import_reader, the resulting temp tag must still have the HashSeq format.

ImportFile::External(path) => CompleteEntry::new_external(size, path),
ImportFile::TempFile(temp_data_path) => {
let data_path = self.owned_data_path(&hash);
use baomap::Store;
// the blob must be pinned before we move the file, otherwise there is a race condition
// where it might be deleted here.
let tag = self.temp_tag(HashAndFormat(hash, BlobFormat::RAW));
std::fs::rename(path, data_path)?;
(tag, CompleteEntry::new_default(size), outboard)
std::fs::rename(temp_data_path, data_path)?;
CompleteEntry::new_default(size)
}
};
// all writes here are protected by the temp tag
let hash = *tag.hash();
if let Some(outboard) = outboard.as_ref() {
// move the outboard file into place if we have one
if let Some(temp_outboard_path) = temp_outboard_path {
let outboard_path = self.owned_outboard_path(&hash);
std::fs::write(outboard_path, outboard)?;
std::fs::rename(temp_outboard_path, outboard_path)?;
Copy link
Contributor Author

@rklaehn rklaehn Oct 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was a bug before. You must never write to the final outboard file. It can only come into existence by an atomic mv. It would have been really hard to observe this though...

}
let size = new.size;
let mut state = self.0.state.write().unwrap();
Expand Down Expand Up @@ -996,30 +1019,6 @@ impl Store {
Ok(tag)
}

/// Imports an in-memory blob into the store and returns a temp tag for it.
///
/// The outboard and hash are computed in memory, then the data (and the
/// outboard, when it is longer than 8 bytes) are written to their owned paths
/// and the in-memory state is updated. The complete-io mutex is held for the
/// whole operation.
///
/// # Errors
///
/// Returns any I/O error from writing the data or outboard file, and any
/// error from `union_with` on the complete entry.
///
/// # Panics
///
/// The `unwrap` calls panic if acquiring the complete-io mutex or the state
/// write lock fails.
fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> io::Result<TempTag> {
// held until the explicit drop at the end of the function
let complete_io_guard = self.0.complete_io_mutex.lock().unwrap();
let (outboard, hash) = bao_tree::io::outboard(&data, IROH_BLOCK_SIZE);
let hash = hash.into();
use baomap::Store;
// the temp tag is created before any file for this hash is written
let tag = self.temp_tag(HashAndFormat(hash, format));
let data_path = self.owned_data_path(&hash);
// NOTE(review): data and outboard are written straight to their final paths
// here; there is no temp-file + rename step in this function.
std::fs::write(data_path, &data)?;
// presumably an outboard of 8 bytes or fewer holds only a size prefix and
// does not need its own file — TODO confirm
if outboard.len() > 8 {
let outboard_path = self.owned_outboard_path(&hash);
std::fs::write(outboard_path, &outboard)?;
}
let size = data.len() as u64;
let mut state = self.0.state.write().unwrap();
// merge with an existing complete entry for this hash, if there is one
let entry = state.complete.entry(hash).or_default();
entry.union_with(CompleteEntry::new_default(size))?;
state.outboard.insert(hash, outboard.into());
// blobs below the inline threshold are additionally kept in memory;
// note that `to_vec` copies the bytes
if size < self.0.options.inline_threshold {
state.data.insert(hash, data.to_vec().into());
}
drop(complete_io_guard);
Ok(tag)
}

fn delete_sync(&self, hash: Hash) -> io::Result<()> {
let mut data = None;
let mut outboard = None;
Expand Down
38 changes: 23 additions & 15 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,20 +328,21 @@ where
downloader,
);

let callbacks = Callbacks::default();
let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy {
tracing::info!("Starting GC task with interval {}s", gc_period.as_secs());
tracing::info!("Starting GC task with interval {:?}", gc_period);
let db = self.db.clone();
let callbacks = callbacks.clone();
let task = rt
.local_pool()
.spawn_pinned(move || Self::gc_loop(db, ds, gc_period));
.spawn_pinned(move || Self::gc_loop(db, ds, gc_period, callbacks));
Some(AbortingJoinHandle(task))
} else {
None
};
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
let rt2 = rt.clone();
let rt3 = rt.clone();
let callbacks = Callbacks::default();
let inner = Arc::new(NodeInner {
db: self.db,
endpoint: endpoint.clone(),
Expand Down Expand Up @@ -505,10 +506,15 @@ where
.ok();
}

async fn gc_loop(db: D, ds: S, gc_period: Duration) {
async fn gc_loop(db: D, ds: S, gc_period: Duration, callbacks: Callbacks) {
tracing::debug!("GC loop starting {:?}", gc_period);
'outer: loop {
// do delay before the two phases of GC
tokio::time::sleep(gc_period).await;
tracing::debug!("Starting GC");
callbacks
.send(Event::Db(iroh_bytes::baomap::Event::GcStarted))
.await;
db.clear_live();
let doc_hashes = match ds.content_hashes() {
Ok(hashes) => hashes,
Expand All @@ -518,16 +524,13 @@ where
}
};
let mut doc_db_error = false;
let doc_hashes = doc_hashes.filter_map(|e| {
let hash = match e {
Ok(e) => e,
Err(err) => {
tracing::error!("Error getting doc hash: {}", err);
doc_db_error = true;
return None;
}
};
Some(hash)
let doc_hashes = doc_hashes.filter_map(|e| match e {
Ok(hash) => Some(hash),
Err(err) => {
tracing::error!("Error getting doc hash: {}", err);
doc_db_error = true;
None
}
});
db.add_live(doc_hashes);
if doc_db_error {
Expand All @@ -551,6 +554,7 @@ where
}
}
}

tracing::info!("Starting GC sweep phase");
let mut stream = db.gc_sweep();
while let Some(item) = stream.next().await {
Expand All @@ -567,6 +571,9 @@ where
}
}
}
callbacks
.send(Event::Db(iroh_bytes::baomap::Event::GcCompleted))
.await;
}
}
}
Expand Down Expand Up @@ -655,7 +662,6 @@ struct NodeInner<D, S: DocStore> {
controller: FlumeConnection<ProviderResponse, ProviderRequest>,
#[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<'static, ()> + Send + Sync + 'static>>,
#[allow(dead_code)]
callbacks: Callbacks,
#[allow(dead_code)]
gc_task: Option<AbortingJoinHandle<()>>,
Expand All @@ -668,6 +674,8 @@ struct NodeInner<D, S: DocStore> {
pub enum Event {
/// Events from the iroh-bytes transfer protocol.
ByteProvide(iroh_bytes::provider::Event),
/// Events from the database, i.e. the variants of `iroh_bytes::baomap::Event`.
Db(iroh_bytes::baomap::Event),
}

impl<D: ReadableStore, S: DocStore> Node<D, S> {
Expand Down
Loading
Loading