Skip to content

Commit

Permalink
pickfirst: Register a health listener when used as a leaf policy (grp…
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Dec 5, 2024
1 parent 5565631 commit 317271b
Show file tree
Hide file tree
Showing 2 changed files with 384 additions and 37 deletions.
173 changes: 136 additions & 37 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func init() {
balancer.Register(pickfirstBuilder{})
}

// enableHealthListenerKeyType is a unique key type used in resolver attributes
// to indicate whether the health listener usage is enabled.
type enableHealthListenerKeyType struct{}

var (
logger = grpclog.Component("pick-first-leaf-lb")
// Name is the name of the pick_first_leaf balancer.
Expand Down Expand Up @@ -109,10 +113,8 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions)
target: bo.Target.String(),
metricsRecorder: bo.MetricsRecorder, // ClientConn will always create a Metrics Recorder.

addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cancelConnectionTimer: func() {},
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
Expand All @@ -131,6 +133,13 @@ func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalan
return cfg, nil
}

// EnableHealthListener updates the state to configure pickfirst for using a
// generic health listener.
func EnableHealthListener(state resolver.State) resolver.State {
state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
return state
}

type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

Expand All @@ -148,15 +157,19 @@ type scData struct {
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
rawConnectivityState connectivity.State
// The effective connectivity state based on raw connectivity, health state
// and after following sticky TransientFailure behaviour defined in A62.
effectiveState connectivity.State
lastErr error
connectionFailedInFirstPass bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
sd := &scData{
state: connectivity.Idle,
addr: addr,
rawConnectivityState: connectivity.Idle,
effectiveState: connectivity.Idle,
addr: addr,
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
Expand All @@ -181,14 +194,17 @@ type pickfirstBalancer struct {
// The mutex is used to ensure synchronization of updates triggered
// from the idle picker and the already serialized resolver,
// SubConn state updates.
mu sync.Mutex
mu sync.Mutex
// State reported to the channel based on SubConn states and resolver
// updates.
state connectivity.State
// scData for active subonns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
cancelConnectionTimer func()
healthCheckingEnabled bool
}

// ResolverError is called by the ClientConn when the name resolver produces
Expand All @@ -214,7 +230,7 @@ func (b *pickfirstBalancer) resolverErrorLocked(err error) {
return
}

b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
Expand All @@ -227,12 +243,12 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.state = connectivity.TransientFailure
b.closeSubConnsLocked()
b.addressList.updateAddrs(nil)
b.resolverErrorLocked(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
Expand Down Expand Up @@ -279,12 +295,15 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
newAddrs = deDupAddresses(newAddrs)
newAddrs = interleaveAddresses(newAddrs)

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
prevAddr := b.addressList.currentAddress()
prevSCData, found := b.subConns.Get(prevAddr)
prevAddrsCount := b.addressList.size()
isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready
b.addressList.updateAddrs(newAddrs)
if b.state == connectivity.Ready && b.addressList.seekTo(prevAddr) {

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
return nil
}

Expand All @@ -296,10 +315,9 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// we should still enter CONNECTING because the sticky TF behaviour
// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
// due to connectivity failures.
if b.state == connectivity.Ready || b.state == connectivity.Connecting || prevAddrsCount == 0 {
if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
// Start connection attempt at first address.
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
b.forceUpdateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
Expand Down Expand Up @@ -501,7 +519,7 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
}

scd := sd.(*scData)
switch scd.state {
switch scd.rawConnectivityState {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
Expand All @@ -519,7 +537,7 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
b.scheduleNextConnectionLocked()
return
default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.state)
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState)
return

}
Expand Down Expand Up @@ -562,16 +580,17 @@ func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
sd.state = newState.ConnectivityState
oldState := sd.rawConnectivityState
sd.rawConnectivityState = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes this
// SubConn.
if activeSD, found := b.subConns.Get(sd.addr); !found || activeSD != sd {
if !b.isActiveSCData(sd) {
return
}
if newState.ConnectivityState == connectivity.Shutdown {
sd.effectiveState = connectivity.Shutdown
return
}

Expand All @@ -590,10 +609,30 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
b.state = connectivity.Ready
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
if !b.healthCheckingEnabled {
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
}

sd.effectiveState = connectivity.Ready
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
return
}
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
}
// Send a CONNECTING update to take the SubConn out of sticky-TF if
// required.
sd.effectiveState = connectivity.Connecting
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
b.updateSubConnHealthState(sd, scs)
})
return
}
Expand All @@ -604,11 +643,13 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
// a transport is successfully created, but the connection fails
// before the SubConn can send the notification for READY. We treat
// this as a successful connection and transition to IDLE.
if (b.state == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
// part of the if condition below once the issue is fixed.
if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.shutdownRemainingLocked(sd)
b.state = connectivity.Idle
sd.effectiveState = newState.ConnectivityState
// READY SubConn interspliced in between CONNECTING and IDLE, need to
// account for that.
if oldState == connectivity.Connecting {
Expand All @@ -619,7 +660,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
}
disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
b.addressList.reset()
b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
})
Expand All @@ -629,19 +670,19 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
if b.firstPass {
switch newState.ConnectivityState {
case connectivity.Connecting:
// The balancer can be in either IDLE, CONNECTING or
// TRANSIENT_FAILURE. If it's in TRANSIENT_FAILURE, stay in
// The effective state can be in either IDLE, CONNECTING or
// TRANSIENT_FAILURE. If it's TRANSIENT_FAILURE, stay in
// TRANSIENT_FAILURE until it's READY. See A62.
// If the balancer is already in CONNECTING, no update is needed.
if b.state == connectivity.Idle {
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
if sd.effectiveState != connectivity.TransientFailure {
sd.effectiveState = connectivity.Connecting
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
}
case connectivity.TransientFailure:
sd.lastErr = newState.ConnectionError
sd.effectiveState = connectivity.TransientFailure
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
// a pass over the previous address list. Happy Eyeballs will also
Expand All @@ -668,7 +709,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
b.numTF = (b.numTF + 1) % b.subConns.Len()
sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: newState.ConnectionError},
})
Expand Down Expand Up @@ -698,21 +739,79 @@ func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
}
}
b.firstPass = false
b.state = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: lastErr},
})
// Start re-connecting all the SubConns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.state == connectivity.Idle {
if sd.rawConnectivityState == connectivity.Idle {
sd.subConn.Connect()
}
}
}

func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {
activeSD, found := b.subConns.Get(sd.addr)
return found && activeSD == sd
}

func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes
// this SubConn.
if !b.isActiveSCData(sd) {
return
}
sd.effectiveState = state.ConnectivityState
switch state.ConnectivityState {
case connectivity.Ready:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
case connectivity.TransientFailure:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
})
case connectivity.Connecting:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
default:
b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)
}
}

// updateBalancerState stores the state reported to the channel and calls
// ClientConn.UpdateState(). As an optimization, it avoids sending duplicate
// updates to the channel.
func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
// In case of TransientFailures allow the picker to be updated to update
// the connectivity error, in all other cases don't send duplicate state
// updates.
if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
return
}
b.forceUpdateConcludedStateLocked(newState)
}

// forceUpdateConcludedStateLocked stores the state reported to the channel and
// calls ClientConn.UpdateState().
// A separate function is defined to force update the ClientConn state since the
// channel doesn't correctly assume that LB policies start in CONNECTING and
// relies on LB policy to send an initial CONNECTING update.
func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
b.state = newState.ConnectivityState
b.cc.UpdateState(newState)
}

type picker struct {
result balancer.PickResult
err error
Expand Down
Loading

0 comments on commit 317271b

Please sign in to comment.