From 6f690e5aba4401bf94b12f3cec7ff34745092e30 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 2 May 2024 18:09:33 -0700 Subject: [PATCH 01/12] Move `typing.UpdateQuery` to `dml` (#550) --- lib/destination/dml/columns.go | 72 ++++++++++ lib/destination/dml/columns_test.go | 134 +++++++++++++++++ .../dml}/columns_toast_test.go | 2 +- lib/destination/dml/merge.go | 12 +- lib/typing/columns/columns.go | 64 --------- lib/typing/columns/columns_test.go | 135 ------------------ 6 files changed, 213 insertions(+), 206 deletions(-) create mode 100644 lib/destination/dml/columns.go create mode 100644 lib/destination/dml/columns_test.go rename lib/{typing/columns => destination/dml}/columns_toast_test.go (99%) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go new file mode 100644 index 000000000..17e31b25b --- /dev/null +++ b/lib/destination/dml/columns.go @@ -0,0 +1,72 @@ +package dml + +import ( + "fmt" + "strings" + + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" +) + +// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email +func buildColumnsUpdateFragment(c *columns.Columns, dialect sql.Dialect, skipDeleteCol bool) string { + var cols []string + for _, column := range c.GetColumns() { + if column.ShouldSkip() { + continue + } + + // skipDeleteCol is useful because we don't want to copy the deleted column over to the source table if we're doing a hard row delete. + if skipDeleteCol && column.Name() == constants.DeleteColumnMarker { + continue + } + + colName := dialect.QuoteIdentifier(column.Name()) + if column.ToastColumn { + if column.KindDetails == typing.Struct { + cols = append(cols, processToastStructCol(colName, dialect)) + } else { + cols = append(cols, processToastCol(colName, dialect)) + } + + } else { + // This is to make it look like: objCol = cc.objCol + cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) + } + } + + return strings.Join(cols, ",") +} + +func processToastStructCol(colName string, dialect sql.Dialect) string { + switch dialect.(type) { + case sql.BigQueryDialect: + return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, + colName, colName, constants.ToastUnavailableValuePlaceholder, + colName, colName) + case sql.RedshiftDialect: + return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, + colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + case sql.MSSQLDialect: + // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. + return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", + colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + default: + // TODO: Change this to Snowflake and error out if the destKind isn't supported so we're explicit. + return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", + colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + } +} + +func processToastCol(colName string, dialect sql.Dialect) string { + if _, ok := dialect.(sql.MSSQLDialect); ok { + // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. + return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName, + constants.ToastUnavailableValuePlaceholder, colName, colName) + } else { + return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", + colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + } +} diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go new file mode 100644 index 000000000..8833246c1 --- /dev/null +++ b/lib/destination/dml/columns_test.go @@ -0,0 +1,134 @@ +package dml + +import ( + "fmt" + "testing" + + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/stretchr/testify/assert" +) + +func TestBuildColumnsUpdateFragment(t *testing.T) { + type testCase struct { + name string + columns columns.Columns + expectedString string + dialect sql.Dialect + skipDeleteCol bool + } + + fooBarCols := []string{"foo", "bar"} + + var ( + happyPathCols columns.Columns + stringAndToastCols columns.Columns + lastCaseColTypes columns.Columns + lastCaseEscapeTypes columns.Columns + ) + for _, col := range fooBarCols { + column := columns.NewColumn(col, typing.String) + column.ToastColumn = false + happyPathCols.AddColumn(column) + } + for _, col := range fooBarCols { + var toastCol bool + if col == "foo" { + toastCol = true + } + + column := columns.NewColumn(col, typing.String) + column.ToastColumn = toastCol + stringAndToastCols.AddColumn(column) + } + + lastCaseCols := []string{"a1", "b2", "c3"} + for _, lastCaseCol := range lastCaseCols { + kd := typing.String + var toast bool + // a1 - struct + toast, b2 - string + toast, c3 = regular string. + if lastCaseCol == "a1" { + kd = typing.Struct + toast = true + } else if lastCaseCol == "b2" { + toast = true + } + + column := columns.NewColumn(lastCaseCol, kd) + column.ToastColumn = toast + lastCaseColTypes.AddColumn(column) + } + + lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"} + for _, lastCaseColEsc := range lastCaseColsEsc { + kd := typing.String + var toast bool + // a1 - struct + toast, b2 - string + toast, c3 = regular string. + if lastCaseColEsc == "a1" { + kd = typing.Struct + toast = true + } else if lastCaseColEsc == "b2" { + toast = true + } else if lastCaseColEsc == "start" { + kd = typing.Struct + toast = true + } + + column := columns.NewColumn(lastCaseColEsc, kd) + column.ToastColumn = toast + lastCaseEscapeTypes.AddColumn(column) + } + + lastCaseEscapeTypes.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + key := `{"key":"__debezium_unavailable_value"}` + testCases := []testCase{ + { + name: "happy path", + columns: happyPathCols, + dialect: sql.RedshiftDialect{}, + expectedString: `"foo"=cc."foo","bar"=cc."bar"`, + }, + { + name: "string and toast", + columns: stringAndToastCols, + dialect: sql.SnowflakeDialect{}, + expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, + }, + { + name: "struct, string and toast string", + columns: lastCaseColTypes, + dialect: sql.RedshiftDialect{}, + expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`, + }, + { + name: "struct, string and toast string (bigquery)", + columns: lastCaseColTypes, + dialect: sql.BigQueryDialect{}, + expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", + }, + { + name: "struct, string and toast string (bigquery) w/ reserved keywords", + columns: lastCaseEscapeTypes, + dialect: sql.BigQueryDialect{}, + expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", + key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`"), + skipDeleteCol: true, + }, + { + name: "struct, string and toast string (bigquery) w/ reserved keywords", + columns: lastCaseEscapeTypes, + dialect: sql.BigQueryDialect{}, + expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", + key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), + skipDeleteCol: false, + }, + } + + for _, _testCase := range testCases { + actualQuery := buildColumnsUpdateFragment(&_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) + assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) + } +} diff --git a/lib/typing/columns/columns_toast_test.go b/lib/destination/dml/columns_toast_test.go similarity index 99% rename from lib/typing/columns/columns_toast_test.go rename to lib/destination/dml/columns_toast_test.go index 47ac0fa00..ff98b7b4b 100644 --- a/lib/typing/columns/columns_toast_test.go +++ b/lib/destination/dml/columns_toast_test.go @@ -1,4 +1,4 @@ -package columns +package dml import ( "testing" diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 9459706be..b76763423 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -128,7 +128,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, false), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, false), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -166,7 +166,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, true), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, true), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), ), @@ -244,7 +244,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, m.Columns.UpdateQuery(m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -270,7 +270,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, m.Columns.UpdateQuery(m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -308,7 +308,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, m.Columns.UpdateQuery(m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -335,7 +335,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, m.Columns.UpdateQuery(m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index 19ffef0c3..2855fd7a9 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -1,12 +1,9 @@ package columns import ( - "fmt" "strings" "sync" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" ) @@ -226,64 +223,3 @@ func (c *Columns) DeleteColumn(name string) { } } } - -// UpdateQuery will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email -func (c *Columns) UpdateQuery(dialect sql.Dialect, skipDeleteCol bool) string { - var cols []string - for _, column := range c.GetColumns() { - if column.ShouldSkip() { - continue - } - - // skipDeleteCol is useful because we don't want to copy the deleted column over to the source table if we're doing a hard row delete. - if skipDeleteCol && column.Name() == constants.DeleteColumnMarker { - continue - } - - colName := dialect.QuoteIdentifier(column.Name()) - if column.ToastColumn { - if column.KindDetails == typing.Struct { - cols = append(cols, processToastStructCol(colName, dialect)) - } else { - cols = append(cols, processToastCol(colName, dialect)) - } - - } else { - // This is to make it look like: objCol = cc.objCol - cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) - } - } - - return strings.Join(cols, ",") -} - -func processToastStructCol(colName string, dialect sql.Dialect) string { - switch dialect.(type) { - case sql.BigQueryDialect: - return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, - colName, colName, constants.ToastUnavailableValuePlaceholder, - colName, colName) - case sql.RedshiftDialect: - return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - case sql.MSSQLDialect: - // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - default: - // TODO: Change this to Snowflake and error out if the destKind isn't supported so we're explicit. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - } -} - -func processToastCol(colName string, dialect sql.Dialect) string { - if _, ok := dialect.(sql.MSSQLDialect); ok { - // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName, - constants.ToastUnavailableValuePlaceholder, colName, colName) - } else { - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - } -} diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 0ed03b152..cb785b875 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -4,9 +4,7 @@ import ( "fmt" "testing" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" ) @@ -290,136 +288,3 @@ func TestColumns_Mutation(t *testing.T) { cols.DeleteColumn("bar") assert.Equal(t, len(cols.GetColumns()), 0) } - -func TestColumnsUpdateQuery(t *testing.T) { - type testCase struct { - name string - columns Columns - expectedString string - dialect sql.Dialect - skipDeleteCol bool - } - - fooBarCols := []string{"foo", "bar"} - - var ( - happyPathCols Columns - stringAndToastCols Columns - lastCaseColTypes Columns - lastCaseEscapeTypes Columns - ) - for _, col := range fooBarCols { - happyPathCols.AddColumn(Column{ - name: col, - KindDetails: typing.String, - ToastColumn: false, - }) - } - for _, col := range fooBarCols { - var toastCol bool - if col == "foo" { - toastCol = true - } - - stringAndToastCols.AddColumn(Column{ - name: col, - KindDetails: typing.String, - ToastColumn: toastCol, - }) - } - - lastCaseCols := []string{"a1", "b2", "c3"} - for _, lastCaseCol := range lastCaseCols { - kd := typing.String - var toast bool - // a1 - struct + toast, b2 - string + toast, c3 = regular string. - if lastCaseCol == "a1" { - kd = typing.Struct - toast = true - } else if lastCaseCol == "b2" { - toast = true - } - - lastCaseColTypes.AddColumn(Column{ - name: lastCaseCol, - KindDetails: kd, - ToastColumn: toast, - }) - } - - lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"} - for _, lastCaseColEsc := range lastCaseColsEsc { - kd := typing.String - var toast bool - // a1 - struct + toast, b2 - string + toast, c3 = regular string. - if lastCaseColEsc == "a1" { - kd = typing.Struct - toast = true - } else if lastCaseColEsc == "b2" { - toast = true - } else if lastCaseColEsc == "start" { - kd = typing.Struct - toast = true - } - - lastCaseEscapeTypes.AddColumn(Column{ - name: lastCaseColEsc, - KindDetails: kd, - ToastColumn: toast, - }) - } - - lastCaseEscapeTypes.AddColumn(Column{ - name: constants.DeleteColumnMarker, - KindDetails: typing.Boolean, - }) - - key := `{"key":"__debezium_unavailable_value"}` - testCases := []testCase{ - { - name: "happy path", - columns: happyPathCols, - dialect: sql.RedshiftDialect{}, - expectedString: `"foo"=cc."foo","bar"=cc."bar"`, - }, - { - name: "string and toast", - columns: stringAndToastCols, - dialect: sql.SnowflakeDialect{}, - expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, - }, - { - name: "struct, string and toast string", - columns: lastCaseColTypes, - dialect: sql.RedshiftDialect{}, - expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`, - }, - { - name: "struct, string and toast string (bigquery)", - columns: lastCaseColTypes, - dialect: sql.BigQueryDialect{}, - expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", - }, - { - name: "struct, string and toast string (bigquery) w/ reserved keywords", - columns: lastCaseEscapeTypes, - dialect: sql.BigQueryDialect{}, - expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", - key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`"), - skipDeleteCol: true, - }, - { - name: "struct, string and toast string (bigquery) w/ reserved keywords", - columns: lastCaseEscapeTypes, - dialect: sql.BigQueryDialect{}, - expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", - key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), - skipDeleteCol: false, - }, - } - - for _, _testCase := range testCases { - actualQuery := _testCase.columns.UpdateQuery(_testCase.dialect, _testCase.skipDeleteCol) - assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) - } -} From 10246237a115d5c84daa8fd9faaceb36cd223c29 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 08:44:51 -0700 Subject: [PATCH 02/12] [dml] Pass `[]Column` instead of `Columns` (#551) --- lib/destination/dml/columns.go | 4 ++-- lib/destination/dml/columns_test.go | 22 +++++++++++----------- lib/destination/dml/merge.go | 12 ++++++------ 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 17e31b25b..f70f0bcfe 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -11,9 +11,9 @@ import ( ) // buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email -func buildColumnsUpdateFragment(c *columns.Columns, dialect sql.Dialect, skipDeleteCol bool) string { +func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, skipDeleteCol bool) string { var cols []string - for _, column := range c.GetColumns() { + for _, column := range columns { if column.ShouldSkip() { continue } diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index 8833246c1..9b836571e 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -14,7 +14,7 @@ import ( func TestBuildColumnsUpdateFragment(t *testing.T) { type testCase struct { name string - columns columns.Columns + columns []columns.Column expectedString string dialect sql.Dialect skipDeleteCol bool @@ -23,15 +23,15 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { fooBarCols := []string{"foo", "bar"} var ( - happyPathCols columns.Columns - stringAndToastCols columns.Columns - lastCaseColTypes columns.Columns - lastCaseEscapeTypes columns.Columns + happyPathCols []columns.Column + stringAndToastCols []columns.Column + lastCaseColTypes []columns.Column + lastCaseEscapeTypes []columns.Column ) for _, col := range fooBarCols { column := columns.NewColumn(col, typing.String) column.ToastColumn = false - happyPathCols.AddColumn(column) + happyPathCols = append(happyPathCols, column) } for _, col := range fooBarCols { var toastCol bool @@ -41,7 +41,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { column := columns.NewColumn(col, typing.String) column.ToastColumn = toastCol - stringAndToastCols.AddColumn(column) + stringAndToastCols = append(stringAndToastCols, column) } lastCaseCols := []string{"a1", "b2", "c3"} @@ -58,7 +58,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { column := columns.NewColumn(lastCaseCol, kd) column.ToastColumn = toast - lastCaseColTypes.AddColumn(column) + lastCaseColTypes = append(lastCaseColTypes, column) } lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"} @@ -78,10 +78,10 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { column := columns.NewColumn(lastCaseColEsc, kd) column.ToastColumn = toast - lastCaseEscapeTypes.AddColumn(column) + lastCaseEscapeTypes = append(lastCaseEscapeTypes, column) } - lastCaseEscapeTypes.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) key := `{"key":"__debezium_unavailable_value"}` testCases := []testCase{ @@ -128,7 +128,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { } for _, _testCase := range testCases { - actualQuery := buildColumnsUpdateFragment(&_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) + actualQuery := buildColumnsUpdateFragment(_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) } } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index b76763423..b7ab2c656 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -128,7 +128,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, false), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -166,7 +166,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, true), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), ), @@ -244,7 +244,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -270,7 +270,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -308,7 +308,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -335,7 +335,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ From bfe263d9ed90471ae9c5b3c5dca89b5d50c38769 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 12:24:32 -0700 Subject: [PATCH 03/12] [dml] Clean up `buildColumnsUpdateFragment` (#552) --- lib/destination/dml/columns.go | 29 ++++++++++++----------- lib/destination/dml/columns_toast_test.go | 16 ++++++------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index f70f0bcfe..843a8f310 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -25,12 +25,13 @@ func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, s colName := dialect.QuoteIdentifier(column.Name()) if column.ToastColumn { + var colValue string if column.KindDetails == typing.Struct { - cols = append(cols, processToastStructCol(colName, dialect)) + colValue = processToastStructCol(colName, dialect) } else { - cols = append(cols, processToastCol(colName, dialect)) + colValue = processToastCol(colName, dialect) } - + cols = append(cols, fmt.Sprintf("%s= %s", colName, colValue)) } else { // This is to make it look like: objCol = cc.objCol cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) @@ -43,30 +44,30 @@ func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, s func processToastStructCol(colName string, dialect sql.Dialect) string { switch dialect.(type) { case sql.BigQueryDialect: - return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, - colName, colName, constants.ToastUnavailableValuePlaceholder, + return fmt.Sprintf(`CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) case sql.RedshiftDialect: - return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + return fmt.Sprintf(`CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) case sql.MSSQLDialect: // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) default: // TODO: Change this to Snowflake and error out if the destKind isn't supported so we're explicit. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } } func processToastCol(colName string, dialect sql.Dialect) string { if _, ok := dialect.(sql.MSSQLDialect); ok { // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName, + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } else { - return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", - colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } } diff --git a/lib/destination/dml/columns_toast_test.go b/lib/destination/dml/columns_toast_test.go index ff98b7b4b..a98c9d879 100644 --- a/lib/destination/dml/columns_toast_test.go +++ b/lib/destination/dml/columns_toast_test.go @@ -8,15 +8,15 @@ import ( ) func TestProcessToastStructCol(t *testing.T) { - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{})) - assert.Equal(t, `foo= CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{})) - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{})) - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{})) } func TestProcessToastCol(t *testing.T) { - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{})) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{})) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{})) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{})) } From 6d61a58fc3fef7bf500a6d2306a7c3b679588efd Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 13:07:29 -0700 Subject: [PATCH 04/12] [dml] Factor out INSERT query building (#553) --- lib/destination/dml/columns_test.go | 14 +++++++ lib/destination/dml/columns_toast_test.go | 22 ----------- lib/destination/dml/merge.go | 47 +++++++++-------------- lib/destination/dml/merge_test.go | 16 ++++++++ 4 files changed, 49 insertions(+), 50 deletions(-) delete mode 100644 lib/destination/dml/columns_toast_test.go diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index 9b836571e..56181997e 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -132,3 +132,17 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) } } + +func TestProcessToastStructCol(t *testing.T) { + assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{})) +} + +func TestProcessToastCol(t *testing.T) { + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{})) +} diff --git a/lib/destination/dml/columns_toast_test.go b/lib/destination/dml/columns_toast_test.go deleted file mode 100644 index a98c9d879..000000000 --- a/lib/destination/dml/columns_toast_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package dml - -import ( - "testing" - - "github.com/artie-labs/transfer/lib/sql" - "github.com/stretchr/testify/assert" -) - -func TestProcessToastStructCol(t *testing.T) { - assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{})) - assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{})) - assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{})) - assert.Equal(t, `CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{})) -} - -func TestProcessToastCol(t *testing.T) { - assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{})) - assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{})) - assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{})) - assert.Equal(t, `CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{})) -} diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index b7ab2c656..ffb1c0189 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -72,6 +72,23 @@ func removeDeleteColumnMarker(columns []string) ([]string, bool) { return columns, len(columns) != origLength } +func (m *MergeArgument) buildInsertQuery(columns, equalitySQLParts []string) string { + return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, + // insert into target (col1, col2, col3) + m.TableID.FullyQualifiedName(), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), + // SELECT cc.col1, cc.col2, ... FROM staging as CC + array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ + Vals: sql.QuoteIdentifiers(columns, m.Dialect), + Separator: ",", + Prefix: "cc.", + }), m.SubQuery, + // LEFT JOIN table on pk(s) + m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), + // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) + m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()), + ) +} + func (m *MergeArgument) GetParts() ([]string, error) { if err := m.Valid(); err != nil { return nil, err @@ -111,20 +128,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { if m.SoftDelete { return []string{ // INSERT - fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, - // insert into target (col1, col2, col3) - m.TableID.FullyQualifiedName(), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), - // SELECT cc.col1, cc.col2, ... FROM staging as CC - array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: sql.QuoteIdentifiers(columns, m.Dialect), - Separator: ",", - Prefix: "cc.", - }), m.SubQuery, - // LEFT JOIN table on pk(s) - m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), - // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) - m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()), - ), + m.buildInsertQuery(columns, equalitySQLParts), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 @@ -149,20 +153,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { parts := []string{ // INSERT - fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, - // insert into target (col1, col2, col3) - m.TableID.FullyQualifiedName(), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), - // SELECT cc.col1, cc.col2, ... FROM staging as CC - array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: sql.QuoteIdentifiers(columns, m.Dialect), - Separator: ",", - Prefix: "cc.", - }), m.SubQuery, - // LEFT JOIN table on pk(s) - m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), - // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) - m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()), - ), + m.buildInsertQuery(columns, equalitySQLParts), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 0f0081bf3..bfb54f798 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -304,3 +304,19 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) { assert.Contains(t, mergeSQL, `"ID","GROUP","UPDATED_AT","START"`, mergeSQL) assert.Contains(t, mergeSQL, `cc."ID",cc."GROUP",cc."UPDATED_AT",cc."START"`, mergeSQL) } + +func TestBuildInsertQuery(t *testing.T) { + mergeArg := MergeArgument{ + TableID: MockTableIdentifier{"{TABLE_ID}"}, + SubQuery: "{SUB_QUERY}", + PrimaryKeys: []columns.Column{ + columns.NewColumn("col1", typing.Invalid), + columns.NewColumn("othercol", typing.Invalid), + }, + Dialect: sql.SnowflakeDialect{}, + } + assert.Equal(t, + `INSERT INTO {TABLE_ID} ("COL1","COL2") SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc LEFT JOIN {TABLE_ID} as c on {EQUALITY_PART_1} and {EQUALITY_PART_2} WHERE c."COL1" IS NULL;`, + mergeArg.buildInsertQuery([]string{"col1", "col2"}, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), + ) +} From 02b56ce3b2b5feae369d18bad9650188767b084d Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 13:56:53 -0700 Subject: [PATCH 05/12] [dml] Use `columns.Column[]` over `string[]` (#554) --- lib/destination/dml/columns.go | 8 +++++ lib/destination/dml/columns_test.go | 9 ++++++ lib/destination/dml/merge.go | 36 +++++++++++----------- lib/destination/dml/merge_test.go | 47 +++++++++++++++++------------ lib/typing/columns/columns.go | 22 ++++++++++++++ lib/typing/columns/columns_test.go | 47 +++++++++++++++++++++++++++++ 6 files changed, 131 insertions(+), 38 deletions(-) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 843a8f310..656390800 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -10,6 +10,14 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) +func quoteColumns(cols []columns.Column, dialect sql.Dialect) []string { + result := make([]string, len(cols)) + for i, col := range cols { + result[i] = dialect.QuoteIdentifier(col.Name()) + } + return result +} + // buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, skipDeleteCol bool) string { var cols []string diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index 56181997e..d3204a1e8 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -11,6 +11,15 @@ import ( "github.com/stretchr/testify/assert" ) +func TestQuoteColumns(t *testing.T) { + assert.Equal(t, []string{}, quoteColumns(nil, sql.BigQueryDialect{})) + assert.Equal(t, []string{}, quoteColumns(nil, sql.SnowflakeDialect{})) + + cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)} + assert.Equal(t, []string{"`a`", "`b`"}, quoteColumns(cols, sql.BigQueryDialect{})) + assert.Equal(t, []string{`"A"`, `"B"`}, quoteColumns(cols, sql.SnowflakeDialect{})) +} + func TestBuildColumnsUpdateFragment(t *testing.T) { type testCase struct { name string diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index ffb1c0189..d1e095ea9 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -66,19 +66,19 @@ func (m *MergeArgument) Valid() error { return nil } -func removeDeleteColumnMarker(columns []string) ([]string, bool) { - origLength := len(columns) - columns = slices.DeleteFunc(columns, func(col string) bool { return col == constants.DeleteColumnMarker }) - return columns, len(columns) != origLength +func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) { + origLength := len(cols) + cols = slices.DeleteFunc(cols, func(col columns.Column) bool { return col.Name() == constants.DeleteColumnMarker }) + return cols, len(cols) != origLength } -func (m *MergeArgument) buildInsertQuery(columns, equalitySQLParts []string) string { +func (m *MergeArgument) buildInsertQuery(columns []columns.Column, equalitySQLParts []string) string { return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, // insert into target (col1, col2, col3) - m.TableID.FullyQualifiedName(), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), + m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(columns, m.Dialect), ","), // SELECT cc.col1, cc.col2, ... FROM staging as CC array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: sql.QuoteIdentifiers(columns, m.Dialect), + Vals: quoteColumns(columns, m.Dialect), Separator: ",", Prefix: "cc.", }), m.SubQuery, @@ -123,7 +123,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { equalitySQLParts = append(equalitySQLParts, equalitySQL) } - columns := m.Columns.GetColumnsToUpdate() + columns := m.Columns.ValidColumns() if m.SoftDelete { return []string{ @@ -226,7 +226,7 @@ func (m *MergeArgument) GetStatement() (string, error) { equalitySQLParts = append(equalitySQLParts, m.AdditionalEqualityStrings...) } - columns := m.Columns.GetColumnsToUpdate() + columns := m.Columns.ValidColumns() if m.SoftDelete { return fmt.Sprintf(` @@ -237,9 +237,9 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Update + Soft Deletion idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), // Insert - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: sql.QuoteIdentifiers(columns, m.Dialect), + Vals: quoteColumns(columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil @@ -263,9 +263,9 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Update m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), // Insert - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: sql.QuoteIdentifiers(columns, m.Dialect), + Vals: quoteColumns(columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil @@ -289,7 +289,7 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) { equalitySQLParts = append(equalitySQLParts, equalitySQL) } - columns := m.Columns.GetColumnsToUpdate() + columns := m.Columns.ValidColumns() if m.SoftDelete { return fmt.Sprintf(` @@ -301,9 +301,9 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, // Update + Soft Deletion idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), // Insert - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: sql.QuoteIdentifiers(columns, m.Dialect), + Vals: quoteColumns(columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil @@ -328,9 +328,9 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Update m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), // Insert - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: sql.QuoteIdentifiers(columns, m.Dialect), + Vals: quoteColumns(columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index bfb54f798..5a76abbfe 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -33,35 +33,40 @@ func (m MockTableIdentifier) FullyQualifiedName() string { } func TestRemoveDeleteColumnMarker(t *testing.T) { + col1 := columns.NewColumn("a", typing.Invalid) + col2 := columns.NewColumn("b", typing.Invalid) + col3 := columns.NewColumn("c", typing.Invalid) + deleteColumnMarkerCol := columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid) + { - columns, removed := removeDeleteColumnMarker([]string{}) - assert.Empty(t, columns) + result, removed := removeDeleteColumnMarker([]columns.Column{}) + assert.Empty(t, result) assert.False(t, removed) } { - columns, removed := removeDeleteColumnMarker([]string{"a"}) - assert.Equal(t, []string{"a"}, columns) + result, removed := removeDeleteColumnMarker([]columns.Column{col1}) + assert.Equal(t, []columns.Column{col1}, result) assert.False(t, removed) } { - columns, removed := removeDeleteColumnMarker([]string{"a", "b"}) - assert.Equal(t, []string{"a", "b"}, columns) + result, removed := removeDeleteColumnMarker([]columns.Column{col1, col2}) + assert.Equal(t, []columns.Column{col1, col2}, result) assert.False(t, removed) } { - columns, removed := removeDeleteColumnMarker([]string{constants.DeleteColumnMarker}) + result, removed := removeDeleteColumnMarker([]columns.Column{deleteColumnMarkerCol}) assert.True(t, removed) - assert.Empty(t, columns) + assert.Empty(t, result) } { - columns, removed := removeDeleteColumnMarker([]string{"a", constants.DeleteColumnMarker, "b"}) + result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2}) assert.True(t, removed) - assert.Equal(t, []string{"a", "b"}, columns) + assert.Equal(t, []columns.Column{col1, col2}, result) } { - columns, removed := removeDeleteColumnMarker([]string{"a", constants.DeleteColumnMarker, "b", constants.DeleteColumnMarker, "c"}) + result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2, deleteColumnMarkerCol, col3}) assert.True(t, removed) - assert.Equal(t, []string{"a", "b", "c"}, columns) + assert.Equal(t, []columns.Column{col1, col2, col3}, result) } } @@ -306,17 +311,19 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) { } func TestBuildInsertQuery(t *testing.T) { + cols := []columns.Column{ + columns.NewColumn("col1", typing.Invalid), + columns.NewColumn("col2", typing.Invalid), + } + mergeArg := MergeArgument{ - TableID: MockTableIdentifier{"{TABLE_ID}"}, - SubQuery: "{SUB_QUERY}", - PrimaryKeys: []columns.Column{ - columns.NewColumn("col1", typing.Invalid), - columns.NewColumn("othercol", typing.Invalid), - }, - Dialect: sql.SnowflakeDialect{}, + TableID: MockTableIdentifier{"{TABLE_ID}"}, + SubQuery: "{SUB_QUERY}", + PrimaryKeys: []columns.Column{cols[0], columns.NewColumn("othercol", typing.Invalid)}, + Dialect: sql.SnowflakeDialect{}, } assert.Equal(t, `INSERT INTO {TABLE_ID} ("COL1","COL2") SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc LEFT JOIN {TABLE_ID} as c on {EQUALITY_PART_1} and {EQUALITY_PART_2} WHERE c."COL1" IS NULL;`, - mergeArg.buildInsertQuery([]string{"col1", "col2"}, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), + mergeArg.buildInsertQuery(cols, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), ) } diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index 2855fd7a9..564e7ea37 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -166,6 +166,7 @@ func (c *Columns) GetColumn(name string) (Column, bool) { // GetColumnsToUpdate will filter all the `Invalid` columns so that we do not update it. // This is used mostly for the SQL MERGE queries. +// TODO: Replace all uses of [GetColumnsToUpdate] with [ValidColumns] func (c *Columns) GetColumnsToUpdate() []string { if c == nil { return []string{} @@ -186,6 +187,27 @@ func (c *Columns) GetColumnsToUpdate() []string { return cols } +// ValidColumns will filter all the `Invalid` columns so that we do not update them. +// This is used mostly for the SQL MERGE queries. +func (c *Columns) ValidColumns() []Column { + if c == nil { + return []Column{} + } + + c.RLock() + defer c.RUnlock() + + var cols []Column + for _, col := range c.columns { + if col.KindDetails == typing.Invalid { + continue + } + + cols = append(cols, col) + } + return cols +} + func (c *Columns) GetColumns() []Column { if c == nil { return []Column{} diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index cb785b875..82819ef75 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -2,6 +2,7 @@ package columns import ( "fmt" + "slices" "testing" "github.com/artie-labs/transfer/lib/ptr" @@ -184,6 +185,52 @@ func TestColumns_GetColumnsToUpdate(t *testing.T) { } } +func TestColumns_ValidColumns(t *testing.T) { + var happyPathCols = []Column{ + { + name: "hi", + KindDetails: typing.String, + }, + { + name: "bye", + KindDetails: typing.String, + }, + { + name: "start", + KindDetails: typing.String, + }, + } + + extraCols := happyPathCols + for i := 0; i < 100; i++ { + extraCols = append(extraCols, Column{ + name: fmt.Sprintf("hello_%v", i), + KindDetails: typing.Invalid, + }) + } + + testCases := []struct { + name string + cols []Column + expectedCols []Column + }{ + { + name: "happy path", + cols: happyPathCols, + expectedCols: slices.Clone(happyPathCols), + }, + { + name: "happy path + extra col", + cols: extraCols, + expectedCols: slices.Clone(happyPathCols), + }, + } + + for _, testCase := range testCases { + assert.Equal(t, testCase.expectedCols, (&Columns{columns: testCase.cols}).ValidColumns(), testCase.name) + } +} + func TestColumns_UpsertColumns(t *testing.T) { keys := []string{"a", "b", "c", "d", "e"} var cols Columns From 63d740037d97cdf0c33363291e3d64e55f5d4516 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 14:41:25 -0700 Subject: [PATCH 06/12] [dml] Reuse result of `ValidColumns` call (#555) --- lib/destination/dml/columns.go | 5 +---- lib/destination/dml/merge.go | 8 ++++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 656390800..4a3a9172f 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -19,13 +19,10 @@ func quoteColumns(cols []columns.Column, dialect sql.Dialect) []string { } // buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email +// NOTE: This should only be used with valid columns. func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, skipDeleteCol bool) string { var cols []string for _, column := range columns { - if column.ShouldSkip() { - continue - } - // skipDeleteCol is useful because we don't want to copy the deleted column over to the source table if we're doing a hard row delete. if skipDeleteCol && column.Name() == constants.DeleteColumnMarker { continue diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index d1e095ea9..803225bd4 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -132,7 +132,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect, false), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -157,7 +157,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect, true), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), ), @@ -299,7 +299,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -326,7 +326,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ From ebcd7b49d7f673436b882659433dccd90086e72c Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 14:54:20 -0700 Subject: [PATCH 07/12] [dml] Remove `buildColumnsUpdateFragment` argument (#556) --- lib/destination/dml/columns.go | 7 +------ lib/destination/dml/columns_test.go | 12 +----------- lib/destination/dml/merge.go | 14 +++++++------- 3 files changed, 9 insertions(+), 24 deletions(-) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 4a3a9172f..09e563c30 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -20,14 +20,9 @@ func quoteColumns(cols []columns.Column, dialect sql.Dialect) []string { // buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email // NOTE: This should only be used with valid columns. -func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, skipDeleteCol bool) string { +func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect) string { var cols []string for _, column := range columns { - // skipDeleteCol is useful because we don't want to copy the deleted column over to the source table if we're doing a hard row delete. - if skipDeleteCol && column.Name() == constants.DeleteColumnMarker { - continue - } - colName := dialect.QuoteIdentifier(column.Name()) if column.ToastColumn { var colValue string diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index d3204a1e8..934ded1ff 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -26,7 +26,6 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { columns []columns.Column expectedString string dialect sql.Dialect - skipDeleteCol bool } fooBarCols := []string{"foo", "bar"} @@ -118,26 +117,17 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { dialect: sql.BigQueryDialect{}, expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", }, - { - name: "struct, string and toast string (bigquery) w/ reserved keywords", - columns: lastCaseEscapeTypes, - dialect: sql.BigQueryDialect{}, - expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", - key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`"), - skipDeleteCol: true, - }, { name: "struct, string and toast string (bigquery) w/ reserved keywords", columns: lastCaseEscapeTypes, dialect: sql.BigQueryDialect{}, expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), - skipDeleteCol: false, }, } for _, _testCase := range testCases { - actualQuery := buildColumnsUpdateFragment(_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) + actualQuery := buildColumnsUpdateFragment(_testCase.columns, _testCase.dialect) assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) } } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 803225bd4..9233b273c 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -43,7 +43,7 @@ func (m *MergeArgument) Valid() error { return fmt.Errorf("merge argument does not contain primary keys") } - if len(m.Columns.GetColumns()) == 0 { + if len(m.Columns.ValidColumns()) == 0 { return fmt.Errorf("columns cannot be empty") } @@ -132,7 +132,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect, false), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -157,7 +157,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect, true), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), ), @@ -235,7 +235,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -261,7 +261,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -299,7 +299,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -326,7 +326,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ From 92e4eafd7b05ca30f9b0ee8c5d28ea88b2a38ed3 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 15:16:28 -0700 Subject: [PATCH 08/12] [dml] Lookup primary key columns outside of `MergeArgument` (#557) --- clients/shared/merge.go | 15 +++++++++++++-- lib/destination/dml/merge.go | 6 +----- lib/destination/dml/merge_bigquery_test.go | 5 +++-- lib/optimization/table_data.go | 9 ++------- 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 7b3f06927..4c462193e 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -119,12 +119,23 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg subQuery = fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, temporaryTableName) } + cols := tableData.ReadOnlyInMemoryCols() + + var primaryKeys []columns.Column + for _, primaryKey := range tableData.PrimaryKeys() { + column, ok := cols.GetColumn(primaryKey) + if !ok { + return fmt.Errorf("column for primary key %q does not exist", primaryKey) + } + primaryKeys = append(primaryKeys, column) + } + mergeArg := dml.MergeArgument{ TableID: tableID, SubQuery: subQuery, IdempotentKey: tableData.TopicConfig().IdempotentKey, - PrimaryKeys: tableData.PrimaryKeys(), - Columns: tableData.ReadOnlyInMemoryCols(), + PrimaryKeys: primaryKeys, + Columns: cols, SoftDelete: tableData.TopicConfig().SoftDelete, DestKind: dwh.Label(), Dialect: dwh.Dialect(), diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 9233b273c..aa5e58038 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -204,12 +204,8 @@ func (m *MergeArgument) GetStatement() (string, error) { quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name()) equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey) - pkCol, isOk := m.Columns.GetColumn(primaryKey.Name()) - if !isOk { - return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.Name(), m.Columns) - } - if m.DestKind == constants.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind { + if m.DestKind == constants.BigQuery && primaryKey.KindDetails.Kind == typing.Struct.Kind { // BigQuery requires special casting to compare two JSON objects. equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", quotedPrimaryKey, quotedPrimaryKey) } diff --git a/lib/destination/dml/merge_bigquery_test.go b/lib/destination/dml/merge_bigquery_test.go index 355c8229e..7d031e6f3 100644 --- a/lib/destination/dml/merge_bigquery_test.go +++ b/lib/destination/dml/merge_bigquery_test.go @@ -33,15 +33,16 @@ func TestMergeStatement_TempTable(t *testing.T) { } func TestMergeStatement_JSONKey(t *testing.T) { + orderOIDCol := columns.NewColumn("order_oid", typing.Struct) var cols columns.Columns - cols.AddColumn(columns.NewColumn("order_oid", typing.Struct)) + cols.AddColumn(orderOIDCol) cols.AddColumn(columns.NewColumn("name", typing.String)) cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) mergeArg := &MergeArgument{ TableID: MockTableIdentifier{"customers.orders"}, SubQuery: "customers.orders_tmp", - PrimaryKeys: []columns.Column{columns.NewColumn("order_oid", typing.Invalid)}, + PrimaryKeys: []columns.Column{orderOIDCol}, Columns: &cols, DestKind: constants.BigQuery, Dialect: sql.BigQueryDialect{}, diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go index 09017077e..79b960a92 100644 --- a/lib/optimization/table_data.go +++ b/lib/optimization/table_data.go @@ -66,13 +66,8 @@ func (t *TableData) ContainOtherOperations() bool { return t.containOtherOperations } -func (t *TableData) PrimaryKeys() []columns.Column { - var pks []columns.Column - for _, pk := range t.primaryKeys { - pks = append(pks, columns.NewColumn(pk, typing.Invalid)) - } - - return pks +func (t *TableData) PrimaryKeys() []string { + return t.primaryKeys } func (t *TableData) Name() string { From bf2dfe5f8416715e4a0668d5118c27f9208c0b64 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 15:23:39 -0700 Subject: [PATCH 09/12] [dml] Pass `[]columns.Column` in `MergeArgument` (#558) --- clients/shared/merge.go | 2 +- lib/destination/dml/merge.go | 40 ++++++++++------------ lib/destination/dml/merge_bigquery_test.go | 4 +-- lib/destination/dml/merge_mssql_test.go | 2 +- lib/destination/dml/merge_parts_test.go | 14 ++++---- lib/destination/dml/merge_test.go | 10 +++--- lib/destination/dml/merge_valid_test.go | 33 ++++++++++++------ 7 files changed, 57 insertions(+), 48 deletions(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 4c462193e..55062a16a 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -135,7 +135,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg SubQuery: subQuery, IdempotentKey: tableData.TopicConfig().IdempotentKey, PrimaryKeys: primaryKeys, - Columns: cols, + Columns: cols.ValidColumns(), SoftDelete: tableData.TopicConfig().SoftDelete, DestKind: dwh.Label(), Dialect: dwh.Dialect(), diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index aa5e58038..271ee33e1 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -24,7 +24,7 @@ type MergeArgument struct { AdditionalEqualityStrings []string // Columns will need to be escaped - Columns *columns.Columns + Columns []columns.Column DestKind constants.DestinationKind SoftDelete bool @@ -43,9 +43,14 @@ func (m *MergeArgument) Valid() error { return fmt.Errorf("merge argument does not contain primary keys") } - if len(m.Columns.ValidColumns()) == 0 { + if len(m.Columns) == 0 { return fmt.Errorf("columns cannot be empty") } + for _, column := range m.Columns { + if column.ShouldSkip() { + return fmt.Errorf("column %q is invalid and should be skipped", column.Name()) + } + } if m.TableID == nil { return fmt.Errorf("tableID cannot be nil") @@ -123,16 +128,14 @@ func (m *MergeArgument) GetParts() ([]string, error) { equalitySQLParts = append(equalitySQLParts, equalitySQL) } - columns := m.Columns.ValidColumns() - if m.SoftDelete { return []string{ // INSERT - m.buildInsertQuery(columns, equalitySQLParts), + m.buildInsertQuery(m.Columns, equalitySQLParts), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -140,8 +143,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { } // We also need to remove __artie flags since it does not exist in the destination table - var removed bool - columns, removed = removeDeleteColumnMarker(columns) + columns, removed := removeDeleteColumnMarker(m.Columns) if !removed { return nil, errors.New("artie delete flag doesn't exist") } @@ -222,8 +224,6 @@ func (m *MergeArgument) GetStatement() (string, error) { equalitySQLParts = append(equalitySQLParts, m.AdditionalEqualityStrings...) } - columns := m.Columns.ValidColumns() - if m.SoftDelete { return fmt.Sprintf(` MERGE INTO %s c USING %s AS cc ON %s @@ -231,19 +231,18 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect), + idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect), // Insert - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(m.Columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: quoteColumns(columns, m.Dialect), + Vals: quoteColumns(m.Columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil } // We also need to remove __artie flags since it does not exist in the destination table - var removed bool - columns, removed = removeDeleteColumnMarker(columns) + columns, removed := removeDeleteColumnMarker(m.Columns) if !removed { return "", errors.New("artie delete flag doesn't exist") } @@ -285,8 +284,6 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) { equalitySQLParts = append(equalitySQLParts, equalitySQL) } - columns := m.Columns.ValidColumns() - if m.SoftDelete { return fmt.Sprintf(` MERGE INTO %s c @@ -295,19 +292,18 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(columns, m.Dialect), + idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect), // Insert - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(columns, m.Dialect), ","), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(quoteColumns(m.Columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: quoteColumns(columns, m.Dialect), + Vals: quoteColumns(m.Columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil } // We also need to remove __artie flags since it does not exist in the destination table - var removed bool - columns, removed = removeDeleteColumnMarker(columns) + columns, removed := removeDeleteColumnMarker(m.Columns) if !removed { return "", errors.New("artie delete flag doesn't exist") } diff --git a/lib/destination/dml/merge_bigquery_test.go b/lib/destination/dml/merge_bigquery_test.go index 7d031e6f3..1abeac2c0 100644 --- a/lib/destination/dml/merge_bigquery_test.go +++ b/lib/destination/dml/merge_bigquery_test.go @@ -20,7 +20,7 @@ func TestMergeStatement_TempTable(t *testing.T) { TableID: MockTableIdentifier{"customers.orders"}, SubQuery: "customers.orders_tmp", PrimaryKeys: []columns.Column{columns.NewColumn("order_id", typing.Invalid)}, - Columns: &cols, + Columns: cols.ValidColumns(), DestKind: constants.BigQuery, Dialect: sql.BigQueryDialect{}, SoftDelete: false, @@ -43,7 +43,7 @@ func TestMergeStatement_JSONKey(t *testing.T) { TableID: MockTableIdentifier{"customers.orders"}, SubQuery: "customers.orders_tmp", PrimaryKeys: []columns.Column{orderOIDCol}, - Columns: &cols, + Columns: cols.ValidColumns(), DestKind: constants.BigQuery, Dialect: sql.BigQueryDialect{}, SoftDelete: false, diff --git a/lib/destination/dml/merge_mssql_test.go b/lib/destination/dml/merge_mssql_test.go index 15613eb7f..67d5137e4 100644 --- a/lib/destination/dml/merge_mssql_test.go +++ b/lib/destination/dml/merge_mssql_test.go @@ -45,7 +45,7 @@ func Test_GetMSSQLStatement(t *testing.T) { SubQuery: subQuery, IdempotentKey: "", PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, - Columns: &_cols, + Columns: _cols.ValidColumns(), DestKind: constants.MSSQL, Dialect: sql.MSSQLDialect{}, SoftDelete: false, diff --git a/lib/destination/dml/merge_parts_test.go b/lib/destination/dml/merge_parts_test.go index de08ebbc3..22a35bbfd 100644 --- a/lib/destination/dml/merge_parts_test.go +++ b/lib/destination/dml/merge_parts_test.go @@ -71,7 +71,7 @@ func TestMergeStatementParts_SkipDelete(t *testing.T) { TableID: MockTableIdentifier{fqTableName}, SubQuery: tempTableName, PrimaryKeys: res.PrimaryKeys, - Columns: &res.ColumnsToTypes, + Columns: res.ColumnsToTypes.ValidColumns(), DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, ContainsHardDeletes: ptr.ToBool(false), @@ -98,7 +98,7 @@ func TestMergeStatementPartsSoftDelete(t *testing.T) { TableID: MockTableIdentifier{fqTableName}, SubQuery: tempTableName, PrimaryKeys: res.PrimaryKeys, - Columns: &res.ColumnsToTypes, + Columns: res.ColumnsToTypes.ValidColumns(), DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, SoftDelete: true, @@ -138,7 +138,7 @@ func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) { TableID: MockTableIdentifier{fqTableName}, SubQuery: tempTableName, PrimaryKeys: res.PrimaryKeys, - Columns: &res.ColumnsToTypes, + Columns: res.ColumnsToTypes.ValidColumns(), DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, SoftDelete: true, @@ -181,7 +181,7 @@ func TestMergeStatementParts(t *testing.T) { TableID: MockTableIdentifier{fqTableName}, SubQuery: tempTableName, PrimaryKeys: res.PrimaryKeys, - Columns: &res.ColumnsToTypes, + Columns: res.ColumnsToTypes.ValidColumns(), DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, ContainsHardDeletes: ptr.ToBool(true), @@ -207,7 +207,7 @@ func TestMergeStatementParts(t *testing.T) { TableID: MockTableIdentifier{fqTableName}, SubQuery: tempTableName, PrimaryKeys: res.PrimaryKeys, - Columns: &res.ColumnsToTypes, + Columns: res.ColumnsToTypes.ValidColumns(), DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, IdempotentKey: "created_at", @@ -239,7 +239,7 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) { TableID: MockTableIdentifier{fqTableName}, SubQuery: tempTableName, PrimaryKeys: res.PrimaryKeys, - Columns: &res.ColumnsToTypes, + Columns: res.ColumnsToTypes.ValidColumns(), DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, ContainsHardDeletes: ptr.ToBool(true), @@ -265,7 +265,7 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) { TableID: MockTableIdentifier{fqTableName}, SubQuery: tempTableName, PrimaryKeys: res.PrimaryKeys, - Columns: &res.ColumnsToTypes, + Columns: res.ColumnsToTypes.ValidColumns(), DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, ContainsHardDeletes: ptr.ToBool(true), diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 5a76abbfe..3836e7282 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -100,7 +100,7 @@ func TestMergeStatementSoftDelete(t *testing.T) { SubQuery: subQuery, IdempotentKey: idempotentKey, PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, - Columns: &_cols, + Columns: _cols.ValidColumns(), DestKind: constants.Snowflake, Dialect: sql.SnowflakeDialect{}, SoftDelete: true, @@ -149,7 +149,7 @@ func TestMergeStatement(t *testing.T) { SubQuery: subQuery, IdempotentKey: "", PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, - Columns: &_cols, + Columns: _cols.ValidColumns(), DestKind: constants.Snowflake, Dialect: sql.SnowflakeDialect{}, SoftDelete: false, @@ -197,7 +197,7 @@ func TestMergeStatementIdempotentKey(t *testing.T) { SubQuery: subQuery, IdempotentKey: "updated_at", PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, - Columns: &_cols, + Columns: _cols.ValidColumns(), DestKind: constants.Snowflake, Dialect: sql.SnowflakeDialect{}, SoftDelete: false, @@ -242,7 +242,7 @@ func TestMergeStatementCompositeKey(t *testing.T) { columns.NewColumn("id", typing.Invalid), columns.NewColumn("another_id", typing.Invalid), }, - Columns: &_cols, + Columns: _cols.ValidColumns(), DestKind: constants.Snowflake, Dialect: sql.SnowflakeDialect{}, SoftDelete: false, @@ -291,7 +291,7 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) { columns.NewColumn("id", typing.Invalid), columns.NewColumn("group", typing.Invalid), }, - Columns: &_cols, + Columns: _cols.ValidColumns(), DestKind: constants.Snowflake, Dialect: sql.SnowflakeDialect{}, SoftDelete: false, diff --git a/lib/destination/dml/merge_valid_test.go b/lib/destination/dml/merge_valid_test.go index fc7114762..358f2fc45 100644 --- a/lib/destination/dml/merge_valid_test.go +++ b/lib/destination/dml/merge_valid_test.go @@ -17,10 +17,11 @@ func TestMergeArgument_Valid(t *testing.T) { columns.NewColumn("id", typing.Integer), } - var cols columns.Columns - cols.AddColumn(columns.NewColumn("id", typing.Integer)) - cols.AddColumn(columns.NewColumn("firstName", typing.String)) - cols.AddColumn(columns.NewColumn("lastName", typing.String)) + cols := []columns.Column{ + columns.NewColumn("id", typing.Integer), + columns.NewColumn("firstName", typing.String), + columns.NewColumn("lastName", typing.String), + } testCases := []struct { name string @@ -47,7 +48,7 @@ func TestMergeArgument_Valid(t *testing.T) { name: "pks, cols, colsTpTypes exists but no subquery or tableID", mergeArg: &MergeArgument{ PrimaryKeys: primaryKeys, - Columns: &cols, + Columns: cols, }, expectedErr: "tableID cannot be nil", }, @@ -55,7 +56,7 @@ func TestMergeArgument_Valid(t *testing.T) { name: "pks, cols, colsTpTypes, subquery exists but no tableID", mergeArg: &MergeArgument{ PrimaryKeys: primaryKeys, - Columns: &cols, + Columns: cols, SubQuery: "schema.tableName", }, expectedErr: "tableID cannot be nil", @@ -64,7 +65,7 @@ func TestMergeArgument_Valid(t *testing.T) { name: "pks, cols, colsTpTypes, tableID exists but no subquery", mergeArg: &MergeArgument{ PrimaryKeys: primaryKeys, - Columns: &cols, + Columns: cols, TableID: MockTableIdentifier{"schema.tableName"}, }, expectedErr: "subQuery cannot be empty", @@ -73,7 +74,7 @@ func TestMergeArgument_Valid(t *testing.T) { name: "missing dest kind", mergeArg: &MergeArgument{ PrimaryKeys: primaryKeys, - Columns: &cols, + Columns: cols, SubQuery: "schema.tableName", TableID: MockTableIdentifier{"schema.tableName"}, }, @@ -83,7 +84,7 @@ func TestMergeArgument_Valid(t *testing.T) { name: "missing dialect kind", mergeArg: &MergeArgument{ PrimaryKeys: primaryKeys, - Columns: &cols, + Columns: cols, SubQuery: "schema.tableName", TableID: MockTableIdentifier{"schema.tableName"}, DestKind: constants.BigQuery, @@ -94,12 +95,24 @@ func TestMergeArgument_Valid(t *testing.T) { name: "everything exists", mergeArg: &MergeArgument{ PrimaryKeys: primaryKeys, - Columns: &cols, + Columns: cols, + SubQuery: "schema.tableName", + TableID: MockTableIdentifier{"schema.tableName"}, + DestKind: constants.BigQuery, + Dialect: sql.BigQueryDialect{}, + }, + }, + { + name: "invalid column", + mergeArg: &MergeArgument{ + PrimaryKeys: primaryKeys, + Columns: []columns.Column{columns.NewColumn("id", typing.Invalid)}, SubQuery: "schema.tableName", TableID: MockTableIdentifier{"schema.tableName"}, DestKind: constants.BigQuery, Dialect: sql.BigQueryDialect{}, }, + expectedErr: `column "id" is invalid and should be skipped`, }, } From c639579a8272fa9524fbe7fdac21405addbb9728 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 15:34:05 -0700 Subject: [PATCH 10/12] [dml] Reuse `quoteColumns` for primary keys (#559) --- lib/destination/dml/merge.go | 15 +++++---------- lib/destination/dml/merge_test.go | 2 +- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 271ee33e1..d494e1f7e 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -77,7 +77,7 @@ func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) { return cols, len(cols) != origLength } -func (m *MergeArgument) buildInsertQuery(columns []columns.Column, equalitySQLParts []string) string { +func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equalitySQLParts []string) string { return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, // insert into target (col1, col2, col3) m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(columns, m.Dialect), ","), @@ -131,7 +131,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { if m.SoftDelete { return []string{ // INSERT - m.buildInsertQuery(m.Columns, equalitySQLParts), + m.buildRedshiftInsertQuery(m.Columns, equalitySQLParts), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 @@ -148,14 +148,9 @@ func (m *MergeArgument) GetParts() ([]string, error) { return nil, errors.New("artie delete flag doesn't exist") } - var pks []string - for _, pk := range m.PrimaryKeys { - pks = append(pks, m.Dialect.QuoteIdentifier(pk.Name())) - } - parts := []string{ // INSERT - m.buildInsertQuery(columns, equalitySQLParts), + m.buildRedshiftInsertQuery(columns, equalitySQLParts), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 @@ -170,10 +165,10 @@ func (m *MergeArgument) GetParts() ([]string, error) { // DELETE fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s as cc WHERE cc.%s = true);`, // DELETE from table where (pk_1, pk_2) - m.TableID.FullyQualifiedName(), strings.Join(pks, ","), + m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(m.PrimaryKeys, m.Dialect), ","), // IN (cc.pk_1, cc.pk_2) FROM staging array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: pks, + Vals: quoteColumns(m.PrimaryKeys, m.Dialect), Separator: ",", Prefix: "cc.", }), m.SubQuery, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 3836e7282..04453597b 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -324,6 +324,6 @@ func TestBuildInsertQuery(t *testing.T) { } assert.Equal(t, `INSERT INTO {TABLE_ID} ("COL1","COL2") SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc LEFT JOIN {TABLE_ID} as c on {EQUALITY_PART_1} and {EQUALITY_PART_2} WHERE c."COL1" IS NULL;`, - mergeArg.buildInsertQuery(cols, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), + mergeArg.buildRedshiftInsertQuery(cols, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), ) } From f71b8b2a129be6ec4a639207ce685ec1a6c24da3 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 15:46:48 -0700 Subject: [PATCH 11/12] [dml] Split out Redshift delete query (#560) --- lib/destination/dml/merge.go | 26 ++++++++++++++------------ lib/destination/dml/merge_test.go | 21 ++++++++++++++++++++- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index d494e1f7e..d694bbad9 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -94,6 +94,19 @@ func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equal ) } +func (m *MergeArgument) buildRedshiftDeleteQuery() string { + return fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s as cc WHERE cc.%s = true);`, + // DELETE from table where (pk_1, pk_2) + m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(m.PrimaryKeys, m.Dialect), ","), + // IN (cc.pk_1, cc.pk_2) FROM staging + array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ + Vals: quoteColumns(m.PrimaryKeys, m.Dialect), + Separator: ",", + Prefix: "cc.", + }), m.SubQuery, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), + ) +} + func (m *MergeArgument) GetParts() ([]string, error) { if err := m.Valid(); err != nil { return nil, err @@ -161,18 +174,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { } if *m.ContainsHardDeletes { - parts = append(parts, - // DELETE - fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s as cc WHERE cc.%s = true);`, - // DELETE from table where (pk_1, pk_2) - m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(m.PrimaryKeys, m.Dialect), ","), - // IN (cc.pk_1, cc.pk_2) FROM staging - array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: quoteColumns(m.PrimaryKeys, m.Dialect), - Separator: ",", - Prefix: "cc.", - }), m.SubQuery, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), - )) + parts = append(parts, m.buildRedshiftDeleteQuery()) } return parts, nil diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 04453597b..72b292be2 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -310,7 +310,7 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) { assert.Contains(t, mergeSQL, `cc."ID",cc."GROUP",cc."UPDATED_AT",cc."START"`, mergeSQL) } -func TestBuildInsertQuery(t *testing.T) { +func TestBuildRedshiftInsertQuery(t *testing.T) { cols := []columns.Column{ columns.NewColumn("col1", typing.Invalid), columns.NewColumn("col2", typing.Invalid), @@ -327,3 +327,22 @@ func TestBuildInsertQuery(t *testing.T) { mergeArg.buildRedshiftInsertQuery(cols, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), ) } + +func TestBuildRedshiftDeleteQuery(t *testing.T) { + cols := []columns.Column{ + columns.NewColumn("col1", typing.Invalid), + columns.NewColumn("col2", typing.Invalid), + columns.NewColumn("col3", typing.Invalid), + } + + mergeArg := MergeArgument{ + TableID: MockTableIdentifier{"{TABLE_ID}"}, + SubQuery: "{SUB_QUERY}", + PrimaryKeys: []columns.Column{cols[0], cols[1]}, + Dialect: sql.SnowflakeDialect{}, + } + assert.Equal(t, + `DELETE FROM {TABLE_ID} WHERE ("COL1","COL2") IN (SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc WHERE cc."__ARTIE_DELETE" = true);`, + mergeArg.buildRedshiftDeleteQuery(), + ) +} From c539bb939d80024014f3287be024c5bb40fb60c2 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 3 May 2024 15:49:59 -0700 Subject: [PATCH 12/12] [dml] Move `removeDeleteColumnMarker` (#561) --- lib/destination/dml/columns.go | 7 ++++++ lib/destination/dml/columns_test.go | 38 +++++++++++++++++++++++++++++ lib/destination/dml/merge.go | 7 ------ lib/destination/dml/merge_test.go | 38 ----------------------------- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 09e563c30..72c1de667 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -2,6 +2,7 @@ package dml import ( "fmt" + "slices" "strings" "github.com/artie-labs/transfer/lib/config/constants" @@ -18,6 +19,12 @@ func quoteColumns(cols []columns.Column, dialect sql.Dialect) []string { return result } +func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) { + origLength := len(cols) + cols = slices.DeleteFunc(cols, func(col columns.Column) bool { return col.Name() == constants.DeleteColumnMarker }) + return cols, len(cols) != origLength +} + // buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email // NOTE: This should only be used with valid columns. func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect) string { diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index 934ded1ff..679d215dd 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -20,6 +20,44 @@ func TestQuoteColumns(t *testing.T) { assert.Equal(t, []string{`"A"`, `"B"`}, quoteColumns(cols, sql.SnowflakeDialect{})) } +func TestRemoveDeleteColumnMarker(t *testing.T) { + col1 := columns.NewColumn("a", typing.Invalid) + col2 := columns.NewColumn("b", typing.Invalid) + col3 := columns.NewColumn("c", typing.Invalid) + deleteColumnMarkerCol := columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid) + + { + result, removed := removeDeleteColumnMarker([]columns.Column{}) + assert.Empty(t, result) + assert.False(t, removed) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{col1}) + assert.Equal(t, []columns.Column{col1}, result) + assert.False(t, removed) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{col1, col2}) + assert.Equal(t, []columns.Column{col1, col2}, result) + assert.False(t, removed) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{deleteColumnMarkerCol}) + assert.True(t, removed) + assert.Empty(t, result) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2}) + assert.True(t, removed) + assert.Equal(t, []columns.Column{col1, col2}, result) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2, deleteColumnMarkerCol, col3}) + assert.True(t, removed) + assert.Equal(t, []columns.Column{col1, col2, col3}, result) + } +} + func TestBuildColumnsUpdateFragment(t *testing.T) { type testCase struct { name string diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index d694bbad9..e3ccbf39f 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -3,7 +3,6 @@ package dml import ( "errors" "fmt" - "slices" "strings" "github.com/artie-labs/transfer/lib/array" @@ -71,12 +70,6 @@ func (m *MergeArgument) Valid() error { return nil } -func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) { - origLength := len(cols) - cols = slices.DeleteFunc(cols, func(col columns.Column) bool { return col.Name() == constants.DeleteColumnMarker }) - return cols, len(cols) != origLength -} - func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equalitySQLParts []string) string { return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, // insert into target (col1, col2, col3) diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 72b292be2..dc2195693 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -32,44 +32,6 @@ func (m MockTableIdentifier) FullyQualifiedName() string { return m.fqName } -func TestRemoveDeleteColumnMarker(t *testing.T) { - col1 := columns.NewColumn("a", typing.Invalid) - col2 := columns.NewColumn("b", typing.Invalid) - col3 := columns.NewColumn("c", typing.Invalid) - deleteColumnMarkerCol := columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid) - - { - result, removed := removeDeleteColumnMarker([]columns.Column{}) - assert.Empty(t, result) - assert.False(t, removed) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{col1}) - assert.Equal(t, []columns.Column{col1}, result) - assert.False(t, removed) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{col1, col2}) - assert.Equal(t, []columns.Column{col1, col2}, result) - assert.False(t, removed) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{deleteColumnMarkerCol}) - assert.True(t, removed) - assert.Empty(t, result) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2}) - assert.True(t, removed) - assert.Equal(t, []columns.Column{col1, col2}, result) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2, deleteColumnMarkerCol, col3}) - assert.True(t, removed) - assert.Equal(t, []columns.Column{col1, col2, col3}, result) - } -} - func TestMergeStatementSoftDelete(t *testing.T) { // No idempotent key fqTable := "database.schema.table"