Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow exporting JS metadata #318

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ test-cov:

.PHONY: lint
lint:
@PATH=$(shell go env GOPATH)/bin:$(PATH)
@PATH=$(shell go env GOBIN):$(PATH)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seemed to make assumptions about the workspace layout that were not true on my machine; feel free to ignore.

@if ! which golangci-lint >/dev/null; then \
echo "golangci-lint is required and was not found"; \
exit 1; \
fi
go vet ./...
$(shell go env GOPATH)/bin/golangci-lint run ./...
$(shell go env GOBIN)/golangci-lint run ./...
11 changes: 10 additions & 1 deletion collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,16 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p
return newReplicatorCollector(getSystem(system, prefix), servers)
}
if isJszEndpoint(system) {
return newJszCollector(getSystem(system, prefix), endpoint, servers)
return newJszCollector(getSystem(system, prefix), endpoint, servers, []string{}, []string{})
}
return newNatsCollector(getSystem(system, prefix), endpoint, servers)
}

// NewJszCollector builds a Prometheus collector for the JetStream system.
// The system name is resolved from JetStreamSystem and prefix via getSystem;
// endpoint, servers and both metadata key lists are forwarded unchanged to
// newJszCollector.
func NewJszCollector(
	endpoint, prefix string,
	servers []*CollectedServer,
	streamMetaKeys, consumerMetaKeys []string,
) prometheus.Collector {
	jszSystem := getSystem(JetStreamSystem, prefix)
	return newJszCollector(jszSystem, endpoint, servers, streamMetaKeys, consumerMetaKeys)
}
Comment on lines +502 to +509
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is now a separate code path for JetStream collectors so that the meta keys can be supplied. This seemed better than passing the arguments around to the other collectors, where they wouldn't be used.

This makes JS collectors somewhat of a special case in the code; I'm not sure if you like it that way.

133 changes: 124 additions & 9 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,22 @@ func verifyCollector(system, url string, endpoint string, cases map[string]float
URL: url,
}
coll := NewCollector(system, endpoint, "", servers)
verifySpecificCollector(cases, coll, t)
}

// now collect the metrics
// verifyJszCollector builds a JetStream collector for a single server
// (ID "id") at url, with empty stream and consumer metadata key lists, and
// passes it to verifySpecificCollector together with the expected cases.
func verifyJszCollector(url string, endpoint string, cases map[string]float64, t *testing.T) {
	target := &CollectedServer{
		ID:  "id",
		URL: url,
	}
	jsz := NewJszCollector(endpoint, "", []*CollectedServer{target}, []string{}, []string{})
	verifySpecificCollector(cases, jsz, t)
}

func verifySpecificCollector(cases map[string]float64, coll prometheus.Collector, t *testing.T) {
c := make(chan prometheus.Metric)
go coll.Collect(c)
for {
Expand Down Expand Up @@ -107,6 +121,34 @@ func verifyStreamingCollector(url string, endpoint string, cases map[string]floa
// getLabelValues creates a collector for system/endpoint against a single
// server (ID "id") at url and delegates to getLabelValuesFromCollector.
// Metrics may share a descriptor while differing in their variable label
// values, so the result maps each supplied metric name to a list of label maps.
func getLabelValues(system, url, endpoint string, metricNames []string) (map[string][]map[string]string, error) {
	servers := []*CollectedServer{{ID: "id", URL: url}}
	return getLabelValuesFromCollector(metricNames, NewCollector(system, endpoint, "", servers))
}

// getJszLabelValues creates a JetStream collector (via NewJszCollector) for
// endpoint against a single server (ID "id") at url, configured with the given
// stream and consumer metadata keys, and delegates to
// getLabelValuesFromCollector. Metrics may share a descriptor while differing
// in their variable label values, so the result maps each supplied metric name
// to a list of label maps.
func getJszLabelValues(
	url, endpoint string,
	streamMetaKeys, consumerMetaKeys, metricNames []string,
) (map[string][]map[string]string, error) {
	servers := []*CollectedServer{{ID: "id", URL: url}}
	jsz := NewJszCollector(endpoint, "", servers, streamMetaKeys, consumerMetaKeys)
	return getLabelValuesFromCollector(metricNames, jsz)
}

func getLabelValuesFromCollector(
metricNames []string,
coll prometheus.Collector,
) (map[string][]map[string]string, error) {
labelValues := make(map[string][]map[string]string)
namesMap := make(map[string]bool)
for _, metricName := range metricNames {
Expand Down Expand Up @@ -148,13 +190,7 @@ func getLabelValues(system, url, endpoint string, metricNames []string) (map[str
}
}()

// create a new collector and collect
servers := make([]*CollectedServer, 1)
servers[0] = &CollectedServer{
ID: "id",
URL: url,
}
coll := NewCollector(system, endpoint, "", servers)
// collect metrics
coll.Collect(metrics)
close(metrics)

Expand Down Expand Up @@ -746,7 +782,86 @@ func TestJetStreamMetrics(t *testing.T) {
"jetstream_server_total_streams": 1,
"jetstream_server_total_consumers": 1,
}
verifyCollector(JetStreamSystem, url, "jsz", cases, t)
verifyJszCollector(url, "all", cases, t)
}

// TestJetStreamMetricLabels starts a JetStream server, creates one stream and
// one consumer that each carry a single metadata entry, and checks that the
// requested metadata keys show up as stream_meta_/consumer_meta_ labels on the
// collected stream and consumer metrics.
func TestJetStreamMetricLabels(t *testing.T) {
	const (
		clientPort  = 4229
		monitorPort = 8229

		streamName = "myStr"
		streamK    = "streamFoo"
		streamV    = "bar"

		consumerName = "myCon"
		consumerK    = "consFoo"
		consumerV    = "baz"

		streamMetric   = "jetstream_stream_total_bytes"
		consumerMetric = "jetstream_consumer_num_ack_pending"
	)

	srv := pet.RunJetStreamServerWithPorts(clientPort, monitorPort, "ABC")
	defer srv.Shutdown()

	nc, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", clientPort))
	if err != nil {
		t.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		t.Fatal(err)
	}

	// Stream carrying the metadata entry that should surface as a label.
	if _, err = js.AddStream(&nats.StreamConfig{
		Name:     streamName,
		Metadata: map[string]string{streamK: streamV},
	}); err != nil {
		t.Fatal(err)
	}

	// Consumer on that stream with its own metadata entry.
	consumerCfg := &nats.ConsumerConfig{
		Name:     consumerName,
		Metadata: map[string]string{consumerK: consumerV},
	}
	if _, err = js.AddConsumer(streamName, consumerCfg); err != nil {
		t.Fatal(err)
	}

	// Label names expected for the configured metadata keys.
	streamLabelKey := "stream_meta_" + streamK
	consumerLabelKey := "consumer_meta_" + consumerK

	monitorURL := fmt.Sprintf("http://127.0.0.1:%d/", monitorPort)
	labelValues, err := getJszLabelValues(
		monitorURL,
		"all",
		[]string{streamK},
		[]string{consumerK},
		[]string{streamMetric, consumerMetric},
	)
	if err != nil {
		t.Fatalf("Unexpected error getting labels for %s metrics: %v", consumerMetric, err)
	}

	// Exactly one label set is expected for the stream metric.
	streamMaps, ok := labelValues[streamMetric]
	if !(ok && len(streamMaps) == 1) {
		t.Fatalf("No info found for metric: %v", streamMetric)
	}
	if got := streamMaps[0][streamLabelKey]; got != streamV {
		t.Fatalf("Value of stream label %s has unexpected value \"%s\"", streamLabelKey, got)
	}

	// Exactly one label set is expected for the consumer metric.
	consumerMaps, ok := labelValues[consumerMetric]
	if !(ok && len(consumerMaps) == 1) {
		t.Fatalf("No info found for metric: %v", consumerMetric)
	}
	consumerLabels := consumerMaps[0]

	// The consumer metric must carry both the stream and the consumer label.
	for _, want := range []struct{ key, value string }{
		{streamLabelKey, streamV},
		{consumerLabelKey, consumerV},
	} {
		if got := consumerLabels[want.key]; got != want.value {
			t.Fatalf("Value of consumer label %s has unexpected value \"%s\"", want.key, got)
		}
	}
}

func TestReplicatorMetrics(t *testing.T) {
Expand Down
74 changes: 58 additions & 16 deletions collector/jsz.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,61 @@ type jszCollector struct {
consumerNumPending *prometheus.Desc
consumerAckFloorStreamSeq *prometheus.Desc
consumerAckFloorConsumerSeq *prometheus.Desc

// metadata extractors
streamMetricExtractors []func(nats.StreamDetail) string
consumerMetricExtractors []func(*nats.ConsumerInfo) string
}

// isJszEndpoint reports whether system is equal to JetStreamSystem.
func isJszEndpoint(system string) bool {
return system == JetStreamSystem
}

func newJszCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector {
func newJszCollector(
system, endpoint string,
servers []*CollectedServer,
streamMetaKeys, consumerMetaKeys []string,
) prometheus.Collector {
serverLabels := []string{"server_id", "server_name", "cluster", "domain", "meta_leader", "is_meta_leader"}

var streamLabels []string
var streamLabels = make([]string, 0)
streamLabels = append(streamLabels, serverLabels...)
streamLabels = append(streamLabels, "account")
streamLabels = append(streamLabels, "account_id")
streamLabels = append(streamLabels, "stream_name")
streamLabels = append(streamLabels, "stream_leader")
streamLabels = append(streamLabels, "is_stream_leader")
for _, k := range streamMetaKeys {
streamLabels = append(streamLabels, "stream_meta_"+k)
}
Comment on lines +84 to +86
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream metadata is exported as labels named stream_meta_…, making sure they don't conflict with consumer metadata defined below.

Maybe it would actually be nice to combine these, WDYT?

var streamMetricExtractors = make([]func(nats.StreamDetail) string, len(streamMetaKeys))
for i, k := range streamMetaKeys {
streamMetricExtractors[i] = func(s nats.StreamDetail) string {
if s.Config == nil {
return ""
}
return s.Config.Metadata[k] // defaults to empty string
}
}

var consumerLabels []string
var consumerLabels = make([]string, 0)
consumerLabels = append(consumerLabels, streamLabels...)
consumerLabels = append(consumerLabels, "consumer_name")
consumerLabels = append(consumerLabels, "consumer_leader")
consumerLabels = append(consumerLabels, "is_consumer_leader")
consumerLabels = append(consumerLabels, "consumer_desc")
for _, k := range consumerMetaKeys {
consumerLabels = append(consumerLabels, "consumer_meta_"+k)
}
var consumerMetricExtractors = make([]func(*nats.ConsumerInfo) string, len(consumerMetaKeys))
for i, k := range consumerMetaKeys {
consumerMetricExtractors[i] = func(c *nats.ConsumerInfo) string {
if c == nil || c.Config == nil {
return ""
}
return c.Config.Metadata[k] // defaults to empty string
}
}

nc := &jszCollector{
httpClient: &http.Client{
Expand Down Expand Up @@ -230,6 +262,8 @@ func newJszCollector(system, endpoint string, servers []*CollectedServer) promet
consumerLabels,
nil,
),
streamMetricExtractors: streamMetricExtractors,
consumerMetricExtractors: consumerMetricExtractors,
}

// Use the endpoint
Expand Down Expand Up @@ -351,12 +385,18 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) {
} else {
isStreamLeader = "true"
}
streamLabelValues := []string{
// Server Labels
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader,
// Stream Labels
accountName, accountID, streamName, streamLeader, isStreamLeader,
}
for _, extractor := range nc.streamMetricExtractors {
value := extractor(stream)
streamLabelValues = append(streamLabelValues, value)
}
streamMetric := func(key *prometheus.Desc, value float64) prometheus.Metric {
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value,
// Server Labels
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader,
// Stream Labels
accountName, accountID, streamName, streamLeader, isStreamLeader)
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, streamLabelValues...)
}
ch <- streamMetric(nc.streamMessages, float64(stream.State.Msgs))
ch <- streamMetric(nc.streamBytes, float64(stream.State.Bytes))
Expand All @@ -381,15 +421,17 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) {
} else {
isConsumerLeader = "true"
}
consumerLabelValues := streamLabelValues
consumerLabelValues = append(consumerLabelValues,
// Consumer Labels
consumerName, consumerLeader, isConsumerLeader, consumerDesc,
)
for _, extractor := range nc.consumerMetricExtractors {
value := extractor(consumer)
consumerLabelValues = append(consumerLabelValues, value)
}
consumerMetric := func(key *prometheus.Desc, value float64) prometheus.Metric {
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value,
// Server Labels
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader,
// Stream Labels
accountName, accountID, streamName, streamLeader, isStreamLeader,
// Consumer Labels
consumerName, consumerLeader, isConsumerLeader, consumerDesc,
)
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, consumerLabelValues...)
}
ch <- consumerMetric(nc.consumerDeliveredConsumerSeq, float64(consumer.Delivered.Consumer))
ch <- consumerMetric(nc.consumerDeliveredStreamSeq, float64(consumer.Delivered.Stream))
Expand Down
21 changes: 20 additions & 1 deletion exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type NATSExporterOptions struct {
GetStreamingChannelz bool
GetStreamingServerz bool
GetJszFilter string
JszSteamMetaKeys string
JszConsumerMetaKeys string
RetryInterval time.Duration
CertFile string
KeyFile string
Expand Down Expand Up @@ -135,6 +137,15 @@ func (ne *NATSExporter) createCollector(system, endpoint string) {
ne.servers))
}

// createJszCollector constructs a JetStream collector for endpoint from the
// exporter's configured prefix and servers plus the given stream and consumer
// metadata keys, then registers it under collector.JetStreamSystem via
// registerCollector.
func (ne *NATSExporter) createJszCollector(endpoint string, streamMetaKeys, consumerMetaKeys []string) {
	jsz := collector.NewJszCollector(
		endpoint,
		ne.opts.Prefix,
		ne.servers,
		streamMetaKeys,
		consumerMetaKeys,
	)
	ne.registerCollector(collector.JetStreamSystem, endpoint, jsz)
}

func (ne *NATSExporter) registerCollector(system, endpoint string, nc prometheus.Collector) {
if err := ne.registry.Register(nc); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
Expand Down Expand Up @@ -225,7 +236,15 @@ func (ne *NATSExporter) InitializeCollectors() error {
default:
return fmt.Errorf("invalid jsz filter %q", opts.GetJszFilter)
}
ne.createCollector(collector.JetStreamSystem, opts.GetJszFilter)
splitOrEmpty := func(s string) []string {
if s == "" {
return []string{}
}
return strings.Split(s, ",")
}
streamMetaKeys := splitOrEmpty(opts.JszSteamMetaKeys)
consumerMetaKeys := splitOrEmpty(opts.JszConsumerMetaKeys)
ne.createJszCollector(opts.GetJszFilter, streamMetaKeys, consumerMetaKeys)
Comment on lines +239 to +247
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be smart to validate that the resulting label names are actually valid in Prometheus. WDYT?

}
if len(ne.Collectors) == 0 {
return fmt.Errorf("no Collectors specified")
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func main() {
flag.BoolVar(&opts.GetStreamingServerz, "serverz", false, "Get streaming server metrics.")
flag.BoolVar(&opts.GetVarz, "varz", false, "Get general metrics.")
flag.StringVar(&opts.GetJszFilter, "jsz", "", "Select JetStream metrics to filter (e.g streams, accounts, consumers)")
flag.StringVar(&opts.JszSteamMetaKeys, "jsz_stream_meta_keys", "",
"Select JetStream stream metadata to output (comma separated)")
flag.StringVar(&opts.JszConsumerMetaKeys, "jsz_consumer_meta_keys", "",
"Select JetStream consumer metadata to output (comma separated)")
flag.StringVar(&opts.CertFile, "tlscert", "", "Server certificate file (Enables HTTPS).")
flag.StringVar(&opts.KeyFile, "tlskey", "", "Private key for server certificate (used with HTTPS).")
flag.StringVar(&opts.CaFile, "tlscacert", "", "Client certificate CA for verification (used with HTTPS).")
Expand Down
2 changes: 1 addition & 1 deletion test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func RunJetStreamServerWithPorts(port, monitorPort int, domain string) *server.S
opts.JetStream = true
opts.JetStreamDomain = domain
tdir, _ := os.MkdirTemp(tempRoot, "js-storedir-")
opts.StoreDir = filepath.Dir(tdir)
opts.StoreDir = tdir
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, this was a bug. It always selected the parent directory of tdir, effectively always re-using the same directory.

Also: I had to create that parent directory by hand. Are you sure that is intended?

opts.HTTPHost = "127.0.0.1"
opts.HTTPPort = monitorPort

Expand Down