diff --git a/README.md b/README.md
index 9c5fb80..cbed720 100644
--- a/README.md
+++ b/README.md
@@ -141,6 +141,7 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
 | `consumer.groupId` | Exception consumer group id | | exception-consumer-group |
 | `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | |
 | `consumer.concurrency` | Number of goroutines used at listeners | 1 | |
+| `consumer.verifyTopicOnStartup` | Verifies that the given retry topic exists on the Kafka cluster before the consumer starts | false | |
 | `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MinBytes) | 1 | |
 | `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxBytes) | 1 MB | |
 | `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxWait) | 10s | |
diff --git a/cronsumer.go b/cronsumer.go
index a236643..de4c7ec 100644
--- a/cronsumer.go
+++ b/cronsumer.go
@@ -5,11 +5,29 @@ package cronsumer
 import (
 	"github.com/Trendyol/kafka-cronsumer/internal"
 	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
+	"github.com/Trendyol/kafka-cronsumer/pkg/logger"
 )
 
 // New returns the newly created kafka consumer instance.
 // config.Config specifies cron, duration and so many parameters.
 // ConsumeFn describes how to consume messages from specified topic.
 func New(cfg *kafka.Config, c kafka.ConsumeFn) kafka.Cronsumer {
+	cfg.Logger = logger.New(cfg.LogLevel)
+
+	if cfg.Consumer.VerifyTopicOnStartup {
+		kclient, err := internal.NewKafkaClient(cfg)
+		if err != nil {
+			panic("failed to initialize kafka client for topic verification: " + err.Error())
+		}
+		exist, err := internal.VerifyTopics(kclient, cfg.Consumer.Topic)
+		if err != nil {
+			panic("failed to verify topic existence: " + err.Error())
+		}
+		if !exist {
+			panic("topic " + cfg.Consumer.Topic + " does not exist; check that it is created and that the client is authorized to access it")
+		}
+		cfg.Logger.Infof("Topic [%s] verified successfully!", cfg.Consumer.Topic)
+	}
+
 	return internal.NewCronsumer(cfg, c)
 }
diff --git a/examples/single-consumer/main.go b/examples/single-consumer/main.go
index 7ba89c8..4079f74 100644
--- a/examples/single-consumer/main.go
+++ b/examples/single-consumer/main.go
@@ -12,7 +12,7 @@ func main() {
 		Brokers: []string{"localhost:29092"},
 		Consumer: kafka.ConsumerConfig{
 			GroupID:  "sample-consumer",
-			Topic:    "exception",
+			Topic:    "exception-not-exist",
 			Cron:     "*/1 * * * *",
 			Duration: 20 * time.Second,
 		},
diff --git a/internal/cron.go b/internal/cron.go
index 5cf7066..329fe6f 100644
--- a/internal/cron.go
+++ b/internal/cron.go
@@ -21,7 +21,6 @@ type cronsumer struct {
 }
 
 func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer {
-	cfg.Logger = logger.New(cfg.LogLevel)
 	c := newKafkaCronsumer(cfg, fn)
 
 	return &cronsumer{
diff --git a/internal/verify_topic.go b/internal/verify_topic.go
new file mode 100644
index 0000000..ead0328
--- /dev/null
+++ b/internal/verify_topic.go
@@ -0,0 +1,69 @@
+package internal
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
+	segmentio "github.com/segmentio/kafka-go"
+)
+
+type kafkaClient interface {
+	Metadata(ctx context.Context, req *segmentio.MetadataRequest) (*segmentio.MetadataResponse, error)
+	GetClient() *segmentio.Client
+}
+
+type client struct {
+	*segmentio.Client
+}
+
+func NewKafkaClient(cfg *kafka.Config) (kafkaClient, error) {
+	kc := client{
+		Client: &segmentio.Client{
+			Addr: segmentio.TCP(cfg.Brokers...),
+		},
+	}
+
+	transport := &segmentio.Transport{
+		MetadataTopics: []string{cfg.Consumer.Topic},
+	}
+
+	if cfg.SASL.Enabled {
+		transport.TLS = NewTLSConfig(cfg.SASL)
+		transport.SASL = Mechanism(cfg.SASL)
+	}
+
+	kc.Transport = transport
+	return &kc, nil
+}
+
+func (c *client) GetClient() *segmentio.Client {
+	return c.Client
+}
+
+func VerifyTopics(client kafkaClient, topics ...string) (bool, error) {
+	metadata, err := client.Metadata(context.Background(), &segmentio.MetadataRequest{
+		Topics: topics,
+	})
+	if err != nil {
+		return false, fmt.Errorf("error during VerifyTopics metadata request: %w", err)
+	}
+	return checkTopicsWithinMetadata(metadata, topics)
+}
+
+func checkTopicsWithinMetadata(metadata *segmentio.MetadataResponse, topics []string) (bool, error) {
+	metadataTopics := make(map[string]struct{}, len(metadata.Topics))
+	for _, topic := range metadata.Topics {
+		if topic.Error != nil {
+			continue
+		}
+		metadataTopics[topic.Name] = struct{}{}
+	}
+
+	for _, topic := range topics {
+		if _, exist := metadataTopics[topic]; !exist {
+			return false, nil
+		}
+	}
+	return true, nil
+}
diff --git a/internal/verify_topic_test.go b/internal/verify_topic_test.go
new file mode 100644
index 0000000..47d1845
--- /dev/null
+++ b/internal/verify_topic_test.go
@@ -0,0 +1,115 @@
+package internal
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
+	segmentio "github.com/segmentio/kafka-go"
+)
+
+type mockKafkaClientWrapper struct {
+	wantErr        bool
+	wantExistTopic bool
+}
+
+func (m mockKafkaClientWrapper) GetClient() *segmentio.Client {
+	return &segmentio.Client{}
+}
+
+func (m mockKafkaClientWrapper) Metadata(_ context.Context, _ *segmentio.MetadataRequest) (*segmentio.MetadataResponse, error) {
+	if m.wantErr {
+		return nil, errors.New("metadataReqErr")
+	}
+
+	if !m.wantExistTopic {
+		return &segmentio.MetadataResponse{
+			Topics: []segmentio.Topic{
+				{Name: "topic1", Error: segmentio.UnknownTopicOrPartition},
+				{Name: "topic2", Error: nil},
+			},
+		}, nil
+	}
+
+	return &segmentio.MetadataResponse{
+		Topics: []segmentio.Topic{
+			{Name: "topic1", Error: nil},
+			{Name: "topic2", Error: nil},
+		},
+	}, nil
+}
+
+func Test_kafkaClientWrapper_VerifyTopics(t *testing.T) {
+	t.Run("Should_Return_Error_When_Metadata_Request_Has_Failed", func(t *testing.T) {
+		// Given
+		mockClient := mockKafkaClientWrapper{wantErr: true}
+
+		// When
+		_, err := VerifyTopics(mockClient, "topic1")
+
+		// Then
+		if err == nil {
+			t.Error("metadata request must fail")
+		}
+	})
+	t.Run("Should_Return_False_When_Given_Topic_Does_Not_Exist", func(t *testing.T) {
+		// Given
+		mockClient := mockKafkaClientWrapper{wantExistTopic: false}
+
+		// When
+		exist, err := VerifyTopics(mockClient, "topic1")
+
+		// Then
+		if exist {
+			t.Errorf("topic %s must not exist", "topic1")
+		}
+		if err != nil {
+			t.Error("err must be nil")
+		}
+	})
+	t.Run("Should_Return_True_When_Given_Topic_Exist", func(t *testing.T) {
+		// Given
+		mockClient := mockKafkaClientWrapper{wantExistTopic: true}
+
+		// When
+		exist, err := VerifyTopics(mockClient, "topic1")
+
+		// Then
+		if !exist {
+			t.Errorf("topic %s must exist", "topic1")
+		}
+		if err != nil {
+			t.Error("err must be nil")
+		}
+	})
+}
+
+func Test_newKafkaClient(t *testing.T) {
+	// Given
+	cfg := &kafka.Config{Brokers: []string{"127.0.0.1:9092"}, Consumer: kafka.ConsumerConfig{Topic: "topic"}}
+
+	// When
+	sut, err := NewKafkaClient(cfg)
+
+	// Then
+	if err != nil {
+		t.Errorf("err must be nil")
+	}
+	if sut.GetClient().Addr.String() != "127.0.0.1:9092" {
+		t.Errorf("broker address must be 127.0.0.1:9092")
+	}
+}
+
+func Test_kClient_GetClient(t *testing.T) {
+	// Given
+	mockClient := mockKafkaClientWrapper{}
+
+	// When
+	sut := mockClient.GetClient()
+
+	// Then
+	if sut == nil {
+		t.Error("client must not be nil")
+	}
+}
diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go
index fb33fef..0cb792f 100644
--- a/pkg/kafka/config.go
+++ b/pkg/kafka/config.go
@@ -75,6 +75,7 @@ type ConsumerConfig struct {
 	Cron                  string                   `yaml:"cron"`
 	BackOffStrategy       BackoffStrategyInterface `yaml:"backOffStrategy"`
 	SkipMessageByHeaderFn SkipMessageByHeaderFn    `yaml:"skipMessageByHeaderFn"`
+	VerifyTopicOnStartup  bool                     `yaml:"verifyTopicOnStartup"`
 }
 
 type ProducerConfig struct {
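
For reference, a minimal end-to-end sketch of the new flag, adapted from examples/single-consumer; the broker address and topic name are illustrative, and the ConsumeFn body is a placeholder, not part of this diff:

```go
package main

import (
	"fmt"
	"time"

	cronsumer "github.com/Trendyol/kafka-cronsumer"
	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

func main() {
	config := &kafka.Config{
		Brokers: []string{"localhost:29092"}, // illustrative broker address
		Consumer: kafka.ConsumerConfig{
			GroupID:  "sample-consumer",
			Topic:    "exception",
			Cron:     "*/1 * * * *",
			Duration: 20 * time.Second,
			// With the flag enabled, New panics at startup when the topic is
			// missing from cluster metadata (or metadata cannot be fetched),
			// instead of surfacing the problem only when the cron fires.
			VerifyTopicOnStartup: true,
		},
	}

	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("consumer > Message received: %s\n", string(message.Value))
		return nil
	}

	c := cronsumer.New(config, consumeFn)
	c.Run() // blocks; Start() is the non-blocking variant
}
```

Note that verification reuses the configured SASL/TLS settings via the new internal transport, so an authorization failure surfaces at startup the same way a missing topic does.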