diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go index fc304404..828a9060 100644 --- a/clients/bigquery/storagewrite.go +++ b/clients/bigquery/storagewrite.go @@ -259,7 +259,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto // MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` // Relational will return a string representation of the struct such as `{"hello": "world"}` func encodeStructToJSONString(value any) (string, error) { - fmt.Println("value", value, fmt.Sprintf("type %T", value)) if stringValue, isOk := value.(string); isOk { if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil @@ -272,6 +271,5 @@ func encodeStructToJSONString(value any) (string, error) { return "", fmt.Errorf("failed to marshal value: %w", err) } - fmt.Println("string(bytes)", string(bytes)) return string(bytes), nil } diff --git a/lib/debezium/converters/basic.go b/lib/debezium/converters/basic.go index ff4a9de3..990cc628 100644 --- a/lib/debezium/converters/basic.go +++ b/lib/debezium/converters/basic.go @@ -2,6 +2,7 @@ package converters import ( "encoding/base64" + "encoding/json" "fmt" "github.com/artie-labs/transfer/lib/config/constants" @@ -88,16 +89,46 @@ func (Float64) Convert(value any) (any, error) { } } -type Array struct{} +func NewArray(json bool) Array { + return Array{json: json} +} + +type Array struct { + json bool +} func (Array) ToKindDetails() typing.KindDetails { return typing.Array } -func (Array) Convert(value any) (any, error) { +func (a Array) Convert(value any) (any, error) { if fmt.Sprint(value) == fmt.Sprintf("[%s]", constants.ToastUnavailableValuePlaceholder) { return constants.ToastUnavailableValuePlaceholder, nil } + // Convert value which is an array of []interface{} to array of JSON objects. + if a.json { + // Parse the individual elements + elements, ok := value.([]any) + if !ok { + return nil, fmt.Errorf("expected []interface{}, got %T", value) + } + + convertedElements := make([]any, len(elements)) + for i, element := range elements { + if castedElement, ok := element.(string); ok { + var obj any + err := json.Unmarshal([]byte(castedElement), &obj) + if err != nil { + return nil, err + } + + convertedElements[i] = obj + } + } + + return convertedElements, nil + } + return value, nil } diff --git a/lib/debezium/converters/basic_test.go b/lib/debezium/converters/basic_test.go index 941befe0..8b65298e 100644 --- a/lib/debezium/converters/basic_test.go +++ b/lib/debezium/converters/basic_test.go @@ -110,4 +110,11 @@ func TestArray_Convert(t *testing.T) { assert.Equal(t, constants.ToastUnavailableValuePlaceholder, value) } } + { + // Array of JSON objects + value, err := NewArray(true).Convert([]any{"{\"body\": \"they are on to us\", \"sender\": \"pablo\"}"}) + assert.NoError(t, err) + assert.Len(t, value.([]any), 1) + assert.Equal(t, map[string]any{"body": "they are on to us", "sender": "pablo"}, value.([]any)[0]) + } } diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index 539a5bf5..b6a53ed2 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -53,6 +53,12 @@ const ( Map FieldType = "map" ) +type Item struct { + Type FieldType `json:"type"` + Optional bool `json:"optional"` + DebeziumType SupportedDebeziumType `json:"name"` +} + type Field struct { Type FieldType `json:"type"` Optional bool `json:"optional"` @@ -60,6 +66,8 @@ type Field struct { FieldName string `json:"field"` DebeziumType SupportedDebeziumType `json:"name"` Parameters map[string]any `json:"parameters"` + // [Items] is only populated if the literal type is an array. + Items Item `json:"items"` } func (f Field) GetScaleAndPrecision() (int32, *int32, error) { @@ -138,7 +146,7 @@ func (f Field) ToValueConverter() (converters.ValueConverter, error) { switch f.Type { case Array: - return converters.Array{}, nil + return converters.NewArray(f.Items.DebeziumType == JSON), nil case Double, Float: return converters.Float64{}, nil }