Skip to content

Commit

Permalink
fix grpc error judge logic (#27800)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Oct 19, 2023
1 parent 49516d4 commit eadd48e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
9 changes: 4 additions & 5 deletions internal/querynodev2/cluster/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@ import (
"fmt"
"io"

"github.com/cockroachdb/errors"
"go.uber.org/zap"
"google.golang.org/grpc/codes"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
)

Expand Down Expand Up @@ -108,7 +107,7 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e
)
status, err := w.client.Delete(ctx, req)
if err := merr.CheckRPCCall(status, err); err != nil {
if funcutil.IsGrpcErr(err, codes.Unimplemented) {
if errors.Is(err, merr.ErrServiceUnimplemented) {
log.Warn("invoke legacy querynode Delete method, ignore error", zap.Error(err))
return nil
}
Expand All @@ -120,7 +119,7 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e

func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
ret, err := w.client.SearchSegments(ctx, req)
if err != nil && funcutil.IsGrpcErr(err, codes.Unimplemented) {
if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
// for compatible with rolling upgrade from version before v2.2.9
return w.client.Search(ctx, req)
}
Expand All @@ -130,7 +129,7 @@ func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRe

func (w *remoteWorker) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
ret, err := w.client.QuerySegments(ctx, req)
if err != nil && funcutil.IsGrpcErr(err, codes.Unimplemented) {
if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
// for compatible with rolling upgrade from version before v2.2.9
return w.client.Query(ctx, req)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/querynodev2/cluster/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (s *RemoteWorkerSuite) TestDelete() {
defer func() { s.mockClient.ExpectedCalls = nil }()

s.mockClient.EXPECT().Delete(mock.Anything, mock.AnythingOfType("*querypb.DeleteRequest")).
Return(nil, status.Errorf(codes.Unimplemented, "mocked grpc unimplemented"))
Return(nil, merr.WrapErrServiceUnimplemented(status.Errorf(codes.Unimplemented, "mocked grpc unimplemented")))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *RemoteWorkerSuite) TestSearch() {

grpcErr := status.Error(codes.Unimplemented, "method not implemented")
s.mockClient.EXPECT().SearchSegments(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")).
Return(result, grpcErr)
Return(result, merr.WrapErrServiceUnimplemented(grpcErr))
s.mockClient.EXPECT().Search(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")).
Return(result, err)

Expand Down Expand Up @@ -337,7 +337,7 @@ func (s *RemoteWorkerSuite) TestQuery() {

grpcErr := status.Error(codes.Unimplemented, "method not implemented")
s.mockClient.EXPECT().QuerySegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")).
Return(result, grpcErr)
Return(result, merr.WrapErrServiceUnimplemented(grpcErr))
s.mockClient.EXPECT().Query(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")).
Return(result, err)

Expand Down

0 comments on commit eadd48e

Please sign in to comment.