diff --git a/cmd/collectors/restperf/restperf.go b/cmd/collectors/restperf/restperf.go index 99b280f45..84e482ff9 100644 --- a/cmd/collectors/restperf/restperf.go +++ b/cmd/collectors/restperf/restperf.go @@ -24,6 +24,7 @@ import ( "github.com/netapp/harvest/v2/pkg/tree/node" "github.com/netapp/harvest/v2/pkg/util" "github.com/tidwall/gjson" + "iter" "log/slog" "maps" "path" @@ -65,9 +66,10 @@ var qosDetailQueries = map[string]string{ } type RestPerf struct { - *rest2.Rest // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType - perfProp *perfProp - archivedMetrics map[string]*rest2.Metric // Keeps metric definitions that are not found in the counter schema. These metrics may be available in future ONTAP versions. + *rest2.Rest // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType + perfProp *perfProp + archivedMetrics map[string]*rest2.Metric // Keeps metric definitions that are not found in the counter schema. These metrics may be available in future ONTAP versions. + hasInstanceSchedule bool } type counter struct { @@ -142,6 +144,8 @@ func (r *RestPerf) Init(a *collector.AbstractCollector) error { return err } + r.InitSchedule() + r.Logger.Debug( "initialized cache", slog.Int("numMetrics", len(r.Prop.Metrics)), @@ -687,11 +691,6 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) { startTime time.Time ) - mat := r.Matrix[r.Object] - if len(mat.GetInstances()) == 0 { - return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances fetched in PollInstance") - } - timestamp := r.Matrix[r.Object].GetMetric(timestampMetricName) if timestamp == nil { return nil, errs.New(errs.ErrConfig, "missing timestamp metric") @@ -703,7 +702,7 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) { dataQuery := path.Join(r.Prop.Query, "rows") var filter []string - // Sort filters so that the href is deterministic + // Sort metrics so that the href is deterministic metrics := slices.Sorted(maps.Keys(r.Prop.Metrics)) filter = append(filter, "counters.name="+strings.Join(metrics, "|")) @@ -794,6 +793,14 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord) return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster") } + // Call pollInstance to handle instance creation/deletion for objects without an instance schedule + if !r.hasInstanceSchedule { + _, err = r.pollInstance(curMat, perfToJSON(perfRecords), apiD) + if err != nil { + return nil, err + } + } + for _, perfRecord := range perfRecords { pr := perfRecord.Records t := perfRecord.Timestamp @@ -870,10 +877,14 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord) instance = curMat.GetInstance(instanceKey) if instance == nil { - if !isWorkloadObject(r.Prop.Query) && !isWorkloadDetailObject(r.Prop.Query) { - r.Logger.Warn("Skip instanceKey, not found in cache", slog.String("instanceKey", instanceKey)) + if isWorkloadObject(r.Prop.Query) || isWorkloadDetailObject(r.Prop.Query) { + return true + } + instance, err = curMat.NewInstance(instanceKey) + if err != nil { + r.Logger.Error("add instance", slogx.Err(err), slog.String("instanceKey", instanceKey)) + return true } - return true } // check for partial aggregation @@ -1252,7 +1263,7 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord) } continue } - // If we reach here then one of the earlier clauses should have executed `continue` statement + // If we reach here, then one of the earlier clauses should have executed `continue` statement r.Logger.Error( "Unknown property", slog.String("key", key), @@ -1300,6 +1311,18 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord) return newDataMap, nil } +func perfToJSON(records []rest.PerfRecord) iter.Seq[gjson.Result] { + return func(yield func(gjson.Result) bool) { + for _, record := range records { + if record.Records.IsArray() { + record.Records.ForEach(func(_, r gjson.Result) bool { + return yield(r) + }) + } + } + } +} + // Poll counter "ops" of the related/parent object, required for objects // workload_detail and workload_detail_volume. This counter is already // collected by the other collectors, so this poll is redundant @@ -1472,17 +1495,17 @@ func (r *RestPerf) PollInstance() (map[string]*matrix.Matrix, error) { return r.handleError(err, href) } - return r.pollInstance(records, time.Since(apiT)) + return r.pollInstance(r.Matrix[r.Object], slices.Values(records), time.Since(apiT)) } -func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map[string]*matrix.Matrix, error) { +func (r *RestPerf) pollInstance(mat *matrix.Matrix, records iter.Seq[gjson.Result], apiD time.Duration) (map[string]*matrix.Matrix, error) { var ( err error oldInstances *set.Set oldSize, newSize, removed, added int + count int ) - mat := r.Matrix[r.Object] oldInstances = set.New() parseT := time.Now() for key := range mat.GetInstances() { @@ -1498,15 +1521,13 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map instanceKeys = []string{"name"} } - if len(records) == 0 { - return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster") - } - - for _, instanceData := range records { + for instanceData := range records { var ( instanceKey string ) + count++ + if !instanceData.IsObject() { r.Logger.Warn("Instance data is not object, skipping", slog.String("type", instanceData.Type.String())) continue @@ -1551,6 +1572,10 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map } } + if count == 0 { + return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster") + } + for key := range oldInstances.Iter() { mat.RemoveInstance(key) r.Logger.Debug("removed instance", slog.String("key", key)) @@ -1570,7 +1595,7 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map _ = r.Metadata.LazySetValueUint64("numCalls", "instance", r.Client.Metadata.NumCalls) if newSize == 0 { - return nil, errs.New(errs.ErrNoInstance, "") + return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster") } return nil, err @@ -1604,6 +1629,19 @@ func (r *RestPerf) handleError(err error, href string) (map[string]*matrix.Matri return nil, fmt.Errorf("failed to fetch data. href=[%s] err: %w", href, err) } +func (r *RestPerf) InitSchedule() { + if r.Schedule == nil { + return + } + tasks := r.Schedule.GetTasks() + for _, task := range tasks { + if task.Name == "instance" { + r.hasInstanceSchedule = true + return + } + } +} + func isWorkloadObject(query string) bool { _, ok := qosQueries[query] return ok diff --git a/cmd/collectors/restperf/restperf_test.go b/cmd/collectors/restperf/restperf_test.go index 9e1aab6bf..172db83bd 100644 --- a/cmd/collectors/restperf/restperf_test.go +++ b/cmd/collectors/restperf/restperf_test.go @@ -84,7 +84,8 @@ func TestMain(m *testing.M) { propertiesData = jsonToPerfRecords("testdata/volume-poll-properties.json.gz") fullPollData = jsonToPerfRecords("testdata/volume-poll-full.json.gz") fullPollData[0].Timestamp = now.UnixNano() - _, _ = benchPerf.pollInstance(propertiesData[0].Records.Array(), 0) + mat := matrix.New("Volume", "Volume", "Volume") + _, _ = benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0) _, _ = benchPerf.pollData(now, fullPollData) os.Exit(m.Run()) @@ -98,7 +99,8 @@ func BenchmarkRestPerf_PollData(b *testing.B) { for range b.N { now = now.Add(time.Minute * 15) fullPollData[0].Timestamp = now.UnixNano() - mi, _ := benchPerf.pollInstance(propertiesData[0].Records.Array(), 0) + mat := matrix.New("Volume", "Volume", "Volume") + mi, _ := benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0) for _, mm := range mi { ms = append(ms, mm) } @@ -166,7 +168,7 @@ func TestRestPerf_pollData(t *testing.T) { } pollInstance := jsonToPerfRecords(tt.pollInstance) pollData := jsonToPerfRecords(tt.pollDataPath1) - _, err = r.pollInstance(pollInstance[0].Records.Array(), 0) + _, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInstance), 0) if err != nil { t.Fatal(err) } @@ -295,7 +297,7 @@ func TestPartialAggregationSequence(t *testing.T) { t.Fatalf("Failed to fetch poll counter %v", err) } pollInstance := jsonToPerfRecords("testdata/partialAggregation/qos-poll-instance.json") - _, err = r.pollInstance(pollInstance[0].Records.Array(), 0) + _, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInstance), 0) if err != nil { t.Fatal(err) } @@ -424,7 +426,7 @@ func TestQosVolume(t *testing.T) { } pollInst := jsonToPerfRecords(tt.pollInstance) - _, err = r.pollInstance(pollInst[0].Records.Array(), 0) + _, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInst), 0) if err != nil { t.Fatal(err) } diff --git a/conf/restperf/9.12.0/workload.yaml b/conf/restperf/9.12.0/workload.yaml index 9237b6554..823a7a57e 100644 --- a/conf/restperf/9.12.0/workload.yaml +++ b/conf/restperf/9.12.0/workload.yaml @@ -8,6 +8,7 @@ object: qos client_timeout: 1m30s schedule: + - instance: 10m - data: 3m counters: diff --git a/conf/restperf/9.12.0/workload_detail.yaml b/conf/restperf/9.12.0/workload_detail.yaml index 9e8deefa2..cb04097d0 100644 --- a/conf/restperf/9.12.0/workload_detail.yaml +++ b/conf/restperf/9.12.0/workload_detail.yaml @@ -9,6 +9,7 @@ object: qos_detail client_timeout: 1m30s schedule: + - instance: 10m - data: 3m counters: diff --git a/conf/restperf/9.12.0/workload_detail_volume.yaml b/conf/restperf/9.12.0/workload_detail_volume.yaml index 252188795..c85f24ff1 100644 --- a/conf/restperf/9.12.0/workload_detail_volume.yaml +++ b/conf/restperf/9.12.0/workload_detail_volume.yaml @@ -8,6 +8,7 @@ object: qos_detail client_timeout: 1m30s schedule: + - instance: 10m - data: 3m counters: diff --git a/conf/restperf/9.12.0/workload_volume.yaml b/conf/restperf/9.12.0/workload_volume.yaml index 3a7281dd4..3f9d4497d 100644 --- a/conf/restperf/9.12.0/workload_volume.yaml +++ b/conf/restperf/9.12.0/workload_volume.yaml @@ -10,6 +10,7 @@ object: qos client_timeout: 1m30s schedule: + - instance: 10m - data: 3m counters: diff --git a/conf/restperf/default.yaml b/conf/restperf/default.yaml index bde185711..d8ce8f530 100644 --- a/conf/restperf/default.yaml +++ b/conf/restperf/default.yaml @@ -4,7 +4,6 @@ collector: RestPerf # Order here matters! schedule: - counter: 24h - - instance: 10m - data: 1m objects: diff --git a/integration/test/grafana/grafana_mgr.go b/integration/test/grafana/grafana_mgr.go index 6a894446c..499998101 100644 --- a/integration/test/grafana/grafana_mgr.go +++ b/integration/test/grafana/grafana_mgr.go @@ -36,7 +36,7 @@ func (g *Mgr) Import() (bool, string) { if docker.IsDockerBasedPoller() { grafanaURL = "grafana:3000" } - importCmds := []string{"grafana", "import", "--overwrite", "--addr", grafanaURL} + importCmds := []string{"grafana", "import", "--addr", grafanaURL} if docker.IsDockerBasedPoller() { params := []string{"exec", containerIDs[0].ID, "bin/harvest"} params = append(params, importCmds...)