Skip to content

Commit

Permalink
Refactor the dequeue loop into two functions
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Jun 25, 2024
1 parent 9aeb99c commit ea00fb7
Showing 1 changed file with 54 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,67 +294,78 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
case BufferVersionV1:
blockRate, logLimitLow, maxResults, numOfUpkeeps := p.getBufferDequeueArgs()

// min commitment pass
for len(payloads) < maxResults && start <= latestBlock {
startWindow, end := getBlockWindow(start, blockRate)
payloads = p.minimumCommitmentDequeue(latestBlock, maxResults, start, blockRate)

logs, remaining := p.bufferV1.Dequeue(startWindow, end, int(p.opts.LogLimit), maxResults-len(payloads), DefaultUpkeepSelector, false)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
for _, l := range logs {
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
payloads = append(payloads, payload)
}
if len(payloads) < maxResults {
payloads = p.bestEffortDequeue(latestBlock, numOfUpkeeps, logLimitLow, maxResults, startBlock, payloads, blockRate)
}
default:
logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads)
for _, l := range logs {
payload, err := p.createPayload(l.upkeepID, l.log)
if err == nil {
payloads = append(payloads, payload)
}
}
}

return payloads
}

start += int64(blockRate)
func (p *logEventProvider) bestEffortDequeue(latestBlock int64, numOfUpkeeps int, logLimitLow int, maxResults int, start int64, payloads []ocr2keepers.UpkeepPayload, blockRate int) []ocr2keepers.UpkeepPayload {
if p.iterations == p.currentIteration {
p.currentIteration = 0
p.iterations = int(math.Ceil(float64(numOfUpkeeps*logLimitLow) / float64(maxResults)))
if p.iterations == 0 {
p.iterations = 1
}
}

if len(payloads) < maxResults {
if p.iterations == p.currentIteration {
p.currentIteration = 0
p.iterations = int(math.Ceil(float64(numOfUpkeeps*logLimitLow) / float64(maxResults)))
if p.iterations == 0 {
p.iterations = 1
}
// best effort pass
for len(payloads) < maxResults && start <= latestBlock {
startWindow, end := getBlockWindow(start, blockRate)

upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
}

logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn, true)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
for _, l := range logs {
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
payloads = append(payloads, payload)
}
}

start = startBlock
start += int64(blockRate)
}

// best effort pass
for len(payloads) < maxResults && start <= latestBlock {
startWindow, end := getBlockWindow(start, blockRate)
p.currentIteration += 1

Check failure on line 346 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go

View workflow job for this annotation

GitHub Actions / lint

increment-decrement: should replace p.currentIteration += 1 with p.currentIteration++ (revive)

upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
}
return payloads
}

logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn, true)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
for _, l := range logs {
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
payloads = append(payloads, payload)
}
}
func (p *logEventProvider) minimumCommitmentDequeue(latestBlock int64, maxResults int, start int64, blockRate int) []ocr2keepers.UpkeepPayload {
var payloads []ocr2keepers.UpkeepPayload

start += int64(blockRate)
}
for len(payloads) < maxResults && start <= latestBlock {
startWindow, end := getBlockWindow(start, blockRate)

p.currentIteration += 1
logs, remaining := p.bufferV1.Dequeue(startWindow, end, int(p.opts.LogLimit), maxResults-len(payloads), DefaultUpkeepSelector, false)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
default:
logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads)
for _, l := range logs {
payload, err := p.createPayload(l.upkeepID, l.log)
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
payloads = append(payloads, payload)
}
}

start += int64(blockRate)
}

return payloads
Expand Down

0 comments on commit ea00fb7

Please sign in to comment.