From e6ff8b313ad132c901f40ab3c3f0d1f29ed28e5a Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sat, 21 Dec 2024 00:32:28 -0800 Subject: [PATCH] Checkpoint. --- lib/debezium/transformer/transformer.go | 53 ++++++++------------ lib/debezium/transformer/transformer_test.go | 42 ++++++++-------- 2 files changed, 41 insertions(+), 54 deletions(-) diff --git a/lib/debezium/transformer/transformer.go b/lib/debezium/transformer/transformer.go index caeacc32..08510c34 100644 --- a/lib/debezium/transformer/transformer.go +++ b/lib/debezium/transformer/transformer.go @@ -2,10 +2,9 @@ package transformer import ( "fmt" - "time" - "github.com/artie-labs/transfer/lib/cdc/util" "github.com/artie-labs/transfer/lib/debezium" + "time" "github.com/artie-labs/reader/lib" "github.com/artie-labs/reader/lib/debezium/converters" @@ -30,10 +29,10 @@ type Adapter interface { } type DebeziumTransformer struct { - adapter Adapter - schema debezium.Schema - iter RowsIterator - valueConverters map[string]converters.ValueConverter + adapter Adapter + schema debezium.Schema + iter RowsIterator + lightTransformer LightDebeziumTransformer } func NewDebeziumTransformer(adapter Adapter) (*DebeziumTransformer, error) { @@ -41,16 +40,15 @@ func NewDebeziumTransformer(adapter Adapter) (*DebeziumTransformer, error) { if err != nil { return nil, fmt.Errorf("failed to create iterator :%w", err) } + return NewDebeziumTransformerWithIterator(adapter, iter), nil } func NewDebeziumTransformerWithIterator(adapter Adapter, iter RowsIterator) *DebeziumTransformer { fieldConverters := adapter.FieldConverters() fields := make([]debezium.Field, len(fieldConverters)) - valueConverters := map[string]converters.ValueConverter{} for i, fieldConverter := range fieldConverters { fields[i] = fieldConverter.ValueConverter.ToField(fieldConverter.Name) - valueConverters[fieldConverter.Name] = fieldConverter.ValueConverter } schema := debezium.Schema{ @@ -61,11 +59,12 @@ func NewDebeziumTransformerWithIterator(adapter Adapter, iter RowsIterator) *Deb }}, } + lightTransformer := NewLightDebeziumTransformer(adapter.TableName(), adapter.PartitionKeys(), fieldConverters) return &DebeziumTransformer{ - adapter: adapter, - schema: schema, - iter: iter, - valueConverters: valueConverters, + adapter: adapter, + schema: schema, + iter: iter, + lightTransformer: lightTransformer, } } @@ -90,8 +89,12 @@ func (d *DebeziumTransformer) Next() ([]lib.RawMessage, error) { return nil, fmt.Errorf("failed to create Debezium payload: %w", err) } - // TODO: debezium.FieldsObject is not set - result = append(result, lib.NewRawMessage(d.adapter.TopicSuffix(), debezium.FieldsObject{}, d.partitionKey(row), &payload)) + pk, err := d.lightTransformer.BuildPartitionKey(nil, row) + if err != nil { + return nil, fmt.Errorf("failed to create partition key: %w", err) + } + + result = append(result, lib.NewRawMessage(d.adapter.TopicSuffix(), pk.Schema, pk.Payload, &payload)) } return result, nil @@ -106,24 +109,10 @@ func (d *DebeziumTransformer) partitionKey(row Row) map[string]any { } func (d *DebeziumTransformer) createPayload(row Row) (util.SchemaEventPayload, error) { - dbzRow, err := convertRow(d.valueConverters, row) - if err != nil { - return util.SchemaEventPayload{}, err - } - - payload := util.Payload{ - After: dbzRow, - Source: util.Source{ - Table: d.adapter.TableName(), - TsMs: time.Now().UnixMilli(), - }, - Operation: "r", - } - - return util.SchemaEventPayload{ - Schema: d.schema, - Payload: payload, - }, nil + return d.lightTransformer.BuildEventPayload(util.Source{ + Table: d.lightTransformer.tableName, + TsMs: time.Now().UnixMilli(), + }, nil, row, "r") } func convertRow(valueConverters map[string]converters.ValueConverter, row Row) (Row, error) { diff --git a/lib/debezium/transformer/transformer_test.go b/lib/debezium/transformer/transformer_test.go index e0b09dbe..7102b922 100644 --- a/lib/debezium/transformer/transformer_test.go +++ b/lib/debezium/transformer/transformer_test.go @@ -220,7 +220,7 @@ func TestDebeziumTransformer_Next(t *testing.T) { ) assert.NoError(t, err) _, err = iterator.Collect(transformer) - assert.ErrorContains(t, err, `failed to create Debezium payload: failed to convert row value for key "foo": test error`) + assert.ErrorContains(t, err, `failed to create Debezium payload: failed to convert after row: failed to convert row value for key "foo": test error`) } { // Happy path @@ -245,35 +245,33 @@ func TestDebeziumTransformer_Next(t *testing.T) { rows := results[0] assert.Len(t, rows, 1) rawMessage := rows[0] - assert.Equal(t, Row{"foo": "bar", "qux": 12}, rawMessage.PartitionKey()) + assert.Equal(t, Row{"foo": "converted-bar", "qux": "converted-12"}, rawMessage.PartitionKey()) assert.Equal(t, "im-a-little-topic-suffix", rawMessage.TopicSuffix()) payload, isOk := rawMessage.Event().(*util.SchemaEventPayload) assert.True(t, isOk) payload.Payload.Source.TsMs = 12345 // Modify source time since it'll be ~now - expected := util.SchemaEventPayload( - util.SchemaEventPayload{ - Schema: debezium.Schema{ - SchemaType: "", - FieldsObject: []debezium.FieldsObject{ - { - FieldObjectType: "", - Fields: []debezium.Field{ - {FieldName: "foo", Type: "string"}, - {FieldName: "qux", Type: "int32"}, - {FieldName: "baz", Type: "string"}, - }, - Optional: false, - FieldLabel: "after", + expected := util.SchemaEventPayload{ + Schema: debezium.Schema{ + SchemaType: "", + FieldsObject: []debezium.FieldsObject{ + { + FieldObjectType: "", + Fields: []debezium.Field{ + {FieldName: "foo", Type: "string"}, + {FieldName: "qux", Type: "int32"}, + {FieldName: "baz", Type: "string"}, }, + Optional: false, + FieldLabel: "after", }, }, - Payload: util.Payload{ - After: map[string]any{"foo": "converted-bar", "qux": "converted-12", "baz": "converted-corge"}, - Source: util.Source{Connector: "", TsMs: 12345, Database: "", Schema: "", Table: "im-a-little-table"}, - Operation: "r", - }, }, - ) + Payload: util.Payload{ + After: map[string]any{"foo": "converted-bar", "qux": "converted-12", "baz": "converted-corge"}, + Source: util.Source{Connector: "", TsMs: 12345, Database: "", Schema: "", Table: "im-a-little-table"}, + Operation: "r", + }, + } assert.Equal(t, expected, *payload) } }