diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index b54d4e6fc8..12c7814753 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -106,3 +106,7 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs Confirmations, qopts ...pg.QOpt) (int64, error) { return 0, ErrDisabled } + +func (d disabled) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { + return nil, ErrDisabled +} diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index b80a6f5f0b..b2a370fd0a 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -58,6 +58,7 @@ type LogPoller interface { IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) + LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) } type Confirmations int @@ -1019,6 +1020,10 @@ func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, event return lp.orm.SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...) } +func (lp *logPoller) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { + return lp.orm.SelectLogsDataWordBetween(address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) +} + // GetBlocksRange tries to get the specified block numbers from the log pollers // blocks table. It falls back to the RPC for any unfulfilled requested blocks. func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) { diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 01be5f7ba5..6cb8b6f9ef 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -522,6 +522,39 @@ func (_m *LogPoller) LogsCreatedAfter(eventSig common.Hash, address common.Addre return r0, r1 } +// LogsDataWordBetween provides a mock function with given fields: eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs, qopts +func (_m *LogPoller) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs logpoller.Confirmations, qopts ...pg.QOpt) ([]logpoller.Log, error) { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 []logpoller.Log + var r1 error + if rf, ok := ret.Get(0).(func(common.Hash, common.Address, int, int, common.Hash, logpoller.Confirmations, ...pg.QOpt) ([]logpoller.Log, error)); ok { + return rf(eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) + } + if rf, ok := ret.Get(0).(func(common.Hash, common.Address, int, int, common.Hash, logpoller.Confirmations, ...pg.QOpt) []logpoller.Log); ok { + r0 = rf(eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]logpoller.Log) + } + } + + if rf, ok := ret.Get(1).(func(common.Hash, common.Address, int, int, common.Hash, logpoller.Confirmations, ...pg.QOpt) error); ok { + r1 = rf(eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LogsDataWordGreaterThan provides a mock function with given fields: eventSig, address, wordIndex, wordValueMin, confs, qopts func (_m *LogPoller) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs logpoller.Confirmations, qopts ...pg.QOpt) ([]logpoller.Log, error) { _va := make([]interface{}, len(qopts)) diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index b2d7ff9198..ade3ab1f96 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -212,6 +212,12 @@ func (o *ObservedORM) SelectLogsDataWordGreaterThan(address common.Address, even }) } +func (o *ObservedORM) SelectLogsDataWordBetween(address common.Address, eventSIg common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLogsDataWordBetween", func() ([]Log, error) { + return o.ORM.SelectLogsDataWordBetween(address, eventSIg, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) + }) +} + func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { return withObservedQueryAndResults(o, "SelectIndexedLogsTopicGreaterThan", func() ([]Log, error) { return o.ORM.SelectIndexedLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index f107df5ca9..17f8753bd9 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -51,6 +51,7 @@ type ORM interface { SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) SelectLogsDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) SelectLogsDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) + SelectLogsDataWordBetween(address common.Address, eventSIg common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) } type DbORM struct { @@ -535,6 +536,32 @@ func (o *DbORM) SelectLogsDataWordGreaterThan(address common.Address, eventSig c return logs, nil } +func (o *DbORM) SelectLogsDataWordBetween(address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { + args, err := newQueryArgsForEvent(o.chainID, address, eventSig). + withWordIndexMin(wordIndexMin). + withWordIndexMax(wordIndexMax). + withWordValue(wordValue). + withConfs(confs). + toArgs() + if err != nil { + return nil, err + } + query := fmt.Sprintf(` + SELECT * FROM evm.logs + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index_min+1 for 32) <= :word_value + AND substring(data from 32*:word_index_max+1 for 32) >= :word_value + AND block_number <= %s + ORDER BY (block_number, log_index)`, nestedBlockNumberQuery(confs)) + var logs []Log + if err = o.q.WithOpts(qopts...).SelectNamed(&logs, query, args); err != nil { + return nil, err + } + return logs, nil +} + func (o *DbORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withTopicIndex(topicIndex). diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 49bf844053..48d140561b 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -49,6 +49,21 @@ func GenLogWithTimestamp(chainID *big.Int, logIndex int64, blockNum int64, block } } +func GenLogWithData(chainID *big.Int, address common.Address, eventSig common.Hash, logIndex int64, blockNum int64, data []byte) logpoller.Log { + return logpoller.Log{ + EvmChainId: utils.NewBig(chainID), + LogIndex: logIndex, + BlockHash: utils.RandomBytes32(), + BlockNumber: blockNum, + EventSig: eventSig, + Topics: [][]byte{}, + Address: address, + TxHash: utils.RandomBytes32(), + Data: data, + BlockTimestamp: time.Now(), + } +} + func TestLogPoller_Batching(t *testing.T) { t.Parallel() th := SetupTH(t, false, 2, 3, 2, 1000) @@ -1434,3 +1449,117 @@ func TestInsertLogsInTx(t *testing.T) { }) } } + +func TestSelectLogsDataWordBetween(t *testing.T) { + address := utils.RandomAddress() + eventSig := utils.RandomBytes32() + th := SetupTH(t, false, 2, 3, 2, 1000) + + firstLogData := make([]byte, 0, 64) + firstLogData = append(firstLogData, logpoller.EvmWord(1).Bytes()...) + firstLogData = append(firstLogData, logpoller.EvmWord(10).Bytes()...) + + secondLogData := make([]byte, 0, 64) + secondLogData = append(secondLogData, logpoller.EvmWord(5).Bytes()...) + secondLogData = append(secondLogData, logpoller.EvmWord(20).Bytes()...) + + err := th.ORM.InsertLogsWithBlock( + []logpoller.Log{ + GenLogWithData(th.ChainID, address, eventSig, 1, 1, firstLogData), + GenLogWithData(th.ChainID, address, eventSig, 2, 2, secondLogData), + }, + logpoller.NewLogPollerBlock(utils.RandomBytes32(), 10, time.Now(), 1), + ) + require.NoError(t, err) + + tests := []struct { + name string + wordValue uint64 + expectedLogs []int64 + }{ + { + name: "returns only first log", + wordValue: 2, + expectedLogs: []int64{1}, + }, + { + name: "returns only second log", + wordValue: 11, + expectedLogs: []int64{2}, + }, + { + name: "returns both logs if word value is between", + wordValue: 5, + expectedLogs: []int64{1, 2}, + }, + { + name: "returns no logs if word value is outside of the range", + wordValue: 21, + expectedLogs: []int64{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logs, err1 := th.ORM.SelectLogsDataWordBetween(address, eventSig, 0, 1, logpoller.EvmWord(tt.wordValue), logpoller.Unconfirmed) + assert.NoError(t, err1) + assert.Len(t, logs, len(tt.expectedLogs)) + + for index := range logs { + assert.Equal(t, tt.expectedLogs[index], logs[index].BlockNumber) + } + }) + } +} + +func Benchmark_LogsDataWordBetween(b *testing.B) { + chainId := big.NewInt(137) + _, db := heavyweight.FullTestDBV2(b, "logs_data_word_between", nil) + o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false)) + + numberOfReports := 100_000 + numberOfMessagesPerReport := 256 + + commitStoreAddress := utils.RandomAddress() + commitReportAccepted := utils.RandomBytes32() + + var dbLogs []logpoller.Log + for i := 0; i < numberOfReports; i++ { + data := make([]byte, 96) + // MinSeqNr + data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*i+1)).Bytes()...) + // MaxSeqNr + data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*(i+1))).Bytes()...) + + dbLogs = append(dbLogs, logpoller.Log{ + EvmChainId: utils.NewBig(chainId), + LogIndex: int64(i + 1), + BlockHash: utils.RandomBytes32(), + BlockNumber: int64(i + 1), + BlockTimestamp: time.Now(), + EventSig: commitReportAccepted, + Topics: [][]byte{}, + Address: commitStoreAddress, + TxHash: utils.RandomAddress().Hash(), + Data: data, + CreatedAt: time.Now(), + }) + } + require.NoError(b, o.InsertBlock(utils.RandomAddress().Hash(), int64(numberOfReports*numberOfMessagesPerReport), time.Now(), int64(numberOfReports*numberOfMessagesPerReport))) + require.NoError(b, o.InsertLogs(dbLogs)) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + logs, err := o.SelectLogsDataWordBetween( + commitStoreAddress, + commitReportAccepted, + 3, + 4, + logpoller.EvmWord(uint64(numberOfReports*numberOfMessagesPerReport/2)), // Pick the middle report + logpoller.Unconfirmed, + ) + assert.NoError(b, err) + assert.Len(b, logs, 1) + } +} diff --git a/core/chains/evm/logpoller/query.go b/core/chains/evm/logpoller/query.go index 7443a860a8..8826c5a2ac 100644 --- a/core/chains/evm/logpoller/query.go +++ b/core/chains/evm/logpoller/query.go @@ -74,6 +74,18 @@ func (q *queryArgs) withWordIndex(wordIndex int) *queryArgs { return q.withCustomArg("word_index", wordIndex) } +func (q *queryArgs) withWordIndexMin(wordIndex int) *queryArgs { + return q.withCustomArg("word_index_min", wordIndex) +} + +func (q *queryArgs) withWordIndexMax(wordIndex int) *queryArgs { + return q.withCustomArg("word_index_max", wordIndex) +} + +func (q *queryArgs) withWordValue(wordValue common.Hash) *queryArgs { + return q.withCustomHashArg("word_value", wordValue) +} + func (q *queryArgs) withWordValueMin(wordValueMin common.Hash) *queryArgs { return q.withCustomHashArg("word_value_min", wordValueMin) } diff --git a/core/services/ocr2/plugins/ccip/execution_batch_building.go b/core/services/ocr2/plugins/ccip/execution_batch_building.go index fea4faa361..7795316698 100644 --- a/core/services/ocr2/plugins/ccip/execution_batch_building.go +++ b/core/services/ocr2/plugins/ccip/execution_batch_building.go @@ -90,17 +90,14 @@ func validateSeqNumbers(serviceCtx context.Context, commitStore ccipdata.CommitS // Gets the commit report from the saved logs for a given sequence number. func getCommitReportForSeqNum(ctx context.Context, commitStoreReader ccipdata.CommitStoreReader, seqNum uint64) (ccipdata.CommitStoreReport, error) { - acceptedReports, err := commitStoreReader.GetAcceptedCommitReportsGteSeqNum(ctx, seqNum, 0) + acceptedReports, err := commitStoreReader.GetCommitReportMatchingSeqNum(ctx, seqNum, 0) if err != nil { return ccipdata.CommitStoreReport{}, err } - for _, acceptedReport := range acceptedReports { - reportInterval := acceptedReport.Data.Interval - if reportInterval.Min <= seqNum && seqNum <= reportInterval.Max { - return acceptedReport.Data, nil - } + if len(acceptedReports) == 0 { + return ccipdata.CommitStoreReport{}, errors.Errorf("seq number not committed") } - return ccipdata.CommitStoreReport{}, errors.Errorf("seq number not committed") + return acceptedReports[0].Data, nil } diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go index 310be34760..ca172227f5 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go @@ -357,7 +357,7 @@ func TestExecutionReportingPlugin_buildReport(t *testing.T) { commitStore.On("VerifyExecutionReport", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) commitStore.On("GetExpectedNextSequenceNumber", mock.Anything). Return(executionReport.Messages[len(executionReport.Messages)-1].SequenceNumber+1, nil) - commitStore.On("GetAcceptedCommitReportsGteSeqNum", ctx, observations[0].SeqNr, 0). + commitStore.On("GetCommitReportMatchingSeqNum", ctx, observations[0].SeqNr, 0). Return([]ccipdata.Event[ccipdata.CommitStoreReport]{ { Data: ccipdata.CommitStoreReport{ diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader.go index 88be66a2f0..91bde02109 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader.go @@ -96,8 +96,8 @@ type CommitStoreReader interface { Closer GetExpectedNextSequenceNumber(context context.Context) (uint64, error) GetLatestPriceEpochAndRound(context context.Context) (uint64, error) - // GetAcceptedCommitReportsGteSeqNum returns all the accepted commit reports that have max sequence number greater than or equal to the provided. - GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) + // GetCommitReportMatchingSeqNum returns accepted commit report that satisfies Interval.Min <= seqNum <= Interval.Max. Returned slice should be empty or have exactly one element + GetCommitReportMatchingSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) // GetAcceptedCommitReportsGteTimestamp returns all the commit reports with timestamp greater than or equal to the provided. // Returned Commit Reports have to be sorted by Interval.Min/Interval.Max in ascending order. GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]Event[CommitStoreReport], error) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader_test.go index 4cfe6b94f4..9ad21de96a 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader_test.go @@ -343,20 +343,24 @@ func TestCommitStoreReaders(t *testing.T) { require.NoError(t, err) assert.Equal(t, rep.Interval.Max+1, seqNr) - reps, err := cr.GetAcceptedCommitReportsGteSeqNum(context.Background(), rep.Interval.Max+1, 0) + reps, err := cr.GetCommitReportMatchingSeqNum(context.Background(), rep.Interval.Max+1, 0) require.NoError(t, err) assert.Len(t, reps, 0) - reps, err = cr.GetAcceptedCommitReportsGteSeqNum(context.Background(), rep.Interval.Max, 0) + reps, err = cr.GetCommitReportMatchingSeqNum(context.Background(), rep.Interval.Max, 0) require.NoError(t, err) require.Len(t, reps, 1) assert.Equal(t, reps[0].Data, rep) - reps, err = cr.GetAcceptedCommitReportsGteSeqNum(context.Background(), rep.Interval.Min-1, 0) + reps, err = cr.GetCommitReportMatchingSeqNum(context.Background(), rep.Interval.Min, 0) require.NoError(t, err) require.Len(t, reps, 1) assert.Equal(t, reps[0].Data, rep) + reps, err = cr.GetCommitReportMatchingSeqNum(context.Background(), rep.Interval.Min-1, 0) + require.NoError(t, err) + require.Len(t, reps, 0) + // Sanity reps, err = cr.GetAcceptedCommitReportsGteTimestamp(context.Background(), time.Unix(0, 0), 0) require.NoError(t, err) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_v1_0_0.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_v1_0_0.go index 6336ca6ac7..e0de8608ec 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_v1_0_0.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_v1_0_0.go @@ -261,10 +261,11 @@ func (c *CommitStoreV1_0_0) parseReport(log types.Log) (*CommitStoreReport, erro }, nil } -func (c *CommitStoreV1_0_0) GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) { - logs, err := c.lp.LogsDataWordGreaterThan( +func (c *CommitStoreV1_0_0) GetCommitReportMatchingSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) { + logs, err := c.lp.LogsDataWordBetween( c.reportAcceptedSig, c.address, + c.reportAcceptedMaxSeqIndex-1, c.reportAcceptedMaxSeqIndex, logpoller.EvmWord(seqNum), logpoller.Confirmations(confs), @@ -274,11 +275,20 @@ func (c *CommitStoreV1_0_0) GetAcceptedCommitReportsGteSeqNum(ctx context.Contex return nil, err } - return parseLogs[CommitStoreReport]( + parsedLogs, err := parseLogs[CommitStoreReport]( logs, c.lggr, c.parseReport, ) + if err != nil { + return nil, err + } + + if len(parsedLogs) > 1 { + c.lggr.Errorw("More than one report found for seqNum", "seqNum", seqNum, "commitReports", parsedLogs) + return parsedLogs[:1], nil + } + return parsedLogs, nil } func (c *CommitStoreV1_0_0) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]Event[CommitStoreReport], error) { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_v1_2_0.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_v1_2_0.go index 0a888fcc4e..6d55f6eb90 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_v1_2_0.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_v1_2_0.go @@ -281,10 +281,11 @@ func (c *CommitStoreV1_2_0) parseReport(log types.Log) (*CommitStoreReport, erro }, nil } -func (c *CommitStoreV1_2_0) GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) { - logs, err := c.lp.LogsDataWordGreaterThan( +func (c *CommitStoreV1_2_0) GetCommitReportMatchingSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) { + logs, err := c.lp.LogsDataWordBetween( c.reportAcceptedSig, c.address, + c.reportAcceptedMaxSeqIndex-1, c.reportAcceptedMaxSeqIndex, logpoller.EvmWord(seqNum), logpoller.Confirmations(confs), @@ -294,11 +295,20 @@ func (c *CommitStoreV1_2_0) GetAcceptedCommitReportsGteSeqNum(ctx context.Contex return nil, err } - return parseLogs[CommitStoreReport]( + parsedLogs, err := parseLogs[CommitStoreReport]( logs, c.lggr, c.parseReport, ) + if err != nil { + return nil, err + } + + if len(parsedLogs) > 1 { + c.lggr.Errorw("More than one report found for seqNum", "seqNum", seqNum, "commitReports", parsedLogs) + return parsedLogs[:1], nil + } + return parsedLogs, nil } func (c *CommitStoreV1_2_0) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]Event[CommitStoreReport], error) { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/commit_store_reader_mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/commit_store_reader_mock.go index 99c16d1d66..5c6109d710 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/commit_store_reader_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/commit_store_reader_mock.go @@ -134,25 +134,25 @@ func (_m *CommitStoreReader) GasPriceEstimator() prices.GasPriceEstimatorCommit return r0 } -// GetAcceptedCommitReportsGteSeqNum provides a mock function with given fields: ctx, seqNum, confs -func (_m *CommitStoreReader) GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error) { - ret := _m.Called(ctx, seqNum, confs) +// GetAcceptedCommitReportsGteTimestamp provides a mock function with given fields: ctx, ts, confs +func (_m *CommitStoreReader) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error) { + ret := _m.Called(ctx, ts, confs) var r0 []ccipdata.Event[ccipdata.CommitStoreReport] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error)); ok { - return rf(ctx, seqNum, confs) + if rf, ok := ret.Get(0).(func(context.Context, time.Time, int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error)); ok { + return rf(ctx, ts, confs) } - if rf, ok := ret.Get(0).(func(context.Context, uint64, int) []ccipdata.Event[ccipdata.CommitStoreReport]); ok { - r0 = rf(ctx, seqNum, confs) + if rf, ok := ret.Get(0).(func(context.Context, time.Time, int) []ccipdata.Event[ccipdata.CommitStoreReport]); ok { + r0 = rf(ctx, ts, confs) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]ccipdata.Event[ccipdata.CommitStoreReport]) } } - if rf, ok := ret.Get(1).(func(context.Context, uint64, int) error); ok { - r1 = rf(ctx, seqNum, confs) + if rf, ok := ret.Get(1).(func(context.Context, time.Time, int) error); ok { + r1 = rf(ctx, ts, confs) } else { r1 = ret.Error(1) } @@ -160,25 +160,25 @@ func (_m *CommitStoreReader) GetAcceptedCommitReportsGteSeqNum(ctx context.Conte return r0, r1 } -// GetAcceptedCommitReportsGteTimestamp provides a mock function with given fields: ctx, ts, confs -func (_m *CommitStoreReader) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error) { - ret := _m.Called(ctx, ts, confs) +// GetCommitReportMatchingSeqNum provides a mock function with given fields: ctx, seqNum, confs +func (_m *CommitStoreReader) GetCommitReportMatchingSeqNum(ctx context.Context, seqNum uint64, confs int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error) { + ret := _m.Called(ctx, seqNum, confs) var r0 []ccipdata.Event[ccipdata.CommitStoreReport] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, time.Time, int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error)); ok { - return rf(ctx, ts, confs) + if rf, ok := ret.Get(0).(func(context.Context, uint64, int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error)); ok { + return rf(ctx, seqNum, confs) } - if rf, ok := ret.Get(0).(func(context.Context, time.Time, int) []ccipdata.Event[ccipdata.CommitStoreReport]); ok { - r0 = rf(ctx, ts, confs) + if rf, ok := ret.Get(0).(func(context.Context, uint64, int) []ccipdata.Event[ccipdata.CommitStoreReport]); ok { + r0 = rf(ctx, seqNum, confs) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]ccipdata.Event[ccipdata.CommitStoreReport]) } } - if rf, ok := ret.Get(1).(func(context.Context, time.Time, int) error); ok { - r1 = rf(ctx, ts, confs) + if rf, ok := ret.Get(1).(func(context.Context, uint64, int) error); ok { + r1 = rf(ctx, seqNum, confs) } else { r1 = ret.Error(1) } diff --git a/core/services/ocr2/plugins/ccip/internal/observability/commit_store.go b/core/services/ocr2/plugins/ccip/internal/observability/commit_store.go index fcddbed473..6c8d94fc74 100644 --- a/core/services/ocr2/plugins/ccip/internal/observability/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/observability/commit_store.go @@ -37,9 +37,9 @@ func (o *ObservedCommitStoreReader) GetLatestPriceEpochAndRound(context context. }) } -func (o *ObservedCommitStoreReader) GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error) { - return withObservedInteractionAndResults(o.metric, "GetAcceptedCommitReportsGteSeqNum", func() ([]ccipdata.Event[ccipdata.CommitStoreReport], error) { - return o.CommitStoreReader.GetAcceptedCommitReportsGteSeqNum(ctx, seqNum, confs) +func (o *ObservedCommitStoreReader) GetCommitReportMatchingSeqNum(ctx context.Context, seqNum uint64, confs int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error) { + return withObservedInteractionAndResults(o.metric, "GetCommitReportMatchingSeqNum", func() ([]ccipdata.Event[ccipdata.CommitStoreReport], error) { + return o.CommitStoreReader.GetCommitReportMatchingSeqNum(ctx, seqNum, confs) }) }