Skip to content

Commit

Permalink
Functions to be private.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Mar 25, 2024
1 parent 77e0014 commit c1815ca
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions lib/kafkalib/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package kafkalib

import "strings"

func IsExceedMaxMessageBytesErr(err error) bool {
func isExceedMaxMessageBytesErr(err error) bool {
return err != nil && strings.Contains(err.Error(),
"Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum")
}

// RetryableError - returns true if the error is retryable
// retryableError - returns true if the error is retryable
// If it's retryable, you need to reload the Kafka client.
func RetryableError(err error) bool {
func retryableError(err error) bool {
return err != nil && strings.Contains(err.Error(), "Topic Authorization Failed: the client is not authorized to access the requested topic")
}
2 changes: 1 addition & 1 deletion lib/kafkalib/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestIsExceedMaxMessageBytesErr(t *testing.T) {
}

for _, tc := range tcs {
actual := IsExceedMaxMessageBytesErr(tc.err)
actual := isExceedMaxMessageBytesErr(tc.err)
assert.Equal(t, tc.expected, actual, tc.err)
}
}
4 changes: 2 additions & 2 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
)
time.Sleep(sleepDuration)

if RetryableError(kafkaErr) {
if retryableError(kafkaErr) {
if reloadErr := w.reload(ctx); reloadErr != nil {
slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr))
}
Expand All @@ -116,7 +116,7 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
break
}

if IsExceedMaxMessageBytesErr(kafkaErr) {
if isExceedMaxMessageBytesErr(kafkaErr) {
slog.Info("Skipping this chunk since the batch exceeded the server")
kafkaErr = nil
break
Expand Down

0 comments on commit c1815ca

Please sign in to comment.