From 0d7a89a4f8f801a0a0e37e296b4fad1c54cde315 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 11 Dec 2024 16:16:42 +0800 Subject: [PATCH] fix: Use the correct RootPath when decompressing binlog in stats task (#38341) issue: #38336 Signed-off-by: Cai Zhang --- internal/datacoord/import_checker.go | 14 +++++----- internal/datacoord/import_checker_test.go | 12 ++++++--- internal/datacoord/import_util.go | 4 +-- internal/datacoord/import_util_test.go | 14 +++++++--- internal/datacoord/job_manager.go | 11 ++++---- internal/datacoord/mock_job_manager.go | 30 ++++++++++++---------- internal/datacoord/mock_segment_manager.go | 21 +++++++-------- internal/indexnode/task_stats.go | 4 +-- internal/metastore/kv/binlog/binlog.go | 22 ++++++++++++++++ 9 files changed, 85 insertions(+), 47 deletions(-) diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 5af7259da08a1..d14ea3f451196 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -277,8 +277,8 @@ func (c *importChecker) checkImportingJob(job ImportJob) { func (c *importChecker) checkStatsJob(job ImportJob) { log := log.With(zap.Int64("jobID", job.GetJobID())) - updateJobState := func(state internalpb.ImportJobState) { - err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(state)) + updateJobState := func(state internalpb.ImportJobState, reason string) { + err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(state), UpdateJobReason(reason)) if err != nil { log.Warn("failed to update job state", zap.Error(err)) return @@ -290,7 +290,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) { // Skip stats stage if not enable stats or is l0 import. if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() || importutilv2.IsL0Import(job.GetOptions()) { - updateJobState(internalpb.ImportJobState_IndexBuilding) + updateJobState(internalpb.ImportJobState_IndexBuilding, "") return } @@ -306,8 +306,8 @@ func (c *importChecker) checkStatsJob(job ImportJob) { taskCnt += len(originSegmentIDs) for i, originSegmentID := range originSegmentIDs { taskLogFields := WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i])) - state := c.sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort) - switch state { + t := c.sjm.GetStatsTask(originSegmentID, indexpb.StatsSubJob_Sort) + switch t.GetState() { case indexpb.JobState_JobStateNone: err := c.sjm.SubmitStatsTask(originSegmentID, statsSegmentIDs[i], indexpb.StatsSubJob_Sort, false) if err != nil { @@ -319,7 +319,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) { log.Debug("waiting for stats task...", taskLogFields...) case indexpb.JobState_JobStateFailed: log.Warn("import job stats failed", taskLogFields...) - updateJobState(internalpb.ImportJobState_Failed) + updateJobState(internalpb.ImportJobState_Failed, t.GetFailReason()) return case indexpb.JobState_JobStateFinished: doneCnt++ @@ -329,7 +329,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) { // All segments are stats-ed. Update job state to `IndexBuilding`. if taskCnt == doneCnt { - updateJobState(internalpb.ImportJobState_IndexBuilding) + updateJobState(internalpb.ImportJobState_IndexBuilding, "") } } diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index 702fe3896eeb7..3feea4e13fedf 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -216,16 +216,22 @@ func (s *ImportCheckerSuite) TestCheckJob() { alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil).Maybe() sjm := s.checker.sjm.(*MockStatsJobManager) sjm.EXPECT().SubmitStatsTask(mock.Anything, mock.Anything, mock.Anything, false).Return(nil) - sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateNone) + sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{ + State: indexpb.JobState_JobStateNone, + }) s.checker.checkStatsJob(job) s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) sjm = NewMockStatsJobManager(s.T()) - sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateInProgress) + sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{ + State: indexpb.JobState_JobStateInProgress, + }) s.checker.sjm = sjm s.checker.checkStatsJob(job) s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) sjm = NewMockStatsJobManager(s.T()) - sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateFinished) + sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{ + State: indexpb.JobState_JobStateFinished, + }) s.checker.sjm = sjm s.checker.checkStatsJob(job) s.Equal(internalpb.ImportJobState_IndexBuilding, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 76970b618bd54..27aca3085f3e2 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -460,8 +460,8 @@ func getStatsProgress(jobID int64, imeta ImportMeta, sjm StatsJobManager) float3 } doneCnt := 0 for _, originSegmentID := range originSegmentIDs { - state := sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort) - if state == indexpb.JobState_JobStateFinished { + t := sjm.GetStatsTask(originSegmentID, indexpb.StatsSubJob_Sort) + if t.GetState() == indexpb.JobState_JobStateFinished { doneCnt++ } } diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 53a265515a24e..ddfc21e19ef1e 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -622,11 +622,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats)) assert.NoError(t, err) sjm := NewMockStatsJobManager(t) - sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, _ indexpb.StatsSubJob) indexpb.JobState { + sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, _ indexpb.StatsSubJob) *indexpb.StatsTask { if lo.Contains([]int64{10, 11, 12}, segmentID) { - return indexpb.JobState_JobStateFinished + return &indexpb.StatsTask{ + State: indexpb.JobState_JobStateFinished, + } + } + return &indexpb.StatsTask{ + State: indexpb.JobState_JobStateInProgress, } - return indexpb.JobState_JobStateInProgress }) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm) assert.Equal(t, int64(10+30+30+10*0.5), progress) @@ -635,7 +639,9 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // stats state, len(statsSegmentIDs) / (len(originalSegmentIDs) = 1 sjm = NewMockStatsJobManager(t) - sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateFinished) + sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{ + State: indexpb.JobState_JobStateFinished, + }) progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm) assert.Equal(t, int64(10+30+30+10), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go index 9b9165616079a..9db974a9e1105 100644 --- a/internal/datacoord/job_manager.go +++ b/internal/datacoord/job_manager.go @@ -20,7 +20,7 @@ type StatsJobManager interface { Start() Stop() SubmitStatsTask(originSegmentID, targetSegmentID int64, subJobType indexpb.StatsSubJob, canRecycle bool) error - GetStatsTaskState(originSegmentID int64, subJobType indexpb.StatsSubJob) indexpb.JobState + GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error } @@ -264,11 +264,12 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6 return nil } -func (jm *statsJobManager) GetStatsTaskState(originSegmentID int64, subJobType indexpb.StatsSubJob) indexpb.JobState { - state := jm.mt.statsTaskMeta.GetStatsTaskStateBySegmentID(originSegmentID, subJobType) +func (jm *statsJobManager) GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask { + task := jm.mt.statsTaskMeta.GetStatsTaskBySegmentID(originSegmentID, subJobType) log.Info("statsJobManager get stats task state", zap.Int64("segmentID", originSegmentID), - zap.String("subJobType", subJobType.String()), zap.String("state", state.String())) - return state + zap.String("subJobType", subJobType.String()), zap.String("state", task.GetState().String()), + zap.String("failReason", task.GetFailReason())) + return task } func (jm *statsJobManager) DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error { diff --git a/internal/datacoord/mock_job_manager.go b/internal/datacoord/mock_job_manager.go index c0361da48c596..1e0eb821c770a 100644 --- a/internal/datacoord/mock_job_manager.go +++ b/internal/datacoord/mock_job_manager.go @@ -67,49 +67,51 @@ func (_c *MockStatsJobManager_DropStatsTask_Call) RunAndReturn(run func(int64, i return _c } -// GetStatsTaskState provides a mock function with given fields: originSegmentID, subJobType -func (_m *MockStatsJobManager) GetStatsTaskState(originSegmentID int64, subJobType indexpb.StatsSubJob) indexpb.JobState { +// GetStatsTask provides a mock function with given fields: originSegmentID, subJobType +func (_m *MockStatsJobManager) GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask { ret := _m.Called(originSegmentID, subJobType) if len(ret) == 0 { - panic("no return value specified for GetStatsTaskState") + panic("no return value specified for GetStatsTask") } - var r0 indexpb.JobState - if rf, ok := ret.Get(0).(func(int64, indexpb.StatsSubJob) indexpb.JobState); ok { + var r0 *indexpb.StatsTask + if rf, ok := ret.Get(0).(func(int64, indexpb.StatsSubJob) *indexpb.StatsTask); ok { r0 = rf(originSegmentID, subJobType) } else { - r0 = ret.Get(0).(indexpb.JobState) + if ret.Get(0) != nil { + r0 = ret.Get(0).(*indexpb.StatsTask) + } } return r0 } -// MockStatsJobManager_GetStatsTaskState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStatsTaskState' -type MockStatsJobManager_GetStatsTaskState_Call struct { +// MockStatsJobManager_GetStatsTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStatsTask' +type MockStatsJobManager_GetStatsTask_Call struct { *mock.Call } -// GetStatsTaskState is a helper method to define mock.On call +// GetStatsTask is a helper method to define mock.On call // - originSegmentID int64 // - subJobType indexpb.StatsSubJob -func (_e *MockStatsJobManager_Expecter) GetStatsTaskState(originSegmentID interface{}, subJobType interface{}) *MockStatsJobManager_GetStatsTaskState_Call { - return &MockStatsJobManager_GetStatsTaskState_Call{Call: _e.mock.On("GetStatsTaskState", originSegmentID, subJobType)} +func (_e *MockStatsJobManager_Expecter) GetStatsTask(originSegmentID interface{}, subJobType interface{}) *MockStatsJobManager_GetStatsTask_Call { + return &MockStatsJobManager_GetStatsTask_Call{Call: _e.mock.On("GetStatsTask", originSegmentID, subJobType)} } -func (_c *MockStatsJobManager_GetStatsTaskState_Call) Run(run func(originSegmentID int64, subJobType indexpb.StatsSubJob)) *MockStatsJobManager_GetStatsTaskState_Call { +func (_c *MockStatsJobManager_GetStatsTask_Call) Run(run func(originSegmentID int64, subJobType indexpb.StatsSubJob)) *MockStatsJobManager_GetStatsTask_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(int64), args[1].(indexpb.StatsSubJob)) }) return _c } -func (_c *MockStatsJobManager_GetStatsTaskState_Call) Return(_a0 indexpb.JobState) *MockStatsJobManager_GetStatsTaskState_Call { +func (_c *MockStatsJobManager_GetStatsTask_Call) Return(_a0 *indexpb.StatsTask) *MockStatsJobManager_GetStatsTask_Call { _c.Call.Return(_a0) return _c } -func (_c *MockStatsJobManager_GetStatsTaskState_Call) RunAndReturn(run func(int64, indexpb.StatsSubJob) indexpb.JobState) *MockStatsJobManager_GetStatsTaskState_Call { +func (_c *MockStatsJobManager_GetStatsTask_Call) RunAndReturn(run func(int64, indexpb.StatsSubJob) *indexpb.StatsTask) *MockStatsJobManager_GetStatsTask_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/mock_segment_manager.go b/internal/datacoord/mock_segment_manager.go index f3c2f6db6d36f..22d6bec734d73 100644 --- a/internal/datacoord/mock_segment_manager.go +++ b/internal/datacoord/mock_segment_manager.go @@ -213,17 +213,17 @@ func (_c *MockManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context. return _c } -// ExpireAllocations provides a mock function with given fields: channel, ts -func (_m *MockManager) ExpireAllocations(channel string, ts uint64) error { - ret := _m.Called(channel, ts) +// ExpireAllocations provides a mock function with given fields: ctx, channel, ts +func (_m *MockManager) ExpireAllocations(ctx context.Context, channel string, ts uint64) error { + ret := _m.Called(ctx, channel, ts) if len(ret) == 0 { panic("no return value specified for ExpireAllocations") } var r0 error - if rf, ok := ret.Get(0).(func(string, uint64) error); ok { - r0 = rf(channel, ts) + if rf, ok := ret.Get(0).(func(context.Context, string, uint64) error); ok { + r0 = rf(ctx, channel, ts) } else { r0 = ret.Error(0) } @@ -237,15 +237,16 @@ type MockManager_ExpireAllocations_Call struct { } // ExpireAllocations is a helper method to define mock.On call +// - ctx context.Context // - channel string // - ts uint64 -func (_e *MockManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call { - return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)} +func (_e *MockManager_Expecter) ExpireAllocations(ctx interface{}, channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call { + return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", ctx, channel, ts)} } -func (_c *MockManager_ExpireAllocations_Call) Run(run func(channel string, ts uint64)) *MockManager_ExpireAllocations_Call { +func (_c *MockManager_ExpireAllocations_Call) Run(run func(ctx context.Context, channel string, ts uint64)) *MockManager_ExpireAllocations_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(uint64)) + run(args[0].(context.Context), args[1].(string), args[2].(uint64)) }) return _c } @@ -255,7 +256,7 @@ func (_c *MockManager_ExpireAllocations_Call) Return(_a0 error) *MockManager_Exp return _c } -func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(string, uint64) error) *MockManager_ExpireAllocations_Call { +func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(context.Context, string, uint64) error) *MockManager_ExpireAllocations_Call { _c.Call.Return(run) return _c } diff --git a/internal/indexnode/task_stats.go b/internal/indexnode/task_stats.go index ee5b35ee741a7..4b2fd36fdaf5e 100644 --- a/internal/indexnode/task_stats.go +++ b/internal/indexnode/task_stats.go @@ -125,13 +125,13 @@ func (st *statsTask) PreExecute(ctx context.Context) error { zap.Int64("segmentID", st.req.GetSegmentID()), ) - if err := binlog.DecompressBinLog(storage.InsertBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(), + if err := binlog.DecompressBinLogWithRootPath(st.req.GetStorageConfig().GetRootPath(), storage.InsertBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(), st.req.GetSegmentID(), st.req.GetInsertLogs()); err != nil { log.Warn("Decompress insert binlog error", zap.Error(err)) return err } - if err := binlog.DecompressBinLog(storage.DeleteBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(), + if err := binlog.DecompressBinLogWithRootPath(st.req.GetStorageConfig().GetRootPath(), storage.DeleteBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(), st.req.GetSegmentID(), st.req.GetDeltaLogs()); err != nil { log.Warn("Decompress delta binlog error", zap.Error(err)) return err diff --git a/internal/metastore/kv/binlog/binlog.go b/internal/metastore/kv/binlog/binlog.go index e599dc40d5bcc..126d0a243c66f 100644 --- a/internal/metastore/kv/binlog/binlog.go +++ b/internal/metastore/kv/binlog/binlog.go @@ -167,12 +167,34 @@ func DecompressBinLog(binlogType storage.BinlogType, collectionID, partitionID, return nil } +func DecompressBinLogWithRootPath(rootPath string, binlogType storage.BinlogType, collectionID, partitionID, + segmentID typeutil.UniqueID, fieldBinlogs []*datapb.FieldBinlog, +) error { + for _, fieldBinlog := range fieldBinlogs { + for _, binlog := range fieldBinlog.Binlogs { + if binlog.GetLogPath() == "" { + path, err := BuildLogPathWithRootPath(rootPath, binlogType, collectionID, partitionID, + segmentID, fieldBinlog.GetFieldID(), binlog.GetLogID()) + if err != nil { + return err + } + binlog.LogPath = path + } + } + } + return nil +} + // build a binlog path on the storage by metadata func BuildLogPath(binlogType storage.BinlogType, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) (string, error) { chunkManagerRootPath := paramtable.Get().MinioCfg.RootPath.GetValue() if paramtable.Get().CommonCfg.StorageType.GetValue() == "local" { chunkManagerRootPath = paramtable.Get().LocalStorageCfg.Path.GetValue() } + return BuildLogPathWithRootPath(chunkManagerRootPath, binlogType, collectionID, partitionID, segmentID, fieldID, logID) +} + +func BuildLogPathWithRootPath(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) (string, error) { switch binlogType { case storage.InsertBinlog: return metautil.BuildInsertLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, fieldID, logID), nil