Skip to content

Commit

Permalink
feat: trigger compaction to handle index version (milvus-io#28442)
Browse files Browse the repository at this point in the history
issue: milvus-io#28441

---------

Signed-off-by: Enwei Jiao <[email protected]>
  • Loading branch information
jiaoew1991 committed Nov 27, 2023
1 parent 9e82a75 commit 1cc62a1
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 49 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=Handler --dir=internal/datacoord --filename=mock_handler.go --output=internal/datacoord --structname=NMockHandler --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=allocator --dir=internal/datacoord --filename=mock_allocator_test.go --output=internal/datacoord --structname=NMockAllocator --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=RWChannelStore --dir=internal/datacoord --filename=mock_channel_store.go --output=internal/datacoord --structname=MockRWChannelStore --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=IndexEngineVersionManager --dir=internal/datacoord --filename=mock_index_engine_version_manager.go --output=internal/datacoord --structname=MockVersionManager --with-expecter --inpackage

generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
Expand Down
33 changes: 23 additions & 10 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ type compactionTrigger struct {
forceMu sync.Mutex
quit chan struct{}
wg sync.WaitGroup
// segRefer *SegmentReferenceManager
// indexCoord types.IndexCoord

indexEngineVersionManager IndexEngineVersionManager

estimateNonDiskSegmentPolicy calUpperLimitPolicy
estimateDiskSegmentPolicy calUpperLimitPolicy
// A sloopy hack, so we can test with different segment row count without worrying that
Expand All @@ -85,17 +86,15 @@ func newCompactionTrigger(
meta *meta,
compactionHandler compactionPlanContext,
allocator allocator,
// segRefer *SegmentReferenceManager,
// indexCoord types.IndexCoord,
handler Handler,
indexVersionManager IndexEngineVersionManager,
) *compactionTrigger {
return &compactionTrigger{
meta: meta,
allocator: allocator,
signals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
// segRefer: segRefer,
// indexCoord: indexCoord,
meta: meta,
allocator: allocator,
signals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
indexEngineVersionManager: indexVersionManager,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
handler: handler,
Expand Down Expand Up @@ -908,6 +907,20 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
return true
}

// index version of segment lower than current version and IndexFileKeys should have value, trigger compaction
for _, index := range segment.segmentIndexes {
if index.CurrentIndexVersion < t.indexEngineVersionManager.GetCurrentIndexEngineVersion() &&
len(index.IndexFileKeys) > 0 {
log.Info("index version is too old, trigger compaction",
zap.Int64("segmentID", segment.ID),
zap.Int64("indexID", index.IndexID),
zap.Strings("indexFileKeys", index.IndexFileKeys),
zap.Int32("currentIndexVersion", index.CurrentIndexVersion),
zap.Int32("currentEngineVersion", t.indexEngineVersionManager.GetCurrentIndexEngineVersion()))
return true
}
}

return false
}

Expand Down
107 changes: 94 additions & 13 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (h *spyCompactionHandler) start() {}

func (h *spyCompactionHandler) stop() {}

func newMockVersionManager() IndexEngineVersionManager {
return &versionManagerImpl{}
}

var _ compactionPlanContext = (*spyCompactionHandler)(nil)

func Test_compactionTrigger_force(t *testing.T) {
Expand Down Expand Up @@ -1294,6 +1298,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
testingOnly: true,
Expand Down Expand Up @@ -1471,6 +1476,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
testingOnly: true,
Expand Down Expand Up @@ -1629,13 +1635,14 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
testingOnly: true,
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
testingOnly: true,
}
tr.start()
defer tr.stop()
Expand Down Expand Up @@ -1678,7 +1685,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {

// Test shouldDoSingleCompaction
func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler(), newIndexEngineVersionManager())

// Test too many deltalogs.
var binlogs []*datapb.FieldBinlog
Expand Down Expand Up @@ -1816,6 +1823,77 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
// deltalog is large enough, should do compaction
couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{})
assert.True(t, couldDo)

mockVersionManager := NewMockVersionManager(t)
mockVersionManager.On("GetCurrentIndexEngineVersion", mock.Anything).Return(int32(2), nil)
trigger.indexEngineVersionManager = mockVersionManager
info4 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 600,
NumOfRows: 10000,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs2,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
101: {
CurrentIndexVersion: 1,
IndexFileKeys: []string{"index1"},
},
},
}
info5 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 600,
NumOfRows: 10000,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs2,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
101: {
CurrentIndexVersion: 2,
IndexFileKeys: []string{"index1"},
},
},
}
info6 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 600,
NumOfRows: 10000,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs2,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
101: {
CurrentIndexVersion: 1,
IndexFileKeys: nil,
},
},
}

// expire time < Timestamp To, but index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex
couldDo = trigger.ShouldDoSingleCompaction(info4, false, &compactTime{expireTime: 300})
assert.True(t, couldDo)
// expire time < Timestamp To, and index engine version is 2 which is equal CurrentIndexVersion in segmentIndex
couldDo = trigger.ShouldDoSingleCompaction(info5, false, &compactTime{expireTime: 300})
assert.False(t, couldDo)
// expire time < Timestamp To, and index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex but indexFileKeys is nil
couldDo = trigger.ShouldDoSingleCompaction(info6, false, &compactTime{expireTime: 300})
assert.False(t, couldDo)
}

func Test_compactionTrigger_new(t *testing.T) {
Expand All @@ -1839,7 +1917,7 @@ func Test_compactionTrigger_new(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator, newMockHandler())
got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator, newMockHandler(), newMockVersionManager())
assert.Equal(t, tt.args.meta, got.meta)
assert.Equal(t, tt.args.compactionHandler, got.compactionHandler)
assert.Equal(t, tt.args.allocator, got.allocator)
Expand All @@ -1848,7 +1926,7 @@ func Test_compactionTrigger_new(t *testing.T) {
}

func Test_compactionTrigger_handleSignal(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler(), newMockVersionManager())
signal := &compactionSignal{
segmentID: 1,
}
Expand All @@ -1858,12 +1936,12 @@ func Test_compactionTrigger_handleSignal(t *testing.T) {
}

func Test_compactionTrigger_allocTs(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler(), newMockVersionManager())
ts, err := got.allocTs()
assert.NoError(t, err)
assert.True(t, ts > 0)

got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, &FailsAllocator{}, newMockHandler())
got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, &FailsAllocator{}, newMockHandler(), newMockVersionManager())
ts, err = got.allocTs()
assert.Error(t, err)
assert.Equal(t, uint64(0), ts)
Expand Down Expand Up @@ -1895,7 +1973,7 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) {
&Server{
meta: m,
},
})
}, newMockVersionManager())
coll := &collectionInfo{
ID: 1,
Schema: newTestSchema(),
Expand Down Expand Up @@ -1925,6 +2003,7 @@ type CompactionTriggerSuite struct {
allocator *NMockAllocator
handler *NMockHandler
compactionHandler *MockCompactionPlanContext
versionManager *MockVersionManager
}

func (s *CompactionTriggerSuite) SetupSuite() {
Expand Down Expand Up @@ -2046,11 +2125,13 @@ func (s *CompactionTriggerSuite) SetupTest() {
s.allocator = NewNMockAllocator(s.T())
s.compactionHandler = NewMockCompactionPlanContext(s.T())
s.handler = NewNMockHandler(s.T())
s.versionManager = NewMockVersionManager(s.T())
s.tr = newCompactionTrigger(
s.meta,
s.compactionHandler,
s.allocator,
s.handler,
s.versionManager,
)
s.tr.testingOnly = true
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ type indexBuilder struct {
policy buildIndexPolicy
nodeManager *IndexNodeManager
chunkManager storage.ChunkManager
indexEngineVersionManager *IndexEngineVersionManager
indexEngineVersionManager IndexEngineVersionManager
}

func newIndexBuilder(
ctx context.Context,
metaTable *meta, nodeManager *IndexNodeManager,
chunkManager storage.ChunkManager,
indexEngineVersionManager *IndexEngineVersionManager,
indexEngineVersionManager IndexEngineVersionManager,
) *indexBuilder {
ctx, cancel := context.WithCancel(ctx)

Expand Down
30 changes: 20 additions & 10 deletions internal/datacoord/index_engine_version_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,28 @@ import (
"github.com/milvus-io/milvus/pkg/log"
)

type IndexEngineVersionManager struct {
type IndexEngineVersionManager interface {
Startup(sessions map[string]*sessionutil.Session)
AddNode(session *sessionutil.Session)
RemoveNode(session *sessionutil.Session)
Update(session *sessionutil.Session)

GetCurrentIndexEngineVersion() int32
GetMinimalIndexEngineVersion() int32
}

type versionManagerImpl struct {
mu sync.Mutex
versions map[int64]sessionutil.IndexEngineVersion
}

func newIndexEngineVersionManager() *IndexEngineVersionManager {
return &IndexEngineVersionManager{
func newIndexEngineVersionManager() IndexEngineVersionManager {
return &versionManagerImpl{
versions: map[int64]sessionutil.IndexEngineVersion{},
}
}

func (m *IndexEngineVersionManager) Startup(sessions map[string]*sessionutil.Session) {
func (m *versionManagerImpl) Startup(sessions map[string]*sessionutil.Session) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -30,33 +40,33 @@ func (m *IndexEngineVersionManager) Startup(sessions map[string]*sessionutil.Ses
}
}

func (m *IndexEngineVersionManager) AddNode(session *sessionutil.Session) {
func (m *versionManagerImpl) AddNode(session *sessionutil.Session) {
m.mu.Lock()
defer m.mu.Unlock()

m.addOrUpdate(session)
}

func (m *IndexEngineVersionManager) RemoveNode(session *sessionutil.Session) {
func (m *versionManagerImpl) RemoveNode(session *sessionutil.Session) {
m.mu.Lock()
defer m.mu.Unlock()

delete(m.versions, session.ServerID)
}

func (m *IndexEngineVersionManager) Update(session *sessionutil.Session) {
func (m *versionManagerImpl) Update(session *sessionutil.Session) {
m.mu.Lock()
defer m.mu.Unlock()

m.addOrUpdate(session)
}

func (m *IndexEngineVersionManager) addOrUpdate(session *sessionutil.Session) {
func (m *versionManagerImpl) addOrUpdate(session *sessionutil.Session) {
log.Info("addOrUpdate version", zap.Int64("nodeId", session.ServerID), zap.Int32("minimal", session.IndexEngineVersion.MinimalIndexVersion), zap.Int32("current", session.IndexEngineVersion.CurrentIndexVersion))
m.versions[session.ServerID] = session.IndexEngineVersion
}

func (m *IndexEngineVersionManager) GetCurrentIndexEngineVersion() int32 {
func (m *versionManagerImpl) GetCurrentIndexEngineVersion() int32 {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -75,7 +85,7 @@ func (m *IndexEngineVersionManager) GetCurrentIndexEngineVersion() int32 {
return current
}

func (m *IndexEngineVersionManager) GetMinimalIndexEngineVersion() int32 {
func (m *versionManagerImpl) GetMinimalIndexEngineVersion() int32 {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down
Loading

0 comments on commit 1cc62a1

Please sign in to comment.