Skip to content

Commit

Permalink
Comment.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed May 10, 2024
1 parent 2473504 commit 19883bb
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
3 changes: 1 addition & 2 deletions lib/kafkalib/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ func ShouldReload(err error) bool {
return errors.Is(err, kafka.RebalanceInProgress)
}

func (r *Reader) Reload() error {
func (r *Reader) Reload() {
// Close, then reload.
_ = r.Close()
r.Reader = kafka.NewReader(r.config)
return nil
}
8 changes: 4 additions & 4 deletions processes/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab
kafkaMsg, err := kafkaConsumer.FetchMessage(ctx)
if err != nil {
if kafkalib.ShouldReload(err) {
if err = kafkaConsumer.Reload(); err != nil {
logger.Fatal("Failed to reload kafka consumer", slog.Any("err", err))
}
slog.Warn("Kafka reader needs to be reloaded", slog.Any("err", err))
kafkaConsumer.Reload()
} else {
slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err))
}

slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Failed to read kafka message", slog.Any("err", err))
time.Sleep(500 * time.Millisecond)
continue
}
Expand Down

0 comments on commit 19883bb

Please sign in to comment.