Skip to content

Commit

Permalink
Revert "Add cluster label to shard metric (prometheus-community#639)"
Browse files Browse the repository at this point in the history
This reverts commit b1136b2.
  • Loading branch information
jaimeyh authored Jun 14, 2024
1 parent 8e31f72 commit 6c24229
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 67 deletions.
84 changes: 23 additions & 61 deletions collector/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"encoding/json"
"fmt"
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
"net/http"
"net/url"
"path"
Expand All @@ -26,104 +24,66 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var (
defaultNodeShardLabels = []string{"node"}

defaultNodeShardLabelValues = func(node string) []string {
return []string{
node,
}
}
)

// ShardResponse has shard's node and index info
type ShardResponse struct {
Index string `json:"index"`
Shard string `json:"shard"`
State string `json:"state"`
Node string `json:"node"`
}

// Shards information struct
type Shards struct {
logger log.Logger
client *http.Client
url *url.URL
clusterInfoCh chan *clusterinfo.Response
lastClusterInfo *clusterinfo.Response
logger log.Logger
client *http.Client
url *url.URL

nodeShardMetrics []*nodeShardMetric
jsonParseFailures prometheus.Counter
}

// ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the
// (not exported) clusterinfo.consumer interface
func (s *Shards) ClusterLabelUpdates() *chan *clusterinfo.Response {
return &s.clusterInfoCh
}

// String implements the stringer interface. It is part of the clusterinfo.consumer interface
func (s *Shards) String() string {
return namespace + "shards"
}

type nodeShardMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(shards float64) float64
Labels labels
Labels func(node string) []string
}

// NewShards defines Shards Prometheus metrics
func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards {

nodeLabels := labels{
keys: func(...string) []string {
return []string{"node", "cluster"}
},
values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
if lastClusterinfo != nil {
return append(s, lastClusterinfo.ClusterName)
}
// this shouldn't happen, as the clusterinfo Retriever has a blocking
// Run method. It blocks until the first clusterinfo call has succeeded
return append(s, "unknown_cluster")
},
}

shards := &Shards{
return &Shards{
logger: logger,
client: client,
url: url,

clusterInfoCh: make(chan *clusterinfo.Response),
lastClusterInfo: &clusterinfo.Response{
ClusterName: "unknown_cluster",
},

nodeShardMetrics: []*nodeShardMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "node_shards", "total"),
"Total shards per node",
nodeLabels.keys(), nil,
defaultNodeShardLabels, nil,
),
Value: func(shards float64) float64 {
return shards
},
Labels: nodeLabels,
Labels: defaultNodeShardLabelValues,
}},

jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "node_shards", "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),
}

// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
go func() {
level.Debug(logger).Log("msg", "starting cluster info receive loop")
for ci := range shards.clusterInfoCh {
if ci != nil {
level.Debug(logger).Log("msg", "received cluster info update", "cluster", ci.ClusterName)
shards.lastClusterInfo = ci
}
}
level.Debug(logger).Log("msg", "exiting cluster info receive loop")
}()

return shards
}

// Describe Shards
Expand Down Expand Up @@ -177,7 +137,7 @@ func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) {
return sfr, err
}

// Collect number of shards on each node
// Collect number of shards on each nodes
func (s *Shards) Collect(ch chan<- prometheus.Metric) {

defer func() {
Expand All @@ -196,8 +156,10 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) {
nodeShards := make(map[string]float64)

for _, shard := range sr {
if shard.State == "STARTED" {
nodeShards[shard.Node]++
if val, ok := nodeShards[shard.Node]; ok {
nodeShards[shard.Node] = val + 1
} else {
nodeShards[shard.Node] = 1
}
}

Expand All @@ -207,7 +169,7 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) {
metric.Desc,
metric.Type,
metric.Value(shards),
metric.Labels.values(s.lastClusterInfo, node)...,
metric.Labels(node)...,
)
}
}
Expand Down
7 changes: 1 addition & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,18 +199,13 @@ func main() {
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))

if *esExportIndices || *esExportShards {
sC := collector.NewShards(logger, httpClient, esURL)
prometheus.MustRegister(sC)
prometheus.MustRegister(collector.NewShards(logger, httpClient, esURL))
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
prometheus.MustRegister(iC)
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
os.Exit(1)
}
if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register shards collector in cluster info")
os.Exit(1)
}
}

if *esExportSLM {
Expand Down

0 comments on commit 6c24229

Please sign in to comment.