Skip to content

Commit

Permalink
Update Producer Message to include key, closes #85 (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
slysterous authored and cpapidas committed Sep 23, 2019
1 parent b0c5863 commit 02bb6c0
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 3 deletions.
28 changes: 26 additions & 2 deletions trace/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,29 @@ import (
"github.com/beatlabs/patron/encoding/json"
"github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/trace"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

// Message abstraction of a Kafka message.
type Message struct {
topic string
body []byte
key *string
}

// NewMessage creates a new message.
func NewMessage(t string, b []byte) *Message {
return &Message{topic: t, body: b}
}

func NewMessageWithKey(t string, b []byte, k string) (*Message, error) {
if k == "" {
return nil, errors.New("key string can not be null")
}
return &Message{topic: t, body: b, key: &k}, nil
}

// NewJSONMessage creates a new message with a JSON encoded body.
func NewJSONMessage(t string, d interface{}) (*Message, error) {
b, err := json.Encode(d)
Expand All @@ -31,6 +39,18 @@ func NewJSONMessage(t string, d interface{}) (*Message, error) {
return &Message{topic: t, body: b}, nil
}

// NewJSONMessageWithKey creates a new message with a JSON encoded body and a message key
func NewJSONMessageWithKey(t string, d interface{}, k string) (*Message, error) {
if k == "" {
return nil, errors.New("key string can not be null")
}
b, err := json.Encode(d)
if err != nil {
return nil, errors.Wrap(err, "failed to JSON encode")
}
return &Message{topic: t, body: b, key: &k}, nil
}

// Producer interface for Kafka.
type Producer interface {
Send(ctx context.Context, msg *Message) error
Expand Down Expand Up @@ -112,9 +132,13 @@ func createProducerMessage(msg *Message, sp opentracing.Span) (*sarama.ProducerM
if err != nil {
return nil, errors.Wrap(err, "failed to inject tracing headers")
}
var saramaKey sarama.Encoder
if msg.key != nil {
saramaKey = sarama.ByteEncoder(*msg.key)
}
return &sarama.ProducerMessage{
Topic: msg.topic,
Key: nil,
Key: saramaKey,
Value: sarama.ByteEncoder(msg.body),
Headers: c,
}, nil
Expand Down
67 changes: 66 additions & 1 deletion trace/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/Shopify/sarama"
"github.com/beatlabs/patron/trace"
"github.com/stretchr/testify/assert"
jaeger "github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go"
)

func TestNewMessage(t *testing.T) {
Expand All @@ -16,6 +16,29 @@ func TestNewMessage(t *testing.T) {
assert.Equal(t, []byte("TEST"), m.body)
}

func TestNewMessageWithKey(t *testing.T) {
tests := []struct {
name string
data []byte
key string
wantErr bool
}{
{name: "success", data: []byte("TEST"), key: "TEST"},
{name: "failure due to empty message key", data: []byte("TEST"), key: "", wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewMessageWithKey("TOPIC", tt.data, tt.key)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
} else {
assert.NoError(t, err)
assert.NotNil(t, got)
}
})
}
}
func TestNewJSONMessage(t *testing.T) {
tests := []struct {
name string
Expand All @@ -38,6 +61,30 @@ func TestNewJSONMessage(t *testing.T) {
})
}
}
func TestNewJSONMessageWithKey(t *testing.T) {
tests := []struct {
name string
data interface{}
key string
wantErr bool
}{
{name: "failure due to invalid data", data: make(chan bool), key: "TEST", wantErr: true},
{name: "success", data: "TEST", key: "TEST"},
{name: "failure due to empty message key", data: "TEST", key: "", wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewJSONMessageWithKey("TOPIC", tt.data, tt.key)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
} else {
assert.NoError(t, err)
assert.NotNil(t, got)
}
})
}
}

func TestNewSyncProducer_Failure(t *testing.T) {
got, err := NewAsyncProducer([]string{})
Expand Down Expand Up @@ -74,6 +121,24 @@ func TestAsyncProducer_SendMessage_Close(t *testing.T) {
assert.NoError(t, ap.Close())
}

func TestAsyncProducer_SendMessage_WithKey(t *testing.T) {
testKey := "TEST"
msg, err := NewJSONMessageWithKey("TOPIC", "TEST", testKey)
assert.Equal(t, testKey, *msg.key)
assert.NoError(t, err)
seed := createKafkaBroker(t, true)
ap, err := NewAsyncProducer([]string{seed.Addr()}, Version(sarama.V0_8_2_0.String()))
assert.NoError(t, err)
assert.NotNil(t, ap)
err = trace.Setup("test", "1.0.0", "0.0.0.0:6831", jaeger.SamplerTypeProbabilistic, 0.1)
assert.NoError(t, err)
_, ctx := trace.ChildSpan(context.Background(), "123", "cmp")
err = ap.Send(ctx, msg)
assert.NoError(t, err)
assert.Error(t, <-ap.Error())
assert.NoError(t, ap.Close())
}

func createKafkaBroker(t *testing.T, retError bool) *sarama.MockBroker {
lead := sarama.NewMockBroker(t, 2)
metadataResponse := new(sarama.MetadataResponse)
Expand Down

0 comments on commit 02bb6c0

Please sign in to comment.