Skip to content

Commit

Permalink
Moving types around (#927)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 25, 2024
1 parent 756a7fe commit 12da05a
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 197 deletions.
24 changes: 0 additions & 24 deletions lib/config/bigquery.go

This file was deleted.

105 changes: 20 additions & 85 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,43 +24,10 @@ const (
FlushIntervalSecondsMax = 6 * 60 * 60
)

type Sentry struct {
DSN string `yaml:"dsn"`
}

type Pubsub struct {
ProjectID string `yaml:"projectID"`
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`
PathToCredentials string `yaml:"pathToCredentials"`
}

type Kafka struct {
// Comma-separated Kafka servers to port.
// e.g. host1:port1,host2:port2,...
// Following kafka's spec mentioned here: https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers
BootstrapServer string `yaml:"bootstrapServer"`
GroupID string `yaml:"groupID"`
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`

// Optional parameters
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
EnableAWSMSKIAM bool `yaml:"enableAWSMKSIAM,omitempty"`
DisableTLS bool `yaml:"disableTLS,omitempty"`
}

func (k *Kafka) BootstrapServers() []string {
return strings.Split(k.BootstrapServer, ",")
}

type S3Settings struct {
FolderName string `yaml:"folderName"`
Bucket string `yaml:"bucket"`
AwsAccessKeyID string `yaml:"awsAccessKeyID"`
AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
OutputFormat constants.S3OutputFormat `yaml:"outputFormat"`
}

func (s *S3Settings) Validate() error {
if s == nil {
return fmt.Errorf("s3 settings are nil")
Expand All @@ -77,18 +44,6 @@ func (s *S3Settings) Validate() error {
return nil
}

type Redshift struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Bucket string `yaml:"bucket"`
OptionalS3Prefix string `yaml:"optionalS3Prefix"`
// https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html
CredentialsClause string `yaml:"credentialsClause"`
}

func (p *Pubsub) String() string {
return fmt.Sprintf("project_id=%s, pathToCredentials=%s", p.ProjectID, p.PathToCredentials)
}
Expand All @@ -110,50 +65,10 @@ func (c Config) TopicConfigs() ([]*kafkalib.TopicConfig, error) {
return nil, fmt.Errorf("unsupported queue: %q", c.Queue)
}

type Mode string

const (
History Mode = "history"
Replication Mode = "replication"
)

func (m Mode) String() string {
return string(m)
}

type Config struct {
Mode Mode `yaml:"mode"`
Output constants.DestinationKind `yaml:"outputSource"`
Queue constants.QueueKind `yaml:"queue"`

// Flush rules
FlushIntervalSeconds int `yaml:"flushIntervalSeconds"`
FlushSizeKb int `yaml:"flushSizeKb"`
BufferRows uint `yaml:"bufferRows"`

// Supported message queues
Pubsub *Pubsub `yaml:"pubsub,omitempty"`
Kafka *Kafka `yaml:"kafka,omitempty"`

// Supported destinations
MSSQL *MSSQL `yaml:"mssql,omitempty"`
BigQuery *BigQuery `yaml:"bigquery,omitempty"`
Snowflake *Snowflake `yaml:"snowflake,omitempty"`
Redshift *Redshift `yaml:"redshift,omitempty"`
S3 *S3Settings `yaml:"s3,omitempty"`

Reporting struct {
Sentry *Sentry `yaml:"sentry"`
}

Telemetry struct {
Metrics struct {
Provider constants.ExporterKind `yaml:"provider"`
Settings map[string]any `yaml:"settings,omitempty"`
}
}
}

func readFileToConfig(pathToConfig string) (*Config, error) {
file, err := os.Open(pathToConfig)
if err != nil {
Expand Down Expand Up @@ -219,6 +134,26 @@ func (c Config) ValidateRedshift() error {
return nil
}

func (c Config) ValidateMSSQL() error {
if c.Output != constants.MSSQL {
return fmt.Errorf("output is not mssql, output: %v", c.Output)
}

if c.MSSQL == nil {
return fmt.Errorf("mssql config is nil")
}

if empty := stringutil.Empty(c.MSSQL.Host, c.MSSQL.Username, c.MSSQL.Password, c.MSSQL.Database); empty {
return fmt.Errorf("one of mssql settings is empty (host, username, password, database)")
}

if c.MSSQL.Port <= 0 {
return fmt.Errorf("invalid mssql port: %d", c.MSSQL.Port)
}

return nil
}

// Validate will check the output source validity
// It will also check if a topic exists + iterate over each topic to make sure it's valid.
// The actual output source (like Snowflake) and CDC parser will be loaded and checked by other funcs.
Expand Down
26 changes: 0 additions & 26 deletions lib/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,6 @@ func TestBigQuery_DSN(t *testing.T) {
assert.Equal(t, "bigquery://project/eu/dataset", b.DSN())
}

func TestKafka_BootstrapServers(t *testing.T) {
type _tc struct {
bootstrapServerString string
expectedBootstrapServers []string
}

tcs := []_tc{
{
bootstrapServerString: "localhost:9092",
expectedBootstrapServers: []string{"localhost:9092"},
},
{
bootstrapServerString: "a:9092,b:9093,c:9094",
expectedBootstrapServers: []string{"a:9092", "b:9093", "c:9094"},
},
}

for idx, tc := range tcs {
k := Kafka{
BootstrapServer: tc.bootstrapServerString,
}

assert.Equal(t, tc.expectedBootstrapServers, k.BootstrapServers(), idx)
}
}

func TestKafka_String(t *testing.T) {
k := Kafka{
BootstrapServer: "server",
Expand Down
53 changes: 53 additions & 0 deletions lib/config/destination_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package config

import "github.com/artie-labs/transfer/lib/config/constants"

type BigQuery struct {
// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
// Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
}

type MSSQL struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Database string `yaml:"database"`
}

type Redshift struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Bucket string `yaml:"bucket"`
OptionalS3Prefix string `yaml:"optionalS3Prefix"`
// https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html
CredentialsClause string `yaml:"credentialsClause"`
}

type S3Settings struct {
FolderName string `yaml:"folderName"`
Bucket string `yaml:"bucket"`
AwsAccessKeyID string `yaml:"awsAccessKeyID"`
AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
OutputFormat constants.S3OutputFormat `yaml:"outputFormat"`
}

type Snowflake struct {
AccountID string `yaml:"account"`
Username string `yaml:"username"`
// If pathToPrivateKey is specified, the password field will be ignored
PathToPrivateKey string `yaml:"pathToPrivateKey,omitempty"`
Password string `yaml:"password,omitempty"`

Warehouse string `yaml:"warehouse"`
Region string `yaml:"region"`
Host string `yaml:"host"`
Application string `yaml:"application"`
}
36 changes: 25 additions & 11 deletions lib/config/snowflake.go → lib/config/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,37 @@ package config

import (
"fmt"
"net/url"

"github.com/artie-labs/transfer/lib/cryptography"
"github.com/artie-labs/transfer/lib/typing"
"github.com/snowflakedb/gosnowflake"
)

type Snowflake struct {
AccountID string `yaml:"account"`
Username string `yaml:"username"`
// If pathToPrivateKey is specified, the password field will be ignored
PathToPrivateKey string `yaml:"pathToPrivateKey,omitempty"`
Password string `yaml:"password,omitempty"`

Warehouse string `yaml:"warehouse"`
Region string `yaml:"region"`
Host string `yaml:"host"`
Application string `yaml:"application"`
// DSN - returns the notation for BigQuery following this format: bigquery://projectID/[location/]datasetID?queryString
// If location is passed in, we'll specify it. Else, it'll default to empty and our library will set it to US.
func (b *BigQuery) DSN() string {
dsn := fmt.Sprintf("bigquery://%s/%s", b.ProjectID, b.DefaultDataset)

if b.Location != "" {
dsn = fmt.Sprintf("bigquery://%s/%s/%s", b.ProjectID, b.Location, b.DefaultDataset)
}

return dsn
}

func (m MSSQL) DSN() string {
query := url.Values{}
query.Add("database", m.Database)

u := &url.URL{
Scheme: "sqlserver",
User: url.UserPassword(m.Username, m.Password),
Host: fmt.Sprintf("%s:%d", m.Host, m.Port),
RawQuery: query.Encode(),
}

return u.String()
}

func (s Snowflake) ToConfig() (*gosnowflake.Config, error) {
Expand Down
51 changes: 0 additions & 51 deletions lib/config/mssql.go

This file was deleted.

Loading

0 comments on commit 12da05a

Please sign in to comment.