Skip to content

Commit

Permalink
fixes & logs
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Mar 20, 2024
1 parent f530690 commit f72baf0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func DefaultUpkeepSelector(id *big.Int) bool {
type logBuffer struct {
lggr logger.Logger
// max number of logs to keep in the buffer for each upkeep per block
maxUpkeepLogs *atomic.Int32
logLimitHigh *atomic.Int32
// number of blocks to keep in the buffer
bufferSize *atomic.Int32
// last block number seen by the buffer
Expand All @@ -53,15 +53,15 @@ type logBuffer struct {
lock sync.RWMutex
}

func NewLogBuffer(lggr logger.Logger, size, upkeepLogLimit int) LogBuffer {
s := new(atomic.Int32)
s.Add(int32(size))
l := new(atomic.Int32)
l.Add(int32(upkeepLogLimit))
func NewLogBuffer(lggr logger.Logger, lookback, logLimitHigh uint) LogBuffer {
bufferSize := new(atomic.Int32)
bufferSize.Add(int32(lookback))
limitHigh := new(atomic.Int32)
limitHigh.Add(int32(logLimitHigh))
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
maxUpkeepLogs: l,
bufferSize: s,
logLimitHigh: limitHigh,
bufferSize: bufferSize,
lastBlockSeen: new(atomic.Int64),
upkeepBuffers: make(map[string]*upkeepLogBuffer),
}
Expand All @@ -72,10 +72,11 @@ func (b *logBuffer) SetConfig(lookback, logLimitHigh int) {
defer b.lock.Unlock()

b.bufferSize.Store(int32(lookback))
b.maxUpkeepLogs.Store(int32(logLimitHigh))
b.logLimitHigh.Store(int32(logLimitHigh))

cap := uint(logLimitHigh * lookback)

Check failure on line 77 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go

View workflow job for this annotation

GitHub Actions / lint

redefines-builtin-id: redefinition of the built-in function cap (revive)
for _, ub := range b.upkeepBuffers {
ub.setConfig(logLimitHigh)
ub.setConfig(cap)
}
}

Expand All @@ -85,17 +86,18 @@ func (b *logBuffer) SetConfig(lookback, logLimitHigh int) {
func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
buf, ok := b.getUpkeepBuffer(uid)
if !ok || buf == nil {
buf = newUpkeepLogBuffer(b.lggr, uid, int(b.maxUpkeepLogs.Load()*b.bufferSize.Load()))
buf = newUpkeepLogBuffer(b.lggr, uid, int(b.logLimitHigh.Load()*b.bufferSize.Load()))
b.setUpkeepBuffer(uid, buf)
}
lastBlockSeen := latestBlockNumber(logs...)
if b.lastBlockSeen.Load() < lastBlockSeen {
b.lastBlockSeen.Store(lastBlockSeen)
latestBlock := latestBlockNumber(logs...)
if b.lastBlockSeen.Load() < latestBlock {
b.lastBlockSeen.Store(latestBlock)
}
blockThreshold := b.lastBlockSeen.Load() - int64(b.bufferSize.Load())
if blockThreshold <= 0 {
blockThreshold = 1
}
buf.lggr.Debugw("Enqueuing logs", "blockThreshold", blockThreshold, "logsLatestBlock", latestBlock, "lastBlockSeen", b.lastBlockSeen.Load(), "logs", len(logs), "upkeepID", uid.String(), "upkeepBufferSize", buf.size(), "upkeepBufferCap", buf.cap.Load())
return buf.enqueue(blockThreshold, logs...)
}

Expand Down Expand Up @@ -166,28 +168,28 @@ func (b *logBuffer) setUpkeepBuffer(uid *big.Int, buf *upkeepLogBuffer) {
type upkeepLogBuffer struct {
lggr logger.Logger

id *big.Int
maxLogs *atomic.Int32
id *big.Int
cap *atomic.Int32

q []logpoller.Log
visited map[string]int64
lock sync.RWMutex
}

func newUpkeepLogBuffer(lggr logger.Logger, id *big.Int, maxLogs int) *upkeepLogBuffer {
limit := new(atomic.Int32)
limit.Add(int32(maxLogs))
func newUpkeepLogBuffer(lggr logger.Logger, id *big.Int, capacity int) *upkeepLogBuffer {
cap := new(atomic.Int32)

Check failure on line 180 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go

View workflow job for this annotation

GitHub Actions / lint

redefines-builtin-id: redefinition of the built-in function cap (revive)
cap.Add(int32(capacity))
return &upkeepLogBuffer{
lggr: lggr.With("id", id.String()),
lggr: lggr.With("upkeepID", id.String()),
id: id,
maxLogs: limit,
q: make([]logpoller.Log, 0, maxLogs),
cap: cap,
q: make([]logpoller.Log, 0, capacity),
visited: make(map[string]int64),
}
}

func (ub *upkeepLogBuffer) setConfig(maxLogs int) {
ub.maxLogs.Store(int32(maxLogs))
func (ub *upkeepLogBuffer) setConfig(capacity uint) {
ub.cap.Store(int32(capacity))
}

// size returns the total number of logs in the buffer.
Expand Down Expand Up @@ -238,9 +240,9 @@ func (ub *upkeepLogBuffer) dequeue(start, end int64, limit int) ([]logpoller.Log

if len(results) > 0 {
ub.q = updatedLogs
ub.lggr.Debugw("Dequeued logs", "start", start, "end", end, "limit", limit, "results", len(results), "remaining", remaining)
}

ub.lggr.Debugf("Dequeued %d logs, remaining %d", len(results), remaining)
prommetrics.AutomationLogsInLogBuffer.Sub(float64(len(results)))

return results, remaining
Expand All @@ -257,12 +259,12 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller.
var added int
for _, log := range logsToAdd {
if log.BlockNumber < blockThreshold {
ub.lggr.Debugw("Skipping log from old block", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber)
ub.lggr.Debugw("Skipping log from old block", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber, "logIndex", log.LogIndex)
continue
}
logid := logID(log)
if _, ok := ub.visited[logid]; ok {
ub.lggr.Debugw("Skipping known log", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber)
ub.lggr.Debugw("Skipping known log", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber, "logIndex", log.LogIndex)
continue
}
added++
Expand All @@ -289,9 +291,9 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller.
var dropped int
if added > 0 {
dropped = ub.clean(blockThreshold)
ub.lggr.Debugw("Enqueued logs", "added", added, "dropped", dropped, "blockThreshold", blockThreshold, "q size", len(ub.q), "maxLogs", ub.cap.Load(), "visited size", len(ub.visited))
}

ub.lggr.Debugf("Enqueued %d logs, dropped %d with blockThreshold %d", added, dropped, blockThreshold)
prommetrics.AutomationLogsInLogBuffer.Add(float64(added))

return added, dropped
Expand All @@ -300,15 +302,15 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller.
// clean removes logs that are older than blockThreshold and drops logs if the limit for the
// given upkeep was exceeded. Returns the number of logs that were dropped.
func (ub *upkeepLogBuffer) clean(blockThreshold int64) int {
maxLogs := int(ub.maxLogs.Load())
maxLogs := int(ub.cap.Load())

sort.SliceStable(ub.q, func(i, j int) bool {
return LogSorter(ub.q[i], ub.q[j])
})
updated := make([]logpoller.Log, 0)
var dropped int
var dropped, expired int
for _, l := range ub.q {
if l.BlockNumber > blockThreshold {
if l.BlockNumber >= blockThreshold {
if len(updated) < maxLogs {
updated = append(updated, l)
} else {
Expand All @@ -321,14 +323,16 @@ func (ub *upkeepLogBuffer) clean(blockThreshold int64) int {
} else {
prommetrics.AutomationLogsInLogBuffer.Dec()
// old logs are ignored and removed from visited
ub.lggr.Debugw("Dropping old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex)
ub.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex)
logid := logID(l)
delete(ub.visited, logid)
expired++
}
}

ub.lggr.Debugw("Cleaned logs", "dropped", dropped, "blockThreshold", blockThreshold, "len updated", len(updated), "len ub.q", len(ub.q), "maxLogs", maxLogs)

if dropped > 0 || expired > 0 {
ub.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len ub.q", len(ub.q), "maxLogs", maxLogs)
}
ub.q = updated

for lid, block := range ub.visited {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ func (o *LogTriggersOptions) Defaults(finalityDepth int64) {
o.LogLimitLow = 4
}
if o.LogLimitHigh == 0 {
o.LogLimitHigh = o.LogLimitHigh * 10
o.LogLimitHigh = o.LogLimitLow * 10
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
bufferV1: NewLogBuffer(lggr, int(opts.LookbackBlocks), int(opts.LogLimitHigh)),
bufferV1: NewLogBuffer(lggr, uint(opts.LookbackBlocks), uint(opts.LogLimitHigh)),
poller: poller,
opts: opts,
filterStore: filterStore,
Expand Down

0 comments on commit f72baf0

Please sign in to comment.