diff --git a/lib/debezium/converters/basic.go b/lib/debezium/converters/basic.go index 60e4ccb97..8fa0e9834 100644 --- a/lib/debezium/converters/basic.go +++ b/lib/debezium/converters/basic.go @@ -87,3 +87,24 @@ func (Float64) Convert(value any) (any, error) { return nil, fmt.Errorf("unexpected type %T", value) } } + +type Array struct{} + +func (Array) ToKindDetails() typing.KindDetails { + return typing.Array +} + +func (Array) Convert(value any) (any, error) { + if casted, isOk := value.([]string); isOk { + // Filter TOASTED array + if len(casted) == 1 { + if casted[0] == constants.ToastUnavailableValuePlaceholder { + return constants.ToastUnavailableValuePlaceholder, nil + } + + return casted, nil + } + } + + return value, nil +} diff --git a/lib/debezium/converters/basic_test.go b/lib/debezium/converters/basic_test.go index 95d5d130e..88cb0e0d2 100644 --- a/lib/debezium/converters/basic_test.go +++ b/lib/debezium/converters/basic_test.go @@ -85,3 +85,18 @@ func TestFloat64_Convert(t *testing.T) { } } } + +func TestArray_Convert(t *testing.T) { + { + // Valid + value, err := Array{}.Convert([]int{1, 2, 3}) + assert.NoError(t, err) + assert.Equal(t, []int{1, 2, 3}, value) + } + { + // Filter TOASTED value + value, err := Array{}.Convert([]string{"__debezium_unavailable_value"}) + assert.NoError(t, err) + assert.Equal(t, "__debezium_unavailable_value", value) + } +} diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index 5702cb753..b9b4a7db3 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -139,6 +139,8 @@ func (f Field) ToValueConverter() (converters.ValueConverter, error) { switch f.Type { case Double, Float: return converters.Float64{}, nil + case Array: + return converters.Array{}, nil } return nil, nil @@ -167,8 +169,6 @@ func (f Field) ToKindDetails() (typing.KindDetails, error) { return typing.Struct, nil case Boolean: return typing.Boolean, nil - case Array: - return typing.Array, nil default: return typing.Invalid, fmt.Errorf("unhandled field type %q", f.Type) }