diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index a14e7119..7bb4ab37 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -70,15 +70,11 @@ func (b *BatchWriter) buildKafkaMessages(rawMsgs []lib.RawMessage) ([]kafka.Mess } func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error { - kafkaMsgs, err := b.buildKafkaMessages(rawMsgs) + msgs, err := b.buildKafkaMessages(rawMsgs) if err != nil { return fmt.Errorf("failed to encode kafka messages: %w", err) } - return b.WriteMessages(ctx, kafkaMsgs) -} - -func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error { chunkSize := b.cfg.GetPublishSize() if chunkSize < 1 { return fmt.Errorf("chunk size is too small") @@ -99,7 +95,7 @@ func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e if err != nil { return err } - for attempts := 0; attempts < 10; attempts++ { + for attempts := range 10 { if attempts > 0 { sleepDuration := jitter.Jitter(baseJitterMs, maxJitterMs, attempts-1) slog.Info("Failed to publish to kafka",