Skip to content

Commit

Permalink
[kafka] Combine BatchWriter functions
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Mar 30, 2024
1 parent 1cba739 commit 67ce745
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,11 @@ func (b *BatchWriter) buildKafkaMessages(rawMsgs []lib.RawMessage) ([]kafka.Mess
}

func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error {
kafkaMsgs, err := b.buildKafkaMessages(rawMsgs)
msgs, err := b.buildKafkaMessages(rawMsgs)
if err != nil {
return fmt.Errorf("failed to encode kafka messages: %w", err)
}

return b.WriteMessages(ctx, kafkaMsgs)
}

func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error {
chunkSize := b.cfg.GetPublishSize()
if chunkSize < 1 {
return fmt.Errorf("chunk size is too small")
Expand All @@ -99,7 +95,7 @@ func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
if err != nil {
return err
}
for attempts := 0; attempts < 10; attempts++ {
for attempts := range 10 {
if attempts > 0 {
sleepDuration := jitter.Jitter(baseJitterMs, maxJitterMs, attempts-1)
slog.Info("Failed to publish to kafka",
Expand Down

0 comments on commit 67ce745

Please sign in to comment.