diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go
index 34a96e935796f..29c6c65e56bb2 100644
--- a/internal/querynodev2/segments/pool.go
+++ b/internal/querynodev2/segments/pool.go
@@ -33,10 +33,12 @@ var (
 	// and other operations (insert/delete/statistics/etc.)
 	// since in concurrent situation, there operation may block each other in high payload

-	sqp     atomic.Pointer[conc.Pool[any]]
-	sqOnce  sync.Once
-	dp      atomic.Pointer[conc.Pool[any]]
-	dynOnce sync.Once
+	sqp      atomic.Pointer[conc.Pool[any]]
+	sqOnce   sync.Once
+	dp       atomic.Pointer[conc.Pool[any]]
+	dynOnce  sync.Once
+	loadPool atomic.Pointer[conc.Pool[any]]
+	loadOnce sync.Once
 )

 // initSQPool initialize
@@ -67,6 +69,19 @@ func initDynamicPool() {
 	})
 }

+func initLoadPool() {
+	loadOnce.Do(func() {
+		pool := conc.NewPool[any](
+			hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(),
+			conc.WithPreAlloc(false),
+			conc.WithDisablePurge(false),
+			conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
+		)
+
+		loadPool.Store(pool)
+	})
+}
+
 // GetSQPool returns the singleton pool instance for search/query operations.
 func GetSQPool() *conc.Pool[any] {
 	initSQPool()
@@ -78,3 +93,8 @@ func GetDynamicPool() *conc.Pool[any] {
 	initDynamicPool()
 	return dp.Load()
 }
+
+func GetLoadPool() *conc.Pool[any] {
+	initLoadPool()
+	return loadPool.Load()
+}
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index 81bd9bcd6082b..4caee7ca58d7e 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -620,7 +620,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
 	}

 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -672,7 +672,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
 	loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		log.Info("submitted loadFieldData task to dy pool")
 		status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -723,7 +723,7 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
 	}

 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.AddFieldDataInfoForSealed(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -851,7 +851,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo
 	}

 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.UpdateSealedSegmentIndex(s.ptr, info.cLoadIndexInfo)
 		return nil, nil
 	}).Await()
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 45d5e00051f3a..caf337cbd139d 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -83,19 +83,16 @@ type Loader interface {
 type LoadResource struct {
 	MemorySize uint64
 	DiskSize   uint64
-	WorkNum    int
 }

 func (r *LoadResource) Add(resource LoadResource) {
 	r.MemorySize += resource.MemorySize
 	r.DiskSize += resource.DiskSize
-	r.WorkNum += resource.WorkNum
 }

 func (r *LoadResource) Sub(resource LoadResource) {
 	r.MemorySize -= resource.MemorySize
 	r.DiskSize -= resource.DiskSize
-	r.WorkNum -= resource.WorkNum
 }

 func NewLoader(
@@ -365,29 +362,13 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}

 	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
-	poolCap := hardware.GetCPUNum() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt()
-	if poolCap > 256 {
-		poolCap = 256
-	}
-	if loader.committedResource.WorkNum >= poolCap {
-		return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap))
-	} else if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
+	if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
 		return resource, 0, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
 	} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
 		return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
 	}

 	concurrencyLevel := funcutil.Min(hardware.GetCPUNum(), len(infos))
-
-	for _, info := range infos {
-		for _, field := range info.GetBinlogPaths() {
-			resource.WorkNum += len(field.GetBinlogs())
-		}
-		for _, index := range info.GetIndexInfos() {
-			resource.WorkNum += len(index.IndexFilePaths)
-		}
-	}
-
 	for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
 		_, _, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel)
 		if err == nil {
@@ -409,8 +390,6 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 	loader.committedResource.Add(resource)
 	log.Info("request resource for loading segments (unit in MiB)",
-		zap.Int("workerNum", resource.WorkNum),
-		zap.Int("committedWorkerNum", loader.committedResource.WorkNum),
 		zap.Float64("memory", toMB(resource.MemorySize)),
 		zap.Float64("committedMemory", toMB(loader.committedResource.MemorySize)),
 		zap.Float64("disk", toMB(resource.DiskSize)),
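
For context, the new `GetLoadPool()` follows the same lazy-singleton pattern as the existing SQ and dynamic pools: a `sync.Once` guards construction, an `atomic.Pointer` serves reads, and each worker locks its OS thread because the submitted tasks call into cgo (`C.LoadFieldData`, `C.UpdateSealedSegmentIndex`). Below is a minimal, self-contained sketch of that pattern using only the standard library; `simplePool`, `newSimplePool`, and `getLoadPool` here are illustrative stand-ins, not the actual `conc.Pool` API from the patch.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// simplePool is an illustrative stand-in for conc.Pool: a fixed set of
// workers that lock their OS threads, which matters when tasks call into cgo.
type simplePool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

func newSimplePool(size int) *simplePool {
	p := &simplePool{tasks: make(chan func(), size)}
	for i := 0; i < size; i++ {
		go func() {
			// mirrors conc.WithPreHandler(runtime.LockOSThread) in the patch
			runtime.LockOSThread()
			for task := range p.tasks {
				task()
				p.wg.Done()
			}
		}()
	}
	return p
}

func (p *simplePool) Submit(task func()) {
	p.wg.Add(1)
	p.tasks <- task
}

func (p *simplePool) Wait() { p.wg.Wait() }

// Lazy, race-free singleton: sync.Once guards construction,
// atomic.Pointer keeps the read path cheap and lock-free.
var (
	loadPool atomic.Pointer[simplePool]
	loadOnce sync.Once
)

func getLoadPool() *simplePool {
	loadOnce.Do(func() {
		loadPool.Store(newSimplePool(runtime.NumCPU()))
	})
	return loadPool.Load()
}

func main() {
	pool := getLoadPool()
	for i := 0; i < 4; i++ {
		i := i
		pool.Submit(func() { fmt.Println("loading segment field", i) })
	}
	pool.Wait() // block until all submitted load tasks finish
}
```

A plausible reading of the `segment_loader.go` half of the change: once load work runs on its own bounded pool (sized by `GetCPUNum() * MiddlePriorityThreadCoreCoefficient`), back-pressure comes from the pool itself, so the per-binlog `WorkNum` accounting and the `WrapErrServiceRequestLimitExceeded` admission check become redundant and are dropped.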