From 5c142bf0584ff802baf003221a9f988fc3ecf822 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Thu, 12 Oct 2023 09:43:00 -0400 Subject: [PATCH] Refactor snapshots collector (#789) - Move metric Desc to vars to aid in unused linter checks - Use new Collector interface - Add a util getURL func. Similar funcs are in most of the collectors. The only difference is typically what struct to unmarshal the JSON into. This func just returns []byte so that the caller can handle the unique data structure. This does not adjust the labels on the metrics. I want that to be handled separately to make sure the refactor itself matches existing behavior. Signed-off-by: Joe Adams --- CHANGELOG.md | 8 + collector/snapshots.go | 395 ++++++++++++++---------------------- collector/snapshots_test.go | 13 +- collector/util.go | 57 ++++++ main.go | 7 - 5 files changed, 227 insertions(+), 253 deletions(-) create mode 100644 collector/util.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cb10b4a..6d9af75a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## master / unreleased + +BREAKING CHANGES: + +The flag `--es.snapshots` has been renamed to `--collector.snapshots`. + +* [CHANGE] Rename --es.snapshots to --collector.snapshots #XXX + ## 1.6.0 / 2023-06-22 BREAKING CHANGES: diff --git a/collector/snapshots.go b/collector/snapshots.go index 9fc15216..cb6a894a 100644 --- a/collector/snapshots.go +++ b/collector/snapshots.go @@ -14,32 +14,17 @@ package collector import ( + "context" "encoding/json" "fmt" - "io" "net/http" "net/url" "path" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" ) -type snapshotMetric struct { - Type prometheus.ValueType - Desc *prometheus.Desc - Value func(snapshotStats SnapshotStatDataResponse) float64 - Labels func(repositoryName string, snapshotStats SnapshotStatDataResponse) []string -} - -type repositoryMetric struct { - Type prometheus.ValueType - Desc *prometheus.Desc - Value func(snapshotsStats SnapshotStatsResponse) float64 - Labels func(repositoryName string) []string -} - var ( defaultSnapshotLabels = []string{"repository", "state", "version"} defaultSnapshotLabelValues = func(repositoryName string, snapshotStats SnapshotStatDataResponse) []string { @@ -51,261 +36,195 @@ var ( } ) +var ( + numIndices = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_number_of_indices"), + "Number of indices in the last snapshot", + defaultSnapshotLabels, nil, + ) + snapshotStartTimestamp = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_start_time_timestamp"), + "Last snapshot start timestamp", + defaultSnapshotLabels, nil, + ) + snapshotEndTimestamp = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_end_time_timestamp"), + "Last snapshot end timestamp", + defaultSnapshotLabels, nil, + ) + snapshotNumFailures = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_number_of_failures"), + "Last snapshot number of failures", + defaultSnapshotLabels, nil, + ) + snapshotNumShards = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_total_shards"), + "Last snapshot total shards", + defaultSnapshotLabels, nil, + ) + snapshotFailedShards = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_failed_shards"), + "Last snapshot failed shards", + defaultSnapshotLabels, nil, + ) + snapshotSuccessfulShards = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_successful_shards"), + "Last snapshot successful shards", + defaultSnapshotLabels, nil, + ) + + numSnapshots = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "number_of_snapshots"), + "Number of snapshots in a repository", + defaultSnapshotRepositoryLabels, nil, + ) + oldestSnapshotTimestamp = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "oldest_snapshot_timestamp"), + "Timestamp of the oldest snapshot", + defaultSnapshotRepositoryLabels, nil, + ) + latestSnapshotTimestamp = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "latest_snapshot_timestamp_seconds"), + "Timestamp of the latest SUCCESS or PARTIAL snapshot", + defaultSnapshotRepositoryLabels, nil, + ) +) + +func init() { + registerCollector("snapshots", defaultDisabled, NewSnapshots) +} + // Snapshots information struct type Snapshots struct { logger log.Logger - client *http.Client - url *url.URL - - snapshotMetrics []*snapshotMetric - repositoryMetrics []*repositoryMetric + hc *http.Client + u *url.URL } // NewSnapshots defines Snapshots Prometheus metrics -func NewSnapshots(logger log.Logger, client *http.Client, url *url.URL) *Snapshots { +func NewSnapshots(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) { return &Snapshots{ logger: logger, - client: client, - url: url, - - snapshotMetrics: []*snapshotMetric{ - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_number_of_indices"), - "Number of indices in the last snapshot", - defaultSnapshotLabels, nil, - ), - Value: func(snapshotStats SnapshotStatDataResponse) float64 { - return float64(len(snapshotStats.Indices)) - }, - Labels: defaultSnapshotLabelValues, - }, - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_start_time_timestamp"), - "Last snapshot start timestamp", - defaultSnapshotLabels, nil, - ), - Value: func(snapshotStats SnapshotStatDataResponse) float64 { - return float64(snapshotStats.StartTimeInMillis / 1000) - }, - Labels: defaultSnapshotLabelValues, - }, - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_end_time_timestamp"), - "Last snapshot end timestamp", - defaultSnapshotLabels, nil, - ), - Value: func(snapshotStats SnapshotStatDataResponse) float64 { - return float64(snapshotStats.EndTimeInMillis / 1000) - }, - Labels: defaultSnapshotLabelValues, - }, - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_number_of_failures"), - "Last snapshot number of failures", - defaultSnapshotLabels, nil, - ), - Value: func(snapshotStats SnapshotStatDataResponse) float64 { - return float64(len(snapshotStats.Failures)) - }, - Labels: defaultSnapshotLabelValues, - }, - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_total_shards"), - "Last snapshot total shards", - defaultSnapshotLabels, nil, - ), - Value: func(snapshotStats SnapshotStatDataResponse) float64 { - return float64(snapshotStats.Shards.Total) - }, - Labels: defaultSnapshotLabelValues, - }, - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_failed_shards"), - "Last snapshot failed shards", - defaultSnapshotLabels, nil, - ), - Value: func(snapshotStats SnapshotStatDataResponse) float64 { - return float64(snapshotStats.Shards.Failed) - }, - Labels: defaultSnapshotLabelValues, - }, - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_successful_shards"), - "Last snapshot successful shards", - defaultSnapshotLabels, nil, - ), - Value: func(snapshotStats SnapshotStatDataResponse) float64 { - return float64(snapshotStats.Shards.Successful) - }, - Labels: defaultSnapshotLabelValues, - }, - }, - repositoryMetrics: []*repositoryMetric{ - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "number_of_snapshots"), - "Number of snapshots in a repository", - defaultSnapshotRepositoryLabels, nil, - ), - Value: func(snapshotsStats SnapshotStatsResponse) float64 { - return float64(len(snapshotsStats.Snapshots)) - }, - Labels: defaultSnapshotRepositoryLabelValues, - }, - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "oldest_snapshot_timestamp"), - "Timestamp of the oldest snapshot", - defaultSnapshotRepositoryLabels, nil, - ), - Value: func(snapshotsStats SnapshotStatsResponse) float64 { - if len(snapshotsStats.Snapshots) == 0 { - return 0 - } - return float64(snapshotsStats.Snapshots[0].StartTimeInMillis / 1000) - }, - Labels: defaultSnapshotRepositoryLabelValues, - }, - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "latest_snapshot_timestamp_seconds"), - "Timestamp of the latest SUCCESS or PARTIAL snapshot", - defaultSnapshotRepositoryLabels, nil, - ), - Value: func(snapshotsStats SnapshotStatsResponse) float64 { - for i := len(snapshotsStats.Snapshots) - 1; i >= 0; i-- { - var snap = snapshotsStats.Snapshots[i] - if snap.State == "SUCCESS" || snap.State == "PARTIAL" { - return float64(snap.StartTimeInMillis / 1000) - } - } - return 0 - }, - Labels: defaultSnapshotRepositoryLabelValues, - }, - }, - } + u: u, + hc: hc, + }, nil } -// Describe add Snapshots metrics descriptions -func (s *Snapshots) Describe(ch chan<- *prometheus.Desc) { - for _, metric := range s.snapshotMetrics { - ch <- metric.Desc - } - for _, metric := range s.repositoryMetrics { - ch <- metric.Desc - } - -} - -func (s *Snapshots) getAndParseURL(u *url.URL, data interface{}) error { - res, err := s.client.Get(u.String()) - if err != nil { - return fmt.Errorf("failed to get from %s://%s:%s%s: %s", - u.Scheme, u.Hostname(), u.Port(), u.Path, err) - } - - defer func() { - err = res.Body.Close() - if err != nil { - level.Warn(s.logger).Log( - "msg", "failed to close http.Client", - "err", err, - ) - } - }() - - if res.StatusCode != http.StatusOK { - return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode) - } +func (c *Snapshots) Update(ctx context.Context, ch chan<- prometheus.Metric) error { + // indices + snapshotsStatsResp := make(map[string]SnapshotStatsResponse) + u := c.u.ResolveReference(&url.URL{Path: "/_snapshot"}) - bts, err := io.ReadAll(res.Body) + var srr SnapshotRepositoriesResponse + resp, err := getURL(ctx, c.hc, c.logger, u.String()) if err != nil { return err } - if err := json.Unmarshal(bts, data); err != nil { - return err - } - return nil -} - -func (s *Snapshots) fetchAndDecodeSnapshotsStats() (map[string]SnapshotStatsResponse, error) { - mssr := make(map[string]SnapshotStatsResponse) - - u := *s.url - u.Path = path.Join(u.Path, "/_snapshot") - var srr SnapshotRepositoriesResponse - err := s.getAndParseURL(&u, &srr) + err = json.Unmarshal(resp, &srr) if err != nil { - return nil, err + return fmt.Errorf("failed to unmarshal JSON: %v", err) } + for repository := range srr { - u := *s.url - u.Path = path.Join(u.Path, "/_snapshot", repository, "/_all") + pathPart := path.Join("/_snapshot", repository, "/_all") + u := c.u.ResolveReference(&url.URL{Path: pathPart}) var ssr SnapshotStatsResponse - err := s.getAndParseURL(&u, &ssr) + resp, err := getURL(ctx, c.hc, c.logger, u.String()) if err != nil { continue } - mssr[repository] = ssr + err = json.Unmarshal(resp, &ssr) + if err != nil { + return fmt.Errorf("failed to unmarshal JSON: %v", err) + } + snapshotsStatsResp[repository] = ssr } - return mssr, nil -} + // Snapshots stats + for repositoryName, snapshotStats := range snapshotsStatsResp { -// Collect gets Snapshots metric values -func (s *Snapshots) Collect(ch chan<- prometheus.Metric) { + ch <- prometheus.MustNewConstMetric( + numSnapshots, + prometheus.GaugeValue, + float64(len(snapshotStats.Snapshots)), + defaultSnapshotRepositoryLabelValues(repositoryName)..., + ) - // indices - snapshotsStatsResp, err := s.fetchAndDecodeSnapshotsStats() - if err != nil { - level.Warn(s.logger).Log( - "msg", "failed to fetch and decode snapshot stats", - "err", err, + oldest := float64(0) + if len(snapshotStats.Snapshots) > 0 { + oldest = float64(snapshotStats.Snapshots[0].StartTimeInMillis / 1000) + } + ch <- prometheus.MustNewConstMetric( + oldestSnapshotTimestamp, + prometheus.GaugeValue, + oldest, + defaultSnapshotRepositoryLabelValues(repositoryName)..., ) - return - } - // Snapshots stats - for repositoryName, snapshotStats := range snapshotsStatsResp { - for _, metric := range s.repositoryMetrics { - ch <- prometheus.MustNewConstMetric( - metric.Desc, - metric.Type, - metric.Value(snapshotStats), - metric.Labels(repositoryName)..., - ) + latest := float64(0) + for i := len(snapshotStats.Snapshots) - 1; i >= 0; i-- { + var snap = snapshotStats.Snapshots[i] + if snap.State == "SUCCESS" || snap.State == "PARTIAL" { + latest = float64(snap.StartTimeInMillis / 1000) + break + } } + ch <- prometheus.MustNewConstMetric( + latestSnapshotTimestamp, + prometheus.GaugeValue, + latest, + defaultSnapshotRepositoryLabelValues(repositoryName)..., + ) + if len(snapshotStats.Snapshots) == 0 { continue } lastSnapshot := snapshotStats.Snapshots[len(snapshotStats.Snapshots)-1] - for _, metric := range s.snapshotMetrics { - ch <- prometheus.MustNewConstMetric( - metric.Desc, - metric.Type, - metric.Value(lastSnapshot), - metric.Labels(repositoryName, lastSnapshot)..., - ) - } + ch <- prometheus.MustNewConstMetric( + numIndices, + prometheus.GaugeValue, + float64(len(lastSnapshot.Indices)), + defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., + ) + ch <- prometheus.MustNewConstMetric( + snapshotStartTimestamp, + prometheus.GaugeValue, + float64(lastSnapshot.StartTimeInMillis/1000), + defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., + ) + ch <- prometheus.MustNewConstMetric( + snapshotEndTimestamp, + prometheus.GaugeValue, + float64(lastSnapshot.EndTimeInMillis/1000), + defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., + ) + ch <- prometheus.MustNewConstMetric( + snapshotNumFailures, + prometheus.GaugeValue, + float64(len(lastSnapshot.Failures)), + defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., + ) + ch <- prometheus.MustNewConstMetric( + snapshotNumShards, + prometheus.GaugeValue, + float64(lastSnapshot.Shards.Total), + defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., + ) + ch <- prometheus.MustNewConstMetric( + snapshotFailedShards, + prometheus.GaugeValue, + float64(lastSnapshot.Shards.Failed), + defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., + ) + ch <- prometheus.MustNewConstMetric( + snapshotSuccessfulShards, + prometheus.GaugeValue, + float64(lastSnapshot.Shards.Successful), + defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., + ) } + + return nil } diff --git a/collector/snapshots_test.go b/collector/snapshots_test.go index c097183e..a4f88b11 100644 --- a/collector/snapshots_test.go +++ b/collector/snapshots_test.go @@ -209,15 +209,12 @@ func TestSnapshots(t *testing.T) { t.Fatal(err) } - s := NewSnapshots(log.NewNopLogger(), http.DefaultClient, u) - - // TODO: Convert to collector interface - // c, err := NewSnapshots(log.NewNopLogger(), u, http.DefaultClient) - // if err != nil { - // t.Fatal(err) - // } + c, err := NewSnapshots(log.NewNopLogger(), u, http.DefaultClient) + if err != nil { + t.Fatal(err) + } - if err := testutil.CollectAndCompare(s, strings.NewReader(tt.want)); err != nil { + if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(tt.want)); err != nil { t.Fatal(err) } }) diff --git a/collector/util.go b/collector/util.go new file mode 100644 index 00000000..19c045cd --- /dev/null +++ b/collector/util.go @@ -0,0 +1,57 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "fmt" + "io" + "net/http" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +func getURL(ctx context.Context, hc *http.Client, log log.Logger, u string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return nil, err + } + + resp, err := hc.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to get %s: %v", u, err) + } + + defer func() { + err = resp.Body.Close() + if err != nil { + level.Warn(log).Log( + "msg", "failed to close response body", + "err", err, + ) + } + }() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("HTTP Request failed with code %d", resp.StatusCode) + } + + b, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return b, nil +} diff --git a/main.go b/main.go index e50dd09e..c7db1def 100644 --- a/main.go +++ b/main.go @@ -83,9 +83,6 @@ func main() { esExportShards = kingpin.Flag("es.shards", "Export stats for shards in the cluster (implies --es.indices)."). Default("false").Bool() - esExportSnapshots = kingpin.Flag("es.snapshots", - "Export stats for the cluster snapshots."). - Default("false").Bool() esExportSLM = kingpin.Flag("es.slm", "Export stats for SLM snapshots."). Default("false").Bool() @@ -211,10 +208,6 @@ func main() { } } - if *esExportSnapshots { - prometheus.MustRegister(collector.NewSnapshots(logger, httpClient, esURL)) - } - if *esExportSLM { prometheus.MustRegister(collector.NewSLM(logger, httpClient, esURL)) }