Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus Exemplars Support #448

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
200bb33
add support for Prometheus Exemplars
EvgeniaMartynova-thebeat Dec 21, 2021
587a5d7
add support for Prometheus Exemplars
EvgeniaMartynova-thebeat Dec 21, 2021
e92ba95
use opentracing.SpanFromContext() instead of a separate module for no…
EvgeniaMartynova-thebeat Dec 21, 2021
f183540
use opentracing.SpanFromContext() instead of a separate module for no…
EvgeniaMartynova-thebeat Dec 21, 2021
2d3c3e8
add support for Prometheus Exemplars for different clients such as gr…
EvgeniaMartynova-thebeat Dec 24, 2021
43002eb
Merge branch 'master' into feat/prometheus-exemplars-support
Dec 27, 2021
e9f654d
create a constant in trace package for TraceID
EvgeniaMartynova-thebeat Dec 31, 2021
40eae9e
Merge remote-tracking branch 'origin/feat/prometheus-exemplars-suppor…
EvgeniaMartynova-thebeat Dec 31, 2021
b19e769
add support of Prometheus Exemplars for request/message CounterVec me…
EvgeniaMartynova-thebeat Jan 3, 2022
2cee064
use decoration pattern for metrics to reduce a code duplication
EvgeniaMartynova-thebeat Jan 7, 2022
cd67c81
use decoration pattern for metrics to reduce a code duplication
EvgeniaMartynova-thebeat Jan 7, 2022
d4b073c
Merge branch 'master' into feat/prometheus-exemplars-support
Jan 7, 2022
a6324be
add documentation, PR comments
EvgeniaMartynova-thebeat Jan 9, 2022
cadf753
Merge remote-tracking branch 'origin/feat/prometheus-exemplars-suppor…
EvgeniaMartynova-thebeat Jan 9, 2022
40691fb
update documentation according to a PR comment
EvgeniaMartynova-thebeat Jan 10, 2022
a0fad1b
update tests according to a PR comment
EvgeniaMartynova-thebeat Jan 13, 2022
60554ab
a potential panic fix
EvgeniaMartynova-thebeat Jan 19, 2022
409dede
Merge branch 'master' into feat/prometheus-exemplars-support
Jan 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions client/amqp/v2/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"context"
"errors"
"fmt"
"github.com/uber/jaeger-client-go"
"strconv"
"time"

"github.com/uber/jaeger-client-go"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line should be removed

"github.com/beatlabs/patron/correlation"
patronerrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/trace"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
"github.com/streadway/amqp"
Expand Down Expand Up @@ -128,7 +129,7 @@ func observePublish(span opentracing.Span, start time.Time, exchange string, err

if sctx, ok := span.Context().(jaeger.SpanContext); ok {
durationHistogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": sctx.TraceID().String()},
time.Since(start).Seconds(), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
durationHistogram.Observe(time.Since(start).Seconds())
Expand Down
7 changes: 4 additions & 3 deletions client/es/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package es
import (
"bytes"
"fmt"
"github.com/uber/jaeger-client-go"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -13,12 +12,14 @@ import (
"strings"
"time"

"github.com/uber/jaeger-client-go"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/trace"
elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/estransport"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -125,7 +126,7 @@ func observeResponse(req *http.Request, sp opentracing.Span, rsp *http.Response,

if sctx, ok := sp.Context().(jaeger.SpanContext); ok {
durationHistogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": sctx.TraceID().String()},
time.Since(start).Seconds(), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
durationHistogram.Observe(time.Since(start).Seconds())
Expand Down
5 changes: 3 additions & 2 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ package grpc

import (
"context"
"github.com/uber/jaeger-client-go"
"time"

"github.com/uber/jaeger-client-go"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

"github.com/beatlabs/patron/correlation"
"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/trace"
Expand Down Expand Up @@ -91,7 +92,7 @@ func unaryInterceptor(target string) grpc.UnaryClientInterceptor {
durationHistogram := rpcDurationMetrics.WithLabelValues(unary, target, method, rpcStatus.Code().String())
if sctx, ok := span.Context().(jaeger.SpanContext); ok {
durationHistogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
invokeDuration.Seconds(), prometheus.Labels{"traceID": sctx.TraceID().String()},
invokeDuration.Seconds(), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
durationHistogram.Observe(invokeDuration.Seconds())
Expand Down
5 changes: 3 additions & 2 deletions client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ package http
import (
"compress/flate"
"compress/gzip"
"github.com/uber/jaeger-client-go"
"io"
"net/http"
"strconv"
"time"

"github.com/uber/jaeger-client-go"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
Expand Down Expand Up @@ -98,7 +99,7 @@ func (tc *TracedClient) Do(req *http.Request) (*http.Response, error) {

if sctx, ok := ht.Span().Context().(jaeger.SpanContext); ok {
durationHistogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a way to type assert safely which is:
v, ok := x.(T) // ok will be either true or false

time.Since(start).Seconds(), prometheus.Labels{"traceID": sctx.TraceID().String()},
time.Since(start).Seconds(), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
durationHistogram.Observe(time.Since(start).Seconds())
Expand Down
4 changes: 2 additions & 2 deletions client/kafka/v2/async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func (ap *AsyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage)

err := injectTracingHeaders(msg, sp)
if err != nil {
statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return fmt.Errorf("failed to inject tracing headers: %w", err)
}

ap.asyncProd.Input() <- msg
statusCountAdd(deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1)
trace.SpanSuccess(sp)
return nil
}
Expand Down
21 changes: 21 additions & 0 deletions client/kafka/v2/kafka.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package v2

import (
"context"
"errors"
"fmt"
"os"

"github.com/beatlabs/patron/trace"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

"github.com/Shopify/sarama"
patronerrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/internal/validation"
Expand Down Expand Up @@ -41,6 +46,22 @@ func init() {
prometheus.MustRegister(messageStatus)
}

func statusCountAddWithExemplars(ctx context.Context, deliveryType string, status deliveryStatus, topic string, cnt int) {
messageStatusCounter := messageStatus.WithLabelValues(string(status), topic, deliveryType)
spanFromCtx := opentracing.SpanFromContext(ctx)
if spanFromCtx != nil {
if sctx, ok := spanFromCtx.Context().(jaeger.SpanContext); ok {
messageStatusCounter.(prometheus.ExemplarAdder).AddWithExemplar(
float64(cnt), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
messageStatusCounter.Add(float64(cnt))
}
} else {
messageStatusCounter.Add(float64(cnt))
}
}

func statusCountAdd(deliveryType string, status deliveryStatus, topic string, cnt int) {
messageStatus.WithLabelValues(string(status), topic, deliveryType).Add(float64(cnt))
}
Expand Down
16 changes: 8 additions & 8 deletions client/kafka/v2/sync_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ func (p *SyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) (p

err = injectTracingHeaders(msg, sp)
if err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return -1, -1, fmt.Errorf("failed to inject tracing headers: %w", err)
}

partition, offset, err = p.syncProd.SendMessage(msg)
if err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return -1, -1, err
}

statusCountAdd(deliveryTypeSync, deliveryStatusSent, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSent, msg.Topic, 1)
trace.SpanSuccess(sp)
return partition, offset, nil
}
Expand All @@ -57,19 +57,19 @@ func (p *SyncProducer) SendBatch(ctx context.Context, messages []*sarama.Produce

for _, msg := range messages {
if err := injectTracingHeaders(msg, sp); err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages))
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages))
trace.SpanError(sp)
return fmt.Errorf("failed to inject tracing headers: %w", err)
}
}

if err := p.syncProd.SendMessages(messages); err != nil {
statusCountBatchAdd(deliveryTypeSync, deliveryStatusSendError, messages)
statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSendError, messages)
trace.SpanError(sp)
return err
}

statusCountBatchAdd(deliveryTypeSync, deliveryStatusSent, messages)
statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSent, messages)
trace.SpanSuccess(sp)
return nil
}
Expand All @@ -87,8 +87,8 @@ func (p *SyncProducer) Close() error {
return nil
}

func statusCountBatchAdd(deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) {
func statusCountBatchAdd(ctx context.Context, deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) {
for _, msg := range messages {
statusCountAdd(deliveryType, status, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryType, status, msg.Topic, 1)
}
}
7 changes: 4 additions & 3 deletions client/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package redis

import (
"context"
"github.com/uber/jaeger-client-go"
"strconv"
"time"

"github.com/uber/jaeger-client-go"

"github.com/beatlabs/patron/trace"
"github.com/go-redis/redis/extra/rediscmd"
"github.com/go-redis/redis/v8"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -96,7 +97,7 @@ func observeDuration(ctx context.Context, cmd string, err error) {
if spanFromCtx != nil {
if sctx, ok := spanFromCtx.Context().(jaeger.SpanContext); ok {
durationHistogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
dur.Seconds(), prometheus.Labels{"traceID": sctx.TraceID().String()},
dur.Seconds(), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
durationHistogram.Observe(dur.Seconds())
Expand Down
5 changes: 3 additions & 2 deletions client/sns/v2/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"context"
"errors"
"fmt"
"github.com/uber/jaeger-client-go"
"strconv"
"time"

"github.com/uber/jaeger-client-go"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sns/snsiface"
Expand Down Expand Up @@ -131,7 +132,7 @@ func observePublish(span opentracing.Span, start time.Time, topic string, err er

if sctx, ok := span.Context().(jaeger.SpanContext); ok {
durationHistogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": sctx.TraceID().String()},
time.Since(start).Seconds(), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
durationHistogram.Observe(time.Since(start).Seconds())
Expand Down
7 changes: 4 additions & 3 deletions client/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"context"
"database/sql"
"database/sql/driver"
"github.com/uber/jaeger-client-go"
"regexp"
"strconv"
"time"

"github.com/uber/jaeger-client-go"

"github.com/beatlabs/patron/trace"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -498,7 +499,7 @@ func observeDuration(span opentracing.Span, start time.Time, op string, err erro

if sctx, ok := span.Context().(jaeger.SpanContext); ok {
durationHistogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": sctx.TraceID().String()},
time.Since(start).Seconds(), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
durationHistogram.Observe(time.Since(start).Seconds())
Expand Down
5 changes: 3 additions & 2 deletions client/sqs/v2/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"context"
"errors"
"fmt"
"github.com/uber/jaeger-client-go"
"strconv"
"time"

"github.com/uber/jaeger-client-go"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
Expand Down Expand Up @@ -108,7 +109,7 @@ func observePublish(span opentracing.Span, start time.Time, queue string, err er

if sctx, ok := span.Context().(jaeger.SpanContext); ok {
durationHistogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": sctx.TraceID().String()},
time.Since(start).Seconds(), prometheus.Labels{trace.TraceID: sctx.TraceID().String()},
)
} else {
durationHistogram.Observe(time.Since(start).Seconds())
Expand Down
Loading