Feature / enable&disable duration (#81)
* feat: naming change

* feat: duration validation disabled

* feat: integration test added

* feat: nonstopwork var defined for duration controls

* feat: modify readme

* feat: renaming and goroutine leak fix

* feat: integration test topic name changed

* feat: linter fix
ugurcanerdogan authored Sep 3, 2024
1 parent 6066a2b commit 741d6f4
Showing 9 changed files with 195 additions and 138 deletions.
72 changes: 36 additions & 36 deletions README.md

Large diffs are not rendered by default.

83 changes: 0 additions & 83 deletions internal/cron.go

This file was deleted.

24 changes: 12 additions & 12 deletions internal/cronsumer.go
@@ -7,7 +7,7 @@ import (
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

-type kafkaCronsumer struct {
+type cronsumer struct {
messageChannel chan MessageWrapper

kafkaConsumer Consumer
@@ -23,11 +23,11 @@ type kafkaCronsumer struct {
cfg *kafka.Config
}

-func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *kafkaCronsumer {
+func newCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *cronsumer {
cfg.SetDefaults()
cfg.Validate()

-return &kafkaCronsumer{
+return &cronsumer{
cfg: cfg,
messageChannel: make(chan MessageWrapper),
kafkaConsumer: newConsumer(cfg),
@@ -40,13 +40,13 @@ func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *
}
}

-func (k *kafkaCronsumer) SetupConcurrentWorkers(concurrency int) {
+func (k *cronsumer) SetupConcurrentWorkers(concurrency int) {
for i := 0; i < concurrency; i++ {
go k.processMessage()
}
}

-func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancelFuncWrapper *func()) {
+func (k *cronsumer) Listen(ctx context.Context, strategyName string, cancelFuncWrapper *func()) {
startTime := time.Now()
startTimeUnixNano := startTime.UnixNano()

@@ -102,17 +102,17 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel
}
}

-func (k *kafkaCronsumer) Stop() {
+func (k *cronsumer) Stop() {
close(k.messageChannel)
k.kafkaConsumer.Stop()
k.kafkaProducer.Close()
}

-func (k *kafkaCronsumer) GetMetric() *CronsumerMetric {
+func (k *cronsumer) GetMetric() *CronsumerMetric {
return k.metric
}

-func (k *kafkaCronsumer) processMessage() {
+func (k *cronsumer) processMessage() {
for msg := range k.messageChannel {
if err := k.consumeFn(msg.Message); err != nil {
msg.AddHeader(createErrHeader(err))
@@ -121,20 +121,20 @@ func (k *kafkaCronsumer) processMessage() {
}
}

-func (k *kafkaCronsumer) sendToMessageChannel(msg MessageWrapper) {
+func (k *cronsumer) sendToMessageChannel(msg MessageWrapper) {
defer k.recoverMessage(msg)
k.messageChannel <- msg
}

-func (k *kafkaCronsumer) recoverMessage(msg MessageWrapper) {
+func (k *cronsumer) recoverMessage(msg MessageWrapper) {
// sending a MessageWrapper to a closed channel can panic, because exception-topic listeners send concurrently
if r := recover(); r != nil {
k.cfg.Logger.Warnf("Recovered MessageWrapper: %s", string(msg.Value))
k.produce(msg)
}
}

-func (k *kafkaCronsumer) produce(msg MessageWrapper) {
+func (k *cronsumer) produce(msg MessageWrapper) {
if msg.IsGteMaxRetryCount(k.maxRetry) {
k.cfg.Logger.Infof("Message from %s exceeds to retry limit %d. KafkaMessage: %s", k.cfg.Consumer.Topic, k.maxRetry, msg.Value)

@@ -157,6 +157,6 @@ func (k *kafkaCronsumer) produce(msg MessageWrapper) {
}
}

-func (k *kafkaCronsumer) isDeadLetterTopicFeatureEnabled() bool {
+func (k *cronsumer) isDeadLetterTopicFeatureEnabled() bool {
return k.deadLetterTopic != ""
}
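
Beyond the rename, internal/cronsumer.go keeps the same worker-pool shape: SetupConcurrentWorkers starts `concurrency` goroutines draining messageChannel, Stop closes the channel, and recoverMessage guards sends that race that close. A minimal, self-contained sketch of the pattern — the names here are illustrative, not the library's API:

// Sketch: worker pool draining a channel, with a recover guard for
// sends that race channel close. Illustrative only, not the library API.
package main

import "fmt"

type pool struct{ ch chan string }

func (p *pool) startWorkers(n int) {
	for i := 0; i < n; i++ {
		go func() {
			for msg := range p.ch { // loop exits when ch is closed
				fmt.Println("processed:", msg)
			}
		}()
	}
}

// send mirrors sendToMessageChannel/recoverMessage: a send on a closed
// channel panics, and the deferred recover turns that into a fallback.
func (p *pool) send(msg string) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered, channel already closed:", msg)
		}
	}()
	p.ch <- msg
}

func main() {
	p := &pool{ch: make(chan string)}
	p.startWorkers(2)
	p.send("hello")
	close(p.ch) // Stop() does this; later sends hit the recover path
	p.send("late message")
}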
98 changes: 98 additions & 0 deletions internal/cronsumer_client.go
@@ -0,0 +1,98 @@
package internal

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/Trendyol/kafka-cronsumer/pkg/kafka"

"github.com/Trendyol/kafka-cronsumer/pkg/logger"

gocron "github.com/robfig/cron/v3"
)

type cronsumerClient struct {
cfg *kafka.Config
cron *gocron.Cron
consumer *cronsumer
metricCollectors []prometheus.Collector
}

func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer {
c := newCronsumer(cfg, fn)

return &cronsumerClient{
cron: gocron.New(),
consumer: c,
cfg: cfg,
metricCollectors: []prometheus.Collector{NewCollector(cfg.MetricPrefix, c.metric)},
}
}

func (s *cronsumerClient) WithLogger(logger logger.Interface) {
s.cfg.Logger = logger
}

func (s *cronsumerClient) Start() {
s.setup()
s.cron.Start()
}

func (s *cronsumerClient) Run() {
s.setup()
s.cron.Run()
}

func (s *cronsumerClient) Stop() {
s.cron.Stop()
s.consumer.Stop()
}

func (s *cronsumerClient) Produce(message kafka.Message) error {
return s.consumer.kafkaProducer.Produce(message)
}

func (s *cronsumerClient) ProduceBatch(messages []kafka.Message) error {
return s.consumer.kafkaProducer.ProduceBatch(messages)
}

func (s *cronsumerClient) GetMetricCollectors() []prometheus.Collector {
return s.metricCollectors
}

func (s *cronsumerClient) setup() {
cfg := s.cfg.Consumer

s.consumer.SetupConcurrentWorkers(cfg.Concurrency)
schedule, err := gocron.ParseStandard(cfg.Cron)
if err != nil {
panic("Cron parse error: " + err.Error())
}

_, _ = s.cron.AddFunc(cfg.Cron, func() {
cancelFuncWrapper := s.startListen(cfg)
if cfg.Duration == kafka.NonStopWork {
now := time.Now()
nextRun := schedule.Next(now)
duration := nextRun.Sub(now)
time.AfterFunc(duration, cancelFuncWrapper)
} else {
time.AfterFunc(cfg.Duration, cancelFuncWrapper)
}
})
}

func (s *cronsumerClient) startListen(cfg kafka.ConsumerConfig) func() {
s.cfg.Logger.Debug("Consuming " + cfg.Topic + " started at time: " + time.Now().String())

ctx, cancel := context.WithCancel(context.Background())
cancelFuncWrapper := func() {
s.cfg.Logger.Debug("Consuming " + cfg.Topic + " paused at " + time.Now().String())
cancel()
}

go s.consumer.Listen(ctx, cfg.BackOffStrategy.String(), &cancelFuncWrapper)
return cancelFuncWrapper
}
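
The setup() above is the heart of this feature: when cfg.Duration equals kafka.NonStopWork (the zero value), the cancel function is scheduled for the next cron run, so the consumer never pauses between ticks; any other Duration stops consumption after that fixed window. A hedged usage sketch — the broker address and topic are placeholders, and the root import path is assumed from the integration test below:

package main

import (
	"fmt"

	cronsumer "github.com/Trendyol/kafka-cronsumer" // assumed root package path
	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

func main() {
	config := &kafka.Config{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		Consumer: kafka.ConsumerConfig{
			GroupID: "sample-consumer",
			Topic:   "exception-topic", // placeholder topic
			Cron:    "*/1 * * * *",
			// NonStopWork (zero) consumes until the next cron tick fires;
			// a non-zero Duration would pause consumption after that window.
			Duration: kafka.NonStopWork,
		},
		LogLevel: "info",
	}

	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("received: %s\n", string(message.Value))
		return nil
	}

	c := cronsumer.New(config, consumeFn)
	c.Run() // blocks; Start() is the non-blocking variant
}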
File renamed without changes.
8 changes: 4 additions & 4 deletions internal/cronsumer_test.go
@@ -28,7 +28,7 @@ func Test_Produce_Max_Retry_Count_Reach(t *testing.T) {
var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
return nil
}
-c := &kafkaCronsumer{
+c := &cronsumer{
cfg: kafkaConfig,
messageChannel: make(chan MessageWrapper),
kafkaConsumer: mockConsumer{},
@@ -63,7 +63,7 @@ func Test_Produce_Max_Retry_Count_Reach_Dead_Letter_Topic_Feature_Enabled(t *tes
var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
return nil
}
-c := &kafkaCronsumer{
+c := &cronsumer{
cfg: &kafka.Config{
Logger: logger.New("info"),
},
@@ -112,7 +112,7 @@ func Test_Produce_With_Retry(t *testing.T) {
return nil
}
producer := newMockProducer()
-c := &kafkaCronsumer{
+c := &cronsumer{
cfg: kafkaConfig,
messageChannel: make(chan MessageWrapper),
kafkaConsumer: mockConsumer{},
@@ -159,7 +159,7 @@ func Test_Recover_Message(t *testing.T) {
return nil
}
producer := newMockProducer()
-c := &kafkaCronsumer{
+c := &cronsumer{
cfg: kafkaConfig,
messageChannel: make(chan MessageWrapper),
kafkaConsumer: mockConsumer{},
6 changes: 3 additions & 3 deletions pkg/kafka/config.go
@@ -20,6 +20,9 @@ const (
FixedBackOffStrategy = "fixed"
)

+//nolint:all
+var NonStopWork time.Duration = 0

type Config struct {
Brokers []string `yaml:"brokers"`
Consumer ConsumerConfig `yaml:"consumer"`
@@ -139,9 +142,6 @@ func (c *Config) Validate() {
if c.Consumer.Cron == "" {
panic("you have to set cron expression")
}
-if c.Consumer.Duration == 0 {
-	panic("you have to set panic duration")
-}
if !isValidBackOffStrategy(c.Consumer.BackOffStrategy) {
panic("you have to set valid backoff strategy")
}
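
With that check removed, a zero Duration is no longer a configuration error; it selects NonStopWork, and the gap until the next cron run becomes the effective consuming window. A small sketch of the same computation the scheduling code in internal/cronsumer_client.go performs with robfig/cron/v3:

package main

import (
	"fmt"
	"time"

	gocron "github.com/robfig/cron/v3"
)

func main() {
	// Same math as setup() when Duration == NonStopWork: a consumer
	// started now is cancelled at the next cron tick.
	schedule, err := gocron.ParseStandard("*/1 * * * *")
	if err != nil {
		panic("cron parse error: " + err.Error())
	}
	now := time.Now()
	next := schedule.Next(now)
	// e.g. started at 12:00:20, the cancel fires ~40s later at 12:01:00
	fmt.Printf("consuming for %v, until %v\n", next.Sub(now), next)
}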
File renamed without changes.
42 changes: 42 additions & 0 deletions test/integration/integration_test.go
@@ -447,6 +447,48 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) {
}
}

func Test_Should_Consume_Exception_Message_Successfully_When_Duration_Zero(t *testing.T) {
// Given
t.Parallel()
topic := "exception-no-duration"
_, cleanUp := createTopic(t, topic)
defer cleanUp()

config := &kafka.Config{
Brokers: []string{"localhost:9092"},
Consumer: kafka.ConsumerConfig{
GroupID: "sample-consumer",
Topic: topic,
Cron: "*/1 * * * *",
Duration: kafka.NonStopWork, // duration set as 0
},
LogLevel: "info",
}

waitMessageCh := make(chan kafka.Message)

var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
fmt.Printf("consumer > Message received: %s\n", string(message.Value))
waitMessageCh <- message
return nil
}

c := cronsumer.New(config, consumeFn)
c.Start()

// When
expectedMessage := kafka.Message{Topic: topic, Value: []byte("some message")}
if err := c.Produce(expectedMessage); err != nil {
fmt.Println("Produce err", err.Error())
}

// Then
actualMessage := <-waitMessageCh
if !bytes.Equal(actualMessage.Value, expectedMessage.Value) {
t.Errorf("Expected: %s, Actual: %s", expectedMessage.Value, actualMessage.Value)
}
}

func getRetryCount(message kafka.Message) int {
for _, header := range message.Headers {
if header.Key == "x-retry-count" {
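
For contrast with the NonStopWork test above, a fixed-window setup would keep a non-zero Duration; each cron tick then consumes for that long before time.AfterFunc fires the cancel. A sketch, with illustrative topic name and window:

package example

import (
	"time"

	"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

// fixedWindowConfig is illustrative: each cron tick consumes for
// 20 seconds, then pauses until the next tick.
func fixedWindowConfig() *kafka.Config {
	return &kafka.Config{
		Brokers: []string{"localhost:9092"},
		Consumer: kafka.ConsumerConfig{
			GroupID:  "sample-consumer",
			Topic:    "exception-fixed-window", // illustrative topic
			Cron:     "*/1 * * * *",
			Duration: 20 * time.Second,
		},
		LogLevel: "info",
	}
}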
