Skip to content

Commit

Permalink
Update content type and subject format
Browse files Browse the repository at this point in the history
  • Loading branch information
josejulio committed Sep 16, 2024
1 parent c394f88 commit a920b76
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions internal/eventing/kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"strings"

confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -160,7 +161,8 @@ func (p *kafkaProducer) Produce(ctx context.Context, event *api.Event) error {
return err
}

e.SetSubject(makeCloudEventSubject(event.EventType, event.ResourceId))
e.SetDataContentType("application/json")
e.SetSubject(makeCloudEventSubject(event.EventType, event.ResourceType, event.ResourceId))

ret := p.Manager.Client.Send(confluent.WithMessageKey(cecontext.WithTopic(cloudevents.WithEncodingStructured(ctx), p.Topic), p.Manager.Source), e)
if cloudevents.IsUndelivered(ret) {
Expand All @@ -184,6 +186,6 @@ func makeCloudEventType(eventType, resourceType, operation string) string {
return fmt.Sprintf("redhat.inventory.%s.%s.%s", eventType, resourceType, operation)
}

func makeCloudEventSubject(eventType string, resourceId string) string {
return fmt.Sprintf("%s/%s", eventType, resourceId)
func makeCloudEventSubject(eventType, resourceType, resourceId string) string {
return "/" + strings.Join([]string{eventType, resourceType, resourceId}, "/")
}

0 comments on commit a920b76

Please sign in to comment.