diff --git a/iroh-bytes/src/baomap.rs b/iroh-bytes/src/baomap.rs index 84843e002d..5691a4ced5 100644 --- a/iroh-bytes/src/baomap.rs +++ b/iroh-bytes/src/baomap.rs @@ -251,7 +251,7 @@ pub trait Store: ReadableStore + PartialMap { /// /// Sweeping might take long, but it can safely be done in the background. fn gc_sweep(&self) -> LocalBoxStream<'_, GcSweepEvent> { - let blobs = self.blobs(); + let blobs = self.blobs().chain(self.partial_blobs()); Gen::new(|co| async move { let mut count = 0; for hash in blobs { @@ -607,3 +607,12 @@ pub enum ValidateProgress { /// We got an error and need to abort. Abort(RpcError), } + +/// Database events +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Event { + /// A GC was started + GcStarted, + /// A GC was completed + GcCompleted, +} diff --git a/iroh-bytes/src/util.rs b/iroh-bytes/src/util.rs index b97f9f8632..f1cecf4d2a 100644 --- a/iroh-bytes/src/util.rs +++ b/iroh-bytes/src/util.rs @@ -77,6 +77,12 @@ impl From for Tag { } } +impl From<&str> for Tag { + fn from(value: &str) -> Self { + Self(Bytes::from(value.to_owned())) + } +} + impl Display for Tag { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let bytes = self.0.as_ref(); @@ -124,6 +130,18 @@ pub enum SetTagOption { #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub struct HashAndFormat(pub Hash, pub BlobFormat); +impl HashAndFormat { + /// Create a new hash and format pair, using the default (raw) format. + pub fn raw(hash: Hash) -> Self { + Self(hash, BlobFormat::RAW) + } + + /// Create a new hash and format pair, using the collection format. + pub fn hash_seq(hash: Hash) -> Self { + Self(hash, BlobFormat::HASHSEQ) + } +} + /// Hash type used throughout. #[derive(PartialEq, Eq, Copy, Clone, Hash)] pub struct Hash(blake3::Hash); diff --git a/iroh/src/baomap/flat.rs b/iroh/src/baomap/flat.rs index ebf36116f4..1e5becbd52 100644 --- a/iroh/src/baomap/flat.rs +++ b/iroh/src/baomap/flat.rs @@ -143,7 +143,7 @@ use iroh_bytes::baomap::{ self, EntryStatus, ExportMode, ImportMode, ImportProgress, LivenessTracker, Map, MapEntry, PartialMap, PartialMapEntry, ReadableStore, TempTag, ValidateProgress, }; -use iroh_bytes::util::progress::{IdGenerator, ProgressSender}; +use iroh_bytes::util::progress::{IdGenerator, IgnoreProgressSender, ProgressSender}; use iroh_bytes::util::{BlobFormat, HashAndFormat, Tag}; use iroh_bytes::{Hash, IROH_BLOCK_SIZE}; use iroh_io::{AsyncSliceReader, AsyncSliceWriter, File}; @@ -895,6 +895,21 @@ impl Store { Ok((tag, size)) } + fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> io::Result { + let temp_data_path = self.temp_path(); + std::fs::write(&temp_data_path, &data)?; + let id = 0; + let file = ImportFile::TempFile(temp_data_path); + let progress = IgnoreProgressSender::default(); + let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?; + // we have the data in memory, so we can just insert it right now + if data.len() < self.0.options.inline_threshold as usize { + let mut state = self.0.state.write().unwrap(); + state.data.insert(*tag.hash(), data); + } + Ok(tag) + } + fn finalize_import_sync( &self, file: ImportFile, @@ -902,7 +917,6 @@ impl Store { id: u64, progress: impl ProgressSender + IdGenerator, ) -> io::Result<(TempTag, u64)> { - let complete_io_guard = self.0.complete_io_mutex.lock().unwrap(); let size = file.path().metadata()?.len(); progress.blocking_send(ImportProgress::Size { id, size })?; let progress2 = progress.clone(); @@ -910,27 +924,36 @@ impl Store { 
Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?) })?; progress.blocking_send(ImportProgress::OutboardDone { id, hash })?; - let (tag, new, outboard) = match file { - ImportFile::External(path) => { - use baomap::Store; - let tag = self.temp_tag(HashAndFormat(hash, format)); - (tag, CompleteEntry::new_external(size, path), outboard) - } - ImportFile::TempFile(path) => { + use baomap::Store; + // from here on, everything related to the hash is protected by the temp tag + let tag = self.temp_tag(HashAndFormat(hash, format)); + let hash = *tag.hash(); + let temp_outboard_path = if let Some(outboard) = outboard.as_ref() { + let uuid = new_uuid(); + // we write the outboard to a temp file first, since while it is being written it is not complete. + // it is protected from deletion by the temp tag. + let temp_outboard_path = self.0.options.partial_outboard_path(hash, &uuid); + std::fs::write(&temp_outboard_path, outboard)?; + Some(temp_outboard_path) + } else { + None + }; + // before here we did not touch the complete files at all. + // all writes here are protected by the temp tag + let complete_io_guard = self.0.complete_io_mutex.lock().unwrap(); + // move the data file into place, or create a reference to it + let new = match file { + ImportFile::External(path) => CompleteEntry::new_external(size, path), + ImportFile::TempFile(temp_data_path) => { let data_path = self.owned_data_path(&hash); - use baomap::Store; - // the blob must be pinned before we move the file, otherwise there is a race condition - // where it might be deleted here. - let tag = self.temp_tag(HashAndFormat(hash, BlobFormat::RAW)); - std::fs::rename(path, data_path)?; - (tag, CompleteEntry::new_default(size), outboard) + std::fs::rename(temp_data_path, data_path)?; + CompleteEntry::new_default(size) } }; - // all writes here are protected by the temp tag - let hash = *tag.hash(); - if let Some(outboard) = outboard.as_ref() { + // move the outboard file into place if we have one + if let Some(temp_outboard_path) = temp_outboard_path { let outboard_path = self.owned_outboard_path(&hash); - std::fs::write(outboard_path, outboard)?; + std::fs::rename(temp_outboard_path, outboard_path)?; } let size = new.size; let mut state = self.0.state.write().unwrap(); @@ -996,30 +1019,6 @@ impl Store { Ok(tag) } - fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> io::Result { - let complete_io_guard = self.0.complete_io_mutex.lock().unwrap(); - let (outboard, hash) = bao_tree::io::outboard(&data, IROH_BLOCK_SIZE); - let hash = hash.into(); - use baomap::Store; - let tag = self.temp_tag(HashAndFormat(hash, format)); - let data_path = self.owned_data_path(&hash); - std::fs::write(data_path, &data)?; - if outboard.len() > 8 { - let outboard_path = self.owned_outboard_path(&hash); - std::fs::write(outboard_path, &outboard)?; - } - let size = data.len() as u64; - let mut state = self.0.state.write().unwrap(); - let entry = state.complete.entry(hash).or_default(); - entry.union_with(CompleteEntry::new_default(size))?; - state.outboard.insert(hash, outboard.into()); - if size < self.0.options.inline_threshold { - state.data.insert(hash, data.to_vec().into()); - } - drop(complete_io_guard); - Ok(tag) - } - fn delete_sync(&self, hash: Hash) -> io::Result<()> { let mut data = None; let mut outboard = None; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 34208a1848..9d616f64c1 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -328,12 +328,14 @@ where downloader, ); + let callbacks = 
Callbacks::default(); let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy { - tracing::info!("Starting GC task with interval {}s", gc_period.as_secs()); + tracing::info!("Starting GC task with interval {:?}", gc_period); let db = self.db.clone(); + let callbacks = callbacks.clone(); let task = rt .local_pool() - .spawn_pinned(move || Self::gc_loop(db, ds, gc_period)); + .spawn_pinned(move || Self::gc_loop(db, ds, gc_period, callbacks)); Some(AbortingJoinHandle(task)) } else { None @@ -341,7 +343,6 @@ where let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); let rt2 = rt.clone(); let rt3 = rt.clone(); - let callbacks = Callbacks::default(); let inner = Arc::new(NodeInner { db: self.db, endpoint: endpoint.clone(), @@ -505,10 +506,15 @@ where .ok(); } - async fn gc_loop(db: D, ds: S, gc_period: Duration) { + async fn gc_loop(db: D, ds: S, gc_period: Duration, callbacks: Callbacks) { + tracing::debug!("GC loop starting {:?}", gc_period); 'outer: loop { // do delay before the two phases of GC tokio::time::sleep(gc_period).await; + tracing::debug!("Starting GC"); + callbacks + .send(Event::Db(iroh_bytes::baomap::Event::GcStarted)) + .await; db.clear_live(); let doc_hashes = match ds.content_hashes() { Ok(hashes) => hashes, @@ -518,16 +524,13 @@ where } }; let mut doc_db_error = false; - let doc_hashes = doc_hashes.filter_map(|e| { - let hash = match e { - Ok(e) => e, - Err(err) => { - tracing::error!("Error getting doc hash: {}", err); - doc_db_error = true; - return None; - } - }; - Some(hash) + let doc_hashes = doc_hashes.filter_map(|e| match e { + Ok(hash) => Some(hash), + Err(err) => { + tracing::error!("Error getting doc hash: {}", err); + doc_db_error = true; + None + } }); db.add_live(doc_hashes); if doc_db_error { @@ -551,6 +554,7 @@ where } } } + tracing::info!("Starting GC sweep phase"); let mut stream = db.gc_sweep(); while let Some(item) = stream.next().await { @@ -567,6 +571,9 @@ where } } } + callbacks + .send(Event::Db(iroh_bytes::baomap::Event::GcCompleted)) + .await; } } } @@ -655,7 +662,6 @@ struct NodeInner { controller: FlumeConnection, #[debug("callbacks: Sender>")] cb_sender: mpsc::Sender BoxFuture<'static, ()> + Send + Sync + 'static>>, - #[allow(dead_code)] callbacks: Callbacks, #[allow(dead_code)] gc_task: Option>, @@ -668,6 +674,8 @@ struct NodeInner { pub enum Event { /// Events from the iroh-bytes transfer protocol. ByteProvide(iroh_bytes::provider::Event), + /// Events from database + Db(iroh_bytes::baomap::Event), } impl Node { diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs new file mode 100644 index 0000000000..a2c8fc1c87 --- /dev/null +++ b/iroh/tests/gc.rs @@ -0,0 +1,474 @@ +#![cfg(feature = "mem-db")] +use std::{ + io::{self, Cursor}, + path::PathBuf, + time::Duration, +}; + +use anyhow::Result; +use bao_tree::{ + blake3, + io::{ + fsm::{BaoContentItem, Outboard, ResponseDecoderReadingNext}, + Leaf, Parent, + }, + ChunkRanges, +}; +use bytes::Bytes; +use futures::FutureExt; +use iroh::{baomap, node::Node}; +use iroh_io::AsyncSliceWriter; +use rand::RngCore; +use testdir::testdir; + +use iroh_bytes::{ + baomap::{EntryStatus, Map, PartialMap, PartialMapEntry, Store, TempTag}, + hashseq::HashSeq, + util::{runtime, BlobFormat, HashAndFormat, Tag}, + IROH_BLOCK_SIZE, +}; + +/// Pick up the tokio runtime from the thread local and add a +/// thread per core runtime. 
+fn test_runtime() -> runtime::Handle { + runtime::Handle::from_current(1).unwrap() +} + +fn create_test_data(n: usize) -> Bytes { + let mut rng = rand::thread_rng(); + let mut data = vec![0; n]; + rng.fill_bytes(&mut data); + data.into() +} + +/// Wrap a bao store in a node that has gc enabled. +async fn wrap_in_node( + bao_store: S, + rt: iroh_bytes::util::runtime::Handle, +) -> Node +where + S: iroh_bytes::baomap::Store, +{ + let doc_store = iroh_sync::store::memory::Store::default(); + Node::builder(bao_store, doc_store) + .runtime(&rt) + .gc_policy(iroh::node::GcPolicy::Interval(Duration::from_millis(50))) + .spawn() + .await + .unwrap() +} + +async fn attach_db_events( + node: &Node, +) -> flume::Receiver { + let (db_send, db_recv) = flume::unbounded(); + node.subscribe(move |ev| { + let db_send = db_send.clone(); + async move { + if let iroh::node::Event::Db(ev) = ev { + db_send.into_send_async(ev).await.ok(); + } + } + .boxed() + }) + .await + .unwrap(); + db_recv +} + +async fn gc_test_node() -> ( + Node, + baomap::mem::Store, + flume::Receiver, +) { + let rt = test_runtime(); + let bao_store = baomap::mem::Store::new(rt.clone()); + let node = wrap_in_node(bao_store.clone(), rt).await; + let db_recv = attach_db_events(&node).await; + (node, bao_store, db_recv) +} + +async fn step(evs: &flume::Receiver) { + while let Ok(ev) = evs.recv_async().await { + if let iroh_bytes::baomap::Event::GcCompleted = ev { + break; + } + } + while let Ok(ev) = evs.recv_async().await { + if let iroh_bytes::baomap::Event::GcCompleted = ev { + break; + } + } +} + +/// Test the absolute basics of gc, temp tags and tags for blobs. +#[tokio::test] +async fn gc_basics() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let (node, bao_store, evs) = gc_test_node().await; + let data1 = create_test_data(1234); + let tt1 = bao_store.import_bytes(data1, BlobFormat::RAW).await?; + let data2 = create_test_data(5678); + let tt2 = bao_store.import_bytes(data2, BlobFormat::RAW).await?; + let h1 = *tt1.hash(); + let h2 = *tt2.hash(); + // temp tags are still there, so the entries should be there + step(&evs).await; + assert_eq!(bao_store.contains(&h1), EntryStatus::Complete); + assert_eq!(bao_store.contains(&h2), EntryStatus::Complete); + + // drop the first tag, the entry should be gone after some time + drop(tt1); + step(&evs).await; + assert_eq!(bao_store.contains(&h1), EntryStatus::NotFound); + assert_eq!(bao_store.contains(&h2), EntryStatus::Complete); + + // create an explicit tag for h1 (as raw) and then delete the temp tag. Entry should still be there. + let tag = Tag::from("test"); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(h2))) + .await?; + drop(tt2); + step(&evs).await; + assert_eq!(bao_store.contains(&h2), EntryStatus::Complete); + + // delete the explicit tag, entry should be gone + bao_store.set_tag(tag, None).await?; + step(&evs).await; + assert_eq!(bao_store.contains(&h2), EntryStatus::NotFound); + + node.shutdown(); + node.await?; + Ok(()) +} + +/// Test gc for sequences of hashes that protect their children from deletion. 
+#[tokio::test] +async fn gc_hashseq() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let (node, bao_store, evs) = gc_test_node().await; + let data1 = create_test_data(1234); + let tt1 = bao_store.import_bytes(data1, BlobFormat::RAW).await?; + let data2 = create_test_data(5678); + let tt2 = bao_store.import_bytes(data2, BlobFormat::RAW).await?; + let seq = vec![*tt1.hash(), *tt2.hash()] + .into_iter() + .collect::(); + let ttr = bao_store + .import_bytes(seq.into_inner(), BlobFormat::HASHSEQ) + .await?; + let h1 = *tt1.hash(); + let h2 = *tt2.hash(); + let hr = *ttr.hash(); + drop(tt1); + drop(tt2); + + // there is a temp tag for the link seq, so it and its entries should be there + step(&evs).await; + assert_eq!(bao_store.contains(&h1), EntryStatus::Complete); + assert_eq!(bao_store.contains(&h2), EntryStatus::Complete); + assert_eq!(bao_store.contains(&hr), EntryStatus::Complete); + + // make a permanent tag for the link seq, then delete the temp tag. Entries should still be there. + let tag = Tag::from("test"); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(hr))) + .await?; + drop(ttr); + step(&evs).await; + assert_eq!(bao_store.contains(&h1), EntryStatus::Complete); + assert_eq!(bao_store.contains(&h2), EntryStatus::Complete); + assert_eq!(bao_store.contains(&hr), EntryStatus::Complete); + + // change the permanent tag to be just for the linkseq itself as a blob. Only the linkseq should be there, not the entries. + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .await?; + step(&evs).await; + assert_eq!(bao_store.contains(&h1), EntryStatus::NotFound); + assert_eq!(bao_store.contains(&h2), EntryStatus::NotFound); + assert_eq!(bao_store.contains(&hr), EntryStatus::Complete); + + // delete the permanent tag, everything should be gone + bao_store.set_tag(tag, None).await?; + step(&evs).await; + assert_eq!(bao_store.contains(&h1), EntryStatus::NotFound); + assert_eq!(bao_store.contains(&h2), EntryStatus::NotFound); + assert_eq!(bao_store.contains(&hr), EntryStatus::NotFound); + + node.shutdown(); + node.await?; + Ok(()) +} + +fn path(root: PathBuf, suffix: &'static str) -> impl Fn(&iroh_bytes::Hash) -> PathBuf { + move |hash| root.join(format!("{}.{}", hash.to_hex(), suffix)) +} + +fn data_path(root: PathBuf) -> impl Fn(&iroh_bytes::Hash) -> PathBuf { + path(root, "data") +} + +fn outboard_path(root: PathBuf) -> impl Fn(&iroh_bytes::Hash) -> PathBuf { + path(root, "obao4") +} + +/// count the number of partial files for a hash. partial files are -. +fn count_partial( + root: PathBuf, + suffix: &'static str, +) -> impl Fn(&iroh_bytes::Hash) -> std::io::Result { + move |hash| { + let valid_names = std::fs::read_dir(&root)? + .filter_map(|e| e.ok()) + .filter_map(|e| { + if e.metadata().ok()?.is_file() { + e.file_name().into_string().ok() + } else { + None + } + }); + let prefix = format!("{}-", hash.to_hex()); + Ok(valid_names + .filter(|x| x.starts_with(&prefix) && x.ends_with(suffix)) + .count()) + } +} + +/// count the number of partial data files for a hash +fn count_partial_data(root: PathBuf) -> impl Fn(&iroh_bytes::Hash) -> std::io::Result { + count_partial(root, "data") +} + +/// count the number of partial outboard files for a hash +fn count_partial_outboard(root: PathBuf) -> impl Fn(&iroh_bytes::Hash) -> std::io::Result { + count_partial(root, "obao4") +} + +/// Test gc for sequences of hashes that protect their children from deletion. 
+#[tokio::test] +async fn gc_flat_basics() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let rt = test_runtime(); + let dir = testdir!(); + let path = data_path(dir.clone()); + let outboard_path = outboard_path(dir.clone()); + + let bao_store = baomap::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt).await?; + let node = wrap_in_node(bao_store.clone(), rt).await; + let evs = attach_db_events(&node).await; + let data1 = create_test_data(123456); + let tt1 = bao_store + .import_bytes(data1.clone(), BlobFormat::RAW) + .await?; + let data2 = create_test_data(567890); + let tt2 = bao_store + .import_bytes(data2.clone(), BlobFormat::RAW) + .await?; + let seq = vec![*tt1.hash(), *tt2.hash()] + .into_iter() + .collect::(); + let ttr = bao_store + .import_bytes(seq.into_inner(), BlobFormat::HASHSEQ) + .await?; + + let h1 = *tt1.hash(); + let h2 = *tt2.hash(); + let hr = *ttr.hash(); + + step(&evs).await; + assert!(path(&h1).exists()); + assert!(outboard_path(&h1).exists()); + assert!(path(&h2).exists()); + assert!(outboard_path(&h2).exists()); + assert!(path(&hr).exists()); + // hr is too small to have an outboard file + + drop(tt1); + drop(tt2); + let tag = Tag::from("test"); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash()))) + .await?; + drop(ttr); + + step(&evs).await; + assert!(path(&h1).exists()); + assert!(outboard_path(&h1).exists()); + assert!(path(&h2).exists()); + assert!(outboard_path(&h2).exists()); + assert!(path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .await?; + step(&evs).await; + assert!(!path(&h1).exists()); + assert!(!outboard_path(&h1).exists()); + assert!(!path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + assert!(path(&hr).exists()); + + bao_store.set_tag(tag, None).await?; + step(&evs).await; + assert!(!path(&hr).exists()); + + node.shutdown(); + node.await?; + Ok(()) +} + +/// Take some data and encode it +#[allow(dead_code)] +fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor) { + let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE); + let mut encoded = Vec::new(); + bao_tree::io::sync::encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded) + .unwrap(); + let hash = outboard.root(); + (hash, Cursor::new(encoded.into())) +} + +/// Add a file to the store in the same way a download works. +/// +/// we know the hash in advance, create a partial entry, write the data to it and +/// the outboard file, then commit it to a complete entry. +/// +/// During this time, the partial entry is protected by a temp tag. +#[allow(dead_code)] +async fn simulate_download_protected( + bao_store: &S, + data: Bytes, +) -> io::Result { + use bao_tree::io::fsm::OutboardMut; + // simulate the remote side. + let (hash, response) = simulate_remote(data.as_ref()); + // simulate the local side. + // we got a hash and a response from the remote side. 
+ let tt = bao_store.temp_tag(HashAndFormat::raw(hash.into())); + // start reading the response + let at_start = bao_tree::io::fsm::ResponseDecoderStart::new( + hash, + ChunkRanges::all(), + IROH_BLOCK_SIZE, + response, + ); + // get the size + let (mut reading, size) = at_start.next().await?; + // create the partial entry + let entry = bao_store.get_or_create_partial(hash.into(), size)?; + // create the + let mut ow = None; + let mut dw = entry.data_writer().await?; + while let ResponseDecoderReadingNext::More((next, res)) = reading.next().await { + match res? { + BaoContentItem::Parent(Parent { node, pair }) => { + // convoluted crap to create the outboard writer lazily, only if needed + let ow = if let Some(ow) = ow.as_mut() { + ow + } else { + let t = entry.outboard_mut().await?; + ow = Some(t); + ow.as_mut().unwrap() + }; + ow.save(node, &pair).await?; + } + BaoContentItem::Leaf(Leaf { offset, data }) => { + dw.write_bytes_at(offset.0, data).await?; + } + } + reading = next; + } + // commit the entry + bao_store.insert_complete(entry).await?; + Ok(tt) +} + +/// Test that partial files are deleted. +#[tokio::test] +async fn gc_flat_partial() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let rt = test_runtime(); + let dir = testdir!(); + let count_partial_data = count_partial_data(dir.clone()); + let count_partial_outboard = count_partial_outboard(dir.clone()); + + let bao_store = baomap::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt).await?; + let node = wrap_in_node(bao_store.clone(), rt).await; + let evs = attach_db_events(&node).await; + + let data1: Bytes = create_test_data(123456); + let (_o1, h1) = bao_tree::io::outboard(&data1, IROH_BLOCK_SIZE); + let h1 = h1.into(); + let tt1 = bao_store.temp_tag(HashAndFormat::raw(h1)); + { + let entry = bao_store.get_or_create_partial(h1, data1.len() as u64)?; + let mut dw = entry.data_writer().await?; + dw.write_bytes_at(0, data1.slice(..32 * 1024)).await?; + let _ow = entry.outboard_mut().await?; + } + + // partial data and outboard files should be there + step(&evs).await; + assert!(count_partial_data(&h1)? == 1); + assert!(count_partial_outboard(&h1)? == 1); + + drop(tt1); + // partial data and outboard files should be gone + step(&evs).await; + assert!(count_partial_data(&h1)? == 0); + assert!(count_partial_outboard(&h1)? == 0); + + node.shutdown(); + node.await?; + Ok(()) +} + +/// +#[tokio::test] +#[cfg(not(debug_assertions))] +async fn gc_flat_stress() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let rt = test_runtime(); + let dir = testdir!(); + let count_partial_data = count_partial_data(dir.clone()); + let count_partial_outboard = count_partial_outboard(dir.clone()); + + let bao_store = baomap::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt).await?; + let node = wrap_in_node(bao_store.clone(), rt).await; + + let mut deleted = Vec::new(); + let mut live = Vec::new(); + // download + for i in 0..10000 { + let data: Bytes = create_test_data(16 * 1024 * 3 + 1); + let tt = simulate_download_protected(&bao_store, data).await.unwrap(); + if i % 100 == 0 { + let tag = Tag::from(format!("test{}", i)); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash()))) + .await?; + live.push(*tt.hash()); + } else { + deleted.push(*tt.hash()); + } + } + step().await; + + for h in deleted.iter() { + assert!(count_partial_data(h)? == 0); + assert!(count_partial_outboard(h)? 
== 0); + assert_eq!(bao_store.contains(h), EntryStatus::NotFound); + } + + for h in live.iter() { + assert!(count_partial_data(h)? == 0); + assert!(count_partial_outboard(h)? == 0); + assert_eq!(bao_store.contains(h), EntryStatus::Complete); + } + + node.shutdown(); + node.await?; + Ok(()) +} diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index 1c631a91e4..416cbb6092 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -778,14 +778,12 @@ async fn test_token_passthrough() -> Result<()> { node.subscribe(move |event| { let events_sender = events_sender.clone(); async move { - match event { - Event::ByteProvide(bp_msg) => { - if let iroh_bytes::provider::Event::GetRequestReceived { token: tok, .. } = - bp_msg - { - events_sender.send(tok).expect("receiver dropped"); - } - } + if let Event::ByteProvide(iroh_bytes::provider::Event::GetRequestReceived { + token: tok, + .. + }) = event + { + events_sender.send(tok).expect("receiver dropped"); } } .boxed()
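
For reference, a minimal usage sketch of the tag APIs this change builds on. The helper name `pin_blob` and the tag name "my-pin" are made up for illustration; the store calls themselves (`import_bytes`, `set_tag`, `HashAndFormat::raw`, `Tag::from`) are the ones exercised in iroh/tests/gc.rs above.

use iroh_bytes::{
    baomap::Store,
    util::{BlobFormat, HashAndFormat, Tag},
};

/// Import a blob and protect it from GC with a named tag (hypothetical helper).
async fn pin_blob<S: Store>(store: &S, data: bytes::Bytes) -> anyhow::Result<()> {
    // import_bytes returns a TempTag; the blob is protected from GC while it is alive
    let tt = store.import_bytes(data, BlobFormat::RAW).await?;
    // make the protection persistent with a named tag (Tag: From<&str> is added in this diff)
    store
        .set_tag(Tag::from("my-pin"), Some(HashAndFormat::raw(*tt.hash())))
        .await?;
    // the TempTag can now be dropped; the named tag keeps the blob live across GC sweeps
    drop(tt);
    Ok(())
}

Dropping both the temp tag and clearing the named tag (set_tag with None) makes the blob eligible for the next sweep, which is exactly what gc_basics asserts.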
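
The tests synchronize on the new Event::Db notifications instead of sleeping. A sketch of the same pattern outside the test suite follows; the helper name and the generic bounds on Node are assumptions, mirroring the attach_db_events/step helpers in iroh/tests/gc.rs.

use futures::FutureExt;
use iroh::node::Node;

/// Block until the node finishes its next GC pass (hypothetical helper; the generic
/// bounds are assumed to match what the Node builder already requires).
async fn await_gc_completed<D, S>(node: &Node<D, S>) -> anyhow::Result<()>
where
    D: iroh_bytes::baomap::Store,
    S: iroh_sync::store::Store,
{
    let (tx, rx) = flume::unbounded();
    node.subscribe(move |ev| {
        let tx = tx.clone();
        async move {
            // Event::Db wraps the new iroh_bytes::baomap::Event added in this diff
            if let iroh::node::Event::Db(iroh_bytes::baomap::Event::GcCompleted) = ev {
                tx.into_send_async(()).await.ok();
            }
        }
        .boxed()
    })
    .await?;
    rx.recv_async().await?;
    Ok(())
}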