Skip to content

Commit

Permalink
perf: Restperf should stream results while parsing (#3356)
Browse files Browse the repository at this point in the history
* perf: Restperf should stream results while parsing
  • Loading branch information
rahulguptajss authored Dec 6, 2024
1 parent 2eb15ab commit f7fdf52
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 140 deletions.
180 changes: 97 additions & 83 deletions cmd/collectors/restperf/restperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,12 @@ func (r *RestPerf) processWorkLoadCounter() (map[string]*matrix.Matrix, error) {

func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
var (
err error
perfRecords []rest.PerfRecord
startTime time.Time
apiD, parseD time.Duration
metricCount uint64
numPartials uint64
startTime time.Time
prevMat *matrix.Matrix
curMat *matrix.Matrix
)

timestamp := r.Matrix[r.Object].GetMetric(timestampMetricName)
Expand Down Expand Up @@ -743,88 +746,80 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
}
}

err = rest.FetchRestPerfData(r.Client, href, &perfRecords, headers)
// Track old instances before processing batches
oldInstances := set.New()
// The creation and deletion of objects with an instance schedule are managed through pollInstance.
if !r.hasInstanceSchedule {
for key := range r.Matrix[r.Object].GetInstances() {
oldInstances.Add(key)
}
}

if err != nil {
return nil, fmt.Errorf("failed to fetch href=%s %w", href, err)
prevMat = r.Matrix[r.Object]
// clone matrix without numeric data
curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
curMat.Reset()

processBatch := func(perfRecords []rest.PerfRecord) error {
if len(perfRecords) == 0 {
return nil
}

// Process the current batch of records
count, np, batchParseD := r.processPerfRecords(perfRecords, curMat, prevMat, oldInstances)
numPartials += np
metricCount += count
parseD += batchParseD
return nil
}

return r.pollData(startTime, perfRecords)
}
err = rest.FetchRestPerfDataStream(r.Client, href, processBatch, headers)
apiD += time.Since(startTime)

// getMetric retrieves the metric associated with the given key from the current matrix (curMat).
// If the metric does not exist in curMat, it is created with the provided display settings.
// The function also ensures that the same metric exists in the previous matrix (prevMat) to
// allow for subsequent calculations (e.g., prevMetric - curMetric).
// This is particularly important in cases such as ONTAP upgrades, where curMat may contain
// additional metrics that are not present in prevMat. If prevMat does not have the metric,
// it is created to prevent a panic when attempting to perform calculations with non-existent metrics.
//
// This metric creation process within RestPerf is necessary during PollData because the information about whether a metric
// is an array is not available in the RestPerf PollCounter. The determination of whether a metric is an array
// is made by examining the actual data in RestPerf. Therefore, metric creation in RestPerf is performed during
// the poll data phase, and special handling is required for such cases.
//
// The function returns the current metric and any error encountered during its retrieval or creation.
func (r *RestPerf) getMetric(curMat *matrix.Matrix, prevMat *matrix.Matrix, key string, display ...string) (*matrix.Metric, error) {
var err error
curMetric := curMat.GetMetric(key)
if curMetric == nil {
curMetric, err = curMat.NewMetricFloat64(key, display...)
if err != nil {
return nil, err
if err != nil {
return nil, err
}

if !r.hasInstanceSchedule {
// Remove old instances that are not found in new instances
for key := range oldInstances.Iter() {
curMat.RemoveInstance(key)
}
}

prevMetric := prevMat.GetMetric(key)
if prevMetric == nil {
_, err = prevMat.NewMetricFloat64(key, display...)
if err != nil {
if isWorkloadDetailObject(r.Prop.Query) {
if err := r.getParentOpsCounters(curMat); err != nil {
// no point to continue as we can't calculate the other counters
return nil, err
}
}
return curMetric, nil

_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = r.Metadata.LazySetValueUint64("metrics", "data", metricCount)
_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)
_ = r.Metadata.LazySetValueUint64("numPartials", "data", numPartials)
r.AddCollectCount(metricCount)

return r.cookCounters(curMat, prevMat)
}

func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord) (map[string]*matrix.Matrix, error) {
func (r *RestPerf) processPerfRecords(perfRecords []rest.PerfRecord, curMat *matrix.Matrix, prevMat *matrix.Matrix, oldInstances *set.Set) (uint64, uint64, time.Duration) {
var (
count uint64
apiD, parseD time.Duration
err error
parseD time.Duration
instanceKeys []string
skips int
numPartials uint64
instIndex int
ts float64
prevMat *matrix.Matrix
curMat *matrix.Matrix
err error
)

prevMat = r.Matrix[r.Object]
// clone matrix without numeric data
curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
curMat.Reset()
instanceKeys = r.Prop.InstanceKeys
startTime := time.Now()

apiD = time.Since(startTime)
// init current time
ts = float64(startTime.UnixNano()) / BILLION

startTime = time.Now()
instIndex = -1

if len(perfRecords) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

// Call pollInstance to handle instance creation/deletion for objects without an instance schedule
if !r.hasInstanceSchedule {
_, err = r.pollInstance(curMat, perfToJSON(perfRecords), apiD)
if err != nil {
return nil, err
}
}

for _, perfRecord := range perfRecords {
pr := perfRecord.Records
t := perfRecord.Timestamp
Expand All @@ -842,7 +837,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
isHistogram bool
histogramMetric *matrix.Metric
)
instIndex++

if !instanceData.IsObject() {
r.Logger.Warn("Instance data is not object, skipping", slog.String("type", instanceData.Type.String()))
Expand Down Expand Up @@ -911,6 +905,8 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
}
}

oldInstances.Remove(instanceKey)

// check for partial aggregation
if instanceData.Get("aggregation.complete").ClonedString() == "false" {
instance.SetPartial(true)
Expand Down Expand Up @@ -1072,7 +1068,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
slog.String("name", name),
slog.String("label", label),
slog.String("value", values[i]),
slog.Int("instIndex", instIndex),
)
continue
}
Expand All @@ -1086,7 +1081,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
"NewMetricFloat64",
slogx.Err(err),
slog.String("name", name),
slog.Int("instIndex", instIndex),
)
}
}
Expand All @@ -1098,7 +1092,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
slogx.Err(err),
slog.String("key", metric.Name),
slog.String("metric", metric.Label),
slog.Int("instIndex", instIndex),
)
}
} else {
Expand All @@ -1107,7 +1100,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
slogx.Err(err),
slog.String("key", metric.Name),
slog.String("metric", metric.Label),
slog.Int("instIndex", instIndex),
)
}
count++
Expand All @@ -1123,24 +1115,49 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
return true
})
}
parseD = time.Since(startTime)
return count, numPartials, parseD
}

if isWorkloadDetailObject(r.Prop.Query) {
if err := r.getParentOpsCounters(curMat); err != nil {
// no point to continue as we can't calculate the other counters
// getMetric retrieves the metric associated with the given key from the current matrix (curMat).
// If the metric does not exist in curMat, it is created with the provided display settings.
// The function also ensures that the same metric exists in the previous matrix (prevMat) to
// allow for subsequent calculations (e.g., prevMetric - curMetric).
// This is particularly important in cases such as ONTAP upgrades, where curMat may contain
// additional metrics that are not present in prevMat. If prevMat does not have the metric,
// it is created to prevent a panic when attempting to perform calculations with non-existent metrics.
//
// This metric creation process within RestPerf is necessary during PollData because the information about whether a metric
// is an array is not available in the RestPerf PollCounter. The determination of whether a metric is an array
// is made by examining the actual data in RestPerf. Therefore, metric creation in RestPerf is performed during
// the poll data phase, and special handling is required for such cases.
//
// The function returns the current metric and any error encountered during its retrieval or creation.
func (r *RestPerf) getMetric(curMat *matrix.Matrix, prevMat *matrix.Matrix, key string, display ...string) (*matrix.Metric, error) {
var err error
curMetric := curMat.GetMetric(key)
if curMetric == nil {
curMetric, err = curMat.NewMetricFloat64(key, display...)
if err != nil {
return nil, err
}
}

parseD = time.Since(startTime)
_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = r.Metadata.LazySetValueUint64("metrics", "data", count)
_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)
_ = r.Metadata.LazySetValueUint64("numPartials", "data", numPartials)
prevMetric := prevMat.GetMetric(key)
if prevMetric == nil {
_, err = prevMat.NewMetricFloat64(key, display...)
if err != nil {
return nil, err
}
}
return curMetric, nil
}

r.AddCollectCount(count)
func (r *RestPerf) cookCounters(curMat *matrix.Matrix, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, error) {
var (
err error
skips int
)

// skip calculating from delta if no data from previous poll
if r.perfProp.isCacheEmpty {
Expand Down Expand Up @@ -1253,7 +1270,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
slog.String("key", key),
slog.String("property", property),
slog.String("denominator", counter.denominator),
slog.Int("instIndex", instIndex),
)
continue
}
Expand Down Expand Up @@ -1296,7 +1312,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
"Unknown property",
slog.String("key", key),
slog.String("property", property),
slog.Int("instIndex", instIndex),
)
}

Expand All @@ -1314,7 +1329,6 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
slog.Int("i", i),
slog.String("metric", metric.GetName()),
slog.String("key", key),
slog.Int("instIndex", instIndex),
)
continue
}
Expand Down
Loading

0 comments on commit f7fdf52

Please sign in to comment.