bugfix: Remove Prometheus Exemplars augmentation from counter metrics without suffix _total #460

Merged
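The change applies one pattern across the Kafka producers and the AMQP, async, Kafka group, and SQS components below: drop the context-taking, exemplar-augmented counter helpers and increment the plain Prometheus counters directly, since none of the affected counters carry the `_total` suffix that exemplars require (see the comments added to trace/metric.go at the end of this diff). A condensed sketch of the pattern using the client/kafka/v2 helpers — the two helper bodies are taken from the diff, while the CounterVec definition (metric name, help text, label names) is illustrative only:

```go
package v2

import (
	"context"

	"github.com/beatlabs/patron/trace"
	"github.com/prometheus/client_golang/prometheus"
)

type deliveryStatus string

// Illustrative definition: the real metric lives in client/kafka/v2/kafka.go; the
// name and label names here are assumed, and notably the name does not end in "_total".
var messageStatus = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "client_kafka_message_status", // assumed name, no "_total" suffix
		Help: "Message delivery status, partitioned by status, topic and delivery type.",
	},
	[]string{"status", "topic", "type"},
)

// Removed by this PR: the exemplar-augmented variant, which needed ctx only to
// look up the current span for the exemplar.
func statusCountAddWithExemplars(ctx context.Context, deliveryType string, status deliveryStatus, topic string, cnt int) {
	messageStatusCounter := trace.Counter{Counter: messageStatus.WithLabelValues(string(status), topic, deliveryType)}
	messageStatusCounter.Add(ctx, float64(cnt))
}

// Kept: a plain counter increment; callers no longer pass a context.
func statusCountAdd(deliveryType string, status deliveryStatus, topic string, cnt int) {
	messageStatus.WithLabelValues(string(status), topic, deliveryType).Add(float64(cnt))
}
```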
4 changes: 2 additions & 2 deletions client/kafka/v2/async_producer.go
@@ -28,13 +28,13 @@ func (ap *AsyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage)

err := injectTracingHeaders(msg, sp)
if err != nil {
statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1)
statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return fmt.Errorf("failed to inject tracing headers: %w", err)
}

ap.asyncProd.Input() <- msg
statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1)
statusCountAdd(deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1)
trace.SpanSuccess(sp)
return nil
}
7 changes: 0 additions & 7 deletions client/kafka/v2/kafka.go
@@ -1,15 +1,13 @@
package v2

import (
"context"
"errors"
"fmt"
"os"

"github.com/Shopify/sarama"
patronerrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/internal/validation"
"github.com/beatlabs/patron/trace"
"github.com/prometheus/client_golang/prometheus"
)

@@ -43,11 +41,6 @@ func init() {
prometheus.MustRegister(messageStatus)
}

func statusCountAddWithExemplars(ctx context.Context, deliveryType string, status deliveryStatus, topic string, cnt int) {
messageStatusCounter := trace.Counter{Counter: messageStatus.WithLabelValues(string(status), topic, deliveryType)}
messageStatusCounter.Add(ctx, float64(cnt))
}

func statusCountAdd(deliveryType string, status deliveryStatus, topic string, cnt int) {
messageStatus.WithLabelValues(string(status), topic, deliveryType).Add(float64(cnt))
}
16 changes: 8 additions & 8 deletions client/kafka/v2/sync_producer.go
@@ -29,19 +29,19 @@ func (p *SyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) (p

err = injectTracingHeaders(msg, sp)
if err != nil {
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return -1, -1, fmt.Errorf("failed to inject tracing headers: %w", err)
}

partition, offset, err = p.syncProd.SendMessage(msg)
if err != nil {
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return -1, -1, err
}

statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSent, msg.Topic, 1)
statusCountAdd(deliveryTypeSync, deliveryStatusSent, msg.Topic, 1)
trace.SpanSuccess(sp)
return partition, offset, nil
}
@@ -57,19 +57,19 @@ func (p *SyncProducer) SendBatch(ctx context.Context, messages []*sarama.Produce

for _, msg := range messages {
if err := injectTracingHeaders(msg, sp); err != nil {
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages))
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages))
trace.SpanError(sp)
return fmt.Errorf("failed to inject tracing headers: %w", err)
}
}

if err := p.syncProd.SendMessages(messages); err != nil {
statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSendError, messages)
statusCountBatchAdd(deliveryTypeSync, deliveryStatusSendError, messages)
trace.SpanError(sp)
return err
}

statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSent, messages)
statusCountBatchAdd(deliveryTypeSync, deliveryStatusSent, messages)
trace.SpanSuccess(sp)
return nil
}
@@ -87,8 +87,8 @@ func (p *SyncProducer) Close() error {
return nil
}

func statusCountBatchAdd(ctx context.Context, deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) {
func statusCountBatchAdd(deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) {
for _, msg := range messages {
statusCountAddWithExemplars(ctx, deliveryType, status, msg.Topic, 1)
statusCountAdd(deliveryType, status, msg.Topic, 1)
}
}
11 changes: 5 additions & 6 deletions component/amqp/component.go
@@ -219,7 +219,7 @@ func (c *Component) processLoop(ctx context.Context, sub subscription) error {
return errors.New("subscription channel closed")
}
log.Debugf("processing message %d", delivery.DeliveryTag)
observeReceivedMessageStats(ctx, c.queueCfg.queue, delivery.Timestamp)
observeReceivedMessageStats(c.queueCfg.queue, delivery.Timestamp)
c.processBatch(ctx, c.createMessage(ctx, delivery), btc)
case <-batchTimeout.C:
log.Debugf("batch timeout expired, sending batch")
@@ -233,9 +233,9 @@
}
}

func observeReceivedMessageStats(ctx context.Context, queue string, timestamp time.Time) {
func observeReceivedMessageStats(queue string, timestamp time.Time) {
messageAge.WithLabelValues(queue).Set(time.Now().UTC().Sub(timestamp).Seconds())
messageCountInc(ctx, queue, fetchedMessageState, nil)
messageCountInc(queue, fetchedMessageState, nil)
}

type subscription struct {
@@ -329,13 +329,12 @@ func (c *Component) stats(sub subscription) error {
return nil
}

func messageCountInc(ctx context.Context, queue string, state messageState, err error) {
func messageCountInc(queue string, state messageState, err error) {
hasError := "false"
if err != nil {
hasError = "true"
}
messageStatusCounter := trace.Counter{Counter: messageCounterVec.WithLabelValues(queue, string(state), hasError)}
messageStatusCounter.Inc(ctx)
messageCounterVec.WithLabelValues(queue, string(state), hasError).Inc()
}

func mapHeader(hh amqp.Table) map[string]string {
4 changes: 2 additions & 2 deletions component/amqp/message.go
@@ -71,13 +71,13 @@ func (m message) Message() amqp.Delivery {
func (m message) ACK() error {
err := m.msg.Ack(false)
trace.SpanComplete(m.span, err)
messageCountInc(m.ctx, m.queue, ackMessageState, err)
messageCountInc(m.queue, ackMessageState, err)
return err
}

func (m message) NACK() error {
err := m.msg.Nack(false, m.requeue)
messageCountInc(m.ctx, m.queue, nackMessageState, err)
messageCountInc(m.queue, nackMessageState, err)
trace.SpanComplete(m.span, err)
return err
}
8 changes: 3 additions & 5 deletions component/async/component.go
@@ -8,7 +8,6 @@ import (

patronErrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/trace"
"github.com/prometheus/client_golang/prometheus"
)

@@ -29,9 +28,8 @@ func init() {
prometheus.MustRegister(consumerErrors)
}

func consumerErrorsInc(ctx context.Context, name string) {
consumerErrorsCounter := trace.Counter{Counter: consumerErrors.WithLabelValues(name)}
consumerErrorsCounter.Inc(ctx)
func consumerErrorsInc(name string) {
consumerErrors.WithLabelValues(name).Inc()
}

// Component implementation of an async component.
@@ -162,7 +160,7 @@ func (c *Component) Run(ctx context.Context) error {
if ctx.Err() == context.Canceled {
break
}
consumerErrorsInc(ctx, c.name)
consumerErrorsInc(c.name)
if c.retries > 0 {
log.Errorf("failed run, retry %d/%d with %v wait: %v", i, c.retries, c.retryWait, err)
time.Sleep(c.retryWait)
17 changes: 7 additions & 10 deletions component/kafka/group/component.go
@@ -92,11 +92,8 @@ func topicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high
}

// messageStatusCountInc increments the messageStatus counter for a certain status.
func messageStatusCountInc(ctx context.Context, status, group, topic string) {
httpStatusCounter := trace.Counter{
Counter: messageStatus.WithLabelValues(status, group, topic),
}
httpStatusCounter.Inc(ctx)
func messageStatusCountInc(status, group, topic string) {
messageStatus.WithLabelValues(status, group, topic).Inc()
}

// New initializes a new kafka consumer component with support for functional configuration.
@@ -321,7 +318,7 @@ func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, clai
if ok {
log.Debugf("message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
topicPartitionOffsetDiffGaugeSet(c.group, msg.Topic, msg.Partition, claim.HighWaterMarkOffset(), msg.Offset)
messageStatusCountInc(c.ctx, messageReceived, c.group, msg.Topic)
messageStatusCountInc(messageReceived, c.group, msg.Topic)
err := c.insertMessage(session, msg)
if err != nil {
return err
@@ -350,7 +347,7 @@ func (c *consumerHandler) flush(session sarama.ConsumerGroupSession) error {
if len(c.msgBuf) > 0 {
messages := make([]kafka.Message, 0, len(c.msgBuf))
for _, msg := range c.msgBuf {
messageStatusCountInc(c.ctx, messageProcessed, c.group, msg.Topic)
messageStatusCountInc(messageProcessed, c.group, msg.Topic)
ctx, sp := c.getContextWithCorrelation(msg)
messages = append(messages, kafka.NewMessage(ctx, sp, msg))
}
@@ -388,16 +385,16 @@ func (c *consumerHandler) executeFailureStrategy(messages []kafka.Message, err e
case kafka.ExitStrategy:
for _, m := range messages {
trace.SpanError(m.Span())
messageStatusCountInc(c.ctx, messageErrored, c.group, m.Message().Topic)
messageStatusCountInc(messageErrored, c.group, m.Message().Topic)
}
log.Errorf("could not process message(s)")
c.err = err
return err
case kafka.SkipStrategy:
for _, m := range messages {
trace.SpanError(m.Span())
messageStatusCountInc(c.ctx, messageErrored, c.group, m.Message().Topic)
messageStatusCountInc(c.ctx, messageSkipped, c.group, m.Message().Topic)
messageStatusCountInc(messageErrored, c.group, m.Message().Topic)
messageStatusCountInc(messageSkipped, c.group, m.Message().Topic)
}
log.Errorf("could not process message(s) so skipping with error: %v", err)
default:
9 changes: 3 additions & 6 deletions component/sqs/component.go
@@ -228,7 +228,7 @@ func (c *Component) consume(ctx context.Context, chErr chan error) {
}

logger.Debugf("Consume: received %d messages", len(output.Messages))
messageCountInc(ctx, c.queue.name, fetchedMessageState, false, len(output.Messages))
messageCountInc(c.queue.name, fetchedMessageState, false, len(output.Messages))

if len(output.Messages) == 0 {
continue
@@ -330,16 +330,13 @@ func observerMessageAge(queue string, attributes map[string]*string) {
messageAge.WithLabelValues(queue).Set(time.Now().UTC().Sub(time.Unix(timestamp, 0)).Seconds())
}

func messageCountInc(ctx context.Context, queue string, state messageState, hasError bool, count int) {
func messageCountInc(queue string, state messageState, hasError bool, count int) {
hasErrorString := "false"
if hasError {
hasErrorString = "true"
}

messageCounter := trace.Counter{
Counter: messageCounterVec.WithLabelValues(queue, string(state), hasErrorString),
}
messageCounter.Add(ctx, float64(count))
messageCounterVec.WithLabelValues(queue, string(state), hasErrorString).Add(float64(count))
}

func getCorrelationID(ma map[string]*sqs.MessageAttributeValue) string {
12 changes: 6 additions & 6 deletions component/sqs/message.go
@@ -79,17 +79,17 @@ func (m message) ACK() error {
ReceiptHandle: m.msg.ReceiptHandle,
})
if err != nil {
messageCountInc(m.ctx, m.queue.name, ackMessageState, true, 1)
messageCountInc(m.queue.name, ackMessageState, true, 1)
trace.SpanError(m.span)
return err
}
messageCountInc(m.ctx, m.queue.name, ackMessageState, false, 1)
messageCountInc(m.queue.name, ackMessageState, false, 1)
trace.SpanSuccess(m.span)
return nil
}

func (m message) NACK() {
messageCountInc(m.ctx, m.queue.name, nackMessageState, false, 1)
messageCountInc(m.queue.name, nackMessageState, false, 1)
trace.SpanSuccess(m.span)
}

@@ -117,23 +117,23 @@ func (b batch) ACK() ([]Message, error) {
QueueUrl: aws.String(b.queue.url),
})
if err != nil {
messageCountInc(b.ctx, b.queue.name, ackMessageState, true, len(b.messages))
messageCountInc(b.queue.name, ackMessageState, true, len(b.messages))
for _, msg := range b.messages {
trace.SpanError(msg.Span())
}
return nil, err
}

if len(output.Successful) > 0 {
messageCountInc(b.ctx, b.queue.name, ackMessageState, false, len(output.Successful))
messageCountInc(b.queue.name, ackMessageState, false, len(output.Successful))

for _, suc := range output.Successful {
trace.SpanSuccess(msgMap[aws.StringValue(suc.Id)].Span())
}
}

if len(output.Failed) > 0 {
messageCountInc(b.ctx, b.queue.name, ackMessageState, true, len(output.Failed))
messageCountInc(b.queue.name, ackMessageState, true, len(output.Failed))
failed := make([]Message, 0, len(output.Failed))
for _, fail := range output.Failed {
msg := msgMap[aws.StringValue(fail.Id)]
9 changes: 9 additions & 0 deletions trace/metric.go
@@ -15,6 +15,9 @@ type Counter struct {

// Add adds the given value to the counter. If there is a span associated with a context ctx the method
// replaces the currently saved exemplar (if any) with a new one, created from the provided value.
// NB: for a counter metric to be augmented with exemplars, its name MUST have the suffix "_total";
// otherwise the metric will not be collected by Prometheus. Refer to the OpenMetrics specification:
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md
func (c *Counter) Add(ctx context.Context, count float64) {
spanFromCtx := opentracing.SpanFromContext(ctx)
if spanFromCtx != nil {
@@ -30,6 +33,9 @@ func (c *Counter) Add(ctx context.Context, count float64) {

// Inc increments the counter by one. If there is a span associated with a context ctx the method
// replaces the currently saved exemplar (if any) with a new one, created from the provided value.
// NB: for a counter metric to be augmented with exemplars, its name MUST have the suffix "_total";
// otherwise the metric will not be collected by Prometheus. Refer to the OpenMetrics specification:
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md
func (c *Counter) Inc(ctx context.Context) {
spanFromCtx := opentracing.SpanFromContext(ctx)
if spanFromCtx != nil {
@@ -50,6 +56,9 @@ type Histogram struct {

// Observe adds an observation. If there is a span associated with a context ctx the method replaces
// the currently saved exemplar (if any) with a new one, created from the provided value.
// NB: for a histogram metric to be augmented with exemplars, its name MUST have the suffix "_bucket";
// otherwise the metric will not be collected by Prometheus. Refer to the OpenMetrics specification:
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md
func (h *Histogram) Observe(ctx context.Context, v float64) {
spanFromCtx := opentracing.SpanFromContext(ctx)
if spanFromCtx != nil {
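Per the notes added above, a caller that still wants exemplar-augmented counting can keep using trace.Counter, provided the wrapped metric is named with the `_total` suffix. A minimal sketch under that assumption — the metric, label, and function names here are hypothetical and not part of patron:

```go
package example

import (
	"context"

	"github.com/beatlabs/patron/trace"
	"github.com/prometheus/client_golang/prometheus"
)

// Hypothetical counter: the "_total" suffix is what allows the exemplar to be
// exposed alongside the sample in the OpenMetrics format.
var processedMessagesTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "example_consumer_processed_messages_total",
		Help: "Processed messages, partitioned by topic.",
	},
	[]string{"topic"},
)

func init() {
	prometheus.MustRegister(processedMessagesTotal)
}

// countProcessed increments the counter; when ctx carries an opentracing span,
// trace.Counter attaches an exemplar derived from it, as documented above.
func countProcessed(ctx context.Context, topic string) {
	counter := trace.Counter{Counter: processedMessagesTotal.WithLabelValues(topic)}
	counter.Inc(ctx)
}
```

Note that exemplars are only rendered in the OpenMetrics exposition format, so the metrics endpoint has to serve it (for example, promhttp with EnableOpenMetrics set) for them to show up at all.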