Skip to content

Commit

Permalink
Ensure RPCClient with enabled polling respects health check flag (#14896
Browse files Browse the repository at this point in the history
)

* Fix SubscribeToHeads tests & Ensure RPCClient with enabled polling respects health check flag

* Fail the test instead of panicking if the head could not be marshaled

Co-authored-by: Jordan Krage <[email protected]>

* do not use two contexts in the same scope

---------

Co-authored-by: Jordan Krage <[email protected]>
  • Loading branch information
dhaidashenko and jmank88 authored Nov 1, 2024
1 parent 65351c3 commit 38ef179
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 74 deletions.
5 changes: 5 additions & 0 deletions .changeset/cool-feet-happen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Ensure RPCClient with enabled polling respects health check flag #internal
16 changes: 14 additions & 2 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,13 @@ func (r *RPCClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H
if r.newHeadsPollInterval > 0 {
interval := r.newHeadsPollInterval
timeout := interval
poller, channel := commonclient.NewPoller[*evmtypes.Head](interval, r.latestBlock, timeout, r.rpcLog)
isHealthCheckRequest := commonclient.CtxIsHeathCheckRequest(ctx)
poller, channel := commonclient.NewPoller[*evmtypes.Head](interval, func(ctx context.Context) (*evmtypes.Head, error) {
if isHealthCheckRequest {
ctx = commonclient.CtxAddHealthCheckFlag(ctx)
}
return r.latestBlock(ctx)
}, timeout, r.rpcLog)
if err = poller.Start(ctx); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -510,7 +516,13 @@ func (r *RPCClient) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *evmt
return nil, nil, errors.New("FinalizedBlockPollInterval is 0")
}
timeout := interval
poller, channel := commonclient.NewPoller[*evmtypes.Head](interval, r.LatestFinalizedBlock, timeout, r.rpcLog)
isHealthCheckRequest := commonclient.CtxIsHeathCheckRequest(ctx)
poller, channel := commonclient.NewPoller[*evmtypes.Head](interval, func(ctx context.Context) (*evmtypes.Head, error) {
if isHealthCheckRequest {
ctx = commonclient.CtxAddHealthCheckFlag(ctx)
}
return r.LatestFinalizedBlock(ctx)
}, timeout, r.rpcLog)
if err := poller.Start(ctx); err != nil {
return nil, nil, err
}
Expand Down
160 changes: 88 additions & 72 deletions core/chains/evm/client/rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

commonclient "github.com/smartcontractkit/chainlink/v2/common/client"
commontypes "github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand Down Expand Up @@ -50,16 +51,10 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
NodeFinalizedBlockPollInterval: 1 * time.Second,
}

nodePoolCfgNoPolling := client.TestNodePoolConfig{
nodePoolCfgWSSub := client.TestNodePoolConfig{
NodeFinalizedBlockPollInterval: 1 * time.Second,
}

var rpcHeads []*evmtypes.Head
previousHead := &evmtypes.Head{Number: 0}
SetNextRPCHead := func(head *evmtypes.Head) {
rpcHeads = append(rpcHeads, head)
}

serverCallBack := func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) {
if method == "eth_unsubscribe" {
resp.Result = "true"
Expand All @@ -71,20 +66,6 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
}
return
}
assert.Equal(t, "eth_getBlockByNumber", method)
if assert.True(t, params.IsArray()) && assert.Equal(t, "latest", params.Array()[0].String()) {
if len(rpcHeads) == 0 {
SetNextRPCHead(previousHead)
}
head := rpcHeads[0]
previousHead = head
rpcHeads = rpcHeads[1:]
jsonHead, err := json.Marshal(head)
if err != nil {
panic(fmt.Errorf("failed to marshal head: %w", err))
}
resp.Result = string(jsonHead)
}
return
}

Expand Down Expand Up @@ -114,7 +95,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
// set to default values
Expand All @@ -126,14 +107,13 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
assert.Equal(t, int64(0), highestUserObservations.FinalizedBlockNumber)
assert.Nil(t, highestUserObservations.TotalDifficulty)

SetNextRPCHead(&evmtypes.Head{Number: 256, TotalDifficulty: big.NewInt(1000)})
SetNextRPCHead(&evmtypes.Head{Number: 128, TotalDifficulty: big.NewInt(500)})

ch, sub, err := rpc.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
defer sub.Unsubscribe()
go server.MustWriteBinaryMessageSync(t, makeNewHeadWSMessage(&evmtypes.Head{Number: 256, TotalDifficulty: big.NewInt(1000)}))
// received 256 head
<-ch
go server.MustWriteBinaryMessageSync(t, makeNewHeadWSMessage(&evmtypes.Head{Number: 128, TotalDifficulty: big.NewInt(500)}))
// received 128 head
<-ch

Expand Down Expand Up @@ -164,15 +144,14 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))

SetNextRPCHead(&evmtypes.Head{Number: 256, TotalDifficulty: big.NewInt(1000)})

ch, sub, err := rpc.SubscribeToHeads(commonclient.CtxAddHealthCheckFlag(tests.Context(t)))
require.NoError(t, err)
defer sub.Unsubscribe()
go server.MustWriteBinaryMessageSync(t, makeNewHeadWSMessage(&evmtypes.Head{Number: 256, TotalDifficulty: big.NewInt(1000)}))
// received 256 head
<-ch

Expand All @@ -181,41 +160,73 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
assert.Equal(t, int64(0), latest.FinalizedBlockNumber)
assert.Equal(t, big.NewInt(1000), latest.TotalDifficulty)

assert.Equal(t, int64(256), highestUserObservations.BlockNumber)
assert.Equal(t, int64(0), highestUserObservations.BlockNumber)
assert.Equal(t, int64(0), highestUserObservations.FinalizedBlockNumber)
assert.Equal(t, big.NewInt(1000), highestUserObservations.TotalDifficulty)
assert.Nil(t, highestUserObservations.TotalDifficulty)
})
t.Run("SubscribeToHeads with http polling enabled will update new heads", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
type rpcServer struct {
Head *evmtypes.Head
URL *url.URL
}
createRPCServer := func() *rpcServer {
server := &rpcServer{}
server.URL = testutils.NewWSServer(t, chainId, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) {
assert.Equal(t, "eth_getBlockByNumber", method)
if assert.True(t, params.IsArray()) && assert.Equal(t, "latest", params.Array()[0].String()) {
head := server.Head
jsonHead, err := json.Marshal(head)
if assert.NoError(t, err, "failed to marshal head") {
resp.Result = string(jsonHead)
}
}

rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
return
}).WSURL()
return server
}

server := createRPCServer()
rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))

latest, highestUserObservations := rpc.GetInterceptedChainInfo()
// latest chain info hasn't been initialized
assert.Equal(t, int64(0), latest.BlockNumber)
assert.Equal(t, int64(0), highestUserObservations.BlockNumber)

SetNextRPCHead(&evmtypes.Head{Number: 127, TotalDifficulty: big.NewInt(1000)})

headCh, sub, err := rpc.SubscribeToHeads(commonclient.CtxAddHealthCheckFlag(tests.Context(t)))
server.Head = &evmtypes.Head{Number: 127}
headCh, sub, err := rpc.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
defer sub.Unsubscribe()

head := <-headCh
assert.Equal(t, int64(127), head.BlockNumber())
// the http polling subscription should update the head block
assert.Equal(t, server.Head.Number, head.BlockNumber())
// should update both latest and user observations
latest, highestUserObservations = rpc.GetInterceptedChainInfo()
assert.Equal(t, int64(127), latest.BlockNumber)
assert.Equal(t, int64(127), highestUserObservations.BlockNumber)

// subscription with health check flag won't affect user observations
sub.Unsubscribe() // stop prev subscription
server.Head = &evmtypes.Head{Number: 256}
headCh, sub, err = rpc.SubscribeToHeads(commonclient.CtxAddHealthCheckFlag(tests.Context(t)))
require.NoError(t, err)
defer sub.Unsubscribe()

head = <-headCh
assert.Equal(t, server.Head.Number, head.BlockNumber())
// should only update latest
latest, highestUserObservations = rpc.GetInterceptedChainInfo()
assert.Equal(t, int64(256), latest.BlockNumber)
assert.Equal(t, int64(127), highestUserObservations.BlockNumber)
})
t.Run("Concurrent Unsubscribe and onNewHead calls do not lead to a deadlock", func(t *testing.T) {
const numberOfAttempts = 1000 // need a large number to increase the odds of reproducing the issue
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
var wg sync.WaitGroup
Expand All @@ -228,7 +239,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
wg.Done()
}()
go func() {
rpc.UnsubscribeAllExcept(sub)
rpc.UnsubscribeAllExcept()
sub.Unsubscribe()
wg.Done()
}()
Expand All @@ -238,7 +249,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
t.Run("Block's chain ID matched configured", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
ch, sub, err := rpc.SubscribeToHeads(tests.Context(t))
Expand All @@ -254,7 +265,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
})
wsURL := server.WSURL()
observedLggr, observed := logger.TestObserved(t, zap.DebugLevel)
rpc := client.NewRPCClient(nodePoolCfgNoPolling, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(nodePoolCfgWSSub, observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.NoError(t, rpc.Dial(ctx))
server.Close()
_, _, err := rpc.SubscribeToHeads(ctx)
Expand All @@ -264,31 +275,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
t.Run("Closed rpc client should remove existing SubscribeToHeads subscription with WS", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))

_, sub, err := rpc.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc)
})
t.Run("Closed rpc client should remove existing SubscribeToHeads subscription with HTTP polling", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(nodePoolCfgHeadPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))

_, sub, err := rpc.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc)
})
t.Run("Closed rpc client should remove existing SubscribeToHeads subscription with WS", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))

Expand Down Expand Up @@ -323,14 +310,12 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) {
t.Run("Subscription error is properly wrapper", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
rpc := client.NewRPCClient(nodePoolCfgNoPolling, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(nodePoolCfgWSSub, lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
SetNextRPCHead(nil)
_, sub, err := rpc.SubscribeToHeads(ctx)
require.NoError(t, err)
go server.MustWriteBinaryMessageSync(t, "invalid msg")

select {
case err = <-sub.Err():
require.ErrorContains(t, err, "RPCClient returned error (rpc): invalid character")
Expand Down Expand Up @@ -477,11 +462,42 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) {
assert.Equal(t, int64(0), latest.BlockNumber)
assert.Equal(t, int64(256), latest.FinalizedBlockNumber)

// subscription updates chain info
server.Head = &evmtypes.Head{Number: 512}
ch, sub, err := rpc.SubscribeToFinalizedHeads(ctx)
require.NoError(t, err)
defer sub.Unsubscribe()
head := <-ch
require.Equal(t, int64(512), head.BlockNumber())

latest, highestUserObservations = rpc.GetInterceptedChainInfo()
assert.Equal(t, int64(0), highestUserObservations.BlockNumber)
assert.Equal(t, int64(512), highestUserObservations.FinalizedBlockNumber)

assert.Equal(t, int64(0), latest.BlockNumber)
assert.Equal(t, int64(512), latest.FinalizedBlockNumber)

// health check subscription only updates latest
sub.Unsubscribe() // close previous one
server.Head = &evmtypes.Head{Number: 1024}
ch, sub, err = rpc.SubscribeToFinalizedHeads(commonclient.CtxAddHealthCheckFlag(ctx))
require.NoError(t, err)
defer sub.Unsubscribe()
head = <-ch
require.Equal(t, int64(1024), head.BlockNumber())

latest, highestUserObservations = rpc.GetInterceptedChainInfo()
assert.Equal(t, int64(0), highestUserObservations.BlockNumber)
assert.Equal(t, int64(512), highestUserObservations.FinalizedBlockNumber)

assert.Equal(t, int64(0), latest.BlockNumber)
assert.Equal(t, int64(1024), latest.FinalizedBlockNumber)

// Close resets latest ChainInfo
rpc.Close()
latest, highestUserObservations = rpc.GetInterceptedChainInfo()
assert.Equal(t, int64(0), highestUserObservations.BlockNumber)
assert.Equal(t, int64(128), highestUserObservations.FinalizedBlockNumber)
assert.Equal(t, int64(512), highestUserObservations.FinalizedBlockNumber)

assert.Equal(t, int64(0), latest.BlockNumber)
assert.Equal(t, int64(0), latest.FinalizedBlockNumber)
Expand Down

0 comments on commit 38ef179

Please sign in to comment.