From e412bc42c591761fb555015be2845adb5e53f93f Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 15 May 2023 19:09:27 -0700 Subject: [PATCH] BigQuery - Better handle on Arrays (#104) --- clients/bigquery/cast.go | 64 +++++++++++++++++++++++++++ clients/bigquery/cast_test.go | 53 ++++++++++++++++++++++ clients/bigquery/merge.go | 57 ++---------------------- lib/array/strings.go | 83 ++++++++++++++++++++++++++++------- lib/array/strings_test.go | 75 ++++++++++++++++++++++++++++--- lib/dwh/ddl/ddl.go | 2 +- lib/dwh/dml/merge.go | 4 +- 7 files changed, 259 insertions(+), 79 deletions(-) create mode 100644 clients/bigquery/cast.go create mode 100644 clients/bigquery/cast_test.go diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go new file mode 100644 index 000000000..0fe5aeedc --- /dev/null +++ b/clients/bigquery/cast.go @@ -0,0 +1,64 @@ +package bigquery + +import ( + "fmt" + "strings" + "time" + + "github.com/artie-labs/transfer/lib/array" + + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/stringutil" + "github.com/artie-labs/transfer/lib/typing/ext" + + "github.com/artie-labs/transfer/lib/typing" +) + +func CastColVal(colVal interface{}, colKind typing.Column) (string, error) { + if colVal != nil { + switch colKind.KindDetails.Kind { + case typing.ETime.Kind: + extTime, err := ext.ParseFromInterface(colVal) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err) + } + + switch extTime.NestedKind.Type { + case ext.DateTimeKindType: + colVal = fmt.Sprintf("PARSE_DATETIME('%s', '%v')", RFC3339Format, extTime.String(time.RFC3339Nano)) + case ext.DateKindType: + colVal = fmt.Sprintf("PARSE_DATE('%s', '%v')", PostgresDateFormat, extTime.String(ext.Date.Format)) + case ext.TimeKindType: + colVal = fmt.Sprintf("PARSE_TIME('%s', '%v')", PostgresTimeFormatNoTZ, extTime.String(ext.PostgresTimeFormatNoTZ)) + } + // All the other types do not need string wrapping. + case typing.String.Kind, typing.Struct.Kind: + colVal = stringutil.Wrap(colVal) + colVal = stringutil.LineBreaksToCarriageReturns(fmt.Sprint(colVal)) + if colKind.KindDetails == typing.Struct { + if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { + colVal = typing.BigQueryJSON(fmt.Sprintf(`{"key": "%s"}`, constants.ToastUnavailableValuePlaceholder)) + } else { + // This is how you cast string -> JSON + colVal = fmt.Sprintf("JSON %s", colVal) + } + } + case typing.Array.Kind: + var err error + colVal, err = array.InterfaceToArrayStringEscaped(colVal) + if err != nil { + return "", err + } + } + } else { + if colKind.KindDetails == typing.String { + // BigQuery does not like null as a string for CTEs. + // It throws this error: Value of type INT64 cannot be assigned to column name, which has type STRING + colVal = "''" + } else { + colVal = "null" + } + } + + return fmt.Sprint(colVal), nil +} diff --git a/clients/bigquery/cast_test.go b/clients/bigquery/cast_test.go new file mode 100644 index 000000000..79e97c6fc --- /dev/null +++ b/clients/bigquery/cast_test.go @@ -0,0 +1,53 @@ +package bigquery + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/typing" +) + +func TestCastColVal(t *testing.T) { + type _testCase struct { + name string + colVal interface{} + colKind typing.Column + + expectedErr error + expectedString string + } + + testCases := []_testCase{ + { + name: "escaping string", + colVal: "foo", + colKind: typing.Column{KindDetails: typing.String}, + expectedString: "'foo'", + }, + { + name: "123 as int", + colVal: 123, + colKind: typing.Column{KindDetails: typing.Integer}, + expectedString: "123", + }, + { + name: "struct", + colVal: `{"hello": "world"}`, + colKind: typing.Column{KindDetails: typing.Struct}, + expectedString: `JSON '{"hello": "world"}'`, + }, + { + name: "array", + colVal: []int{1, 2, 3, 4, 5}, + colKind: typing.Column{KindDetails: typing.Array}, + expectedString: `['1','2','3','4','5']`, + }, + } + + for _, testCase := range testCases { + actualString, actualErr := CastColVal(testCase.colVal, testCase.colKind) + assert.Equal(t, testCase.expectedErr, actualErr, testCase.name) + assert.Equal(t, testCase.expectedString, actualString, testCase.name) + } +} diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go index 4f55213b1..de96df676 100644 --- a/clients/bigquery/merge.go +++ b/clients/bigquery/merge.go @@ -2,19 +2,15 @@ package bigquery import ( "context" - "encoding/json" "fmt" "strings" - "time" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/dwh/ddl" "github.com/artie-labs/transfer/lib/dwh/dml" "github.com/artie-labs/transfer/lib/logger" "github.com/artie-labs/transfer/lib/optimization" - "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/ext" ) func merge(tableData *optimization.TableData) (string, error) { @@ -32,65 +28,20 @@ func merge(tableData *optimization.TableData) (string, error) { var rowValues []string firstRow := true - // TODO - Reduce complexity. for _, value := range tableData.RowsData() { var colVals []string for _, col := range cols { colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col) - colVal := value[col] - if colVal != nil { - switch colKind.KindDetails.Kind { - case typing.ETime.Kind: - extTime, err := ext.ParseFromInterface(colVal) - if err != nil { - return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err) - } - - switch extTime.NestedKind.Type { - case ext.DateTimeKindType: - colVal = fmt.Sprintf("PARSE_DATETIME('%s', '%v')", RFC3339Format, extTime.String(time.RFC3339Nano)) - case ext.DateKindType: - colVal = fmt.Sprintf("PARSE_DATE('%s', '%v')", PostgresDateFormat, extTime.String(ext.Date.Format)) - case ext.TimeKindType: - colVal = fmt.Sprintf("PARSE_TIME('%s', '%v')", PostgresTimeFormatNoTZ, extTime.String(ext.PostgresTimeFormatNoTZ)) - } - // All the other types do not need string wrapping. - case typing.String.Kind, typing.Struct.Kind: - colVal = stringutil.Wrap(colVal) - colVal = stringutil.LineBreaksToCarriageReturns(fmt.Sprint(colVal)) - if colKind.KindDetails == typing.Struct { - if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { - colVal = typing.BigQueryJSON(fmt.Sprintf(`{"key": "%s"}`, constants.ToastUnavailableValuePlaceholder)) - } else { - // This is how you cast string -> JSON - colVal = fmt.Sprintf("JSON %s", colVal) - } - } - case typing.Array.Kind: - // We need to marshall, so we can escape the strings. - // https://go.dev/play/p/BcCwUSCeTmT - colValBytes, err := json.Marshal(colVal) - if err != nil { - return "", err - } - - colVal = stringutil.Wrap(string(colValBytes)) - } - } else { - if colKind.KindDetails == typing.String { - // BigQuery does not like null as a string for CTEs. - // It throws this error: Value of type INT64 cannot be assigned to column name, which has type STRING - colVal = "''" - } else { - colVal = "null" - } + colVal, err := CastColVal(value[col], colKind) + if err != nil { + return "", err } if firstRow { colVal = fmt.Sprintf("%v as %s", colVal, col) } - colVals = append(colVals, fmt.Sprint(colVal)) + colVals = append(colVals, colVal) } firstRow = false diff --git a/lib/array/strings.go b/lib/array/strings.go index 360193368..7f661129c 100644 --- a/lib/array/strings.go +++ b/lib/array/strings.go @@ -1,13 +1,56 @@ package array import ( + "encoding/json" "fmt" + "reflect" "strings" + "github.com/artie-labs/transfer/lib/stringutil" + "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/typing" ) +func InterfaceToArrayStringEscaped(val interface{}) (string, error) { + if val == nil { + return "", nil + } + + list := reflect.ValueOf(val) + if list.Kind() != reflect.Slice { + return "", fmt.Errorf("wrong data type") + } + + var vals []string + for i := 0; i < list.Len(); i++ { + kind := list.Index(i).Kind() + value := list.Index(i).Interface() + var shouldParse bool + if kind == reflect.Interface { + valMap, isOk := value.(map[string]interface{}) + if isOk { + value = valMap + } + + shouldParse = true + } + + if kind == reflect.Map || kind == reflect.Struct || shouldParse { + bytes, err := json.Marshal(value) + if err != nil { + return "", err + } + + vals = append(vals, stringutil.Wrap(string(bytes))) + } else { + vals = append(vals, stringutil.Wrap(value)) + } + } + + return fmt.Sprintf("[%s]", strings.Join(vals, ",")), nil +} + type StringsJoinAddPrefixArgs struct { Vals []string Separator string @@ -26,12 +69,12 @@ func StringsJoinAddPrefix(args StringsJoinAddPrefixArgs) string { return strings.Join(retVals, args.Separator) } -// ColumnsUpdateQuery will take a list of columns + tablePrefix and return -// columnA = tablePrefix.columnA, columnB = tablePrefix.columnB. This is the Update syntax that Snowflake requires -func ColumnsUpdateQuery(columns []string, columnsToTypes typing.Columns, tablePrefix string, bigQueryTypeCasting bool) string { - // TODO - deprecate tablePrefix as an arg (it's redundant). - - // NOTE: columns and sflkCols must be the same. +// ColumnsUpdateQuery takes: +// columns - list of columns to iterate +// columnsToTypes - given that list, provide the types (separate list because this list may contain invalid columns +// bigQueryTypeCasting - We'll need to escape the column comparison if the column's a struct. +// It then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email +func ColumnsUpdateQuery(columns []string, columnsToTypes typing.Columns, bigQueryTypeCasting bool) string { var _columns []string for _, column := range columns { columnType, isOk := columnsToTypes.GetColumn(column) @@ -39,26 +82,32 @@ func ColumnsUpdateQuery(columns []string, columnsToTypes typing.Columns, tablePr if columnType.KindDetails == typing.Struct { if bigQueryTypeCasting { _columns = append(_columns, - fmt.Sprintf("%s= CASE WHEN TO_JSON_STRING(%s.%s) != %s THEN %s.%s ELSE c.%s END", - column, tablePrefix, column, - fmt.Sprintf(`'{"key": "%s"}'`, constants.ToastUnavailableValuePlaceholder), - tablePrefix, column, column)) + fmt.Sprintf(`%s= CASE WHEN TO_JSON_STRING(cc.%s) != '{"key": "%s"}' THEN cc.%s ELSE c.%s END`, + // col CASE when TO_JSON_STRING(cc.col) != { 'key': TOAST_UNAVAILABLE_VALUE } + column, column, constants.ToastUnavailableValuePlaceholder, + // cc.col ELSE c.col END + column, column)) } else { _columns = append(_columns, - fmt.Sprintf("%s= CASE WHEN %s.%s != {'key': '%s'} THEN %s.%s ELSE c.%s END", - column, tablePrefix, column, - constants.ToastUnavailableValuePlaceholder, tablePrefix, column, column)) + fmt.Sprintf("%s= CASE WHEN cc.%s != {'key': '%s'} THEN cc.%s ELSE c.%s END", + // col CASE WHEN cc.col + column, column, + // { 'key': TOAST_UNAVAILABLE_VALUE } THEN cc.col ELSE c.col END", + constants.ToastUnavailableValuePlaceholder, column, column)) } } else { // t.column3 = CASE WHEN t.column3 != '__debezium_unavailable_value' THEN t.column3 ELSE s.column3 END _columns = append(_columns, - fmt.Sprintf("%s= CASE WHEN %s.%s != '%s' THEN %s.%s ELSE c.%s END", column, tablePrefix, column, - constants.ToastUnavailableValuePlaceholder, tablePrefix, column, column)) + fmt.Sprintf("%s= CASE WHEN cc.%s != '%s' THEN cc.%s ELSE c.%s END", + // col = CASE WHEN cc.col != TOAST_UNAVAILABLE_VALUE + column, column, constants.ToastUnavailableValuePlaceholder, + // THEN cc.col ELSE c.col END + column, column)) } } else { - // This is to make it look like: objCol = cc.objCol::variant - _columns = append(_columns, fmt.Sprintf("%s=%s.%s", column, tablePrefix, column)) + // This is to make it look like: objCol = cc.objCol + _columns = append(_columns, fmt.Sprintf("%s=cc.%s", column, column)) } } diff --git a/lib/array/strings_test.go b/lib/array/strings_test.go index 445a54329..2e8687070 100644 --- a/lib/array/strings_test.go +++ b/lib/array/strings_test.go @@ -10,12 +10,79 @@ import ( "github.com/stretchr/testify/assert" ) +func TestToArrayString(t *testing.T) { + type _testCase struct { + name string + val interface{} + + expectedList string + expectedErr error + } + + testCases := []_testCase{ + { + name: "nil", + }, + { + name: "wrong data type", + val: true, + expectedList: "", + expectedErr: fmt.Errorf("wrong data type"), + }, + { + name: "list of numbers", + val: []int{1, 2, 3, 4, 5}, + expectedList: "['1','2','3','4','5']", + }, + { + name: "list of strings", + val: []string{"abc", "def", "ghi"}, + expectedList: "['abc','def','ghi']", + }, + { + name: "list of bools", + val: []bool{true, false, true}, + expectedList: "['true','false','true']", + }, + { + name: "array of nested objects", + val: []map[string]interface{}{ + { + "foo": "bar", + }, + { + "hello": "world", + }, + }, + expectedList: `['{"foo":"bar"}','{"hello":"world"}']`, + }, + { + name: "array of nested lists", + val: [][]string{ + { + "foo", "bar", + }, + { + "abc", "def", + }, + }, + expectedList: `['[foo bar]','[abc def]']`, + }, + } + + for _, testCase := range testCases { + actualString, actualErr := InterfaceToArrayStringEscaped(testCase.val) + assert.Equal(t, testCase.expectedList, actualString, testCase.name) + assert.Equal(t, testCase.expectedErr, actualErr, testCase.name) + } + +} + func TestColumnsUpdateQuery(t *testing.T) { type testCase struct { name string columns []string columnsToTypes typing.Columns - tablePrefix string expectedString string bigQuery bool } @@ -72,35 +139,31 @@ func TestColumnsUpdateQuery(t *testing.T) { name: "happy path", columns: fooBarCols, columnsToTypes: happyPathCols, - tablePrefix: "cc", expectedString: "foo=cc.foo,bar=cc.bar", }, { name: "string and toast", columns: fooBarCols, columnsToTypes: stringAndToastCols, - tablePrefix: "cc", expectedString: "foo= CASE WHEN cc.foo != '__debezium_unavailable_value' THEN cc.foo ELSE c.foo END,bar=cc.bar", }, { name: "struct, string and toast string", columns: lastCaseCols, columnsToTypes: lastCaseColTypes, - tablePrefix: "cc", expectedString: "a1= CASE WHEN cc.a1 != {'key': '__debezium_unavailable_value'} THEN cc.a1 ELSE c.a1 END,b2= CASE WHEN cc.b2 != '__debezium_unavailable_value' THEN cc.b2 ELSE c.b2 END,c3=cc.c3", }, { name: "struct, string and toast string (bigquery)", columns: lastCaseCols, columnsToTypes: lastCaseColTypes, - tablePrefix: "cc", bigQuery: true, expectedString: `a1= CASE WHEN TO_JSON_STRING(cc.a1) != '{"key": "__debezium_unavailable_value"}' THEN cc.a1 ELSE c.a1 END,b2= CASE WHEN cc.b2 != '__debezium_unavailable_value' THEN cc.b2 ELSE c.b2 END,c3=cc.c3`, }, } for _, _testCase := range testCases { - actualQuery := ColumnsUpdateQuery(_testCase.columns, _testCase.columnsToTypes, _testCase.tablePrefix, _testCase.bigQuery) + actualQuery := ColumnsUpdateQuery(_testCase.columns, _testCase.columnsToTypes, _testCase.bigQuery) assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) } diff --git a/lib/dwh/ddl/ddl.go b/lib/dwh/ddl/ddl.go index 5ed60e494..e490c14eb 100644 --- a/lib/dwh/ddl/ddl.go +++ b/lib/dwh/ddl/ddl.go @@ -16,7 +16,7 @@ type AlterTableArgs struct { Tc *types.DwhTableConfig FqTableName string CreateTable bool - ColumnOp constants.ColumnOperation + ColumnOp constants.ColumnOperation CdcTime time.Time } diff --git a/lib/dwh/dml/merge.go b/lib/dwh/dml/merge.go index 477409c59..4c96031b1 100644 --- a/lib/dwh/dml/merge.go +++ b/lib/dwh/dml/merge.go @@ -69,7 +69,7 @@ func MergeStatement(m MergeArgument) (string, error) { ); `, m.FqTableName, m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, array.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, "cc", m.BigQueryTypeCasting), + idempotentClause, array.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, m.BigQueryTypeCasting), // Insert constants.DeleteColumnMarker, strings.Join(m.Columns, ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -110,7 +110,7 @@ func MergeStatement(m MergeArgument) (string, error) { // Delete constants.DeleteColumnMarker, // Update - constants.DeleteColumnMarker, idempotentClause, array.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, "cc", m.BigQueryTypeCasting), + constants.DeleteColumnMarker, idempotentClause, array.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, m.BigQueryTypeCasting), // Insert constants.DeleteColumnMarker, strings.Join(m.Columns, ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{