Skip to content

Commit

Permalink
Merge pull request #102 from tonytheleg/RHCLOUD-34499-add-eventing-me…
Browse files Browse the repository at this point in the history
…trics

RHCLOUD-34499 adds counter for kafka events to producer
  • Loading branch information
tonytheleg authored Sep 11, 2024
2 parents d43a2cd + 9f397db commit 9310dac
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
6 changes: 5 additions & 1 deletion internal/eventing/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ func New(c CompletedConfig, source string, logger *log.Helper) (api.Manager, err
case "stdout":
return stdout.New(logger)
case "kafka":
return kafka.New(c.Kafka, source, logger)
km, err := kafka.New(c.Kafka, source, logger)
if err != nil {
return nil, err
}
return km, nil
}

return nil, fmt.Errorf("unrecognized eventer type: %s", c.Eventer)
Expand Down
37 changes: 32 additions & 5 deletions internal/eventing/kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"

"github.com/go-kratos/kratos/v2/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -95,7 +98,11 @@ func (m *KafkaManager) Errs() <-chan error {
func (m *KafkaManager) Lookup(identity *authnapi.Identity, resource_type string, resource_id int64) (api.Producer, error) {

// there is no complicated topic dispatch logic... for now.
return NewProducer(m, m.Config.DefaultTopic, identity), nil
producer, err := NewProducer(m, m.Config.DefaultTopic, identity)
if err != nil {
return nil, err
}
return producer, nil
}

func (m *KafkaManager) Shutdown(ctx context.Context) error {
Expand All @@ -107,18 +114,30 @@ type kafkaProducer struct {
Topic string
Identity *authnapi.Identity

Logger *log.Helper
Logger *log.Helper
eventsCounter metric.Int64Counter
}

// NewProducerEventsCounter creates a meter for capturing event metrics
func NewProducerEventsCounter(meter metric.Meter, histogramName string) (metric.Int64Counter, error) {
return meter.Int64Counter(histogramName, metric.WithUnit("{event_type}"))
}

// NewProducer produces a kafka producer that is bound to a particular topic.
func NewProducer(manager *KafkaManager, topic string, identity *authnapi.Identity) *kafkaProducer {
func NewProducer(manager *KafkaManager, topic string, identity *authnapi.Identity) (*kafkaProducer, error) {
meter := otel.Meter("github.com/project-kessel/inventory-api/blob/main/internal/server/otel")
eventsCounter, err := NewProducerEventsCounter(meter, "kafka_event")
if err != nil {
return nil, err
}
return &kafkaProducer{
Manager: manager,
Topic: topic,
Identity: identity,

Logger: manager.Logger,
}
Logger: manager.Logger,
eventsCounter: eventsCounter,
}, nil
}

// Produce creates the cloud event and sends it on the Kafka Topic
Expand All @@ -143,5 +162,13 @@ func (p *kafkaProducer) Produce(ctx context.Context, event *api.Event) error {
p.Logger.Infof("Kafka returned: %v", ret)
}

p.eventsCounter.Add(
context.Background(),
1,
metric.WithAttributes(
attribute.String("event_type", event.EventType),
attribute.String("resource_type", event.ResourceType),
),
)
return ret
}

0 comments on commit 9310dac

Please sign in to comment.