diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 11c531e69b491..3d3dba2537d70 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -24,7 +24,6 @@ import ( "sync" "time" - "github.com/cockroachdb/errors" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" clientv3 "go.etcd.io/etcd/client/v3" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -50,6 +49,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/interceptor" "github.com/milvus-io/milvus/pkg/util/logutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" ) @@ -326,11 +326,8 @@ func (s *Server) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannel } func (s *Server) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { - if s.datanode.GetStateCode() != commonpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "DataNode isn't healthy.", - }, errors.New("DataNode is not ready yet") + if err := merr.CheckHealthy(s.datanode.GetStateCode()); err != nil { + return merr.Status(err), nil } return s.datanode.FlushSegments(ctx, req) } diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go index 86ca106699338..4f910c0af1854 100644 --- a/internal/distributed/datanode/service_test.go +++ b/internal/distributed/datanode/service_test.go @@ -262,7 +262,7 @@ func Test_NewServer(t *testing.T) { status: &commonpb.Status{}, } states, err := server.FlushSegments(ctx, nil) - assert.Error(t, err) + assert.NoError(t, err) assert.NotNil(t, states) }) diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index c06afaa897eb4..671600c46e7bb 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -868,19 +868,13 @@ func (s *Server) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasReq func (s *Server) DescribeAlias(ctx context.Context, request *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) { return &milvuspb.DescribeAliasResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "TODO: implement me", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("DescribeAlias unimplemented")), }, nil } func (s *Server) ListAliases(ctx context.Context, request *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) { return &milvuspb.ListAliasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "TODO: implement me", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("ListAliases unimplemented")), }, nil } @@ -1070,19 +1064,13 @@ func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou func (s *Server) ListIndexedSegment(ctx context.Context, req *federpb.ListIndexedSegmentRequest) (*federpb.ListIndexedSegmentResponse, error) { return &federpb.ListIndexedSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "not implemented", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("ListIndexedSegment unimplemented")), }, nil } func (s *Server) DescribeSegmentIndexData(ctx context.Context, req *federpb.DescribeSegmentIndexDataRequest) (*federpb.DescribeSegmentIndexDataResponse, error) { return &federpb.DescribeSegmentIndexDataResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "not implemented", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("DescribeSegmentIndexData unimplemented")), }, nil } diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go index 359719b5c6738..47f68e722ae7a 100644 --- a/internal/indexnode/indexnode_service.go +++ b/internal/indexnode/indexnode_service.go @@ -42,19 +42,20 @@ import ( ) func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) { + log := log.Ctx(ctx).With( + zap.String("clusterID", req.GetClusterID()), + zap.Int64("indexBuildID", req.GetBuildID()), + ) + if !i.lifetime.Add(commonpbutil.IsHealthy) { stateCode := i.lifetime.GetState() - log.Ctx(ctx).Warn("index node not ready", + log.Warn("index node not ready", zap.String("state", stateCode.String()), - zap.String("clusterID", req.GetClusterID()), - zap.Int64("indexBuildID", req.GetBuildID()), ) return merr.Status(merr.WrapErrServiceNotReady(stateCode.String())), nil } defer i.lifetime.Done() - log.Ctx(ctx).Info("IndexNode building index ...", - zap.String("clusterID", req.GetClusterID()), - zap.Int64("indexBuildID", req.GetBuildID()), + log.Info("IndexNode building index ...", zap.Int64("indexID", req.GetIndexID()), zap.String("indexName", req.GetIndexName()), zap.String("indexFilePrefix", req.GetIndexFilePrefix()), @@ -77,26 +78,20 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest cancel: taskCancel, state: commonpb.IndexState_InProgress, }); oldInfo != nil { - log.Ctx(ctx).Warn("duplicated index build task", zap.String("clusterID", req.GetClusterID()), zap.Int64("buildID", req.GetBuildID())) + err := merr.WrapErrIndexDuplicate(req.GetIndexName(), "building index task existed") + log.Warn("duplicated index build task", zap.Error(err)) metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_BuildIndexError, - Reason: "duplicated index build task", - }, nil + return merr.Status(err), nil } cm, err := i.storageFactory.NewChunkManager(i.loopCtx, req.GetStorageConfig()) if err != nil { - log.Ctx(ctx).Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()), + log.Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()), zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()), - zap.String("clusterID", req.GetClusterID()), zap.Int64("indexBuildID", req.GetBuildID()), zap.Error(err), ) i.deleteTaskInfos(ctx, []taskKey{{ClusterID: req.GetClusterID(), BuildID: req.GetBuildID()}}) metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_BuildIndexError, - Reason: "create chunk manager failed, error: " + err.Error(), - }, nil + return merr.Status(err), nil } task := &indexBuildTask{ ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID), @@ -113,15 +108,15 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest } ret := merr.Status(nil) if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil { - log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", req.GetBuildID()), - zap.String("clusterID", req.GetClusterID()), zap.Error(err)) + log.Warn("IndexNode failed to schedule", + zap.Error(err)) ret = merr.Status(err) metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc() return ret, nil } metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc() - log.Ctx(ctx).Info("IndexNode successfully scheduled", zap.Int64("indexBuildID", req.GetBuildID()), - zap.String("clusterID", req.GetClusterID()), zap.String("indexName", req.GetIndexName())) + log.Info("IndexNode successfully scheduled", + zap.String("indexName", req.GetIndexName())) return ret, nil } @@ -253,7 +248,6 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: msgIndexNodeIsUnhealthy(paramtable.GetNodeID()), }, - Response: "", }, nil } defer i.lifetime.Done() @@ -266,8 +260,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: merr.Status(err), - Response: "", + Status: merr.Status(err), }, nil } @@ -289,10 +282,6 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ zap.String("metricType", metricType)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, - Response: "", + Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), }, nil } diff --git a/internal/indexnode/indexnode_service_test.go b/internal/indexnode/indexnode_service_test.go index 26a632f004a56..b75b148629d99 100644 --- a/internal/indexnode/indexnode_service_test.go +++ b/internal/indexnode/indexnode_service_test.go @@ -92,8 +92,7 @@ func TestGetMetricsError(t *testing.T) { } resp, err = in.GetMetrics(ctx, unsupportedReq) assert.NoError(t, err) - assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError) - assert.Equal(t, resp.GetStatus().GetReason(), metricsinfo.MsgUnimplementedMetric) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrMetricNotFound) } func TestMockFieldData(t *testing.T) { diff --git a/internal/proxy/error.go b/internal/proxy/error.go deleted file mode 100644 index 88558c3ad00a3..0000000000000 --- a/internal/proxy/error.go +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package proxy - -import ( - "fmt" - - "github.com/cockroachdb/errors" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" -) - -// Keep this error temporarily -// this error belongs to ErrServiceMemoryLimitExceeded -// but in the error returned by querycoord,the collection id is given -// which can not be thrown out -// the error will be deleted after reaching an agreement on collection name and id in qn - -// ErrInsufficientMemory returns insufficient memory error. -var ErrInsufficientMemory = errors.New("InsufficientMemoryToLoad") - -// InSufficientMemoryStatus returns insufficient memory status. -func InSufficientMemoryStatus(collectionName string) *commonpb.Status { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_InsufficientMemoryToLoad, - Reason: fmt.Sprintf("deny to load, insufficient memory, please allocate more resources, collectionName: %s", collectionName), - } -} diff --git a/internal/proxy/error_test.go b/internal/proxy/error_test.go deleted file mode 100644 index d5641f4e29457..0000000000000 --- a/internal/proxy/error_test.go +++ /dev/null @@ -1,35 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package proxy - -import ( - "fmt" - "testing" - - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" -) - -func Test_ErrInsufficientMemory(t *testing.T) { - err := fmt.Errorf("%w, mock insufficient memory error", ErrInsufficientMemory) - assert.True(t, errors.Is(err, ErrInsufficientMemory)) - - status := InSufficientMemoryStatus("collection1") - assert.Equal(t, commonpb.ErrorCode_InsufficientMemoryToLoad, status.GetErrorCode()) -} diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index a460dca3f9a11..f6f111dc6be4d 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1483,13 +1483,13 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse { log.Warn("fail to get loading progress", - zap.String("collection_name", request.CollectionName), - zap.Strings("partition_name", request.PartitionNames), + zap.String("collectionName", request.CollectionName), + zap.Strings("partitionName", request.PartitionNames), zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - if errors.Is(err, ErrInsufficientMemory) { + if errors.Is(err, merr.ErrServiceMemoryLimitExceeded) { return &milvuspb.GetLoadingProgressResponse{ - Status: InSufficientMemoryStatus(request.GetCollectionName()), + Status: merr.Status(err), } } return &milvuspb.GetLoadingProgressResponse{ @@ -1574,14 +1574,6 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt return getErrResponse(err), nil } - // TODO(longjiquan): https://github.com/milvus-io/milvus/issues/21485, Remove `GetComponentStates` after error code - // is ready to distinguish case whether the querycoord is not healthy or the collection is not even loaded. - if statesResp, err := node.queryCoord.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}); err != nil { - return getErrResponse(err), nil - } else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy { - return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil - } - successResponse := &milvuspb.GetLoadStateResponse{ Status: merr.Status(nil), } @@ -1615,24 +1607,30 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt var progress int64 if len(request.GetPartitionNames()) == 0 { if progress, _, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil { - if errors.Is(err, ErrInsufficientMemory) { + if err != nil { + if errors.Is(err, merr.ErrCollectionNotLoaded) { + successResponse.State = commonpb.LoadState_LoadStateNotLoad + return successResponse, nil + } return &milvuspb.GetLoadStateResponse{ - Status: InSufficientMemoryStatus(request.GetCollectionName()), + Status: merr.Status(err), }, nil } - successResponse.State = commonpb.LoadState_LoadStateNotLoad - return successResponse, nil } } else { if progress, _, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(), request.GetPartitionNames(), request.GetCollectionName(), collectionID, request.GetDbName()); err != nil { - if errors.Is(err, ErrInsufficientMemory) { + if err != nil { + if errors.IsAny(err, + merr.ErrCollectionNotLoaded, + merr.ErrPartitionNotLoaded) { + successResponse.State = commonpb.LoadState_LoadStateNotLoad + return successResponse, nil + } return &milvuspb.GetLoadStateResponse{ - Status: InSufficientMemoryStatus(request.GetCollectionName()), + Status: merr.Status(err), }, nil } - successResponse.State = commonpb.LoadState_LoadStateNotLoad - return successResponse, nil } } if progress >= 100 { diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 159d5bf879a9f..2ef6717bfacae 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -4242,18 +4242,8 @@ func TestProxy_GetLoadState(t *testing.T) { { qc := getQueryCoordClient() - qc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{ - NodeID: 0, - Role: typeutil.QueryCoordRole, - StateCode: commonpb.StateCode_Abnormal, - ExtraInfo: nil, - }, - SubcomponentStates: nil, - Status: merr.Status(nil), - }, nil) qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(merr.WrapErrServiceNotReady("initialization")), CollectionIDs: nil, InMemoryPercentages: []int64{}, }, nil) @@ -4261,27 +4251,17 @@ func TestProxy_GetLoadState(t *testing.T) { proxy.stateCode.Store(commonpb.StateCode_Healthy) stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stateResp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(stateResp.GetStatus()), merr.ErrServiceNotReady) progressResp, err := proxy.GetLoadingProgress(context.Background(), &milvuspb.GetLoadingProgressRequest{CollectionName: "foo"}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, progressResp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(progressResp.GetStatus()), merr.ErrServiceNotReady) } { qc := getQueryCoordClient() - qc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{ - NodeID: 0, - Role: typeutil.QueryCoordRole, - StateCode: commonpb.StateCode_Healthy, - ExtraInfo: nil, - }, - SubcomponentStates: nil, - Status: merr.Status(nil), - }, nil) - qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, errors.New("test")) - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, errors.New("test")) + qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, merr.WrapErrCollectionNotLoaded("foo")) + qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, merr.WrapErrPartitionNotLoaded("p1")) proxy := &Proxy{queryCoord: qc} proxy.stateCode.Store(commonpb.StateCode_Healthy) @@ -4306,37 +4286,6 @@ func TestProxy_GetLoadState(t *testing.T) { assert.Equal(t, int64(0), progressResp.Progress) } - { - qc := getQueryCoordClient() - qc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{ - NodeID: 0, - Role: typeutil.QueryCoordRole, - StateCode: commonpb.StateCode_Healthy, - ExtraInfo: nil, - }, - SubcomponentStates: nil, - Status: merr.Status(nil), - }, nil) - qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - CollectionIDs: nil, - InMemoryPercentages: []int64{}, - }, nil) - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, errors.New("test")) - proxy := &Proxy{queryCoord: qc} - proxy.stateCode.Store(commonpb.StateCode_Healthy) - - stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, stateResp.GetStatus().GetErrorCode()) - assert.Equal(t, commonpb.LoadState_LoadStateNotLoad, stateResp.State) - - progressResp, err := proxy.GetLoadingProgress(context.Background(), &milvuspb.GetLoadingProgressRequest{CollectionName: "foo"}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, progressResp.GetStatus().GetErrorCode()) - } - { qc := getQueryCoordClient() qc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ @@ -4415,11 +4364,13 @@ func TestProxy_GetLoadState(t *testing.T) { SubcomponentStates: nil, Status: merr.Status(nil), }, nil) + + mockErr := merr.WrapErrServiceMemoryLimitExceeded(110, 100) qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_InsufficientMemoryToLoad}, + Status: merr.Status(mockErr), }, nil) qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&querypb.ShowPartitionsResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_InsufficientMemoryToLoad}, + Status: merr.Status(mockErr), }, nil) proxy := &Proxy{queryCoord: qc} proxy.stateCode.Store(commonpb.StateCode_Healthy) diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 3fec75050a065..dd5f35ec1fe06 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -1241,32 +1241,22 @@ func getCollectionProgress( CollectionIDs: []int64{collectionID}, }) if err != nil { - log.Warn("fail to show collections", zap.Int64("collection_id", collectionID), zap.Error(err)) - return - } - - if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_InsufficientMemoryToLoad { - err = ErrInsufficientMemory - log.Warn("detected insufficientMemoryError when getCollectionProgress", zap.Int64("collection_id", collectionID), zap.String("reason", resp.GetStatus().GetReason())) - return - } - - if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - err = merr.Error(resp.GetStatus()) - log.Warn("fail to show collections", zap.Int64("collection_id", collectionID), - zap.String("reason", resp.GetStatus().GetReason())) + log.Warn("fail to show collections", + zap.Int64("collectionID", collectionID), + zap.Error(err), + ) return } - if len(resp.InMemoryPercentages) == 0 { - errMsg := "fail to show collections from the querycoord, no data" - err = errors.New(errMsg) - log.Warn(errMsg, zap.Int64("collection_id", collectionID)) + err = merr.Error(resp.GetStatus()) + if err != nil { + log.Warn("fail to show collections", + zap.Int64("collectionID", collectionID), + zap.Error(err)) return } loadProgress = resp.GetInMemoryPercentages()[0] - if len(resp.GetRefreshProgress()) > 0 { // Compatibility for new Proxy with old QueryCoord refreshProgress = resp.GetRefreshProgress()[0] } @@ -1311,34 +1301,17 @@ func getPartitionProgress( zap.Error(err)) return } - if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_InsufficientMemoryToLoad { - err = ErrInsufficientMemory - log.Warn("detected insufficientMemoryError when getPartitionProgress", - zap.Int64("collection_id", collectionID), - zap.String("collection_name", collectionName), - zap.Strings("partition_names", partitionNames), - zap.String("reason", resp.GetStatus().GetReason()), - ) - return - } - if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + err = merr.Error(resp.GetStatus()) + if err != nil { err = merr.Error(resp.GetStatus()) log.Warn("fail to show partitions", - zap.String("collection_name", collectionName), - zap.Strings("partition_names", partitionNames), - zap.String("reason", resp.GetStatus().GetReason())) + zap.String("collectionName", collectionName), + zap.Strings("partitionNames", partitionNames), + zap.Error(err)) return } - if len(resp.InMemoryPercentages) != len(partitionIDs) { - errMsg := "fail to show partitions from the querycoord, invalid data num" - err = errors.New(errMsg) - log.Warn(errMsg, zap.Int64("collection_id", collectionID), - zap.String("collection_name", collectionName), - zap.Strings("partition_names", partitionNames)) - return - } for _, p := range resp.InMemoryPercentages { loadProgress += p } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 0923ccb36601e..a7b68530c1789 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -104,7 +104,7 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio }, nil } - err = fmt.Errorf("collection %d has not been loaded to memory or load failed", collectionID) + err = merr.WrapErrCollectionNotLoaded(collectionID) log.Warn("show collection failed", zap.Error(err)) return &querypb.ShowCollectionsResponse{ Status: merr.Status(err), @@ -162,10 +162,9 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions } err = merr.WrapErrPartitionNotLoaded(partitionID) - msg := fmt.Sprintf("partition %d has not been loaded to memory or load failed", partitionID) - log.Warn(msg) + log.Warn("show partitions failed", zap.Error(err)) return &querypb.ShowPartitionsResponse{ - Status: merr.Status(errors.Wrap(err, msg)), + Status: merr.Status(err), }, nil } percentages = append(percentages, int64(percentage)) diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index b15c6187752dd..61c9dbb3a3c07 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -419,21 +419,16 @@ func (m *importManager) isRowbased(files []string) (bool, error) { // importJob processes the import request, generates import tasks, sends these tasks to DataCoord, and returns // immediately. func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportRequest, cID int64, pID int64) *milvuspb.ImportResponse { - returnErrorFunc := func(reason string) *milvuspb.ImportResponse { + if len(req.GetFiles()) == 0 { return &milvuspb.ImportResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: reason, - }, + Status: merr.Status(merr.WrapErrParameterInvalidMsg("import request is empty")), } } - if req == nil || len(req.Files) == 0 { - return returnErrorFunc("import request is empty") - } - if m.callImportService == nil { - return returnErrorFunc("import service is not available") + return &milvuspb.ImportResponse{ + Status: merr.Status(merr.WrapErrServiceUnavailable("import service unavailable")), + } } resp := &milvuspb.ImportResponse{ @@ -553,7 +548,9 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque return nil }() if err != nil { - return returnErrorFunc(err.Error()) + return &milvuspb.ImportResponse{ + Status: merr.Status(err), + } } if sendOutTasksErr := m.sendOutTasks(ctx); sendOutTasksErr != nil { log.Error("fail to send out tasks", zap.Error(sendOutTasksErr)) @@ -755,11 +752,8 @@ func (m *importManager) copyTaskInfo(input *datapb.ImportTaskInfo, output *milvu // getTaskState looks for task with the given ID and returns its import state. func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse { resp := &milvuspb.GetImportStateResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "import task id doesn't exist", - }, - Infos: make([]*commonpb.KeyValuePair, 0), + Status: merr.Status(nil), + Infos: make([]*commonpb.KeyValuePair, 0), } // (1) Search in pending tasks list. found := false @@ -786,24 +780,24 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse return resp } // (3) Search in Etcd. - if v, err := m.taskStore.Load(BuildImportTaskKey(tID)); err == nil && v != "" { - ti := &datapb.ImportTaskInfo{} - if err := proto.Unmarshal([]byte(v), ti); err != nil { - log.Error("failed to unmarshal proto", zap.String("taskInfo", v), zap.Error(err)) - } else { - m.copyTaskInfo(ti, resp) - found = true - } - } else { + v, err := m.taskStore.Load(BuildImportTaskKey(tID)) + if err != nil { log.Warn("failed to load task info from Etcd", zap.String("value", v), - zap.Error(err)) + zap.Error(err), + ) + resp.Status = merr.Status(err) + return resp } - if found { - log.Info("getting import task state", zap.Int64("task ID", tID), zap.Any("state", resp.State), zap.Int64s("segment", resp.SegmentIds)) + + ti := &datapb.ImportTaskInfo{} + if err := proto.Unmarshal([]byte(v), ti); err != nil { + log.Error("failed to unmarshal proto", zap.String("taskInfo", v), zap.Error(err)) + resp.Status = merr.Status(err) return resp } - log.Debug("get import task state failed", zap.Int64("taskID", tID)) + + m.copyTaskInfo(ti, resp) return resp } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 21aaee2063672..298f707621bd9 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1865,25 +1865,25 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus // Currently, Backup tool call import must with a partition name, each time restore a partition if req.GetPartitionName() != "" { if pID, err = c.meta.GetPartitionByName(cID, req.GetPartitionName(), typeutil.MaxTimestamp); err != nil { - log.Warn("failed to get partition ID from its name", zap.String("partition name", req.GetPartitionName()), zap.Error(err)) + log.Warn("failed to get partition ID from its name", zap.String("partitionName", req.GetPartitionName()), zap.Error(err)) return &milvuspb.ImportResponse{ - Status: merr.Status(merr.WrapBulkInsertPartitionNotFound(req.GetCollectionName(), req.GetPartitionName())), + Status: merr.Status(merr.WrapErrPartitionNotFound(req.GetPartitionName())), }, nil } } else { log.Info("partition name not specified when backup recovery", zap.String("collectionName", req.GetCollectionName())) return &milvuspb.ImportResponse{ - Status: merr.Status(merr.WrapBadBulkInsertRequest("partition name not specified when backup")), + Status: merr.Status(merr.WrapErrParameterInvalidMsg("partition not specified")), }, nil } } else { if hasPartitionKey { if req.GetPartitionName() != "" { msg := "not allow to set partition name for collection with partition key" - log.Warn(msg, zap.String("collection name", req.GetCollectionName())) + log.Warn(msg, zap.String("collectionName", req.GetCollectionName())) return &milvuspb.ImportResponse{ - Status: merr.Status(merr.WrapBadBulkInsertRequest(msg)), + Status: merr.Status(merr.WrapErrParameterInvalidMsg(msg)), }, nil } } else { @@ -1895,7 +1895,7 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus zap.String("partition name", req.GetPartitionName()), zap.Error(err)) return &milvuspb.ImportResponse{ - Status: merr.Status(merr.WrapBulkInsertPartitionNotFound(req.GetCollectionName(), req.GetPartitionName())), + Status: merr.Status(merr.WrapErrPartitionNotFound(req.GetPartitionName())), }, nil } } @@ -1904,8 +1904,8 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus log.Info("RootCoord receive import request", zap.String("collectionName", req.GetCollectionName()), zap.Int64("collectionID", cID), - zap.String("partition name", req.GetPartitionName()), - zap.Strings("virtual channel names", req.GetChannelNames()), + zap.String("partitionName", req.GetPartitionName()), + zap.Strings("virtualChannelNames", req.GetChannelNames()), zap.Int64("partitionID", pID), zap.Int("# of files = ", len(req.GetFiles())), ) @@ -1997,11 +1997,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( // Upon receiving ReportImport request, update the related task's state in task store. ti, err := c.importManager.updateTaskInfo(ir) if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure, - Reason: err.Error(), - Code: merr.Code(err), - }, nil + return merr.Status(err), nil } // If task failed, send task to idle datanode diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 593d192d64ca2..3748d6c0a0638 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1057,7 +1057,7 @@ func TestCore_Import(t *testing.T) { CollectionName: "a-good-name", }) assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrBulkInsertPartitionNotFound) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrPartitionNotFound) }) t.Run("normal case", func(t *testing.T) { @@ -1101,7 +1101,7 @@ func TestCore_Import(t *testing.T) { }, }) assert.NotNil(t, resp) - assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrBadBulkInsertRequest) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid) }) // Remove the following case after bulkinsert can support partition key @@ -1159,7 +1159,7 @@ func TestCore_Import(t *testing.T) { PartitionName: "p1", }) assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrBadBulkInsertRequest) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid) }) t.Run("backup should set partition name", func(t *testing.T) { @@ -1201,7 +1201,7 @@ func TestCore_Import(t *testing.T) { }, }) assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp1.GetStatus()), merr.ErrBadBulkInsertRequest) + assert.ErrorIs(t, merr.Error(resp1.GetStatus()), merr.ErrParameterInvalid) meta.GetPartitionByNameFunc = func(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) { return common.InvalidPartitionID, fmt.Errorf("partition ID not found for partition name '%s'", partitionName) @@ -1217,7 +1217,7 @@ func TestCore_Import(t *testing.T) { }, }) assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp2.GetStatus()), merr.ErrBulkInsertPartitionNotFound) + assert.ErrorIs(t, merr.Error(resp2.GetStatus()), merr.ErrPartitionNotFound) }) } diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index ac31f5061d02f..41e214683f7e1 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -117,10 +117,6 @@ var ( ErrInvalidSearchResult = newMilvusError("fail to parse search result", 1805, false) ErrCheckPrimaryKey = newMilvusError("please check the primary key and its' type can only in [int, string]", 1806, false) - // bulkinsert related - ErrBadBulkInsertRequest = newMilvusError("bad bulkinsert request", 1900, false) - ErrBulkInsertPartitionNotFound = newMilvusError("partition not found during bulkinsert", 1901, false) - // Segcore related ErrSegcore = newMilvusError("segcore error", 2000, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index d841313364147..40c74ba907711 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -133,10 +133,6 @@ func (s *ErrSuite) TestWrap() { // field related s.ErrorIs(WrapErrFieldNotFound("meta", "failed to get field"), ErrFieldNotFound) - - // bulkinsert related - s.ErrorIs(WrapBadBulkInsertRequest("fail reason"), ErrBadBulkInsertRequest) - s.ErrorIs(WrapBulkInsertPartitionNotFound("hello_milvus", "notexist"), ErrBulkInsertPartitionNotFound) } func (s *ErrSuite) TestOldCode() { diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 07f199fdfebda..6da59299ca899 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -639,11 +639,3 @@ func WrapErrFieldNotFound[T any](field T, msg ...string) error { func wrapWithField(err error, name string, value any) error { return errors.Wrapf(err, "%s=%v", name, value) } - -func WrapBadBulkInsertRequest(msg ...string) error { - return errors.Wrap(ErrBadBulkInsertRequest, strings.Join(msg, "; ")) -} - -func WrapBulkInsertPartitionNotFound(collection any, partition any) error { - return errors.Wrapf(ErrBulkInsertPartitionNotFound, "collection=%s, partition=%s", collection, partition) -}