Skip to content

Commit

Permalink
[typing] Remove destKind arg from Columns.UpdateQuery (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored May 1, 2024
1 parent c4ee2e2 commit cf0f030
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 39 deletions.
12 changes: 6 additions & 6 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table SET col1 = cc.col1
m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.DestKind, m.Dialect, false),
m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, false),
// FROM table (temp) WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause,
),
Expand Down Expand Up @@ -163,7 +163,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table SET col1 = cc.col1
m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.DestKind, m.Dialect, true),
m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, true),
// FROM staging WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, constants.DeleteColumnMarker,
),
Expand Down Expand Up @@ -239,7 +239,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "),
// Update + Soft Deletion
idempotentClause, m.Columns.UpdateQuery(m.DestKind, m.Dialect, false),
idempotentClause, m.Columns.UpdateQuery(m.Dialect, false),
// Insert
constants.DeleteColumnMarker, strings.Join(cols, ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down Expand Up @@ -272,7 +272,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
// Delete
constants.DeleteColumnMarker,
// Update
constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.DestKind, m.Dialect, true),
constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true),
// Insert
constants.DeleteColumnMarker, strings.Join(cols, ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down Expand Up @@ -309,7 +309,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "),
// Update + Soft Deletion
idempotentClause, m.Columns.UpdateQuery(m.DestKind, m.Dialect, false),
idempotentClause, m.Columns.UpdateQuery(m.Dialect, false),
// Insert
constants.DeleteColumnMarker, strings.Join(cols, ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down Expand Up @@ -343,7 +343,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
// Delete
constants.DeleteColumnMarker,
// Update
constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.DestKind, m.Dialect, true),
constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true),
// Insert
constants.DeleteColumnMarker, strings.Join(cols, ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down
20 changes: 10 additions & 10 deletions lib/typing/columns/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (c *Columns) DeleteColumn(name string) {
}

// UpdateQuery builds the SET assignments for the columns and returns them as a single comma-separated string (column names escaped per dialect), e.g. first_name=cc.first_name,last_name=cc.last_name,email=cc.email
func (c *Columns) UpdateQuery(destKind constants.DestinationKind, dialect sql.Dialect, skipDeleteCol bool) string {
func (c *Columns) UpdateQuery(dialect sql.Dialect, skipDeleteCol bool) string {
var cols []string
for _, column := range c.GetColumns() {
if column.ShouldSkip() {
Expand All @@ -272,9 +272,9 @@ func (c *Columns) UpdateQuery(destKind constants.DestinationKind, dialect sql.Di
colName := column.Name(dialect)
if column.ToastColumn {
if column.KindDetails == typing.Struct {
cols = append(cols, processToastStructCol(colName, destKind))
cols = append(cols, processToastStructCol(colName, dialect))
} else {
cols = append(cols, processToastCol(colName, destKind))
cols = append(cols, processToastCol(colName, dialect))
}

} else {
Expand All @@ -286,16 +286,16 @@ func (c *Columns) UpdateQuery(destKind constants.DestinationKind, dialect sql.Di
return strings.Join(cols, ",")
}

func processToastStructCol(colName string, destKind constants.DestinationKind) string {
switch destKind {
case constants.BigQuery:
func processToastStructCol(colName string, dialect sql.Dialect) string {
switch dialect.(type) {
case sql.BigQueryDialect:
return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`,
colName, colName, constants.ToastUnavailableValuePlaceholder,
colName, colName)
case constants.Redshift:
case sql.RedshiftDialect:
return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`,
colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
case constants.MSSQL:
case sql.MSSQLDialect:
// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END",
colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
Expand All @@ -306,8 +306,8 @@ func processToastStructCol(colName string, destKind constants.DestinationKind) s
}
}

func processToastCol(colName string, destKind constants.DestinationKind) string {
if destKind == constants.MSSQL {
func processToastCol(colName string, dialect sql.Dialect) string {
if _, ok := dialect.(sql.MSSQLDialect); ok {
// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName,
constants.ToastUnavailableValuePlaceholder, colName, colName)
Expand Down
21 changes: 7 additions & 14 deletions lib/typing/columns/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ func TestColumnsUpdateQuery(t *testing.T) {
name string
columns Columns
expectedString string
destKind constants.DestinationKind
dialect sql.Dialect
skipDeleteCol bool
}
Expand Down Expand Up @@ -481,53 +480,47 @@ func TestColumnsUpdateQuery(t *testing.T) {
{
name: "happy path",
columns: happyPathCols,
destKind: constants.Redshift,
dialect: sql.RedshiftDialect{},
expectedString: `"foo"=cc."foo","bar"=cc."bar"`,
},
{
name: "string and toast",
columns: stringAndToastCols,
destKind: constants.Snowflake,
dialect: sql.SnowflakeDialect{UppercaseEscNames: true},
expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`,
},
{
name: "struct, string and toast string",
columns: lastCaseColTypes,
destKind: constants.Redshift,
dialect: sql.RedshiftDialect{},
expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`,
},
{
name: "struct, string and toast string (bigquery)",
columns: lastCaseColTypes,
destKind: constants.BigQuery,
dialect: sql.BigQueryDialect{},
expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`",
},
{
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
destKind: constants.BigQuery,
dialect: sql.BigQueryDialect{},
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
dialect: sql.BigQueryDialect{},
expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s",
key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`"),
skipDeleteCol: true,
},
{
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
destKind: constants.BigQuery,
dialect: sql.BigQueryDialect{},
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
dialect: sql.BigQueryDialect{},
expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s",
key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"),
skipDeleteCol: false,
},
}

for _, _testCase := range testCases {
actualQuery := _testCase.columns.UpdateQuery(_testCase.destKind, _testCase.dialect, _testCase.skipDeleteCol)
actualQuery := _testCase.columns.UpdateQuery(_testCase.dialect, _testCase.skipDeleteCol)
assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name)
}
}
18 changes: 9 additions & 9 deletions lib/typing/columns/columns_toast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ package columns
import (
"testing"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/stretchr/testify/assert"
)

func TestProcessToastStructCol(t *testing.T) {
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", constants.Redshift))
assert.Equal(t, `foo= CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", constants.BigQuery))
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", constants.Snowflake))
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", constants.MSSQL))
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{}))
assert.Equal(t, `foo= CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{}))
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{}))
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{}))
}

func TestProcessToastCol(t *testing.T) {
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", constants.Redshift))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", constants.BigQuery))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", constants.Snowflake))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", constants.MSSQL))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{}))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{}))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{}))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{}))
}

0 comments on commit cf0f030

Please sign in to comment.