diff --git a/trin-execution/src/execution.rs b/trin-execution/src/execution.rs index b286d7a16..2522fa26f 100644 --- a/trin-execution/src/execution.rs +++ b/trin-execution/src/execution.rs @@ -4,7 +4,7 @@ use eth_trie::{RootWithTrieDiff, Trie}; use ethportal_api::{types::execution::transaction::Transaction, Header}; use revm::inspectors::TracerEip3155; use std::{path::PathBuf, sync::Arc}; -use tokio::sync::Mutex; +use tokio::sync::{oneshot::Receiver, Mutex}; use tracing::{info, warn}; use crate::{ @@ -58,26 +58,32 @@ impl TrinExecution { } pub async fn process_next_block(&mut self) -> anyhow::Result { - self.process_range_of_blocks(self.next_block_number()).await + self.process_range_of_blocks(self.next_block_number(), None) + .await } - /// Processes up to end block number (inclusive) and returns the root with trie diff - /// If the state cache gets too big, we will commit the state to the database early and return - /// the function early along with it - pub async fn process_range_of_blocks(&mut self, end: u64) -> anyhow::Result { - let start = self.execution_position.next_block_number(); + /// Processes blocks up to last block number (inclusive) and returns the root with trie diff. + /// + /// If the state cache gets too big, we will commit the state and continue. Execution can be + /// interupted early by sending `stop_signal`, in which case we will commit and return. + pub async fn process_range_of_blocks( + &mut self, + last_block: u64, + mut stop_signal: Option>, + ) -> anyhow::Result { + let start_block = self.execution_position.next_block_number(); ensure!( - end >= start, - "End block number {end} is less than start block number {start}", + last_block >= start_block, + "Last block number {last_block} is less than start block number {start_block}", ); - info!("Processing blocks from {start} to {end} (inclusive)"); + info!("Processing blocks from {start_block} to {last_block} (inclusive)"); let mut block_executor = BlockExecutor::new(self.database.clone()); - let mut last_executed_block_header: Option
= None; - for block_number in start..=end { - let timer = start_timer_vec(&BLOCK_PROCESSING_TIMES, &["fetching_block_from_era"]); + loop { + let fetching_block_timer = + start_timer_vec(&BLOCK_PROCESSING_TIMES, &["fetching_block_from_era"]); let block = self .era_manager .lock() @@ -85,44 +91,60 @@ impl TrinExecution { .get_next_block() .await? .clone(); - stop_timer(timer); + stop_timer(fetching_block_timer); block_executor .execute_block_with_tracer(&block, |tx| self.create_tracer(&block.header, tx))?; - last_executed_block_header = Some(block.header); + // Commit and return if we reached last block or stop signal is received. + let stop_signal_received = stop_signal + .as_mut() + .is_some_and(|stop_signal| stop_signal.try_recv().is_ok()); + if block.header.number == last_block || stop_signal_received { + if stop_signal_received { + info!("Stop signal received. Committing now, please wait!"); + } + return self.commit(&block.header, block_executor).await; + } // Commit early if we have reached the limits, to prevent too much memory usage. // We won't use this during the dos attack to avoid writing empty accounts to disk - if block_executor.should_commit() && !(2_200_000..2_700_000).contains(&block_number) { - break; + if block_executor.should_commit() + && !(2_200_000..2_700_000).contains(&block.header.number) + { + self.commit(&block.header, block_executor).await?; + block_executor = BlockExecutor::new(self.database.clone()); } } + } - let last_executed_block_header = - last_executed_block_header.expect("At least one block must have been executed"); + pub fn get_root(&mut self) -> anyhow::Result { + Ok(self.database.trie.lock().root_hash()?) + } + async fn commit( + &mut self, + header: &Header, + block_executor: BlockExecutor<'_>, + ) -> anyhow::Result { let root_with_trie_diff = block_executor.commit_bundle()?; ensure!( - root_with_trie_diff.root == last_executed_block_header.state_root, + root_with_trie_diff.root == header.state_root, "State root doesn't match! Irreversible! Block number: {} | Generated root: {} | Expected root: {}", - last_executed_block_header.number, + header.number, root_with_trie_diff.root, - last_executed_block_header.state_root + header.state_root ); - let timer = start_timer_vec(&BLOCK_PROCESSING_TIMES, &["set_block_execution_number"]); + let update_execution_position_timer = + start_timer_vec(&BLOCK_PROCESSING_TIMES, &["set_block_execution_number"]); self.execution_position - .update_position(self.database.db.clone(), &last_executed_block_header)?; - stop_timer(timer); + .update_position(self.database.db.clone(), header)?; + stop_timer(update_execution_position_timer); Ok(root_with_trie_diff) } - pub fn get_root(&mut self) -> anyhow::Result { - Ok(self.database.trie.lock().root_hash()?) - } - fn create_tracer(&self, header: &Header, tx: &Transaction) -> Option { self.config .block_to_trace diff --git a/trin-execution/src/main.rs b/trin-execution/src/main.rs index 47283d1dd..bba32ffea 100644 --- a/trin-execution/src/main.rs +++ b/trin-execution/src/main.rs @@ -33,14 +33,6 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let (tx, mut rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - tx.send(true) - .await - .expect("signal ctrl_c should never fail"); - }); - if let Some(command) = trin_execution_config.command { match command { TrinExecutionSubCommands::ImportState(import_state) => { @@ -66,23 +58,16 @@ async fn main() -> anyhow::Result<()> { } } - let mut block_number = trin_execution.next_block_number(); + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + tx.send(()).expect("signal ctrl_c should never fail"); + }); let end_block = get_spec_block_number(SpecId::MERGE); - while block_number < end_block { - if rx.try_recv().is_ok() { - trin_execution.database.db.flush()?; - info!( - "Received SIGINT, stopping execution: {} {}", - block_number - 1, - trin_execution.get_root()? - ); - break; - } - - trin_execution.process_range_of_blocks(end_block).await?; - block_number = trin_execution.next_block_number(); - } + trin_execution + .process_range_of_blocks(end_block, Some(rx)) + .await?; Ok(()) }