diff --git a/lib/cdc/event.go b/lib/cdc/event.go index f32ee4924..a261377ad 100644 --- a/lib/cdc/event.go +++ b/lib/cdc/event.go @@ -13,7 +13,7 @@ import ( type Format interface { Labels() []string // Labels() to return a list of strings to maintain backward compatibility. GetPrimaryKey(key []byte, tc *kafkalib.TopicConfig) (map[string]any, error) - GetEventFromBytes(typingSettings typing.Settings, bytes []byte) (Event, error) + GetEventFromBytes(bytes []byte) (Event, error) } type Event interface { diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index e416dfc29..5dcc6e045 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -3,26 +3,23 @@ package mongo import ( "encoding/json" "fmt" + "reflect" "time" - "go.mongodb.org/mongo-driver/bson" - - "github.com/artie-labs/transfer/lib/debezium" - - "github.com/artie-labs/transfer/lib/typing/ext" - - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/cdc" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/artie-labs/transfer/lib/typing/ext" "github.com/artie-labs/transfer/lib/typing/mongo" + "go.mongodb.org/mongo-driver/bson" ) type Debezium string -func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byte) (cdc.Event, error) { +func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { var schemaEventPayload SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") @@ -48,16 +45,21 @@ func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byt return nil, fmt.Errorf("failed to call mongo JSONEToMap: %w", err) } - // Now, we need to iterate over each key and if the value is JSON - // We need to parse the JSON into a string format + // Now, let's iterate over each key. If the value is a map, we'll need to JSON marshal it. + // We do this to ensure parity with how relational Debezium emits the message. for key, value := range after { - if typing.ParseValue(typingSettings, key, nil, value) == typing.Struct { - valBytes, err := json.Marshal(value) - if err != nil { - return nil, fmt.Errorf("failed to marshal: %w", err) + switch value.(type) { + case nil, string, int, int32, int64, float32, float64, bool: + continue + default: + if reflect.TypeOf(value).Kind() == reflect.Map { + valBytes, err := json.Marshal(value) + if err != nil { + return nil, fmt.Errorf("failed to marshal: %w", err) + } + + after[key] = string(valBytes) } - - after[key] = string(valBytes) } } diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go index 336ea4429..95b84220a 100644 --- a/lib/cdc/mongo/debezium_test.go +++ b/lib/cdc/mongo/debezium_test.go @@ -8,12 +8,8 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/debezium" - "github.com/artie-labs/transfer/lib/typing" - - "github.com/stretchr/testify/assert" - "go.mongodb.org/mongo-driver/bson" - "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/stretchr/testify/assert" ) func (m *MongoTestSuite) TestGetPrimaryKey() { @@ -84,16 +80,6 @@ func (m *MongoTestSuite) TestSource_GetExecutionTime() { 18, 6, 35, 21, 0, time.UTC), schemaEvtPayload.GetExecutionTime()) } -func (m *MongoTestSuite) TestBsonTypes() { - var tsMap map[string]any - bsonData := []byte(` -{"_id": {"$numberLong": "10004"}, "order_date": {"$date": 1456012800000},"purchaser_id": {"$numberLong": "1003"},"quantity": 1,"product_id": {"$numberLong": "107"}} -`) - - err := bson.UnmarshalExtJSON(bsonData, false, &tsMap) - assert.NoError(m.T(), err) -} - func (m *MongoTestSuite) TestMongoDBEventOrder() { payload := ` { @@ -125,7 +111,7 @@ func (m *MongoTestSuite) TestMongoDBEventOrder() { } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) schemaEvt, isOk := evt.(*SchemaEventPayload) @@ -141,7 +127,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() { "schema": {}, "payload": { "before": null, - "after": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"robin@example.com\", \"nested\": {\"object\": \"foo\"}}", + "after": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"robin@example.com\", \"nested\": {\"object\": \"foo\"}, \"nil\": null}", "patch": null, "filter": null, "updateDescription": null, @@ -166,7 +152,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() { } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) assert.NoError(m.T(), err) @@ -235,7 +221,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore_NoData() { } } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) { // Making sure the `before` payload is set. @@ -288,7 +274,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore() { } } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) { // Making sure the `before` payload is set. @@ -326,7 +312,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore() { } func (m *MongoTestSuite) TestGetEventFromBytesTombstone() { - _, err := m.Debezium.GetEventFromBytes(typing.Settings{}, nil) + _, err := m.Debezium.GetEventFromBytes(nil) assert.ErrorContains(m.T(), err, "empty message") } @@ -516,7 +502,7 @@ func (m *MongoTestSuite) TestMongoDBEventWithSchema() { } } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) schemaEvt, isOk := evt.(*SchemaEventPayload) assert.True(m.T(), isOk) diff --git a/lib/cdc/relational/debezium.go b/lib/cdc/relational/debezium.go index 29731befa..5ba5a65e0 100644 --- a/lib/cdc/relational/debezium.go +++ b/lib/cdc/relational/debezium.go @@ -9,12 +9,11 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/typing" ) type Debezium string -func (d *Debezium) GetEventFromBytes(_ typing.Settings, bytes []byte) (cdc.Event, error) { +func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { var event util.SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") diff --git a/lib/cdc/relational/debezium_test.go b/lib/cdc/relational/debezium_test.go index 4d802a37f..b40eefc56 100644 --- a/lib/cdc/relational/debezium_test.go +++ b/lib/cdc/relational/debezium_test.go @@ -19,7 +19,7 @@ var validTc = &kafkalib.TopicConfig{ } func (r *RelationTestSuite) TestGetEventFromBytesTombstone() { - _, err := r.GetEventFromBytes(typing.Settings{}, nil) + _, err := r.GetEventFromBytes(nil) assert.ErrorContains(r.T(), err, "empty message") } @@ -83,7 +83,7 @@ func (r *RelationTestSuite) TestPostgresEvent() { } } ` - evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := r.Debezium.GetEventFromBytes([]byte(payload)) assert.Nil(r.T(), err) assert.False(r.T(), evt.DeletePayload()) @@ -189,7 +189,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() { } } ` - evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := r.Debezium.GetEventFromBytes([]byte(payload)) assert.Nil(r.T(), err) assert.False(r.T(), evt.DeletePayload()) @@ -513,7 +513,7 @@ func (r *RelationTestSuite) TestGetEventFromBytes_MySQL() { "transaction": null } }` - evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := r.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(r.T(), err) assert.Equal(r.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evt.GetExecutionTime()) assert.Equal(r.T(), "customers", evt.GetTableName()) diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 2e37945d6..b2bf96f73 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -51,8 +51,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo return "", fmt.Errorf("cannot unmarshall key %s: %w", string(p.Msg.Key()), err) } - typingSettings := cfg.SharedTransferConfig.TypingSettings - _event, err := topicConfig.GetEventFromBytes(typingSettings, p.Msg.Value()) + _event, err := topicConfig.GetEventFromBytes(p.Msg.Value()) if err != nil { tags["what"] = "marshall_value_err" return "", fmt.Errorf("cannot unmarshall event: %w", err)