From f1b6ccf305b4dcceaa80d496001a6c808ae80dda Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 2 Jan 2024 18:08:49 +0800 Subject: [PATCH] enhance: compaction use ChannelManager interface (#29530) Rewrite compaction_test.go See also: #29447 Signed-off-by: yangxuan --- internal/datacoord/compaction.go | 4 +- internal/datacoord/compaction_test.go | 1002 ++++++------------------- 2 files changed, 239 insertions(+), 767 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 6605231b392f9..5ceb36c3a70bc 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -111,7 +111,7 @@ type compactionPlanHandler struct { meta CompactionMeta allocator allocator - chManager *ChannelManagerImpl + chManager ChannelManager scheduler Scheduler sessions SessionManager @@ -120,7 +120,7 @@ type compactionPlanHandler struct { stopWg sync.WaitGroup } -func newCompactionPlanHandler(sessions SessionManager, cm *ChannelManagerImpl, meta CompactionMeta, allocator allocator, +func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta CompactionMeta, allocator allocator, ) *compactionPlanHandler { return &compactionPlanHandler{ plans: make(map[int64]*compactionTask), diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 4b14f8696f5c3..7dae970d8d8a6 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -17,26 +17,17 @@ package datacoord import ( - "context" - "sync" "testing" "time" "github.com/cockroachdb/errors" "github.com/samber/lo" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - mockkv "github.com/milvus-io/milvus/internal/kv/mocks" - "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" - "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/metautil" - "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -48,15 +39,19 @@ func TestCompactionPlanHandlerSuite(t *testing.T) { type CompactionPlanHandlerSuite struct { suite.Suite - mockMeta *MockCompactionMeta - mockAlloc *NMockAllocator - mockSch *MockScheduler + mockMeta *MockCompactionMeta + mockAlloc *NMockAllocator + mockSch *MockScheduler + mockCm *MockChannelManager + mockSession *MockSessionManager } func (s *CompactionPlanHandlerSuite) SetupTest() { s.mockMeta = NewMockCompactionMeta(s.T()) s.mockAlloc = NewNMockAllocator(s.T()) s.mockSch = NewMockScheduler(s.T()) + s.mockCm = NewMockChannelManager(s.T()) + s.mockSession = NewMockSessionManager(s.T()) } func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() { @@ -81,34 +76,21 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() { } func (s *CompactionPlanHandlerSuite) TestCheckResult() { - session := &SessionManagerImpl{ - sessions: struct { - sync.RWMutex - data map[int64]*Session - }{ - data: map[int64]*Session{ - 2: {client: &mockDataNodeClient{ - compactionStateResp: &datapb.CompactionStateResponse{ - Results: []*datapb.CompactionPlanResult{ - {PlanID: 1, State: commonpb.CompactionState_Executing}, - {PlanID: 3, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 3}}}, - {PlanID: 4, State: commonpb.CompactionState_Executing}, - {PlanID: 6, State: commonpb.CompactionState_Executing}, - }, - }, - }}, - }, - }, - } + s.mockSession.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{ + 1: {PlanID: 1, State: commonpb.CompactionState_Executing}, + 2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}, + 3: {PlanID: 3, State: commonpb.CompactionState_Executing}, + 4: {PlanID: 4, State: commonpb.CompactionState_Executing}, + }) { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once() - handler := newCompactionPlanHandler(session, nil, nil, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSession, nil, nil, s.mockAlloc) handler.checkResult() } { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once() - handler := newCompactionPlanHandler(session, nil, nil, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSession, nil, nil, s.mockAlloc) handler.checkResult() } } @@ -276,360 +258,180 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() { s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs) } -func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { - type fields struct { - plans map[int64]*compactionTask - sessions SessionManager - chManager *ChannelManagerImpl - allocatorFactory func() allocator - } - type args struct { - signal *compactionSignal - plan *datapb.CompactionPlan - } +func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { + s.mockCm.EXPECT().FindWatcher(mock.Anything).RunAndReturn(func(channel string) (int64, error) { + if channel == "ch-1" { + return 0, errors.Errorf("mock error for ch-1") + } + + return 1, nil + }).Twice() + s.mockSch.EXPECT().Submit(mock.Anything).Return().Once() + tests := []struct { - name string - fields fields - args args - wantErr bool - err error + description string + channel string + hasError bool }{ - { - "test exec compaction", - fields{ - plans: map[int64]*compactionTask{}, - sessions: &SessionManagerImpl{ - sessions: struct { - sync.RWMutex - data map[int64]*Session - }{ - data: map[int64]*Session{ - 1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1)}}, - }, - }, - }, - chManager: &ChannelManagerImpl{ - store: &ChannelStore{ - channelsInfo: map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: "ch1"}}}, - }, - }, - }, - allocatorFactory: func() allocator { return newMockAllocator() }, - }, - args{ - signal: &compactionSignal{id: 100}, - plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}, - }, - false, - nil, - }, - { - "test exec compaction failed", - fields{ - plans: map[int64]*compactionTask{}, - chManager: &ChannelManagerImpl{ - store: &ChannelStore{ - channelsInfo: map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: []RWChannel{}}, - bufferID: {NodeID: bufferID, Channels: []RWChannel{}}, - }, - }, - }, - allocatorFactory: func() allocator { return newMockAllocator() }, - }, - args{ - signal: &compactionSignal{id: 100}, - plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}, - }, - true, - errChannelNotWatched, - }, + {"channel with error", "ch-1", true}, + {"channel with no error", "ch-2", false}, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - scheduler := NewCompactionScheduler() - c := &compactionPlanHandler{ - plans: tt.fields.plans, - sessions: tt.fields.sessions, - chManager: tt.fields.chManager, - allocator: tt.fields.allocatorFactory(), - scheduler: scheduler, - } - Params.Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1") - c.start() - err := c.execCompactionPlan(tt.args.signal, tt.args.plan) - require.ErrorIs(t, tt.err, err) - - task := c.getCompaction(tt.args.plan.PlanID) - if !tt.wantErr { - assert.Equal(t, tt.args.plan, task.plan) - assert.Equal(t, tt.args.signal, task.triggerInfo) - assert.Equal(t, 1, c.scheduler.GetTaskCount()) + + handler := newCompactionPlanHandler(nil, s.mockCm, s.mockMeta, s.mockAlloc) + handler.scheduler = s.mockSch + + for idx, test := range tests { + sig := &compactionSignal{id: int64(idx)} + plan := &datapb.CompactionPlan{ + PlanID: int64(idx), + } + s.Run(test.description, func() { + plan.Channel = test.channel + + err := handler.execCompactionPlan(sig, plan) + if test.hasError { + s.Error(err) } else { - assert.Eventually(t, - func() bool { - scheduler.mu.RLock() - defer scheduler.mu.RUnlock() - return c.scheduler.GetTaskCount() == 0 && len(scheduler.parallelTasks[1]) == 0 - }, - 5*time.Second, 100*time.Millisecond) + s.NoError(err) } - c.stop() }) } } -func Test_compactionPlanHandler_execWithParallels(t *testing.T) { - mockDataNode := &mocks.MockDataNodeClient{} - paramtable.Get().Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "0.001") - defer paramtable.Get().Reset(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key) - c := &compactionPlanHandler{ - plans: map[int64]*compactionTask{}, - sessions: &SessionManagerImpl{ - sessions: struct { - sync.RWMutex - data map[int64]*Session - }{ - data: map[int64]*Session{ - 1: {client: mockDataNode}, - }, - }, - }, - chManager: &ChannelManagerImpl{ - store: &ChannelStore{ - channelsInfo: map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: "ch1"}}}, - }, - }, - }, - allocator: newMockAllocator(), - scheduler: NewCompactionScheduler(), - } - - signal := &compactionSignal{id: 100} - plan1 := &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction} - plan2 := &datapb.CompactionPlan{PlanID: 2, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction} - plan3 := &datapb.CompactionPlan{PlanID: 3, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction} - - var mut sync.RWMutex - called := 0 - - mockDataNode.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything). - Run(func(ctx context.Context, req *datapb.CompactionPlan, opts ...grpc.CallOption) { - mut.Lock() - defer mut.Unlock() - called++ - }).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil).Times(2) - - err := c.execCompactionPlan(signal, plan1) - require.NoError(t, err) - err = c.execCompactionPlan(signal, plan2) - require.NoError(t, err) - err = c.execCompactionPlan(signal, plan3) - require.NoError(t, err) - - assert.Equal(t, 3, c.scheduler.GetTaskCount()) - - // parallel for the same node are 2 - c.schedule() - c.schedule() - - // wait for compaction called - assert.Eventually(t, func() bool { - mut.RLock() - defer mut.RUnlock() - return called == 2 - }, 3*time.Second, time.Millisecond*100) - - tasks := c.scheduler.Schedule() - assert.Equal(t, 0, len(tasks)) -} - -func getInsertLogPath(rootPath string, segmentID typeutil.UniqueID) string { - return metautil.BuildInsertLogPath(rootPath, 10, 100, segmentID, 1000, 10000) -} - -func getStatsLogPath(rootPath string, segmentID typeutil.UniqueID) string { - return metautil.BuildStatsLogPath(rootPath, 10, 100, segmentID, 1000, 10000) -} - -func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string { - return metautil.BuildDeltaLogPath(rootPath, 10, 100, segmentID, 10000) -} - -func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) { - mockDataNode := &mocks.MockDataNodeClient{} - call := mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything, mock.Anything). - Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest, opts ...grpc.CallOption) {}). - Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil) - - dataNodeID := UniqueID(111) - - seg1 := &datapb.SegmentInfo{ - ID: 1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}, - } - - seg2 := &datapb.SegmentInfo{ - ID: 2, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))}, - } - +func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { plan := &datapb.CompactionPlan{ PlanID: 1, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ - { - SegmentID: seg1.ID, - FieldBinlogs: seg1.GetBinlogs(), - Field2StatslogPaths: seg1.GetStatslogs(), - Deltalogs: seg1.GetDeltalogs(), - }, - { - SegmentID: seg2.ID, - FieldBinlogs: seg2.GetBinlogs(), - Field2StatslogPaths: seg2.GetStatslogs(), - Deltalogs: seg2.GetDeltalogs(), - }, - }, - Type: datapb.CompactionType_MergeCompaction, - } - - sessions := &SessionManagerImpl{ - sessions: struct { - sync.RWMutex - data map[int64]*Session - }{ - data: map[int64]*Session{ - dataNodeID: {client: mockDataNode}, - }, + {SegmentID: 1}, + {SegmentID: 2}, }, + Type: datapb.CompactionType_MixCompaction, } - task := &compactionTask{ - triggerInfo: &compactionSignal{id: 1}, - state: executing, - plan: plan, - dataNodeID: dataNodeID, - } + s.Run("illegal nil result", func() { + s.SetupTest() + handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + err := handler.handleMergeCompactionResult(nil, nil) + s.Error(err) + }) - plans := map[int64]*compactionTask{1: task} - - metakv := mockkv.NewMetaKv(t) - metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe() - metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe() - metakv.EXPECT().HasPrefix(mock.Anything).Return(false, nil).Maybe() - errMeta := &meta{ - catalog: &datacoord.Catalog{MetaKv: metakv}, - segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ - seg1.ID: {SegmentInfo: seg1}, - seg2.ID: {SegmentInfo: seg2}, + s.Run("not empty compacted to segment info", func() { + s.SetupTest() + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn( + func(segID int64) *SegmentInfo { + if segID == 3 { + return NewSegmentInfo(&datapb.SegmentInfo{ID: 3}) + } + return nil + }).Once() + s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() + + handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} + + compactionResult := &datapb.CompactionPlanResult{ + PlanID: plan.PlanID, + Segments: []*datapb.CompactionSegment{ + {SegmentID: 3, NumOfRows: 15}, }, - }, - } + } - meta := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ - seg1.ID: {SegmentInfo: seg1}, - seg2.ID: {SegmentInfo: seg2}, + err := handler.handleMergeCompactionResult(plan, compactionResult) + s.NoError(err) + }) + s.Run("prepare error", func() { + s.SetupTest() + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return( + nil, nil, nil, errors.New("mock error")).Once() + + handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} + compactionResult := &datapb.CompactionPlanResult{ + PlanID: plan.PlanID, + Segments: []*datapb.CompactionSegment{ + {SegmentID: 4, NumOfRows: 15}, }, - }, - } - - c := &compactionPlanHandler{ - plans: plans, - sessions: sessions, - meta: meta, - } - - c2 := &compactionPlanHandler{ - plans: plans, - sessions: sessions, - meta: errMeta, - } + } - compactionResult := &datapb.CompactionPlanResult{ - PlanID: 1, - Segments: []*datapb.CompactionSegment{ - { - SegmentID: 3, - NumOfRows: 15, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))}, - }, - }, - } + err := handler.handleMergeCompactionResult(plan, compactionResult) + s.Error(err) + }) - compactionResult2 := &datapb.CompactionPlanResult{ - PlanID: 1, - Segments: []*datapb.CompactionSegment{ - { - SegmentID: 3, - NumOfRows: 0, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))}, + s.Run("alter error", func() { + s.SetupTest() + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return( + []*SegmentInfo{}, + NewSegmentInfo(&datapb.SegmentInfo{ID: 100}), + &segMetricMutation{}, nil).Once() + s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything). + Return(errors.New("mock error")).Once() + + handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} + compactionResult := &datapb.CompactionPlanResult{ + PlanID: plan.PlanID, + Segments: []*datapb.CompactionSegment{ + {SegmentID: 4, NumOfRows: 15}, }, - }, - } - - has, err := meta.HasSegments([]UniqueID{1, 2}) - require.NoError(t, err) - require.True(t, has) - - has, err = meta.HasSegments([]UniqueID{3}) - require.Error(t, err) - require.False(t, has) - - err = c.handleMergeCompactionResult(plan, compactionResult) - assert.NoError(t, err) - - err = c.handleMergeCompactionResult(plan, compactionResult2) - assert.NoError(t, err) + } - err = c2.handleMergeCompactionResult(plan, compactionResult2) - assert.Error(t, err) + err := handler.handleMergeCompactionResult(plan, compactionResult) + s.Error(err) + }) - has, err = meta.HasSegments([]UniqueID{1, 2, 3}) - require.NoError(t, err) - require.True(t, has) + s.Run("sync segment error", func() { + s.SetupTest() + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return( + []*SegmentInfo{}, + NewSegmentInfo(&datapb.SegmentInfo{ID: 100}), + &segMetricMutation{}, nil).Once() + s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything). + Return(nil).Once() + s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() + + handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} + compactionResult := &datapb.CompactionPlanResult{ + PlanID: plan.PlanID, + Segments: []*datapb.CompactionSegment{ + {SegmentID: 4, NumOfRows: 15}, + }, + } - call.Unset() - mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything, mock.Anything). - Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil) - err = c.handleMergeCompactionResult(plan, compactionResult2) - assert.Error(t, err) + err := handler.handleMergeCompactionResult(plan, compactionResult) + s.Error(err) + }) } -func TestCompactionPlanHandler_completeCompaction(t *testing.T) { - t.Run("test not exists compaction task", func(t *testing.T) { - c := &compactionPlanHandler{ - plans: map[int64]*compactionTask{1: {}}, - } - err := c.completeCompaction(&datapb.CompactionPlanResult{PlanID: 2}) - assert.Error(t, err) +func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() { + s.Run("test not exists compaction task", func() { + handler := newCompactionPlanHandler(nil, nil, nil, nil) + err := handler.completeCompaction(&datapb.CompactionPlanResult{PlanID: 2}) + s.Error(err) }) - t.Run("test completed compaction task", func(t *testing.T) { + + s.Run("test completed compaction task", func() { c := &compactionPlanHandler{ plans: map[int64]*compactionTask{1: {state: completed}}, } err := c.completeCompaction(&datapb.CompactionPlanResult{PlanID: 1}) - assert.Error(t, err) + s.Error(err) }) - t.Run("test complete merge compaction task", func(t *testing.T) { - mockDataNode := &mocks.MockDataNodeClient{} - mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything, mock.Anything). - Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest, opts ...grpc.CallOption) {}). - Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil) + s.Run("test complete merge compaction task", func() { + s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() + // mock for handleMergeCompactionResult + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return( + []*SegmentInfo{}, + NewSegmentInfo(&datapb.SegmentInfo{ID: 100}), + &segMetricMutation{}, nil).Once() + s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything). + Return(nil).Once() + s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Return() dataNodeID := UniqueID(111) @@ -663,18 +465,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { Deltalogs: seg2.GetDeltalogs(), }, }, - Type: datapb.CompactionType_MergeCompaction, - } - - sessions := &SessionManagerImpl{ - sessions: struct { - sync.RWMutex - data map[int64]*Session - }{ - data: map[int64]*Session{ - dataNodeID: {client: mockDataNode}, - }, - }, + Type: datapb.CompactionType_MixCompaction, } task := &compactionTask{ @@ -686,15 +477,6 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { plans := map[int64]*compactionTask{1: task} - meta := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ - seg1.ID: {SegmentInfo: seg1}, - seg2.ID: {SegmentInfo: seg2}, - }, - }, - } compactionResult := datapb.CompactionPlanResult{ PlanID: 1, Segments: []*datapb.CompactionSegment{ @@ -708,420 +490,98 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { }, } - c := &compactionPlanHandler{ - plans: plans, - sessions: sessions, - meta: meta, - scheduler: NewCompactionScheduler(), - } + c := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + c.scheduler = s.mockSch + c.plans = plans err := c.completeCompaction(&compactionResult) - assert.NoError(t, err) - assert.Nil(t, compactionResult.GetSegments()[0].GetInsertLogs()) - assert.Nil(t, compactionResult.GetSegments()[0].GetField2StatslogPaths()) - assert.Nil(t, compactionResult.GetSegments()[0].GetDeltalogs()) + s.NoError(err) + s.Nil(compactionResult.GetSegments()[0].GetInsertLogs()) + s.Nil(compactionResult.GetSegments()[0].GetField2StatslogPaths()) + s.Nil(compactionResult.GetSegments()[0].GetDeltalogs()) }) +} - t.Run("test empty result merge compaction task", func(t *testing.T) { - mockDataNode := &mocks.MockDataNodeClient{} - mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything, mock.Anything). - Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest, opts ...grpc.CallOption) {}). - Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil) - - dataNodeID := UniqueID(111) - - seg1 := &datapb.SegmentInfo{ - ID: 1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}, - } - - seg2 := &datapb.SegmentInfo{ - ID: 2, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))}, - } - - plan := &datapb.CompactionPlan{ - PlanID: 1, - SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ - { - SegmentID: seg1.ID, - FieldBinlogs: seg1.GetBinlogs(), - Field2StatslogPaths: seg1.GetStatslogs(), - Deltalogs: seg1.GetDeltalogs(), - }, - { - SegmentID: seg2.ID, - FieldBinlogs: seg2.GetBinlogs(), - Field2StatslogPaths: seg2.GetStatslogs(), - Deltalogs: seg2.GetDeltalogs(), - }, - }, - Type: datapb.CompactionType_MergeCompaction, - } - - sessions := &SessionManagerImpl{ - sessions: struct { - sync.RWMutex - data map[int64]*Session - }{ - data: map[int64]*Session{ - dataNodeID: {client: mockDataNode}, - }, - }, - } - - task := &compactionTask{ +func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { + inPlans := map[int64]*compactionTask{ + 1: { triggerInfo: &compactionSignal{id: 1}, + plan: &datapb.CompactionPlan{PlanID: 1}, state: executing, - plan: plan, - dataNodeID: dataNodeID, - } - - plans := map[int64]*compactionTask{1: task} - - meta := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ - seg1.ID: {SegmentInfo: seg1}, - seg2.ID: {SegmentInfo: seg2}, - }, - }, - } + }, + 2: { + triggerInfo: &compactionSignal{id: 1}, + plan: &datapb.CompactionPlan{PlanID: 2}, + state: completed, + }, + 3: { + triggerInfo: &compactionSignal{id: 1}, + plan: &datapb.CompactionPlan{PlanID: 3}, + state: failed, + }, + } - meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) - meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) + expected := lo.Values(inPlans) - segments := meta.GetAllSegmentsUnsafe() - assert.Equal(t, len(segments), 2) - compactionResult := datapb.CompactionPlanResult{ - PlanID: 1, - Segments: []*datapb.CompactionSegment{ - { - SegmentID: 3, - NumOfRows: 0, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))}, - }, - }, - } + handler := &compactionPlanHandler{plans: inPlans} + got := handler.getCompactionTasksBySignalID(1) + s.ElementsMatch(expected, got) - c := &compactionPlanHandler{ - plans: plans, - sessions: sessions, - meta: meta, - scheduler: NewCompactionScheduler(), - } - - err := c.completeCompaction(&compactionResult) - assert.NoError(t, err) + task := handler.getCompaction(1) + s.NotNil(task) + s.EqualValues(1, task.plan.PlanID) - segments = meta.GetAllSegmentsUnsafe() - assert.Equal(t, len(segments), 3) + task = handler.getCompaction(19530) + s.Nil(task) +} - for _, segment := range segments { - assert.True(t, segment.State == commonpb.SegmentState_Dropped) - } +func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() { + s.mockSession.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{ + 1: {PlanID: 1, State: commonpb.CompactionState_Executing}, + 2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}, + 3: {PlanID: 3, State: commonpb.CompactionState_Executing}, }) -} -func Test_compactionPlanHandler_getCompaction(t *testing.T) { - type fields struct { - plans map[int64]*compactionTask - sessions SessionManager - } - type args struct { - planID int64 - } - tests := []struct { - name string - fields fields - args args - want *compactionTask - }{ - { - "test get non existed task", - fields{plans: map[int64]*compactionTask{}}, - args{planID: 1}, - nil, + inPlans := map[int64]*compactionTask{ + 1: { + triggerInfo: &compactionSignal{}, + plan: &datapb.CompactionPlan{PlanID: 1}, + state: executing, }, - { - "test get existed task", - fields{ - plans: map[int64]*compactionTask{1: { - state: executing, - }}, - }, - args{planID: 1}, - &compactionTask{ - state: executing, - }, + 2: { + triggerInfo: &compactionSignal{}, + plan: &datapb.CompactionPlan{PlanID: 2}, + state: executing, }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &compactionPlanHandler{ - plans: tt.fields.plans, - sessions: tt.fields.sessions, - } - got := c.getCompaction(tt.args.planID) - assert.EqualValues(t, tt.want, got) - }) - } -} - -func Test_compactionPlanHandler_updateCompaction(t *testing.T) { - type fields struct { - plans map[int64]*compactionTask - sessions SessionManager - meta *meta - } - type args struct { - ts Timestamp - } - - ts := time.Now() - tests := []struct { - name string - fields fields - args args - wantErr bool - timeout []int64 - failed []int64 - unexpired []int64 - }{ - { - "test update compaction task", - fields{ - plans: map[int64]*compactionTask{ - 1: { - state: executing, - dataNodeID: 1, - plan: &datapb.CompactionPlan{ - PlanID: 1, - StartTime: tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0), - TimeoutInSeconds: 10, - SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ - {SegmentID: 1}, - }, - }, - }, - 2: { - state: executing, - dataNodeID: 2, - plan: &datapb.CompactionPlan{ - PlanID: 2, - StartTime: tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0), - TimeoutInSeconds: 1, - }, - }, - 3: { - state: executing, - dataNodeID: 2, - plan: &datapb.CompactionPlan{ - PlanID: 3, - StartTime: tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0), - TimeoutInSeconds: 1, - }, - }, - 4: { - state: executing, - dataNodeID: 2, - plan: &datapb.CompactionPlan{ - PlanID: 4, - StartTime: tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0) - 200*1000, - TimeoutInSeconds: 1, - }, - }, - 5: { // timeout and failed - state: timeout, - dataNodeID: 2, - plan: &datapb.CompactionPlan{ - PlanID: 5, - StartTime: tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0) - 200*1000, - TimeoutInSeconds: 1, - }, - }, - 6: { // timeout and executing - state: timeout, - dataNodeID: 2, - plan: &datapb.CompactionPlan{ - PlanID: 6, - StartTime: tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0) - 200*1000, - TimeoutInSeconds: 1, - }, - }, - }, - meta: &meta{ - segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ - 1: {SegmentInfo: &datapb.SegmentInfo{ID: 1}}, - }, - }, - }, - sessions: &SessionManagerImpl{ - sessions: struct { - sync.RWMutex - data map[int64]*Session - }{ - data: map[int64]*Session{ - 2: {client: &mockDataNodeClient{ - compactionStateResp: &datapb.CompactionStateResponse{ - Results: []*datapb.CompactionPlanResult{ - {PlanID: 1, State: commonpb.CompactionState_Executing}, - {PlanID: 3, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 3}}}, - {PlanID: 4, State: commonpb.CompactionState_Executing}, - {PlanID: 6, State: commonpb.CompactionState_Executing}, - }, - }, - }}, - }, - }, - }, - }, - args{ts: tsoutil.ComposeTS(ts.Add(5*time.Second).UnixNano()/int64(time.Millisecond), 0)}, - false, - []int64{4, 6}, - []int64{2, 5}, - []int64{1, 3}, + 3: { + triggerInfo: &compactionSignal{}, + plan: &datapb.CompactionPlan{PlanID: 3}, + state: timeout, + }, + 4: { + triggerInfo: &compactionSignal{}, + plan: &datapb.CompactionPlan{PlanID: 4}, + state: timeout, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - scheduler := NewCompactionScheduler() - c := &compactionPlanHandler{ - plans: tt.fields.plans, - sessions: tt.fields.sessions, - meta: tt.fields.meta, - scheduler: scheduler, - } - err := c.updateCompaction(tt.args.ts) - assert.Equal(t, tt.wantErr, err != nil) + handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler.plans = inPlans - for _, id := range tt.timeout { - task := c.getCompaction(id) - assert.Equal(t, timeout, task.state) - } - - for _, id := range tt.failed { - task := c.getCompaction(id) - assert.Equal(t, failed, task.state) - } - - for _, id := range tt.unexpired { - task := c.getCompaction(id) - assert.NotEqual(t, failed, task.state) - } + err := handler.updateCompaction(0) + s.NoError(err) - scheduler.mu.Lock() - assert.Equal(t, 0, len(scheduler.parallelTasks[2])) - scheduler.mu.Unlock() - }) - } -} + task := handler.plans[1] + s.Equal(timeout, task.state) -func Test_newCompactionPlanHandler(t *testing.T) { - type args struct { - sessions SessionManager - cm *ChannelManagerImpl - meta *meta - allocator allocator - } - tests := []struct { - name string - args args - want *compactionPlanHandler - }{ - { - "test new handler", - args{ - &SessionManagerImpl{}, - &ChannelManagerImpl{}, - &meta{}, - newMockAllocator(), - }, - &compactionPlanHandler{ - plans: map[int64]*compactionTask{}, - sessions: &SessionManagerImpl{}, - chManager: &ChannelManagerImpl{}, - meta: &meta{}, - allocator: newMockAllocator(), - scheduler: NewCompactionScheduler(), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := newCompactionPlanHandler(tt.args.sessions, tt.args.cm, tt.args.meta, tt.args.allocator) - assert.EqualValues(t, tt.want, got) - }) - } -} + task = handler.plans[2] + s.Equal(executing, task.state) -func Test_getCompactionTasksBySignalID(t *testing.T) { - type fields struct { - plans map[int64]*compactionTask - } - type args struct { - signalID int64 - } - tests := []struct { - name string - fields fields - args args - want []*compactionTask - }{ - { - "test get compaction tasks", - fields{ - plans: map[int64]*compactionTask{ - 1: { - triggerInfo: &compactionSignal{id: 1}, - state: executing, - }, - 2: { - triggerInfo: &compactionSignal{id: 1}, - state: completed, - }, - 3: { - triggerInfo: &compactionSignal{id: 1}, - state: failed, - }, - }, - }, - args{1}, - []*compactionTask{ - { - triggerInfo: &compactionSignal{id: 1}, - state: executing, - }, - { - triggerInfo: &compactionSignal{id: 1}, - state: completed, - }, - { - triggerInfo: &compactionSignal{id: 1}, - state: failed, - }, - }, - }, - } + task = handler.plans[3] + s.Equal(timeout, task.state) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - h := &compactionPlanHandler{ - plans: tt.fields.plans, - } - got := h.getCompactionTasksBySignalID(tt.args.signalID) - assert.ElementsMatch(t, tt.want, got) - }) - } + task = handler.plans[4] + s.Equal(failed, task.state) } func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog { @@ -1145,3 +605,15 @@ func getFieldBinlogPathsWithEntry(id int64, entry int64, paths ...string) *datap } return l } + +func getInsertLogPath(rootPath string, segmentID typeutil.UniqueID) string { + return metautil.BuildInsertLogPath(rootPath, 10, 100, segmentID, 1000, 10000) +} + +func getStatsLogPath(rootPath string, segmentID typeutil.UniqueID) string { + return metautil.BuildStatsLogPath(rootPath, 10, 100, segmentID, 1000, 10000) +} + +func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string { + return metautil.BuildDeltaLogPath(rootPath, 10, 100, segmentID, 10000) +}