Skip to content

Commit

Permalink
feat: Harvest Rest and RestPerf collectors should support batching (#…
Browse files Browse the repository at this point in the history
…3195)

Fixes: #3192
  • Loading branch information
cgrinds authored Oct 7, 2024
1 parent 5f6ef52 commit 2cc9329
Show file tree
Hide file tree
Showing 24 changed files with 184 additions and 67 deletions.
9 changes: 3 additions & 6 deletions cmd/collectors/commonutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func InvokeRestCallWithTestFile(client *rest.Client, href string, logger *slog.L
}

func InvokeRestCall(client *rest.Client, href string, logger *slog.Logger) ([]gjson.Result, error) {
result, err := rest.Fetch(client, href)
result, err := rest.FetchAll(client, href)
if err != nil {
logger.Error(
"Failed to fetch data",
Expand Down Expand Up @@ -101,20 +101,17 @@ func GetClusterTime(client *rest.Client, returnTimeOut *int, logger *slog.Logger
query := "private/cli/cluster/date"
fields := []string{"date"}

maxRecords := 1

href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(&maxRecords).
ReturnTimeout(returnTimeOut).
Build()

if records, err = rest.Fetch(client, href); err != nil {
if records, err = rest.FetchSome(client, href, 1, DefaultBatchSize); err != nil {
return clusterTime, err
}
if len(records) == 0 {
return clusterTime, errs.New(errs.ErrConfig, " date not found on cluster")
return clusterTime, errs.New(errs.ErrConfig, "date not found on cluster")
}

for _, instanceData := range records {
Expand Down
2 changes: 2 additions & 0 deletions cmd/collectors/ems/ems.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func (e *Ems) PollInstance() (map[string]*matrix.Matrix, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
ReturnTimeout(e.ReturnTimeOut).
Build()

Expand Down Expand Up @@ -436,6 +437,7 @@ func (e *Ems) getHref(names []string, filter []string) string {
APIPath(e.Query).
Fields(e.Fields).
Filter(filter).
MaxRecords(collectors.DefaultBatchSize).
ReturnTimeout(e.ReturnTimeOut).
Build()
return href
Expand Down
3 changes: 2 additions & 1 deletion cmd/collectors/power.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ func collectChassisFRU(client *rest.Client, logger *slog.Logger) (map[string]int
APIPath(query).
Fields(fields).
Filter(filter).
MaxRecords(DefaultBatchSize).
Build()

result, err := rest.Fetch(client, href)
result, err := rest.FetchAll(client, href)
if err != nil {
return nil, fmt.Errorf("failed to fetch data href=%s err=%w", href, err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/collectors/rest/plugins/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (a *Aggregate) getObjectStoreData() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(apiQuery).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{`tier_name=!" "|""`}).
Build()

Expand Down
2 changes: 2 additions & 0 deletions cmd/collectors/rest/plugins/certificate/certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (c *Certificate) GetAdminVserver() (string, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields([]string{"type"}).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"type=admin"}).
Build()

Expand All @@ -221,6 +222,7 @@ func (c *Certificate) GetSecuritySsl(adminSvm string) (string, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields([]string{"serial"}).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"vserver=" + adminSvm}).
Build()

Expand Down
11 changes: 11 additions & 0 deletions cmd/collectors/rest/plugins/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ func (h *Health) getDisks() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"container_type=broken|unassigned"}).
Build()

Expand All @@ -652,6 +653,7 @@ func (h *Health) getShelves() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Build()

return collectors.InvokeRestCall(h.client, href, h.SLogger)
Expand All @@ -663,6 +665,7 @@ func (h *Health) getNodes() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"health=false"}).
Build()

Expand All @@ -675,6 +678,7 @@ func (h *Health) getHADown() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"possible=!true"}).
Build()

Expand All @@ -685,6 +689,7 @@ func (h *Health) getRansomwareVolumes() ([]gjson.Result, error) {
query := "api/storage/volumes"
href := rest.NewHrefBuilder().
APIPath(query).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"anti_ransomware.state=enabled", "anti_ransomware.attack_probability=low|moderate|high"}).
Build()

Expand All @@ -697,6 +702,7 @@ func (h *Health) getNonCompliantLicense() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"state=noncompliant"}).
Build()

Expand All @@ -709,6 +715,7 @@ func (h *Health) getMoveFailedVolumes() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"movement.state=cutover_wait|failed|cutover_pending"}).
Build()

Expand All @@ -720,6 +727,7 @@ func (h *Health) getNonHomeLIFs() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields([]string{"svm", "location"}).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"location.is_home=false"}).
Build()

Expand All @@ -732,6 +740,7 @@ func (h *Health) getFCPorts() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"enabled=true", "state=offlined_by_system"}).
Build()

Expand All @@ -744,6 +753,7 @@ func (h *Health) getEthernetPorts() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"enabled=true", "state=down"}).
Build()

Expand All @@ -754,6 +764,7 @@ func (h *Health) getSupportAlerts(filter []string) ([]gjson.Result, error) {
query := "api/private/support/alerts"
href := rest.NewHrefBuilder().
APIPath(query).
MaxRecords(collectors.DefaultBatchSize).
Filter(filter).
Build()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (o *OntapS3Service) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matri
href := rest.NewHrefBuilder().
APIPath(o.query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Build()

if result, err = collectors.InvokeRestCall(o.client, href, o.SLogger); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (s *SecurityAccount) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matr
href := rest.NewHrefBuilder().
APIPath(s.query).
Fields([]string{"applications"}).
MaxRecords(collectors.DefaultBatchSize).
Build()

s.client.Metadata.Reset()
Expand Down
6 changes: 4 additions & 2 deletions cmd/collectors/rest/plugins/snapmirror/snapmirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ func (m *SnapMirror) getSVMPeerData(cluster string) error {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"peer.cluster.name=!" + cluster}).
Build()

result, err := rest.Fetch(m.client, href)
result, err := rest.FetchAll(m.client, href)
if err != nil {
m.SLogger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href))
return err
Expand Down Expand Up @@ -155,9 +156,10 @@ func (m *SnapMirror) getClusterPeerData() error {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Build()

result, err := rest.Fetch(m.client, href)
result, err := rest.FetchAll(m.client, href)
if err != nil {
m.SLogger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href))
return err
Expand Down
4 changes: 4 additions & 0 deletions cmd/collectors/rest/plugins/svm/svm.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (s *SVM) GetKerberosConfig() (map[string]string, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Build()

if result, err = collectors.InvokeRestCall(s.client, href, s.SLogger); err != nil {
Expand Down Expand Up @@ -249,6 +250,7 @@ func (s *SVM) GetFpolicy() (map[string]Fpolicy, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Build()

if result, err = collectors.InvokeRestCall(s.client, href, s.SLogger); err != nil {
Expand Down Expand Up @@ -283,6 +285,7 @@ func (s *SVM) GetIscsiServices() (map[string]string, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Build()

if result, err = collectors.InvokeRestCall(s.client, href, s.SLogger); err != nil {
Expand Down Expand Up @@ -316,6 +319,7 @@ func (s *SVM) GetIscsiCredentials() (map[string]string, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Build()

if result, err = collectors.InvokeRestCall(s.client, href, s.SLogger); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/collectors/rest/plugins/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (v *Volume) getEncryptedDisks() ([]gjson.Result, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{"protection_mode=!data|full"}).
Build()

Expand Down Expand Up @@ -295,6 +296,7 @@ func (v *Volume) getVolume(field string, fields []string, volumeMap map[string]v
href := rest.NewHrefBuilder().
APIPath(query).
Fields(fields).
MaxRecords(collectors.DefaultBatchSize).
Filter([]string{field}).
Build()

Expand Down
10 changes: 5 additions & 5 deletions cmd/collectors/rest/plugins/volumeanalytics/volumeanalytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

const explorer = "volume_analytics"

var MaxDirCollectCount = 100
var maxDirCollectCount = "100"

type VolumeAnalytics struct {
*plugin.AbstractPlugin
Expand Down Expand Up @@ -52,11 +52,11 @@ func (v *VolumeAnalytics) Init() error {
m := v.Params.GetChildS("MaxDirectoryCount")
if m != nil {
count := m.GetContentS()
i, err := strconv.Atoi(count)
_, err := strconv.Atoi(count)
if err != nil {
v.SLogger.Warn("using default", slog.String("MaxDirectoryCount", count))
} else {
MaxDirCollectCount = i
maxDirCollectCount = count
}
}

Expand All @@ -70,7 +70,7 @@ func (v *VolumeAnalytics) Init() error {
return err
}

// Assigned the value to currentVal so that plugin would be invoked first time to populate cache.
// Assigned the value to currentVal so that plugin would be invoked the first time to populate cache.
v.currentVal = v.SetPluginInterval()
return nil
}
Expand Down Expand Up @@ -310,7 +310,7 @@ func (v *VolumeAnalytics) getAnalyticsData(instanceID string) ([]gjson.Result, g
APIPath(query).
Fields(fields).
Filter([]string{"order_by=analytics.bytes_used+desc", "type=directory"}).
MaxRecords(&MaxDirCollectCount).
MaxRecords(maxDirCollectCount).
Build()
if result, analytics, err = rest.FetchAnalytics(v.client, href); err != nil {
return nil, gjson.Result{}, err
Expand Down
8 changes: 6 additions & 2 deletions cmd/collectors/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Rest struct {
Prop *prop
endpoints []*EndPoint
isIgnoreUnknownFieldsEnabled bool
BatchSize string
}

type EndPoint struct {
Expand Down Expand Up @@ -323,7 +324,7 @@ func getFieldName(source string, parent string) []string {
func (r *Rest) PollCounter() (map[string]*matrix.Matrix, error) {

startTime := time.Now()
// Update the cluster info to track if customer version is updated
// Update the cluster info to track if ONTAP version is updated
err := r.Client.UpdateClusterInfo(5)
if err != nil {
return nil, err
Expand Down Expand Up @@ -356,6 +357,7 @@ func (r *Rest) updateHref() {
Fields(r.Fields(r.Prop)).
HiddenFields(r.Prop.HiddenFields).
Filter(r.Prop.Filter).
MaxRecords(r.BatchSize).
ReturnTimeout(r.Prop.ReturnTimeOut).
IsIgnoreUnknownFieldsEnabled(r.isIgnoreUnknownFieldsEnabled).
Build()
Expand All @@ -366,6 +368,7 @@ func (r *Rest) updateHref() {
Fields(r.Fields(e.prop)).
HiddenFields(e.prop.HiddenFields).
Filter(r.filter(e)).
MaxRecords(r.BatchSize).
ReturnTimeout(r.Prop.ReturnTimeOut).
IsIgnoreUnknownFieldsEnabled(r.isIgnoreUnknownFieldsEnabled).
Build()
Expand Down Expand Up @@ -677,7 +680,7 @@ func (r *Rest) GetRestData(href string) ([]gjson.Result, error) {
return nil, errs.New(errs.ErrConfig, "empty url")
}

result, err := rest.Fetch(r.Client, href)
result, err := rest.FetchAll(r.Client, href)
if err != nil {
return r.handleError(err)
}
Expand Down Expand Up @@ -783,6 +786,7 @@ func (r *Rest) getNodeUuids() ([]collector.ID, error) {
href := rest.NewHrefBuilder().
APIPath(query).
Fields([]string{"serial_number", "system_id"}).
MaxRecords(collectors.DefaultBatchSize).
ReturnTimeout(r.Prop.ReturnTimeOut).
Build()

Expand Down
12 changes: 11 additions & 1 deletion cmd/collectors/rest/templating.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rest

import (
"fmt"
"github.com/netapp/harvest/v2/cmd/collectors"
"github.com/netapp/harvest/v2/pkg/errs"
"github.com/netapp/harvest/v2/pkg/tree/node"
"github.com/netapp/harvest/v2/pkg/util"
Expand Down Expand Up @@ -61,7 +62,16 @@ func (r *Rest) InitCache() error {
}
}

// private end point do not support * as fields. We need to pass fields in endpoint
if b := r.Params.GetChildContentS("batch_size"); b != "" {
if _, err := strconv.Atoi(b); err == nil {
r.BatchSize = b
}
}
if r.BatchSize == "" && r.Params.GetChildContentS("no_max_records") != "true" {
r.BatchSize = collectors.DefaultBatchSize
}

// Private end points do not support * as fields. We need to pass fields in endpoint
query := r.Params.GetChildS("query")
r.Prop.IsPublic = true
if query != nil {
Expand Down
Loading

0 comments on commit 2cc9329

Please sign in to comment.