-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Gc tests #1585
Gc tests #1585
Changes from 8 commits
5a7c3bc
80b8225
6fbbab6
96666a2
31167bc
0562442
46e3230
cd36837
6a88c70
8138bf0
47358ca
0146754
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,12 @@ impl From<String> for Tag { | |
} | ||
} | ||
|
||
impl From<&str> for Tag { | ||
fn from(value: &str) -> Self { | ||
Self(Bytes::from(value.to_owned())) | ||
} | ||
} | ||
|
||
impl Display for Tag { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
let bytes = self.0.as_ref(); | ||
|
@@ -124,6 +130,18 @@ pub enum SetTagOption { | |
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] | ||
pub struct HashAndFormat(pub Hash, pub BlobFormat); | ||
|
||
impl HashAndFormat { | ||
/// Create a new hash and format pair, using the default (raw) format. | ||
pub fn raw(hash: Hash) -> Self { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just for convenience |
||
Self(hash, BlobFormat::RAW) | ||
} | ||
|
||
/// Create a new hash and format pair, using the collection format. | ||
pub fn hash_seq(hash: Hash) -> Self { | ||
Self(hash, BlobFormat::HASHSEQ) | ||
} | ||
} | ||
|
||
/// Hash type used throught. | ||
#[derive(PartialEq, Eq, Copy, Clone, Hash)] | ||
pub struct Hash(blake3::Hash); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -143,7 +143,7 @@ use iroh_bytes::baomap::{ | |
self, EntryStatus, ExportMode, ImportMode, ImportProgress, LivenessTracker, Map, MapEntry, | ||
PartialMap, PartialMapEntry, ReadableStore, TempTag, ValidateProgress, | ||
}; | ||
use iroh_bytes::util::progress::{IdGenerator, ProgressSender}; | ||
use iroh_bytes::util::progress::{IdGenerator, IgnoreProgressSender, ProgressSender}; | ||
use iroh_bytes::util::{BlobFormat, HashAndFormat, Tag}; | ||
use iroh_bytes::{Hash, IROH_BLOCK_SIZE}; | ||
use iroh_io::{AsyncSliceReader, AsyncSliceWriter, File}; | ||
|
@@ -895,42 +895,65 @@ impl Store { | |
Ok((tag, size)) | ||
} | ||
|
||
fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> io::Result<TempTag> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use finalize_import_sync from import_bytes_sync for DRY |
||
let temp_data_path = self.temp_path(); | ||
std::fs::write(&temp_data_path, &data)?; | ||
let id = 0; | ||
let file = ImportFile::TempFile(temp_data_path); | ||
let progress = IgnoreProgressSender::default(); | ||
let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?; | ||
// we have the data in memory, so we can just insert it right now | ||
if data.len() < self.0.options.inline_threshold as usize { | ||
let mut state = self.0.state.write().unwrap(); | ||
state.data.insert(*tag.hash(), data); | ||
} | ||
Ok(tag) | ||
} | ||
|
||
fn finalize_import_sync( | ||
&self, | ||
file: ImportFile, | ||
format: BlobFormat, | ||
id: u64, | ||
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator, | ||
) -> io::Result<(TempTag, u64)> { | ||
let complete_io_guard = self.0.complete_io_mutex.lock().unwrap(); | ||
let size = file.path().metadata()?.len(); | ||
progress.blocking_send(ImportProgress::Size { id, size })?; | ||
let progress2 = progress.clone(); | ||
let (hash, outboard) = compute_outboard(file.path(), size, move |offset| { | ||
Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?) | ||
})?; | ||
progress.blocking_send(ImportProgress::OutboardDone { id, hash })?; | ||
let (tag, new, outboard) = match file { | ||
ImportFile::External(path) => { | ||
use baomap::Store; | ||
let tag = self.temp_tag(HashAndFormat(hash, format)); | ||
(tag, CompleteEntry::new_external(size, path), outboard) | ||
} | ||
ImportFile::TempFile(path) => { | ||
use baomap::Store; | ||
// from here on, everything related to the hash is protected by the temp tag | ||
let tag = self.temp_tag(HashAndFormat(hash, format)); | ||
let hash = *tag.hash(); | ||
let temp_outboard_path = if let Some(outboard) = outboard.as_ref() { | ||
let uuid = new_uuid(); | ||
// we write the outboard to a temp file first, since while it is being written it is not complete. | ||
// it is protected from deletion by the temp tag. | ||
let temp_outboard_path = self.0.options.partial_outboard_path(hash, &uuid); | ||
std::fs::write(&temp_outboard_path, outboard)?; | ||
Some(temp_outboard_path) | ||
} else { | ||
None | ||
}; | ||
// before here we did not touch the complete files at all. | ||
// all writes here are protected by the temp tag | ||
let complete_io_guard = self.0.complete_io_mutex.lock().unwrap(); | ||
// move the data file into place, or create a reference to it | ||
let new = match file { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a bug before. You must pass through the format. if somebody adds a hashseq using import_stream or import_reader, the resulting temp tag must still be with format HashSeq. |
||
ImportFile::External(path) => CompleteEntry::new_external(size, path), | ||
ImportFile::TempFile(temp_data_path) => { | ||
let data_path = self.owned_data_path(&hash); | ||
use baomap::Store; | ||
// the blob must be pinned before we move the file, otherwise there is a race condition | ||
// where it might be deleted here. | ||
let tag = self.temp_tag(HashAndFormat(hash, BlobFormat::RAW)); | ||
std::fs::rename(path, data_path)?; | ||
(tag, CompleteEntry::new_default(size), outboard) | ||
std::fs::rename(temp_data_path, data_path)?; | ||
CompleteEntry::new_default(size) | ||
} | ||
}; | ||
// all writes here are protected by the temp tag | ||
let hash = *tag.hash(); | ||
if let Some(outboard) = outboard.as_ref() { | ||
// move the outboard file into place if we have one | ||
if let Some(temp_outboard_path) = temp_outboard_path { | ||
let outboard_path = self.owned_outboard_path(&hash); | ||
std::fs::write(outboard_path, outboard)?; | ||
std::fs::rename(temp_outboard_path, outboard_path)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this was a bug before. You must never write to the final outboard file. It can only come into existence by an atomic |
||
} | ||
let size = new.size; | ||
let mut state = self.0.state.write().unwrap(); | ||
|
@@ -996,30 +1019,6 @@ impl Store { | |
Ok(tag) | ||
} | ||
|
||
fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> io::Result<TempTag> { | ||
let complete_io_guard = self.0.complete_io_mutex.lock().unwrap(); | ||
let (outboard, hash) = bao_tree::io::outboard(&data, IROH_BLOCK_SIZE); | ||
let hash = hash.into(); | ||
use baomap::Store; | ||
let tag = self.temp_tag(HashAndFormat(hash, format)); | ||
let data_path = self.owned_data_path(&hash); | ||
std::fs::write(data_path, &data)?; | ||
if outboard.len() > 8 { | ||
let outboard_path = self.owned_outboard_path(&hash); | ||
std::fs::write(outboard_path, &outboard)?; | ||
} | ||
let size = data.len() as u64; | ||
let mut state = self.0.state.write().unwrap(); | ||
let entry = state.complete.entry(hash).or_default(); | ||
entry.union_with(CompleteEntry::new_default(size))?; | ||
state.outboard.insert(hash, outboard.into()); | ||
if size < self.0.options.inline_threshold { | ||
state.data.insert(hash, data.to_vec().into()); | ||
} | ||
drop(complete_io_guard); | ||
Ok(tag) | ||
} | ||
|
||
fn delete_sync(&self, hash: Hash) -> io::Result<()> { | ||
let mut data = None; | ||
let mut outboard = None; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was an actual bug. I was not considering partial blogs for cleanup.