Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Redshift] Better handling around TOAST columns #1080

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ func (RedshiftDialect) IsTableDoesNotExistErr(_ error) bool {
}

func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
toastedValue := "%" + constants.ToastUnavailableValuePlaceholder + "%"
colName := sql.QuoteTableAliasColumn(tableAlias, column, rd)
if column.KindDetails == typing.Struct {
return fmt.Sprintf(`COALESCE(%s != JSON_PARSE('{"key":"%s"}'), true)`,
colName, constants.ToastUnavailableValuePlaceholder)

switch column.KindDetails {
case typing.Struct, typing.Array:
return fmt.Sprintf("COALESCE(JSON_SERIALIZE(%s) NOT LIKE '%s', TRUE)", colName, toastedValue)
default:
return fmt.Sprintf(`COALESCE(%s NOT LIKE '%s', TRUE)`, colName, toastedValue)
}
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ []string) string {
Expand Down
47 changes: 34 additions & 13 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,35 @@ func TestRedshiftDialect_BuildDropColumnQuery(t *testing.T) {
}

func TestRedshiftDialect_BuildIsNotToastValueExpression(t *testing.T) {
assert.Equal(t,
`COALESCE(tbl."bar" != '__debezium_unavailable_value', true)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)),
)
assert.Equal(t,
`COALESCE(tbl."foo" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)),
)
{
// Typing is not specified
assert.Equal(t,
`COALESCE(tbl."foo" NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Invalid)),
)
}
{
// Struct
assert.Equal(t,
`COALESCE(JSON_SERIALIZE(tbl."foo") NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)),
)
}
{
// Array
assert.Equal(t,
`COALESCE(JSON_SERIALIZE(tbl."foo") NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Array)),
)
}
{
// String
assert.Equal(t,
`COALESCE(tbl."foo" NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.String)),
)
}

}

func TestRedshiftDialect_BuildMergeInsertQuery(t *testing.T) {
Expand Down Expand Up @@ -231,7 +252,7 @@ func TestRedshiftDialect_BuildMergeQueries_SkipDelete(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`,
parts[1])
}

Expand Down Expand Up @@ -259,7 +280,7 @@ func TestRedshiftDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT stg."id",stg."email",stg."first_name",stg."last_name",stg."created_at",stg."toast_text",stg."__artie_delete" FROM public.tableName__temp AS stg LEFT JOIN public.tableName AS tgt ON tgt."id" = stg."id" WHERE tgt."id" IS NULL;`,
parts[0])
assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = false;`,
parts[1])
assert.Equal(t,
`UPDATE public.tableName AS tgt SET "__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = true;`,
Expand Down Expand Up @@ -289,7 +310,7 @@ func TestRedshiftDialect_BuildMergeQueries_SoftDeleteComposite(t *testing.T) {
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT stg."id",stg."email",stg."first_name",stg."last_name",stg."created_at",stg."toast_text",stg."__artie_delete" FROM public.tableName__temp AS stg LEFT JOIN public.tableName AS tgt ON tgt."id" = stg."id" AND tgt."email" = stg."email" WHERE tgt."id" IS NULL;`,
parts[0])
assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = false;`,
parts[1])
assert.Equal(t,
`UPDATE public.tableName AS tgt SET "__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = true;`,
Expand Down Expand Up @@ -323,7 +344,7 @@ func TestRedshiftDialect_BuildMergeQueries(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
Expand Down Expand Up @@ -355,7 +376,7 @@ func TestRedshiftDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
Expand Down
2 changes: 1 addition & 1 deletion lib/sql/tests/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestBuildColumnsUpdateFragment_Redshift(t *testing.T) {
{
name: "struct, string and toast string",
columns: lastCaseColTypes,
expectedString: `"a1"= CASE WHEN COALESCE(stg."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN stg."a1" ELSE tgt."a1" END,"b2"= CASE WHEN COALESCE(stg."b2" != '__debezium_unavailable_value', true) THEN stg."b2" ELSE tgt."b2" END,"c3"=stg."c3"`,
expectedString: `"a1"= CASE WHEN COALESCE(JSON_SERIALIZE(stg."a1") NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."a1" ELSE tgt."a1" END,"b2"= CASE WHEN COALESCE(stg."b2" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."b2" ELSE tgt."b2" END,"c3"=stg."c3"`,
},
}

Expand Down
Loading