From 2440919fb7aa5812b9c2d7d47402c0fe718c2e1b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 17 Sep 2024 11:12:19 -0700 Subject: [PATCH] [Snowflake] Address idle action item follow ups (#907) --- clients/snowflake/errors.go | 11 ------ clients/snowflake/errors_test.go | 34 ------------------- clients/snowflake/snowflake.go | 52 ++++++++++------------------- clients/snowflake/snowflake_test.go | 9 ++--- clients/snowflake/writes.go | 52 ++++------------------------- 5 files changed, 26 insertions(+), 132 deletions(-) delete mode 100644 clients/snowflake/errors.go delete mode 100644 clients/snowflake/errors_test.go diff --git a/clients/snowflake/errors.go b/clients/snowflake/errors.go deleted file mode 100644 index 5b26e3b44..000000000 --- a/clients/snowflake/errors.go +++ /dev/null @@ -1,11 +0,0 @@ -package snowflake - -import "strings" - -func IsAuthExpiredError(err error) bool { - if err == nil { - return false - } - - return strings.Contains(err.Error(), "Authentication token has expired") -} diff --git a/clients/snowflake/errors_test.go b/clients/snowflake/errors_test.go deleted file mode 100644 index 6fd8af129..000000000 --- a/clients/snowflake/errors_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package snowflake - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestAuthenticationExpirationErr(t *testing.T) { - type _tc struct { - err error - expected bool - } - - tcs := []_tc{ - { - err: fmt.Errorf("390114: Authentication token has expired. The user must authenticate again."), - expected: true, - }, - { - err: nil, - expected: false, - }, - { - err: fmt.Errorf("some random error"), - expected: false, - }, - } - - for idx, tc := range tcs { - assert.Equal(t, tc.expected, IsAuthExpiredError(tc.err), idx) - } -} diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index df8c6b0f3..95a3e8575 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -17,11 +17,8 @@ import ( "github.com/artie-labs/transfer/lib/typing" ) -const maxRetries = 10 - type Store struct { db.Store - testDB bool // Used for testing configMap *types.DwhToTablesConfigMap config config.Config } @@ -70,30 +67,6 @@ func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap { return s.configMap } -func (s *Store) reestablishConnection() error { - if s.testDB { - // Don't actually re-establish for tests. - return nil - } - - cfg, err := s.config.Snowflake.ToConfig() - if err != nil { - return fmt.Errorf("failed to get snowflake config: %w", err) - } - - dsn, err := gosnowflake.DSN(cfg) - if err != nil { - return fmt.Errorf("failed to get Snowflake DSN: %w", err) - } - - store, err := db.Open("snowflake", dsn) - if err != nil { - return err - } - s.Store = store - return nil -} - // Dedupe takes a table and will remove duplicates based on the primary key(s). // These queries are inspired and modified from: https://stackoverflow.com/a/71515946 func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { @@ -106,21 +79,30 @@ func LoadSnowflake(cfg config.Config, _store *db.Store) (*Store, error) { if _store != nil { // Used for tests. return &Store{ - testDB: true, configMap: &types.DwhToTablesConfigMap{}, config: cfg, - - Store: *_store, + Store: *_store, }, nil } - s := &Store{ - configMap: &types.DwhToTablesConfigMap{}, - config: cfg, + snowflakeCfg, err := cfg.Snowflake.ToConfig() + if err != nil { + return nil, fmt.Errorf("failed to get Snowflake config: %w", err) + } + + dsn, err := gosnowflake.DSN(snowflakeCfg) + if err != nil { + return nil, fmt.Errorf("failed to get Snowflake DSN: %w", err) } - if err := s.reestablishConnection(); err != nil { + store, err := db.Open("snowflake", dsn) + if err != nil { return nil, err } - return s, nil + + return &Store{ + configMap: &types.DwhToTablesConfigMap{}, + config: cfg, + Store: store, + }, nil } diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 21ed189de..8d8851af5 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -126,13 +126,8 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() { s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, nil, false, true)) - s.fakeStageStore.ExecReturnsOnCall(0, nil, fmt.Errorf("390114: Authentication token has expired. The user must authenticate again.")) - err := s.stageStore.Merge(tableData) - assert.NoError(s.T(), err, "transient errors like auth errors will be retried") - - // 5 regular ones and then 1 additional one to re-establish auth and another one for dropping the temporary table - baseline := 5 - assert.Equal(s.T(), baseline+2, s.fakeStageStore.ExecCallCount(), "called merge") + assert.NoError(s.T(), s.stageStore.Merge(tableData)) + assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount()) } func (s *SnowflakeTestSuite) TestExecuteMerge() { diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index c69e893f6..f99c6ad01 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -1,12 +1,9 @@ package snowflake import ( - "log/slog" - "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/logger" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -14,28 +11,10 @@ import ( ) func (s *Store) Append(tableData *optimization.TableData, _ bool) error { - var err error - for i := 0; i < maxRetries; i++ { - if i > 0 { - // TODO: remove - if IsAuthExpiredError(err) { - slog.Warn("Authentication has expired, will reload the Snowflake store and retry appending", slog.Any("err", err)) - if connErr := s.reestablishConnection(); connErr != nil { - // TODO: Remove this panic and return an error instead. Ensure the callers of [Append] handle this properly. - logger.Panic("Failed to reestablish connection", slog.Any("err", connErr)) - } - } else { - break - } - } - - // TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing. - err = shared.Append(s, tableData, types.AdditionalSettings{ - AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`, - }) - } - - return err + // TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing. + return shared.Append(s, tableData, types.AdditionalSettings{ + AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`, + }) } func (s *Store) additionalEqualityStrings(tableData *optimization.TableData) []string { @@ -47,24 +26,7 @@ func (s *Store) additionalEqualityStrings(tableData *optimization.TableData) []s } func (s *Store) Merge(tableData *optimization.TableData) error { - var err error - for i := 0; i < maxRetries; i++ { - if i > 0 { - // TODO: Remove - if IsAuthExpiredError(err) { - slog.Warn("Authentication has expired, will reload the Snowflake store and retry merging", slog.Any("err", err)) - if connErr := s.reestablishConnection(); connErr != nil { - // TODO: Remove this panic and return an error instead. Ensure the callers of [Merge] handle this properly. - logger.Panic("Failed to reestablish connection", slog.Any("err", connErr)) - } - } else { - break - } - } - - err = shared.Merge(s, tableData, types.MergeOpts{ - AdditionalEqualityStrings: s.additionalEqualityStrings(tableData), - }) - } - return err + return shared.Merge(s, tableData, types.MergeOpts{ + AdditionalEqualityStrings: s.additionalEqualityStrings(tableData), + }) }