enhance: improve load speed (milvus-io#28518) (milvus-io#28719)
The old check rejected load requests whenever the pool ran out of workers. But small segments finish loading quickly, and the rejected segments would only start loading again after the next check interval, which made collection loading slow.

Instead, block the request in the Go pool.

pr: milvus-io#28518

Signed-off-by: yah01 <[email protected]>
yah01 authored Nov 26, 2023
1 parent 3b06db1 commit a1b861e
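For context on the change described above, here is a minimal, self-contained Go sketch (illustrative only, not code from this commit; all names are made up) contrasting the two admission strategies: rejecting work when a bounded pool is saturated, versus blocking the submitter until a worker frees up.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// submitOrReject models the old behavior: if every worker slot is
// busy, the request is rejected and the caller must retry later.
func submitOrReject(slots chan struct{}, task func()) error {
	select {
	case slots <- struct{}{}: // try to take a worker slot
		go func() {
			defer func() { <-slots }() // give the slot back
			task()
		}()
		return nil
	default:
		return errors.New("pool saturated, load request rejected")
	}
}

// submitBlocking models the new behavior: the caller simply waits
// in line for a free slot instead of being bounced and retried.
func submitBlocking(slots chan struct{}, wg *sync.WaitGroup, task func()) {
	slots <- struct{}{} // blocks until a worker slot is free
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() { <-slots }()
		task()
	}()
}

func main() {
	slots := make(chan struct{}, 2) // a "pool" with 2 workers
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		segment := i
		submitBlocking(slots, &wg, func() {
			fmt.Println("loaded segment", segment)
		})
	}
	wg.Wait()
}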
Showing 3 changed files with 29 additions and 30 deletions.
28 changes: 24 additions & 4 deletions internal/querynodev2/segments/pool.go
@@ -33,10 +33,12 @@ var (
 	// and other operations (insert/delete/statistics/etc.)
 	// since in concurrent situation, there operation may block each other in high payload
 
-	sqp     atomic.Pointer[conc.Pool[any]]
-	sqOnce  sync.Once
-	dp      atomic.Pointer[conc.Pool[any]]
-	dynOnce sync.Once
+	sqp      atomic.Pointer[conc.Pool[any]]
+	sqOnce   sync.Once
+	dp       atomic.Pointer[conc.Pool[any]]
+	dynOnce  sync.Once
+	loadPool atomic.Pointer[conc.Pool[any]]
+	loadOnce sync.Once
 )
 
 // initSQPool initialize
@@ -67,6 +69,19 @@ func initDynamicPool() {
 	})
 }
 
+func initLoadPool() {
+	loadOnce.Do(func() {
+		pool := conc.NewPool[any](
+			hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(),
+			conc.WithPreAlloc(false),
+			conc.WithDisablePurge(false),
+			conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
+		)
+
+		loadPool.Store(pool)
+	})
+}
+
 // GetSQPool returns the singleton pool instance for search/query operations.
 func GetSQPool() *conc.Pool[any] {
 	initSQPool()
@@ -78,3 +93,8 @@ func GetDynamicPool() *conc.Pool[any] {
 	initDynamicPool()
 	return dp.Load()
 }
+
+func GetLoadPool() *conc.Pool[any] {
+	initLoadPool()
+	return loadPool.Load()
+}
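The loading paths in segment.go below consume this pool through a Submit/Await pattern. As a rough sketch of that calling convention (the future-style API of conc.Pool is inferred from this diff, not from its documentation):

	future := GetLoadPool().Submit(func() (any, error) {
		// The cgo call runs on a pool worker whose OS thread is locked
		// (see conc.WithPreHandler(runtime.LockOSThread) above).
		return nil, nil
	})
	future.Await() // blocks the caller until the task has run

Because Submit queues or blocks when all workers are busy, callers no longer need the reject-and-retry check that segment_loader.go used to perform.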
8 changes: 4 additions & 4 deletions internal/querynodev2/segments/segment.go
@@ -620,7 +620,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
 	}
 
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -672,7 +672,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
 	loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
 
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		log.Info("submitted loadFieldData task to dy pool")
 		status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
@@ -723,7 +723,7 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
 	}
 
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.AddFieldDataInfoForSealed(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -851,7 +851,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo
 	}
 
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.UpdateSealedSegmentIndex(s.ptr, info.cLoadIndexInfo)
 		return nil, nil
 	}).Await()
23 changes: 1 addition & 22 deletions internal/querynodev2/segments/segment_loader.go
@@ -83,19 +83,16 @@ type Loader interface {
 type LoadResource struct {
 	MemorySize uint64
 	DiskSize   uint64
-	WorkNum    int
 }
 
 func (r *LoadResource) Add(resource LoadResource) {
 	r.MemorySize += resource.MemorySize
 	r.DiskSize += resource.DiskSize
-	r.WorkNum += resource.WorkNum
 }
 
 func (r *LoadResource) Sub(resource LoadResource) {
 	r.MemorySize -= resource.MemorySize
 	r.DiskSize -= resource.DiskSize
-	r.WorkNum -= resource.WorkNum
 }
 
 func NewLoader(
@@ -365,29 +362,13 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
 
-	poolCap := hardware.GetCPUNum() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt()
-	if poolCap > 256 {
-		poolCap = 256
-	}
-	if loader.committedResource.WorkNum >= poolCap {
-		return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap))
-	} else if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
+	if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
 		return resource, 0, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
 	} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
 		return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
 	}
 
 	concurrencyLevel := funcutil.Min(hardware.GetCPUNum(), len(infos))
-
-	for _, info := range infos {
-		for _, field := range info.GetBinlogPaths() {
-			resource.WorkNum += len(field.GetBinlogs())
-		}
-		for _, index := range info.GetIndexInfos() {
-			resource.WorkNum += len(index.IndexFilePaths)
-		}
-	}
-
 	for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
 		_, _, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel)
 		if err == nil {
@@ -409,8 +390,6 @@ }
 	}
 	loader.committedResource.Add(resource)
 	log.Info("request resource for loading segments (unit in MiB)",
-		zap.Int("workerNum", resource.WorkNum),
-		zap.Int("committedWorkerNum", loader.committedResource.WorkNum),
 		zap.Float64("memory", toMB(resource.MemorySize)),
 		zap.Float64("committedMemory", toMB(loader.committedResource.MemorySize)),
 		zap.Float64("disk", toMB(resource.DiskSize)),
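In effect, requestResource now keeps only the memory and disk admission gates, while concurrency throttling is left to the load pool itself. A condensed before/after summary (a sketch; names as in the diff above):

	// Before: three gates, the first causing reject-and-retry churn.
	//   committedResource.WorkNum >= poolCap          -> reject request
	//   committedResource.MemorySize + need >= total  -> reject request
	//   committedResource.DiskSize + need >= diskCap  -> reject request
	//
	// After: two gates; worker saturation merely delays
	// GetLoadPool().Submit instead of failing the request.
	//   committedResource.MemorySize + need >= total  -> reject request
	//   committedResource.DiskSize + need >= diskCap  -> reject request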
