Skip to content

Commit

Permalink
fix: update binlog index memory usage before loading segments (#28528)
Browse files Browse the repository at this point in the history
issue: #27678 
When interimIndex = true, the predicted memory usage should be updated to
include the memory used by the binlog index build process.

Signed-off-by: cqy123456 <[email protected]>
  • Loading branch information
cqy123456 authored Nov 29, 2023
1 parent d10c5af commit 3b1b14d
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 7 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ queryNode:
enableIndex: true
nlist: 128 # segment index nlist
nprobe: 16 # nprobe to search segment, based on your accuracy requirement, must be smaller than nlist
memExpansionRate: 1.15 # the ratio of building interim index memory usage to raw data
loadMemoryUsageFactor: 1 # The multiply factor of calculating the memory usage while loading segments
enableDisk: false # enable querynode load disk index, and search on disk index
maxDiskUsagePercentage: 95
Expand Down
1 change: 0 additions & 1 deletion internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
") than other column's row count (" +
std::to_string(num_rows_.value()) + ")");
}
AssertInfo(!vector_indexings_.is_ready(field_id), "vec index is not ready");
if (get_bit(field_data_ready_bitset_, field_id)) {
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
Expand Down
5 changes: 5 additions & 0 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
predictDiskUsage += uint64(getBinlogDataSize(fieldBinlog))
} else {
predictMemUsage += uint64(getBinlogDataSize(fieldBinlog))
enableBinlogIndex := paramtable.Get().QueryNodeCfg.EnableTempSegmentIndex.GetAsBool()
if enableBinlogIndex {
buildBinlogIndexRate := paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat()
predictMemUsage += uint64(float32(getBinlogDataSize(fieldBinlog)) * float32(buildBinlogIndexRate))
}
}
}

Expand Down
19 changes: 18 additions & 1 deletion internal/querynodev2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,21 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
}

// genSegmentIndexInfos builds a list of index metadata entries from the field
// index infos attached to the first element of loadInfo. Every entry carries
// the suite's collection ID together with the field ID, index name and index
// params read from the corresponding field index info.
//
// Only loadInfo[0] is consulted; loadInfo must therefore hold at least one
// element, otherwise the index expression panics.
func (suite *ServiceSuite) genSegmentIndexInfos(loadInfo []*querypb.SegmentLoadInfo) []*indexpb.IndexInfo {
	fieldIndexes := loadInfo[0].IndexInfos
	result := make([]*indexpb.IndexInfo, 0, len(fieldIndexes))
	for i := range fieldIndexes {
		fieldIndex := fieldIndexes[i]
		meta := &indexpb.IndexInfo{
			CollectionID: suite.collectionID,
			FieldID:      fieldIndex.GetFieldID(),
			IndexName:    fieldIndex.GetIndexName(),
			IndexParams:  fieldIndex.GetIndexParams(),
		}
		result = append(result, meta)
	}
	return result
}

func (suite *ServiceSuite) genSegmentLoadInfos(schema *schemapb.CollectionSchema) []*querypb.SegmentLoadInfo {
ctx := context.Background()

Expand Down Expand Up @@ -682,6 +697,8 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
info.IndexInfos = nil
return info
})
// generate indexinfos for setting index meta.
indexInfoList := suite.genSegmentIndexInfos(infos)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
Expand All @@ -693,7 +710,7 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
Schema: schema,
NeedTransfer: false,
LoadScope: querypb.LoadScope_Full,
IndexInfoList: []*indexpb.IndexInfo{{}},
IndexInfoList: indexInfoList,
}

// Load segment
Expand Down
20 changes: 15 additions & 5 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -1600,11 +1600,12 @@ type queryNodeConfig struct {
StatsPublishInterval ParamItem `refreshable:"true"`

// segcore
KnowhereThreadPoolSize ParamItem `refreshable:"false"`
ChunkRows ParamItem `refreshable:"false"`
EnableTempSegmentIndex ParamItem `refreshable:"false"`
InterimIndexNlist ParamItem `refreshable:"false"`
InterimIndexNProbe ParamItem `refreshable:"false"`
KnowhereThreadPoolSize ParamItem `refreshable:"false"`
ChunkRows ParamItem `refreshable:"false"`
EnableTempSegmentIndex ParamItem `refreshable:"false"`
InterimIndexNlist ParamItem `refreshable:"false"`
InterimIndexNProbe ParamItem `refreshable:"false"`
InterimIndexMemExpandRate ParamItem `refreshable:"false"`

// memory limit
LoadMemoryUsageFactor ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -1743,6 +1744,15 @@ func (p *queryNodeConfig) init(base *BaseTable) {
}
p.InterimIndexNlist.Init(base.mgr)

p.InterimIndexMemExpandRate = ParamItem{
Key: "queryNode.segcore.interimIndex.memExpansionRate",
Version: "2.0.0",
DefaultValue: "1.15",
Doc: "extra memory needed by building interim index",
Export: true,
}
p.InterimIndexMemExpandRate.Init(base.mgr)

p.InterimIndexNProbe = ParamItem{
Key: "queryNode.segcore.interimIndex.nprobe",
Version: "2.0.0",
Expand Down

0 comments on commit 3b1b14d

Please sign in to comment.