From f189986bde04c120a15157d180b8bc726d6953e5 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 24 Dec 2024 18:59:46 +0100 Subject: [PATCH] concurrently fetch all blocks in a range --- core/chains/evm/logpoller/log_poller.go | 117 +++++++++++++++---- core/chains/evm/logpoller/log_poller_test.go | 22 ++-- core/chains/evm/logpoller/orm.go | 1 + core/chains/evm/logpoller/orm_test.go | 73 ++++++++---- 4 files changed, 156 insertions(+), 57 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index eeebc3d7f26..7c8482cef93 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -115,7 +115,7 @@ type logPoller struct { finalityDepth int64 // finality depth is taken to mean that block (head - finality) is finalized. If `useFinalityTag` is set to true, this value is ignored, because finalityDepth is fetched from chain keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database backfillBatchSize int64 // batch size to use when backfilling finalized logs - rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks + rpcBatchSize int // batch size to use for fallback RPC calls made in GetBlocks logPrunePageSize int64 clientErrors config.ClientErrors backupPollerNextBlock int64 // next block to be processed by Backup LogPoller @@ -176,7 +176,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke finalityDepth: opts.FinalityDepth, useFinalityTag: opts.UseFinalityTag, backfillBatchSize: opts.BackfillBatchSize, - rpcBatchSize: opts.RpcBatchSize, + rpcBatchSize: int(opts.RpcBatchSize), keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, logPrunePageSize: opts.LogPrunePageSize, clientErrors: opts.ClientErrors, @@ -850,15 +850,15 @@ func convertTopics(topics []common.Hash) [][]byte { return topicsForDB } -// blocksFromLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber, -// whether or not there are any logs in the list from that block -func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) { - var numbers []uint64 - for _, log := range logs { - numbers = append(numbers, log.BlockNumber) - } - if numbers[len(numbers)-1] != endBlockNumber { - numbers = append(numbers, endBlockNumber) +// blocksForLogs fetches all the blocks starting from block of the first log to the endBlockNumber. +// endBlockNumber is always fetched even if logs slice is empty. +func (lp *logPoller) blocksForLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) { + numbers := []uint64{endBlockNumber} + if len(logs) > 0 { + numbers = make([]uint64, 0, endBlockNumber-logs[0].BlockNumber+1) + for i := logs[0].BlockNumber; i <= endBlockNumber; i++ { + numbers = append(numbers, i) + } } return lp.GetBlocksRange(ctx, numbers) } @@ -891,7 +891,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { if len(gethLogs) == 0 { continue } - blocks, err := lp.blocksFromLogs(ctx, gethLogs, uint64(to)) + blocks, err := lp.blocksForLogs(ctx, gethLogs, uint64(to)) if err != nil { return err } @@ -1366,6 +1366,7 @@ func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]Lo // fillRemainingBlocksFromRPC sends a batch request for each block in blocksRequested, and converts them from // geth blocks into LogPollerBlock structs. This is only intended to be used for requesting finalized blocks, // if any of the blocks coming back are not finalized, an error will be returned +// NOTE: Does not guarantee to keep return blocks in the same order as requested func (lp *logPoller) fillRemainingBlocksFromRPC( ctx context.Context, blocksRequested map[uint64]struct{}, @@ -1379,7 +1380,7 @@ func (lp *logPoller) fillRemainingBlocksFromRPC( } } - if len(remainingBlocks) > 0 { + if len(remainingBlocks) == 0 { lp.lggr.Debugw("Falling back to RPC for blocks not found in log poller blocks table", "remainingBlocks", remainingBlocks) } @@ -1394,6 +1395,7 @@ func (lp *logPoller) fillRemainingBlocksFromRPC( logPollerBlocks[uint64(head.Number)] = LogPollerBlock{ EvmChainId: head.EVMChainID, BlockHash: head.Hash, + ParentBlockHash: &head.ParentHash, BlockNumber: head.Number, BlockTimestamp: head.Timestamp, FinalizedBlockNumber: head.Number, // always finalized; only matters if this block is returned by LatestBlock() @@ -1482,24 +1484,97 @@ func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string, return blocks, nil } -func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) { - var blocks = make([]*evmtypes.Head, 0, len(blocksRequested)+1) - +func (lp *logPoller) startBlocksFetching(ctx context.Context, wg *sync.WaitGroup, requests <-chan []string, results chan []*evmtypes.Head, errs chan<- error) { + defer wg.Done() validationReq := finalizedBlock if !lp.useFinalityTag { validationReq = latestBlock } + for { + select { + case <-ctx.Done(): + return + case request, ok := <-requests: + if !ok { + return + } + result, err := lp.fetchBlocks(ctx, request, validationReq) + if err != nil { + select { + case errs <- fmt.Errorf("failed to fetch blocks %v: %w", request, err): + return + case <-ctx.Done(): + return + } + } + + select { + case results <- result: + continue + case <-ctx.Done(): + return + } + } + } +} + +// batchFetchBlocks - fetches blocks concurrently each request if of batchSize. +// NOTE: Does not guarantee to keep return blocks in the same order as requested +func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int) ([]*evmtypes.Head, error) { + if len(blocksRequested) == 0 { + return []*evmtypes.Head{}, nil + } - for i := 0; i < len(blocksRequested); i += int(batchSize) { - j := i + int(batchSize) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + const fetchers = 10 + requests := make(chan []string, len(blocksRequested)/batchSize+1) + // As we do not expect large number of batches, we can schedule all work at once to improve readability + for i := 0; i < len(blocksRequested); i += batchSize { + j := i + batchSize if j > len(blocksRequested) { j = len(blocksRequested) } - moreBlocks, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq) - if err != nil { + requests <- blocksRequested[i:j] + } + close(requests) + + // do work + var fetchingWg sync.WaitGroup + fetchingWg.Add(fetchers) + results := make(chan []*evmtypes.Head, fetchers) + errs := make(chan error) + for range fetchers { + go lp.startBlocksFetching(ctx, &fetchingWg, requests, results, errs) + } + + // signal when done + lp.wg.Add(1) + go func() { + defer lp.wg.Done() + fetchingWg.Wait() + close(results) + }() + + blocks := make([]*evmtypes.Head, 0, len(blocksRequested)) +readLoop: + for { + select { + case err := <-errs: + // all child goroutine are stopped on exit return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + case result, ok := <-results: + if !ok { + break readLoop + } + blocks = append(blocks, result...) } - blocks = append(blocks, moreBlocks...) + } + + if ctx.Err() != nil { + return nil, ctx.Err() } return blocks, nil diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 5a9b3ee3db6..e003bccc3df 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -1316,7 +1316,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { UseFinalityTag: false, FinalityDepth: 2, BackfillBatchSize: 3, - RpcBatchSize: 2, + RpcBatchSize: 1, KeepFinalizedBlocksDepth: 1000, } th := SetupTH(t, lpOpts) @@ -1340,13 +1340,13 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { blockNums := []uint64{} blocks, err := th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums) require.NoError(t, err) - assert.Equal(t, 0, len(blocks)) + assert.Empty(t, blocks) // LP retrieves block 1 blockNums = []uint64{1} blocks, err = th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums) require.NoError(t, err) - assert.Equal(t, 1, len(blocks)) + require.Len(t, blocks, 1) assert.Equal(t, 1, int(blocks[0].BlockNumber)) assert.Equal(t, 1, int(blocks[0].FinalizedBlockNumber)) @@ -1354,7 +1354,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { blockNums = []uint64{2} _, err = th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums) require.Error(t, err) - assert.Equal(t, "Received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error()) + assert.Equal(t, "failed to fetch blocks [0x2]: Received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error()) th.Backend.Commit() // Commit block #4, so that block #2 is finalized @@ -1365,7 +1365,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { // getBlocksRange is able to retrieve block 2 by calling RPC rpcBlocks, err := th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums) require.NoError(t, err) - assert.Equal(t, 1, len(rpcBlocks)) + require.Len(t, rpcBlocks, 1) assert.Equal(t, 2, int(rpcBlocks[0].BlockNumber)) assert.Equal(t, 2, int(rpcBlocks[0].FinalizedBlockNumber)) @@ -1379,7 +1379,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { blockNums2 := []uint64{1, 3} rpcBlocks2, err := th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums2) require.NoError(t, err) - assert.Equal(t, 2, len(rpcBlocks2)) + require.Len(t, rpcBlocks2, 2) assert.Equal(t, 1, int(rpcBlocks2[0].BlockNumber)) assert.Equal(t, 3, int(rpcBlocks2[1].BlockNumber)) assert.Equal(t, 3, int(rpcBlocks2[1].FinalizedBlockNumber)) @@ -1393,7 +1393,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { // getBlocksRange should still be able to return block 2 by fetching from DB lpBlocks, err := th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums) require.NoError(t, err) - assert.Equal(t, 1, len(lpBlocks)) + require.Len(t, lpBlocks, 1) assert.Equal(t, rpcBlocks[0].BlockNumber, lpBlocks[0].BlockNumber) assert.Equal(t, rpcBlocks[0].BlockHash, lpBlocks[0].BlockHash) assert.Equal(t, rpcBlocks[0].FinalizedBlockNumber, lpBlocks[0].FinalizedBlockNumber) @@ -1423,12 +1423,6 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { _, err = th.LogPoller.GetBlocksRange(ctx, blockNums) require.Error(t, err) assert.Contains(t, err.Error(), "context canceled") - - // test canceled ctx - ctx, cancel = context.WithCancel(testutils.Context(t)) - cancel() - _, err = th.LogPoller.GetBlocksRange(ctx, blockNums) - require.Equal(t, err, context.Canceled) } func TestGetReplayFromBlock(t *testing.T) { @@ -1957,12 +1951,14 @@ func Test_PruneOldBlocks(t *testing.T) { th := SetupTH(t, lpOpts) for i := 1; i <= tt.blockToCreate; i++ { + parentHash := common.Hash(utils.RandomBytes32()) err := th.ORM.InsertBlocks(ctx, []logpoller.LogPollerBlock{ { BlockHash: utils.RandomBytes32(), BlockNumber: int64(i + 10), BlockTimestamp: time.Now(), FinalizedBlockNumber: int64(i), + ParentBlockHash: &parentHash, }, }) require.NoError(t, err) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 63cfa9a56df..aec3654f8cb 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -721,6 +721,7 @@ func (o *DSORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]L WHERE block_number >= :start_block AND block_number <= :end_block AND evm_chain_id = :evm_chain_id + AND parent_block_hash IS NOT NULL ORDER BY block_number ASC`) var blocks []LogPollerBlock diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 177db329305..f2cdbd53700 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -35,9 +35,10 @@ import ( ) type block struct { - number int64 - hash common.Hash - timestamp int64 + number int64 + hash common.Hash + parentHash *common.Hash + timestamp int64 } var lpOpts = logpoller.Opts{ @@ -99,6 +100,8 @@ func TestLogPoller_Batching(t *testing.T) { require.Equal(t, len(logs), len(lgs)) } +func ptr[T any](v T) *T { return &v } + func TestORM_GetBlocks_From_Range(t *testing.T) { th := SetupTH(t, lpOpts) o1 := th.ORM @@ -106,37 +109,43 @@ func TestORM_GetBlocks_From_Range(t *testing.T) { // Insert many blocks and read them back together blocks := []block{ { - number: 10, - hash: common.HexToHash("0x111"), - timestamp: 0, + number: 10, + hash: common.HexToHash("0x111"), + parentHash: ptr(common.HexToHash("0x110")), + timestamp: 0, }, { - number: 11, - hash: common.HexToHash("0x112"), - timestamp: 10, + number: 11, + hash: common.HexToHash("0x112"), + parentHash: ptr(common.HexToHash("0x111")), + timestamp: 10, }, { - number: 12, - hash: common.HexToHash("0x113"), - timestamp: 20, + number: 12, + hash: common.HexToHash("0x113"), + parentHash: ptr(common.HexToHash("0x112")), + timestamp: 20, }, { - number: 13, - hash: common.HexToHash("0x114"), - timestamp: 30, + number: 13, + hash: common.HexToHash("0x114"), + parentHash: ptr(common.HexToHash("0x113")), + timestamp: 30, }, { - number: 14, - hash: common.HexToHash("0x115"), - timestamp: 40, + number: 14, + hash: common.HexToHash("0x115"), + parentHash: ptr(common.HexToHash("0x114")), + timestamp: 40, }, } for _, b := range blocks { require.NoError(t, o1.InsertBlocks(ctx, []logpoller.LogPollerBlock{ { - BlockHash: b.hash, - BlockNumber: b.number, - BlockTimestamp: time.Unix(b.timestamp, 0).UTC(), + BlockHash: b.hash, + BlockNumber: b.number, + BlockTimestamp: time.Unix(b.timestamp, 0).UTC(), + ParentBlockHash: b.parentHash, }, })) } @@ -159,6 +168,19 @@ func TestORM_GetBlocks_From_Range(t *testing.T) { lpBlocks3, err := o1.GetBlocksRange(ctx, 15, 15) require.NoError(t, err) assert.Len(t, lpBlocks3, 0) + + // Ignore blocks without parent hash + require.NoError(t, o1.InsertBlocks(ctx, []logpoller.LogPollerBlock{ + { + BlockHash: common.HexToHash("0x116"), + BlockNumber: 15, + BlockTimestamp: time.Unix(50, 0).UTC(), + }, + })) + + lpBlocks4, err := o1.GetBlocksRange(ctx, 16, 16) + require.NoError(t, err) + assert.Len(t, lpBlocks4, 0) } func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { @@ -167,8 +189,11 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { ctx := testutils.Context(t) // Insert many blocks and read them back together var recentBlocks []block + var parentHash common.Hash for i := 1; i <= 256; i++ { - recentBlocks = append(recentBlocks, block{number: int64(i), hash: common.HexToHash(fmt.Sprintf("0x%d", i))}) + hash := common.HexToHash(fmt.Sprintf("0x%d", i)) + recentBlocks = append(recentBlocks, block{number: int64(i), hash: hash, parentHash: &parentHash}) + parentHash = hash } for _, b := range recentBlocks { require.NoError(t, o1.InsertBlocks(ctx, []logpoller.LogPollerBlock{ @@ -177,6 +202,7 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { BlockNumber: b.number, BlockTimestamp: time.Now(), FinalizedBlockNumber: 0, + ParentBlockHash: b.parentHash, }, })) } @@ -214,6 +240,7 @@ func TestORM(t *testing.T) { BlockNumber: 10, BlockTimestamp: time.Now(), FinalizedBlockNumber: 0, + ParentBlockHash: ptr(common.HexToHash("0x1235")), }, })) b, err := o1.SelectBlockByHash(ctx, common.HexToHash("0x1234")) @@ -221,7 +248,7 @@ func TestORM(t *testing.T) { assert.Equal(t, b.BlockNumber, int64(10)) assert.Equal(t, b.BlockHash.Bytes(), common.HexToHash("0x1234").Bytes()) assert.Equal(t, b.EvmChainId.String(), th.ChainID.String()) - assert.Nil(t, b.ParentBlockHash) + assert.Equal(t, common.HexToHash("0x1235"), *b.ParentBlockHash) // Insert blocks from a different chain parentHash := common.HexToHash("0x1233")