diff --git a/execute/exectypes/observation.go b/execute/exectypes/observation.go index b68fe970..67362219 100644 --- a/execute/exectypes/observation.go +++ b/execute/exectypes/observation.go @@ -89,10 +89,6 @@ type Observation struct { // execute report. Messages MessageObservations `json:"messages"` Hashes MessageHashes `json:"messageHashes"` - // EncodedMsgAndTokenDataSizes are determined during the Get Messages phase - // It contains the sum of encoded sizes of messages and their token data counterpart that were observed in same - // phase. - MessageAndTokenDataEncodedSizes EncodedMsgAndTokenDataSizes `json:"messageEncodedSizes"` // TokenData are determined during the second phase of execute. // It contains the token data for the messages identified in the same stage as Messages TokenData TokenDataObservations `json:"tokenDataObservations"` diff --git a/execute/exectypes/type_optimizer.go b/execute/exectypes/type_optimizer.go index 5a76f14b..012865a3 100644 --- a/execute/exectypes/type_optimizer.go +++ b/execute/exectypes/type_optimizer.go @@ -13,6 +13,13 @@ type ObservationOptimizer struct { emptyEncodedSizes EmptyEncodeSizes } +func NewObservationOptimizer(maxEncodedSize int) ObservationOptimizer { + return ObservationOptimizer{ + maxEncodedSize: maxEncodedSize, + emptyEncodedSizes: NewEmptyEncodeSizes(), + } +} + type EmptyEncodeSizes struct { MessageAndTokenData int CommitData int @@ -38,7 +45,7 @@ func NewEmptyEncodeSizes() EmptyEncodeSizes { } } -// truncateObservation truncates the observation to fit within the given op.maxEncodedSize after encoding. +// TruncateObservation truncates the observation to fit within the given op.maxEncodedSize after encoding. // It removes data from the observation in the following order: // For each chain, pick last report and start removing messages one at a time. // If removed all messages from the report, remove the report. @@ -47,7 +54,7 @@ func NewEmptyEncodeSizes() EmptyEncodeSizes { // exclude messages from one chain only. // Keep repeating this process until the encoded observation fits within the op.maxEncodedSize // Important Note: We can't delete messages completely from single reports as we need them to create merkle proofs. -func (op ObservationOptimizer) truncateObservation(observation Observation) (Observation, error) { +func (op ObservationOptimizer) TruncateObservation(observation Observation) (Observation, error) { obs := observation encodedObs, err := obs.Encode() if err != nil { @@ -63,6 +70,7 @@ func (op ObservationOptimizer) truncateObservation(observation Observation) (Obs return chains[i] < chains[j] }) + messageAndTokenDataEncodedSizes := GetEncodedMsgAndTokenDataSizes(obs.Messages, obs.TokenData) // If the encoded obs is too large, start filtering data. for encodedObsSize > op.maxEncodedSize { for _, chain := range chains { @@ -79,18 +87,21 @@ func (op ObservationOptimizer) truncateObservation(observation Observation) (Obs } obs.Messages[chain][seqNum] = cciptypes.Message{} obs.TokenData[chain][seqNum] = NewMessageTokenData() - - encodedObsSize = encodedObsSize - - obs.MessageAndTokenDataEncodedSizes[chain][seqNum] + // Subtract the removed message and token size - op.emptyEncodedSizes.MessageAndTokenData // Add empty message and token encoded size + // Subtract the removed message and token size + encodedObsSize -= messageAndTokenDataEncodedSizes[chain][seqNum] + // Add empty message and token encoded size + encodedObsSize += op.emptyEncodedSizes.MessageAndTokenData seqNum++ if encodedObsSize <= op.maxEncodedSize { return obs, nil } } + var sizeDelta int // Reaching here means that all messages in the report are truncated, truncate the last commit - obs, encodedObsSize = op.truncateLastCommit(obs, chain, encodedObsSize) + obs, sizeDelta = op.truncateLastCommit(obs, chain) + + encodedObsSize += sizeDelta if len(obs.CommitReports[chain]) == 0 { // If the last commit report was truncated, truncate the chain @@ -123,20 +134,19 @@ func (op ObservationOptimizer) truncateObservation(observation Observation) (Obs func (op ObservationOptimizer) truncateLastCommit( obs Observation, chain cciptypes.ChainSelector, - currentObsEncSize int, ) (Observation, int) { observation := obs - newSize := currentObsEncSize + sizeDelta := 0 commits := observation.CommitReports[chain] if len(commits) == 0 { - return observation, currentObsEncSize + return observation, sizeDelta } lastCommit := commits[len(commits)-1] // Remove the last commit from the list. commits = commits[:len(commits)-1] observation.CommitReports[chain] = commits // Remove from the encoded size. - newSize = newSize - op.emptyEncodedSizes.CommitData - 4 // brackets, and commas + sizeDelta = sizeDelta - op.emptyEncodedSizes.CommitData - 4 // brackets, and commas for seqNum, msg := range observation.Messages[chain] { if lastCommit.SequenceNumberRange.Contains(seqNum) { // Remove the message from the observation. @@ -145,8 +155,9 @@ func (op ObservationOptimizer) truncateLastCommit( delete(observation.TokenData[chain], seqNum) //delete(observation.Hashes[chain], seqNum) // Remove the encoded size of the message and token data. - newSize = newSize - op.emptyEncodedSizes.MessageAndTokenData - 2*op.emptyEncodedSizes.SeqNumMap - - 4 // for brackets and commas + sizeDelta -= op.emptyEncodedSizes.MessageAndTokenData + sizeDelta -= 2 * op.emptyEncodedSizes.SeqNumMap + sizeDelta -= 4 // for brackets and commas // Remove costly messages for i, costlyMessage := range observation.CostlyMessages { if costlyMessage == msg.Header.MessageID { @@ -157,7 +168,7 @@ func (op ObservationOptimizer) truncateLastCommit( } } - return observation, newSize + return observation, sizeDelta } // truncateChain removes all data related to the given chain from the observation. diff --git a/execute/exectypes/type_optimizer_test.go b/execute/exectypes/type_optimizer_test.go new file mode 100644 index 00000000..38726ba6 --- /dev/null +++ b/execute/exectypes/type_optimizer_test.go @@ -0,0 +1,391 @@ +package exectypes + +import ( + "github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers/rand" + cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" + "github.com/stretchr/testify/require" + "testing" +) + +func Test_truncateLastCommit(t *testing.T) { + tests := []struct { + name string + chain cciptypes.ChainSelector + observation Observation + expected Observation + }{ + { + name: "no commits to truncate", + chain: 1, + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: {}, + }, + }, + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: {}, + }, + }, + }, + { + name: "truncate last commit", + chain: 1, + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + 11: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x02}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]MessageTokenData{ + 1: { + 1: NewMessageTokenData(), + 11: NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x02}}, + }, + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]MessageTokenData{ + 1: { + 1: NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := NewObservationOptimizer(10000) + truncated, _ := op.truncateLastCommit(tt.observation, tt.chain) + require.Equal(t, tt.expected, truncated) + }) + } +} + +func Test_truncateChain(t *testing.T) { + tests := []struct { + name string + chain cciptypes.ChainSelector + observation Observation + expected Observation + }{ + { + name: "truncate chain data", + chain: 1, + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]MessageTokenData{ + 1: { + 1: NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{}, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{}, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]MessageTokenData{}, + CostlyMessages: []cciptypes.Bytes32{}, + }, + }, + { + name: "truncate non existent chain", + chain: 2, // non existent chain + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]MessageTokenData{ + 1: { + 1: NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ + 1: { + 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, + }, + }, + TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]MessageTokenData{ + 1: { + 1: NewMessageTokenData(), + }, + }, + CostlyMessages: []cciptypes.Bytes32{{0x01}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := NewObservationOptimizer(10000) // size not important here + truncated := op.truncateChain(tt.observation, tt.chain) + require.Equal(t, tt.expected, truncated) + }) + } +} + +func Test_truncateObservation(t *testing.T) { + tests := []struct { + name string + observation Observation + maxSize int + expected Observation + deletedMsgs map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{} + wantErr bool + }{ + { + name: "no truncation needed", + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + }, + maxSize: 1000, + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + }, + }, + }, + { + name: "truncate last commit", + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 2600, // this number is calculated by checking encoded sizes for the observation we expect + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + }, + // Not going to check Messages in the test + }, + deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ + 1: { + 3: struct{}{}, + 4: struct{}{}, + }, + }, + }, + { + name: "truncate entire chain", + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 2}, + 2: {3, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 1789, + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + }, + }, + { + name: "truncate one message from first chain", + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 2}, + 2: {3, 4}, + }, + withData(make([]byte, 100)), + ), + }, + maxSize: 3159, // chain 1, message 1 will be truncated + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, + }, + }, + }, + deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ + 1: { + 1: struct{}{}, + }, + }, + }, + { + name: "truncate all", + observation: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{ + 1: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, + }, + 2: { + {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, + }, + }, + Messages: makeMessageObservation( + map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ + 1: {1, 10}, + 2: {11, 20}, + }, + ), + }, + maxSize: 50, // less than what can fit a single commit report for single chain + expected: Observation{ + CommitReports: map[cciptypes.ChainSelector][]CommitData{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.observation.TokenData = makeNoopTokenDataObservations(tt.observation.Messages) + tt.expected.TokenData = tt.observation.TokenData + op := NewObservationOptimizer(tt.maxSize) + obs, err := op.TruncateObservation(tt.observation) + if tt.wantErr { + require.Error(t, err) + return + } + require.Equal(t, tt.expected.CommitReports, obs.CommitReports) + + // Check only deleted messages were deleted + for chain, seqNums := range obs.Messages { + for seqNum := range seqNums { + if _, ok := tt.deletedMsgs[chain]; ok { + if _, ok2 := tt.deletedMsgs[chain][seqNum]; ok2 { + require.True(t, obs.Messages[chain][seqNum].IsEmpty()) + continue + } + } + require.False(t, obs.Messages[chain][seqNum].IsEmpty()) + } + } + }) + } +} + +func makeMessageObservation( + srcToSeqNumRange map[cciptypes.ChainSelector]cciptypes.SeqNumRange, + opts ...msgOption) MessageObservations { + + obs := make(MessageObservations) + for src, seqNumRng := range srcToSeqNumRange { + obs[src] = make(map[cciptypes.SeqNum]cciptypes.Message) + for i := seqNumRng.Start(); i <= seqNumRng.End(); i++ { + msg := cciptypes.Message{ + Header: cciptypes.RampMessageHeader{ + SourceChainSelector: src, + SequenceNumber: i, + MessageID: rand.RandomBytes32(), + }, + FeeValueJuels: cciptypes.NewBigIntFromInt64(100), + } + for _, opt := range opts { + opt(&msg) + } + obs[src][i] = msg + } + } + return obs +} + +func makeNoopTokenDataObservations(msgs MessageObservations) TokenDataObservations { + tokenData := make(TokenDataObservations) + for src, seqNumToMsg := range msgs { + tokenData[src] = make(map[cciptypes.SeqNum]MessageTokenData) + for seq := range seqNumToMsg { + tokenData[src][seq] = NewMessageTokenData() + } + } + return tokenData + +} + +type msgOption func(*cciptypes.Message) + +func withData(data []byte) msgOption { + return func(m *cciptypes.Message) { + m.Data = data + } +} diff --git a/execute/observation.go b/execute/observation.go index 5d707efb..b03ba288 100644 --- a/execute/observation.go +++ b/execute/observation.go @@ -255,10 +255,11 @@ func (p *Plugin) getMessagesObservation( observation.Hashes = hashes observation.CostlyMessages = costlyMessages observation.TokenData = tkData - observation.MessageAndTokenDataEncodedSizes = exectypes.GetEncodedMsgAndTokenDataSizes(messageObs, tkData) + //observation.MessageAndTokenDataEncodedSizes = exectypes.GetEncodedMsgAndTokenDataSizes(messageObs, tkData) // Make sure encoded observation fits within the maximum observation size. - observation, err = truncateObservation(observation, maxObservationLength, p.emptyEncodedSizes) + //observation, err = truncateObservation(observation, maxObservationLength, p.emptyEncodedSizes) + observation, err = p.observationOptimizer.TruncateObservation(observation) if err != nil { return exectypes.Observation{}, fmt.Errorf("unable to truncate observation: %w", err) } diff --git a/execute/plugin.go b/execute/plugin.go index 42a4d62b..a7da01fc 100644 --- a/execute/plugin.go +++ b/execute/plugin.go @@ -63,7 +63,7 @@ type Plugin struct { estimateProvider gas.EstimateProvider lggr logger.Logger - emptyEncodedSizes exectypes.EmptyEncodeSizes + observationOptimizer exectypes.ObservationOptimizer // state contractsInitialized bool } @@ -115,8 +115,8 @@ func NewPlugin( reportingCfg.OracleID, destChain, ), - observer: metricsReporter, - emptyEncodedSizes: exectypes.NewEmptyEncodeSizes(), + observer: metricsReporter, + observationOptimizer: exectypes.NewObservationOptimizer(maxObservationLength), } } diff --git a/execute/plugin_functions.go b/execute/plugin_functions.go index 4d1a3ebc..92453391 100644 --- a/execute/plugin_functions.go +++ b/execute/plugin_functions.go @@ -6,8 +6,6 @@ import ( "sort" "time" - "golang.org/x/exp/maps" - mapset "github.com/deckarep/golang-set/v2" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -245,168 +243,6 @@ func filterOutExecutedMessages( return filtered, nil } -// truncateObservation truncates the observation to fit within the given maxSize after encoding. -// It removes data from the observation in the following order: -// For each chain, pick last report and start removing messages one at a time. -// If removed all messages from the report, remove the report. -// If removed last report in the chain, remove the chain. -// After removing full report from a chain, move to the next chain and repeat. This ensures that we don't -// exclude messages from one chain only. -// Keep repeating this process until the encoded observation fits within the maxSize -// Important Note: We can't delete messages completely from single reports as we need them to create merkle proofs. -func truncateObservation( - observation exectypes.Observation, - maxSize int, - emptyEncodedSizes exectypes.EmptyEncodeSizes, -) (exectypes.Observation, error) { - obs := observation - encodedObs, err := obs.Encode() - if err != nil { - return exectypes.Observation{}, err - } - encodedObsSize := len(encodedObs) - if encodedObsSize <= maxSize { - return obs, nil - } - - chains := maps.Keys(obs.CommitReports) - sort.Slice(chains, func(i, j int) bool { - return chains[i] < chains[j] - }) - - // If the encoded obs is too large, start filtering data. - for encodedObsSize > maxSize { - for _, chain := range chains { - commits := obs.CommitReports[chain] - if len(commits) == 0 { - continue - } - lastCommit := &commits[len(commits)-1] - seqNum := lastCommit.SequenceNumberRange.Start() - - for seqNum <= lastCommit.SequenceNumberRange.End() { - if _, ok := obs.Messages[chain][seqNum]; !ok { - return exectypes.Observation{}, fmt.Errorf("missing message with seqNr %d from chain %d", seqNum, chain) - } - obs.Messages[chain][seqNum] = cciptypes.Message{} - obs.TokenData[chain][seqNum] = exectypes.NewMessageTokenData() - - encodedObsSize = encodedObsSize - - obs.MessageAndTokenDataEncodedSizes[chain][seqNum] + // Subtract the removed message and token size - emptyEncodedSizes.MessageAndTokenData // Add empty message and token encoded size - seqNum++ - if encodedObsSize <= maxSize { - return obs, nil - } - } - - // Reaching here means that all messages in the report are truncated, truncate the last commit - obs, encodedObsSize = truncateLastCommit(obs, chain, encodedObsSize, emptyEncodedSizes) - - if len(obs.CommitReports[chain]) == 0 { - // If the last commit report was truncated, truncate the chain - obs = truncateChain(obs, chain) - } - - if encodedObsSize <= maxSize { - return obs, nil - } - chains = maps.Keys(obs.CommitReports) - } - // Truncated all chains. Return obs as is. (it has other data like contract discovery) - if len(obs.CommitReports) == 0 { - return obs, nil - } - // Encoding again after doing a full iteration on all chains and removing messages/commits. - // That is because using encoded sizes is not 100% accurate and there are some missing bytes in the calculation. - encodedObs, err = obs.Encode() - if err != nil { - return exectypes.Observation{}, nil - } - encodedObsSize = len(encodedObs) - } - - return obs, nil -} - -// truncateLastCommit removes the last commit from the observation. -// errors if there are no commits to truncate. -func truncateLastCommit( - obs exectypes.Observation, - chain cciptypes.ChainSelector, - currentObsEncSize int, - emptyEncodedSizes exectypes.EmptyEncodeSizes, -) (exectypes.Observation, int) { - observation := obs - newSize := currentObsEncSize - commits := observation.CommitReports[chain] - if len(commits) == 0 { - return observation, currentObsEncSize - } - lastCommit := commits[len(commits)-1] - // Remove the last commit from the list. - commits = commits[:len(commits)-1] - observation.CommitReports[chain] = commits - // Remove from the encoded size. - newSize = newSize - emptyEncodedSizes.CommitData - 4 // brackets, and commas - for seqNum, msg := range observation.Messages[chain] { - if lastCommit.SequenceNumberRange.Contains(seqNum) { - // Remove the message from the observation. - delete(observation.Messages[chain], seqNum) - // Remove the token data from the observation. - delete(observation.TokenData[chain], seqNum) - //delete(observation.Hashes[chain], seqNum) - // Remove the encoded size of the message and token data. - newSize = newSize - emptyEncodedSizes.MessageAndTokenData - 2*emptyEncodedSizes.SeqNumMap - - 4 // for brackets and commas - // Remove costly messages - for i, costlyMessage := range observation.CostlyMessages { - if costlyMessage == msg.Header.MessageID { - observation.CostlyMessages = append(observation.CostlyMessages[:i], observation.CostlyMessages[i+1:]...) - } - } - // Leaving Nonces untouched - } - } - - return observation, newSize -} - -// truncateChain removes all data related to the given chain from the observation. -// returns true if the chain was found and truncated, false otherwise. -func truncateChain( - obs exectypes.Observation, - chain cciptypes.ChainSelector, -) exectypes.Observation { - observation := obs - if _, ok := observation.CommitReports[chain]; !ok { - return observation - } - messageIDs := make(map[cciptypes.Bytes32]struct{}) - // To remove costly message IDs we need to iterate over all messages and find the ones that belong to the chain. - for _, seqNumMap := range observation.Messages { - for _, message := range seqNumMap { - messageIDs[message.Header.MessageID] = struct{}{} - } - } - - deleteCostlyMessages := func() { - for i, costlyMessage := range observation.CostlyMessages { - if _, ok := messageIDs[costlyMessage]; ok { - observation.CostlyMessages = append(observation.CostlyMessages[:i], observation.CostlyMessages[i+1:]...) - } - } - } - - delete(observation.CommitReports, chain) - delete(observation.Messages, chain) - delete(observation.TokenData, chain) - delete(observation.Nonces, chain) - deleteCostlyMessages() - - return observation -} - func decodeAttributedObservations( aos []types.AttributedObservation, ) ([]plugincommon.AttributedObservation[exectypes.Observation], error) { diff --git a/execute/plugin_functions_test.go b/execute/plugin_functions_test.go index dfdb244c..63754dcf 100644 --- a/execute/plugin_functions_test.go +++ b/execute/plugin_functions_test.go @@ -1317,342 +1317,3 @@ func Test_getMessageTimestampMap(t *testing.T) { }) } } - -func Test_truncateLastCommit(t *testing.T) { - tests := []struct { - name string - chain cciptypes.ChainSelector - observation exectypes.Observation - expected exectypes.Observation - }{ - { - name: "no commits to truncate", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: {}, - }, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: {}, - }, - }, - }, - { - name: "truncate last commit", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - 11: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x02}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - 11: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x02}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - enc, err := tt.observation.Encode() - require.NoError(t, err) - truncated, _ := truncateLastCommit(tt.observation, tt.chain, len(enc), exectypes.NewEmptyEncodeSizes()) - require.Equal(t, tt.expected, truncated) - }) - } -} - -func Test_truncateChain(t *testing.T) { - tests := []struct { - name string - chain cciptypes.ChainSelector - observation exectypes.Observation - expected exectypes.Observation - }{ - { - name: "truncate chain data", - chain: 1, - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{}, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{}, - CostlyMessages: []cciptypes.Bytes32{}, - }, - }, - { - name: "truncate non existent chain", - chain: 2, // non existent chain - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - Messages: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message{ - 1: { - 1: {Header: cciptypes.RampMessageHeader{MessageID: cciptypes.Bytes32{0x01}}}, - }, - }, - TokenData: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]exectypes.MessageTokenData{ - 1: { - 1: exectypes.NewMessageTokenData(), - }, - }, - CostlyMessages: []cciptypes.Bytes32{{0x01}}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - truncated := truncateChain(tt.observation, tt.chain) - require.Equal(t, tt.expected, truncated) - }) - } -} - -func Test_truncateObservation(t *testing.T) { - tests := []struct { - name string - observation exectypes.Observation - maxSize int - expected exectypes.Observation - deletedMsgs map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{} - wantErr bool - }{ - { - name: "no truncation needed", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - }, - maxSize: 1000, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - }, - }, - }, - { - name: "truncate last commit", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 4}, - }, - withData(make([]byte, 100)), - ), - }, - maxSize: 2600, // this number is calculated by checking encoded sizes for the observation we expect - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - }, - }, - // Not going to check Messages in the test - }, - deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ - 1: { - 3: struct{}{}, - 4: struct{}{}, - }, - }, - }, - { - name: "truncate entire chain", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - }, - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 2}, - 2: {3, 4}, - }, - withData(make([]byte, 100)), - ), - }, - maxSize: 1819, - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - }, - }, - }, - }, - //{ - // name: "truncate one message from first chain", - // observation: exectypes.Observation{ - // CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - // 1: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - // }, - // 2: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - // }, - // }, - // Messages: makeMessageObservation( - // map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - // 1: {1, 2}, - // 2: {3, 4}, - // }, - // withData(make([]byte, 100)), - // ), - // }, - // maxSize: 3159, // chain 1, message 1 will be truncated - // expected: exectypes.Observation{ - // CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - // 1: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 2)}, - // }, - // 2: { - // {SequenceNumberRange: cciptypes.NewSeqNumRange(3, 4)}, - // }, - // }, - // }, - // deletedMsgs: map[cciptypes.ChainSelector]map[cciptypes.SeqNum]struct{}{ - // 1: { - // 1: struct{}{}, - // }, - // }, - //}, - { - name: "truncate all", - observation: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{ - 1: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(1, 10)}, - }, - 2: { - {SequenceNumberRange: cciptypes.NewSeqNumRange(11, 20)}, - }, - }, - Messages: makeMessageObservation( - map[cciptypes.ChainSelector]cciptypes.SeqNumRange{ - 1: {1, 10}, - 2: {11, 20}, - }, - ), - }, - maxSize: 50, // less than what can fit a single commit report for single chain - expected: exectypes.Observation{ - CommitReports: map[cciptypes.ChainSelector][]exectypes.CommitData{}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.observation.TokenData = makeNoopTokenDataObservations(tt.observation.Messages) - tt.expected.TokenData = tt.observation.TokenData - tt.observation.MessageAndTokenDataEncodedSizes = - exectypes.GetEncodedMsgAndTokenDataSizes(tt.observation.Messages, tt.observation.TokenData) - obs, err := truncateObservation(tt.observation, tt.maxSize, exectypes.NewEmptyEncodeSizes()) - if tt.wantErr { - require.Error(t, err) - return - } - require.Equal(t, tt.expected.CommitReports, obs.CommitReports) - - // Check only deleted messages were deleted - for chain, seqNums := range obs.Messages { - for seqNum := range seqNums { - if _, ok := tt.deletedMsgs[chain]; ok { - if _, ok2 := tt.deletedMsgs[chain][seqNum]; ok2 { - require.True(t, obs.Messages[chain][seqNum].IsEmpty()) - continue - } - } - require.False(t, obs.Messages[chain][seqNum].IsEmpty()) - } - } - }) - } -} diff --git a/execute/test_utils.go b/execute/test_utils.go index 518341eb..2238a6f7 100644 --- a/execute/test_utils.go +++ b/execute/test_utils.go @@ -461,43 +461,6 @@ func makeMsgWithMetadata( } } -func makeMessageObservation( - srcToSeqNumRange map[cciptypes.ChainSelector]cciptypes.SeqNumRange, - opts ...msgOption) exectypes.MessageObservations { - - obs := make(exectypes.MessageObservations) - for src, seqNumRng := range srcToSeqNumRange { - obs[src] = make(map[cciptypes.SeqNum]cciptypes.Message) - for i := seqNumRng.Start(); i <= seqNumRng.End(); i++ { - msg := cciptypes.Message{ - Header: cciptypes.RampMessageHeader{ - SourceChainSelector: src, - SequenceNumber: i, - MessageID: rand.RandomBytes32(), - }, - FeeValueJuels: cciptypes.NewBigIntFromInt64(100), - } - for _, opt := range opts { - opt(&msg) - } - obs[src][i] = msg - } - } - return obs -} - -func makeNoopTokenDataObservations(msgs exectypes.MessageObservations) exectypes.TokenDataObservations { - tokenData := make(exectypes.TokenDataObservations) - for src, seqNumToMsg := range msgs { - tokenData[src] = make(map[cciptypes.SeqNum]exectypes.MessageTokenData) - for seq := range seqNumToMsg { - tokenData[src][seq] = exectypes.NewMessageTokenData() - } - } - return tokenData - -} - type nodeSetup struct { node *Plugin reportCodec cciptypes.ExecutePluginCodec