diff --git a/lib/cdc/format/format.go b/lib/cdc/format/format.go index d8a9b3ffb..76a5b37fb 100644 --- a/lib/cdc/format/format.go +++ b/lib/cdc/format/format.go @@ -9,13 +9,8 @@ import ( "github.com/artie-labs/transfer/lib/logger" ) -var ( - r relational.Debezium - m mongo.Debezium -) - func GetFormatParser(label, topic string) cdc.Format { - for _, validFormat := range []cdc.Format{&r, &m} { + for _, validFormat := range []cdc.Format{relational.Debezium{}, mongo.Debezium{}} { for _, fmtLabel := range validFormat.Labels() { if fmtLabel == label { slog.Info("Loaded CDC Format parser...", diff --git a/lib/cdc/format/format_test.go b/lib/cdc/format/format_test.go index 93e737d7f..9b80b0144 100644 --- a/lib/cdc/format/format_test.go +++ b/lib/cdc/format/format_test.go @@ -7,13 +7,30 @@ import ( "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/cdc/mongo" + "github.com/artie-labs/transfer/lib/cdc/relational" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/typing" ) func TestGetFormatParser(t *testing.T) { - validFormats := []string{constants.DBZPostgresAltFormat, constants.DBZPostgresFormat, constants.DBZMongoFormat} - for _, validFormat := range validFormats { - assert.NotNil(t, GetFormatParser(validFormat, "topicA")) + { + // Relational + for _, format := range []string{constants.DBZPostgresAltFormat, constants.DBZPostgresFormat} { + formatParser := GetFormatParser(format, "topicA") + assert.NotNil(t, formatParser) + + _, err := typing.AssertType[relational.Debezium](formatParser) + assert.NoError(t, err) + } + } + { + // Mongo + formatParser := GetFormatParser(constants.DBZMongoFormat, "topicA") + assert.NotNil(t, formatParser) + + _, err := typing.AssertType[mongo.Debezium](formatParser) + assert.NoError(t, err) } } diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index c5f07b451..8cd4cbcdc 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -18,9 +18,9 @@ import ( "go.mongodb.org/mongo-driver/bson" ) -type Debezium string +type Debezium struct{} -func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { +func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { var schemaEventPayload SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") @@ -70,11 +70,11 @@ func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { return &schemaEventPayload, nil } -func (d *Debezium) Labels() []string { +func (Debezium) Labels() []string { return []string{constants.DBZMongoFormat} } -func (d *Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) { +func (Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) { kvMap, err := debezium.ParsePartitionKey(key, tc.CDCKeyFormat) if err != nil { return nil, err diff --git a/lib/cdc/relational/debezium.go b/lib/cdc/relational/debezium.go index 23dd68c29..b7864bded 100644 --- a/lib/cdc/relational/debezium.go +++ b/lib/cdc/relational/debezium.go @@ -11,9 +11,9 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" ) -type Debezium string +type Debezium struct{} -func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { +func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { var event util.SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") @@ -26,7 +26,7 @@ func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { return &event, nil } -func (d *Debezium) Labels() []string { +func (Debezium) Labels() []string { return []string{ constants.DBZPostgresFormat, constants.DBZPostgresAltFormat, @@ -35,6 +35,6 @@ func (d *Debezium) Labels() []string { } } -func (d *Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) { +func (Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) { return debezium.ParsePartitionKey(key, tc.CDCKeyFormat) }