diff --git a/cmd/collectors/restperf/restperf.go b/cmd/collectors/restperf/restperf.go
index 48a24b9c6..f9791cd3a 100644
--- a/cmd/collectors/restperf/restperf.go
+++ b/cmd/collectors/restperf/restperf.go
@@ -691,9 +691,12 @@ func (r *RestPerf) processWorkLoadCounter() (map[string]*matrix.Matrix, error) {
 
 func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
 	var (
-		err          error
-		perfRecords  []rest.PerfRecord
-		startTime    time.Time
+		apiD, parseD time.Duration
+		metricCount  uint64
+		numPartials  uint64
+		startTime    time.Time
+		prevMat      *matrix.Matrix
+		curMat       *matrix.Matrix
 	)
 
 	timestamp := r.Matrix[r.Object].GetMetric(timestampMetricName)
@@ -743,88 +746,80 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
 		}
 	}
 
-	err = rest.FetchRestPerfData(r.Client, href, &perfRecords, headers)
+	// Track old instances before processing batches
+	oldInstances := set.New()
+	// The creation and deletion of objects with an instance schedule are managed through pollInstance.
+	if !r.hasInstanceSchedule {
+		for key := range r.Matrix[r.Object].GetInstances() {
+			oldInstances.Add(key)
+		}
+	}
 
-	if err != nil {
-		return nil, fmt.Errorf("failed to fetch href=%s %w", href, err)
+	prevMat = r.Matrix[r.Object]
+	// clone matrix without numeric data
+	curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
+	curMat.Reset()
+
+	processBatch := func(perfRecords []rest.PerfRecord) error {
+		if len(perfRecords) == 0 {
+			return nil
+		}
+
+		// Process the current batch of records
+		count, np, batchParseD := r.processPerfRecords(perfRecords, curMat, prevMat, oldInstances)
+		numPartials += np
+		metricCount += count
+		parseD += batchParseD
+		return nil
 	}
 
-	return r.pollData(startTime, perfRecords)
-}
+	err = rest.FetchRestPerfDataStream(r.Client, href, processBatch, headers)
+	apiD += time.Since(startTime)
 
-// getMetric retrieves the metric associated with the given key from the current matrix (curMat).
-// If the metric does not exist in curMat, it is created with the provided display settings.
-// The function also ensures that the same metric exists in the previous matrix (prevMat) to
-// allow for subsequent calculations (e.g., prevMetric - curMetric).
-// This is particularly important in cases such as ONTAP upgrades, where curMat may contain
-// additional metrics that are not present in prevMat. If prevMat does not have the metric,
-// it is created to prevent a panic when attempting to perform calculations with non-existent metrics.
-//
-// This metric creation process within RestPerf is necessary during PollData because the information about whether a metric
-// is an array is not available in the RestPerf PollCounter. The determination of whether a metric is an array
-// is made by examining the actual data in RestPerf. Therefore, metric creation in RestPerf is performed during
-// the poll data phase, and special handling is required for such cases.
-//
-// The function returns the current metric and any error encountered during its retrieval or creation.
-func (r *RestPerf) getMetric(curMat *matrix.Matrix, prevMat *matrix.Matrix, key string, display ...string) (*matrix.Metric, error) {
-	var err error
-	curMetric := curMat.GetMetric(key)
-	if curMetric == nil {
-		curMetric, err = curMat.NewMetricFloat64(key, display...)
-		if err != nil {
-			return nil, err
+	if err != nil {
+		return nil, err
+	}
+
+	if !r.hasInstanceSchedule {
+		// Remove old instances that are not found in new instances
+		for key := range oldInstances.Iter() {
+			curMat.RemoveInstance(key)
 		}
 	}
-	prevMetric := prevMat.GetMetric(key)
-	if prevMetric == nil {
-		_, err = prevMat.NewMetricFloat64(key, display...)
-		if err != nil {
+	if isWorkloadDetailObject(r.Prop.Query) {
+		if err := r.getParentOpsCounters(curMat); err != nil {
+			// no point to continue as we can't calculate the other counters
 			return nil, err
 		}
 	}
-	return curMetric, nil
+
+	_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
+	_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
+	_ = r.Metadata.LazySetValueUint64("metrics", "data", metricCount)
+	_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
+	_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
+	_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)
+	_ = r.Metadata.LazySetValueUint64("numPartials", "data", numPartials)
+	r.AddCollectCount(metricCount)
+
+	return r.cookCounters(curMat, prevMat)
 }
 
-func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord) (map[string]*matrix.Matrix, error) {
+func (r *RestPerf) processPerfRecords(perfRecords []rest.PerfRecord, curMat *matrix.Matrix, prevMat *matrix.Matrix, oldInstances *set.Set) (uint64, uint64, time.Duration) {
 	var (
 		count        uint64
-		apiD, parseD time.Duration
-		err          error
+		parseD       time.Duration
 		instanceKeys []string
-		skips        int
 		numPartials  uint64
-		instIndex    int
 		ts           float64
-		prevMat      *matrix.Matrix
-		curMat       *matrix.Matrix
+		err          error
 	)
-
-	prevMat = r.Matrix[r.Object]
-	// clone matrix without numeric data
-	curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
-	curMat.Reset()
 
 	instanceKeys = r.Prop.InstanceKeys
+	startTime := time.Now()
 
-	apiD = time.Since(startTime)
 	// init current time
 	ts = float64(startTime.UnixNano()) / BILLION
-
-	startTime = time.Now()
-	instIndex = -1
-
-	if len(perfRecords) == 0 {
-		return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
-	}
-
-	// Call pollInstance to handle instance creation/deletion for objects without an instance schedule
-	if !r.hasInstanceSchedule {
-		_, err = r.pollInstance(curMat, perfToJSON(perfRecords), apiD)
-		if err != nil {
-			return nil, err
-		}
-	}
-
 	for _, perfRecord := range perfRecords {
 		pr := perfRecord.Records
 		t := perfRecord.Timestamp
@@ -842,7 +837,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 				isHistogram      bool
 				histogramMetric  *matrix.Metric
 			)
-			instIndex++
 
 			if !instanceData.IsObject() {
 				r.Logger.Warn("Instance data is not object, skipping", slog.String("type", instanceData.Type.String()))
@@ -911,6 +905,8 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 				}
 			}
 
+			oldInstances.Remove(instanceKey)
+
 			// check for partial aggregation
 			if instanceData.Get("aggregation.complete").ClonedString() == "false" {
 				instance.SetPartial(true)
@@ -1072,7 +1068,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 							slog.String("name", name),
 							slog.String("label", label),
 							slog.String("value", values[i]),
-							slog.Int("instIndex", instIndex),
 						)
 						continue
 					}
@@ -1086,7 +1081,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 								"NewMetricFloat64",
 								slogx.Err(err),
 								slog.String("name", name),
-								slog.Int("instIndex", instIndex),
 							)
 						}
 					}
@@ -1098,7 +1092,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 							slogx.Err(err),
 							slog.String("key", metric.Name),
 							slog.String("metric", metric.Label),
-							slog.Int("instIndex", instIndex),
 						)
 					}
 				} else {
@@ -1107,7 +1100,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 						slogx.Err(err),
 						slog.String("key", metric.Name),
 						slog.String("metric", metric.Label),
-						slog.Int("instIndex", instIndex),
 					)
 				}
 				count++
@@ -1123,24 +1115,49 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 			return true
 		})
 	}
+	parseD = time.Since(startTime)
+	return count, numPartials, parseD
+}
-
-	if isWorkloadDetailObject(r.Prop.Query) {
-		if err := r.getParentOpsCounters(curMat); err != nil {
-			// no point to continue as we can't calculate the other counters
+// getMetric retrieves the metric associated with the given key from the current matrix (curMat).
+// If the metric does not exist in curMat, it is created with the provided display settings.
+// The function also ensures that the same metric exists in the previous matrix (prevMat) to
+// allow for subsequent calculations (e.g., prevMetric - curMetric).
+// This is particularly important in cases such as ONTAP upgrades, where curMat may contain
+// additional metrics that are not present in prevMat. If prevMat does not have the metric,
+// it is created to prevent a panic when attempting to perform calculations with non-existent metrics.
+//
+// This metric creation process within RestPerf is necessary during PollData because the information about whether a metric
+// is an array is not available in the RestPerf PollCounter. The determination of whether a metric is an array
+// is made by examining the actual data in RestPerf. Therefore, metric creation in RestPerf is performed during
+// the poll data phase, and special handling is required for such cases.
+//
+// The function returns the current metric and any error encountered during its retrieval or creation.
+func (r *RestPerf) getMetric(curMat *matrix.Matrix, prevMat *matrix.Matrix, key string, display ...string) (*matrix.Metric, error) {
+	var err error
+	curMetric := curMat.GetMetric(key)
+	if curMetric == nil {
+		curMetric, err = curMat.NewMetricFloat64(key, display...)
+		if err != nil {
 			return nil, err
 		}
 	}
-	parseD = time.Since(startTime)
-	_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
-	_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
-	_ = r.Metadata.LazySetValueUint64("metrics", "data", count)
-	_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
-	_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
-	_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)
-	_ = r.Metadata.LazySetValueUint64("numPartials", "data", numPartials)
+	prevMetric := prevMat.GetMetric(key)
+	if prevMetric == nil {
+		_, err = prevMat.NewMetricFloat64(key, display...)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return curMetric, nil
+}
 
-	r.AddCollectCount(count)
+func (r *RestPerf) cookCounters(curMat *matrix.Matrix, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, error) {
+	var (
+		err   error
+		skips int
+	)
 
 	// skip calculating from delta if no data from previous poll
 	if r.perfProp.isCacheEmpty {
@@ -1253,7 +1270,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 					slog.String("key", key),
 					slog.String("property", property),
 					slog.String("denominator", counter.denominator),
-					slog.Int("instIndex", instIndex),
 				)
 				continue
 			}
@@ -1296,7 +1312,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 				"Unknown property",
 				slog.String("key", key),
 				slog.String("property", property),
-				slog.Int("instIndex", instIndex),
 			)
 		}
 
@@ -1314,7 +1329,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
 						slog.Int("i", i),
 						slog.String("metric", metric.GetName()),
 						slog.String("key", key),
-						slog.Int("instIndex", instIndex),
 					)
 					continue
 				}
diff --git a/cmd/collectors/restperf/restperf_test.go b/cmd/collectors/restperf/restperf_test.go
index e0b47196e..95bdad2a9 100644
--- a/cmd/collectors/restperf/restperf_test.go
+++ b/cmd/collectors/restperf/restperf_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/netapp/harvest/v2/cmd/tools/rest"
 	"github.com/netapp/harvest/v2/pkg/conf"
 	"github.com/netapp/harvest/v2/pkg/matrix"
+	"github.com/netapp/harvest/v2/pkg/set"
 	"github.com/netapp/harvest/v2/pkg/tree"
 	"github.com/netapp/harvest/v2/pkg/tree/node"
 	"github.com/netapp/harvest/v2/third_party/tidwall/gjson"
@@ -84,9 +85,10 @@ func TestMain(m *testing.M) {
 	propertiesData = jsonToPerfRecords("testdata/volume-poll-properties.json.gz")
 	fullPollData = jsonToPerfRecords("testdata/volume-poll-full.json.gz")
 	fullPollData[0].Timestamp = now.UnixNano()
-	mat := matrix.New("Volume", "Volume", "Volume")
+	mat := benchPerf.Matrix[benchPerf.Object]
 	_, _ = benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0)
-	_, _ = benchPerf.pollData(now, fullPollData)
+	_, _, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
+	_, _ = benchPerf.cookCounters(mat, mat)
 
 	os.Exit(m.Run())
 }
@@ -104,7 +106,8 @@ func BenchmarkRestPerf_PollData(b *testing.B) {
 		for _, mm := range mi {
 			ms = append(ms, mm)
 		}
-		m, err := benchPerf.pollData(now, fullPollData)
+		_, _, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
+		m, err := benchPerf.cookCounters(mat, mat)
 		if err != nil {
 			b.Errorf("error: %v", err)
 		}
@@ -123,7 +126,7 @@ func TestRestPerf_pollData(t *testing.T) {
 		pollDataPath1 string
 		pollDataPath2 string
 		numInstances  int
-		numMetrics    int
+		numMetrics    uint64
 		sum           int64
 		pollCounters  string
 		pollCounters2 string
@@ -173,23 +176,24 @@ func TestRestPerf_pollData(t *testing.T) {
 				t.Fatal(err)
 			}
 
-			now := time.Now().Truncate(time.Second)
-			pollData[0].Timestamp = now.UnixNano()
-			_, err = r.pollData(now, pollData)
+			prevMat := r.Matrix[r.Object]
+			_, _, err = processAndCookCounters(r, pollData, prevMat)
 			if err != nil {
 				t.Fatal(err)
 			}
+
 			counters = jsonToPerfRecords(tt.pollCounters2)
 			_, err = r.pollCounter(counters[0].Records.Array(), 0)
 			if err != nil {
 				t.Fatal(err)
 			}
 
-			future := now.Add(time.Minute * 15)
+			future := time.Now().Add(time.Minute * 15)
 			pollData = jsonToPerfRecords(tt.pollDataPath2)
 			pollData[0].Timestamp = future.UnixNano()
-			got, err := r.pollData(future, pollData)
+			prevMat = r.Matrix[r.Object]
+			got, metricCount, err := processAndCookCounters(r, pollData, prevMat)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("pollData() error = %v, wantErr %v", err, tt.wantErr)
 				return
 			}
@@ -200,10 +204,8 @@ func TestRestPerf_pollData(t *testing.T) {
 				t.Errorf("pollData() numInstances got=%v, want=%v", len(m.GetInstances()), tt.numInstances)
 			}
 
-			metadata := r.Metadata
-			numMetrics, _ := metadata.GetMetric("metrics").GetValueInt(metadata.GetInstance("data"))
-			if numMetrics != tt.numMetrics {
-				t.Errorf("pollData() numMetrics got=%v, want=%v", numMetrics, tt.numMetrics)
+			if metricCount != tt.numMetrics {
+				t.Errorf("pollData() numMetrics got=%v, want=%v", metricCount, tt.numMetrics)
 			}
 
 			var sum int64
@@ -230,15 +232,16 @@ func TestRestPerf_pollData(t *testing.T) {
 
 func (r *RestPerf) testPollInstanceAndDataWithMetrics(t *testing.T, pollDataFile string, expectedExportedInst, expectedExportedMetrics int) {
 	// Additional logic to count metrics
+	prevMat := r.Matrix[r.Object]
 	pollData := jsonToPerfRecords(pollDataFile)
-	data, err := r.pollData(time.Now().Truncate(time.Second), pollData)
+	got, _, err := processAndCookCounters(r, pollData, prevMat)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	totalMetrics := 0
 	exportableInstance := 0
-	mat := data[r.Object]
+	mat := got[r.Object]
 	if mat != nil {
 		for _, instance := range mat.GetInstances() {
 			if instance.IsExportable() {
@@ -289,6 +292,7 @@ func TestPartialAggregationSequence(t *testing.T) {
 	var (
 		err error
 	)
+	conf.TestLoadHarvestConfig("testdata/config.yml")
 	r := newRestPerf("Workload", "workload.yaml")
 	counters := jsonToPerfRecords("testdata/partialAggregation/qos-counters.json")
 
@@ -399,7 +403,7 @@ func TestQosVolume(t *testing.T) {
 		pollDataPath1 string
 		pollDataPath2 string
 		numInstances  int
-		numMetrics    int
+		numMetrics    uint64
 		sum           int64
 		pollCounters  string
 		pollInstance  string
@@ -434,7 +438,9 @@ func TestQosVolume(t *testing.T) {
 			pollData := jsonToPerfRecords(tt.pollDataPath1)
 			now := time.Now().Truncate(time.Second)
 			pollData[0].Timestamp = now.UnixNano()
-			_, err = r.pollData(now, pollData)
+			prevMat := r.Matrix[r.Object]
+
+			_, _, err = processAndCookCounters(r, pollData, prevMat)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -443,7 +449,8 @@ func TestQosVolume(t *testing.T) {
 
 			pollData = jsonToPerfRecords(tt.pollDataPath2)
 			pollData[0].Timestamp = future.UnixNano()
-			got, err := r.pollData(future, pollData)
+			prevMat = r.Matrix[r.Object]
+			got, metricCount, err := processAndCookCounters(r, pollData, prevMat)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("pollData() error = %v, wantErr %v", err, tt.wantErr)
 				return
@@ -454,10 +461,8 @@ func TestQosVolume(t *testing.T) {
 				t.Errorf("pollData() numInstances got=%v, want=%v", len(m.GetInstances()), tt.numInstances)
 			}
 
-			metadata := r.Metadata
-			numMetrics, _ := metadata.GetMetric("metrics").GetValueInt(metadata.GetInstance("data"))
-			if numMetrics != tt.numMetrics {
-				t.Errorf("pollData() numMetrics got=%v, want=%v", numMetrics, tt.numMetrics)
+			if metricCount != tt.numMetrics {
+				t.Errorf("pollData() numMetrics got=%v, want=%v", metricCount, tt.numMetrics)
 			}
 
 			var sum int64
@@ -481,3 +486,11 @@ func TestQosVolume(t *testing.T) {
 		})
 	}
 }
+
+func processAndCookCounters(r *RestPerf, pollData []rest.PerfRecord, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, uint64, error) {
+	curMat := prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
+	curMat.Reset()
+	metricCount, _, _ := r.processPerfRecords(pollData, curMat, prevMat, set.New())
+	got, err := r.cookCounters(curMat, prevMat)
+	return got, metricCount, err
+}
diff --git a/cmd/tools/rest/rest.go b/cmd/tools/rest/rest.go
index a4d82cf00..71d749883 100644
--- a/cmd/tools/rest/rest.go
+++ b/cmd/tools/rest/rest.go
@@ -470,6 +470,45 @@ func FetchAnalytics(client *Client, href string) ([]gjson.Result, gjson.Result,
 	return result, *analytics, nil
 }
 
+func FetchRestPerfDataStream(client *Client, href string, processBatch func([]PerfRecord) error, headers ...map[string]string) error {
+	var prevLink string
+	nextLink := href
+	recordsFound := false
+	for {
+		response, err := client.GetRest(nextLink, headers...)
+		if err != nil {
+			return fmt.Errorf("error making request %w", err)
+		}
+
+		// extract returned records since paginated records need to be merged into a single list
+		output := gjson.ParseBytes(response)
+		data := output.Get("records")
+		numRecords := output.Get("num_records")
+		next := output.Get("_links.next.href")
+
+		if numRecords.Int() > 0 {
+			recordsFound = true
+			p := PerfRecord{Records: data, Timestamp: time.Now().UnixNano()}
+			if err := processBatch([]PerfRecord{p}); err != nil {
+				return err
+			}
+		}
+
+		prevLink = nextLink
+		nextLink = next.ClonedString()
+
+		if nextLink == "" || nextLink == prevLink {
+			// no nextLink or nextLink is the same as the previous link, no progress is being made, exit
+			break
+		}
+	}
+	if !recordsFound {
+		return errs.New(errs.ErrNoInstance, "no instances found")
+	}
+
+	return nil
+}
+
 func FetchAllStream(client *Client, href string, processBatch func([]gjson.Result) error, headers ...map[string]string) error {
 	var prevLink string
 	nextLink := href
@@ -701,41 +740,6 @@ func fetchAnalytics(client *Client, href string, records *[]gjson.Result, analyt
 	return nil
 }
 
-// FetchRestPerfData This method is used in PerfRest collector. This method returns timestamp per batch
-func FetchRestPerfData(client *Client, href string, perfRecords *[]PerfRecord, headers ...map[string]string) error {
-
-	var prevLink string
-	nextLink := href
-
-	for {
-		response, err := client.GetRest(nextLink, headers...)
-		if err != nil {
-			return fmt.Errorf("error making request %w", err)
-		}
-
-		// extract returned records since paginated records need to be merged into a single list
-		output := gjson.ParseBytes(response)
-		data := output.Get("records")
-		numRecords := output.Get("num_records")
-		next := output.Get("_links.next.href")
-
-		if numRecords.Int() > 0 {
-			p := PerfRecord{Records: data, Timestamp: time.Now().UnixNano()}
-			*perfRecords = append(*perfRecords, p)
-		}
-
-		prevLink = nextLink
-		nextLink = next.ClonedString()
-
-		if nextLink == "" || nextLink == prevLink {
-			// no nextLink or nextLink is the same as the previous link, no progress is being made, exit
-			break
-		}
-	}
-
-	return nil
-}
-
 func stderr(format string, a ...any) {
 	_, _ = fmt.Fprintf(os.Stderr, format, a...)
 }