Skip to content

Commit

Permalink
Remove subquery from other snowflake dialect tests
Browse files Browse the repository at this point in the history
  • Loading branch information
danafallon committed Jul 16, 2024
1 parent 315ace3 commit e3bfae8
Showing 1 changed file with 21 additions and 67 deletions.
88 changes: 21 additions & 67 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package dialect
import (
"fmt"
"sort"
"strings"
"testing"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/mocks"
Expand Down Expand Up @@ -337,23 +335,12 @@ func TestSnowflakeDialect_BuildMergeQueries(t *testing.T) {
_cols.AddColumn(columns.NewColumn(col, colToTypes[col]))
}

dateValue := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
tableValues := []string{
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "1", "456", "foo", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "2", "bb", "bar", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "3", "dd", "world", dateValue.Round(0).UTC()),
}

// select stg.foo, stg.bar from (values (12, 34), (44, 55)) as cc(foo, bar);
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := SnowflakeDialect{}.BuildMergeQueries(
fakeTableID,
subQuery,
fqTable,
"",
[]columns.Column{columns.NewColumn("id", typing.Invalid)},
nil,
Expand All @@ -364,31 +351,20 @@ func TestSnowflakeDialect_BuildMergeQueries(t *testing.T) {
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,start,__artie_delete from (values ('1', '456', 'foo', '2001-02-03 04:05:06 +0000 UTC', false),('2', 'bb', 'bar', '2001-02-03 04:05:06 +0000 UTC', false),('3', 'dd', 'world', '2001-02-03 04:05:06 +0000 UTC', false)) as _tbl(id,bar,updated_at,start,__artie_delete) ) AS stg ON tgt."ID" = stg."ID"
MERGE INTO database.schema.table tgt USING ( database.schema.table ) AS stg ON tgt."ID" = stg."ID"
WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "ID"=stg."ID","BAR"=stg."BAR","UPDATED_AT"=stg."UPDATED_AT","START"=stg."START"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","BAR","UPDATED_AT","START") VALUES (stg."ID",stg."BAR",stg."UPDATED_AT",stg."START");`, statements[0])
}

func TestSnowflakeDialect_BuildMergeQueries_IdempotentKey(t *testing.T) {
fqTable := "database.schema.table"
cols := []string{
"id",
"bar",
"updated_at",
constants.DeleteColumnMarker,
}

dateValue := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
tableValues := []string{
fmt.Sprintf("('%s', '%s', '%v', false)", "1", "456", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', false)", "2", "bb", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', false)", "3", "dd", dateValue.Round(0).UTC()),
}

// select stg.foo, stg.bar from (values (12, 34), (44, 55)) as cc(foo, bar);
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))
// cols := []string{
// "id",
// "bar",
// "updated_at",
// constants.DeleteColumnMarker,
// }

var _cols columns.Columns
_cols.AddColumn(columns.NewColumn("id", typing.String))
Expand All @@ -399,7 +375,7 @@ func TestSnowflakeDialect_BuildMergeQueries_IdempotentKey(t *testing.T) {

statements, err := SnowflakeDialect{}.BuildMergeQueries(
fakeTableID,
subQuery,
fqTable,
"updated_at",
[]columns.Column{columns.NewColumn("id", typing.Invalid)},
nil,
Expand All @@ -410,32 +386,21 @@ func TestSnowflakeDialect_BuildMergeQueries_IdempotentKey(t *testing.T) {
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,__artie_delete from (values ('1', '456', '2001-02-03 04:05:06 +0000 UTC', false),('2', 'bb', '2001-02-03 04:05:06 +0000 UTC', false),('3', 'dd', '2001-02-03 04:05:06 +0000 UTC', false)) as _tbl(id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID"
MERGE INTO database.schema.table tgt USING ( database.schema.table ) AS stg ON tgt."ID" = stg."ID"
WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false AND stg.updated_at >= tgt.updated_at THEN UPDATE SET "ID"=stg."ID"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID") VALUES (stg."ID");`, statements[0])
}

func TestSnowflakeDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
fqTable := "database.schema.table"
cols := []string{
"id",
"another_id",
"bar",
"updated_at",
constants.DeleteColumnMarker,
}

dateValue := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
tableValues := []string{
fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "1", "3", "456", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "2", "2", "bb", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "3", "1", "dd", dateValue.Round(0).UTC()),
}

// select stg.foo, stg.bar from (values (12, 34), (44, 55)) as cc(foo, bar);
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))
// cols := []string{
// "id",
// "another_id",
// "bar",
// "updated_at",
// constants.DeleteColumnMarker,
// }

var _cols columns.Columns
_cols.AddColumn(columns.NewColumn("id", typing.String))
Expand All @@ -447,7 +412,7 @@ func TestSnowflakeDialect_BuildMergeQueries_CompositeKey(t *testing.T) {

statements, err := SnowflakeDialect{}.BuildMergeQueries(
fakeTableID,
subQuery,
fqTable,
"updated_at",
[]columns.Column{
columns.NewColumn("id", typing.Invalid),
Expand All @@ -461,7 +426,7 @@ func TestSnowflakeDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,another_id,bar,updated_at,__artie_delete from (values ('1', '3', '456', '2001-02-03 04:05:06 +0000 UTC', false),('2', '2', 'bb', '2001-02-03 04:05:06 +0000 UTC', false),('3', '1', 'dd', '2001-02-03 04:05:06 +0000 UTC', false)) as _tbl(id,another_id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID" AND tgt."ANOTHER_ID" = stg."ANOTHER_ID"
MERGE INTO database.schema.table tgt USING ( database.schema.table ) AS stg ON tgt."ID" = stg."ID" AND tgt."ANOTHER_ID" = stg."ANOTHER_ID"
WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false AND stg.updated_at >= tgt.updated_at THEN UPDATE SET "ID"=stg."ID","ANOTHER_ID"=stg."ANOTHER_ID"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","ANOTHER_ID") VALUES (stg."ID",stg."ANOTHER_ID");`, statements[0])
Expand All @@ -485,23 +450,12 @@ func TestSnowflakeDialect_BuildMergeQueries_EscapePrimaryKeys(t *testing.T) {
_cols.AddColumn(columns.NewColumn(col, colToTypes[col]))
}

dateValue := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
tableValues := []string{
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "1", "456", "foo", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "2", "bb", "bar", dateValue.Round(0).UTC()),
fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "3", "dd", "world", dateValue.Round(0).UTC()),
}

// select stg.foo, stg.bar from (values (12, 34), (44, 55)) as cc(foo, bar);
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns(fqTable)

statements, err := SnowflakeDialect{}.BuildMergeQueries(
fakeTableID,
subQuery,
fqTable,
"",
[]columns.Column{
columns.NewColumn("id", typing.Invalid),
Expand All @@ -515,7 +469,7 @@ func TestSnowflakeDialect_BuildMergeQueries_EscapePrimaryKeys(t *testing.T) {
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,group,updated_at,start,__artie_delete from (values ('1', '456', 'foo', '2001-02-03 04:05:06 +0000 UTC', false),('2', 'bb', 'bar', '2001-02-03 04:05:06 +0000 UTC', false),('3', 'dd', 'world', '2001-02-03 04:05:06 +0000 UTC', false)) as _tbl(id,group,updated_at,start,__artie_delete) ) AS stg ON tgt."ID" = stg."ID" AND tgt."GROUP" = stg."GROUP"
MERGE INTO database.schema.table tgt USING ( database.schema.table ) AS stg ON tgt."ID" = stg."ID" AND tgt."GROUP" = stg."GROUP"
WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "ID"=stg."ID","GROUP"=stg."GROUP","UPDATED_AT"=stg."UPDATED_AT","START"=stg."START"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","GROUP","UPDATED_AT","START") VALUES (stg."ID",stg."GROUP",stg."UPDATED_AT",stg."START");`, statements[0])
Expand Down

0 comments on commit e3bfae8

Please sign in to comment.