From cdd524f4dab9a20fa841bb174565868b7666c263 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 11 Dec 2024 17:48:10 -0800 Subject: [PATCH] [Debezium] Better support around JSON arrays (#1072) --- lib/debezium/converters/basic.go | 33 +++++++++++++++++++++++++-- lib/debezium/converters/basic_test.go | 7 ++++++ lib/debezium/schema.go | 10 +++++++- lib/debezium/types_test.go | 9 ++++++++ 4 files changed, 56 insertions(+), 3 deletions(-) diff --git a/lib/debezium/converters/basic.go b/lib/debezium/converters/basic.go index ff4a9de3f..b361c5656 100644 --- a/lib/debezium/converters/basic.go +++ b/lib/debezium/converters/basic.go @@ -2,6 +2,7 @@ package converters import ( "encoding/base64" + "encoding/json" "fmt" "github.com/artie-labs/transfer/lib/config/constants" @@ -88,16 +89,44 @@ func (Float64) Convert(value any) (any, error) { } } -type Array struct{} +func NewArray(json bool) Array { + return Array{json: json} +} + +type Array struct { + json bool +} func (Array) ToKindDetails() typing.KindDetails { return typing.Array } -func (Array) Convert(value any) (any, error) { +func (a Array) Convert(value any) (any, error) { if fmt.Sprint(value) == fmt.Sprintf("[%s]", constants.ToastUnavailableValuePlaceholder) { return constants.ToastUnavailableValuePlaceholder, nil } + if a.json { + // Debezium will give us a list of JSON strings. We will then need to convert them to JSON objects. + elements, ok := value.([]any) + if !ok { + return nil, fmt.Errorf("expected []any, got %T", value) + } + + convertedElements := make([]any, len(elements)) + for i, element := range elements { + if castedElement, ok := element.(string); ok { + var obj any + if err := json.Unmarshal([]byte(castedElement), &obj); err != nil { + return nil, err + } + + convertedElements[i] = obj + } + } + + return convertedElements, nil + } + return value, nil } diff --git a/lib/debezium/converters/basic_test.go b/lib/debezium/converters/basic_test.go index 941befe0e..8b65298e7 100644 --- a/lib/debezium/converters/basic_test.go +++ b/lib/debezium/converters/basic_test.go @@ -110,4 +110,11 @@ func TestArray_Convert(t *testing.T) { assert.Equal(t, constants.ToastUnavailableValuePlaceholder, value) } } + { + // Array of JSON objects + value, err := NewArray(true).Convert([]any{"{\"body\": \"they are on to us\", \"sender\": \"pablo\"}"}) + assert.NoError(t, err) + assert.Len(t, value.([]any), 1) + assert.Equal(t, map[string]any{"body": "they are on to us", "sender": "pablo"}, value.([]any)[0]) + } } diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index 539a5bf5b..365750020 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -53,6 +53,12 @@ const ( Map FieldType = "map" ) +type Item struct { + Type FieldType `json:"type"` + Optional bool `json:"optional"` + DebeziumType SupportedDebeziumType `json:"name"` +} + type Field struct { Type FieldType `json:"type"` Optional bool `json:"optional"` @@ -60,6 +66,8 @@ type Field struct { FieldName string `json:"field"` DebeziumType SupportedDebeziumType `json:"name"` Parameters map[string]any `json:"parameters"` + // [ItemsMetadata] is only populated if the literal type is an array. + ItemsMetadata Item `json:"items"` } func (f Field) GetScaleAndPrecision() (int32, *int32, error) { @@ -138,7 +146,7 @@ func (f Field) ToValueConverter() (converters.ValueConverter, error) { switch f.Type { case Array: - return converters.Array{}, nil + return converters.NewArray(f.ItemsMetadata.DebeziumType == JSON), nil case Double, Float: return converters.Float64{}, nil } diff --git a/lib/debezium/types_test.go b/lib/debezium/types_test.go index 0aa06ca97..8196057fd 100644 --- a/lib/debezium/types_test.go +++ b/lib/debezium/types_test.go @@ -172,6 +172,15 @@ func TestField_ParseValue(t *testing.T) { assert.Equal(t, `[[{"foo":"bar"}],[{"hello":"world"},{"dusty":"the mini aussie"}]]`, val) } } + { + // Array + field := Field{Type: Array, ItemsMetadata: Item{DebeziumType: JSON}} + value, err := field.ParseValue([]any{`{"foo": "bar", "foo": "bar"}`, `{"hello": "world"}`}) + assert.NoError(t, err) + assert.Len(t, value.([]any), 2) + assert.Equal(t, map[string]any{"foo": "bar"}, value.([]any)[0]) + assert.Equal(t, map[string]any{"hello": "world"}, value.([]any)[1]) + } { // Int32 value, err := Field{Type: Int32}.ParseValue(float64(3))