Skip to content

Commit

Permalink
enhance: [cherry-pick] Add client connect wrapper to keep connection …
Browse files Browse the repository at this point in the history
…alive (#29061)

Cherry-pick from master
pr: #29058
See also #29057
Add a wrapper that keeps the client and its connection together.
When a reset operation is needed, the wrapper's Close method waits until
all ongoing requests have returned.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Dec 8, 2023
1 parent 273be6c commit 0b532b4
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 35 deletions.
88 changes: 58 additions & 30 deletions internal/util/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,41 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// GrpcClient abstracts client of grpc
type GrpcClient[T interface {
type GrpcComponent interface {
GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error)
}] interface {
}

// clientConnWrapper is the wrapper for client & conn.
// It keeps a typed grpc client together with the connection it was built
// on, and uses a read/write lock so that Close can wait for callers that
// have pinned the wrapper (see Pin / Unpin).
type clientConnWrapper[T GrpcComponent] struct {
// client is the typed grpc client handed to callers.
client T
// conn is the underlying grpc connection released by Close; it may be nil,
// in which case Close does nothing.
conn *grpc.ClientConn
// mut is read-locked by Pin, read-unlocked by Unpin and write-locked by
// Close while the connection is being closed.
mut sync.RWMutex
}

// Pin marks the wrapper as in use by taking its read lock.
// While at least one caller holds a pin, Close (which takes the write lock
// when conn is non-nil) blocks. Every Pin must be paired with an Unpin.
func (c *clientConnWrapper[T]) Pin() {
c.mut.RLock()
}

// Unpin releases the read lock taken by a previous Pin, allowing a pending
// Close to proceed once all other pins are released as well.
// It must only be called after a matching Pin.
func (c *clientConnWrapper[T]) Unpin() {
c.mut.RUnlock()
}

// Close shuts down the wrapped grpc connection.
//
// When the wrapper holds no connection it returns nil immediately without
// touching the lock. Otherwise it takes the write lock, which blocks until
// every caller that pinned the wrapper has unpinned it, and then returns the
// result of closing the connection.
func (c *clientConnWrapper[T]) Close() error {
	// Nothing to release: skip locking entirely.
	if c.conn == nil {
		return nil
	}

	// The write lock is what makes Close wait for in-flight (pinned) requests.
	c.mut.Lock()
	defer c.mut.Unlock()
	return c.conn.Close()
}

// GrpcClient abstracts client of grpc
type GrpcClient[T GrpcComponent] interface {
SetRole(string)
GetRole() string
SetGetAddrFunc(func() (string, error))
EnableEncryption()
SetNewGrpcClientFunc(func(cc *grpc.ClientConn) T)
GetGrpcClient(ctx context.Context) (T, error)
ReCall(ctx context.Context, caller func(client T) (any, error)) (any, error)
Call(ctx context.Context, caller func(client T) (any, error)) (any, error)
Close() error
Expand All @@ -76,10 +101,11 @@ type ClientBase[T interface {
getAddrFunc func() (string, error)
newGrpcClient func(cc *grpc.ClientConn) T

grpcClient T
encryption bool
addr atomic.String
conn *grpc.ClientConn
// grpcClient T
grpcClient *clientConnWrapper[T]
encryption bool
addr atomic.String
// conn *grpc.ClientConn
grpcClientMtx sync.RWMutex
role string
ClientMaxSendSize int
Expand Down Expand Up @@ -160,7 +186,7 @@ func (c *ClientBase[T]) SetNewGrpcClientFunc(f func(cc *grpc.ClientConn) T) {
}

// GetGrpcClient returns grpc client
func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (T, error) {
func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (*clientConnWrapper[T], error) {
c.grpcClientMtx.RLock()

if !generic.IsZero(c.grpcClient) {
Expand All @@ -178,13 +204,13 @@ func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (T, error) {

err := c.connect(ctx)
if err != nil {
return generic.Zero[T](), err
return nil, err
}

return c.grpcClient, nil
}

func (c *ClientBase[T]) resetConnection(client T) {
func (c *ClientBase[T]) resetConnection(wrapper *clientConnWrapper[T]) {
if time.Since(c.lastReset.Load()) < c.minResetInterval {
return
}
Expand All @@ -196,15 +222,16 @@ func (c *ClientBase[T]) resetConnection(client T) {
if generic.IsZero(c.grpcClient) {
return
}
if !generic.Equal(client, c.grpcClient) {
if c.grpcClient != wrapper {
return
}
if c.conn != nil {
_ = c.conn.Close()
}
c.conn = nil
// wrapper close may block waiting pending request finish
go func(w *clientConnWrapper[T], addr string) {
w.Close()
log.Info("previous client closed", zap.String("role", c.role), zap.String("addr", c.addr.Load()))
}(c.grpcClient, c.addr.Load())
c.addr.Store("")
c.grpcClient = generic.Zero[T]()
c.grpcClient = nil
c.lastReset.Store(time.Now())
}

Expand Down Expand Up @@ -310,14 +337,13 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
if err != nil {
return wrapErrConnect(addr, err)
}
if c.conn != nil {
_ = c.conn.Close()
}

c.conn = conn
c.addr.Store(addr)
c.ctxCounter.Store(0)
c.grpcClient = c.newGrpcClient(c.conn)
c.grpcClient = &clientConnWrapper[T]{
client: c.newGrpcClient(conn),
conn: conn,
}
return nil
}

Expand Down Expand Up @@ -408,17 +434,17 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er
var (
ret any
clientErr error
client T
wrapper *clientConnWrapper[T]
)

client, clientErr = c.GetGrpcClient(ctx)
wrapper, clientErr = c.GetGrpcClient(ctx)
if clientErr != nil {
log.Warn("fail to get grpc client", zap.Error(clientErr))
}

resetClientFunc := func() {
c.resetConnection(client)
client, clientErr = c.GetGrpcClient(ctx)
c.resetConnection(wrapper)
wrapper, clientErr = c.GetGrpcClient(ctx)
if clientErr != nil {
log.Warn("fail to get grpc client in the retry state", zap.Error(clientErr))
}
Expand All @@ -427,7 +453,7 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := retry.Do(ctx, func() error {
if generic.IsZero(client) {
if wrapper == nil {
if ok, err := c.checkNodeSessionExist(ctx); !ok {
// if session doesn't exist, no need to reset connection for datanode/indexnode/querynode
return retry.Unrecoverable(err)
Expand All @@ -438,8 +464,10 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er
resetClientFunc()
return err
}
wrapper.Pin()
var err error
ret, err = caller(client)
ret, err = caller(wrapper.client)
wrapper.Unpin()
if err != nil {
var needRetry, needReset bool
needRetry, needReset, err = c.checkGrpcErr(ctx, err)
Expand Down Expand Up @@ -533,8 +561,8 @@ func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any,
func (c *ClientBase[T]) Close() error {
c.grpcClientMtx.Lock()
defer c.grpcClientMtx.Unlock()
if c.conn != nil {
return c.conn.Close()
if c.grpcClient != nil {
return c.grpcClient.Close()
}
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions internal/util/grpcclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) {

// test querynode/datanode/indexnode/proxy already down, but new node start up with same ip and port
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}}
base.grpcClientMtx.Unlock()
_, err = base.Call(ctx, func(client *mockClient) (any, error) {
return struct{}{}, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error())
Expand All @@ -127,7 +127,7 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) {

// test querynode/datanode/indexnode/proxy down, return unavailable error
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}}
base.grpcClientMtx.Unlock()
_, err = base.Call(ctx, func(client *mockClient) (any, error) {
return struct{}{}, status.Errorf(codes.Unavailable, "fake error")
Expand All @@ -152,7 +152,7 @@ func testCall(t *testing.T, compressed bool) {
base.CompressionEnabled = compressed
initClient := func() {
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}}
base.grpcClientMtx.Unlock()
}
base.MaxAttempts = 1
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestClientBase_Recall(t *testing.T) {
base := ClientBase[*mockClient]{}
initClient := func() {
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}}
base.grpcClientMtx.Unlock()
}
base.MaxAttempts = 1
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestClientBase_Recall(t *testing.T) {

func TestClientBase_CheckGrpcError(t *testing.T) {
base := ClientBase[*mockClient]{}
base.grpcClient = &mockClient{}
base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}}
base.MaxAttempts = 1

ctx := context.Background()
Expand Down

0 comments on commit 0b532b4

Please sign in to comment.