Skip to content

Commit

Permalink
Clean up more snowflake dialect tests (#793)
Browse files Browse the repository at this point in the history
  • Loading branch information
danafallon authored Jul 16, 2024
1 parent 315ace3 commit 6009217
Showing 1 changed file with 45 additions and 112 deletions.
157 changes: 45 additions & 112 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package dialect
import (
"fmt"
"sort"
"strings"
"testing"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/mocks"
Expand Down Expand Up @@ -258,25 +256,31 @@ func TestSnowflakeDialect_BuildIsNotToastValueExpression(t *testing.T) {
)
}

// buildColumns converts a column-name-to-type map into a *columns.Columns.
// Go map iteration order is not deterministic, so the names are sorted
// alphabetically first and the columns are added in that sorted order,
// giving the same column order on every run.
func buildColumns(colTypesMap map[string]typing.KindDetails) *columns.Columns {
	// The number of names is known up front, so size the slice once.
	names := make([]string, 0, len(colTypesMap))
	for name := range colTypesMap {
		names = append(names, name)
	}
	sort.Strings(names)

	result := new(columns.Columns)
	for i := range names {
		result.AddColumn(columns.NewColumn(names[i], colTypesMap[names[i]]))
	}
	return result
}

func TestSnowflakeDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
// No idempotent key
fqTable := "database.schema.table"
cols := map[string]typing.KindDetails{
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
"bar": typing.String,
"updated_at": typing.ETime,
constants.DeleteColumnMarker: typing.Boolean,
}

colNames := []string{}
for colName := range cols {
colNames = append(colNames, colName)
}
sort.Strings(colNames)
var _cols columns.Columns
for _, colName := range colNames {
_cols.AddColumn(columns.NewColumn(colName, cols[colName]))
}
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)
Expand Down Expand Up @@ -322,38 +326,20 @@ WHEN NOT MATCHED THEN INSERT ("__ARTIE_DELETE","BAR","ID","UPDATED_AT") VALUES (
func TestSnowflakeDialect_BuildMergeQueries(t *testing.T) {
// No idempotent key
fqTable := "database.schema.table"
colToTypes := map[string]typing.KindDetails{
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
"bar": typing.String,
"updated_at": typing.String,
"start": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
}

// This feels a bit round about, but this is because iterating over a map is not deterministic.
cols := []string{"id", "bar", "updated_at", "start", constants.DeleteColumnMarker}
var _cols columns.Columns
for _, col := range cols {
_cols.AddColumn(columns.NewColumn(col, colToTypes[col]))
}

dateValue := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
tableValues := []string{
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "1", "456", "foo", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "2", "bb", "bar", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "3", "dd", "world", dateValue.Round(0).UTC()),
}

// select stg.foo, stg.bar from (values (12, 34), (44, 55)) as cc(foo, bar);
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := SnowflakeDialect{}.BuildMergeQueries(
fakeTableID,
subQuery,
fqTable,
"",
[]columns.Column{columns.NewColumn("id", typing.Invalid)},
nil,
Expand All @@ -364,42 +350,25 @@ func TestSnowflakeDialect_BuildMergeQueries(t *testing.T) {
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,start,__artie_delete from (values ('1', '456', 'foo', '2001-02-03 04:05:06 +0000 UTC', false),('2', 'bb', 'bar', '2001-02-03 04:05:06 +0000 UTC', false),('3', 'dd', 'world', '2001-02-03 04:05:06 +0000 UTC', false)) as _tbl(id,bar,updated_at,start,__artie_delete) ) AS stg ON tgt."ID" = stg."ID"
MERGE INTO database.schema.table tgt USING ( database.schema.table ) AS stg ON tgt."ID" = stg."ID"
WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "ID"=stg."ID","BAR"=stg."BAR","UPDATED_AT"=stg."UPDATED_AT","START"=stg."START"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","BAR","UPDATED_AT","START") VALUES (stg."ID",stg."BAR",stg."UPDATED_AT",stg."START");`, statements[0])
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "BAR"=stg."BAR","ID"=stg."ID","START"=stg."START","UPDATED_AT"=stg."UPDATED_AT"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("BAR","ID","START","UPDATED_AT") VALUES (stg."BAR",stg."ID",stg."START",stg."UPDATED_AT");`, statements[0])
}

func TestSnowflakeDialect_BuildMergeQueries_IdempotentKey(t *testing.T) {
fqTable := "database.schema.table"
cols := []string{
"id",
"bar",
"updated_at",
constants.DeleteColumnMarker,
}

dateValue := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
tableValues := []string{
fmt.Sprintf("('%s', '%s', '%v', false)", "1", "456", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', false)", "2", "bb", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', false)", "3", "dd", dateValue.Round(0).UTC()),
}

// select stg.foo, stg.bar from (values (12, 34), (44, 55)) as cc(foo, bar);
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

var _cols columns.Columns
_cols.AddColumn(columns.NewColumn("id", typing.String))
_cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := SnowflakeDialect{}.BuildMergeQueries(
fakeTableID,
subQuery,
fqTable,
"updated_at",
[]columns.Column{columns.NewColumn("id", typing.Invalid)},
nil,
Expand All @@ -410,44 +379,26 @@ func TestSnowflakeDialect_BuildMergeQueries_IdempotentKey(t *testing.T) {
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,__artie_delete from (values ('1', '456', '2001-02-03 04:05:06 +0000 UTC', false),('2', 'bb', '2001-02-03 04:05:06 +0000 UTC', false),('3', 'dd', '2001-02-03 04:05:06 +0000 UTC', false)) as _tbl(id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID"
MERGE INTO database.schema.table tgt USING ( database.schema.table ) AS stg ON tgt."ID" = stg."ID"
WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false AND stg.updated_at >= tgt.updated_at THEN UPDATE SET "ID"=stg."ID"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID") VALUES (stg."ID");`, statements[0])
}

func TestSnowflakeDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
fqTable := "database.schema.table"
cols := []string{
"id",
"another_id",
"bar",
"updated_at",
constants.DeleteColumnMarker,
}

dateValue := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
tableValues := []string{
fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "1", "3", "456", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "2", "2", "bb", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "3", "1", "dd", dateValue.Round(0).UTC()),
}

// select stg.foo, stg.bar from (values (12, 34), (44, 55)) as cc(foo, bar);
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

var _cols columns.Columns
_cols.AddColumn(columns.NewColumn("id", typing.String))
_cols.AddColumn(columns.NewColumn("another_id", typing.String))
_cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
"another_id": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := SnowflakeDialect{}.BuildMergeQueries(
fakeTableID,
subQuery,
fqTable,
"updated_at",
[]columns.Column{
columns.NewColumn("id", typing.Invalid),
Expand All @@ -461,47 +412,29 @@ func TestSnowflakeDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,another_id,bar,updated_at,__artie_delete from (values ('1', '3', '456', '2001-02-03 04:05:06 +0000 UTC', false),('2', '2', 'bb', '2001-02-03 04:05:06 +0000 UTC', false),('3', '1', 'dd', '2001-02-03 04:05:06 +0000 UTC', false)) as _tbl(id,another_id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID" AND tgt."ANOTHER_ID" = stg."ANOTHER_ID"
MERGE INTO database.schema.table tgt USING ( database.schema.table ) AS stg ON tgt."ID" = stg."ID" AND tgt."ANOTHER_ID" = stg."ANOTHER_ID"
WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false AND stg.updated_at >= tgt.updated_at THEN UPDATE SET "ID"=stg."ID","ANOTHER_ID"=stg."ANOTHER_ID"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","ANOTHER_ID") VALUES (stg."ID",stg."ANOTHER_ID");`, statements[0])
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false AND stg.updated_at >= tgt.updated_at THEN UPDATE SET "ANOTHER_ID"=stg."ANOTHER_ID","ID"=stg."ID"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ANOTHER_ID","ID") VALUES (stg."ANOTHER_ID",stg."ID");`, statements[0])
}

func TestSnowflakeDialect_BuildMergeQueries_EscapePrimaryKeys(t *testing.T) {
// No idempotent key
fqTable := "database.schema.table"
colToTypes := map[string]typing.KindDetails{
_cols := buildColumns(map[string]typing.KindDetails{
"id": typing.String,
"group": typing.String,
"updated_at": typing.String,
"start": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
}

// This feels a bit round about, but this is because iterating over a map is not deterministic.
cols := []string{"id", "group", "updated_at", "start", constants.DeleteColumnMarker}
var _cols columns.Columns
for _, col := range cols {
_cols.AddColumn(columns.NewColumn(col, colToTypes[col]))
}

dateValue := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
tableValues := []string{
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "1", "456", "foo", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "2", "bb", "bar", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "3", "dd", "world", dateValue.Round(0).UTC()),
}

// select stg.foo, stg.bar from (values (12, 34), (44, 55)) as cc(foo, bar);
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))
})

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := SnowflakeDialect{}.BuildMergeQueries(
fakeTableID,
subQuery,
fqTable,
"",
[]columns.Column{
columns.NewColumn("id", typing.Invalid),
Expand All @@ -515,8 +448,8 @@ func TestSnowflakeDialect_BuildMergeQueries_EscapePrimaryKeys(t *testing.T) {
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,group,updated_at,start,__artie_delete from (values ('1', '456', 'foo', '2001-02-03 04:05:06 +0000 UTC', false),('2', 'bb', 'bar', '2001-02-03 04:05:06 +0000 UTC', false),('3', 'dd', 'world', '2001-02-03 04:05:06 +0000 UTC', false)) as _tbl(id,group,updated_at,start,__artie_delete) ) AS stg ON tgt."ID" = stg."ID" AND tgt."GROUP" = stg."GROUP"
MERGE INTO database.schema.table tgt USING ( database.schema.table ) AS stg ON tgt."ID" = stg."ID" AND tgt."GROUP" = stg."GROUP"
WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "ID"=stg."ID","GROUP"=stg."GROUP","UPDATED_AT"=stg."UPDATED_AT","START"=stg."START"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","GROUP","UPDATED_AT","START") VALUES (stg."ID",stg."GROUP",stg."UPDATED_AT",stg."START");`, statements[0])
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "GROUP"=stg."GROUP","ID"=stg."ID","START"=stg."START","UPDATED_AT"=stg."UPDATED_AT"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("GROUP","ID","START","UPDATED_AT") VALUES (stg."GROUP",stg."ID",stg."START",stg."UPDATED_AT");`, statements[0])
}

0 comments on commit 6009217

Please sign in to comment.