diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index 81f51311d44..e391025e2c0 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -148,6 +148,8 @@ type logEventBuffer struct { blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 + + logsInBuffer int64 } func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer { @@ -223,6 +225,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { } if added > 0 { lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock) + atomic.AddInt64(&b.logsInBuffer, int64(added-dropped)) } return added - dropped @@ -324,6 +327,7 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit if len(results) > 0 { b.lggr.Debugw("Dequeued logs", "results", len(results), "start", start, "end", end) + atomic.AddInt64(&b.logsInBuffer, -int64(len(results))) } return results diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 2dabcc82671..049e37c9201 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -102,6 +102,8 @@ type logEventProvider struct { opts LogTriggersOptions currentPartitionIdx uint64 + + servedLogs int64 } func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { @@ -142,6 +144,21 @@ func (p *logEventProvider) Start(context.Context) error { }) }) + p.threadCtrl.Go(func(ctx context.Context) { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + p.lggr.Debugw("logs stats", "servedLogs", atomic.LoadInt64(&p.servedLogs), "logsInBuffer", atomic.LoadInt64(&p.buffer.logsInBuffer), "latestBlockSeen", p.buffer.latestBlockSeen()) + case <-ctx.Done(): + return + } + } + + }) + return nil }) } @@ -188,6 +205,8 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers payloads = append(payloads, payload) } + atomic.AddInt64(&p.servedLogs, int64(len(logs))) + return payloads, nil }