Skip to content

Commit

Permalink
Include multiple blocks in observation (#11142)
Browse files Browse the repository at this point in the history
* Include multiple blocks in observation

* Bump chainlink-relay

* Take the trash out

* Add prom metrics

* Bump chainlink-relay => c28841d7cd41b14fa1207586a905c44f735e9517

* Address PR comments

* Update core/services/relay/evm/mercury/v1/data_source.go

Co-authored-by: martin-cll <[email protected]>

* Address PR feedback

* Fix test

---------

Co-authored-by: martin-cll <[email protected]>
  • Loading branch information
samsondav and martin-cll authored Nov 6, 2023
1 parent e623afd commit 28e9596
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 147 deletions.
15 changes: 15 additions & 0 deletions core/chains/evm/types/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,21 @@ func (h *Head) NextInt() *big.Int {
return new(big.Int).Add(h.ToInt(), big.NewInt(1))
}

// AsSlice returns a slice of heads up to length k
// len(heads) may be less than k if the available chain is not long enough
func (h *Head) AsSlice(k int) (heads []*Head) {
if k < 1 || h == nil {
return
}
heads = make([]*Head, 1)
heads[0] = h
for len(heads) < k && h.Parent != nil {
h = h.Parent
heads = append(heads, h)
}
return
}

func (h *Head) UnmarshalJSON(bs []byte) error {
type head struct {
Hash common.Hash `json:"hash"`
Expand Down
23 changes: 23 additions & 0 deletions core/chains/evm/types/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,29 @@ func TestHead_ChainLength(t *testing.T) {
assert.Equal(t, uint32(0), head2.ChainLength())
}

func TestHead_AsSlice(t *testing.T) {
h1 := &evmtypes.Head{
Number: 1,
}
h2 := &evmtypes.Head{
Number: 2,
Parent: h1,
}
h3 := &evmtypes.Head{
Number: 3,
Parent: h2,
}

assert.Len(t, (*evmtypes.Head)(nil).AsSlice(0), 0)
assert.Len(t, (*evmtypes.Head)(nil).AsSlice(1), 0)

assert.Len(t, h3.AsSlice(0), 0)
assert.Equal(t, []*evmtypes.Head{h3}, h3.AsSlice(1))
assert.Equal(t, []*evmtypes.Head{h3, h2}, h3.AsSlice(2))
assert.Equal(t, []*evmtypes.Head{h3, h2, h1}, h3.AsSlice(3))
assert.Equal(t, []*evmtypes.Head{h3, h2, h1}, h3.AsSlice(4))
}

func TestModels_HexToFunctionSelector(t *testing.T) {
t.Parallel()
fid := evmtypes.HexToFunctionSelector("0xb3f98adc")
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231101160906-7acebcc1b353 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231101203911-c686b4d48672 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231102162027-5fdce33763de // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1466,8 +1466,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231101160906-7acebcc1b353 h1:4iO3Ei1b/Lb0yprzclk93e1aQnYF92sIe+EJzMG87y4=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231101160906-7acebcc1b353/go.mod h1:hMhGr9ok3p4442keFtK6u6Ei9yWfG66fmDwsFi3aHcw=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231101203911-c686b4d48672 h1:59vz5H52EpwWE/64ZQpNCs7Gtnyi7/ytjyoGjlbKhBA=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231101203911-c686b4d48672/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231102162027-5fdce33763de h1:CeVpn5xEdmuEsYE8ss2b7bSq9h3BY4OPvpqXeYIPnHw=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231102162027-5fdce33763de/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down
30 changes: 6 additions & 24 deletions core/services/relay/evm/mercury/mocks/chain_head_tracker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions core/services/relay/evm/mercury/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

//go:generate mockery --quiet --name ChainHeadTracker --output ../mocks/ --case=underscore
type ChainHeadTracker interface {
Client() evmclient.Client
HeadTracker() httypes.HeadTracker
}

Expand Down
90 changes: 63 additions & 27 deletions core/services/relay/evm/mercury/v1/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"

pkgerrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

Expand All @@ -25,6 +27,24 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
insufficientBlocksCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_insufficient_blocks_count",
Help: fmt.Sprintf("Count of times that there were not enough blocks in the chain during observation (need: %d)", nBlocksObservation),
},
[]string{"feedID"},
)
zeroBlocksCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_zero_blocks_count",
Help: "Count of times that there were zero blocks in the chain during observation",
},
[]string{"feedID"},
)
)

// nBlocksObservation controls how many blocks are included in the LatestBlocks observation
const nBlocksObservation int = 5

type Runner interface {
ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error)
}
Expand All @@ -51,18 +71,31 @@ type datasource struct {
chainHeadTracker types.ChainHeadTracker
fetcher Fetcher
initialBlockNumber *int64

insufficientBlocksCounter prometheus.Counter
zeroBlocksCounter prometheus.Counter
}

var _ relaymercuryv1.DataSource = &datasource{}

func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID [32]byte) *datasource {
return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber}
func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource {
return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())}
}

type ErrEmptyLatestReport struct {
Err error
}

func (e ErrEmptyLatestReport) Unwrap() error { return e.Err }

func (e ErrEmptyLatestReport) Error() string {
return fmt.Sprintf("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed. No initialBlockNumber was set, tried to use current block number to determine maxFinalizedBlockNumber but got error: %v", e.Err)
}

func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercuryv1.Observation, pipelineExecutionErr error) {
// setCurrentBlock must come first, along with observationTimestamp, to
// avoid front-running
ds.setCurrentBlock(ctx, &obs)
// setLatestBlocks must come chronologically before observations, along
// with observationTimestamp, to avoid front-running
ds.setLatestBlocks(ctx, &obs)

var wg sync.WaitGroup
if fetchMaxFinalizedBlockNum {
Expand All @@ -89,7 +122,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}
if ds.initialBlockNumber == nil {
if obs.CurrentBlockNum.Err != nil {
obs.MaxFinalizedBlockNumber.Err = fmt.Errorf("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed. No initialBlockNumber was set, tried to use current block number to determine maxFinalizedBlockNumber but got error: %w", obs.CurrentBlockNum.Err)
obs.MaxFinalizedBlockNumber.Err = ErrEmptyLatestReport{Err: obs.CurrentBlockNum.Err}
} else {
// Subract 1 here because we will later add 1 to the
// maxFinalizedBlockNumber to get the first validFromBlockNum, which
Expand Down Expand Up @@ -258,37 +291,40 @@ func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.T
return run, trrs, err
}

func (ds *datasource) setCurrentBlock(ctx context.Context, obs *relaymercuryv1.Observation) {
latestHead, err := ds.getCurrentBlock(ctx)
if err != nil {
func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.Observation) {
latestBlocks := ds.getLatestBlocks(ctx, nBlocksObservation)
if len(latestBlocks) < nBlocksObservation {
ds.insufficientBlocksCounter.Inc()
ds.lggr.Warnw("Insufficient blocks", "latestBlocks", latestBlocks, "lenLatestBlocks", len(latestBlocks), "nBlocksObservation", nBlocksObservation)
}

// TODO: remove with https://smartcontract-it.atlassian.net/browse/BCF-2209
if len(latestBlocks) == 0 {
ds.zeroBlocksCounter.Inc()
err := errors.New("no blocks available")
obs.CurrentBlockNum.Err = err
obs.CurrentBlockHash.Err = err
obs.CurrentBlockTimestamp.Err = err
return
} else {
obs.CurrentBlockNum.Val = latestBlocks[0].Number
obs.CurrentBlockHash.Val = latestBlocks[0].Hash.Bytes()
if latestBlocks[0].Timestamp.IsZero() {
obs.CurrentBlockTimestamp.Val = 0
} else {
obs.CurrentBlockTimestamp.Val = uint64(latestBlocks[0].Timestamp.Unix())
}
}
obs.CurrentBlockNum.Val = latestHead.Number
obs.CurrentBlockHash.Val = latestHead.Hash.Bytes()

if latestHead.Timestamp.IsZero() {
obs.CurrentBlockTimestamp.Val = 0
} else {
obs.CurrentBlockTimestamp.Val = uint64(latestHead.Timestamp.Unix())
for _, block := range latestBlocks {
obs.LatestBlocks = append(obs.LatestBlocks, relaymercuryv1.NewBlock(block.Number, block.Hash.Bytes(), uint64(block.Timestamp.Unix())))
}
}

func (ds *datasource) getCurrentBlock(ctx context.Context) (*evmtypes.Head, error) {
// Use the headtracker's view of the latest block, this is very fast since
func (ds *datasource) getLatestBlocks(ctx context.Context, k int) (blocks []*evmtypes.Head) {
// Use the headtracker's view of the chain, this is very fast since
// it doesn't make any external network requests, and it is the
// headtracker's job to ensure it has an up-to-date view of the chain based
// on responses from all available RPC nodes
latestHead := ds.chainHeadTracker.HeadTracker().LatestChain()
if latestHead == nil {
logger.Sugared(ds.lggr).AssumptionViolation("HeadTracker unexpectedly returned nil head, falling back to RPC call")
var err error
latestHead, err = ds.chainHeadTracker.Client().HeadByNumber(ctx, nil)
if err != nil {
return nil, err
}
}
return latestHead, nil
return latestHead.AsSlice(k)
}
Loading

0 comments on commit 28e9596

Please sign in to comment.