From fdccf3b78affe5f841479d47d6c1641c10be18c4 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 18 Jan 2024 17:52:28 +0200 Subject: [PATCH] Handle tags also in redb --- iroh-base/src/hash.rs | 45 +++++++++++++ iroh-bytes/src/store/flat.rs | 124 ++++++++++++++++------------------- iroh-bytes/src/store/mem.rs | 5 +- iroh-bytes/src/util.rs | 46 ++++++++++++- 4 files changed, 149 insertions(+), 71 deletions(-) diff --git a/iroh-base/src/hash.rs b/iroh-base/src/hash.rs index 9c6fe8f006..41b8addbc8 100644 --- a/iroh-base/src/hash.rs +++ b/iroh-base/src/hash.rs @@ -251,6 +251,51 @@ pub struct HashAndFormat { pub format: BlobFormat, } +#[cfg(feature = "redb")] +mod redb_support { + use super::{BlobFormat, Hash, HashAndFormat}; + use redb::RedbValue; + + impl RedbValue for HashAndFormat { + type SelfType<'a> = Self; + + type AsBytes<'a> = [u8; 33]; + + fn fixed_width() -> Option { + Some(33) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let t: &'a [u8; 33] = data.try_into().unwrap(); + let format = match t[0] { + 0 => BlobFormat::Raw, + 1 => BlobFormat::HashSeq, + _ => panic!("invalid format"), + }; + let hash = Hash::from_bytes(t[1..].try_into().unwrap()); + Self { hash, format } + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + let mut res = [0u8; 33]; + res[0] = u64::from(value.format) as u8; + res[1..].copy_from_slice(value.hash.as_bytes()); + res + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("iroh_base::HashAndFormat") + } + } +} + impl HashAndFormat { /// Create a new hash and format pair, using the default (raw) format. pub fn raw(hash: Hash) -> Self { diff --git a/iroh-bytes/src/store/flat.rs b/iroh-bytes/src/store/flat.rs index a5faeb8c73..3c4e2751da 100644 --- a/iroh-bytes/src/store/flat.rs +++ b/iroh-bytes/src/store/flat.rs @@ -483,7 +483,6 @@ impl Options { struct Inner { options: Options, state: RwLock, - tags: RwLock>, // mutex for async access to complete files // // complete files are never written to. They come into existence when a partial @@ -493,22 +492,16 @@ struct Inner { } /// Table: Partial Index -/// Key: [u8; 32] # Hash -/// Value: -/// - u64 - size -/// - [u8; 16] - UUID const PARTIAL_TABLE: TableDefinition = TableDefinition::new("partial-index-0"); /// Table: Full Index -/// Key: [u8; 32] # Hash -/// Value: -/// - u64 - size -/// - bool - owned -/// - &[u8] - External Paths (might be empty) const COMPLETE_TABLE: TableDefinition = TableDefinition::new("complete-index-0"); +/// Table: Tags +const TAGS_TABLE: TableDefinition = TableDefinition::new("tags-0"); + /// Flat file database implementation. /// /// This @@ -705,10 +698,15 @@ impl ReadableStore for Store { } fn tags(&self) -> Box + Send + Sync + 'static> { - let inner = self.0.tags.read().unwrap(); - let items = inner + let inner = self.0.db.begin_read().unwrap(); + let tags_table = inner.open_table(TAGS_TABLE).unwrap(); + let items = tags_table .iter() - .map(|(k, v)| (k.clone(), *v)) + .unwrap() + .map(|x| { + let (k, v) = x.unwrap(); + (k.value(), v.value()) + }) .collect::>(); Box::new(items.into_iter()) } @@ -814,7 +812,7 @@ impl super::Store for Store { fn create_tag(&self, value: HashAndFormat) -> BoxFuture<'_, io::Result> { let this = self.clone(); - tokio::task::spawn_blocking(move || this.create_tag_sync(value)) + tokio::task::spawn_blocking(move || this.create_tag_sync(value).map_err(to_io_err)) .map(flatten_to_io) .boxed() } @@ -1018,48 +1016,33 @@ impl Store { fn set_tag_sync(&self, name: Tag, value: Option) -> io::Result<()> { tracing::debug!("set_tag {} {:?}", name, value); - let mut tags = self.0.tags.write().unwrap(); - let mut new_tags = tags.clone(); - let changed = if let Some(value) = value { - if let Some(old_value) = new_tags.insert(name, value) { - value != old_value + let txn = self.0.db.begin_write().map_err(to_io_err)?; + { + let mut tags = txn.open_table(TAGS_TABLE).map_err(to_io_err)?; + if let Some(target) = value { + tags.insert(name, target) } else { - true + tags.remove(name) } - } else { - new_tags.remove(&name).is_some() - }; - if changed { - let serialized = postcard::to_stdvec(&new_tags).unwrap(); - let temp_path = self - .0 - .options - .meta_path - .join(format!("tags-{}.meta", hex::encode(new_uuid()))); - let final_path = self.0.options.meta_path.join("tags.meta"); - write_atomic(&temp_path, &final_path, &serialized)?; - *tags = new_tags; + .map_err(to_io_err)?; } - drop(tags); + txn.commit().map_err(to_io_err)?; Ok(()) } - fn create_tag_sync(&self, value: HashAndFormat) -> io::Result { + fn create_tag_sync(&self, value: HashAndFormat) -> std::result::Result { tracing::debug!("create_tag {:?}", value); - let mut tags = self.0.tags.write().unwrap(); - let mut new_tags = tags.clone(); - let tag = Tag::auto(SystemTime::now(), |x| new_tags.contains_key(x)); - new_tags.insert(tag.clone(), value); - let serialized = postcard::to_stdvec(&new_tags).unwrap(); - let temp_path = self - .0 - .options - .meta_path - .join(format!("tags-{}.meta", hex::encode(new_uuid()))); - let final_path = self.0.options.meta_path.join("tags.meta"); - write_atomic(&temp_path, &final_path, &serialized)?; - *tags = new_tags; - drop(tags); + let txn = self.0.db.begin_write().map_err(to_io_err)?; + let tag = { + let mut tags = txn.open_table(TAGS_TABLE).map_err(to_io_err)?; + let tag = Tag::auto(SystemTime::now(), |t| { + tags.get(Tag(Bytes::copy_from_slice(t))) + .map(|x| x.is_some()) + })?; + tags.insert(&tag, value)?; + tag + }; + txn.commit()?; Ok(tag) } @@ -1156,7 +1139,7 @@ impl Store { std::fs::rename(temp_data_path, data_path)?; if temp_outboard_path.exists() { let outboard_path = self.0.options.owned_outboard_path(&hash); - std::fs::rename(temp_outboard_path, &outboard_path)?; + std::fs::rename(temp_outboard_path, outboard_path)?; } let write_tx = self.0.db.begin_write().map_err(to_io_err)?; { @@ -1312,27 +1295,21 @@ impl Store { std::fs::create_dir_all(&meta_path)?; let db = Database::create(db_path)?; + // create tables if they don't exist let write_tx = db.begin_write()?; { let _table = write_tx.open_table(PARTIAL_TABLE)?; let _table = write_tx.open_table(COMPLETE_TABLE)?; + let _table = write_tx.open_table(TAGS_TABLE)?; } write_tx.commit()?; - let tags_path = meta_path.join("tags.meta"); - let mut tags = BTreeMap::new(); - if tags_path.exists() { - let data = std::fs::read(tags_path)?; - tags = postcard::from_bytes(&data)?; - tracing::debug!("loaded tags. {} entries", tags.len()); - }; let res = Self(Arc::new(Inner { state: RwLock::new(State { data: Default::default(), live: Default::default(), temp: Default::default(), }), - tags: RwLock::new(tags), options: Options { complete_path, partial_path, @@ -1356,6 +1333,15 @@ impl Store { let complete_path = &self.0.options.complete_path; let partial_path = &self.0.options.partial_path; + let meta_path = &self.0.options.meta_path; + let tags_path = meta_path.join("tags.meta"); + let mut tags = BTreeMap::::new(); + if tags_path.exists() { + let data = std::fs::read(tags_path)?; + tags = postcard::from_bytes(&data)?; + tracing::debug!("loaded tags. {} entries", tags.len()); + }; + tracing::info!("migration from v1 to v2"); tracing::info!("complete_path: {}", complete_path.display()); tracing::info!("partial_path: {}", partial_path.display()); @@ -1589,16 +1575,20 @@ impl Store { tracing::info!("partial {}", hash); } let txn = self.0.db.begin_write()?; - let mut complete_table = txn.open_table(COMPLETE_TABLE)?; - let mut partial_table = txn.open_table(PARTIAL_TABLE)?; - for (hash, entry) in complete { - complete_table.insert(hash, entry)?; - } - for (hash, entry) in partial { - partial_table.insert(hash, entry)?; + { + let mut complete_table = txn.open_table(COMPLETE_TABLE)?; + let mut partial_table = txn.open_table(PARTIAL_TABLE)?; + let mut tags_table = txn.open_table(TAGS_TABLE)?; + for (hash, entry) in complete { + complete_table.insert(hash, entry)?; + } + for (hash, entry) in partial { + partial_table.insert(hash, entry)?; + } + for (tag, target) in tags { + tags_table.insert(tag, target)?; + } } - drop(complete_table); - drop(partial_table); txn.commit()?; Ok(()) diff --git a/iroh-bytes/src/store/mem.rs b/iroh-bytes/src/store/mem.rs index d595a392aa..175902e2ba 100644 --- a/iroh-bytes/src/store/mem.rs +++ b/iroh-bytes/src/store/mem.rs @@ -526,7 +526,10 @@ impl super::Store for Store { fn create_tag(&self, hash: HashAndFormat) -> BoxFuture<'_, io::Result> { let mut state = self.0.state.write().unwrap(); - let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x)); + let tag = Tag::auto(SystemTime::now(), |x| { + io::Result::Ok(state.tags.contains_key(x)) + }) + .unwrap(); state.tags.insert(tag.clone(), hash); futures::future::ok(tag).boxed() } diff --git a/iroh-bytes/src/util.rs b/iroh-bytes/src/util.rs index b44796607a..b93b3a52c4 100644 --- a/iroh-bytes/src/util.rs +++ b/iroh-bytes/src/util.rs @@ -15,6 +15,43 @@ pub mod progress; #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, From, Into)] pub struct Tag(pub Bytes); +#[cfg(feature = "redb")] +impl redb::RedbValue for Tag { + type SelfType<'a> = Self; + + type AsBytes<'a> = &'a [u8]; + + fn fixed_width() -> Option { + None + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + Self(Bytes::copy_from_slice(data)) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + value.0.as_ref() + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("iroh_base::Tag") + } +} + +#[cfg(feature = "redb")] +impl redb::RedbKey for Tag { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + data1.cmp(data2) + } +} + impl Borrow<[u8]> for Tag { fn borrow(&self) -> &[u8] { self.0.as_ref() @@ -59,7 +96,10 @@ impl Debug for Tag { impl Tag { /// Create a new tag that does not exist yet. - pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self { + pub fn auto( + time: SystemTime, + exists: impl Fn(&[u8]) -> std::result::Result, + ) -> std::result::Result { let now = chrono::DateTime::::from(time); let mut i = 0; loop { @@ -67,8 +107,8 @@ impl Tag { if i != 0 { text.push_str(&format!("-{}", i)); } - if !exists(text.as_bytes()) { - return Self::from(text); + if !exists(text.as_bytes())? { + return Ok(Self::from(text)); } i += 1; }