diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 09e563c30..72c1de667 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -2,6 +2,7 @@ package dml import ( "fmt" + "slices" "strings" "github.com/artie-labs/transfer/lib/config/constants" @@ -18,6 +19,12 @@ func quoteColumns(cols []columns.Column, dialect sql.Dialect) []string { return result } +func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) { + origLength := len(cols) + cols = slices.DeleteFunc(cols, func(col columns.Column) bool { return col.Name() == constants.DeleteColumnMarker }) + return cols, len(cols) != origLength +} + // buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email // NOTE: This should only be used with valid columns. func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect) string { diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index 934ded1ff..679d215dd 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -20,6 +20,44 @@ func TestQuoteColumns(t *testing.T) { assert.Equal(t, []string{`"A"`, `"B"`}, quoteColumns(cols, sql.SnowflakeDialect{})) } +func TestRemoveDeleteColumnMarker(t *testing.T) { + col1 := columns.NewColumn("a", typing.Invalid) + col2 := columns.NewColumn("b", typing.Invalid) + col3 := columns.NewColumn("c", typing.Invalid) + deleteColumnMarkerCol := columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid) + + { + result, removed := removeDeleteColumnMarker([]columns.Column{}) + assert.Empty(t, result) + assert.False(t, removed) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{col1}) + assert.Equal(t, []columns.Column{col1}, result) + assert.False(t, removed) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{col1, col2}) + assert.Equal(t, []columns.Column{col1, col2}, result) + assert.False(t, removed) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{deleteColumnMarkerCol}) + assert.True(t, removed) + assert.Empty(t, result) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2}) + assert.True(t, removed) + assert.Equal(t, []columns.Column{col1, col2}, result) + } + { + result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2, deleteColumnMarkerCol, col3}) + assert.True(t, removed) + assert.Equal(t, []columns.Column{col1, col2, col3}, result) + } +} + func TestBuildColumnsUpdateFragment(t *testing.T) { type testCase struct { name string diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index d694bbad9..e3ccbf39f 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -3,7 +3,6 @@ package dml import ( "errors" "fmt" - "slices" "strings" "github.com/artie-labs/transfer/lib/array" @@ -71,12 +70,6 @@ func (m *MergeArgument) Valid() error { return nil } -func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) { - origLength := len(cols) - cols = slices.DeleteFunc(cols, func(col columns.Column) bool { return col.Name() == constants.DeleteColumnMarker }) - return cols, len(cols) != origLength -} - func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equalitySQLParts []string) string { return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, // insert into target (col1, col2, col3) diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 72b292be2..dc2195693 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -32,44 +32,6 @@ func (m MockTableIdentifier) FullyQualifiedName() string { return m.fqName } -func TestRemoveDeleteColumnMarker(t *testing.T) { - col1 := columns.NewColumn("a", typing.Invalid) - col2 := columns.NewColumn("b", typing.Invalid) - col3 := columns.NewColumn("c", typing.Invalid) - deleteColumnMarkerCol := columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid) - - { - result, removed := removeDeleteColumnMarker([]columns.Column{}) - assert.Empty(t, result) - assert.False(t, removed) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{col1}) - assert.Equal(t, []columns.Column{col1}, result) - assert.False(t, removed) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{col1, col2}) - assert.Equal(t, []columns.Column{col1, col2}, result) - assert.False(t, removed) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{deleteColumnMarkerCol}) - assert.True(t, removed) - assert.Empty(t, result) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2}) - assert.True(t, removed) - assert.Equal(t, []columns.Column{col1, col2}, result) - } - { - result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2, deleteColumnMarkerCol, col3}) - assert.True(t, removed) - assert.Equal(t, []columns.Column{col1, col2, col3}, result) - } -} - func TestMergeStatementSoftDelete(t *testing.T) { // No idempotent key fqTable := "database.schema.table"