-
Notifications
You must be signed in to change notification settings - Fork 140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow exporting JS metadata #318
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -494,7 +494,16 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p | |
return newReplicatorCollector(getSystem(system, prefix), servers) | ||
} | ||
if isJszEndpoint(system) { | ||
return newJszCollector(getSystem(system, prefix), endpoint, servers) | ||
return newJszCollector(getSystem(system, prefix), endpoint, servers, []string{}, []string{}) | ||
} | ||
return newNatsCollector(getSystem(system, prefix), endpoint, servers) | ||
} | ||
|
||
// NewJszCollector creates a new NATS JetStream Collector. | ||
func NewJszCollector( | ||
endpoint, prefix string, | ||
servers []*CollectedServer, | ||
streamMetaKeys, consumerMetaKeys []string, | ||
) prometheus.Collector { | ||
return newJszCollector(getSystem(JetStreamSystem, prefix), endpoint, servers, streamMetaKeys, consumerMetaKeys) | ||
} | ||
Comment on lines
+502
to
+509
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a separate code path now for JetStream collectors to be able to supply the meta keys. This seemed better than to pass the arguments around for the other collectors where they wouldn't be used. This makes JS collectors somewhat of a special case in the code, not sure if you like it that way. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,30 +56,62 @@ type jszCollector struct { | |
consumerNumPending *prometheus.Desc | ||
consumerAckFloorStreamSeq *prometheus.Desc | ||
consumerAckFloorConsumerSeq *prometheus.Desc | ||
|
||
// metadata extractors | ||
streamMetricExtractors []func(nats.StreamDetail) string | ||
consumerMetricExtractors []func(*nats.ConsumerInfo) string | ||
} | ||
|
||
func isJszEndpoint(system string) bool { | ||
return system == JetStreamSystem | ||
} | ||
|
||
func newJszCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector { | ||
func newJszCollector( | ||
system, endpoint string, | ||
servers []*CollectedServer, | ||
streamMetaKeys, consumerMetaKeys []string, | ||
) prometheus.Collector { | ||
serverLabels := []string{"server_id", "server_name", "cluster", "domain", "meta_leader", "is_meta_leader"} | ||
|
||
var streamLabels []string | ||
var streamLabels = make([]string, 0) | ||
streamLabels = append(streamLabels, serverLabels...) | ||
streamLabels = append(streamLabels, "account") | ||
streamLabels = append(streamLabels, "account_id") | ||
streamLabels = append(streamLabels, "stream_name") | ||
streamLabels = append(streamLabels, "stream_leader") | ||
streamLabels = append(streamLabels, "is_stream_leader") | ||
streamLabels = append(streamLabels, "stream_raft_group") | ||
for _, k := range streamMetaKeys { | ||
streamLabels = append(streamLabels, "stream_meta_"+k) | ||
} | ||
Comment on lines
+84
to
+86
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stream metadata is exported as labels named Maybe it would actually be nice to combine these, WDYT? |
||
var streamMetricExtractors = make([]func(nats.StreamDetail) string, len(streamMetaKeys)) | ||
for i, k := range streamMetaKeys { | ||
streamMetricExtractors[i] = func(s nats.StreamDetail) string { | ||
if s.Config == nil { | ||
return "" | ||
} | ||
return s.Config.Metadata[k] // defaults to empty string | ||
} | ||
} | ||
|
||
var consumerLabels []string | ||
var consumerLabels = make([]string, 0) | ||
consumerLabels = append(consumerLabels, streamLabels...) | ||
consumerLabels = append(consumerLabels, "consumer_name") | ||
consumerLabels = append(consumerLabels, "consumer_leader") | ||
consumerLabels = append(consumerLabels, "is_consumer_leader") | ||
consumerLabels = append(consumerLabels, "consumer_desc") | ||
for _, k := range consumerMetaKeys { | ||
consumerLabels = append(consumerLabels, "consumer_meta_"+k) | ||
} | ||
var consumerMetricExtractors = make([]func(*nats.ConsumerInfo) string, len(consumerMetaKeys)) | ||
for i, k := range consumerMetaKeys { | ||
consumerMetricExtractors[i] = func(c *nats.ConsumerInfo) string { | ||
if c == nil || c.Config == nil { | ||
return "" | ||
} | ||
return c.Config.Metadata[k] // defaults to empty string | ||
} | ||
} | ||
|
||
nc := &jszCollector{ | ||
httpClient: &http.Client{ | ||
|
@@ -231,6 +263,8 @@ func newJszCollector(system, endpoint string, servers []*CollectedServer) promet | |
consumerLabels, | ||
nil, | ||
), | ||
streamMetricExtractors: streamMetricExtractors, | ||
consumerMetricExtractors: consumerMetricExtractors, | ||
} | ||
|
||
// Use the endpoint | ||
|
@@ -354,12 +388,18 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) { | |
} | ||
streamRaftGroup = stream.RaftGroup | ||
|
||
streamLabelValues := []string{ | ||
// Server Labels | ||
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader, | ||
// Stream Labels | ||
accountName, accountID, streamName, streamLeader, isStreamLeader, streamRaftGroup, | ||
} | ||
for _, extractor := range nc.streamMetricExtractors { | ||
value := extractor(stream) | ||
streamLabelValues = append(streamLabelValues, value) | ||
} | ||
streamMetric := func(key *prometheus.Desc, value float64) prometheus.Metric { | ||
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, | ||
// Server Labels | ||
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader, | ||
// Stream Labels | ||
accountName, accountID, streamName, streamLeader, isStreamLeader, streamRaftGroup) | ||
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, streamLabelValues...) | ||
} | ||
ch <- streamMetric(nc.streamMessages, float64(stream.State.Msgs)) | ||
ch <- streamMetric(nc.streamBytes, float64(stream.State.Bytes)) | ||
|
@@ -384,15 +424,17 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) { | |
} else { | ||
isConsumerLeader = "true" | ||
} | ||
consumerLabelValues := streamLabelValues | ||
consumerLabelValues = append(consumerLabelValues, | ||
// Consumer Labels | ||
consumerName, consumerLeader, isConsumerLeader, consumerDesc, | ||
) | ||
for _, extractor := range nc.consumerMetricExtractors { | ||
value := extractor(consumer) | ||
consumerLabelValues = append(consumerLabelValues, value) | ||
} | ||
consumerMetric := func(key *prometheus.Desc, value float64) prometheus.Metric { | ||
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, | ||
// Server Labels | ||
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader, | ||
// Stream Labels | ||
accountName, accountID, streamName, streamLeader, isStreamLeader, streamRaftGroup, | ||
// Consumer Labels | ||
consumerName, consumerLeader, isConsumerLeader, consumerDesc, | ||
) | ||
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, consumerLabelValues...) | ||
} | ||
ch <- consumerMetric(nc.consumerDeliveredConsumerSeq, float64(consumer.Delivered.Consumer)) | ||
ch <- consumerMetric(nc.consumerDeliveredStreamSeq, float64(consumer.Delivered.Stream)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,8 @@ type NATSExporterOptions struct { | |
GetStreamingChannelz bool | ||
GetStreamingServerz bool | ||
GetJszFilter string | ||
JszSteamMetaKeys string | ||
JszConsumerMetaKeys string | ||
RetryInterval time.Duration | ||
CertFile string | ||
KeyFile string | ||
|
@@ -135,6 +137,15 @@ func (ne *NATSExporter) createCollector(system, endpoint string) { | |
ne.servers)) | ||
} | ||
|
||
func (ne *NATSExporter) createJszCollector(endpoint string, streamMetaKeys, consumerMetaKeys []string) { | ||
ne.registerCollector(collector.JetStreamSystem, endpoint, | ||
collector.NewJszCollector(endpoint, | ||
ne.opts.Prefix, | ||
ne.servers, | ||
streamMetaKeys, | ||
consumerMetaKeys)) | ||
} | ||
|
||
func (ne *NATSExporter) registerCollector(system, endpoint string, nc prometheus.Collector) { | ||
if err := ne.registry.Register(nc); err != nil { | ||
if _, ok := err.(prometheus.AlreadyRegisteredError); ok { | ||
|
@@ -225,7 +236,15 @@ func (ne *NATSExporter) InitializeCollectors() error { | |
default: | ||
return fmt.Errorf("invalid jsz filter %q", opts.GetJszFilter) | ||
} | ||
ne.createCollector(collector.JetStreamSystem, opts.GetJszFilter) | ||
splitOrEmpty := func(s string) []string { | ||
if s == "" { | ||
return []string{} | ||
} | ||
return strings.Split(s, ",") | ||
} | ||
streamMetaKeys := splitOrEmpty(opts.JszSteamMetaKeys) | ||
consumerMetaKeys := splitOrEmpty(opts.JszConsumerMetaKeys) | ||
ne.createJszCollector(opts.GetJszFilter, streamMetaKeys, consumerMetaKeys) | ||
Comment on lines
+239
to
+247
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be smart to validate that the resulting label names are actually valid in Prometheus. WDYT? |
||
} | ||
if len(ne.Collectors) == 0 { | ||
return fmt.Errorf("no Collectors specified") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -201,7 +201,7 @@ func RunJetStreamServerWithPorts(port, monitorPort int, domain string) *server.S | |
opts.JetStream = true | ||
opts.JetStreamDomain = domain | ||
tdir, _ := os.MkdirTemp(tempRoot, "js-storedir-") | ||
opts.StoreDir = filepath.Dir(tdir) | ||
opts.StoreDir = tdir | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO, this was a bug. It always selected the parent directory of the Also: I had to create that parent directory by hand. Are you sure that is intended? |
||
opts.HTTPHost = "127.0.0.1" | ||
opts.HTTPPort = monitorPort | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seemed to make some assumptions on workspace layout which were not true on my machine, feel free to ignore