Skip to content

Commit

Permalink
HeadTracker finalization support (#12082)
Browse files Browse the repository at this point in the history
* Add LatestFinalizedBlock to HeadTracker

* Added LatestFinalizedHead to Head

* remove unused func

* fix flakey nil pointer

* improve logs & address lint issue

* nitpicks

* fixed copy on heads on MarkFinalized

* error instead of panic

* return error instead of panic

* nitpicks

* Finalized block based history depth

* simplify trimming

* nit fixes

* fix build issues caused by merge

* regen

* FIx rpc client mock generation

* nit fixes

* nit fixes

* update comments

* ensure that we trim redundant blocks both in slice and in chain in Heads
handle corner case for multiple uncle blocks at the end of the slice

* nit fix

* Update common/headtracker/head_tracker.go

Co-authored-by: Dimitris Grigoriou <[email protected]>

* HeadTracker backfill test with 0 finality depth

* docs

* Update docs/CHANGELOG.md

Co-authored-by: Dimitris Grigoriou <[email protected]>

* ensure latest finalized block is valid on startup

* changeset

* switch from warn to debug level when we failed to makr block as finalized

---------

Co-authored-by: Dimitris Grigoriou <[email protected]>
  • Loading branch information
dhaidashenko and dimriou authored Mar 13, 2024
1 parent e6843e8 commit 608ea0a
Show file tree
Hide file tree
Showing 33 changed files with 782 additions and 347 deletions.
5 changes: 5 additions & 0 deletions .changeset/healthy-toes-destroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

HeadTracker now respects the `FinalityTagEnabled` config option. If the flag is enabled, HeadTracker backfills blocks up to the latest finalized block provided by the corresponding RPC call. To address potential misconfigurations, `HistoryDepth` is now calculated from the latest finalized block instead of the head. NOTE: Consumers (e.g. TXM and LogPoller) do not fully utilize Finality Tag yet.
28 changes: 28 additions & 0 deletions common/client/mock_rpc_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,3 +819,12 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
}
return n.RPC().TransactionReceipt(ctx, txHash)
}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) LatestFinalizedBlock(ctx context.Context) (head HEAD, err error) {
n, err := c.selectNode()
if err != nil {
return head, err
}

return n.RPC().LatestFinalizedBlock(ctx)
}
1 change: 1 addition & 0 deletions common/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type clientAPI[
BlockByNumber(ctx context.Context, number *big.Int) (HEAD, error)
BlockByHash(ctx context.Context, hash BLOCK_HASH) (HEAD, error)
LatestBlockHeight(context.Context) (*big.Int, error)
LatestFinalizedBlock(ctx context.Context) (HEAD, error)

// Events
FilterEvents(ctx context.Context, query EVENT_OPS) ([]EVENT, error)
Expand Down
155 changes: 106 additions & 49 deletions common/headtracker/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"

Expand Down Expand Up @@ -96,37 +97,19 @@ func NewHeadTracker[
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error {
return ht.StartOnce("HeadTracker", func() error {
ht.log.Debugw("Starting HeadTracker", "chainID", ht.chainID)
latestChain, err := ht.headSaver.Load(ctx)
if err != nil {
return err
}
if latestChain.IsValid() {
ht.log.Debugw(
fmt.Sprintf("HeadTracker: Tracking logs from last block %v with hash %s", latestChain.BlockNumber(), latestChain.BlockHash()),
"blockNumber", latestChain.BlockNumber(),
"blockHash", latestChain.BlockHash(),
)
}

// NOTE: Always try to start the head tracker off with whatever the
// latest head is, without waiting for the subscription to send us one.
//
// In some cases the subscription will send us the most recent head
// anyway when we connect (but we should not rely on this because it is
// not specced). If it happens this is fine, and the head will be
// ignored as a duplicate.
initialHead, err := ht.getInitialHead(ctx)
err := ht.handleInitialHead(ctx)
if err != nil {
if errors.Is(err, ctx.Err()) {
return nil
if ctx.Err() != nil {
return ctx.Err()
}
ht.log.Errorw("Error getting initial head", "err", err)
} else if initialHead.IsValid() {
if err := ht.handleNewHead(ctx, initialHead); err != nil {
return fmt.Errorf("error handling initial head: %w", err)
}
} else {
ht.log.Debug("Got nil initial head")
ht.log.Errorw("Error handling initial head", "err", err)
}

ht.wgDone.Add(3)
Expand All @@ -140,6 +123,49 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error
})
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleInitialHead(ctx context.Context) error {
initialHead, err := ht.client.HeadByNumber(ctx, nil)
if err != nil {
return fmt.Errorf("failed to fetch initial head: %w", err)
}

if !initialHead.IsValid() {
ht.log.Warnw("Got nil initial head", "head", initialHead)
return nil
}
ht.log.Debugw("Got initial head", "head", initialHead, "blockNumber", initialHead.BlockNumber(), "blockHash", initialHead.BlockHash())

latestFinalized, err := ht.calculateLatestFinalized(ctx, initialHead)
if err != nil {
return fmt.Errorf("failed to calculate latest finalized head: %w", err)
}

if !latestFinalized.IsValid() {
return fmt.Errorf("latest finalized block is not valid")
}

latestChain, err := ht.headSaver.Load(ctx, latestFinalized.BlockNumber())
if err != nil {
return fmt.Errorf("failed to initialized headSaver: %w", err)
}

if latestChain.IsValid() {
earliest := latestChain.EarliestHeadInChain()
ht.log.Debugw(
"Loaded chain from DB",
"latest_blockNumber", latestChain.BlockNumber(),
"latest_blockHash", latestChain.BlockHash(),
"earliest_blockNumber", earliest.BlockNumber(),
"earliest_blockHash", earliest.BlockHash(),
)
}
if err := ht.handleNewHead(ctx, initialHead); err != nil {
return fmt.Errorf("error handling initial head: %w", err)
}

return nil
}

// Close stops HeadTracker service.
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Close() error {
return ht.StopOnce("HeadTracker", func() error {
Expand All @@ -159,36 +185,26 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error {
return report
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH, depth uint) (err error) {
if uint(headWithChain.ChainLength()) >= depth {
return nil
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain, latestFinalized HTH) (err error) {
if !latestFinalized.IsValid() {
return errors.New("can not perform backfill without a valid latestFinalized head")
}

baseHeight := headWithChain.BlockNumber() - int64(depth-1)
if baseHeight < 0 {
baseHeight = 0
if headWithChain.BlockNumber() < latestFinalized.BlockNumber() {
const errMsg = "invariant violation: expected head of canonical chain to be ahead of the latestFinalized"
ht.log.With("head_block_num", headWithChain.BlockNumber(),
"latest_finalized_block_number", latestFinalized.BlockNumber()).
Criticalf(errMsg)
return errors.New(errMsg)
}

return ht.backfill(ctx, headWithChain.EarliestHeadInChain(), baseHeight)
return ht.backfill(ctx, headWithChain, latestFinalized)
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) LatestChain() HTH {
return ht.headSaver.LatestChain()
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) getInitialHead(ctx context.Context) (HTH, error) {
head, err := ht.client.HeadByNumber(ctx, nil)
if err != nil {
return ht.getNilHead(), fmt.Errorf("failed to fetch initial head: %w", err)
}
loggerFields := []interface{}{"head", head}
if head.IsValid() {
loggerFields = append(loggerFields, "blockNumber", head.BlockNumber(), "blockHash", head.BlockHash())
}
ht.log.Debugw("Got initial head", loggerFields...)
return head, nil
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, head HTH) error {
prevHead := ht.headSaver.LatestChain()

Expand Down Expand Up @@ -290,7 +306,13 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() {
break
}
{
err := ht.Backfill(ctx, head, uint(ht.config.FinalityDepth()))
latestFinalized, err := ht.calculateLatestFinalized(ctx, head)
if err != nil {
ht.log.Warnw("Failed to calculate finalized block", "err", err)
continue
}

err = ht.Backfill(ctx, head, latestFinalized)
if err != nil {
ht.log.Warnw("Unexpected error while backfilling heads", "err", err)
} else if ctx.Err() != nil {
Expand All @@ -302,14 +324,30 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() {
}
}

// backfill fetches all missing heads up until the base height
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head types.Head[BLOCK_HASH], baseHeight int64) (err error) {
headBlockNumber := head.BlockNumber()
if headBlockNumber <= baseHeight {
return nil
// calculateLatestFinalized - returns latest finalized block. It's expected that currentHeadNumber - is the head of
// canonical chain. There is no guaranties that returned block belongs to the canonical chain. Additional verification
// must be performed before usage.
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) calculateLatestFinalized(ctx context.Context, currentHead HTH) (h HTH, err error) {
if ht.config.FinalityTagEnabled() {
return ht.client.LatestFinalizedBlock(ctx)
}
// no need to make an additional RPC call on chains with instant finality
if ht.config.FinalityDepth() == 0 {
return currentHead, nil
}
finalizedBlockNumber := currentHead.BlockNumber() - int64(ht.config.FinalityDepth())
if finalizedBlockNumber <= 0 {
finalizedBlockNumber = 0
}
return ht.client.HeadByNumber(ctx, big.NewInt(finalizedBlockNumber))
}

// backfill fetches all missing heads up until the latestFinalizedHead
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head, latestFinalizedHead HTH) (err error) {
headBlockNumber := head.BlockNumber()
mark := time.Now()
fetched := 0
baseHeight := latestFinalizedHead.BlockNumber()
l := ht.log.With("blockNumber", headBlockNumber,
"n", headBlockNumber-baseHeight,
"fromBlockHeight", baseHeight,
Expand Down Expand Up @@ -337,11 +375,30 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, hea
fetched++
if ctx.Err() != nil {
ht.log.Debugw("context canceled, aborting backfill", "err", err, "ctx.Err", ctx.Err())
break
return fmt.Errorf("fetchAndSaveHead failed: %w", ctx.Err())
} else if err != nil {
return fmt.Errorf("fetchAndSaveHead failed: %w", err)
}
}

if head.BlockHash() != latestFinalizedHead.BlockHash() {
const errMsg = "expected finalized block to be present in canonical chain"
ht.log.With("finalized_block_number", latestFinalizedHead.BlockNumber(), "finalized_hash", latestFinalizedHead.BlockHash(),
"canonical_chain_block_number", head.BlockNumber(), "canonical_chain_hash", head.BlockHash()).Criticalf(errMsg)
return fmt.Errorf(errMsg)
}

l = l.With("latest_finalized_block_hash", latestFinalizedHead.BlockHash(),
"latest_finalized_block_number", latestFinalizedHead.BlockNumber())

err = ht.headSaver.MarkFinalized(ctx, latestFinalizedHead)
if err != nil {
l.Debugw("failed to mark block as finalized", "err", err)
return nil
}

l.Debugw("marked block as finalized")

return
}

Expand Down
2 changes: 2 additions & 0 deletions common/headtracker/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ type Client[H types.Head[BLOCK_HASH], S types.Subscription, ID types.ID, BLOCK_H
// SubscribeNewHead is the method in which the client receives new Head.
// It can be implemented differently for each chain i.e websocket, polling, etc
SubscribeNewHead(ctx context.Context, ch chan<- H) (S, error)
// LatestFinalizedBlock - returns the latest block that was marked as finalized
LatestFinalizedBlock(ctx context.Context) (head H, err error)
}
1 change: 1 addition & 0 deletions common/headtracker/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import "time"
type Config interface {
BlockEmissionIdleWarningThreshold() time.Duration
FinalityDepth() uint32
FinalityTagEnabled() bool
}

type HeadTrackerConfig interface {
Expand Down
10 changes: 5 additions & 5 deletions common/mocks/head_tracker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions common/types/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
//go:generate mockery --quiet --name HeadTracker --output ../mocks/ --case=underscore
type HeadTracker[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface {
services.Service
// Backfill given a head will fill in any missing heads up to the given depth
// (used for testing)
Backfill(ctx context.Context, headWithChain H, depth uint) (err error)
// Backfill given a head will fill in any missing heads up to latestFinalized
Backfill(ctx context.Context, headWithChain, latestFinalized H) (err error)
LatestChain() H
}

Expand All @@ -37,12 +36,14 @@ type HeadSaver[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface {
// Save updates the latest block number, if indeed the latest, and persists
// this number in case of reboot.
Save(ctx context.Context, head H) error
// Load loads latest EvmHeadTrackerHistoryDepth heads, returns the latest chain.
Load(ctx context.Context) (H, error)
// Load loads latest heads up to latestFinalized - historyDepth, returns the latest chain.
Load(ctx context.Context, latestFinalized int64) (H, error)
// LatestChain returns the block header with the highest number that has been seen, or nil.
LatestChain() H
// Chain returns a head for the specified hash, or nil.
Chain(hash BLOCK_HASH) H
// MarkFinalized - marks matching block and all it's direct ancestors as finalized
MarkFinalized(ctx context.Context, latestFinalized H) error
}

// HeadListener is a chain agnostic interface that manages connection of Client that receives heads from the blockchain node
Expand Down
4 changes: 4 additions & 0 deletions core/chains/evm/client/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,7 @@ func (c *chainClient) TransactionReceipt(ctx context.Context, txHash common.Hash
//return rpc.TransactionReceipt(ctx, txHash)
return rpc.TransactionReceiptGeth(ctx, txHash)
}

func (c *chainClient) LatestFinalizedBlock(ctx context.Context) (*evmtypes.Head, error) {
return c.multiNode.LatestFinalizedBlock(ctx)
}
5 changes: 5 additions & 0 deletions core/chains/evm/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Client interface {
HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error)
HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error)
SubscribeNewHead(ctx context.Context, ch chan<- *evmtypes.Head) (ethereum.Subscription, error)
LatestFinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error)

SendTransactionReturnCode(ctx context.Context, tx *types.Transaction, fromAddress common.Address) (commonclient.SendTxReturnCode, error)

Expand Down Expand Up @@ -366,3 +367,7 @@ func (client *client) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, er
func (client *client) IsL2() bool {
return client.pool.ChainType().IsL2()
}

func (client *client) LatestFinalizedBlock(_ context.Context) (*evmtypes.Head, error) {
return nil, pkgerrors.New("not implemented. client was deprecated. New methods are added only to satisfy type constraints while we are migrating to new alternatives")
}
Loading

0 comments on commit 608ea0a

Please sign in to comment.