From 9f397db8a0d3f0f1d23315188bc79217b4ceceee Mon Sep 17 00:00:00 2001 From: Antony Natale Date: Fri, 6 Sep 2024 15:38:48 -0400 Subject: [PATCH] adds counter for kafka events to producer --- internal/eventing/factory.go | 6 ++++- internal/eventing/kafka/manager.go | 37 ++++++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/internal/eventing/factory.go b/internal/eventing/factory.go index 8e9162e5..132e77b0 100644 --- a/internal/eventing/factory.go +++ b/internal/eventing/factory.go @@ -15,7 +15,11 @@ func New(c CompletedConfig, source string, logger *log.Helper) (api.Manager, err case "stdout": return stdout.New(logger) case "kafka": - return kafka.New(c.Kafka, source, logger) + km, err := kafka.New(c.Kafka, source, logger) + if err != nil { + return nil, err + } + return km, nil } return nil, fmt.Errorf("unrecognized eventer type: %s", c.Eventer) diff --git a/internal/eventing/kafka/manager.go b/internal/eventing/kafka/manager.go index 93f40224..23f72ede 100644 --- a/internal/eventing/kafka/manager.go +++ b/internal/eventing/kafka/manager.go @@ -5,6 +5,9 @@ import ( "fmt" "github.com/go-kratos/kratos/v2/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -95,7 +98,11 @@ func (m *KafkaManager) Errs() <-chan error { func (m *KafkaManager) Lookup(identity *authnapi.Identity, resource_type string, resource_id int64) (api.Producer, error) { // there is no complicated topic dispatch logic... for now. - return NewProducer(m, m.Config.DefaultTopic, identity), nil + producer, err := NewProducer(m, m.Config.DefaultTopic, identity) + if err != nil { + return nil, err + } + return producer, nil } func (m *KafkaManager) Shutdown(ctx context.Context) error { @@ -107,18 +114,30 @@ type kafkaProducer struct { Topic string Identity *authnapi.Identity - Logger *log.Helper + Logger *log.Helper + eventsCounter metric.Int64Counter +} + +// NewProducerEventsCounter creates a meter for capturing event metrics +func NewProducerEventsCounter(meter metric.Meter, histogramName string) (metric.Int64Counter, error) { + return meter.Int64Counter(histogramName, metric.WithUnit("{event_type}")) } // NewProducer produces a kafka producer that is bound to a particular topic. -func NewProducer(manager *KafkaManager, topic string, identity *authnapi.Identity) *kafkaProducer { +func NewProducer(manager *KafkaManager, topic string, identity *authnapi.Identity) (*kafkaProducer, error) { + meter := otel.Meter("github.com/project-kessel/inventory-api/blob/main/internal/server/otel") + eventsCounter, err := NewProducerEventsCounter(meter, "kafka_event") + if err != nil { + return nil, err + } return &kafkaProducer{ Manager: manager, Topic: topic, Identity: identity, - Logger: manager.Logger, - } + Logger: manager.Logger, + eventsCounter: eventsCounter, + }, nil } // Produce creates the cloud event and sends it on the Kafka Topic @@ -143,5 +162,13 @@ func (p *kafkaProducer) Produce(ctx context.Context, event *api.Event) error { p.Logger.Infof("Kafka returned: %v", ret) } + p.eventsCounter.Add( + context.Background(), + 1, + metric.WithAttributes( + attribute.String("event_type", event.EventType), + attribute.String("resource_type", event.ResourceType), + ), + ) return ret }