From 5fa504b51deecfff82c35a27b82c1c8932d843db Mon Sep 17 00:00:00 2001 From: "Abdelrahman Soliman (Boda)" <2677789+asoliman92@users.noreply.github.com> Date: Fri, 20 Dec 2024 19:13:39 +0200 Subject: [PATCH 1/2] Use encoded sizes hash [CCIP-4618] (#376) Truncate observation algorithm to fit max size was encoding whole observation after removing each single message. This operation is CPU heavy and made it very slow even to finish tests in time. Fix: Use a hash to hold encoded messages sizes and while removing messages we do simple arithmetic operations to deduce the current size. Still there's encoding happening but on a higher level and less frequently to reduce any snowball effects from small differences because the arithmetic operations are not 100% accurate. Co-authored-by: Will Winder --- execute/exectypes/observation.go | 16 + execute/internal/gas/gas_estimate_provider.go | 4 +- execute/internal/utils.go | 20 + execute/internal/utils_test.go | 31 ++ execute/observation.go | 7 +- execute/optimizers/type_optimizer.go | 220 ++++++++++ execute/optimizers/type_optimizer_test.go | 395 ++++++++++++++++++ execute/outcome.go | 4 +- execute/plugin.go | 6 +- execute/plugin_functions.go | 167 -------- execute/plugin_functions_test.go | 335 --------------- execute/test_utils.go | 37 -- 12 files changed, 699 insertions(+), 543 deletions(-) create mode 100644 execute/internal/utils.go create mode 100644 execute/internal/utils_test.go create mode 100644 execute/optimizers/type_optimizer.go create mode 100644 execute/optimizers/type_optimizer_test.go diff --git a/execute/exectypes/observation.go b/execute/exectypes/observation.go index 331e4d7dc..543338d2c 100644 --- a/execute/exectypes/observation.go +++ b/execute/exectypes/observation.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" + "github.com/smartcontractkit/chainlink-ccip/execute/internal" dt "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon/discovery/discoverytypes" cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" ) @@ -17,6 +18,8 @@ type MessageObservations map[cciptypes.ChainSelector]map[cciptypes.SeqNum]ccipty type MessageHashes map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Bytes32 +type EncodedMsgAndTokenDataSizes map[cciptypes.ChainSelector]map[cciptypes.SeqNum]int + // Flatten nested maps into a slice of messages. func (mo MessageObservations) Flatten() []cciptypes.Message { var results []cciptypes.Message @@ -43,6 +46,19 @@ func GetHashes(ctx context.Context, mo MessageObservations, hasher cciptypes.Mes return hashes, nil } +// GetEncodedMsgAndTokenDataSizes calculates the encoded sizes of messages and their token data counterpart. +func GetEncodedMsgAndTokenDataSizes(mo MessageObservations, tds TokenDataObservations) EncodedMsgAndTokenDataSizes { + sizes := make(EncodedMsgAndTokenDataSizes) + for chain, msgs := range mo { + sizes[chain] = make(map[cciptypes.SeqNum]int) + for seq, msg := range msgs { + td := tds[chain][seq] + sizes[chain][seq] = internal.EncodedSize(msg) + internal.EncodedSize(td) + } + } + return sizes +} + // NonceObservations contain the latest nonce for senders in the previously observed messages. // Nonces are organized by source chain selector and the string encoded sender address. The address // must be encoding according to the destination chain requirements with typeconv.AddressBytesToString. diff --git a/execute/internal/gas/gas_estimate_provider.go b/execute/internal/gas/gas_estimate_provider.go index 7ac6f0fba..226213e4d 100644 --- a/execute/internal/gas/gas_estimate_provider.go +++ b/execute/internal/gas/gas_estimate_provider.go @@ -1,6 +1,8 @@ package gas -import "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +import ( + "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +) // EstimateProvider is used to estimate the gas cost of a message or a merkle tree. // TODO: Move to pkg/types/ccipocr3 or remove. diff --git a/execute/internal/utils.go b/execute/internal/utils.go new file mode 100644 index 000000000..3acb6ce7c --- /dev/null +++ b/execute/internal/utils.go @@ -0,0 +1,20 @@ +package internal + +import "encoding/json" + +func EncodedSize[T any](obj T) int { + enc, err := json.Marshal(obj) + if err != nil { + return 0 + } + return len(enc) +} + +func RemoveIthElement[T any](slice []T, i int) []T { + if i < 0 || i >= len(slice) { + return slice // Return the original slice if index is out of bounds + } + newSlice := make([]T, 0, len(slice)-1) + newSlice = append(newSlice, slice[:i]...) + return append(newSlice, slice[i+1:]...) +} diff --git a/execute/internal/utils_test.go b/execute/internal/utils_test.go new file mode 100644 index 000000000..d7302d212 --- /dev/null +++ b/execute/internal/utils_test.go @@ -0,0 +1,31 @@ +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRemoveIthElement(t *testing.T) { + tests := []struct { + name string + slice []int + index int + expected []int + }{ + {"Remove middle element", []int{1, 2, 3, 4, 5}, 2, []int{1, 2, 4, 5}}, + {"Remove first element", []int{1, 2, 3, 4, 5}, 0, []int{2, 3, 4, 5}}, + {"Remove last element", []int{1, 2, 3, 4, 5}, 4, []int{1, 2, 3, 4}}, + {"Index out of bounds (negative)", []int{1, 2, 3, 4, 5}, -1, []int{1, 2, 3, 4, 5}}, + {"Index out of bounds (too large)", []int{1, 2, 3, 4, 5}, 5, []int{1, 2, 3, 4, 5}}, + {"Single element slice", []int{1}, 0, []int{}}, + {"Empty slice", []int{}, 0, []int{}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := RemoveIthElement(tt.slice, tt.index) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/execute/observation.go b/execute/observation.go index 771acb527..b03ba2889 100644 --- a/execute/observation.go +++ b/execute/observation.go @@ -236,6 +236,9 @@ func (p *Plugin) getMessagesObservation( if err1 != nil { return exectypes.Observation{}, fmt.Errorf("unable to process token data %w", err1) } + if validateTokenDataObservations(messageObs, tkData) != nil { + return exectypes.Observation{}, fmt.Errorf("invalid token data observations") + } costlyMessages, err := p.costlyMessageObserver.Observe(ctx, messageObs.Flatten(), messageTimestamps) if err != nil { @@ -252,9 +255,11 @@ func (p *Plugin) getMessagesObservation( observation.Hashes = hashes observation.CostlyMessages = costlyMessages observation.TokenData = tkData + //observation.MessageAndTokenDataEncodedSizes = exectypes.GetEncodedMsgAndTokenDataSizes(messageObs, tkData) // Make sure encoded observation fits within the maximum observation size. - observation, err = truncateObservation(observation, maxObservationLength) + //observation, err = truncateObservation(observation, maxObservationLength, p.emptyEncodedSizes) + observation, err = p.observationOptimizer.TruncateObservation(observation) if err != nil { return exectypes.Observation{}, fmt.Errorf("unable to truncate observation: %w", err) } diff --git a/execute/optimizers/type_optimizer.go b/execute/optimizers/type_optimizer.go new file mode 100644 index 000000000..7f4e7ae63 --- /dev/null +++ b/execute/optimizers/type_optimizer.go @@ -0,0 +1,220 @@ +package optimizers + +import ( + "fmt" + "sort" + + "github.com/smartcontractkit/chainlink-ccip/execute/exectypes" + "github.com/smartcontractkit/chainlink-ccip/execute/internal" + + "golang.org/x/exp/maps" + + cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +) + +type ObservationOptimizer struct { + maxEncodedSize int + emptyEncodedSizes EmptyEncodeSizes +} + +func NewObservationOptimizer(maxEncodedSize int) ObservationOptimizer { + return ObservationOptimizer{ + maxEncodedSize: maxEncodedSize, + emptyEncodedSizes: NewEmptyEncodeSizes(), + } +} + +type EmptyEncodeSizes struct { + MessageAndTokenData int + CommitData int + SeqNumMap int +} + +func NewEmptyEncodeSizes() EmptyEncodeSizes { + emptyMsg := cciptypes.Message{} + emptyTokenData := exectypes.MessageTokenData{} + emptyCommitData := exectypes.CommitData{} + emptySeqNrSize := internal.EncodedSize(make(map[cciptypes.SeqNum]cciptypes.Message)) + + return EmptyEncodeSizes{ + MessageAndTokenData: internal.EncodedSize(emptyMsg) + internal.EncodedSize(emptyTokenData), + CommitData: internal.EncodedSize(emptyCommitData), // 305 + SeqNumMap: emptySeqNrSize, // 2 + } +} + +// TruncateObservation truncates the observation to fit within the given op.maxEncodedSize after encoding. +// It removes data from the observation in the following order: +// For each chain, pick last report and start removing messages one at a time. +// If removed all messages from the report, remove the report. +// If removed last report in the chain, remove the chain. +// After removing full report from a chain, move to the next chain and repeat. This ensures that we don't +// exclude messages from one chain only. +// Keep repeating this process until the encoded observation fits within the op.maxEncodedSize +// Important Note: We can't delete messages completely from single reports as we need them to create merkle proofs. +// +//nolint:gocyclo +func (op ObservationOptimizer) TruncateObservation(observation exectypes.Observation) (exectypes.Observation, error) { + obs := observation + encodedObs, err := obs.Encode() + if err != nil { + return exectypes.Observation{}, err + } + encodedObsSize := len(encodedObs) + if encodedObsSize <= op.maxEncodedSize { + return obs, nil + } + + chains := maps.Keys(obs.CommitReports) + sort.Slice(chains, func(i, j int) bool { + return chains[i] < chains[j] + }) + + messageAndTokenDataEncodedSizes := exectypes.GetEncodedMsgAndTokenDataSizes(obs.Messages, obs.TokenData) + // While the encoded obs is too large, continue filtering data. + for encodedObsSize > op.maxEncodedSize { + // go through each chain and truncate observations for the final commit report. + for _, chain := range chains { + commits := obs.CommitReports[chain] + if len(commits) == 0 { + continue + } + lastCommit := &commits[len(commits)-1] + seqNum := lastCommit.SequenceNumberRange.Start() + // Remove messages one by one starting from the last message of the last commit report. + for seqNum <= lastCommit.SequenceNumberRange.End() { + if _, ok := obs.Messages[chain][seqNum]; !ok { + return exectypes.Observation{}, fmt.Errorf("missing message with seqNr %d from chain %d", seqNum, chain) + } + obs.Messages[chain][seqNum] = cciptypes.Message{} + obs.TokenData[chain][seqNum] = exectypes.NewMessageTokenData() + // Subtract the removed message and token size + encodedObsSize -= messageAndTokenDataEncodedSizes[chain][seqNum] + // Add empty message and token encoded size + encodedObsSize += op.emptyEncodedSizes.MessageAndTokenData + seqNum++ + // Once we assert the estimation is less than the max size we double-check with actual encoding size. + // Otherwise, we short circuit after checking the estimation only + if encodedObsSize <= op.maxEncodedSize && fitsWithinSize(obs, op.maxEncodedSize) { + return obs, nil + } + } + + var bytesTruncated int + // Reaching here means that all messages in the report are truncated, truncate the last commit + obs, bytesTruncated = op.truncateLastCommit(obs, chain) + + encodedObsSize -= bytesTruncated + + if len(obs.CommitReports[chain]) == 0 { + // If the last commit report was truncated, truncate the chain + obs = op.truncateChain(obs, chain) + } + + // Once we assert the estimation is less than the max size we double-check with actual encoding size. + // Otherwise, we short circuit after checking the estimation only + if encodedObsSize <= op.maxEncodedSize && fitsWithinSize(obs, op.maxEncodedSize) { + return obs, nil + } + } + // Truncated all chains. Return obs as is. (it has other data like contract discovery) + if len(obs.CommitReports) == 0 { + return obs, nil + } + // Encoding again after doing a full iteration on all chains and removing messages/commits. + // That is because using encoded sizes is not 100% accurate and there are some missing bytes in the calculation. + encodedObs, err = obs.Encode() + if err != nil { + return exectypes.Observation{}, err + } + encodedObsSize = len(encodedObs) + } + + return obs, nil +} + +func fitsWithinSize(obs exectypes.Observation, maxEncodedSize int) bool { + encodedObs, err := obs.Encode() + if err != nil { + return false + } + return len(encodedObs) <= maxEncodedSize +} + +// truncateLastCommit removes the last commit from the observation. +// returns observation and the number of bytes truncated. +func (op ObservationOptimizer) truncateLastCommit( + obs exectypes.Observation, + chain cciptypes.ChainSelector, +) (exectypes.Observation, int) { + observation := obs + bytesTruncated := 0 + commits := observation.CommitReports[chain] + if len(commits) == 0 { + return observation, bytesTruncated + } + lastCommit := commits[len(commits)-1] + // Remove the last commit from the list. + commits = commits[:len(commits)-1] + observation.CommitReports[chain] = commits + // Remove from the encoded size. + bytesTruncated = bytesTruncated + op.emptyEncodedSizes.CommitData + 4 // brackets, and commas + for seqNum, msg := range observation.Messages[chain] { + if lastCommit.SequenceNumberRange.Contains(seqNum) { + // Remove the message from the observation. + delete(observation.Messages[chain], seqNum) + // Remove the token data from the observation. + delete(observation.TokenData[chain], seqNum) + //delete(observation.Hashes[chain], seqNum) + // Remove the encoded size of the message and token data. + bytesTruncated += op.emptyEncodedSizes.MessageAndTokenData + bytesTruncated = bytesTruncated + 2*op.emptyEncodedSizes.SeqNumMap + bytesTruncated += 4 // for brackets and commas + // Remove costly messages + for i, costlyMessage := range observation.CostlyMessages { + if costlyMessage == msg.Header.MessageID { + observation.CostlyMessages = internal.RemoveIthElement(observation.CostlyMessages, i) + break + } + } + // Leaving Nonces untouched + } + } + + return observation, bytesTruncated +} + +// truncateChain removes all data related to the given chain from the observation. +// returns true if the chain was found and truncated, false otherwise. +func (op ObservationOptimizer) truncateChain( + obs exectypes.Observation, + chain cciptypes.ChainSelector, +) exectypes.Observation { + observation := obs + if _, ok := observation.CommitReports[chain]; !ok { + return observation + } + messageIDs := make(map[cciptypes.Bytes32]struct{}) + // To remove costly message IDs we need to iterate over all messages and find the ones that belong to the chain. + for _, seqNumMap := range observation.Messages { + for _, message := range seqNumMap { + messageIDs[message.Header.MessageID] = struct{}{} + } + } + + deleteCostlyMessages := func() { + for i, costlyMessage := range observation.CostlyMessages { + if _, ok := messageIDs[costlyMessage]; ok { + observation.CostlyMessages = append(observation.CostlyMessages[:i], observation.CostlyMessages[i+1:]...) + } + } + } + + delete(observation.CommitReports, chain) + delete(observation.Messages, chain) + delete(observation.TokenData, chain) + delete(observation.Nonces, chain) + deleteCostlyMessages() + + return observation +} diff --git a/execute/optimizers/type_optimizer_test.go b/execute/optimizers/type_optimizer_test.go new file mode 100644 index 000000000..32abb3030 --- /dev/null +++ b/execute/optimizers/type_optimizer_test.go @@ -0,0 +1,395 @@ +package optimizers + +import ( + "testing" + + "github.com/smartcontractkit/chainlink-ccip/execute/exectypes" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers/rand" + cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +) + +func Test_truncateLastCommit(t *testing.T) { + tests := []struct { + name string + chain cciptypes.ChainSelector + observation exectypes.Observation + expected exectypes.Observation + }{ + { + name: "no commits to truncate", + chain: 1, + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: {}, + }, + }, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: {}, + }, + }, + }, + { + name: "truncate last commit", + chain: 1, + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + 11: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x02}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + 11: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x02}}, + }, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := NewObservationOptimizer(10000) + truncated, _ := op.truncateLastCommit(tt.observation, tt.chain) + require.Equal(t, tt.expected, truncated) + }) + } +} + +func Test_truncateChain(t *testing.T) { + tests := []struct { + name string + chain cciptypes.ChainSelector + observation exectypes.Observation + expected exectypes.Observation + }{ + { + name: "truncate chain data", + chain: 1, + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{}, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{}, + CostlyMessages: []cciptypes.Bytes32{}, + }, + }, + { + name: "truncate non existent chain", + chain: 2, // non existent chain + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ + 1: { + 1: exectypes.NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := NewObservationOptimizer(10000) // size not important here + truncated := op.truncateChain(tt.observation, tt.chain) + require.Equal(t, tt.expected, truncated) + }) + } +} + +func Test_truncateObservation(t *testing.T) { + tests := []struct { + name string + observation exectypes.Observation + maxSize int + expected exectypes.Observation + deletedMsgs map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{} + wantErr bool + }{ + { + name: "no truncation needed", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + }, + maxSize: 1000, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + }, + }, + { + name: "truncate last commit", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 2600, // this number is calculated by checking encoded sizes for the observation we expect + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + }, + // Not going to check Messages in the test + }, + deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ + 1: { + 3: struct{}{}, + 4: struct{}{}, + }, + }, + }, + { + name: "truncate entire chain", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 2}, + 2: {3, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 1789, + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + }, + }, + { + name: "truncate one message from first chain", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 2}, + 2: {3, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 3159, // chain 1, message 1 will be truncated + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + }, + deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ + 1: { + 1: struct{}{}, + }, + }, + }, + { + name: "truncate all", + observation: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 10}, + 2: {11, 20}, + }, + ), + }, + maxSize: 50, // less than what can fit a single commit report for single chain + expected: exectypes.Observation{ + CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.observation.TokenData = makeNoopTokenDataObservations(tt.observation.Messages) + tt.expected.TokenData = tt.observation.TokenData + op := NewObservationOptimizer(tt.maxSize) + obs, err := op.TruncateObservation(tt.observation) + if tt.wantErr { + require.Error(t, err) + return + } + require.Equal(t, tt.expected.CommitReports, obs.CommitReports) + + // Check only deleted messages were deleted + for chain, seqNums := range obs.Messages { + for seqNum := range seqNums { + if _, ok := tt.deletedMsgs[chain]; ok { + if _, ok2 := tt.deletedMsgs[chain][seqNum]; ok2 { + require.True(t, obs.Messages[chain][seqNum].IsEmpty()) + continue + } + } + require.False(t, obs.Messages[chain][seqNum].IsEmpty()) + } + } + }) + } +} + +func makeMessageObservation( + srcToSeqNumRange map[cciptypes.ChainSelector]cciptypes.SeqNumRange, + opts ...msgOption) exectypes.MessageObservations { + + obs := make(exectypes.MessageObservations) + for src, seqNumRng := range srcToSeqNumRange { + obs[src] = make(map[cciptypes.SeqNum]cciptypes.Message) + for i := seqNumRng.Start(); i <= seqNumRng.End(); i++ { + msg := cciptypes.Message{ + Header: cciptypes.RampMessageHeader{ + SourceChainSelector: src, + SequenceNumber: i, + MessageID: rand.RandomBytes32(), + }, + FeeValueJuels: cciptypes.NewBigIntFromInt64(100), + } + for _, opt := range opts { + opt(&msg) + } + obs[src][i] = msg + } + } + return obs +} + +func makeNoopTokenDataObservations(msgs exectypes.MessageObservations) exectypes.TokenDataObservations { + tokenData := make(exectypes.TokenDataObservations) + for src, seqNumToMsg := range msgs { + tokenData[src] = make(map[cciptypes.SeqNum]exectypes.MessageTokenData) + for seq := range seqNumToMsg { + tokenData[src][seq] = exectypes.NewMessageTokenData() + } + } + return tokenData + +} + +type msgOption func(*cciptypes.Message) + +func withData(data []byte) msgOption { + return func(m *cciptypes.Message) { + m.Data = data + } +} diff --git a/execute/outcome.go b/execute/outcome.go index 15abd1e22..9f9c74bf8 100644 --- a/execute/outcome.go +++ b/execute/outcome.go @@ -5,6 +5,8 @@ import ( "fmt" "sort" + "github.com/smartcontractkit/chainlink-ccip/execute/internal" + mapset "github.com/deckarep/golang-set/v2" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" @@ -158,7 +160,7 @@ func (p *Plugin) getMessagesOutcome( } if len(report.Messages) == 0 { // If there are no messages, remove the commit report. - commitReports = append(commitReports[:i], commitReports[i+1:]...) + commitReports = internal.RemoveIthElement(commitReports, i) } commitReports = append(commitReports, report) } diff --git a/execute/plugin.go b/execute/plugin.go index 1bcde3ba1..fd02e3fa9 100644 --- a/execute/plugin.go +++ b/execute/plugin.go @@ -7,6 +7,8 @@ import ( "fmt" "time" + "github.com/smartcontractkit/chainlink-ccip/execute/optimizers" + mapset "github.com/deckarep/golang-set/v2" "golang.org/x/exp/maps" @@ -63,6 +65,7 @@ type Plugin struct { estimateProvider gas.EstimateProvider lggr logger.Logger + observationOptimizer optimizers.ObservationOptimizer // state contractsInitialized bool } @@ -114,7 +117,8 @@ func NewPlugin( reportingCfg.OracleID, destChain, ), - observer: metricsReporter, + observer: metricsReporter, + observationOptimizer: optimizers.NewObservationOptimizer(maxObservationLength), } } diff --git a/execute/plugin_functions.go b/execute/plugin_functions.go index 133507558..924533911 100644 --- a/execute/plugin_functions.go +++ b/execute/plugin_functions.go @@ -6,8 +6,6 @@ import ( "sort" "time" - "golang.org/x/exp/maps" - mapset "github.com/deckarep/golang-set/v2" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -245,171 +243,6 @@ func filterOutExecutedMessages( return filtered, nil } -// truncateObservation truncates the observation to fit within the given maxSize after encoding. -// It removes data from the observation in the following order: -// For each chain, pick last report and start removing messages one at a time. -// If removed all messages from the report, remove the report. -// If removed last report in the chain, remove the chain. -// After removing full report from a chain, move to the next chain and repeat. This ensures that we don't -// exclude messages from one chain only. -// Keep repeating this process until the encoded observation fits within the maxSize -// Important Note: We can't delete messages completely from single reports as we need them to create merkle proofs. -// -//nolint:gocyclo -func truncateObservation( - obs exectypes.Observation, - maxSize int, -) (exectypes.Observation, error) { - // TODO: Use a hash to store encoding sizes for individual messages - // and use that to determine how many messages to delete. - observation := obs - encodedObs, err := observation.Encode() - if err != nil { - return exectypes.Observation{}, err - } - if len(encodedObs) <= maxSize { - return observation, nil - } - - chains := maps.Keys(observation.CommitReports) - sort.Slice(chains, func(i, j int) bool { - return chains[i] < chains[j] - }) - - // If the encoded observation is too large, start filtering data. - for len(encodedObs) > maxSize { - for _, chain := range chains { - commits := observation.CommitReports[chain] - if len(commits) == 0 { - continue - } - lastCommit := &commits[len(commits)-1] - seqNum := lastCommit.SequenceNumberRange.Start() - - for seqNum <= lastCommit.SequenceNumberRange.End() { - if _, ok := observation.Messages[chain][seqNum]; !ok { - return exectypes.Observation{}, fmt.Errorf("missing message with seqNr %d from chain %d", seqNum, chain) - } - observation.Messages[chain][seqNum] = cciptypes.Message{} - - if _, ok := observation.TokenData[chain][seqNum]; !ok { - return exectypes.Observation{}, fmt.Errorf( - "missing tokenData for message with seqNr %d from chain %d", seqNum, chain, - ) - } - observation.TokenData[chain][seqNum] = exectypes.NewMessageTokenData() - - seqNum++ - // Each report will be deleted completely by maximum looping 8 times as the max report messages is 256. - // TODO: Remove the 32 check once we implement the hash size calculation. - if seqNum%32 == 0 && observationFitsSize(observation, maxSize) { - return observation, nil - } - } - - // Reaching here means that all messages in the report are truncated, truncate the last commit - observation = truncateLastCommit(observation, chain) - - if len(observation.CommitReports[chain]) == 0 { - // If the last commit report was truncated, truncate the chain - observation = truncateChain(observation, chain) - } - - if observationFitsSize(observation, maxSize) { - return observation, nil - } - chains = maps.Keys(observation.CommitReports) - } - // Truncated all chains. Return observation as is. (it has other data like contract discovery) - if len(observation.CommitReports) == 0 { - return observation, nil - } - encodedObs, err = observation.Encode() - if err != nil { - return exectypes.Observation{}, nil - } - } - - return observation, nil -} - -func observationFitsSize(obs exectypes.Observation, maxSize int) bool { - encodedObs, err := obs.Encode() - if err != nil { - return false - } - return len(encodedObs) <= maxSize -} - -// truncateLastCommit removes the last commit from the observation. -// errors if there are no commits to truncate. -func truncateLastCommit( - obs exectypes.Observation, - chain cciptypes.ChainSelector, -) exectypes.Observation { - observation := obs - commits := observation.CommitReports[chain] - if len(commits) == 0 { - return observation - } - lastCommit := commits[len(commits)-1] - // Remove the last commit from the list. - commits = commits[:len(commits)-1] - observation.CommitReports[chain] = commits - for seqNum, msg := range observation.Messages[chain] { - if lastCommit.SequenceNumberRange.Contains(seqNum) { - // Remove the message from the observation. - delete(observation.Messages[chain], seqNum) - // Remove the token data from the observation. - delete(observation.TokenData[chain], seqNum) - // Remove costly messages - for i, costlyMessage := range observation.CostlyMessages { - if costlyMessage == msg.Header.MessageID { - observation.CostlyMessages = append(observation.CostlyMessages[:i], observation.CostlyMessages[i+1:]...) - } - } - // Leaving Nonces untouched - } - } - - return observation -} - -// truncateChain removes all data related to the given chain from the observation. -// returns true if the chain was found and truncated, false otherwise. -func truncateChain( - obs exectypes.Observation, - chain cciptypes.ChainSelector, -) exectypes.Observation { - observation := obs - if _, ok := observation.CommitReports[chain]; !ok { - return observation - } - messageIDs := make(map[cciptypes.Bytes32]struct{}) - // To remove costly message IDs we need to iterate over all messages and find the ones that belong to the chain. - for _, seqNumMap := range observation.Messages { - for _, message := range seqNumMap { - messageIDs[message.Header.MessageID] = struct{}{} - } - } - - deleteCostlyMessages := func() { - for i, costlyMessage := range observation.CostlyMessages { - if _, ok := messageIDs[costlyMessage]; ok { - observation.CostlyMessages = append(observation.CostlyMessages[:i], observation.CostlyMessages[i+1:]...) - } - } - } - - delete(observation.CommitReports, chain) - delete(observation.Messages, chain) - delete(observation.TokenData, chain) - delete(observation.Nonces, chain) - deleteCostlyMessages() - - return observation -} - func decodeAttributedObservations( aos []types.AttributedObservation, ) ([]plugincommon.AttributedObservation[exectypes.Observation], error) { diff --git a/execute/plugin_functions_test.go b/execute/plugin_functions_test.go index 64987354d..63754dcf9 100644 --- a/execute/plugin_functions_test.go +++ b/execute/plugin_functions_test.go @@ -1317,338 +1317,3 @@ func Test_getMessageTimestampMap(t *testing.T) { }) } } - -func Test_truncateLastCommit(t *testing.T) { - tests := []struct { - name string - chain cciptypes.ChainSelector - observation exectypes.Observation - expected exectypes.Observation - }{ - { - name: "no commits to truncate", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: {}, - }, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: {}, - }, - }, - }, - { - name: "truncate last commit", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - 11: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x02}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - 11: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x02}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - truncated := truncateLastCommit(tt.observation, tt.chain) - require.Equal(t, tt.expected, truncated) - }) - } -} - -func Test_truncateChain(t *testing.T) { - tests := []struct { - name string - chain cciptypes.ChainSelector - observation exectypes.Observation - expected exectypes.Observation - }{ - { - name: "truncate chain data", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{}, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{}, - CostlyMessages: []cciptypes.Bytes32{}, - }, - }, - { - name: "truncate non existent chain", - chain: 2, // non existent chain - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - truncated := truncateChain(tt.observation, tt.chain) - require.Equal(t, tt.expected, truncated) - }) - } -} - -func Test_truncateObservation(t *testing.T) { - tests := []struct { - name string - observation exectypes.Observation - maxSize int - expected exectypes.Observation - deletedMsgs map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{} - wantErr bool - }{ - { - name: "no truncation needed", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - }, - maxSize: 1000, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - }, - }, - { - name: "truncate last commit", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 4}, - }, - withData(make([]byte, 100)), - ), - }, - maxSize: 2600, // this number is calculated by checking encoded sizes for the observation we expect - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - }, - }, - // Not going to check Messages in the test - }, - deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ - 1: { - 3: struct{}{}, - 4: struct{}{}, - }, - }, - }, - { - name: "truncate entire chain", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - }, - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 2}, - 2: {3, 4}, - }, - withData(make([]byte, 100)), - ), - }, - maxSize: 1789, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - }, - }, - //{ - // name: "truncate one message from first chain", - // observation: exectypes.Observation{ - // CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - // 1: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - // }, - // 2: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - // }, - // }, - // Messages: makeMessageObservation( - // map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - // 1: {1, 2}, - // 2: {3, 4}, - // }, - // withData(make([]byte, 100)), - // ), - // }, - // maxSize: 3159, // chain 1, message 1 will be truncated - // expected: exectypes.Observation{ - // CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - // 1: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - // }, - // 2: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - // }, - // }, - // }, - // deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ - // 1: { - // 1: struct{}{}, - // }, - // }, - //}, - { - name: "truncate all", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 10}, - 2: {11, 20}, - }, - ), - }, - maxSize: 50, // less than what can fit a single commit report for single chain - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.observation.TokenData = makeNoopTokenDataObservations(tt.observation.Messages) - tt.expected.TokenData = tt.observation.TokenData - obs, err := truncateObservation(tt.observation, tt.maxSize) - if tt.wantErr { - require.Error(t, err) - return - } - require.Equal(t, tt.expected.CommitReports, obs.CommitReports) - - // Check only deleted messages were deleted - for chain, seqNums := range obs.Messages { - for seqNum := range seqNums { - if _, ok := tt.deletedMsgs[chain]; ok { - if _, ok2 := tt.deletedMsgs[chain][seqNum]; ok2 { - require.True(t, obs.Messages[chain][seqNum].IsEmpty()) - continue - } - } - require.False(t, obs.Messages[chain][seqNum].IsEmpty()) - } - } - }) - } -} diff --git a/execute/test_utils.go b/execute/test_utils.go index 518341ebf..2238a6f7e 100644 --- a/execute/test_utils.go +++ b/execute/test_utils.go @@ -461,43 +461,6 @@ func makeMsgWithMetadata( } } -func makeMessageObservation( - srcToSeqNumRange map[cciptypes.ChainSelector]cciptypes.SeqNumRange, - opts ...msgOption) exectypes.MessageObservations { - - obs := make(exectypes.MessageObservations) - for src, seqNumRng := range srcToSeqNumRange { - obs[src] = make(map[cciptypes.SeqNum]cciptypes.Message) - for i := seqNumRng.Start(); i <= seqNumRng.End(); i++ { - msg := cciptypes.Message{ - Header: cciptypes.RampMessageHeader{ - SourceChainSelector: src, - SequenceNumber: i, - MessageID: rand.RandomBytes32(), - }, - FeeValueJuels: cciptypes.NewBigIntFromInt64(100), - } - for _, opt := range opts { - opt(&msg) - } - obs[src][i] = msg - } - } - return obs -} - -func makeNoopTokenDataObservations(msgs exectypes.MessageObservations) exectypes.TokenDataObservations { - tokenData := make(exectypes.TokenDataObservations) - for src, seqNumToMsg := range msgs { - tokenData[src] = make(map[cciptypes.SeqNum]exectypes.MessageTokenData) - for seq := range seqNumToMsg { - tokenData[src][seq] = exectypes.NewMessageTokenData() - } - } - return tokenData - -} - type nodeSetup struct { node *Plugin reportCodec cciptypes.ExecutePluginCodec From bf224dc36b842c635e2849e2c95d3780929c48c5 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Fri, 20 Dec 2024 12:25:40 -0500 Subject: [PATCH 2/2] Check that fChain(dest) exists before using it. (#398) --- internal/plugincommon/discovery/processor.go | 37 +++++++++++--------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/internal/plugincommon/discovery/processor.go b/internal/plugincommon/discovery/processor.go index 8e17d80d3..5dc23d992 100644 --- a/internal/plugincommon/discovery/processor.go +++ b/internal/plugincommon/discovery/processor.go @@ -218,6 +218,7 @@ func (cdp *ContractDiscoveryProcessor) Outcome( ctx context.Context, _ dt.Outcome, _ dt.Query, aos []plugincommon.AttributedObservation[dt.Observation], ) (dt.Outcome, error) { cdp.lggr.Infow("Processing contract discovery outcome", "observations", aos) + contracts := make(reader.ContractAddresses) agg := aggregateObservations(cdp.lggr, cdp.dest, aos) @@ -225,24 +226,28 @@ func (cdp *ContractDiscoveryProcessor) Outcome( donThresh := consensus.MakeConstantThreshold[cciptypes.ChainSelector](consensus.TwoFPlus1(cdp.fRoleDON)) fChain := consensus.GetConsensusMap(cdp.lggr, "fChain", agg.fChain, donThresh) fChainThresh := consensus.MakeMultiThreshold(fChain, consensus.TwoFPlus1) - destThresh := consensus.MakeConstantThreshold[cciptypes.ChainSelector](consensus.TwoFPlus1(fChain[cdp.dest])) - contracts := make(reader.ContractAddresses) - onrampConsensus := consensus.GetConsensusMap( - cdp.lggr, - "onramp", - agg.onrampAddrs, - destThresh, - ) - cdp.lggr.Infow("Determined consensus onramps", - "onrampConsensus", onrampConsensus, - "onrampAddrs", agg.onrampAddrs, - "fChainThresh", fChainThresh, - ) - if len(onrampConsensus) == 0 { - cdp.lggr.Debugw("No consensus on onramps, onrampConsensus map is empty") + if _, exists := fChain[cdp.dest]; !exists { + cdp.lggr.Warnw("missing fChain for dest (fChain[%d]), skipping onramp address lookup", cdp.dest) + } else { + destThresh := consensus.MakeConstantThreshold[cciptypes.ChainSelector](consensus.TwoFPlus1(fChain[cdp.dest])) + + onrampConsensus := consensus.GetConsensusMap( + cdp.lggr, + "onramp", + agg.onrampAddrs, + destThresh, + ) + cdp.lggr.Infow("Determined consensus onramps", + "onrampConsensus", onrampConsensus, + "onrampAddrs", agg.onrampAddrs, + "fChainThresh", fChainThresh, + ) + if len(onrampConsensus) == 0 { + cdp.lggr.Debugw("No consensus on onramps, onrampConsensus map is empty") + } + contracts[consts.ContractNameOnRamp] = onrampConsensus } - contracts[consts.ContractNameOnRamp] = onrampConsensus nonceManagerConsensus := consensus.GetConsensusMap( cdp.lggr,