Skip to content

Commit

Permalink
perf: Restperf should stream results while parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulguptajss committed Dec 5, 2024
1 parent 9a0c0fe commit 8fbcf58
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 59 deletions.
68 changes: 33 additions & 35 deletions cmd/collectors/restperf/restperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
var (
apiD, parseD time.Duration
metricCount uint64
numPartials uint64
startTime time.Time
prevMat *matrix.Matrix
curMat *matrix.Matrix
Expand Down Expand Up @@ -761,14 +762,14 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {

processBatch := func(perfRecords []rest.PerfRecord) error {
if len(perfRecords) == 0 {
return errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
return nil
}

// Process the current batch of records
count, batchParseD := r.processPerfRecords(perfRecords, curMat, prevMat, oldInstances)
count, np, batchParseD := r.processPerfRecords(perfRecords, curMat, prevMat, oldInstances)
numPartials += np
metricCount += count
parseD += batchParseD
apiD -= batchParseD
return nil
}

Expand All @@ -779,10 +780,33 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
return nil, err
}

return r.cookCounters(metricCount, curMat, prevMat, oldInstances)
if !r.hasInstanceSchedule {
// Remove old instances that are not found in new instances
for key := range oldInstances.Iter() {
curMat.RemoveInstance(key)
}
}

if isWorkloadDetailObject(r.Prop.Query) {
if err := r.getParentOpsCounters(curMat); err != nil {
// no point to continue as we can't calculate the other counters
return nil, err
}
}

_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = r.Metadata.LazySetValueUint64("metrics", "data", metricCount)
_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)
_ = r.Metadata.LazySetValueUint64("numPartials", "data", numPartials)
r.AddCollectCount(metricCount)

return r.cookCounters(curMat, prevMat)
}

func (r *RestPerf) processPerfRecords(perfRecords []rest.PerfRecord, curMat *matrix.Matrix, prevMat *matrix.Matrix, oldInstances *set.Set) (uint64, time.Duration) {
func (r *RestPerf) processPerfRecords(perfRecords []rest.PerfRecord, curMat *matrix.Matrix, prevMat *matrix.Matrix, oldInstances *set.Set) (uint64, uint64, time.Duration) {
var (
count uint64
parseD time.Duration
Expand Down Expand Up @@ -1092,7 +1116,7 @@ func (r *RestPerf) processPerfRecords(perfRecords []rest.PerfRecord, curMat *mat
})
}
parseD = time.Since(startTime)
return count, parseD
return count, numPartials, parseD
}

// getMetric retrieves the metric associated with the given key from the current matrix (curMat).
Expand Down Expand Up @@ -1129,38 +1153,12 @@ func (r *RestPerf) getMetric(curMat *matrix.Matrix, prevMat *matrix.Matrix, key
return curMetric, nil
}

func (r *RestPerf) cookCounters(count uint64, curMat *matrix.Matrix, prevMat *matrix.Matrix, oldInstances *set.Set) (map[string]*matrix.Matrix, error) {
func (r *RestPerf) cookCounters(curMat *matrix.Matrix, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, error) {
var (
apiD, parseD time.Duration
err error
skips int
numPartials uint64
err error
skips int
)

if !r.hasInstanceSchedule {
// Remove old instances that are not found in new instances
for key := range oldInstances.Iter() {
curMat.RemoveInstance(key)
}
}

if isWorkloadDetailObject(r.Prop.Query) {
if err := r.getParentOpsCounters(curMat); err != nil {
// no point to continue as we can't calculate the other counters
return nil, err
}
}

_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = r.Metadata.LazySetValueUint64("metrics", "data", count)
_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)
_ = r.Metadata.LazySetValueUint64("numPartials", "data", numPartials)

r.AddCollectCount(count)

// skip calculating from delta if no data from previous poll
if r.perfProp.isCacheEmpty {
r.Logger.Debug("skip postprocessing until next poll (previous cache empty)")
Expand Down
32 changes: 14 additions & 18 deletions cmd/collectors/restperf/restperf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func TestMain(m *testing.M) {
fullPollData[0].Timestamp = now.UnixNano()
mat := benchPerf.Matrix[benchPerf.Object]
_, _ = benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0)
_, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
_, _ = benchPerf.cookCounters(0, mat, mat, set.New())
_, _, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
_, _ = benchPerf.cookCounters(mat, mat)

os.Exit(m.Run())
}
Expand All @@ -106,8 +106,8 @@ func BenchmarkRestPerf_PollData(b *testing.B) {
for _, mm := range mi {
ms = append(ms, mm)
}
_, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
m, err := benchPerf.cookCounters(0, mat, mat, set.New())
_, _, _ = benchPerf.processPerfRecords(fullPollData, mat, mat, set.New())
m, err := benchPerf.cookCounters(mat, mat)
if err != nil {
b.Errorf("error: %v", err)
}
Expand All @@ -126,7 +126,7 @@ func TestRestPerf_pollData(t *testing.T) {
pollDataPath1 string
pollDataPath2 string
numInstances int
numMetrics int
numMetrics uint64
sum int64
pollCounters string
pollCounters2 string
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestRestPerf_pollData(t *testing.T) {
pollData[0].Timestamp = future.UnixNano()

prevMat = r.Matrix[r.Object]
got, _, err := processAndCookCounters(r, pollData, prevMat)
got, metricCount, err := processAndCookCounters(r, pollData, prevMat)
if (err != nil) != tt.wantErr {
t.Errorf("pollData() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -204,10 +204,8 @@ func TestRestPerf_pollData(t *testing.T) {
t.Errorf("pollData() numInstances got=%v, want=%v", len(m.GetInstances()), tt.numInstances)
}

metadata := r.Metadata
numMetrics, _ := metadata.GetMetric("metrics").GetValueInt(metadata.GetInstance("data"))
if numMetrics != tt.numMetrics {
t.Errorf("pollData() numMetrics got=%v, want=%v", numMetrics, tt.numMetrics)
if metricCount != tt.numMetrics {
t.Errorf("pollData() numMetrics got=%v, want=%v", metricCount, tt.numMetrics)
}

var sum int64
Expand Down Expand Up @@ -405,7 +403,7 @@ func TestQosVolume(t *testing.T) {
pollDataPath1 string
pollDataPath2 string
numInstances int
numMetrics int
numMetrics uint64
sum int64
pollCounters string
pollInstance string
Expand Down Expand Up @@ -452,7 +450,7 @@ func TestQosVolume(t *testing.T) {
pollData[0].Timestamp = future.UnixNano()

prevMat = r.Matrix[r.Object]
got, _, err := processAndCookCounters(r, pollData, prevMat)
got, metricCount, err := processAndCookCounters(r, pollData, prevMat)
if (err != nil) != tt.wantErr {
t.Errorf("pollData() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -463,10 +461,8 @@ func TestQosVolume(t *testing.T) {
t.Errorf("pollData() numInstances got=%v, want=%v", len(m.GetInstances()), tt.numInstances)
}

metadata := r.Metadata
numMetrics, _ := metadata.GetMetric("metrics").GetValueInt(metadata.GetInstance("data"))
if numMetrics != tt.numMetrics {
t.Errorf("pollData() numMetrics got=%v, want=%v", numMetrics, tt.numMetrics)
if metricCount != tt.numMetrics {
t.Errorf("pollData() numMetrics got=%v, want=%v", metricCount, tt.numMetrics)
}

var sum int64
Expand Down Expand Up @@ -494,7 +490,7 @@ func TestQosVolume(t *testing.T) {
func processAndCookCounters(r *RestPerf, pollData []rest.PerfRecord, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, uint64, error) {
curMat := prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
curMat.Reset()
metricCount, _ := r.processPerfRecords(pollData, curMat, prevMat, set.New())
got, err := r.cookCounters(metricCount, curMat, prevMat, set.New())
metricCount, _, _ := r.processPerfRecords(pollData, curMat, prevMat, set.New())
got, err := r.cookCounters(curMat, prevMat)
return got, metricCount, err
}
11 changes: 5 additions & 6 deletions cmd/tools/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func FetchAnalytics(client *Client, href string) ([]gjson.Result, gjson.Result,
func FetchRestPerfDataStream(client *Client, href string, processBatch func([]PerfRecord) error, headers ...map[string]string) error {
var prevLink string
nextLink := href

recordsFound := false
for {
response, err := client.GetRest(nextLink, headers...)
if err != nil {
Expand All @@ -487,15 +487,11 @@ func FetchRestPerfDataStream(client *Client, href string, processBatch func([]Pe
next := output.Get("_links.next.href")

if numRecords.Int() > 0 {
recordsFound = true
p := PerfRecord{Records: data, Timestamp: time.Now().UnixNano()}
if err := processBatch([]PerfRecord{p}); err != nil {
return err
}
} else {
// Call processBatch with an empty list to handle no records scenario
if err := processBatch([]PerfRecord{}); err != nil {
return err
}
}

prevLink = nextLink
Expand All @@ -506,6 +502,9 @@ func FetchRestPerfDataStream(client *Client, href string, processBatch func([]Pe
break
}
}
if !recordsFound {
return errs.New(errs.ErrNoInstance, "no instances found")
}

return nil
}
Expand Down

0 comments on commit 8fbcf58

Please sign in to comment.