From aa0f5ceb344b1cdb1bccdb713bb63c3397d48f26 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Wed, 27 Nov 2024 15:58:24 +0100 Subject: [PATCH] BCFR-1076 false unreachable RPC transition on rotation (#1548) ## Motivation In cases when lease duration > 0 MultiNode periodically rotates RPCs. As we have not marked the finalized head subscription as part of aliveLoopSub on RPC rotation, the subscription was closed, which caused RPC to transition to an unreachable state. ## Solution Mark finalized subscription as part of alive loop to avoid subscription termination on RPC rotation. --- common/client/mock_node_client_test.go | 33 ++++++++++++++++++++++ common/client/mock_rpc_test.go | 33 ++++++++++++++++++++++ common/client/node_lifecycle.go | 1 + common/client/node_lifecycle_test.go | 7 +++++ common/client/types.go | 1 + core/chains/evm/client/mocks/rpc_client.go | 33 ++++++++++++++++++++++ core/chains/evm/client/rpc_client.go | 16 ++++++++--- core/chains/evm/client/rpc_client_test.go | 19 +++++++++++++ 8 files changed, 139 insertions(+), 4 deletions(-) diff --git a/common/client/mock_node_client_test.go b/common/client/mock_node_client_test.go index 5643dcde90..120e6c0b68 100644 --- a/common/client/mock_node_client_test.go +++ b/common/client/mock_node_client_test.go @@ -400,6 +400,39 @@ func (_c *mockNodeClient_IsSyncing_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(c return _c } +// SetAliveLoopFinalizedHeadSub provides a mock function with given fields: _a0 +func (_m *mockNodeClient[CHAIN_ID, HEAD]) SetAliveLoopFinalizedHeadSub(_a0 types.Subscription) { + _m.Called(_a0) +} + +// mockNodeClient_SetAliveLoopFinalizedHeadSub_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetAliveLoopFinalizedHeadSub' +type mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID types.ID, HEAD Head] struct { + *mock.Call +} + +// SetAliveLoopFinalizedHeadSub is a helper method to define mock.On call +// - _a0 types.Subscription +func (_e *mockNodeClient_Expecter[CHAIN_ID, HEAD]) SetAliveLoopFinalizedHeadSub(_a0 interface{}) *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD] { + return &mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("SetAliveLoopFinalizedHeadSub", _a0)} +} + +func (_c *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD]) Run(run func(_a0 types.Subscription)) *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Subscription)) + }) + return _c +} + +func (_c *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD]) Return() *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD] { + _c.Call.Return() + return _c +} + +func (_c *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(types.Subscription)) *mockNodeClient_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, HEAD] { + _c.Call.Return(run) + return _c +} + // SetAliveLoopSub provides a mock function with given fields: _a0 func (_m *mockNodeClient[CHAIN_ID, HEAD]) SetAliveLoopSub(_a0 types.Subscription) { _m.Called(_a0) diff --git a/common/client/mock_rpc_test.go b/common/client/mock_rpc_test.go index 00473c6636..9e25c99dbf 100644 --- a/common/client/mock_rpc_test.go +++ b/common/client/mock_rpc_test.go @@ -1379,6 +1379,39 @@ func (_c *mockRPC_SequenceAt_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, return _c } +// SetAliveLoopFinalizedHeadSub provides a mock function with given fields: _a0 +func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) SetAliveLoopFinalizedHeadSub(_a0 types.Subscription) { + _m.Called(_a0) +} + +// mockRPC_SetAliveLoopFinalizedHeadSub_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetAliveLoopFinalizedHeadSub' +type mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID types.ID, SEQ types.Sequence, ADDR types.Hashable, BLOCK_HASH types.Hashable, TX interface{}, TX_HASH types.Hashable, EVENT interface{}, EVENT_OPS interface{}, TX_RECEIPT types.Receipt[TX_HASH, BLOCK_HASH], FEE feetypes.Fee, HEAD types.Head[BLOCK_HASH], BATCH_ELEM interface{}] struct { + *mock.Call +} + +// SetAliveLoopFinalizedHeadSub is a helper method to define mock.On call +// - _a0 types.Subscription +func (_e *mockRPC_Expecter[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) SetAliveLoopFinalizedHeadSub(_a0 interface{}) *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM] { + return &mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]{Call: _e.mock.On("SetAliveLoopFinalizedHeadSub", _a0)} +} + +func (_c *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) Run(run func(_a0 types.Subscription)) *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Subscription)) + }) + return _c +} + +func (_c *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) Return() *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM] { + _c.Call.Return() + return _c +} + +func (_c *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) RunAndReturn(run func(types.Subscription)) *mockRPC_SetAliveLoopFinalizedHeadSub_Call[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM] { + _c.Call.Return(run) + return _c +} + // SetAliveLoopSub provides a mock function with given fields: _a0 func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) SetAliveLoopSub(_a0 types.Subscription) { _m.Called(_a0) diff --git a/common/client/node_lifecycle.go b/common/client/node_lifecycle.go index 40d9a9ef6e..bc87b25148 100644 --- a/common/client/node_lifecycle.go +++ b/common/client/node_lifecycle.go @@ -119,6 +119,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } defer finalizedHeadsSub.Unsubscribe() + n.rpc.SetAliveLoopFinalizedHeadSub(finalizedHeadsSub.sub) } var pollCh <-chan time.Time diff --git a/common/client/node_lifecycle_test.go b/common/client/node_lifecycle_test.go index 833bccf7f2..a5681eda96 100644 --- a/common/client/node_lifecycle_test.go +++ b/common/client/node_lifecycle_test.go @@ -446,6 +446,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once() rpc.On("SetAliveLoopSub", mock.Anything).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) node := newDialedNode(t, testNodeOpts{ config: testNodeConfig{}, @@ -467,6 +468,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { ch := make(chan Head) rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once() rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() name := "node-" + rand.Str(5) node := newSubscribedNode(t, testNodeOpts{ config: testNodeConfig{}, @@ -501,6 +503,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { ch := make(chan Head) close(ch) rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) node := newSubscribedNode(t, testNodeOpts{ chainConfig: clientMocks.ChainConfig{ @@ -527,6 +530,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { ch := make(chan Head, 1) ch <- head{BlockNumber: 10}.ToMockHead(t) rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observed := logger.TestObserved(t, zap.DebugLevel) noNewFinalizedHeadsThreshold := tests.TestInterval node := newSubscribedNode(t, testNodeOpts{ @@ -560,6 +564,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc := newMockNodeClient[types.ID, Head](t) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observed := logger.TestObserved(t, zap.DebugLevel) noNewFinalizedHeadsThreshold := tests.TestInterval node := newSubscribedNode(t, testNodeOpts{ @@ -593,6 +598,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { sub.On("Err").Return((<-chan error)(errCh)) sub.On("Unsubscribe").Once() rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(nil), sub, nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) node := newSubscribedNode(t, testNodeOpts{ chainConfig: clientMocks.ChainConfig{ @@ -1116,6 +1122,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { outOfSyncSubscription.On("Unsubscribe").Once() ch := make(chan Head) rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), outOfSyncSubscription, nil).Once() + rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once() setupRPCForAliveLoop(t, rpc) diff --git a/common/client/types.go b/common/client/types.go index c9b6a3580e..806a3eaef1 100644 --- a/common/client/types.go +++ b/common/client/types.go @@ -66,6 +66,7 @@ type NodeClient[ ClientVersion(context.Context) (string, error) SubscribersCount() int32 SetAliveLoopSub(types.Subscription) + SetAliveLoopFinalizedHeadSub(types.Subscription) UnsubscribeAllExceptAliveLoop() IsSyncing(ctx context.Context) (bool, error) SubscribeToFinalizedHeads(_ context.Context) (<-chan HEAD, types.Subscription, error) diff --git a/core/chains/evm/client/mocks/rpc_client.go b/core/chains/evm/client/mocks/rpc_client.go index 5567b3f897..695698eb05 100644 --- a/core/chains/evm/client/mocks/rpc_client.go +++ b/core/chains/evm/client/mocks/rpc_client.go @@ -1754,6 +1754,39 @@ func (_c *RPCClient_SequenceAt_Call) RunAndReturn(run func(context.Context, comm return _c } +// SetAliveLoopFinalizedHeadSub provides a mock function with given fields: _a0 +func (_m *RPCClient) SetAliveLoopFinalizedHeadSub(_a0 commontypes.Subscription) { + _m.Called(_a0) +} + +// RPCClient_SetAliveLoopFinalizedHeadSub_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetAliveLoopFinalizedHeadSub' +type RPCClient_SetAliveLoopFinalizedHeadSub_Call struct { + *mock.Call +} + +// SetAliveLoopFinalizedHeadSub is a helper method to define mock.On call +// - _a0 commontypes.Subscription +func (_e *RPCClient_Expecter) SetAliveLoopFinalizedHeadSub(_a0 interface{}) *RPCClient_SetAliveLoopFinalizedHeadSub_Call { + return &RPCClient_SetAliveLoopFinalizedHeadSub_Call{Call: _e.mock.On("SetAliveLoopFinalizedHeadSub", _a0)} +} + +func (_c *RPCClient_SetAliveLoopFinalizedHeadSub_Call) Run(run func(_a0 commontypes.Subscription)) *RPCClient_SetAliveLoopFinalizedHeadSub_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(commontypes.Subscription)) + }) + return _c +} + +func (_c *RPCClient_SetAliveLoopFinalizedHeadSub_Call) Return() *RPCClient_SetAliveLoopFinalizedHeadSub_Call { + _c.Call.Return() + return _c +} + +func (_c *RPCClient_SetAliveLoopFinalizedHeadSub_Call) RunAndReturn(run func(commontypes.Subscription)) *RPCClient_SetAliveLoopFinalizedHeadSub_Call { + _c.Call.Return(run) + return _c +} + // SetAliveLoopSub provides a mock function with given fields: _a0 func (_m *RPCClient) SetAliveLoopSub(_a0 commontypes.Subscription) { _m.Called(_a0) diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index f55c35980d..ea324f6db7 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -137,7 +137,8 @@ type rpcClient struct { subs []ethereum.Subscription // Need to track the aliveLoop subscription, so we do not cancel it when checking lease on the MultiNode - aliveLoopSub ethereum.Subscription + aliveLoopHeadsSub ethereum.Subscription + aliveLoopFinalizedHeadsSub ethereum.Subscription // chStopInFlight can be closed to immediately cancel all in-flight requests on // this rpcClient. Closing and replacing should be serialized through @@ -368,11 +369,18 @@ func (r *rpcClient) unsubscribeAll() { } r.subs = nil } -func (r *rpcClient) SetAliveLoopSub(sub commontypes.Subscription) { +func (r *rpcClient) SetAliveLoopSub(headsSub commontypes.Subscription) { r.stateMu.Lock() defer r.stateMu.Unlock() - r.aliveLoopSub = sub + r.aliveLoopHeadsSub = headsSub +} + +func (r *rpcClient) SetAliveLoopFinalizedHeadSub(finalizedHeads commontypes.Subscription) { + r.stateMu.Lock() + defer r.stateMu.Unlock() + + r.aliveLoopFinalizedHeadsSub = finalizedHeads } // SubscribersCount returns the number of client subscribed to the node @@ -389,7 +397,7 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() { defer r.stateMu.Unlock() for _, s := range r.subs { - if s != r.aliveLoopSub { + if s != r.aliveLoopHeadsSub && s != r.aliveLoopFinalizedHeadsSub { s.Unsubscribe() } } diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index 662c757ffb..7e97bd2aa2 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -318,6 +318,25 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { require.NoError(t, err) checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) }) + t.Run("UnsubscribeAllExceptAliveLoop should keep finalized heads subscription open", func(t *testing.T) { + server := testutils.NewWSServer(t, chainId, serverCallBack) + wsURL := server.WSURL() + + rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 1, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + + _, sub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t)) + require.NoError(t, err) + rpc.SetAliveLoopFinalizedHeadSub(sub) + rpc.UnsubscribeAllExceptAliveLoop() + select { + case <-sub.Err(): + t.Fatal("Expected subscription to remain open") + default: + } + checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) + }) t.Run("Subscription error is properly wrapper", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL()