Skip to content

Commit

Permalink
Test
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Nov 3, 2023
1 parent 27f193b commit a1dcd7f
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 5 deletions.
68 changes: 64 additions & 4 deletions core/chains/evm/logpoller/log_poller_ccip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/test-go/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand All @@ -18,7 +19,7 @@ import (

const (
numberOfReports = 10000
numberOfMessagesPerReport = 10
numberOfMessagesPerReport = 100
numberOfMessages = numberOfReports * numberOfMessagesPerReport
)

Expand Down Expand Up @@ -77,6 +78,33 @@ func populateDbWithExecutionStateChanges(b *testing.B, o *logpoller.DbORM, chain
require.NoError(b, o.InsertLogs(logs))
}

func populateDbWithSomeExecuted(b *testing.B, o *logpoller.DbORM, chainID *big.Int, offrampAddress common.Address, offrampExecuted common.Hash) {
var logs []logpoller.Log
for i := 1; i <= numberOfMessages; i += 2 {
var topics [][]byte
for j := 0; j < 5; j++ {
topics = append(topics, logpoller.EvmWord(uint64(i)).Bytes())
}

logs = append(logs, logpoller.Log{
EvmChainId: utils.NewBig(chainID),
LogIndex: int64(i),
BlockHash: utils.RandomBytes32(),
BlockNumber: int64(i),
BlockTimestamp: time.Now(),
EventSig: offrampExecuted,
Topics: topics,
Address: offrampAddress,
TxHash: utils.RandomAddress().Hash(),
Data: []byte{},
CreatedAt: time.Now(),
})

}
require.NoError(b, o.InsertBlock(utils.RandomAddress().Hash(), int64(100_000), time.Now()))
require.NoError(b, o.InsertLogs(logs))
}

func populateDbWithMessages(b *testing.B, o *logpoller.DbORM, chainID *big.Int, onrampAddress common.Address, onrampEvent common.Hash) {
var logs []logpoller.Log
for i := 1; i <= numberOfMessages; i++ {
Expand Down Expand Up @@ -164,7 +192,7 @@ func Benchmark_GetExecutionStatesAndMessages(b *testing.B) {
0,
)
require.NoError(b, err)
require.Len(b, commitReports, numberOfReports)
assert.Len(b, commitReports, numberOfReports)
fmt.Printf("Commit Reports: %d millis\n", time.Since(start).Milliseconds())

start = time.Now()
Expand All @@ -177,7 +205,7 @@ func Benchmark_GetExecutionStatesAndMessages(b *testing.B) {
0,
)
require.NoError(b, err)
require.Len(b, messages, numberOfMessages)
assert.Len(b, messages, numberOfMessages)
fmt.Printf("OnRamp messages: %d millis\n", time.Since(start).Milliseconds())

start = time.Now()
Expand All @@ -190,7 +218,7 @@ func Benchmark_GetExecutionStatesAndMessages(b *testing.B) {
0,
)
require.NoError(b, err)
require.Len(b, executionStateChanges, numberOfMessages)
assert.Len(b, executionStateChanges, numberOfMessages)
fmt.Printf("Offramp exec state changes: %d millis\n", time.Since(start).Milliseconds())
}
}
Expand Down Expand Up @@ -258,3 +286,35 @@ func Benchmark_SingleQueryNoneExecuted(b *testing.B) {
require.Len(b, logs, numberOfReports)
}
}

func Benchmark_SingleQuerySomeExecuted(b *testing.B) {
chainId := big.NewInt(137)
_, db := heavyweight.FullTestDBV2(b, "log_match_all", nil)
o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false))

commitStoreAddress := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb")
commitReportAccepted := common.HexToHash("0xe81b49e583122eb290c46fc255c962b9a2dec468816c00fb7a2e6ebc42dc92d4")

offrampAddress := common.HexToAddress("0x6E225058950f237371261C985Db6bDe26df2200E")
offrampExecuted := common.HexToHash("0xd4f851956a5d67c3997d1c9205045fef79bae2947fdee7e9e2641abc7391ef65")

populateDbWithCommitReports(b, o, chainId, commitStoreAddress, commitReportAccepted)
populateDbWithSomeExecuted(b, o, chainId, offrampAddress, offrampExecuted)

b.Log("Db load, running query")
b.ResetTimer()

for i := 0; i < b.N; i++ {
start := time.Now()
logs, err := o.FetchNotExecutedReports(
commitStoreAddress,
commitReportAccepted,
offrampAddress,
offrampExecuted,
time.Now().Add(-1*time.Hour),
)
fmt.Printf("%d millis\n", time.Since(start).Milliseconds())
require.NoError(b, err)
require.Len(b, logs, numberOfReports)
}
}
91 changes: 90 additions & 1 deletion core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,17 @@ func (o *DbORM) FetchNotExecutedReports(
offrampEventSig common.Hash,
after time.Time,
qopts ...pg.QOpt,
) ([]Log, error) {
return o.FetchNotExecutedReportsV4(commitStoreAddr, commitStoreEvent, offrampAddress, offrampEventSig, after, qopts...)
}

func (o *DbORM) FetchNotExecutedReportsV1(
commitStoreAddr common.Address,
commitStoreEvent common.Hash,
offrampAddress common.Address,
offrampEventSig common.Hash,
after time.Time,
qopts ...pg.QOpt,
) ([]Log, error) {
var logs []Log
queryArgs := map[string]interface{}{
Expand All @@ -309,7 +320,7 @@ func (o *DbORM) FetchNotExecutedReports(
"offramp_addr": offrampAddress,
"offramp_event": offrampEventSig,
"block_timestamp_after": after,
"topic_index": 1,
"topic_index": 2,
"min_seq_word_index": 2,
"max_seq_word_index": 3,
}
Expand Down Expand Up @@ -339,6 +350,84 @@ func (o *DbORM) FetchNotExecutedReports(
return logs, err
}

func (o *DbORM) FetchNotExecutedReportsV3(
commitStoreAddr common.Address,
commitStoreEvent common.Hash,
offrampAddress common.Address,
offrampEventSig common.Hash,
after time.Time,
qopts ...pg.QOpt,
) ([]Log, error) {
var logs []Log
queryArgs := map[string]interface{}{
"chain_id": utils.NewBig(o.chainID),
"commit_store_addr": commitStoreAddr,
"commit_store_event": commitStoreEvent,
"offramp_addr": offrampAddress,
"offramp_event": offrampEventSig,
"block_timestamp_after": after,
"topic_index": 2,
"min_seq_word_index": 2,
"max_seq_word_index": 3,
}
q := o.q.WithOpts(qopts...)
err := q.SelectNamed(&logs, `
SELECT reports.* FROM evm.logs reports
WHERE reports.evm_chain_id = :chain_id
AND reports.address = :commit_store_addr
AND reports.event_sig = :commit_store_event
AND reports.block_timestamp > :block_timestamp_after
AND NOT EXISTS(
SELECT 1 FROM evm.logs executed
WHERE executed.evm_chain_id = :chain_id
AND executed.address = :offramp_addr
AND executed.event_sig = :offramp_event
AND executed.topics[:topic_index]
BETWEEN substring(reports.data from 32*2+1 for 32)
AND substring(reports.data from 32*3+1 for 32)
)
ORDER BY (reports.block_number, reports.log_index)`, queryArgs)
return logs, err
}

func (o *DbORM) FetchNotExecutedReportsV4(
commitStoreAddr common.Address,
commitStoreEvent common.Hash,
offrampAddress common.Address,
offrampEventSig common.Hash,
after time.Time,
qopts ...pg.QOpt,
) ([]Log, error) {
var logs []Log
queryArgs := map[string]interface{}{
"chain_id": utils.NewBig(o.chainID),
"commit_store_addr": commitStoreAddr,
"commit_store_event": commitStoreEvent,
"offramp_addr": offrampAddress,
"offramp_event": offrampEventSig,
"block_timestamp_after": after,
"topic_index": 2,
"min_seq_word_index": 2,
"max_seq_word_index": 3,
}
q := o.q.WithOpts(qopts...)
err := q.SelectNamed(&logs, `
SELECT reports.* FROM evm.logs reports
WHERE reports.evm_chain_id = :chain_id
AND reports.address = :commit_store_addr
AND reports.event_sig = :commit_store_event
AND reports.block_timestamp > :block_timestamp_after
AND (SELECT count(*)
FROM evm.logs executed
WHERE executed.topics[:topic_index] BETWEEN substring(reports.data from 32 * :min_seq_word_index + 1 for 32) AND substring(reports.data from 32 * :max_seq_word_index + 1 for 32)
AND executed.evm_chain_id = :chain_id
AND executed.address = :offramp_addr
AND executed.event_sig = :offramp_event
) < ('x' || encode(substring(reports.data from 32*:max_seq_word_index+25 for 8), 'hex'))::::bit(64)::::bigint - ('x' || encode(substring(reports.data from 32*:min_seq_word_index+25 for 8), 'hex'))::::bit(64)::::bigint + 1
ORDER BY (reports.block_number, reports.log_index)`, queryArgs)
return logs, err
}

// SelectLogsWithSigsByBlockRangeFilter finds the logs in the given block range with the given event signatures
// emitted from the given address.
func (o *DbORM) SelectLogsWithSigs(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) (logs []Log, err error) {
Expand Down

0 comments on commit a1dcd7f

Please sign in to comment.