Use firehose get_block to get block ptrs for subgraph datasources #5702

Closed
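
At a high level, this PR routes block-pointer loading for subgraph datasources through the chain-level triggers adapter: requested block numbers are looked up in the block cache first, any misses are fetched over the network (Firehose `get_block` when a Firehose endpoint is configured, JSON-RPC otherwise), and the merged, sorted result is returned as lightweight `BlockFinality::Ptr` values instead of full blocks. A condensed, self-contained sketch of that flow, using simplified stand-in names rather than the identifiers in the diff below:

```rust
// Sketch only: BlockPtr, Finality, and the two fetch helpers are simplified
// stand-ins for BlockPtrExt, BlockFinality, fetch_unique_blocks_from_cache,
// and the Firehose/RPC fetch paths in the real diff.
struct BlockPtr {
    number: u64,
    hash: String,
}

enum Finality {
    Ptr(BlockPtr),
}

async fn fetch_from_cache(numbers: &[u64]) -> (Vec<BlockPtr>, Vec<u64>) {
    // Pretend nothing is cached; the real code consults the ChainStore.
    (Vec::new(), numbers.to_vec())
}

async fn fetch_missing(missing: &[u64]) -> anyhow::Result<Vec<BlockPtr>> {
    // The real code goes through Firehose get_block or JSON-RPC here.
    Ok(missing
        .iter()
        .map(|&n| BlockPtr { number: n, hash: format!("0x{n:x}") })
        .collect())
}

async fn load_block_ptrs(numbers: Vec<u64>) -> anyhow::Result<Vec<Finality>> {
    // 1. Serve as much as possible from the block cache.
    let (mut ptrs, missing) = fetch_from_cache(&numbers).await;

    // 2. Fetch the rest over the network, then keep the result ordered by number.
    if !missing.is_empty() {
        ptrs.extend(fetch_missing(&missing).await?);
        ptrs.sort_by_key(|ptr| ptr.number);
    }

    // 3. Hand back block pointers rather than full blocks.
    Ok(ptrs.into_iter().map(Finality::Ptr).collect())
}
```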
8 changes: 0 additions & 8 deletions chain/ethereum/src/adapter.rs
@@ -1,6 +1,5 @@
use anyhow::Error;
use ethabi::{Error as ABIError, Function, ParamType, Token};
use graph::blockchain::BlockPtrExt;
use graph::blockchain::ChainIdentifier;
use graph::components::subgraph::MappingError;
use graph::data::store::ethereum::call;
@@ -1110,13 +1109,6 @@ pub trait EthereumAdapter: Send + Sync + 'static {
block_hash: H256,
) -> Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;

async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_chain_store: Arc<dyn ChainStore>,
_block_numbers: HashSet<BlockNumber>,
) -> Box<dyn Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send>;

/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
/// May use the `chain_store` as a cache.
async fn load_blocks(
151 changes: 131 additions & 20 deletions chain/ethereum/src/chain.rs
@@ -11,11 +11,13 @@ use graph::components::store::{DeploymentCursorTracker, WritableStore};
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
use graph::futures03::compat::Future01CompatExt;
use graph::futures03::TryStreamExt;
use graph::prelude::{
BlockHash, ComponentLoggerConfig, DeploymentHash, ElasticComponentLoggerConfig, EthereumBlock,
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
};
use graph::schema::InputSchema;
use graph::slog::{debug, error, warn};
use graph::substreams::Clock;
use graph::{
blockchain::{
@@ -243,7 +245,7 @@ impl BlockRefetcher<Chain> for EthereumBlockRefetcher {
logger: &Logger,
cursor: FirehoseCursor,
) -> Result<BlockFinality, Error> {
let endpoint = chain.chain_client().firehose_endpoint().await?;
let endpoint: Arc<FirehoseEndpoint> = chain.chain_client().firehose_endpoint().await?;
let block = endpoint.get_block::<codec::Block>(cursor, logger).await?;
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
Ok(BlockFinality::NonFinal(ethereum_block))
@@ -714,13 +716,17 @@ impl Block for BlockFinality {
}

fn timestamp(&self) -> BlockTime {
let ts = match self {
BlockFinality::Final(block) => block.timestamp,
BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp,
match self {
BlockFinality::Final(block) => {
let ts = i64::try_from(block.timestamp.as_u64()).unwrap();
BlockTime::since_epoch(ts, 0)
}
BlockFinality::NonFinal(block) => {
let ts = i64::try_from(block.ethereum_block.block.timestamp.as_u64()).unwrap();
BlockTime::since_epoch(ts, 0)
}
BlockFinality::Ptr(block) => block.timestamp,
};
let ts = i64::try_from(ts.as_u64()).unwrap();
BlockTime::since_epoch(ts, 0)
}
}
}

@@ -735,6 +741,61 @@ pub struct TriggersAdapter {
unified_api_version: UnifiedMappingApiVersion,
}

/// Fetches blocks from the cache based on block numbers, excluding duplicates
/// (i.e., multiple blocks for the same number), and identifies missing blocks
/// that need to be fetched via RPC/Firehose. Returns a tuple of the found
/// blocks and the missing block numbers.
async fn fetch_unique_blocks_from_cache(
logger: &Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
) -> (Vec<Arc<BlockPtrExt>>, Vec<i32>) {
// Load blocks from the cache
let blocks_map = chain_store
.cheap_clone()
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
.await
.map_err(|e| {
error!(logger, "Error accessing block cache {}", e);
e
})
.unwrap_or_default();

// Collect blocks and filter out ones with multiple entries
let blocks: Vec<Arc<BlockPtrExt>> = blocks_map
.into_iter()
.filter_map(|(number, values)| {
if values.len() == 1 {
Some(Arc::new(values[0].clone()))
} else {
warn!(
logger,
"Expected one block for block number {:?}, found {}",
number,
values.len()
);
None
}
})
.collect();

// Identify missing blocks
let missing_blocks: Vec<i32> = block_numbers
.into_iter()
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
.collect();

if !missing_blocks.is_empty() {
debug!(
logger,
"Loading {} block(s) not in the block cache",
missing_blocks.len()
);
debug!(logger, "Missing blocks {:?}", missing_blocks);
}

(blocks, missing_blocks)
}
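
The helper above treats a block number as a cache hit only when the store returns exactly one candidate block for it; numbers with zero or several candidates are reported as missing so that, per the comments further down, the right block can be fetched from the network instead. A small self-contained sketch of that hit/miss split over a plain `HashMap` (hypothetical types, not the `ChainStore` API):

```rust
use std::collections::{HashMap, HashSet};

/// Sketch of the split: `cache` maps a block number to the candidate block
/// hashes the store knows for it.
fn split_cached(
    cache: &HashMap<i32, Vec<String>>,
    requested: &HashSet<i32>,
) -> (Vec<(i32, String)>, Vec<i32>) {
    let hits: Vec<(i32, String)> = requested
        .iter()
        .filter_map(|n| match cache.get(n).map(Vec::as_slice) {
            // Exactly one candidate: safe to use the cached block.
            Some([hash]) => Some((*n, hash.clone())),
            // Zero or several candidates: let the caller refetch this number.
            _ => None,
        })
        .collect();

    let missing: Vec<i32> = requested
        .iter()
        .copied()
        .filter(|n| !hits.iter().any(|(hit, _)| hit == n))
        .collect();

    (hits, missing)
}
```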

#[async_trait]
impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn scan_triggers(
@@ -764,21 +825,71 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
logger: Logger,
block_numbers: HashSet<BlockNumber>,
) -> Result<Vec<BlockFinality>> {
use graph::futures01::stream::Stream;
let blocks = match &*self.chain_client {
ChainClient::Firehose(endpoints, _) => {
let endpoint = endpoints.endpoint().await?;
let chain_store = self.chain_store.clone();

// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
// that we can fetch the right block from Firehose.
let (mut cached_blocks, missing_block_numbers) =
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;

// Then fetch missing blocks from Firehose
if !missing_block_numbers.is_empty() {
let missing_blocks = endpoint
.load_blocks_by_numbers::<codec::Block>(
missing_block_numbers.iter().map(|&n| n as u64).collect(),
&logger,
)
.await?
.into_iter()
.map(|block| {
let block: BlockPtrExt = BlockPtrExt {
hash: block.hash(),
number: block.number(),
parent_hash: block.parent_hash().unwrap_or_default(),
timestamp: block.timestamp(),
};
Arc::new(block)
})
.collect::<Vec<_>>();

// Combine cached and newly fetched blocks
cached_blocks.extend(missing_blocks);
cached_blocks.sort_by_key(|block| block.number);
}

let adapter = self
.chain_client
.rpc()?
.cheapest_with(&self.capabilities)
.await?;
vec![]
}
ChainClient::Rpc(client) => {
let adapter = client.cheapest_with(&self.capabilities).await?;
let chain_store = self.chain_store.clone();

// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
// that we can fetch the right block from the RPC.
let (mut cached_blocks, missing_block_numbers) =
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;

// Then fetch missing blocks from RPC
if !missing_block_numbers.is_empty() {
let missing_blocks: Vec<Arc<BlockPtrExt>> = adapter
.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_block_numbers)
.try_collect()
.await?;

// Combine cached and newly fetched blocks
cached_blocks.extend(missing_blocks);
cached_blocks.sort_by_key(|block| block.number);
}

let blocks = adapter
.load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers)
.await
.map(|block| BlockFinality::Ptr(block))
.collect()
.compat()
.await?;
// Convert to BlockFinality
let blocks: Vec<BlockFinality> =
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();

blocks
}
};

Ok(blocks)
}
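
Each branch merges the cached and freshly fetched pointers and sorts them by block number; the RPC branch obtains its network results by draining the new fallible stream with `try_collect` before everything is wrapped in `BlockFinality::Ptr`. A minimal self-contained sketch of that consumption step, using stand-in types rather than the graph-node ones:

```rust
use futures::stream::{self, TryStreamExt};

// Stand-in for BlockFinality::Ptr(Arc<BlockPtrExt>).
#[derive(Debug)]
enum Finality {
    Ptr(u64),
}

async fn collect_ptrs() -> anyhow::Result<Vec<Finality>> {
    // Stand-in for the Result stream returned by load_block_ptrs_by_numbers_rpc.
    let ptr_stream = stream::iter(vec![Ok::<u64, anyhow::Error>(105), Ok(100), Ok(101)]);

    // try_collect stops at the first Err and propagates it via `?`.
    let mut ptrs: Vec<u64> = ptr_stream.try_collect().await?;
    ptrs.sort_unstable();

    // Wrap the sorted pointers the same way the branches above wrap them.
    Ok(ptrs.into_iter().map(Finality::Ptr).collect())
}
```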
140 changes: 44 additions & 96 deletions chain/ethereum/src/ethereum_adapter.rs
@@ -784,50 +784,59 @@ impl EthereumAdapter {
}

/// Request blocks by number through JSON-RPC.
fn load_block_ptrs_by_numbers_rpc(
pub fn load_block_ptrs_by_numbers_rpc(
&self,
logger: Logger,
numbers: Vec<BlockNumber>,
) -> impl Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send {
) -> impl futures03::Stream<Item = Result<Arc<BlockPtrExt>, Error>> + Send {
let web3 = self.web3.clone();

stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
futures03::stream::iter(numbers.into_iter().map(move |number| {
let web3 = web3.clone();
retry(format!("load block {}", number), &logger)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Box::pin(
web3.eth()
.block(BlockId::Number(Web3BlockNumber::Number(number.into()))),
)
.compat()
.from_err::<Error>()
.and_then(move |block| {
block
.map(|block| {
let ptr = BlockPtrExt::try_from((
block.hash,
block.number,
block.parent_hash,
block.timestamp,
))
.unwrap();

Arc::new(ptr)
})
.ok_or_else(|| {
anyhow::anyhow!(
let logger = logger.clone();

async move {
retry(format!("load block {}", number), &logger)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
let web3 = web3.clone(); // Clone web3 again for the inner closure

async move {
let block_result = web3
.eth()
.block(BlockId::Number(Web3BlockNumber::Number(number.into())))
.await;

match block_result {
Ok(Some(block)) => {
let ptr = BlockPtrExt::try_from((
block.hash,
block.number,
block.parent_hash,
block.timestamp,
))
.map_err(|e| {
anyhow::anyhow!("Failed to convert block: {}", e)
})?;
Ok(Arc::new(ptr))
}
Ok(None) => Err(anyhow::anyhow!(
"Ethereum node did not find block with number {:?}",
number
)
})
)),
Err(e) => Err(anyhow::anyhow!("Failed to fetch block: {}", e)),
}
}
})
.compat()
})
.boxed()
.compat()
.from_err()
.await
.map_err(|e| match e {
TimeoutError::Elapsed => {
anyhow::anyhow!("Timeout while fetching block {}", number)
}
TimeoutError::Inner(e) => e,
})
}
}))
.buffered(ENV_VARS.block_ptr_batch_size)
}
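
The rewritten `load_block_ptrs_by_numbers_rpc` now returns a `futures03` stream: each block number becomes an async lookup with its own retry and timeout, and `buffered` keeps at most `ENV_VARS.block_ptr_batch_size` lookups in flight at a time while preserving output order. A minimal self-contained sketch of that stream shape (the fetch helper, batch-size constant, and `(u64, String)` pointer type are illustrative assumptions, and the retry/timeout plumbing is elided):

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

const BATCH_SIZE: usize = 10; // stand-in for ENV_VARS.block_ptr_batch_size

// Stand-in for one retried, timed-out eth_getBlockByNumber call.
async fn fetch_block_ptr(number: u64) -> anyhow::Result<(u64, String)> {
    Ok((number, format!("hash-of-{number}")))
}

async fn load_ptrs(numbers: Vec<u64>) -> anyhow::Result<Vec<(u64, String)>> {
    stream::iter(numbers.into_iter().map(fetch_block_ptr)) // a stream of futures
        .buffered(BATCH_SIZE) // at most BATCH_SIZE lookups in flight, output order preserved
        .try_collect() // surface the first error, otherwise collect every pointer
        .await
}
```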
@@ -1700,67 +1709,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
Ok(decoded)
}

/// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
async fn load_block_ptrs_by_numbers(
&self,
logger: Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
) -> Box<dyn Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send> {
let blocks_map = chain_store
.cheap_clone()
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
.await
.map_err(|e| {
error!(&logger, "Error accessing block cache {}", e);
e
})
.unwrap_or_default();

let mut blocks: Vec<Arc<BlockPtrExt>> = blocks_map
.into_iter()
.filter_map(|(number, values)| {
if values.len() == 1 {
Arc::new(values[0].clone()).into()
} else {
warn!(
&logger,
"Expected one block for block number {:?}, found {}",
number,
values.len()
);
None
}
})
.collect::<Vec<_>>();

let missing_blocks: Vec<i32> = block_numbers
.into_iter()
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
.collect();

if !missing_blocks.is_empty() {
debug!(
logger,
"Loading {} block(s) not in the block cache",
missing_blocks.len()
);

debug!(logger, "Missing blocks {:?}", missing_blocks);
}

Box::new(
self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks)
.collect()
.map(move |new_blocks| {
blocks.extend(new_blocks);
blocks.sort_by_key(|block| block.number);
stream::iter_ok(blocks)
})
.flatten_stream(),
)
}

/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
async fn load_blocks(
&self,