diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index f70f0bcfe..843a8f310 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -25,12 +25,13 @@ func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, s colName := dialect.QuoteIdentifier(column.Name()) if column.ToastColumn { + var colValue string if column.KindDetails == typing.Struct { - cols = append(cols, processToastStructCol(colName, dialect)) + colValue = processToastStructCol(colName, dialect) } else { - cols = append(cols, processToastCol(colName, dialect)) + colValue = processToastCol(colName, dialect) } - + cols = append(cols, fmt.Sprintf("%s= %s", colName, colValue)) } else { // This is to make it look like: objCol = cc.objCol cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) @@ -43,30 +44,30 @@ func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, s func processToastStructCol(colName string, dialect sql.Dialect) string { switch dialect.(type) { case sql.BigQueryDialect: - return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, - colName, colName, constants.ToastUnavailableValuePlaceholder, + return fmt.Sprintf(`CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) case sql.RedshiftDialect: - return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + return fmt.Sprintf(`CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) case sql.MSSQLDialect: // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) default: // TODO: Change this to Snowflake and error out if the destKind isn't supported so we're explicit. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } } func processToastCol(colName string, dialect sql.Dialect) string { if _, ok := dialect.(sql.MSSQLDialect); ok { // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName, + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } else { - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } } diff --git a/lib/destination/dml/columns_toast_test.go b/lib/destination/dml/columns_toast_test.go index ff98b7b4b..a98c9d879 100644 --- a/lib/destination/dml/columns_toast_test.go +++ b/lib/destination/dml/columns_toast_test.go @@ -8,15 +8,15 @@ import ( ) func TestProcessToastStructCol(t *testing.T) { - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{})) - assert.Equal(t, `foo= CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{})) - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{})) - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{})) } func TestProcessToastCol(t *testing.T) { - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{})) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{})) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{})) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{})) }