Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Dec 21, 2024
1 parent e6ff8b3 commit c9366b4
Showing 1 changed file with 2 additions and 22 deletions.
24 changes: 2 additions & 22 deletions lib/debezium/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type Adapter interface {

type DebeziumTransformer struct {
adapter Adapter
schema debezium.Schema
iter RowsIterator
lightTransformer LightDebeziumTransformer
}
Expand All @@ -45,26 +44,10 @@ func NewDebeziumTransformer(adapter Adapter) (*DebeziumTransformer, error) {
}

func NewDebeziumTransformerWithIterator(adapter Adapter, iter RowsIterator) *DebeziumTransformer {
fieldConverters := adapter.FieldConverters()
fields := make([]debezium.Field, len(fieldConverters))
for i, fieldConverter := range fieldConverters {
fields[i] = fieldConverter.ValueConverter.ToField(fieldConverter.Name)
}

schema := debezium.Schema{
FieldsObject: []debezium.FieldsObject{{
Fields: fields,
Optional: false,
FieldLabel: debezium.After,
}},
}

lightTransformer := NewLightDebeziumTransformer(adapter.TableName(), adapter.PartitionKeys(), fieldConverters)
return &DebeziumTransformer{
adapter: adapter,
schema: schema,
iter: iter,
lightTransformer: lightTransformer,
lightTransformer: NewLightDebeziumTransformer(adapter.TableName(), adapter.PartitionKeys(), adapter.FieldConverters()),
}
}

Expand Down Expand Up @@ -109,10 +92,7 @@ func (d *DebeziumTransformer) partitionKey(row Row) map[string]any {
}

func (d *DebeziumTransformer) createPayload(row Row) (util.SchemaEventPayload, error) {
return d.lightTransformer.BuildEventPayload(util.Source{
Table: d.lightTransformer.tableName,
TsMs: time.Now().UnixMilli(),
}, nil, row, "r")
return d.lightTransformer.BuildEventPayload(util.Source{Table: d.lightTransformer.tableName, TsMs: time.Now().UnixMilli()}, nil, row, "r")
}

func convertRow(valueConverters map[string]converters.ValueConverter, row Row) (Row, error) {
Expand Down

0 comments on commit c9366b4

Please sign in to comment.