fix: (log_poller backfill) exit on non-rpc err
bukata-sa committed Aug 14, 2024
1 parent 220ca2a commit 389875c
Showing 3 changed files with 100 additions and 73 deletions.
5 changes: 5 additions & 0 deletions .changeset/gorgeous-carpets-grab.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix backfill error detection
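
A minimal sketch of the distinction this patch cares about, assuming the client.JsonError type and jsonRpcLimitExceeded constant referenced in the log_poller.go diff below (the standalone jsonError type, the -32005 code value, and the isLimitExceeded helper are illustrative stand-ins, not part of the change): only the JSON-RPC "too many results" error should keep backfill on its halve-and-retry path, while every other error, including non-RPC errors such as a canceled context, should be returned so backfill exits.

package main

import (
	"context"
	"errors"
	"fmt"
)

// jsonError mirrors the shape of client.JsonError from the diff below;
// the real type lives in the EVM client package.
type jsonError struct {
	Code    int
	Message string
}

func (e *jsonError) Error() string {
	return fmt.Sprintf("json-rpc error %d: %s", e.Code, e.Message)
}

// Assumed value of the jsonRpcLimitExceeded constant used by log_poller.go.
const jsonRpcLimitExceeded = -32005

// isLimitExceeded is a hypothetical helper expressing the corrected check:
// retry with a smaller batch only when the error is the JSON-RPC "limit
// exceeded" error; any other error (RPC or not) makes backfill return.
func isLimitExceeded(err error) bool {
	var rpcErr *jsonError
	return errors.As(err, &rpcErr) && rpcErr.Code == jsonRpcLimitExceeded
}

func main() {
	limitErr := &jsonError{Code: jsonRpcLimitExceeded, Message: "query returned more than 10000 results"}
	fmt.Println(isLimitExceeded(limitErr))         // true: shrink the block range and retry
	fmt.Println(isLimitExceeded(context.Canceled)) // false: return the error and exit backfill
}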
10 changes: 4 additions & 6 deletions core/chains/evm/logpoller/log_poller.go
@@ -807,12 +807,10 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {

gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil))
if err != nil {
-	var rpcErr client.JsonError
-	if pkgerrors.As(err, &rpcErr) {
-		if rpcErr.Code != jsonRpcLimitExceeded {
-			lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to)
-			return err
-		}
+	var rpcErr *client.JsonError
+	if !pkgerrors.As(err, &rpcErr) || rpcErr.Code != jsonRpcLimitExceeded {
+		lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to)
+		return err
}
if batchSize == 1 {
lp.lggr.Criticalw("Too many log results in a single block, failed to retrieve logs! Node may be running in a degraded state.", "err", err, "from", from, "to", to, "LogBackfillBatchSize", lp.backfillBatchSize)
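
The hunk above sits inside backfill's batched FilterLogs retry loop. Before this change, an error that was not a client.JsonError at all made pkgerrors.As report false, so the whole check was skipped and the loop fell through to halving the batch size and retrying; a non-RPC failure such as a canceled context could keep the loop busy instead of being surfaced. A simplified, hypothetical sketch of the loop under the new check (it reuses the isLimitExceeded helper sketched above and is not the actual implementation; the real loop in log_poller.go advances and retries differently):

// backfillRange is an illustrative stand-in for the backfill retry loop.
// fetch plays the role of ec.FilterLogs over a [from, to] block window.
func backfillRange(fetch func(from, to int64) error, start, end, batchSize int64) error {
	for from := start; from <= end; {
		to := from + batchSize - 1
		if to > end {
			to = end
		}
		err := fetch(from, to)
		if err == nil {
			from = to + 1 // window succeeded, move on
			continue
		}
		// New behavior: anything other than the JSON-RPC "limit exceeded"
		// error is returned immediately, so backfill exits on non-RPC errors.
		if !isLimitExceeded(err) {
			return err
		}
		if batchSize == 1 {
			// Mirrors the Criticalw path above: one block still has too many logs.
			return fmt.Errorf("too many log results in a single block: %w", err)
		}
		batchSize /= 2 // halve the window and retry the same range
	}
	return nil
}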
158 changes: 91 additions & 67 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -2,6 +2,7 @@ package logpoller_test

import (
"context"
"errors"
"fmt"
"math"
"math/big"
@@ -1541,15 +1542,12 @@ type getLogErrData struct {
Limit int
}

func TestTooManyLogResults(t *testing.T) {
func TestLogPoller_PollAndSaveLogsErrors(t *testing.T) {
ctx := testutils.Context(t)
ec := evmtest.NewEthClientMockWithDefaultChain(t)
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)

o := logpoller.NewORM(chainID, db, lggr)

lpOpts := logpoller.Opts{
PollPeriod: time.Hour,
FinalityDepth: 2,
@@ -1558,7 +1556,6 @@ func TestTooManyLogResults(t *testing.T) {
KeepFinalizedBlocksDepth: 1000,
}
headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)
expected := []int64{10, 5, 2, 1}

clientErr := client.JsonError{
@@ -1567,83 +1564,110 @@
Message: "query returned more than 10000 results. Try with this block range [0x100E698, 0x100E6D4].",
}

// Simulate currentBlock = 300
head := &evmtypes.Head{Number: 300}
finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
call1 := ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) {
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) {
if blockNumber == nil {
require.FailNow(t, "unexpected call to get current head")
}
return &evmtypes.Head{Number: blockNumber.Int64()}, nil
})

call2 := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.BlockHash != nil {
return []types.Log{}, nil // succeed when single block requested
}
from := fq.FromBlock.Uint64()
to := fq.ToBlock.Uint64()
if to-from >= 4 {
return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks
}
return logs, err
})
t.Run("Too many logs until batchSize=1", func(t *testing.T) {
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
o := logpoller.NewORM(chainID, db, lggr)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)

addr := testutils.NewAddress()
err := lp.RegisterFilter(ctx, logpoller.Filter{
Name: "Integration test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{addr},
})
require.NoError(t, err)
lp.PollAndSaveLogs(ctx, 5)
block, err2 := o.SelectLatestBlock(ctx)
require.NoError(t, err2)
assert.Equal(t, int64(298), block.BlockNumber)

logs := obs.FilterLevelExact(zapcore.WarnLevel).FilterMessageSnippet("halving block range batch size").FilterFieldKey("newBatchSize").All()
// Should have tried again 3 times--first reducing batch size to 10, then 5, then 2
require.Len(t, logs, 3)
for i, s := range expected[:3] {
assert.Equal(t, s, logs[i].ContextMap()["newBatchSize"])
}
// Simulate currentBlock = 300
head := &evmtypes.Head{Number: 300}
finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()

obs.TakeAll()
call1.Unset()
call2.Unset()
filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.BlockHash != nil {
return []types.Log{}, nil // succeed when single block requested
}
from := fq.FromBlock.Uint64()
to := fq.ToBlock.Uint64()
if to-from >= 4 {
return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks
}
return logs, err
})
defer filterCall.Unset()

// Now jump to block 500, but return error no matter how small the block range gets.
// Should exit the loop with a critical error instead of hanging.
head = &evmtypes.Head{Number: 500}
finalized = &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
call1.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) {
if blockNumber == nil {
require.FailNow(t, "unexpected call to get current head")
addr := testutils.NewAddress()
err := lp.RegisterFilter(ctx, logpoller.Filter{
Name: "Integration test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{addr},
})
require.NoError(t, err)
lp.PollAndSaveLogs(ctx, 5)
block, err2 := o.SelectLatestBlock(ctx)
require.NoError(t, err2)
assert.Equal(t, int64(298), block.BlockNumber)

logs := obs.FilterLevelExact(zapcore.WarnLevel).FilterMessageSnippet("halving block range batch size").FilterFieldKey("newBatchSize").All()
// Should have tried again 3 times--first reducing batch size to 10, then 5, then 2
require.Len(t, logs, 3)
for i, s := range expected[:3] {
assert.Equal(t, s, logs[i].ContextMap()["newBatchSize"])
}
return &evmtypes.Head{Number: blockNumber.Int64()}, nil
})
call2.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.BlockHash != nil {
return []types.Log{}, nil // succeed when single block requested

t.Run("Too many logs always", func(t *testing.T) {
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
o := logpoller.NewORM(chainID, db, lggr)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)

// Now jump to block 500, but return error no matter how small the block range gets.
// Should exit the loop with a critical error instead of hanging.
head := &evmtypes.Head{Number: 500}
finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.BlockHash != nil {
return []types.Log{}, nil // succeed when single block requested
}
return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks
})
defer filterCall.Unset()

lp.PollAndSaveLogs(ctx, 298)
block, err := o.SelectLatestBlock(ctx)
require.NoError(t, err)
assert.Equal(t, int64(298), block.BlockNumber)
warns := obs.FilterMessageSnippet("halving block range").FilterLevelExact(zapcore.WarnLevel).All()
crit := obs.FilterMessageSnippet("failed to retrieve logs").FilterLevelExact(zapcore.DPanicLevel).All()
require.Len(t, warns, 4)
for i, s := range expected {
assert.Equal(t, s, warns[i].ContextMap()["newBatchSize"])
}
return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks

require.Len(t, crit, 1)
assert.Contains(t, crit[0].Message, "Too many log results in a single block")
})

lp.PollAndSaveLogs(ctx, 298)
block, err2 = o.SelectLatestBlock(ctx)
require.NoError(t, err2)
assert.Equal(t, int64(298), block.BlockNumber)
warns := obs.FilterMessageSnippet("halving block range").FilterLevelExact(zapcore.WarnLevel).All()
crit := obs.FilterMessageSnippet("failed to retrieve logs").FilterLevelExact(zapcore.DPanicLevel).All()
require.Len(t, warns, 4)
for i, s := range expected {
assert.Equal(t, s, warns[i].ContextMap()["newBatchSize"])
}
t.Run("Context canceled return err", func(t *testing.T) {
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
o := logpoller.NewORM(chainID, db, lggr)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)

head := &evmtypes.Head{Number: 300}
finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.ToBlock.Int64() >= 297 {
return nil, errors.New("context canceled")
}
return []types.Log{}, err
})
defer filterCall.Unset()

require.Len(t, crit, 1)
assert.Contains(t, crit[0].Message, "Too many log results in a single block")
lp.PollAndSaveLogs(ctx, 5)
errs := obs.FilterMessageSnippet("Unable to query for logs").FilterLevelExact(zapcore.ErrorLevel).All()
require.Len(t, errs, 1)
assert.Equal(t, "context canceled", errs[0].ContextMap()["err"])
})
}

func Test_PollAndQueryFinalizedBlocks(t *testing.T) {
