diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 62e564f25..428154877 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -119,7 +119,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.DestKind, m.Dialect, false), + m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, false), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -163,7 +163,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.DestKind, m.Dialect, true), + m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, true), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, constants.DeleteColumnMarker, ), @@ -239,7 +239,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, m.Columns.UpdateQuery(m.DestKind, m.Dialect, false), + idempotentClause, m.Columns.UpdateQuery(m.Dialect, false), // Insert constants.DeleteColumnMarker, strings.Join(cols, ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -272,7 +272,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Delete constants.DeleteColumnMarker, // Update - constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.DestKind, m.Dialect, true), + constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true), // Insert constants.DeleteColumnMarker, strings.Join(cols, ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -309,7 +309,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, m.Columns.UpdateQuery(m.DestKind, m.Dialect, false), + idempotentClause, m.Columns.UpdateQuery(m.Dialect, false), // Insert constants.DeleteColumnMarker, strings.Join(cols, ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -343,7 +343,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete constants.DeleteColumnMarker, // Update - constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.DestKind, m.Dialect, true), + constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true), // Insert constants.DeleteColumnMarker, strings.Join(cols, ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index 386d74ea8..e9dac6272 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -257,7 +257,7 @@ func (c *Columns) DeleteColumn(name string) { } // UpdateQuery will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email -func (c *Columns) UpdateQuery(destKind constants.DestinationKind, dialect sql.Dialect, skipDeleteCol bool) string { +func (c *Columns) UpdateQuery(dialect sql.Dialect, skipDeleteCol bool) string { var cols []string for _, column := range c.GetColumns() { if column.ShouldSkip() { @@ -272,9 +272,9 @@ func (c *Columns) UpdateQuery(destKind constants.DestinationKind, dialect sql.Di colName := column.Name(dialect) if column.ToastColumn { if column.KindDetails == typing.Struct { - cols = append(cols, processToastStructCol(colName, destKind)) + cols = append(cols, processToastStructCol(colName, dialect)) } else { - cols = append(cols, processToastCol(colName, destKind)) + cols = append(cols, processToastCol(colName, dialect)) } } else { @@ -286,16 +286,16 @@ func (c *Columns) UpdateQuery(destKind constants.DestinationKind, dialect sql.Di return strings.Join(cols, ",") } -func processToastStructCol(colName string, destKind constants.DestinationKind) string { - switch destKind { - case constants.BigQuery: +func processToastStructCol(colName string, dialect sql.Dialect) string { + switch dialect.(type) { + case sql.BigQueryDialect: return fmt.Sprintf(`%s= CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - case constants.Redshift: + case sql.RedshiftDialect: return fmt.Sprintf(`%s= CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) - case constants.MSSQL: + case sql.MSSQLDialect: // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) @@ -306,8 +306,8 @@ func processToastStructCol(colName string, destKind constants.DestinationKind) s } } -func processToastCol(colName string, destKind constants.DestinationKind) string { - if destKind == constants.MSSQL { +func processToastCol(colName string, dialect sql.Dialect) string { + if _, ok := dialect.(sql.MSSQLDialect); ok { // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. return fmt.Sprintf("%s= CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, colName, constants.ToastUnavailableValuePlaceholder, colName, colName) diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index a4d162687..95ec25c50 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -481,45 +481,39 @@ func TestColumnsUpdateQuery(t *testing.T) { { name: "happy path", columns: happyPathCols, - destKind: constants.Redshift, dialect: sql.RedshiftDialect{}, expectedString: `"foo"=cc."foo","bar"=cc."bar"`, }, { name: "string and toast", columns: stringAndToastCols, - destKind: constants.Snowflake, dialect: sql.SnowflakeDialect{UppercaseEscNames: true}, expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, }, { name: "struct, string and toast string", columns: lastCaseColTypes, - destKind: constants.Redshift, dialect: sql.RedshiftDialect{}, expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`, }, { name: "struct, string and toast string (bigquery)", columns: lastCaseColTypes, - destKind: constants.BigQuery, dialect: sql.BigQueryDialect{}, expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", }, { - name: "struct, string and toast string (bigquery) w/ reserved keywords", - columns: lastCaseEscapeTypes, - destKind: constants.BigQuery, - dialect: sql.BigQueryDialect{}, + name: "struct, string and toast string (bigquery) w/ reserved keywords", + columns: lastCaseEscapeTypes, + dialect: sql.BigQueryDialect{}, expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`"), skipDeleteCol: true, }, { - name: "struct, string and toast string (bigquery) w/ reserved keywords", - columns: lastCaseEscapeTypes, - destKind: constants.BigQuery, - dialect: sql.BigQueryDialect{}, + name: "struct, string and toast string (bigquery) w/ reserved keywords", + columns: lastCaseEscapeTypes, + dialect: sql.BigQueryDialect{}, expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), skipDeleteCol: false, @@ -527,7 +521,7 @@ func TestColumnsUpdateQuery(t *testing.T) { } for _, _testCase := range testCases { - actualQuery := _testCase.columns.UpdateQuery(_testCase.destKind, _testCase.dialect, _testCase.skipDeleteCol) + actualQuery := _testCase.columns.UpdateQuery(_testCase.dialect, _testCase.skipDeleteCol) assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) } } diff --git a/lib/typing/columns/columns_toast_test.go b/lib/typing/columns/columns_toast_test.go index 6056eb5f5..47ac0fa00 100644 --- a/lib/typing/columns/columns_toast_test.go +++ b/lib/typing/columns/columns_toast_test.go @@ -3,20 +3,20 @@ package columns import ( "testing" - "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/sql" "github.com/stretchr/testify/assert" ) func TestProcessToastStructCol(t *testing.T) { - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", constants.Redshift)) - assert.Equal(t, `foo= CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", constants.BigQuery)) - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", constants.Snowflake)) - assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", constants.MSSQL)) + assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.RedshiftDialect{})) + assert.Equal(t, `foo= CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.BigQueryDialect{})) + assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.SnowflakeDialect{})) + assert.Equal(t, `foo= CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, processToastStructCol("foo", sql.MSSQLDialect{})) } func TestProcessToastCol(t *testing.T) { - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", constants.Redshift)) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", constants.BigQuery)) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", constants.Snowflake)) - assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", constants.MSSQL)) + assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{})) + assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{})) + assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{})) + assert.Equal(t, `bar= CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{})) }