diff --git a/lib/debezium/transformer/transformer.go b/lib/debezium/transformer/transformer.go index caeacc32..5696ea80 100644 --- a/lib/debezium/transformer/transformer.go +++ b/lib/debezium/transformer/transformer.go @@ -2,10 +2,9 @@ package transformer import ( "fmt" - "time" - "github.com/artie-labs/transfer/lib/cdc/util" "github.com/artie-labs/transfer/lib/debezium" + "time" "github.com/artie-labs/reader/lib" "github.com/artie-labs/reader/lib/debezium/converters" @@ -30,10 +29,9 @@ type Adapter interface { } type DebeziumTransformer struct { - adapter Adapter - schema debezium.Schema - iter RowsIterator - valueConverters map[string]converters.ValueConverter + adapter Adapter + iter RowsIterator + lightTransformer LightDebeziumTransformer } func NewDebeziumTransformer(adapter Adapter) (*DebeziumTransformer, error) { @@ -41,31 +39,15 @@ func NewDebeziumTransformer(adapter Adapter) (*DebeziumTransformer, error) { if err != nil { return nil, fmt.Errorf("failed to create iterator :%w", err) } + return NewDebeziumTransformerWithIterator(adapter, iter), nil } func NewDebeziumTransformerWithIterator(adapter Adapter, iter RowsIterator) *DebeziumTransformer { - fieldConverters := adapter.FieldConverters() - fields := make([]debezium.Field, len(fieldConverters)) - valueConverters := map[string]converters.ValueConverter{} - for i, fieldConverter := range fieldConverters { - fields[i] = fieldConverter.ValueConverter.ToField(fieldConverter.Name) - valueConverters[fieldConverter.Name] = fieldConverter.ValueConverter - } - - schema := debezium.Schema{ - FieldsObject: []debezium.FieldsObject{{ - Fields: fields, - Optional: false, - FieldLabel: debezium.After, - }}, - } - return &DebeziumTransformer{ - adapter: adapter, - schema: schema, - iter: iter, - valueConverters: valueConverters, + adapter: adapter, + iter: iter, + lightTransformer: NewLightDebeziumTransformer(adapter.TableName(), adapter.PartitionKeys(), adapter.FieldConverters()), } } @@ -90,8 +72,12 @@ func (d *DebeziumTransformer) Next() ([]lib.RawMessage, error) { return nil, fmt.Errorf("failed to create Debezium payload: %w", err) } - // TODO: debezium.FieldsObject is not set - result = append(result, lib.NewRawMessage(d.adapter.TopicSuffix(), debezium.FieldsObject{}, d.partitionKey(row), &payload)) + pk, err := d.lightTransformer.BuildPartitionKey(nil, row) + if err != nil { + return nil, fmt.Errorf("failed to create partition key: %w", err) + } + + result = append(result, lib.NewRawMessage(d.adapter.TopicSuffix(), pk.Schema, pk.Payload, &payload)) } return result, nil @@ -106,24 +92,7 @@ func (d *DebeziumTransformer) partitionKey(row Row) map[string]any { } func (d *DebeziumTransformer) createPayload(row Row) (util.SchemaEventPayload, error) { - dbzRow, err := convertRow(d.valueConverters, row) - if err != nil { - return util.SchemaEventPayload{}, err - } - - payload := util.Payload{ - After: dbzRow, - Source: util.Source{ - Table: d.adapter.TableName(), - TsMs: time.Now().UnixMilli(), - }, - Operation: "r", - } - - return util.SchemaEventPayload{ - Schema: d.schema, - Payload: payload, - }, nil + return d.lightTransformer.BuildEventPayload(util.Source{Table: d.lightTransformer.tableName, TsMs: time.Now().UnixMilli()}, nil, row, "r") } func convertRow(valueConverters map[string]converters.ValueConverter, row Row) (Row, error) { diff --git a/lib/debezium/transformer/transformer_test.go b/lib/debezium/transformer/transformer_test.go index e0b09dbe..7102b922 100644 --- a/lib/debezium/transformer/transformer_test.go +++ b/lib/debezium/transformer/transformer_test.go @@ -220,7 +220,7 @@ func TestDebeziumTransformer_Next(t *testing.T) { ) assert.NoError(t, err) _, err = iterator.Collect(transformer) - assert.ErrorContains(t, err, `failed to create Debezium payload: failed to convert row value for key "foo": test error`) + assert.ErrorContains(t, err, `failed to create Debezium payload: failed to convert after row: failed to convert row value for key "foo": test error`) } { // Happy path @@ -245,35 +245,33 @@ func TestDebeziumTransformer_Next(t *testing.T) { rows := results[0] assert.Len(t, rows, 1) rawMessage := rows[0] - assert.Equal(t, Row{"foo": "bar", "qux": 12}, rawMessage.PartitionKey()) + assert.Equal(t, Row{"foo": "converted-bar", "qux": "converted-12"}, rawMessage.PartitionKey()) assert.Equal(t, "im-a-little-topic-suffix", rawMessage.TopicSuffix()) payload, isOk := rawMessage.Event().(*util.SchemaEventPayload) assert.True(t, isOk) payload.Payload.Source.TsMs = 12345 // Modify source time since it'll be ~now - expected := util.SchemaEventPayload( - util.SchemaEventPayload{ - Schema: debezium.Schema{ - SchemaType: "", - FieldsObject: []debezium.FieldsObject{ - { - FieldObjectType: "", - Fields: []debezium.Field{ - {FieldName: "foo", Type: "string"}, - {FieldName: "qux", Type: "int32"}, - {FieldName: "baz", Type: "string"}, - }, - Optional: false, - FieldLabel: "after", + expected := util.SchemaEventPayload{ + Schema: debezium.Schema{ + SchemaType: "", + FieldsObject: []debezium.FieldsObject{ + { + FieldObjectType: "", + Fields: []debezium.Field{ + {FieldName: "foo", Type: "string"}, + {FieldName: "qux", Type: "int32"}, + {FieldName: "baz", Type: "string"}, }, + Optional: false, + FieldLabel: "after", }, }, - Payload: util.Payload{ - After: map[string]any{"foo": "converted-bar", "qux": "converted-12", "baz": "converted-corge"}, - Source: util.Source{Connector: "", TsMs: 12345, Database: "", Schema: "", Table: "im-a-little-table"}, - Operation: "r", - }, }, - ) + Payload: util.Payload{ + After: map[string]any{"foo": "converted-bar", "qux": "converted-12", "baz": "converted-corge"}, + Source: util.Source{Connector: "", TsMs: 12345, Database: "", Schema: "", Table: "im-a-little-table"}, + Operation: "r", + }, + } assert.Equal(t, expected, *payload) } }