Skip to content

Commit

Permalink
Merge pull request #76 from Trendyol/feature/verify-topic
Browse files Browse the repository at this point in the history
feat: add verify topic feature
  • Loading branch information
A.Samet İleri authored Jun 3, 2024
2 parents 94c756a + cbc6017 commit d261a9a
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
| `consumer.groupId` | Exception consumer group id | | exception-consumer-group |
| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | |
| `consumer.concurrency` | Number of goroutines used at listeners | 1 | |
| `consumer.verifyTopicOnStartup` | Checks the existence of the given retry topic on the Kafka cluster at startup; the consumer panics if the topic is missing. | false | |
| `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MinBytes) | 1 | |
| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MaxBytes) | 1 MB | |
| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MaxWait) | 10s | |
Expand Down
18 changes: 18 additions & 0 deletions cronsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,29 @@ package cronsumer
import (
"github.com/Trendyol/kafka-cronsumer/internal"
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/Trendyol/kafka-cronsumer/pkg/logger"
)

// New returns the newly created kafka consumer instance.
// config.Config specifies cron, duration and so many parameters.
// ConsumeFn describes how to consume messages from specified topic.
//
// When cfg.Consumer.VerifyTopicOnStartup is enabled, New checks that the
// configured topic exists on the cluster before constructing the consumer,
// and panics on any failure so misconfiguration is surfaced immediately at
// startup rather than at first poll.
func New(cfg *kafka.Config, c kafka.ConsumeFn) kafka.Cronsumer {
	cfg.Logger = logger.New(cfg.LogLevel)

	if cfg.Consumer.VerifyTopicOnStartup {
		kclient, err := internal.NewKafkaClient(cfg)
		if err != nil {
			panic("panic when initializing kafka client for verify topic error: " + err.Error())
		}
		exist, err := internal.VerifyTopics(kclient, cfg.Consumer.Topic)
		if err != nil {
			// Previously this message was just "panic " + err; name the failing step.
			panic("panic when verifying topic existence on startup error: " + err.Error())
		}
		if !exist {
			panic("topic: " + cfg.Consumer.Topic + " does not exist, please check cluster authority etc.")
		}
		cfg.Logger.Infof("Topic [%s] verified successfully!", cfg.Consumer.Topic)
	}

	return internal.NewCronsumer(cfg, c)
}
2 changes: 1 addition & 1 deletion examples/single-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func main() {
Brokers: []string{"localhost:29092"},
Consumer: kafka.ConsumerConfig{
GroupID: "sample-consumer",
Topic: "exception",
Topic: "exception-not-exist",
Cron: "*/1 * * * *",
Duration: 20 * time.Second,
},
Expand Down
1 change: 0 additions & 1 deletion internal/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type cronsumer struct {
}

func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer {
cfg.Logger = logger.New(cfg.LogLevel)
c := newKafkaCronsumer(cfg, fn)

return &cronsumer{
Expand Down
69 changes: 69 additions & 0 deletions internal/verify_topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package internal

import (
"context"
"fmt"

"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
segmentio "github.com/segmentio/kafka-go"
)

// kafkaClient abstracts the subset of segmentio.Client used for topic
// verification, so tests can substitute a mock (see verify_topic_test.go).
type kafkaClient interface {
	// Metadata issues a cluster metadata request; used to look up topics.
	Metadata(ctx context.Context, req *segmentio.MetadataRequest) (*segmentio.MetadataResponse, error)
	// GetClient exposes the underlying segmentio client.
	GetClient() *segmentio.Client
}

// client is the production kafkaClient implementation; it embeds
// *segmentio.Client so Metadata is satisfied by the embedded method.
type client struct {
	*segmentio.Client
}

// NewKafkaClient builds a kafkaClient targeting the brokers in cfg. The
// transport limits metadata requests to the consumer topic and carries
// SASL/TLS settings when SASL is enabled.
//
// The error return is always nil today; it is kept so the signature can
// report construction failures without breaking callers.
func NewKafkaClient(cfg *kafka.Config) (kafkaClient, error) {
	transport := &segmentio.Transport{
		MetadataTopics: []string{cfg.Consumer.Topic},
	}

	if cfg.SASL.Enabled {
		transport.TLS = NewTLSConfig(cfg.SASL)
		transport.SASL = Mechanism(cfg.SASL)
	}

	return &client{
		Client: &segmentio.Client{
			Addr:      segmentio.TCP(cfg.Brokers...),
			Transport: transport,
		},
	}, nil
}

// GetClient returns the embedded segmentio client.
func (c *client) GetClient() *segmentio.Client {
	return c.Client
}

// VerifyTopics reports whether every topic in topics exists on the cluster
// reachable through client. It returns a wrapped error if the metadata
// request itself fails; a missing topic is reported as (false, nil).
func VerifyTopics(client kafkaClient, topics ...string) (bool, error) {
	metadata, err := client.Metadata(context.Background(), &segmentio.MetadataRequest{
		Topics: topics,
	})
	if err != nil {
		// Was: "error when during verifyTopics metadata request %w" — broken grammar
		// and no separator before the wrapped error.
		return false, fmt.Errorf("verifying topics via metadata request: %w", err)
	}
	return checkTopicsWithinMetadata(metadata, topics)
}

// checkTopicsWithinMetadata reports whether every requested topic appears in
// the metadata response without a topic-level error. The error return is
// always nil; it mirrors VerifyTopics' signature.
func checkTopicsWithinMetadata(metadata *segmentio.MetadataResponse, topics []string) (bool, error) {
	available := make(map[string]struct{}, len(metadata.Topics))
	for _, t := range metadata.Topics {
		// Topics reported with an error (e.g. UnknownTopicOrPartition) are
		// treated as absent.
		if t.Error == nil {
			available[t.Name] = struct{}{}
		}
	}

	for _, name := range topics {
		if _, ok := available[name]; !ok {
			return false, nil
		}
	}
	return true, nil
}
115 changes: 115 additions & 0 deletions internal/verify_topic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package internal

import (
"context"
"errors"
"testing"

"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
segmentio "github.com/segmentio/kafka-go"
)

// mockKafkaClientWrapper is a test double for the kafkaClient interface.
type mockKafkaClientWrapper struct {
	wantErr        bool // when true, Metadata returns an error
	wantExistTopic bool // when true, all topics are reported error-free
}

// GetClient returns a fresh zero-value segmentio client; the mock never
// performs real network calls.
func (m mockKafkaClientWrapper) GetClient() *segmentio.Client {
	return &segmentio.Client{}
}

// Metadata simulates a cluster metadata response according to the mock's
// flags: wantErr forces a request failure, wantExistTopic controls whether
// topic1 is reported as healthy or carrying UnknownTopicOrPartition.
func (m mockKafkaClientWrapper) Metadata(_ context.Context, _ *segmentio.MetadataRequest) (*segmentio.MetadataResponse, error) {
	// A simulated broker failure takes precedence over topic existence.
	if m.wantErr {
		return nil, errors.New("metadataReqErr")
	}

	if m.wantExistTopic {
		return &segmentio.MetadataResponse{
			Topics: []segmentio.Topic{
				{Name: "topic1", Error: nil},
				{Name: "topic2", Error: nil},
			},
		}, nil
	}

	// topic1 carries a topic-level error, so callers treat it as missing.
	return &segmentio.MetadataResponse{
		Topics: []segmentio.Topic{
			{Name: "topic1", Error: segmentio.UnknownTopicOrPartition},
			{Name: "topic2", Error: nil},
		},
	}, nil
}

// Test_kafkaClientWrapper_VerifyTopics covers the three outcomes of
// VerifyTopics: metadata request failure, missing topic, and existing topic.
func Test_kafkaClientWrapper_VerifyTopics(t *testing.T) {
	t.Run("Should_Return_Error_When_Metadata_Request_Has_Failed", func(t *testing.T) {
		// Given
		sut := mockKafkaClientWrapper{wantErr: true}

		// When
		_, err := VerifyTopics(sut, "topic1")

		// Then
		if err == nil {
			t.Error("metadata request must be failed!")
		}
	})
	t.Run("Should_Return_False_When_Given_Topic_Does_Not_Exist", func(t *testing.T) {
		// Given
		sut := mockKafkaClientWrapper{wantExistTopic: false}

		// When
		exist, err := VerifyTopics(sut, "topic1")

		// Then
		if err != nil {
			t.Error("err must be nil")
		}
		if exist {
			t.Errorf("topic %s must not exist", "topic1")
		}
	})
	t.Run("Should_Return_True_When_Given_Topic_Exist", func(t *testing.T) {
		// Given
		sut := mockKafkaClientWrapper{wantExistTopic: true}

		// When
		exist, err := VerifyTopics(sut, "topic1")

		// Then
		if err != nil {
			t.Error("err must be nil")
		}
		if !exist {
			t.Errorf("topic %s must exist", "topic1")
		}
	})
}

// Test_newKafkaClient verifies that NewKafkaClient wires the configured
// broker address into the underlying segmentio client.
func Test_newKafkaClient(t *testing.T) {
	// Given
	cfg := &kafka.Config{Brokers: []string{"127.0.0.1:9092"}, Consumer: kafka.ConsumerConfig{Topic: "topic"}}

	// When
	sut, err := NewKafkaClient(cfg)

	// Then
	// Check the error before dereferencing sut: the original asserted on
	// sut.GetClient() first, which would nil-panic the test on failure
	// instead of reporting it. Fatalf stops before the dereference.
	if err != nil {
		t.Fatalf("err must be nil")
	}
	if sut.GetClient().Addr.String() != "127.0.0.1:9092" {
		t.Errorf("broker address must be 127.0.0.1:9092")
	}
}

// Test_kClient_GetClient ensures the mock wrapper always exposes a non-nil
// segmentio client.
func Test_kClient_GetClient(t *testing.T) {
	// Given
	wrapper := mockKafkaClientWrapper{}

	// When
	got := wrapper.GetClient()

	// Then
	if got == nil {
		t.Error("client must not be nil")
	}
}
1 change: 1 addition & 0 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type ConsumerConfig struct {
Cron string `yaml:"cron"`
BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"`
SkipMessageByHeaderFn SkipMessageByHeaderFn `yaml:"skipMessageByHeaderFn"`
VerifyTopicOnStartup bool `yaml:"verifyTopicOnStartup"`
}

type ProducerConfig struct {
Expand Down

0 comments on commit d261a9a

Please sign in to comment.