Skip to content

Commit

Permalink
PluginProcessors [CCIP-3147] (#98)
Browse files Browse the repository at this point in the history
Add SubPlugin interface to create multiple SubPlugins/Processors under OCR plugin.

The idea is to separate the logic of different types of observations and outcomes into separate processors.
This makes it easier to manage and test the logic of each type of observation/outcome without affecting each others.
Some of them will implement state machines (e.g. merkleroot), others might implement simpler logic. (e.g. token)
Also makes running them in parallel more streamlined.
The OCR plugin becomes a coordinator/collector of these SubPlugins.

Example Pseudo code:
```
	OCRPlugin {
	  nodeID
	  merkleProcessor
	  tokenProcessor
	  feeProcessor
	  ...
	}

	OCRPlugin.Observer {
	  mObs := merkleProcessor.Observer
	  tObs := tokenProcessor.Observer
	  fObs := feeProcessor.Observer
	  return Observation{mObs, tObs, fObs}
	}
	OCRPlugin.Validate {
	  mObs := merkleProcessor.Validate
	  tObs := tokenProcessor.Validate
	  fObs := feeProcessor.Validate
	  check errors for each
	  return nil
	}
	OCRPlugin.Outcome {
	  mOut := merkleProcessor.Outcome
	  tOut := tokenProcessor.Outcome
	  fOut := feeProcessor.Outcome
	  return Outcome{mOut, tOut, fOut}
	}

	OCRPlugin.Report {
		return Report{mOut.X, tOut.Y, fOut.Z}
	}
```

Notice all SubPlugin interface functions are using `prevOutcome` instead of `outCtx`.
We're interested in the prevOutcome, and it makes it easier to have all decoding on the top level (OCR plugin),
otherwise there might be cyclic dependencies or just complicating the code more.
  • Loading branch information
asoliman92 authored Sep 5, 2024
1 parent 70b5719 commit f8a43b7
Show file tree
Hide file tree
Showing 33 changed files with 1,694 additions and 1,083 deletions.
5 changes: 4 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ packages:
github.com/smartcontractkit/chainlink-ccip/execute/internal/gen:
interfaces:
ExecutePluginCodec:
github.com/smartcontractkit/chainlink-ccip/commit:
github.com/smartcontractkit/chainlink-ccip/commit/merkleroot:
interfaces:
Observer:
github.com/smartcontractkit/chainlink-ccip/shared:
interfaces:
PluginProcessor:
ChainSupport:
github.com/smartcontractkit/chainlink-ccip/internal/reader:
interfaces:
Expand Down
65 changes: 65 additions & 0 deletions commit/chainfee/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package chainfee

import (
"context"
"fmt"

mapset "github.com/deckarep/golang-set/v2"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/shared"
)

type Processor struct {
}

func NewProcessor() *Processor {
return &Processor{}
}

func (w *Processor) Query(ctx context.Context, prevOutcome Outcome) (Query, error) {
return Query{}, nil
}

func (w *Processor) Observation(
ctx context.Context,
prevOutcome Outcome,
query Query,
) (Observation, error) {
return Observation{}, nil
}

func (w *Processor) Outcome(
prevOutcome Outcome,
query Query,
aos []shared.AttributedObservation[Observation],
) (Outcome, error) {
return Outcome{}, nil
}

func (w *Processor) ValidateObservation(
prevOutcome Outcome,
query Query,
ao shared.AttributedObservation[Observation],
) error {
//TODO: Validate token prices
return nil
}

func validateObservedGasPrices(gasPrices []cciptypes.GasPriceChain) error {
// Duplicate gas prices must not appear for the same chain and must not be empty.
gasPriceChains := mapset.NewSet[cciptypes.ChainSelector]()
for _, g := range gasPrices {
if gasPriceChains.Contains(g.ChainSel) {
return fmt.Errorf("duplicate gas price for chain %d", g.ChainSel)
}
gasPriceChains.Add(g.ChainSel)
if g.GasPrice.IsEmpty() {
return fmt.Errorf("gas price must not be empty")
}
}

return nil
}

var _ shared.PluginProcessor[Query, Observation, Outcome] = &Processor{}
16 changes: 16 additions & 0 deletions commit/chainfee/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package chainfee

import (
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
)

type Query struct {
}

type Outcome struct {
GasPrices []cciptypes.GasPriceChain `json:"gasPrices"`
}

type Observation struct {
GasPrices []cciptypes.GasPriceChain `json:"gasPrices"`
}
62 changes: 62 additions & 0 deletions commit/chainfee/validate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package chainfee

import (
"math/big"
"testing"

"github.com/stretchr/testify/assert"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
)

func Test_validateObservedGasPrices(t *testing.T) {
testCases := []struct {
name string
gasPrices []cciptypes.GasPriceChain
expErr bool
}{
{
name: "empty is valid",
gasPrices: []cciptypes.GasPriceChain{},
expErr: false,
},
{
name: "all valid",
gasPrices: []cciptypes.GasPriceChain{
cciptypes.NewGasPriceChain(big.NewInt(10), 1),
cciptypes.NewGasPriceChain(big.NewInt(20), 2),
cciptypes.NewGasPriceChain(big.NewInt(1312), 3),
},
expErr: false,
},
{
name: "duplicate gas price",
gasPrices: []cciptypes.GasPriceChain{
cciptypes.NewGasPriceChain(big.NewInt(10), 1),
cciptypes.NewGasPriceChain(big.NewInt(20), 2),
cciptypes.NewGasPriceChain(big.NewInt(1312), 1), // notice we already have a gas price for chain 1
},
expErr: true,
},
{
name: "empty gas price",
gasPrices: []cciptypes.GasPriceChain{
cciptypes.NewGasPriceChain(big.NewInt(10), 1),
cciptypes.NewGasPriceChain(big.NewInt(20), 2),
cciptypes.NewGasPriceChain(nil, 3), // nil
},
expErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := validateObservedGasPrices(tc.gasPrices)
if tc.expErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
})
}
}
70 changes: 31 additions & 39 deletions commit/observation.go → commit/merkleroot/observation.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package commit
package merkleroot

import (
"context"
Expand All @@ -8,61 +8,68 @@ import (
"sync"
"time"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/hashutil"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/merklemulti"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-ccip/internal/reader"
"github.com/smartcontractkit/chainlink-ccip/plugintypes"
"github.com/smartcontractkit/chainlink-ccip/shared"

"github.com/smartcontractkit/chainlink-ccip/internal/reader"
)

func (p *Plugin) ObservationQuorum(_ ocr3types.OutcomeContext, _ types.Query) (ocr3types.Quorum, error) {
func (w *Processor) ObservationQuorum(_ ocr3types.OutcomeContext, _ types.Query) (ocr3types.Quorum, error) {
// Across all chains we require at least 2F+1 observations.
return ocr3types.QuorumTwoFPlusOne, nil
}

func (p *Plugin) Observation(
ctx context.Context, outCtx ocr3types.OutcomeContext, _ types.Query,
) (types.Observation, error) {
func (w *Processor) Query(ctx context.Context, prevOutcome Outcome) (Query, error) {
return Query{}, nil
}

func (w *Processor) Observation(
ctx context.Context,
prevOutcome Outcome,
_ Query,
) (Observation, error) {
tStart := time.Now()
observation, nextState := p.getObservation(ctx, outCtx)
p.lggr.Infow("Sending Observation",
observation, nextState := w.getObservation(ctx, prevOutcome)
w.lggr.Infow("Sending MerkleRootObs",
"observation", observation, "nextState", nextState, "observationDuration", time.Since(tStart))
return observation.Encode()
return observation, nil
}

func (p *Plugin) getObservation(ctx context.Context, outCtx ocr3types.OutcomeContext) (Observation, State) {
previousOutcome, nextState := p.decodeOutcome(outCtx.PreviousOutcome)
func (w *Processor) getObservation(ctx context.Context, previousOutcome Outcome) (Observation, State) {
nextState := previousOutcome.NextState()
switch nextState {
case SelectingRangesForReport:
offRampNextSeqNums := p.observer.ObserveOffRampNextSeqNums(ctx)
offRampNextSeqNums := w.observer.ObserveOffRampNextSeqNums(ctx)
return Observation{
// TODO: observe OnRamp max seq nums. The use of offRampNextSeqNums here effectively disables batching,
// e.g. the ranges selected for each chain will be [x, x] (e.g. [46, 46]), which means reports will only
// contain one message per chain. Querying the OnRamp contract requires changes to reader.CCIP, which will
// need to be done in a future change.
OnRampMaxSeqNums: offRampNextSeqNums,
OffRampNextSeqNums: offRampNextSeqNums,
FChain: p.observer.ObserveFChain(),
FChain: w.observer.ObserveFChain(),
}, nextState
case BuildingReport:
return Observation{
MerkleRoots: p.observer.ObserveMerkleRoots(ctx, previousOutcome.RangesSelectedForReport),
GasPrices: p.observer.ObserveGasPrices(ctx),
TokenPrices: p.observer.ObserveTokenPrices(ctx),
FChain: p.observer.ObserveFChain(),
MerkleRoots: w.observer.ObserveMerkleRoots(ctx, previousOutcome.RangesSelectedForReport),
FChain: w.observer.ObserveFChain(),
}, nextState
case WaitingForReportTransmission:
return Observation{
OffRampNextSeqNums: p.observer.ObserveOffRampNextSeqNums(ctx),
FChain: p.observer.ObserveFChain(),
OffRampNextSeqNums: w.observer.ObserveOffRampNextSeqNums(ctx),
FChain: w.observer.ObserveFChain(),
}, nextState
default:
p.lggr.Errorw("Unexpected state", "state", nextState)
w.lggr.Errorw("Unexpected state", "state", nextState)
return Observation{}, nextState
}
}
Expand All @@ -74,18 +81,14 @@ type Observer interface {
// ObserveMerkleRoots computes the merkle roots for the given sequence number ranges
ObserveMerkleRoots(ctx context.Context, ranges []plugintypes.ChainRange) []cciptypes.MerkleRootChain

ObserveTokenPrices(ctx context.Context) []cciptypes.TokenPrice

ObserveGasPrices(ctx context.Context) []cciptypes.GasPriceChain

ObserveFChain() map[cciptypes.ChainSelector]int
}

type ObserverImpl struct {
lggr logger.Logger
homeChain reader.HomeChain
nodeID commontypes.OracleID
chainSupport ChainSupport
chainSupport shared.ChainSupport
ccipReader reader.CCIP
msgHasher cciptypes.MessageHasher
}
Expand Down Expand Up @@ -215,14 +218,6 @@ func (o ObserverImpl) computeMerkleRoot(ctx context.Context, msgs []cciptypes.Me
return root, nil
}

func (o ObserverImpl) ObserveTokenPrices(ctx context.Context) []cciptypes.TokenPrice {
return []cciptypes.TokenPrice{}
}

func (o ObserverImpl) ObserveGasPrices(ctx context.Context) []cciptypes.GasPriceChain {
return []cciptypes.GasPriceChain{}
}

func (o ObserverImpl) ObserveFChain() map[cciptypes.ChainSelector]int {
fChain, err := o.homeChain.GetFChain()
if err != nil {
Expand All @@ -232,6 +227,3 @@ func (o ObserverImpl) ObserveFChain() map[cciptypes.ChainSelector]int {
}
return fChain
}

// Interface compliance check
var _ Observer = (*ObserverImpl)(nil)
Loading

0 comments on commit f8a43b7

Please sign in to comment.