Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Dec 12, 2024
1 parent ec32b74 commit 5b111f0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 5 deletions.
2 changes: 0 additions & 2 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}`
// Relational will return a string representation of the struct such as `{"hello": "world"}`
func encodeStructToJSONString(value any) (string, error) {
fmt.Println("value", value, fmt.Sprintf("type %T", value))
if stringValue, isOk := value.(string); isOk {
if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) {
return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil
Expand All @@ -272,6 +271,5 @@ func encodeStructToJSONString(value any) (string, error) {
return "", fmt.Errorf("failed to marshal value: %w", err)
}

fmt.Println("string(bytes)", string(bytes))
return string(bytes), nil
}
35 changes: 33 additions & 2 deletions lib/debezium/converters/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package converters

import (
"encoding/base64"
"encoding/json"
"fmt"

"github.com/artie-labs/transfer/lib/config/constants"
Expand Down Expand Up @@ -88,16 +89,46 @@ func (Float64) Convert(value any) (any, error) {
}
}

type Array struct{}
func NewArray(json bool) Array {
return Array{json: json}
}

type Array struct {
json bool
}

func (Array) ToKindDetails() typing.KindDetails {
return typing.Array
}

func (Array) Convert(value any) (any, error) {
func (a Array) Convert(value any) (any, error) {
if fmt.Sprint(value) == fmt.Sprintf("[%s]", constants.ToastUnavailableValuePlaceholder) {
return constants.ToastUnavailableValuePlaceholder, nil
}

// Convert value which is an array of []interface{} to array of JSON objects.
if a.json {
// Parse the individual elements
elements, ok := value.([]any)
if !ok {
return nil, fmt.Errorf("expected []interface{}, got %T", value)
}

convertedElements := make([]any, len(elements))
for i, element := range elements {
if castedElement, ok := element.(string); ok {
var obj any
err := json.Unmarshal([]byte(castedElement), &obj)
if err != nil {
return nil, err
}

convertedElements[i] = obj
}
}

return convertedElements, nil
}

return value, nil
}
7 changes: 7 additions & 0 deletions lib/debezium/converters/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,11 @@ func TestArray_Convert(t *testing.T) {
assert.Equal(t, constants.ToastUnavailableValuePlaceholder, value)
}
}
{
// Array of JSON objects
value, err := NewArray(true).Convert([]any{"{\"body\": \"they are on to us\", \"sender\": \"pablo\"}"})
assert.NoError(t, err)
assert.Len(t, value.([]any), 1)
assert.Equal(t, map[string]any{"body": "they are on to us", "sender": "pablo"}, value.([]any)[0])
}
}
10 changes: 9 additions & 1 deletion lib/debezium/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,21 @@ const (
Map FieldType = "map"
)

type Item struct {
Type FieldType `json:"type"`
Optional bool `json:"optional"`
DebeziumType SupportedDebeziumType `json:"name"`
}

type Field struct {
Type FieldType `json:"type"`
Optional bool `json:"optional"`
Default any `json:"default"`
FieldName string `json:"field"`
DebeziumType SupportedDebeziumType `json:"name"`
Parameters map[string]any `json:"parameters"`
// [Items] is only populated if the literal type is an array.
Items Item `json:"items"`
}

func (f Field) GetScaleAndPrecision() (int32, *int32, error) {
Expand Down Expand Up @@ -138,7 +146,7 @@ func (f Field) ToValueConverter() (converters.ValueConverter, error) {

switch f.Type {
case Array:
return converters.Array{}, nil
return converters.NewArray(f.Items.DebeziumType == JSON), nil
case Double, Float:
return converters.Float64{}, nil
}
Expand Down

0 comments on commit 5b111f0

Please sign in to comment.