Skip to content

Commit

Permalink
latest finalized block metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Mar 7, 2024
1 parent dee11fc commit 4b27f75
Show file tree
Hide file tree
Showing 28 changed files with 508 additions and 144 deletions.
18 changes: 18 additions & 0 deletions common/client/mock_head_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions common/client/mock_node_client_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions common/client/mocks/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package mocks

import "time"

type NodeConfig struct {
PollFailureThresholdVal uint32
PollIntervalVal time.Duration
SelectionModeVal string
SyncThresholdVal uint32
NodeIsSyncingEnabledVal bool
FinalizedBlockPollIntervalVal time.Duration
}

func (n NodeConfig) PollFailureThreshold() uint32 {
return n.PollFailureThresholdVal
}

func (n NodeConfig) PollInterval() time.Duration {
return n.PollIntervalVal
}

func (n NodeConfig) SelectionMode() string {
return n.SelectionModeVal
}

func (n NodeConfig) SyncThreshold() uint32 {
return n.SyncThresholdVal
}

func (n NodeConfig) NodeIsSyncingEnabled() bool {
return n.NodeIsSyncingEnabledVal
}

func (n NodeConfig) FinalizedBlockPollInterval() time.Duration {
return n.FinalizedBlockPollIntervalVal
}

type ChainConfig struct {
IsFinalityTagEnabled bool
FinalityDepthVal uint32
NoNewHeadsThresholdVal time.Duration
}

func (t ChainConfig) NodeNoNewHeadsThreshold() time.Duration {
return t.NoNewHeadsThresholdVal
}

func (t ChainConfig) FinalityDepth() uint32 {
return t.FinalityDepthVal
}

func (t ChainConfig) FinalityTagEnabled() bool {
return t.IsFinalityTagEnabled
}
32 changes: 20 additions & 12 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ type NodeConfig interface {
SelectionMode() string
SyncThreshold() uint32
NodeIsSyncingEnabled() bool
FinalizedBlockPollInterval() time.Duration
}

type ChainConfig interface {
NodeNoNewHeadsThreshold() time.Duration
FinalityDepth() uint32
FinalityTagEnabled() bool
}

//go:generate mockery --quiet --name Node --structname mockNode --filename "mock_node_test.go" --inpackage --case=underscore
Expand Down Expand Up @@ -73,14 +80,14 @@ type node[
RPC NodeClient[CHAIN_ID, HEAD],
] struct {
services.StateMachine
lfcLog logger.Logger
name string
id int32
chainID CHAIN_ID
nodePoolCfg NodeConfig
noNewHeadsThreshold time.Duration
order int32
chainFamily string
lfcLog logger.Logger
name string
id int32
chainID CHAIN_ID
nodePoolCfg NodeConfig
chainCfg ChainConfig
order int32
chainFamily string

ws url.URL
http *url.URL
Expand All @@ -90,8 +97,9 @@ type node[
stateMu sync.RWMutex // protects state* fields
state nodeState
// Each node is tracking the last received head number and total difficulty
stateLatestBlockNumber int64
stateLatestTotalDifficulty *big.Int
stateLatestBlockNumber int64
stateLatestTotalDifficulty *big.Int
stateLatestFinalizedBlockNumber int64

// nodeCtx is the node lifetime's context
nodeCtx context.Context
Expand All @@ -113,7 +121,7 @@ func NewNode[
RPC NodeClient[CHAIN_ID, HEAD],
](
nodeCfg NodeConfig,
noNewHeadsThreshold time.Duration,
chainCfg ChainConfig,
lggr logger.Logger,
wsuri url.URL,
httpuri *url.URL,
Expand All @@ -129,7 +137,7 @@ func NewNode[
n.id = id
n.chainID = chainID
n.nodePoolCfg = nodeCfg
n.noNewHeadsThreshold = noNewHeadsThreshold
n.chainCfg = chainCfg
n.ws = wsuri
n.order = nodeOrder
if httpuri != nil {
Expand Down
5 changes: 3 additions & 2 deletions common/client/node_fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/assert"

clientMocks "github.com/smartcontractkit/chainlink/v2/common/client/mocks"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

Expand All @@ -28,7 +29,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) {
t.Parallel()

t.Run("setState", func(t *testing.T) {
n := newTestNode(t, testNodeOpts{rpc: nil, config: testNodeConfig{nodeIsSyncingEnabled: true}})
n := newTestNode(t, testNodeOpts{rpc: nil, config: clientMocks.NodeConfig{NodeIsSyncingEnabledVal: true}})
assert.Equal(t, nodeStateUndialed, n.State())
n.setState(nodeStateAlive)
assert.Equal(t, nodeStateAlive, n.State())
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) {
}

func testTransition(t *testing.T, rpc *mockNodeClient[types.ID, Head], transition func(node testNode, fn func()), destinationState nodeState, allowedStates ...nodeState) {
node := newTestNode(t, testNodeOpts{rpc: rpc, config: testNodeConfig{nodeIsSyncingEnabled: true}})
node := newTestNode(t, testNodeOpts{rpc: rpc, config: clientMocks.NodeConfig{NodeIsSyncingEnabledVal: true}})
for _, allowedState := range allowedStates {
m := new(fnMock)
node.setState(allowedState)
Expand Down
45 changes: 42 additions & 3 deletions common/client/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ var (
Name: "pool_rpc_node_highest_seen_block",
Help: "The highest seen block for the given RPC node",
}, []string{"chainID", "nodeName"})
promPoolRPCNodeHighestFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "pool_rpc_node_highest_finalized_block",
Help: "The highest seen finalized block for the given RPC node",
}, []string{"chainID", "nodeName"})
promPoolRPCNodeNumSeenBlocks = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "pool_rpc_node_num_seen_blocks",
Help: "The total number of new blocks seen by the given RPC node",
Expand Down Expand Up @@ -88,7 +92,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
}
}

noNewHeadsTimeoutThreshold := n.noNewHeadsThreshold
noNewHeadsTimeoutThreshold := n.chainCfg.NodeNoNewHeadsThreshold()
pollFailureThreshold := n.nodePoolCfg.PollFailureThreshold()
pollInterval := n.nodePoolCfg.PollInterval()

Expand Down Expand Up @@ -134,6 +138,14 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Debug("Polling disabled")
}

var pollFinalizedHeadCh <-chan time.Time
if n.nodePoolCfg.FinalizedBlockPollInterval() > 0 {
lggr.Debugw("Finalized block polling enabled")
pollT := time.NewTicker(n.nodePoolCfg.FinalizedBlockPollInterval())
defer pollT.Stop()
pollFinalizedHeadCh = pollT.C
}

_, highestReceivedBlockNumber, _ := n.StateAndLatest()
var pollFailures uint32

Expand Down Expand Up @@ -201,6 +213,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
outOfSyncT.Reset(noNewHeadsTimeoutThreshold)
}
n.setLatestReceived(bh.BlockNumber(), bh.BlockDifficulty())
if !n.chainCfg.FinalityTagEnabled() {
latestFinalizedBN := max(bh.BlockNumber()-int64(n.chainCfg.FinalityDepth()), 0)
if latestFinalizedBN > n.stateLatestFinalizedBlockNumber {
promPoolRPCNodeHighestFinalizedBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(latestFinalizedBN))
n.stateLatestFinalizedBlockNumber = latestFinalizedBN
}
}
case err := <-sub.Err():
lggr.Errorw("Subscription was terminated", "err", err, "nodeState", n.State())
n.declareUnreachable()
Expand All @@ -214,13 +233,33 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
outOfSyncT.Reset(zombieNodeCheckInterval(n.noNewHeadsThreshold))
outOfSyncT.Reset(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold))
continue
}
}
n.declareOutOfSync(func(num int64, td *big.Int) bool { return num < highestReceivedBlockNumber })
return
case <-pollFinalizedHeadCh:
ctx, cancel := context.WithTimeout(n.nodeCtx, n.nodePoolCfg.FinalizedBlockPollInterval())
latestFinalized, err := n.RPC().LatestFinalizedBlock(ctx)
cancel()
if err != nil {
lggr.Warnw("Failed to fetch latest finalized block", "err", err)
continue
}

if !latestFinalized.IsValid() {
lggr.Warn("Latest finalized block is not valid")
continue
}

latestFinalizedBN := latestFinalized.BlockNumber()
if latestFinalizedBN > n.stateLatestFinalizedBlockNumber {
promPoolRPCNodeHighestFinalizedBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(latestFinalizedBN))
n.stateLatestFinalizedBlockNumber = latestFinalizedBN
}
}

}
}

Expand Down Expand Up @@ -316,7 +355,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
return
}
lggr.Debugw(msgReceivedBlock, "blockNumber", head.BlockNumber(), "blockDifficulty", head.BlockDifficulty(), "nodeState", n.State())
case <-time.After(zombieNodeCheckInterval(n.noNewHeadsThreshold)):
case <-time.After(zombieNodeCheckInterval(n.chainCfg.NodeNoNewHeadsThreshold())):
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 1 {
lggr.Critical("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state")
Expand Down
Loading

0 comments on commit 4b27f75

Please sign in to comment.