Skip to content

Commit

Permalink
Move BuildColumnsUpdateFragment to columns
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 13, 2024
1 parent 33f8a1d commit 2401990
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 153 deletions.
88 changes: 88 additions & 0 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -205,3 +206,90 @@ func TestBuildProcessToastColExpression(t *testing.T) {
func TestBuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, BigQueryDialect{}.BuildProcessToastStructColExpression("foo"))
}

func TestBuildColumnsUpdateFragment(t *testing.T) {
type testCase struct {
name string
columns []columns.Column
expectedString string
}

fooBarCols := []string{"foo", "bar"}

var (
lastCaseColTypes []columns.Column
lastCaseEscapeTypes []columns.Column
)
for _, col := range fooBarCols {
column := columns.NewColumn(col, typing.String)
column.ToastColumn = false
}
for _, col := range fooBarCols {
var toastCol bool
if col == "foo" {
toastCol = true
}

column := columns.NewColumn(col, typing.String)
column.ToastColumn = toastCol
}

lastCaseCols := []string{"a1", "b2", "c3"}
for _, lastCaseCol := range lastCaseCols {
kd := typing.String
var toast bool
// a1 - struct + toast, b2 - string + toast, c3 = regular string.
if lastCaseCol == "a1" {
kd = typing.Struct
toast = true
} else if lastCaseCol == "b2" {
toast = true
}

column := columns.NewColumn(lastCaseCol, kd)
column.ToastColumn = toast
lastCaseColTypes = append(lastCaseColTypes, column)
}

lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"}
for _, lastCaseColEsc := range lastCaseColsEsc {
kd := typing.String
var toast bool
// a1 - struct + toast, b2 - string + toast, c3 = regular string.
if lastCaseColEsc == "a1" {
kd = typing.Struct
toast = true
} else if lastCaseColEsc == "b2" {
toast = true
} else if lastCaseColEsc == "start" {
kd = typing.Struct
toast = true
}

column := columns.NewColumn(lastCaseColEsc, kd)
column.ToastColumn = toast
lastCaseEscapeTypes = append(lastCaseEscapeTypes, column)
}

lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

key := `{"key":"__debezium_unavailable_value"}`
testCases := []testCase{
{
name: "struct, string and toast string (bigquery)",
columns: lastCaseColTypes,
expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`",
},
{
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s",
key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"),
},
}

for _, _testCase := range testCases {
actualQuery := columns.BuildColumnsUpdateFragment(_testCase.columns, BigQueryDialect{})
assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name)
}
}
56 changes: 56 additions & 0 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func TestRedshiftDialect_QuoteIdentifier(t *testing.T) {
Expand Down Expand Up @@ -210,3 +211,58 @@ func TestBuildProcessToastColExpression(t *testing.T) {
func TestBuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, RedshiftDialect{}.BuildProcessToastStructColExpression("foo"))
}

func TestBuildColumnsUpdateFragment(t *testing.T) {
type testCase struct {
name string
columns []columns.Column
expectedString string
}

fooBarCols := []string{"foo", "bar"}

var (
happyPathCols []columns.Column
lastCaseColTypes []columns.Column
)
for _, col := range fooBarCols {
column := columns.NewColumn(col, typing.String)
column.ToastColumn = false
happyPathCols = append(happyPathCols, column)
}

lastCaseCols := []string{"a1", "b2", "c3"}
for _, lastCaseCol := range lastCaseCols {
kd := typing.String
var toast bool
// a1 - struct + toast, b2 - string + toast, c3 = regular string.
if lastCaseCol == "a1" {
kd = typing.Struct
toast = true
} else if lastCaseCol == "b2" {
toast = true
}

column := columns.NewColumn(lastCaseCol, kd)
column.ToastColumn = toast
lastCaseColTypes = append(lastCaseColTypes, column)
}

testCases := []testCase{
{
name: "happy path",
columns: happyPathCols,
expectedString: `"foo"=cc."foo","bar"=cc."bar"`,
},
{
name: "struct, string and toast string",
columns: lastCaseColTypes,
expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`,
},
}

for _, _testCase := range testCases {
actualQuery := columns.BuildColumnsUpdateFragment(_testCase.columns, RedshiftDialect{})
assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name)
}
}
18 changes: 18 additions & 0 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -250,3 +251,20 @@ func TestBuildProcessToastColExpression(t *testing.T) {
func TestBuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, SnowflakeDialect{}.BuildProcessToastStructColExpression("foo"))
}

func TestBuildColumnsUpdateFragment(t *testing.T) {
var stringAndToastCols []columns.Column
for _, col := range []string{"foo", "bar"} {
var toastCol bool
if col == "foo" {
toastCol = true
}

column := columns.NewColumn(col, typing.String)
column.ToastColumn = toastCol
stringAndToastCols = append(stringAndToastCols, column)
}

actualQuery := columns.BuildColumnsUpdateFragment(stringAndToastCols, SnowflakeDialect{})
assert.Equal(t, `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, actualQuery)
}
32 changes: 0 additions & 32 deletions lib/destination/dml/columns.go
Original file line number Diff line number Diff line change
@@ -1,33 +1 @@
package dml

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email
// NOTE: This should only be used with valid columns.
func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect) string {
var cols []string
for _, column := range columns {
colName := dialect.QuoteIdentifier(column.Name())
if column.ToastColumn {
var colValue string
if column.KindDetails == typing.Struct {
colValue = dialect.BuildProcessToastStructColExpression(colName)
} else {
colValue = dialect.BuildProcessToastColExpression(colName)
}
cols = append(cols, fmt.Sprintf("%s= %s", colName, colValue))
} else {
// This is to make it look like: objCol = cc.objCol
cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName))
}
}

return strings.Join(cols, ",")
}
116 changes: 0 additions & 116 deletions lib/destination/dml/columns_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package dml

import (
"fmt"
"testing"

bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
redshiftDialect "github.com/artie-labs/transfer/clients/redshift/dialect"
snowflakeDialect "github.com/artie-labs/transfer/clients/snowflake/dialect"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/stretchr/testify/assert"
Expand All @@ -22,115 +18,3 @@ func TestQuoteColumns(t *testing.T) {
assert.Equal(t, []string{"`a`", "`b`"}, columns.QuoteColumns(cols, bigQueryDialect.BigQueryDialect{}))
assert.Equal(t, []string{`"A"`, `"B"`}, columns.QuoteColumns(cols, snowflakeDialect.SnowflakeDialect{}))
}

func TestBuildColumnsUpdateFragment(t *testing.T) {
type testCase struct {
name string
columns []columns.Column
expectedString string
dialect sql.Dialect
}

fooBarCols := []string{"foo", "bar"}

var (
happyPathCols []columns.Column
stringAndToastCols []columns.Column
lastCaseColTypes []columns.Column
lastCaseEscapeTypes []columns.Column
)
for _, col := range fooBarCols {
column := columns.NewColumn(col, typing.String)
column.ToastColumn = false
happyPathCols = append(happyPathCols, column)
}
for _, col := range fooBarCols {
var toastCol bool
if col == "foo" {
toastCol = true
}

column := columns.NewColumn(col, typing.String)
column.ToastColumn = toastCol
stringAndToastCols = append(stringAndToastCols, column)
}

lastCaseCols := []string{"a1", "b2", "c3"}
for _, lastCaseCol := range lastCaseCols {
kd := typing.String
var toast bool
// a1 - struct + toast, b2 - string + toast, c3 = regular string.
if lastCaseCol == "a1" {
kd = typing.Struct
toast = true
} else if lastCaseCol == "b2" {
toast = true
}

column := columns.NewColumn(lastCaseCol, kd)
column.ToastColumn = toast
lastCaseColTypes = append(lastCaseColTypes, column)
}

lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"}
for _, lastCaseColEsc := range lastCaseColsEsc {
kd := typing.String
var toast bool
// a1 - struct + toast, b2 - string + toast, c3 = regular string.
if lastCaseColEsc == "a1" {
kd = typing.Struct
toast = true
} else if lastCaseColEsc == "b2" {
toast = true
} else if lastCaseColEsc == "start" {
kd = typing.Struct
toast = true
}

column := columns.NewColumn(lastCaseColEsc, kd)
column.ToastColumn = toast
lastCaseEscapeTypes = append(lastCaseEscapeTypes, column)
}

lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

key := `{"key":"__debezium_unavailable_value"}`
testCases := []testCase{
{
name: "happy path",
columns: happyPathCols,
dialect: redshiftDialect.RedshiftDialect{},
expectedString: `"foo"=cc."foo","bar"=cc."bar"`,
},
{
name: "string and toast",
columns: stringAndToastCols,
dialect: snowflakeDialect.SnowflakeDialect{},
expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`,
},
{
name: "struct, string and toast string",
columns: lastCaseColTypes,
dialect: redshiftDialect.RedshiftDialect{},
expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`,
},
{
name: "struct, string and toast string (bigquery)",
columns: lastCaseColTypes,
dialect: bigQueryDialect.BigQueryDialect{},
expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`",
},
{
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
dialect: bigQueryDialect.BigQueryDialect{},
expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s",
key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"),
},
}

for _, _testCase := range testCases {
actualQuery := buildColumnsUpdateFragment(_testCase.columns, _testCase.dialect)
assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name)
}
}
Loading

0 comments on commit 2401990

Please sign in to comment.