From c5f72201bba2cf10909e964d79b5689e3e22ab10 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Thu, 2 May 2024 17:07:29 -0700 Subject: [PATCH 1/3] Move `typing.UpdateColumn` to `dml` --- lib/destination/dml/columns.go | 72 ++++++++++ lib/destination/dml/columns_test.go | 134 +++++++++++++++++ .../dml}/columns_toast_test.go | 2 +- lib/destination/dml/merge.go | 12 +- lib/typing/columns/columns.go | 64 --------- lib/typing/columns/columns_test.go | 135 ------------------ 6 files changed, 213 insertions(+), 206 deletions(-) create mode 100644 lib/destination/dml/columns.go create mode 100644 lib/destination/dml/columns_test.go rename lib/{typing/columns => destination/dml}/columns_toast_test.go (99%) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go new file mode 100644 index 000000000..67c791120 --- /dev/null +++ b/lib/destination/dml/columns.go @@ -0,0 +1,72 @@ +package dml + +import ( + "fmt" + "strings" + + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" +) + +// UpdateQuery will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email +func UpdateQuery(c *columns.Columns, dialect sql.Dialect, skipDeleteCol bool) string { + var cols []string + for _, column := range c.GetColumns() { + if column.ShouldSkip() { + continue + } + + // skipDeleteCol is useful because we don't want to copy the deleted column over to the source table if we're doing a hard row delete. 
+ if skipDeleteCol && column.Name() == constants.DeleteColumnMarker { + continue + } + + colName := dialect.QuoteIdentifier(column.Name()) + if column.ToastColumn { + if column.KindDetails == typing.Struct { + cols = append(cols, processToastStructCol(colName, dialect)) + } else { + cols = append(cols, processToastCol(colName, dialect)) + } + + } else { + // This is to make it look like: objCol = cc.objCol + cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) + } + } + + return strings.Join(cols, ",") +} + +func processToastStructCol(colName string, dialect sql.Dialect) string { + switch dialect.(type) { + case sql.BigQueryDialect: + return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, + colName, colName, constants.ToastUnavailableValuePlaceholder, + colName, colName) + case sql.RedshiftDialect: + return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, + colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + case sql.MSSQLDialect: + // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. + return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", + colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + default: + // TODO: Change this to Snowflake and error out if the destKind isn't supported so we're explicit. + return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", + colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + } +} + +func processToastCol(colName string, dialect sql.Dialect) string { + if _, ok := dialect.(sql.MSSQLDialect); ok { + // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. 
+ return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName, + constants.ToastUnavailableValuePlaceholder, colName, colName) + } else { + return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", + colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + } +} diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go new file mode 100644 index 000000000..bf25afdcd --- /dev/null +++ b/lib/destination/dml/columns_test.go @@ -0,0 +1,134 @@ +package dml + +import ( + "fmt" + "testing" + + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/stretchr/testify/assert" +) + +func TestColumnsUpdateQuery(t *testing.T) { + type testCase struct { + name string + columns columns.Columns + expectedString string + dialect sql.Dialect + skipDeleteCol bool + } + + fooBarCols := []string{"foo", "bar"} + + var ( + happyPathCols columns.Columns + stringAndToastCols columns.Columns + lastCaseColTypes columns.Columns + lastCaseEscapeTypes columns.Columns + ) + for _, col := range fooBarCols { + column := columns.NewColumn(col, typing.String) + column.ToastColumn = false + happyPathCols.AddColumn(column) + } + for _, col := range fooBarCols { + var toastCol bool + if col == "foo" { + toastCol = true + } + + column := columns.NewColumn(col, typing.String) + column.ToastColumn = toastCol + stringAndToastCols.AddColumn(column) + } + + lastCaseCols := []string{"a1", "b2", "c3"} + for _, lastCaseCol := range lastCaseCols { + kd := typing.String + var toast bool + // a1 - struct + toast, b2 - string + toast, c3 = regular string. 
+ if lastCaseCol == "a1" { + kd = typing.Struct + toast = true + } else if lastCaseCol == "b2" { + toast = true + } + + column := columns.NewColumn(lastCaseCol, kd) + column.ToastColumn = toast + lastCaseColTypes.AddColumn(column) + } + + lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"} + for _, lastCaseColEsc := range lastCaseColsEsc { + kd := typing.String + var toast bool + // a1 - struct + toast, b2 - string + toast, c3 = regular string. + if lastCaseColEsc == "a1" { + kd = typing.Struct + toast = true + } else if lastCaseColEsc == "b2" { + toast = true + } else if lastCaseColEsc == "start" { + kd = typing.Struct + toast = true + } + + column := columns.NewColumn(lastCaseColEsc, kd) + column.ToastColumn = toast + lastCaseEscapeTypes.AddColumn(column) + } + + lastCaseEscapeTypes.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + key := `{"key":"__debezium_unavailable_value"}` + testCases := []testCase{ + { + name: "happy path", + columns: happyPathCols, + dialect: sql.RedshiftDialect{}, + expectedString: `"foo"=cc."foo","bar"=cc."bar"`, + }, + { + name: "string and toast", + columns: stringAndToastCols, + dialect: sql.SnowflakeDialect{}, + expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, + }, + { + name: "struct, string and toast string", + columns: lastCaseColTypes, + dialect: sql.RedshiftDialect{}, + expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`, + }, + { + name: "struct, string and toast string (bigquery)", + columns: lastCaseColTypes, + dialect: sql.BigQueryDialect{}, + expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` 
END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", + }, + { + name: "struct, string and toast string (bigquery) w/ reserved keywords", + columns: lastCaseEscapeTypes, + dialect: sql.BigQueryDialect{}, + expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", + key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`"), + skipDeleteCol: true, + }, + { + name: "struct, string and toast string (bigquery) w/ reserved keywords", + columns: lastCaseEscapeTypes, + dialect: sql.BigQueryDialect{}, + expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", + key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), + skipDeleteCol: false, + }, + } + + for _, _testCase := range testCases { + actualQuery := UpdateQuery(&_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) + assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) + } +} diff --git a/lib/typing/columns/columns_toast_test.go b/lib/destination/dml/columns_toast_test.go similarity index 99% rename from lib/typing/columns/columns_toast_test.go rename to lib/destination/dml/columns_toast_test.go index 47ac0fa00..ff98b7b4b 100644 --- a/lib/typing/columns/columns_toast_test.go +++ b/lib/destination/dml/columns_toast_test.go @@ -1,4 +1,4 @@ -package columns +package dml import ( "testing" diff --git 
a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 9459706be..cd7785457 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -128,7 +128,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, false), + m.TableID.FullyQualifiedName(), UpdateQuery(m.Columns, m.Dialect, false), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -166,7 +166,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, true), + m.TableID.FullyQualifiedName(), UpdateQuery(m.Columns, m.Dialect, true), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), ), @@ -244,7 +244,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, m.Columns.UpdateQuery(m.Dialect, false), + idempotentClause, UpdateQuery(m.Columns, m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -270,7 +270,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, m.Columns.UpdateQuery(m.Dialect, 
true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, UpdateQuery(m.Columns, m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -308,7 +308,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, m.Columns.UpdateQuery(m.Dialect, false), + idempotentClause, UpdateQuery(m.Columns, m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -335,7 +335,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, m.Columns.UpdateQuery(m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, UpdateQuery(m.Columns, m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index 19ffef0c3..2855fd7a9 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -1,12 +1,9 @@ package columns import ( - "fmt" "strings" "sync" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" ) @@ -226,64 +223,3 @@ func (c *Columns) DeleteColumn(name string) { } } } - -// UpdateQuery will 
parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email -func (c *Columns) UpdateQuery(dialect sql.Dialect, skipDeleteCol bool) string { - var cols []string - for _, column := range c.GetColumns() { - if column.ShouldSkip() { - continue - } - - // skipDeleteCol is useful because we don't want to copy the deleted column over to the source table if we're doing a hard row delete. - if skipDeleteCol && column.Name() == constants.DeleteColumnMarker { - continue - } - - colName := dialect.QuoteIdentifier(column.Name()) - if column.ToastColumn { - if column.KindDetails == typing.Struct { - cols = append(cols, processToastStructCol(colName, dialect)) - } else { - cols = append(cols, processToastCol(colName, dialect)) - } - - } else { - // This is to make it look like: objCol = cc.objCol - cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) - } - } - - return strings.Join(cols, ",") -} - -func processToastStructCol(colName string, dialect sql.Dialect) string { - switch dialect.(type) { - case sql.BigQueryDialect: - return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, - colName, colName, constants.ToastUnavailableValuePlaceholder, - colName, colName) - case sql.RedshiftDialect: - return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - case sql.MSSQLDialect: - // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - default: - // TODO: Change this to Snowflake and error out if the destKind isn't supported so we're explicit. 
- return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - } -} - -func processToastCol(colName string, dialect sql.Dialect) string { - if _, ok := dialect.(sql.MSSQLDialect); ok { - // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName, - constants.ToastUnavailableValuePlaceholder, colName, colName) - } else { - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - } -} diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 0ed03b152..cb785b875 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -4,9 +4,7 @@ import ( "fmt" "testing" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" ) @@ -290,136 +288,3 @@ func TestColumns_Mutation(t *testing.T) { cols.DeleteColumn("bar") assert.Equal(t, len(cols.GetColumns()), 0) } - -func TestColumnsUpdateQuery(t *testing.T) { - type testCase struct { - name string - columns Columns - expectedString string - dialect sql.Dialect - skipDeleteCol bool - } - - fooBarCols := []string{"foo", "bar"} - - var ( - happyPathCols Columns - stringAndToastCols Columns - lastCaseColTypes Columns - lastCaseEscapeTypes Columns - ) - for _, col := range fooBarCols { - happyPathCols.AddColumn(Column{ - name: col, - KindDetails: typing.String, - ToastColumn: false, - }) - } - for _, col := range fooBarCols { - var toastCol bool - if col == "foo" { - toastCol = true - } - - stringAndToastCols.AddColumn(Column{ - name: col, 
- KindDetails: typing.String, - ToastColumn: toastCol, - }) - } - - lastCaseCols := []string{"a1", "b2", "c3"} - for _, lastCaseCol := range lastCaseCols { - kd := typing.String - var toast bool - // a1 - struct + toast, b2 - string + toast, c3 = regular string. - if lastCaseCol == "a1" { - kd = typing.Struct - toast = true - } else if lastCaseCol == "b2" { - toast = true - } - - lastCaseColTypes.AddColumn(Column{ - name: lastCaseCol, - KindDetails: kd, - ToastColumn: toast, - }) - } - - lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"} - for _, lastCaseColEsc := range lastCaseColsEsc { - kd := typing.String - var toast bool - // a1 - struct + toast, b2 - string + toast, c3 = regular string. - if lastCaseColEsc == "a1" { - kd = typing.Struct - toast = true - } else if lastCaseColEsc == "b2" { - toast = true - } else if lastCaseColEsc == "start" { - kd = typing.Struct - toast = true - } - - lastCaseEscapeTypes.AddColumn(Column{ - name: lastCaseColEsc, - KindDetails: kd, - ToastColumn: toast, - }) - } - - lastCaseEscapeTypes.AddColumn(Column{ - name: constants.DeleteColumnMarker, - KindDetails: typing.Boolean, - }) - - key := `{"key":"__debezium_unavailable_value"}` - testCases := []testCase{ - { - name: "happy path", - columns: happyPathCols, - dialect: sql.RedshiftDialect{}, - expectedString: `"foo"=cc."foo","bar"=cc."bar"`, - }, - { - name: "string and toast", - columns: stringAndToastCols, - dialect: sql.SnowflakeDialect{}, - expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, - }, - { - name: "struct, string and toast string", - columns: lastCaseColTypes, - dialect: sql.RedshiftDialect{}, - expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`, - }, - { - 
name: "struct, string and toast string (bigquery)", - columns: lastCaseColTypes, - dialect: sql.BigQueryDialect{}, - expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", - }, - { - name: "struct, string and toast string (bigquery) w/ reserved keywords", - columns: lastCaseEscapeTypes, - dialect: sql.BigQueryDialect{}, - expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", - key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`"), - skipDeleteCol: true, - }, - { - name: "struct, string and toast string (bigquery) w/ reserved keywords", - columns: lastCaseEscapeTypes, - dialect: sql.BigQueryDialect{}, - expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", - key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), - skipDeleteCol: false, - }, - } - - for _, _testCase := range testCases { - actualQuery := _testCase.columns.UpdateQuery(_testCase.dialect, _testCase.skipDeleteCol) - assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) - } -} From 191ceb80f19df5a6a93c58655a94cc61a22c7dc6 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Thu, 2 May 2024 17:13:20 -0700 Subject: [PATCH 2/3] Rename test --- 
lib/destination/dml/columns_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index bf25afdcd..b6ddeb7cf 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestColumnsUpdateQuery(t *testing.T) { +func TestUpdateQuery(t *testing.T) { type testCase struct { name string columns columns.Columns From f0e68d344fc92b4f61b45212047d9b1faf33897c Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Thu, 2 May 2024 17:30:29 -0700 Subject: [PATCH 3/3] Rename function --- lib/destination/dml/columns.go | 4 ++-- lib/destination/dml/columns_test.go | 4 ++-- lib/destination/dml/merge.go | 12 ++++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 67c791120..17e31b25b 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -10,8 +10,8 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -// UpdateQuery will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email -func UpdateQuery(c *columns.Columns, dialect sql.Dialect, skipDeleteCol bool) string { +// buildColumnsUpdateFragment builds the comma-separated assignment list for an UPDATE/MERGE SET clause, e.g. first_name=cc.first_name,last_name=cc.last_name,email=cc.email (identifiers are quoted by the dialect, and TOAST columns are emitted as a CASE expression that keeps c's value when cc holds the unavailable-value placeholder). +func buildColumnsUpdateFragment(c *columns.Columns, dialect sql.Dialect, skipDeleteCol bool) string { var cols []string for _, column := range c.GetColumns() { if column.ShouldSkip() { diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index b6ddeb7cf..8833246c1 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" ) -func 
TestUpdateQuery(t *testing.T) { +func TestBuildColumnsUpdateFragment(t *testing.T) { type testCase struct { name string columns columns.Columns @@ -128,7 +128,7 @@ func TestUpdateQuery(t *testing.T) { } for _, _testCase := range testCases { - actualQuery := UpdateQuery(&_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) + actualQuery := buildColumnsUpdateFragment(&_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) } } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index cd7785457..b76763423 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -128,7 +128,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), UpdateQuery(m.Columns, m.Dialect, false), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, false), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -166,7 +166,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. 
col1 - m.TableID.FullyQualifiedName(), UpdateQuery(m.Columns, m.Dialect, true), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, true), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), ), @@ -244,7 +244,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, UpdateQuery(m.Columns, m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -270,7 +270,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, UpdateQuery(m.Columns, m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -308,7 +308,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, UpdateQuery(m.Columns, m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), 
strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -335,7 +335,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, UpdateQuery(m.Columns, m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{