Skip to content

Commit

Permalink
fix: Querycoord will trigger unexpected balance task after restart (m…
Browse files Browse the repository at this point in the history
…ilvus-io#38630)

issue: milvus-io#38606

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Dec 25, 2024
1 parent bc15ad2 commit f49d618
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 40 deletions.
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,7 @@ message CheckerInfo {
message SegmentTarget {
int64 ID = 1;
data.SegmentLevel level = 2;
int64 num_of_rows = 3;
}

message PartitionTarget {
Expand Down
32 changes: 16 additions & 16 deletions internal/querycoordv2/balance/mock_balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions internal/querycoordv2/checkers/balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
return nil
}

// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
// todo: should also check distribution and leader view in the future
return !b.targetMgr.IsCurrentTargetReady(ctx, cid)
})
if len(notReadyCollections) > 0 {
log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections))
return nil
}

// iterator one normal collection in one round
normalReplicasToBalance := make([]int64, 0)
hasUnbalancedCollection := false
Expand Down
32 changes: 14 additions & 18 deletions internal/querycoordv2/checkers/balance_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,20 +324,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)

segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
mockTarget := meta.NewMockTargetManager(suite.T())
suite.checker.targetMgr = mockTarget

// set collections meta
cid1, replicaID1, partitionID1 := 1, 1, 1
Expand All @@ -347,8 +335,6 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))

cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
Expand All @@ -358,6 +344,17 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
suite.checker.meta.ReplicaManager.Put(ctx, replica2)

// test normal balance when one collection has unready target
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false)
replicasToBalance := suite.checker.replicasToBalance(ctx)
suite.Len(replicasToBalance, 0)

// test stopping balance with target not ready
mockTarget.ExpectedCalls = nil
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(false)
mockTarget.EXPECT().IsCurrentTargetExist(mock.Anything, int64(cid1), mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetExist(mock.Anything, int64(cid2), mock.Anything).Return(false)
mr1 := replica1.CopyForWrite()
mr1.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
Expand All @@ -366,9 +363,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
mr2.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())

// test stopping balance
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.replicasToBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
}

Expand Down
47 changes: 47 additions & 0 deletions internal/querycoordv2/meta/mock_target_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 27 additions & 6 deletions internal/querycoordv2/meta/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -34,6 +36,9 @@ type CollectionTarget struct {
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64

// record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info.
lackSegmentInfo bool
}

func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
Expand All @@ -50,15 +55,20 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
dmChannels := make(map[string]*DmChannel)
var partitions []int64

lackSegmentInfo := false
for _, t := range target.GetChannelTargets() {
for _, partition := range t.GetPartitionTargets() {
for _, segment := range partition.GetSegments() {
if segment.GetNumOfRows() <= 0 {
lackSegmentInfo = true
}
segments[segment.GetID()] = &datapb.SegmentInfo{
ID: segment.GetID(),
Level: segment.GetLevel(),
CollectionID: target.GetCollectionID(),
PartitionID: partition.GetPartitionID(),
InsertChannel: t.GetChannelName(),
NumOfRows: segment.GetNumOfRows(),
}
}
partitions = append(partitions, partition.GetPartitionID())
Expand All @@ -75,11 +85,16 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
}
}

if lackSegmentInfo {
log.Info("target has lack of segment info", zap.Int64("collectionID", target.GetCollectionID()))
}

return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
lackSegmentInfo: lackSegmentInfo,
}
}

Expand Down Expand Up @@ -113,8 +128,9 @@ func (p *CollectionTarget) toPbMsg() *querypb.CollectionTarget {
}

partitionTarget.Segments = append(partitionTarget.Segments, &querypb.SegmentTarget{
ID: info.GetID(),
Level: info.GetLevel(),
ID: info.GetID(),
Level: info.GetLevel(),
NumOfRows: info.GetNumOfRows(),
})
}
}
Expand Down Expand Up @@ -159,6 +175,11 @@ func (p *CollectionTarget) IsEmpty() bool {
return len(p.dmChannels)+len(p.segments) == 0
}

// if target is ready, it should have all segment info
func (p *CollectionTarget) Ready() bool {
return !p.lackSegmentInfo
}

type target struct {
// just maintain target at collection level
collectionTargetMap map[int64]*CollectionTarget
Expand Down
12 changes: 12 additions & 0 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type TargetManagerInterface interface {
CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool
GetTargetJSON(ctx context.Context, scope TargetScope) string
GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error)
IsCurrentTargetReady(ctx context.Context, collectionID int64) bool
}

type TargetManager struct {
Expand Down Expand Up @@ -673,3 +674,14 @@ func (mgr *TargetManager) getTarget(scope TargetScope) *target {

return mgr.next
}

func (mgr *TargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
target, ok := mgr.current.collectionTargetMap[collectionID]
if !ok {
return false
}

return target.Ready()
}
6 changes: 6 additions & 0 deletions internal/querycoordv2/meta/target_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,13 @@ func (suite *TargetManagerSuite) TestRecover() {
ID: 11,
PartitionID: 1,
InsertChannel: "channel-1",
NumOfRows: 100,
},
{
ID: 12,
PartitionID: 1,
InsertChannel: "channel-2",
NumOfRows: 100,
},
}

Expand All @@ -609,6 +611,10 @@ func (suite *TargetManagerSuite) TestRecover() {
suite.Len(target.GetAllDmChannelNames(), 2)
suite.Len(target.GetAllSegmentIDs(), 2)
suite.Equal(target.GetTargetVersion(), version)
for _, segment := range target.GetAllSegments() {
suite.Equal(int64(100), segment.GetNumOfRows())
}
suite.True(target.Ready())

// after recover, target info should be cleaned up
targets, err := suite.catalog.GetCollectionTargets(ctx)
Expand Down

0 comments on commit f49d618

Please sign in to comment.