Skip to content

Commit

Permalink
enhance: Accelerate observe collection (#38028)
Browse files Browse the repository at this point in the history
1. A collection should observe the channel only once.
2. A collection should check the CollectionLoadPercent for updates only
once.
3. Skip saving coll/partition meta if there are no changes, primarily to
accelerate collection observation after recovery.

issue: #37630

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Dec 17, 2024
1 parent d4dab3c commit d3c174b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 37 deletions.
22 changes: 12 additions & 10 deletions internal/querycoordv2/meta/collection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,44 +548,46 @@ func (m *CollectionManager) updateLoadMetrics() {
metrics.QueryCoordNumPartitions.WithLabelValues().Set(float64(len(lo.Filter(lo.Values(m.partitions), func(part *Partition, _ int) bool { return part.LoadPercentage == 100 }))))
}

func (m *CollectionManager) UpdateLoadPercent(ctx context.Context, partitionID int64, loadPercent int32) (int32, error) {
func (m *CollectionManager) UpdatePartitionLoadPercent(ctx context.Context, partitionID int64, loadPercent int32) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()

oldPartition, ok := m.partitions[partitionID]
if !ok {
return 0, merr.WrapErrPartitionNotFound(partitionID)
return merr.WrapErrPartitionNotFound(partitionID)
}

// update partition load percentage
newPartition := oldPartition.Clone()
newPartition.LoadPercentage = loadPercent
savePartition := false
if loadPercent == 100 {
savePartition = true
savePartition = newPartition.Status != querypb.LoadStatus_Loaded || newPartition.RecoverTimes != 0
newPartition.Status = querypb.LoadStatus_Loaded
// if partition becomes loaded, clear it's recoverTimes in load info
newPartition.RecoverTimes = 0
elapsed := time.Since(newPartition.CreatedAt)
metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds()))
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Partition %d loaded", partitionID)))
}
err := m.putPartition(ctx, []*Partition{newPartition}, savePartition)
if err != nil {
return 0, err
}
return m.putPartition(ctx, []*Partition{newPartition}, savePartition)
}

func (m *CollectionManager) UpdateCollectionLoadPercent(ctx context.Context, collectionID int64) (int32, error) {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()

// update collection load percentage
oldCollection, ok := m.collections[newPartition.CollectionID]
oldCollection, ok := m.collections[collectionID]
if !ok {
return 0, merr.WrapErrCollectionNotFound(newPartition.CollectionID)
return 0, merr.WrapErrCollectionNotFound(collectionID)
}
collectionPercent := m.calculateLoadPercentage(oldCollection.CollectionID)
newCollection := oldCollection.Clone()
newCollection.LoadPercentage = collectionPercent
saveCollection := false
if collectionPercent == 100 {
saveCollection = true
saveCollection = newCollection.Status != querypb.LoadStatus_Loaded || newCollection.RecoverTimes != 0
if newCollection.LoadSpan != nil {
newCollection.LoadSpan.End()
newCollection.LoadSpan = nil
Expand Down
20 changes: 14 additions & 6 deletions internal/querycoordv2/meta/collection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,11 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
// update load percent, then recover for second time
for _, collectionID := range suite.collections {
for _, partitionID := range suite.partitions[collectionID] {
mgr.UpdateLoadPercent(ctx, partitionID, 10)
err = mgr.UpdatePartitionLoadPercent(ctx, partitionID, 10)
suite.NoError(err)
}
_, err = mgr.UpdateCollectionLoadPercent(ctx, collectionID)
suite.NoError(err)
}
suite.clearMemory()
err = mgr.Recover(ctx, suite.broker)
Expand Down Expand Up @@ -444,27 +447,32 @@ func (suite *CollectionManagerSuite) TestUpdateLoadPercentage() {
})
}
// test update partition load percentage
mgr.UpdateLoadPercent(ctx, 1, 30)
err := mgr.UpdatePartitionLoadPercent(ctx, 1, 30)
suite.NoError(err)
partition := mgr.GetPartition(ctx, 1)
suite.Equal(int32(30), partition.LoadPercentage)
suite.Equal(int32(30), mgr.GetPartitionLoadPercentage(ctx, partition.PartitionID))
suite.Equal(querypb.LoadStatus_Loading, partition.Status)
collection := mgr.GetCollection(ctx, 1)
suite.Equal(int32(15), collection.LoadPercentage)
suite.Equal(int32(0), collection.LoadPercentage)
suite.Equal(querypb.LoadStatus_Loading, collection.Status)
// test update partition load percentage to 100
mgr.UpdateLoadPercent(ctx, 1, 100)
err = mgr.UpdatePartitionLoadPercent(ctx, 1, 100)
suite.NoError(err)
partition = mgr.GetPartition(ctx, 1)
suite.Equal(int32(100), partition.LoadPercentage)
suite.Equal(querypb.LoadStatus_Loaded, partition.Status)
collection = mgr.GetCollection(ctx, 1)
suite.Equal(int32(50), collection.LoadPercentage)
suite.Equal(int32(0), collection.LoadPercentage)
suite.Equal(querypb.LoadStatus_Loading, collection.Status)
// test update collection load percentage
mgr.UpdateLoadPercent(ctx, 2, 100)
err = mgr.UpdatePartitionLoadPercent(ctx, 2, 100)
suite.NoError(err)
partition = mgr.GetPartition(ctx, 1)
suite.Equal(int32(100), partition.LoadPercentage)
suite.Equal(querypb.LoadStatus_Loaded, partition.Status)
_, err = mgr.UpdateCollectionLoadPercent(ctx, 1)
suite.NoError(err)
collection = mgr.GetCollection(ctx, 1)
suite.Equal(int32(100), collection.LoadPercentage)
suite.Equal(querypb.LoadStatus_Loaded, collection.Status)
Expand Down
78 changes: 57 additions & 21 deletions internal/querycoordv2/observers/collection_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,19 +261,31 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
}

loaded := true
hasUpdate := false

channelTargetNum, subChannelCount := ob.observeChannelStatus(ctx, task.CollectionID)

for _, partition := range partitions {
if partition.LoadPercentage == 100 {
continue
}
if ob.readyToObserve(ctx, partition.CollectionID) {
replicaNum := ob.meta.GetReplicaNumber(ctx, partition.GetCollectionID())
ob.observePartitionLoadStatus(ctx, partition, replicaNum)
has := ob.observePartitionLoadStatus(ctx, partition, replicaNum, channelTargetNum, subChannelCount)
if has {
hasUpdate = true
}
}
partition = ob.meta.GetPartition(ctx, partition.PartitionID)
if partition != nil && partition.LoadPercentage != 100 {
loaded = false
}
}

if hasUpdate {
ob.observeCollectionLoadStatus(ctx, task.CollectionID)
}

// all partition loaded, finish task
if len(partitions) > 0 && loaded {
log.Info("Load task finish",
Expand All @@ -293,37 +305,48 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
}
}

func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32) {
func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collectionID int64) (int, int) {
channelTargets := ob.targetMgr.GetDmChannelsByCollection(ctx, collectionID, meta.NextTarget)

channelTargetNum := len(channelTargets)
if channelTargetNum == 0 {
log.Info("channels in target is empty, waiting for new target content")
return 0, 0
}

subChannelCount := 0
for _, channel := range channelTargets {
views := ob.dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName()))
nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID })
group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, collectionID, nodes)
subChannelCount += len(group)
}
return channelTargetNum, subChannelCount
}

func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
zap.Int64("collectionID", partition.GetCollectionID()),
zap.Int64("partitionID", partition.GetPartitionID()),
)

segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)
channelTargets := ob.targetMgr.GetDmChannelsByCollection(ctx, partition.GetCollectionID(), meta.NextTarget)

targetNum := len(segmentTargets) + len(channelTargets)
targetNum := len(segmentTargets) + channelTargetNum
if targetNum == 0 {
log.Info("segments and channels in target are both empty, waiting for new target content")
return
return false
}

log.RatedInfo(10, "partition targets",
zap.Int("segmentTargetNum", len(segmentTargets)),
zap.Int("channelTargetNum", len(channelTargets)),
zap.Int("channelTargetNum", channelTargetNum),
zap.Int("totalTargetNum", targetNum),
zap.Int32("replicaNum", replicaNum),
)
loadedCount := 0
loadedCount := subChannelCount
loadPercentage := int32(0)

for _, channel := range channelTargets {
views := ob.dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName()))
nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID })
group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, partition.GetCollectionID(), nodes)
loadedCount += len(group)
}
subChannelCount := loadedCount
for _, segment := range segmentTargets {
views := ob.dist.LeaderViewManager.GetByFilter(
meta.WithChannelName2LeaderView(segment.GetInsertChannel()),
Expand All @@ -341,29 +364,42 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa

if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 {
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
return
return false
}

ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
if loadPercentage == 100 {
if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
log.Warn("failed to manual check current target, skip update load status")
return
return false
}
delete(ob.partitionLoadedCount, partition.GetPartitionID())
}
collectionPercentage, err := ob.meta.CollectionManager.UpdateLoadPercent(ctx, partition.PartitionID, loadPercentage)
err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage)
if err != nil {
log.Warn("failed to update load percentage")
log.Warn("failed to update partition load percentage")
}
log.Info("load status updated",
log.Info("partition load status updated",
zap.Int32("partitionLoadPercentage", loadPercentage),
)
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
return true
}

func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))

collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, collectionID)
if err != nil {
log.Warn("failed to update collection load percentage")
}
log.Info("collection load status updated",
zap.Int32("collectionLoadPercentage", collectionPercentage),
)
if collectionPercentage == 100 {
ob.invalidateCache(ctx, partition.GetCollectionID())
ob.invalidateCache(ctx, collectionID)
}
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("collection %d load percentage update: %d", partition.CollectionID, loadPercentage)))
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("collection %d load percentage update: %d", collectionID, collectionPercentage)))
}

func (ob *CollectionObserver) invalidateCache(ctx context.Context, collectionID int64) {
Expand Down

0 comments on commit d3c174b

Please sign in to comment.