diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index fc9ca961..4eb2d588 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -78,35 +78,36 @@ func (b *BatchWriter) reload(ctx context.Context) error { return nil } -func buildKafkaMessage(topicPrefix string, rawMessage lib.RawMessage) (KafkaMessage, error) { +func buildKafkaMessageWrapper(topicPrefix string, rawMessage lib.RawMessage) (KafkaMessageWrapper, error) { valueBytes, err := json.Marshal(rawMessage.Event()) if err != nil { - return KafkaMessage{}, err + return KafkaMessageWrapper{}, err } keyBytes, err := json.Marshal(rawMessage.PartitionKey()) if err != nil { - return KafkaMessage{}, err + return KafkaMessageWrapper{}, err } - return KafkaMessage{ + return KafkaMessageWrapper{ Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()), MessageKey: keyBytes, MessageValue: valueBytes, }, nil } -type KafkaMessage struct { +// KafkaMessageWrapper is a wrapper around a Kafka message. We did this so that we can marshal and unmarshal the message +type KafkaMessageWrapper struct { Topic string `json:"topic"` MessageKey []byte `json:"messageKey"` MessageValue []byte `json:"messageValue"` } -func (k KafkaMessage) Key() string { +func (k KafkaMessageWrapper) Key() string { return string(k.MessageKey) } -func (k KafkaMessage) toKafkaMessage() kafka.Message { +func (k KafkaMessageWrapper) toKafkaMessage() kafka.Message { return kafka.Message{ Topic: k.Topic, Key: k.MessageKey, @@ -114,17 +115,17 @@ func (k KafkaMessage) toKafkaMessage() kafka.Message { } } -var encoder = func(msg KafkaMessage) ([]byte, error) { +var encoder = func(msg KafkaMessageWrapper) ([]byte, error) { return json.Marshal(msg) } -func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessage, sampleExecutionTime time.Time) error { +func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessageWrapper, sampleExecutionTime time.Time) error { retryCfg, err := retry.NewJitterRetryConfig(100, 5000, 10, retry.AlwaysRetry) if err != nil { return err } - return batch.BySize[KafkaMessage](messages, int(b.writer.BatchBytes), false, encoder, func(chunk [][]byte) error { + return batch.BySize[KafkaMessageWrapper](messages, int(b.writer.BatchBytes), false, encoder, func(chunk [][]byte) error { tags := map[string]string{"what": "error"} defer func() { if b.statsD != nil { @@ -135,7 +136,7 @@ func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessage, sample var kafkaMessages []kafka.Message for _, bytes := range chunk { - var msg KafkaMessage + var msg KafkaMessageWrapper if err = json.Unmarshal(bytes, &msg); err != nil { return fmt.Errorf("failed to unmarshal message: %w", err) } @@ -168,11 +169,11 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error return nil } - var msgs []KafkaMessage + var msgs []KafkaMessageWrapper var sampleExecutionTime time.Time for _, rawMsg := range rawMsgs { sampleExecutionTime = rawMsg.Event().GetExecutionTime() - msg, err := buildKafkaMessage(b.cfg.TopicPrefix, rawMsg) + msg, err := buildKafkaMessageWrapper(b.cfg.TopicPrefix, rawMsg) if err != nil { return fmt.Errorf("failed to build kafka message: %w", err) } diff --git a/lib/kafkalib/writer_test.go b/lib/kafkalib/writer_test.go index 29198c74..e099e9c8 100644 --- a/lib/kafkalib/writer_test.go +++ b/lib/kafkalib/writer_test.go @@ -24,7 +24,7 @@ func TestNewMessage(t *testing.T) { }, ) - msg, err := buildKafkaMessage("topic-prefix", rawMessage) + msg, err := buildKafkaMessageWrapper("topic-prefix", rawMessage) assert.NoError(t, err) assert.Equal(t, "topic-prefix.topic-suffix", msg.Topic) assert.Equal(t, `{"key":"value"}`, string(msg.MessageKey))