Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpoint. #630

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 15 additions & 46 deletions lib/debezium/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package transformer

import (
"fmt"
"time"

"github.com/artie-labs/transfer/lib/cdc/util"
"github.com/artie-labs/transfer/lib/debezium"
"time"

"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/debezium/converters"
Expand All @@ -30,42 +29,25 @@ type Adapter interface {
}

type DebeziumTransformer struct {
adapter Adapter
schema debezium.Schema
iter RowsIterator
valueConverters map[string]converters.ValueConverter
adapter Adapter
iter RowsIterator
lightTransformer LightDebeziumTransformer
}

func NewDebeziumTransformer(adapter Adapter) (*DebeziumTransformer, error) {
iter, err := adapter.NewIterator()
if err != nil {
return nil, fmt.Errorf("failed to create iterator :%w", err)
}

return NewDebeziumTransformerWithIterator(adapter, iter), nil
}

func NewDebeziumTransformerWithIterator(adapter Adapter, iter RowsIterator) *DebeziumTransformer {
fieldConverters := adapter.FieldConverters()
fields := make([]debezium.Field, len(fieldConverters))
valueConverters := map[string]converters.ValueConverter{}
for i, fieldConverter := range fieldConverters {
fields[i] = fieldConverter.ValueConverter.ToField(fieldConverter.Name)
valueConverters[fieldConverter.Name] = fieldConverter.ValueConverter
}

schema := debezium.Schema{
FieldsObject: []debezium.FieldsObject{{
Fields: fields,
Optional: false,
FieldLabel: debezium.After,
}},
}

return &DebeziumTransformer{
adapter: adapter,
schema: schema,
iter: iter,
valueConverters: valueConverters,
adapter: adapter,
iter: iter,
lightTransformer: NewLightDebeziumTransformer(adapter.TableName(), adapter.PartitionKeys(), adapter.FieldConverters()),
}
}

Expand All @@ -90,8 +72,12 @@ func (d *DebeziumTransformer) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to create Debezium payload: %w", err)
}

// TODO: debezium.FieldsObject is not set
result = append(result, lib.NewRawMessage(d.adapter.TopicSuffix(), debezium.FieldsObject{}, d.partitionKey(row), &payload))
pk, err := d.lightTransformer.BuildPartitionKey(nil, row)
if err != nil {
return nil, fmt.Errorf("failed to create partition key: %w", err)
}

result = append(result, lib.NewRawMessage(d.adapter.TopicSuffix(), pk.Schema, pk.Payload, &payload))
}

return result, nil
Expand All @@ -106,24 +92,7 @@ func (d *DebeziumTransformer) partitionKey(row Row) map[string]any {
}

func (d *DebeziumTransformer) createPayload(row Row) (util.SchemaEventPayload, error) {
dbzRow, err := convertRow(d.valueConverters, row)
if err != nil {
return util.SchemaEventPayload{}, err
}

payload := util.Payload{
After: dbzRow,
Source: util.Source{
Table: d.adapter.TableName(),
TsMs: time.Now().UnixMilli(),
},
Operation: "r",
}

return util.SchemaEventPayload{
Schema: d.schema,
Payload: payload,
}, nil
return d.lightTransformer.BuildEventPayload(util.Source{Table: d.lightTransformer.tableName, TsMs: time.Now().UnixMilli()}, nil, row, "r")
}

func convertRow(valueConverters map[string]converters.ValueConverter, row Row) (Row, error) {
Expand Down
42 changes: 20 additions & 22 deletions lib/debezium/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestDebeziumTransformer_Next(t *testing.T) {
)
assert.NoError(t, err)
_, err = iterator.Collect(transformer)
assert.ErrorContains(t, err, `failed to create Debezium payload: failed to convert row value for key "foo": test error`)
assert.ErrorContains(t, err, `failed to create Debezium payload: failed to convert after row: failed to convert row value for key "foo": test error`)
}
{
// Happy path
Expand All @@ -245,35 +245,33 @@ func TestDebeziumTransformer_Next(t *testing.T) {
rows := results[0]
assert.Len(t, rows, 1)
rawMessage := rows[0]
assert.Equal(t, Row{"foo": "bar", "qux": 12}, rawMessage.PartitionKey())
assert.Equal(t, Row{"foo": "converted-bar", "qux": "converted-12"}, rawMessage.PartitionKey())
assert.Equal(t, "im-a-little-topic-suffix", rawMessage.TopicSuffix())
payload, isOk := rawMessage.Event().(*util.SchemaEventPayload)
assert.True(t, isOk)
payload.Payload.Source.TsMs = 12345 // Modify source time since it'll be ~now
expected := util.SchemaEventPayload(
util.SchemaEventPayload{
Schema: debezium.Schema{
SchemaType: "",
FieldsObject: []debezium.FieldsObject{
{
FieldObjectType: "",
Fields: []debezium.Field{
{FieldName: "foo", Type: "string"},
{FieldName: "qux", Type: "int32"},
{FieldName: "baz", Type: "string"},
},
Optional: false,
FieldLabel: "after",
expected := util.SchemaEventPayload{
Schema: debezium.Schema{
SchemaType: "",
FieldsObject: []debezium.FieldsObject{
{
FieldObjectType: "",
Fields: []debezium.Field{
{FieldName: "foo", Type: "string"},
{FieldName: "qux", Type: "int32"},
{FieldName: "baz", Type: "string"},
},
Optional: false,
FieldLabel: "after",
},
},
Payload: util.Payload{
After: map[string]any{"foo": "converted-bar", "qux": "converted-12", "baz": "converted-corge"},
Source: util.Source{Connector: "", TsMs: 12345, Database: "", Schema: "", Table: "im-a-little-table"},
Operation: "r",
},
},
)
Payload: util.Payload{
After: map[string]any{"foo": "converted-bar", "qux": "converted-12", "baz": "converted-corge"},
Source: util.Source{Connector: "", TsMs: 12345, Database: "", Schema: "", Table: "im-a-little-table"},
Operation: "r",
},
}
assert.Equal(t, expected, *payload)
}
}
Expand Down
Loading