Skip to content

Commit

Permalink
Wrapper.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Nov 4, 2024
1 parent d53aa52 commit 4cd1dd7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
27 changes: 14 additions & 13 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,53 +78,54 @@ func (b *BatchWriter) reload(ctx context.Context) error {
return nil
}

func buildKafkaMessage(topicPrefix string, rawMessage lib.RawMessage) (KafkaMessage, error) {
func buildKafkaMessageWrapper(topicPrefix string, rawMessage lib.RawMessage) (KafkaMessageWrapper, error) {
valueBytes, err := json.Marshal(rawMessage.Event())
if err != nil {
return KafkaMessage{}, err
return KafkaMessageWrapper{}, err
}

keyBytes, err := json.Marshal(rawMessage.PartitionKey())
if err != nil {
return KafkaMessage{}, err
return KafkaMessageWrapper{}, err
}

return KafkaMessage{
return KafkaMessageWrapper{
Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()),
MessageKey: keyBytes,
MessageValue: valueBytes,
}, nil
}

type KafkaMessage struct {
// KafkaMessageWrapper is a wrapper around a Kafka message. We did this so that we can marshal and unmarshal the message
type KafkaMessageWrapper struct {
Topic string `json:"topic"`
MessageKey []byte `json:"messageKey"`
MessageValue []byte `json:"messageValue"`
}

func (k KafkaMessage) Key() string {
func (k KafkaMessageWrapper) Key() string {
return string(k.MessageKey)
}

func (k KafkaMessage) toKafkaMessage() kafka.Message {
func (k KafkaMessageWrapper) toKafkaMessage() kafka.Message {
return kafka.Message{
Topic: k.Topic,
Key: k.MessageKey,
Value: k.MessageValue,
}
}

var encoder = func(msg KafkaMessage) ([]byte, error) {
var encoder = func(msg KafkaMessageWrapper) ([]byte, error) {
return json.Marshal(msg)
}

func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessage, sampleExecutionTime time.Time) error {
func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessageWrapper, sampleExecutionTime time.Time) error {
retryCfg, err := retry.NewJitterRetryConfig(100, 5000, 10, retry.AlwaysRetry)
if err != nil {
return err
}

return batch.BySize[KafkaMessage](messages, int(b.writer.BatchBytes), false, encoder, func(chunk [][]byte) error {
return batch.BySize[KafkaMessageWrapper](messages, int(b.writer.BatchBytes), false, encoder, func(chunk [][]byte) error {
tags := map[string]string{"what": "error"}
defer func() {
if b.statsD != nil {
Expand All @@ -135,7 +136,7 @@ func (b *BatchWriter) write(ctx context.Context, messages []KafkaMessage, sample

var kafkaMessages []kafka.Message
for _, bytes := range chunk {
var msg KafkaMessage
var msg KafkaMessageWrapper
if err = json.Unmarshal(bytes, &msg); err != nil {
return fmt.Errorf("failed to unmarshal message: %w", err)
}
Expand Down Expand Up @@ -168,11 +169,11 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error
return nil
}

var msgs []KafkaMessage
var msgs []KafkaMessageWrapper
var sampleExecutionTime time.Time
for _, rawMsg := range rawMsgs {
sampleExecutionTime = rawMsg.Event().GetExecutionTime()
msg, err := buildKafkaMessage(b.cfg.TopicPrefix, rawMsg)
msg, err := buildKafkaMessageWrapper(b.cfg.TopicPrefix, rawMsg)
if err != nil {
return fmt.Errorf("failed to build kafka message: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/kafkalib/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestNewMessage(t *testing.T) {
},
)

msg, err := buildKafkaMessage("topic-prefix", rawMessage)
msg, err := buildKafkaMessageWrapper("topic-prefix", rawMessage)
assert.NoError(t, err)
assert.Equal(t, "topic-prefix.topic-suffix", msg.Topic)
assert.Equal(t, `{"key":"value"}`, string(msg.MessageKey))
Expand Down

0 comments on commit 4cd1dd7

Please sign in to comment.