Skip to content

Commit

Permalink
Adding tests to merge.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 9, 2024
1 parent 48414e4 commit 1704857
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 2 deletions.
2 changes: 0 additions & 2 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ func (d DatabricksDialect) BuildMergeQueries(
softDelete bool,
_ bool,
) ([]string, error) {
// TODO: Add tests.

// Build the base equality condition for the MERGE query
equalitySQLParts := sql.BuildColumnComparisons(primaryKeys, constants.TargetAlias, constants.StagingAlias, sql.Equal, d)
if len(additionalEqualityStrings) > 0 {
Expand Down
162 changes: 162 additions & 0 deletions clients/databricks/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package dialect

import (
"fmt"
"sort"
"strings"
"testing"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -122,5 +127,162 @@ func TestDatabricksDialect_BuildDedupeQueries(t *testing.T) {
queries[1])
assert.Equal(t, "INSERT INTO {TARGET} SELECT * FROM {STAGING}", queries[2])
}
}

func buildColumns(colTypesMap map[string]typing.KindDetails) *columns.Columns {
var colNames []string
for colName := range colTypesMap {
colNames = append(colNames, colName)
}
// Sort the column names alphabetically to ensure deterministic order
sort.Strings(colNames)

var cols columns.Columns
for _, colName := range colNames {
cols.AddColumn(columns.NewColumn(colName, colTypesMap[colName]))
}

return &cols
}

func TestDatabricksDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
fqTable := "database.schema.table"
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
"bar": typing.String,
"updated_at": typing.ETime,
constants.DeleteColumnMarker: typing.Boolean,
constants.OnlySetDeleteColumnMarker: typing.Boolean,
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

{
statements, err := DatabricksDialect{}.BuildMergeQueries(
fakeTableID,
fqTable,
[]columns.Column{columns.NewColumn("id", typing.Invalid)},
nil,
_cols.ValidColumns(),
true,
false,
)
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t,
[]string{
"MERGE INTO database.schema.table tgt USING database.schema.table stg ON tgt.`id` = stg.`id`",
"WHEN MATCHED AND IFNULL(stg.`__artie_only_set_delete`, false) = false THEN UPDATE SET `__artie_delete`=stg.`__artie_delete`,`bar`=stg.`bar`,`id`=stg.`id`,`updated_at`=stg.`updated_at`",
"WHEN MATCHED AND IFNULL(stg.`__artie_only_set_delete`, false) = true THEN UPDATE SET `__artie_delete`=stg.`__artie_delete`",
"WHEN NOT MATCHED THEN INSERT (`__artie_delete`,`bar`,`id`,`updated_at`) VALUES (stg.`__artie_delete`,stg.`bar`,stg.`id`,stg.`updated_at`);",
},
strings.Split(strings.TrimSpace(statements[0]), "\n"))
}
}

func TestDatabricksDialect_BuildMergeQueries(t *testing.T) {
fqTable := "database.schema.table"
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
"bar": typing.String,
"updated_at": typing.String,
"start": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
constants.OnlySetDeleteColumnMarker: typing.Boolean,
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := DatabricksDialect{}.BuildMergeQueries(
fakeTableID,
fqTable,
[]columns.Column{columns.NewColumn("id", typing.Invalid)},
nil,
_cols.ValidColumns(),
false,
false,
)
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t,
[]string{
"MERGE INTO database.schema.table tgt USING database.schema.table stg ON tgt.`id` = stg.`id`",
"WHEN MATCHED AND stg.`__artie_delete` THEN DELETE", "WHEN MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN UPDATE SET `bar`=stg.`bar`,`id`=stg.`id`,`start`=stg.`start`,`updated_at`=stg.`updated_at`",
"WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`bar`,`id`,`start`,`updated_at`) VALUES (stg.`bar`,stg.`id`,stg.`start`,stg.`updated_at`);",
},
strings.Split(strings.TrimSpace(statements[0]), "\n"))
}

func TestDatabricksDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
fqTable := "database.schema.table"
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
"another_id": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
constants.OnlySetDeleteColumnMarker: typing.Boolean,
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := DatabricksDialect{}.BuildMergeQueries(
fakeTableID,
fqTable,
[]columns.Column{
columns.NewColumn("id", typing.Invalid),
columns.NewColumn("another_id", typing.Invalid),
},
nil,
_cols.ValidColumns(),
false,
false,
)
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t,
[]string{
"MERGE INTO database.schema.table tgt USING database.schema.table stg ON tgt.`id` = stg.`id` AND tgt.`another_id` = stg.`another_id`",
"WHEN MATCHED AND stg.`__artie_delete` THEN DELETE", "WHEN MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN UPDATE SET `another_id`=stg.`another_id`,`id`=stg.`id`",
"WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`another_id`,`id`) VALUES (stg.`another_id`,stg.`id`);",
},
strings.Split(strings.TrimSpace(statements[0]), "\n"))
}

func TestDatabricksDialect_BuildMergeQueries_EscapePrimaryKeys(t *testing.T) {
fqTable := "database.schema.table"
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
"group": typing.String,
"updated_at": typing.String,
"start": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
constants.OnlySetDeleteColumnMarker: typing.Boolean,
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := DatabricksDialect{}.BuildMergeQueries(
fakeTableID,
fqTable,
[]columns.Column{
columns.NewColumn("id", typing.Invalid),
columns.NewColumn("group", typing.Invalid),
},
nil,
_cols.ValidColumns(),
false,
false,
)
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t,
[]string{
"MERGE INTO database.schema.table tgt USING database.schema.table stg ON tgt.`id` = stg.`id` AND tgt.`group` = stg.`group`",
"WHEN MATCHED AND stg.`__artie_delete` THEN DELETE", "WHEN MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN UPDATE SET `group`=stg.`group`,`id`=stg.`id`,`start`=stg.`start`,`updated_at`=stg.`updated_at`",
"WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`group`,`id`,`start`,`updated_at`) VALUES (stg.`group`,stg.`id`,stg.`start`,stg.`updated_at`);",
},
strings.Split(strings.TrimSpace(statements[0]), "\n"))
}

0 comments on commit 1704857

Please sign in to comment.