diff --git a/Makefile b/Makefile index 5c8234248ea32..4e0f8b575032e 100644 --- a/Makefile +++ b/Makefile @@ -426,6 +426,7 @@ generate-mockery-datacoord: getdeps $(INSTALL_PATH)/mockery --name=Handler --dir=internal/datacoord --filename=mock_handler.go --output=internal/datacoord --structname=NMockHandler --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=allocator --dir=internal/datacoord --filename=mock_allocator_test.go --output=internal/datacoord --structname=NMockAllocator --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=RWChannelStore --dir=internal/datacoord --filename=mock_channel_store.go --output=internal/datacoord --structname=MockRWChannelStore --with-expecter --inpackage + $(INSTALL_PATH)/mockery --name=IndexEngineVersionManager --dir=internal/datacoord --filename=mock_index_engine_version_manager.go --output=internal/datacoord --structname=MockVersionManager --with-expecter --inpackage generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 781cbdba9d27b..0435d5bc41fbb 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -72,8 +72,9 @@ type compactionTrigger struct { forceMu sync.Mutex quit chan struct{} wg sync.WaitGroup - // segRefer *SegmentReferenceManager - // indexCoord types.IndexCoord + + indexEngineVersionManager IndexEngineVersionManager + estimateNonDiskSegmentPolicy calUpperLimitPolicy estimateDiskSegmentPolicy calUpperLimitPolicy // A sloopy hack, so we can test with different segment row count without worrying that @@ -85,17 +86,15 @@ func newCompactionTrigger( meta *meta, compactionHandler compactionPlanContext, allocator allocator, - // segRefer *SegmentReferenceManager, - // indexCoord types.IndexCoord, handler Handler, + indexVersionManager IndexEngineVersionManager, ) *compactionTrigger { return &compactionTrigger{ - meta: meta, - allocator: allocator, - signals: make(chan *compactionSignal, 100), - compactionHandler: compactionHandler, - // segRefer: segRefer, - // indexCoord: indexCoord, + meta: meta, + allocator: allocator, + signals: make(chan *compactionSignal, 100), + compactionHandler: compactionHandler, + indexEngineVersionManager: indexVersionManager, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, handler: handler, @@ -908,6 +907,17 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis return true } + // index version of segment lower than current version, trigger compaction + for _, index := range segment.segmentIndexes { + if index.CurrentIndexVersion < t.indexEngineVersionManager.GetCurrentIndexEngineVersion() { + log.Info("index version is too old, trigger compaction", + zap.Int64("segmentID", segment.ID), + zap.Int32("currentIndexVersion", index.CurrentIndexVersion), + zap.Int32("currentEngineVersion", t.indexEngineVersionManager.GetCurrentIndexEngineVersion())) + return true + } + } + return false } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index c815b76be58a9..27cf3e0840b84 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -73,6 +73,10 @@ func (h *spyCompactionHandler) start() {} func (h *spyCompactionHandler) stop() {} +func newMockVersionManager() IndexEngineVersionManager { + return &versionManagerImpl{} +} + var _ compactionPlanContext = (*spyCompactionHandler)(nil) func Test_compactionTrigger_force(t *testing.T) { @@ -1294,6 +1298,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { signals: tt.fields.signals, compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, + indexEngineVersionManager: newMockVersionManager(), estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, testingOnly: true, @@ -1471,6 +1476,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { signals: tt.fields.signals, compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, + indexEngineVersionManager: newMockVersionManager(), estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, testingOnly: true, @@ -1629,13 +1635,14 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: tt.fields.signals, - compactionHandler: tt.fields.compactionHandler, - globalTrigger: tt.fields.globalTrigger, - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: tt.fields.signals, + compactionHandler: tt.fields.compactionHandler, + globalTrigger: tt.fields.globalTrigger, + indexEngineVersionManager: newMockVersionManager(), + testingOnly: true, } tr.start() defer tr.stop() @@ -1678,7 +1685,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { // Test shouldDoSingleCompaction func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { - trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler()) + trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler(), newIndexEngineVersionManager()) // Test too many deltalogs. var binlogs []*datapb.FieldBinlog @@ -1816,6 +1823,53 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { // deltalog is large enough, should do compaction couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{}) assert.True(t, couldDo) + + mockVersionManager := NewMockVersionManager(t) + mockVersionManager.On("GetCurrentIndexEngineVersion", mock.Anything).Return(int32(2), nil) + trigger.indexEngineVersionManager = mockVersionManager + info4 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 600, + NumOfRows: 10000, + MaxRowNum: 300, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Binlogs: binlogs2, + }, + segmentIndexes: map[UniqueID]*model.SegmentIndex{ + 101: { + CurrentIndexVersion: 1, + }, + }, + } + info5 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 600, + NumOfRows: 10000, + MaxRowNum: 300, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Binlogs: binlogs2, + }, + segmentIndexes: map[UniqueID]*model.SegmentIndex{ + 101: { + CurrentIndexVersion: 2, + }, + }, + } + + // expire time < Timestamp To, but index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex + couldDo = trigger.ShouldDoSingleCompaction(info4, false, &compactTime{expireTime: 300}) + assert.True(t, couldDo) + // expire time < Timestamp To, and index engine version is 2 which is equal CurrentIndexVersion in segmentIndex + couldDo = trigger.ShouldDoSingleCompaction(info5, false, &compactTime{expireTime: 300}) + assert.False(t, couldDo) } func Test_compactionTrigger_new(t *testing.T) { @@ -1839,7 +1893,7 @@ func Test_compactionTrigger_new(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator, newMockHandler()) + got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator, newMockHandler(), newMockVersionManager()) assert.Equal(t, tt.args.meta, got.meta) assert.Equal(t, tt.args.compactionHandler, got.compactionHandler) assert.Equal(t, tt.args.allocator, got.allocator) @@ -1848,7 +1902,7 @@ func Test_compactionTrigger_new(t *testing.T) { } func Test_compactionTrigger_handleSignal(t *testing.T) { - got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler()) + got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager()) signal := &compactionSignal{ segmentID: 1, } @@ -1858,12 +1912,12 @@ func Test_compactionTrigger_handleSignal(t *testing.T) { } func Test_compactionTrigger_allocTs(t *testing.T) { - got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler()) + got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager()) ts, err := got.allocTs() assert.NoError(t, err) assert.True(t, ts > 0) - got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, &FailsAllocator{}, newMockHandler()) + got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, &FailsAllocator{}, newMockHandler(), newMockVersionManager()) ts, err = got.allocTs() assert.Error(t, err) assert.Equal(t, uint64(0), ts) @@ -1895,7 +1949,7 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) { &Server{ meta: m, }, - }) + }, newMockVersionManager()) coll := &collectionInfo{ ID: 1, Schema: newTestSchema(), @@ -1925,6 +1979,7 @@ type CompactionTriggerSuite struct { allocator *NMockAllocator handler *NMockHandler compactionHandler *MockCompactionPlanContext + versionManager *MockVersionManager } func (s *CompactionTriggerSuite) SetupSuite() { @@ -2046,11 +2101,13 @@ func (s *CompactionTriggerSuite) SetupTest() { s.allocator = NewNMockAllocator(s.T()) s.compactionHandler = NewMockCompactionPlanContext(s.T()) s.handler = NewNMockHandler(s.T()) + s.versionManager = NewMockVersionManager(s.T()) s.tr = newCompactionTrigger( s.meta, s.compactionHandler, s.allocator, s.handler, + s.versionManager, ) s.tr.testingOnly = true } diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index 99d2feab2e65a..4991d8f9455ca 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -80,14 +80,14 @@ type indexBuilder struct { policy buildIndexPolicy nodeManager *IndexNodeManager chunkManager storage.ChunkManager - indexEngineVersionManager *IndexEngineVersionManager + indexEngineVersionManager IndexEngineVersionManager } func newIndexBuilder( ctx context.Context, metaTable *meta, nodeManager *IndexNodeManager, chunkManager storage.ChunkManager, - indexEngineVersionManager *IndexEngineVersionManager, + indexEngineVersionManager IndexEngineVersionManager, ) *indexBuilder { ctx, cancel := context.WithCancel(ctx) diff --git a/internal/datacoord/index_engine_version_manager.go b/internal/datacoord/index_engine_version_manager.go index 82944705cdc97..3c5d4d25ae115 100644 --- a/internal/datacoord/index_engine_version_manager.go +++ b/internal/datacoord/index_engine_version_manager.go @@ -10,18 +10,28 @@ import ( "github.com/milvus-io/milvus/pkg/log" ) -type IndexEngineVersionManager struct { +type IndexEngineVersionManager interface { + Startup(sessions map[string]*sessionutil.Session) + AddNode(session *sessionutil.Session) + RemoveNode(session *sessionutil.Session) + Update(session *sessionutil.Session) + + GetCurrentIndexEngineVersion() int32 + GetMinimalIndexEngineVersion() int32 +} + +type versionManagerImpl struct { mu sync.Mutex versions map[int64]sessionutil.IndexEngineVersion } -func newIndexEngineVersionManager() *IndexEngineVersionManager { - return &IndexEngineVersionManager{ +func newIndexEngineVersionManager() IndexEngineVersionManager { + return &versionManagerImpl{ versions: map[int64]sessionutil.IndexEngineVersion{}, } } -func (m *IndexEngineVersionManager) Startup(sessions map[string]*sessionutil.Session) { +func (m *versionManagerImpl) Startup(sessions map[string]*sessionutil.Session) { m.mu.Lock() defer m.mu.Unlock() @@ -30,33 +40,33 @@ func (m *IndexEngineVersionManager) Startup(sessions map[string]*sessionutil.Ses } } -func (m *IndexEngineVersionManager) AddNode(session *sessionutil.Session) { +func (m *versionManagerImpl) AddNode(session *sessionutil.Session) { m.mu.Lock() defer m.mu.Unlock() m.addOrUpdate(session) } -func (m *IndexEngineVersionManager) RemoveNode(session *sessionutil.Session) { +func (m *versionManagerImpl) RemoveNode(session *sessionutil.Session) { m.mu.Lock() defer m.mu.Unlock() delete(m.versions, session.ServerID) } -func (m *IndexEngineVersionManager) Update(session *sessionutil.Session) { +func (m *versionManagerImpl) Update(session *sessionutil.Session) { m.mu.Lock() defer m.mu.Unlock() m.addOrUpdate(session) } -func (m *IndexEngineVersionManager) addOrUpdate(session *sessionutil.Session) { +func (m *versionManagerImpl) addOrUpdate(session *sessionutil.Session) { log.Info("addOrUpdate version", zap.Int64("nodeId", session.ServerID), zap.Int32("minimal", session.IndexEngineVersion.MinimalIndexVersion), zap.Int32("current", session.IndexEngineVersion.CurrentIndexVersion)) m.versions[session.ServerID] = session.IndexEngineVersion } -func (m *IndexEngineVersionManager) GetCurrentIndexEngineVersion() int32 { +func (m *versionManagerImpl) GetCurrentIndexEngineVersion() int32 { m.mu.Lock() defer m.mu.Unlock() @@ -75,7 +85,7 @@ func (m *IndexEngineVersionManager) GetCurrentIndexEngineVersion() int32 { return current } -func (m *IndexEngineVersionManager) GetMinimalIndexEngineVersion() int32 { +func (m *versionManagerImpl) GetMinimalIndexEngineVersion() int32 { m.mu.Lock() defer m.mu.Unlock() diff --git a/internal/datacoord/mock_index_engine_version_manager.go b/internal/datacoord/mock_index_engine_version_manager.go new file mode 100644 index 0000000000000..c1da3ed7c4054 --- /dev/null +++ b/internal/datacoord/mock_index_engine_version_manager.go @@ -0,0 +1,249 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package datacoord + +import ( + sessionutil "github.com/milvus-io/milvus/internal/util/sessionutil" + mock "github.com/stretchr/testify/mock" +) + +// MockVersionManager is an autogenerated mock type for the IndexEngineVersionManager type +type MockVersionManager struct { + mock.Mock +} + +type MockVersionManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockVersionManager) EXPECT() *MockVersionManager_Expecter { + return &MockVersionManager_Expecter{mock: &_m.Mock} +} + +// AddNode provides a mock function with given fields: session +func (_m *MockVersionManager) AddNode(session *sessionutil.Session) { + _m.Called(session) +} + +// MockVersionManager_AddNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddNode' +type MockVersionManager_AddNode_Call struct { + *mock.Call +} + +// AddNode is a helper method to define mock.On call +// - session *sessionutil.Session +func (_e *MockVersionManager_Expecter) AddNode(session interface{}) *MockVersionManager_AddNode_Call { + return &MockVersionManager_AddNode_Call{Call: _e.mock.On("AddNode", session)} +} + +func (_c *MockVersionManager_AddNode_Call) Run(run func(session *sessionutil.Session)) *MockVersionManager_AddNode_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*sessionutil.Session)) + }) + return _c +} + +func (_c *MockVersionManager_AddNode_Call) Return() *MockVersionManager_AddNode_Call { + _c.Call.Return() + return _c +} + +func (_c *MockVersionManager_AddNode_Call) RunAndReturn(run func(*sessionutil.Session)) *MockVersionManager_AddNode_Call { + _c.Call.Return(run) + return _c +} + +// GetCurrentIndexEngineVersion provides a mock function with given fields: +func (_m *MockVersionManager) GetCurrentIndexEngineVersion() int32 { + ret := _m.Called() + + var r0 int32 + if rf, ok := ret.Get(0).(func() int32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int32) + } + + return r0 +} + +// MockVersionManager_GetCurrentIndexEngineVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCurrentIndexEngineVersion' +type MockVersionManager_GetCurrentIndexEngineVersion_Call struct { + *mock.Call +} + +// GetCurrentIndexEngineVersion is a helper method to define mock.On call +func (_e *MockVersionManager_Expecter) GetCurrentIndexEngineVersion() *MockVersionManager_GetCurrentIndexEngineVersion_Call { + return &MockVersionManager_GetCurrentIndexEngineVersion_Call{Call: _e.mock.On("GetCurrentIndexEngineVersion")} +} + +func (_c *MockVersionManager_GetCurrentIndexEngineVersion_Call) Run(run func()) *MockVersionManager_GetCurrentIndexEngineVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockVersionManager_GetCurrentIndexEngineVersion_Call) Return(_a0 int32) *MockVersionManager_GetCurrentIndexEngineVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockVersionManager_GetCurrentIndexEngineVersion_Call) RunAndReturn(run func() int32) *MockVersionManager_GetCurrentIndexEngineVersion_Call { + _c.Call.Return(run) + return _c +} + +// GetMinimalIndexEngineVersion provides a mock function with given fields: +func (_m *MockVersionManager) GetMinimalIndexEngineVersion() int32 { + ret := _m.Called() + + var r0 int32 + if rf, ok := ret.Get(0).(func() int32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int32) + } + + return r0 +} + +// MockVersionManager_GetMinimalIndexEngineVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMinimalIndexEngineVersion' +type MockVersionManager_GetMinimalIndexEngineVersion_Call struct { + *mock.Call +} + +// GetMinimalIndexEngineVersion is a helper method to define mock.On call +func (_e *MockVersionManager_Expecter) GetMinimalIndexEngineVersion() *MockVersionManager_GetMinimalIndexEngineVersion_Call { + return &MockVersionManager_GetMinimalIndexEngineVersion_Call{Call: _e.mock.On("GetMinimalIndexEngineVersion")} +} + +func (_c *MockVersionManager_GetMinimalIndexEngineVersion_Call) Run(run func()) *MockVersionManager_GetMinimalIndexEngineVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockVersionManager_GetMinimalIndexEngineVersion_Call) Return(_a0 int32) *MockVersionManager_GetMinimalIndexEngineVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockVersionManager_GetMinimalIndexEngineVersion_Call) RunAndReturn(run func() int32) *MockVersionManager_GetMinimalIndexEngineVersion_Call { + _c.Call.Return(run) + return _c +} + +// RemoveNode provides a mock function with given fields: session +func (_m *MockVersionManager) RemoveNode(session *sessionutil.Session) { + _m.Called(session) +} + +// MockVersionManager_RemoveNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveNode' +type MockVersionManager_RemoveNode_Call struct { + *mock.Call +} + +// RemoveNode is a helper method to define mock.On call +// - session *sessionutil.Session +func (_e *MockVersionManager_Expecter) RemoveNode(session interface{}) *MockVersionManager_RemoveNode_Call { + return &MockVersionManager_RemoveNode_Call{Call: _e.mock.On("RemoveNode", session)} +} + +func (_c *MockVersionManager_RemoveNode_Call) Run(run func(session *sessionutil.Session)) *MockVersionManager_RemoveNode_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*sessionutil.Session)) + }) + return _c +} + +func (_c *MockVersionManager_RemoveNode_Call) Return() *MockVersionManager_RemoveNode_Call { + _c.Call.Return() + return _c +} + +func (_c *MockVersionManager_RemoveNode_Call) RunAndReturn(run func(*sessionutil.Session)) *MockVersionManager_RemoveNode_Call { + _c.Call.Return(run) + return _c +} + +// Startup provides a mock function with given fields: sessions +func (_m *MockVersionManager) Startup(sessions map[string]*sessionutil.Session) { + _m.Called(sessions) +} + +// MockVersionManager_Startup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Startup' +type MockVersionManager_Startup_Call struct { + *mock.Call +} + +// Startup is a helper method to define mock.On call +// - sessions map[string]*sessionutil.Session +func (_e *MockVersionManager_Expecter) Startup(sessions interface{}) *MockVersionManager_Startup_Call { + return &MockVersionManager_Startup_Call{Call: _e.mock.On("Startup", sessions)} +} + +func (_c *MockVersionManager_Startup_Call) Run(run func(sessions map[string]*sessionutil.Session)) *MockVersionManager_Startup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]*sessionutil.Session)) + }) + return _c +} + +func (_c *MockVersionManager_Startup_Call) Return() *MockVersionManager_Startup_Call { + _c.Call.Return() + return _c +} + +func (_c *MockVersionManager_Startup_Call) RunAndReturn(run func(map[string]*sessionutil.Session)) *MockVersionManager_Startup_Call { + _c.Call.Return(run) + return _c +} + +// Update provides a mock function with given fields: session +func (_m *MockVersionManager) Update(session *sessionutil.Session) { + _m.Called(session) +} + +// MockVersionManager_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update' +type MockVersionManager_Update_Call struct { + *mock.Call +} + +// Update is a helper method to define mock.On call +// - session *sessionutil.Session +func (_e *MockVersionManager_Expecter) Update(session interface{}) *MockVersionManager_Update_Call { + return &MockVersionManager_Update_Call{Call: _e.mock.On("Update", session)} +} + +func (_c *MockVersionManager_Update_Call) Run(run func(session *sessionutil.Session)) *MockVersionManager_Update_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*sessionutil.Session)) + }) + return _c +} + +func (_c *MockVersionManager_Update_Call) Return() *MockVersionManager_Update_Call { + _c.Call.Return() + return _c +} + +func (_c *MockVersionManager_Update_Call) RunAndReturn(run func(*sessionutil.Session)) *MockVersionManager_Update_Call { + _c.Call.Return(run) + return _c +} + +// NewMockVersionManager creates a new instance of MockVersionManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockVersionManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockVersionManager { + mock := &MockVersionManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 24f7f194065f1..0ddf8f728418b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -149,7 +149,7 @@ type Server struct { // segReferManager *SegmentReferenceManager indexBuilder *indexBuilder indexNodeManager *IndexNodeManager - indexEngineVersionManager *IndexEngineVersionManager + indexEngineVersionManager IndexEngineVersionManager // manage ways that data coord access other coord broker Broker @@ -454,7 +454,7 @@ func (s *Server) stopCompactionHandler() { } func (s *Server) createCompactionTrigger() { - s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler) + s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) } func (s *Server) stopCompactionTrigger() { diff --git a/internal/mocks/mock_proxy.go b/internal/mocks/mock_proxy.go index 706b2fe25c7c7..22715c2f1c4c7 100644 --- a/internal/mocks/mock_proxy.go +++ b/internal/mocks/mock_proxy.go @@ -4445,17 +4445,17 @@ func (_c *MockProxy_RenameCollection_Call) RunAndReturn(run func(context.Context return _c } -// ReplicateMessage provides a mock function with given fields: ctx, req -func (_m *MockProxy) ReplicateMessage(ctx context.Context, req *milvuspb.ReplicateMessageRequest) (*milvuspb.ReplicateMessageResponse, error) { - ret := _m.Called(ctx, req) +// ReplicateMessage provides a mock function with given fields: _a0, _a1 +func (_m *MockProxy) ReplicateMessage(_a0 context.Context, _a1 *milvuspb.ReplicateMessageRequest) (*milvuspb.ReplicateMessageResponse, error) { + ret := _m.Called(_a0, _a1) var r0 *milvuspb.ReplicateMessageResponse var r1 error if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.ReplicateMessageRequest) (*milvuspb.ReplicateMessageResponse, error)); ok { - return rf(ctx, req) + return rf(_a0, _a1) } if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.ReplicateMessageRequest) *milvuspb.ReplicateMessageResponse); ok { - r0 = rf(ctx, req) + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*milvuspb.ReplicateMessageResponse) @@ -4463,7 +4463,7 @@ func (_m *MockProxy) ReplicateMessage(ctx context.Context, req *milvuspb.Replica } if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.ReplicateMessageRequest) error); ok { - r1 = rf(ctx, req) + r1 = rf(_a0, _a1) } else { r1 = ret.Error(1) } @@ -4477,13 +4477,13 @@ type MockProxy_ReplicateMessage_Call struct { } // ReplicateMessage is a helper method to define mock.On call -// - ctx context.Context -// - req *milvuspb.ReplicateMessageRequest -func (_e *MockProxy_Expecter) ReplicateMessage(ctx interface{}, req interface{}) *MockProxy_ReplicateMessage_Call { - return &MockProxy_ReplicateMessage_Call{Call: _e.mock.On("ReplicateMessage", ctx, req)} +// - _a0 context.Context +// - _a1 *milvuspb.ReplicateMessageRequest +func (_e *MockProxy_Expecter) ReplicateMessage(_a0 interface{}, _a1 interface{}) *MockProxy_ReplicateMessage_Call { + return &MockProxy_ReplicateMessage_Call{Call: _e.mock.On("ReplicateMessage", _a0, _a1)} } -func (_c *MockProxy_ReplicateMessage_Call) Run(run func(ctx context.Context, req *milvuspb.ReplicateMessageRequest)) *MockProxy_ReplicateMessage_Call { +func (_c *MockProxy_ReplicateMessage_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.ReplicateMessageRequest)) *MockProxy_ReplicateMessage_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(*milvuspb.ReplicateMessageRequest)) })