Skip to content

Commit

Permalink
[MongoDB] Remove calling typing.ParseValue (#829)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jul 30, 2024
1 parent 3b7e57d commit be7956d
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 48 deletions.
2 changes: 1 addition & 1 deletion lib/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type Format interface {
Labels() []string // Labels() to return a list of strings to maintain backward compatibility.
GetPrimaryKey(key []byte, tc *kafkalib.TopicConfig) (map[string]any, error)
GetEventFromBytes(typingSettings typing.Settings, bytes []byte) (Event, error)
GetEventFromBytes(bytes []byte) (Event, error)
}

type Event interface {
Expand Down
36 changes: 19 additions & 17 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,23 @@ package mongo
import (
"encoding/json"
"fmt"
"reflect"
"time"

"go.mongodb.org/mongo-driver/bson"

"github.com/artie-labs/transfer/lib/debezium"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/cdc"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/artie-labs/transfer/lib/typing/mongo"
"go.mongodb.org/mongo-driver/bson"
)

type Debezium string

func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byte) (cdc.Event, error) {
func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var schemaEventPayload SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
Expand All @@ -48,16 +45,21 @@ func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byt
return nil, fmt.Errorf("failed to call mongo JSONEToMap: %w", err)
}

// Now, we need to iterate over each key and if the value is JSON
// We need to parse the JSON into a string format
// Now, let's iterate over each key. If the value is a map, we'll need to JSON marshal it.
// We do this to ensure parity with how relational Debezium emits the message.
for key, value := range after {
if typing.ParseValue(typingSettings, key, nil, value) == typing.Struct {
valBytes, err := json.Marshal(value)
if err != nil {
return nil, fmt.Errorf("failed to marshal: %w", err)
switch value.(type) {
case nil, string, int, int32, int64, float32, float64, bool:
continue
default:
if reflect.TypeOf(value).Kind() == reflect.Map {
valBytes, err := json.Marshal(value)
if err != nil {
return nil, fmt.Errorf("failed to marshal: %w", err)
}

after[key] = string(valBytes)
}

after[key] = string(valBytes)
}
}

Expand Down
30 changes: 8 additions & 22 deletions lib/cdc/mongo/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ import (

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/typing"

"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"

"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/stretchr/testify/assert"
)

func (m *MongoTestSuite) TestGetPrimaryKey() {
Expand Down Expand Up @@ -84,16 +80,6 @@ func (m *MongoTestSuite) TestSource_GetExecutionTime() {
18, 6, 35, 21, 0, time.UTC), schemaEvtPayload.GetExecutionTime())
}

func (m *MongoTestSuite) TestBsonTypes() {
var tsMap map[string]any
bsonData := []byte(`
{"_id": {"$numberLong": "10004"}, "order_date": {"$date": 1456012800000},"purchaser_id": {"$numberLong": "1003"},"quantity": 1,"product_id": {"$numberLong": "107"}}
`)

err := bson.UnmarshalExtJSON(bsonData, false, &tsMap)
assert.NoError(m.T(), err)
}

func (m *MongoTestSuite) TestMongoDBEventOrder() {
payload := `
{
Expand Down Expand Up @@ -125,7 +111,7 @@ func (m *MongoTestSuite) TestMongoDBEventOrder() {
}
`

evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)

schemaEvt, isOk := evt.(*SchemaEventPayload)
Expand All @@ -141,7 +127,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() {
"schema": {},
"payload": {
"before": null,
"after": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"[email protected]\", \"nested\": {\"object\": \"foo\"}}",
"after": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"[email protected]\", \"nested\": {\"object\": \"foo\"}, \"nil\": null}",
"patch": null,
"filter": null,
"updateDescription": null,
Expand All @@ -166,7 +152,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() {
}
`

evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(m.T(), err)
Expand Down Expand Up @@ -235,7 +221,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore_NoData() {
}
}
`
evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
{
// Making sure the `before` payload is set.
Expand Down Expand Up @@ -288,7 +274,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore() {
}
}
`
evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
{
// Making sure the `before` payload is set.
Expand Down Expand Up @@ -326,7 +312,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore() {
}

func (m *MongoTestSuite) TestGetEventFromBytesTombstone() {
_, err := m.Debezium.GetEventFromBytes(typing.Settings{}, nil)
_, err := m.Debezium.GetEventFromBytes(nil)
assert.ErrorContains(m.T(), err, "empty message")
}

Expand Down Expand Up @@ -516,7 +502,7 @@ func (m *MongoTestSuite) TestMongoDBEventWithSchema() {
}
}
`
evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
schemaEvt, isOk := evt.(*SchemaEventPayload)
assert.True(m.T(), isOk)
Expand Down
3 changes: 1 addition & 2 deletions lib/cdc/relational/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/typing"
)

type Debezium string

func (d *Debezium) GetEventFromBytes(_ typing.Settings, bytes []byte) (cdc.Event, error) {
func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var event util.SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
Expand Down
8 changes: 4 additions & 4 deletions lib/cdc/relational/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var validTc = &kafkalib.TopicConfig{
}

func (r *RelationTestSuite) TestGetEventFromBytesTombstone() {
_, err := r.GetEventFromBytes(typing.Settings{}, nil)
_, err := r.GetEventFromBytes(nil)
assert.ErrorContains(r.T(), err, "empty message")
}

Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *RelationTestSuite) TestPostgresEvent() {
}
}
`
evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := r.Debezium.GetEventFromBytes([]byte(payload))
assert.Nil(r.T(), err)
assert.False(r.T(), evt.DeletePayload())

Expand Down Expand Up @@ -189,7 +189,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {
}
}
`
evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := r.Debezium.GetEventFromBytes([]byte(payload))
assert.Nil(r.T(), err)
assert.False(r.T(), evt.DeletePayload())

Expand Down Expand Up @@ -513,7 +513,7 @@ func (r *RelationTestSuite) TestGetEventFromBytes_MySQL() {
"transaction": null
}
}`
evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := r.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(r.T(), err)
assert.Equal(r.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evt.GetExecutionTime())
assert.Equal(r.T(), "customers", evt.GetTableName())
Expand Down
3 changes: 1 addition & 2 deletions processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo
return "", fmt.Errorf("cannot unmarshall key %s: %w", string(p.Msg.Key()), err)
}

typingSettings := cfg.SharedTransferConfig.TypingSettings
_event, err := topicConfig.GetEventFromBytes(typingSettings, p.Msg.Value())
_event, err := topicConfig.GetEventFromBytes(p.Msg.Value())
if err != nil {
tags["what"] = "marshall_value_err"
return "", fmt.Errorf("cannot unmarshall event: %w", err)
Expand Down

0 comments on commit be7956d

Please sign in to comment.