Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCF-3059 Hardening LogPoller replay (#12484) #649

Merged
merged 2 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/small-beers-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Making LogPoller's replay more robust by backfilling up to finalized block and processing rest in the main loop
44 changes: 43 additions & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"database/sql"
"encoding/binary"
"errors"
"fmt"
"math/big"
"sort"
Expand Down Expand Up @@ -376,7 +377,13 @@ func (lp *logPoller) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQ
// If ctx is cancelled before the replay request has been initiated, ErrReplayRequestAborted is returned. If the replay
// is already in progress, the replay will continue and ErrReplayInProgress will be returned. If the client needs a
// guarantee that the replay is complete before proceeding, it should either avoid cancelling or retry until nil is returned
func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
defer func() {
if errors.Is(err, context.Canceled) {
err = ErrReplayRequestAborted
}
}()

lp.lggr.Debugf("Replaying from block %d", fromBlock)
latest, err := lp.ec.HeadByNumber(ctx, nil)
if err != nil {
Expand All @@ -385,6 +392,27 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
if fromBlock < 1 || fromBlock > latest.Number {
return pkgerrors.Errorf("Invalid replay block number %v, acceptable range [1, %v]", fromBlock, latest.Number)
}

// Backfill all logs up to the latest saved finalized block outside the LogPoller's main loop.
// This is safe, because chain cannot be rewinded deeper than that, so there must not be any race conditions.
savedFinalizedBlockNumber, err := lp.savedFinalizedBlockNumber(ctx)
if err != nil {
return err
}
if fromBlock <= savedFinalizedBlockNumber {
err = lp.backfill(ctx, fromBlock, savedFinalizedBlockNumber)
if err != nil {
return err
}
}

// Poll everything after latest finalized block in main loop to avoid concurrent writes during reorg
// We assume that number of logs between saved finalized block and current head is small enough to be processed in main loop
fromBlock = mathutil.Max(fromBlock, savedFinalizedBlockNumber+1)
// Don't continue if latest block number is the same as saved finalized block number
if fromBlock > latest.Number {
return nil
}
// Block until replay notification accepted or cancelled.
select {
case lp.replayStart <- fromBlock:
Expand All @@ -403,6 +431,20 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
}
}

// savedFinalizedBlockNumber returns the FinalizedBlockNumber saved with the last processed block in the db
// (latestFinalizedBlock at the time the last processed block was saved)
// If this is the first poll and no blocks are in the db, it returns 0
func (lp *logPoller) savedFinalizedBlockNumber(ctx context.Context) (int64, error) {
latestProcessed, err := lp.LatestBlock(pg.WithParentCtx(ctx))
if err == nil {
return latestProcessed.FinalizedBlockNumber, nil
}
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return 0, err
}

func (lp *logPoller) recvReplayComplete() {
err := <-lp.replayComplete
if err != nil {
Expand Down
49 changes: 37 additions & 12 deletions core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
Expand Down Expand Up @@ -253,6 +254,7 @@ func TestLogPoller_Replay(t *testing.T) {
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
ctx := testutils.Context(t)

head := evmtypes.Head{Number: 4}
events := []common.Hash{EmitterABI.Events["Log1"].ID}
Expand All @@ -268,7 +270,7 @@ func TestLogPoller_Replay(t *testing.T) {

ec := evmclimocks.NewClient(t)
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Twice()
ec.On("ConfiguredChainID").Return(chainID, nil)
lpOpts := Opts{
PollPeriod: time.Hour,
Expand All @@ -285,12 +287,13 @@ func TestLogPoller_Replay(t *testing.T) {
latest, err := lp.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(4), latest.BlockNumber)
require.Equal(t, int64(1), latest.FinalizedBlockNumber)

t.Run("abort before replayStart received", func(t *testing.T) {
// Replay() should abort immediately if caller's context is cancelled before request signal is read
ctx, cancel := context.WithCancel(testutils.Context(t))
cancelCtx, cancel := context.WithCancel(testutils.Context(t))
cancel()
err = lp.Replay(ctx, 3)
err = lp.Replay(cancelCtx, 3)
assert.ErrorIs(t, err, ErrReplayRequestAborted)
})

Expand All @@ -305,12 +308,11 @@ func TestLogPoller_Replay(t *testing.T) {

// Replay() should return error code received from replayComplete
t.Run("returns error code on replay complete", func(t *testing.T) {
ctx := testutils.Context(t)
anyErr := pkgerrors.New("any error")
done := make(chan struct{})
go func() {
defer close(done)
recvStartReplay(ctx, 1)
recvStartReplay(ctx, 2)
lp.replayComplete <- anyErr
}()
assert.ErrorIs(t, lp.Replay(ctx, 1), anyErr)
Expand All @@ -319,14 +321,14 @@ func TestLogPoller_Replay(t *testing.T) {

// Replay() should return ErrReplayInProgress if caller's context is cancelled after replay has begun
t.Run("late abort returns ErrReplayInProgress", func(t *testing.T) {
ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s
cancelCtx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s
done := make(chan struct{})
go func() {
defer close(done)
recvStartReplay(ctx, 4)
recvStartReplay(cancelCtx, 4)
cancel()
}()
assert.ErrorIs(t, lp.Replay(ctx, 4), ErrReplayInProgress)
assert.ErrorIs(t, lp.Replay(cancelCtx, 4), ErrReplayInProgress)
<-done
lp.replayComplete <- nil
lp.wg.Wait()
Expand All @@ -336,8 +338,6 @@ func TestLogPoller_Replay(t *testing.T) {
t.Run("client abort doesnt hang run loop", func(t *testing.T) {
lp.backupPollerNextBlock = 0

ctx := testutils.Context(t)

pass := make(chan struct{})
cancelled := make(chan struct{})

Expand Down Expand Up @@ -392,7 +392,6 @@ func TestLogPoller_Replay(t *testing.T) {
done := make(chan struct{})
defer func() { <-done }()

ctx := testutils.Context(t)
ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) {
go func() {
defer close(done)
Expand Down Expand Up @@ -425,7 +424,7 @@ func TestLogPoller_Replay(t *testing.T) {

lp.ReplayAsync(1)

recvStartReplay(testutils.Context(t), 1)
recvStartReplay(testutils.Context(t), 2)
})

t.Run("ReplayAsync error", func(t *testing.T) {
Expand All @@ -447,6 +446,32 @@ func TestLogPoller_Replay(t *testing.T) {
require.Equal(t, 1, observedLogs.Len())
assert.Equal(t, observedLogs.All()[0].Message, anyErr.Error())
})

t.Run("run regular replay when there are not blocks in db", func(t *testing.T) {
err := lp.orm.DeleteLogsAndBlocksAfter(0)
require.NoError(t, err)

lp.ReplayAsync(1)
recvStartReplay(testutils.Context(t), 1)
})

t.Run("run only backfill when everything is finalized", func(t *testing.T) {
err := lp.orm.DeleteLogsAndBlocksAfter(0)
require.NoError(t, err)

err = lp.orm.InsertLogsWithBlock([]Log{}, LogPollerBlock{
EvmChainId: ubig.New(chainID),
BlockHash: head.Hash,
BlockNumber: head.Number,
BlockTimestamp: head.Timestamp,
FinalizedBlockNumber: head.Number,
CreatedAt: time.Time{},
})
require.NoError(t, err)

err = lp.Replay(ctx, 1)
require.NoError(t, err)
})
}

func (lp *logPoller) reset() {
Expand Down
3 changes: 1 addition & 2 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {
time.Sleep(100 * time.Millisecond)
require.NoError(t, lp.Start(ctx))
require.Eventually(t, func() bool {
return observedLogs.Len() >= 4
return observedLogs.Len() >= 1
}, 2*time.Second, 20*time.Millisecond)
lp.Close()

Expand All @@ -1518,7 +1518,6 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {

assert.Contains(t, logMsgs, "SQL ERROR")
assert.Contains(t, logMsgs, "Failed loading filters in main logpoller loop, retrying later")
assert.Contains(t, logMsgs, "Error executing replay, could not get fromBlock")
}

type getLogErrData struct {
Expand Down
Loading