Skip to content

Commit

Permalink
Move BuildColumnsUpdateFragment to columns (#632)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored May 13, 2024
1 parent 33f8a1d commit 533e144
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 173 deletions.
74 changes: 74 additions & 0 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -205,3 +206,76 @@ func TestBuildProcessToastColExpression(t *testing.T) {
func TestBuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, BigQueryDialect{}.BuildProcessToastStructColExpression("foo"))
}

func TestQuoteColumns(t *testing.T) {
assert.Equal(t, []string{}, columns.QuoteColumns(nil, BigQueryDialect{}))
cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)}
assert.Equal(t, []string{"`a`", "`b`"}, columns.QuoteColumns(cols, BigQueryDialect{}))
}

// TestBuildColumnsUpdateFragment verifies the SET fragment that
// columns.BuildColumnsUpdateFragment renders with the BigQuery dialect, for a
// mix of TOAST struct, TOAST string and plain string columns, including
// column names that are SQL reserved words.
func TestBuildColumnsUpdateFragment(t *testing.T) {
	// newCols builds one column per name using the fixture rules below:
	//   a1, start - struct + toast
	//   b2        - string + toast
	//   anything else (c3, select) - regular string
	newCols := func(names []string) []columns.Column {
		var out []columns.Column
		for _, name := range names {
			kind := typing.String
			isToast := false
			switch name {
			case "a1", "start":
				kind = typing.Struct
				isToast = true
			case "b2":
				isToast = true
			}

			col := columns.NewColumn(name, kind)
			col.ToastColumn = isToast
			out = append(out, col)
		}
		return out
	}

	plainCols := newCols([]string{"a1", "b2", "c3"})

	// Same columns plus two reserved keywords and the delete marker column.
	reservedCols := newCols([]string{"a1", "b2", "c3", "start", "select"})
	reservedCols = append(reservedCols, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

	key := `{"key":"__debezium_unavailable_value"}`
	startFragment := fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key)

	cases := []struct {
		name string
		cols []columns.Column
		want string
	}{
		{
			name: "struct, string and toast string (bigquery)",
			cols: plainCols,
			want: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`",
		},
		{
			name: "struct, string and toast string (bigquery) w/ reserved keywords",
			cols: reservedCols,
			want: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s",
				key, startFragment, "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"),
		},
	}

	for _, tc := range cases {
		got := columns.BuildColumnsUpdateFragment(tc.cols, BigQueryDialect{})
		assert.Equal(t, tc.want, got, tc.name)
	}
}
50 changes: 50 additions & 0 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func TestRedshiftDialect_QuoteIdentifier(t *testing.T) {
Expand Down Expand Up @@ -210,3 +211,52 @@ func TestBuildProcessToastColExpression(t *testing.T) {
func TestBuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, RedshiftDialect{}.BuildProcessToastStructColExpression("foo"))
}

// TestBuildColumnsUpdateFragment verifies the SET fragment that
// columns.BuildColumnsUpdateFragment renders with the Redshift dialect, first
// for plain string columns and then for a mix of TOAST struct, TOAST string
// and plain string columns.
func TestBuildColumnsUpdateFragment(t *testing.T) {
	// Happy path: two plain string columns, explicitly marked as non-TOAST.
	var plainCols []columns.Column
	for _, name := range []string{"foo", "bar"} {
		col := columns.NewColumn(name, typing.String)
		col.ToastColumn = false
		plainCols = append(plainCols, col)
	}

	// a1 - struct + toast, b2 - string + toast, c3 - regular string.
	var mixedCols []columns.Column
	for _, name := range []string{"a1", "b2", "c3"} {
		kind := typing.String
		isToast := false
		switch name {
		case "a1":
			kind = typing.Struct
			isToast = true
		case "b2":
			isToast = true
		}

		col := columns.NewColumn(name, kind)
		col.ToastColumn = isToast
		mixedCols = append(mixedCols, col)
	}

	cases := []struct {
		name string
		cols []columns.Column
		want string
	}{
		{
			name: "happy path",
			cols: plainCols,
			want: `"foo"=cc."foo","bar"=cc."bar"`,
		},
		{
			name: "struct, string and toast string",
			cols: mixedCols,
			want: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`,
		},
	}

	for _, tc := range cases {
		got := columns.BuildColumnsUpdateFragment(tc.cols, RedshiftDialect{})
		assert.Equal(t, tc.want, got, tc.name)
	}
}
24 changes: 24 additions & 0 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -250,3 +251,26 @@ func TestBuildProcessToastColExpression(t *testing.T) {
func TestBuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, SnowflakeDialect{}.BuildProcessToastStructColExpression("foo"))
}

func TestQuoteColumns(t *testing.T) {
assert.Equal(t, []string{}, columns.QuoteColumns(nil, SnowflakeDialect{}))
cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)}
assert.Equal(t, []string{`"A"`, `"B"`}, columns.QuoteColumns(cols, SnowflakeDialect{}))
}

// TestBuildColumnsUpdateFragment verifies the SET fragment that
// columns.BuildColumnsUpdateFragment renders with the Snowflake dialect for
// one TOAST string column ("foo") followed by one plain string column ("bar").
func TestBuildColumnsUpdateFragment(t *testing.T) {
	var cols []columns.Column
	for _, name := range []string{"foo", "bar"} {
		col := columns.NewColumn(name, typing.String)
		// Only "foo" is flagged as a TOAST column.
		col.ToastColumn = name == "foo"
		cols = append(cols, col)
	}

	const want = `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`

	got := columns.BuildColumnsUpdateFragment(cols, SnowflakeDialect{})
	assert.Equal(t, want, got)
}
32 changes: 0 additions & 32 deletions lib/destination/dml/columns.go
Original file line number Diff line number Diff line change
@@ -1,33 +1 @@
package dml

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

// buildColumnsUpdateFragment renders the comma-separated assignment list used
// in the SET clause of an UPDATE / MERGE statement, one assignment per column
// and in the order given. With a dialect that double-quotes identifiers the
// result looks like:
//
//	"first_name"=cc."first_name","last_name"=cc."last_name"
//
// Every column name is first passed through dialect.QuoteIdentifier.
//   - Non-TOAST columns are assigned straight from the cc alias: <col>=cc.<col>
//   - TOAST columns are assigned the expression built by the dialect:
//     BuildProcessToastStructColExpression when the column kind is
//     typing.Struct, BuildProcessToastColExpression otherwise. These are
//     emitted as "<col>= <expr>" (note the space after '=').
//
// An empty or nil column list yields the empty string.
//
// NOTE: This should only be used with valid columns.
func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect) string {
	// The parameter shadows the imported columns package for the rest of this
	// body; that is harmless because the package is not referenced below.
	// The number of assignments is known up front, so allocate once.
	assignments := make([]string, 0, len(columns))
	for _, column := range columns {
		colName := dialect.QuoteIdentifier(column.Name())

		switch {
		case !column.ToastColumn:
			// This is to make it look like: objCol = cc.objCol
			assignments = append(assignments, fmt.Sprintf("%s=cc.%s", colName, colName))
		case column.KindDetails == typing.Struct:
			assignments = append(assignments, fmt.Sprintf("%s= %s", colName, dialect.BuildProcessToastStructColExpression(colName)))
		default:
			assignments = append(assignments, fmt.Sprintf("%s= %s", colName, dialect.BuildProcessToastColExpression(colName)))
		}
	}

	return strings.Join(assignments, ",")
}
136 changes: 0 additions & 136 deletions lib/destination/dml/columns_test.go

This file was deleted.

10 changes: 5 additions & 5 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (m *MergeArgument) buildRedshiftUpdateQuery(cols []columns.Column) string {

return fmt.Sprintf(`UPDATE %s AS c SET %s FROM %s AS cc WHERE %s;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(cols, m.Dialect),
m.TableID.FullyQualifiedName(), columns.BuildColumnsUpdateFragment(cols, m.Dialect),
// FROM staging WHERE join on PK(s)
m.SubQuery, strings.Join(clauses, " AND "),
)
Expand Down Expand Up @@ -209,7 +209,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "),
// Update + Soft Deletion
idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect),
idempotentClause, columns.BuildColumnsUpdateFragment(m.Columns, m.Dialect),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(columns.QuoteColumns(m.Columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand All @@ -234,7 +234,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
// Delete
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(cols, m.Dialect),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, columns.BuildColumnsUpdateFragment(cols, m.Dialect),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(columns.QuoteColumns(cols, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down Expand Up @@ -266,7 +266,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "),
// Update + Soft Deletion
idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect),
idempotentClause, columns.BuildColumnsUpdateFragment(m.Columns, m.Dialect),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(columns.QuoteColumns(m.Columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand All @@ -292,7 +292,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
// Delete
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(cols, m.Dialect),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, columns.BuildColumnsUpdateFragment(cols, m.Dialect),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(columns.QuoteColumns(cols, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down
Loading

0 comments on commit 533e144

Please sign in to comment.