From ad9c09cd82170ce8a70cf7efe53a299113760ddf Mon Sep 17 00:00:00 2001 From: phhoa Date: Fri, 6 May 2022 21:38:14 +0800 Subject: [PATCH] Add consumer ack floor metrics --- collector/jsz.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/collector/jsz.go b/collector/jsz.go index d4aed38..f48c7a4 100644 --- a/collector/jsz.go +++ b/collector/jsz.go @@ -51,6 +51,8 @@ type jszCollector struct { consumerNumRedelivered *prometheus.Desc consumerNumWaiting *prometheus.Desc consumerNumPending *prometheus.Desc + consumerAckFloorStreamSeq *prometheus.Desc + consumerAckFloorConsumerSeq *prometheus.Desc } func isJszEndpoint(system string) bool { @@ -191,6 +193,18 @@ func newJszCollector(system, endpoint string, servers []*CollectedServer) promet consumerLabels, nil, ), + consumerAckFloorStreamSeq: prometheus.NewDesc( + prometheus.BuildFQName(system, "consumer", "ack_floor_stream_seq"), + "Number of ack floor stream seq from a consumer", + consumerLabels, + nil, + ), + consumerAckFloorConsumerSeq: prometheus.NewDesc( + prometheus.BuildFQName(system, "consumer", "ack_floor_consumer_seq"), + "Number of ack floor consumer seq from a consumer", + consumerLabels, + nil, + ), } // Use the endpoint @@ -345,6 +359,8 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) { ch <- consumerMetric(nc.consumerNumRedelivered, float64(consumer.NumRedelivered)) ch <- consumerMetric(nc.consumerNumWaiting, float64(consumer.NumWaiting)) ch <- consumerMetric(nc.consumerNumPending, float64(consumer.NumPending)) + ch <- consumerMetric(nc.consumerAckFloorStreamSeq, float64(consumer.AckFloor.Stream)) + ch <- consumerMetric(nc.consumerAckFloorConsumerSeq, float64(consumer.AckFloor.Consumer)) } } }