Skip to content

Commit

Permalink
various improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkouv committed Nov 5, 2024
1 parent 2cded0f commit 62b5bc6
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 103 deletions.
142 changes: 63 additions & 79 deletions commit/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
)

type merkleRootObservation = plugincommon.AttributedObservation[merkleroot.Observation]
type tokenPricesObservation = plugincommon.AttributedObservation[tokenprice.Observation]
type chainFeeObservation = plugincommon.AttributedObservation[chainfee.Observation]
type attributedMerkleRootObservation = plugincommon.AttributedObservation[merkleroot.Observation]
type attributedTokenPricesObservation = plugincommon.AttributedObservation[tokenprice.Observation]
type attributedChainFeeObservation = plugincommon.AttributedObservation[chainfee.Observation]

type Plugin struct {
donID plugintypes.DonID
Expand Down Expand Up @@ -171,7 +171,10 @@ func (p *Plugin) Query(ctx context.Context, outCtx ocr3types.OutcomeContext) (ty
var err error
var q Query

prevOutcome := p.decodeOutcome(outCtx.PreviousOutcome)
prevOutcome, err := decodeOutcome(outCtx.PreviousOutcome)
if err != nil {
return nil, fmt.Errorf("decode previous outcome: %w", err)
}

q.MerkleRootQuery, err = p.merkleRootProcessor.Query(ctx, prevOutcome.MerkleRootOutcome)
if err != nil {
Expand All @@ -196,21 +199,17 @@ func (p *Plugin) ObservationQuorum(
) (bool, error) {
// Across all chains we require at least 2F+1 observations.
return quorumhelper.ObservationCountReachesObservationQuorum(
quorumhelper.QuorumTwoFPlusOne, p.reportingCfg.N, p.reportingCfg.F, aos), nil
quorumhelper.QuorumTwoFPlusOne,
p.reportingCfg.N,
p.reportingCfg.F,
aos,
), nil
}

func (p *Plugin) Observation(
ctx context.Context, outCtx ocr3types.OutcomeContext, q types.Query,
) (types.Observation, error) {
prevOutcome := p.decodeOutcome(outCtx.PreviousOutcome)
fChain := p.ObserveFChain()

decodedQ, err := DecodeCommitPluginQuery(q)
if err != nil {
return nil, fmt.Errorf("decode query: %w", err)
}

// If the contracts are not initialized then only submit contracts discovery related observations.
// If the contracts are not initialized then only submit contracts discovery related observation.
if !p.contractsInitialized.Load() && p.discoveryProcessor != nil {
discoveryObs, err := p.discoveryProcessor.Observation(ctx, dt.Outcome{}, dt.Query{})
if err != nil {
Expand All @@ -229,30 +228,44 @@ func (p *Plugin) Observation(
return encoded, nil
}

prevOutcome, err := decodeOutcome(outCtx.PreviousOutcome)
if err != nil {
return nil, fmt.Errorf("decode previous outcome: %w", err)
}

decodedQ, err := DecodeCommitPluginQuery(q)
if err != nil {
return nil, fmt.Errorf("decode query: %w", err)
}

merkleRootObs, err := p.merkleRootProcessor.Observation(ctx, prevOutcome.MerkleRootOutcome, decodedQ.MerkleRootQuery)
if err != nil {
p.lggr.Errorw("failed to get merkle observation", "err", err)
p.lggr.Errorw("get merkle root processor observation",
"err", err, "prevOutcome", prevOutcome.MerkleRootOutcome, "decodedQ", decodedQ.MerkleRootQuery)
}

tokenPriceObs, err := p.tokenPriceProcessor.Observation(ctx, prevOutcome.TokenPriceOutcome, decodedQ.TokenPriceQuery)
if err != nil {
p.lggr.Errorw("failed to get token prices", "err", err)
p.lggr.Errorw("get token price processor observation", "err", err,
"prevOutcome", prevOutcome.TokenPriceOutcome, "decodedQ", decodedQ.TokenPriceQuery)
}

chainFeeObs, err := p.chainFeeProcessor.Observation(ctx, prevOutcome.ChainFeeOutcome, decodedQ.ChainFeeQuery)
if err != nil {
p.lggr.Errorw("failed to get gas prices", "err", err)
p.lggr.Errorw("get gas prices processor observation",
"err", err, "prevOutcome", prevOutcome.ChainFeeOutcome, "decodedQ", decodedQ.ChainFeeQuery)
}

obs := Observation{
MerkleRootObs: merkleRootObs,
TokenPriceObs: tokenPriceObs,
ChainFeeObs: chainFeeObs,
FChain: fChain,
FChain: p.ObserveFChain(),
}

encoded, err := obs.Encode()
if err != nil {
return nil, fmt.Errorf("failed to encode observation: %w, observation: %+v", err, obs)
return nil, fmt.Errorf("encode observation: %w, observation: %+v", err, obs)
}

p.lggr.Debugw("Commit plugin making observation", "encodedObservation", encoded, "observation", obs)
Expand All @@ -262,77 +275,62 @@ func (p *Plugin) Observation(
func (p *Plugin) ObserveFChain() map[cciptypes.ChainSelector]int {
fChain, err := p.homeChain.GetFChain()
if err != nil {
// TODO: metrics
p.lggr.Errorw("call to GetFChain failed", "err", err)
return map[cciptypes.ChainSelector]int{}
}
return fChain
}

// Outcome depending on the current state, either:
// - chooses the seq num ranges for the next round
// - builds a report
// - checks for the transmission of a previous report
func (p *Plugin) Outcome(
ctx context.Context, outCtx ocr3types.OutcomeContext, q types.Query, aos []types.AttributedObservation,
) (ocr3types.Outcome, error) {
p.lggr.Debugw("Commit plugin performing outcome",
"outctx", outCtx,
"query", q,
"attributedObservations", aos)
p.lggr.Debugw("performing outcome", "outctx", outCtx, "query", q, "attributedObservations", aos)

prevOutcome := p.decodeOutcome(outCtx.PreviousOutcome)
prevOutcome, err := decodeOutcome(outCtx.PreviousOutcome)
if err != nil {
return nil, fmt.Errorf("decode previous outcome: %w", err)
}

decodedQ, err := DecodeCommitPluginQuery(q)
if err != nil {
return nil, fmt.Errorf("decode query: %w", err)
}

var merkleObservations []merkleRootObservation
var tokensObservations []tokenPricesObservation
var feeObservations []chainFeeObservation
var discoveryObservations []plugincommon.AttributedObservation[dt.Observation]
merkleRootObservations := make([]attributedMerkleRootObservation, 0, len(aos))
tokenPricesObservations := make([]attributedTokenPricesObservation, 0, len(aos))
chainFeeObservations := make([]attributedChainFeeObservation, 0, len(aos))
discoveryObservations := make([]plugincommon.AttributedObservation[dt.Observation], 0, len(aos))

for _, ao := range aos {
obs, err := DecodeCommitPluginObservation(ao.Observation)
if err != nil {
p.lggr.Errorw("failed to decode observation", "err", err)
p.lggr.Warnw("failed to decode observation, observation skipped", "err", err)
continue
}

p.lggr.Debugw("Commit plugin outcome decoded observation", "observation", obs)
merkleObservations = append(merkleObservations,
merkleRootObservation{
OracleID: ao.Observer,
Observation: obs.MerkleRootObs,
},
)

tokensObservations = append(tokensObservations,
tokenPricesObservation{
OracleID: ao.Observer,
Observation: obs.TokenPriceObs,
},
)

feeObservations = append(feeObservations,
chainFeeObservation{
OracleID: ao.Observer,
Observation: obs.ChainFeeObs,
},
)

discoveryObservations = append(discoveryObservations,
plugincommon.AttributedObservation[dt.Observation]{
OracleID: ao.Observer,
Observation: obs.DiscoveryObs,
})

merkleRootObservations = append(merkleRootObservations, attributedMerkleRootObservation{
OracleID: ao.Observer, Observation: obs.MerkleRootObs})

tokenPricesObservations = append(tokenPricesObservations, attributedTokenPricesObservation{
OracleID: ao.Observer, Observation: obs.TokenPriceObs})

chainFeeObservations = append(chainFeeObservations, attributedChainFeeObservation{
OracleID: ao.Observer, Observation: obs.ChainFeeObs})

discoveryObservations = append(discoveryObservations, plugincommon.AttributedObservation[dt.Observation]{
OracleID: ao.Observer, Observation: obs.DiscoveryObs})
}

if p.discoveryProcessor != nil {
p.lggr.Infow("Processing discovery observations", "discoveryObservations", discoveryObservations)

// The outcome phase of the discovery processor is binding contracts to the chain reader. This is the reason
// we ignore the outcome of the discovery processor.
_, err = p.discoveryProcessor.Outcome(ctx, dt.Outcome{}, dt.Query{}, discoveryObservations)
if err != nil {
return nil, fmt.Errorf("unable to process outcome of discovery processor: %w", err)
return nil, fmt.Errorf("discovery processor outcome: %w", err)
}
p.contractsInitialized.Store(true)
}
Expand All @@ -341,17 +339,17 @@ func (p *Plugin) Outcome(
ctx,
prevOutcome.MerkleRootOutcome,
decodedQ.MerkleRootQuery,
merkleObservations,
merkleRootObservations,
)
if err != nil {
p.lggr.Errorw("failed to get merkle outcome", "err", err)
p.lggr.Errorw(" get merkle roots outcome", "err", err)
}

tokenPriceOutcome, err := p.tokenPriceProcessor.Outcome(
ctx,
prevOutcome.TokenPriceOutcome,
decodedQ.TokenPriceQuery,
tokensObservations,
tokenPricesObservations,
)
if err != nil {
p.lggr.Warnw("failed to get token prices outcome", "err", err)
Expand All @@ -361,7 +359,7 @@ func (p *Plugin) Outcome(
ctx,
prevOutcome.ChainFeeOutcome,
decodedQ.ChainFeeQuery,
feeObservations,
chainFeeObservations,
)
if err != nil {
p.lggr.Warnw("failed to get gas prices outcome", "err", err)
Expand All @@ -383,20 +381,6 @@ func (p *Plugin) Close() error {
)
}

func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) Outcome {
if len(outcome) == 0 {
return Outcome{}
}

decodedOutcome, err := DecodeOutcome(outcome)
if err != nil {
p.lggr.Errorw("Failed to decode Outcome", "outcome", outcome, "err", err)
return Outcome{}
}

return decodedOutcome
}

// Assuming that we have to delegate a specific amount of time to the observation requests and the report requests.
// We define some percentages in order to help us calculate the time we have to delegate to each request timer.
const (
Expand Down
6 changes: 3 additions & 3 deletions commit/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestPlugin_E2E_AllNodesAgree_MerkleRoots(t *testing.T) {
res, err := runner.RunRound(params.ctx)
assert.NoError(t, err)

decodedOutcome, err := DecodeOutcome(res.Outcome)
decodedOutcome, err := decodeOutcome(res.Outcome)
assert.NoError(t, err)
assert.Equal(t, normalizeOutcome(tc.expOutcome), normalizeOutcome(decodedOutcome))

Expand Down Expand Up @@ -387,7 +387,7 @@ func TestPlugin_E2E_AllNodesAgree_TokenPrices(t *testing.T) {
res, err := runner.RunRound(params.ctx)
assert.NoError(t, err)

decodedOutcome, err := DecodeOutcome(res.Outcome)
decodedOutcome, err := decodeOutcome(res.Outcome)
assert.NoError(t, err)
assert.Equal(t, normalizeOutcome(tc.expOutcome), normalizeOutcome(decodedOutcome))

Expand Down Expand Up @@ -614,7 +614,7 @@ func TestPlugin_E2E_AllNodesAgree_ChainFee(t *testing.T) {
res, err := runner.RunRound(params.ctx)
assert.NoError(t, err)

decodedOutcome, err := DecodeOutcome(res.Outcome)
decodedOutcome, err := decodeOutcome(res.Outcome)
assert.NoError(t, err)
assert.Equal(t, normalizeOutcome(tc.expOutcome), normalizeOutcome(decodedOutcome))

Expand Down
20 changes: 6 additions & 14 deletions commit/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package commit

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"

Expand All @@ -25,19 +24,18 @@ func (ri ReportInfo) Encode() ([]byte, error) {
return json.Marshal(ri)
}

// decode should be used to decode the report info
// Decode should be used to decode the report info
func (ri *ReportInfo) Decode(encodedReportInfo []byte) error {
return json.Unmarshal(encodedReportInfo, ri)
}

func (p *Plugin) Reports(
ctx context.Context, seqNr uint64, outcomeBytes ocr3types.Outcome,
) ([]ocr3types.ReportPlus[[]byte], error) {
outcome, err := DecodeOutcome(outcomeBytes)
outcome, err := decodeOutcome(outcomeBytes)
if err != nil {
// TODO: metrics
p.lggr.Errorw("failed to decode Outcome", "outcomeBytes", outcomeBytes, "err", err)
return nil, fmt.Errorf("failed to decode Outcome (%s): %w", hex.EncodeToString(outcomeBytes), err)
p.lggr.Errorw("failed to decode Outcome", "outcome", string(outcomeBytes), "err", err)
return nil, fmt.Errorf("decode outcome: %w", err)
}

// Gas prices and token prices do not need to get reported when merkle roots do not exist.
Expand Down Expand Up @@ -73,13 +71,7 @@ func (p *Plugin) Reports(
return nil, fmt.Errorf("encode commit plugin report: %w", err)
}

// Prepare the info data
reportInfo := ReportInfo{
RemoteF: outcome.MerkleRootOutcome.RMNRemoteCfg.F,
}

// Serialize reportInfo to []byte
infoBytes, err := reportInfo.Encode()
reportInfo, err := ReportInfo{RemoteF: outcome.MerkleRootOutcome.RMNRemoteCfg.F}.Encode()
if err != nil {
return nil, fmt.Errorf("encode report info: %w", err)
}
Expand All @@ -88,7 +80,7 @@ func (p *Plugin) Reports(
{
ReportWithInfo: ocr3types.ReportWithInfo[[]byte]{
Report: encodedReport,
Info: infoBytes,
Info: reportInfo,
},
},
}, nil
Expand Down
13 changes: 10 additions & 3 deletions commit/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,15 @@ func (o Outcome) Encode() ([]byte, error) {
return encodedOutcome, nil
}

func DecodeOutcome(b []byte) (Outcome, error) {
func decodeOutcome(b []byte) (Outcome, error) {
if len(b) == 0 {
return Outcome{}, nil
}

o := Outcome{}
err := json.Unmarshal(b, &o)
return o, err
if err := json.Unmarshal(b, &o); err != nil {
return Outcome{}, fmt.Errorf("decode outcome: %w", err)
}

return o, nil
}
12 changes: 8 additions & 4 deletions commit/validate_observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ func (p *Plugin) ValidateObservation(
return fmt.Errorf("failed to decode commit plugin observation: %w", err)
}

prevOutcome := p.decodeOutcome(outCtx.PreviousOutcome)
prevOutcome, err := decodeOutcome(outCtx.PreviousOutcome)
if err != nil {
return fmt.Errorf("decode previous outcome: %w", err)
}

if err := validateFChain(obs.FChain); err != nil {
return fmt.Errorf("failed to validate FChain: %w", err)
}

merkleObs := merkleRootObservation{
merkleObs := attributedMerkleRootObservation{
OracleID: ao.Observer,
Observation: obs.MerkleRootObs,
}
Expand All @@ -42,7 +46,7 @@ func (p *Plugin) ValidateObservation(
return fmt.Errorf("validate merkle roots observation: %w", err)
}

tokenObs := tokenPricesObservation{
tokenObs := attributedTokenPricesObservation{
OracleID: ao.Observer,
Observation: obs.TokenPriceObs,
}
Expand All @@ -51,7 +55,7 @@ func (p *Plugin) ValidateObservation(
return fmt.Errorf("validate token prices observation: %w", err)
}

gasObs := chainFeeObservation{
gasObs := attributedChainFeeObservation{
OracleID: ao.Observer,
Observation: obs.ChainFeeObs,
}
Expand Down

0 comments on commit 62b5bc6

Please sign in to comment.