Skip to content

Commit

Permalink
Slow node start with checkpoint sync (#2014)
Browse files Browse the repository at this point in the history
* Some improvements for loading from checkpoint

* Update zilliqa/src/db.rs

Co-authored-by: James Hinshelwood <[email protected]>

* Addressed review feedback

---------

Co-authored-by: James Hinshelwood <[email protected]>
  • Loading branch information
bzawisto and JamesHinshelwood authored Dec 13, 2024
1 parent c58115f commit 4c76cf9
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 45 deletions.
2 changes: 1 addition & 1 deletion eth-trie.rs/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub trait DB: Send + Sync {
pub struct MemoryDB {
// If "light" is true, the data is deleted from the database at the time of submission.
light: bool,
storage: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
pub storage: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
}

impl MemoryDB {
Expand Down
123 changes: 79 additions & 44 deletions zilliqa/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{

use alloy::primitives::Address;
use anyhow::{anyhow, Context, Result};
use eth_trie::{EthTrie, Trie};
use eth_trie::{EthTrie, MemoryDB, Trie, DB};
use itertools::Itertools;
use lru_mem::LruCache;
use lz4::{Decoder, EncoderBuilder};
Expand All @@ -21,7 +21,7 @@ use rusqlite::{
Connection, OptionalExtension, Row, ToSql,
};
use serde::{Deserialize, Serialize};
use tracing::warn;
use tracing::{debug, warn};

use crate::{
crypto::{Hash, NodeSignature},
Expand Down Expand Up @@ -335,7 +335,7 @@ impl Db {
// Read decompressed file
let input = File::open(&temp_filename)?;

let mut reader = BufReader::with_capacity(8192 * 1024, input); // 8 MiB read chunks
let mut reader = BufReader::with_capacity(128 * 1024 * 1024, input); // 128 MiB read chunks
let trie_storage = Arc::new(self.state_trie()?);
let mut state_trie = EthTrie::new(trie_storage.clone());

Expand Down Expand Up @@ -414,8 +414,24 @@ impl Db {
}
}

// Helper function used for inserting entries from memory (which backs storage trie) into persistent storage
let db_flush = |db: Arc<TrieStorage>, cache: Arc<MemoryDB>| -> Result<()> {
let mut cache_storage = cache.storage.write();
let (keys, values): (Vec<_>, Vec<_>) = cache_storage.drain().unzip();
debug!("Doing write to db with total items {}", keys.len());
db.insert_batch(keys, values)?;
Ok(())
};

let mut processed_accounts = 0;
let mut processed_storage_items = 0;
// This is taken directly from batch_write. However, this can be as big as we think it's reasonable to be
// (ideally multiples of `32766 / 2` so that batch writes are fully utilized)
// TODO: consider putting this const somewhere else as long as we use sql-lite
// Also see: https://www.sqlite.org/limits.html#max_variable_number
let maximum_sql_parameters = 32766 / 2;
const COMPUTE_ROOT_HASH_EVERY_ACCOUNTS: usize = 10000;
let mem_storage = Arc::new(MemoryDB::new(true));

// then decode state
loop {
Expand Down Expand Up @@ -445,7 +461,7 @@ impl Db {
reader.read_exact(&mut account_storage)?;

// Pull out each storage key and value
let mut account_trie = EthTrie::new(trie_storage.clone());
let mut account_trie = EthTrie::new(mem_storage.clone());
let mut pointer: usize = 0;
while account_storage_len > pointer {
let storage_key_len_buf: &[u8] =
Expand All @@ -465,6 +481,8 @@ impl Db {
pointer += storage_val_len;

account_trie.insert(storage_key, storage_val)?;

processed_storage_items += 1;
}

let account_trie_root =
Expand All @@ -474,6 +492,11 @@ impl Db {
"Invalid checkpoint file: account trie root hash mismatch: calculated {}, checkpoint file contained {}", hex::encode(account_trie.root_hash()?.as_slice()), hex::encode(account_trie_root)
));
}
if processed_storage_items > maximum_sql_parameters {
db_flush(trie_storage.clone(), mem_storage.clone())?;
processed_storage_items = 0;
}

state_trie.insert(&account_hash, &serialised_account)?;

processed_accounts += 1;
Expand All @@ -483,6 +506,8 @@ impl Db {
}
}

db_flush(trie_storage.clone(), mem_storage.clone())?;

if state_trie.root_hash()? != parent.state_root_hash().0 {
return Err(anyhow!("Invalid checkpoint file: state root hash mismatch"));
}
Expand Down Expand Up @@ -1060,7 +1085,7 @@ pub fn checkpoint_block_with_state<P: AsRef<Path> + Debug>(
let output_filename = get_checkpoint_filename(output_dir, block)?;
let temp_filename = output_filename.with_extension("part");
let outfile_temp = File::create_new(&temp_filename)?;
let mut writer = BufWriter::with_capacity(8192 * 1024, outfile_temp); // 8 MiB chunks
let mut writer = BufWriter::with_capacity(128 * 1024 * 1024, outfile_temp); // 128 MiB chunks

// write the header:
writer.write_all(&CHECKPOINT_HEADER_BYTES)?; // file identifier
Expand Down Expand Up @@ -1166,45 +1191,12 @@ pub struct TrieStorage {
cache: Arc<Mutex<LruCache<Vec<u8>, Vec<u8>>>>,
}

impl eth_trie::DB for TrieStorage {
type Error = rusqlite::Error;

fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
if let Some(cached) = self.cache.lock().unwrap().get(key).map(|v| v.to_vec()) {
return Ok(Some(cached));
}

let value: Option<Vec<u8>> = self
.db
.lock()
.unwrap()
.query_row(
"SELECT value FROM state_trie WHERE key = ?1",
[key],
|row| row.get(0),
)
.optional()?;

let mut cache = self.cache.lock().unwrap();
if !cache.contains(key) {
if let Some(value) = &value {
let _ = cache.insert(key.to_vec(), value.clone());
}
}

Ok(value)
}

fn insert(&self, key: &[u8], value: Vec<u8>) -> Result<(), Self::Error> {
self.db.lock().unwrap().execute(
"INSERT OR REPLACE INTO state_trie (key, value) VALUES (?1, ?2)",
(key, &value),
)?;
let _ = self.cache.lock().unwrap().insert(key.to_vec(), value);
Ok(())
}

fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
impl TrieStorage {
pub fn write_batch(
&self,
keys: Vec<Vec<u8>>,
values: Vec<Vec<u8>>,
) -> Result<(), rusqlite::Error> {
if keys.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -1246,6 +1238,49 @@ impl eth_trie::DB for TrieStorage {

Ok(())
}
}

impl eth_trie::DB for TrieStorage {
type Error = rusqlite::Error;

fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
if let Some(cached) = self.cache.lock().unwrap().get(key).map(|v| v.to_vec()) {
return Ok(Some(cached));
}

let value: Option<Vec<u8>> = self
.db
.lock()
.unwrap()
.query_row(
"SELECT value FROM state_trie WHERE key = ?1",
[key],
|row| row.get(0),
)
.optional()?;

let mut cache = self.cache.lock().unwrap();
if !cache.contains(key) {
if let Some(value) = &value {
let _ = cache.insert(key.to_vec(), value.clone());
}
}

Ok(value)
}

fn insert(&self, key: &[u8], value: Vec<u8>) -> Result<(), Self::Error> {
self.db.lock().unwrap().execute(
"INSERT OR REPLACE INTO state_trie (key, value) VALUES (?1, ?2)",
(key, &value),
)?;
let _ = self.cache.lock().unwrap().insert(key.to_vec(), value);
Ok(())
}

fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
self.write_batch(keys, values)
}

fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> {
// we keep old state to function as an archive node, therefore no-op
Expand Down

0 comments on commit 4c76cf9

Please sign in to comment.