From deacda0e19e9baac94e1d353f44233d2a32b7e86 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 20 May 2024 21:57:21 -0700 Subject: [PATCH 1/3] Clean up --- lib/kafkalib/errors.go | 7 ++++++- lib/kafkalib/writer.go | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/kafkalib/errors.go b/lib/kafkalib/errors.go index a4592497..1a04ce10 100644 --- a/lib/kafkalib/errors.go +++ b/lib/kafkalib/errors.go @@ -6,7 +6,12 @@ import ( ) func isExceedMaxMessageBytesErr(err error) bool { - return err != nil && errors.Is(err, kafka.MessageSizeTooLarge) + var e kafka.MessageTooLargeError + if err != nil && errors.As(err, &e) { + return true + } + + return false } // isRetryableError - returns true if the error is retryable diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 68d03c9c..5f4d9125 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -134,6 +134,7 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error break } + fmt.Println("kafkaErr: ", kafkaErr, isExceedMaxMessageBytesErr(kafkaErr)) if isExceedMaxMessageBytesErr(kafkaErr) { slog.Info("Skipping this batch since the message size exceeded the server max") kafkaErr = nil From 0d44d38a8d3b11227a4a0d54c88a7d6dc1ed1768 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 20 May 2024 22:01:17 -0700 Subject: [PATCH 2/3] Update tests --- lib/kafkalib/errors_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafkalib/errors_test.go b/lib/kafkalib/errors_test.go index 41faae07..ba7467ed 100644 --- a/lib/kafkalib/errors_test.go +++ b/lib/kafkalib/errors_test.go @@ -27,7 +27,7 @@ func TestIsExceedMaxMessageBytesErr(t *testing.T) { expected: false, }, { - err: kafka.MessageSizeTooLarge, + err: kafka.MessageTooLargeError{}, expected: true, }, } From 1fca4d2a8389c956209c92061593a6a8a4261617 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 20 May 2024 22:03:28 -0700 Subject: [PATCH 3/3] remove fmt.Println --- lib/kafkalib/writer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 5f4d9125..68d03c9c 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -134,7 +134,6 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error break } - fmt.Println("kafkaErr: ", kafkaErr, isExceedMaxMessageBytesErr(kafkaErr)) if isExceedMaxMessageBytesErr(kafkaErr) { slog.Info("Skipping this batch since the message size exceeded the server max") kafkaErr = nil