From 6d86b9022e09e2ba27159a79695a1373bd8dadfd Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 19 Nov 2024 10:12:30 +0800 Subject: [PATCH] enhance: Provide secondary index critria when filter leaderview (#37777) Related to #37630 --------- Signed-off-by: Congqi Xia --- internal/querycoordv2/observers/collection_observer.go | 4 +++- internal/querycoordv2/task/action.go | 4 +++- internal/querycoordv2/task/scheduler.go | 4 +++- internal/querycoordv2/task/task_test.go | 3 +++ 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 843e3a4d55065..54b6f7c52983b 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -325,7 +325,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa } subChannelCount := loadedCount for _, segment := range segmentTargets { - views := ob.dist.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(segment.GetID(), false)) + views := ob.dist.LeaderViewManager.GetByFilter( + meta.WithChannelName2LeaderView(segment.GetInsertChannel()), + meta.WithSegment2LeaderView(segment.GetID(), false)) nodes := lo.Map(views, func(view *meta.LeaderView, _ int) int64 { return view.ID }) group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, partition.GetCollectionID(), nodes) loadedCount += len(group) diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 12caa6a3f5709..bb426c9955bd6 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -124,7 +124,9 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool } // segment found in leader view - views := distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(action.SegmentID, false)) + views := distMgr.LeaderViewManager.GetByFilter( + meta.WithChannelName2LeaderView(action.Shard), + meta.WithSegment2LeaderView(action.SegmentID, false)) if len(views) == 0 { return false } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 0071cdae4867d..735045e60b295 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -351,7 +351,9 @@ func (scheduler *taskScheduler) preAdd(task Task) error { taskType := GetTaskType(task) if taskType == TaskTypeMove { - views := scheduler.distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(task.SegmentID(), false)) + views := scheduler.distMgr.LeaderViewManager.GetByFilter( + meta.WithChannelName2LeaderView(task.Shard()), + meta.WithSegment2LeaderView(task.SegmentID(), false)) if len(views) == 0 { return merr.WrapErrServiceInternal("segment's delegator not found, stop balancing") } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 7799d8cb4e76c..e1d3e50c9f817 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -478,6 +478,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() { ID: targetNode, CollectionID: suite.collection, Segments: map[int64]*querypb.SegmentDist{}, + Channel: channel.ChannelName, } for _, segment := range suite.loadSegments { view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0} @@ -577,6 +578,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() { ID: targetNode, CollectionID: suite.collection, Segments: map[int64]*querypb.SegmentDist{}, + Channel: channel.ChannelName, } for _, segment := range suite.loadSegments { view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0} @@ -1141,6 +1143,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { ID: targetNode, CollectionID: suite.collection, Segments: map[int64]*querypb.SegmentDist{}, + Channel: channel.ChannelName, } for _, segment := range suite.loadSegments[1:] { view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}