Skip to content

Commit

Permalink
exec: Implement filter state and nonce filtering business logic. (#69)
Browse files Browse the repository at this point in the history
* Move report generation into Filter state.
* Add Nonces function to reader.CCIP interface.
* Observe sender nonces during Filter state.
* Consider sender nonces during message filtering.
* Feature flag for checking message nonces.
  • Loading branch information
winder authored Aug 22, 2024
1 parent e6e2e51 commit 4b62581
Show file tree
Hide file tree
Showing 20 changed files with 1,029 additions and 225 deletions.
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ packages:
github.com/smartcontractkit/chainlink-ccip/internal/reader:
interfaces:
HomeChain:
CCIP:
11 changes: 6 additions & 5 deletions commit/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
helpers "github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers"
"github.com/smartcontractkit/chainlink-ccip/internal/mocks"
"github.com/smartcontractkit/chainlink-ccip/internal/reader"
reader_mock "github.com/smartcontractkit/chainlink-ccip/mocks/internal_/reader"
"github.com/smartcontractkit/chainlink-ccip/pkg/consts"
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
"github.com/smartcontractkit/chainlink-ccip/plugintypes"
Expand Down Expand Up @@ -360,22 +361,22 @@ func setupNodesDoNotReportGasPrices(ctx context.Context, t *testing.T, lggr logg

type nodeSetup struct {
node *Plugin
ccipReader *mocks.CCIPReader
ccipReader *reader_mock.MockCCIP
priceReader *mocks.TokenPricesReader
reportCodec *mocks.CommitPluginJSONReportCodec
msgHasher *mocks.MessageHasher
}

func newNode(
_ context.Context,
_ *testing.T,
t *testing.T,
lggr logger.Logger,
id int,
cfg pluginconfig.CommitPluginConfig,
homeChain reader.HomeChain,
oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID,
) nodeSetup {
ccipReader := mocks.NewCCIPReader()
ccipReader := reader_mock.NewMockCCIP(t)
priceReader := mocks.NewTokenPricesReader()
reportCodec := mocks.NewCommitPluginJSONReportCodec()
msgHasher := mocks.NewMessageHasher()
Expand Down Expand Up @@ -447,7 +448,7 @@ func setupHomeChainPoller(lggr logger.Logger, chainConfigInfos []reader.ChainCon
// the gas prices are returned in the same order as the chains
func mockGasPrices(
ctx context.Context,
ccipReader *mocks.CCIPReader,
ccipReader *reader_mock.MockCCIP,
chains []cciptypes.ChainSelector,
gasPrices []int64) {
gasPricesBigInt := make([]cciptypes.BigInt, len(gasPrices))
Expand All @@ -461,7 +462,7 @@ func mockGasPrices(

func mockMsgsBetweenSeqNums(
ctx context.Context,
ccipReader *mocks.CCIPReader,
ccipReader *reader_mock.MockCCIP,
chain cciptypes.ChainSelector,
seqNum cciptypes.SeqNum,
msgs []cciptypes.Message) {
Expand Down
30 changes: 17 additions & 13 deletions commit/plugin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ import (
"time"

mapset "github.com/deckarep/golang-set/v2"

libocrtypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-ccip/internal/libs/slicelib"
"github.com/smartcontractkit/chainlink-ccip/internal/mocks"
reader_mock "github.com/smartcontractkit/chainlink-ccip/mocks/internal_/reader"
"github.com/smartcontractkit/chainlink-ccip/plugintypes"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

func Test_observeMaxSeqNumsPerChain(t *testing.T) {
Expand Down Expand Up @@ -71,7 +72,7 @@ func Test_observeMaxSeqNumsPerChain(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
mockReader := mocks.NewCCIPReader()
mockReader := reader_mock.NewMockCCIP(t)
knownSourceChains := slicelib.Filter(
tc.readChains,
func(ch cciptypes.ChainSelector) bool { return ch != tc.destChain },
Expand All @@ -86,13 +87,16 @@ func Test_observeMaxSeqNumsPerChain(t *testing.T) {
onChainSeqNums = append(onChainSeqNums, v)
}
}
mockReader.On("NextSeqNum", ctx, knownSourceChains).Return(onChainSeqNums, nil)
readableChains := mapset.NewSet(tc.readChains...)
if readableChains.Contains(tc.destChain) {
mockReader.On("NextSeqNum", ctx, knownSourceChains).Return(onChainSeqNums, nil)
}

seqNums, err := observeLatestCommittedSeqNums(
ctx,
lggr,
mockReader,
mapset.NewSet(tc.readChains...),
readableChains,
tc.destChain,
knownSourceChains,
)
Expand Down Expand Up @@ -212,7 +216,7 @@ func Test_observeNewMsgs(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
mockReader := mocks.NewCCIPReader()
mockReader := reader_mock.NewMockCCIP(t)
msgHasher := mocks.NewMessageHasher()
for i := range tc.expMsgs { // make sure the hashes are populated
h, err := msgHasher.Hash(ctx, tc.expMsgs[i])
Expand Down Expand Up @@ -270,7 +274,7 @@ func Benchmark_observeNewMsgs(b *testing.B) {
for i := 0; i < b.N; i++ {
ctx := context.Background()
lggr, _ := logger.New()
ccipReader := mocks.NewCCIPReader()
ccipReader := reader_mock.NewMockCCIP(b)
msgHasher := mocks.NewMessageHasher()

expNewMsgs := make([]cciptypes.Message, 0, newMsgsPerChain*numChains)
Expand Down Expand Up @@ -349,7 +353,7 @@ func Test_observeGasPrices(t *testing.T) {
ctx := context.Background()

t.Run("happy path", func(t *testing.T) {
mockReader := mocks.NewCCIPReader()
mockReader := reader_mock.NewMockCCIP(t)
chains := []cciptypes.ChainSelector{1, 2, 3}
mockGasPrices := []cciptypes.BigInt{
cciptypes.NewBigIntFromInt64(10),
Expand All @@ -367,7 +371,7 @@ func Test_observeGasPrices(t *testing.T) {
})

t.Run("gas reader internal issue", func(t *testing.T) {
mockReader := mocks.NewCCIPReader()
mockReader := reader_mock.NewMockCCIP(t)
chains := []cciptypes.ChainSelector{1, 2, 3}
mockGasPrices := []cciptypes.BigInt{
cciptypes.NewBigIntFromInt64(10),
Expand Down Expand Up @@ -1535,7 +1539,7 @@ func Test_validateMerkleRootsState(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
reader := mocks.NewCCIPReader()
reader := reader_mock.NewMockCCIP(t)
rep := cciptypes.CommitPluginReport{}
chains := make([]cciptypes.ChainSelector, 0, len(tc.reportSeqNums))
for _, snc := range tc.reportSeqNums {
Expand Down
23 changes: 16 additions & 7 deletions commitrmnocb/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
"testing"

mapset "github.com/deckarep/golang-set/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/internal/mocks"
reader_mock "github.com/smartcontractkit/chainlink-ccip/mocks/internal_/reader"
"github.com/smartcontractkit/chainlink-ccip/plugintypes"
)

Expand Down Expand Up @@ -223,10 +225,12 @@ func Test_ObserveOffRampNextSeqNums(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
var nodeID commontypes.OracleID = 1
reader := mocks.NewCCIPReader()
reader.On(
"NextSeqNum", ctx, tc.knownSourceChains,
).Return(tc.nextSeqNums, tc.nextSeqNumsError)
reader := reader_mock.NewMockCCIP(t)
if tc.supportsDestChain && tc.supportsDestChainError == nil && tc.knownSourceChainsError == nil {
reader.On(
"NextSeqNum", ctx, tc.knownSourceChains,
).Return(tc.nextSeqNums, tc.nextSeqNumsError)
}

chainSupport := mocks.NewChainSupport()
chainSupport.On(
Expand Down Expand Up @@ -426,8 +430,13 @@ func Test_ObserveMerkleRoots(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
var nodeID commontypes.OracleID = 1
reader := mocks.NewCCIPReader()
reader := reader_mock.NewMockCCIP(t)
for _, r := range tc.ranges {
// Skip unexpected calls.
if tc.supportedChainsFails || !tc.supportedChains.Contains(r.ChainSel) {
continue
}

var err error
if e, exists := tc.msgsBetweenSeqNumsErrors[r.ChainSel]; exists {
err = e
Expand Down
24 changes: 22 additions & 2 deletions execute/exectypes/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,57 @@ import (
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
)

// CommitObservations contain the commit plugin report data organized by the source chain selector.
type CommitObservations map[cciptypes.ChainSelector][]CommitData

// MessageObservations contain messages for commit plugin reports organized by source chain selector
// and sequence number.
type MessageObservations map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message

// NonceObservations contain the latest nonce for senders in the previously observed messages.
// Nonces are organized by source chain selector and the string encoded sender address. The address
// must be encoding according to the destination chain requirements with typeconv.AddressBytesToString.
type NonceObservations map[cciptypes.ChainSelector]map[string]uint64

// Observation is the observation of the ExecutePlugin.
// TODO: revisit observation types. The maps used here are more space efficient and easier to work
// with but require more transformations compared to the on-chain representations.
type Observation struct {
// CommitReports are determined during the first phase of execute.
// It contains the commit reports we would like to execute in the following round.
CommitReports CommitObservations `json:"commitReports"`

// Messages are determined during the second phase of execute.
// Ideally, it contains all the messages identified by the previous outcome's
// NextCommits. With the previous outcome, and these messsages, we can build the
// execute report.
Messages MessageObservations `json:"messages"`
// TODO: some of the nodes configuration may need to be included here.

// Nonces are determined during the third phase of execute.
// It contains the nonces of senders who are being considered for the final report.
Nonces NonceObservations `json:"nonces"`
}

// NewObservation constructs a Observation object.
func NewObservation(
commitReports CommitObservations, messages MessageObservations) Observation {
commitReports CommitObservations, messages MessageObservations, nonces NonceObservations) Observation {
return Observation{
CommitReports: commitReports,
Messages: messages,
Nonces: nonces,
}
}

// Encode the Observation into a byte slice.
func (obs Observation) Encode() ([]byte, error) {
return json.Marshal(obs)
}

// DecodeObservation from a byte slice into an Observation.
func DecodeObservation(b []byte) (Observation, error) {
if len(b) == 0 {
return Observation{}, nil
}
obs := Observation{}
err := json.Unmarshal(b, &obs)
return obs, err
Expand Down
3 changes: 1 addition & 2 deletions execute/exectypes/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ func (p PluginState) Next() PluginState {
return GetMessages

case GetMessages:
// TODO: go to Filter after GetMessages
return GetCommitReports
return Filter

case Unknown:
fallthrough
Expand Down
7 changes: 6 additions & 1 deletion execute/exectypes/outcome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ func TestPluginState_Next(t *testing.T) {
want: GetMessages,
},
{
name: "Phase 2 to 1",
name: "Phase 2 to 3",
p: GetMessages,
want: Filter,
},
{
name: "Phase 3 to 1",
p: Filter,
want: GetCommitReports,
},
{
Expand Down
Loading

0 comments on commit 4b62581

Please sign in to comment.