From ac1f317f2f6632ba63f31ba8087f96174d70b9a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dilara=20G=C3=B6r=C3=BCm?= <73517346+dilaragorum@users.noreply.github.com> Date: Fri, 29 Mar 2024 09:55:48 +0300 Subject: [PATCH] feat: enable balancer support to the retry configuration (#123) * feat: enable balancer support to the retry configuration * chore: add balancer tests --------- Co-authored-by: Abdulsametileri --- README.md | 1 + balancer.go | 29 ++++++++++++++++++ balancer_test.go | 66 +++++++++++++++++++++++++++++++++++++++++ consumer_config.go | 8 +++-- go.mod | 2 +- go.sum | 2 ++ test/integration/go.mod | 2 +- test/integration/go.sum | 2 ++ 8 files changed, 108 insertions(+), 4 deletions(-) create mode 100644 balancer.go create mode 100644 balancer_test.go diff --git a/README.md b/README.md index 1d60aac..6041a60 100644 --- a/README.md +++ b/README.md @@ -258,6 +258,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | | `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | | `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | | +| `batchConfiguration.balancer` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Balancer) | leastBytes | | `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | | `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | | `sasl.authType` | `SCRAM` or `PLAIN` | | diff --git a/balancer.go b/balancer.go new file mode 100644 index 0000000..9319c67 --- /dev/null +++ b/balancer.go @@ -0,0 +1,29 @@ +package kafka + +import "github.com/segmentio/kafka-go" + +type Balancer kafka.Balancer + +func GetBalancerCRC32() Balancer { + return &kafka.CRC32Balancer{} +} + +func GetBalancerHash() Balancer { + return &kafka.Hash{} +} + +func GetBalancerLeastBytes() Balancer { + return &kafka.LeastBytes{} +} + +func GetBalancerMurmur2Balancer() Balancer { + return &kafka.Murmur2Balancer{} +} + +func GetBalancerReferenceHash() Balancer { + return &kafka.ReferenceHash{} +} + +func GetBalancerRoundRobin() Balancer { + return &kafka.RoundRobin{} +} diff --git a/balancer_test.go b/balancer_test.go new file mode 100644 index 0000000..11da72d --- /dev/null +++ b/balancer_test.go @@ -0,0 +1,66 @@ +package kafka + +import ( + "reflect" + "testing" +) + +func TestGetBalancerCRC32(t *testing.T) { + balancer := GetBalancerCRC32() + if balancer == nil { + t.Error("Expected non-nil balancer, got nil") + } + if reflect.TypeOf(balancer).String() != "*kafka.CRC32Balancer" { + t.Errorf("Expected *kafka.CRC32Balancer, got %s", reflect.TypeOf(balancer).String()) + } +} + +func TestGetBalancerHash(t *testing.T) { + balancer := GetBalancerHash() + if balancer == nil { + t.Error("Expected non-nil balancer, got nil") + } + if reflect.TypeOf(balancer).String() != "*kafka.Hash" { + t.Errorf("Expected *kafka.Hash, got %s", reflect.TypeOf(balancer).String()) + } +} + +func TestGetBalancerLeastBytes(t *testing.T) { + balancer := GetBalancerLeastBytes() + if balancer == nil { + t.Error("Expected non-nil balancer, got nil") + } + if reflect.TypeOf(balancer).String() != "*kafka.LeastBytes" { + t.Errorf("Expected *kafka.LeastBytes, got %s", reflect.TypeOf(balancer).String()) + } +} + +func TestGetBalancerMurmur2Balancer(t *testing.T) { + balancer := GetBalancerMurmur2Balancer() + if balancer == nil { + t.Error("Expected non-nil balancer, got nil") + } + if reflect.TypeOf(balancer).String() != "*kafka.Murmur2Balancer" { + t.Errorf("Expected *kafka.Murmur2Balancer, got %s", reflect.TypeOf(balancer).String()) + } +} + +func TestGetBalancerReferenceHash(t *testing.T) { + balancer := GetBalancerReferenceHash() + if balancer == nil { + t.Error("Expected non-nil balancer, got nil") + } + if reflect.TypeOf(balancer).String() != "*kafka.ReferenceHash" { + t.Errorf("Expected *kafka.ReferenceHash, got %s", reflect.TypeOf(balancer).String()) + } +} + +func TestGetBalancerRoundRobinh(t *testing.T) { + balancer := GetBalancerRoundRobin() + if balancer == nil { + t.Error("Expected non-nil balancer, got nil") + } + if reflect.TypeOf(balancer).String() != "*kafka.RoundRobin" { + t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String()) + } +} diff --git a/consumer_config.go b/consumer_config.go index f0cdc16..3610f6b 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -3,14 +3,14 @@ package kafka import ( "time" + "github.com/segmentio/kafka-go" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" lcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/logger" - - "github.com/segmentio/kafka-go" ) type ReaderConfig kafka.ReaderConfig @@ -84,6 +84,9 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset), RetentionTime: cfg.Reader.RetentionTime, }, + Producer: kcronsumer.ProducerConfig{ + Balancer: cfg.RetryConfiguration.Balancer, + }, LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel), } @@ -155,6 +158,7 @@ type RetryConfiguration struct { Rack string LogLevel LogLevel Brokers []string + Balancer Balancer MaxRetry int WorkDuration time.Duration SkipMessageByHeaderFn SkipMessageByHeaderFn diff --git a/go.mod b/go.mod index 9d5f81a..ba73591 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2 go 1.19 require ( - github.com/Trendyol/kafka-cronsumer v1.5.0 + github.com/Trendyol/kafka-cronsumer v1.5.1 github.com/Trendyol/otel-kafka-konsumer v0.0.7 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.52.1 diff --git a/go.sum b/go.sum index f655f8a..95a2f10 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U= github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.1 h1:L8RLxo8zSGOfVpjtXLUqL3PsJLZdeoFcOvN1yCY/GyQ= +github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= diff --git a/test/integration/go.mod b/test/integration/go.mod index 3c5f5e6..162f505 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -10,7 +10,7 @@ require ( ) require ( - github.com/Trendyol/kafka-cronsumer v1.5.0 // indirect + github.com/Trendyol/kafka-cronsumer v1.5.1 // indirect github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 78f67c5..2489c9f 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,5 +1,7 @@ github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U= github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.1 h1:L8RLxo8zSGOfVpjtXLUqL3PsJLZdeoFcOvN1yCY/GyQ= +github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=