Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 30, 2024
1 parent b638fed commit 8fed478
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 26 deletions.
2 changes: 1 addition & 1 deletion lib/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type Format interface {
Labels() []string // Labels() to return a list of strings to maintain backward compatibility.
GetPrimaryKey(key []byte, tc *kafkalib.TopicConfig) (map[string]any, error)
GetEventFromBytes(typingSettings typing.Settings, bytes []byte) (Event, error)
GetEventFromBytes(bytes []byte) (Event, error)
}

type Event interface {
Expand Down
14 changes: 5 additions & 9 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,20 @@ import (
"reflect"
"time"

"go.mongodb.org/mongo-driver/bson"

"github.com/artie-labs/transfer/lib/debezium"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/cdc"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/artie-labs/transfer/lib/typing/mongo"
"go.mongodb.org/mongo-driver/bson"
)

type Debezium string

func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byte) (cdc.Event, error) {
func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var schemaEventPayload SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
Expand Down
14 changes: 6 additions & 8 deletions lib/cdc/mongo/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/typing"

"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -113,7 +111,7 @@ func (m *MongoTestSuite) TestMongoDBEventOrder() {
}
`

evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)

schemaEvt, isOk := evt.(*SchemaEventPayload)
Expand Down Expand Up @@ -154,7 +152,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() {
}
`

evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(m.T(), err)
Expand Down Expand Up @@ -223,7 +221,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore_NoData() {
}
}
`
evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
{
// Making sure the `before` payload is set.
Expand Down Expand Up @@ -276,7 +274,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore() {
}
}
`
evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
{
// Making sure the `before` payload is set.
Expand Down Expand Up @@ -314,7 +312,7 @@ func (m *MongoTestSuite) TestMongoDBEventCustomerBefore() {
}

func (m *MongoTestSuite) TestGetEventFromBytesTombstone() {
_, err := m.Debezium.GetEventFromBytes(typing.Settings{}, nil)
_, err := m.Debezium.GetEventFromBytes(nil)
assert.ErrorContains(m.T(), err, "empty message")
}

Expand Down Expand Up @@ -504,7 +502,7 @@ func (m *MongoTestSuite) TestMongoDBEventWithSchema() {
}
}
`
evt, err := m.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
schemaEvt, isOk := evt.(*SchemaEventPayload)
assert.True(m.T(), isOk)
Expand Down
3 changes: 1 addition & 2 deletions lib/cdc/relational/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/typing"
)

type Debezium string

func (d *Debezium) GetEventFromBytes(_ typing.Settings, bytes []byte) (cdc.Event, error) {
func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var event util.SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
Expand Down
8 changes: 4 additions & 4 deletions lib/cdc/relational/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var validTc = &kafkalib.TopicConfig{
}

func (r *RelationTestSuite) TestGetEventFromBytesTombstone() {
_, err := r.GetEventFromBytes(typing.Settings{}, nil)
_, err := r.GetEventFromBytes(nil)
assert.ErrorContains(r.T(), err, "empty message")
}

Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *RelationTestSuite) TestPostgresEvent() {
}
}
`
evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := r.Debezium.GetEventFromBytes([]byte(payload))
assert.Nil(r.T(), err)
assert.False(r.T(), evt.DeletePayload())

Expand Down Expand Up @@ -189,7 +189,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {
}
}
`
evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := r.Debezium.GetEventFromBytes([]byte(payload))
assert.Nil(r.T(), err)
assert.False(r.T(), evt.DeletePayload())

Expand Down Expand Up @@ -513,7 +513,7 @@ func (r *RelationTestSuite) TestGetEventFromBytes_MySQL() {
"transaction": null
}
}`
evt, err := r.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
evt, err := r.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(r.T(), err)
assert.Equal(r.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evt.GetExecutionTime())
assert.Equal(r.T(), "customers", evt.GetTableName())
Expand Down
3 changes: 1 addition & 2 deletions processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo
return "", fmt.Errorf("cannot unmarshall key %s: %w", string(p.Msg.Key()), err)
}

typingSettings := cfg.SharedTransferConfig.TypingSettings
_event, err := topicConfig.GetEventFromBytes(typingSettings, p.Msg.Value())
_event, err := topicConfig.GetEventFromBytes(p.Msg.Value())
if err != nil {
tags["what"] = "marshall_value_err"
return "", fmt.Errorf("cannot unmarshall event: %w", err)
Expand Down

0 comments on commit 8fed478

Please sign in to comment.