Add log limit to buffer opts

- Use a default log limit of 1
- Simplify dequeue interface
ferglor committed Jul 4, 2024
1 parent c735f74 commit 273f4df
Showing 4 changed files with 43 additions and 26 deletions.

File 1 of 4: log buffer implementation
@@ -25,7 +25,7 @@ type LogBuffer interface {
// It also accepts a boolean to identify if we are operating under minimum dequeue.
// Returns logs (associated with upkeeps) and the number of remaining
// logs in that window for the involved upkeeps.
-Dequeue(start, end int64, upkeepLimit, maxResults int, minimumDequeue bool) ([]BufferedLog, int)
+Dequeue(startWindowBlock int64, maxResults int, minimumDequeue bool) ([]BufferedLog, int)
// SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep.
SetConfig(lookback, blockRate, logLimit uint32)
// NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer.
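
The simplified signature drops the explicit end block and per-upkeep limit: the buffer now derives the window end from its configured blockRate and the per-upkeep minimum from logLimit. A minimal caller sketch under those assumptions (drainWindow and the two-pass order are illustrative; the provider changes later in this diff follow the same shape):

// Illustrative only, not part of this commit: drive one block window
// through both dequeue modes of the simplified interface.
func drainWindow(buf LogBuffer, startWindowBlock int64, maxResults int) []BufferedLog {
	// Pass 1: minimum-commitment dequeue; the buffer caps each upkeep
	// at its configured logLimit for this window.
	logs, _ := buf.Dequeue(startWindowBlock, maxResults, true)
	// Pass 2: best-effort dequeue fills whatever capacity remains.
	if len(logs) < maxResults {
		more, _ := buf.Dequeue(startWindowBlock, maxResults-len(logs), false)
		logs = append(logs, more...)
	}
	return logs
}
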
@@ -41,13 +41,16 @@ type logBufferOptions struct {
blockRate *atomic.Uint32
// max number of logs to keep in the buffer for each upkeep per window (LogLimit*10)
windowLimit *atomic.Uint32
+// number of logs we need to dequeue per upkeep per block window at a minimum
+logLimit *atomic.Uint32
}

func newLogBufferOptions(lookback, blockRate, logLimit uint32) *logBufferOptions {
opts := &logBufferOptions{
windowLimit: new(atomic.Uint32),
lookback: new(atomic.Uint32),
blockRate: new(atomic.Uint32),
+logLimit: new(atomic.Uint32),
}
opts.override(lookback, blockRate, logLimit)

@@ -58,6 +61,7 @@ func (o *logBufferOptions) override(lookback, blockRate, logLimit uint32) {
o.windowLimit.Store(logLimit * 10)
o.lookback.Store(lookback)
o.blockRate.Store(blockRate)
+o.logLimit.Store(logLimit)
}

type logBuffer struct {
@@ -107,6 +111,7 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
}

blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load())
+blockThreshold, _ = getBlockWindow(blockThreshold, int(b.opts.blockRate.Load()))
if blockThreshold <= 0 {
blockThreshold = 1
}
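
Enqueue now snaps the eviction threshold onto a window boundary. getBlockWindow itself is not shown in this diff; a minimal sketch of what it presumably does, judging from its call sites here and in the provider:

// Assumed helper shape (not part of this diff): align a block to the
// blockRate-wide window containing it.
func getBlockWindow(block int64, blockRate int) (start int64, end int64) {
	size := int64(blockRate)
	if size == 0 {
		return block, block
	}
	start = block - (block % size)
	end = start + size - 1
	return start, end
}
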
@@ -153,11 +158,11 @@ func (b *logBuffer) evictReorgdLogs(reorgBlocks map[int64]bool) {

// Dequeue greedily pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
-func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, bestEffort bool) ([]BufferedLog, int) {
+func (b *logBuffer) Dequeue(startWindowBlock int64, maxResults int, bestEffort bool) ([]BufferedLog, int) {
b.lock.RLock()
defer b.lock.RUnlock()

-return b.dequeue(start, end, upkeepLimit, maxResults, bestEffort)
+return b.dequeue(startWindowBlock, maxResults, bestEffort)
}

// dequeue pulls logs from the buffers, in block range [start,end] with minimum number
@@ -166,14 +171,22 @@ func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, bestEffort bool) ([]BufferedLog, int) {
// of logs have been dequeued for that upkeep.
// Returns logs and the number of remaining logs in the buffer for the given range and selector.
// NOTE: this method is not thread safe and should be called within a lock.
-func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, minimumDequeue bool) ([]BufferedLog, int) {
+func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]BufferedLog, int) {
var result []BufferedLog
var remainingLogs int
minimumDequeueMet := 0

+logLimit := int(b.opts.logLimit.Load())
+end := start + int64(b.opts.blockRate.Load())
+
+if !minimumDequeue {
+logLimit = capacity
+}

for _, qid := range b.queueIDs {
q := b.queues[qid]

-if minimumDequeue && q.dequeued[start] >= upkeepLimit {
+if minimumDequeue && q.dequeued[start] >= logLimit {
// if we have already dequeued the minimum commitment for this window, skip it
minimumDequeueMet++
continue
Expand All @@ -189,12 +202,20 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, minimum
remainingLogs += logsInRange
continue
}
-if upkeepLimit > capacity {
+if logLimit > capacity {
// adjust limit if it is higher than the actual capacity
-upkeepLimit = capacity
+logLimit = capacity
}

-logs, remaining := q.dequeue(start, end, upkeepLimit)
+var logs []logpoller.Log
+remaining := 0
+
+if minimumDequeue {
+logs, remaining = q.dequeue(start, end, logLimit-q.dequeued[start])
+} else {
+logs, remaining = q.dequeue(start, end, logLimit)
+}
for _, l := range logs {
result = append(result, BufferedLog{ID: q.id, Log: l})
capacity--
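
For the minimum pass, the per-window counter q.dequeued[start] turns logLimit into a shortfall, so a repeated call tops an upkeep up to the limit instead of re-applying it in full. A worked example with assumed numbers:

// Hypothetical values: logLimit 2, one log already dequeued for the
// window starting at block 100.
logLimit := 2
dequeued := map[int64]int{100: 1} // window start -> logs already taken
shortfall := logLimit - dequeued[100] // == 1: only the missing log is requested
_ = shortfall
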
@@ -504,7 +525,7 @@ func (q *upkeepLogQueue) clean(blockThreshold int64) int {
// NOTE: this method is not thread safe and should be called within a lock.
func (q *upkeepLogQueue) cleanStates(blockThreshold int64) {
for lid, s := range q.states {
-if s.block <= blockThreshold {
+if s.block < blockThreshold {
delete(q.states, lid)
}
}

File 2 of 4: log buffer tests
@@ -23,11 +23,11 @@ func TestLogEventBufferV1(t *testing.T) {
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 2},
)
-results, remaining := buf.Dequeue(int64(1), 10, 1, 2, true)
+results, remaining := buf.Dequeue(int64(1), 2, true)
require.Equal(t, 2, len(results))
require.Equal(t, 2, remaining)
require.True(t, results[0].ID.Cmp(results[1].ID) != 0)
-results, remaining = buf.Dequeue(int64(1), 10, 1, 2, true)
+results, remaining = buf.Dequeue(int64(1), 2, true)
require.Equal(t, 0, len(results))
require.Equal(t, 0, remaining)
}
@@ -215,9 +215,9 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) {
added, dropped := buf.Enqueue(id, logs...)
require.Equal(t, len(logs), added+dropped)
}
-start, end := getBlockWindow(tc.args.block, tc.args.blockRate)
+start, _ := getBlockWindow(tc.args.block, tc.args.blockRate)

-results, remaining := buf.Dequeue(start, end, tc.args.upkeepLimit, tc.args.maxResults, true)
+results, remaining := buf.Dequeue(start, tc.args.maxResults, true)
require.Equal(t, len(tc.results), len(results))
require.Equal(t, tc.remaining, remaining)
})

File 3 of 4: log trigger options
@@ -52,11 +52,6 @@ const (
BufferVersionV1 BufferVersion = "v1"
)

-var (
-// RPC nodes can provide logs as far back as 128 blocks
-lookbackBlocks = int64(128)
-)

func NewOptions(finalityDepth int64, chainID *big.Int) LogTriggersOptions {
opts := new(LogTriggersOptions)
opts.chainID = chainID
@@ -68,6 +63,7 @@ func NewOptions(finalityDepth int64, chainID *big.Int) LogTriggersOptions {
// NOTE: o.LookbackBlocks should be set only from within tests
func (o *LogTriggersOptions) Defaults(finalityDepth int64) {
if o.LookbackBlocks == 0 {
+lookbackBlocks := int64(100)
if lookbackBlocks < finalityDepth {
lookbackBlocks = finalityDepth
}
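
With the package-level 128-block variable gone, the default lookback reduces to max(100, finalityDepth). Restated as a pure function (illustrative only; max is the Go 1.21+ builtin):

// defaultLookback(50) == 100, defaultLookback(192) == 192.
func defaultLookback(finalityDepth int64) int64 {
	return max(int64(100), finalityDepth)
}
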
@@ -98,11 +94,13 @@ func (o *LogTriggersOptions) defaultBlockRate() uint32 {

func (o *LogTriggersOptions) defaultLogLimit() uint32 {
switch o.chainID.Int64() {
+case 42161, 421613, 421614: // Arbitrum
+return 2
case 1, 4, 5, 42, 11155111: // Eth
return 20
case 10, 420, 56, 97, 137, 80001, 43113, 43114, 8453, 84531: // Optimism, BSC, Polygon, Avax, Base
return 5
default:
-return 2
+return 1
}
}
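
Net effect of the defaults: Arbitrum chains get an explicit limit of 2, Ethereum keeps 20, Optimism/BSC/Polygon/Avax/Base keep 5, and every unlisted chain drops from 2 to 1. Since override stores windowLimit = logLimit * 10, the default per-window cap falls from 20 to 10 as well. A hedged usage sketch, assuming NewOptions wires defaultLogLimit into the LogLimit field (as the provider's p.opts.LogLimit usage suggests):

// Chain IDs restate the switch above; the wiring through NewOptions
// is assumed, not shown in this diff.
arb := NewOptions(10, big.NewInt(42161)) // LogLimit 2 (Arbitrum One)
eth := NewOptions(10, big.NewInt(1))     // LogLimit 20 (Eth mainnet)
other := NewOptions(10, big.NewInt(777)) // LogLimit 1 (new default, was 2)
_, _, _ = arb, eth, other
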

File 4 of 4: log event provider
@@ -259,15 +259,13 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.UpkeepPayload {
start = 1
}

-startBlock := start

switch p.opts.BufferVersion {
case BufferVersionV1:
payloads = p.minimumCommitmentDequeue(latestBlock, start)

// if we have remaining capacity following minimum commitment dequeue, perform a best effort dequeue
if len(payloads) < MaxPayloads {
-payloads = p.bestEffortDequeue(latestBlock, startBlock, payloads)
+payloads = p.bestEffortDequeue(latestBlock, start, payloads)
}
default:
logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads)
@@ -289,10 +287,10 @@ func (p *logEventProvider) minimumCommitmentDequeue(latestBlock, start int64) []ocr2keepers.UpkeepPayload {
blockRate := int(p.opts.BlockRate)

for len(payloads) < MaxPayloads && start <= latestBlock {
-startWindow, end := getBlockWindow(start, blockRate)
+startWindow, _ := getBlockWindow(start, blockRate)

// dequeue the minimum number of logs (log limit, varies by chain) per upkeep for this block window
-logs, remaining := p.bufferV1.Dequeue(startWindow, end, int(p.opts.LogLimit), MaxPayloads-len(payloads), true)
+logs, remaining := p.bufferV1.Dequeue(startWindow, MaxPayloads-len(payloads), true)
if len(logs) > 0 {
p.lggr.Debugw("minimum commitment dequeue", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
@@ -315,10 +313,10 @@ func (p *logEventProvider) bestEffortDequeue(latestBlock, start int64, payloads []ocr2keepers.UpkeepPayload) []ocr2keepers.UpkeepPayload {
blockRate := int(p.opts.BlockRate)

for len(payloads) < MaxPayloads && start <= latestBlock {
-startWindow, end := getBlockWindow(start, blockRate)
+startWindow, _ := getBlockWindow(start, blockRate)

// dequeue as many logs as we can, based on remaining capacity, for this block window
-logs, remaining := p.bufferV1.Dequeue(startWindow, end, MaxPayloads-len(payloads), MaxPayloads-len(payloads), false)
+logs, remaining := p.bufferV1.Dequeue(startWindow, MaxPayloads-len(payloads), false)
if len(logs) > 0 {
p.lggr.Debugw("best effort dequeue", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
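
Both dequeue passes walk the same block windows; the loop increment sits in the collapsed part of each hunk, so the sketch below assumes start advances by blockRate per iteration:

// Assumed window walk (increment not visible in the hunks shown):
// visit each blockRate-wide window until the payload budget is spent.
for start <= latestBlock && len(payloads) < MaxPayloads {
	startWindow, _ := getBlockWindow(start, blockRate)
	logs, _ := p.bufferV1.Dequeue(startWindow, MaxPayloads-len(payloads), false)
	_ = logs // converted into payloads in the real code
	start += int64(blockRate)
}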
