diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index 048396e9051..e65306a1203 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -26,7 +26,7 @@ type LogBuffer interface { // It also accepts a function to select upkeeps. // Returns logs (associated to upkeeps) and the number of remaining // logs in that window for the involved upkeeps. - Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool, bestEffort bool) ([]BufferedLog, int) + Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool, bestEffort bool) ([]BufferedLog, int) // SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep. SetConfig(lookback, blockRate, logLimit uint32) // NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer. @@ -162,11 +162,10 @@ func (b *logBuffer) evictReorgdLogs(reorgBlocks map[int64]bool) { // Dequeue greedly pulls logs from the buffers. // Returns logs and the number of remaining logs in the buffer. -func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool, bestEffort bool) ([]BufferedLog, int) { +func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool, bestEffort bool) ([]BufferedLog, int) { b.lock.RLock() defer b.lock.RUnlock() - start, end := getBlockWindow(block, blockRate) return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector, bestEffort) } @@ -263,6 +262,7 @@ func (b *logBuffer) getUpkeepQueue(uid *big.Int) (*upkeepLogQueue, bool) { func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) { if _, ok := b.queues[uid.String()]; !ok { b.queueIDs = append(b.queueIDs, uid.String()) + sort.Slice(b.queueIDs, func(i, j int) bool { return b.queueIDs[i] < b.queueIDs[j] }) } b.queues[uid.String()] = buf } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index 137639e45f7..35dd46a4b77 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -97,8 +97,6 @@ func TestLogEventBufferV1_EnqueueViolations(t *testing.T) { logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0}, ) - assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"]) - assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"]) assert.True(t, true, logReceived) }) @@ -134,9 +132,6 @@ func TestLogEventBufferV1_EnqueueViolations(t *testing.T) { logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0}, ) - assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"]) - assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"]) - assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"]) assert.True(t, true, logReceived) }) } @@ -519,13 +514,13 @@ func TestLogEventBufferV1_BlockWindow(t *testing.T) { type dequeueArgs struct { block int64 - blockRate int + blockRate int64 upkeepLimit int maxResults int upkeepSelector func(id *big.Int) bool } -func newDequeueArgs(block int64, blockRate int, upkeepLimit int, maxResults int, upkeepSelector func(id *big.Int) bool) dequeueArgs { +func 
newDequeueArgs(block int64, blockRate int64, upkeepLimit int, maxResults int, upkeepSelector func(id *big.Int) bool) dequeueArgs { args := dequeueArgs{ block: block, blockRate: blockRate, @@ -561,107 +556,3 @@ func createDummyLogSequence(n, startIndex int, block int64, tx common.Hash) []lo } return logs } - -func Test_trackBlockNumbersForUpkeep(t *testing.T) { - buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1) - - logBuffer := buf.(*logBuffer) - - for _, tc := range []struct { - uid *big.Int - uniqueBlocks map[int64]bool - wantEnqueuedBlocks map[int64]map[string]int - }{ - { - uid: big.NewInt(1), - uniqueBlocks: map[int64]bool{ - 1: true, - 2: true, - 3: true, - }, - wantEnqueuedBlocks: map[int64]map[string]int{ - 1: { - "1": 1, - }, - 2: { - "1": 1, - }, - 3: { - "1": 1, - }, - }, - }, - { - uid: big.NewInt(2), - uniqueBlocks: map[int64]bool{ - 1: true, - 2: true, - 3: true, - }, - wantEnqueuedBlocks: map[int64]map[string]int{ - 1: { - "1": 1, - "2": 1, - }, - 2: { - "1": 1, - "2": 1, - }, - 3: { - "1": 1, - "2": 1, - }, - }, - }, - { - uid: big.NewInt(2), - uniqueBlocks: map[int64]bool{ - 3: true, - 4: true, - }, - wantEnqueuedBlocks: map[int64]map[string]int{ - 1: { - "1": 1, - "2": 1, - }, - 2: { - "1": 1, - "2": 1, - }, - 3: { - "1": 1, - "2": 2, - }, - 4: { - "2": 1, - }, - }, - }, - { - uniqueBlocks: map[int64]bool{ - 3: true, - 4: true, - }, - wantEnqueuedBlocks: map[int64]map[string]int{ - 1: { - "1": 1, - "2": 1, - }, - 2: { - "1": 1, - "2": 1, - }, - 3: { - "1": 1, - "2": 2, - }, - 4: { - "2": 1, - }, - }, - }, - } { - logBuffer.trackBlockNumbersForUpkeep(tc.uid, tc.uniqueBlocks) - assert.Equal(t, tc.wantEnqueuedBlocks, logBuffer.enqueuedBlocks) - } -} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 6dc63045678..c21c0ac75f7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -289,37 +289,18 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up } startBlock := start - bestEffort := false switch p.opts.BufferVersion { case BufferVersionV1: blockRate, logLimitLow, maxResults, numOfUpkeeps := p.getBufferDequeueArgs() - iterationStep := 1 - - if p.iterations == p.currentIteration { - p.currentIteration = 0 - p.iterations = int(math.Ceil(float64(numOfUpkeeps*logLimitLow) / float64(maxResults))) - if p.iterations == 0 { - p.iterations = 1 - } - } - + // min commitment pass for len(payloads) < maxResults && start <= latestBlock { - upkeepSelectorFn := func(id *big.Int) bool { - return id.Int64()%int64(p.iterations) == int64(p.currentIteration) - } - - upkeepLimit := logLimitLow - if !bestEffort { - upkeepSelectorFn = DefaultUpkeepSelector - upkeepLimit = int(p.opts.LogLimit) - iterationStep = 0 - } + startWindow, end := getBlockWindow(start, blockRate) - logs, remaining := p.bufferV1.Dequeue(start, blockRate, upkeepLimit, maxResults-len(payloads), upkeepSelectorFn, bestEffort) + logs, remaining := p.bufferV1.Dequeue(startWindow, end, int(p.opts.LogLimit), maxResults-len(payloads), DefaultUpkeepSelector, false) if len(logs) > 0 { - p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs)) + p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining) } for _, l := range logs { payload, err := 
p.createPayload(l.ID, l.Log) @@ -327,18 +308,45 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up payloads = append(payloads, payload) } } - if remaining > 0 { - p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining) - // TODO: handle remaining logs in a better way than consuming the entire window, e.g. do not repeat more than x times - continue - } + start += int64(blockRate) - if start >= latestBlock && !bestEffort { - bestEffort = true - start = startBlock + } + + if len(payloads) < maxResults { + if p.iterations == p.currentIteration { + p.currentIteration = 0 + p.iterations = int(math.Ceil(float64(numOfUpkeeps*logLimitLow) / float64(maxResults))) + if p.iterations == 0 { + p.iterations = 1 + } } + + start = startBlock + + // best effort pass + for len(payloads) < maxResults && start <= latestBlock { + startWindow, end := getBlockWindow(start, blockRate) + + upkeepSelectorFn := func(id *big.Int) bool { + return id.Int64()%int64(p.iterations) == int64(p.currentIteration) + } + + logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn, true) + if len(logs) > 0 { + p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining) + } + for _, l := range logs { + payload, err := p.createPayload(l.ID, l.Log) + if err == nil { + payloads = append(payloads, payload) + } + } + + start += int64(blockRate) + } + + p.currentIteration += 1 } - p.currentIteration += iterationStep default: logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads) for _, l := range logs { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go index a327e5ac2b4..34dc882fc6c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go @@ -316,7 +316,7 @@ func newEntry(p *logEventProvider, i int, args ...string) (LogTriggerConfig, upk return cfg, f } -func countLogs(logs map[int64][]logpoller.Log) int { +func countRemainingLogs(logs map[int64][]logpoller.Log) int { count := 0 for _, logList := range logs { count += len(logList) @@ -324,2179 +324,338 @@ func countLogs(logs map[int64][]logpoller.Log) int { return count } -func TestLogEventProvider_GetLatestPayloads(t *testing.T) { - t.Run("5 upkeeps, 100 logs per upkeep per block for 100 blocks", func(t *testing.T) { - upkeepIDs := []*big.Int{ - big.NewInt(1), - big.NewInt(2), - big.NewInt(3), - big.NewInt(4), - big.NewInt(5), - } +func remainingBlockWindowCounts(queues map[string]*upkeepLogQueue, blockRate int) map[int64]int { + blockWindowCounts := map[int64]int{} - filterStore := NewUpkeepFilterStore() - - logGenerator := func(start, end int64) []logpoller.Log { - var res []logpoller.Log - for i := start; i < end; i++ { - for j := 0; j < 100; j++ { - res = append(res, logpoller.Log{ - LogIndex: int64(j), - BlockHash: common.HexToHash(fmt.Sprintf("%d", i+1)), - BlockNumber: i + 1, - }) - } - } - return res + for _, q := range queues { + for blockNumber, logs := range q.logs { + start, _ := getBlockWindow(blockNumber, blockRate) + + blockWindowCounts[start] += len(logs) } + } + + return blockWindowCounts +} + +func TestLogEventProvider_GetLatestPayloads(t *testing.T) { + t.Run("dequeuing from an empty buffer returns 
0 logs", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" - // use a log poller that will create logs for the queried block range logPoller := &mockLogPoller{ LatestBlockFn: func(ctx context.Context) (int64, error) { return 100, nil }, - LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - return logGenerator(start, end), nil - }, } - // prepare the filter store with upkeeps - for _, upkeepID := range upkeepIDs { - filterStore.AddActiveUpkeeps( - upkeepFilter{ - addr: []byte(upkeepID.String()), - upkeepID: upkeepID, - topics: []common.Hash{ - common.HexToHash(upkeepID.String()), - }, - }, - ) - } - - opts := NewOptions(200, big.NewInt(1)) - opts.BufferVersion = "v1" - - provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) ctx := context.Background() - err := provider.ReadLogs(ctx, upkeepIDs...) + payloads, err := provider.GetLatestPayloads(ctx) assert.NoError(t, err) + assert.Equal(t, 0, len(payloads)) + }) - assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) + t.Run("a single log for a single upkeep gets dequeued", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 100, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() - bufV1 := provider.bufferV1.(*logBuffer) + buffer := provider.bufferV1 - // each upkeep should have 100 logs * 100 blocks = 10000 logs - assert.Equal(t, 10000, countLogs(bufV1.queues["1"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["2"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["3"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["4"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["5"].logs)) + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}) payloads, err := provider.GetLatestPayloads(ctx) assert.NoError(t, err) + assert.Equal(t, 1, len(payloads)) + }) - // we dequeue a maximum of 100 logs - assert.Equal(t, 100, len(payloads)) + t.Run("a log per upkeep for 4 upkeeps across 4 blocks (2 separate block windows) is dequeued, for a total of 4 payloads", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 100, nil + }, + } - // the dequeue is evenly distributed across the 5 upkeeps - assert.Equal(t, 9980, countLogs(bufV1.queues["1"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["2"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["3"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["4"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["5"].logs)) + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) - payloads, err = provider.GetLatestPayloads(ctx) - assert.NoError(t, err) + ctx := context.Background() - // we dequeue a maximum of 100 logs - assert.Equal(t, 100, len(payloads)) + buffer := provider.bufferV1 - // the dequeue is evenly distributed across the 5 upkeeps - assert.Equal(t, 9960, countLogs(bufV1.queues["1"].logs)) 
- assert.Equal(t, 9960, countLogs(bufV1.queues["2"].logs)) - assert.Equal(t, 9960, countLogs(bufV1.queues["3"].logs)) - assert.Equal(t, 9960, countLogs(bufV1.queues["4"].logs)) - assert.Equal(t, 9960, countLogs(bufV1.queues["5"].logs)) + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}) + buffer.Enqueue(big.NewInt(3), logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3"), LogIndex: 0}) + buffer.Enqueue(big.NewInt(4), logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x4"), LogIndex: 0}) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 4, len(payloads)) }) - t.Run("200 upkeeps", func(t *testing.T) { - var upkeepIDs []*big.Int + t.Run("100 logs are dequeued for a single upkeep, 1 log for every block window across 100 blocks followed by best effort", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" - for i := int64(1); i <= 200; i++ { - upkeepIDs = append(upkeepIDs, big.NewInt(i)) + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 101, nil + }, } - filterStore := NewUpkeepFilterStore() - - logGenerator := func(start, end int64) []logpoller.Log { - var res []logpoller.Log - for i := start; i < end; i++ { - for j := 0; j < 100; j++ { - res = append(res, logpoller.Log{ - LogIndex: int64(j), - BlockHash: common.HexToHash(fmt.Sprintf("%d", i+1)), - BlockNumber: i + 1, - }) - } - } - return res + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1.(*logBuffer) + + for i := 0; i < 100; i++ { + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x%d", i+1)), LogIndex: 0}) } - // use a log poller that will create logs for the queried block range + assert.Equal(t, 100, countRemainingLogs(buffer.queues["1"].logs)) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + assert.Equal(t, 0, countRemainingLogs(buffer.queues["1"].logs)) + }) + + t.Run("100 logs are dequeued for two upkeeps, 25 logs each as min commitment (50 logs total best effort), followed by best effort", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + logPoller := &mockLogPoller{ LatestBlockFn: func(ctx context.Context) (int64, error) { - return 100, nil - }, - LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - return logGenerator(start, end), nil + return 101, nil }, } - // prepare the filter store with upkeeps - for _, upkeepID := range upkeepIDs { - filterStore.AddActiveUpkeeps( - upkeepFilter{ - addr: []byte(upkeepID.String()), - upkeepID: upkeepID, - topics: []common.Hash{ - common.HexToHash(upkeepID.String()), - }, - }, - ) + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1.(*logBuffer) + + for i := 0; i < 100; i++ { + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x1%d", i+1)), LogIndex: 0}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: int64(i + 1), 
TxHash: common.HexToHash(fmt.Sprintf("0x2%d", i+1)), LogIndex: 0}) } - opts := NewOptions(200, big.NewInt(1)) - opts.BufferVersion = "v1" + assert.Equal(t, 100, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 100, countRemainingLogs(buffer.queues["2"].logs)) - provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) - ctx := context.Background() + assert.Equal(t, 49, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 51, countRemainingLogs(buffer.queues["2"].logs)) + + windowCount := remainingBlockWindowCounts(buffer.queues, 4) - err := provider.ReadLogs(ctx, upkeepIDs...) + assert.Equal(t, 0, windowCount[0]) + assert.Equal(t, 6, windowCount[48]) + assert.Equal(t, 6, windowCount[96]) + + // the second dequeue call will retrieve the remaining 100 logs and exhaust the queues + payloads, err = provider.GetLatestPayloads(ctx) assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) - assert.Equal(t, 200, provider.bufferV1.NumOfUpkeeps()) + assert.Equal(t, 0, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 0, countRemainingLogs(buffer.queues["2"].logs)) - bufV1 := provider.bufferV1.(*logBuffer) + windowCount = remainingBlockWindowCounts(buffer.queues, 4) - blockWindowCounts := map[int64]int{} + assert.Equal(t, 0, windowCount[0]) + assert.Equal(t, 0, windowCount[48]) + assert.Equal(t, 0, windowCount[96]) + }) - for _, q := range bufV1.queues { - for blockNumber, logs := range q.logs { - blockWindowCounts[blockNumber] += len(logs) - } + t.Run("minimum guaranteed for all windows including an incomplete window followed by best effort", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 102, nil + }, } - assert.Equal(t, 20000, blockWindowCounts[1]) - assert.Equal(t, 20000, blockWindowCounts[2]) - assert.Equal(t, 20000, blockWindowCounts[3]) - assert.Equal(t, 20000, blockWindowCounts[100]) + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) - // each upkeep should have 100 logs * 100 blocks = 10000 logs - assert.Equal(t, 10000, countLogs(bufV1.queues["1"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["50"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["101"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["150"].logs)) + ctx := context.Background() - payloads, err := provider.GetLatestPayloads(ctx) - assert.NoError(t, err) + buffer := provider.bufferV1.(*logBuffer) + + for i := 0; i < 102; i++ { + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x1%d", i+1)), LogIndex: 0}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x2%d", i+1)), LogIndex: 0}) + } - assert.Equal(t, 2, provider.iterations) - assert.Equal(t, 1, provider.currentIteration) + assert.Equal(t, 102, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 102, countRemainingLogs(buffer.queues["2"].logs)) - // we dequeue a maximum of 100 logs + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) assert.Equal(t, 100, len(payloads)) - blockWindowCounts = map[int64]int{} + windowCount := remainingBlockWindowCounts(buffer.queues, 4) - for _, q := range 
bufV1.queues { - for blockNumber, logs := range q.logs { - blockWindowCounts[blockNumber] += len(logs) - } - } + assert.Equal(t, 4, windowCount[100]) - assert.Equal(t, 19900, blockWindowCounts[1]) - assert.Equal(t, 20000, blockWindowCounts[2]) - assert.Equal(t, 20000, blockWindowCounts[3]) - assert.Equal(t, 20000, blockWindowCounts[100]) - - // the dequeue impacts the first 5 upkeeps - assert.Equal(t, 9980, countLogs(bufV1.queues["1"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["2"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["3"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["4"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["5"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["6"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["50"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["100"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["150"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["200"].logs)) + // upkeep 1 has had the minimum number of logs dequeued on the latest (incomplete) window + assert.Equal(t, 1, buffer.queues["1"].dequeued[100]) + // upkeep 2 has had the minimum number of logs dequeued on the latest (incomplete) window + assert.Equal(t, 1, buffer.queues["2"].dequeued[100]) + // the second dequeue call will retrieve the remaining 100 logs and exhaust the queues payloads, err = provider.GetLatestPayloads(ctx) assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) - assert.Equal(t, 2, provider.currentIteration) + assert.Equal(t, 2, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 2, countRemainingLogs(buffer.queues["2"].logs)) - // we dequeue a maximum of 100 logs - assert.Equal(t, 100, len(payloads)) + windowCount = remainingBlockWindowCounts(buffer.queues, 4) - // the dequeue impacts the next 5 upkeeps - assert.Equal(t, 9980, countLogs(bufV1.queues["1"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["2"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["3"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["4"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["5"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["6"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["7"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["8"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["9"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["10"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["50"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["100"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["150"].logs)) - assert.Equal(t, 10000, countLogs(bufV1.queues["200"].logs)) - - blockWindowCounts = map[int64]int{} - - for _, q := range bufV1.queues { - for blockNumber, logs := range q.logs { - blockWindowCounts[blockNumber] += len(logs) - } + assert.Equal(t, 0, windowCount[0]) + assert.Equal(t, 0, windowCount[28]) + assert.Equal(t, 0, windowCount[32]) + assert.Equal(t, 0, windowCount[36]) + assert.Equal(t, 0, windowCount[48]) + assert.Equal(t, 0, windowCount[96]) + assert.Equal(t, 4, windowCount[100]) + }) + + t.Run("min dequeue followed by best effort followed by reorg followed by best effort", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 101, nil + }, } - assert.Equal(t, 19800, blockWindowCounts[1]) - assert.Equal(t, 20000, blockWindowCounts[2]) - assert.Equal(t, 20000, 
blockWindowCounts[3]) - assert.Equal(t, 20000, blockWindowCounts[100]) + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) - payloads, err = provider.GetLatestPayloads(ctx) - assert.NoError(t, err) + ctx := context.Background() - assert.Equal(t, 1, provider.currentIteration) + buffer := provider.bufferV1.(*logBuffer) - // we dequeue a maximum of 100 logs + for i := 0; i < 100; i++ { + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x1%d", i+1)), LogIndex: 0}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x2%d", i+1)), LogIndex: 0}) + } + + assert.Equal(t, 100, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 100, countRemainingLogs(buffer.queues["2"].logs)) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) assert.Equal(t, 100, len(payloads)) - // the dequeue impacts the first 100 upkeeps - assert.Equal(t, 9980, countLogs(bufV1.queues["11"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["12"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["13"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["14"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["15"].logs)) + windowCount := remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 0, windowCount[28]) + // reorg block 28 + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(28), TxHash: common.HexToHash(fmt.Sprintf("0xreorg1%d", 28)), LogIndex: 0, BlockHash: common.BytesToHash([]byte("reorg"))}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: int64(28), TxHash: common.HexToHash(fmt.Sprintf("0xreorg2%d", 28)), LogIndex: 0, BlockHash: common.BytesToHash([]byte("reorg"))}) + + windowCount = remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 2, windowCount[28]) + + // the second dequeue call will retrieve the remaining 100 logs and exhaust the queues payloads, err = provider.GetLatestPayloads(ctx) assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) - assert.Equal(t, 2, provider.currentIteration) + windowCount = remainingBlockWindowCounts(buffer.queues, 4) - // we dequeue a maximum of 100 logs - assert.Equal(t, 100, len(payloads)) + assert.Equal(t, 0, windowCount[0]) + assert.Equal(t, 0, windowCount[28]) + assert.Equal(t, 0, windowCount[32]) + assert.Equal(t, 0, windowCount[36]) + assert.Equal(t, 0, windowCount[48]) + assert.Equal(t, 2, windowCount[96]) // these 2 remaining logs are because of the 2 re orgd logs taking up dequeue space + }) - // the dequeue impacts the next 5 upkeeps - assert.Equal(t, 9980, countLogs(bufV1.queues["16"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["17"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["18"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["19"].logs)) - assert.Equal(t, 9980, countLogs(bufV1.queues["20"].logs)) + t.Run("sparsely populated blocks", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 100, nil + }, + } - blockWindowCounts = map[int64]int{} + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) - for _, q := range bufV1.queues { - for blockNumber, logs := range q.logs { - blockWindowCounts[blockNumber] += len(logs) + ctx := context.Background() + + buffer := 
provider.bufferV1.(*logBuffer) + + upkeepOmittedOnBlocks := map[int64][]int{ + 1: {5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100}, // upkeep 1 won't have logs on 20 blocks + 2: {2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100}, // upkeep 2 won't have logs on 50 blocks + 3: {3, 13, 23, 33, 43, 53, 63, 73, 83, 93}, // upkeep 3 won't appear on 10 blocks + 4: {1, 25, 50, 75, 100}, // upkeep 4 won't appear on 5 blocks + 5: {}, // upkeep 5 appears on all blocks + } + + for upkeep, skipBlocks := range upkeepOmittedOnBlocks { + blockLoop: + for i := 0; i < 100; i++ { + for _, block := range skipBlocks { + if block == i+1 { + continue blockLoop + } + } + buffer.Enqueue(big.NewInt(upkeep), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x1%d", i+1)), LogIndex: 0}) } } - assert.Equal(t, 19600, blockWindowCounts[1]) - assert.Equal(t, 20000, blockWindowCounts[2]) - assert.Equal(t, 20000, blockWindowCounts[3]) - assert.Equal(t, 20000, blockWindowCounts[100]) - }) + assert.Equal(t, 80, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 50, countRemainingLogs(buffer.queues["2"].logs)) + assert.Equal(t, 90, countRemainingLogs(buffer.queues["3"].logs)) + assert.Equal(t, 95, countRemainingLogs(buffer.queues["4"].logs)) + assert.Equal(t, 100, countRemainingLogs(buffer.queues["5"].logs)) - //t.Run("200 upkeeps, increasing to 300 upkeeps midway through the test", func(t *testing.T) { - // var upkeepIDs []*big.Int - // - // for i := int64(1); i <= 200; i++ { - // upkeepIDs = append(upkeepIDs, big.NewInt(i)) - // } - // - // filterStore := NewUpkeepFilterStore() - // - // logGenerator := func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := start; i < end; i++ { - // for j := 0; j < 100; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i+1)), - // BlockNumber: i + 1, - // }) - // } - // } - // return res - // } - // - // // use a log poller that will create logs for the queried block range - // logPoller := &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 100, nil - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // // prepare the filter store with upkeeps - // for _, upkeepID := range upkeepIDs { - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - // opts := NewOptions(200, big.NewInt(1)) - // opts.BufferVersion = "v1" - // - // provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) - // - // ctx := context.Background() - // - // err := provider.ReadLogs(ctx, upkeepIDs...) 
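// Illustrative sketch (not part of this diff) of the reworked getLogsFromBuffer flow in provider.go
// above: a first pass that dequeues the guaranteed per-upkeep minimum from every block window,
// followed by a best-effort pass that only selects upkeeps belonging to the current iteration.
// The dequeueFn type and parameter names are stand-ins for the provider/buffer wiring;
// getBlockWindow and DefaultUpkeepSelector are the identifiers referenced in the diff; the
// iterations/currentIteration bookkeeping is recomputed and advanced by the provider between
// calls and is omitted here. Assumes "math/big" is imported.
type dequeueFn func(start, end int64, upkeepLimit, maxResults int, selector func(id *big.Int) bool, bestEffort bool) (picked, remaining int)

func twoPassDequeue(dequeue dequeueFn, startBlock, latestBlock int64, blockRate, logLimit, logLimitLow, maxResults, iterations, currentIteration int) int {
	collected := 0

	// pass 1: minimum commitment — every upkeep is selected, up to logLimit logs per upkeep per window
	for start := startBlock; collected < maxResults && start <= latestBlock; start += int64(blockRate) {
		ws, we := getBlockWindow(start, blockRate)
		picked, _ := dequeue(ws, we, logLimit, maxResults-collected, DefaultUpkeepSelector, false)
		collected += picked
	}

	// pass 2: best effort — only upkeeps whose ID falls in the current iteration are selected,
	// so repeated calls round-robin through all upkeeps
	for start := startBlock; collected < maxResults && start <= latestBlock; start += int64(blockRate) {
		ws, we := getBlockWindow(start, blockRate)
		selector := func(id *big.Int) bool { return id.Int64()%int64(iterations) == int64(currentIteration) }
		picked, _ := dequeue(ws, we, logLimitLow, maxResults-collected, selector, true)
		collected += picked
	}

	return collected
}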
- // assert.NoError(t, err) - // - // assert.Equal(t, 200, provider.bufferV1.NumOfUpkeeps()) - // - // bufV1 := provider.bufferV1.(*logBuffer) - // - // // each upkeep should have 100 logs * 100 blocks = 10000 logs - // assert.Equal(t, 10000, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 10000, countLogs(bufV1.queues["9"].logs)) - // assert.Equal(t, 10000, countLogs(bufV1.queues["21"].logs)) - // assert.Equal(t, 10000, countLogs(bufV1.queues["50"].logs)) - // assert.Equal(t, 10000, countLogs(bufV1.queues["101"].logs)) - // assert.Equal(t, 10000, countLogs(bufV1.queues["150"].logs)) - // - // payloads, err := provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 2, provider.iterations) - // assert.Equal(t, 1, provider.currentIteration) - // - // // we dequeue a maximum of 100 logs - // assert.Equal(t, 100, len(payloads)) - // - // // the dequeue is evenly distributed across selected upkeeps; with 2 iterations this means even upkeep IDs are dequeued first - // assert.Equal(t, 10000, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["40"].logs)) - // assert.Equal(t, 10000, countLogs(bufV1.queues["45"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["50"].logs)) - // assert.Equal(t, 10000, countLogs(bufV1.queues["101"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["150"].logs)) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 2, provider.currentIteration) - // - // // we dequeue a maximum of 100 logs - // assert.Equal(t, 100, len(payloads)) - // - // // the dequeue is evenly distributed across selected upkeeps; on the second iteration, odd upkeep IDs are dequeued - // assert.Equal(t, 9999, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["50"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["99"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["100"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["101"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["150"].logs)) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 1, provider.currentIteration) - // - // // we dequeue a maximum of 100 logs - // assert.Equal(t, 100, len(payloads)) - // - // // the dequeue is evenly distributed across selected upkeeps; on the third iteration, even upkeep IDs are dequeued once again - // assert.Equal(t, 9999, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["50"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["101"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["150"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["160"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["170"].logs)) - // - // for i := int64(201); i <= 300; i++ { - // upkeepIDs = append(upkeepIDs, big.NewInt(i)) - // } - // - // for i := 200; i < len(upkeepIDs); i++ { - // upkeepID := upkeepIDs[i] - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - // err = provider.ReadLogs(ctx, upkeepIDs...) 
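// Illustrative sketch (not part of this diff): the setUpkeepQueue change in buffer_v1.go keeps
// queueIDs sorted whenever a new upkeep is added, so dequeueing walks upkeeps in a stable order.
// An equivalent insertion that avoids re-sorting the whole slice could look like the following
// (this is lexicographic string order, which is deterministic even though it is not numeric order
// of the upkeep IDs; assumes "sort" is imported):
func insertQueueIDSorted(ids []string, id string) []string {
	i := sort.SearchStrings(ids, id) // ids are kept sorted ascending
	if i < len(ids) && ids[i] == id {
		return ids // already tracked
	}
	ids = append(ids, "")
	copy(ids[i+1:], ids[i:]) // shift the tail right by one
	ids[i] = id
	return ids
}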
- // assert.NoError(t, err) - // - // assert.Equal(t, 300, provider.bufferV1.NumOfUpkeeps()) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 2, provider.iterations) - // assert.Equal(t, 2, provider.currentIteration) - // - // // we dequeue a maximum of 100 logs - // assert.Equal(t, 100, len(payloads)) - // - // // the dequeue is evenly distributed across selected upkeeps; the new iterations - // // have not yet been recalculated despite the new logs being added; new iterations - // // are only calculated when current iteration maxes out at the total number of iterations - // assert.Equal(t, 9998, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["50"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["51"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["52"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["101"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["150"].logs)) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // with the newly added logs, iterations is recalculated - // assert.Equal(t, 3, provider.iterations) - // assert.Equal(t, 1, provider.currentIteration) - // - // // we dequeue a maximum of 100 logs - // assert.Equal(t, 100, len(payloads)) - // - // // the dequeue is evenly distributed across selected upkeeps - // assert.Equal(t, 9998, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["11"].logs)) - // assert.Equal(t, 9997, countLogs(bufV1.queues["111"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["50"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["101"].logs)) - // assert.Equal(t, 9997, countLogs(bufV1.queues["150"].logs)) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 3, provider.iterations) - // assert.Equal(t, 2, provider.currentIteration) - // - // // we dequeue a maximum of 100 logs - // assert.Equal(t, 100, len(payloads)) - // - // // the dequeue is evenly distributed across selected upkeeps - // assert.Equal(t, 9997, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 9997, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["50"].logs)) - // assert.Equal(t, 9998, countLogs(bufV1.queues["101"].logs)) - // assert.Equal(t, 9997, countLogs(bufV1.queues["150"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["250"].logs)) - // assert.Equal(t, 10000, countLogs(bufV1.queues["299"].logs)) - // assert.Equal(t, 9999, countLogs(bufV1.queues["300"].logs)) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 3, provider.iterations) - // assert.Equal(t, 3, provider.currentIteration) - // - // // we dequeue a maximum of 100 logs - // assert.Equal(t, 100, len(payloads)) - // - // var remainingLogs int - // // at this point, every queue should have had at least one log dequeued - // for _, queue := range bufV1.queues { - // assert.True(t, countLogs(queue.logs) < 10000) - // remainingLogs += countLogs(queue.logs) - // } - // - // // check that across all 300 upkeeps, we have only dequeued 700 of the 3000000 logs (7 dequeue calls of 100 logs) - // assert.Equal(t, 2999300, remainingLogs) - //}) - // - //t.Run("minimum guaranteed for all windows followed by best effort", func(t *testing.T) { - // oldMaxPayloads := 
MaxPayloads - // MaxPayloads = 10 - // defer func() { - // MaxPayloads = oldMaxPayloads - // }() - // - // upkeepIDs := []*big.Int{ - // big.NewInt(1), - // big.NewInt(2), - // big.NewInt(3), - // big.NewInt(4), - // big.NewInt(5), - // } - // - // filterStore := NewUpkeepFilterStore() - // - // logGenerator := func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := start; i < end; i++ { - // for j := 0; j < 10; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i+1)), - // BlockNumber: i + 1, - // }) - // } - // } - // return res - // } - // - // // use a log poller that will create logs for the queried block range - // logPoller := &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 100, nil - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // // prepare the filter store with upkeeps - // for _, upkeepID := range upkeepIDs { - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - // opts := NewOptions(200, big.NewInt(1)) - // opts.BufferVersion = "v1" - // - // provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) - // - // ctx := context.Background() - // - // err := provider.ReadLogs(ctx, upkeepIDs...) - // assert.NoError(t, err) - // - // assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) - // - // bufV1 := provider.bufferV1.(*logBuffer) - // - // // each upkeep should have 10 logs * 100 blocks = 1000 logs - // assert.Equal(t, 1000, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 1000, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 1000, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 1000, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 1000, countLogs(bufV1.queues["5"].logs)) - // - // payloads, err := provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 998, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 998, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 998, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 998, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 998, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts := map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // // all 10 logs should have been dequeued from the first block window - // assert.Equal(t, 40, blockWindowCounts[1]) - // assert.Equal(t, 50, blockWindowCounts[2]) - // assert.Equal(t, 50, blockWindowCounts[3]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 996, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 996, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 996, countLogs(bufV1.queues["3"].logs)) - // 
assert.Equal(t, 996, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 996, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment - // assert.Equal(t, 40, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[2]) - // assert.Equal(t, 50, blockWindowCounts[3]) - // - // for i := 0; i < 97; i++ { - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // } - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 802, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 802, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 802, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 802, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 802, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment - // assert.Equal(t, 40, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[2]) - // assert.Equal(t, 40, blockWindowCounts[3]) - // assert.Equal(t, 40, blockWindowCounts[99]) - // assert.Equal(t, 50, blockWindowCounts[100]) - // - // // at this point, all block windows except for the latest block window will have been dequeued - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 800, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 800, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 800, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 800, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 800, countLogs(bufV1.queues["5"].logs)) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 798, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 798, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 798, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 798, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 798, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment - // assert.Equal(t, 30, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[2]) - // assert.Equal(t, 40, blockWindowCounts[3]) - // assert.Equal(t, 40, blockWindowCounts[100]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) 
- // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 796, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 796, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 796, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 796, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 796, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment - // assert.Equal(t, 20, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[2]) - // assert.Equal(t, 40, blockWindowCounts[3]) - // assert.Equal(t, 40, blockWindowCounts[100]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 794, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 794, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 794, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 794, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 794, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment - // assert.Equal(t, 10, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[2]) - // assert.Equal(t, 40, blockWindowCounts[3]) - // assert.Equal(t, 40, blockWindowCounts[100]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 792, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 792, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 792, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 792, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 792, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment - // assert.Equal(t, 0, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[2]) - // assert.Equal(t, 40, blockWindowCounts[3]) - // assert.Equal(t, 40, blockWindowCounts[100]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 790, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 790, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 790, countLogs(bufV1.queues["3"].logs)) - // 
assert.Equal(t, 790, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 790, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment - // assert.Equal(t, 0, blockWindowCounts[1]) - // assert.Equal(t, 30, blockWindowCounts[2]) - // assert.Equal(t, 40, blockWindowCounts[3]) - // assert.Equal(t, 40, blockWindowCounts[100]) - //}) - // - //t.Run("minimum guaranteed for all windows including an incomplete window followed by best effort", func(t *testing.T) { - // oldMaxPayloads := MaxPayloads - // MaxPayloads = 10 - // defer func() { - // MaxPayloads = oldMaxPayloads - // }() - // - // upkeepIDs := []*big.Int{ - // big.NewInt(1), - // big.NewInt(2), - // big.NewInt(3), - // big.NewInt(4), - // big.NewInt(5), - // } - // - // filterStore := NewUpkeepFilterStore() - // - // logGenerator := func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := start; i <= end; i++ { - // logsToAdd := 10 - // if i >= 100 { - // logsToAdd = 1 - // } - // for j := 0; j < logsToAdd; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), - // BlockNumber: i, - // }) - // } - // } - // return res - // } - // - // // use a log poller that will create logs for the queried block range - // logPoller := &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 102, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // // prepare the filter store with upkeeps - // for _, upkeepID := range upkeepIDs { - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - // opts := NewOptions(200, big.NewInt(1)) - // opts.BufferVersion = "v1" - // opts.BlockRate = 4 // block window will be 4 blocks big - // - // provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) - // - // ctx := context.Background() - // - // err := provider.ReadLogs(ctx, upkeepIDs...) 
- // assert.NoError(t, err) - // - // assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) - // - // blockWindowCounts := map[int64]int{} - // - // bufV1 := provider.bufferV1.(*logBuffer) - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the first block window - // assert.Equal(t, 150, blockWindowCounts[0]) // block 0 is outside the block threshold of 1 and is not enqueued - // assert.Equal(t, 200, blockWindowCounts[4]) - // assert.Equal(t, 200, blockWindowCounts[8]) - // assert.Equal(t, 15, blockWindowCounts[100]) // the block window starting at block 100 is only 3/4 complete as of block 102 - // - // // each upkeep should have 10 logs * 102 blocks = 1020 logs - // assert.Equal(t, 993, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 993, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 993, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 993, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 993, countLogs(bufV1.queues["5"].logs)) - // - // payloads, err := provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 991, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 991, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 991, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 991, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 991, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the first block window - // assert.Equal(t, 140, blockWindowCounts[0]) - // assert.Equal(t, 200, blockWindowCounts[4]) - // assert.Equal(t, 200, blockWindowCounts[8]) - // assert.Equal(t, 15, blockWindowCounts[100]) // the block window starting at block 100 is only 3/4 complete as of block 102 - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 989, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 989, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 989, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 989, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 989, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment - // assert.Equal(t, 140, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - // assert.Equal(t, 200, blockWindowCounts[8]) - // - // for i := 0; i < 23; i++ { - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - 
// // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // } - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 943, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 943, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 943, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 943, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 943, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 140, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - // assert.Equal(t, 190, blockWindowCounts[8]) - // assert.Equal(t, 190, blockWindowCounts[96]) - // assert.Equal(t, 15, blockWindowCounts[100]) // still not been dequeued at this point - // - // // at this point, all block windows except for the latest block window will have been dequeued - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 941, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 941, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 941, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 941, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 941, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 140, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - // assert.Equal(t, 190, blockWindowCounts[8]) - // assert.Equal(t, 190, blockWindowCounts[96]) - // assert.Equal(t, 5, blockWindowCounts[100]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 939, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 939, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 939, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 939, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 939, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 130, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - // assert.Equal(t, 190, blockWindowCounts[8]) - // assert.Equal(t, 190, blockWindowCounts[96]) - // assert.Equal(t, 5, blockWindowCounts[100]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 937, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 937, countLogs(bufV1.queues["2"].logs)) 
- // assert.Equal(t, 937, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 937, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 937, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 120, blockWindowCounts[0]) // first block window is repeatedly dequeued as best effort - // assert.Equal(t, 190, blockWindowCounts[4]) - // assert.Equal(t, 190, blockWindowCounts[8]) - // assert.Equal(t, 190, blockWindowCounts[96]) - // assert.Equal(t, 5, blockWindowCounts[100]) - // - // provider.poller = &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 103, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 935, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 935, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 935, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 935, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 935, countLogs(bufV1.queues["5"].logs)) - // - // for i := 0; i < 467; i++ { - // _, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // } - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 0, blockWindowCounts[0]) // first block window is repeatedly dequeued as best effort - // assert.Equal(t, 0, blockWindowCounts[4]) - // assert.Equal(t, 0, blockWindowCounts[8]) - // assert.Equal(t, 0, blockWindowCounts[96]) - // assert.Equal(t, 5, blockWindowCounts[100]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 5, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 0, blockWindowCounts[0]) - // assert.Equal(t, 0, blockWindowCounts[4]) - // assert.Equal(t, 0, blockWindowCounts[8]) - // assert.Equal(t, 0, blockWindowCounts[96]) - // assert.Equal(t, 0, blockWindowCounts[100]) - //}) - // - //t.Run("a complete window with no logs present is immediately marked as having the min logs dequeued, logs are dequeued from the next window", func(t *testing.T) { - // oldMaxPayloads := MaxPayloads - // MaxPayloads = 10 - // defer func() { - // MaxPayloads = oldMaxPayloads - // }() - // - // upkeepIDs := []*big.Int{ - // big.NewInt(1), - // big.NewInt(2), - // big.NewInt(3), - // big.NewInt(4), - // big.NewInt(5), - // } - // - // filterStore := NewUpkeepFilterStore() - // - // logGenerator := func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := 
start + 4; i <= end; i++ { - // logsToAdd := 10 - // if i >= 100 { - // logsToAdd = 1 - // } - // for j := 0; j < logsToAdd; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), - // BlockNumber: i, - // }) - // } - // } - // return res - // } - // - // // use a log poller that will create logs for the queried block range - // logPoller := &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 99, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // // prepare the filter store with upkeeps - // for _, upkeepID := range upkeepIDs { - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - // opts := NewOptions(200, big.NewInt(1)) - // opts.BufferVersion = "v1" - // opts.BlockRate = 4 // block window will be 4 blocks big - // - // provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) - // - // ctx := context.Background() - // - // err := provider.ReadLogs(ctx, upkeepIDs...) - // assert.NoError(t, err) - // - // assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) - // - // bufV1 := provider.bufferV1.(*logBuffer) - // - // blockWindowCounts := map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the first block window - // assert.Equal(t, 0, blockWindowCounts[0]) - // assert.Equal(t, 200, blockWindowCounts[4]) - // assert.Equal(t, 200, blockWindowCounts[8]) - // assert.Equal(t, 200, blockWindowCounts[96]) - // - // payloads, err := provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 10, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the second block window - // assert.Equal(t, 0, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - // assert.Equal(t, 200, blockWindowCounts[8]) - // assert.Equal(t, 200, blockWindowCounts[96]) - //}) - // - //t.Run("an incomplete window with no logs present is marked as not ready then min dequeued when the window is complete", func(t *testing.T) { - // oldMaxPayloads := MaxPayloads - // MaxPayloads = 10 - // defer func() { - // MaxPayloads = oldMaxPayloads - // }() - // - // upkeepIDs := []*big.Int{ - // big.NewInt(1), - // big.NewInt(2), - // big.NewInt(3), - // big.NewInt(4), - // big.NewInt(5), - // } - // - // filterStore := NewUpkeepFilterStore() - // - // logGenerator := func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := start + 4; i <= end; i++ { - // logsToAdd := 10 - // if i >= 100 { - // logsToAdd = 1 - // } - // for j := 0; j < logsToAdd; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: 
common.HexToHash(fmt.Sprintf("%d", i)), - // BlockNumber: i, - // }) - // } - // } - // return res - // } - // - // // use a log poller that will create logs for the queried block range - // logPoller := &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 2, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // // prepare the filter store with upkeeps - // for _, upkeepID := range upkeepIDs { - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - // opts := NewOptions(200, big.NewInt(1)) - // opts.BufferVersion = "v1" - // opts.BlockRate = 4 // block window will be 4 blocks big - // - // provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) - // - // ctx := context.Background() - // - // err := provider.ReadLogs(ctx, upkeepIDs...) - // assert.NoError(t, err) - // - // assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) - // - // bufV1 := provider.bufferV1.(*logBuffer) - // - // blockWindowCounts := map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 0, blockWindowCounts[0]) - // - // payloads, err := provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 0, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the second block window - // assert.Equal(t, 0, blockWindowCounts[0]) - // - // provider.poller = &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 3, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 0, len(payloads)) - // - // //assert.Equal(t, true, dequeueCoordinator.dequeuedMinimum[0]) // now that the window is complete, it should be marked as dequeued minimum - // - // provider.poller = &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 7, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // err = provider.ReadLogs(ctx, upkeepIDs...) 
- // assert.NoError(t, err) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 10, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 0, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 10, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 0, blockWindowCounts[0]) - // assert.Equal(t, 180, blockWindowCounts[4]) - // - // provider.poller = &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 11, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // err = provider.ReadLogs(ctx, upkeepIDs...) - // assert.NoError(t, err) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 10, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 0, blockWindowCounts[0]) - // assert.Equal(t, 180, blockWindowCounts[4]) - // assert.Equal(t, 190, blockWindowCounts[8]) - //}) - // - //t.Run("an incomplete window with minimum logs already present is marked as min dequeued", func(t *testing.T) { - // oldMaxPayloads := MaxPayloads - // MaxPayloads = 10 - // defer func() { - // MaxPayloads = oldMaxPayloads - // }() - // - // upkeepIDs := []*big.Int{ - // big.NewInt(1), - // big.NewInt(2), - // big.NewInt(3), - // big.NewInt(4), - // big.NewInt(5), - // } - // - // filterStore := NewUpkeepFilterStore() - // - // logGenerator := func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := start; i <= end; i++ { - // logsToAdd := 10 - // for j := 0; j < logsToAdd; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), - // BlockNumber: i, - // }) - // } - // } - // return res - // } - // - // // use a log poller that will create logs for the queried block range - // logPoller := &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 2, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // // prepare the filter store with upkeeps - // for _, upkeepID := range upkeepIDs { - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - 
// opts := NewOptions(200, big.NewInt(1)) - // opts.BufferVersion = "v1" - // opts.BlockRate = 4 // block window will be 4 blocks big - // - // provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) - // - // ctx := context.Background() - // - // err := provider.ReadLogs(ctx, upkeepIDs...) - // assert.NoError(t, err) - // - // assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) - // - // bufV1 := provider.bufferV1.(*logBuffer) - // - // blockWindowCounts := map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 100, blockWindowCounts[0]) // 100 logs because blocks 0, 1, 2 exist, 0 is omitted in enqueue, so blocks 1 and 2 have 10x5 logs each - // - // payloads, err := provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 10, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // // all 10 logs should have been dequeued from the first block window - // assert.Equal(t, 90, blockWindowCounts[0]) - // - // logGenerator = func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := start + 4; i <= end; i++ { - // logsToAdd := 10 - // for j := 0; j < logsToAdd; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), - // BlockNumber: i, - // }) - // } - // } - // return res - // } - // - // provider.poller = &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 7, nil // make the latest window incomplete - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // err = provider.ReadLogs(ctx, upkeepIDs...) 
- // assert.NoError(t, err) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 10, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 90, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 10, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 80, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // assert.Equal(t, 10, len(payloads)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // startWindow, _ := getBlockWindow(blockNumber, 4) - // - // blockWindowCounts[startWindow] += len(logs) - // } - // } - // - // assert.Equal(t, 70, blockWindowCounts[0]) - // assert.Equal(t, 190, blockWindowCounts[4]) - //}) - // - //t.Run("min dequeue followed by best effort followed by reorg followed by best effort", func(t *testing.T) { - // oldMaxPayloads := MaxPayloads - // MaxPayloads = 10 - // defer func() { - // MaxPayloads = oldMaxPayloads - // }() - // - // upkeepIDs := []*big.Int{ - // big.NewInt(1), - // big.NewInt(2), - // big.NewInt(3), - // big.NewInt(4), - // big.NewInt(5), - // } - // - // filterStore := NewUpkeepFilterStore() - // - // logGenerator := func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := start; i < end; i++ { - // for j := 0; j < 10; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i+1)), - // BlockNumber: i + 1, - // }) - // } - // } - // return res - // } - // - // // use a log poller that will create logs for the queried block range - // logPoller := &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 100, nil - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // // prepare the filter store with upkeeps - // for _, upkeepID := range upkeepIDs { - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - // opts := NewOptions(200, big.NewInt(1)) - // opts.BufferVersion = "v1" - // - // provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) - // - // ctx := context.Background() - // - // err := provider.ReadLogs(ctx, upkeepIDs...) 
- // assert.NoError(t, err) - // - // assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) - // - // bufV1 := provider.bufferV1.(*logBuffer) - // - // // each upkeep should have 10 logs * 100 blocks = 1000 logs - // assert.Equal(t, 1000, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 1000, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 1000, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 1000, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 1000, countLogs(bufV1.queues["5"].logs)) - // - // for i := 0; i < 100; i++ { - // _, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // } - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 800, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 800, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 800, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 800, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 800, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts := map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // // min dequeue should have happened for all block windows - // assert.Equal(t, 40, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[100]) - // - // payloads, err := provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 798, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 798, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 798, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 798, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 798, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // // best effort dequeues first block window - // assert.Equal(t, 30, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[2]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 796, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 796, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 796, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 796, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 796, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // // best effort dequeues first block window - // assert.Equal(t, 20, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[2]) - // - // // reorg happens - // logGenerator = func(start, end int64) []logpoller.Log { - // var res []logpoller.Log - // for i := start; i < end; i++ { - // if i == 97 { - // for j := 0; j < 10; j++ { - // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%de", i+1)), - // BlockNumber: i + 1, - // }) - // } - // } else { - // for j := 0; j < 10; j++ { 
- // res = append(res, logpoller.Log{ - // LogIndex: int64(j), - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i+1)), - // BlockNumber: i + 1, - // }) - // } - // } - // } - // return res - // } - // // use a log poller that will create logs for the queried block range - // provider.poller = &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 102, nil - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // assert.Equal(t, 20, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[98]) - // - // err = provider.ReadLogs(ctx, upkeepIDs...) - // assert.NoError(t, err) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // assert.Equal(t, 20, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[97]) - // assert.Equal(t, 50, blockWindowCounts[98]) // reorg block window has had new logs added after reorg - // assert.Equal(t, 40, blockWindowCounts[99]) - // - // assert.Equal(t, 818, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 818, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 818, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 818, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 818, countLogs(bufV1.queues["5"].logs)) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 816, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 816, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 816, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 816, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 816, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // assert.Equal(t, 20, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[98]) // this block window has had its min dequeue met following a reorg - // assert.Equal(t, 50, blockWindowCounts[101]) - // assert.Equal(t, 50, blockWindowCounts[102]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 814, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 814, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 814, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 814, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 814, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // // best effort dequeues first block window - // 
assert.Equal(t, 20, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[98]) - // assert.Equal(t, 40, blockWindowCounts[101]) - // assert.Equal(t, 50, blockWindowCounts[102]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 812, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 812, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 812, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 812, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 812, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // assert.Equal(t, 20, blockWindowCounts[1]) - // assert.Equal(t, 40, blockWindowCounts[98]) - // assert.Equal(t, 40, blockWindowCounts[101]) - // assert.Equal(t, 40, blockWindowCounts[102]) // latest block window has now had min dequeue met - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 810, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 810, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 810, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 810, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 810, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // assert.Equal(t, 10, blockWindowCounts[1]) // best effort resumes on the first block window - // assert.Equal(t, 40, blockWindowCounts[98]) - // assert.Equal(t, 40, blockWindowCounts[101]) - // assert.Equal(t, 40, blockWindowCounts[102]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 808, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 808, countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 808, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 808, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 808, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // assert.Equal(t, 0, blockWindowCounts[1]) // best effort completes on the first block window - // assert.Equal(t, 40, blockWindowCounts[2]) - // assert.Equal(t, 40, blockWindowCounts[98]) - // assert.Equal(t, 40, blockWindowCounts[101]) - // assert.Equal(t, 40, blockWindowCounts[102]) - // - // payloads, err = provider.GetLatestPayloads(ctx) - // assert.NoError(t, err) - // - // // we dequeue a maximum of 10 logs - // assert.Equal(t, 10, len(payloads)) - // - // // the dequeue is evenly distributed across the 5 upkeeps - // assert.Equal(t, 806, countLogs(bufV1.queues["1"].logs)) - // assert.Equal(t, 806, 
countLogs(bufV1.queues["2"].logs)) - // assert.Equal(t, 806, countLogs(bufV1.queues["3"].logs)) - // assert.Equal(t, 806, countLogs(bufV1.queues["4"].logs)) - // assert.Equal(t, 806, countLogs(bufV1.queues["5"].logs)) - // - // blockWindowCounts = map[int64]int{} - // - // for _, q := range bufV1.queues { - // for blockNumber, logs := range q.logs { - // blockWindowCounts[blockNumber] += len(logs) - // } - // } - // - // assert.Equal(t, 0, blockWindowCounts[1]) - // assert.Equal(t, 30, blockWindowCounts[2]) // best effort continues on the second block window - // assert.Equal(t, 40, blockWindowCounts[98]) - // assert.Equal(t, 40, blockWindowCounts[101]) - // assert.Equal(t, 40, blockWindowCounts[102]) - //}) - // - //t.Run("sparsely populated blocks", func(t *testing.T) { - // oldMaxPayloads := MaxPayloads - // MaxPayloads = 10 - // defer func() { - // MaxPayloads = oldMaxPayloads - // }() - // - // upkeepIDs := []*big.Int{ - // big.NewInt(1), - // big.NewInt(2), - // big.NewInt(3), - // big.NewInt(4), - // big.NewInt(5), - // } - // - // filterStore := NewUpkeepFilterStore() - // - // upkeepOmittedOnBlocks := map[int][]int64{ - // 1: {5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100}, // upkeep 1 won't have logs on 20 blocks - // 2: {2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100}, // upkeep 2 won't have logs on 50 blocks - // 3: {3, 13, 23, 33, 43, 53, 63, 73, 83, 93}, // upkeep 3 won't appear on 10 blocks - // 4: {1, 25, 50, 75, 100}, // upkeep 4 won't appear on 5 blocks - // 5: {}, // upkeep 5 appears on all blocks - // } - // - // callCount := 0 - // // this gets called once per upkeep ID - // logGenerator := func(start, end int64) []logpoller.Log { - // callCount++ - // var res []logpoller.Log - // outer: - // for i := start; i < end; i++ { - // for _, skip := range upkeepOmittedOnBlocks[callCount] { - // if skip == i+1 { - // continue outer - // } - // } - // res = append(res, logpoller.Log{ - // LogIndex: i, - // BlockHash: common.HexToHash(fmt.Sprintf("%d", i+1)), - // BlockNumber: i + 1, - // }) - // } - // return res - // } - // - // // use a log poller that will create logs for the queried block range - // logPoller := &mockLogPoller{ - // LatestBlockFn: func(ctx context.Context) (int64, error) { - // return 100, nil - // }, - // LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - // return logGenerator(start, end), nil - // }, - // } - // - // // prepare the filter store with upkeeps - // for _, upkeepID := range upkeepIDs { - // filterStore.AddActiveUpkeeps( - // upkeepFilter{ - // addr: []byte(upkeepID.String()), - // upkeepID: upkeepID, - // topics: []common.Hash{ - // common.HexToHash(upkeepID.String()), - // }, - // }, - // ) - // } - // - // opts := NewOptions(200, big.NewInt(1)) - // opts.BufferVersion = "v1" - // - // provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) - // - // ctx := context.Background() - // - // err := provider.ReadLogs(ctx, upkeepIDs...) 
-		// assert.NoError(t, err)
-		//
-		// assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps())
-		//
-		// bufV1 := provider.bufferV1.(*logBuffer)
-		//
-		// blockWindowCounts := map[int64]int{}
-		//
-		// for _, q := range bufV1.queues {
-		// for blockNumber, logs := range q.logs {
-		// blockWindowCounts[blockNumber] += len(logs)
-		// }
-		// }
-		//
-		// assert.Equal(t, 4, blockWindowCounts[1])
-		// assert.Equal(t, 4, blockWindowCounts[2])
-		// assert.Equal(t, 4, blockWindowCounts[3])
-		// assert.Equal(t, 4, blockWindowCounts[4])
-		// assert.Equal(t, 4, blockWindowCounts[5])
-		// assert.Equal(t, 4, blockWindowCounts[6])
-		// assert.Equal(t, 5, blockWindowCounts[7]) // block 7 is the first block to contain 1 log for all upkeeps
-		//
-		// assert.Equal(t, 80, countLogs(bufV1.queues["1"].logs))
-		// assert.Equal(t, 50, countLogs(bufV1.queues["2"].logs))
-		// assert.Equal(t, 90, countLogs(bufV1.queues["3"].logs))
-		// assert.Equal(t, 95, countLogs(bufV1.queues["4"].logs))
-		// assert.Equal(t, 100, countLogs(bufV1.queues["5"].logs))
-		//
-		// payloads, err := provider.GetLatestPayloads(ctx)
-		// assert.NoError(t, err)
-		//
-		// assert.Equal(t, 10, len(payloads))
-		//
-		// // the dequeue is evenly distributed across the 5 upkeeps based on availability
-		// assert.Equal(t, 77, countLogs(bufV1.queues["1"].logs))
-		// assert.Equal(t, 48, countLogs(bufV1.queues["2"].logs))
-		// assert.Equal(t, 88, countLogs(bufV1.queues["3"].logs))
-		// assert.Equal(t, 94, countLogs(bufV1.queues["4"].logs))
-		// assert.Equal(t, 98, countLogs(bufV1.queues["5"].logs))
-		//
-		// blockWindowCounts = map[int64]int{}
-		//
-		// for _, q := range bufV1.queues {
-		// for blockNumber, logs := range q.logs {
-		// blockWindowCounts[blockNumber] += len(logs)
-		// }
-		// }
-		//
-		// assert.Equal(t, 0, blockWindowCounts[1])
-		// assert.Equal(t, 0, blockWindowCounts[2])
-		// assert.Equal(t, 2, blockWindowCounts[3])
-		// assert.Equal(t, 4, blockWindowCounts[4])
-		// assert.Equal(t, 4, blockWindowCounts[5])
-		// assert.Equal(t, 4, blockWindowCounts[6])
-		// assert.Equal(t, 5, blockWindowCounts[7]) // block 7 is the first block to contain 1 log for all upkeeps
-		//
-		// payloads, err = provider.GetLatestPayloads(ctx)
-		// assert.NoError(t, err)
-		//
-		// assert.Equal(t, 10, len(payloads))
-		//
-		// // the dequeue is evenly distributed across the 5 upkeeps based on availability
-		// assert.Equal(t, 76, countLogs(bufV1.queues["1"].logs))
-		// assert.Equal(t, 47, countLogs(bufV1.queues["2"].logs))
-		// assert.Equal(t, 86, countLogs(bufV1.queues["3"].logs))
-		// assert.Equal(t, 91, countLogs(bufV1.queues["4"].logs))
-		// assert.Equal(t, 95, countLogs(bufV1.queues["5"].logs))
-		//
-		// blockWindowCounts = map[int64]int{}
-		//
-		// for _, q := range bufV1.queues {
-		// for blockNumber, logs := range q.logs {
-		// blockWindowCounts[blockNumber] += len(logs)
-		// }
-		// }
-		//
-		// assert.Equal(t, 0, blockWindowCounts[1])
-		// assert.Equal(t, 0, blockWindowCounts[2])
-		// assert.Equal(t, 0, blockWindowCounts[3])
-		// assert.Equal(t, 0, blockWindowCounts[4])
-		// assert.Equal(t, 0, blockWindowCounts[5])
-		// assert.Equal(t, 4, blockWindowCounts[6])
-		// assert.Equal(t, 5, blockWindowCounts[7]) // block 7 is the first block to contain 1 log for all upkeeps
-		//})
+		payloads, err := provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+		assert.Equal(t, 100, len(payloads))
+
+		// the second dequeue call will retrieve the remaining 100 dequeueable logs
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+		assert.Equal(t, 100, len(payloads))
+
+		assert.Equal(t, 40, countRemainingLogs(buffer.queues["1"].logs))
+		assert.Equal(t, 18, countRemainingLogs(buffer.queues["2"].logs))
+		assert.Equal(t, 48, countRemainingLogs(buffer.queues["3"].logs))
+		assert.Equal(t, 52, countRemainingLogs(buffer.queues["4"].logs))
+		assert.Equal(t, 57, countRemainingLogs(buffer.queues["5"].logs))
+	})
 }
 
 type mockedPacker struct {
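The new assertions above rely on a countRemainingLogs helper to count the logs still held in each upkeep queue after the two dequeue passes. Its definition is not part of this hunk; the sketch below shows one plausible shape for it, assuming the queue's logs field is a map from block number to the logs stored for that block (as the iteration in the removed test code suggests) and assuming the usual chainlink logpoller import path. The actual helper in the test file may differ in name or detail.

package logprovider

import "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"

// countRemainingLogs sums the logs still held for each block of a single
// upkeep queue. Sketch only: the map layout and import path are assumptions
// drawn from the surrounding test code, not from this diff.
func countRemainingLogs(logs map[int64][]logpoller.Log) int {
	count := 0
	for _, blockLogs := range logs {
		count += len(blockLogs)
	}
	return count
}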