perf: RestPerf collectors should only run PollInstance for workloads #3207

Merged · 2 commits · Oct 15, 2024
82 changes: 60 additions & 22 deletions cmd/collectors/restperf/restperf.go
@@ -24,6 +24,7 @@ import (
"github.com/netapp/harvest/v2/pkg/tree/node"
"github.com/netapp/harvest/v2/pkg/util"
"github.com/tidwall/gjson"
"iter"
"log/slog"
"maps"
"path"
@@ -65,9 +66,10 @@ var qosDetailQueries = map[string]string{
}

type RestPerf struct {
*rest2.Rest // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType
perfProp *perfProp
archivedMetrics map[string]*rest2.Metric // Keeps metric definitions that are not found in the counter schema. These metrics may be available in future ONTAP versions.
*rest2.Rest // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType
perfProp *perfProp
archivedMetrics map[string]*rest2.Metric // Keeps metric definitions that are not found in the counter schema. These metrics may be available in future ONTAP versions.
hasInstanceSchedule bool
}

type counter struct {
@@ -142,6 +144,8 @@ func (r *RestPerf) Init(a *collector.AbstractCollector) error {
return err
}

r.InitSchedule()

r.Logger.Debug(
"initialized cache",
slog.Int("numMetrics", len(r.Prop.Metrics)),
@@ -687,11 +691,6 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
startTime time.Time
)

mat := r.Matrix[r.Object]
if len(mat.GetInstances()) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances fetched in PollInstance")
}

timestamp := r.Matrix[r.Object].GetMetric(timestampMetricName)
if timestamp == nil {
return nil, errs.New(errs.ErrConfig, "missing timestamp metric")
@@ -703,7 +702,7 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
dataQuery := path.Join(r.Prop.Query, "rows")

var filter []string
// Sort filters so that the href is deterministic
// Sort metrics so that the href is deterministic
metrics := slices.Sorted(maps.Keys(r.Prop.Metrics))

filter = append(filter, "counters.name="+strings.Join(metrics, "|"))
@@ -794,6 +793,14 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

// Call pollInstance to handle instance creation/deletion for objects without an instance schedule
if !r.hasInstanceSchedule {
_, err = r.pollInstance(curMat, perfToJSON(perfRecords), apiD)
if err != nil {
return nil, err
}
}

for _, perfRecord := range perfRecords {
pr := perfRecord.Records
t := perfRecord.Timestamp
@@ -870,10 +877,14 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)

instance = curMat.GetInstance(instanceKey)
if instance == nil {
if !isWorkloadObject(r.Prop.Query) && !isWorkloadDetailObject(r.Prop.Query) {
r.Logger.Warn("Skip instanceKey, not found in cache", slog.String("instanceKey", instanceKey))
if isWorkloadObject(r.Prop.Query) || isWorkloadDetailObject(r.Prop.Query) {
return true
}
instance, err = curMat.NewInstance(instanceKey)
if err != nil {
r.Logger.Error("add instance", slogx.Err(err), slog.String("instanceKey", instanceKey))
return true
}
return true
}

// check for partial aggregation
@@ -1252,7 +1263,7 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
}
continue
}
// If we reach here then one of the earlier clauses should have executed `continue` statement
// If we reach here, then one of the earlier clauses should have executed `continue` statement
r.Logger.Error(
"Unknown property",
slog.String("key", key),
@@ -1300,6 +1311,18 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
return newDataMap, nil
}

func perfToJSON(records []rest.PerfRecord) iter.Seq[gjson.Result] {
return func(yield func(gjson.Result) bool) {
for _, record := range records {
if record.Records.IsArray() {
record.Records.ForEach(func(_, r gjson.Result) bool {
return yield(r)
})
}
}
}
}

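Side note on the new iterator (not part of this diff): perfToJSON adapts a slice of rest.PerfRecord into a Go 1.23 iter.Seq[gjson.Result], which is the same shape slices.Values produces, so pollInstance can range over records coming from either PollInstance (a plain slice) or pollData (nested perf records) without building a second slice. A minimal sketch of the consumption pattern, using hypothetical record contents:

```go
package main

import (
	"fmt"
	"iter"
	"slices"

	"github.com/tidwall/gjson"
)

// printNames consumes any iter.Seq[gjson.Result]; both perfToJSON(records)
// and slices.Values(results) satisfy this signature.
func printNames(records iter.Seq[gjson.Result]) {
	for rec := range records { // range-over-func drives the yield callback
		fmt.Println(rec.Get("name").String())
	}
}

func main() {
	// hypothetical instance records, standing in for ONTAP perf rows
	raw := []gjson.Result{
		gjson.Parse(`{"name":"vol1"}`),
		gjson.Parse(`{"name":"vol2"}`),
	}
	printNames(slices.Values(raw))
}
```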
// Poll counter "ops" of the related/parent object, required for objects
// workload_detail and workload_detail_volume. This counter is already
// collected by the other collectors, so this poll is redundant
@@ -1472,17 +1495,17 @@ func (r *RestPerf) PollInstance() (map[string]*matrix.Matrix, error) {
return r.handleError(err, href)
}

return r.pollInstance(records, time.Since(apiT))
return r.pollInstance(r.Matrix[r.Object], slices.Values(records), time.Since(apiT))
}

func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map[string]*matrix.Matrix, error) {
func (r *RestPerf) pollInstance(mat *matrix.Matrix, records iter.Seq[gjson.Result], apiD time.Duration) (map[string]*matrix.Matrix, error) {
var (
err error
oldInstances *set.Set
oldSize, newSize, removed, added int
count int
)

mat := r.Matrix[r.Object]
oldInstances = set.New()
parseT := time.Now()
for key := range mat.GetInstances() {
@@ -1498,15 +1521,13 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map
instanceKeys = []string{"name"}
}

if len(records) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

for _, instanceData := range records {
for instanceData := range records {
var (
instanceKey string
)

count++

if !instanceData.IsObject() {
r.Logger.Warn("Instance data is not object, skipping", slog.String("type", instanceData.Type.String()))
continue
Expand Down Expand Up @@ -1551,6 +1572,10 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map
}
}

if count == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

for key := range oldInstances.Iter() {
mat.RemoveInstance(key)
r.Logger.Debug("removed instance", slog.String("key", key))
@@ -1570,7 +1595,7 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map
_ = r.Metadata.LazySetValueUint64("numCalls", "instance", r.Client.Metadata.NumCalls)

if newSize == 0 {
return nil, errs.New(errs.ErrNoInstance, "")
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

return nil, err
@@ -1604,6 +1629,19 @@ func (r *RestPerf) handleError(err error, href string) (map[string]*matrix.Matri
return nil, fmt.Errorf("failed to fetch data. href=[%s] err: %w", href, err)
}

func (r *RestPerf) InitSchedule() {
if r.Schedule == nil {
return
}
tasks := r.Schedule.GetTasks()
for _, task := range tasks {
if task.Name == "instance" {
r.hasInstanceSchedule = true
return
}
}
}

func isWorkloadObject(query string) bool {
_, ok := qosQueries[query]
return ok
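Taken together, InitSchedule and the new branch in pollData gate instance discovery on the template's schedule: the workload templates below keep an explicit instance task, so their dedicated PollInstance poll still runs, while every other RestPerf object (default.yaml drops the instance task) now reconciles instances from the records already fetched during the data poll. A simplified, self-contained sketch of that gating, with illustrative types rather than Harvest's real ones:

```go
package main

import "fmt"

type task struct{ name string }

type collector struct {
	object              string
	hasInstanceSchedule bool
}

// initSchedule mirrors the idea of InitSchedule: remember whether the
// template declares a dedicated "instance" task.
func (c *collector) initSchedule(tasks []task) {
	for _, t := range tasks {
		if t.name == "instance" {
			c.hasInstanceSchedule = true
			return
		}
	}
}

func (c *collector) pollData(records []string) {
	if !c.hasInstanceSchedule {
		// no separate instance poll: reuse the data-poll records to
		// add/remove instances before parsing counters
		c.pollInstance(records)
	}
	fmt.Printf("%s: parse counters from %d records\n", c.object, len(records))
}

func (c *collector) pollInstance(records []string) {
	fmt.Printf("%s: reconcile instances from %d records\n", c.object, len(records))
}

func main() {
	qos := &collector{object: "qos"}
	qos.initSchedule([]task{{"instance"}, {"data"}}) // workload.yaml keeps both tasks

	volume := &collector{object: "volume"}
	volume.initSchedule([]task{{"counter"}, {"data"}}) // default.yaml no longer lists instance

	qos.pollData([]string{"w1", "w2"})    // instance discovery stays on its own schedule
	volume.pollData([]string{"v1", "v2"}) // instance discovery piggybacks on the data poll
}
```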
12 changes: 7 additions & 5 deletions cmd/collectors/restperf/restperf_test.go
@@ -84,7 +84,8 @@ func TestMain(m *testing.M) {
propertiesData = jsonToPerfRecords("testdata/volume-poll-properties.json.gz")
fullPollData = jsonToPerfRecords("testdata/volume-poll-full.json.gz")
fullPollData[0].Timestamp = now.UnixNano()
_, _ = benchPerf.pollInstance(propertiesData[0].Records.Array(), 0)
mat := matrix.New("Volume", "Volume", "Volume")
_, _ = benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0)
_, _ = benchPerf.pollData(now, fullPollData)

os.Exit(m.Run())
@@ -98,7 +99,8 @@ func BenchmarkRestPerf_PollData(b *testing.B) {
for range b.N {
now = now.Add(time.Minute * 15)
fullPollData[0].Timestamp = now.UnixNano()
mi, _ := benchPerf.pollInstance(propertiesData[0].Records.Array(), 0)
mat := matrix.New("Volume", "Volume", "Volume")
mi, _ := benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0)
for _, mm := range mi {
ms = append(ms, mm)
}
@@ -166,7 +168,7 @@ func TestRestPerf_pollData(t *testing.T) {
}
pollInstance := jsonToPerfRecords(tt.pollInstance)
pollData := jsonToPerfRecords(tt.pollDataPath1)
_, err = r.pollInstance(pollInstance[0].Records.Array(), 0)
_, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInstance), 0)
if err != nil {
t.Fatal(err)
}
@@ -295,7 +297,7 @@ func TestPartialAggregationSequence(t *testing.T) {
t.Fatalf("Failed to fetch poll counter %v", err)
}
pollInstance := jsonToPerfRecords("testdata/partialAggregation/qos-poll-instance.json")
_, err = r.pollInstance(pollInstance[0].Records.Array(), 0)
_, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInstance), 0)
if err != nil {
t.Fatal(err)
}
@@ -424,7 +426,7 @@ func TestQosVolume(t *testing.T) {
}

pollInst := jsonToPerfRecords(tt.pollInstance)
_, err = r.pollInstance(pollInst[0].Records.Array(), 0)
_, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInst), 0)
if err != nil {
t.Fatal(err)
}
1 change: 1 addition & 0 deletions conf/restperf/9.12.0/workload.yaml
@@ -8,6 +8,7 @@ object: qos
client_timeout: 1m30s

schedule:
- instance: 10m
- data: 3m

counters:
1 change: 1 addition & 0 deletions conf/restperf/9.12.0/workload_detail.yaml
@@ -9,6 +9,7 @@ object: qos_detail
client_timeout: 1m30s

schedule:
- instance: 10m
- data: 3m

counters:
1 change: 1 addition & 0 deletions conf/restperf/9.12.0/workload_detail_volume.yaml
@@ -8,6 +8,7 @@ object: qos_detail
client_timeout: 1m30s

schedule:
- instance: 10m
- data: 3m

counters:
1 change: 1 addition & 0 deletions conf/restperf/9.12.0/workload_volume.yaml
@@ -10,6 +10,7 @@ object: qos
client_timeout: 1m30s

schedule:
- instance: 10m
- data: 3m

counters:
1 change: 0 additions & 1 deletion conf/restperf/default.yaml
@@ -4,7 +4,6 @@ collector: RestPerf
# Order here matters!
schedule:
- counter: 24h
- instance: 10m
- data: 1m

objects:
2 changes: 1 addition & 1 deletion integration/test/grafana/grafana_mgr.go
@@ -36,7 +36,7 @@ func (g *Mgr) Import() (bool, string) {
if docker.IsDockerBasedPoller() {
grafanaURL = "grafana:3000"
}
importCmds := []string{"grafana", "import", "--overwrite", "--addr", grafanaURL}
importCmds := []string{"grafana", "import", "--addr", grafanaURL}
if docker.IsDockerBasedPoller() {
params := []string{"exec", containerIDs[0].ID, "bin/harvest"}
params = append(params, importCmds...)