Skip to content

Commit

Permalink
Merge branch 'master' into nv/move-mssql-merge-queries
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored May 13, 2024
2 parents a5bf518 + 5fdca45 commit 73bc9fe
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 38 deletions.
30 changes: 0 additions & 30 deletions lib/kafkalib/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package kafkalib

import (
"context"
"errors"

"github.com/segmentio/kafka-go"
)

Expand All @@ -12,31 +10,3 @@ type Consumer interface {
ReadMessage(ctx context.Context) (kafka.Message, error)
CommitMessages(ctx context.Context, msgs ...kafka.Message) error
}

type Reader struct {
*kafka.Reader
config kafka.ReaderConfig
}

func NewReader(config kafka.ReaderConfig) *Reader {
return &Reader{
Reader: kafka.NewReader(config),
config: config,
}
}

func ShouldReload(err error) bool {
if err == nil {
return false
}

// Kafka Segment Go doesn't handle reloading the client: https://github.com/segmentio/kafka-go/issues/833
// [27] Rebalance In Progress: the coordinator has begun rebalancing the group, the client should rejoin the group
return errors.Is(err, kafka.RebalanceInProgress)
}

func (r *Reader) Reload() {
// Close, then reload.
_ = r.Close()
r.Reader = kafka.NewReader(r.config)
}
10 changes: 2 additions & 8 deletions processes/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,12 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab
Brokers: cfg.Kafka.BootstrapServers(),
}

kafkaConsumer := kafkalib.NewReader(kafkaCfg)
kafkaConsumer := kafka.NewReader(kafkaCfg)
topicToConsumer.Add(topic, kafkaConsumer)
for {
kafkaMsg, err := kafkaConsumer.FetchMessage(ctx)
if err != nil {
if kafkalib.ShouldReload(err) {
slog.Warn("Kafka reader needs to be reloaded", slog.Any("err", err))
kafkaConsumer.Reload()
} else {
slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err))
}

slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err))
time.Sleep(500 * time.Millisecond)
continue
}
Expand Down

0 comments on commit 73bc9fe

Please sign in to comment.