Skip to content

Commit

Permalink
BigQuery - Better handle on Arrays (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored May 16, 2023
1 parent f44f4a4 commit e412bc4
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 79 deletions.
64 changes: 64 additions & 0 deletions clients/bigquery/cast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package bigquery

import (
"fmt"
"strings"
"time"

"github.com/artie-labs/transfer/lib/array"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/typing"
)

func CastColVal(colVal interface{}, colKind typing.Column) (string, error) {
if colVal != nil {
switch colKind.KindDetails.Kind {
case typing.ETime.Kind:
extTime, err := ext.ParseFromInterface(colVal)
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err)
}

switch extTime.NestedKind.Type {
case ext.DateTimeKindType:
colVal = fmt.Sprintf("PARSE_DATETIME('%s', '%v')", RFC3339Format, extTime.String(time.RFC3339Nano))
case ext.DateKindType:
colVal = fmt.Sprintf("PARSE_DATE('%s', '%v')", PostgresDateFormat, extTime.String(ext.Date.Format))
case ext.TimeKindType:
colVal = fmt.Sprintf("PARSE_TIME('%s', '%v')", PostgresTimeFormatNoTZ, extTime.String(ext.PostgresTimeFormatNoTZ))
}
// All the other types do not need string wrapping.
case typing.String.Kind, typing.Struct.Kind:
colVal = stringutil.Wrap(colVal)
colVal = stringutil.LineBreaksToCarriageReturns(fmt.Sprint(colVal))
if colKind.KindDetails == typing.Struct {
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
colVal = typing.BigQueryJSON(fmt.Sprintf(`{"key": "%s"}`, constants.ToastUnavailableValuePlaceholder))
} else {
// This is how you cast string -> JSON
colVal = fmt.Sprintf("JSON %s", colVal)
}
}
case typing.Array.Kind:
var err error
colVal, err = array.InterfaceToArrayStringEscaped(colVal)
if err != nil {
return "", err
}
}
} else {
if colKind.KindDetails == typing.String {
// BigQuery does not like null as a string for CTEs.
// It throws this error: Value of type INT64 cannot be assigned to column name, which has type STRING
colVal = "''"
} else {
colVal = "null"
}
}

return fmt.Sprint(colVal), nil
}
53 changes: 53 additions & 0 deletions clients/bigquery/cast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package bigquery

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/typing"
)

func TestCastColVal(t *testing.T) {
type _testCase struct {
name string
colVal interface{}
colKind typing.Column

expectedErr error
expectedString string
}

testCases := []_testCase{
{
name: "escaping string",
colVal: "foo",
colKind: typing.Column{KindDetails: typing.String},
expectedString: "'foo'",
},
{
name: "123 as int",
colVal: 123,
colKind: typing.Column{KindDetails: typing.Integer},
expectedString: "123",
},
{
name: "struct",
colVal: `{"hello": "world"}`,
colKind: typing.Column{KindDetails: typing.Struct},
expectedString: `JSON '{"hello": "world"}'`,
},
{
name: "array",
colVal: []int{1, 2, 3, 4, 5},
colKind: typing.Column{KindDetails: typing.Array},
expectedString: `['1','2','3','4','5']`,
},
}

for _, testCase := range testCases {
actualString, actualErr := CastColVal(testCase.colVal, testCase.colKind)
assert.Equal(t, testCase.expectedErr, actualErr, testCase.name)
assert.Equal(t, testCase.expectedString, actualString, testCase.name)
}
}
57 changes: 4 additions & 53 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@ package bigquery

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/dwh/ddl"
"github.com/artie-labs/transfer/lib/dwh/dml"
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
)

func merge(tableData *optimization.TableData) (string, error) {
Expand All @@ -32,65 +28,20 @@ func merge(tableData *optimization.TableData) (string, error) {
var rowValues []string
firstRow := true

// TODO - Reduce complexity.
for _, value := range tableData.RowsData() {
var colVals []string
for _, col := range cols {
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal := value[col]
if colVal != nil {
switch colKind.KindDetails.Kind {
case typing.ETime.Kind:
extTime, err := ext.ParseFromInterface(colVal)
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err)
}

switch extTime.NestedKind.Type {
case ext.DateTimeKindType:
colVal = fmt.Sprintf("PARSE_DATETIME('%s', '%v')", RFC3339Format, extTime.String(time.RFC3339Nano))
case ext.DateKindType:
colVal = fmt.Sprintf("PARSE_DATE('%s', '%v')", PostgresDateFormat, extTime.String(ext.Date.Format))
case ext.TimeKindType:
colVal = fmt.Sprintf("PARSE_TIME('%s', '%v')", PostgresTimeFormatNoTZ, extTime.String(ext.PostgresTimeFormatNoTZ))
}
// All the other types do not need string wrapping.
case typing.String.Kind, typing.Struct.Kind:
colVal = stringutil.Wrap(colVal)
colVal = stringutil.LineBreaksToCarriageReturns(fmt.Sprint(colVal))
if colKind.KindDetails == typing.Struct {
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
colVal = typing.BigQueryJSON(fmt.Sprintf(`{"key": "%s"}`, constants.ToastUnavailableValuePlaceholder))
} else {
// This is how you cast string -> JSON
colVal = fmt.Sprintf("JSON %s", colVal)
}
}
case typing.Array.Kind:
// We need to marshall, so we can escape the strings.
// https://go.dev/play/p/BcCwUSCeTmT
colValBytes, err := json.Marshal(colVal)
if err != nil {
return "", err
}

colVal = stringutil.Wrap(string(colValBytes))
}
} else {
if colKind.KindDetails == typing.String {
// BigQuery does not like null as a string for CTEs.
// It throws this error: Value of type INT64 cannot be assigned to column name, which has type STRING
colVal = "''"
} else {
colVal = "null"
}
colVal, err := CastColVal(value[col], colKind)
if err != nil {
return "", err
}

if firstRow {
colVal = fmt.Sprintf("%v as %s", colVal, col)
}

colVals = append(colVals, fmt.Sprint(colVal))
colVals = append(colVals, colVal)
}

firstRow = false
Expand Down
83 changes: 66 additions & 17 deletions lib/array/strings.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,56 @@
package array

import (
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/artie-labs/transfer/lib/stringutil"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
)

func InterfaceToArrayStringEscaped(val interface{}) (string, error) {
if val == nil {
return "", nil
}

list := reflect.ValueOf(val)
if list.Kind() != reflect.Slice {
return "", fmt.Errorf("wrong data type")
}

var vals []string
for i := 0; i < list.Len(); i++ {
kind := list.Index(i).Kind()
value := list.Index(i).Interface()
var shouldParse bool
if kind == reflect.Interface {
valMap, isOk := value.(map[string]interface{})
if isOk {
value = valMap
}

shouldParse = true
}

if kind == reflect.Map || kind == reflect.Struct || shouldParse {
bytes, err := json.Marshal(value)
if err != nil {
return "", err
}

vals = append(vals, stringutil.Wrap(string(bytes)))
} else {
vals = append(vals, stringutil.Wrap(value))
}
}

return fmt.Sprintf("[%s]", strings.Join(vals, ",")), nil
}

type StringsJoinAddPrefixArgs struct {
Vals []string
Separator string
Expand All @@ -26,39 +69,45 @@ func StringsJoinAddPrefix(args StringsJoinAddPrefixArgs) string {
return strings.Join(retVals, args.Separator)
}

// ColumnsUpdateQuery will take a list of columns + tablePrefix and return
// columnA = tablePrefix.columnA, columnB = tablePrefix.columnB. This is the Update syntax that Snowflake requires
func ColumnsUpdateQuery(columns []string, columnsToTypes typing.Columns, tablePrefix string, bigQueryTypeCasting bool) string {
// TODO - deprecate tablePrefix as an arg (it's redundant).

// NOTE: columns and sflkCols must be the same.
// ColumnsUpdateQuery takes:
// columns - list of columns to iterate
// columnsToTypes - given that list, provide the types (separate list because this list may contain invalid columns
// bigQueryTypeCasting - We'll need to escape the column comparison if the column's a struct.
// It then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email
func ColumnsUpdateQuery(columns []string, columnsToTypes typing.Columns, bigQueryTypeCasting bool) string {
var _columns []string
for _, column := range columns {
columnType, isOk := columnsToTypes.GetColumn(column)
if isOk && columnType.ToastColumn {
if columnType.KindDetails == typing.Struct {
if bigQueryTypeCasting {
_columns = append(_columns,
fmt.Sprintf("%s= CASE WHEN TO_JSON_STRING(%s.%s) != %s THEN %s.%s ELSE c.%s END",
column, tablePrefix, column,
fmt.Sprintf(`'{"key": "%s"}'`, constants.ToastUnavailableValuePlaceholder),
tablePrefix, column, column))
fmt.Sprintf(`%s= CASE WHEN TO_JSON_STRING(cc.%s) != '{"key": "%s"}' THEN cc.%s ELSE c.%s END`,
// col CASE when TO_JSON_STRING(cc.col) != { 'key': TOAST_UNAVAILABLE_VALUE }
column, column, constants.ToastUnavailableValuePlaceholder,
// cc.col ELSE c.col END
column, column))
} else {
_columns = append(_columns,
fmt.Sprintf("%s= CASE WHEN %s.%s != {'key': '%s'} THEN %s.%s ELSE c.%s END",
column, tablePrefix, column,
constants.ToastUnavailableValuePlaceholder, tablePrefix, column, column))
fmt.Sprintf("%s= CASE WHEN cc.%s != {'key': '%s'} THEN cc.%s ELSE c.%s END",
// col CASE WHEN cc.col
column, column,
// { 'key': TOAST_UNAVAILABLE_VALUE } THEN cc.col ELSE c.col END",
constants.ToastUnavailableValuePlaceholder, column, column))
}
} else {
// t.column3 = CASE WHEN t.column3 != '__debezium_unavailable_value' THEN t.column3 ELSE s.column3 END
_columns = append(_columns,
fmt.Sprintf("%s= CASE WHEN %s.%s != '%s' THEN %s.%s ELSE c.%s END", column, tablePrefix, column,
constants.ToastUnavailableValuePlaceholder, tablePrefix, column, column))
fmt.Sprintf("%s= CASE WHEN cc.%s != '%s' THEN cc.%s ELSE c.%s END",
// col = CASE WHEN cc.col != TOAST_UNAVAILABLE_VALUE
column, column, constants.ToastUnavailableValuePlaceholder,
// THEN cc.col ELSE c.col END
column, column))
}

} else {
// This is to make it look like: objCol = cc.objCol::variant
_columns = append(_columns, fmt.Sprintf("%s=%s.%s", column, tablePrefix, column))
// This is to make it look like: objCol = cc.objCol
_columns = append(_columns, fmt.Sprintf("%s=cc.%s", column, column))
}

}
Expand Down
Loading

0 comments on commit e412bc4

Please sign in to comment.