diff --git a/cmd/collectors/restperf/restperf.go b/cmd/collectors/restperf/restperf.go
index c85ab6a76..f9791cd3a 100644
--- a/cmd/collectors/restperf/restperf.go
+++ b/cmd/collectors/restperf/restperf.go
@@ -693,6 +693,7 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
 	var (
 		apiD, parseD time.Duration
 		metricCount  uint64
+		numPartials  uint64
 		startTime    time.Time
 		prevMat      *matrix.Matrix
 		curMat       *matrix.Matrix
@@ -761,14 +762,14 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
 	processBatch := func(perfRecords []rest.PerfRecord) error {
 		if len(perfRecords) == 0 {
-			return errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
+			return nil
 		}
 
 		// Process the current batch of records
-		count, batchParseD := r.processPerfRecords(perfRecords, curMat, prevMat, oldInstances)
+		count, np, batchParseD := r.processPerfRecords(perfRecords, curMat, prevMat, oldInstances)
+		numPartials += np
 		metricCount += count
 		parseD += batchParseD
-		apiD -= batchParseD
 		return nil
 	}
@@ -779,10 +780,33 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
 		return nil, err
 	}
 
-	return r.cookCounters(metricCount, curMat, prevMat, oldInstances)
+	if !r.hasInstanceSchedule {
+		// Remove old instances that are not found in new instances
+		for key := range oldInstances.Iter() {
+			curMat.RemoveInstance(key)
+		}
+	}
+
+	if isWorkloadDetailObject(r.Prop.Query) {
+		if err := r.getParentOpsCounters(curMat); err != nil {
+			// no point to continue as we can't calculate the other counters
+			return nil, err
+		}
+	}
+
+	_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
+	_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
+	_ = r.Metadata.LazySetValueUint64("metrics", "data", metricCount)
+	_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
+	_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
+	_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)
+	_ = r.Metadata.LazySetValueUint64("numPartials", "data", numPartials)
+	r.AddCollectCount(metricCount)
+
+	return r.cookCounters(curMat, prevMat)
 }
 
-func (r *RestPerf) processPerfRecords(perfRecords []rest.PerfRecord, curMat *matrix.Matrix, prevMat *matrix.Matrix, oldInstances *set.Set) (uint64, time.Duration) {
+func (r *RestPerf) processPerfRecords(perfRecords []rest.PerfRecord, curMat *matrix.Matrix, prevMat *matrix.Matrix, oldInstances *set.Set) (uint64, uint64, time.Duration) {
 	var (
 		count  uint64
 		parseD time.Duration
@@ -1092,7 +1116,7 @@ func (r *RestPerf) processPerfRecords(perfRecords []rest.PerfRecord, curMat *mat
 		})
 	}
 	parseD = time.Since(startTime)
-	return count, parseD
+	return count, numPartials, parseD
 }
 
 // getMetric retrieves the metric associated with the given key from the current matrix (curMat).
@@ -1129,38 +1153,12 @@ func (r *RestPerf) getMetric(curMat *matrix.Matrix, prevMat *matrix.Matrix, key
 	return curMetric, nil
 }
 
-func (r *RestPerf) cookCounters(count uint64, curMat *matrix.Matrix, prevMat *matrix.Matrix, oldInstances *set.Set) (map[string]*matrix.Matrix, error) {
+func (r *RestPerf) cookCounters(curMat *matrix.Matrix, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, error) {
 	var (
-		apiD, parseD time.Duration
-		err          error
-		skips        int
-		numPartials  uint64
+		err   error
+		skips int
 	)
-	if !r.hasInstanceSchedule {
-		// Remove old instances that are not found in new instances
-		for key := range oldInstances.Iter() {
-			curMat.RemoveInstance(key)
-		}
-	}
-
-	if isWorkloadDetailObject(r.Prop.Query) {
-		if err := r.getParentOpsCounters(curMat); err != nil {
-			// no point to continue as we can't calculate the other counters
-			return nil, err
-		}
-	}
-
-	_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
-	_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
-	_ = r.Metadata.LazySetValueUint64("metrics", "data", count)
-	_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
-	_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
-	_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)
-	_ = r.Metadata.LazySetValueUint64("numPartials", "data", numPartials)
-
-	r.AddCollectCount(count)
-
 	// skip calculating from delta if no data from previous poll
 	if r.perfProp.isCacheEmpty {
 		r.Logger.Debug("skip postprocessing until next poll (previous cache empty)")
diff --git a/cmd/collectors/restperf/restperf_test.go b/cmd/collectors/restperf/restperf_test.go
index 3200db8fa..95bdad2a9 100644
--- a/cmd/collectors/restperf/restperf_test.go
+++ b/cmd/collectors/restperf/restperf_test.go
@@ -87,8 +87,8 @@ func TestMain(m *testing.M) {
 	fullPollData[0].Timestamp = now.UnixNano()
 	mat := benchPerf.Matrix[benchPerf.Object]
 	_, _ = benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0)
-	_, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
-	_, _ = benchPerf.cookCounters(0, mat, mat, set.New())
+	_, _, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
+	_, _ = benchPerf.cookCounters(mat, mat)
 
 	os.Exit(m.Run())
 }
@@ -106,8 +106,8 @@ func BenchmarkRestPerf_PollData(b *testing.B) {
 		for _, mm := range mi {
 			ms = append(ms, mm)
 		}
-		_, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
-		m, err := benchPerf.cookCounters(0, mat, mat, set.New())
+		_, _, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
+		m, err := benchPerf.cookCounters(mat, mat)
 		if err != nil {
 			b.Errorf("error: %v", err)
 		}
@@ -126,7 +126,7 @@ func TestRestPerf_pollData(t *testing.T) {
 		pollDataPath1 string
 		pollDataPath2 string
 		numInstances  int
-		numMetrics    int
+		numMetrics    uint64
 		sum           int64
 		pollCounters  string
 		pollCounters2 string
@@ -193,7 +193,7 @@ func TestRestPerf_pollData(t *testing.T) {
 			pollData[0].Timestamp = future.UnixNano()
 
 			prevMat = r.Matrix[r.Object]
-			got, _, err := processAndCookCounters(r, pollData, prevMat)
+			got, metricCount, err := processAndCookCounters(r, pollData, prevMat)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("pollData() error = %v, wantErr %v", err, tt.wantErr)
 				return
@@ -204,10 +204,8 @@ func TestRestPerf_pollData(t *testing.T) {
 				t.Errorf("pollData() numInstances got=%v, want=%v", len(m.GetInstances()), tt.numInstances)
 			}
 
-			metadata := r.Metadata
-			numMetrics, _ := metadata.GetMetric("metrics").GetValueInt(metadata.GetInstance("data"))
-			if numMetrics != tt.numMetrics {
-				t.Errorf("pollData() numMetrics got=%v, want=%v", numMetrics, tt.numMetrics)
+			if metricCount != tt.numMetrics {
+				t.Errorf("pollData() numMetrics got=%v, want=%v", metricCount, tt.numMetrics)
 			}
 
 			var sum int64
@@ -405,7 +403,7 @@ func TestQosVolume(t *testing.T) {
 		pollDataPath1 string
 		pollDataPath2 string
 		numInstances  int
-		numMetrics    int
+		numMetrics    uint64
 		sum           int64
 		pollCounters  string
 		pollInstance  string
@@ -452,7 +450,7 @@ func TestQosVolume(t *testing.T) {
 			pollData[0].Timestamp = future.UnixNano()
 
 			prevMat = r.Matrix[r.Object]
-			got, _, err := processAndCookCounters(r, pollData, prevMat)
+			got, metricCount, err := processAndCookCounters(r, pollData, prevMat)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("pollData() error = %v, wantErr %v", err, tt.wantErr)
 				return
@@ -463,10 +461,8 @@ func TestQosVolume(t *testing.T) {
 				t.Errorf("pollData() numInstances got=%v, want=%v", len(m.GetInstances()), tt.numInstances)
 			}
 
-			metadata := r.Metadata
-			numMetrics, _ := metadata.GetMetric("metrics").GetValueInt(metadata.GetInstance("data"))
-			if numMetrics != tt.numMetrics {
-				t.Errorf("pollData() numMetrics got=%v, want=%v", numMetrics, tt.numMetrics)
+			if metricCount != tt.numMetrics {
+				t.Errorf("pollData() numMetrics got=%v, want=%v", metricCount, tt.numMetrics)
 			}
 
 			var sum int64
@@ -494,7 +490,7 @@ func TestQosVolume(t *testing.T) {
 func processAndCookCounters(r *RestPerf, pollData []rest.PerfRecord, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, uint64, error) {
 	curMat := prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
 	curMat.Reset()
-	metricCount, _ := r.processPerfRecords(pollData, curMat, prevMat, set.New())
-	got, err := r.cookCounters(metricCount, curMat, prevMat, set.New())
+	metricCount, _, _ := r.processPerfRecords(pollData, curMat, prevMat, set.New())
+	got, err := r.cookCounters(curMat, prevMat)
 	return got, metricCount, err
 }
diff --git a/cmd/tools/rest/rest.go b/cmd/tools/rest/rest.go
index 215296d44..7561bcd23 100644
--- a/cmd/tools/rest/rest.go
+++ b/cmd/tools/rest/rest.go
@@ -473,7 +473,7 @@ func FetchAnalytics(client *Client, href string) ([]gjson.Result, gjson.Result,
 func FetchRestPerfDataStream(client *Client, href string, processBatch func([]PerfRecord) error, headers ...map[string]string) error {
 	var prevLink string
 	nextLink := href
-
+	recordsFound := false
 	for {
 		response, err := client.GetRest(nextLink, headers...)
 		if err != nil {
@@ -487,15 +487,11 @@ func FetchRestPerfDataStream(client *Client, href string, processBatch func([]Pe
 		next := output.Get("_links.next.href")
 
 		if numRecords.Int() > 0 {
+			recordsFound = true
 			p := PerfRecord{Records: data, Timestamp: time.Now().UnixNano()}
 			if err := processBatch([]PerfRecord{p}); err != nil {
 				return err
 			}
-		} else {
-			// Call processBatch with an empty list to handle no records scenario
-			if err := processBatch([]PerfRecord{}); err != nil {
-				return err
-			}
 		}
 
 		prevLink = nextLink
@@ -506,6 +502,9 @@
 			break
 		}
 	}
+	if !recordsFound {
+		return errs.New(errs.ErrNoInstance, "no instances found")
+	}
 	return nil
}