Skip to content

Commit

Permalink
feat: enable balancer support to the retry configuration (#123)
Browse files Browse the repository at this point in the history
* feat: enable balancer support to the retry configuration

* chore: add balancer tests

---------

Co-authored-by: Abdulsametileri <[email protected]>
  • Loading branch information
dilaragorum and Abdulsametileri authored Mar 29, 2024
1 parent f9bc1d6 commit ac1f317
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | |
| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | |
| `batchConfiguration.balancer` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Balancer) | leastBytes |
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
| `sasl.authType` | `SCRAM` or `PLAIN` | |
Expand Down
29 changes: 29 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kafka

import "github.com/segmentio/kafka-go"

type Balancer kafka.Balancer

func GetBalancerCRC32() Balancer {
return &kafka.CRC32Balancer{}
}

func GetBalancerHash() Balancer {
return &kafka.Hash{}
}

func GetBalancerLeastBytes() Balancer {
return &kafka.LeastBytes{}
}

func GetBalancerMurmur2Balancer() Balancer {
return &kafka.Murmur2Balancer{}
}

func GetBalancerReferenceHash() Balancer {
return &kafka.ReferenceHash{}
}

func GetBalancerRoundRobin() Balancer {
return &kafka.RoundRobin{}
}
66 changes: 66 additions & 0 deletions balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package kafka

import (
"reflect"
"testing"
)

func TestGetBalancerCRC32(t *testing.T) {
balancer := GetBalancerCRC32()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.CRC32Balancer" {
t.Errorf("Expected *kafka.CRC32Balancer, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerHash(t *testing.T) {
balancer := GetBalancerHash()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.Hash" {
t.Errorf("Expected *kafka.Hash, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerLeastBytes(t *testing.T) {
balancer := GetBalancerLeastBytes()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.LeastBytes" {
t.Errorf("Expected *kafka.LeastBytes, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerMurmur2Balancer(t *testing.T) {
balancer := GetBalancerMurmur2Balancer()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.Murmur2Balancer" {
t.Errorf("Expected *kafka.Murmur2Balancer, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerReferenceHash(t *testing.T) {
balancer := GetBalancerReferenceHash()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.ReferenceHash" {
t.Errorf("Expected *kafka.ReferenceHash, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerRoundRobinh(t *testing.T) {
balancer := GetBalancerRoundRobin()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.RoundRobin" {
t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String())
}
}
8 changes: 6 additions & 2 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package kafka
import (
"time"

"github.com/segmentio/kafka-go"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
lcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/logger"

"github.com/segmentio/kafka-go"
)

type ReaderConfig kafka.ReaderConfig
Expand Down Expand Up @@ -84,6 +84,9 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset),
RetentionTime: cfg.Reader.RetentionTime,
},
Producer: kcronsumer.ProducerConfig{
Balancer: cfg.RetryConfiguration.Balancer,
},
LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel),
}

Expand Down Expand Up @@ -155,6 +158,7 @@ type RetryConfiguration struct {
Rack string
LogLevel LogLevel
Brokers []string
Balancer Balancer
MaxRetry int
WorkDuration time.Duration
SkipMessageByHeaderFn SkipMessageByHeaderFn
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2
go 1.19

require (
github.com/Trendyol/kafka-cronsumer v1.5.0
github.com/Trendyol/kafka-cronsumer v1.5.1
github.com/Trendyol/otel-kafka-konsumer v0.0.7
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.52.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U=
github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.1 h1:L8RLxo8zSGOfVpjtXLUqL3PsJLZdeoFcOvN1yCY/GyQ=
github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down
2 changes: 1 addition & 1 deletion test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
)

require (
github.com/Trendyol/kafka-cronsumer v1.5.0 // indirect
github.com/Trendyol/kafka-cronsumer v1.5.1 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U=
github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.1 h1:L8RLxo8zSGOfVpjtXLUqL3PsJLZdeoFcOvN1yCY/GyQ=
github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down

0 comments on commit ac1f317

Please sign in to comment.