diff --git a/commit/plugin.go b/commit/plugin.go index 13e8b9bbe..9e2b1935d 100644 --- a/commit/plugin.go +++ b/commit/plugin.go @@ -30,9 +30,9 @@ import ( "github.com/smartcontractkit/chainlink-ccip/pluginconfig" ) -type merkleRootObservation = plugincommon.AttributedObservation[merkleroot.Observation] -type tokenPricesObservation = plugincommon.AttributedObservation[tokenprice.Observation] -type chainFeeObservation = plugincommon.AttributedObservation[chainfee.Observation] +type attributedMerkleRootObservation = plugincommon.AttributedObservation[merkleroot.Observation] +type attributedTokenPricesObservation = plugincommon.AttributedObservation[tokenprice.Observation] +type attributedChainFeeObservation = plugincommon.AttributedObservation[chainfee.Observation] type Plugin struct { donID plugintypes.DonID @@ -171,7 +171,10 @@ func (p *Plugin) Query(ctx context.Context, outCtx ocr3types.OutcomeContext) (ty var err error var q Query - prevOutcome := p.decodeOutcome(outCtx.PreviousOutcome) + prevOutcome, err := decodeOutcome(outCtx.PreviousOutcome) + if err != nil { + return nil, fmt.Errorf("decode previous outcome: %w", err) + } q.MerkleRootQuery, err = p.merkleRootProcessor.Query(ctx, prevOutcome.MerkleRootOutcome) if err != nil { @@ -196,21 +199,17 @@ func (p *Plugin) ObservationQuorum( ) (bool, error) { // Across all chains we require at least 2F+1 observations. return quorumhelper.ObservationCountReachesObservationQuorum( - quorumhelper.QuorumTwoFPlusOne, p.reportingCfg.N, p.reportingCfg.F, aos), nil + quorumhelper.QuorumTwoFPlusOne, + p.reportingCfg.N, + p.reportingCfg.F, + aos, + ), nil } func (p *Plugin) Observation( ctx context.Context, outCtx ocr3types.OutcomeContext, q types.Query, ) (types.Observation, error) { - prevOutcome := p.decodeOutcome(outCtx.PreviousOutcome) - fChain := p.ObserveFChain() - - decodedQ, err := DecodeCommitPluginQuery(q) - if err != nil { - return nil, fmt.Errorf("decode query: %w", err) - } - - // If the contracts are not initialized then only submit contracts discovery related observations. + // If the contracts are not initialized then only submit contracts discovery related observation. if !p.contractsInitialized.Load() && p.discoveryProcessor != nil { discoveryObs, err := p.discoveryProcessor.Observation(ctx, dt.Outcome{}, dt.Query{}) if err != nil { @@ -229,30 +228,44 @@ func (p *Plugin) Observation( return encoded, nil } + prevOutcome, err := decodeOutcome(outCtx.PreviousOutcome) + if err != nil { + return nil, fmt.Errorf("decode previous outcome: %w", err) + } + + decodedQ, err := DecodeCommitPluginQuery(q) + if err != nil { + return nil, fmt.Errorf("decode query: %w", err) + } + merkleRootObs, err := p.merkleRootProcessor.Observation(ctx, prevOutcome.MerkleRootOutcome, decodedQ.MerkleRootQuery) if err != nil { - p.lggr.Errorw("failed to get merkle observation", "err", err) + p.lggr.Errorw("get merkle root processor observation", + "err", err, "prevOutcome", prevOutcome.MerkleRootOutcome, "decodedQ", decodedQ.MerkleRootQuery) } tokenPriceObs, err := p.tokenPriceProcessor.Observation(ctx, prevOutcome.TokenPriceOutcome, decodedQ.TokenPriceQuery) if err != nil { - p.lggr.Errorw("failed to get token prices", "err", err) + p.lggr.Errorw("get token price processor observation", "err", err, + "prevOutcome", prevOutcome.TokenPriceOutcome, "decodedQ", decodedQ.TokenPriceQuery) } chainFeeObs, err := p.chainFeeProcessor.Observation(ctx, prevOutcome.ChainFeeOutcome, decodedQ.ChainFeeQuery) if err != nil { - p.lggr.Errorw("failed to get gas prices", "err", err) + p.lggr.Errorw("get gas prices processor observation", + "err", err, "prevOutcome", prevOutcome.ChainFeeOutcome, "decodedQ", decodedQ.ChainFeeQuery) } obs := Observation{ MerkleRootObs: merkleRootObs, TokenPriceObs: tokenPriceObs, ChainFeeObs: chainFeeObs, - FChain: fChain, + FChain: p.ObserveFChain(), } + encoded, err := obs.Encode() if err != nil { - return nil, fmt.Errorf("failed to encode observation: %w, observation: %+v", err, obs) + return nil, fmt.Errorf("encode observation: %w, observation: %+v", err, obs) } p.lggr.Debugw("Commit plugin making observation", "encodedObservation", encoded, "observation", obs) @@ -262,77 +275,62 @@ func (p *Plugin) Observation( func (p *Plugin) ObserveFChain() map[cciptypes.ChainSelector]int { fChain, err := p.homeChain.GetFChain() if err != nil { - // TODO: metrics p.lggr.Errorw("call to GetFChain failed", "err", err) return map[cciptypes.ChainSelector]int{} } return fChain } -// Outcome depending on the current state, either: -// - chooses the seq num ranges for the next round -// - builds a report -// - checks for the transmission of a previous report func (p *Plugin) Outcome( ctx context.Context, outCtx ocr3types.OutcomeContext, q types.Query, aos []types.AttributedObservation, ) (ocr3types.Outcome, error) { - p.lggr.Debugw("Commit plugin performing outcome", - "outctx", outCtx, - "query", q, - "attributedObservations", aos) + p.lggr.Debugw("performing outcome", "outctx", outCtx, "query", q, "attributedObservations", aos) - prevOutcome := p.decodeOutcome(outCtx.PreviousOutcome) + prevOutcome, err := decodeOutcome(outCtx.PreviousOutcome) + if err != nil { + return nil, fmt.Errorf("decode previous outcome: %w", err) + } decodedQ, err := DecodeCommitPluginQuery(q) if err != nil { return nil, fmt.Errorf("decode query: %w", err) } - var merkleObservations []merkleRootObservation - var tokensObservations []tokenPricesObservation - var feeObservations []chainFeeObservation - var discoveryObservations []plugincommon.AttributedObservation[dt.Observation] + merkleRootObservations := make([]attributedMerkleRootObservation, 0, len(aos)) + tokenPricesObservations := make([]attributedTokenPricesObservation, 0, len(aos)) + chainFeeObservations := make([]attributedChainFeeObservation, 0, len(aos)) + discoveryObservations := make([]plugincommon.AttributedObservation[dt.Observation], 0, len(aos)) for _, ao := range aos { obs, err := DecodeCommitPluginObservation(ao.Observation) if err != nil { - p.lggr.Errorw("failed to decode observation", "err", err) + p.lggr.Warnw("failed to decode observation, observation skipped", "err", err) continue } + p.lggr.Debugw("Commit plugin outcome decoded observation", "observation", obs) - merkleObservations = append(merkleObservations, - merkleRootObservation{ - OracleID: ao.Observer, - Observation: obs.MerkleRootObs, - }, - ) - - tokensObservations = append(tokensObservations, - tokenPricesObservation{ - OracleID: ao.Observer, - Observation: obs.TokenPriceObs, - }, - ) - - feeObservations = append(feeObservations, - chainFeeObservation{ - OracleID: ao.Observer, - Observation: obs.ChainFeeObs, - }, - ) - - discoveryObservations = append(discoveryObservations, - plugincommon.AttributedObservation[dt.Observation]{ - OracleID: ao.Observer, - Observation: obs.DiscoveryObs, - }) + + merkleRootObservations = append(merkleRootObservations, attributedMerkleRootObservation{ + OracleID: ao.Observer, Observation: obs.MerkleRootObs}) + + tokenPricesObservations = append(tokenPricesObservations, attributedTokenPricesObservation{ + OracleID: ao.Observer, Observation: obs.TokenPriceObs}) + + chainFeeObservations = append(chainFeeObservations, attributedChainFeeObservation{ + OracleID: ao.Observer, Observation: obs.ChainFeeObs}) + + discoveryObservations = append(discoveryObservations, plugincommon.AttributedObservation[dt.Observation]{ + OracleID: ao.Observer, Observation: obs.DiscoveryObs}) } if p.discoveryProcessor != nil { p.lggr.Infow("Processing discovery observations", "discoveryObservations", discoveryObservations) + + // The outcome phase of the discovery processor is binding contracts to the chain reader. This is the reason + // we ignore the outcome of the discovery processor. _, err = p.discoveryProcessor.Outcome(ctx, dt.Outcome{}, dt.Query{}, discoveryObservations) if err != nil { - return nil, fmt.Errorf("unable to process outcome of discovery processor: %w", err) + return nil, fmt.Errorf("discovery processor outcome: %w", err) } p.contractsInitialized.Store(true) } @@ -341,17 +339,17 @@ func (p *Plugin) Outcome( ctx, prevOutcome.MerkleRootOutcome, decodedQ.MerkleRootQuery, - merkleObservations, + merkleRootObservations, ) if err != nil { - p.lggr.Errorw("failed to get merkle outcome", "err", err) + p.lggr.Errorw(" get merkle roots outcome", "err", err) } tokenPriceOutcome, err := p.tokenPriceProcessor.Outcome( ctx, prevOutcome.TokenPriceOutcome, decodedQ.TokenPriceQuery, - tokensObservations, + tokenPricesObservations, ) if err != nil { p.lggr.Warnw("failed to get token prices outcome", "err", err) @@ -361,7 +359,7 @@ func (p *Plugin) Outcome( ctx, prevOutcome.ChainFeeOutcome, decodedQ.ChainFeeQuery, - feeObservations, + chainFeeObservations, ) if err != nil { p.lggr.Warnw("failed to get gas prices outcome", "err", err) @@ -383,20 +381,6 @@ func (p *Plugin) Close() error { ) } -func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) Outcome { - if len(outcome) == 0 { - return Outcome{} - } - - decodedOutcome, err := DecodeOutcome(outcome) - if err != nil { - p.lggr.Errorw("Failed to decode Outcome", "outcome", outcome, "err", err) - return Outcome{} - } - - return decodedOutcome -} - // Assuming that we have to delegate a specific amount of time to the observation requests and the report requests. // We define some percentages in order to help us calculate the time we have to delegate to each request timer. const ( diff --git a/commit/plugin_e2e_test.go b/commit/plugin_e2e_test.go index 0b57d4490..482c0bf11 100644 --- a/commit/plugin_e2e_test.go +++ b/commit/plugin_e2e_test.go @@ -242,7 +242,7 @@ func TestPlugin_E2E_AllNodesAgree_MerkleRoots(t *testing.T) { res, err := runner.RunRound(params.ctx) assert.NoError(t, err) - decodedOutcome, err := DecodeOutcome(res.Outcome) + decodedOutcome, err := decodeOutcome(res.Outcome) assert.NoError(t, err) assert.Equal(t, normalizeOutcome(tc.expOutcome), normalizeOutcome(decodedOutcome)) @@ -387,7 +387,7 @@ func TestPlugin_E2E_AllNodesAgree_TokenPrices(t *testing.T) { res, err := runner.RunRound(params.ctx) assert.NoError(t, err) - decodedOutcome, err := DecodeOutcome(res.Outcome) + decodedOutcome, err := decodeOutcome(res.Outcome) assert.NoError(t, err) assert.Equal(t, normalizeOutcome(tc.expOutcome), normalizeOutcome(decodedOutcome)) @@ -614,7 +614,7 @@ func TestPlugin_E2E_AllNodesAgree_ChainFee(t *testing.T) { res, err := runner.RunRound(params.ctx) assert.NoError(t, err) - decodedOutcome, err := DecodeOutcome(res.Outcome) + decodedOutcome, err := decodeOutcome(res.Outcome) assert.NoError(t, err) assert.Equal(t, normalizeOutcome(tc.expOutcome), normalizeOutcome(decodedOutcome)) diff --git a/commit/report.go b/commit/report.go index fc363c70c..ead8e7278 100644 --- a/commit/report.go +++ b/commit/report.go @@ -2,7 +2,6 @@ package commit import ( "context" - "encoding/hex" "encoding/json" "fmt" @@ -25,7 +24,7 @@ func (ri ReportInfo) Encode() ([]byte, error) { return json.Marshal(ri) } -// decode should be used to decode the report info +// Decode should be used to decode the report info func (ri *ReportInfo) Decode(encodedReportInfo []byte) error { return json.Unmarshal(encodedReportInfo, ri) } @@ -33,11 +32,10 @@ func (ri *ReportInfo) Decode(encodedReportInfo []byte) error { func (p *Plugin) Reports( ctx context.Context, seqNr uint64, outcomeBytes ocr3types.Outcome, ) ([]ocr3types.ReportPlus[[]byte], error) { - outcome, err := DecodeOutcome(outcomeBytes) + outcome, err := decodeOutcome(outcomeBytes) if err != nil { - // TODO: metrics - p.lggr.Errorw("failed to decode Outcome", "outcomeBytes", outcomeBytes, "err", err) - return nil, fmt.Errorf("failed to decode Outcome (%s): %w", hex.EncodeToString(outcomeBytes), err) + p.lggr.Errorw("failed to decode Outcome", "outcome", string(outcomeBytes), "err", err) + return nil, fmt.Errorf("decode outcome: %w", err) } // Gas prices and token prices do not need to get reported when merkle roots do not exist. @@ -73,13 +71,7 @@ func (p *Plugin) Reports( return nil, fmt.Errorf("encode commit plugin report: %w", err) } - // Prepare the info data - reportInfo := ReportInfo{ - RemoteF: outcome.MerkleRootOutcome.RMNRemoteCfg.F, - } - - // Serialize reportInfo to []byte - infoBytes, err := reportInfo.Encode() + reportInfo, err := ReportInfo{RemoteF: outcome.MerkleRootOutcome.RMNRemoteCfg.F}.Encode() if err != nil { return nil, fmt.Errorf("encode report info: %w", err) } @@ -88,7 +80,7 @@ func (p *Plugin) Reports( { ReportWithInfo: ocr3types.ReportWithInfo[[]byte]{ Report: encodedReport, - Info: infoBytes, + Info: reportInfo, }, }, }, nil diff --git a/commit/types.go b/commit/types.go index 55bfa8015..c81244f45 100644 --- a/commit/types.go +++ b/commit/types.go @@ -68,8 +68,15 @@ func (o Outcome) Encode() ([]byte, error) { return encodedOutcome, nil } -func DecodeOutcome(b []byte) (Outcome, error) { +func decodeOutcome(b []byte) (Outcome, error) { + if len(b) == 0 { + return Outcome{}, nil + } + o := Outcome{} - err := json.Unmarshal(b, &o) - return o, err + if err := json.Unmarshal(b, &o); err != nil { + return Outcome{}, fmt.Errorf("decode outcome: %w", err) + } + + return o, nil } diff --git a/commit/validate_observation.go b/commit/validate_observation.go index a570f0259..28bfa6c73 100644 --- a/commit/validate_observation.go +++ b/commit/validate_observation.go @@ -27,12 +27,16 @@ func (p *Plugin) ValidateObservation( return fmt.Errorf("failed to decode commit plugin observation: %w", err) } - prevOutcome := p.decodeOutcome(outCtx.PreviousOutcome) + prevOutcome, err := decodeOutcome(outCtx.PreviousOutcome) + if err != nil { + return fmt.Errorf("decode previous outcome: %w", err) + } + if err := validateFChain(obs.FChain); err != nil { return fmt.Errorf("failed to validate FChain: %w", err) } - merkleObs := merkleRootObservation{ + merkleObs := attributedMerkleRootObservation{ OracleID: ao.Observer, Observation: obs.MerkleRootObs, } @@ -42,7 +46,7 @@ func (p *Plugin) ValidateObservation( return fmt.Errorf("validate merkle roots observation: %w", err) } - tokenObs := tokenPricesObservation{ + tokenObs := attributedTokenPricesObservation{ OracleID: ao.Observer, Observation: obs.TokenPriceObs, } @@ -51,7 +55,7 @@ func (p *Plugin) ValidateObservation( return fmt.Errorf("validate token prices observation: %w", err) } - gasObs := chainFeeObservation{ + gasObs := attributedChainFeeObservation{ OracleID: ao.Observer, Observation: obs.ChainFeeObs, }