From 0dc26a12addddcfd5c6459e5d0b2176ae81d506c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 25 Mar 2024 10:46:18 -0700 Subject: [PATCH] More minor clean up. --- lib/kafkalib/message.go | 2 +- lib/kafkalib/message_test.go | 2 +- lib/kafkalib/writer.go | 45 +++++++++++++++++++----------------- 3 files changed, 26 insertions(+), 23 deletions(-) diff --git a/lib/kafkalib/message.go b/lib/kafkalib/message.go index 43974b68..51acf258 100644 --- a/lib/kafkalib/message.go +++ b/lib/kafkalib/message.go @@ -6,7 +6,7 @@ import ( "github.com/segmentio/kafka-go" ) -func NewMessage(topic string, partitionKey map[string]any, value any) (kafka.Message, error) { +func newMessage(topic string, partitionKey map[string]any, value any) (kafka.Message, error) { valueBytes, err := json.Marshal(value) if err != nil { return kafka.Message{}, err diff --git a/lib/kafkalib/message_test.go b/lib/kafkalib/message_test.go index 2d51b054..94ebbe8f 100644 --- a/lib/kafkalib/message_test.go +++ b/lib/kafkalib/message_test.go @@ -19,7 +19,7 @@ func TestNewMessage(t *testing.T) { }, } - msg, err := NewMessage("topic", map[string]any{"key": "value"}, payload) + msg, err := newMessage("topic", map[string]any{"key": "value"}, payload) assert.NoError(t, err) assert.Equal(t, "topic", msg.Topic) assert.Equal(t, `{"key":"value"}`, string(msg.Key)) diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 679c4a76..c116b9c1 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -39,44 +39,47 @@ func NewBatchWriter(ctx context.Context, cfg config.Kafka, statsD mtr.Client) (* return &BatchWriter{writer, cfg, statsD}, nil } -func (w *BatchWriter) reload(ctx context.Context) error { +func (b *BatchWriter) reload(ctx context.Context) error { slog.Info("Reloading kafka writer") - if err := w.writer.Close(); err != nil { + if err := b.writer.Close(); err != nil { return err } - writer, err := NewWriter(ctx, w.cfg) + writer, err := NewWriter(ctx, b.cfg) if err != nil { return err } - w.writer = writer + b.writer = writer return nil } -func buildKafkaMessages(cfg *config.Kafka, msgs []lib.RawMessage) ([]kafka.Message, error) { - result := make([]kafka.Message, len(msgs)) - for i, msg := range msgs { - topic := fmt.Sprintf("%s.%s", cfg.TopicPrefix, msg.TopicSuffix) - kMsg, err := NewMessage(topic, msg.PartitionKey, msg.GetPayload()) +func (b *BatchWriter) buildKafkaMessages(rawMsgs []lib.RawMessage) ([]kafka.Message, error) { + var kafkaMsgs []kafka.Message + for _, rawMsg := range rawMsgs { + topic := fmt.Sprintf("%s.%s", b.cfg.TopicPrefix, rawMsg.TopicSuffix) + kafkaMsg, err := newMessage(topic, rawMsg.PartitionKey, rawMsg.GetPayload()) if err != nil { return nil, err } - result[i] = kMsg + + kafkaMsgs = append(kafkaMsgs, kafkaMsg) } - return result, nil + + return kafkaMsgs, nil } -func (w *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error { - msgs, err := buildKafkaMessages(&w.cfg, rawMsgs) +func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error { + kafkaMsgs, err := b.buildKafkaMessages(rawMsgs) if err != nil { return fmt.Errorf("failed to encode kafka messages: %w", err) } - return w.WriteMessages(ctx, msgs) + + return b.WriteMessages(ctx, kafkaMsgs) } -func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error { - chunkSize := w.cfg.GetPublishSize() +func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error { + chunkSize := b.cfg.GetPublishSize() if chunkSize < 1 { return fmt.Errorf("chunk size is too small") } @@ -104,13 +107,13 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e time.Sleep(sleepDuration) if retryableError(kafkaErr) { - if reloadErr := w.reload(ctx); reloadErr != nil { + if reloadErr := b.reload(ctx); reloadErr != nil { slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr)) } } } - kafkaErr = w.writer.WriteMessages(ctx, chunk...) + kafkaErr = b.writer.WriteMessages(ctx, chunk...) if kafkaErr == nil { tags["what"] = "success" break @@ -123,7 +126,7 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e } } - w.statsD.Count("kafka.publish", int64(len(chunk)), tags) + b.statsD.Count("kafka.publish", int64(len(chunk)), tags) if kafkaErr != nil { return fmt.Errorf("failed to write message: %w, approxSize: %d", kafkaErr, size.GetApproxSize(chunk)) } @@ -136,7 +139,7 @@ type messageIterator interface { Next() ([]lib.RawMessage, error) } -func (w *BatchWriter) WriteIterator(ctx context.Context, iter messageIterator) (int, error) { +func (b *BatchWriter) WriteIterator(ctx context.Context, iter messageIterator) (int, error) { start := time.Now() var count int for iter.HasNext() { @@ -145,7 +148,7 @@ func (w *BatchWriter) WriteIterator(ctx context.Context, iter messageIterator) ( return 0, fmt.Errorf("failed to iterate over messages: %w", err) } else if len(msgs) > 0 { - if err = w.WriteRawMessages(ctx, msgs); err != nil { + if err = b.WriteRawMessages(ctx, msgs); err != nil { return 0, fmt.Errorf("failed to write messages to kafka: %w", err) } count += len(msgs)