Skip to content

Commit

Permalink
Relocate Kafka component config (#433)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sotirios Mantziaris authored Oct 29, 2021
1 parent 7b51009 commit e746e6b
Show file tree
Hide file tree
Showing 13 changed files with 64 additions and 65 deletions.
21 changes: 0 additions & 21 deletions client/kafka/v2/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,27 +83,6 @@ func New(brokers []string, saramaConfig *sarama.Config) *Builder {
}
}

// DefaultConsumerSaramaConfig function creates a Sarama configuration with a client ID derived from host name and consumer name.
// When readCommitted is true the consumer only reads committed transactional messages
// (isolation.level=read_committed). An error is returned only if the host name cannot be determined.
func DefaultConsumerSaramaConfig(name string, readCommitted bool) (*sarama.Config, error) {
	host, err := os.Hostname()
	if err != nil {
		return nil, errors.New("failed to get hostname")
	}

	config := sarama.NewConfig()
	// Client ID "<host>-<name>" makes each consumer instance identifiable in broker logs.
	config.ClientID = fmt.Sprintf("%s-%s", host, name)
	config.Consumer.Return.Errors = true
	// Kafka 0.11.0 is the version that introduced transactions, which the
	// read_committed isolation level below depends on.
	config.Version = sarama.V0_11_0_0
	if readCommitted {
		// from Kafka documentation:
		// Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically. In order for this to work, consumers reading from these partitions should be configured to only read committed data. This can be achieved by setting the isolation.level=read_committed in the consumer's configuration.
		// In read_committed mode, the consumer will read only those transactional messages which have been successfully committed. It will continue to read non-transactional messages as before. There is no client-side buffering in read_committed mode. Instead, the end offset of a partition for a read_committed consumer would be the offset of the first message in the partition belonging to an open transaction. This offset is known as the 'Last Stable Offset'(LSO).
		config.Consumer.IsolationLevel = sarama.ReadCommitted
	}

	return config, nil
}

// DefaultProducerSaramaConfig creates a default Sarama configuration with idempotency enabled.
// See also:
// * https://pkg.go.dev/github.com/Shopify/sarama#RequiredAcks
Expand Down
11 changes: 0 additions & 11 deletions client/kafka/v2/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,6 @@ func TestBuilder_CreateAsync(t *testing.T) {
}
}

// Test_DefaultConsumerSaramaConfig checks the generated client ID suffix and the
// isolation level selected for read-committed versus regular consumers.
func Test_DefaultConsumerSaramaConfig(t *testing.T) {
	cfg, err := DefaultConsumerSaramaConfig("name", true)
	require.NoError(t, err)
	// The client ID must end with "-<consumer name>".
	require.True(t, strings.HasSuffix(cfg.ClientID, fmt.Sprintf("-%s", "name")))
	require.Equal(t, sarama.ReadCommitted, cfg.Consumer.IsolationLevel)

	// Without read-committed mode the isolation level must stay at the default.
	cfg, err = DefaultConsumerSaramaConfig("name", false)
	require.NoError(t, err)
	require.NotEqual(t, sarama.ReadCommitted, cfg.Consumer.IsolationLevel)
}

func TestDefaultProducerSaramaConfig(t *testing.T) {
sc, err := DefaultProducerSaramaConfig("name", true)
require.NoError(t, err)
Expand Down
14 changes: 6 additions & 8 deletions component/async/kafka/group/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

v2 "github.com/beatlabs/patron/client/kafka/v2"
"github.com/beatlabs/patron/component/async"
"github.com/beatlabs/patron/component/async/kafka"
kafkacmp "github.com/beatlabs/patron/component/kafka"
"github.com/beatlabs/patron/encoding"
"github.com/beatlabs/patron/encoding/json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNew(t *testing.T) {
t.Parallel()

defaultSaramaCfg, err := v2.DefaultConsumerSaramaConfig("test-consumer", false)
defaultSaramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-consumer", false)
require.Nil(t, err)

brokers := []string{"192.168.1.1"}
Expand Down Expand Up @@ -127,7 +126,7 @@ func TestFactory_Create(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

saramaCfg, err := v2.DefaultConsumerSaramaConfig(tt.fields.clientName, false)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig(tt.fields.clientName, false)
require.Nil(t, err)

f, err := New(tt.fields.clientName, "no-group", tt.fields.topics, tt.fields.brokers, saramaCfg, tt.fields.oo...)
Expand Down Expand Up @@ -229,7 +228,6 @@ func saramaConsumerMessage(value string, header *sarama.RecordHeader) *sarama.Co
}

func versionedConsumerMessage(value string, header *sarama.RecordHeader, version uint8) *sarama.ConsumerMessage {

bytes := []byte(value)

if version > 0 {
Expand All @@ -249,7 +247,7 @@ func versionedConsumerMessage(value string, header *sarama.RecordHeader, version
}

func TestConsumer_ConsumeFailedBroker(t *testing.T) {
saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-consumer", false)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-consumer", false)
require.NoError(t, err)
f, err := New("name", "group", []string{"topic"}, []string{"1", "2"}, saramaCfg)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions component/async/kafka/simple/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (
"time"

"github.com/Shopify/sarama"
v2 "github.com/beatlabs/patron/client/kafka/v2"
"github.com/beatlabs/patron/component/async/kafka"
kafkacmp "github.com/beatlabs/patron/component/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNew(t *testing.T) {
t.Parallel()

defaultSaramaCfg, err := v2.DefaultConsumerSaramaConfig("test-consumer", false)
defaultSaramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-consumer", false)
require.Nil(t, err)

brokers := []string{"192.168.1.1"}
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestNew(t *testing.T) {
}

func TestFactory_Create(t *testing.T) {
saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-consumer", false)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-consumer", false)
require.Nil(t, err)

type fields struct {
Expand Down
24 changes: 24 additions & 0 deletions component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package kafka

import (
"context"
"errors"
"fmt"
"os"

"github.com/Shopify/sarama"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -84,3 +87,24 @@ type batch struct {
// Messages returns the messages collected in this batch.
func (b batch) Messages() []Message {
	return b.messages
}

// DefaultConsumerSaramaConfig function creates a Sarama configuration with a client ID derived from host name and consumer name.
// When readCommitted is true the consumer only reads committed transactional messages
// (isolation.level=read_committed). An error is returned only if the host name cannot be determined.
func DefaultConsumerSaramaConfig(name string, readCommitted bool) (*sarama.Config, error) {
	host, err := os.Hostname()
	if err != nil {
		return nil, errors.New("failed to get hostname")
	}

	config := sarama.NewConfig()
	// Client ID "<host>-<name>" makes each consumer instance identifiable in broker logs.
	config.ClientID = fmt.Sprintf("%s-%s", host, name)
	config.Consumer.Return.Errors = true
	// Kafka 0.11.0 is the version that introduced transactions, which the
	// read_committed isolation level below depends on.
	config.Version = sarama.V0_11_0_0
	if readCommitted {
		// from Kafka documentation:
		// Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically. In order for this to work, consumers reading from these partitions should be configured to only read committed data. This can be achieved by setting the isolation.level=read_committed in the consumer's configuration.
		// In read_committed mode, the consumer will read only those transactional messages which have been successfully committed. It will continue to read non-transactional messages as before. There is no client-side buffering in read_committed mode. Instead, the end offset of a partition for a read_committed consumer would be the offset of the first message in the partition belonging to an open transaction. This offset is known as the 'Last Stable Offset'(LSO).
		config.Consumer.IsolationLevel = sarama.ReadCommitted
	}

	return config, nil
}
14 changes: 14 additions & 0 deletions component/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package kafka

import (
"context"
"fmt"
"strings"
"testing"

"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/require"

"github.com/Shopify/sarama"
"github.com/beatlabs/patron/correlation"
Expand Down Expand Up @@ -74,3 +77,14 @@ func Test_Message(t *testing.T) {
assert.Equal(t, span, msg.Span())
assert.Equal(t, cm, msg.Message())
}

// Test_DefaultConsumerSaramaConfig checks the generated client ID suffix and the
// isolation level selected for read-committed versus regular consumers.
func Test_DefaultConsumerSaramaConfig(t *testing.T) {
	cfg, err := DefaultConsumerSaramaConfig("name", true)
	require.NoError(t, err)
	// The client ID must end with "-<consumer name>".
	require.True(t, strings.HasSuffix(cfg.ClientID, fmt.Sprintf("-%s", "name")))
	require.Equal(t, sarama.ReadCommitted, cfg.Consumer.IsolationLevel)

	// Without read-committed mode the isolation level must stay at the default.
	cfg, err = DefaultConsumerSaramaConfig("name", false)
	require.NoError(t, err)
	require.NotEqual(t, sarama.ReadCommitted, cfg.Consumer.IsolationLevel)
}
3 changes: 2 additions & 1 deletion examples/http-sec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
v2 "github.com/beatlabs/patron/client/kafka/v2"
patronhttp "github.com/beatlabs/patron/component/http"
"github.com/beatlabs/patron/component/http/auth/apikey"
"github.com/beatlabs/patron/component/kafka"
"github.com/beatlabs/patron/encoding/json"
"github.com/beatlabs/patron/examples"
"github.com/beatlabs/patron/log"
Expand Down Expand Up @@ -79,7 +80,7 @@ type kafkaProducer struct {

// newAsyncKafkaProducer creates a new asynchronous kafka producer client
func newAsyncKafkaProducer(kafkaBroker, topic string, readCommitted bool) (*kafkaProducer, error) {
saramaCfg, err := v2.DefaultConsumerSaramaConfig("http-sec-consumer", readCommitted)
saramaCfg, err := kafka.DefaultConsumerSaramaConfig("http-sec-consumer", readCommitted)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions examples/kafka-legacy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (

"github.com/beatlabs/patron"
patronamqp "github.com/beatlabs/patron/client/amqp/v2"
v2 "github.com/beatlabs/patron/client/kafka/v2"
"github.com/beatlabs/patron/component/async"
"github.com/beatlabs/patron/component/async/kafka"
"github.com/beatlabs/patron/component/async/kafka/group"
kafkacmp "github.com/beatlabs/patron/component/kafka"
"github.com/beatlabs/patron/encoding/json"
"github.com/beatlabs/patron/encoding/protobuf"
"github.com/beatlabs/patron/examples"
Expand Down Expand Up @@ -87,7 +87,7 @@ func newKafkaComponent(name, broker, topic, groupID string, publisher *patronamq
pub: publisher,
}

saramaCfg, err := v2.DefaultConsumerSaramaConfig("kafka-legacy", false)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("kafka-legacy", false)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions examples/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/Shopify/sarama"
"github.com/beatlabs/patron"
patronamqp "github.com/beatlabs/patron/client/amqp/v2"
v2 "github.com/beatlabs/patron/client/kafka/v2"
"github.com/beatlabs/patron/component/kafka"
"github.com/beatlabs/patron/component/kafka/group"
"github.com/beatlabs/patron/encoding/json"
Expand Down Expand Up @@ -88,7 +87,7 @@ func newKafkaComponent(name, broker, topic, groupID string, publisher *patronamq
}

// create a consumer that accepts only consistent reads
saramaCfg, err := v2.DefaultConsumerSaramaConfig("kafka-consumer", true)
saramaCfg, err := kafka.DefaultConsumerSaramaConfig("kafka-consumer", true)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions test/docker/kafka/component_async_package_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (

"github.com/Shopify/sarama"
"github.com/beatlabs/patron"
v2 "github.com/beatlabs/patron/client/kafka/v2"
"github.com/beatlabs/patron/component/async"
"github.com/beatlabs/patron/component/async/kafka"
"github.com/beatlabs/patron/component/async/kafka/group"
kafkacmp "github.com/beatlabs/patron/component/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -211,7 +211,7 @@ func newKafkaAsyncPackageComponent(t *testing.T, name string, retries uint, proc
return nil
}

saramaCfg, err := v2.DefaultConsumerSaramaConfig(name, true)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig(name, true)
require.NoError(t, err)

factory, err := group.New(
Expand Down
3 changes: 1 addition & 2 deletions test/docker/kafka/component_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/Shopify/sarama"
"github.com/beatlabs/patron"
v2 "github.com/beatlabs/patron/client/kafka/v2"
"github.com/beatlabs/patron/component/kafka"
"github.com/beatlabs/patron/component/kafka/group"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -211,7 +210,7 @@ func TestKafkaComponent_FailOnceAndRetry(t *testing.T) {
}

func newComponent(t *testing.T, name string, retries uint, batchSize uint, processorFunc kafka.BatchProcessorFunc) *group.Component {
saramaCfg, err := v2.DefaultConsumerSaramaConfig(name, true)
saramaCfg, err := kafka.DefaultConsumerSaramaConfig(name, true)
saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
saramaCfg.Version = sarama.V2_6_0_0
require.NoError(t, err)
Expand Down
8 changes: 3 additions & 5 deletions test/docker/kafka/consumer_group_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"time"

"github.com/Shopify/sarama"
v2 "github.com/beatlabs/patron/client/kafka/v2"
"github.com/beatlabs/patron/component/async/kafka"
"github.com/beatlabs/patron/component/async/kafka/group"
kafkacmp "github.com/beatlabs/patron/component/kafka"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -23,8 +23,7 @@ func TestGroupConsume(t *testing.T) {
chMessages := make(chan []string)
chErr := make(chan error)
go func() {

saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-group-consumer", true)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-group-consumer", true)
require.NoError(t, err)

factory, err := group.New("test1", uuid.New().String(), []string{groupTopic1}, Brokers(), saramaCfg, kafka.DecoderJSON(),
Expand Down Expand Up @@ -79,8 +78,7 @@ func TestGroupConsume_ClaimMessageError(t *testing.T) {
chMessages := make(chan []string)
chErr := make(chan error)
go func() {

saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-consumer", true)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-consumer", true)
require.NoError(t, err)

// Consumer will error out in ClaimMessage as no DecoderFunc has been set
Expand Down
14 changes: 6 additions & 8 deletions test/docker/kafka/consumer_simple_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"time"

"github.com/Shopify/sarama"
v2 "github.com/beatlabs/patron/client/kafka/v2"
"github.com/beatlabs/patron/component/async/kafka"
"github.com/beatlabs/patron/component/async/kafka/simple"
kafkacmp "github.com/beatlabs/patron/component/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -21,8 +21,7 @@ func TestSimpleConsume(t *testing.T) {
chMessages := make(chan []string)
chErr := make(chan error)
go func() {

saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-simple-consumer", true)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-simple-consumer", true)
require.NoError(t, err)

factory, err := simple.New("test1", simpleTopic1, Brokers(), saramaCfg, kafka.DecoderJSON(), kafka.Version(sarama.V2_1_0_0.String()),
Expand Down Expand Up @@ -75,8 +74,7 @@ func TestSimpleConsume_ClaimMessageError(t *testing.T) {
chMessages := make(chan []string)
chErr := make(chan error)
go func() {

saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-simple-consumer-claim", true)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-simple-consumer-claim", true)
require.NoError(t, err)

factory, err := simple.New("test1", simpleTopic2, Brokers(), saramaCfg, kafka.Version(sarama.V2_1_0_0.String()),
Expand Down Expand Up @@ -138,7 +136,7 @@ func TestSimpleConsume_WithDurationOffset(t *testing.T) {
chMessages := make(chan []string)
chErr := make(chan error)
go func() {
saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-simple-consumer-w-duration", true)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-simple-consumer-w-duration", true)
require.NoError(t, err)

factory, err := simple.New("test1", simpleTopic3, Brokers(), saramaCfg, kafka.DecoderJSON(), kafka.Version(sarama.V2_1_0_0.String()),
Expand Down Expand Up @@ -193,7 +191,7 @@ func TestSimpleConsume_WithNotificationOnceReachingLatestOffset(t *testing.T) {
chErr := make(chan error)
chNotif := make(chan struct{})
go func() {
saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-simple-consumer-w-notif", true)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-simple-consumer-w-notif", true)
require.NoError(t, err)

factory, err := simple.New("test4", simpleTopic4, Brokers(), saramaCfg, kafka.DecoderJSON(), kafka.Version(sarama.V2_1_0_0.String()),
Expand Down Expand Up @@ -244,7 +242,7 @@ func TestSimpleConsume_WithNotificationOnceReachingLatestOffset_NoMessages(t *te
chErr := make(chan error)
chNotif := make(chan struct{})
go func() {
saramaCfg, err := v2.DefaultConsumerSaramaConfig("test-simple-consumer", true)
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-simple-consumer", true)
require.NoError(t, err)

factory, err := simple.New("test5", simpleTopic5, Brokers(), saramaCfg, kafka.DecoderJSON(), kafka.Version(sarama.V2_1_0_0.String()),
Expand Down

0 comments on commit e746e6b

Please sign in to comment.