Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 17, 2024
1 parent 665e1ae commit 8474127
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 123 deletions.
11 changes: 0 additions & 11 deletions clients/snowflake/errors.go

This file was deleted.

34 changes: 0 additions & 34 deletions clients/snowflake/errors_test.go

This file was deleted.

50 changes: 17 additions & 33 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/artie-labs/transfer/lib/typing"
)

const maxRetries = 10

type Store struct {
db.Store
testDB bool // Used for testing
Expand Down Expand Up @@ -70,30 +68,6 @@ func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap {
return s.configMap
}

func (s *Store) reestablishConnection() error {
if s.testDB {
// Don't actually re-establish for tests.
return nil
}

cfg, err := s.config.Snowflake.ToConfig()
if err != nil {
return fmt.Errorf("failed to get snowflake config: %w", err)
}

dsn, err := gosnowflake.DSN(cfg)
if err != nil {
return fmt.Errorf("failed to get Snowflake DSN: %w", err)
}

store, err := db.Open("snowflake", dsn)
if err != nil {
return err
}
s.Store = store
return nil
}

// Dedupe takes a table and will remove duplicates based on the primary key(s).
// These queries are inspired and modified from: https://stackoverflow.com/a/71515946
func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
Expand All @@ -109,18 +83,28 @@ func LoadSnowflake(cfg config.Config, _store *db.Store) (*Store, error) {
testDB: true,
configMap: &types.DwhToTablesConfigMap{},
config: cfg,

Store: *_store,
Store: *_store,
}, nil
}

s := &Store{
configMap: &types.DwhToTablesConfigMap{},
config: cfg,
snowflakeCfg, err := cfg.Snowflake.ToConfig()
if err != nil {
return nil, fmt.Errorf("failed to get Snowflake config: %w", err)
}

dsn, err := gosnowflake.DSN(snowflakeCfg)
if err != nil {
return nil, fmt.Errorf("failed to get Snowflake DSN: %w", err)
}

if err := s.reestablishConnection(); err != nil {
store, err := db.Open("snowflake", dsn)
if err != nil {
return nil, err
}
return s, nil

return &Store{
configMap: &types.DwhToTablesConfigMap{},
config: cfg,
Store: store,
}, nil
}
51 changes: 6 additions & 45 deletions clients/snowflake/writes.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,19 @@
package snowflake

import (
"log/slog"

"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (s *Store) Append(tableData *optimization.TableData, _ bool) error {
var err error
for i := 0; i < maxRetries; i++ {
if i > 0 {
// TODO: remove
if IsAuthExpiredError(err) {
slog.Warn("Authentication has expired, will reload the Snowflake store and retry appending", slog.Any("err", err))
if connErr := s.reestablishConnection(); connErr != nil {
// TODO: Remove this panic and return an error instead. Ensure the callers of [Append] handle this properly.
logger.Panic("Failed to reestablish connection", slog.Any("err", connErr))
}
} else {
break
}
}

// TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing.
err = shared.Append(s, tableData, types.AdditionalSettings{
AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`,
})
}

return err
return shared.Append(s, tableData, types.AdditionalSettings{
AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`,
})
}

func (s *Store) additionalEqualityStrings(tableData *optimization.TableData) []string {
Expand All @@ -47,24 +25,7 @@ func (s *Store) additionalEqualityStrings(tableData *optimization.TableData) []s
}

func (s *Store) Merge(tableData *optimization.TableData) error {
var err error
for i := 0; i < maxRetries; i++ {
if i > 0 {
// TODO: Remove
if IsAuthExpiredError(err) {
slog.Warn("Authentication has expired, will reload the Snowflake store and retry merging", slog.Any("err", err))
if connErr := s.reestablishConnection(); connErr != nil {
// TODO: Remove this panic and return an error instead. Ensure the callers of [Merge] handle this properly.
logger.Panic("Failed to reestablish connection", slog.Any("err", connErr))
}
} else {
break
}
}

err = shared.Merge(s, tableData, types.MergeOpts{
AdditionalEqualityStrings: s.additionalEqualityStrings(tableData),
})
}
return err
return shared.Merge(s, tableData, types.MergeOpts{
AdditionalEqualityStrings: s.additionalEqualityStrings(tableData),
})
}

0 comments on commit 8474127

Please sign in to comment.