From 590d8b4b7e73009701757d6fe11d34d52b6071c3 Mon Sep 17 00:00:00 2001 From: jaime Date: Mon, 23 Dec 2024 16:16:48 +0800 Subject: [PATCH] enhance: add collection ID as a parameter for list segment and channel requests enhance: add db name to replica description Signed-off-by: jaime --- .../datacoord/compaction_task_clustering.go | 9 ++++ internal/datacoord/compaction_task_l0.go | 22 +++++----- internal/datacoord/compaction_task_meta.go | 4 +- internal/datacoord/compaction_task_mix.go | 26 ++++++------ internal/datacoord/index_meta.go | 4 +- internal/datacoord/metrics_info.go | 16 +++---- internal/datacoord/server.go | 6 +-- internal/datanode/data_node.go | 6 ++- .../pipeline/flow_graph_manager.go | 15 +++++-- .../pipeline/flow_graph_manager_test.go | 17 ++++++-- .../flushcommon/pipeline/mock_fgmanager.go | 42 ++++++++++--------- internal/metastore/model/segment_index.go | 3 ++ internal/proto/index_coord.proto | 1 + internal/proto/query_coord.proto | 1 + internal/proxy/http_req_impl.go | 10 +++-- internal/proxy/impl.go | 8 ++-- internal/querycoordv2/handlers.go | 10 ++--- internal/querycoordv2/job/job_load.go | 23 ++++++---- .../querycoordv2/meta/channel_dist_manager.go | 21 +++++++--- .../meta/channel_dist_manager_test.go | 2 +- .../querycoordv2/meta/collection_manager.go | 11 ++++- internal/querycoordv2/meta/dist_manager.go | 8 ++-- .../querycoordv2/meta/dist_manager_test.go | 11 ++++- .../querycoordv2/meta/leader_view_manager.go | 17 +++++++- .../meta/leader_view_manager_test.go | 11 +++-- .../querycoordv2/meta/mock_target_manager.go | 21 +++++----- internal/querycoordv2/meta/replica_manager.go | 12 +++++- .../querycoordv2/meta/replica_manager_test.go | 23 +++++++++- .../querycoordv2/meta/segment_dist_manager.go | 5 ++- internal/querycoordv2/meta/target.go | 17 +++++--- internal/querycoordv2/meta/target_manager.go | 6 +-- .../querycoordv2/meta/target_manager_test.go | 10 ++++- internal/querycoordv2/server.go | 9 ++-- internal/querycoordv2/utils/meta.go | 1 - internal/querynodev2/metrics_info.go | 11 +++-- internal/querynodev2/metrics_info_test.go | 7 ++-- internal/querynodev2/pipeline/manager.go | 7 +++- internal/querynodev2/server.go | 6 ++- pkg/util/metricsinfo/metric_request.go | 28 +++++++++---- pkg/util/metricsinfo/metrics_info.go | 10 +++-- 40 files changed, 314 insertions(+), 163 deletions(-) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 53a5fbd97e3a6..4f939fefbf043 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -716,6 +716,15 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap } func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error { + // if the task state is completed, cleaned, failed or timeout, append the end time before saving + if t.GetTaskProto().State == datapb.CompactionTaskState_completed || + t.GetTaskProto().State == datapb.CompactionTaskState_cleaned || + t.GetTaskProto().State == datapb.CompactionTaskState_failed || + t.GetTaskProto().State == datapb.CompactionTaskState_timeout { + ts := time.Now().Unix() + opts = append(opts, setEndTime(ts)) + } + task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task) if err != nil { diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index e7535bca8e4b5..96bdb207059cd 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -130,9 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool { log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err)) return false } - - ts := time.Now().Unix() - updateOps := []compactionTaskOpt{setEndTime(ts)} switch result.GetState() { case datapb.CompactionTaskState_completed: t.result = result @@ -141,15 +138,13 @@ return false } - updateOps = append(updateOps, setState(datapb.CompactionTaskState_meta_saved)) - if err := t.updateAndSaveTaskMeta(updateOps...); err != nil { + if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil { log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err)) return false } return t.processMetaSaved() case datapb.CompactionTaskState_failed: - updateOps = append(updateOps, setState(datapb.CompactionTaskState_failed)) - if err := t.updateAndSaveTaskMeta(updateOps...); err != nil { + if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil { log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err)) return false } @@ -159,9 +154,7 @@ } func (t *l0CompactionTask) processMetaSaved() bool { - ts := time.Now().Unix() - updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)} - err := t.updateAndSaveTaskMeta(updateOps...) + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) if err != nil { log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) return false @@ -358,6 +351,15 @@ func (t *l0CompactionTask) SaveTaskMeta() error { } func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error { + // if the task state is completed, cleaned, failed or timeout, append the end time before saving + if t.GetTaskProto().State == datapb.CompactionTaskState_completed || + t.GetTaskProto().State == datapb.CompactionTaskState_cleaned || + t.GetTaskProto().State == datapb.CompactionTaskState_failed || + t.GetTaskProto().State == datapb.CompactionTaskState_timeout { + ts := time.Now().Unix() + opts = append(opts, setEndTime(ts)) + } + task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task) if err != nil { diff --git a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go index 33a2ad20b59cb..be18fd0884fae 100644 --- a/internal/datacoord/compaction_task_meta.go +++ b/internal/datacoord/compaction_task_meta.go @@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction Type: task.Type.String(), State: task.State.String(), FailReason: task.FailReason, - StartTime: typeutil.TimestampToString(uint64(task.StartTime)), - EndTime: typeutil.TimestampToString(uint64(task.EndTime)), + StartTime: typeutil.TimestampToString(uint64(task.StartTime) * 1000), + EndTime: typeutil.TimestampToString(uint64(task.EndTime) * 1000), TotalRows: task.TotalRows, InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string { return strconv.FormatInt(t, 10) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index c61756949574a..90b43432e065d 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -97,9 +97,7 @@ func (t *mixCompactionTask) processPipelining() bool { func (t *mixCompactionTask) processMetaSaved() bool { log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID())) - ts := time.Now().Unix() - updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)} - if err := t.updateAndSaveTaskMeta(updateOps...); err != nil { + if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil { log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err)) return false } @@ -119,15 +117,12 @@ func (t *mixCompactionTask) processExecuting() bool { log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err)) return false } - - ts := time.Now().Unix() - failedUpdateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_failed)} switch result.GetState() { case datapb.CompactionTaskState_completed: t.result = result if len(result.GetSegments()) == 0 { log.Info("illegal compaction results") - err := t.updateAndSaveTaskMeta(failedUpdateOps...) + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) if err != nil { log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false @@ -137,7 +132,7 @@ func (t *mixCompactionTask) processExecuting() bool { if err := t.saveSegmentMeta(); err != nil { log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err)) if errors.Is(err, merr.ErrIllegalCompactionPlan) { - err := t.updateAndSaveTaskMeta(failedUpdateOps...) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) if err != nil { log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false @@ -154,7 +149,7 @@ func (t *mixCompactionTask) processExecuting() bool { return t.processMetaSaved() case datapb.CompactionTaskState_failed: log.Info("mixCompactionTask fail in datanode") - err := t.updateAndSaveTaskMeta(failedUpdateOps...) 
+ err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) if err != nil { log.Warn("fail to updateAndSaveTaskMeta") } @@ -240,10 +235,8 @@ func (t *mixCompactionTask) processCompleted() bool { t.resetSegmentCompacting() UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) + log.Info("mixCompactionTask processCompleted done") - task := t.GetTaskProto() - log.Info("mixCompactionTask processCompleted done", - zap.Int64("planID", task.GetPlanID()), zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second)) return true } @@ -289,6 +282,15 @@ func (t *mixCompactionTask) doClean() error { } func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error { + // if the task state is completed, cleaned, failed or timeout, append the end time before saving + if t.GetTaskProto().State == datapb.CompactionTaskState_completed || + t.GetTaskProto().State == datapb.CompactionTaskState_cleaned || + t.GetTaskProto().State == datapb.CompactionTaskState_failed || + t.GetTaskProto().State == datapb.CompactionTaskState_timeout { + ts := time.Now().Unix() + opts = append(opts, setEndTime(ts)) + } + task := t.ShadowClone(opts...) err := t.saveTaskMeta(task) if err != nil { diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index bc12f4f88da1c..37e744bd8a4d5 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats { FailReason: s.FailReason, IndexSize: s.IndexSize, IndexVersion: s.IndexVersion, - CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime), - FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime), + CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000), + FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000), } } diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index 0dd5c8cd720d8..394d50b89d7f2 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -137,23 +137,17 @@ func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[s func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { v := jsonReq.Get(metricsinfo.MetricRequestParamINKey) if !v.Exists() { - // default to get all segments from dataanode + // default to get all segments from datanode return s.getDataNodeSegmentsJSON(ctx, req) } in := v.String() - if in == "dn" { - // TODO: support filter by collection id + if in == metricsinfo.MetricsRequestParamsInDN { return s.getDataNodeSegmentsJSON(ctx, req) } - if in == "dc" { - v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey) - collectionID := int64(0) - if v.Exists() { - collectionID = v.Int() - } - + if in == metricsinfo.MetricsRequestParamsInDC { + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) segments := s.meta.getSegmentsMetrics(collectionID) for _, seg := range segments { isIndexed, indexedFields := s.meta.indexMeta.GetSegmentIndexedFields(seg.CollectionID, seg.SegmentID) @@ -163,7 +157,7 @@ bs, err := json.Marshal(segments) if err != nil { - log.Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error())) + log.Ctx(ctx).Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err",
err.Error())) return "", nil } return string(bs), nil diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index b5c0ac15dc29f..e7b68bb4a2bb0 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1203,11 +1203,7 @@ func (s *Server) registerMetricsRequest() { s.metricsRequest.RegisterMetricsRequest(metricsinfo.IndexKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - v := jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey) - collectionID := int64(0) - if v.Exists() { - collectionID = v.Int() - } + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) return s.meta.indexMeta.GetIndexJSON(collectionID), nil }) log.Ctx(s.ctx).Info("register metrics actions finished") diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 7ee1b6a6be46d..bba865fbaf02a 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -292,12 +292,14 @@ func (node *DataNode) registerMetricsRequest() { node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return node.flowgraphManager.GetSegmentsJSON(), nil + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) + return node.flowgraphManager.GetSegmentsJSON(collectionID), nil }) node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return node.flowgraphManager.GetChannelsJSON(), nil + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) + return node.flowgraphManager.GetChannelsJSON(collectionID), nil }) log.Ctx(node.ctx).Info("register metrics actions finished") } diff --git a/internal/flushcommon/pipeline/flow_graph_manager.go b/internal/flushcommon/pipeline/flow_graph_manager.go index 8527e65ee91c6..9a93b0c7715bd 100644 --- a/internal/flushcommon/pipeline/flow_graph_manager.go +++ b/internal/flushcommon/pipeline/flow_graph_manager.go @@ -43,8 +43,8 @@ type FlowgraphManager interface { GetFlowgraphCount() int GetCollectionIDs() []int64 - GetChannelsJSON() string - GetSegmentsJSON() string + GetChannelsJSON(collectionID int64) string + GetSegmentsJSON(collectionID int64) string Close() } @@ -121,9 +121,12 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 { } // GetChannelsJSON returns all channels in json format. 
-func (fm *fgManagerImpl) GetChannelsJSON() string { +func (fm *fgManagerImpl) GetChannelsJSON(collectionID int64) string { var channels []*metricsinfo.Channel fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool { + if collectionID > 0 && ds.metacache.Collection() != collectionID { + return true + } latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch) channels = append(channels, &metricsinfo.Channel{ Name: ch, @@ -143,9 +146,13 @@ func (fm *fgManagerImpl) GetChannelsJSON() string { return string(ret) } -func (fm *fgManagerImpl) GetSegmentsJSON() string { +func (fm *fgManagerImpl) GetSegmentsJSON(collectionID int64) string { var segments []*metricsinfo.Segment fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool { + if collectionID > 0 && ds.metacache.Collection() != collectionID { + return true + } + meta := ds.metacache for _, segment := range meta.GetSegmentsBy() { segments = append(segments, &metricsinfo.Segment{ diff --git a/internal/flushcommon/pipeline/flow_graph_manager_test.go b/internal/flushcommon/pipeline/flow_graph_manager_test.go index 6108d9911ae79..0566cc437aa5e 100644 --- a/internal/flushcommon/pipeline/flow_graph_manager_test.go +++ b/internal/flushcommon/pipeline/flow_graph_manager_test.go @@ -193,8 +193,14 @@ func TestGetChannelsJSON(t *testing.T) { assert.NoError(t, err) expectedJSON := string(expectedBytes) - jsonResult := fm.GetChannelsJSON() + jsonResult := fm.GetChannelsJSON(0) assert.JSONEq(t, expectedJSON, jsonResult) + + jsonResult = fm.GetChannelsJSON(10) + var ret []*metricsinfo.Channel + err = json.Unmarshal([]byte(jsonResult), &ret) + assert.NoError(t, err) + assert.Equal(t, 0, len(ret)) } func TestGetSegmentJSON(t *testing.T) { @@ -228,7 +234,12 @@ func TestGetSegmentJSON(t *testing.T) { expectedJSON := string(expectedBytes) ds.metacache.AddSegment(segment, pkStatsFactory, metacache.NoneBm25StatsFactory) - jsonResult := fm.GetSegmentsJSON() - fmt.Println(jsonResult) + jsonResult := fm.GetSegmentsJSON(0) assert.JSONEq(t, expectedJSON, jsonResult) + + jsonResult = fm.GetSegmentsJSON(10) + var ret []*metricsinfo.Segment + err = json.Unmarshal([]byte(jsonResult), &ret) + assert.NoError(t, err) + assert.Equal(t, 0, len(ret)) } diff --git a/internal/flushcommon/pipeline/mock_fgmanager.go b/internal/flushcommon/pipeline/mock_fgmanager.go index cf8cd6b2aa1ca..f63c5590193c9 100644 --- a/internal/flushcommon/pipeline/mock_fgmanager.go +++ b/internal/flushcommon/pipeline/mock_fgmanager.go @@ -114,17 +114,17 @@ func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgra return _c } -// GetChannelsJSON provides a mock function with given fields: -func (_m *MockFlowgraphManager) GetChannelsJSON() string { - ret := _m.Called() +// GetChannelsJSON provides a mock function with given fields: collectionID +func (_m *MockFlowgraphManager) GetChannelsJSON(collectionID int64) string { + ret := _m.Called(collectionID) if len(ret) == 0 { panic("no return value specified for GetChannelsJSON") } var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(int64) string); ok { + r0 = rf(collectionID) } else { r0 = ret.Get(0).(string) } @@ -138,13 +138,14 @@ type MockFlowgraphManager_GetChannelsJSON_Call struct { } // GetChannelsJSON is a helper method to define mock.On call -func (_e *MockFlowgraphManager_Expecter) GetChannelsJSON() *MockFlowgraphManager_GetChannelsJSON_Call { - return &MockFlowgraphManager_GetChannelsJSON_Call{Call: _e.mock.On("GetChannelsJSON")} +// - collectionID int64 +func 
(_e *MockFlowgraphManager_Expecter) GetChannelsJSON(collectionID interface{}) *MockFlowgraphManager_GetChannelsJSON_Call { + return &MockFlowgraphManager_GetChannelsJSON_Call{Call: _e.mock.On("GetChannelsJSON", collectionID)} } -func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Run(run func()) *MockFlowgraphManager_GetChannelsJSON_Call { +func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Run(run func(collectionID int64)) *MockFlowgraphManager_GetChannelsJSON_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(int64)) }) return _c } @@ -154,7 +155,7 @@ func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Return(_a0 string) *MockFlo return _c } -func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetChannelsJSON_Call { +func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func(int64) string) *MockFlowgraphManager_GetChannelsJSON_Call { _c.Call.Return(run) return _c } @@ -309,17 +310,17 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s return _c } -// GetSegmentsJSON provides a mock function with given fields: -func (_m *MockFlowgraphManager) GetSegmentsJSON() string { - ret := _m.Called() +// GetSegmentsJSON provides a mock function with given fields: collectionID +func (_m *MockFlowgraphManager) GetSegmentsJSON(collectionID int64) string { + ret := _m.Called(collectionID) if len(ret) == 0 { panic("no return value specified for GetSegmentsJSON") } var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(int64) string); ok { + r0 = rf(collectionID) } else { r0 = ret.Get(0).(string) } @@ -333,13 +334,14 @@ type MockFlowgraphManager_GetSegmentsJSON_Call struct { } // GetSegmentsJSON is a helper method to define mock.On call -func (_e *MockFlowgraphManager_Expecter) GetSegmentsJSON() *MockFlowgraphManager_GetSegmentsJSON_Call { - return &MockFlowgraphManager_GetSegmentsJSON_Call{Call: _e.mock.On("GetSegmentsJSON")} +// - collectionID int64 +func (_e *MockFlowgraphManager_Expecter) GetSegmentsJSON(collectionID interface{}) *MockFlowgraphManager_GetSegmentsJSON_Call { + return &MockFlowgraphManager_GetSegmentsJSON_Call{Call: _e.mock.On("GetSegmentsJSON", collectionID)} } -func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Run(run func()) *MockFlowgraphManager_GetSegmentsJSON_Call { +func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Run(run func(collectionID int64)) *MockFlowgraphManager_GetSegmentsJSON_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(int64)) }) return _c } @@ -349,7 +351,7 @@ func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Return(_a0 string) *MockFlo return _c } -func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetSegmentsJSON_Call { +func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) RunAndReturn(run func(int64) string) *MockFlowgraphManager_GetSegmentsJSON_Call { _c.Call.Return(run) return _c } diff --git a/internal/metastore/model/segment_index.go b/internal/metastore/model/segment_index.go index 9f6dfd8dce537..8c2e8e3ef3827 100644 --- a/internal/metastore/model/segment_index.go +++ b/internal/metastore/model/segment_index.go @@ -50,6 +50,7 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex { IndexSize: segIndex.SerializeSize, WriteHandoff: segIndex.WriteHandoff, CurrentIndexVersion: segIndex.GetCurrentIndexVersion(), + FinishedUTCTime: segIndex.FinishedTime, } } @@ -75,6 +76,7 @@ 
func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex { SerializeSize: segIdx.IndexSize, WriteHandoff: segIdx.WriteHandoff, CurrentIndexVersion: segIdx.CurrentIndexVersion, + FinishedTime: segIdx.FinishedUTCTime, } } @@ -96,5 +98,6 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex { IndexSize: segIndex.IndexSize, WriteHandoff: segIndex.WriteHandoff, CurrentIndexVersion: segIndex.CurrentIndexVersion, + FinishedUTCTime: segIndex.FinishedUTCTime, } } diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index adcd0aed7bd37..b736098d46b82 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -80,6 +80,7 @@ message SegmentIndex { bool write_handoff = 15; int32 current_index_version = 16; int64 index_store_version = 17; + uint64 finished_time = 18; } message RegisterNodeRequest { diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index d469841c525ff..9b44e6f5c9560 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -668,6 +668,7 @@ message CollectionLoadInfo { LoadType load_type = 6; int32 recover_times = 7; repeated int64 load_fields = 8; + int64 dbID = 9; } message PartitionLoadInfo { diff --git a/internal/proxy/http_req_impl.go b/internal/proxy/http_req_impl.go index 5697e06d47122..0dfcce4d78e93 100644 --- a/internal/proxy/http_req_impl.go +++ b/internal/proxy/http_req_impl.go @@ -148,10 +148,14 @@ func getSlowQuery(node *Proxy) gin.HandlerFunc { // buildReqParams fetch all parameters from query parameter of URL, add them into a map data structure. // put key and value from query parameter into map, concatenate values with separator if values size is greater than 1 -func buildReqParams(c *gin.Context, metricsType string) map[string]interface{} { +func buildReqParams(c *gin.Context, metricsType string, customParams ...*commonpb.KeyValuePair) map[string]interface{} { ret := make(map[string]interface{}) ret[metricsinfo.MetricTypeKey] = metricsType + for _, kv := range customParams { + ret[kv.Key] = kv.Value + } + queryParams := c.Request.URL.Query() for key, values := range queryParams { if len(values) > 1 { @@ -163,7 +167,7 @@ func buildReqParams(c *gin.Context, metricsType string) map[string]interface{} { return ret } -func getQueryComponentMetrics(node *Proxy, metricsType string) gin.HandlerFunc { +func getQueryComponentMetrics(node *Proxy, metricsType string, customParams ...*commonpb.KeyValuePair) gin.HandlerFunc { return func(c *gin.Context) { - params := buildReqParams(c, metricsType) + params := buildReqParams(c, metricsType, customParams...) req, err := metricsinfo.ConstructGetMetricsRequest(params) @@ -185,7 +189,7 @@ func getQueryComponentMetrics(node *Proxy, metricsType string) gin.HandlerFunc { } } -func getDataComponentMetrics(node *Proxy, metricsType string) gin.HandlerFunc { +func getDataComponentMetrics(node *Proxy, metricsType string, customParams ...*commonpb.KeyValuePair) gin.HandlerFunc { return func(c *gin.Context) { - params := buildReqParams(c, metricsType) + params := buildReqParams(c, metricsType, customParams...) req, err := metricsinfo.ConstructGetMetricsRequest(params) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index c5fc8fc9ca555..edb5eb65b1b0f 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6753,10 +6753,10 @@ func (node *Proxy) RegisterRestRouter(router gin.IRouter) { router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.ReplicaKey)) router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.ResourceGroupKey)) router.GET(http.QCAllTasksPath,
getQueryComponentMetrics(node, metricsinfo.AllTaskKey)) - router.GET(http.QCSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey)) + router.GET(http.QCSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey, metricsinfo.RequestParamsInQC)) // QueryNode requests that are forwarded from querycoord - router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey)) + router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey, metricsinfo.RequestParamsInQN)) router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.ChannelKey)) // DataCoord requests that are forwarded from proxy @@ -6765,11 +6765,11 @@ func (node *Proxy) RegisterRestRouter(router gin.IRouter) { router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTaskKey)) router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTaskKey)) router.GET(http.IndexListPath, getDataComponentMetrics(node, metricsinfo.IndexKey)) - router.GET(http.DCSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey)) + router.GET(http.DCSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey, metricsinfo.RequestParamsInDC)) // Datanode requests that are forwarded from datacoord router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTaskKey)) - router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey)) + router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey, metricsinfo.RequestParamsInDN)) router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.ChannelKey)) // Database requests diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index bdfeeb027103b..a9c0a51baab00 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -301,17 +301,13 @@ func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRe } in := v.String() - if in == "qn" { + if in == metricsinfo.MetricsRequestParamsInQN { // TODO: support filter by collection id return s.getSegmentsFromQueryNode(ctx, req) } - if in == "qc" { - v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey) - collectionID := int64(0) - if v.Exists() { - collectionID = v.Int() - } + if in == metricsinfo.MetricsRequestParamsInQC { + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) filteredSegments := s.dist.SegmentDistManager.GetSegmentDist(collectionID) bs, err := json.Marshal(filteredSegments) if err != nil { diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 5009be26fb4ad..998335f9e9a41 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -171,14 +171,15 @@ func (job *LoadCollectionJob) Execute() error { } } + collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID()) + if err != nil { + log.Warn("failed to describe collection from RootCoord", zap.Error(err)) + return err + } + // 2. create replica if not exist replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID()) if len(replicas) == 0 { - collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID()) - if err != nil { - return err - } - // API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API. 
// Then we can implement dynamic replica changed in different resource group independently. _, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames()) @@ -213,6 +214,7 @@ func (job *LoadCollectionJob) Execute() error { FieldIndexID: req.GetFieldIndexID(), LoadType: querypb.LoadType_LoadCollection, LoadFields: req.GetLoadFields(), + DbID: collectionInfo.GetDbId(), }, CreatedAt: time.Now(), LoadSpan: sp, @@ -371,13 +373,15 @@ func (job *LoadPartitionJob) Execute() error { } } + collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID()) + if err != nil { + log.Warn("failed to describe collection from RootCoord", zap.Error(err)) + return err + } + // 2. create replica if not exist replicas := job.meta.ReplicaManager.GetByCollection(context.TODO(), req.GetCollectionID()) if len(replicas) == 0 { - collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID()) - if err != nil { - return err - } _, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames()) if err != nil { msg := "failed to spawn replica for collection" @@ -412,6 +416,7 @@ func (job *LoadPartitionJob) Execute() error { FieldIndexID: req.GetFieldIndexID(), LoadType: querypb.LoadType_LoadPartition, LoadFields: req.GetLoadFields(), + DbID: collectionInfo.GetDbId(), }, CreatedAt: time.Now(), LoadSpan: sp, diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go index db3dc8720100a..24e06327c3e54 100644 --- a/internal/querycoordv2/meta/channel_dist_manager.go +++ b/internal/querycoordv2/meta/channel_dist_manager.go @@ -300,15 +300,24 @@ func (m *ChannelDistManager) updateCollectionIndex() { } } -func (m *ChannelDistManager) GetChannelDist() []*metricsinfo.DmChannel { +func (m *ChannelDistManager) GetChannelDist(collectionID int64) []*metricsinfo.DmChannel { m.rwmutex.RLock() defer m.rwmutex.RUnlock() - var channels []*metricsinfo.DmChannel - for _, nodeChannels := range m.channels { - for _, channel := range nodeChannels.channels { - channels = append(channels, newDmChannelMetricsFrom(channel)) + var ret []*metricsinfo.DmChannel + if collectionID > 0 { + if channels, ok := m.collectionIndex[collectionID]; ok { + for _, channel := range channels { + ret = append(ret, newDmChannelMetricsFrom(channel)) + } } + return ret } - return channels + + for _, channels := range m.collectionIndex { + for _, channel := range channels { + ret = append(ret, newDmChannelMetricsFrom(channel)) + } + } + return ret } diff --git a/internal/querycoordv2/meta/channel_dist_manager_test.go b/internal/querycoordv2/meta/channel_dist_manager_test.go index aa5db6eb111d5..734d665d6f3df 100644 --- a/internal/querycoordv2/meta/channel_dist_manager_test.go +++ b/internal/querycoordv2/meta/channel_dist_manager_test.go @@ -207,7 +207,7 @@ func TestGetChannelDistJSON(t *testing.T) { manager.Update(1, channel1) manager.Update(2, channel2) - channels := manager.GetChannelDist() + channels := manager.GetChannelDist(0) assert.Equal(t, 2, len(channels)) checkResult := func(channel *metricsinfo.DmChannel) { diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index f95d18b0658f3..980331ff64c80 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ 
b/internal/querycoordv2/meta/collection_manager.go @@ -285,7 +285,13 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e // we should save it's CollectionLoadInfo to meta store for _, partition := range m.GetAllPartitions(ctx) { // In old version, collection would NOT be stored if the partition existed. - if _, ok := m.collections[partition.GetCollectionID()]; !ok { + if !m.Exist(ctx, partition.GetCollectionID()) { + collectionInfo, err := broker.DescribeCollection(ctx, partition.GetCollectionID()) + if err != nil { + log.Warn("failed to describe collection from RootCoord", zap.Error(err)) + return err + } + col := &Collection{ CollectionLoadInfo: &querypb.CollectionLoadInfo{ CollectionID: partition.GetCollectionID(), @@ -293,10 +299,11 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e Status: partition.GetStatus(), FieldIndexID: partition.GetFieldIndexID(), LoadType: querypb.LoadType_LoadPartition, + DbID: collectionInfo.GetDbId(), }, LoadPercentage: 100, } - err := m.PutCollection(ctx, col) + err = m.PutCollection(ctx, col) if err != nil { return err } diff --git a/internal/querycoordv2/meta/dist_manager.go b/internal/querycoordv2/meta/dist_manager.go index 05275f213587b..83f8f4ae7ce97 100644 --- a/internal/querycoordv2/meta/dist_manager.go +++ b/internal/querycoordv2/meta/dist_manager.go @@ -42,10 +42,10 @@ func NewDistributionManager() *DistributionManager { // It includes segments, DM channels, and leader views. // If there are no segments, channels, or leader views, it returns an empty string. // In case of an error during JSON marshaling, it returns the error. -func (dm *DistributionManager) GetDistributionJSON() string { - segments := dm.GetSegmentDist(0) - channels := dm.GetChannelDist() - leaderView := dm.GetLeaderView() +func (dm *DistributionManager) GetDistributionJSON(collectionID int64) string { + segments := dm.GetSegmentDist(collectionID) + channels := dm.GetChannelDist(collectionID) + leaderView := dm.GetLeaderView(collectionID) dist := &metricsinfo.QueryCoordDist{ Segments: segments, diff --git a/internal/querycoordv2/meta/dist_manager_test.go b/internal/querycoordv2/meta/dist_manager_test.go index 4af530ea6f4de..3e8f33f4eff26 100644 --- a/internal/querycoordv2/meta/dist_manager_test.go +++ b/internal/querycoordv2/meta/dist_manager_test.go @@ -81,7 +81,7 @@ func TestGetDistributionJSON(t *testing.T) { manager.LeaderViewManager.Update(2, leaderView2) // Call GetDistributionJSON - jsonOutput := manager.GetDistributionJSON() + jsonOutput := manager.GetDistributionJSON(0) // Verify JSON output var dist metricsinfo.QueryCoordDist @@ -91,4 +91,13 @@ func TestGetDistributionJSON(t *testing.T) { assert.Len(t, dist.Segments, 2) assert.Len(t, dist.DMChannels, 2) assert.Len(t, dist.LeaderViews, 2) + + jsonOutput = manager.GetDistributionJSON(1000) + var dist2 metricsinfo.QueryCoordDist + err = json.Unmarshal([]byte(jsonOutput), &dist2) + assert.NoError(t, err) + + assert.Len(t, dist2.Segments, 0) + assert.Len(t, dist2.DMChannels, 0) + assert.Len(t, dist2.LeaderViews, 0) } diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index c0cfca82b4ac1..c29171e6f90ae 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -320,13 +320,26 @@ func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderView // GetLeaderView returns a slice of LeaderView objects, each representing the 
state of a leader node. // It traverses the views map, converts each LeaderView to a metricsinfo.LeaderView, and collects them into a slice. // The method locks the views map for reading to ensure thread safety. -func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView { +func (mgr *LeaderViewManager) GetLeaderView(collectionID int64) []*metricsinfo.LeaderView { mgr.rwmutex.RLock() defer mgr.rwmutex.RUnlock() var leaderViews []*metricsinfo.LeaderView for _, nodeViews := range mgr.views { - for _, lv := range nodeViews.views { + var filteredViews []*LeaderView + if collectionID > 0 { + if lv, ok := nodeViews.collectionViews[collectionID]; ok { + filteredViews = lv + } else { + // if collectionID is not found, return empty leader views + return leaderViews + } + } else { + // if collectionID is not set, return all leader views + filteredViews = nodeViews.views + } + + for _, lv := range filteredViews { errString := "" if lv.UnServiceableError != nil { errString = lv.UnServiceableError.Error() diff --git a/internal/querycoordv2/meta/leader_view_manager_test.go b/internal/querycoordv2/meta/leader_view_manager_test.go index 61e441498b924..c4cc7cec2ec31 100644 --- a/internal/querycoordv2/meta/leader_view_manager_test.go +++ b/internal/querycoordv2/meta/leader_view_manager_test.go @@ -23,13 +23,11 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -359,7 +357,7 @@ func TestGetLeaderView(t *testing.T) { manager.Update(2, leaderView2) // Call GetLeaderView - leaderViews := manager.GetLeaderView() + leaderViews := manager.GetLeaderView(0) jsonOutput, err := json.Marshal(leaderViews) assert.NoError(t, err) @@ -368,7 +366,6 @@ func TestGetLeaderView(t *testing.T) { assert.NoError(t, err) assert.Len(t, result, 2) - log.Info("====", zap.Any("result", result)) checkResult := func(lv *metricsinfo.LeaderView) { if lv.LeaderID == 1 { assert.Equal(t, int64(100), lv.CollectionID) @@ -394,4 +391,10 @@ func TestGetLeaderView(t *testing.T) { for _, lv := range result { checkResult(lv) } + + leaderViews = manager.GetLeaderView(1) + assert.Len(t, leaderViews, 0) + + leaderViews = manager.GetLeaderView(100) + assert.Len(t, leaderViews, 1) } diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index 6c69dff9f24fb..560c055bf30b3 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -638,17 +638,17 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run return _c } -// GetTargetJSON provides a mock function with given fields: ctx, scope -func (_m *MockTargetManager) GetTargetJSON(ctx context.Context, scope int32) string { - ret := _m.Called(ctx, scope) +// GetTargetJSON provides a mock function with given fields: ctx, scope, collectionID +func (_m *MockTargetManager) GetTargetJSON(ctx context.Context, scope int32, collectionID int64) string { + ret := _m.Called(ctx, scope, collectionID) if len(ret) == 0 { panic("no return value specified for GetTargetJSON") } var r0 string - if rf, ok := ret.Get(0).(func(context.Context, int32) 
string); ok { - r0 = rf(ctx, scope) + if rf, ok := ret.Get(0).(func(context.Context, int32, int64) string); ok { + r0 = rf(ctx, scope, collectionID) } else { r0 = ret.Get(0).(string) } @@ -664,13 +664,14 @@ type MockTargetManager_GetTargetJSON_Call struct { // GetTargetJSON is a helper method to define mock.On call // - ctx context.Context // - scope int32 -func (_e *MockTargetManager_Expecter) GetTargetJSON(ctx interface{}, scope interface{}) *MockTargetManager_GetTargetJSON_Call { - return &MockTargetManager_GetTargetJSON_Call{Call: _e.mock.On("GetTargetJSON", ctx, scope)} +// - collectionID int64 +func (_e *MockTargetManager_Expecter) GetTargetJSON(ctx interface{}, scope interface{}, collectionID interface{}) *MockTargetManager_GetTargetJSON_Call { + return &MockTargetManager_GetTargetJSON_Call{Call: _e.mock.On("GetTargetJSON", ctx, scope, collectionID)} } -func (_c *MockTargetManager_GetTargetJSON_Call) Run(run func(ctx context.Context, scope int32)) *MockTargetManager_GetTargetJSON_Call { +func (_c *MockTargetManager_GetTargetJSON_Call) Run(run func(ctx context.Context, scope int32, collectionID int64)) *MockTargetManager_GetTargetJSON_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int32)) + run(args[0].(context.Context), args[1].(int32), args[2].(int64)) }) return _c } @@ -680,7 +681,7 @@ func (_c *MockTargetManager_GetTargetJSON_Call) Return(_a0 string) *MockTargetMa return _c } -func (_c *MockTargetManager_GetTargetJSON_Call) RunAndReturn(run func(context.Context, int32) string) *MockTargetManager_GetTargetJSON_Call { +func (_c *MockTargetManager_GetTargetJSON_Call) RunAndReturn(run func(context.Context, int32, int64) string) *MockTargetManager_GetTargetJSON_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index 2c04c52c7cddc..a9de0566d2c5d 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -506,7 +506,7 @@ func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, colle // It locks the ReplicaManager for reading, converts the replicas to their protobuf representation, // marshals them into a JSON string, and returns the result. // If an error occurs during marshaling, it logs a warning and returns an empty string. 
-func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string { +func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string { m.rwmutex.RLock() defer m.rwmutex.RUnlock() @@ -515,9 +515,19 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string { for k, v := range r.replicaPB.GetChannelNodeInfos() { channelTowRWNodes[k] = v.GetRwNodes() } + + collectionInfo := meta.GetCollection(ctx, r.GetCollectionID()) + dbID := int64(-1) + if collectionInfo == nil { + log.Ctx(ctx).Warn("failed to get collection info", zap.Int64("collectionID", r.GetCollectionID())) + } else { + dbID = collectionInfo.GetDbID() + } + return &metricsinfo.Replica{ ID: r.GetID(), CollectionID: r.GetCollectionID(), + DatabaseID: dbID, RWNodes: r.GetNodes(), ResourceGroup: r.GetResourceGroup(), RONodes: r.GetRONodes(), diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index cdfc1cbdfc8b5..7921129590190 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -545,7 +545,26 @@ func TestGetReplicasJSON(t *testing.T) { err = replicaManager.put(ctx, replica2) assert.NoError(t, err) - jsonOutput := replicaManager.GetReplicasJSON(ctx) + meta := &Meta{ + CollectionManager: NewCollectionManager(catalog), + } + + err = meta.PutCollectionWithoutSave(ctx, &Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 100, + DbID: int64(1), + }, + }) + assert.NoError(t, err) + + err = meta.PutCollectionWithoutSave(ctx, &Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 200, + }, + }) + assert.NoError(t, err) + + jsonOutput := replicaManager.GetReplicasJSON(ctx, meta) var replicas []*metricsinfo.Replica err = json.Unmarshal([]byte(jsonOutput), &replicas) assert.NoError(t, err) @@ -556,10 +575,12 @@ func TestGetReplicasJSON(t *testing.T) { assert.Equal(t, int64(100), replica.CollectionID) assert.Equal(t, "rg1", replica.ResourceGroup) assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes) + assert.Equal(t, int64(1), replica.DatabaseID) } else if replica.ID == 2 { assert.Equal(t, int64(200), replica.CollectionID) assert.Equal(t, "rg2", replica.ResourceGroup) assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes) + assert.Equal(t, int64(0), replica.DatabaseID) } else { assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID) } diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 85519b7770360..38cefdcf7ad8f 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -254,9 +254,10 @@ func (m *SegmentDistManager) GetSegmentDist(collectionID int64) []*metricsinfo.S var segments []*metricsinfo.Segment for _, nodeSeg := range m.segments { for _, segment := range nodeSeg.segments { - if collectionID == 0 || segment.GetCollectionID() == collectionID { - segments = append(segments, newSegmentMetricsFrom(segment)) + if collectionID > 0 && segment.GetCollectionID() != collectionID { + continue } + segments = append(segments, newSegmentMetricsFrom(segment)) } } diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 4795eade4cfae..60491a389ae05 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -207,8 +207,13 @@ func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget { 
return t.collectionTargetMap[collectionID] } -func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget { - return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordTarget { +func (t *target) toQueryCoordCollectionTargets(collectionID int64) []*metricsinfo.QueryCoordTarget { + var ret []*metricsinfo.QueryCoordTarget + for k, v := range t.collectionTargetMap { + if collectionID > 0 && collectionID != k { + continue + } + segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment { return metrics.NewSegmentFrom(s) }) @@ -217,10 +222,12 @@ func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget return metrics.NewDMChannelFrom(ch.VchannelInfo) }) - return &metricsinfo.QueryCoordTarget{ + ret = append(ret, &metricsinfo.QueryCoordTarget{ CollectionID: k, Segments: segments, DMChannels: dmChannels, - } - }) + }) + } + + return ret } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 10fe0b787b55d..924f8eb6ecc18 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -70,7 +70,7 @@ type TargetManagerInterface interface { SaveCurrentTarget(ctx context.Context, catalog metastore.QueryCoordCatalog) Recover(ctx context.Context, catalog metastore.QueryCoordCatalog) error CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool - GetTargetJSON(ctx context.Context, scope TargetScope) string + GetTargetJSON(ctx context.Context, scope TargetScope, collectionID int64) string GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool } @@ -638,7 +638,7 @@ func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, s return false } -func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope) string { +func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope, collectionID int64) string { mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() @@ -647,7 +647,7 @@ func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope) return "" } - v, err := json.Marshal(ret.toQueryCoordCollectionTargets()) + v, err := json.Marshal(ret.toQueryCoordCollectionTargets(collectionID)) if err != nil { log.Warn("failed to marshal target", zap.Error(err)) return "" diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 34bf64136a2e2..91958bfcbd60c 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -669,7 +669,7 @@ func (suite *TargetManagerSuite) TestGetTargetJSON() { suite.NoError(suite.mgr.UpdateCollectionNextTarget(ctx, collectionID)) suite.True(suite.mgr.UpdateCollectionCurrentTarget(ctx, collectionID)) - jsonStr := suite.mgr.GetTargetJSON(ctx, CurrentTarget) + jsonStr := suite.mgr.GetTargetJSON(ctx, CurrentTarget, 0) assert.NotEmpty(suite.T(), jsonStr) var currentTarget []*metricsinfo.QueryCoordTarget @@ -679,6 +679,14 @@ func (suite *TargetManagerSuite) TestGetTargetJSON() { assert.Equal(suite.T(), collectionID, currentTarget[0].CollectionID) assert.Len(suite.T(), currentTarget[0].DMChannels, 2) assert.Len(suite.T(), currentTarget[0].Segments, 2) + + jsonStr = suite.mgr.GetTargetJSON(ctx, CurrentTarget, 1) + assert.NotEmpty(suite.T(), jsonStr) + + var 
currentTarget2 []*metricsinfo.QueryCoordTarget + err = json.Unmarshal([]byte(jsonStr), &currentTarget2) + suite.NoError(err) + assert.Len(suite.T(), currentTarget2, 0) } func BenchmarkTargetManager(b *testing.B) { diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 8ae802cf7848f..8464d3055fcff 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -202,7 +202,8 @@ func (s *Server) registerMetricsRequest() { } QueryDistAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return s.dist.GetDistributionJSON(), nil + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) + return s.dist.GetDistributionJSON(collectionID), nil } QueryTargetAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { @@ -211,11 +212,13 @@ if v.Exists() { scope = meta.TargetScope(v.Int()) } - return s.targetMgr.GetTargetJSON(ctx, scope), nil + + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) + return s.targetMgr.GetTargetJSON(ctx, scope, collectionID), nil } QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return s.meta.GetReplicasJSON(ctx), nil + return s.meta.GetReplicasJSON(ctx, s.meta), nil } QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index 1744e2367849f..fe15c685bbf0c 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -153,7 +153,6 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re if err != nil { return nil, err } - // Spawn it in replica manager.
replicas, err := m.ReplicaManager.Spawn(ctx, collection, replicaNumInRG, channels) if err != nil { diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 3cedfedb1df0b..722b8b844c45d 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -185,8 +185,8 @@ func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetr } // getChannelJSON returns the JSON string of channels -func getChannelJSON(node *QueryNode) string { - stats := node.pipelineManager.GetChannelStats() +func getChannelJSON(node *QueryNode, collectionID int64) string { + stats := node.pipelineManager.GetChannelStats(collectionID) ret, err := json.Marshal(stats) if err != nil { log.Warn("failed to marshal channels", zap.Error(err)) @@ -196,10 +196,14 @@ func getChannelJSON(node *QueryNode) string { } // getSegmentJSON returns the JSON string of segments -func getSegmentJSON(node *QueryNode) string { +func getSegmentJSON(node *QueryNode, collectionID int64) string { allSegments := node.manager.Segment.GetBy() var ms []*metricsinfo.Segment for _, s := range allSegments { + if collectionID > 0 && s.Collection() != collectionID { + continue + } + indexes := make([]*metricsinfo.IndexedField, 0, len(s.Indexes())) for _, index := range s.Indexes() { indexes = append(indexes, &metricsinfo.IndexedField{ @@ -208,6 +212,7 @@ func getSegmentJSON(node *QueryNode) string { IndexSize: index.IndexInfo.IndexSize, BuildID: index.IndexInfo.BuildID, IsLoaded: index.IsLoaded, + HasRawData: s.HasRawData(index.IndexInfo.FieldID), }) } diff --git a/internal/querynodev2/metrics_info_test.go b/internal/querynodev2/metrics_info_test.go index 03c99519a0625..6966f2b050473 100644 --- a/internal/querynodev2/metrics_info_test.go +++ b/internal/querynodev2/metrics_info_test.go @@ -59,7 +59,7 @@ func TestGetPipelineJSON(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, pipelineManager.Num()) - stats := pipelineManager.GetChannelStats() + stats := pipelineManager.GetChannelStats(0) expectedStats := []*metricsinfo.Channel{ { Name: ch, @@ -71,7 +71,7 @@ func TestGetPipelineJSON(t *testing.T) { } assert.Equal(t, expectedStats, stats) - JSONStr := getChannelJSON(&QueryNode{pipelineManager: pipelineManager}) + JSONStr := getChannelJSON(&QueryNode{pipelineManager: pipelineManager}, 0) assert.NotEmpty(t, JSONStr) var actualStats []*metricsinfo.Channel @@ -86,6 +86,7 @@ func TestGetSegmentJSON(t *testing.T) { segment.EXPECT().Collection().Return(int64(1001)) segment.EXPECT().Partition().Return(int64(2001)) segment.EXPECT().MemSize().Return(int64(1024)) + segment.EXPECT().HasRawData(mock.Anything).Return(true) segment.EXPECT().Indexes().Return([]*segments.IndexedFieldInfo{ { IndexInfo: &querypb.FieldIndexInfo{ @@ -106,7 +107,7 @@ func TestGetSegmentJSON(t *testing.T) { mockedSegmentManager.EXPECT().GetBy().Return([]segments.Segment{segment}) node.manager = &segments.Manager{Segment: mockedSegmentManager} - jsonStr := getSegmentJSON(node) + jsonStr := getSegmentJSON(node, 0) assert.NotEmpty(t, jsonStr) var segments []*metricsinfo.Segment diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index 6c6e26cfc7348..e7ebaddbb1eec 100644 --- a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -42,7 +42,7 @@ type Manager interface { Remove(channels ...string) Start(channels ...string) error Close() - GetChannelStats() []*metricsinfo.Channel + GetChannelStats(collectionID int64) 
[]*metricsinfo.Channel } type manager struct { @@ -157,12 +157,15 @@ func (m *manager) Close() { } } -func (m *manager) GetChannelStats() []*metricsinfo.Channel { +func (m *manager) GetChannelStats(collectionID int64) []*metricsinfo.Channel { m.mu.RLock() defer m.mu.RUnlock() ret := make([]*metricsinfo.Channel, 0, len(m.channel2Pipeline)) for ch, p := range m.channel2Pipeline { + if collectionID > 0 && p.GetCollectionID() != collectionID { + continue + } delegator, ok := m.delegators.Get(ch) if ok { tt := delegator.GetTSafe() diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index d9cbae89b13a7..ea5723e9de55b 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -284,12 +284,14 @@ func (node *QueryNode) registerMetricsRequest() { node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return getSegmentJSON(node), nil + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) + return getSegmentJSON(node, collectionID), nil }) node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return getChannelJSON(node), nil + collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq) + return getChannelJSON(node, collectionID), nil }) log.Ctx(node.ctx).Info("register metrics actions finished") } diff --git a/pkg/util/metricsinfo/metric_request.go b/pkg/util/metricsinfo/metric_request.go index 02c0e5a6fe650..98a63fa9c9413 100644 --- a/pkg/util/metricsinfo/metric_request.go +++ b/pkg/util/metricsinfo/metric_request.go @@ -86,17 +86,22 @@ const ( MetricRequestParamTargetScopeKey = "target_scope" - MetricRequestParamINKey = "in" - MetricRequestParamCollectionIDKey = "collection_id" + + MetricRequestParamINKey = "in" + MetricRequestParamCollectionIDKey = "collection_id" + MetricsRequestParamsInDC = "dc" + MetricsRequestParamsInQC = "qc" + MetricsRequestParamsInDN = "dn" + MetricsRequestParamsInQN = "qn" ) -var MetricRequestParamINValue = map[string]struct{}{ - "dc": {}, - "qc": {}, - "dn": {}, - "qn": {}, -} +var ( + RequestParamsInDC = &commonpb.KeyValuePair{Key: MetricRequestParamINKey, Value: MetricsRequestParamsInDC} + RequestParamsInQC = &commonpb.KeyValuePair{Key: MetricRequestParamINKey, Value: MetricsRequestParamsInQC} + RequestParamsInDN = &commonpb.KeyValuePair{Key: MetricRequestParamINKey, Value: MetricsRequestParamsInDN} + RequestParamsInQN = &commonpb.KeyValuePair{Key: MetricRequestParamINKey, Value: MetricsRequestParamsInQN} +) type MetricsRequestAction func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) @@ -172,6 +176,14 @@ func ParseMetricRequestType(jsonRet gjson.Result) (string, error) { return "", fmt.Errorf("%s or %s not found in request", MetricTypeKey, MetricRequestTypeKey) } +func GetCollectionIDFromRequest(jsonReq gjson.Result) int64 { + v := jsonReq.Get(MetricRequestParamCollectionIDKey) + if !v.Exists() { + return 0 + } + return v.Int() +} + // ConstructRequestByMetricType constructs a request according to the metric type func ConstructRequestByMetricType(metricType string) (*milvuspb.GetMetricsRequest, error) { m := make(map[string]interface{}) diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go index 6a6b5b46f8679..cded42ef32139 100644 --- a/pkg/util/metricsinfo/metrics_info.go +++ b/pkg/util/metricsinfo/metrics_info.go @@ -147,6 +147,7 @@ type IndexedField
struct { BuildID int64 `json:"build_id,omitempty,string"` IndexSize int64 `json:"index_size,omitempty,string"` IsLoaded bool `json:"is_loaded,omitempty,string"` + HasRawData bool `json:"has_raw_data,omitempty"` } type QueryCoordTarget struct { @@ -195,6 +196,7 @@ type ResourceGroup struct { type Replica struct { ID int64 `json:"ID,omitempty,string"` CollectionID int64 `json:"collectionID,omitempty,string"` + DatabaseID int64 `json:"database_id,omitempty,string"` RWNodes []int64 `json:"rw_nodes,omitempty"` ResourceGroup string `json:"resource_group,omitempty"` RONodes []int64 `json:"ro_nodes,omitempty"` @@ -382,8 +384,8 @@ type ImportTask struct { } type CompactionTask struct { - PlanID int64 `json:"plan_id,omitempty"` - CollectionID int64 `json:"collection_id,omitempty"` + PlanID int64 `json:"plan_id,omitempty,string"` + CollectionID int64 `json:"collection_id,omitempty,string"` Type string `json:"type,omitempty"` State string `json:"state,omitempty"` FailReason string `json:"fail_reason,omitempty"` @@ -447,7 +449,7 @@ type Collection struct { ConsistencyLevel string `json:"consistency_level,omitempty"` Aliases []string `json:"aliases,omitempty"` Properties map[string]string `json:"properties,omitempty"` - DBName string `json:"db_name,omitempty,string"` + DBName string `json:"db_name,omitempty"` NumPartitions int `json:"num_partitions,omitempty,string"` VirtualChannelNames []string `json:"virtual_channel_names,omitempty"` PhysicalChannelNames []string `json:"physical_channel_names,omitempty"` @@ -458,7 +460,7 @@ type Collection struct { type Database struct { DBName string `json:"db_name,omitempty"` - DBID int64 `json:"dbID,omitempty"` + DBID int64 `json:"dbID,omitempty,string"` CreatedTimestamp string `json:"created_timestamp,omitempty"` Properties map[string]string `json:"properties,omitempty"` }
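
A minimal usage sketch of how the new request parameters compose on the caller side, based only on helpers visible in this diff (metricsinfo.ConstructGetMetricsRequest, GetCollectionIDFromRequest, and the `in`/`collection_id` key constants); the package main scaffolding and the collection ID value are made up for illustration.

	package main

	import (
		"fmt"

		"github.com/milvus-io/milvus/pkg/util/metricsinfo"
	)

	func main() {
		// Ask DataCoord ("in": "dc") for the segment list of a single collection.
		params := map[string]interface{}{
			metricsinfo.MetricTypeKey:                     metricsinfo.SegmentKey,
			metricsinfo.MetricRequestParamINKey:           metricsinfo.MetricsRequestParamsInDC,
			metricsinfo.MetricRequestParamCollectionIDKey: int64(100), // hypothetical collection ID
		}
		req, err := metricsinfo.ConstructGetMetricsRequest(params)
		if err != nil {
			panic(err)
		}
		// On the server side, GetCollectionIDFromRequest reads collection_id back
		// from this JSON payload; when the parameter is absent it returns 0, which
		// keeps the old list-everything behavior.
		fmt.Println(req.GetRequest())
	}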