Skip to content

Commit

Permalink
Add extra tests and improve error handling in watermark metrics
Browse files Browse the repository at this point in the history
Improve the error handling for watermark metrics. When we have an error, do not report a 0 value for the metric.

Adds tests for parsing ratio/percentage values and human-readable byte sizes in the watermark settings.

Functionality originally added in #611

Signed-off-by: Joe Adams <[email protected]>
  • Loading branch information
sysadmind committed Oct 15, 2023
1 parent 9bb0ad5 commit 3d61697
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 47 deletions.
103 changes: 68 additions & 35 deletions collector/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package collector
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/imdario/mergo"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -222,49 +224,80 @@ func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometh

// Watermark bytes or ratio metrics
if strings.HasSuffix(merged.Cluster.Routing.Allocation.Disk.Watermark.High, "b") {
flooodStageBytes, err := getValueInBytes(merged.Cluster.Routing.Allocation.Disk.Watermark.FloodStage)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse flood_stage bytes", "err", err)
} else {
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["floodStageBytes"],
prometheus.GaugeValue,
flooodStageBytes,
)
}

highBytes, err := getValueInBytes(merged.Cluster.Routing.Allocation.Disk.Watermark.High)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse high bytes", "err", err)
} else {
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["highBytes"],
prometheus.GaugeValue,
highBytes,
)
}

lowBytes, err := getValueInBytes(merged.Cluster.Routing.Allocation.Disk.Watermark.Low)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse low bytes", "err", err)
} else {
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["lowBytes"],
prometheus.GaugeValue,
lowBytes,
)
}

return nil
}

// Watermark ratio metrics
floodRatio, err := getValueAsRatio(merged.Cluster.Routing.Allocation.Disk.Watermark.FloodStage)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse flood_stage ratio", "err", err)
} else {
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["floodStageBytes"],
clusterSettingsDesc["floodStageRatio"],
prometheus.GaugeValue,
getValueInBytes(merged.Cluster.Routing.Allocation.Disk.Watermark.FloodStage),
floodRatio,
)
}

highRatio, err := getValueAsRatio(merged.Cluster.Routing.Allocation.Disk.Watermark.High)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse high ratio", "err", err)
} else {
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["highBytes"],
clusterSettingsDesc["highRatio"],
prometheus.GaugeValue,
getValueInBytes(merged.Cluster.Routing.Allocation.Disk.Watermark.High),
highRatio,
)
}

lowRatio, err := getValueAsRatio(merged.Cluster.Routing.Allocation.Disk.Watermark.Low)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse low ratio", "err", err)
} else {
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["lowBytes"],
clusterSettingsDesc["lowRatio"],
prometheus.GaugeValue,
getValueInBytes(merged.Cluster.Routing.Allocation.Disk.Watermark.Low),
lowRatio,
)

return nil
}

ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["floodStageRatio"],
prometheus.GaugeValue,
getValueAsRatio(merged.Cluster.Routing.Allocation.Disk.Watermark.FloodStage),
)

ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["highRatio"],
prometheus.GaugeValue,
getValueAsRatio(merged.Cluster.Routing.Allocation.Disk.Watermark.High),
)

ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["lowRatio"],
prometheus.GaugeValue,
getValueAsRatio(merged.Cluster.Routing.Allocation.Disk.Watermark.Low),
)

return nil
}

func getValueInBytes(value string) float64 {
func getValueInBytes(value string) (float64, error) {
type UnitValue struct {
unit string
val float64
Expand All @@ -285,29 +318,29 @@ func getValueInBytes(value string) float64 {

number, err := strconv.ParseFloat(numberStr, 64)
if err != nil {
return 0
return 0, err
}
return number * uv.val
return number * uv.val, nil
}
}

return 0
return 0, fmt.Errorf("failed to convert unit %s to bytes", value)
}

// getValueAsRatio converts a disk watermark setting into a ratio.
//
// A value ending in "%" is read as a percentage and divided by 100
// ("85.5%" -> 0.855); any other value is read as a plain ratio ("0.85").
// Leading and trailing whitespace is ignored in both forms. When the numeric
// part cannot be parsed, the strconv error is returned together with 0 so the
// caller can skip the metric instead of reporting a misleading value.
func getValueAsRatio(value string) (float64, error) {
	trimmed := strings.TrimSpace(value)

	if strings.HasSuffix(trimmed, "%") {
		// ParseFloat rather than Atoi: fractional percentages such as
		// "85.5%" must not be rejected.
		number := strings.TrimSpace(strings.TrimSuffix(trimmed, "%"))
		percent, err := strconv.ParseFloat(number, 64)
		if err != nil {
			return 0, err
		}

		return percent / 100, nil
	}

	ratio, err := strconv.ParseFloat(trimmed, 64)
	if err != nil {
		return 0, err
	}

	return ratio, nil
}
66 changes: 54 additions & 12 deletions collector/cluster_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ elasticsearch_clustersettings_stats_shard_allocation_enabled 0
# HELP elasticsearch_clustersettings_allocation_threshold_enabled Is disk allocation decider enabled.
# TYPE elasticsearch_clustersettings_allocation_threshold_enabled gauge
elasticsearch_clustersettings_allocation_threshold_enabled 1
# HELP elasticsearch_clustersettings_allocation_watermark_flood_stage_ratio Flood stage watermark as a ratio.
# TYPE elasticsearch_clustersettings_allocation_watermark_flood_stage_ratio gauge
elasticsearch_clustersettings_allocation_watermark_flood_stage_ratio 0
# HELP elasticsearch_clustersettings_allocation_watermark_high_ratio High watermark for disk usage as a ratio.
# TYPE elasticsearch_clustersettings_allocation_watermark_high_ratio gauge
elasticsearch_clustersettings_allocation_watermark_high_ratio 0.9
Expand All @@ -82,15 +79,6 @@ elasticsearch_clustersettings_stats_shard_allocation_enabled 0
# HELP elasticsearch_clustersettings_allocation_threshold_enabled Is disk allocation decider enabled.
# TYPE elasticsearch_clustersettings_allocation_threshold_enabled gauge
elasticsearch_clustersettings_allocation_threshold_enabled 0
# HELP elasticsearch_clustersettings_allocation_watermark_flood_stage_ratio Flood stage watermark as a ratio.
# TYPE elasticsearch_clustersettings_allocation_watermark_flood_stage_ratio gauge
elasticsearch_clustersettings_allocation_watermark_flood_stage_ratio 0
# HELP elasticsearch_clustersettings_allocation_watermark_high_ratio High watermark for disk usage as a ratio.
# TYPE elasticsearch_clustersettings_allocation_watermark_high_ratio gauge
elasticsearch_clustersettings_allocation_watermark_high_ratio 0
# HELP elasticsearch_clustersettings_allocation_watermark_low_ratio Low watermark for disk usage as a ratio.
# TYPE elasticsearch_clustersettings_allocation_watermark_low_ratio gauge
elasticsearch_clustersettings_allocation_watermark_low_ratio 0
`,
},
{
Expand Down Expand Up @@ -172,3 +160,57 @@ elasticsearch_clustersettings_allocation_watermark_low_bytes 5.24288e+07
})
}
}

// Test_getValueInBytes runs getValueInBytes over one input per supported unit
// suffix (b, kb, mb, gb, tb, pb) and checks the value returned in bytes, plus
// one input with an unrecognised suffix that must produce an error.
func Test_getValueInBytes(t *testing.T) {
	type testCase struct {
		name    string
		input   string
		want    float64
		wantErr bool
	}

	cases := []testCase{
		{name: "Bytes", input: "100b", want: 100},
		{name: "Kibibytes", input: "200kb", want: 204800},
		{name: "Mebibytes", input: "300mb", want: 314572800},
		{name: "Gibibytes", input: "400gb", want: 429496729600},
		{name: "Tebibytes", input: "500tb", want: 549755813888000},
		{name: "Pebibytes", input: "600pb", want: 675539944105574400},
		{name: "Unknown", input: "9ab", wantErr: true},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			got, err := getValueInBytes(tc.input)

			// An unexpected error state ends the subtest before the value check.
			if failed := err != nil; failed != tc.wantErr {
				t.Fatalf("getValueInBytes() error = %v, wantErr %v", err, tc.wantErr)
			}

			if got == tc.want {
				return
			}
			t.Errorf("getValueInBytes() = %v, want %v", got, tc.want)
		})
	}
}

// Test_getValueAsRatio runs getValueAsRatio over a plain ratio, a percentage,
// and a byte-suffixed value; the first two must yield 0.5 and the last must
// produce an error.
func Test_getValueAsRatio(t *testing.T) {
	type testCase struct {
		name    string
		input   string
		want    float64
		wantErr bool
	}

	cases := []testCase{
		{name: "Ratio", input: "0.5", want: 0.5},
		{name: "Percentage", input: "50%", want: 0.5},
		{name: "Invalid", input: "500b", wantErr: true},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			got, err := getValueAsRatio(tc.input)

			// An unexpected error state ends the subtest before the value check.
			if failed := err != nil; failed != tc.wantErr {
				t.Fatalf("getValueAsRatio() error = %v, wantErr %v", err, tc.wantErr)
			}

			if got == tc.want {
				return
			}
			t.Errorf("getValueAsRatio() = %v, want %v", got, tc.want)
		})
	}
}

0 comments on commit 3d61697

Please sign in to comment.