Skip to content

Commit

Permalink
[dml] Pass []Column instead of Columns
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 3, 2024
1 parent 6f690e5 commit f274f72
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
4 changes: 2 additions & 2 deletions lib/destination/dml/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email
func buildColumnsUpdateFragment(c *columns.Columns, dialect sql.Dialect, skipDeleteCol bool) string {
func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, skipDeleteCol bool) string {
var cols []string
for _, column := range c.GetColumns() {
for _, column := range columns {
if column.ShouldSkip() {
continue
}
Expand Down
22 changes: 11 additions & 11 deletions lib/destination/dml/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestBuildColumnsUpdateFragment(t *testing.T) {
type testCase struct {
name string
columns columns.Columns
columns []columns.Column
expectedString string
dialect sql.Dialect
skipDeleteCol bool
Expand All @@ -23,15 +23,15 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {
fooBarCols := []string{"foo", "bar"}

var (
happyPathCols columns.Columns
stringAndToastCols columns.Columns
lastCaseColTypes columns.Columns
lastCaseEscapeTypes columns.Columns
happyPathCols []columns.Column
stringAndToastCols []columns.Column
lastCaseColTypes []columns.Column
lastCaseEscapeTypes []columns.Column
)
for _, col := range fooBarCols {
column := columns.NewColumn(col, typing.String)
column.ToastColumn = false
happyPathCols.AddColumn(column)
happyPathCols = append(happyPathCols, column)
}
for _, col := range fooBarCols {
var toastCol bool
Expand All @@ -41,7 +41,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {

column := columns.NewColumn(col, typing.String)
column.ToastColumn = toastCol
stringAndToastCols.AddColumn(column)
stringAndToastCols = append(stringAndToastCols, column)
}

lastCaseCols := []string{"a1", "b2", "c3"}
Expand All @@ -58,7 +58,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {

column := columns.NewColumn(lastCaseCol, kd)
column.ToastColumn = toast
lastCaseColTypes.AddColumn(column)
lastCaseColTypes = append(lastCaseColTypes, column)
}

lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"}
Expand All @@ -78,10 +78,10 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {

column := columns.NewColumn(lastCaseColEsc, kd)
column.ToastColumn = toast
lastCaseEscapeTypes.AddColumn(column)
lastCaseEscapeTypes = append(lastCaseEscapeTypes, column)
}

lastCaseEscapeTypes.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))
lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

key := `{"key":"__debezium_unavailable_value"}`
testCases := []testCase{
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {
}

for _, _testCase := range testCases {
actualQuery := buildColumnsUpdateFragment(&_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol)
actualQuery := buildColumnsUpdateFragment(_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol)
assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name)
}
}
12 changes: 6 additions & 6 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, false),
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false),
// FROM table (temp) WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause,
),
Expand Down Expand Up @@ -166,7 +166,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, true),
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true),
// FROM staging WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
),
Expand Down Expand Up @@ -244,7 +244,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "),
// Update + Soft Deletion
idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false),
idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand All @@ -270,7 +270,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
// Delete
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down Expand Up @@ -308,7 +308,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "),
// Update + Soft Deletion
idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false),
idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand All @@ -335,7 +335,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
// Delete
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down

0 comments on commit f274f72

Please sign in to comment.