[BCFR-899] MaxLogsKept implementation (#14574)
* Add PruneExcessLogs for MaxLogsKept

* Update contract_transmitter.go for MaxLogsKept

* Fix unreachable code

* pnpm changeset

* Re-use block-range paging from DeleteBlocksBefore for SelectExcessLogs

Also: add block_number >= lower

* Use ExecPagedQuery for SelectUnmatchedLogIDs

Also: add deduping of ids for logs matching multiple filters

* Improve logging

* Add test for SelectExcessLogIDs

Also, remove some extraneous lines in orm_test.go

* Only activate count-based log pruning when needed

* Typo in changeset

* Refactor ExecPagedQuery into method of generic type RangedQuery[T] (see the hedged sketch after this commit message)

* Fix setting of countBasedPruningActive flag

* Change sql comments to go comments

* Address remaining PR comments

- Remove topics from SelectExcessLogs query
- Early exit from loadFilters
- upper >= end

* Take Jordan's suggestion of replacing *atomic.Bool with atomic.Bool
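
A hedged sketch of the RangedQuery[T] refactor named above. The real implementation lives in the ORM code, which this page does not render, so every name and signature below is an assumption rather than the PR's actual API; it only illustrates the general idea of paging one query over block ranges.

package logpoller

import "context"

// RangedQuery pages a callback query over an inclusive block range.
// Hypothetical sketch: none of these names are confirmed by the PR.
type RangedQuery[T any] struct {
    start, end int64 // inclusive block range to walk
    pageSize   int64 // blocks per page; 0 disables paging
    query      func(ctx context.Context, lower, upper int64) ([]T, error)
}

// Exec runs the query one block-range page at a time and concatenates results.
func (q RangedQuery[T]) Exec(ctx context.Context) ([]T, error) {
    if q.pageSize == 0 {
        return q.query(ctx, q.start, q.end)
    }
    var results []T
    for lower := q.start; lower <= q.end; lower += q.pageSize {
        upper := lower + q.pageSize - 1
        if upper >= q.end { // the "upper >= end" boundary fix noted above
            upper = q.end
        }
        page, err := q.query(ctx, lower, upper)
        if err != nil {
            return results, err
        }
        results = append(results, page...)
    }
    return results, nil
}

Under those assumptions, a SelectExcessLogs-style select and a DeleteBlocksBefore-style delete could share this one paging loop by supplying different query callbacks.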
reductionista authored Oct 16, 2024
1 parent 275bcf0 commit accbf0f
Showing 6 changed files with 448 additions and 67 deletions.
5 changes: 5 additions & 0 deletions .changeset/flat-horses-argue.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+#added LogPoller MaxLogsKept feature: count-based log retention (keep only the N most recent logs) as an alternative to time-based retention
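
As a usage sketch of the feature this changeset describes: the Filter fields Name, Addresses, Retention, and MaxLogsKept are all visible in the log_poller.go diff below, while EventSigs, the variable names, and the surrounding wiring (including the usual go-ethereum common and time imports) are illustrative assumptions.

// Illustrative only: registering a filter that opts in to count-based pruning.
filter := logpoller.Filter{
    Name:        "my-contract-events",           // hypothetical filter name
    Addresses:   []common.Address{contractAddr}, // hypothetical contract address
    EventSigs:   []common.Hash{eventTopic},      // assumed field; not shown in this diff
    Retention:   24 * time.Hour,                 // time-based pruning still applies
    MaxLogsKept: 100,                            // also keep only the 100 most recent matching logs
}
if err := lp.RegisterFilter(ctx, filter); err != nil {
    return err
}
// Per the RegisterFilter diff below, any filter with MaxLogsKept > 0 sets
// countBasedLogPruningActive, enabling the extra pass in PruneExpiredLogs.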
94 changes: 80 additions & 14 deletions core/chains/evm/logpoller/log_poller.go
@@ -134,7 +134,8 @@ type logPoller struct {
     // Usually the only way to recover is to manually remove the offending logs and block from the database.
     // LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
     // recover automatically without needing to restart the LogPoller.
-    finalityViolated *atomic.Bool
+    finalityViolated           atomic.Bool
+    countBasedLogPruningActive atomic.Bool
 }

 type Opts struct {
@@ -179,7 +180,6 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke
         clientErrors: opts.ClientErrors,
         filters:      make(map[string]Filter),
         filterDirty:  true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
-        finalityViolated: new(atomic.Bool),
     }
 }

@@ -217,6 +217,12 @@ func (filter *Filter) Contains(other *Filter) bool {
     if other == nil {
         return true
     }
+    if other.Retention != filter.Retention {
+        return false
+    }
+    if other.MaxLogsKept != filter.MaxLogsKept {
+        return false
+    }
     addresses := make(map[common.Address]interface{})
     for _, addr := range filter.Addresses {
         addresses[addr] = struct{}{}
@@ -282,14 +288,17 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error {
             lp.lggr.Warnw("Filter already present, no-op", "name", filter.Name, "filter", filter)
             return nil
         }
-        lp.lggr.Warnw("Updating existing filter with more events or addresses", "name", filter.Name, "filter", filter)
+        lp.lggr.Warnw("Updating existing filter", "name", filter.Name, "filter", filter)
     }

     if err := lp.orm.InsertFilter(ctx, filter); err != nil {
         return pkgerrors.Wrap(err, "error inserting filter")
     }
     lp.filters[filter.Name] = filter
     lp.filterDirty = true
+    if filter.MaxLogsKept > 0 {
+        lp.countBasedLogPruningActive.Store(true)
+    }
     return nil
 }

@@ -545,18 +554,38 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
     return mathutil.Min(requested, lastProcessed.BlockNumber), nil
 }

+// loadFilters loads the filters from db, and activates count-based Log Pruning
+// if required by any of the filters
 func (lp *logPoller) loadFilters(ctx context.Context) error {
+    filters, err := lp.lockAndLoadFilters(ctx)
     if err != nil {
         return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
     }
+    if lp.countBasedLogPruningActive.Load() {
+        return nil
+    }
+    for _, filter := range filters {
+        if filter.MaxLogsKept != 0 {
+            lp.countBasedLogPruningActive.Store(true)
+            return nil
+        }
+    }
+    return nil
+}
+
+// lockAndLoadFilters is the part of loadFilters() requiring a filterMu lock
+func (lp *logPoller) lockAndLoadFilters(ctx context.Context) (filters map[string]Filter, err error) {
     lp.filterMu.Lock()
     defer lp.filterMu.Unlock()
-    filters, err := lp.orm.LoadFilters(ctx)
+
+    filters, err = lp.orm.LoadFilters(ctx)
     if err != nil {
-        return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
+        return filters, err
     }
+
     lp.filters = filters
     lp.filterDirty = true
-    return nil
+    return filters, nil
 }

 // tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period
@@ -665,31 +694,41 @@ func (lp *logPoller) backgroundWorkerRun() {
         case <-ctx.Done():
             return
         case <-blockPruneTick:
+            lp.lggr.Infow("pruning old blocks")
             blockPruneTick = tickWithDefaultJitter(blockPruneInterval)
             if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil {
-                lp.lggr.Errorw("Unable to prune old blocks", "err", err)
+                lp.lggr.Errorw("unable to prune old blocks", "err", err)
             } else if !allRemoved {
                 // Tick faster when cleanup can't keep up with the pace of new blocks
                 blockPruneTick = tickWithDefaultJitter(blockPruneShortInterval)
+                lp.lggr.Warnw("reached page limit while pruning old blocks")
+            } else {
+                lp.lggr.Debugw("finished pruning old blocks")
             }
         case <-logPruneTick:
             logPruneTick = tickWithDefaultJitter(logPruneInterval)
+            lp.lggr.Infof("pruning expired logs")
             if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
-                lp.lggr.Errorw("Unable to prune expired logs", "err", err)
+                lp.lggr.Errorw("unable to prune expired logs", "err", err)
             } else if !allRemoved {
+                lp.lggr.Warnw("reached page limit while pruning expired logs")
                 // Tick faster when cleanup can't keep up with the pace of new logs
                 logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
-            } else if successfulExpiredLogPrunes == 20 {
+            } else if successfulExpiredLogPrunes >= 20 {
                 // Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
                 // since the last time unmatched logs were pruned
+                lp.lggr.Infof("finished pruning expired logs: pruning unmatched logs")
                 if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
-                    lp.lggr.Errorw("Unable to prune unmatched logs", "err", err)
+                    lp.lggr.Errorw("unable to prune unmatched logs", "err", err)
                 } else if !allRemoved {
+                    lp.lggr.Warnw("reached page limit while pruning unmatched logs")
                    logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
                 } else {
+                    lp.lggr.Debugw("finished pruning unmatched logs")
                     successfulExpiredLogPrunes = 0
                 }
+            } else {
+                lp.lggr.Debugw("finished pruning expired logs")
                 successfulExpiredLogPrunes++
             }
         }
@@ -1097,7 +1136,8 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
 }

 // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
-// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
+// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, then it
+// will always return true unless there is an actual error.
 func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
     latestBlock, err := lp.orm.SelectLatestBlock(ctx)
     if err != nil {
@@ -1121,13 +1161,39 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
     return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
 }

-// PruneExpiredLogs logs that are older than their retention period defined in Filter.
-// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
+// PruneExpiredLogs will attempt to remove any logs which have passed their retention period. Returns whether all expired
+// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered.
 func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
+    done := true
+
     rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
-    return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
+    if err != nil {
+        lp.lggr.Errorw("Unable to prune expired logs", "err", err)
+        return false, err
+    } else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
+        done = false
+    }
+
+    if !lp.countBasedLogPruningActive.Load() {
+        return done, err
+    }
+
+    rowIDs, err := lp.orm.SelectExcessLogIDs(ctx, lp.logPrunePageSize)
+    if err != nil {
+        lp.lggr.Errorw("Unable to find excess logs for pruning", "err", err)
+        return false, err
+    }
+    rowsRemoved, err = lp.orm.DeleteLogsByRowID(ctx, rowIDs)
+    if err != nil {
+        lp.lggr.Errorw("Unable to prune excess logs", "err", err)
+    } else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
+        done = false
+    }
+    return done, err
+}
+
+// PruneUnmatchedLogs will attempt to remove any logs which no longer match a registered filter. Returns whether all unmatched
+// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered.
 func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
     ids, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize)
     if err != nil {
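SelectExcessLogIDs itself is defined in orm.go, which is not rendered on this page. Purely as a sketch of what a count-based selection can look like — not the PR's actual SQL — a window function can rank each filter's logs newest-first and return the IDs of rows ranked past MaxLogsKept. Table and column names are assumptions, and the commit notes above (block-range paging, block_number >= lower, deduping IDs across filters) indicate the real query is structured differently.

// NOT the query from this PR; a plausible shape only. Assumed PostgreSQL
// schema: evm.logs(id, address, event_sig, block_number, log_index) and
// evm.log_poller_filters(name, address, event, max_logs_kept).
const selectExcessLogIDsSketch = `
SELECT DISTINCT id FROM (
    SELECT l.id,
           ROW_NUMBER() OVER (
               PARTITION BY f.name
               ORDER BY l.block_number DESC, l.log_index DESC
           ) AS rn,
           f.max_logs_kept
    FROM evm.logs l
    JOIN evm.log_poller_filters f
        ON f.address = l.address AND f.event = l.event_sig
    WHERE f.max_logs_kept > 0
) ranked
WHERE rn > max_logs_kept
LIMIT $1`

The DISTINCT mirrors the deduping called out in the commit message: a log matching several filters appears once per filter in the ranked subquery, but its ID should only be returned (and deleted) once.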
18 changes: 12 additions & 6 deletions core/chains/evm/logpoller/observability.go
@@ -136,9 +136,9 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64)
     })
 }

-func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) {
-    return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) {
-        return o.ORM.DeleteLogsByRowID(ctx, rowIDs)
+func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
+    return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
+        return o.ORM.DeleteExpiredLogs(ctx, limit)
     })
 }

@@ -148,9 +148,15 @@ func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (i
     })
 }

-func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
-    return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
-        return o.ORM.DeleteExpiredLogs(ctx, limit)
+func (o *ObservedORM) SelectExcessLogIDs(ctx context.Context, limit int64) ([]uint64, error) {
+    return withObservedQueryAndResults[uint64](o, "SelectExcessLogIDs", func() ([]uint64, error) {
+        return o.ORM.SelectExcessLogIDs(ctx, limit)
     })
 }

+func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) {
+    return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) {
+        return o.ORM.DeleteLogsByRowID(ctx, rowIDs)
+    })
+}
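
Each wrapper above follows one pattern: delegate to the inner ORM while recording metrics under the query's name. The helper bodies are outside this diff, so the following is only a guess at the shape of withObservedQueryAndResults — its call signature matches the SelectExcessLogIDs wrapper above, but the metric vectors and labels are assumptions.

package logpoller

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// Hypothetical standalone version of the instrumentation helper; the real one
// presumably reads its metric vectors off the *ObservedORM receiver instead.
func withObservedQueryAndResults[T any](
    queryDuration *prometheus.HistogramVec,
    datasetSize *prometheus.GaugeVec,
    chainID, queryName string,
    query func() ([]T, error),
) ([]T, error) {
    queryStarted := time.Now()
    results, err := query()
    // Record latency labeled by chain and query name.
    queryDuration.WithLabelValues(chainID, queryName).Observe(time.Since(queryStarted).Seconds())
    if err == nil {
        // Track result-set size, e.g. so dashboards can tell whether pruning keeps up.
        datasetSize.WithLabelValues(chainID, queryName).Set(float64(len(results)))
    }
    return results, err
}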
