Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: db state sync by merk chunking #292

Merged
merged 31 commits into from
May 1, 2024
Merged
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: base state sync
ogabrielides committed Apr 16, 2024
commit bde2deaecfb4a39d74bf3582a22af3db74d03b5a
344 changes: 343 additions & 1 deletion grovedb/src/lib.rs
Original file line number Diff line number Diff line change
@@ -168,7 +168,10 @@ mod versioning;
mod visualize;

#[cfg(feature = "full")]
use std::{collections::HashMap, option::Option::None, path::Path};
use std::{collections::HashMap, option::Option::None, path::Path, fmt};
use std::collections::{BTreeMap, BTreeSet, LinkedList, VecDeque};
use std::marker::PhantomData;
use itertools::Chunk;

#[cfg(any(feature = "full", feature = "verify"))]
use element::helpers;
@@ -180,6 +183,7 @@ pub use element::ElementFlags;
use grovedb_costs::{
cost_return_on_error, cost_return_on_error_no_add, CostResult, CostsExt, OperationCost,
};
use grovedb_costs::storage_cost::key_value_cost::KeyValueStorageCost;
#[cfg(feature = "estimated_costs")]
pub use grovedb_merk::estimated_costs::{
average_case_costs::{
@@ -199,6 +203,12 @@ use grovedb_merk::{
tree::{combine_hash, value_hash},
BatchEntry, CryptoHash, KVIterator, Merk,
};
use grovedb_merk::{ChunkProducer, Restorer, TreeFeatureType};
use grovedb_merk::Error::ChunkingError;
use grovedb_merk::proofs::{Node, Op};
use grovedb_merk::proofs::chunk::error::ChunkError;
use grovedb_merk::proofs::chunk::util::{generate_traversal_instruction_as_string, number_of_chunks};
use grovedb_merk::tree::kv_digest_to_kv_hash;
use grovedb_path::SubtreePath;
#[cfg(feature = "full")]
use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext;
@@ -233,6 +243,82 @@ pub struct GroveDb {
db: RocksDbStorage,
}

pub struct s_db_snapshot {
pub root_hash: CryptoHash,
pub data: Vec<(String, Vec<Op>)>
}

impl s_db_snapshot {
pub fn new() -> s_db_snapshot {
s_db_snapshot {
root_hash: CryptoHash::default(),
data: Vec::new(),
}
}
}

impl fmt::Debug for s_db_snapshot {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "root_hash:{:?}\n", hex::encode(self.root_hash));
for (global_chunk_id, _) in self.data.iter() {
write!(f, " global_chunk_id:{:?}\n", global_chunk_id);
}
Ok(())
}
}

pub struct s_db_snapshot_sorted {
pub root_hash: CryptoHash,
pub data: BTreeMap<String, Vec<(String, Vec<Op>)>>
}

impl s_db_snapshot_sorted {
pub fn new() -> s_db_snapshot_sorted {
s_db_snapshot_sorted {
root_hash: CryptoHash::default(),
data: BTreeMap::new(),
}
}
}

impl fmt::Debug for s_db_snapshot_sorted {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "root_hash:{:?}\n", hex::encode(self.root_hash));
for (prefix, chunk_vec) in self.data.iter() {
write!(f, " prefix:{:?}\n", prefix);
for (chunk_id, _) in chunk_vec.iter() {
write!(f, " chunk_id:{:?}\n", chunk_id);
}
}
Ok(())
}
}

pub struct s_subtrees_metadata {
pub data: BTreeMap<String, (Vec<Vec<u8>>, CryptoHash, CryptoHash)>
}

impl s_subtrees_metadata {
pub fn new() -> s_subtrees_metadata {
s_subtrees_metadata {
data: BTreeMap::new(),
}
}
}

impl fmt::Debug for s_subtrees_metadata {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for (prefix, metadata) in self.data.iter() {
let metadata_path = &metadata.0;
let metadata_path_str = s_util_path_to_string(&metadata_path);
let metadata_hash_0 = &metadata.1;
let metadata_hash_1 = &metadata.2;
write!(f, " prefix:{:?} -> path:{:?} ({:?}:{:?})\n", prefix, metadata_path_str, hex::encode(metadata_hash_0), hex::encode(metadata_hash_1));
}
Ok(())
}
}

/// Transaction
#[cfg(feature = "full")]
pub type Transaction<'db> = <RocksDbStorage as Storage<'db>>::Transaction;
@@ -1004,4 +1090,260 @@ impl GroveDb {
}
Ok(issues)
}

pub fn s_create_db_snapshot(
&self,
list_only_chunk_ids: bool,
) -> Result<s_db_snapshot, Error> {
let mut db_snapsot = s_db_snapshot::new();

db_snapsot.root_hash = self.root_hash(None).unwrap().unwrap();

let subtrees_root = self.find_subtrees(&SubtreePath::empty(), None).unwrap()?;
for subtree in subtrees_root.into_iter() {
let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect();
let path: &[&[u8]] = &subtree_path;

let continue_storage_batch = StorageBatch::new();

let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap();
let merk = self.open_batch_merk_at_path(&continue_storage_batch, path.into(), false).value.unwrap();

if (merk.is_empty_tree().unwrap()) {
continue;
}

let mut chunk_producer = ChunkProducer::new(&merk).unwrap();

let mut chunk_id_opt = Some("".to_string());
while let Some(chunk_id) = chunk_id_opt {
let (chunk, next_chunk_id) = chunk_producer.chunk(chunk_id.as_str()).unwrap();

let global_chunk_id = hex::encode(prefix) + &chunk_id;
if (list_only_chunk_ids) {
db_snapsot.data.push((global_chunk_id, vec![]));
}
else {
db_snapsot.data.push((global_chunk_id, chunk));
}

chunk_id_opt = next_chunk_id;
}
}

Ok(db_snapsot)
}

fn s_sort_db_snapshot(
&self,
snapshot: s_db_snapshot,
) -> Result<s_db_snapshot_sorted, Error> {
let mut db_snapsot_sorted = s_db_snapshot_sorted::new();
db_snapsot_sorted.root_hash = snapshot.root_hash;

let CHUNK_PREFIX_LENGTH: usize = 64;

for chunk_entry in snapshot.data {
let global_chunk_id = chunk_entry.0;
let chunk_data = chunk_entry.1;

if (global_chunk_id.len() < CHUNK_PREFIX_LENGTH) {
return Err(Error::CorruptedData(
"expected global chunk id of at least 64 length".to_string(),
));
}

let chunk_prefix = global_chunk_id.chars().take(CHUNK_PREFIX_LENGTH).collect::<String>();
let chunk_id = global_chunk_id.chars().skip(CHUNK_PREFIX_LENGTH).collect::<String>();

db_snapsot_sorted.data.entry(chunk_prefix).or_insert(Vec::new()).push((chunk_id, chunk_data.to_vec()));
}

for (_key, vec) in db_snapsot_sorted.data.iter_mut() {
vec.sort_by(|a, b| a.0.len().cmp(&b.0.len()));
}

Ok(db_snapsot_sorted)
}

fn s_get_subtrees_metadata<B: AsRef<[u8]>>(
&self,
path: &SubtreePath<B>,
) -> Result<s_subtrees_metadata, Error> {
let mut subtrees_metadata = crate::s_subtrees_metadata::new();

let subtrees_root = self.find_subtrees(&SubtreePath::empty(), None).unwrap().unwrap();
for subtree in subtrees_root.into_iter() {
let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect();
let path: &[&[u8]] = &subtree_path;
let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap();

let current_path = SubtreePath::from(path);

let parent_path_opt = current_path.derive_parent();
if (parent_path_opt.is_some()) {
let parent_path = parent_path_opt.unwrap().0;
let continue_storage_batch = StorageBatch::new();
let parent_merk = self.open_batch_merk_at_path(&continue_storage_batch, parent_path, false).value.unwrap();
let parent_key = subtree.last().unwrap();
let (elem_value, elem_value_hash) = parent_merk
.get_value_and_value_hash(
parent_key,
true,
None::<&fn(&[u8]) -> Option<ValueDefinedCostType>>,
)
.unwrap()
.expect("should get value hash")
.expect("value hash should be some");

let actual_value_hash = value_hash(&elem_value).unwrap();
subtrees_metadata.data.insert(hex::encode(prefix), (current_path.to_vec(), actual_value_hash, elem_value_hash));
}
else {
subtrees_metadata.data.insert(hex::encode(prefix), (current_path.to_vec(), CryptoHash::default(), CryptoHash::default()));
}
}
Ok(subtrees_metadata)
}

pub fn s_fetch_chunk(
&self,
global_chunk_id: String
) -> Result<Vec<Op>, Error> {
let CHUNK_PREFIX_LENGTH: usize = 64;
if (global_chunk_id.len() < CHUNK_PREFIX_LENGTH) {
return Err(Error::CorruptedData(
"expected global chunk id of at least 64 length".to_string(),
));
}

let chunk_prefix = global_chunk_id.chars().take(CHUNK_PREFIX_LENGTH).collect::<String>();
let chunk_id = global_chunk_id.chars().skip(CHUNK_PREFIX_LENGTH).collect::<String>();

let subtrees_metadata = self.s_get_subtrees_metadata(&SubtreePath::empty()).unwrap();

match subtrees_metadata.data.get(&chunk_prefix) {
Some(path_data) => {
let subtree = &path_data.0;
let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect();
let path: &[&[u8]] = &subtree_path;

let continue_storage_batch = StorageBatch::new();
let merk = self.open_batch_merk_at_path(&continue_storage_batch, path.into(), false).value?;

if (merk.is_empty_tree().unwrap()) {
return Err(Error::CorruptedData(
"Empty merk".to_string(),
));
}

let mut chunk_producer = ChunkProducer::new(&merk).unwrap();
let (chunk, _) = chunk_producer.chunk(chunk_id.as_str()).unwrap();
Ok(chunk)
},
None => {
return Err(Error::CorruptedData(
"Prefix not found".to_string(),
));
}
}
}

pub fn s_reconstruct_db(
&self,
snapshot: s_db_snapshot
) -> Result<(), Error> {
let mut sorted_snapshot = self.s_sort_db_snapshot(snapshot)?;

//Always start by empty prefix = root
if let Some(chunk_vec) = sorted_snapshot.data.remove(&hex::encode(CryptoHash::default())) {
let tx = self.start_transaction();
let merk = self.open_merk_for_replication(SubtreePath::empty(), &tx).unwrap();
let mut restorer = Restorer::new(merk, sorted_snapshot.root_hash, None);
for (chunk_id, chunk) in chunk_vec {
restorer.process_chunk(chunk_id, chunk).expect("should process chunk successfully");
}
restorer.finalize().expect("should finalize");
self.commit_transaction(tx);
} else {
return Err(Error::CorruptedData(
"No root prefix chunks found".to_string(),
));
}

let mut processed_prefixes :BTreeSet<String> = BTreeSet::new();
processed_prefixes.insert(hex::encode(CryptoHash::default()));

let mut queue_prefixes_to_be_processed : VecDeque<String> = VecDeque::new();

let mut subtrees_metadata = self.s_get_subtrees_metadata(&SubtreePath::empty()).unwrap();
for prefix in subtrees_metadata.data.keys() {
if !processed_prefixes.contains(prefix) {
//println!("prefix:{:?} pending for processing", prefix);
queue_prefixes_to_be_processed.push_back(prefix.to_string());
}
}

while (!queue_prefixes_to_be_processed.is_empty()) {
while let Some(current_prefix) = queue_prefixes_to_be_processed.pop_front() {
let prefix_metadata = &subtrees_metadata.data[&current_prefix];
let s_path = &prefix_metadata.0;
let s_actual_value_hash = &prefix_metadata.1;
let s_elem_value_hash = &prefix_metadata.2;
println!(" about to process{:?} with ({:?}:{:?})", s_util_path_to_string(&s_path), hex::encode(s_actual_value_hash), hex::encode(s_elem_value_hash));

let subtree_path: Vec<&[u8]> = s_path.iter().map(|vec| vec.as_slice()).collect();
let path: &[&[u8]] = &subtree_path;

if let Some(chunk_vec) = sorted_snapshot.data.remove(&current_prefix) {
let tx = self.start_transaction();
if (chunk_vec.is_empty()) {println!("empty"); }
let merk = self.open_merk_for_replication(path.into(), &tx).unwrap();
let mut restorer = Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash));
for (chunk_id, chunk) in chunk_vec {
restorer.process_chunk(chunk_id, chunk).expect("should process chunk successfully");
}
restorer.finalize().expect("should finalize");
self.commit_transaction(tx);
} else {
println!(" skipping empty {:?}", s_util_path_to_string(&s_path));
}

processed_prefixes.insert(current_prefix);
}

subtrees_metadata = self.s_get_subtrees_metadata(&SubtreePath::empty()).unwrap();
for prefix in subtrees_metadata.data.keys() {
if !processed_prefixes.contains(prefix) {
queue_prefixes_to_be_processed.push_back(prefix.to_string());
}
}
}

if (sorted_snapshot.data.len() > 0) {
return Err(Error::CorruptedData(
"Remaining chunks not processed".to_string(),
));
}

let incorrect_hashes = self.verify_grovedb(None)?;
if (incorrect_hashes.len() > 0) {
return Err(Error::CorruptedData(
"DB verification failed".to_string(),
));
}

Ok(())
}
}

pub fn s_util_path_to_string(
path: &Vec<Vec<u8>>,
) -> Vec<String> {
let mut subtree_path_str: Vec<String> = vec![];
for subtree in path.to_vec() {
let string = std::str::from_utf8(&subtree).unwrap();
subtree_path_str.push(string.parse().unwrap());
}
subtree_path_str
}
Loading