Skip to content

Commit

Permalink
chain/ethereum: refactor load_block_ptrs_by_numbers and add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Nov 22, 2024
1 parent 55ff516 commit f95df0a
Showing 1 changed file with 162 additions and 27 deletions.
189 changes: 162 additions & 27 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,32 @@ async fn fetch_unique_blocks_from_cache(
(blocks, missing_blocks)
}

/// Fetches blocks by their numbers, first attempting to load from cache.
/// Missing blocks are retrieved from an external source, with all blocks sorted and converted to `BlockFinality` format.
async fn load_blocks<F, Fut>(
logger: &Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
fetch_missing: F,
) -> Result<Vec<BlockFinality>>
where
F: FnOnce(Vec<BlockNumber>) -> Fut,
Fut: Future<Output = Result<Vec<Arc<ExtendedBlockPtr>>>>,
{
// Fetch cached blocks and identify missing ones
let (mut cached_blocks, missing_block_numbers) =
fetch_unique_blocks_from_cache(logger, chain_store, block_numbers).await;

// Fetch missing blocks if any
if !missing_block_numbers.is_empty() {
let missing_blocks = fetch_missing(missing_block_numbers).await?;
cached_blocks.extend(missing_blocks);
cached_blocks.sort_by_key(|block| block.number);
}

Ok(cached_blocks.into_iter().map(BlockFinality::Ptr).collect())
}

#[async_trait]
impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn scan_triggers(
Expand Down Expand Up @@ -819,32 +845,6 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
logger: Logger,
block_numbers: HashSet<BlockNumber>,
) -> Result<Vec<BlockFinality>> {
// Common function to handle block loading, regardless of source
async fn load_blocks<F, Fut>(
logger: &Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
fetch_missing: F,
) -> Result<Vec<BlockFinality>>
where
F: FnOnce(Vec<BlockNumber>) -> Fut,
Fut: Future<Output = Result<Vec<Arc<BlockPtrExt>>>>,
{
// Fetch cached blocks and identify missing ones
let (mut cached_blocks, missing_block_numbers) =
fetch_unique_blocks_from_cache(logger, chain_store, block_numbers).await;

// Fetch missing blocks if any
if !missing_block_numbers.is_empty() {
let missing_blocks = fetch_missing(missing_block_numbers).await?;
cached_blocks.extend(missing_blocks);
cached_blocks.sort_by_key(|block| block.number);
}

// Convert to BlockFinality
Ok(cached_blocks.into_iter().map(BlockFinality::Ptr).collect())
}

match &*self.chain_client {
ChainClient::Firehose(endpoints, _) => {
trace!(
Expand All @@ -870,7 +870,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
.await?
.into_iter()
.map(|block| {
Arc::new(BlockPtrExt {
Arc::new(ExtendedBlockPtr {
hash: block.hash(),
number: block.number(),
parent_hash: block.parent_hash().unwrap_or_default(),
Expand Down Expand Up @@ -1193,3 +1193,138 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
.await
}
}

#[cfg(test)]
mod tests {
use graph::blockchain::mock::MockChainStore;
use graph::{slog, tokio};

use super::*;
use std::collections::HashSet;
use std::sync::Arc;

// Helper function to create test blocks
fn create_test_block(number: BlockNumber, hash: &str) -> ExtendedBlockPtr {
let hash = BlockHash(hash.as_bytes().to_vec().into_boxed_slice());
let ptr = BlockPtr::new(hash.clone(), number);
ExtendedBlockPtr {
hash,
number,
parent_hash: BlockHash(vec![0; 32].into_boxed_slice()),
timestamp: BlockTime::for_test(&ptr),
}
}

#[tokio::test]
async fn test_fetch_unique_blocks_single_block() {
let logger = Logger::root(slog::Discard, o!());
let mut chain_store = MockChainStore::default();

// Add a single block
let block = create_test_block(1, "block1");
chain_store.blocks.insert(1, vec![block.clone()]);

let block_numbers: HashSet<_> = vec![1].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;

assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0].number, 1);
assert!(missing.is_empty());
}

#[tokio::test]
async fn test_fetch_unique_blocks_duplicate_blocks() {
let logger = Logger::root(slog::Discard, o!());
let mut chain_store = MockChainStore::default();

// Add multiple blocks for the same number
let block1 = create_test_block(1, "block1a");
let block2 = create_test_block(1, "block1b");
chain_store
.blocks
.insert(1, vec![block1.clone(), block2.clone()]);

let block_numbers: HashSet<_> = vec![1].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;

// Should filter out the duplicate block
assert!(blocks.is_empty());
assert_eq!(missing, vec![1]);
assert_eq!(missing[0], 1);
}

#[tokio::test]
async fn test_fetch_unique_blocks_missing_blocks() {
let logger = Logger::root(slog::Discard, o!());
let mut chain_store = MockChainStore::default();

// Add block number 1 but not 2
let block = create_test_block(1, "block1");
chain_store.blocks.insert(1, vec![block.clone()]);

let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;

assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0].number, 1);
assert_eq!(missing, vec![2]);
}

#[tokio::test]
async fn test_fetch_unique_blocks_multiple_valid_blocks() {
let logger = Logger::root(slog::Discard, o!());
let mut chain_store = MockChainStore::default();

// Add multiple valid blocks
let block1 = create_test_block(1, "block1");
let block2 = create_test_block(2, "block2");
chain_store.blocks.insert(1, vec![block1.clone()]);
chain_store.blocks.insert(2, vec![block2.clone()]);

let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;

assert_eq!(blocks.len(), 2);
assert!(blocks.iter().any(|b| b.number == 1));
assert!(blocks.iter().any(|b| b.number == 2));
assert!(missing.is_empty());
}

#[tokio::test]
async fn test_fetch_unique_blocks_mixed_scenario() {
let logger = Logger::root(slog::Discard, o!());
let mut chain_store = MockChainStore::default();

// Add a mix of scenarios:
// - Block 1: Single valid block
// - Block 2: Multiple blocks (duplicate)
// - Block 3: Missing
let block1 = create_test_block(1, "block1");
let block2a = create_test_block(2, "block2a");
let block2b = create_test_block(2, "block2b");

chain_store.blocks.insert(1, vec![block1.clone()]);
chain_store
.blocks
.insert(2, vec![block2a.clone(), block2b.clone()]);

let block_numbers: HashSet<_> = vec![1, 2, 3].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;

assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0].number, 1);
assert_eq!(missing.len(), 2);
assert!(missing.contains(&2));
assert!(missing.contains(&3));
}
}

0 comments on commit f95df0a

Please sign in to comment.