From 2cc9329a79394990592aa8b20c0bc9741f1eebe6 Mon Sep 17 00:00:00 2001 From: Chris Grindstaff Date: Mon, 7 Oct 2024 06:14:23 -0400 Subject: [PATCH] feat: Harvest Rest and RestPerf collectors should support batching (#3195) Fixes: #3192 --- cmd/collectors/commonutils.go | 9 +- cmd/collectors/ems/ems.go | 2 + cmd/collectors/power.go | 3 +- .../rest/plugins/aggregate/aggregate.go | 1 + .../rest/plugins/certificate/certificate.go | 2 + cmd/collectors/rest/plugins/health/health.go | 11 ++ .../plugins/ontaps3service/ontaps3service.go | 1 + .../securityaccount/securityaccount.go | 1 + .../rest/plugins/snapmirror/snapmirror.go | 6 +- cmd/collectors/rest/plugins/svm/svm.go | 4 + cmd/collectors/rest/plugins/volume/volume.go | 2 + .../volumeanalytics/volumeanalytics.go | 10 +- cmd/collectors/rest/rest.go | 8 +- cmd/collectors/rest/templating.go | 12 +- cmd/collectors/restperf/plugins/disk/disk.go | 10 +- cmd/collectors/restperf/plugins/fcvi/fcvi.go | 4 +- cmd/collectors/restperf/plugins/nic/nic.go | 1 + .../restperf/plugins/volumetag/volumetag.go | 4 +- .../volumetopclients/volumetopclients.go | 2 + cmd/collectors/restperf/restperf.go | 9 +- cmd/tools/generate/counter.go | 2 +- cmd/tools/rest/href_builder.go | 15 ++- cmd/tools/rest/rest.go | 124 ++++++++++++++---- integration/go.sum | 8 -- 24 files changed, 184 insertions(+), 67 deletions(-) diff --git a/cmd/collectors/commonutils.go b/cmd/collectors/commonutils.go index 382f05d86..59b1a117c 100644 --- a/cmd/collectors/commonutils.go +++ b/cmd/collectors/commonutils.go @@ -72,7 +72,7 @@ func InvokeRestCallWithTestFile(client *rest.Client, href string, logger *slog.L } func InvokeRestCall(client *rest.Client, href string, logger *slog.Logger) ([]gjson.Result, error) { - result, err := rest.Fetch(client, href) + result, err := rest.FetchAll(client, href) if err != nil { logger.Error( "Failed to fetch data", @@ -101,20 +101,17 @@ func GetClusterTime(client *rest.Client, returnTimeOut *int, logger *slog.Logger query := "private/cli/cluster/date" fields := []string{"date"} - maxRecords := 1 - href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). - MaxRecords(&maxRecords). ReturnTimeout(returnTimeOut). Build() - if records, err = rest.Fetch(client, href); err != nil { + if records, err = rest.FetchSome(client, href, 1, DefaultBatchSize); err != nil { return clusterTime, err } if len(records) == 0 { - return clusterTime, errs.New(errs.ErrConfig, " date not found on cluster") + return clusterTime, errs.New(errs.ErrConfig, "date not found on cluster") } for _, instanceData := range records { diff --git a/cmd/collectors/ems/ems.go b/cmd/collectors/ems/ems.go index 3fb200089..d210e6c2f 100644 --- a/cmd/collectors/ems/ems.go +++ b/cmd/collectors/ems/ems.go @@ -294,6 +294,7 @@ func (e *Ems) PollInstance() (map[string]*matrix.Matrix, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). ReturnTimeout(e.ReturnTimeOut). Build() @@ -436,6 +437,7 @@ func (e *Ems) getHref(names []string, filter []string) string { APIPath(e.Query). Fields(e.Fields). Filter(filter). + MaxRecords(collectors.DefaultBatchSize). ReturnTimeout(e.ReturnTimeOut). Build() return href diff --git a/cmd/collectors/power.go b/cmd/collectors/power.go index 75efd68c2..20d616c69 100644 --- a/cmd/collectors/power.go +++ b/cmd/collectors/power.go @@ -33,9 +33,10 @@ func collectChassisFRU(client *rest.Client, logger *slog.Logger) (map[string]int APIPath(query). Fields(fields). Filter(filter). + MaxRecords(DefaultBatchSize). Build() - result, err := rest.Fetch(client, href) + result, err := rest.FetchAll(client, href) if err != nil { return nil, fmt.Errorf("failed to fetch data href=%s err=%w", href, err) } diff --git a/cmd/collectors/rest/plugins/aggregate/aggregate.go b/cmd/collectors/rest/plugins/aggregate/aggregate.go index f8cddadf9..8ede0b186 100644 --- a/cmd/collectors/rest/plugins/aggregate/aggregate.go +++ b/cmd/collectors/rest/plugins/aggregate/aggregate.go @@ -172,6 +172,7 @@ func (a *Aggregate) getObjectStoreData() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(apiQuery). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{`tier_name=!" "|""`}). Build() diff --git a/cmd/collectors/rest/plugins/certificate/certificate.go b/cmd/collectors/rest/plugins/certificate/certificate.go index 21689c853..9e17a8b05 100644 --- a/cmd/collectors/rest/plugins/certificate/certificate.go +++ b/cmd/collectors/rest/plugins/certificate/certificate.go @@ -195,6 +195,7 @@ func (c *Certificate) GetAdminVserver() (string, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields([]string{"type"}). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"type=admin"}). Build() @@ -221,6 +222,7 @@ func (c *Certificate) GetSecuritySsl(adminSvm string) (string, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields([]string{"serial"}). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"vserver=" + adminSvm}). Build() diff --git a/cmd/collectors/rest/plugins/health/health.go b/cmd/collectors/rest/plugins/health/health.go index 47e30e5f5..2c641b46f 100644 --- a/cmd/collectors/rest/plugins/health/health.go +++ b/cmd/collectors/rest/plugins/health/health.go @@ -640,6 +640,7 @@ func (h *Health) getDisks() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"container_type=broken|unassigned"}). Build() @@ -652,6 +653,7 @@ func (h *Health) getShelves() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() return collectors.InvokeRestCall(h.client, href, h.SLogger) @@ -663,6 +665,7 @@ func (h *Health) getNodes() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"health=false"}). Build() @@ -675,6 +678,7 @@ func (h *Health) getHADown() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"possible=!true"}). Build() @@ -685,6 +689,7 @@ func (h *Health) getRansomwareVolumes() ([]gjson.Result, error) { query := "api/storage/volumes" href := rest.NewHrefBuilder(). APIPath(query). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"anti_ransomware.state=enabled", "anti_ransomware.attack_probability=low|moderate|high"}). Build() @@ -697,6 +702,7 @@ func (h *Health) getNonCompliantLicense() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"state=noncompliant"}). Build() @@ -709,6 +715,7 @@ func (h *Health) getMoveFailedVolumes() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"movement.state=cutover_wait|failed|cutover_pending"}). Build() @@ -720,6 +727,7 @@ func (h *Health) getNonHomeLIFs() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields([]string{"svm", "location"}). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"location.is_home=false"}). Build() @@ -732,6 +740,7 @@ func (h *Health) getFCPorts() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"enabled=true", "state=offlined_by_system"}). Build() @@ -744,6 +753,7 @@ func (h *Health) getEthernetPorts() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"enabled=true", "state=down"}). Build() @@ -754,6 +764,7 @@ func (h *Health) getSupportAlerts(filter []string) ([]gjson.Result, error) { query := "api/private/support/alerts" href := rest.NewHrefBuilder(). APIPath(query). + MaxRecords(collectors.DefaultBatchSize). Filter(filter). Build() diff --git a/cmd/collectors/rest/plugins/ontaps3service/ontaps3service.go b/cmd/collectors/rest/plugins/ontaps3service/ontaps3service.go index 058a3577b..691119dfd 100644 --- a/cmd/collectors/rest/plugins/ontaps3service/ontaps3service.go +++ b/cmd/collectors/rest/plugins/ontaps3service/ontaps3service.go @@ -73,6 +73,7 @@ func (o *OntapS3Service) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matri href := rest.NewHrefBuilder(). APIPath(o.query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() if result, err = collectors.InvokeRestCall(o.client, href, o.SLogger); err != nil { diff --git a/cmd/collectors/rest/plugins/securityaccount/securityaccount.go b/cmd/collectors/rest/plugins/securityaccount/securityaccount.go index b0d9f54a4..e8cd71767 100644 --- a/cmd/collectors/rest/plugins/securityaccount/securityaccount.go +++ b/cmd/collectors/rest/plugins/securityaccount/securityaccount.go @@ -66,6 +66,7 @@ func (s *SecurityAccount) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matr href := rest.NewHrefBuilder(). APIPath(s.query). Fields([]string{"applications"}). + MaxRecords(collectors.DefaultBatchSize). Build() s.client.Metadata.Reset() diff --git a/cmd/collectors/rest/plugins/snapmirror/snapmirror.go b/cmd/collectors/rest/plugins/snapmirror/snapmirror.go index 433aee278..73e529983 100644 --- a/cmd/collectors/rest/plugins/snapmirror/snapmirror.go +++ b/cmd/collectors/rest/plugins/snapmirror/snapmirror.go @@ -124,10 +124,11 @@ func (m *SnapMirror) getSVMPeerData(cluster string) error { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"peer.cluster.name=!" + cluster}). Build() - result, err := rest.Fetch(m.client, href) + result, err := rest.FetchAll(m.client, href) if err != nil { m.SLogger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href)) return err @@ -155,9 +156,10 @@ func (m *SnapMirror) getClusterPeerData() error { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() - result, err := rest.Fetch(m.client, href) + result, err := rest.FetchAll(m.client, href) if err != nil { m.SLogger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href)) return err diff --git a/cmd/collectors/rest/plugins/svm/svm.go b/cmd/collectors/rest/plugins/svm/svm.go index ae02c15cd..34c5b62b7 100644 --- a/cmd/collectors/rest/plugins/svm/svm.go +++ b/cmd/collectors/rest/plugins/svm/svm.go @@ -216,6 +216,7 @@ func (s *SVM) GetKerberosConfig() (map[string]string, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() if result, err = collectors.InvokeRestCall(s.client, href, s.SLogger); err != nil { @@ -249,6 +250,7 @@ func (s *SVM) GetFpolicy() (map[string]Fpolicy, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() if result, err = collectors.InvokeRestCall(s.client, href, s.SLogger); err != nil { @@ -283,6 +285,7 @@ func (s *SVM) GetIscsiServices() (map[string]string, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() if result, err = collectors.InvokeRestCall(s.client, href, s.SLogger); err != nil { @@ -316,6 +319,7 @@ func (s *SVM) GetIscsiCredentials() (map[string]string, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() if result, err = collectors.InvokeRestCall(s.client, href, s.SLogger); err != nil { diff --git a/cmd/collectors/rest/plugins/volume/volume.go b/cmd/collectors/rest/plugins/volume/volume.go index c64135f4a..290001719 100644 --- a/cmd/collectors/rest/plugins/volume/volume.go +++ b/cmd/collectors/rest/plugins/volume/volume.go @@ -259,6 +259,7 @@ func (v *Volume) getEncryptedDisks() ([]gjson.Result, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"protection_mode=!data|full"}). Build() @@ -295,6 +296,7 @@ func (v *Volume) getVolume(field string, fields []string, volumeMap map[string]v href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{field}). Build() diff --git a/cmd/collectors/rest/plugins/volumeanalytics/volumeanalytics.go b/cmd/collectors/rest/plugins/volumeanalytics/volumeanalytics.go index 6b5edf5ca..6d0182f7e 100644 --- a/cmd/collectors/rest/plugins/volumeanalytics/volumeanalytics.go +++ b/cmd/collectors/rest/plugins/volumeanalytics/volumeanalytics.go @@ -18,7 +18,7 @@ import ( const explorer = "volume_analytics" -var MaxDirCollectCount = 100 +var maxDirCollectCount = "100" type VolumeAnalytics struct { *plugin.AbstractPlugin @@ -52,11 +52,11 @@ func (v *VolumeAnalytics) Init() error { m := v.Params.GetChildS("MaxDirectoryCount") if m != nil { count := m.GetContentS() - i, err := strconv.Atoi(count) + _, err := strconv.Atoi(count) if err != nil { v.SLogger.Warn("using default", slog.String("MaxDirectoryCount", count)) } else { - MaxDirCollectCount = i + maxDirCollectCount = count } } @@ -70,7 +70,7 @@ func (v *VolumeAnalytics) Init() error { return err } - // Assigned the value to currentVal so that plugin would be invoked first time to populate cache. + // Assigned the value to currentVal so that plugin would be invoked the first time to populate cache. v.currentVal = v.SetPluginInterval() return nil } @@ -310,7 +310,7 @@ func (v *VolumeAnalytics) getAnalyticsData(instanceID string) ([]gjson.Result, g APIPath(query). Fields(fields). Filter([]string{"order_by=analytics.bytes_used+desc", "type=directory"}). - MaxRecords(&MaxDirCollectCount). + MaxRecords(maxDirCollectCount). Build() if result, analytics, err = rest.FetchAnalytics(v.client, href); err != nil { return nil, gjson.Result{}, err diff --git a/cmd/collectors/rest/rest.go b/cmd/collectors/rest/rest.go index 616affc2f..ab5564d7c 100644 --- a/cmd/collectors/rest/rest.go +++ b/cmd/collectors/rest/rest.go @@ -54,6 +54,7 @@ type Rest struct { Prop *prop endpoints []*EndPoint isIgnoreUnknownFieldsEnabled bool + BatchSize string } type EndPoint struct { @@ -323,7 +324,7 @@ func getFieldName(source string, parent string) []string { func (r *Rest) PollCounter() (map[string]*matrix.Matrix, error) { startTime := time.Now() - // Update the cluster info to track if customer version is updated + // Update the cluster info to track if ONTAP version is updated err := r.Client.UpdateClusterInfo(5) if err != nil { return nil, err @@ -356,6 +357,7 @@ func (r *Rest) updateHref() { Fields(r.Fields(r.Prop)). HiddenFields(r.Prop.HiddenFields). Filter(r.Prop.Filter). + MaxRecords(r.BatchSize). ReturnTimeout(r.Prop.ReturnTimeOut). IsIgnoreUnknownFieldsEnabled(r.isIgnoreUnknownFieldsEnabled). Build() @@ -366,6 +368,7 @@ func (r *Rest) updateHref() { Fields(r.Fields(e.prop)). HiddenFields(e.prop.HiddenFields). Filter(r.filter(e)). + MaxRecords(r.BatchSize). ReturnTimeout(r.Prop.ReturnTimeOut). IsIgnoreUnknownFieldsEnabled(r.isIgnoreUnknownFieldsEnabled). Build() @@ -677,7 +680,7 @@ func (r *Rest) GetRestData(href string) ([]gjson.Result, error) { return nil, errs.New(errs.ErrConfig, "empty url") } - result, err := rest.Fetch(r.Client, href) + result, err := rest.FetchAll(r.Client, href) if err != nil { return r.handleError(err) } @@ -783,6 +786,7 @@ func (r *Rest) getNodeUuids() ([]collector.ID, error) { href := rest.NewHrefBuilder(). APIPath(query). Fields([]string{"serial_number", "system_id"}). + MaxRecords(collectors.DefaultBatchSize). ReturnTimeout(r.Prop.ReturnTimeOut). Build() diff --git a/cmd/collectors/rest/templating.go b/cmd/collectors/rest/templating.go index 5b2cc9506..fc68a5dd4 100644 --- a/cmd/collectors/rest/templating.go +++ b/cmd/collectors/rest/templating.go @@ -2,6 +2,7 @@ package rest import ( "fmt" + "github.com/netapp/harvest/v2/cmd/collectors" "github.com/netapp/harvest/v2/pkg/errs" "github.com/netapp/harvest/v2/pkg/tree/node" "github.com/netapp/harvest/v2/pkg/util" @@ -61,7 +62,16 @@ func (r *Rest) InitCache() error { } } - // private end point do not support * as fields. We need to pass fields in endpoint + if b := r.Params.GetChildContentS("batch_size"); b != "" { + if _, err := strconv.Atoi(b); err == nil { + r.BatchSize = b + } + } + if r.BatchSize == "" && r.Params.GetChildContentS("no_max_records") != "true" { + r.BatchSize = collectors.DefaultBatchSize + } + + // Private end points do not support * as fields. We need to pass fields in endpoint query := r.Params.GetChildS("query") r.Prop.IsPublic = true if query != nil { diff --git a/cmd/collectors/restperf/plugins/disk/disk.go b/cmd/collectors/restperf/plugins/disk/disk.go index 3e5a2abdf..429ffa2d9 100644 --- a/cmd/collectors/restperf/plugins/disk/disk.go +++ b/cmd/collectors/restperf/plugins/disk/disk.go @@ -2,6 +2,7 @@ package disk import ( "context" + "github.com/netapp/harvest/v2/cmd/collectors" "github.com/netapp/harvest/v2/cmd/poller/plugin" "github.com/netapp/harvest/v2/cmd/tools/rest" "github.com/netapp/harvest/v2/pkg/conf" @@ -208,9 +209,10 @@ func (d *Disk) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, *util.M href := rest.NewHrefBuilder(). APIPath(d.query). Fields([]string{"*"}). + MaxRecords(collectors.DefaultBatchSize). Build() - records, err := rest.Fetch(d.client, href) + records, err := rest.FetchAll(d.client, href) if err != nil { d.SLogger.Error("Failed to fetch shelfData", slog.Any("err", err), slog.String("href", href)) return nil, nil, err @@ -531,10 +533,11 @@ func (d *Disk) getDisks() error { href := rest.NewHrefBuilder(). APIPath(query). + MaxRecords(collectors.DefaultBatchSize). Fields([]string{"name", "uid", "shelf.uid", "type", "aggregates"}). Build() - records, err := rest.Fetch(d.client, href) + records, err := rest.FetchAll(d.client, href) if err != nil { d.SLogger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href)) return err @@ -589,10 +592,11 @@ func (d *Disk) getAggregates() error { href := rest.NewHrefBuilder(). APIPath(query). + MaxRecords(collectors.DefaultBatchSize). Fields([]string{"aggregate", "composite", "node", "uses_shared_disks", "storage_type"}). Build() - records, err := rest.Fetch(d.client, href) + records, err := rest.FetchAll(d.client, href) if err != nil { d.SLogger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href)) return err diff --git a/cmd/collectors/restperf/plugins/fcvi/fcvi.go b/cmd/collectors/restperf/plugins/fcvi/fcvi.go index c8da1915b..8f75b6e73 100644 --- a/cmd/collectors/restperf/plugins/fcvi/fcvi.go +++ b/cmd/collectors/restperf/plugins/fcvi/fcvi.go @@ -1,6 +1,7 @@ package fcvi import ( + "github.com/netapp/harvest/v2/cmd/collectors" "github.com/netapp/harvest/v2/cmd/poller/plugin" "github.com/netapp/harvest/v2/cmd/tools/rest" "github.com/netapp/harvest/v2/pkg/conf" @@ -43,8 +44,9 @@ func (f *FCVI) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, *util.M href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() - records, err := rest.Fetch(f.client, href) + records, err := rest.FetchAll(f.client, href) if err != nil { f.SLogger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href)) return nil, nil, err diff --git a/cmd/collectors/restperf/plugins/nic/nic.go b/cmd/collectors/restperf/plugins/nic/nic.go index 6a0c14508..fbaca404c 100644 --- a/cmd/collectors/restperf/plugins/nic/nic.go +++ b/cmd/collectors/restperf/plugins/nic/nic.go @@ -237,6 +237,7 @@ func (n *Nic) getIfgroupInfo() map[string]string { href := rest.NewHrefBuilder(). APIPath(query). Fields(fields). + MaxRecords(collectors.DefaultBatchSize). Build() if ifgroupsData, err = collectors.InvokeRestCallWithTestFile(n.client, href, n.SLogger, n.testFilePath); err != nil { diff --git a/cmd/collectors/restperf/plugins/volumetag/volumetag.go b/cmd/collectors/restperf/plugins/volumetag/volumetag.go index 0fe499ba8..beb9875d8 100644 --- a/cmd/collectors/restperf/plugins/volumetag/volumetag.go +++ b/cmd/collectors/restperf/plugins/volumetag/volumetag.go @@ -1,6 +1,7 @@ package volumetag import ( + "github.com/netapp/harvest/v2/cmd/collectors" "github.com/netapp/harvest/v2/cmd/poller/plugin" "github.com/netapp/harvest/v2/cmd/tools/rest" "github.com/netapp/harvest/v2/pkg/conf" @@ -47,10 +48,11 @@ func (v *VolumeTag) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, *u href := rest.NewHrefBuilder(). APIPath(query). + MaxRecords(collectors.DefaultBatchSize). Fields([]string{"comment"}). Build() - records, err := rest.Fetch(v.client, href) + records, err := rest.FetchAll(v.client, href) if err != nil { v.SLogger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href)) return nil, nil, err diff --git a/cmd/collectors/restperf/plugins/volumetopclients/volumetopclients.go b/cmd/collectors/restperf/plugins/volumetopclients/volumetopclients.go index d229ff9bf..6bafaddc1 100644 --- a/cmd/collectors/restperf/plugins/volumetopclients/volumetopclients.go +++ b/cmd/collectors/restperf/plugins/volumetopclients/volumetopclients.go @@ -362,6 +362,7 @@ func (t *TopClients) fetchVolumesWithActivityTrackingEnabled() (*set.Set, error) href := rest.NewHrefBuilder(). APIPath(query). Fields([]string{"svm.name", "name"}). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"activity_tracking.state=on"}). Build() @@ -389,6 +390,7 @@ func (t *TopClients) fetchTopClients(volumes *set.Set, svms *set.Set, metric str href := rest.NewHrefBuilder(). APIPath(query). Fields([]string{"client_ip", "svm", "volume.name", metric}). + MaxRecords(collectors.DefaultBatchSize). Filter([]string{"top_metric=" + metric, "volume=" + strings.Join(volumes.Values(), "|"), "svm=" + strings.Join(svms.Values(), "|")}). Build() diff --git a/cmd/collectors/restperf/restperf.go b/cmd/collectors/restperf/restperf.go index a08e533fd..8586405fc 100644 --- a/cmd/collectors/restperf/restperf.go +++ b/cmd/collectors/restperf/restperf.go @@ -263,6 +263,7 @@ func (r *RestPerf) PollCounter() (map[string]*matrix.Matrix, error) { href := rest.NewHrefBuilder(). APIPath(r.Prop.Query). + MaxRecords(r.BatchSize). ReturnTimeout(r.Prop.ReturnTimeOut). Build() r.Logger.Debug("", slog.String("href", href)) @@ -273,7 +274,7 @@ func (r *RestPerf) PollCounter() (map[string]*matrix.Matrix, error) { apiT := time.Now() r.Client.Metadata.Reset() - records, err = rest.Fetch(r.Client, href) + records, err = rest.FetchAll(r.Client, href) if err != nil { return r.handleError(err, href) } @@ -710,6 +711,7 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) { APIPath(dataQuery). Fields([]string{"*"}). Filter(filter). + MaxRecords(r.BatchSize). ReturnTimeout(r.Prop.ReturnTimeOut). Build() @@ -1338,7 +1340,7 @@ func (r *RestPerf) getParentOpsCounters(data *matrix.Matrix) error { return errs.New(errs.ErrConfig, "empty url") } - records, err = rest.Fetch(r.Client, href) + records, err = rest.FetchAll(r.Client, href) if err != nil { r.Logger.Error("Failed to fetch data", slog.Any("err", err), slog.String("href", href)) return err @@ -1453,6 +1455,7 @@ func (r *RestPerf) PollInstance() (map[string]*matrix.Matrix, error) { APIPath(dataQuery). Fields([]string{fields}). Filter(filter). + MaxRecords(r.BatchSize). ReturnTimeout(r.Prop.ReturnTimeOut). Build() @@ -1463,7 +1466,7 @@ func (r *RestPerf) PollInstance() (map[string]*matrix.Matrix, error) { apiT := time.Now() r.Client.Metadata.Reset() - records, err = rest.Fetch(r.Client, href) + records, err = rest.FetchAll(r.Client, href) if err != nil { return r.handleError(err, href) } diff --git a/cmd/tools/generate/counter.go b/cmd/tools/generate/counter.go index 5ca7431b1..ffc86354b 100644 --- a/cmd/tools/generate/counter.go +++ b/cmd/tools/generate/counter.go @@ -979,7 +979,7 @@ func processRestPerfCounters(path string, client *rest.Client) map[string]Counte href := rest.NewHrefBuilder(). APIPath(model.Query). Build() - records, err = rest.Fetch(client, href) + records, err = rest.FetchAll(client, href) if err != nil { fmt.Printf("error while invoking api %+v\n", err) return nil diff --git a/cmd/tools/rest/href_builder.go b/cmd/tools/rest/href_builder.go index 50b3b797b..a1e15738f 100644 --- a/cmd/tools/rest/href_builder.go +++ b/cmd/tools/rest/href_builder.go @@ -17,7 +17,7 @@ type HrefBuilder struct { filter []string queryFields string queryValue string - maxRecords *int + maxRecords string returnTimeout *int isIgnoreUnknownFieldsEnabled bool } @@ -61,7 +61,7 @@ func (b *HrefBuilder) QueryValue(queryValue string) *HrefBuilder { return b } -func (b *HrefBuilder) MaxRecords(maxRecords *int) *HrefBuilder { +func (b *HrefBuilder) MaxRecords(maxRecords string) *HrefBuilder { b.maxRecords = maxRecords return b } @@ -119,13 +119,20 @@ func (b *HrefBuilder) Build() string { // Sort filters so that the href is deterministic slices.Sort(b.filter) + hasMaxRecords := false + for _, f := range b.filter { + if strings.Contains(f, "max_records") { + hasMaxRecords = true + } addArg(&href, "&", f) } addArg(&href, "&query_fields=", b.queryFields) addArg(&href, "&query=", b.queryValue) - if b.maxRecords != nil { - addArg(&href, "&max_records=", strconv.Itoa(*b.maxRecords)) + + // Only add max_records if a filter has not already added it + if !hasMaxRecords && b.maxRecords != "" { + addArg(&href, "&max_records=", b.maxRecords) } if b.returnTimeout != nil { addArg(&href, "&return_timeout=", strconv.Itoa(*b.returnTimeout)) diff --git a/cmd/tools/rest/rest.go b/cmd/tools/rest/rest.go index f237ebebd..b2a7658f8 100644 --- a/cmd/tools/rest/rest.go +++ b/cmd/tools/rest/rest.go @@ -197,11 +197,11 @@ func fetchData(poller *conf.Poller, timeout time.Duration) (*Results, error) { QueryValue(args.QueryValue) if args.MaxRecords != "" { - maxRecords, err := strconv.Atoi(args.MaxRecords) + _, err := strconv.Atoi(args.MaxRecords) if err != nil { return nil, fmt.Errorf("--max-records should be numeric %s", args.MaxRecords) } - hrefBuilder.MaxRecords(&maxRecords) + hrefBuilder.MaxRecords(args.MaxRecords) } href := hrefBuilder.Build() @@ -406,30 +406,16 @@ func FetchForCli(client *Client, href string, records *[]any, downloadAll bool, return nil } -// Fetch collects all records -func Fetch(client *Client, href string) ([]gjson.Result, error) { +// FetchAll collects all records. +// If you want to limit the number of records returned, use FetchSome. +func FetchAll(client *Client, href string) ([]gjson.Result, error) { var ( records []gjson.Result result []gjson.Result err error ) - downloadAll := true - maxRecords := 0 - if strings.Contains(href, "max_records") { - mr, err := util.GetQueryParam(href, "max_records") - if err != nil { - return nil, err - } - if mr != "" { - mri, err := strconv.Atoi(mr) - if err != nil { - return nil, err - } - maxRecords = mri - downloadAll = maxRecords == 0 - } - } - err = fetch(client, href, &records, downloadAll, int64(maxRecords)) + + err = fetchAll(client, href, &records) if err != nil { return nil, err } @@ -477,7 +463,7 @@ func FetchAnalytics(client *Client, href string) ([]gjson.Result, gjson.Result, return result, *analytics, nil } -func fetch(client *Client, href string, records *[]gjson.Result, downloadAll bool, maxRecords int64) error { +func fetchAll(client *Client, href string, records *[]gjson.Result) error { getRest, err := client.GetRest(href) if err != nil { return fmt.Errorf("error making request %w", err) @@ -500,23 +486,103 @@ func fetch(client *Client, href string, records *[]gjson.Result, downloadAll boo // extract returned records since paginated records need to be merged into a single lists if numRecords.Exists() && numRecords.Int() > 0 { *records = append(*records, data) - if !downloadAll { - maxRecords -= numRecords.Int() - if maxRecords <= 0 { + } + + // If all results are desired and there is a next link, follow it + if next.Exists() { + nextLink := next.String() + if nextLink != "" { + if nextLink == href { + // nextLink is the same as the previous link, no progress is being made, exit return nil } + err := fetchAll(client, nextLink, records) + if err != nil { + return err + } } } + } + return nil +} - // If all results are desired and there is a next link, follow it - if next.Exists() && downloadAll { +// FetchSome collects at most recordsWanted records, following pagination links as needed. +// Use batchSize to limit the number of records returned in a single response. +// If recordsWanted is -1, all records are collected. +func FetchSome(client *Client, href string, recordsWanted int, batchSize string) ([]gjson.Result, error) { + var ( + records []gjson.Result + result []gjson.Result + err error + ) + + // Set max_records to batchSize to limit the number of records returned in a single response. + // If recordsWanted is < batchSize, set batchSize to recordsWanted. + batch, _ := strconv.Atoi(batchSize) + if recordsWanted < batch { + batch = recordsWanted + } + + u, err := url.Parse(href) + if err != nil { + return nil, err + } + q := u.Query() + q.Set("max_records", strconv.Itoa(batch)) + encoded := q.Encode() + u.RawQuery = encoded + href = u.String() + + err = fetchLimit(client, href, &records, recordsWanted) + if err != nil { + return nil, err + } + for _, r := range records { + result = append(result, r.Array()...) + } + return result, nil +} + +func fetchLimit(client *Client, href string, records *[]gjson.Result, recordsWanted int) error { + getRest, err := client.GetRest(href) + if err != nil { + return fmt.Errorf("error making request %w", err) + } + + output := gjson.ParseBytes(getRest) + data := output.Get("records") + numRecords := output.Get("num_records") + next := output.Get("_links.next.href") + + if !data.Exists() { + contentJSON := `{"records":[]}` + response, err := sjson.SetRawBytes([]byte(contentJSON), "records.-1", getRest) + if err != nil { + return fmt.Errorf("error setting record %w", err) + } + value := gjson.GetBytes(response, "records") + *records = append(*records, value) + } else { + // extract returned records since paginated records need to be merged into a single lists + if numRecords.Exists() && numRecords.Int() > 0 { + *records = append(*records, data) + if recordsWanted != -1 { + recordsWanted -= int(numRecords.Int()) + if recordsWanted <= 0 { + return nil + } + } + } + + // Follow the next link + if next.Exists() { nextLink := next.String() if nextLink != "" { if nextLink == href { - // nextLink is same as previous link, no progress is being made, exit + // nextLink is the same as the previous link, no progress is being made, exit return nil } - err := fetch(client, nextLink, records, downloadAll, maxRecords) + err := fetchLimit(client, nextLink, records, recordsWanted) if err != nil { return err } diff --git a/integration/go.sum b/integration/go.sum index d9b6489aa..e44fa1391 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -1,5 +1,3 @@ -github.com/carlmjohnson/requests v0.23.5 h1:NPANcAofwwSuC6SIMwlgmHry2V3pLrSqRiSBKYbNHHA= -github.com/carlmjohnson/requests v0.23.5/go.mod h1:zG9P28thdRnN61aD7iECFhH5iGGKX2jIjKQD9kqYH+o= github.com/carlmjohnson/requests v0.24.2 h1:JDakhAmTIKL/qL/1P7Kkc2INGBJIkIFP6xUeUmPzLso= github.com/carlmjohnson/requests v0.24.2/go.mod h1:duYA/jDnyZ6f3xbcF5PpZ9N8clgopubP2nK5i6MVMhU= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -28,8 +26,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae h1:dIZY4ULFcto4tAFlj1FYZl8ztUZ13bdq+PLY+NOfbyI= -github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 h1:7UMa6KCCMjZEMDtTVdcGu0B1GmmC7QJKiCCjyTAWQy0= github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -51,8 +47,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94= -github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -66,8 +60,6 @@ github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYg github.com/tklauser/numcpus v0.8.0/go.mod h1:ZJZlAY+dmR4eut8epnzf0u/VwodKmryxR8txiloSqBE= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=