Skip to content

Commit

Permalink
enhance: add collection id as a parameter for list segment and channel request
Browse files Browse the repository at this point in the history

enhance: add db name in replica description

Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Jan 2, 2025
1 parent aa0a87e commit 590d8b4
Show file tree
Hide file tree
Showing 40 changed files with 314 additions and 163 deletions.
9 changes: 9 additions & 0 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,15 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap
}

func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if task state is completed, cleaned, failed, timeout, then do append end time and save
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool {
log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err))
return false
}

ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
Expand All @@ -141,15 +138,13 @@ func (t *l0CompactionTask) processExecuting() bool {
return false
}

updateOps = append(updateOps, setState(datapb.CompactionTaskState_meta_saved))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil {
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
return false
}
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
updateOps = append(updateOps, setState(datapb.CompactionTaskState_failed))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
return false
}
Expand All @@ -159,9 +154,7 @@ func (t *l0CompactionTask) processExecuting() bool {
}

func (t *l0CompactionTask) processMetaSaved() bool {
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
err := t.updateAndSaveTaskMeta(updateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err != nil {
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return false
Expand Down Expand Up @@ -358,6 +351,15 @@ func (t *l0CompactionTask) SaveTaskMeta() error {
}

func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if task state is completed, cleaned, failed, timeout, then do append end time and save
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction
Type: task.Type.String(),
State: task.State.String(),
FailReason: task.FailReason,
StartTime: typeutil.TimestampToString(uint64(task.StartTime)),
EndTime: typeutil.TimestampToString(uint64(task.EndTime)),
StartTime: typeutil.TimestampToString(uint64(task.StartTime) * 1000),
EndTime: typeutil.TimestampToString(uint64(task.EndTime) * 1000),
TotalRows: task.TotalRows,
InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string {
return strconv.FormatInt(t, 10)
Expand Down
26 changes: 14 additions & 12 deletions internal/datacoord/compaction_task_mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ func (t *mixCompactionTask) processPipelining() bool {

func (t *mixCompactionTask) processMetaSaved() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
return false
}
Expand All @@ -119,15 +117,12 @@ func (t *mixCompactionTask) processExecuting() bool {
log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
return false
}

ts := time.Now().Unix()
failedUpdateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_failed)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
if len(result.GetSegments()) == 0 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
Expand All @@ -137,7 +132,7 @@ func (t *mixCompactionTask) processExecuting() bool {
if err := t.saveSegmentMeta(); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
if errors.Is(err, merr.ErrIllegalCompactionPlan) {
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
Expand All @@ -154,7 +149,7 @@ func (t *mixCompactionTask) processExecuting() bool {
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
log.Info("mixCompactionTask fail in datanode")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
Expand Down Expand Up @@ -240,10 +235,8 @@ func (t *mixCompactionTask) processCompleted() bool {

t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("mixCompactionTask processCompleted done")

task := t.GetTaskProto()
log.Info("mixCompactionTask processCompleted done",
zap.Int64("planID", task.GetPlanID()), zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
return true
}

Expand Down Expand Up @@ -289,6 +282,15 @@ func (t *mixCompactionTask) doClean() error {
}

func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if task state is completed, cleaned, failed, timeout, then do append end time and save
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
FailReason: s.FailReason,
IndexSize: s.IndexSize,
IndexVersion: s.IndexVersion,
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime),
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000),
}
}

Expand Down
16 changes: 5 additions & 11 deletions internal/datacoord/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,17 @@ func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[s
func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
v := jsonReq.Get(metricsinfo.MetricRequestParamINKey)
if !v.Exists() {
// default to get all segments from dataanode
// default to get all segments from datanode
return s.getDataNodeSegmentsJSON(ctx, req)
}

in := v.String()
if in == "dn" {
// TODO: support filter by collection id
if in == metricsinfo.MetricsRequestParamsInDN {
return s.getDataNodeSegmentsJSON(ctx, req)
}

if in == "dc" {
v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
collectionID := int64(0)
if v.Exists() {
collectionID = v.Int()
}

if in == metricsinfo.MetricsRequestParamsInDC {
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
segments := s.meta.getSegmentsMetrics(collectionID)
for _, seg := range segments {
isIndexed, indexedFields := s.meta.indexMeta.GetSegmentIndexedFields(seg.CollectionID, seg.SegmentID)
Expand All @@ -163,7 +157,7 @@ func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRe

bs, err := json.Marshal(segments)
if err != nil {
log.Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error()))
log.Ctx(ctx).Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error()))
return "", nil
}
return string(bs), nil
Expand Down
6 changes: 1 addition & 5 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,11 +1203,7 @@ func (s *Server) registerMetricsRequest() {

s.metricsRequest.RegisterMetricsRequest(metricsinfo.IndexKey,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
v := jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
collectionID := int64(0)
if v.Exists() {
collectionID = v.Int()
}
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return s.meta.indexMeta.GetIndexJSON(collectionID), nil
})
log.Ctx(s.ctx).Info("register metrics actions finished")
Expand Down
6 changes: 4 additions & 2 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,14 @@ func (node *DataNode) registerMetricsRequest() {

node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.flowgraphManager.GetSegmentsJSON(), nil
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return node.flowgraphManager.GetSegmentsJSON(collectionID), nil
})

node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.flowgraphManager.GetChannelsJSON(), nil
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return node.flowgraphManager.GetChannelsJSON(collectionID), nil
})
log.Ctx(node.ctx).Info("register metrics actions finished")
}
Expand Down
15 changes: 11 additions & 4 deletions internal/flushcommon/pipeline/flow_graph_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type FlowgraphManager interface {
GetFlowgraphCount() int
GetCollectionIDs() []int64

GetChannelsJSON() string
GetSegmentsJSON() string
GetChannelsJSON(collectionID int64) string
GetSegmentsJSON(collectionID int64) string
Close()
}

Expand Down Expand Up @@ -121,9 +121,12 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
}

// GetChannelsJSON returns all channels in json format.
func (fm *fgManagerImpl) GetChannelsJSON() string {
func (fm *fgManagerImpl) GetChannelsJSON(collectionID int64) string {
var channels []*metricsinfo.Channel
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
if collectionID > 0 && ds.metacache.Collection() != collectionID {
return true
}
latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
channels = append(channels, &metricsinfo.Channel{
Name: ch,
Expand All @@ -143,9 +146,13 @@ func (fm *fgManagerImpl) GetChannelsJSON() string {
return string(ret)
}

func (fm *fgManagerImpl) GetSegmentsJSON() string {
func (fm *fgManagerImpl) GetSegmentsJSON(collectionID int64) string {
var segments []*metricsinfo.Segment
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
if collectionID > 0 && ds.metacache.Collection() != collectionID {
return true
}

meta := ds.metacache
for _, segment := range meta.GetSegmentsBy() {
segments = append(segments, &metricsinfo.Segment{
Expand Down
17 changes: 14 additions & 3 deletions internal/flushcommon/pipeline/flow_graph_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,14 @@ func TestGetChannelsJSON(t *testing.T) {
assert.NoError(t, err)
expectedJSON := string(expectedBytes)

jsonResult := fm.GetChannelsJSON()
jsonResult := fm.GetChannelsJSON(0)
assert.JSONEq(t, expectedJSON, jsonResult)

jsonResult = fm.GetChannelsJSON(10)
var ret []*metricsinfo.Channel
err = json.Unmarshal([]byte(jsonResult), &ret)
assert.NoError(t, err)
assert.Equal(t, 0, len(ret))
}

func TestGetSegmentJSON(t *testing.T) {
Expand Down Expand Up @@ -228,7 +234,12 @@ func TestGetSegmentJSON(t *testing.T) {
expectedJSON := string(expectedBytes)

ds.metacache.AddSegment(segment, pkStatsFactory, metacache.NoneBm25StatsFactory)
jsonResult := fm.GetSegmentsJSON()
fmt.Println(jsonResult)
jsonResult := fm.GetSegmentsJSON(0)
assert.JSONEq(t, expectedJSON, jsonResult)

jsonResult = fm.GetSegmentsJSON(10)
var ret []*metricsinfo.Segment
err = json.Unmarshal([]byte(jsonResult), &ret)
assert.NoError(t, err)
assert.Equal(t, 0, len(ret))
}
Loading

0 comments on commit 590d8b4

Please sign in to comment.