From 79cf4ba478b09b8065fcfb77cb5e2c6a102c9a8c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 22 Nov 2024 14:04:21 -0800 Subject: [PATCH] Escape TOAST. --- clients/shared/merge.go | 2 -- lib/debezium/converters/basic.go | 14 ++++++++++++++ lib/debezium/converters/basic_test.go | 26 ++++++++++++++++++++++++++ lib/debezium/schema.go | 4 ++-- models/event/event.go | 14 -------------- 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 2aadf03be..2d4fb20d2 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -151,8 +151,6 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi return fmt.Errorf("failed to generate merge statements: %w", err) } - fmt.Println("mergeStatements", mergeStatements) - if err = destination.ExecStatements(dwh, mergeStatements); err != nil { return fmt.Errorf("failed to execute merge statements: %w", err) } diff --git a/lib/debezium/converters/basic.go b/lib/debezium/converters/basic.go index 60e4ccb97..ff4a9de3f 100644 --- a/lib/debezium/converters/basic.go +++ b/lib/debezium/converters/basic.go @@ -87,3 +87,17 @@ func (Float64) Convert(value any) (any, error) { return nil, fmt.Errorf("unexpected type %T", value) } } + +type Array struct{} + +func (Array) ToKindDetails() typing.KindDetails { + return typing.Array +} + +func (Array) Convert(value any) (any, error) { + if fmt.Sprint(value) == fmt.Sprintf("[%s]", constants.ToastUnavailableValuePlaceholder) { + return constants.ToastUnavailableValuePlaceholder, nil + } + + return value, nil +} diff --git a/lib/debezium/converters/basic_test.go b/lib/debezium/converters/basic_test.go index 95d5d130e..6747b7e59 100644 --- a/lib/debezium/converters/basic_test.go +++ b/lib/debezium/converters/basic_test.go @@ -3,6 +3,8 @@ package converters import ( "testing" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/stretchr/testify/assert" ) @@ -85,3 +87,27 @@ func TestFloat64_Convert(t *testing.T) { } } } + +func TestArray_Convert(t *testing.T) { + { + // Irrelevant data type + value, err := Array{}.Convert([]int{1, 2, 3, 4}) + assert.NoError(t, err) + assert.Equal(t, []int{1, 2, 3, 4}, value) + } + { + // TOASTED data + { + // As []any + value, err := Array{}.Convert([]any{constants.ToastUnavailableValuePlaceholder}) + assert.NoError(t, err) + assert.Equal(t, constants.ToastUnavailableValuePlaceholder, value) + } + { + // As []string + value, err := Array{}.Convert([]string{constants.ToastUnavailableValuePlaceholder}) + assert.NoError(t, err) + assert.Equal(t, constants.ToastUnavailableValuePlaceholder, value) + } + } +} diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index 5702cb753..539a5bf5b 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -137,6 +137,8 @@ func (f Field) ToValueConverter() (converters.ValueConverter, error) { } switch f.Type { + case Array: + return converters.Array{}, nil case Double, Float: return converters.Float64{}, nil } @@ -167,8 +169,6 @@ func (f Field) ToKindDetails() (typing.KindDetails, error) { return typing.Struct, nil case Boolean: return typing.Boolean, nil - case Array: - return typing.Array, nil default: return typing.Invalid, fmt.Errorf("unhandled field type %q", f.Type) } diff --git a/models/event/event.go b/models/event/event.go index a56125040..9dc1ba7b6 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -224,22 +224,8 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali toastedCol = true } } - - valArray, isOk := val.([]any) - if isOk { - if len(valArray) == 1 { - if _, isOk = valArray[0].(string); isOk { - if valArray[0] == constants.ToastUnavailableValuePlaceholder { - val = constants.ToastUnavailableValuePlaceholder - toastedCol = true - } - } - } - } } - fmt.Println("toastedCol", toastedCol, "val", val) - if toastedCol { err := inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{ ToastCol: typing.ToPtr(true),