From a628ac85ca477f629816ac6308c2eb5c6ddaee79 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 9 Oct 2024 21:53:05 -0700 Subject: [PATCH] [Databricks] Adding tests to merge. (#959) --- clients/databricks/dialect/dialect.go | 2 - clients/databricks/dialect/dialect_test.go | 162 +++++++++++++++++++++ 2 files changed, 162 insertions(+), 2 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 43a25b71d..841fced92 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -94,8 +94,6 @@ func (d DatabricksDialect) BuildMergeQueries( softDelete bool, _ bool, ) ([]string, error) { - // TODO: Add tests. - // Build the base equality condition for the MERGE query equalitySQLParts := sql.BuildColumnComparisons(primaryKeys, constants.TargetAlias, constants.StagingAlias, sql.Equal, d) if len(additionalEqualityStrings) > 0 { diff --git a/clients/databricks/dialect/dialect_test.go b/clients/databricks/dialect/dialect_test.go index 35b6b078a..948724e54 100644 --- a/clients/databricks/dialect/dialect_test.go +++ b/clients/databricks/dialect/dialect_test.go @@ -2,8 +2,13 @@ package dialect import ( "fmt" + "sort" + "strings" "testing" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/mocks" "github.com/stretchr/testify/assert" @@ -122,5 +127,162 @@ func TestDatabricksDialect_BuildDedupeQueries(t *testing.T) { queries[1]) assert.Equal(t, "INSERT INTO {TARGET} SELECT * FROM {STAGING}", queries[2]) } +} + +func buildColumns(colTypesMap map[string]typing.KindDetails) *columns.Columns { + var colNames []string + for colName := range colTypesMap { + colNames = append(colNames, colName) + } + // Sort the column names alphabetically to ensure deterministic order + sort.Strings(colNames) + + var cols columns.Columns + for _, colName := range colNames { + cols.AddColumn(columns.NewColumn(colName, colTypesMap[colName])) + } + + return &cols +} + +func TestDatabricksDialect_BuildMergeQueries_SoftDelete(t *testing.T) { + fqTable := "database.schema.table" + _cols := buildColumns(map[string]typing.KindDetails{ + "id": typing.String, + "bar": typing.String, + "updated_at": typing.ETime, + constants.DeleteColumnMarker: typing.Boolean, + constants.OnlySetDeleteColumnMarker: typing.Boolean, + }) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + + { + statements, err := DatabricksDialect{}.BuildMergeQueries( + fakeTableID, + fqTable, + []columns.Column{columns.NewColumn("id", typing.Invalid)}, + nil, + _cols.ValidColumns(), + true, + false, + ) + assert.Len(t, statements, 1) + assert.NoError(t, err) + assert.Equal(t, + []string{ + "MERGE INTO database.schema.table tgt USING database.schema.table stg ON tgt.`id` = stg.`id`", + "WHEN MATCHED AND IFNULL(stg.`__artie_only_set_delete`, false) = false THEN UPDATE SET `__artie_delete`=stg.`__artie_delete`,`bar`=stg.`bar`,`id`=stg.`id`,`updated_at`=stg.`updated_at`", + "WHEN MATCHED AND IFNULL(stg.`__artie_only_set_delete`, false) = true THEN UPDATE SET `__artie_delete`=stg.`__artie_delete`", + "WHEN NOT MATCHED THEN INSERT (`__artie_delete`,`bar`,`id`,`updated_at`) VALUES (stg.`__artie_delete`,stg.`bar`,stg.`id`,stg.`updated_at`);", + }, + strings.Split(strings.TrimSpace(statements[0]), "\n")) + } +} + +func TestDatabricksDialect_BuildMergeQueries(t *testing.T) { + fqTable := "database.schema.table" + _cols := buildColumns(map[string]typing.KindDetails{ + "id": typing.String, + "bar": typing.String, + "updated_at": typing.String, + "start": typing.String, + constants.DeleteColumnMarker: typing.Boolean, + constants.OnlySetDeleteColumnMarker: typing.Boolean, + }) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + + statements, err := DatabricksDialect{}.BuildMergeQueries( + fakeTableID, + fqTable, + []columns.Column{columns.NewColumn("id", typing.Invalid)}, + nil, + _cols.ValidColumns(), + false, + false, + ) + assert.Len(t, statements, 1) + assert.NoError(t, err) + assert.Equal(t, + []string{ + "MERGE INTO database.schema.table tgt USING database.schema.table stg ON tgt.`id` = stg.`id`", + "WHEN MATCHED AND stg.`__artie_delete` THEN DELETE", "WHEN MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN UPDATE SET `bar`=stg.`bar`,`id`=stg.`id`,`start`=stg.`start`,`updated_at`=stg.`updated_at`", + "WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`bar`,`id`,`start`,`updated_at`) VALUES (stg.`bar`,stg.`id`,stg.`start`,stg.`updated_at`);", + }, + strings.Split(strings.TrimSpace(statements[0]), "\n")) +} + +func TestDatabricksDialect_BuildMergeQueries_CompositeKey(t *testing.T) { + fqTable := "database.schema.table" + _cols := buildColumns(map[string]typing.KindDetails{ + "id": typing.String, + "another_id": typing.String, + constants.DeleteColumnMarker: typing.Boolean, + constants.OnlySetDeleteColumnMarker: typing.Boolean, + }) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + + statements, err := DatabricksDialect{}.BuildMergeQueries( + fakeTableID, + fqTable, + []columns.Column{ + columns.NewColumn("id", typing.Invalid), + columns.NewColumn("another_id", typing.Invalid), + }, + nil, + _cols.ValidColumns(), + false, + false, + ) + assert.Len(t, statements, 1) + assert.NoError(t, err) + assert.Equal(t, + []string{ + "MERGE INTO database.schema.table tgt USING database.schema.table stg ON tgt.`id` = stg.`id` AND tgt.`another_id` = stg.`another_id`", + "WHEN MATCHED AND stg.`__artie_delete` THEN DELETE", "WHEN MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN UPDATE SET `another_id`=stg.`another_id`,`id`=stg.`id`", + "WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`another_id`,`id`) VALUES (stg.`another_id`,stg.`id`);", + }, + strings.Split(strings.TrimSpace(statements[0]), "\n")) +} + +func TestDatabricksDialect_BuildMergeQueries_EscapePrimaryKeys(t *testing.T) { + fqTable := "database.schema.table" + _cols := buildColumns(map[string]typing.KindDetails{ + "id": typing.String, + "group": typing.String, + "updated_at": typing.String, + "start": typing.String, + constants.DeleteColumnMarker: typing.Boolean, + constants.OnlySetDeleteColumnMarker: typing.Boolean, + }) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + statements, err := DatabricksDialect{}.BuildMergeQueries( + fakeTableID, + fqTable, + []columns.Column{ + columns.NewColumn("id", typing.Invalid), + columns.NewColumn("group", typing.Invalid), + }, + nil, + _cols.ValidColumns(), + false, + false, + ) + assert.Len(t, statements, 1) + assert.NoError(t, err) + assert.Equal(t, + []string{ + "MERGE INTO database.schema.table tgt USING database.schema.table stg ON tgt.`id` = stg.`id` AND tgt.`group` = stg.`group`", + "WHEN MATCHED AND stg.`__artie_delete` THEN DELETE", "WHEN MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN UPDATE SET `group`=stg.`group`,`id`=stg.`id`,`start`=stg.`start`,`updated_at`=stg.`updated_at`", + "WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`group`,`id`,`start`,`updated_at`) VALUES (stg.`group`,stg.`id`,stg.`start`,stg.`updated_at`);", + }, + strings.Split(strings.TrimSpace(statements[0]), "\n")) }