Skip to content

Commit

Permalink
concurrently fetch all blocks in a range
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Dec 24, 2024
1 parent 8a0c068 commit f189986
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 57 deletions.
117 changes: 96 additions & 21 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type logPoller struct {
finalityDepth int64 // finality depth is taken to mean that block (head - finality) is finalized. If `useFinalityTag` is set to true, this value is ignored, because finalityDepth is fetched from chain
keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database
backfillBatchSize int64 // batch size to use when backfilling finalized logs
rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks
rpcBatchSize int // batch size to use for fallback RPC calls made in GetBlocks
logPrunePageSize int64
clientErrors config.ClientErrors
backupPollerNextBlock int64 // next block to be processed by Backup LogPoller
Expand Down Expand Up @@ -176,7 +176,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
rpcBatchSize: int(opts.RpcBatchSize),
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
clientErrors: opts.ClientErrors,
Expand Down Expand Up @@ -850,15 +850,15 @@ func convertTopics(topics []common.Hash) [][]byte {
return topicsForDB
}

// blocksFromLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber,
// whether or not there are any logs in the list from that block
func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
var numbers []uint64
for _, log := range logs {
numbers = append(numbers, log.BlockNumber)
}
if numbers[len(numbers)-1] != endBlockNumber {
numbers = append(numbers, endBlockNumber)
// blocksForLogs fetches every block in the contiguous range that starts at the block of the first log and
// ends at endBlockNumber (inclusive), delegating the lookup to GetBlocksRange.
// If logs is empty, or the first log belongs to a block past endBlockNumber, only endBlockNumber is requested,
// so endBlockNumber is always part of the request.
// NOTE(review): logs[0] is treated as the lowest block in logs - this assumes logs are ordered by block number; confirm against callers.
func (lp *logPoller) blocksForLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
	// The range size below is computed with unsigned arithmetic. Without this guard a first log located past
	// endBlockNumber would wrap the subtraction around to a huge value and make() would panic.
	if len(logs) == 0 || logs[0].BlockNumber > endBlockNumber {
		return lp.GetBlocksRange(ctx, []uint64{endBlockNumber})
	}

	first := logs[0].BlockNumber
	numbers := make([]uint64, 0, endBlockNumber-first+1)
	for i := first; i <= endBlockNumber; i++ {
		numbers = append(numbers, i)
	}
	return lp.GetBlocksRange(ctx, numbers)
}
Expand Down Expand Up @@ -891,7 +891,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
if len(gethLogs) == 0 {
continue
}
blocks, err := lp.blocksFromLogs(ctx, gethLogs, uint64(to))
blocks, err := lp.blocksForLogs(ctx, gethLogs, uint64(to))

Check failure on line 894 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

G115: integer overflow conversion int64 -> uint64 (gosec)
if err != nil {
return err
}
Expand Down Expand Up @@ -1366,6 +1366,7 @@ func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]Lo
// fillRemainingBlocksFromRPC sends a batch request for each block in blocksRequested, and converts them from
// geth blocks into LogPollerBlock structs. This is only intended to be used for requesting finalized blocks,
// if any of the blocks coming back are not finalized, an error will be returned
// NOTE: Does not guarantee that the returned blocks are in the same order as requested
func (lp *logPoller) fillRemainingBlocksFromRPC(
ctx context.Context,
blocksRequested map[uint64]struct{},
Expand All @@ -1379,7 +1380,7 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
}
}

if len(remainingBlocks) > 0 {
if len(remainingBlocks) == 0 {
lp.lggr.Debugw("Falling back to RPC for blocks not found in log poller blocks table",
"remainingBlocks", remainingBlocks)
}
Expand All @@ -1394,6 +1395,7 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
logPollerBlocks[uint64(head.Number)] = LogPollerBlock{
EvmChainId: head.EVMChainID,
BlockHash: head.Hash,
ParentBlockHash: &head.ParentHash,
BlockNumber: head.Number,
BlockTimestamp: head.Timestamp,
FinalizedBlockNumber: head.Number, // always finalized; only matters if this block is returned by LatestBlock()
Expand Down Expand Up @@ -1482,24 +1484,97 @@ func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string,
return blocks, nil
}

func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
var blocks = make([]*evmtypes.Head, 0, len(blocksRequested)+1)

// startBlocksFetching is the body of a single fetching worker. It repeatedly takes a batch of block identifiers
// from requests, fetches it via fetchBlocks and publishes the fetched heads on results.
// The worker exits when ctx is done, when requests is closed, or after the first failed fetch, in which case the
// wrapped error is offered on errs (unless ctx is done first). wg.Done is called on every exit path.
func (lp *logPoller) startBlocksFetching(ctx context.Context, wg *sync.WaitGroup, requests <-chan []string, results chan []*evmtypes.Head, errs chan<- error) {
	defer wg.Done()

	// Without the finality tag, fetched blocks are validated against the latest block instead of the finalized one.
	validationReq := finalizedBlock
	if !lp.useFinalityTag {
		validationReq = latestBlock
	}

	for {
		// Wait for the next batch, bailing out on cancellation or when there is no more work.
		var batch []string
		select {
		case <-ctx.Done():
			return
		case next, open := <-requests:
			if !open {
				return
			}
			batch = next
		}

		heads, fetchErr := lp.fetchBlocks(ctx, batch, validationReq)
		if fetchErr != nil {
			// Report the failure unless nobody is listening anymore; either way this worker stops.
			select {
			case errs <- fmt.Errorf("failed to fetch blocks %v: %w", batch, fetchErr):
			case <-ctx.Done():
			}
			return
		}

		// Hand the batch over, giving up if the context is done before the reader picks it up.
		select {
		case results <- heads:
		case <-ctx.Done():
			return
		}
	}
}

// batchFetchBlocks - fetches blocks concurrently; each request is of batchSize.
// NOTE: Does not guarantee that the returned blocks are in the same order as requested
func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int) ([]*evmtypes.Head, error) {
if len(blocksRequested) == 0 {
return []*evmtypes.Head{}, nil
}

for i := 0; i < len(blocksRequested); i += int(batchSize) {
j := i + int(batchSize)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
const fetchers = 10
requests := make(chan []string, len(blocksRequested)/batchSize+1)
// As we do not expect a large number of batches, we can schedule all work at once to improve readability
for i := 0; i < len(blocksRequested); i += batchSize {
j := i + batchSize
if j > len(blocksRequested) {
j = len(blocksRequested)
}
moreBlocks, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq)
if err != nil {
requests <- blocksRequested[i:j]
}
close(requests)

// do work
var fetchingWg sync.WaitGroup
fetchingWg.Add(fetchers)
results := make(chan []*evmtypes.Head, fetchers)
errs := make(chan error)
for range fetchers {
go lp.startBlocksFetching(ctx, &fetchingWg, requests, results, errs)
}

// signal when done
lp.wg.Add(1)
go func() {
defer lp.wg.Done()
fetchingWg.Wait()
close(results)
}()

blocks := make([]*evmtypes.Head, 0, len(blocksRequested))
readLoop:
for {
select {
case err := <-errs:
// all child goroutines are stopped on exit
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
case result, ok := <-results:
if !ok {
break readLoop
}
blocks = append(blocks, result...)
}
blocks = append(blocks, moreBlocks...)
}

if ctx.Err() != nil {
return nil, ctx.Err()
}

return blocks, nil
Expand Down
22 changes: 9 additions & 13 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
UseFinalityTag: false,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
RpcBatchSize: 1,
KeepFinalizedBlocksDepth: 1000,
}
th := SetupTH(t, lpOpts)
Expand All @@ -1340,21 +1340,21 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
blockNums := []uint64{}
blocks, err := th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums)
require.NoError(t, err)
assert.Equal(t, 0, len(blocks))
assert.Empty(t, blocks)

// LP retrieves block 1
blockNums = []uint64{1}
blocks, err = th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums)
require.NoError(t, err)
assert.Equal(t, 1, len(blocks))
require.Len(t, blocks, 1)
assert.Equal(t, 1, int(blocks[0].BlockNumber))
assert.Equal(t, 1, int(blocks[0].FinalizedBlockNumber))

// LP fails to return block 2 because it hasn't been finalized yet
blockNums = []uint64{2}
_, err = th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums)
require.Error(t, err)
assert.Equal(t, "Received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error())
assert.Equal(t, "failed to fetch blocks [0x2]: Received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error())

th.Backend.Commit() // Commit block #4, so that block #2 is finalized

Expand All @@ -1365,7 +1365,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
// getBlocksRange is able to retrieve block 2 by calling RPC
rpcBlocks, err := th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums)
require.NoError(t, err)
assert.Equal(t, 1, len(rpcBlocks))
require.Len(t, rpcBlocks, 1)
assert.Equal(t, 2, int(rpcBlocks[0].BlockNumber))
assert.Equal(t, 2, int(rpcBlocks[0].FinalizedBlockNumber))

Expand All @@ -1379,7 +1379,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
blockNums2 := []uint64{1, 3}
rpcBlocks2, err := th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums2)
require.NoError(t, err)
assert.Equal(t, 2, len(rpcBlocks2))
require.Len(t, rpcBlocks2, 2)
assert.Equal(t, 1, int(rpcBlocks2[0].BlockNumber))
assert.Equal(t, 3, int(rpcBlocks2[1].BlockNumber))
assert.Equal(t, 3, int(rpcBlocks2[1].FinalizedBlockNumber))
Expand All @@ -1393,7 +1393,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
// getBlocksRange should still be able to return block 2 by fetching from DB
lpBlocks, err := th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums)
require.NoError(t, err)
assert.Equal(t, 1, len(lpBlocks))
require.Len(t, lpBlocks, 1)
assert.Equal(t, rpcBlocks[0].BlockNumber, lpBlocks[0].BlockNumber)
assert.Equal(t, rpcBlocks[0].BlockHash, lpBlocks[0].BlockHash)
assert.Equal(t, rpcBlocks[0].FinalizedBlockNumber, lpBlocks[0].FinalizedBlockNumber)
Expand Down Expand Up @@ -1423,12 +1423,6 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
_, err = th.LogPoller.GetBlocksRange(ctx, blockNums)
require.Error(t, err)
assert.Contains(t, err.Error(), "context canceled")

// test canceled ctx
ctx, cancel = context.WithCancel(testutils.Context(t))
cancel()
_, err = th.LogPoller.GetBlocksRange(ctx, blockNums)
require.Equal(t, err, context.Canceled)
}

func TestGetReplayFromBlock(t *testing.T) {
Expand Down Expand Up @@ -1957,12 +1951,14 @@ func Test_PruneOldBlocks(t *testing.T) {
th := SetupTH(t, lpOpts)

for i := 1; i <= tt.blockToCreate; i++ {
parentHash := common.Hash(utils.RandomBytes32())
err := th.ORM.InsertBlocks(ctx, []logpoller.LogPollerBlock{
{
BlockHash: utils.RandomBytes32(),
BlockNumber: int64(i + 10),
BlockTimestamp: time.Now(),
FinalizedBlockNumber: int64(i),
ParentBlockHash: &parentHash,
},
})
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ func (o *DSORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]L
WHERE block_number >= :start_block
AND block_number <= :end_block
AND evm_chain_id = :evm_chain_id
AND parent_block_hash IS NOT NULL
ORDER BY block_number ASC`)

var blocks []LogPollerBlock
Expand Down
Loading

0 comments on commit f189986

Please sign in to comment.