Skip to content

Commit

Permalink
More minor clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Mar 25, 2024
1 parent c1815ca commit 0dc26a1
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 23 deletions.
2 changes: 1 addition & 1 deletion lib/kafkalib/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/segmentio/kafka-go"
)

func NewMessage(topic string, partitionKey map[string]any, value any) (kafka.Message, error) {
func newMessage(topic string, partitionKey map[string]any, value any) (kafka.Message, error) {
valueBytes, err := json.Marshal(value)
if err != nil {
return kafka.Message{}, err
Expand Down
2 changes: 1 addition & 1 deletion lib/kafkalib/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestNewMessage(t *testing.T) {
},
}

msg, err := NewMessage("topic", map[string]any{"key": "value"}, payload)
msg, err := newMessage("topic", map[string]any{"key": "value"}, payload)
assert.NoError(t, err)
assert.Equal(t, "topic", msg.Topic)
assert.Equal(t, `{"key":"value"}`, string(msg.Key))
Expand Down
45 changes: 24 additions & 21 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,44 +39,47 @@ func NewBatchWriter(ctx context.Context, cfg config.Kafka, statsD mtr.Client) (*
return &BatchWriter{writer, cfg, statsD}, nil
}

func (w *BatchWriter) reload(ctx context.Context) error {
func (b *BatchWriter) reload(ctx context.Context) error {
slog.Info("Reloading kafka writer")
if err := w.writer.Close(); err != nil {
if err := b.writer.Close(); err != nil {
return err
}

writer, err := NewWriter(ctx, w.cfg)
writer, err := NewWriter(ctx, b.cfg)
if err != nil {
return err
}

w.writer = writer
b.writer = writer
return nil
}

func buildKafkaMessages(cfg *config.Kafka, msgs []lib.RawMessage) ([]kafka.Message, error) {
result := make([]kafka.Message, len(msgs))
for i, msg := range msgs {
topic := fmt.Sprintf("%s.%s", cfg.TopicPrefix, msg.TopicSuffix)
kMsg, err := NewMessage(topic, msg.PartitionKey, msg.GetPayload())
func (b *BatchWriter) buildKafkaMessages(rawMsgs []lib.RawMessage) ([]kafka.Message, error) {
var kafkaMsgs []kafka.Message
for _, rawMsg := range rawMsgs {
topic := fmt.Sprintf("%s.%s", b.cfg.TopicPrefix, rawMsg.TopicSuffix)
kafkaMsg, err := newMessage(topic, rawMsg.PartitionKey, rawMsg.GetPayload())
if err != nil {
return nil, err
}
result[i] = kMsg

kafkaMsgs = append(kafkaMsgs, kafkaMsg)
}
return result, nil

return kafkaMsgs, nil
}

func (w *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error {
msgs, err := buildKafkaMessages(&w.cfg, rawMsgs)
func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error {
kafkaMsgs, err := b.buildKafkaMessages(rawMsgs)
if err != nil {
return fmt.Errorf("failed to encode kafka messages: %w", err)
}
return w.WriteMessages(ctx, msgs)

return b.WriteMessages(ctx, kafkaMsgs)
}

func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error {
chunkSize := w.cfg.GetPublishSize()
func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error {
chunkSize := b.cfg.GetPublishSize()
if chunkSize < 1 {
return fmt.Errorf("chunk size is too small")
}
Expand Down Expand Up @@ -104,13 +107,13 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
time.Sleep(sleepDuration)

if retryableError(kafkaErr) {
if reloadErr := w.reload(ctx); reloadErr != nil {
if reloadErr := b.reload(ctx); reloadErr != nil {
slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr))
}
}
}

kafkaErr = w.writer.WriteMessages(ctx, chunk...)
kafkaErr = b.writer.WriteMessages(ctx, chunk...)
if kafkaErr == nil {
tags["what"] = "success"
break
Expand All @@ -123,7 +126,7 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
}
}

w.statsD.Count("kafka.publish", int64(len(chunk)), tags)
b.statsD.Count("kafka.publish", int64(len(chunk)), tags)
if kafkaErr != nil {
return fmt.Errorf("failed to write message: %w, approxSize: %d", kafkaErr, size.GetApproxSize(chunk))
}
Expand All @@ -136,7 +139,7 @@ type messageIterator interface {
Next() ([]lib.RawMessage, error)
}

func (w *BatchWriter) WriteIterator(ctx context.Context, iter messageIterator) (int, error) {
func (b *BatchWriter) WriteIterator(ctx context.Context, iter messageIterator) (int, error) {
start := time.Now()
var count int
for iter.HasNext() {
Expand All @@ -145,7 +148,7 @@ func (w *BatchWriter) WriteIterator(ctx context.Context, iter messageIterator) (
return 0, fmt.Errorf("failed to iterate over messages: %w", err)

} else if len(msgs) > 0 {
if err = w.WriteRawMessages(ctx, msgs); err != nil {
if err = b.WriteRawMessages(ctx, msgs); err != nil {
return 0, fmt.Errorf("failed to write messages to kafka: %w", err)
}
count += len(msgs)
Expand Down

0 comments on commit 0dc26a1

Please sign in to comment.