Skip to content

Commit

Permalink
perf: add streaming to Rest collector (#3305)
Browse files Browse the repository at this point in the history
* perf: add streaming to Rest collector
  • Loading branch information
rahulguptajss authored Nov 19, 2024
1 parent ef244dc commit 2d6fe66
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 80 deletions.
10 changes: 8 additions & 2 deletions cmd/collectors/keyperf/keyperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/netapp/harvest/v2/pkg/conf"
"github.com/netapp/harvest/v2/pkg/errs"
"github.com/netapp/harvest/v2/pkg/matrix"
"github.com/netapp/harvest/v2/pkg/set"
"github.com/netapp/harvest/v2/pkg/slogx"
"github.com/tidwall/gjson"
"log/slog"
Expand Down Expand Up @@ -302,6 +303,11 @@ func (kp *KeyPerf) pollData(
)

prevMat = kp.Matrix[kp.Object]
// Track old instances before processing batches
oldInstances := set.New()
for key := range prevMat.GetInstances() {
oldInstances.Add(key)
}

// clone matrix without numeric data
curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
Expand All @@ -314,10 +320,10 @@ func (kp *KeyPerf) pollData(
if len(perfRecords) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+kp.Object+" instances on cluster")
}
count, numPartials = kp.HandleResults(curMat, perfRecords, kp.Prop, false)
count, numPartials = kp.HandleResults(curMat, perfRecords, kp.Prop, false, oldInstances)

// process endpoints
eCount, endpointAPID := kp.ProcessEndPoints(curMat, endpointFunc)
eCount, endpointAPID := kp.ProcessEndPoints(curMat, endpointFunc, oldInstances)
count += eCount

parseD = time.Since(startTime)
Expand Down
113 changes: 56 additions & 57 deletions cmd/collectors/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,65 +380,79 @@ func (r *Rest) updateHref() {
}

func (r *Rest) PollData() (map[string]*matrix.Matrix, error) {

var (
startTime time.Time
err error
records []gjson.Result
apiD, parseD time.Duration
metricCount uint64
)

r.Matrix[r.Object].Reset()
r.Client.Metadata.Reset()

startTime = time.Now()

if records, err = r.GetRestData(r.Prop.Href); err != nil {
return nil, err
}

if len(records) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
// Track old instances before processing batches
oldInstances := set.New()
for key := range r.Matrix[r.Object].GetInstances() {
oldInstances.Add(key)
}

return r.pollData(startTime, records, func(e *EndPoint) ([]gjson.Result, time.Duration, error) {
return r.ProcessEndPoint(e)
})
}
processBatch := func(records []gjson.Result) error {
if len(records) == 0 {
return errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

func (r *Rest) pollData(
startTime time.Time,
records []gjson.Result,
endpointFunc func(e *EndPoint) ([]gjson.Result, time.Duration, error),
) (map[string]*matrix.Matrix, error) {
// Process the current batch of records
count, batchParseD := r.pollData(records, oldInstances)
metricCount += count
parseD += batchParseD
apiD -= batchParseD
return nil
}

var (
count uint64
apiD, parseD time.Duration
)
startTime := time.Now()
if err := rest.FetchAllStream(r.Client, r.Prop.Href, processBatch); err != nil {
return nil, err
}
apiD += time.Since(startTime)

apiD = time.Since(startTime)
startTime = time.Now()
mat := r.Matrix[r.Object]
// Process endpoints after all batches have been processed
eCount, endpointAPID := r.ProcessEndPoints(r.Matrix[r.Object], r.ProcessEndPoint, oldInstances)
metricCount += eCount
apiD += endpointAPID

count, _ = r.HandleResults(mat, records, r.Prop, false)
r.postPollData(apiD, parseD, metricCount, oldInstances)
return r.Matrix, nil
}

// process endpoints
eCount, endpointAPID := r.ProcessEndPoints(mat, endpointFunc)
count += eCount
parseD = time.Since(startTime)
// postPollData finalizes a poll: it removes instances that were present in
// the previous poll but not seen in any batch of this poll, then records
// poll metadata (API/parse timings, metric and instance counts, bytes
// received, and number of REST calls).
func (r *Rest) postPollData(apiD time.Duration, parseD time.Duration, metricCount uint64, oldInstances *set.Set) {
	// Remove old instances that are not found in new instances
	for key := range oldInstances.Iter() {
		r.Matrix[r.Object].RemoveInstance(key)
	}

	numRecords := len(r.Matrix[r.Object].GetInstances())

	_ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
	_ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
	_ = r.Metadata.LazySetValueUint64("metrics", "data", metricCount)
	_ = r.Metadata.LazySetValueUint64("instances", "data", uint64(numRecords))
	_ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx)
	_ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls)

	r.AddCollectCount(metricCount)
}

// pollData parses one batch of records into the collector's matrix and
// returns the number of metrics parsed along with the time spent parsing.
// Instance keys seen in the batch are removed from oldInstances by
// HandleResults; keys still present after all batches are treated as
// deleted instances later (see postPollData).
func (r *Rest) pollData(records []gjson.Result, oldInstances *set.Set) (uint64, time.Duration) {

	var (
		count  uint64
		parseD time.Duration
	)

	startTime := time.Now()
	mat := r.Matrix[r.Object]

	count, _ = r.HandleResults(mat, records, r.Prop, false, oldInstances)
	parseD = time.Since(startTime)

	return count, parseD
}

func (r *Rest) ProcessEndPoint(e *EndPoint) ([]gjson.Result, time.Duration, error) {
Expand All @@ -450,7 +464,7 @@ func (r *Rest) ProcessEndPoint(e *EndPoint) ([]gjson.Result, time.Duration, erro
return data, time.Since(now), nil
}

func (r *Rest) ProcessEndPoints(mat *matrix.Matrix, endpointFunc func(e *EndPoint) ([]gjson.Result, time.Duration, error)) (uint64, time.Duration) {
func (r *Rest) ProcessEndPoints(mat *matrix.Matrix, endpointFunc func(e *EndPoint) ([]gjson.Result, time.Duration, error), oldInstances *set.Set) (uint64, time.Duration) {
var (
err error
count uint64
Expand All @@ -475,7 +489,7 @@ func (r *Rest) ProcessEndPoints(mat *matrix.Matrix, endpointFunc func(e *EndPoin
r.Logger.Debug("no instances on cluster", slog.String("APIPath", endpoint.prop.Query))
continue
}
count, _ = r.HandleResults(mat, records, endpoint.prop, true)
count, _ = r.HandleResults(mat, records, endpoint.prop, true, oldInstances)
}

return count, totalAPID
Expand Down Expand Up @@ -531,21 +545,15 @@ func (r *Rest) LoadPlugin(kind string, abc *plugin.AbstractPlugin) plugin.Plugin

// HandleResults function is used for handling the rest response for parent as well as endpoints calls,
// isEndPoint would be true only for the endpoint call, and it can't create/delete instance.
func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *prop, isEndPoint bool) (uint64, uint64) {
func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *prop, isEndPoint bool, oldInstances *set.Set) (uint64, uint64) {
var (
err error
count uint64
numPartials uint64
)

oldInstances := set.New()
currentInstances := set.New()

// copy keys of current instances. This is used to remove deleted instances from matrix later
for key := range mat.GetInstances() {
oldInstances.Add(key)
}

for _, instanceData := range result {
var (
instanceKey string
Expand Down Expand Up @@ -591,11 +599,10 @@ func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *pr
} else {
currentInstances.Add(instanceKey)
}
oldInstances.Remove(instanceKey)

// clear all instance labels as there are some fields which may be missing between polls
// Don't remove instance labels when endpoints are being processed because endpoints uses parent instance only.
if !isEndPoint {
oldInstances.Remove(instanceKey)
instance.ClearLabels()
}
for label, display := range prop.InstanceLabels {
Expand Down Expand Up @@ -671,14 +678,6 @@ func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *pr
}
}

// Used for parent as we don't want to remove instances for endpoints
if !isEndPoint {
// remove deleted instances
for key := range oldInstances.Iter() {
mat.RemoveInstance(key)
}
}

return count, numPartials
}

Expand Down
30 changes: 9 additions & 21 deletions cmd/collectors/rest/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/netapp/harvest/v2/cmd/poller/options"
"github.com/netapp/harvest/v2/pkg/conf"
"github.com/netapp/harvest/v2/pkg/matrix"
"github.com/netapp/harvest/v2/pkg/set"
"github.com/netapp/harvest/v2/pkg/util"
"github.com/tidwall/gjson"
"os"
Expand Down Expand Up @@ -73,31 +74,18 @@ func TestMain(m *testing.M) {

benchRest = newRest("Volume", "volume.yaml", "testdata/conf")
fullPollData = collectors.JSONToGson("testdata/volume-1.json.gz", true)
now := time.Now().Truncate(time.Second)
_, _ = benchRest.pollData(now, fullPollData, volumeEndpoints)
_, _ = benchRest.pollData(fullPollData, set.New())

os.Exit(m.Run())
}

// BenchmarkRestPerf_PollData measures the cost of parsing the pre-loaded
// volume fixture through the batch parsing path. A fresh (empty) old-instance
// set is passed per iteration so no instance removal is triggered.
func BenchmarkRestPerf_PollData(b *testing.B) {
	for range b.N {
		_, _ = benchRest.pollData(fullPollData, set.New())
	}
}

Expand All @@ -121,14 +109,14 @@ func Test_pollDataVolume(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {

r := newRest("Volume", "volume.yaml", "testdata/conf")
now := time.Now().Truncate(time.Second)
pollData := collectors.JSONToGson(tt.pollDataPath1, true)

mm, err := r.pollData(now, pollData, volumeEndpoints)
if err != nil {
t.Fatal(err)
}
m := mm["Volume"]
mcount, parseD := r.pollData(pollData, set.New())
mecount, apiD := r.ProcessEndPoints(r.Matrix[r.Object], volumeEndpoints, set.New())

metricCount := mcount + mecount
r.postPollData(apiD, parseD, metricCount, set.New())
m := r.Matrix["Volume"]

if len(m.GetInstances()) != tt.numInstances {
t.Errorf("pollData() numInstances got=%v, want=%v", len(m.GetInstances()), tt.numInstances)
Expand Down
50 changes: 50 additions & 0 deletions cmd/tools/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,56 @@ func FetchAnalytics(client *Client, href string) ([]gjson.Result, gjson.Result,
return result, *analytics, nil
}

// FetchAllStream follows the paginated REST response starting at href and
// invokes processBatch once per page of "records". When the response carries
// no "records" field, the whole payload is wrapped as a single record and
// handed to processBatch. It stops when there is no next link, or when the
// next link repeats (no forward progress), and returns the first error from
// the transport or from processBatch.
func FetchAllStream(client *Client, href string, processBatch func([]gjson.Result) error, headers ...map[string]string) error {
	nextLink := href

	for nextLink != "" {
		response, err := client.GetRest(nextLink, headers...)
		if err != nil {
			return fmt.Errorf("error making request %w", err)
		}

		parsed := gjson.ParseBytes(response)
		data := parsed.Get("records")

		if !data.Exists() {
			// Unpaginated payload: wrap the entire response as one record.
			raw, err := sjson.SetRawBytes([]byte(`{"records":[]}`), "records.-1", response)
			if err != nil {
				return fmt.Errorf("error setting record %w", err)
			}
			var batch []gjson.Result
			batch = append(batch, gjson.GetBytes(raw, "records").Array()...)
			// Process the current batch of records
			if err := processBatch(batch); err != nil {
				return err
			}
			return nil
		}

		if parsed.Get("num_records").Int() > 0 {
			// Process the current batch of records
			if err := processBatch(data.Array()); err != nil {
				return err
			}
		}

		next := parsed.Get("_links.next.href").String()
		if next == nextLink {
			// nextLink repeats the current link: no progress is being made, exit
			return nil
		}
		nextLink = next
	}

	return nil
}

func fetchAll(client *Client, href string, records *[]gjson.Result, headers ...map[string]string) error {

var prevLink string
Expand Down

0 comments on commit 2d6fe66

Please sign in to comment.