Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Nov 4, 2024
1 parent f86daa2 commit 55d21e2
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,27 @@ func buildKafkaMessage(topicPrefix string, rawMessage lib.RawMessage) (KafkaMess
}

return KafkaMessage{
Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()),
Key: keyBytes,
Value: valueBytes,
Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()),
MessageKey: keyBytes,
MessageValue: valueBytes,
}, nil
}

type KafkaMessage struct {
Topic string `json:"topic"`
Key []byte `json:"key"`
Value []byte `json:"value"`
Topic string `json:"topic"`
MessageKey []byte `json:"messageKey"`
MessageValue []byte `json:"messageValue"`
}

func (k KafkaMessage) Key() string {
return string(k.MessageKey)
}

func (k KafkaMessage) toKafkaMessage() kafka.Message {
return kafka.Message{
Topic: k.Topic,
Key: k.Key,
Value: k.Value,
Key: k.MessageKey,
Value: k.MessageValue,
}
}

Expand All @@ -120,7 +124,7 @@ func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessage, sample
return err
}

return batch.BySize[KafkaMessage](messages, int(b.writer.BatchBytes), encoder, func(chunk [][]byte) error {
return batch.BySize[KafkaMessage](messages, int(b.writer.BatchBytes), false, encoder, func(chunk [][]byte) error {
tags := map[string]string{"what": "error"}
defer func() {
if b.statsD != nil {
Expand All @@ -141,13 +145,10 @@ func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessage, sample

err = retry.WithRetries(retryCfg, func(_ int, _ error) error {
publishErr := b.writer.WriteMessages(ctx, kafkaMessages...)
if isExceedMaxMessageBytesErr(publishErr) {
slog.Info("Skipping this batch since the message size exceeded the server max")
return nil
}

if err = b.reload(ctx); err != nil {
return fmt.Errorf("failed to reload kafka writer: %w", err)
if isRetryableError(publishErr) {
if err = b.reload(ctx); err != nil {
return fmt.Errorf("failed to reload kafka writer: %w", err)
}
}

return publishErr
Expand Down

0 comments on commit 55d21e2

Please sign in to comment.