From e78d3b81fa50dc05d3f7b6c2777a9dbc8a00f1ad Mon Sep 17 00:00:00 2001
From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com>
Date: Tue, 13 Feb 2024 16:57:19 +0100
Subject: [PATCH] do not call an RPC if it's not Alive (#11999)

---
 common/client/multi_node.go      | 26 +++++++++---
 common/client/multi_node_test.go | 70 +++++++++++++++++++++++++++++++-
 2 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/common/client/multi_node.go b/common/client/multi_node.go
index c03c3fbcd61..ed1a2700c71 100644
--- a/common/client/multi_node.go
+++ b/common/client/multi_node.go
@@ -406,6 +406,10 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
 			// main node is used at the end for the return value
 			continue
 		}
+
+		if n.State() != nodeStateAlive {
+			continue
+		}
 		// Parallel call made to all other nodes with ignored return value
 		wg.Add(1)
 		go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
@@ -575,11 +579,14 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
 }
 
 // collectTxResults - refer to SendTransaction comment for implementation details,
-func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) error {
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan sendTxResult) error {
+	if healthyNodesNum == 0 {
+		return ErroringNodeError
+	}
 	// combine context and stop channel to ensure we stop, when signal received
 	ctx, cancel := c.chStop.Ctx(ctx)
 	defer cancel()
-	requiredResults := int(math.Ceil(float64(len(c.nodes)) * sendTxQuorum))
+	requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum))
 	errorsByCode := map[SendTxReturnCode][]error{}
 	var softTimeoutChan <-chan time.Time
 	var resultsCount int
@@ -685,12 +692,16 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
 		return ErroringNodeError
 	}
 
+	healthyNodesNum := 0
 	txResults := make(chan sendTxResult, len(c.nodes))
 	// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
 	ok := c.IfNotStopped(func() {
-		c.wg.Add(len(c.sendonlys))
 		// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
 		for _, n := range c.sendonlys {
+			if n.State() != nodeStateAlive {
+				continue
+			}
+			c.wg.Add(1)
 			go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
 				defer c.wg.Done()
 				c.broadcastTxAsync(ctx, n, tx)
@@ -698,9 +709,14 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
 		}
 
 		var primaryBroadcastWg sync.WaitGroup
-		primaryBroadcastWg.Add(len(c.nodes))
 		txResultsToReport := make(chan sendTxResult, len(c.nodes))
 		for _, n := range c.nodes {
+			if n.State() != nodeStateAlive {
+				continue
+			}
+
+			healthyNodesNum++
+			primaryBroadcastWg.Add(1)
 			go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
 				defer primaryBroadcastWg.Done()
 				result := c.broadcastTxAsync(ctx, n, tx)
@@ -727,7 +743,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
 		return fmt.Errorf("aborted while broadcasting tx - multiNode is stopped: %w", context.Canceled)
 	}
 
-	return c.collectTxResults(ctx, tx, txResults)
+	return c.collectTxResults(ctx, tx, healthyNodesNum, txResults)
 }
 
 // findFirstIn - returns first existing value for the slice of keys
diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go
index 3ffc82572c2..dabaae57c5d 100644
--- a/common/client/multi_node_test.go
+++ b/common/client/multi_node_test.go
@@ -535,8 +535,10 @@ func TestMultiNode_BatchCallContextAll(t *testing.T) {
 		// setup ok and failed auxiliary nodes
 		okNode := newMockSendOnlyNode[types.ID, multiNodeRPCClient](t)
 		okNode.On("RPC").Return(okRPC).Once()
+		okNode.On("State").Return(nodeStateAlive)
 		failedNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
 		failedNode.On("RPC").Return(failedRPC).Once()
+		failedNode.On("State").Return(nodeStateAlive)
 
 		// setup main node
 		mainNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
@@ -557,6 +559,34 @@ func TestMultiNode_BatchCallContextAll(t *testing.T) {
 		require.NoError(t, err)
 		tests.RequireLogMessage(t, observedLogs, "Secondary node BatchCallContext failed")
 	})
+	t.Run("Does not call BatchCallContext for unhealthy nodes", func(t *testing.T) {
+		// setup RPCs
+		okRPC := newMultiNodeRPCClient(t)
+		okRPC.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Twice()
+
+		// setup ok and failed auxiliary nodes
+		healthyNode := newMockSendOnlyNode[types.ID, multiNodeRPCClient](t)
+		healthyNode.On("RPC").Return(okRPC).Once()
+		healthyNode.On("State").Return(nodeStateAlive)
+		deadNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
+		deadNode.On("State").Return(nodeStateUnreachable)
+
+		// setup main node
+		mainNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
+		mainNode.On("RPC").Return(okRPC)
+		nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
+		nodeSelector.On("Select").Return(mainNode).Once()
+		mn := newTestMultiNode(t, multiNodeOpts{
+			selectionMode: NodeSelectionModeRoundRobin,
+			chainID:       types.RandomID(),
+			nodes:         []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{deadNode, mainNode},
+			sendonlys:     []SendOnlyNode[types.ID, multiNodeRPCClient]{healthyNode, deadNode},
+		})
+		mn.nodeSelector = nodeSelector
+
+		err := mn.BatchCallContextAll(tests.Context(t), nil)
+		require.NoError(t, err)
+	})
 }
 
 func TestMultiNode_SendTransaction(t *testing.T) {
@@ -568,15 +598,20 @@ func TestMultiNode_SendTransaction(t *testing.T) {
 		return Successful
 	}
 
-	newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
+	newNodeWithState := func(t *testing.T, state nodeState, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
 		rpc := newMultiNodeRPCClient(t)
 		rpc.On("SendTransaction", mock.Anything, mock.Anything).Return(txErr).Run(sendTxRun).Maybe()
 		node := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
 		node.On("String").Return("node name").Maybe()
 		node.On("RPC").Return(rpc).Maybe()
+		node.On("State").Return(state).Maybe()
 		node.On("Close").Return(nil).Once()
 		return node
 	}
+
+	newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
+		return newNodeWithState(t, nodeStateAlive, txErr, sendTxRun)
+	}
 	newStartedMultiNode := func(t *testing.T, opts multiNodeOpts) testMultiNode {
 		mn := newTestMultiNode(t, opts)
 		err := mn.StartOnce("startedTestMultiNode", func() error { return nil })
@@ -714,6 +749,39 @@ func TestMultiNode_SendTransaction(t *testing.T) {
 		err = mn.SendTransaction(tests.Context(t), nil)
 		require.EqualError(t, err, "aborted while broadcasting tx - multiNode is stopped: context canceled")
 	})
+	t.Run("Returns error if there is no healthy primary nodes", func(t *testing.T) {
+		mn := newStartedMultiNode(t, multiNodeOpts{
+			selectionMode:       NodeSelectionModeRoundRobin,
+			chainID:             types.RandomID(),
+			nodes:               []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{newNodeWithState(t, nodeStateUnreachable, nil, nil)},
+			sendonlys:           []SendOnlyNode[types.ID, multiNodeRPCClient]{newNodeWithState(t, nodeStateUnreachable, nil, nil)},
+			classifySendTxError: classifySendTxError,
+		})
+		err := mn.SendTransaction(tests.Context(t), nil)
+		assert.EqualError(t, err, ErroringNodeError.Error())
+	})
+	t.Run("Transaction success even if one of the nodes is unhealthy", func(t *testing.T) {
+		chainID := types.RandomID()
+		mainNode := newNode(t, nil, nil)
+		unexpectedCall := func(args mock.Arguments) {
+			panic("SendTx must not be called for unhealthy node")
+		}
+		unhealthyNode := newNodeWithState(t, nodeStateUnreachable, nil, unexpectedCall)
+		unhealthySendOnlyNode := newNodeWithState(t, nodeStateUnreachable, nil, unexpectedCall)
+		lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
+		mn := newStartedMultiNode(t, multiNodeOpts{
+			selectionMode:       NodeSelectionModeRoundRobin,
+			chainID:             chainID,
+			nodes:               []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{mainNode, unhealthyNode},
+			sendonlys:           []SendOnlyNode[types.ID, multiNodeRPCClient]{unhealthySendOnlyNode, newNode(t, errors.New("unexpected error"), nil)},
+			classifySendTxError: classifySendTxError,
+			logger:              lggr,
+		})
+		err := mn.SendTransaction(tests.Context(t), nil)
+		require.NoError(t, err)
+		tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2)
+		tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 1)
+	})
 }
 
 func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {