diff --git a/commit/merkleroot/rmn/controller.go b/commit/merkleroot/rmn/controller.go index 525ca3c12..61d0f5b90 100644 --- a/commit/merkleroot/rmn/controller.go +++ b/commit/merkleroot/rmn/controller.go @@ -16,6 +16,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "golang.org/x/exp/maps" + rand2 "golang.org/x/exp/rand" "google.golang.org/protobuf/proto" chainsel "github.com/smartcontractkit/chain-selectors" @@ -364,7 +365,7 @@ func (c *controller) sendObservationRequests( } req := &rmnpb.Request{ - RequestId: newRequestID(), + RequestId: newRequestID(c.lggr), Request: &rmnpb.Request_ObservationRequest{ ObservationRequest: &rmnpb.ObservationRequest{ LaneDest: destChain, @@ -810,7 +811,7 @@ func (c *controller) sendReportSignatureRequest( } req := &rmnpb.Request{ - RequestId: newRequestID(), + RequestId: newRequestID(c.lggr), Request: &rmnpb.Request_ReportSignatureRequest{ ReportSignatureRequest: reportSigReq, }, @@ -908,7 +909,7 @@ func (c *controller) listenForRmnReportSignatures( continue } req := &rmnpb.Request{ - RequestId: newRequestID(), + RequestId: newRequestID(c.lggr), Request: &rmnpb.Request_ReportSignatureRequest{ ReportSignatureRequest: reportSigReq, }, @@ -1072,11 +1073,17 @@ func randomShuffle[T any](s []T) []T { return ret } -func newRequestID() uint64 { +// newRequestID generates a new unique request ID. +func newRequestID(lggr logger.Logger) uint64 { b := make([]byte, 8) _, err := crand.Read(b) if err != nil { - panic(err) + // fallback to time-based id in the very rare scenario that the random number generator fails + lggr.Warnw("failed to generate random request id, falling back to golang.org/x/exp/rand", + "err", err, + ) + rand2.Seed(uint64(time.Now().UnixNano())) + return rand2.Uint64() } randomUint64 := binary.LittleEndian.Uint64(b) return randomUint64 diff --git a/commit/merkleroot/rmn/controller_test.go b/commit/merkleroot/rmn/controller_test.go index 62f19c5f8..e1f3a25f9 100644 --- a/commit/merkleroot/rmn/controller_test.go +++ b/commit/merkleroot/rmn/controller_test.go @@ -729,6 +729,16 @@ func Test_controller_validateSignedObservationResponse(t *testing.T) { } } +func Test_newRequestID(t *testing.T) { + ids := map[uint64]struct{}{} + for i := 0; i < 1000; i++ { + id := newRequestID(logger.Test(t)) + _, ok := ids[id] + assert.False(t, ok) + ids[id] = struct{}{} + } +} + func getDeterministicPubKey(t *testing.T) *ed25519.PublicKey { // deterministically create a public key by seeding with a 32char string. publicKey, _, err := ed25519.GenerateKey( diff --git a/execute/exectypes/observation.go b/execute/exectypes/observation.go index 331e4d7dc..543338d2c 100644 --- a/execute/exectypes/observation.go +++ b/execute/exectypes/observation.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" + "github.com/smartcontractkit/chainlink-ccip/execute/internal" dt "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon/discovery/discoverytypes" cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" ) @@ -17,6 +18,8 @@ type MessageObservations map[cciptypes.ChainSelector]map[cciptypes.SeqNum]ccipty type MessageHashes map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Bytes32 +type EncodedMsgAndTokenDataSizes map[cciptypes.ChainSelector]map[cciptypes.SeqNum]int + // Flatten nested maps into a slice of messages. func (mo MessageObservations) Flatten() []cciptypes.Message { var results []cciptypes.Message @@ -43,6 +46,19 @@ func GetHashes(ctx context.Context, mo MessageObservations, hasher cciptypes.Mes return hashes, nil } +// GetEncodedMsgAndTokenDataSizes calculates the encoded sizes of messages and their token data counterpart. +func GetEncodedMsgAndTokenDataSizes(mo MessageObservations, tds TokenDataObservations) EncodedMsgAndTokenDataSizes { + sizes := make(EncodedMsgAndTokenDataSizes) + for chain, msgs := range mo { + sizes[chain] = make(map[cciptypes.SeqNum]int) + for seq, msg := range msgs { + td := tds[chain][seq] + sizes[chain][seq] = internal.EncodedSize(msg) + internal.EncodedSize(td) + } + } + return sizes +} + // NonceObservations contain the latest nonce for senders in the previously observed messages. // Nonces are organized by source chain selector and the string encoded sender address. The address // must be encoding according to the destination chain requirements with typeconv.AddressBytesToString. diff --git a/execute/internal/gas/gas_estimate_provider.go b/execute/internal/gas/gas_estimate_provider.go index 7ac6f0fba..226213e4d 100644 --- a/execute/internal/gas/gas_estimate_provider.go +++ b/execute/internal/gas/gas_estimate_provider.go @@ -1,6 +1,8 @@ package gas -import "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +import ( + "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +) // EstimateProvider is used to estimate the gas cost of a message or a merkle tree. // TODO: Move to pkg/types/ccipocr3 or remove. diff --git a/execute/internal/utils.go b/execute/internal/utils.go new file mode 100644 index 000000000..3acb6ce7c --- /dev/null +++ b/execute/internal/utils.go @@ -0,0 +1,20 @@ +package internal + +import "encoding/json" + +func EncodedSize[T any](obj T) int { + enc, err := json.Marshal(obj) + if err != nil { + return 0 + } + return len(enc) +} + +func RemoveIthElement[T any](slice []T, i int) []T { + if i < 0 || i >= len(slice) { + return slice // Return the original slice if index is out of bounds + } + newSlice := make([]T, 0, len(slice)-1) + newSlice = append(newSlice, slice[:i]...) + return append(newSlice, slice[i+1:]...) +} diff --git a/execute/internal/utils_test.go b/execute/internal/utils_test.go new file mode 100644 index 000000000..d7302d212 --- /dev/null +++ b/execute/internal/utils_test.go @@ -0,0 +1,31 @@ +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRemoveIthElement(t *testing.T) { + tests := []struct { + name string + slice []int + index int + expected []int + }{ + {"Remove middle element", []int{1, 2, 3, 4, 5}, 2, []int{1, 2, 4, 5}}, + {"Remove first element", []int{1, 2, 3, 4, 5}, 0, []int{2, 3, 4, 5}}, + {"Remove last element", []int{1, 2, 3, 4, 5}, 4, []int{1, 2, 3, 4}}, + {"Index out of bounds (negative)", []int{1, 2, 3, 4, 5}, -1, []int{1, 2, 3, 4, 5}}, + {"Index out of bounds (too large)", []int{1, 2, 3, 4, 5}, 5, []int{1, 2, 3, 4, 5}}, + {"Single element slice", []int{1}, 0, []int{}}, + {"Empty slice", []int{}, 0, []int{}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := RemoveIthElement(tt.slice, tt.index) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/execute/observation.go b/execute/observation.go index 771acb527..b03ba2889 100644 --- a/execute/observation.go +++ b/execute/observation.go @@ -236,6 +236,9 @@ func (p *Plugin) getMessagesObservation( if err1 != nil { return exectypes.Observation{}, fmt.Errorf("unable to process token data %w", err1) } + if validateTokenDataObservations(messageObs, tkData) != nil { + return exectypes.Observation{}, fmt.Errorf("invalid token data observations") + } costlyMessages, err := p.costlyMessageObserver.Observe(ctx, messageObs.Flatten(), messageTimestamps) if err != nil { @@ -252,9 +255,11 @@ func (p *Plugin) getMessagesObservation( observation.Hashes = hashes observation.CostlyMessages = costlyMessages observation.TokenData = tkData + //observation.MessageAndTokenDataEncodedSizes = exectypes.GetEncodedMsgAndTokenDataSizes(messageObs, tkData) // Make sure encoded observation fits within the maximum observation size. - observation, err = truncateObservation(observation, maxObservationLength) + //observation, err = truncateObservation(observation, maxObservationLength, p.emptyEncodedSizes) + observation, err = p.observationOptimizer.TruncateObservation(observation) if err != nil { return exectypes.Observation{}, fmt.Errorf("unable to truncate observation: %w", err) } diff --git a/execute/optimizers/type_optimizer.go b/execute/optimizers/type_optimizer.go new file mode 100644 index 000000000..7f4e7ae63 --- /dev/null +++ b/execute/optimizers/type_optimizer.go @@ -0,0 +1,220 @@ +package optimizers + +import ( + "fmt" + "sort" + + "github.com/smartcontractkit/chainlink-ccip/execute/exectypes" + "github.com/smartcontractkit/chainlink-ccip/execute/internal" + + "golang.org/x/exp/maps" + + cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +) + +type ObservationOptimizer struct { + maxEncodedSize int + emptyEncodedSizes EmptyEncodeSizes +} + +func NewObservationOptimizer(maxEncodedSize int) ObservationOptimizer { + return ObservationOptimizer{ + maxEncodedSize: maxEncodedSize, + emptyEncodedSizes: NewEmptyEncodeSizes(), + } +} + +type EmptyEncodeSizes struct { + MessageAndTokenData int + CommitData int + SeqNumMap int +} + +func NewEmptyEncodeSizes() EmptyEncodeSizes { + emptyMsg := cciptypes.Message{} + emptyTokenData := exectypes.MessageTokenData{} + emptyCommitData := exectypes.CommitData{} + emptySeqNrSize := internal.EncodedSize(make(map[cciptypes.SeqNum]cciptypes.Message)) + + return EmptyEncodeSizes{ + MessageAndTokenData: internal.EncodedSize(emptyMsg) + internal.EncodedSize(emptyTokenData), + CommitData: internal.EncodedSize(emptyCommitData), // 305 + SeqNumMap: emptySeqNrSize, // 2 + } +} + +// TruncateObservation truncates the observation to fit within the given op.maxEncodedSize after encoding. +// It removes data from the observation in the following order: +// For each chain, pick last report and start removing messages one at a time. +// If removed all messages from the report, remove the report. +// If removed last report in the chain, remove the chain. +// After removing full report from a chain, move to the next chain and repeat. This ensures that we don't +// exclude messages from one chain only. +// Keep repeating this process until the encoded observation fits within the op.maxEncodedSize +// Important Note: We can't delete messages completely from single reports as we need them to create merkle proofs. +// +//nolint:gocyclo +func (op ObservationOptimizer) TruncateObservation(observation exectypes.Observation) (exectypes.Observation, error) { + obs := observation + encodedObs, err := obs.Encode() + if err != nil { + return exectypes.Observation{}, err + } + encodedObsSize := len(encodedObs) + if encodedObsSize <= op.maxEncodedSize { + return obs, nil + } + + chains := maps.Keys(obs.CommitReports) + sort.Slice(chains, func(i, j int) bool { + return chains[i] < chains[j] + }) + + messageAndTokenDataEncodedSizes := exectypes.GetEncodedMsgAndTokenDataSizes(obs.Messages, obs.TokenData) + // While the encoded obs is too large, continue filtering data. + for encodedObsSize > op.maxEncodedSize { + // go through each chain and truncate observations for the final commit report. + for _, chain := range chains { + commits := obs.CommitReports[chain] + if len(commits) == 0 { + continue + } + lastCommit := &commits[len(commits)-1] + seqNum := lastCommit.SequenceNumberRange.Start() + // Remove messages one by one starting from the last message of the last commit report. + for seqNum <= lastCommit.SequenceNumberRange.End() { + if _, ok := obs.Messages[chain][seqNum]; !ok { + return exectypes.Observation{}, fmt.Errorf("missing message with seqNr %d from chain %d", seqNum, chain) + } + obs.Messages[chain][seqNum] = cciptypes.Message{} + obs.TokenData[chain][seqNum] = exectypes.NewMessageTokenData() + // Subtract the removed message and token size + encodedObsSize -= messageAndTokenDataEncodedSizes[chain][seqNum] + // Add empty message and token encoded size + encodedObsSize += op.emptyEncodedSizes.MessageAndTokenData + seqNum++ + // Once we assert the estimation is less than the max size we double-check with actual encoding size. + // Otherwise, we short circuit after checking the estimation only + if encodedObsSize <= op.maxEncodedSize && fitsWithinSize(obs, op.maxEncodedSize) { + return obs, nil + } + } + + var bytesTruncated int + // Reaching here means that all messages in the report are truncated, truncate the last commit + obs, bytesTruncated = op.truncateLastCommit(obs, chain) + + encodedObsSize -= bytesTruncated + + if len(obs.CommitReports[chain]) == 0 { + // If the last commit report was truncated, truncate the chain + obs = op.truncateChain(obs, chain) + } + + // Once we assert the estimation is less than the max size we double-check with actual encoding size. + // Otherwise, we short circuit after checking the estimation only + if encodedObsSize <= op.maxEncodedSize && fitsWithinSize(obs, op.maxEncodedSize) { + return obs, nil + } + } + // Truncated all chains. Return obs as is. (it has other data like contract discovery) + if len(obs.CommitReports) == 0 { + return obs, nil + } + // Encoding again after doing a full iteration on all chains and removing messages/commits. + // That is because using encoded sizes is not 100% accurate and there are some missing bytes in the calculation. + encodedObs, err = obs.Encode() + if err != nil { + return exectypes.Observation{}, err + } + encodedObsSize = len(encodedObs) + } + + return obs, nil +} + +func fitsWithinSize(obs exectypes.Observation, maxEncodedSize int) bool { + encodedObs, err := obs.Encode() + if err != nil { + return false + } + return len(encodedObs) <= maxEncodedSize +} + +// truncateLastCommit removes the last commit from the observation. +// returns observation and the number of bytes truncated. +func (op ObservationOptimizer) truncateLastCommit( + obs exectypes.Observation, + chain cciptypes.ChainSelector, +) (exectypes.Observation, int) { + observation := obs + bytesTruncated := 0 + commits := observation.CommitReports[chain] + if len(commits) == 0 { + return observation, bytesTruncated + } + lastCommit := commits[len(commits)-1] + // Remove the last commit from the list. + commits = commits[:len(commits)-1] + observation.CommitReports[chain] = commits + // Remove from the encoded size. + bytesTruncated = bytesTruncated + op.emptyEncodedSizes.CommitData + 4 // brackets, and commas + for seqNum, msg := range observation.Messages[chain] { + if lastCommit.SequenceNumberRange.Contains(seqNum) { + // Remove the message from the observation. + delete(observation.Messages[chain], seqNum) + // Remove the token data from the observation. + delete(observation.TokenData[chain], seqNum) + //delete(observation.Hashes[chain], seqNum) + // Remove the encoded size of the message and token data. + bytesTruncated += op.emptyEncodedSizes.MessageAndTokenData + bytesTruncated = bytesTruncated + 2*op.emptyEncodedSizes.SeqNumMap + bytesTruncated += 4 // for brackets and commas + // Remove costly messages + for i, costlyMessage := range observation.CostlyMessages { + if costlyMessage == msg.Header.MessageID { + observation.CostlyMessages = internal.RemoveIthElement(observation.CostlyMessages, i) + break + } + } + // Leaving Nonces untouched + } + } + + return observation, bytesTruncated +} + +// truncateChain removes all data related to the given chain from the observation. +// returns true if the chain was found and truncated, false otherwise. +func (op ObservationOptimizer) truncateChain( + obs exectypes.Observation, + chain cciptypes.ChainSelector, +) exectypes.Observation { + observation := obs + if _, ok := observation.CommitReports[chain]; !ok { + return observation + } + messageIDs := make(map[cciptypes.Bytes32]struct{}) + // To remove costly message IDs we need to iterate over all messages and find the ones that belong to the chain. + for _, seqNumMap := range observation.Messages { + for _, message := range seqNumMap { + messageIDs[message.Header.MessageID] = struct{}{} + } + } + + deleteCostlyMessages := func() { + for i, costlyMessage := range observation.CostlyMessages { + if _, ok := messageIDs[costlyMessage]; ok { + observation.CostlyMessages = append(observation.CostlyMessages[:i], observation.CostlyMessages[i+1:]...) + } + } + } + + delete(observation.CommitReports, chain) + delete(observation.Messages, chain) + delete(observation.TokenData, chain) + delete(observation.Nonces, chain) + deleteCostlyMessages() + + return observation +} diff --git a/execute/optimizers/type_optimizer_test.go b/execute/optimizers/type_optimizer_test.go new file mode 100644 index 000000000..32abb3030 --- /dev/null +++ b/execute/optimizers/type_optimizer_test.go @@ -0,0 +1,395 @@ +package optimizers + +import ( + "testing" + + "github.com/smartcontractkit/chainlink-ccip/execute/exectypes" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers/rand" + cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +) + +func Test_truncateLastCommit(t *testing.T) { + tests := []struct { + name string + chain cciptypes.ChainSelector + observation exectypes.Observation + expected exectypes.Observation + }{ + { + name: "no commits to truncate", + chain: 1, + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: {}, + }, + }, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: {}, + }, + }, + }, + { + name: "truncate last commit", + chain: 1, + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + 11: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x02}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + 11: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x02}}, + }, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := NewObservationOptimizer(10000) + truncated, _ := op.truncateLastCommit(tt.observation, tt.chain) + require.Equal(t, tt.expected, truncated) + }) + } +} + +func Test_truncateChain(t *testing.T) { + tests := []struct { + name string + chain cciptypes.ChainSelector + observation exectypes.Observation + expected exectypes.Observation + }{ + { + name: "truncate chain data", + chain: 1, + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{}, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{}, + CostlyMessages: []cciptypes.Bytes32{}, + }, + }, + { + name: "truncate non existent chain", + chain: 2, // non existent chain + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := NewObservationOptimizer(10000) // size not important here + truncated := op.truncateChain(tt.observation, tt.chain) + require.Equal(t, tt.expected, truncated) + }) + } +} + +func Test_truncateObservation(t *testing.T) { + tests := []struct { + name string + observation exectypes.Observation + maxSize int + expected exectypes.Observation + deletedMsgs map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{} + wantErr bool + }{ + { + name: "no truncation needed", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + }, + maxSize: 1000, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + }, + }, + { + name: "truncate last commit", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 2600, // this number is calculated by checking encoded sizes for the observation we expect + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + }, + // Not going to check Messages in the test + }, + deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ + 1: { + 3: struct{}{}, + 4: struct{}{}, + }, + }, + }, + { + name: "truncate entire chain", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 2}, + 2: {3, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 1789, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + }, + }, + { + name: "truncate one message from first chain", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 2}, + 2: {3, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 3159, // chain 1, message 1 will be truncated + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + }, + deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ + 1: { + 1: struct{}{}, + }, + }, + }, + { + name: "truncate all", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 10}, + 2: {11, 20}, + }, + ), + }, + maxSize: 50, // less than what can fit a single commit report for single chain + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.observation.TokenData = makeNoopTokenDataObservations(tt.observation.Messages) + tt.expected.TokenData = tt.observation.TokenData + op := NewObservationOptimizer(tt.maxSize) + obs, err := op.TruncateObservation(tt.observation) + if tt.wantErr { + require.Error(t, err) + return + } + require.Equal(t, tt.expected.CommitReports, obs.CommitReports) + + // Check only deleted messages were deleted + for chain, seqNums := range obs.Messages { + for seqNum := range seqNums { + if _, ok := tt.deletedMsgs[chain]; ok { + if _, ok2 := tt.deletedMsgs[chain][seqNum]; ok2 { + require.True(t, obs.Messages[chain][seqNum].IsEmpty()) + continue + } + } + require.False(t, obs.Messages[chain][seqNum].IsEmpty()) + } + } + }) + } +} + +func makeMessageObservation( + srcToSeqNumRange map[cciptypes.ChainSelector]cciptypes.SeqNumRange, + opts ...msgOption) exectypes.MessageObservations { + + obs := make(exectypes.MessageObservations) + for src, seqNumRng := range srcToSeqNumRange { + obs[src] = make(map[cciptypes.SeqNum]cciptypes.Message) + for i := seqNumRng.Start(); i <= seqNumRng.End(); i++ { + msg := cciptypes.Message{ + Header: cciptypes.RampMessageHeader{ + SourceChainSelector: src, + SequenceNumber: i, + MessageID: rand.RandomBytes32(), + }, + FeeValueJuels: cciptypes.NewBigIntFromInt64(100), + } + for _, opt := range opts { + opt(&msg) + } + obs[src][i] = msg + } + } + return obs +} + +func makeNoopTokenDataObservations(msgs exectypes.MessageObservations) exectypes.TokenDataObservations { + tokenData := make(exectypes.TokenDataObservations) + for src, seqNumToMsg := range msgs { + tokenData[src] = make(map[cciptypes.SeqNum]exectypes.MessageTokenData) + for seq := range seqNumToMsg { + tokenData[src][seq] = exectypes.NewMessageTokenData() + } + } + return tokenData + +} + +type msgOption func(*cciptypes.Message) + +func withData(data []byte) msgOption { + return func(m *cciptypes.Message) { + m.Data = data + } +} diff --git a/execute/outcome.go b/execute/outcome.go index 15abd1e22..9f9c74bf8 100644 --- a/execute/outcome.go +++ b/execute/outcome.go @@ -5,6 +5,8 @@ import ( "fmt" "sort" + "github.com/smartcontractkit/chainlink-ccip/execute/internal" + mapset "github.com/deckarep/golang-set/v2" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" @@ -158,7 +160,7 @@ func (p *Plugin) getMessagesOutcome( } if len(report.Messages) == 0 { // If there are no messages, remove the commit report. - commitReports = append(commitReports[:i], commitReports[i+1:]...) + commitReports = internal.RemoveIthElement(commitReports, i) } commitReports = append(commitReports, report) } diff --git a/execute/plugin.go b/execute/plugin.go index 1bcde3ba1..fd02e3fa9 100644 --- a/execute/plugin.go +++ b/execute/plugin.go @@ -7,6 +7,8 @@ import ( "fmt" "time" + "github.com/smartcontractkit/chainlink-ccip/execute/optimizers" + mapset "github.com/deckarep/golang-set/v2" "golang.org/x/exp/maps" @@ -63,6 +65,7 @@ type Plugin struct { estimateProvider gas.EstimateProvider lggr logger.Logger + observationOptimizer optimizers.ObservationOptimizer // state contractsInitialized bool } @@ -114,7 +117,8 @@ func NewPlugin( reportingCfg.OracleID, destChain, ), - observer: metricsReporter, + observer: metricsReporter, + observationOptimizer: optimizers.NewObservationOptimizer(maxObservationLength), } } diff --git a/execute/plugin_functions.go b/execute/plugin_functions.go index 133507558..924533911 100644 --- a/execute/plugin_functions.go +++ b/execute/plugin_functions.go @@ -6,8 +6,6 @@ import ( "sort" "time" - "golang.org/x/exp/maps" - mapset "github.com/deckarep/golang-set/v2" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -245,171 +243,6 @@ func filterOutExecutedMessages( return filtered, nil } -// truncateObservation truncates the observation to fit within the given maxSize after encoding. -// It removes data from the observation in the following order: -// For each chain, pick last report and start removing messages one at a time. -// If removed all messages from the report, remove the report. -// If removed last report in the chain, remove the chain. -// After removing full report from a chain, move to the next chain and repeat. This ensures that we don't -// exclude messages from one chain only. -// Keep repeating this process until the encoded observation fits within the maxSize -// Important Note: We can't delete messages completely from single reports as we need them to create merkle proofs. -// -//nolint:gocyclo -func truncateObservation( - obs exectypes.Observation, - maxSize int, -) (exectypes.Observation, error) { - // TODO: Use a hash to store encoding sizes for individual messages - // and use that to determine how many messages to delete. - observation := obs - encodedObs, err := observation.Encode() - if err != nil { - return exectypes.Observation{}, err - } - if len(encodedObs) <= maxSize { - return observation, nil - } - - chains := maps.Keys(observation.CommitReports) - sort.Slice(chains, func(i, j int) bool { - return chains[i] < chains[j] - }) - - // If the encoded observation is too large, start filtering data. - for len(encodedObs) > maxSize { - for _, chain := range chains { - commits := observation.CommitReports[chain] - if len(commits) == 0 { - continue - } - lastCommit := &commits[len(commits)-1] - seqNum := lastCommit.SequenceNumberRange.Start() - - for seqNum <= lastCommit.SequenceNumberRange.End() { - if _, ok := observation.Messages[chain][seqNum]; !ok { - return exectypes.Observation{}, fmt.Errorf("missing message with seqNr %d from chain %d", seqNum, chain) - } - observation.Messages[chain][seqNum] = cciptypes.Message{} - - if _, ok := observation.TokenData[chain][seqNum]; !ok { - return exectypes.Observation{}, fmt.Errorf( - "missing tokenData for message with seqNr %d from chain %d", seqNum, chain, - ) - } - observation.TokenData[chain][seqNum] = exectypes.NewMessageTokenData() - - seqNum++ - // Each report will be deleted completely by maximum looping 8 times as the max report messages is 256. - // TODO: Remove the 32 check once we implement the hash size calculation. - if seqNum%32 == 0 && observationFitsSize(observation, maxSize) { - return observation, nil - } - } - - // Reaching here means that all messages in the report are truncated, truncate the last commit - observation = truncateLastCommit(observation, chain) - - if len(observation.CommitReports[chain]) == 0 { - // If the last commit report was truncated, truncate the chain - observation = truncateChain(observation, chain) - } - - if observationFitsSize(observation, maxSize) { - return observation, nil - } - chains = maps.Keys(observation.CommitReports) - } - // Truncated all chains. Return observation as is. (it has other data like contract discovery) - if len(observation.CommitReports) == 0 { - return observation, nil - } - encodedObs, err = observation.Encode() - if err != nil { - return exectypes.Observation{}, nil - } - } - - return observation, nil -} - -func observationFitsSize(obs exectypes.Observation, maxSize int) bool { - encodedObs, err := obs.Encode() - if err != nil { - return false - } - return len(encodedObs) <= maxSize -} - -// truncateLastCommit removes the last commit from the observation. -// errors if there are no commits to truncate. -func truncateLastCommit( - obs exectypes.Observation, - chain cciptypes.ChainSelector, -) exectypes.Observation { - observation := obs - commits := observation.CommitReports[chain] - if len(commits) == 0 { - return observation - } - lastCommit := commits[len(commits)-1] - // Remove the last commit from the list. - commits = commits[:len(commits)-1] - observation.CommitReports[chain] = commits - for seqNum, msg := range observation.Messages[chain] { - if lastCommit.SequenceNumberRange.Contains(seqNum) { - // Remove the message from the observation. - delete(observation.Messages[chain], seqNum) - // Remove the token data from the observation. - delete(observation.TokenData[chain], seqNum) - // Remove costly messages - for i, costlyMessage := range observation.CostlyMessages { - if costlyMessage == msg.Header.MessageID { - observation.CostlyMessages = append(observation.CostlyMessages[:i], observation.CostlyMessages[i+1:]...) - } - } - // Leaving Nonces untouched - } - } - - return observation -} - -// truncateChain removes all data related to the given chain from the observation. -// returns true if the chain was found and truncated, false otherwise. -func truncateChain( - obs exectypes.Observation, - chain cciptypes.ChainSelector, -) exectypes.Observation { - observation := obs - if _, ok := observation.CommitReports[chain]; !ok { - return observation - } - messageIDs := make(map[cciptypes.Bytes32]struct{}) - // To remove costly message IDs we need to iterate over all messages and find the ones that belong to the chain. - for _, seqNumMap := range observation.Messages { - for _, message := range seqNumMap { - messageIDs[message.Header.MessageID] = struct{}{} - } - } - - deleteCostlyMessages := func() { - for i, costlyMessage := range observation.CostlyMessages { - if _, ok := messageIDs[costlyMessage]; ok { - observation.CostlyMessages = append(observation.CostlyMessages[:i], observation.CostlyMessages[i+1:]...) - } - } - } - - delete(observation.CommitReports, chain) - delete(observation.Messages, chain) - delete(observation.TokenData, chain) - delete(observation.Nonces, chain) - deleteCostlyMessages() - - return observation -} - func decodeAttributedObservations( aos []types.AttributedObservation, ) ([]plugincommon.AttributedObservation[exectypes.Observation], error) { diff --git a/execute/plugin_functions_test.go b/execute/plugin_functions_test.go index 64987354d..63754dcf9 100644 --- a/execute/plugin_functions_test.go +++ b/execute/plugin_functions_test.go @@ -1317,338 +1317,3 @@ func Test_getMessageTimestampMap(t *testing.T) { }) } } - -func Test_truncateLastCommit(t *testing.T) { - tests := []struct { - name string - chain cciptypes.ChainSelector - observation exectypes.Observation - expected exectypes.Observation - }{ - { - name: "no commits to truncate", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: {}, - }, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: {}, - }, - }, - }, - { - name: "truncate last commit", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - 11: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x02}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - 11: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x02}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - truncated := truncateLastCommit(tt.observation, tt.chain) - require.Equal(t, tt.expected, truncated) - }) - } -} - -func Test_truncateChain(t *testing.T) { - tests := []struct { - name string - chain cciptypes.ChainSelector - observation exectypes.Observation - expected exectypes.Observation - }{ - { - name: "truncate chain data", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{}, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{}, - CostlyMessages: []cciptypes.Bytes32{}, - }, - }, - { - name: "truncate non existent chain", - chain: 2, // non existent chain - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - truncated := truncateChain(tt.observation, tt.chain) - require.Equal(t, tt.expected, truncated) - }) - } -} - -func Test_truncateObservation(t *testing.T) { - tests := []struct { - name string - observation exectypes.Observation - maxSize int - expected exectypes.Observation - deletedMsgs map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{} - wantErr bool - }{ - { - name: "no truncation needed", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - }, - maxSize: 1000, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - }, - }, - { - name: "truncate last commit", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 4}, - }, - withData(make([]byte, 100)), - ), - }, - maxSize: 2600, // this number is calculated by checking encoded sizes for the observation we expect - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - }, - }, - // Not going to check Messages in the test - }, - deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ - 1: { - 3: struct{}{}, - 4: struct{}{}, - }, - }, - }, - { - name: "truncate entire chain", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - }, - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 2}, - 2: {3, 4}, - }, - withData(make([]byte, 100)), - ), - }, - maxSize: 1789, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - }, - }, - //{ - // name: "truncate one message from first chain", - // observation: exectypes.Observation{ - // CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - // 1: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - // }, - // 2: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - // }, - // }, - // Messages: makeMessageObservation( - // map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - // 1: {1, 2}, - // 2: {3, 4}, - // }, - // withData(make([]byte, 100)), - // ), - // }, - // maxSize: 3159, // chain 1, message 1 will be truncated - // expected: exectypes.Observation{ - // CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - // 1: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - // }, - // 2: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - // }, - // }, - // }, - // deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ - // 1: { - // 1: struct{}{}, - // }, - // }, - //}, - { - name: "truncate all", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 10}, - 2: {11, 20}, - }, - ), - }, - maxSize: 50, // less than what can fit a single commit report for single chain - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.observation.TokenData = makeNoopTokenDataObservations(tt.observation.Messages) - tt.expected.TokenData = tt.observation.TokenData - obs, err := truncateObservation(tt.observation, tt.maxSize) - if tt.wantErr { - require.Error(t, err) - return - } - require.Equal(t, tt.expected.CommitReports, obs.CommitReports) - - // Check only deleted messages were deleted - for chain, seqNums := range obs.Messages { - for seqNum := range seqNums { - if _, ok := tt.deletedMsgs[chain]; ok { - if _, ok2 := tt.deletedMsgs[chain][seqNum]; ok2 { - require.True(t, obs.Messages[chain][seqNum].IsEmpty()) - continue - } - } - require.False(t, obs.Messages[chain][seqNum].IsEmpty()) - } - } - }) - } -} diff --git a/execute/test_utils.go b/execute/test_utils.go index 518341ebf..2238a6f7e 100644 --- a/execute/test_utils.go +++ b/execute/test_utils.go @@ -461,43 +461,6 @@ func makeMsgWithMetadata( } } -func makeMessageObservation( - srcToSeqNumRange map[cciptypes.ChainSelector]cciptypes.SeqNumRange, - opts ...msgOption) exectypes.MessageObservations { - - obs := make(exectypes.MessageObservations) - for src, seqNumRng := range srcToSeqNumRange { - obs[src] = make(map[cciptypes.SeqNum]cciptypes.Message) - for i := seqNumRng.Start(); i <= seqNumRng.End(); i++ { - msg := cciptypes.Message{ - Header: cciptypes.RampMessageHeader{ - SourceChainSelector: src, - SequenceNumber: i, - MessageID: rand.RandomBytes32(), - }, - FeeValueJuels: cciptypes.NewBigIntFromInt64(100), - } - for _, opt := range opts { - opt(&msg) - } - obs[src][i] = msg - } - } - return obs -} - -func makeNoopTokenDataObservations(msgs exectypes.MessageObservations) exectypes.TokenDataObservations { - tokenData := make(exectypes.TokenDataObservations) - for src, seqNumToMsg := range msgs { - tokenData[src] = make(map[cciptypes.SeqNum]exectypes.MessageTokenData) - for seq := range seqNumToMsg { - tokenData[src][seq] = exectypes.NewMessageTokenData() - } - } - return tokenData - -} - type nodeSetup struct { node *Plugin reportCodec cciptypes.ExecutePluginCodec diff --git a/internal/plugincommon/discovery/processor.go b/internal/plugincommon/discovery/processor.go index 8e17d80d3..5dc23d992 100644 --- a/internal/plugincommon/discovery/processor.go +++ b/internal/plugincommon/discovery/processor.go @@ -218,6 +218,7 @@ func (cdp *ContractDiscoveryProcessor) Outcome( ctx context.Context, _ dt.Outcome, _ dt.Query, aos []plugincommon.AttributedObservation[dt.Observation], ) (dt.Outcome, error) { cdp.lggr.Infow("Processing contract discovery outcome", "observations", aos) + contracts := make(reader.ContractAddresses) agg := aggregateObservations(cdp.lggr, cdp.dest, aos) @@ -225,24 +226,28 @@ func (cdp *ContractDiscoveryProcessor) Outcome( donThresh := consensus.MakeConstantThreshold[cciptypes.ChainSelector](consensus.TwoFPlus1(cdp.fRoleDON)) fChain := consensus.GetConsensusMap(cdp.lggr, "fChain", agg.fChain, donThresh) fChainThresh := consensus.MakeMultiThreshold(fChain, consensus.TwoFPlus1) - destThresh := consensus.MakeConstantThreshold[cciptypes.ChainSelector](consensus.TwoFPlus1(fChain[cdp.dest])) - contracts := make(reader.ContractAddresses) - onrampConsensus := consensus.GetConsensusMap( - cdp.lggr, - "onramp", - agg.onrampAddrs, - destThresh, - ) - cdp.lggr.Infow("Determined consensus onramps", - "onrampConsensus", onrampConsensus, - "onrampAddrs", agg.onrampAddrs, - "fChainThresh", fChainThresh, - ) - if len(onrampConsensus) == 0 { - cdp.lggr.Debugw("No consensus on onramps, onrampConsensus map is empty") + if _, exists := fChain[cdp.dest]; !exists { + cdp.lggr.Warnw("missing fChain for dest (fChain[%d]), skipping onramp address lookup", cdp.dest) + } else { + destThresh := consensus.MakeConstantThreshold[cciptypes.ChainSelector](consensus.TwoFPlus1(fChain[cdp.dest])) + + onrampConsensus := consensus.GetConsensusMap( + cdp.lggr, + "onramp", + agg.onrampAddrs, + destThresh, + ) + cdp.lggr.Infow("Determined consensus onramps", + "onrampConsensus", onrampConsensus, + "onrampAddrs", agg.onrampAddrs, + "fChainThresh", fChainThresh, + ) + if len(onrampConsensus) == 0 { + cdp.lggr.Debugw("No consensus on onramps, onrampConsensus map is empty") + } + contracts[consts.ContractNameOnRamp] = onrampConsensus } - contracts[consts.ContractNameOnRamp] = onrampConsensus nonceManagerConsensus := consensus.GetConsensusMap( cdp.lggr, diff --git a/pkg/reader/ccip.go b/pkg/reader/ccip.go index d036f9b1e..e1a9fb2e8 100644 --- a/pkg/reader/ccip.go +++ b/pkg/reader/ccip.go @@ -1028,13 +1028,16 @@ func (r *ccipChainReader) getFeeQuoterTokenPriceUSD(ctx context.Context, tokenAd ) if err != nil { - return cciptypes.BigInt{}, fmt.Errorf("failed to get LINK token price, addr: %v, err: %w", tokenAddr, err) + return cciptypes.BigInt{}, fmt.Errorf("failed to get token price, addr: %v, err: %w", tokenAddr, err) } price := timestampedPrice.Value + if price == nil { + return cciptypes.BigInt{}, fmt.Errorf("token price is nil, addr: %v", tokenAddr) + } if price.Cmp(big.NewInt(0)) == 0 { - return cciptypes.BigInt{}, fmt.Errorf("LINK token price is 0, addr: %v", tokenAddr) + return cciptypes.BigInt{}, fmt.Errorf("token price is 0, addr: %v", tokenAddr) } return cciptypes.NewBigInt(price), nil diff --git a/pkg/reader/price_reader.go b/pkg/reader/price_reader.go index e9359d318..113204cec 100644 --- a/pkg/reader/price_reader.go +++ b/pkg/reader/price_reader.go @@ -177,23 +177,15 @@ func (pr *priceReader) GetFeedPricesUSD( } // Get price data - priceResult, err := contractResults[0].GetResult() + latestRoundData, err := pr.getPriceData(contractResults[0], boundContract) if err != nil { - return nil, fmt.Errorf("get price for contract %s: %w", boundContract.Address, err) - } - latestRoundData, ok := priceResult.(*LatestRoundData) - if !ok { - return nil, fmt.Errorf("invalid price data type for contract %s", boundContract.Address) + return nil, err } // Get decimals - decimalResult, err := contractResults[1].GetResult() + decimals, err := pr.getDecimals(contractResults[1], boundContract) if err != nil { - return nil, fmt.Errorf("get decimals for contract %s: %w", boundContract.Address, err) - } - decimals, ok := decimalResult.(*uint8) - if !ok { - return nil, fmt.Errorf("invalid decimals data type for contract %s", boundContract.Address) + return nil, err } // Normalize price for this contract @@ -215,6 +207,42 @@ func (pr *priceReader) GetFeedPricesUSD( return prices, nil } +func (pr *priceReader) getPriceData( + result commontypes.BatchReadResult, + boundContract commontypes.BoundContract, +) (*LatestRoundData, error) { + priceResult, err := result.GetResult() + if err != nil { + return nil, fmt.Errorf("get price for contract %s: %w", boundContract.Address, err) + } + if priceResult == nil { + return nil, fmt.Errorf("priceResult value is nil for contract %s", boundContract.Address) + } + latestRoundData, ok := priceResult.(*LatestRoundData) + if !ok { + return nil, fmt.Errorf("invalid price data type for contract %s", boundContract.Address) + } + return latestRoundData, nil +} + +func (pr *priceReader) getDecimals( + result commontypes.BatchReadResult, + boundContract commontypes.BoundContract, +) (*uint8, error) { + decimalResult, err := result.GetResult() + if err != nil { + return nil, fmt.Errorf("get decimals for contract %s: %w", boundContract.Address, err) + } + if decimalResult == nil { + return nil, fmt.Errorf("decimalResult value is nil for contract %s", boundContract.Address) + } + decimals, ok := decimalResult.(*uint8) + if !ok { + return nil, fmt.Errorf("invalid decimals data type for contract %s", boundContract.Address) + } + return decimals, nil +} + // prepareBatchRequest creates a batch request grouped by contract and returns the mapping of contracts to token indices func (pr *priceReader) prepareBatchRequest( tokens []ccipocr3.UnknownEncodedAddress,