Skip to content

Commit

Permalink
enhance: refine the datacoord meta related interfaces (#37957)
Browse files Browse the repository at this point in the history
issue: #35917 
This PR refines the meta-related APIs in datacoord to allow the ctx to
be passed down to the catalog operation interfaces

Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy authored Nov 26, 2024
1 parent 2208b7c commit 1dbb6cd
Show file tree
Hide file tree
Showing 55 changed files with 1,342 additions and 1,102 deletions.
4 changes: 2 additions & 2 deletions internal/datacoord/analyze_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ func (m *analyzeMeta) AddAnalyzeTask(task *indexpb.AnalyzeTask) error {
return m.saveTask(task)
}

func (m *analyzeMeta) DropAnalyzeTask(taskID int64) error {
func (m *analyzeMeta) DropAnalyzeTask(ctx context.Context, taskID int64) error {
m.Lock()
defer m.Unlock()

log.Info("drop analyze task", zap.Int64("taskID", taskID))
if err := m.catalog.DropAnalyzeTask(m.ctx, taskID); err != nil {
if err := m.catalog.DropAnalyzeTask(ctx, taskID); err != nil {
log.Warn("drop analyze task by catalog failed", zap.Int64("taskID", taskID),
zap.Error(err))
return err
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/analyze_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *AnalyzeMetaSuite) Test_AnalyzeMeta() {
})

s.Run("DropAnalyzeTask", func() {
err := am.DropAnalyzeTask(7)
err := am.DropAnalyzeTask(ctx, 7)
s.NoError(err)
s.Equal(6, len(am.GetAllTasks()))
})
Expand Down Expand Up @@ -212,7 +212,7 @@ func (s *AnalyzeMetaSuite) Test_failCase() {
})

s.Run("DropAnalyzeTask", func() {
err := am.DropAnalyzeTask(1)
err := am.DropAnalyzeTask(ctx, 1)
s.Error(err)
s.NotNil(am.GetTask(1))
})
Expand Down
22 changes: 11 additions & 11 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type compactionPlanContext interface {
isFull() bool
// get compaction tasks by signal id
getCompactionTasksNumBySignalID(signalID int64) int
getCompactionInfo(signalID int64) *compactionInfo
getCompactionInfo(ctx context.Context, signalID int64) *compactionInfo
removeTasksByChannel(channel string)
}

Expand Down Expand Up @@ -96,8 +96,8 @@ type compactionPlanHandler struct {
stopWg sync.WaitGroup
}

func (c *compactionPlanHandler) getCompactionInfo(triggerID int64) *compactionInfo {
tasks := c.meta.GetCompactionTasksByTriggerID(triggerID)
func (c *compactionPlanHandler) getCompactionInfo(ctx context.Context, triggerID int64) *compactionInfo {
tasks := c.meta.GetCompactionTasksByTriggerID(ctx, triggerID)
return summaryCompactionState(tasks)
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (c *compactionPlanHandler) start() {

func (c *compactionPlanHandler) loadMeta() {
// TODO: make it compatible to all types of compaction with persist meta
triggers := c.meta.GetCompactionTasks()
triggers := c.meta.GetCompactionTasks(context.TODO())
for _, tasks := range triggers {
for _, task := range tasks {
state := task.GetState()
Expand All @@ -346,7 +346,7 @@ func (c *compactionPlanHandler) loadMeta() {
zap.Error(err),
)
// ignore the drop error
c.meta.DropCompactionTask(task)
c.meta.DropCompactionTask(context.TODO(), task)
continue
}
if t.NeedReAssignNodeID() {
Expand Down Expand Up @@ -434,14 +434,14 @@ func (c *compactionPlanHandler) Clean() {

func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
// gc clustering compaction tasks
triggers := c.meta.GetCompactionTasks()
triggers := c.meta.GetCompactionTasks(context.TODO())
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
err := c.meta.DropCompactionTask(task)
err := c.meta.DropCompactionTask(context.TODO(), task)
log.Debug("drop compaction task meta", zap.Int64("planID", task.PlanID))
if err != nil {
log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err))
Expand Down Expand Up @@ -478,7 +478,7 @@ func (c *compactionPlanHandler) cleanPartitionStats() error {
for _, info := range unusedPartStats {
log.Debug("collection has been dropped, remove partition stats",
zap.Int64("collID", info.GetCollectionID()))
if err := c.meta.CleanPartitionStatsInfo(info); err != nil {
if err := c.meta.CleanPartitionStatsInfo(context.TODO(), info); err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return err
}
Expand All @@ -492,7 +492,7 @@ func (c *compactionPlanHandler) cleanPartitionStats() error {
if len(infos) > 2 {
for i := 2; i < len(infos); i++ {
info := infos[i]
if err := c.meta.CleanPartitionStatsInfo(info); err != nil {
if err := c.meta.CleanPartitionStatsInfo(context.TODO(), info); err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return err
}
Expand Down Expand Up @@ -592,7 +592,7 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err = t.SaveTaskMeta()
if err != nil {
c.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
c.meta.SetSegmentsCompacting(context.TODO(), t.GetTaskProto().GetInputSegments(), false)
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
return err
}
Expand All @@ -614,7 +614,7 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
default:
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
}
exist, succeed := c.meta.CheckAndSetSegmentsCompacting(t.GetInputSegments())
exist, succeed := c.meta.CheckAndSetSegmentsCompacting(context.TODO(), t.GetInputSegments())
if !exist {
return nil, merr.WrapErrIllegalCompactionPlan("segment not exist")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_policy_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (policy *clusteringCompactionPolicy) checkAllL2SegmentsContains(ctx context
segment.GetLevel() == datapb.SegmentLevel_L2 &&
segment.isCompacting
}
segments := policy.meta.SelectSegments(SegmentFilterFunc(getCompactingL2Segment))
segments := policy.meta.SelectSegments(ctx, SegmentFilterFunc(getCompactingL2Segment))
if len(segments) > 0 {
log.Ctx(ctx).Info("there are some segments are compacting",
zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
Expand Down
9 changes: 6 additions & 3 deletions internal/datacoord/compaction_policy_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,14 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionAbnormal() {
}

func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNoClusteringKeySchema() {
ctx := context.Background()
coll := &collectionInfo{
ID: 100,
Schema: newTestSchema(),
}
s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(coll, nil)

s.meta.compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
s.meta.compactionTaskMeta.SaveCompactionTask(ctx, &datapb.CompactionTask{
TriggerID: 1,
PlanID: 10,
CollectionID: 100,
Expand All @@ -230,13 +231,14 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNoClusteringKe
}

func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionCompacting() {
ctx := context.Background()
coll := &collectionInfo{
ID: 100,
Schema: newTestScalarClusteringKeySchema(),
}
s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(coll, nil)

s.meta.compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
s.meta.compactionTaskMeta.SaveCompactionTask(ctx, &datapb.CompactionTask{
TriggerID: 1,
PlanID: 10,
CollectionID: 100,
Expand All @@ -250,6 +252,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionCompacting() {
}

func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting() {
ctx := context.Background()
s.Run("no collection is compacting", func() {
compacting, triggerID := s.clusteringCompactionPolicy.collectionIsClusteringCompacting(collID)
s.False(compacting)
Expand Down Expand Up @@ -280,7 +283,7 @@ func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting()
s.clusteringCompactionPolicy.meta = &meta{
compactionTaskMeta: compactionTaskMeta,
}
compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
compactionTaskMeta.SaveCompactionTask(ctx, &datapb.CompactionTask{
TriggerID: 1,
PlanID: 10,
CollectionID: collID,
Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/compaction_policy_single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (s *SingleCompactionPolicySuite) TestIsDeleteRowsTooManySegment() {
}

func (s *SingleCompactionPolicySuite) TestL2SingleCompaction() {
ctx := context.Background()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.IndexBasedCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.IndexBasedCompaction.Key)

Expand All @@ -134,7 +135,7 @@ func (s *SingleCompactionPolicySuite) TestL2SingleCompaction() {
compactionTaskMeta: compactionTaskMeta,
segments: segmentsInfo,
}
compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
compactionTaskMeta.SaveCompactionTask(ctx, &datapb.CompactionTask{
TriggerID: 1,
PlanID: 10,
CollectionID: collID,
Expand Down
20 changes: 10 additions & 10 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

for _, segID := range taskProto.GetInputSegments() {
segInfo := t.meta.GetHealthySegment(segID)
segInfo := t.meta.GetHealthySegment(context.TODO(), segID)
if segInfo == nil {
return nil, merr.WrapErrSegmentNotFound(segID)
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return segment.GetSegmentID()
})

_, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetTaskProto(), t.result)
_, metricMutation, err := t.meta.CompleteCompactionMutation(context.TODO(), t.GetTaskProto(), t.result)
if err != nil {
return err
}
Expand Down Expand Up @@ -435,7 +435,7 @@ func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}

err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {
log.Warn("markResultSegmentVisible UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("markResultSegmentVisible UpdateSegmentsInfo", err)
Expand All @@ -449,7 +449,7 @@ func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {
log.Warn("markInputSegmentsDropped UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("markInputSegmentsDropped UpdateSegmentsInfo", err)
Expand Down Expand Up @@ -519,7 +519,7 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
}

func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
t.meta.SetSegmentsCompacting(context.TODO(), t.GetTaskProto().GetInputSegments(), false)
}

func (t *clusteringCompactionTask) processFailedOrTimeout() error {
Expand All @@ -532,7 +532,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
}
isInputDropped := false
for _, segID := range t.GetTaskProto().GetInputSegments() {
if t.meta.GetHealthySegment(segID) == nil {
if t.meta.GetHealthySegment(context.TODO(), segID) == nil {
isInputDropped = true
break
}
Expand All @@ -559,7 +559,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
Expand All @@ -576,7 +576,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
// tmpSegment is always invisible
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
Expand All @@ -593,7 +593,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
Version: t.GetTaskProto().GetPlanID(),
SegmentIDs: t.GetTaskProto().GetResultSegments(),
}
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
err := t.meta.CleanPartitionStatsInfo(context.TODO(), partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
}
Expand Down Expand Up @@ -703,7 +703,7 @@ func (t *clusteringCompactionTask) checkTimeout() bool {
}

func (t *clusteringCompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
return t.meta.SaveCompactionTask(task)
return t.meta.SaveCompactionTask(context.TODO(), task)
}

func (t *clusteringCompactionTask) SaveTaskMeta() error {
Expand Down
28 changes: 14 additions & 14 deletions internal/datacoord/compaction_task_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang

task.processPipelining()

seg11 := s.meta.GetSegment(101)
seg11 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
seg21 := s.meta.GetSegment(102)
seg21 := s.meta.GetSegment(context.TODO(), 102)
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)

Expand Down Expand Up @@ -165,21 +165,21 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang

task.processFailedOrTimeout()

seg12 := s.meta.GetSegment(101)
seg12 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
s.Equal(commonpb.SegmentState_Dropped, seg12.State)

seg22 := s.meta.GetSegment(102)
seg22 := s.meta.GetSegment(context.TODO(), 102)
s.Equal(datapb.SegmentLevel_L2, seg22.Level)
s.Equal(int64(10000), seg22.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg22.State)

seg32 := s.meta.GetSegment(103)
seg32 := s.meta.GetSegment(context.TODO(), 103)
s.Equal(datapb.SegmentLevel_L1, seg32.Level)
s.Equal(int64(0), seg32.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Flushed, seg32.State)

seg42 := s.meta.GetSegment(104)
seg42 := s.meta.GetSegment(context.TODO(), 104)
s.Equal(datapb.SegmentLevel_L1, seg42.Level)
s.Equal(int64(0), seg42.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Flushed, seg42.State)
Expand Down Expand Up @@ -254,29 +254,29 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang

task.processFailedOrTimeout()

seg12 := s.meta.GetSegment(101)
seg12 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
seg22 := s.meta.GetSegment(102)
seg22 := s.meta.GetSegment(context.TODO(), 102)
s.Equal(datapb.SegmentLevel_L2, seg22.Level)
s.Equal(int64(10000), seg22.PartitionStatsVersion)

seg32 := s.meta.GetSegment(103)
seg32 := s.meta.GetSegment(context.TODO(), 103)
s.Equal(datapb.SegmentLevel_L2, seg32.Level)
s.Equal(commonpb.SegmentState_Dropped, seg32.State)
s.True(seg32.IsInvisible)

seg42 := s.meta.GetSegment(104)
seg42 := s.meta.GetSegment(context.TODO(), 104)
s.Equal(datapb.SegmentLevel_L2, seg42.Level)
s.Equal(commonpb.SegmentState_Dropped, seg42.State)
s.True(seg42.IsInvisible)

seg52 := s.meta.GetSegment(105)
seg52 := s.meta.GetSegment(context.TODO(), 105)
s.Equal(datapb.SegmentLevel_L2, seg52.Level)
s.Equal(int64(10001), seg52.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg52.State)
s.True(seg52.IsInvisible)

seg62 := s.meta.GetSegment(106)
seg62 := s.meta.GetSegment(context.TODO(), 106)
s.Equal(datapb.SegmentLevel_L2, seg62.Level)
s.Equal(int64(10001), seg62.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg62.State)
Expand Down Expand Up @@ -636,7 +636,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
}

task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11}))
err := s.meta.indexMeta.CreateIndex(index)
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
s.NoError(err)

s.False(task.Process())
Expand All @@ -650,7 +650,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
CollectionID: 1,
IndexID: 3,
}
err := s.meta.indexMeta.CreateIndex(index)
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
s.NoError(err)

s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
Expand Down
Loading

0 comments on commit 1dbb6cd

Please sign in to comment.