Skip to content

Commit

Permalink
CCIP-1306 Using proper query for fetching single CommitReport from th…
Browse files Browse the repository at this point in the history
…e LP (#291)
  • Loading branch information
mateusz-sekara authored Nov 17, 2023
1 parent 23ca470 commit 84a60b8
Show file tree
Hide file tree
Showing 15 changed files with 277 additions and 40 deletions.
4 changes: 4 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,7 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A
func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs Confirmations, qopts ...pg.QOpt) (int64, error) {
return 0, ErrDisabled
}

func (d disabled) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}
5 changes: 5 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type LogPoller interface {
IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
}

type Confirmations int
Expand Down Expand Up @@ -1019,6 +1020,10 @@ func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, event
return lp.orm.SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...)
}

func (lp *logPoller) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectLogsDataWordBetween(address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs, qopts...)
}

// GetBlocksRange tries to get the specified block numbers from the log pollers
// blocks table. It falls back to the RPC for any unfulfilled requested blocks.
func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {
Expand Down
33 changes: 33 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ func (o *ObservedORM) SelectLogsDataWordGreaterThan(address common.Address, even
})
}

func (o *ObservedORM) SelectLogsDataWordBetween(address common.Address, eventSIg common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "SelectLogsDataWordBetween", func() ([]Log, error) {
return o.ORM.SelectLogsDataWordBetween(address, eventSIg, wordIndexMin, wordIndexMax, wordValue, confs, qopts...)
})
}

func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "SelectIndexedLogsTopicGreaterThan", func() ([]Log, error) {
return o.ORM.SelectIndexedLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...)
Expand Down
27 changes: 27 additions & 0 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type ORM interface {
SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
SelectLogsDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
SelectLogsDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
SelectLogsDataWordBetween(address common.Address, eventSIg common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
}

type DbORM struct {
Expand Down Expand Up @@ -535,6 +536,32 @@ func (o *DbORM) SelectLogsDataWordGreaterThan(address common.Address, eventSig c
return logs, nil
}

func (o *DbORM) SelectLogsDataWordBetween(address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withWordIndexMin(wordIndexMin).
withWordIndexMax(wordIndexMax).
withWordValue(wordValue).
withConfs(confs).
toArgs()
if err != nil {
return nil, err
}
query := fmt.Sprintf(`
SELECT * FROM evm.logs
WHERE evm_chain_id = :evm_chain_id
AND address = :address
AND event_sig = :event_sig
AND substring(data from 32*:word_index_min+1 for 32) <= :word_value
AND substring(data from 32*:word_index_max+1 for 32) >= :word_value
AND block_number <= %s
ORDER BY (block_number, log_index)`, nestedBlockNumberQuery(confs))
var logs []Log
if err = o.q.WithOpts(qopts...).SelectNamed(&logs, query, args); err != nil {
return nil, err
}
return logs, nil
}

func (o *DbORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withTopicIndex(topicIndex).
Expand Down
129 changes: 129 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ func GenLogWithTimestamp(chainID *big.Int, logIndex int64, blockNum int64, block
}
}

func GenLogWithData(chainID *big.Int, address common.Address, eventSig common.Hash, logIndex int64, blockNum int64, data []byte) logpoller.Log {
return logpoller.Log{
EvmChainId: utils.NewBig(chainID),
LogIndex: logIndex,
BlockHash: utils.RandomBytes32(),
BlockNumber: blockNum,
EventSig: eventSig,
Topics: [][]byte{},
Address: address,
TxHash: utils.RandomBytes32(),
Data: data,
BlockTimestamp: time.Now(),
}
}

func TestLogPoller_Batching(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)
Expand Down Expand Up @@ -1434,3 +1449,117 @@ func TestInsertLogsInTx(t *testing.T) {
})
}
}

func TestSelectLogsDataWordBetween(t *testing.T) {
address := utils.RandomAddress()
eventSig := utils.RandomBytes32()
th := SetupTH(t, false, 2, 3, 2, 1000)

firstLogData := make([]byte, 0, 64)
firstLogData = append(firstLogData, logpoller.EvmWord(1).Bytes()...)
firstLogData = append(firstLogData, logpoller.EvmWord(10).Bytes()...)

secondLogData := make([]byte, 0, 64)
secondLogData = append(secondLogData, logpoller.EvmWord(5).Bytes()...)
secondLogData = append(secondLogData, logpoller.EvmWord(20).Bytes()...)

err := th.ORM.InsertLogsWithBlock(
[]logpoller.Log{
GenLogWithData(th.ChainID, address, eventSig, 1, 1, firstLogData),
GenLogWithData(th.ChainID, address, eventSig, 2, 2, secondLogData),
},
logpoller.NewLogPollerBlock(utils.RandomBytes32(), 10, time.Now(), 1),
)
require.NoError(t, err)

tests := []struct {
name string
wordValue uint64
expectedLogs []int64
}{
{
name: "returns only first log",
wordValue: 2,
expectedLogs: []int64{1},
},
{
name: "returns only second log",
wordValue: 11,
expectedLogs: []int64{2},
},
{
name: "returns both logs if word value is between",
wordValue: 5,
expectedLogs: []int64{1, 2},
},
{
name: "returns no logs if word value is outside of the range",
wordValue: 21,
expectedLogs: []int64{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logs, err1 := th.ORM.SelectLogsDataWordBetween(address, eventSig, 0, 1, logpoller.EvmWord(tt.wordValue), logpoller.Unconfirmed)
assert.NoError(t, err1)
assert.Len(t, logs, len(tt.expectedLogs))

for index := range logs {
assert.Equal(t, tt.expectedLogs[index], logs[index].BlockNumber)
}
})
}
}

func Benchmark_LogsDataWordBetween(b *testing.B) {
chainId := big.NewInt(137)
_, db := heavyweight.FullTestDBV2(b, "logs_data_word_between", nil)
o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false))

numberOfReports := 100_000
numberOfMessagesPerReport := 256

commitStoreAddress := utils.RandomAddress()
commitReportAccepted := utils.RandomBytes32()

var dbLogs []logpoller.Log
for i := 0; i < numberOfReports; i++ {
data := make([]byte, 96)
// MinSeqNr
data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*i+1)).Bytes()...)
// MaxSeqNr
data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*(i+1))).Bytes()...)

dbLogs = append(dbLogs, logpoller.Log{
EvmChainId: utils.NewBig(chainId),
LogIndex: int64(i + 1),
BlockHash: utils.RandomBytes32(),
BlockNumber: int64(i + 1),
BlockTimestamp: time.Now(),
EventSig: commitReportAccepted,
Topics: [][]byte{},
Address: commitStoreAddress,
TxHash: utils.RandomAddress().Hash(),
Data: data,
CreatedAt: time.Now(),
})
}
require.NoError(b, o.InsertBlock(utils.RandomAddress().Hash(), int64(numberOfReports*numberOfMessagesPerReport), time.Now(), int64(numberOfReports*numberOfMessagesPerReport)))
require.NoError(b, o.InsertLogs(dbLogs))

b.ResetTimer()

for i := 0; i < b.N; i++ {
logs, err := o.SelectLogsDataWordBetween(
commitStoreAddress,
commitReportAccepted,
3,
4,
logpoller.EvmWord(uint64(numberOfReports*numberOfMessagesPerReport/2)), // Pick the middle report
logpoller.Unconfirmed,
)
assert.NoError(b, err)
assert.Len(b, logs, 1)
}
}
12 changes: 12 additions & 0 deletions core/chains/evm/logpoller/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ func (q *queryArgs) withWordIndex(wordIndex int) *queryArgs {
return q.withCustomArg("word_index", wordIndex)
}

func (q *queryArgs) withWordIndexMin(wordIndex int) *queryArgs {
return q.withCustomArg("word_index_min", wordIndex)
}

func (q *queryArgs) withWordIndexMax(wordIndex int) *queryArgs {
return q.withCustomArg("word_index_max", wordIndex)
}

func (q *queryArgs) withWordValue(wordValue common.Hash) *queryArgs {
return q.withCustomHashArg("word_value", wordValue)
}

func (q *queryArgs) withWordValueMin(wordValueMin common.Hash) *queryArgs {
return q.withCustomHashArg("word_value_min", wordValueMin)
}
Expand Down
11 changes: 4 additions & 7 deletions core/services/ocr2/plugins/ccip/execution_batch_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,14 @@ func validateSeqNumbers(serviceCtx context.Context, commitStore ccipdata.CommitS

// Gets the commit report from the saved logs for a given sequence number.
func getCommitReportForSeqNum(ctx context.Context, commitStoreReader ccipdata.CommitStoreReader, seqNum uint64) (ccipdata.CommitStoreReport, error) {
acceptedReports, err := commitStoreReader.GetAcceptedCommitReportsGteSeqNum(ctx, seqNum, 0)
acceptedReports, err := commitStoreReader.GetCommitReportMatchingSeqNum(ctx, seqNum, 0)
if err != nil {
return ccipdata.CommitStoreReport{}, err
}

for _, acceptedReport := range acceptedReports {
reportInterval := acceptedReport.Data.Interval
if reportInterval.Min <= seqNum && seqNum <= reportInterval.Max {
return acceptedReport.Data, nil
}
if len(acceptedReports) == 0 {
return ccipdata.CommitStoreReport{}, errors.Errorf("seq number not committed")
}

return ccipdata.CommitStoreReport{}, errors.Errorf("seq number not committed")
return acceptedReports[0].Data, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func TestExecutionReportingPlugin_buildReport(t *testing.T) {
commitStore.On("VerifyExecutionReport", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
commitStore.On("GetExpectedNextSequenceNumber", mock.Anything).
Return(executionReport.Messages[len(executionReport.Messages)-1].SequenceNumber+1, nil)
commitStore.On("GetAcceptedCommitReportsGteSeqNum", ctx, observations[0].SeqNr, 0).
commitStore.On("GetCommitReportMatchingSeqNum", ctx, observations[0].SeqNr, 0).
Return([]ccipdata.Event[ccipdata.CommitStoreReport]{
{
Data: ccipdata.CommitStoreReport{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ type CommitStoreReader interface {
Closer
GetExpectedNextSequenceNumber(context context.Context) (uint64, error)
GetLatestPriceEpochAndRound(context context.Context) (uint64, error)
// GetAcceptedCommitReportsGteSeqNum returns all the accepted commit reports that have max sequence number greater than or equal to the provided.
GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error)
// GetCommitReportMatchingSeqNum returns accepted commit report that satisfies Interval.Min <= seqNum <= Interval.Max. Returned slice should be empty or have exactly one element
GetCommitReportMatchingSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error)
// GetAcceptedCommitReportsGteTimestamp returns all the commit reports with timestamp greater than or equal to the provided.
// Returned Commit Reports have to be sorted by Interval.Min/Interval.Max in ascending order.
GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]Event[CommitStoreReport], error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,20 +343,24 @@ func TestCommitStoreReaders(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, rep.Interval.Max+1, seqNr)

reps, err := cr.GetAcceptedCommitReportsGteSeqNum(context.Background(), rep.Interval.Max+1, 0)
reps, err := cr.GetCommitReportMatchingSeqNum(context.Background(), rep.Interval.Max+1, 0)
require.NoError(t, err)
assert.Len(t, reps, 0)

reps, err = cr.GetAcceptedCommitReportsGteSeqNum(context.Background(), rep.Interval.Max, 0)
reps, err = cr.GetCommitReportMatchingSeqNum(context.Background(), rep.Interval.Max, 0)
require.NoError(t, err)
require.Len(t, reps, 1)
assert.Equal(t, reps[0].Data, rep)

reps, err = cr.GetAcceptedCommitReportsGteSeqNum(context.Background(), rep.Interval.Min-1, 0)
reps, err = cr.GetCommitReportMatchingSeqNum(context.Background(), rep.Interval.Min, 0)
require.NoError(t, err)
require.Len(t, reps, 1)
assert.Equal(t, reps[0].Data, rep)

reps, err = cr.GetCommitReportMatchingSeqNum(context.Background(), rep.Interval.Min-1, 0)
require.NoError(t, err)
require.Len(t, reps, 0)

// Sanity
reps, err = cr.GetAcceptedCommitReportsGteTimestamp(context.Background(), time.Unix(0, 0), 0)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ func (c *CommitStoreV1_0_0) parseReport(log types.Log) (*CommitStoreReport, erro
}, nil
}

func (c *CommitStoreV1_0_0) GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) {
logs, err := c.lp.LogsDataWordGreaterThan(
func (c *CommitStoreV1_0_0) GetCommitReportMatchingSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) {
logs, err := c.lp.LogsDataWordBetween(
c.reportAcceptedSig,
c.address,
c.reportAcceptedMaxSeqIndex-1,
c.reportAcceptedMaxSeqIndex,
logpoller.EvmWord(seqNum),
logpoller.Confirmations(confs),
Expand All @@ -274,11 +275,20 @@ func (c *CommitStoreV1_0_0) GetAcceptedCommitReportsGteSeqNum(ctx context.Contex
return nil, err
}

return parseLogs[CommitStoreReport](
parsedLogs, err := parseLogs[CommitStoreReport](
logs,
c.lggr,
c.parseReport,
)
if err != nil {
return nil, err
}

if len(parsedLogs) > 1 {
c.lggr.Errorw("More than one report found for seqNum", "seqNum", seqNum, "commitReports", parsedLogs)
return parsedLogs[:1], nil
}
return parsedLogs, nil
}

func (c *CommitStoreV1_0_0) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]Event[CommitStoreReport], error) {
Expand Down
Loading

0 comments on commit 84a60b8

Please sign in to comment.