Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus Exemplars Support #448

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
200bb33
add support for Prometheus Exemplars
EvgeniaMartynova-thebeat Dec 21, 2021
587a5d7
add support for Prometheus Exemplars
EvgeniaMartynova-thebeat Dec 21, 2021
e92ba95
use opentracing.SpanFromContext() instead of a separate module for no…
EvgeniaMartynova-thebeat Dec 21, 2021
f183540
use opentracing.SpanFromContext() instead of a separate module for no…
EvgeniaMartynova-thebeat Dec 21, 2021
2d3c3e8
add support for Prometheus Exemplars for different clients such as gr…
EvgeniaMartynova-thebeat Dec 24, 2021
43002eb
Merge branch 'master' into feat/prometheus-exemplars-support
Dec 27, 2021
e9f654d
create a constant in trace package for TraceID
EvgeniaMartynova-thebeat Dec 31, 2021
40eae9e
Merge remote-tracking branch 'origin/feat/prometheus-exemplars-suppor…
EvgeniaMartynova-thebeat Dec 31, 2021
b19e769
add support of Prometheus Exemplars for request/message CounterVec me…
EvgeniaMartynova-thebeat Jan 3, 2022
2cee064
use decoration pattern for metrics to reduce a code duplication
EvgeniaMartynova-thebeat Jan 7, 2022
cd67c81
use decoration pattern for metrics to reduce a code duplication
EvgeniaMartynova-thebeat Jan 7, 2022
d4b073c
Merge branch 'master' into feat/prometheus-exemplars-support
Jan 7, 2022
a6324be
add documentation, PR comments
EvgeniaMartynova-thebeat Jan 9, 2022
cadf753
Merge remote-tracking branch 'origin/feat/prometheus-exemplars-suppor…
EvgeniaMartynova-thebeat Jan 9, 2022
40691fb
update documentation according to a PR comment
EvgeniaMartynova-thebeat Jan 10, 2022
a0fad1b
update tests according to a PR comment
EvgeniaMartynova-thebeat Jan 13, 2022
60554ab
a potential panic fix
EvgeniaMartynova-thebeat Jan 19, 2022
409dede
Merge branch 'master' into feat/prometheus-exemplars-support
Jan 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions client/amqp/v2/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
patronerrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/trace"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
"github.com/streadway/amqp"
Expand Down Expand Up @@ -100,7 +100,7 @@ func (tc *Publisher) Publish(ctx context.Context, exchange, key string, mandator
start := time.Now()
err := tc.channel.Publish(exchange, key, mandatory, immediate, msg)

observePublish(sp, start, exchange, err)
observePublish(ctx, sp, start, exchange, err)
if err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}
Expand All @@ -120,7 +120,11 @@ func (c amqpHeadersCarrier) Set(key, val string) {
c[key] = val
}

func observePublish(span opentracing.Span, start time.Time, exchange string, err error) {
func observePublish(ctx context.Context, span opentracing.Span, start time.Time, exchange string, err error) {
trace.SpanComplete(span, err)
publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())

durationHistogram := trace.Histogram{
Observer: publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err == nil)),
}
durationHistogram.Observe(ctx, time.Since(start).Seconds())
}
8 changes: 5 additions & 3 deletions client/es/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/estransport"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -120,8 +120,10 @@ func (c *transportClient) Perform(req *http.Request) (*http.Response, error) {

func observeResponse(req *http.Request, sp opentracing.Span, rsp *http.Response, start time.Time) {
endSpan(sp, rsp)
reqDurationMetrics.WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)).
Observe(time.Since(start).Seconds())
durationHistogram := trace.Histogram{
Observer: reqDurationMetrics.WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)),
}
durationHistogram.Observe(req.Context(), time.Since(start).Seconds())
}

// Config is a wrapper for elasticsearch.Config
Expand Down
8 changes: 5 additions & 3 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ func unaryInterceptor(target string) grpc.UnaryClientInterceptor {
invokeDuration := time.Since(invokeTime)

rpcStatus, _ := status.FromError(err) // codes.OK if err == nil, codes.Unknown if !ok
rpcDurationMetrics.
WithLabelValues(unary, target, method, rpcStatus.Code().String()).
Observe(invokeDuration.Seconds())

durationHistogram := trace.Histogram{
Observer: rpcDurationMetrics.WithLabelValues(unary, target, method, rpcStatus.Code().String()),
}
durationHistogram.Observe(ctx, invokeDuration.Seconds())

if err != nil {
trace.SpanError(span)
Expand Down
7 changes: 4 additions & 3 deletions client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ func (tc *TracedClient) Do(req *http.Request) (*http.Response, error) {
}

ext.HTTPStatusCode.Set(ht.Span(), uint16(rsp.StatusCode))
reqDurationMetrics.
WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)).
Observe(time.Since(start).Seconds())
durationHistogram := trace.Histogram{
Observer: reqDurationMetrics.WithLabelValues(req.Method, req.URL.Host, strconv.Itoa(rsp.StatusCode)),
}
durationHistogram.Observe(req.Context(), time.Since(start).Seconds())

if hdr := req.Header.Get(encoding.AcceptEncodingHeader); hdr != "" {
rsp.Body = decompress(hdr, rsp)
Expand Down
4 changes: 2 additions & 2 deletions client/kafka/v2/async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func (ap *AsyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage)

err := injectTracingHeaders(msg, sp)
if err != nil {
statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return fmt.Errorf("failed to inject tracing headers: %w", err)
}

ap.asyncProd.Input() <- msg
statusCountAdd(deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1)
trace.SpanSuccess(sp)
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions client/kafka/v2/kafka.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package v2

import (
"context"
"errors"
"fmt"
"os"

"github.com/Shopify/sarama"
patronerrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/internal/validation"
"github.com/beatlabs/patron/trace"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -41,6 +43,11 @@ func init() {
prometheus.MustRegister(messageStatus)
}

func statusCountAddWithExemplars(ctx context.Context, deliveryType string, status deliveryStatus, topic string, cnt int) {
messageStatusCounter := trace.Counter{Counter: messageStatus.WithLabelValues(string(status), topic, deliveryType)}
messageStatusCounter.Add(ctx, float64(cnt))
}

func statusCountAdd(deliveryType string, status deliveryStatus, topic string, cnt int) {
messageStatus.WithLabelValues(string(status), topic, deliveryType).Add(float64(cnt))
}
Expand Down
16 changes: 8 additions & 8 deletions client/kafka/v2/sync_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ func (p *SyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) (p

err = injectTracingHeaders(msg, sp)
if err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return -1, -1, fmt.Errorf("failed to inject tracing headers: %w", err)
}

partition, offset, err = p.syncProd.SendMessage(msg)
if err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1)
trace.SpanError(sp)
return -1, -1, err
}

statusCountAdd(deliveryTypeSync, deliveryStatusSent, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSent, msg.Topic, 1)
trace.SpanSuccess(sp)
return partition, offset, nil
}
Expand All @@ -57,19 +57,19 @@ func (p *SyncProducer) SendBatch(ctx context.Context, messages []*sarama.Produce

for _, msg := range messages {
if err := injectTracingHeaders(msg, sp); err != nil {
statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages))
statusCountAddWithExemplars(ctx, deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages))
trace.SpanError(sp)
return fmt.Errorf("failed to inject tracing headers: %w", err)
}
}

if err := p.syncProd.SendMessages(messages); err != nil {
statusCountBatchAdd(deliveryTypeSync, deliveryStatusSendError, messages)
statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSendError, messages)
trace.SpanError(sp)
return err
}

statusCountBatchAdd(deliveryTypeSync, deliveryStatusSent, messages)
statusCountBatchAdd(ctx, deliveryTypeSync, deliveryStatusSent, messages)
trace.SpanSuccess(sp)
return nil
}
Expand All @@ -87,8 +87,8 @@ func (p *SyncProducer) Close() error {
return nil
}

func statusCountBatchAdd(deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) {
func statusCountBatchAdd(ctx context.Context, deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) {
for _, msg := range messages {
statusCountAdd(deliveryType, status, msg.Topic, 1)
statusCountAddWithExemplars(ctx, deliveryType, status, msg.Topic, 1)
}
}
7 changes: 5 additions & 2 deletions client/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/beatlabs/patron/trace"
"github.com/go-redis/redis/extra/rediscmd"
"github.com/go-redis/redis/v8"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -89,7 +89,10 @@ func (th tracingHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmd

func observeDuration(ctx context.Context, cmd string, err error) {
dur := time.Since(ctx.Value(duration{}).(time.Time))
cmdDurationMetrics.WithLabelValues(cmd, strconv.FormatBool(err != nil)).Observe(dur.Seconds())
durationHistogram := trace.Histogram{
Observer: cmdDurationMetrics.WithLabelValues(cmd, strconv.FormatBool(err == nil)),
}
durationHistogram.Observe(ctx, dur.Seconds())
}

func startSpan(ctx context.Context, address, opName string) (opentracing.Span, context.Context) {
Expand Down
12 changes: 8 additions & 4 deletions client/sns/v2/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func (p Publisher) Publish(ctx context.Context, input *sns.PublishInput) (messag
start := time.Now()
out, err := p.api.PublishWithContext(ctx, input)
if input.TopicArn != nil {
observePublish(span, start, *input.TopicArn, err)
observePublish(ctx, span, start, *input.TopicArn, err)
}
if input.TargetArn != nil {
observePublish(span, start, *input.TargetArn, err)
observePublish(ctx, span, start, *input.TargetArn, err)
}
if err != nil {
return "", fmt.Errorf("failed to publish message: %w", err)
Expand Down Expand Up @@ -124,7 +124,11 @@ func injectHeaders(span opentracing.Span, input *sns.PublishInput) error {
return nil
}

func observePublish(span opentracing.Span, start time.Time, topic string, err error) {
func observePublish(ctx context.Context, span opentracing.Span, start time.Time, topic string, err error) {
trace.SpanComplete(span, err)
publishDurationMetrics.WithLabelValues(topic, strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())

durationHistogram := trace.Histogram{
Observer: publishDurationMetrics.WithLabelValues(topic, strconv.FormatBool(err == nil)),
}
durationHistogram.Observe(ctx, time.Since(start).Seconds())
}
Loading