Skip to content

Commit

Permalink
Merge branch 'master' into minor-improvement-bigquery-cast-col
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored May 3, 2024
2 parents 323f8d5 + c539bb9 commit e323e52
Show file tree
Hide file tree
Showing 13 changed files with 483 additions and 373 deletions.
15 changes: 13 additions & 2 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,23 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
subQuery = fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, temporaryTableName)
}

cols := tableData.ReadOnlyInMemoryCols()

var primaryKeys []columns.Column
for _, primaryKey := range tableData.PrimaryKeys() {
column, ok := cols.GetColumn(primaryKey)
if !ok {
return fmt.Errorf("column for primary key %q does not exist", primaryKey)
}
primaryKeys = append(primaryKeys, column)
}

mergeArg := dml.MergeArgument{
TableID: tableID,
SubQuery: subQuery,
IdempotentKey: tableData.TopicConfig().IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys(),
Columns: tableData.ReadOnlyInMemoryCols(),
PrimaryKeys: primaryKeys,
Columns: cols.ValidColumns(),
SoftDelete: tableData.TopicConfig().SoftDelete,
DestKind: dwh.Label(),
Dialect: dwh.Dialect(),
Expand Down
80 changes: 80 additions & 0 deletions lib/destination/dml/columns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package dml

import (
"fmt"
"slices"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

// quoteColumns returns the name of every column in cols, quoted as an identifier by dialect.
// The result holds one entry per input column, in input order; for empty or nil input it is an
// empty slice rather than nil.
func quoteColumns(cols []columns.Column, dialect sql.Dialect) []string {
	quoted := make([]string, 0, len(cols))
	for idx := range cols {
		quoted = append(quoted, dialect.QuoteIdentifier(cols[idx].Name()))
	}
	return quoted
}

// removeDeleteColumnMarker returns cols without any column named constants.DeleteColumnMarker,
// together with a flag reporting whether at least one such column was dropped.
//
// The filter runs on a copy of cols: slices.DeleteFunc compacts its argument in place, which would
// otherwise shift and overwrite elements of the slice still held by the caller.
func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) {
	isDeleteMarker := func(col columns.Column) bool {
		return col.Name() == constants.DeleteColumnMarker
	}

	filtered := slices.DeleteFunc(slices.Clone(cols), isDeleteMarker)
	return filtered, len(filtered) != len(cols)
}

// buildColumnsUpdateFragment renders a comma-separated list of assignments, one per column, using
// identifiers quoted by dialect.
//   - A column without the ToastColumn flag becomes: col=cc.col
//   - A column with the ToastColumn flag becomes:    col= <CASE expression>, where the expression is
//     produced by processToastStructCol for typing.Struct columns and by processToastCol otherwise.
//
// NOTE: This should only be used with valid columns.
func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect) string {
	assignments := make([]string, 0, len(columns))
	for _, column := range columns {
		quotedName := dialect.QuoteIdentifier(column.Name())

		switch {
		case !column.ToastColumn:
			// This is to make it look like: objCol=cc.objCol
			assignments = append(assignments, fmt.Sprintf("%s=cc.%s", quotedName, quotedName))
		case column.KindDetails == typing.Struct:
			assignments = append(assignments, fmt.Sprintf("%s= %s", quotedName, processToastStructCol(quotedName, dialect)))
		default:
			assignments = append(assignments, fmt.Sprintf("%s= %s", quotedName, processToastCol(quotedName, dialect)))
		}
	}

	return strings.Join(assignments, ",")
}

// processToastStructCol returns a SQL CASE expression for a struct column: it yields cc.<colName>
// unless that value matches the {"key": <constants.ToastUnavailableValuePlaceholder>} marker, in
// which case it yields c.<colName>. colName is interpolated as-is, so callers pass it already quoted
// when quoting is needed. The exact comparison syntax depends on the dialect.
func processToastStructCol(colName string, dialect sql.Dialect) string {
	var template string
	switch dialect.(type) {
	case sql.BigQueryDialect:
		template = `CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`
	case sql.RedshiftDialect:
		template = `CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`
	case sql.MSSQLDialect:
		// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
		template = "CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END"
	default:
		// TODO: Change this to Snowflake and error out if the destKind isn't supported so we're explicit.
		template = "CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END"
	}

	// Every template consumes the same four arguments in the same order.
	return fmt.Sprintf(template, colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}

// processToastCol returns a SQL CASE expression for a non-struct column: it yields cc.<colName>
// unless that value equals constants.ToastUnavailableValuePlaceholder, in which case it yields
// c.<colName>. colName is interpolated as-is, so callers pass it already quoted when quoting is needed.
func processToastCol(colName string, dialect sql.Dialect) string {
	placeholder := constants.ToastUnavailableValuePlaceholder

	switch dialect.(type) {
	case sql.MSSQLDialect:
		// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
		return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END",
			colName, placeholder, colName, colName)
	default:
		return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END",
			colName, placeholder, colName, colName)
	}
}
185 changes: 185 additions & 0 deletions lib/destination/dml/columns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package dml

import (
"fmt"
"testing"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/stretchr/testify/assert"
)

func TestQuoteColumns(t *testing.T) {
assert.Equal(t, []string{}, quoteColumns(nil, sql.BigQueryDialect{}))
assert.Equal(t, []string{}, quoteColumns(nil, sql.SnowflakeDialect{}))

cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)}
assert.Equal(t, []string{"`a`", "`b`"}, quoteColumns(cols, sql.BigQueryDialect{}))
assert.Equal(t, []string{`"A"`, `"B"`}, quoteColumns(cols, sql.SnowflakeDialect{}))
}

// TestRemoveDeleteColumnMarker runs removeDeleteColumnMarker over inputs with zero, one, and several
// delete-marker columns and checks both the surviving columns and the "removed" flag.
func TestRemoveDeleteColumnMarker(t *testing.T) {
	var (
		colA   = columns.NewColumn("a", typing.Invalid)
		colB   = columns.NewColumn("b", typing.Invalid)
		colC   = columns.NewColumn("c", typing.Invalid)
		marker = columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid)
	)

	scenarios := []struct {
		input []columns.Column
		// expected lists the surviving columns; nil means the result only has to be empty.
		expected    []columns.Column
		wantRemoved bool
	}{
		{input: []columns.Column{}},
		{input: []columns.Column{colA}, expected: []columns.Column{colA}},
		{input: []columns.Column{colA, colB}, expected: []columns.Column{colA, colB}},
		{input: []columns.Column{marker}, wantRemoved: true},
		{input: []columns.Column{colA, marker, colB}, expected: []columns.Column{colA, colB}, wantRemoved: true},
		{input: []columns.Column{colA, marker, colB, marker, colC}, expected: []columns.Column{colA, colB, colC}, wantRemoved: true},
	}

	for _, scenario := range scenarios {
		result, removed := removeDeleteColumnMarker(scenario.input)
		assert.Equal(t, scenario.wantRemoved, removed)
		if scenario.expected == nil {
			assert.Empty(t, result)
		} else {
			assert.Equal(t, scenario.expected, result)
		}
	}
}

// TestBuildColumnsUpdateFragment feeds buildColumnsUpdateFragment several column sets (plain,
// toasted strings, toasted structs, reserved-keyword names) across dialects and compares the
// rendered fragment against the exact expected SQL text.
func TestBuildColumnsUpdateFragment(t *testing.T) {
	type testCase struct {
		name           string
		columns        []columns.Column
		expectedString string
		dialect        sql.Dialect
	}

	var (
		happyPathCols       []columns.Column
		stringAndToastCols  []columns.Column
		lastCaseColTypes    []columns.Column
		lastCaseEscapeTypes []columns.Column
	)

	// happyPathCols: foo and bar, neither toasted.
	// stringAndToastCols: the same names, with only foo toasted.
	for _, name := range []string{"foo", "bar"} {
		plain := columns.NewColumn(name, typing.String)
		plain.ToastColumn = false
		happyPathCols = append(happyPathCols, plain)

		mixed := columns.NewColumn(name, typing.String)
		mixed.ToastColumn = name == "foo"
		stringAndToastCols = append(stringAndToastCols, mixed)
	}

	// a1 - struct + toast, b2 - string + toast, c3 = regular string.
	for _, name := range []string{"a1", "b2", "c3"} {
		kind, toast := typing.String, false
		switch name {
		case "a1":
			kind, toast = typing.Struct, true
		case "b2":
			toast = true
		}

		col := columns.NewColumn(name, kind)
		col.ToastColumn = toast
		lastCaseColTypes = append(lastCaseColTypes, col)
	}

	// Same as above, plus start (struct + toast) and select (regular string).
	for _, name := range []string{"a1", "b2", "c3", "start", "select"} {
		kind, toast := typing.String, false
		switch name {
		case "a1", "start":
			kind, toast = typing.Struct, true
		case "b2":
			toast = true
		}

		col := columns.NewColumn(name, kind)
		col.ToastColumn = toast
		lastCaseEscapeTypes = append(lastCaseEscapeTypes, col)
	}

	lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

	key := `{"key":"__debezium_unavailable_value"}`
	startFragment := fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key)
	reservedKeywordsExpected := fmt.Sprintf(
		"`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s",
		key, startFragment, "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`",
	)

	testCases := []testCase{
		{
			name:           "happy path",
			columns:        happyPathCols,
			dialect:        sql.RedshiftDialect{},
			expectedString: `"foo"=cc."foo","bar"=cc."bar"`,
		},
		{
			name:           "string and toast",
			columns:        stringAndToastCols,
			dialect:        sql.SnowflakeDialect{},
			expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`,
		},
		{
			name:           "struct, string and toast string",
			columns:        lastCaseColTypes,
			dialect:        sql.RedshiftDialect{},
			expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`,
		},
		{
			name:           "struct, string and toast string (bigquery)",
			columns:        lastCaseColTypes,
			dialect:        sql.BigQueryDialect{},
			expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`",
		},
		{
			name:           "struct, string and toast string (bigquery) w/ reserved keywords",
			columns:        lastCaseEscapeTypes,
			dialect:        sql.BigQueryDialect{},
			expectedString: reservedKeywordsExpected,
		},
	}

	for _, tc := range testCases {
		assert.Equal(t, tc.expectedString, buildColumnsUpdateFragment(tc.columns, tc.dialect), tc.name)
	}
}

func TestProcessToastStructCol(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{}))
}

func TestProcessToastCol(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{}))
}
Loading

0 comments on commit e323e52

Please sign in to comment.