From 8fed478ebad6a32a6da7b6459963830133d12177 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 30 Jul 2024 15:49:59 -0700 Subject: [PATCH] Clean up. --- lib/cdc/event.go | 2 +- lib/cdc/mongo/debezium.go | 14 +++++--------- lib/cdc/mongo/debezium_test.go | 14 ++++++-------- lib/cdc/relational/debezium.go | 3 +-- lib/cdc/relational/debezium_test.go | 8 ++++---- processes/consumer/process.go | 3 +-- 6 files changed, 18 insertions(+), 26 deletions(-) diff --git a/lib/cdc/event.go b/lib/cdc/event.go index f32ee4924..a261377ad 100644 --- a/lib/cdc/event.go +++ b/lib/cdc/event.go @@ -13,7 +13,7 @@ import ( type Format interface { Labels() []string // Labels() to return a list of strings to maintain backward compatibility. GetPrimaryKey(key []byte, tc *kafkalib.TopicConfig) (map[string]any, error) - GetEventFromBytes(typingSettings typing.Settings, bytes []byte) (Event, error) + GetEventFromBytes(bytes []byte) (Event, error) } type Event interface { diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index 604d0a99e..1ba8d12b7 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -6,24 +6,20 @@ import ( "reflect" "time" - "go.mongodb.org/mongo-driver/bson" - - "github.com/artie-labs/transfer/lib/debezium" - - "github.com/artie-labs/transfer/lib/typing/ext" - - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/cdc" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/artie-labs/transfer/lib/typing/ext" "github.com/artie-labs/transfer/lib/typing/mongo" + "go.mongodb.org/mongo-driver/bson" ) type Debezium string -func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byte) (cdc.Event, error) { +func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { var schemaEventPayload SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go index 41c9bbf53..a95fc8a92 100644 --- a/lib/cdc/mongo/debezium_test.go +++ b/lib/cdc/mongo/debezium_test.go @@ -8,8 +8,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/debezium" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/stretchr/testify/assert" ) @@ -113,7 +111,7 @@ func (m *MongoTestSuite) TestMongoDBEventOrder() { } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) schemaEvt, isOk := evt.(*SchemaEventPayload) @@ -154,7 +152,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() { } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) assert.NoError(m.T(), err) @@ -223,7 +221,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore_NoData() { } } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) { // Making sure the `before` payload is set. @@ -276,7 +274,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore() { } } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) { // Making sure the `before` payload is set. @@ -314,7 +312,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore() { } func (m *MongoTestSuite) TestGetEventFromBytesTombstone() { - _, err := m.Debezium.GetEventFromBytes(typing.Settings{}, nil) + _, err := m.Debezium.GetEventFromBytes(nil) assert.ErrorContains(m.T(), err, "empty message") } @@ -504,7 +502,7 @@ func (m *MongoTestSuite) TestMongoDBEventWithSchema() { } } ` - evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) schemaEvt, isOk := evt.(*SchemaEventPayload) assert.True(m.T(), isOk) diff --git a/lib/cdc/relational/debezium.go b/lib/cdc/relational/debezium.go index 29731befa..5ba5a65e0 100644 --- a/lib/cdc/relational/debezium.go +++ b/lib/cdc/relational/debezium.go @@ -9,12 +9,11 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/typing" ) type Debezium string -func (d *Debezium) GetEventFromBytes(_ typing.Settings, bytes []byte) (cdc.Event, error) { +func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { var event util.SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") diff --git a/lib/cdc/relational/debezium_test.go b/lib/cdc/relational/debezium_test.go index 4d802a37f..b40eefc56 100644 --- a/lib/cdc/relational/debezium_test.go +++ b/lib/cdc/relational/debezium_test.go @@ -19,7 +19,7 @@ var validTc = &kafkalib.TopicConfig{ } func (r *RelationTestSuite) TestGetEventFromBytesTombstone() { - _, err := r.GetEventFromBytes(typing.Settings{}, nil) + _, err := r.GetEventFromBytes(nil) assert.ErrorContains(r.T(), err, "empty message") } @@ -83,7 +83,7 @@ func (r *RelationTestSuite) TestPostgresEvent() { } } ` - evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := r.Debezium.GetEventFromBytes([]byte(payload)) assert.Nil(r.T(), err) assert.False(r.T(), evt.DeletePayload()) @@ -189,7 +189,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() { } } ` - evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := r.Debezium.GetEventFromBytes([]byte(payload)) assert.Nil(r.T(), err) assert.False(r.T(), evt.DeletePayload()) @@ -513,7 +513,7 @@ func (r *RelationTestSuite) TestGetEventFromBytes_MySQL() { "transaction": null } }` - evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) + evt, err := r.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(r.T(), err) assert.Equal(r.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evt.GetExecutionTime()) assert.Equal(r.T(), "customers", evt.GetTableName()) diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 2e37945d6..b2bf96f73 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -51,8 +51,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo return "", fmt.Errorf("cannot unmarshall key %s: %w", string(p.Msg.Key()), err) } - typingSettings := cfg.SharedTransferConfig.TypingSettings - _event, err := topicConfig.GetEventFromBytes(typingSettings, p.Msg.Value()) + _event, err := topicConfig.GetEventFromBytes(p.Msg.Value()) if err != nil { tags["what"] = "marshall_value_err" return "", fmt.Errorf("cannot unmarshall event: %w", err)