Skip to content

Commit

Permalink
Combine more
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Mar 30, 2024
1 parent df5db03 commit 8b991ea
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
11 changes: 7 additions & 4 deletions lib/kafkalib/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ package kafkalib

import (
"encoding/json"
"fmt"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/segmentio/kafka-go"
)

func newMessage(topic string, partitionKey map[string]any, value any) (kafka.Message, error) {
valueBytes, err := json.Marshal(value)
func newMessage(cfg config.Kafka, rawMessage lib.RawMessage) (kafka.Message, error) {
valueBytes, err := json.Marshal(rawMessage.GetPayload())
if err != nil {
return kafka.Message{}, err
}

keyBytes, err := json.Marshal(partitionKey)
keyBytes, err := json.Marshal(rawMessage.PartitionKey)
if err != nil {
return kafka.Message{}, err
}

return kafka.Message{
Topic: topic,
Topic: fmt.Sprintf("%s.%s", cfg.TopicPrefix, rawMessage.TopicSuffix),
Key: keyBytes,
Value: valueBytes,
}, nil
Expand Down
26 changes: 16 additions & 10 deletions lib/kafkalib/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,31 @@ package kafkalib
import (
"testing"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/transfer/lib/cdc/util"
"github.com/stretchr/testify/assert"
)

func TestNewMessage(t *testing.T) {
payload := util.SchemaEventPayload{
Payload: util.Payload{
After: map[string]any{"a": "b"},
Source: util.Source{
TsMs: 1000,
Table: "table",
rawMessage := lib.NewRawMessage(
"topic-suffix",
map[string]any{"key": "value"},
util.SchemaEventPayload{
Payload: util.Payload{
After: map[string]any{"a": "b"},
Source: util.Source{
TsMs: 1000,
Table: "table",
},
Operation: "c",
},
Operation: "c",
},
}
)

msg, err := newMessage("topic", map[string]any{"key": "value"}, payload)
msg, err := newMessage(config.Kafka{TopicPrefix: "topic-prefix"}, rawMessage)
assert.NoError(t, err)
assert.Equal(t, "topic", msg.Topic)
assert.Equal(t, "topic-prefix.topic-suffix", msg.Topic)
assert.Equal(t, `{"key":"value"}`, string(msg.Key))
assert.Equal(t, `{"schema":{"type":"","fields":null},"payload":{"before":null,"after":{"a":"b"},"source":{"connector":"","ts_ms":1000,"db":"","schema":"","table":"table"},"op":"c"}}`, string(msg.Value))
}
21 changes: 5 additions & 16 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,14 @@ func (b *BatchWriter) reload(ctx context.Context) error {
return nil
}

func (b *BatchWriter) buildKafkaMessages(rawMsgs []lib.RawMessage) ([]kafka.Message, error) {
var kafkaMsgs []kafka.Message
func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error {
var msgs []kafka.Message
for _, rawMsg := range rawMsgs {
topic := fmt.Sprintf("%s.%s", b.cfg.TopicPrefix, rawMsg.TopicSuffix)
kafkaMsg, err := newMessage(topic, rawMsg.PartitionKey, rawMsg.GetPayload())
kafkaMsg, err := newMessage(b.cfg, rawMsg)
if err != nil {
return nil, err
return fmt.Errorf("failed to encode kafka messages: %w", err)
}

kafkaMsgs = append(kafkaMsgs, kafkaMsg)
}

return kafkaMsgs, nil
}

func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error {
msgs, err := b.buildKafkaMessages(rawMsgs)
if err != nil {
return fmt.Errorf("failed to encode kafka messages: %w", err)
msgs = append(msgs, kafkaMsg)
}

chunkSize := b.cfg.GetPublishSize()
Expand Down

0 comments on commit 8b991ea

Please sign in to comment.