Skip to content

Commit

Permalink
Bringing back prom observability after versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Oct 13, 2023
1 parent 384ba66 commit ee6b97c
Show file tree
Hide file tree
Showing 16 changed files with 299 additions and 263 deletions.
10 changes: 8 additions & 2 deletions core/services/ocr2/plugins/ccip/commit_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/contractutil"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/observability"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/oraclelib"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/promwrapper"
Expand Down Expand Up @@ -55,7 +56,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
if err != nil {
return nil, nil, errors.Wrap(err, "get chainset")
}
commitStore, _, err := contractutil.LoadCommitStore(common.HexToAddress(spec.ContractID), CommitPluginLabel, destChain.Client())
commitStore, _, err := contractutil.LoadCommitStore(common.HexToAddress(spec.ContractID), destChain.Client())
if err != nil {
return nil, nil, errors.Wrap(err, "failed loading commitStore")
}
Expand Down Expand Up @@ -92,7 +93,6 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
if err != nil {
return nil, nil, errors.Wrap(err, "failed commit reader")
}

onRampRouterAddr, err := onRampReader.RouterAddress()
if err != nil {
return nil, nil, err
Expand All @@ -105,6 +105,12 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
if err != nil {
return nil, nil, err
}

// Prom wrappers
onRampReader = observability.NewObservedOnRampReader(onRampReader, chainId, CommitPluginLabel)
offRampReader = observability.NewObservedOffRampReader(offRampReader, chainId, CommitPluginLabel)
commitStoreReader = observability.NewObservedCommitStoreReader(commitStoreReader, chainId, CommitPluginLabel)

lggr.Infow("NewCommitServices",
"pluginConfig", pluginConfig,
"staticConfig", staticConfig,
Expand Down
2 changes: 2 additions & 0 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/observability"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices"
"github.com/smartcontractkit/chainlink/v2/core/utils/mathutil"
Expand Down Expand Up @@ -122,6 +123,7 @@ func (rf *CommitReportingPluginFactory) UpdateDynamicReaders(newPriceRegAddr com
if err != nil {
return err
}
destPriceRegistryReader = observability.NewPriceRegistryReader(destPriceRegistryReader, rf.config.destChainEVMID.Uint64(), CommitPluginLabel)
rf.destPriceRegReader = destPriceRegistryReader
rf.destPriceRegAddr = newPriceRegAddr
return nil
Expand Down
12 changes: 10 additions & 2 deletions core/services/ocr2/plugins/ccip/execution_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/contractutil"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/observability"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/oraclelib"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc"
Expand Down Expand Up @@ -52,7 +53,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega
if err != nil {
return nil, nil, errors.Wrap(err, "get chainset")
}
offRamp, _, err := contractutil.LoadOffRamp(common.HexToAddress(spec.ContractID), ExecPluginLabel, destChain.Client())
offRamp, _, err := contractutil.LoadOffRamp(common.HexToAddress(spec.ContractID), destChain.Client())
if err != nil {
return nil, nil, errors.Wrap(err, "failed loading offRamp")
}
Expand All @@ -68,7 +69,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega
if err != nil {
return nil, nil, errors.Wrap(err, "unable to open source chain")
}
onRamp, onRampVersion, err := contractutil.LoadOnRamp(offRampConfig.OnRamp, ExecPluginLabel, sourceChain.Client())
onRamp, onRampVersion, err := contractutil.LoadOnRamp(offRampConfig.OnRamp, sourceChain.Client())
if err != nil {
return nil, nil, errors.Wrap(err, "failed loading onRamp")
}
Expand Down Expand Up @@ -109,6 +110,13 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega
if err != nil {
return nil, nil, errors.Wrap(err, "could not get token data providers")
}

// Prom wrappers
commitStoreReader = observability.NewObservedCommitStoreReader(commitStoreReader, chainId, ExecPluginLabel)
onRampReader = observability.NewObservedOnRampReader(onRampReader, chainId, ExecPluginLabel)
offRampReader = observability.NewObservedOffRampReader(offRampReader, chainId, ExecPluginLabel)
sourcePriceRegistry = observability.NewPriceRegistryReader(sourcePriceRegistry, chainId, ExecPluginLabel)

execLggr.Infow("Initialized exec plugin",
"pluginConfig", pluginConfig,
"onRampAddress", onRamp.Address(),
Expand Down
2 changes: 2 additions & 0 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/contractutil"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/hashlib"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/observability"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
)
Expand Down Expand Up @@ -117,6 +118,7 @@ func (rf *ExecutionReportingPluginFactory) UpdateDynamicReaders(newPriceRegAddr
if err != nil {
return err
}
destPriceRegistryReader = observability.NewPriceRegistryReader(destPriceRegistryReader, rf.config.destChainEVMID.Uint64(), ExecPluginLabel)
rf.destPriceRegReader = destPriceRegistryReader
rf.destPriceRegAddr = newPriceRegAddr
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/logpollerutil"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/observability"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

Expand All @@ -26,7 +25,7 @@ var (
const ExecPluginLabel = "exec"

type PriceRegistryV1_0_0 struct {
priceRegistry *observability.ObservedPriceRegistryV1_0_0
priceRegistry price_registry.PriceRegistryInterface
address common.Address
lp logpoller.LogPoller
lggr logger.Logger
Expand Down Expand Up @@ -136,7 +135,7 @@ func (p *PriceRegistryV1_0_0) GetGasPriceUpdatesCreatedAfter(ctx context.Context

func NewPriceRegistryV1_0_0(lggr logger.Logger, priceRegistryAddr common.Address, lp logpoller.LogPoller, ec client.Client) (*PriceRegistryV1_0_0, error) {
// TODO pass label
priceRegistry, err := observability.NewObservedPriceRegistryV1_0_0(priceRegistryAddr, ExecPluginLabel, ec)
priceRegistry, err := price_registry.NewPriceRegistry(priceRegistryAddr, ec)
if err != nil {
return nil, err
}
Expand Down
13 changes: 6 additions & 7 deletions core/services/ocr2/plugins/ccip/internal/contractutil/loaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_onramp_1_0_0"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_onramp_1_1_0"
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/observability"
)

func LoadOnRamp(onRampAddress common.Address, pluginName string, client client.Client) (evm_2_evm_onramp.EVM2EVMOnRampInterface, semver.Version, error) {
func LoadOnRamp(onRampAddress common.Address, client client.Client) (evm_2_evm_onramp.EVM2EVMOnRampInterface, semver.Version, error) {
version, err := ccipconfig.VerifyTypeAndVersion(onRampAddress, client, ccipconfig.EVM2EVMOnRamp)
if err != nil {
return nil, semver.Version{}, errors.Wrap(err, "Invalid onRamp contract")
}

onRamp, err := observability.NewObservedEvm2EvmOnRamp(onRampAddress, pluginName, client)
onRamp, err := evm_2_evm_onramp.NewEVM2EVMOnRamp(onRampAddress, client)
return onRamp, version, err
}

Expand Down Expand Up @@ -79,22 +78,22 @@ func LoadOnRampDynamicConfig(onRamp evm_2_evm_onramp.EVM2EVMOnRampInterface, ver
}
}

func LoadOffRamp(offRampAddress common.Address, pluginName string, client client.Client) (evm_2_evm_offramp.EVM2EVMOffRampInterface, semver.Version, error) {
func LoadOffRamp(offRampAddress common.Address, client client.Client) (evm_2_evm_offramp.EVM2EVMOffRampInterface, semver.Version, error) {
version, err := ccipconfig.VerifyTypeAndVersion(offRampAddress, client, ccipconfig.EVM2EVMOffRamp)
if err != nil {
return nil, semver.Version{}, errors.Wrap(err, "Invalid offRamp contract")
}

offRamp, err := observability.NewObservedEvm2EvmOffRamp(offRampAddress, pluginName, client)
offRamp, err := evm_2_evm_offramp.NewEVM2EVMOffRamp(offRampAddress, client)
return offRamp, version, err
}

func LoadCommitStore(commitStoreAddress common.Address, pluginName string, client client.Client) (commit_store.CommitStoreInterface, semver.Version, error) {
func LoadCommitStore(commitStoreAddress common.Address, client client.Client) (commit_store.CommitStoreInterface, semver.Version, error) {
version, err := ccipconfig.VerifyTypeAndVersion(commitStoreAddress, client, ccipconfig.CommitStore)
if err != nil {
return nil, semver.Version{}, errors.Wrap(err, "Invalid commitStore contract")
}

commitStore, err := observability.NewObservedCommitStore(commitStoreAddress, pluginName, client)
commitStore, err := commit_store.NewCommitStore(commitStoreAddress, client)
return commitStore, version, err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package observability

import (
"time"

"golang.org/x/net/context"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
)

type ObservedCommitStoreReader struct {
ccipdata.CommitStoreReader
metric metricDetails
}

func NewObservedCommitStoreReader(origin ccipdata.CommitStoreReader, chainID uint64, pluginName string) *ObservedCommitStoreReader {
return &ObservedCommitStoreReader{
CommitStoreReader: origin,
metric: metricDetails{
histogram: commitStoreHistogram,
pluginName: pluginName,
chainId: chainID,
},
}
}

func (o *ObservedCommitStoreReader) GetExpectedNextSequenceNumber(context context.Context) (uint64, error) {
return withObservedContract(o.metric, "GetExpectedNextSequenceNumber", func() (uint64, error) {
return o.CommitStoreReader.GetExpectedNextSequenceNumber(context)
})
}

func (o *ObservedCommitStoreReader) GetLatestPriceEpochAndRound(context context.Context) (uint64, error) {
return withObservedContract(o.metric, "GetLatestPriceEpochAndRound", func() (uint64, error) {
return o.CommitStoreReader.GetLatestPriceEpochAndRound(context)
})
}

func (o *ObservedCommitStoreReader) GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error) {
return withObservedContract(o.metric, "GetAcceptedCommitReportsGteSeqNum", func() ([]ccipdata.Event[ccipdata.CommitStoreReport], error) {
return o.CommitStoreReader.GetAcceptedCommitReportsGteSeqNum(ctx, seqNum, confs)
})
}

func (o *ObservedCommitStoreReader) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]ccipdata.Event[ccipdata.CommitStoreReport], error) {
return withObservedContract(o.metric, "GetAcceptedCommitReportsGteTimestamp", func() ([]ccipdata.Event[ccipdata.CommitStoreReport], error) {
return o.CommitStoreReader.GetAcceptedCommitReportsGteTimestamp(ctx, ts, confs)
})
}

func (o *ObservedCommitStoreReader) IsDown(ctx context.Context) (bool, error) {
return withObservedContract(o.metric, "IsDown", func() (bool, error) {
return o.CommitStoreReader.IsDown(ctx)
})
}

func (o *ObservedCommitStoreReader) IsBlessed(ctx context.Context, root [32]byte) (bool, error) {
return withObservedContract(o.metric, "IsBlessed", func() (bool, error) {
return o.CommitStoreReader.IsBlessed(ctx, root)
})
}

func (o *ObservedCommitStoreReader) EncodeCommitReport(report ccipdata.CommitStoreReport) ([]byte, error) {
return withObservedContract(o.metric, "EncodeCommitReport", func() ([]byte, error) {
return o.CommitStoreReader.EncodeCommitReport(report)
})
}

func (o *ObservedCommitStoreReader) DecodeCommitReport(report []byte) (ccipdata.CommitStoreReport, error) {
return withObservedContract(o.metric, "DecodeCommitReport", func() (ccipdata.CommitStoreReport, error) {
return o.CommitStoreReader.DecodeCommitReport(report)
})
}

func (o *ObservedCommitStoreReader) VerifyExecutionReport(ctx context.Context, report ccipdata.ExecReport) (bool, error) {
return withObservedContract(o.metric, "VerifyExecutionReport", func() (bool, error) {
return o.CommitStoreReader.VerifyExecutionReport(ctx, report)
})
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package observability

import (
"math/big"
"strconv"
"time"

Expand All @@ -20,42 +19,43 @@ var (
float64(500 * time.Millisecond),
float64(750 * time.Millisecond),
float64(1 * time.Second),
float64(2 * time.Second),
}
labels = []string{"evmChainID", "plugin", "function", "success"}
priceRegistryHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ccip_price_registry_contract_rpc_duration",
Help: "Duration of RPC calls to the Price Registry contract",
Name: "ccip_price_registry_contract_duration",
Help: "Duration of calls to the Price Registry reader",
Buckets: latencyBuckets,
}, labels)
commitStoreHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ccip_commit_store_contract_rpc_duration",
Help: "Duration of RPC calls to the Commit Store contract",
Name: "ccip_commit_store_contract_duration",
Help: "Duration of calls to the Commit Store reader",
Buckets: latencyBuckets,
}, labels)
evm2evmOnRampHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ccip_evm2evm_onramp_contract_rpc_duration",
Help: "Duration of RPC calls to the EVM2EVMOnRamp contract",
onRampHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ccip_onramp_contract_duration",
Help: "Duration of calls to the OnRamp reader",
Buckets: latencyBuckets,
}, labels)
evm2evmOffRampHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ccip_evm2evm_offramp_contract_rpc_duration",
Help: "Duration of RPC calls to the EVM2EVMOffRamp contract",
offRampHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ccip_offramp_contract_duration",
Help: "Duration of calls to the OffRamp contract",
Buckets: latencyBuckets,
}, labels)
)

type metricDetails struct {
histogram *prometheus.HistogramVec
pluginName string
chainId *big.Int
chainId uint64
}

func withObservedContract[T any](metric metricDetails, function string, contract func() (T, error)) (T, error) {
contractExecutionStarted := time.Now()
value, err := contract()
metric.histogram.
WithLabelValues(
metric.chainId.String(),
strconv.FormatUint(metric.chainId, 10),
metric.pluginName,
function,
strconv.FormatBool(err == nil),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,27 @@ package observability

import (
"fmt"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
)

func TestProperLabelsArePassed(t *testing.T) {
histogram := evm2evmOffRampHistogram
histogram := offRampHistogram
successCounter := 10
failedCounter := 5

details := metricDetails{
histogram: histogram,
pluginName: "plugin",
chainId: big.NewInt(123),
chainId: 123,
}

for i := 0; i < successCounter; i++ {
Expand All @@ -47,22 +44,24 @@ func TestProperLabelsArePassed(t *testing.T) {

func TestMetricsSendFromContractDirectly(t *testing.T) {
expectedCounter := 4
evmClient := mocks.NewClient(t)
evmClient.On("ConfiguredChainID").Return(big.NewInt(420), nil)
evmClient.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return([]byte{}, fmt.Errorf("error"))
ctx := testutils.Context(t)
chainId := uint64(420)

ramp, err := NewObservedEvm2EvmOffRamp(common.HexToAddress("0xa"), "plugin", evmClient)
require.NoError(t, err)
mockedOfframp := ccipdata.NewMockOffRampReader(t)
mockedOfframp.On("GetSupportedTokens", ctx).Return([]common.Address{}, nil)
mockedOfframp.On("GetDestinationTokens", ctx).Return(nil, fmt.Errorf("execution error"))

observedOfframp := NewObservedOffRampReader(mockedOfframp, chainId, "plugin")

for i := 0; i < expectedCounter; i++ {
_, _ = ramp.GetSupportedTokens(&bind.CallOpts{Context: testutils.Context(t)})
_, _ = ramp.CurrentRateLimiterState(&bind.CallOpts{Context: testutils.Context(t)})
_, _ = observedOfframp.GetSupportedTokens(ctx)
_, _ = observedOfframp.GetDestinationTokens(ctx)
}

assert.Equal(t, expectedCounter, counterFromHistogramByLabels(t, ramp.metric.histogram, "420", "plugin", "GetSupportedTokens", "false"))
assert.Equal(t, expectedCounter, counterFromHistogramByLabels(t, ramp.metric.histogram, "420", "plugin", "CurrentRateLimiterState", "false"))
assert.Equal(t, 0, counterFromHistogramByLabels(t, ramp.metric.histogram, "420", "plugin", "GetDestinationTokens", "false"))
assert.Equal(t, 0, counterFromHistogramByLabels(t, ramp.metric.histogram, "420", "plugin", "GetDestinationTokens", "true"))
assert.Equal(t, expectedCounter, counterFromHistogramByLabels(t, observedOfframp.metric.histogram, "420", "plugin", "GetSupportedTokens", "true"))
assert.Equal(t, expectedCounter, counterFromHistogramByLabels(t, observedOfframp.metric.histogram, "420", "plugin", "GetDestinationTokens", "false"))
assert.Equal(t, 0, counterFromHistogramByLabels(t, observedOfframp.metric.histogram, "420", "plugin", "GetPoolByDestToken", "false"))
assert.Equal(t, 0, counterFromHistogramByLabels(t, observedOfframp.metric.histogram, "420", "plugin", "GetPoolByDestToken", "true"))
}

func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int {
Expand Down
Loading

0 comments on commit ee6b97c

Please sign in to comment.