diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 828cedc6e5ce3..55d3371252d94 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -24,6 +24,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -37,6 +38,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) { } func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) { + tr := timerecord.NewTimeRecorder("") resp, err := dh.getDistribution(ctx) + d1 := tr.RecordSpan() if err != nil { node := dh.nodeManager.Get(dh.nodeID) *failures = *failures + 1 @@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat())) } fields = append(fields, zap.Error(err)) - log.RatedWarn(30.0, "failed to get data distribution", fields...) + log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60). + RatedWarn(30.0, "failed to get data distribution", fields...) } else { *failures = 0 dh.handleDistResp(ctx, resp, dispatchTask) } + log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120). + RatedInfo(120.0, "pull and handle distribution done", + zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan())) } func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetDataDistributionResponse, dispatchTask bool) { diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 4795eade4cfae..1b357af21de9a 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -26,43 +26,70 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/metrics" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) // CollectionTarget collection target is immutable, type CollectionTarget struct { - segments map[int64]*datapb.SegmentInfo - dmChannels map[string]*DmChannel - partitions typeutil.Set[int64] // stores target partitions info - version int64 + segments map[int64]*datapb.SegmentInfo + channel2Segments map[string][]*datapb.SegmentInfo + partition2Segments map[int64][]*datapb.SegmentInfo + dmChannels map[string]*DmChannel + partitions typeutil.Set[int64] // stores target partitions info + version int64 // record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info. 
lackSegmentInfo bool } func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget { + channel2Segments := make(map[string][]*datapb.SegmentInfo, len(dmChannels)) + partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs)) + for _, segment := range segments { + channel := segment.GetInsertChannel() + if _, ok := channel2Segments[channel]; !ok { + channel2Segments[channel] = make([]*datapb.SegmentInfo, 0) + } + channel2Segments[channel] = append(channel2Segments[channel], segment) + partitionID := segment.GetPartitionID() + if _, ok := partition2Segments[partitionID]; !ok { + partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0) + } + partition2Segments[partitionID] = append(partition2Segments[partitionID], segment) + } return &CollectionTarget{ - segments: segments, - dmChannels: dmChannels, - partitions: typeutil.NewSet(partitionIDs...), - version: time.Now().UnixNano(), + segments: segments, + channel2Segments: channel2Segments, + partition2Segments: partition2Segments, + dmChannels: dmChannels, + partitions: typeutil.NewSet(partitionIDs...), + version: time.Now().UnixNano(), } } func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget { segments := make(map[int64]*datapb.SegmentInfo) dmChannels := make(map[string]*DmChannel) + channel2Segments := make(map[string][]*datapb.SegmentInfo) + partition2Segments := make(map[int64][]*datapb.SegmentInfo) var partitions []int64 lackSegmentInfo := false for _, t := range target.GetChannelTargets() { + if _, ok := channel2Segments[t.GetChannelName()]; !ok { + channel2Segments[t.GetChannelName()] = make([]*datapb.SegmentInfo, 0) + } for _, partition := range t.GetPartitionTargets() { + if _, ok := partition2Segments[partition.GetPartitionID()]; !ok { + partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments())) + } for _, segment := range partition.GetSegments() { if segment.GetNumOfRows() <= 0 { lackSegmentInfo = true } - segments[segment.GetID()] = &datapb.SegmentInfo{ + info := &datapb.SegmentInfo{ ID: segment.GetID(), Level: segment.GetLevel(), CollectionID: target.GetCollectionID(), @@ -70,6 +97,9 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget InsertChannel: t.GetChannelName(), NumOfRows: segment.GetNumOfRows(), } + segments[segment.GetID()] = info + channel2Segments[t.GetChannelName()] = append(channel2Segments[t.GetChannelName()], info) + partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info) } partitions = append(partitions, partition.GetPartitionID()) } @@ -90,11 +120,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget } return &CollectionTarget{ - segments: segments, - dmChannels: dmChannels, - partitions: typeutil.NewSet(partitions...), - version: target.GetVersion(), - lackSegmentInfo: lackSegmentInfo, + segments: segments, + channel2Segments: channel2Segments, + partition2Segments: partition2Segments, + dmChannels: dmChannels, + partitions: typeutil.NewSet(partitions...), + version: target.GetVersion(), + lackSegmentInfo: lackSegmentInfo, } } @@ -155,6 +187,14 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo { return p.segments } +func (p *CollectionTarget) GetChannelSegments(channel string) []*datapb.SegmentInfo { + return p.channel2Segments[channel] +} + +func (p *CollectionTarget) 
GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo { + return p.partition2Segments[partitionID] +} + func (p *CollectionTarget) GetTargetVersion() int64 { return p.version } @@ -181,34 +221,40 @@ func (p *CollectionTarget) Ready() bool { } type target struct { + keyLock *lock.KeyLock[int64] // guards updateCollectionTarget // just maintain target at collection level - collectionTargetMap map[int64]*CollectionTarget + collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget] } func newTarget() *target { return &target{ - collectionTargetMap: make(map[int64]*CollectionTarget), + keyLock: lock.NewKeyLock[int64](), + collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](), } } func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) { - if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() { + t.keyLock.Lock(collectionID) + defer t.keyLock.Unlock(collectionID) + if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() { return } - t.collectionTargetMap[collectionID] = target + t.collectionTargetMap.Insert(collectionID, target) } func (t *target) removeCollectionTarget(collectionID int64) { - delete(t.collectionTargetMap, collectionID) + t.collectionTargetMap.Remove(collectionID) } func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget { - return t.collectionTargetMap[collectionID] + ret, _ := t.collectionTargetMap.Get(collectionID) + return ret } func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget { - return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordTarget { + targets := make([]*metricsinfo.QueryCoordTarget, 0, t.collectionTargetMap.Len()) + t.collectionTargetMap.Range(func(k int64, v *CollectionTarget) bool { segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment { return metrics.NewSegmentFrom(s) }) @@ -217,10 +263,13 @@ func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget return metrics.NewDMChannelFrom(ch.VchannelInfo) }) - return &metricsinfo.QueryCoordTarget{ + qct := &metricsinfo.QueryCoordTarget{ CollectionID: k, Segments: segments, DMChannels: dmChannels, } + targets = append(targets, qct) + return true }) + return targets } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 10fe0b787b55d..b82032023bc7d 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -76,9 +76,8 @@ type TargetManagerInterface interface { } type TargetManager struct { - rwMutex sync.RWMutex - broker Broker - meta *Meta + broker Broker + meta *Meta // all read segment/channel operation happens on current -> only current target are visible to outer // all add segment/channel operation happens on next -> changes can only happen on next target @@ -100,8 +99,6 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager { // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update, // which may make the current target not available func (mgr *TargetManager) UpdateCollectionCurrentTarget(ctx context.Context, collectionID int64) bool { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() log := log.With(zap.Int64("collectionID", collectionID)) 
log.Debug("start to update current target for collection") @@ -157,8 +154,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec return err } - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() partitions := mgr.meta.GetPartitionsByCollection(ctx, collectionID) partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID @@ -188,7 +183,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec } for _, infos := range channelInfos { - merged := mgr.mergeDmChannelInfo(infos) + merged := mergeDmChannelInfo(infos) dmChannels[merged.GetChannelName()] = merged } @@ -198,7 +193,9 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec } allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs) + mgr.next.updateCollectionTarget(collectionID, allocatedTarget) + log.Debug("finish to update next targets for collection", zap.Int64("collectionID", collectionID), zap.Int64s("PartitionIDs", partitionIDs)) @@ -206,7 +203,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec return nil } -func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel { +func mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel { var dmChannel *DmChannel for _, info := range infos { @@ -228,8 +225,6 @@ func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmCh // RemoveCollection removes all channels and segments in the given collection func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int64) { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() log.Info("remove collection from targets", zap.Int64("collectionID", collectionID)) @@ -250,9 +245,6 @@ func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int // RemovePartition removes all segment in the given partition, // NOTE: this doesn't remove any channel even the given one is the only partition func (mgr *TargetManager) RemovePartition(ctx context.Context, collectionID int64, partitionIDs ...int64) { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() - log := log.With(zap.Int64("collectionID", collectionID), zap.Int64s("PartitionIDs", partitionIDs)) @@ -359,9 +351,6 @@ func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID in func (mgr *TargetManager) GetGrowingSegmentsByCollection(ctx context.Context, collectionID int64, scope TargetScope, ) typeutil.UniqueSet { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { @@ -382,9 +371,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(ctx context.Context, colle channelName string, scope TargetScope, ) typeutil.UniqueSet { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { segments := typeutil.NewUniqueSet() @@ -405,9 +391,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(ctx context.Context, colle func (mgr *TargetManager) GetSealedSegmentsByCollection(ctx context.Context, collectionID int64, scope TargetScope, ) map[int64]*datapb.SegmentInfo { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { @@ -421,17 +404,11 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(ctx context.Context, collec channelName string, scope TargetScope, ) 
map[int64]*datapb.SegmentInfo { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { - ret := make(map[int64]*datapb.SegmentInfo) - for k, v := range t.GetAllSegments() { - if v.GetInsertChannel() == channelName { - ret[k] = v - } - } + ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 { + return s.GetID() + }) if len(ret) > 0 { return ret @@ -445,9 +422,6 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(ctx context.Context, colle channelName string, scope TargetScope, ) []int64 { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { if channel, ok := t.dmChannels[channelName]; ok { @@ -462,16 +436,11 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll partitionID int64, scope TargetScope, ) map[int64]*datapb.SegmentInfo { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { segments := make(map[int64]*datapb.SegmentInfo) - for _, s := range t.GetAllSegments() { - if s.GetPartitionID() == partitionID { - segments[s.GetID()] = s - } + for _, s := range t.GetPartitionSegments(partitionID) { + segments[s.GetID()] = s } if len(segments) > 0 { @@ -483,9 +452,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll } func (mgr *TargetManager) GetDmChannelsByCollection(ctx context.Context, collectionID int64, scope TargetScope) map[string]*DmChannel { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { @@ -496,9 +462,6 @@ func (mgr *TargetManager) GetDmChannelsByCollection(ctx context.Context, collect } func (mgr *TargetManager) GetDmChannel(ctx context.Context, collectionID int64, channel string, scope TargetScope) *DmChannel { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { if ch, ok := t.GetAllDmChannels()[channel]; ok { @@ -509,9 +472,6 @@ func (mgr *TargetManager) GetDmChannel(ctx context.Context, collectionID int64, } func (mgr *TargetManager) GetSealedSegment(ctx context.Context, collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { if s, ok := t.GetAllSegments()[id]; ok { @@ -523,9 +483,6 @@ func (mgr *TargetManager) GetSealedSegment(ctx context.Context, collectionID int } func (mgr *TargetManager) GetCollectionTargetVersion(ctx context.Context, collectionID int64, scope TargetScope) int64 { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { if t.GetTargetVersion() > 0 { @@ -537,9 +494,6 @@ func (mgr *TargetManager) GetCollectionTargetVersion(ctx context.Context, collec } func (mgr *TargetManager) IsCurrentTargetExist(ctx context.Context, collectionID int64, partitionID int64) bool { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(CurrentTarget, collectionID) return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0 @@ -552,8 +506,6 @@ func (mgr *TargetManager) IsNextTargetExist(ctx context.Context, 
collectionID in } func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metastore.QueryCoordCatalog) { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() if mgr.current != nil { // use pool here to control maximal writer used by save target pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2) @@ -577,13 +529,14 @@ func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metasto }) } tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize) - for id, target := range mgr.current.collectionTargetMap { + mgr.current.collectionTargetMap.Range(func(id int64, target *CollectionTarget) bool { tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg())) if len(tasks) >= batchSize { submit(tasks) tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize) } - } + return true + }) if len(tasks) > 0 { submit(tasks) } @@ -592,9 +545,6 @@ func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metasto } func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCoordCatalog) error { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() - targets, err := catalog.GetCollectionTargets(ctx) if err != nil { log.Warn("failed to recover collection target from etcd", zap.Error(err)) @@ -623,8 +573,6 @@ func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCo // if segment isn't l0 segment, and exist in current/next target, then it can be moved func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() current := mgr.current.getCollectionTarget(collectionID) if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 { return true @@ -639,9 +587,6 @@ func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, s } func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope) string { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - ret := mgr.getTarget(scope) if ret == nil { return "" @@ -656,9 +601,6 @@ func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope) } func (mgr *TargetManager) GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - ret := mgr.getCollectionTarget(scope, collectionID) if len(ret) == 0 { return nil, merr.WrapErrCollectionNotLoaded(collectionID) @@ -676,9 +618,7 @@ func (mgr *TargetManager) getTarget(scope TargetScope) *target { } func (mgr *TargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - target, ok := mgr.current.collectionTargetMap[collectionID] + target, ok := mgr.current.collectionTargetMap.Get(collectionID) if !ok { return false } diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 34bf64136a2e2..95510a6cce895 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -425,33 +425,38 @@ func (suite *TargetManagerSuite) TestGetTarget() { current := &CollectionTarget{} next := &CollectionTarget{} + t1 := typeutil.NewConcurrentMap[int64, *CollectionTarget]() + t2 := typeutil.NewConcurrentMap[int64, *CollectionTarget]() + t3 := typeutil.NewConcurrentMap[int64, *CollectionTarget]() + t4 := typeutil.NewConcurrentMap[int64, 
*CollectionTarget]() + t1.Insert(1000, current) + t2.Insert(1000, next) + t3.Insert(1000, current) + t4.Insert(1000, current) + bothMgr := &TargetManager{ current: &target{ - collectionTargetMap: map[int64]*CollectionTarget{ - 1000: current, - }, + collectionTargetMap: t1, }, next: &target{ - collectionTargetMap: map[int64]*CollectionTarget{ - 1000: next, - }, + collectionTargetMap: t2, }, } currentMgr := &TargetManager{ current: &target{ - collectionTargetMap: map[int64]*CollectionTarget{ - 1000: current, - }, + collectionTargetMap: t3, + }, + next: &target{ + collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](), }, - next: &target{}, } nextMgr := &TargetManager{ next: &target{ - collectionTargetMap: map[int64]*CollectionTarget{ - 1000: current, - }, + collectionTargetMap: t4, + }, + current: &target{ + collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](), }, - current: &target{}, } cases := []testCase{ @@ -720,7 +725,7 @@ func BenchmarkTargetManager(b *testing.B) { collectionNum := 10000 for i := 0; i < collectionNum; i++ { - mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil) + mgr.current.collectionTargetMap.Insert(int64(i), NewCollectionTarget(segments, channels, nil)) } b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 99f8c2f06a341..90ae9dfb23593 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -240,9 +240,13 @@ func (ob *CollectionObserver) readyToObserve(ctx context.Context, collectionID i func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) { loading := false + observeTaskNum := 0 + observeStart := time.Now() ob.loadTasks.Range(func(traceID string, task LoadTask) bool { loading = true + observeTaskNum++ + start := time.Now() collection := ob.meta.CollectionManager.GetCollection(ctx, task.CollectionID) if collection == nil { return true @@ -296,9 +300,12 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) { ob.loadTasks.Remove(traceID) } + log.Info("observe collection done", zap.Int64("collectionID", task.CollectionID), zap.Duration("dur", time.Since(start))) return true }) + log.Info("observe all collections done", zap.Int("num", observeTaskNum), zap.Duration("dur", time.Since(observeStart))) + // trigger check logic when loading collections/partitions if loading { ob.checkerController.Check() @@ -325,11 +332,6 @@ func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collecti } func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool { - log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With( - zap.Int64("collectionID", partition.GetCollectionID()), - zap.Int64("partitionID", partition.GetPartitionID()), - ) - segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget) targetNum := len(segmentTargets) + channelTargetNum @@ -338,7 +340,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa return false } - log.RatedInfo(10, "partition targets", + log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets", + zap.Int64("collectionID", partition.GetCollectionID()), 
+ zap.Int64("partitionID", partition.GetPartitionID()), zap.Int("segmentTargetNum", len(segmentTargets)), zap.Int("channelTargetNum", channelTargetNum), zap.Int("totalTargetNum", targetNum), @@ -355,11 +359,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, partition.GetCollectionID(), nodes) loadedCount += len(group) } - if loadedCount > 0 { - log.Info("partition load progress", - zap.Int("subChannelCount", subChannelCount), - zap.Int("loadSegmentCount", loadedCount-subChannelCount)) - } loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum))) if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 { @@ -370,30 +369,37 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount if loadPercentage == 100 { if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) { - log.Warn("failed to manual check current target, skip update load status") + log.Ctx(ctx).Warn("failed to manual check current target, skip update load status", + zap.Int64("collectionID", partition.GetCollectionID()), + zap.Int64("partitionID", partition.GetPartitionID())) return false } delete(ob.partitionLoadedCount, partition.GetPartitionID()) } err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage) if err != nil { - log.Warn("failed to update partition load percentage") + log.Ctx(ctx).Warn("failed to update partition load percentage", + zap.Int64("collectionID", partition.GetCollectionID()), + zap.Int64("partitionID", partition.GetPartitionID())) } - log.Info("partition load status updated", + log.Ctx(ctx).Info("partition load status updated", + zap.Int64("collectionID", partition.GetCollectionID()), + zap.Int64("partitionID", partition.GetPartitionID()), zap.Int32("partitionLoadPercentage", loadPercentage), + zap.Int("subChannelCount", subChannelCount), + zap.Int("loadSegmentCount", loadedCount-subChannelCount), ) eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage))) return true } func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) { - log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) - collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, collectionID) if err != nil { - log.Warn("failed to update collection load percentage") + log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID)) } - log.Info("collection load status updated", + log.Ctx(ctx).Info("collection load status updated", + zap.Int64("collectionID", collectionID), zap.Int32("collectionLoadPercentage", collectionPercentage), ) if collectionPercentage == 100 { diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index dfbc4c44ddf52..2aa5a90d4b2ab 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -24,7 +24,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" - "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -116,49 +115,7 @@ func (action *SegmentAction) GetScope() querypb.DataScope { } func (action 
*SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool { - if action.Type() == ActionTypeGrow { - // rpc finished - if !action.rpcReturned.Load() { - return false - } - - // segment found in leader view - views := distMgr.LeaderViewManager.GetByFilter( - meta.WithChannelName2LeaderView(action.Shard), - meta.WithSegment2LeaderView(action.SegmentID, false)) - if len(views) == 0 { - return false - } - - // segment found in dist - segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID)) - return len(segmentInTargetNode) > 0 - } else if action.Type() == ActionTypeReduce { - // FIXME: Now shard leader's segment view is a map of segment ID to node ID, - // loading segment replaces the node ID with the new one, - // which confuses the condition of finishing, - // the leader should return a map of segment ID to list of nodes, - // now, we just always commit the release task to executor once. - // NOTE: DO NOT create a task containing release action and the action is not the last action - sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node())) - views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node())) - growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 { - return lo.Keys(view.GrowingSegments) - }) - segments := make([]int64, 0, len(sealed)+len(growing)) - for _, segment := range sealed { - segments = append(segments, segment.GetID()) - } - segments = append(segments, growing...) - if !funcutil.SliceContain(segments, action.GetSegmentID()) { - return true - } - return action.rpcReturned.Load() - } else if action.Type() == ActionTypeUpdate { - return action.rpcReturned.Load() - } - - return true + return action.rpcReturned.Load() } func (action *SegmentAction) Desc() string { diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 316f1a552be71..fe432b4bccbbc 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -38,7 +38,9 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/timerecord" . 
"github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -91,6 +93,7 @@ type replicaChannelIndex struct { } type taskQueue struct { + mu sync.RWMutex // TaskPriority -> TaskID -> Task buckets []map[int64]Task } @@ -106,6 +109,8 @@ func newTaskQueue() *taskQueue { } func (queue *taskQueue) Len() int { + queue.mu.RLock() + defer queue.mu.RUnlock() taskNum := 0 for _, tasks := range queue.buckets { taskNum += len(tasks) @@ -115,17 +120,23 @@ func (queue *taskQueue) Len() int { } func (queue *taskQueue) Add(task Task) { + queue.mu.Lock() + defer queue.mu.Unlock() bucket := queue.buckets[task.Priority()] bucket[task.ID()] = task } func (queue *taskQueue) Remove(task Task) { + queue.mu.Lock() + defer queue.mu.Unlock() bucket := queue.buckets[task.Priority()] delete(bucket, task.ID()) } // Range iterates all tasks in the queue ordered by priority from high to low func (queue *taskQueue) Range(fn func(task Task) bool) { + queue.mu.RLock() + defer queue.mu.RUnlock() for priority := len(queue.buckets) - 1; priority >= 0; priority-- { for _, task := range queue.buckets[priority] { if !fn(task) { @@ -153,9 +164,8 @@ type Scheduler interface { } type taskScheduler struct { - rwmutex sync.RWMutex ctx context.Context - executors map[int64]*Executor // NodeID -> Executor + executors *ConcurrentMap[int64, *Executor] // NodeID -> Executor idAllocator func() UniqueID distMgr *meta.DistributionManager @@ -165,9 +175,11 @@ type taskScheduler struct { cluster session.Cluster nodeMgr *session.NodeManager - tasks UniqueSet - segmentTasks map[replicaSegmentIndex]Task - channelTasks map[replicaChannelIndex]Task + scheduleMu sync.Mutex // guards schedule() + collKeyLock *lock.KeyLock[int64] // guards Add() + tasks *ConcurrentMap[UniqueID, struct{}] + segmentTasks *ConcurrentMap[replicaSegmentIndex, Task] + channelTasks *ConcurrentMap[replicaChannelIndex, Task] processQueue *taskQueue waitQueue *taskQueue taskStats *expirable.LRU[UniqueID, Task] @@ -184,7 +196,7 @@ func NewScheduler(ctx context.Context, id := time.Now().UnixMilli() return &taskScheduler{ ctx: ctx, - executors: make(map[int64]*Executor), + executors: NewConcurrentMap[int64, *Executor](), idAllocator: func() UniqueID { id++ return id @@ -197,9 +209,10 @@ func NewScheduler(ctx context.Context, cluster: cluster, nodeMgr: nodeMgr, - tasks: make(UniqueSet), - segmentTasks: make(map[replicaSegmentIndex]Task), - channelTasks: make(map[replicaChannelIndex]Task), + collKeyLock: lock.NewKeyLock[int64](), + tasks: NewConcurrentMap[UniqueID, struct{}](), + segmentTasks: NewConcurrentMap[replicaSegmentIndex, Task](), + channelTasks: NewConcurrentMap[replicaChannelIndex, Task](), processQueue: newTaskQueue(), waitQueue: newTaskQueue(), taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15), @@ -209,30 +222,22 @@ func NewScheduler(ctx context.Context, func (scheduler *taskScheduler) Start() {} func (scheduler *taskScheduler) Stop() { - scheduler.rwmutex.Lock() - defer scheduler.rwmutex.Unlock() - - for nodeID, executor := range scheduler.executors { + scheduler.executors.Range(func(nodeID int64, executor *Executor) bool { executor.Stop() - delete(scheduler.executors, nodeID) - } + return true + }) - for _, task := range scheduler.segmentTasks { + scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool { scheduler.remove(task) - } - for _, task := range scheduler.channelTasks { + return true + }) + scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool { scheduler.remove(task) - } + return true + }) } func (scheduler 
*taskScheduler) AddExecutor(nodeID int64) { - scheduler.rwmutex.Lock() - defer scheduler.rwmutex.Unlock() - - if _, exist := scheduler.executors[nodeID]; exist { - return - } - executor := NewExecutor(scheduler.meta, scheduler.distMgr, scheduler.broker, @@ -240,27 +245,24 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) { scheduler.cluster, scheduler.nodeMgr) - scheduler.executors[nodeID] = executor + if _, exist := scheduler.executors.GetOrInsert(nodeID, executor); exist { + return + } executor.Start(scheduler.ctx) log.Ctx(scheduler.ctx).Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID)) } func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) { - scheduler.rwmutex.Lock() - defer scheduler.rwmutex.Unlock() - - executor, ok := scheduler.executors[nodeID] + executor, ok := scheduler.executors.GetAndRemove(nodeID) if ok { executor.Stop() - delete(scheduler.executors, nodeID) log.Ctx(scheduler.ctx).Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID)) } } func (scheduler *taskScheduler) Add(task Task) error { - scheduler.rwmutex.Lock() - defer scheduler.rwmutex.Unlock() - + scheduler.collKeyLock.Lock(task.CollectionID()) + defer scheduler.collKeyLock.Unlock(task.CollectionID()) err := scheduler.preAdd(task) if err != nil { task.Cancel(err) @@ -269,19 +271,19 @@ func (scheduler *taskScheduler) Add(task Task) error { task.SetID(scheduler.idAllocator()) scheduler.waitQueue.Add(task) - scheduler.tasks.Insert(task.ID()) + scheduler.tasks.Insert(task.ID(), struct{}{}) switch task := task.(type) { case *SegmentTask: index := NewReplicaSegmentIndex(task) - scheduler.segmentTasks[index] = task + scheduler.segmentTasks.Insert(index, task) case *ChannelTask: index := replicaChannelIndex{task.ReplicaID(), task.Channel()} - scheduler.channelTasks[index] = task + scheduler.channelTasks.Insert(index, task) case *LeaderTask: index := NewReplicaLeaderIndex(task) - scheduler.segmentTasks[index] = task + scheduler.segmentTasks.Insert(index, task) } scheduler.taskStats.Add(task.ID(), task) @@ -292,21 +294,39 @@ func (scheduler *taskScheduler) Add(task Task) error { } func (scheduler *taskScheduler) updateTaskMetrics() { - segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0 + segmentGrowNum, segmentReduceNum, segmentUpdateNum, segmentMoveNum := 0, 0, 0, 0 + leaderGrowNum, leaderReduceNum, leaderUpdateNum := 0, 0, 0 channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0 - for _, task := range scheduler.segmentTasks { - taskType := GetTaskType(task) - switch taskType { - case TaskTypeGrow: - segmentGrowNum++ - case TaskTypeReduce: - segmentReduceNum++ - case TaskTypeMove: + scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool { + switch { + case len(task.Actions()) > 1: segmentMoveNum++ + case task.Actions()[0].Type() == ActionTypeGrow: + if _, ok := task.Actions()[0].(*SegmentAction); ok { + segmentGrowNum++ + } + if _, ok := task.Actions()[0].(*LeaderAction); ok { + leaderGrowNum++ + } + case task.Actions()[0].Type() == ActionTypeReduce: + if _, ok := task.Actions()[0].(*SegmentAction); ok { + segmentReduceNum++ + } + if _, ok := task.Actions()[0].(*LeaderAction); ok { + leaderReduceNum++ + } + case task.Actions()[0].Type() == ActionTypeUpdate: + if _, ok := task.Actions()[0].(*SegmentAction); ok { + segmentUpdateNum++ + } + if _, ok := task.Actions()[0].(*LeaderAction); ok { + leaderUpdateNum++ + } } - } + return true + }) - for _, task := range scheduler.channelTasks { + scheduler.channelTasks.Range(func(_ 
replicaChannelIndex, task Task) bool { taskType := GetTaskType(task) switch taskType { case TaskTypeGrow: @@ -316,11 +336,18 @@ func (scheduler *taskScheduler) updateTaskMetrics() { case TaskTypeMove: channelMoveNum++ } - } + return true + }) metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentGrowTaskLabel).Set(float64(segmentGrowNum)) metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentReduceTaskLabel).Set(float64(segmentReduceNum)) metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentMoveTaskLabel).Set(float64(segmentMoveNum)) + metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentUpdateTaskLabel).Set(float64(segmentUpdateNum)) + + metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderGrowTaskLabel).Set(float64(leaderGrowNum)) + metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderReduceTaskLabel).Set(float64(leaderReduceNum)) + metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderUpdateTaskLabel).Set(float64(leaderUpdateNum)) + metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum)) metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum)) metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum)) @@ -332,7 +359,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error { switch task := task.(type) { case *SegmentTask: index := NewReplicaSegmentIndex(task) - if old, ok := scheduler.segmentTasks[index]; ok { + if old, ok := scheduler.segmentTasks.Get(index); ok { if task.Priority() > old.Priority() { log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority", zap.Int64("oldID", old.ID()), @@ -365,7 +392,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error { case *ChannelTask: index := replicaChannelIndex{task.ReplicaID(), task.Channel()} - if old, ok := scheduler.channelTasks[index]; ok { + if old, ok := scheduler.channelTasks.Get(index); ok { if task.Priority() > old.Priority() { log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority", zap.Int64("oldID", old.ID()), @@ -398,7 +425,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error { } case *LeaderTask: index := NewReplicaLeaderIndex(task) - if old, ok := scheduler.segmentTasks[index]; ok { + if old, ok := scheduler.segmentTasks.Get(index); ok { if task.Priority() > old.Priority() { log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority", zap.Int64("oldID", old.ID()), @@ -477,46 +504,42 @@ func (scheduler *taskScheduler) Dispatch(node int64) { log.Ctx(scheduler.ctx).Info("scheduler stopped") default: - scheduler.rwmutex.Lock() - defer scheduler.rwmutex.Unlock() + scheduler.scheduleMu.Lock() + defer scheduler.scheduleMu.Unlock() scheduler.schedule(node) } } func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int { - scheduler.rwmutex.RLock() - defer scheduler.rwmutex.RUnlock() - targetActions := make(map[int64][]Action) - for _, task := range scheduler.segmentTasks { // Map key: replicaSegmentIndex + scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool { taskCollID := task.CollectionID() if collectionID != -1 && collectionID != taskCollID { - continue + return true } actions := filterActions(task.Actions(), nodeID) if len(actions) > 0 { targetActions[taskCollID] = append(targetActions[taskCollID], actions...) 
} - } + return true + }) return scheduler.calculateTaskDelta(targetActions) } func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int { - scheduler.rwmutex.RLock() - defer scheduler.rwmutex.RUnlock() - targetActions := make(map[int64][]Action) - for _, task := range scheduler.channelTasks { // Map key: replicaChannelIndex + scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool { taskCollID := task.CollectionID() if collectionID != -1 && collectionID != taskCollID { - continue + return true } actions := filterActions(task.Actions(), nodeID) if len(actions) > 0 { targetActions[taskCollID] = append(targetActions[taskCollID], actions...) } - } + return true + }) return scheduler.calculateTaskDelta(targetActions) } @@ -561,10 +584,7 @@ func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Act } func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} { - scheduler.rwmutex.RLock() - defer scheduler.rwmutex.RUnlock() - - executor, ok := scheduler.executors[nodeID] + executor, ok := scheduler.executors.Get(nodeID) if !ok { return nil } @@ -587,16 +607,13 @@ func WithTaskTypeFilter(taskType Type) TaskFilter { } func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int { - scheduler.rwmutex.RLock() - defer scheduler.rwmutex.RUnlock() - if len(filters) == 0 { - return len(scheduler.channelTasks) + return scheduler.channelTasks.Len() } // rewrite this with for loop counter := 0 - for _, task := range scheduler.channelTasks { + scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool { allMatch := true for _, filter := range filters { if !filter(task) { @@ -607,21 +624,19 @@ func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int { if allMatch { counter++ } - } + return true + }) return counter } func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int { - scheduler.rwmutex.RLock() - defer scheduler.rwmutex.RUnlock() - if len(filters) == 0 { - return len(scheduler.segmentTasks) + scheduler.segmentTasks.Len() } // rewrite this with for loop counter := 0 - for _, task := range scheduler.segmentTasks { + scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool { allMatch := true for _, filter := range filters { if !filter(task) { @@ -632,7 +647,8 @@ func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int { if allMatch { counter++ } - } + return true + }) return counter } @@ -657,17 +673,19 @@ func (scheduler *taskScheduler) schedule(node int64) { return } + tr := timerecord.NewTimeRecorder("") log := log.Ctx(scheduler.ctx).With( zap.Int64("nodeID", node), ) scheduler.tryPromoteAll() + promoteDur := tr.RecordSpan() log.Debug("process tasks related to node", zap.Int("processingTaskNum", scheduler.processQueue.Len()), zap.Int("waitingTaskNum", scheduler.waitQueue.Len()), - zap.Int("segmentTaskNum", len(scheduler.segmentTasks)), - zap.Int("channelTaskNum", len(scheduler.channelTasks)), + zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()), + zap.Int("channelTaskNum", scheduler.channelTasks.Len()), ) // Process tasks @@ -683,6 +701,7 @@ func (scheduler *taskScheduler) schedule(node int64) { return true }) + preprocessDur := tr.RecordSpan() // The scheduler doesn't limit the number of tasks, // to commit tasks to executors as soon as possible, to reach higher merge possibility @@ -693,22 +712,29 @@ func (scheduler *taskScheduler) schedule(node int64) { } return nil }, "process") + processDur := 
tr.RecordSpan() for _, task := range toRemove { scheduler.remove(task) } + scheduler.updateTaskMetrics() + log.Info("processed tasks", zap.Int("toProcessNum", len(toProcess)), zap.Int32("committedNum", commmittedNum.Load()), zap.Int("toRemoveNum", len(toRemove)), + zap.Duration("promoteDur", promoteDur), + zap.Duration("preprocessDur", preprocessDur), + zap.Duration("processDur", processDur), + zap.Duration("totalDur", tr.ElapseSpan()), ) log.Info("process tasks related to node done", zap.Int("processingTaskNum", scheduler.processQueue.Len()), zap.Int("waitingTaskNum", scheduler.waitQueue.Len()), - zap.Int("segmentTaskNum", len(scheduler.segmentTasks)), - zap.Int("channelTaskNum", len(scheduler.channelTasks)), + zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()), + zap.Int("channelTaskNum", scheduler.channelTasks.Len()), ) } @@ -749,10 +775,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool { // return true if the task should be executed, // false otherwise func (scheduler *taskScheduler) preProcess(task Task) bool { - log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With( - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("taskID", task.ID()), - ) if task.Status() != TaskStatusStarted { return false } @@ -775,7 +797,9 @@ func (scheduler *taskScheduler) preProcess(task Task) bool { } if !ready { - log.RatedInfo(30, "Blocking reduce action in balance channel task") + log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task", + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("taskID", task.ID())) break } } @@ -806,7 +830,7 @@ func (scheduler *taskScheduler) process(task Task) bool { ) actions, step := task.Actions(), task.Step() - executor, ok := scheduler.executors[actions[step].Node()] + executor, ok := scheduler.executors.Get(actions[step].Node()) if !ok { log.Warn("no executor for QueryNode", zap.Int("step", step), @@ -827,19 +851,18 @@ func (scheduler *taskScheduler) check(task Task) error { } func (scheduler *taskScheduler) RemoveByNode(node int64) { - scheduler.rwmutex.Lock() - defer scheduler.rwmutex.Unlock() - - for _, task := range scheduler.segmentTasks { + scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool { if scheduler.isRelated(task, node) { scheduler.remove(task) } - } - for _, task := range scheduler.channelTasks { + return true + }) + scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool { if scheduler.isRelated(task, node) { scheduler.remove(task) } - } + return true + }) } func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) { @@ -875,7 +898,7 @@ func (scheduler *taskScheduler) remove(task Task) { switch task := task.(type) { case *SegmentTask: index := NewReplicaSegmentIndex(task) - delete(scheduler.segmentTasks, index) + scheduler.segmentTasks.Remove(index) log = log.With(zap.Int64("segmentID", task.SegmentID())) if task.Status() == TaskStatusFailed && task.Err() != nil && @@ -885,16 +908,15 @@ func (scheduler *taskScheduler) remove(task Task) { case *ChannelTask: index := replicaChannelIndex{task.ReplicaID(), task.Channel()} - delete(scheduler.channelTasks, index) + scheduler.channelTasks.Remove(index) log = log.With(zap.String("channel", task.Channel())) case *LeaderTask: index := NewReplicaLeaderIndex(task) - delete(scheduler.segmentTasks, index) + scheduler.segmentTasks.Remove(index) log = log.With(zap.Int64("segmentID", task.SegmentID())) } -
scheduler.updateTaskMetrics() log.Info("task removed") if scheduler.meta.Exist(task.Context(), task.CollectionID()) { @@ -940,14 +962,18 @@ func (scheduler *taskScheduler) getTaskMetricsLabel(task Task) string { return metrics.UnknownTaskLabel } -func (scheduler *taskScheduler) checkStale(task Task) error { - log := log.Ctx(task.Context()).With( +func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field { + res := []zap.Field{ zap.Int64("taskID", task.ID()), zap.Int64("collectionID", task.CollectionID()), zap.Int64("replicaID", task.ReplicaID()), zap.String("source", task.Source().String()), - ) + } + res = append(res, fields...) + return res +} +func (scheduler *taskScheduler) checkStale(task Task) error { switch task := task.(type) { case *SegmentTask: if err := scheduler.checkSegmentTaskStale(task); err != nil { @@ -974,7 +1000,9 @@ func (scheduler *taskScheduler) checkStale(task Task) error { zap.Int("step", step)) if scheduler.nodeMgr.Get(action.Node()) == nil { - log.Warn("the task is stale, the target node is offline") + log.Warn("the task is stale, the target node is offline", WrapTaskLog(task, + zap.Int64("nodeID", action.Node()), + zap.Int("step", step))...) return merr.WrapErrNodeNotFound(action.Node()) } } @@ -983,38 +1011,30 @@ func (scheduler *taskScheduler) checkStale(task Task) error { } func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { - log := log.Ctx(task.Context()).With( - zap.Int64("taskID", task.ID()), - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("replicaID", task.ReplicaID()), - zap.String("source", task.Source().String()), - ) - for _, action := range task.Actions() { switch action.Type() { case ActionTypeGrow: if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok { - log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID)) + log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.Int64("segment", task.segmentID))...) return merr.WrapErrNodeOffline(action.Node()) } taskType := GetTaskType(task) segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst) if segment == nil { - log.Warn("task stale due to the segment to load not exists in targets", - zap.Int64("segment", task.segmentID), - zap.String("taskType", taskType.String()), - ) + log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets", + WrapTaskLog(task, zap.Int64("segment", task.segmentID), + zap.String("taskType", taskType.String()))...) return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment") } replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.ctx, task.CollectionID(), action.Node()) if replica == nil { - log.Warn("task stale due to replica not found") + log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task)...) return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID") } _, ok := scheduler.distMgr.GetShardLeader(replica, segment.GetInsertChannel()) if !ok { - log.Warn("task stale due to leader not found") + log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task)...) 
return merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "failed to get shard delegator") } @@ -1026,23 +1046,16 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { } func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error { - log := log.Ctx(task.Context()).With( - zap.Int64("taskID", task.ID()), - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("replicaID", task.ReplicaID()), - zap.String("source", task.Source().String()), - ) - for _, action := range task.Actions() { switch action.Type() { case ActionTypeGrow: if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok { - log.Warn("task stale due to node offline", zap.String("channel", task.Channel())) + log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Channel()))...) return merr.WrapErrNodeOffline(action.Node()) } if scheduler.targetMgr.GetDmChannel(task.ctx, task.collectionID, task.Channel(), meta.NextTargetFirst) == nil { - log.Warn("the task is stale, the channel to subscribe not exists in targets", - zap.String("channel", task.Channel())) + log.Ctx(task.Context()).Warn("the task is stale, the channel to subscribe not exists in targets", + WrapTaskLog(task, zap.String("channel", task.Channel()))...) return merr.WrapErrChannelReduplicate(task.Channel(), "target doesn't contain this channel") } @@ -1054,48 +1067,41 @@ func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error { } func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error { - log := log.Ctx(task.Context()).With( - zap.Int64("taskID", task.ID()), - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("replicaID", task.ReplicaID()), - zap.String("source", task.Source().String()), - zap.Int64("leaderID", task.leaderID), - ) - for _, action := range task.Actions() { switch action.Type() { case ActionTypeGrow: if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok { - log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID)) + log.Ctx(task.Context()).Warn("task stale due to node offline", + WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), zap.Int64("segment", task.segmentID))...) return merr.WrapErrNodeOffline(action.Node()) } taskType := GetTaskType(task) segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst) if segment == nil { - log.Warn("task stale due to the segment to load not exists in targets", - zap.Int64("segment", task.segmentID), - zap.String("taskType", taskType.String()), - ) + log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets", + WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), + zap.Int64("segment", task.segmentID), + zap.String("taskType", taskType.String()))...) return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment") } replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.ctx, task.CollectionID(), action.Node()) if replica == nil { - log.Warn("task stale due to replica not found") + log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...) 
return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID") } view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard()) if view == nil { - log.Warn("task stale due to leader not found") + log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...) return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator") } case ActionTypeReduce: view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard()) if view == nil { - log.Warn("task stale due to leader not found") + log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...) return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator") } } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 482282f54f319..c2ae2ff208abd 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -771,26 +771,14 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() { suite.NoError(err) } - growings := map[int64]*meta.Segment{} - for _, segment := range suite.releaseSegments[1:] { - growings[segment] = utils.CreateTestSegment(suite.collection, 1, segment, targetNode, 1, "") - } - suite.dist.LeaderViewManager.Update(targetNode, &meta.LeaderView{ - ID: targetNode, - GrowingSegments: growings, - }) - segmentsNum := len(suite.releaseSegments) suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) - // Process tasks + // Process tasks and Release done suite.dispatchAndWait(targetNode) - suite.AssertTaskNum(segmentsNum-1, 0, 0, segmentsNum-1) - - // Release done - suite.dist.LeaderViewManager.Update(targetNode) + suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) - // Process tasks done + // Tasks removed suite.dispatchAndWait(targetNode) suite.AssertTaskNum(0, 0, 0, 0) @@ -1090,7 +1078,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { CollectionID: suite.collection, }, }, nil) - for _, segment := range suite.loadSegments { + for _, segment := range suite.loadSegments[1:] { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ { ID: segment, @@ -1111,13 +1099,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { })) suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{})) tasks := []Task{} - segments := make([]*datapb.SegmentInfo, 0) for _, segment := range suite.loadSegments { - segments = append(segments, &datapb.SegmentInfo{ - ID: segment, - PartitionID: 1, - InsertChannel: channel.GetChannelName(), - }) task, err := NewSegmentTask( ctx, timeout, @@ -1131,33 +1113,8 @@ func (suite *TaskSuite) TestSegmentTaskStale() { err = suite.scheduler.Add(task) suite.NoError(err) } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) - suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, partition)) - suite.target.UpdateCollectionNextTarget(ctx, suite.collection) - segmentsNum := len(suite.loadSegments) - suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) - // Process tasks - suite.dispatchAndWait(targetNode) - suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) - - // Process tasks done - // Dist contains channels, first task stale - view := &meta.LeaderView{ - ID: targetNode, - CollectionID: 
suite.collection, - Segments: map[int64]*querypb.SegmentDist{}, - Channel: channel.ChannelName, - } - for _, segment := range suite.loadSegments[1:] { - view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0} - } - distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment { - return meta.SegmentFromInfo(info) - }) - suite.dist.LeaderViewManager.Update(targetNode, view) - suite.dist.SegmentDistManager.Update(targetNode, distSegments...) - segments = make([]*datapb.SegmentInfo, 0) + segments := make([]*datapb.SegmentInfo, 0) for _, segment := range suite.loadSegments[1:] { segments = append(segments, &datapb.SegmentInfo{ ID: segment, @@ -1165,13 +1122,16 @@ func (suite *TaskSuite) TestSegmentTaskStale() { InsertChannel: channel.GetChannelName(), }) } - bakExpectations := suite.broker.ExpectedCalls - suite.broker.AssertExpectations(suite.T()) - suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0] suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, 2)) suite.target.UpdateCollectionNextTarget(ctx, suite.collection) + + // process done + suite.dispatchAndWait(targetNode) + suite.AssertTaskNum(1, 0, 0, 1) + + // task removed suite.dispatchAndWait(targetNode) suite.AssertTaskNum(0, 0, 0, 0) @@ -1184,7 +1144,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() { suite.NoError(task.Err()) } } - suite.broker.ExpectedCalls = bakExpectations } func (suite *TaskSuite) TestChannelTaskReplace() { @@ -1497,10 +1456,10 @@ func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) { suite.Equal(process, scheduler.processQueue.Len()) suite.Equal(wait, scheduler.waitQueue.Len()) - suite.Len(scheduler.segmentTasks, segment) - suite.Len(scheduler.channelTasks, channel) - suite.Equal(len(scheduler.tasks), process+wait) - suite.Equal(len(scheduler.tasks), segment+channel) + suite.Equal(scheduler.segmentTasks.Len(), segment) + suite.Equal(scheduler.channelTasks.Len(), channel) + suite.Equal(scheduler.tasks.Len(), process+wait) + suite.Equal(scheduler.tasks.Len(), segment+channel) } func (suite *TaskSuite) dispatchAndWait(node int64) { @@ -1512,13 +1471,14 @@ func (suite *TaskSuite) dispatchAndWait(node int64) { count = 0 keys = make([]any, 0) - for _, executor := range suite.scheduler.executors { + suite.scheduler.executors.Range(func(_ int64, executor *Executor) bool { executor.executingTasks.Range(func(taskIndex string) bool { keys = append(keys, taskIndex) count++ return true }) - } + return true + }) if count == 0 { return diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 5e283b926ee60..881873c418f27 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error { func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error { log := log.Ctx(context.TODO()). WithRateGroup("utils.CheckLeaderAvailable", 1, 60). 
- With(zap.Int64("leaderID", leader.ID)) + With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID)) info := nodeMgr.Get(leader.ID) // Check whether leader is online diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 1eca7efe950cd..9811d30de59b0 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1208,7 +1208,10 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get growingSegments[entry.SegmentID] = &msgpb.MsgPosition{} continue } - growingSegments[entry.SegmentID] = segment.StartPosition() + // QueryCoord only requires the timestamp from the position. + growingSegments[entry.SegmentID] = &msgpb.MsgPosition{ + Timestamp: segment.StartPosition().GetTimestamp(), + } numOfGrowingRows += segment.InsertCount() } diff --git a/pkg/metrics/querycoord_metrics.go b/pkg/metrics/querycoord_metrics.go index 67aa181421d72..d66abea2cd129 100644 --- a/pkg/metrics/querycoord_metrics.go +++ b/pkg/metrics/querycoord_metrics.go @@ -36,6 +36,7 @@ const ( LeaderGrowTaskLabel = "leader_grow" LeaderReduceTaskLabel = "leader_reduce" + LeaderUpdateTaskLabel = "leader_update" UnknownTaskLabel = "unknown"
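A note on the target.go change above: NewCollectionTarget and FromPbCollectionTarget now precompute channel-to-segments and partition-to-segments indexes in one pass, so GetSealedSegmentsByChannel/ByPartition become map lookups instead of scanning every segment of the collection per call. Because CollectionTarget is immutable, the indexes can never go stale. A minimal self-contained sketch of that idiom, with a hypothetical segmentInfo type standing in for datapb.SegmentInfo:

package main

import "fmt"

type segmentInfo struct {
	ID            int64
	PartitionID   int64
	InsertChannel string
}

// buildIndexes mirrors what NewCollectionTarget now does: one pass over the
// segment map to derive channel->segments and partition->segments, traded
// for O(1) lookups on every later read.
func buildIndexes(segments map[int64]*segmentInfo) (map[string][]*segmentInfo, map[int64][]*segmentInfo) {
	byChannel := make(map[string][]*segmentInfo)
	byPartition := make(map[int64][]*segmentInfo)
	for _, s := range segments {
		// append to a nil slice is safe in Go, so no explicit init is needed
		byChannel[s.InsertChannel] = append(byChannel[s.InsertChannel], s)
		byPartition[s.PartitionID] = append(byPartition[s.PartitionID], s)
	}
	return byChannel, byPartition
}

func main() {
	segs := map[int64]*segmentInfo{
		1: {ID: 1, PartitionID: 10, InsertChannel: "dml_0"},
		2: {ID: 2, PartitionID: 10, InsertChannel: "dml_1"},
		3: {ID: 3, PartitionID: 11, InsertChannel: "dml_0"},
	}
	byChannel, byPartition := buildIndexes(segs)
	fmt.Println(len(byChannel["dml_0"]), len(byPartition[10])) // 2 2
}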
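The core concurrency change in this patch replaces one coarse rwMutex over whole maps with a concurrent map for lock-free reads plus a per-key lock that keeps check-then-insert sequences atomic per collection (target.updateCollectionTarget, scheduler.Add). The sketch below illustrates the pattern under simplified assumptions: keyLock is a hypothetical stand-in for pkg/util/lock.KeyLock (the real one presumably refcounts and frees per-key mutexes; this one never frees them), and sync.Map stands in for typeutil.ConcurrentMap:

package main

import (
	"fmt"
	"sync"
)

// keyLock serializes writers per key instead of blocking the whole map.
type keyLock[K comparable] struct {
	mu    sync.Mutex
	locks map[K]*sync.Mutex
}

func newKeyLock[K comparable]() *keyLock[K] {
	return &keyLock[K]{locks: make(map[K]*sync.Mutex)}
}

func (kl *keyLock[K]) Lock(k K) {
	kl.mu.Lock()
	l, ok := kl.locks[k]
	if !ok {
		l = &sync.Mutex{}
		kl.locks[k] = l
	}
	kl.mu.Unlock()
	l.Lock()
}

func (kl *keyLock[K]) Unlock(k K) {
	kl.mu.Lock()
	l := kl.locks[k]
	kl.mu.Unlock()
	l.Unlock()
}

// target mirrors updateCollectionTarget: reads hit the concurrent map
// directly, while the per-collection lock makes the "only replace with a
// newer version" check-then-insert atomic for that key alone.
type target struct {
	keyLock *keyLock[int64]
	targets sync.Map // collectionID -> version; stands in for *CollectionTarget
}

func (t *target) update(collectionID, version int64) {
	t.keyLock.Lock(collectionID)
	defer t.keyLock.Unlock(collectionID)
	if old, ok := t.targets.Load(collectionID); ok && version <= old.(int64) {
		return // stale update: keep the newer target
	}
	t.targets.Store(collectionID, version)
}

func main() {
	t := &target{keyLock: newKeyLock[int64]()}
	var wg sync.WaitGroup
	for v := int64(1); v <= 100; v++ {
		wg.Add(1)
		go func(v int64) { defer wg.Done(); t.update(1000, v) }(v)
	}
	wg.Wait()
	got, _ := t.targets.Load(int64(1000))
	fmt.Println("final version:", got) // always 100, regardless of interleaving
}

Writers for different collections no longer contend with each other or with readers, which is the point of dropping TargetManager.rwMutex.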
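Finally, pullDist and schedule() are instrumented with timerecord.TimeRecorder spans. A minimal stand-in, assuming the semantics the call sites imply (RecordSpan returns the duration since the previous checkpoint and advances it; ElapseSpan returns the duration since construction):

package main

import (
	"fmt"
	"time"
)

// timeRecorder is a simplified sketch of timerecord.TimeRecorder, not the
// actual pkg/util implementation.
type timeRecorder struct {
	start time.Time
	last  time.Time
}

func newTimeRecorder() *timeRecorder {
	now := time.Now()
	return &timeRecorder{start: now, last: now}
}

// RecordSpan returns the time elapsed since the last checkpoint and resets it.
func (tr *timeRecorder) RecordSpan() time.Duration {
	now := time.Now()
	d := now.Sub(tr.last)
	tr.last = now
	return d
}

// ElapseSpan returns the total time since the recorder was created.
func (tr *timeRecorder) ElapseSpan() time.Duration {
	return time.Since(tr.start)
}

func main() {
	tr := newTimeRecorder()
	time.Sleep(10 * time.Millisecond) // stands in for the getDistribution RPC
	pullDur := tr.RecordSpan()
	time.Sleep(5 * time.Millisecond) // stands in for handleDistResp
	fmt.Printf("pullDur=%v handleDur=%v total=%v\n", pullDur, tr.RecordSpan(), tr.ElapseSpan())
}

This is how the "pullDur"/"handleDur" pair in pullDist and the "promoteDur"/"preprocessDur"/"processDur"/"totalDur" fields in schedule() split one code path into per-phase timings with a single recorder.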