Skip to content

Commit

Permalink
[Snowflake] Better handling of TOAST columns (#1078)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 12, 2024
1 parent 29ddf90 commit 17103d3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
9 changes: 6 additions & 3 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ func (SnowflakeDialect) IsTableDoesNotExistErr(err error) bool {
}

func (sd SnowflakeDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
toastedValue := "%" + constants.ToastUnavailableValuePlaceholder + "%"
colName := sql.QuoteTableAliasColumn(tableAlias, column, sd)
if column.KindDetails == typing.Struct {
return fmt.Sprintf("COALESCE(%s != {'key': '%s'}, true)", colName, constants.ToastUnavailableValuePlaceholder)
switch column.KindDetails {
case typing.String:
return fmt.Sprintf("COALESCE(%s NOT LIKE '%s', TRUE)", colName, toastedValue)
default:
return fmt.Sprintf("COALESCE(TO_VARCHAR(%s) NOT LIKE '%s', TRUE)", colName, toastedValue)
}
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (SnowflakeDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
Expand Down
30 changes: 22 additions & 8 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,28 @@ func TestSnowflakeDialect_BuildDropColumnQuery(t *testing.T) {
}

func TestSnowflakeDialect_BuildIsNotToastValueExpression(t *testing.T) {
assert.Equal(t,
`COALESCE(tbl."BAR" != '__debezium_unavailable_value', true)`,
SnowflakeDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)),
)
assert.Equal(t,
`COALESCE(tbl."FOO" != {'key': '__debezium_unavailable_value'}, true)`,
SnowflakeDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)),
)
{
// Unspecified data type
assert.Equal(t,
`COALESCE(TO_VARCHAR(tbl."BAR") NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
SnowflakeDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)),
)
}
{
// Structs
assert.Equal(t,
`COALESCE(TO_VARCHAR(tbl."FOO") NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
SnowflakeDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)),
)
}
{
// String
assert.Equal(t,
`COALESCE(tbl."BAR" NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
SnowflakeDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.String)),
)
}

}

func buildColumns(colTypesMap map[string]typing.KindDetails) *columns.Columns {
Expand Down
2 changes: 1 addition & 1 deletion lib/sql/tests/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestBuildColumnsUpdateFragment_Snowflake(t *testing.T) {
}

actualQuery := sql.BuildColumnsUpdateFragment(stringAndToastCols, "stg", "tgt", snowflakeDialect.SnowflakeDialect{})
assert.Equal(t, `"FOO"= CASE WHEN COALESCE(stg."FOO" != '__debezium_unavailable_value', true) THEN stg."FOO" ELSE tgt."FOO" END,"BAR"=stg."BAR"`, actualQuery)
assert.Equal(t, `"FOO"= CASE WHEN COALESCE(stg."FOO" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."FOO" ELSE tgt."FOO" END,"BAR"=stg."BAR"`, actualQuery)
}

func TestBuildColumnComparison(t *testing.T) {
Expand Down

0 comments on commit 17103d3

Please sign in to comment.