From 5fdca452d520313d054265ff7b82ec5356d171ed Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 13 May 2024 15:25:20 -0700 Subject: [PATCH] Kafka Revert reload (#634) --- lib/kafkalib/consumer.go | 30 ------------------------------ processes/consumer/kafka.go | 10 ++-------- 2 files changed, 2 insertions(+), 38 deletions(-) diff --git a/lib/kafkalib/consumer.go b/lib/kafkalib/consumer.go index 0d4c19663..7403608dc 100644 --- a/lib/kafkalib/consumer.go +++ b/lib/kafkalib/consumer.go @@ -2,8 +2,6 @@ package kafkalib import ( "context" - "errors" - "github.com/segmentio/kafka-go" ) @@ -12,31 +10,3 @@ type Consumer interface { ReadMessage(ctx context.Context) (kafka.Message, error) CommitMessages(ctx context.Context, msgs ...kafka.Message) error } - -type Reader struct { - *kafka.Reader - config kafka.ReaderConfig -} - -func NewReader(config kafka.ReaderConfig) *Reader { - return &Reader{ - Reader: kafka.NewReader(config), - config: config, - } -} - -func ShouldReload(err error) bool { - if err == nil { - return false - } - - // Kafka Segment Go doesn't handle reloading the client: https://github.com/segmentio/kafka-go/issues/833 - // [27] Rebalance In Progress: the coordinator has begun rebalancing the group, the client should rejoin the group - return errors.Is(err, kafka.RebalanceInProgress) -} - -func (r *Reader) Reload() { - // Close, then reload. - _ = r.Close() - r.Reader = kafka.NewReader(r.config) -} diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go index 72f958e0a..5f7ac1d2c 100644 --- a/processes/consumer/kafka.go +++ b/processes/consumer/kafka.go @@ -104,18 +104,12 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab Brokers: cfg.Kafka.BootstrapServers(), } - kafkaConsumer := kafkalib.NewReader(kafkaCfg) + kafkaConsumer := kafka.NewReader(kafkaCfg) topicToConsumer.Add(topic, kafkaConsumer) for { kafkaMsg, err := kafkaConsumer.FetchMessage(ctx) if err != nil { - if kafkalib.ShouldReload(err) { - slog.Warn("Kafka reader needs to be reloaded", slog.Any("err", err)) - kafkaConsumer.Reload() - } else { - slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err)) - } - + slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err)) time.Sleep(500 * time.Millisecond) continue }