Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 8, 2024
1 parent b13b623 commit aa7ce50
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 22 deletions.
27 changes: 11 additions & 16 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package snowflake

import (
"context"
"fmt"
"strconv"
"strings"
Expand All @@ -9,19 +10,16 @@ import (

"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/kafkalib/partition"
"github.com/artie-labs/transfer/lib/sql"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/kafkalib/partition"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
)

func (s *SnowflakeTestSuite) identifierFor(tableData *optimization.TableData) sql.TableIdentifier {
Expand Down Expand Up @@ -80,7 +78,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {

s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, nil, false, true))

err := s.stageStore.Merge(tableData)
err := s.stageStore.Merge(context.Background(), tableData)
_col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name")
assert.True(s.T(), isOk)
assert.Equal(s.T(), _col.KindDetails, typing.String)
Expand Down Expand Up @@ -126,7 +124,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() {

s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, nil, false, true))

assert.NoError(s.T(), s.stageStore.Merge(tableData))
assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount())
}

Expand Down Expand Up @@ -174,7 +172,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() {
tableID := s.identifierFor(tableData)
fqName := tableID.FullyQualifiedName()
s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, true))
err := s.stageStore.Merge(tableData)
err := s.stageStore.Merge(context.Background(), tableData)
assert.Nil(s.T(), err)
s.fakeStageStore.ExecReturns(nil, nil)
// CREATE TABLE IF NOT EXISTS customer.public.orders___artie_Mwv9YADmRy (id int,name string,__artie_delete boolean,created_at timestamp_tz) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) COMMENT='expires:2023-06-27 11:54:03 UTC'
Expand Down Expand Up @@ -258,8 +256,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
_config := types.NewDwhTableConfig(&sflkCols, nil, false, true)
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), _config)

err := s.stageStore.Merge(tableData)
assert.Nil(s.T(), err)
assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
s.fakeStageStore.ExecReturns(nil, nil)
assert.Equal(s.T(), s.fakeStageStore.ExecCallCount(), 5, "called merge")

Expand All @@ -282,8 +279,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
break
}

err = s.stageStore.Merge(tableData)
assert.NoError(s.T(), err)
assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
s.fakeStageStore.ExecReturns(nil, nil)
assert.Equal(s.T(), s.fakeStageStore.ExecCallCount(), 10, "called merge again")

Expand All @@ -294,8 +290,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {

func (s *SnowflakeTestSuite) TestExecuteMergeExitEarly() {
tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{}, "foo")
err := s.stageStore.Merge(tableData)
assert.Nil(s.T(), err)
assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
}

func (s *SnowflakeTestSuite) TestStore_AdditionalEqualityStrings() {
Expand Down
11 changes: 5 additions & 6 deletions clients/snowflake/staging_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package snowflake

import (
"context"
"encoding/csv"
"fmt"
"io"
Expand All @@ -9,15 +10,13 @@ import (

"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/destination/types"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -157,7 +156,7 @@ func (s *SnowflakeTestSuite) TestPrepareTempTable() {
sflkTc := s.stageStore.GetConfigMap().TableConfigCache(tempTableID)

{
assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(tableData, sflkTc, tempTableID, tempTableID, types.AdditionalSettings{}, true))
assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(context.Background(), tableData, sflkTc, tempTableID, tempTableID, types.AdditionalSettings{}, true))
assert.Equal(s.T(), 3, s.fakeStageStore.ExecCallCount())

// First call is to create the temp table
Expand All @@ -179,7 +178,7 @@ func (s *SnowflakeTestSuite) TestPrepareTempTable() {
}
{
// Don't create the temporary table.
assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(tableData, sflkTc, tempTableID, tempTableID, types.AdditionalSettings{}, false))
assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(context.Background(), tableData, sflkTc, tempTableID, tempTableID, types.AdditionalSettings{}, false))
assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount())
}

Expand Down

0 comments on commit aa7ce50

Please sign in to comment.