From eadd48ef7c9d54bac97e50c5dbc9d21754058a56 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 20 Oct 2023 04:28:09 +0800 Subject: [PATCH] fix grpc error judge logic (#27800) Signed-off-by: Wei Liu --- internal/querynodev2/cluster/worker.go | 9 ++++----- internal/querynodev2/cluster/worker_test.go | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/internal/querynodev2/cluster/worker.go b/internal/querynodev2/cluster/worker.go index 9261dac91089d..9791de7547b0b 100644 --- a/internal/querynodev2/cluster/worker.go +++ b/internal/querynodev2/cluster/worker.go @@ -22,8 +22,8 @@ import ( "fmt" "io" + "github.com/cockroachdb/errors" "go.uber.org/zap" - "google.golang.org/grpc/codes" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -31,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -108,7 +107,7 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e ) status, err := w.client.Delete(ctx, req) if err := merr.CheckRPCCall(status, err); err != nil { - if funcutil.IsGrpcErr(err, codes.Unimplemented) { + if errors.Is(err, merr.ErrServiceUnimplemented) { log.Warn("invoke legacy querynode Delete method, ignore error", zap.Error(err)) return nil } @@ -120,7 +119,7 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { ret, err := w.client.SearchSegments(ctx, req) - if err != nil && funcutil.IsGrpcErr(err, codes.Unimplemented) { + if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) { // for compatible with rolling upgrade from version before v2.2.9 return w.client.Search(ctx, req) } @@ -130,7 +129,7 @@ func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRe func (w *remoteWorker) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) { ret, err := w.client.QuerySegments(ctx, req) - if err != nil && funcutil.IsGrpcErr(err, codes.Unimplemented) { + if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) { // for compatible with rolling upgrade from version before v2.2.9 return w.client.Query(ctx, req) } diff --git a/internal/querynodev2/cluster/worker_test.go b/internal/querynodev2/cluster/worker_test.go index 55c0e97f1cf48..084fa3ead1f9b 100644 --- a/internal/querynodev2/cluster/worker_test.go +++ b/internal/querynodev2/cluster/worker_test.go @@ -183,7 +183,7 @@ func (s *RemoteWorkerSuite) TestDelete() { defer func() { s.mockClient.ExpectedCalls = nil }() s.mockClient.EXPECT().Delete(mock.Anything, mock.AnythingOfType("*querypb.DeleteRequest")). - Return(nil, status.Errorf(codes.Unimplemented, "mocked grpc unimplemented")) + Return(nil, merr.WrapErrServiceUnimplemented(status.Errorf(codes.Unimplemented, "mocked grpc unimplemented"))) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -258,7 +258,7 @@ func (s *RemoteWorkerSuite) TestSearch() { grpcErr := status.Error(codes.Unimplemented, "method not implemented") s.mockClient.EXPECT().SearchSegments(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")). - Return(result, grpcErr) + Return(result, merr.WrapErrServiceUnimplemented(grpcErr)) s.mockClient.EXPECT().Search(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")). Return(result, err) @@ -337,7 +337,7 @@ func (s *RemoteWorkerSuite) TestQuery() { grpcErr := status.Error(codes.Unimplemented, "method not implemented") s.mockClient.EXPECT().QuerySegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")). - Return(result, grpcErr) + Return(result, merr.WrapErrServiceUnimplemented(grpcErr)) s.mockClient.EXPECT().Query(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")). Return(result, err)