Skip to content

Commit

Permalink
[dml] Clean up buildColumnsUpdateFragment
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 3, 2024
1 parent 1024623 commit c6c5ae4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
29 changes: 15 additions & 14 deletions lib/destination/dml/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, s

colName := dialect.QuoteIdentifier(column.Name())
if column.ToastColumn {
var colValue string
if column.KindDetails == typing.Struct {
cols = append(cols, processToastStructCol(colName, dialect))
colValue = processToastStructCol(colName, dialect)
} else {
cols = append(cols, processToastCol(colName, dialect))
colValue = processToastCol(colName, dialect)
}

cols = append(cols, fmt.Sprintf("%s= %s", colName, colValue))
} else {
// This is to make it look like: objCol = cc.objCol
cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName))
Expand All @@ -43,30 +44,30 @@ func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, s
func processToastStructCol(colName string, dialect sql.Dialect) string {
switch dialect.(type) {
case sql.BigQueryDialect:
return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`,
colName, colName, constants.ToastUnavailableValuePlaceholder,
return fmt.Sprintf(`CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`,
colName, constants.ToastUnavailableValuePlaceholder,
colName, colName)
case sql.RedshiftDialect:
return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`,
colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
return fmt.Sprintf(`CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`,
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
case sql.MSSQLDialect:
// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END",
colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
default:
// TODO: Change this to Snowflake and error out if the destKind isn't supported so we're explicit.
return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END",
colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}
}

func processToastCol(colName string, dialect sql.Dialect) string {
if _, ok := dialect.(sql.MSSQLDialect); ok {
// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName,
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName,
constants.ToastUnavailableValuePlaceholder, colName, colName)
} else {
return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END",
colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}
}
16 changes: 8 additions & 8 deletions lib/destination/dml/columns_toast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
)

func TestProcessToastStructCol(t *testing.T) {
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{}))
assert.Equal(t, `foo= CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{}))
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{}))
assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{}))
}

func TestProcessToastCol(t *testing.T) {
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{}))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{}))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{}))
assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{}))
}

0 comments on commit c6c5ae4

Please sign in to comment.