enhance: [2.5] Reduce mutex contention in datacoord meta #38904

Open · wants to merge 1 commit into base branch 2.5
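In summary, this commit replaces the meta-wide `GetSegmentsChanPart` method, which filtered every segment while holding the meta mutex, with a package-level `GetSegmentsChanPart(meta, collectionID, filter)` that reads a single collection's bucket in the `coll2Segments` secondary index. The collection check moves out of the filter closures, `compaction_trigger.go` fans a global signal out over the registered collections, and the test fixtures are updated to populate the secondary indexes the indexed lookup now depends on.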
7 changes: 3 additions & 4 deletions internal/datacoord/compaction_policy_clustering.go
@@ -120,15 +120,14 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
 		return nil, 0, err
 	}
 
-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
 			!segment.GetIsInvisible()
-	})
+	}))
 
 	views := make([]CompactionView, 0)
 	// partSegments is list of chanPartSegments, which is channel-partition organized segments
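The shape of the new lookup is worth spelling out. The old method filtered every segment in the cluster while holding the meta read lock, so every caller paid O(total segments) inside the critical section; the indexed version touches only the target collection's bucket. Below is a minimal, self-contained sketch of that idea, assuming a `coll2Segments` secondary index guarded by the same RWMutex as the primary map. All names here are illustrative stand-ins, not the actual internal/datacoord types.

```go
package main

import (
	"fmt"
	"sync"
)

type UniqueID = int64

// SegmentInfo is a stand-in for the datacoord segment metadata.
type SegmentInfo struct {
	ID           UniqueID
	CollectionID UniqueID
	Healthy      bool
}

// segmentInfoIndexes mirrors the secondary index seen in this diff:
// collection ID -> segment ID -> segment.
type segmentInfoIndexes struct {
	coll2Segments map[UniqueID]map[UniqueID]*SegmentInfo
}

type meta struct {
	mu               sync.RWMutex
	segments         map[UniqueID]*SegmentInfo // primary index: segment ID -> segment
	secondaryIndexes segmentInfoIndexes
}

// segmentsOfCollection visits one collection's bucket under the read lock,
// instead of scanning every segment the way a meta-wide filter would.
func (m *meta) segmentsOfCollection(collectionID UniqueID, filter func(*SegmentInfo) bool) []*SegmentInfo {
	m.mu.RLock()
	defer m.mu.RUnlock()
	var out []*SegmentInfo
	for _, seg := range m.secondaryIndexes.coll2Segments[collectionID] {
		if filter(seg) {
			out = append(out, seg)
		}
	}
	return out
}

func main() {
	seg := &SegmentInfo{ID: 1, CollectionID: 2, Healthy: true}
	m := &meta{
		segments: map[UniqueID]*SegmentInfo{1: seg},
		secondaryIndexes: segmentInfoIndexes{
			coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{2: {1: seg}},
		},
	}
	matched := m.segmentsOfCollection(2, func(s *SegmentInfo) bool { return s.Healthy })
	fmt.Println(len(matched)) // prints 1
}
```

The read lock is still shared, but the time each reader spends holding it shrinks from a full-table scan to one map bucket, which is where the contention reduction comes from.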
7 changes: 3 additions & 4 deletions internal/datacoord/compaction_policy_single.go
@@ -87,15 +87,14 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
 		return nil, 0, err
 	}
 
-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() == datapb.SegmentLevel_L2 && // only support L2 for now
 			!segment.GetIsInvisible()
-	})
+	}))
 
 	views := make([]CompactionView, 0)
 	for _, group := range partSegments {
9 changes: 9 additions & 0 deletions internal/datacoord/compaction_policy_single_test.go
@@ -128,6 +128,15 @@ func (s *SingleCompactionPolicySuite) TestL2SingleCompaction() {
 	segments[103] = buildTestSegment(101, collID, datapb.SegmentLevel_L2, 100, 10000, 1)
 	segmentsInfo := &SegmentsInfo{
 		segments: segments,
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+				collID: {
+					101: segments[101],
+					102: segments[102],
+					103: segments[103],
+				},
+			},
+		},
 	}
 
 	compactionTaskMeta := newTestCompactionTaskMeta(s.T())
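Because `GetSegmentsChanPart` now resolves segments through `coll2Segments`, a hand-built `SegmentsInfo` whose secondary index is left empty silently yields zero matches, which is why the fixture above registers each segment twice. A hypothetical helper like the following (not part of this PR; it reuses the `SegmentsInfo` and `segmentInfoIndexes` types visible in this diff) would keep the two maps in sync by construction:

```go
// newTestSegmentsInfo is a hypothetical fixture builder: it fills the primary
// segments map and the coll2Segments secondary index in one pass, so test
// fixtures cannot drift out of sync with the indexed lookup path.
func newTestSegmentsInfo(segs ...*SegmentInfo) *SegmentsInfo {
	info := &SegmentsInfo{
		segments: make(map[UniqueID]*SegmentInfo, len(segs)),
		secondaryIndexes: segmentInfoIndexes{
			coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
		},
	}
	for _, s := range segs {
		info.segments[s.GetID()] = s
		bucket, ok := info.secondaryIndexes.coll2Segments[s.GetCollectionID()]
		if !ok {
			// Create the collection bucket on first use.
			bucket = make(map[UniqueID]*SegmentInfo)
			info.secondaryIndexes.coll2Segments[s.GetCollectionID()] = bucket
		}
		bucket[s.GetID()] = s
	}
	return info
}
```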
16 changes: 13 additions & 3 deletions internal/datacoord/compaction_trigger.go
@@ -291,9 +291,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 		zap.Int64("signal.collectionID", signal.collectionID),
 		zap.Int64("signal.partitionID", signal.partitionID),
 		zap.Int64("signal.segmentID", signal.segmentID))
-	partSegments := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
-			isSegmentHealthy(segment) &&
+	filter := SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
@@ -302,6 +301,17 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 			!segment.GetIsInvisible()
 	}) // partSegments is list of chanPartSegments, which is channel-partition organized segments
 
+	partSegments := make([]*chanPartSegments, 0)
+	// get all segments if signal.collectionID == 0, otherwise get that collection's segments
+	if signal.collectionID != 0 {
+		partSegments = GetSegmentsChanPart(t.meta, signal.collectionID, filter)
+	} else {
+		collections := t.meta.GetCollections()
+		for _, collection := range collections {
+			partSegments = append(partSegments, GetSegmentsChanPart(t.meta, collection.ID, filter)...)
+		}
+	}
+
 	if len(partSegments) == 0 {
 		log.Info("the length of SegmentsChanPart is 0, skip to handle compaction")
 		return nil
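Two things happen in this hunk. First, the collection check moves out of the filter: for a global signal (`collectionID == 0`) the trigger now fans out over `t.meta.GetCollections()` and unions the per-collection results, so each `GetSegmentsChanPart` call holds the lock only for one collection's bucket rather than for a scan of all segments. A side effect is that a segment whose collection is not registered in meta would no longer be visited, which matches the indexed lookup's semantics. Second, the closure is wrapped in `SegmentFilterFunc` so it can travel through a filter interface. A sketch of that adapter's assumed shape (the real `SegmentFilter` interface in internal/datacoord may differ):

```go
// Assumed shape of the filter plumbing, mirroring the stdlib http.HandlerFunc
// pattern: a bare closure is adapted to satisfy an interface.
type SegmentFilter interface {
	Match(segment *SegmentInfo) bool
}

// SegmentFilterFunc adapts a plain predicate to the SegmentFilter interface.
type SegmentFilterFunc func(segment *SegmentInfo) bool

func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool {
	return f(segment)
}
```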
211 changes: 134 additions & 77 deletions internal/datacoord/compaction_trigger_test.go
@@ -122,24 +122,34 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
 		},
 	}
 
+	segInfo := &datapb.SegmentInfo{
+		ID:             1,
+		CollectionID:   collectionID,
+		PartitionID:    1,
+		LastExpireTime: 100,
+		NumOfRows:      100,
+		MaxRowNum:      300,
+		InsertChannel:  "ch1",
+		State:          commonpb.SegmentState_Flushed,
+		Binlogs:        binlogs,
+		Deltalogs:      deltaLogs,
+		IsSorted:       true,
+	}
 	m := &meta{
 		catalog:    catalog,
 		channelCPs: newChannelCps(),
 		segments: &SegmentsInfo{
 			segments: map[int64]*SegmentInfo{
 				1: {
-					SegmentInfo: &datapb.SegmentInfo{
-						ID:             1,
-						CollectionID:   collectionID,
-						PartitionID:    1,
-						LastExpireTime: 100,
-						NumOfRows:      100,
-						MaxRowNum:      300,
-						InsertChannel:  "ch1",
-						State:          commonpb.SegmentState_Flushed,
-						Binlogs:        binlogs,
-						Deltalogs:      deltaLogs,
-						IsSorted:       true,
-					},
+					SegmentInfo: segInfo,
 				},
 			},
+			secondaryIndexes: segmentInfoIndexes{
+				coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+					collectionID: {
+						1: {
+							SegmentInfo: segInfo,
+						},
+					},
+				},
+			},
@@ -214,6 +224,76 @@ func Test_compactionTrigger_force(t *testing.T) {
 
 	mock0Allocator := newMock0Allocator(t)
 
+	seg1 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             1,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			IsSorted: true,
+		},
+	}
+
+	seg2 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             2,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			IsSorted: true,
+		},
+	}
+
+	seg3 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             3,
+			CollectionID:   1111,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			IsSorted:       true,
+		},
+	}
+
 	tests := []struct {
 		name   string
 		fields fields
@@ -230,71 +310,18 @@
 				channelCPs: newChannelCps(),
 				segments: &SegmentsInfo{
 					segments: map[int64]*SegmentInfo{
-						1: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             1,
-								CollectionID:   2,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								Binlogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 1},
-										},
-									},
-								},
-								Deltalogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 1},
-										},
-									},
-								},
-								IsSorted: true,
-							},
-						},
-						2: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             2,
-								CollectionID:   2,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								Binlogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 2},
-										},
-									},
-								},
-								Deltalogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 2},
-										},
-									},
-								},
-								IsSorted: true,
-							},
-						},
-						3: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             3,
-								CollectionID:   1111,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								IsSorted:       true,
-							},
-						},
+						1: seg1,
+						2: seg2,
+						3: seg3,
+					},
+					secondaryIndexes: segmentInfoIndexes{
+						coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+							2: {
+								seg1.GetID(): seg1,
+								seg2.GetID(): seg2,
+							},
+							1111: {
+								seg3.GetID(): seg3,
+							},
+						},
 					},
@@ -617,7 +644,13 @@
 		t.Run(tt.name+" with DiskANN index", func(t *testing.T) {
 			for _, segment := range tt.fields.meta.segments.GetSegments() {
 				// Collection 1000 means it has DiskANN index
+				delete(tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()], segment.GetID())
 				segment.CollectionID = 1000
+				_, ok := tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()]
+				if !ok {
+					tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()] = make(map[UniqueID]*SegmentInfo)
+				}
+				tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()][segment.GetID()] = segment
 			}
 			tr := &compactionTrigger{
 				meta: tt.fields.meta,
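The loop above is the maintenance cost of a secondary index in miniature: mutating `segment.CollectionID` without moving the entry between buckets would leave `coll2Segments` stale. A hypothetical helper (illustrative only, reusing the types shown in this diff) that captures the move as one operation:

```go
// moveSegmentToCollection is a hypothetical sketch: remove the segment from
// its old collection bucket, update the collection ID, then insert it into
// the new bucket, creating that bucket on first use.
func moveSegmentToCollection(idx segmentInfoIndexes, segment *SegmentInfo, newCollectionID UniqueID) {
	delete(idx.coll2Segments[segment.GetCollectionID()], segment.GetID())
	segment.CollectionID = newCollectionID
	bucket, ok := idx.coll2Segments[newCollectionID]
	if !ok {
		bucket = make(map[UniqueID]*SegmentInfo)
		idx.coll2Segments[newCollectionID] = bucket
	}
	bucket[segment.GetID()] = segment
}
```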
@@ -725,6 +758,9 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 	vecFieldID := int64(201)
 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
+		},
 	}
 
 	indexMeta := newSegmentIndexMeta(nil)
@@ -751,6 +787,7 @@
 		},
 	}
 
+	segmentInfos.secondaryIndexes.coll2Segments[2] = make(map[UniqueID]*SegmentInfo)
 	nSegments := 50
 	for i := UniqueID(0); i < UniqueID(nSegments); i++ {
 		info := &SegmentInfo{
@@ -794,6 +831,7 @@
 		})
 
 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
 	}
 
 	mock0Allocator := newMockAllocator(t)
@@ -1110,15 +1148,28 @@ func mockSegment(segID, rows, deleteRows, sizeInMB int64) *datapb.SegmentInfo {
 
 func mockSegmentsInfo(sizeInMB ...int64) *SegmentsInfo {
 	segments := make(map[int64]*SegmentInfo, len(sizeInMB))
+	collectionID := int64(2)
+	channel := "ch1"
+	coll2Segments := make(map[UniqueID]map[UniqueID]*SegmentInfo)
+	coll2Segments[collectionID] = make(map[UniqueID]*SegmentInfo)
+	channel2Segments := make(map[string]map[UniqueID]*SegmentInfo)
+	channel2Segments[channel] = make(map[UniqueID]*SegmentInfo)
 	for i, size := range sizeInMB {
 		segId := int64(i + 1)
-		segments[segId] = &SegmentInfo{
+		info := &SegmentInfo{
 			SegmentInfo:   mockSegment(segId, size, 1, size),
 			lastFlushTime: time.Now().Add(-100 * time.Minute),
 		}
+		segments[segId] = info
+		coll2Segments[collectionID][segId] = info
+		channel2Segments[channel][segId] = info
 	}
 	return &SegmentsInfo{
 		segments: segments,
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    coll2Segments,
+			channel2Segments: channel2Segments,
+		},
 	}
 }
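Note that `mockSegmentsInfo` now maintains `channel2Segments` alongside `coll2Segments`; presumably the channel-keyed index backs channel-scoped lookups elsewhere in datacoord. Populating both means the size-based trigger tests exercise the same indexed paths as production code rather than the raw `segments` map.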

@@ -1564,6 +1615,10 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
 
 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    map[UniqueID]map[UniqueID]*SegmentInfo{2: {}},
+			channel2Segments: map[string]map[UniqueID]*SegmentInfo{"ch1": {}},
+		},
 	}
 
 	size := []int64{
@@ -1636,6 +1691,8 @@
 		})
 
 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
+		segmentInfos.secondaryIndexes.channel2Segments["ch1"][i] = info
 	}
 
 	mock0Allocator := newMockAllocator(t)