Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 24, 2024
1 parent b84bf80 commit eb56ba4
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 28 deletions.
21 changes: 3 additions & 18 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
)

Expand Down Expand Up @@ -86,23 +85,9 @@ func (s *Store) reestablishConnection() error {
return nil
}

cfg := &gosnowflake.Config{
Account: s.config.Snowflake.AccountID,
User: s.config.Snowflake.Username,
Password: s.config.Snowflake.Password,
Warehouse: s.config.Snowflake.Warehouse,
Region: s.config.Snowflake.Region,
Application: s.config.Snowflake.Application,
Params: map[string]*string{
// https://docs.snowflake.com/en/sql-reference/parameters#abort-detached-query
"ABORT_DETACHED_QUERY": ptr.ToString("true"),
},
}

if s.config.Snowflake.Host != "" {
// If the host is specified
cfg.Host = s.config.Snowflake.Host
cfg.Region = ""
cfg, err := s.config.Snowflake.ToConfig()
if err != nil {
return fmt.Errorf("faield to get snowflake config: %w", err)
}

dsn, err := gosnowflake.DSN(cfg)
Expand Down
10 changes: 0 additions & 10 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,6 @@ type SharedTransferConfig struct {
TypingSettings typing.Settings `yaml:"typingSettings"`
}

type Snowflake struct {
AccountID string `yaml:"account"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Warehouse string `yaml:"warehouse"`
Region string `yaml:"region"`
Host string `yaml:"host"`
Application string `yaml:"application"`
}

func (p *Pubsub) String() string {
return fmt.Sprintf("project_id=%s, pathToCredentials=%s", p.ProjectID, p.PathToCredentials)
}
Expand Down
80 changes: 80 additions & 0 deletions lib/config/snowflake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package config

import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"os"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/snowflakedb/gosnowflake"
)

type Snowflake struct {
AccountID string `yaml:"account"`

Username string `yaml:"username"`
// If pathToPrivateKey is specified, the password field will be ignored
PathToPrivateKey string `json:"pathToPrivateKey,omitempty"`
Password string `yaml:"password,omitempty"`

Warehouse string `yaml:"warehouse"`
Region string `yaml:"region"`
Host string `yaml:"host"`

Application string `yaml:"application"`
}

func (s Snowflake) ToConfig() (*gosnowflake.Config, error) {
cfg := &gosnowflake.Config{
Account: s.AccountID,
User: s.Username,
Warehouse: s.Warehouse,
Region: s.Region,
Application: s.Application,
Params: map[string]*string{
// https://docs.snowflake.com/en/sql-reference/parameters#abort-detached-query
"ABORT_DETACHED_QUERY": ptr.ToString("true"),
},
}

if s.PathToPrivateKey != "" {
key, err := loadPrivateKey(s.PathToPrivateKey)
if err != nil {
return nil, fmt.Errorf("failed to load private key: %w", err)
}

cfg.PrivateKey = key
cfg.Authenticator = gosnowflake.AuthTypeJwt
} else {
cfg.Password = s.Password
}

if s.Host != "" {
// If the host is specified
cfg.Host = s.Host
cfg.Region = ""
}

return cfg, nil
}

func loadPrivateKey(path string) (*rsa.PrivateKey, error) {
keyBytes, err := os.ReadFile(path)
if err != nil {
return nil, err
}

block, _ := pem.Decode(keyBytes)
if block == nil || block.Type != "RSA PRIVATE KEY" {
return nil, fmt.Errorf("failed to decode PEM block containing private key")
}

key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
return nil, fmt.Errorf("failed to parse private key: %v", err)
}

return key, nil
}

0 comments on commit eb56ba4

Please sign in to comment.