From f72baf0ae557f4880aececdeb572b66b0c669ea6 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 20 Mar 2024 12:31:27 +0200 Subject: [PATCH] fixes & logs --- .../evmregistry/v21/logprovider/buffer_v1.go | 72 ++++++++++--------- .../evmregistry/v21/logprovider/factory.go | 2 +- .../evmregistry/v21/logprovider/provider.go | 2 +- 3 files changed, 40 insertions(+), 36 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index c9c4363745e..3f49eb9f4dd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -43,7 +43,7 @@ func DefaultUpkeepSelector(id *big.Int) bool { type logBuffer struct { lggr logger.Logger // max number of logs to keep in the buffer for each upkeep per block - maxUpkeepLogs *atomic.Int32 + logLimitHigh *atomic.Int32 // number of blocks to keep in the buffer bufferSize *atomic.Int32 // last block number seen by the buffer @@ -53,15 +53,15 @@ type logBuffer struct { lock sync.RWMutex } -func NewLogBuffer(lggr logger.Logger, size, upkeepLogLimit int) LogBuffer { - s := new(atomic.Int32) - s.Add(int32(size)) - l := new(atomic.Int32) - l.Add(int32(upkeepLogLimit)) +func NewLogBuffer(lggr logger.Logger, lookback, logLimitHigh uint) LogBuffer { + bufferSize := new(atomic.Int32) + bufferSize.Add(int32(lookback)) + limitHigh := new(atomic.Int32) + limitHigh.Add(int32(logLimitHigh)) return &logBuffer{ lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), - maxUpkeepLogs: l, - bufferSize: s, + logLimitHigh: limitHigh, + bufferSize: bufferSize, lastBlockSeen: new(atomic.Int64), upkeepBuffers: make(map[string]*upkeepLogBuffer), } @@ -72,10 +72,11 @@ func (b *logBuffer) SetConfig(lookback, logLimitHigh int) { defer b.lock.Unlock() b.bufferSize.Store(int32(lookback)) - b.maxUpkeepLogs.Store(int32(logLimitHigh)) + b.logLimitHigh.Store(int32(logLimitHigh)) + cap := uint(logLimitHigh * lookback) for _, ub := range b.upkeepBuffers { - ub.setConfig(logLimitHigh) + ub.setConfig(cap) } } @@ -85,17 +86,18 @@ func (b *logBuffer) SetConfig(lookback, logLimitHigh int) { func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { buf, ok := b.getUpkeepBuffer(uid) if !ok || buf == nil { - buf = newUpkeepLogBuffer(b.lggr, uid, int(b.maxUpkeepLogs.Load()*b.bufferSize.Load())) + buf = newUpkeepLogBuffer(b.lggr, uid, int(b.logLimitHigh.Load()*b.bufferSize.Load())) b.setUpkeepBuffer(uid, buf) } - lastBlockSeen := latestBlockNumber(logs...) - if b.lastBlockSeen.Load() < lastBlockSeen { - b.lastBlockSeen.Store(lastBlockSeen) + latestBlock := latestBlockNumber(logs...) + if b.lastBlockSeen.Load() < latestBlock { + b.lastBlockSeen.Store(latestBlock) } blockThreshold := b.lastBlockSeen.Load() - int64(b.bufferSize.Load()) if blockThreshold <= 0 { blockThreshold = 1 } + buf.lggr.Debugw("Enqueuing logs", "blockThreshold", blockThreshold, "logsLatestBlock", latestBlock, "lastBlockSeen", b.lastBlockSeen.Load(), "logs", len(logs), "upkeepID", uid.String(), "upkeepBufferSize", buf.size(), "upkeepBufferCap", buf.cap.Load()) return buf.enqueue(blockThreshold, logs...) } @@ -166,28 +168,28 @@ func (b *logBuffer) setUpkeepBuffer(uid *big.Int, buf *upkeepLogBuffer) { type upkeepLogBuffer struct { lggr logger.Logger - id *big.Int - maxLogs *atomic.Int32 + id *big.Int + cap *atomic.Int32 q []logpoller.Log visited map[string]int64 lock sync.RWMutex } -func newUpkeepLogBuffer(lggr logger.Logger, id *big.Int, maxLogs int) *upkeepLogBuffer { - limit := new(atomic.Int32) - limit.Add(int32(maxLogs)) +func newUpkeepLogBuffer(lggr logger.Logger, id *big.Int, capacity int) *upkeepLogBuffer { + cap := new(atomic.Int32) + cap.Add(int32(capacity)) return &upkeepLogBuffer{ - lggr: lggr.With("id", id.String()), + lggr: lggr.With("upkeepID", id.String()), id: id, - maxLogs: limit, - q: make([]logpoller.Log, 0, maxLogs), + cap: cap, + q: make([]logpoller.Log, 0, capacity), visited: make(map[string]int64), } } -func (ub *upkeepLogBuffer) setConfig(maxLogs int) { - ub.maxLogs.Store(int32(maxLogs)) +func (ub *upkeepLogBuffer) setConfig(capacity uint) { + ub.cap.Store(int32(capacity)) } // size returns the total number of logs in the buffer. @@ -238,9 +240,9 @@ func (ub *upkeepLogBuffer) dequeue(start, end int64, limit int) ([]logpoller.Log if len(results) > 0 { ub.q = updatedLogs + ub.lggr.Debugw("Dequeued logs", "start", start, "end", end, "limit", limit, "results", len(results), "remaining", remaining) } - ub.lggr.Debugf("Dequeued %d logs, remaining %d", len(results), remaining) prommetrics.AutomationLogsInLogBuffer.Sub(float64(len(results))) return results, remaining @@ -257,12 +259,12 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller. var added int for _, log := range logsToAdd { if log.BlockNumber < blockThreshold { - ub.lggr.Debugw("Skipping log from old block", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber) + ub.lggr.Debugw("Skipping log from old block", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber, "logIndex", log.LogIndex) continue } logid := logID(log) if _, ok := ub.visited[logid]; ok { - ub.lggr.Debugw("Skipping known log", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber) + ub.lggr.Debugw("Skipping known log", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber, "logIndex", log.LogIndex) continue } added++ @@ -289,9 +291,9 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller. var dropped int if added > 0 { dropped = ub.clean(blockThreshold) + ub.lggr.Debugw("Enqueued logs", "added", added, "dropped", dropped, "blockThreshold", blockThreshold, "q size", len(ub.q), "maxLogs", ub.cap.Load(), "visited size", len(ub.visited)) } - ub.lggr.Debugf("Enqueued %d logs, dropped %d with blockThreshold %d", added, dropped, blockThreshold) prommetrics.AutomationLogsInLogBuffer.Add(float64(added)) return added, dropped @@ -300,15 +302,15 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller. // clean removes logs that are older than blockThreshold and drops logs if the limit for the // given upkeep was exceeded. Returns the number of logs that were dropped. func (ub *upkeepLogBuffer) clean(blockThreshold int64) int { - maxLogs := int(ub.maxLogs.Load()) + maxLogs := int(ub.cap.Load()) sort.SliceStable(ub.q, func(i, j int) bool { return LogSorter(ub.q[i], ub.q[j]) }) updated := make([]logpoller.Log, 0) - var dropped int + var dropped, expired int for _, l := range ub.q { - if l.BlockNumber > blockThreshold { + if l.BlockNumber >= blockThreshold { if len(updated) < maxLogs { updated = append(updated, l) } else { @@ -321,14 +323,16 @@ func (ub *upkeepLogBuffer) clean(blockThreshold int64) int { } else { prommetrics.AutomationLogsInLogBuffer.Dec() // old logs are ignored and removed from visited - ub.lggr.Debugw("Dropping old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex) + ub.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex) logid := logID(l) delete(ub.visited, logid) + expired++ } } - ub.lggr.Debugw("Cleaned logs", "dropped", dropped, "blockThreshold", blockThreshold, "len updated", len(updated), "len ub.q", len(ub.q), "maxLogs", maxLogs) - + if dropped > 0 || expired > 0 { + ub.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len ub.q", len(ub.q), "maxLogs", maxLogs) + } ub.q = updated for lid, block := range ub.visited { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go index b3ec9a3d3bd..90bf871471e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go @@ -87,6 +87,6 @@ func (o *LogTriggersOptions) Defaults(finalityDepth int64) { o.LogLimitLow = 4 } if o.LogLimitHigh == 0 { - o.LogLimitHigh = o.LogLimitHigh * 10 + o.LogLimitHigh = o.LogLimitLow * 10 } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 9650003316a..688e5aab028 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -112,7 +112,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa lggr: lggr.Named("KeepersRegistry.LogEventProvider"), packer: packer, buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh), - bufferV1: NewLogBuffer(lggr, int(opts.LookbackBlocks), int(opts.LogLimitHigh)), + bufferV1: NewLogBuffer(lggr, uint(opts.LookbackBlocks), uint(opts.LogLimitHigh)), poller: poller, opts: opts, filterStore: filterStore,