diff --git a/collector/shards.go b/collector/shards.go index 37ea1fef..ba9a0ad2 100644 --- a/collector/shards.go +++ b/collector/shards.go @@ -10,13 +10,11 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package collector import ( "encoding/json" "fmt" - "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo" "net/http" "net/url" "path" @@ -26,83 +24,59 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +var ( + defaultNodeShardLabels = []string{"node"} + + defaultNodeShardLabelValues = func(node string) []string { + return []string{ + node, + } + } +) + // ShardResponse has shard's node and index info type ShardResponse struct { Index string `json:"index"` Shard string `json:"shard"` - State string `json:"state"` Node string `json:"node"` } // Shards information struct type Shards struct { - logger log.Logger - client *http.Client - url *url.URL - clusterInfoCh chan *clusterinfo.Response - lastClusterInfo *clusterinfo.Response + logger log.Logger + client *http.Client + url *url.URL nodeShardMetrics []*nodeShardMetric jsonParseFailures prometheus.Counter } -// ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the -// (not exported) clusterinfo.consumer interface -func (s *Shards) ClusterLabelUpdates() *chan *clusterinfo.Response { - return &s.clusterInfoCh -} - -// String implements the stringer interface. It is part of the clusterinfo.consumer interface -func (s *Shards) String() string { - return namespace + "shards" -} - type nodeShardMetric struct { Type prometheus.ValueType Desc *prometheus.Desc Value func(shards float64) float64 - Labels labels + Labels func(node string) []string } // NewShards defines Shards Prometheus metrics func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards { - - nodeLabels := labels{ - keys: func(...string) []string { - return []string{"node", "cluster"} - }, - values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string { - if lastClusterinfo != nil { - return append(s, lastClusterinfo.ClusterName) - } - // this shouldn't happen, as the clusterinfo Retriever has a blocking - // Run method. It blocks until the first clusterinfo call has succeeded - return append(s, "unknown_cluster") - }, - } - - shards := &Shards{ + return &Shards{ logger: logger, client: client, url: url, - clusterInfoCh: make(chan *clusterinfo.Response), - lastClusterInfo: &clusterinfo.Response{ - ClusterName: "unknown_cluster", - }, - nodeShardMetrics: []*nodeShardMetric{ { Type: prometheus.GaugeValue, Desc: prometheus.NewDesc( prometheus.BuildFQName(namespace, "node_shards", "total"), "Total shards per node", - nodeLabels.keys(), nil, + defaultNodeShardLabels, nil, ), Value: func(shards float64) float64 { return shards }, - Labels: nodeLabels, + Labels: defaultNodeShardLabelValues, }}, jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ @@ -110,20 +84,6 @@ func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards { Help: "Number of errors while parsing JSON.", }), } - - // start go routine to fetch clusterinfo updates and save them to lastClusterinfo - go func() { - level.Debug(logger).Log("msg", "starting cluster info receive loop") - for ci := range shards.clusterInfoCh { - if ci != nil { - level.Debug(logger).Log("msg", "received cluster info update", "cluster", ci.ClusterName) - shards.lastClusterInfo = ci - } - } - level.Debug(logger).Log("msg", "exiting cluster info receive loop") - }() - - return shards } // Describe Shards @@ -177,7 +137,7 @@ func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) { return sfr, err } -// Collect number of shards on each node +// Collect number of shards on each nodes func (s *Shards) Collect(ch chan<- prometheus.Metric) { defer func() { @@ -196,8 +156,10 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) { nodeShards := make(map[string]float64) for _, shard := range sr { - if shard.State == "STARTED" { - nodeShards[shard.Node]++ + if val, ok := nodeShards[shard.Node]; ok { + nodeShards[shard.Node] = val + 1 + } else { + nodeShards[shard.Node] = 1 } } @@ -207,7 +169,7 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) { metric.Desc, metric.Type, metric.Value(shards), - metric.Labels.values(s.lastClusterInfo, node)..., + metric.Labels(node)..., ) } } diff --git a/main.go b/main.go index 5e4c6da0..c7db1def 100644 --- a/main.go +++ b/main.go @@ -199,18 +199,13 @@ func main() { prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode)) if *esExportIndices || *esExportShards { - sC := collector.NewShards(logger, httpClient, esURL) - prometheus.MustRegister(sC) + prometheus.MustRegister(collector.NewShards(logger, httpClient, esURL)) iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases) prometheus.MustRegister(iC) if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil { level.Error(logger).Log("msg", "failed to register indices collector in cluster info") os.Exit(1) } - if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil { - level.Error(logger).Log("msg", "failed to register shards collector in cluster info") - os.Exit(1) - } } if *esExportSLM {