Force RPC switching interval (#10726)
* Initial draft

* - Fix tests
- Add subscriber count

* - Fix resolver test

* - Update SubscribersCount check

* - Update CHANGELOG.md

* - Fix typo in CHANGELOG.md

* - Fix typo in CONFIG.md

* - Update naming

* - Update config comment

* - Fix data race in test

* - Update LeaseDuration definition
- Add read lock for SubscribersCount
- Change log level when LeaseDuration is disabled

* - Add waitgroup
- Reset lease when new best node is selected

* - Don't cancel alive loop subscription
- Change active node when doing lease check

* - Update config description
george-dorin authored Oct 2, 2023
1 parent 07d4038 commit 0adc90c
Showing 25 changed files with 261 additions and 9 deletions.
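This commit introduces a NodePool.LeaseDuration setting. When it is greater than 0s and the node selection mode is anything other than RoundRobin, the pool runs a background lease loop: on every tick it asks the node selector for the current best node, force-unsubscribes clients from every other live node that has more than one subscriber (the extra one being the node's own alive-loop subscription), and promotes the best node to active. The fallback default of 0s leaves the behavior disabled. Below is a minimal sketch of how a caller might wire the new leaseDuration argument through NewPool, based on the call sites in this diff; the import paths assume the repo's v2 module layout, the selection mode "PriorityLevel" is taken from the new test, and lggr, nodes, sendonlys, and chainID are placeholder inputs rather than anything prescribed by the commit.

package main

import (
	"context"
	"math/big"
	"time"

	evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// dialPool wires the new leaseDuration argument through NewPool, mirroring
// the call sites in this diff. Sketch only, not a definitive wiring.
func dialPool(ctx context.Context, lggr logger.Logger, nodes []evmclient.Node,
	sendonlys []evmclient.SendOnlyNode, chainID *big.Int) (*evmclient.Pool, error) {
	leaseDuration := 10 * time.Minute // 0s (the fallback default) disables forced switching
	noNewHeadsThreshold := 3 * time.Minute
	p := evmclient.NewPool(lggr, "PriorityLevel", leaseDuration, noNewHeadsThreshold,
		nodes, sendonlys, chainID, "")
	if err := p.Dial(ctx); err != nil { // Dial starts runLoop and, when enabled, checkLeaseLoop
		return nil, err
	}
	return p, nil
}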
2 changes: 1 addition & 1 deletion core/chains/evm/chain.go
@@ -484,7 +484,7 @@ func newEthClientFromChain(cfg evmconfig.NodePool, noNewHeadsThreshold time.Dura
primaries = append(primaries, primary)
}
}
- return evmclient.NewClientWithNodes(lggr, cfg.SelectionMode(), noNewHeadsThreshold, primaries, sendonlys, chainID, chainType)
+ return evmclient.NewClientWithNodes(lggr, cfg.SelectionMode(), cfg.LeaseDuration(), noNewHeadsThreshold, primaries, sendonlys, chainID, chainType)
}

func newPrimary(cfg evmconfig.NodePool, noNewHeadsThreshold time.Duration, lggr logger.Logger, n *toml.Node, id int32, chainID *big.Int) (evmclient.Node, error) {
4 changes: 2 additions & 2 deletions core/chains/evm/client/client.go
@@ -108,8 +108,8 @@ var _ htrktypes.Client[*evmtypes.Head, ethereum.Subscription, *big.Int, common.H

// NewClientWithNodes instantiates a client from a list of nodes
// Currently only supports one primary
- func NewClientWithNodes(logger logger.Logger, selectionMode string, noNewHeadsThreshold time.Duration, primaryNodes []Node, sendOnlyNodes []SendOnlyNode, chainID *big.Int, chainType config.ChainType) (*client, error) {
- pool := NewPool(logger, selectionMode, noNewHeadsThreshold, primaryNodes, sendOnlyNodes, chainID, chainType)
+ func NewClientWithNodes(logger logger.Logger, selectionMode string, leaseDuration time.Duration, noNewHeadsThreshold time.Duration, primaryNodes []Node, sendOnlyNodes []SendOnlyNode, chainID *big.Int, chainType config.ChainType) (*client, error) {
+ pool := NewPool(logger, selectionMode, leaseDuration, noNewHeadsThreshold, primaryNodes, sendOnlyNodes, chainID, chainType)
return &client{
logger: logger,
pool: pool,
6 changes: 6 additions & 0 deletions core/chains/evm/client/erroring_node.go
@@ -20,6 +20,12 @@ type erroringNode struct {
errMsg string
}

func (e *erroringNode) UnsubscribeAllExceptAliveLoop() {}

func (e *erroringNode) SubscribersCount() int32 {
return 0
}

func (e *erroringNode) ChainID() (chainID *big.Int) { return nil }

func (e *erroringNode) Start(ctx context.Context) error { return errors.New(e.errMsg) }
6 changes: 5 additions & 1 deletion core/chains/evm/client/helpers_test.go
@@ -19,12 +19,16 @@ type TestNodePoolConfig struct {
NodePollInterval time.Duration
NodeSelectionMode string
NodeSyncThreshold uint32
NodeLeaseDuration time.Duration
}

func (tc TestNodePoolConfig) PollFailureThreshold() uint32 { return tc.NodePollFailureThreshold }
func (tc TestNodePoolConfig) PollInterval() time.Duration { return tc.NodePollInterval }
func (tc TestNodePoolConfig) SelectionMode() string { return tc.NodeSelectionMode }
func (tc TestNodePoolConfig) SyncThreshold() uint32 { return tc.NodeSyncThreshold }
func (tc TestNodePoolConfig) LeaseDuration() time.Duration {
return tc.NodeLeaseDuration
}

func NewClientWithTestNode(t *testing.T, nodePoolCfg config.NodePool, noNewHeadsThreshold time.Duration, rpcUrl string, rpcHTTPURL *url.URL, sendonlyRPCURLs []url.URL, id int32, chainID *big.Int) (*client, error) {
parsed, err := url.ParseRequestURI(rpcUrl)
@@ -50,7 +54,7 @@ func NewClientWithTestNode(t *testing.T, nodePoolCfg config.NodePool, noNewHeads
sendonlys = append(sendonlys, s)
}

- pool := NewPool(lggr, nodePoolCfg.SelectionMode(), noNewHeadsThreshold, primaries, sendonlys, chainID, "")
+ pool := NewPool(lggr, nodePoolCfg.SelectionMode(), nodePoolCfg.LeaseDuration(), noNewHeadsThreshold, primaries, sendonlys, chainID, "")
c := &client{logger: lggr, pool: pool}
t.Cleanup(c.Close)
return c, nil
25 changes: 25 additions & 0 deletions core/chains/evm/client/node.go
@@ -94,6 +94,8 @@ type Node interface {
Name() string
ChainID() *big.Int
Order() int32
SubscribersCount() int32
UnsubscribeAllExceptAliveLoop()

CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
@@ -153,6 +155,9 @@ type node struct {
// close the underlying subscription
subs []ethereum.Subscription

// Need to track the aliveLoop subscription, so we do not cancel it when checking lease
aliveLoopSub ethereum.Subscription

// chStopInFlight can be closed to immediately cancel all in-flight requests on
// this node. Closing and replacing should be serialized through
// stateMu since it can happen on state transitions as well as node Close.
@@ -380,6 +385,26 @@ func (n *node) disconnectAll() {
n.unsubscribeAll()
}

// SubscribersCount returns the number of clients subscribed to the node
func (n *node) SubscribersCount() int32 {
n.stateMu.RLock()
defer n.stateMu.RUnlock()
return int32(len(n.subs))
}

// UnsubscribeAllExceptAliveLoop disconnects all subscriptions to the node except the alive loop subscription
// while holding the n.stateMu lock
func (n *node) UnsubscribeAllExceptAliveLoop() {
n.stateMu.Lock()
defer n.stateMu.Unlock()

for _, s := range n.subs {
if s != n.aliveLoopSub {
s.Unsubscribe()
}
}
}

// cancelInflightRequests closes and replaces the chStopInFlight
// WARNING: NOT THREAD-SAFE
// This must be called from within the n.stateMu lock
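The locking discipline in the two new node methods is the point: subs is only read or mutated under stateMu, and the alive-loop subscription is deliberately skipped so the node's own health monitoring survives a forced switch. The following self-contained sketch distills that bookkeeping with a toy subscription type standing in for ethereum.Subscription (an illustrative assumption, not the repo's type):

package main

import (
	"fmt"
	"sync"
)

// toySub stands in for ethereum.Subscription; only Unsubscribe matters here.
type toySub struct{ id int }

func (s *toySub) Unsubscribe() { fmt.Printf("sub %d closed\n", s.id) }

type node struct {
	stateMu      sync.RWMutex
	subs         []*toySub
	aliveLoopSub *toySub // health-check subscription that must stay open
}

// SubscribersCount mirrors the new read-locked accessor.
func (n *node) SubscribersCount() int32 {
	n.stateMu.RLock()
	defer n.stateMu.RUnlock()
	return int32(len(n.subs))
}

// UnsubscribeAllExceptAliveLoop mirrors the new method: every subscription
// except the alive-loop one is closed, under the write lock.
func (n *node) UnsubscribeAllExceptAliveLoop() {
	n.stateMu.Lock()
	defer n.stateMu.Unlock()
	for _, s := range n.subs {
		if s != n.aliveLoopSub {
			s.Unsubscribe()
		}
	}
}

func main() {
	alive := &toySub{id: 0}
	n := &node{subs: []*toySub{alive, {id: 1}, {id: 2}}, aliveLoopSub: alive}
	fmt.Println(n.SubscribersCount()) // 3
	n.UnsubscribeAllExceptAliveLoop() // closes subs 1 and 2, keeps the alive loop
}

Note that the count includes the alive-loop subscription itself, which is why the pool's lease check (below) only acts on nodes reporting SubscribersCount() > 1.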
52 changes: 50 additions & 2 deletions core/chains/evm/client/pool.go
@@ -51,6 +51,7 @@ type NodeSelector interface {
type PoolConfig interface {
NodeSelectionMode() string
NodeNoNewHeadsThreshold() time.Duration
LeaseDuration() time.Duration
}

// Pool represents an abstraction over one or more primary nodes
@@ -65,6 +66,8 @@ type Pool struct {
selectionMode string
noNewHeadsThreshold time.Duration
nodeSelector NodeSelector
leaseDuration time.Duration
leaseTicker *time.Ticker

activeMu sync.RWMutex
activeNode Node
@@ -73,7 +76,7 @@
wg sync.WaitGroup
}

- func NewPool(logger logger.Logger, selectionMode string, noNewHeadsTreshold time.Duration, nodes []Node, sendonlys []SendOnlyNode, chainID *big.Int, chainType config.ChainType) *Pool {
+ func NewPool(logger logger.Logger, selectionMode string, leaseDuration time.Duration, noNewHeadsTreshold time.Duration, nodes []Node, sendonlys []SendOnlyNode, chainID *big.Int, chainType config.ChainType) *Pool {
if chainID == nil {
panic("chainID is required")
}
@@ -105,6 +108,7 @@ func NewPool(logger logger.Logger, selectionMode string, noNewHeadsTreshold time
noNewHeadsThreshold: noNewHeadsTreshold,
nodeSelector: nodeSelector,
chStop: make(chan struct{}),
leaseDuration: leaseDuration,
}

p.logger.Debugf("The pool is configured to use NodeSelectionMode: %s", selectionMode)
@@ -150,6 +154,14 @@ func (p *Pool) Dial(ctx context.Context) error {
p.wg.Add(1)
go p.runLoop()

if p.leaseDuration.Seconds() > 0 && p.selectionMode != NodeSelectionMode_RoundRobin {
p.logger.Infof("The pool will switch to best node every %s", p.leaseDuration.String())
p.wg.Add(1)
go p.checkLeaseLoop()
} else {
p.logger.Info("Best node switching is disabled")
}

return nil
})
}
@@ -172,6 +184,39 @@ func (p *Pool) nLiveNodes() (nLiveNodes int, blockNumber int64, totalDifficulty
return
}

func (p *Pool) checkLease() {
bestNode := p.nodeSelector.Select()
for _, n := range p.nodes {
// Terminate client subscriptions. Services are responsible for reconnecting, which will be routed to the new
// best node. Only terminate connections with more than 1 subscription to account for the aliveLoop subscription
if n.State() == NodeStateAlive && n != bestNode && n.SubscribersCount() > 1 {
p.logger.Infof("Switching to best node from %q to %q", n.String(), bestNode.String())
n.UnsubscribeAllExceptAliveLoop()
}
}

if bestNode != p.activeNode {
p.activeMu.Lock()
p.activeNode = bestNode
p.activeMu.Unlock()
}
}

func (p *Pool) checkLeaseLoop() {
defer p.wg.Done()
p.leaseTicker = time.NewTicker(p.leaseDuration)
defer p.leaseTicker.Stop()

for {
select {
case <-p.leaseTicker.C:
p.checkLease()
case <-p.chStop:
return
}
}
}

func (p *Pool) runLoop() {
defer p.wg.Done()

@@ -271,6 +316,9 @@ func (p *Pool) selectNode() (node Node) {
return &erroringNode{errMsg: errmsg.Error()}
}

if p.leaseTicker != nil {
p.leaseTicker.Reset(p.leaseDuration)
}
return p.activeNode
}

@@ -317,7 +365,7 @@ func (p *Pool) BatchCallContextAll(ctx context.Context, b []rpc.BatchElem) error
return main.BatchCallContext(ctx, b)
}

- // Wrapped Geth client methods
+ // SendTransaction wrapped Geth client methods
func (p *Pool) SendTransaction(ctx context.Context, tx *types.Transaction) error {
main := p.selectNode()
var all []SendOnlyNode
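checkLease relies on an implicit contract with subscribers: a forced Unsubscribe surfaces on the subscription's Err() channel, the service resubscribes, and the new subscription is routed through selectNode to the freshly promoted best node (which also resets the lease ticker). Below is a hedged sketch of that subscriber side, assuming the client interface's SubscribeNewHead method and the Head type from core/chains/evm/types; the retry handling is illustrative only.

// Assumed imports: context, time, evmclient "core/chains/evm/client",
// evmtypes "core/chains/evm/types".
// watchHeads resubscribes whenever its subscription is terminated, e.g. by
// UnsubscribeAllExceptAliveLoop firing during a lease check. Sketch only.
func watchHeads(ctx context.Context, c evmclient.Client) {
	for ctx.Err() == nil {
		heads := make(chan *evmtypes.Head)
		sub, err := c.SubscribeNewHead(ctx, heads)
		if err != nil {
			time.Sleep(time.Second) // naive retry; real services back off
			continue
		}
	recv:
		for {
			select {
			case <-ctx.Done():
				sub.Unsubscribe()
				return
			case <-sub.Err(): // fires when the pool force-unsubscribes this node
				break recv // outer loop resubscribes via the new best node
			case h := <-heads:
				_ = h // process the new head
			}
		}
	}
}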
70 changes: 67 additions & 3 deletions core/chains/evm/client/pool_test.go
@@ -5,6 +5,7 @@ import (
"math/big"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"

@@ -27,6 +28,7 @@
type poolConfig struct {
selectionMode string
noNewHeadsThreshold time.Duration
leaseDuration time.Duration
}

func (c poolConfig) NodeSelectionMode() string {
@@ -37,9 +39,14 @@ func (c poolConfig) NodeNoNewHeadsThreshold() time.Duration {
return c.noNewHeadsThreshold
}

func (c poolConfig) LeaseDuration() time.Duration {
return c.leaseDuration
}

var defaultConfig evmclient.PoolConfig = &poolConfig{
selectionMode: evmclient.NodeSelectionMode_RoundRobin,
noNewHeadsThreshold: 0,
leaseDuration: time.Second * 0,
}

func TestPool_Dial(t *testing.T) {
@@ -157,7 +164,7 @@
for i, n := range test.sendNodes {
sendNodes[i] = n.newSendOnlyNode(t, test.sendNodeChainID)
}
- p := evmclient.NewPool(logger.TestLogger(t), defaultConfig.NodeSelectionMode(), time.Second*0, nodes, sendNodes, test.poolChainID, "")
+ p := evmclient.NewPool(logger.TestLogger(t), defaultConfig.NodeSelectionMode(), defaultConfig.LeaseDuration(), time.Second*0, nodes, sendNodes, test.poolChainID, "")
err := p.Dial(ctx)
if err == nil {
t.Cleanup(func() { assert.NoError(t, p.Close()) })
@@ -250,7 +257,7 @@ func TestUnit_Pool_RunLoop(t *testing.T) {
nodes := []evmclient.Node{n1, n2, n3}

lggr, observedLogs := logger.TestLoggerObserved(t, zap.ErrorLevel)
- p := evmclient.NewPool(lggr, defaultConfig.NodeSelectionMode(), time.Second*0, nodes, []evmclient.SendOnlyNode{}, &cltest.FixtureChainID, "")
+ p := evmclient.NewPool(lggr, defaultConfig.NodeSelectionMode(), defaultConfig.LeaseDuration(), time.Second*0, nodes, []evmclient.SendOnlyNode{}, &cltest.FixtureChainID, "")

n1.On("String").Maybe().Return("n1")
n2.On("String").Maybe().Return("n2")
@@ -324,9 +331,66 @@ func TestUnit_Pool_BatchCallContextAll(t *testing.T) {
sendonlys = append(sendonlys, s)
}

- p := evmclient.NewPool(logger.TestLogger(t), defaultConfig.NodeSelectionMode(), time.Second*0, nodes, sendonlys, &cltest.FixtureChainID, "")
+ p := evmclient.NewPool(logger.TestLogger(t), defaultConfig.NodeSelectionMode(), defaultConfig.LeaseDuration(), time.Second*0, nodes, sendonlys, &cltest.FixtureChainID, "")

assert.True(t, p.ChainType().IsValid())
assert.False(t, p.ChainType().IsL2())
require.NoError(t, p.BatchCallContextAll(ctx, b))
}

func TestUnit_Pool_LeaseDuration(t *testing.T) {
t.Parallel()

n1 := evmmocks.NewNode(t)
n2 := evmmocks.NewNode(t)
nodes := []evmclient.Node{n1, n2}
type nodeStateSwitch struct {
isAlive bool
mu sync.RWMutex
}

nodeSwitch := nodeStateSwitch{
isAlive: true,
mu: sync.RWMutex{},
}

n1.On("String").Maybe().Return("n1")
n2.On("String").Maybe().Return("n2")
n1.On("Close").Maybe().Return(nil)
n2.On("Close").Maybe().Return(nil)
n2.On("UnsubscribeAllExceptAliveLoop").Return()
n2.On("SubscribersCount").Return(int32(2))

n1.On("Start", mock.Anything).Return(nil).Once()
n1.On("State").Return(func() evmclient.NodeState {
nodeSwitch.mu.RLock()
defer nodeSwitch.mu.RUnlock()
if nodeSwitch.isAlive {
return evmclient.NodeStateAlive
}
return evmclient.NodeStateOutOfSync
})
n1.On("Order").Return(int32(1))
n1.On("ChainID").Return(testutils.FixtureChainID).Once()

n2.On("Start", mock.Anything).Return(nil).Once()
n2.On("State").Return(evmclient.NodeStateAlive)
n2.On("Order").Return(int32(2))
n2.On("ChainID").Return(testutils.FixtureChainID).Once()

lggr, observedLogs := logger.TestLoggerObserved(t, zap.InfoLevel)
p := evmclient.NewPool(lggr, "PriorityLevel", time.Second*2, time.Second*0, nodes, []evmclient.SendOnlyNode{}, &cltest.FixtureChainID, "")
require.NoError(t, p.Dial(testutils.Context(t)))
t.Cleanup(func() { assert.NoError(t, p.Close()) })

testutils.WaitForLogMessage(t, observedLogs, "The pool will switch to best node every 2s")
nodeSwitch.mu.Lock()
nodeSwitch.isAlive = false
nodeSwitch.mu.Unlock()
testutils.WaitForLogMessage(t, observedLogs, "At least one EVM primary node is dead")
nodeSwitch.mu.Lock()
nodeSwitch.isAlive = true
nodeSwitch.mu.Unlock()
testutils.WaitForLogMessage(t, observedLogs, `Switching to best node from "n2" to "n1"`)

}
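The mocked State() above returns a function so the liveness flag is re-read on every call, and the test guards the flag with an RWMutex (the "Fix data race in test" item from the commit message). A hypothetical alternative on Go 1.19+ is atomic.Bool from sync/atomic, which drops the explicit lock; this is a variant sketch, not what the commit uses:

// Assumed import: "sync/atomic".
var isAlive atomic.Bool
isAlive.Store(true)

n1.On("State").Return(func() evmclient.NodeState {
	if isAlive.Load() { // atomic read; no mutex needed
		return evmclient.NodeStateAlive
	}
	return evmclient.NodeStateOutOfSync
})

// ...later, flip the state without any explicit locking:
isAlive.Store(false)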
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped_node_pool.go
@@ -25,3 +25,7 @@ func (n *nodePoolConfig) SelectionMode() string {
func (n *nodePoolConfig) SyncThreshold() uint32 {
return *n.c.SyncThreshold
}

func (n *nodePoolConfig) LeaseDuration() time.Duration {
return n.c.LeaseDuration.Duration()
}
1 change: 1 addition & 0 deletions core/chains/evm/config/config.go
@@ -125,6 +125,7 @@ type NodePool interface {
PollInterval() time.Duration
SelectionMode() string
SyncThreshold() uint32
LeaseDuration() time.Duration
}

// TODO BCF-2509 does the chainscopedconfig really need the entire app config?
4 changes: 4 additions & 0 deletions core/chains/evm/config/toml/config.go
@@ -689,6 +689,7 @@ type NodePool struct {
PollInterval *models.Duration
SelectionMode *string
SyncThreshold *uint32
LeaseDuration *models.Duration
}

func (p *NodePool) setFrom(f *NodePool) {
@@ -704,6 +705,9 @@ func (p *NodePool) setFrom(f *NodePool) {
if v := f.SyncThreshold; v != nil {
p.SyncThreshold = v
}
if v := f.LeaseDuration; v != nil {
p.LeaseDuration = v
}
}

type OCR struct {
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
@@ -58,6 +58,7 @@ PollFailureThreshold = 5
PollInterval = '10s'
SelectionMode = 'HighestHead'
SyncThreshold = 5
LeaseDuration = '0s'

[OCR]
ContractConfirmations = 4