Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Jun 25, 2024
1 parent 23b6e5a commit 9aeb99c
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 2,239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type LogBuffer interface {
// It also accepts a function to select upkeeps.
// Returns logs (associated to upkeeps) and the number of remaining
// logs in that window for the involved upkeeps.
Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool, bestEffort bool) ([]BufferedLog, int)
Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool, bestEffort bool) ([]BufferedLog, int)
// SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep.
SetConfig(lookback, blockRate, logLimit uint32)
// NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer.
Expand Down Expand Up @@ -162,11 +162,10 @@ func (b *logBuffer) evictReorgdLogs(reorgBlocks map[int64]bool) {

// Dequeue greedly pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool, bestEffort bool) ([]BufferedLog, int) {
func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool, bestEffort bool) ([]BufferedLog, int) {
b.lock.RLock()
defer b.lock.RUnlock()

start, end := getBlockWindow(block, blockRate)
return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector, bestEffort)
}

Expand Down Expand Up @@ -263,6 +262,7 @@ func (b *logBuffer) getUpkeepQueue(uid *big.Int) (*upkeepLogQueue, bool) {
func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) {
if _, ok := b.queues[uid.String()]; !ok {
b.queueIDs = append(b.queueIDs, uid.String())
sort.Slice(b.queueIDs, func(i, j int) bool { return b.queueIDs[i] < b.queueIDs[j] })
}
b.queues[uid.String()] = buf
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"])
assert.True(t, true, logReceived)
})

Expand Down Expand Up @@ -134,9 +132,6 @@ func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"])
assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"])
assert.True(t, true, logReceived)
})
}
Expand Down Expand Up @@ -519,13 +514,13 @@ func TestLogEventBufferV1_BlockWindow(t *testing.T) {

type dequeueArgs struct {
block int64
blockRate int
blockRate int64
upkeepLimit int
maxResults int
upkeepSelector func(id *big.Int) bool
}

func newDequeueArgs(block int64, blockRate int, upkeepLimit int, maxResults int, upkeepSelector func(id *big.Int) bool) dequeueArgs {
func newDequeueArgs(block int64, blockRate int64, upkeepLimit int, maxResults int, upkeepSelector func(id *big.Int) bool) dequeueArgs {
args := dequeueArgs{
block: block,
blockRate: blockRate,
Expand Down Expand Up @@ -561,107 +556,3 @@ func createDummyLogSequence(n, startIndex int, block int64, tx common.Hash) []lo
}
return logs
}

func Test_trackBlockNumbersForUpkeep(t *testing.T) {
buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1)

logBuffer := buf.(*logBuffer)

for _, tc := range []struct {
uid *big.Int
uniqueBlocks map[int64]bool
wantEnqueuedBlocks map[int64]map[string]int
}{
{
uid: big.NewInt(1),
uniqueBlocks: map[int64]bool{
1: true,
2: true,
3: true,
},
wantEnqueuedBlocks: map[int64]map[string]int{
1: {
"1": 1,
},
2: {
"1": 1,
},
3: {
"1": 1,
},
},
},
{
uid: big.NewInt(2),
uniqueBlocks: map[int64]bool{
1: true,
2: true,
3: true,
},
wantEnqueuedBlocks: map[int64]map[string]int{
1: {
"1": 1,
"2": 1,
},
2: {
"1": 1,
"2": 1,
},
3: {
"1": 1,
"2": 1,
},
},
},
{
uid: big.NewInt(2),
uniqueBlocks: map[int64]bool{
3: true,
4: true,
},
wantEnqueuedBlocks: map[int64]map[string]int{
1: {
"1": 1,
"2": 1,
},
2: {
"1": 1,
"2": 1,
},
3: {
"1": 1,
"2": 2,
},
4: {
"2": 1,
},
},
},
{
uniqueBlocks: map[int64]bool{
3: true,
4: true,
},
wantEnqueuedBlocks: map[int64]map[string]int{
1: {
"1": 1,
"2": 1,
},
2: {
"1": 1,
"2": 1,
},
3: {
"1": 1,
"2": 2,
},
4: {
"2": 1,
},
},
},
} {
logBuffer.trackBlockNumbersForUpkeep(tc.uid, tc.uniqueBlocks)
assert.Equal(t, tc.wantEnqueuedBlocks, logBuffer.enqueuedBlocks)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,56 +289,64 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
}

startBlock := start
bestEffort := false

switch p.opts.BufferVersion {
case BufferVersionV1:
blockRate, logLimitLow, maxResults, numOfUpkeeps := p.getBufferDequeueArgs()

iterationStep := 1

if p.iterations == p.currentIteration {
p.currentIteration = 0
p.iterations = int(math.Ceil(float64(numOfUpkeeps*logLimitLow) / float64(maxResults)))
if p.iterations == 0 {
p.iterations = 1
}
}

// min commitment pass
for len(payloads) < maxResults && start <= latestBlock {
upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
}

upkeepLimit := logLimitLow
if !bestEffort {
upkeepSelectorFn = DefaultUpkeepSelector
upkeepLimit = int(p.opts.LogLimit)
iterationStep = 0
}
startWindow, end := getBlockWindow(start, blockRate)

logs, remaining := p.bufferV1.Dequeue(start, blockRate, upkeepLimit, maxResults-len(payloads), upkeepSelectorFn, bestEffort)
logs, remaining := p.bufferV1.Dequeue(startWindow, end, int(p.opts.LogLimit), maxResults-len(payloads), DefaultUpkeepSelector, false)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs))
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
for _, l := range logs {
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
payloads = append(payloads, payload)
}
}
if remaining > 0 {
p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining)
// TODO: handle remaining logs in a better way than consuming the entire window, e.g. do not repeat more than x times
continue
}

start += int64(blockRate)
if start >= latestBlock && !bestEffort {
bestEffort = true
start = startBlock
}

if len(payloads) < maxResults {
if p.iterations == p.currentIteration {
p.currentIteration = 0
p.iterations = int(math.Ceil(float64(numOfUpkeeps*logLimitLow) / float64(maxResults)))
if p.iterations == 0 {
p.iterations = 1
}
}

start = startBlock

// best effort pass
for len(payloads) < maxResults && start <= latestBlock {
startWindow, end := getBlockWindow(start, blockRate)

upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
}

logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn, true)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
for _, l := range logs {
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
payloads = append(payloads, payload)
}
}

start += int64(blockRate)
}

p.currentIteration += 1

Check failure on line 348 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go

View workflow job for this annotation

GitHub Actions / lint

increment-decrement: should replace p.currentIteration += 1 with p.currentIteration++ (revive)
}
p.currentIteration += iterationStep
default:
logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads)
for _, l := range logs {
Expand Down
Loading

0 comments on commit 9aeb99c

Please sign in to comment.