Skip to content

Commit

Permalink
progress, going to rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAustinWang committed Jan 10, 2025
1 parent 76689aa commit 42ea568
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 10 deletions.
31 changes: 30 additions & 1 deletion integration-tests/load/ccip/ccip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ccip
import (
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-testing-framework/wasp"
Expand Down Expand Up @@ -64,6 +65,8 @@ func TestCCIPLoad_RPS(t *testing.T) {
require.NoError(t, err)
defer loki.StopNow()

gunMap := make(map[uint64]*DestinationGun)

// Based on the config, initialize DestinationGun
p := wasp.NewProfile()
for selector, chain := range env.Chains {
Expand All @@ -72,6 +75,11 @@ func TestCCIPLoad_RPS(t *testing.T) {
block := latesthdr.Number.Uint64()
startBlocks[selector] = &block

gunMap[selector] = NewDestinationGun(env.Logger, selector, *env, state.Chains[selector].Receiver.Address(), loki)

}

for _, gun := range gunMap {
p.Add(wasp.NewGenerator(&wasp.Config{
T: t,
GenName: "ccipLoad",
Expand All @@ -82,21 +90,36 @@ func TestCCIPLoad_RPS(t *testing.T) {
// this schedule is per generator
// in this example, it would be 1 request per 10seconds per generator (dest chain)
// so if there are 3 generators, it would be 3 requests per 10 seconds over the network
Gun: NewDestinationGun(env.Logger, selector, *env, state.Chains[selector].Receiver.Address(), loki),
Gun: gun,
Labels: CommonTestLabels,
LokiConfig: wasp.NewLokiConfig(config.CCIP.Load.LokiEndpoint, nil, nil, nil),
// use the same loki client using `NewLokiClient` with the same config for sending events
}))
}

_, err = p.Run(true)
csPair := ChainSelectorPair{
src: 12922642891491394802,
dst: 3379446385462418246,
}
src, dst := env.Chains[csPair.src], env.Chains[csPair.dst]
startblk := uint64(11654)

seqNum := gunMap[csPair.dst].seqNums[csPair].End.Load()
_, err = ccipchangeset.ConfirmCommitWithExpectedSeqNumRange(t, src, dst, state.Chains[3379446385462418246].OffRamp, &startblk, cciptypes.SeqNumRange{
cciptypes.SeqNum(seqNum),
cciptypes.SeqNum(seqNum),
}, false)

lokiLabels := map[string]string{}
for chainSelector, startBlock := range startBlocks {
wg.Add(1)
go func(chainSelector uint64, startBlock *uint64) {
defer wg.Done()
lggr.Infow("Starting to query for events on ", "chainSelector", chainSelector, "startblock", startBlock)
latesthdr, err := env.Chains[chainSelector].Client.HeaderByNumber(ctx, nil)
require.NoError(t, err)
lggr.Infow("Current block number", "chainSelector", chainSelector, "block", latesthdr.Number.Uint64())

filterOpts := &bind.FilterOpts{
Start: *startBlock,
Expand All @@ -112,6 +135,8 @@ func TestCCIPLoad_RPS(t *testing.T) {
fmt.Printf("Events on commitIterator %+v", commitIterator)

for commitIterator.Next() {
fmt.Printf("Events on commitIterator inside %+v", commitIterator)

event := commitIterator.Event
fmt.Printf("CommitReportAccepted event: %+v\n", event)

Expand All @@ -126,6 +151,8 @@ func TestCCIPLoad_RPS(t *testing.T) {

for i := root.MinSeqNr; i <= root.MaxSeqNr; i++ {
// todo: push loki calls to channel?
fmt.Printf("Pushed loki for seqNumber %d\n", i)

SendMetricsToLoki(lggr, loki, lokiLabels, &LokiMetric{
EventType: committed,
Timestamp: timestamp,
Expand All @@ -150,6 +177,8 @@ func TestCCIPLoad_RPS(t *testing.T) {
// todo: push loki calls to channel?
lokiLabels, err = setLokiLabels(execIterator.Event.SourceChainSelector, chainSelector)
require.NoError(t, err)

fmt.Printf("Pushed loki exec for seqNumber %d\n", execIterator.Event.SequenceNumber)
SendMetricsToLoki(lggr, loki, lokiLabels, &LokiMetric{
EventType: executed,
Timestamp: timestamp,
Expand Down
59 changes: 51 additions & 8 deletions integration-tests/load/ccip/destination_gun.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,33 @@ type ChainSelectorPair struct {
dst uint64
}

type SeqNumRange struct {
Start *atomic.Uint64
End *atomic.Uint64
}

type DestinationGun struct {
l logger.Logger
env deployment.Environment
seqNums map[ChainSelectorPair]*atomic.Uint64
seqNums map[ChainSelectorPair]SeqNumRange
roundNum *atomic.Int32
chainSelector uint64
receiver common.Address
loki *wasp.LokiClient
}

func NewDestinationGun(l logger.Logger, chainSelector uint64, env deployment.Environment, receiver common.Address, loki *wasp.LokiClient) *DestinationGun {
seqNums := make(map[ChainSelectorPair]*atomic.Uint64)
seqNums := make(map[ChainSelectorPair]SeqNumRange)
for _, cs := range env.AllChainSelectorsExcluding([]uint64{chainSelector}) {

// query for the actual sequence number
seqNums[ChainSelectorPair{
src: cs,
dst: chainSelector,
}] = atomic.NewUint64(1)
}] = SeqNumRange{
Start: atomic.NewUint64(0),
End: atomic.NewUint64(0),
}
}
return &DestinationGun{
l: l,
Expand Down Expand Up @@ -66,12 +75,16 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response {
return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true}
}

lokiLabels, err := setLokiLabels(src, m.chainSelector)
if err != nil {
m.l.Errorw("Failed setting loki labels", "error", err)
}

csPair := ChainSelectorPair{
src: src,
dst: m.chainSelector,
}
m.seqNums[csPair].Add(1)
m.l.Infow("Starting transmit with ", "RoundNum", requestedRound, "Destination ChainSelector", m.chainSelector, "Source ChainSelector", src, "SequenceNumber", m.seqNums[csPair].Load())
m.l.Infow("Starting transmit with ", "RoundNum", requestedRound, "Destination ChainSelector", m.chainSelector, "Source ChainSelector", src, "SequenceNumber", m.seqNums[csPair].End.Load())

r := state.Chains[src].Router

Expand Down Expand Up @@ -100,16 +113,38 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response {
return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true}
}

lokiLabels, err := setLokiLabels(src, m.chainSelector)
blockNum, err := m.env.Chains[src].Confirm(tx)
if err != nil {
m.l.Errorw("Failed setting loki labels", "error", err)
m.l.Errorw("could not confirm tx on source", "tx", tx, "err", err)
return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true}
}

it, err := state.Chains[src].OnRamp.FilterCCIPMessageSent(&bind.FilterOpts{
Start: blockNum,
End: &blockNum,
Context: context.Background(),
}, []uint64{m.chainSelector}, []uint64{})
if err != nil {
m.l.Errorw("could not find sent message event on src chain", "src", src, "dst", m.chainSelector, "err", err)
return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true}
}
if !it.Next() {
m.l.Errorw("Could not find event")
return &wasp.Response{Error: "Could not iterate", Group: waspGroup, Failed: true}
}

SendMetricsToLoki(m.l, m.loki, lokiLabels, &LokiMetric{
EventType: transmitted,
Timestamp: time.Now(),
SequenceNumber: m.seqNums[csPair].Load(),
SequenceNumber: m.seqNums[csPair].End.Load(),
})

if m.seqNums[csPair].End.Load() == 0 {
m.seqNums[csPair].Start.Store(it.Event.SequenceNumber)
}

m.seqNums[csPair].End.Store(it.Event.SequenceNumber)

return &wasp.Response{Failed: false, Group: waspGroup}
}

Expand Down Expand Up @@ -142,3 +177,11 @@ func (m *DestinationGun) GetMessage() (router.ClientEVM2AnyMessage, error) {
ExtraArgs: nil,
}, nil
}

func (m *DestinationGun) GetSequenceNumberRange(csPair ChainSelectorPair) (uint64, uint64, error) {
if r, ok := m.seqNums[csPair]; !ok {
return 0, 0, fmt.Errorf("no sequence number found for chain pair %v", csPair)
} else {
return r.Start.Load(), r.End.Load(), nil
}
}
2 changes: 1 addition & 1 deletion integration-tests/testconfig/ccip/ccip.toml
Original file line number Diff line number Diff line change
Expand Up @@ -243,5 +243,5 @@ ephemeral_addresses_number = 0


[Load.CCIP.Load]
LokiEndpoint = "http://localhost:3030/loki/api/v1/push"
LokiEndpoint = "http://loki.localhost/loki/api/v1/push"
NoOfNodes = 4

0 comments on commit 42ea568

Please sign in to comment.