From 0b532b42ea8efa82d0d7697998e559da02278d44 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 8 Dec 2023 21:12:35 +0800 Subject: [PATCH] enhance: [cherry-pick] Add client connect wrapper to keep connection alive (#29061) Cherry-pick from master pr: #29058 See also #29057 Add wrapper to maintain client&connection When reset operation is needed, close method shall wait until all on-going request return --------- Signed-off-by: Congqi Xia --- internal/util/grpcclient/client.go | 88 ++++++++++++++++--------- internal/util/grpcclient/client_test.go | 10 +-- 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index d9ec46d6a37e7..9a60d59150542 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -51,16 +51,41 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -// GrpcClient abstracts client of grpc -type GrpcClient[T interface { +type GrpcComponent interface { GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) -}] interface { +} + +// clientConnWrapper is the wrapper for client & conn. +type clientConnWrapper[T GrpcComponent] struct { + client T + conn *grpc.ClientConn + mut sync.RWMutex +} + +func (c *clientConnWrapper[T]) Pin() { + c.mut.RLock() +} + +func (c *clientConnWrapper[T]) Unpin() { + c.mut.RUnlock() +} + +func (c *clientConnWrapper[T]) Close() error { + if c.conn != nil { + c.mut.Lock() + defer c.mut.Unlock() + return c.conn.Close() + } + return nil +} + +// GrpcClient abstracts client of grpc +type GrpcClient[T GrpcComponent] interface { SetRole(string) GetRole() string SetGetAddrFunc(func() (string, error)) EnableEncryption() SetNewGrpcClientFunc(func(cc *grpc.ClientConn) T) - GetGrpcClient(ctx context.Context) (T, error) ReCall(ctx context.Context, caller func(client T) (any, error)) (any, error) Call(ctx context.Context, caller func(client T) (any, error)) (any, error) Close() error @@ -76,10 +101,11 @@ type ClientBase[T interface { getAddrFunc func() (string, error) newGrpcClient func(cc *grpc.ClientConn) T - grpcClient T - encryption bool - addr atomic.String - conn *grpc.ClientConn + // grpcClient T + grpcClient *clientConnWrapper[T] + encryption bool + addr atomic.String + // conn *grpc.ClientConn grpcClientMtx sync.RWMutex role string ClientMaxSendSize int @@ -160,7 +186,7 @@ func (c *ClientBase[T]) SetNewGrpcClientFunc(f func(cc *grpc.ClientConn) T) { } // GetGrpcClient returns grpc client -func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (T, error) { +func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (*clientConnWrapper[T], error) { c.grpcClientMtx.RLock() if !generic.IsZero(c.grpcClient) { @@ -178,13 +204,13 @@ func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (T, error) { err := c.connect(ctx) if err != nil { - return generic.Zero[T](), err + return nil, err } return c.grpcClient, nil } -func (c *ClientBase[T]) resetConnection(client T) { +func (c *ClientBase[T]) resetConnection(wrapper *clientConnWrapper[T]) { if time.Since(c.lastReset.Load()) < c.minResetInterval { return } @@ -196,15 +222,16 @@ func (c *ClientBase[T]) resetConnection(client T) { if generic.IsZero(c.grpcClient) { return } - if !generic.Equal(client, c.grpcClient) { + if c.grpcClient != wrapper { return } - if c.conn != nil { - _ = c.conn.Close() - } - c.conn = nil + // wrapper close may block waiting pending request finish + go func(w *clientConnWrapper[T], addr string) { + w.Close() + log.Info("previous client closed", zap.String("role", c.role), zap.String("addr", c.addr.Load())) + }(c.grpcClient, c.addr.Load()) c.addr.Store("") - c.grpcClient = generic.Zero[T]() + c.grpcClient = nil c.lastReset.Store(time.Now()) } @@ -310,14 +337,13 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { if err != nil { return wrapErrConnect(addr, err) } - if c.conn != nil { - _ = c.conn.Close() - } - c.conn = conn c.addr.Store(addr) c.ctxCounter.Store(0) - c.grpcClient = c.newGrpcClient(c.conn) + c.grpcClient = &clientConnWrapper[T]{ + client: c.newGrpcClient(conn), + conn: conn, + } return nil } @@ -408,17 +434,17 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er var ( ret any clientErr error - client T + wrapper *clientConnWrapper[T] ) - client, clientErr = c.GetGrpcClient(ctx) + wrapper, clientErr = c.GetGrpcClient(ctx) if clientErr != nil { log.Warn("fail to get grpc client", zap.Error(clientErr)) } resetClientFunc := func() { - c.resetConnection(client) - client, clientErr = c.GetGrpcClient(ctx) + c.resetConnection(wrapper) + wrapper, clientErr = c.GetGrpcClient(ctx) if clientErr != nil { log.Warn("fail to get grpc client in the retry state", zap.Error(clientErr)) } @@ -427,7 +453,7 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er ctx, cancel := context.WithCancel(ctx) defer cancel() err := retry.Do(ctx, func() error { - if generic.IsZero(client) { + if wrapper == nil { if ok, err := c.checkNodeSessionExist(ctx); !ok { // if session doesn't exist, no need to reset connection for datanode/indexnode/querynode return retry.Unrecoverable(err) @@ -438,8 +464,10 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er resetClientFunc() return err } + wrapper.Pin() var err error - ret, err = caller(client) + ret, err = caller(wrapper.client) + wrapper.Unpin() if err != nil { var needRetry, needReset bool needRetry, needReset, err = c.checkGrpcErr(ctx, err) @@ -533,8 +561,8 @@ func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any, func (c *ClientBase[T]) Close() error { c.grpcClientMtx.Lock() defer c.grpcClientMtx.Unlock() - if c.conn != nil { - return c.conn.Close() + if c.grpcClient != nil { + return c.grpcClient.Close() } return nil } diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index 421e4b0ef9c83..e4a4ee3ea97c7 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -118,7 +118,7 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) { // test querynode/datanode/indexnode/proxy already down, but new node start up with same ip and port base.grpcClientMtx.Lock() - base.grpcClient = &mockClient{} + base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}} base.grpcClientMtx.Unlock() _, err = base.Call(ctx, func(client *mockClient) (any, error) { return struct{}{}, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error()) @@ -127,7 +127,7 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) { // test querynode/datanode/indexnode/proxy down, return unavailable error base.grpcClientMtx.Lock() - base.grpcClient = &mockClient{} + base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}} base.grpcClientMtx.Unlock() _, err = base.Call(ctx, func(client *mockClient) (any, error) { return struct{}{}, status.Errorf(codes.Unavailable, "fake error") @@ -152,7 +152,7 @@ func testCall(t *testing.T, compressed bool) { base.CompressionEnabled = compressed initClient := func() { base.grpcClientMtx.Lock() - base.grpcClient = &mockClient{} + base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}} base.grpcClientMtx.Unlock() } base.MaxAttempts = 1 @@ -292,7 +292,7 @@ func TestClientBase_Recall(t *testing.T) { base := ClientBase[*mockClient]{} initClient := func() { base.grpcClientMtx.Lock() - base.grpcClient = &mockClient{} + base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}} base.grpcClientMtx.Unlock() } base.MaxAttempts = 1 @@ -354,7 +354,7 @@ func TestClientBase_Recall(t *testing.T) { func TestClientBase_CheckGrpcError(t *testing.T) { base := ClientBase[*mockClient]{} - base.grpcClient = &mockClient{} + base.grpcClient = &clientConnWrapper[*mockClient]{client: &mockClient{}} base.MaxAttempts = 1 ctx := context.Background()