Skip to content

Commit

Permalink
Add cluster label to shard metric (#639)
Browse files Browse the repository at this point in the history
- Almost all the other metrics have this label, so adding it here keeps
   this metric consistent with the rest
 - Only capture shards with state "STARTED"

Signed-off-by: Arpit Agarwal <[email protected]>
  • Loading branch information
arpitjindal97 authored Dec 12, 2023
1 parent e9ad2f2 commit 9fd3634
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 24 deletions.
84 changes: 61 additions & 23 deletions collector/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"encoding/json"
"fmt"
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
"net/http"
"net/url"
"path"
Expand All @@ -24,66 +26,104 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var (
	// defaultNodeShardLabels is the label-key set for the per-node shard
	// metric (node name only).
	// NOTE(review): NewShards now builds its label set via nodeLabels
	// ("node" + "cluster"), so these helpers appear superseded — confirm
	// nothing else references them before removing.
	defaultNodeShardLabels = []string{"node"}

	// defaultNodeShardLabelValues maps a node name to the label values
	// matching defaultNodeShardLabels.
	defaultNodeShardLabelValues = func(node string) []string {
		return []string{
			node,
		}
	}
)

// ShardResponse has shard's node and index info — one decoded row of the
// shards listing returned by fetchAndDecodeShards (presumably the
// _cat/shards API; confirm against the request path).
type ShardResponse struct {
	Index string `json:"index"` // index the shard belongs to
	Shard string `json:"shard"` // shard number within the index
	State string `json:"state"` // shard state; Collect counts only "STARTED"
	Node  string `json:"node"`  // node currently hosting the shard
}

// Shards information struct. It collects per-node shard counts and, as a
// clusterinfo consumer, attaches the cluster name label to its metrics.
//
// NOTE(review): the diff rendering superposed the pre- and post-commit
// field lists (logger/client/url appeared twice); this is the deduplicated
// post-commit definition.
type Shards struct {
	logger log.Logger
	client *http.Client
	url    *url.URL

	// clusterInfoCh receives cluster info updates from the clusterinfo
	// retriever; lastClusterInfo caches the most recent update so metric
	// label values can include the cluster name.
	clusterInfoCh   chan *clusterinfo.Response
	lastClusterInfo *clusterinfo.Response

	nodeShardMetrics  []*nodeShardMetric
	jsonParseFailures prometheus.Counter
}

// ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the
// (not exported) clusterinfo.consumer interface. The retriever sends on this
// channel; the goroutine started in NewShards receives from it and caches the
// latest response in lastClusterInfo.
func (s *Shards) ClusterLabelUpdates() *chan *clusterinfo.Response {
	return &s.clusterInfoCh
}

// String implements the stringer interface. It is part of the clusterinfo.consumer interface
// and serves as this consumer's identifier when registering with the retriever.
func (s *Shards) String() string {
	// NOTE(review): no separator between namespace and "shards" — if
	// namespace is "elasticsearch" this yields "elasticsearchshards".
	// It is only an identifier, but confirm the concatenation is intentional.
	return namespace + "shards"
}

// nodeShardMetric describes one per-node shard gauge: its Prometheus
// descriptor, value type, how to compute the sample value from a shard
// count, and how to derive its label values.
//
// NOTE(review): the diff rendering showed both the old
// `Labels func(node string) []string` field and the new `Labels labels`
// field; this is the deduplicated post-commit definition.
type nodeShardMetric struct {
	Type   prometheus.ValueType
	Desc   *prometheus.Desc
	Value  func(shards float64) float64
	Labels labels // node + cluster label keys/values (built in NewShards)
}

// NewShards defines Shards Prometheus metrics
func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards {
return &Shards{

nodeLabels := labels{
keys: func(...string) []string {
return []string{"node", "cluster"}
},
values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
if lastClusterinfo != nil {
return append(s, lastClusterinfo.ClusterName)
}
// this shouldn't happen, as the clusterinfo Retriever has a blocking
// Run method. It blocks until the first clusterinfo call has succeeded
return append(s, "unknown_cluster")
},
}

shards := &Shards{
logger: logger,
client: client,
url: url,

clusterInfoCh: make(chan *clusterinfo.Response),
lastClusterInfo: &clusterinfo.Response{
ClusterName: "unknown_cluster",
},

nodeShardMetrics: []*nodeShardMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "node_shards", "total"),
"Total shards per node",
defaultNodeShardLabels, nil,
nodeLabels.keys(), nil,
),
Value: func(shards float64) float64 {
return shards
},
Labels: defaultNodeShardLabelValues,
Labels: nodeLabels,
}},

jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "node_shards", "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),
}

// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
go func() {
level.Debug(logger).Log("msg", "starting cluster info receive loop")
for ci := range shards.clusterInfoCh {
if ci != nil {
level.Debug(logger).Log("msg", "received cluster info update", "cluster", ci.ClusterName)
shards.lastClusterInfo = ci
}
}
level.Debug(logger).Log("msg", "exiting cluster info receive loop")
}()

return shards
}

// Describe Shards
Expand Down Expand Up @@ -137,7 +177,7 @@ func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) {
return sfr, err
}

// Collect number of shards on each nodes
// Collect number of shards on each node
func (s *Shards) Collect(ch chan<- prometheus.Metric) {

defer func() {
Expand All @@ -156,10 +196,8 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) {
nodeShards := make(map[string]float64)

for _, shard := range sr {
if val, ok := nodeShards[shard.Node]; ok {
nodeShards[shard.Node] = val + 1
} else {
nodeShards[shard.Node] = 1
if shard.State == "STARTED" {
nodeShards[shard.Node]++
}
}

Expand All @@ -169,7 +207,7 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) {
metric.Desc,
metric.Type,
metric.Value(shards),
metric.Labels(node)...,
metric.Labels.values(s.lastClusterInfo, node)...,
)
}
}
Expand Down
7 changes: 6 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,18 @@ func main() {
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))

if *esExportIndices || *esExportShards {
prometheus.MustRegister(collector.NewShards(logger, httpClient, esURL))
sC := collector.NewShards(logger, httpClient, esURL)
prometheus.MustRegister(sC)
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
prometheus.MustRegister(iC)
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
os.Exit(1)
}
if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register shards collector in cluster info")
os.Exit(1)
}
}

if *esExportSLM {
Expand Down

0 comments on commit 9fd3634

Please sign in to comment.