Skip to content

Commit

Permalink
AuxStore logs
Browse files Browse the repository at this point in the history
  • Loading branch information
tgmichel committed Oct 19, 2020
1 parent 833758f commit 92a8034
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 29 deletions.
2 changes: 2 additions & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ sp-api = { git = "https://github.com/paritytech/substrate.git", branch = "rococo
sc-client-api = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
sp-block-builder = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
sp-inherents = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
sp-io = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
frontier-consensus-primitives = { path = "primitives" }
frontier-rpc-primitives = { path = "../rpc/primitives" }
sp-consensus = { git = "https://github.com/paritytech/substrate.git", branch = "rococo-branch" }
log = "0.4.8"
futures = { version = "0.3.1", features = ["compat"] }
Expand Down
28 changes: 28 additions & 0 deletions consensus/src/aux_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use sp_core::H256;
use sp_runtime::traits::Block as BlockT;
use sc_client_api::backend::AuxStore;
use sp_blockchain::{Result as ClientResult, Error as ClientError};
use frontier_rpc_primitives::TransactionStatus;

fn load_decode<B: AuxStore, T: Decode>(backend: &B, key: &[u8]) -> ClientResult<Option<T>> {
let corrupt = |e: codec::Error| {
Expand Down Expand Up @@ -96,3 +97,30 @@ pub fn write_transaction_metadata<F, R>(
let key = transaction_metadata_key(hash);
write_aux(&[(&key, &metadata.encode())])
}
/// Map a Ethereum block number to the current runtime stored Ethereum logs.
pub fn log_key(block_number: u32) -> Vec<u8> {
let mut ret = b"ethereum_log:".to_vec();
ret.append(&mut block_number.to_be_bytes().to_vec());
ret
}

/// Given an Ethereum block number, get the corresponding Ethereum logs.
pub fn load_logs<B: AuxStore>(
backend: &B,
block_number: u32,
) -> ClientResult<Option<(H256, Vec<TransactionStatus>)>> {
let key = log_key(block_number);
load_decode(backend, &key)
}

/// Update Aux logs.
pub fn write_logs<F, R>(
block_number: u32,
data: (H256, Vec<TransactionStatus>),
write_aux: F,
) -> R where
F: FnOnce(&[(&[u8], &[u8])]) -> R,
{
let key = log_key(block_number);
write_aux(&[(&key, &data.encode())])
}
74 changes: 63 additions & 11 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,26 @@

mod aux_schema;

pub use crate::aux_schema::{load_block_hash, load_transaction_metadata};
pub use crate::aux_schema::{load_block_hash, load_transaction_metadata, load_logs};

use std::sync::Arc;
use std::collections::HashMap;
use std::marker::PhantomData;
use codec::Decode;
use frontier_consensus_primitives::{FRONTIER_ENGINE_ID, ConsensusLog};
use sc_client_api::{BlockOf, backend::AuxStore};
use sc_client_api::{BlockOf, backend::AuxStore, StorageProvider, Backend, StateBackend};
use sp_blockchain::{HeaderBackend, ProvideCache, well_known_cache_keys::Id as CacheKeyId};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_runtime::generic::OpaqueDigestItemId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_runtime::generic::{OpaqueDigestItemId, BlockId};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, BlakeTwo256, UniqueSaturatedInto, Saturating, One};
use sp_api::ProvideRuntimeApi;
use sp_core::{H256, storage::StorageKey};
use sp_io::hashing::twox_128;
use sp_consensus::{
BlockImportParams, Error as ConsensusError, BlockImport,
BlockCheckParams, ImportResult,
};
use frontier_rpc_primitives::TransactionStatus;
use log::*;
use sc_client_api;

Expand All @@ -57,14 +61,14 @@ impl std::convert::From<Error> for ConsensusError {
}
}

pub struct FrontierBlockImport<B: BlockT, I, C> {
pub struct FrontierBlockImport<B: BlockT, I, C, BE> {
inner: I,
client: Arc<C>,
enabled: bool,
_marker: PhantomData<B>,
_marker: PhantomData<(B, BE)>,
}

impl<Block: BlockT, I: Clone + BlockImport<Block>, C> Clone for FrontierBlockImport<Block, I, C> {
impl<Block: BlockT, I: Clone + BlockImport<Block>, C, BE> Clone for FrontierBlockImport<Block, I, C, BE> {
fn clone(&self) -> Self {
FrontierBlockImport {
inner: self.inner.clone(),
Expand All @@ -75,11 +79,13 @@ impl<Block: BlockT, I: Clone + BlockImport<Block>, C> Clone for FrontierBlockImp
}
}

impl<B, I, C> FrontierBlockImport<B, I, C> where
impl<B, I, C, BE> FrontierBlockImport<B, I, C, BE> where
B: BlockT,
BE: Backend<B>,
BE::State: StateBackend<BlakeTwo256>,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync,
I::Error: Into<ConsensusError>,
C: ProvideRuntimeApi<B> + Send + Sync + HeaderBackend<B> + AuxStore + ProvideCache<B> + BlockOf,
C: ProvideRuntimeApi<B> + Send + Sync + HeaderBackend<B> + AuxStore + ProvideCache<B> + BlockOf + StorageProvider<B,BE>,
C::Api: BlockBuilderApi<B, Error = sp_blockchain::Error>,
{
pub fn new(
Expand All @@ -96,11 +102,13 @@ impl<B, I, C> FrontierBlockImport<B, I, C> where
}
}

impl<B, I, C> BlockImport<B> for FrontierBlockImport<B, I, C> where
impl<B, I, C, BE> BlockImport<B> for FrontierBlockImport<B, I, C, BE> where
B: BlockT,
BE: Backend<B>,
BE::State: StateBackend<BlakeTwo256>,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync,
I::Error: Into<ConsensusError>,
C: ProvideRuntimeApi<B> + Send + Sync + HeaderBackend<B> + AuxStore + ProvideCache<B> + BlockOf,
C: ProvideRuntimeApi<B> + Send + Sync + HeaderBackend<B> + AuxStore + ProvideCache<B> + BlockOf + StorageProvider<B,BE>,
C::Api: BlockBuilderApi<B, Error = sp_blockchain::Error>,
{
type Error = ConsensusError;
Expand Down Expand Up @@ -145,6 +153,20 @@ impl<B, I, C> BlockImport<B> for FrontierBlockImport<B, I, C> where
insert_closure!(),
);
}

// Store already processed TransactionStatus by block number.
if *block.header.number() > One::one() {
let number = UniqueSaturatedInto::<u32>::unique_saturated_into(
block.header.number().saturating_sub(One::one())
);
if let Some(data) = logs(client.as_ref(), number) {
aux_schema::write_logs(
number,
data,
insert_closure!()
);
}
}
},
}
}
Expand All @@ -170,3 +192,33 @@ fn find_frontier_log<B: BlockT>(

Ok(frontier_log.ok_or(Error::NoPostRuntimeLog)?)
}

fn logs<B, BE, C>(
client: &C,
block_number: u32,
) -> Option<(H256, Vec<TransactionStatus>)> where
B: BlockT,
BE: Backend<B>,
BE::State: StateBackend<BlakeTwo256>,
C: HeaderBackend<B> + StorageProvider<B,BE>,
{
if let Ok(Some(header)) = client.header(BlockId::Number(block_number.into()))
{
if let Ok(ConsensusLog::EndBlock { block_hash, .. }) = find_frontier_log::<B>(&header) {
if let Ok(Some(data)) = client.storage(
&BlockId::Number(block_number.into()),
&StorageKey(
storage_prefix_build(b"Ethereum", b"CurrentTransactionStatuses")
)
) {
let statuses: Vec<TransactionStatus> = Decode::decode(&mut &data.0[..]).unwrap();
return Some((block_hash, statuses))
}
}
}
None
}

fn storage_prefix_build(module: &[u8], storage: &[u8]) -> Vec<u8> {
[twox_128(module), twox_128(storage)].concat().to_vec()
}
13 changes: 11 additions & 2 deletions frame/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,17 @@ impl<T: Trait> Module<T> {
}

CurrentBlock::put(block.clone());
CurrentReceipts::put(receipts.clone());
CurrentTransactionStatuses::put(statuses.clone());
if receipts.len() > 0 {
CurrentReceipts::put(receipts.clone());
} else {
CurrentReceipts::kill();
}

if statuses.len() > 0 {
CurrentTransactionStatuses::put(statuses.clone());
} else {
CurrentTransactionStatuses::kill();
}

let digest = DigestItem::<T::Hash>::Consensus(
FRONTIER_ENGINE_ID,
Expand Down
41 changes: 25 additions & 16 deletions rpc/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,13 @@ impl<B, C, P, CT, BE, A> EthApiT for EthApi<B, C, P, CT, BE, A> where
let block: Option<ethereum::Block> = self.current_block(&id);
let statuses: Option<Vec<TransactionStatus>> = self.current_statuses(&id);

if let (Some(block), Some(statuses)) = (block, statuses) {
blocks_and_statuses.push((block, statuses));
let block_hash = Some(H256::from_slice(
Keccak256::digest(&rlp::encode(&block.clone().unwrap().header)).as_slice()
));
let block_number = Some(block.unwrap().header.number);

if let (Some(block_hash), Some(block_number), Some(statuses)) = (block_hash, block_number, statuses) {
blocks_and_statuses.push((block_hash, block_number, statuses));
}
} else {
let mut current_number = filter.to_block
Expand All @@ -848,14 +853,21 @@ impl<B, C, P, CT, BE, A> EthApiT for EthApi<B, C, P, CT, BE, A> where
self.client.info().best_number
);
while current_number >= from_number {
let id = BlockId::Number(current_number);

let block: Option<ethereum::Block> = self.current_block(&id);
let statuses: Option<Vec<TransactionStatus>> = self.current_statuses(&id);

if let (Some(block), Some(statuses)) = (block, statuses) {
blocks_and_statuses.push((block, statuses));
}
let number = UniqueSaturatedInto::<u32>::unique_saturated_into(current_number);

match frontier_consensus::load_logs(
self.client.as_ref(),
number
).map_err(|err| internal_err(format!("fetch aux store failed: {:?}", err)))?
{
Some((block_hash, statuses)) => {
let block_number = U256::from(
UniqueSaturatedInto::<u32>::unique_saturated_into(current_number)
);
blocks_and_statuses.push((block_hash, block_number, statuses));
},
_ => {},
};

if current_number == Zero::zero() {
break
Expand All @@ -868,14 +880,11 @@ impl<B, C, P, CT, BE, A> EthApiT for EthApi<B, C, P, CT, BE, A> where
let mut blocks_processed: u32 = 0;
let mut logs_processed: u32 = 0;

'outer: for (block, statuses) in blocks_and_statuses {
'outer: for (block_hash, block_number, statuses) in blocks_and_statuses {
if blocks_processed == eth_block_limit {
break;
}
let mut block_log_index: u32 = 0;
let block_hash = H256::from_slice(
Keccak256::digest(&rlp::encode(&block.header)).as_slice()
);
for status in statuses.iter() {
let logs = status.logs.clone();
let mut transaction_log_index: u32 = 0;
Expand Down Expand Up @@ -911,8 +920,8 @@ impl<B, C, P, CT, BE, A> EthApiT for EthApi<B, C, P, CT, BE, A> where
address: log.address.clone(),
topics: log.topics.clone(),
data: Bytes(log.data.clone()),
block_hash: Some(block_hash),
block_number: Some(block.header.number.clone()),
block_hash: Some(block_hash.clone()),
block_number: Some(block_number.clone()),
transaction_hash: Some(transaction_hash),
transaction_index: Some(U256::from(status.transaction_index)),
log_index: Some(U256::from(block_log_index)),
Expand Down

0 comments on commit 92a8034

Please sign in to comment.