diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index abad1caa..fc9ca961 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -90,23 +90,27 @@ func buildKafkaMessage(topicPrefix string, rawMessage lib.RawMessage) (KafkaMess } return KafkaMessage{ - Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()), - Key: keyBytes, - Value: valueBytes, + Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()), + MessageKey: keyBytes, + MessageValue: valueBytes, }, nil } type KafkaMessage struct { - Topic string `json:"topic"` - Key []byte `json:"key"` - Value []byte `json:"value"` + Topic string `json:"topic"` + MessageKey []byte `json:"messageKey"` + MessageValue []byte `json:"messageValue"` +} + +func (k KafkaMessage) Key() string { + return string(k.MessageKey) } func (k KafkaMessage) toKafkaMessage() kafka.Message { return kafka.Message{ Topic: k.Topic, - Key: k.Key, - Value: k.Value, + Key: k.MessageKey, + Value: k.MessageValue, } } @@ -120,7 +124,7 @@ func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessage, sample return err } - return batch.BySize[KafkaMessage](messages, int(b.writer.BatchBytes), encoder, func(chunk [][]byte) error { + return batch.BySize[KafkaMessage](messages, int(b.writer.BatchBytes), false, encoder, func(chunk [][]byte) error { tags := map[string]string{"what": "error"} defer func() { if b.statsD != nil { @@ -141,13 +145,10 @@ func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessage, sample err = retry.WithRetries(retryCfg, func(_ int, _ error) error { publishErr := b.writer.WriteMessages(ctx, kafkaMessages...) - if isExceedMaxMessageBytesErr(publishErr) { - slog.Info("Skipping this batch since the message size exceeded the server max") - return nil - } - - if err = b.reload(ctx); err != nil { - return fmt.Errorf("failed to reload kafka writer: %w", err) + if isRetryableError(publishErr) { + if err = b.reload(ctx); err != nil { + return fmt.Errorf("failed to reload kafka writer: %w", err) + } } return publishErr