From 19883bb746507e2eb7973c71e3869b8401160f9f Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 9 May 2024 23:28:59 -0700 Subject: [PATCH] Comment. --- lib/kafkalib/consumer.go | 3 +-- processes/consumer/kafka.go | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/kafkalib/consumer.go b/lib/kafkalib/consumer.go index c95d1361c..0d4c19663 100644 --- a/lib/kafkalib/consumer.go +++ b/lib/kafkalib/consumer.go @@ -35,9 +35,8 @@ func ShouldReload(err error) bool { return errors.Is(err, kafka.RebalanceInProgress) } -func (r *Reader) Reload() error { +func (r *Reader) Reload() { // Close, then reload. _ = r.Close() r.Reader = kafka.NewReader(r.config) - return nil } diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go index 5a74b4831..bf25108e1 100644 --- a/processes/consumer/kafka.go +++ b/processes/consumer/kafka.go @@ -107,12 +107,12 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab kafkaMsg, err := kafkaConsumer.FetchMessage(ctx) if err != nil { if kafkalib.ShouldReload(err) { - if err = kafkaConsumer.Reload(); err != nil { - logger.Fatal("Failed to reload kafka consumer", slog.Any("err", err)) - } + slog.Warn("Kafka reader needs to be reloaded", slog.Any("err", err)) + kafkaConsumer.Reload() + } else { + slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err)) } - slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err)) time.Sleep(500 * time.Millisecond) continue }