diff --git a/lib/kafkalib/consumer.go b/lib/kafkalib/consumer.go index 0d4c19663..7403608dc 100644 --- a/lib/kafkalib/consumer.go +++ b/lib/kafkalib/consumer.go @@ -2,8 +2,6 @@ package kafkalib import ( "context" - "errors" - "github.com/segmentio/kafka-go" ) @@ -12,31 +10,3 @@ type Consumer interface { ReadMessage(ctx context.Context) (kafka.Message, error) CommitMessages(ctx context.Context, msgs ...kafka.Message) error } - -type Reader struct { - *kafka.Reader - config kafka.ReaderConfig -} - -func NewReader(config kafka.ReaderConfig) *Reader { - return &Reader{ - Reader: kafka.NewReader(config), - config: config, - } -} - -func ShouldReload(err error) bool { - if err == nil { - return false - } - - // Kafka Segment Go doesn't handle reloading the client: https://github.com/segmentio/kafka-go/issues/833 - // [27] Rebalance In Progress: the coordinator has begun rebalancing the group, the client should rejoin the group - return errors.Is(err, kafka.RebalanceInProgress) -} - -func (r *Reader) Reload() { - // Close, then reload. - _ = r.Close() - r.Reader = kafka.NewReader(r.config) -} diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go index 72f958e0a..5f7ac1d2c 100644 --- a/processes/consumer/kafka.go +++ b/processes/consumer/kafka.go @@ -104,18 +104,12 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab Brokers: cfg.Kafka.BootstrapServers(), } - kafkaConsumer := kafkalib.NewReader(kafkaCfg) + kafkaConsumer := kafka.NewReader(kafkaCfg) topicToConsumer.Add(topic, kafkaConsumer) for { kafkaMsg, err := kafkaConsumer.FetchMessage(ctx) if err != nil { - if kafkalib.ShouldReload(err) { - slog.Warn("Kafka reader needs to be reloaded", slog.Any("err", err)) - kafkaConsumer.Reload() - } else { - slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err)) - } - + slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err)) time.Sleep(500 * time.Millisecond) continue }