Skip to content

Commit

Permalink
refine target manager code style (milvus-io#27883)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Oct 24, 2023
1 parent 4640928 commit e0222b2
Show file tree
Hide file tree
Showing 17 changed files with 87 additions and 87 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
// Only balance segments in targets
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
return b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
})

if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
// Only balance segments in targets
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
return b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil
})

if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/checkers/index_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
)
var tasks []task.Task

segments := c.getHistoricalSegmentsDist(replica)
segments := c.getSealedSegmentsDist(replica)
idSegments := make(map[int64]*meta.Segment)

targets := make(map[int64][]int64) // segmentID => FieldID
Expand Down Expand Up @@ -133,7 +133,7 @@ func (c *IndexChecker) checkSegment(ctx context.Context, segment *meta.Segment,
return result
}

func (c *IndexChecker) getHistoricalSegmentsDist(replica *meta.Replica) []*meta.Segment {
func (c *IndexChecker) getSealedSegmentsDist(replica *meta.Replica) []*meta.Segment {
var ret []*meta.Segment
for _, node := range replica.GetNodes() {
ret = append(ret, c.dist.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, node)...)
Expand Down
30 changes: 15 additions & 15 deletions internal/querycoordv2/checkers/segment_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
}

// compare with targets to find the lack and redundancy of segments
lacks, redundancies := c.getHistoricalSegmentDiff(replica.GetCollectionID(), replica.GetID())
lacks, redundancies := c.getSealedSegmentDiff(replica.GetCollectionID(), replica.GetID())
tasks := c.createSegmentLoadTasks(ctx, lacks, replica)
task.SetReason("lacks of segment", tasks...)
ret = append(ret, tasks...)
Expand All @@ -122,23 +122,23 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
ret = append(ret, tasks...)

// compare inner dists to find repeated loaded segments
redundancies = c.findRepeatedHistoricalSegments(replica.GetID())
redundancies = c.findRepeatedSealedSegments(replica.GetID())
redundancies = c.filterExistedOnLeader(replica, redundancies)
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Historical)
task.SetReason("redundancies of segment", tasks...)
ret = append(ret, tasks...)

// compare with target to find the lack and redundancy of segments
_, redundancies = c.getStreamingSegmentDiff(replica.GetCollectionID(), replica.GetID())
_, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID())
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Streaming)
task.SetReason("streaming segment not exists in target", tasks...)
ret = append(ret, tasks...)

return ret
}

// GetStreamingSegmentDiff get streaming segment diff between leader view and target
func (c *SegmentChecker) getStreamingSegmentDiff(collectionID int64,
// GetGrowingSegmentDiff get streaming segment diff between leader view and target
func (c *SegmentChecker) getGrowingSegmentDiff(collectionID int64,
replicaID int64,
) (toLoad []*datapb.SegmentInfo, toRelease []*meta.Segment) {
replica := c.meta.Get(replicaID)
Expand Down Expand Up @@ -171,8 +171,8 @@ func (c *SegmentChecker) getStreamingSegmentDiff(collectionID int64,
continue
}

nextTargetSegmentIDs := c.targetMgr.GetStreamingSegmentsByCollection(collectionID, meta.NextTarget)
currentTargetSegmentIDs := c.targetMgr.GetStreamingSegmentsByCollection(collectionID, meta.CurrentTarget)
nextTargetSegmentIDs := c.targetMgr.GetGrowingSegmentsByCollection(collectionID, meta.NextTarget)
currentTargetSegmentIDs := c.targetMgr.GetGrowingSegmentsByCollection(collectionID, meta.CurrentTarget)
currentTargetChannelMap := c.targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)

// get segment which exist on leader view, but not on current target and next target
Expand All @@ -196,8 +196,8 @@ func (c *SegmentChecker) getStreamingSegmentDiff(collectionID int64,
return
}

// GetHistoricalSegmentDiff get historical segment diff between target and dist
func (c *SegmentChecker) getHistoricalSegmentDiff(
// GetSealedSegmentDiff get historical segment diff between target and dist
func (c *SegmentChecker) getSealedSegmentDiff(
collectionID int64,
replicaID int64,
) (toLoad []*datapb.SegmentInfo, toRelease []*meta.Segment) {
Expand All @@ -206,7 +206,7 @@ func (c *SegmentChecker) getHistoricalSegmentDiff(
log.Info("replica does not exist, skip it")
return
}
dist := c.getHistoricalSegmentsDist(replica)
dist := c.getSealedSegmentsDist(replica)
sort.Slice(dist, func(i, j int) bool {
return dist[i].Version < dist[j].Version
})
Expand All @@ -215,8 +215,8 @@ func (c *SegmentChecker) getHistoricalSegmentDiff(
distMap[s.GetID()] = s.Node
}

nextTargetMap := c.targetMgr.GetHistoricalSegmentsByCollection(collectionID, meta.NextTarget)
currentTargetMap := c.targetMgr.GetHistoricalSegmentsByCollection(collectionID, meta.CurrentTarget)
nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.NextTarget)
currentTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget)

// Segment which exist on next target, but not on dist
for segmentID, segment := range nextTargetMap {
Expand Down Expand Up @@ -256,22 +256,22 @@ func (c *SegmentChecker) getHistoricalSegmentDiff(
return
}

func (c *SegmentChecker) getHistoricalSegmentsDist(replica *meta.Replica) []*meta.Segment {
func (c *SegmentChecker) getSealedSegmentsDist(replica *meta.Replica) []*meta.Segment {
ret := make([]*meta.Segment, 0)
for _, node := range replica.GetNodes() {
ret = append(ret, c.dist.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, node)...)
}
return ret
}

func (c *SegmentChecker) findRepeatedHistoricalSegments(replicaID int64) []*meta.Segment {
func (c *SegmentChecker) findRepeatedSealedSegments(replicaID int64) []*meta.Segment {
segments := make([]*meta.Segment, 0)
replica := c.meta.Get(replicaID)
if replica == nil {
log.Info("replica does not exist, skip it")
return segments
}
dist := c.getHistoricalSegmentsDist(replica)
dist := c.getSealedSegmentsDist(replica)
versions := make(map[int64]*meta.Segment)
for _, s := range dist {
maxVer, ok := versions[s.GetID()]
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/dist/dist_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ func (dh *distHandler) updateSegmentsDistribution(resp *querypb.GetDataDistribut
updates := make([]*meta.Segment, 0, len(resp.GetSegments()))
for _, s := range resp.GetSegments() {
// for collection which is already loaded
segmentInfo := dh.target.GetHistoricalSegment(s.GetCollection(), s.GetID(), meta.CurrentTarget)
segmentInfo := dh.target.GetSealedSegment(s.GetCollection(), s.GetID(), meta.CurrentTarget)
if segmentInfo == nil {
// for collection which is loading
segmentInfo = dh.target.GetHistoricalSegment(s.GetCollection(), s.GetID(), meta.NextTarget)
segmentInfo = dh.target.GetSealedSegment(s.GetCollection(), s.GetID(), meta.NextTarget)
}
var segment *meta.Segment
if segmentInfo == nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *Server) checkAnyReplicaAvailable(collectionID int64) bool {

func (s *Server) getCollectionSegmentInfo(collection int64) []*querypb.SegmentInfo {
segments := s.dist.SegmentDistManager.GetByCollection(collection)
currentTargetSegmentsMap := s.targetMgr.GetHistoricalSegmentsByCollection(collection, meta.CurrentTarget)
currentTargetSegmentsMap := s.targetMgr.GetSealedSegmentsByCollection(collection, meta.CurrentTarget)
infos := make(map[int64]*querypb.SegmentInfo)
for _, segment := range segments {
if _, existCurrentTarget := currentTargetSegmentsMap[segment.GetID()]; !existCurrentTarget {
Expand Down Expand Up @@ -109,7 +109,7 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe
// Only balance segments in targets
segments := s.dist.SegmentDistManager.GetByCollectionAndNode(req.GetCollectionID(), srcNode)
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
return s.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil
return s.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil
})
allSegments := make(map[int64]*meta.Segment)
for _, segment := range segments {
Expand Down
8 changes: 4 additions & 4 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,7 @@ func (suite *JobSuite) assertCollectionLoaded(collection int64) {
}
for _, segments := range suite.segments[collection] {
for _, segment := range segments {
suite.NotNil(suite.targetMgr.GetHistoricalSegment(collection, segment, meta.CurrentTarget))
suite.NotNil(suite.targetMgr.GetSealedSegment(collection, segment, meta.CurrentTarget))
}
}
}
Expand All @@ -1501,7 +1501,7 @@ func (suite *JobSuite) assertPartitionLoaded(collection int64, partitionIDs ...i
}
suite.NotNil(suite.meta.GetPartition(partitionID))
for _, segment := range segments {
suite.NotNil(suite.targetMgr.GetHistoricalSegment(collection, segment, meta.CurrentTarget))
suite.NotNil(suite.targetMgr.GetSealedSegment(collection, segment, meta.CurrentTarget))
}
}
}
Expand All @@ -1514,7 +1514,7 @@ func (suite *JobSuite) assertCollectionReleased(collection int64) {
}
for _, partitions := range suite.segments[collection] {
for _, segment := range partitions {
suite.Nil(suite.targetMgr.GetHistoricalSegment(collection, segment, meta.CurrentTarget))
suite.Nil(suite.targetMgr.GetSealedSegment(collection, segment, meta.CurrentTarget))
}
}
}
Expand All @@ -1524,7 +1524,7 @@ func (suite *JobSuite) assertPartitionReleased(collection int64, partitionIDs ..
suite.Nil(suite.meta.GetPartition(partition))
segments := suite.segments[collection][partition]
for _, segment := range segments {
suite.Nil(suite.targetMgr.GetHistoricalSegment(collection, segment, meta.CurrentTarget))
suite.Nil(suite.targetMgr.GetSealedSegment(collection, segment, meta.CurrentTarget))
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (mgr *TargetManager) getTarget(scope TargetScope) *target {
return mgr.next
}

func (mgr *TargetManager) GetStreamingSegmentsByCollection(collectionID int64,
func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
scope TargetScope,
) typeutil.UniqueSet {
mgr.rwMutex.RLock()
Expand All @@ -345,7 +345,7 @@ func (mgr *TargetManager) GetStreamingSegmentsByCollection(collectionID int64,
return segments
}

func (mgr *TargetManager) GetStreamingSegmentsByChannel(collectionID int64,
func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
channelName string,
scope TargetScope,
) typeutil.UniqueSet {
Expand All @@ -369,7 +369,7 @@ func (mgr *TargetManager) GetStreamingSegmentsByChannel(collectionID int64,
return segments
}

func (mgr *TargetManager) GetHistoricalSegmentsByCollection(collectionID int64,
func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
Expand All @@ -384,7 +384,7 @@ func (mgr *TargetManager) GetHistoricalSegmentsByCollection(collectionID int64,
return collectionTarget.GetAllSegments()
}

func (mgr *TargetManager) GetHistoricalSegmentsByChannel(collectionID int64,
func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
channelName string,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
Expand Down Expand Up @@ -430,7 +430,7 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
return channel.GetDroppedSegmentIds()
}

func (mgr *TargetManager) GetHistoricalSegmentsByPartition(collectionID int64,
func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
partitionID int64, scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
Expand Down Expand Up @@ -479,7 +479,7 @@ func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope
return collectionTarget.GetAllDmChannels()[channel]
}

func (mgr *TargetManager) GetHistoricalSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
Expand Down
Loading

0 comments on commit e0222b2

Please sign in to comment.