diff --git a/trin-execution/src/evm/block_executor.rs b/trin-execution/src/evm/block_executor.rs index b6896c123..181eb50e4 100644 --- a/trin-execution/src/evm/block_executor.rs +++ b/trin-execution/src/evm/block_executor.rs @@ -1,13 +1,17 @@ use std::{ collections::HashMap, - fs::{self, File}, + fs::File, io::BufReader, - path::{Path, PathBuf}, + path::Path, + time::{Duration, Instant}, }; use anyhow::{bail, ensure}; use eth_trie::{RootWithTrieDiff, Trie}; -use ethportal_api::{types::state_trie::account_state::AccountState, Header}; +use ethportal_api::{ + types::{execution::transaction::Transaction, state_trie::account_state::AccountState}, + Header, +}; use revm::{ db::{states::bundle_state::BundleRetention, State}, inspectors::TracerEip3155, @@ -24,7 +28,6 @@ use crate::{ TRANSACTION_PROCESSING_TIMES, }, storage::evm_db::EvmDB, - types::block_to_trace::BlockToTrace, }; use super::{ @@ -50,23 +53,24 @@ struct GenesisConfig { } /// BlockExecutor is a struct that is responsible for executing blocks or a block in memory. +/// /// The use case is /// - initialize the BlockExecutor with a database /// - execute blocks /// - commit the changes and retrieve the result pub struct BlockExecutor<'a> { + /// The evm used for block execution. evm: Evm<'a, (), State>, + /// The time when BlockExecutor was created. + creation_time: Instant, + /// The number of executed blocks. + executed_blocks: u64, + /// Sum of gas used of all executed blocks. cumulative_gas_used: u64, - block_to_trace: BlockToTrace, - node_data_directory: PathBuf, } impl<'a> BlockExecutor<'a> { - pub fn new( - database: EvmDB, - block_to_trace: BlockToTrace, - node_data_directory: PathBuf, - ) -> Self { + pub fn new(database: EvmDB) -> Self { let state_database = State::builder() .with_database(database) .with_bundle_update() @@ -75,9 +79,9 @@ impl<'a> BlockExecutor<'a> { Self { evm, + creation_time: Instant::now(), + executed_blocks: 0, cumulative_gas_used: 0, - block_to_trace, - node_data_directory, } } @@ -115,10 +119,7 @@ impl<'a> BlockExecutor<'a> { stop_timer(timer); let timer = start_timer_vec(&BLOCK_PROCESSING_TIMES, &["get_root_with_trie_diff"]); - let RootWithTrieDiff { - root, - trie_diff: changed_nodes, - } = self + let root_with_trie_diff = self .evm .db_mut() .database @@ -128,10 +129,7 @@ impl<'a> BlockExecutor<'a> { stop_timer(timer); - Ok(RootWithTrieDiff { - root, - trie_diff: changed_nodes, - }) + Ok(root_with_trie_diff) } fn process_genesis(&mut self) -> anyhow::Result<()> { @@ -165,6 +163,14 @@ impl<'a> BlockExecutor<'a> { } pub fn execute_block(&mut self, block: &ProcessedBlock) -> anyhow::Result<()> { + self.execute_block_with_tracer(block, |_| None) + } + + pub fn execute_block_with_tracer( + &mut self, + block: &ProcessedBlock, + tx_tracer_fn: impl Fn(&Transaction) -> Option, + ) -> anyhow::Result<()> { info!("State EVM processing block {}", block.header.number); let execute_block_timer = start_timer_vec(&BLOCK_PROCESSING_TIMES, &["execute_block"]); @@ -183,7 +189,7 @@ impl<'a> BlockExecutor<'a> { for transaction in block.transactions.iter() { let transaction_timer = start_timer_vec(&BLOCK_PROCESSING_TIMES, &["transaction"]); - let evm_result = self.execute_transaction(transaction)?; + let evm_result = self.execute_transaction(transaction, &tx_tracer_fn)?; block_gas_used += evm_result.result.gas_used(); let commit_timer = start_timer_vec(&BLOCK_PROCESSING_TIMES, &["commit_state"]); @@ -200,6 +206,7 @@ impl<'a> BlockExecutor<'a> { block_gas_used, block.header.gas_used ); + self.executed_blocks += 1; self.cumulative_gas_used += block_gas_used; // update beneficiaries @@ -218,6 +225,7 @@ impl<'a> BlockExecutor<'a> { fn execute_transaction( &mut self, tx: &TransactionsWithSender, + tracer_fn: impl Fn(&Transaction) -> Option, ) -> anyhow::Result { let block_number = self.evm.block().number.to(); @@ -228,21 +236,12 @@ impl<'a> BlockExecutor<'a> { // Execute transaction let timer = start_timer_vec(&TRANSACTION_PROCESSING_TIMES, &["transact"]); - let result = if self.block_to_trace.should_trace(block_number) { - let output_path = self - .node_data_directory - .as_path() - .join("evm_traces") - .join(format!("block_{block_number}")); - fs::create_dir_all(&output_path)?; - let output_file = - File::create(output_path.join(format!("tx_{}.json", tx.transaction.hash())))?; - let tracer = TracerEip3155::new(Box::new(output_file)); - - create_evm_with_tracer(self.evm.block().clone(), tx, self.evm.db_mut(), tracer) - .transact()? - } else { - self.evm.transact()? + let result = match tracer_fn(&tx.transaction) { + Some(tracer) => { + create_evm_with_tracer(self.evm.block().clone(), tx, self.evm.db_mut(), tracer) + .transact()? + } + None => self.evm.transact()?, }; stop_timer(timer); @@ -269,11 +268,19 @@ impl<'a> BlockExecutor<'a> { Ok(()) } - pub fn cumulative_gas_used(&self) -> u64 { - self.cumulative_gas_used - } - - pub fn bundle_size_hint(&self) -> usize { - self.evm.db().bundle_size_hint() + /// This function is used to determine if we should commit the block execution early. + /// + /// We want this for a few reasons + /// - To prevent memory usage from getting too high + /// - To cap the amount of time it takes to commit everything to the database, the bigger the + /// changes the more time it takes. + /// + /// The various limits are arbitrary and can be adjusted as needed, + /// but are based on the current state of the network and what we have seen so far. + pub fn should_commit(&self) -> bool { + self.executed_blocks >= 500_000 + || self.evm.db().bundle_size_hint() >= 5_000_000 + || self.cumulative_gas_used >= 30_000_000 * 50_000 + || self.creation_time.elapsed() >= Duration::from_secs(30 * 60) } } diff --git a/trin-execution/src/execution.rs b/trin-execution/src/execution.rs index 9255a6f3a..b286d7a16 100644 --- a/trin-execution/src/execution.rs +++ b/trin-execution/src/execution.rs @@ -1,16 +1,11 @@ -use alloy_primitives::{keccak256, Address, Bytes, B256}; -use alloy_rlp::Decodable; -use anyhow::{anyhow, bail, ensure}; +use alloy_primitives::B256; +use anyhow::ensure; use eth_trie::{RootWithTrieDiff, Trie}; -use ethportal_api::{types::state_trie::account_state::AccountState, Header}; -use std::{ - collections::BTreeSet, - path::PathBuf, - sync::Arc, - time::{Duration, Instant}, -}; +use ethportal_api::{types::execution::transaction::Transaction, Header}; +use revm::inspectors::TracerEip3155; +use std::{path::PathBuf, sync::Arc}; use tokio::sync::Mutex; -use tracing::info; +use tracing::{info, warn}; use crate::{ era::manager::EraManager, @@ -23,7 +18,7 @@ use crate::{ }, }; -use super::{config::StateConfig, types::trie_proof::TrieProof, utils::address_to_nibble_path}; +use super::config::StateConfig; pub struct TrinExecution { pub database: EvmDB, @@ -58,6 +53,14 @@ impl TrinExecution { }) } + pub fn next_block_number(&self) -> u64 { + self.execution_position.next_block_number() + } + + pub async fn process_next_block(&mut self) -> anyhow::Result { + self.process_range_of_blocks(self.next_block_number()).await + } + /// Processes up to end block number (inclusive) and returns the root with trie diff /// If the state cache gets too big, we will commit the state to the database early and return /// the function early along with it @@ -68,13 +71,10 @@ impl TrinExecution { "End block number {end} is less than start block number {start}", ); - info!("Processing blocks from {} to {} (inclusive)", start, end); - let mut block_executor = BlockExecutor::new( - self.database.clone(), - self.config.block_to_trace.clone(), - self.node_data_directory.clone(), - ); - let range_start = Instant::now(); + info!("Processing blocks from {start} to {end} (inclusive)"); + + let mut block_executor = BlockExecutor::new(self.database.clone()); + let mut last_executed_block_header: Option
= None; for block_number in start..=end { let timer = start_timer_vec(&BLOCK_PROCESSING_TIMES, &["fetching_block_from_era"]); @@ -87,19 +87,14 @@ impl TrinExecution { .clone(); stop_timer(timer); - block_executor.execute_block(&block)?; + block_executor + .execute_block_with_tracer(&block, |tx| self.create_tracer(&block.header, tx))?; + last_executed_block_header = Some(block.header); - // Commit the bundle if we have reached the limits, to prevent to much memory usage + // Commit early if we have reached the limits, to prevent too much memory usage. // We won't use this during the dos attack to avoid writing empty accounts to disk - if !(2_200_000..2_700_000).contains(&block_number) - && should_we_commit_block_execution_early( - block_number - start, - block_executor.bundle_size_hint() as u64, - block_executor.cumulative_gas_used(), - range_start.elapsed(), - ) - { + if block_executor.should_commit() && !(2_200_000..2_700_000).contains(&block_number) { break; } } @@ -118,108 +113,35 @@ impl TrinExecution { let timer = start_timer_vec(&BLOCK_PROCESSING_TIMES, &["set_block_execution_number"]); self.execution_position - .update_position(self.database.db.clone(), last_executed_block_header)?; + .update_position(self.database.db.clone(), &last_executed_block_header)?; stop_timer(timer); Ok(root_with_trie_diff) } - pub async fn process_next_block(&mut self) -> anyhow::Result { - self.process_range_of_blocks(self.next_block_number()).await - } - - pub fn next_block_number(&self) -> u64 { - self.execution_position.next_block_number() - } - pub fn get_root(&mut self) -> anyhow::Result { Ok(self.database.trie.lock().root_hash()?) } - pub fn get_root_with_trie_diff(&mut self) -> anyhow::Result { - Ok(self.database.trie.lock().root_hash_with_changed_nodes()?) - } - - pub fn get_account_state(&self, account: &Address) -> anyhow::Result { - let account_state = self.database.db.get(keccak256(account))?; - match account_state { - Some(account) => { - let account = AccountState::decode(&mut account.as_slice())?; - Ok(account) - } - None => Ok(AccountState::default()), - } - } - - pub fn get_proof(&mut self, address: Address) -> anyhow::Result { - let proof: Vec = self - .database - .trie - .lock() - .get_proof(keccak256(address).as_slice())? - .into_iter() - .map(Bytes::from) - .collect(); - let last_node = proof.last().ok_or(anyhow!("Missing proof!"))?; - - let eth_trie::node::Node::Leaf(last_node) = eth_trie::decode_node(&mut last_node.as_ref())? - else { - bail!("Last node in the proof should be leaf!") - }; - let mut last_node_nibbles = last_node.key.clone(); - if last_node_nibbles.is_leaf() { - last_node_nibbles.pop(); - } else { - bail!("Nibbles of the last node should have LEAF Marker") - } - - let mut path = address_to_nibble_path(address); - if path.ends_with(last_node_nibbles.get_data()) { - path.truncate(path.len() - last_node_nibbles.len()); - } else { - bail!("Path should have a suffix of last node's nibbles") - } - - Ok(TrieProof { path, proof }) - } - - pub fn get_proofs(&mut self, accounts: &BTreeSet
) -> anyhow::Result> { - accounts - .iter() - .map(|account| self.get_proof(*account)) - .collect() + fn create_tracer(&self, header: &Header, tx: &Transaction) -> Option { + self.config + .block_to_trace + .create_trace_writer(self.node_data_directory.clone(), header, tx) + .unwrap_or_else(|err| { + warn!("Error while creating trace file: {err}. Skipping."); + None + }) + .map(TracerEip3155::new) } } -/// This function is used to determine if we should commit the block execution early. -/// We want this for a few reasons -/// - To prevent memory usage from getting too high -/// - To cap the amount of time it takes to commit everything to the database, the bigger the -/// changes the more time it takes The various limits are arbitrary and can be adjusted as needed, -/// but are based on the current state of the network and what we have seen so far -pub fn should_we_commit_block_execution_early( - blocks_processed: u64, - pending_state_changes: u64, - cumulative_gas_used: u64, - elapsed: Duration, -) -> bool { - blocks_processed >= 500_000 - || pending_state_changes >= 5_000_000 - || cumulative_gas_used >= 30_000_000 * 50_000 - || elapsed >= Duration::from_secs(30 * 60) -} - #[cfg(test)] mod tests { use std::fs; - use crate::{ - config::StateConfig, era::utils::process_era1_file, execution::TrinExecution, - storage::utils::setup_temp_dir, - }; + use crate::{era::utils::process_era1_file, storage::utils::setup_temp_dir}; - use alloy_primitives::Address; - use revm_primitives::hex::FromHex; + use super::*; #[tokio::test] async fn test_we_generate_the_correct_state_root_for_the_first_8192_blocks() { @@ -237,21 +159,4 @@ mod tests { assert_eq!(trin_execution.get_root().unwrap(), block.header.state_root); } } - - #[tokio::test] - async fn test_get_proof() { - let temp_directory = setup_temp_dir().unwrap(); - let mut trin_execution = TrinExecution::new( - Some(temp_directory.path().to_path_buf()), - StateConfig::default(), - ) - .await - .unwrap(); - trin_execution.process_next_block().await.unwrap(); - let valid_proof = trin_execution - .get_proof(Address::from_hex("0x001d14804b399c6ef80e64576f657660804fec0b").unwrap()) - .unwrap(); - assert_eq!(valid_proof.path, [5, 9, 2, 13]); - // the proof is already tested by eth-trie.rs - } } diff --git a/trin-execution/src/storage/execution_position.rs b/trin-execution/src/storage/execution_position.rs index 43b2c959d..efa0499e2 100644 --- a/trin-execution/src/storage/execution_position.rs +++ b/trin-execution/src/storage/execution_position.rs @@ -47,7 +47,7 @@ impl ExecutionPosition { self.state_root } - pub fn update_position(&mut self, db: Arc, header: Header) -> anyhow::Result<()> { + pub fn update_position(&mut self, db: Arc, header: &Header) -> anyhow::Result<()> { self.next_block_number = header.number + 1; self.state_root = header.state_root; db.put(EXECUTION_POSITION_DB_KEY, alloy_rlp::encode(self))?; diff --git a/trin-execution/src/subcommands/era2/import.rs b/trin-execution/src/subcommands/era2/import.rs index f300bebcc..81747d3b5 100644 --- a/trin-execution/src/subcommands/era2/import.rs +++ b/trin-execution/src/subcommands/era2/import.rs @@ -113,7 +113,7 @@ impl StateImporter { // Save execution position self.trin_execution .execution_position - .update_position(self.trin_execution.database.db.clone(), era2.header.header)?; + .update_position(self.trin_execution.database.db.clone(), &era2.header.header)?; info!("Done importing State from .era2 file"); diff --git a/trin-execution/src/trie_walker.rs b/trin-execution/src/trie_walker.rs index ae3581184..886d1ad9f 100644 --- a/trin-execution/src/trie_walker.rs +++ b/trin-execution/src/trie_walker.rs @@ -159,10 +159,10 @@ impl TrieWalker { #[cfg(test)] mod tests { - use alloy_primitives::{keccak256, Address}; - use anyhow::anyhow; - use eth_trie::RootWithTrieDiff; - use revm_primitives::hex::FromHex; + use std::str::FromStr; + + use alloy_primitives::{keccak256, Address, Bytes}; + use eth_trie::{RootWithTrieDiff, Trie}; use crate::{ config::StateConfig, execution::TrinExecution, storage::utils::setup_temp_dir, @@ -179,18 +179,24 @@ mod tests { .await .unwrap(); let RootWithTrieDiff { trie_diff, .. } = trin_execution.process_next_block().await.unwrap(); - let valid_proof = trin_execution - .get_proof(Address::from_hex("0x001d14804b399c6ef80e64576f657660804fec0b").unwrap()) - .unwrap(); let root_hash = trin_execution.get_root().unwrap(); let walk_diff = TrieWalker::new(root_hash, trie_diff); - let last_node = valid_proof - .proof - .last() - .ok_or(anyhow!("Missing proof!")) - .unwrap(); + let address = Address::from_str("0x001d14804b399c6ef80e64576f657660804fec0b").unwrap(); + let valid_proof = trin_execution + .database + .trie + .lock() + .get_proof(keccak256(address).as_slice()) + .unwrap() + .into_iter() + .map(Bytes::from) + .collect::>(); + let last_node = valid_proof.last().expect("Missing proof!"); + let account_proof = walk_diff.get_proof(keccak256(last_node)); - assert_eq!(valid_proof, account_proof); + + assert_eq!(account_proof.path, [5, 9, 2, 13]); + assert_eq!(account_proof.proof, valid_proof); } } diff --git a/trin-execution/src/types/block_to_trace.rs b/trin-execution/src/types/block_to_trace.rs index e252ac7f0..3f163fc1b 100644 --- a/trin-execution/src/types/block_to_trace.rs +++ b/trin-execution/src/types/block_to_trace.rs @@ -1,4 +1,11 @@ -use std::str::FromStr; +use std::{ + fs::{self, File}, + io::{BufWriter, Write}, + path::PathBuf, + str::FromStr, +}; + +use ethportal_api::{types::execution::transaction::Transaction, Header}; #[derive(Clone, Debug, PartialEq, Default, Eq)] pub enum BlockToTrace { @@ -16,6 +23,28 @@ impl BlockToTrace { BlockToTrace::Block(b) => *b == block_number, } } + + /// Creates file writer for tracing given transaction. + /// + /// Returns None if transaction shouldn't be traced. + pub fn create_trace_writer( + &self, + root_dir: PathBuf, + header: &Header, + tx: &Transaction, + ) -> std::io::Result>> { + let block_number = header.number; + if self.should_trace(block_number) { + let output_dir = root_dir + .join("evm_traces") + .join(format!("block_{block_number}")); + fs::create_dir_all(&output_dir)?; + let output_file = File::create(output_dir.join(format!("tx_{}.json", tx.hash())))?; + Ok(Some(Box::new(BufWriter::new(output_file)))) + } else { + Ok(None) + } + } } impl FromStr for BlockToTrace {