From 51c51223f2cd6c0a3255a76dca9b97fe79272a82 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 9 May 2024 23:17:28 -0700 Subject: [PATCH] Reload. --- processes/consumer/kafka.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go index 89afc5c69..212eebfbf 100644 --- a/processes/consumer/kafka.go +++ b/processes/consumer/kafka.go @@ -3,8 +3,8 @@ package consumer import ( "context" "crypto/tls" + "errors" "log/slog" - "strings" "sync" "time" @@ -107,7 +107,7 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab for { kafkaMsg, err := kafkaConsumer.FetchMessage(ctx) if err != nil { - if strings.Contains(err.Error(), "the client should rejoin the group") { + if errors.Is(err, kafka.RebalanceInProgress) { if err = kafkaConsumer.Reload(); err != nil { logger.Fatal("Failed to reload kafka consumer", slog.Any("err", err)) }