Skip to content

Commit

Permalink
Reload.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed May 10, 2024
1 parent 0f435f9 commit 51c5122
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions processes/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package consumer
import (
"context"
"crypto/tls"
"errors"
"log/slog"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -107,7 +107,7 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab
for {
kafkaMsg, err := kafkaConsumer.FetchMessage(ctx)
if err != nil {
if strings.Contains(err.Error(), "the client should rejoin the group") {
if errors.Is(err, kafka.RebalanceInProgress) {
if err = kafkaConsumer.Reload(); err != nil {
logger.Fatal("Failed to reload kafka consumer", slog.Any("err", err))
}
Expand Down

0 comments on commit 51c5122

Please sign in to comment.