From 6cd5cd12bbe5c83e88891ddb6b4cc2f09ed07728 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 12 Dec 2024 15:47:39 -0800 Subject: [PATCH] [Redshift] Better handling around TOAST columns (#1080) --- clients/redshift/dialect/dialect.go | 11 ++++-- clients/redshift/dialect/dialect_test.go | 47 +++++++++++++++++------- lib/sql/tests/columns_test.go | 2 +- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 0036c094e..064ab52f9 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -31,12 +31,15 @@ func (RedshiftDialect) IsTableDoesNotExistErr(_ error) bool { } func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string { + toastedValue := "%" + constants.ToastUnavailableValuePlaceholder + "%" colName := sql.QuoteTableAliasColumn(tableAlias, column, rd) - if column.KindDetails == typing.Struct { - return fmt.Sprintf(`COALESCE(%s != JSON_PARSE('{"key":"%s"}'), true)`, - colName, constants.ToastUnavailableValuePlaceholder) + + switch column.KindDetails { + case typing.Struct, typing.Array: + return fmt.Sprintf("COALESCE(JSON_SERIALIZE(%s) NOT LIKE '%s', TRUE)", colName, toastedValue) + default: + return fmt.Sprintf(`COALESCE(%s NOT LIKE '%s', TRUE)`, colName, toastedValue) } - return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ []string) string { diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index 1ab2e952f..f64d4c052 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -80,14 +80,35 @@ func TestRedshiftDialect_BuildDropColumnQuery(t *testing.T) { } func TestRedshiftDialect_BuildIsNotToastValueExpression(t *testing.T) { - assert.Equal(t, - `COALESCE(tbl."bar" != '__debezium_unavailable_value', true)`, - RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)), - ) - assert.Equal(t, - `COALESCE(tbl."foo" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true)`, - RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)), - ) + { + // Typing is not specified + assert.Equal(t, + `COALESCE(tbl."foo" NOT LIKE '%__debezium_unavailable_value%', TRUE)`, + RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Invalid)), + ) + } + { + // Struct + assert.Equal(t, + `COALESCE(JSON_SERIALIZE(tbl."foo") NOT LIKE '%__debezium_unavailable_value%', TRUE)`, + RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)), + ) + } + { + // Array + assert.Equal(t, + `COALESCE(JSON_SERIALIZE(tbl."foo") NOT LIKE '%__debezium_unavailable_value%', TRUE)`, + RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Array)), + ) + } + { + // String + assert.Equal(t, + `COALESCE(tbl."foo" NOT LIKE '%__debezium_unavailable_value%', TRUE)`, + RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.String)), + ) + } + } func TestRedshiftDialect_BuildMergeInsertQuery(t *testing.T) { @@ -231,7 +252,7 @@ func TestRedshiftDialect_BuildMergeQueries_SkipDelete(t *testing.T) { parts[0]) assert.Equal(t, - `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`, + `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`, parts[1]) } @@ -259,7 +280,7 @@ func TestRedshiftDialect_BuildMergeQueries_SoftDelete(t *testing.T) { `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT stg."id",stg."email",stg."first_name",stg."last_name",stg."created_at",stg."toast_text",stg."__artie_delete" FROM public.tableName__temp AS stg LEFT JOIN public.tableName AS tgt ON tgt."id" = stg."id" WHERE tgt."id" IS NULL;`, parts[0]) assert.Equal(t, - `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = false;`, + `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = false;`, parts[1]) assert.Equal(t, `UPDATE public.tableName AS tgt SET "__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = true;`, @@ -289,7 +310,7 @@ func TestRedshiftDialect_BuildMergeQueries_SoftDeleteComposite(t *testing.T) { `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT stg."id",stg."email",stg."first_name",stg."last_name",stg."created_at",stg."toast_text",stg."__artie_delete" FROM public.tableName__temp AS stg LEFT JOIN public.tableName AS tgt ON tgt."id" = stg."id" AND tgt."email" = stg."email" WHERE tgt."id" IS NULL;`, parts[0]) assert.Equal(t, - `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = false;`, + `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = false;`, parts[1]) assert.Equal(t, `UPDATE public.tableName AS tgt SET "__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = true;`, @@ -323,7 +344,7 @@ func TestRedshiftDialect_BuildMergeQueries(t *testing.T) { parts[0]) assert.Equal(t, - `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`, + `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`, parts[1]) assert.Equal(t, @@ -355,7 +376,7 @@ func TestRedshiftDialect_BuildMergeQueries_CompositeKey(t *testing.T) { parts[0]) assert.Equal(t, - `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_delete", false) = false;`, + `UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_delete", false) = false;`, parts[1]) assert.Equal(t, diff --git a/lib/sql/tests/columns_test.go b/lib/sql/tests/columns_test.go index 77147bd30..c9f6ee277 100644 --- a/lib/sql/tests/columns_test.go +++ b/lib/sql/tests/columns_test.go @@ -157,7 +157,7 @@ func TestBuildColumnsUpdateFragment_Redshift(t *testing.T) { { name: "struct, string and toast string", columns: lastCaseColTypes, - expectedString: `"a1"= CASE WHEN COALESCE(stg."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN stg."a1" ELSE tgt."a1" END,"b2"= CASE WHEN COALESCE(stg."b2" != '__debezium_unavailable_value', true) THEN stg."b2" ELSE tgt."b2" END,"c3"=stg."c3"`, + expectedString: `"a1"= CASE WHEN COALESCE(JSON_SERIALIZE(stg."a1") NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."a1" ELSE tgt."a1" END,"b2"= CASE WHEN COALESCE(stg."b2" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."b2" ELSE tgt."b2" END,"c3"=stg."c3"`, }, }