Skip to content

Commit

Permalink
[dml] Move removeDeleteColumnMarker
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 3, 2024
1 parent c639579 commit 1e777fd
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 45 deletions.
7 changes: 7 additions & 0 deletions lib/destination/dml/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dml

import (
"fmt"
"slices"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
Expand All @@ -18,6 +19,12 @@ func quoteColumns(cols []columns.Column, dialect sql.Dialect) []string {
return result
}

func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) {
origLength := len(cols)
cols = slices.DeleteFunc(cols, func(col columns.Column) bool { return col.Name() == constants.DeleteColumnMarker })
return cols, len(cols) != origLength
}

// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email
// NOTE: This should only be used with valid columns.
func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect) string {
Expand Down
38 changes: 38 additions & 0 deletions lib/destination/dml/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,44 @@ func TestQuoteColumns(t *testing.T) {
assert.Equal(t, []string{`"A"`, `"B"`}, quoteColumns(cols, sql.SnowflakeDialect{}))
}

func TestRemoveDeleteColumnMarker(t *testing.T) {
col1 := columns.NewColumn("a", typing.Invalid)
col2 := columns.NewColumn("b", typing.Invalid)
col3 := columns.NewColumn("c", typing.Invalid)
deleteColumnMarkerCol := columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid)

{
result, removed := removeDeleteColumnMarker([]columns.Column{})
assert.Empty(t, result)
assert.False(t, removed)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{col1})
assert.Equal(t, []columns.Column{col1}, result)
assert.False(t, removed)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{col1, col2})
assert.Equal(t, []columns.Column{col1, col2}, result)
assert.False(t, removed)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{deleteColumnMarkerCol})
assert.True(t, removed)
assert.Empty(t, result)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2})
assert.True(t, removed)
assert.Equal(t, []columns.Column{col1, col2}, result)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2, deleteColumnMarkerCol, col3})
assert.True(t, removed)
assert.Equal(t, []columns.Column{col1, col2, col3}, result)
}
}

func TestBuildColumnsUpdateFragment(t *testing.T) {
type testCase struct {
name string
Expand Down
7 changes: 0 additions & 7 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dml
import (
"errors"
"fmt"
"slices"
"strings"

"github.com/artie-labs/transfer/lib/array"
Expand Down Expand Up @@ -71,12 +70,6 @@ func (m *MergeArgument) Valid() error {
return nil
}

func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) {
origLength := len(cols)
cols = slices.DeleteFunc(cols, func(col columns.Column) bool { return col.Name() == constants.DeleteColumnMarker })
return cols, len(cols) != origLength
}

func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equalitySQLParts []string) string {
return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`,
// insert into target (col1, col2, col3)
Expand Down
38 changes: 0 additions & 38 deletions lib/destination/dml/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,44 +32,6 @@ func (m MockTableIdentifier) FullyQualifiedName() string {
return m.fqName
}

func TestRemoveDeleteColumnMarker(t *testing.T) {
col1 := columns.NewColumn("a", typing.Invalid)
col2 := columns.NewColumn("b", typing.Invalid)
col3 := columns.NewColumn("c", typing.Invalid)
deleteColumnMarkerCol := columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid)

{
result, removed := removeDeleteColumnMarker([]columns.Column{})
assert.Empty(t, result)
assert.False(t, removed)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{col1})
assert.Equal(t, []columns.Column{col1}, result)
assert.False(t, removed)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{col1, col2})
assert.Equal(t, []columns.Column{col1, col2}, result)
assert.False(t, removed)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{deleteColumnMarkerCol})
assert.True(t, removed)
assert.Empty(t, result)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2})
assert.True(t, removed)
assert.Equal(t, []columns.Column{col1, col2}, result)
}
{
result, removed := removeDeleteColumnMarker([]columns.Column{col1, deleteColumnMarkerCol, col2, deleteColumnMarkerCol, col3})
assert.True(t, removed)
assert.Equal(t, []columns.Column{col1, col2, col3}, result)
}
}

func TestMergeStatementSoftDelete(t *testing.T) {
// No idempotent key
fqTable := "database.schema.table"
Expand Down

0 comments on commit 1e777fd

Please sign in to comment.