diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 17e31b25b..f70f0bcfe 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -11,9 +11,9 @@ import ( ) // buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email -func buildColumnsUpdateFragment(c *columns.Columns, dialect sql.Dialect, skipDeleteCol bool) string { +func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, skipDeleteCol bool) string { var cols []string - for _, column := range c.GetColumns() { + for _, column := range columns { if column.ShouldSkip() { continue } diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index 8833246c1..9b836571e 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -14,7 +14,7 @@ import ( func TestBuildColumnsUpdateFragment(t *testing.T) { type testCase struct { name string - columns columns.Columns + columns []columns.Column expectedString string dialect sql.Dialect skipDeleteCol bool @@ -23,15 +23,15 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { fooBarCols := []string{"foo", "bar"} var ( - happyPathCols columns.Columns - stringAndToastCols columns.Columns - lastCaseColTypes columns.Columns - lastCaseEscapeTypes columns.Columns + happyPathCols []columns.Column + stringAndToastCols []columns.Column + lastCaseColTypes []columns.Column + lastCaseEscapeTypes []columns.Column ) for _, col := range fooBarCols { column := columns.NewColumn(col, typing.String) column.ToastColumn = false - happyPathCols.AddColumn(column) + happyPathCols = append(happyPathCols, column) } for _, col := range fooBarCols { var toastCol bool @@ -41,7 +41,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { column := columns.NewColumn(col, typing.String) column.ToastColumn = toastCol - stringAndToastCols.AddColumn(column) + stringAndToastCols = append(stringAndToastCols, column) } lastCaseCols := []string{"a1", "b2", "c3"} @@ -58,7 +58,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { column := columns.NewColumn(lastCaseCol, kd) column.ToastColumn = toast - lastCaseColTypes.AddColumn(column) + lastCaseColTypes = append(lastCaseColTypes, column) } lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"} @@ -78,10 +78,10 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { column := columns.NewColumn(lastCaseColEsc, kd) column.ToastColumn = toast - lastCaseEscapeTypes.AddColumn(column) + lastCaseEscapeTypes = append(lastCaseEscapeTypes, column) } - lastCaseEscapeTypes.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) key := `{"key":"__debezium_unavailable_value"}` testCases := []testCase{ @@ -128,7 +128,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { } for _, _testCase := range testCases { - actualQuery := buildColumnsUpdateFragment(&_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) + actualQuery := buildColumnsUpdateFragment(_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol) assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) } } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index b76763423..b7ab2c656 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -128,7 +128,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, false), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), // FROM table (temp) WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, ), @@ -166,7 +166,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, true), + m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), ), @@ -244,7 +244,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -270,7 +270,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -308,7 +308,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false), + idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -335,7 +335,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{