From accbf0fe9647f36bab9016f75b48a9338546ae7d Mon Sep 17 00:00:00 2001
From: Domino Valdano
Date: Wed, 16 Oct 2024 08:49:15 -0700
Subject: [PATCH] [BCFR-899] MaxLogsKept implementation (#14574)

* Add PruneExcessLogs for MaxLogsKept

* Update contract_transmitter.go for MaxLogsKept

* Fix unreachable code

* pnpm changeset

* Re-use block-range paging from DeleteBlocksBefore for SelectExcessLogs

Also: add block_number >= lower

* Use ExecPagedQuery for SelectUnmatchedLogIDs

Also: add deduping of ids for logs matching multiple filters

* Improve logging

* Add test for SelectExcessLogIDs

Also, remove some extraneous lines in orm_test.go

* Only activate count-based log pruning when needed

* Typo in changeset

* Refactor ExecPagedQuery into method of generic type RangedQuery[T]

* Fix setting of countBasedPruningActive flag

* Change sql comments to go comments

* Address remaining PR comments

- Remove topics from SelectExcessLogs query
- Early exit from loadFilters
- upper >= end

* Take Jordan's suggestion of replacing *atomic.Bool with atomic.Bool
---
 .changeset/flat-horses-argue.md            |   5 +
 core/chains/evm/logpoller/log_poller.go    |  94 ++++++--
 core/chains/evm/logpoller/observability.go |  18 +-
 core/chains/evm/logpoller/orm.go           | 158 +++++++++---
 core/chains/evm/logpoller/orm_test.go      | 228 +++++++++++++++++-
 .../relay/evm/contract_transmitter.go      |  12 +-
 6 files changed, 448 insertions(+), 67 deletions(-)
 create mode 100644 .changeset/flat-horses-argue.md

diff --git a/.changeset/flat-horses-argue.md b/.changeset/flat-horses-argue.md
new file mode 100644
index 00000000000..08f151cd5aa
--- /dev/null
+++ b/.changeset/flat-horses-argue.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+#added LogPoller MaxLogsKept feature: recency count-based log retention as an alternative to time-based log retention

diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index b19e3f53c39..eeba2b43df4 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -134,7 +134,8 @@ type logPoller struct {
 	// Usually the only way to recover is to manually remove the offending logs and block from the database.
 	// LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
 	// recover automatically without needing to restart the LogPoller.
-	finalityViolated *atomic.Bool
+	finalityViolated           atomic.Bool
+	countBasedLogPruningActive atomic.Bool
 }

 type Opts struct {
@@ -179,7 +180,6 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke
 		clientErrors:     opts.ClientErrors,
 		filters:          make(map[string]Filter),
 		filterDirty:      true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
-		finalityViolated: new(atomic.Bool),
 	}
 }
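The final commit in the series ("replacing *atomic.Bool with atomic.Bool") is why the constructor line above can simply be deleted: a value-typed atomic.Bool has a usable zero value (false), so no explicit initialization is needed and a nil-pointer dereference is impossible. A minimal standalone sketch of the difference — type and function names here are hypothetical, not part of the patch:

package main

import (
	"fmt"
	"sync/atomic"
)

type flags struct {
	finalityViolated           atomic.Bool // value type: zero value is ready to use
	countBasedLogPruningActive atomic.Bool
}

func main() {
	var f flags // no constructor needed; both flags start out false
	f.countBasedLogPruningActive.Store(true)
	fmt.Println(f.finalityViolated.Load(), f.countBasedLogPruningActive.Load()) // false true
}

(The usual caveat applies: an atomic.Bool must not be copied after first use, so the enclosing logPoller should continue to be passed by pointer, as it already is.)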
@@ -217,6 +217,12 @@ func (filter *Filter) Contains(other *Filter) bool {
 	if other == nil {
 		return true
 	}
+	if other.Retention != filter.Retention {
+		return false
+	}
+	if other.MaxLogsKept != filter.MaxLogsKept {
+		return false
+	}
 	addresses := make(map[common.Address]interface{})
 	for _, addr := range filter.Addresses {
 		addresses[addr] = struct{}{}
 	}
@@ -282,7 +288,7 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error {
 			lp.lggr.Warnw("Filter already present, no-op", "name", filter.Name, "filter", filter)
 			return nil
 		}
-		lp.lggr.Warnw("Updating existing filter with more events or addresses", "name", filter.Name, "filter", filter)
+		lp.lggr.Warnw("Updating existing filter", "name", filter.Name, "filter", filter)
 	}

 	if err := lp.orm.InsertFilter(ctx, filter); err != nil {
@@ -290,6 +296,9 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error {
 	}
 	lp.filters[filter.Name] = filter
 	lp.filterDirty = true
+	if filter.MaxLogsKept > 0 {
+		lp.countBasedLogPruningActive.Store(true)
+	}
 	return nil
 }
@@ -545,18 +554,38 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
 	return mathutil.Min(requested, lastProcessed.BlockNumber), nil
 }

+// loadFilters loads the filters from db, and activates count-based log pruning
+// if required by any of the filters
 func (lp *logPoller) loadFilters(ctx context.Context) error {
+	filters, err := lp.lockAndLoadFilters(ctx)
+	if err != nil {
+		return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
+	}
+	if lp.countBasedLogPruningActive.Load() {
+		return nil
+	}
+	for _, filter := range filters {
+		if filter.MaxLogsKept != 0 {
+			lp.countBasedLogPruningActive.Store(true)
+			return nil
+		}
+	}
+	return nil
+}
+
+// lockAndLoadFilters is the part of loadFilters() requiring a filterMu lock
+func (lp *logPoller) lockAndLoadFilters(ctx context.Context) (filters map[string]Filter, err error) {
 	lp.filterMu.Lock()
 	defer lp.filterMu.Unlock()
-	filters, err := lp.orm.LoadFilters(ctx)
+	filters, err = lp.orm.LoadFilters(ctx)
 	if err != nil {
-		return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
+		return filters, err
 	}
 	lp.filters = filters
 	lp.filterDirty = true
-	return nil
+	return filters, nil
 }

 // tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period
@@ -665,31 +694,41 @@ func (lp *logPoller) backgroundWorkerRun() {
 		case <-ctx.Done():
 			return
 		case <-blockPruneTick:
+			lp.lggr.Infow("pruning old blocks")
 			blockPruneTick = tickWithDefaultJitter(blockPruneInterval)
 			if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil {
-				lp.lggr.Errorw("Unable to prune old blocks", "err", err)
+				lp.lggr.Errorw("unable to prune old blocks", "err", err)
 			} else if !allRemoved {
 				// Tick faster when cleanup can't keep up with the pace of new blocks
 				blockPruneTick = tickWithDefaultJitter(blockPruneShortInterval)
+				lp.lggr.Warnw("reached page limit while pruning old blocks")
+			} else {
+				lp.lggr.Debugw("finished pruning old blocks")
 			}
 		case <-logPruneTick:
 			logPruneTick = tickWithDefaultJitter(logPruneInterval)
+			lp.lggr.Infof("pruning expired logs")
 			if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
-				lp.lggr.Errorw("Unable to prune expired logs", "err", err)
+				lp.lggr.Errorw("unable to prune expired logs", "err", err)
 			} else if !allRemoved {
+				lp.lggr.Warnw("reached page limit while pruning expired logs")
 				// Tick faster when cleanup can't keep up with the pace of new logs
 				logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
-			} else if successfulExpiredLogPrunes == 20 {
+			} else if successfulExpiredLogPrunes >= 20 {
 				// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
 				// since the last time unmatched logs were pruned
+				lp.lggr.Infof("finished pruning expired logs: pruning unmatched logs")
 				if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
-					lp.lggr.Errorw("Unable to prune unmatched logs", "err", err)
+					lp.lggr.Errorw("unable to prune unmatched logs", "err", err)
 				} else if !allRemoved {
+					lp.lggr.Warnw("reached page limit while pruning unmatched logs")
 					logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
 				} else {
+					lp.lggr.Debugw("finished pruning unmatched logs")
 					successfulExpiredLogPrunes = 0
 				}
 			} else {
+				lp.lggr.Debugw("finished pruning expired logs")
 				successfulExpiredLogPrunes++
 			}
 		}
@@ -1097,7 +1136,8 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
 }

 // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
-// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
+// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, then it
+// will always return true unless there is an actual error.
 func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
 	latestBlock, err := lp.orm.SelectLatestBlock(ctx)
 	if err != nil {
@@ -1121,13 +1161,39 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
 	return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
 }

-// PruneExpiredLogs logs that are older than their retention period defined in Filter.
-// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
+// PruneExpiredLogs will attempt to remove any logs which have passed their retention period. Returns whether all expired
+// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered.
 func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
+	done := true
+
 	rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
-	return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
+	if err != nil {
+		lp.lggr.Errorw("Unable to delete expired logs", "err", err)
+		return false, err
+	} else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
+		done = false
+	}
+
+	if !lp.countBasedLogPruningActive.Load() {
+		return done, err
+	}
+
+	rowIDs, err := lp.orm.SelectExcessLogIDs(ctx, lp.logPrunePageSize)
+	if err != nil {
+		lp.lggr.Errorw("Unable to find excess logs for pruning", "err", err)
+		return false, err
+	}
+	rowsRemoved, err = lp.orm.DeleteLogsByRowID(ctx, rowIDs)
+	if err != nil {
+		lp.lggr.Errorw("Unable to prune excess logs", "err", err)
+	} else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
+		done = false
+	}
+	return done, err
 }

+// PruneUnmatchedLogs will attempt to remove any logs which no longer match a registered filter. Returns whether all unmatched
+// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered.
 func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
 	ids, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize)
 	if err != nil {

diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go
index 59b93fffdaf..776fe5bf215 100644
--- a/core/chains/evm/logpoller/observability.go
+++ b/core/chains/evm/logpoller/observability.go
@@ -136,9 +136,9 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64)
 	})
 }

-func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) {
-	return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) {
-		return o.ORM.DeleteLogsByRowID(ctx, rowIDs)
+func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
+	return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
+		return o.ORM.DeleteExpiredLogs(ctx, limit)
 	})
 }

@@ -148,9 +148,15 @@ func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (i
 	})
 }

-func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
-	return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
-		return o.ORM.DeleteExpiredLogs(ctx, limit)
+func (o *ObservedORM) SelectExcessLogIDs(ctx context.Context, limit int64) ([]uint64, error) {
+	return withObservedQueryAndResults[uint64](o, "SelectExcessLogIDs", func() ([]uint64, error) {
+		return o.ORM.SelectExcessLogIDs(ctx, limit)
+	})
+}
+
+func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) {
+	return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) {
+		return o.ORM.DeleteLogsByRowID(ctx, rowIDs)
 	})
 }
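The orm.go changes below introduce a generic RangeQueryer whose ExecPagedQuery method breaks one query over blocks into a series of queries over ranges of at most limit blocks each. As an illustration of just the range arithmetic (the real method also stops early once limit rows have been affected, and reads the starting block from the database), here is a minimal self-contained sketch; pageRanges and main are hypothetical names, not part of the patch:

package main

import "fmt"

// pageRanges reproduces the (lower, upper) block pairs ExecPagedQuery derives
// from start (MIN(block_number)), limit, and end: upper = lower + limit - 1,
// clamped to end, with the next page starting at upper + 1.
func pageRanges(start, limit, end int64) (ranges [][2]int64) {
	var upper int64
	for lower := start; lower <= end; lower = upper + 1 {
		upper = lower + limit - 1
		if upper > end {
			upper = end
		}
		ranges = append(ranges, [2]int64{lower, upper})
	}
	return ranges
}

func main() {
	// With start=42, limit=300, end=1000 this prints
	// [[42 341] [342 641] [642 941] [942 1000]],
	// the same ranges asserted in Test_ExecPagedQuery further below.
	fmt.Println(pageRanges(42, 300, 1000))
}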
diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go
index 30cd19e0447..4d7cf33ebec 100644
--- a/core/chains/evm/logpoller/orm.go
+++ b/core/chains/evm/logpoller/orm.go
@@ -40,6 +40,7 @@ type ORM interface {
 	DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
 	SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error)
 	DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
+	SelectExcessLogIDs(ctx context.Context, limit int64) (rowIDs []uint64, err error)

 	GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
 	SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error)
@@ -294,24 +295,36 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig
 	return &l, nil
 }

-// DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks.
-// Otherwise, it will delete all blocks at once.
-func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) {
-	var result sql.Result
-	var err error
+type RangeQueryer[T comparable] struct {
+	chainID *ubig.Big
+	ds      sqlutil.DataSource
+	query   func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error)
+	acc     []T
+}
+
+func NewRangeQueryer[T comparable](evmChainID *big.Int, ds sqlutil.DataSource, query func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error)) *RangeQueryer[T] {
+	return &RangeQueryer[T]{
+		chainID: ubig.New(evmChainID),
+		ds:      ds,
+		query:   query,
+	}
+}
+
+// ExecPagedQuery runs a query accepting an upper limit block (end) in a fast paged way. limit is the maximum number
+// of results to be returned, but it is also used to break the query up into smaller queries, each restricted to a range of limit blocks.
+// The first range of blocks will be from MIN(block_number) to MIN(block_number) + limit - 1. The iterative process ends either once
+// the limit on results is reached or block_number = end. The query will never be executed on blocks where block_number > end, and
+// it will never be executed on block_number = B unless it has also been executed on all blocks with block_number < B.
+// r.AddResults(moreResults []T) should be called if this is a query returning results (i.e., SELECT). These will be accumulated in
+// r.acc and can be retrieved later with r.AllResults().
+func (r *RangeQueryer[T]) ExecPagedQuery(ctx context.Context, limit, end int64) (rowsAffected int64, err error) {
 	if limit == 0 {
-		result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks
-			WHERE block_number <= $1 AND evm_chain_id = $2`, end, ubig.New(o.chainID))
-		if err != nil {
-			return 0, err
-		}
-		return result.RowsAffected()
+		return r.query(ctx, r, 0, end)
 	}

-	var limitBlock int64
-	err = o.ds.GetContext(ctx, &limitBlock, `SELECT MIN(block_number) FROM evm.log_poller_blocks
-		WHERE evm_chain_id = $1`, ubig.New(o.chainID))
+	var start int64
+	err = r.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks
+		WHERE evm_chain_id = $1`, r.chainID)
 	if err != nil {
 		if errors.Is(err, sql.ErrNoRows) {
 			return 0, nil
@@ -320,27 +333,46 @@ func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64)
 	}

 	// Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion
-	var deleted, rows int64
-	for limitBlock += (limit - 1); deleted < limit; limitBlock += limit {
-		if limitBlock > end {
-			limitBlock = end
-		}
-		result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE block_number <= $1 AND evm_chain_id = $2`, limitBlock, ubig.New(o.chainID))
-		if err != nil {
-			return deleted, err
+	var upper int64
+	for lower := start; rowsAffected < limit; lower = upper + 1 {
+		upper = lower + limit - 1
+		if upper > end {
+			upper = end
 		}
-		if rows, err = result.RowsAffected(); err != nil {
-			return deleted, err
+		rows, err2 := r.query(ctx, r, lower, upper)
+		if err2 != nil {
+			return rowsAffected, err2
 		}
+		rowsAffected += rows

-		deleted += rows
-
-		if limitBlock == end {
+		if upper >= end {
 			break
 		}
 	}
-	return deleted, err
+	return rowsAffected, nil
+}
+
+func (r *RangeQueryer[T]) AddResults(moreResults []T) {
+	r.acc = append(r.acc, moreResults...)
+}
+
+func (r *RangeQueryer[T]) AllResults() []T {
+	return r.acc
+}
+
+// DeleteBlocksBefore deletes blocks before and including end. When limit is set, it will delete at most limit blocks.
+// Otherwise, it will delete all blocks at once.
+func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) {
+	q := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) {
+		result, err := r.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`,
+			r.chainID, lower, upper)
+		if err != nil {
+			return 0, err
+		}
+		return result.RowsAffected()
+	})
+	return q.ExecPagedQuery(ctx, limit, end)
 }

 func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error {
@@ -386,23 +418,79 @@ type Exp struct {
 }

 func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) {
-	query := `
-		SELECT l.id FROM evm.logs l LEFT JOIN (
+	batchLogsSubQuery := `SELECT id, evm_chain_id, address, event_sig FROM evm.logs
+		WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`
+
+	query := fmt.Sprintf(`
+		SELECT l.id FROM (%s) l LEFT JOIN (
 			SELECT evm_chain_id, address, event FROM evm.log_poller_filters WHERE evm_chain_id = $1
 			GROUP BY evm_chain_id, address, event
 		) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event
 		WHERE l.evm_chain_id = $1 AND r.evm_chain_id IS NULL
-	`
+	`, batchLogsSubQuery)

-	if limit == 0 {
-		err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID))
+	latestBlock, err := o.SelectLatestBlock(ctx)
+	if err != nil {
 		return ids, err
 	}
-	err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit), ubig.New(o.chainID))
-	return ids, err
+	r := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) {
+		var rowIDs []uint64
+		err2 := r.ds.SelectContext(ctx, &rowIDs, query, r.chainID, lower, upper)
+		if err2 != nil {
+			return 0, err2
+		}
+		r.AddResults(rowIDs)
+		return int64(len(rowIDs)), nil
+	})
+
+	_, err = r.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber)
+
+	return r.AllResults(), err
+}
+
+// SelectExcessLogIDs finds any logs old enough that MaxLogsKept has been exceeded for every filter they match.
+func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []uint64, err error) {
+	// Roll up the filter table into 1 row per filter
+	withSubQuery := `
+		SELECT name,
+			ARRAY_AGG(address) AS addresses, ARRAY_AGG(event) AS events,
+			MAX(max_logs_kept) AS max_logs_kept -- Should all be the same, just need MAX for GROUP BY
+		FROM evm.log_poller_filters WHERE evm_chain_id=$1
+		GROUP BY name`
+
+	// Rank the logs matching each filter, ordered by (block_number, log_index DESC), labeling anything
+	// ranked after the filter.max_logs_kept'th with old=true
+	countLogsSubQuery := `
+		SELECT l.id, block_number, log_index, max_logs_kept != 0 AND
+			ROW_NUMBER() OVER(PARTITION BY f.name ORDER BY block_number, log_index DESC) > max_logs_kept AS old
+		FROM filters f JOIN evm.logs l ON
+			l.address = ANY(f.addresses) AND l.event_sig = ANY(f.events)
+		WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3
+	`
+
+	// Return all logs considered "old" by every filter they match
+	query := fmt.Sprintf(`WITH filters AS ( %s ) SELECT id FROM ( %s ) x GROUP BY id, block_number, log_index HAVING BOOL_AND(old)`,
+		withSubQuery, countLogsSubQuery)
+
+	latestBlock, err := o.SelectLatestBlock(ctx)
+	if err != nil {
+		return results, err
+	}
+
+	r := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) {
+		var rowIDs []uint64
+		err = r.ds.SelectContext(ctx, &rowIDs, query, r.chainID, lower, upper)
+		if err != nil {
+			return 0, err
+		}
+		r.AddResults(rowIDs)
+		return int64(len(rowIDs)), err
+	})
+	_, err = r.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber)
+
+	return r.AllResults(), err
 }

 // DeleteExpiredLogs removes any logs which either:

diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go
index 7ebc97bb835..6e618ba9cef 100644
--- a/core/chains/evm/logpoller/orm_test.go
+++ b/core/chains/evm/logpoller/orm_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"database/sql"
+	"errors"
 	"fmt"
 	"math"
 	"math/big"
@@ -11,10 +12,11 @@ import (
 	"testing"
 	"time"

+	"github.com/stretchr/testify/mock"
+
 	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"

 	"github.com/ethereum/go-ethereum/common"
-	"github.com/jackc/pgx/v4"
 	pkgerrors "github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -456,7 +458,7 @@ func TestORM(t *testing.T) {
 	require.Equal(t, 2, len(lgs))

 	require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1237"), 16, time.Now(), 0))
-	require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1238"), 17, time.Now(), 0))
+	require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1238"), 17, time.Now(), 17))

 	filter0 := logpoller.Filter{
 		Name:      "permanent retention filter",
@@ -545,16 +547,160 @@ func TestORM(t *testing.T) {
 	assert.Zero(t, len(logs))
 }

-type PgxLogger struct {
-	lggr logger.Logger
-}
+func TestORM_SelectExcessLogs(t *testing.T) {
+	t.Parallel()
+	th := SetupTH(t, lpOpts)
+	o1 := th.ORM
+	o2 := th.ORM2
+	ctx := testutils.Context(t)

-func NewPgxLogger(lggr logger.Logger) PgxLogger {
-	return PgxLogger{lggr}
-}
+	topic := common.HexToHash("0x1599")
+	topic2 := common.HexToHash("0x1600")

-func (l PgxLogger) Log(ctx context.Context, log pgx.LogLevel, msg string, data map[string]interface{}) {
+	blockHashes := []common.Hash{common.HexToHash("0x1234"), common.HexToHash("0x1235"), common.HexToHash("0x1236")}
+
+	// Insert blocks for active chain
+	for i := int64(0); i < 3; i++ {
+		blockNumber := 10 + i
+		require.NoError(t, o1.InsertBlock(ctx, blockHashes[i], blockNumber, time.Now(), blockNumber))
+		b1, err := o1.SelectBlockByHash(ctx, blockHashes[i])
+		require.NoError(t, err)
+		require.Equal(t, blockNumber, b1.BlockNumber)
+	}
+
+	// Insert block from a different chain
+	require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1234"), 17, time.Now(), 17))
+	b, err := o2.SelectBlockByHash(ctx, common.HexToHash("0x1234"))
+	require.NoError(t, err)
+	require.Equal(t, int64(17), b.BlockNumber)
+
+	for i := int64(0); i < 7; i++ {
+		require.NoError(t, o1.InsertLogs(ctx, []logpoller.Log{
+			{
+				EvmChainId:     ubig.New(th.ChainID),
+				LogIndex:       i,
+				BlockHash:      common.HexToHash("0x1234"),
+				BlockNumber:    int64(10),
+				EventSig:       topic,
+				Topics:         [][]byte{topic[:]},
+				Address:        common.HexToAddress("0x1234"),
+				TxHash:         common.HexToHash("0x1888"),
+				Data:           []byte("hello"),
+				BlockTimestamp: time.Now(),
+			},
+			{
+				EvmChainId:     ubig.New(th.ChainID),
+				LogIndex:       i,
+				BlockHash:      common.HexToHash("0x1234"),
+				BlockNumber:    int64(11),
+				EventSig:       topic,
+				Topics:         [][]byte{topic[:]},
+				Address:        common.HexToAddress("0x1235"),
+				TxHash:         common.HexToHash("0x1888"),
+				Data:           []byte("hello"),
+				BlockTimestamp: time.Now(),
+			},
+			{
+				EvmChainId:     ubig.New(th.ChainID),
+				LogIndex:       i,
+				BlockHash:      common.HexToHash("0x1234"),
+				BlockNumber:    int64(12),
+				EventSig:       topic2,
+				Topics:         [][]byte{topic2[:]},
+				Address:        common.HexToAddress("0x1235"),
+				TxHash:         common.HexToHash("0x1888"),
+				Data:           []byte("hello"),
+				BlockTimestamp: time.Now(),
+			},
+		}))
+	}
+
+	logs, err := o1.SelectLogsByBlockRange(ctx, 1, 12)
+	require.NoError(t, err)
+	require.Len(t, logs, 21)
+
+	// Insert a log on a different chain, to make sure
+	// it's not affected by any operations on the chain LogPoller
+	// is managing.
+	require.NoError(t, o2.InsertLogs(ctx, []logpoller.Log{
+		{
+			EvmChainId:     ubig.New(th.ChainID2),
+			LogIndex:       8,
+			BlockHash:      common.HexToHash("0x1238"),
+			BlockNumber:    int64(17),
+			EventSig:       topic2,
+			Topics:         [][]byte{topic2[:]},
+			Address:        common.HexToAddress("0x1236"),
+			TxHash:         common.HexToHash("0x1888"),
+			Data:           []byte("same log on unrelated chain"),
+			BlockTimestamp: time.Now(),
+		},
+	}))
+
+	logs, err = o2.SelectLogsByBlockRange(ctx, 1, 17)
+	require.NoError(t, err)
+	require.Len(t, logs, 1)
+
+	filter1 := logpoller.Filter{
+		Name:        "MaxLogsKept = 0 (addr 1234 topic1)",
+		Addresses:   []common.Address{common.HexToAddress("0x1234")},
+		EventSigs:   types.HashArray{topic},
+		MaxLogsKept: 0,
+	}
+
+	filter12 := logpoller.Filter{ // keep at most 1 log matching topic1 or topic2 on address 0x1235, retained for at least 1ms
+		Name:        "MaxLogsKept = 1 (addr 1235 topic1 & topic2)",
+		Addresses:   []common.Address{common.HexToAddress("0x1235")},
+		EventSigs:   types.HashArray{topic, topic2},
+		Retention:   time.Millisecond,
+		MaxLogsKept: 1,
+	}
+	filter2 := logpoller.Filter{ // keep at most 5 logs matching topic2 on address 0x1235, with no time-based retention
+		Name:        "MaxLogsKept = 5 (addr 1235 topic2)",
+		Addresses:   []common.Address{common.HexToAddress("0x1235")},
+		EventSigs:   types.HashArray{topic2},
+		MaxLogsKept: 5,
+	}
+
+	// Test inserting filters and reading them back
+	require.NoError(t, o1.InsertFilter(ctx, filter1))
+	require.NoError(t, o1.InsertFilter(ctx, filter12))
+	require.NoError(t, o1.InsertFilter(ctx, filter2))
+
+	filters, err := o1.LoadFilters(ctx)
+	require.NoError(t, err)
+	require.Len(t, filters, 3)
+	assert.Equal(t, filter1, filters["MaxLogsKept = 0 (addr 1234 topic1)"])
+	assert.Equal(t, filter12, filters["MaxLogsKept = 1 (addr 1235 topic1 & topic2)"])
+	assert.Equal(t, filter2, filters["MaxLogsKept = 5 (addr 1235 topic2)"])
+
+	ids, err := o1.SelectUnmatchedLogIDs(ctx, 0)
+	require.NoError(t, err)
+	require.Len(t, ids, 0)
+
+	// Number of excess logs eligible for pruning:
+	// 2 of the 7 matching filter2 + 6 of the 7 matching filter12 but not filter2 = 8 of the 21 total
+
+	// Test SelectExcessLogIDs with limit less than # blocks
+	// (should only consider blocks 10 & 11, returning 6 excess events from block 11
+	// but ignoring the 2 in block 12)
+	ids, err = o1.SelectExcessLogIDs(ctx, 2)
+	require.NoError(t, err)
+	assert.Len(t, ids, 6)
+
+	// Test SelectExcessLogIDs with limit greater than # blocks:
+	ids, err = o1.SelectExcessLogIDs(ctx, 4)
+	require.NoError(t, err)
+	assert.Len(t, ids, 8)
+
+	// Test SelectExcessLogIDs with a limit comfortably exceeding the number of excess logs
+	ids, err = o1.SelectExcessLogIDs(ctx, 10)
+	require.NoError(t, err)
+	assert.Len(t, ids, 8)
+
+	deleted, err := o1.DeleteLogsByRowID(ctx, ids)
+	require.NoError(t, err)
+	assert.Equal(t, int64(8), deleted)
 }

 func TestLogPollerFilters(t *testing.T) {
@@ -1166,6 +1312,70 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
 	assertion(t, logs, err, startBlock, endBlock)
 }

+type mockQueryExecutor struct {
+	mock.Mock
+}
+
+func (m *mockQueryExecutor) Exec(ctx context.Context, r *logpoller.RangeQueryer[uint64], lower, upper int64) (int64, error) {
+	res := m.Called(lower, upper)
+	return int64(res.Int(0)), res.Error(1)
+}
+
+func Test_ExecPagedQuery(t *testing.T) {
+	t.Parallel()
+	ctx := testutils.Context(t)
+	lggr := logger.Test(t)
+	chainID := testutils.NewRandomEVMChainID()
+	db := pgtest.NewSqlxDB(t)
+	o := logpoller.NewORM(chainID, db, lggr)
+
+	m := mockQueryExecutor{}
+
+	queryError := errors.New("some error")
+	m.On("Exec", int64(0), int64(0)).Return(0, queryError).Once()
+
+	// Should handle errors gracefully
+	r := logpoller.NewRangeQueryer(chainID, db, m.Exec)
+	_, err := r.ExecPagedQuery(ctx, 0, 0)
+	assert.ErrorIs(t, err, queryError)
+
+	m.On("Exec", int64(0), int64(60)).Return(4, nil).Once()
+
+	// Query should only get executed once, covering the full range (0, end), if called with limit=0
+	numResults, err := r.ExecPagedQuery(ctx, 0, 60)
+	require.NoError(t, err)
+	assert.Equal(t, int64(4), numResults)
+
+	// Should report actual db errors
+	_, err = r.ExecPagedQuery(ctx, 300, 1000)
+	assert.Error(t, err)
+
+	require.NoError(t, o.InsertBlock(ctx, common.HexToHash("0x1234"), 42, time.Now(), 0))
+
+	m.On("Exec", mock.Anything, mock.Anything).Return(3, nil)
+
+	// Should get called with ranges {42, 341}, {342, 641}, {642, 941}, {942, 1000}
+	numResults, err = r.ExecPagedQuery(ctx, 300, 1000)
+	require.NoError(t, err)
+	assert.Equal(t, int64(12), numResults) // 3 results in each of 4 calls
+	m.AssertNumberOfCalls(t, "Exec", 6)    // 4 new calls, plus the prior 2
+	expectedLimitBlocks := [][]int64{{42, 341}, {342, 641}, {642, 941}, {942, 1000}}
+	for _, expected := range expectedLimitBlocks {
+		m.AssertCalled(t, "Exec", expected[0], expected[1])
+	}
+
+	// Should not go all the way to 1000, but stop once the limit of 15 results
+	// has been reached (5 calls of 3 results each)
+	numResults, err = r.ExecPagedQuery(ctx, 15, 1000)
+	require.NoError(t, err)
+	assert.Equal(t, int64(15), numResults)
+	m.AssertNumberOfCalls(t, "Exec", 11)
+	expectedLimitBlocks = [][]int64{{42, 56}, {57, 71}, {72, 86}, {87, 101}, {102, 116}} // upper[n] = 42 + 15 * n - 1 for n = 1, 2, 3, 4, 5, lower[n] = upper[n-1] + 1
+	for _, expected := range expectedLimitBlocks {
+		m.AssertCalled(t, "Exec", expected[0], expected[1])
+	}
+}
+
 func TestORM_DeleteBlocksBefore(t *testing.T) {
 	th := SetupTH(t, lpOpts)
 	o1 := th.ORM
diff --git a/core/services/relay/evm/contract_transmitter.go b/core/services/relay/evm/contract_transmitter.go
index 65f0e42fc41..aead9f6ca8a 100644
--- a/core/services/relay/evm/contract_transmitter.go
+++ b/core/services/relay/evm/contract_transmitter.go
@@ -56,6 +56,12 @@ func WithRetention(retention time.Duration) OCRTransmitterOption {
 	}
 }

+func WithMaxLogsKept(maxLogsKept uint64) OCRTransmitterOption {
+	return func(ct *contractTransmitter) {
+		ct.maxLogsKept = maxLogsKept
+	}
+}
+
 func WithReportToEthMetadata(reportToEvmTxMeta ReportToEthMetadata) OCRTransmitterOption {
 	return func(ct *contractTransmitter) {
 		if reportToEvmTxMeta != nil {
@@ -76,6 +82,7 @@ type contractTransmitter struct {
 	reportToEvmTxMeta ReportToEthMetadata
 	excludeSigs       bool
 	retention         time.Duration
+	maxLogsKept       uint64
 }

 func transmitterFilterName(addr common.Address) string {
@@ -108,15 +115,14 @@ func NewOCRContractTransmitter(
 		reportToEvmTxMeta: reportToEvmTxMetaNoop,
 		excludeSigs:       false,
 		retention:         0,
+		maxLogsKept:       0,
 	}

 	for _, opt := range opts {
 		opt(newContractTransmitter)
 	}

-	// TODO It would be better to keep MaxLogsKept = 1 for the OCR contract transmitter instead of Retention. We are always interested only in the latest log.
-	// Although MaxLogsKept is present in the Filter struct, it is not supported by LogPoller yet.
-	err := lp.RegisterFilter(ctx, logpoller.Filter{Name: transmitterFilterName(address), EventSigs: []common.Hash{transmitted.ID}, Addresses: []common.Address{address}, Retention: newContractTransmitter.retention})
+	err := lp.RegisterFilter(ctx, logpoller.Filter{Name: transmitterFilterName(address), EventSigs: []common.Hash{transmitted.ID}, Addresses: []common.Address{address}, Retention: newContractTransmitter.retention, MaxLogsKept: newContractTransmitter.maxLogsKept})
 	if err != nil {
 		return nil, err
 	}
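To make the ROW_NUMBER/BOOL_AND logic of SelectExcessLogIDs concrete, here is a small self-contained sketch — types and names are hypothetical, not part of the patch — that replays the TestORM_SelectExcessLogs scenario in plain Go and arrives at the same 8 excess logs. A log is excess only if every filter matching it ranks it past that filter's max_logs_kept (and max_logs_kept = 0 disables the check, as it does for filter1):

package main

import (
	"fmt"
	"sort"
)

type logRow struct {
	id, block, idx int
	addr, event    string
}

type filter struct {
	addrs, events map[string]bool
	maxLogsKept   int
}

func (f filter) matches(l logRow) bool { return f.addrs[l.addr] && f.events[l.event] }

// excessIDs mirrors the SQL: per filter, order matched logs by
// (block_number, log_index DESC), mark everything past the
// max_logs_kept'th row as old, then keep only ids where every
// matching filter agrees (BOOL_AND).
func excessIDs(logs []logRow, filters []filter) []int {
	old := map[int]bool{}     // id -> BOOL_AND(old) so far
	matched := map[int]bool{} // id appeared in the join at all
	for _, f := range filters {
		var m []logRow
		for _, l := range logs {
			if f.matches(l) {
				m = append(m, l)
			}
		}
		sort.Slice(m, func(i, j int) bool {
			if m[i].block != m[j].block {
				return m[i].block < m[j].block // block_number
			}
			return m[i].idx > m[j].idx // log_index DESC
		})
		for rank, l := range m {
			isOld := f.maxLogsKept != 0 && rank+1 > f.maxLogsKept
			if !matched[l.id] {
				matched[l.id], old[l.id] = true, isOld
			} else {
				old[l.id] = old[l.id] && isOld
			}
		}
	}
	var ids []int
	for id, o := range old {
		if o {
			ids = append(ids, id)
		}
	}
	sort.Ints(ids)
	return ids
}

func main() {
	var logs []logRow
	id := 0
	for i := 0; i < 7; i++ { // 7 logs per (block, address, event) combination, as in the test
		logs = append(logs,
			logRow{id, 10, i, "0x1234", "T1"},
			logRow{id + 1, 11, i, "0x1235", "T1"},
			logRow{id + 2, 12, i, "0x1235", "T2"},
		)
		id += 3
	}
	filters := []filter{
		{map[string]bool{"0x1234": true}, map[string]bool{"T1": true}, 0},             // filter1
		{map[string]bool{"0x1235": true}, map[string]bool{"T1": true, "T2": true}, 1}, // filter12
		{map[string]bool{"0x1235": true}, map[string]bool{"T2": true}, 5},             // filter2
	}
	fmt.Println(len(excessIDs(logs, filters))) // prints 8, matching the test
}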
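With the TODO resolved, a relayer can opt the OCR transmitter into count-based retention via WithMaxLogsKept(1). The shape of the filter that ends up registered is shown below in a minimal sketch; the helper name and concrete argument values are hypothetical, while Filter's fields and RegisterFilter are exactly as used in the patch. Note that RegisterFilter also flips countBasedLogPruningActive when MaxLogsKept > 0, so the count-based pruning pass in PruneExpiredLogs only starts running once at least one such filter exists:

package example

import (
	"context"

	"github.com/ethereum/go-ethereum/common"

	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// registerLatestOnlyFilter registers a filter equivalent to what
// NewOCRContractTransmitter produces when given WithMaxLogsKept(1)
// and no WithRetention option.
func registerLatestOnlyFilter(ctx context.Context, lp logpoller.LogPoller, contractAddr common.Address, transmittedEventSig common.Hash) error {
	return lp.RegisterFilter(ctx, logpoller.Filter{
		Name:        "OCR ContractTransmitter " + contractAddr.String(),
		EventSigs:   []common.Hash{transmittedEventSig},
		Addresses:   []common.Address{contractAddr},
		Retention:   0, // no time-based expiry for this filter
		MaxLogsKept: 1, // count-based retention: LogPoller may prune all but one matching log
	})
}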