diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index c969f5521d9..f78ff1b0bec 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1,6 +1,5 @@ use anyhow::Error; use ethabi::{Error as ABIError, Function, ParamType, Token}; -use graph::blockchain::BlockPtrExt; use graph::blockchain::ChainIdentifier; use graph::components::subgraph::MappingError; use graph::data::store::ethereum::call; @@ -1110,13 +1109,6 @@ pub trait EthereumAdapter: Send + Sync + 'static { block_hash: H256, ) -> Box + Send>; - async fn load_block_ptrs_by_numbers( - &self, - _logger: Logger, - _chain_store: Arc, - _block_numbers: HashSet, - ) -> Box, Error = Error> + Send>; - /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. /// May use the `chain_store` as a cache. async fn load_blocks( diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 4e4f26fd9b1..0b689822ec8 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -11,11 +11,13 @@ use graph::components::store::{DeploymentCursorTracker, WritableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::firehose::{FirehoseEndpoint, ForkStep}; use graph::futures03::compat::Future01CompatExt; +use graph::futures03::TryStreamExt; use graph::prelude::{ BlockHash, ComponentLoggerConfig, DeploymentHash, ElasticComponentLoggerConfig, EthereumBlock, EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry, }; use graph::schema::InputSchema; +use graph::slog::{debug, error, warn}; use graph::substreams::Clock; use graph::{ blockchain::{ @@ -243,7 +245,7 @@ impl BlockRefetcher for EthereumBlockRefetcher { logger: &Logger, cursor: FirehoseCursor, ) -> Result { - let endpoint = chain.chain_client().firehose_endpoint().await?; + let endpoint: Arc = chain.chain_client().firehose_endpoint().await?; let block = endpoint.get_block::(cursor, logger).await?; let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?; Ok(BlockFinality::NonFinal(ethereum_block)) @@ -714,13 +716,17 @@ impl Block for BlockFinality { } fn timestamp(&self) -> BlockTime { - let ts = match self { - BlockFinality::Final(block) => block.timestamp, - BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp, + match self { + BlockFinality::Final(block) => { + let ts = i64::try_from(block.timestamp.as_u64()).unwrap(); + BlockTime::since_epoch(ts, 0) + } + BlockFinality::NonFinal(block) => { + let ts = i64::try_from(block.ethereum_block.block.timestamp.as_u64()).unwrap(); + BlockTime::since_epoch(ts, 0) + } BlockFinality::Ptr(block) => block.timestamp, - }; - let ts = i64::try_from(ts.as_u64()).unwrap(); - BlockTime::since_epoch(ts, 0) + } } } @@ -735,6 +741,61 @@ pub struct TriggersAdapter { unified_api_version: UnifiedMappingApiVersion, } +/// Fetches blocks from the cache based on block numbers, excluding duplicates +/// (i.e., multiple blocks for the same number), and identifying missing blocks that +/// need to be fetched via RPC/Firehose. Returns a tuple of the found blocks and the missing block numbers. +async fn fetch_unique_blocks_from_cache( + logger: &Logger, + chain_store: Arc, + block_numbers: HashSet, +) -> (Vec>, Vec) { + // Load blocks from the cache + let blocks_map = chain_store + .cheap_clone() + .block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) + .await + .map_err(|e| { + error!(logger, "Error accessing block cache {}", e); + e + }) + .unwrap_or_default(); + + // Collect blocks and filter out ones with multiple entries + let blocks: Vec> = blocks_map + .into_iter() + .filter_map(|(number, values)| { + if values.len() == 1 { + Some(Arc::new(values[0].clone())) + } else { + warn!( + logger, + "Expected one block for block number {:?}, found {}", + number, + values.len() + ); + None + } + }) + .collect(); + + // Identify missing blocks + let missing_blocks: Vec = block_numbers + .into_iter() + .filter(|&number| !blocks.iter().any(|block| block.block_number() == number)) + .collect(); + + if !missing_blocks.is_empty() { + debug!( + logger, + "Loading {} block(s) not in the block cache", + missing_blocks.len() + ); + debug!(logger, "Missing blocks {:?}", missing_blocks); + } + + (blocks, missing_blocks) +} + #[async_trait] impl TriggersAdapterTrait for TriggersAdapter { async fn scan_triggers( @@ -764,21 +825,71 @@ impl TriggersAdapterTrait for TriggersAdapter { logger: Logger, block_numbers: HashSet, ) -> Result> { - use graph::futures01::stream::Stream; + let blocks = match &*self.chain_client { + ChainClient::Firehose(endpoints, _) => { + let endpoint = endpoints.endpoint().await?; + let chain_store = self.chain_store.clone(); + + // Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so + // that we can fetch the right block from the RPC. + let (mut cached_blocks, missing_block_numbers) = + fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await; + + // Then fetch missing blocks from RPC + if !missing_block_numbers.is_empty() { + let missing_blocks = endpoint + .load_blocks_by_numbers::( + missing_block_numbers.iter().map(|&n| n as u64).collect(), + &logger, + ) + .await? + .into_iter() + .map(|block| { + let block: BlockPtrExt = BlockPtrExt { + hash: block.hash(), + number: block.number(), + parent_hash: block.parent_hash().unwrap_or_default(), + timestamp: block.timestamp(), + }; + Arc::new(block) + }) + .collect::>(); + + // Combine cached and newly fetched blocks + cached_blocks.extend(missing_blocks); + cached_blocks.sort_by_key(|block| block.number); + } - let adapter = self - .chain_client - .rpc()? - .cheapest_with(&self.capabilities) - .await?; + vec![] + } + ChainClient::Rpc(client) => { + let adapter = client.cheapest_with(&self.capabilities).await?; + let chain_store = self.chain_store.clone(); + + // Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so + // that we can fetch the right block from the RPC. + let (mut cached_blocks, missing_block_numbers) = + fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await; + + // Then fetch missing blocks from RPC + if !missing_block_numbers.is_empty() { + let missing_blocks: Vec> = adapter + .load_block_ptrs_by_numbers_rpc(logger.clone(), missing_block_numbers) + .try_collect() + .await?; + + // Combine cached and newly fetched blocks + cached_blocks.extend(missing_blocks); + cached_blocks.sort_by_key(|block| block.number); + } - let blocks = adapter - .load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers) - .await - .map(|block| BlockFinality::Ptr(block)) - .collect() - .compat() - .await?; + // Convert to BlockFinality + let blocks: Vec = + cached_blocks.into_iter().map(BlockFinality::Ptr).collect(); + + blocks + } + }; Ok(blocks) } diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 906aed29d09..afb5c22f66d 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -784,50 +784,59 @@ impl EthereumAdapter { } /// Request blocks by number through JSON-RPC. - fn load_block_ptrs_by_numbers_rpc( + pub fn load_block_ptrs_by_numbers_rpc( &self, logger: Logger, numbers: Vec, - ) -> impl Stream, Error = Error> + Send { + ) -> impl futures03::Stream, Error>> + Send { let web3 = self.web3.clone(); - stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| { + futures03::stream::iter(numbers.into_iter().map(move |number| { let web3 = web3.clone(); - retry(format!("load block {}", number), &logger) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) - .run(move || { - Box::pin( - web3.eth() - .block(BlockId::Number(Web3BlockNumber::Number(number.into()))), - ) - .compat() - .from_err::() - .and_then(move |block| { - block - .map(|block| { - let ptr = BlockPtrExt::try_from(( - block.hash, - block.number, - block.parent_hash, - block.timestamp, - )) - .unwrap(); - - Arc::new(ptr) - }) - .ok_or_else(|| { - anyhow::anyhow!( + let logger = logger.clone(); + + async move { + retry(format!("load block {}", number), &logger) + .limit(ENV_VARS.request_retries) + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.clone(); // Clone web3 again for the inner closure + + async move { + let block_result = web3 + .eth() + .block(BlockId::Number(Web3BlockNumber::Number(number.into()))) + .await; + + match block_result { + Ok(Some(block)) => { + let ptr = BlockPtrExt::try_from(( + block.hash, + block.number, + block.parent_hash, + block.timestamp, + )) + .map_err(|e| { + anyhow::anyhow!("Failed to convert block: {}", e) + })?; + Ok(Arc::new(ptr)) + } + Ok(None) => Err(anyhow::anyhow!( "Ethereum node did not find block with number {:?}", number - ) - }) + )), + Err(e) => Err(anyhow::anyhow!("Failed to fetch block: {}", e)), + } + } }) - .compat() - }) - .boxed() - .compat() - .from_err() + .await + .map_err(|e| match e { + TimeoutError::Elapsed => { + anyhow::anyhow!("Timeout while fetching block {}", number) + } + TimeoutError::Inner(e) => e, + }) + } })) .buffered(ENV_VARS.block_ptr_batch_size) } @@ -1700,67 +1709,6 @@ impl EthereumAdapterTrait for EthereumAdapter { Ok(decoded) } - /// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream. - async fn load_block_ptrs_by_numbers( - &self, - logger: Logger, - chain_store: Arc, - block_numbers: HashSet, - ) -> Box, Error = Error> + Send> { - let blocks_map = chain_store - .cheap_clone() - .block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) - .await - .map_err(|e| { - error!(&logger, "Error accessing block cache {}", e); - e - }) - .unwrap_or_default(); - - let mut blocks: Vec> = blocks_map - .into_iter() - .filter_map(|(number, values)| { - if values.len() == 1 { - Arc::new(values[0].clone()).into() - } else { - warn!( - &logger, - "Expected one block for block number {:?}, found {}", - number, - values.len() - ); - None - } - }) - .collect::>(); - - let missing_blocks: Vec = block_numbers - .into_iter() - .filter(|&number| !blocks.iter().any(|block| block.block_number() == number)) - .collect(); - - if !missing_blocks.is_empty() { - debug!( - logger, - "Loading {} block(s) not in the block cache", - missing_blocks.len() - ); - - debug!(logger, "Missing blocks {:?}", missing_blocks); - } - - Box::new( - self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks) - .collect() - .map(move |new_blocks| { - blocks.extend(new_blocks); - blocks.sort_by_key(|block| block.number); - stream::iter_ok(blocks) - }) - .flatten_stream(), - ) - } - /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. async fn load_blocks( &self, diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index 2fa04a6e41c..7186dccf42b 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -362,7 +362,7 @@ pub struct BlockPtrExt { #[serde(deserialize_with = "deserialize_block_number")] pub number: BlockNumber, pub parent_hash: BlockHash, - pub timestamp: U256, + pub timestamp: BlockTime, } impl BlockPtrExt { @@ -370,7 +370,7 @@ impl BlockPtrExt { hash: BlockHash, number: BlockNumber, parent_hash: BlockHash, - timestamp: U256, + timestamp: BlockTime, ) -> Self { Self { hash, @@ -464,7 +464,7 @@ impl TryFrom<(Option, Option, H256, U256)> for BlockPtrExt { type Error = anyhow::Error; fn try_from(tuple: (Option, Option, H256, U256)) -> Result { - let (hash_opt, number_opt, parent_hash, timestamp) = tuple; + let (hash_opt, number_opt, parent_hash, timestamp_u256) = tuple; let hash = hash_opt.ok_or_else(|| anyhow!("Block hash is missing"))?; let number = number_opt @@ -474,11 +474,16 @@ impl TryFrom<(Option, Option, H256, U256)> for BlockPtrExt { let block_number = i32::try_from(number).map_err(|_| anyhow!("Block number out of range"))?; + // Convert `U256` to `BlockTime` + let secs = + i64::try_from(timestamp_u256).map_err(|_| anyhow!("Timestamp out of range for i64"))?; + let block_time = BlockTime::since_epoch(secs, 0); + Ok(BlockPtrExt { hash: hash.into(), number: block_number, parent_hash: parent_hash.into(), - timestamp, + timestamp: block_time, }) } } @@ -487,13 +492,18 @@ impl TryFrom<(H256, i32, H256, U256)> for BlockPtrExt { type Error = anyhow::Error; fn try_from(tuple: (H256, i32, H256, U256)) -> Result { - let (hash, block_number, parent_hash, timestamp) = tuple; + let (hash, block_number, parent_hash, timestamp_u256) = tuple; + + // Convert `U256` to `BlockTime` + let secs = + i64::try_from(timestamp_u256).map_err(|_| anyhow!("Timestamp out of range for i64"))?; + let block_time = BlockTime::since_epoch(secs, 0); Ok(BlockPtrExt { hash: hash.into(), number: block_number, parent_hash: parent_hash.into(), - timestamp, + timestamp: block_time, }) } } @@ -543,7 +553,9 @@ impl fmt::Display for ChainIdentifier { /// The timestamp associated with a block. This is used whenever a time /// needs to be connected to data within the block -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, FromSqlRow, AsExpression)] +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, FromSqlRow, AsExpression, Deserialize, +)] #[diesel(sql_type = Timestamptz)] pub struct BlockTime(Timestamp); @@ -619,6 +631,12 @@ impl ToSql for BlockTime { } } +impl fmt::Display for BlockTime { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0.as_microseconds_since_epoch()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index ef00ec53c03..b34fff1d860 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -25,7 +25,7 @@ use futures03::StreamExt; use http0::uri::{Scheme, Uri}; use itertools::Itertools; use prost::Message; -use slog::Logger; +use slog::{error, Logger}; use std::{ collections::HashMap, fmt::Display, marker::PhantomData, ops::ControlFlow, str::FromStr, sync::Arc, time::Duration, @@ -509,6 +509,75 @@ impl FirehoseEndpoint { } } + pub async fn get_block_by_number( + &self, + number: u64, + logger: &Logger, + ) -> Result + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + debug!( + logger, + "Connecting to firehose to retrieve block for number {}", number; + "provider" => self.provider.as_str(), + ); + + let req = firehose::SingleBlockRequest { + transforms: [].to_vec(), + reference: Some(firehose::single_block_request::Reference::BlockNumber( + firehose::single_block_request::BlockNumber { num: number }, + )), + }; + + let mut client = self.new_client(); + match client.block(req).await { + Ok(v) => Ok(M::decode( + v.get_ref().block.as_ref().unwrap().value.as_ref(), + )?), + Err(e) => return Err(anyhow::format_err!("firehose error {}", e)), + } + } + + pub async fn load_blocks_by_numbers( + &self, + numbers: Vec, + logger: &Logger, + ) -> Result, anyhow::Error> + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + let mut blocks = Vec::with_capacity(numbers.len()); + + for number in numbers { + debug!( + logger, + "Loading block for block number {}", number; + "provider" => self.provider.as_str(), + ); + + match self.get_block_by_number::(number, logger).await { + Ok(block) => { + blocks.push(block); + } + Err(e) => { + error!( + logger, + "Failed to load block number {}: {}", number, e; + "provider" => self.provider.as_str(), + ); + return Err(anyhow::format_err!( + "failed to load block number {}: {}", + number, + e + )); + } + } + } + + Ok(blocks) + } + pub async fn genesis_block_ptr(&self, logger: &Logger) -> Result where M: prost::Message + BlockchainBlock + Default + 'static,