Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Dec 21, 2024
1 parent ef85a7a commit e6ff8b3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 54 deletions.
53 changes: 21 additions & 32 deletions lib/debezium/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package transformer

import (
"fmt"
"time"

"github.com/artie-labs/transfer/lib/cdc/util"
"github.com/artie-labs/transfer/lib/debezium"
"time"

"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/debezium/converters"
Expand All @@ -30,27 +29,26 @@ type Adapter interface {
}

type DebeziumTransformer struct {
adapter Adapter
schema debezium.Schema
iter RowsIterator
valueConverters map[string]converters.ValueConverter
adapter Adapter
schema debezium.Schema
iter RowsIterator
lightTransformer LightDebeziumTransformer
}

func NewDebeziumTransformer(adapter Adapter) (*DebeziumTransformer, error) {
iter, err := adapter.NewIterator()
if err != nil {
return nil, fmt.Errorf("failed to create iterator :%w", err)
}

return NewDebeziumTransformerWithIterator(adapter, iter), nil
}

func NewDebeziumTransformerWithIterator(adapter Adapter, iter RowsIterator) *DebeziumTransformer {
fieldConverters := adapter.FieldConverters()
fields := make([]debezium.Field, len(fieldConverters))
valueConverters := map[string]converters.ValueConverter{}
for i, fieldConverter := range fieldConverters {
fields[i] = fieldConverter.ValueConverter.ToField(fieldConverter.Name)
valueConverters[fieldConverter.Name] = fieldConverter.ValueConverter
}

schema := debezium.Schema{
Expand All @@ -61,11 +59,12 @@ func NewDebeziumTransformerWithIterator(adapter Adapter, iter RowsIterator) *Deb
}},
}

lightTransformer := NewLightDebeziumTransformer(adapter.TableName(), adapter.PartitionKeys(), fieldConverters)
return &DebeziumTransformer{
adapter: adapter,
schema: schema,
iter: iter,
valueConverters: valueConverters,
adapter: adapter,
schema: schema,
iter: iter,
lightTransformer: lightTransformer,
}
}

Expand All @@ -90,8 +89,12 @@ func (d *DebeziumTransformer) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to create Debezium payload: %w", err)
}

// TODO: debezium.FieldsObject is not set
result = append(result, lib.NewRawMessage(d.adapter.TopicSuffix(), debezium.FieldsObject{}, d.partitionKey(row), &payload))
pk, err := d.lightTransformer.BuildPartitionKey(nil, row)
if err != nil {
return nil, fmt.Errorf("failed to create partition key: %w", err)
}

result = append(result, lib.NewRawMessage(d.adapter.TopicSuffix(), pk.Schema, pk.Payload, &payload))
}

return result, nil
Expand All @@ -106,24 +109,10 @@ func (d *DebeziumTransformer) partitionKey(row Row) map[string]any {
}

func (d *DebeziumTransformer) createPayload(row Row) (util.SchemaEventPayload, error) {
dbzRow, err := convertRow(d.valueConverters, row)
if err != nil {
return util.SchemaEventPayload{}, err
}

payload := util.Payload{
After: dbzRow,
Source: util.Source{
Table: d.adapter.TableName(),
TsMs: time.Now().UnixMilli(),
},
Operation: "r",
}

return util.SchemaEventPayload{
Schema: d.schema,
Payload: payload,
}, nil
return d.lightTransformer.BuildEventPayload(util.Source{
Table: d.lightTransformer.tableName,
TsMs: time.Now().UnixMilli(),
}, nil, row, "r")
}

func convertRow(valueConverters map[string]converters.ValueConverter, row Row) (Row, error) {
Expand Down
42 changes: 20 additions & 22 deletions lib/debezium/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestDebeziumTransformer_Next(t *testing.T) {
)
assert.NoError(t, err)
_, err = iterator.Collect(transformer)
assert.ErrorContains(t, err, `failed to create Debezium payload: failed to convert row value for key "foo": test error`)
assert.ErrorContains(t, err, `failed to create Debezium payload: failed to convert after row: failed to convert row value for key "foo": test error`)
}
{
// Happy path
Expand All @@ -245,35 +245,33 @@ func TestDebeziumTransformer_Next(t *testing.T) {
rows := results[0]
assert.Len(t, rows, 1)
rawMessage := rows[0]
assert.Equal(t, Row{"foo": "bar", "qux": 12}, rawMessage.PartitionKey())
assert.Equal(t, Row{"foo": "converted-bar", "qux": "converted-12"}, rawMessage.PartitionKey())
assert.Equal(t, "im-a-little-topic-suffix", rawMessage.TopicSuffix())
payload, isOk := rawMessage.Event().(*util.SchemaEventPayload)
assert.True(t, isOk)
payload.Payload.Source.TsMs = 12345 // Modify source time since it'll be ~now
expected := util.SchemaEventPayload(
util.SchemaEventPayload{
Schema: debezium.Schema{
SchemaType: "",
FieldsObject: []debezium.FieldsObject{
{
FieldObjectType: "",
Fields: []debezium.Field{
{FieldName: "foo", Type: "string"},
{FieldName: "qux", Type: "int32"},
{FieldName: "baz", Type: "string"},
},
Optional: false,
FieldLabel: "after",
expected := util.SchemaEventPayload{
Schema: debezium.Schema{
SchemaType: "",
FieldsObject: []debezium.FieldsObject{
{
FieldObjectType: "",
Fields: []debezium.Field{
{FieldName: "foo", Type: "string"},
{FieldName: "qux", Type: "int32"},
{FieldName: "baz", Type: "string"},
},
Optional: false,
FieldLabel: "after",
},
},
Payload: util.Payload{
After: map[string]any{"foo": "converted-bar", "qux": "converted-12", "baz": "converted-corge"},
Source: util.Source{Connector: "", TsMs: 12345, Database: "", Schema: "", Table: "im-a-little-table"},
Operation: "r",
},
},
)
Payload: util.Payload{
After: map[string]any{"foo": "converted-bar", "qux": "converted-12", "baz": "converted-corge"},
Source: util.Source{Connector: "", TsMs: 12345, Database: "", Schema: "", Table: "im-a-little-table"},
Operation: "r",
},
}
assert.Equal(t, expected, *payload)
}
}
Expand Down

0 comments on commit e6ff8b3

Please sign in to comment.