Skip to content

Commit

Permalink
Handle tags also in redb
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jan 18, 2024
1 parent 8e22925 commit fdccf3b
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 71 deletions.
45 changes: 45 additions & 0 deletions iroh-base/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,51 @@ pub struct HashAndFormat {
pub format: BlobFormat,
}

#[cfg(feature = "redb")]
mod redb_support {
use super::{BlobFormat, Hash, HashAndFormat};
use redb::RedbValue;

impl RedbValue for HashAndFormat {
type SelfType<'a> = Self;

type AsBytes<'a> = [u8; 33];

fn fixed_width() -> Option<usize> {
Some(33)
}

fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
where
Self: 'a,
{
let t: &'a [u8; 33] = data.try_into().unwrap();
let format = match t[0] {
0 => BlobFormat::Raw,
1 => BlobFormat::HashSeq,
_ => panic!("invalid format"),
};
let hash = Hash::from_bytes(t[1..].try_into().unwrap());
Self { hash, format }
}

fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
where
Self: 'a,
Self: 'b,
{
let mut res = [0u8; 33];
res[0] = u64::from(value.format) as u8;
res[1..].copy_from_slice(value.hash.as_bytes());
res
}

fn type_name() -> redb::TypeName {
redb::TypeName::new("iroh_base::HashAndFormat")
}
}
}

impl HashAndFormat {
/// Create a new hash and format pair, using the default (raw) format.
pub fn raw(hash: Hash) -> Self {
Expand Down
124 changes: 57 additions & 67 deletions iroh-bytes/src/store/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,6 @@ impl Options {
struct Inner {
options: Options,
state: RwLock<State>,
tags: RwLock<BTreeMap<Tag, HashAndFormat>>,
// mutex for async access to complete files
//
// complete files are never written to. They come into existence when a partial
Expand All @@ -493,22 +492,16 @@ struct Inner {
}

/// Table: Partial Index
/// Key: [u8; 32] # Hash
/// Value:
/// - u64 - size
/// - [u8; 16] - UUID
const PARTIAL_TABLE: TableDefinition<Hash, PartialEntryData> =
TableDefinition::new("partial-index-0");

/// Table: Full Index
/// Key: [u8; 32] # Hash
/// Value:
/// - u64 - size
/// - bool - owned
/// - &[u8] - External Paths (might be empty)
const COMPLETE_TABLE: TableDefinition<Hash, CompleteEntry> =
TableDefinition::new("complete-index-0");

/// Table: Tags
const TAGS_TABLE: TableDefinition<Tag, HashAndFormat> = TableDefinition::new("tags-0");

/// Flat file database implementation.
///
/// This
Expand Down Expand Up @@ -705,10 +698,15 @@ impl ReadableStore for Store {
}

fn tags(&self) -> Box<dyn Iterator<Item = (Tag, HashAndFormat)> + Send + Sync + 'static> {
let inner = self.0.tags.read().unwrap();
let items = inner
let inner = self.0.db.begin_read().unwrap();
let tags_table = inner.open_table(TAGS_TABLE).unwrap();
let items = tags_table
.iter()
.map(|(k, v)| (k.clone(), *v))
.unwrap()
.map(|x| {
let (k, v) = x.unwrap();
(k.value(), v.value())
})
.collect::<Vec<_>>();
Box::new(items.into_iter())
}
Expand Down Expand Up @@ -814,7 +812,7 @@ impl super::Store for Store {

fn create_tag(&self, value: HashAndFormat) -> BoxFuture<'_, io::Result<Tag>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.create_tag_sync(value))
tokio::task::spawn_blocking(move || this.create_tag_sync(value).map_err(to_io_err))
.map(flatten_to_io)
.boxed()
}
Expand Down Expand Up @@ -1018,48 +1016,33 @@ impl Store {

fn set_tag_sync(&self, name: Tag, value: Option<HashAndFormat>) -> io::Result<()> {
tracing::debug!("set_tag {} {:?}", name, value);
let mut tags = self.0.tags.write().unwrap();
let mut new_tags = tags.clone();
let changed = if let Some(value) = value {
if let Some(old_value) = new_tags.insert(name, value) {
value != old_value
let txn = self.0.db.begin_write().map_err(to_io_err)?;
{
let mut tags = txn.open_table(TAGS_TABLE).map_err(to_io_err)?;
if let Some(target) = value {
tags.insert(name, target)
} else {
true
tags.remove(name)
}
} else {
new_tags.remove(&name).is_some()
};
if changed {
let serialized = postcard::to_stdvec(&new_tags).unwrap();
let temp_path = self
.0
.options
.meta_path
.join(format!("tags-{}.meta", hex::encode(new_uuid())));
let final_path = self.0.options.meta_path.join("tags.meta");
write_atomic(&temp_path, &final_path, &serialized)?;
*tags = new_tags;
.map_err(to_io_err)?;
}
drop(tags);
txn.commit().map_err(to_io_err)?;
Ok(())
}

fn create_tag_sync(&self, value: HashAndFormat) -> io::Result<Tag> {
fn create_tag_sync(&self, value: HashAndFormat) -> std::result::Result<Tag, redb::Error> {
tracing::debug!("create_tag {:?}", value);
let mut tags = self.0.tags.write().unwrap();
let mut new_tags = tags.clone();
let tag = Tag::auto(SystemTime::now(), |x| new_tags.contains_key(x));
new_tags.insert(tag.clone(), value);
let serialized = postcard::to_stdvec(&new_tags).unwrap();
let temp_path = self
.0
.options
.meta_path
.join(format!("tags-{}.meta", hex::encode(new_uuid())));
let final_path = self.0.options.meta_path.join("tags.meta");
write_atomic(&temp_path, &final_path, &serialized)?;
*tags = new_tags;
drop(tags);
let txn = self.0.db.begin_write().map_err(to_io_err)?;
let tag = {
let mut tags = txn.open_table(TAGS_TABLE).map_err(to_io_err)?;
let tag = Tag::auto(SystemTime::now(), |t| {
tags.get(Tag(Bytes::copy_from_slice(t)))
.map(|x| x.is_some())
})?;
tags.insert(&tag, value)?;
tag
};
txn.commit()?;
Ok(tag)
}

Expand Down Expand Up @@ -1156,7 +1139,7 @@ impl Store {
std::fs::rename(temp_data_path, data_path)?;
if temp_outboard_path.exists() {
let outboard_path = self.0.options.owned_outboard_path(&hash);
std::fs::rename(temp_outboard_path, &outboard_path)?;
std::fs::rename(temp_outboard_path, outboard_path)?;
}
let write_tx = self.0.db.begin_write().map_err(to_io_err)?;
{
Expand Down Expand Up @@ -1312,27 +1295,21 @@ impl Store {
std::fs::create_dir_all(&meta_path)?;

let db = Database::create(db_path)?;
// create tables if they don't exist
let write_tx = db.begin_write()?;
{
let _table = write_tx.open_table(PARTIAL_TABLE)?;
let _table = write_tx.open_table(COMPLETE_TABLE)?;
let _table = write_tx.open_table(TAGS_TABLE)?;
}
write_tx.commit()?;

let tags_path = meta_path.join("tags.meta");
let mut tags = BTreeMap::new();
if tags_path.exists() {
let data = std::fs::read(tags_path)?;
tags = postcard::from_bytes(&data)?;
tracing::debug!("loaded tags. {} entries", tags.len());
};
let res = Self(Arc::new(Inner {
state: RwLock::new(State {
data: Default::default(),
live: Default::default(),
temp: Default::default(),
}),
tags: RwLock::new(tags),
options: Options {
complete_path,
partial_path,
Expand All @@ -1356,6 +1333,15 @@ impl Store {

let complete_path = &self.0.options.complete_path;
let partial_path = &self.0.options.partial_path;
let meta_path = &self.0.options.meta_path;
let tags_path = meta_path.join("tags.meta");
let mut tags = BTreeMap::<Tag, HashAndFormat>::new();
if tags_path.exists() {
let data = std::fs::read(tags_path)?;
tags = postcard::from_bytes(&data)?;
tracing::debug!("loaded tags. {} entries", tags.len());
};

tracing::info!("migration from v1 to v2");
tracing::info!("complete_path: {}", complete_path.display());
tracing::info!("partial_path: {}", partial_path.display());
Expand Down Expand Up @@ -1589,16 +1575,20 @@ impl Store {
tracing::info!("partial {}", hash);
}
let txn = self.0.db.begin_write()?;
let mut complete_table = txn.open_table(COMPLETE_TABLE)?;
let mut partial_table = txn.open_table(PARTIAL_TABLE)?;
for (hash, entry) in complete {
complete_table.insert(hash, entry)?;
}
for (hash, entry) in partial {
partial_table.insert(hash, entry)?;
{
let mut complete_table = txn.open_table(COMPLETE_TABLE)?;
let mut partial_table = txn.open_table(PARTIAL_TABLE)?;
let mut tags_table = txn.open_table(TAGS_TABLE)?;
for (hash, entry) in complete {
complete_table.insert(hash, entry)?;
}
for (hash, entry) in partial {
partial_table.insert(hash, entry)?;
}
for (tag, target) in tags {
tags_table.insert(tag, target)?;
}
}
drop(complete_table);
drop(partial_table);
txn.commit()?;

Ok(())
Expand Down
5 changes: 4 additions & 1 deletion iroh-bytes/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,10 @@ impl super::Store for Store {

fn create_tag(&self, hash: HashAndFormat) -> BoxFuture<'_, io::Result<Tag>> {
let mut state = self.0.state.write().unwrap();
let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x));
let tag = Tag::auto(SystemTime::now(), |x| {
io::Result::Ok(state.tags.contains_key(x))
})
.unwrap();
state.tags.insert(tag.clone(), hash);
futures::future::ok(tag).boxed()
}
Expand Down
46 changes: 43 additions & 3 deletions iroh-bytes/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,43 @@ pub mod progress;
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, From, Into)]
pub struct Tag(pub Bytes);

#[cfg(feature = "redb")]
impl redb::RedbValue for Tag {
type SelfType<'a> = Self;

type AsBytes<'a> = &'a [u8];

fn fixed_width() -> Option<usize> {
None
}

fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
where
Self: 'a,
{
Self(Bytes::copy_from_slice(data))
}

fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
where
Self: 'a,
Self: 'b,
{
value.0.as_ref()
}

fn type_name() -> redb::TypeName {
redb::TypeName::new("iroh_base::Tag")
}
}

#[cfg(feature = "redb")]
impl redb::RedbKey for Tag {
fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
data1.cmp(data2)
}
}

impl Borrow<[u8]> for Tag {
fn borrow(&self) -> &[u8] {
self.0.as_ref()
Expand Down Expand Up @@ -59,16 +96,19 @@ impl Debug for Tag {

impl Tag {
/// Create a new tag that does not exist yet.
pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
pub fn auto<E>(
time: SystemTime,
exists: impl Fn(&[u8]) -> std::result::Result<bool, E>,
) -> std::result::Result<Self, E> {
let now = chrono::DateTime::<chrono::Utc>::from(time);
let mut i = 0;
loop {
let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
if i != 0 {
text.push_str(&format!("-{}", i));
}
if !exists(text.as_bytes()) {
return Self::from(text);
if !exists(text.as_bytes())? {
return Ok(Self::from(text));
}
i += 1;
}
Expand Down

0 comments on commit fdccf3b

Please sign in to comment.