diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml
index b891bea63a..9b54aac60a 100644
--- a/consensus/Cargo.toml
+++ b/consensus/Cargo.toml
@@ -16,7 +16,9 @@ sp-api = { git = "https://github.com/paritytech/substrate.git", branch = "rococo
 sc-client-api = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
 sp-block-builder = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
 sp-inherents = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
+sp-io = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
 frontier-consensus-primitives = { path = "primitives" }
+frontier-rpc-primitives = { path = "../rpc/primitives" }
 sp-consensus = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
 log = "0.4.8"
 futures = { version = "0.3.1", features = ["compat"] }
diff --git a/consensus/src/aux_schema.rs b/consensus/src/aux_schema.rs
index 7b4f7a3b6c..de6b2113ca 100644
--- a/consensus/src/aux_schema.rs
+++ b/consensus/src/aux_schema.rs
@@ -21,6 +21,7 @@ use sp_core::H256;
 use sp_runtime::traits::Block as BlockT;
 use sc_client_api::backend::AuxStore;
 use sp_blockchain::{Result as ClientResult, Error as ClientError};
+use frontier_rpc_primitives::TransactionStatus;
 
 fn load_decode<B: AuxStore, T: Decode>(backend: &B, key: &[u8]) -> ClientResult<Option<T>> {
 	let corrupt = |e: codec::Error| {
@@ -96,3 +97,30 @@ pub fn write_transaction_metadata<F, R>(
 	let key = transaction_metadata_key(hash);
 	write_aux(&[(&key, &metadata.encode())])
 }
+/// Map an Ethereum block number to the current runtime stored Ethereum logs.
+pub fn log_key(block_number: u32) -> Vec<u8> {
+	let mut ret = b"ethereum_log:".to_vec();
+	ret.append(&mut block_number.to_be_bytes().to_vec());
+	ret
+}
+
+/// Given an Ethereum block number, get the corresponding Ethereum logs.
+pub fn load_logs<B: AuxStore>(
+	backend: &B,
+	block_number: u32,
+) -> ClientResult<Option<(H256, Vec<TransactionStatus>)>> {
+	let key = log_key(block_number);
+	load_decode(backend, &key)
+}
+
+/// Update Aux logs.
+pub fn write_logs<F, R>(
+	block_number: u32,
+	data: (H256, Vec<TransactionStatus>),
+	write_aux: F,
+) -> R where
+	F: FnOnce(&[(&[u8], &[u8])]) -> R,
+{
+	let key = log_key(block_number);
+	write_aux(&[(&key, &data.encode())])
+}
diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs
index 0ba3363200..ee54853e2f 100644
--- a/consensus/src/lib.rs
+++ b/consensus/src/lib.rs
@@ -18,22 +18,26 @@
 
 mod aux_schema;
 
-pub use crate::aux_schema::{load_block_hash, load_transaction_metadata};
+pub use crate::aux_schema::{load_block_hash, load_transaction_metadata, load_logs};
 
 use std::sync::Arc;
 use std::collections::HashMap;
 use std::marker::PhantomData;
+use codec::Decode;
 use frontier_consensus_primitives::{FRONTIER_ENGINE_ID, ConsensusLog};
-use sc_client_api::{BlockOf, backend::AuxStore};
+use sc_client_api::{BlockOf, backend::AuxStore, StorageProvider, Backend, StateBackend};
 use sp_blockchain::{HeaderBackend, ProvideCache, well_known_cache_keys::Id as CacheKeyId};
 use sp_block_builder::BlockBuilder as BlockBuilderApi;
-use sp_runtime::generic::OpaqueDigestItemId;
-use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
+use sp_runtime::generic::{OpaqueDigestItemId, BlockId};
+use sp_runtime::traits::{Block as BlockT, Header as HeaderT, BlakeTwo256, UniqueSaturatedInto, Saturating, One};
 use sp_api::ProvideRuntimeApi;
+use sp_core::{H256, storage::StorageKey};
+use sp_io::hashing::twox_128;
 use sp_consensus::{
 	BlockImportParams, Error as ConsensusError, BlockImport,
 	BlockCheckParams, ImportResult,
 };
+use frontier_rpc_primitives::TransactionStatus;
 use log::*;
 use sc_client_api;
 
@@ -57,14 +61,14 @@ impl std::convert::From<Error> for ConsensusError {
 	}
 }
 
-pub struct FrontierBlockImport<B: BlockT, I, C> {
+pub struct FrontierBlockImport<B: BlockT, I, C, BE> {
 	inner: I,
 	client: Arc<C>,
 	enabled: bool,
-	_marker: PhantomData<B>,
+	_marker: PhantomData<(B, BE)>,
 }
 
-impl<B: BlockT, I: Clone + BlockImport<B>, C> Clone for FrontierBlockImport<B, I, C> {
+impl<B: BlockT, I: Clone + BlockImport<B>, C, BE> Clone for FrontierBlockImport<B, I, C, BE> {
 	fn clone(&self) -> Self {
 		FrontierBlockImport {
 			inner: self.inner.clone(),
@@ -75,11 +79,13 @@ impl<B: BlockT, I: Clone + BlockImport<B>, C> Clone for FrontierBlockImp
 	}
 }
 
-impl<B, I, C> FrontierBlockImport<B, I, C> where
+impl<B, I, C, BE> FrontierBlockImport<B, I, C, BE> where
 	B: BlockT,
+	BE: Backend<B>,
+	BE::State: StateBackend<BlakeTwo256>,
 	I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync,
 	I::Error: Into<ConsensusError>,
-	C: ProvideRuntimeApi<B> + Send + Sync + HeaderBackend<B> + AuxStore + ProvideCache<B> + BlockOf,
+	C: ProvideRuntimeApi<B> + Send + Sync + HeaderBackend<B> + AuxStore + ProvideCache<B> + BlockOf + StorageProvider<B, BE>,
 	C::Api: BlockBuilderApi<B, Error = sp_blockchain::Error>,
 {
 	pub fn new(
@@ -96,11 +102,13 @@ impl<B, I, C> FrontierBlockImport<B, I, C> where
 	}
 }
 
-impl<B, I, C> BlockImport<B> for FrontierBlockImport<B, I, C> where
+impl<B, I, C, BE> BlockImport<B> for FrontierBlockImport<B, I, C, BE> where
 	B: BlockT,
+	BE: Backend<B>,
+	BE::State: StateBackend<BlakeTwo256>,
 	I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync,
 	I::Error: Into<ConsensusError>,
-	C: ProvideRuntimeApi<B> + Send + Sync + HeaderBackend<B> + AuxStore + ProvideCache<B> + BlockOf,
+	C: ProvideRuntimeApi<B> + Send + Sync + HeaderBackend<B> + AuxStore + ProvideCache<B> + BlockOf + StorageProvider<B, BE>,
 	C::Api: BlockBuilderApi<B, Error = sp_blockchain::Error>,
 {
 	type Error = ConsensusError;
@@ -145,6 +153,20 @@ impl<B, I, C> BlockImport<B> for FrontierBlockImport<B, I, C> where
 							insert_closure!(),
 						);
 					}
+
+					// Store already processed TransactionStatus by block number.
+					if *block.header.number() > One::one() {
+						let number = UniqueSaturatedInto::<u32>::unique_saturated_into(
+							block.header.number().saturating_sub(One::one())
+						);
+						if let Some(data) = logs(client.as_ref(), number) {
+							aux_schema::write_logs(
+								number,
+								data,
+								insert_closure!()
+							);
+						}
+					}
 				},
 			}
 		}
@@ -170,3 +192,33 @@ fn find_frontier_log<B: BlockT>(
 
 	Ok(frontier_log.ok_or(Error::NoPostRuntimeLog)?)
 }
+
+fn logs<B, C, BE>(
+	client: &C,
+	block_number: u32,
+) -> Option<(H256, Vec<TransactionStatus>)> where
+	B: BlockT,
+	BE: Backend<B>,
+	BE::State: StateBackend<BlakeTwo256>,
+	C: HeaderBackend<B> + StorageProvider<B, BE>,
+{
+	if let Ok(Some(header)) = client.header(BlockId::Number(block_number.into()))
+	{
+		if let Ok(ConsensusLog::EndBlock { block_hash, .. }) = find_frontier_log::<B>(&header) {
+			if let Ok(Some(data)) = client.storage(
+				&BlockId::Number(block_number.into()),
+				&StorageKey(
+					storage_prefix_build(b"Ethereum", b"CurrentTransactionStatuses")
+				)
+			) {
+				let statuses: Vec<TransactionStatus> = Decode::decode(&mut &data.0[..]).unwrap();
+				return Some((block_hash, statuses))
+			}
+		}
+	}
+	None
+}
+
+fn storage_prefix_build(module: &[u8], storage: &[u8]) -> Vec<u8> {
+	[twox_128(module), twox_128(storage)].concat().to_vec()
+}
diff --git a/frame/ethereum/src/lib.rs b/frame/ethereum/src/lib.rs
index e937b074d1..ec7a6c406b 100644
--- a/frame/ethereum/src/lib.rs
+++ b/frame/ethereum/src/lib.rs
@@ -279,8 +279,17 @@ impl<T: Trait> Module<T> {
 		}
 
 		CurrentBlock::put(block.clone());
-		CurrentReceipts::put(receipts.clone());
-		CurrentTransactionStatuses::put(statuses.clone());
+		if receipts.len() > 0 {
+			CurrentReceipts::put(receipts.clone());
+		} else {
+			CurrentReceipts::kill();
+		}
+
+		if statuses.len() > 0 {
+			CurrentTransactionStatuses::put(statuses.clone());
+		} else {
+			CurrentTransactionStatuses::kill();
+		}
 
 		let digest = DigestItem::<T::Hash>::Consensus(
 			FRONTIER_ENGINE_ID,
diff --git a/rpc/src/eth.rs b/rpc/src/eth.rs
index 9e81044f40..668b4175e8 100644
--- a/rpc/src/eth.rs
+++ b/rpc/src/eth.rs
@@ -830,8 +830,13 @@ impl EthApiT for EthApi where
 			let block: Option<ethereum::Block> = self.current_block(&id);
 			let statuses: Option<Vec<TransactionStatus>> = self.current_statuses(&id);
 
-			if let (Some(block), Some(statuses)) = (block, statuses) {
-				blocks_and_statuses.push((block, statuses));
+			let block_hash = Some(H256::from_slice(
+				Keccak256::digest(&rlp::encode(&block.clone().unwrap().header)).as_slice()
+			));
+			let block_number = Some(block.unwrap().header.number);
+
+			if let (Some(block_hash), Some(block_number), Some(statuses)) = (block_hash, block_number, statuses) {
+				blocks_and_statuses.push((block_hash, block_number, statuses));
 			}
 		} else {
 			let mut current_number = filter.to_block
@@ -848,14 +853,21 @@
 					self.client.info().best_number
 				);
 			while current_number >= from_number {
-				let id = BlockId::Number(current_number);
-
-				let block: Option<ethereum::Block> = self.current_block(&id);
-				let statuses: Option<Vec<TransactionStatus>> = self.current_statuses(&id);
-
-				if let (Some(block), Some(statuses)) = (block, statuses) {
-					blocks_and_statuses.push((block, statuses));
-				}
+				let number = UniqueSaturatedInto::<u32>::unique_saturated_into(current_number);
+
+				match frontier_consensus::load_logs(
+					self.client.as_ref(),
+					number
+				).map_err(|err| internal_err(format!("fetch aux store failed: {:?}", err)))?
+				{
+					Some((block_hash, statuses)) => {
+						let block_number = U256::from(
+							UniqueSaturatedInto::<u32>::unique_saturated_into(current_number)
+						);
+						blocks_and_statuses.push((block_hash, block_number, statuses));
+					},
+					_ => {},
+				};
 
 				if current_number == Zero::zero() {
 					break
@@ -868,14 +880,11 @@
 		let mut blocks_processed: u32 = 0;
 		let mut logs_processed: u32 = 0;
 
-		'outer: for (block, statuses) in blocks_and_statuses {
+		'outer: for (block_hash, block_number, statuses) in blocks_and_statuses {
 			if blocks_processed == eth_block_limit {
 				break;
 			}
 			let mut block_log_index: u32 = 0;
-			let block_hash = H256::from_slice(
-				Keccak256::digest(&rlp::encode(&block.header)).as_slice()
-			);
 			for status in statuses.iter() {
 				let logs = status.logs.clone();
 				let mut transaction_log_index: u32 = 0;
@@ -911,8 +920,8 @@
 							address: log.address.clone(),
 							topics: log.topics.clone(),
 							data: Bytes(log.data.clone()),
-							block_hash: Some(block_hash),
-							block_number: Some(block.header.number.clone()),
+							block_hash: Some(block_hash.clone()),
+							block_number: Some(block_number.clone()),
 							transaction_hash: Some(transaction_hash),
 							transaction_index: Some(U256::from(status.transaction_index)),
 							log_index: Some(U256::from(block_log_index)),
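For reference, a minimal standalone sketch (not part of the diff) of how the two lookup keys introduced above are laid out: the `ethereum_log:` aux-store key mirroring `aux_schema::log_key`, and the `Ethereum` / `CurrentTransactionStatuses` storage prefix mirroring `storage_prefix_build`. It assumes only the `sp-io` dependency added in `consensus/Cargo.toml`, compiled with its `std` feature.

```rust
// Sketch only: mirrors the key-building helpers from the diff so the
// constants can be sanity-checked outside the node.
use sp_io::hashing::twox_128;

// Aux-store key for a block's Ethereum logs, as in `aux_schema::log_key`.
fn log_key(block_number: u32) -> Vec<u8> {
	let mut ret = b"ethereum_log:".to_vec();
	ret.append(&mut block_number.to_be_bytes().to_vec());
	ret
}

// Runtime storage prefix read by `logs()`, as in `storage_prefix_build`.
fn storage_prefix_build(module: &[u8], storage: &[u8]) -> Vec<u8> {
	[twox_128(module), twox_128(storage)].concat()
}

fn main() {
	// "ethereum_log:" (13 bytes) followed by the big-endian block number (4 bytes).
	assert_eq!(log_key(1).len(), 17);

	// twox_128("Ethereum") ++ twox_128("CurrentTransactionStatuses"): 16 + 16 bytes.
	let key = storage_prefix_build(b"Ethereum", b"CurrentTransactionStatuses");
	assert_eq!(key.len(), 32);
}
```

A side effect of the big-endian encoding in `log_key` is that the aux keys sort lexicographically in the same order as the block numbers themselves.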