Skip to content

Commit

Permalink
enhance: [10kcp] Optimize save collection target latency (#38345) (#3…
Browse files Browse the repository at this point in the history
…8370) (#38795)

issue: #38237
pr: #38345
this PR only use better compression level for proto msg which is larger
than 1MB, and use a lighter compression level for smaller proto msg,
which could get a better latency in most case.

this PR could reduce the latency from 22.7s to 4.7s with 10000
collctions and each collections has 1000 segments.

before this PR:
BenchmarkTargetManager-8 1 22781536357 ns/op 566407275088 B/op 11188282
allocs/op
after this PR:
BenchmarkTargetManager-8 1 4729566944 ns/op 36713248864 B/op 10963615
allocs/op

Signed-off-by: Wei Liu <[email protected]>
Co-authored-by: wei liu <[email protected]>
  • Loading branch information
bigsheeper and weiliu1031 authored Dec 26, 2024
1 parent 501d1b5 commit e13b8a2
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
8 changes: 7 additions & 1 deletion internal/metastore/kv/querycoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,14 @@ func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) err
if err != nil {
return err
}

// only compress data when size is larger than 1MB
compressLevel := zstd.SpeedFastest
if len(v) > 1024*1024 {
compressLevel = zstd.SpeedBetterCompression
}
var compressed bytes.Buffer
compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(compressLevel))
kvs[k] = compressed.String()
}

Expand Down
47 changes: 47 additions & 0 deletions internal/querycoordv2/meta/target_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,53 @@ func (suite *TargetManagerSuite) TestRecover() {
suite.Len(targets, 0)
}

func BenchmarkTargetManager(b *testing.B) {
paramtable.Init()
config := GenerateEtcdConfig()
cli, _ := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
config.Endpoints.GetAsStrings(),
config.EtcdTLSCert.GetValue(),
config.EtcdTLSKey.GetValue(),
config.EtcdTLSCACert.GetValue(),
config.EtcdTLSMinVersion.GetValue())

kv := etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())

catalog := querycoord.NewCatalog(kv)
idAllocator := RandomIncrementIDAllocator()
meta := NewMeta(idAllocator, catalog, session.NewNodeManager())
mgr := NewTargetManager(nil, meta)

segmentNum := 1000
segments := make(map[int64]*datapb.SegmentInfo)
for i := 0; i < segmentNum; i++ {
segments[int64(i)] = &datapb.SegmentInfo{
ID: int64(i),
InsertChannel: "channel-1",
}
}

channels := map[string]*DmChannel{
"channel-1": {
VchannelInfo: &datapb.VchannelInfo{
CollectionID: int64(1),
ChannelName: "channel-1",
},
},
}

collectionNum := 10000
for i := 0; i < collectionNum; i++ {
mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mgr.SaveCurrentTarget(catalog)
}
}

func TestTargetManager(t *testing.T) {
suite.Run(t, new(TargetManagerSuite))
}

0 comments on commit e13b8a2

Please sign in to comment.