diff --git a/lib/kafkalib/errors.go b/lib/kafkalib/errors.go index 696d132f..6cce3837 100644 --- a/lib/kafkalib/errors.go +++ b/lib/kafkalib/errors.go @@ -2,13 +2,13 @@ package kafkalib import "strings" -func IsExceedMaxMessageBytesErr(err error) bool { +func isExceedMaxMessageBytesErr(err error) bool { return err != nil && strings.Contains(err.Error(), "Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum") } -// RetryableError - returns true if the error is retryable +// retryableError - returns true if the error is retryable // If it's retryable, you need to reload the Kafka client. -func RetryableError(err error) bool { +func retryableError(err error) bool { return err != nil && strings.Contains(err.Error(), "Topic Authorization Failed: the client is not authorized to access the requested topic") } diff --git a/lib/kafkalib/errors_test.go b/lib/kafkalib/errors_test.go index 00ff8f54..53ee7973 100644 --- a/lib/kafkalib/errors_test.go +++ b/lib/kafkalib/errors_test.go @@ -27,7 +27,7 @@ func TestIsExceedMaxMessageBytesErr(t *testing.T) { } for _, tc := range tcs { - actual := IsExceedMaxMessageBytesErr(tc.err) + actual := isExceedMaxMessageBytesErr(tc.err) assert.Equal(t, tc.expected, actual, tc.err) } } diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 65661da1..679c4a76 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -103,7 +103,7 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e ) time.Sleep(sleepDuration) - if RetryableError(kafkaErr) { + if retryableError(kafkaErr) { if reloadErr := w.reload(ctx); reloadErr != nil { slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr)) } @@ -116,7 +116,7 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e break } - if IsExceedMaxMessageBytesErr(kafkaErr) { + if isExceedMaxMessageBytesErr(kafkaErr) { slog.Info("Skipping this chunk since the batch exceeded the server") kafkaErr = nil break