diff --git a/collector/shards.go b/collector/shards.go index ba9a0ad2..37ea1fef 100644 --- a/collector/shards.go +++ b/collector/shards.go @@ -10,11 +10,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package collector import ( "encoding/json" "fmt" + "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo" "net/http" "net/url" "path" @@ -24,59 +26,83 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -var ( - defaultNodeShardLabels = []string{"node"} - - defaultNodeShardLabelValues = func(node string) []string { - return []string{ - node, - } - } -) - // ShardResponse has shard's node and index info type ShardResponse struct { Index string `json:"index"` Shard string `json:"shard"` + State string `json:"state"` Node string `json:"node"` } // Shards information struct type Shards struct { - logger log.Logger - client *http.Client - url *url.URL + logger log.Logger + client *http.Client + url *url.URL + clusterInfoCh chan *clusterinfo.Response + lastClusterInfo *clusterinfo.Response nodeShardMetrics []*nodeShardMetric jsonParseFailures prometheus.Counter } +// ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the +// (not exported) clusterinfo.consumer interface +func (s *Shards) ClusterLabelUpdates() *chan *clusterinfo.Response { + return &s.clusterInfoCh +} + +// String implements the stringer interface. It is part of the clusterinfo.consumer interface +func (s *Shards) String() string { + return namespace + "shards" +} + type nodeShardMetric struct { Type prometheus.ValueType Desc *prometheus.Desc Value func(shards float64) float64 - Labels func(node string) []string + Labels labels } // NewShards defines Shards Prometheus metrics func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards { - return &Shards{ + + nodeLabels := labels{ + keys: func(...string) []string { + return []string{"node", "cluster"} + }, + values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string { + if lastClusterinfo != nil { + return append(s, lastClusterinfo.ClusterName) + } + // this shouldn't happen, as the clusterinfo Retriever has a blocking + // Run method. It blocks until the first clusterinfo call has succeeded + return append(s, "unknown_cluster") + }, + } + + shards := &Shards{ logger: logger, client: client, url: url, + clusterInfoCh: make(chan *clusterinfo.Response), + lastClusterInfo: &clusterinfo.Response{ + ClusterName: "unknown_cluster", + }, + nodeShardMetrics: []*nodeShardMetric{ { Type: prometheus.GaugeValue, Desc: prometheus.NewDesc( prometheus.BuildFQName(namespace, "node_shards", "total"), "Total shards per node", - defaultNodeShardLabels, nil, + nodeLabels.keys(), nil, ), Value: func(shards float64) float64 { return shards }, - Labels: defaultNodeShardLabelValues, + Labels: nodeLabels, }}, jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ @@ -84,6 +110,20 @@ func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards { Help: "Number of errors while parsing JSON.", }), } + + // start go routine to fetch clusterinfo updates and save them to lastClusterinfo + go func() { + level.Debug(logger).Log("msg", "starting cluster info receive loop") + for ci := range shards.clusterInfoCh { + if ci != nil { + level.Debug(logger).Log("msg", "received cluster info update", "cluster", ci.ClusterName) + shards.lastClusterInfo = ci + } + } + level.Debug(logger).Log("msg", "exiting cluster info receive loop") + }() + + return shards } // Describe Shards @@ -137,7 +177,7 @@ func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) { return sfr, err } -// Collect number of shards on each nodes +// Collect number of shards on each node func (s *Shards) Collect(ch chan<- prometheus.Metric) { defer func() { @@ -156,10 +196,8 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) { nodeShards := make(map[string]float64) for _, shard := range sr { - if val, ok := nodeShards[shard.Node]; ok { - nodeShards[shard.Node] = val + 1 - } else { - nodeShards[shard.Node] = 1 + if shard.State == "STARTED" { + nodeShards[shard.Node]++ } } @@ -169,7 +207,7 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) { metric.Desc, metric.Type, metric.Value(shards), - metric.Labels(node)..., + metric.Labels.values(s.lastClusterInfo, node)..., ) } } diff --git a/main.go b/main.go index c7db1def..5e4c6da0 100644 --- a/main.go +++ b/main.go @@ -199,13 +199,18 @@ func main() { prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode)) if *esExportIndices || *esExportShards { - prometheus.MustRegister(collector.NewShards(logger, httpClient, esURL)) + sC := collector.NewShards(logger, httpClient, esURL) + prometheus.MustRegister(sC) iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases) prometheus.MustRegister(iC) if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil { level.Error(logger).Log("msg", "failed to register indices collector in cluster info") os.Exit(1) } + if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil { + level.Error(logger).Log("msg", "failed to register shards collector in cluster info") + os.Exit(1) + } } if *esExportSLM {