Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dml] Pass []Column instead of Columns #551

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/destination/dml/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email
func buildColumnsUpdateFragment(c *columns.Columns, dialect sql.Dialect, skipDeleteCol bool) string {
func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect, skipDeleteCol bool) string {
var cols []string
for _, column := range c.GetColumns() {
for _, column := range columns {
if column.ShouldSkip() {
continue
}
Expand Down
22 changes: 11 additions & 11 deletions lib/destination/dml/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestBuildColumnsUpdateFragment(t *testing.T) {
type testCase struct {
name string
columns columns.Columns
columns []columns.Column
expectedString string
dialect sql.Dialect
skipDeleteCol bool
Expand All @@ -23,15 +23,15 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {
fooBarCols := []string{"foo", "bar"}

var (
happyPathCols columns.Columns
stringAndToastCols columns.Columns
lastCaseColTypes columns.Columns
lastCaseEscapeTypes columns.Columns
happyPathCols []columns.Column
stringAndToastCols []columns.Column
lastCaseColTypes []columns.Column
lastCaseEscapeTypes []columns.Column
)
for _, col := range fooBarCols {
column := columns.NewColumn(col, typing.String)
column.ToastColumn = false
happyPathCols.AddColumn(column)
happyPathCols = append(happyPathCols, column)
}
for _, col := range fooBarCols {
var toastCol bool
Expand All @@ -41,7 +41,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {

column := columns.NewColumn(col, typing.String)
column.ToastColumn = toastCol
stringAndToastCols.AddColumn(column)
stringAndToastCols = append(stringAndToastCols, column)
}

lastCaseCols := []string{"a1", "b2", "c3"}
Expand All @@ -58,7 +58,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {

column := columns.NewColumn(lastCaseCol, kd)
column.ToastColumn = toast
lastCaseColTypes.AddColumn(column)
lastCaseColTypes = append(lastCaseColTypes, column)
}

lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"}
Expand All @@ -78,10 +78,10 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {

column := columns.NewColumn(lastCaseColEsc, kd)
column.ToastColumn = toast
lastCaseEscapeTypes.AddColumn(column)
lastCaseEscapeTypes = append(lastCaseEscapeTypes, column)
}

lastCaseEscapeTypes.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))
lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

key := `{"key":"__debezium_unavailable_value"}`
testCases := []testCase{
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {
}

for _, _testCase := range testCases {
actualQuery := buildColumnsUpdateFragment(&_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol)
actualQuery := buildColumnsUpdateFragment(_testCase.columns, _testCase.dialect, _testCase.skipDeleteCol)
assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name)
}
}
12 changes: 6 additions & 6 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, false),
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false),
// FROM table (temp) WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause,
),
Expand Down Expand Up @@ -166,7 +166,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect, true),
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true),
// FROM staging WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
),
Expand Down Expand Up @@ -244,7 +244,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "),
// Update + Soft Deletion
idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false),
idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand All @@ -270,7 +270,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
// Delete
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down Expand Up @@ -308,7 +308,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "),
// Update + Soft Deletion
idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, false),
idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, false),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand All @@ -335,7 +335,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
// Delete
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect, true),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(m.Columns.GetColumns(), m.Dialect, true),
// Insert
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Expand Down