Skip to content

Commit

Permalink
chore/perf: RefWrapper on witness, impl Database for rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
nhtyy committed Oct 22, 2024
1 parent 3647076 commit 9606341
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 49 deletions.
23 changes: 11 additions & 12 deletions crates/executor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use reth_evm_optimism::OpExecutorProvider;
use reth_execution_types::ExecutionOutcome;
use reth_optimism_consensus::validate_block_post_execution as validate_block_post_execution_optimism;
use reth_primitives::{proofs, Block, BlockWithSenders, Bloom, Header, Receipt, Receipts, Request};
use revm::{db::CacheDB, Database};
use revm::{db::WrapDatabaseRef, Database};
use revm_primitives::{address, U256};

/// Chain ID for Ethereum Mainnet.
Expand All @@ -43,7 +43,7 @@ pub trait Variant {
fn execute<DB>(
executor_block_input: &BlockWithSenders,
executor_difficulty: U256,
cache_db: DB,
db: DB,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: Database<Error: Into<ProviderError> + Display>;
Expand Down Expand Up @@ -101,7 +101,7 @@ impl ClientExecutor {
{
// Initialize the witnessed database with verified storage proofs.
let witness_db = input.witness_db()?;
let cache_db = CacheDB::new(&witness_db);
let db = WrapDatabaseRef(witness_db);

// Execute the block.
let spec = V::spec();
Expand All @@ -113,9 +113,8 @@ impl ClientExecutor {
.ok_or(eyre!("failed to recover senders"))
})?;
let executor_difficulty = input.current_block.header.difficulty;
let executor_output = profile!("execute", {
V::execute(&executor_block_input, executor_difficulty, cache_db)
})?;
let executor_output =
profile!("execute", { V::execute(&executor_block_input, executor_difficulty, db) })?;

// Validate the block post execution.
profile!("validate block post-execution", {
Expand Down Expand Up @@ -183,7 +182,7 @@ impl Variant for EthereumVariant {
fn execute<DB>(
executor_block_input: &BlockWithSenders,
executor_difficulty: U256,
cache_db: DB,
db: DB,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: Database<Error: Into<ProviderError> + Display>,
Expand All @@ -192,7 +191,7 @@ impl Variant for EthereumVariant {
Self::spec().into(),
CustomEvmConfig::from_variant(ChainVariant::Ethereum),
)
.executor(cache_db)
.executor(db)
.execute((executor_block_input, executor_difficulty).into())?)
}

Expand All @@ -214,7 +213,7 @@ impl Variant for OptimismVariant {
fn execute<DB>(
executor_block_input: &BlockWithSenders,
executor_difficulty: U256,
cache_db: DB,
db: DB,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: Database<Error: Into<ProviderError> + Display>,
Expand All @@ -223,7 +222,7 @@ impl Variant for OptimismVariant {
Self::spec().into(),
CustomEvmConfig::from_variant(ChainVariant::Optimism),
)
.executor(cache_db)
.executor(db)
.execute((executor_block_input, executor_difficulty).into())?)
}

Expand All @@ -245,7 +244,7 @@ impl Variant for LineaVariant {
fn execute<DB>(
executor_block_input: &BlockWithSenders,
executor_difficulty: U256,
cache_db: DB,
db: DB,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: Database<Error: Into<ProviderError> + Display>,
Expand All @@ -254,7 +253,7 @@ impl Variant for LineaVariant {
Self::spec().into(),
CustomEvmConfig::from_variant(ChainVariant::Linea),
)
.executor(cache_db)
.executor(db)
.execute((executor_block_input, executor_difficulty).into())?)
}

Expand Down
7 changes: 3 additions & 4 deletions crates/executor/host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> HostExecutor<T, P

// Setup the database for the block executor.
tracing::info!("setting up the database for the block executor");
let rpc_db = RpcDb::new(self.provider.clone(), block_number - 1);
let cache_db = CacheDB::new(&rpc_db);
let mut rpc_db = RpcDb::new(self.provider.clone(), block_number - 1);

// Execute the block and fetch all the necessary data along the way.
tracing::info!(
Expand All @@ -82,7 +81,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> HostExecutor<T, P
.with_recovered_senders()
.ok_or(eyre!("failed to recover senders"))?;
let executor_difficulty = current_block.header.difficulty;
let executor_output = V::execute(&executor_block_input, executor_difficulty, cache_db)?;
let executor_output = V::execute(&executor_block_input, executor_difficulty, &mut rpc_db)?;

// Validate the block post execution.
tracing::info!("validating the block post execution");
Expand Down Expand Up @@ -196,7 +195,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> HostExecutor<T, P
);

// Fetch the parent headers needed to constrain the BLOCKHASH opcode.
let oldest_ancestor = *rpc_db.oldest_ancestor.borrow();
let oldest_ancestor = rpc_db.oldest_ancestor;
let mut ancestor_headers = vec![];
tracing::info!("fetching {} ancestor headers", block_number - oldest_ancestor);
for height in (oldest_ancestor..=(block_number - 1)).rev() {
Expand Down
9 changes: 7 additions & 2 deletions crates/executor/host/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ where
.try_init();

// Setup the provider.
let rpc_url =
Url::parse(std::env::var(env_var_key).unwrap().as_str()).expect("invalid rpc url");
let rpc_url = Url::parse(
std::env::var(env_var_key)
.unwrap_or_else(|_| panic!("Env var {} to be set", env_var_key))
.as_str(),
)
.expect("invalid rpc url");

let provider = ReqwestProvider::new_http(rpc_url);

// Setup the host executor.
Expand Down
87 changes: 56 additions & 31 deletions crates/storage/rpc-db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet},
marker::PhantomData,
};
Expand All @@ -11,23 +10,28 @@ use reth_primitives::{
revm_primitives::{AccountInfo, Bytecode},
Address, B256, U256,
};
use reth_revm::DatabaseRef;
use reth_revm::Database;
use reth_storage_errors::{db::DatabaseError, provider::ProviderError};
use revm_primitives::HashMap;

/// A database that fetches data from a [Provider] over a [Transport].
///
/// This type is very similar to a CacheDb as exposed by revm, except we have some extra fields
/// and the inner types are more conducive to the state we need to extract.
#[derive(Debug, Clone)]
pub struct RpcDb<T, P> {
/// The provider which fetches data.
pub provider: P,
/// The block to fetch data from.
pub block: BlockId,
/// The cached accounts.
pub accounts: RefCell<HashMap<Address, AccountInfo>>,
pub accounts: HashMap<Address, AccountInfo>,
/// The cached storage values.
pub storage: RefCell<HashMap<Address, HashMap<U256, U256>>>,
pub storage: HashMap<Address, HashMap<U256, U256>>,
/// The cached block hashes
pub block_hash_by_number: HashMap<u64, B256>,
/// The oldest block whose header/hash has been requested.
pub oldest_ancestor: RefCell<u64>,
pub oldest_ancestor: u64,
/// A phantom type to make the struct generic over the transport.
pub _phantom: PhantomData<T>,
}
Expand All @@ -43,23 +47,35 @@ pub enum RpcDbError {
PreimageNotFound,
}

impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
impl<T, P> RpcDb<T, P>
where
T: Transport + Clone,
P: Provider<T, AnyNetwork>,
{
/// Create a new [`RpcDb`].
pub fn new(provider: P, block: u64) -> Self {
RpcDb {
provider,
block: block.into(),
accounts: RefCell::new(HashMap::new()),
storage: RefCell::new(HashMap::new()),
oldest_ancestor: RefCell::new(block),
accounts: HashMap::new(),
storage: HashMap::new(),
block_hash_by_number: HashMap::new(),
oldest_ancestor: block,
_phantom: PhantomData,
}
}

/// Fetch the [AccountInfo] for an [Address].
pub async fn fetch_account_info(&self, address: Address) -> Result<AccountInfo, RpcDbError> {
pub async fn fetch_account_info(
&mut self,
address: Address,
) -> Result<AccountInfo, RpcDbError> {
tracing::info!("fetching account info for address: {}", address);

if let Some(account_info) = self.accounts.get(&address) {
return Ok(account_info.clone());
}

// Fetch the proof for the account.
let proof = self
.provider
Expand All @@ -86,19 +102,25 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
};

// Record the account info to the state.
self.accounts.borrow_mut().insert(address, account_info.clone());
self.accounts.insert(address, account_info.clone());

Ok(account_info)
}

/// Fetch the storage value at an [Address] and [U256] index.
pub async fn fetch_storage_at(
&self,
&mut self,
address: Address,
index: U256,
) -> Result<U256, RpcDbError> {
tracing::info!("fetching storage value at address: {}, index: {}", address, index);

if let Some(account) = self.storage.get(&address) {
if let Some(value) = account.get(&index) {
return Ok(*value);
}
}

// Fetch the storage value.
let value = self
.provider
Expand All @@ -108,17 +130,20 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
.map_err(|e| RpcDbError::RpcError(e.to_string()))?;

// Record the storage value to the state.
let mut storage_values = self.storage.borrow_mut();
let entry = storage_values.entry(address).or_default();
let entry = self.storage.entry(address).or_default();
entry.insert(index, value);

Ok(value)
}

/// Fetch the block hash for a block number.
pub async fn fetch_block_hash(&self, number: u64) -> Result<B256, RpcDbError> {
pub async fn fetch_block_hash(&mut self, number: u64) -> Result<B256, RpcDbError> {
tracing::info!("fetching block hash for block number: {}", number);

if let Some(hash) = self.block_hash_by_number.get(&number) {
return Ok(*hash);
}

// Fetch the block.
let block = self
.provider
Expand All @@ -130,22 +155,20 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
let block = block.ok_or(RpcDbError::BlockNotFound)?;
let hash = block.header.hash;

let mut oldest_ancestor = self.oldest_ancestor.borrow_mut();
*oldest_ancestor = number.min(*oldest_ancestor);
self.block_hash_by_number.insert(number, hash);
self.oldest_ancestor = number.min(self.oldest_ancestor);

Ok(hash)
}

/// Gets all the state keys used. The client uses this to read the actual state data from tries.
pub fn get_state_requests(&self) -> HashMap<Address, Vec<U256>> {
let accounts = self.accounts.borrow();
let storage = self.storage.borrow();

accounts
self.accounts
.keys()
.chain(storage.keys())
.chain(self.storage.keys())
.map(|&address| {
let storage_keys_for_address: BTreeSet<U256> = storage
let storage_keys_for_address: BTreeSet<U256> = self
.storage
.get(&address)
.map(|storage_map| storage_map.keys().cloned().collect())
.unwrap_or_default();
Expand All @@ -157,9 +180,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {

/// Gets all account bytecodes.
pub fn get_bytecodes(&self) -> Vec<Bytecode> {
let accounts = self.accounts.borrow();

accounts
self.accounts
.values()
.flat_map(|account| account.code.clone())
.map(|code| (code.hash_slow(), code))
Expand All @@ -169,10 +190,14 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
}
}

impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> DatabaseRef for RpcDb<T, P> {
impl<T, P> Database for RpcDb<T, P>
where
T: Transport + Clone,
P: Provider<T, AnyNetwork>,
{
type Error = ProviderError;

fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
fn basic(&mut self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
let handle = tokio::runtime::Handle::try_current().map_err(|_| {
ProviderError::Database(DatabaseError::Other("no tokio runtime found".to_string()))
})?;
Expand All @@ -183,11 +208,11 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> DatabaseRef for R
Ok(Some(account_info))
}

fn code_by_hash_ref(&self, _code_hash: B256) -> Result<Bytecode, Self::Error> {
fn code_by_hash(&mut self, _code_hash: B256) -> Result<Bytecode, Self::Error> {
unimplemented!()
}

fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error> {
let handle = tokio::runtime::Handle::try_current().map_err(|_| {
ProviderError::Database(DatabaseError::Other("no tokio runtime found".to_string()))
})?;
Expand All @@ -198,7 +223,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> DatabaseRef for R
Ok(value)
}

fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
fn block_hash(&mut self, number: u64) -> Result<B256, Self::Error> {
let handle = tokio::runtime::Handle::try_current().map_err(|_| {
ProviderError::Database(DatabaseError::Other("no tokio runtime found".to_string()))
})?;
Expand Down

0 comments on commit 9606341

Please sign in to comment.