From e64e0831b3f6e50525de158de4f238dc9296d922 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Thu, 19 Dec 2024 20:54:19 +0800
Subject: [PATCH] enhance: Optimize GetLocalDiskSize and segment loader mutex

Signed-off-by: bigsheeper
---
 .../segments/disk_usage_fetcher.go            | 81 +++++++++++++++++++
 internal/querynodev2/segments/segment.go      |  5 --
 .../querynodev2/segments/segment_loader.go    | 47 +++++------
 .../segments/segment_loader_test.go           |  6 +-
 internal/querynodev2/server.go                |  4 +-
 internal/util/segcore/cgo_util.go             |  3 +-
 internal/util/segcore/cgo_util_test.go        |  3 +-
 pkg/util/paramtable/component_param.go        |  9 +++
 pkg/util/paramtable/component_param_test.go   |  1 +
 9 files changed, 119 insertions(+), 40 deletions(-)
 create mode 100644 internal/querynodev2/segments/disk_usage_fetcher.go

diff --git a/internal/querynodev2/segments/disk_usage_fetcher.go b/internal/querynodev2/segments/disk_usage_fetcher.go
new file mode 100644
index 0000000000000..ca4aeecaee250
--- /dev/null
+++ b/internal/querynodev2/segments/disk_usage_fetcher.go
@@ -0,0 +1,81 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package segments
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/internal/util/segcore"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+type diskUsageFetcher struct {
+	ctx   context.Context
+	path  string
+	usage *atomic.Int64
+	err   *atomic.Error
+}
+
+func NewDiskUsageFetcher(ctx context.Context) *diskUsageFetcher {
+	return &diskUsageFetcher{
+		ctx:   ctx,
+		path:  paramtable.Get().LocalStorageCfg.Path.GetValue(),
+		usage: atomic.NewInt64(0),
+		err:   atomic.NewError(nil),
+	}
+}
+
+func (d *diskUsageFetcher) GetDiskUsage() (int64, error) {
+	return d.usage.Load(), d.err.Load()
+}
+
+func (d *diskUsageFetcher) fetch() {
+	diskUsage, err := segcore.GetLocalUsedSize(d.path)
+	if err != nil {
+		log.Warn("failed to get disk usage", zap.Error(err))
+		d.err.Store(err)
+		return
+	}
+	d.usage.Store(diskUsage)
+	d.err.Store(nil)
+	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(diskUsage) / 1024 / 1024) // in MB
+	log.Ctx(d.ctx).WithRateGroup("diskUsageFetcher", 1, 300).
+		RatedInfo(300, "querynode disk usage", zap.Int64("size", diskUsage), zap.Int64("nodeID", paramtable.GetNodeID()))
+}
+
+func (d *diskUsageFetcher) Start() {
+	d.fetch() // Synchronously fetch once before starting.
+
+	interval := paramtable.Get().QueryNodeCfg.DiskSizeFetchInterval.GetAsDuration(time.Second)
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-d.ctx.Done():
+			return
+		case <-ticker.C:
+			d.fetch()
+		}
+	}
+}
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index dab60442b3f41..e2bd082f6af2d 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -1275,11 +1275,6 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
 	GetDynamicPool().Submit(func() (any, error) {
 		C.DeleteSegment(ptr)
 
-		localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
-		// ignore error here, shall not block releasing
-		if err == nil {
-			metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
-		}
 		return nil, nil
 	}).Await()
 
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 6e883ab30781a..d76189affb255 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -46,7 +46,6 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/internal/util/segcore"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
@@ -147,6 +146,7 @@ type resourceEstimateFactor struct {
 }
 
 func NewLoader(
+	ctx context.Context,
 	manager *Manager,
 	cm storage.ChunkManager,
 ) *segmentLoader {
@@ -166,12 +166,15 @@ func NewLoader(
 	}
 	log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
 
+	duf := NewDiskUsageFetcher(ctx)
+	go duf.Start()
 	loader := &segmentLoader{
 		manager:                   manager,
 		cm:                        cm,
 		loadingSegments:           typeutil.NewConcurrentMap[int64, *loadResult](),
 		committedResourceNotifier: syncutil.NewVersionedNotifier(),
+		duf:                       duf,
 	}
 	return loader
@@ -207,11 +210,14 @@ type segmentLoader struct {
 	manager *Manager
 	cm      storage.ChunkManager
 
-	mut sync.Mutex // The channel will be closed as the segment loaded
-	loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+	loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+
+	mut sync.Mutex // guards committedResource
 	committedResource         LoadResource
 	committedResourceNotifier *syncutil.VersionedNotifier
+
+	duf *diskUsageFetcher
 }
 
 var _ Loader = (*segmentLoader)(nil)
@@ -383,8 +389,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
 	log := log.Ctx(ctx).With(
 		zap.Stringer("segmentType", segmentType),
 	)
-	loader.mut.Lock()
-	defer loader.mut.Unlock()
 
 	// filter out loaded & loading segments
 	infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
@@ -407,8 +411,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
 }
 
 func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
-	loader.mut.Lock()
-	defer loader.mut.Unlock()
 	for i := range segments {
 		result, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
 		if ok {
@@ -443,22 +445,22 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 		zap.Int64s("segmentIDs", segmentIDs),
 	)
 
-	loader.mut.Lock()
-	defer loader.mut.Unlock()
-
-	result := requestResourceResult{
-		CommittedResource: loader.committedResource,
-	}
-
 	memoryUsage := hardware.GetUsedMemoryCount()
 	totalMemory := hardware.GetMemoryCount()
 
-	diskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
+	diskUsage, err := loader.duf.GetDiskUsage()
 	if err != nil {
-		return result, errors.Wrap(err, "get local used size failed")
+		return requestResourceResult{}, errors.Wrap(err, "get local used size failed")
 	}
 	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
 
+	loader.mut.Lock()
+	defer loader.mut.Unlock()
+
+	result := requestResourceResult{
+		CommittedResource: loader.committedResource,
+	}
+
 	if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
 		return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
 	} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
@@ -466,7 +468,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 
 	result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))
-	mu, du, err := loader.checkSegmentSize(ctx, infos)
+	mu, du, err := loader.checkSegmentSize(ctx, infos, memoryUsage, totalMemory, diskUsage)
 	if err != nil {
 		log.Warn("no sufficient resource to load segments", zap.Error(err))
 		return result, err
@@ -1347,7 +1349,7 @@ func JoinIDPath(ids ...int64) string {
 // checkSegmentSize checks whether the memory & disk is sufficient to load the segments
 // returns the memory & disk usage while loading if possible to load,
 // otherwise, returns error
-func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo) (uint64, uint64, error) {
+func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, memUsage, totalMem uint64, localDiskUsage int64) (uint64, uint64, error) {
 	if len(segmentLoadInfos) == 0 {
 		return 0, 0, nil
 	}
@@ -1360,18 +1362,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
 		return float64(mem) / 1024 / 1024
 	}
 
-	memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize
-	totalMem := hardware.GetMemoryCount()
+	memUsage = memUsage + loader.committedResource.MemorySize
 	if memUsage == 0 || totalMem == 0 {
 		return 0, 0, errors.New("get memory failed when checkSegmentSize")
 	}
 
-	localDiskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
-	if err != nil {
-		return 0, 0, errors.Wrap(err, "get local used size failed")
-	}
-
-	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage)))
 	diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
 
 	factor := resourceEstimateFactor{
diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go
index 4b87d4a182afc..496fbd1301bf9 100644
--- a/internal/querynodev2/segments/segment_loader_test.go
+++ b/internal/querynodev2/segments/segment_loader_test.go
@@ -81,7 +81,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
 	// Dependencies
 	suite.manager = NewManager()
-	suite.loader = NewLoader(suite.manager, suite.chunkManager)
+	suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())
 
 	// Data
@@ -98,7 +98,7 @@ func (suite *SegmentLoaderSuite) SetupBM25() {
 	// Dependencies
 	suite.manager = NewManager()
-	suite.loader = NewLoader(suite.manager, suite.chunkManager)
+	suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())
 
 	suite.schema = mock_segcore.GenTestBM25CollectionSchema("test")
@@ -798,7 +798,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
 	ctx := context.Background()
 	chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
 	suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
-	suite.loader = NewLoader(suite.manager, suite.chunkManager)
+	suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())
 
 	// Data
diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go
index fd331a6bdfa0b..6b8f5f71156ed 100644
--- a/internal/querynodev2/server.go
+++ b/internal/querynodev2/server.go
@@ -322,7 +322,7 @@ func (node *QueryNode) Init() error {
 		node.factory.Init(paramtable.Get())
 
 		localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
-		localUsedSize, err := segcore.GetLocalUsedSize(node.ctx, localRootPath)
+		localUsedSize, err := segcore.GetLocalUsedSize(localRootPath)
 		if err != nil {
 			log.Warn("get local used size failed", zap.Error(err))
 			initError = err
@@ -369,7 +369,7 @@ func (node *QueryNode) Init() error {
 		node.subscribingChannels = typeutil.NewConcurrentSet[string]()
 		node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()
 		node.manager = segments.NewManager()
-		node.loader = segments.NewLoader(node.manager, node.chunkManager)
+		node.loader = segments.NewLoader(node.ctx, node.manager, node.chunkManager)
 		node.manager.SetLoader(node.loader)
 		node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID())
 		// init pipeline manager
diff --git a/internal/util/segcore/cgo_util.go b/internal/util/segcore/cgo_util.go
index a34c08657df13..ec0872d9f4af3 100644
--- a/internal/util/segcore/cgo_util.go
+++ b/internal/util/segcore/cgo_util.go
@@ -27,7 +27,6 @@ package segcore
 import "C"
 
 import (
-	"context"
 	"math"
 	"unsafe"
 
@@ -64,7 +63,7 @@ func getCProtoBlob(cProto *C.CProto) []byte {
 }
 
 // GetLocalUsedSize returns the used size of the local path
-func GetLocalUsedSize(ctx context.Context, path string) (int64, error) {
+func GetLocalUsedSize(path string) (int64, error) {
 	var availableSize int64
 	cSize := (*C.int64_t)(&availableSize)
 	cPath := C.CString(path)
diff --git a/internal/util/segcore/cgo_util_test.go b/internal/util/segcore/cgo_util_test.go
index 3a517ede69d56..a3ad1d935406f 100644
--- a/internal/util/segcore/cgo_util_test.go
+++ b/internal/util/segcore/cgo_util_test.go
@@ -1,7 +1,6 @@
 package segcore
 
 import (
-	"context"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -13,7 +12,7 @@ func TestConsumeCStatusIntoError(t *testing.T) {
 }
 
 func TestGetLocalUsedSize(t *testing.T) {
-	size, err := GetLocalUsedSize(context.Background(), "")
+	size, err := GetLocalUsedSize("")
 	assert.NoError(t, err)
 	assert.NotNil(t, size)
 }
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 68d139b659a23..8045afc90cc88 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2509,6 +2509,7 @@ type queryNodeConfig struct {
 	// loader
 	IoPoolSize             ParamItem `refreshable:"false"`
 	DeltaDataExpansionRate ParamItem `refreshable:"true"`
+	DiskSizeFetchInterval  ParamItem `refreshable:"false"`
 
 	// schedule task policy.
 	SchedulePolicyName ParamItem `refreshable:"false"`
@@ -3125,6 +3126,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
 	}
 	p.DeltaDataExpansionRate.Init(base.mgr)
 
+	p.DiskSizeFetchInterval = ParamItem{
+		Key:          "querynode.diskSizeFetchInterval",
+		Version:      "2.5.0",
+		DefaultValue: "60",
+		Doc:          "The time interval in seconds for retrieving disk usage.",
+	}
+	p.DiskSizeFetchInterval.Init(base.mgr)
+
 	// schedule read task policy.
 	p.SchedulePolicyName = ParamItem{
 		Key: "queryNode.scheduler.scheduleReadPolicy.name",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 2fbe4079ccfbb..6d0b210fc2f73 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -483,6 +483,7 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())
 		assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())
+		assert.Equal(t, 60*time.Second, Params.DiskSizeFetchInterval.GetAsDuration(time.Second))
 	})
 
 	t.Run("test dataCoordConfig", func(t *testing.T) {
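
What the patch buys is easier to see outside diff context. Previously, requestResource and checkSegmentSize re-ran the cgo-backed segcore.GetLocalUsedSize on every segment load, and requestResource did so while holding loader.mut, so resource admission serialized on filesystem I/O. The new diskUsageFetcher pays for that probe once per DiskSizeFetchInterval (default 60s) on a background goroutine and publishes the result through atomics, so the hot path reads a cached value lock-free. Below is a minimal self-contained sketch of the same pattern, not the PR's API: cachedUsage and its members are illustrative names, and filepath.WalkDir stands in for the cgo size probe.

// cached_usage_sketch.go — illustrative only, not part of the patch.
package main

import (
	"context"
	"fmt"
	"io/fs"
	"path/filepath"
	"time"

	"go.uber.org/atomic"
)

// cachedUsage caches the result of an expensive size probe so readers
// never block on I/O, mirroring diskUsageFetcher above.
type cachedUsage struct {
	path  string
	usage *atomic.Int64
	err   *atomic.Error
}

func newCachedUsage(path string) *cachedUsage {
	return &cachedUsage{
		path:  path,
		usage: atomic.NewInt64(0),
		err:   atomic.NewError(nil),
	}
}

// Get returns the last observed usage and error without touching the disk.
func (c *cachedUsage) Get() (int64, error) {
	return c.usage.Load(), c.err.Load()
}

// fetch walks the path (a stand-in for segcore.GetLocalUsedSize) and
// publishes the value and error for concurrent readers.
func (c *cachedUsage) fetch() {
	var total int64
	err := filepath.WalkDir(c.path, func(_ string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			return nil
		}
		info, err := d.Info()
		if err != nil {
			return err
		}
		total += info.Size()
		return nil
	})
	if err != nil {
		c.err.Store(err) // keep the stale size, surface the error
		return
	}
	c.usage.Store(total)
	c.err.Store(nil)
}

// Start fetches once synchronously, as diskUsageFetcher.Start does, then
// refreshes on an interval until ctx is cancelled.
func (c *cachedUsage) Start(ctx context.Context, interval time.Duration) {
	c.fetch()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.fetch()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := newCachedUsage(".")
	go c.Start(ctx, time.Second)

	time.Sleep(100 * time.Millisecond) // allow the initial fetch to land
	size, err := c.Get()
	fmt.Printf("cached usage: %d bytes, err: %v\n", size, err)
}

Caching the last error next to the value preserves the old failure behavior: requestResource still returns "get local used size failed" when the probe is broken rather than admitting segments against an unknown disk size. Taking loader.mut only after the memory and disk reads is the other half of the change; the mutex now guards nothing but the committedResource bookkeeping, as the new comment on the field states.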